Hello, Sanne
Quote:
IMHO you could get good performance using multiple harddisks and CPUs on a single machine, the remote calls between different search servers are a huge performance bottleneck, if you can avoid them.
Are your queries so slow? I am getting really impressive results using a single low-budget server; massive indexing has been a problem but the JMS pattern in Hibernate Search solved that nicely.
Actually, my queries are not slow at all; they are pretty fast, and that's the reason Lucene impressed me so much. I inserted 3.6 million records into PostgreSQL, each record containing about 200 Chinese words on average, all read randomly from a number of novels. The average search time is about 30 ms. The same query would take forever in PostgreSQL, and I was doing this experiment on my laptop, which only has a single 2.0 GHz CPU with 2 GB RAM and a 7200 rpm hard disk.
Well, I know the latest machines have multiple processors, and even RAID would be good enough for my case, but as an R&D engineer my job is to try my best to make my boss's dream a reality, and I also want to push Hibernate Search as far as I can, because I think the value of clustering is stability, failover, and task sharing. Besides, if my 4-year-old laptop can do the job, any other machine can do it as well. What impressed me about Hibernate Search is how well it integrates with Lucene: once the persistence layer and the full-text search layer are set up, you don't really have to worry about the DB, because it can basically be replaced with any other DB and full-text search still works nicely.
Quote:
If you find out you really need to use ParallelMultiSearcher executing on multiple machines maybe we could need to support Solr as a backend, please let us know about your experiences.
Also I think we could combine the index sharding with a "RemoteParallelMultiSearcher", to select a different shard on each node and combine the results, but personally I don't like all that network traffic being generated.
Thank you for your valuable opinion; you led me in the right direction. YES, index sharding works! I would like to share my rough experiment: I ran it on my laptop (the master) and a VMware instance on a USB removable drive (simulating the slave). Here's my hibernate.cfg.xml:
Code:
<!-- Master Mode -->
<property name="hibernate.search.default.sourceBase">D:/LuceneSourceBase</property>
<property name="hibernate.search.default.indexBase">./lucene</property>
<property name="hibernate.search.default.refresh">60</property>
<property name="hibernate.search.default.directory_provider">
    org.hibernate.search.store.FSMasterDirectoryProvider
</property>

<!-- Slave Mode
<property name="hibernate.search.default.sourceBase">\\10.168.127.3\D$\LuceneSourceBase</property>
<property name="hibernate.search.default.indexBase">./lucene</property>
<property name="hibernate.search.default.refresh">60</property>
<property name="hibernate.search.default.directory_provider">
    org.hibernate.search.store.FSSlaveDirectoryProvider
</property>
<property name="hibernate.search.worker.backend">jms</property>
<property name="hibernate.search.worker.jndi.class">
    org.apache.activemq.jndi.ActiveMQInitialContextFactory
</property>
<property name="hibernate.search.worker.jms.connection_factory">ConnectionFactory</property>
<property name="hibernate.search.worker.jms.queue">InsertObjectQueue</property>
<property name="hibernate.search.worker.execution">async</property>
<property name="hibernate.search.worker.thread_pool.size">2</property>
<property name="hibernate.search.worker.buffer_queue.max">10</property>
-->

<!-- sharding: 4 shards, active in BOTH modes; the code below expects
     dirProviders[0..3], so this must not be commented out -->
<property name="hibernate.search.search.basicCluster.ClusterCat.sharding_strategy.nbr_of_shards">4</property>
...
...
...
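By the way, the sharding property name embeds the fully qualified class name of the indexed entity, search.basicCluster.ClusterCat. In case it helps, here is a rough sketch of that entity, inferred from the fields the code below queries and sorts on (my real annotations may differ slightly):
ClusterCat (sketch):
Code:
@Entity
@Indexed
/* must be Serializable so the slave can ship instances over JMS as ObjectMessages */
public class ClusterCat implements Serializable {

    /* @DocumentId stores "cid" un-tokenized, which is what the Sort on "cid" needs */
    @Id
    @GeneratedValue
    @DocumentId
    private Integer cid;

    @Field(index = Index.TOKENIZED)
    private String name;

    @Field(index = Index.UN_TOKENIZED)
    private double price;

    /* every query in the code below parses against "description" */
    @Field(index = Index.TOKENIZED)
    private String description;

    /* getters and setters omitted */
}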
QueueHandler:
Code:
public class QueueHandler {

    private QueueConnectionFactory queueConnectionFactory;
    private QueueConnection queueConnection;
    private QueueSession queueSession;
    private Queue queue;
    private Message message;
    private ObjectMessage objMessage;
    /* producer */
    private QueueSender queueSender;
    /* consumer */
    private QueueReceiver queueReceiver;
    /* note: per-call JMS state is kept in fields, so an instance
       must not be shared between threads */

    public QueueHandler() {
        /* create the connection factory */
        queueConnectionFactory = new ActiveMQConnectionFactory(
                JMSConstants.USER, JMSConstants.PASSWORD, JMSConstants.URL );
    }

    /*
        slave produces an insert-object message
    */
    public void produceInsertObject(Serializable serial) {
        try {
            /* transacted session; the ack mode is ignored for transacted sessions */
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession( true, 0 );
            queue = queueSession.createQueue( JMSConstants.INSERT_OBJECT_QUEUE );
            queueSender = queueSession.createSender( queue );
            objMessage = queueSession.createObjectMessage();
            objMessage.setObject( serial );
            queueSender.send( objMessage );
            System.out.println("producing InsertSerializableObject queue...");
            queueSession.commit();
        } catch ( JMSException e ) {
            /* the session may be null if the connection itself failed */
            if ( queueSession != null ) {
                try {
                    queueSession.rollback();
                } catch ( JMSException jmse ) {
                    jmse.printStackTrace();
                }
            }
            e.printStackTrace();
        } finally {
            if ( queueConnection != null ) {
                try {
                    queueConnection.close();
                } catch ( JMSException jmse ) {
                    jmse.printStackTrace();
                }
            }
        }
    }

    /*
        master consumes the insert-object queue
    */
    public void consumeInsertObject(MasterTask masterTask) {
        try {
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession( true, 0 );
            queue = queueSession.createQueue( JMSConstants.INSERT_OBJECT_QUEUE );
            queueReceiver = queueSession.createReceiver( queue );
            // queueReceiver.setMessageListener(
            //         new QueueListener( queueSession, masterTask ) );
            queueConnection.start();
            /* poll until the queue is drained; receive() times out after 1 s */
            while ( ( message = queueReceiver.receive( 1000 ) ) != null ) {
                System.out.println("has more messages left...");
                /* ObjectMessage */
                if ( message instanceof ObjectMessage ) {
                    System.out.println("Reading message...");
                    objMessage = (ObjectMessage) message;
                    Serializable serial = (Serializable) objMessage.getObject();
                    /* call master to insert this serializable */
                    masterTask.insertSlaveCommands( serial );
                } else {
                    System.out.println("not an object message...");
                }
                try {
                    Thread.sleep( 100 );
                } catch ( Exception e ) {
                    e.printStackTrace();
                }
            }
            queueSession.commit();
        } catch ( JMSException e ) {
            if ( queueSession != null ) {
                try {
                    queueSession.rollback();
                } catch ( JMSException jmse ) {
                    jmse.printStackTrace();
                }
            }
            e.printStackTrace();
        } finally {
            if ( queueConnection != null ) {
                try {
                    queueConnection.close();
                } catch ( JMSException jmse ) {
                    jmse.printStackTrace();
                }
            }
        }
    }
}
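QueueHandler references a JMSConstants class I haven't shown; it just collects the broker settings, roughly like this (the URL assumes the ActiveMQ broker runs on the master box; adjust to yours):
JMSConstants (sketch):
Code:
import org.apache.activemq.ActiveMQConnection;

public class JMSConstants {
    /* ActiveMQ defaults; adjust user/password to your broker */
    public static final String USER = ActiveMQConnection.DEFAULT_USER;
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /* assumed broker location: the master machine from the config above */
    public static final String URL = "tcp://10.168.127.3:61616";
    /* must match hibernate.search.worker.jms.queue */
    public static final String INSERT_OBJECT_QUEUE = "InsertObjectQueue";
}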
ISlaveRMI:
Code:
public interface ISlaveRMI extends Remote, Serializable {
    public RemoteSearchable getSlaveRemoteSearchable(Class theClass) throws Exception;
}
SlaveRMI:
Code:
public class SlaveRMI implements ISlaveRMI {

    /**
     * serialVersionUID
     */
    private static final long serialVersionUID = -3135131536379687733L;

    public SlaveRMI() throws RemoteException {}

    public RemoteSearchable getSlaveRemoteSearchable(Class theClass) throws RemoteException {
        Session session = null;
        FullTextSession ftSession = null;
        SearchFactory searchFactory = null;
        DirectoryProvider[] dirProviders = null;
        DirectoryProvider[] halfDirProviders = null;
        ReaderProvider readerProvider = null;
        IndexReader indexReader = null;
        IndexSearcher indexSearcher = null;
        RemoteSearchable slaveRemoteSearchable = null;
        try {
            session = HibernateUtil.getSessionFactory().openSession();
            ftSession = Search.createFullTextSession( session );
            searchFactory = ftSession.getSearchFactory();
            readerProvider = searchFactory.getReaderProvider();
            dirProviders = searchFactory.getDirectoryProviders( theClass );
            System.out.println("dirProviders.length = " + dirProviders.length);
            /* split the four shards in two: the slave serves shards 2 and 3,
               the master (see MasterRMI) serves shards 0 and 1 */
            halfDirProviders = new DirectoryProvider[]{
                    dirProviders[ 2 ], dirProviders[ 3 ]
            };
            // indexReader = readerProvider.openReader( dirProviders );
            indexReader = readerProvider.openReader( halfDirProviders );
            /* the reader is left open on purpose: the exported RemoteSearchable
               keeps using it after this method returns */
            indexSearcher = new IndexSearcher( indexReader );
            slaveRemoteSearchable = new RemoteSearchable( indexSearcher );
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        return slaveRemoteSearchable;
    }
}
MasterRMI:
Code:
public class MasterRMI {

    public ParallelMultiSearcher getParallelMultiSearcher() {
        Registry registry = null;
        Session session = null;
        FullTextSession ftSession = null;
        SearchFactory searchFactory = null;
        DirectoryProvider[] dirProviders = null;
        DirectoryProvider[] halfDirProviders = null;
        ReaderProvider readerProvider = null;
        IndexReader indexReader = null;
        IndexSearcher indexSearcher = null;
        // RemoteSearchable slaveRemoteSearchables = null;
        Searchable slaveRemoteSearchables = null;
        ParallelMultiSearcher parallelMultiSearcher = null;
        session = HibernateUtil.getSessionFactory().openSession();
        ftSession = Search.createFullTextSession( session );
        searchFactory = ftSession.getSearchFactory();
        dirProviders = searchFactory.getDirectoryProviders( ClusterCat.class );
        readerProvider = searchFactory.getReaderProvider();
        System.out.println("dirProviders.length = " + dirProviders.length);
        /* split the four shards in two: the master serves shards 0 and 1,
           the slave (see SlaveRMI) serves shards 2 and 3 */
        halfDirProviders = new DirectoryProvider[]{
                dirProviders[ 0 ], dirProviders[ 1 ]
        };
        // indexReader = readerProvider.openReader( dirProviders );
        indexReader = readerProvider.openReader( halfDirProviders );
        /* master's searchable */
        indexSearcher = new IndexSearcher( indexReader );
        /* slave's searchable */
        try {
            registry = LocateRegistry.getRegistry( RMIConstants.IP );
            // slaveRemoteSearchables = (RemoteSearchable)registry.lookup( RMIConstants.SLAVE_REMOTE_SEARCHABLE );
            // ISlaveRMI iSlaveRMI = (ISlaveRMI)registry.lookup( RMIConstants.ISLAVE_RMI );
            // slaveRemoteSearchables = iSlaveRMI.getSlaveRemoteSearchable( search.basicCluster.ClusterCat.class );
            slaveRemoteSearchables = (Searchable) registry.lookup( RMIConstants.SLAVE_REMOTE_SEARCHABLE );
            /* build a ParallelMultiSearcher over the local half and the remote half */
            parallelMultiSearcher = new ParallelMultiSearcher(
                    new Searchable[]{ indexSearcher, slaveRemoteSearchables } );
            System.out.println("============== master + slave ================");
        } catch ( Exception e ) {
            /* slave unreachable: fall back to searching the master's half only */
            try {
                parallelMultiSearcher = new ParallelMultiSearcher(
                        new Searchable[]{ indexSearcher } );
                System.out.println("============== master ================");
            } catch ( IOException ioe ) {
                ioe.printStackTrace();
            }
        }
        return parallelMultiSearcher;
    }
}
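RMIConstants is another little holder class I didn't paste; a minimal sketch, assuming the registry runs on the slave at its default port (the IP here is hypothetical):
RMIConstants (sketch):
Code:
public class RMIConstants {
    /* hypothetical address of the slave box that runs the RMI registry */
    public static final String IP = "10.168.127.4";
    /* 0 = export the remote object on any free port */
    public static final int PORT = 0;
    /* registry bind names */
    public static final String ISLAVE_RMI = "ISlaveRMI";
    public static final String SLAVE_REMOTE_SEARCHABLE = "SlaveRemoteSearchable";
}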
MasterTask:
Code:
public class MasterTask extends DemoContent {

    public void insertLocal() {
        Session session = HibernateUtil.getSessionFactory().openSession();
        FullTextSession ftSession = Search.createFullTextSession( session );
        Transaction tx = ftSession.beginTransaction();
        Criteria criteria = session.createCriteria( ClusterCat.class );
        criteria.setProjection( Projections.max("cid") );
        ClusterCat cat = null;
        int rowCount = 0;
        for ( int i = 0; i < 10; i++ ) {
            try {
                rowCount = Integer.parseInt( criteria.list().iterator().next().toString() ) + 1;
            } catch ( Exception e ) {
                /* empty table: max(cid) is null */
                rowCount = 1;
            }
            cat = new ClusterCat();
            cat.setName( "CatName - " + rowCount );
            cat.setPrice( DemoUtil.getARandPrice() );
            cat.setDescription( DemoConstants.newsArr[ DemoUtil.getANewsRandNum() ] );
            ftSession.save( cat );
        }
        commitTx( tx );
    }

    public void insertSlaveCommands( Serializable serial ) {
        Session session = HibernateUtil.getSessionFactory().openSession();
        FullTextSession ftSession = Search.createFullTextSession( session );
        Transaction tx = ftSession.beginTransaction();
        ftSession.save( serial );
        commitTx( tx );
    }

    public void checkInsertQueue() {
        // do {
        JMSBean.getQueueHandler().consumeInsertObject( this );
        //     if ( serial != null ) {
        //         insertSlaveCommands( serial );
        //     }
        // } while( serial != null );
    }

    /*
        master - search local
    */
    public void searchLocal() {
        Session session = HibernateUtil.currentSession();
        FullTextSession ftSession = Search.createFullTextSession( session );
        Transaction tx = ftSession.beginTransaction();
        QueryParser parser = new QueryParser( "description", new StandardAnalyzer() );
        org.apache.lucene.search.Query luceneQuery = null;
        try {
            luceneQuery = parser.parse( "bus" );
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        FullTextQuery ftQuery = ftSession.createFullTextQuery( luceneQuery, ClusterCat.class );
        /* sort by cid, descending */
        Sort sort = new Sort(
                new SortField[]{
                        new SortField( "cid", true )
                }
        );
        ftQuery.setSort( sort );
        ftQuery.setMaxResults( 5 );
        List result = null;
        /* the query only executes when list() is called, so time that call */
        long start = System.currentTimeMillis();
        try {
            result = ftQuery.list();
        } catch ( Exception e ) {
            result = new ArrayList();
        }
        long end = System.currentTimeMillis();
        System.out.println("search.time = " + ( end - start ) + " ms");
        /* commit */
        commitTx( tx );
        FancyPrinter.getInstance().print( result.iterator(), true );
        HibernateUtil.closeSession();
    }

    public void searchLocalAndRemote() {
        ParallelMultiSearcher parallelMultiSearcher = null;
        Hits hits = null;
        try {
            /* parallelMultiSearcher */
            parallelMultiSearcher = new MasterRMI().getParallelMultiSearcher();
            QueryParser parser = new QueryParser( "description", new StandardAnalyzer() );
            org.apache.lucene.search.Query luceneQuery = null;
            try {
                luceneQuery = parser.parse( "bus" );
            } catch ( Exception e ) {
                e.printStackTrace();
            }
            /* sorting */
            Sort sort = new Sort(
                    new SortField[] {
                            new SortField( "cid", SortField.INT, true )
                    }
            );
            long start = System.currentTimeMillis();
            hits = parallelMultiSearcher.search( luceneQuery, sort );
            long end = System.currentTimeMillis();
            System.out.println("parallelSearch.total time spent = " + ( end - start ) + " ms" );
            // FancyPrinter.getInstance().print(
            //         LuceneUtil.getInstance().readHits( hits ).iterator(), true );
            FancyPrinter.getInstance().setCheckMethodPrefix( false );
            FancyPrinter.getInstance().print(
                    LuceneUtil.getInstance().readHits( hits, 5 ).iterator(), true );
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        HibernateUtil.closeSession();
    }
}
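MasterTask and SlaveTask both extend a DemoContent base class I didn't paste; the only thing the code above needs from it is commitTx(), which is just a guarded commit, roughly:
DemoContent (sketch):
Code:
public abstract class DemoContent {

    /* commit the transaction, rolling back on failure so a broken
       transaction doesn't poison the session */
    protected void commitTx(Transaction tx) {
        try {
            tx.commit();
        } catch ( Exception e ) {
            tx.rollback();
            e.printStackTrace();
        }
    }
}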
SlaveTask:
Code:
public class SlaveTask extends DemoContent {

    /*
        slave - insert through JMS
    */
    public void insertThroughJMS() {
        ClusterCat cat = null;
        for ( int i = 0; i < 10; i++ ) {
            cat = new ClusterCat();
            cat.setName( "CatName - " + DemoUtil.getARandPrice() );
            cat.setPrice( DemoUtil.getARandPrice() );
            cat.setDescription( DemoConstants.newsArr[ DemoUtil.getANewsRandNum() ] );
            JMSBean.getQueueHandler().produceInsertObject( cat );
        }
    }

    /*
        slave - search local
    */
    public void searchLocal() {
        Session session = HibernateUtil.currentSession();
        FullTextSession ftSession = Search.createFullTextSession( session );
        Transaction tx = ftSession.beginTransaction();
        QueryParser parser = new QueryParser( "description", new StandardAnalyzer() );
        org.apache.lucene.search.Query luceneQuery = null;
        try {
            luceneQuery = parser.parse( "bush" );
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        FullTextQuery ftQuery = ftSession.createFullTextQuery( luceneQuery, ClusterCat.class );
        /* sort by cid, descending */
        Sort sort = new Sort(
                new SortField[]{
                        new SortField( "cid", true )
                }
        );
        ftQuery.setSort( sort );
        ftQuery.setMaxResults( 5 );
        List result = null;
        /* the query only executes when list() is called, so time that call */
        long start = System.currentTimeMillis();
        try {
            result = ftQuery.list();
        } catch ( Exception e ) {
            result = new ArrayList();
        }
        long end = System.currentTimeMillis();
        System.out.println("search.time = " + ( end - start ) + " ms");
        /* commit */
        commitTx( tx );
        FancyPrinter.getInstance().print( result.iterator(), true );
        HibernateUtil.closeSession();
    }

    /*
        slave - search local half of the index (shards 2 and 3)
    */
    public void searchLocalHalf() {
        Session session = null;
        FullTextSession ftSession = null;
        SearchFactory searchFactory = null;
        DirectoryProvider[] dirProviders = null;
        DirectoryProvider[] halfDirProviders = null;
        ReaderProvider readerProvider = null;
        IndexReader indexReader = null;
        IndexSearcher indexSearcher = null;
        Hits hits = null;
        try {
            session = HibernateUtil.getSessionFactory().openSession();
            ftSession = Search.createFullTextSession( session );
            searchFactory = ftSession.getSearchFactory();
            readerProvider = searchFactory.getReaderProvider();
            dirProviders = searchFactory.getDirectoryProviders( ClusterCat.class );
            System.out.println("dirProviders.length = " + dirProviders.length);
            /* split the four shards in two and search only the slave's half */
            halfDirProviders = new DirectoryProvider[]{
                    dirProviders[ 2 ], dirProviders[ 3 ]
            };
            // indexReader = readerProvider.openReader( dirProviders );
            indexReader = readerProvider.openReader( halfDirProviders );
            indexSearcher = new IndexSearcher( indexReader );
            QueryParser parser = new QueryParser( "description", new StandardAnalyzer() );
            org.apache.lucene.search.Query luceneQuery = parser.parse( "bush" );
            /* sorting */
            Sort sort = new Sort(
                    new SortField[] {
                            new SortField( "cid", SortField.INT, true )
                    }
            );
            long start = System.currentTimeMillis();
            hits = indexSearcher.search( luceneQuery, sort );
            long end = System.currentTimeMillis();
            System.out.println("searchLocalHalf.time = " + ( end - start ) + " ms" );
            FancyPrinter.getInstance().setCheckMethodPrefix( false );
            FancyPrinter.getInstance().print(
                    LuceneUtil.getInstance().readHits( hits, 5 ).iterator(), true );
            HibernateUtil.closeSession();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
    }

    public void exposeSlaveRemoteSearchables() {
        ISlaveRMI iSlaveRMI = null;
        SlaveRMI slaveRMI = null;
        RemoteSearchable slaveRemoteSearchable = null;
        Registry registry = null;
        Class theClass = ClusterCat.class;
        try {
            slaveRMI = new SlaveRMI();
            iSlaveRMI = (ISlaveRMI) UnicastRemoteObject.exportObject( slaveRMI, RMIConstants.PORT );
            /* the Searchable over the slave's half of the index */
            slaveRemoteSearchable = slaveRMI.getSlaveRemoteSearchable( theClass );
            registry = LocateRegistry.getRegistry();
            registry.rebind( RMIConstants.ISLAVE_RMI, iSlaveRMI );
            registry.rebind( RMIConstants.SLAVE_REMOTE_SEARCHABLE, slaveRemoteSearchable );
            System.out.println("SlaveRMI ready...");
        } catch ( Exception e ) {
            e.printStackTrace();
        }
    }
}
MasterThread:
Code:
public class MasterThread extends Thread {

    private MasterTask master;
    private String threadName;
    private long sleepTime;
    private int runCount;
    /* volatile so a stop request from another thread is seen by run() */
    private volatile boolean stop;

    public MasterThread(long sleepTime, String threadName) {
        this.sleepTime = sleepTime;
        this.threadName = threadName;
        master = new MasterTask();
    }

    public boolean isStop() {
        return stop;
    }

    public void setStop(boolean stop) {
        this.stop = stop;
    }

    public void run() {
        while ( !stop ) {
            if ( runCount == 0 ) {
                System.out.println( threadName + ".insertLocal...");
                master.insertLocal();
            }
            System.out.println( threadName + ".checkInsertQueue...");
            master.checkInsertQueue();
            // System.out.println( threadName + ".searchLocal...");
            // master.searchLocal();
            System.out.println( threadName + ".searchLocalAndRemote...");
            master.searchLocalAndRemote();
            try {
                System.out.println( threadName + ".sleep..." + sleepTime + " ms" );
                sleep( sleepTime );
            } catch ( Exception e ) {
                e.printStackTrace();
            }
            ++runCount;
        }
    }
}
SlaveThread:
Code:
public class SlaveThread extends Thread {

    private SlaveTask slave;
    private String threadName;
    private long sleepTime;
    private int runCount;
    /* volatile so a stop request from another thread is seen by run() */
    private volatile boolean stop;

    public SlaveThread(long sleepTime, String threadName) {
        this.sleepTime = sleepTime;
        this.threadName = threadName;
        slave = new SlaveTask();
    }

    public boolean isStop() {
        return stop;
    }

    public void setStop(boolean stop) {
        this.stop = stop;
    }

    public void run() {
        SlaveRMI slaveRMI = null;
        try {
            slaveRMI = new SlaveRMI();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        while ( !stop ) {
            /*
                slave - index modification through JMS
            */
            if ( runCount > 0 && runCount % 10 == 0 ) {
                System.out.println( threadName + ".insert...");
                slave.insertThroughJMS();
            }
            System.out.println( threadName + ".searchLocal...");
            // slave.searchLocal();
            slave.searchLocalHalf();
            // /*
            //     slave - offer slave's IndexSearcher
            // */
            // slave.produceSlaveIndexSearcher();
            /*
                expose slave node's IndexSearcher
            */
            // slaveRMI.exposeSlaveRemoteSearchables();
            // if ( runCount == 0 ) {
            System.out.println( threadName + ".exposeSlaveRemoteSearchables...");
            slave.exposeSlaveRemoteSearchables();
            // }
            try {
                System.out.println( threadName + ".sleep..." + sleepTime + " ms" );
                sleep( sleepTime );
            } catch ( Exception e ) {
                e.printStackTrace();
            }
            ++runCount;
        }
    }
}
Then all you need to do is start MasterThread and SlaveThread on their respective machines; on the slave side you have to start the RMI registry before you run the thread (see the launcher sketch below). The trick to building the ParallelMultiSearcher is to let each IndexSearcher search a different part of the Lucene index; the ParallelMultiSearcher itself handles merging the results. So Hibernate Search can do this job without using Solr.
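For completeness, the launchers are nothing special; something like this works (the sleep interval is arbitrary), as long as the slave creates or starts its RMI registry before the thread runs:
Code:
// MasterMain.java - run on the master machine
public class MasterMain {
    public static void main(String[] args) {
        new MasterThread( 10000, "master" ).start();
    }
}

// SlaveMain.java - run on the slave machine
public class SlaveMain {
    public static void main(String[] args) throws Exception {
        /* create the registry in-process before anything tries to bind to it;
           alternatively, start the standalone rmiregistry tool first */
        java.rmi.registry.LocateRegistry.createRegistry(
                java.rmi.registry.Registry.REGISTRY_PORT );
        new SlaveThread( 10000, "slave" ).start();
    }
}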
Here's one more thing I don't really understand: what are
AbstractJMSHibernateSearchController
BackendQueueProcessorFactory
JMSBackendQueueProcessor
JMSBackendQueueProcessorFactory
LuceneBackendQueueProcessor
LuceneBackendQueueProcessorFactory
?? When will I need them, and what are their responsibilities? Can anyone point me in the right direction or to some sample code?