-->
These old forums are deprecated now and set to read-only. We are waiting for you on our new forums!
More modern, Discourse-based and with GitHub/Google/Twitter authentication built-in.

All times are UTC - 5 hours [ DST ]



Forum locked This topic is locked, you cannot edit posts or make further replies.  [ 1 post ] 
Author Message
 Post subject: SharedReaderProviderStrategy:Pooledsharedreaderprovider impl
PostPosted: Wed Aug 20, 2008 6:21 am 
Newbie

Joined: Wed May 25, 2005 10:53 pm
Posts: 13
Need help with Hibernate? Read this first:
http://www.hibernate.org/ForumMailingli ... AskForHelp

Hibernate version:Core-3.2.6
Hibernate Search-3.0.1 Lucene:2.3.0


Hi all,

I have written a pooledsharedreaderprovider which creates IndexReaders upfront (4 currently) and pools them such that when there are simultaneous requests, each thread gets an instance of the index reader which is shared by minimal threads in a multi-threaded environment.

The basic idea is to have a pool of index readers upfront at initialization time and distribute them among simultaneous threads.

The problem I am facing is that when 100 simultaneous threads are trying to search over Lucene indexes using Hibernate Search, the average response time is around 10-15 seconds. Since a single index reader will reach its scalability limit with 100 simultaneous threads, I want to try maintaining a pool of index readers.


Can anyone let me know if this will suffice, or are there some things that I have missed?

Code:

import static org.hibernate.search.reader.ReaderProviderHelper.buildMultiReader;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
import org.hibernate.annotations.common.AssertionFailure;
import org.hibernate.search.SearchException;
import org.hibernate.search.engine.SearchFactoryImplementor;
import org.hibernate.search.reader.ReaderProvider;
import org.hibernate.search.store.DirectoryProvider;

/**
* TODO Documentation
*
* @author rakesh_shete
*
*/
public class PooledSharedReaderProvider implements ReaderProvider {

   private static Field subReadersField;

   private static Log logger = LogFactory
         .getLog(PooledSharedReaderProvider.class);

   /**
    * non fair list of locks to block per IndexReader only Locks have to be
    * acquired at least for indexReader retrieval and switch ie for all
    * activeSearchIndexReaders manipulation this map is read only after
    * initialization, no need to synchronize
    */
   private Map<DirectoryProvider, Lock> perDirectoryProviderManipulationLocks;

   private Map<DirectoryProvider, List<ReaderData>> searchIndexReaders = new HashMap<DirectoryProvider, List<ReaderData>>();

   private Map<IndexReader, DirectoryProvider> readerDirProvider = new HashMap<IndexReader, DirectoryProvider>();

   public IndexReader openReader(DirectoryProvider[] directoryProviders) {
      boolean trace = logger.isTraceEnabled();
      int length = directoryProviders.length;
      IndexReader[] readers = new IndexReader[length];
      if (trace)
         logger.debug("Opening IndexReader for directoryProviders: "
               + length);

      for (int index = 0; index < length; index++) {
         DirectoryProvider directoryProvider = directoryProviders[index];
         IndexReader reader;
         Lock directoryProviderLock = perDirectoryProviderManipulationLocks
               .get(directoryProvider);
         if (trace)
            logger.debug("Opening IndexReader from "
                  + directoryProvider.getDirectory().toString());
         directoryProviderLock.lock(); // needed for same problem as the
         // double-checked locking
         try {
            // Get an appropriate index reader
            reader = getIndexReader(directoryProvider);
         }
         finally {
            directoryProviderLock.unlock();
         }
         if (reader == null) {
            if (trace)
               logger.debug("No IndexReader "
                     + directoryProvider.getDirectory().toString());
         }
         readers[index] = reader;
      }

      // Build a multi-reader out of the given readers
      return buildMultiReader(length, readers);
   }

