Pub/Sub Broker

In addition to our standard REST API, we provide a Pub/Sub Broker where applications and devices can push and receive notifications (property updates, actions created, and so on) to and from the Platform. The broker currently supports MQTT and WebSockets access.

The client connects to the broker and publishes/subscribes to different topics, which are the Thngs and products it can access. The client receives push notifications of any change to resources such as properties, Thngs and locations, and actions. Similarly, the client can use the same topic channel to publish new values. For example, update the properties or location of Thngs and products or create actions.

See the Examples section for example snippets for browsers, Node.js, Mosquitto, and Eclipse Paho.

πŸ“˜

Note

If a role's permissions are changed while a user with that role has an open MQTT connection, the permission changes don't apply for that user until they reconnect.


API Status
Stable:
mqtts://mqtt.evrythng.com
wss://ws.evrythng.com/mqtt


Cross-Protocol Communication

The broker supports cross-protocol delivery and subscription. You can publish a message with one protocol (such as MQTT) and your message is delivered through all available protocols REST and WebSockets) to the listening/polling clients.

The data contained in your messages is also recorded on the EVRYTHNG Platform. You can get the latest and other historical values from the dashboard or API anytime.


Available Topics

The topics and the JSON payloads of subscription or publication messages are the same as for our REST API, allowing you to easily switch between one protocol and another.

The list below details the topics available for publishing to and when subscription notifications occur.

  • /actions

    • Publish - Create an action type.
    • Subscribe - An action type is created.
  • /actions/all

    • Publish - Create an action of any type.
    • Subscribe - An action of any type is created.
  • /actions/:type

    • Publish - Create an action of this type.
    • Subscribe - An action of this type is created, or the action type is deleted.
  • /batches/:batchId/tasks

    • Subscribe - A batch task is completed.
  • /batches/:batchId/tasks/:taskId

    • Subscribe - A task is completed on this batch.
  • /collections/:collectionId

    • Publish - Update this collection.
    • Subscribe - This collection is updated or deleted.
  • /collections/:collectionId/actions/:type

    • Publish - Create an action of this type on this collection.
    • Subscribe - An action of this type is created on this collection.
  • /collections/:collectionId/actions/all

    • Publish - Create an action of any type on this collection.
    • Subscribe - An action of any type is created on this collection.
  • /collections/:collectionId/thngs

    • Publish - Add Thngs to this collection.
    • Subscribe - Thngs are added or removed from this collection, or it is emptied.

πŸ“˜

Note

For the /collections/:collectionId/thngs topic, subscription messages are in the following format:

{
    "operation": <operation>,
    "timestamp": <timestamp>,
    "resources": [ <thngId>, ... ]
}

where <operation> is a value from: thngsAdded, thngsRemoved, thngsCleared (all Thngs were removed).

  • /products/:productId/actions/:type

    • Publish - Create an action of this type on this product.
    • Subscribe - An action of this type is created on this product.
  • /products/:productId/actions/all

    • Publish - Create an action of any type on this product.
    • Subscribe - An actio of any type is created on this product.
  • /products/:productId/properties

    • Publish - Create or update this product's properties.
    • Subscribe - This product's properties are created or updated.
  • /products/:productId/properties/:key

    • Publish - Create or update a property of this product.
    • Subscribe - A property of this product is created or updated.
  • /thngs/:thngId

    • Publish - Update this Thng's metadata (excludes properties).
    • Subscribe - This Thng's properties are updated (excludes properties).
  • /thngs/:thngId/actions/:type

    • Publish - Create an action of this type on this Thng.
    • Subscribe - An action of this type is created on this Thng.
  • /thngs/:thngId/actions/all

    • Publish - Create an action of any type on this Thng.
    • Subscribe - An action of any type is created on this Thng.
  • /thngs/:thngId/location

    • Publish - Create or update this Thng's location.
    • Subscribe - This Thng's location is created or updated.
  • /thngs/:thngId/properties

    • Publish - Create or update this Thng's properties.
    • Subscribe - This Thng's properties are created or updated.
  • /thngs/:thngId/properties/:key

    • Publish - Create or update a property of this Thng.
    • Subscribe - A property of this Thng is created or updated.

