The Web That is Instantly On, and Always On

Kaazing Journal

Subscribe to Kaazing Journal: eMailAlertsEmail Alerts newslettersWeekly Newsletters
Get Kaazing Journal: homepageHomepage mobileMobile rssRSS facebookFacebook twitterTwitter linkedinLinkedIn


Kaazing Authors: Jason Bloomberg, Kaazing Blog, Jnan Dash, Jeremy Geelan, Stacy Gorkoff

Related Topics: Intel XML, Kaazing Journal, XML Magazine

Blog Feed Post

Using Kaazing WebSockets with MQTT

In my prior post, Adding Enterprise Features to a Socket.io Application with Kaazing WebSockets, we looked into moving from an existing Socket.io application to an Enterprise-class WebSocket application with very few changes in the code. In that example, we used an AMQP broker with the Kaazing JavaScript AMQP libraries.

While AMQP is a very powerful protocol, it requires some knowledge and understanding—a developer has to learn about exchanges, queues and so on. For mobile applications, there are added considerations of bandwidth and battery life. This is why Facebook chose the simple, yet powerful, MQTT protocol for their messenger (see Building Facebook Messenger).

In this post we will demonstrate how to make an existing AngularJS application using MQTT. To build this demo, we will use an existing Open Source MQTT library, Eclipse Paho, and demonstrate how to make it communicate over WebSocket using Kaazing WebSocket Gateway.

Let’s Start with a Working Socket.IO App

Similar to what we did in the prior post, we are going to start with my AngularJS implementation of the canonical TodoMVC application that uses Socket.IO – we will start with the code from here.

We’ve modified the code of the existing TodoMVC app (a detailed discussion can be found here) adding the following steps:

  1. Connect to Socket.io.
  2. Once connected, load the initial state of TODO items (send the message to the server asking for the initial state).
  3. Set up a callback to process received messages using the function  $scope.processReceivedCommand.
...
$scope.loadData=function(){
        var msg={
                command:"init",
                client:$scope.client
        }
        $scope.socket.emit("todomvc-rcv", msg);
        $log.info("Sent initialization!");
}
...
// Connect to pub-sub service
$scope.socket=io();

// Load initial data on connect
$scope.socket.on("connect", $scope.loadData);

// Setup receive callback
$scope.socket.on("todomvc-snd", $scope.processReceivedCommand);
...

We also added code to notify everyone about any changes in the state of the items. For example, when a new item is created we will execute the following:

...
var msg={
                command:"insert",
                item:
};
$scope.socket.emit("todomvc-rcv", msg);
...

Server side NodeJS component is exactly the same as in our prior sample. The component contains code to connect to Socket.IO, receive the new state of TODO items, and provide the new clients with initialization info by sending the message back using mostly the same code as in the client:

...
function processMessage(cmd) {
        console.log("Command: " + cmd.command + ", Received: " + cmd);
        if (cmd.command === "insert") {
                ...
        }
        else if (cmd.command === "remove") {
                ...
        }
        else if (cmd.command === "update") {
                ...
        }
        else if (cmd.command === 'init') {
                var retCmd = {
                        command: "initdata",
                        client: cmd.client,
                        items: todos
                }
                socket.emit("todomvc-snd", retCmd);
        }
}

io.on('connection', function(s){
        console.log('a user connected');
        socket=s;
        s.on('disconnect', function(){
                console.log('user disconnected');
        });
        s.on('todomvc-rcv', processMessage);
});
...

Now we can run and test the application. For detailed instructions, see README.MD

Now… Adding MQTT to an existing Socket.IO with Kaazing APIs

Before we start changing the code, we need to set up the Gateway and a message broker. While any MQTT broker will do, for the sake of simplicity we will use Eclipse Mosquitto, a very popular and easy to use tool.

Also, we will use the open source Kaazing WebSocket Gateway 5.0. We simply need to add the necessary configuration to the Gateway for it to talk to an MQTT broker.

As you can see in provided gateway-config.xml file, we need to create a proxy service that will deliver all the messages received via WebSocket to a listening MQTT broker. The proxy service accepts the WebSocket messages on URL ws://${gateway.host}:8080/mqtt and delivers the payload to tcp://localhost:1883 where the MQTT broker is running.

<gateway-config>
        ...
        <service>
                <name>mqtt-proxy</name>
                <accept>ws://${gateway.host}:8080/mqtt</accept>
                <connect>tcp://localhost:1883</connect>
                <type>proxy</type>
                <cross-site-constraint>
                        <allow-origin>*</allow-origin>
                </cross-site-constraint>
        </service>
        ...
</gateway-config>

As I mentioned earlier, we are going to use a very common open source MQTT library named Eclipse Paho to communicate with the Gateway.

Modifying Client Code