   public void closeReader(IndexReader reader) {
      boolean trace = logger.isTraceEnabled();
      if (reader == null)
         return;
      IndexReader[] readers;
      // TODO should it be CacheableMultiReader? Probably no
      if (reader instanceof MultiReader) {
         try {
            // Get the index readers from the multi-reader
            readers = (IndexReader[]) subReadersField.get(reader);
         }
         catch (IllegalAccessException e) {
            throw new SearchException(
                  "Incompatible version of Lucene: MultiReader.subReaders not accessible",
                  e);
         }
         if (trace)
            logger.debug("Closing MultiReader: " + reader);
      }
      else {
         throw new AssertionFailure(
               "Everything should be wrapped in a MultiReader");
      }

      for (IndexReader subReader : readers) {
         // Get the directory-provider for the given index reader
         DirectoryProvider dirProvider = readerDirProvider.get(subReader);

         if (dirProvider != null) {
            Lock directoryProviderLock = perDirectoryProviderManipulationLocks
                  .get(dirProvider);
            try {
               directoryProviderLock.lock();

               // Now decrement the thread-count for the given reader
               decrementIndexReaderCount(subReader, dirProvider);
            }
            finally {
               directoryProviderLock.unlock();
            }
         }

      }
   }

   public void initialize(Properties props,
         SearchFactoryImplementor searchFactoryImplementor) {
      if (subReadersField == null) {
         try {
            subReadersField = MultiReader.class
                  .getDeclaredField("subReaders");
            if (!subReadersField.isAccessible())
               subReadersField.setAccessible(true);
         }
         catch (NoSuchFieldException e) {
            throw new SearchException(
                  "Incompatible version of Lucene: MultiReader.subReaders not accessible",
                  e);
         }
      }
      Set<DirectoryProvider> providers = searchFactoryImplementor
            .getLockableDirectoryProviders().keySet();
      perDirectoryProviderManipulationLocks = new HashMap<DirectoryProvider, Lock>(
            providers.size());

      for (DirectoryProvider dp : providers) {
         perDirectoryProviderManipulationLocks.put(dp, new ReentrantLock());

         // Create a list of reader data objects. Each reader data has an
         // index reader, thread-count indicating the no. of threads using
         // the index reader and directory provider of the index reader
         List<ReaderData> readerDataList = getReaderData(dp);

         // Add the list to the map of directoryProvider-->readerDataList.
         // This will be used to get an appropriate index reader instance for
         // a given directoryProvider
         searchIndexReaders.put(dp, readerDataList);
      }
      perDirectoryProviderManipulationLocks = Collections
            .unmodifiableMap(perDirectoryProviderManipulationLocks);
   }

   private class ReaderData {

      public ReaderData(int threadCount, DirectoryProvider provider,
            IndexReader indexReader) {
         this.threadCount = threadCount;
         this.provider = provider;
         this.indexReader = indexReader;
      }

      /**
       * The no of threads using the index reader instance
       */
      public int threadCount;

      /**
       * The directory provider used by index reader
       */
      public DirectoryProvider provider;

      public IndexReader indexReader;
   }

   /**
    * Creates and returns a list of readerData objects each having a newly
    * opened index reader instance with thread-count as zero
    *
    * @param provider The directory-provider
    * @return The list of readerData
    */
   private List<ReaderData> getReaderData(DirectoryProvider provider) {
      List<ReaderData> readerDataList = new ArrayList<ReaderData>();
      try {
         for (int i = 0; i < 4; i++) {
            // Create an index reader instance for the given
            // directory-provider
            IndexReader reader = IndexReader.open(provider.getDirectory());
            // Create a reader data instance
            ReaderData readerData = new ReaderData(0, provider, reader);
            readerDataList.add(readerData);
            // Add the indexReader and directory-provider to the map of
            // indexReader-->directory-provider. This required to get the
            // directory-provider for a given indexReader instance using
            // which the readerDataList can be retrieved
            readerDirProvider.put(reader, provider);
         }

         return readerDataList;
      }
      catch (CorruptIndexException e) {
         logger.error(
               "Problems encountered while getting a reader insatnce", e);
      }
      catch (IOException e) {
         logger.error(
               "Problems encountered while getting a reader insatnce", e);
      }

      return null;
   }

