Need help with Hibernate? Read this first:
http://www.hibernate.org/ForumMailingli ... AskForHelp
Hibernate version: Core 3.2.6
Hibernate Search 3.0.1, Lucene 2.3.0
Hi all,
I have written a PooledSharedReaderProvider that opens IndexReaders up front (currently 4) and pools them. Under concurrent requests, each thread receives the index reader that is currently shared by the fewest threads.
The basic idea is to create a pool of index readers at initialization time and distribute them among concurrent threads.
The problem I am facing is that when 100 threads search the Lucene indexes simultaneously through Hibernate Search, the average response time is around 10–15 seconds. A single index reader reaches its scalability limit with 100 concurrent threads, so I want to try maintaining a pool of index readers.
Can anyone tell me whether this approach will suffice, or whether there is something I have missed?
Code:
import static org.hibernate.search.reader.ReaderProviderHelper.buildMultiReader;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.hibernate.annotations.common.AssertionFailure;
import org.hibernate.search.SearchException;
import org.hibernate.search.engine.SearchFactoryImplementor;
import org.hibernate.search.reader.ReaderProvider;
import org.hibernate.search.store.DirectoryProvider;
/**
 * A {@link ReaderProvider} that keeps a fixed-size pool of {@link IndexReader}s per
 * {@link DirectoryProvider} and hands each request the up-to-date reader that is
 * currently used by the fewest threads.
 * <p>
 * All readers are opened eagerly in {@link #initialize}. A pooled reader that is no
 * longer current (the index changed after it was opened) is closed and replaced by a
 * freshly opened reader the next time a reader is requested for its directory while no
 * thread is using it.
 * <p>
 * The pool size defaults to {@value #DEFAULT_POOL_SIZE} readers per directory and can
 * be overridden with the {@value #POOL_SIZE_PROPERTY} configuration property.
 *
 * @author rakesh_shete
 */
public class PooledSharedReaderProvider implements ReaderProvider {

    /** Configuration property holding the number of readers pooled per directory. */
    public static final String POOL_SIZE_PROPERTY = "hibernate.search.reader.pool_size";

    /** Pool size used when {@link #POOL_SIZE_PROPERTY} is not configured. */
    public static final int DEFAULT_POOL_SIZE = 4;

    private static Field subReadersField;

    private static final Log logger = LogFactory.getLog(PooledSharedReaderProvider.class);

    /**
     * One non-fair lock per directory provider. It guards that provider's pool: its list
     * in {@link #searchIndexReaders} and every thread count and reader inside it. The map
     * is populated in {@link #initialize} and is read-only afterwards.
     */
    private Map<DirectoryProvider, Lock> perDirectoryProviderManipulationLocks;

    /**
     * Reader pool per directory provider. The map is filled in {@link #initialize}. Each
     * list's contents are only touched while holding the provider's lock.
     */
    private final Map<DirectoryProvider, List<ReaderData>> searchIndexReaders =
            new HashMap<DirectoryProvider, List<ReaderData>>();

    /**
     * Reverse lookup from a pooled reader to its directory provider, used by
     * {@link #closeReader}. It is updated under different per-provider locks and read
     * without any lock, so it must be a concurrent map.
     */
    private final Map<IndexReader, DirectoryProvider> readerDirProvider =
            new ConcurrentHashMap<IndexReader, DirectoryProvider>();

    /** Number of readers opened per directory provider; set in {@link #initialize}. */
    private int poolSize = DEFAULT_POOL_SIZE;

    /**
     * Acquires one pooled reader per directory provider and wraps them in a multi-reader.
     * The thread count of every acquired reader is incremented. The returned reader must
     * be handed back to {@link #closeReader}.
     *
     * @param directoryProviders the directories to search
     * @return a multi-reader over one pooled reader per directory
     * @throws SearchException if a reader cannot be checked, reopened or combined
     */
    public IndexReader openReader(DirectoryProvider[] directoryProviders) {
        boolean trace = logger.isTraceEnabled();
        int length = directoryProviders.length;
        IndexReader[] readers = new IndexReader[length];
        if (trace) {
            logger.trace("Opening IndexReader for directoryProviders: " + length);
        }
        int acquired = 0;
        try {
            for (; acquired < length; acquired++) {
                DirectoryProvider directoryProvider = directoryProviders[acquired];
                if (trace) {
                    logger.trace("Opening IndexReader from "
                            + directoryProvider.getDirectory().toString());
                }
                Lock directoryProviderLock = lockFor(directoryProvider);
                directoryProviderLock.lock();
                try {
                    readers[acquired] = getIndexReader(directoryProvider);
                }
                finally {
                    directoryProviderLock.unlock();
                }
            }
            return buildMultiReader(length, readers);
        }
        catch (RuntimeException e) {
            // Give back the readers already taken so their thread counts do not leak.
            for (int i = 0; i < acquired; i++) {
                releaseReader(readers[i]);
            }
            throw e;
        }
    }

