Skip to content

Integration Guide

Step-by-step guide to building and deploying your first De. Pipeline for logistics automation.

Prerequisites

Before integrating De. Pipelines, ensure you have:

  • De. Workspace - Active workspace with pipeline service enabled
  • Authentication - Valid API credentials (see Authentication Guide)
  • SDK Installed - De. SDK for your platform (Web or React Native)
  • Context Understanding - Familiar with pipeline context (wid, xcode, type)

Quick Start (15 minutes)

Let's build a complete order fulfillment pipeline from scratch.

Step 1: Define Pipeline Template

Create a template that automates order fulfillment:

typescript
import { DeClient } from '@dedot/sdk'

const client = new DeClient({
  apiKey: 'your-api-key',
  workspace: 'your-workspace-id'
})

// Define the pipeline template
const template = {
  name: "Order Fulfillment Pipeline",
  description: "Automate order fulfillment from validation to carrier assignment",
  version: "1.0.0",
  
  stages: [
    {
      name: "Validate Inventory",
      description: "Check if products are in stock",
      action: "QUERY",
      query: {
        resource: "INVENTORY",
        filters: {
          sku: "${execution.metadata.sku}",
          quantity: { $gte: "${execution.metadata.quantity}" }
        },
        strategy: "BEST_MATCH"
      },
      transitions: {
        onSuccess: {
          type: "CONTINUE",
          next: "Select Warehouse",
          condition: {
            field: "result.available",
            operator: "EQUALS",
            value: true
          }
        },
        onFailure: {
          type: "FAIL",
          next: "Handle Out of Stock"
        }
      },
      timeout: 30000,
      retries: 2
    },
    
    {
      name: "Select Warehouse",
      description: "Find closest warehouse with capacity",
      action: "QUERY",
      query: {
        resource: "WAREHOUSE",
        filters: {
          status: "OPERATIONAL",
          "inventory.sku": "${execution.metadata.sku}",
          "capacity.available": { $gt: 0 }
        },
        strategy: "CLOSEST",
        fallbackOptions: [
          {
            strategy: "FASTEST",
            filters: { status: "OPERATIONAL" }
          }
        ]
      },
      transitions: {
        onSuccess: {
          type: "CONTINUE",
          next: "Assign Carrier"
        },
        onFailure: {
          type: "FAIL",
          next: "Manual Review"
        }
      },
      timeout: 30000
    },
    
    {
      name: "Assign Carrier",
      description: "Select optimal carrier for delivery",
      action: "QUERY",
      query: {
        resource: "CARRIER",
        filters: {
          status: "ACTIVE",
          serviceArea: "${execution.results.Select Warehouse.location.zone}",
          vehicleType: ["VAN", "TRUCK"]
        },
        strategy: "CHEAPEST"
      },
      transitions: {
        onSuccess: {
          type: "CONTINUE",
          next: "Notify Systems"
        },
        onFailure: {
          type: "SKIP",
          next: "Select Warehouse",  // Try different warehouse
          retries: 1
        }
      },
      timeout: 30000
    },
    
    {
      name: "Notify Systems",
      description: "Send notifications to external systems",
      action: "WEBHOOK",
      webhook: {
        url: "https://api.yourcompany.com/orders/fulfill",
        method: "POST",
        headers: {
          "Authorization": "Bearer ${config.apiToken}",
          "Content-Type": "application/json"
        },
        body: {
          orderId: "${execution.metadata.orderId}",
          warehouseId: "${execution.results.Select Warehouse.id}",
          carrierId: "${execution.results.Assign Carrier.id}",
          estimatedDelivery: "${execution.results.Assign Carrier.estimatedDelivery}"
        },
        timeout: 5000,
        retries: 3
      },
      transitions: {
        onSuccess: {
          type: "COMPLETE"
        },
        onFailure: {
          type: "RETRY",
          retries: 2,
          retryDelay: 5000
        }
      }
    }
  ],
  
  config: {
    timeout: 300000,  // 5 minutes max
    concurrency: 10,   // Max 10 parallel executions
    retryPolicy: {
      maxAttempts: 3,
      backoffMultiplier: 2,
      initialDelay: 1000
    }
  },
  
  health: {
    simulationEnabled: true,
    simulationInterval: 600,  // 10 minutes
    alertThresholds: {
      healthScore: 70,
      errorRate: 0.1,
      avgDuration: 120000  // 2 minutes
    }
  }
}

// Create the template
const createdTemplate = await client.pipelines.templates.create(template)
console.log('Template created:', createdTemplate.id)

Step 2: Validate & Deploy Pipeline

