Pipeline Examples
Production-ready pipeline templates for common logistics workflows.
Order Fulfillment Pipeline
Complete workflow for e-commerce order fulfillment with inventory validation, warehouse selection, and carrier assignment.
Template Configuration
json
{
"name": "E-commerce Order Fulfillment",
"version": "1.0.0",
"description": "Automated order fulfillment from inventory check to carrier assignment",
"stages": [
{
"name": "Validate Inventory",
"description": "Check product availability and reserve stock",
"action": "QUERY",
"query": {
"resource": "INVENTORY",
"filters": {
"sku": "${execution.metadata.sku}",
"quantity": { "$gte": "${execution.metadata.quantity}" },
"status": "AVAILABLE"
},
"strategy": "BEST_MATCH"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Select Warehouse"
},
"onFailure": {
"type": "SKIP",
"next": "Request Replenishment"
}
},
"timeout": 30000
},
{
"name": "Select Warehouse",
"description": "Find optimal warehouse for order fulfillment",
"action": "QUERY",
"query": {
"resource": "WAREHOUSE",
"filters": {
"type": { "$in": ["FULFILLMENT", "DISTRIBUTION"] },
"status": "OPERATIONAL",
"inventory.sku": "${execution.metadata.sku}",
"serviceLevel.orderProcessing.enabled": true
},
"strategy": "CLOSEST",
"fallbackOptions": [
{
"strategy": "FASTEST",
"filters": {
"status": "OPERATIONAL",
"inventory.sku": "${execution.metadata.sku}"
}
}
]
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Assign Carrier"
},
"onFailure": {
"type": "FAIL",
"next": "Manual Intervention"
}
},
"timeout": 30000
},
{
"name": "Assign Carrier",
"description": "Select delivery carrier based on cost and speed",
"action": "QUERY",
"query": {
"resource": "CARRIER",
"filters": {
"status": "ACTIVE",
"serviceArea": "${execution.results.Select Warehouse.location.zone}",
"capabilities": ["LAST_MILE"]
},
"strategy": "CHEAPEST"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Create Shipment"
},
"onFailure": {
"type": "RETRY",
"retries": 1,
"retryDelay": 5000
}
},
"timeout": 30000
},
{
"name": "Create Shipment",
"description": "Create shipment in WMS",
"action": "WEBHOOK",
"webhook": {
"url": "${config.wmsUrl}/shipments",
"method": "POST",
"headers": {
"Authorization": "Bearer ${config.wmsApiKey}",
"Content-Type": "application/json"
},
"body": {
"orderId": "${execution.metadata.orderId}",
"warehouseId": "${execution.results.Select Warehouse.id}",
"carrierId": "${execution.results.Assign Carrier.id}",
"items": "${execution.metadata.items}"
},
"timeout": 10000,
"retries": 3
},
"transitions": {
"onSuccess": {
"type": "COMPLETE"
},
"onFailure": {
"type": "FAIL"
}
}
}
],
"config": {
"timeout": 300000,
"concurrency": 50,
"retryPolicy": {
"maxAttempts": 3,
"backoffMultiplier": 2,
"initialDelay": 1000
}
},
"health": {
"simulationEnabled": true,
"simulationInterval": 600,
"alertThresholds": {
"healthScore": 75,
"errorRate": 0.05,
"avgDuration": 120000
}
}
}
Usage
typescript
const execution = await client.pipelines.execute(pipelineId, {
metadata: {
orderId: "ORD-12345",
sku: "PROD-001",
quantity: 2,
items: [
{ sku: "PROD-001", quantity: 2, name: "Widget" }
],
deliveryAddress: {
lat: 40.7128,
lng: -74.0060,
city: "New York",
zip: "10001"
}
}
})
Cold Chain Inventory Management
Automated monitoring and replenishment for temperature-sensitive inventory.
Template Configuration
json
{
"name": "Cold Chain Inventory Management",
"version": "1.0.0",
"description": "Monitor cold chain inventory and trigger automatic replenishment",
"stages": [
{
"name": "Check Inventory Levels",
"description": "Monitor current stock levels",
"action": "QUERY",
"query": {
"resource": "INVENTORY",
"filters": {
"type": "COLD_CHAIN",
"quantity": { "$lt": "${config.reorderPoint}" },
"status": "ACTIVE"
},
"strategy": "BEST_MATCH"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Verify Cold Chain Compliance",
"condition": {
"field": "result.quantity",
"operator": "LESS_THAN",
"value": "${config.reorderPoint}"
}
},
"onFailure": {
"type": "COMPLETE"
}
},
"timeout": 30000
},
{
"name": "Verify Cold Chain Compliance",
"description": "Check warehouse cold chain capabilities",
"action": "QUERY",
"query": {
"resource": "WAREHOUSE",
"filters": {
"type": "COLD_CHAIN",
"status": "OPERATIONAL",
"environmental.temperatureControlled": true,
"compliance.certifications": { "$in": ["HACCP", "FDA"] }
},
"strategy": "CLOSEST"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Create Replenishment Order"
},
"onFailure": {
"type": "FAIL",
"next": "Alert Compliance Team"
}
},
"timeout": 30000
},
{
"name": "Create Replenishment Order",
"description": "Generate purchase order for replenishment",
"action": "WEBHOOK",
"webhook": {
"url": "${config.erpUrl}/purchase-orders",
"method": "POST",
"headers": {
"Authorization": "Bearer ${config.erpApiKey}"
},
"body": {
"sku": "${execution.results.Check Inventory Levels.sku}",
"quantity": "${config.reorderQuantity}",
"warehouseId": "${execution.results.Verify Cold Chain Compliance.id}",
"priority": "HIGH",
"specialHandling": "COLD_CHAIN"
},
"timeout": 10000,
"retries": 3
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Notify Stakeholders"
}
}
},
{
"name": "Notify Stakeholders",
"description": "Send notifications about replenishment",
"action": "WEBHOOK",
"webhook": {
"url": "${config.notificationUrl}/send",
"method": "POST",
"body": {
"type": "INVENTORY_REPLENISHMENT",
"sku": "${execution.results.Check Inventory Levels.sku}",
"currentLevel": "${execution.results.Check Inventory Levels.quantity}",
"orderQuantity": "${config.reorderQuantity}",
"warehouseName": "${execution.results.Verify Cold Chain Compliance.name}"
}
},
"transitions": {
"onSuccess": {
"type": "COMPLETE"
}
}
}
],
"config": {
"timeout": 180000,
"concurrency": 20
}
}
Dynamic Route Optimization
Real-time route optimization for last-mile delivery.
Template Configuration
json
{
"name": "Dynamic Route Optimization",
"version": "1.0.0",
"description": "Optimize delivery routes based on real-time traffic and constraints",
"stages": [
{
"name": "Fetch Pending Deliveries",
"description": "Get all pending deliveries for route planning",
"action": "QUERY",
"query": {
"resource": "INVENTORY",
"filters": {
"status": "READY_FOR_DELIVERY",
"deliveryDate": "${execution.metadata.targetDate}",
"zone": "${execution.metadata.zone}"
},
"strategy": "BEST_MATCH"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Select Available Vehicles"
}
},
"timeout": 30000
},
{
"name": "Select Available Vehicles",
"description": "Find available vehicles for route",
"action": "QUERY",
"query": {
"resource": "VEHICLE",
"filters": {
"status": "AVAILABLE",
"currentLocation.zone": "${execution.metadata.zone}",
"type": { "$in": ["VAN", "TRUCK"] }
},
"strategy": "CLOSEST"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Calculate Optimal Route"
},
"onFailure": {
"type": "WAIT",
"next": "Select Available Vehicles"
}
},
"timeout": 30000
},
{
"name": "Calculate Optimal Route",
"description": "Use route optimization API",
"action": "WEBHOOK",
"webhook": {
"url": "${config.routingApiUrl}/optimize",
"method": "POST",
"body": {
"deliveries": "${execution.results.Fetch Pending Deliveries}",
"vehicle": "${execution.results.Select Available Vehicles}",
"constraints": {
"maxDuration": 28800,
"maxDistance": 200,
"timeWindows": true
},
"optimization": "DISTANCE"
},
"timeout": 15000
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Assign Route to Driver"
}
}
},
{
"name": "Assign Route to Driver",
"description": "Assign optimized route to driver",
"action": "WEBHOOK",
"webhook": {
"url": "${config.fleetUrl}/routes/assign",
"method": "POST",
"body": {
"vehicleId": "${execution.results.Select Available Vehicles.id}",
"route": "${execution.results.Calculate Optimal Route}",
"estimatedDuration": "${execution.results.Calculate Optimal Route.duration}",
"estimatedDistance": "${execution.results.Calculate Optimal Route.distance}"
}
},
"transitions": {
"onSuccess": {
"type": "COMPLETE"
}
}
}
]
}
Multi-Stage Quality Control
Automated quality checks across warehouse operations.
Template Configuration
json
{
"name": "Multi-Stage Quality Control",
"version": "1.0.0",
"description": "Automated quality control for inbound, storage, and outbound operations",
"stages": [
{
"name": "Inbound Quality Check",
"description": "Verify received goods quality",
"action": "WEBHOOK",
"webhook": {
"url": "${config.qmsUrl}/inspections/inbound",
"method": "POST",
"body": {
"shipmentId": "${execution.metadata.shipmentId}",
"items": "${execution.metadata.items}",
"checkpoints": ["QUANTITY", "CONDITION", "DOCUMENTATION"]
},
"timeout": 30000
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Assign Storage Location",
"condition": {
"field": "result.passed",
"operator": "EQUALS",
"value": true
}
},
"onFailure": {
"type": "SKIP",
"next": "Quarantine Items"
}
}
},
{
"name": "Assign Storage Location",
"description": "Find optimal storage location",
"action": "QUERY",
"query": {
"resource": "WAREHOUSE",
"filters": {
"type": { "$in": ["STORAGE", "DISTRIBUTION"] },
"capacity.available": { "$gte": "${execution.metadata.volumeRequired}" },
"zones.temperature": "${execution.metadata.storageRequirements.temperature}"
},
"strategy": "BEST_MATCH"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Storage Quality Check"
}
},
"timeout": 30000
},
{
"name": "Storage Quality Check",
"description": "Periodic quality checks during storage",
"action": "WAIT",
"wait": {
"type": "DURATION",
"duration": 86400000,
"timeout": 604800000
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Verify Storage Conditions"
}
}
},
{
"name": "Verify Storage Conditions",
"description": "Check environmental conditions",
"action": "WEBHOOK",
"webhook": {
"url": "${config.iotUrl}/sensors/check",
"method": "GET",
"body": {
"warehouseId": "${execution.results.Assign Storage Location.id}",
"location": "${execution.results.Assign Storage Location.zone}",
"metrics": ["TEMPERATURE", "HUMIDITY", "LIGHT"]
}
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Outbound Quality Check",
"condition": {
"field": "result.compliant",
"operator": "EQUALS",
"value": true
}
},
"onFailure": {
"type": "FAIL",
"next": "Alert Quality Team"
}
}
},
{
"name": "Outbound Quality Check",
"description": "Final quality check before shipment",
"action": "WEBHOOK",
"webhook": {
"url": "${config.qmsUrl}/inspections/outbound",
"method": "POST",
"body": {
"orderId": "${execution.metadata.orderId}",
"items": "${execution.metadata.items}",
"checkpoints": ["QUANTITY", "PACKAGING", "LABELING"]
}
},
"transitions": {
"onSuccess": {
"type": "COMPLETE"
},
"onFailure": {
"type": "FAIL",
"next": "Return to Storage"
}
}
}
]
}
Cross-Dock Operations
Automated cross-docking workflow for fast-moving goods.
Template Configuration
json
{
"name": "Cross-Dock Operations",
"version": "1.0.0",
"description": "Direct transfer from inbound to outbound without storage",
"stages": [
{
"name": "Verify Inbound Shipment",
"description": "Confirm incoming shipment arrival",
"action": "QUERY",
"query": {
"resource": "TERMINAL",
"filters": {
"type": "CROSS_DOCK",
"status": "OPERATIONAL",
"inbound.shipmentId": "${execution.metadata.shipmentId}"
},
"strategy": "BEST_MATCH"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Match Outbound Orders"
}
},
"timeout": 30000
},
{
"name": "Match Outbound Orders",
"description": "Find orders ready for cross-dock transfer",
"action": "QUERY",
"query": {
"resource": "INVENTORY",
"filters": {
"status": "PENDING_CROSSDOCK",
"items.sku": { "$in": "${execution.metadata.skus}" },
"priority": "HIGH"
},
"strategy": "BEST_MATCH"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Assign Outbound Carrier"
}
},
"timeout": 30000
},
{
"name": "Assign Outbound Carrier",
"description": "Select carrier for immediate dispatch",
"action": "QUERY",
"query": {
"resource": "CARRIER",
"filters": {
"status": "AVAILABLE",
"currentLocation": "${execution.results.Verify Inbound Shipment.location}",
"capabilities": ["IMMEDIATE_DISPATCH"]
},
"strategy": "FASTEST"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Process Cross-Dock Transfer"
}
},
"timeout": 30000
},
{
"name": "Process Cross-Dock Transfer",
"description": "Execute transfer and update systems",
"action": "WEBHOOK",
"webhook": {
"url": "${config.wmsUrl}/cross-dock/transfer",
"method": "POST",
"body": {
"inboundShipmentId": "${execution.metadata.shipmentId}",
"outboundOrders": "${execution.results.Match Outbound Orders}",
"carrierId": "${execution.results.Assign Outbound Carrier.id}",
"terminalId": "${execution.results.Verify Inbound Shipment.id}"
}
},
"transitions": {
"onSuccess": {
"type": "COMPLETE"
}
}
}
],
"config": {
"timeout": 120000,
"concurrency": 30
}
}
Predictive Maintenance
Automated vehicle maintenance scheduling based on usage and condition.
Template Configuration
json
{
"name": "Predictive Maintenance Scheduler",
"version": "1.0.0",
"description": "Monitor vehicle health and schedule proactive maintenance",
"stages": [
{
"name": "Check Vehicle Metrics",
"description": "Fetch vehicle telemetry data",
"action": "WEBHOOK",
"webhook": {
"url": "${config.iotUrl}/vehicles/${execution.metadata.vehicleId}/telemetry",
"method": "GET"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Analyze Maintenance Needs",
"condition": {
"logic": "OR",
"conditions": [
{
"field": "result.mileage",
"operator": "GREATER_THAN",
"value": "${config.maintenanceThreshold.mileage}"
},
{
"field": "result.alerts",
"operator": "EXISTS",
"value": true
}
]
}
},
"onFailure": {
"type": "COMPLETE"
}
},
"timeout": 30000
},
{
"name": "Analyze Maintenance Needs",
"description": "Determine required maintenance type",
"action": "WEBHOOK",
"webhook": {
"url": "${config.maintenanceApiUrl}/analyze",
"method": "POST",
"body": {
"vehicleId": "${execution.metadata.vehicleId}",
"telemetry": "${execution.results.Check Vehicle Metrics}",
"history": "${execution.metadata.maintenanceHistory}"
}
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Find Available Service Center"
}
}
},
{
"name": "Find Available Service Center",
"description": "Locate nearby service center with capacity",
"action": "QUERY",
"query": {
"resource": "TERMINAL",
"filters": {
"type": "MAINTENANCE",
"status": "OPERATIONAL",
"services": { "$in": "${execution.results.Analyze Maintenance Needs.requiredServices}" },
"capacity.available": { "$gt": 0 }
},
"strategy": "CLOSEST"
},
"transitions": {
"onSuccess": {
"type": "CONTINUE",
"next": "Schedule Maintenance"
}
},
"timeout": 30000
},
{
"name": "Schedule Maintenance",
"description": "Book maintenance appointment",
"action": "WEBHOOK",
"webhook": {
"url": "${config.fleetUrl}/maintenance/schedule",
"method": "POST",
"body": {
"vehicleId": "${execution.metadata.vehicleId}",
"serviceCenterId": "${execution.results.Find Available Service Center.id}",
"services": "${execution.results.Analyze Maintenance Needs.requiredServices}",
"priority": "${execution.results.Analyze Maintenance Needs.priority}",
"estimatedDuration": "${execution.results.Analyze Maintenance Needs.duration}"
}
},
"transitions": {
"onSuccess": {
"type": "COMPLETE"
}
}
}
]
}
Testing Pipelines
Test Data Generator
Use this helper to generate test executions:
typescript
/**
 * Helpers for exercising pipelines from automated tests.
 */