    /**
     * Returns every sub-reader of a multi-reader produced by {@link #openReader} to its
     * pool by decrementing its thread count. Pooled readers are never closed here; they
     * stay open for reuse.
     *
     * @param reader the multi-reader to release; {@code null} is ignored
     * @throws AssertionFailure if {@code reader} is not a {@link MultiReader}
     * @throws SearchException if the Lucene version hides {@code MultiReader.subReaders}
     */
    public void closeReader(IndexReader reader) {
        if (reader == null) {
            return;
        }
        // TODO should it be CacheableMultiReader? Probably no
        if (!(reader instanceof MultiReader)) {
            throw new AssertionFailure("Everything should be wrapped in a MultiReader");
        }
        IndexReader[] readers;
        try {
            readers = (IndexReader[]) subReadersField.get(reader);
        }
        catch (IllegalAccessException e) {
            throw new SearchException(
                    "Incompatible version of Lucene: MultiReader.subReaders not accessible", e);
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Closing MultiReader: " + reader);
        }
        for (IndexReader subReader : readers) {
            releaseReader(subReader);
        }
    }

    /**
     * Resolves the {@code MultiReader.subReaders} field and opens the reader pool for
     * every lockable directory provider.
     *
     * @param props reader configuration; {@link #POOL_SIZE_PROPERTY} is read from it
     * @param searchFactoryImplementor source of the directory providers to pool
     * @throws SearchException if the Lucene version is incompatible, the pool size is
     *         invalid, or a reader cannot be opened
     */
    public void initialize(Properties props, SearchFactoryImplementor searchFactoryImplementor) {
        if (subReadersField == null) {
            try {
                subReadersField = MultiReader.class.getDeclaredField("subReaders");
                if (!subReadersField.isAccessible()) {
                    subReadersField.setAccessible(true);
                }
            }
            catch (NoSuchFieldException e) {
                throw new SearchException(
                        "Incompatible version of Lucene: MultiReader.subReaders not accessible", e);
            }
        }
        poolSize = readPoolSize(props);
        Set<DirectoryProvider> providers =
                searchFactoryImplementor.getLockableDirectoryProviders().keySet();
        Map<DirectoryProvider, Lock> locks = new HashMap<DirectoryProvider, Lock>(providers.size());
        for (DirectoryProvider dp : providers) {
            locks.put(dp, new ReentrantLock());
            searchIndexReaders.put(dp, getReaderData(dp));
        }
        perDirectoryProviderManipulationLocks = Collections.unmodifiableMap(locks);
    }

    /** Parses {@link #POOL_SIZE_PROPERTY}, falling back to {@link #DEFAULT_POOL_SIZE} when unset. */
    private static int readPoolSize(Properties props) {
        String value = props == null ? null : props.getProperty(POOL_SIZE_PROPERTY);
        if (value == null || value.trim().length() == 0) {
            return DEFAULT_POOL_SIZE;
        }
        try {
            int size = Integer.parseInt(value.trim());
            if (size > 0) {
                return size;
            }
        }
        catch (NumberFormatException e) {
            throw new SearchException("Invalid value for " + POOL_SIZE_PROPERTY + ": " + value, e);
        }
        throw new SearchException(POOL_SIZE_PROPERTY + " must be a positive integer: " + value);
    }

    /** Returns the lock guarding {@code provider}'s pool, failing fast for unknown providers. */
    private Lock lockFor(DirectoryProvider provider) {
        Lock lock = perDirectoryProviderManipulationLocks.get(provider);
        if (lock == null) {
            throw new AssertionFailure("Unknown DirectoryProvider: " + provider);
        }
        return lock;
    }

    /** Returns one pooled reader to its pool; readers not owned by this provider are ignored. */
    private void releaseReader(IndexReader subReader) {
        if (subReader == null) {
            return;
        }
        DirectoryProvider dirProvider = readerDirProvider.get(subReader);
        if (dirProvider == null) {
            return;
        }
        // Safe to look up outside the lock: a reader is only swapped out while its
        // thread count is zero, and the caller still holds one count on it.
        Lock directoryProviderLock = lockFor(dirProvider);
        directoryProviderLock.lock();
        try {
            decrementIndexReaderCount(subReader, dirProvider);
        }
        finally {
            directoryProviderLock.unlock();
        }
    }

    /** One pooled reader together with the number of threads currently using it. */
    private static final class ReaderData {
        /** The number of threads currently holding {@link #indexReader}. */
        int threadCount;
        /** The directory provider {@link #indexReader} reads from. */
        final DirectoryProvider provider;
        /** The pooled reader; swapped for a fresh one when it goes stale while idle. */
        IndexReader indexReader;

        ReaderData(DirectoryProvider provider, IndexReader indexReader) {
            this.provider = provider;
            this.indexReader = indexReader;
        }
    }