Validate the template and deploy it as an active pipeline:

typescript
// Validate template
const validation = await client.pipelines.templates.validate(
  createdTemplate.id
)

if (!validation.valid) {
  console.error('Validation errors:', validation.errors)
  throw new Error('Template validation failed')
}

console.log('Validation passed:', validation.checks)

// Deploy pipeline from template
const pipeline = await client.pipelines.deploy({
  templateId: createdTemplate.id,
  name: "Production Order Fulfillment",
  description: "Active fulfillment pipeline for all orders",
  
  // Override template config if needed
  config: {
    concurrency: 20  // Higher concurrency for production
  }
})

console.log('Pipeline deployed:', pipeline.id)
console.log('Status:', pipeline.status.current)  // 'ACTIVE'

Step 3: Execute Pipeline

Trigger pipeline execution with order data:

typescript
// Execute pipeline for a specific order
const execution = await client.pipelines.execute(pipeline.id, {
  // Execution metadata - available as ${execution.metadata.*}
  metadata: {
    orderId: "ORD-12345",
    sku: "WIDGET-001",
    quantity: 5,
    customerId: "CUST-789",
    deliveryAddress: {
      lat: 40.7128,
      lng: -74.0060,
      city: "New York",
      zip: "10001"
    }
  },
  
  // Optional: Override pipeline config for this execution
  config: {
    timeout: 180000  // 3 minutes for this specific order
  }
})

console.log('Execution started:', execution.id)
console.log('Current stage:', execution.stages[execution.currentStageIndex].name)

Step 4: Monitor Execution

Track execution progress in real-time:

typescript
// Poll execution status
async function waitForCompletion(executionId: string) {
  let status = 'PENDING'
  
  while (status === 'PENDING' || status === 'RUNNING') {
    const execution = await client.pipelines.executions.get(executionId)
    status = execution.status
    
    console.log(`Status: ${status}`)
    console.log(`Current stage: ${execution.stages[execution.currentStageIndex].name}`)
    
    if (status === 'RUNNING') {
      const currentStage = execution.stages[execution.currentStageIndex]
      console.log(`  Stage status: ${currentStage.status}`)
      if (currentStage.result) {
        console.log(`  Stage result:`, currentStage.result)
      }
    }
    
    // Wait before next check
    await new Promise(resolve => setTimeout(resolve, 2000))
  }
  
  return await client.pipelines.executions.get(executionId)
}

const completed = await waitForCompletion(execution.id)

if (completed.status === 'SUCCESS') {
  console.log('✅ Order fulfilled successfully!')
  console.log('Results:', completed.results)
  console.log('Duration:', completed.duration, 'ms')
} else {
  console.error('❌ Execution failed:', completed.error)
}

Step 5: View Pipeline Health

Check pipeline health and performance:

typescript
// Get pipeline with health metrics
const pipelineStatus = await client.pipelines.get(pipeline.id)

console.log('Pipeline Health:', {
  status: pipelineStatus.status.current,
  healthScore: pipelineStatus.healthScore,
  lastHealthCheck: pipelineStatus.lastHealthCheck,
  metrics: {
    totalExecutions: pipelineStatus.metrics.totalExecutions,
    successRate: pipelineStatus.metrics.successRate,
    avgDuration: pipelineStatus.metrics.avgDuration,
    errorRate: pipelineStatus.metrics.errorRate
  }
})

// View recent simulations
if (pipelineStatus.lastSimulation) {
  console.log('Last Simulation:', {
    success: pipelineStatus.lastSimulation.success,
    duration: pipelineStatus.lastSimulation.duration,
    timestamp: pipelineStatus.lastSimulation.completedAt
  })
}

// Check for alerts
if (pipelineStatus.alerts?.length > 0) {
  console.warn('⚠️ Active Alerts:')
  pipelineStatus.alerts.forEach(alert => {
    console.warn(`  - ${alert.severity}: ${alert.message}`)
  })
}

Complete Integration Example

Here's a production-ready integration with error handling:

typescript
import { DeClient } from '@dedot/sdk'

class OrderFulfillmentService {
  private client: DeClient
  private pipelineId: string
  
  constructor(apiKey: string, workspaceId: string) {
    this.client = new DeClient({ apiKey, workspace: workspaceId })
  }
  
