Using UDXs and UDFs for Analysis

You can use user-defined routines, including user-defined functions (UDFs), user-defined procedures, and user-defined transformations (UDXs), to extend Guavus SQLstream's analytic capabilities.

See the overview of built-in UDXs in the Analyzing Data Guide.

User-Defined Functions and Transforms

SQLstream lets programmers add functionality by defining several kinds of routine, described below: user-defined functions (UDFs), which can be written in SQL or in Java, and user-defined transforms (UDXs), which are Java routines that read rows from an input cursor and return a table of rows.

Using UDFs: the Geolocation Example

Creating user-defined functions can be a multi-step process, especially when they are implemented externally in Java.

To illustrate the process, let's create a few user-defined functions for working with Global Positioning System (GPS) coordinates in geolocation applications.

Creating UDFs in SQL

We’ll begin by creating user-defined functions in SQL that help us manipulate measurements in degrees.

First, create a new schema to handle our extensions and set the schema and path to point to it:

CREATE SCHEMA geolocate_sql;
SET SCHEMA 'geolocate_sql';
SET PATH 'geolocate_sql';

Next, create a function that extracts the fractional portion of a degree measurement.

CREATE FUNCTION get_fraction( degrees DOUBLE )
   RETURNS DOUBLE
   CONTAINS SQL
   RETURN degrees - FLOOR(degrees)
;

Next, use get_fraction to build a UDF that extracts minutes from a degree measurement:

CREATE FUNCTION get_minutes( degrees DOUBLE )
   RETURNS INTEGER
   CONTAINS SQL
   RETURN CAST( (get_fraction(degrees) * 60.0) as INTEGER )
;

With both of these functions in hand, you can build yet another UDF to extract seconds:

CREATE FUNCTION get_seconds( degrees DOUBLE )
   RETURNS INTEGER
   CONTAINS SQL
   RETURN CAST( (get_fraction(degrees) * 3600.0) - (get_minutes(degrees) * 60)
                AS INTEGER )
;

You can now create one more function that brings them all together to convert from degrees in decimal format to a fully specified string indicating degrees, minutes, and seconds:

CREATE FUNCTION to_dms( degrees DOUBLE )
   RETURNS VARCHAR(20)
   CONTAINS SQL
   RETURN
       ( CAST( CAST(degrees AS INTEGER) AS VARCHAR(20) ) ||
         'd ' ||
         CAST( get_minutes(degrees) AS VARCHAR(20) ) ||
         'm ' ||
         CAST( get_seconds(degrees) AS VARCHAR(20) ) ||
         's' )
;
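
As a quick sanity check, you can call to_dms directly from your SQL client while still in the geolocate_sql schema (the input value is arbitrary; the e0 suffix makes it a DOUBLE literal):

-- 45.25 decimal degrees is 45 degrees, 15 minutes, 0 seconds
VALUES (to_dms(45.25e0));
-- should return '45d 15m 0s'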

Creating UDFs in Java

In the previous section, you created four User-Defined Functions using SQL. Though SQL is perfectly sufficient for these functions, each of them could also have been created using Java. To create them in Java, you would take the following steps.

First, create a new schema to handle the extensions and set the schema and path to point to it:

CREATE SCHEMA geolocate_java;
SET SCHEMA 'geolocate_java';
SET PATH 'geolocate_java';

Second, create a Java class that implements the conversions. Create a Degree.java file containing the following code under a com/sqlstream/examples/geolocate directory:

package com.sqlstream.examples.geolocate;
public class Degree {
   public static double getFraction( double degrees ) {
       return degrees - Math.floor(degrees);
   }
   public static int getMinutes( double degrees ) {
       return (int)(getFraction(degrees) * 60.0);
   }
   public static int getSeconds( double degrees ) {
       return (int)((getFraction(degrees) * 3600.0)
                     - (getMinutes(degrees) * 60));
   }
   public static String toDegMinSec( double degrees ) {
       int degs = (int)degrees;
       double mins = getMinutes(degrees);
       double secs = getSeconds(degrees);
       return degs + "deg " + mins + "m " + secs + "s";
   }
}

Third, after compiling Degree.java, package the Java class up into a JAR file so that SQLstream s-Server can load it. Create the jar from the directory containing com/sqlstream/examples/geolocate:

 jar cf gps.jar com

Fourth, move the newly created gps.jar file to the machine on which SQLstream s-Server is running (in the example below, it is in /home/aspen/gps.jar), and use CREATE JAR to install the JAR into SQLstream s-Server:

CREATE OR REPLACE JAR GPSFuncsJavaLib
LIBRARY 'file:/home/aspen/gps.jar'
OPTIONS (0);

At this point, you can use CREATE FUNCTION to make these routines available from SQL (“EXTERNAL NAME” is explained in the topic CREATE FUNCTION in the Streaming SQL Reference Guide):

CREATE FUNCTION get_fraction( degrees DOUBLE )
   RETURNS DOUBLE
   LANGUAGE JAVA
   NO SQL
   EXTERNAL NAME 'GPSFuncsJavaLib:com.sqlstream.examples.geolocate.Degree.getFraction'
;

CREATE FUNCTION get_minutes( degrees DOUBLE )
   RETURNS INTEGER
   LANGUAGE JAVA
   NO SQL
   EXTERNAL NAME 'GPSFuncsJavaLib:com.sqlstream.examples.geolocate.Degree.getMinutes'
;

CREATE FUNCTION get_seconds( degrees DOUBLE )
   RETURNS INTEGER
   LANGUAGE JAVA
   NO SQL
   EXTERNAL NAME 'GPSFuncsJavaLib:com.sqlstream.examples.geolocate.Degree.getSeconds'
;

CREATE FUNCTION to_dms( degrees DOUBLE )
   RETURNS VARCHAR(20)
   LANGUAGE JAVA
   NO SQL
   EXTERNAL NAME 'GPSFuncsJavaLib:com.sqlstream.examples.geolocate.Degree.toDegMinSec'
;
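
As with the SQL versions, you can sanity-check the Java-backed functions directly from your SQL client (again, the input value is arbitrary and the e0 suffix makes it a DOUBLE literal):

VALUES (get_minutes(45.25e0), get_seconds(45.25e0));
-- should return 15 and 0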

The above steps duplicate, in Java, user-defined functions that could equally well have been created in SQL. Java functions, however, give you access to a much richer set of capabilities.

As an example, let's create a user-defined function that calculates the great-circle distance in miles between two geographic locations, each specified in degrees of latitude and longitude. We'll use a simple form of the great-circle distance equation, calculating the central angle using the spherical law of cosines and multiplying by the Earth's radius in miles. This requires the cosine, sine, and arccosine functions, which do not exist in SQL but do exist in the java.lang.Math package.

Create a Waypoint.java in the same directory as Degree.java containing the following code:

package com.sqlstream.examples.geolocate;
import java.lang.Math;
public class Waypoint {
   public static double getDistanceMiles( double lat1,
                                           double long1,
                                           double lat2,
                                           double long2 ) {
       // Use the great-circle distance formula (spherical law of cosines)
       // to calculate the distance in miles.
       double rlat1 = Math.toRadians(lat1);
       double rlat2 = Math.toRadians(lat2);
       double rlong1 = Math.toRadians(long1);
       double rlong2 = Math.toRadians(long2);
       double cangle = Math.acos( Math.sin(rlat1) * Math.sin(rlat2) +
                                  Math.cos(rlat1) * Math.cos(rlat2) *
                                  Math.cos(rlong2 - rlong1) );
       // Multiply the central angle by the Earth's mean radius in miles.
       return 3963.0 * cangle;
   }
}

Compile Waypoint.java, and re-create the gps.jar file so that it contains both the Degree and Waypoint classes:

 jar cf gps.jar com

Now drop and re-create the JAR object in s-Server so that it picks up the new gps.jar:

DROP JAR GPSFuncsJavaLib
 OPTIONS(0)
 CASCADE;
CREATE JAR GPSFuncsJavaLib
 LIBRARY 'file:/home/aspen/gps.jar'
 OPTIONS(0);

Finally, use CREATE FUNCTION to access the new Java extension:

CREATE FUNCTION find_distance
​            ( lat1 DOUBLE,  long1 DOUBLE, lat2 DOUBLE, long2 DOUBLE )
RETURNS DOUBLE
LANGUAGE JAVA
NO SQL
EXTERNAL NAME
 'GPSFuncsJavaLib:com.sqlstream.examples.geolocate.Waypoint.getDistanceMiles'
;

Note that dropping the JAR also requires dropping all of the functions that depend on it, which is what the CASCADE keyword accomplishes. The degree functions can be re-created with the same CREATE FUNCTION statements used the first time around; that step is omitted here for brevity.
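
As a quick check that the new Java routine is wired up correctly, you can call it with literal coordinates (the values below are arbitrary test inputs; the e0 suffix makes them DOUBLE literals):

VALUES (find_distance(37.0e0, -122.0e0, 34.0e0, -118.0e0));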

Calling Java UDFs from SQL UDFs

A UDF defined in SQL can call a UDF defined in Java:

CREATE FUNCTION to_miles( lat1 DOUBLE,  long1 DOUBLE, lat2 DOUBLE, long2 DOUBLE )
   RETURNS VARCHAR(30)
   CONTAINS SQL
   RETURN ( CAST( find_distance(lat1, long1, lat2, long2) AS VARCHAR(20) )
           || ' miles' )
;
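
In a streaming query, to_miles can then be applied row by row. Here is a sketch, assuming a hypothetical stream VehiclePositions with DOUBLE columns lat, lon, destLat, and destLon:

SELECT STREAM to_miles(lat, lon, destLat, destLon) AS distance_remaining
FROM VehiclePositions;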

Using UDXs: the Session Identification Example

One common task is to extract session information from a series of related events.

For example, consider a provider of mobile data and communications services. Users log on to the mobile network via a dedicated device, proceed to use one or more services available on that network (such as chatting with a friend using a messaging client or making a call with a VOIP application) and eventually log off. In some cases, services may be “nested” further; for example, a user might initiate and complete a file transfer from within the messaging client.

From the provider's point of view, these usages may be represented by a series of “open” and “close” events for each individual service, identified by the service name. Nesting is indicated by event ordering; if a second service is opened before the previously opened service is closed, the second service is considered a child of the first.

In addition, each event carries information about the source and target IP addresses, along with other information related to timing and bandwidth used. Timing for each event is recorded using the row's rowtime. For a “close” event, the amount of bandwidth used by the service is recorded in the byteCount column (which is null for “open” events).

The provider's goal is to use SQLstream to knit this event information together into session information, where each session is associated with a unique combination of IP addresses. The provider also wants to enhance the incoming data with the session path for each event, where the session path is the hierarchy of service sessions back to the root.

Java source code, SQL scripts, and sample data for this example may be found in examples/sessionId/…

Setting Schema and Path

For all of the example code below, we’ll need to define a schema and path:

CREATE OR REPLACE SCHEMA session_id;
SET SCHEMA 'session_id';
SET PATH 'session_id';

Getting Events

The first problem is getting the events into SQLstream so that each event is represented as a row in a stream. You can accomplish this either through the creation and use of a custom network probe adapter exposed via foreign streams, or with a native stream fed by explicit inserts from an external process.

Getting Events Via a Native Stream

A simple way to access events is to create a stream with the appropriate columns, then populate it via inserts from an external process. This is very handy when initially developing and testing a solution, since it reduces complexity and makes it easier to develop SQL in isolation from the larger system.

CREATE STREAM RawDataRecords (
   serviceName VARCHAR(10) NOT NULL,
   sourceIp VARCHAR(15) NOT NULL,
   destIp VARCHAR(15) NOT NULL,
   byteCount INTEGER,
   action VARCHAR(10)
);
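
For a quick test, you can feed the stream by hand from a SQL client. Here is a minimal sketch (the rows are made-up sample events); because streams do not store rows, start the query that reads from RawDataRecords before running the inserts:

INSERT INTO RawDataRecords (serviceName, sourceIp, destIp, byteCount, action)
VALUES ('chat', '10.0.0.1', '10.0.0.2', NULL, 'Open');

INSERT INTO RawDataRecords (serviceName, sourceIp, destIp, byteCount, action)
VALUES ('chat', '10.0.0.1', '10.0.0.2', 5120, 'Close');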

Getting Events Via A Custom Network Probe Adapter

In an actual production environment, it might be better to create a custom network probe adapter that extracts data from an event-producing server, then use that adapter to create a foreign stream. In some cases, there may be several such probes, in which case you might want to create multiple foreign streams and then combine them into a single view. See the topic CREATE VIEW in the Streaming SQL Reference Guide for more details.

Since any actual adapter would be highly application-specific, and the purpose of this section is to focus on SQL usage, you can forgo the actual creation of the adapter and simply assume it exists in /usr/lib/sqlstream/adapters/customProbeReader.jar:

Probe Reader Adapter

Set up the adapter to read from the probe(s):

CREATE FOREIGN DATA WRAPPER probeWrapper
LIBRARY '/usr/lib/sqlstream/adapters/customProbeReader.jar'
LANGUAGE JAVA;
CREATE SERVER ProbeServer
FOREIGN DATA WRAPPER probeWrapper;

Probe Input Stream

This is our assumed foreign input stream, created by an adapter running on the network probe:

CREATE FOREIGN STREAM RawDataRecords1
ON SERVER ProbeServer (
   serviceName VARCHAR(10) NOT NULL,
   sourceIp VARCHAR(15) NOT NULL,
   destIp VARCHAR(15) NOT NULL,
   byteCount INTEGER,
   action VARCHAR(10)
)
OPTIONS (host 'probe1.domain.com');
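
Once the foreign stream is defined and the adapter is running, you can watch the raw events coming off the probe directly (a simple inspection query):

SELECT STREAM * FROM RawDataRecords1;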

Working with Multiple Probe Streams

A similar foreign input stream, created by another instance of the adapter on a different probe:

CREATE FOREIGN STREAM RawDataRecords2
ON SERVER ProbeServer (
   serviceName VARCHAR(10) NOT NULL,
   sourceIp VARCHAR(15) NOT NULL,
   destIp VARCHAR(15) NOT NULL,
   byteCount INTEGER,
   action VARCHAR(10)
)
OPTIONS (host 'probe2.domain.com');

Combining Multiple Probe Streams Into A Single View

CREATE VIEW RawDataRecords AS
SELECT STREAM * FROM RawDataRecords1
UNION ALL
SELECT STREAM * FROM RawDataRecords2;

Normalizing Source and Destination IP Addresses

In the course of a given session, data may flow both ways between two servers (for example, the user in an instant messaging session may download a file from the other server). The provider wants to incorporate this information regardless of which host initiated the connection.

To make it easy to recognize IP address pairs, we’ll use user-defined functions to create a normalized view of the lesser and greater IP addresses.

Creating User-Defined Functions

CREATE FUNCTION lesser(s1 VARCHAR(15), s2 VARCHAR(15))
   RETURNS VARCHAR(15)
   CONTAINS SQL
   RETURN CASE WHEN s1 < s2 THEN s1 ELSE s2 END;
CREATE FUNCTION greater(s1 VARCHAR(15), s2 VARCHAR(15))
   RETURNS VARCHAR(15)
   CONTAINS SQL
   RETURN CASE WHEN s1 < s2 THEN s2 ELSE s1 END;
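
A quick check of the two helpers (the addresses are illustrative only); note that the comparison is a string comparison, so the ordering is lexicographic:

VALUES (lesser('192.168.1.7', '10.20.30.40'), greater('192.168.1.7', '10.20.30.40'));
-- should return '10.20.30.40' and '192.168.1.7'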

Creating A Normalized View

CREATE VIEW SymmetricDataRecords AS
SELECT STREAM
   serviceName,
   action,
   byteCount,
   sourceIp,
   destIp,
   lesser(sourceIp, destIp) AS lowerIp,
   greater(sourceIp, destIp) AS upperIp
FROM RawDataRecords;

Assigning Session IDs

Next, assign a unique session ID to each open/close sequence for a given service between a pair of hosts. You can do this by incrementing the session ID whenever you encounter an “open” event.

Note: You can also use SESSION ON with sliding windows to implement sessions. See the subtopic Implementing Sliding Windows Using SESSION ON in the Guavus s-Server Streaming SQL Reference Manual.

CREATE VIEW SessionedDataRecords AS
SELECT STREAM
   SUM(
      CASE action
      WHEN 'Open' THEN 1
      ELSE 0
      END) OVER session AS sessionId,
   *
FROM SymmetricDataRecords
WINDOW session AS (
   PARTITION BY serviceName, lowerIp, upperIp ROWS UNBOUNDED PRECEDING);

Note: Session numbering continues for as long as the server runs, but it starts over when the server does: if you stop and restart the server, the session count resets to 1.

Because this code uses UNBOUNDED PRECEDING, rows never drop out of the window; the window maintains a running total for each (serviceName, lowerIp, upperIp) triplet. As long as the number of distinct triplets is bounded, memory usage remains bounded as well.
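
To watch the session numbering as events arrive, you can select from the view directly (a simple inspection query):

SELECT STREAM sessionId, serviceName, action, lowerIp, upperIp
FROM SessionedDataRecords;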

Identifying the Session Hierarchy

Now that you have unique session IDs, the goal is to enhance each data record to identify not only the unique session, but also the parent session (if any) and the full parent-child hierarchical path back to the root ancestor. For example, a sessionPath [4, 10, 17] indicates that session 17 is a child session of 10, which is a child session of 4. A record will be emitted for all 3 sessions, so that the parent sessions' totals add up.

The Nester UDX

To add hierarchical session information to the incoming data stream, you create a user-defined transform (UDX) that tracks each incoming row, creates a map of the session hierarchy, and uses that to add a parent service name, parent session ID and the session path to each incoming row before writing the row out again. The assumptions this UDX makes are as follows:

  1. Each row contains columns indicating the session ID and the action. These columns are identified by the sessionLabel and actionLabel parameters, respectively.
  2. An action column value of openToken indicates the start of a session, and a value of closeToken indicates the end of the session.
  3. The action column may contain values other than openToken or closeToken, but such values have no effect on session definition. Any such rows will be output with the same parent session ID and session path as the preceding openToken in the same session.
  4. Each row containing openToken is associated with one and only one row containing closeToken for a given session ID.
  5. If a new session is opened before the previous one is closed, the previous session is the parent of the new one.
  6. Incoming rows may have an arbitrary number of other columns; these will be included verbatim in the corresponding output rows.

The Java source code and instructions for building the session nester UDX may be found in examples/sessionId/…. Once the UDX has been built, it will reside in examples/sessionId/lib/nester.jar.

Before the UDX can be used, it must be installed and registered (For more information, see the topic CREATE FUNCTION in the Streaming SQL Reference Guide):

CREATE OR REPLACE JAR "nesterJar"
LIBRARY 'file:examples/sessionId/lib/nester.jar'
OPTIONS (0);

Alternatively, you can install the JAR by calling the SQLJ.INSTALL_JAR procedure:

CALL SQLJ.INSTALL_JAR('file:examples/sessionId/lib/nester.jar', '"nesterJar"', 0);

Next, register the UDX with CREATE FUNCTION. On the Java side, the CURSOR parameter is passed to the UDX method as a java.sql.ResultSet, the VARCHAR parameters (actionLabel, sessionLabel, openToken, closeToken, and so on) arrive as String arguments, and output rows are written through a java.sql.PreparedStatement:

CREATE FUNCTION sessionNester (
   c CURSOR,
   actionLabel VARCHAR(128),
   sessionLabel VARCHAR(128),
   serviceLabel VARCHAR(128),
   partKey1 VARCHAR(15),
   partKey2 VARCHAR(15),
   openToken VARCHAR(128),
   closeToken VARCHAR(128))
RETURNS TABLE (
   rowtime TIMESTAMP,
   sessionId INTEGER,
   serviceName VARCHAR(10),
   action VARCHAR(10),
   byteCount INTEGER,
   sourceIp VARCHAR(15),
   destIp VARCHAR(15),
   lowerIp VARCHAR(15),
   upperIp VARCHAR(15),
   startTime TIMESTAMP,
   stopTime TIMESTAMP,
   parentServiceName VARCHAR(32),
   parentSessionId INTEGER,
   sessionPath VARCHAR(128))
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'session_id."nesterJar":com.sqlstream.sessionid.Nester.sessionNester';

Creating the Session Enhanced View

Now that the session nester UDX has been installed and registered, you can use it to create an enhanced view:

CREATE VIEW EnhancedDataRecords (
   sessionId,
   serviceName,
   action,
   byteCount,
   sourceIp,
   destIp,
   lowerIp,
   upperIp,
   startTime,
   stopTime,
   parentServiceName,
   parentSessionId,
   sessionPath) AS
SELECT STREAM *
FROM TABLE (sessionNester(
   CURSOR(SELECT STREAM * FROM SessionedDataRecords),
   'ACTION', 'SESSIONID', 'SERVICENAME', 'LOWERIP', 'UPPERIP', 'Open', 'Close'));
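
To inspect the hierarchy information the UDX adds, you can select the new columns from the view (a simple inspection query):

SELECT STREAM sessionId, parentSessionId, sessionPath, serviceName, action
FROM EnhancedDataRecords;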

Summary View

Here is an example of a summary view: hourly totals of bandwidth used (as measured by byteCount) for each session:

CREATE VIEW HourlySessionSummary (
   sessionId,
   serviceName,
   sourceIp,
   destIp,
   startOfHour,
   byteCount) AS
SELECT STREAM
   sessionId,
   serviceName,
   sourceIp,
   destIp,
   FLOOR(rowtime TO HOUR) AS startOfHour,
   SUM(byteCount) AS byteCount
FROM EnhancedDataRecords
GROUP BY
   sessionId,
   serviceName,
   sourceIp,
   destIp,
   FLOOR(rowtime TO HOUR);
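
You can then consume the summary like any other stream; for example, to watch only the heavier sessions (the threshold is arbitrary):

SELECT STREAM *
FROM HourlySessionSummary
WHERE byteCount > 1000000;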