CrystalFlow can stop a running workflow in several ways: through a timeout, an explicit API call, or an external abort signal. Nodes can also check for cancellation in the middle of long operations.
Cancellation Mechanisms
Timeout: automatic cancellation after a time limit
User-Initiated: manual cancellation via the API
External Signal: AbortController/AbortSignal support
Mid-Node: nodes check for cancellation during long operations
Timeout-Based Cancellation
Set a timeout when executing:
import { Executor, TimeoutError } from '@crystalflow/core';

const executor = new Executor();

try {
  const result = await executor.execute(workflow, {
    timeout: 30000 // 30 seconds
  });
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log(`Execution timed out after ${error.timeout} ms`);
    console.log(`Execution ID: ${error.executionId}`);
  }
}
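To make the timeout behaviour more concrete, here is a minimal, library-independent sketch of racing a unit of work against a timer. It is not CrystalFlow's implementation: `withTimeout` and `SketchTimeoutError` are hypothetical names introduced only for this illustration.

```typescript
// Hypothetical error type for this sketch; not the library's TimeoutError.
class SketchTimeoutError extends Error {
  readonly timeout: number;

  constructor(timeout: number) {
    super(`Operation timed out after ${timeout} ms`);
    this.name = 'SketchTimeoutError';
    this.timeout = timeout;
    // Keeps `instanceof` working when compiled to older targets.
    Object.setPrototypeOf(this, SketchTimeoutError.prototype);
  }
}

// Rejects with SketchTimeoutError if `work` has not settled within `ms`.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new SketchTimeoutError(ms)), ms);
  });
  try {
    return await Promise.race([work, deadline]);
  } finally {
    // Always clear the timer so it cannot fire after the race is decided.
    clearTimeout(timer);
  }
}
```

Note that losing the race does not stop the underlying work by itself. This is one reason cooperative checks inside long-running nodes matter.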
User-Initiated Cancellation
Cancel an execution manually:
import { Executor, CancellationReason, UserCancelledError } from '@crystalflow/core';

const executor = new Executor();

// Capture the execution ID from the context. Register the listener
// before starting the execution so the event is not missed.
let executionId: string | undefined;
executor.on('beforeExecute', (context) => {
  executionId = context.executionId;
});

// Start execution
const executionPromise = executor.execute(workflow);

// Cancel after 5 seconds
setTimeout(() => {
  if (executionId) {
    executor.cancel(executionId, CancellationReason.UserCancelled);
  }
}, 5000);

try {
  await executionPromise;
} catch (error) {
  if (error instanceof UserCancelledError) {
    console.log('Execution was cancelled by user');
  }
}
External Signal Cancellation
Use AbortController for external cancellation:
import { Executor, CancellationError } from '@crystalflow/core';

const executor = new Executor();
const controller = new AbortController();

// Start execution with abort signal
const executionPromise = executor.execute(workflow, {
  abortSignal: controller.signal
});

// Cancel from outside
setTimeout(() => {
  controller.abort();
}, 5000);

try {
  await executionPromise;
} catch (error) {
  if (error instanceof CancellationError) {
    console.log('Execution was cancelled');
  }
}
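For readers less familiar with AbortSignal, the following sketch shows how an asynchronous operation can honor a signal on its own, independent of CrystalFlow. `abortableSleep` is a hypothetical helper used only for illustration; it relies solely on the standard AbortController/AbortSignal API.

```typescript
// Hypothetical helper: waits `ms` milliseconds unless `signal` aborts first.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // A signal that is already aborted never fires another 'abort' event.
    if (signal.aborted) {
      reject(new Error('Aborted before start'));
      return;
    }
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    function onAbort(): void {
      clearTimeout(timer);
      reject(new Error('Aborted while waiting'));
    }
    signal.addEventListener('abort', onAbort, { once: true });
  });
}
```

The two details worth copying are the up-front `signal.aborted` check and the removal of the listener on normal completion, which avoids leaking handlers on long-lived signals.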
Cancellation Reasons
Execution exceeded timeout limit
User explicitly cancelled via API
Cancelled via AbortController/AbortSignal
Resource constraints (reserved for future use)
Mid-Node Cancellation
Nodes can check for cancellation during long operations:
@defineNode({
  type: 'data.process',
  label: 'Process Data',
  category: 'Data'
})
class ProcessDataNode extends Node {
  @Input({ type: 'any[]', label: 'Items' })
  items: any[] = [];

  @Output({ type: 'any[]', label: 'Processed' })
  processed: any[];

  async execute() {
    const results = [];

    for (let i = 0; i < this.items.length; i++) {
      // Check if execution was cancelled
      this.checkCancellation();

      // Process item
      const result = await this.processItem(this.items[i]);
      results.push(result);
    }

    this.processed = results;
  }

  private async processItem(item: any) {
    // Long-running operation
    await this.sleep(1000);
    return item * 2;
  }

  private sleep(ms: number) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
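The per-iteration check above is a form of cooperative cancellation. As a rough, library-independent illustration of the pattern, the sketch below uses a hypothetical `CancelToken` class and `processAll` function; how `checkCancellation()` works internally is not documented here, so treat this as an assumption about the general technique only.

```typescript
// Hypothetical token: a flag that long-running code polls at safe points.
class CancelToken {
  private cancelled = false;

  cancel(): void {
    this.cancelled = true;
  }

  throwIfCancelled(): void {
    if (this.cancelled) throw new Error('Cancelled');
  }
}

// Processes items one by one, checking the token before each item.
async function processAll<T, R>(
  items: T[],
  work: (item: T) => Promise<R>,
  token: CancelToken
): Promise<R[]> {
  const results: R[] = [];
  for (const item of items) {
    token.throwIfCancelled(); // stop before starting the next item
    results.push(await work(item));
  }
  return results;
}
```

Because the check runs only between items, an item already in progress finishes before cancellation takes effect. Checking more often shortens that delay at the cost of more checks.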
Cancellation Events
Listen to cancellation events:
executor.on('onCancellation', (executionId, reason, cancelledAt) => {
  console.log(`Execution ${executionId} cancelled`);
  console.log(`Reason: ${reason}`);
  console.log(`Cancelled at: ${cancelledAt.toISOString()}`);

  // Clean up resources
  cleanupExecution(executionId);
});
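The handler above calls `cleanupExecution`, which the snippet does not define. One possible shape for it, assumed here purely for illustration, is a small registry that maps each execution ID to the cleanup callbacks registered while it ran. `registerCleanup` and the registry itself are hypothetical and not part of CrystalFlow.

```typescript
type Cleanup = () => void;

// Hypothetical registry: execution ID -> callbacks to run on cancellation.
const cleanups = new Map<string, Cleanup[]>();

function registerCleanup(executionId: string, fn: Cleanup): void {
  const list = cleanups.get(executionId) ?? [];
  list.push(fn);
  cleanups.set(executionId, list);
}

function cleanupExecution(executionId: string): void {
  const list = cleanups.get(executionId) ?? [];
  // Remove the entry first so a repeated call becomes a no-op.
  cleanups.delete(executionId);
  // Run in reverse registration order, like unwinding a stack.
  for (const fn of list.reverse()) {
    try {
      fn();
    } catch (err) {
      // One failing cleanup should not block the others.
      console.error(`Cleanup failed for ${executionId}:`, err);
    }
  }
}
```

Code that acquires a resource (a timer, a connection, a temporary file) would call `registerCleanup` at acquisition time, so the cancellation handler does not need to know what was acquired.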
Cancel All Executions
Cancel all active executions:
executor.cancelAll(CancellationReason.UserCancelled);
Check Active Executions
Check if an execution is still running:
if (executor.isExecutionActive(executionId)) {
  console.log('Execution is still running');
  executor.cancel(executionId);
} else {
  console.log('Execution has completed');
}
React Integration
Implement cancellation in React components:
import React, { useState, useRef } from 'react';
import { Executor, CancellationReason } from '@crystalflow/core';

function WorkflowExecutor({ workflow }) {
  const [isExecuting, setIsExecuting] = useState(false);
  const executionIdRef = useRef<string | null>(null);
  const executorRef = useRef(new Executor());

  const handleExecute = async () => {
    setIsExecuting(true);
    const executor = executorRef.current;

    executor.on('beforeExecute', (context) => {
      executionIdRef.current = context.executionId;
    });

    try {
      await executor.execute(workflow, { timeout: 60000 });
      console.log('Execution completed');
    } catch (error) {
      console.error('Execution failed or cancelled:', error);
    } finally {
      setIsExecuting(false);
      executionIdRef.current = null;
    }
  };

  const handleCancel = () => {
    if (executionIdRef.current) {
      executorRef.current.cancel(
        executionIdRef.current,
        CancellationReason.UserCancelled
      );
    }
  };

  return (
    <div>
      <button onClick={handleExecute} disabled={isExecuting}>
        Execute
      </button>
      <button onClick={handleCancel} disabled={!isExecuting}>
        Cancel
      </button>
      {isExecuting && <div>Executing...</div>}
    </div>
  );
}
Graceful Shutdown
Implement graceful shutdown with cleanup:
class DataFetchNode extends Node {
  // Declared here so the example type-checks; a real node would
  // expose these as an input and an output.
  url = '';
  data: unknown;

  private abortController?: AbortController;

  async execute() {
    this.abortController = new AbortController();

    try {
      const response = await fetch(this.url, {
        signal: this.abortController.signal
      });
      this.data = await response.json();
    } catch (error) {
      // fetch rejects with an error named 'AbortError' when its signal aborts
      if (error instanceof Error && error.name === 'AbortError') {
        console.log('Fetch was cancelled');
      }
      throw error;
    }
  }

  // Intended to run during cancellation (see the note below on availability)
  cleanup() {
    if (this.abortController) {
      this.abortController.abort();
    }
  }
}
Node Cleanup: The cleanup() method for node-level cleanup is planned for a future release.
Best Practices
Always set timeouts to prevent workflows from running indefinitely.
Check Cancellation in Loops: For nodes with loops, call checkCancellation() in each iteration.
Cancel network requests, close connections, and clear timers when cancelled.
Show cancellation status and allow users to cancel long operations.
Handle Cancellation Gracefully: Catch cancellation errors and clean up appropriately.
Complete Example
import {
  Executor,
  CancellationReason,
  CancellationError,
  TimeoutError
} from '@crystalflow/core';

async function executeWithCancellation(workflow, maxDuration = 60000) {
  const executor = new Executor();
  const controller = new AbortController();
  let executionId: string;

  // Track execution ID
  executor.on('beforeExecute', (context) => {
    executionId = context.executionId;
    console.log(`Started execution: ${executionId}`);
  });

  // Listen to cancellation
  executor.on('onCancellation', (id, reason, cancelledAt) => {
    console.log(`Execution cancelled: ${reason}`);
  });

  // Set up manual cancellation after max duration
  const timeoutHandle = setTimeout(() => {
    if (executor.isExecutionActive(executionId)) {
      console.log('Manually cancelling due to max duration');
      executor.cancel(executionId, CancellationReason.UserCancelled);
    }
  }, maxDuration);

  try {
    const result = await executor.execute(workflow, {
      timeout: maxDuration,
      abortSignal: controller.signal
    });
    console.log('Execution completed successfully');
    return result;
  } catch (error) {
    if (error instanceof TimeoutError) {
      console.error('Execution timed out');
    } else if (error instanceof CancellationError) {
      console.log('Execution was cancelled');
    } else {
      console.error('Execution failed:', error);
    }
    throw error;
  } finally {
    clearTimeout(timeoutHandle);
  }
}

// Usage
try {
  await executeWithCancellation(workflow, 30000);
} catch (error) {
  console.error('Failed:', error);
}