Documentation Index
Fetch the complete documentation index at: https://crystalflow.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
CrystalFlow’s execution engine provides a powerful event system for monitoring and controlling workflow execution.
Execution Basics
Execute a workflow with the Executor:
import { Executor } from '@crystalflow/core';
const executor = new Executor();
const result = await executor.execute(workflow);
console.log('Status:', result.status);
console.log('Duration:', result.duration, 'ms');
Execution Options
const result = await executor.execute(workflow, {
timeout: 30000, // Timeout in milliseconds
variables: { // Global variables
apiKey: 'your-key',
environment: 'production'
},
abortSignal: controller.signal // Cancellation signal
});
Event System
The Executor emits events throughout the workflow lifecycle:
Lifecycle Events
beforeExecute
Fired before workflow execution starts
onNodeStart
Fired when each node starts executing
onNodeComplete
Fired when each node completes successfully
onNodeError
Fired when a node encounters an error
afterExecute
Fired after workflow execution completes
Event Listeners
Before Execution
executor.on('beforeExecute', (context) => {
console.log('Starting execution:', context.executionId);
console.log('Workflow:', context.workflowId);
console.log('Variables:', context.variables);
});
Node Start
executor.on('onNodeStart', (nodeId, node) => {
console.log(`Executing node: ${nodeId}`);
console.log(`Type: ${node.type}`);
console.log(`Label: ${node.label}`);
});
Node Complete
executor.on('onNodeComplete', (nodeId, result) => {
console.log(`Node ${nodeId} completed`);
console.log('Outputs:', result.outputs);
console.log('Duration:', result.duration, 'ms');
});
Node Error
executor.on('onNodeError', (nodeId, error) => {
console.error(`Node ${nodeId} failed:`, error.message);
// Optionally handle specific errors
if (error instanceof ValidationError) {
console.error('Validation failed');
}
});
After Execution
executor.on('afterExecute', (result) => {
console.log('Execution complete');
console.log('Status:', result.status);
console.log('Total duration:', result.duration, 'ms');
if (result.status === 'success') {
console.log('All nodes executed successfully');
} else if (result.status === 'failed') {
console.error('Execution failed:', result.error);
}
});
General Error Handler
executor.on('onError', (error) => {
console.error('Workflow execution error:', error);
// Send to error tracking service
// sentryClient.captureException(error);
});
Async Event Handlers
All event listeners support async functions - the executor waits for them:
executor.on('onNodeComplete', async (nodeId, result) => {
// Executor waits for these async operations
await saveToDatabase(nodeId, result);
await sendWebhook({
nodeId,
outputs: result.outputs,
duration: result.duration
});
console.log('Node result persisted');
});
executor.on('beforeExecute', async (context) => {
// Load configuration from API
const config = await fetchConfiguration();
context.variables = { ...context.variables, ...config };
});
Execution Result
The execution returns a detailed result object:
interface ExecutionResult {
id: string; // Unique execution ID
workflowId: string; // Workflow identifier
status: ExecutionStatus; // 'success' | 'failed' | 'cancelled'
startTime: Date; // Start timestamp
endTime?: Date; // End timestamp
duration?: number; // Duration in ms
nodeResults: Map<string, NodeExecutionResult>; // Results per node
error?: Error; // Error if failed
cancellationReason?: string; // Reason if cancelled
cancelledAt?: Date; // Cancellation timestamp
}
Accessing Node Results
const result = await executor.execute(workflow);
if (result.status === 'success') {
// Iterate through all node results
result.nodeResults.forEach((nodeResult, nodeId) => {
console.log(`${nodeId}:`);
console.log(' State:', nodeResult.state);
console.log(' Outputs:', nodeResult.outputs);
console.log(' Duration:', nodeResult.duration, 'ms');
});
}
Complete Example
import { Executor, ExecutionError, NodeExecutionError } from '@crystalflow/core';
async function executeWorkflow(workflow) {
const executor = new Executor();
// Set up all event listeners
executor.on('beforeExecute', (context) => {
console.log(`\n=== Starting Workflow Execution ===`);
console.log(`Execution ID: ${context.executionId}`);
console.log(`Workflow ID: ${context.workflowId}\n`);
});
executor.on('onNodeStart', (nodeId, node) => {
console.log(`▶ Executing: ${node.type} (${nodeId})`);
});
executor.on('onNodeComplete', async (nodeId, result) => {
console.log(`✓ Completed: ${nodeId} in ${result.duration}ms`);
console.log(` Outputs:`, result.outputs);
// Persist results
await saveNodeResult(nodeId, result);
});
executor.on('onNodeError', (nodeId, error) => {
console.error(`✗ Failed: ${nodeId}`);
console.error(` Error: ${error.message}`);
});
executor.on('afterExecute', (result) => {
console.log(`\n=== Execution Complete ===`);
console.log(`Status: ${result.status}`);
console.log(`Total Duration: ${result.duration}ms`);
console.log(`Nodes Executed: ${result.nodeResults.size}`);
});
executor.on('onError', (error) => {
console.error('FATAL ERROR:', error.message);
});
// Execute with options
try {
const result = await executor.execute(workflow, {
timeout: 60000,
variables: {
apiKey: process.env.API_KEY,
environment: process.env.NODE_ENV
}
});
return result;
} catch (error) {
if (error instanceof NodeExecutionError) {
console.error(`Node ${error.nodeId} failed in execution ${error.executionId}`);
} else if (error instanceof ExecutionError) {
console.error(`Workflow execution failed:`, error.message);
}
throw error;
}
}
Cancellation Events
Cancellation events: The onCancellation event is available for tracking cancelled executions.
executor.on('onCancellation', (executionId, reason, cancelledAt) => {
console.log(`Execution ${executionId} cancelled`);
console.log(`Reason: ${reason}`);
console.log(`Cancelled at: ${cancelledAt}`);
});
// Cancel an execution
executor.cancel(executionId, CancellationReason.UserCancelled);
Progress Tracking
Track execution progress with event counts:
let totalNodes = 0;
let completedNodes = 0;
executor.on('beforeExecute', (context) => {
totalNodes = workflow.nodes.size;
completedNodes = 0;
});
executor.on('onNodeComplete', (nodeId, result) => {
completedNodes++;
const progress = (completedNodes / totalNodes) * 100;
console.log(`Progress: ${progress.toFixed(1)}%`);
});
UI Integration
Integrate events with React UI:
import React, { useState } from 'react';
import { Executor } from '@crystalflow/core';
function WorkflowExecutor({ workflow }) {
const [status, setStatus] = useState('idle');
const [progress, setProgress] = useState(0);
const [nodeStates, setNodeStates] = useState({});
const executeWorkflow = async () => {
const executor = new Executor();
executor.on('beforeExecute', () => {
setStatus('running');
setProgress(0);
});
executor.on('onNodeStart', (nodeId) => {
setNodeStates(prev => ({
...prev,
[nodeId]: 'running'
}));
});
executor.on('onNodeComplete', (nodeId, result) => {
setNodeStates(prev => ({
...prev,
[nodeId]: 'success'
}));
setProgress(prev => prev + (100 / workflow.nodes.size));
});
executor.on('onNodeError', (nodeId) => {
setNodeStates(prev => ({
...prev,
[nodeId]: 'error'
}));
});
executor.on('afterExecute', (result) => {
setStatus(result.status);
});
await executor.execute(workflow);
};
return (
<div>
<button onClick={executeWorkflow}>Execute</button>
<div>Status: {status}</div>
<div>Progress: {progress.toFixed(0)}%</div>
<div>
{Object.entries(nodeStates).map(([id, state]) => (
<div key={id}>{id}: {state}</div>
))}
</div>
</div>
);
}
Best Practices
Use onError and onNodeError listeners to handle failures gracefully.
Clean up resources in async event handlers (close connections, clear timers).
Keep event handlers lightweight - don’t block execution with heavy operations.
Use async handlers for database writes, API calls, and file operations.
Use timing data from results to identify slow nodes.
Next Steps
Executor API
Complete Executor API reference
Error Handling
Advanced error handling patterns
Cancellation
Cancelling workflow execution
Execution Engine
Understanding the engine