Skip to content

Device Connectivity

This guide explains how to connect IoT devices to the De. platform using MQTT protocol.

MQTT Protocol Overview

MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol designed for constrained devices and low-bandwidth, high-latency networks. It's the primary communication protocol used by De.IoTB for device-to-cloud and cloud-to-device messaging.

Key MQTT Features

  • Publish/Subscribe Model: Devices publish data to topics and subscribe to receive messages
  • Quality of Service (QoS) levels:
    • QoS 0: At most once (fire and forget)
    • QoS 1: At least once (guaranteed delivery)
    • QoS 2: Exactly once (guaranteed delivery with deduplication)
  • Last Will and Testament: Automatic status updates when devices disconnect abnormally
  • Retained Messages: Store the last known value on a topic
  • Persistent Sessions: Restore subscriptions and missed messages on reconnect
  • Topic-based Filtering: Fine-grained control over message routing

Broker Connection Information

MQTT Broker Endpoints

ProtocolHostnamePortDescription
MQTTbroker.dedot.io1883Unencrypted TCP
MQTTSbroker.dedot.io8883TLS/SSL encrypted
WSbroker.dedot.io8083WebSocket
WSSbroker.dedot.io8084WebSocket with TLS/SSL

Connection Options

javascript
const mqttOptions = {
  // Client ID format: dedot_{namespace}_{deviceId}_{timestamp}
  clientId: `dedot_${workspaceId}_${deviceId}_${Date.now()}`,
  
  // Authentication
  username: credentials.username,
  password: credentials.password,
  
  // Connection settings
  clean: false,           // Persistent sessions for reliability
  keepalive: 60,          // Heartbeat interval (seconds)
  connectTimeout: 30000,  // Connection timeout (ms)
  
  // TLS/SSL Configuration (for secure connections)
  rejectUnauthorized: true,  // Validate server certificate
  // ca: fs.readFileSync('./ca.crt'),  // Custom CA if needed
  
  // Last Will and Testament (LWT)
  will: {
    topic: `${workspaceId}/devices/${deviceId}/status`,
    payload: 'offline',
    qos: 1,
    retain: true
  }
}

Device Authentication

De.IoTB supports two authentication methods for devices:

Token-based Authentication

Most common method for device authentication:

javascript
const mqtt = require('mqtt')

const client = mqtt.connect('mqtts://broker.dedot.io:8883', {
  clientId: 'device_123',
  username: deviceCredentials.username,
  password: deviceCredentials.token
})

Certificate-based Authentication (mTLS)

For higher security requirements:

javascript
const mqtt = require('mqtt')
const fs = require('fs')

const client = mqtt.connect('mqtts://broker.dedot.io:8883', {
  clientId: 'device_123',
  key: fs.readFileSync('./device_key.pem'),
  cert: fs.readFileSync('./device_cert.pem'),
  ca: fs.readFileSync('./ca_cert.pem'),
  rejectUnauthorized: true
})

Topic Structure

The De. platform uses a standardized topic structure to organize messages:

{workspace_id}/devices/{device_id}/{category}/{type}

Category Types

  • telemetry: Device-to-cloud data streams
  • commands: Cloud-to-device instructions
  • status: Device state information
  • config: Device configuration
  • events: Device events and alerts

Standard Topics

Device Telemetry:

ws_abc/devices/dev_123/telemetry/location
ws_abc/devices/dev_123/telemetry/sensors
ws_abc/devices/dev_123/telemetry/battery

Command & Control:

ws_abc/devices/dev_123/commands/reboot
ws_abc/devices/dev_123/commands/config
ws_abc/devices/dev_123/commands/firmware

Status Updates:

ws_abc/devices/dev_123/status
ws_abc/devices/dev_123/events/errors

QoS Recommendations

Message TypeRecommended QoSReason
Critical telemetryQoS 1Ensure delivery without duplicates
Regular sensor dataQoS 0Maximize throughput, occasional loss acceptable
Device commandsQoS 1Ensure command delivery
Critical commandsQoS 2Ensure exactly-once execution
Status updatesQoS 1 with retain flagEnsure delivery and persistence
Bulk dataQoS 0Optimize for throughput

Connection Examples

JavaScript (Node.js)

javascript
const mqtt = require('mqtt')

