Joins
Joins combine events from multiple streams, or enrich a stream with data from a table. Eventflux supports the standard SQL inner and outer join types:
Join Types
| Type | Output | Unmatched side |
|---|---|---|
| JOIN / INNER JOIN | Only matched pairs | Dropped; both sides required |
| LEFT JOIN | Every left event | Right columns are null |
| RIGHT JOIN | Every right event | Left columns are null |
| FULL JOIN | Every event from both sides | Either side's columns may be null |
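The table can be modelled in a few lines of Python. This is a conceptual sketch, not Eventflux code: it assumes the events of a single window are already collected into two lists of dicts and uses None in place of SQL null. The `join` and `symbols` helpers and the sample data are hypothetical.

```python
# Conceptual model of the four join types over the events of ONE window.
# Not Eventflux code: events are plain dicts, None stands in for SQL null.
from typing import Any, Optional

Event = dict[str, Any]
Pair = tuple[Optional[Event], Optional[Event]]


def join(left: list[Event], right: list[Event], key: str, how: str) -> list[Pair]:
    """Return (left, right) pairs; the unmatched side of an outer join is None."""
    out: list[Pair] = []
    matched_right: set[int] = set()
    for a in left:
        hits = [(i, b) for i, b in enumerate(right) if b[key] == a[key]]
        for i, b in hits:
            out.append((a, b))
            matched_right.add(i)
        if not hits and how in ("left", "full"):
            out.append((a, None))  # keep the unmatched left event
    if how in ("right", "full"):
        for i, b in enumerate(right):
            if i not in matched_right:
                out.append((None, b))  # keep the unmatched right event
    return out


def symbols(pairs: list[Pair]) -> list[tuple[Optional[str], Optional[str]]]:
    # Reduce each pair to its symbols so the result is easy to read.
    return [(a and a["symbol"], b and b["symbol"]) for a, b in pairs]


trades = [{"symbol": "AAPL"}, {"symbol": "MSFT"}]
quotes = [{"symbol": "AAPL"}, {"symbol": "GOOG"}]
for how in ("inner", "left", "right", "full"):
    print(how, symbols(join(trades, quotes, "symbol", how)))
```

Only the AAPL pair survives the inner join; the left join adds ('MSFT', None), the right join adds (None, 'GOOG'), and the full join adds both.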
Stream-to-Stream Joins
Basic Join Syntax
SELECT a.column1, b.column2
FROM StreamA AS a
WINDOW TUMBLING(duration)
JOIN StreamB AS b
ON a.key = b.key
INSERT INTO Output;
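To see what the window does, here is a minimal Python model of a tumbling-window inner join. It assumes both streams share the same window boundaries, aligned to multiples of the window size from timestamp 0; Eventflux may align windows differently. The `tumbling_join` function, the `ts` field (in seconds) and the data are hypothetical.

```python
# Model of a tumbling-window inner join: events are bucketed by window index
# ts // size, and only events in the same bucket with equal keys are paired.
from collections import defaultdict


def tumbling_join(left, right, key, size):
    buckets = defaultdict(lambda: ([], []))  # window index -> (left, right)
    for e in left:
        buckets[e["ts"] // size][0].append(e)
    for e in right:
        buckets[e["ts"] // size][1].append(e)
    out = []
    for win in sorted(buckets):
        lefts, rights = buckets[win]
        out.extend((win, a["id"], b["id"])
                   for a in lefts for b in rights if a[key] == b[key])
    return out


stream_a = [{"id": "a1", "key": "k", "ts": 1}, {"id": "a2", "key": "k", "ts": 6}]
stream_b = [{"id": "b1", "key": "k", "ts": 4}, {"id": "b2", "key": "k", "ts": 11}]
print(tumbling_join(stream_a, stream_b, "key", size=5))  # [(0, 'a1', 'b1')]
```

Note that a2 (ts 6) and b1 (ts 4) are only two seconds apart yet never pair up, because the window boundary at ts 5 separates them.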
Inner Join
Emits a row for each pair of events, one from each stream, that share a symbol within the same window; unmatched events are dropped:
DEFINE STREAM Trades (symbol STRING, price DOUBLE, volume INT);
DEFINE STREAM Quotes (symbol STRING, bid DOUBLE, ask DOUBLE);
SELECT t.symbol,
t.price AS trade_price,
q.bid,
q.ask,
t.price - q.bid AS spread
FROM Trades AS t
WINDOW TUMBLING(1 sec)
JOIN Quotes AS q
ON t.symbol = q.symbol
INSERT INTO TradeQuoteMatches;
Left Join
Returns every event from the left stream. When no right event in the window matches, the right-side columns (here r.risk_score) are null, and COALESCE replaces them with 0:
SELECT t.symbol,
t.price,
t.volume,
COALESCE(r.risk_score, 0) AS risk_score
FROM Trades AS t
WINDOW TUMBLING(5 sec)
LEFT JOIN RiskScores AS r
ON t.symbol = r.symbol
INSERT INTO TradesWithRisk;
Right Join
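Returns every event from the right stream. When no trade matches, the left-side columns are null, so COALESCE substitutes 'NO_TRADE'. This example assumes Trades carries trade_id and order_id fields: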
SELECT COALESCE(t.trade_id, 'NO_TRADE') AS trade_id,
o.order_id,
o.symbol,
o.quantity
FROM Trades AS t
WINDOW TUMBLING(10 sec)
RIGHT JOIN Orders AS o
ON t.order_id = o.order_id
INSERT INTO OrderFillStatus;
Full Outer Join
Returns every event from both streams, with nulls on whichever side has no match:
SELECT COALESCE(a.symbol, b.symbol) AS symbol,
a.price AS price_a,
b.price AS price_b
FROM ExchangeA AS a
WINDOW TUMBLING(1 sec)
FULL JOIN ExchangeB AS b
ON a.symbol = b.symbol
INSERT INTO CrossExchangePrices;
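Multiple Join Conditions
Combine conditions with AND in the ON clause; a pair is joined only when every condition holds. This example assumes both streams carry an exchange field, which the Trades and Quotes definitions above do not include: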
SELECT t.symbol,
t.price,
q.bid,
q.ask
FROM Trades AS t
WINDOW TUMBLING(1 sec)
JOIN Quotes AS q
ON t.symbol = q.symbol
AND t.exchange = q.exchange
INSERT INTO MatchedData;
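Equality conditions joined with AND behave like a single equi-join on a composite key. The Python sketch below illustrates this with a build-and-probe hash join, a common strategy for equi-joins; whether Eventflux evaluates joins this way is an assumption, and the data is hypothetical.

```python
# Sketch: a multi-condition equi-join as a hash join on a composite key.
# ON t.symbol = q.symbol AND t.exchange = q.exchange becomes a tuple key.
from collections import defaultdict

trades = [
    {"symbol": "AAPL", "exchange": "NYSE", "price": 190.0},
    {"symbol": "AAPL", "exchange": "NASDAQ", "price": 190.2},
]
quotes = [
    {"symbol": "AAPL", "exchange": "NASDAQ", "bid": 190.1, "ask": 190.3},
]

# Build phase: index quotes by (symbol, exchange).
index = defaultdict(list)
for q in quotes:
    index[(q["symbol"], q["exchange"])].append(q)

# Probe phase: each trade sees only quotes that match on both columns.
matched = [
    (t["symbol"], t["price"], q["bid"], q["ask"])
    for t in trades
    for q in index.get((t["symbol"], t["exchange"]), [])
]
print(matched)  # [('AAPL', 190.2, 190.1, 190.3)]
```

The NYSE trade shares a symbol with the quote but not an exchange, so it produces no row.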
Stream-to-Table Joins
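Enrich streaming data with reference tables. No WINDOW clause is used here; each incoming order is matched against the rows in Customers and Products: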
DEFINE STREAM Orders (
order_id STRING,
customer_id STRING,
product_id STRING,
quantity INT
);
DEFINE TABLE Customers (
customer_id STRING,
name STRING,
tier STRING
);
DEFINE TABLE Products (
product_id STRING,
name STRING,
price DOUBLE
);
-- Enrich orders with customer and product info
SELECT o.order_id,
c.name AS customer_name,
c.tier AS customer_tier,
p.name AS product_name,
o.quantity,
o.quantity * p.price AS total_value
FROM Orders AS o
JOIN Customers AS c
ON o.customer_id = c.customer_id
JOIN Products AS p
ON o.product_id = p.product_id
INSERT INTO EnrichedOrders;
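Conceptually, each incoming order performs two key lookups, which is why indexed join keys matter for table joins. A minimal Python sketch, assuming the tables fit in dicts keyed by their join columns; `enrich` and the sample rows are hypothetical:

```python
# Sketch: stream-to-table enrichment as key lookups into in-memory tables.
# With an inner JOIN, an order whose customer or product is missing is dropped.
customers = {"c1": {"name": "Acme", "tier": "GOLD"}}
products = {"p1": {"name": "Widget", "price": 2.5}}


def enrich(order):
    c = customers.get(order["customer_id"])
    p = products.get(order["product_id"])
    if c is None or p is None:
        return None  # inner join: no output row
    return {
        "order_id": order["order_id"],
        "customer_name": c["name"],
        "customer_tier": c["tier"],
        "product_name": p["name"],
        "quantity": order["quantity"],
        "total_value": order["quantity"] * p["price"],
    }


orders = [
    {"order_id": "o1", "customer_id": "c1", "product_id": "p1", "quantity": 4},
    {"order_id": "o2", "customer_id": "c9", "product_id": "p1", "quantity": 1},
]
enriched = [row for row in map(enrich, orders) if row is not None]
print(enriched)
```

Order o2 references an unknown customer, so only o1 is emitted, with a total_value of 10.0. Switching to LEFT JOIN would keep o2 with null customer columns.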
Join with Aggregations
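Combine joins with window aggregations. Aggregates are computed over the joined rows, so in a many-to-many join each event is counted once per match: with m events from ExchangeA and n from ExchangeB for the same symbol in a window, SUM(a.volume) counts every ExchangeA volume n times, while AVG is unaffected: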
SELECT a.symbol,
SUM(a.volume) AS total_volume_a,
SUM(b.volume) AS total_volume_b,
AVG(a.price) AS avg_price_a,
AVG(b.price) AS avg_price_b
FROM ExchangeA AS a
WINDOW TUMBLING(1 min)
JOIN ExchangeB AS b
ON a.symbol = b.symbol
GROUP BY a.symbol
INSERT INTO VolumeComparison;
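The effect of a many-to-many join on aggregates can be checked with a small Python model of one window and one symbol. The values are hypothetical; every ExchangeA event is paired with every ExchangeB event before SUM and AVG run.

```python
# Sketch: why SUM over a many-to-many join can over-count.
# One window, one symbol: 2 events from ExchangeA, 3 from ExchangeB.
a_events = [{"volume": 100, "price": 10.0}, {"volume": 200, "price": 12.0}]
b_events = [{"volume": 50, "price": 11.0} for _ in range(3)]

# The join on symbol pairs every A event with every B event: 2 * 3 = 6 rows.
rows = [(a, b) for a in a_events for b in b_events]

sum_a_joined = sum(a["volume"] for a, _ in rows)
sum_a_raw = sum(a["volume"] for a in a_events)
avg_a_joined = sum(a["price"] for a, _ in rows) / len(rows)
avg_a_raw = sum(a["price"] for a in a_events) / len(a_events)

print(len(rows), sum_a_joined, sum_a_raw)  # 6 900 300
print(avg_a_joined, avg_a_raw)             # 11.0 11.0
```

SUM(a.volume) triples because each ExchangeA event appears once per ExchangeB match, while the average is unchanged. In standard SQL the usual remedy is to aggregate each stream separately and join the per-symbol results.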
Join Examples
Price Arbitrage Detection
-- Find arbitrage opportunities across exchanges
SELECT a.symbol,
a.price AS price_a,
b.price AS price_b,
ABS(a.price - b.price) AS spread,
ABS(a.price - b.price) / a.price * 100 AS spread_pct
FROM ExchangeA AS a
WINDOW TUMBLING(1 sec)
JOIN ExchangeB AS b
ON a.symbol = b.symbol
WHERE ABS(a.price - b.price) / a.price > 0.001
INSERT INTO ArbitrageOpportunities;
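The WHERE threshold and spread_pct use different scales: 0.001 is a fraction, so it corresponds to a spread_pct of 0.1. A quick Python check of the same arithmetic on hypothetical prices; the `arbitrage` helper is illustrative only:

```python
# Same arithmetic as the arbitrage query, on hypothetical prices.
def arbitrage(price_a: float, price_b: float, threshold: float = 0.001):
    spread = abs(price_a - price_b)
    spread_pct = spread / price_a * 100      # percent of price_a
    emitted = spread / price_a > threshold   # the WHERE condition
    return spread, spread_pct, emitted


for price_b in (100.05, 100.25):
    spread, pct, emitted = arbitrage(100.0, price_b)
    print(f"{price_b}: spread={spread:.2f} spread_pct={pct:.2f}% emitted={emitted}")
```

A 0.05% gap is filtered out, while a 0.25% gap clears the 0.1% threshold and is emitted.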
Order-Trade Matching
-- Match orders with executions that arrive in the same 1-minute window
SELECT o.order_id,
o.symbol,
o.quantity AS ordered_qty,
COALESCE(SUM(t.quantity), 0) AS filled_qty,
o.quantity - COALESCE(SUM(t.quantity), 0) AS remaining_qty
FROM Orders AS o
WINDOW TUMBLING(1 min)
LEFT JOIN Trades AS t
ON o.order_id = t.order_id
GROUP BY o.order_id, o.symbol, o.quantity
INSERT INTO OrderStatus;
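A Python model of one window of the OrderStatus query, using hypothetical orders and trades. It shows the LEFT JOIN keeping an order with no executions and COALESCE turning that order's null SUM into 0. It assumes each order_id appears once per window.

```python
# One window of OrderStatus: LEFT JOIN orders to trades, then group per order.
orders = [
    {"order_id": "o1", "symbol": "AAPL", "quantity": 100},
    {"order_id": "o2", "symbol": "MSFT", "quantity": 50},
]
trades = [
    {"order_id": "o1", "quantity": 30},
    {"order_id": "o1", "quantity": 45},
]

status = {}
for o in orders:
    # LEFT JOIN: every order survives, with zero or more matching trades.
    fills = [t["quantity"] for t in trades if t["order_id"] == o["order_id"]]
    # With no match, t.quantity is null on the order's single joined row, so
    # SQL SUM yields null; COALESCE(..., 0) turns that into 0.
    filled = sum(fills) if fills else 0
    status[o["order_id"]] = {
        "symbol": o["symbol"],
        "ordered_qty": o["quantity"],
        "filled_qty": filled,
        "remaining_qty": o["quantity"] - filled,
    }
print(status)
```

Order o1 is partially filled (75 of 100), while o2 still appears, with 0 filled and 50 remaining.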
Multi-Source Sensor Fusion
-- Combine readings from multiple sensor types
SELECT t.device_id,
t.temperature,
h.humidity,
p.pressure,
t.timestamp
FROM TemperatureSensors AS t
WINDOW TUMBLING(10 sec)
LEFT JOIN HumiditySensors AS h
ON t.device_id = h.device_id
LEFT JOIN PressureSensors AS p
ON t.device_id = p.device_id
INSERT INTO FusedSensorData;
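Chained LEFT JOINs keep every temperature reading and attach humidity and pressure only where a matching device_id exists. The Python sketch below assumes at most one humidity and one pressure reading per device per window; with more, SQL would emit one row per combination. The data is hypothetical.

```python
# Sketch: chained LEFT JOINs keep every temperature reading; humidity and
# pressure are attached when a reading with the same device_id exists.
temperature = [
    {"device_id": "d1", "temperature": 21.5},
    {"device_id": "d2", "temperature": 19.0},
]
humidity = {"d1": 40.0}    # device_id -> humidity (hypothetical)
pressure = {"d2": 1012.0}  # device_id -> pressure (hypothetical)

fused = [
    {
        "device_id": t["device_id"],
        "temperature": t["temperature"],
        "humidity": humidity.get(t["device_id"]),  # None if no match
        "pressure": pressure.get(t["device_id"]),  # None if no match
    }
    for t in temperature
]
print(fused)
```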
Customer 360 View
-- Enrich each transaction with its customer profile and flag large amounts
SELECT t.transaction_id,
t.amount,
t.timestamp,
c.name,
c.segment,
c.lifetime_value,
CASE
WHEN t.amount > c.avg_transaction * 3 THEN 'HIGH'
ELSE 'NORMAL'
END AS risk_flag
FROM Transactions AS t
JOIN CustomerProfiles AS c
ON t.customer_id = c.customer_id
INSERT INTO EnrichedTransactions;
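The CASE expression is a strict comparison against three times the customer's average. A Python rendering, assuming standard SQL null semantics (a comparison with null is unknown, so CASE falls through to ELSE); `risk_flag` is a hypothetical name:

```python
from typing import Optional


def risk_flag(amount: float, avg_transaction: Optional[float]) -> str:
    # CASE WHEN t.amount > c.avg_transaction * 3 THEN 'HIGH' ELSE 'NORMAL' END
    if avg_transaction is None:
        return "NORMAL"  # null comparison is unknown, so the ELSE branch applies
    return "HIGH" if amount > avg_transaction * 3 else "NORMAL"


print(risk_flag(500.0, 120.0))  # HIGH: 500 > 360
print(risk_flag(360.0, 120.0))  # NORMAL: 360 is not strictly greater than 360
print(risk_flag(500.0, None))   # NORMAL: no average on the profile
```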
Join Behavior
Window Requirements
Stream-to-stream joins require a window. Streams are unbounded, so the window limits which events are held in memory and can be paired:
-- Window defines the join scope
FROM StreamA AS a
WINDOW TUMBLING(5 sec) -- Only events in the same 5-second window are joined
JOIN StreamB AS b
ON a.key = b.key
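Null Handling
In outer joins, columns from the unmatched side are null. Use COALESCE to substitute a default before using them in expressions: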
-- Handle nulls from outer joins
SELECT COALESCE(a.value, 0) AS value_a,
COALESCE(b.value, 0) AS value_b,
COALESCE(a.value, 0) + COALESCE(b.value, 0) AS total
FROM StreamA AS a
WINDOW TUMBLING(1 sec)
FULL JOIN StreamB AS b
ON a.key = b.key
INSERT INTO Combined;
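The COALESCE calls matter because SQL arithmetic with a null operand yields null. A Python model of the three row shapes a FULL JOIN can produce, with None standing in for null; `coalesce` and `sql_add` are hypothetical helpers:

```python
from typing import Optional


def coalesce(*values):
    # Return the first non-None argument, like SQL COALESCE.
    return next((v for v in values if v is not None), None)


def sql_add(x: Optional[float], y: Optional[float]) -> Optional[float]:
    # SQL arithmetic with a null operand yields null.
    return None if x is None or y is None else x + y


pairs = [(5.0, 3.0), (5.0, None), (None, 3.0)]  # matched, left-only, right-only
for a, b in pairs:
    print(sql_add(a, b), coalesce(a, 0) + coalesce(b, 0))
```

Without COALESCE, `a.value + b.value` would be null for both unmatched rows; with it, the totals are 8.0, 5.0 and 3.0.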
Best Practices
Join Optimization
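- Use appropriate window sizes - Smaller windows hold fewer events and use less memory, but only events in the same window can match
- Filter before joining - Drop irrelevant events before they enter the join window rather than filtering the joined output
- Index join keys - For table joins, ensure the join keys are indexed
- Monitor state size - Joins keep events in memory for the duration of the window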
Performance Considerations
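- Cardinality explosion - In many-to-many joins, a key with m events on one side and n on the other produces m × n rows per window
- Memory usage - Large windows over high-throughput streams hold more events and consume more memory
- Late arrivals - Events that arrive after their window has closed may miss the join; consider delay windows for out-of-order events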
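The size of an equi-join's output can be estimated before running it: for each key, multiply the number of events on each side. A Python sketch with a hypothetical helper and data, showing how a single hot key dominates:

```python
from collections import Counter


def joined_row_count(left_keys, right_keys):
    # Equi-join output size: sum over keys of (left count * right count).
    right_counts = Counter(right_keys)
    return sum(n * right_counts[k] for k, n in Counter(left_keys).items())


print(joined_row_count(["A", "B", "C"], ["A", "B", "C"]))  # 3
print(joined_row_count(["A"] * 1000, ["A"] * 1000))        # 1000000
```

Three distinct keys give one row each, but 1,000 events per side on the same key yield a million rows in a single window.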
Next Steps
- Windows - Window types for joins
- Patterns - Pattern detection across streams
- Aggregations - Aggregate joined data