← blog

Arquitectura de feeds tiempo-real con Storm

Uno de los productos de código abierto más interesantes que ha salido en los últimos meses es Storm, programado por Nathan Marz de Backtype (ahora en Twitter). Este vídeo de presentación, altamente recomendable, nos explica los casos de uso básicos de Storm y el por qué de la necesidad de este framework. Tambień disponemos de una wiki muy buena donde podemos aprender todo tipo de cosas sobre Storm.

En este artículo voy a presentar un caso de uso práctico que podemos implementar en Storm y voy a discutir por qué Storm es una muy buena herramienta para éste caso.

“Real-time feed processing”

El problema es el siguiente. Tenemos una cantidad masiva de datos que “parsear” de diferentes fuentes de datos, por ejemplo de archivos “feed” XML. Este caso de uso lo encontramos, por ejemplo, en buscadores verticales. En este post ya hemos discutido cómo implementar una arquitectura “batch” escalable para procesar y crear un índice para estos datos. ¿Pero qué sucede si queremos actualizar los datos en tiempo real, por ejemplo alimentándonos de feeds incrementales que cambian muy frecuentemente?

Solución sencilla monopuesto

Una forma sencilla de atacar este problema sería teniendo un proceso que actualiza los feeds contínuamente. El problema de esta solución es que está totalmente limitada a la capacidad de proceso de una sola máquina.

Solución escalable compleja

Si una sola máquina no sirve, podemos poner N máquinas y replicar el proceso. Por ejemplo, haciendo “mod hashing” en cada máquina sobre el ID del feed (feedId % n). De esta manera cada máquina se encargará de actualizar sólo un subconjunto de los feeds. Esta solución es escalable, pero presenta varios problemas:

  • El escalado es totalmente manual. Si queremos escalar, tenemos que encargarnos de desplegar nosotros mismos un nuevo servicio.
  • Es una arquitectura sin fail-over, o con fail-over manual. Si una máquina cae, un subconjunto de los feeds dejará de ser actualizado.

Cola / Workers

Una parte de los problemas que presenta la solución 2 puede ser solventada implementando un servicio “máster” que provée trabajo a servicios “slave” a través de una cola. De esta manera, si uno de los servicios cae, los demás pueden seguir operando sobre todo el conjunto de feeds.

Se trata de un patrón común “cola / workers”. Este patrón se aplica comúnmente a muchos problemas.

Este patrón presenta ciertas complicaciones inherentes: ¿Qué tipo de cola usar? ¿Cómo administrar / desplegar la cola?

¿Hadoop al rescate?

Otra posible solución pasaría por implementar la actualización de los feeds en Hadoop. Programaríamos un Map/Reduce que emite los feeds en la fase Map y los recibe en la fase Reduce. De este modo cada Reducer tendría un subconjunto de los Feeds y podría operar sobre ellos. Sin embargo, el tiempo del Map/Reduce es el tiempo que tarda en acabar el Reducer más lento, con lo cual tendríamos tiempos de ciclo inciertos y la parte “real-time” de la arquitectura sería algo dudosa.

Workers multi-nivel

Otro problema que nos podría asaltar es el siguiente. Imaginemos que tenemos N procesos actualizando feeds pero no queremos que todos ellos actualicen la base de datos directamente. Razones por las que podríamos querer hacer esto: eficiencia, localidad, configuración de nuestra base de datos… En este caso necesitamos otro conjunto de procesos M (donde M < N) que serán los encargados de actualizar la base de datos, haciendo buffering previo, etc. ¿Cómo los comunicamos? A través de otra cola. Las cosas se vuelven cada vez más complicadas.

¿Acaso no existirá una plataforma que nos permita desplegar un sistema de Workers multi-nivel sin las complicaciones inherentes a este tipo de sistemas?

¡Este sistema existe y se llama Storm!

Real-time feed processing con Storm

Storm es un framework que abstrae las complicaciones inherentes a cualquier sistema “cola / workers”. Es una generalización de este tipo de arquitecturas que nos permite escribir “topologías” de proceso en tiempo real sin tener que preocuparnos de cómo escalar, cómo hacer fail-over y de detalles internos como la comunicación entre procesos.

Storm tiene un nodo máster, Nimbus y N nodos esclavos (Supervisor). Implementa coordinación a través de Zookeeper, comunicación entre procesos con 0MQ y permite implementar topologías en Java e incluso algunos lenguajes dinámicos.

Para implementar la topología de feeds en tiempo real con Storm definiremos un Spout (FeedSpout) y un Bolt (FetcherBolt). En Storm, los Spout son los procesos encargados de “generar trabajo” mientras los Bolt pueden hacer cualquier tipo de operación sobre el trabajo en sí. En este caso vamos a hacer que FeedSpout emita feeds a procesar (URLs) y FetcherBolt se encargue de descargárselos y parsearlos.

Veamos el código de FeedSpout:

public class FeedSpout extends SimpleSpout {

  private static final long serialVersionUID = 1L;
  Queue<String> feedQueue = new LinkedList<String>();
  String[] feeds;

  public FeedSpout(String[] feeds) {
    this.feeds = feeds;
  }

  @Override
  public void nextTuple() {
    String nextFeed = feedQueue.poll();
    if(nextFeed != null) {
      collector.emit(new Values(nextFeed), nextFeed);
    }
  }

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    super.open(conf, context, collector);
    for(String feed: feeds) {
      feedQueue.add(feed);
    }
  }

  @Override
  public void ack(Object feedId) {
    feedQueue.add((String)feedId);
  }

  @Override
  public void fail(Object feedId) {
    feedQueue.add((String)feedId);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("feed"));
  }
}

