With conventional databases, you execute a SELECT query, the query runs, and the result is every row in the selected tables that matches the restrictions of the SELECT statement. That works because tables are finite: a SELECT statement grabs whatever rows are available at the moment the query runs. If someone adds rows to a table after you’ve run the query, those rows are not part of the query’s results. In a conventional relational database application, calculations such as SUM, COUNT, or MAX, and operations such as JOIN, GROUP BY, or PARTITION BY, can see all the data they need to produce a correct result.
But when you query a stream, the result of a SELECT query keeps growing, because the sources feeding the stream keep delivering new rows. So in a streaming context, the SELECT statement cannot know in advance all the rows that will be part of its result. That means that if you’re going to run aggregations on streaming data, you need some way to mark off chunks of data to which the aggregations apply.
To solve this problem, you apply functions and operations over a subset of records called a window. At any given time, each window contains a subset of the stream’s rows defined by time, by number of rows, or by another numeric expression. Examples of windows include “every ten minutes”, “the last thirty seconds”, or “every millimeter”.
We’re going to call the set of rows to which the analytic is applied the window frame. In the diagram below, the window frame, highlighted in blue, lasts from 0 seconds after 2:00 on February 2, 2022, to 10 seconds after 2:00 on the same date.
If you’re used to working with tables that have a fixed number of rows, you can think of a window as a kind of “virtual” table: it marks off a bounded set of rows so that you can run queries on them. Strictly speaking, windows are not fixed. They move forward over time, and some are open-ended. But the virtual-table picture is still a useful rough idea of how windows work in streaming SQL.
Time-based aggregation windows use each row’s ROWTIME to determine which rows belong to a window. Many windows end at the current row, which corresponds to the latest rowtime that s-Server has seen.
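To make this concrete, here is a minimal Python sketch (a conceptual model, not s-Server code) of a time-based frame that ends at the current row. It assumes rows are `(rowtime, value)` pairs arriving in rowtime order. It also assumes the frame includes rows whose rowtime is at least `current - interval`. That inclusive lower bound is an assumption made for illustration. The names `Row` and `time_frame` are hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

# Hypothetical row representation: (rowtime, value)
Row = Tuple[datetime, float]


def time_frame(rows: List[Row], current: datetime, interval: timedelta) -> List[Row]:
    """Return the rows of a time-based frame that ends at the current rowtime.

    Assumes an inclusive lower bound (rowtime >= current - interval); that
    boundary convention is an illustration, not documented s-Server behavior.
    """
    start = current - interval
    return [r for r in rows if start <= r[0] <= current]


t0 = datetime(2022, 2, 2, 2, 0, 0)  # 2:00 on February 2, 2022
rows = [(t0 + timedelta(seconds=s), float(s)) for s in (0, 4, 9, 12, 15)]

# The latest rowtime seen is 2:00:15, so a 10-second frame covers 2:00:05 through 2:00:15.
frame = time_frame(rows, current=rows[-1][0], interval=timedelta(seconds=10))
print([value for _, value in frame])  # [9.0, 12.0, 15.0]
```

As new rows arrive, the current rowtime advances and older rows fall out of the frame.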
The Inspect Trades window shows a sample input stream of stock trades (taken from an Inspect window in SQLstream s-Studio, used for inspecting stream or table inputs and outputs):
This same input stream can be processed by either of the two forms of aggregation described below: windowed aggregation or streaming aggregation. The input stream is created by the following code:
CREATE SCHEMA "Trading"
    DESCRIPTION 'Contains the equity trades application objects';

SET SCHEMA '"Trading"';

CREATE FOREIGN TABLE "Tickers" (
    "Ticker" VARCHAR(5) NOT NULL,
    description VARCHAR(100)
)
SERVER FILE_SERVER
OPTIONS (
    "DIRECTORY" '/data/',
    "FILENAME_PATTERN" 'tickers.csv',
    "PARSER" 'CSV',
    "SKIP_HEADER" 'true'
);

CREATE STREAM "TRADES" (
    "Ticker" VARCHAR(5) NOT NULL,
    "Shares" INTEGER,
    "Price" DECIMAL(6,2)
);
The SET SCHEMA command lets you reference objects in the schema without qualifying each object name with the schema name (see also the SET SCHEMA command in the s-Server Streaming SQL Reference Guide). Because the column name description is written in lower case without surrounding quotes, it is stored in upper case, as DESCRIPTION. To preserve lower-case or mixed-case column names, put double quotes around them. This has been done for the schema, table, and stream names, and for the "Ticker", "Shares", and "Price" columns.
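The identifier rule above can be sketched in a few lines of Python. This is a simplified model of the standard SQL case-folding convention, not s-Server's parser. The function name `normalize_identifier` is hypothetical, and the sketch ignores escaped quotes inside identifiers.

```python
def normalize_identifier(name: str) -> str:
    """Apply the standard SQL case rule to one identifier.

    Simplified sketch: quoted names keep their case (quotes removed), unquoted
    names are folded to upper case. Escaped quotes inside names are ignored.
    """
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name[1:-1]
    return name.upper()


for column in ['"Ticker"', 'description', '"Shares"']:
    print(column, "->", normalize_identifier(column))
# "Ticker" -> Ticker
# description -> DESCRIPTION
# "Shares" -> Shares
```

A consequence is that later queries must write "Ticker" in double quotes. An unquoted Ticker would be folded to TICKER and would not match the stored column name.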
As illustrated in the graphic below, the input stream can be read and processed by SQLstream statements performing windowed aggregation, shown on the right, or by SQLstream statements performing streaming aggregation, shown on the left.
Windowed aggregation and streaming aggregation have important differences, described below.
Windowed aggregation uses a rolling window over a specified set of rows. A windowed aggregation query can define the rolling window as having a fixed number of rows (a row-based window) or a varying but finite number of rows. In the varying case, the number of rows is not fixed in advance. Instead, it is determined by a logical specification such as a time interval (a time-based window). Each time a new row arrives, the window contains that row plus the older qualifying rows. For a row-based window, that means up to the specified number of rows. For a time-based window, it means all rows that meet the specification.
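The difference between row-based and time-based rolling windows can be modeled in Python as follows. This is a conceptual sketch, not how s-Server is implemented. It assumes integer rowtimes in seconds and input in rowtime order. It reads `ROWS n PRECEDING` in the standard SQL way, as the current row plus up to n preceding rows. It treats the time-based lower bound as inclusive. Both readings are assumptions. All names are hypothetical.

```python
from collections import deque
from typing import Deque, Iterable, Iterator, List, Tuple

Row = Tuple[int, int]  # hypothetical (rowtime_in_seconds, shares)


def rows_preceding(stream: Iterable[Row], n: int) -> Iterator[List[Row]]:
    """Row-based window: the current row plus at most n preceding rows."""
    window: Deque[Row] = deque(maxlen=n + 1)  # deque drops the oldest row automatically
    for row in stream:
        window.append(row)
        yield list(window)  # one frame, and so one output row, per input row


def range_preceding(stream: Iterable[Row], seconds: int) -> Iterator[List[Row]]:
    """Time-based window: every row whose rowtime is within `seconds` of the current row."""
    window: Deque[Row] = deque()
    for row in stream:
        window.append(row)
        while window[0][0] < row[0] - seconds:  # evict rows older than the interval
            window.popleft()
        yield list(window)


trades = [(0, 100), (5, 200), (6, 50), (30, 10), (31, 70)]
row_sums = [sum(s for _, s in f) for f in rows_preceding(trades, 2)]
time_sums = [sum(s for _, s in f) for f in range_preceding(trades, 10)]
print(row_sums)   # [100, 300, 350, 260, 130]
print(time_sums)  # [100, 300, 350, 10, 80]
```

In the row-based window, the frame always holds the last three rows no matter how far apart they are. In the time-based window, the gap between 6 and 30 seconds empties the frame except for the new row.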
Example of several windowed aggregations over 1-hour and 10-minute rolling windows:
CREATE OR REPLACE VIEW "Orders & Shares Trends"
    DESCRIPTION 'rolling-period orders and volume trends for TRADES' AS
SELECT STREAM
    T.ROWTIME AS "orderTime",
    "Ticker",
    "Shares",
    "Price",
    COUNT(*) OVER "last_Hour" AS "ordersPerHour",
    COUNT(*) OVER "last_ten_min" AS "ordersLastTen",
    SUM("Shares") OVER "last_Hour" AS "sharesPerHour",
    SUM("Price" * "Shares") OVER "last_Hour" AS "amtPerHour"
FROM TRADES AS T
WINDOW
    "last_Hour" AS (RANGE INTERVAL '1' HOUR PRECEDING),
    "last_ten_min" AS (RANGE INTERVAL '10' MINUTE PRECEDING);
Note: RANGE and ROWS operate differently. RANGE is only for time-based windows and requires a valid INTERVAL specification, which defines a time duration.
With RANGE, INTERVAL '1' HOUR is legal, but ROWS 10 is not.
For row-based windows, use ROWS with PRECEDING, for example ROWS 10 PRECEDING.
See the WINDOW clause in the Streaming SQL Reference Guide for more details.
Output: Each row arriving on TRADES changes the set of rows defined within the window and produces an output row with the new aggregated values.
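The one-row-in, one-row-out behavior of the "Orders & Shares Trends" view can be imitated in Python. This is a conceptual model for a finite list, not s-Server code. The `Trade` tuple, the `trends` function, and the sample trades are all made up. As before, the one-hour and ten-minute lower bounds are treated as inclusive, which is an assumption.

```python
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Dict, List, NamedTuple


class Trade(NamedTuple):
    """Hypothetical in-memory stand-in for one TRADES row."""
    rowtime: datetime
    ticker: str
    shares: int
    price: Decimal


def trends(trades: List[Trade]) -> List[Dict[str, object]]:
    """One output row per input row, mirroring the "Orders & Shares Trends" columns."""
    out: List[Dict[str, object]] = []
    for i, cur in enumerate(trades):
        seen = trades[: i + 1]  # rows that have arrived so far, including the current row
        last_hour = [t for t in seen if t.rowtime >= cur.rowtime - timedelta(hours=1)]
        last_ten = [t for t in seen if t.rowtime >= cur.rowtime - timedelta(minutes=10)]
        out.append({
            "orderTime": cur.rowtime,
            "Ticker": cur.ticker,
            "ordersPerHour": len(last_hour),
            "ordersLastTen": len(last_ten),
            "sharesPerHour": sum(t.shares for t in last_hour),
            "amtPerHour": sum((t.price * t.shares for t in last_hour), Decimal("0")),
        })
    return out


t0 = datetime(2022, 2, 2, 2, 0)
sample = [  # made-up trades, in rowtime order
    Trade(t0, "ORCL", 100, Decimal("10.00")),
    Trade(t0 + timedelta(minutes=5), "IBM", 20, Decimal("150.50")),
    Trade(t0 + timedelta(minutes=30), "ORCL", 50, Decimal("10.20")),
    Trade(t0 + timedelta(minutes=70), "IBM", 10, Decimal("151.00")),
]
for row in trends(sample):
    print(row)
```

Each of the four input rows yields exactly one output row. When the last trade arrives at 3:10, the two trades from before 2:10 drop out of the one-hour window.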
Streaming aggregation uses time-based windows with many-rows-in, one-row-out behavior, applying the SQL constructs SELECT STREAM DISTINCT … or, more commonly, SELECT STREAM … GROUP BY. In this type of aggregation, each incoming row that belongs to the current group is added to the running aggregations. An incoming row that belongs to the next group ends the current group, causes that group’s aggregations to be emitted as a single output row, and begins the next group.
For streaming aggregation using GROUP BY in a streaming SELECT statement, the first GROUP BY term must be time-based. For example, GROUP BY FLOOR(S.ROWTIME TO HOUR) yields one output row per hour for the previous hour’s input rows. The GROUP BY can specify additional partitioning terms. For example, GROUP BY FLOOR(S.ROWTIME TO HOUR), USERID yields one output row per hour per USERID value.
Example of streaming aggregations to perform periodic reporting over a tumbling 1-hour window:
CREATE OR REPLACE VIEW "Orders & Shares Groups"
    DESCRIPTION 'rolling-period order groups for TRADES' AS
SELECT STREAM
    FLOOR(T.ROWTIME TO HOUR) AS "hour",
    "Ticker",
    COUNT(*) AS "ordersPerHour"
FROM TRADES AS T
GROUP BY FLOOR(T.ROWTIME TO HOUR), "Ticker";
Output: Each row arriving on TRADES represents one trade and adds one to the order count for its ticker within the hour in which it arrived. Once the hour is complete, each ticker that appeared in that hour’s trades produces a single output row showing how many trades for that ticker occurred in that hour.
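The group-and-emit behavior of the "Orders & Shares Groups" view can be sketched in Python. This is a simplified model, not s-Server's implementation. It assumes rows arrive in rowtime order. It closes an hour only when a row from a later hour arrives, following the description above. Output within an hour is sorted by ticker only to make it deterministic. The function names are hypothetical.

```python
from collections import Counter
from datetime import datetime
from typing import Iterable, Iterator, Tuple


def floor_to_hour(ts: datetime) -> datetime:
    """Rough equivalent of FLOOR(ROWTIME TO HOUR)."""
    return ts.replace(minute=0, second=0, microsecond=0)


def hourly_counts(stream: Iterable[Tuple[datetime, str]]) -> Iterator[Tuple[datetime, str, int]]:
    """Many rows in, one row out per (hour, ticker); a group is emitted when the next hour starts."""
    current_hour = None
    counts = Counter()
    for rowtime, ticker in stream:  # assumes rows arrive in rowtime order
        hour = floor_to_hour(rowtime)
        if current_hour is not None and hour != current_hour:
            # A row from the next hour closes the current group.
            for t, c in sorted(counts.items()):
                yield (current_hour, t, c)
            counts.clear()
        current_hour = hour
        counts[ticker] += 1
    # In this sketch the last, still-open hour is never emitted: no later row has closed it.


stream = [
    (datetime(2022, 2, 2, 2, 5), "IBM"),
    (datetime(2022, 2, 2, 2, 10), "ORCL"),
    (datetime(2022, 2, 2, 2, 40), "IBM"),
    (datetime(2022, 2, 2, 3, 1), "ORCL"),
]
out = list(hourly_counts(stream))
for row in out:
    print(row)
```

Four rows go in, and two rows come out for the 2:00 hour. The 3:00 hour stays open until a row from 4:00 or later arrives.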
Here are some examples of aggregation windows:
A sliding window always applies an analytic to a fixed span that moves along steadily with each new row. For example, a 10-minute sliding window always covers exactly the last 10 minutes of data. A row-based sliding window can likewise cover exactly the last 10 rows.
A hopping window also applies an analytic to a set period of time, but it moves ahead in jumps rather than with every row. For example, a 10-minute window that hops every minute drops its oldest minute of data once per minute.
Hopping windows are supported as of s-Server 5.3.
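The following Python sketch contrasts hopping windows with the per-row behavior of a sliding window. It is a conceptual model under stated assumptions, not s-Server's hopping-window syntax or semantics. Times are non-negative integers (say, minutes). Each window covers the half-open interval `[end - size, end)`, and window ends fall on multiples of `hop`. It computes over a finished list rather than emitting incrementally. The name `hopping_sums` is hypothetical.

```python
from typing import List, Tuple


def hopping_sums(rows: List[Tuple[int, int]], size: int, hop: int) -> List[Tuple[int, int]]:
    """Sum values over windows of length `size` whose end advances in jumps of `hop`.

    rows are (rowtime, value) pairs with non-negative integer times.
    Each window covers [end - size, end); results appear once per hop, not once per row.
    """
    if not rows:
        return []
    last = max(t for t, _ in rows)
    results = []
    for end in range(hop, last + hop + 1, hop):  # every boundary up to the first one past the last row
        total = sum(v for t, v in rows if end - size <= t < end)
        results.append((end, total))
    return results


sample = [(1, 1), (4, 2), (7, 3), (12, 4)]
print(hopping_sums(sample, size=10, hop=5))  # [(5, 3), (10, 6), (15, 7)]
```

A 10-minute window hopping every 5 minutes produces three results for these four rows. A sliding window would produce one result per row instead. Setting `hop=1` gives the “drop the oldest minute every minute” behavior described above.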
Offset windows also use a set period of time, and they can be sliding or hopping. However, the period ends at some point earlier than the present, such as “10 seconds ago” or “1 minute ago”. An offset window tells s-Server, for example, “give me all the rows for a ten-minute interval that ends 1 minute ago”.
Offset windows are supported as of s-Server 5.3.
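The offset idea reduces to shifting the window’s end back from the present. Here is a minimal Python sketch under assumptions that are not documented s-Server behavior: integer rowtimes in minutes, and a half-open interval `[now - offset - length, now - offset)`. The name `offset_frame` is hypothetical.

```python
from typing import List


def offset_frame(rowtimes: List[int], now: int, length: int, offset: int) -> List[int]:
    """Rowtimes in a window of `length` that ends `offset` before `now`.

    Uses a half-open interval [now - offset - length, now - offset); the
    boundary handling is an assumption made for illustration.
    """
    end = now - offset
    start = end - length
    return [t for t in rowtimes if start <= t < end]


rowtimes = [5, 9, 15, 19, 20]  # minutes; 20 is the latest rowtime seen
# "All the rows for a ten-minute interval that ends 1 minute ago"
print(offset_frame(rowtimes, now=20, length=10, offset=1))  # [9, 15]
```

With `offset=0`, the same function behaves like an ordinary window ending at the present, apart from the half-open upper bound.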
An unbounded window calculates from the first available data up to the current row. A snapping window applies an analytic to the period that began at the most recent boundary, such as the top of the hour, and starts over at the next boundary.
Tumbling windows accumulate a batch of input rows to produce a set of aggregated result rows. Technically, they are not “windows” at all. They are the result of a GROUP BY on rowtime.
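Unbounded and snapping windows can be contrasted with two running totals in Python. This sketch interprets a snapping window as an hour-to-date total that restarts at the top of each hour. That interpretation, the function names, and the sample readings are assumptions for illustration, not s-Server code.

```python
from datetime import datetime
from typing import Iterable, Iterator, Optional, Tuple

Reading = Tuple[datetime, int]  # hypothetical (rowtime, value)


def unbounded_sum(stream: Iterable[Reading]) -> Iterator[int]:
    """Running total from the first available row up to the current row."""
    total = 0
    for _, value in stream:
        total += value
        yield total


def hour_to_date_sum(stream: Iterable[Reading]) -> Iterator[int]:
    """Running total since the top of the current hour; restarts at each new hour."""
    total = 0
    current_hour: Optional[datetime] = None
    for rowtime, value in stream:
        hour = rowtime.replace(minute=0, second=0, microsecond=0)
        if hour != current_hour:  # crossed the top of an hour: start over
            total = 0
            current_hour = hour
        total += value
        yield total


day = datetime(2022, 2, 2)
stream = [(day.replace(hour=2, minute=10), 5), (day.replace(hour=2, minute=50), 3),
          (day.replace(hour=3, minute=5), 4), (day.replace(hour=3, minute=20), 1)]
print(list(unbounded_sum(stream)))     # [5, 8, 12, 13]
print(list(hour_to_date_sum(stream)))  # [5, 8, 4, 5]
```

Both windows emit a value for every row. Only the hour-to-date total forgets earlier data when a new hour begins.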
For all of these windows, you can specify DISTINCT using SELECT STREAM DISTINCT.
You can also use PARTITION BY to segment results. The window function is applied to each partition separately, and computation restarts for each partition.
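Per-partition restarts can be illustrated with a running count kept separately for each key. This is a minimal Python model of something like `COUNT(*) OVER (PARTITION BY "Ticker" ...)` with an unbounded frame. The frame choice and the function name are assumptions for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def running_count_by_partition(tickers: List[str]) -> List[Tuple[str, int]]:
    """Running count computed independently for each ticker (partition)."""
    counts: Dict[str, int] = defaultdict(int)
    out = []
    for ticker in tickers:
        counts[ticker] += 1  # only this row's partition is affected
        out.append((ticker, counts[ticker]))
    return out


print(running_count_by_partition(["IBM", "ORCL", "IBM", "IBM", "ORCL"]))
# [('IBM', 1), ('ORCL', 1), ('IBM', 2), ('IBM', 3), ('ORCL', 2)]
```

Each ticker’s count starts from 1 on its own first row, regardless of how many rows other tickers have contributed.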
Many windows end with the present, which is what we call the current row.
There are some slightly tricky things about the current row. Mostly, you need to know that s-Server treats the current row as “the first time we see a new rowtime” and starts making calculations accordingly. The tricky thing is that other rows may arrive with the same rowtime. These are incorporated into later calculations, but calculations begin as soon as the first such row reaches s-Server. That’s a good thing if you want your calculation to tell you when the temperature of a set of sensors exceeds an average. It means you can shut down whatever is causing the temperature to rise before things catch fire. But it can feel counterintuitive that windows ending at the current row include rows with the current rowtime, but not necessarily all of them.
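The effect described above can be reproduced in a small Python model. It emits a windowed average as each row arrives, using only the rows seen so far. This is a conceptual sketch, not s-Server code. The inclusive lower bound, the sensor readings, and the function name are assumptions.

```python
from typing import List, Tuple


def averages_on_arrival(stream: List[Tuple[int, float]], interval: int) -> List[float]:
    """Emit a windowed average as each row arrives, using only rows seen so far."""
    seen: List[Tuple[int, float]] = []
    out = []
    for rowtime, temp in stream:
        seen.append((rowtime, temp))
        frame = [v for t, v in seen if t >= rowtime - interval]  # window ends at the current row
        out.append(sum(frame) / len(frame))
    return out


readings = [(0, 20.0), (5, 22.0), (10, 30.0), (10, 40.0)]  # two readings share rowtime 10
print(averages_on_arrival(readings, interval=10))
# [20.0, 21.0, 24.0, 28.0]
```

The third result (24.0) is calculated as soon as the first rowtime-10 reading arrives, so it excludes the 40.0 reading that shares that rowtime. Only the fourth result includes both rows with the current rowtime.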