While setting up master-slave indexing over JMS/ActiveMQ, I am running into a strange problem on the master side when it receives the JMS messages. The slave posts a delete-lucene-work and an add-lucene-work message to the queue. The master then appears to go into an infinite loop, logging the following over and over:
Code:
received message
WARN [org.springframework.jms.listener.DefaultMessageListenerContainer#0-7] (BrokerRegistry.java:52) lookup - Broker localhost not started so using jms-broker instead
INFO [org.springframework.jms.listener.DefaultMessageListenerContainer#0-7] (TransportConnector.java:250) start - Connector vm://localhost Started
INFO [org.springframework.jms.listener.DefaultMessageListenerContainer#0-7] (TransportConnector.java:285) stop - Connector vm://localhost Stopped
received message
WARN [org.springframework.jms.listener.DefaultMessageListenerContainer#0-7] (BrokerRegistry.java:52) lookup - Broker localhost not started so using jms-broker instead
INFO [org.springframework.jms.listener.DefaultMessageListenerContainer#0-7] (TransportConnector.java:250) start - Connector vm://localhost Started
INFO [org.springframework.jms.listener.DefaultMessageListenerContainer#0-7] (TransportConnector.java:285) stop - Connector vm://localhost Stopped
received message
etc...
I'm instantiating ActiveMQ as an embedded broker in the master, with two transports: vm and tcp.
My configuration:
Code:
<!-- ActiveMQ Configuration -->
<amq:broker
    id="jmsBroker"
    brokerName="jms-broker"
    persistent="false"
    start="true"
>
  <amq:destinations>
    <amq:queue id="hibernateSearchQueue" physicalName="queue/HibernateSearch" />
  </amq:destinations>
  <!--
  <amq:persistenceAdapter>
    <amq:amqPersistenceAdapter directory="${jms.provider.workDir}/data" maxFileLength="32mb"/>
  </amq:persistenceAdapter>
  -->
  <amq:transportConnectors>
    <amq:transportConnector uri="${jms.provider.url}"/>
    <amq:transportConnector uri="vm://jms-broker"/>
  </amq:transportConnectors>
</amq:broker>
<amq:connectionFactory id="jmsConnectionFactory" brokerURL="vm://jms-broker"/>

<!-- Hibernate Configuration -->
<bean id="sessionFactory" class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean" depends-on="jmsBroker" lazy-init="true">
  <property name="packagesToScan" value="${service.model.packages}"/>
  <property name="hibernateProperties">
    <props>
      <prop key="hibernate.dialect">${hibernate.dialect}</prop>
      <prop key="hibernate.show_sql">${hibernate.show_sql}</prop>
      <prop key="hibernate.format_sql">${hibernate.format_sql}</prop>
      <prop key="hibernate.use_outer_join">${hibernate.use_outer_join}</prop>
      <prop key="hibernate.byte_code.use_reflection_optimizer">${hibernate.byte_code.use_reflection_optimizer}</prop>
      <!-- Search Configuration -->
      <prop key="hibernate.search.default.optimizer.operation_limit.max">${hibernate.search.default.optimizer.operation_limit.max}</prop>
      <prop key="hibernate.search.default.optimizer.transaction_limit.max">${hibernate.search.default.optimizer.transaction_limit.max}</prop>
      <prop key="hibernate.search.default.indexBase">${hibernate.search.default.indexBase}</prop>
      <prop key="hibernate.search.default.refresh">${hibernate.search.default.refresh}</prop>
      <prop key="hibernate.search.default.sourceBase">${hibernate.search.default.sourceBase}</prop>
      <prop key="hibernate.search.worker.backend">jms</prop>
      <prop key="hibernate.search.worker.jms.connection_factory">ConnectionFactory</prop>
      <prop key="hibernate.search.worker.jms.queue">HibernateSearchQueue</prop>
      <!-- Search Master Configuration -->
      <prop key="hibernate.search.default.directory_provider">org.hibernate.search.store.FSMasterDirectoryProvider</prop>
      <prop key="hibernate.search.default.exclusive_index_use">true</prop>
    </props>
  </property>
  <property name="dataSource">
    <ref bean="dataSource" />
  </property>
  <property name="lobHandler">
    <bean class="org.springframework.jdbc.support.lob.DefaultLobHandler" />
  </property>
</bean>

<!-- JMS Index-Update Messages -->
<bean id="searchQueueListener" class="org.csmc.indexer.persistence.impl.hibernate.SearchQueueListener">
  <constructor-arg><ref bean="sessionFactory" /></constructor-arg>
</bean>
<jms:listener-container connection-factory="jmsConnectionFactory" transaction-manager="transactionManager">
  <jms:listener destination="queue/HibernateSearch" ref="searchQueueListener" />
</jms:listener-container>

<!-- Transactions -->
<bean id="transactionManager" class="org.springframework.orm.hibernate3.HibernateTransactionManager">
  <property name="sessionFactory" ref="sessionFactory"/>
</bean>
My search listener class:
Code:
import javax.jms.Message;

import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.search.backend.impl.jms.AbstractJMSHibernateSearchController;
import org.springframework.orm.hibernate3.SessionFactoryUtils;

public class SearchQueueListener extends AbstractJMSHibernateSearchController
{
  //~ Members ----------------------------------------------------------------------------------------------------------
  private SessionFactory sessionFactory;

  //~ Constructors -----------------------------------------------------------------------------------------------------
  public SearchQueueListener(SessionFactory sessionFactory)
  {
    this.sessionFactory = sessionFactory;
  }

  //~ Methods ----------------------------------------------------------------------------------------------------------
  @Override
  public void onMessage(Message message)
  {
    // Debug trace; this is the "received message" line that repeats in the log above.
    System.out.println("received message");
    super.onMessage(message);
  }

  @Override
  protected void cleanSessionIfNeeded(Session session)
  {
    SessionFactoryUtils.releaseSession(session, sessionFactory);
  }

  @Override
  protected Session getSession()
  {
    // allowCreate = true: open a new session if none is bound to the current thread.
    return SessionFactoryUtils.getSession(sessionFactory, true);
  }
}
My /jndi.properties:
Code:
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = vm://localhost
connectionFactoryNames = ConnectionFactory
queue.HibernateSearchQueue = queue/HibernateSearch
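One mismatch I can see myself: the WARN line complains that broker "localhost" is not started and falls back to jms-broker, and the provider URL above is vm://localhost while the embedded broker is named jms-broker. As an untested sketch, assuming the name mismatch matters at all, the same file pointed at the broker by its configured name would look like this (only the provider URL differs):
Code:
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
# Assumption: use the brokerName from the Spring config instead of "localhost"
java.naming.provider.url = vm://jms-broker
connectionFactoryNames = ConnectionFactory
queue.HibernateSearchQueue = queue/HibernateSearch
That should at least address the BrokerRegistry warning; whether it has anything to do with the repeated "received message" output is a separate question.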
What could be going on?
Michael