   /**
    * Returns an index-reader instance from the readerDataList for the given
    * directory-provider such that the index-reader instance is being shared by
    * minimum no. of threads amongst other index-reader instances and is also
    * active
    *
    * @param provider The directory-provider
    * @return The index-reader instance
    */
   private IndexReader getIndexReader(DirectoryProvider provider) {
      List<ReaderData> readerDataList = searchIndexReaders.get(provider);

      // Make the first readerDataList as the default one from which
      // index-reader instance is to be returned
      ReaderData readerData = readerDataList.get(0);
      try {
         // If no thread is using the first index-reader instance after
         // incrementing its thread count
         if (readerData.indexReader.isCurrent()
               && readerData.threadCount == 0) {
            readerData.threadCount++;
            logger.debug("Returning first reader instance : "
                  + readerData.indexReader + " with threadCount value : "
                  + readerData.threadCount);
            return readerData.indexReader;
         }

         int[] staleIndexReaders = new int[3];
         int count = -1;

         // Now loop through the remaining list of readerDataList, find the
         // one shared by minimum no. of threads and is active. In the
         // process if any index-reader is stale, then, mark it for
         // deleting and adding a newly opened index-reader instead
         for (int i = 1; i < readerDataList.size(); i++) {
            ReaderData tempReaderData = readerDataList.get(i);

            // Check if the current readerData's index-reader instance is
            // up-to-date and if it's not being shared by any thread
            if (tempReaderData.threadCount == 0
                  && tempReaderData.indexReader.isCurrent()) {
               tempReaderData.threadCount++;
               logger.debug("Returning reader instance : "
                     + tempReaderData.indexReader
                     + " with threadCount value : "
                     + tempReaderData.threadCount);
               return tempReaderData.indexReader;
            }
            else {
               // Check if the current readerData's index-reader instance
               // is up-to-date
               if (tempReaderData.indexReader.isCurrent()) {
                  // Check and set the readerData with min. threadCount
                  // value
                  if (tempReaderData.threadCount <= readerData.threadCount) {
                     readerData = tempReaderData;
                  }
               }
               else {
                  // Mark the readerData as stale and the its indexReader
                  // needs to be replaced
                  if (tempReaderData.threadCount == 0) {
                     staleIndexReaders[++count] = i;
                  }
               }
            }

         }

         // Replace the stale indexReaders
         if (count > 0) {
            for (int i = 0; i < count; i++) {
               IndexReader reader = IndexReader.open(provider
                     .getDirectory());
               ReaderData newReaderData = new ReaderData(0, provider,
                     reader);
               // Remove the index-reader-->directoryProvider entry and
               // replace with the new one
               readerDirProvider.remove(readerDataList
                     .get(staleIndexReaders[i]).indexReader);
               readerDataList.remove(staleIndexReaders[i]);
               readerDataList.add(newReaderData);
               readerDirProvider.put(reader, provider);
               logger.debug("Added a new reader instance : " + reader
                     + " for a stale index at position : "
                     + staleIndexReaders[i]);
            }

         }

         // Update the thread count of the readerData whose index-reader
         // instance is to be returned
         readerData.threadCount++;
         logger.debug("Returning reader instance : "
               + readerData.indexReader + " with threadCount value : "
               + readerData.threadCount);
         return readerData.indexReader;
      }
      catch (Exception e) {
         logger
               .error("Exception while getting the index reader instance",
                     e);
      }

      return null;
   }

   /**
    * Decrements the the threadCount for the given indexReader instance
    *
    * @param reader The indexReader instance
    * @param dirProvider The directory-provider
    */
   private void decrementIndexReaderCount(IndexReader reader,
         DirectoryProvider dirProvider) {
      List<ReaderData> readerDataList = searchIndexReaders.get(dirProvider);
      for (ReaderData readerData : readerDataList) {
         if (readerData.indexReader.equals(reader)) {
            readerData.threadCount--;
            logger
                  .debug("Decremented threadCount count for reader instance : "
                        + reader
                        + " New threadCount count is : "
                        + readerData.threadCount);
            break;
         }
      }
   }
}



Top
 Profile  
 
Display posts from previous:  Sort by  
Forum locked This topic is locked, you cannot edit posts or make further replies.  [ 1 post ] 

All times are UTC - 5 hours [ DST ]


You cannot post new topics in this forum
You cannot reply to topics in this forum
You cannot edit your posts in this forum
You cannot delete your posts in this forum

Search for:
© Copyright 2014, Red Hat Inc. All rights reserved. JBoss and Hibernate are registered trademarks and servicemarks of Red Hat, Inc.