Skip to main content

Configuration

This guide covers all configuration options for customizing Eventflux runtime behavior.

Runtime Configuration

Basic Configuration

// `BackpressureStrategy` must be imported alongside `RuntimeConfig`;
// the original example referenced it without bringing it into scope.
use eventflux::config::{BackpressureStrategy, RuntimeConfig};
use std::time::Duration;

// Assemble a runtime configuration with the builder API.
let config = RuntimeConfig::builder()
    .buffer_size(10_000)                                // event queue capacity
    .backpressure_strategy(BackpressureStrategy::Block) // block senders when full
    .checkpoint_interval(Duration::from_secs(60))       // snapshot state every minute
    .build();

// Apply the configuration when instantiating the runtime.
let runtime = manager.create_runtime_with_config(app_definition, config)?;

Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `buffer_size` | `usize` | `8192` | Event queue capacity |
| `backpressure_strategy` | `Strategy` | `Block` | How to handle full queues |
| `checkpoint_interval` | `Duration` | `60s` | State checkpoint frequency |
| `parallelism` | `usize` | `num_cpus` | Processing threads |

Backpressure Strategies

Block (Default)

Blocks the sender until space is available:

// Explicitly select the blocking strategy (this is also the default):
// the sender stalls until buffer space frees up, so no events are lost.
let config = RuntimeConfig::builder()
.backpressure_strategy(BackpressureStrategy::Block)
.build();

Use when: Guaranteed delivery is required, sender can tolerate blocking.

Drop Oldest

Drops the oldest events when the buffer is full:

// Evict the oldest buffered events to make room for new ones —
// freshness wins over completeness under sustained overload.
let config = RuntimeConfig::builder()
.backpressure_strategy(BackpressureStrategy::DropOldest)
.build();

Use when: Latest data is more important than historical data.

Drop Newest

Rejects new events when the buffer is full:

// Reject incoming events while the buffer is full —
// already-buffered (historical) events are never displaced.
let config = RuntimeConfig::builder()
.backpressure_strategy(BackpressureStrategy::DropNewest)
.build();

Use when: Historical data must be preserved.

Buffer Sizing

Calculating Buffer Size

Consider:

  • Burst capacity: Maximum events expected in a burst
  • Memory constraints: Each event consumes memory
  • Latency tolerance: Larger buffers = higher latency potential
// For high-throughput scenarios: a large buffer absorbs bursts without
// triggering backpressure, at the cost of more memory and potential latency.
let config = RuntimeConfig::builder()
.buffer_size(100_000) // Handle large bursts
.build();

// For low-latency scenarios: a small buffer bounds queueing delay,
// but backpressure engages sooner under load.
let config = RuntimeConfig::builder()
.buffer_size(1_000) // Keep buffers small
.build();

Memory Estimation

// Approximate memory usage: buffer capacity times per-event footprint.
// NOTE(review): 200 bytes is an assumed average event size — measure your
// actual payloads before relying on this estimate.
let event_size_bytes = 200; // Average event size
let buffer_size = 10_000;
let estimated_memory = event_size_bytes * buffer_size;
// 200 * 10_000 = 2_000_000 bytes, i.e. ~2MB for this configuration

Checkpointing Configuration

Checkpoint Interval

use std::time::Duration;

// Shorter intervals reduce potential data loss on failure but add
// checkpointing overhead; see "Common Pitfalls" below for the trade-off.
let config = RuntimeConfig::builder()
.checkpoint_interval(Duration::from_secs(30)) // Every 30 seconds
.build();

Checkpoint Storage

// The original snippet referenced `StorageConfig`, `RuntimeConfig`, and
// `Duration` without importing them; the imports below make it self-contained.
use eventflux::config::{CheckpointConfig, RuntimeConfig, StorageConfig};
use std::path::PathBuf;
use std::time::Duration;

// Durable state snapshots written to the local filesystem.
let checkpoint_config = CheckpointConfig {
    enabled: true,
    interval: Duration::from_secs(60),  // snapshot once per minute
    compression: true,                  // trade CPU for smaller checkpoint files
    storage: StorageConfig::Local {
        path: PathBuf::from("/var/eventflux/checkpoints"),
    },
    max_retained: 10, // keep only the 10 most recent checkpoints
};

let config = RuntimeConfig::builder()
    .checkpoint_config(checkpoint_config)
    .build();

Redis Storage Backend

// Same checkpoint settings as above, but persisted to Redis so checkpoints
// survive the loss of the local host.
let checkpoint_config = CheckpointConfig {
enabled: true,
interval: Duration::from_secs(60),
compression: true,
storage: StorageConfig::Redis {
url: "redis://localhost:6379".to_string(),
prefix: "eventflux:checkpoints".to_string(), // key namespace in Redis
ttl_seconds: 86400, // 24 hours
},
max_retained: 10,
};

Parallelism

Thread Configuration

// Pin the processing thread count explicitly instead of using the default.
let config = RuntimeConfig::builder()
.parallelism(4) // Use 4 processing threads
.build();

Auto-Detection

// Match the processing thread count to the machine's CPU count.
// NOTE: `num_cpus` is an external crate — add it to Cargo.toml.
let config = RuntimeConfig::builder()
.parallelism(num_cpus::get()) // Use all available CPUs
.build();