const client = mqtt.connect('mqtts://broker.dedot.io:8883', {
  clientId: `device_${deviceId}_${Date.now()}`,
  username: credentials.username,
  password: credentials.password,
  clean: false,
  keepalive: 60,
  will: {
    topic: `${workspaceId}/devices/${deviceId}/status`,
    payload: 'offline',
    qos: 1,
    retain: true
  }
})

client.on('connect', () => {
  console.log('Connected to MQTT broker')
  
  // Publish online status
  client.publish(
    `${workspaceId}/devices/${deviceId}/status`,
    'online',
    { qos: 1, retain: true },
    (err) => {
      if (!err) console.log('Online status published')
    }
  )
  
  // Subscribe to command topics
  client.subscribe(`${workspaceId}/devices/${deviceId}/commands/#`, { qos: 1 })
})

client.on('message', (topic, payload) => {
  console.log(`Message received on topic ${topic}:`, payload.toString())
  
  // Process command
  if (topic.includes('/commands/')) {
    const command = JSON.parse(payload.toString())
    executeCommand(command)
  }
})

client.on('error', (error) => {
  console.error('Connection error:', error)
})

client.on('offline', () => {
  console.log('Client went offline, attempting to reconnect')
})

// Publish telemetry data every 30 seconds
setInterval(() => {
  const telemetryData = {
    timestamp: Date.now(),
    temperature: 23.5,
    humidity: 45.2,
    pressure: 1013
  }
  
  client.publish(
    `${workspaceId}/devices/${deviceId}/telemetry/sensors`,
    JSON.stringify(telemetryData),
    { qos: 1 },
    (err) => {
      if (err) console.error('Failed to publish telemetry:', err)
    }
  )
}, 30000)

Python

python
import json
import time
import paho.mqtt.client as mqtt

# Callback for successful connection
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to MQTT Broker!")
        # Publish online status with retain flag
        client.publish(
            f"{workspace_id}/devices/{device_id}/status",
            "online",
            qos=1,
            retain=True
        )
        # Subscribe to commands
        client.subscribe(f"{workspace_id}/devices/{device_id}/commands/#", qos=1)
    else:
        print(f"Failed to connect, return code: {rc}")

# Callback for received messages
def on_message(client, userdata, msg):
    print(f"Message received on topic {msg.topic}: {msg.payload.decode()}")
    
    # Process command
    if "/commands/" in msg.topic:
        try:
            command = json.loads(msg.payload.decode())
            process_command(command)
        except json.JSONDecodeError:
            print("Invalid command format")

# Set up MQTT client
client = mqtt.Client(client_id=f"device_{device_id}_{int(time.time())}")
client.username_pw_set(credentials['username'], credentials['password'])
client.on_connect = on_connect
client.on_message = on_message

# Set up Last Will and Testament
client.will_set(
    f"{workspace_id}/devices/{device_id}/status",
    "offline",
    qos=1,
    retain=True
)

# Connect to the broker
client.connect("broker.dedot.io", 8883, keepalive=60)

# Start network loop
client.loop_start()

try:
    # Publish telemetry data every 30 seconds
    while True:
        telemetry_data = {
            "timestamp": int(time.time() * 1000),
            "temperature": 23.5,
            "humidity": 45.2
        }
        
        client.publish(
            f"{workspace_id}/devices/{device_id}/telemetry/sensors",
            json.dumps(telemetry_data),
            qos=1
        )
        
        time.sleep(30)
except KeyboardInterrupt:
    # Clean disconnect
    client.publish(
        f"{workspace_id}/devices/{device_id}/status",
        "offline",
        qos=1,
        retain=True
    )
    client.disconnect()
    client.loop_stop()

Arduino/ESP32

cpp
#include <WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>

// WiFi credentials
const char* ssid = "YOUR_WIFI_SSID";
const char* password = "YOUR_WIFI_PASSWORD";

// MQTT configuration
const char* mqtt_server = "broker.dedot.io";
const int mqtt_port = 8883;
const char* mqtt_username = "device_username";
const char* mqtt_password = "device_password";
const char* client_id = "esp32_device";

// Topic structure
const char* workspace_id = "ws_abc";
const char* device_id = "dev_123";
String status_topic = String(workspace_id) + "/devices/" + String(device_id) + "/status";
String telemetry_topic = String(workspace_id) + "/devices/" + String(device_id) + "/telemetry/sensors";
String commands_topic = String(workspace_id) + "/devices/" + String(device_id) + "/commands/#";

WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;

