Tutorial de ReactiveX – Empezando con los observables

Que es ReactiveX

ReactiveX es una librería basada en eventos y que se apoya siguiendo el patrón observador.

Esta librería se puede usar tanto en Backend como en Frontend y esta disponible en diferentes lenguajes de programación, además nos facilita el uso de excepciones y asincronía.

En ReactiveX podemos encontrar tres conceptos muy importantes y que forman la base de la programación reactiva:

  • Observables: Es el origen de nuestra información, encargándose de la emisión de eventos pudiendo ser síncronos o asíncronos.
  • Subscribers: Estos «elementos» se suscriben a un observable, y reciben la información de los observables.
  • Operators: ReactiveX trata los eventos como si fueran una colección de datos, de tal forma que los operadores nos permiten manipular esa colección de datos.

Nuestra primera suscripción

Ahora que sabemos que es un observable, veremos como realizar nuestra primera suscripción.

// Importamos los observables de la librería ReactiveX
import { Observable } from 'rxjs';

// Declaramos nuestro observador
const observer$ = new Observable<string>(subscriber => {

  // Emitimos estos valores a nuestro suscriptor
  subscriber.next('1'); 
  subscriber.next('2');
  subscriber.next('3');
  subscriber.next('4');

  // Cuando usamos complete, ya deja de emitir
  subscriber.complete(); 

  // Esto ya no se emite
  subscriber.next('5');
  subscriber.next('6');
  subscriber.next('7');
});

// Nos suscribimos al observador
observer$.subscribe(response => console.log(response));

El funcionamiento es muy sencillo:

  1. Importamos los observables de ReactiveX.
  2. Declaramos nuestro observable incluyendo el tipo de dato que nos devolverá.
  3. Nos suscribimos a nuestro observable, ejecutando de esta manera el código dentro de nuestro observable.

En el momento de suscribirnos a nuestro observable se ejecuta el código, pero… ¿y si ocurriese un error durante la ejecución de nuestro código, o necesitásemos saber que ha terminado de emitir valores?

Bien, cuando nos suscribimos a un observable este recibe tres argumentos, el primero es obligatorio ya que es el valor que emite, pero los dos siguientes son opcionales y nos permiten saber si ha ocurrido algún error y si ha finalizado la emisión de eventos.

// Importamos los observables de la librería ReactiveX
import { Observable } from 'rxjs';

// Declaramos nuestro observador
const observer$ = new Observable<string>(subscriber => {

  // Emitimos estos valores a nuestro suscriptor
  subscriber.next('1'); 
  subscriber.next('2');
  subscriber.next('3');
  subscriber.next('4');

  // Vamos a generar un error aleatorio y probar el callback err
  const error = Math.floor(Math.random() * 100);
  console.log(error < 50 ? `Error < 50.... ${error}` : 'Success');
  if (error < 50) {
    throw new Error(`Ocurrio el error.... ${error}`);
  }

  // Cuando usamos complete, ya deja de emitir
  subscriber.complete(); 

  // Esto ya no se emite
  subscriber.next('5');
  subscriber.next('6');
  subscriber.next('7');
});

// Nos suscribimos al observador
// $.subscribe(response, error, complete);
observer$.subscribe(response => console.log(response), err => console.error(err), () => console.log('Finalizado'));

Cuando nos suscribimos a un observable también podemos usar un objeto con la configuración.

// Importamos los observables de la librería ReactiveX
import { Observable, Observer } from 'rxjs';

// Declaramos nuestro observador
const observer$ = new Observable<string>(subscriber => {

  // Emitimos estos valores a nuestro suscriptor
  subscriber.next('1'); 
  subscriber.next('2');
  subscriber.next('3');
  subscriber.next('4');

  // Vamos a generar un error aleatorio y probar el callback err
  const error = Math.floor(Math.random() * 100);
  console.log(error < 50 ? `Error < 50.... ${error}` : 'Success');
  if (error < 50) {
    throw new Error(`Ocurrio el error.... ${error}`);
  }

  // Cuando usamos complete, ya deja de emitir
  subscriber.complete(); 

  // Esto ya no se emite
  subscriber.next('5');
  subscriber.next('6');
  subscriber.next('7');
});