  async initialize() {
    // Check if pipeline already exists
    const pipelines = await this.client.pipelines.list({
      filters: { name: "Order Fulfillment Pipeline" }
    })
    
    if (pipelines.length > 0) {
      this.pipelineId = pipelines[0].id
      console.log('Using existing pipeline:', this.pipelineId)
      return
    }
    
    // Create and deploy new pipeline
    const template = await this.createTemplate()
    const validation = await this.client.pipelines.templates.validate(template.id)
    
    if (!validation.valid) {
      throw new Error(`Template validation failed: ${validation.errors.join(', ')}`)
    }
    
    const pipeline = await this.client.pipelines.deploy({
      templateId: template.id,
      name: "Order Fulfillment Pipeline"
    })
    
    this.pipelineId = pipeline.id
    console.log('Pipeline deployed:', this.pipelineId)
  }
  
  async fulfillOrder(order: Order): Promise<FulfillmentResult> {
    try {
      // Execute pipeline
      const execution = await this.client.pipelines.execute(this.pipelineId, {
        metadata: {
          orderId: order.id,
          sku: order.items[0].sku,
          quantity: order.items[0].quantity,
          deliveryAddress: order.deliveryAddress
        }
      })
      
      // Wait for completion (with timeout)
      const result = await this.waitForCompletion(execution.id, 300000)
      
      if (result.status === 'SUCCESS') {
        return {
          success: true,
          warehouseId: result.results['Select Warehouse']?.id,
          carrierId: result.results['Assign Carrier']?.id,
          estimatedDelivery: result.results['Assign Carrier']?.estimatedDelivery,
          duration: result.duration
        }
      } else {
        return {
          success: false,
          error: result.error?.message || 'Unknown error',
          failedStage: result.stages[result.currentStageIndex]?.name
        }
      }
    } catch (error) {
      console.error('Order fulfillment error:', error)
      return {
        success: false,
        error: error.message
      }
    }
  }
  
  async getPipelineHealth(): Promise<HealthReport> {
    const pipeline = await this.client.pipelines.get(this.pipelineId)
    
    return {
      status: pipeline.status.current,
      healthScore: pipeline.healthScore,
      metrics: pipeline.metrics,
      alerts: pipeline.alerts || []
    }
  }
  
  private async waitForCompletion(executionId: string, timeout: number) {
    const startTime = Date.now()
    
    while (Date.now() - startTime < timeout) {
      const execution = await this.client.pipelines.executions.get(executionId)
      
      if (execution.status === 'SUCCESS' || execution.status === 'FAILED') {
        return execution
      }
      
      await new Promise(resolve => setTimeout(resolve, 2000))
    }
    
    throw new Error('Execution timeout')
  }
  
  private async createTemplate() {
    // Template creation logic from Step 1
    // ...
  }
}

// Usage
const service = new OrderFulfillmentService(
  process.env.DE_API_KEY,
  process.env.DE_WORKSPACE_ID
)

await service.initialize()

// Fulfill orders
const order = await getOrder('ORD-12345')
const result = await service.fulfillOrder(order)

if (result.success) {
  console.log('✅ Order fulfilled:', result)
} else {
  console.error('❌ Fulfillment failed:', result.error)
}

// Monitor health
const health = await service.getPipelineHealth()
console.log('Pipeline health:', health)

Advanced Features

Dynamic Query Filters

Use execution metadata in query filters:

typescript
{
  name: "Select Warehouse",
  action: "QUERY",
  query: {
    resource: "WAREHOUSE",
    filters: {
      // Access execution metadata
      region: "${execution.metadata.deliveryAddress.region}",
      
      // Access previous stage results
      "inventory.sku": "${execution.results.Validate Inventory.sku}",
      
      // Complex expressions
      "capacity.available": {
        $gte: "${execution.metadata.quantity * 1.2}"  // 20% buffer
      }
    }
  }
}

Conditional Transitions

Complex branching logic:

typescript
{
  transitions: {
    onSuccess: {
      type: "CONTINUE",
      next: "Express Shipping",
      condition: {
        logic: "AND",
        conditions: [
          {
            field: "execution.metadata.priority",
            operator: "EQUALS",
            value: "HIGH"
          },
          {
            field: "result.distance",
            operator: "LESS_THAN",
            value: 50
          }
        ]
      }
    },
    onFailure: {
      type: "SKIP",
      next: "Standard Shipping"
    }
  }
}

Webhook Authentication

Secure webhook requests:

typescript
{
  action: "WEBHOOK",
  webhook: {
    url: "https://api.partner.com/fulfill",
    method: "POST",
    headers: {
      "Authorization": "Bearer ${config.partnerApiKey}",
      "X-Signature": "${generateSignature(body, config.webhookSecret)}",
      "X-Request-ID": "${execution.id}"
    },
    body: {
      orderId: "${execution.metadata.orderId}",
      timestamp: "${Date.now()}"
    }
  }
}

