Skip to main content
This example demonstrates building data processing workflows with arrays, filtering, and transformations.

Overview

We’ll create nodes that:
  1. Generate or input data arrays
  2. Filter data based on conditions
  3. Transform data with mapping
  4. Aggregate results

Node Definitions

DataSourceNode

import { Node, defineNode, Property, Output } from '@crystalflow/core';

@defineNode({
  type: 'data.source',
  label: 'Data Source',
  category: 'Data'
})
export class DataSourceNode extends Node {
  @Property({
    type: 'select',
    label: 'Source Type',
    defaultValue: 'manual',
    options: [
      { value: 'manual', label: 'Manual Array' },
      { value: 'range', label: 'Number Range' }
    ]
  })
  sourceType: string = 'manual';

  @Property({
    type: 'string',
    label: 'Data (JSON)',
    defaultValue: '[1, 2, 3, 4, 5]'
  })
  dataJson: string = '[1, 2, 3, 4, 5]';

  @Output({ type: 'any[]', label: 'Items' })
  items: any[];

  execute() {
    if (this.sourceType === 'manual') {
      this.items = JSON.parse(this.dataJson);
    } else if (this.sourceType === 'range') {
      // Generate range 1-10
      this.items = Array.from({ length: 10 }, (_, i) => i + 1);
    }
  }
}

FilterNode

import { Node, defineNode, Input, Output, Property } from '@crystalflow/core';

@defineNode({
  type: 'data.filter',
  label: 'Filter',
  category: 'Data'
})
export class FilterNode extends Node {
  @Input({ type: 'any[]', label: 'Items' })
  items: any[] = [];

  @Property({
    type: 'select',
    label: 'Condition',
    defaultValue: 'greater',
    options: [
      { value: 'greater', label: 'Greater Than' },
      { value: 'less', label: 'Less Than' },
      { value: 'equals', label: 'Equals' },
      { value: 'even', label: 'Even Numbers' },
      { value: 'odd', label: 'Odd Numbers' }
    ]
  })
  condition: string = 'greater';

  @Property({
    type: 'number',
    label: 'Threshold',
    defaultValue: 5
  })
  threshold: number = 5;

  @Output({ type: 'any[]', label: 'Filtered' })
  filtered: any[];

  execute() {
    switch (this.condition) {
      case 'greater':
        this.filtered = this.items.filter(item => item > this.threshold);
        break;
      case 'less':
        this.filtered = this.items.filter(item => item < this.threshold);
        break;
      case 'equals':
        this.filtered = this.items.filter(item => item === this.threshold);
        break;
      case 'even':
        this.filtered = this.items.filter(item => item % 2 === 0);
        break;
      case 'odd':
        this.filtered = this.items.filter(item => item % 2 !== 0);
        break;
      default:
        this.filtered = this.items;
    }
  }
}

MapNode

import { Node, defineNode, Input, Output, Property } from '@crystalflow/core';

@defineNode({
  type: 'data.map',
  label: 'Map',
  category: 'Data'
})
export class MapNode extends Node {
  @Input({ type: 'any[]', label: 'Items' })
  items: any[] = [];

  @Property({
    type: 'select',
    label: 'Operation',
    defaultValue: 'multiply',
    options: [
      { value: 'multiply', label: 'Multiply' },
      { value: 'add', label: 'Add' },
      { value: 'square', label: 'Square' },
      { value: 'double', label: 'Double' }
    ]
  })
  operation: string = 'multiply';

  @Property({
    type: 'number',
    label: 'Factor',
    defaultValue: 2
  })
  factor: number = 2;

  @Output({ type: 'any[]', label: 'Mapped' })
  mapped: any[];

  execute() {
    switch (this.operation) {
      case 'multiply':
        this.mapped = this.items.map(item => item * this.factor);
        break;
      case 'add':
        this.mapped = this.items.map(item => item + this.factor);
        break;
      case 'square':
        this.mapped = this.items.map(item => item * item);
        break;
      case 'double':
        this.mapped = this.items.map(item => item * 2);
        break;
      default:
        this.mapped = this.items;
    }
  }
}

AggregateNode

import { Node, defineNode, Input, Output, Property } from '@crystalflow/core';

@defineNode({
  type: 'data.aggregate',
  label: 'Aggregate',
  category: 'Data'
})
export class AggregateNode extends Node {
  @Input({ type: 'any[]', label: 'Items' })
  items: any[] = [];

  @Property({
    type: 'select',
    label: 'Function',
    defaultValue: 'sum',
    options: [
      { value: 'sum', label: 'Sum' },
      { value: 'avg', label: 'Average' },
      { value: 'min', label: 'Minimum' },
      { value: 'max', label: 'Maximum' },
      { value: 'count', label: 'Count' }
    ]
  })
  aggregateFunc: string = 'sum';

  @Output({ type: 'number', label: 'Result' })
  result: number;

  execute() {
    switch (this.aggregateFunc) {
      case 'sum':
        this.result = this.items.reduce((sum, item) => sum + item, 0);
        break;
      case 'avg':
        this.result = this.items.reduce((sum, item) => sum + item, 0) / this.items.length;
        break;
      case 'min':
        this.result = Math.min(...this.items);
        break;
      case 'max':
        this.result = Math.max(...this.items);
        break;
      case 'count':
        this.result = this.items.length;
        break;
      default:
        this.result = 0;
    }
  }
}

