Rowtime Bounds

Rowtime bounds are an important streaming extension to SQL. Their purpose is to produce timely output. The topic is covered in the following sections:

Managing Late Rows

Streaming data depends on rows being in order, as tracked by a special column called ROWTIME. Streaming operators track the latest rowtime, known as the “highwater mark” or current stream time. Rows that arrive with rowtimes earlier than this highwater mark are called “late” because their timestamps are out of order. Late rows are discarded: they are neither processed as part of an input stream nor put into an output stream. This means, for example, that if you are archiving rows into an RDBMS database, late rows will not be written.

You can avoid the problem of late rows by T-sorting your data using the ORDER BY clause of the SELECT statement. This clause implements a timesort XO that uses a sliding time-based window of incoming rows to reorder those rows by ROWTIME. For more information on t-sorting stream input, see the subtopic T-sorting Stream Input in the topic ORDER BY clause in the SQLstream Streaming SQL Reference Guide.

Late rows are logged to both the trace log and the error stream. Because the trace log could quickly fill up if every late row were logged individually, late rows are logged in powers of ten: the first row, the tenth row, the hundredth row, the ten thousandth row, and so on. You will not see late rows logged between late row 100 and late row 10,000, or between late row 10,000 and late row 100,000.

If you need more granular information on late rows logged, you can change the tracer level for com.sqlstream.aspen.native.xo.laterow to FINEST. This will fill your trace log with late rows, but will give you information on every late row logged.

You can change the default tracing level by editing the Trace.properties file, usually located at /var/log/sqlstream/Trace.properties.

To view more logging, open one of the properties files in a text editor and uncomment (remove the “#”) the following line:

# com.sqlstream.aspen.native.xo.laterow=FINEST

Note: All errors that are logged to the trace log are also available in a system management stream called sys_boot.mgmt.all_trace. The latest (up to 1000) error messages are also available in a table called sys_boot.mgmt.saved_trace. You can use these to create visualizations of errors; they can be accessed from SQLline or anywhere that has a JDBC connection to s-Server. See the Global Error Stream for more details.

What are Rowtime Bounds?

Before explaining rowtime bounds, let’s begin by reviewing the role of time in a data stream.

Streams, Rows and Rowtimes

A stream is a sequence of timestamped data rows. All rows in a stream have the same rowtype (the same list of columns).

The column rowtime is mandatory, and the stream is ordered by rowtime. That means that a row follows all earlier rows (each such row having a smaller rowtime), and precedes all later rows (each such row having a larger rowtime). Stream rows need to be time-sorted before being accepted: if an incoming row has a timestamp earlier than that of rows already received and accepted, the incoming row is rejected. For example, if the stream time is 10:00 (due to receiving a row or a rowtime bound with 10:00 as its rowtime), then incoming rows with rowtimes of 9:58, 9:47, and 9:59 will all be rejected as out of order, even though 9:59 follows 9:47. (Several adjacent rows can have the same rowtime, and can be reordered by application logic, without violating the constraint that rowtimes must not decrease.)
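The acceptance rule above can be sketched in a few lines of Java. This is an illustrative model only, not s-Server code: the class name LateRowFilter and the use of minutes since midnight as rowtimes are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a highwater mark; rowtimes are minutes since midnight. */
class LateRowFilter {
    private int highWaterMark = Integer.MIN_VALUE; // latest rowtime accepted so far
    private int lateCount = 0;

    /** Returns true if the row is accepted, false if it is rejected as late. */
    boolean accept(int rowtime) {
        if (rowtime < highWaterMark) {
            lateCount++;         // earlier than the highwater mark: out of order
            return false;
        }
        highWaterMark = rowtime; // equal or later rowtimes are accepted
        return true;
    }

    int lateCount() {
        return lateCount;
    }

    /** Convenience method: returns only the rowtimes that were accepted. */
    static List<Integer> filter(int[] rowtimes) {
        LateRowFilter f = new LateRowFilter();
        List<Integer> kept = new ArrayList<>();
        for (int t : rowtimes) {
            if (f.accept(t)) {
                kept.add(t);
            }
        }
        return kept;
    }
}
```

Feeding this model the rowtimes 10:00, 9:58, 9:47, 9:59, 10:00, 10:05 (as 600, 598, 587, 599, 600, 605) keeps only 10:00, 10:00, and 10:05. The three rows in between are rejected even though 9:59 follows 9:47.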

Often the rows in a stream represent real-world observations or events, and the rowtime is the actual time of the event. The application that inserts a row can set the rowtime value explicitly; the default value of the rowtime is the system clock time of the insertion.

A named stream is a stream defined in the catalog by a CREATE STREAM statement. It is a persistent object, accessible by its name in SQL. Applications can INSERT rows into it, or read rows from it:

INSERT into foo (x, y) VALUES(101, 118);
SELECT STREAM * from foo;

Here the query results are the entire contents of stream foo. Now consider a more complex query, such as SELECT STREAM rowtime, (x + y), z from foo where z > 12;

Past, Present, and Future

Picture a stream as a horizontal line, its timeline, with the past at the right. The rows are dots that appear on the line, each new dot appearing to the left of all its predecessors. There is a natural frontier, the present, which separates the past (at the right, with dots) from the future (to the left, with unknown invisible dots).

The rowtime of its latest row serves as a natural clock for a stream. We call it the stream clock. For example, when a row with rowtime 10:00 is added to stream S, the stream clock of S becomes 10:00. When the next row is inserted, with rowtime 10:05, the clock jumps to 10:05.

Since an added row cannot be earlier than its predecessors, we know that all future rows of stream S have a rowtime no earlier than the clock of S. That is, the stream clock is a lower bound for future rows. Hence a synonym for stream clock is “rowtime bound”.

To restate this: a rowtime bound is an assertion about the future contents of a stream. It states that the next row in the stream will have a rowtime no earlier than t, the value of the bound. This is a mathematical lower bound on the set of future rowtimes. Note that with rowtimes “earlier” is the same as “less”, and “later” is the same as “more”. If we compare two bounds on the same stream, with values t1 and t2, where t2 > t1, then we call t2 the stronger or stricter bound, because mathematically it is a more restrictive constraint, and hence also a more informative one.

(If the rowtimes in the last example reflect real time, there is a five minute wait between these two rows. But if the rowtimes are historical and all the data is available, the system can process it as fast as possible, and the “stream clock” shows a fictional time that jumps instantaneously from 10:00 to 10:05. In both cases there seems no point to imagining the clock as continuous, and asking about the stream S at 10:01 etc.)

We can add a relational operation to this picture by imagining it as a black box, with one or more input streams going in, at the left side, and a result stream coming out at the right side. As always, the output rows are added to the result stream one at a time and in rowtime order. The output is computed from past and present input rows – but not from the unknown future inputs.

Note that duplicate rowtimes can occur and are acceptable in input or output streams.

To read more about removing duplicate records, based on more than rowtime, see the topics SELECT ALL and SELECT DISTINCT and GROUP BY in the s-Server Streaming SQL Reference Guide.

Rowtime Bounds: a hint about the future

There are situations where a bit of insight into the future is feasible and can make a big difference, especially where rows are merged or grouped. Here are two simple examples: Grouping by Hours and Merging Two Streams.

Example: Grouping by Hours

Consider a stream of color-coded events, and a black box counting events, with a separate count for each color. Each hour, on the hour, the box outputs the total count for the last hour. (In SQL, you implement this as a streaming GROUP BY hour, where hour is calculated from the rowtime.) On the surface, implementing the black box seems trivial: every hour, as defined by the internal “stream clock”, you reset the count. The problem arises in cases where the stream is updated long after the hour closes, say at 6:15 when the hour closed at 5:00. These cases can produce late or inaccurate counts. We get around this problem by implementing a rowtime bound.

But consider the following input stream:

rowtime color
3:01 blue
3:15 red
4:00 red
4:06 blue
4:45 blue
6:15 red

The arrival of the row (4:00, red) moves the input clock to 4:00, closing off the period from 3:00 to 4:00. During this period there were 2 input rows, and the counts are (red = 1, blue = 1).

The next period starts at 4:00 and ends just before 5:00. This period includes 3 rows, with counts (red = 1, blue = 2). However, the input stream clock jumps from 4:45 to 6:15, and it is only on seeing the row at 6:15 that the counting operator knows that the period 4:00 – 5:00 has ended! The operator produces the correct results, but 1:15 too late! Worse, suppose that 5:00 is the end of the business day and that the next input row doesn’t occur until the next morning. Now the hourly count is late by hours.

Example: Merging Two Streams

Consider a black box computing the UNION ALL of two streams P and Q: this just means to accept all rows from either input. Of course the output as well as both inputs must be ordered by rowtime, so what the black box does is to merge the two inputs by rowtime: it always picks the earlier input row from P or Q, and copies it to its output stream.

There’s a catch: this algorithm is correct but it can get stuck. Input rows can appear intermittently, so there can be a time period when there is a row available from one input but none on the other side.

For example, suppose the input rows arrive with the rowtimes shown in this table:

stream P stream Q
1:00 1:02
1:06 1:04

The merge algorithm produces:

P UNION ALL Q
1:00 from P
1:02 from Q
1:04 from Q

And now it stops, with no input rows on deck from Q. It cannot output “1:06 from P”, because for all it knows there may be an earlier row from Q that hasn’t arrived yet, say a row at 1:05. On the other hand the next Q row might be later, say 1:10, in which case it would be correct to output “1:06 from P”.

The merge algorithm will always wait as soon as one input is exhausted.

Rowtime Bounds Help the Server Produce Timely Data

In fact the system does not need to wait for the next row. It could proceed on less information. In the merging streams example above, if it learns that the next Q rowtime is later than the latest P row, then it can consume the P row immediately. That is to say, when no Q row is available, it can help to have a rowtime bound for Q.

Thus it can be very useful for a data source to advance the rowtime bound explicitly, in anticipation of a future row. For more detail, see the topic JDBC Driver in the s-Server Integration Guide.

Some relational operators on streams – notably union, join, and windowed aggregation – produce output related to the end of a time period. Advancing the rowtime bound lets the system produce this output as early as possible.

  • as a data source inserts rows into a stream, the rowtime bound automatically increases
  • the data source can advance the rowtime bound without inserting a row
  • relational operators automatically respond to the bound and pass it downstream

Informally, we call the action of advancing the rowtime bound punctuating the stream. This is an analogy to punctuation in English text: punctuation marks are not words, but they affect the meaning of a series of words.

Advancing the rowtime bound gives early notice about future data in a stream, as well as immediate notice that a time window has ended. Early notice of a lower bound on future inputs can expedite the ordered union of streams, by guiding the ordered merger of the input rows. In ordered merging, an input row is held back until there is certainty that it is the earliest of all inputs. Time constraints provide that certainty. Without them, processing could not continue until receipt of an input row with a later rowtime, because there would be no lower bound on unseen future inputs.

Immediate notice that a time window has ended enables immediate progress or results from operations awaiting one more input message. One such operation is windowed join, which may await the “last” transaction of a certain type, such as when merging streams. Another is windowed aggregation, which may await the “last” transaction within a specified time window in order to compute totals and other statistics. Aggregation often uses sliding windows, which move forward in time continuously: e.g., the last ten minutes relative to the current row.

Example Revisited: Grouping By Hours

Let’s see how a rowtime bound can help with the first example above. At the end of each hour, keeping time by the stream clock, we want to output the value of each counter, and then clear the counters. When the input data is dense in time, we can rely on each new row to advance the stream clock (to its rowtime). A row at 3:00:01 moves the clock to 3:00:01, and implies that the period from 2:00 to 3:00 has ended. The delay of 1 second is acceptable (we assume).

But suppose the last input of the day occurs at 4:59. We should recognize the end of the period from 4:00 to 5:00 as soon as possible, that is, at 5:00. To cause this, the data source punctuates. At 5:00 it knows there will be no more input rows for the day, so it can advance the rowtime bound past 5:00. (This is an example of a strict bound, “next row is later than 5:00”. A normal bound is non-strict, “next row is at 5:00 or later”. But to the server rowtimes are discrete, measured to the millisecond, so a strict bound at 5:00 is the same as a non-strict bound at 5:00:00.001.)

In fact, if we know that the earliest possible next input row could be at 9:00 the next day, the data source could advance the bound to 9:00 the next day (non-strict). The effect is the same.
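The effect of punctuation on the hourly count can be illustrated with a small Java model. This is only a sketch under stated assumptions, not the server's implementation: HourlyColorCounter and its methods are hypothetical names, rowtimes are minutes since midnight, input is assumed to arrive in rowtime order, and the three sample rows are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical hourly counter; assumes input already in rowtime order. */
class HourlyColorCounter {
    private final Map<String, Integer> counts = new TreeMap<>();
    private final List<String> output = new ArrayList<>();
    private int currentHour = -1; // hour of the open period; -1 until the clock starts

    /** A row advances the stream clock to its rowtime and is then counted. */
    void row(int minutes, String color) {
        advanceClock(minutes);
        counts.merge(color, 1, Integer::sum);
    }

    /** Punctuation: a non-strict bound, "the next row is at or after this time". */
    void bound(int minutes) {
        advanceClock(minutes);
    }

    List<String> output() {
        return output;
    }

    private void advanceClock(int minutes) {
        int hour = minutes / 60;
        if (hour <= currentHour) {
            return; // the clock is still inside the open period
        }
        if (!counts.isEmpty()) {
            // The clock has passed the end of the open period: emit and reset.
            output.add(String.format("%d:00-%d:00 %s emitted at %d:%02d",
                    currentHour, currentHour + 1, counts, minutes / 60, minutes % 60));
            counts.clear();
        }
        currentHour = hour;
    }
}
```

With rows at 4:06 (blue), 4:45 (blue), and 4:59 (red), nothing is emitted until the clock moves. Calling bound(300), that is, punctuating at 5:00, emits the 4:00-5:00 counts at once. Without the bound, the same counts would appear only when the next row arrives.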

Example Revisited: Merging Inputs

Now to reconsider the second example. The merge operator is stuck with inputs:

stream P stream Q
1:00 1:02
1:06 1:04

The input clock for Q is still at 1:04, from the last input row from Q. If we knew that the next Q row would arrive with rowtime 1:05, then we would know we must wait for it and output it. On the other hand, if we knew that the next Q row would have rowtime 1:10, we could output the last P row immediately.

If the source of the Q rows knows a little bit about the next row, it can set the rowtime bound, which will be propagated downstream to the merge operator, which will see it as the value of the input clock for Q. The source doesn’t need to know the whole row in advance, and it doesn’t even need to know the rowtime; it only needs to know a lower bound on the rowtime (one that is larger than the rowtime of the latest inserted row, in this case 1:04). Thus punctuation by data sources results in better input clocks and a more efficient merge operator. The same correct output is produced without the advantage of punctuation, but it is delayed.
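The following Java sketch models a two-input merge that consults an input clock for each side. It is an assumption-laden illustration rather than the server's merge operator: BoundedMerge and its method names are hypothetical, and rowtimes are minutes since midnight. It replays the rowtimes 1:00 and 1:06 on P and 1:02 and 1:04 on Q.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical ordered merge of inputs P and Q driven by input clocks. */
class BoundedMerge {
    private final Deque<Integer> p = new ArrayDeque<>();
    private final Deque<Integer> q = new ArrayDeque<>();
    private int clockP = Integer.MIN_VALUE; // lower bound on future P rowtimes
    private int clockQ = Integer.MIN_VALUE; // lower bound on future Q rowtimes
    private final List<String> out = new ArrayList<>();

    void rowP(int t)   { p.add(t); clockP = Math.max(clockP, t); drain(); }
    void rowQ(int t)   { q.add(t); clockQ = Math.max(clockQ, t); drain(); }
    void boundP(int t) { clockP = Math.max(clockP, t); drain(); }
    void boundQ(int t) { clockQ = Math.max(clockQ, t); drain(); }

    List<String> output() {
        return out;
    }

    /** Emits every pending row that is known to be the earliest of all inputs. */
    private void drain() {
        while (true) {
            // Earliest rowtime each side could still deliver: its pending row,
            // or, when no row is pending, its input clock.
            int nextP = p.isEmpty() ? clockP : p.peek();
            int nextQ = q.isEmpty() ? clockQ : q.peek();
            if (!p.isEmpty() && nextP <= nextQ) {
                out.add(fmt(p.poll()) + " from P");
            } else if (!q.isEmpty() && nextQ <= nextP) {
                out.add(fmt(q.poll()) + " from Q");
            } else {
                return; // must wait for a row or a bound on the quiet side
            }
        }
    }

    private static String fmt(int minutes) {
        return String.format("%d:%02d", minutes / 60, minutes % 60);
    }
}
```

After rowP(60), rowQ(62), rowQ(64), and rowP(66), the model has emitted 1:00 from P, 1:02 from Q, and 1:04 from Q, and it is holding 1:06 from P. A call to boundQ(70), meaning "the next Q row is at 1:10 or later", releases the held row immediately, while boundQ(65) would not.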

Rowtime Bounds Summary

A rowtime bound is a constraint on the rows that will later be inserted into a stream. Rows whose rowtimes are less than the bound will be rejected.

  • Because the rowtimes in a stream must be non-descending, each row automatically creates a non-strict bound following itself.
  • Rowtime bounds are never lost. Two rowtime bounds can combine, but if they do, the stronger bound is preserved.
  • Rowtime bounds can be strict or non-strict. A non-strict bound allows the next row to have the same or a greater rowtime, just not a lesser one, whereas a strict bound requires the following row to have a greater rowtime.
  • Rowtime bounds do not flush data through the system, but they allow operators to make progress.

Rowtime Bounds API

This section explains the API for rowtime bounds.

  • How can a data reader see the latest rowtime bound?
  • How can a data source advance the rowtime bound?

Rowtime Bounds and Interfacing SQLstream s-Server Applications

The issues of handling rowtime bounds arise in several contexts of interfacing streaming SQL applications.

Rowtime Bounds and SQL

Rowtime bounds cannot be read or written in SQL.

However, rowtime bounds can be read or written in Java clients using the JDBC Driver as well as in adapters using the SQLstream API defined in the SDK, as described below.

More detailed descriptions appear in the Integration Guide’s JDBC Driver topic about setting and getting rowtime bounds. Their interfaces are shown in that book’s section titled Extensions to JDBC API, and an example appears in the section titled Sending Rowtime Bounds.

Many applications use pumps as a way of inserting the results of a query into a stream. (A pump is a separate data source defined entirely in SQL.) There is no way in SQL for a pump to punctuate, that is, to create rowtime bounds. A pump can only insert rows. However, the pump will pass forward the rowtime bounds it finds in its data source, its embedded SELECT statement. (This point is explained in the Eliminating Rows topic of this guide.)

Rowtime Bounds and JDBC

Java client applications connect to the s-Server using the SQLstream JDBC Driver, which has been extended to support streaming concepts. In particular, java.sql.Statement is extended as StreamingStatement, and java.sql.PreparedStatement as StreamingPreparedStatement.

Typically a data source will prepare an INSERT statement with parameters, and execute the prepared statement several times, binding different parameter values. A data reader executes a SELECT STREAM statement. In JDBC this corresponds to a StreamingStatement. To read rows, the reader gets a ResultSet and calls ResultSet.next() in a loop. The ResultSet from a SELECT STREAM is essentially a result stream: each call of next() returns the next row in the stream, and may block, waiting for the next row to arrive.

The SQLstream JDBC driver lets a writer set the rowtime bound (that is, “punctuate” the stream of input rows), and lets a reader get the current rowtime bound (the current stream clock for the stream of result rows of the query). See the topic SQLstream JDBC Driver in the Integration Guide for more details. This is the interface:

// returns the current bound for the results from the statement
Timestamp StreamingStatement.getRowtimeBound();
// Sets the rowtime bound. Must be an input statement.
// An error to set the bound backwards in time.
// long form: 'cal' indicates the timezone of 't'.
// Next row's rowtime must be >= t.
// 'strict' makes the constraint strict: rowtime > t.
void StreamingPreparedStatement.setRowtimeBound(Timestamp t, Calendar cal, boolean strict);
// short form: non-strict bound, UTC timezone
void StreamingPreparedStatement.setRowtimeBound(Timestamp t);

If possible, a data source inserting rows should call

StreamingPreparedStatement.setRowtimeBound(t)

before a gap, to indicate the rowtime of the next row to be inserted.

A data reader can get the stream clock by calling

StreamingStatement.getRowtimeBound().

It returns a non-strict lower bound on future rows not yet in the JDBC client buffer. An application may call getRowtimeBound() several times; the value returned may increase, which means that the system has provided the client with a stronger bound.

Rowtime Bounds and UDFs

A UDF (User-Defined Function), coded in Java, can be used as an element of a scalar SQL expression. Thus it is used as part of a row-by-row transformation or a filter. In s-Server, a UDF passes on rowtime bounds unchanged.

Rowtime Bounds and UDXs

When writing a UDX (User-Defined Transformation), you need to be aware of rowtime bounds. See the topic Rowtime Bounds and UDXs in Integrating with Other Systems for more details.

Rowtime Bounds and Adapters

An adapter is a Java module that is either a data source or a data sink. A data source calls the Java interface com.sqlstream.plugin.RowSink to send rows, and a data sink implements the same interface to receive rows. This interface has a method to transfer a rowtime bound:

void sendRowtimeBound(Timestamp);

A data source can call this, to the same effect as a JDBC client data source calling StreamingPreparedStatement.setRowtimeBound. A data sink may use rowtime bounds sent to it as hints about future inputs, which may allow it to do some computations in advance, in a way analogous to the merge operator discussed above.

Some JDBC Examples

An Example of a JDBC Data Source

The following lines of Java code provide an example of a producer introducing a bound with a later rowtime than the last occurring row. These lines produce the three orders shown in the Implicit Rowtime Bounds section, followed by an arbitrary later bound:

import java.sql.Timestamp;
import java.util.Calendar;
import java.util.TimeZone;
// StreamingPreparedStatement is provided by the SQLstream JDBC driver.

// 'connection' is an open java.sql.Connection to s-Server, obtained elsewhere.
StreamingPreparedStatement pstmt = (StreamingPreparedStatement)
   connection.prepareStatement("INSERT INTO Orders (rowtime, ticker) VALUES (?, ?)");
// Create a calendar for '2006-09-11 12:00:00'.
Calendar calendar01 = Calendar.getInstance(TimeZone.getTimeZone("GMT"));
   calendar01.clear(); // reset all fields, including milliseconds
   calendar01.set(Calendar.YEAR, 2006);
   calendar01.set(Calendar.MONTH, Calendar.SEPTEMBER); // months are 0-based, so September is 8
   calendar01.set(Calendar.DAY_OF_MONTH, 11);
   calendar01.set(Calendar.HOUR_OF_DAY, 12);
   calendar01.set(Calendar.MINUTE, 0);
   calendar01.set(Calendar.SECOND, 0);
// Send ORDERS: 'ORCL', rowtime '2006-09-11 12:00:00'.
   pstmt.setTimestamp(1, new Timestamp(calendar01.getTimeInMillis()));
   pstmt.setString(2, "ORCL");
   pstmt.execute();
// Send ORDERS: 'MSFT', rowtime '2006-09-11 12:01:00'.
   calendar01.set(Calendar.MINUTE, 1);
   pstmt.setTimestamp(1, new Timestamp(calendar01.getTimeInMillis()));
   pstmt.setString(2, "MSFT");
   pstmt.execute();
// Send ORDERS: 'IBM', rowtime '2006-09-11 12:02:00'.
   calendar01.set(Calendar.MINUTE, 2);
   pstmt.setTimestamp(1, new Timestamp(calendar01.getTimeInMillis()));
   pstmt.setString(2, "IBM");
   pstmt.execute();
// Send rowtime bound '2006-09-11 12:05:00'.
   calendar01.set(Calendar.MINUTE, 5);
   pstmt.setRowtimeBound(new Timestamp(calendar01.getTimeInMillis()));

The code proceeds as follows:

  • Set up the calendar for 2006-09-11 12:00:00 GMT (Calendar months are 0-based, so September is month 8).
  • Create the first order, for ORCL, occurring exactly at 12:00:00.
  • Create the second order, for MSFT, occurring exactly at 12:01:00.
  • Create the third order, for IBM, occurring exactly at 12:02:00.
  • Create the producer’s chosen arbitrary bound [2006-09-11 12:05:00].

This code creates the previous sequence of rows and follows it with the bound: ORDERS.bound: rowtime ‘2006-09-11 12:05:00’. The bound is a promise by the caller not to insert another row using this prepared statement with a rowtime earlier than 2006-09-11 12:05:00.

Semantics of Rowtime Bounds

Rowtime bounds never change the results of a query, but they do help the system provide the results sooner. The general rule is that the value of a streaming SQL expression (such as a query) depends on the SQL operations and their operands (the data), but not on any rowtime bounds that accompany the data.

Some operators ignore rowtime bounds. Some operators make use of rowtime bounds to produce earlier results, but never to produce different results.

In this section we examine the semantics of rowtime bounds. First we review what rowtime bounds are and how they flow through the system. Next we review why rowtime bounds are useful. Finally we consider how relational operators deal with rowtime bounds.

Implicit Rowtime Bounds

Because the rows of data in a stream are ascending in time-value, the existence of a row implicitly states that there will not later be a row with an earlier rowtime. It is as if every row in a stream is followed by a time bound with the same rowtime as the rowtime of that row:

ORDERS: 'ORCL', rowtime '2006-10-10 10:00:00'
ORDERS.bound: rowtime '2006-10-10 10:00:00'

As rows move through the system, sometimes they “catch up” with earlier rows in the same stream. The rowtime of each later row must be greater than or equal to the bound of the preceding row. The result is a group of rows followed by a single bound. If, for example, rows occur representing three orders, the rowtime for the implicit bound is automatically that of the last occurring row:

ORDERS: 'ORCL', rowtime '2006-09-11 12:00:00'
ORDERS: 'MSFT', rowtime '2006-09-11 12:01:00'
ORDERS: 'IBM', rowtime '2006-09-11 12:02:00'
ORDERS.bound: rowtime '2006-09-11 12:02:00'

Hence at any moment in time, a stream contains groups of rows, each implicitly followed by a bound.

Merging a bound onto a train of rows

A bound moves through the system as a self-sufficient unit. If it encounters or catches up with a previous train of rows, it supersedes that train’s (weaker) bound. The train will look like this:

ORDERS: 'ORCL', rowtime '2006-09-11 12:00:00'
ORDERS: 'MSFT', rowtime '2006-09-11 12:01:00'
ORDERS: 'IBM', rowtime '2006-09-11 12:02:00'
ORDERS.bound: rowtime '2006-09-11 12:05:00'

Splitting trains of rows

Sometimes the system splits a train of rows in two. (This can happen when an internal buffer fills up.) When a train is split, the first train will end with the strongest bound it can, namely, the rowtime of the first row in the next train:

ORDERS: 'ORCL', rowtime '2006-10-10 10:00:00'
ORDERS: 'MSFT', rowtime '2006-10-10 10:01:00'
ORDERS.bound: rowtime '2006-10-10 10:02:00'
ORDERS: 'IBM', rowtime '2006-10-10 10:02:00'
ORDERS.bound: rowtime '2006-10-10 10:05:00'
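The splitting rule can be modeled in Java as follows. This is a hypothetical sketch, not server code: the TrainSplit and Train names are invented for the illustration, and rowtimes are minutes since midnight.

```java
import java.util.Arrays;

/** Hypothetical model of splitting a train of rows in two. */
class TrainSplit {
    /** A train: rows in rowtime order, followed by one closing bound. */
    static final class Train {
        final int[] rowtimes;
        final int bound;

        Train(int[] rowtimes, int bound) {
            this.rowtimes = rowtimes;
            this.bound = bound;
        }
    }

    /** Splits the train before row index k, where 0 < k < number of rows. */
    static Train[] split(Train t, int k) {
        if (k <= 0 || k >= t.rowtimes.length) {
            throw new IllegalArgumentException("k must fall strictly inside the train");
        }
        int[] first = Arrays.copyOfRange(t.rowtimes, 0, k);
        int[] second = Arrays.copyOfRange(t.rowtimes, k, t.rowtimes.length);
        // The first train ends with the strongest bound available:
        // the rowtime of the first row in the second train.
        return new Train[] { new Train(first, second[0]), new Train(second, t.bound) };
    }
}
```

Splitting the train (10:00, 10:01, 10:02, bound 10:05) before its third row yields a first train that ends with the bound 10:02 and a second train that keeps the original bound 10:05, as in the listing above.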

Eliminating rows

A filter condition can eliminate rows from a stream, but bounds are never eliminated. For example, if the previous two trains passed through the condition WHERE ticker = 'ORCL', the resulting train would look like this:

ORDERS: 'ORCL', rowtime '2006-10-10 10:00:00'
ORDERS.bound: rowtime '2006-10-10 10:02:00'
ORDERS.bound: rowtime '2006-10-10 10:05:00'
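A filter that drops rows but forwards every bound can be sketched in Java. The class FilterKeepsBounds and its Event type are hypothetical names chosen for this illustration. A bound is modeled as an event with no ticker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of a filter that never eliminates bounds. */
class FilterKeepsBounds {
    /** A stream element: a row when ticker is non-null, otherwise a bare bound. */
    static final class Event {
        final String ticker;
        final String rowtime;

        Event(String ticker, String rowtime) {
            this.ticker = ticker;
            this.rowtime = rowtime;
        }

        boolean isBound() {
            return ticker == null;
        }

        @Override
        public String toString() {
            return isBound()
                    ? "ORDERS.bound: rowtime '" + rowtime + "'"
                    : "ORDERS: '" + ticker + "', rowtime '" + rowtime + "'";
        }
    }

    /** Keeps rows whose ticker satisfies the condition, and keeps every bound. */
    static List<Event> filter(List<Event> in, Predicate<String> condition) {
        List<Event> out = new ArrayList<>();
        for (Event e : in) {
            if (e.isBound() || condition.test(e.ticker)) {
                out.add(e);
            }
        }
        return out;
    }
}
```

Passing the two trains from the splitting example through filter(events, "ORCL"::equals) leaves the ORCL row followed by both bounds, matching the listing above.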

When Rowtime Bounds Matter

A streaming query will automatically take advantage of any available rowtime bounds to produce more timely output. As a stream of rows is processed, rowtime bounds accompany the data as described in the previous section.

The important rowtime bounds – those that carry extra information – are the extra bounds (nicknamed “punctuation”) that a data source adds as it inserts rows. These give advance notice of future data: namely, a lower bound on the rowtime of all future rows inserted. Asserting the bound t means that, for the next inserted row r, the field r.rowtime will be at least t. Other ways to express this are that the “clock” of the stream of inserted rows has jumped forward to t, or that the data source is going to be “quiet” until time t.

Important notes:

  • s-Server trusts that the rowtime bound is valid. If the data source subsequently inserts a row with rowtime less than the bound, the s-Server rejects the row and reports an error.
  • Every valid inserted row must have a rowtime not less than the stream clock. Hence every row either repeats or advances the stream clock.
  • Data sources should track inserted rowtimes and estimate the rowtime of future inserts. Estimating the earliest rowtime of future inserts (such as the opening of the next trading day) will help s-Server produce timely output.
  • Relational operators also advance their output streams. They advance the rowtime bound of their output stream after producing a train of rows, before an apparent pause, and the bound they set is a prediction and a promise about their next output row.

Eclipsed Rowtime Bounds

A rowtime bound may convey important extra information about a data stream, but the importance of the bound is short-lived. As soon as the next row or the next bound arrives, the older bound is no longer useful: that is, the rowtime bound is eclipsed by the new information.

Since all rows in a stream S are in rowtime order, any row r with rowtime r.t functions as an explicit announcement that no future row will have a rowtime earlier than r.t. Every row advances the rowtime bound unless it has the same rowtime as its predecessor.

As a result, any rowtime bound is discarded once a later rowtime is made available, either through a newly arrived row or through another rowtime bound. All older bounds are eclipsed by explicit rowtimes and discarded.

Rowtime bounds are most useful when rows will be followed by a known gap in time. They tell the system to not wait for a new row to begin processing.

Splitting/Merging Rows and Rowtimes

At times, two trains of rows may join up and form a single train, or a single train may be split in two. In the first case the bound of the earlier train is eclipsed. In the second case a gap has been introduced, and the new early train will be given a rowtime bound, namely, the rowtime of the first row in the second, later train. This means that rowtime bounds are “not conserved”. But the information they carried is not lost, and so their effect is not lost.

Strict and non-strict bounds

All of the bounds discussed so far have been non-strict. For example, the bound

ORDERS.bound: rowtime '2006-10-10 10:00:00'

means ‘There is not going to be a row in this stream with rowtime before 10:00:00’. In other words, each row must be 10:00:00 or after. The ‘or after’ makes the bound non-strict. This is similar to the SQL standard OVER operator for a 1-hour window, which contains both of its end points. Thus, a trade at 12 noon would fall into both the 11-12 window and the 12-1 window.

However, certain operators require strict bounds in order to proceed. A strict bound would say ‘There is not going to be a row in this stream with rowtime at or before 10:00:00’. In other words, with a strict bound, each row must be strictly after 10:00:00. For example, the windowed aggregation operator cannot produce a summary for the hour 11:00:00 to 12:00:00 until it knows it has seen the last trade occurring on the stroke of 12:00:00.

In SQLstream rowtimes have finite precision (namely, 1 msec). A strict bound at time t is equivalent to a non-strict bound t + 1 msec.
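The equivalence between a strict bound and a non-strict bound one millisecond later can be checked with a short Java sketch. The class BoundMath and its methods are hypothetical helpers for illustration. Rowtimes are modeled as milliseconds since the epoch.

```java
/** Hypothetical helpers for comparing strict and non-strict bounds. */
class BoundMath {
    /** Converts a bound to its non-strict equivalent at 1 msec precision. */
    static long toNonStrict(long boundMillis, boolean strict) {
        return strict ? boundMillis + 1 : boundMillis;
    }

    /** Returns true if a row with the given rowtime satisfies the bound. */
    static boolean admits(long boundMillis, boolean strict, long rowtimeMillis) {
        return strict ? rowtimeMillis > boundMillis : rowtimeMillis >= boundMillis;
    }
}
```

For any rowtime, admits(t, true, rowtime) gives the same answer as admits(toNonStrict(t, true), false, rowtime). A row exactly at t is refused by both, and a row at t + 1 msec is accepted by both.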

Relational Operators and Rowtime Bounds

Rowtime bounds help data flow through a SQLstream system. They are a form of contract, giving the consumer a benefit (more timely data) if the provider accepts a constraint (to only produce rows with a given rowtime or later). Understanding how rowtime bounds affect query processing requires looking at each of the stream-processing operators in turn.

Row-by-Row operators

Many operators act on one input row at a time, transforming it into a single output row. For example:

  • A PROJECT operator copies certain columns while suppressing others.
  • An expression calculator computes the value of a scalar expression over columns, such as the sum of two numeric column values.

Usually such an operator passes the rowtime through unchanged. In this case rowtime bounds are also passed through unchanged. In precise terms, a lower bound on future input rows is also a valid lower bound on future output rows, so the operator passes each rowtime bound along in the course of producing output.

The same is true for filters (WHERE clauses in a SELECT statement), which are a special kind of row-by-row operator. A filter rejects some rows and passes others, but it does pass all rowtime bounds. This is the correct behavior because a rowtime bound is an assertion about all future rows. To be precise, the bound is a mathematical lower bound on the rowtime of all future rows to arrive at the filter, so it must also be a valid lower bound on any subset of future rows, including the subset of rows that the filter will pass. The filter operator doesn’t change rowtimes, so the same bound applies to the output of the filter.

(Note that there may be a better, i.e., larger bound. The best possible bound is the actual rowtime of the next output row, though this is generally not yet known at the time the bound is propagated.)

Another special case is a row-by-row operator that replaces the input rowtime by a calculated expression. Since the output stream must be ordered by rowtime, this expression must be a monotonic increasing function of rowtime. For example:

SELECT rowtime + 100 AS rowtime, ....

Here the monotonic transformation is a simple shift by 100 msecs. The discussion above about bounds still holds, provided that rowtime bounds are shifted the same way as rowtimes. To summarize: row-by-row operators pass on rowtime bounds unchanged, unless they change rowtimes.

Merging Streams

Input streams are merged in two cases:

  • By the UNION ALL operator
  • Whenever several statements insert into the same named stream

In both cases the system merges the input streams by rowtime, so that the output stream is correctly ordered. The merge algorithm needs to know the next rowtime available on each input stream. When the next row to output has not yet arrived on an input stream, the merge must wait. Getting rowtime bounds on all input streams can avoid some of these delays.

UNION ALL and rowtime bounds

The UNION ALL operator needs to produce its output in ascending order. Consider the streaming Union operator generated to implement the query

SELECT STREAM * FROM Orders
UNION ALL
SELECT STREAM * FROM Trades

and suppose that the Union operator has already seen input

Orders: ORCL 10:01:00
Orders: MSFT 10:04:00
Trades: YHOO 10:02:00
Trades: IBM 10:03:00

and produced output

Union: ORCL 10:01:00
Union: YHOO 10:02:00
Union: IBM 10:03:00

The following row and bound:

Orders: MSFT 10:04:00
Orders.bound: 10:06:00

cannot flow through the union yet. To see why, imagine that a row with an earlier rowtime subsequently arrives on the Trades stream:

Trades: GOOG 10:03:30

If the union had already emitted the MSFT 10:04:00 record, it would now be forced to output a row out of order! So, in order to proceed, the union needs the assurance that no row with rowtime less than 10:04:00 will ever arrive in the Trades stream. In short, it needs a rowtime bound. Such a rowtime bound might arrive on its own:

Trades.bound: 10:05:00

or be implied by a row with a later rowtime:

Trades: IBM 10:05:30
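The Orders and Trades example above can be replayed with a small Python model of a rowtime merge. This is a sketch under stated assumptions, not the s-Server Union operator: the Merge class and its on_row and on_bound methods are hypothetical, each input is assumed to deliver its rows and bounds in rowtime order, and rowtimes are zero-padded HH:MM:SS strings so that plain string comparison orders them correctly.

```python
class Merge:
    """Toy rowtime merge of several inputs (hypothetical, for illustration)."""

    def __init__(self, inputs):
        self.pending = {name: [] for name in inputs}  # buffered rows per input
        self.bound = {name: None for name in inputs}  # latest lower bound per input
        self.output = []

    def on_row(self, source, rowtime, payload):
        self.pending[source].append((rowtime, payload))
        self.bound[source] = rowtime  # a row implies a bound at its own rowtime
        self._drain()

    def on_bound(self, source, rowtime):
        self.bound[source] = rowtime
        self._drain()

    def _drain(self):
        while True:
            heads = [(queue[0], name) for name, queue in self.pending.items() if queue]
            if not heads:
                return
            (rowtime, payload), source = min(heads)
            # An input with nothing buffered must have promised not to send
            # anything earlier than the candidate row.
            for name, queue in self.pending.items():
                if not queue and (self.bound[name] is None or self.bound[name] < rowtime):
                    return
            self.pending[source].pop(0)
            self.output.append((source, payload, rowtime))


union = Merge(["Orders", "Trades"])
union.on_row("Orders", "10:01:00", "ORCL")
union.on_row("Orders", "10:04:00", "MSFT")
union.on_row("Trades", "10:02:00", "YHOO")
union.on_row("Trades", "10:03:00", "IBM")
union.on_bound("Orders", "10:06:00")
held_back = list(union.output)      # MSFT is still waiting on Trades
union.on_bound("Trades", "10:05:00")
released = list(union.output)       # the Trades bound lets MSFT through
print(held_back)
print(released)
```

In this model the MSFT row stays buffered after the Orders bound arrives, and is emitted only once the Trades input promises nothing earlier than 10:05:00.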

Named Streams and Rowtime Bounds

When several clients are simultaneously inserting rows into the same named stream, the system has to merge the inputs by rowtime. This gets tricky when the data sources come and go.

Simultaneous Inserts

Consider a stream definition such as

CREATE STREAM Orders (ticker VARCHAR(10));

and two producers each periodically executing the prepared statement

INSERT INTO Orders (rowtime, ticker) VALUES (?, ?);

The behavior is similar to union: a row will only flow out of the Orders stream with a particular rowtime when both of the producers have sent a row or a rowtime bound up to at least that rowtime.

One outcome is that a stream seems to run at the speed of its slowest producer, which may be inconvenient when one producer is significantly slower than the others. When they are implemented, timeliness constraints will mitigate this effect.

The “slowest producer” effect is yet another example of a contract: In this case, open producers benefit by being given as much time as they need to produce the next row. Consumers, however, suffer by possibly being required to wait an arbitrarily long time for a given row to appear. Also, the producer benefit from this contract may not help a producer that has just joined a stream: If the producer has a row at rowtime 11:00:00 and the stream’s most recent row was timestamped 11:10:00, that producer’s row is simply out of sequence, and the producer must discard the row.

Producers Entering and Leaving a Stream

When a named stream is created, it has no rowtime bound: the first row inserted can have any rowtime, and this rowtime then becomes the rowtime bound of the stream. The next row cannot be earlier, whether inserted by the same or a new producer.

Later, whenever a new producer joins, the stream’s rowtime bound is the time of the latest row inserted. The newcomer can insert a row only if its rowtime is the same as, or later than, the stream bound. Note that the newcomer can barge in ahead of the other producers if its row is earlier than theirs. The point, of course, is to ensure that the rows in the stream are in rowtime order.

Here is a concrete example: stream S is created (and has no bound). Producer P1 arrives and asserts its bound is 10:00. The stream bound is still undefined, since no actual row has been inserted yet.

Then P1 inserts a row with rowtime 10:00, consistent with its declared bound. The row enters the stream and the stream bound becomes 10:00.

Next, producer P2 arrives, asserting its bound is 10:05. This is acceptable, as it is later than 10:00. If P2 then tries to insert a row with rowtime 10:05, it may have to wait (if P1 or a newcomer has earlier rows), but eventually it will succeed. On the other hand, if P2 tried to insert a row with rowtime 9:59, it would be rejected as a late row.

If P1 now closes and detaches from the stream, then P2 is the only source, so its 10:05 row is next up and enters the stream, advancing the stream clock to 10:05. The departure of input P1 let the stream advance: while it was still attached, with a bound of only 10:00, there was a chance it would produce the next row (with any time t such that 10:00 ≤ t < 10:05).

If all inputs close and detach, the stream “remembers” its rowtime bound, so that a producer P3 which appears later cannot insert a late row.
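The P1, P2, P3 scenario above can be traced with a small Python model. It is a sketch only: the NamedStream class and its attach, insert and detach methods are hypothetical names, not an s-Server or JDBC API, and the model assumes each producer inserts rows no earlier than the bound it has declared. Rowtimes are converted to minutes since midnight by a helper, hm.

```python
import heapq


def hm(text: str) -> int:
    """Convert 'H:MM' to minutes since midnight."""
    hours, minutes = text.split(":")
    return int(hours) * 60 + int(minutes)


class NamedStream:
    """Toy model of a named stream fed by producers that come and go."""

    def __init__(self):
        self.stream_bound = None   # rowtime of the latest row in the stream
        self.producer_bound = {}   # attached producer -> its current bound
        self.pending = []          # heap of accepted rows not yet released
        self.rows = []             # rows released into the stream, in order

    def attach(self, producer, bound):
        self.producer_bound[producer] = bound
        self._drain()

    def insert(self, producer, rowtime, payload):
        if self.stream_bound is not None and rowtime < self.stream_bound:
            return False  # late row: rejected
        heapq.heappush(self.pending, (rowtime, payload))
        self.producer_bound[producer] = rowtime
        self._drain()
        return True

    def detach(self, producer):
        del self.producer_bound[producer]
        self._drain()

    def _drain(self):
        while self.pending:
            rowtime = self.pending[0][0]
            # Wait while any attached producer could still send something earlier.
            if any(bound < rowtime for bound in self.producer_bound.values()):
                return
            self.rows.append(heapq.heappop(self.pending))
            self.stream_bound = rowtime  # remembered even if all producers leave


s = NamedStream()
s.attach("P1", hm("10:00"))
s.insert("P1", hm("10:00"), "first")
s.attach("P2", hm("10:05"))
late = s.insert("P2", hm("9:59"), "too early")   # rejected as a late row
s.insert("P2", hm("10:05"), "second")            # accepted, but waits on P1
waiting = len(s.rows)
s.detach("P1")                                   # P2's row can now flow
s.detach("P2")
s.attach("P3", hm("10:00"))
p3_late = s.insert("P3", hm("10:02"), "stale")   # stream remembers 10:05
print(s.rows, late, p3_late)
```

In this model P2's 10:05 row is held while P1 is attached with a bound of only 10:00, and P3 is still refused a 10:02 row after every earlier producer has detached.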

Streaming Aggregation and Windowed Aggregation

Aggregation applies summarizing functions (like COUNT, SUM, or MAX) to groups of rows. A group always consists of a consecutive series of rows (with rowtimes in a single interval). A group is defined by the count of rows it contains or (more usually) the length of time the rows span. For example, given a stream of events with rowtype (seqno integer, classification varchar(16), cost money), we can count events over each successive hour, which is a streaming aggregation, or we can compute a running average cost of the events of the last hour, which is a windowed aggregation. The streaming aggregation produces a value at the end of each group. The windowed aggregation produces a new value on every input row: that is, whenever the rows in the window change.

Both aggregation cases, streaming and windowed, depend on input rowtimes to trigger output:

  • Streaming aggregation needs to know when a time period has ended.
  • Windowed aggregation needs to know when the window has moved forward. More precisely, it needs to know when the window contents change, that is, when a row enters or is dropped from the window.

So for aggregation to make efficient progress, as with union/merge above, both cases need to know the rowtime of the next row. Rowtime bounds thus allow aggregation to produce results as soon as possible.

For examples of the effect of rowtime bounds:

  • On windowed aggregation, see the topic Analytic Functions in the s-Server Streaming SQL Reference Guide.
  • On streaming aggregation, see the topic Aggregate Functions in the s-Server Streaming SQL Reference Guide.
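As a rough illustration of how a bound triggers streaming aggregation output, the hourly event count described above can be modeled in Python. This is a sketch, not s-Server behavior: the HourlyCount class is hypothetical, input is assumed to arrive in rowtime order, and only non-empty hours produce a result.

```python
HOUR = 3600


def t(hms: str) -> int:
    """Convert 'HH:MM:SS' to seconds since midnight."""
    h, m, s = (int(part) for part in hms.split(":"))
    return h * 3600 + m * 60 + s


class HourlyCount:
    """Toy streaming aggregation: COUNT(*) over each successive hour."""

    def __init__(self):
        self.group_start = None  # start of the hour currently being counted
        self.count = 0
        self.output = []         # (hour start in seconds, count)

    def _advance(self, rowtime):
        start = rowtime - rowtime % HOUR
        # Moving into a later hour proves that the current hour has ended.
        if self.count > 0 and start > self.group_start:
            self.output.append((self.group_start, self.count))
            self.count = 0
        self.group_start = start

    def on_row(self, rowtime):
        self._advance(rowtime)
        self.count += 1

    def on_bound(self, rowtime):
        self._advance(rowtime)  # a bound can close a group without any new row


agg = HourlyCount()
agg.on_row(t("10:15:00"))
agg.on_row(t("10:40:00"))
before_bound = list(agg.output)   # nothing yet: the 10:00 hour may get more rows
agg.on_bound(t("11:00:00"))
after_bound = list(agg.output)    # the bound closes the 10:00 hour
agg.on_row(t("11:30:00"))
print(before_bound, after_bound)
```

Without the 11:00:00 bound, the count for the 10:00 hour would not be emitted until the 11:30:00 row arrived.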

Streaming Joins

When you use streaming join queries, as with streaming union queries, you need to remember that s-Server always outputs rows in ascending order of rowtime.

You cannot join two entire streams (that is, find all matching pairs of rows regardless of rowtime), because doing so would require unbounded memory to save the entire history of both streams.

You can, though, apply sliding windows to each input stream and find matching pairs of rows, one from each window. This is the same “window” construct used in windowed aggregation.

Each window in a join must have a finite size, though its contents vary over time. Windowed joins can benefit from rowtime bounds on their input to emit earlier output. In particular, this helps an outer join by finding unmatched rows as soon as possible. Like other operators, a windowed join operator sets its output rowtime bound to indicate the earliest possible rowtime for its next output.

When the output is due to a matched pair of input rows, each has its own rowtime. The output rowtime is the later of these times, because the rows could not match until both “existed”.

For an outer join, an unmatched row results in an output row. The output rowtime is not the rowtime of the unmatched row: to be consistent with the treatment of matches, it is the time we discover the row to be unmatched, i.e., the time the unmatched row is dropped from its window. Rowtime bounds on the input allow this to be discovered as soon as possible.

Examples of Streaming Joins

Join query # 1

SELECT STREAM
ROWTIME, o.orderId, o.ticker,
o.amount AS orderAmount, t.amount AS tradeAmount
FROM Orders AS o
JOIN Trades OVER (RANGE INTERVAL '10' MINUTE FOLLOWING) AS t
ON o.orderId = t.orderId

This query prints all orders matched by a trade within ten minutes of the order being placed. As ever, the output stream must be sorted by rowtime; the question becomes, “Which rowtime?” The query is written in a way that centers on a single row from the Orders stream at a time, joined to a window of rows from the Trades stream. The output rowtime, appropriately, is the rowtime of the single Orders row. (To see why, apply the definition of the rowtime of a join: the base-time of the windows at the time the match is discovered. The orders row is alone in its window, a row-based window of size 1 with no offset, so its base-time is the rowtime of the orders row.)

Orders matched within ten minutes

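Step | Orders               | Trades               | Output
-----|----------------------|----------------------|------------------------
1    | 10:00:00 1 ORCL 100  |                      |
     | 10:03:00 2 YHOO 25   |                      |
2    |                      | 10:02:00 1 ORCL 60   | 10:00:00 1 ORCL 100 60
     |                      | 10:04:00 2 YHOO 25   |
     |                      | 10:07:30 1 ORCL 30   | 10:00:00 1 ORCL 100 30
3    |                      | 10:11:00 bound       | 10:03:00 2 YHOO 25 25
4    | 10:10:00 bound       |                      |
5    |                      | 10:12:00 1 ORCL 10   |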

Let’s look at the execution of this query in detail.

  • In step 1, two orders are placed.
  • In step 2, three trades occur, and they all match an order.

The trades for order # 1 placed at 10:00:00 can be output, but not those for order # 2 placed at 10:03:00. Why not? Because it’s still possible for a trade for order # 1 to arrive before the 10:10:00 deadline.

  • In step 3, a rowtime bound on the Trades stream tells us that no further trades will match order # 1.

The streaming join operator can now remove order # 1 from its memory. It’s now possible to output the match on YHOO that was seen in step # 2.

  • In step 4, a rowtime bound on the Orders stream arrives. It has no effect.
  • In step 5, a trade arrives for order # 1, but too late for its window (which ended at 10:10): no match.
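The five steps above can be replayed with a small Python model of join query # 1. This is a sketch under stated assumptions, not the s-Server join operator: the OrderTradeJoin class and its methods are hypothetical, both inputs are assumed to arrive in rowtime order, and each order is assumed to arrive before any trade that could match it (as in the walk-through). Rowtimes are seconds since midnight.

```python
WINDOW = 10 * 60  # seconds, for RANGE INTERVAL '10' MINUTE FOLLOWING


def t(hms: str) -> int:
    """Convert 'HH:MM:SS' to seconds since midnight."""
    h, m, s = (int(part) for part in hms.split(":"))
    return h * 3600 + m * 60 + s


class OrderTradeJoin:
    """Toy model of join query # 1; output rowtime is the order's rowtime."""

    def __init__(self):
        self.open_orders = []     # orders whose Trades window may still be open
        self.trades_bound = None  # latest lower bound known for Trades
        self.output = []          # (rowtime, orderId, ticker, orderAmount, tradeAmount)

    def on_order(self, rowtime, order_id, ticker, amount):
        self.open_orders.append(
            {"rowtime": rowtime, "id": order_id, "ticker": ticker, "amount": amount, "held": []}
        )

    def on_trade(self, rowtime, order_id, amount):
        for order in self.open_orders:
            if order["id"] == order_id and order["rowtime"] <= rowtime <= order["rowtime"] + WINDOW:
                order["held"].append(amount)  # match found; release it in _drain
        self.on_trades_bound(rowtime)  # a row implies a bound at its own rowtime

    def on_trades_bound(self, rowtime):
        self.trades_bound = rowtime
        self._drain()

    def on_orders_bound(self, rowtime):
        pass  # has no effect in this model, as in step 4

    def _drain(self):
        while self.open_orders:
            earliest = self.open_orders[0]["rowtime"]
            # Matches for the earliest open order(s) can be output immediately.
            for order in self.open_orders:
                if order["rowtime"] != earliest:
                    break
                for trade_amount in order["held"]:
                    self.output.append(
                        (earliest, order["id"], order["ticker"], order["amount"], trade_amount)
                    )
                order["held"] = []
            # Later orders must wait until this window can receive no more trades.
            if self.trades_bound is None or self.trades_bound <= earliest + WINDOW:
                return
            self.open_orders = [o for o in self.open_orders if o["rowtime"] != earliest]


join = OrderTradeJoin()
join.on_order(t("10:00:00"), 1, "ORCL", 100)   # step 1
join.on_order(t("10:03:00"), 2, "YHOO", 25)
join.on_trade(t("10:02:00"), 1, 60)            # step 2
join.on_trade(t("10:04:00"), 2, 25)
join.on_trade(t("10:07:30"), 1, 30)
after_step_2 = list(join.output)
join.on_trades_bound(t("10:11:00"))            # step 3
after_step_3 = list(join.output)
join.on_orders_bound(t("10:10:00"))            # step 4
join.on_trade(t("10:12:00"), 1, 10)            # step 5: no match
print(join.output)
```

In this model the YHOO match found in step 2 is held until the 10:11:00 bound on Trades closes the window of order # 1, which mirrors the Output column of the table.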

Join query # 2

SELECT STREAM
ROWTIME, o.orderId, o.ticker,
o.amount AS orderAmount, t.amount AS tradeAmount
FROM Orders OVER (RANGE INTERVAL '10' MINUTE PRECEDING) AS o
JOIN Trades AS t
ON o.orderId = t.orderId

This query is similar to the one above. Orders are still matched with trades which occur within ten minutes of being placed. The only difference is that the join is performed from the perspective of the Trades stream, and therefore the output rowtime is that of the trade. As a result, the timings when rows are output are quite different:

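Step | Orders               | Trades               | Output
-----|----------------------|----------------------|------------------------
1    | 10:00:00 1 ORCL 100  |                      |
     | 10:03:00 2 YHOO 25   |                      |
2    |                      | 10:02:00 1 ORCL 60   |
     |                      | 10:04:00 2 YHOO 25   |
     |                      | 10:07:30 1 ORCL 30   | 10:02:00 1 ORCL 100 60
3    |                      | 10:11:00 bound       |
4    | 10:10:00 bound       |                      | 10:04:00 2 YHOO 25 25
     |                      |                      | 10:07:30 1 ORCL 100 30
5    |                      | 10:12:00 1 ORCL 10   |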

Let’s look at the execution in detail.

  • In step 1, two orders are placed.
  • In step 2, three trades occur, and they all match an order. We can output the 10:02:00 trade for order # 1, but we cannot move onto the 10:04:00 trade yet. How can this be?

We know that there are no more trades before 10:04:00, so why can’t we move on to it? The problem is not with the Trades stream; it is with the Orders stream. The streaming join operator thinks that there could be another Order record with orderId=1 before the 10:04:00 deadline; if such an order were to arrive, it would generate another output row time-stamped 10:02:00. (We happen to know that orderIds are unique, so that can’t happen.)

  • In step 3, a bound for Trades arrives.
  • In step 4, a bound for Orders arrives. The streaming join operator now knows that it is impossible to see another order pairing with the 3 trade rows seen so far.
  • In step 5, an unpaired trade arrives. The streaming join operator will keep this trade in memory until the Orders stream reaches 10:12:00, because it would be possible for a matching order to arrive. (We know that order # 1 has already been seen, and that there will not be another, but once again the windowed join operator cannot deduce that.)

Join query # 3

Now consider a variation on query # 1 as an outer join, with an unmatched trade and an unmatched order in the input data. (The select list has been expanded to clarify the new output rows.)

SELECT STREAM
ROWTIME, o.orderId, o.ticker, o.amount AS orderAmount,
t.ticker, t.tradeId, t.amount AS tradeAmount
FROM Orders AS o
FULL OUTER JOIN Trades OVER (RANGE INTERVAL '10' MINUTE FOLLOWING) AS t
ON o.orderId = t.orderId

Step | Orders               | Trades               | Output
-----|----------------------|----------------------|-------------------------------------
1    | 10:00:00 0 IBM 110   |                      |
     | 10:00:00 1 ORCL 100  |                      |
     | 10:03:00 2 YHOO 25   |                      |
2    |                      | 10:02:00 1 ORCL 60   | 10:00:00 1 ORCL 100 1 ORCL 60
     |                      | 10:04:00 2 YHOO 25   |
     |                      | 10:07:30 1 ORCL 30   | 10:00:00 1 ORCL 100 1 ORCL 30
3    |                      | 10:11:00 bound       | 10:00:00 0 IBM 110 null null null
     |                      |                      | 10:03:00 2 YHOO 25 2 YHOO 25
4    | 10:10:00 bound       |                      |
5    |                      | 10:12:00 1 ORCL 10   | 10:12:00 null null null 1 ORCL 10

The inputs are the same as before, but we add an unmatched order at the start. As the same matches occur, the output should be a superset of that of query #1, with the same rowtimes for the matches.

The unmatched order for IBM results in an output row with null Trade-columns; as with the matches, the output rowtime is the Order rowtime (10:00 for Order # 0); but the mismatch is discovered only when the 10 minute window expires for this order, at 10:10. Hence the 10:11 bound on the Trades stream causes the mismatch for IBM to be output, and then (as before) the match for YHOO from 10:03.

The unmatched trade at 10:12 for order 1 results in an output row with null Order-columns. But when? The output rowtime cannot be the time of the matched Orders row, since there isn’t any. Instead it is the time of a missing order that could have matched this Trade, which could have occurred any time during the 10 minute window preceding 10:12; thus the mismatch is known at the end of the window, that is, immediately at 10:12, which is the correct rowtime.

Extension Operators (UDF, UDX, Transform Plugins)

SQLstream allows an application to define its own extension operators. These include:

  • User-defined functions (UDFs). You use these in a SELECT expression (or in a WHERE clause). For more detail, see the topics SELECT and WHERE clause in the s-Server Streaming SQL Reference Guide. A UDF is a row-by-row operation, and so is unaffected by rowtime bounds, which it automatically passes downstream.
  • Foreign streams, generated by an adapter coded in Java. Such streams are very general-purpose. The algorithms they express can make use of rowtime bounds and should be responsible for passing them downstream. A plugin can see and emit rowtime bounds as part of the com.sqlstream.plugin.RowSink interface that all plugins use to read and write data. A client application can see and emit rowtime bounds as part of the SQLstream implementation of JDBC.
  • User-defined transforms (UDXs). You code these in Java. They read an input stream and produce an output stream. When writing a UDX, you need to be aware of rowtime bounds. See the topic Rowtime Bounds and UDXs in the SQLstream Integration Guide for more details.