Skip to content

Pipeline Examples

Production-ready pipeline templates for common logistics workflows.

Order Fulfillment Pipeline

Complete workflow for e-commerce order fulfillment with inventory validation, warehouse selection, and carrier assignment.

Template Configuration

json
{
  "name": "E-commerce Order Fulfillment",
  "version": "1.0.0",
  "description": "Automated order fulfillment from inventory check to carrier assignment",
  "stages": [
    {
      "name": "Validate Inventory",
      "description": "Check product availability and reserve stock",
      "action": "QUERY",
      "query": {
        "resource": "INVENTORY",
        "filters": {
          "sku": "${execution.metadata.sku}",
          "quantity": { "$gte": "${execution.metadata.quantity}" },
          "status": "AVAILABLE"
        },
        "strategy": "BEST_MATCH"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Select Warehouse"
        },
        "onFailure": {
          "type": "SKIP",
          "next": "Request Replenishment"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Select Warehouse",
      "description": "Find optimal warehouse for order fulfillment",
      "action": "QUERY",
      "query": {
        "resource": "WAREHOUSE",
        "filters": {
          "type": { "$in": ["FULFILLMENT", "DISTRIBUTION"] },
          "status": "OPERATIONAL",
          "inventory.sku": "${execution.metadata.sku}",
          "serviceLevel.orderProcessing.enabled": true
        },
        "strategy": "CLOSEST",
        "fallbackOptions": [
          {
            "strategy": "FASTEST",
            "filters": {
              "status": "OPERATIONAL",
              "inventory.sku": "${execution.metadata.sku}"
            }
          }
        ]
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Assign Carrier"
        },
        "onFailure": {
          "type": "FAIL",
          "next": "Manual Intervention"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Assign Carrier",
      "description": "Select delivery carrier based on cost and speed",
      "action": "QUERY",
      "query": {
        "resource": "CARRIER",
        "filters": {
          "status": "ACTIVE",
          "serviceArea": "${execution.results.Select Warehouse.location.zone}",
          "capabilities": ["LAST_MILE"]
        },
        "strategy": "CHEAPEST"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Create Shipment"
        },
        "onFailure": {
          "type": "RETRY",
          "retries": 1,
          "retryDelay": 5000
        }
      },
      "timeout": 30000
    },
    {
      "name": "Create Shipment",
      "description": "Create shipment in WMS",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.wmsUrl}/shipments",
        "method": "POST",
        "headers": {
          "Authorization": "Bearer ${config.wmsApiKey}",
          "Content-Type": "application/json"
        },
        "body": {
          "orderId": "${execution.metadata.orderId}",
          "warehouseId": "${execution.results.Select Warehouse.id}",
          "carrierId": "${execution.results.Assign Carrier.id}",
          "items": "${execution.metadata.items}"
        },
        "timeout": 10000,
        "retries": 3
      },
      "transitions": {
        "onSuccess": {
          "type": "COMPLETE"
        },
        "onFailure": {
          "type": "FAIL"
        }
      }
    }
  ],
  "config": {
    "timeout": 300000,
    "concurrency": 50,
    "retryPolicy": {
      "maxAttempts": 3,
      "backoffMultiplier": 2,
      "initialDelay": 1000
    }
  },
  "health": {
    "simulationEnabled": true,
    "simulationInterval": 600,
    "alertThresholds": {
      "healthScore": 75,
      "errorRate": 0.05,
      "avgDuration": 120000
    }
  }
}

Usage

typescript
// Start an execution of the Order Fulfillment pipeline. The `metadata` keys
// below are the values referenced by the template's `${execution.metadata.*}`
// placeholders. `client` and `pipelineId` must already be in scope.
const execution = await client.pipelines.execute(pipelineId, {
  metadata: {
    orderId: "ORD-12345", // referenced by the "Create Shipment" webhook body
    sku: "PROD-001", // referenced by the "Validate Inventory" and "Select Warehouse" filters
    quantity: 2, // lower bound ($gte) in the "Validate Inventory" quantity filter
    // referenced as a whole by the "Create Shipment" webhook body
    items: [
      { sku: "PROD-001", quantity: 2, name: "Widget" }
    ],
    // NOTE(review): no stage in the template above references deliveryAddress;
    // presumably it informs the CLOSEST warehouse strategy — confirm.
    deliveryAddress: {
      lat: 40.7128,
      lng: -74.0060,
      city: "New York",
      zip: "10001"
    }
  }
})