Example Workflow

Filter and Sum Even Numbers

import { Workflow } from '@crystalflow/core';
import { DataSourceNode, FilterNode, AggregateNode, DisplayNode } from './nodes';

const workflow = new Workflow('data-processing', 'Data Processing');

// Create data source (1-10)
const source = workflow.addNode(DataSourceNode, {
  position: { x: 100, y: 200 },
  data: {
    sourceType: 'range',
    dataJson: '[1,2,3,4,5,6,7,8,9,10]'
  }
});

// Filter for even numbers
const filter = workflow.addNode(FilterNode, {
  position: { x: 350, y: 200 },
  data: {
    condition: 'even'
  }
});

// Sum the results
const aggregate = workflow.addNode(AggregateNode, {
  position: { x: 600, y: 200 },
  data: {
    aggregateFunc: 'sum'
  }
});

// Display result
const display = workflow.addNode(DisplayNode, {
  position: { x: 850, y: 200 }
});

// Connect nodes
workflow.connect(source.id, 'items', filter.id, 'items');
workflow.connect(filter.id, 'filtered', aggregate.id, 'items');
workflow.connect(aggregate.id, 'result', display.id, 'value');

// Execute
const result = await workflow.execute();
console.log('Sum of even numbers:', result.nodeResults.get(aggregate.id).outputs.result);
// Output: Sum of even numbers: 30 (2+4+6+8+10)

Complex Pipeline

// Source -> Filter -> Map -> Aggregate
const workflow = new Workflow('complex-pipeline', 'Complex Pipeline');

// Source: Generate 1-20
const source = workflow.addNode(DataSourceNode, {
  position: { x: 50, y: 200 },
  data: { sourceType: 'range' }
});

// Filter: Greater than 10
const filter = workflow.addNode(FilterNode, {
  position: { x: 250, y: 200 },
  data: { condition: 'greater', threshold: 10 }
});

// Map: Square each number
const map = workflow.addNode(MapNode, {
  position: { x: 450, y: 200 },
  data: { operation: 'square' }
});

// Aggregate: Sum all
const aggregate = workflow.addNode(AggregateNode, {
  position: { x: 650, y: 200 },
  data: { aggregateFunc: 'sum' }
});

// Connect pipeline
workflow.connect(source.id, 'items', filter.id, 'items');
workflow.connect(filter.id, 'filtered', map.id, 'items');
workflow.connect(map.id, 'mapped', aggregate.id, 'items');

// Execute
await workflow.execute();
// Numbers > 10: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
// Squared: [121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
// Sum: 2485

React Component

import React, { useState } from 'react';
import { WorkflowBuilder } from '@crystalflow/react';
import {
  DataSourceNode,
  FilterNode,
  MapNode,
  AggregateNode,
  DisplayNode
} from './nodes';

function DataProcessingWorkflow() {
  const [results, setResults] = useState(null);

  const nodes = [
    DataSourceNode,
    FilterNode,
    MapNode,
    AggregateNode,
    DisplayNode
  ];

  const handleExecute = (result) => {
    // Extract all node results
    const nodeResults = {};
    result.nodeResults.forEach((nodeResult, nodeId) => {
      nodeResults[nodeId] = nodeResult.outputs;
    });
    setResults(nodeResults);
  };

  return (
    <div className="data-workflow">
      <h1>Data Processing Workflow</h1>
      
      <WorkflowBuilder
        nodes={nodes}
        onExecute={handleExecute}
        showNodePalette={true}
        showPropertyPanel={true}
        style={{ height: '600px' }}
      />

      {results && (
        <div className="results">
          <h2>Results</h2>
          <pre>{JSON.stringify(results, null, 2)}</pre>
        </div>
      )}
    </div>
  );
}

export default DataProcessingWorkflow;

Advanced: Custom Transform

Build a custom transformation node:
@defineNode({
  type: 'data.transform',
  label: 'Custom Transform',
  category: 'Data'
})
export class CustomTransformNode extends Node {
  @Input({ type: 'any[]', label: 'Items' })
  items: any[] = [];

  @Property({
    type: 'string',
    label: 'Transform Function',
    defaultValue: 'item => item * 2',
    description: 'JavaScript function body'
  })
  transformCode: string = 'item => item * 2';

  @Output({ type: 'any[]', label: 'Transformed' })
  transformed: any[];

  execute() {
    try {
      // Create function from string (be careful in production!)
      const transformFn = new Function('item', `return (${this.transformCode})(item)`);
      this.transformed = this.items.map(transformFn);
    } catch (error) {
      throw new Error(`Transform error: ${error.message}`);
    }
  }

  validate() {
    const result = super.validate();
    
    // Validate function syntax
    try {
      new Function('item', `return (${this.transformCode})(item)`);
    } catch (error) {
      result.isValid = false;
      result.errors = result.errors || [];
      result.errors.push(`Invalid transform function: ${error.message}`);
    }
    
    return result;
  }
}

Key Concepts

Arrays flow through nodes via connections. Each node transforms the data and passes it forward.
Filter nodes reduce arrays based on conditions. Original array is not modified.
Map nodes transform each element. Input and output arrays have same length.
Aggregate nodes reduce arrays to single values (sum, average, etc.).

Next Steps