    /**
     * Opens {@link #poolSize} readers on {@code provider}'s directory, each with a thread
     * count of zero, and registers them in {@link #readerDirProvider}.
     *
     * @param provider the directory provider
     * @return the new, fully populated pool
     * @throws SearchException if a reader cannot be opened; readers already opened for
     *         this provider are closed first
     */
    private List<ReaderData> getReaderData(DirectoryProvider provider) {
        List<ReaderData> readerDataList = new ArrayList<ReaderData>(poolSize);
        try {
            for (int i = 0; i < poolSize; i++) {
                IndexReader reader = IndexReader.open(provider.getDirectory());
                readerDataList.add(new ReaderData(provider, reader));
                readerDirProvider.put(reader, provider);
            }
        }
        catch (IOException e) { // also covers CorruptIndexException
            // Do not leave a half-built pool open behind us.
            for (ReaderData readerData : readerDataList) {
                readerDirProvider.remove(readerData.indexReader);
                closeQuietly(readerData.indexReader);
            }
            throw new SearchException(
                    "Problems encountered while getting a reader instance for "
                            + provider.getDirectory(), e);
        }
        return readerDataList;
    }

    /**
     * Picks the reader from {@code provider}'s pool to hand out, increments its thread
     * count and returns it. Must be called while holding the provider's lock.
     * <p>
     * Selection rules:
     * <ul>
     * <li>Readers are scanned in pool order. A reader that is no longer current and is
     * not in use is closed and replaced by a freshly opened one.</li>
     * <li>The first current, idle reader is returned immediately.</li>
     * <li>Otherwise the current reader with the fewest threads is chosen.</li>
     * <li>Only if every reader is stale and busy is the least-loaded stale reader
     * returned.</li>
     * </ul>
     *
     * @param provider the directory provider
     * @return the chosen reader, never {@code null}
     * @throws SearchException if checking or reopening a reader fails
     */
    private IndexReader getIndexReader(DirectoryProvider provider) {
        List<ReaderData> readerDataList = searchIndexReaders.get(provider);
        ReaderData leastLoadedCurrent = null;
        ReaderData leastLoadedStale = null;
        try {
            for (ReaderData candidate : readerDataList) {
                // isCurrent() performs I/O, so call it once per reader.
                boolean current = candidate.indexReader.isCurrent();
                if (!current && candidate.threadCount == 0) {
                    replaceStaleReader(candidate);
                    current = true;
                }
                if (current) {
                    if (candidate.threadCount == 0) {
                        leastLoadedCurrent = candidate;
                        break;
                    }
                    if (leastLoadedCurrent == null
                            || candidate.threadCount < leastLoadedCurrent.threadCount) {
                        leastLoadedCurrent = candidate;
                    }
                }
                else if (leastLoadedStale == null
                        || candidate.threadCount < leastLoadedStale.threadCount) {
                    leastLoadedStale = candidate;
                }
            }
        }
        catch (IOException e) {
            throw new SearchException(
                    "Exception while getting the index reader instance for "
                            + provider.getDirectory(), e);
        }
        ReaderData chosen = leastLoadedCurrent != null ? leastLoadedCurrent : leastLoadedStale;
        chosen.threadCount++;
        if (logger.isDebugEnabled()) {
            logger.debug("Returning reader instance : " + chosen.indexReader
                    + " with threadCount value : " + chosen.threadCount);
        }
        return chosen.indexReader;
    }

    /**
     * Swaps an idle, stale reader for a freshly opened one and closes the old reader.
     * Must be called while holding the provider's lock.
     */
    private void replaceStaleReader(ReaderData readerData) throws IOException {
        IndexReader stale = readerData.indexReader;
        IndexReader fresh = IndexReader.open(readerData.provider.getDirectory());
        readerDirProvider.put(fresh, readerData.provider);
        readerData.indexReader = fresh;
        readerDirProvider.remove(stale);
        // No thread holds the stale reader (its count is zero), so it is safe to close.
        closeQuietly(stale);
        if (logger.isDebugEnabled()) {
            logger.debug("Added a new reader instance : " + fresh
                    + " replacing stale reader : " + stale);
        }
    }

    /** Closes a reader that is no longer needed, logging rather than propagating failures. */
    private static void closeQuietly(IndexReader reader) {
        try {
            reader.close();
        }
        catch (IOException e) {
            logger.warn("Unable to close IndexReader " + reader, e);
        }
    }

    /**
     * Decrements the thread count of {@code reader} in {@code dirProvider}'s pool. Must be
     * called while holding the provider's lock.
     *
     * @param reader the reader being released
     * @param dirProvider the directory provider that owns it
     */
    private void decrementIndexReaderCount(IndexReader reader, DirectoryProvider dirProvider) {
        List<ReaderData> readerDataList = searchIndexReaders.get(dirProvider);
        for (ReaderData readerData : readerDataList) {
            if (readerData.indexReader == reader) {
                if (readerData.threadCount > 0) {
                    readerData.threadCount--;
                }
                else {
                    // Guards against a double release driving the count negative.
                    logger.warn("Reader released more often than acquired: " + reader);
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Decremented threadCount count for reader instance : "
                            + reader + " New threadCount count is : " + readerData.threadCount);
                }
                break;
            }
        }
    }
}