Cold Chain Inventory Management

Automated monitoring and replenishment for temperature-sensitive inventory.

Template Configuration

json
{
  "name": "Cold Chain Inventory Management",
  "version": "1.0.0",
  "description": "Monitor cold chain inventory and trigger automatic replenishment",
  "stages": [
    {
      "name": "Check Inventory Levels",
      "description": "Monitor current stock levels",
      "action": "QUERY",
      "query": {
        "resource": "INVENTORY",
        "filters": {
          "type": "COLD_CHAIN",
          "quantity": { "$lt": "${config.reorderPoint}" },
          "status": "ACTIVE"
        },
        "strategy": "BEST_MATCH"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Verify Cold Chain Compliance",
          "condition": {
            "field": "result.quantity",
            "operator": "LESS_THAN",
            "value": "${config.reorderPoint}"
          }
        },
        "onFailure": {
          "type": "COMPLETE"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Verify Cold Chain Compliance",
      "description": "Check warehouse cold chain capabilities",
      "action": "QUERY",
      "query": {
        "resource": "WAREHOUSE",
        "filters": {
          "type": "COLD_CHAIN",
          "status": "OPERATIONAL",
          "environmental.temperatureControlled": true,
          "compliance.certifications": { "$in": ["HACCP", "FDA"] }
        },
        "strategy": "CLOSEST"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Create Replenishment Order"
        },
        "onFailure": {
          "type": "FAIL",
          "next": "Alert Compliance Team"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Create Replenishment Order",
      "description": "Generate purchase order for replenishment",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.erpUrl}/purchase-orders",
        "method": "POST",
        "headers": {
          "Authorization": "Bearer ${config.erpApiKey}"
        },
        "body": {
          "sku": "${execution.results.Check Inventory Levels.sku}",
          "quantity": "${config.reorderQuantity}",
          "warehouseId": "${execution.results.Verify Cold Chain Compliance.id}",
          "priority": "HIGH",
          "specialHandling": "COLD_CHAIN"
        },
        "timeout": 10000,
        "retries": 3
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Notify Stakeholders"
        }
      }
    },
    {
      "name": "Notify Stakeholders",
      "description": "Send notifications about replenishment",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.notificationUrl}/send",
        "method": "POST",
        "body": {
          "type": "INVENTORY_REPLENISHMENT",
          "sku": "${execution.results.Check Inventory Levels.sku}",
          "currentLevel": "${execution.results.Check Inventory Levels.quantity}",
          "orderQuantity": "${config.reorderQuantity}",
          "warehouseName": "${execution.results.Verify Cold Chain Compliance.name}"
        }
      },
      "transitions": {
        "onSuccess": {
          "type": "COMPLETE"
        }
      }
    }
  ],
  "config": {
    "timeout": 180000,
    "concurrency": 20
  }
}

Dynamic Route Optimization

Real-time route optimization for last-mile delivery.

Template Configuration

