Configuring External Stream Sources

Streaming data sources make use of s-Server's Extensible Common Data framework. This framework allows you to read and write rows of data in a range of formats over a range of input/output media, including the file system, HTTP, WebSockets, network sockets, AMQP, Kafka, Amazon Kinesis, MQTT, and Teradata Listener, each described below.

Using the File System as a Streaming Data Source

To read streaming data over the file system, you need two pieces of information:

  • The directory in which the file resides.
  • A pattern for the file's name. Here, you enter part of the file's name, such as output, csv, or log. No quotation marks are necessary.

Foreign Stream Options for Reading from the File System

| Option | Description |
| --- | --- |
| DIRECTORY | Directory in which the file or files you are reading reside. |
| FILENAME_PATTERN | Input only. Java regular expression defining which files to read. See https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html for more information on Java regular expressions. |
| FILE_COMPRESSION | Inflate (decompress) incoming files using the supplied codec. Currently only 'gzip' is supported (case is not significant). Any other value, or no value, means the file is not decompressed. Defaults to 'none'. |
| FILE_COMPRESSION_BUFFER | Size in bytes of the decompression buffer. Defaults to '65536'. |
| STATIC_FILES | Defaults to false. Setting this to true indicates that no files will be added or changed after the file reader is started; the file reader exits after the last file is read. This lets you use the file reader as a foreign table, which is finite (as opposed to a foreign stream, which is infinite and handles files that are continually written to). |
| REPEAT | Defaults to 0, meaning do not repeat. Can be a positive whole number, a negative number, or 'FOREVER'. For positive numbers, after processing all files that match FILENAME_PATTERN, start over again, repeating the specified number of times. If negative or 'FOREVER', keep reprocessing all files that match FILENAME_PATTERN forever. You must set STATIC_FILES to true in order to use this option. |
| OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set options using, for example, select DIRECTORY, FILENAME_PATTERN from TEST.file_options. For more details see the topic Using the Options Query Property. Often used to supply STARTING_POSITION or STARTING_TIME (see below). |
| SORT_BY_TIMESTAMP | 'true' or 'false'. If 'true', s-Server searches in the file for the first timestamp that matches STARTING_TIME. If 'false', it uses the STARTING_POSITION watermark. |
| STARTING_POSITION | The watermark, in the form 'filename:linenumber', from which to start reading. When used, this option is normally retrieved dynamically using an OPTIONS_QUERY. See Using Exactly Once with File as a source. |
| STARTING_TIME | The starting time, in the format defined by FILENAME_DATE_FORMAT. Only used if SORT_BY_TIMESTAMP is true. When set, the file timestamp is extracted from incoming file names using the FILENAME_PATTERN option; the timestamp is assumed to be extracted as group 1 of the pattern. A file is skipped if its timestamp is less than the watermark timestamp. See Using Exactly Once with File as a source. |
| FILENAME_DATE_FORMAT | The format of the date that is embedded in incoming filenames. Only used if SORT_BY_TIMESTAMP is true, to extract the time from the filename for matching against STARTING_TIME. |
| WATERMARKS | Name of the watermark table or view from which to retrieve the watermark for the source. |
| POSTPROCESS_COMMAND | Optional. Input only. A shell script present in the $SQLSTREAM_HOME/userbin directory. The shell script receives the watermark from the sink(s) of the pipeline through the WATERMARKS query, so it can, for example, archive all files up to that watermark. |
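
For example, a foreign stream that reads CSV files from a directory might look like the following sketch. The server name FILE_SERVER, the schema and column names, and the PARSER and CHARACTER_ENCODING options (which belong to the parser configuration, documented separately) are illustrative assumptions rather than fixed requirements:

```sql
-- Sketch: reading CSV files from a directory as a foreign stream.
-- Schema, column names, and parser options are illustrative.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."FileSource"
(
    "recorded_at" TIMESTAMP,
    "device_id" VARCHAR(32),
    "reading" DOUBLE
)
SERVER "FILE_SERVER"
OPTIONS (
    "PARSER" 'CSV',                         -- parser options are described elsewhere
    "CHARACTER_ENCODING" 'UTF-8',
    "DIRECTORY" '/var/tmp/incoming',        -- directory in which the files reside
    "FILENAME_PATTERN" 'readings-.*\.csv'   -- Java regular expression matching file names
);
```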

Using HTTP as a Streaming Data Source

To read from an HTTP source, typically you will need a URL and you will need to define a polling interval. For more information see Reading from HTTP.

Foreign Stream Options for Reading Over HTTP

| Option | Description |
| --- | --- |
| URL | URL of the HTTP feed. |
| HEADER_<name_of_header> | Tells the HTTP reader to add a request header called <name_of_header>, with the option value as the header value. |
| PARAM_<name_of_param> | Tells the HTTP reader to add a query parameter called <name_of_param> to the request, with the option value as the parameter value. |
| POLL_IN_MILLIS | How often to request new data, in milliseconds. |
| OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx and PARAM_xxx options using select HEADER_ABC, PARAM_XYZ from TEST.http_options. For more details see the topic Using the Options Query Property. |
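
As a hedged example, the options above might be combined as follows. The server name HTTP_SERVER, the parser option, and the header and parameter names are assumptions for illustration only:

```sql
-- Sketch: polling an HTTP endpoint every 5 seconds.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."HttpQuotes"
(
    "symbol" VARCHAR(8),
    "price" DOUBLE
)
SERVER "HTTP_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "URL" 'https://example.com/api/quotes',
    "HEADER_Authorization" 'Bearer my-api-token',  -- adds an Authorization header
    "PARAM_limit" '100',                           -- adds ?limit=100 to the request
    "POLL_IN_MILLIS" '5000'
);
```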

Using Websockets as a Streaming Data Source

To read from a Websockets source, you will need a URL (starting with the protocol "ws://" or "wss://"), and you may need to define some headers. For more information see Reading from Websockets.

Foreign Stream Options for Reading Over WebSockets

| Option | Description |
| --- | --- |
| URL | URL of the WebSocket. |
| HEADER_<name_of_header> | Tells the WebSocket reader to add a header called <name_of_header> to the request, with the option value as the header value. |
| OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx options using select HEADER_ABC, HEADER_XYZ from TEST.http_options. For more details see the topic Using the Options Query Property. |
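
A minimal sketch of a WebSocket source follows; the server name WEBSOCKET_SERVER, the parser option, and the header name are assumptions used for illustration:

```sql
-- Sketch: reading JSON messages from a WebSocket feed.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."WsTicks"
(
    "symbol" VARCHAR(8),
    "price" DOUBLE
)
SERVER "WEBSOCKET_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "URL" 'wss://example.com/ticks',
    "HEADER_Authorization" 'Bearer my-api-token'  -- adds an Authorization header to the request
);
```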

Using a Network Socket as a Streaming Data Source

To read line, CSV, XML, or JSON data over a network socket, you need to configure the socket connection. You may want to consult with whoever has set up the application with which StreamLab will connect over the socket.

Network sockets initiate a connection over TCP or UDP. Connections can be initiated by remote applications or by StreamLab itself. To tell StreamLab to listen to a remote host, use the Remote Host and Remote Port fields. Connections can optionally be made using IPV6.

| Name | Description |
| --- | --- |
| Remote Host | Hostname to send rows to or receive rows from. You can override this with 'LOCALHOST' to listen to only local connections, or with a specific IP address, such as 168.212.226.204. |
| Socket uses TCP? | Whether the socket uses TCP (True) or UDP (False). Defaults to false (UDP). |
| Skip Header | True or false; defaults to false. Specifies whether the parser should skip the first row of input files. Usually, you would do this if the first row of data contains column headers. |
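
If you define the equivalent source directly in s-Server rather than through the StreamLab fields above, the foreign stream might look roughly like the sketch below. The server name NET_SERVER and the option names (REMOTE_HOST, REMOTE_PORT, IS_TCP, SKIP_HEADER) are assumptions that mirror the fields above and may differ in your s-Server version:

```sql
-- Sketch: reading CSV rows from a remote TCP socket.
-- Option names here are assumptions; check the s-Server network adapter documentation.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."SocketSource"
(
    "line_no" INTEGER,
    "message" VARCHAR(1024)
)
SERVER "NET_SERVER"
OPTIONS (
    "PARSER" 'CSV',
    "REMOTE_HOST" '168.212.226.204',
    "REMOTE_PORT" '5601',
    "IS_TCP" 'true',        -- corresponds to "Socket uses TCP?"
    "SKIP_HEADER" 'false'   -- corresponds to "Skip Header"
);
```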

Using AMQP as a Streaming Data Source

To read from a streaming data source over AMQP, you need to configure the AMQP connection. AMQP stands for Advanced Message Queuing Protocol, and is an Open Standard for Messaging Middleware. For more information, see http://www.amqp.org/about/what. You may want to consult with whoever has set up AMQP in your environment.

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP versions up to 0.9.1 work and the way AMQP 1.0 works. Roughly, AMQP 1.0 only provides a network wire-level protocol for the exchange of messages, while versions up to 0.9.1 employed a system of exchanges, queues, and bindings to exchange messages. As a result of this change, you configure StreamLab for AMQP 1.0 differently than for versions up to 0.9.1.

Foreign Stream Options for Reading from AMQP

| Option | Description |
| --- | --- |
| CONNECTION_URL | Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port and so on. It will differ depending on whether you are using AMQP 1.0 or a legacy version; see the format examples below. |
| DESTINATION | The AMQP destination. This can be in the form <destination prefix>{PARTITION}. |
| PARTITION_EXPRESSION | You should only use this if DESTINATION includes "{PARTITION}". This should be a dk.brics regular expression, such as <0-3>. |
| PARSER_QUEUE_SIZE | Queue size for the parser. Reading only. Defaults to 2. In most cases, you will not want to change this number. |
| ACKNOWLEDGE_MODE | Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack. Roughly, AUTO asks the container on the AMQP server to acknowledge once the message is completely delivered, MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments. |
| OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set one or more options using select DESTINATION from TEST.amqp_options. For more details see the topic Using the Options Query Property. |

AMQP_LEGACY (AMQP protocol Version 0.9, e.g., RabbitMQ)

Format:

amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList> ]

Example:

amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'

AMQP10 (AMQP protocol Version 1.0) - connectionfactory.localhost:

Format:

amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>

Example:

amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default

Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.
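
Putting the options and the connection URL together, an AMQP 1.0 source might be declared along the following lines. The server name AMQP10_SERVER, the parser option, and the destination name are assumptions for illustration:

```sql
-- Sketch: reading JSON messages from an AMQP 1.0 destination.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."AmqpOrders"
(
    "order_id" INTEGER,
    "amount" DOUBLE
)
SERVER "AMQP10_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "CONNECTION_URL" 'amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default',
    "DESTINATION" 'orders',
    "ACKNOWLEDGE_MODE" 'AUTO'
);
```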

Using Kafka as a Streaming Data Source

To read line, CSV, XML, or JSON data over Kafka, you need to configure the connection to Kafka. Kafka is an open-source, real-time publish-subscribe messaging framework. See http://kafka.apache.org/ for more details. You may want to consult with whoever has set up the Kafka messaging system in your environment.

You can configure the Kafka source to distribute data across multiple native streams for load balancing. See Using the Distributed Source Option below for more details.

To connect with Kafka, you need two pieces of information:

  • The name and port of the Kafka broker (this defaults to localhost:9092, but the source will not work if a Kafka broker is not running at this location).
  • The Kafka topic name from which you are reading.

The other configuration details below help manage the starting point for reading Kafka topics as well as the amount of data fed to StreamLab.

Using Provenance Columns

In reading from Kafka, you can declare provenance columns. These return metadata for the Kafka topic from which you are reading.

These are as follows:

| Data Type | Name in s-Server 6.0.0 | Name in s-Server 6.0.1+ | Kafka version | Description |
| --- | --- | --- | --- | --- |
| VARCHAR(256) | KAFKA_TOPIC | SQLSTREAM_PROV_KAFKA_TOPIC | Kafka v0.10.2 or later | Returns the name of the Kafka topic. |
| INTEGER | KAFKA_PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v0.10.2 or later | Returns the value of the current Kafka partition. |
| INTEGER | PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v0.8.2 | Returns the value of the current Kafka partition. |
| BIGINT | KAFKA_OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v0.10.2 or later | Returns the value of the current Kafka offset. |
| BIGINT | OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v0.8.2 | Returns the value of the current Kafka offset. |
| TIMESTAMP | KAFKA_TIMESTAMP | SQLSTREAM_PROV_KAFKA_TIMESTAMP | Kafka v0.10.2 or later | Returns the current Kafka timestamp. |
| VARBINARY(256) | KAFKA_KEY | SQLSTREAM_PROV_KAFKA_KEY | Kafka v0.10.2 or later | Returns the key value for the message. |
| BINARY(32) | PAYLOAD_PREFIX | SQLSTREAM_PROV_KAFKA_PAYLOAD_PREFIX | Kafka v0.10.2 or later | Returns the initial bytes (prefix) of the message payload. |
| VARCHAR(1000) | N/A | SQLSTREAM_PROV_KAFKA_HEADERS | Kafka v0.10.2 or later | Returns any headers for the Kafka message as key-value pairs in serialized text format. See Reading and Parsing Headers. |
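
To use provenance columns, declare them alongside your ordinary data columns in the foreign stream definition; the reader populates them from Kafka metadata rather than from the parsed message. The sketch below assumes s-Server 6.0.1+ naming, the KAFKA10_SERVER plugin, and an illustrative JSON payload:

```sql
-- Sketch: a Kafka source that exposes topic, partition, offset, and timestamp metadata.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."KafkaWithProvenance"
(
    "SQLSTREAM_PROV_KAFKA_TOPIC" VARCHAR(256),
    "SQLSTREAM_PROV_KAFKA_PARTITION" INTEGER,
    "SQLSTREAM_PROV_KAFKA_OFFSET" BIGINT,
    "SQLSTREAM_PROV_KAFKA_TIMESTAMP" TIMESTAMP,
    "order_id" INTEGER,        -- ordinary columns parsed from the message payload
    "amount" DOUBLE
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "SEED_BROKERS" 'localhost:9092',
    "TOPIC" 'orders'
);
```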

Foreign Stream Options for Reading from Kafka

Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#consumerconfigs. Where appropriate, information in this table is drawn from this page.

Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.

Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':

  • 'Kafka10' - the option only applies to the Kafka10 plugin and KAFKA10_SERVER
  • 'Legacy' - the option only applies to the legacy adapter for Kafka up to 0.8, and KAFKA_SERVER
  • 'All' - the option applies to both plugin versions.
| Option | Adapter version | Description |
| --- | --- | --- |
| TOPIC | All | Required. Kafka topic. You can use a regular expression as a "topic wildcard". (This is not supported by legacy versions of the adapter.) |
| SEED_BROKERS | All | A comma-separated list of broker identifiers in the format <broker_hostname>:<port>. For the legacy adapter, this is a comma-separated list of broker hosts. Defaults to "localhost". |
| PARTITION | All | Partition number to read from. If you are reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions are read. You can specify a single partition number, or an inclusive range such as 1-3 (a range needs two partition numbers). Note: partition numbers are 0-based. |
| PORT | Legacy | Deprecated since 6.0.1. Port for the Kafka seed brokers. |
| STARTING_TIME | All | Either EARLIEST, LATEST, or a timestamp in the format 'yyyy-MM-dd HH:mm:ss.SSS', such as '2018-02-01 22:23:45.892'. Defaults to LATEST. When STARTING_TIME is a timestamp, the Kafka adapter "seeks" to offsets within all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME. Requires Kafka v0.10.2 or later. For the legacy Kafka adapter, options are EARLIEST or LATEST. |
| STARTING_OFFSET | All | Where to start reading from (default is -1, the oldest offset), as a long int representing the physical offset within a partition. |
| INDEX_TOPIC_NAME | Kafka10 | Specifies the name of the index topic to be used for mapping message offsets to timestamps. For more details, refer to Building and Using an Index Topic in the Reading from Kafka topic of the s-Server Integration Guide. The index topic may be created on a separate Kafka cluster. |
| INDEX_TOPIC_SEED_BROKERS | Kafka10 | The broker(s) where the index topic referenced by INDEX_TOPIC_NAME is managed. |
| MAX_POLL_RECORDS | Kafka10 | Maximum number of records to be polled (fetched) through the new KafkaConsumer.poll() API call. For best throughput during replay, set this such that you get a "page" full (1 MB is the default) of Kafka messages from each partition of the topic(s). It can be roughly calculated as (numPartitions * 1 MB) / typicalMessageSize. |
| PARTITION_OFFSET_QUERY | Kafka10 | A SQL query that fetches the starting offset, Kafka partition, and Kafka topic for all topic partitions. For both the legacy Kafka adapter and the Kafka10 adapter, you can use a query along the following lines: PARTITION_OFFSET_QUERY 'SELECT "KAFKA_PARTITION","KAFKA_TOPIC","KAFKA_OFFSET" FROM stored_offsets'. The partition should be of type INTEGER; the offset should be of type BIGINT. Any partition for which the query does not return anything uses STARTING_OFFSET or STARTING_TIME to determine where to start. |
| CLIENT_ID | All | For the Kafka10 adapter, it is vital to understand that this is the consumer group id (it is used to set the consumer group property "group.id"). It is also the client key for Yammer metrics. CLIENT_ID defaults to client{TOPIC}, or CLIENT{TOPIC}{PARTITION} if PARTITION is specified. CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting; CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true. See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics. |
| OPTIONS_QUERY | All | Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in select lastOffset as STARTING_OFFSET from TEST.committedOffset. |
| SCHEMA_ID | All | If set >= 0, the reader treats the first byte of data as a magic byte and the next 4 bytes as a schema ID, and discards any record where the first byte does not match the SCHEMA_ID. Default -1. |
| PARSER_PAYLOAD_OFFSET | Kafka10 | The number of bytes to skip at the start of the record to get to the value, in case the data is offset (by a schema ID or some other fixed-length data). Default 0. |
| "kafka.consumer.config" | Kafka10 | Lets you specify the name of a properties file that contains a set of Kafka consumer configurations. For example, you could use such a file to set all the properties needed for an SSL/SASL connection that the consumer will invoke. Kafka offers a wide range of config properties; for details, see the Kafka documentation at https://kafka.apache.org/0100/documentation.html#newconsumerconfigs. Note: in some cases, you can use these property files to override foreign stream options. For example, the setting for bootstrap.servers will override the foreign stream option SEED_BROKERS. This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime. |
| "isolation.level" | Kafka10 | Lets you specify whether s-Server should read all Kafka messages or only committed messages. Options are read_uncommitted or read_committed. This option lets you account for transactions, a Kafka 0.11.0 feature whereby applications can write to multiple topics and partitions atomically. To use atomic commitment, configure the Kafka adapter to read only committed data, that is, read_committed. |
| METRICS_PER_PARTITION | Legacy | True or False. If METRICS_PER_PARTITION is false, then CLIENT_ID is used as the client key for all Yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number is appended to each client_id (and finer-grained metrics are reported). |
| FETCH_SIZE | Legacy | Fetch size. Defaults to 1000000. |
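
The following sketch shows several of these options in combination. The schema, column names, parser option, broker list, and properties-file path are assumptions for illustration:

```sql
-- Sketch: a Kafka10 source reading the latest messages from a topic.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."KafkaOrders"
(
    "order_id" INTEGER,
    "amount" DOUBLE,
    "order_time" TIMESTAMP
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "SEED_BROKERS" 'kafka-1:9092,kafka-2:9092',
    "TOPIC" 'orders',
    "STARTING_TIME" 'LATEST',
    "CLIENT_ID" 'orders_consumer_group',                            -- consumer group id for the Kafka10 adapter
    "kafka.consumer.config" '/home/sqlstream/consumer.properties'   -- optional extra consumer properties
);
```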

Using the Distributed Source Option

StreamLab reads data from Kafka using a Kafka consumer. Kafka consumers subscribe to Kafka topics, receiving messages from these topics. Kafka consumers balance loads among partitions in the topic.

By default, StreamLab routes all data read from a Kafka source into one native stream. Behind the scenes, StreamLab may invoke multiple Kafka consumers, but all of these consumers feed into one native stream. This means that all data from the Kafka topic ends up in one s-Server stream, which StreamLab then uses as a source for a pipeline.

In many cases, this default option may work fine with your StreamLab application. In some cases, though, you may want to have multiple, duplicate pipelines running on one Kafka foreign stream. This option is called Distributed.

Once you distribute the Kafka topic across multiple native streams, you can choose to use duplicate pipelines. These run in parallel. You do not need to configure the additional pipelines; these are simply exact duplicates of the first. If you set up a sink for the pipeline, both pipelines will be routed to the same sink.

In a Pipeline Guide that uses a distributed Kafka source, an option to increase the number of duplicate pipelines appears to the left of the Change Guide Name or Source button in the upper right-hand corner of the Guide window.

Note on Stateful Properties and Distributed Sources

The distributed pipelines option is intended for advanced SQL developers who know how to run stateful aggregations in a distributed manner. A stateful distributed pipeline can use StreamLab templates that involve stateful operations (such as aggregation) as long as the application developer takes responsibility for ensuring the proper distribution of the aggregation state. You can achieve better performance and throughput by carefully distributing, for example, an aggregation state across duplicated pipelines.

The distributed option works seamlessly for stateless operations, such as CAST or Rename. If you are doing aggregations or other operations that involve making calculations across data from the entire Kafka topic, you will need to either:

  1. Route the results of the duplicated pipelines to a native stream sink and then create a new pipeline using this sink as a source. You can then perform aggregations in the new pipeline.
  2. Perform partial aggregations in the distributed pipelines, route the partially aggregated results into a native stream, and add roll-up aggregation steps in a new pipeline (see the sketch below).
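
The second approach can be sketched as follows. All names are hypothetical, and the exact windowing and pump wiring depend on your pipeline; the point is that each duplicated pipeline writes partial aggregates to one shared native stream, and a separate pipeline rolls them up:

```sql
-- Sketch: two-stage aggregation across duplicated pipelines (all names are illustrative).

-- A native stream that collects partial aggregates from every duplicated pipeline.
CREATE OR REPLACE STREAM "MySchema"."PartialCounts"
(
    "minute_start" TIMESTAMP,
    "partial_count" BIGINT,
    "partial_amount" DOUBLE
);

-- Stage 1 (runs in each duplicated pipeline): partial aggregation over one-minute windows.
INSERT INTO "MySchema"."PartialCounts"
SELECT STREAM
    FLOOR("KafkaOrders".ROWTIME TO MINUTE) AS "minute_start",
    COUNT(*) AS "partial_count",
    SUM("amount") AS "partial_amount"
FROM "MySchema"."KafkaOrders"
GROUP BY FLOOR("KafkaOrders".ROWTIME TO MINUTE);

-- Stage 2 (a new pipeline using the native stream as its source): roll the partials up into totals.
SELECT STREAM
    "minute_start",
    SUM("partial_count") AS "total_count",
    SUM("partial_amount") AS "total_amount"
FROM "MySchema"."PartialCounts"
GROUP BY FLOOR("PartialCounts".ROWTIME TO MINUTE), "minute_start";
```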

Using Amazon Kinesis as a Streaming Data Source

To read from a streaming data source over Amazon Kinesis, you need to configure the Amazon Kinesis connection.

You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the SQLstream Integration Guide.

| Option | Description |
| --- | --- |
| AWS Profile Name | Optional. Profile name to use within the credentials file. Defaults to default. |
| Initial Position | LATEST for latest or TRIM_HORIZON for earliest. Defaults to LATEST. |
| Socket Timeout | Defaults to -1. If set, overrides the Kinesis socket timeout. |
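
If you declare the Kinesis source directly in s-Server, the foreign stream might look roughly like the sketch below. The server name KINESIS_SERVER and the option names (KINESIS_STREAM_NAME, AWS_PROFILE_NAME, INITIAL_POSITION) are assumptions corresponding to the fields above; check the SQLstream Integration Guide for the exact names:

```sql
-- Sketch: reading from an Amazon Kinesis stream (option names are assumptions).
CREATE OR REPLACE FOREIGN STREAM "MySchema"."KinesisEvents"
(
    "event_id" VARCHAR(64),
    "payload" VARCHAR(1024)
)
SERVER "KINESIS_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "KINESIS_STREAM_NAME" 'my-kinesis-stream',
    "AWS_PROFILE_NAME" 'default',        -- profile within the AWS credentials file
    "INITIAL_POSITION" 'TRIM_HORIZON'    -- LATEST or TRIM_HORIZON
);
```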

Using MQTT as a Streaming Data Source

To read data from or write data to MQTT, you need to configure the connection to MQTT. StreamLab uses this information to implement an MQTT client that reads data into the foreign stream. The minimum required options are TOPIC and CONNECTION_URL.

Foreign Stream Options for Reading from MQTT

| Option | Description |
| --- | --- |
| CONNECTION_URL | URL of the MQTT broker, such as 'tcp://127.0.0.1:1883'. |
| TOPIC | MQTT topic. A UTF-8 string that the MQTT broker uses to filter messages for each connected client. |
| QOS | Defines the guarantee of delivery for a specific message: at most once (0), at least once (1), or exactly once (2). Defaults to 1. For more information on QoS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/ |
| CLIENT_ID | s-Server implements an MQTT client to connect to the MQTT server, and this setting provides an MQTT ClientID for that client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to a randomly generated value. |
| USERNAME | Optional. User name for the MQTT connection. Note: s-Server sends user names and passwords as plain text. |
| PASSWORD | Optional. Password for the MQTT connection. Note: s-Server sends user names and passwords as plain text. |
| KEEP_ALIVE_INTERVAL | Optional. Interval in seconds sent to the MQTT broker when s-Server establishes a connection. Specifies the longest period of time that the broker and client can go without sending a message. Defaults to 60. |
| CONNECTION_TIMEOUT | Optional. Connection timeout. Defaults to 30. |
| OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set options using select USERNAME, PASSWORD from TEST.mqtt_options. For more details see the topic Using the Options Query Property. |
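
A minimal MQTT source sketch follows; the server name MQTT_SERVER, the parser option, and the topic are illustrative assumptions:

```sql
-- Sketch: subscribing to an MQTT topic with QoS 1.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."MqttReadings"
(
    "device_id" VARCHAR(32),
    "reading" DOUBLE
)
SERVER "MQTT_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "CONNECTION_URL" 'tcp://127.0.0.1:1883',
    "TOPIC" 'sensors/temperature',
    "QOS" '1',
    "CLIENT_ID" 'sqlstream_mqtt_reader_1'  -- use a distinct ClientID per foreign stream
);
```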

Using Teradata Listener as a Streaming Data Source

Reading from Teradata Listener uses a Websockets connection. You will need to provide a URL (starting with the protocol "ws://" or "wss://"), and an authorization string.

For more information see Reading from a Teradata Listener Broadcast Stream.
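
Because the Listener source is read over a WebSockets connection, a hedged sketch looks much like the WebSocket example earlier. The server name, header name, and token format below are assumptions; see Reading from a Teradata Listener Broadcast Stream for the exact URL and authorization details:

```sql
-- Sketch: reading a Teradata Listener broadcast stream over WebSockets.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."ListenerFeed"
(
    "payload" VARCHAR(2048)
)
SERVER "WEBSOCKET_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "URL" 'wss://listener.example.com/v1/ws',
    "HEADER_Authorization" 'token my-listener-authorization-string'
);
```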