3. Messaging

This section provides details on messaging in XFM. Messaging comprises the XFM infrastructure for routing and delivering messages between components. Its central part is the message broker (RabbitMQ), to which the workers connect.

Below is a description of the default messaging configuration of XFM, which is suited to accept and process messages generated by the XFM Generator (CDAR2 and FHIR). Typical deployments will need to support a different set of source messages and therefore a custom messaging configuration.

3.1. Broker internals

The message broker is the main component for routing messages to and from workers. The explanation of the XFM broker below assumes some familiarity with RabbitMQ.

While the message flow through XFM is sequential from ingestion to transformation to loading, the broker provides decoupling of workers.

[Figure: XFM message flow (_images/msgflow.svg)]

Internally the message flow is organized as a set of AMQP exchanges and queues, which route and store messages for each processing step.

[Figure: broker configuration (_images/brokerconfig.svg)]

As can be seen from the diagram above, the configuration consists of two virtual hosts (vhosts): gateway and messaging. The gateway virtual host represents the XFM gateway to which source messages are routed, and typically runs as a separate RabbitMQ instance. Ingesters obtain messages from the XFM gateway and send messages to the messaging virtual host, which runs on the XFM broker instance. The configuration shown in the figure above is therefore split between two instances: the gateway and the broker.

3.2. Message routing

Message sources publish messages to the ingress exchange on the gateway, which is a direct exchange. By default it matches the following routing keys:

  • cdar2: Source messages with this routing key are assumed to be in valid Clinical Document Architecture, Release 2 format, an existing HL7v3 format, and do not need to be translated before forwarding.
  • fhir: XFM supports FHIR as an alternative source format; such messages first need to be translated to an HL7v3-compatible format, so they are treated separately by using a distinct routing key.

A queue is bound to the ingress exchange for each routing key (ingress-cdar2 and ingress-fhir), and ingesters consume messages from these queues. After an ingester processes a message the result is an HL7v3-compatible message; the ingester adds the routing key hl7v3 and forwards the message to the XFM broker’s transform exchange, where it is queued for transformation to SQL (queue transform-rim).
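The routing above can be sketched with a small in-memory model of a direct exchange (plain Python, no RabbitMQ client). The exchange, queue, and routing key names are taken from the text; the model itself is purely illustrative, not the actual XFM implementation:

```python
# Minimal in-memory model of direct-exchange routing as described above.
bindings = {
    # exchange -> routing key -> bound queues
    "ingress":   {"cdar2": ["ingress-cdar2"], "fhir": ["ingress-fhir"]},
    "transform": {"hl7v3": ["transform-rim"]},
    "load":      {"sql": ["load-sql"]},
}

def route(exchange, routing_key):
    """Return the queues a direct exchange delivers to for this key."""
    return bindings.get(exchange, {}).get(routing_key, [])

# A CDAR2 source message lands in the ingress-cdar2 queue ...
assert route("ingress", "cdar2") == ["ingress-cdar2"]
# ... and after ingestion it is forwarded with the hl7v3 key.
assert route("transform", "hl7v3") == ["transform-rim"]
# A direct exchange drops messages whose key has no binding.
assert route("ingress", "unknown") == []
```

The same lookup also covers the later transform and load steps, since each hop is just another direct-exchange binding.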

A transformer obtains messages from the transform-rim queue and transforms them to SQL. On success it adds a sql routing key and forwards the message to the load exchange, which queues it in the load-sql queue.

In the last step the loader obtains messages from the load-sql queue, aggregates them into groups or micro-batches, and stores them in its local data pond for processing and upload to the data lake.
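The grouping step can be sketched as follows; this is a size-based sketch only (a real loader would likely also flush on a time limit, which is not shown), and the batch size is an assumption:

```python
def micro_batches(messages, size):
    """Group consumed messages into micro-batches of at most `size`."""
    batch = []
    for msg in messages:
        batch.append(msg)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:                    # flush the final, possibly smaller batch
        yield batch

# Seven messages with a batch size of 3 yield groups of 3, 3 and 1.
groups = list(micro_batches([f"m{i}" for i in range(7)], size=3))
assert [len(g) for g in groups] == [3, 3, 1]
```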

3.2.1. Identifier sequences

XFM can consist of many loaders and therefore many data ponds. Because data from all ponds is uploaded to a single data lake, the ponds must draw their identifiers from non-overlapping sequences. On the first run of a loader such a sequence is requested from the broker (the sequences are contained in a persistent queue, pond-seq), and the pond is initialised with the obtained sequence. The pond sequences are created and published to the queue during the message broker installation.
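The allocation scheme can be illustrated with a plain-Python model; the range boundaries here are invented for the example, and the list stands in for the persistent pond-seq queue:

```python
from itertools import count

# Hypothetical non-overlapping sequence ranges as they might sit in the
# persistent pond-seq queue after broker installation: (start, end) pairs.
pond_seq_queue = [(1, 1_000_000), (1_000_001, 2_000_000)]

def init_pond():
    """Simulate a loader's first run: obtain one sequence from the queue
    and initialise the pond's identifier generator with it."""
    start, end = pond_seq_queue.pop(0)   # akin to consuming from pond-seq
    return count(start), end

ids, end = init_pond()                   # first pond: ids 1, 2, 3, ...
assert (next(ids), next(ids)) == (1, 2)
ids2, _ = init_pond()                    # second pond: a disjoint range
assert next(ids2) == 1_000_001
```

Because every pond consumes a different range, identifiers never clash once the data reaches the shared lake.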

When a loader and/or its pond is updated the unused part of the sequence can be returned to the queue. See Decommission loaders.

3.3. Flow control

If more source messages enter the system than can be processed, the message queues on the broker will grow. RabbitMQ has flow control mechanisms that block publishers when a certain memory usage or disk space threshold is hit (called watermarks in RabbitMQ); however, this alarm is server-wide and can result in a deadlock situation in XFM that requires a manual offload process to resolve. The deadlock can occur because each XFM component acknowledges a message upstream only after it has been acknowledged downstream (i.e. after the broker has persisted the output or error message, or after the message has been uploaded to the data lake). New messages can be consumed only after a message is acknowledged upstream, and this requires a message publish, which the alarm now blocks, so eventually message processing stops. The only components that can continue are the loaders, which publish only error messages to the broker; but if messages have piled up earlier in the chain and emptying the loader queue does not bring resource usage below the watermark, the deadlock is not overcome. XFM deals with this situation in two ways:

  1. Separating (external) source message publishers from XFM workers by using two independent RabbitMQ instances, each with its own resource limits. These are represented by the gateway and the broker.
  2. Applying basic application-level flow control between the gateway and the broker, implemented by the ingesters. This mechanism tracks queue sizes to detect whether messages keep piling up; if a threshold is hit, no messages are ingested from the gateway. This is a soft limit because the queue sizes are checked only periodically.
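The second mechanism boils down to a simple predicate over the observed queue depths; the threshold value and queue names below are assumptions for illustration, not XFM's actual configuration:

```python
def may_ingest(queue_depths, threshold):
    """Soft flow-control decision: keep ingesting from the gateway only
    while no broker queue exceeds the configured depth threshold."""
    return all(depth <= threshold for depth in queue_depths.values())

# Queues are draining normally: keep ingesting from the gateway.
assert may_ingest({"transform-rim": 120, "load-sql": 40}, threshold=1000)
# A queue is piling up: pause ingestion until the next periodic check.
assert not may_ingest({"transform-rim": 5000, "load-sql": 40}, threshold=1000)
```

Because the depths are sampled only periodically, queues can briefly overshoot the threshold between checks, which is why this is a soft rather than a hard limit.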

3.4. Error handling

The message flow described earlier assumes that every processing step executes successfully, which is not always the case: errors can occur during XFM processing. Four types of errors are distinguished:

  • Business exceptions: message validation errors or code system constraint errors.
  • Application exceptions: configuration errors or application-internal errors.
  • System errors: out-of-memory errors.
  • Network errors.

In XFM each of these error types is treated differently.

When a business exception occurs an error message is sent to an error exchange on the message broker. After the error message is acknowledged (i.e. it is persisted on the broker, if persistence is enabled) the processing of the source message is considered complete and it is acknowledged upstream. The error message includes the source message as payload and error information as headers.
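A sketch of how such an error message might be assembled; the header names (x-error-*) are hypothetical, chosen here only to illustrate the payload/headers split described above:

```python
def make_error_message(source_body, error_type, detail):
    """Wrap a failed source message for the error exchange: the source
    message becomes the payload, error information goes into headers."""
    return {
        "headers": {"x-error-type": error_type, "x-error-detail": detail},
        "body": source_body,
    }

err = make_error_message(b"<ClinicalDocument/>", "business", "validation failed")
assert err["body"] == b"<ClinicalDocument/>"          # original message kept
assert err["headers"]["x-error-type"] == "business"   # error info in headers
```

Keeping the original payload intact means the message can later be inspected, corrected, and resubmitted without loss of information.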

Application exceptions such as configuration errors can prevent the component from starting; this can be observed in the application log file (no message processing has taken place yet). Configuration errors or application-internal errors can also occur after the application has started processing messages. These errors are logged, and by default the source message involved is not requeued but delivered to a so-called dead letter queue, which in XFM serves a similar purpose as the error queues for business exceptions: the messages are stored on the broker for manual inspection and are not processed further.
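In RabbitMQ, dead lettering is configured per queue via queue arguments. The argument keys below (x-dead-letter-*) are standard RabbitMQ; the exchange and routing key values are illustrative, not necessarily XFM's actual names:

```python
# Queue arguments that make RabbitMQ dead-letter a message when a consumer
# rejects it without requeueing (basic.reject/basic.nack with requeue=false).
transform_rim_args = {
    "x-dead-letter-exchange": "dead-letter",       # where rejected messages go
    "x-dead-letter-routing-key": "transform-rim",  # identifies the origin queue
}
```

These arguments would be passed when declaring the queue, so a worker only has to reject the message and the broker handles the rest.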

System errors such as out-of-memory errors typically cause the application to crash. In a running setup the application can hold several messages for processing; to avoid losing these messages on a crash, XFM uses message acknowledgements to and from the broker. When the application crashes, the broker (RabbitMQ) detects this and requeues all messages that were held by the application but not yet acknowledged.
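The ack-after-downstream ordering can be sketched as follows; the transformation step and the callback shapes are stand-ins, but the ordering is the point:

```python
def process_one(msg, publish_downstream, ack_upstream):
    """Acknowledge a message upstream only after the downstream publish
    succeeded; a crash in between leaves the input message unacked."""
    out = msg.upper()          # stand-in for a real transformation step
    publish_downstream(out)    # persist the result on the broker first
    ack_upstream(msg)          # only then acknowledge the input

# Normal case: the output is published, then the input is acked.
published, acked = [], []
process_one("cda", published.append, acked.append)
assert published == ["CDA"] and acked == ["cda"]

# Crash between publish and ack: the input stays unacknowledged, so
# RabbitMQ will requeue and redeliver it after the connection drops.
def crashing_publish(out):
    raise ConnectionError("broker connection lost")

acked2 = []
try:
    process_one("cda", crashing_publish, acked2.append)
except ConnectionError:
    pass
assert acked2 == []            # no ack was sent: nothing is lost
```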

A network error can occur, for example, if the broker is unreachable or down. The XFM components log the failure and retry until the connection is restored.
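A minimal sketch of such a retry loop, assuming capped exponential backoff (the delay values are assumptions, not documented XFM settings):

```python
import time

def connect_with_retry(connect, delay=1.0, max_delay=30.0):
    """Retry the broker connection until it succeeds, sleeping with
    capped exponential backoff between attempts."""
    while True:
        try:
            return connect()
        except ConnectionError:
            time.sleep(delay)          # a real worker would also log this
            delay = min(delay * 2, max_delay)

# Simulated broker that is unreachable for the first two attempts.
attempts = {"n": 0}
def flaky_connect():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("broker unreachable")
    return "channel"

assert connect_with_retry(flaky_connect, delay=0.01) == "channel"
assert attempts["n"] == 3
```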

The error handling behaviour described above provides XFM with at-least-once semantics: mechanisms are in place to avoid message loss, however it is possible for the same message to be delivered more than once. For example, this can happen if a component crashes (e.g., due to a system error) after successfully processing and forwarding a message but before the upstream acknowledgement could be delivered to the broker. The broker will detect the connection loss and requeue the message.