Skip to main content
The Execution Engine is responsible for running workflows, managing node execution, propagating data, and handling errors.

Overview

CrystalFlow’s execution engine provides:

Plan-Based Execution

Build execution plan, then execute with full control

Conditional Flow

Support for if/else and switch/case branching

Event System

Comprehensive events for monitoring execution

Error Handling

Graceful error handling with detailed context

Cancellation

Support for cancelling long-running executions

The Executor

The Executor class orchestrates workflow execution:
import { Executor } from '@crystalflow/core';

const executor = new Executor();
const result = await executor.execute(workflow);

Execution Options

const result = await executor.execute(workflow, {
  timeout: 30000,                    // Timeout in milliseconds
  variables: { apiKey: 'xxx' },      // Global variables
  abortSignal: controller.signal     // Cancellation signal
});
timeout
number
Maximum execution time in milliseconds (default: no timeout)
variables
Record<string, any>
Global variables accessible to all nodes during execution
abortSignal
AbortSignal
Optional AbortSignal for cancelling execution

Execution Plan Architecture

CrystalFlow uses a plan-based execution architecture that enables advanced control flow patterns:

How It Works

  1. Build Phase: The workflow graph is analyzed and converted into an execution plan
  2. Execution Phase: The plan is executed step-by-step with full control
// Simplified internal flow
const plan = planBuilder.build(workflow);
const result = await planExecutor.execute(plan, workflow, context);

Benefits

Conditional Execution

Native support for if/else, switch/case branching

Loop Support

Future support for for/while loops (planned)

Debugging

Step-by-step debugging with breakpoints (planned)

Inspectability

Full visibility into execution plan structure

Step Types

The execution plan consists of different step types:
  • NodeExecutionStep: Execute a single node
  • ConditionalStep: Evaluate condition and execute matching branch
  • LoopStep: Execute steps repeatedly (planned)
  • ParallelStep: Execute multiple steps in parallel (planned)
The plan-based architecture is transparent to users - workflows execute smoothly while supporting advanced control flow patterns like conditionals and future loop support.
For a deep dive into the architecture, see the execution-plan-architecture.md document.

Execution Flow

The execution follows these steps:
1

Validation

Workflow is validated (connections, required inputs)
2

Context Creation

ExecutionContext is created with unique execution ID
3

Build Execution Plan

Workflow graph is analyzed and converted to execution plan with steps and branches
4

Step-by-Step Execution

Each step executes in order:
  • Node Step: Execute node, transfer data, validate, store results
  • Conditional Step: Evaluate condition, select and execute matching branch
  • Check for cancellation between steps
5

Branch Evaluation

When a conditional step is encountered:
  • Execute the conditional node to update its state
  • Call evaluateCondition() to get active branch
  • Execute steps in the matching branch
  • Skip other branches
6

Result Collection

All node results are collected and returned

Event System

The Executor emits events throughout execution:

Before Execution

executor.on('beforeExecute', (context) => {
  console.log('Starting execution:', context.executionId);
});

Node Events

executor.on('onNodeStart', (nodeId, node) => {
  console.log(`Starting node ${nodeId} (${node.type})`);
});

executor.on('onNodeComplete', (nodeId, result) => {
  console.log(`Node ${nodeId} completed:`, result.outputs);
});

executor.on('onNodeError', (nodeId, error) => {
  console.error(`Node ${nodeId} failed:`, error.message);
});

After Execution

executor.on('afterExecute', (result) => {
  console.log('Execution complete:', result.status);
  console.log('Duration:', result.duration, 'ms');
});

Error Events

executor.on('onError', (error) => {
  console.error('Workflow execution failed:', error);
});

Step Events

Monitor execution plan step execution:
executor.on('onStepStart', (stepId, stepType) => {
  console.log(`Starting step ${stepId} of type ${stepType}`);
});

executor.on('onStepComplete', (stepId, stepType) => {
  console.log(`Step ${stepId} completed`);
});

executor.on('onStepError', (stepId, stepType, error) => {
  console.error(`Step ${stepId} failed:`, error);
});
Step types include:
  • ExecutionStepType.Node - Regular node execution
  • ExecutionStepType.Conditional - Conditional branch selection
  • ExecutionStepType.Loop - Loop execution (planned)
  • ExecutionStepType.Parallel - Parallel execution (planned)

Branch Events

Track conditional branch execution:
executor.on('onBranchEnter', (nodeId, branchId, condition) => {
  console.log(`Entering ${condition} branch of ${nodeId}`);
});

executor.on('onBranchExit', (nodeId, branchId) => {
  console.log(`Exiting branch ${branchId} of ${nodeId}`);
});
Example with IfNode:
const executor = new Executor();

executor.on('onBranchEnter', (nodeId, branchId, condition) => {
  // condition will be 'thenOutput' or 'elseOutput'
  console.log(`IfNode ${nodeId} taking ${condition} branch`);
});

executor.on('onBranchExit', (nodeId, branchId) => {
  console.log(`Completed ${branchId} branch of ${nodeId}`);
});

const result = await executor.execute(workflowWithIfNode);
// Output:
// "IfNode if-node-1 taking thenOutput branch"
// "Completed branch-id branch of if-node-1"
Branch events are useful for:
  • Debugging conditional logic
  • Visualizing execution paths
  • Tracking which branches execute in complex workflows
  • Performance profiling of different branches

Async Event Listeners

All event listeners support async functions - the executor waits for them to complete:
executor.on('onNodeComplete', async (nodeId, result) => {
  // Executor waits for async operations
  await saveToDatabase(result);
  await sendWebhook(nodeId);
  console.log('Node result processed');
});

