Tutorial de ReactiveX – Los operadores en ReactiveX

Los operadores en ReactiveX

Los operadores son una parte importante dentro de la librería ReactiveX ya que nos permiten realizar múltiples operaciones con el flujo de datos.

Tipos de operadores

ReactiveX nos ofrece un listado de operadores que están ordenados por grupos según su finalidad.

A continuación veremos el listado de los diferentes grupos que existen:

    • Creación (Creation): Se originan nuevos observables.
    • Transformación (Transformation): Podemos realizan transformaciones.
    • Filtro (Filter): Podemos realizar una emisión selectiva de los elementos.
    • Combinación (Combination): Permite trabajar con múltiples fuentes en un observable único.
    • Error (Error Handling): Podemos recuperar notificaciones de error en un observable.
    • Utilidades (Utility): Múltiples utilidades para trabajar con observables.
    • Condicional (Conditional): Permiten evaluar los valores emitidos.
    • Matemáticos (Mathematical): Podemos operar la secuencia completa emitida por un observable.
    • Conversión (Converting): Podemos convertir un observable en otra estructura de datos.
    • Multidifusión (Multicasting): Podemos realizar una multidifusión y que los efectos secundarios se repartan entre múltiples suscriptores.

Debemos saber que tanto para un operador, como para múltiples operadores haremos uso de la función pipe().

Como recordatorio y antes de comenzar, en gitHub podéis acceder a todo el código.

Ahora si, comencemos!!!

Operadores y ejemplo de uso

map: Aplica una función por cada valor emitido.

import { from } from 'rxjs'; 
import { map } from 'rxjs/operators';

const arr = from([1, 2, 3, 4, 5]);

// [1, 2, 3, 4, 5] => 100, 200, 300, 400, 500
const obs$ = arr.pipe(map(data => data * 100));

obs$.subscribe(console.log);

mapTo: Es similar al operador map pero emite siempre un valor fijo.

import { from } from 'rxjs'; 
import { mapTo } from 'rxjs/operators';


const cfg = {
  next: response => {
    console.log(response);
  },
  error: err => {
    console.log(`Error: ${err}`);
  },
  complete: () => {
    console.log(`Finalizado`);
  }
};
  
// Creamos nuestro observable desde un array gracias a 'from'
const data = from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

const obs$ = data.pipe(mapTo('Valor leido'));

const subscribe = obs$.subscribe(cfg);

filter: Es parecido al operador map, salvo que filter  nos permite filtrar los valores que se emiten.

import { fromEvent } from 'rxjs'; 
import { filter } from 'rxjs/operators';


const cfg = {
  next: response => {
    console.log(response);
  },
  error: err => {
    console.log(`Error: ${err}`);
  },
  complete: () => {
    console.log(`Finalizado`);
  }
};

const keyboard = fromEvent<KeyboardEvent>(document, 'keydown');

// Filtramos para que solo emita el valor cuando la tecla pulsada se encuentre entre la 'a' y la 'z'
const obs$ = keyboard.pipe(filter(({keyCode}) => keyCode >= 65 && keyCode <= 90));

obs$.subscribe(cfg);

pluck: Emite el valor de la propiedad que seleccionemos del objeto de entrada.

import { from } from 'rxjs'; 
import { pluck } from 'rxjs/operators';

(async () => {
  const cfg = {
    next: response => {
      console.log(response);
    },
    error: err => {
      console.log(`Error: ${err}`);
    },
    complete: () => {
      console.log(`Finalizado`);
    }
  };
  const urlPost = 'https://jsonplaceholder.typicode.com/posts';
  
  const response = await (await fetch(urlPost)).json();
  /*
   Estructura del objeto recibido:
   {
     userId,
     id,
     title,
     body
   }
  */
  
  // Creamos nuestro observable desde un array gracias a 'from'
  const data = from(response);

  // Filtramos solo la propiedad 'title'
  const obs$ = data.pipe(pluck('title'));

  // Muestra solo el 'title'
  const subscribe = obs$.subscribe(cfg);

})();

tap: Usaremos este operador para realizar cualquier acción transparente sin que afecte al flujo actual.

