Abstract
Queuing systems provide three main benefits to the developer.
- Guaranteed delivery
- Transacted re-queue if some failure occurs
- First In First Out order
In high-performance systems, where queued messages arrive rapidly and vary widely, only one of these is certain: guaranteed delivery.
Messages arrive too quickly to be held in the queue until failure can be ruled out, and multiple systems dequeuing the messages open the possibility of messages being processed out of order.
This post explores the options available to the developer to ensure messages that must be delivered are delivered, that they are processed as they should be, and that it’s possible to determine what happened.
Basic Queuing Applications
One of the main use cases for queuing applications is to allow Application A to send a message without having to worry that Application B, the recipient, is around to receive it. The queue management infrastructure ensures that when Application B is ready to pick up the message, or messages, they are available and there is no requirement that Application A be up and running when the message, or messages, are delivered.
Loosely-coupled, sender/receiver-agnostic, resilient messaging infrastructure.
Many applications can be built around this infrastructure. The queuing system gives the interconnected applications stability and resilience. That the sending application is done with the message once it has been enqueued allows that application to exit or return to whatever work it had been doing:
- interacting with end-users
- processing transactions as they arrive
- monitoring some Internet of Things (IoT) device(s)
This frees up system resources and can contribute to the real, or perceived, responsiveness of the overall system.
Large-scale e-commerce applications benefit from queuing application approaches.
High-Performance Queuing Applications
But what happens when Application A is not only still around when the messages are picked up, but is only one of several Application A instances, all sending messages without pause? In like manner, multiple Application B instances may be deployed, and may need to scale out further, to cope with the volume and variety of inbound messages and keep queues from backing up when the system is under stress.
Three such scenarios arise in high-performance queuing applications, and the complexity of the receiving applications rises with each.
Scenario 1: Only new data
In this scenario multiple Application A instances are sending new data only. In an IoT application new sensor data may be streaming in from a variety of devices almost continuously. These data are typically simple (spot temperature values, pipeline flow rate past a sensor, start or stop for each of a bank of elevators in a large office building.) In this situation the only concerns are whether there are enough Application B instances to absorb the messages and persist them, and whether later aggregation of the data can determine in which order the messages arrived.
The answer to the first of these questions is a configuration issue. The queuing infrastructure should be configurable in such a way as to allow changing the number of Application B instances to meet the message volume demand. Preferably this configuration should be elastic, scaling up the number of instances as the message volume rises, and throttling back the number of instances as the volume declines.
The answer to the second question requires that Application A encode a sequence number in the message, or the queuing infrastructure attach a date/time stamp to the message as the message is being enqueued. Such a date/time stamp should be of sufficient granularity to identify the order in which messages from a given IoT device were enqueued.
The sequence number has the advantage of being monotonic, removing concerns about granularity. The date/time stamp can serve an additional function: capturing the time from enqueue to dequeue, which may indicate the health of the system. This is valuable for any custom monitoring solutions put in place.
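As a concrete sketch, a sender-side helper might attach both a monotonic sequence number and an enqueue timestamp to each message. The function and field names below (`make_message`, `seq`, `enqueued_at`) are illustrative assumptions, not part of any particular queuing API:

```python
import itertools
import json
import time

# Per-sender monotonic sequence: ordering without granularity concerns.
_seq = itertools.count(1)

def make_message(device_id: str, payload: dict) -> str:
    """Wrap a sensor reading with ordering metadata before enqueueing."""
    return json.dumps({
        "device_id": device_id,
        "seq": next(_seq),              # monotonic sequence number
        "enqueued_at": time.time_ns(),  # fine-grained stamp, also useful for latency
        "payload": payload,
    })

def dequeue_latency_ns(raw: str) -> int:
    """On dequeue, the timestamp also yields enqueue-to-dequeue latency."""
    return time.time_ns() - json.loads(raw)["enqueued_at"]
```

Application B can record `dequeue_latency_ns` alongside its dequeue stamp to feed the monitoring solution mentioned above.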
Application B is simply charged with dequeuing messages, possibly adding a dequeue date/time stamp, and writing the resulting data to the persistence layer.
Because the data persistence layer is append-only, the choice of storage mechanism hinges on low write latency and replication: the former ensures data is persisted with minimal delay; the latter prevents single-point-of-failure issues. Replication can also speed up later consumption and aggregation of the data.
Scenario 2: Data distribution hub with optional Publish/Subscribe (Pub/Sub)
In this scenario the Application B instances are charged with the full Insert/Amend/Delete suite of operations. Application A instances insert, amend, delete records in local transaction databases and emit the record details with the record manipulation action, as a message. Application B instances must consume the messages and persist the final state of changed records. Finally, the Application B instances may alert topic subscribers that changes have occurred.
As in Scenario 1, it must be possible for Application B instances to identify the order in which to apply updates to the Publisher data store. Were the rate of message arrival slow, the first-in first-out nature of the queuing infrastructure would be enough. In a high-performance queuing scenario, the messages must contain a means of ordering themselves. This ordering comes into play to address two needs:
- Application B dequeues messages in batches. With a sufficiently high transaction rate in Application A, multiple versions of the same record might exist in one batch. Application B is only interested in persisting the last version of any record, and the ordering information within the messages allows Application B instances to select, within primary record identifiers, which version of each record to persist.
- Multiple Application B instances will dequeue messages in batches. Given the asynchronicity inherent in network transmission and parallel processing, different versions of a given record may be distributed across multiple instances of Application B, and variety in message structure, complexity, and length introduces its own variability in processing time. Application B instances must only persist a record if the version they have is later than the version already stored; the ordering information within the record serves this purpose.
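The first of these needs, keeping only the last version of each record within a dequeued batch, can be sketched as follows. The `key` and `seq` field names are assumptions about the message shape:

```python
def latest_per_key(batch: list[dict]) -> list[dict]:
    """Keep only the last version of each record in a dequeued batch.

    Assumes each message carries a primary identifier ('key') and
    ordering information ('seq'); both names are illustrative.
    """
    latest: dict = {}
    for msg in batch:
        current = latest.get(msg["key"])
        if current is None or msg["seq"] > current["seq"]:
            latest[msg["key"]] = msg
    return list(latest.values())
```

Running this before any database work trims the batch to one write per record identifier.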
Because records must be identified for update or deletion operations, not merely stored, the persistence layer should be a database management system that can rapidly locate records for Amend/Delete operations. Subscribers to changed topics must likewise identify the affected records quickly. Some form of relational, aggregate, or hybrid relational/memory-mapped database management system should be selected.
The parallel and asynchronous nature of the Application B instances makes database collisions inevitable as different instances attempt to write to the database. At minimum, Application B should insert a record only if no record with that key identifier exists yet, and update a record only if its sequence number/timestamp is greater than that of the record already persisted. Finally, Application B must use retry logic to recover from database collisions while avoiding infinite retry loops; a circuit-breaker pattern bounds the retries.
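A minimal sketch of the guarded write and bounded retry described above, with an in-memory dict standing in for the real database; a production version would lean on the database's own conditional-write support, and the names here are illustrative:

```python
import time

class CircuitOpen(Exception):
    """Raised when repeated collisions trip the circuit breaker."""

def upsert_if_newer(store: dict, key, seq, value) -> bool:
    """Write only if the incoming version is newer than what is stored."""
    existing = store.get(key)
    if existing is not None and existing["seq"] >= seq:
        return False  # stale message: never overwrite a newer record
    store[key] = {"seq": seq, "value": value}
    return True

def write_with_retry(write_fn, max_attempts: int = 3, backoff_s: float = 0.01):
    """Retry a colliding write a bounded number of times, then give up."""
    for attempt in range(1, max_attempts + 1):
        try:
            return write_fn()
        except RuntimeError:  # stand-in for the database's collision error
            if attempt == max_attempts:
                raise CircuitOpen("giving up after repeated collisions")
            time.sleep(backoff_s * attempt)  # simple linear backoff
```

The bounded attempt count is the circuit breaker in miniature: after `max_attempts` collisions the write is surfaced as a failure rather than retried forever.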
The sequence number/timestamp must be persisted by both Application A and Application B instances. This ensures it’s possible to verify that the data in Application B managed systems match those in Application A managed systems, the system of record.
Logging must be implemented in Application B to record the disposition of all messages delivered to it. As only the latest versions of messages are persisted to the data management layer, the fate of the intermediate versions must be known for auditing and validation purposes.
Pub/Sub for Scenario 2 can be implemented in many ways. There are Pub/Sub infrastructures available that provide subscribe and notification APIs. On persisting new data, Application B instances simply call the notification API, specifying the topic just updated, and leave the rest to the Pub/Sub infrastructure.
Otherwise, using a queuing infrastructure, Application B instances could enqueue notifications to a set of queues monitored by subscribers to updated topics. Again, once the notification has been sent the work of Application B is done.
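The queue-based alternative can be sketched with one notification queue per topic. The registry and function names are illustrative, with Python's in-process `queue.Queue` standing in for real queues in the queuing infrastructure:

```python
from collections import defaultdict
from queue import Queue

# Hypothetical registry: one notification queue per topic, each monitored
# by the subscribers interested in that topic.
topic_queues: dict = defaultdict(Queue)

def notify(topic: str, record_key) -> None:
    """Called by an Application B instance right after persisting a change.

    Application B's work ends here; subscribers drain the topic queue
    on their own schedule.
    """
    topic_queues[topic].put({"topic": topic, "key": record_key})
```

Either way, once the notification has been handed off, Application B returns to dequeuing.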
Scenario 3: Full Duplex with Writeback
Everything in Scenario 2, plus a round-trip update to the initiator.
There may be Application C instances—the subscribers in Scenario 2—that enrich the received data through user interaction and/or other data sources, yet want to return that enriched data to the original source.
In this scenario little changes in the main Application B dequeuing application. It’s possible this scenario tightens the service-level agreement under which Application B operates, particularly in terms of speed; care must be taken to provision enough hardware and network bandwidth to accommodate such a tightening.
Application C instances will work against a persistence layer that is, in the ideal case, not the one Application B instances update. This is to control the volume of writes to a data layer in a high-performance queuing application. There exist in-memory, or in-memory/disk-based hybrid, systems that might accommodate the differing transaction patterns of Application B, Subscribers to a Pub/Sub system, and Application C instances. These data systems could obviate the need for an entirely different data store. The entire pipeline in these systems must be monitored to identify and eliminate choke points as they arise.
Application C instances updating local copies of the data ensures a quick response for Application C users. It must be possible to undo such updates should the return of the data to the Application A source fail. In like manner, should the source data have been updated before the Application C request completes, it must be possible to signal to Application C that this has happened so it may undo its update to the local copy.
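The writeback handshake can be sketched as a compare-and-set against the source, with the local copy rolled back on conflict. The dict-based stores and all names here are illustrative assumptions:

```python
def write_back(source: dict, local: dict, key, expected_seq: int, new_value) -> bool:
    """Attempt to return enriched data to the source record.

    Succeeds only if the source still holds the version (expected_seq)
    that Application C enriched; on conflict, the local copy is undone
    by re-syncing it with the source, and False signals the conflict.
    """
    record = source[key]
    if record["seq"] != expected_seq:
        local[key] = dict(record)  # undo: local copy reverts to source state
        return False
    source[key] = {"seq": expected_seq + 1, "value": new_value}
    local[key] = dict(source[key])
    return True
```

A `False` return is the signal described above: the source moved on first, and Application C's optimistic local update has been rolled back.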