CrystalFlow can stop a running workflow in four ways: after a timeout, through an explicit API call, through an external AbortSignal, or from inside a node during a long operation.

Cancellation Mechanisms

Timeout: automatic cancellation after a time limit
User-Initiated: manual cancellation via the API
External Signal: AbortController/AbortSignal support
Mid-Node: cancellation checks during long operations

Timeout-Based Cancellation

Set a timeout when executing:
import { Executor, TimeoutError } from '@crystalflow/core';

const executor = new Executor();

try {
  const result = await executor.execute(workflow, {
    timeout: 30000  // 30 seconds
  });
} catch (error) {
  if (error instanceof TimeoutError) {
    console.log(`Execution timed out after ${error.timeout}ms`);
    console.log(`Execution ID: ${error.executionId}`);
  }
}
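
How the executor enforces its timeout internally is not described here. As a rough illustration of the general technique, the sketch below races a promise against a timer. `withTimeout` and `SketchTimeoutError` are hypothetical names invented for this example and are not part of the CrystalFlow API.

```typescript
// Hypothetical stand-in for illustration; this is NOT CrystalFlow's TimeoutError.
class SketchTimeoutError extends Error {
  readonly timeout: number;

  constructor(timeout: number) {
    super(`Operation timed out after ${timeout}ms`);
    this.name = 'SketchTimeoutError';
    this.timeout = timeout;
  }
}

// Races `work` against a timer. The timer is always cleared afterwards,
// so a finished operation does not leave a pending timeout behind.
function withTimeout<T>(work: Promise<T>, timeout: number): Promise<T> {
  let handle: ReturnType<typeof setTimeout> | undefined;

  const timer = new Promise<never>((_, reject) => {
    handle = setTimeout(() => reject(new SketchTimeoutError(timeout)), timeout);
  });

  return Promise.race([work, timer]).finally(() => {
    if (handle !== undefined) clearTimeout(handle);
  });
}
```

Note that a race like this only stops waiting for the result. The underlying work keeps running unless it also checks for cancellation, which is why the mid-node checks described later matter.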

User-Initiated Cancellation

Cancel an execution manually:
import { Executor, CancellationReason, UserCancelledError } from '@crystalflow/core';

const executor = new Executor();

// Register the listener before starting, so the execution ID is not missed
let executionId: string | undefined;
executor.on('beforeExecute', (context) => {
  executionId = context.executionId;
});

// Start execution
const executionPromise = executor.execute(workflow);

// Cancel after 5 seconds
setTimeout(() => {
  if (executionId) {
    executor.cancel(executionId, CancellationReason.UserCancelled);
  }
}, 5000);

try {
  await executionPromise;
} catch (error) {
  if (error instanceof UserCancelledError) {
    console.log('Execution was cancelled by user');
  }
}

External Signal Cancellation

Use AbortController for external cancellation:
import { Executor, CancellationError } from '@crystalflow/core';

const executor = new Executor();
const controller = new AbortController();

// Start execution with abort signal
const executionPromise = executor.execute(workflow, {
  abortSignal: controller.signal
});

// Cancel from outside
setTimeout(() => {
  controller.abort();
}, 5000);

try {
  await executionPromise;
} catch (error) {
  if (error instanceof CancellationError) {
    console.log('Execution was cancelled');
  }
}
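
To show what honoring an AbortSignal involves, here is a minimal sketch that turns an abort into a promise rejection. The `abortable` helper is a hypothetical name for this example. CrystalFlow's own handling of `abortSignal` may differ.

```typescript
// Rejects as soon as `signal` aborts; otherwise settles with `work`.
// Hypothetical helper for illustration only.
function abortable<T>(work: Promise<T>, signal: AbortSignal): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(new Error('Aborted by external signal'));
    const detach = () => signal.removeEventListener('abort', onAbort);

    // Forward the outcome of `work`, then remove the abort listener.
    work.then(resolve, reject).then(detach);

    if (signal.aborted) {
      onAbort(); // the signal was already aborted before we started
    } else {
      signal.addEventListener('abort', onAbort, { once: true });
    }
  });
}
```

A promise settles only once, so whichever happens first (the abort or the result) decides the outcome and the later call is ignored.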

Cancellation Reasons

Timeout (CancellationReason): Execution exceeded the timeout limit
UserCancelled (CancellationReason): User explicitly cancelled via the API
ExternalSignal (CancellationReason): Cancelled via AbortController/AbortSignal
ResourceLimit (CancellationReason): Resource constraints (reserved for future use)
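
One common use of the reason is to choose a user-facing message. The sketch below uses a local stand-in, `SketchReason`, that mirrors the four names listed above. The actual values behind `CancellationReason` are not documented here, so the string values and the `describeReason` helper are assumptions.

```typescript
// Local stand-in mirroring the documented reason names.
// The real CancellationReason values may differ.
const SketchReason = {
  Timeout: 'Timeout',
  UserCancelled: 'UserCancelled',
  ExternalSignal: 'ExternalSignal',
  ResourceLimit: 'ResourceLimit',
} as const;

type SketchReason = (typeof SketchReason)[keyof typeof SketchReason];

// Maps each reason to a message suitable for display.
function describeReason(reason: SketchReason): string {
  switch (reason) {
    case SketchReason.Timeout:
      return 'The workflow exceeded its time limit.';
    case SketchReason.UserCancelled:
      return 'The workflow was cancelled by the user.';
    case SketchReason.ExternalSignal:
      return 'The workflow was cancelled by an external signal.';
    case SketchReason.ResourceLimit:
      return 'The workflow was stopped due to resource limits.';
  }
}
```

Because the switch covers every member of the union, the compiler flags a missing case if a new reason is added to `SketchReason` later.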

Mid-Node Cancellation

Nodes can check for cancellation during long operations:
@defineNode({
  type: 'data.process',
  label: 'Process Data',
  category: 'Data'
})
class ProcessDataNode extends Node {
  @Input({ type: 'any[]', label: 'Items' })
  items: any[] = [];

  @Output({ type: 'any[]', label: 'Processed' })
  processed: any[] = [];

  async execute() {
    const results = [];

    for (let i = 0; i < this.items.length; i++) {
      // Check if execution was cancelled
      this.checkCancellation();

      // Process item
      const result = await this.processItem(this.items[i]);
      results.push(result);
    }

    this.processed = results;
  }

  private async processItem(item: any) {
    // Long-running operation
    await this.sleep(1000);
    return item * 2;
  }

  private sleep(ms: number) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
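
The loop above relies on cooperative cancellation: the node stops only at the points where it checks. How `checkCancellation()` is implemented is not shown here, so the sketch below uses a hypothetical `CancellationToken` to show the pattern on its own. The token and `processAll` are invented for this example.

```typescript
// Hypothetical token for illustration; not a CrystalFlow class.
class CancellationToken {
  private cancelled = false;

  cancel(): void {
    this.cancelled = true;
  }

  // Throws if cancel() has been called; otherwise does nothing.
  throwIfCancelled(): void {
    if (this.cancelled) throw new Error('Cancelled');
  }
}

// Doubles each item, checking the token once per iteration.
async function processAll(
  items: number[],
  token: CancellationToken,
  onResult: (result: number) => void
): Promise<number[]> {
  const results: number[] = [];
  for (const item of items) {
    token.throwIfCancelled();  // check point: stop here if cancelled
    await Promise.resolve();   // stands in for real asynchronous work
    const result = item * 2;
    results.push(result);
    onResult(result);
  }
  return results;
}
```

An item that has already started always finishes. Cancellation takes effect at the next check, so the interval between checks bounds how quickly a node can stop.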

Cancellation Events

Listen for cancellation events:
executor.on('onCancellation', (executionId, reason, cancelledAt) => {
  console.log(`Execution ${executionId} cancelled`);
  console.log(`Reason: ${reason}`);
  console.log(`Cancelled at: ${cancelledAt.toISOString()}`);
  
  // Clean up resources (cleanupExecution is your own application function)
  cleanupExecution(executionId);
});

Cancel All Executions

Cancel all active executions:
executor.cancelAll(CancellationReason.UserCancelled);

Check Active Executions

Check if an execution is still running:
if (executor.isExecutionActive(executionId)) {
  console.log('Execution is still running');
  executor.cancel(executionId);
} else {
  console.log('Execution has completed');
}
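
The `cancel`, `cancelAll`, and `isExecutionActive` calls all imply that the executor tracks running executions by ID. The sketch below shows one way such tracking could work, with one AbortController per execution. `ExecutionRegistry` is a hypothetical class for illustration and says nothing about CrystalFlow's internals.

```typescript
// Hypothetical registry for illustration; not part of CrystalFlow.
class ExecutionRegistry {
  private readonly active = new Map<string, AbortController>();

  // Registers an execution and returns the signal it should observe.
  start(executionId: string): AbortSignal {
    const controller = new AbortController();
    this.active.set(executionId, controller);
    return controller.signal;
  }

  // Call when an execution completes normally.
  finish(executionId: string): void {
    this.active.delete(executionId);
  }

  isActive(executionId: string): boolean {
    return this.active.has(executionId);
  }

  // Returns false if the ID is unknown or already finished.
  cancel(executionId: string): boolean {
    const controller = this.active.get(executionId);
    if (!controller) return false;
    controller.abort();
    this.active.delete(executionId);
    return true;
  }

  // Cancels everything still active and returns how many were cancelled.
  cancelAll(): number {
    const ids = Array.from(this.active.keys());
    ids.forEach((id) => this.cancel(id));
    return ids.length;
  }
}
```

Copying the keys before cancelling avoids mutating the map while iterating over it.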

React Integration

Implement cancellation in React components:
import React, { useEffect, useState, useRef } from 'react';
import { Executor, CancellationReason } from '@crystalflow/core';

function WorkflowExecutor({ workflow }) {
  const [isExecuting, setIsExecuting] = useState(false);
  const executionIdRef = useRef<string | null>(null);
  const executorRef = useRef(new Executor());

  // Register the listener once on mount, not on every click
  useEffect(() => {
    executorRef.current.on('beforeExecute', (context) => {
      executionIdRef.current = context.executionId;
    });
  }, []);

  const handleExecute = async () => {
    setIsExecuting(true);
    const executor = executorRef.current;

    try {
      await executor.execute(workflow, { timeout: 60000 });
      console.log('Execution completed');
    } catch (error) {
      console.error('Execution failed or cancelled:', error);
    } finally {
      setIsExecuting(false);
      executionIdRef.current = null;
    }
  };

  const handleCancel = () => {
    if (executionIdRef.current) {
      executorRef.current.cancel(
        executionIdRef.current,
        CancellationReason.UserCancelled
      );
    }
  };

  return (
    <div>
      <button onClick={handleExecute} disabled={isExecuting}>
        Execute
      </button>
      <button onClick={handleCancel} disabled={!isExecuting}>
        Cancel
      </button>
      {isExecuting && <div>Executing...</div>}
    </div>
  );
}

Graceful Shutdown

Implement graceful shutdown with cleanup:
class DataFetchNode extends Node {
  private abortController?: AbortController;

  async execute() {
    this.abortController = new AbortController();

    try {
      const response = await fetch(this.url, {
        signal: this.abortController.signal
      });
      this.data = await response.json();
    } catch (error) {
      if (error instanceof Error && error.name === 'AbortError') {
        console.log('Fetch was cancelled');
      }
      throw error;
    }
  }

  // Intended cancellation hook (planned; see the note below)
  cleanup() {
    if (this.abortController) {
      this.abortController.abort();
    }
  }
}
Node Cleanup: The cleanup() method for node-level cleanup is planned for a future release, so the example above shows the intended pattern rather than a hook that is available today.
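
Until a node-level hook exists, cleanup can be tied directly to an AbortSignal. The sketch below assumes a hypothetical `CleanupStack` helper that is not part of CrystalFlow. It collects cleanup callbacks and runs them once, most recent first, when the signal aborts.

```typescript
// Hypothetical helper for illustration; not a CrystalFlow class.
class CleanupStack {
  private readonly tasks: Array<() => void> = [];
  private done = false;

  // Registers a cleanup callback (close a connection, clear a timer, ...).
  add(task: () => void): void {
    this.tasks.push(task);
  }

  // Runs all callbacks once, in reverse order of registration.
  // A failing callback does not prevent the remaining ones from running.
  run(): unknown[] {
    const failures: unknown[] = [];
    if (this.done) return failures;
    this.done = true;
    while (this.tasks.length > 0) {
      const task = this.tasks.pop()!;
      try {
        task();
      } catch (error) {
        failures.push(error);
      }
    }
    return failures;
  }
}

// Wires the stack to a signal so that aborting triggers cleanup.
function cleanupOnAbort(signal: AbortSignal, stack: CleanupStack): void {
  signal.addEventListener('abort', () => stack.run(), { once: true });
}
```

Running in reverse order releases resources in the opposite order from how they were acquired, which is usually the safe choice when later resources depend on earlier ones.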

Best Practices

Always set timeouts to prevent workflows from running indefinitely.
For nodes with loops, call checkCancellation() in each iteration.
Cancel network requests, close connections, and clear timers when cancelled.
Show cancellation status and allow users to cancel long operations.
Catch cancellation errors and clean up appropriately.

Complete Example

import {
  Executor,
  CancellationReason,
  CancellationError,
  TimeoutError
} from '@crystalflow/core';

async function executeWithCancellation(workflow, maxDuration = 60000) {
  const executor = new Executor();
  const controller = new AbortController();
  let executionId: string | undefined;

  // Track execution ID
  executor.on('beforeExecute', (context) => {
    executionId = context.executionId;
    console.log(`Started execution: ${executionId}`);
  });

  // Listen for cancellation
  executor.on('onCancellation', (id, reason, cancelledAt) => {
    console.log(`Execution ${id} cancelled at ${cancelledAt.toISOString()}: ${reason}`);
  });

  // Set up manual cancellation after max duration
  const timeoutHandle = setTimeout(() => {
    // Guard: the ID is unset until beforeExecute has fired
    if (executionId && executor.isExecutionActive(executionId)) {
      console.log('Manually cancelling due to max duration');
      executor.cancel(executionId, CancellationReason.UserCancelled);
    }
  }, maxDuration);

  try {
    const result = await executor.execute(workflow, {
      timeout: maxDuration,
      abortSignal: controller.signal
    });

    console.log('Execution completed successfully');
    return result;

  } catch (error) {
    if (error instanceof TimeoutError) {
      console.error('Execution timed out');
    } else if (error instanceof CancellationError) {
      console.log('Execution was cancelled');
    } else {
      console.error('Execution failed:', error);
    }
    throw error;

  } finally {
    clearTimeout(timeoutHandle);
  }
}

// Usage
try {
  await executeWithCancellation(workflow, 30000);
} catch (error) {
  console.error('Failed:', error);
}