class PipelineTestHelper {
  /**
   * Builds randomized metadata for an order-fulfillment test execution.
   *
   * @returns {{orderId: string, sku: string, quantity: number,
   *   items: Array<{sku: string, quantity: number, name: string}>,
   *   deliveryAddress: {lat: number, lng: number, city: string, zip: string}}}
   *   Metadata object suitable for `client.pipelines.execute`.
   */
  static generateOrderTestData() {
    const sku = "TEST-WIDGET-001"
    // Random integer in the range 1..10.
    const quantity = Math.floor(Math.random() * 10) + 1

    return {
      orderId: `TEST-${Date.now()}`,
      sku,
      quantity,
      // The fulfillment template's "Create Shipment" stage reads
      // ${execution.metadata.items}, so the line items must be present and
      // agree with the top-level sku/quantity used by the inventory query.
      items: [{ sku, quantity, name: "Test Widget" }],
      deliveryAddress: {
        // Jitter the coordinates by up to +/-0.05 degrees around the base point.
        lat: 40.7128 + (Math.random() - 0.5) * 0.1,
        lng: -74.0060 + (Math.random() - 0.5) * 0.1,
        city: "New York",
        zip: "10001"
      }
    }
  }

  /**
   * Starts a pipeline execution and polls until it leaves the PENDING/RUNNING
   * states, then returns the execution record fetched from the API.
   *
   * @param {object} client - API client exposing `pipelines.execute` and
   *   `pipelines.executions.get`.
   * @param {string} pipelineId - Identifier of the pipeline to execute.
   * @param {object} testData - Metadata passed to the execution.
   * @param {object} [options] - Optional polling controls.
   * @param {number} [options.pollIntervalMs=2000] - Delay between status polls.
   * @param {number} [options.timeoutMs=300000] - Maximum time to wait before
   *   giving up; pass `Infinity` to wait without a limit.
   * @returns {Promise<object>} The execution record as returned by
   *   `client.pipelines.executions.get`.
   * @throws {Error} If the execution is still PENDING/RUNNING after `timeoutMs`.
   */
  static async runTest(client, pipelineId, testData, options = {}) {
    const { pollIntervalMs = 2000, timeoutMs = 300000 } = options

    const execution = await client.pipelines.execute(pipelineId, {
      metadata: testData,
      tags: ['test', 'automated']
    })

    const deadline = Date.now() + timeoutMs
    let latest = execution
    let polled = false

    while (latest.status === 'PENDING' || latest.status === 'RUNNING') {
      // Without a deadline a stuck execution would hang the test run forever.
      if (Date.now() >= deadline) {
        throw new Error(
          `Pipeline execution ${execution.id} did not finish within ${timeoutMs}ms (last status: ${latest.status})`
        )
      }
      await new Promise(resolve => setTimeout(resolve, pollIntervalMs))
      latest = await client.pipelines.executions.get(execution.id)
      polled = true
    }

    // The last polled record is already the final state; only fetch when the
    // execution was terminal straight away, so callers always get a record
    // that came from executions.get.
    return polled ? latest : await client.pipelines.executions.get(execution.id)
  }
}
// Example run: build a randomized order payload, execute the pipeline with it,
// and print the final execution status.
const testData = PipelineTestHelper.generateOrderTestData()
const result = await PipelineTestHelper.runTest(
  client,
  pipelineId,
  testData
)
console.log('Test result:', result.status)
