Saturday, November 10, 2012

The Polyglot Rabbit: Examples of Multi-Protocol Queues in RabbitMQ

 

Abstract

RabbitMQ (http://www.rabbitmq.com) is a popular Open Source message queuing system that implements the Advanced Message Queuing Protocol (AMQP). It has been estimated that there are presently some 30,000 production deployments of RabbitMQ across the globe, and this number is growing rapidly. Most of these deployments are business-critical, underpinning everything from internet-based pizza ordering systems through to providing the central nervous system for OpenStack-based cloud deployments. RabbitMQ currently supports versions 0.8.0 and 0.9.1 of AMQP and will soon also provide support for 1.0. However, a somewhat overlooked capability of RabbitMQ is its ability to also readily provide support via its flexible plugin architecture for a variety other popular Open Source message queuing protocols, including STOMP, MQTT, ZeroMQ, and RESTful messaging via the RabbitHub plugin.  Most good message queuing protocols share many features in common; however some are better suited to a particular set of use cases than others. This ability of RabbitMQ to be able to seamlessly receive and propagate messages simultaneously via multiple protocols is an extremely powerful facility, and one that affords great flexibility. For example, it means that it is possible to use the most appropriate protocol for a particular function or to simultaneously use different protocols to disseminate the same data to different types of users via the most appropriate protocol without having to develop and maintain any separate gateway components. The following text discusses this ability of RabbitMQ to support multiple message queuing protocols and presents a number of simple examples to illustrate how this facility may be used.

Introduction

I like visiting Belgium. Aside from the fact the food is good and the beer excellent, the people are friendly and I never have any communication problems, as just about everyone speaks and understands English. I have always taken the somewhat arrogant position that I do not need to learn another language; however the truth of the matter is that I am not sure whether I could do it, and it fascinates me greatly to observe how the vast majority of the Belgian population seem to be able to speak at least three languages and seamlessly interchange between them as the need arises (such as when dealing with monoglots such as myself). Generally speaking my Belgian friends will have a preferred language (one in which they are strongest) and for sure there may be occasions when a particular word or colloquialism from one language will not be understood or will not be interpreted correctly; however for the most part the mapping is exceptionally good in terms of both accuracy and efficiency. Nor does it seem to be a particular bother for my friends to contend with my inadequacies; for them it is simply a normal part of life to speak multiple languages, doing so even as part of day-to-day family life, where husband and wife may have a different preferred language, and children will from birth learn to communicate in both.

There are clearly advantages to being able to communicate with people in multiple languages, and in an analogous fashion there are advantages to message queuing software being able to accommodate and seamlessly map between different message queuing protocols. The world is not uniform, and while one day there may be something approaching a common language that is spoken and understood by just about everyone, there will always be accents, colloquialisms, and other divergences that will ensure that total uniformity  will never be achieved. 

While it is perhaps somewhat easier to argue the merits and feasibility of having a standardised messaging protocol over the merits and feasibility of having a standardised language for the entire human race, the notion of a “one-size-fits-all” messaging protocol is similarly flawed. Some messaging protocols are optimised for low-latency transmission of small messages, while others are designed for efficient transmission of large messages; some messaging protocols sacrifice performance for reliability, while others favour performance at the risk of occasional message loss, and so on. Any attempt to accommodate all possible messaging scenarios within the scope of a single ubiquitous protocol must entail some level compromise, thereby making the resultant protocol less well-suited to one or more messaging scenarios over a less generic protocol that has been optimally designed for a specific use case.  

Evolution has not resulted in one type of plant or animal; it has resulted in a vast array of forms with unique characteristics that allow them to take optimal advantage of their particular environment. Similar comments are applicable to message queuing protocols (and indeed to all other aspects of software). While most good message queuing protocols share many features in common, some are better suited to a particular task than others. Nor does evolution stand still, and what may seem like a good idea today may not be such a good idea tomorrow. The key therefore is adaptability. Just as with plant and animal species, if software cannot readily (and potentially rapidly) adapt to change then it will become extinct. 

RabbitMQ (http://www.rabbitmq.com) is a popular Open Source message queuing system that implements the Advanced Message Queuing Protocol (AMQP). RabbitMQ currently supports versions 0.8.0 and 0.9.1 of AMQP and will soon also provide support for AMQP 1.0. However, a somewhat overlooked but extremely powerful capability of RabbitMQ is its ability to also readily provide support for other popular message queuing protocols via its flexible plugin architecture. For example, there are plugins[1] available for STOMP (http://stomp.github.com/), MQTT (http://mqtt.org/),  ZeroMQ (http://www.zeromq.org/)[2], and for RESTful messaging via HTTP (https://github.com/tonyg/rabbithub). Each of these protocols was designed to address a particular subset of messaging use cases, and while AMQP is for the most part able to accommodate all of these use cases, the specific protocols are better-suited to the particular set of use cases for which they were specifically designed. For example, MQTT (Message Queue Telemetry Transport) was designed to facilitate the transfer of data from pervasive devises over high-latency or otherwise constrained network links. This task may be achieved using AMQP, but MQTT is arguably a more appropriate choice of protocol for these types of scenario. 

The fact that the RabbitMQ broker can support the MQTT protocol via a plugin and can map between MQTT and its native AMQP protocol means for example that data can be published into the messaging environment via MQTT and disseminated to consumers via AMQP without developers needing to implement any specific gateway components; they only need to write the MQTT producer and AMQP consumer functions. Alternatively, data received via MQTT may be disseminated via one of the other protocols listed above using the relevant plugin, or potentially a combination of protocols could be used in tandem – a message published to a fanout or topic exchange in the RabbitMQ broker via the STOMP protocol may for example be simultaneously consumed via AMQP, HTTP, ZeroMQ, or STOMP-based consumers. 

Being able to seamlessly publish messages via one protocol and consume them via another (or via a combination of others) using this plugin model is an incredibly powerful concept, and one that illustrates very well not only the richness and flexibility of the AMQP 0.9.1 model but also the flexibility and adaptability of RabbitMQ and of the Erlang programming language, in which RabbitMQ is written (see http://www.erlang.org). 

It should be noted that all of the protocols mentioned above are essentially Open Standards, and indeed one of the key motivations behind the creation of AMQP was to escape the “middleware hell” (as it was described by John O’Hara, one of the fathers of AMQP) of having multiple proprietary message queuing products that speak different languages (protocols) scattered across the enterprise and having to integrate between them by developing complicated gateways and adapters, which then require costly on-going maintenance and support. However there is a significant difference between developing gateways and adapters to bridge proprietary messaging technologies and having a core open messaging technology that can be readily extended via plugins to facilitate multi-protocol communication amongst a set of Open Standards-based messaging protocols. To extend the language analogy somewhat, it might be likened in part to hiring and paying a translator versus learning to speak the language oneself; the latter being a decidedly better option on multiple levels, assuming you have the aptitude for it. 

The following text presents and discusses some simple examples of how these RabbitMQ protocol plugins may be used. These examples by no means provide a complete illustration of everything that can be done, but are intended to illustrate some of the basic permutations that can be achieved, which will hopefully serve to stimulate the reader to come up with other ideas. For additional insights into what other permutations may be possible and for details about installing and configuring the plugins, the reader should refer the relevant plugin documentation (see links provided above).

Some examples

For the purposes of developing and testing the examples presented below (and for a bit of fun) a 6-node RabbitMQ cluster was created on HP Cloud (http://www.hpcloud.com) with 2 nodes in each Availability Zone (AZ). The configuration details common to all cluster nodes were as follows:

  •  Ubuntu 12.04 LTS 64-bit, 8 vCPU, 32GB RAM
  • RabbitMQ 2.8.7 (specifically build 2.8.7.31106 or later, as this includes some important fixes to the MQTT plugin)
  • RabbitMQ STOMP 2.8.7 plugin
  • RabbitMQ MQTT 2.8.7 plugin
  • RabbitHub plugin (version 0.0.1)

The cluster nodes in AZ1 had the RabbitHub plugin enabled; the nodes in AZ2 had the STOMP plugin enabled; and the nodes in AZ3 had the MQTT plugin enabled. The RabbitMQ Management Plugin (http://www.rabbitmq.com/management.html) was enabled on all cluster nodes. The example clients (producers and consumers) described below were run from a personal laptop that communicated with the cluster over the internet. 

Whilst such a lavish configuration is most certainly not required in order to experiment with the different protocol plugins (a considerably smaller single RabbitMQ instance would generally suffice), using a cluster with different plugins enabled on different nodes serves as a means of demonstrating the perhaps obvious fact that messages published via one protocol to one of the cluster nodes can be consumed via different protocols from other cluster nodes. 

A crude illustration of the cluster configuration is provided below.

RabbitMQ cluster
Figure 1 Simple illustration of the RabbitMQ cluster configuration used to test the examples discussed below. Note that such a configuration is not required for this purpose. However it does serve to further illustrate the flexibility of RabbitMQ and Erlang in terms of the types of configurations that are possible.


It should also be noted at this point that there is nothing particularly special that needs to be done in order to install RabbitMQ or to set up a RabbitMQ cluster on HP Cloud other than to ensure that all necessary ports are open for the relevant security group(s). Depending on the types of virtual instance used, the root file system (/) can be quite small relative to the available RAM, and it may therefore also be wise to consider placing the Mnesia partition under /mnt (which is generally much larger) in order to avoid any flow control issues associated with RabbitMQ’s disk space monitoring.

Example #1:

The first example illustrates using a simple STOMP consumer to consume messages published to the cluster via AMQP. The code for the consumer is shown below. 

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "stompdef.h"


static void die(void *handle)
{
    fprintf(stderr, "%s\n", STOMP_Errstr(handle));

    exit(EXIT_FAILURE);
}

static int foobar(void *handle, char *idata, size_t *ilen, char **odata, size_t *olen)
{
    fprintf(stderr, "%.*s\n", (int) *ilen, idata);

    return (0);
}

int main()
{
    void *                 handle = NULL;

    if ((handle = STOMP_Init()) == NULL)
    {
       die(handle);
    }

    if (STOMP_Connect(handle, "az2-2xl-1", 61613, "guest", "guest") == -1)
    {
       die(handle);
    }

    if (STOMP_Subscribe(handle, "/queue/stomp", 0, 0) == -1)
    {
       die(handle);
    }

    if (STOMP_Register(handle, "/queue/stomp", foobar) == -1)
    {
       die(handle);
    }
    while (1)
    {
       if (STOMP_Consume(handle, NULL, NULL) == -1)
       {
          fprintf(stderr, "%s\n", STOMP_Errstr(handle));
          break;
       }
    }

    if (STOMP_Disconnect(handle) == -1)
    {
       die(handle);
    }

    STOMP_Done(handle);
    return (0);
}

The consumer uses a simple C API developed by the author that supports STOMP 1.1[3] and currently runs on HP OpenVMS and on most UNIX/Linux variants[4]. A detailed description of the API is beyond the scope of this document; however the operation of the API should hopefully be reasonably evident from examining the sample code. Possibly the only aspect of the code that warrants some explanation at this time is the STOMP_Register() function. This function can be used to associate a callback function (foobar() in this case) with a particular destination. When STOMP_Consume() consumes a message, it will examine the value of the destination header and will invoke the associated callback, if one has been registered. A default callback may also be specified, or STOMP_Consume() can itself return directly the details of any messages that are read. To keep the sample code as simple as possible, some of the more advanced features of STOMP such as acknowledgements and transactions have not been used here; however the API does support these functions.

Note that if you wish to retain consumed messages for subsequent processing, it is important to take a copy of the message, as memory allocated by the API for storage of consumed messages may be reclaimed by subsequent API calls.

Running this program will result in the creation of a persistent queue named “stomp” (if it did not already exist) under the default vhost. As with all queues, “stomp” will be automatically bound to the AMQP default exchange, and if we now publish a message to the default exchange using a routing key of “stomp”, the message will be received and displayed by the STOMP client. Running the following piece of producer code (which uses the Pika Python AMQP library) would cause the STOMP consumer to display the message “Hello STOMP consumer”. Note that the message is published to a different cluster node from that to which the consumer is connected.

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters(host='az1-2xl-1'))
chan = conn.channel()

chan.basic_publish(exchange='', routing_key='stomp', body='Hello STOMP consumer!')
conn.close()

One limitation with STOMP that should not be forgotten is that it is (as its name specifies) a text-based protocol, while AMQP can accommodate data of any type. Certainly it may be possible to get around this limitation of the STOMP protocol by encoding any binary data; however a better approach would be to use another protocol better suited to the transmission of such data, such as AMQP or MQTT. It might also be suggested that being constrained to the transmission of textual messages is not a particularly significant limitation, as the use of JSON-formatted messages is today commonplace, and such messages may contain an assortment of data types, encoded or otherwise.

Example #2:

This next example provides a simple illustration of how messages published to a fanout exchange may be consumed via both AMQP and STOMP clients. 

The following code fragment shows the producer for this example. Instead of publishing to the default exchange, messages are published to the predefined fanout exchange amq.fanout with an empty routing key (the routing key is not relevant in this situation since all queues bound to a fanout exchange receive all messages, regardless of the content of the routing key).

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters( host='az1-2xl-2'))
chan = conn.channel()

chan.basic_publish(exchange='amq.fanout', routing_key='', body='Silly message')
conn.close()

The STOMP consumer for this example is listed below. The key difference from the previous example is that instead of specifying a queue destination in the call to STOMP_Subscribe(), we specify an exchange destination (“/exchange/amq.fanout”). Internally, the STOMP plugin creates an exclusive auto-delete queue for the subscription and binds this queue to the specified exchange (amq.fanout). Note that for this example we have also not bothered to register a callback function to process messages, but instead use the last two arguments of the STOMP_Consume() call to return consumed messages and their length. 

As noted previously, if you wish to retain messages for subsequent processing, it is necessary to make a copy of them, as memory allocated internally by the API to hold message data may be freed or reused by subsequent API calls.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "stompdef.h"

static void die(void *handle)
{
    fprintf(stderr, "%s\n", STOMP_Errstr(handle));

    exit(EXIT_FAILURE);
}

int main(int argc, char *argv[])
{
    void *                 handle = NULL;
    size_t                 len;
    char *                 data;

    if ((handle = STOMP_Init()) == NULL)
    {
       die(handle);
    }

    if (STOMP_Connect(handle, "az2-2xl-2", 61613, "guest", "guest") == -1)
    {
       die(handle);
    }

    if (STOMP_Subscribe(handle, "/exchange/amq.fanout", 0, 0) == -1)
    {
       die(handle);
    }

    while (1)
    {
       if (STOMP_Consume(handle, &data, &len) == -1)
       {
          fprintf(stderr, "%s\n", STOMP_Errstr(handle));
          break;
       }

       fprintf(stderr, "Received message: %.*s\n", (int) len, data);
       fprintf(stderr, "destination: %s\n", STOMP_GetHeader(handle, "destination"));
    }

    if (STOMP_Disconnect(handle) == -1)
    {
       die(handle);
    }

    STOMP_Done(handle);
    return (0);
}

In addition to displaying the message text, the above code also outputs the value of the STOMP destination header, which is obtained using the STOMP_GetHeader() function. 

When using the STOMP plugin, it is important to understand how STOMP destinations are mapped to exchanges, queues, and routing keys. The examples provided here are trivial illustrations of what is possible in this regard, and a complete description of these mappings can be found at http://www.rabbitmq.com/stomp.html

The AMQP consumer for this example is as follows. If you run this and the STOMP consumer and then run the producer, the message will be received and displayed simultaneously by both consumers.

import pika

creds = pika.PlainCredentials(username='guest', password='guest')
params = pika.ConnectionParameters(host='az3-2xl-2', credentials=creds)
conn = pika.BlockingConnection(params)
chan = conn.channel()

res = chan.queue_declare(exclusive=True)
q = res.method.queue
chan.queue_bind(exchange='amq.fanout', queue=q)

def callback(chan, method, properties, body):
    print "%r" % (body,)

chan.basic_consume(callback, queue=q)
chan.start_consuming()

Example #3:

This next example extends the previous example by adding an HTTP consumer, using the RabbitHub plugin. 

The following trivial Ruby Sinatra script provides the necessary consumer code. The script implements a single route named “/hubsub” and specifies both get and post verbs (methods) for this route, such that HTTP GET requests for /hubsub will run the get method, and POST requests will execute the post method. 

require 'sinatra'

get '/hubsub' do
   puts "Received #{params}"
   puts "Responding to challenge request..."
   params[:"hub.challenge"]
end

post '/hubsub' do
   puts "Received message: #{params} "
end

Note that both HTTP POST and GET requests for the same route must be supported in this way, as RabbitHub uses both methods with the same URL for different purposes. 

Before the above script will be posted any messages by RabbitHub, it must first be registered. This can be achieved using a simple cURL command such as the following:

curl -vd "hub.mode=subscribe&hub.callback=http://10.1.1.251:4567/hubsub&hub.topic=foo\
&hub.verify=sync&hub.lease_seconds=86400" \
http://guest:guest@az1-2xl-1:55670/subscribe/x/amq.fanout

Running this command will cause the RabbitHub plugin (listening at http://az1-2xl-1:55670) to issue an HTTP GET request to the Ruby Sinatra script listening at http://10.1.1.251:4567/hubsub. This request consists of a simple “challenge” string that must be responded to be sending back to RabbitHub the same string. If this process completes successfully then the endpoint http://10.1.1.251:4567/hubsub will be registered with RabbitHub and this endpoint will be POSTed any messages that are published to the amq.fanout exchange. This may be readily verified by running the Pika publisher from the previous example. In addition to the STOMP and AMQP consumers receiving any messages that are published, messages should also be received and displayed by the Ruby Sinatra script.

Before moving on to the next example, there are a several points regarding the operation of RabbitHub that should be mentioned. Firstly, it should be noted that when subscribing it is possible to specify the subscription duration in seconds via the hub.lease_seconds parameter in the subscription GET request. For the cURL command shown above, a subscription duration of 1 day (86400 seconds) has been specified. Once this duration is reached, the subscription will be terminated by RabbitHub, and no more messages will be posted to the associated endpoint. If no duration is specified when creating the subscription, an essentially infinite value is used. More information regarding this and other options supported by RabbitHub may be found at https://github.com/tonyg/rabbithub.

Another important factor that must be considered when using RabbitHub to deliver messages via HTTP is the performance of HTTP relative to that of AMQP. For a given network, HTTP will invariably be considerably slower than AMQP (or indeed any of the other protocols mentioned here), and care must be taken to ensure that HTTP endpoints are able to keep up with message rates and will not precipitate RabbitMQ flow control to prevent messages being published faster than they can be consumed.

Example #4:

In addition to consuming messages via protocols other than AMQP, it is also straightforward to publish messages via other protocols. For example, the following cURL command may be used to publish a message via HTTP to the RabbitHub plugin that will be delivered into all queues that are bound to the amq.fanout exchange. Assuming that all of the consumers described in Examples #2 and #3 above were running, all would receive a copy of the message. 

curl -v -d "Hello via HTTP" \
http://guest:guest@az1-2xl-1:55670/endpoint/x/amq.fanout?hub.topic=anything

Similarly, messages may be published via the STOMP protocol. The following script uses the stomp.py Python client to publish a message into RabbitMQ via the STOMP plugin that again will be delivered into all queues bound to the amq.fanout exchange. Additional information about stomp.py may be found at http://code.google.com/p/stomppy/.

import sys
import stomp

conn = stomp.Connection([('az2-2xl-1', 61613)], 'guest', 'guest')
conn.start()
conn.connect(wait=True)
conn.send('This is a test!', destination='/exchange/amq.fanout')

conn.disconnect()


Example #5:

This and the next example illustrate the use of the relatively new MQTT adapter, which provides RabbitMQ with the ability to support the MQTT 3.1 protocol. As with the plugins used for the previous examples, it is possible via the MQTT adapter to seamlessly mix the use of MQTT with other protocols. 

The following C program illustrates a simple MQTT consumer implemented using the Paho client API (see http://andypiper.co.uk/2012/03/10/paho-gets-started/ for a good overview on getting stated with the Paho client). When this program is run, it will connect to the MQTT adapter, and assuming that the client it successfully authenticated, the adapter will create a queue for the consumer with a name derived from the MQTT client ID and QOS (quality of service), and this queue will be bound by default to the amq.topic exchange with a routing key of “MQTT Examples[5].  Once this infrastructure has been set up, the consumer will listen for messages. 

#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTClient.h"

#define ADDRESS     "tcp://az3-2xl-1:1883"
#define CLIENTID    "ExampleClientSub"
#define TOPIC       "MQTT Examples"
#define QOS         1
#define TIMEOUT     10000L

volatile MQTTClient_deliveryToken deliveredtoken;

void delivered(void *context, MQTTClient_deliveryToken dt)
{
    printf("Message with token value %d delivery confirmed\n", dt);
    deliveredtoken = dt;
}

int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    int i;
    char* payloadptr;

    printf("Message arrived\n");
    printf("     topic: %s\n", topicName);
    printf("   message: ");

    payloadptr = message->payload;
    for(i=0; i<message->payloadlen; i++)
    {
        putchar(*payloadptr++);
    }
    putchar('\n');
    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}

void connlost(void *context, char *cause)
{
    printf("\nConnection lost\n");
    printf("     cause: %s\n", cause);
}

int main(int argc, char* argv[])
{
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    int rc;
    int ch;

    MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.username = "guest";
    conn_opts.password = "guest";

    MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);

    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        exit(-1);
    }
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
    MQTTClient_subscribe(client, TOPIC, QOS);

    do
    {
        ch = getchar();
    } while(ch!='Q' && ch != 'q');

    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}

If we now use the following Pika Python script to publish a message via AMQP to the amq.topic exchange using a routing key of “MQTT Examples”, the message will be received and displayed by the MQTT consumer.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='az1-2xl-1'))
channel = connection.channel()

