I have used embedded EJB to write an importer that reads account records from a CSV file and then writes them to a MySQL database. I have successfully run the program importing more than 500,000 account records without any problems; because that was the first run of the importer, entityManager.persist() will have been called for each record.
I have now tried running it against a new file containing ~4000 records. There are 18 new account records in there, but the rest are updates to accounts that already exist in my database, so I will be calling entityManager.merge() the majority of the time. After processing only ~300 records I got an out-of-memory error. I have tried increasing the maximum heap size to 512M, but this just delays the problem.
Early on in my development I had problems with out-of-memory errors when processing large numbers of records. To get around this I split the files into smaller chunks and then process each chunk within a new transaction.
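Roughly, the chunking works like this (a simplified, self-contained sketch; in the real importer each chunk is written to its own file and handed to a method like doImport() below, which runs in its own REQUIRES_NEW transaction):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkSplitter {

    /**
     * Splits records into fixed-size chunks. Each chunk is then processed
     * in its own transaction, so a failure (or the memory used while
     * processing) is limited to one chunk at a time.
     */
    public static <T> List<List<T>> split(final List<T> records, final int chunkSize) {
        final List<List<T>> chunks = new ArrayList<List<T>>();
        for (int i = 0; i < records.size(); i += chunkSize) {
            chunks.add(records.subList(i, Math.min(i + chunkSize, records.size())));
        }
        return chunks;
    }
}
```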
The code that writes away the account records is as follows:
Code:
/**
 * Imports the accounts data file.
 *
 * @param file the accounts file.
 *
 * @return the number of records imported.
 *
 * @throws OCSSImporterException
 */
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public int doImport(final File file)
    throws OCSSImporterException
{
    // This does entityManager.setFlushMode(FlushModeType.COMMIT);
    getFlushDao().setFlushModeCommit();
    try {
        final Parser parser = new Parser(file);
        int cnt = 0;
        String [] line;
        while ((line = parser.nextLine()) != null) {
            final Account account;
            final AccountStatus accStatus;
            final AccountStatusSpark accStatusSpark;
            final String accCde = removeQuotes(line[ACCOUNT_SALENO]);
            final List<Account> accounts =
                getAccountDao().findByHouseAndOrigRef(FULFILLMENT_HOUSE_ID, accCde);
            switch (accounts.size()) {
            case 0:
                // Create a new one
                account = new Account();
                accStatus = new AccountStatus();
                accStatusSpark = new AccountStatusSpark();
                break;
            case 1:
                // Update the existing one
                account = accounts.get(0);
                accStatus = getAccountStatusDao().findById(account.getId());
                accStatusSpark = getAccountStatusSparkDao().findById(account.getId());
                if (accStatus == null) {
                    throw new OCSSImporterException("No account status record for account " + account.getId());
                }
                break;
            default:
                throw new OCSSImporterException("Duplicate account [" + accCde + "] " +
                                                "found for fulfillment house " + FULFILLMENT_HOUSE_ID);
            }
            updateAccount(account, line);
            updateAccStatusSpark(account, accStatusSpark, line);
            updateAccStatus(account, accStatusSpark, accStatus, line);
            cnt++;
        }
        parser.close();
        return cnt;
    }
    catch (IOException cause) {
        throw new OCSSImporterException("Failed to read account data file [" + file.getAbsolutePath() + "] - " +
                                        cause.getMessage(), cause);
    }
}
/**
 * Updates an account bean and uses it to create a new or update
 * an existing entry in the database.
 *
 * @param account the bean to update.
 * @param line a CSV parsed line used to populate the account bean.
 *
 * @throws OCSSImporterException
 */
private void updateAccount(final Account account, final String [] line)
    throws OCSSImporterException
{
    Address delAddr = account.getDeliveryAddress();
    Address invAddr = account.getInvoiceAddress();
    if (delAddr == null) {
        // New account so create a new address entry and set the invoice address and
        // delivery address to be the same for now.
        invAddr = delAddr = new Address();
    }
    updateDelAddr(delAddr, line);
    account.setOriginalRef(removeQuotes(line[ACCOUNT_SALENO]));
    account.setInvoiceAddress(invAddr);
    account.setDeliveryAddress(delAddr);
    // The title codes in the account file have an extra digit on the end
    // therefore the use of substring is required.
    String title = removeQuotes(line[ACCOUNT_TITLE]);
    title = title.substring(0, title.length() - 1);
    if (account.getTitle() == null || !account.getTitle().getTitleCode().equals(title)) {
        final List<Title> titles =
            getTitleDao().findByHouseAndCode(FULFILLMENT_HOUSE_ID, title);
        switch (titles.size()) {
        case 0:
            throw new OCSSImporterException("Failed to import account [" + account.getOriginalRef() + "]. " +
                                            "Cannot find title for code : " + title);
        case 1:
            account.setTitle(titles.get(0));
            break;
        default:
            throw new OCSSImporterException("Failed to import account. Duplicate title [" + title + "] found " +
                                            "for fulfillment house " + FULFILLMENT_HOUSE_ID);
        }
    }
    account.setCurrencySymbol(convertCurrToSymbol(removeQuotes(line[ACCOUNT_CURR])));
    try {
        account.setStartDate(df.parse(line[ACCOUNT_START]));
    }
    catch (ParseException cause) {
        account.setStartDate(null);
    }
    try {
        account.setCancelDate(df.parse(line[ACCOUNT_CANDATE]));
    }
    catch (ParseException cause) {
        account.setCancelDate(null);
    }
    account.setEmail(removeQuotes(line[ACCOUNT_EMAIL]));
    getAccountDao().saveOrUpdate(account);
}
/**
 * Updates an account status bean and uses it to update an existing or
 * create a new entry in the database.
 *
 * @param account the account the status is for.
 * @param accStatusSpark the account status spark bean.
 * @param accStatus the bean to update.
 * @param line a CSV parsed line used to populate the account bean.
 *
 * @throws OCSSImporterException
 */
private void updateAccStatus(final Account account,
                             final AccountStatusSpark accStatusSpark,
                             final AccountStatus accStatus,
                             final String [] line)
    throws OCSSImporterException
{
    final boolean newAcc = accStatus.getAccountId() == 0;
    accStatus.setAccountId(account.getId());
    accStatus.setBalance(calcAccBalance(accStatusSpark));
    accStatus.setPaymentType(conv.getPaymentType(removeQuotes(line[ACCOUNT_PAYMETH])));
    accStatus.setCardType(conv.getCardType(removeQuotes(line[ACCOUNT_PAYMETH])));
    if (newAcc) {
        getAccountStatusDao().persist(accStatus);
    }
    else {
        getAccountStatusDao().merge(accStatus);
    }
}
/**
 * Updates an account status spark bean and uses it to update an existing or
 * create a new entry in the database.
 *
 * @param account the account the status is for.
 * @param accStatusSpark the bean to update.
 * @param line a CSV parsed line used to populate the bean.
 *
 * @throws OCSSImporterException
 */
private void updateAccStatusSpark(final Account account,
                                  final AccountStatusSpark accStatusSpark,
                                  final String [] line)
    throws OCSSImporterException
{
    final boolean newAcc = accStatusSpark.getAccountId() == 0;
    final BigDecimal newDespVal = convertValue(line[ACCOUNT_DESPVAL]);
    final BigDecimal newPayVal = convertValue(line[ACCOUNT_PAYVAL]);
    final BigDecimal newCredVal = convertValue(line[ACCOUNT_CREDVAL]);
    final BigDecimal newAdminVal = convertValue(line[ACCOUNT_ADMINVAL]);
    BigDecimal oldPayVal = accStatusSpark.getPaidValue();
    BigDecimal oldCredVal = accStatusSpark.getCreditValue();
    BigDecimal oldAdminVal = accStatusSpark.getAdminChgValue();
    accStatusSpark.setAccountId(account.getId());
    accStatusSpark.setDespatchedValue(newDespVal);
    accStatusSpark.setPaidValue(newPayVal);
    accStatusSpark.setCreditValue(newCredVal);
    accStatusSpark.setAdminChgValue(newAdminVal);
    if (newAcc) {
        getAccountStatusSparkDao().persist(accStatusSpark);
    }
    else {
        getAccountStatusSparkDao().merge(accStatusSpark);
    }
    // Create any transactions required
    if (oldCredVal == null) {
        oldCredVal = ZERO;
    }
    if (oldPayVal == null) {
        oldPayVal = ZERO;
    }
    if (oldAdminVal == null) {
        oldAdminVal = ZERO;
    }
    final BigDecimal credit = newCredVal.subtract(oldCredVal);
    if (oldCredVal.compareTo(newCredVal) < 0) {
        // The credit value has increased so add a credit transaction
        tranImporter.importCredit(account, line, credit);
    }
    else if (oldCredVal.compareTo(newCredVal) > 0) {
        // The credit value has decreased; if the pay value hasn't increased then that
        // payment was rejected.
        if (oldPayVal.compareTo(newPayVal) >= 0) {
            tranImporter.importCreditReject(account, line, credit);
        }
    }
    final BigDecimal admin = oldAdminVal.subtract(newAdminVal);
    if (oldAdminVal.compareTo(newAdminVal) < 0) {
        // The admin value has increased so add an admin charge transaction
        tranImporter.importAdminCharge(account, line, admin);
    }
    else if (oldAdminVal.compareTo(newAdminVal) > 0) {
        // The admin value has decreased so add an admin charge paid transaction
        tranImporter.importAdminChargePayment(account, line, admin);
    }
}
/**
 * Updates an address bean for the delivery address and uses it
 * to create a new or update an existing entry in the database.
 *
 * @param address the address bean to update.
 * @param line a CSV parsed line used to populate the address bean.
 */
private void updateDelAddr(final Address address, final String [] line)
    throws OCSSImporterException
{
    address.setTitle(removeQuotes(line[ACCOUNT_DEL_TITLE]));
    address.setInitial(removeQuotes(line[ACCOUNT_INIT]));
    address.setSurname(removeQuotes(line[ACCOUNT_SNAME]));
    address.setAddressLine1(removeQuotes(line[ACCOUNT_ADDR1]));
    address.setAddressLine2(removeQuotes(line[ACCOUNT_ADDR2]));
    address.setAddressLine3(removeQuotes(line[ACCOUNT_ADDR3]));
    address.setAddressLine4(removeQuotes(line[ACCOUNT_ADDR4]));
    // Post code and telephone number are used for manually adding accounts
    // to "My Subscriptions" so they need to be validated
    final String postCode = removeQuotes(line[ACCOUNT_PCODE]);
    final String convPostCode = PostcodeUtils.convert(postCode);
    if (PostcodeUtils.validate(convPostCode)) {
        address.setPostcode(convPostCode);
    }
    else {
        address.setPostcode(postCode);
    }
    final String homeTel = removeQuotes(line[ACCOUNT_TEL]);
    final String convHomeTel = PhoneNumberUtils.convert(homeTel);
    if (PhoneNumberUtils.validate(convHomeTel)) {
        address.setHomeTel(convHomeTel);
    }
    else {
        address.setHomeTel(homeTel);
    }
    getAddrDao().saveOrUpdate(address);
}
If the bean code is likely to help I can post that as well, but I thought I had bombarded the reader with enough code as it is! :o)
I am running this on a Fedora Core 4 box, using various versions of Java (1.5.0_09, 1.5.0_10, 1.6.0), with Hibernate version 3.2.1.
I have also tried running the importer with the same set-up on Windows, profiling it with the NetBeans profiler. On Windows it seemed to go through fine, i.e. without the memory error. What I think happened is that the first time I ran the 4000-record file, none of the accounts existed in the database I was using, so the importer just created new records with entityManager.persist(). Then on subsequent runs with the same data, although entityManager.merge() was being called, the data was in effect not changing, so it wasn't doing anything; hence the problem did not manifest itself on Windows. Does my logic seem sensible? I'm going to create a copy of the Linux database in my Windows environment over the weekend and confirm this anyway.
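If that logic is right, I assume the fix would be to flush and clear the persistence context every N records inside the loop, so it can never hold more than N managed entities at once. Here is a self-contained sketch of the idea; FakeContext is just a stand-in I made up for the EntityManager so the example runs on its own:

```java
import java.util.ArrayList;
import java.util.List;

public class PeriodicClearSketch {

    /** Stand-in for the EntityManager: each merge() attaches a managed copy. */
    public static class FakeContext {
        private final List<Object> managed = new ArrayList<Object>();
        public void merge(final Object entity) { managed.add(entity); }
        public void flush() { /* real code: push pending SQL to the database */ }
        public void clear() { managed.clear(); } // detach everything
        public int size() { return managed.size(); }
    }

    /**
     * Processes records, flushing and clearing after every batchSize merges
     * so the context never holds more than batchSize managed entities.
     *
     * @return the largest context size observed while processing.
     */
    public static int process(final List<Object> records, final FakeContext em, final int batchSize) {
        int maxSeen = 0;
        int cnt = 0;
        for (final Object rec : records) {
            em.merge(rec);
            maxSeen = Math.max(maxSeen, em.size());
            if (++cnt % batchSize == 0) {
                em.flush(); // write out the pending changes
                em.clear(); // then drop the managed copies to free memory
            }
        }
        return maxSeen;
    }
}
```

Obviously the fake context doesn't do anything real; the point is just that clearing after each batch keeps the number of attached entities bounded, which is what I suspect my loop is failing to do.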
TIA
Simon