Mucho se habla de IoT (Internet of Things, Internet de las cosas) por estos días. A partir de la llegada al mercado de dispositivos de cómputo de bajo costo, tamaño y consumo de energía (como por ejemplo Arduino, Raspberry Pi, CubieBoard y otros), se ha multiplicado el número de proyectos y dispositivos que recopilan datos y los envían por Internet.

Como no podía ser menos, también se va a hablar un poco sobre IoT en este blog. Este artículo presenta el protocolo de comunicación M2M (machine-to-machine) MQTT y sus capacidades para enviar datos entre dispositivos y sistemas heterogéneos. Puntualmente trata la implementación de un cliente Java (utilizando la implementación de MQTT provista por Eclipse Paho y el broker de mensajes Moquette), para leer datos publicados por un dispositivo de medición de temperatura y humedad desde un sistema GNU/Linux.



IoT

El término IoT (Internet of Things) surgió en 2009 cuando comenzaron a interconectarse dispositivos físicos a través de Internet. La idea es simple: los dispositivos físicos pueden intercambiar datos entre sí, o ser controlados por otros dispositivos. Algunos ejemplos pueden ser refrigeradores, automóviles, edificios y cualquier dispositivo electrónico. Uno de los casos de uso más comunes es el de la recolección, transmisión, consolidación y visualización de datos provenientes de todo tipo de sensores (presión, temperatura, humedad, luz, nivel, etc.). Los resultados pueden presentarse luego a través de un servidor Web (sitio Web o aplicación para dispositivos móviles), junto con alarmas cuando se superan ciertos umbrales y otras funcionalidades.

MQTT

MQTT fue desarrollado por Andy Stanford-Clark y Arlen Nipper en 1999 para monitorear un tanque de combustible ubicado en el desierto. La idea era contar con un protocolo que sea eficiente en cuanto a consumo de ancho de banda y requiera poca energía, debido a que los dispositivos estaban conectados vía satélite, lo cual era extremadamente costoso en esa época. El protocolo utiliza una arquitectura de publicación/suscripción en contraste al paradigma solicitud/respuesta de HTTP y otros protocolos. Esta arquitectura publicar/suscribirse está basada en eventos y permite que los mensajes sean enviados a los clientes de manera asincrónica (push) en lugar de en forma de respuesta a una solicitud. Para ello es necesario contar con un punto central de comunicación, llamado "broker", el cual se encarga de despachar todos los mensajes entre los emisores y los receptores interesados. Cada cliente que publica un mensaje al broker incluye un tema (topic), el cual es utilizado como mecanismo de direccionado en el broker. Todos los clientes que desean recibir mensajes se suscriben a uno o varios temas, y a partir de ese momento, el broker reenviará todos los mensajes asociados a los mismos. Por lo tanto los clientes no necesitan conocerse unos a otros, sólo se comunican mediante topics a través de un mismo broker. Esta arquitectura permite soluciones altamente escalables sin dependencias entre productores y consumidores de datos.

La diferencia con HTTP es que un cliente no necesita hacer una solicitud para obtener la información que necesita, sino que el broker se la envía (sin que el cliente la solicite) en el momento en que ésta se produce. Por lo tanto cada cliente MQTT mantiene abierta una conexión TCP con el broker de manera permanente. Si esta conexión es interrumpida por alguna circunstancia, el broker puede mantener en un buffer los mensajes que se produzcan, y enviárselos luego al cliente cuando se restablezca la conexión.