// Nos suscribimos al observador
// $.subscribe(response, error, complete);
// observer$.subscribe(response => console.log(response), err => console.error(err), () => console.log('Finalizado'));

// Podemos definir un objeto de tipo Observer y pasarlo como argumento
const observerOBJ: Observer<string> = {
  next: response => console.log(response),
  error: err => console.log(err),
  complete: () => console.log('Finalizado')
};
observer$.subscribe(observerOBJ);

Cancelando la suscripción

Ya sabemos como suscribirnos, pero.. ¿y si quisiéramos cancelar la suscripción?.

Para ello el observable nos ofrece otro método llamado unsubscribe, el cual nos permite cancelar la subcripción y evitar problemas con la memoria.

// Importamos los observables de la librería ReactiveX
import { Observable, Observer } from 'rxjs';

const observerOBJ: Observer<number> = {
  next: response => console.log('response', response),
  error: err => console.log(err),
  complete: () => console.log('Finalizado')
};

const counter$ = new Observable<number>(subscriber => {
  let cont = 0;
  const count = setInterval(() => {
    console.log('setInterval', cont);
    subscriber.next(cont++);
  }, 200);

  // Este return es lo que devuelve nuestro observable cuando cancelemos la suscripción (unsubscribe())
  return (() => {
    // Aqui eliminamos el ID del método setInterval(), si no continuaría ejecutándose aunque ya no estemos suscritos
    clearInterval(count);
  })
});

const subscription = counter$.subscribe(observerOBJ);

setTimeout(() => subscription.unsubscribe(), 5000)

A continuación veremos el flujo que sucede en el código anterior:

  1. Nos suscribimos y asignamos a una variable la suscripción que nos devuelve ese observable.
  2. El código del observable empieza a ejecutarse, en este caso un setInterval, emitiendo un valor cada x milisegundos.
  3. Fuera del observable tenemos esperando a ejecutarse la cancelación de la suscripción cuando pasen 5 segundos.
  4. Cuando se realice la cancelación de la suscripción entonces dejaremos se escuchar los valores que se emiten, pero el método setInterval continuara ejecutándose.
  5. Para ello dentro del observable devolveremos una función que se ejecutara cuando se cancele (pasa antes por complete) la suscripción, y dentro se podrá ejecutar nuestro código.

Otro detalle a tener en cuenta es que si dentro de nuestro observable ejecutásemos el método complete de nuestro subscriptor entonces la emisión de eventos finalizaría.

// Importamos los observables de la librería ReactiveX
import { Observable, Observer } from 'rxjs';

const getDate = () => new Intl.DateTimeFormat('default', {
  hour: 'numeric',
  minute: 'numeric',
  second: 'numeric'
}).format(new Date());

const observerOBJ: Observer<number> = {
  next: response => {
    console.log(`[${getDate()}] - response: ${response}`);
  },
  error: err => {
    console.log(`[${getDate()}] - error: ${err}`);
  },
  complete: () => {
    console.log(`[${getDate()}] - Finalizado`);
  }
};

const counter$ = new Observable<number>(subscriber => {
  let cont = 0;
  const count = setInterval(() => {
    console.log(`[${getDate()}] - setInterval: ${cont}`);
    subscriber.next(cont++);
  }, 1000);

  // Se ejecuta el método complete() antes de cancelar la suscripción, y en este momento dejaría de emitir
  setTimeout(() => {
    console.log(`[${getDate()}] - Ya no se emite, complete() setTimeout()`);
    subscriber.complete();
  }, 5000);
  
  /* 
    Este return es lo que devuelve nuestro observable cuando cancelemos la suscripción (unsubscribe()),
    ya que finaliza y ejecuta el método complete()
  */
  return (() => {
    // Aqui eliminamos el ID del método setInterval(), si no continuaría ejecutándose aunque ya no estemos suscritos
    clearInterval(count);
  })
});

const subscription = counter$.subscribe(observerOBJ);

setTimeout(() => {
  console.log(`[${getDate()}] - UNSUBSCRIBE`);
  subscription.unsubscribe();
}, 12000);

Como siempre todo el código del proyecto lo podéis encontrar en Github.