Skip to main content

Connectors Overview

EventFlux provides a powerful connector system that enables integration with external messaging systems and data stores. Connect your streaming pipelines to message brokers, databases, and other services using SQL-native syntax.

Architecture

The connector system consists of three main components:

┌─────────────────────────────────────────────────────────────────┐
│ EventFlux Engine │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌──────────┐ ┌─────────┐ ┌──────────┐ │
│ │ Source │───▶│ Mapper │───▶│ Query │───▶│ Mapper │──▶│
│ │ │ │ (decode) │ │ Engine │ │ (encode) │ │
│ └─────────┘ └──────────┘ └─────────┘ └──────────┘ │
│ ▲ │ │
│ │ ▼ │
│ ┌─────────┐ ┌─────────┐ │
│ │RabbitMQ │ │ Sink │ │
│ │WebSocket│ │ │ │
│ │ Kafka │ └─────────┘ │
│ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘

Sources

Sources consume data from external systems and feed events into the processing pipeline. They handle:

  • Connection management and reconnection
  • Message acknowledgment
  • Backpressure handling
  • Format conversion via mappers

Sinks

Sinks publish processed events to external systems. They handle:

  • Connection pooling
  • Delivery guarantees
  • Batching and buffering
  • Format serialization via mappers

Mappers

Mappers transform between raw bytes and structured events:

  • Source Mappers: Decode incoming bytes (JSON, CSV, bytes) into event attributes
  • Sink Mappers: Encode event attributes into output format (JSON, CSV, bytes)

SQL Syntax

Defining a Source Stream

CREATE STREAM StreamName (
field1 TYPE,
field2 TYPE,
...
) WITH (
type = 'source',
extension = 'connector_name',
format = 'mapper_name',
"connector.option1" = 'value1',
"connector.option2" = 'value2'
);

Defining a Sink Stream

CREATE STREAM StreamName (
field1 TYPE,
field2 TYPE,
...
) WITH (
type = 'sink',
extension = 'connector_name',
format = 'mapper_name',
"connector.option1" = 'value1',
"connector.option2" = 'value2'
);

Available Connectors

ConnectorSourceSinkStatusDescription
RabbitMQYesYesProduction ReadyAMQP 0-9-1 message broker
WebSocketYesYesProduction ReadyReal-time bidirectional streaming
KafkaPlannedPlannedRoadmapApache Kafka streaming
HTTPPlannedPlannedRoadmapREST/Webhook endpoints
FilePlannedPlannedRoadmapFile-based input/output

Available Mappers

MapperSourceSinkStatusDescription
JSONYesYesProduction ReadyJSON serialization
CSVYesYesProduction ReadyCSV parsing/formatting
BytesYesYesProduction ReadyRaw binary passthrough
AvroPlannedPlannedRoadmapApache Avro format
ProtobufPlannedPlannedRoadmapProtocol Buffers

Complete Example

Here's a complete example showing RabbitMQ source and sink with JSON mapping:

-- Input: Consume JSON events from RabbitMQ queue
CREATE STREAM OrderInput (
order_id STRING,
customer_id STRING,
amount DOUBLE,
product STRING
) WITH (
type = 'source',
extension = 'rabbitmq',
format = 'json',
"rabbitmq.host" = 'localhost',
"rabbitmq.port" = '5672',
"rabbitmq.queue" = 'orders',
"rabbitmq.username" = 'guest',
"rabbitmq.password" = 'guest'
);

-- Output: Publish enriched events to RabbitMQ exchange
CREATE STREAM OrderOutput (
order_id STRING,
customer_id STRING,
amount DOUBLE,
product STRING,
priority STRING
) WITH (
type = 'sink',
extension = 'rabbitmq',
format = 'json',
"rabbitmq.host" = 'localhost',
"rabbitmq.exchange" = 'processed-orders',
"rabbitmq.routing.key" = 'high-value'
);

-- Processing: Filter high-value orders and add priority
INSERT INTO OrderOutput
SELECT
order_id,
customer_id,
amount,
product,
CASE
WHEN amount > 1000 THEN 'HIGH'
WHEN amount > 100 THEN 'MEDIUM'
ELSE 'LOW'
END AS priority
FROM OrderInput
WHERE amount > 50;

Extension Registration

All built-in connectors and mappers are registered automatically. Custom extensions can be added programmatically:

use eventflux::core::eventflux_manager::EventFluxManager;
use eventflux::core::extension::{SourceFactory, SinkFactory};

let mut manager = EventFluxManager::new();

// Register custom source factory
manager.context().add_source_factory(
"custom".to_string(),
Box::new(MyCustomSourceFactory)
);

// Register custom sink factory
manager.context().add_sink_factory(
"custom".to_string(),
Box::new(MyCustomSinkFactory)
);

Next Steps