Windowed Aggregation on Streams

In analyzing streaming data, you will often make use of windowed aggregation. Windowed aggregation performs an analytic function across a window specified by either time, such as “one hour proceeding” or rows, such as “the last six rows”.

Time-Based Windows

A time-based window is a defined by a rowtime interval. The window’s defining criteria specify a limited set of rows, using a rowtime-based specification. At any arbitrary wall-clock-time, the number of rows to be found in that window can vary, based on the number of rows that have arrived whose rowtimes falls within the window’s defined period. (The SQL standard calls this a “logical” window, and calls a row-based Window a “physical” window.)

For example, RANGE INTERVAL '1' HOUR PRECEDING specifies that the window contains all rows whose ROWTIMEs are within the hour preceding the stream’s current time, including endpoints. (That time is usually the rowtime of the most recent row received.)

For time-based windows on streams, analytic functions are normally only able to identify the complete set of rows (and hence calculate a result) once a row arrives that falls outside the later bound of the current window. For example, in the case of a window that is RANGE INTERVAL ‘1’ MINUTE FOLLOWING, the arrival of a row with a rowtime more than one minute later than that of the row for which the window is being evaluated would indicate that there will be no more rows in the window, and the query can return a result.

Alternately, you can use a rowtime bound can be used to indicate that no more rows will arrive within a given window, enabling the query to return a result.

Row-Based Windows

A row-based window is specified by a fixed number of rows. For example, “ROWS 10 PRECEDING” specifies that only the latest 10 rows be included in the window. (The SQL standard calls this a “physical” window, and calls a Time-based Window a “logical” window.)

For row-based windows on streams, such as “ROWS 3 PRECEDING”, rowtime bounds have no effect on windows (and hence, on the evaluation of analytic functions). See below for examples of rowtime bounds in windowed aggregation.

Assume the following information flowing through the stream WEATHERSTREAM:

ROWTIME CITY TEMP
2018-11-01 01:00:00.0 Denver 29
2018-11-01 01:00:00.0 Anchorage 2
2018-11-01 06:00:00.0 Miami 65
2018-11-01 07:00:00.0 Denver 32
2018-11-01 09:00:00.0 Anchorage 9
2018-11-01 13:00:00.0 Denver 50
2018-11-01 17:00:00.0 Anchorage 10
2018-11-01 18:00:00.0 Miami 71
2018-11-01 19:00:00.0 Denver 43
2018-11-02 01:00:00.0 Anchorage 4
2018-11-02 01:00:00.0 Denver 39
2018-11-02 07:00:00.0 Denver 46
2018-11-02 09:00:00.0 Anchorage 3
2018-11-02 13:00:00.0 Denver 56
2018-11-02 17:00:00.0 Anchorage 2
2018-11-02 19:00:00.0 Denver 50
2018-11-03 01:00:00.0 Denver 36
2018-11-03 01:00:00.0 Anchorage 1

Let’s say we want to find the minimum and maximum temperature recorded in the 24-hour period prior to any given reading, globally, regardless of city. To do this, we define a window of RANGE INTERVAL ‘1’ DAY PRECEDING, and use it in the OVER clause for the MIN and MAX analytic functions:

SELECT STREAM
ROWTIME,
MIN(TEMP) OVER W1 AS WMIN_TEMP,
MAX(TEMP) OVER W1 AS WMAX_TEMP
 FROM WEATHERSTREAM
 WINDOW W1 AS (
RANGE INTERVAL '1' DAY PRECEDING
);

Results:

ROWTIME WMIN_TEMP WMAX_TEMP
2018-11-01 01:00:00.0 29 29
2018-11-01 01:00:00.0 2 29
2018-11-01 06:00:00.0 2 65
2018-11-01 07:00:00.0 2 65
2018-11-01 09:00:00.0 2 65
2018-11-01 13:00:00.0 2 65
2018-11-01 17:00:00.0 2 65
2018-11-01 18:00:00.0 2 71
2018-11-01 19:00:00.0 2 71
2018-11-02 01:00:00.0 2 71
2018-11-02 01:00:00.0 2 71
2018-11-02 07:00:00.0 4 71
2018-11-02 09:00:00.0 3 71
2018-11-02 13:00:00.0 3 71
2018-11-02 17:00:00.0 2 71
2018-11-02 19:00:00.0 2 56
2018-11-03 01:00:00.0 2 56
2018-11-03 01:00:00.0 1 56