To use Eclipse Paho to communicate with the Gateway, we are going to follow the sample as shown at the Paho JavaScript Library page. Here are the steps we need to take in the main controller:

  • Configure the browser to use the Kaazing API:
...
window.WebSocket = Kaazing.Gateway.WebSocket;
...
  • Connect to the Gateway and establish all callbacks (replace $scope.socket=io() and all $scope.socket.on calls)
    • Create an instance of a client. The last parameter is a unique client ID
      ...
      $scope.mqttClient = new Paho.MQTT.Client($scope.host, $scope.port, $scope.client);
      ...
      

      As we mentioned before, the Gateway must be bound to the IP of our station. We cannot simply bind it to localhost as Raspberry PI won’t be able to connect (mentioned later in the article). To deal with the bind, we will pass a port as part of URL and read it with the following code:

      ...
      var hostPort=window.location.search.replace("?", "").split("&")[0];
      $scope.host=hostPort.split(":")[0];
      $scope.port=parseInt(hostPort.split(":")[1]);
      ...
      
    • Establish a callback for the loss of the connection
      ...
      $scope.mqttClient.onConnectionLost = function(responseObject){
              if (responseObject.errorCode !== 0) {
                      alert("Connection is lost!"+responseObject.errorMessage);
              }
      }
      ...
      
    • Establish a callback to process arriving messages.
      ...
      $scope.mqttClient.onMessageArrived = function (message) {
              $log.debug("Message Arrived: from "+message.destinationName+":"+message.payloadString);
              var cmd=JSON.parse(message.payloadString);
              if (cmd.clientID!==$scope.client){
                      $scope.processReceivedCommand(cmd);
              }
      };
      ...
      

      Note: we have to prevent the client from processing the messages it sent. To accomplish this, we are going to add the clientID attribute to every message so we can compare it with the ID of the receiving client. If the ID is the same, the message will be ignored.

    • Establish a connection. Within the callback function that will be called when connection is established, we will subscribe to the topic to receive the data and request initialization data:
      ...
      $scope.onConnect=function () {
              $scope.mqttClient.subscribe(TOPIC_SUB);
              $scope.loadData();
      }
      
      // connect the mqttClient
      $scope.mqttClient.connect({onSuccess:$scope.onConnect});
      ...
      
  • Replace $scope.socket.emit with  the function to send a message
    ...
    $scope.sendMessage=function(cmd){
            cmd.clientID=$scope.client;
            var message = new Paho.MQTT.Message(JSON.stringify(cmd));
            message.destinationName = TOPIC_PUB;
            $scope.mqttClient.send(message);
    }
    ...
    

Note: We no longer need to use different topics for publishing and subscribing. The Gateway will send published messages to all subscribed clients without the need to retransmit them in the server code!

Server Code Changes

To preserve our server-side code we are going to use theKaazing Socket.IO Wrapper for MQTTlocated in the node subdirectory of the TodoMVC implementation.

The wrapper hides the details of the MQTT protocol implementation while exposing the interface that mimics the one used in Socket.IO.

With our Socket.IO wrapper, we only need to make two changes on the server:

  • Change both send and receive topics to todomvc
  • Remove the code that retransmits messages

See https://github.com/kaazing/tutorials/blob/develop/mqtt/javascript/serverwsnoiot.js for the final server code implementation.

Now we can test our application by:

    1. Starting the application:
      $node serverwsnoit.js
    2. Opening multiple browsers and pointing them to http://localhost:3000/?<gateway host:gateway port>, e.g. http://localhost:3000/?192.168.6.153:8080

Connecting to the World of IoT

Now we have a Web application running over MQTT—the protocol that is widely-used in the Internet of Things (IoT) world. With that in mind, we can integrate our application with IoT devices; for example, to help with predictive maintenance.

Let’s outline a use-case:

  • There is a device that periodically reads the data from a mechanical sensor and sends it to a particular MQTT topic. Given that the sensor is mechanical, it has to be serviced after every X reads.
  • When the service is required, we need to:
    • Turn on perform maintenance light on a device.
    • Add an entry to our TODO application to request that the service be performed.
  • Once the service is complete and the TODO item is checked, we need to turn the maintenance light off.

To illustrate this scenario, we are going to use Raspberry PI 2 as a remote device with an LED simulating a maintenance light and a button playing the role of a mechanical device that reads the data. The following diagram illustrates the architecture of our application.

MQTT-architecture

Wiring and Electronics

Raspberry PI is an excellent little computer that runs Linux. More importantly, there is easy access to what’s called GPIO pins: a set of generic pins on an integrated circuit whose behavior—including whether it is an input or output pin—is controllable by the user at run-time.

As shown in the circuit diagram below, we are going to wire an LED to one GPIO pin and the button to another GPIO pin:

MQTT-wiring
The following picture demonstrates the complete wiring implemented using Vilros Raspberry PI Ultimate Starter Kit.

Raspberry-MQTT

Important note on the wiring: If you noticed, our circuit diagram indicates the use of GPIO 0 and GPIO 2. Actually, we wired our button and LED to the pins marked as GPIO 17 and GPIO 27. The reason for this is the GPIO pin numbers shown on the Vilros board use BCM GPIO while the Raspberry PI GPIO access library uses an abstract pin numbering scheme to help insulate software from hardware changes. The following article explains this in detail as well as provides a diagram to find the matching GPIO pins.

Programming Raspberry PI

To make things simple, we are going to write a Java application using an open source Pi4J library to interact with GPIO and the Paho MQTT Java library to communicate with Kaazing WebSocket Gateway.

Interacting with Raspberry PI GPIO

As shown on Simple GPIO Control using Pi4J example, turning the LED on and off is rather simple:

...
// Create an instance of gpio
final GpioController gpio = GpioFactory.getInstance();
// Provision pin 2 for output with initial state as ‘low’ - meaning off
final GpioPinDigitalOutput lightPin02 = gpio.provisionDigitalOutputPin(RaspiPin.GPIO_02, "Pin2", PinState.HIGH);

// Set the state to low (off) when the application quits
lightPin02.setShutdownOptions(true, PinState.LOW);
...
// Turn light on
lightPin02.setState(PinState.HIGH);
...
// Turn light off
lightPin02.setState(PinState.LOW);
...

The following GPIO State Listener Example using Pi4J illustrates how to receive the events from the button:

...
// Create an instance of GPIO
final GpioController gpio = GpioFactory.getInstance();

// Provision GPIO pin 0 for input with its internal pull down resistor enabled
final GpioPinDigitalInput button = gpio.provisionDigitalInputPin(RaspiPin.GPIO_00, PinPullResistance.PULL_DOWN);

// Create an event listener
button.addListener(new GpioPinListenerDigital() {

        @Override
        public void handleGpioPinDigitalStateChangeEvent(GpioPinDigitalStateChangeEvent event) {
                if (event.getState() == PinState.HIGH) {
                        // if event goes back to high (button pushed and released)
                        ... do something
                }
        }
});
...

See the Pi4J – Usage for more information.

Communicating with Kaazing WebSocket Gateway

Similar to the JavaScript case, we need to instruct the Paho MQTT library to use Kaazing Java WebSocket API. Implementing it in Java requires a little more work. We are going to use the same technique as implemented in Inventit MQTT over WebSocket library, except we will use the Kaazing Java WebSocket API instead of Jetty.

Here is what we are going to do:

Using the samples illustrated in http://www.hivemq.com/blog/mqtt-client-library-encyclopedia-eclipse-paho-java we connect and communicate with Kaazing WebSocket Gateway:

...
final KaazingMqttWebSocketClient client=new KaazingMqttWebSocketClient(brokerUrl, clientId);
client.connect();
client.setCallback(new MqttCallback() {
            
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
                ... process received message
        }
            
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
                // TODO Auto-generated method stub                
        }
            
        @Override
        public void connectionLost(Throwable cause) {
                LOG.error("Connection is lost: " + cause.getMessage());
        }
});    
client.subscribe(topicListenName, qos);
...

Sending messages is also very easy:

...
client.publish(topicPublishName, new MqttMessage(commandText.getBytes()));
...

Putting it All Together

Now, when we have all the components, we can put it all together:
We are going to use JSON text messages to communicate with our server:

  • Each click results in a {“button”:”click”,”clientId”:”pi1″} message being published on the topic /Devices/status.
  • Clients subscribe to the topic /Devices/command to receive maintenance notifications; the server sends commands {“clientId”:”pi1″,”maintenancelight”:”on”} or {“clientId”:”pi1″,”maintenancelight”:”off”}, which results in the LED on a specific client being turned on or off.

Our client application takes two parameters: the URL for the Kaazing WebSocket Gateway and the client ID that should be assigned to it. See https://github.com/kaazing/tutorials/blob/develop/mqtt/java/raspberry-mqtt/src/main/java/com/kaazing/mqtt/pi/Application.java for the final implementation.

Adding Predictive Maintenance Logic to NodeJS server Component

To track devices we use a devicesarray. For each device we keep a clientId, current number of clicks, and the ID of a TODO item that will be created or updated when the device needs maintenance.

Our processDeviceMessage function searches for the device in the devices array. If the device is found, we increment the number of clicks; otherwise, we create a new device entry.

