In the last tutorial, we used StreamLab to connect with a data source, perform a simple analytic on the data, and visualize the data. As it worked, StreamLab was generating SQL. SQL is a query language that has been used for decades to query databases. SQLstream uses a version of SQL extended for streaming data.
Common tasks in SQL include creating streams and tables, inputting data into streams and tables, querying these streams and tables, moving data between objects, performing analysis on this data, and outputting data into a sink, or destination outside of s-Server.
This tutorial instructs you how to complete these tasks in SQL, first by using SQLstream's Integrated Development Environment, s-Studio, and second by writing SQL by hand and executing it using SQLline, a console utility for executing SQL.
After you complete these steps, you can use s-Dashboard to show streaming data in a dashboard.
SQLstream s-Studio is SQLstream's Integrated Development Environment. It lets you develop, test, run, and administer streaming SQL applications.
To launch s-Studio, open a terminal and enter the following text:
/opt/sqlstream/<VERSION>/s-Studio/s-Studio
The s-Studio workbench opens.
Before you can begin working with data in s-Studio, you first need to connect to s-Server.
If you have installed s-Server and s-Studio as part of the same package, a connection to s-Server has already been set up.
To connect to this instance, right-click on First SQLstream Server and choose Connect.
If s-Studio becomes unresponsive, you may need to reset the s-Studio workspace. You can do so by opening a terminal and issuing the following command:
/opt/sqlstream/<VERSION>/s-Studio/resetStudio
You will not lose any of your work, because all of your changes should be saved in s-Server itself.
In the following exercise, we will capture data from a streaming log file and use this data to identify buses that exceed the speed limit in the Sydney area. Because this data is streaming, speeders will be identified in real time. We will derive this data by connecting to a simulated streaming log file located on your local machine. In this case, the log file records messages sent out by buses in Sydney, Australia.
To begin, you need to start the demo data source. If you are running Guavus SQLstream in a Docker container or other virtual machine, you can do so from the Guavus SQLstream Cover Page.
For appliances and Docker installations, on the cover page, scroll down to Sydney Buses is Running and click the On/Off switch to start streaming data. Data streams to /tmp/buses.log.
If you are running s-Server on a local machine, you can start the script by opening a terminal and entering the following:
$SQLSTREAM_HOME/demo/data/buses/start.sh
To stop the script, enter
$SQLSTREAM_HOME/demo/data/buses/stop.sh
Data streams to /tmp/buses.log.
This file features data in the following categories:
Column | Type | Definition |
---|---|---|
id | DOUBLE | Identification number for the bus. |
reported_at | TIMESTAMP | Time location was reported. |
shift_no | DOUBLE | Shift number for the bus's driver. |
driver_no | DOUBLE | Driver identification number. |
prescribed | VARCHAR(4096) | The direction on the motorway (into Sydney or out of Sydney). |
highway | DOUBLE | Highway number if available. |
gps | VARCHAR | GPS information with latitude, longitude, and bearing in JSON format. |
In s-Server, you define foreign streams to connect with external data sources or sinks. A foreign stream serves as an interface between s-Server and external data. For each foreign stream, you define connection information for the data source or sink, as well as (usually) columns to be read from the source or written to the sink. Each foreign stream requires a server. s-Server includes predefined servers for all supported data sources and sinks.
To create the foreign stream in s-Studio:
Open s-Studio by entering
/opt/sqlstream/<VERSION>/s-Studio/s-Studio
s-Studio opens.
Right-click First SQLstream Server and select Connect.
All foreign streams need to be defined in schemas. Schemas are "containers" for foreign streams, internal streams, tables, pumps, and so on. Here, we're going to create a new schema as a container for our foreign stream.
Right-click Schemas and select New. In the dialog box that opens, enter Buses_Schema as the schema's name.
Click on Buses_Schema to expand it.
Right-click Foreign Streams and select New.
In the dialog box that opens, enter Buses for the data source's name.
A tab opens called Buses (Foreign Stream).
Click the Definition tab.
To the right of Server, select FILE_SERVER from the dropdown menu.
Under Columns, click Add and add the following columns:
Column | Type |
---|---|
id | BIGINT |
reported_at | VARCHAR(32) |
speed | INTEGER |
driver_no | BIGINT |
prescribed | BOOLEAN |
gps | VARCHAR(128) |
highway | VARCHAR(8) |
Next, we need to set options for the foreign stream. Options tell s-Server where and how to read the external data source. In this case, we will use two sets of options: one that tells s-Server where to find the file, and one that tells it how to parse the XML.
Under OPTIONS, click Add and add the following options:
name | Value | Explanation |
---|---|---|
CHARACTER_ENCODING | UTF-8 | Character set for data |
DIRECTORY | /tmp | Tells s-Server where the file resides. |
FILENAME_PATTERN | buses\.log | Tells s-Server what to look for in the directory in which the file resides. |
PARSER | XML | Tells s-Server "parse this data as XML" |
We need to add two additional options to tell s-Server how to read the XML data in the streaming source. This data looks as follows:
<Table1><id>50116195532</id><reported_at>2014-07-23T20:51:58.547</reported_at><speed>0</speed><driver_no>160019</driver_no><prescribed>false</prescribed><highway/><gps>{ "lat": -33.530784606933594, "lon": 150.87783813476562, "bearing": 0}</gps></Table1>
The top-level XML tag that we want to parse is called Table1. You indicate the top-level XML tag with an option called PARSER_XML_ROW_TAGS.
Enter the following additional options:
name | Value | Explanation |
---|---|---|
PARSER_XML_ROW_TAGS | /Table1 | Tells s-Server "parse XML tags under /Table1" |
PARSER_XML_USE_ATTRIBUTES | false | Tells s-Server "ignore XML tag attributes" |
Note that the Buses (Foreign Stream) tab has an asterisk next to it. This indicates that the data source has not been saved. Choose File > Save (or Ctrl-S) to save the foreign stream. This step applies the SQL in s-Server.
To see the new foreign stream, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.
You can view the SQL you have created by clicking the SQL tab.
You should see SQL along the following lines:
CREATE OR REPLACE FOREIGN STREAM "Buses_Schema"."Buses"
(
"id" BIGINT,
"reported_at" VARCHAR(32),
"speed" INTEGER,
"driver_no" BIGINT,
"prescribed" BOOLEAN,
"gps" VARCHAR(128),
"highway" VARCHAR(8)
)
SERVER FILE_SERVER
OPTIONS (
CHARACTER_ENCODING 'UTF-8',
DIRECTORY '/tmp',
FILENAME_PATTERN 'buses\.log',
PARSER 'XML',
PARSER_XML_ROW_TAGS '/Table1',
PARSER_XML_USE_ATTRIBUTES 'false'
)
DESCRIPTION 'reads streaming data from buses.log'
;
Now that you have configured the foreign stream in s-Server, let's test the foreign stream to make sure data is streaming.
To do so, enter the following in the Query Editor:
SELECT STREAM * FROM "Buses_Schema"."Buses";
Then click the Execute button at the top of the Query Editor.
Data should flow in the lower part of the Query Editor.
Instead of writing a query, you can also use s-Studio's Inspect feature to see data flowing.
To do so, right-click the Buses foreign stream and choose Inspect. A tab called Inspect Buses opens with data flowing.
Now that we've created a foreign stream, we want to perform a basic calculation on this stream. To do so, we'll create a new object called a view.
Views are reusable definitions of queries. Every view contains a SELECT statement. Views, then, work as proxies for streams. Views provide a flexible, easy-to-modify way to work with streaming data. You can query views themselves, and a query on a view automatically retrieves data from the stream or streams queried by the view. As an s-Server developer, you will do much of the work of cleaning, analyzing, and routing data through views.
Because you can create views linked to one another with SELECT statements, you can use views to create pipelines, linked SQL objects that perform some analytic task. If you have used StreamLab, and viewed the SQL generated by pipeline guides, you may have noticed that pipeline guides consist mostly of interlinked views. It's also the case that multiple views can query the same stream or view. This allows you to fork pipelines.
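For instance, forking might look like the following sketch (the view names are hypothetical; the "prescribed" column records the direction of travel):

```sql
-- Hypothetical fork: two views reading the same "buses" stream.
-- Each view can feed its own downstream pipeline.
CREATE OR REPLACE VIEW "Buses_Schema"."buses_inbound" AS
  SELECT STREAM * FROM "Buses_Schema"."buses" WHERE "prescribed" IS TRUE;
CREATE OR REPLACE VIEW "Buses_Schema"."buses_outbound" AS
  SELECT STREAM * FROM "Buses_Schema"."buses" WHERE "prescribed" IS FALSE;
```

Each view can then be queried, or used as the source for further views, without affecting the other branch.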
While views contain queries, data will not flow in s-Server until you run an active SELECT on either the view itself or on a view that SELECTs this view downstream in the pipeline.
Views take the general form CREATE VIEW <name> AS <SELECT statement>.
The SQL statement below, for example, does a simple filtering of the "buses" stream that we created above, selecting only the columns "id" and "speed".
CREATE OR REPLACE VIEW "Buses_Schema"."View" AS
--the SELECT statement below defines the view.
SELECT STREAM "id","speed"
FROM "Buses_Schema"."buses" AS "input";
When you use views, you can take advantage of all the power of a SELECT statement. So, for example, if you want to select only rows that have speeds greater than 0, you can use the WHERE clause.
The WHERE clause extracts records that meet a specified condition. The condition can be a numeric or string comparison, as in the following examples:
SELECT STREAM * FROM Sales WHERE Employee='Bob';
SELECT STREAM * FROM Sales WHERE Customer_Type=1;
SELECT STREAM * FROM Sales WHERE Customer_Type<1;
Here, we'll apply the WHERE clause to the "speed" column, in order to limit results to those buses going faster than 0 km/hour.
Right-click Views and select New. In the dialog box that opens, enter moving_buses for the view's name. In the SQL window, complete the statement by entering the following text:
SELECT STREAM * FROM "Buses_Schema"."buses" where "speed" > 0
This tells s-Server, "get me all of the columns in a stream called buses in a schema called Buses_Schema where the column speed is greater than 0".
Click the Definition tab. Note that all columns for the Buses stream appear here, and that the buses stream appears as a dependency for the view. Dependencies refer to objects referenced by the view.
Note that the moving_buses(View) tab has an asterisk next to it. This indicates that the view has not been saved. Choose File > Save (or Ctrl-S) to save the view. This step applies the SQL in s-Server.
To see the new view, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.
The new View appears under Views.
You can query any objects that contain data directly from s-Studio. Here, we will run a query on the view we just created.
SELECT STREAM * from "moving_buses"
Instead of writing a query, you can also use s-Studio's Inspect feature to see data flowing.
To do so, right-click the moving_buses view and choose Inspect. A tab called Inspect moving_buses opens with data flowing.
s-Server depends on a special column called rowtime to deliver accurately timed data. Often, as a developer, you will want to take a column from the existing data with a timestamp and "promote" it to rowtime.
Here, you want to get the timestamp from the reported_at field by creating a view that selects reported_at as ROWTIME.
To do so:
Create a new view called buses_rowtime_promoted. In the SQL window, complete the statement by entering the following text:
SELECT STREAM
--casts "reported_at" as TIMESTAMP, promotes "reported_at" as ROWTIME
CAST("reported_at" AS TIMESTAMP) AS ROWTIME,
--other columns for view
"id","speed","driver_no","prescribed","gps","highway"
--this is the view we created in the previous step
FROM "Buses_Schema"."moving_buses"
Note that the buses_rowtime_promoted(View) tab has an asterisk next to it. This indicates that the view has not been saved. Choose File > Save (or Ctrl-S) to save the view. This step applies the SQL in s-Server.
To see the new view, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.
The new View appears under Views.
You can test that reported_at has been promoted to ROWTIME by running a query on the new view. To do so:
SELECT STREAM ROWTIME, * from "buses_rowtime_promoted"
You should see data flowing in the lower window, with ROWTIME values beginning with 2014 on the lefthand side.
You can also use views with windows. By including a WINDOW clause in a SELECT query, you can specify that the query applies to rows in a stream partitioned by the time range interval or a number of rows. This allows you to perform calculations for each output row, such as AVG or MAX. Calculations are performed over the window you specify, such as "1 MINUTE" or "20 ROWS".
In a conventional database, you would not need a window to perform such calculations, because the query would know what rows are available in advance. But because of the nature of streaming data, you have to identify a subset of rows in the stream on which to perform calculations.
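For contrast with the time-based window used next, a rows-based window over the view created above might look like this sketch (the window name "lastTwenty" is illustrative):

```sql
-- Hypothetical ROWS-based window: average speed over the last 20 rows
SELECT STREAM AVG("speed") OVER "lastTwenty" AS "avg_speed", *
FROM "Buses_Schema"."buses_rowtime_promoted"
WINDOW "lastTwenty" AS (ROWS 20 PRECEDING);
```

A rows-based window always covers a fixed number of rows, regardless of how much time they span; a RANGE window covers a fixed time interval, regardless of how many rows arrive in it.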
Here, we'll create a view that averages bus speed over 1 minute.
Create a new view called avg_buses_speed. In the SQL window, complete the statement by entering the following text:
SELECT STREAM AVG("speed") OVER "aWindow" AS "avg_speed",*
--note that we use the view that we defined above
FROM "Buses_Schema"."buses_rowtime_promoted"
--uses a WHERE clause to filter out buses that are not moving
WHERE "speed">0
WINDOW "aWindow" AS (RANGE INTERVAL '1' MINUTE PRECEDING)
Note that the avg_buses_speed(View) tab has an asterisk next to it. This indicates that the view has not been saved. Choose File > Save (or Ctrl-S) to save the view. This step applies the SQL in s-Server.
To see the new view, right-click First SQLstream Server and select Refresh. This refreshes the Catalog tree with information from s-Server.
The new View appears under Views. When you query this view:
select stream * from "avg_buses_speed"
you should see a column called avg_speed that contains the average speed of buses over the past minute.
As you viewed data flowing, you probably noticed that the column gps contains unparsed JSON. While you can often parse data as it enters s-Server, there are times when you may need to parse data within a stream. This is known as mid-stream parsing.
In order to proceed, we need to parse this data. We can do so using the Parser UDX. The Parser UDX lets you call one of s-Server's data parsers.
In order to parse JSON, we need to create a function that calls one of s-Server's predefined user-defined transforms or UDXs. A user-defined transform returns a set of rows or a stream of rows. Its input arguments can be scalars or cursors. A cursor is an object representing a subquery, which the user-defined transform can use to read the subquery results.
Here, we'll use the Parser UDX. The Parser UDX lets you apply one of s-Server's predefined parsers to a column of data (in this case, the gps column in buses_rowtime_promoted).
To create a function that calls the Parser UDX:
CREATE OR REPLACE FUNCTION "json_parser"
(
INPUT CURSOR,
COLUMNNAME VARCHAR(256),
PARSERCLASSNAME VARCHAR(256),
OPTIONS CURSOR)
RETURNS TABLE (
--INPUT is the original stream, entered as a cursor for the function
INPUT.*,
--these are the columns that you expect to see in the JSON
"lat" DOUBLE,
"lon" DOUBLE,
"bearing" INTEGER
)
SPECIFIC "json_parser"
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
NOT DETERMINISTIC
--this calls the Parser UDX
EXTERNAL NAME 'class com.sqlstream.aspen.namespace.common.ParserUdx.parseColumn';
Next, we write a view that calls the json_parser function that we just defined.
CREATE OR REPLACE VIEW "buses_json_parsed" AS
SELECT STREAM "id","speed","driver_no","prescribed",
--these columns use the alias "JSON" to indicate that they are columns produced after the SELECT statement runs
JSON."lat" AS "lat", JSON."lon" AS "lon", JSON."bearing" AS "bearing","highway"
FROM STREAM (
--this is the name of the function that we defined above
"json_parser"(
--this is the input for the function (known as INPUT in function definition).
CURSOR
--cursor selects the view that we created above
(SELECT STREAM * FROM "buses_rowtime_promoted"),
--column from "buses_rowtime_promoted" to be parsed (known as COLUMN in function definition).
'gps',
--type for parser (known as PARSERCLASSNAME in function definition).
'JSON',
--these are options for the JSON parser (in function definition).
CURSOR(SELECT * FROM (values ('$')) AS OPTIONS(ROW_PATH))
))
AS JSON;
Now, let's test our code by querying the buses_json_parsed view. (You can also use the Inspect feature to test your code.)
To do so:
SELECT STREAM * from "buses_json_parsed"
You should see data flowing in the lower window, with new columns called lat, lon, and bearing.
Next, we want to be sure that our values for lat and lon do not contain any values of 0. To do so, we can create another view, along the following lines. Note that you can use the AND keyword with the WHERE clause.
Create a view called buses_filtered_lat_lon and enter the following text into the SQL window:
SELECT STREAM * FROM "Buses_Schema"."buses_json_parsed" where "lat" > 0 AND "lon" > 0
Next, we want to transform the column "speed" so that instead of listing numerical speeds, it categorizes each bus as 'speeding' or 'normal'.
Create a view called speed_categorized and enter the following text into the SQL window:
SELECT STREAM "id",(CASE WHEN "speed" >= 45 THEN 'speeding' ELSE 'normal' END) AS "speed","driver_no","prescribed","lat","lon","bearing"
FROM "Buses_Schema"."buses_filtered_lat_lon"
You can test this view by entering the following query:
select stream * from "speed_categorized"
Skip down to learn how to use s-Dashboard to show this data in a Dashboard.
SQLstream s-Server lets you query, analyze, and route streaming data by using SQL (Structured Query Language), the same language that developers have been using to query traditional databases for decades.
This tutorial describes the steps for setting up a log file source and analyzing it, by coding blocks of SQL. You can execute SQL directly through a command-line program called SQLline. SQLline executes SQL code against SQLstream s-Server. A version of SQLline ships with s-Server, and comes pre-configured to connect with s-Server.
The tutorial includes the following steps:
The type of SQL that runs in s-Server is called streaming SQL. This SQL is based on the SQL standard, with some modifications. s-Server's streaming SQL is described in the SQLstream Streaming SQL Reference Guide.
SQLstream’s main enhancement to the SQL standard concerns the STREAM object. The process of creating streams in streaming SQL is similar to the process of creating tables in a database system like PostgreSQL or Oracle. Like database tables, streams have columns with column types. Once you create a stream, you can query it with a SELECT statement, or insert into it using an INSERT statement.
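As a sketch of that workflow (the schema and column names below are hypothetical, not part of this tutorial's pipeline), creating, querying, and writing to a native stream looks like this:

```sql
-- Hypothetical native stream; names are illustrative only.
CREATE OR REPLACE SCHEMA "Demo_Schema";
CREATE OR REPLACE STREAM "Demo_Schema"."readings" (
    "sensor_id" BIGINT,
    "reading" DOUBLE
);

-- A reader runs a continuous query (note the STREAM keyword):
SELECT STREAM * FROM "Demo_Schema"."readings";

-- A writer, in another session, inserts rows that flow to active readers:
INSERT INTO "Demo_Schema"."readings" VALUES (42, 98.6);
```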
As well as native streams, SQLstream supports foreign streams which allow you to read data from a remote source, or write data to a remote sink.
A block of SQL that creates a foreign stream looks something like the following, which creates a source foreign stream.
CREATE OR REPLACE SCHEMA "Buses_Schema";
SET SCHEMA '"Buses_Schema"';
CREATE OR REPLACE FOREIGN STREAM "buses"
(
"id" BIGINT,
"reported_at" VARCHAR(32),
"speed" INTEGER,
"driver_no" BIGINT,
"prescribed" BOOLEAN,
"gps" VARCHAR(128),
"highway" VARCHAR(8)
)
SERVER FILE_SERVER
OPTIONS (
CHARACTER_ENCODING 'UTF-8',
DIRECTORY '/tmp',
FILENAME_PATTERN 'buses\.log',
PARSER 'XML',
PARSER_XML_ROW_TAGS '/Table1',
PARSER_XML_USE_ATTRIBUTES 'false'
)
DESCRIPTION 'reads streaming data from buses.log'
;
Note that you declare types for each column. Data types determine what kind of data a column can contain. For example, a VARCHAR() column contains a character string of variable length, and a DOUBLE column contains a 64-bit floating-point number.
For more information see CREATE FOREIGN STREAM.
To begin, you'll need to open SQLline.
To start SQLline, open a terminal and enter the following (the script path may vary by installation):
$SQLSTREAM_HOME/bin/sqllineClient
A terminal window opens. You can enter SQL at the command prompt.
There are two main ways to run SQL in SQLline: by typing SQL directly into the console, or by running a SQL script from a file.
In the following tutorial, you will type SQL directly into the SQLline console.
In the following exercise, we will capture data from a streaming log file and use this data to identify buses that exceed the speed limit. Because this data is streaming, speeders will be identified in real time. We will derive this data from a simulated streaming log file located on your local machine. In this case, the log file records messages sent out by buses in Sydney, Australia.
To start the sample streaming data source from an installation running the Guavus SQLstream Cover Page, such as a Docker container, open the cover page, scroll down to Sydney Buses is Running, and click the On/Off switch to start streaming data. Data streams to /tmp/buses.log in XML format.
This file features data in the following categories:
Column | Type | Definition |
---|---|---|
id | DOUBLE | Identification number for the bus. |
reported_at | TIMESTAMP | Time location was reported. |
shift_no | DOUBLE | Shift number for the bus's driver. |
driver_no | DOUBLE | Driver identification number. |
prescribed | VARCHAR(4096) | The direction on the motorway (into Sydney or out of Sydney). |
highway | DOUBLE | Highway number if available. |
gps | VARCHAR | GPS information with latitude, longitude, and bearing in JSON format. |
If you are running s-Server on a local machine, you can start the script by opening a terminal and entering the following:
/opt/sqlstream/<VERSION>/s-Server/demo/data/buses/StreamXmlBusData.sh
The most important concept in streaming SQL is the stream. A stream is a continually updating data object: like a table with no end, it begins when the stream is established. As s-Server takes in data, new rows are continually added to the stream.
When you run a SELECT query on a conventional database table, the query iterates through the result set until there are no more rows to return. But when you run a SELECT query on an s-Server stream, there's no end of rows. Instead, the "get next row" call continues to run in s-Server until the statement is closed by the client application. In complex systems, this open-ended SELECT needs to be managed in order to maximize performance. Here, though, we can just let it run.
Streams can be written to by multiple writers and read from by multiple readers.
The following code block creates a stream that references a prebuilt server object, FILE_SERVER. This object contains the basic information that s-Server needs to connect to a data source, in this case a log file on a local machine. The stream is a virtual object with the columns listed below. When you query the stream, s-Server parses each record of the log file into these columns.
Enter the following code into the SQLline prompt. This code first creates and sets a schema for the foreign stream, sets up columns for the stream, and defines options for connecting to the file.
All streams must be created within schemas. A schema lets you logically group s-Server objects, such as streams, tables, views, and pumps.
CREATE OR REPLACE SCHEMA "Buses_Schema";
SET SCHEMA '"Buses_Schema"';
CREATE OR REPLACE FOREIGN STREAM "buses"
(
"id" BIGINT, --Identification number for the bus.
"reported_at" VARCHAR(32), --Time location was reported.
"speed" INTEGER, --Reported speed of bus.
"driver_no" BIGINT, --Driver identification for reported bus
"prescribed" BOOLEAN, --The direction on the motorway,
--(into Sydney or out of Sydney).
"gps" VARCHAR(128), --gps information in JSON: lat, lon, bearing
"highway" VARCHAR(8) --Highway number, if available.
)
SERVER FILE_SERVER
OPTIONS (
CHARACTER_ENCODING 'UTF-8',
DIRECTORY '/tmp',
FILENAME_PATTERN 'buses\.log',
PARSER 'XML',
PARSER_XML_ROW_TAGS '/Table1',
PARSER_XML_USE_ATTRIBUTES 'false'
)
DESCRIPTION 'reads streaming data from buses.log'
;
Let's look at the options defined above. Some of these options tell s-Server how to connect to the log file, including the directory, pattern for the file, and data type for the file (indicated by parser, which is XML in this case).
name | Value | Explanation |
---|---|---|
CHARACTER_ENCODING | UTF-8 | Character set for data |
DIRECTORY | /tmp | Tells s-Server where the file resides. |
FILENAME_PATTERN | buses\.log | Tells s-Server what to look for in the directory in which the file resides. |
PARSER | XML | Tells s-Server "parse this data as XML" |
You can see a full list of options for reading from the file system below.
We need two additional options to tell s-Server how to read the XML data in the streaming source. This data looks as follows:
<Table1><id>50116195532</id><reported_at>2014-07-23T20:51:58.547</reported_at><speed>0</speed><driver_no>160019</driver_no><prescribed>false</prescribed><highway/><gps>{ "lat": -33.530784606933594, "lon": 150.87783813476562, "bearing": 0}</gps></Table1>
The top-level XML tag that we want to parse is called Table1. You indicate the top-level XML tag with an option called PARSER_XML_ROW_TAGS.
To do so, we use two additional options:
name | Value | Explanation |
---|---|---|
PARSER_XML_ROW_TAGS | /Table1 | Tells s-Server "parse XML tags under /Table1" |
PARSER_XML_USE_ATTRIBUTES | false | Tells s-Server "ignore XML tag attributes" |
You can see a full list of options for parsing XML below.
Once the stream is created, you can query it as you would a table.
To do so, open another SQLline terminal and enter the following code:
SELECT STREAM * from "buses";
Note that the query uses the STREAM keyword. You need to use this keyword whenever you query a stream. We will use the STREAM keyword throughout the rest of this tutorial.
Now that we've created a foreign stream, we want to perform a basic calculation on this stream. To do so, we'll create a new object called a view.
Views are reusable definitions of queries. Every view contains a SELECT statement that reads from one or more streams or tables. Views, then, work as proxies for streams. They provide a flexible, easy-to-modify way to work with streaming data. You can query views themselves, and a query on a view automatically retrieves data from the stream or streams queried by the view. As an s-Server developer, you will do much of the work of cleaning, analyzing, and routing data through views.
Because you can create views linked to one another with SELECT statements, you can use views to create pipelines, linked SQL objects that perform some analytic task. If you have used StreamLab, and viewed the SQL generated by pipeline guides, you may have noticed that pipeline guides consist mostly of interlinked views. It's also the case that multiple views can query the same stream or view. This allows you to fork pipelines.
While views contain queries, data will not flow in s-Server until you run an active SELECT on either the view itself or on a view that SELECTs this view downstream in the pipeline.
Views take the general form CREATE VIEW <name> AS <SELECT statement>.
The SQL statement below, for example, does a simple filtering of the "buses" stream that we created above, selecting only the columns id and speed.
CREATE OR REPLACE VIEW "Buses_Schema"."View" AS
--the SELECT statement below defines the view.
SELECT STREAM "id","speed"
FROM "Buses_Schema"."buses" AS "input";
When you use views, you can take advantage of all the power of a SELECT statement. So, for example, if you want to select only rows that have speeds greater than 0, you can use the WHERE clause.
The WHERE clause extracts records that meet a specified condition. The condition can be a numeric or string comparison, as in the following examples:
SELECT STREAM * FROM Sales WHERE Employee='Bob';
SELECT STREAM * FROM Sales WHERE Customer_Type=1;
SELECT STREAM * FROM Sales WHERE Customer_Type<1;
Here, we'll apply the WHERE clause to the "speed" column, in order to limit results to those buses going faster than 0 km/hour. That way, we filter out buses that are stopped, so that they do not skew calculations such as average speed.
Enter the following in SQLline:
CREATE VIEW "moving_buses" AS
SELECT STREAM * FROM "Buses_Schema"."buses" where "speed" > 0;
This tells s-Server, "get me all of the columns in a stream called buses in a schema called Buses_Schema where the column speed is greater than 0".
In working with streaming data, you need to be aware of what time data arrives. Because streams continually update, and may update from multiple sources, time is an important concept in Streaming SQL. Time in streams is monotonic, meaning it always goes forward.
This monotonically increasing time is tracked as a column value called ROWTIME. By default, ROWTIME is the time a row enters the stream, though you can also configure the system to assign this value to a time generated by the data source.
Promoting a timestamp from the data itself helps you produce meaningful analysis of that data. Every streaming row carries a time value called a rowtime, implemented as a column in every row. The rowtime for newly arriving rows cannot be less than the rowtime for previously received rows (though it can be equal to the rowtime of the current row).
Rowtimes can be implicit or explicit.
Implicit rowtimes are established by the "arrival time" of the row: the time that s-Server receives the row. Even though there is no explicit mention of ROWTIME, it is nevertheless part of that row.
Explicit rowtimes are provided by a timestamp from the data itself.
Note: When setting an explicit ROWTIME, TIMESTAMP must be monotonically increasing from the previous TIMESTAMP. If your rowtimes are out of order, you can sort the data using a time sorting execution object. For more information on t-sorting stream input, see the subtopic T-sorting Stream Input in the topic ORDER BY clause.
In either case, the ROWTIME of the arriving row establishes the current time of the stream, known as the stream clock.
Here, we can create another VIEW that promotes reported_at to ROWTIME. First, we cast reported_at as a TIMESTAMP. The CAST function converts one value expression or data type to another value expression or data type.
Enter the following in SQLline:
CREATE VIEW "buses_rowtime_promoted" AS
SELECT STREAM
--casts "reported_at" as TIMESTAMP, promotes "reported_at" as ROWTIME
CAST("reported_at" AS TIMESTAMP) AS ROWTIME,
--other columns for view
"id","speed","driver_no","prescribed","gps","highway"
--this is the view we created in the previous step
FROM "Buses_Schema"."moving_buses";
You can also use views with windows. By including a WINDOW clause in a SELECT query, you can specify that the query applies to rows in a stream partitioned by the time range interval or a number of rows. This allows you to perform calculations for each output row, such as AVG or MAX. Calculations are performed over the window you specify, such as "1 MINUTE" or "20 ROWS".
In a conventional database, you would not need a window to perform such calculations, because the query would know what rows are available in advance. But because of the nature of streaming data, you have to identify a subset of rows in the stream on which to perform calculations.
Here, we'll create a view that averages bus speed over 1 minute.
CREATE OR REPLACE VIEW "avg_buses_speed" AS
SELECT STREAM AVG("speed") OVER "aWindow" AS "avg_speed",*
--note that we use the view that we defined above
FROM "Buses_Schema"."buses_rowtime_promoted"
--"moving_buses", upstream, already filters out buses that are not moving
WINDOW "aWindow" AS (RANGE INTERVAL '1' MINUTE PRECEDING);
You can see the new column by running the following query:
select stream "avg_speed" from "avg_buses_speed";
You should see a column called avg_speed that contains the average speed of buses over the past minute.
As you viewed data flowing, you probably noticed that the column gps contains unparsed JSON. While you can often parse data as it enters s-Server, there are times when you may need to parse data within a stream. This is known as mid-stream parsing.
To parse this data, we can use the Parser UDX, which applies one of s-Server's predefined parsers to a column of data (in this case, the gps column in buses_rowtime_promoted).
To call the Parser UDX, we need to create a function that invokes one of s-Server's predefined user-defined transforms, or UDXs. A user-defined transform returns a set of rows or a stream of rows. Its input arguments can be scalars or cursors. A cursor is an object representing a subquery, which the user-defined transform can use to read the subquery's results.
Enter the following in SQLline:
CREATE OR REPLACE FUNCTION "json_parser"
(
INPUT CURSOR,
COLUMNNAME VARCHAR(256),
PARSERCLASSNAME VARCHAR(256),
OPTIONS CURSOR)
RETURNS TABLE (
--INPUT is the original stream, entered as a cursor for the function
INPUT.*,
--these are the columns that you expect to see in the JSON
"lat" DOUBLE,
"lon" DOUBLE,
"bearing" INTEGER
)
SPECIFIC "json_parser"
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
NOT DETERMINISTIC
--this is the name of the class for the UDX
EXTERNAL NAME 'class com.sqlstream.aspen.namespace.common.ParserUdx.parseColumn';
Next, we write a view that references the json_parser function that we just defined.
The view below contains a SELECT statement. Some of the columns in the SELECT statement come directly from the stream buses_rowtime_promoted. Other columns, those prefixed with JSON, are the result of running the Parser UDX. The prefix "adorns" these columns, indicating that they result from changes made by the SELECT statement.
The SELECT statement passes input to the function that we defined above: a CURSOR, the column to be parsed, the type of parser, and options for the parser.
Enter the following in SQLline:
CREATE OR REPLACE VIEW "buses_json_parsed" AS
SELECT STREAM "id","speed","driver_no","prescribed",
--these columns use the alias "JSON", indicating that they are produced by the parser rather than taken directly from "buses_rowtime_promoted"
JSON."lat" AS "lat", JSON."lon" AS "lon", JSON."bearing" AS "bearing","highway"
FROM STREAM (
--this is the name of the function that we defined above
"json_parser"(
--this is the input for the function (known as INPUT in function definition).
CURSOR
(SELECT STREAM * FROM "buses_rowtime_promoted"),
--column from "buses_rowtime_promoted" to be parsed (known as COLUMNNAME in the function definition).
'gps',
--type for parser (known as PARSERCLASSNAME in function definition).
'JSON',
--these are options for the JSON parser (in function definition).
CURSOR(SELECT * FROM (values ('$')) AS OPTIONS(ROW_PATH))
))
AS JSON;
Next, we want to make sure that our values for lat and lon do not contain any zeros. To do so, we can create another view, along the following lines. Note that you can combine conditions in the WHERE clause with the AND keyword.
CREATE OR REPLACE VIEW "buses_filtered_lat_lon" AS
SELECT STREAM * FROM "Buses_Schema"."buses_json_parsed" where "lat" > 0 AND "lon" > 0;
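As with the views above, you can check the filter by querying the new view directly; this sketch assumes the schema and view names used so far:

```sql
--verify that no rows with a lat or lon of 0 pass through the filter
select stream "lat", "lon" from "Buses_Schema"."buses_filtered_lat_lon";
```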
Next, we want to set up the "speed" column so that instead of listing numerical speeds, it categorizes each bus as "speeding" or "normal".
Enter the following in SQLline:
CREATE OR REPLACE VIEW "speed_categorized" AS
SELECT STREAM "id",(CASE WHEN "speed" >= 45 THEN 'speeding' ELSE 'normal' END) AS "speed","driver_no","prescribed","lat","lon","bearing"
FROM "Buses_Schema"."buses_filtered_lat_lon";
You can test this view by entering the following query:
select stream * from "speed_categorized";
Now that we have promoted a timestamp from the data to ROWTIME, cleaned and prepared the data, and categorized the speed column into speeding and normal, we are ready to display data on the speeding buses in a dashboard.
You've now created a view with speeding buses and their latitude/longitude. Next, you can create a dashboard to display these buses on a map.
Dashboards are web pages that contain multiple panels, each of which can connect to a different stream, view, or table. Each panel contains a visualization: a simple table, points on a map, a line plot, a bar graph, an area map, and so on. All of these use column-row combinations to plot data, and panels can be changed in terms of both layout and data input. Dashboards are most useful for streaming data, because you can watch the data change in real time.
The dashboard below shows the bus data in tabular form and bus locations on a map of Sydney. This is the dashboard that we will create in the steps below. s-Dashboard's adjustable panels let you view several such visualizations at once, in layouts that you can rearrange.
In the following exercise, we'll create a map dashboard. This type of dashboard takes values for latitude (lat) and longitude (lon) as input, and uses these values to display locations on a map.
To do so:
Click the SQLstream logo in the upper right corner of the map panel and choose Input Preferences. In the window that opens, enter the following in the SELECT template field.
<%= select %> "speed" AS "key","id","lat","lon" FROM <%= from %>
Click Update.
Click the gear icon to the right of Geographical and 3-D Streams: Pan & Zoom Map to open panel preferences.
Make the following changes:
Click Update. You should see a map with red and green markers. Click one of the markers to see the id of the reported bus.