Overview
We’ll create nodes that:- Generate or input data arrays
- Filter data based on conditions
- Transform data with mapping
- Aggregate results
Node Definitions
DataSourceNode
Copy
Ask AI
import { Node, defineNode, Property, Output } from '@crystalflow/core';
@defineNode({
type: 'data.source',
label: 'Data Source',
category: 'Data'
})
export class DataSourceNode extends Node {
@Property({
type: 'select',
label: 'Source Type',
defaultValue: 'manual',
options: [
{ value: 'manual', label: 'Manual Array' },
{ value: 'range', label: 'Number Range' }
]
})
sourceType: string = 'manual';
@Property({
type: 'string',
label: 'Data (JSON)',
defaultValue: '[1, 2, 3, 4, 5]'
})
dataJson: string = '[1, 2, 3, 4, 5]';
@Output({ type: 'any[]', label: 'Items' })
items: any[];
execute() {
if (this.sourceType === 'manual') {
this.items = JSON.parse(this.dataJson);
} else if (this.sourceType === 'range') {
// Generate range 1-10
this.items = Array.from({ length: 10 }, (_, i) => i + 1);
}
}
}
FilterNode
Copy
Ask AI
import { Node, defineNode, Input, Output, Property } from '@crystalflow/core';
@defineNode({
type: 'data.filter',
label: 'Filter',
category: 'Data'
})
export class FilterNode extends Node {
@Input({ type: 'any[]', label: 'Items' })
items: any[] = [];
@Property({
type: 'select',
label: 'Condition',
defaultValue: 'greater',
options: [
{ value: 'greater', label: 'Greater Than' },
{ value: 'less', label: 'Less Than' },
{ value: 'equals', label: 'Equals' },
{ value: 'even', label: 'Even Numbers' },
{ value: 'odd', label: 'Odd Numbers' }
]
})
condition: string = 'greater';
@Property({
type: 'number',
label: 'Threshold',
defaultValue: 5
})
threshold: number = 5;
@Output({ type: 'any[]', label: 'Filtered' })
filtered: any[];
execute() {
switch (this.condition) {
case 'greater':
this.filtered = this.items.filter(item => item > this.threshold);
break;
case 'less':
this.filtered = this.items.filter(item => item < this.threshold);
break;
case 'equals':
this.filtered = this.items.filter(item => item === this.threshold);
break;
case 'even':
this.filtered = this.items.filter(item => item % 2 === 0);
break;
case 'odd':
this.filtered = this.items.filter(item => item % 2 !== 0);
break;
default:
this.filtered = this.items;
}
}
}
MapNode
Copy
Ask AI
import { Node, defineNode, Input, Output, Property } from '@crystalflow/core';
@defineNode({
type: 'data.map',
label: 'Map',
category: 'Data'
})
export class MapNode extends Node {
@Input({ type: 'any[]', label: 'Items' })
items: any[] = [];
@Property({
type: 'select',
label: 'Operation',
defaultValue: 'multiply',
options: [
{ value: 'multiply', label: 'Multiply' },
{ value: 'add', label: 'Add' },
{ value: 'square', label: 'Square' },
{ value: 'double', label: 'Double' }
]
})
operation: string = 'multiply';
@Property({
type: 'number',
label: 'Factor',
defaultValue: 2
})
factor: number = 2;
@Output({ type: 'any[]', label: 'Mapped' })
mapped: any[];
execute() {
switch (this.operation) {
case 'multiply':
this.mapped = this.items.map(item => item * this.factor);
break;
case 'add':
this.mapped = this.items.map(item => item + this.factor);
break;
case 'square':
this.mapped = this.items.map(item => item * item);
break;
case 'double':
this.mapped = this.items.map(item => item * 2);
break;
default:
this.mapped = this.items;
}
}
}
AggregateNode
Copy
Ask AI
import { Node, defineNode, Input, Output, Property } from '@crystalflow/core';
@defineNode({
type: 'data.aggregate',
label: 'Aggregate',
category: 'Data'
})
export class AggregateNode extends Node {
@Input({ type: 'any[]', label: 'Items' })
items: any[] = [];
@Property({
type: 'select',
label: 'Function',
defaultValue: 'sum',
options: [
{ value: 'sum', label: 'Sum' },
{ value: 'avg', label: 'Average' },
{ value: 'min', label: 'Minimum' },
{ value: 'max', label: 'Maximum' },
{ value: 'count', label: 'Count' }
]
})
aggregateFunc: string = 'sum';
@Output({ type: 'number', label: 'Result' })
result: number;
execute() {
switch (this.aggregateFunc) {
case 'sum':
this.result = this.items.reduce((sum, item) => sum + item, 0);
break;
case 'avg':
this.result = this.items.reduce((sum, item) => sum + item, 0) / this.items.length;
break;
case 'min':
this.result = Math.min(...this.items);
break;
case 'max':
this.result = Math.max(...this.items);
break;
case 'count':
this.result = this.items.length;
break;
default:
this.result = 0;
}
}
}
Example Workflow
Filter and Sum Even Numbers
Copy
Ask AI
import { Workflow } from '@crystalflow/core';
import { DataSourceNode, FilterNode, AggregateNode, DisplayNode } from './nodes';
const workflow = new Workflow('data-processing', 'Data Processing');
// Create data source (1-10)
const source = workflow.addNode(DataSourceNode, {
position: { x: 100, y: 200 },
data: {
sourceType: 'range',
dataJson: '[1,2,3,4,5,6,7,8,9,10]'
}
});
// Filter for even numbers
const filter = workflow.addNode(FilterNode, {
position: { x: 350, y: 200 },
data: {
condition: 'even'
}
});
// Sum the results
const aggregate = workflow.addNode(AggregateNode, {
position: { x: 600, y: 200 },
data: {
aggregateFunc: 'sum'
}
});
// Display result
const display = workflow.addNode(DisplayNode, {
position: { x: 850, y: 200 }
});
// Connect nodes
workflow.connect(source.id, 'items', filter.id, 'items');
workflow.connect(filter.id, 'filtered', aggregate.id, 'items');
workflow.connect(aggregate.id, 'result', display.id, 'value');
// Execute
const result = await workflow.execute();
console.log('Sum of even numbers:', result.nodeResults.get(aggregate.id).outputs.result);
// Output: Sum of even numbers: 30 (2+4+6+8+10)
Complex Pipeline
Copy
Ask AI
// Source -> Filter -> Map -> Aggregate
const workflow = new Workflow('complex-pipeline', 'Complex Pipeline');
// Source: Generate 1-20
const source = workflow.addNode(DataSourceNode, {
position: { x: 50, y: 200 },
data: { sourceType: 'range' }
});
// Filter: Greater than 10
const filter = workflow.addNode(FilterNode, {
position: { x: 250, y: 200 },
data: { condition: 'greater', threshold: 10 }
});
// Map: Square each number
const map = workflow.addNode(MapNode, {
position: { x: 450, y: 200 },
data: { operation: 'square' }
});
// Aggregate: Sum all
const aggregate = workflow.addNode(AggregateNode, {
position: { x: 650, y: 200 },
data: { aggregateFunc: 'sum' }
});
// Connect pipeline
workflow.connect(source.id, 'items', filter.id, 'items');
workflow.connect(filter.id, 'filtered', map.id, 'items');
workflow.connect(map.id, 'mapped', aggregate.id, 'items');
// Execute
await workflow.execute();
// Numbers > 10: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// Squared: [121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
// Sum: 2485
React Component
Copy
Ask AI
import React, { useState } from 'react';
import { WorkflowBuilder } from '@crystalflow/react';
import {
DataSourceNode,
FilterNode,
MapNode,
AggregateNode,
DisplayNode
} from './nodes';
function DataProcessingWorkflow() {
const [results, setResults] = useState(null);
const nodes = [
DataSourceNode,
FilterNode,
MapNode,
AggregateNode,
DisplayNode
];
const handleExecute = (result) => {
// Extract all node results
const nodeResults = {};
result.nodeResults.forEach((nodeResult, nodeId) => {
nodeResults[nodeId] = nodeResult.outputs;
});
setResults(nodeResults);
};
return (
<div className="data-workflow">
<h1>Data Processing Workflow</h1>
<WorkflowBuilder
nodes={nodes}
onExecute={handleExecute}
showNodePalette={true}
showPropertyPanel={true}
style={{ height: '600px' }}
/>
{results && (
<div className="results">
<h2>Results</h2>
<pre>{JSON.stringify(results, null, 2)}</pre>
</div>
)}
</div>
);
}
export default DataProcessingWorkflow;
Advanced: Custom Transform
Build a custom transformation node:Copy
Ask AI
@defineNode({
type: 'data.transform',
label: 'Custom Transform',
category: 'Data'
})
export class CustomTransformNode extends Node {
@Input({ type: 'any[]', label: 'Items' })
items: any[] = [];
@Property({
type: 'string',
label: 'Transform Function',
defaultValue: 'item => item * 2',
description: 'JavaScript function body'
})
transformCode: string = 'item => item * 2';
@Output({ type: 'any[]', label: 'Transformed' })
transformed: any[];
execute() {
try {
// Create function from string (be careful in production!)
const transformFn = new Function('item', `return (${this.transformCode})(item)`);
this.transformed = this.items.map(transformFn);
} catch (error) {
throw new Error(`Transform error: ${error.message}`);
}
}
validate() {
const result = super.validate();
// Validate function syntax
try {
new Function('item', `return (${this.transformCode})(item)`);
} catch (error) {
result.isValid = false;
result.errors = result.errors || [];
result.errors.push(`Invalid transform function: ${error.message}`);
}
return result;
}
}
Key Concepts
Data Flow
Data Flow
Arrays flow through nodes via connections. Each node transforms the data and passes it forward.
Filtering
Filtering
Filter nodes reduce arrays based on conditions. Original array is not modified.
Mapping
Mapping
Map nodes transform each element. Input and output arrays have same length.
Aggregation
Aggregation
Aggregate nodes reduce arrays to single values (sum, average, etc.).