Quote:
being on the Hibernate Search team I'd love to know more about your Solr integration :-)
Sure. We're currently using Solr and its DataImportHandler to incrementally update our indexes. We recently integrated Apache Kafka into our infrastructure, so we are going for a more real-time indexing solution by firing off indexing events any time one of our indexable entities changes.
Quote:
A thread-scoped context would work fine, but it looks like unnecessary overhead since you're closing it for each operation. You could just as well open a new one.
Opening a Session is not very costly, it's like instantiating a couple of HashMap instances, but I guess you should try avoiding it for peak performance, as it generates objects which need garbage collection.
Still, you're generating many more objects already, so it might not win you anything.
Let me try and clarify something. We are spawning a small, controlled number of threads that corresponds to the number of partitions in Kafka. So if we have 6 Kafka partitions to read from, then we spawn only 6 threads; we aren't spawning a thread for each event. Here is an example of one of our threads. I've updated the DAO with your suggestions.
Code:
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class KafkaConsumerThread implements Runnable {

    private final KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            try {
                long id = pullIdFromMessage(iterator.next().message());
                Foo foo = fooDao.findById(id);
                fooIndexer.index(foo);
            } catch (Exception e) {
                logger.error("Run Exception", e);
            }
        }
    }
}
public class FooDao extends BaseDao<Foo> {

    @Override
    public Foo findById(long id) {
        Session session = getSession();
        Transaction transaction = session.beginTransaction();
        Foo foo = null;
        try {
            foo = (Foo) session.createCriteria(Foo.class)
                    .add(Restrictions.eq("id", id))
                    .setReadOnly(true)
                    .setMaxResults(1)
                    .uniqueResult();
            transaction.commit();
        } catch (Exception e) {
            transaction.rollback();
        } finally {
            session.clear();
        }
        return foo;
    }
}
Quote:
First problem is you're not committing the TX, you might be leaking them?
You could try and see if there are more elements in the queue before committing the current transaction (and closing the Session). In that case I would still clear the Session, but you can reuse the same one.
So I should always commit the transaction and then call session.clear()? Should I somehow keep a Session open for the lifetime of the thread?
Any more suggestions?
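To make sure I understand the suggested pattern, here is a sketch of what I think you mean: one Session for the lifetime of the consumer thread, a transaction committed and the Session cleared per message, and close() only when the stream is exhausted. The FakeSession/FakeTransaction/Stats types below are just self-contained stand-ins for the Hibernate Session and Transaction APIs so the loop can run on its own; the fooDao/fooIndexer work would go where the comment is.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Counters so we can observe what the loop does to the Session.
class Stats {
    int opened, commits, clears, closes;
}

// Stand-in for org.hibernate.Transaction.
class FakeTransaction {
    private final Stats stats;
    FakeTransaction(Stats stats) { this.stats = stats; }
    void commit() { stats.commits++; }
    void rollback() { }
}

// Stand-in for org.hibernate.Session.
class FakeSession {
    private final Stats stats;
    FakeSession(Stats stats) { this.stats = stats; stats.opened++; }
    FakeTransaction beginTransaction() { return new FakeTransaction(stats); }
    void clear() { stats.clears++; }
    void close() { stats.closes++; }
}

public class SessionPerThreadSketch {

    // One Session for the whole thread: begin/commit a transaction per
    // message, clear() to evict cached entities between messages, and
    // close() only once the message stream is exhausted.
    static void consume(Iterator<Long> messageIds, FakeSession session) {
        try {
            while (messageIds.hasNext()) {
                FakeTransaction tx = session.beginTransaction();
                try {
                    long id = messageIds.next();
                    // load the entity by id and index it here
                    // (fooDao.findById(id) / fooIndexer.index(foo) in the real code)
                    tx.commit();
                } catch (Exception e) {
                    tx.rollback();
                } finally {
                    session.clear(); // drop the first-level cache, keep the Session
                }
            }
        } finally {
            session.close(); // thread is done; release the Session now
        }
    }

    public static void main(String[] args) {
        Stats stats = new Stats();
        List<Long> ids = Arrays.asList(1L, 2L, 3L);
        consume(ids.iterator(), new FakeSession(stats));
        // one Session opened and closed, one commit and clear per message
        System.out.println(stats.opened + " " + stats.commits + " "
                + stats.clears + " " + stats.closes);
    }
}
```

Is this roughly what you had in mind, or should the commit also be batched across several messages?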