json
{
  "name": "Dynamic Route Optimization",
  "version": "1.0.0",
  "description": "Optimize delivery routes based on real-time traffic and constraints",
  "stages": [
    {
      "name": "Fetch Pending Deliveries",
      "description": "Get all pending deliveries for route planning",
      "action": "QUERY",
      "query": {
        "resource": "INVENTORY",
        "filters": {
          "status": "READY_FOR_DELIVERY",
          "deliveryDate": "${execution.metadata.targetDate}",
          "zone": "${execution.metadata.zone}"
        },
        "strategy": "BEST_MATCH"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Select Available Vehicles"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Select Available Vehicles",
      "description": "Find available vehicles for route",
      "action": "QUERY",
      "query": {
        "resource": "VEHICLE",
        "filters": {
          "status": "AVAILABLE",
          "currentLocation.zone": "${execution.metadata.zone}",
          "type": { "$in": ["VAN", "TRUCK"] }
        },
        "strategy": "CLOSEST"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Calculate Optimal Route"
        },
        "onFailure": {
          "type": "WAIT",
          "next": "Select Available Vehicles"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Calculate Optimal Route",
      "description": "Use route optimization API",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.routingApiUrl}/optimize",
        "method": "POST",
        "body": {
          "deliveries": "${execution.results.Fetch Pending Deliveries}",
          "vehicle": "${execution.results.Select Available Vehicles}",
          "constraints": {
            "maxDuration": 28800,
            "maxDistance": 200,
            "timeWindows": true
          },
          "optimization": "DISTANCE"
        },
        "timeout": 15000
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Assign Route to Driver"
        }
      }
    },
    {
      "name": "Assign Route to Driver",
      "description": "Assign optimized route to driver",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.fleetUrl}/routes/assign",
        "method": "POST",
        "body": {
          "vehicleId": "${execution.results.Select Available Vehicles.id}",
          "route": "${execution.results.Calculate Optimal Route}",
          "estimatedDuration": "${execution.results.Calculate Optimal Route.duration}",
          "estimatedDistance": "${execution.results.Calculate Optimal Route.distance}"
        }
      },
      "transitions": {
        "onSuccess": {
          "type": "COMPLETE"
        }
      }
    }
  ]
}

Multi-Stage Quality Control

Automated quality checks across warehouse operations.

Template Configuration

json
{
  "name": "Multi-Stage Quality Control",
  "version": "1.0.0",
  "description": "Automated quality control for inbound, storage, and outbound operations",
  "stages": [
    {
      "name": "Inbound Quality Check",
      "description": "Verify received goods quality",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.qmsUrl}/inspections/inbound",
        "method": "POST",
        "body": {
          "shipmentId": "${execution.metadata.shipmentId}",
          "items": "${execution.metadata.items}",
          "checkpoints": ["QUANTITY", "CONDITION", "DOCUMENTATION"]
        },
        "timeout": 30000
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Assign Storage Location",
          "condition": {
            "field": "result.passed",
            "operator": "EQUALS",
            "value": true
          }
        },
        "onFailure": {
          "type": "SKIP",
          "next": "Quarantine Items"
        }
      }
    },
    {
      "name": "Assign Storage Location",
      "description": "Find optimal storage location",
      "action": "QUERY",
      "query": {
        "resource": "WAREHOUSE",
        "filters": {
          "type": { "$in": ["STORAGE", "DISTRIBUTION"] },
          "capacity.available": { "$gte": "${execution.metadata.volumeRequired}" },
          "zones.temperature": "${execution.metadata.storageRequirements.temperature}"
        },
        "strategy": "BEST_MATCH"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Storage Quality Check"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Storage Quality Check",
      "description": "Periodic quality checks during storage",
      "action": "WAIT",
      "wait": {
        "type": "DURATION",
        "duration": 86400000,
        "timeout": 604800000
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Verify Storage Conditions"
        }
      }
    },
    {
      "name": "Verify Storage Conditions",
      "description": "Check environmental conditions",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.iotUrl}/sensors/check",
        "method": "GET",
        "body": {
          "warehouseId": "${execution.results.Assign Storage Location.id}",
          "location": "${execution.results.Assign Storage Location.zone}",
          "metrics": ["TEMPERATURE", "HUMIDITY", "LIGHT"]
        }
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Outbound Quality Check",
          "condition": {
            "field": "result.compliant",
            "operator": "EQUALS",
            "value": true
          }
        },
        "onFailure": {
          "type": "FAIL",
          "next": "Alert Quality Team"
        }
      }
    },
    {
      "name": "Outbound Quality Check",
      "description": "Final quality check before shipment",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.qmsUrl}/inspections/outbound",
        "method": "POST",
        "body": {
          "orderId": "${execution.metadata.orderId}",
          "items": "${execution.metadata.items}",
          "checkpoints": ["QUANTITY", "PACKAGING", "LABELING"]
        }
      },
      "transitions": {
        "onSuccess": {
          "type": "COMPLETE"
        },
        "onFailure": {
          "type": "FAIL",
          "next": "Return to Storage"
        }
      }
    }
  ]
}

