Pipeline Architecture
Deep dive into the pipeline orchestration system's design, components, and execution model.
System Overview
De. Pipelines are built on a three-tier architecture that separates design, validation, and execution concerns:
┌─────────────────────────────────────────────────────────────┐
│ De. Pipeline Architecture │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Design Layer - Pipeline Templates │ │
│ │ • Template creation and versioning │ │
│ │ • Stage definitions and transitions │ │
│ │ • Query configurations │ │
│ └──────────────────┬──────────────────────────────────┘ │
│ │ Validation │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Validation Layer - Deployed Pipelines │ │
│ │ • Resource verification │ │
│ │ • Capability checks │ │
│ │ • Health monitoring setup │ │
│ │ • Performance baselining │ │
│ └──────────────────┬──────────────────────────────────┘ │
│ │ Instantiation │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Execution Layer - Pipeline Executions │ │
│ │ • Active workflow instances │ │
│ │ • Stage progression tracking │ │
│ │ • Real-time state management │ │
│ │ • Results and metrics collection │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Worker System - Background Processing │ │
│ │ • Transition Worker (30s) │ │
│ │ • Monitoring Worker (60s) │ │
│ │ • Health Worker (300s) │ │
│ │ • Webhook Worker (10s) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Core Components
1. Pipeline Templates
Purpose: Reusable workflow blueprints
Schema:
interface PipelineTemplate {
id: string
name: string
description?: string
version: string
// Workflow definition
stages: Stage[]
metadata: Record<string, any>
// Configuration
config: {
timeout?: number
retryPolicy?: RetryPolicy
concurrency?: number
}
// Health monitoring
health?: {
simulationEnabled: boolean
simulationInterval: number
alertThresholds: HealthThresholds
}
// Lifecycle
status: 'DRAFT' | 'PUBLISHED' | 'DEPRECATED'
created: Timestamp
updated: Timestamp
}
Lifecycle:
DRAFT - Under development, can be edited
PUBLISHED - Available for deployment
DEPRECATED - Archived, no new deployments
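The allowed moves between these states form a one-way ladder, which can be captured in a small guard. A minimal sketch — the `ALLOWED` map and `canTransition` helper are illustrative names, not part of the documented API:

```typescript
type TemplateStatus = 'DRAFT' | 'PUBLISHED' | 'DEPRECATED'

// Allowed lifecycle moves: DRAFT -> PUBLISHED -> DEPRECATED, no going back
const ALLOWED: Record<TemplateStatus, TemplateStatus[]> = {
  DRAFT: ['PUBLISHED'],
  PUBLISHED: ['DEPRECATED'],
  DEPRECATED: [],
}

function canTransition(from: TemplateStatus, to: TemplateStatus): boolean {
  return ALLOWED[from].includes(to)
}
```

Encoding the ladder as data keeps the rule in one place if more states are ever added.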
2. Stages
Purpose: Individual workflow steps with actions and transitions
Schema:
interface Stage {
name: string
description?: string
// Action to perform
action: 'QUERY' | 'WEBHOOK' | 'WAIT'
// Action-specific config
query?: QueryConfig
webhook?: WebhookConfig
wait?: WaitConfig
// Transition logic
transitions: {
onSuccess?: Transition
onFailure?: Transition
onTimeout?: Transition
}
// Execution constraints
timeout?: number
retries?: number
retryDelay?: number
}
Stage States:
PENDING - Awaiting execution
RUNNING - Currently executing
SUCCESS - Completed successfully
FAILED - Execution failed
TIMEOUT - Exceeded time limit
SKIPPED - Bypassed by transition logic
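The `StageStatus` union referenced later by `StageExecution` is not defined in this section. Assuming it mirrors the states above, a plausible sketch, with a helper that distinguishes terminal states (the helper is an illustrative assumption):

```typescript
type StageStatus =
  | 'PENDING'
  | 'RUNNING'
  | 'SUCCESS'
  | 'FAILED'
  | 'TIMEOUT'
  | 'SKIPPED'

// Terminal states require no further worker action;
// only PENDING and RUNNING stages are still in flight
function isTerminal(status: StageStatus): boolean {
  return status !== 'PENDING' && status !== 'RUNNING'
}
```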
3. Query System
Purpose: Dynamic resource selection with fallback strategies
Architecture:
interface QueryConfig {
resource: ResourceType
filters: Record<string, any>
strategy: SelectionStrategy
fallbackOptions?: FallbackOption[]
cacheDuration?: number
}
type ResourceType =
| 'WAREHOUSE'
| 'CARRIER'
| 'TERMINAL'
| 'ROUTE'
| 'INVENTORY'
| 'VEHICLE'
type SelectionStrategy =
| 'BEST_MATCH' // Highest score
| 'CLOSEST' // Geographic proximity
| 'FASTEST' // Shortest time
| 'CHEAPEST' // Lowest cost
| 'HIGHEST_CAPACITY'
| 'HIGHEST_RATED'
| 'RANDOM'           // Load balancing
Query Execution Flow:
1. Apply filters to resource collection
2. Score remaining candidates using strategy
3. Select highest-scoring resource
4. If none found, try first fallback option
5. Repeat until match found or all options exhausted
6. Cache result for configured duration
Scoring Algorithm:
// Example: CLOSEST strategy
function scoreWarehouse(warehouse, execution) {
const destination = execution.metadata.deliveryAddress
const distance = calculateDistance(warehouse.location, destination)
// Normalize distance to 0-100 score (closer = higher)
const maxDistance = 500 // km
const distanceScore = Math.max(0, 100 - (distance / maxDistance * 100))
// Apply capability bonuses
const capabilityBonus = warehouse.capabilities
.filter(cap => execution.requiredCapabilities.includes(cap))
.length * 5
return Math.min(100, distanceScore + capabilityBonus)
}
4. Pipeline Executions
Purpose: Runtime instances processing real data
Schema:
interface PipelineExecution {
id: string
pipelineId: string
// Current state
status: 'PENDING' | 'RUNNING' | 'SUCCESS' | 'FAILED' | 'TIMEOUT'
currentStageIndex: number
stages: StageExecution[]
// Execution context
context: Context
metadata: Record<string, any>
// Results
results: Record<string, any>
error?: ExecutionError
// Performance
startedAt: Timestamp
completedAt?: Timestamp
duration?: number
// Retry management
retryAttempts: number
nextRetryAt?: Timestamp
}
interface StageExecution extends Stage {
status: StageStatus
startedAt?: Timestamp
completedAt?: Timestamp
result?: any
error?: StageError
attempts: number
}
Execution Flow:
1. Create execution from pipeline
2. Queue initial stage transition
3. Transition Worker picks up transition
4. Execute stage action
5. Evaluate transition conditions
6. Queue next stage or complete
7. Monitoring Worker checks for timeouts
8. Health Worker validates pipeline health
5. Worker System
Purpose: Background processing for autonomous operation
Worker Manager
Orchestrates all workers for a workspace:
class WorkerManager {
private workers: Map<string, BaseWorker>
register(worker: BaseWorker): void
unregister(workerId: string): void
startAll(): void
stopAll(): Promise<void>
getStats(): WorkerStats
}
Base Worker
Foundation for all worker types:
abstract class BaseWorker {
protected config: WorkerConfig
protected intervalHandle?: NodeJS.Timeout
abstract execute(): Promise<void>
start(): void {
this.intervalHandle = setInterval(
() => this.execute(),
this.config.intervalMs
)
}
stop(): Promise<void>
getStats(): MonitoringStats
}
Transition Worker
Responsibility: Execute stage transitions
Interval: 30 seconds
Process:
async execute() {
// 1. Fetch pending transitions
const transitions = await fetchQueuedTransitions()
for (const transition of transitions) {
try {
// 2. Load execution context
const execution = await loadExecution(transition.executionId)
// 3. Execute stage action
const result = await executeStageAction(
execution.stages[execution.currentStageIndex]
)
// 4. Evaluate transition
const nextStage = evaluateTransition(result, transition)
// 5. Update execution state
await updateExecution(execution, result, nextStage)
// 6. Queue next transition if needed
if (nextStage) {
await queueTransition(execution.id, nextStage)
}
this.stats.transitionsProcessed++
} catch (error) {
await handleTransitionError(transition, error)
this.stats.transitionErrors++
}
}
}
Monitoring Worker
Responsibility: Detect timeouts and stalled stages
Interval: 60 seconds
Process:
async execute() {
const stats = {
activeExecutions: 0,
timedOutStages: 0,
stalledStages: 0
}
// Check for timed-out stages
await checkForTimedOutStages(this.App, this.context, stats)
// Check for stalled stages
await checkForStalledStages(this.App, this.context, stats)
this.updateStats(stats)
}
Timeout Detection:
function checkStageTimeout(execution, stage) {
const elapsed = Date.now() - stage.startedAt
const timeout = stage.timeout || execution.pipeline.config.timeout || 3600000
return elapsed > timeout
}
Health Worker
Responsibility: Continuous testing and health scoring
Interval: 300 seconds (5 minutes)
Process:
async execute() {
const stats = {
pipelinesChecked: 0,
healthScoresCalculated: 0,
statusChanges: 0,
alertsGenerated: 0
}
// Run continuous simulations and health checks
await runContinuousTests(this.App, this.context, stats)
this.updateStats(stats)
}
Health Score Calculation:
function calculateHealthScore(pipeline, executions) {
const recentExecutions = executions.filter(
e => e.completedAt > Date.now() - 86400000 // Last 24h
)
if (recentExecutions.length === 0) return 100
// Success rate (40% weight)
const successRate = recentExecutions.filter(e => e.status === 'SUCCESS').length
/ recentExecutions.length * 40
// Average duration vs baseline (30% weight)
const avgDuration = average(recentExecutions.map(e => e.duration))
// Cap at the 30-point weight so faster-than-baseline runs cannot exceed it
const durationScore = Math.min(30, Math.max(0, 30 - (avgDuration / pipeline.baseline.duration - 1) * 30))
// Error rate (20% weight)
const errorRate = recentExecutions.filter(e => e.error).length / recentExecutions.length
const errorScore = Math.max(0, 20 - errorRate * 20)
// Simulation success (10% weight)
const simulationScore = pipeline.lastSimulation?.success ? 10 : 0
return Math.min(100, successRate + durationScore + errorScore + simulationScore)
}
Webhook Worker
Responsibility: Process webhook queue
Interval: 10 seconds
Process:
async execute() {
const webhooks = await fetchQueuedWebhooks()
for (const webhook of webhooks) {
try {
const response = await sendWebhook(webhook)
await markWebhookDelivered(webhook.id, response)
this.stats.webhooksDelivered++
} catch (error) {
if (webhook.retries < webhook.maxRetries) {
await requeueWebhook(webhook, error)
this.stats.webhookRetries++
} else {
await markWebhookFailed(webhook.id, error)
this.stats.webhookFailures++
}
}
}
}
6. Transition System
Purpose: Manage stage progression logic
Transition Types:
interface Transition {
type: TransitionType
next?: string // Stage name
condition?: TransitionCondition
retries?: number
retryDelay?: number
}
type TransitionType =
| 'CONTINUE' // Proceed to next stage
| 'SKIP' // Jump to specified stage
| 'RETRY' // Re-execute current stage
| 'FAIL' // Mark execution as failed
| 'COMPLETE' // End execution successfully
Condition Evaluation:
interface TransitionCondition {
field: string // e.g., "result.available"
operator: ConditionOperator
value: any
logic?: 'AND' | 'OR'
conditions?: TransitionCondition[] // Nested conditions
}
type ConditionOperator =
| 'EQUALS'
| 'NOT_EQUALS'
| 'GREATER_THAN'
| 'LESS_THAN'
| 'CONTAINS'
| 'EXISTS'
Evaluation Engine:
function evaluateCondition(condition, context) {
  // Nested conditions are combined first, per the configured logic.
  // (Checking them after the switch would make this branch unreachable,
  // since every matched operator returns early.)
  if (condition.conditions) {
    const results = condition.conditions.map(c => evaluateCondition(c, context))
    return condition.logic === 'AND'
      ? results.every(r => r)
      : results.some(r => r)
  }
  const fieldValue = getNestedValue(context, condition.field)
  switch (condition.operator) {
    case 'EQUALS':
      return fieldValue === condition.value
    case 'GREATER_THAN':
      return fieldValue > condition.value
    case 'CONTAINS':
      return Array.isArray(fieldValue)
        ? fieldValue.includes(condition.value)
        : String(fieldValue).includes(condition.value)
    case 'EXISTS':
      return fieldValue !== undefined && fieldValue !== null
    // ... other operators
  }
}
Data Flow
Pipeline Execution Lifecycle
┌─────────────────────────────────────────────────────────────┐
│ Execution Lifecycle │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. CREATE EXECUTION │
│ ├─ Load pipeline definition │
│ ├─ Initialize execution context │
│ ├─ Set metadata and parameters │
│ └─ Queue first stage transition │
│ │
│ 2. STAGE EXECUTION (Loop) │
│ ├─ Transition Worker picks up queued transition │
│ ├─ Load current stage definition │
│ ├─ Execute stage action: │
│ │ ├─ QUERY: Search and select resource │
│ │ ├─ WEBHOOK: Send HTTP request │
│ │ └─ WAIT: Check condition or timer │
│ ├─ Store stage result │
│ ├─ Evaluate transition conditions │
│ ├─ Determine next stage │
│ └─ Queue next transition or complete │
│ │
│ 3. MONITORING (Parallel) │
│ ├─ Monitoring Worker checks for timeouts │
│ ├─ Handle timed-out stages │
│ └─ Detect stalled executions │
│ │
│ 4. COMPLETION │
│ ├─ Mark execution status (SUCCESS/FAILED) │
│ ├─ Calculate total duration │
│ ├─ Update pipeline metrics │
│ └─ Trigger completion webhooks │
│ │
│ 5. HEALTH TRACKING (Background) │
│ ├─ Health Worker calculates score │
│ ├─ Run periodic simulations │
│ ├─ Generate alerts if degraded │
│ └─ Update pipeline status │
│ │
└─────────────────────────────────────────────────────────────┘
Scalability Design
Horizontal Scaling
De. Pipelines support horizontal scaling through workspace sharding:
// Each service instance handles a subset of workspaces
const INSTANCE_ID = parseInt(process.env.INSTANCE_ID || '0', 10)
const TOTAL_INSTANCES = parseInt(process.env.TOTAL_INSTANCES || '1', 10)
// Shard workspaces by ID modulo
const workspaces = await db.find({
status: 'active',
$expr: {
$eq: [
{ $mod: [{ $toInt: '$wid' }, TOTAL_INSTANCES] },
INSTANCE_ID
]
}
})
// Initialize workers only for this shard
for (const workspace of workspaces) {
initializePipelineWorkers(App, workspace.context)
}
Scaling Strategy:
- 1-1,000 workspaces: Single instance
- 1,000-10,000: 10 instances (1,000 each)
- 10,000-100,000: 100 instances (1,000 each)
- 100,000+: Dynamic auto-scaling
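The modulo rule used in the sharding query above can also be expressed as a pure function in application code. A hedged sketch — the `ownsWorkspace` helper is hypothetical, not part of the documented API:

```typescript
// Decide whether this instance owns a workspace, mirroring the
// $mod-based database query above (wid is the numeric workspace id)
function ownsWorkspace(
  wid: number,
  instanceId: number,
  totalInstances: number
): boolean {
  return wid % totalInstances === instanceId
}
```

With 10 instances, workspace 42 hashes to instance 2, and every instance can cheaply verify ownership before starting workers.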
Performance Optimization
Query Caching:
// Cache query results to reduce database load
const cacheKey = `query:${resource}:${hash(filters)}`
const cached = await cache.get(cacheKey)
if (cached && Date.now() - cached.timestamp < cacheDuration) {
return cached.result
}
const result = await executeQuery(resource, filters)
await cache.set(cacheKey, { result, timestamp: Date.now() })
Batch Processing:
// Process multiple transitions in parallel
const transitions = await fetchQueuedTransitions({ batchSize: 100 })
await Promise.all(transitions.map(t => processTransition(t)))
Connection Pooling:
// Database connection pools per context
const poolConfig = {
minSize: 10,
maxSize: 100,
acquireTimeout: 30000,
idleTimeout: 300000
}
Error Handling
Error Categories
Validation Errors (4xx)
- Invalid pipeline configuration
- Missing required fields
- Resource not found
- Permission denied
Execution Errors (5xx)
- Stage action failure
- Query timeout
- Webhook delivery failure
- Worker crash
System Errors
- Database connection failure
- Memory exhaustion
- Network partition
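These three categories can be modeled as a discriminant on a common error shape, which then drives the retry decision. An illustrative sketch — the `PipelineError` shape and `isRetryable` helper are assumptions, not the documented API:

```typescript
type ErrorCategory = 'VALIDATION' | 'EXECUTION' | 'SYSTEM'

interface PipelineError {
  category: ErrorCategory
  code: string
  message: string
}

// Validation errors are permanent (retrying the same bad config
// cannot succeed); execution and system errors are transient and
// eligible for the retry policy sketched under Error Recovery
function isRetryable(error: PipelineError): boolean {
  return error.category !== 'VALIDATION'
}
```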
Error Recovery
// Stage-level retry with exponential backoff
async function executeStageWithRetry(stage, execution) {
const maxRetries = stage.retries || 3
const baseDelay = stage.retryDelay || 1000
for (let attempt = 0; attempt < maxRetries; attempt++) {
try {
return await executeStage(stage, execution)
} catch (error) {
if (attempt === maxRetries - 1) throw error
const delay = baseDelay * Math.pow(2, attempt)
await sleep(delay)
}
}
}
Security Considerations
Context Isolation
- Each workspace has isolated pipeline executions
- No cross-workspace data access
- Separate database collections per context
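One common way to realize per-context collections is to derive the collection name from the workspace id, so executions from different workspaces can never share storage. A hypothetical sketch — the naming scheme is an assumption, not the documented convention:

```typescript
interface Context {
  wid: string // workspace id
}

// Every read and write goes through this helper, so a query can
// only ever touch the calling workspace's own collection
function executionCollection(ctx: Context): string {
  return `wid_${ctx.wid}_pipeline_executions`
}
```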
Webhook Security
- HTTPS only for production webhooks
- Request signing for verification
- Timeout limits to prevent hanging
- Rate limiting on external calls
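Request signing is typically an HMAC over the raw request body: the sender attaches the digest, and the receiver recomputes it with the shared secret before trusting the payload. A sketch under that assumption — the exact header name and scheme are not specified by this document:

```typescript
import { createHmac, timingSafeEqual } from 'crypto'

// Sign the raw body with a shared secret (hex-encoded SHA-256 HMAC)
function signWebhook(body: string, secret: string): string {
  return createHmac('sha256', secret).update(body).digest('hex')
}

// Recompute and compare in constant time to avoid timing leaks
function verifyWebhook(body: string, secret: string, signature: string): boolean {
  const expected = Buffer.from(signWebhook(body, secret), 'hex')
  const received = Buffer.from(signature, 'hex')
  return expected.length === received.length && timingSafeEqual(expected, received)
}
```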
Query Filtering
- Resource access validated against workspace permissions
- Filters sanitized to prevent injection
- Results limited to authorized resources
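Filter sanitization against operator injection can be as simple as dropping `$`-prefixed keys before the query runs. An illustrative sketch, assuming MongoDB-style filters as suggested by the `$expr`/`$mod` usage earlier — the helper name is hypothetical:

```typescript
// Drop operator keys ($where, $expr, ...) from user-supplied filters,
// recursing into nested objects, so callers cannot smuggle query
// operators into the resource search
function sanitizeFilters(
  filters: Record<string, unknown>
): Record<string, unknown> {
  const safe: Record<string, unknown> = {}
  for (const [key, value] of Object.entries(filters)) {
    if (key.startsWith('$')) continue
    safe[key] =
      value !== null && typeof value === 'object' && !Array.isArray(value)
        ? sanitizeFilters(value as Record<string, unknown>)
        : value
  }
  return safe
}
```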

