A Workflow is a collection of connected nodes that together perform a series of operations. A workflow defines how data flows between its nodes and the order in which they execute.

What is a Workflow?

Think of a workflow as a graph where:
  • Nodes are the vertices (operations)
  • Connections are the edges (data flow)
  • Execution follows a topological order of that graph (each node runs after the nodes it depends on)
┌──────────┐     ┌──────────┐     ┌──────────┐
│  Input   │────▶│ Process  │────▶│  Output  │
└──────────┘     └──────────┘     └──────────┘

Creating a Workflow

import { Workflow } from '@crystalflow/core';
import { AddNode, MultiplyNode } from './nodes';

const workflow = new Workflow();

// Add nodes
const node1 = workflow.addNode(AddNode, {
  position: { x: 100, y: 100 }
});

const node2 = workflow.addNode(MultiplyNode, {
  position: { x: 300, y: 100 }
});

// Connect nodes
workflow.connect(
  node1.id,        // Source node ID
  'result',        // Output port name
  node2.id,        // Target node ID
  'a'              // Input port name
);

Workflow Properties

  • id (string): Unique identifier for the workflow
  • name (string): Human-readable name for the workflow
  • nodes (Map<string, Node>): Collection of all nodes in the workflow
  • connections (Connection[]): Array of connections between nodes
  • variables (Record<string, any>): Global variables accessible during execution

Adding Nodes

There are several ways to add nodes to a workflow:

From Class

const node = workflow.addNode(AddNode, {
  position: { x: 100, y: 100 }
});

From Type String

const node = workflow.addNodeByType('math.add', {
  position: { x: 100, y: 100 }
});

Setting Initial Values

const node = workflow.addNode(AddNode);
node.setInputValue('a', 10);
node.setInputValue('b', 20);

Connecting Nodes

Connections define how data flows between nodes:
workflow.connect(
  sourceNodeId,    // ID of the node producing data
  outputPort,      // Name of the output port
  targetNodeId,    // ID of the node receiving data
  inputPort        // Name of the input port
);

Connection Example

// Create nodes
const inputNode = workflow.addNode(NumberInputNode);
const addNode = workflow.addNode(AddNode);
const displayNode = workflow.addNode(DisplayNode);

// Connect: input -> add
workflow.connect(
  inputNode.id,
  'output',
  addNode.id,
  'a'
);

// Connect: add -> display
workflow.connect(
  addNode.id,
  'result',
  displayNode.id,
  'value'
);

Multiple Connections

One output can connect to multiple inputs:
const inputNode = workflow.addNode(NumberInputNode);
const add1 = workflow.addNode(AddNode);
const add2 = workflow.addNode(AddNode);

// Fan out: one output to multiple inputs
workflow.connect(inputNode.id, 'output', add1.id, 'a');
workflow.connect(inputNode.id, 'output', add2.id, 'a');

Disconnecting Nodes

Remove connections between nodes:
workflow.disconnect(
  sourceNodeId,
  outputPort,
  targetNodeId,
  inputPort
);

Removing Nodes

Remove a node and all its connections:
workflow.removeNode(nodeId);
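
To make "and all its connections" concrete, here is a minimal sketch of the cascade. It assumes connections are plain records with `source` and `target` node IDs (field names borrowed from the JSON format on this page). `Wire` and `removeNodeSketch` are hypothetical names for illustration, not part of CrystalFlow, and the library's internals may differ.

```typescript
// Hypothetical connection record; not the CrystalFlow Connection type.
interface Wire {
  source: string;
  sourceOutput: string;
  target: string;
  targetInput: string;
}

// Deletes the node from the map and returns only the connections
// that neither start nor end at it.
function removeNodeSketch<T>(
  nodes: Map<string, T>,
  connections: Wire[],
  nodeId: string
): Wire[] {
  nodes.delete(nodeId);
  return connections.filter(
    (c) => c.source !== nodeId && c.target !== nodeId
  );
}

const demoNodes = new Map<string, string>([
  ['input', 'NumberInputNode'],
  ['add', 'AddNode'],
  ['display', 'DisplayNode'],
]);
const demoWires: Wire[] = [
  { source: 'input', sourceOutput: 'output', target: 'add', targetInput: 'a' },
  { source: 'add', sourceOutput: 'result', target: 'display', targetInput: 'value' },
];

// Removing the middle node drops both connections that touch it.
const remainingWires = removeNodeSketch(demoNodes, demoWires, 'add');
console.log(demoNodes.size, remainingWires.length); // 2 0
```

The practical consequence is that removing a node in the middle of a chain leaves its former neighbours disconnected, so downstream required inputs may need new connections or values before the workflow validates again.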

Workflow Validation

Validate the workflow before execution:
const errors = workflow.validate();

if (errors.length > 0) {
  console.error('Validation errors:', errors);
} else {
  console.log('Workflow is valid!');
}

Validation Checks

  • Ensures there are no circular dependencies in the workflow
  • Verifies that every connection refers to ports that exist on its source and target nodes
  • Checks that all required inputs are connected or have values
  • Ensures connected ports have compatible types
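
The port, type, and required-input checks listed above can be sketched in a few lines. This is an illustration under stated assumptions, not the logic behind `workflow.validate()`: the `PortSpec`, `NodeSpec`, and `PortLink` shapes and the `validateSketch` function are all hypothetical, and type compatibility is simplified to strict equality.

```typescript
// Hypothetical shapes for illustration only (not the CrystalFlow types).
type PortType = 'number' | 'string' | 'boolean';

interface PortSpec {
  type: PortType;
  required?: boolean;
}

interface NodeSpec {
  id: string;
  inputs: Record<string, PortSpec>;
  outputs: Record<string, PortSpec>;
  values: Record<string, unknown>; // initial input values, keyed by port name
}

interface PortLink {
  source: string;
  sourceOutput: string;
  target: string;
  targetInput: string;
}

function validateSketch(nodes: NodeSpec[], links: PortLink[]): string[] {
  const errors: string[] = [];
  const byId = new Map<string, NodeSpec>();
  for (const n of nodes) byId.set(n.id, n);
  const connectedInputs = new Set<string>();

  for (const link of links) {
    const out = byId.get(link.source)?.outputs[link.sourceOutput];
    const inp = byId.get(link.target)?.inputs[link.targetInput];
    const label = `${link.source}.${link.sourceOutput} -> ${link.target}.${link.targetInput}`;
    if (!out || !inp) {
      errors.push(`Unknown port: ${label}`);
      continue;
    }
    // Simplification: "compatible" here means "identical".
    if (out.type !== inp.type) {
      errors.push(`Type mismatch: ${label}`);
    }
    connectedInputs.add(`${link.target}.${link.targetInput}`);
  }

  // A required input is satisfied by a connection or by an initial value.
  for (const node of nodes) {
    for (const [name, port] of Object.entries(node.inputs)) {
      const key = `${node.id}.${name}`;
      if (port.required && !connectedInputs.has(key) && !(name in node.values)) {
        errors.push(`Missing required input: ${key}`);
      }
    }
  }
  return errors;
}

const demoSpecs: NodeSpec[] = [
  { id: 'n1', inputs: {}, outputs: { output: { type: 'number' } }, values: {} },
  {
    id: 'n2',
    inputs: {
      a: { type: 'number', required: true },
      b: { type: 'number', required: true },
    },
    outputs: { result: { type: 'number' } },
    values: {},
  },
];

// Input "a" is connected, but "b" has neither a connection nor a value.
const demoErrors = validateSketch(demoSpecs, [
  { source: 'n1', sourceOutput: 'output', target: 'n2', targetInput: 'a' },
]);
console.log(demoErrors.join('\n')); // Missing required input: n2.b
```

Returning a list of messages rather than throwing on the first problem mirrors the `errors.length` check shown earlier and lets a caller report every issue at once.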

Execution Order

Workflows execute nodes in topological order:
  1. Identify nodes with no dependencies (entry points)
  2. Execute those nodes
  3. Propagate data to connected nodes
  4. Repeat until all nodes are executed
import { Executor } from '@crystalflow/core';

const executor = new Executor();
const result = await executor.execute(workflow);

console.log('Execution status:', result.status);
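
The ordering steps listed in this section correspond to Kahn's algorithm for topological sorting. The sketch below shows that algorithm on bare node IDs. It is not the `Executor` implementation: `Edge` and `topologicalOrder` are hypothetical names, and it assumes every edge refers to a node in `nodeIds`.

```typescript
// Hypothetical shape: only the two node IDs matter for ordering.
interface Edge {
  source: string;
  target: string;
}

function topologicalOrder(nodeIds: string[], edges: Edge[]): string[] {
  // Count incoming edges per node and record each node's successors.
  const inDegree = new Map<string, number>();
  const successors = new Map<string, string[]>();
  for (const id of nodeIds) {
    inDegree.set(id, 0);
    successors.set(id, []);
  }
  for (const { source, target } of edges) {
    inDegree.set(target, (inDegree.get(target) ?? 0) + 1);
    successors.get(source)?.push(target);
  }

  // Step 1: entry points are the nodes with no dependencies.
  const ready = nodeIds.filter((id) => inDegree.get(id) === 0);
  const order: string[] = [];

  // Steps 2-4: take a ready node, then release the nodes it feeds.
  while (ready.length > 0) {
    const id = ready.shift()!;
    order.push(id);
    for (const next of successors.get(id) ?? []) {
      const remaining = (inDegree.get(next) ?? 0) - 1;
      inDegree.set(next, remaining);
      if (remaining === 0) ready.push(next);
    }
  }

  // Nodes left unordered can only be part of (or behind) a cycle.
  if (order.length !== nodeIds.length) {
    throw new Error('Cycle detected: no valid execution order exists');
  }
  return order;
}

const demoOrder = topologicalOrder(
  ['display', 'add', 'input'],
  [
    { source: 'input', target: 'add' },
    { source: 'add', target: 'display' },
  ]
);
console.log(demoOrder.join(' -> ')); // input -> add -> display
```

The same pass doubles as cycle detection: if some nodes never reach zero remaining dependencies, the graph contains a cycle and no execution order exists.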

Workflow Variables

Global variables can be accessed by all nodes:
const workflow = new Workflow({
  variables: {
    apiKey: 'your-api-key',
    environment: 'production'
  }
});

// Access in node execution
class ApiNode extends Node {
  execute() {
    const apiKey = this.context.variables.apiKey;
    // Use apiKey...
  }
}

Serialization

Workflows can be serialized to/from JSON:

To JSON

import { serializeWorkflow } from '@crystalflow/core';

const json = serializeWorkflow(workflow.toJSON());
console.log(json); // JSON string

From JSON

import { deserializeWorkflow, Workflow } from '@crystalflow/core';

const workflowData = deserializeWorkflow(json);
const workflow = Workflow.fromJSON(workflowData);

JSON Format

{
  "version": "1.0.0",
  "id": "workflow-123",
  "name": "My Workflow",
  "nodes": [
    {
      "id": "node-1",
      "type": "math.add",
      "position": { "x": 100, "y": 100 },
      "inputs": { "a": 10, "b": 20 }
    }
  ],
  "connections": [
    {
      "source": "node-1",
      "sourceOutput": "result",
      "target": "node-2",
      "targetInput": "value"
    }
  ],
  "variables": {}
}
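
When working with the serialized form directly, it can help to give it a type and run a quick sanity check. The interfaces below are inferred from the sample above and are assumptions, not exported CrystalFlow types; `findDanglingConnections` is likewise a hypothetical helper. Note that the sample's connection targets `node-2`, which is not in its `nodes` array (the listing is presumably abbreviated), so the check reports it.

```typescript
// Hypothetical types mirroring the JSON sample; not exported by CrystalFlow.
interface SerializedNode {
  id: string;
  type: string;
  position: { x: number; y: number };
  inputs: Record<string, unknown>;
}

interface SerializedConnection {
  source: string;
  sourceOutput: string;
  target: string;
  targetInput: string;
}

interface SerializedWorkflow {
  version: string;
  id: string;
  name: string;
  nodes: SerializedNode[];
  connections: SerializedConnection[];
  variables: Record<string, unknown>;
}

// Returns connections whose source or target is missing from `nodes`.
function findDanglingConnections(
  wf: SerializedWorkflow
): SerializedConnection[] {
  const ids = new Set(wf.nodes.map((n) => n.id));
  return wf.connections.filter(
    (c) => !ids.has(c.source) || !ids.has(c.target)
  );
}

const sampleWorkflow = JSON.parse(`{
  "version": "1.0.0",
  "id": "workflow-123",
  "name": "My Workflow",
  "nodes": [
    {
      "id": "node-1",
      "type": "math.add",
      "position": { "x": 100, "y": 100 },
      "inputs": { "a": 10, "b": 20 }
    }
  ],
  "connections": [
    {
      "source": "node-1",
      "sourceOutput": "result",
      "target": "node-2",
      "targetInput": "value"
    }
  ],
  "variables": {}
}`) as SerializedWorkflow;

const danglingLinks = findDanglingConnections(sampleWorkflow);
console.log(danglingLinks.map((c) => c.target).join(', ')); // node-2
```

The `as SerializedWorkflow` cast performs no runtime checking, so for untrusted input a schema validator would be a safer choice than a cast.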

Workflow Lifecycle

  1. Creation: Create a new workflow instance
  2. Building: Add nodes and create connections
  3. Validation: Validate the workflow structure
  4. Execution: Execute nodes in topological order
  5. Results: Collect and process execution results

Best Practices

  • Always validate workflows before executing to catch structural issues early.
  • Consider using descriptive IDs for nodes to make debugging easier.
  • Design workflows without circular dependencies. CrystalFlow will detect and reject cycles.
  • Build and test workflows incrementally, adding nodes one at a time.
  • Add names and descriptions to workflows for better maintainability.

Example: Data Processing Workflow

import { Workflow, Executor } from '@crystalflow/core';

// Create workflow
const workflow = new Workflow({ name: 'Data Processing' });

// Add nodes
const fetchNode = workflow.addNodeByType('http.request', {
  position: { x: 100, y: 100 }
});

const filterNode = workflow.addNodeByType('data.filter', {
  position: { x: 300, y: 100 }
});

const transformNode = workflow.addNodeByType('data.transform', {
  position: { x: 500, y: 100 }
});

const saveNode = workflow.addNodeByType('file.write', {
  position: { x: 700, y: 100 }
});

// Configure nodes
fetchNode.setInputValue('url', 'https://api.example.com/data');
filterNode.setInputValue('condition', 'item.active === true');

// Connect nodes
workflow.connect(fetchNode.id, 'response', filterNode.id, 'array');
workflow.connect(filterNode.id, 'filtered', transformNode.id, 'input');
workflow.connect(transformNode.id, 'output', saveNode.id, 'data');

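// Validate, then execute only if no errors were found
const errors = workflow.validate();
if (errors.length === 0) {
  const executor = new Executor();
  const result = await executor.execute(workflow);
  console.log('Execution status:', result.status);
} else {
  console.error('Validation errors:', errors);
}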
