Jul 2, 2012

ActiveMQ Broker Networks - Think, Demand Forwarding Bridge


When we build a mental model of how something works, our initial images are always tainted by our experience of the words used to describe it. Sometimes we have to remold our existing perception of the words based on our new learning context. When the words have multiple meanings, we can often start from the wrong premise.

With this in mind, I want to share the meaning of the words I have used to describe ActiveMQ Broker Networks and provide some context to the words. The hope is that this will help build a model in your mind that is a valid reflection of their reality.


A broker network is two or more brokers that are connected via a "Demand Forwarding Bridge". Let's expand on each word in turn, but in reverse:

Bridge

This part is easy: a bridge is a link between two brokers. The bridge is identified by the transport URI of the remote broker it will connect to. The bridge is realized by a socket connection over which messages will pass for specific destinations. A bridge is initiated on a broker via xml configuration of the form:
 <networkConnector uri="scheme://host:port" />

Forwarding

The directionality is implicit in the forwarding. Messages flow in one direction only, from the local broker where the bridge is created, to the remote broker where messages end up. Messages are forwarded, which means they are consumed from the local broker and sent to the remote broker. This is regular JMS send semantics: the message is acknowledged locally when the send to the remote broker completes, so a message only lives in one broker at a time.
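The consume, send, then acknowledge ordering can be pictured with a small Python sketch. This is a toy model, not ActiveMQ code: `Broker`, `forward_one` and the `reachable` flag are hypothetical names that exist only for this illustration.

```python
class Broker:
    """Toy stand-in for a broker holding one queue (hypothetical, not an ActiveMQ API)."""

    def __init__(self, name, reachable=True):
        self.name = name
        self.reachable = reachable
        self.queue = []

    def send(self, message):
        # A send either completes or raises; there is no partial outcome.
        if not self.reachable:
            raise ConnectionError(f"{self.name} is unreachable")
        self.queue.append(message)


def forward_one(local, remote):
    """Forward the oldest local message; return True if it moved."""
    if not local.queue:
        return False
    message = local.queue[0]      # dispatched to the bridge, not yet acknowledged
    try:
        remote.send(message)      # regular send to the remote broker
    except ConnectionError:
        return False              # no ack, so the message stays on the local broker
    local.queue.pop(0)            # ack locally only after the send completes
    return True


local, remote = Broker("local"), Broker("remote")
local.queue.extend(["order-1", "order-2"])
while forward_one(local, remote):
    pass
print(local.queue, remote.queue)
```

If the remote broker cannot be reached, `forward_one` returns False and the message is still at the head of the local queue, which is the point of acknowledging last.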

Demand

The bridge is aware of demand. In ActiveMQ, demand for messages comes from consumers. A bridge will only forward messages if it knows that there are consumers for that destination on the remote broker. The trick here is the use of ActiveMQ Advisory messages. When a bridge is started, it registers a consumer with the remote broker for consumer advisory messages. In this way, the bridge becomes aware of the creation and removal of consumers on the remote broker. The bridge reacts to a new consumer advisory notification by creating a local (proxy) consumer for that destination. When this proxy consumer gets a message dispatch, it responds by forwarding (sending) the message to the remote broker. When the remote consumer disconnects, the corresponding remove advisory notification fires and the local proxy consumer is removed. Messages are no longer forwarded for that destination. In this way, the bridge is led by demand.
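To make the advisory-driven behaviour concrete, here is a minimal Python sketch of the idea. It is a simplified model under my own assumptions, not the broker implementation: the class and method names (`RemoteBroker`, `DemandForwardingBridge`, `publish_local` and so on) are hypothetical, and demand is tracked as a simple consumer count per destination.

```python
from collections import defaultdict


class RemoteBroker:
    """Toy remote broker that announces consumer changes (hypothetical names)."""

    def __init__(self):
        self.received = defaultdict(list)    # destination -> forwarded messages
        self.advisory_listeners = []

    def consumer_added(self, destination):
        for notify in self.advisory_listeners:
            notify("add", destination)

    def consumer_removed(self, destination):
        for notify in self.advisory_listeners:
            notify("remove", destination)


class DemandForwardingBridge:
    def __init__(self, remote):
        self.remote = remote
        self.local = defaultdict(list)       # destination -> messages held locally
        self.demand = defaultdict(int)       # destination -> remote consumer count
        # On start, subscribe to the remote broker's consumer advisories.
        remote.advisory_listeners.append(self.on_advisory)

    def on_advisory(self, event, destination):
        if event == "add":
            self.demand[destination] += 1    # stands in for creating the proxy consumer
            self._dispatch(destination)
        elif event == "remove" and self.demand[destination] > 0:
            self.demand[destination] -= 1    # stands in for removing the proxy consumer

    def publish_local(self, destination, message):
        self.local[destination].append(message)
        self._dispatch(destination)

    def _dispatch(self, destination):
        # Forward only while the remote broker has a consumer for the destination.
        if self.demand[destination] > 0:
            self.remote.received[destination].extend(self.local[destination])
            self.local[destination].clear()


remote = RemoteBroker()
bridge = DemandForwardingBridge(remote)
bridge.publish_local("orders", "m1")     # no demand yet: stays local
remote.consumer_added("orders")          # demand appears: m1 is forwarded
bridge.publish_local("orders", "m2")     # forwarded straight away
remote.consumer_removed("orders")
bridge.publish_local("orders", "m3")     # demand gone: stays local
print(dict(remote.received), dict(bridge.local))
```

In this run m1 and m2 end up on the remote side and m3 waits locally until a consumer shows up again.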

Next time you encounter <networkConnector ... /> in xml configuration, think of these three words, "Demand, Forwarding, Bridge", then proceed to (re)build your mental model.



Nov 22, 2011

ActiveMQ - multiple kahaDB instances (mKahaDB) helping reduce journal disk usage

The default store implementation in ActiveMQ, KahaDB, uses a journal and index. The journal uses a sequence of append-only files to store messages, acknowledgements and broker events. The index holds references to messages on a per destination basis. Essentially, the index holds the runtime state of the broker, mostly in memory, whereas the journal maintains the persistent store of raw data and events. It is the journal of record in a sense.
Periodically, unreferenced journal files are removed through a garbage collection process, so disk usage is kept in check.
In the main, this scheme works well. However, when multiple destinations on a broker are used in very different ways, it can lead to excessive disk usage by the journal. What follows is some detail on a solution to that problem.

Mixed destination usage; frequent fast tasks vs infrequent slow tasks
Imagine a toy maker's order processing. There are two types of orders, custom and standard. A custom order takes a few days to fulfill; a standard order takes a matter of hours. You can easily imagine two order queues, standard and custom. Now imagine that we only process custom orders once a month but process standard orders all the time. So we expect a large backup of custom orders that is slowly consumed at the start of each month and a steady load on the standard order queue.

What the broker sees
From a broker perspective, in the single shared journal, there will be a batch of journal files that are filled with custom order messages. Subsequent journal files will have mostly 'standard order' messages and acknowledgements, with the odd acknowledgement for a 'custom order' message. The sporadic distribution of acknowledgements for 'custom orders' in the journal files can be problematic because even when a journal file no longer contains any unacked 'standard order' messages, it must still be retained.

Some background on the need to retain journal files
Journal data files are append only. Both messages and acknowledgements are appended, nothing is deleted from a data file. Journal data files that are unreferenced are periodically removed (or archived). The idea is that the index (JMS destination state) can be recreated in full from the journal at any point in time. Any message without a corresponding acknowledgement is deemed valid.


Referenced journal files

In the simplest case, a journal file is 'referenced' if it contains messages that have not been acknowledged by a consumer. The more subtle case reflects the persistence of acknowledgements (acks). A journal file is 'referenced' if it contains acks for messages in any 'referenced' journal file. This means that we cannot garbage collect a journal file that just contains acks until we can garbage collect all of the journal files that contain the corresponding messages. If we did, in the event of a failure that requires recovery of the index, we would miss some acks and replay messages as duplicates.
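The two rules above can be written down as a small fixed-point calculation. The Python below is only a sketch of the rule as described in this post, not KahaDB's garbage collection code; `JournalFile` and `referenced_files` are hypothetical names, and message ids are plain strings.

```python
from dataclasses import dataclass, field


@dataclass
class JournalFile:
    file_id: int
    messages: set = field(default_factory=set)   # ids of messages written to this file
    acks: set = field(default_factory=set)       # ids of messages acked in this file


def referenced_files(files):
    """Return the ids of journal files that must be retained."""
    acked = set().union(*(f.acks for f in files))
    location = {m: f.file_id for f in files for m in f.messages}

    # Rule 1: a file holding any unacknowledged message is referenced.
    referenced = {f.file_id for f in files if f.messages - acked}

    # Rule 2: a file holding an ack for a message that lives in a referenced
    # file is itself referenced. Repeat until nothing changes.
    changed = True
    while changed:
        changed = False
        for f in files:
            if f.file_id in referenced:
                continue
            if any(location.get(m) in referenced for m in f.acks):
                referenced.add(f.file_id)
                changed = True
    return referenced


files = [
    JournalFile(1, messages={"c1", "c2", "c3"}),                   # custom orders, mostly unacked
    JournalFile(2, messages={"s1", "s2"}, acks={"s1", "s2", "c1"}),  # standard orders plus one custom ack
    JournalFile(3, messages={"s3"}, acks={"s3"}),                  # standard orders only
]
print(sorted(referenced_files(files)))
```

File 2 has no unacked messages of its own, yet it is retained because it carries the ack for c1, whose message sits in file 1. File 3 can be reclaimed.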

Problem
So back to the broker perspective of our toy maker's order processing. The first range of journal data files remain till the 'custom orders' queue is depleted. Custom order message acknowledgements get dotted across journal files that result from the enqueue/dequeue of the 'standard orders' queue, and the end result is lots of referenced journal files and excessive disk usage.

Solution
Reducing the default journal file size can help in this case, but at the cost of more runtime file IO as messages are distributed across more files. In an ideal world, the 'custom order' queue could be partitioned into its own journal where linear appends of messages and acks would result in a minimal set of journal files in use. Correspondingly, the 'standard order' queue, with its short-lived messages, could share a journal.

With the Multiple KahaDB persistence adapter, destination partitioning across journals is possible. It provides a neat solution to the scenario described above.
Replacing the default persistence adapter configuration:

<persistenceAdapter>
     <kahaDB directory="${activemq.base}/data/kahadb" />
</persistenceAdapter>

with:

<persistenceAdapter>
    <mKahaDB directory="${activemq.base}/data/kahadb">
      <filteredPersistenceAdapters>
       <filteredKahaDB queue="CustomOrders">
        <persistenceAdapter>
          <kahaDB />
        </persistenceAdapter>
       </filteredKahaDB>
       <filteredKahaDB>
        <persistenceAdapter>
          <kahaDB />
        </persistenceAdapter>
       </filteredKahaDB>
      </filteredPersistenceAdapters>
    </mKahaDB>
</persistenceAdapter>
  
The mKahaDB (m, short for multiple) adapter is a collection of filtered persistence adapters. The filtering reuses the destination policy matching feature to match destinations to persistence adapters. In the case of the above configuration, the 'custom orders' queue will use the first instance of kahaDB and all other destinations will map to the second instance. The second filter is empty, so the default 'match any' wild card is in effect.
This configuration, splitting the destinations based on their usage pattern over time, allows the respective journal files to get reclaimed in a linear fashion as messages are consumed and processed, resulting in minimum disk usage.
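As a rough picture of how destinations end up in separate journals, here is a small Python sketch. It is a simplification under stated assumptions: `pick_store`, the store names and the use of `None` for the empty match-any filter are all hypothetical, only exact queue names are matched (the real policy matching also understands wildcards), and raising when nothing matches is a choice made for this sketch.

```python
def pick_store(destination, filters):
    """Return the store for a destination.

    `filters` is a list of (queue_name_or_None, store_name) pairs, where None
    plays the role of the empty 'match any' filter.
    """
    match_any = None
    for queue_name, store in filters:
        if queue_name == destination:
            return store                    # a named filter wins over match-any
        if queue_name is None and match_any is None:
            match_any = store
    if match_any is None:
        raise LookupError(f"no store matches {destination}")
    return match_any


filters = [("CustomOrders", "kahadb-custom"), (None, "kahadb-default")]

# Append each record to the journal of the store its destination maps to.
journals = {}
records = [
    ("CustomOrders", "msg c1"),
    ("StandardOrders", "msg s1"),
    ("StandardOrders", "ack s1"),
    ("CustomOrders", "ack c1"),
]
for destination, record in records:
    journals.setdefault(pick_store(destination, filters), []).append(record)
print(journals)
```

The custom order ack no longer lands among the standard order records, so each journal fills and drains in its own linear order.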


Overhead

When transactions span persistence adapters, there is an additional overhead of local two phase commit to ensure both journals are atomically updated. Two phase commit requires that the outcome is persisted so there is an additional disk write required per transaction. This can be avoided by colocating destinations that share transactions in a single kahaDB instance. When transactions access a single persistence adapter or when there are no transactions, there is no additional overhead.


Alternative Use Cases: Relaxed Durability Guarantee

Each nested kahaDB instance is fully configurable so one scenario where the use of different persistence adapters makes sense is where your durability guarantee is weaker for some destinations than others. JMS requires that a write be on disk before a send reply is generated by the broker. To this end, a disk sync is issued by default after every journal write. This default behavior is configurable by the kahaDB attribute enableJournalDiskSyncs. If some destinations don't need this guarantee, they can be assigned to a kahaDB instance that has this option disabled and have their writes return faster, leaving it to the file system to complete the write. Here is an example configuration:


<persistenceAdapter>
    <mKahaDB directory="${activemq.base}/data/kahadb">
      <filteredPersistenceAdapters>
       <filteredKahaDB queue="ImportantStuff">
        <persistenceAdapter>
          <kahaDB />
        </persistenceAdapter>
       </filteredKahaDB>
       <filteredKahaDB queue="NotSoImportantStuff">
        <persistenceAdapter>
          <kahaDB enableJournalDiskSyncs="false"/>
        </persistenceAdapter>
       </filteredKahaDB>
      </filteredPersistenceAdapters>
    </mKahaDB>
</persistenceAdapter>
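For a feel of what the disk sync trade-off means at the file level, here is a generic Python sketch. It is not how KahaDB writes its journal; `append_record` and the file names are hypothetical, and the only real calls are the standard library's `flush` and `os.fsync`.

```python
import os
import tempfile


def append_record(path, payload, disk_sync):
    """Append bytes to a journal-like file, optionally forcing them to disk."""
    with open(path, "ab") as journal:
        journal.write(payload)
        journal.flush()                    # hand the bytes to the operating system
        if disk_sync:
            os.fsync(journal.fileno())     # block until the OS reports them on disk


with tempfile.TemporaryDirectory() as data_dir:
    important = os.path.join(data_dir, "important.log")
    relaxed = os.path.join(data_dir, "relaxed.log")
    append_record(important, b"order-1\n", disk_sync=True)    # waits for the disk
    append_record(relaxed, b"metric-1\n", disk_sync=False)    # returns once the OS has the bytes
    sizes = (os.path.getsize(important), os.path.getsize(relaxed))
print(sizes)
```

Both writes are visible in the file straight away; the difference is only in what survives a power loss before the file system gets around to completing the unsynced write.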


Apr 22, 2011

Government agencies: cut future IT spend - share costs, invest in open source

Open letter to Minister of State for Public Service Reform, Mr. Brian Hayes TD

Hi Brian,
I would like to share a quick response to my reading of the Irish Times article: State to demand price cuts from suppliers to reduce €16bn bill.

In order to best serve the needs of the Irish people right now and into the future, you need to seriously consider open source IT solutions. Across all departments and across all of Europe, government IT departments should be collectively investing in free open source solutions that solve their common IT needs.

Investment in open source IT solutions seeds innovation and is a commitment to shared future value. Investment in proprietary IT solutions is an innovation tax and a commitment to repetition.
This is not some sort of Marxist rant; open source is the best way to innovate. Open source software is a key reason Amazon, Google, Twitter, Facebook etc. emerged; they stand on the shoulders of giants.

I imagine this would mean a small shift in how government IT is organised. You would need to extract real value from the smart people therein. Rather than outsourcing decisions to global consultancy companies, you allow a shared need to be met from within.
You enable innovation by allowing smart individuals to take ownership of both the problem and the solution and, most importantly, to share the fruit of their labour.

The bottom line is this: all of the government departments have IT needs in common; they are much more alike than they wish to admit. They also share these needs with other governments throughout Europe.
There is no reason to constantly reinvent the wheel. We just need to enable people to share and evolve the best designs. Open source provides the freedom and motivation to do just that.

Apr 10, 2011

Consider Unhosted and open source for eHealth and eGov #DERIopenDay

DERI Galway produced an insightful open day on their developments in the semantic web of linked data. While I listened, two thoughts kept recurring that I want to explore. Chances are I am preaching to the choir but shucks, just in case I am not...

For a web architecture of the future look at Unhosted
At the root of the problem of siloed data and fragmentation (the database hugging phenomenon) is the issue of ownership. Institutions have data that they don't really own because that data is of a personal nature. The collection of data is theirs, but not the individual components.
With Unhosted, the ownership problem is turned on its head. Users and aggregators of data only have a 'handle' (a URI) to personal data. A handle that is only useable with permission. Collections of data containing 'handles' can safely be shared. Granted, lots of issues need to be ironed out, but I think the architecture is on the right track and the concept is bang on.

Open source your research
Lots of what you do is plumbing. For new plumbing to be broadly adopted it needs to be better and it needs to be cheap. Publish and be damned. If the research is great the plumbing will proliferate at very little cost. If it does proliferate, you continue to research and innovate and profit above the new infrastructure, it is all good. If it does not proliferate..., well open source was not the problem!

Enterprise Ireland: open source can be a viable business model for shared infrastructure research. It is a world of constant iterative improvement. The profits are smaller but the rewards are greater because simply put, value shared is value multiplied.
In essence, open innovation puts the focus on execution rather than protection; it puts everyone on the front foot.

Oct 29, 2010

Independent FuseSource, a future of shared value

The future is bright for FuseSource and open source adoption; the challenge is to spread the word on shared value so more organisations can benefit.

At FuseSource, we are independent, we have a proven subscription based business plan and we have a clear message: "The experts in open source integration and messaging". A message that is backed up by our Apache committers and consultants, many of whom are project founders. We are on the right track.

I think the growth of FuseSource is testament to the fact that enterprises are understanding a key benefit of liberal licensed open source:

Value shared is value multiplied

Put simply, each deployment of Apache ServiceMix, Apache ActiveMQ, Apache CXF and Apache Camel contributes positively to the shared pool of knowledge about these products. At FuseSource, all enhancements and fixes are delivered first at Apache, so everyone can benefit immediately. A great innovation this week becomes the start point for a new deployment next week. There are no barriers to entry. We all get smarter together.

The reality is that open source consultants rarely repeat themselves; work done for one client is work done for everyone. It is a model of shared incremental improvement. It is constantly challenging work, but most rewarding and always interesting.

My hope is that more organisations, where information technology (IT) is not the core of their competitive advantage, will see the benefit of an open collaborative approach to infrastructure investment. The approach is simple: use the same open source products as others, invest in those products, contribute back and reap the benefits of the contributions of others. Though we consider ourselves individuals, when it comes to what we need computers to do, we are mostly the same.

If you work in health care, government or retail and have an IT problem, somewhere in the world someone is struggling with the same problem as you. You need not be alone; you just need to share a common language and join the community. Open source infrastructure can be that language.

Note: those organisations that use IT for competitive advantage are already on the open source bandwagon, layering higher value services over existing open implementations, standing on the shoulders of giants. They just don't always have the same incentive to share.

Aug 16, 2010

Reminder: JMS is client server infrastructure; update broker ∴ update client

Often we get the following question:
If I upgrade the broker to version 5.x, do I also need to upgrade all my clients?
The short answer is:
maybe, but err on the safe side and upgrade your clients if it does not cause too much disruption.

For the longer answer there are at least two things to consider:
  • Does the reason for upgrade include the need for fixes that affect client side code? If yes, then obviously update all clients. (Issues of this kind typically focus on some aspect of the JMS API or consumer delivery semantics.)
  • Is there an increment to the openwire protocol version? If so, does it affect me? Read on...

Does an update to the openwire protocol version affect me?
The openwire protocol is the set of commands that is used to communicate between an ActiveMQ client and an ActiveMQ broker (and from broker to broker in a cluster scenario). The openwire protocol supports version negotiation such that an old client can negotiate the lowest common version with its peer (in effect, the lower of the two peers' versions) and use that version. As a result, in most cases, old clients can work as expected with a newer broker.

There are two potential pitfalls that you should be aware of:
  • fixes/features that depend on the openwire version update.
  • the ever increasing and incomplete version testing matrix.

Fixes or features that depend on the openwire version update
These are typically fixes that require additional information to be passed from the clients to the broker or vice versa. One example is the addition of a last delivered sequence id parameter to a consumer close command such that the redelivery count could be more accurately calculated. Another is the addition of a reconnecting flag to a connection command that allows duplicate suppression to be implemented consistently at the transport connection level. In some cases it is not obvious whether an issue requires a protocol update without some consideration of the implementation. If in doubt, ask on the activemq mailing list.
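The interaction between negotiation and version-dependent fixes can be sketched in a few lines of Python. This is an illustration only: `negotiate`, `feature_active` and the version numbers below are hypothetical and do not correspond to any particular openwire release.

```python
def negotiate(client_version, broker_version):
    """Both peers settle on the lower of the two versions they offer."""
    return min(client_version, broker_version)


def feature_active(required_version, client_version, broker_version):
    """A fix that needs new fields on the wire only works at or above its version."""
    return negotiate(client_version, broker_version) >= required_version


# Hypothetical numbers: a fix that needs protocol version 6 on the wire.
REQUIRED = 6
print(feature_active(REQUIRED, client_version=5, broker_version=6))   # old client, new broker
print(feature_active(REQUIRED, client_version=6, broker_version=6))   # both upgraded
```

With an old client the connection still works, but it runs at the older version, so a fix that relies on the newer commands stays dormant until the client is upgraded too.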

The ever increasing and incomplete version testing matrix
With every protocol version change, there are new additions to the client/server testing matrix. In ActiveMQ, virtually all tests assume a uniform openwire version, with the exception of a few that validate negotiation. The net result is that validation of the compatibility matrix is largely completed by the community. This works in practice but it is important to be aware of. If you are in doubt as to whether a particular scenario will work across a broker version mismatch, be sure; ask the computer yourself with a little test.

In summary, if you update the broker, you also need to update the clients; or at least consider it!

Jan 18, 2010

ActiveMQ (prefetch and asyncDispatch) negative destination inflight values in Jconsole explained

While tracking down an issue for a customer over the past few days I noticed the inflight count for my destination in jconsole had a negative value. On closer inspection, I found that the value was fluctuating wildly with negative values before settling down again to a more reasonable positive range. I took a detour to investigate and it turns out this behavior is expected. The negative values are the result of prefetch and asyncDispatch. Let me explain with a little note to self:

The use case included a pre-filled queue with ~30k messages and multiple (10) consumers which dequeued a small number (again 10) of messages before disconnecting and immediately reconnecting. In this case, a prefetch value of 10 is ideal, but with the default prefetch value of 1000, the broker is busy dispatching messages to the consumer long after it has decided to quit. In addition, with asyncDispatch, while dispatch to a consumer is instigated by the broker, the actual delivery is delegated to the broker transport connection worker thread. This means that the delivery attempts back up on the individual transport connections rather than slowing down the broker.

The destination inflight count is a measure of the number of messages that have been dispatched by the broker but not yet acknowledged by any consumer. On each dispatch completion by the worker thread, the inflight value is incremented. The decrementing normally happens on a message acknowledge. In the event of a consumer closure with unconsumed messages, the remaining value is decremented when the consumer closes.

This is the crux. From the broker perspective, on consumer closure, it has dispatched 1000 messages and got an ack for 10 so it needs to decrement the inflight by 990 (not 1090 as I first wrote; thanks for the correction, Arjan). But from the perspective of the worker thread, busy doing the actual dispatch, it still has a lot of incrementing to do. The negative values arise from the consumer closure occurring before async dispatch is complete. When there are many concurrent consumers, the negative swing can be quite noticeable and quite large.

The good news is that this is perfectly fine; the books are kept in balance and there is eventual consistency. In addition, using either a prefetch value of 10 or asyncDispatch=false ensures that the negative values do not occur, as the broker is kept directly in step with message delivery to the consumer. In general though, an appropriate value of prefetch is the correct solution if it is known in advance that a consumer will do work in batches.
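The bookkeeping is easier to follow with the arithmetic written out. The Python below is a single-threaded sketch of the counting described above, not broker code; `inflight_around_close` and its parameters are hypothetical names, and `delivered_before_close` stands for how many dispatches the worker thread had completed when the consumer closed.

```python
def inflight_around_close(prefetch, acked, delivered_before_close):
    """Return (inflight right after consumer close, inflight once the worker catches up)."""
    if not acked <= delivered_before_close <= prefetch:
        raise ValueError("expected acked <= delivered_before_close <= prefetch")

    inflight = 0
    inflight += delivered_before_close             # worker: one increment per completed dispatch
    inflight -= acked                              # one decrement per acknowledgement
    inflight -= prefetch - acked                   # close: broker releases the unacked remainder
    at_close = inflight

    inflight += prefetch - delivered_before_close  # worker finishes its backlog of increments
    return at_close, inflight


# Default prefetch, async dispatch lagging: 200 of 1000 delivered when the consumer quits.
print(inflight_around_close(prefetch=1000, acked=10, delivered_before_close=200))
# Dispatch kept in step with the broker: nothing outstanding at close.
print(inflight_around_close(prefetch=1000, acked=10, delivered_before_close=1000))
# Prefetch sized to the batch: the consumer acks everything it was sent.
print(inflight_around_close(prefetch=10, acked=10, delivered_before_close=10))
```

The first case dips to -800 at close and returns to 0 once the remaining 800 increments land; the other two never go negative.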