Once the device clicks counter reaches a maxClicks number (in our case we set it to 10), we do the following:

  • Try to locate TODO item using the TODO item ID that is associated with the device. If the item is located, we uncheck it and send an update message to the todomvc topic; otherwise, we create a new item and send an insert message to the todomvc topic.
  • Send a maintenance light on message to a device.
...
function processDeviceMessage(cmd){
        var deviceInfo=devices[cmd.clientId];
        if (deviceInfo==null){
                deviceInfo={counter:1, itemId:getRandomInt(1, 100000), clientId:cmd.clientId};
                devices[cmd.clientId]=deviceInfo;
                console.log("Registering new device:"+cmd.clientId);
        }
        else{
                deviceInfo.counter++;
                console.log("Updating device:"+cmd.clientId+", new counter="+deviceInfo.counter);
        }
        devices[cmd.clientId]=deviceInfo;
        if (deviceInfo.counter>=maxClicks){
                var exists=false;
                var item=-1;
                for(var i=0;i<todos.length;i++){
                        if (todos[i].id===deviceInfo.itemId){
                                item=i;
                        }
                }
                if (item<0){
                        var newTodo = {
                                id: deviceInfo.itemId,
                                title: "Perform maintenance on "+deviceInfo.clientId,
                                completed: false,
                                busy: false
                        };
                        todos.push(newTodo);
                        var cmd={command:"insert", item:newTodo, clientID:"server"};
                        socket.emit("todomvc", cmd);
                }
                else{
                        todos[item].completed=false;
                        var cmd={command:"update", item:todos[item]};
                        socket.emit("todomvc", cmd);
                }
                socket.emit("/Devices/command", {clientId:deviceInfo.clientId, maintenancelight:"on"});
        }
}
...

Now we need to add another subscription to our NodeJS component. We need to subscribe to /Devices/status to receive messages from devices about the clicks that will use our processDeviceMessage function:

...
io.on('connection', function(s){
        console.log('a user connected');
        socket=s;
        s.on('todomvc', processMessage);
        s.on('/Devices/status', processDeviceMessage)
});
...

The next step is to add the code to turn the light off when the item about device maintenance is checked. We are going to add the code to the processMessage function for the item update message processing case.

...
function processMessage(cmd) {
        ...
        else if (cmd.command === "update") {
                var index = -1;
                for (var i = 0; i < todos.length; i++) {
                        if (todos[i].id === cmd.item.id) {
                                index = i;
                        }
                }
                if(todos[index].completed!=cmd.item.completed) {
                        for (var key in devices){
                                var deviceInfo=devices[key];
                                if (deviceInfo.itemId=cmd.item.id){
                                        if (cmd.item.completed){
                                                deviceInfo.counter=0;
                                                socket.emit("/Devices/command", {clientId:deviceInfo.clientId, maintenancelight:"off"});
                                                devices[deviceInfo.clientId]=deviceInfo;
                                        }
                                        else{
                                                socket.emit("/Devices/command", {clientId:deviceInfo.clientId, maintenancelight:"on"});
                                        }
                                }
                        }
                }
                todos[index] = cmd.item;
        }  
        else {
                ...
        }
}
...

If the item’s completed status changed, we try to determine whether this item is about the maintenance of a particular device. We iterate through the array of the devices, matching the id of the TODO item with the TODO item ids associated with the devices. If a device is found, we send a maintenance light on or off command based on the checked or unchecked state of the item.

As mentioned earlier, the MQTT protocol does not have no local feature, although no local is being considered for a future version. In other words, if the client publishes and subscribes to the same topic, it will receive its own messages, which in our case is not desireable. Similar to the browser code, the server also needs to check whether the message received is not one of its own. In order to check, it does the following:

  • Every message that server sends to the todomvc topic has clientID set to server; for example:
    ...
    var cmd={command:"insert", item:newTodo, clientID:"server"};
    socket.emit("todomvc", cmd);
    ...
    
  • We then add the check to processMessage function
    ...
    function processMessage(cmd) {
            if (cmd.clientID==="server")
                    return;
            ...
    }
    ...
    

Application in Action

Let’s build and start all of the components of our application using instructions specified at https://github.com/kaazing/tutorials/tree/develop/mqtt.
This video shows an application in action.

Conclusion

Its very easy to use the MQTT protocol and Kaazing WebSocket Gateway to upgrade an Socket.IO application and add WebSocket capabilities to IoT devices that use MQTT. Few changes to your existing code are needed and you can use open source MQTT libraries to work directly with Kaazing WebSocket Gateway.  Not only do you now get high-performance, advanced security, and scalability features, but Kaazing technology will extend your MQTT applications to work with mobile and Web clients and many B2B and behind-the-firewall solutions.


Read the original blog entry...

More Stories By Kaazing Blog

Kaazing is helping define the future of the event-driven enterprise by accelerating the Web for the Internet of Things.