← blog

Pangool + SOLR

En este blog ya hemos hablado de la conveniencia de usar Hadoop para hacer indexación batch, por ejemplo para producir un índice invertido Lucene que se desplegará en SOLR. Hemos hablado del SOLR-1301, comúnmente usado para indexación SOLR con Hadoop. En este artículo vamos a presentar la reciente integración de Pangool con SOLR, mucho más sencilla y potente.

Pangool + SOLR != Hadoop + SOLR

Seguramente ya conocereis lo que es Pangool, la librería de bajo nivel que hemos hecho para facilitar el desarrollo con Hadoop. Entre sus virtudes están la fácil adaptación a patrones comunes como la ordenación secundaria y los joins. Además, “aumenta” la API de Hadoop proveyendo de facilidades tales como la configuración vía instancia y los múltiple Inputs/Outputs “nativos” – especialmente estas dos últimas cosas las hemos aprovechado para hacer una integración conveniente y molona de Pangool con SOLR.

Pero, ¿Qué problemas tiene la integración Hadoop + SOLR actual?

  • No es nada intuitiva de usar. No hay apenas documentación sobre cómo usarla. Hay que hacer varias cosas: añadir varias líneas que llaman a métodos estáticos, implementar un DocumentConverter por narices… Vamos, un tostón.
  • Es monolítica y limitada. No puedes, por ejemplo, producir más de un índice SOLR como salida de tu Map/Reduce.

Ambos problemas son fruto de defectos inherentes en la API de Hadoop. Pangool soluciona muchos de éstos problemas, por ello hemos querido aprovechar para ofrecer una integración suave y bonita de Pangool con SOLR. Para ilustrarla, vamos a programar un indexador de todas las obras de Shakespeare que producirá un índice para cada tipo de obra (COMEDIES, HISTORIES, POETRY, TRAGEDIES).

Multi Shakespeare Indexer

La idea es la siguiente: nos bajaremos las obras de Shakespeare y las descomprimiremos (teneis el link aquí). Vamos a producir un índice SOLR para cada categoría con los siguientes campos: número de línea, texto (de la línea) y título de la obra. De este modo podríamos buscar ciertas palabras y ver en qué línea y obra salen.

Para ello programaremos una tarea con Pangool que leerá las obras como input y en la fase Map emitirá una Tupla con los siguientes datos:

  • número de línea
  • texto (línea)
  • nombre de la obra
  • categoría (COMEDIES, HISTORIES, POETRY, TRAGEDIES)

Estas Tuplas se recibirán en el TupleReducer, donde a su vez las emitiremos a un “named output” en función de su categoría. Cada “named output” estará configurado para usar TupleSolrOutputFormat, que es el OutputFormat de Pangool que implementa la integración con SOLR. De este modo, al terminar la tarea, tendremos cuatro índices, cada uno de ellos en una carpeta diferente: COMEDIES, HISTORIES, POETRY, TRAGEDIES.

Además, con esta idea tendremos cada índice dividido en “n” fragmentos donde “n” es el número de Reducers de la tarea. Así es como típicamente podemos conseguir una indexación distribuída que luego podremos servir en un SOLR distribuído usando “sharding”.

Implementación

Podeis ver el código de MultiShakespeareIndexer aquí. A continuación vamos a comentar un poco el proceso que hemos seguido para implementarlo.

Lo primero de todo que necesitamos es definir una carpeta con la configuración SOLR. En el repositorio de Pangool hemos añadido la carpeta “examples/src/test/resources/shakespeare-solr” con un conf/schema.xml y un conf/solrconfig.xml. Estos XMLs serán leídos por un SOLR embedido que hará el proceso de indexación. Lo más relevante es la siguiente parte del schema.xml, que define qué campos se indexarán y de qué tipo son:

<fields>
  <field name="line" type="long" indexed="true" stored="true" sortMissingLast="true"/>
  <field name="text" type="string" indexed="true" stored="true" sortMissingLast="true"/>
  <field name="title" type="string" indexed="true" stored="true" sortMissingLast="true"/>
</fields>

<defaultSearchField>text</defaultSearchField> 

A continuación nos ponemos a programar la tarea en sí. Definiremos las categorías de Shakespeare como un enum:

public static enum Category {
	COMEDIES, HISTORIES, POETRY, TRAGEDIES
}

A continuación definiremos los Schema de Pangool que vamos a usar a lo largo de la tarea. Vamos a usar dos Schema ya que el intermedio contendrá la categoría y el final no (no necesitamos indexar para cada línea su categoría ya que estamos creando índices separados).

final static Schema SCHEMA = new Schema("shakespeare", Fields.parse("line:long, text:string, title:string, category:" + Category.class.getName()));
final static Schema OUT_SCHEMA = new Schema("shakespeare", Fields.parse("line:long, text:string, title:string"));

