Création d'un Observable

Il est rare de devoir créer un Observable "from scratch" car Angular fournit généralement des "wrappers" à base d'Observables pour la plupart des sources de données asynchrones (http, forms, route changes etc...) mais il est intéressant de s'y aventurer au moins une fois pour mieux en comprendre le fonctionnement.

Observer

L'exemple suivant :

import { Observable } from 'rxjs';

const data$ = new Observable(observer => {

    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();

});

data$.subscribe({
    next: value => console.log(value),
    error: err => console.error(err),
    complete: () => console.log('DONE!')
});

... produit le résultat :

1
2
3
DONE!

Vous remarquerez quelques similitudes avec la création d'une Promise. Le duo de paramètres resolve et reject est remplacé par un objet Observer disposant des méthodes next, error, complete.

En omettant l'appel à la méthode complete, nous produisons un Observable infini et risquons des fuites mémoire si l'on oublie d'unsubscribe.

Erreur

La méthode error permet de déclencher une erreur.

import { Observable } from 'rxjs';

const data$ = new Observable(observer => {

    observer.next(1);
    observer.next(2);
    observer.error(new Error('Oups!'));
    observer.next(3);
    observer.complete();

});

data$.subscribe({
    next: value => console.log(value),
    error: error => console.error(error.toString()),
    complete: () => console.log('DONE!')
});
1
2
Error: Oups!

Une fois les méthodes error ou complete appelées, les appels suivants aux méthodes next, error et complete sont simplement ignorés.

"Teardown logic"

Voici un exemple d'implémentation de la fonction RxJS interval utilisée précédemment.

import { Observable } from 'rxjs';

const interval = period => new Observable(observer => {

    let i = 0;

    setInterval(() => observer.next(i++), period);

});

const data$ = interval(1000);

const subscription = data$.subscribe({
    next: value => console.log(value),
    error: error => console.error(error.toString()),
    complete: () => console.log('DONE!')
});

/* Unsubscribe after 5 seconds. */
setTimeout(() => subscription.unsubscribe(), 5000);

Cela produit le résultat suivant :

5 secondes après la "subscription", la "callback" next ne reçoit plus de valeurs. Par contre, NodeJS ne rend pas la main car le setInterval continue à tourner (et émettre des valeurs ignorées) en tâche de fond malgré l'appel à unsubscribe. Il s'agit d'une fuite.

Pour remédier à ce problème, il faudrait appeler la fonction clearInterval dès que :

  • une erreur est détectée,

  • la méthode complete est appelée,

  • le consommateur de l'Observable appelle la méthode unsubscribe.

Il faut donc implémenter une sorte de destructeur d'Observable.

Ce destructeur est appelé "Teardown Logic" et doit être retourné par la fonction de subscribe passé en paramètre au constructeur de l'Observable.

import { Observable } from 'rxjs';

const interval = period => new Observable(observer => {

    let i = 0;

    const handler = setInterval(() => observer.next(i++), period);

    return () => clearInterval(handler);

});

const data$ = interval(1000);

const subscription = data$.subscribe({
    next: value => console.log(value),
    error: error => console.error(error.toString()),
    complete: () => console.log('DONE!')
});

/* Unsubscribe after 5 seconds. */
setTimeout(() => subscription.unsubscribe(), 5000);

Le problème est ainsi résolu car clearInterval sera appelé peu importe l'origine de l'interruption : unsubscribe, error ou complete.

Dernière mise à jour