5. Using xfmctl

xfmctl not only installs and updates nodes, but also provides a number of commands that help manage an XFM deployment.

5.1. Process Management

Process management is performed with the supervisor command.

By default it shows the status of the XFM processes.

$ xfmctl --roles=ingester supervisor

The above command is the same as:

$ xfmctl --roles=ingester supervisor:status

To stop a process, use an identifier of the form <group>:<process> as shown in the status overview, for example:

$ xfmctl --roles=loader supervisor:"stop xfm-loader:xfm-loader-0"

To stop all processes in a group you can provide just the group name (including the colon):

$ xfmctl --roles=loader supervisor:"stop xfm-loader:"

Or to start all processes in the group:

$ xfmctl --roles=loader supervisor:"start xfm-loader:"

Show available commands:

$ xfmctl --roles=singlenode supervisor:help
[127.0.0.1] Executing task 'supervisor'
[127.0.0.1] sudo: /usr/bin/supervisorctl -c /etc/xfm/supervisord/supervisord.conf help
[127.0.0.1] out:
[127.0.0.1] out: default commands (type help <topic>):
[127.0.0.1] out: =====================================
[127.0.0.1] out: add    exit      open  reload  restart   start   tail
[127.0.0.1] out: avail  fg        pid   remove  shutdown  status  update
[127.0.0.1] out: clear  maintail  quit  reread  signal    stop    version
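
Any of the listed supervisorctl commands can be passed through in the same way. For example, to show the last part of the log of a single process from the status overview (the output depends on the process and its logging configuration):

$ xfmctl --roles=loader supervisor:"tail xfm-loader:xfm-loader-0"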

5.2. Messaging

The messaging command provides access to and control of messaging components.

Show queue information:

$ xfmctl --roles=singlenode messaging
[127.0.0.1] Executing task 'messaging'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py broker list queues
[127.0.0.1] out: +------------+----------------------+-------------+----------------------+-----------+---------+------------------------+---------------------+--------+----------+----------------+-------------------------+------------------+--------+---------+
[127.0.0.1] out: |   vhost    |         name         | auto_delete | consumer_utilisation | consumers | durable | exclusive_consumer_tag |     idle_since      | memory | messages | messages_ready | messages_unacknowledged |       node       | policy |  state  |
[127.0.0.1] out: +------------+----------------------+-------------+----------------------+-----------+---------+------------------------+---------------------+--------+----------+----------------+-------------------------+------------------+--------+---------+
[127.0.0.1] out: | /messaging | dlx-errors-load      | False       |                      | 0         | True    |                        | 2016-01-10 13:48:30 | 14008  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | dlx-errors-transform | False       |                      | 0         | True    |                        | 2016-01-10 13:48:30 | 14008  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | dlx-load             | False       |                      | 0         | True    |                        | 2016-01-10 13:48:30 | 14008  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | dlx-sequencer        | False       |                      | 0         | True    |                        | 2016-01-10 13:48:30 | 14008  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | dlx-transform        | False       |                      | 0         | True    |                        | 2016-01-10 13:48:30 | 14008  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | errors-load          | False       |                      | 0         | True    |                        | 2016-01-10 13:48:30 | 14008  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | errors-transform     | False       |                      | 0         | True    |                        | 2016-01-10 13:48:30 | 14008  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | load-sql             | False       |                      | 1         | True    |                        | 2016-01-10 14:15:13 | 10328  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | pond-seq             | False       |                      | 0         | True    |                        | 2016-01-10 14:15:12 | 832088 | 4095     | 4095           | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | transform-rim        | False       | 1.0                  | 1         | True    |                        |                     | 68328  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: | /messaging | unrouted             | False       |                      | 0         | True    |                        | 2016-01-10 13:48:30 | 14008  | 0        | 0              | 0                       | rabbit@localhost |        | running |
[127.0.0.1] out: +------------+----------------------+-------------+----------------------+-----------+---------+------------------------+---------------------+--------+----------+----------------+-------------------------+------------------+--------+---------+

The output shows that this command is shorthand for /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py broker list queues. rabbitmqcmd.py is a wrapper around the RabbitMQ CLI management tool rabbitmqadmin. It requires either gateway or broker as its first argument, which it uses to resolve the connection details passed on to rabbitmqadmin.
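
As an illustration, the wrapper call above is roughly equivalent to invoking rabbitmqadmin directly with the broker connection details filled in; the host, port and credentials below are placeholders, not values taken from an actual deployment:

$ rabbitmqadmin -H <BROKER_HOST> -P <PORT> -u <USER> -p <PASSWORD> -V /messaging list queues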

To show only the name and messages columns:

$ xfmctl --roles=singlenode messaging:"broker list queues name messages"
[127.0.0.1] Executing task 'messaging'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py broker list queues name messages
[127.0.0.1] out: +----------------------+----------+
[127.0.0.1] out: |         name         | messages |
[127.0.0.1] out: +----------------------+----------+
[127.0.0.1] out: | dlx-errors-load      | 0        |
[127.0.0.1] out: | dlx-errors-transform | 0        |
[127.0.0.1] out: | dlx-load             | 0        |
[127.0.0.1] out: | dlx-sequencer        | 0        |
[127.0.0.1] out: | dlx-transform        | 0        |
[127.0.0.1] out: | errors-load          | 0        |
[127.0.0.1] out: | errors-transform     | 0        |
[127.0.0.1] out: | load-sql             | 0        |
[127.0.0.1] out: | pond-seq             | 4095     |
[127.0.0.1] out: | transform-rim        | 0        |
[127.0.0.1] out: | unrouted             | 0        |
[127.0.0.1] out: +----------------------+----------+

For the gateway:

$ xfmctl --roles=singlenode messaging:"gateway list queues name messages"
[127.0.0.1] Executing task 'messaging'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py gateway list queues name messages
[127.0.0.1] out: +--------------------+----------+
[127.0.0.1] out: |        name        | messages |
[127.0.0.1] out: +--------------------+----------+
[127.0.0.1] out: | dlx-errors-ingress | 0        |
[127.0.0.1] out: | dlx-ingress        | 0        |
[127.0.0.1] out: | errors-ingress     | 1        |
[127.0.0.1] out: | ingress-cdar2      | 0        |
[127.0.0.1] out: | ingress-fhir       | 0        |
[127.0.0.1] out: +--------------------+----------+

Show active connections:

$ xfmctl --roles=singlenode messaging:"broker -f tsv -q list connections name"
[127.0.0.1] Executing task 'messaging'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py broker -f tsv -q list connections name
[127.0.0.1] out: 192.168.122.43:39463 -> 192.168.122.43:5672
[127.0.0.1] out: 192.168.122.43:39470 -> 192.168.122.43:5672
[127.0.0.1] out: 192.168.122.43:39471 -> 192.168.122.43:5672
[127.0.0.1] out: 192.168.122.43:39482 -> 192.168.122.43:5672

Purge a specific queue:

$ xfmctl messaging:"broker purge queue name\=errors-sql"

Note

In the provided command (in quotes) certain characters have to be escaped, most notably the = character. This is needed due to the way the argument parsing is implemented. For example:

$ xfmctl --roles=singlenode messaging:"broker get queue\=errors-sql"

5.3. Message archiving and replay

xfmctl contains a command qarchive for simple archiving and replaying of messages. This can be useful for retrying messages in error or dead-letter queues.

qarchive downloads messages to disk on the management server, under /var/lib/xfm/qarchive. The messages are stored in TAR format with metadata (PAX headers).

Archive a maximum of 100 messages (the default) from the errors-ingress queue:

$ xfmctl --roles=singlenode qarchive:archive,errors-ingress,100
[127.0.0.1] Executing task 'qarchive'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/qarchive.py /etc/xfm/xfm.json archive errors-ingress 100

Download archive to local machine:

$ xfmctl --roles=singlenode qarchive:get,errors-ingress
[127.0.0.1] Executing task 'qarchive'
[127.0.0.1] download: /home/xfmadmin/127.0.0.1/errors-ingress.tar <- /var/lib/xfm/qarchive/errors-ingress.tar

The archive (TAR) is downloaded into a local directory named after the originating IP address.
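
Since the archive is a regular TAR file, its contents can be inspected locally with standard tools; the exact path depends on where the archive was downloaded, and the member names depend on what was archived:

$ tar -tvf 127.0.0.1/errors-ingress.tar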

When an archive already exists for a queue on the node, it has to be removed before the queue can be archived again:

$ xfmctl --roles=singlenode qarchive:remove,errors-ingress
[127.0.0.1] Executing task 'qarchive'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/qarchive.py /etc/xfm/xfm.json remove errors-ingress 100
[127.0.0.1] out: Removing archive for errors-ingress, are you sure? [y/n]: y

To replay messages:

$ xfmctl --roles=singlenode qarchive:replay,errors-ingress
[127.0.0.1] Executing task 'qarchive'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/qarchive.py /etc/xfm/xfm.json replay errors-ingress 100

The replay command publishes the archived messages to the relevant exchange so that XFM workers can process them. For example, archived messages from the errors-ingress queue are published to the ingress exchange.
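
To verify a replay you can, for example, re-check the queue depths on the gateway with the messaging command shown earlier; how the counts change depends on how the replayed messages are routed and processed:

$ xfmctl --roles=singlenode messaging:"gateway list queues name messages"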

5.4. Decommission loaders

Loaders have ponds which hold state; they store a unique identifier range used when copying messages to the lake. To retire a pond (e.g., before an upgrade) and return unused identifiers, the loader needs to be decommissioned.

$ xfmctl --roles=singlenode decommission
[127.0.0.1] Executing task 'decommission'
[127.0.0.1] sudo: touch /etc/xfm/decommission
[127.0.0.1] Executing task 'supervisor'
[127.0.0.1] sudo: /usr/bin/supervisorctl -c /etc/xfm/supervisord/supervisord.conf restart xfm-loader:
[127.0.0.1] out: xfm-loader:xfm-loader-0: stopped
[127.0.0.1] out: xfm-loader:xfm-loader-0: started

The decommission command decommissions all loaders (and their ponds) on the selected nodes. It creates the file /etc/xfm/decommission, whose existence is checked by the loaders. To re-initialize the loaders, simply delete this file and restart them:

$ xfmctl --roles=singlenode -- sudo rm /etc/xfm/decommission
$ xfmctl --roles=singlenode supervisor:"restart xfm-loader:"

Note the use of xfm-loader:, which indicates all processes in the xfm-loader group.

5.5. Advanced usage

Under the hood, xfmctl builds on the Fabric command-line tool and inherits its features. See the Fabric documentation for details. Some examples are listed below.

Execute an arbitrary command on all loaders:

$ xfmctl --roles=loader -- ls

Sequentially open a shell on each transformer:

$ xfmctl --roles=transformer -- bash

To execute a command on a specific instance (instead of all instances with a given role), first list the network addresses of the instances:

$ xfmctl list

The network address can then be used to execute a command:

$ xfmctl --hosts=<ADDRESS> update

Or exclude a specific instance:

$ xfmctl --roles=ingester --exclude-hosts=<ADDRESS> -- sudo yum update

Run commands in parallel:

$ xfmctl --parallel --roles=ingester,transformer,loader -- sudo yum update
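
The global options can be combined with the tasks described earlier; for example, assuming the supervisor task behaves the same under parallel execution, the process status of all roles can be checked at once:

$ xfmctl --parallel --roles=ingester,transformer,loader supervisor:status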