
MQTT Integration

ThingsBoard PE Feature

Only Professional Edition supports Platform Integrations feature.
Use ThingsBoard Cloud or install your own platform instance.

MQTT Integration lets you connect to external MQTT brokers, subscribe to data streams from those brokers, and convert any type of payload from your devices to the ThingsBoard message format. It is typically used when your devices are already connected to an external MQTT broker, or to another IoT platform or connectivity provider with an MQTT-based back end.

Please review the integration diagram to learn more.

ThingsBoard MQTT Integration acts as an MQTT client. It subscribes to topics and converts the data into telemetry and attribute updates. For a downlink message, the integration converts it to a device-suitable format and pushes it to the external MQTT broker. Note: the MQTT broker should be either co-located with the ThingsBoard instance or deployed in the cloud with a valid DNS name or static IP address. A ThingsBoard instance running in the cloud can’t connect to an MQTT broker deployed in a local area network.

MQTT Integration Configuration

In this tutorial, we will configure the MQTT Integration to connect devices to the platform and to send RPC commands to them.

Prerequisites

In this tutorial, we will use:

  • a ThingsBoard Professional Edition instance installed locally;
  • an MQTT broker accessible by the ThingsBoard PE instance: broker.hivemq.com (port 1883);
  • the mosquitto_pub and mosquitto_sub MQTT clients to send and receive messages;
  • an advanced device simulator for the RPC simulation example.

Let’s assume that we have a sensor that sends its current temperature readings. Our sensor device SN-001 publishes its temperature readings to ‘tb/mqtt-integration-tutorial/sensors/SN-001/temperature’ and is subscribed to ‘tb/mqtt-integration-tutorial/sensors/SN-001/rx’ to receive RPC calls.

ThingsBoard setup

Before setting up an MQTT integration, you need to create uplink and downlink converters.

Uplink converter is a script for parsing and transforming the data received by MQTT integration.

Downlink converter parses and transforms the data sent from ThingsBoard to the format that is consumed by existing device(s).

The purpose of the decoder function is to parse the incoming data and metadata to a format that ThingsBoard can consume. deviceName and deviceType are required, while attributes and telemetry are optional. Attributes and telemetry are flat key-value objects. Nested objects are not supported.
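The shape rules above can be checked with a few lines of JavaScript. This is a minimal sketch, not part of ThingsBoard: the helper names isFlatObject and validateUplinkResult are hypothetical, and the check assumes that any object or array value counts as "nested".

```javascript
// Hypothetical helpers (not a ThingsBoard API): check the result shape described above.
function isFlatObject(obj) {
    if (obj === null || typeof obj !== 'object' || Array.isArray(obj)) {
        return false;
    }
    // Assumption: every value must be a primitive (or null) to count as "flat".
    return Object.keys(obj).every(function (key) {
        var value = obj[key];
        return value === null || typeof value !== 'object';
    });
}

function validateUplinkResult(result) {
    var errors = [];
    if (typeof result.deviceName !== 'string' || result.deviceName === '') {
        errors.push('deviceName is required');
    }
    if (typeof result.deviceType !== 'string' || result.deviceType === '') {
        errors.push('deviceType is required');
    }
    // attributes and telemetry are optional, but must be flat when present
    ['attributes', 'telemetry'].forEach(function (field) {
        if (result[field] !== undefined && !isFlatObject(result[field])) {
            errors.push(field + ' must be a flat key-value object');
        }
    });
    return errors;
}

var errors = validateUplinkResult({
    deviceName: 'SN-001',
    deviceType: 'sensor',
    telemetry: { temperature: 25.1 }
});
console.log(errors.length); // prints 0
```

Running such a check on sample output before pasting a script into the converter editor can catch a missing deviceName or an accidentally nested telemetry object early.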

To create an uplink converter, go to the Integrations center -> Data converters page and click the “plus” button. Name it “MQTT Uplink Converter” and select type Uplink. Use debug mode for now.


The Debug mode is very useful for development and troubleshooting. However, having it on all the time can significantly increase the disk space used by the database since all the debug data is stored there.
Therefore, starting from version 3.9, ThingsBoard stores all integrations debug events for the first 15 minutes. After that, only failure events are retained. These settings can be combined or completely disabled.

You can use either TBEL (ThingsBoard expression language) or JavaScript to develop user-defined functions. We recommend TBEL, as its execution in ThingsBoard is much more efficient than JS.

Now copy and paste the following script into the Decoder function section. This is the TBEL version, in which the decodeToString and decodeToJson helpers are built in:

/** Decoder **/

// decode payload to string
var payloadStr = decodeToString(payload);
// parse the string as JSON
var data = JSON.parse(payloadStr);

// the device name is the fourth level of the topic, e.g. SN-001
var deviceName = metadata.topic.split("/")[3];
var deviceType = 'sensor';

// Result object with device attributes/telemetry data
var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: {
        integrationName: metadata['integrationName'],
    },
    telemetry: {
        temperature: data.value,
    }
};

/** Helper functions 'decodeToString' and 'decodeToJson' are already built-in **/

return result;

If you write the converter in JavaScript instead of TBEL, use the following script, which defines the helper functions itself:

/** Decoder **/

// decode payload to string
var payloadStr = decodeToString(payload);
// parse the string as JSON
var data = JSON.parse(payloadStr);

// the device name is the fourth level of the topic, e.g. SN-001
var deviceName = metadata.topic.split("/")[3];
var deviceType = 'sensor';

// Result object with device attributes/telemetry data
var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: {
        integrationName: metadata['integrationName'],
    },
    telemetry: {
        temperature: data.value,
    }
};

/** Helper functions **/

function decodeToString(payload) {
    return String.fromCharCode.apply(String, payload);
}

function decodeToJson(payload) {
    // convert payload to string.
    var str = decodeToString(payload);

    // parse string to JSON
    var data = JSON.parse(str);
    return data;
}

return result;


The Downlink converter transforms the outgoing RPC message, and then the Integration sends it to the external MQTT broker.


NOTE:
Even if you won’t send downlink RPC, you still need to create a dummy Downlink converter.

Create another converter with the name “MQTT Downlink Converter” and type Downlink. Leave the default script and click Add.

You can use either TBEL (ThingsBoard expression language) or JavaScript to develop user-defined functions. We recommend TBEL, as its execution in ThingsBoard is much more efficient than JS.

Now copy & paste the following script to the Encoder function section:

// Encode downlink data from incoming Rule Engine message

// msg - JSON message payload downlink message json
// msgType - type of message, for ex. 'ATTRIBUTES_UPDATED', 'POST_TELEMETRY_REQUEST', etc.
// metadata - list of key-value pairs with additional data about the message
// integrationMetadata - list of key-value pairs with additional data defined in Integration executing this converter

/** Encoder **/

var data = {};

// Process data from incoming message and metadata

data.tempFreq = msg.temperatureUploadFrequency;
data.humFreq = msg.humidityUploadFrequency;

data.devSerialNumber = metadata['ss_serialNumber'];

// Result object with encoded downlink payload
var result = {

    // downlink data content type: JSON, TEXT or BINARY (base64 format)
    contentType: "JSON",

    // downlink data
    data: JSON.stringify(data),

    // Optional metadata object presented in key/value format
    metadata: {
        topic: metadata['deviceType']+'/'+metadata['deviceName']+'/upload'
    }

};

return result;
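To see what this encoder produces, the script body can be wrapped in a function and called with sample input outside ThingsBoard. The sketch below is an illustration only: the wrapper name encodeDownlink and all sample values (upload frequencies, serial number) are invented for the example.

```javascript
// Hypothetical wrapper so the encoder logic can run outside ThingsBoard.
function encodeDownlink(msg, msgType, metadata, integrationMetadata) {
    var data = {};
    data.tempFreq = msg.temperatureUploadFrequency;
    data.humFreq = msg.humidityUploadFrequency;
    data.devSerialNumber = metadata['ss_serialNumber'];
    return {
        contentType: 'JSON',
        data: JSON.stringify(data),
        metadata: {
            topic: metadata['deviceType'] + '/' + metadata['deviceName'] + '/upload'
        }
    };
}

// Sample input: every value here is made up for illustration.
var sample = encodeDownlink(
    { temperatureUploadFrequency: 60, humidityUploadFrequency: 30 },
    'ATTRIBUTES_UPDATED',
    { deviceType: 'sensor', deviceName: 'SN-001', ss_serialNumber: 'SN-001-ABC' },
    {}
);
console.log(sample.data);           // prints {"tempFreq":60,"humFreq":30,"devSerialNumber":"SN-001-ABC"}
console.log(sample.metadata.topic); // prints sensor/SN-001/upload
```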


MQTT Integration Setup

  • Go to the Integrations center -> Integrations page and click the “plus” icon to add a new integration. Name it “MQTT Integration” and select type MQTT;

  • The next step is to add the recently created uplink and downlink converters;

  • Specify host broker.hivemq.com and port 1883 at the connection step;

  • Add a topic filter:

    tb/mqtt-integration-tutorial/sensors/+/temperature

  • You can also select an MQTT QoS level. We use MQTT QoS level 0 (At most once) by default;

  • Go to the advanced settings. It is better to uncheck the Clean session parameter. Many brokers do not support sticky sessions, so they will silently close the connection if you try to connect with this option enabled;

  • Leave the Downlink topic pattern at its default, meaning that the Integration will take metadata.topic and use it as the downlink topic;

  • [Optional] Click the Check connection button to check the connection to your MQTT broker. Click the Add button to create the integration.
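
The "+" in the topic filter above is the MQTT single-level wildcard, so the filter matches the temperature topic of any sensor. The following is a simplified sketch of how such filters match topics. The function name matchesTopicFilter is hypothetical, and the sketch ignores "$"-prefixed system topics and does not validate the filter.

```javascript
// Simplified MQTT topic filter matching:
// '+' matches exactly one level, '#' matches the remaining levels.
function matchesTopicFilter(filter, topic) {
    var filterLevels = filter.split('/');
    var topicLevels = topic.split('/');
    for (var i = 0; i < filterLevels.length; i++) {
        if (filterLevels[i] === '#') {
            return true; // matches everything from this level down
        }
        if (i >= topicLevels.length) {
            return false; // the topic is shorter than the filter
        }
        if (filterLevels[i] !== '+' && filterLevels[i] !== topicLevels[i]) {
            return false; // literal level mismatch
        }
    }
    // without '#', the filter and the topic must have the same depth
    return filterLevels.length === topicLevels.length;
}

var filter = 'tb/mqtt-integration-tutorial/sensors/+/temperature';
console.log(matchesTopicFilter(filter, 'tb/mqtt-integration-tutorial/sensors/SN-001/temperature')); // prints true
console.log(matchesTopicFilter(filter, 'tb/mqtt-integration-tutorial/sensors/SN-001/rx'));          // prints false
```

Because the second topic does not match, messages published to a sensor's rx topic are not consumed by the integration with this filter alone.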


Now let’s simulate the device sending a temperature reading to the integration.

Use the terminal to send a message with a temperature reading in a simple format, {"value":25.1}:

mosquitto_pub -h broker.hivemq.com -p 1883 -t "tb/mqtt-integration-tutorial/sensors/SN-001/temperature" -m '{"value":25.1}'


After you send the uplink message, a new device is created. You should receive a notification about it. To view the notification, click the bell icon in the upper right corner of the screen. The notification contains a link to the created device. Go to this device.

Here you will see information about the new device, as well as the telemetry that we sent for it.

Learn more about notifications and how to configure them here.

Go back to your Integrations page and navigate to the Events tab. There you’ll see the message consumed by the MQTT Integration.

On the Events tab of your MQTT Uplink Converter there will be “In”, “Out”, and “Metadata” columns. “In” and “Metadata” are the input for the data converter, and “Out” is the result.


Summary: the Uplink Data Converter defines how devices are provisioned and how the input data is interpreted. In our example we capture the device name from the topic (SN-001), set the default device type (sensor), and populate a telemetry value for it. In more complex cases you can write a script that takes this data from any part of the payload or metadata.
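
As an illustration of capturing the device name from the topic, the sketch below splits the tutorial topic into levels and guards against topics with an unexpected prefix. The helper name deviceNameFromTopic and the guard are assumptions added for the example; the tutorial converter itself simply takes split("/")[3].

```javascript
// Hypothetical helper: returns the device name from a tutorial topic,
// or null when the topic does not have the expected prefix.
function deviceNameFromTopic(topic) {
    var levels = topic.split('/');
    // expected levels: tb / mqtt-integration-tutorial / sensors / <device> / ...
    if (levels.length < 5 || levels[0] !== 'tb' ||
        levels[1] !== 'mqtt-integration-tutorial' || levels[2] !== 'sensors') {
        return null;
    }
    return levels[3];
}

console.log(deviceNameFromTopic('tb/mqtt-integration-tutorial/sensors/SN-001/temperature')); // prints SN-001
console.log(deviceNameFromTopic('other/topic')); // prints null
```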

Send One-Way RPC to Device

This section describes how to send a one-way RPC request to the device using Control Widgets.

  • Go to the Dashboards page and create a new dashboard named MQTT RPC. Open the dashboard and add an alias by clicking the Entity aliases icon;
  • Name the alias (Sensor, for example), select filter type Single Entity and type Device, and choose our SN-001 sensor. Press Add and then Save;
  • Now add a new widget: select the Control Widgets bundle from the drop-down menu and select the Knob Control widget;
  • In the Data field, select the created alias (Sensor). Set Number of digits after floating point to 0;
  • Go to the Advanced tab and set Minimum value to 15 and Maximum value to 45. Leave the rest at the defaults. Click Add to create the widget;
  • Save the changes.

Now go to the Rule Chains page and open the Root Rule Chain. Double-click the message type switch node and enable Debug mode on it.

Now go back to your dashboard and turn the knob a couple of times.

On the Events tab of the message type switch node you should then see incoming messages with the message type RPC_CALL_FROM_SERVER_TO_DEVICE and relation type RPC Request to Device. You can check what data and metadata were sent by the Knob Control to the Rule Engine.

To make the sensor perform this command, messages of type RPC Request to Device must be forwarded to the downlink data converter. In the Root Rule Chain editor, find the integration downlink node and drag and drop it onto the rule chain. Name it MQTT Integration Downlink, select our MQTT Integration, and click Add. Drag a connection from the Message Type Switch node to the MQTT Integration Downlink node with the label “RPC Request to Device” and click Add.

Go to the Data converters page, open your “MQTT Downlink Converter” and replace the default script with this one:

/** Encoder **/

var value = parseInt(msg.params.replaceAll("[\"]",""));
var data = {value: value};
// Result object with encoded downlink payload
var result = {

    // downlink data content type: JSON, TEXT or BINARY (base64 format)
    contentType: "JSON",

    // downlink data
    data: JSON.stringify(data),

    // Optional metadata object presented in key/value format
    metadata: {
        topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx'
    }

};

return result;

If you use JavaScript instead of TBEL, use this version of the script:

/** Encoder **/

var value = parseInt(msg.params.replace(/"/g,""));
var data = {value: value};
// Result object with encoded downlink payload
var result = {

    // downlink data content type: JSON, TEXT or BINARY (base64 format)
    contentType: "JSON",

    // downlink data
    data: JSON.stringify(data),

    // Optional metadata object presented in key/value format
    metadata: {
        topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx'
    }

};

return result;


The script above removes the quotes from the msg.params value, which arrives as a quoted string, and parses it to an integer. Then it builds a result object that is passed to the Integration. The result object must follow this structure: data (the message payload, sent to the external MQTT broker as-is) and metadata (used by the Integration). As mentioned, the Integration downlink topic is configured as ${topic}, which means that the integration takes metadata.topic and uses it as the downlink topic.
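
The two steps described above, parsing the quoted parameter and resolving the ${topic} pattern from the result metadata, can be sketched as follows. This is only an illustration under stated assumptions: resolveTopicPattern is a hypothetical function, ThingsBoard's own substitution logic may differ, and the sample params value '"33"' is invented.

```javascript
// Hypothetical sketch of ${key} substitution; not ThingsBoard's implementation.
function resolveTopicPattern(pattern, metadata) {
    return pattern.replace(/\$\{([^}]+)\}/g, function (match, key) {
        // leave unknown placeholders untouched
        return Object.prototype.hasOwnProperty.call(metadata, key)
            ? String(metadata[key])
            : match;
    });
}

// Sample value: msg.params arrives as a quoted string.
var params = '"33"';
var value = parseInt(params.replace(/"/g, ''), 10);

var result = {
    contentType: 'JSON',
    data: JSON.stringify({ value: value }),
    metadata: { topic: 'tb/mqtt-integration-tutorial/sensors/SN-001/rx' }
};

console.log(result.data);                                      // prints {"value":33}
console.log(resolveTopicPattern('${topic}', result.metadata)); // prints tb/mqtt-integration-tutorial/sensors/SN-001/rx
```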

Open the terminal window and execute the following command:

mosquitto_sub -h broker.hivemq.com -p 1883 -t "tb/mqtt-integration-tutorial/sensors/+/rx"

Go to the dashboard and turn the knob again. In your terminal window you should receive a series of incoming messages sent by the knob control widget:

{"value":33}
{"value":42}


Simulating Two-Way RPC

Now try to simulate sending an RPC request to the device and receiving a response.

First, modify the converters to send downlink messages to the ‘tb/mqtt-integration-tutorial/sensors/+/rx/twoway’ topic and to receive device responses on the ‘tb/mqtt-integration-tutorial/sensors/+/rx/response’ topic.

Change the downlink converter code to send messages to the ‘tb/mqtt-integration-tutorial/sensors/+/rx/twoway’ topic. Go to the “MQTT Downlink Converter” and change the topic assignment on line 16 of the script to

topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx/twoway'

Or just paste the following code in the encoder window:

/** Encoder **/

var value = parseInt(msg.params.replaceAll("[\"]",""));
var data = {value: value};
// Result object with encoded downlink payload
var result = {

    // downlink data content type: JSON, TEXT or BINARY (base64 format)
    contentType: "JSON",

    // downlink data
    data: JSON.stringify(data),

    // Optional metadata object presented in key/value format
    metadata: {
        topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx/twoway'
    }

};

return result;

If you use JavaScript instead of TBEL, use this version of the script:

/** Encoder **/

var value = parseInt(msg.params.replace(/"/g,""));
var data = {value: value};
// Result object with encoded downlink payload
var result = {

    // downlink data content type: JSON, TEXT or BINARY (base64 format)
    contentType: "JSON",

    // downlink data
    data: JSON.stringify(data),

    // Optional metadata object presented in key/value format
    metadata: {
        topic: 'tb/mqtt-integration-tutorial/sensors/'+metadata['deviceName']+'/rx/twoway'
    }

};

return result;


Then prepare the uplink converter to receive the response messages. Go to the “MQTT Uplink Converter” and paste the following code in the decoder window:

/** Decoder **/

// decode payload to string
var payloadStr = decodeToString(payload);
// parse the string as JSON
var data = JSON.parse(payloadStr);
// the device name is the fourth level of the topic
var deviceName = metadata.topic.split("/")[3];
var deviceType = 'sensor';

// Telemetry depends on the topic the message arrived on
var telemetry;
if (metadata.topic.endsWith('/temperature')) {
    // Transform the incoming data as before
    telemetry = {temperature: data.value};
} else if (metadata.topic.endsWith('/rx/response')) {
    // Get the input value as is
    telemetry = data;
}

var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: {
        integrationName: metadata['integrationName'],
    },
    telemetry: telemetry
};

/** Helper functions 'decodeToString' and 'decodeToJson' are already built-in **/

return result;

If you write the converter in JavaScript instead of TBEL, use the following script, which defines the helper functions itself:

/** Decoder **/

// decode payload to string
var payloadStr = decodeToString(payload);
// parse the string as JSON
var data = JSON.parse(payloadStr);
// the device name is the fourth level of the topic
var deviceName = metadata.topic.split("/")[3];
var deviceType = 'sensor';

// Result object with device attributes/telemetry data
var telemetry;
if (metadata.topic.endsWith('/temperature')) {
    // Transform the incoming data as before
    telemetry = getTemperatureTelemetry(data);
} else if (metadata.topic.endsWith('/rx/response')) {
    // Get the input value as is
    telemetry = data;
}

var result = {
    deviceName: deviceName,
    deviceType: deviceType,
    attributes: {
        integrationName: metadata['integrationName'],
    },
    telemetry: telemetry
};

/** Helper functions **/

function getTemperatureTelemetry(data) {
    return {temperature: data.value}
}

function decodeToString(payload) {
    return String.fromCharCode.apply(String, payload);
}

function decodeToJson(payload) {
    // convert payload to string.
    var str = decodeToString(payload);

    // parse string to JSON
    var data = JSON.parse(str);
    return data;
}

return result;


The script above is slightly different from what we had initially. It distinguishes between Post Telemetry requests and RPC call responses, and so publishes a different kind of output to the Rule Engine for each.
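
To check both branches without a running integration, the decoder logic can be exercised locally with sample messages. The sketch below is a test harness under stated assumptions: decodeUplink and toBytes are hypothetical names, the payload is assumed to be ASCII, and the integration name passed in the metadata is a sample value.

```javascript
// Hypothetical helper: turns an ASCII string into an array of byte values.
function toBytes(str) {
    var bytes = [];
    for (var i = 0; i < str.length; i++) {
        bytes.push(str.charCodeAt(i));
    }
    return bytes;
}

// Hypothetical wrapper around the decoder logic so it can run outside ThingsBoard.
function decodeUplink(payload, metadata) {
    var data = JSON.parse(String.fromCharCode.apply(String, payload));
    var telemetry;
    if (metadata.topic.endsWith('/temperature')) {
        telemetry = { temperature: data.value }; // regular reading
    } else if (metadata.topic.endsWith('/rx/response')) {
        telemetry = data;                        // RPC response, passed through as is
    }
    return {
        deviceName: metadata.topic.split('/')[3],
        deviceType: 'sensor',
        attributes: { integrationName: metadata['integrationName'] },
        telemetry: telemetry
    };
}

var base = 'tb/mqtt-integration-tutorial/sensors/SN-001';
var reading = decodeUplink(toBytes('{"value":25.1}'),
    { topic: base + '/temperature', integrationName: 'MQTT Integration' });
var reply = decodeUplink(toBytes('{"rpcReceived":"OK"}'),
    { topic: base + '/rx/response', integrationName: 'MQTT Integration' });

console.log(JSON.stringify(reading.telemetry)); // prints {"temperature":25.1}
console.log(JSON.stringify(reply.telemetry));   // prints {"rpcReceived":"OK"}
```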

You also must add a topic filter in your integration to receive the RPC response messages: MQTT Integration -> Topic filters -> Add topic filter. Add this topic with the default QoS level:

tb/mqtt-integration-tutorial/sensors/+/rx/response

Apply changes.


Now run the device emulator. Note that mosquitto_pub and mosquitto_sub are not sufficient for this step, so launch the advanced simulator:

python mqtt-client.py

Try to turn the knob on the dashboard. In the terminal window you should see output similar to this:

Incoming message
Topic: tb/mqtt-integration-tutorial/sensors/SN-001/rx/twoway
Message: {"value":40}
This is a Two-way RPC call. Going to reply now!
Sending a response message: {"rpcReceived":"OK"}
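
The simulator script itself is not reproduced on this page. As a rough, hypothetical sketch of the reply logic that the output above implies (not the contents of mqtt-client.py), a device could derive the response topic and payload like this; the function name buildRpcReply is invented for the example.

```javascript
// Hypothetical reply logic; an illustration, not the contents of mqtt-client.py.
function buildRpcReply(topic) {
    var suffix = '/rx/twoway';
    if (!topic.endsWith(suffix)) {
        return null; // not a two-way call: nothing to send back
    }
    return {
        // swap the request suffix for the response suffix
        topic: topic.slice(0, -suffix.length) + '/rx/response',
        payload: JSON.stringify({ rpcReceived: 'OK' })
    };
}

var rpcReply = buildRpcReply('tb/mqtt-integration-tutorial/sensors/SN-001/rx/twoway');
console.log(rpcReply.topic);   // prints tb/mqtt-integration-tutorial/sensors/SN-001/rx/response
console.log(rpcReply.payload); // prints {"rpcReceived":"OK"}
```

The response topic produced here is the one covered by the second topic filter, so the uplink converter receives the reply and stores it as telemetry.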

Go to the Devices page and check that the rpcReceived telemetry value is “OK” on the Latest telemetry tab of your SN-001 device.

Video tutorials

Setting up MQTT Integration

This video is a step-by-step tutorial on setting up the MQTT Integration.


Configure RPC request to the device

For your convenience, you can follow this video to configure an RPC request to the device and receive a simulated response via the MQTT Integration.


Next steps