moggan1337/Ethereal


Ethereal - Distributed Stream Processing with Exactly-Once Semantics


A production-grade distributed stream processing framework with guaranteed exactly-once delivery semantics

Features · Architecture · Quick Start · Documentation · API Reference · Contributing


🎬 Demo

[Image: Ethereal Demo]

Exactly-once stream processing with checkpointing

Screenshots

| Component | Preview |
|---|---|
| Stream Graph | [image: graph] |
| Checkpoint Visual | [image: checkpoint] |
| Latency Metrics | [image: latency] |

Visual Description

Stream graph displays operators as nodes with data flow connections. Checkpoint visualization shows Chandy-Lamport snapshot markers. Latency metrics present end-to-end processing times with percentiles.


Table of Contents

  1. Overview
  2. Features
  3. Architecture
  4. Exactly-Once Semantics
  5. Windowed Operations
  6. Stream Joins
  7. Fault Tolerance
  8. Flow Control & Backpressure
  9. Auto-Scaling
  10. Quick Start
  11. API Reference
  12. Examples
  13. Configuration
  14. Performance Tuning
  15. Production Deployment

Overview

Ethereal is a comprehensive distributed stream processing framework designed for building scalable, fault-tolerant streaming applications with exactly-once processing semantics. Inspired by Apache Flink and Kafka Streams, Ethereal provides a rich set of operators for real-time data processing while ensuring that each record is processed exactly once, even in the presence of failures.

Key Design Goals

  • Exactly-Once Processing: Guarantee that every record is processed exactly once, with no duplicates and no data loss
  • Distributed Snapshots: Implement Chandy-Lamport algorithm for consistent global state snapshots
  • Event Time Processing: Support for event time semantics with watermarks and late data handling
  • Dynamic Scaling: Automatic scaling based on load metrics with minimal downtime
  • Flink/Kafka Streams-Style API: A programming model patterned on Apache Flink and Kafka Streams for easy adoption

Features

Core Streaming Features

| Feature | Description |
|---|---|
| Exactly-Once Delivery | End-to-end exactly-once processing with idempotent sinks and two-phase commit |
| Chandy-Lamport Snapshots | Consistent distributed snapshots for fault tolerance |
| Event Time Processing | Native support for event time with watermarks |
| Windowed Operations | Tumbling, sliding, session, and count windows |
| Stream Joins | Inner, outer, semi, and anti joins with windowed support |
| Stateful Processing | Keyed state with RocksDB and in-memory backends |
| Late Data Handling | Configurable allowed lateness with side outputs |
| Flow Control | Credit-based flow control with adaptive backpressure |
| Auto-Scaling | Dynamic parallelism adjustment based on metrics |

Window Types

  • Tumbling Windows: Fixed-size, non-overlapping windows
  • Sliding Windows: Overlapping windows with configurable slide interval
  • Session Windows: Dynamic windows based on activity gaps
  • Count Windows: Windows that close after N elements
  • Global Windows: All records in a single window

Join Types

  • Inner Join: Only emit when both sides have matching keys
  • Left/Right Join: Outer joins preserving one side
  • Full Outer Join: All records from both sides
  • Semi Join: Emit only left records that have a matching key on the right
  • Anti Join: Emit only left records that have no matching key on the right
  • Interval Join: Time-bounded joins between streams

Architecture

Core Components

┌─────────────────────────────────────────────────────────────────┐
│                     Ethereal Architecture                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
│  │   Sources   │───▶│  Operators  │───▶│    Sinks     │       │
│  └──────────────┘    └──────────────┘    └──────────────┘       │
│         │                   │                   │               │
│         ▼                   ▼                   ▼               │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │              Stream Graph & Topology                     │    │
│  └─────────────────────────────────────────────────────────┘    │
│                              │                                   │
│         ┌────────────────────┼────────────────────┐             │
│         ▼                    ▼                    ▼             │
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐      │
│  │ Checkpoint  │      │    State    │      │   Network   │      │
│  │ Coordinator │      │   Backend   │      │    Layer    │      │
│  └─────────────┘      └─────────────┘      └─────────────┘      │
│                              │                                   │
│         ┌────────────────────┼────────────────────┐             │
│         ▼                    ▼                    ▼             │
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐      │
│  │  Watermark  │      │    Flow     │      │    Auto      │      │
│  │  Generator  │      │   Control   │      │    Scaler    │      │
│  └─────────────┘      └─────────────┘      └─────────────┘      │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Distributed Snapshots

Ethereal implements the Chandy-Lamport distributed snapshot algorithm for achieving consistent global state across distributed operators. This enables fault tolerance without data duplication or loss.

How Chandy-Lamport Works

  1. Checkpoint Trigger: The checkpoint coordinator initiates a snapshot by sending markers to all source operators
  2. Marker Propagation: Markers flow through the stream graph, separating the records that belong to the current snapshot from those that belong to the next one
  3. State Capture: Each operator captures its local state when it receives the marker on all input channels
  4. Channel Snapshots: In-flight records are captured as part of channel snapshots
  5. Acknowledgment: Operators acknowledge completion to the coordinator
  6. Completion: Once all operators have completed, the checkpoint is committed

Source1 ──▶ Operator1 ──▶ Operator2 ──▶ Sink
   │            │             │
   ▼            ▼             ▼
[Marker] ──▶ [Marker] ──▶ [Marker]
   │            │             │
   ▼            ▼             ▼
[State1]    [State2]      [State3]
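
The alignment rule in step 3 can be illustrated with a small in-memory model. This is a minimal sketch, not Ethereal's implementation: `AligningOperator`, `AlignInput`, and the toy string state are hypothetical names introduced here, and channel snapshots (step 4), marker forwarding, and acknowledgments are left out.

```typescript
// Hypothetical sketch; none of these names are Ethereal APIs.
type AlignInput =
  | { kind: "record"; channel: number; value: string }
  | { kind: "marker"; channel: number; checkpointId: number };

class AligningOperator {
  private readonly inputChannels: number;
  private readonly markerSeen = new Set<number>(); // channels whose marker has arrived
  private held: AlignInput[] = [];                 // records held back behind a marker
  private state: string[] = [];                    // toy operator state: values processed so far
  readonly snapshots = new Map<number, string[]>();

  constructor(inputChannels: number) {
    this.inputChannels = inputChannels;
  }

  receive(input: AlignInput): void {
    if (input.kind === "record") {
      // Records behind an already-seen marker belong to the next checkpoint: hold them.
      if (this.markerSeen.has(input.channel)) this.held.push(input);
      else this.state.push(input.value);
      return;
    }
    this.markerSeen.add(input.channel);
    if (this.markerSeen.size < this.inputChannels) return;
    // Marker seen on every input channel: capture state, then release held records.
    this.snapshots.set(input.checkpointId, [...this.state]);
    this.markerSeen.clear();
    const released = this.held;
    this.held = [];
    for (const item of released) this.receive(item);
  }

  currentState(): string[] {
    return [...this.state];
  }
}

const alignOp = new AligningOperator(2);
alignOp.receive({ kind: "record", channel: 0, value: "a" });
alignOp.receive({ kind: "marker", channel: 0, checkpointId: 1 });
alignOp.receive({ kind: "record", channel: 0, value: "c" }); // held: arrived after the marker
alignOp.receive({ kind: "record", channel: 1, value: "b" }); // processed: channel 1 not yet marked
alignOp.receive({ kind: "marker", channel: 1, checkpointId: 1 });
// Snapshot 1 contains "a" and "b"; "c" is processed only after the snapshot.
```

Holding back records from channels that have already delivered their marker is what keeps the captured state consistent: the snapshot reflects exactly the records that preceded the marker on every input.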

Stream Processing Model

Ethereal follows a directed acyclic graph (DAG) model where:

  • Nodes represent operators (map, filter, reduce, join, etc.)
  • Edges represent data flows between operators
  • Partitions allow parallel processing of data
  • Keys enable stateful operations and grouping

Exactly-Once Semantics

Delivery Guarantees

Ethereal supports three levels of delivery guarantees:

| Guarantee | Description | Use Case |
|---|---|---|
| At-Least-Once | Records may be processed multiple times, but never lost | High throughput, tolerant of duplicates |
| At-Most-Once | Records may be lost, but never duplicated | Low latency, best-effort delivery |
| Exactly-Once | Each record is processed exactly once | Critical data, billing, financial |

Checkpointing

Ethereal uses coordinated checkpoints to achieve exactly-once processing:

// Enable checkpointing: 60 s interval, 5 min timeout (both in milliseconds)
env.enableCheckpointing(60000, 300000);

// Configure checkpoint behavior
env.getCheckpointConfig()
  .enableCheckpointing(60000)                            // same interval as above
  .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  .setMinPauseBetweenCheckpoints(10000)                  // at least 10 s between checkpoints
  .setMaxConcurrentCheckpoints(1);

Two-Phase Commit

For sinks that support transactions, Ethereal implements two-phase commit:

  1. Prepare Phase: Sink receives pre-commit notification with transaction data
  2. Commit Phase: After successful checkpoint, sink commits the transaction
  3. Abort: If the checkpoint fails, the sink aborts the transaction and rolls back

const transactionSink = {
  async invoke(record, context) {
    // Add to pending transaction
    await addToTransaction(record);
  },
  
  async commit(transaction) {
    // Persist all buffered records
    await flushTransaction(transaction);
  },
  
  async abort(transaction) {
    // Discard buffered records
    await discardTransaction(transaction);
  }
};
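
To make the ordering of the phases concrete, here is a minimal in-memory sketch of how a sink might tie transactions to checkpoints. It is an illustration under assumptions, not Ethereal's sink interface: `BufferingTxnSink`, `preCommit`, and the checkpoint-id keyed map are hypothetical, and a real sink would write to an external transactional system.

```typescript
// Hypothetical in-memory sink; none of these names are Ethereal APIs.
class BufferingTxnSink {
  private pending: string[] = [];                          // records in the open transaction
  private readonly prepared = new Map<number, string[]>(); // pre-committed, keyed by checkpoint id
  readonly committed: string[] = [];                       // records visible to readers

  invoke(value: string): void {
    this.pending.push(value);
  }

  // Phase 1: the checkpoint marker reached the sink; seal the open transaction.
  preCommit(checkpointId: number): void {
    this.prepared.set(checkpointId, this.pending);
    this.pending = [];
  }

  // Phase 2: the coordinator reported the checkpoint as complete.
  commit(checkpointId: number): void {
    const txn = this.prepared.get(checkpointId);
    if (!txn) return; // unknown or already committed: commit is idempotent
    this.committed.push(...txn);
    this.prepared.delete(checkpointId);
  }

  // Failure path: discard everything that was never committed.
  abort(): void {
    this.pending = [];
    this.prepared.clear();
  }
}

const txnSink = new BufferingTxnSink();
txnSink.invoke("a");
txnSink.invoke("b");
txnSink.preCommit(1);
txnSink.invoke("c"); // belongs to the next transaction
txnSink.commit(1);
txnSink.commit(1);   // repeated commit has no effect
txnSink.abort();     // simulated failure: "c" is discarded and would be replayed from the source
```

Records become visible only at commit, so a failure between checkpoints leaves no partial output behind.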

Windowed Operations

Window Types

Tumbling Windows

Fixed-size windows that do not overlap:

// 5-second tumbling windows
env
  .addSource(source)
  .keyBy(r => r.key)
  .window(new TumblingWindowAssigner(5000))
  .aggregate(aggregator);
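
As a rough illustration of how a tumbling assigner can map a timestamp to its window, the sketch below uses plain modular arithmetic. `tumblingWindowFor` is a hypothetical helper, not part of Ethereal's API, and it assumes windows of the form [start, end) in milliseconds.

```typescript
// Hypothetical helper: returns the single window [start, end) containing the timestamp.
function tumblingWindowFor(
  timestamp: number,
  size: number,
  offset = 0
): { start: number; end: number } {
  // The double modulo keeps the remainder non-negative for timestamps below the offset.
  const rem = (((timestamp - offset) % size) + size) % size;
  const start = timestamp - rem;
  return { start, end: start + size };
}

const tumbling = tumblingWindowFor(12345, 5000);
// tumbling is { start: 10000, end: 15000 }
```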

Sliding Windows

Overlapping windows with configurable size and slide:

// 10-second windows sliding every 5 seconds
env
  .addSource(source)
  .keyBy(r => r.key)
  .window(new SlidingWindowAssigner(10000, 5000))
  .aggregate(aggregator);
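
With overlapping windows, one record belongs to several windows at once (size / slide of them when the size is a multiple of the slide). A minimal sketch of that assignment, using a hypothetical `slidingWindowsFor` helper that is not part of Ethereal's API:

```typescript
// Hypothetical helper: returns every window [start, end) that contains the timestamp.
function slidingWindowsFor(
  timestamp: number,
  size: number,
  slide: number
): Array<{ start: number; end: number }> {
  const windows: Array<{ start: number; end: number }> = [];
  const rem = ((timestamp % slide) + slide) % slide;
  // Start at the latest window start <= timestamp and walk back one slide at a time.
  for (let start = timestamp - rem; start > timestamp - size; start -= slide) {
    windows.push({ start, end: start + size });
  }
  return windows;
}

const sliding = slidingWindowsFor(12000, 10000, 5000);
// sliding is [{ start: 10000, end: 20000 }, { start: 5000, end: 15000 }]
```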

Session Windows

Dynamic windows based on activity gaps:

// Sessions with a 5-second inactivity gap (second argument: maxGap, here 10 seconds)
env
  .addSource(source)
  .keyBy(r => r.userId)
  .window(new SessionWindowAssigner(5000, 10000))
  .aggregate(aggregator);
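
Session boundaries depend on the data rather than on the clock. The sketch below shows the basic gap rule for one key: a record within `gap` of the previous activity extends the session, otherwise a new session starts. `sessionize` is a hypothetical helper for illustration and ignores the `maxGap` parameter.

```typescript
// Hypothetical helper: groups the timestamps of one key into sessions [start, end).
function sessionize(
  timestamps: number[],
  gap: number
): Array<{ start: number; end: number }> {
  const sorted = [...timestamps].sort((a, b) => a - b);
  const sessions: Array<{ start: number; end: number }> = [];
  for (const ts of sorted) {
    const last = sessions[sessions.length - 1];
    if (last && ts < last.end) {
      last.end = ts + gap; // within the gap: extend the current session
    } else {
      sessions.push({ start: ts, end: ts + gap }); // gap exceeded: open a new session
    }
  }
  return sessions;
}

const sessions = sessionize([1000, 3000, 12000], 5000);
// sessions is [{ start: 1000, end: 8000 }, { start: 12000, end: 17000 }]
```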

Count Windows

Windows that close after N elements:

// Windows of 100 elements each
env
  .addSource(source)
  .keyBy(r => r.key)
  .window(new CountWindowAssigner(100))
  .aggregate(aggregator);

Watermarks

Watermarks track progress in event time and enable window completion:

const watermarkStrategy = WatermarkStrategy
  .forMonotonousTimestamps()
  .withAllowedLateness(3000); // 3 seconds allowed lateness

Late Data Handling

Configure how to handle records that arrive after the watermark has already passed the end of their window:

// Allow late data within the allowed lateness window
const lateDataTag = new OutputTag<Record>('late-data');

env
  .addSource(source)
  .keyBy(r => r.key)
  .window(new TumblingWindowAssigner(5000))
  .allowedLateness(3000) // Allow 3 seconds of late data
  .sideOutputLateData(lateDataTag)
  .aggregate(aggregator);

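// Process late data separately.
// lateDataStream stands for the side-output stream tagged with lateDataTag;
// how it is obtained is not shown in this snippet.
lateDataStream.addSink(lateDataSink);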
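
The interaction between the watermark, the window end, and the allowed lateness can be summarized as a three-way decision. The sketch below is one plausible reading of the configuration above, not Ethereal's actual logic: `classifyRecord` and the `Lateness` labels are hypothetical.

```typescript
// Hypothetical classification of a record by its window end and the current watermark.
type Lateness = "on-time" | "late-accepted" | "side-output";

function classifyRecord(
  windowEnd: number,
  watermark: number,
  allowedLateness: number
): Lateness {
  if (watermark < windowEnd) return "on-time";                          // window still open
  if (watermark < windowEnd + allowedLateness) return "late-accepted";  // window result is updated
  return "side-output";                                                 // too late: routed to the late-data tag
}

const lateChecks = [
  classifyRecord(5000, 4000, 3000), // "on-time"
  classifyRecord(5000, 6000, 3000), // "late-accepted"
  classifyRecord(5000, 8000, 3000), // "side-output"
];
```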

Stream Joins

Windowed Joins

Join two streams within a time window:

// Inner join with 10-second window
const joinOperator = new StreamJoinOperator(
  JoinType.INNER,
  {
    leftKey: (r) => r.orderId,
    rightKey: (r) => r.orderId,
  },
  { start: 0, end: 10000, id: 'join-window', type: WindowType.TUMBLING },
  1000 // allowed lateness
);

Join Types

// Inner Join - only matching pairs
const innerJoin = new StreamJoinOperator(JoinType.INNER, condition, window);

// Left Outer Join - all left records
const leftJoin = new StreamJoinOperator(JoinType.LEFT, condition, window);

// Right Outer Join - all right records
const rightJoin = new StreamJoinOperator(JoinType.RIGHT, condition, window);

// Full Outer Join - all records from both sides
const fullJoin = new StreamJoinOperator(JoinType.FULL, condition, window);

// Semi Join - only left records that match
const semiJoin = new StreamJoinOperator(JoinType.SEMI, condition, window);

// Anti Join - left records that don't match
const antiJoin = new StreamJoinOperator(JoinType.ANTI, condition, window);
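
The differences between the join types are easiest to see on the buffered contents of a single window. The following sketch covers the left-driven variants only; `joinWindow`, `KeyedRow`, and the string output format are hypothetical and not part of `StreamJoinOperator`.

```typescript
// Hypothetical model of joining the two buffers of one window.
type JoinKind = "INNER" | "LEFT" | "SEMI" | "ANTI";
interface KeyedRow {
  key: string;
  value: string;
}

function joinWindow(kind: JoinKind, left: KeyedRow[], right: KeyedRow[]): string[] {
  const out: string[] = [];
  for (const l of left) {
    const matches = right.filter(r => r.key === l.key);
    if (kind === "SEMI") {
      if (matches.length > 0) out.push(l.value);       // left record once, if any match exists
    } else if (kind === "ANTI") {
      if (matches.length === 0) out.push(l.value);     // left record only if nothing matches
    } else if (matches.length > 0) {
      for (const m of matches) out.push(`${l.value}+${m.value}`); // one output per matching pair
    } else if (kind === "LEFT") {
      out.push(`${l.value}+null`);                     // unmatched left record is preserved
    }
  }
  return out;
}

const leftRows: KeyedRow[] = [{ key: "k1", value: "L1" }, { key: "k2", value: "L2" }];
const rightRows: KeyedRow[] = [{ key: "k1", value: "R1" }];
// INNER -> ["L1+R1"], LEFT -> ["L1+R1", "L2+null"], SEMI -> ["L1"], ANTI -> ["L2"]
```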

Interval Joins

Join records within a time interval:

const intervalJoin = new IntervalJoinOperator(
  (r) => r.userId,      // Left key extractor
  (r) => r.userId,      // Right key extractor
  -3000,               // Lower bound: right timestamp >= left timestamp - 3 s
  3000                 // Upper bound: right timestamp <= left timestamp + 3 s
);
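
Assuming the bounds are interpreted relative to the left record's timestamp and are inclusive (an assumption, since the operator's exact semantics are not spelled out here), the matching condition reduces to a simple predicate. `withinInterval` is a hypothetical helper:

```typescript
// Hypothetical predicate: does the right record fall inside the interval around the left one?
function withinInterval(
  leftTs: number,
  rightTs: number,
  lowerBound: number,
  upperBound: number
): boolean {
  return rightTs >= leftTs + lowerBound && rightTs <= leftTs + upperBound;
}

const intervalHit = withinInterval(10000, 8000, -3000, 3000);   // true: 2 s before the left record
const intervalMiss = withinInterval(10000, 13001, -3000, 3000); // false: just past the upper bound
```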

Fault Tolerance

Recovery Process

When a failure occurs, Ethereal recovers by:

  1. Detecting Failure: Task manager detects failure through heartbeat timeout
  2. Restoring State: Read operator states from the latest successful checkpoint
  3. Rewinding Sources: Rewind source connectors to the checkpoint offset
  4. Restarting Tasks: Restart affected operators with restored state
  5. Resuming Processing: Continue processing from the checkpoint position
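
The restore-and-rewind steps can be sketched with a replayable source and a running sum as operator state. This is a toy model under stated assumptions: `CheckpointSketch` and `recoverSum` are hypothetical, and the source is an in-memory array standing in for a rewindable connector.

```typescript
// Hypothetical checkpoint record: operator state plus the source position it corresponds to.
interface CheckpointSketch {
  id: number;
  sourceOffset: number; // index of the next source record to read
  state: number;        // toy operator state: running sum
}

function recoverSum(sourceLog: number[], checkpoints: CheckpointSketch[]): number {
  // Restore from the latest successful checkpoint, or start from scratch if there is none.
  const initial: CheckpointSketch = { id: -1, sourceOffset: 0, state: 0 };
  const latest = checkpoints.reduce((best, c) => (c.id > best.id ? c : best), initial);
  let sum = latest.state;
  // Rewind the source to the checkpointed offset and replay from there.
  for (const value of sourceLog.slice(latest.sourceOffset)) sum += value;
  return sum;
}

const sourceLog = [1, 2, 3, 4, 5];
// Checkpoint taken after three records: state is 1 + 2 + 3 = 6.
const recovered = recoverSum(sourceLog, [{ id: 1, sourceOffset: 3, state: 6 }]);
// recovered is 15, the same result as processing every record exactly once
```

Because state and source offset are captured together, records before the offset are reflected in the restored state and records after it are replayed, so nothing is counted twice or skipped.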

Checkpoint Storage

Ethereal supports multiple checkpoint storage backends:

// In-memory (for development/testing)
const memoryBackend = new MemoryStateBackend();

// RocksDB (for production)
const rocksDBBackend = new RocksDBStateBackend('/path/to/db');

State Recovery

// Automatic recovery on restart
const env = createStreamExecutionEnvironment();
env.enableCheckpointing(30000);

// On failure, state is automatically restored from checkpoint
const result = await env.execute('job-name');

Flow Control & Backpressure

Backpressure Strategies

| Strategy | Behavior | Use Case |
|---|---|---|
| Blocking | Block sender until capacity available | Critical ordering |
| Dropping | Drop records when buffer full | High throughput, tolerant of loss |
| Spilling | Spill to disk when buffer full | Memory-constrained environments |
| Adaptive | Dynamically adjust based on pressure | Production workloads |

Credit-Based Flow Control

Ethereal uses credit-based flow control to prevent buffer overflow:

const flowController = new FlowController(BackpressureStrategy.BLOCKING);

// Channels exchange credits
flowController.registerChannel('ch1', 'source', 'operator');

// Send with credit check
const result = await flowController.send('ch1', record);
if (!result.success && result.pending) {
  // Wait for credit
}
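
The idea behind credits is that a sender may only transmit while the receiver has announced free buffer slots. A minimal single-process sketch, where `CreditedChannel` is a hypothetical class and not the `FlowController` shown above:

```typescript
// Hypothetical channel: one credit corresponds to one free buffer slot at the receiver.
class CreditedChannel {
  private credits: number;
  private readonly queue: string[] = [];

  constructor(initialCredits: number) {
    this.credits = initialCredits;
  }

  // Sender side: succeeds only while credits remain.
  trySend(value: string): boolean {
    if (this.credits === 0) return false; // backpressure: caller must wait or retry
    this.credits--;
    this.queue.push(value);
    return true;
  }

  // Receiver side: consuming a record frees a slot and returns one credit.
  receive(): string | undefined {
    const value = this.queue.shift();
    if (value !== undefined) this.credits++;
    return value;
  }
}

const creditChannel = new CreditedChannel(2);
const sendResults = [
  creditChannel.trySend("a"), // true
  creditChannel.trySend("b"), // true
  creditChannel.trySend("c"), // false: no credits left
];
const firstReceived = creditChannel.receive();        // "a", which returns one credit
const retryResult = creditChannel.trySend("c");       // true
```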

Adaptive Rate Controller

Dynamically adjust processing rate based on utilization:

const rateController = new AdaptiveRateController(0.7); // Target 70% utilization

// Record metrics
rateController.recordMeasurement(utilization, throughput);

// Get adjusted rate
const newRate = rateController.adjustRate();
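
How `adjustRate` computes the new rate is not described here. One simple possibility is a proportional rule that scales the rate by the ratio of target to measured utilization, clamped to avoid abrupt jumps. The sketch below shows that rule only as an assumption; `adjustRateSketch` and the clamp range of 0.5x to 2x are hypothetical.

```typescript
// Hypothetical proportional controller; not the AdaptiveRateController implementation.
function adjustRateSketch(currentRate: number, utilization: number, target = 0.7): number {
  if (utilization <= 0) return currentRate; // no measurement: keep the current rate
  // Above target the factor is < 1 (slow down); below target it is > 1 (speed up).
  const factor = Math.min(2, Math.max(0.5, target / utilization));
  return currentRate * factor;
}

const slowedRate = adjustRateSketch(1000, 0.9); // below 1000: utilization is over the target
const raisedRate = adjustRateSketch(1000, 0.5); // above 1000: there is headroom
```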

Auto-Scaling

Scaling Policies

Configure how operators scale based on metrics:

const policy: ScalingPolicy = {
  type: ScalingPolicyType.LAG,
  targetUtilization: 0.7,
  minParallelism: 1,
  maxParallelism: 64,
  scaleUpThreshold: 10000,    // Lag threshold for scale up
  scaleDownThreshold: 100,    // Lag threshold for scale down
  scaleUpCooldown: 60000,     // Cooldown after scale up
  scaleDownCooldown: 120000, // Cooldown after scale down
};

Scaling Triggers

  • Throughput-based: Scale based on input/output rate
  • Lag-based: Scale based on consumer lag
  • Latency-based: Scale based on processing latency
  • Custom: Implement custom scaling logic
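
For the lag-based trigger, the thresholds and cooldowns from the policy above combine into a small decision function. The sketch below is an assumption about how such a policy could be evaluated: `decideParallelism`, `LagPolicySketch`, and the double/halve step are hypothetical choices, not Ethereal's documented behavior.

```typescript
// Hypothetical subset of the ScalingPolicy fields shown above.
interface LagPolicySketch {
  minParallelism: number;
  maxParallelism: number;
  scaleUpThreshold: number;
  scaleDownThreshold: number;
  scaleUpCooldown: number;
  scaleDownCooldown: number;
}

function decideParallelism(
  policy: LagPolicySketch,
  current: number,
  lag: number,
  msSinceLastScale: number
): number {
  if (lag > policy.scaleUpThreshold && msSinceLastScale >= policy.scaleUpCooldown) {
    return Math.min(policy.maxParallelism, current * 2); // assumed step: double
  }
  if (lag < policy.scaleDownThreshold && msSinceLastScale >= policy.scaleDownCooldown) {
    return Math.max(policy.minParallelism, Math.ceil(current / 2)); // assumed step: halve
  }
  return current; // within thresholds or still cooling down
}

const lagPolicySketch: LagPolicySketch = {
  minParallelism: 1,
  maxParallelism: 64,
  scaleUpThreshold: 10000,
  scaleDownThreshold: 100,
  scaleUpCooldown: 60000,
  scaleDownCooldown: 120000,
};
const scaledUp = decideParallelism(lagPolicySketch, 4, 20000, 60000); // 8
const coolingDown = decideParallelism(lagPolicySketch, 4, 20000, 1000); // 4: cooldown not elapsed
```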

Dynamic Graph Modification

When scaling occurs:

  1. State Redistribution: Redistribute keyed state to new partitions
  2. Partition Remapping: Update partition-to-task mapping
  3. Minimal Downtime: Use rolling restarts to keep downtime during scaling to a minimum

const autoScaler = createAutoScaler(30000); // Evaluate every 30 seconds

// throughputPolicy and lagPolicy are ScalingPolicy objects defined elsewhere
autoScaler.setPolicy('operator-1', throughputPolicy);
autoScaler.setPolicy('operator-2', lagPolicy);

autoScaler.start();

// Listen for scaling events
autoScaler.addListener({
  async onScalingAction(action) {
    console.log(`Scaling ${action.operatorId}: ${action.currentParallelism} -> ${action.targetParallelism}`);
  }
});
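
State redistribution is commonly made cheap by hashing keys into a fixed number of key groups and giving each task a contiguous range of groups, so that rescaling moves whole groups instead of rehashing every key. Whether Ethereal works this way is not stated here; the sketch below only illustrates that technique, and `taskForKeyGroup` and `assignmentFor` are hypothetical helpers.

```typescript
// Assumption: keys have already been hashed into one of maxParallelism key groups (not shown).
function taskForKeyGroup(keyGroup: number, parallelism: number, maxParallelism: number): number {
  return Math.floor((keyGroup * parallelism) / maxParallelism);
}

// Lists the owning task for every key group at a given parallelism.
function assignmentFor(parallelism: number, maxParallelism: number): number[] {
  return Array.from({ length: maxParallelism }, (_, group) =>
    taskForKeyGroup(group, parallelism, maxParallelism)
  );
}

const beforeScale = assignmentFor(2, 8); // [0, 0, 0, 0, 1, 1, 1, 1]
const afterScale = assignmentFor(4, 8);  // [0, 0, 1, 1, 2, 2, 3, 3]
```

Scaling from 2 to 4 tasks splits each task's range in two, so every new task reads the state of a contiguous slice of key groups.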

Quick Start

Installation

npm install ethereal-stream
# or
yarn add ethereal-stream

Your First Streaming Application

import { 
  createStreamExecutionEnvironment,
  TumblingWindowAssigner 
} from 'ethereal-stream';

async function wordCount() {
  // 1. Create execution environment
  const env = createStreamExecutionEnvironment();
  
  // 2. Enable checkpointing for exactly-once
  env.enableCheckpointing(10000);
  
  // 3. Define the streaming pipeline (wordSource and resultSink are supplied by the application)
  env
    .addSource(wordSource)
    .flatMap(line => line.split(' ').map(word => ({ word, count: 1 })))
    .keyBy(record => record.word)
    .window(new TumblingWindowAssigner(5000))
    .aggregate({
      createAccumulator: () => ({ word: '', count: 0 }),
      add: (record, acc) => {
        acc.word = record.word;
        acc.count += record.count;
        return acc;
      },
      getResult: acc => acc,
      merge: (a, b) => { a.count += b.count; return a; }
    })
    .addSink(resultSink);
  
  // 4. Execute
  const result = await env.execute('Word Count');
  console.log(`Status: ${result.status}`);
}

wordCount().catch(console.error);

API Reference

StreamExecutionEnvironment

// Create environment
const env = createStreamExecutionEnvironment();

// Configure
env.setParallelism(4);
env.enableCheckpointing(interval);
env.setStateBackend(backend);

// Add sources and sinks
env.addSource(sourceFunction);
env.addSink(sinkFunction);

// Execute
const result = await env.execute(jobName);

DataStream Operations

// Transformation
stream.map(fn);              // Apply function to each record
stream.filter(predicate);    // Filter records
stream.flatMap(fn);          // Flatten results
stream.keyBy(keySelector);   // Group by key
stream.reduce(reducer);      // Reduce to single value per key
stream.aggregate(aggregator); // Custom aggregation
stream.process(fn);          // Custom process function

// Windowing
stream.window(assigner);      // Apply window
stream.allowedLateness(ms);   // Configure late data handling
stream.sideOutputLateData(tag); // Redirect late data

// Output
stream.print();               // Print to console
stream.writeAsText(path);     // Write to file
stream.addSink(sink);         // Custom sink

Window Assigners

// Tumbling (fixed size, non-overlapping)
new TumblingWindowAssigner(windowSize, offset?, lateness?);

// Sliding (overlapping)
new SlidingWindowAssigner(windowSize, slideSize, offset?, lateness?);

// Session (gap-based)
new SessionWindowAssigner(gap, maxGap?, lateness?);

// Count (element-based)
new CountWindowAssigner(count);

Examples

Example 1: Simple Word Count

// See src/examples/word-count.ts

Example 2: User Activity Analysis

// See src/examples/user-activity.ts

Example 3: Sessionization

// Sessionize user activity
env
  .addSource(events)
  .keyBy(event => event.userId)
  .window(new SessionWindowAssigner(30000)) // 30s inactivity gap
  .aggregate(sessionAggregator)
  .addSink(sessionSink);

Example 4: Stream Join

// Join orders with shipments
orders
  .join(shipments)
  .where(order => order.orderId)
  .equalTo(shipment => shipment.orderId)
  .window(TumblingWindow.of(Time.minutes(5)))
  .apply((order, shipment) => ({ order, shipment, latency: shipment.time - order.time }))
  .addSink(joinedSink);

Configuration

Checkpoint Configuration

const config = env.getCheckpointConfig();

config
  .enableCheckpointing(60000)                    // Interval: 1 minute
  .setCheckpointTimeout(300000)                 // Timeout: 5 minutes
  .setMinPauseBetweenCheckpoints(10000)         // Min pause: 10 seconds
  .setMaxConcurrentCheckpoints(1)               // Max concurrent: 1
  .setCheckpointRetention(
    CheckpointRetention.RETAIN_ON_CANCELLATION  // Retain on cancel
  );

State Backend Configuration

// Memory backend (development)
const memoryBackend = new MemoryStateBackend();

// RocksDB backend (production)
const rocksDBBackend = new RocksDBStateBackend(
  '/path/to/db',
  64,  // Write buffer size (MB)
  4    // Max background jobs
);

env.setStateBackend(rocksDBBackend);

Flow Control Configuration

const flowController = new FlowController(BackpressureStrategy.ADAPTIVE);

flowController.configureChannel('channel-1', {
  bufferSize: 1000,
  highWatermark: 800,
  lowWatermark: 200,
  timeout: 30000,
});

Performance Tuning

Parallelism

Set parallelism based on available resources:

// Global parallelism
env.setParallelism(4);

// Per-operator parallelism
env.addSource(source).setParallelism(8);
env.addSink(sink).setParallelism(4);

Buffer Configuration

Tune network buffers for throughput:

// Network buffer configuration
env.setNetworkBufferSize(256);  // KB per buffer
env.setNetworkBuffersPerChannel(32);

Checkpoint Tuning

Balance between consistency and overhead:

// For lower latency
env.enableCheckpointing(5000);  // More frequent checkpoints

// For higher throughput
env.enableCheckpointing(60000); // Less frequent checkpoints

Production Deployment

Kubernetes Deployment

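apiVersion: apps/v1
kind: Deployment
metadata:
  name: ethereal-job
spec:
  replicas: 3
  selector:                # required by apps/v1; must match the template labels
    matchLabels:
      app: ethereal-job
  template:
    metadata:
      labels:
        app: ethereal-job
    spec:
      containers:
      - name: ethereal
        image: ethereal/streaming:1.0.0
        env:
        - name: CHECKPOINT_URL
          value: "hdfs:///checkpoint"
        - name: STATE_BACKEND
          value: "rocksdb"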

Monitoring

Integrate with Prometheus/Grafana:

// Enable metrics
env.getConfig().setMetricEnabled(true);

// Access metrics
const metrics = operator.getMetricGroup();
metrics.counter('records-processed');
metrics.gauge('buffer-size', buffer.size());
metrics.histogram('processing-latency', latency);

High Availability

Configure for HA:

env.getConfig()
  .setRestartStrategy(RestartStrategies.exponentialDelay())
  .setMaxRestartAttempts(10)
  .setFailOnCheckpointErrors(false);

Architecture Deep Dive

Chandy-Lamport Implementation

The Chandy-Lamport algorithm implementation in Ethereal consists of:

  1. CheckpointCoordinator: Orchestrates snapshot creation
  2. SnapshotMarker: Marker messages that flow through the graph
  3. ChannelSnapshot: Captures in-flight records
  4. OperatorSnapshot: Contains operator state and channel state

State Management

State is organized by:

  • Namespace: Logical grouping of state (typically per operator)
  • Key: Key for keyed state
  • Value: The actual state value

Supported state types:

  • ValueState<T>: Single value per key
  • ListState<T>: List of values per key
  • MapState<K, V>: Map of key-value pairs per key
  • AggregatingState<S, T, R>: Aggregated state per key
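
The namespace/key/value layout can be pictured as a map keyed by the namespace and the record key together. Below is a minimal in-memory sketch of a `ValueState`-like container; `KeyedValueState` and its method names are hypothetical and do not reflect the signatures of Ethereal's state classes.

```typescript
// Hypothetical in-memory value state: one value per (namespace, key) pair.
class KeyedValueState<T> {
  private readonly namespace: string;
  private readonly store = new Map<string, T>();

  constructor(namespace: string) {
    this.namespace = namespace;
  }

  // A shared backend would store entries under the combined namespace/key slot.
  private slot(key: string): string {
    return `${this.namespace}/${key}`;
  }

  value(key: string): T | undefined {
    return this.store.get(this.slot(key));
  }

  update(key: string, next: T): void {
    this.store.set(this.slot(key), next);
  }

  clear(key: string): void {
    this.store.delete(this.slot(key));
  }
}

// Count events per user, as a keyed operator would.
const clickCounts = new KeyedValueState<number>("count-operator");
for (const userId of ["alice", "bob", "alice"]) {
  clickCounts.update(userId, (clickCounts.value(userId) ?? 0) + 1);
}
// clickCounts.value("alice") is 2, clickCounts.value("bob") is 1
```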

Watermark Propagation

Watermarks flow through the graph following these rules:

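  1. Sources generate watermarks based on the earliest event time
  2. An operator propagates a watermark downstream only once all of its inputs have passed it (that is, it forwards the minimum across its inputs)
  3. Windowed operators hold records until the watermark passes the window end
  4. Late records (event time earlier than the current watermark) are handled according to the late-data configuration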
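
Rule 2 amounts to tracking the latest watermark per input and forwarding the minimum whenever it advances. A minimal sketch of that bookkeeping, where `InputWatermarkCombiner` is a hypothetical class introduced for illustration:

```typescript
// Hypothetical combiner for an operator with several inputs.
class InputWatermarkCombiner {
  private readonly perInput: number[];
  private emitted = Number.NEGATIVE_INFINITY;

  constructor(inputs: number) {
    this.perInput = new Array<number>(inputs).fill(Number.NEGATIVE_INFINITY);
  }

  // Returns the new watermark to forward downstream, or undefined if it did not advance.
  advance(input: number, watermark: number): number | undefined {
    const previous = this.perInput[input] ?? Number.NEGATIVE_INFINITY;
    this.perInput[input] = Math.max(previous, watermark); // watermarks never move backwards
    const combined = Math.min(...this.perInput);
    if (combined > this.emitted) {
      this.emitted = combined;
      return combined;
    }
    return undefined;
  }
}

const combiner = new InputWatermarkCombiner(2);
const step1 = combiner.advance(0, 100); // undefined: input 1 has reported nothing yet
const step2 = combiner.advance(1, 50);  // 50: the slower input determines the watermark
const step3 = combiner.advance(1, 200); // 100: input 0 is now the slower one
```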

Contributing

Contributions are welcome! Please read our contributing guidelines before submitting PRs.

Development Setup

git clone https://github.com/moggan1337/Ethereal
cd Ethereal
npm install
npm run build
npm test

Code Style

We use TypeScript with strict mode. Please ensure:

  • All new code includes type definitions
  • Tests pass with npm test
  • Linting passes with npm run lint

License

MIT License - see LICENSE for details.


Acknowledgments

Ethereal is inspired by and builds upon concepts from:

  • Apache Flink: Stream processing architecture and API design
  • Apache Kafka: Producer/consumer patterns and exactly-once semantics
  • Google Dataflow: Unified batch/stream processing model
  • Chandy & Lamport: Distributed snapshot algorithm

Built with ❤️ by moggan1337

Version 1.0.0