channel.basic_publish(exchange='amq.topic', routing_key='MQTT Examples', body='Hello World!')
connection.close()

Example #6:

This final example extends the previous example by adding an MQTT publisher and a STOMP consumer to the configuration.

The code for the publisher (again implemented using the Paho C API) is illustrated below. Messages will be published via the adapter to the RabbitMQ amq.topic exchange, whereupon they will be routed to any queues bound to the amq.topic exchange with routing key “MQTT Examples”. 

#include "stdio.h"
#include "stdlib.h"
#include "string.h"
#include "MQTTClient.h"

#define ADDRESS     "tcp://az3-2xl-1:1883"
#define CLIENTID    "ExampleClientPub"
#define TOPIC       "MQTT Examples"
#define PAYLOAD     "Hello World!"
#define QOS         1
#define TIMEOUT     10000L

int main(int argc, char* argv[])
{
    MQTTClient client;
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token;
    int rc;

    MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL);
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.username = "guest";
    conn_opts.password = "guest";

    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        exit(-1);
    }
    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 0;
    MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    printf("Waiting for up to %d seconds for publication of %s\n"
            "on topic %s for client with ClientID: %s\n",
            (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    printf("Message with delivery token %d delivered\n", token);
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}

In addition to consuming messages via the MQTT consumer presented in the previous example, we can also simultaneously consume these messages via STOMP simply by changing the subscription in the consumer code for Example #2 to specify the following topic destination. Alternatively an explicit exchange destination could be used (see https://www.rabbitmq.com/stomp.html for additional information).

    if (STOMP_Subscribe(handle, "/topic/MQTT Examples", 0, 0) == -1)
    {
       die(handle);
    }

Summary

It has been estimated that there are some 30,000 deployments of RabbitMQ across the globe (and this number is growing rapidly, somewhat in accordance with the reproductive rate of its mammalian namesake), handling everything from internet pizza orders through to providing the central nervous system for OpenStack-based cloud deployments. It may be speculated that the individuals who chose RabbitMQ for these deployments for the most part were not concerned about what protocol standard the software supported; this was largely an irrelevance to most of them or at most an interesting side note. What they arguably cared more about was that the software did the job required and did it well, and that it was easy to use with good administrative tools, had good support and documentation, and that there was a client API available for their language or choice. Having standards is important from an interoperability perspective, but it becomes less relevant in an Open Source world and where wire protocols are fully open and readily amenable to the implementation of client API’s in any programming language; it also becomes less relevant when the messaging product is highly adaptable and able to natively support and map between different Open Source messaging protocols and protocol versions in an efficient and seamless manner. 

RabbitMQ is a humble and modest polyglot that does not boast about its ability to speak multiple languages and different dialects of the same language[6], but perhaps it should. The ability to be able to seamlessly receive and propagate data via multiple protocols affords great flexibility. It means that it is possible to use the most appropriate protocol for a particular function or to simultaneously use different protocols to disseminate the same data to different types of users via the most appropriate mechanism without having to develop and maintain separate gateway components. Other than enabling the appropriate plugin, in general all that is required is the specification a few configuration details and potentially some level user management. For example, while AMQP might be the best way to distribute data to users directly connected to the corporate LAN, it may be more appropriate for remote field workers using mobile devices to receive the same data via HTTP, which can be readily facilitated by enabling the RabbitHub plugin and subscribing the users to the relevant data feeds. 

There are client APIs available in numerous languages for all of the messaging protocols mentioned here (ZeroMQ, STOMP, HTTP, MQTT), and none of the protocols is particularly complex to understand or use. While it is true that some of these protocol plugins are still evolving and that there are some limitations in terms of fully mapping some of the protocols to the AMQP 0.9.1 model, these matters are quite insignificant compared to the capabilities that these plugins provide. The software is actively supported by the RabbitMQ team and the wider RabbitMQ community, and it is very likely that support for additional protocols will be provided in the future, along with ongoing enhancements to existing plugins.




[1] Details of these and other plugins can be found via the RabbitMQ web site (http://www.rabbitmq.com). Note that in addition to the protocol plugins discussed in this document, there are also plugins for different authentication mechanisms, federation, message replication, management, alternative exchange types, and numerous other functions.

[2] ZeroMQ is not strictly an Open Standard; however this seems to have had little bearing on its continued impressive rate of adoption, which is perhaps indicative of the fact that the overall quality and usefulness of a particular piece of Open Source software is of considerably greater importance to its users than the standards it supports.

[3] Version 1.2 of the STOMP protocol specification was published October 22nd 2012, and includes several updates. The API will be updated in due course to include support for these updates.

[4] Details of other STOMP client implementations in various languages (and in various states of completeness) may be found at http://stomp.github.com/implementations.html. It is hoped that the C API used for some of the examples in this document can be made available in the not too distant future.

[5] An alternative exchange name and various other adapter settings can be specified in the RabbitMQ configuration file (rabbitmq.config). See http://hg.rabbitmq.com/rabbitmq-mqtt/file/default/README.md for details of the various configuration options.

[6] AMQP 1.0 is arguably not a dialect of earlier versions of the protocol, being a considerable departure from earlier versions in several respects (see http://www.amqp.org for more information), and it will be most interesting to monitor the adoption rate of 1.0 or whether developers prefer to continue using the 0.9.1 model. The motivations behind the design and inception of AMQP 1.0 are possibly a topic for subsequent discussion. There has already been much discussion on this topic – see for example http://www.250bpm.com/blog:11, which provides some excellent perspectives on the matter.

1 comment:

  1. • I very much enjoyed this article. Nice article thanks for given this information. I hope it useful to many Peopledata since Online Training Bangalore

    ReplyDelete