Skip to content

Telemetry Data Collection & Processing

This guide explains how to collect, process, and manage IoT device telemetry data on the De. platform.

Telemetry Overview

Telemetry in IoT refers to the automatic transmission of data from remote devices to a central system for monitoring, analysis, and control purposes. De.IoTB provides a comprehensive framework for handling telemetry data throughout its lifecycle, from ingestion to storage and analysis.

Data Flow Architecture

Telemetry Data Flow
IoT Device MQTT Broker Topic: .../telemetry/... Channel Manager Subscription Handler Active Rules Data Normalization Event Manager Data Routing Database Record Storage Client Application

The telemetry data flow involves the following steps:

  1. Data Ingestion: Device publishes telemetry data to MQTT topics
  2. Channel Management: MQTT client subscribes to topic patterns and receives messages
  3. Data Normalization: Active Rules engine parses and transforms raw data
  4. Event Routing: Event Manager directs normalized data to appropriate destinations
  5. Data Storage: Normalized data is stored in MongoDB collections
  6. Real-time Delivery: Events are delivered to subscribed clients via Socket.IO

Telemetry Topic Structure

The De. platform uses a consistent topic structure for telemetry data:

{workspace_id}/devices/{device_id}/telemetry/{category}

Common Telemetry Categories

CategoryDescriptionExample Data
locationDevice location data{ lat: 37.7749, lng: -122.4194, accuracy: 5 }
sensorsSensor readings{ temperature: 22.5, humidity: 65.2 }
statusDevice status information{ battery: 87, signal: 72, memory: 45 }
diagnosticsSystem diagnostics{ cpu: 12, memory: 45, storage: 68 }
networkNetwork connectivity{ strength: 82, type: "4G", latency: 120 }

Example Telemetry Topics

ws_abc/devices/dev_123/telemetry/location
ws_abc/devices/dev_123/telemetry/sensors
ws_abc/devices/dev_123/telemetry/status
ws_abc/devices/dev_123/telemetry/diagnostics

Telemetry Data Formats

De.IoTB supports multiple data formats to accommodate different device capabilities and use cases.

json
{
  "timestamp": 1737429743000,
  "value": 23.5,
  "unit": "celsius",
  "accuracy": 0.1,
  "metadata": {
    "sensorId": "temp-1",
    "location": "outdoor"
  }
}

CSV Format with Active Rules

For bandwidth-constrained devices, sending compact CSV format:

TEMP,23.5,0.1,temp-1,outdoor$

This format requires an Active Rule configuration:

javascript
const rule = {
  head: ["TEMP", "TEMPERATURE"],  // Message identifiers
  struct: "HEAD,{value},{accuracy},{sensorId},{location}$",
  schema: {
    value: { type: "float", unit: "celsius" },
    accuracy: { type: "float" },
    sensorId: { type: "string" },
    location: { type: "string" }
  }
}

Binary Data Support

For even more efficient transmission, De.IoTB supports binary data formats:

javascript
// On the device (using Protocol Buffers)
const payload = proto.encode({
  timestamp: Date.now(),
  value: 23.5,
  accuracy: 0.1
})

// Publish binary data
client.publish(`${workspaceId}/devices/${deviceId}/telemetry/sensors`, payload)

To process binary data, you need a corresponding decoder in your Active Rule.

Publishing Telemetry

MQTT Publishing (Device-Side)

javascript
// JavaScript example (Node.js)
const mqtt = require('mqtt')

const client = mqtt.connect('mqtts://broker.dedot.io:8883', {
  clientId: `device_${deviceId}_${Date.now()}`,
  username: credentials.username,
  password: credentials.password
})

// Simple telemetry publishing
function publishTelemetry(category, data) {
  const topic = `${workspaceId}/devices/${deviceId}/telemetry/${category}`
  const payload = JSON.stringify({
    ...data,
    timestamp: Date.now()
  })
  
  client.publish(topic, payload, { qos: 1 }, (err) => {
    if (err) console.error(`Failed to publish to ${topic}:`, err)
  })
}

// Publish sensor data every minute
setInterval(() => {
  publishTelemetry('sensors', {
    temperature: 22.5 + (Math.random() * 2 - 1),  // Random variation
    humidity: 65 + (Math.random() * 5 - 2.5),
    pressure: 1013.25 + (Math.random() * 1 - 0.5)
  })
}, 60000)

