 Post subject: Constructing a ParallelMultiSearcher in a cluster environment
PostPosted: Wed May 21, 2008 1:27 am 
Newbie

Joined: Thu Apr 10, 2008 1:46 am
Posts: 7

Hibernate version:
hibernate-core-3.2.6.ga, hibernate-annotations-3.3.1.GA, hibernate-search-3.0.1.GA, lucene-2.3.0, apache-activemq-5.1.0, Tomcat 5.5
Mapping documents:

Code:
@Entity
@Indexed
@Table(name="CLUSTERCAT")
public class ClusterCat implements Serializable {

   /**
    * serialVersionUID
    */
   private static final long serialVersionUID = -6163085527542323460L;

   private Integer cid;

   private String name;

   private Integer price;

   private String description;

   @Id
   @DocumentId
   @GeneratedValue(generator = "hibseq")
   @GenericGenerator(name = "hibseq", strategy = "increment")
   @Column(name = "CID", unique = true, nullable = false, precision = 10, scale = 0)
   public Integer getCid() {
      return cid;
   }

   public void setCid(Integer cid) {
      this.cid = cid;
   }

   @Field( index = Index.UN_TOKENIZED, store = Store.YES )
   @Column(name = "NAME", nullable = false)
   public String getName() {
      return name;
   }

   public void setName(String name) {
      this.name = name;
   }

   @Field( index = Index.UN_TOKENIZED, store = Store.YES )
   @Column(name = "PRICE", nullable = false, precision = 10, scale = 0)
   public Integer getPrice() {
      return price;
   }

   public void setPrice(Integer price) {
      this.price = price;
   }

   @Fields( {
      @Field(name = "description", index = Index.TOKENIZED, store = Store.NO ),
      @Field(name = "description_sort", index = Index.UN_TOKENIZED, store = Store.YES )
      } )
   @Column(name = "DESCRIPTION", nullable = false)
   public String getDescription() {
      return this.description;
   }

   public void setDescription(String description) {
      this.description = description;
   }
}
Code between sessionFactory.openSession() and session.close():
Code:
public class SlaveClass {
   
   public static void main(String[] args) {
      SlaveClass slaveClass = new SlaveClass();
      slaveClass.produceSlaveIndexSearcher();
   }
   
   public void produceSlaveIndexSearcher() {

      Session session = null;
      FullTextSession ftSession = null;
      SearchFactory searchFactory = null;
      DirectoryProvider[] dirProviders = null;
      ReaderProvider readerProvider = null;
      IndexReader indexReader = null;
      IndexSearcher indexSearcher = null;
      RemoteSearchable remoteSearchable = null;

      try {
         session = HibernateUtil.getSessionFactory().openSession();
         ftSession = Search.createFullTextSession( session );
         searchFactory = ftSession.getSearchFactory();
         dirProviders = searchFactory.getDirectoryProviders( ClusterCat.class );
         readerProvider = searchFactory.getReaderProvider();
         indexReader = readerProvider.openReader( dirProviders );
         indexSearcher = new IndexSearcher( indexReader );
         remoteSearchable = new RemoteSearchable( indexSearcher );
         //send the RemoteSearchable to JMS
         JMSBean.getQueueHandler().produceSlaveRemoteSearchables( remoteSearchable );
      } catch ( Exception e ) {
         e.printStackTrace();
      }
   }
}

Code:
public class JMSBean {

   private static QueueHandler queueHandler = new QueueHandler();

   public static QueueHandler getQueueHandler() {
      return queueHandler;
   }

   public static void setQueueHandler(QueueHandler queueHandler) {
      JMSBean.queueHandler = queueHandler;
   }
}

Code:
public class QueueHandler {

   private QueueConnectionFactory queueConnectionFactory = null;
   private QueueConnection queueConnection = null;
   private QueueSession queueSession = null;
   private Queue queue = null;
   private Message message;
   private ObjectMessage objMessage = null;

   /*   producer   */
   private QueueSender queueSender = null;
   /*   consumer   */
   private QueueReceiver queueReceiver;


   public QueueHandler() {
      /*   Create the connection   */
        queueConnectionFactory =
           new ActiveMQConnectionFactory( JMSConstants.USER,
                 JMSConstants.PASSWORD, JMSConstants.URL );
   }

   public void produceSlaveRemoteSearchables( RemoteSearchable remoteSearchable ) {

      try {

           /*   JMS   */
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession( true, 0 );
            queue = queueSession.createQueue( JMSConstants.SLAVE_REMOTE_SEARCHABLES_QUEUE );
            queueSender = queueSession.createSender( queue );
            objMessage = queueSession.createObjectMessage();
            objMessage.setObject( remoteSearchable );
          queueSender.send( objMessage );
              System.out.println("producing SlaveRemoteSearchables queue...");
            queueSession.commit();

        } catch ( JMSException e ) {
           try {
              queueSession.rollback();
           } catch ( JMSException jmse ) {
              jmse.printStackTrace();
           }
           e.printStackTrace();
        } finally {
            if (  queueConnection != null  ) {
                try {
                    queueConnection.close();
                } catch ( JMSException jmse ) {
                   jmse.printStackTrace();
                }
            }
        }
   }

   public RemoteSearchable consumeSlaveRemoteSearchables() {

      RemoteSearchable remoteSearchable = null;
       try {

           queueConnection = queueConnectionFactory.createQueueConnection();
           queueSession = queueConnection.createQueueSession( true, 0 );
           queue = queueSession.createQueue( JMSConstants.SLAVE_REMOTE_SEARCHABLES_QUEUE );
           queueReceiver = queueSession.createReceiver( queue );
           queueConnection.start();

           message = queueReceiver.receive( 1000 );

           /*   receive() already waited up to 1000 ms for a single message,
                so a null-safe instanceof check is all that is needed here   */
           if (  message instanceof ObjectMessage  ) {
               System.out.println("Reading message...");
               objMessage = (ObjectMessage)message;
               remoteSearchable = (RemoteSearchable)objMessage.getObject();
           }
           queueSession.commit();

       } catch ( JMSException e ) {
          try {
             queueSession.rollback();
          } catch ( JMSException jmse ) {
             jmse.printStackTrace();
          }
           e.printStackTrace();
       } finally {
           if (  queueConnection != null  ) {
               try {
                   queueConnection.close();
               } catch ( JMSException jmse ) {
                  jmse.printStackTrace();
               }
           }
       }

       return remoteSearchable;
   }
}

Full stack trace of any exception that occurs:
Code:
java.lang.RuntimeException: org.apache.lucene.search.IndexSearcher
   at org.apache.activemq.command.ActiveMQObjectMessage.storeContent(ActiveMQObjectMessage.java:104)
   at org.apache.activemq.command.ActiveMQObjectMessage.setObject(ActiveMQObjectMessage.java:155)
   at jms.QueueHandler.produceSlaveRemoteSearchables(QueueHandler.java:144)
   at slave.SlaveClass.produceSlaveIndexSearcher(SlaveClass.java:115)
   at slave.SlaveThread.run(SlaveThread.java:42)
Caused by: java.io.NotSerializableException: org.apache.lucene.search.IndexSearcher
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1081)
   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1375)
   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1347)
   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1290)
   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1079)
   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:302)
   at org.apache.activemq.command.ActiveMQObjectMessage.storeContent(ActiveMQObjectMessage.java:98)
   ... 4 more

Name and version of the database you are using:
PostgreSQL 8.3
The generated SQL (show_sql=true):
none
Debug level Hibernate log excerpt:
none
Problems with Session and transaction handling?
none

Hello, I have a question about how to construct a ParallelMultiSearcher in a cluster environment (with two modest PCs, one master and one slave). What I'm trying to do is let the slave offer its IndexSearcher and send it to a JMS queue; the master then consumes the slave's IndexSearcher from the queue and combines it with its own IndexSearcher to construct a ParallelMultiSearcher for searching the same entity, so that Lucene can take advantage of multiple CPUs and hard disks.
What I don't understand is why, when I send a RemoteSearchable to a JMS queue, like this:
Code:
indexSearcher = new IndexSearcher( indexReader );
         remoteSearchable = new RemoteSearchable( indexSearcher );
         //send the RemoteSearchable to JMS
         JMSBean.getQueueHandler().produceSlaveRemoteSearchables( remoteSearchable );

then a java.io.NotSerializableException happens, even though RemoteSearchable itself implements Serializable. From the above error message, it seems that ActiveMQ tries to serialize what is wrapped inside the RemoteSearchable, and if the inner object doesn't implement Serializable, the exception is thrown. Is there any solution for this, if using JMS?
By the way, I've also studied "Lucene in Action" (I bought Hibernate Search in Action too, but chapter 10 is not available yet...) and have successfully created a ParallelMultiSearcher through RMI. I noticed that org.apache.lucene.search.Searchable is a subinterface of java.rmi.Remote, but does that mean RMI is the only way to gather each node's IndexSearcher and construct a ParallelMultiSearcher?


 Post subject: Re: Constructing a ParallelMultiSearcher in a cluster environment
PostPosted: Thu May 22, 2008 3:00 am 
Hibernate Team

Joined: Thu Apr 05, 2007 5:52 am
Posts: 1689
Location: Sweden
Hi,

atoman wrote:
what I don't understand is why, when I send a RemoteSearchable to a JMS queue, a java.io.NotSerializableException happens, even though RemoteSearchable itself implements Serializable ... Is there any solution for this, if using JMS?


I don't think there is a solution to your problem. If you serialize an object, you have to serialize the whole object graph, meaning the object with all its fields, then each field's fields, and so on. This means the whole graph has to be serializable, or any non-serializable fields have to be marked as transient. IndexSearcher is not serializable, which kind of makes sense: one would assume it uses I/O streams to access the index, and how would you serialize those?
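
To make the whole-graph rule concrete, here is a minimal standalone sketch (plain JDK serialization, nothing Lucene- or JMS-specific; all class names are made up) showing that a Serializable wrapper still fails when one of its fields is not serializable:
Code:
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class GraphSerializationDemo {

   /*   plays the role of IndexSearcher: not Serializable   */
   static class NotSerializableResource { }

   /*   plays the role of the message payload: Serializable, but its field is not   */
   static class Wrapper implements Serializable {
      private final NotSerializableResource resource = new NotSerializableResource();
   }

   public static void main(String[] args) throws Exception {
      ObjectOutputStream out = new ObjectOutputStream( new ByteArrayOutputStream() );
      /*   throws java.io.NotSerializableException: GraphSerializationDemo$NotSerializableResource   */
      out.writeObject( new Wrapper() );
   }
}

Marking the resource field transient would make the write succeed, but the consumer would then receive a Wrapper whose resource is null, which is useless for a searcher.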

Quote:
By the way, I've also studied "Lucene in Action" (I bought Hibernate Search in Action too, but chapter 10 is not available yet...) and have successfully created a ParallelMultiSearcher through RMI. I noticed that org.apache.lucene.search.Searchable is a subinterface of java.rmi.Remote, but does that mean RMI is the only way to gather each node's IndexSearcher and construct a ParallelMultiSearcher?


I am not an expert on this, but I believe so. RMI is the way to go.
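
The classic Lucene pattern (described in Lucene in Action) is to export the searcher over RMI rather than move it, so only a remote stub ever crosses the wire. A minimal sketch of the slave side, where the index path, port, and binding name are placeholders:
Code:
import java.rmi.Naming;
import java.rmi.registry.LocateRegistry;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.RemoteSearchable;
import org.apache.lucene.search.Searchable;

public class ExportSlaveSearcher {

   public static void main(String[] args) throws Exception {
      /*   open the searcher locally; only a stub is sent to clients   */
      Searchable local = new IndexSearcher( "/path/to/slave/index" );
      RemoteSearchable remote = new RemoteSearchable( local );

      LocateRegistry.createRegistry( 1099 );
      Naming.rebind( "//localhost/SlaveSearchable", remote );
      System.out.println( "slave Searchable exported over RMI" );
   }
}

The master then looks the stub up with Naming.lookup and passes the resulting Searchable straight into a ParallelMultiSearcher together with its own local IndexSearcher.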

--Hardy


 Post subject: Re: Constructing a ParallelMultiSearcher in a cluster environment
PostPosted: Thu May 22, 2008 5:36 am 
Newbie

Joined: Thu Apr 10, 2008 1:46 am
Posts: 7
hardy.ferentschik wrote:
If you serialize an object, you have to serialize the whole object graph ... This means the whole graph has to be serializable, or any non-serializable fields have to be marked as transient. IndexSearcher is not serializable. ... RMI is the way to go.

Thanks, Hardy, now I understand why java.io.NotSerializableException came up.


 Post subject: how to split a Lucene index across multiple machines?
PostPosted: Thu May 22, 2008 5:55 am 
Newbie

Joined: Thu Apr 10, 2008 1:46 am
Posts: 7
Although RMI is the only way I've successfully tried to create a ParallelMultiSearcher, I made a mistake: because the master's Lucene index was exactly the same as the slave's, the Hits returned by the ParallelMultiSearcher contained duplicated data. For example, the master's IndexSearcher found document-1 (a Lucene Document) and the slave's IndexSearcher found the same document as well, so this document appeared in the Hits TWICE.

Does anyone know how to split a Lucene index into parts and put them on different machines, or what else to do, so that the ParallelMultiSearcher can take advantage of multiple CPUs and hard disks?


 Post subject:
PostPosted: Thu May 22, 2008 6:51 am 
Hibernate Team

Joined: Fri Oct 05, 2007 4:47 pm
Posts: 2536
Location: Third rock from the Sun
Hi,

Quote:
Does anyone know how to split a Lucene index into parts and put them on different machines, or what else to do, so that the ParallelMultiSearcher can take advantage of multiple CPUs and hard disks?

IMHO you could get good performance using multiple hard disks and CPUs on a single machine; the remote calls between different search servers are a huge performance bottleneck if you cannot avoid them.
Are your queries really that slow? I am getting really impressive results using a single low-budget server; massive indexing has been a problem, but the JMS pattern in Hibernate Search solved that nicely.

If you find out you really need a ParallelMultiSearcher executing on multiple machines, maybe we would need to support Solr as a backend; please let us know about your experiences.
I also think we could combine index sharding with a "RemoteParallelMultiSearcher" that selects a different shard on each node and combines the results, but personally I don't like all the network traffic that would generate.
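
(For reference, sharding in Hibernate Search 3.0 is enabled per index with a property of the following shape; the entity name and shard count below are example values only.)
Code:
<!--   split the index of com.example.MyEntity into 4 shards;
       the index name defaults to the fully qualified entity class name   -->
<property name="hibernate.search.com.example.MyEntity.sharding_strategy.nbr_of_shards">
   4
</property>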

_________________
Sanne
http://in.relation.to/


 Post subject:
PostPosted: Fri May 23, 2008 11:15 am 
Newbie

Joined: Thu Apr 10, 2008 1:46 am
Posts: 7
Hello, Sanne

Quote:
IMHO you could get good performance using multiple hard disks and CPUs on a single machine; the remote calls between different search servers are a huge performance bottleneck if you cannot avoid them.
Are your queries really that slow? I am getting really impressive results using a single low-budget server; massive indexing has been a problem, but the JMS pattern in Hibernate Search solved that nicely.

Actually, my queries are not slow at all; they are pretty fast, and that's the reason Lucene impressed me so much. I inserted 3.6 million records into PostgreSQL, each record containing about 200 Chinese words on average, all read randomly from a number of novels. The average search time is about 30 ms. The same query would take forever in PostgreSQL, and I was doing this experiment on my laptop, which only has a single 2.0 GHz CPU with 2 GB RAM and a 7200 rpm hard disk.

Well, I know the latest computers have multiple processors, and even RAID would be good enough for my case, but as an R&D engineer my job is to do my best to make my boss's dream a reality, and I also want to push Hibernate Search as far as I can, because I think the value of clustering is stability, failover, and task sharing. Besides, if my 4-year-old laptop can do the job, any other machine can do it as well. What impressed me about Hibernate Search is how well it integrates with Lucene: once the persistence layer and the full-text search layer are set up, you don't really have to worry about the DB, because it can basically be replaced with any other DB and full-text search still works nicely.

Quote:
If you find out you really need a ParallelMultiSearcher executing on multiple machines, maybe we would need to support Solr as a backend; please let us know about your experiences.
I also think we could combine index sharding with a "RemoteParallelMultiSearcher" that selects a different shard on each node and combines the results, but personally I don't like all the network traffic that would generate.

Thank you for your valuable opinion; you led me in the right direction. YES, index sharding works! I would like to share my rough experiment: I tried it with my laptop as the master and a VMware image on a USB removable drive simulating the slave. Here's my hibernate.cfg.xml:
Code:
<!--   Master Mode   -->
      <property name="hibernate.search.default.sourceBase">
         D:/LuceneSourceBase
      </property>
      <property name="hibernate.search.default.indexBase">
         ./lucene
      </property>
      <property name="hibernate.search.default.refresh">
         60
      </property>
      <property name="hibernate.search.default.directory_provider">
         org.hibernate.search.store.FSMasterDirectoryProvider
      </property>


      <!--   Slave Mode
      <property name="hibernate.search.default.sourceBase">
         \\10.168.127.3\D$\LuceneSourceBase
      </property>
      <property name="hibernate.search.default.indexBase">
         ./lucene
      </property>
      <property name="hibernate.search.default.refresh">
         60
      </property>
      <property name="hibernate.search.default.directory_provider">
         org.hibernate.search.store.FSSlaveDirectoryProvider
      </property>
      <property name="hibernate.searcch.worker.backend">
         jms
      </property>
      <property name="org.hibernate.worker.jndi">
         org.apache.activemq.jndi.ActiveMQInitialContextFactory
      </property>
      <property name="hibernate.search.worker.jms.connection_factory">
         ConnectionFactory
      </property>
      <property name="hibernate.search.worker.jms.queue">
         InsertObjectQueue
      </property>
      <property name="org.hibernate.worker.execution">
         async
      </property>
      <property name="org.hibernate.worker.thread_pool.size">
         2
      </property>
      <property name="org.hibernate.worker.buffer_queue.max">
         10
      </property>
<property name="hibernate.search.search.basicCluster.ClusterCat.sharding_strategy.nbr_of_shards">
         4
      </property>
      -->
...
...
...


QueueHandler:
Code:
public class QueueHandler {

   private QueueConnectionFactory queueConnectionFactory;
   private QueueConnection queueConnection;
   private QueueSession queueSession;
   private Queue queue;
   private Message message;
   private ObjectMessage objMessage;

   /*   producer   */
   private QueueSender queueSender;
   /*   consumer   */
   private QueueReceiver queueReceiver;

   public QueueHandler() {
      /*   Create the connection   */
        queueConnectionFactory =
           new ActiveMQConnectionFactory( JMSConstants.USER,
                 JMSConstants.PASSWORD, JMSConstants.URL );
   }

   /*
      slave produces insert object
   */
   public void produceInsertObject(Serializable serial) {

        try {

           /*   JMS   */
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession( true, 0 );
            queue = queueSession.createQueue( JMSConstants.INSERT_OBJECT_QUEUE );
            queueSender = queueSession.createSender( queue );
            objMessage = queueSession.createObjectMessage();
            objMessage.setObject( serial );
          queueSender.send( objMessage );
              System.out.println("producing InsertSerialiableObject queue...");
            queueSession.commit();

        } catch ( JMSException e ) {
           try {
              queueSession.rollback();
           } catch ( JMSException jmse ) {
              jmse.printStackTrace();
           }
           e.printStackTrace();
        } finally {
            if (  queueConnection != null  ) {
                try {
                    queueConnection.close();
                } catch ( JMSException jmse ) {
                   jmse.printStackTrace();
                }
            }
        }
   }

   /*
      master consume the insert object queue
   */
   public void consumeInsertObject(MasterTask masterTask) {

        try {

            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession( true, 0 );
            queue = queueSession.createQueue( JMSConstants.INSERT_OBJECT_QUEUE );
            queueReceiver = queueSession.createReceiver( queue );
//            queueReceiver.setMessageListener(
//                  new QueueListener( queueSession, masterTask ) );
            queueConnection.start();

            while (  ( message = queueReceiver.receive( 1000 ) ) != null  ){
               System.out.println("has more message left...");
               /*   ObjectMessage   */
               if (  message instanceof ObjectMessage  ) {
                  System.out.println("Reading message...");
                   objMessage = (ObjectMessage)message;
                   Serializable serial = (Serializable)objMessage.getObject();
                   /*   call master to insert this serializable   */
                   masterTask.insertSlaveCommands( serial );
               } else {
                  System.out.println("not an object message...");
               }
               try {
                  Thread.sleep( 100 );
               } catch ( Exception e ){
                  e.printStackTrace();
               }
            }

            queueSession.commit();

        } catch ( JMSException e ) {
           try {
              queueSession.rollback();
           } catch ( JMSException jmse ) {
              jmse.printStackTrace();
           }
            e.printStackTrace();
        } finally {
            if (  queueConnection != null  ) {
                try {
                    queueConnection.close();
                } catch ( JMSException jmse ) {
                   jmse.printStackTrace();
                }
            }
        }
   }
}


ISlaveRMI:
Code:
public interface ISlaveRMI extends Remote, Serializable {

   public RemoteSearchable getSlaveRemoteSearchable(Class theClass) throws Exception;
}


SlaveRMI:
Code:
public class SlaveRMI implements ISlaveRMI    {

   /**
    * serialVersionUID
    */
   private static final long serialVersionUID = -3135131536379687733L;

   public SlaveRMI() throws RemoteException {}

   public RemoteSearchable getSlaveRemoteSearchable(Class theClass) throws RemoteException {

      Session session = null;
      FullTextSession ftSession = null;
      SearchFactory searchFactory = null;
      DirectoryProvider[] dirProviders = null;
      DirectoryProvider[] halfDirProviders = null;
      ReaderProvider readerProvider = null;
      IndexReader indexReader = null;
      IndexSearcher indexSearcher = null;
      RemoteSearchable slaveRemoteSearchable = null;
      try {

         session = HibernateUtil.getSessionFactory().openSession();
         ftSession = Search.createFullTextSession( session );
         searchFactory = ftSession.getSearchFactory();
         readerProvider = searchFactory.getReaderProvider();

         dirProviders = searchFactory.getDirectoryProviders( theClass );
         System.out.println("dirProviders.length = "+dirProviders.length);
         /*   split the original dirProviders into two parts, and
             put either one of them into the indexReader   */
         halfDirProviders = new DirectoryProvider[]{
               dirProviders[ 2 ], dirProviders[ 3 ]
            };

//         indexReader = readerProvider.openReader( dirProviders );
         indexReader = readerProvider.openReader( halfDirProviders );
         indexSearcher = new IndexSearcher( indexReader );
         slaveRemoteSearchable = new RemoteSearchable( indexSearcher );

      } catch ( Exception e ) {
         e.printStackTrace();
      }

      return slaveRemoteSearchable;
   }
}


MasterRMI:
Code:
public class MasterRMI {

   public ParallelMultiSearcher getParallelMultiSearcher() {

      Registry registry = null;
      Session session = null;
      FullTextSession ftSession = null;
      SearchFactory searchFactory = null;
      DirectoryProvider[] dirProviders = null;
      DirectoryProvider[] halfDirProviders = null;
      ReaderProvider readerProvider = null;
      IndexReader indexReader = null;
      IndexSearcher indexSearcher = null;
//      RemoteSearchable slaveRemoteSearchables = null;
      Searchable slaveRemoteSearchables = null;
      ParallelMultiSearcher parallelMultiSearcher = null;


      session = HibernateUtil.getSessionFactory().openSession();
      ftSession = Search.createFullTextSession( session );

      searchFactory = ftSession.getSearchFactory();
      dirProviders = searchFactory.getDirectoryProviders( ClusterCat.class );
      readerProvider = searchFactory.getReaderProvider();

      System.out.println("dirProviders.length = "+dirProviders.length);
      /*   split the original dirProviders into two parts, and
          put either one of them into the indexReader   */
      halfDirProviders = new DirectoryProvider[]{
            dirProviders[ 0 ], dirProviders[ 1 ]
         };

//      indexReader = readerProvider.openReader( dirProviders );
      indexReader = readerProvider.openReader( halfDirProviders );

      /*   master's searchable   */
      indexSearcher = new IndexSearcher( indexReader );
      /*   slave's searchable   */
      try {
         registry = LocateRegistry.getRegistry( RMIConstants.IP );
//         slaveRemoteSearchables = (RemoteSearchable)registry.lookup( RMIConstants.SLAVE_REMOTE_SEARCHABLE );
         ISlaveRMI iSlaveRMI = (ISlaveRMI)registry.lookup( RMIConstants.ISLAVE_RMI );
//         slaveRemoteSearchables = iSlaveRMI.getSlaveRemoteSearchable( search.basicCluster.ClusterCat.class );
         slaveRemoteSearchables = (Searchable)(registry.lookup( RMIConstants.SLAVE_REMOTE_SEARCHABLE) );
         /*   build a ParallelMultiSearcher   */
         parallelMultiSearcher = new ParallelMultiSearcher(
                  new Searchable[]{ indexSearcher, slaveRemoteSearchables } );
         System.out.println("============== master + slave ================");
      } catch ( Exception e ) {
         try {
            parallelMultiSearcher = new ParallelMultiSearcher(
                  new Searchable[]{ indexSearcher } );
            System.out.println("============== master ================");
         } catch ( IOException ioe ) {
            ioe.printStackTrace();
         }
      }

      return parallelMultiSearcher;
   }
}


MasterTask:
Code:
public class MasterTask extends DemoContent {

   public void insertLocal() {

      Session session = HibernateUtil.getSessionFactory().openSession();
      FullTextSession ftSession = Search.createFullTextSession( session );
      Transaction tx = ftSession.beginTransaction();

      Criteria criteria = session.createCriteria( ClusterCat.class );
      criteria.setProjection( Projections.max("cid") );
      ClusterCat cat = null;
      int rowCount = 0;

      for ( int i = 0; i < 10; i++ ) {
         try {
            rowCount = new Integer( criteria.list().iterator().next().toString() ).intValue() + 1;
         } catch ( Exception e ) {
            rowCount = 1;
         }
         cat = new ClusterCat();

         cat.setName( "CatName - " + rowCount );
         cat.setPrice( DemoUtil.getARandPrice() );
         cat.setDescription( DemoConstants.newsArr[ DemoUtil.getANewsRandNum() ] );
         ftSession.save( cat );
      }

      commitTx( tx );
   }

   public void insertSlaveCommands( Serializable serial ) {

      Session session = HibernateUtil.getSessionFactory().openSession();
      FullTextSession ftSession = Search.createFullTextSession( session );
      Transaction tx = ftSession.beginTransaction();

      ftSession.save( serial );

      commitTx( tx );
   }

   public void checkInsertQueue() {

//      do {
         JMSBean.getQueueHandler().consumeInsertObject( this );
//         if (  serial != null  ) {
//            insertSlaveCommands( serial );
//         }
//      } while(  serial != null );
   }

   /*
      master - search local
   */
   public void searchLocal() {

      Session session = HibernateUtil.currentSession();
      FullTextSession ftSession = Search.createFullTextSession( session );
      Transaction tx = ftSession.beginTransaction();

      QueryParser parser = new QueryParser( "description", new StandardAnalyzer() );

      org.apache.lucene.search.Query luceneQuery = null;
      try{
         luceneQuery = parser.parse( "bus" );
      }catch( Exception e ){
         e.printStackTrace();
      }

      long start = System.currentTimeMillis();
      FullTextQuery ftQuery = ftSession.createFullTextQuery( luceneQuery, ClusterCat.class );
      long end = System.currentTimeMillis();
      System.out.println("search.time = " + ( end - start ) + " ms");

      /*   sort by cid   */
      Sort sort = new Sort(
            new SortField[]{
               new SortField("cid", true)
            }
         );
      ftQuery.setSort( sort );
      ftQuery.setMaxResults( 5 );
      List result = null;
      try {
         result = ftQuery.list();
      } catch ( Exception e ) {
         result = new ArrayList();
      }

      /*   commit   */
      commitTx( tx );

      FancyPrinter.getInstance().print( result.iterator(), true );

      HibernateUtil.closeSession();
   }

   public void searchLocalAndRemote() {

      ParallelMultiSearcher parallelMultiSearcher = null;
      Hits hits = null;

      try {

         /*   parallelMultiSearcher   */
         parallelMultiSearcher = new MasterRMI().getParallelMultiSearcher();

         QueryParser parser = new QueryParser( "description", new StandardAnalyzer() );
         org.apache.lucene.search.Query luceneQuery = null;
         try{
            luceneQuery = parser.parse( "bus" );
         }catch( Exception e ){
            e.printStackTrace();
         }
         //sorting
         Sort sort = new Sort(
               new SortField[] {
                     new SortField( "cid", SortField.INT, true )
                  }
               );

         long start = System.currentTimeMillis();
         hits = parallelMultiSearcher.search( luceneQuery, sort );
         long end = System.currentTimeMillis();
         System.out.println("parallelSearch.total time spent = " + ( end - start ) + " ms" );

//         FancyPrinter.getInstance().print(
//               LuceneUtil.getInstance().readHits( hits ).iterator(), true );
         FancyPrinter.getInstance().setCheckMethodPrefix( false );
         FancyPrinter.getInstance().print(
               LuceneUtil.getInstance().readHits( hits, 5 ).iterator(), true );
      } catch ( Exception e ) {
         e.printStackTrace();
      }

      HibernateUtil.closeSession();
   }
}


SlaveTask:
Code:
public class SlaveTask extends DemoContent {

   /*
      slave - insert through JMS
   */
   public void insertThroughJMS() {

      ClusterCat cat = null;
      for ( int i = 0; i < 10; i++ ) {
         cat = new ClusterCat();
         cat.setName( "CatName - " + DemoUtil.getARandPrice() );
         cat.setPrice( DemoUtil.getARandPrice() );
         cat.setDescription( DemoConstants.newsArr[ DemoUtil.getANewsRandNum() ] );

         JMSBean.getQueueHandler().produceInsertObject( cat );
      }
   }

   /*
      slave - search local
   */
   public void searchLocal() {

      Session session = HibernateUtil.currentSession();
      FullTextSession ftSession = Search.createFullTextSession( session );
      Transaction tx = ftSession.beginTransaction();

      QueryParser parser = new QueryParser( "description", new StandardAnalyzer() );

      org.apache.lucene.search.Query luceneQuery = null;
      try{
         luceneQuery = parser.parse( "bush" );
      }catch( Exception e ){
         e.printStackTrace();
      }
      long start = System.currentTimeMillis();
      FullTextQuery ftQuery = ftSession.createFullTextQuery( luceneQuery, ClusterCat.class );
      long end = System.currentTimeMillis();
      System.out.println("search.time = " + ( end - start ) + " ms");

      /*   sort by cid   */
      Sort sort = new Sort(
            new SortField[]{
               new SortField("cid", true)
            }
         );
      ftQuery.setSort( sort );
      ftQuery.setMaxResults( 5 );
      List result = null;
      try {
         result = ftQuery.list();
      } catch ( Exception e ) {
         result = new ArrayList();
      }

      /*   commit   */
      commitTx( tx );

      FancyPrinter.getInstance().print( result.iterator(), true );

      HibernateUtil.closeSession();
   }

   /*
      slave - search local half index
   */
   public void searchLocalHalf() {

      Session session = null;
      FullTextSession ftSession = null;
      SearchFactory searchFactory = null;
      DirectoryProvider[] dirProviders = null;
      DirectoryProvider[] halfDirProviders = null;
      ReaderProvider readerProvider = null;
      IndexReader indexReader = null;
      IndexSearcher indexSearcher = null;
      Hits hits = null;

      try {

         session = HibernateUtil.getSessionFactory().openSession();
         ftSession = Search.createFullTextSession( session );
         searchFactory = ftSession.getSearchFactory();
         readerProvider = searchFactory.getReaderProvider();

         dirProviders = searchFactory.getDirectoryProviders( ClusterCat.class );
         System.out.println("dirProviders.length = "+dirProviders.length);
         /*   split the original dirProviders into two parts, and
             put either one of them into the indexReader   */
         halfDirProviders = new DirectoryProvider[]{
               dirProviders[ 2 ], dirProviders[ 3 ]
            };

//         indexReader = readerProvider.openReader( dirProviders );
         indexReader = readerProvider.openReader( halfDirProviders );
         indexSearcher = new IndexSearcher( indexReader );

         QueryParser parser = new QueryParser( "description", new StandardAnalyzer() );
         org.apache.lucene.search.Query luceneQuery = parser.parse( "bush" );

         //sorting
         Sort sort = new Sort(
               new SortField[] {
                     new SortField( "cid", SortField.INT, true )
                  }
               );
         long start = System.currentTimeMillis();
         hits = indexSearcher.search( luceneQuery, sort );
         long end = System.currentTimeMillis();
         System.out.println("parallelSearch.total time spent = " + ( end - start ) + " ms" );

         FancyPrinter.getInstance().setCheckMethodPrefix( false );
         FancyPrinter.getInstance().print(
               LuceneUtil.getInstance().readHits( hits, 5 ).iterator(), true );

         HibernateUtil.closeSession();

      } catch ( Exception e ) {
         e.printStackTrace();
      }
   }

   public void exposeSlaveRemoteSearchables() {

      ISlaveRMI iSlaveRMI = null;
      SlaveRMI slaveRMI = null;
      RemoteSearchable parallelSearcher = null;
      Registry registry = null;
      Class theClass = ClusterCat.class;
      try {

         slaveRMI = new SlaveRMI();
         iSlaveRMI = (ISlaveRMI)UnicastRemoteObject.exportObject( slaveRMI, RMIConstants.PORT );
         //ParallelMultiSearcher
         parallelSearcher = slaveRMI.getSlaveRemoteSearchable( theClass );
         registry = LocateRegistry.getRegistry();
          registry.rebind( RMIConstants.ISLAVE_RMI, iSlaveRMI );
          registry.rebind( RMIConstants.SLAVE_REMOTE_SEARCHABLE, parallelSearcher);

          System.out.println("SlaveRMI ready...");

      } catch ( Exception e ) {
         e.printStackTrace();
      }
   }
}


MasterThread:
Code:
public class MasterThread extends Thread {

   private MasterTask master;
   private String threadName;
   private long sleepTime;
   private int runCount;
   private boolean stop;

   public MasterThread(long sleepTime, String threadName) {
      this.sleepTime = sleepTime;
      this.threadName = threadName;
      master = new MasterTask();
   }

   public boolean isStop() {
      return stop;
   }
   public void setStop(boolean stop) {
      this.stop = stop;
   }

   public void run() {

      while(  !stop  ) {

         if (  runCount == 0 ) {
            System.out.println( threadName + ".insertLocal...");
            master.insertLocal();
         }

         System.out.println( threadName + ".checkInsertQueue...");
         master.checkInsertQueue();

//         System.out.println( threadName + ".searchLocal...");
//         master.searchLocal();

         System.out.println( threadName + ".searchLocalAndRemote...");
         master.searchLocalAndRemote();

         try {
            System.out.println( threadName + ".sleep..." + sleepTime +" ms" );
            sleep( sleepTime );
         } catch ( Exception e ) {
            e.printStackTrace();
         }
         ++runCount;
      }
   }
}


SlaveThread:
Code:
public class SlaveThread extends Thread {

   private SlaveTask slave;
   private String threadName;
   private long sleepTime;
   private int runCount;
   private boolean stop;

   public SlaveThread(long sleepTime, String threadName) {
      this.sleepTime = sleepTime;
      this.threadName = threadName;
      slave = new SlaveTask();
   }

   public boolean isStop() {
      return stop;
   }
   public void setStop(boolean stop) {
      this.stop = stop;
   }

   public void run() {
      SlaveRMI slaveRMI = null;

      try   {
         slaveRMI = new SlaveRMI();
      } catch ( Exception e ) {
         e.printStackTrace();
      }

      while(  !stop  ) {

         /*
            slave - index modification through JMS
         */
         if (  runCount > 0  &&  runCount % 10 == 0  ) {
            System.out.println( threadName + ".insert...");
            slave.insertThroughJMS();
         }
         System.out.println( threadName + ".searchLocal...");
//         slave.searchLocal();
         slave.searchLocalHalf();

//         /*
//            slave - offer slave's IndexSearcher
//         */
//         slave.produceSlaveIndexSearcher();

         /*
            expose slave node's IndexSearcher
         */

//         slaveRMI.exposeSlaveRemoteSearchables();
//         if (  runCount == 0  ) {
            System.out.println( threadName + ".exposeSlaveRemoteSearchables...");
            slave.exposeSlaveRemoteSearchables();
//         }

         try {
            System.out.println( threadName + ".sleep..." + sleepTime +" ms" );
            sleep( sleepTime );
         } catch ( Exception e ) {
            e.printStackTrace();
         }
         ++runCount;
      }
   }
}

Then all you need to do is start MasterThread and SlaveThread on their respective machines, and on the slave side you have to start the RMI registry before you run the thread. The trick to creating a ParallelMultiSearcher is to let each IndexSearcher search a different part of the Lucene index; the ParallelMultiSearcher itself will merge the results. Hibernate Search can do this job without using Solr; the master side is condensed in the sketch below.
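
Distilled to its essence, the master side could look like this (a sketch only, reusing ClusterCat and RMIConstants from the code above, and assuming four shards of which the master owns 0-1 while the slave exposes 2-3 over RMI):
Code:
import java.rmi.registry.Registry;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ParallelMultiSearcher;
import org.apache.lucene.search.Searchable;
import org.hibernate.search.SearchFactory;
import org.hibernate.search.store.DirectoryProvider;

public class ClusterSearcherFactory {

   /*   combine the master's half of the shards with the slave's remote
        Searchable; the halves are disjoint, so the Hits hold no duplicates   */
   public ParallelMultiSearcher build( SearchFactory searchFactory, Registry registry )
         throws Exception {

      DirectoryProvider[] all = searchFactory.getDirectoryProviders( ClusterCat.class );
      DirectoryProvider[] masterHalf = new DirectoryProvider[]{ all[ 0 ], all[ 1 ] };

      IndexReader reader = searchFactory.getReaderProvider().openReader( masterHalf );
      Searchable local = new IndexSearcher( reader );
      Searchable remote = (Searchable) registry.lookup( RMIConstants.SLAVE_REMOTE_SEARCHABLE );

      return new ParallelMultiSearcher( new Searchable[]{ local, remote } );
   }
}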

Here's one more thing I don't really understand: what are
AbstractJMSHibernateSearchController
BackendQueueProcessorFactory
JMSBackendQueueProcessor
JMSBackendQueueProcessorFactory
LuceneBackendQueueProcessor
LuceneBackendQueueProcessorFactory
? When will I need them, and what are their responsibilities? Can anyone point me in the right direction or to some sample code?


 Post subject:
PostPosted: Fri May 23, 2008 12:15 pm 
Hibernate Team

Joined: Fri Oct 05, 2007 4:47 pm
Posts: 2536
Location: Third rock from the Sun
Hi,
really impressive post; I'll need some time to fully understand it, but it certainly looks promising.

Manik Surtani, the lead developer of JBoss Cache (http://www.jboss.org/jbosscache/), has some experimental code for putting the Lucene index in the distributed cache; you can look at it here:

http://anonsvn.jboss.org/repos/jbosscac ... bosscache/

Imagine the index being distributed safely across the cluster, with search results merged from multiple nodes using something like your code; it would be particularly efficient if you could pick as searching machines the ones currently owning the cache nodes that contain the relevant index segments, so as to minimize traffic.

Lucene also has the notion of Filters, which act something like cached previous results; it could be useful to keep the filter results in the caches themselves.

kind regards,

_________________
Sanne
http://in.relation.to/


 Post subject:
PostPosted: Fri May 23, 2008 3:03 pm 
Hibernate Team

Joined: Sun Sep 14, 2003 3:54 am
Posts: 7256
Location: Paris, France
atoman wrote:
AbstractJMSHibernateSearchController
BackendQueueProcessorFactory
JMSBackendQueueProcessor
JMSBackendQueueProcessorFactory
LuceneBackendQueueProcessor
LuceneBackendQueueProcessorFactory
? When will I need them, and what are their responsibilities? Can anyone point me in the right direction or to some sample code?


What you have here is a long post that I need some time to digest.
The new revision of Hibernate Search in Action will contain explanations for most of those classes in chapter 5. It should go out tonight or next Tuesday.

Let me know if there is still something you don't get.

_________________
Emmanuel


 Post subject:
PostPosted: Fri May 23, 2008 3:14 pm 
Hibernate Team

Joined: Sun Sep 14, 2003 3:54 am
Posts: 7256
Location: Paris, France
Thinking more about the query distribution: I think it is pure overhead and unnecessary for most applications.

But in case it's needed, I think it can be achieved by:
- providing a custom ReaderProvider implementation. Instead of opening an actual IndexReader on top of the local DirectoryProvider, it can delegate to a remote (RMI) reader. A set of configuration properties can point to the right backend servers depending on the DirectoryProvider.
- on the other side, you need logic to find the DirectoryProviders and apply the ReaderProvider logic to build the remote reader.

That way it stays entirely within the HSearch paradigm; a rough skeleton is sketched below.
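
A skeleton of that first bullet (a sketch only: the method signatures approximate the Hibernate Search 3.0 ReaderProvider SPI, while RemoteReaderStub and the remote.reader.host property are hypothetical pieces that would still have to be written):
Code:
import java.util.Properties;

import org.apache.lucene.index.IndexReader;
import org.hibernate.search.engine.SearchFactoryImplementor;
import org.hibernate.search.reader.ReaderProvider;
import org.hibernate.search.store.DirectoryProvider;

public class RemoteReaderProvider implements ReaderProvider {

   private Properties props;

   public void initialize(Properties props, SearchFactoryImplementor searchFactory) {
      /*   configuration could map each DirectoryProvider to a backend server   */
      this.props = props;
   }

   public IndexReader openReader(DirectoryProvider... directoryProviders) {
      /*   instead of opening a local IndexReader on top of the DirectoryProvider,
           delegate to the backend server owning these shards (hypothetical stub)   */
      String host = props.getProperty( "remote.reader.host" );
      return RemoteReaderStub.connect( host, directoryProviders );
   }

   public void closeReader(IndexReader reader) {
      try {
         if ( reader != null ) reader.close();
      } catch ( Exception e ) {
         e.printStackTrace();
      }
   }

   public void destroy() {
      /*   nothing to clean up in this sketch   */
   }
}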

That's an interesting idea that might be discussed during the GSoC project with JBoss Cache and Hibernate Search.

_________________
Emmanuel

