Pub/Sub Broker (legacy)

In addition to our standard REST API, we also provide a Pub/Sub Broker where applications and devices can push and receive notifications (property updates, actions created, etc.) from the Platform. The broker currently supports MQTT and WebSockets access.

The client connects to the broker and publishes/subscribes on different topics, which are the Thngs and products it can access. The client is then notified via push notifications of any change happening on these resources, properties, Thngs and locations, and actions. Similarly, the client can use the same topic channel to publish new values, for example updating the properties or location of Thngs and products, or creating new actions.

See the Examples section for example snippets for browsers, Node.js, Mosquitto, and Eclipse Paho.

📘

Note

If a role's permissions are changed while a user with that role has an open MQTT connection, the permission changes will not apply for that user until they reconnect.


API Status
Stable:
mqtts://mqtt.evrythng.com
wss://ws.evrythng.com/mqtt


## Cross-Protocol Communication

The broker supports cross-protocol delivery and subscription. This means that you can publish a message with one protocol (say MQTT) and your message will get delivered through all available protocols (i.e., MQTT, REST, WebSockets) to the listening/polling clients.

The data contained in your messages also gets recorded in the EVRYTHNG Platform so you can always access the latest and historical values from our dashboard or API.


Available Topics

The topics and the JSON payloads of subscription or publication messages are exactly the same as for our REST API, allowing you to easily swap between one protocol and another.

The list below details which topics are available for publishing to, and when subscription notifications will occur.

  • /actions

    • Publish - Create an action type.
    • Subscribe - An action type is created.
  • /actions/all

    • Publish - Create an action of any type.
    • Subscribe - An action of any type is created.
  • /actions/:type

    • Publish - Create an action of this type.
    • Subscribe - An action of this type is created, or the action type is deleted.
  • /batches/:batchId/tasks

    • Subscribe - A batch task is completed.
  • /batches/:batchId/tasks/:taskId

    • Subscribe - A task is completed on this batch.
  • /collections/:collectionId

    • Publish - Update this collection.
    • Subscribe - This collection is updated or deleted.
  • /collections/:collectionId/actions/:type

    • Publish - Create an action of this type on this collection.
    • Subscribe - An action of this type is created on this collection.
  • /collections/:collectionId/actions/all

    • Publish - Create an action of any type on this collection.
    • Subscribe - An action of any type is created on this collection.
  • /collections/:collectionId/thngs

    • Publish - Add Thngs to this collection.
    • Subscribe - Thngs are added or removed from this collection, or it is emptied.

📘

Note

For the /collections/:collectionId/thngs topic, subscription messages will be of the following format:

{
    "operation": <operation>,
    "timestamp": <timestamp>,
    "resources": [ <thngId>, ... ]
}

where <operation> is a value from: thngsAdded, thngsRemoved, thngsCleared (i.e: all Thngs were removed).

  • /products/:productId/actions/:type

    • Publish - Create an action of this type on this product.
    • Subscribe - An action of this type is created on this product.
  • /products/:productId/actions/all

    • Publish - Create an action of any type on this product.
    • Subscribe - An actio of any type is created on this product.
  • /products/:productId/properties

    • Publish - Create or update this product's properties.
    • Subscribe - This product's properties are created or updated.
  • /products/:productId/properties/:key

    • Publish - Create or update a property of this product.
    • Subscribe - A property of this product is created or updated.
  • /thngs/:thngId

    • Publish - Update this Thng's metadata (excludes properties).
    • Subscribe - This Thng's properties are updated (excludes properties).
  • /thngs/:thngId/actions/:type

    • Publish - Create an action of this type on this Thng.
    • Subscribe - An action of this type is created on this Thng.
  • /thngs/:thngId/actions/all

    • Publish - Create an action of any type on this Thng.
    • Subscribe - An action of any type is created on this Thng.
  • /thngs/:thngId/location

    • Publish - Create or update this Thng's location.
    • Subscribe - This Thng's location is created or updated.
  • /thngs/:thngId/properties

    • Publish - Create or update this Thng's properties.
    • Subscribe - This Thng's properties are created or updated.
  • /thngs/:thngId/properties/:key

    • Publish - Create or update a property of this Thng.
    • Subscribe - A property of this Thng is created or updated.

