The Executor class orchestrates workflow execution with events and state management.

Class: Executor

class Executor extends AsyncEventEmitter {
  // Methods
  execute(workflow: Workflow, options?: ExecutionOptions): Promise<ExecutionResult>;
  cancel(executionId: string, reason?: CancellationReason): void;
  cancelAll(reason?: CancellationReason): void;
  isExecutionActive(executionId: string): boolean;
  
  // Event Methods (inherited from AsyncEventEmitter)
  on(event: string, listener: Function): this;
  once(event: string, listener: Function): this;
  off(event: string, listener: Function): this;
  emit(event: string, ...args: any[]): Promise<boolean>;
}

Methods

execute()

Execute a workflow with options.
execute(workflow: Workflow, options?: ExecutionOptions): Promise<ExecutionResult>;
workflow
Workflow
Workflow instance to execute.
options
ExecutionOptions
Optional execution configuration:
  • timeout?: number - Maximum execution time (ms)
  • variables?: Record<string, any> - Workflow variables
  • abortSignal?: AbortSignal - External cancellation signal
Returns: Promise resolving to ExecutionResult.
Example:
import { Executor } from '@crystalflow/core';

const executor = new Executor();

const result = await executor.execute(workflow, {
  timeout: 30000,
  variables: { apiKey: 'xxx' }
});

console.log(result.status); // 'success'
console.log(result.duration); // 1523 (ms)

cancel()

Cancel a specific execution.
cancel(executionId: string, reason?: CancellationReason): void;
executionId
string
Execution ID to cancel.
reason
CancellationReason
Cancellation reason (default: CancellationReason.UserCancelled).
Example:
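// Assumes CancellationReason is exported from '@crystalflow/core' alongside Executor
import { CancellationReason } from '@crystalflow/core';

let executionId: string | undefined;

// Capture the ID when the execution starts
executor.on('beforeExecute', (context) => {
  executionId = context.executionId;
});

// Cancel after 5 seconds, if an execution has started by then
setTimeout(() => {
  if (executionId) {
    executor.cancel(executionId, CancellationReason.UserCancelled);
  }
}, 5000);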

cancelAll()

Cancel all active executions.
cancelAll(reason?: CancellationReason): void;
reason
CancellationReason
Cancellation reason (default: CancellationReason.UserCancelled).
Example:
executor.cancelAll(CancellationReason.UserCancelled);

isExecutionActive()

Check if an execution is currently running.
isExecutionActive(executionId: string): boolean;
executionId
string
Execution ID to check.
Returns: true if the execution is active, false otherwise.
Example:
if (executor.isExecutionActive(executionId)) {
  console.log('Execution is running');
}

Events

The Executor emits events throughout the execution lifecycle. Event listeners can be synchronous or asynchronous; the Executor waits for asynchronous listeners to finish.
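
To make the sync-or-async listener behaviour concrete, here is a minimal sketch of an emitter whose emit() awaits each listener in registration order and resolves to a boolean. MiniAsyncEmitter and demoAsyncListeners are hypothetical names written for this illustration; this is not the library's AsyncEventEmitter, whose ordering and error handling may differ.

```typescript
type Listener = (...args: any[]) => void | Promise<void>;

// Hypothetical stand-in, NOT the library's AsyncEventEmitter.
class MiniAsyncEmitter {
  private listeners = new Map<string, Listener[]>();

  on(event: string, listener: Listener): this {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
    return this;
  }

  // Awaits listeners one at a time; resolves to true if any listener ran.
  async emit(event: string, ...args: any[]): Promise<boolean> {
    const list = this.listeners.get(event) ?? [];
    for (const listener of list.slice()) {
      await listener(...args);
    }
    return list.length > 0;
  }
}

async function demoAsyncListeners(): Promise<string[]> {
  const log: string[] = [];
  const emitter = new MiniAsyncEmitter();

  // Async listener: emit() does not move on until this promise settles.
  emitter.on('onNodeComplete', async (nodeId: string) => {
    await Promise.resolve();
    log.push(`async listener saw ${nodeId}`);
  });

  // Sync listener: runs after the async one has finished.
  emitter.on('onNodeComplete', (nodeId: string) => {
    log.push(`sync listener saw ${nodeId}`);
  });

  const handled = await emitter.emit('onNodeComplete', 'node-1');
  log.push(`emit resolved: ${handled}`);
  return log;
}
```

In this sketch a listener that rejects would make emit() reject too; whether the real Executor isolates listener errors is not stated here.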

beforeExecute

Emitted before workflow execution starts.
executor.on('beforeExecute', (context: ExecutionContext) => {
  console.log(`Starting execution: ${context.executionId}`);
});
context
ExecutionContext
Execution context with executionId, workflowId, variables, etc.

afterExecute

Emitted after workflow execution completes.
executor.on('afterExecute', (result: ExecutionResult) => {
  console.log(`Execution ${result.status}: ${result.duration}ms`);
});
result
ExecutionResult
Complete execution result.

onNodeStart

Emitted when a node starts executing.
executor.on('onNodeStart', (nodeId: string, node: Node) => {
  console.log(`Node ${nodeId} starting...`);
});
nodeId
string
Node ID.
node
Node
Node instance.

onNodeComplete

Emitted when a node completes successfully.
executor.on('onNodeComplete', (nodeId: string, result: NodeExecutionResult) => {
  console.log(`Node ${nodeId} completed:`, result.outputs);
});
nodeId
string
Node ID.
result
NodeExecutionResult
Node execution result with outputs, timing, etc.

onNodeError

Emitted when a node fails.
executor.on('onNodeError', (nodeId: string, error: Error) => {
  console.error(`Node ${nodeId} failed:`, error);
});
nodeId
string
Node ID.
error
Error
Error that occurred.

onError

Emitted when workflow execution fails.
executor.on('onError', (error: Error) => {
  console.error('Workflow failed:', error);
});
error
Error
Error that caused workflow failure.

onCancellation

Emitted when execution is cancelled.
executor.on('onCancellation', (
  executionId: string,
  reason: CancellationReason,
  cancelledAt: Date
) => {
  console.log(`Execution ${executionId} cancelled: ${reason}`);
});
executionId
string
Cancelled execution ID.
reason
CancellationReason
Cancellation reason.
cancelledAt
Date
Timestamp of cancellation.

Types

ExecutionOptions

interface ExecutionOptions {
  timeout?: number;
  variables?: Record<string, any>;
  abortSignal?: AbortSignal;
}

ExecutionResult

interface ExecutionResult {
  id: string;
  workflowId: string;
  status: 'success' | 'failed' | 'cancelled';
  startTime: Date;
  endTime?: Date;
  duration?: number;
  nodeResults: Map<string, NodeExecutionResult>;
  error?: Error;
  cancellationReason?: CancellationReason;
  cancelledAt?: Date;
}
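
Because status is a three-way union, callers usually branch on it. The sketch below shows one way to do that exhaustively. ExecutionResultLike is a local, trimmed mirror of the interface above (with cancellationReason simplified to a string) and describeResult is a hypothetical helper, not part of the library.

```typescript
// Local mirror of the documented shape, trimmed to the fields used here.
interface ExecutionResultLike {
  id: string;
  status: 'success' | 'failed' | 'cancelled';
  duration?: number;
  error?: Error;
  cancellationReason?: string; // simplified stand-in for CancellationReason
}

// Hypothetical helper: turns a result into a one-line summary.
function describeResult(result: ExecutionResultLike): string {
  switch (result.status) {
    case 'success':
      return `Execution ${result.id} succeeded in ${result.duration ?? 'unknown'}ms`;
    case 'failed':
      return `Execution ${result.id} failed: ${result.error?.message ?? 'no error recorded'}`;
    case 'cancelled':
      return `Execution ${result.id} was cancelled (${result.cancellationReason ?? 'no reason recorded'})`;
    default: {
      // Compile-time guard: fails to type-check if a new status is added.
      const unreachable: never = result.status;
      throw new Error(`Unhandled status: ${String(unreachable)}`);
    }
  }
}
```

The optional fields (duration, error, cancellationReason) are read with fallbacks because the interface marks them as possibly absent.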

NodeExecutionResult

interface NodeExecutionResult {
  nodeId: string;
  state: NodeState;
  outputs: Record<string, any>;
  error?: Error;
  startTime: Date;
  endTime?: Date;
  duration?: number;
}
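
Since ExecutionResult.nodeResults is a Map keyed by node ID, a post-run report can be built with a single pass over it. The following is a sketch only: NodeResultLike mirrors just the fields it needs, and summarizeNodeResults and NodeSummary are hypothetical names.

```typescript
// Local mirror of the documented shape, trimmed to the fields used here.
interface NodeResultLike {
  nodeId: string;
  error?: Error;
  duration?: number;
}

interface NodeSummary {
  failedNodeIds: string[];
  totalDurationMs: number;
  slowestNodeId?: string;
}

// Hypothetical helper: one pass over the per-node results.
function summarizeNodeResults(nodeResults: Map<string, NodeResultLike>): NodeSummary {
  const failedNodeIds: string[] = [];
  let totalDurationMs = 0;
  let slowestNodeId: string | undefined;
  let slowestDuration = -1;

  for (const [nodeId, result] of nodeResults) {
    if (result.error) {
      failedNodeIds.push(nodeId);
    }
    // duration is optional, so nodes without timing contribute 0.
    totalDurationMs += result.duration ?? 0;
    if (result.duration !== undefined && result.duration > slowestDuration) {
      slowestDuration = result.duration;
      slowestNodeId = nodeId;
    }
  }

  return { failedNodeIds, totalDurationMs, slowestNodeId };
}
```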

ExecutionContext

interface ExecutionContext {
  executionId: string;
  workflowId: string;
  startTime: Date;
  variables: Record<string, any>;
  nodeResults: Map<string, NodeExecutionResult>;
}

CancellationReason

enum CancellationReason {
  Timeout = 'timeout',
  UserCancelled = 'user-cancelled',
  ExternalSignal = 'external-signal',
  ResourceLimit = 'resource-limit'
}
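
As the enum members carry string values, they can be mapped to display text and recovered from stored strings such as log entries. The sketch below redeclares the enum locally so that it is self-contained; REASON_LABELS and parseCancellationReason are hypothetical helpers, and the label wording is illustrative.

```typescript
// Local copy of the enum documented above.
enum CancellationReason {
  Timeout = 'timeout',
  UserCancelled = 'user-cancelled',
  ExternalSignal = 'external-signal',
  ResourceLimit = 'resource-limit'
}

// Record<CancellationReason, string> forces a label for every member.
const REASON_LABELS: Record<CancellationReason, string> = {
  [CancellationReason.Timeout]: 'Timed out',
  [CancellationReason.UserCancelled]: 'Cancelled by user',
  [CancellationReason.ExternalSignal]: 'Aborted by external signal',
  [CancellationReason.ResourceLimit]: 'Resource limit reached'
};

// Hypothetical helper: validates a raw string against the enum values.
function parseCancellationReason(value: string): CancellationReason | undefined {
  const known = Object.values(CancellationReason) as string[];
  return known.includes(value) ? (value as CancellationReason) : undefined;
}
```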

Usage Examples

Basic Execution

import { Executor } from '@crystalflow/core';

const executor = new Executor();

const result = await executor.execute(workflow);
console.log(result.status); // 'success'

With Events

const executor = new Executor();

executor.on('beforeExecute', (context) => {
  console.log(`Starting: ${context.executionId}`);
});

executor.on('onNodeStart', (nodeId, node) => {
  console.log(`Executing ${node.type} (${nodeId})`);
});

executor.on('onNodeComplete', (nodeId, result) => {
  console.log(`Completed ${nodeId}:`, result.outputs);
});

executor.on('afterExecute', (result) => {
  console.log(`Finished in ${result.duration}ms`);
});

await executor.execute(workflow);

Async Event Handlers

executor.on('onNodeComplete', async (nodeId, result) => {
  // Executor waits for async handlers
  await saveToDatabase(result);
  await sendWebhook(result);
  console.log('Persisted node result');
});

await executor.execute(workflow);

With Timeout

try {
  const result = await executor.execute(workflow, {
    timeout: 10000 // 10 seconds
  });
} catch (error) {
  if (error instanceof TimeoutError) {
    console.error('Execution timed out');
  }
}

With Cancellation

let executionId: string;

executor.on('beforeExecute', (context) => {
  executionId = context.executionId;
});

executor.on('onCancellation', (id, reason) => {
  console.log(`Cancelled: ${reason}`);
});

// Start execution
const promise = executor.execute(workflow);

// Cancel after 5 seconds
setTimeout(() => {
  executor.cancel(executionId);
}, 5000);

try {
  await promise;
} catch (error) {
  if (error instanceof CancellationError) {
    console.log('Execution was cancelled');
  }
}
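
A single executionId variable only remembers the most recent execution. When several executions overlap, a small tracker fed from the lifecycle events can keep the full set of IDs. This is a sketch under two assumptions that the reference above does not confirm: that ExecutionResult.id holds the same value as ExecutionContext.executionId, and that afterExecute fires for every finished execution. ActiveExecutionTracker is a hypothetical name.

```typescript
// Hypothetical helper: remembers which execution IDs have started but not finished.
class ActiveExecutionTracker {
  private readonly active = new Set<string>();

  // Call from a 'beforeExecute' listener.
  started(context: { executionId: string }): void {
    this.active.add(context.executionId);
  }

  // Call from an 'afterExecute' listener.
  // Assumption: result.id is the execution ID reported in the context.
  finished(result: { id: string }): void {
    this.active.delete(result.id);
  }

  ids(): string[] {
    return Array.from(this.active);
  }
}

// Wiring, shown as comments because it needs a real Executor instance:
//   const tracker = new ActiveExecutionTracker();
//   executor.on('beforeExecute', (context) => tracker.started(context));
//   executor.on('afterExecute', (result) => tracker.finished(result));
//   tracker.ids().forEach((id) => executor.cancel(id));
```

For cancelling everything at once, cancelAll() is simpler; a tracker like this is mainly useful when only some executions should be cancelled.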

With AbortSignal

const controller = new AbortController();

// Start execution
const promise = executor.execute(workflow, {
  abortSignal: controller.signal
});

// Cancel from outside
setTimeout(() => controller.abort(), 5000);

try {
  await promise;
} catch (error) {
  console.error('Cancelled:', error);
}
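
In the example above the setTimeout timer stays pending even if the workflow finishes early. One way to tidy that up is to wrap the controller and timer together and clear the timer once the work settles. withAbortTimeout is a hypothetical helper built only on the standard AbortController and timers; it is a sketch, not something the library provides.

```typescript
// Hypothetical helper: runs `run` with a signal that aborts after `ms`,
// and always clears the timer when `run` settles.
async function withAbortTimeout<T>(
  ms: number,
  run: (signal: AbortSignal) => Promise<T>
): Promise<T> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), ms);
  try {
    return await run(controller.signal);
  } finally {
    clearTimeout(timer);
  }
}

// Intended use with the Executor, shown as a comment because it needs a real instance:
//   const result = await withAbortTimeout(5000, (signal) =>
//     executor.execute(workflow, { abortSignal: signal })
//   );
```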

Progress Tracking

const totalNodes = workflow.nodes.size;
let completedNodes = 0;

executor.on('onNodeStart', (nodeId) => {
  console.log(`Progress: ${completedNodes}/${totalNodes}`);
});

executor.on('onNodeComplete', (nodeId) => {
  completedNodes++;
  const progress = (completedNodes / totalNodes) * 100;
  console.log(`Progress: ${progress.toFixed(0)}%`);
});

await executor.execute(workflow);
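
The percentage above divides by totalNodes, which yields NaN for a workflow with no nodes. A small formatting function can guard that case and be called from the onNodeComplete listener. formatProgress is a hypothetical helper, and the text it returns for an empty workflow is an arbitrary choice.

```typescript
// Hypothetical helper: formats progress without dividing by zero.
function formatProgress(completed: number, total: number): string {
  if (total <= 0) {
    return 'Progress: n/a (no nodes)';
  }
  // Clamp so that a stray extra event can never report more than 100%.
  const clamped = Math.min(Math.max(completed, 0), total);
  const percent = Math.round((clamped / total) * 100);
  return `Progress: ${clamped}/${total} (${percent}%)`;
}
```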

Error Recovery

executor.on('onNodeError', async (nodeId, error) => {
  console.error(`Node ${nodeId} failed:`, error);
  
  // Log to external service
  await logError({
    nodeId,
    error: error.message,
    timestamp: new Date()
  });
});

executor.on('onError', async (error) => {
  // Workflow failed - send notification
  await sendAlertEmail({
    subject: 'Workflow Failed',
    body: error.message
  });
});

try {
  await executor.execute(workflow);
} catch (error) {
  // Handle gracefully
  console.log('Execution failed, but errors were logged');
}