Device Connectivity
This guide explains how to connect IoT devices to the De. platform using the MQTT protocol.
MQTT Protocol Overview
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for constrained devices and low-bandwidth, high-latency networks. It's the primary communication protocol used by De.IoTB for device-to-cloud and cloud-to-device messaging.
Key MQTT Features
- Publish/Subscribe Model: Devices publish data to topics and subscribe to receive messages
- Quality of Service (QoS) levels:
  - QoS 0: At most once (fire and forget)
  - QoS 1: At least once (guaranteed delivery, duplicates possible)
  - QoS 2: Exactly once (guaranteed delivery with deduplication)
- Last Will and Testament: Automatic status updates when devices disconnect abnormally
- Retained Messages: Store the last known value on a topic
- Persistent Sessions: Restore subscriptions and missed messages on reconnect
- Topic-based Filtering: Fine-grained control over message routing
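
To make topic-based filtering concrete, the sketch below implements the standard MQTT wildcard rules: `+` matches exactly one topic level, and `#` matches all remaining levels and must come last. `topicMatches` is a hypothetical helper written for illustration only. It is not part of De.IoTB or any MQTT client library, and the broker performs this matching for you.

```javascript
// Hypothetical helper: returns true if `topic` matches the subscription `filter`.
function topicMatches(filter, topic) {
  const f = filter.split('/');
  const t = topic.split('/');
  for (let i = 0; i < f.length; i++) {
    if (f[i] === '#') return i === f.length - 1; // '#' is only valid as the last level
    if (i >= t.length) return false;             // topic has fewer levels than the filter
    if (f[i] !== '+' && f[i] !== t[i]) return false;
  }
  return f.length === t.length;
}

console.log(topicMatches('ws_abc/devices/dev_123/commands/#',
                         'ws_abc/devices/dev_123/commands/reboot'));
console.log(topicMatches('ws_abc/devices/+/status',
                         'ws_abc/devices/dev_123/telemetry/battery'));
```

The first call matches because `#` covers everything below `commands`; the second does not, because the fourth level differs.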
Broker Connection Information
MQTT Broker Endpoints
| Protocol | Hostname | Port | Description |
|---|---|---|---|
| MQTT | broker.dedot.io | 1883 | Unencrypted TCP |
| MQTTS | broker.dedot.io | 8883 | TLS/SSL encrypted |
| WS | broker.dedot.io | 8083 | WebSocket |
| WSS | broker.dedot.io | 8084 | WebSocket with TLS/SSL |
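
As a small convenience, the endpoint table can be captured in code so that the rest of an application refers to a protocol name rather than a hard-coded port. This is a minimal sketch: `brokerUrl` and the constant names are hypothetical, and the WebSocket endpoints may additionally require a URL path, which the table does not specify.

```javascript
// Values copied from the endpoint table above.
const BROKER_HOST = 'broker.dedot.io';
const BROKER_PORTS = { mqtt: 1883, mqtts: 8883, ws: 8083, wss: 8084 };

// Hypothetical helper: build a connection URL for one of the listed protocols.
function brokerUrl(protocol) {
  if (!Object.prototype.hasOwnProperty.call(BROKER_PORTS, protocol)) {
    throw new Error(`Unknown protocol: ${protocol}`);
  }
  return `${protocol}://${BROKER_HOST}:${BROKER_PORTS[protocol]}`;
}

console.log(brokerUrl('mqtts'));
```

The resulting string can be passed as the first argument to `mqtt.connect`.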
Connection Options
```javascript
const mqttOptions = {
  // Client ID format: dedot_{namespace}_{deviceId}_{timestamp}
  clientId: `dedot_${workspaceId}_${deviceId}_${Date.now()}`,

  // Authentication
  username: credentials.username,
  password: credentials.password,

  // Connection settings
  clean: false,             // Persistent session (restored only when the same clientId reconnects)
  keepalive: 60,            // Heartbeat interval (seconds)
  connectTimeout: 30000,    // Connection timeout (ms)

  // TLS/SSL configuration (for secure connections)
  rejectUnauthorized: true, // Validate server certificate
  // ca: fs.readFileSync('./ca.crt'), // Custom CA if needed

  // Last Will and Testament (LWT)
  will: {
    topic: `${workspaceId}/devices/${deviceId}/status`,
    payload: 'offline',
    qos: 1,
    retain: true
  }
}
```

Device Authentication
De.IoTB supports two authentication methods for devices:
Token-based Authentication
The most common method for device authentication:

```javascript
const mqtt = require('mqtt')

const client = mqtt.connect('mqtts://broker.dedot.io:8883', {
  clientId: 'device_123',
  username: deviceCredentials.username,
  password: deviceCredentials.token
})
```

Certificate-based Authentication (mTLS)
For higher security requirements:
```javascript
const mqtt = require('mqtt')
const fs = require('fs')

const client = mqtt.connect('mqtts://broker.dedot.io:8883', {
  clientId: 'device_123',
  key: fs.readFileSync('./device_key.pem'),   // Device private key
  cert: fs.readFileSync('./device_cert.pem'), // Device certificate
  ca: fs.readFileSync('./ca_cert.pem'),       // CA used to verify the broker
  rejectUnauthorized: true
})
```

Topic Structure
The De. platform uses a standardized topic structure to organize messages:
```
{workspace_id}/devices/{device_id}/{category}/{type}
```

Category Types
- telemetry: Device-to-cloud data streams
- commands: Cloud-to-device instructions
- status: Device state information
- config: Device configuration
- events: Device events and alerts
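
The pattern and category list above can be turned into a pair of small helpers, which avoids hand-assembled topic strings scattered through device code. This is a sketch under assumptions: `buildTopic` and `parseTopic` are hypothetical names, and `type` is treated as optional because the status topic in this guide appears without one.

```javascript
const CATEGORIES = ['telemetry', 'commands', 'status', 'config', 'events'];

// Hypothetical helper: assemble a topic from its parts.
function buildTopic(workspaceId, deviceId, category, type) {
  if (!CATEGORIES.includes(category)) {
    throw new Error(`Unknown category: ${category}`);
  }
  const levels = [workspaceId, 'devices', deviceId, category];
  if (type !== undefined) levels.push(type);
  for (const level of levels) {
    // Wildcards and separators are not allowed inside a published topic level.
    if (typeof level !== 'string' || level === '' || /[+#/]/.test(level)) {
      throw new Error(`Invalid topic level: ${level}`);
    }
  }
  return levels.join('/');
}

// Hypothetical helper: split a topic back into its parts, or return null.
function parseTopic(topic) {
  const [workspaceId, marker, deviceId, category, ...rest] = topic.split('/');
  if (marker !== 'devices' || !CATEGORIES.includes(category)) return null;
  return { workspaceId, deviceId, category, type: rest.join('/') || null };
}

console.log(buildTopic('ws_abc', 'dev_123', 'telemetry', 'battery'));
console.log(parseTopic('ws_abc/devices/dev_123/commands/reboot'));
```

`parseTopic` is useful inside a message handler to branch on `category` and `type` instead of using substring checks.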
Standard Topics
Device Telemetry:

```
ws_abc/devices/dev_123/telemetry/location
ws_abc/devices/dev_123/telemetry/sensors
ws_abc/devices/dev_123/telemetry/battery
```

Command & Control:

```
ws_abc/devices/dev_123/commands/reboot
ws_abc/devices/dev_123/commands/config
ws_abc/devices/dev_123/commands/firmware
```

Status Updates:

```
ws_abc/devices/dev_123/status
ws_abc/devices/dev_123/events/errors
```

QoS Recommendations
| Message Type | Recommended QoS | Reason |
|---|---|---|
| Critical telemetry | QoS 1 | Ensure delivery (duplicates are possible, so consumers should tolerate them) |
| Regular sensor data | QoS 0 | Maximize throughput, occasional loss acceptable |
| Device commands | QoS 1 | Ensure command delivery |
| Critical commands | QoS 2 | Ensure exactly-once execution |
| Status updates | QoS 1 with retain flag | Ensure delivery and persistence |
| Bulk data | QoS 0 | Optimize for throughput |
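
One way to apply these recommendations consistently is to keep them in a single lookup and pass the result as the publish options. The sketch below mirrors the table row by row; the message type keys (`criticalTelemetry`, `sensorData`, and so on) and the `publishOptionsFor` helper are hypothetical names, not platform identifiers.

```javascript
// One entry per row of the recommendation table above.
const PUBLISH_OPTIONS = {
  criticalTelemetry: { qos: 1, retain: false },
  sensorData:        { qos: 0, retain: false },
  command:           { qos: 1, retain: false },
  criticalCommand:   { qos: 2, retain: false },
  status:            { qos: 1, retain: true },
  bulk:              { qos: 0, retain: false },
};

// Hypothetical helper: return a fresh options object for a message type.
function publishOptionsFor(messageType) {
  if (!Object.prototype.hasOwnProperty.call(PUBLISH_OPTIONS, messageType)) {
    throw new Error(`Unknown message type: ${messageType}`);
  }
  return { ...PUBLISH_OPTIONS[messageType] };
}

console.log(publishOptionsFor('status'));
```

With an MQTT.js client this could be used as `client.publish(topic, payload, publishOptionsFor('status'))`.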
Connection Examples
JavaScript (Node.js)
```javascript
const mqtt = require('mqtt')

// workspaceId, deviceId, credentials, and executeCommand are assumed to be
// defined elsewhere in your application.
const client = mqtt.connect('mqtts://broker.dedot.io:8883', {
  clientId: `device_${deviceId}_${Date.now()}`,
  username: credentials.username,
  password: credentials.password,
  clean: false,
  keepalive: 60,
  will: {
    topic: `${workspaceId}/devices/${deviceId}/status`,
    payload: 'offline',
    qos: 1,
    retain: true
  }
})

client.on('connect', () => {
  console.log('Connected to MQTT broker')

  // Publish online status
  client.publish(
    `${workspaceId}/devices/${deviceId}/status`,
    'online',
    { qos: 1, retain: true },
    (err) => {
      if (!err) console.log('Online status published')
    }
  )

  // Subscribe to command topics
  client.subscribe(`${workspaceId}/devices/${deviceId}/commands/#`, { qos: 1 })
})

client.on('message', (topic, payload) => {
  console.log(`Message received on topic ${topic}:`, payload.toString())

  // Process command
  if (topic.includes('/commands/')) {
    let command
    try {
      command = JSON.parse(payload.toString())
    } catch (err) {
      // A malformed payload must not crash the device process
      console.error('Invalid command format:', err.message)
      return
    }
    executeCommand(command)
  }
})

client.on('error', (error) => {
  console.error('Connection error:', error)
})

client.on('offline', () => {
  console.log('Client went offline, attempting to reconnect')
})

// Publish telemetry data every 30 seconds
setInterval(() => {
  const telemetryData = {
    timestamp: Date.now(),
    temperature: 23.5,
    humidity: 45.2,
    pressure: 1013
  }

  client.publish(
    `${workspaceId}/devices/${deviceId}/telemetry/sensors`,
    JSON.stringify(telemetryData),
    { qos: 1 },
    (err) => {
      if (err) console.error('Failed to publish telemetry:', err)
    }
  )
}, 30000)
```

Python
```python
import json
import time

import paho.mqtt.client as mqtt

# workspace_id, device_id, credentials, and process_command are assumed to be
# defined elsewhere in your application.
# This example uses the paho-mqtt 1.x callback API; paho-mqtt 2.x adds a
# callback_api_version argument to mqtt.Client() and changes callback signatures.


# Callback for successful connection
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT Broker!")
        # Publish online status with retain flag
        client.publish(
            f"{workspace_id}/devices/{device_id}/status",
            "online",
            qos=1,
            retain=True
        )
        # Subscribe to commands
        client.subscribe(f"{workspace_id}/devices/{device_id}/commands/#", qos=1)
    else:
        print(f"Failed to connect, return code: {rc}")


# Callback for received messages
def on_message(client, userdata, msg):
    print(f"Message received on topic {msg.topic}: {msg.payload.decode()}")
    # Process command
    if "/commands/" in msg.topic:
        try:
            command = json.loads(msg.payload.decode())
            process_command(command)
        except json.JSONDecodeError:
            print("Invalid command format")


# Set up MQTT client
client = mqtt.Client(client_id=f"device_{device_id}_{int(time.time())}")
client.username_pw_set(credentials['username'], credentials['password'])
client.on_connect = on_connect
client.on_message = on_message

# Port 8883 is the TLS endpoint, so TLS must be enabled before connecting.
# With no arguments, tls_set() uses the system's default CA certificates.
client.tls_set()

# Set up Last Will and Testament
client.will_set(
    f"{workspace_id}/devices/{device_id}/status",
    "offline",
    qos=1,
    retain=True
)

# Connect to the broker
client.connect("broker.dedot.io", 8883, keepalive=60)

# Start network loop
client.loop_start()

try:
    # Publish telemetry data every 30 seconds
    while True:
        telemetry_data = {
            "timestamp": int(time.time() * 1000),
            "temperature": 23.5,
            "humidity": 45.2
        }
        client.publish(
            f"{workspace_id}/devices/{device_id}/telemetry/sensors",
            json.dumps(telemetry_data),
            qos=1
        )
        time.sleep(30)
except KeyboardInterrupt:
    # Clean disconnect: wait for the offline status to be sent first
    info = client.publish(
        f"{workspace_id}/devices/{device_id}/status",
        "offline",
        qos=1,
        retain=True
    )
    info.wait_for_publish()
    client.disconnect()
    client.loop_stop()
```

Arduino/ESP32
```cpp
#include <WiFi.h>
#include <WiFiClientSecure.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>

// WiFi credentials
const char* ssid = "YOUR_WIFI_SSID";
const char* password = "YOUR_WIFI_PASSWORD";

// MQTT configuration
const char* mqtt_server = "broker.dedot.io";
const int mqtt_port = 8883;
const char* mqtt_username = "device_username";
const char* mqtt_password = "device_password";
const char* client_id = "esp32_device";

// Placeholder: replace with the PEM-encoded CA certificate for the broker
const char* root_ca = "YOUR_CA_CERTIFICATE_PEM";

// Topic structure
const char* workspace_id = "ws_abc";
const char* device_id = "dev_123";
String status_topic = String(workspace_id) + "/devices/" + String(device_id) + "/status";
String telemetry_topic = String(workspace_id) + "/devices/" + String(device_id) + "/telemetry/sensors";
String commands_topic = String(workspace_id) + "/devices/" + String(device_id) + "/commands/#";

// Port 8883 is the TLS endpoint, so a TLS-capable client is required
WiFiClientSecure espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;

void setup_wifi() {
  delay(10);
  Serial.println("Connecting to WiFi...");
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  String message;
  for (unsigned int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  Serial.println(message);

  // Process commands
  if (String(topic).indexOf("/commands/") >= 0) {
    DynamicJsonDocument doc(1024);
    DeserializationError err = deserializeJson(doc, message);
    if (err) {
      Serial.println("Invalid command format");
      return;
    }
    if (doc.containsKey("action")) {
      String action = doc["action"].as<String>();
      if (action == "restart") {
        Serial.println("Restarting device...");
        ESP.restart();
      }
    }
  }
}

void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Last Will and Testament: topic, QoS 1, retained, payload "offline"
    if (client.connect(client_id, mqtt_username, mqtt_password,
                       status_topic.c_str(), 1, true, "offline")) {
      Serial.println("connected");
      // Publish online status (retained; PubSubClient publishes at QoS 0 only)
      client.publish(status_topic.c_str(), "online", true);
      // Subscribe to command topics
      client.subscribe(commands_topic.c_str(), 1);
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

void setup() {
  Serial.begin(115200);
  setup_wifi();
  espClient.setCACert(root_ca);  // Validate the broker's certificate
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(callback);
}

void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  unsigned long now = millis();
  if (now - lastMsg > 30000) {
    lastMsg = now;

    // Create telemetry JSON
    DynamicJsonDocument doc(1024);
    doc["timestamp"] = now;  // Milliseconds since boot, not wall-clock time
    doc["temperature"] = random(2000, 3000) / 100.0;  // 20.00-30.00
    doc["humidity"] = random(3000, 8000) / 100.0;     // 30.00-80.00
    char buffer[256];
    serializeJson(doc, buffer);

    // Publish telemetry (not retained)
    client.publish(telemetry_topic.c_str(), buffer, false);
    Serial.println("Telemetry published");
  }
}
```

Connection Security Best Practices
- Use TLS/SSL: Always connect to secure endpoints (8883 for MQTTS, 8084 for WSS) in production
- Implement mutual TLS: For high-security requirements, use client certificates
- Use strong passwords: Generate secure, unique credentials for each device
- Rotate credentials: Implement a credential rotation system for long-lived devices
- Minimize topics: Subscribe only to required topics to reduce attack surface
- Validate payload size: Set reasonable message size limits (default: 256KB)
- Implement backoff: Use exponential backoff for reconnection attempts
- Monitor connections: Track device connection patterns for anomalies
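
The payload size recommendation is easy to enforce on the device before publishing. The following is a minimal sketch, assuming the 256KB default means 256 × 1024 bytes (the guide does not say whether the limit is binary or decimal); `checkPayloadSize` is a hypothetical helper.

```javascript
// Assumption: "256KB" is interpreted as 256 * 1024 bytes.
const MAX_PAYLOAD_BYTES = 256 * 1024;

// Hypothetical helper: measure a payload in bytes and compare it to the limit.
function checkPayloadSize(payload, maxBytes = MAX_PAYLOAD_BYTES) {
  const size = Buffer.isBuffer(payload)
    ? payload.length
    : Buffer.byteLength(String(payload), 'utf8'); // bytes, not characters
  return { ok: size <= maxBytes, size };
}

const result = checkPayloadSize(JSON.stringify({ temperature: 23.5 }));
if (!result.ok) {
  console.error(`Payload too large: ${result.size} bytes`);
}
```

Measuring in bytes rather than string length matters for payloads that contain non-ASCII characters.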
Error Handling and Reconnection
Best practices for maintaining reliable MQTT connections:
```javascript
// Progressive backoff strategy.
// Assumes the client was created with `reconnectPeriod: 0`, which disables
// MQTT.js's built-in fixed-interval reconnect so it does not compete with
// this handler.
let reconnectAttempts = 0;
let reconnectTimer = null;
const maxReconnectAttempts = 10;
const baseReconnectDelay = 1000; // Start with 1 second

function handleReconnect() {
  if (reconnectTimer) return; // A reconnect is already scheduled

  if (reconnectAttempts >= maxReconnectAttempts) {
    console.error('Maximum reconnection attempts reached');
    // Implement recovery strategy (e.g., device reboot)
    return;
  }

  // Calculate exponential backoff delay
  const delay = Math.min(
    baseReconnectDelay * Math.pow(2, reconnectAttempts),
    30000 // Maximum 30 second delay
  );
  console.log(`Reconnecting in ${delay}ms (attempt ${reconnectAttempts + 1})`);

  reconnectTimer = setTimeout(() => {
    reconnectTimer = null;
    reconnectAttempts++;
    client.reconnect();
  }, delay);
}

// Reset reconnect counter on successful connection
client.on('connect', () => {
  reconnectAttempts = 0;
});

// 'close' fires after every disconnection, including failed connection
// attempts. Remove this listener before calling client.end() deliberately.
client.on('close', handleReconnect);

client.on('error', (error) => {
  console.error('Connection error:', error);
});
```

MQTT Message Format
The De. platform supports multiple message formats for device telemetry:
JSON Format (Recommended)
```javascript
// Publishing JSON telemetry
const telemetry = {
  timestamp: Date.now(),
  values: {
    temperature: 23.5,
    humidity: 65.2,
    pressure: 1013.2
  },
  metadata: {
    firmware: "v1.2.0",
    battery: 87
  }
};

client.publish(
  `${workspaceId}/devices/${deviceId}/telemetry/sensors`,
  JSON.stringify(telemetry),
  { qos: 1 }
);
```

CSV Format (Compact)
For bandwidth-constrained devices, CSV format is supported through the Active Rules engine:
```javascript
// Format: HEAD,field1,field2,field3$
client.publish(
  `${workspaceId}/devices/${deviceId}/telemetry/sensors`,
  "SENS,23.5,65.2,1013.2$",
  { qos: 0 }
);
```

The CSV format requires a matching Active Rule configuration in the platform to properly parse the data.
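
Rather than concatenating the CSV string by hand, a small encoder can produce the `HEAD,field1,field2,field3$` shape and reject values that would corrupt it. This is a sketch under assumptions: `encodeCsvTelemetry` is a hypothetical helper, and treating `,` and `$` as reserved characters is a guess based on the format shown here. The actual parsing rules depend on the Active Rule configured in the platform.

```javascript
// Hypothetical helper: encode a header and field values as "HEAD,f1,f2,...$".
function encodeCsvTelemetry(head, fields) {
  const parts = [head, ...fields].map(String);
  for (const part of parts) {
    // Assumption: the separator and terminator cannot appear inside a value.
    if (/[,$]/.test(part)) {
      throw new Error(`Field contains a reserved character: ${part}`);
    }
  }
  return `${parts.join(',')}$`;
}

console.log(encodeCsvTelemetry('SENS', [23.5, 65.2, 1013.2]));
```

The call above produces the same payload as the hand-written string in the publish example.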