Topic options

The following subscribe options are supported in the topic string. Options are passed in query parameters, similar to URIs.

  • project - String
    Just like with our REST API, scopes the subscription to the given project.

  • pubStates - Integer
    If set to 1, clients receive immediately an initial notification containing the last value of the topic, which is directly published by the server (i.e., latest property, location or action value).


MQTT

Host and Ports

Host: mqtt.evrythng.com
Protocol: mqtts
Ports:

  • 8883 (TLS) which is the standard MQTTS port. This port is strongly recommended as it keeps your credentials safe.
  • 443 is also supported as an alternate TLS port, in case any actor of your network infrastructure blocks the official MQTTS port.

QoS

The following QoS (Quality of Service) levels are supported for both pub and sub messages:

QoS 0 - No guarantee on message delivery.
QoS 1 - Each message is delivered at least once to all subscribers.

Authentication

🚧

Note

For the secure calls in an embedded device or non-browser environment, you will need to download our public certificate. You can find a link to download this certificate on the Security page.

To authorise, send an API key in the CONNECT message of MQTT 3.1: set the User Name field to the string "authorization", and the Password field to the API key for the MQTT session.

📘

Note

As for the REST API, you need to make sure the API key you use is authorized to publish (PUT or POST) or subscribe (GET) to any given resource. Please refer to API Key Scopes and Permissions to find out which key can access which resource.

Username: authorization
Password: $API_KEY

❗️

Security warning

For production applications always prefer MQTT secured with TLS (MQTTS).
Any other way will transmit your credentials in plain text.

MQTT Compatibility

The broker implements the MQTT Version 3.1.1 specification, so you can use any MQTT compliant library to communicate with it. A good place to find one for your device is the Eclipse Paho project.

A topic in the context of the broker is a URI-like string that identifies the resource the same way our REST API does.

For example, a topic for a specific Thng's properties: /thngs/:thngId/properties

See available topics for a full list.

Notes

  • By design of the MQTT protocol, there is no failure notification to report unsuccessful subscriptions. If a client subscribes with an invalid API key and/or invalid topic, the client will not be notified of the failure and no PUBLISH message will arrive.
  • Since the REST resources are identified in the same way as the topics, use URL safe characters and the appropriate escape sequence for non-safe characters. E.g., use %20 instead of + to encode spaces.
  • The MQTT client is expected to keep the TCP/IP connection open by sending PINGREQ messages regularly. We currently support keepalive times of up to 120 seconds. This functionality should be supported out of the box by any MQTT client library.
  • If ?pubStates is used, the initial PUBLISH message with the current topic value may not arrive in order. It may also be duplicated by a regular update. Clients should rely on the identifiers and timestamps of the data received.
  • If ?pubStates is used, the initial PUBLISH message does not have the Retain flag set.
  • If ?pubStates is used, and the connection is being recovered for any reason, the clients receive initial PUBLISH messages again with the current value of the topic.

WebSockets

The broker provides the ability to subscribe to push notifications using WebSockets. There are two different WebSockets supported:

MQTT over WebSockets

The MQTT over WebSockets connection wraps the MQTT protocol commands into a WebSockets TCP connection. It behaves the exact same way than the MQTT support described above.

In order to use MQTT over WebSockets you need the mqttws31.js library, available here from our CDN.

Use the following address to access to the MQTT service over the WebSockets channel. The request is made on port 443 (HTTPS) to the following address:

wss://ws.evrythng.com/mqtt

The advantage of using the MQTT over WebSockets library instead of native WebSockets is that it allows you to subscribe to multiple topics using a single TCP connection.

A quickstart example is provided in our GitHub repository.

Native WebSockets

