Hibernate Books

All times are UTC - 5 hours [ DST ]



Post new topic Reply to topic  [ 4 posts ] 
Author Message
 Post subject: Backend calls applyWork twice if enlist_in_transaction=true
PostPosted: Mon Dec 05, 2016 6:13 pm 
Beginner
Beginner

Joined: Mon Feb 16, 2015 6:41 am
Posts: 32
Location: Lodz, Poland
I am using a custom backend to send index updates over JMS to a central location which then propagates them to all other nodes. The central location itself does not run the application but is just a communication hub. That's why I was not able to use the default JMS backend.

In order to be able to send messages from my implementation of LuceneBackendQueueProcessor I had to enable the option:

Code:
    <property name="hibernate.search.worker.enlist_in_transaction" value="true" />


However, this causes the applyWork method to be called twice whenever something needs to be updated in the index. This, in turn, causes my index to become corrupt. If I disable this option, the updates work fine but I can no longer send JMS messages because the following code must be called within a transaction:

Code:
InitialContext ic = new InitialContext();
ConnectionFactory connectionFactory = (ConnectionFactory) ic.lookup(JMS_CONNECTION_FACTORY_JNDI);
context = connectionFactory.createContext();
ObjectMessage message = context.createObjectMessage();


Am I doing something wrong? How can I avoid this issue?


Last edited by pawel.predki on Mon Dec 05, 2016 8:22 pm, edited 1 time in total.

Top
 Profile  
 
 Post subject: Re: Custom JMS backend calls applyWork twice
PostPosted: Mon Dec 05, 2016 8:22 pm 
Beginner
Beginner

Joined: Mon Feb 16, 2015 6:41 am
Posts: 32
Location: Lodz, Poland
I looked at the stack trace of the two calls to
Code:
performWorks
and I see that they are indeed called from two different places:

Code:
at org.hibernate.search.backend.impl.EventSourceTransactionContext$DelegateToSynchronizationOnBeforeTx.doBeforeTransactionCompletion(EventSourceTransactionContext.java:169)

at org.hibernate.search.backend.impl.EventSourceTransactionContext$BeforeCommitSynchronizationDelegator.beforeCompletion(EventSourceTransactionContext.java:204)


Is that by design? Should this be done twice? In both cases the rest of the stack trace is the same and it involves

Code:
InTransactionWorkQueueSynchronization
BatchedQueueingProcessor
WorkQueuePerIndexSplitter


Top
 Profile  
 
 Post subject: Re: Backend calls applyWork twice if enlist_in_transaction=true
PostPosted: Thu Dec 08, 2016 8:28 am 
Hibernate Team
Hibernate Team

Joined: Fri Oct 05, 2007 4:47 pm
Posts: 2525
Location: Third rock from the Sun
Hi Pawel,
very interesting, thanks for the insight. I'm wondering if you're having more than one problem though, which makes this a bit confusing. Could we start by understanding how this relates to you getting the index corrupted? Which component is handling the index writes, I assume you had to customize that as well?

Thanks,
Sanne

_________________
Sanne
http://in.relation.to/


Top
 Profile  
 
 Post subject: Re: Backend calls applyWork twice if enlist_in_transaction=true
PostPosted: Sun Dec 11, 2016 10:23 am 
Beginner
Beginner

Joined: Mon Feb 16, 2015 6:41 am
Posts: 32
Location: Lodz, Poland
Hey Sanne,

Thanks for getting back to me on that.

I implemented my own class extending DirectoryBasedIndexManager. Depending on whether the trigger to update the index comes from 'whitin', i.e. a change in the database, or from the central JMS queue, I either have the node send the JMS update message or just update the local index, respectively.

I also implemented a class extending LuceneBackendQueueProcessor in which I send JMS messages to the central node, which distributes it to all subscribers. I send the message from within the applyWork method:

Code:
        @Override
   public void applyWork(List<LuceneWork> workList, IndexingMonitor monitor) {
      if (workList == null) {
         throw new IllegalArgumentException("workList should not be null");
      }
      logger.info("Calling applyWork: " + workList.size() + " | " + workList.get(0).getId().toString() + " | " + workList.get(0).getEntityClass().getCanonicalName());
      StackTraceElement[] stackElements = Thread.currentThread().getStackTrace();
      for (StackTraceElement ste : stackElements) {
         if (ste.getMethodName().contains("doBeforeTransactionCompletion") && ste.getClassName().contains("DelegateToSynchronizationOnBeforeTx")) {
            logger.info("skipping");
            return;
         }
      }
      this.applyWorkLucene(workList, monitor);
      final List<LuceneWork> runnableWorkList = workList;

      Runnable operation = new Runnable() {
         @Override
         public void run() {
            List<LuceneWork> filteredQueue = new ArrayList<LuceneWork>(
                  runnableWorkList);
            for (LuceneWork work : runnableWorkList) {
               if (work instanceof OptimizeLuceneWork) {
                  // we don't want optimization to be propagated
                  filteredQueue.remove(work);
               }
            }
            if (filteredQueue.size() == 0) {
               return;
            }
            LuceneWorkSerializer serializer = indexManager.getSerializer();
            byte[] data = serializer.toSerializedModel(filteredQueue);
            try {
               logger.info("Trying to update index <" + indexName
                     + "> via JMS call from <" + rtcNode + ">");

               sendMessage(data);

               logger.info("Call made");
            } catch (Exception e) {
               e.printStackTrace();
               logger.severe("unableToSendJMSWork for index " + indexName);
            }

         }
      };
      operation.run();


You can see at the beginning of the method that I added the dirty fix, which forces the method to only do its job once by checking the thread stacktrace.

The incoming JMS messages are handled by a simple MDB:

Code:
@Override
   public void onMessage(Message message) {
      logger.info("Got JMS message");
      if (!(message instanceof ObjectMessage)) {
         logger.severe("incorrectMessageType " + message.getClass());
         return;
      }
      final ObjectMessage objectMessage = (ObjectMessage) message;
      final String indexName;
      final String rtcNode;
      final List<LuceneWork> queue;
      final CustomLuceneIndexManager indexManager;
      SearchIntegrator searchIntegrator = getSearchIntegrator();

      try {
         indexName = objectMessage.getStringProperty("indexName");
         rtcNode = objectMessage.getStringProperty("node");
         if (rtcNode.equalsIgnoreCase(System.getProperty("jboss.node.name"))) {
            logger.info("   ignoring message from current node <" + rtcNode
                  + ">");
            return;
         }
         logger.info("   for index " + indexName);
         indexManager = (CustomLuceneIndexManager) searchIntegrator
               .getIndexManager(indexName);
         if (indexManager == null) {
            logger.severe("messageReceivedForUndefinedIndex " + indexName);
            return;
         }
         queue = indexManager.getSerializer().toLuceneWorks(
               (byte[]) objectMessage.getObject());
         indexManager.performOperationsLucene(queue, null);
         logger.info("   FINISHED");
      } catch (JMSException e) {
         e.printStackTrace();
         logger.severe("unableToRetrieveObjectFromMessage "
               + message.getClass());
         return;
      } catch (ClassCastException e) {
         e.printStackTrace();
         logger.severe("illegalObjectRetrievedFromMessage " + e.getMessage());
         return;
      }
   }


The reason why I did all that and not used the out-of-the-box JMS solution is that the central node is just a queue server and itself doesn't maintain an index.

Is the double call a surprise to you? I thought that maybe there's an exception thrown somewhere deep in the trace and the transaction is restarted somehow but this doesn't seem to be the case. I looked in the HS source code and the double call is there explicitly.


Top
 Profile  
 
Display posts from previous:  Sort by  
Post new topic Reply to topic  [ 4 posts ] 

All times are UTC - 5 hours [ DST ]


You cannot post new topics in this forum
You cannot reply to topics in this forum
You cannot edit your posts in this forum
You cannot delete your posts in this forum

Search for:
© Copyright 2014, Red Hat Inc. All rights reserved. JBoss and Hibernate are registered trademarks and servicemarks of Red Hat, Inc.