This example demonstrates building data processing workflows with arrays, filtering, and transformations.Documentation Index
Fetch the complete documentation index at: https://crystalflow.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
Overview
We’ll create nodes that:- Generate or input data arrays
- Filter data based on conditions
- Transform data with mapping
- Aggregate results
Node Definitions
DataSourceNode
import { Node, defineNode, Property, Output } from '@crystalflow/core';
@defineNode({
type: 'data.source',
label: 'Data Source',
category: 'Data'
})
export class DataSourceNode extends Node {
@Property({
type: 'select',
label: 'Source Type',
defaultValue: 'manual',
options: [
{ value: 'manual', label: 'Manual Array' },
{ value: 'range', label: 'Number Range' }
]
})
sourceType: string = 'manual';
@Property({
type: 'string',
label: 'Data (JSON)',
defaultValue: '[1, 2, 3, 4, 5]'
})
dataJson: string = '[1, 2, 3, 4, 5]';
@Output({ type: 'any[]', label: 'Items' })
items: any[];
execute() {
if (this.sourceType === 'manual') {
this.items = JSON.parse(this.dataJson);
} else if (this.sourceType === 'range') {
// Generate range 1-10
this.items = Array.from({ length: 10 }, (_, i) => i + 1);
}
}
}
FilterNode
import { Node, defineNode, Input, Output, Property } from '@crystalflow/core';
@defineNode({
type: 'data.filter',
label: 'Filter',
category: 'Data'
})
export class FilterNode extends Node {
@Input({ type: 'any[]', label: 'Items' })
items: any[] = [];
@Property({
type: 'select',
label: 'Condition',
defaultValue: 'greater',
options: [
{ value: 'greater', label: 'Greater Than' },
{ value: 'less', label: 'Less Than' },
{ value: 'equals', label: 'Equals' },
{ value: 'even', label: 'Even Numbers' },
{ value: 'odd', label: 'Odd Numbers' }
]
})
condition: string = 'greater';
@Property({
type: 'number',
label: 'Threshold',
defaultValue: 5
})
threshold: number = 5;
@Output({ type: 'any[]', label: 'Filtered' })
filtered: any[];
execute() {
switch (this.condition) {
case 'greater':
this.filtered = this.items.filter(item => item > this.threshold);
break;
case 'less':
this.filtered = this.items.filter(item => item < this.threshold);
break;
case 'equals':
this.filtered = this.items.filter(item => item === this.threshold);
break;
case 'even':
this.filtered = this.items.filter(item => item % 2 === 0);
break;
case 'odd':
this.filtered = this.items.filter(item => item % 2 !== 0);
break;
default:
this.filtered = this.items;
}
}
}
MapNode
import { Node, defineNode, Input, Output, Property } from '@crystalflow/core';
@defineNode({
type: 'data.map',
label: 'Map',
category: 'Data'
})
export class MapNode extends Node {
@Input({ type: 'any[]', label: 'Items' })
items: any[] = [];
@Property({
type: 'select',
label: 'Operation',
defaultValue: 'multiply',
options: [
{ value: 'multiply', label: 'Multiply' },
{ value: 'add', label: 'Add' },
{ value: 'square', label: 'Square' },
{ value: 'double', label: 'Double' }
]
})
operation: string = 'multiply';
@Property({
type: 'number',
label: 'Factor',
defaultValue: 2
})
factor: number = 2;
@Output({ type: 'any[]', label: 'Mapped' })
mapped: any[];
execute() {
switch (this.operation) {
case 'multiply':
this.mapped = this.items.map(item => item * this.factor);
break;
case 'add':
this.mapped = this.items.map(item => item + this.factor);
break;
case 'square':
this.mapped = this.items.map(item => item * item);
break;
case 'double':
this.mapped = this.items.map(item => item * 2);
break;
default:
this.mapped = this.items;
}
}
}
AggregateNode
import { Node, defineNode, Input, Output, Property } from '@crystalflow/core';
@defineNode({
type: 'data.aggregate',
label: 'Aggregate',
category: 'Data'
})
export class AggregateNode extends Node {
@Input({ type: 'any[]', label: 'Items' })
items: any[] = [];
@Property({
type: 'select',
label: 'Function',
defaultValue: 'sum',
options: [
{ value: 'sum', label: 'Sum' },
{ value: 'avg', label: 'Average' },
{ value: 'min', label: 'Minimum' },
{ value: 'max', label: 'Maximum' },
{ value: 'count', label: 'Count' }
]
})
aggregateFunc: string = 'sum';
@Output({ type: 'number', label: 'Result' })
result: number;
execute() {
switch (this.aggregateFunc) {
case 'sum':
this.result = this.items.reduce((sum, item) => sum + item, 0);
break;
case 'avg':
this.result = this.items.reduce((sum, item) => sum + item, 0) / this.items.length;
break;
case 'min':
this.result = Math.min(...this.items);
break;
case 'max':
this.result = Math.max(...this.items);
break;
case 'count':
this.result = this.items.length;
break;
default:
this.result = 0;
}
}
}
Example Workflow
Filter and Sum Even Numbers
import { Workflow } from '@crystalflow/core';
import { DataSourceNode, FilterNode, AggregateNode, DisplayNode } from './nodes';
const workflow = new Workflow('data-processing', 'Data Processing');
// Create data source (1-10)
const source = workflow.addNode(DataSourceNode, {
position: { x: 100, y: 200 },
data: {
sourceType: 'range',
dataJson: '[1,2,3,4,5,6,7,8,9,10]'
}
});
// Filter for even numbers
const filter = workflow.addNode(FilterNode, {
position: { x: 350, y: 200 },
data: {
condition: 'even'
}
});
// Sum the results
const aggregate = workflow.addNode(AggregateNode, {
position: { x: 600, y: 200 },
data: {
aggregateFunc: 'sum'
}
});
// Display result
const display = workflow.addNode(DisplayNode, {
position: { x: 850, y: 200 }
});
// Connect nodes
workflow.connect(source.id, 'items', filter.id, 'items');
workflow.connect(filter.id, 'filtered', aggregate.id, 'items');
workflow.connect(aggregate.id, 'result', display.id, 'value');
// Execute
const result = await workflow.execute();
console.log('Sum of even numbers:', result.nodeResults.get(aggregate.id).outputs.result);
// Output: Sum of even numbers: 30 (2+4+6+8+10)
Complex Pipeline
// Source -> Filter -> Map -> Aggregate
const workflow = new Workflow('complex-pipeline', 'Complex Pipeline');
// Source: Generate 1-20
const source = workflow.addNode(DataSourceNode, {
position: { x: 50, y: 200 },
data: { sourceType: 'range' }
});
// Filter: Greater than 10
const filter = workflow.addNode(FilterNode, {
position: { x: 250, y: 200 },
data: { condition: 'greater', threshold: 10 }
});
// Map: Square each number
const map = workflow.addNode(MapNode, {
position: { x: 450, y: 200 },
data: { operation: 'square' }
});
// Aggregate: Sum all
const aggregate = workflow.addNode(AggregateNode, {
position: { x: 650, y: 200 },
data: { aggregateFunc: 'sum' }
});
// Connect pipeline
workflow.connect(source.id, 'items', filter.id, 'items');
workflow.connect(filter.id, 'filtered', map.id, 'items');
workflow.connect(map.id, 'mapped', aggregate.id, 'items');
// Execute
await workflow.execute();
// Numbers > 10: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// Squared: [121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
// Sum: 2485
React Component
import React, { useState } from 'react';
import { WorkflowBuilder } from '@crystalflow/react';
import {
DataSourceNode,
FilterNode,
MapNode,
AggregateNode,
DisplayNode
} from './nodes';
function DataProcessingWorkflow() {
const [results, setResults] = useState(null);
const nodes = [
DataSourceNode,
FilterNode,
MapNode,
AggregateNode,
DisplayNode
];
const handleExecute = (result) => {
// Extract all node results
const nodeResults = {};
result.nodeResults.forEach((nodeResult, nodeId) => {
nodeResults[nodeId] = nodeResult.outputs;
});
setResults(nodeResults);
};
return (
<div className="data-workflow">
<h1>Data Processing Workflow</h1>
<WorkflowBuilder
nodes={nodes}
onExecute={handleExecute}
showNodePalette={true}
showPropertyPanel={true}
style={{ height: '600px' }}
/>
{results && (
<div className="results">
<h2>Results</h2>
<pre>{JSON.stringify(results, null, 2)}</pre>
</div>
)}
</div>
);
}
export default DataProcessingWorkflow;
Advanced: Custom Transform
Build a custom transformation node:@defineNode({
type: 'data.transform',
label: 'Custom Transform',
category: 'Data'
})
export class CustomTransformNode extends Node {
@Input({ type: 'any[]', label: 'Items' })
items: any[] = [];
@Property({
type: 'string',
label: 'Transform Function',
defaultValue: 'item => item * 2',
description: 'JavaScript function body'
})
transformCode: string = 'item => item * 2';
@Output({ type: 'any[]', label: 'Transformed' })
transformed: any[];
execute() {
try {
// Create function from string (be careful in production!)
const transformFn = new Function('item', `return (${this.transformCode})(item)`);
this.transformed = this.items.map(transformFn);
} catch (error) {
throw new Error(`Transform error: ${error.message}`);
}
}
validate() {
const result = super.validate();
// Validate function syntax
try {
new Function('item', `return (${this.transformCode})(item)`);
} catch (error) {
result.isValid = false;
result.errors = result.errors || [];
result.errors.push(`Invalid transform function: ${error.message}`);
}
return result;
}
}
Key Concepts
Data Flow
Data Flow
Arrays flow through nodes via connections. Each node transforms the data and passes it forward.
Filtering
Filtering
Filter nodes reduce arrays based on conditions. Original array is not modified.
Mapping
Mapping
Map nodes transform each element. Input and output arrays have same length.
Aggregation
Aggregation
Aggregate nodes reduce arrays to single values (sum, average, etc.).
Next Steps
Conditional Logic
Add branching and conditions
Basic Math
Simple math workflows
Custom Nodes
Build custom nodes
Error Handling
Handle errors gracefully