Writing a Python UDX

An Example Python UDX

This section introduces a sample Python streaming UDX for SQLstream. Here, readfile.py is a Python module that reads lines from the files in a specified directory whose names match a given pattern. For each line it returns a timestamp, the filename, the line number and the line itself to SQLstream.

The Python module readfile.py

This UDX is unusual in that it emits streaming data but it does not have a streaming input. It is an example of using a Python UDX as a source plugin.

#!/usr/bin/python

# coding=utf-8 

from pyUdx import Connection 

import time 
import os 
import re 

connection=Connection() 
out = connection.getOutput() 
cursor = connection.getInput(0) 

fdir = connection.getParameter("dirname") 
fpatString = connection.getParameter("fpattern") 
fpat = re.compile(fpatString) 

filenames = [f for f in os.listdir(fdir) if re.match(fpat, f)] 
for fn in filenames: 

    # the with statement closes the file even if reading fails
    with open(fdir+'/'+fn, "r") as inFile: 
        lines = inFile.read().split('\n') 

    millis = int(round(time.time() * 1000)) 

    i = 0 
    for line in lines: 
        i += 1 
        out.executeUpdate(millis,fn,i,line) 

In this example,

  • pyUdx is the package that supports integration with SQLstream.
  • Connection is the class provided by pyUdx.
  • The cursor object created using connection.getInput() allows you to read streaming input data. You can also create a Cursor object using the cursor() method of the Connection object. In this example the cursor is never read, because the input stream is only a dummy.
  • out = connection.getOutput() provides an API to send output downstream.
  • Columns passed from input to output without change are handled by the UDX framework and don’t need to be copied explicitly.
  • The out.executeUpdate() statement takes a list of values and passes them to the output row. In this example, it returns the timestamp, filename, line number and a line.
    • Formally, this function is declared as executeUpdate(*args). This means you can either list out the arguments one by one (as in this example) or provide them as a tuple.
  • In this example, once all the matching files have been read, the connection is closed using the close() function; note that the listing above does not show this call. Closing passes an end-of-stream condition to the next stage of the streaming pipeline.
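
The file-scanning logic described above can be tried outside s-Server by factoring it into a generator. The following is a minimal sketch, not part of pyUdx: iter_file_rows and RecordingOutput are hypothetical names, and RecordingOutput only mimics the executeUpdate(*args) signature described above so that the calling convention can be shown.

```python
import os
import re
import tempfile
import time


def iter_file_rows(fdir, fpattern):
    """Yield (millis, filename, line_no, line) for each line of each matching file."""
    fpat = re.compile(fpattern)
    # sorted() gives a deterministic order; the original relies on os.listdir order
    for fn in sorted(f for f in os.listdir(fdir) if fpat.match(f)):
        millis = int(round(time.time() * 1000))
        with open(os.path.join(fdir, fn), "r") as in_file:
            lines = in_file.read().split("\n")
        for line_no, line in enumerate(lines, start=1):
            yield (millis, fn, line_no, line)


class RecordingOutput:
    """Hypothetical stand-in for the pyUdx output object; it only records rows."""

    def __init__(self):
        self.rows = []

    def executeUpdate(self, *args):
        self.rows.append(args)


def demo():
    """Write one small file, then emit its lines through the stand-in output."""
    out = RecordingOutput()
    with tempfile.TemporaryDirectory() as fdir:
        with open(os.path.join(fdir, "a.sql"), "w") as f:
            f.write("select 1;\nselect 2;")
        for row in iter_file_rows(fdir, r".*\.sql"):
            out.executeUpdate(*row)  # unpack the tuple into positional arguments
    return out.rows
```

With an ordinary Python *args function, a tuple has to be unpacked with *row as shown. Whether the real pyUdx output object also accepts a single tuple argument is not confirmed by this sketch.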

Declaring a SQL Function to call the Python UDX

Just like a Java UDX, for a Python UDX to be available in s-Server you need to declare it as a function in SQL. For more details, see the topic CREATE FUNCTION in the Streaming SQL Reference Guide. The following sample code readfile.sql defines the SQL function that is implemented by the above UDX.

In this readfile example we define a dummy stream to use as input. We are not actually expecting any data on this stream, but it is a requirement for any Python UDX function to have an input as well as an output.

create or replace schema test; 

set schema 'test'; 
set path 'test'; 

create or replace stream dummy (dummy int); 
drop function readfile; 

create or replace function readfile(C cursor, "dirname" varchar(250), "fpattern" varchar(250)) 
returns table 
( rowtime timestamp not null,
  fname varchar(100),
  line_no int,
  line varchar(100) 
) 
language EXTERNAL 
no sql 
no state 
external name '/usr/bin/python3 /home/sqlstream/pyread/readfile.py'; 

In this declaration,

  • The readfile() function takes the directory name and filename pattern as parameters and returns a table of rows, each holding the timestamp, filename, line number and line.
  • /usr/bin/python3 identifies which Python executable to use. SQLstream containers and VMs come with python3 pre-installed.
  • /home/sqlstream/pyread/readfile.py gives the full path of the Python module.

Calling the UDX

You can call the function like this:

set schema 'test';
set path 'test'; 
select stream * 
from stream(
    readfile
        ( cursor(select stream * from dummy)  -- the unused input stream
        , '/home/sqlstream/pythonudx'         -- the directory to search
        , '.*\.sql'                           -- the regex file matching pattern (dot escaped)
        ));                          
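
The file pattern is applied by readfile.py with Python's re.match, which anchors only at the start of the name. The short sketch below shows what that means for patterns of this kind; the filenames and the matching helper are made up for illustration.

```python
import re

# hypothetical filenames, for illustration only
names = ["readfile.sql", "notes_sql", "readfile.sql.bak"]


def matching(pattern, candidates):
    """Return the candidates that re.match accepts, as readfile.py does."""
    fpat = re.compile(pattern)
    return [n for n in candidates if fpat.match(n)]


unescaped = matching(".*.sql", names)     # an unescaped '.' matches any character
escaped = matching(r".*\.sql", names)     # requires a literal dot before 'sql'
anchored = matching(r".*\.sql$", names)   # also requires the name to end in '.sql'
```

Only the anchored pattern excludes names such as readfile.sql.bak, because re.match does not require the whole string to match.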

Example UDX - concatenate columns together

makestring.py

Here is a Python module that extracts some columns from the input and emits a stringified concatenation of those columns. The mechanism of stringify is not important here.

#!/usr/bin/python
from pyUdx import getConnection
import numpy as np
from testHelper import stringify
udx = getConnection()

cursor = udx.getInput(0)

rowSlice = cursor.bindToArray(udx.getParameter("WHICH"))
out = udx.getOutput()
while cursor.next():
    out.executeUpdate(stringify(rowSlice))

makestring.sql

The corresponding SQL function can be defined with a cursor and a signature:

create or replace function makestring(C cursor, WHICH select from c)
returns table
    (C.* passthrough
    ,outstring varchar(1024)
    )
language EXTERNAL 
no sql 
no state 
external name '/usr/bin/python3 /home/sqlstream/makestring.py'; 

In this example the cursor parameter is C and the signature is WHICH. Each output row consists of all the columns of C, which are passed through unchanged (the passthrough modifier), plus the calculated column outstring.

The function can be called, passing a kind of row type row(f1,f2) as follows:

select * from stream(makestring(cursor(select * from someSourceString), row(f1,f2))); 

This tells the UDX framework which input columns (f1 and f2) will be concatenated together; and within the Python module, the rowtype can be used in this way:

rowSlice = cursor.bindToArray(udx.getParameter("WHICH")) 

This binding happens once, at the beginning of the UDX. On each iteration of the while cursor.next() loop, the input values for columns f1 and f2 are copied into rowSlice. Then out.executeUpdate(stringify(rowSlice)) passes those values to the stringify function, and the string result is returned in the outstring column.
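
The stringify helper is left unspecified here. Purely for illustration, a stand-in could look like the sketch below. This is an assumption, not the testHelper implementation, and it assumes that the bound row slice can be iterated to yield the current column values.

```python
def stringify(row_slice, sep=","):
    """Hypothetical stand-in: join the current column values into one string."""
    # None (a SQL NULL, by assumption) is rendered as an empty string
    return sep.join("" if v is None else str(v) for v in row_slice)


example = stringify(["abc", 42, None])
```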

Cursor Function signature within a UDX

If the names of columns to be read by the Python UDX are known / fixed, they can be coded into the UDX itself and don't need to be specified in the SQL function definition, or in the call to the function.

columnList = [ 'col1', 'col2', 'col3']
rowSliceGetter = cursor.bindToArray(tuple(columnList)) 

Data Type Mapping

These are the mappings between SQL and Python types supported by the SQLstream Python UDX framework.

Note that this table only applies to data that actually needs to be handled by the Python UDX function. That includes:

  • input data derived from getParameter(),
  • input data derived from the cursor row (via cursor.bind() or cursor.bindToArray()),
  • data that is output by executeUpdate().

Any column values that are simply passed through without change are never handed from SQLstream to Python, so they are never converted between SQL and Python types.

SQL Data Type | Python Type | Notes
BIGINT        | int         |
BOOLEAN       | bool        |
BINARY        | bytes       |
CHAR          | str         |
DATE          | N/A         | You may CAST(date_column AS TIMESTAMP) and use the timestamp (as an int) in Python
DECIMAL       | int         | DECIMAL with scale 0 can safely be treated as int. A DECIMAL with a fractional part is treated as implicit fixed point, so a DECIMAL(5,3) value 5.230 is mapped to the integer 5230, and a DECIMAL(6,4) value 5.2300 is mapped to 52300. Take the scale of each column into account when working with these values.
DOUBLE        | float       |
INTEGER       | int         |
INTERVAL      | N/A         | SQLstream does not support INTERVAL as a column value
NUMERIC       | N/A         | You may CAST(numeric_column AS DECIMAL(p,s)) and then treat as int in Python (taking scale into account as for DECIMAL above)
REAL          | N/A         | You may CAST(real_column AS FLOAT)
SMALLINT      | int         |
TIME          | N/A         |
TIMESTAMP     | int         | The int value contains the Unix epoch in milliseconds
TINYINT       | int         |
VARBINARY     | bytes       | Not currently supported for output
VARCHAR       | str         |

When reading from a table or stream, any unsupported input types are represented by the (string) value 'unhandled!!!'.
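
Because DECIMAL and TIMESTAMP values arrive as plain ints, a UDX usually needs small conversion helpers. The following is a minimal sketch using only the Python standard library; the function names are hypothetical, and interpreting the epoch value as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal


def decimal_from_scaled(value, scale):
    """Convert the scaled int received for a DECIMAL(p, scale) column to a Decimal."""
    return Decimal(value).scaleb(-scale)


def scaled_from_decimal(value, scale):
    """Convert a Decimal back to the scaled int for a DECIMAL(p, scale) column."""
    return int(value.scaleb(scale).to_integral_value())


def datetime_from_millis(millis):
    """Convert a TIMESTAMP value (Unix epoch milliseconds) to a UTC datetime."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(milliseconds=millis)


# DECIMAL(5,3): the int 5230 stands for 5.230
price = decimal_from_scaled(5230, 3)
# DECIMAL(6,4): the same number arrives as 52300
same_price = decimal_from_scaled(52300, 4)
```

The scale is not carried with the int, so it has to be known from the SQL column definition.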

Troubleshooting a Python UDX

If you find any problems with a Python UDX, your first ports of call for more information are:

  • the s-Server trace file - usually in /var/log/sqlstream/Trace.log.0
  • the s-Server console output. If s-Server is being run as a service this may appear in the syslog; if the server is being run as a container it may appear in the docker (or Kubernetes) log.

Any errors reported to the trace file should also be reported into the ALL_TRACE global error stream and SAVED_TRACE error table.

Adding Trace Messages in a Python UDX

You can add trace messages into your Python UDX using the traceXXX(message) functions of the Connection object. The supported trace levels are as for Java logging:

  • traceSevere(message)
  • traceWarning(message)
  • traceInfo(message)
  • traceConfig(message)
  • traceFine(message)
  • traceFiner(message)
  • traceFinest(message)

You can get the current trace level using getTraceLevel(), and you may provide the trace level as the first parameter to the trace(level, message) function:

  traceLevel = connection.getTraceLevel()
  
  connection.trace(pyUdx.TRACE_INFO, "info level testMessage tracelevel=%d" % traceLevel)

The trace level can be any of:

  pyUdx.TRACE_OFF
  pyUdx.TRACE_SEVERE
  pyUdx.TRACE_WARNING
  pyUdx.TRACE_INFO
  pyUdx.TRACE_CONFIG
  pyUdx.TRACE_FINE
  pyUdx.TRACE_FINER
  pyUdx.TRACE_FINEST

Here is a complete Python UDX movefile.py with tracing examples:

#!/usr/bin/python
#  Takes files from one directory and moves them to another (if the directory names are different)
#  Filename, input directory, output directory must be passed in that order as a row(fn, in, out)
# passes data through

from pyUdx import Connection
import shutil
import os

connection=Connection()
out = connection.getOutput()
cursor = connection.getInput(0)

rowSlice = cursor.bind(connection.getParameter("WHICH"))

while cursor.next():

    (fn, inDir, outDir) = rowSlice.value()

    msg = None
    success = False

    try:

        # some basic checks

        if not os.path.exists(inDir):
            msg = "Error: input directory '%s' does not exist" % inDir
        elif not os.path.exists(outDir):
            msg = "Error: output directory '%s' does not exist" % outDir

        if msg is None:
            msg = shutil.move(inDir + '/' + fn, outDir+'/'+fn)
            success = True
            connection.traceInfo("Moved %s from %s to %s" % (fn, inDir, outDir))

    except Exception as e:
        # move failed - why?
        msg = str(e)
        connection.traceSevere("Moving %s from %s to %s - %s" % (fn, inDir, outDir, msg))

    out.executeUpdate(success, msg)
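
The move logic in movefile.py can be checked without s-Server by factoring it into a plain function. This is a sketch under stated assumptions rather than part of the UDX framework: move_file and demo are hypothetical names, and the tracing calls are left out because they need a live Connection.

```python
import os
import shutil
import tempfile


def move_file(fn, in_dir, out_dir):
    """Move in_dir/fn to out_dir/fn and return (success, message)."""
    if not os.path.exists(in_dir):
        return (False, "Error: input directory '%s' does not exist" % in_dir)
    if not os.path.exists(out_dir):
        return (False, "Error: output directory '%s' does not exist" % out_dir)
    try:
        # shutil.move returns the destination path
        dest = shutil.move(os.path.join(in_dir, fn), os.path.join(out_dir, fn))
        return (True, dest)
    except Exception as e:
        # move failed: report the reason as the message
        return (False, str(e))


def demo():
    """Exercise one successful move and one missing input directory."""
    with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
        with open(os.path.join(src, "a.txt"), "w") as f:
            f.write("hello")
        moved = move_file("a.txt", src, dst)
        exists_after = os.path.exists(os.path.join(dst, "a.txt"))
        missing = move_file("a.txt", os.path.join(src, "nope"), dst)
    return moved, exists_after, missing
```

Inside the cursor loop, the result could then be emitted with out.executeUpdate(*move_file(fn, inDir, outDir)), assuming the output columns are the success flag and the message as in the listing above.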