Cross-Dock Operations

Automated cross-docking workflow for fast-moving goods.

Template Configuration

json
{
  "name": "Cross-Dock Operations",
  "version": "1.0.0",
  "description": "Direct transfer from inbound to outbound without storage",
  "stages": [
    {
      "name": "Verify Inbound Shipment",
      "description": "Confirm incoming shipment arrival",
      "action": "QUERY",
      "query": {
        "resource": "TERMINAL",
        "filters": {
          "type": "CROSS_DOCK",
          "status": "OPERATIONAL",
          "inbound.shipmentId": "${execution.metadata.shipmentId}"
        },
        "strategy": "BEST_MATCH"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Match Outbound Orders"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Match Outbound Orders",
      "description": "Find orders ready for cross-dock transfer",
      "action": "QUERY",
      "query": {
        "resource": "INVENTORY",
        "filters": {
          "status": "PENDING_CROSSDOCK",
          "items.sku": { "$in": "${execution.metadata.skus}" },
          "priority": "HIGH"
        },
        "strategy": "BEST_MATCH"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Assign Outbound Carrier"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Assign Outbound Carrier",
      "description": "Select carrier for immediate dispatch",
      "action": "QUERY",
      "query": {
        "resource": "CARRIER",
        "filters": {
          "status": "AVAILABLE",
          "currentLocation": "${execution.results.Verify Inbound Shipment.location}",
          "capabilities": ["IMMEDIATE_DISPATCH"]
        },
        "strategy": "FASTEST"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Process Cross-Dock Transfer"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Process Cross-Dock Transfer",
      "description": "Execute transfer and update systems",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.wmsUrl}/cross-dock/transfer",
        "method": "POST",
        "body": {
          "inboundShipmentId": "${execution.metadata.shipmentId}",
          "outboundOrders": "${execution.results.Match Outbound Orders}",
          "carrierId": "${execution.results.Assign Outbound Carrier.id}",
          "terminalId": "${execution.results.Verify Inbound Shipment.id}"
        }
      },
      "transitions": {
        "onSuccess": {
          "type": "COMPLETE"
        }
      }
    }
  ],
  "config": {
    "timeout": 120000,
    "concurrency": 30
  }
}

Predictive Maintenance

Automated vehicle maintenance scheduling based on usage and condition.

Template Configuration

json
{
  "name": "Predictive Maintenance Scheduler",
  "version": "1.0.0",
  "description": "Monitor vehicle health and schedule proactive maintenance",
  "stages": [
    {
      "name": "Check Vehicle Metrics",
      "description": "Fetch vehicle telemetry data",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.iotUrl}/vehicles/${execution.metadata.vehicleId}/telemetry",
        "method": "GET"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Analyze Maintenance Needs",
          "condition": {
            "logic": "OR",
            "conditions": [
              {
                "field": "result.mileage",
                "operator": "GREATER_THAN",
                "value": "${config.maintenanceThreshold.mileage}"
              },
              {
                "field": "result.alerts",
                "operator": "EXISTS",
                "value": true
              }
            ]
          }
        },
        "onFailure": {
          "type": "COMPLETE"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Analyze Maintenance Needs",
      "description": "Determine required maintenance type",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.maintenanceApiUrl}/analyze",
        "method": "POST",
        "body": {
          "vehicleId": "${execution.metadata.vehicleId}",
          "telemetry": "${execution.results.Check Vehicle Metrics}",
          "history": "${execution.metadata.maintenanceHistory}"
        }
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Find Available Service Center"
        }
      }
    },
    {
      "name": "Find Available Service Center",
      "description": "Locate nearby service center with capacity",
      "action": "QUERY",
      "query": {
        "resource": "TERMINAL",
        "filters": {
          "type": "MAINTENANCE",
          "status": "OPERATIONAL",
          "services": { "$in": "${execution.results.Analyze Maintenance Needs.requiredServices}" },
          "capacity.available": { "$gt": 0 }
        },
        "strategy": "CLOSEST"
      },
      "transitions": {
        "onSuccess": {
          "type": "CONTINUE",
          "next": "Schedule Maintenance"
        }
      },
      "timeout": 30000
    },
    {
      "name": "Schedule Maintenance",
      "description": "Book maintenance appointment",
      "action": "WEBHOOK",
      "webhook": {
        "url": "${config.fleetUrl}/maintenance/schedule",
        "method": "POST",
        "body": {
          "vehicleId": "${execution.metadata.vehicleId}",
          "serviceCenterId": "${execution.results.Find Available Service Center.id}",
          "services": "${execution.results.Analyze Maintenance Needs.requiredServices}",
          "priority": "${execution.results.Analyze Maintenance Needs.priority}",
          "estimatedDuration": "${execution.results.Analyze Maintenance Needs.duration}"
        }
      },
      "transitions": {
        "onSuccess": {
          "type": "COMPLETE"
        }
      }
    }
  ]
}