Topic options

The following subscribe options are supported in the topic string. Options are passed in query parameters, similar to URIs.

  • project - String
    Like with our REST API, scopes the subscription to the specified project.

  • pubStates - Integer
    If set to 1, clients immediately receive an initial notification containing the last value of the topic, which is directly published by the server (that is, the latest property, location, or action value).


MQTT

Host and Ports

Host: mqtt.evrythng.com
Protocol: mqtts
Port:

  • 8883 (TLS) which is the standard MQTTS port. The use of this port and MQTTS is strongly recommended as it keeps your credentials safe.

QoS

The following QoS (Quality of Service) levels are supported for both pub and sub messages:

QoS 0 - No guarantee on message delivery.
QoS 1 - Each message is delivered at least once to all subscribers.

Authentication

🚧

Warning

For secure calls in an embedded device or non-browser environment, you must download our public certificate. You can find a link to download this certificate on the Security page.

To authorize, send an API key in the CONNECT message of MQTT 3.1. Set the User Name field to the string "authorization", and the Password field to the API key for the MQTT session.

πŸ“˜

Note

Like for the REST API, make sure the API key you use is authorized to publish (PUT or POST) or subscribe (GET) to any given resource. Refer to API Key Scopes and Permissions to learn which resources each key can access.

Username: authorization
Password: $API_KEY

❗️

Security Caution

For production applications, always prefer MQTT secured with TLS (MQTTS). Any other way transmits your credentials in plain text.

MQTT Compatibility

The broker implements the MQTT Version 3.1.1 specification, so you can use any MQTT compliant library to communicate with it. A good place to find one for your device is the Eclipse Paho project.

A topic in the context of the broker is a URI-like string that identifies the resource the same way our REST API does.

For example, a topic for a specific Thng's properties: /thngs/:thngId/properties

See available topics for a full list.

πŸ“˜

Notes

  • By design of the MQTT protocol, no failure notifications report unsuccessful subscriptions. If a client subscribes with an invalid API key, topic, or both, the client isn't notified of the failure, and no PUBLISH message arrives.
  • Because the REST resources are identified in the same way as the topics are, use URL safe characters and the appropriate escape sequence for non-safe characters. For example, use %20 instead of + to encode spaces.
  • The MQTT client is expected to keep the TCP/IP connection open by sending PINGREQ messages regularly. We currently support keepalive times of up to 120 seconds. This functionality is supported out of the box by most MQTT client libraries.
  • If ?pubStates is used, the initial PUBLISH message with the current topic value might not arrive in order. It could also be duplicated by a regular update. Clients can rely on the identifiers and timestamps of the data received.
  • If ?pubStates is used, the Retain flag isn't set in the initial PUBLISH message.
  • If ?pubStates is used and the connection is recovered for any reason, the clients receive initial PUBLISH messages again with the current value of the topic.

WebSockets

The broker provides the ability to subscribe to push notifications using WebSockets. Two WebSockets are supported:

MQTT over WebSockets

The MQTT over WebSockets connection wraps the MQTT protocol commands into a WebSockets TCP connection. It behaves the same way as the MQTT support described above.

To use MQTT over WebSockets, you need the mqttws31.js library, available from our CDN.

Use the following address to access to the MQTT service over the WebSockets channel. The request is made on port 443 (HTTPS) to the following address:

wss://ws.evrythng.com/mqtt

The advantage of using the MQTT over WebSockets library instead of native WebSockets is that it allows you to subscribe to multiple topics using a single TCP connection.

An example is provided in our GitHub repository.

Native WebSockets

If using MQTT instead of the WebSockets library isn't suitable for your application, we provide native WebSockets support on the same host. This allows you to open a WebSocket on a topic resource directly. (For example, wss://ws.evrythng.com/thngs/UeECn9a8PBpar8x4FEGdMsDd/properties/temperature). Clients receive the follow-up topic notifications on that socket, and the messages sent by them are published on the Platform.

WebSockets Authentication