// Publish device status every 5 minutes
setInterval(() => {
  publishTelemetry('status', {
    battery: Math.floor(batteryLevel),
    signal: Math.floor(signalStrength),
    uptime: process.uptime(),
    memory: process.memoryUsage().heapUsed / 1024 / 1024
  })
}, 300000)

HTTP API (Alternative)

For devices that can't use MQTT:

javascript
// Using fetch API
async function sendTelemetry(category, data) {
  try {
    const response = await fetch(`https://iot.dedot.io/devices/${deviceId}/telemetry/${category}`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${accessToken}`
      },
      body: JSON.stringify({
        ...data,
        timestamp: Date.now()
      })
    })
    
    if (!response.ok) {
      throw new Error(`HTTP error ${response.status}`)
    }
    
    return await response.json()
  } catch (error) {
    console.error('Failed to send telemetry:', error)
  }
}

Telemetry Processing

De.IoTB processes telemetry data through the Active Rules engine.

Active Rules for Telemetry

The Active Rules engine provides schema-based parsing and normalization of device messages:

javascript
// Server-side rule definition
const temperatureRule = {
  head: ["TEMP", "TEMPERATURE"],
  struct: "HEAD,{value},{accuracy},{sensorId},{location}$",
  schema: {
    value: { 
      type: "float",
      unit: "celsius",
      description: "Temperature reading"
    },
    accuracy: { 
      type: "float",
      description: "Reading accuracy"
    },
    sensorId: { 
      type: "string",
      description: "Sensor identifier"
    },
    location: { 
      type: "string",
      description: "Sensor location"
    }
  },
  connectors: ["ws_abc"],  // Applicable to specific workspace
  enabled: true
}

When a device sends a message like "TEMP,23.5,0.1,temp-1,outdoor$", the Active Rules engine:

  1. Identifies the message format from the "TEMP" header
  2. Applies the matching rule schema
  3. Extracts and converts fields to their specified types
  4. Normalizes the data to a structured JSON object

Type Transformations

The Active Rules engine performs automatic type conversion:

TypeDescriptionExample InputTransformed Output
stringNo transformation"hello""hello"
numberParse as number"42"42
integerParse as integer"42.9"42
floatParse as floating point"42.9"42.9
booleanConvert to boolean"true" or "1"true
arraySplit comma-separated values"a,b,c"["a", "b", "c"]

Data Storage

De.IoTB stores telemetry data in MongoDB collections with optimized indexing.

Records Collection

Telemetry records are stored in the records collection with the following schema:

typescript
interface TelemetryRecord {
  _id: ObjectId            // MongoDB document ID
  nsp: string              // Namespace (workspace)
  chn: string              // Channel ID
  type: 'REPORT'           // Record type (telemetry is REPORT)
  status: 'SUCCESS' | 'FAILED'
  topic: string            // Full MQTT topic
  rule?: string            // Matched rule reference
  qos?: number             // MQTT QoS level
  raw: string              // Original raw message
  normalized: object       // Parsed data object
  timestamp: number        // Message timestamp
}

Query API

Retrieve telemetry data through the De. SDK:

javascript
// Get latest telemetry for a device
const latestTelemetry = await access.request({
  url: `/devices/${deviceId}/telemetry/latest`,
  method: 'GET',
  query: {
    category: 'sensors'  // Optional filter by category
  }
})

console.log('Latest telemetry:', latestTelemetry.data)

// Query historical telemetry
const telemetryHistory = await access.request({
  url: `/devices/${deviceId}/telemetry`,
  method: 'GET',
  query: {
    category: 'sensors',
    startTime: '2026-01-01T00:00:00.000Z',
    endTime: '2026-01-19T23:59:59.999Z',
    limit: 100,
    page: 1,
    sort: 'timestamp:desc'
  }
})

console.log(`Retrieved ${telemetryHistory.data.length} records`)

Aggregation & Statistics

javascript
// Get statistics for device telemetry
const statistics = await access.request({
  url: `/devices/${deviceId}/telemetry/stats`,
  method: 'GET',
  query: {
    category: 'sensors',
    field: 'temperature',
    timeframe: '24h'  // '1h', '24h', '7d', '30d'
  }
})

console.log('Temperature statistics:', statistics.data)
// Output:
// {
//   count: 1440,         // Number of data points
//   min: 19.2,           // Minimum value
//   max: 26.8,           // Maximum value
//   avg: 22.7,           // Average value
//   stdDev: 1.2,         // Standard deviation
//   latest: 23.5,        // Most recent value
//   trend: "+0.2/h"      // Trend (change per hour)
// }

Real-time Data Access

De.IoTB enables real-time access to telemetry data via Socket.IO.

Socket.IO Client Integration

javascript
import io from 'socket.io-client'

const socket = io('wss://iot.dedot.io', {
  auth: {
    token: accessToken,
    workspace: workspaceId
  }
})

// Subscribe to specific device telemetry
socket.emit('subscribe:device', {
  deviceId: 'dev_123',
  categories: ['sensors', 'location']
})

// Listen for telemetry events
socket.on('telemetry', (data) => {
  console.log(`Received ${data.category} data:`, data)
  
  // Update UI with real-time data
  updateDashboard(data)
})

// Subscribe to multiple devices
socket.emit('subscribe:devices', {
  deviceIds: ['dev_123', 'dev_456', 'dev_789'],
  categories: ['location']  // Just track locations
})

// You can also subscribe by group
socket.emit('subscribe:group', {
  groupId: 'delivery-fleet',
  categories: ['location', 'status']
})

// Unsubscribe when no longer needed
socket.emit('unsubscribe:device', { deviceId: 'dev_123' })

Rate Limiting

De.IoTB implements rate limiting for real-time data subscriptions:

javascript
// Configure rate limits when provisioning a device
const deviceConfig = await access.request({
  url: `/devices/${deviceId}/config`,
  method: 'PATCH',
  body: {
    telemetry: {
      throttling: {
        sensors: 60,     // One message per minute
        location: 30,    // One message per 30 seconds
        status: 300      // One message per 5 minutes
      },
      aggregation: {
        enabled: true,   // Enable server-side aggregation
        interval: 60     // Aggregate into 60-second buckets
      }
    }
  }
})

Data Retention & Time-Series Storage

De.IoTB provides configurable data retention policies:

javascript
// Configure workspace-level data retention
const retention = await access.request({
  url: '/workspace/config/retention',
  method: 'PATCH',
  body: {
    telemetry: {
      default: '30d',    // Keep all telemetry for 30 days
      location: '90d',   // Keep location data for 90 days
      sensors: {
        raw: '7d',       // Keep raw data for 7 days
        hourly: '180d',  // Keep hourly aggregates for 180 days
        daily: '3y'      // Keep daily aggregates for 3 years
      }
    }
  }
})

Advanced Features

Time-Series Analysis

javascript
// Perform time-series analysis on telemetry data
const analysis = await access.request({
  url: `/devices/${deviceId}/telemetry/analysis`,
  method: 'POST',
  body: {
    category: 'sensors',
    field: 'temperature',
    timeframe: '7d',
    operations: [
      { type: 'movingAverage', window: 5 },
      { type: 'outlierDetection', sensitivity: 2 },
      { type: 'forecast', horizon: 24 }
    ]
  }
})

console.log('Analysis results:', analysis.data)

Data Export

javascript
// Export telemetry data
const exportUrl = await access.request({
  url: '/telemetry/export',
  method: 'POST',
  body: {
    deviceIds: ['dev_123', 'dev_456'],
    categories: ['sensors', 'location'],
    timeframe: {
      start: '2026-01-01T00:00:00.000Z',
      end: '2026-01-31T23:59:59.999Z'
    },
    format: 'csv'  // 'csv', 'json', or 'excel'
  }
})

console.log('Export ready at:', exportUrl.data.url)

Batch Imports

javascript
// Import historical telemetry data
const importJob = await access.request({
  url: `/devices/${deviceId}/telemetry/import`,
  method: 'POST',
  body: {
    category: 'sensors',
    format: 'csv',
    mapping: {
      timestamp: 'column_a',
      temperature: 'column_b',
      humidity: 'column_c'
    },
    fileUrl: 'https://storage.example.com/historical-data.csv'
  }
})

console.log('Import job started:', importJob.data.jobId)

Best Practices

  1. Optimize Message Size: Keep telemetry messages compact to reduce bandwidth
  2. Use Appropriate QoS: Select QoS levels based on data criticality
  3. Include Timestamps: Always include device-side timestamps in telemetry
  4. Batch Small Readings: Combine frequent small readings for efficiency
  5. Use Categories: Organize telemetry into logical categories
  6. Normalize Units: Use consistent units across your device fleet
  7. Set Retention Policies: Configure data retention based on business needs
  8. Use Data Aggregation: Enable server-side aggregation for high-frequency data

Next Steps