Hey Sanne,
Thanks for getting back to me on that.
I implemented my own class extending DirectoryBasedIndexManager. Depending on whether the trigger to update the index comes from 'within', i.e. from a change in the database, or from the central JMS queue, the node either sends the JMS update message or just updates the local index, respectively.
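For context, that routing decision can be modeled as a small standalone sketch; `TriggerSource`, `Action`, and `route` are illustrative names of mine, not actual Hibernate Search API:

```java
// Simplified model of the routing logic in the custom index manager:
// changes originating locally are broadcast via JMS, changes arriving
// from the central queue are only applied to the local index.
public class IndexUpdateRouter {

    public enum TriggerSource { LOCAL_DATABASE, JMS_QUEUE }

    public enum Action { SEND_JMS_UPDATE, APPLY_LOCAL_ONLY }

    public static Action route(TriggerSource source) {
        // A locally triggered change must be propagated to the other
        // subscribers; a change received over JMS must not be re-sent,
        // or the nodes would bounce messages back and forth forever.
        return source == TriggerSource.LOCAL_DATABASE
                ? Action.SEND_JMS_UPDATE
                : Action.APPLY_LOCAL_ONLY;
    }

    public static void main(String[] args) {
        System.out.println(route(TriggerSource.LOCAL_DATABASE));
        System.out.println(route(TriggerSource.JMS_QUEUE));
    }
}
```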
I also implemented a class extending LuceneBackendQueueProcessor, in which I send JMS messages to the central node, which then distributes them to all subscribers. The message is sent from within the applyWork method:
Code:
@Override
public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
    if (workList == null) {
        throw new IllegalArgumentException("workList should not be null");
    }
    if (workList.isEmpty()) {
        return;
    }
    logger.info("Calling applyWork: " + workList.size() + " | "
            + workList.get(0).getId().toString() + " | "
            + workList.get(0).getEntityClass().getCanonicalName());
    // Dirty fix: detect the second invocation (made from the
    // before-transaction-completion synchronization) via the stack
    // trace and skip it, so the work is only applied once.
    StackTraceElement[] stackElements = Thread.currentThread().getStackTrace();
    for (StackTraceElement ste : stackElements) {
        if (ste.getMethodName().contains("doBeforeTransactionCompletion")
                && ste.getClassName().contains("DelegateToSynchronizationOnBeforeTx")) {
            logger.info("skipping");
            return;
        }
    }
    // Apply the work to the local index first...
    this.applyWorkLucene(workList, monitor);
    // ...then propagate everything except optimization work via JMS.
    final List<LuceneWork> runnableWorkList = workList;
    Runnable operation = new Runnable() {
        @Override
        public void run() {
            List<LuceneWork> filteredQueue = new ArrayList<LuceneWork>(runnableWorkList);
            for (LuceneWork work : runnableWorkList) {
                if (work instanceof OptimizeLuceneWork) {
                    // we don't want optimization to be propagated
                    filteredQueue.remove(work);
                }
            }
            if (filteredQueue.isEmpty()) {
                return;
            }
            LuceneWorkSerializer serializer = indexManager.getSerializer();
            byte[] data = serializer.toSerializedModel(filteredQueue);
            try {
                logger.info("Trying to update index <" + indexName
                        + "> via JMS call from <" + rtcNode + ">");
                sendMessage(data);
                logger.info("Call made");
            } catch (Exception e) {
                e.printStackTrace();
                logger.severe("unableToSendJMSWork for index " + indexName);
            }
        }
    };
    operation.run();
}
At the beginning of the method you can see the dirty fix, which forces the method to do its job only once by checking the thread's stack trace.
The incoming JMS messages are handled by a simple MDB:
Code:
@Override
public void onMessage(Message message) {
    logger.info("Got JMS message");
    if (!(message instanceof ObjectMessage)) {
        logger.severe("incorrectMessageType " + message.getClass());
        return;
    }
    final ObjectMessage objectMessage = (ObjectMessage) message;
    final String indexName;
    final String rtcNode;
    final List<LuceneWork> queue;
    final CustomLuceneIndexManager indexManager;
    SearchIntegrator searchIntegrator = getSearchIntegrator();
    try {
        indexName = objectMessage.getStringProperty("indexName");
        rtcNode = objectMessage.getStringProperty("node");
        // Ignore messages that originated on this node; the work was
        // already applied locally before the message was sent.
        if (rtcNode.equalsIgnoreCase(System.getProperty("jboss.node.name"))) {
            logger.info(" ignoring message from current node <" + rtcNode + ">");
            return;
        }
        logger.info(" for index " + indexName);
        indexManager = (CustomLuceneIndexManager) searchIntegrator
                .getIndexManager(indexName);
        if (indexManager == null) {
            logger.severe("messageReceivedForUndefinedIndex " + indexName);
            return;
        }
        // Deserialize the LuceneWork list and apply it to the local index
        // only, without propagating it any further.
        queue = indexManager.getSerializer().toLuceneWorks(
                (byte[]) objectMessage.getObject());
        indexManager.performOperationsLucene(queue, null);
        logger.info(" FINISHED");
    } catch (JMSException e) {
        e.printStackTrace();
        logger.severe("unableToRetrieveObjectFromMessage " + message.getClass());
    } catch (ClassCastException e) {
        e.printStackTrace();
        logger.severe("illegalObjectRetrievedFromMessage " + e.getMessage());
    }
}
The reason I did all that instead of using the out-of-the-box JMS solution is that the central node is just a queue server and doesn't maintain an index itself.
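Since every node has to receive each update, the MDB is subscribed to a topic rather than a queue. The class-level annotation looks roughly like this; the destination name and class name are illustrative, not taken from my actual deployment:

```java
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.MessageListener;

// Hypothetical activation config: each node subscribes to a shared topic,
// so the central broker fans every update message out to all subscribers.
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationType",
                              propertyValue = "javax.jms.Topic"),
    @ActivationConfigProperty(propertyName = "destination",
                              propertyValue = "topic/hsearchUpdates") // illustrative name
})
public class IndexUpdateListener implements MessageListener {
    // onMessage(...) as shown above
}
```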
Is the double call a surprise to you? I thought that maybe an exception was thrown somewhere deep in the trace and the transaction was somehow restarted, but that doesn't seem to be the case. I looked into the HS source code and the double call is there explicitly.