Active Rules Engine
The Active Rules Engine is a core component of De.IoTB that provides schema-driven message parsing, data transformation, and event-driven automation.
Overview
The Active Rules engine serves several critical functions:
- Message Parsing: Converts raw device messages into structured data
- Data Normalization: Transforms values into standardized formats
- Event Triggering: Initiates automated workflows based on rule conditions
- Data Routing: Directs processed data to appropriate destinations
Rule Architecture
The Active Rules engine sits in the message pipeline between the Channel Manager and the Event Manager:
┌─────────────────────────────────────────────────┐
│                   IoT Device                    │
└───────────────────────┬─────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│                   MQTT Broker                   │
└───────────────────────┬─────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│                 Channel Manager                 │
└───────────────────────┬─────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│ ┌─────────────────────────────────────────────┐ │
│ │             Active Rules Engine             │ │
│ │                                             │ │
│ │  ┌─────────────┐       ┌─────────────────┐  │ │
│ │  │ Rule Lookup │──────▶│Schema Processor │  │ │
│ │  └─────────────┘       └─────────────────┘  │ │
│ │         │                       │           │ │
│ │         ▼                       ▼           │ │
│ │  ┌─────────────┐       ┌─────────────────┐  │ │
│ │  │  LRU Cache  │◀─────▶│ Type Conversion │  │ │
│ │  └─────────────┘       └─────────────────┘  │ │
│ └─────────────────────────────────────────────┘ │
└───────────────────────┬─────────────────────────┘
                        │
                        ▼
┌─────────────────────────────────────────────────┐
│                  Event Manager                  │
└─────────────────────────────────────────────────┘
Rule Structure
Rules are defined using the following schema:
interface Rule {
id: string // System-generated unique identifier
reference: string // User-provided reference code
icode: string // Internal workspace code
type: 'REPORT' | 'COMMAND' // Message direction
head: string[] // Message identifiers (e.g., ["POS", "POSITION"])
struct: string // Template: "HEAD,{field1},{field2}$"
schema: {
[field: string]: {
type: 'string' | 'number' | 'integer' | 'float' | 'boolean' | 'array'
unit?: string
description?: string
}
}
connectors: string[] // Associated workspace IDs
tags: string[] // Classification tags
description: string // Human-readable description
tested: boolean // Validation status
enabled: boolean // Rule activation status
added: {
at: number // Timestamp
by: string // User identifier
}
}
Rule Examples
Example 1: GPS Position Rule
{
"reference": "position.report",
"icode": "DEV-9X00-233",
"type": "REPORT",
"head": ["POS", "POSITION"],
"struct": "HEAD,{latitude},{longitude},{speed},{heading}$",
"schema": {
"latitude": {
"type": "float",
"unit": "degrees",
"description": "Latitude in decimal degrees"
},
"longitude": {
"type": "float",
"unit": "degrees",
"description": "Longitude in decimal degrees"
},
"speed": {
"type": "number",
"unit": "km/h",
"description": "Speed in kilometers per hour"
},
"heading": {
"type": "number",
"unit": "degrees",
"description": "Heading in degrees (0-359)"
}
},
"connectors": ["ws_abc"],
"tags": ["gps", "location", "tracking"],
"description": "Parse GPS position data from tracking devices",
"tested": true,
"enabled": true,
"added": {
"at": 1698765432000,
"by": "[email protected]"
}
}
Example 2: Sensor Reading Rule
{
"reference": "sensor.environmental",
"icode": "DEV-9X00-233",
"type": "REPORT",
"head": ["ENV", "ENVIRONMENTAL"],
"struct": "HEAD,{temperature},{humidity},{pressure},{light},{battery}$",
"schema": {
"temperature": {
"type": "float",
"unit": "celsius",
"description": "Temperature in degrees Celsius"
},
"humidity": {
"type": "float",
"unit": "%",
"description": "Relative humidity percentage"
},
"pressure": {
"type": "float",
"unit": "hPa",
"description": "Atmospheric pressure in hectopascals"
},
"light": {
"type": "integer",
"unit": "lux",
"description": "Ambient light level in lux"
},
"battery": {
"type": "integer",
"unit": "%",
"description": "Battery level percentage"
}
},
"connectors": ["ws_abc"],
"tags": ["sensors", "environmental"],
"description": "Parse environmental sensor data",
"tested": true,
"enabled": true,
"added": {
"at": 1698765432000,
"by": "[email protected]"
}
}
Example 3: Device Command Rule
{
"reference": "command.configuration",
"icode": "DEV-9X00-233",
"type": "COMMAND",
"head": ["CFG", "CONFIG"],
"struct": "HEAD,{interval},{mode},{led},{verbose}$",
"schema": {
"interval": {
"type": "integer",
"unit": "seconds",
"description": "Reporting interval in seconds"
},
"mode": {
"type": "string",
"description": "Power mode: normal, eco, or power"
},
"led": {
"type": "boolean",
"description": "LED indicator enabled"
},
"verbose": {
"type": "boolean",
"description": "Verbose logging enabled"
}
},
"connectors": ["ws_abc"],
"tags": ["commands", "configuration"],
"description": "Configuration command formatting",
"tested": true,
"enabled": true,
"added": {
"at": 1698765432000,
"by": "[email protected]"
}
}
Processing Algorithm
The Active Rules engine processes messages in the following steps:
- Message Reception: Raw message received from device via Channel Manager
- Header Extraction: First element identified as the message header
- Rule Lookup: Header matched against available rules in LRU cache
- Cache Management: If rule not in cache, loaded from database and cached
- Schema Application: Rule schema applied to message structure
- Field Extraction: Individual fields extracted based on structure pattern
- Type Conversion: Values converted according to schema type definitions
- Object Construction: Normalized object constructed from converted values
- Event Publishing: Processed data published to Event Manager
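The lookup and extraction steps above can be sketched as follows. This is a minimal illustration, not the engine's actual code: `ParsingRule`, `LruCache`, `fieldNames`, `extractFields`, and the `loadRule` callback (standing in for the database) are hypothetical names, and the comma delimiter and trailing `$` terminator are assumptions inferred from the `struct` templates shown earlier. Type conversion and event publishing are left out.

```typescript
// Hypothetical, trimmed-down rule shape: only what header matching needs.
interface ParsingRule {
  id: string
  head: string[]   // accepted message headers
  struct: string   // e.g. "HEAD,{latitude},{longitude}$"
}

// Minimal LRU cache built on Map, which iterates in insertion order.
class LruCache<V> {
  private entries = new Map<string, V>()
  private maxSize: number

  constructor(maxSize: number) {
    this.maxSize = maxSize
  }

  get(key: string): V | undefined {
    const value = this.entries.get(key)
    if (value === undefined) return undefined
    // Re-insert so the key becomes the most recently used entry.
    this.entries.delete(key)
    this.entries.set(key, value)
    return value
  }

  set(key: string, value: V): void {
    this.entries.delete(key)
    this.entries.set(key, value)
    if (this.entries.size > this.maxSize) {
      // The first key in iteration order is the least recently used.
      const oldest = this.entries.keys().next().value as string
      this.entries.delete(oldest)
    }
  }
}

// Field names in the order they appear in the struct template.
function fieldNames(struct: string): string[] {
  const names: string[] = []
  const pattern = /\{(\w+)\}/g
  let match: RegExpExecArray | null
  while ((match = pattern.exec(struct)) !== null) names.push(match[1])
  return names
}

// Header extraction, rule lookup (cache first, then loadRule), and raw
// field extraction. Returns null when the message cannot be matched.
function extractFields(
  message: string,
  cache: LruCache<ParsingRule>,
  loadRule: (header: string) => ParsingRule | undefined
): { ruleId: string; fields: Record<string, string> } | null {
  if (!message.endsWith('$')) return null           // assumed terminator
  const parts = message.slice(0, -1).split(',')     // assumed delimiter
  const header = parts[0]

  let rule = cache.get(header)
  if (!rule) {
    rule = loadRule(header)
    if (!rule) return null                          // no rule for this header
    cache.set(header, rule)
  }

  const names = fieldNames(rule.struct)
  const values = parts.slice(1)
  if (values.length !== names.length) return null   // field count mismatch

  const fields: Record<string, string> = {}
  names.forEach((name, i) => { fields[name] = values[i] })
  return { ruleId: rule.id, fields }
}
```

In this sketch the cache is keyed by header, so a rule with several `head` entries ends up cached once per header it has actually been seen with. The extracted values are still strings at this point; applying the schema types is a separate step.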
Type Conversion Details
The Active Rules engine performs the following type conversions:
| Schema Type | Input Example | Output | Notes |
|---|---|---|---|
| string | "hello" | "hello" | No transformation |
| number | "42.5" | 42.5 | Parsed with Number() |
| integer | "42.9" | 42 | Truncated to integer |
| float | "42.9" | 42.9 | Maintained as floating-point |
| boolean | "true" or "1" | true | "false", "0", and empty strings become false |
| array | "a,b,c" | ["a", "b", "c"] | Comma-separated string split into array |
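The conversion table can be read as a small function. The sketch below is one possible reading, not the engine's implementation: `convertValue` and `applySchema` are hypothetical names, the use of `parseFloat` for `float` is an assumption, and so is treating any boolean input other than "true" or "1" as false (the table only lists "false", "0", and the empty string).

```typescript
type SchemaType = 'string' | 'number' | 'integer' | 'float' | 'boolean' | 'array'
type Converted = string | number | boolean | string[]

// Convert one raw string value according to its schema type.
function convertValue(raw: string, type: SchemaType): Converted {
  switch (type) {
    case 'string':
      return raw                              // no transformation
    case 'number':
      return Number(raw)                      // "42.5" becomes 42.5
    case 'integer':
      return Math.trunc(Number(raw))          // truncates toward zero
    case 'float':
      return parseFloat(raw)                  // assumption: parseFloat
    case 'boolean':
      return raw === 'true' || raw === '1'    // assumption: all else is false
    case 'array':
      return raw === '' ? [] : raw.split(',') // comma-separated values
  }
}

// Apply a rule schema to a record of raw field strings.
// Fields missing from the input are skipped rather than defaulted.
function applySchema(
  fields: Record<string, string>,
  schema: Record<string, { type: SchemaType }>
): Record<string, Converted> {
  const result: Record<string, Converted> = {}
  for (const name of Object.keys(schema)) {
    if (Object.prototype.hasOwnProperty.call(fields, name)) {
      result[name] = convertValue(fields[name], schema[name].type)
    }
  }
  return result
}
```

Note that `Number("abc")` yields `NaN`, so this sketch does not detect invalid numeric input. A production engine would presumably reject such values and report a conversion error instead of passing `NaN` downstream.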
Creating and Managing Rules
Create a New Rule
import De from '@de./sdk'
const access = new De.Access({
workspace: 'your-workspace-id',
accessToken: 'your-access-token',
env: 'prod'
})
// Create a new rule
const rule = await access.request({
url: '/rules',
method: 'POST',
body: {
reference: 'fuel.level',
type: 'REPORT',
head: ['FUEL', 'FL'],
struct: 'HEAD,{level},{range},{efficiency}$',
schema: {
level: {
type: 'float',
unit: '%',
description: 'Fuel level percentage'
},
range: {
type: 'integer',
unit: 'km',
description: 'Estimated remaining range'
},
efficiency: {
type: 'float',
unit: 'km/L',
description: 'Current fuel efficiency'
}
},
tags: ['vehicle', 'fuel'],
description: 'Vehicle fuel level reporting'
}
})
console.log('Rule created:', rule.data)
List Active Rules
// Get all rules
const rules = await access.request({
url: '/rules',
method: 'GET',
query: {
type: 'REPORT', // Filter by type
tags: 'vehicle,tracking', // Filter by tags (comma-separated)
enabled: true, // Only enabled rules
page: 1,
limit: 50
}
})
console.log(`Found ${rules.data.length} rules`)
Update an Existing Rule
// Update a rule
const updatedRule = await access.request({
url: `/rules/${ruleId}`,
method: 'PATCH',
body: {
head: ['FUEL', 'FL', 'FUELLEVEL'], // Add alternative header
schema: {
level: {
type: 'float',
unit: '%',
description: 'Updated description'
},
// Add new field
timestamp: {
type: 'integer',
description: 'Timestamp in milliseconds'
}
},
enabled: true // Enable/disable rule
}
})
console.log('Rule updated:', updatedRule.data)
Test a Rule
// Test rule against sample data
const testResult = await access.request({
url: '/rules/test',
method: 'POST',
body: {
ruleId: 'rule_123', // Optional: reference an existing rule by ID
// Alternatively, provide a rule definition inline
rule: {
head: ['TEMP'],
struct: 'HEAD,{value},{unit}$',
schema: {
value: { type: 'float' },
unit: { type: 'string' }
}
},
// Sample messages to test
samples: [
'TEMP,23.5,C$',
'TEMP,72.1,F$',
'TEMPERATURE,18.2,C$' // Header not listed in head, so no match is expected
]
}
})
console.log('Test results:', testResult.data)
// Output:
// {
// success: 2,
// failed: 1,
// results: [
// {
// input: 'TEMP,23.5,C$',
// success: true,
// output: { value: 23.5, unit: 'C' },
// matchedRule: 'rule_123'
// },
// {
// input: 'TEMP,72.1,F$',
// success: true,
// output: { value: 72.1, unit: 'F' },
// matchedRule: 'rule_123'
// },
// {
// input: 'TEMPERATURE,18.2,C$',
// success: false,
// error: 'Header not matched',
// matchedRule: null
// }
// ]
// }
Delete a Rule
// Delete a rule
await access.request({
url: `/rules/${ruleId}`,
method: 'DELETE'
})
Rule Templates
De.IoTB provides pre-built rule templates for common device types:
// Get available rule templates
const templates = await access.request({
url: '/rules/templates',
method: 'GET'
})
console.log('Available templates:', templates.data)
// Create rule from template
const newRule = await access.request({
url: '/rules/from-template',
method: 'POST',
body: {
templateId: 'gps-tracker',
reference: 'custom.gps',
customizations: {
// Override specific fields
schema: {
altitude: {
type: 'float',
unit: 'meters',
description: 'Altitude above sea level'
}
}
}
}
})
Advanced Features
Batch Operations
// Bulk create/update rules
const batchResult = await access.request({
url: '/rules/batch',
method: 'POST',
body: {
rules: [
{
reference: 'rule1',
head: ['HEAD1'],
struct: 'HEAD,{field1},{field2}$',
schema: { /* ... */ }
},
{
reference: 'rule2',
head: ['HEAD2'],
struct: 'HEAD,{fieldA},{fieldB}$',
schema: { /* ... */ }
}
],
operation: 'create' // 'create', 'update', or 'upsert'
}
})
console.log(`Processed ${batchResult.data.successful} rules successfully`)
Import/Export
// Export rules
const exportedRules = await access.request({
url: '/rules/export',
method: 'GET',
query: {
format: 'json', // 'json' or 'csv'
tags: 'vehicle' // Optional filter
}
})
// Download export
console.log('Export URL:', exportedRules.data.url)
// Import rules from file
const importResult = await access.request({
url: '/rules/import',
method: 'POST',
body: {
fileUrl: 'https://example.com/rules.json',
operation: 'upsert' // 'create', 'update', or 'upsert'
}
})
console.log(`Imported ${importResult.data.count} rules`)
Version Control
// Get rule history
const ruleHistory = await access.request({
url: `/rules/${ruleId}/history`,
method: 'GET'
})
console.log('Rule versions:', ruleHistory.data)
// Restore previous version
await access.request({
url: `/rules/${ruleId}/restore`,
method: 'POST',
body: {
versionId: 'v2' // Version to restore
}
})
Performance Optimization
The Active Rules engine uses several optimization techniques:
LRU Cache
The engine implements an LRU (Least Recently Used) cache for frequently accessed rules:
// Configure cache settings
await access.request({
url: '/workspace/config/rules-cache',
method: 'PATCH',
body: {
maxSize: 100, // Maximum rules in cache
ttl: 3600, // Time-to-live in seconds
cleanupInterval: 300 // Cache cleanup interval
}
})
// Get cache statistics
const cacheStats = await access.request({
url: '/rules/cache/stats',
method: 'GET'
})
console.log('Cache stats:', cacheStats.data)
// Output:
// {
// size: 47, // Current cache size
// maxSize: 100, // Maximum cache size
// hits: 12543, // Cache hit count
// misses: 378, // Cache miss count
// hitRate: 0.97, // Cache hit rate (0-1)
// avgAccessTime: 0.8 // Average access time (ms)
// }
// Clear cache (rarely needed)
await access.request({
url: '/rules/cache',
method: 'DELETE'
})
Rule Compilation
The engine pre-compiles rules for faster execution:
// Compile all rules
await access.request({
url: '/rules/compile',
method: 'POST'
})
Integration with Event System
Rules can trigger events when matched:
// Create event binding for rule
await access.request({
url: '/rules/events',
method: 'POST',
body: {
ruleId: 'rule_123',
events: [
{
condition: {
field: 'temperature',
operator: '>',
value: 30
},
actions: [
{
type: 'notification',
config: {
title: 'High Temperature Alert',
message: 'Temperature exceeded threshold',
severity: 'warning'
}
},
{
type: 'webhook',
config: {
url: 'https://example.com/webhook',
method: 'POST',
headers: {
'Content-Type': 'application/json'
}
}
}
]
}
]
}
})
Monitoring
Monitor rule performance and errors:
// Get rule usage statistics
const ruleStats = await access.request({
url: `/rules/${ruleId}/stats`,
method: 'GET',
query: {
period: '24h' // '1h', '24h', '7d', '30d'
}
})
console.log('Rule statistics:', ruleStats.data)
// Output:
// {
// matchCount: 5432, // Number of successful matches
// errorCount: 43, // Number of processing errors
// averageProcessingTime: 1.2, // Average processing time (ms)
// matchRate: 0.98, // Success rate (0-1)
// mostFrequentErrors: [
// { type: 'type_conversion', field: 'temperature', count: 23 },
// { type: 'missing_field', field: 'humidity', count: 12 }
// ]
// }
// Get overall system statistics
const systemStats = await access.request({
url: '/rules/stats',
method: 'GET'
})
console.log('System statistics:', systemStats.data)
Security Considerations
Rule Validation
The Active Rules engine performs strict validation to prevent security issues:
- Input Validation: All rule definitions are validated against a strict schema
- Schema Enforcement: Only allowed types and properties are permitted
- Access Control: Rules are scoped to specific workspaces
- Performance Limits: Maximum complexity limits prevent DoS attacks
- Execution Timeouts: Rule processing has time limits
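To make the validation points above more concrete, here is a rough client-side sketch of the kind of checks a rule definition might go through before it is submitted. It is illustrative only: `RuleDraft`, `validateRule`, and the `MAX_FIELDS` limit are hypothetical, the requirement that every `struct` placeholder has a schema entry is an assumption, and the server-side validation may differ. Only the list of allowed types is taken from the rule schema documented earlier.

```typescript
// Types permitted by the documented rule schema.
const ALLOWED_TYPES = ['string', 'number', 'integer', 'float', 'boolean', 'array']

// Hypothetical complexity limit; the real limit is not documented here.
const MAX_FIELDS = 64

// Hypothetical subset of a rule definition that these checks look at.
interface RuleDraft {
  head: string[]
  struct: string
  schema: Record<string, { type: string; unit?: string; description?: string }>
}

// Returns a list of human-readable problems; an empty list means the
// draft passed every check in this sketch.
function validateRule(rule: RuleDraft): string[] {
  const errors: string[] = []

  if (rule.head.length === 0) {
    errors.push('head must list at least one message header')
  }

  // Collect the {placeholder} names used by the struct template.
  const placeholders: string[] = []
  const pattern = /\{(\w+)\}/g
  let match: RegExpExecArray | null
  while ((match = pattern.exec(rule.struct)) !== null) placeholders.push(match[1])

  if (placeholders.length > MAX_FIELDS) {
    errors.push(`struct has ${placeholders.length} fields, limit is ${MAX_FIELDS}`)
  }

  // Schema enforcement: only the allowed types are accepted.
  for (const name of Object.keys(rule.schema)) {
    const type = rule.schema[name].type
    if (ALLOWED_TYPES.indexOf(type) === -1) {
      errors.push(`field "${name}" has unsupported type "${type}"`)
    }
  }

  // Assumption: every placeholder needs a schema entry to be converted.
  for (const name of placeholders) {
    if (!Object.prototype.hasOwnProperty.call(rule.schema, name)) {
      errors.push(`struct field "${name}" has no schema entry`)
    }
  }

  return errors
}
```

Running a check like this before calling the create or update endpoints gives faster feedback than a rejected request, but it does not replace the engine's own validation.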
Best Practices
- Rule Testing: Always test rules with sample data before deployment
- Field Validation: Define clear field types and validation rules
- Error Handling: Configure appropriate error handling for rule failures
- Monitoring: Set up alerts for rule processing errors
- Documentation: Document rule purposes and expected message formats
- Version Control: Track rule changes and enable rollbacks if needed
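As an illustration of the rule-testing practice, the sketch below inspects a test report and decides whether a rule looks safe to enable. The report shape (`success`, `failed`, `results` with `input`, `success`, `output`, `error`, `matchedRule`) mirrors the example output in the "Test a Rule" section; the helper names `failedSamples` and `readyToEnable` and the all-samples-must-pass policy are hypothetical.

```typescript
// Shape taken from the example output of the rule test endpoint.
interface SampleResult {
  input: string
  success: boolean
  output?: Record<string, unknown>
  error?: string
  matchedRule: string | null
}

interface RuleTestReport {
  success: number
  failed: number
  results: SampleResult[]
}

// Describe each failing sample as "input: error".
function failedSamples(report: RuleTestReport): string[] {
  return report.results
    .filter(result => !result.success)
    .map(result => `${result.input}: ${result.error || 'unknown error'}`)
}

// Hypothetical policy: enable only if at least one sample was tested
// and none of the samples failed.
function readyToEnable(report: RuleTestReport): boolean {
  return report.results.length > 0 && report.failed === 0
}
```

With the example report from "Test a Rule", `readyToEnable` returns `false` because the `TEMPERATURE` sample fails, and `failedSamples` lists that one input with its "Header not matched" error.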

