I was trying to improve index writing speed and thought of creating an asynchronous Lucene index writer. This writer provides an addDocument() method which can be called asynchronously by multiple threads.
Here are a few scenarios where you can utilize this implementation:
– Reading the data is slower than writing to the index. (A typical scenario is reading over the network or from a database.)
– Reading can be divided into multiple logical parts which can be processed in separate threads.
– You are looking for asynchronous behavior to decouple reading and writing processes.
This implementation is a wrapper around the core methods of the IndexWriter class; it changes nothing about IndexWriter except making the calls asynchronous. It uses Java’s java.util.concurrent.BlockingQueue to hold the documents, and any implementation of that interface can be supplied through the constructor.
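To make the idea concrete before the full class, here is a minimal sketch of the same queue-backed pattern with Lucene taken out, so it runs with the JDK alone. The class name QueueWrapperSketch is made up for illustration, strings stand in for documents, and a plain list stands in for the index; the 10 ms sleep is an arbitrary value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Illustration only: strings stand in for Lucene documents, a list stands in for the index. */
class QueueWrapperSketch {
    private final BlockingQueue<String> queue;
    // Touched only by the consumer thread until close() has joined it.
    private final List<String> written = new ArrayList<>();
    private volatile boolean keepRunning = true;
    private final Thread consumer;

    QueueWrapperSketch(BlockingQueue<String> queue) {
        this.queue = queue;
        this.consumer = new Thread(this::drain, "sketch-writer");
        this.consumer.start();
    }

    /** Blocks while the queue is full, so a slow consumer throttles the producers. */
    void add(String item) throws InterruptedException {
        queue.put(item);
    }

    private void drain() {
        // Keep going until we were asked to stop AND the queue has been emptied.
        while (keepRunning || !queue.isEmpty()) {
            String item = queue.poll();
            if (item != null) {
                written.add(item); // the real class calls IndexWriter.addDocument here
            } else {
                try {
                    Thread.sleep(10); // nothing queued, wait a little
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Signals the consumer to finish, waits for it, and returns what it wrote. */
    List<String> close() throws InterruptedException {
        keepRunning = false;
        consumer.join();
        return written;
    }
}
```

Any BlockingQueue works here, for example new ArrayBlockingQueue<String>(2); with a single producer the items come out in the order they were added.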
Below is the Java source of the implementation class.
This class provides multiple constructors to have better control.
A few terms used here are as follows:
Sleep Milliseconds On Empty: This is how long the writer sleeps when it finds nothing in the queue and has to wait for more data to arrive.
Queue Size: This is the size of the queue which can be configured as a constructor parameter input.
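These two terms map directly onto BlockingQueue behaviour, which the small sketch below tries to show using only the JDK. QueueTermsDemo and its method names are made up for illustration. Note that the timed poll() used here is a variation on the theme: the writer class itself uses a plain poll() followed by Thread.sleep().

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustration only: shows what "queue size" and "wait on empty" mean for a BlockingQueue. */
class QueueTermsDemo {

    /** Counts how many of `attempts` non-blocking offers fit into a queue of `queueSize`. */
    static int acceptedOffers(int queueSize, int attempts) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueSize);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false once the queue is full; put() would block instead.
            if (queue.offer("doc" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    /** Waits on an empty queue for up to waitMillis; true means nothing arrived in time. */
    static boolean emptyAfterWait(long waitMillis) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        // The timed poll() returns early if an element shows up, otherwise null at the timeout.
        return queue.poll(waitMillis, TimeUnit.MILLISECONDS) == null;
    }
}
```

With a queue size of 3, only 3 of 5 offers are accepted; in the writer, addDocument() uses put(), so the extra callers simply wait until the writer thread frees a slot.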
AsynchronousIndexWriter.java
package swiki.lucene.asynchronous;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;

/**
 * @author swiki swiki
 */
public class AsynchronousIndexWriter implements Runnable {

    /*
     * A blocking queue of documents to facilitate asynchronous writing.
     */
    private BlockingQueue<Document> documents;

    /*
     * Instance of the core index writer which does the actual writing task.
     */
    private IndexWriter writer;

    /*
     * Thread which makes writing asynchronous.
     */
    private Thread writerThread;

    /*
     * We need to set this to false once document addition is completed. This
     * will not immediately stop the writing as there could be some documents in
     * the queue. It completes once all documents are written and the queue is
     * empty. Declared volatile because it is written and read by different threads.
     */
    private volatile boolean keepRunning = true;

    /*
     * This flag is set to false once the writer is done writing the queued
     * data. Volatile for the same reason as keepRunning.
     */
    private volatile boolean isRunning = true;

    /*
     * Duration in milliseconds for which the writer should sleep when it finds
     * the queue empty and the job is still not completed.
     */
    private long sleepMilisecondOnEmpty = 100;

    /**
     * This method should be used to add documents to the index queue. If the
     * queue is full it will wait for space to become available.
     *
     * @param doc
     * @throws InterruptedException
     */
    public void addDocument(Document doc) throws InterruptedException {
        documents.put(doc);
    }

    public void startWriting() {
        writerThread = new Thread(this, "AsynchronousIndexWriter");
        writerThread.start();
    }

    /**
     * Constructor with IndexWriter as input. It uses an ArrayBlockingQueue with
     * size 100, and sleepMilisecondOnEmpty is 100ms.
     *
     * @param w
     */
    public AsynchronousIndexWriter(IndexWriter w) {
        this(w, 100, 100);
    }

    /**
     * Constructor with IndexWriter and queue size as input. It uses an
     * ArrayBlockingQueue with size queueSize, and sleepMilisecondOnEmpty is
     * 100ms.
     *
     * @param w
     * @param queueSize
     */
    public AsynchronousIndexWriter(IndexWriter w, int queueSize) {
        this(w, queueSize, 100);
    }

    /**
     * Constructor with IndexWriter, queueSize and sleep duration as input. It
     * uses an ArrayBlockingQueue with size queueSize.
     *
     * @param w
     * @param queueSize
     * @param sleepMilisecondOnEmpty
     */
    public AsynchronousIndexWriter(IndexWriter w, int queueSize,
            long sleepMilisecondOnEmpty) {
        this(w, new ArrayBlockingQueue<Document>(queueSize), sleepMilisecondOnEmpty);
    }

    /**
     * Any implementation of BlockingQueue can be used.
     *
     * @param w
     * @param queue
     * @param sleepMilisecondOnEmpty
     */
    public AsynchronousIndexWriter(IndexWriter w, BlockingQueue<Document> queue,
            long sleepMilisecondOnEmpty) {
        writer = w;
        documents = queue;
        this.sleepMilisecondOnEmpty = sleepMilisecondOnEmpty;
        startWriting();
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    public void run() {
        try {
            while (keepRunning || !documents.isEmpty()) {
                Document d = documents.poll();
                try {
                    if (d != null) {
                        writer.addDocument(d);
                    } else {
                        /*
                         * Nothing in queue so lets wait
                         */
                        Thread.sleep(sleepMilisecondOnEmpty);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (CorruptIndexException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                } catch (IOException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        } finally {
            // Always clear the flag, even on failure, so stopWriting() cannot wait forever.
            isRunning = false;
        }
    }

    /**
     * Stop the thread gracefully, wait until it is done writing.
     */
    private void stopWriting() {
        this.keepRunning = false;
        try {
            while (isRunning) {
                // using the same sleep duration as the writer uses
                Thread.sleep(sleepMilisecondOnEmpty);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    public void optimize() throws CorruptIndexException, IOException {
        writer.optimize();
    }

    public void close() throws CorruptIndexException, IOException {
        stopWriting();
        writer.close();
    }
}
Below is a sample class which demonstrates how to use this class. One thing to note: the asynchronous thread is started as soon as you instantiate the writer using new AsynchronousIndexWriter(…)
TestAsyncWriter.java
package swiki.lucene.asynchronous;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

/**
 * @author swiki swiki
 */
public class TestAsyncWriter {

    public static void main(String[] args) {
        try {
            Directory fsdir = FSDirectory.getDirectory("index");
            IndexWriter w = new IndexWriter(fsdir, new StandardAnalyzer(), true);
            AsynchronousIndexWriter writer = new AsynchronousIndexWriter(w);
            /*
             * This call can be replaced by the logic of reading
             * data using multiple threads
             */
            addDocumentsInMultipleThreads(writer);
            writer.optimize();
            writer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void addDocumentsInMultipleThreads(
            AsynchronousIndexWriter writer) throws InterruptedException {
        // add here the code for adding documents from multiple threads.
        Document doc = new Document();
        doc.add(new Field("content", "My Content", Field.Store.YES,
                Field.Index.UN_TOKENIZED));
        // Queue the populated document (not a new, empty one).
        writer.addDocument(doc);
    }
}
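The sample above adds only one document from one thread. As a rough sketch of what the multi-threaded part could look like, the snippet below splits the work into a few reader tasks on a thread pool. To keep it runnable without Lucene, DocSink is a made-up interface standing in for AsynchronousIndexWriter.addDocument(), and the class name, the string payloads and the one minute timeout are all assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustration only: several reader tasks feeding one shared, thread-safe sink. */
class MultiThreadedAddSketch {

    /** Hypothetical stand-in for AsynchronousIndexWriter.addDocument(Document). */
    interface DocSink {
        void add(String doc) throws InterruptedException;
    }

    /** Runs `parts` reader tasks, each adding `docsPerPart` items, and waits for all of them. */
    static void addFromThreads(DocSink sink, int parts, int docsPerPart)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(parts);
        for (int p = 0; p < parts; p++) {
            final int part = p;
            pool.submit(() -> {
                for (int i = 0; i < docsPerPart; i++) {
                    sink.add("part" + part + "-doc" + i);
                }
                return null; // Callable form, so the checked exception from add() is allowed
            });
        }
        pool.shutdown();
        // Return only after every reader is done, so the caller can safely close the writer.
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("reader tasks did not finish in time");
        }
    }
}
```

In TestAsyncWriter the sink would be a lambda that builds a Document from the payload and passes it to writer.addDocument(). Waiting for the pool to finish before calling close() matters, because close() stops the writer once the queue is empty.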
If you find this useful or have some suggestions for improvements please leave a comment and I will try to respond.