5. Using xfmctl¶
xfmctl not only installs and updates nodes, but also provides commands that help manage an XFM deployment.
5.1. Process Management¶
Process management is performed with the supervisor command.
By default it shows the status of the XFM processes.
$ xfmctl --roles=ingester supervisor
The above command is the same as:
$ xfmctl --roles=ingester supervisor:status
To stop a process, use an identifier of the form <group>:<process> as shown in the status overview, for example:
$ xfmctl --roles=loader supervisor:"stop xfm-loader:xfm-loader-0"
To stop all processes in a group, provide only the group name (including the colon):
$ xfmctl --roles=loader supervisor:"stop xfm-loader:"
Or to start them:
$ xfmctl --roles=loader supervisor:"start xfm-loader:"
Show available commands:
$ xfmctl --roles=singlenode supervisor:help
[127.0.0.1] Executing task 'supervisor'
[127.0.0.1] sudo: /usr/bin/supervisorctl -c /etc/xfm/supervisord/supervisord.conf help
[127.0.0.1] out:
[127.0.0.1] out: default commands (type help <topic>):
[127.0.0.1] out: =====================================
[127.0.0.1] out: add exit open reload restart start tail
[127.0.0.1] out: avail fg pid remove shutdown status update
[127.0.0.1] out: clear maintail quit reread signal stop version
5.2. Messaging¶
The messaging command provides access to and control of messaging components.
Show queue information:
$ xfmctl --roles=singlenode messaging
[127.0.0.1] Executing task 'messaging'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py broker list queues
[127.0.0.1] out: +------------+----------------------+-------------+----------------------+-----------+---------+------------------------+---------------------+--------+----------+----------------+-------------------------+------------------+--------+---------+
[127.0.0.1] out: | vhost | name | auto_delete | consumer_utilisation | consumers | durable | exclusive_consumer_tag | idle_since | memory | messages | messages_ready | messages_unacknowledged | node | policy | state |
[127.0.0.1] out: +------------+----------------------+-------------+----------------------+-----------+---------+------------------------+---------------------+--------+----------+----------------+-------------------------+------------------+--------+---------+
[127.0.0.1] out: | /messaging | dlx-errors-load | False | | 0 | True | | 2016-01-10 13:48:30 | 14008 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | dlx-errors-transform | False | | 0 | True | | 2016-01-10 13:48:30 | 14008 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | dlx-load | False | | 0 | True | | 2016-01-10 13:48:30 | 14008 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | dlx-sequencer | False | | 0 | True | | 2016-01-10 13:48:30 | 14008 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | dlx-transform | False | | 0 | True | | 2016-01-10 13:48:30 | 14008 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | errors-load | False | | 0 | True | | 2016-01-10 13:48:30 | 14008 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | errors-transform | False | | 0 | True | | 2016-01-10 13:48:30 | 14008 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | load-sql | False | | 1 | True | | 2016-01-10 14:15:13 | 10328 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | pond-seq | False | | 0 | True | | 2016-01-10 14:15:12 | 832088 | 4095 | 4095 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | transform-rim | False | 1.0 | 1 | True | | | 68328 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: | /messaging | unrouted | False | | 0 | True | | 2016-01-10 13:48:30 | 14008 | 0 | 0 | 0 | rabbit@localhost | | running |
[127.0.0.1] out: +------------+----------------------+-------------+----------------------+-----------+---------+------------------------+---------------------+--------+----------+----------------+-------------------------+------------------+--------+---------+
The output shows this is a short form for /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py broker list queues.
rabbitmqcmd.py is a wrapper for the RabbitMQ CLI management tool rabbitmqadmin. rabbitmqcmd.py requires either gateway or broker as its first argument to resolve the connection details to pass on to rabbitmqadmin.
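As an illustration of this resolution step, a wrapper of this kind might look like the following Python sketch. The configuration keys used here are assumptions made for the example; they do not necessarily match rabbitmqcmd.py's actual implementation or XFM's configuration layout.

```python
# Hypothetical sketch of what a wrapper like rabbitmqcmd.py does: pick the
# connection details for "broker" or "gateway" from a configuration mapping
# and prepend them to the rabbitmqadmin arguments. The config keys below are
# illustrative assumptions, not XFM's actual configuration format.

def build_rabbitmqadmin_cmd(target, extra_args, config):
    if target not in ("broker", "gateway"):
        raise ValueError("first argument must be 'broker' or 'gateway'")
    conn = config[target]
    cmd = [
        "rabbitmqadmin",
        "--host", conn["host"],
        "--port", str(conn["port"]),
        "--vhost", conn["vhost"],
        "--username", conn["user"],
        "--password", conn["password"],
    ]
    # Everything after the target passes through unchanged, e.g.
    # ["list", "queues", "name", "messages"].
    return cmd + list(extra_args)
```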
To show only the name and messages columns:
$ xfmctl --roles=singlenode messaging:"broker list queues name messages"
[127.0.0.1] Executing task 'messaging'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py broker list queues name messages
[127.0.0.1] out: +----------------------+----------+
[127.0.0.1] out: | name | messages |
[127.0.0.1] out: +----------------------+----------+
[127.0.0.1] out: | dlx-errors-load | 0 |
[127.0.0.1] out: | dlx-errors-transform | 0 |
[127.0.0.1] out: | dlx-load | 0 |
[127.0.0.1] out: | dlx-sequencer | 0 |
[127.0.0.1] out: | dlx-transform | 0 |
[127.0.0.1] out: | errors-load | 0 |
[127.0.0.1] out: | errors-transform | 0 |
[127.0.0.1] out: | load-sql | 0 |
[127.0.0.1] out: | pond-seq | 4095 |
[127.0.0.1] out: | transform-rim | 0 |
[127.0.0.1] out: | unrouted | 0 |
[127.0.0.1] out: +----------------------+----------+
For the gateway:
$ xfmctl --roles=singlenode messaging:"gateway list queues name messages"
[127.0.0.1] Executing task 'messaging'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py gateway list queues name messages
[127.0.0.1] out: +--------------------+----------+
[127.0.0.1] out: | name | messages |
[127.0.0.1] out: +--------------------+----------+
[127.0.0.1] out: | dlx-errors-ingress | 0 |
[127.0.0.1] out: | dlx-ingress | 0 |
[127.0.0.1] out: | errors-ingress | 1 |
[127.0.0.1] out: | ingress-cdar2 | 0 |
[127.0.0.1] out: | ingress-fhir | 0 |
[127.0.0.1] out: +--------------------+----------+
Show active connections:
$ xfmctl --roles=singlenode messaging:"broker -f tsv -q list connections name"
[127.0.0.1] Executing task 'messaging'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/rabbitmqcmd.py broker -f tsv -q list connections name
[127.0.0.1] out: 192.168.122.43:39463 -> 192.168.122.43:5672
[127.0.0.1] out: 192.168.122.43:39470 -> 192.168.122.43:5672
[127.0.0.1] out: 192.168.122.43:39471 -> 192.168.122.43:5672
[127.0.0.1] out: 192.168.122.43:39482 -> 192.168.122.43:5672
Purge a specific queue:
$ xfmctl messaging:"broker purge queue name\=errors-sql"
Note
In the provided command (in quotes) certain characters must be escaped, most notably the = character. This is required by the way the argument parsing is implemented. For example:
$ xfmctl --roles=singlenode messaging:"broker get queue\=errors-sql"
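To see why the escape is needed, the following simplified Python sketch mimics how Fabric-style task arguments are split. It is not xfmctl's actual parser, only an illustration of the behaviour: an unescaped = turns everything before it into a keyword-argument name.

```python
# Simplified sketch of Fabric-style "task:arguments" parsing.
# NOT xfmctl's actual implementation; it only illustrates why '=' and ','
# must be escaped inside the quoted argument string.

def parse_task(spec):
    """Split 'task:a,b,key=value' into (task, args, kwargs).

    A backslash before '=' or ',' escapes it, so '\\=' passes a
    literal '=' through as part of a positional argument.
    """
    task, _, argstr = spec.partition(":")
    args, kwargs = [], {}
    if not argstr:
        return task, args, kwargs
    # First pass: split on unescaped commas.
    parts, buf, i = [], "", 0
    while i < len(argstr):
        c = argstr[i]
        if c == "\\" and i + 1 < len(argstr) and argstr[i + 1] in ",=":
            buf += "\\" + argstr[i + 1]  # keep the escape for the next pass
            i += 2
        elif c == ",":
            parts.append(buf)
            buf = ""
            i += 1
        else:
            buf += c
            i += 1
    parts.append(buf)
    # Second pass: an unescaped '=' marks a keyword argument.
    for part in parts:
        idx, j = -1, 0
        while j < len(part):
            if part[j] == "\\":
                j += 2
            elif part[j] == "=":
                idx = j
                break
            else:
                j += 1
        if idx >= 0:
            kwargs[part[:idx]] = part[idx + 1:].replace("\\=", "=")
        else:
            args.append(part.replace("\\=", "=").replace("\\,", ","))
    return task, args, kwargs
```

With the escape, the whole string arrives as one positional argument; without it, everything before the = is silently treated as a keyword name.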
5.3. Message archiving and replay¶
xfmctl contains a command qarchive for simple archiving and replaying of messages. This can be useful for retrying messages in error or dead-letter queues.
qarchive downloads messages to disk on the management server, to the path /var/lib/xfm/qarchive. The messages are stored in TAR format with metadata (PAX headers).
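Such an archive can be inspected with standard tooling. The Python sketch below writes and reads a small TAR with per-message PAX metadata; the member names and header keys are illustrative assumptions, not necessarily the ones qarchive.py uses.

```python
# Sketch: inspect a message TAR and its per-message PAX metadata.
# The member name "msg-0000" and the header key "XFM.routing_key" are
# illustrative assumptions; qarchive.py's actual layout may differ.
import io
import tarfile

def list_messages(path):
    """Yield (name, payload, pax_headers) for each archived message."""
    with tarfile.open(path, "r") as tar:
        for member in tar.getmembers():
            payload = tar.extractfile(member).read()
            yield member.name, payload, dict(member.pax_headers)

def write_example(path):
    """Build a tiny example archive in the same spirit (PAX format)."""
    with tarfile.open(path, "w", format=tarfile.PAX_FORMAT) as tar:
        body = b'{"resourceType": "Patient"}'
        info = tarfile.TarInfo(name="msg-0000")
        info.size = len(body)
        # Custom metadata travels in the PAX extended header.
        info.pax_headers = {"XFM.routing_key": "ingress-fhir"}
        tar.addfile(info, io.BytesIO(body))
```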
Archive a maximum of 100 messages (the default) from the errors-ingress queue:
$ xfmctl --roles=singlenode qarchive:archive,errors-ingress,100
[127.0.0.1] Executing task 'qarchive'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/qarchive.py /etc/xfm/xfm.json archive errors-ingress 100
Download archive to local machine:
$ xfmctl --roles=singlenode qarchive:get,errors-ingress
[127.0.0.1] Executing task 'qarchive'
[127.0.0.1] download: /home/xfmadmin/127.0.0.1/errors-ingress.tar <- /var/lib/xfm/qarchive/errors-ingress.tar
The archive (TAR) is downloaded to a directory with the name of the originating IP address.
When an archive already exists for a queue (on the node), it has to be removed before the queue can be archived again:
$ xfmctl --roles=singlenode qarchive:remove,errors-ingress
[127.0.0.1] Executing task 'qarchive'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/qarchive.py /etc/xfm/xfm.json remove errors-ingress 100
[127.0.0.1] out: Removing archive for errors-ingress, are you sure? [y/n]: y
To replay messages:
$ xfmctl --roles=singlenode qarchive:replay,errors-ingress
[127.0.0.1] Executing task 'qarchive'
[127.0.0.1] sudo: /opt/mgrid/xfm3-bootstrap/scripts/qarchive.py /etc/xfm/xfm.json replay errors-ingress 100
The replay command publishes the archived messages to the relevant exchange so that XFM workers can process them. For example, archived messages from the errors-ingress queue are published to the ingress exchange.
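The queue-to-exchange relation can be sketched as a simple prefix rule. Note this rule is inferred only from the errors-ingress to ingress example above; it is an assumption about qarchive's actual behaviour, not a documented mapping.

```python
# Illustrative sketch of the queue-to-exchange naming relation: strip the
# error/dead-letter prefix to find the target exchange. Inferred from the
# single errors-ingress -> ingress example; an assumption, not qarchive's
# documented behaviour.
def replay_exchange(queue):
    for prefix in ("dlx-errors-", "errors-", "dlx-"):
        if queue.startswith(prefix):
            return queue[len(prefix):]
    return queue
```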
5.4. Decommission loaders¶
Loaders have ponds which hold state; they store a unique identifier range used when copying messages to the lake. To retire a pond (e.g., before an upgrade) and return unused identifiers, the loader needs to be decommissioned.
$ xfmctl --roles=singlenode decommission
[127.0.0.1] Executing task 'decommission'
[127.0.0.1] sudo: touch /etc/xfm/decommission
[127.0.0.1] Executing task 'supervisor'
[127.0.0.1] sudo: /usr/bin/supervisorctl -c /etc/xfm/supervisord/supervisord.conf restart xfm-loader:
[127.0.0.1] out: xfm-loader:xfm-loader-0: stopped
[127.0.0.1] out: xfm-loader:xfm-loader-0: started
The decommission command decommissions all loaders (and their ponds) on the selected nodes. It creates the file /etc/xfm/decommission, which the loaders check for. To re-initialize the loaders, simply delete this file and restart the loaders:
$ xfmctl --roles=singlenode -- sudo rm /etc/xfm/decommission
$ xfmctl --roles=singlenode supervisor:"restart xfm-loader:"
Note the use of xfm-loader:, which indicates all components in the xfm-loader group.
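The flag-file mechanism can be sketched as follows. The path comes from the documentation above; the function wrapper around the existence check is illustrative, not the loader's actual code.

```python
# Minimal sketch of the decommission check a loader might perform at
# startup. The path is from the documentation; the surrounding logic is
# an illustrative assumption.
import os

DECOMMISSION_FLAG = "/etc/xfm/decommission"

def should_decommission(flag_path=DECOMMISSION_FLAG):
    """A loader entering decommission mode simply checks for the flag file."""
    return os.path.exists(flag_path)
```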
5.5. Advanced usage¶
Under the hood xfmctl integrates the Fabric command line tool and inherits its powerful features; see the Fabric documentation for details. Some examples are listed below.
Execute an arbitrary command on all loaders:
$ xfmctl --roles=loader -- ls
Sequentially open a shell on each transformer:
$ xfmctl --roles=transformer -- bash
To execute a command on a specific instance (instead of all instances with a given role), first list the network addresses of the instances:
$ xfmctl list
The network address can then be used to execute a command:
$ xfmctl --hosts=<ADDRESS> update
Or exclude a specific instance:
$ xfmctl --roles=ingester --exclude-hosts=<ADDRESS> -- sudo yum update
Run commands in parallel:
$ xfmctl --parallel --roles=ingester,transformer,loader -- sudo yum update