Parallel Stage Execution

Execute multiple stages concurrently:

typescript
{
  name: "Parallel Resource Selection",
  action: "QUERY",
  parallel: [
    {
      name: "Select Warehouse",
      query: { resource: "WAREHOUSE", /* ... */ }
    },
    {
      name: "Select Carrier",
      query: { resource: "CARRIER", /* ... */ }
    },
    {
      name: "Check Inventory",
      query: { resource: "INVENTORY", /* ... */ }
    }
  ],
  // Wait for all parallel actions to complete
  waitForAll: true,
  timeout: 30000
}

Testing Pipelines

Manual Testing

Execute pipeline with test data:

typescript
const testExecution = await client.pipelines.execute(pipelineId, {
  metadata: {
    orderId: `TEST-${Date.now()}`,
    sku: "TEST-WIDGET",
    quantity: 1,
    deliveryAddress: {
      lat: 40.7128,
      lng: -74.0060
    }
  },
  tags: ['test', 'manual']
})

Simulation Mode

Enable automatic simulations:

typescript
await client.pipelines.update(pipelineId, {
  health: {
    simulationEnabled: true,
    simulationInterval: 300,  // Every 5 minutes
    testData: {
      orderId: "SIM-${timestamp}",
      sku: "TEST-ITEM",
      quantity: 1
    }
  }
})

// View simulation results
const pipeline = await client.pipelines.get(pipelineId)
console.log('Last simulation:', pipeline.lastSimulation)

Dry Run Mode

Test without side effects:

typescript
const dryRun = await client.pipelines.executions.dryRun(pipelineId, {
  metadata: { /* test data */ }
})

console.log('Dry run results:')
dryRun.stages.forEach(stage => {
  console.log(`${stage.name}: ${stage.predictedResult}`)
})
console.log('Estimated duration:', dryRun.estimatedDuration)

Production Best Practices

1. Error Handling

Always handle pipeline failures:

typescript
try {
  const result = await fulfillOrder(order)
  if (!result.success) {
    // Log failure
    await logFailure(order.id, result.error, result.failedStage)
    
    // Trigger fallback
    await fallbackFulfillment(order)
    
    // Alert operations team
    await sendAlert('Order fulfillment failed', order.id)
  }
} catch (error) {
  await handleCriticalError(error)
}

2. Monitoring & Alerts

Set up health monitoring:

typescript
// Check pipeline health every 5 minutes
setInterval(async () => {
  const health = await service.getPipelineHealth()
  
  if (health.healthScore < 70) {
    await sendAlert('Pipeline health degraded', {
      score: health.healthScore,
      status: health.status,
      alerts: health.alerts
    })
  }
}, 300000)

3. Resource Limits

Configure appropriate limits:

typescript
const pipeline = await client.pipelines.deploy({
  templateId: template.id,
  config: {
    concurrency: 50,        // Max 50 parallel executions
    timeout: 300000,        // 5 minute timeout
    maxExecutionsPerHour: 10000,  // Rate limit
    retryPolicy: {
      maxAttempts: 3,
      backoffMultiplier: 2
    }
  }
})

4. Version Management

Use template versioning:

typescript
// Create new version
const v2Template = await client.pipelines.templates.create({
  ...originalTemplate,
  version: "2.0.0",
  stages: updatedStages
})

// Deploy side-by-side
const v2Pipeline = await client.pipelines.deploy({
  templateId: v2Template.id,
  name: "Order Fulfillment v2"
})

// Gradually shift traffic
// ... migrate executions ...

// Disable old version
await client.pipelines.update(oldPipelineId, {
  status: 'DISABLED'
})

Troubleshooting

Common Issues

Execution Stuck in PENDING

  • Check worker status: Workers may be stopped
  • Verify context: Ensure workspace has active workers
  • Check logs: Look for initialization errors

High Failure Rate

  • Review failed executions: Identify common failure patterns
  • Check resource availability: Ensure warehouses/carriers are active
  • Validate filters: Filters may be too restrictive

Slow Execution

  • Check stage timeouts: May need adjustment
  • Review query complexity: Optimize filters
  • Monitor database performance: Add indexes if needed

Debug Mode

Enable detailed logging:

typescript
const execution = await client.pipelines.execute(pipelineId, {
  metadata: { /* ... */ },
  debug: true  // Enable debug logging
})

// View detailed logs
const logs = await client.pipelines.executions.getLogs(execution.id)
console.log('Execution logs:', logs)

Next Steps