JOIN clause

You can use the JOIN clause of the SELECT statement to write a query that returns rows from multiple streams or tables at once, combined as single rows, allowing you to enrich one stream or table with data from another. Often, JOIN is used to look up 1 or more values in a second stream or table based on a reference value (or on several) in the first. But that is not JOIN’s only application.

s-Server JOINs are similar to JOINs in an RDBMS context. In both cases, you SELECT columns FROM a combination of two streams / tables, which are connected by JOIN. Fundamentally, JOIN combines 1 stream / table with 1 other. But this operation can be extended consecutively to realize multi-way JOINs across any number of streams / tables.

Streaming JOIN differs from the RDBMS model in one crucial respect: RDBMS tables are finite whereas streams continue indefinitely. When you JOIN two RDBMS tables, all rows for both tables are known in advance. Therefore all matches can be identified at once, and rows can be returned accordingly.

With streaming SQL, on the other hand, your SELECT query is never finished. As rows enter each stream, JOINed rows are emitted. Since it would be impossible to JOIN all rows in two infinite streams, s-Server applies a WINDOW on each stream to limit the set of rows and identify matches. Note: The default WINDOW for each stream is RANGE CURRENT ROW. But you will often want to define a custom WINDOW.

Syntax

{ [INNER] | { LEFT | RIGHT | FULL } [OUTER] } JOIN  <StreamOrTable> ON boolean_expression |
{ [INNER] | { LEFT | RIGHT | FULL } [OUTER] } JOIN  <StreamOrTable> USING '(' <column_name> [ , <column_name> ]* ')' |
NATURAL { [INNER] | { LEFT | RIGHT | FULL } [OUTER] } JOIN  <StreamOrTable> |
CROSS JOIN  <StreamOrTable>
<StreamOrTable> := { <stream> [ OVER <join_window> ] | <table> }

<join_window> := WINDOW <window name> | '(' ( <range_window_spec> | <row_window_spec> ) ')'
<range_window_spec> := RANGE [ BETWEEN INTERVAL '<timevalue>' <timeunit> PRECEDING AND ] ( INTERVAL '<timevalue>' <timeunit> PRECEDING | CURRENT ROW )
<row_window_spec> := ROWS [ BETWEEN <number> PRECEDING AND ] ( <number> PRECEDING | CURRENT ROW )
<timeunit> := <basetimeunit> [ TO <basetimeunit> ] [ '(' <precision> ')' ]
<basetimeunit> := ( SECOND | MINUTE | HOUR | DAY | MONTH | YEAR )

JOIN Conditions

You can query both streams and tables with joins. To simplify, we’ll refer to streams in the following discussion. When you join two streams or tables, we refer to the first stream in the query as the left stream and the second stream in the query as the right stream.

In the query below, for example, stocks.order_quantity is the left stream and stocks.stock_price is the right stream.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Join conditions specify how rows from the left and right streams are matched. All types of joins except CROSS joins accept a join condition. (CROSS joins simply pair every row from the left stream with every row from the right stream.)

There are three ways to specify a join condition:

  • You can use the ON condition to give an explicit boolean expression, typically comparing a column from each stream. These columns do not have to have the same names, but they need to contain matching data. Rows are joined when the condition is true. Using the ON condition is the most general and powerful way to specify a join condition.
  • You can use the USING condition to list columns with matching names in the left and right streams. For example, if you know that both streams have columns called “ticker” and “price”, you can write r1 JOIN r2 USING (ticker, price). That syntax is equivalent to r1 JOIN r2 ON r1.ticker = r2.ticker AND r1.price = r2.price.
  • You can use the NATURAL keyword to automatically match all columns from the left and right streams that have the same name.
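The equivalence between USING, NATURAL and an explicit ON condition can be sketched in Python, with rows modeled as dictionaries. This illustrates the rules described above and is not s-Server code. The helper names using_condition and natural_columns, and the sample rows, are hypothetical.

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, object]


def using_condition(columns: Iterable[str]) -> Callable[[Row, Row], bool]:
    """USING (c1, c2, ...) stands for ON l.c1 = r.c1 AND l.c2 = r.c2 AND ..."""
    cols = list(columns)
    return lambda left, right: all(left[c] == right[c] for c in cols)


def natural_columns(left_cols: Iterable[str], right_cols: Iterable[str]) -> List[str]:
    """NATURAL JOIN behaves like USING over every column name the two inputs share."""
    shared = set(right_cols)
    return [c for c in left_cols if c in shared]


# Hypothetical rows from two streams r1 and r2.
r1 = {"ticker": "IBM", "price": 100, "quantity": 750}
r2 = {"ticker": "IBM", "price": 100, "exchange": "NYSE"}

explicit_on = r1["ticker"] == r2["ticker"] and r1["price"] == r2["price"]
print(using_condition(["ticker", "price"])(r1, r2) == explicit_on)  # True
print(natural_columns(r1, r2))  # ['ticker', 'price']
```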

The following inner joins are equivalent:

Example With ON

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Example With USING

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    USING (ticker);

Example With NATURAL

Here, ticker is the only column in common between stocks.order_quantity and stocks.stock_price.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    NATURAL JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp;

JOIN Types

There are five types of joins:

  • INNER JOIN (or JOIN): Returns joined rows from the left and right streams where the join condition matches. Rows without a match are not emitted by the query.
  • LEFT JOIN (or LEFT OUTER JOIN): Returns the same joined rows as an inner join. In addition, rows from the left stream that have no match are kept, and null values are assigned to columns selected from the right stream.
  • RIGHT JOIN (or RIGHT OUTER JOIN): Returns the same joined rows as an inner join. In addition, rows from the right stream that have no match are kept, and null values are assigned to columns selected from the left stream.
  • FULL JOIN (or FULL OUTER JOIN): Returns the same joined rows as an inner join. In addition, unmatched rows from both streams are kept, and null values are assigned to the columns from the other stream.
  • CROSS JOIN: Pairs every row from the left stream with every row from the right stream. A cross join never has an ON or USING condition.
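The five join types can be compared on a single snapshot of window contents with the following Python sketch. It assumes the windows have already decided which rows are visible, so left and right are plain lists here. The join helper and the sample rows are hypothetical, and None plays the role of NULL.

```python
from itertools import product
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, object]
Pair = Tuple[Optional[Row], Optional[Row]]


def always(lrow: Row, rrow: Row) -> bool:
    return True


def join(left: List[Row], right: List[Row], kind: str,
         on: Callable[[Row, Row], bool] = always) -> List[Pair]:
    """Join two finite row sets; None stands in for a NULL-padded side."""
    if kind == "CROSS":
        return list(product(left, right))  # every left row paired with every right row
    pairs: List[Pair] = [(lrow, rrow) for lrow, rrow in product(left, right) if on(lrow, rrow)]
    if kind in ("LEFT", "FULL"):
        # keep left rows that matched nothing, with NULLs for the right-hand columns
        pairs += [(lrow, None) for lrow in left if not any(on(lrow, rrow) for rrow in right)]
    if kind in ("RIGHT", "FULL"):
        # keep right rows that matched nothing, with NULLs for the left-hand columns
        pairs += [(None, rrow) for rrow in right if not any(on(lrow, rrow) for lrow in left)]
    return pairs


def same_ticker(lrow: Row, rrow: Row) -> bool:
    return lrow["ticker"] == rrow["ticker"]


# Hypothetical contents of the two windows at one instant.
orders = [{"ticker": "IBM", "quantity": 750}, {"ticker": "GOOGL", "quantity": 1000}]
prices = [{"ticker": "IBM", "ending_price": 75}, {"ticker": "MSFT", "ending_price": 100}]

for kind in ("INNER", "LEFT", "RIGHT", "FULL", "CROSS"):
    print(kind, len(join(orders, prices, kind, same_ticker)))
# INNER 1, LEFT 2, RIGHT 2, FULL 3, CROSS 4
```

In this model the FULL result is exactly the union of the LEFT and RIGHT results.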

Working With Streaming Joins

Stream-to-stream joins join two streams that are changing over time. During each millisecond, some rows will get added to the left stream and some to the right stream.

For each millisecond of the joined stream’s rowtime, s-Server outputs the rows that are in the join of those two relations but were not in the join during the previous millisecond.

As noted above, when joining on streaming data, you will often want to define a window for one or both of the streams that you are joining. Windows specify a subset of rows, either by time or number of rows. Windows include endpoints; that is, if you specify a 1 minute window starting at 1:00:00, rows with rowtimes of 1:00:00 and 1:01:00 are both included in the join. For more information on windows in streaming SQL, see the WINDOW clause topic in this guide.

If you do not specify an OVER clause for a stream, the join evaluates for the current row only. It may be useful in some cases to omit the OVER clause for one of the streams. However, you may not receive useful results if you omit it for both streams, unless you know that the streams will emit data at the same rowtimes. For example, if both streams represent periodic aggregates (such as hourly summaries), they will both emit data in the same millisecond. In this case, there is no need for a window, because the prior aggregation syncs the joined streams to the same rowtime.

The window specification may also be the name of a window defined in the WINDOW clause.

Which rows are joined, then, depends on how the windows for the join are defined. For example, with a 1-minute window, rows from 05:02:15 to 05:03:15 are in the window when the stream clock is at 05:03:15. As the stream clock moves to 05:03:30, rows from 05:02:15 to 05:02:29.999 move out of the window.
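Window membership with inclusive endpoints can be sketched as a simple range check. This Python sketch assumes the window is evaluated at the current stream clock. The helpers in_range_window and ts are hypothetical, and the times are the ones used in the paragraphs above.

```python
from datetime import datetime, timedelta


def in_range_window(row_time: datetime, clock: datetime, width: timedelta) -> bool:
    """RANGE INTERVAL <width> PRECEDING, evaluated at `clock`: both endpoints included."""
    return clock - width <= row_time <= clock


def ts(text: str) -> datetime:
    """Hypothetical helper: parse an HH:MM:SS.fff time of day."""
    return datetime.strptime(text, "%H:%M:%S.%f")


minute = timedelta(minutes=1)
print(in_range_window(ts("01:00:00.000"), ts("01:01:00.000"), minute))  # True: start endpoint
print(in_range_window(ts("01:01:00.000"), ts("01:01:00.000"), minute))  # True: end endpoint
print(in_range_window(ts("05:02:29.999"), ts("05:03:30.000"), minute))  # False: evicted
print(in_range_window(ts("05:02:30.000"), ts("05:03:30.000"), minute))  # True
```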

Output Rowtimes

The rowtime of a given output row is the rowtime at the point it was possible to calculate the output row. The stream clock for the output stream remains at the lower of the rowtimes for the two streams. For an inner join, the rowtime of an output row is the later of the rowtimes of the two input rows. This is also true for an outer join in which matching input rows are found.
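The two rowtime rules can be expressed as a pair of Python functions. The can_emit function encodes an assumption that is consistent with the description above: a matched pair is released once the joined stream clock reaches the pair's output rowtime. All names are hypothetical.

```python
from datetime import datetime


def output_rowtime(left_time: datetime, right_time: datetime) -> datetime:
    """Inner join, or an outer join that found a match: the later input rowtime."""
    return max(left_time, right_time)


def output_clock(left_clock: datetime, right_clock: datetime) -> datetime:
    """The joined stream's clock stays at the lower of the two input clocks."""
    return min(left_clock, right_clock)


def can_emit(left_time: datetime, right_time: datetime,
             left_clock: datetime, right_clock: datetime) -> bool:
    """Assumption: a matched pair is released once the joined clock reaches its rowtime."""
    return output_rowtime(left_time, right_time) <= output_clock(left_clock, right_clock)


def at(second: int) -> datetime:
    return datetime(2019, 3, 30, 5, 3, second)


# Left row at 05:03:28, right row at 05:03:15; the right stream has only reached 05:03:25.
print(output_rowtime(at(28), at(15)))            # 2019-03-30 05:03:28
print(can_emit(at(28), at(15), at(30), at(25)))  # False: waiting for the right stream
print(can_emit(at(28), at(15), at(30), at(30)))  # True once the right stream reaches 05:03:30
```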

Examples

The examples below use the following input:

stocks.order_quantity

ROWTIME QUANTITY TICKER
‘2019-03-30 05:02:10.000’ 750 IBM
‘2019-03-30 05:03:10.000’ 1000 IBM
‘2019-03-30 05:03:15.000’ 1000 GOOGL
‘2019-03-30 05:03:20.000’ 2000 GOOGL
‘2019-03-30 05:03:20.000’ 2000 IBM
‘2019-03-30 05:03:28.000’ 1000 MSFT
‘2019-03-30 05:03:30.000’ 2000 MSFT

stocks.stock_price

ROWTIME ENDING_PRICE TICKER
‘2019-03-30 05:02:10.000’ 75 IBM
‘2019-03-30 05:03:10.000’ 100 IBM
‘2019-03-30 05:03:15.000’ 100 MSFT
‘2019-03-30 05:03:20.000’ 100 GOOGL
‘2019-03-30 05:03:25.000’ 200 MSFT

The following query uses a window of 1 minute for both left and right streams and an INNER join:

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM stocks.order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    JOIN stocks.stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Result:

'ROWTIME','TICKER','QUANTITY','ENDING_PRICE'
'2019-03-30 05:02:10.0','IBM','750','75'
--The next two rows join with the rows from 5:02:10, because they are still part of the window.
--Note that endpoints are included in the window.
'2019-03-30 05:03:10.0','IBM','1000','75'
'2019-03-30 05:03:10.0','IBM','750','100'
'2019-03-30 05:03:10.0','IBM','1000','100'
'2019-03-30 05:03:20.0','GOOGL','1000','100'
'2019-03-30 05:03:20.0','GOOGL','2000','100'
--Note that in the next row, the IBM order at 05:03:20 is not joined with the price row from 05:02:10, because that row has fallen out of the window.
'2019-03-30 05:03:20.0','IBM','2000','100'

Note that:

  • No rows are emitted after ‘2019-03-30 05:03:20.000’, even though rows have arrived in stocks.order_quantity after this time. The joined stream clock cannot move past the rowtime of the latest row in stocks.stock_price.

  • No match appears for MSFT, so data from the MSFT rows at ‘2019-03-30 05:03:15.000’ and ‘2019-03-30 05:03:25.000’ in stocks.stock_price does not appear in the join.

Once a row with a rowtime of 05:03:30 or later arrives in stocks.stock_price, the two MSFT rows in stocks.order_quantity (at 05:03:28 and 05:03:30) become available for the join:

INSERT INTO stocks.stock_price (ROWTIME, ticker, ending_price) VALUES (CAST('2019-03-30 05:03:30.000' AS TIMESTAMP), 'IBM', 200);

This insert results in six new rows being emitted. These rows are emitted only once the stream clock moves to 05:03:30, when the row from stocks.stock_price arrives at 05:03:30.

Nothing more is emitted until the stream clock advances. That would also be true for a right or full join: the stream clock still needs to catch up.

This is also the first time that anything has matched MSFT. MSFT had appeared in stocks.stock_price, but its rows in stocks.order_quantity only become available for the join now.

In a right or full join, we would have seen MSFT earlier, but without any columns joined from stocks.order_quantity.

'2019-03-30 05:03:28.0','MSFT','1000','100'
'2019-03-30 05:03:28.0','MSFT','1000','200'
'2019-03-30 05:03:30.0','MSFT','2000','100'
'2019-03-30 05:03:30.0','MSFT','2000','200'
'2019-03-30 05:03:30.0','IBM','1000','200'
'2019-03-30 05:03:30.0','IBM','2000','200'

The four rows with “MSFT” in column ticker result from the two stocks.order_quantity rows at 05:03:28 and 05:03:30 matching the two earlier unmatched MSFT rows from stocks.stock_price (at 05:03:15 and 05:03:25). This is because the output stream clock has moved to 05:03:30.

The new rows are all matches that have come in up to 05:03:30, including the rows that had arrived in stocks.order_quantity after 05:03:20 but no later than 05:03:30. Note that this means that four matches came in for MSFT, even though the row inserted into stocks.stock_price at 05:03:30 had “IBM” in column ticker.
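The example can be reproduced with a small batch model of the rules stated in this topic:

  • A pair joins when the earlier row is still inside its stream's window, endpoints included.
  • The output rowtime is the later of the two input rowtimes.
  • Nothing is emitted past the lower of the two input clocks.

This Python sketch assumes each input list is in rowtime order and covers only the inner join. It does not reflect how s-Server evaluates the query incrementally. The helpers windowed_join and t are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Row = Tuple[datetime, str, int]           # (rowtime, ticker, value)
Joined = Tuple[datetime, str, int, int]   # (rowtime, ticker, quantity, ending_price)


def t(mmss: str) -> datetime:
    """Hypothetical helper: a rowtime on 2019-03-30 at 05:MM:SS."""
    return datetime.strptime("2019-03-30 05:" + mmss, "%Y-%m-%d %H:%M:%S")


def windowed_join(orders: List[Row], prices: List[Row],
                  order_window: timedelta, price_window: timedelta) -> List[Joined]:
    """Batch model of an inner join ON ticker with RANGE ... PRECEDING windows."""
    clock = min(orders[-1][0], prices[-1][0])  # lower of the two input clocks
    out: List[Joined] = []
    for o_time, o_ticker, quantity in orders:
        for p_time, p_ticker, price in prices:
            if o_ticker != p_ticker:
                continue
            # The earlier row must still be inside its own stream's window
            # (endpoints included) when the later row arrives.
            if o_time <= p_time:
                in_window = p_time - o_time <= order_window
            else:
                in_window = o_time - p_time <= price_window
            emit_time = max(o_time, p_time)  # output rowtime: the later input rowtime
            if in_window and emit_time <= clock:
                out.append((emit_time, o_ticker, quantity, price))
    return sorted(out, key=lambda row: row[0])


order_quantity = [(t("02:10"), "IBM", 750), (t("03:10"), "IBM", 1000),
                  (t("03:15"), "GOOGL", 1000), (t("03:20"), "GOOGL", 2000),
                  (t("03:20"), "IBM", 2000), (t("03:28"), "MSFT", 1000),
                  (t("03:30"), "MSFT", 2000)]
stock_price = [(t("02:10"), "IBM", 75), (t("03:10"), "IBM", 100),
               (t("03:15"), "MSFT", 100), (t("03:20"), "GOOGL", 100),
               (t("03:25"), "MSFT", 200)]

minute = timedelta(minutes=1)
before = windowed_join(order_quantity, stock_price, minute, minute)
after = windowed_join(order_quantity, stock_price + [(t("03:30"), "IBM", 200)], minute, minute)
new_rows = [row for row in after if row not in before]

print(len(before), len(new_rows))  # 7 6
for rowtime, ticker, quantity, price in new_rows:
    print(rowtime.time(), ticker, quantity, price)
```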

Example with ROWS

The following query uses a window of 5 rows preceding the current row for both left and right streams and an INNER join:

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS oq
  JOIN stock_price OVER (ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS sp
  ON (oq.ticker = sp.ticker);
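As a rough sketch, assume the standard SQL reading of ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: the current row plus at most five earlier rows from the same stream, regardless of ticker. Under that assumption the window behaves like a bounded queue. The Python helper rows_window is hypothetical.

```python
from collections import deque
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def rows_window(stream: Iterable[T], preceding: int) -> Iterator[List[T]]:
    """Window contents for ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW."""
    window = deque(maxlen=preceding + 1)  # current row plus up to `preceding` earlier rows
    for row in stream:
        window.append(row)  # the oldest row drops out automatically when full
        yield list(window)


windows = list(rows_window(range(1, 9), preceding=5))
print(windows[0])   # [1]
print(windows[-1])  # [3, 4, 5, 6, 7, 8]
```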

LEFT JOIN

Left joins include all rows from the left stream, regardless of whether the join condition matches rows from the right stream. The following is the same query as in the first example, but with a LEFT JOIN instead of a JOIN.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) as oq
    LEFT JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Result:

ROWTIME TICKER QUANTITY ENDING_PRICE
‘2019-03-30 05:02:10.0’ IBM 750 75
‘2019-03-30 05:03:10.0’ IBM 1000 75
‘2019-03-30 05:03:10.0’ IBM 750 100
‘2019-03-30 05:03:10.0’ IBM 1000 100
‘2019-03-30 05:03:15.0’ NULL 1000 NULL
‘2019-03-30 05:03:20.0’ GOOGL 1000 100
‘2019-03-30 05:03:20.0’ GOOGL 2000 100
‘2019-03-30 05:03:20.0’ IBM 2000 100

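Because the GOOGL row at 2019-03-30 05:03:15.000 in stocks.order_quantity has no match in stocks.stock_price at that point, it still emits, but returns null for the columns ticker and ending_price. (The query selects ticker from sp, the right stream.) The GOOGL row that arrives in stocks.stock_price at 05:03:20 matches it later, as the row at 05:03:20 shows.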

RIGHT JOIN

Right joins include all rows from the right stream, regardless of whether the join condition matches rows from the left stream. The following is the same query as in the first example, but with a RIGHT JOIN instead of a JOIN.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    RIGHT JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Result:

ROWTIME TICKER QUANTITY ENDING_PRICE
‘2019-03-30 05:02:10.000’ IBM 750 75
‘2019-03-30 05:03:10.000’ IBM 1000 75
‘2019-03-30 05:03:10.000’ IBM 750 100
‘2019-03-30 05:03:10.000’ IBM 1000 100
‘2019-03-30 05:03:15.000’ MSFT NULL 100
‘2019-03-30 05:03:20.000’ GOOGL 1000 100
‘2019-03-30 05:03:20.000’ GOOGL 2000 100
‘2019-03-30 05:03:20.000’ IBM 2000 100

Because there is no match for MSFT in stocks.order_quantity at that point, the MSFT row at 2019-03-30 05:03:15.000 from stocks.stock_price still emits, but returns null for the column quantity.

FULL JOIN

Full joins include all rows from both streams, regardless of whether the join condition matches rows. The following is the same query as in the first example, but with a FULL JOIN instead of a JOIN.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) as oq
    FULL JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp
    ON (oq.ticker = sp.ticker);

Result:

ROWTIME TICKER QUANTITY ENDING_PRICE
‘2019-03-30 05:02:10.000’ IBM 750 75
‘2019-03-30 05:03:10.000’ IBM 1000 75
‘2019-03-30 05:03:10.000’ IBM 750 100
‘2019-03-30 05:03:10.000’ IBM 1000 100
‘2019-03-30 05:03:15.000’ NULL 1000 NULL
‘2019-03-30 05:03:15.000’ MSFT NULL 100
‘2019-03-30 05:03:20.000’ GOOGL 1000 100
‘2019-03-30 05:03:20.000’ GOOGL 2000 100
‘2019-03-30 05:03:20.000’ IBM 2000 100

At 2019-03-30 05:03:15.000, the GOOGL row from stocks.order_quantity has no match yet in stocks.stock_price, and the MSFT row from stocks.stock_price has no match in stocks.order_quantity. Both rows still emit. The first returns null for the columns ticker and ending_price, and the second returns null for the column quantity. Note that a full join looks exactly as if you combined the results of a right and left join.

CROSS JOIN

A CROSS join returns the Cartesian product of all rows in the two streams' windows that have arrived up until the joined stream’s stream clock. CROSS JOIN does not have any matching condition in the join clause.

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) as oq
    CROSS JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp;

Result:

ROWTIME TICKER QUANTITY ENDING_PRICE
‘2019-03-30 05:02:10.000’ IBM 750 75
‘2019-03-30 05:03:10.000’ IBM 1000 75
‘2019-03-30 05:03:10.000’ IBM 750 100
‘2019-03-30 05:03:10.000’ IBM 1000 100
‘2019-03-30 05:03:15.000’ IBM 1000 100
‘2019-03-30 05:03:15.000’ MSFT 1000 100
‘2019-03-30 05:03:15.000’ MSFT 1000 100
‘2019-03-30 05:03:20.000’ IBM 2000 100
‘2019-03-30 05:03:20.000’ MSFT 2000 100
‘2019-03-30 05:03:20.000’ GOOGL 1000 100
‘2019-03-30 05:03:20.000’ GOOGL 1000 100
‘2019-03-30 05:03:20.000’ GOOGL 2000 100
‘2019-03-30 05:03:20.000’ IBM 2000 100
‘2019-03-30 05:03:20.000’ MSFT 2000 100
‘2019-03-30 05:03:20.000’ GOOGL 2000 100
‘2019-03-30 05:03:20.000’ MSFT 1000 200
‘2019-03-30 05:03:20.000’ MSFT 1000 200
‘2019-03-30 05:03:20.000’ MSFT 2000 200
‘2019-03-30 05:03:20.000’ MSFT 2000 200
‘2019-03-30 05:03:25.000’ IBM 1000 100
‘2019-03-30 05:03:25.000’ MSFT 1000 100
‘2019-03-30 05:03:25.000’ GOOGL 1000 100
‘2019-03-30 05:03:25.000’ MSFT 1000 200

Without windows, the size of a CROSS JOIN result would be the product of the numbers of input rows (seven rows from stocks.order_quantity and five rows from stocks.stock_price). Here, each pair must also fall within the 1-minute windows. CROSS JOIN still respects the stream clock, so rows that have arrived after ‘2019-03-30 05:03:25.000’ are not included in the cross join.

Using the WHERE Clause With JOINs

You can use the WHERE clause to achieve similar effects to join conditions. With a WHERE clause, rows are filtered after they have been emitted from the join. For an inner join, WHERE is equivalent to ON, but for an outer join, the partially NULL rows are only generated correctly if the condition is evaluated for each pair of candidate rows, and a WHERE clause cannot do that. For more details, see the topic WHERE clause in this guide.
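The difference between an ON condition and a WHERE filter on an outer join shows up in a small Python sketch of a LEFT JOIN. The left_join helper and the rows are hypothetical, and None plays the role of NULL.

```python
from typing import Callable, Dict, List, Optional, Tuple

Row = Dict[str, object]


def left_join(left: List[Row], right: List[Row],
              on: Callable[[Row, Row], bool]) -> List[Tuple[Row, Optional[Row]]]:
    """The ON condition is evaluated for every candidate (left, right) pair."""
    out: List[Tuple[Row, Optional[Row]]] = []
    for lrow in left:
        matches = [rrow for rrow in right if on(lrow, rrow)]
        # a left row with no match is kept once, with None (NULL) on the right
        out += [(lrow, rrow) for rrow in matches] or [(lrow, None)]
    return out


# Hypothetical window contents.
orders = [{"ticker": "IBM", "quantity": 750}, {"ticker": "MSFT", "quantity": 1000}]
prices = [{"ticker": "IBM", "ending_price": 75}]

# Extra condition in ON: orders without a qualifying price survive with NULLs.
in_on = left_join(orders, prices,
                  lambda o, p: o["ticker"] == p["ticker"] and p["ending_price"] > 100)

# Same condition in WHERE, applied after the join: a comparison with NULL is never
# true, so the NULL-padded rows are filtered out along with the failing matches.
joined = left_join(orders, prices, lambda o, p: o["ticker"] == p["ticker"])
in_where = [(o, p) for o, p in joined if p is not None and p["ending_price"] > 100]

print(len(in_on), len(in_where))  # 2 0
```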

This syntax works similarly to the join syntax for Oracle (a non-ANSI join).

Using CROSS JOIN and WHERE

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
  FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq
    CROSS JOIN stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp WHERE sp.ticker = oq.ticker;

Using a FROM clause with streams separated by commas, and a WHERE condition

SELECT STREAM rowtime, sp.ticker, oq.quantity, sp.ending_price
   FROM order_quantity OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS oq,
   stock_price OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS sp WHERE sp.ticker = oq.ticker;

Stream-Table JOINs

You can use the JOIN clause to enrich the content of a stream by treating one of the stream's columns as a foreign key to a table, and finding table data that matches each row from the stream. In effect, this means computing the relational join of the stream with the table.

There are currently two ways to perform a stream-to-table join:

  • Using a stream-table join in SQL, described here.

  • Using the TableLookup UDX, as described in the topic Table Lookup UDX in the Integration Guide. We recommend only using this option if the stream-table join described here is insufficient. Most notably, the Table Lookup UDX lets you control how table data is fetched and cached. You can also subclass the UDX for special semantics. While performance is good, you need to call this as a UDX. Also, the s-Server query engine is unable to push projections and filters into the UDX.

Stream-Table JOIN

With stream-table joins, s-Server allows the ON condition and the USING condition as described above, and allows both INNER JOIN and OUTER JOIN. Either input can be a foreign table, a native stream, a foreign stream, a view, or a subquery.

The query engine detects when an input is like a stream (such as a stream or streaming view) and when it is like a table (such as a native table or table view). The join of two table-like inputs is a normal join. This works as a hash-join or an index-scan. The join of two stream-like inputs is described above.

When one input is stream-like and the other is table-like, the engine implements the query as a static stream-table join.

With a stream-table join, s-Server computes the join by capturing the table or table view at the start of the query and loading its data into a lookup structure. Streaming input is read row by row. For each row, s-Server uses the join condition to identify matches in the table data and emits a joined row for each match. For an outer join, if there are no matches, s-Server emits nulls as the match.

Limitations to Stream-Table JOIN

The table or table view input must fit into memory. However, since the query planner can push filters and projections through the join operator, this input can be much smaller than the whole table.

You cannot refresh the loaded table data, except by executing the query again.

An outer join must be one-sided. It cannot detect table rows that never matched a stream row.

The join must be an equi-join (a join using an equality operator), not a general theta join.

Note: Joins using a tailing table stream have been defined but not implemented. Tailing foreign tables as a stream is defined in the topic Tailing Foreign Tables with SQL/MED in the Integration Guide.

Examples of Static Stream/Table JOIN

The following example uses a stream and a table that match on a key value called animal_id:

The stream is a simple input stream of observed beast events:

CREATE STREAM beast_events (
    row_id integer not null,
    sensor integer,
    zone integer,
    animal_id int,
    weight integer
);

The foreign table lists animal attributes:

CREATE FOREIGN TABLE animal_data (
    animal_id int not null primary key,
    edible boolean,
    poisonous boolean,
    name varchar(32) not null,
    species varchar(32) not null,
    genus varchar(32) not null
)      
SERVER FILE_SERVER
OPTIONS 
( "DIRECTORY" '/data/'
, "FILENAME_PATTERN" 'animals.csv'
, "PARSER" 'CSV' 
, "SKIP_HEADER" 'true'
);

Here are examples of SQL to join this stream to this foreign table, both valid and invalid.

Simple join:

SELECT STREAM * from beast_events e join animal_data a using (animal_id);

Valid left outer join:

SELECT STREAM * from beast_events e left join animal_data a using (animal_id);

Invalid theta join:

SELECT STREAM * from beast_events e left join animal_data a on (e.animal_id > a.animal_id);

Valid equi-join with extra filters:

SELECT STREAM * from beast_events e join animal_data a on (e.animal_id = a.animal_id and a.edible and not a.poisonous);
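The static stream-table join described under Stream-Table JOIN behaves roughly like a hash join. The table is loaded once into a lookup structure, and each stream row probes it. The following Python sketch models that, including a table-side filter like the one in the last query and a left outer join. The function stream_table_join and the sample rows are hypothetical, and this is not the s-Server implementation.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple

Row = Dict[str, object]


def keep_all(row: Row) -> bool:
    return True


def stream_table_join(stream: Iterable[Row], table: List[Row], key: str,
                      outer: bool = False,
                      table_filter: Callable[[Row], bool] = keep_all
                      ) -> Iterator[Tuple[Row, Optional[Row]]]:
    # 1. Capture the table once, at the start of the query, in a hash lookup structure.
    #    In this sketch, filters on table columns run before loading, shrinking the lookup.
    lookup: Dict[object, List[Row]] = defaultdict(list)
    for trow in table:
        if table_filter(trow):
            lookup[trow[key]].append(trow)
    # 2. Read the stream row by row and probe the lookup on the key (an equi-join only).
    for srow in stream:
        matches = lookup.get(srow[key], [])
        for trow in matches:
            yield srow, trow
        if outer and not matches:
            yield srow, None  # outer join: NULLs stand in for the missing table row


# Hypothetical rows shaped like animal_data and beast_events.
animal_data = [
    {"animal_id": 1, "name": "hypothetical-a", "edible": True, "poisonous": False},
    {"animal_id": 2, "name": "hypothetical-b", "edible": True, "poisonous": True},
]
beast_events = [{"row_id": 10, "animal_id": 1}, {"row_id": 11, "animal_id": 2},
                {"row_id": 12, "animal_id": 3}]

# ON (e.animal_id = a.animal_id AND a.edible AND NOT a.poisonous)
safe = list(stream_table_join(beast_events, animal_data, "animal_id",
                              table_filter=lambda a: a["edible"] and not a["poisonous"]))
# LEFT JOIN ... USING (animal_id): event 12 has no animal and is padded with None
left = list(stream_table_join(beast_events, animal_data, "animal_id", outer=True))

print([(e["row_id"], a["name"]) for e, a in safe])  # [(10, 'hypothetical-a')]
print(len(left))  # 3
```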

Using JOINs to Enrich Stream Data

You can use JOINs to enrich stream data. For example, RIGHT OUTER JOIN can be a powerful tool to generate results for all rows in a defined data set. Let’s say we have a stream and a table, each of which has information on stores in 200 different cities.

  • The stream, city_sales, contains sales data by city, with each city containing one store.
  • The table, city_info, contains information on the store in each city.

You can use a RIGHT OUTER JOIN to produce hourly sales reports, and return data even in cases where city_sales has no rows. This way, if a batch of hourly data has data aggregated for 125 cities, a RIGHT OUTER JOIN can ensure that exactly 200 rows are generated every hour even if no sales were reported in some cities.

You would do so with code along the following lines:

SELECT STREAM cs.*, ci.*
FROM city_sales OVER (ORDER BY FLOOR(ROWTIME TO HOUR)
                      RANGE CURRENT ROW) AS cs
RIGHT OUTER JOIN city_info AS ci
USING(city);

The column city joins the stream and the table, with USING as the join condition.

Because it uses ORDER BY FLOOR(ROWTIME TO HOUR), the window on the city_sales stream is a hopping window of zero size. Rows accumulate in the window until ROWTIME moves to a new hour. At that point, all rows from the previous hour are dropped from the window, because FLOOR(ROWTIME TO HOUR) evaluates to the new hour. The “zero size” part refers to RANGE CURRENT ROW.

When the window frame shrinks to remove all rows for the past hour, the RIGHT OUTER JOIN emits all rows from city_info that did not match any of the last batch of hourly rows from the stream.
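The hourly report can be sketched in Python by grouping the stream rows into hourly batches and right-joining each batch with the list of cities. The sketch assumes city_sales already holds one aggregated row per city per hour. With several rows per city, a real RIGHT OUTER JOIN would emit one row per match. Hours with no stream rows at all are not modeled. The helpers hourly_reports and floor_to_hour, and the data, are hypothetical.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple

Sale = Tuple[datetime, str, float]  # (rowtime, city, aggregated sales for the hour)


def floor_to_hour(ts: datetime) -> datetime:
    """FLOOR(ROWTIME TO HOUR)."""
    return ts.replace(minute=0, second=0, microsecond=0)


def hourly_reports(city_sales: List[Sale], city_info: List[str]
                   ) -> Dict[datetime, List[Tuple[str, Optional[float]]]]:
    # Group the stream into hourly batches, as the zero-size hopping window does.
    batches: Dict[datetime, Dict[str, float]] = defaultdict(dict)
    for rowtime, city, amount in city_sales:
        batches[floor_to_hour(rowtime)][city] = amount  # assumes one row per city per hour
    # RIGHT OUTER JOIN each batch with city_info: every city appears, unmatched ones get None.
    return {hour: [(city, batch.get(city)) for city in city_info]
            for hour, batch in sorted(batches.items())}


cities = ["Austin", "Boston", "Chicago"]  # hypothetical stand-in for the 200 cities
sales = [(datetime(2021, 1, 1, 9, 5), "Austin", 120.0),
         (datetime(2021, 1, 1, 9, 40), "Chicago", 75.5),
         (datetime(2021, 1, 1, 10, 15), "Boston", 30.0)]

for hour, rows in hourly_reports(sales, cities).items():
    print(hour.time(), rows)
# 09:00:00 [('Austin', 120.0), ('Boston', None), ('Chicago', 75.5)]
# 10:00:00 [('Austin', None), ('Boston', 30.0), ('Chicago', None)]
```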

Multi-way JOINs

To do a three-way join, you use a joined table reference as the table reference in another JOIN. Here, stream/table 1 (b1) relates to stream/table 2 (asks), and stream/table 2 relates to stream/table 3 (b2), on the column “ticker”.

SELECT STREAM * from bids over (range interval '1' hour preceding) as b1
join asks over (range interval '2' second preceding)
on b1."ticker" = asks."ticker"
join bids over (range interval '3' minute preceding) as b2
on b2."ticker" = asks."ticker";

Using Partitions with Streaming JOIN

Within a WINDOW, the PARTITION BY clause is used together with analytic functions (AVG, MIN, MAX, SUM, etc.) to restrict calculations to a subset of rows that match the PARTITION BY column’s value in the current row. That may sound abstract, but consider a simple example. Suppose a WINDOW covers the preceding 5 minutes with a PARTITION BY customer_id. That means SELECT SUM(bandwidth) would output the total bandwidth used during the previous 5 minutes by the single customer_id identified in the current row. Without PARTITION BY, SELECT SUM(bandwidth) would output the total bandwidth used during the previous 5 minutes by all customers.
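The bandwidth example can be sketched in Python. The helper windowed_sum is hypothetical. It sums over the rows inside a RANGE window that ends at the current row, and can optionally restrict that sum to the current row's customer_id, the way PARTITION BY does. The events are made-up data.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Event = Tuple[datetime, str, int]  # (rowtime, customer_id, bandwidth)


def windowed_sum(events: List[Event], current: Event, width: timedelta,
                 partition: bool) -> int:
    """SUM(bandwidth) OVER ([PARTITION BY customer_id] RANGE <width> PRECEDING)."""
    now, customer, _ = current
    return sum(bandwidth for ts, cust, bandwidth in events
               if now - width <= ts <= now and (not partition or cust == customer))


t0 = datetime(2021, 1, 1, 12, 0)  # hypothetical data
events = [(t0, "c1", 100),
          (t0 + timedelta(minutes=2), "c2", 40),
          (t0 + timedelta(minutes=4), "c1", 10)]
five_minutes = timedelta(minutes=5)
current = events[-1]
print(windowed_sum(events, current, five_minutes, partition=True))   # 110: customer c1 only
print(windowed_sum(events, current, five_minutes, partition=False))  # 150: all customers
```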

JOIN can be used with windows that use PARTITION BY. However, the PARTITION BY clause must include one or more of the JOIN key columns, that is, the columns listed after USING. Thus, in the following example deviceId appears both after JOIN … USING and after PARTITION BY.

CREATE VIEW join_view AS
SELECT STREAM s2.deviceId AS deviceId, duration, location
FROM s2 OVER (RANGE CURRENT ROW)
    JOIN
     s1 OVER (PARTITION BY deviceId ROWS 1 PRECEDING)
  --note that deviceId is one of the JOIN columns
    USING(deviceId);

Imagine a stream of purchase orders (orders_stream) coming from an online retailer. Each row contains a number of “dimensions”: customer, seller, product, promotion code, time, and so forth. You might want to reference additional data for one of these dimensions, which would be incorporated from another table or stream using JOIN.

Suppose you want to connect the orders_stream with an RDBMS table of customer data. From time to time, customer data changes. So you might wish to handle the latter as a stream also (customers_stream), abstracted from the RDBMS system. For example, a new row might arrive in customers_stream when a customer moves from San Jose to San Francisco.

This example follows the general outline of a “fact stream” (our orders_stream) JOINed to a “dimension stream” (our customers_stream). Typically, a dimension stream is very sparse, in terms of newly arriving rows, compared to a fact stream. As orders are placed, our fact stream might generate thousands of rows per minute. In contrast, our dimension stream might generate only 100 rows per week, depending on how often customer data changes.

For orders_stream, let’s suppose that you PARTITION BY customer_id. This enables calculating MAX, AVG, SUM on each customer’s order history during some recent WINDOW. Using JOIN, you can incorporate additional information from the fairly stable data in the customers_stream: name, address, phone number, etc. By abstracting the “dimension stream” from RDBMS, you can ensure the JOIN provides the most up-to-date customer information. This way, if a customer moves while an order is still being processed, the updated address will be available, thanks to the JOIN.

A code snippet for this example might look something like this:

    orders_stream JOIN customers_stream
    OVER (PARTITION BY customerID ROWS CURRENT ROW)
    USING(customerID)
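The partitioned windows in these snippets can be sketched as one small window per key. In this Python model, ROWS CURRENT ROW keeps only the latest customers_stream row for each customer, so every order picks up the most recent customer data. ROWS 1 PRECEDING, as in join_view, keeps the latest two rows per key, so a fact row can join twice. The model processes rows strictly in arrival order and ignores rowtime windows on the fact stream. The function enrich and the rows are hypothetical.

```python
from collections import deque
from typing import Deque, Dict, Iterable, Iterator, Tuple

Event = Tuple[str, Dict[str, object]]  # ("dimension" or "fact", row), in rowtime order


def enrich(events: Iterable[Event], key: str, preceding: int = 0) -> Iterator[Dict[str, object]]:
    """fact JOIN dimension OVER (PARTITION BY <key> ROWS <preceding> PRECEDING) USING (<key>)."""
    windows: Dict[object, Deque[Dict[str, object]]] = {}
    for kind, row in events:
        if kind == "dimension":
            # one small window per key; older rows for that key fall out when it is full
            windows.setdefault(row[key], deque(maxlen=preceding + 1)).append(row)
        else:
            # inner join: a fact row pairs with every row currently in its key's window
            for dim in windows.get(row[key], ()):
                yield {**row, **dim}


events = [  # hypothetical orders_stream / customers_stream rows
    ("dimension", {"customerID": 7, "city": "San Jose"}),
    ("fact", {"customerID": 7, "order_id": 1}),
    ("dimension", {"customerID": 7, "city": "San Francisco"}),  # the customer moves
    ("fact", {"customerID": 7, "order_id": 2}),
    ("fact", {"customerID": 8, "order_id": 3}),  # no customer row yet: no output
]
for row in enrich(events, "customerID"):  # ROWS CURRENT ROW: latest row per customer
    print(row)
# {'customerID': 7, 'order_id': 1, 'city': 'San Jose'}
# {'customerID': 7, 'order_id': 2, 'city': 'San Francisco'}
```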

Note: When you create a stream that abstracts a database table, the ROWTIME will be the commit_time in a relational database. For more information on creating foreign streams from database tables, see Tailing Foreign Tables with SQL/MED.

Using Sessions with Streaming JOIN

Sessions are partitioned windows whose size and start/end location within the stream are defined by the streaming data itself. Sessions can be the basis of JOINs, following the general case of partition-based JOIN, discussed above. JOIN is supported for all types of sessions, whether those are defined using START WHEN, END WHEN, TIMEOUT AFTER, or a combination of those methods. However, some technical limitations apply.

Example

The following example is a bit contrived, compared to more complex real-world scenarios. But it serves to illustrate the mechanics of a stream-on-stream JOIN where one of the streams employs SESSION. For simplicity, only a minimal set of columns is shown. Also, for clarity, customers are identified by first name rather than numerical ID.

Imagine two streams of customer transaction data: s1 and s2. Within s1, rows for each customer are organized by SESSION, according to events that trigger the session’s start / end. Therefore it is possible to identify rows as belonging (or not) to a particular session ID and to calculate analytic functions (such as AVG, MAX, and SUM) for each customer session as a whole.

But let’s suppose that a separate stream, s2, containing important transaction data for an overlapping set of customers, has no SESSION information. To collate the sessionized stream, s1, with non-sessionized data from s2, you might want to use JOIN, as in the following example:

CREATE OR REPLACE SCHEMA example_schema;
SET SCHEMA 'example_schema';
CREATE OR REPLACE STREAM s1  (status integer, customer varchar(8));
CREATE OR REPLACE STREAM s2  (charge decimal(5,2), customer varchar(8));

SELECT STREAM rowtime, * FROM
    s1 OVER (
        PARTITION BY SESSION ON customer
        START WHEN status = 0 
        END WHEN status = 2
        TIMEOUT AFTER INTERVAL '3' MINUTE
        RANGE INTERVAL '30' MINUTE PRECEDING
    ) 
    FULL OUTER JOIN s2 
    USING (customer);

Notice, only s1 uses SESSION. In this case, a new customer session begins either when the first row is received or when status = 0; and the session ends either when status = 2 or else after 3 minutes of inactivity (meaning no rows received for the customer in question). The RANGE can be disregarded, as it is not relevant to this example.

Recall that the output of a FULL JOIN (a.k.a. FULL OUTER JOIN) includes all rows from both of the streams / tables being JOINed, whether or not they match on the JOIN key column(s). When values are present in only 1 of the streams / tables, then NULL is assigned to columns for the other stream / table.

However, the SELECT query above defines an implicit WINDOW for s1. That is due to the inline specification of OVER. Because rows that don’t satisfy the WINDOW criteria are excluded, only rows from s1 that are assigned to an active session will figure in the JOIN output. Other rows from s1 will be omitted, despite using a FULL JOIN.

S1 Rows:

Assume the following rows for s1:

ROWTIME STATUS CUSTOMER
2021-10-18 00:01:00 0 Claudia
2021-10-18 00:01:15 1 Mahmoud
2021-10-18 00:01:30 1 Claudia
2021-10-18 00:02:00 2 Claudia
2021-10-18 00:05:00 3 Mahmoud

Claudia’s session begins with status = 0 at time 00:01:00 and ends with status = 2 at 00:02:00. In between, the value of status = 1 for Claudia has no effect on her active session.

If Mahmoud already has an active session, then it remains active at 00:01:15, given a value of status = 1 that has no effect. On the other hand, if Mahmoud does not already have an active session, then a session begins with the arrival of the row at 00:01:15. During the ensuing 3 minutes, no new rows are received for Mahmoud. Therefore, at 00:04:15 Mahmoud’s previous session ends; and a new session begins. Later, at 00:05:00, another row for Mahmoud is received; and because its status = 3 has no effect, this row is included in the session that began with the timeout.

Also, let’s assume that other customers have no active session if they are not named in the rows above.

S2 Rows:

In the query above, no SESSION is applied to the second stream, s2. Yet it does contain additional information (charge), which we would like to combine with the sessionized data from s1.

ROWTIME               CHARGE  CUSTOMER
2021-10-18 00:01:05   0.28    John
2021-10-18 00:01:45   1.57    Claudia
2021-10-18 00:02:30   0.83    Claudia
2021-10-18 00:04:30   2.90    Mahmoud

FULL JOIN Output:

Let’s proceed step by step to understand the output of this FULL OUTER JOIN. Remember, the complete s1 stream is not being used. Rather, only the subset of s1 rows that satisfy its WINDOW criteria will be available for the JOIN.

First up in chronological ROWTIME order is s2.customer = “John” at time 00:01:05. This customer does not appear in s1 at all. Therefore he has no active session. Consequently, the FULL OUTER JOIN output assigns NULL to the s1 columns:

ROWTIME               S1.STATUS  S1.CUSTOMER  S2.CHARGE  S2.CUSTOMER
2021-10-18 00:01:05   NULL       NULL         0.28       John

Next comes s2.customer = “Claudia” at time 00:01:45. Does she have an active session? Yes, it began at 00:01:00 and won’t end until 00:02:00. By this time (00:01:45), Claudia has 2 rows that belong to her s1 session. Both of those will be included in the JOIN:
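The lookup performed for each s2 row can be sketched in Python. The sketch assumes that Claudia’s session is already known: it opens at 00:01:00 and closes at 00:02:00. An s2 row matches the s1 rows of the same customer’s session when two conditions hold. First, the session has started and not yet ended at the s2 ROWTIME. Second, those s1 rows have already arrived by that time. `Session` and `matches` are hypothetical names for this illustration, not s-Server objects.

```python
from datetime import datetime
from typing import NamedTuple, Optional

def t(hms: str) -> datetime:
    """'HH:MM:SS' on 2021-10-18, the date used throughout the example."""
    return datetime.fromisoformat("2021-10-18 " + hms)

class Session(NamedTuple):
    customer: str
    start: datetime
    end: Optional[datetime]   # None while the session is still open
    rows: list                # (rowtime, status) pairs from s1

def matches(sessions, customer, rowtime):
    """s1 rows that an s2 row for `customer` at `rowtime` would JOIN with."""
    for s in sessions:
        active = s.start <= rowtime and (s.end is None or rowtime < s.end)
        if s.customer == customer and active:
            # Only s1 rows that have already arrived can take part.
            return [r for r in s.rows if r[0] <= rowtime]
    return []                 # no active session: the s1 columns would be NULL

claudia = Session("Claudia", t("00:01:00"), t("00:02:00"),
                  [(t("00:01:00"), 0), (t("00:01:30"), 1), (t("00:02:00"), 2)])
print([status for _, status in matches([claudia], "Claudia", t("00:01:45"))])
```

At 00:01:45 the lookup returns Claudia’s status = 0 and status = 1 rows, matching the two output rows below. At 00:02:30 it returns nothing, because the session has already ended.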

ROWTIME               S1.STATUS  S1.CUSTOMER  S2.CHARGE  S2.CUSTOMER
2021-10-18 00:01:45   0          Claudia      1.57       Claudia
2021-10-18 00:01:45   1          Claudia      1.57       Claudia

At 00:02:00, Claudia’s session is terminated by the status = 2 row in s1. That row is included in the JOIN output, although only NULL values can accompany it from s2:

ROWTIME               S1.STATUS  S1.CUSTOMER  S2.CHARGE  S2.CUSTOMER
2021-10-18 00:02:00   2          Claudia      NULL       NULL

Next comes s2.customer = “Claudia” again at 00:02:30. This time, she does not have an active session. Hence NULL values are assigned to the s1 columns:

ROWTIME               S1.STATUS  S1.CUSTOMER  S2.CHARGE  S2.CUSTOMER
2021-10-18 00:02:30   NULL       NULL         0.83       Claudia

Based on the 3-minute TIMEOUT rule defined in the query above, the SESSION that began for Mahmoud in s1 at 00:01:15 is terminated at 00:04:15. Although no row for this timestamp is explicitly present in either s1 or s2, it is included in the JOIN output along with the value of s1.status = 1, which was last seen for Mahmoud in s1:

ROWTIME               S1.STATUS  S1.CUSTOMER  S2.CHARGE  S2.CUSTOMER
2021-10-18 00:04:15   1          Mahmoud      NULL       NULL

Next, at 00:04:30, comes s2.customer = “Mahmoud”. Since Mahmoud has no s1 rows in an active session at this time, NULL values are assigned to the s1 columns:

ROWTIME               S1.STATUS  S1.CUSTOMER  S2.CHARGE  S2.CUSTOMER
2021-10-18 00:04:30   NULL       NULL         2.90       Mahmoud

Putting the foregoing step-by-step discussion together, the entire output is as follows:

ROWTIME               S1.STATUS  S1.CUSTOMER  S2.CHARGE  S2.CUSTOMER
2021-10-18 00:01:05   NULL       NULL         0.28       John
2021-10-18 00:01:45   0          Claudia      1.57       Claudia
2021-10-18 00:01:45   1          Claudia      1.57       Claudia
2021-10-18 00:02:00   2          Claudia      NULL       NULL
2021-10-18 00:02:30   NULL       NULL         0.83       Claudia
2021-10-18 00:04:15   1          Mahmoud      NULL       NULL
2021-10-18 00:04:30   NULL       NULL         2.90       Mahmoud
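The whole walkthrough can be replayed with a compact Python model that reproduces the table above. It hard-codes the session boundaries derived in the walkthrough instead of computing them. It encodes the emission rules only as this example describes them. An s2 row is emitted once per matching s1 row that has arrived in an active session, or once with NULL s1 columns if there is none. Each session close (status = 2 or the 3-minute timeout) is emitted with NULL s2 columns. This illustrates this one example and is not s-Server’s general JOIN algorithm. `SESSIONS`, `S2` and `full_join_output` are hypothetical names.

```python
from datetime import datetime

def t(hms):
    """'HH:MM:SS' on 2021-10-18, the date used throughout the example."""
    return datetime.fromisoformat("2021-10-18 " + hms)

# Sessions as derived in the walkthrough (hard-coded, not computed):
# (customer, start, end or None if open, close event (rowtime, status) or None, s1 rows)
SESSIONS = [
    ("Claudia", t("00:01:00"), t("00:02:00"), (t("00:02:00"), 2),
     [(t("00:01:00"), 0), (t("00:01:30"), 1), (t("00:02:00"), 2)]),
    # The timeout close carries the last status seen for Mahmoud (1).
    ("Mahmoud", t("00:01:15"), t("00:04:15"), (t("00:04:15"), 1),
     [(t("00:01:15"), 1)]),
    ("Mahmoud", t("00:04:15"), None, None, [(t("00:05:00"), 3)]),
]

# s2 rows: (rowtime, charge, customer)
S2 = [(t("00:01:05"), 0.28, "John"), (t("00:01:45"), 1.57, "Claudia"),
      (t("00:02:30"), 0.83, "Claudia"), (t("00:04:30"), 2.90, "Mahmoud")]

def full_join_output():
    """Rows (rowtime, s1.status, s1.customer, s2.charge, s2.customer); None = NULL."""
    out = []
    for rowtime, charge, cust in S2:
        statuses = []
        for c, start, end, _, rows in SESSIONS:
            if c == cust and start <= rowtime and (end is None or rowtime < end):
                # s1 rows of the active session that have arrived by now
                statuses += [st for rt, st in rows if rt <= rowtime]
        if statuses:
            out += [(rowtime, st, cust, charge, cust) for st in statuses]
        else:
            out.append((rowtime, None, None, charge, cust))   # NULL s1 columns
    for c, _, _, close, _ in SESSIONS:
        if close is not None:
            out.append((close[0], close[1], c, None, None))   # NULL s2 columns
    return sorted(out, key=lambda row: row[0])   # stable: ties keep their order

for row in full_join_output():
    print(row[0].time(), *("NULL" if v is None else v for v in row[1:]))
```

Mahmoud’s 00:04:30 row gets NULL s1 columns for a specific reason in this model. His second session is active at that time, but its only s1 row (00:05:00) has not arrived yet.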

Limitations of JOIN for Partitioned Windows

Limitation #1: The WINDOW clause can make use of a RANGE … PRECEDING that is either a logical interval (time-based) or a physical number of rows (row-based). Either of these methods can be combined with PARTITION BY to create a partitioned WINDOW. However, if a partitioned WINDOW is included as part of a JOIN, then the following restriction applies:

  • Sessions, which are a special kind of partitioned WINDOW, can use a RANGE … PRECEDING that is either time-based or row-based. JOIN is permitted in both cases. (See above.)
  • Other partitioned WINDOWs, which don’t include SESSION ON, can be part of a JOIN only if they are row-based. A time-based RANGE … PRECEDING is not permitted.

For example, a stream with the following WINDOW is permissible, but not for use in a JOIN:

WINDOW "partition" AS
    (PARTITION BY ticker
    RANGE INTERVAL '5' HOUR PRECEDING)

But in the case of SESSION ON, a time-based RANGE would be allowed:

WINDOW "partition" AS
    (PARTITION BY SESSION ON ticker
    END WHEN status = 'exit'
    RANGE INTERVAL '5' HOUR PRECEDING)
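Limitation #1 can be restated as a small check. This is a hypothetical Python helper and not part of s-Server. `WindowSpec` and `joinable` are invented names that encode only the rule stated above. A WINDOW is summarized by three properties: whether it is partitioned, whether it uses SESSION ON, and whether its frame is time-based or row-based.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class WindowSpec:
    """Hypothetical summary of a WINDOW definition, for this check only."""
    partitioned: bool   # has PARTITION BY
    session: bool       # has SESSION ON
    time_based: bool    # RANGE INTERVAL ... PRECEDING (True) or row-based (False)

def joinable(w: WindowSpec) -> bool:
    """Limitation #1: may a stream using this WINDOW take part in a JOIN?"""
    if w.partitioned and not w.session:
        # Plain partitioned WINDOWs are JOIN-able only when row-based.
        return not w.time_based
    # Sessions may use either kind of frame; Limitation #1 places no
    # restriction on unpartitioned WINDOWs.
    return True

# The two example WINDOWs above
ticker_5h = WindowSpec(partitioned=True, session=False, time_based=True)
session_5h = WindowSpec(partitioned=True, session=True, time_based=True)
print(joinable(ticker_5h), joinable(session_5h))
```

The first example WINDOW fails the check and the SESSION ON variant passes. A row-based partitioned WINDOW without SESSION ON would also pass.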

Limitation #2: For the purpose of a JOIN, if a session is specified, then all partition keys must be part of that session. In other words, the list of columns after PARTITION BY must not contain any column that is not also listed after SESSION ON.

WINDOW "awindow" AS
    (PARTITION BY ticker
    SESSION ON trader
    --uses simple flags for START WHEN and END WHEN
    START WHEN (s.action='START')
    END WHEN (s.action='END')
    RANGE INTERVAL '24' HOUR PRECEDING)

For example, the WINDOW defined above is legitimate, but not as part of a JOIN, because ticker is present as a partition key but is not listed after SESSION ON.
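Limitation #2 amounts to a subset test on column lists. The hypothetical Python helper below, `join_blockers`, is not an s-Server function. It takes the PARTITION BY and SESSION ON column lists directly rather than parsing SQL, and reports the partition keys that would prevent the WINDOW from being used in a JOIN.

```python
def join_blockers(partition_by, session_on):
    """Limitation #2: PARTITION BY columns not also listed after SESSION ON.

    An empty result means the WINDOW passes this check.
    """
    session_cols = set(session_on)
    return [col for col in partition_by if col not in session_cols]

# "awindow" above: PARTITION BY ticker, SESSION ON trader
print(join_blockers(["ticker"], ["trader"]))
```

For "awindow" the helper reports ticker, which is the column that makes that WINDOW unusable in a JOIN.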