Testing Pipelines

Test Data Generator

Use this helper to generate randomized order data and run test executions:

typescript
/**
 * Helpers for exercising pipelines with synthetic order data.
 */
class PipelineTestHelper {
  /**
   * Build a randomized order payload to use as execution metadata.
   *
   * The payload carries `orderId`, `sku`, `quantity`, `items` and
   * `deliveryAddress`. `items` mirrors the top-level `sku`/`quantity` so that
   * stages reading `${execution.metadata.items}` receive a value.
   *
   * @returns {object} Order metadata with a quantity between 1 and 10 and
   *   coordinates within ±0.05 degrees of (40.7128, -74.0060).
   */
  static generateOrderTestData() {
    const sku = "TEST-WIDGET-001"
    const quantity = Math.floor(Math.random() * 10) + 1

    return {
      orderId: `TEST-${Date.now()}`,
      sku,
      quantity,
      items: [
        { sku, quantity, name: "Test Widget" }
      ],
      deliveryAddress: {
        lat: 40.7128 + (Math.random() - 0.5) * 0.1,
        lng: -74.0060 + (Math.random() - 0.5) * 0.1,
        city: "New York",
        zip: "10001"
      }
    }
  }

  /**
   * Start a tagged test execution and poll until it is no longer PENDING or RUNNING.
   *
   * @param {object} client - API client exposing `pipelines.execute` and
   *   `pipelines.executions.get`.
   * @param {string} pipelineId - Pipeline to execute.
   * @param {object} testData - Passed as the execution's `metadata`.
   * @param {object} [options]
   * @param {number} [options.pollIntervalMs=2000] - Delay between status polls.
   * @param {number} [options.timeoutMs=300000] - Maximum time to keep polling.
   * @returns {Promise<object>} The most recently fetched execution record.
   * @throws {Error} If the execution is still PENDING or RUNNING once
   *   `timeoutMs` has elapsed.
   */
  static async runTest(client, pipelineId, testData, { pollIntervalMs = 2000, timeoutMs = 300000 } = {}) {
    const execution = await client.pipelines.execute(pipelineId, {
      metadata: testData,
      tags: ['test', 'automated']
    })

    // Bound the wait so a stuck execution fails the test instead of hanging it
    const deadline = Date.now() + timeoutMs
    let status = execution.status
    let latest = null

    while (status === 'PENDING' || status === 'RUNNING') {
      if (Date.now() >= deadline) {
        throw new Error(
          `Execution ${execution.id} did not finish within ${timeoutMs}ms (last status: ${status})`
        )
      }
      await new Promise(resolve => setTimeout(resolve, pollIntervalMs))
      latest = await client.pipelines.executions.get(execution.id)
      status = latest.status
    }

    // Only fetch here when no poll happened; otherwise the last poll already
    // holds the finished record and a second request would be redundant
    if (latest === null) {
      latest = await client.pipelines.executions.get(execution.id)
    }
    return latest
  }
}

// Run tests: build a randomized order payload, execute the pipeline with it,
// and log the status the execution ended in.
// `client` and `pipelineId` are not defined in this snippet and must already be in scope.
const testData = PipelineTestHelper.generateOrderTestData()
const result = await PipelineTestHelper.runTest(client, pipelineId, testData)
console.log('Test result:', result.status)

Next Steps