Nov 22, 2011

ActiveMQ - multiple kahaDB instances (mKahaDB) helping reduce journal disk usage

The default store implementation in ActiveMQ, KahaDB, uses a journal and an index. The journal uses a sequence of append-only files to store messages, acknowledgements and broker events. The index holds references to messages on a per-destination basis. Essentially, the index holds the runtime state of the broker, mostly in memory, whereas the journal maintains the persistent store of raw data and events. It is the journal of record in a sense.
Periodically, unreferenced journal files are removed through a garbage collection process, so disk usage is kept in check.
In the main, this scheme works well. However, when multiple destinations on a broker are used in very different ways, it can lead to excessive disk usage by the journal. What follows is some detail on a solution to that problem.

Mixed destination usage; frequent fast tasks vs infrequent slow tasks
Imagine a toy maker's order processing. There are two types of orders, custom and standard. A custom order takes a few days to fulfill; a standard order takes a matter of hours. You can easily imagine two order queues, standard and custom. Now imagine that we only process custom orders once a month but process standard orders all the time. So we expect a large backlog of custom orders that is slowly consumed at the start of each month and a steady load on the standard order queue.

What the broker sees
From a broker perspective, in the single shared journal, there will be a batch of journal files filled with 'custom order' messages. Subsequent journal files will hold mostly 'standard order' messages and acknowledgements, with the odd acknowledgement for a 'custom order' message. This sporadic distribution of 'custom order' acknowledgements across the journal files is problematic: even when such a journal file no longer contains any unacked 'standard order' messages, it must still be retained.

Some background on the need to retain journal files
Journal data files are append-only. Both messages and acknowledgements are appended; nothing is deleted from a data file. Journal data files that are unreferenced are periodically removed (or archived). The idea is that the index (JMS destination state) can be recreated in full from the journal at any point in time. Any message without a corresponding acknowledgement is deemed valid.
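
As a minimal sketch of how this clean-up can be tuned, assuming the kahaDB attributes cleanupInterval, archiveDataLogs and directoryArchive are available in your broker version (the interval value and the archive path are illustrative only):

<persistenceAdapter>
    <!-- check for unreferenced journal files every 30 seconds (value in milliseconds) -->
    <!-- and move them to an archive directory instead of deleting them -->
    <kahaDB directory="${activemq.base}/data/kahadb"
            cleanupInterval="30000"
            archiveDataLogs="true"
            directoryArchive="${activemq.base}/data/kahadb-archive" />
</persistenceAdapter>

Neither setting changes which files count as referenced; they only control how often unreferenced files are looked for and what happens to them.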


Referenced journal files

In the simplest case, a journal file is 'referenced' if it contains messages that have not been acknowledged by a consumer. The more subtle case reflects the persistence of acknowledgements (acks). A journal file is 'referenced' if it contains acks for messages in any 'referenced' journal file. This means that we cannot garbage collect a journal file that just contains acks until we can garbage collect all of the journal files that contain the corresponding messages. If we did, in the event of a failure that requires recovery of the index, we would miss some acks and replay messages as duplicates.

Problem
So back to the broker perspective of our toy maker's order processing. The first range of journal data files remains until the 'custom orders' queue is depleted. Custom order message acknowledgements get dotted across the journal files that result from the enqueue/dequeue of the 'standard orders' queue, and the end result is lots of referenced journal files and excessive disk usage.

Solution
Reducing the default journal file size can help in this case, but at the cost of more runtime file IO as messages are distributed across more files. In an ideal world, the 'custom order' queue could be partitioned into its own journal, where linear appends of messages and acks would result in a minimal set of journal files in use. Correspondingly, the 'standard order' queue, with its short-lived messages, could share a journal with the remaining destinations.
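
For reference, the journal file size is set with the kahaDB attribute journalMaxFileLength. A sketch of a reduced setting, where 16mb is an arbitrary illustrative value rather than a recommendation:

<persistenceAdapter>
    <!-- smaller data files: a stray ack pins less disk space, at the cost of more files and more file IO -->
    <kahaDB directory="${activemq.base}/data/kahadb" journalMaxFileLength="16mb" />
</persistenceAdapter>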

With the Multiple KahaDB persistence adapter, destination partitioning across journals is possible. It provides a neat solution to the scenario described above.
Replacing the default persistence adapter configuration:

<persistenceAdapter>
     <kahaDB directory="${activemq.base}/data/kahadb" />
</persistenceAdapter>

with:

<persistenceAdapter>
    <mKahaDB directory="${activemq.base}/data/kahadb">
      <filteredPersistenceAdapters>
       <filteredKahaDB queue="CustomOrders">
        <persistenceAdapter>
          <kahaDB />
        </persistenceAdapter>
       </filteredKahaDB>
       <filteredKahaDB>
        <persistenceAdapter>
          <kahaDB />
        </persistenceAdapter>
       </filteredKahaDB>
      </filteredPersistenceAdapters>
    </mKahaDB>
</persistenceAdapter>
  
The mKahaDB (m, short for multiple) adapter is a collection of filtered persistence adapters. The filtering reuses the destination policy matching feature to match destinations to persistence adapters. In the case of the above configuration, the 'custom orders' queue will use the first instance of kahaDB and all other destinations will map to the second instance. The second filter is empty, so the default 'match any' wildcard is in effect.
This configuration, splitting the destinations based on their usage pattern over time, allows the respective journal files to get reclaimed in a linear fashion as messages are consumed and processed, resulting in minimum disk usage.


Overhead

When transactions span persistence adapters, there is the additional overhead of a local two-phase commit to ensure both journals are atomically updated. Two-phase commit requires that the outcome is persisted, so there is an additional disk write required per transaction. This can be avoided by co-locating destinations that share transactions in a single kahaDB instance. When transactions access a single persistence adapter, or when there are no transactions, there is no additional overhead.
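
One way to co-locate such destinations is to give them a common name prefix and match them with a destination wildcard. The following is a sketch only: the 'Orders.' prefix is a hypothetical naming scheme, not something from the scenario above.

<persistenceAdapter>
    <mKahaDB directory="${activemq.base}/data/kahadb">
      <filteredPersistenceAdapters>
        <!-- hypothetical: every queue named Orders.something shares one journal, -->
        <!-- so a transaction that only touches these queues stays in one kahaDB instance -->
        <filteredKahaDB queue="Orders.>">
          <persistenceAdapter>
            <kahaDB />
          </persistenceAdapter>
        </filteredKahaDB>
        <!-- all other destinations -->
        <filteredKahaDB>
          <persistenceAdapter>
            <kahaDB />
          </persistenceAdapter>
        </filteredKahaDB>
      </filteredPersistenceAdapters>
    </mKahaDB>
</persistenceAdapter>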


Alternative Use Cases: Relaxed Durability Guarantee

Each nested kahaDB instance is fully configurable. One scenario where the use of different persistence adapters makes sense is where your durability guarantee is weaker for some destinations than for others. JMS requires that a write be on disk before a send reply is generated by the broker. To this end, a disk sync is issued by default after every journal write. This default behavior is configurable through the kahaDB attribute enableJournalDiskSyncs. If some destinations don't need this guarantee, they can be assigned to a kahaDB instance that has this option disabled and have their writes return faster, leaving it to the file system to complete the write. Here is an example configuration:


<persistenceAdapter>
    <mKahaDB directory="${activemq.base}/data/kahadb">
      <filteredPersistenceAdapters>
        <filteredKahaDB queue="ImportantStuff">
          <persistenceAdapter>
            <kahaDB />
          </persistenceAdapter>
        </filteredKahaDB>
        <filteredKahaDB queue="NotSoImportantStuff">
          <persistenceAdapter>
            <kahaDB enableJournalDiskSyncs="false"/>
          </persistenceAdapter>
        </filteredKahaDB>
      </filteredPersistenceAdapters>
    </mKahaDB>
</persistenceAdapter>


2 comments:

Pieter Callewaert said...

I am very interested in the point you are making with acks locking files.
I would like to test my theory with you. Let's say, for the sake of argument we have datafiles of 32mb and each message takes up 1mb.
So 32 messages fit in one datafile.

The active datafile is full, so it holds 32 messages, of which 31 are acknowledged. We are waiting for the 32nd message to be acknowledged in order for the gc to do its job.
All acks for the 31 other messages are (each of them) in a different data file. So 31 files contain an ack for the active file.

So that would mean that in my example 31 files are locked due to the acks + the active datafile = 32 datafiles.
At a rate of 32mb per file that would make a store of about 1024mb for only 1 unacked message.

More generally put ((numberOfMessagesPerFile - 1) + 1 activeFile ) * dataFileSize = storage size
Again in my example ((32 -1) +1 ) * 32 = 1024mb.

Does this make any sense? What is your take on this?

Gary Tully said...

@pieter, Yes, that makes sense.
It would be odd that each ack was in a separate data file, but possible :-) .
Reducing the data file size reduces the probability of such a distribution.