Programación reactiva: lidiando con la asincronía


Ricardo Sanz
sanzante / tunic

Ricardo Sanz
Drupal Developer & DevOps
drupal.org/u/tunic
sanzante

Programación reactiva
  • Una forma de gestionar la asincronía.
  • Esto NO es una charla sobre React.
  • Basado en la librería ReactiveX.
  • Utilizable en muchos lenguajes además de JavaScript: PHP, Java, Scala, C#, C++, Pyhton y otros.
Ventajas
  • Simplifica tremendamente el trabajo asíncrono.
  • Muy fácil combinar y gestionar fuentes de datos asíncronas.
  • Genera un código más corto, simple e intuitivo*.
  • Mantenabilidad.
La asincronía
  • Eventos que suceden en el tiempo sin previsión.
  • El entorno de un navegador es asíncrono.
  • Interacción del usuario (ratón, teclado, etc).
  • AJAX.
  • Temporizadores o animaciones.
  • WebSockets.
  • Workers.
Callbacks
  • Reacción simple ante un evento.
  • Llamadas a una función (listeners).
                   
    jQuery('#grip').click(function() {
      jQuery('#panel').slideUp();
    });
                   
                 
  • Programar respuestas complejas es complicado.
  • Callback hell.
Callback hell
              
var doc;
memStorage.get(docId, response => {
doc = memResponse;
  if (!memResponse) {
    localStorage.get(docId, localResponse => {
      doc = localResponse;
      if (!localResponse) {
        serverStorage.get(docId, serverResponse => {
          doc = serverResponse;
          if (!serverResponse) {
            // Error.
          }
      });
    }
  });
});
              
            
Promises
  • Objetos que envuelven operaciones asícronas y reaccionan cuando hay un resultado.
  • Son casi callbacks convertidas en objetos.
  • Mejoran enormemente la gestión del flujo y de los errores.
  • Incorporadas a ES2015 (ECMAScript 6).
Promises
  • Solo se resuelven una vez: devuelven un único valor.
  • Seguir el hilo puede no ser trivial.
  • No son diferidas (lazy).
  • No son cancelables.
Programación reactiva
  • Es una forma de llevar capacidades de programación funcional a la programación imperativa.
  • Programación usando flujos de datos asíncronos.
  • Declaración de caminos de datos que procesan y transforman los datos que los cruzan.
  • Se declaran 'reacciones' ante el cambio.
  • Parecido en cierta forma al renderizado de Angular, React y Vue.js.
Camino de datos
  • El camino que siguen unos ciertos datos.
  • Es un concepto abstracto, una idea, no una estructura de datos.
Camino simple
Ejemplo: un listener sobre un evento de click.
              
jQuery('#grip').click(function() {
  jQuery('#panel').slideUp();
});
              
            
Camino complejo
Ejemplo: Pasar de un click a una petición AJAX, y de ahí a unos objetos instanciados.
Observables/Streams
  • Observables del patrón Observador.
  • Flujos de datos (streams).
  • Emiten datos hasta que terminan o se produce un error.
Diágramas de cánicas
  • Representa los datos de un flujo a lo largo del tiempo.
  • Flujo que emite 4 elementos y luego finaliza.
  • Flujo que emite 3 elementos y luego emite un error.
Operadores
  • Encadenan streams.
  • Permiten realizar transformaciones a los datos.
  • Modulan los flujos.
  • Crean streams.
  • Combinan streams.
Operadores
Operador filter, filtra valores que no cumplen una condición.
Operadores
Operador map, transforma los datos del stream.
Ej: Cronómetro acumulativo
  • Cronómetro manual que va acumulando tiempo.
  • Un botón para activarlo.
  • Otro botón para pausarlo/continuar.
https://codepen.io/sanzante/pen/gzyMNx
              
var clockValue = 0;
var clockOn = true;

const btS$ = Rx.Observable.fromEvent(buttonStart, 'click');
const btP$ = Rx.Observable.fromEvent(buttonPause, 'click');
const clock$ = Rx.Observable.interval(1000);

btP$.subscribe(() => clockOn = !clockOn);

clock$.do(x => output1.innerHTML = x)
  .combineLatest(btS$, v => v)
  .filter(() => clockOn)
  .subscribe(() => output2.innerHTML = ++clockValue);
              
            
Diagrama de eventos

  clock$  ---1---2---3---4---5---6---7---8---9------>
                    do
          ---1---2---3---4---5---6---7---8---9------>
  btS$    --------x--------------------------------->
                  combineLatest
          --------2--3---4---5---6---7---8---9------>
                  filter (clockOn)
          --------2--3---4---6-----------8---9------>

  btP$    ---------------------x--------x----------->
  clockV  000000001112222333344444444444455556666--->
  clockOn 111111111111111111111000000000011111111--->

            
Decathlon
¡Machacabotones!
Machacabotonismo
            
var button = document.querySelector('.this');
var clickStream = Rx.Observable.fromEvent(button, 'click');
var text = document.querySelector('h2');
var multiClickStream = clickStream
    .buffer(() => Rx.Observable.interval(660))
    .map(list => list.length)
    .filter(x => x >= 2);

multiClickStream.subscribe(numclicks => {
    text.textContent = '' + numclicks + 'x click';
});

multiClickStream.throttle(1000)
    .subscribe(suggestion => {
      document.querySelector('h2').textContent = ''
    });

            
          
Pedir URL e instanciar
            
Rx.Observable.fromEvent(button, 'click')
  .map(() => 'https://ejemplo.es/api/clasificacion'
  .flatMap(url => http.get(url))
  .map(items => items.map(new MyItem(item)))
  .subscribe(instancias => this.list = instancias);
            
          
Pedir URL e instanciar
Con refresco automático
            
Rx.Observable.fromEvent(button, 'click')
  .merge(Rx.Observable.interval(60000))
  .map(() => 'https://ejemplo.es/api/clasificacion'
  .flatMap(url => http.get(url))
  .map(items => items.map(new MyItem(item)))
  .subscribe(instancias => this.list = instancias);
            
          
Pedir URL e instanciar
Con refresco automático desactivable
            
Rx.Observable.interval(60000)
  .filter(() => this.autorefresh)
  .merge(Rx.Observable.fromEvent(button, 'click'))
  .map(() => 'https://ejemplo.es/api/clasificacion'
  .flatMap(url => http.get(url))
  .map(items => items.map(new MyItem(item)))
  .subscribe(instancias => this.list = instancias);
            
          
Pedir URL e instanciar
Con refresco automático desactivable y gestión de errores.
            
Rx.Observable.interval(60000)
  .filter(() => this.autorefresh)
  .merge(Rx.Observable.fromEvent(button, 'click'))
    .map(() => 'https://ejemplo.es/api/clasificacion'
  .flatMap(url => http.get(url))
  .map(items => items.map(new MyItem(item)))
  .catch(err => Rx.Observable.of([]))
  .subscribe(instancias => this.list = instancias);
            
          
Notas sobre Observables
  • No son EventEmitter.
  • Pueden ser asíncronos y síncronos.
  • Pueden ser cold o hot.
  • Pueden ser multicast o unicast.
  • Pueden compartir o no el origen de datos con sus subscriptores.
¿A quién seguir?
Ejemplo de https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
  • Al iniciarse, lee datos de la API y muestra 3 sugerencias.
  • Al clicar 'Refresh' carga 3 nuevas sugerencias en las 3 filas.
  • Al clicar en la 'x' de una fila, elimina esa sugerencia y carga otra solo en esa fila.
  • En cada fila hay un avatar y un enlace a la cuenta.
¿A quién seguir?
¿Por qué adoptarla?
Porque otros ya lo han hecho
¿Por qué adoptarla?
  • Pueden modelizar casi cualquier cosa.
  • Trabajar con cualquier fuente de datos.
  • TC39 Observables: propuesta para estándar.
  • Menos código, mejora de mantenimiento.
  • Disponible en muchos lenguajes.
  • Asincronía.
Enlaces
  • The introduction to Reactive Programming you've been missing: https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
  • Diagramas de canicas interactivos: http://rxmarbles.com/
  • Rx Visualizer: https://rxviz.com/
  • ReactiveX: http://reactivex.io/
  • Ben Lesh: https://medium.com/@benlesh/
¿Preguntas?
¡Gracias!