Execution Result

The executor returns an ExecutionResult object:
interface ExecutionResult {
  id: string;                                    // Unique execution ID
  workflowId: string;                            // Workflow identifier
  status: 'success' | 'failed' | 'cancelled';    // Execution status
  startTime: Date;                               // When execution started
  endTime?: Date;                                // When execution ended
  duration?: number;                             // Duration in ms
  nodeResults: Map<string, NodeExecutionResult>; // Results for each node
  error?: Error;                                 // Error if failed
  cancellationReason?: CancellationReason;       // Reason if cancelled
  cancelledAt?: Date;                            // When cancelled
}

Using Results

const result = await executor.execute(workflow);

if (result.status === 'success') {
  // Access node outputs
  result.nodeResults.forEach((nodeResult, nodeId) => {
    console.log(`${nodeId}:`, nodeResult.outputs);
  });
  
  console.log('Total duration:', result.duration, 'ms');
} else {
  console.error('Execution failed:', result.error);
}

Data Propagation

Data flows automatically between connected nodes:
// Node 1 outputs: { result: 42 }
// Connected to Node 2 input 'value'
// Node 2 automatically receives: this.value = 42

How It Works

  1. Source node executes and sets output values
  2. Executor identifies connections from source outputs
  3. Target node inputs are populated with output values
  4. Target node executes with received data

Error Handling

The execution engine provides robust error handling:

Node Execution Errors

import { NodeExecutionError } from '@crystalflow/core';

try {
  const result = await executor.execute(workflow);
} catch (error) {
  if (error instanceof NodeExecutionError) {
    console.error(`Node ${error.nodeId} (${error.nodeType}) failed`);
    console.error('Execution ID:', error.executionId);
    console.error('Workflow ID:', error.workflowId);
    console.error('Cause:', error.cause);
  }
}

Validation Errors

import { ValidationError } from '@crystalflow/core';

try {
  const result = await executor.execute(workflow);
} catch (error) {
  if (error instanceof ValidationError) {
    console.error('Workflow validation failed:', error.message);
  }
}

Timeout Errors

import { TimeoutError } from '@crystalflow/core';

try {
  const result = await executor.execute(workflow, { timeout: 5000 });
} catch (error) {
  if (error instanceof TimeoutError) {
    console.error('Execution timed out after', error.timeout, 'ms');
  }
}

Cancellation

Workflows can be cancelled during execution:

User-Initiated Cancellation

// Start execution
const promise = executor.execute(workflow);

// Cancel after 2 seconds
setTimeout(() => {
  executor.cancel(executionId, CancellationReason.UserCancelled);
}, 2000);

try {
  await promise;
} catch (error) {
  if (error instanceof UserCancelledError) {
    console.log('User cancelled execution');
  }
}

External Signal Cancellation

const controller = new AbortController();

// Start execution
const promise = executor.execute(workflow, {
  abortSignal: controller.signal
});

// Cancel from external signal
controller.abort();

Timeout-Based Cancellation

try {
  const result = await executor.execute(workflow, {
    timeout: 30000 // 30 seconds
  });
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log('Execution timed out');
  }
}

Mid-Node Cancellation

Nodes can check for cancellation during long operations:
class LongRunningNode extends Node {
  async execute() {
    for (let i = 0; i < 1000; i++) {
      // Check if execution was cancelled
      this.checkCancellation();
      
      await processItem(i);
    }
  }
}

Execution Context

The ExecutionContext provides runtime information to nodes:
class CustomNode extends Node {
  execute() {
    // Access execution context
    const executionId = this.context.executionId;
    const workflowId = this.context.workflowId;
    const variables = this.context.variables;
    
    // Use context data
    console.log(`Executing in ${executionId}`);
  }
}

Context Properties

executionId
string
Unique identifier for this execution
workflowId
string
Identifier of the workflow being executed
variables
Record<string, any>
Global variables passed to execution
startTime
Date
When execution started

Performance Monitoring

Track execution performance with timing data:
const result = await executor.execute(workflow);

console.log('Total execution time:', result.duration, 'ms');

// Individual node timings
result.nodeResults.forEach((nodeResult, nodeId) => {
  console.log(`${nodeId}: ${nodeResult.duration}ms`);
});

Best Practices

Wrap execution in try-catch and handle different error types appropriately.
Use timeouts to prevent workflows from hanging indefinitely.
Use events for logging, monitoring, and debugging execution flow.
For long-running nodes, periodically check for cancellation.
Pass runtime configuration through variables rather than hardcoding.

Example: Complete Execution

import { Executor, CancellationReason } from '@crystalflow/core';

async function runWorkflow(workflow) {
  const executor = new Executor();
  
  // Set up event listeners
  executor.on('beforeExecute', (context) => {
    console.log(`Starting execution ${context.executionId}`);
  });
  
  executor.on('onNodeStart', (nodeId, node) => {
    console.log(`Executing ${node.type}`);
  });
  
  executor.on('onNodeComplete', async (nodeId, result) => {
    // Async event handler - executor waits
    await logToDatabase(nodeId, result);
  });
  
  executor.on('afterExecute', (result) => {
    console.log(`Completed in ${result.duration}ms`);
  });
  
  // Execute with options
  try {
    const result = await executor.execute(workflow, {
      timeout: 60000,
      variables: {
        apiKey: process.env.API_KEY,
        environment: 'production'
      }
    });
    
    if (result.status === 'success') {
      console.log('Workflow succeeded!');
      return result;
    }
  } catch (error) {
    console.error('Execution failed:', error);
    throw error;
  }
}

Next Steps