Para poder añadir la categoría a las tuplas intermedias necesitamos tener un TupleMapper con estado asociado con cada obra. Para ello haremos un CategoryMapper al que le pasaremos una Category y un título de obra, y asociaremos cada una de las obras de Shakespeare con una instancia diferente de CategoryMapper:

public static class CategoryMapper extends TupleMapper<LongWritable, Text> {

	Category category;
	String title;
	ITuple tuple = new Tuple(SCHEMA);

	public CategoryMapper(Category category, String title) {
		this.category = category;
		this.title = title;
		tuple.set("title", title);
		tuple.set("category", category);
	}

	@Override
	public void map(LongWritable key, Text value, TupleMRContext context, Collector collector) throws IOException, InterruptedException {
		tuple.set("line", key.get());
		tuple.set("text", value.toString());
		collector.write(tuple);
	}
}

Para comenzar a configurar la tarea creamos un TupleMRBuilder, seteamos el Schema intermedio, definimos un group by (agrupamos por “line” para conseguir una distribución más o menos homogénea de los datos y que cada Reducer produzca un sub-índice de tamaño similar) e iteramos sobre las Category para añadir inputs y outputs:

TupleMRBuilder job = new TupleMRBuilder(conf);
job.addIntermediateSchema(SCHEMA);
job.setGroupByFields("line");
...		
for(Category category : Category.values()) { // For each Category
	String categoryString = category.toString().toLowerCase();
	// Add the category, book title input spec with the associated CategoryMapper
	for(FileStatus fileStatus: fileSystem.listStatus(new Path(input + "/" + categoryString))) {
		job.addInput(fileStatus.getPath(), new HadoopInputFormat(TextInputFormat.class), new CategoryMapper(category, fileStatus.getPath().getName()));
	}
	// Add a named output for each category
	job.addNamedOutput(categoryString, new TupleSolrOutputFormat(new File("src/test/resources/shakespeare-solr"), conf), ITuple.class, NullWritable.class);
}

Especialmente relevante es ésta parte:

new TupleSolrOutputFormat(new File("src/test/resources/shakespeare-solr"), conf)

Eso es todo lo que tenemos que hacer para definir un output que producirá un índice SOLR. Por constructor le pasamos el File correspondiente al directorio que tiene conf/schema.xml, conf/solrconfig.xml, etc y el objecto Configuration.

Finalmente, hemos de programar un Reducer que reciba las Tuplas intermedias y escriba a un “named output” u otro en función de la categoría:

// The reducer will just emit the tuple to the corresponding Category output
job.setTupleReducer(new TupleReducer<ITuple, NullWritable>() {

	ITuple outTuple = new Tuple(OUT_SCHEMA);
			
	public void reduce(ITuple group, Iterable<ITuple> tuples, TupleMRContext context, Collector collector) throws IOException, InterruptedException, TupleMRException {

		Category category = (Category) group.get("category"); 
		for(ITuple tuple: tuples) {
			outTuple.set("line",  tuple.get("line"));
			outTuple.set("text",  tuple.get("text"));
			outTuple.set("title", tuple.get("title"));
			collector.getNamedOutput(category.toString().toLowerCase()).write(outTuple, NullWritable.get());
		}
	}
});

En este caso no nos importa el output principal del Job, así que podemos setearlo a NullOutputFormat:

job.setOutput(new Path(output), new HadoopOutputFormat(NullOutputFormat.class), ITuple.class, NullWritable.class);

Finalmente, creamos el Job Hadoop y lo ejecutamos, y ¡listo!

Job hadoopJob = job.createJob();
hadoopJob.waitForCompletion(true);

Algunas cosas a comentar:

  • Las Tuplas que emitimos tienen que ser coherentes con el schema.xml definido. Pangool las convierte automáticamente a instancias de SolrInputDocument usando la clase DefaultTupleDocumentConverter.
  • Si, por algún motivo, necesitáramos tener control sobre el proceso de conversión entre ITuple y SolrInputDocument (por ejemplo, para añadir “boosting” a uno de los campos) entonces deberemos pasar una instancia de nuestra implementación de TupleDocumentConverter al TupleSolrOutputFormat.

Este ejemplo está disponible a partir de la versión __ de Pangool. Para ejecutarlo (deberíamos tener las obras de Shakespeare en el HDFS en la carpeta “shakespeare/”):

hadoop jar pangool-*-examples-job.jar multi_shakespeare_indexer shakespeare/ out-shakespeare

Conclusión

Hemos presentado un ejemplo sencillo de uso Pangool + SOLR donde hemos indexado las obras de Shakespeare en cuatro índices diferentes, en función de su categoría. La integración de Pangool con SOLR es muy sencilla de usar, necesitando tan sólo instanciar un TupleSolrOutputFormat para usarla. Además, podemos usar éste OutputFormat en cualquier “named output” para producir más de un índice por tarea, cosa que no es posible en Hadoop.



Comentar