Now,, let’s assume we want to find the minimum, maximum, and average temperature recorded in the 24 hour period prior to any given reading, broken down by city. To do this, we add a PARTITION BY clause on CITY to the window specification, and add the AVG analytic function over the same window to the selection list:

 SELECT STREAM
ROWTIME,
CITY,
MIN(TEMP) over W1 AS WMIN_TEMP,
MAX(TEMP) over W1 AS WMAX_TEMP,
AVG(TEMP) over W1 AS WAVG_TEMP
 FROM AGGTEST.WEATHERSTREAM
 WINDOW W1 AS (
PARTITION BY CITY
RANGE INTERVAL '1' DAY PRECEDING
 );

Results:

ROWTIME CITY WMIN_TEMP WMAX_TEMP WAVG_TEMP
2018-11-01 01:00:00.0 Denver 29 29 29
2018-11-01 01:00:00.0 Anchorage 2 2 2
2018-11-01 06:00:00.0 Miami 65 65 65
2018-11-01 07:00:00.0 Denver 29 32 30
2018-11-01 09:00:00.0 Anchorage 2 9 5
2018-11-01 13:00:00.0 Denver 29 50 37
2018-11-01 17:00:00.0 Anchorage 2 10 7
2018-11-01 18:00:00.0 Miami 65 71 68
2018-11-01 19:00:00.0 Denver 29 50 38
2018-11-02 01:00:00.0 Anchorage 2 10 6
2018-11-02 01:00:00.0 Denver 29 50 38
2018-11-02 07:00:00.0 Denver 32 50 42
2018-11-02 09:00:00.0 Anchorage 3 10 6
2018-11-02 13:00:00.0 Denver 39 56 46
2018-11-02 17:00:00.0 Anchorage 2 10 4
2018-11-02 19:00:00.0 Denver 39 56 46
2018-11-03 01:00:00.0 Denver 36 56 45
2018-11-03 01:00:00.0 Anchorage 1 4 2

Rowtime Bounds and Windowed Aggregation

This is an example of a windowed aggregate query:

 SELECT STREAM ROWTIME, ticker, amount, SUM(amount)
OVER (
PARTITION BY ticker
RANGE INTERVAL '1' HOUR PRECEDING)
 AS hourlyVolume
 FROM Trades

Because this is a query on a stream, rows pop out of this query as soon as they go in. For example, given the inputs:

Trades: IBM 10 10 10:00:00
Trades: ORCL 20 10:10:00
Trades.bound: 10:15:00
Trades: ORCL 15 10:25:00
Trades: IBM 30 11:05:00
Trades.bound: 11:10:00

the output will be:

Trades: IBM 10 10 10:00:00
Trades: ORCL 20 20 10:10:00
Trades.bound: 10:15:00
Trades: ORCL 15 35 10:25:00
Trades: IBM 30 30 11:05:00
Trades.bound: 11:10:00

The rows still hang around behind the scenes for an hour, and thus the second ORCL row output has a total of 35; but the original IBM trade falls outside the “hour preceding” window, and so is excluded from the IBM sum.

Windowed Aggregation Specifications

Interval Clause

Example:

Some business problems seem to need totals over the whole history of a stream, but this is usually not practical to compute. However, such business problems are often solvable by looking at the last day, the last hour, or the last N records. Sets of such records are called windowed aggregates. They are easy to compute in a stream database, and can be expressed in standard SQL as follows:

SELECT STREAM ticker,
  avg(price  OVER  lastHour AS avgPrice,
  max(price) OVER  lastHour AS maxPrice
FROM Bids
WINDOW lastHour AS  (
  PARTITION BY ticker
  RANGE INTERVAL '1' HOUR PRECEDING)

Note: The Interval_clause must be of an appropriate type:

  • An integer literal with ROWS
  • A numeric value for RANGE over a numeric column
  • An INTERVAL for a RANGE over a date/time/timestamp