If for any reason it is not suitable to use the MQTT over WebSockets library, we provide native WebSockets support on the same host. This allows to open a WebSocket on a topic resource directly. (e.g. wss://ws.evrythng.com/thngs/UeECn9a8PBpar8x4FEGdMsDd/properties/temperature). The clients receive the subsequent topic notifications on that socket, and the messages sent by them are published on the Platform.

WebSockets Authentication

Like the REST API, the WebSockets servers expect an Authorization HTTP Header containing the appropriate API Key. This header is sent along with the initial GET request of the protocol negotiation phase. Unfortunately, the support of WebSockets in browsers does not allow to set custom HTTP headers for that request. For that reason our native WebSockets support allows the use of a ?access_token query parameter instead of the HTTP header.

Unauthorized Subscription

An important difference between this WebSockets support and the MQTT protocol is that an unauthorized subscription fails during protocol negotiation. E.g: The client sends a bad API key, or uses a wrong ID in the resource path. In this case the Platform responds to the initial GET request with a status code 403 Forbidden.


## Examples

Node.js Examples

MQTT Module

The mqtt npm package allows you to easily connect to the Pub/Sub Broker in a Node.js native application. The example below shows how to authenticate and subscribe to published updates on all actions in the authenticated account:

const mqtt = require('mqtt');

const MQTT_URL = 'mqtts://mqtt.evrythng.com:8883';
const OPERATOR_API_KEY = 'AGiWrH5OteA4aHiMFiwnwF08p3PQvPIr9...';

const opts = { username: 'authorization', password: OPERATOR_API_KEY };
const client = mqtt.connect(MQTT_URL, opts);

client.on('connect', () => {
  console.log('Connected!');
  client.subscribe('/actions/all');
});

client.on('message', (topic, message) => console.log(`${topic}: ${message}`));

WebSockets Module

The following is another example of native WebSockets connection using the ws library for Node.js:

const WebSocket = require('ws');

const OPERATOR_API_KEY = 'AGiWrH5OteA4aHiMFiwnwF08p3PQvPIr9...';
const MQTT_URL = 'wss://ws.evrythng.com:443/thngs/UeECn9a8PBpar8x4FEGdMsDd/properties/temperature';

const opts = {
  headers: { authorization: API_KEY }
};
const ws = new WebSocket(MQTT_URL, opts);

ws.on('open', () => {
  // Supports the same update payload as the REST API.
  const payload = [{ value: 37.2 }];
  ws.send(JSON.stringify(payload));
});

ws.on('message', (data) => {
  // Incoming update from the topic. data can be parsed as JSON.
  console.log(`Property update: ${JSON.parse(message.data)}`);
});

Browser Examples

Recent browsers support the WebSockets protocol out of the box, but they do not allow you to set custom headers in the initial GET request. Therefore the ?access_token query parameter is used here to pass the API key. As usual, it is highly recommended to use the TLS-encrypted port 443 for connections.

WebSockets

const url = `wss://ws.evrythng.com:443/thngs/UeECn9a8PBpar8x4FEGdMsDd/properties/temperature?access_token=${API_KEY}`;
const socket = new WebSocket(url);

socket.addEventListener('message', (message) => {
  // Incoming update from the topic. message.data can be parsed as JSON.
  console.log(`Property update : ${JSON.parse(message.data)}`);
});

socket.addEventListener('open', () => {
  // Supports the same update payload than the REST API.
  const update = [{ value: 37.2 }];
  socket.send(JSON.stringify(update));
});

Another approach is to use the evrythng-pubsub.js SDK to add publish() and subscribe() methods to SDK resources. Below is a simple example:

<html>
  <head>
    <title>evrythng-ws.js example</title>
  </head>
  <body>
    See devtools console.
    <script src="//cdn.evrythng.net/toolkit/evrythng-js-sdk/evrythng-extended-4.7.2.min.js"></script>

    <script src="//unpkg.com/[email protected]/dist/mqtt.js"></script>
    <script src="//cdn.evrythng.com/toolkit/evrythng-js-sdk/evrythng-ws-2.1.0.min.js"></script>

    <script src="./script.js"></script>
  </body>
</html>
const OPERATOR_API_KEY = '$OPERATOR_API_KEY';
const THNG_ID = '$THNG_ID';
const PROPERTY_KEY = '$PROPERTY_KEY';
const INTERVAL_MS = 2000;

// PubSub is declared after import from <script> tag
evrythng.use(PubSub);

const operator = new evrythng.Operator(OPERATOR_API_KEY);

const updateProperty = () => {
  const payload = [{ value: Date.now() }];
  operator.thng(THNG_ID).property(PROPERTY_KEY)
    .update(payload)
    .then(data => console.log(`Sent ${JSON.stringify(data)}`))
    .catch(console.error);
};

const onMessage = (data) => {
  console.log(`Received ${JSON.stringify(data)}`);

  setTimeout(updateProperty, INTERVAL_MS);
};

const main = () => {
  operator.thng(THNG_ID).property(PROPERTY_KEY)
    .subscribe(onMessage)
    .catch(console.error);

  setTimeout(updateProperty, INTERVAL_MS);
};

main();

MQTT over Websockets

The mqttws31.js library, available here from our CDN allows you to connect using MQTT over a WebSocket connection. This is shown in the example below:

const RESOURCE = `/thngs/UMVm3wUXMGPYEqwaanXVsAbr/properties`;
const DEVICE_API_KEY = '';

const connectToBroker = () => {
  // Connect with client ID
  const client = new Messaging.Client('ws.evrythng.com', 443, `client_${Date.now()}`);
  client.startTrace();
  
  // Print messages received
  client.onMessageArrived = (message) => {
    console.log(`Received ${message.payloadString}`);
  };

  console.log('Connecting...');
  client.connect({
    useSSL: true,
    onSuccess: () => {
      console.log('Connected successfully!');
      
      // Authenticate with access_token parameter
      client.subscribe(`${RESOURCE}?access_token=${DEVICE_API_KEY}`);
    },
  });
};

connectToBroker();

### Mosquitto Examples

The Mosquitto client can be used to publish and subscribe to individual topics. The general from is as follows (where client.pem is the certificate file:

mosquitto_sub -h mqtt.evrythng.com -p 8883 --cafile client.pem \
  -u authorization -P $API_KEY \
  -t $TOPIC
mosquitto_pub -h mqtt.evrythng.com -p 8883 --cafile client.pem \
  -u authorization -P $API_KEY \
  -t $TOPIC -m $PAYLOAD

Include the pubStates=1 query parameter in the topic to receive the initial state of the topic. For example, a Thng property:

mosquitto_sub -h mqtt.evrythng.com -p 8883 --cafile client.pem \
  -u authorization -P $OPERATOR_API_KEY \
  -t /thngs/UVdeTmsbPeKay6amTyNwXtNd/properties/temperature?pubStates=1

Eclipse Paho Examples

The Eclipse Paho MQTT client allows you to publish and subscribe to the Pub/Sub Broker using Java. The examples below use the org.json package to manage JSON objects natively, which is the recommended approach to avoid manually encoding payload strings.

Authentication

The MQTT username and password fields are provided via a MqttConnectOptions object when constructing a MqttClient object:

final String MQTT_URL = "ssl://mqtt.evrythng.com:8883";
final String API_KEY = "AGiWrH5OteA4aHiMFiwnwF08p3PQvPI...";

final MqttClient client = new MqttClient(MQTT_URL, 
                                         MqttClient.generateClientId(), 
                                         new MemoryPersistence());

final MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("authorization");
options.setPassword(API_KEY.toCharArray());
client.connect(options);

Subscriptions

Add a subscription using the subscribe() method belonging to the previously created MqttClient object. For example, a subscription for all created actions:

final String topic = "/actions/all";

client.subscribe(topic, (s, message) -> {
  System.out.println("Message: " + message.toString());
});

Publications

Publish a message to a topic using the publish() method belonging to the previously created MqttClient object. For example, updating a Thng property:

final String THNG_ID = "UHtfD4hsBXsa9paRwFbXqkxh";
final String PROPERTY_KEY = "temperature_celsius";
final String topic = "/thngs/" + THNG_ID + "/properties/" + PROPERTY_KEY;

final JSONArray payload = new JSONArray();
payload.put(new JSONObject()
    .put("value", Math.round(Math.random() * 10000))
    .put("timestamp", System.currentTimeMillis()));
final MqttMessage message = new MqttMessage(payload.toString().getBytes());
client.publish(topic, message);