void setup_wifi() {
  delay(10);
  Serial.println("Connecting to WiFi...");
  
  WiFi.begin(ssid, password);
  
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  
  String message;
  for (int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  Serial.println(message);
  
  // Process commands
  if (String(topic).indexOf("/commands/") >= 0) {
    DynamicJsonDocument doc(1024);
    deserializeJson(doc, message);
    
    if (doc.containsKey("action")) {
      String action = doc["action"];
      if (action == "restart") {
        Serial.println("Restarting device...");
        ESP.restart();
      }
    }
  }
}

void reconnect() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    
    // Last Will and Testament
    if (client.connect(client_id, mqtt_username, mqtt_password, 
                      status_topic.c_str(), 1, true, "offline")) {
      Serial.println("connected");
      
      // Publish online status
      client.publish(status_topic.c_str(), "online", true);
      
      // Subscribe to command topics
      client.subscribe(commands_topic.c_str(), 1);
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      delay(5000);
    }
  }
}

void setup() {
  Serial.begin(115200);
  setup_wifi();
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(callback);
}

void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();
  
  unsigned long now = millis();
  if (now - lastMsg > 30000) {
    lastMsg = now;
    
    // Create telemetry JSON
    DynamicJsonDocument doc(1024);
    doc["timestamp"] = now;
    doc["temperature"] = random(2000, 3000) / 100.0;  // 20.00-30.00
    doc["humidity"] = random(3000, 8000) / 100.0;     // 30.00-80.00
    
    char buffer[256];
    serializeJson(doc, buffer);
    
    // Publish telemetry
    client.publish(telemetry_topic.c_str(), buffer, false);
    Serial.println("Telemetry published");
  }
}

Connection Security Best Practices

  1. Use TLS/SSL: Always connect to secure endpoints (8883/8884) in production
  2. Implement mutual TLS: For high-security requirements, use client certificates
  3. Use strong passwords: Generate secure, unique credentials for each device
  4. Rotate credentials: Implement a credential rotation system for long-lived devices
  5. Minimize topics: Subscribe only to required topics to reduce attack surface
  6. Validate payload size: Set reasonable message size limits (default: 256KB)
  7. Implement backoff: Use exponential backoff for reconnection attempts
  8. Monitor connections: Track device connection patterns for anomalies

Error Handling and Reconnection

Best practices for maintaining reliable MQTT connections:

javascript
// Progressive backoff strategy
let reconnectAttempts = 0;
const maxReconnectAttempts = 10;
const baseReconnectDelay = 1000; // Start with 1 second

function handleReconnect() {
  // Calculate exponential backoff delay
  const delay = Math.min(
    baseReconnectDelay * Math.pow(2, reconnectAttempts),
    30000 // Maximum 30 second delay
  );
  
  console.log(`Reconnecting in ${delay}ms (attempt ${reconnectAttempts + 1})`);
  
  setTimeout(() => {
    reconnectAttempts++;
    
    try {
      client.reconnect();
    } catch (error) {
      console.error('Reconnection failed:', error);
      
      if (reconnectAttempts < maxReconnectAttempts) {
        handleReconnect();
      } else {
        console.error('Maximum reconnection attempts reached');
        // Implement recovery strategy (e.g., device reboot)
      }
    }
  }, delay);
}

// Reset reconnect counter on successful connection
client.on('connect', () => {
  reconnectAttempts = 0;
});

client.on('offline', handleReconnect);
client.on('error', (error) => {
  console.error('Connection error:', error);
  handleReconnect();
});

MQTT Message Format

The De. platform supports multiple message formats for device telemetry:

javascript
// Publishing JSON telemetry
const telemetry = {
  timestamp: Date.now(),
  values: {
    temperature: 23.5,
    humidity: 65.2,
    pressure: 1013.2
  },
  metadata: {
    firmware: "v1.2.0",
    battery: 87
  }
};

client.publish(
  `${workspaceId}/devices/${deviceId}/telemetry/sensors`,
  JSON.stringify(telemetry),
  { qos: 1 }
);

CSV Format (Compact)

For bandwidth-constrained devices, CSV format is supported through the Active Rules engine:

javascript
// Format: HEAD,field1,field2,field3$
client.publish(
  `${workspaceId}/devices/${deviceId}/telemetry/sensors`,
  "SENS,23.5,65.2,1013.2$",
  { qos: 0 }
);

The CSV format requires a matching Active Rule configuration in the platform to properly parse the data.

Next Steps