CrystalFlow’s execution engine emits events at each stage of a run, which you can use to monitor and control workflow execution.

Execution Basics

Execute a workflow with the Executor:
import { Executor } from '@crystalflow/core';

const executor = new Executor();
const result = await executor.execute(workflow);

console.log('Status:', result.status);
console.log('Duration:', result.duration, 'ms');

Execution Options

// Create a controller so the run can be cancelled from outside
const controller = new AbortController();

const result = await executor.execute(workflow, {
  timeout: 30000,              // Timeout in milliseconds
  variables: {                 // Global variables
    apiKey: 'your-key',
    environment: 'production'
  },
  abortSignal: controller.signal  // Cancellation signal
});
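The options above can be bundled with a cancel handle so that a UI button or a shutdown hook can stop the run. The following is a minimal sketch: `ExecuteOptions` and `buildExecuteOptions` are hypothetical names that only mirror the three option keys shown above, not types exported by CrystalFlow.

```typescript
// Hypothetical shape mirroring the option keys used on this page.
interface ExecuteOptions {
  timeout?: number;
  variables?: Record<string, unknown>;
  abortSignal?: AbortSignal;
}

// Builds the options object and returns a cancel() function alongside it.
function buildExecuteOptions(
  timeoutMs: number,
  variables: Record<string, unknown>
): { options: ExecuteOptions; cancel: () => void } {
  const controller = new AbortController();
  return {
    options: { timeout: timeoutMs, variables, abortSignal: controller.signal },
    // Calling cancel() flips controller.signal.aborted to true.
    cancel: () => controller.abort(),
  };
}
```

A caller would pass `options` as the second argument to `executor.execute` and keep `cancel` for later use.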

Event System

The Executor emits events throughout the workflow lifecycle:

Lifecycle Events

1. beforeExecute: Fired before workflow execution starts.
2. onNodeStart: Fired when each node starts executing.
3. onNodeComplete: Fired when each node completes successfully.
4. onNodeError: Fired when a node encounters an error.
5. afterExecute: Fired after workflow execution completes.
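For TypeScript users, the five lifecycle events can be written down as a typed map of listener signatures. This is a sketch inferred from the listener examples on this page; the `...Like` interfaces and `ExecutorEventMap` are assumed names, and the real types exported by `@crystalflow/core` may differ.

```typescript
// Assumed shapes, inferred from the fields the listener examples read.
interface ExecutionContextLike {
  executionId: string;
  workflowId: string;
  variables: Record<string, unknown>;
}
interface NodeLike { type: string; label?: string }
interface NodeResultLike { outputs: unknown; duration?: number }
interface ExecutionResultLike {
  status: 'success' | 'failed' | 'cancelled';
  duration?: number;
  error?: Error;
}

type MaybePromise = void | Promise<void>;

// One entry per lifecycle event, keyed by event name.
interface ExecutorEventMap {
  beforeExecute: (context: ExecutionContextLike) => MaybePromise;
  onNodeStart: (nodeId: string, node: NodeLike) => MaybePromise;
  onNodeComplete: (nodeId: string, result: NodeResultLike) => MaybePromise;
  onNodeError: (nodeId: string, error: Error) => MaybePromise;
  afterExecute: (result: ExecutionResultLike) => MaybePromise;
}

// Event names in the order they are listed above.
const LIFECYCLE_EVENTS: ReadonlyArray<keyof ExecutorEventMap> = [
  'beforeExecute',
  'onNodeStart',
  'onNodeComplete',
  'onNodeError',
  'afterExecute',
];

// Type guard that is useful when event names arrive as plain strings.
function isLifecycleEvent(name: string): name is keyof ExecutorEventMap {
  return LIFECYCLE_EVENTS.indexOf(name as keyof ExecutorEventMap) !== -1;
}
```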

Event Listeners

Before Execution

executor.on('beforeExecute', (context) => {
  console.log('Starting execution:', context.executionId);
  console.log('Workflow:', context.workflowId);
  console.log('Variables:', context.variables);
});

Node Start

executor.on('onNodeStart', (nodeId, node) => {
  console.log(`Executing node: ${nodeId}`);
  console.log(`Type: ${node.type}`);
  console.log(`Label: ${node.label}`);
});

Node Complete

executor.on('onNodeComplete', (nodeId, result) => {
  console.log(`Node ${nodeId} completed`);
  console.log('Outputs:', result.outputs);
  console.log('Duration:', result.duration, 'ms');
});

Node Error

executor.on('onNodeError', (nodeId, error) => {
  console.error(`Node ${nodeId} failed:`, error.message);
  // Optionally handle specific errors (ValidationError must be imported into scope)
  if (error instanceof ValidationError) {
    console.error('Validation failed');
  }
});

After Execution

executor.on('afterExecute', (result) => {
  console.log('Execution complete');
  console.log('Status:', result.status);
  console.log('Total duration:', result.duration, 'ms');
  
  if (result.status === 'success') {
    console.log('All nodes executed successfully');
  } else if (result.status === 'failed') {
    console.error('Execution failed:', result.error);
  }
});

General Error Handler

executor.on('onError', (error) => {
  console.error('Workflow execution error:', error);
  // Send to error tracking service
  // sentryClient.captureException(error);
});

Async Event Handlers

All event listeners can be async functions; the executor waits for each one to finish:
executor.on('onNodeComplete', async (nodeId, result) => {
  // Executor waits for these async operations
  await saveToDatabase(nodeId, result);
  await sendWebhook({
    nodeId,
    outputs: result.outputs,
    duration: result.duration
  });
  console.log('Node result persisted');
});

executor.on('beforeExecute', async (context) => {
  // Load configuration from API
  const config = await fetchConfiguration();
  context.variables = { ...context.variables, ...config };
});
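To make "the executor waits" concrete, here is a minimal sketch of an emitter that awaits each listener in registration order. It illustrates the general pattern only: `AwaitingEmitter` is a hypothetical class, not CrystalFlow's implementation, and whether the real executor runs listeners sequentially or in parallel is not stated here.

```typescript
// Listener arguments vary per event, so the sketch keeps them loosely typed.
type Listener = (...args: any[]) => void | Promise<void>;

class AwaitingEmitter {
  private listeners = new Map<string, Listener[]>();

  // Registers a listener for the given event name.
  on(event: string, listener: Listener): void {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
  }

  // Calls listeners one at a time and awaits each before calling the next,
  // so emit() resolves only after every listener has finished.
  async emit(event: string, ...args: any[]): Promise<void> {
    for (const listener of this.listeners.get(event) ?? []) {
      await listener(...args);
    }
  }
}
```

With this design, a slow listener delays everything that follows the `emit` call, which is why handler duration matters for overall execution time.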

Execution Result

executor.execute() resolves to an ExecutionResult object:
interface ExecutionResult {
  id: string;                    // Unique execution ID
  workflowId: string;            // Workflow identifier
  status: ExecutionStatus;       // 'success' | 'failed' | 'cancelled'
  startTime: Date;               // Start timestamp
  endTime?: Date;                // End timestamp
  duration?: number;             // Duration in ms
  nodeResults: Map<string, NodeExecutionResult>;  // Results per node
  error?: Error;                 // Error if failed
  cancellationReason?: string;   // Reason if cancelled
  cancelledAt?: Date;            // Cancellation timestamp
}
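Because `status` takes one of three values, a `switch` over it lets the compiler check that every case is handled. The sketch below uses a hypothetical `describeResult` helper and a trimmed-down input type containing only the fields it reads from the interface above.

```typescript
type ExecutionStatus = 'success' | 'failed' | 'cancelled';

// Subset of ExecutionResult used by this helper.
interface ResultSummaryInput {
  status: ExecutionStatus;
  duration?: number;
  error?: Error;
  cancellationReason?: string;
}

// Produces a one-line, human-readable summary for each status.
function describeResult(result: ResultSummaryInput): string {
  switch (result.status) {
    case 'success':
      return `succeeded in ${result.duration ?? 'unknown'} ms`;
    case 'failed':
      return `failed: ${result.error?.message ?? 'no error attached'}`;
    case 'cancelled':
      return `cancelled: ${result.cancellationReason ?? 'no reason given'}`;
  }
}
```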

Accessing Node Results

const result = await executor.execute(workflow);

if (result.status === 'success') {
  // Iterate through all node results
  result.nodeResults.forEach((nodeResult, nodeId) => {
    console.log(`${nodeId}:`);
    console.log('  State:', nodeResult.state);
    console.log('  Outputs:', nodeResult.outputs);
    console.log('  Duration:', nodeResult.duration, 'ms');
  });
}
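The per-node durations can also be ranked to find slow nodes. This is a small sketch with a hypothetical `slowestNodes` helper; it assumes only that each node result may carry a numeric `duration`, as in the example above.

```typescript
interface NodeTiming { duration?: number }

// Returns up to `limit` nodes with the longest recorded duration, slowest first.
// Nodes without a numeric duration are skipped.
function slowestNodes<T extends NodeTiming>(
  nodeResults: Map<string, T>,
  limit = 3
): Array<{ nodeId: string; duration: number }> {
  const timings: Array<{ nodeId: string; duration: number }> = [];
  nodeResults.forEach((nodeResult, nodeId) => {
    if (typeof nodeResult.duration === 'number') {
      timings.push({ nodeId, duration: nodeResult.duration });
    }
  });
  return timings.sort((a, b) => b.duration - a.duration).slice(0, limit);
}
```

For example, `slowestNodes(result.nodeResults, 5)` would list the five slowest nodes of a finished run.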

Complete Example

import { Executor, ExecutionError, NodeExecutionError } from '@crystalflow/core';

async function executeWorkflow(workflow) {
  const executor = new Executor();
  
  // Set up all event listeners
  executor.on('beforeExecute', (context) => {
    console.log(`\n=== Starting Workflow Execution ===`);
    console.log(`Execution ID: ${context.executionId}`);
    console.log(`Workflow ID: ${context.workflowId}\n`);
  });
  
  executor.on('onNodeStart', (nodeId, node) => {
    console.log(`▶ Executing: ${node.type} (${nodeId})`);
  });
  
  executor.on('onNodeComplete', async (nodeId, result) => {
    console.log(`✓ Completed: ${nodeId} in ${result.duration}ms`);
    console.log(`  Outputs:`, result.outputs);
    
    // Persist results
    await saveNodeResult(nodeId, result);
  });
  
  executor.on('onNodeError', (nodeId, error) => {
    console.error(`✗ Failed: ${nodeId}`);
    console.error(`  Error: ${error.message}`);
  });
  
  executor.on('afterExecute', (result) => {
    console.log(`\n=== Execution Complete ===`);
    console.log(`Status: ${result.status}`);
    console.log(`Total Duration: ${result.duration}ms`);
    console.log(`Nodes Executed: ${result.nodeResults.size}`);
  });
  
  executor.on('onError', (error) => {
    console.error('FATAL ERROR:', error.message);
  });
  
  // Execute with options
  try {
    const result = await executor.execute(workflow, {
      timeout: 60000,
      variables: {
        apiKey: process.env.API_KEY,
        environment: process.env.NODE_ENV
      }
    });
    
    return result;
  } catch (error) {
    if (error instanceof NodeExecutionError) {
      console.error(`Node ${error.nodeId} failed in execution ${error.executionId}`);
    } else if (error instanceof ExecutionError) {
      console.error(`Workflow execution failed:`, error.message);
    }
    throw error;
  }
}

Cancellation Events

Use the onCancellation event to track cancelled executions:
executor.on('onCancellation', (executionId, reason, cancelledAt) => {
  console.log(`Execution ${executionId} cancelled`);
  console.log(`Reason: ${reason}`);
  console.log(`Cancelled at: ${cancelledAt}`);
});

// Cancel an execution by ID (for example, context.executionId captured in beforeExecute)
executor.cancel(executionId, CancellationReason.UserCancelled);

Progress Tracking

Track execution progress with event counts:
let totalNodes = 0;
let completedNodes = 0;

executor.on('beforeExecute', (context) => {
  totalNodes = workflow.nodes.size;
  completedNodes = 0;
});

executor.on('onNodeComplete', (nodeId, result) => {
  completedNodes++;
  const progress = (completedNodes / totalNodes) * 100;
  console.log(`Progress: ${progress.toFixed(1)}%`);
});
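The counters above can be wrapped in a small class that also guards against an empty workflow, where dividing by `totalNodes` would yield `NaN`. `ProgressTracker` is a hypothetical helper, and counting failed nodes toward progress is a design choice of this sketch rather than documented CrystalFlow behaviour.

```typescript
class ProgressTracker {
  private readonly total: number;
  private completed = 0;
  private failed = 0;

  constructor(total: number) {
    this.total = total;
  }

  // Call from an onNodeComplete listener.
  nodeCompleted(): void {
    this.completed++;
  }

  // Call from an onNodeError listener.
  nodeFailed(): void {
    this.failed++;
  }

  get failedCount(): number {
    return this.failed;
  }

  // Percentage of nodes that have finished, successfully or not.
  // Returns 0 for an empty workflow and never exceeds 100.
  get percent(): number {
    if (this.total <= 0) return 0;
    return Math.min(100, ((this.completed + this.failed) / this.total) * 100);
  }
}
```

A tracker would typically be created in `beforeExecute` with `workflow.nodes.size` and updated from the node listeners.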

UI Integration

Integrate events with a React UI:
import React, { useState } from 'react';
import { Executor } from '@crystalflow/core';

function WorkflowExecutor({ workflow }) {
  const [status, setStatus] = useState('idle');
  const [progress, setProgress] = useState(0);
  const [nodeStates, setNodeStates] = useState({});
  
  const executeWorkflow = async () => {
    const executor = new Executor();
    
    executor.on('beforeExecute', () => {
      setStatus('running');
      setProgress(0);
    });
    
    executor.on('onNodeStart', (nodeId) => {
      setNodeStates(prev => ({
        ...prev,
        [nodeId]: 'running'
      }));
    });
    
    executor.on('onNodeComplete', (nodeId, result) => {
      setNodeStates(prev => ({
        ...prev,
        [nodeId]: 'success'
      }));
      setProgress(prev => prev + (100 / workflow.nodes.size));
    });
    
    executor.on('onNodeError', (nodeId) => {
      setNodeStates(prev => ({
        ...prev,
        [nodeId]: 'error'
      }));
    });
    
    executor.on('afterExecute', (result) => {
      setStatus(result.status);
    });
    
    await executor.execute(workflow);
  };
  
  return (
    <div>
      <button onClick={executeWorkflow}>Execute</button>
      <div>Status: {status}</div>
      <div>Progress: {progress.toFixed(0)}%</div>
      <div>
        {Object.entries(nodeStates).map(([id, state]) => (
          <div key={id}>{id}: {state}</div>
        ))}
      </div>
    </div>
  );
}
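The three `setNodeStates` calls in the component follow the same pattern, so they could be expressed as one pure reducer that is easy to unit test and usable with React's `useReducer`. This is a sketch with hypothetical names (`nodeStatesReducer`, `NodeEvent`, `completedPercent`); it contains no React code so that it runs on its own.

```typescript
type NodeState = 'running' | 'success' | 'error';
type NodeStates = Record<string, NodeState>;

type NodeEvent =
  | { type: 'start'; nodeId: string }
  | { type: 'complete'; nodeId: string }
  | { type: 'error'; nodeId: string }
  | { type: 'reset' };

// Returns a copy of the state with one node updated; the input is not mutated.
function withState(state: NodeStates, nodeId: string, value: NodeState): NodeStates {
  return { ...state, [nodeId]: value };
}

// Maps executor events to UI state transitions.
function nodeStatesReducer(state: NodeStates, event: NodeEvent): NodeStates {
  switch (event.type) {
    case 'start':
      return withState(state, event.nodeId, 'running');
    case 'complete':
      return withState(state, event.nodeId, 'success');
    case 'error':
      return withState(state, event.nodeId, 'error');
    case 'reset':
      return {};
  }
}

// Derives progress from state instead of tracking it in a separate counter.
function completedPercent(state: NodeStates, totalNodes: number): number {
  if (totalNodes <= 0) return 0;
  const done = Object.keys(state).filter((id) => state[id] === 'success').length;
  return (done / totalNodes) * 100;
}
```

In the component, each listener would then call `dispatch({ type: 'start', nodeId })` and so on, and progress would be computed from the reducer state during render.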

Best Practices

Use onError and onNodeError listeners to handle failures gracefully.
Clean up resources in async event handlers (close connections, clear timers).
Keep event handlers lightweight. Because the executor waits for each handler, heavy operations inside a handler delay the workflow.
Use async handlers for database writes, API calls, and file operations.
Use timing data from results to identify slow nodes.
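One way to reconcile lightweight handlers with slow I/O is to start the work inside the handler without awaiting it, then wait for everything once execution has finished. The sketch below shows that pattern with a hypothetical `DeferredTasks` class. It is one option rather than a CrystalFlow feature, and it trades ordering guarantees for speed.

```typescript
class DeferredTasks {
  private pending: Promise<void>[] = [];
  // Failures are collected here instead of surfacing as unhandled rejections.
  readonly errors: unknown[] = [];

  // Starts the task right away but does not make the caller wait for it.
  defer(task: () => Promise<unknown>): void {
    const tracked = task().then(
      () => undefined,
      (error: unknown) => {
        this.errors.push(error);
      }
    );
    this.pending.push(tracked);
  }

  // Resolves once every task deferred so far has settled.
  async flush(): Promise<void> {
    const batch = this.pending;
    this.pending = [];
    await Promise.all(batch);
  }
}
```

A handler would call `tasks.defer(() => saveToDatabase(nodeId, result))`, and the caller would `await tasks.flush()` after `executor.execute` resolves, then inspect `tasks.errors`.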

Next Steps

Executor API: Complete Executor API reference
Error Handling: Advanced error handling patterns
Cancellation: Cancelling workflow execution
Execution Engine: Understanding the engine