import { fromEvent } from 'rxjs'; 
import { pluck, tap } from 'rxjs/operators';

const cfg = {
  next: response => {
    console.log(response);
  },
  error: err => {
    console.log(`Error: ${err}`);
  },
  complete: () => {
    console.log(`Finalizado`);
  }
};

let text: string[] = [];
let word: string = '';
const obs$ = fromEvent<KeyboardEvent>(document, 'keydown');

obs$.pipe(
  tap(data => {
    console.log('Antes de "pluck", este operador lo usaremos para informar y saber que entra');
    console.log(data);
  }),
  pluck('key'),
  tap(data => {
      console.log('Despues de de "pluck", ahora vamos a guardar la información modificada');
      if (data === 'Enter') {
        text.push(word);
        word = '';
        console.log(text);
      } else {
        word += data;
      }
    })
).subscribe(cfg);

reduce: Reduce los valores a un valor único emitiéndose cuando este completado.

import { from } from 'rxjs'; 
import { reduce } from 'rxjs/operators';

const cfg = {
  next: response => {
    console.log(response);
  },
  error: err => {
    console.log(`Error: ${err}`);
  },
  complete: () => {
    console.log(`Finalizado`);
  }
};

const obs$ = from([1, 2, 3, 4, 5, 6, 7]).pipe(reduce((acc, value) => acc + value, 0));

// 1 + 2 +3 + 4 + 5 + 6 + 7 => 28
obs$.subscribe(cfg); // 28

groupBy: Se agrupan los resultados en observables según el valor proporcionado.

import { from } from 'rxjs'; 
import { groupBy, mergeMap, toArray } from 'rxjs/operators';

(async () => {
  const cfg = {
    next: response => {
      console.log(response);
    },
    error: err => {
      console.log(`Error: ${err}`);
    },
    complete: () => {
      console.log(`Finalizado`);
    }
  };
  
  interface dataResponse {
    postId: number;
    id: number;
    name: string;
    email: string;
    body: string;
  }
  // Estructura del objeto recibido: { postId, id, name, email, body } 
  const urlPost: string = 'https://jsonplaceholder.typicode.com/comments';
  const response: dataResponse[] = await (await fetch(urlPost)).json();
  const data = from(response);

  const obs$ = data.pipe(
    groupBy(values => values.postId),
    mergeMap(group => group.pipe(toArray()))
  ).subscribe(cfg);

})();

take: Nos permite emitir un determinado número de veces un valor.

import { fromEvent } from 'rxjs';
import { take, pluck } from 'rxjs/operators';

const MAX_CHAR = 10;

const keyboard$ = fromEvent(document, 'keydown').pipe(
  take(MAX_CHAR),
  pluck('code')
);

const subscribe = keyboard$.subscribe({
  next: (data) => {
    console.log('TECLA...', data);
  },
  error: (err) => {
    console.log('ERR', err);
  },
  complete: () => {
    console.log('END');
  }
});

distinct: Nos permite emitir los elementos que sean distintos.

import { from } from 'rxjs';
import { distinct } from 'rxjs/operators';
interface idata {
  id: number,
  name: string,
  age: number
}

const data: idata[] = [
  { id: 0, name: 'a', age: 10, },
  { id: 1, name: 'a', age: 20, },
  { id: 2, name: 'b', age: 30, },
  { id: 3, name: 'c', age: 40, },
  { id: 4, name: 'c', age: 50, },
];

const obs$ = from(data).pipe(
  distinct((data:idata) => data.name)
)
obs$.subscribe({
  next: (data) => {
    console.log(data);
    /*
      {id: 0, name: "a", age: 10}
      {id: 2, name: "b", age: 30}
      {id: 3, name: "c", age: 40}
    */
  },
  error: (err) => {
    console.log('ERR', err);
  },
  complete: () => {
    console.log('END');
  }
});

Con el fin de evitar que la entrada se extienda demasiado (ya que tenemos decenas de operadores), en la próxima entrada del blog sobre ReactiveX continuaremos viendo mas operadores y ejemplos de uso.