Para simplificar este ejemplo, hemos hecho que FeedBolt reciba la lista completa de feeds que actualizar por constructor, aunque estos feeds podrían estar en una base de datos o en cualquier otro lado. Hay una cola en memoria, y de esta cola se van sacando uno a uno los feeds que mandar. Es Storm con su capa de control el que decide cuándo hay que llamar al método nextTuple(). Podemos configurar cosas como, por ejemplo, cuántos mensajes pendientes pueden quedar por procesar después de que este método se haya llamado, como máximo. De esta manera las colas intermedias nunca se van a saturar.

Por otro lado, los Spout reciben ACKS y FAILS. Este mecanismo se llama “Reliability API” y nos permite saber si un determinado mensaje se ha procesado o por el contrario ha fallado. En este caso estamos usando esta API para tener feedback de los Bolts y saber cuándo un feed puede volver a ser procesado.

Veamos ahora el FetcherBolt:

public class FetcherBolt implements IRichBolt {

  private static final long serialVersionUID = 1L;
  private OutputCollector collector;

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    FeedFetcher feedFetcher = new HttpURLFeedFetcher();
    String feedUrl = input.getStringByField("feed");
    try {
      SyndFeed feed = feedFetcher.retrieveFeed(new URL(feedUrl));
      for(Object obj : feed.getEntries()) {
        SyndEntry syndEntry = (SyndEntry) obj;
        Date entryDate = getDate(syndEntry, feed);
        collector.emit(new Values(syndEntry.getLink(), entryDate.getTime(), syndEntry.getDescription().getValue()));
      }
      collector.ack(input);
    } catch(Throwable t) {
      t.printStackTrace();
      collector.fail(input);
    }
  }

  private Date getDate(SyndEntry syndEntry, SyndFeed feed) {
    return syndEntry.getUpdatedDate() == null ? (syndEntry.getPublishedDate() == null ? feed.getPublishedDate() : syndEntry.getPublishedDate()) : syndEntry.getUpdatedDate();
  }

  @Override
  public void cleanup() {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("link", "date", "description"));
  }
}

En este caso el Bolt es muy sencillo, tan sólo recibe un feed que procesar, se lo descarga y lo parsea. Finalmente emite “listados” (tuplas con campos “link”, “date” y “description”). Estos listados podrían ser procesados por otra capa de Bolts, por ejemplo para calcular estadísticas en tiempo real o para guardarlos en una base de datos.

También vemos cómo usamos la “Reliability API” para comunicar el éxito o fracaso en el procesamiento de un feed (métodos ack() y fail()). ¿Cómo funciona exactamente? Storm tiene unos procesos llamados “ackers” que guardan el árbol de ejecución de una tupla. Si queremos que esto suceda, tenemos que hacer “anchoring” (emitir la tupla asociada con un Id). Cuando la tupla falla en algún Bolt, Storm sabe a qué Spout mandar el mensaje de FAIL. También podría ocurrir que la tupla hiciera “timeout” (que por defecto es 30 segundos), lo cual también mandaría un FAIL al Spout originario. Con este mecanismo podemos implementar políticas sofisticadas de fail-over.

Finalmente lo que nos falta por definir es la topología misma (FeedTopology).

El código de este ejemplo lo podeis encontrar aquí y puede ser ejecutado contra una lista de feeds XML de Craiglist. En este proyecto, además, he incluído un Bolt más (ListingBolt) que calcula los 10 listings más recientes parseados por FetcherBolt. Para ejecutar el proyecto sólo hay que hacer git clone y mvn install. Se ejecutará la topología durante unos segundos y se imprimirá por pantalla los 10 listings más recientes, actualizados cada segundo.

Conclusiones

Hemos visto qué es Storm y hemos explicado un caso concreto de uso. Este caso de uso resultaría complicado de implementar sin Storm ya que tendríamos que encargarnos de desplegar una arquitectura “cola / workers” potencialmente multi-nivel, con todas las complejidades que ello acarrea. Con Storm, todo lo que tenemos que hacer es definir la topología y ejecutarla en un clúster. Podemos usar este script de despliegue para ello. Si queremos escalar nuestra solución, tan sólo tenemos que añadir más máquinas al clúster.

Storm es a la computación en tiempo real lo que Hadoop es a la computación en “batch”.

Finalmente, decir que en este proyecto hay varios ejemplos de uso muy representativos y útiles.



4 Comentarios en "Arquitectura de feeds tiempo-real con Storm"

  1. Gran artículo!

    Cómo lo probaste, pudiste utilizar un clúster o hiciste pruebas en local ?

  2. Hola Marc,
    Este ejemplo concreto no lo he ejecutado en el cloud. Pero he hecho algunos benchmarks sencillos para Distributed RPC en un clúster de 3 máquinas en EC2 usando este script de deploy: https://github.com/nathanmarz/storm-deploy
    El deploy funciona a la perfección, ninguna queja. En cuanto al funcionamiento de Storm en un clúster, la interfaz web del Nimbus es especialmente útil ya que te muestra la latencia de cada proceso (incluído procesos internos de Storm como el ACKing) y así ves si hay algo que está tardando más de la cuenta o no.

  3. Me ha parecido muy interesante el articulo y muy bien escrito. Como único apunte creo que en tiempo real deberia ser sustituido por tiempo de ejecucion.

  4. Un artículo bastante gráfico para hacer una prueba. Creo que voy a echarle un vistazo a la wiki, tiene muy buena pinta.

Comentar