Entonces, el concepto central de MQTT para despachar mensajes son los topics. Un topic es una simple cadena que puede tener varios niveles de jerarquía, separados por barras (/). Por ejemplo, un topic para enviar datos de temperatura del comedor de una casa podría ser "/casa/comedor/temperatura". Un cliente puede suscribirse a un tema exacto, o puede utilizar comodines (+ y #). La suscripción a "/casa/+/temperatura" obtendrá todos los mensajes enviados al topic "/casa/comedor/temperatura", al igual que a cualquier otro topic con un valor arbitrario en lugar de "comedor", por ejemplo "/casa/cocina/temperatura" o "/casa/dormitorio/temperatura". El símbolo de suma representa un comodín de un sólo nivel, y sólo permite valores arbitrarios para una jerarquía. Para suscribirse a varios niveles se utiliza el wildcard #. Este permite suscribirse a un tema y todos sus subniveles. Por ejemplo el topic "/casa/#" permite recibir todos los mensajes pertenecientes a los temas que comienzan con "/casa/".

Eclipse Paho

El proyecto Eclipse Paho provee una implementación Java del protocolo MQTT. Surgió en 2012 bajo el paraguas de la Fundación Eclipse con la meta de proveer implementaciones de protocolos IoT open source.

Moquette

Moquette es un broker MQTT liviano desarrollado en Java. Es un proyecto open source creado y mantenido por Andrea Selva y está listado en la wiki de la comunidad MQTT como uno de los brokers recomendados.

Manos a la obra

Sin más preámbulo, pasemos directamente a la implementación del cliente y broker Java para comunicarse con el dispositivo Arduino:

Este dispositivo obtiene datos de temperatura y humedad de dos sensores, y los envía a un broker MQTT a través de la red Ethernet. Este artículo no incluye información respecto a la implementación y configuración de este dispositivo, sino que está orientado a cualquier dispositivo que envíe datos utilizando el protocolo MQTT.

El primer paso consiste en conectar el dispositivo a la red (router/switch) o directamente al sistema donde correrá el cliente Java (utilizando un cable cruzado). Es necesario configurar una dirección IP (perteneciente a la misma subred que utiliza el dispositivo) en el host, para poder comunicarse con el dispositivo:

Esta dirección IP debe corresponder con aquella asignada al broker (donde intentará conectarse el dispositivo para enviar mensajes).

Una vez encendido el dispositivo, verificar el acceso al mismo a través de la red (en este caso, su dirección IP es 192.168.0.49):

root@vaio emi # ifconfig eth0
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 192.168.0.2  netmask 255.255.255.0  broadcast 192.168.0.255
        inet6 fe80::32f9:edff:fec2:dbe8  prefixlen 64  scopeid 0x20<link>
        ether 11:22:33:44:55:66  txqueuelen 1000  (Ethernet)
        RX packets 299  bytes 17940 (17.5 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 22  bytes 2456 (2.3 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

emi@vaio ~ $ ping -c 5 192.168.0.49
PING 192.168.0.49 (192.168.0.49) 56(84) bytes of data.
64 bytes from 192.168.0.49: icmp_seq=1 ttl=128 time=0.072 ms
64 bytes from 192.168.0.49: icmp_seq=2 ttl=128 time=0.070 ms
64 bytes from 192.168.0.49: icmp_seq=3 ttl=128 time=0.071 ms
64 bytes from 192.168.0.49: icmp_seq=4 ttl=128 time=0.104 ms
64 bytes from 192.168.0.49: icmp_seq=5 ttl=128 time=0.091 ms

--- 192.168.0.49 ping statistics ---
5 packets transmitted, 5 received, 0% packet loss, time 3999ms
rtt min/avg/max/mdev = 0.070/0.081/0.104/0.016 ms

Habiendo comprobado el acceso al dispositivo, verificar que el intente conectarse al broker abriendo el puerto 1883 con netcat:

root@vaio emi # nc -vv -n -l -s 192.168.0.2 -p 1883
listening on [192.168.0.2] 1883 ...
connect to [192.168.0.2] from (UNKNOWN) [192.168.0.49] 49294
Cliente DHT22

Perfecto, se observa claramente cómo el dispositivo se conecta al puerto 1883. Ya es posible comenzar con la implementación.

Antes de iniciar el desarrollo de la implementación es necesario instalar Java en el sistema donde correrá el cliente. En los sistemas basados en Red Hat, es necesario instalar el paquete OpenJDK con el sufijo -devel para poder compilar fuentes .java.

Crear un directorio de prueba para la implementación:

$ mkdir test
$ cd test/

Luego se procede con la descarga de Moquette y Paho. Descargar y extraer Moquette:

$ wget https://bintray.com/artifact/download/andsel/generic/distribution-0.8-bundle-tar.tar.gz
$ mkdir moquette
$ cd moquette/
$ tar xzf ../distribution-0.8-bundle-tar.tar.gz

Luego simplemente se lanza el servidor Moquette:

$ cd bin/
$ sh moquette.sh

El servidor abre por defecto el puerto 1883, por lo que es necesario que se permita el acceso en el firewall. Inmediatamente se observa que el dispositivo se conecta al broker y envía los primeros mensajes:

emi@vaio bin $ sh moquette.sh
                                                                        
  ___  ___                       _   _        ___  ________ _____ _____ 
  |  \/  |                      | | | |       |  \/  |  _  |_   _|_   _|
  | .  . | ___   __ _ _   _  ___| |_| |_ ___  | .  . | | | | | |   | |  
  | |\/| |/ _ \ / _\ | | | |/ _ \ __| __/ _ \ | |\/| | | | | | |   | |  
  | |  | | (_) | (_| | |_| |  __/ |_| ||  __/ | |  | \ \/' / | |   | |  
  \_|  |_/\___/ \__, |\__,_|\___|\__|\__\___| \_|  |_/\_/\_\ \_/   \_/  
                   | |                                                  
                   |_|                                                  
                                                                        
0    [main] INFO  Server  - Persistent store file: /data/HT22/test/moquette/bin/moquette_store.mapdb
159  [main] INFO  MapDBPersistentStore  - Starting with existing [/data/HT22/test/moquette/bin/moquette_store.mapdb] db file
821  [main] INFO  FileAuthenticator  - Loading password file: /data/HT22/test/moquette/config/password_file.conf
1672 [main] INFO  NettyAcceptor  - Server binded host: 0.0.0.0, port: 1883
1686 [main] INFO  NettyAcceptor  - Server binded host: 0.0.0.0, port: 8080
Server started, version 0.8
2104 [nioEventLoopGroup-3-1] INFO  ClientSession  - cleaning old saved subscriptions for client <Cliente DHT22>
2157 [nioEventLoopGroup-3-1] INFO  ProtocolProcessor  - Connected client ID <Cliente DHT22> with clean session true
2157 [nioEventLoopGroup-3-1] INFO  ProtocolProcessor  - CONNECT processed
2159 [nioEventLoopGroup-3-1] INFO  ClientSession  - <Cliente DHT22> subscribed to topicFilter </Nodo/#> with QoS 0 - MOST_ONE
2382 [nioEventLoopGroup-3-1] INFO  ProtocolProcessor  - PUBLISH from clientID <Cliente DHT22> on topic </sensorA/temp> with QoS MOST_ONE
2385 [nioEventLoopGroup-3-1] INFO  ProtocolProcessor  - PUBLISH from clientID <Cliente DHT22> on topic </sensorA/humidity> with QoS MOST_ONE
2657 [nioEventLoopGroup-3-1] INFO  ProtocolProcessor  - PUBLISH from clientID <Cliente DHT22> on topic </sensorB/temp> with QoS MOST_ONE
2659 [nioEventLoopGroup-3-1] INFO  ProtocolProcessor  - PUBLISH from clientID <Cliente DHT22> on topic </sensorB/humidity> with QoS MOST_ONE

El dispositivo cuenta con dos sensores de temperatura y humedad, y publica los topics "/sensorA/temp", "/sensorB/temp", "/sensorA/humidity" y "/sensorB/humidity", tal como puede observarse en la captura anterior.

Para continuar, dejar el servidor Moquette corriendo y abrir otra consola. Cabe destacar que al momento de poner un broker Moquette en producción será necesario proteger adecuadamente su acceso e implementar conexiones seguras con SSL. Para tal fin, revisar la documentación de Moquette.

Por otra parte, el servidor Moquette por defecto utiliza IPv6, por lo que es necesario que esté habilitado a nivel sistema operativo, y el puerto 1883 abierto en el firewall de IPv6 (en sistemas GNU/Linux suele estar separado):

emi@vaio ~ $ netstat -tulpn | grep 1883
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::1883                 :::*                    LISTEN      2291/java

El siguiente paso consiste en descargar el paquete Paho:

$ cd test/
$ wget http://search.maven.org/remotecontent?filepath=org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.1.0/org.eclipse.paho.client.mqttv3-1.1.0.jar -O org.eclipse.paho.client.mqttv3-1.1.0.jar

Ahora es posible comenzar el desarrollo del cliente Java.

Desarrollo del cliente MQTT

El paquete Paho provee la clase MqttClient, la cual provee la funcionalidad necesaria para comunicarse con un broker MQTT (enviar y recibir mensajes). El primer paso entonces, es definir una clase personalizada que contenga una instancia de la clase MqttClient, la cual será llamada Subscriber. Esta clase tiene la funcionalidad mínima para conectarse con el broker y suscribirse a los topics enviados por el dispositivo: "/sensorA/#" y "/sensorB/#".

import org.eclipse.paho.client.mqttv3.*;

public class Subscriber {

     public static final String BROKER_URL = "tcp://192.168.0.2:1883";
     private MqttClient client;

     public Subscriber() {

          String clientId = "demonio";
          try {
              client = new MqttClient(BROKER_URL, clientId);
          }
          catch (MqttException e) {
              e.printStackTrace();
              System.exit(1);
          }
     }

     public void start() {

          try {
              client.setCallback(new SubscribeCallback());
              client.connect();
              client.subscribe("/sensorA/#");
              client.subscribe("/sensorB/#");
          }
          catch (MqttException e) {
              e.printStackTrace();
              System.exit(1);
          }

     }

}

La constante BROKER_URL almacena la URL del broker MQTT con el cual comunicarse. A su vez, client es la instancia de MqttClient.

El constructor de la clase simplemente crea la nueva instancia. Para ello se requiere un id de cliente (una forma de identificación ante el broker). En este caso de ejemplo se utiliza un valor por defecto ("demonio"). La mejor práctica consiste en sobrecargar el constructor para permitir enviar el id de cliente como parámetro al momento de instanciar la clase. El constructor de la clase MqttClient puede arrojar una excepción "MqttException", por lo cual es indispensable capturarla al momento de la creación del objeto.

Luego se define un método start con el cual se conecta al broker (método connect), y se suscribe a los topics anteriormente mencionados (método subscribe). Sin embargo, notar que antes se define un objeto callback, el cual será encargado de gestionar la llegada de mensajes desde el broker. Recordar que se trata de una arquitectura basada en eventos y se mantiene una conexión TCP abierta con el broker desde donde se reciben los mensajes de manera asincrónica.

El objeto callback debe pertenecer a una clase que implemente a la interfaz MqttCallback. En este caso la clase se llama SubscribeCallback:

import org.eclipse.paho.client.mqttv3.*;
import java.lang.Throwable;

public class SubscribeCallback implements MqttCallback {

     @Override
     public void connectionLost(Throwable cause) {}

     @Override
     public void messageArrived(String topic, MqttMessage message) {
          System.out.println(topic + ": " + message.toString());
     }

     @Override
     public void deliveryComplete(IMqttDeliveryToken token) {}

}

La interfaz MqttCallback define tres métodos que deben ser implementados: connectionLost, messageArrived y deliveryComplete. El primero es llamado cuando la conexión con el broker se cierra de forma inesperada. Este es el lugar adecuado para implementar la lógica de reconexión. messageArrived es invocado cada vez que se recibe un mensaje desde el broker. Por último, deliveryComplete es invocado luego de que un mensaje con QoS 1 ó 2 llega al broker. Para este ejemplo basta con implementar el método messageArrived, el cual gestiona la recepción de mensajes.

En este ejemplo simplemente se imprime por salida estándar el topic y mensaje recibidos.

Por último, definir una clase adicional para iniciar el cliente y mantenerlo en ejecución indefinidamente:

public class demonio {

     public static void main(String[] args) {

         Subscriber client = new Subscriber();

         client.start();

         while (true) {}

     }

}

Compilar todo incluyendo el .jar de Paho:

$ javac -cp ".:./org.eclipse.paho.client.mqttv3-1.1.0.jar" *.java

Luego, lanzar el demonio:

$ java -cp ".:./org.eclipse.paho.client.mqttv3-1.1.0.jar" demonio

En la salida del servidor Moquette se observa que el cliente se suscribe correctamente a los tópicos publicados por el dispositivo:

1221359 [nioEventLoopGroup-3-6] INFO  ProtocolProcessor  - Create persistent session for clientID <demonio>
1221363 [nioEventLoopGroup-3-6] INFO  ClientSession  - cleaning old saved subscriptions for client <demonio>
1221388 [nioEventLoopGroup-3-6] INFO  ProtocolProcessor  - Connected client ID <demonio> with clean session true
1221388 [nioEventLoopGroup-3-6] INFO  ProtocolProcessor  - CONNECT processed
1221429 [nioEventLoopGroup-3-6] INFO  ClientSession  - <demonio> subscribed to topicFilter </sensorA/#> with QoS 1 - LEAST_ONE
1221457 [nioEventLoopGroup-3-6] INFO  ClientSession  - <demonio> subscribed to topicFilter </sensorB/#> with QoS 1 - LEAST_ONE

Y una vez que el dispositivo envía un nuevo mensaje, se observa la siguiente salida en el demonio:

/sensorA/temp: 26.70
/sensorA/humidity: 58.00
/sensorB/temp: 28.00
/sensorB/humidity: 56.00

¡A jugar con Arduino y MQTT!

Referencias

MQTT 101 - How to Get Started with the lightweight IoT Protocol

servers - mqtt/mqtt.github.io Wiki

andsel/moquette: Java MQTT lightweight broker

Paho - Open Source messaging for M2M


Tal vez pueda interesarte


Compartí este artículo