3. Messaging
This section provides details on messaging in XFM. Messaging includes the XFM infrastructure to route and send messages between components. The central part is the message broker (RabbitMQ), to which workers connect.
Below is a description of the default messaging configuration of XFM, which is suited to accept and process messages generated by the XFM Generator (CDAR2 and FHIR). Typically deployments will need to support a different set of source messages, with a custom messaging configuration.
3.1. Broker internals
The message broker is the main component for routing messages to and from workers. This description of the XFM broker assumes some familiarity with RabbitMQ.
While the message flow through XFM is sequential from ingestion to transformation to loading, the broker provides decoupling of workers.
Internally the message flow is organized as a set of AMQP exchanges and queues, which route and store messages for each processing step.
As can be seen from the diagram above, the configuration consists of two virtual hosts (vhosts): gateway and messaging. The gateway virtual host represents the XFM gateway to which source messages are routed, and typically runs as a separate RabbitMQ instance. Ingesters obtain messages from the XFM gateway and send messages to the messaging virtual host, which runs on the XFM broker instance (so the configuration shown in the figure above is actually split between two instances: the gateway and the broker).
3.2. Message routing
Message sources publish messages to the ingress exchange on the gateway, which is a direct exchange. By default, it matches on the following routing keys:
cdar2
Source messages with this routing key are assumed to be in valid Clinical Document Architecture, Release 2 format, which is an existing HL7v3 format and does not need to be translated before forwarding.
fhir
XFM supports FHIR as an alternative source format. Such messages first need to be translated to an HL7v3-compatible format, so they are treated separately by using a distinct routing key.
A queue is bound to the ingress exchange for each routing key (ingress-cdar2 and ingress-fhir). Ingesters consume messages from these queues. After an ingester processes a message, the result is an HL7v3-compatible message. The ingester assigns the routing key hl7v3 to this message before forwarding it to the XFM broker’s transform exchange, where it is queued for transformation to SQL (queue transform-rim).
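The bindings described above can be written down as a small table and declared through a channel object. The following is a minimal sketch, not the XFM installer: the function name declare_topology and the table layout are hypothetical, the durable flags and the direct type of the transform exchange are assumptions (the text only states that ingress is a direct exchange), and the channel is expected to expose exchange_declare, queue_declare and queue_bind with the keyword arguments used by the pika client's BlockingChannel.

```python
# Hypothetical table built from the names in the text.
# Each entry: (vhost, exchange, routing key, queue).
TOPOLOGY = [
    ("gateway", "ingress", "cdar2", "ingress-cdar2"),
    ("gateway", "ingress", "fhir", "ingress-fhir"),
    ("messaging", "transform", "hl7v3", "transform-rim"),
]


def declare_topology(channel, vhost):
    """Declare exchanges, queues and bindings that belong to one vhost.

    `channel` is any object with pika-style exchange_declare,
    queue_declare and queue_bind methods, opened on a connection
    to the given vhost.
    """
    for entry_vhost, exchange, routing_key, queue in TOPOLOGY:
        if entry_vhost != vhost:
            continue
        # Assumption: durable, direct exchanges and durable queues.
        channel.exchange_declare(
            exchange=exchange, exchange_type="direct", durable=True
        )
        channel.queue_declare(queue=queue, durable=True)
        channel.queue_bind(
            queue=queue, exchange=exchange, routing_key=routing_key
        )
```

Calling declare_topology once per instance (gateway and broker) keeps the two halves of the configuration separate, which matches the split between the two RabbitMQ instances described earlier.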
A transformer obtains messages from the transform-rim queue and transforms them to SQL. When successful, it assigns the sql routing key to the result and forwards it to the load exchange, which queues it in the load-sql queue.
In the last step the loader obtains messages from the load-sql queue, aggregates them in groups or micro-batches, and stores them in its local data pond for processing and upload to the data lake.
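The grouping step of the loader can be illustrated with a short sketch. The function name micro_batches and the purely size-based policy are assumptions made for illustration; the text does not say how the loader decides when a batch is complete, and a real loader may also flush on a timer.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def micro_batches(messages: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive groups of at most `batch_size` messages."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(messages)
    while True:
        # Take the next slice; the final batch may be shorter.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Five SQL messages grouped two at a time.
batches = list(micro_batches(["s1", "s2", "s3", "s4", "s5"], 2))
# batches == [["s1", "s2"], ["s3", "s4"], ["s5"]]
```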
3.2.1. Identifier sequences
XFM can consist of many loaders and therefore many data ponds. Because data in the ponds is uploaded to a single data lake, the ponds share a single identifier sequence. On the first run of a loader a pond sequence is requested from the broker (the sequences are held in a persistent queue, pond-seq), and the pond is initialised with the obtained sequence. The pond sequences are created and published to the queue during the message broker installation.
When a loader and/or its pond is updated the unused part of the sequence can be returned to the queue. See Decommission loaders.
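As an illustration of how handing out and returning sequences might work, the sketch below models the pond-seq queue as an in-memory deque. The representation of a pond sequence as a contiguous block of integers, the class name PondSequence and the helper names are all assumptions; the text does not describe the actual format.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class PondSequence:
    """Identifiers next_id .. end-1 reserved for one pond (assumed layout)."""
    next_id: int
    end: int

    def allocate(self) -> int:
        """Hand out the next identifier of this pond."""
        if self.next_id >= self.end:
            raise RuntimeError("pond sequence exhausted")
        value = self.next_id
        self.next_id += 1
        return value


def create_sequences(count: int, block_size: int) -> deque:
    """Stand-in for filling pond-seq during broker installation."""
    return deque(
        PondSequence(i * block_size, (i + 1) * block_size) for i in range(count)
    )


def return_unused(queue: deque, seq: PondSequence) -> None:
    """Put the unused remainder of a sequence back on the queue, if any."""
    if seq.next_id < seq.end:
        queue.append(PondSequence(seq.next_id, seq.end))
```

Because the blocks do not overlap, identifiers allocated by different ponds cannot collide once their data reaches the same data lake.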
3.3. Flow control
If more source messages enter the system than can be processed, the message queues on the broker will grow. RabbitMQ has flow control mechanisms that block publishers when a certain memory usage or disk space threshold is hit (called watermarks in RabbitMQ). However, this alarm is server-wide and can result in a deadlock situation in XFM (a manual offload process is then required).
The deadlock can occur because each XFM component acknowledges a message upstream only after it has been acknowledged downstream (i.e. after the broker has persisted the output message or error message, or after the message has been uploaded to the data lake). New messages can be consumed only after a message is acknowledged upstream, and this requires a message publish, which is now blocked. Eventually message processing will stop. The only components that can continue are the loaders, which publish nothing to the broker except error messages. However, if messages have piled up earlier in the chain and emptying the loader queue does not bring resource usage back below the watermark, the deadlock is not overcome.
XFM deals with this situation in two ways:
- separate (external) source message publishers from XFM workers by using two independent RabbitMQ instances, each with its own resource limits. These are represented by the gateway and broker.
- apply basic application-level flow control between the gateway and broker (implemented by the ingesters). This mechanism tracks queue sizes to detect whether messages keep piling up. If a threshold is hit, no messages from the gateway are ingested (this is a soft limit because the queue sizes are checked periodically).
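The application-level flow control in the second item can be sketched as a small gate that an ingester consults before taking the next gateway message. This is a simplified illustration, not the XFM implementation: the class name FlowControl, the single fixed threshold and the injected get_depths callable are assumptions. It also shows why the limit is soft: between two checks the gate keeps returning its last decision.

```python
import time
from typing import Callable, Dict, Optional


class FlowControl:
    """Pause ingestion while any tracked broker queue is at or above a threshold."""

    def __init__(
        self,
        get_depths: Callable[[], Dict[str, int]],
        threshold: int,
        check_interval: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._get_depths = get_depths      # returns {queue name: message count}
        self._threshold = threshold
        self._check_interval = check_interval
        self._clock = clock
        self._last_check: Optional[float] = None
        self._paused = False

    def may_ingest(self) -> bool:
        """Return True if the next gateway message may be consumed."""
        now = self._clock()
        due = (
            self._last_check is None
            or now - self._last_check >= self._check_interval
        )
        if due:
            # Refresh the decision only periodically (soft limit).
            depths = self._get_depths()
            self._paused = any(d >= self._threshold for d in depths.values())
            self._last_check = now
        return not self._paused
```

With the pika client, one way to implement get_depths would be a passive queue_declare per queue and reading method.message_count from the returned frame; whether XFM obtains queue sizes this way is not stated in this section.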
3.4. Error handling
The message flow described earlier assumes every processing step executes successfully, which is not always the case. During XFM processing errors can occur. Four types of errors are distinguished:
- Business exceptions: message validation errors or code system constraint errors
- Application exceptions: configuration errors or application-internal errors
- System errors: for example, out-of-memory errors
- Network errors
In XFM each of these error types is treated differently.
When a business exception occurs an error message is sent to an error exchange on the message broker. After the error message is acknowledged (i.e. it is persisted on the broker, if persistence is enabled) the processing of the source message is considered complete and it is acknowledged upstream. The error message includes the source message as payload and error information as headers.
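The order of operations for a business exception can be sketched as follows. The names BusinessError, ErrorMessage and handle, as well as the header keys, are hypothetical. The publish callables are assumed to return only after the broker has confirmed the message, so that the upstream acknowledgement always comes last.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict


class BusinessError(Exception):
    """Validation or code system constraint failure (hypothetical class)."""


@dataclass
class ErrorMessage:
    payload: bytes                                          # unmodified source message
    headers: Dict[str, str] = field(default_factory=dict)   # error information


def handle(
    source: bytes,
    process: Callable[[bytes], bytes],
    publish_output: Callable[[bytes], None],
    publish_error: Callable[[ErrorMessage], None],
    ack_upstream: Callable[[], None],
) -> None:
    """Process one message; acknowledge upstream only after a confirmed publish."""
    try:
        result = process(source)
    except BusinessError as exc:
        # Source message travels as payload, error details as headers.
        publish_error(
            ErrorMessage(
                source,
                {"error-type": type(exc).__name__, "error-message": str(exc)},
            )
        )
    else:
        publish_output(result)
    # Reached only if one of the publishes returned without raising.
    # Any other exception propagates and leaves the message unacknowledged.
    ack_upstream()
```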
Application exceptions such as configuration errors can prevent a component from starting. This can be observed in the application log file (no message processing has taken place yet). Configuration errors or application-internal errors can also occur after the application has started processing messages. These errors are logged, and by default the source message involved is not requeued but delivered to a so-called dead letter queue, which in XFM serves a similar purpose to the error queues for business exceptions: the messages are stored on the broker for manual inspection and are not processed further.
System errors such as out-of-memory errors typically cause the application to crash. A running application can hold several messages for processing, and to avoid losing these messages on a crash XFM uses message acknowledgements to and from the broker. When the application crashes, the broker (RabbitMQ) detects this and requeues all messages that were held by the application but not yet acknowledged.
A network error can occur, for example, if the broker is unreachable or down. The XFM components log the failure and retry the connection until it is restored.
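A retry loop of this kind might look like the sketch below. The function name connect_with_retry, the fixed delay and the use of OSError as the failure type are assumptions; the exception raised on a failed connection depends on the client library, and XFM's actual retry policy is not described here.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def connect_with_retry(
    connect: Callable[[], T],
    delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    log: Callable[[str], None] = print,
) -> T:
    """Call `connect` until it succeeds, logging every failed attempt."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return connect()
        except OSError as exc:
            # Assumption: connection failures surface as OSError subclasses.
            log(f"connection attempt {attempt} failed: {exc}; retrying in {delay}s")
            sleep(delay)
```

Passing sleep and log as parameters keeps the loop easy to exercise in a test without waiting or writing to the console.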
The error handling behaviour described above provides XFM with at-least-once semantics: mechanisms are in place to avoid message loss, however it is possible for the same message to be delivered more than once. For example, this can happen if a component crashes (e.g., due to a system error) after successfully processing and forwarding a message but before the upstream acknowledgement could be delivered to the broker. The broker will detect the connection loss and requeue the message.
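The duplicate delivery in this scenario can be reproduced with a toy in-memory model. ToyQueue and run_worker are hypothetical names and the model greatly simplifies a real broker; it only captures manual acknowledgements and the requeue of unacknowledged messages when a connection is lost.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a broker queue with manual acknowledgements."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}
        self._next_tag = 0

    def get(self):
        """Deliver the next message and track it until acknowledged."""
        self._next_tag += 1
        message = self.ready.popleft()
        self.unacked[self._next_tag] = message
        return self._next_tag, message

    def ack(self, tag):
        del self.unacked[tag]

    def connection_lost(self):
        """Requeue everything that was delivered but not acknowledged."""
        self.ready.extendleft(reversed(list(self.unacked.values())))
        self.unacked.clear()


def run_worker(upstream, downstream, crash_before_ack=False):
    tag, message = upstream.get()
    downstream.append(message)       # forward downstream (assumed confirmed)
    if crash_before_ack:
        upstream.connection_lost()   # worker dies, broker requeues the message
        return
    upstream.ack(tag)


upstream = ToyQueue(["m1"])
downstream = []
run_worker(upstream, downstream, crash_before_ack=True)  # crash after forwarding
run_worker(upstream, downstream)                         # restarted worker
# downstream == ["m1", "m1"]: delivered twice, never lost
```

Downstream consumers therefore need to tolerate seeing the same message more than once.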