Complete Configuration Example

use eventflux::prelude::*;
use eventflux::config::*;
use std::time::Duration;
use std::path::PathBuf;

/// Build the configuration used for production deployments.
///
/// Combines high-throughput buffering, blocking backpressure, compressed
/// checkpoints on local disk every 30 seconds, and metrics every 10 seconds.
fn create_production_config() -> RuntimeConfig {
    // Durable state snapshots: 30s interval, compressed, last 20 retained.
    let checkpointing = CheckpointConfig {
        enabled: true,
        interval: Duration::from_secs(30),
        compression: true,
        storage: StorageConfig::Local {
            path: PathBuf::from("/var/eventflux/checkpoints"),
        },
        max_retained: 20,
    };

    RuntimeConfig::builder()
        // Event processing: large buffer, block senders, one thread per CPU.
        .buffer_size(50_000)
        .backpressure_strategy(BackpressureStrategy::Block)
        .parallelism(num_cpus::get())
        .checkpoint_config(checkpointing)
        // Metrics reporting.
        .metrics_enabled(true)
        .metrics_interval(Duration::from_secs(10))
        .build()
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Minimal pass-through application: forward `value` from Input to Output.
    let app = r#"
DEFINE STREAM Input (value INT);
SELECT value FROM Input INSERT INTO Output;
"#;

    // Build the runtime from the app definition plus the production profile.
    let manager = EventFluxManager::new();
    let runtime = manager.create_runtime_with_config(app, create_production_config())?;
    runtime.start();

    Ok(())
}

Environment Variables

Some configuration can be set via environment variables:

| Variable | Description | Default |
| --- | --- | --- |
| `EVENTFLUX_BUFFER_SIZE` | Event buffer size | `8192` |
| `EVENTFLUX_PARALLELISM` | Processing threads | CPU count |
| `EVENTFLUX_CHECKPOINT_DIR` | Checkpoint directory | `/tmp/eventflux` |
| `EVENTFLUX_LOG_LEVEL` | Log verbosity | `info` |
use std::env;

// Let deployments override the buffer size without recompiling; fall back to
// the compiled-in default of 8192 when the variable is absent or unparseable.
let buffer_size: usize = match env::var("EVENTFLUX_BUFFER_SIZE") {
    Ok(raw) => raw.parse().unwrap_or(8192),
    Err(_) => 8192,
};

let config = RuntimeConfig::builder()
    .buffer_size(buffer_size)
    .build();

Configuration Profiles

Development Profile

/// Configuration tuned for local development: tiny buffer, no checkpointing.
fn dev_config() -> RuntimeConfig {
    // Checkpointing is pure overhead while iterating, so disable it and
    // accept the defaults for every other checkpoint field.
    let no_checkpoints = CheckpointConfig {
        enabled: false,
        ..Default::default()
    };

    RuntimeConfig::builder()
        .buffer_size(1_000)
        .checkpoint_config(no_checkpoints)
        .build()
}

Production Profile

/// Production configuration backed by Redis checkpoint storage.
///
/// # Panics
/// Panics at startup if the `REDIS_URL` environment variable is unset, so a
/// misconfigured deployment fails fast with a clear message instead of a
/// bare `unwrap()` panic mid-initialization.
fn prod_config() -> RuntimeConfig {
    // `expect` with context replaces the original context-free `unwrap()`.
    let redis_url = env::var("REDIS_URL")
        .expect("REDIS_URL environment variable must be set for the production profile");

    RuntimeConfig::builder()
        .buffer_size(50_000)
        .backpressure_strategy(BackpressureStrategy::Block)
        .checkpoint_config(CheckpointConfig {
            enabled: true,
            interval: Duration::from_secs(30),
            compression: true,
            storage: StorageConfig::Redis {
                url: redis_url,
                prefix: "eventflux:prod".to_string(),
                ttl_seconds: 86400 * 7, // retain checkpoint keys for 7 days
            },
            max_retained: 50,
        })
        .metrics_enabled(true)
        .build()
}

Testing Profile

/// Deterministic configuration for unit tests.
fn test_config() -> RuntimeConfig {
    // No persistence during tests; remaining checkpoint fields keep defaults.
    let no_checkpoints = CheckpointConfig {
        enabled: false,
        ..Default::default()
    };

    RuntimeConfig::builder()
        // A tiny buffer makes backpressure behavior observable in tests.
        .buffer_size(100)
        // One thread removes scheduling nondeterminism from assertions.
        .parallelism(1)
        .checkpoint_config(no_checkpoints)
        .build()
}

Best Practices

Configuration Guidelines
  1. Start with defaults - Only customize what you need
  2. Profile your workload - Measure before tuning
  3. Test configuration changes - Verify performance impact
  4. Use environment variables - For deployment flexibility
  5. Document your choices - Explain non-default settings
Common Pitfalls
  • Buffer too small - Causes excessive backpressure
  • Buffer too large - Wastes memory, increases latency
  • Checkpointing too frequent - Performance overhead
  • Checkpointing too rare - Data loss risk on failure

Next Steps