Like the REST API, the WebSockets servers expect an Authorization HTTP Header containing the appropriate API Key. This header is sent along with the initial GET request of the protocol negotiation phase. Unfortunately, the support of WebSockets in browsers does not allow to set custom HTTP headers for that request. For that reason, our native WebSockets support allows the use of a ?access_token query parameter instead of the HTTP header.

Unauthorized Subscription

An important difference between this WebSockets support and the MQTT protocol is that an unauthorized subscription fails during protocol negotiation. For example, the client sends a bad API key or uses the wrong ID in the resource path. In this case, the Platform responds to the initial GET request with a status code 403 Forbidden.


Examples

Node.js Examples

MQTT Module

The mqtt npm package allows you to easily connect to the Pub/Sub Broker in a Node.js native application. The example below shows how to authenticate and subscribe to published updates on all actions in the authenticated account:

const mqtt = require('mqtt');

const MQTT_URL = 'mqtts://mqtt.evrythng.com:8883';
const OPERATOR_API_KEY = 'AGiWrH5OteA4aHiMFiwnwF08p3PQvPIr9...';

const opts = { username: 'authorization', password: OPERATOR_API_KEY };
const client = mqtt.connect(MQTT_URL, opts);

client.on('connect', () => {
  console.log('Connected!');
  client.subscribe('/actions/all');
});

client.on('message', (topic, message) => console.log(`${topic}: ${message}`));

WebSockets Module

The following is another example of native WebSockets connection using the ws library for Node.js:

const WebSocket = require('ws');

const OPERATOR_API_KEY = 'AGiWrH5OteA4aHiMFiwnwF08p3PQvPIr9...';
const MQTT_URL = 'wss://ws.evrythng.com:443/thngs/UeECn9a8PBpar8x4FEGdMsDd/properties/temperature';

const opts = {
  headers: { authorization: API_KEY }
};
const ws = new WebSocket(MQTT_URL, opts);

ws.on('open', () => {
  // Supports the same update payload as the REST API.
  const payload = [{ value: 37.2 }];
  ws.send(JSON.stringify(payload));
});

ws.on('message', (data) => {
  // Incoming update from the topic. data can be parsed as JSON.
  console.log(`Property update: ${JSON.parse(message.data)}`);
});

Browser Examples

Recent browsers support the WebSockets protocol out of the box, but they do not allow you to set custom headers in the initial GET request. Therefore the ?access_token query parameter is used here to pass the API key. As usual, it is highly recommended to use the TLS-encrypted port 443 for connections.

WebSockets

const url = `wss://ws.evrythng.com:443/thngs/UeECn9a8PBpar8x4FEGdMsDd/properties/temperature?access_token=${API_KEY}`;
const socket = new WebSocket(url);

socket.addEventListener('message', (message) => {
  // Incoming update from the topic. message.data can be parsed as JSON.
  console.log(`Property update : ${JSON.parse(message.data)}`);
});

socket.addEventListener('open', () => {
  // Supports the same update payload than the REST API.
  const update = [{ value: 37.2 }];
  socket.send(JSON.stringify(update));
});

Another approach is to use the evrythng-pubsub.js SDK to add publish() and subscribe() methods to SDK resources. Below is a simple example:

<html>
  <head>
    <title>evrythng-ws.js example</title>
  </head>
  <body>
    See devtools console.
    <script src="//cdn.evrythng.net/toolkit/evrythng-js-sdk/evrythng-extended-4.7.2.min.js"></script>

    <script src="//unpkg.com/[email protected]/dist/mqtt.js"></script>
    <script src="//cdn.evrythng.com/toolkit/evrythng-js-sdk/evrythng-ws-2.1.0.min.js"></script>

    <script src="./script.js"></script>
  </body>
</html>
const OPERATOR_API_KEY = '$OPERATOR_API_KEY';
const THNG_ID = '$THNG_ID';
const PROPERTY_KEY = '$PROPERTY_KEY';
const INTERVAL_MS = 2000;

// PubSub is declared after import from <script> tag
evrythng.use(PubSub);

const operator = new evrythng.Operator(OPERATOR_API_KEY);

