Quadratic Interpolation UDX

The use case handled by this UDX is as follows: Imagine that you have a stream of sensor readings being produced by a large number of devices in the wild. Each device has many sensors. Each device produces a row of sensor data at some regular cadence, say, one row per minute. Each device is identified by one or more key columns in the row. Due to flaky networks, from time to time there will be missing rows. We want to reconstruct that missing data.

The quadratic_interpolator() UDX takes the same arguments as the linear_interpolator() UDX and addresses the same use case. However, quadratic_interpolator() waits until it has three rows for a device key before interpolating missing rows for that device. Like the linear interpolator, quadratic_interpolator() manufactures a row for each missing sequence number between the next-to-last row and the last row received for the device. Here is the signature of the quadratic_interpolator() UDX:

create or replace function quadratic_interpolator
(
  inputStream cursor,
  keyColumnNames cursor,
  sequenceNumberColumnName varchar(128),
  valueColumnNames cursor
)
returns table
(
  inputStream.*
)
language java
parameter style system defined java
no sql
external name 'class com.sqlstream.aspen.syslib.InterpolatorUDX.quadraticInterpolator';

The arguments have the following meaning:

  • inputStream. This is an incoming stream of data generated by devices in the wild.

  • keyColumnNames. This is a VALUES clause. It lists the names of the key columns in the inputStream. The key columns form a unique identifier for the device.

  • sequenceNumberColumnName. This is the name of a BIGINT column in the inputStream. This column increases monotonically for each device. That is, it increments at the cadence at which the rows are generated.

  • valueColumnNames. This is another VALUES clause. It lists the names of the columns which hold the sensor readings.

The UDX produces an output stream which has the same shape as the input stream. When the UDX notices that there is a gap in the sequence numbers of a device’s rows, the UDX reconstructs the missing rows. For each missing sequence number, the UDX manufactures a row, using quadratic interpolation to fill in the sensor readings.
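To make the arithmetic concrete, here is a minimal Python sketch of quadratic (Lagrange) interpolation through three points. It is not the UDX's Java source: the function name quadratic_interpolate is hypothetical, and the assumption that the parabola is fitted through the three most recent rows received for a device is inferred from the example output in this section.

```python
def quadratic_interpolate(points, x):
    """Evaluate at x the parabola through three (sequence_number, value) points."""
    (x0, y0), (x1, y1), (x2, y2) = points
    # Lagrange basis polynomials: each is 1 at its own point and 0 at the other two.
    l0 = (x - x1) * (x - x2) / ((x0 - x1) * (x0 - x2))
    l1 = (x - x0) * (x - x2) / ((x1 - x0) * (x1 - x2))
    l2 = (x - x0) * (x - x1) / ((x2 - x0) * (x2 - x1))
    return y0 * l0 + y1 * l1 + y2 * l2


# Three temperature readings for one device, taken from the example data
# for device (100, 32) in this section.
received = [(24300481, 70.0), (24300484, 73.0), (24300488, 73.5)]

# Fill the gap between the next-to-last row and the last row.
filled = {seq: quadratic_interpolate(received, seq)
          for seq in range(24300485, 24300488)}
```

With these three points the sketch yields approximately 73.5, 73.75 and 73.75 for sequence numbers 24300485 through 24300487, which agrees with the temperatures in the example output for that device.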

You can employ the same technique for generating missing sequence numbers as you do for the linear_interpolator() UDX.

Here is an example of the UDX in action, along with a pump which manufactures sequence numbers.

This is the stream of raw data coming from the devices:

create stream rawClimateReadings
(
  buildingNumber bigint,
  roomNumber bigint,
  temperature double,
  humidity double
);

This is the stream after adding sequence numbers to it:

create stream climateReadings
(
  buildingNumber bigint,
  roomNumber bigint,
  sequenceNumber bigint,
  temperature double,
  humidity double
);

This is the pump which adds sequence numbers to the raw data:

create pump climateReadingsPump stopped as
 insert into climateReadings
 SELECT STREAM
  s.buildingNumber, s.roomNumber,
  unix_timestamp(s.rowtime) / 60000,
  s.temperature, s.humidity
 from rawClimateReadings s;

alter pump climateReadingsPump start;
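The sequence number arithmetic in the pump can be checked by hand. The following Python sketch assumes that unix_timestamp() returns milliseconds since the epoch, that the division truncates to a whole number, and that the rowtimes are UTC. These assumptions are inferred from the sample data in this section rather than stated by it, and the helper name minute_sequence_number is hypothetical.

```python
from datetime import datetime, timezone


def minute_sequence_number(rowtime):
    """Map a UTC rowtime to the number of whole minutes since the Unix epoch."""
    epoch_millis = int(rowtime.timestamp() * 1000)
    # Integer division by 60000 ms discards the seconds within the minute.
    return epoch_millis // 60000


# Rowtime of the first sample row for device (100, 32).
first_row = datetime(2016, 3, 15, 8, 1, 10, tzinfo=timezone.utc)
seq = minute_sequence_number(first_row)  # 24300481
```

Because every row produced within the same minute maps to the same number, this scheme relies on each device emitting at most one row per minute.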

This is a query which uses the quadratic_interpolator to reconstruct missing rows:

SELECT STREAM *
from stream
(
  sys_boot.mgmt.quadratic_interpolator
  (
    cursor(SELECT STREAM * from climateReadings),
    cursor(values ('BUILDINGNUMBER'), ('ROOMNUMBER')),
    'SEQUENCENUMBER',
    cursor(values ('TEMPERATURE'), ('HUMIDITY'))
  )
);

The query produces the following output…

'BUILDINGNUMBER','ROOMNUMBER','SEQUENCENUMBER','TEMPERATURE','HUMIDITY'
------
'100','32','24300481','70.0','0.33'
'200','4','24300483','60.0','0.1'
'100','31','24300483','65.0','0.33'
'100','32','24300484','73.0','0.37'
'200','4','24300485','61.0','0.09'
'200','4','24300486','61.5','0.1'
'200','4','24300487','61.0','0.11'
'100','32','24300485','73.5','0.3742857142857142'
'100','32','24300486','73.75','0.3740476190476191'
'100','32','24300487','73.75','0.3692857142857143'
'100','32','24300488','73.5','0.36'
'200','4','24300488','61.1','0.09'
'200','4','24300489','61.1','0.09'
'100','32','24300489','73.77083333333334','0.3554166666666667'
'100','32','24300490','74.1','0.35'
'200','4','24300490','61.1','0.09'
'200','4','24300491','61.1','0.09'

…when the following device-generated data is inserted into the raw data stream…

insert into test.rawClimateReadings
(rowtime, buildingNumber, roomNumber, temperature, humidity)
values
(timestamp '2016-03-15 08:01:10', 100, 32, 70.0, 0.33)
,(timestamp '2016-03-15 08:03:05', 200, 4, 60.0, 0.1)
,(timestamp '2016-03-15 08:03:20', 100, 31, 65.0, 0.33)
,(timestamp '2016-03-15 08:04:11', 100, 32, 73.0, 0.37)
,(timestamp '2016-03-15 08:05:07', 200, 4, 61.0, 0.09)
,(timestamp '2016-03-15 08:06:06', 200, 4, 61.5, 0.1)
,(timestamp '2016-03-15 08:07:05', 200, 4, 61.0, 0.11)
,(timestamp '2016-03-15 08:08:10', 100, 32, 73.5, 0.36)
,(timestamp '2016-03-15 08:08:12', 200, 4, 61.1, 0.09)
,(timestamp '2016-03-15 08:09:12', 200, 4, 61.1, 0.09)
,(timestamp '2016-03-15 08:10:13', 100, 32, 74.1, 0.35)
,(timestamp '2016-03-15 08:11:12', 200, 4, 61.1, 0.09)
;

Note that the raw data is missing some rows for the device identified by (buildingNumber 100, roomNumber 32). The missing rows are for minutes 08:02-08:03, 08:05-08:07, and 08:09. When the 08:04 row arrives, quadratic_interpolator() has seen only two rows for that device, so it cannot reconstruct the rows for 08:02-08:03. By the time the later gaps appear it has at least three rows, so it fills them in.
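The gap analysis above can be reproduced with a short Python sketch. It is an illustration only, not the UDX's implementation: the function find_gaps is hypothetical, and it assumes a gap is fillable once at least three rows (including the row that closes the gap) have arrived for the device.

```python
def find_gaps(sequence_numbers):
    """Return (first_missing, last_missing, fillable) for each gap, in arrival order."""
    gaps = []
    for rows_seen, (prev, curr) in enumerate(
            zip(sequence_numbers, sequence_numbers[1:]), start=2):
        if curr - prev > 1:
            # rows_seen counts the rows received so far, including curr.
            gaps.append((prev + 1, curr - 1, rows_seen >= 3))
    return gaps


# Sequence numbers received for device (100, 32) in the sample data.
device_100_32 = [24300481, 24300484, 24300488, 24300490]
gaps = find_gaps(device_100_32)
```

For the sample data this reports the gap 24300482-24300483 (08:02-08:03) as not fillable, and the gaps 24300485-24300487 (08:05-08:07) and 24300489 (08:09) as fillable.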