const updateProperty = () => {
  const payload = [{ value: Date.now() }];
  operator.thng(THNG_ID).property(PROPERTY_KEY)
    .update(payload)
    .then(data => console.log(`Sent ${JSON.stringify(data)}`))
    .catch(console.error);
};

const onMessage = (data) => {
  console.log(`Received ${JSON.stringify(data)}`);

  setTimeout(updateProperty, INTERVAL_MS);
};

const main = () => {
  operator.thng(THNG_ID).property(PROPERTY_KEY)
    .subscribe(onMessage)
    .catch(console.error);

  setTimeout(updateProperty, INTERVAL_MS);
};

main();

MQTT over Websockets

The mqttws31.js library, available from our CDN, allows you to connect using MQTT over a WebSocket connection. This is shown in the example below:

const RESOURCE = `/thngs/UMVm3wUXMGPYEqwaanXVsAbr/properties`;
const DEVICE_API_KEY = '';

const connectToBroker = () => {
  // Connect with client ID
  const client = new Messaging.Client('ws.evrythng.com', 443, `client_${Date.now()}`);
  client.startTrace();
  
  // Print messages received
  client.onMessageArrived = (message) => {
    console.log(`Received ${message.payloadString}`);
  };

  console.log('Connecting...');
  client.connect({
    useSSL: true,
    onSuccess: () => {
      console.log('Connected successfully!');
      
      // Authenticate with access_token parameter
      client.subscribe(`${RESOURCE}?access_token=${DEVICE_API_KEY}`);
    },
  });
};

connectToBroker();

Mosquitto Examples

The Mosquitto client can be used to publish and subscribe to individual topics. The general form is as follows (where client.pem is the certificate file:

mosquitto_sub -h mqtt.evrythng.com -p 8883 --cafile client.pem \
  -u authorization -P $API_KEY \
  -t $TOPIC
mosquitto_pub -h mqtt.evrythng.com -p 8883 --cafile client.pem \
  -u authorization -P $API_KEY \
  -t $TOPIC -m $PAYLOAD

Include the pubStates=1 query parameter in the topic to receive the initial state of the topic. For example, a Thng property:

mosquitto_sub -h mqtt.evrythng.com -p 8883 --cafile client.pem \
  -u authorization -P $OPERATOR_API_KEY \
  -t /thngs/UVdeTmsbPeKay6amTyNwXtNd/properties/temperature?pubStates=1

Eclipse Paho Examples

The Eclipse Paho MQTT client allows you to publish and subscribe to the Pub/Sub Broker using Java. The examples below use the org.json package to manage JSON objects natively, which is the recommended approach to avoid manually encoding payload strings.

Authentication

The MQTT username and password fields are provided via a MqttConnectOptions object when constructing a MqttClient object:

final String MQTT_URL = "ssl://mqtt.evrythng.com:8883";
final String API_KEY = "AGiWrH5OteA4aHiMFiwnwF08p3PQvPI...";

final MqttClient client = new MqttClient(MQTT_URL, 
                                         MqttClient.generateClientId(), 
                                         new MemoryPersistence());

final MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("authorization");
options.setPassword(API_KEY.toCharArray());
client.connect(options);

Subscriptions

Add a subscription using the subscribe() method belonging to the previously created MqttClient object. For example, a subscription for all created actions:

final String topic = "/actions/all";

client.subscribe(topic, (s, message) -> {
  System.out.println("Message: " + message.toString());
});

Publications

Publish a message to a topic using the publish() method belonging to the previously created MqttClient object. For example, to update a Thng property:

final String THNG_ID = "UHtfD4hsBXsa9paRwFbXqkxh";
final String PROPERTY_KEY = "temperature_celsius";
final String topic = "/thngs/" + THNG_ID + "/properties/" + PROPERTY_KEY;

final JSONArray payload = new JSONArray();
payload.put(new JSONObject()
    .put("value", Math.round(Math.random() * 10000))
    .put("timestamp", System.currentTimeMillis()));
final MqttMessage message = new MqttMessage(payload.toString().getBytes());
client.publish(topic, message);