Integrating Kafka

Using s-Server, you can read from and write to Kafka. To read and write from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. To read and write from remote locations, you configure such options using a properties file and launch the Extensible Common Data agent at the command line.

Using Kafka, you can achieve exactly-once semantics. For more detail, see the Exactly Once topic in this guide.


Reading from Kafka

s-Server's Kafka adapter instantiates a Kafka consumer, configured using either SQL or a properties file. To read from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. Data can be in CSV, Avro, XML, JSON, BSON, or other formats; you specify a parser as part of the configuration options. See Reading from Kafka Using SQL below. To read from remote locations, you configure such options using a properties file and launch the agent at the command line. See Reading from Kafka Using the ECD Agent below. See the overview at Reading from Other Sources for more details.

The Kafka10 adapter is based on the Kafka client API v0.10.0.x and leverages features available on Kafka brokers up to v0.10.0.x. This API works with Kafka v1.0 and with the Confluent distribution from at least v3.3 to v4.0.

As of Kafka 0.10, every Kafka message has a timestamp field specifying the time when the message was produced. The new adapter uses this feature to let you start processing at a given timestamp: for each topic partition, the adapter identifies the position from which the message timestamp of all subsequent messages is greater than or equal to the specified STARTING_TIME.

Once you set up a foreign stream for Kafka in s-Server, each query on the stream reads from a Kafka topic via a Kafka consumer. If multiple queries are run on the same stream, and the Kafka topic has partitions, Kafka will distribute data among these queries according to partition.

In many cases, you may need to aggregate the results of multiple queries in order to achieve the right results. You can also specify a partition in foreign stream options. See Using the Kafka Adapter to Process Partitioned Streams below.

The s-Server trace log includes information on readers' and parsers' progress. See Periodic Parser Statistics Logging in the Administering Guavus SQLstream guide. These errors are also logged in the Global Error Stream.

Reading from Kafka Using SQL

To read from Kafka, you need to create a foreign stream in SQL that references a prebuilt server object called KAFKA_SERVER. The foreign stream's definition contains connection information for the Kafka server. In Kafka's case, you can also define provenance columns that let you pass metadata on the Kafka source into s-Server. See Using Provenance Columns below. For more details on creating a foreign stream, see the CREATE FOREIGN STREAM topic in the SQLstream Streaming SQL Reference Guide.

You will also need to specify a parser for the foreign stream, such as 'CSV'. Specifying "parser" as a foreign stream option tells s-Server that this foreign stream reads data. See Parsers for Reading in this guide for more details.

Streams, like most SQL objects (but unlike data wrappers and servers), should be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named KafkaReaderStream.

CREATE OR REPLACE SCHEMA KafkaSchema;
SET SCHEMA 'KafkaSchema';

CREATE OR REPLACE FOREIGN STREAM KafkaReaderStream
("id" BIGINT,
  "reported_at" TIMESTAMP,
  "lat" DOUBLE,
  "lon" DOUBLE,
  "speed" INTEGER,
  "bearing" INTEGER,
  "driver_no" BIGINT)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'mytopic',
"SEED_BROKERS" 'localhost:9092',
"STARTING_TIME" 'LATEST',
"PARSER" 'JSON',
"ROW_PATH" '$',
character_encoding 'UTF-8'
)
;

Input

As with all ECDA adapters, data is only moved from Kafka to s-Server once you execute a SELECT statement on the foreign stream that you have defined for the adapter. (Often, you will define a SELECT statement on the foreign stream as part of a pump, in which case data moves once you start the pump.) For example, the following code block copies the values of the columns "speed" and "driver_no" into s-Server using the foreign stream defined above, KafkaSchema.KafkaReaderStream.

SELECT STREAM "speed", "driver_no" from KafkaSource.KafkaReaderStream;

Using Provenance Columns

In reading from Kafka, you can declare provenance columns. These return metadata for the Kafka topic from which you are reading.

These are as follows:

Data Type Name in s-Server 6.0.0 Name in s-Server 6.0.1+ Kafka version Description
VARCHAR(256) KAFKA_TOPIC SQLSTREAM_PROV_KAFKA_TOPIC Kafka v.0.10.2 or later Returns name of Kafka topic.
INTEGER KAFKA_PARTITION SQLSTREAM_PROV_KAFKA_PARTITION Kafka v.0.10.2 or later Returns value of current Kafka partition.
INTEGER PARTITION SQLSTREAM_PROV_KAFKA_PARTITION Kafka v.0.8.2 Returns value of current Kafka partition.
BIGINT KAFKA_OFFSET SQLSTREAM_PROV_KAFKA_OFFSET Kafka v.0.10.2 or later Returns value of current Kafka offset.
BIGINT OFFSET SQLSTREAM_PROV_KAFKA_OFFSET Kafka v.0.8.2 Returns value of current Kafka offset.
TIMESTAMP KAFKA_TIMESTAMP SQLSTREAM_PROV_KAFKA_TIMESTAMP Kafka v.0.10.2 or later Returns the Kafka message timestamp.
VARBINARY(256) KAFKA_KEY SQLSTREAM_PROV_KAFKA_KEY Kafka v.0.10.2 or later Returns key value for message.
BINARY(32) PAYLOAD_PREFIX SQLSTREAM_PROV_KAFKA_PAYLOAD_PREFIX Kafka v.0.10.2 or later Returns the first bytes (up to 32) of the message payload.
VARCHAR(1000) N/A SQLSTREAM_PROV_KAFKA_HEADERS Kafka v.0.10.2 or later Returns any headers for Kafka message topics as key-value pairs in serialized text format. See Reading and Parsing Headers.
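For example, the following sketch of a reader (reusing the broker and topic from the example above; the payload columns are illustrative) adds provenance columns using the 6.0.1+ names:

CREATE OR REPLACE FOREIGN STREAM KafkaSchema.KafkaReaderWithProvenance
("id" BIGINT,
  "speed" INTEGER,
  "driver_no" BIGINT,
  "SQLSTREAM_PROV_KAFKA_TOPIC" VARCHAR(256),
  "SQLSTREAM_PROV_KAFKA_PARTITION" INTEGER,
  "SQLSTREAM_PROV_KAFKA_OFFSET" BIGINT,
  "SQLSTREAM_PROV_KAFKA_TIMESTAMP" TIMESTAMP)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'mytopic',
"SEED_BROKERS" 'localhost:9092',
"STARTING_TIME" 'LATEST',
"PARSER" 'JSON',
"ROW_PATH" '$',
character_encoding 'UTF-8'
);

-- Each row now carries the topic, partition, offset, and message timestamp
-- of the Kafka message from which it was parsed.
SELECT STREAM "id", "SQLSTREAM_PROV_KAFKA_PARTITION", "SQLSTREAM_PROV_KAFKA_OFFSET"
FROM KafkaSchema.KafkaReaderWithProvenance;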

Foreign Stream Options for Reading from Kafka

Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#consumerconfigs. Where appropriate, information in this table is drawn from this page.

Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.

Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':

  • 'Kafka10' - the option only applies to the Kafka10 plugin and KAFKA10_SERVER
  • 'Legacy' - the option only applies to the legacy adapter for Kafka up to 0.8, and KAFKA_SERVER
  • 'All' - the option applies to both plugin versions.
Option Adapter version Description
TOPIC All Required. Kafka Topic. You can use a regular expression as a "topic wild card". (This is not supported by legacy versions of the adapter.)
SEED_BROKERS All A comma separated list of broker identifiers in the format <broker_hostname>:<port>. For the legacy adapter, this is a comma separated list of broker hosts. Defaults to "localhost".
PARTITION All Partition number to read from. If you are reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions are read. You can specify a single partition with PARTITION, or a range such as 1-3 (a range needs two partition numbers and is inclusive). Note: Partition numbers are 0 based.
PORT Legacy Deprecated since 6.0.1. Port for Kafka seed brokers.
STARTING_TIME All Either EARLIEST, LATEST, or a timestamp in the format 'yyyy-MM-dd HH:mm:ss.SSS', such as '2018-02-01 22:23:45.892'. Default LATEST.

When STARTING_TIME is a timestamp, the Kafka adapter "seeks" to offsets within all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME. Requires Kafka v0.10.2 or later.

For the legacy Kafka adapter, options are EARLIEST or LATEST.
STARTING_OFFSET All Where to start reading from (default is -1, the oldest offset) as a long int representing the physical offset within a partition
INDEX_TOPIC_NAME Kafka10 This option specifies the name of the index topic to be used for mapping message offsets to timestamps. For more details, refer to Building and Using an Index Topic in the Reading from Kafka topic of the s-Server Integration Guide. Index topic may be created on a separate Kafka cluster.
INDEX_TOPIC_SEED_BROKERS Kafka10 The broker(s) where the index topic referenced by INDEX_TOPIC_NAME is managed
MAX_POLL_RECORDS Kafka10 Maximum number of records to be polled (fetched) through the new KafkaConsumer.poll() API call. For best throughput during replay, this needs to be set such that you get a "page" full (1MB is the default) of kafka messages from each partition of the topic(s). It can be roughly calculated as: (numPartitions * 1 MB) / typicalMessageSize
PARTITION_OFFSET_QUERY Kafka10 This is a SQL query that fetches starting offsets, Kafka partition and Kafka topic for all topic partitions. For both the legacy Kafka adapter and the Kafka10 adapter, you can use a query along the following lines:

PARTITION_OFFSET_QUERY 'SELECT "KAFKA_PARTITION","KAFKA_TOPIC","KAFKA_OFFSET" FROM stored_offsets'

PARTITION should be of type INTEGER. OFFSET should be of type BIGINT. Any partition for which the query does not return anything will either use STARTING_OFFSET or STARTING_TIME to determine where to start.
CLIENT_ID All For the Kafka10 adapter, it is vital to understand that this is the consumer group id (so is used to set consumer group property "group.id"). Client key for Yammer metrics. CLIENT_ID defaults to client{TOPIC} or CLIENT{TOPIC}{PARTITION} if PARTITION is specified. CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting. CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true. See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics.
OPTIONS_QUERY All Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in select lastOffset as STARTING_OFFSET from TEST.committedOffset;
SCHEMA_ID All If set >= 0, the reader will treat the first byte of data as magic byte, and the next 4 bytes as a schema ID, and will discard any record where the first byte does not match the SCHEMA_ID. Default -1.
PARSER_PAYLOAD_OFFSET Kafka10 The number of bytes to skip at the start of the record to get the value - in case the data is offset (by a schema ID or some other fixed-length data). Default 0.
"kafka.consumer.config" Kafka10 Lets you specify the name of a properties file that contains a set of Kafka consumer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the consumer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#newconsumerconfigs Note: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "SEED_BROKERS". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
"isolation.level" Kafka10 Lets you specify whether s-Server should read all Kafka messages or only committed messages. Options are read_uncommitted, or read_committed This option lets you account for transactions, a Kafka 0.11.0 feature whereby applications can write to multiple topics and partitions atomically. To use atomic commitments, you need to configure the Kafka adapter to only read committed data--this is, read_committed.
METRICS_PER_PARTITION Legacy True or False. If METRICS_PER_PARTITION is false, then CLIENT_ID will be used as the client key for all yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number will be appended to each client_id (and finer grained metrics will be reported).
FETCH_SIZE Legacy Fetch size. Defaults to 1000000.
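As a sketch of how several of these options combine (the topic, columns, and properties file path are assumptions for illustration), the following foreign stream starts reading from a fixed timestamp, reads only committed messages, and points at a consumer properties file:

CREATE OR REPLACE FOREIGN STREAM KafkaSchema.OrdersIngest
("order_id" BIGINT,
  "amount" DOUBLE)
SERVER KAFKA10_SERVER
OPTIONS
(TOPIC 'orders',
"SEED_BROKERS" 'localhost:9092',
"STARTING_TIME" '2018-02-01 22:23:45.892',
"isolation.level" 'read_committed',
"kafka.consumer.config" '/home/sqlstream/kafka-consumer.properties',
"PARSER" 'JSON',
"ROW_PATH" '$'
);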

Additional Kafka Options for Use with JNDI Properties

You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so that the passwords are not visible in the SQL.

The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.

So we recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using one of the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/<foreign_server_name>.properties (Applies to all foreign streams that use this foreign server definition).
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB.<schema_name>.<foreign_stream_name>.properties (Applies only to the specified foreign stream).
  • A file referenced by "kafka.consumer.config" or "kafka.producer.config" (Applies to all foreign streams that reference the named file).
Option Name Adapter version Description
"ssl.truststore.location" Kafka10 The location of the trust store file.
"ssl.truststore.password" Kafka10 The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
"ssl.keystore.location" Kafka10 The location of the key store file. This is optional for client and can be used for two-way authentication for client.
"ssl.keystore.password" Kafka10 The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
"ssl.key.password" Kafka10 The password of the private key in the key store file. This is optional for client.

Understanding Shared Subscriptions for Kafka

On the Kafka side, the KafkaConsumer API uses the property group.id as the consumer group id to support shared subscriptions among multiple consumers. Kafka uses group.id to identify consumers belonging to the same consumer group. Each group's consumers divide the topic partitions among themselves, such that each partition is only consumed by a single consumer.

All consumer instances with the same value for group.id share a single logical Kafka subscription. That is, each consumer instance reads messages from a subset of partitions of subscribed topic(s).

In order to have s-Server read from the same logical Kafka subscription, you need to configure how s-Server handles group.id. In s-Server 6.0.0, s-Server uses the value for the option CLIENT_ID in a foreign stream as the value for group.id for each KafkaConsumer instance.

(new in s-Server version 6.0.1) As of s-Server v6.0.1, s-Server no longer uses CLIENT_ID as group.id for the Kafka consumer API.

Instead, s-Server determines the value for group.id in the following way:

  • If you start a SELECT query on the foreign stream with a pump, s-Server sets group.id to <fully_qualified_pump_name> for the pump that starts the query. Note: If your pump name ends with digits, s-Server trims all trailing digits from <fully_qualified_pump_name>. Using trailing digits allows you to run multiple pumps in the same schema that all read from the same Kafka topic.
  • If you submit a SELECT query on the foreign stream through sqlline, WebAgent or JDBC, a unique group.id will be generated when the query begins executing.

Because group.id is set to the sessionID at runtime, queries running on the foreign stream are treated as instances of the same distributed query if they share the same sessionID.

Using group.id with Pumps in s-Server 6.0.1

When you read from Kafka using a pump, s-Server assigns group.id based on the name of the pump and the name of the schema in which the pump is running (<fully_qualified_pump_name>). For example, for a pump called "myschema"."my_kafka_source_pump", s-Server sets sessionID = 'LOCALDB.myschema.my_kafka_source_pump_Pump'.

When using pumps, in order to distribute data across queries running on multiple instances of s-Server, you need to create schemas with the same names (such as "myschema") that contain pumps with the same names (such as "my_kafka_source_pump").

If you create the schema myschema on each node in a federation of s-Servers, the pump, my_kafka_source_pump, starting on each of those nodes will get the same sessionID when the pump is started (at runtime) and each one of those pumps will be processing data from a subset of topic partitions.

You can append numbers to pump names. These will be chopped off at run time. For example, pumps with names such as "my_kafka_source_pump1", "my_kafka_source_pump2", ..., "my_kafka_source_pumpN" will generate the same group.id. By using trailing numbers, you can create distributed pipelines on a single node and a single schema. In other words, you can run multiple pumps in the same schema with appended numbers, and these pumps will be assigned the same sessionID. (This is how StreamLab currently handles pumps.)
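For example, the following sketch (using the KafkaReaderStream defined earlier in this topic, plus a hypothetical native stream and schema as the target) creates two pumps whose names differ only by a trailing digit; after the digits are trimmed, both resolve to the same group.id and so share the topic's partitions:

CREATE OR REPLACE SCHEMA "myschema";

CREATE OR REPLACE STREAM "myschema"."bus_positions"
("id" BIGINT,
  "speed" INTEGER,
  "driver_no" BIGINT);

CREATE OR REPLACE PUMP "myschema"."my_kafka_source_pump1" STOPPED AS
INSERT INTO "myschema"."bus_positions"
SELECT STREAM "id", "speed", "driver_no" FROM KafkaSchema.KafkaReaderStream;

CREATE OR REPLACE PUMP "myschema"."my_kafka_source_pump2" STOPPED AS
INSERT INTO "myschema"."bus_positions"
SELECT STREAM "id", "speed", "driver_no" FROM KafkaSchema.KafkaReaderStream;

ALTER PUMP "myschema".* START;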

Using group.id with Other Queries in s-Server 6.0.1

If the query is not running as part of a pump, then s-Server assigns a unique sessionID at runtime. That is, queries running through sqlLine and WebAgent are considered independent queries; each creates and uses its own group.id every time a query is submitted.

Setting group.id Explicitly

You can also explicitly set group.id using the foreign stream option group.id. This ensures that all foreign stream queries (which are processed as Kafka subscriptions) against all topics are synchronized. This ensures, for example, that stream-stream joins advance all streams' clocks in a synchronized manner.
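For example, the following sketch (the group name and payload columns are illustrative) pins the consumer group for a reader:

CREATE OR REPLACE FOREIGN STREAM KafkaSchema.KafkaReaderSharedGroup
("id" BIGINT,
  "speed" INTEGER,
  "driver_no" BIGINT)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'mytopic',
"SEED_BROKERS" 'localhost:9092',
"STARTING_TIME" 'LATEST',
"PARSER" 'JSON',
"ROW_PATH" '$',
"group.id" 'bus_position_readers'
);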

Reading from Kafka Using the ECD Agent

You can use the ECD agent to read data from remote locations. See Reading Data from Remote Locations for more details.

The ECD agent takes similar options as the ones you format in SQL, but these options need to be formatted in a properties file along the lines of the following.

TOPIC=mytopic
SEED_BROKERS=localhost:9092
STARTING_TIME=LATEST
MAX_POLL_RECORDS=1000
PARSER=JSON
ROW_PATH=$
CHARACTER_ENCODING=utf-8
SCHEMA_NAME="KafkaSource"
TABLE_NAME="KafkaSourceStream"
ROWTYPE=RECORDTYPE(BIGINT NOT NULL kafka_offset, TIMESTAMP NOT NULL ts, INT NOT NULL KAFKA_PARTITION, CHAR(10) NOT NULL zipcode, DOUBLE NOT NULL transactionTotal, INT NOT NULL transactionCount)

Writing to Kafka

The Kafka ECDA adapter writes batches of data to a Kafka topic. In order to write to a Kafka topic, you must first define a server object for the Kafka server with information on its seed broker(s) and topic (at minimum). s-Server writes data to Kafka formatted as CSV, JSON, XML, or BSON.

The Kafka10 adapter works with Kafka 0.10 and later. It uses atomic commits; using it, you can achieve exactly-once semantics between s-Server and Kafka. See the topic Exactly Once Semantics.

For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. The topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter. To actually write data, you INSERT into the foreign stream.

Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Writing to Kafka Using SQL

Like all streams, foreign streams must be defined within schemas. The following code first creates a schema called KafkaSchema then creates a foreign stream called KafkaWriterStream with the predefined server KAFKA10_SERVER as a server option. To transfer data into Kafka using this stream, you will need to INSERT into it. This step simply sets up the stream, with named columns and Kafka-specific options. (These options are discussed below.)

You can pass many of the options described at http://kafka.apache.org/documentation/#brokerconfigs, such as compression.type, to the Kafka writer. See Foreign Stream Options for Writing to Kafka below.

The example below uses a source stream created using a demonstration script located at $SQLSTREAM_HOME/demo/data/buses/StreamXmlBusData.sh.


Defining a Foreign Stream for Kafka

Here is an example of the SQL used to define a foreign stream for the Kafka adapter. The code first creates and sets a schema. If you have run the code to create the source stream, you should omit the lines that create and set the schema.

--omit the two lines below if you have run the code to create the source stream
CREATE OR REPLACE SCHEMA KafkaSchema;
SET SCHEMA 'KafkaSchema';

CREATE OR REPLACE FOREIGN STREAM KafkaSchema.buses_kafka_egress
(
    "id" BIGINT,
    "reported_at" VARCHAR(32),
    "speed" INTEGER,
    "driver_no" BIGINT,
    "prescribed" BOOLEAN,
    "gps" VARCHAR(128),
    "highway" VARCHAR(8)
)
--this is the prebuilt server for Kafka10
SERVER KAFKA10_SERVER
OPTIONS (
    "FORMATTER" 'JSON',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "bootstrap.servers" 'localhost:9092',
    "TOPIC" 'buses'
);

CREATE OR REPLACE PUMP KafkaSchema.source_to_buses_pump STOPPED AS
INSERT INTO KafkaSchema.buses_kafka_egress
SELECT STREAM  * FROM KafkaSchema.buses;

To start writing data, use the following code:

ALTER PUMP KafkaSchema.* START;

To test the stream, use the following code:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic buses --from-beginning

Note: This code assumes that you have added the Kafka bin directory ($KAFKA_HOME/bin) to your PATH.

Using EGRESS Columns

These special egress columns are supported by the KAFKA10 plugin. Data arriving in these columns is not included in the outgoing Kafka message payload; instead they are used to set other message variables.

Name Data Type Description Name in s-Server 6.0.0
SQLSTREAM_EGRESS_KAFKA_PARTITION INTEGER This column is used to determine the partition to which the message will be written. NOTE: SQLstream uses this approach and does not support use of the Kafka producer property "partitioner.class". KAFKA_PARTITION
SQLSTREAM_EGRESS_KAFKA_TIMESTAMP TIMESTAMP This column is used to set the Kafka message timestamp. KAFKA_TIMESTAMP
SQLSTREAM_EGRESS_KAFKA_KEY VARBINARY(256) This column is used as the message key. KAFKA_KEY
SQLSTREAM_EGRESS_WATERMARK VARCHAR(256) This column is used as a watermark, and is saved when committing to the sink Kafka topic. The developer should set it to whatever watermark format is supported by the source plugin. The stored watermark string has the format <CommitRowtimeInMillisFromEpoch>,<watermark_column_value>. (No equivalent in s-Server 6.0.0.)

Foreign Stream Options for Writing to Kafka

Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#producerconfigs. Where appropriate, information in this table is drawn from this page.

Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.

Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':

  • 'Kafka10' - the option only applies to the Kafka10 plugin and KAFKA10_SERVER
  • 'Legacy' - the option only applies to the legacy adapter for Kafka up to 0.8, and KAFKA_SERVER
  • 'All' - the option applies to both plugin versions.
Option Name Adapter version Description
TOPIC All Kafka topic
"bootstrap.servers" Kafka10 hostname:port[,hostname:port] of the Kafka broker(s). Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
SEED_BROKERS Legacy A comma separated list of broker identifiers in the format "<broker_host_name>:<port>". Defaults to "localhost".
"metadata.broker.list" Legacy hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
"key.serializer" Kafka10 Names a serializer class for keys. If no class is given, Kafka uses value.serializer.
"key.serializer.class" Legacy Names a serializer class for keys. If no class is given, Kafka uses serializer.class.
"value.serializer" Kafka10 Names a serializer class for values.
"serializer.class" Legacy Names a serializer class for values. The default encoder takes a byte[] and returns the same byte[].
"partitioner.class" All Fully qualified Java classname of Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner
"compression.type" Kafka10 If desired, specifies the final compression type for a given topic. Defaults to 'none'. Possible values: 'snappy', 'gzip'
"retry.backoff.ms" All Producers refresh metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing.
"request.timeout.ms" All When request.required.acks is enabled, this lets you specify how long the broker should try to bundle the specified number of messages before sending back an error to the client.
"send.buffer.bytes" All Socket write buffer size.
"client.id" All Using this option, you can specify a string to help you identify the application making calls to the Kafka server.
"transactional.id" Kafka10 This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique transactional.id to publish messages to the topic using transactions. Transactions are used only if Kafka brokers are v0.11.2. These support transactional semantics.

NOTE: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" for itself. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream.

(new in s-Server version 6.0.1) If you set transactional.id = 'auto', when a pump begins running, s-Server automatically sets transactional.id to '<fully_qualified_pump_name>_Pump', where <fully_qualified_pump_name> is the name of the pump that instantiates the sink.
"pump.name" Kafka10 Deprecated since version 6.0.1.

Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance itself is running (Leader or Follower) when you configure the Kafka adapter to run in High Availability (HA) mode. The pump.name needs to be the fully qualified pump name, in the format <catalog_name>.<schema_name>.<pump_name>.

For example:'LOCALDB.mySchema.ProcessedOrdersPump'
"linger.ms" All In cases where batch.size has not been reached, number of milliseconds that the Kafka producer will wait before batching sends . Defaults to '100', in milliseconds)
"batch.size" All Number of messages sent as a batch. Defaults to '1000'
"kafka.producer.config" Kafka10 Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties.

For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs.

NOTE: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "bootstrap.servers". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
"security.protocol" Kafka10 Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
"transaction.timeout.ms" Kafka10 The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
HA_ROLLOVER_TIMEOUT Kafka10 Time in milliseconds. Defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode and is running as a "Follower", it waits this amount of time without seeing commits from the "Leader".

After the timeout, the "Follower" attempts to takeover as the "Leader". There may be multiple follower instances running in a cluster. Only one of these followers succeeds to be designated as the new "Leader". All other pump instances using the same transactional.id continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself as the "Follower" if possible.
COMMIT_METADATA_COLUMN_NAME Kafka10 The name of the column that contains watermark values - default SQLSTREAM_EGRESS_WATERMARK.
SQLSTREAM_EGRESS_WATERMARK_SORT_FUNCTION Kafka10 If the SQLSTREAM_EGRESS_WATERMARK egress column is being used, the value chosen depends on the value of this option. Possible values are:
  • MAX - use the lexical maximum value of all watermarks in the commit batch
  • NONE - (the default) - use the watermark value from the last row in the commit batch
HEADERS_COLUMNS Kafka10 Deprecated - please use HEADERS_COLUMN_LIST
HEADERS_COLUMN_LIST Kafka10 Comma-separated list of foreign stream columns that will be mapped as outgoing headers, rather than to the record value itself. See Writing Headers to Kafka based on Column Data
OPTIONS_QUERY All Lets you query a table to update adapter options at runtime. You can use this, for example, to set the "bootstrap.servers" option from a table, as in select lastOffset as "bootstrap.servers" from TEST.kafka_write_options;
POLL_TIMEOUT Legacy This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms.
PORT Legacy Deprecated option. From s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option.
"producer.type" Legacy This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are 'async' (asynchronous) and 'sync' (synchronous). Default 'sync'
"compression.codec" Legacy The compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". Default 'none'.
"compressed.topics" Legacy If the compression codec is anything other than 'none', enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics.
"message.send.max.retries" Legacy This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Default 3.
"topic.metadata.refresh.interval.ms" Legacy The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...). It will also poll regularly (default: every 10min so 600,000ms). If you set this to a negative value, metadata will only get refreshed on failure. Default 600*1000 (10 minutes).
"request.required.acks" Legacy How many other brokers must have committed the data to their log and acknowledged this to the leader? 0 means never wait; 1 means wait for the leader to acknowledge; -1 means wait for all replicas to acknowledge. Default 0 (never wait).
"queue.buffering.max.ms" Legacy Maximum time to buffer data when using async mode. Default 5000.
"queue.buffering.max.messages" Legacy The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. Default 10000.
"queue.enqueue.timeout.ms" Legacy The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. Default -1.
"batch.num.messages" Legacy The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached. Default 200.
"send.buffer.bytes" Legacy Socket write buffer size. Default 100x1024.
"SHARD_ID" kafka10 This optional parameter is used in scaling, when you are running multiple shards of a pipeline. It uniquely identifies the current shard. When writing metadata/watermarks to kafka, the SHARD_ID is added as a prefix to the SQLSTREAM_POSITION_KEY. It is also included in the automatically generated transaction id. When creating a restartable pipeline, this SHARD_ID is expected to be the same as the SHARD_ID of the source foreign stream.

Additional Kafka Options for Use with JNDI Properties

You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so that the passwords are not visible in the SQL.

The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.

So we recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using one of the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/<foreign_server_name>.properties (Applies to all foreign streams that use this foreign server definition).
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB.<schema_name>.<foreign_stream_name>.properties (Applies only to the specified foreign stream).
  • A file referenced by "kafka.consumer.config" or "kafka.producer.config" (Applies to all foreign streams that reference the named file).
Option Name Adapter version Description
"ssl.truststore.location" Kafka10 The location of the trust store file.
"ssl.truststore.password" Kafka10 The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled.
"ssl.keystore.location" Kafka10 The location of the key store file. This is optional for client and can be used for two-way authentication for client.
"ssl.keystore.password" Kafka10 The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.
"ssl.key.password" Kafka10 The password of the private key in the key store file. This is optional for client.

Using Columns to Specify Partition and Key for Kafka Topics

When writing rows out of s-Server to a Kafka topic, you can specify 1) partition and 2) key by including egress columns named, respectively, SQLSTREAM_EGRESS_KAFKA_PARTITION and SQLSTREAM_EGRESS_KAFKA_KEY. Data will be written as a message to the indicated partition in the topic, and SQLSTREAM_EGRESS_KAFKA_KEY will serve as the first part of the key-value pair that constitutes a Kafka message in Kafka.

If you do not include a SQLSTREAM_EGRESS_KAFKA_PARTITION column in the foreign stream, Kafka will assign a partition number to the message being published. If you have included a SQLSTREAM_EGRESS_KAFKA_KEY column, partition number is determined by hashing the SQLSTREAM_EGRESS_KAFKA_KEY column. If you have not included the SQLSTREAM_EGRESS_KAFKA_KEY column, then Kafka assigns partition numbers by a "round robin" method.

If you do not include a SQLSTREAM_EGRESS_KAFKA_KEY column in the foreign stream, (key, value) will be (null, value), where value is the row serialized using the FORMATTER option. The KAFKA10 plugin excludes the SQLSTREAM_EGRESS_KAFKA_KEY and SQLSTREAM_EGRESS_KAFKA_PARTITION columns from the serialized value.
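A minimal sketch of such a sink (based on the buses example above); rows inserted into it, for example from a pump, must supply values for the two egress columns, which set the partition and key rather than being serialized into the message value:

CREATE OR REPLACE FOREIGN STREAM KafkaSchema.buses_keyed_egress
(
    "SQLSTREAM_EGRESS_KAFKA_PARTITION" INTEGER,
    "SQLSTREAM_EGRESS_KAFKA_KEY" VARBINARY(256),
    "id" BIGINT,
    "speed" INTEGER,
    "driver_no" BIGINT
)
SERVER KAFKA10_SERVER
OPTIONS (
    "FORMATTER" 'JSON',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "bootstrap.servers" 'localhost:9092',
    "TOPIC" 'buses_keyed'
);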

Using Atomic Commitments

As of version 6.0.0, s-Server writes messages to Kafka atomically. A multi-step operation (writing multiple records to a database as one transaction) is said to be atomic when it requires all the steps in the transaction to be completed successfully or else rolls back all changes. In a ten-row insertion, for example, either all 10 rows are inserted successfully or none at all.

In order to commit insertions atomically, the adapter makes use of Kafka broker support for transactional semantics. As a result, in order to implement atomic commits, you must be using Kafka v1.0 or later. When creating a foreign stream that connects to Kafka v1.0 or later, use the prebuilt server KAFKA10_SERVER in the foreign stream definition. (KAFKA_SERVER is the legacy Kafka plugin.)

To implement atomic commitments:

  1. Set TRANSACTION_ROWTIME_LIMIT to 60000 (1 minute). The adapter begins a new transaction at the top of each minute (that is, at ROWTIME = FLOOR(ROWTIME TO MINUTE)).
  2. The adapter sends messages to the Kafka broker until it receives the last row for the minute.
  3. When the first row for the next minute is received, the adapter commits the transaction opened earlier and starts a new transaction.
  4. Promote ROWTIME to SQLSTREAM_EGRESS_KAFKA_TIMESTAMP (version 6.0.1) or KAFKA_TIMESTAMP (version 6.0.0).

You also need to use an OPTIONS_QUERY in the foreign stream, along the following lines:

CREATE OR REPLACE FOREIGN STREAM KafkaSchema.KafkaReaderStream
( "id" BIGINT,
  "reported_at" TIMESTAMP,
  "lat" DOUBLE,
  "lon" DOUBLE,
  "speed" INTEGER,
  "bearing" INTEGER,
  "driver_no" BIGINT,
  "SQLSTREAM_EGRESS_KAFKA_TIMESTAMP" TIMESTAMP
)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'buses',
"SEED_BROKERS" 'localhost:9092',
"STARTING_TIME" 'LATEST',
"PARSER" 'JSON',
"ROW_PATH" '$',
TRANSACTION_ROWTIME_LIMIT '60000',
character_encoding 'UTF-8',
"OPTIONS_QUERY" 'SELECT * FROM (VALUES(sys_boot.mgmt.watermark_timestamp(''localhost:9092'', ''buses'', ''buses_watermark'', '''',0) - INTERVAL ''6'' SECOND)) AS options(STARTING_TIME)')
;

Using transactional.id for Load Balancing and Recovery

At times, a pipeline may fail. In these cases, s-Server needs to be able to replay the pipeline from the start of the last uncommitted transaction (as bounded by TRANSACTION_ROWTIME_LIMIT) in order to avoid data loss. For high availability situations, we recommend creating duplicate pipelines. Duplicate pipelines process data in the same order, but only the leader commits.

You set up duplicate pipelines using Kafka's transactional.id option. Pipelines that share a transactional.id form redundant Kafka producer sessions while ensuring that rows are committed only once. This setup provides a "hot-cold" high availability configuration: when the "hot", or live, node fails, the "cold" node takes over. This configuration requires recovery and replay to rebuild state (if any) before s-Server begins writing data to the Kafka sink.

To configure high availability mode in s-Server 6.0.0, you must identify:

  1. The transactional.id for the session
  2. The pump for the pipeline.

To configure high availability in s-Server 6.0.1, you can set transactional.id to 'auto'. If you set transactional.id = 'auto', when a pump begins running, s-Server automatically sets transactional.id to <fully_qualified_pump_name>_Pump, where <fully_qualified_pump_name> is the name of the pump that instantiates the sink.

When a single instance of a Kafka egress pipeline is run with the option transactional.id specified for the Kafka sink, the pipeline runs in transactional mode. If s-Server identifies two foreign streams/pipelines that use the same transactional.id, one foreign stream/pipeline is designated the leader and all others are designated followers. s-Server will actively read from the leader only. If the leader fails, one of the followers will be designated the leader, and so on.

If a follower catches up to the leader and the leader has not committed, the follower "sleeps" for a few seconds.

If the follower still does not see the leader commit, it concludes that the leader has probably failed. The follower then takes over from the leader and starts committing rows.

Here, transactional.id functions as a "baton": it is what lets the follower declare itself to be the leader. With three redundant pipelines running, you have one leader and two followers; if the leader fails, one of the other two becomes the leader.

Note: If you do not supply transactional.id as an option in the CREATE FOREIGN STREAM statement, transactional semantics are disabled.

Building and Using an Index Topic

In order to automatically rebalance assigned partitions or implement exactly-once semantics, s-Server relies on a Kafka v0.10.2 feature called Time Index. This feature provides the timestamps that you promote to ROWTIME, which serve as a natural watermark for the pipeline.

But what if you're working with an earlier version of Kafka, or have other trouble getting timestamps into your pipeline?

In these cases, you can still use automatic partition rebalancing and exactly-once semantics, but you will need to create your own timestamps for each message. You can do so by building an index topic.

To do so, you set up a stream that records the first and last offsets of the source topic and applies a timestamp at build time. Using the offset ranges stored in the index topic, s-Server reads records from the source topic and returns the timestamp from the index topic for all records (messages) in each offset range.

MESSAGE_TIMESTAMP START_OFFSET END_OFFSET
2018-01-01 00:00:01 64000 67903
2018-01-01 00:00:02 67904 68112
2018-01-01 00:00:04 68113 72892

For example:

CREATE OR REPLACE SCHEMA "IndexBuilder";
SET SCHEMA '"IndexBuilder"';

CREATE OR REPLACE FOREIGN STREAM "IndexBuilder"."OrdersTopicIndex" (
  "PARTITION" INTEGER,
  "MESSAGE_TIMESTAMP" TIMESTAMP,
  "START_OFFSET" BIGINT,
  "END_OFFSET" BIGINT
)

SERVER KAFKA10_SERVER
OPTIONS (
  SEED_BROKERS 'localhost:9092',
  TOPIC 'OrdersTopic_index',
  "COMMIT_METADATA_COLUMN_NAME" 'END_OFFSET',
  "transactional.id" 'OrdersTopic_index_builder',
  "pump.name" 'LOCALDB.IndexBuilder.Index1Pump',
  "TRANSACTION_ROWTIME_LIMIT" '1000', -- 1 second
  FORMATTER 'CSV'
);

CREATE OR REPLACE PUMP "IndexBuilder"."Index1Pump" STOPPED AS
INSERT INTO "IndexBuilder"."OrdersTopicIndex"
SELECT STREAM * FROM "IndexBuilder".AggregationView;

Once the index topic is built, you can use it as follows:

CREATE OR REPLACE FOREIGN STREAM OrdersStream (
   MESSAGE_TIMESTAMP TIMESTAMP,
   ...
   ORDER_ID BIGINT,
   ...
)

SERVER KAFKA10_SERVER
OPTIONS (
   SEED_BROKERS 'localhost:9092',
   TOPIC 'OrdersTopic',
   INDEX_TOPIC_NAME_SUFFIX '_index', -- use OrdersTopic_index to determine starting position
   STARTING_TIME '2018-02-01 00:00:00',
   ...,
   PARSER 'JSON'
);

Writing to Kafka Using the ECD Agent

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

FORMATTER=CSV
TOPIC=AggregatedData
"metadata.broker.list"=localhost:9092
ROW_SEPARATOR=''
CHARACTER_ENCODING=UTF-8
SCHEMA_NAME=KAFKASOURCE
TABLE_NAME=KAFKAWRITER_STREAM
ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)

Working with Kafka Message Headers

(new in s-Server version 6.0.1) Messages published to a Kafka topic can have associated headers. s-Server lets you both read and write these headers.

Writing Headers to Kafka Based on Column Data

For Kafka10 Sinks, the KAFKA10 adapter supports an option called HEADERS_COLUMN_LIST. This is a comma-separated list of column names (case sensitive) that are turned into Kafka headers in the following form:

column_name1=column_value1,...,column_nameN=column_valueN

For example, the following stream specifies that headers will be written to Kafka using the columns prescribed and highway:

--this line creates a schema for use in the two examples that follow
CREATE OR REPLACE SCHEMA headers_example;

CREATE OR REPLACE FOREIGN STREAM headers_example.headers_egress
(
    "id" BIGINT,
    "reported_at" VARCHAR(32),
    "speed" INTEGER,
    "driver_no" BIGINT,
    "prescribed" BOOLEAN,
    "highway" VARCHAR(8)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "FORMATTER" 'JSON',
    "FORMATTER_INCLUDE_ROWTIME" 'false',
    "bootstrap.servers" 'localhost:9092',
    --option for writing headers
    "HEADERS_COLUMN_LIST" 'prescribed,highway',
    "TOPIC" 'headers_test'
);

Reading and Parsing Headers

Once you have written these headers to a Kafka message, you can read them by specifying a stream column called SQLSTREAM_PROV_KAFKA_HEADERS, defined as VARCHAR. (This column is known as a provenance column because it tells you something about the source of your data.)

Headers do not need to have been written by s-Server; the KAFKA10 adapter will read any headers present in a Kafka topic's messages. If there are no headers for the Kafka message, then the column SQLSTREAM_PROV_KAFKA_HEADERS returns null.

Limitations:

  • Header values are assumed to be 'UTF-8' encodings of String values
  • Keys & values in headers cannot contain '<double-quote>', '<newline>', '\='

When you specify this column and select it in a query, it returns headers as key-value pairs, in serialized text format. If a Kafka message has 2 headers ("key1", "value1") and ("key2", "value2"), the value of SQLSTREAM_PROV_KAFKA_HEADERS will be 'key1=value1<newline-character>key2=value2' (where <newline-character> is the actual newline character).

Each Kafka message may have an arbitrary number of headers (key-value pairs). You can use the Parser UDX to parse SQLSTREAM_PROV_KAFKA_HEADERS in a pipeline using PARSER = 'KV' as a setting for the UDX.

The following is an example of a KAFKA10 foreign stream with SQLSTREAM_PROV_KAFKA_HEADERS specified as a provenance column:

CREATE OR REPLACE FOREIGN STREAM headers_example.headers_ingest
(
    "id" BIGINT,
    "reported_at" TIMESTAMP,
    "prescribed" BOOLEAN,
    "highway" VARCHAR(8),
    "SQLSTREAM_PROV_KAFKA_HEADERS" VARCHAR(256)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
    "PARSER" 'JSON',
    "ROW_PATH" '$',
    "SEED_BROKERS" 'localhost:9092',
    "STARTING_TIME" 'LATEST',
    "isolation.level" 'read_uncommitted',
    "TOPIC" 'headers_test'
);

For a stream with headers written by s-Server as "HEADERS_COLUMN_LIST" 'prescribed,highway' (as we defined above), headers might look like the following. Note that you reference SQLSTREAM_PROV_KAFKA_HEADERS using quotation marks. Also note that headers are returned with a line break.

SELECT STREAM "id", "SQLSTREAM_PROV_KAFKA_HEADERS" FROM headers_example.headers_ingest;

'id','SQLSTREAM_PROV_KAFKA_HEADERS'
'50115871536','prescribed=false
highway=MR549'
'346866848365','prescribed=false
highway=MR620'
'50116198282','prescribed=false
highway=MR184'

Using the Key Value Parser to Parse Kafka Headers

To use the Key Value parser to parse the resulting Kafka headers, you can use code along the following lines.

First, create a view that reads headers:

CREATE OR REPLACE VIEW header_kv_options
( "SEPARATOR"
)
AS
SELECT * FROM (VALUES(u&'\000a'));   -- use newline character as the separator

Next, create a function that calls the Parser UDX:

CREATE OR REPLACE FUNCTION headerParser (
   input cursor,
   columnName varchar(256),
   parserType varchar(256),
   options cursor
  )
  --columns need to be declared with types that match
  --data to be parsed
  RETURNS TABLE(
     "id" BIGINT,               -- original columns
     "reported_at" TIMESTAMP,
     "prescribed" BOOLEAN,      --columns extracted from the headers
     "highway" VARCHAR(8)
)
  LANGUAGE JAVA
  PARAMETER STYLE SYSTEM DEFINED JAVA
  NO SQL
  EXTERNAL NAME 'class com.sqlstream.aspen.namespace.common.ParserUdx.parseColumn';

Finally, create a view that uses the function to parse header information:

CREATE OR REPLACE VIEW headers_exposed AS
SELECT STREAM * FROM STREAM(headerParser(
      --uses the foreign stream defined above as input for the function
      CURSOR(SELECT STREAM * FROM headers_example.headers_ingest),
      'SQLSTREAM_PROV_KAFKA_HEADERS',
      'KV',
      CURSOR(SELECT * FROM header_kv_options)));
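You can then query the view; the parsed header keys appear as typed columns alongside the original ones:

SELECT STREAM "id", "prescribed", "highway" FROM headers_exposed;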

Using the Kafka Adapter to Process Partitioned Streams

This topic describes an example of setting up SQL to process a pipeline handling one partition. For each partition you can have N pipelines (often on separate instances of s-Server) listening to that partition, where N is your redundancy level. It assumes two Kafka topics have been set up:

  • TransactionData. This takes something like credit card transactions. It should be partitioned with some kind of round robin scheme.
  • AggregatedData. This will be used to communicate between the aggregation pipeline servers and the rollup server.

Each pipeline will:

  1. Read from the partition of a topic.
  2. Parse incoming data into columns using the ECDA CSV Parser.
  3. Look up the zipcode for the transaction's recipient in the address table.
  4. Aggregate transaction amounts and counts by zipcode, per second.

In order to ensure all pipelines for the same partition output the same data, the code discards data for the first second's aggregation. This lets you restart an instance of s-Server running pipelines at any time without affecting results.

Results are written to the AggregatedData topic. One or more instances of s-Server will then read that AggregatedData topic, discarding duplicate rows. Aggregates are then rolled up and written to a stream.

CREATE OR REPLACE SCHEMA "kafkaAggregation";
SET SCHEMA '"kafkaAggregation"';
SET PATH '"kafkaAggregation"';

CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA10'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "KafkaPartitionedInputStream"
(KAFKA_OFFSET BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"cardNumber" BIGINT NOT NULL,
"zipcode" CHAR(10) NOT NULL,
"transactionAmount" DOUBLE NOT NULL,
"recipientId" BIGINT NOT NULL,
"transactionId" BIGINT NOT NULL)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'TransactionData',
"PARTITION" '1',
"SEED_BROKERS" 'localhost',
"PORT" '9092',
"STARTING_TIME" 'latest',
parser 'CSV',
character_encoding 'UTF-8'
);

CREATE OR REPLACE FOREIGN STREAM "KafkaOutputStream"
(
"ts" TIMESTAMP NOT NULL,
"partition" INT NOT NULL,
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'AggregatedData',
"metadata.broker.list" 'localhost:9092',
parser 'CSV',
character_encoding 'UTF-8');

-- Source columns we're interested in
CREATE OR REPLACE VIEW "sourceData" AS
SELECT STREAM "ts" AS ROWTIME, "zipcode", "transactionAmount"
FROM "KafkaPartitionedInputStream";

CREATE OR REPLACE FUNCTION "getZipcode"(
​     inputRows CURSOR,
​        dsName VARCHAR(64),
​     tableName VARCHAR(128),
​       colName VARCHAR(128),
​     cacheSize INTEGER,
  prefetchRows BOOLEAN,
  fuzzyLookups BOOLEAN)
  RETURNS TABLE(
​       inputRows.*,
​       "zipcode" CHAR(5))
  LANGUAGE JAVA
  PARAMETER STYLE SYSTEM DEFINED JAVA
  NO SQL
  EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';

-- adorn with zipcode using in memory lookup if possible
CREATE OR REPLACE VIEW "transactionAndZipcode" AS
SELECT STREAM
FROM TABLE("getZipcode(CURSOR(SELECT STREAM * FROM "sourceData"), 'customerDb', 'address', 'recipientId', 10000000, true, false));

CREATE OR REPLACE VIEW "aggregatedData" AS
SELECT STREAM "zipcode", SUM("transactionAmount") AS "transactionTotal", COUNT(*) AS "transactionCount"
FROM "sourceData"
GROUP BY FLOOR((("sourceData".ROWTIME - timestamp '1970-01-01 00:00:00') second)/10 to second), "zipcode";

-- Creates output pump
-- Does not output first group of rows (all rows in group will have same rowtime)
-- as this group may be partial if restarting after failure.

CREATE OR REPLACE PUMP "aggregatedDataOutputPump" STOPPED AS
INSERT INTO "kafkaAgg1a"."KafkaOutputStream"
SELECT STREAM ROWTIME AS "ts", 1 AS "partition", "zipcode", "transactionTotal", "transactionCount"
FROM (SELECT STREAM *, a.ROWTIME as thistime, FIRST_VALUE(a.ROWTIME) OVER (ROWS UNBOUNDED PRECEDING) as firsttime from "kafkaAgg1a"."aggregatedData"a) b
where firsttime <> thistime;

ALTER PUMP "aggregatedDataOutputPump" START;

Using the Options Query

You can use the Options Query to read from a configuration table. You can then use this table to update adapter options at SQL start time. Usually, this will be when you start application pumps.

You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset.

You can create a configuration table (or view) and read from it using the OPTIONS_QUERY option (not property).

The Options Query must return exactly one row. The column names supply the option names, and the row's values supply the corresponding option values.

If you set the OPTIONS_QUERY property to

select * from conf

and that query returns 1 row with 1 column called TOPIC containing the value "topic1", then the adapter is configured with the TOPIC property set to "topic1". Each time the adapter runs, its configuration is dynamically computed from the conf table. You can also use views for the OPTIONS_QUERY.

CREATE OR REPLACE FOREIGN STREAM testOut (
KAFKA_OFFSET BIGINT NOT NULL,
line VARCHAR(4096))
SERVER KAFKASERVER
OPTIONS (TOPIC 'testThroughput', OPTIONS_QUERY 'select lastOffset as STARTING_OFFSET from TEST.committedOffset');
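For instance, a sketch (assuming s-Server native tables; the schema and column names match the OPTIONS_QUERY above) of the table backing that query:

CREATE OR REPLACE SCHEMA TEST;

CREATE TABLE TEST.committedOffset (lastOffset BIGINT);

-- Seed the stored offset; the next time the foreign stream's query starts,
-- the OPTIONS_QUERY picks this value up as STARTING_OFFSET.
INSERT INTO TEST.committedOffset VALUES (0);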

OPTIONS_QUERY is supported for all sources, sinks, formatters, and parsers.

Using the Kafka Adapter for Fault Tolerance

To ensure fault tolerance, you can set up multiple instances of s-Server to listen to each partition, using a program like Puppet to start servers and pipelines. You can run more than one pipeline on each instance of s-Server, but you cannot have multiple pipelines on the same server listening to the same partition.

The diagram below shows multiple instances of s-Server listening to Kafka partitions. This prevents data loss if an s-Server instance goes down.

Adding and Removing Processing Nodes for Kafka

After aggregating data in multiple instances of s-Server, you can create one or more s-Server pumps in order to write to Kafka topics, and add and remove these pumps using the ALTER PUMP command.

Implementing the Kafka Adapter to Pump Data to Kafka

Every time you use an adapter, you need to implement it within an s-Server schema. The following code first creates a schema and implements the Kafka ECDA adapter.

CREATE OR REPLACE SCHEMA "kafkaAggregation";
SET SCHEMA '"kafkaAggregation"';
SET PATH '"kafkaAggregation"';

CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA'
FOREIGN DATA WRAPPER ECDA;

CREATE OR REPLACE FOREIGN STREAM "KafkaAggregatedData"
("offset" BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"PARTITION" INT NOT NULL,
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DECIMAL(18,2) NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"

OPTIONS
(topic 'AggregatedData',
seed_brokers 'localhost',
starting_time 'latest',
parser 'CSV',
character_encoding 'UTF-8'
);

Setting up the Pump

The code below creates a stream with three columns, zipcode, transactionTotal, and transactionCount. This stream will receive the rolled-up data pumped from the Kafka topic.

CREATE OR REPLACE STREAM "AggregatedData"(
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DECIMAL(18,2) NOT NULL,
"transactionCount" INT NOT NULL);

The next code block creates a view on the foreign stream KafkaAggregatedData that promotes "ts" to ROWTIME, time-sorts rows within a two-second interval, and selects the columns PARTITION, zipcode, transactionTotal, and transactionCount.

CREATE OR REPLACE VIEW "AggregatedDataWithRowTime" AS
SELECT STREAM "ts" AS ROWTIME, "PARTITION", "zipcode", "transactionTotal", "transactionCount"
FROM "KafkaAggregatedData"
ORDER BY "ts" WITHIN INTERVAL '2' SECOND;

The next code block creates a view, AggregatedDataDeDuped, that uses a windowed COUNT and a WHERE clause to identify and discard duplicate rows; the pump defined below reads from this view.

CREATE OR REPLACE VIEW "AggregatedDataDeDuped" AS
SELECT STREAM "PARTITION", "zipcode", "transactionTotal", "transactionCount"
FROM (SELECT STREAM *,
COUNT(*) OVER (PARTITION BY "PARTITION", "zipcode" RANGE INTERVAL '0' SECOND PRECEDING) AS c
FROM "AggregatedDataWithRowTime") AS dd
WHERE c = 1;

The next code block creates a pump that, for each ROWTIME and zipcode, inserts the zipcode together with the sums of transactionTotal and transactionCount into the AggregatedData stream.

CREATE OR REPLACE PUMP "rollupPump" STOPPED AS
INSERT INTO "AggregatedData"
SELECT STREAM "zipcode", sum("transactionTotal"), sum("transactionCount")
FROM "AggregatedDataDeDuped"
GROUP BY "AggregatedDataDeDuped".ROWTIME, "zipcode";

Starting and Stopping the Pump

You can then start and stop the pump using the ALTER PUMP command:

ALTER PUMP "rollupPump" START;
ALTER PUMP "rollupPump" STOP;

See the topic ALTER PUMP in the SQLstream Streaming SQL Reference Guide for more details on starting and stopping pumps.

Using the TableLookup UDX to Prefetch a Partitioned Part of a Kafka Topic

Once you have read in data from Kafka and created a view of this data, as in Using the Kafka Adapter for Fault Tolerance above, you can use the TableLookup UDX to prefetch a partitioned portion of a database table to form a preloaded cache. The code below creates a lookup function for zip codes.

CREATE OR REPLACE FUNCTION "getZipcode"(
  inputRows CURSOR,
    dsName VARCHAR(64),
  tableName VARCHAR(128),
   colName VARCHAR(128),
  cacheSize INTEGER,
 prefetchRows BOOLEAN,
 fuzzyLookups BOOLEAN)
 RETURNS TABLE(
   inputRows.*,
   "zipcode" CHAR(5))
 LANGUAGE JAVA
 PARAMETER STYLE SYSTEM DEFINED JAVA
 NO SQL
 EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';

You can then use the function to prefetch zip codes from a partitioned portion of a database:

CREATE OR REPLACE VIEW "transactionAndZipcode" AS
SELECT STREAM *
FROM TABLE("getZipcode"(CURSOR(SELECT STREAM * FROM "sourceData"), 'customerDb', 'address', 'recipientId', 10000000, true, false));

Storing Watermarks in Kafka for Exactly Once

The source partition need not be a Kafka partition - it can be any kind of partitioning supported by a Kafka or non-Kafka plugin (such as File-VFS).

Please see Scaling in File-VFS for more information about partitioning in the File-VFS plugin and also see Using Exactly Once with VFS or S3 as a Source for a full example.

For Kafka to Kafka, see Using Exactly Once with Kafka as a Source.

If the source plugin is File-VFS, the Kafka plugin supports storing metadata in the following ways:

  1. For a single File-VFS source partition: When using a Kafka sink with a single-partition source, set INGRESS_NUM_PARTITIONS to 1 and INGRESS_ASSIGNED_PARTITIONS to ':'.
  2. For multiple source partitions: To support horizontal scaling, you can create multiple source partitions and distribute these input partitions over multiple instances. Read about Scaling in File-VFS for more details.

Kafka stores watermarks as pairs of the form sourcePartitionId, sourcePartitionWatermark - see the glossary term shard. The File-VFS source foreign stream could be defined as in the following cases:

  1. Working with a single source partition and an empty shard id:

    SHARD_ID ''
    INGRESS_ASSIGNED_PARTITIONS ':'
    INGRESS_NUM_PARTITIONS 1
  2. Working with 2 source partitions and a non-empty shard id, with exactly one partition assigned to each shard:

    SHARD_ID '0'
    INGRESS_ASSIGNED_PARTITIONS '0'
    INGRESS_NUM_PARTITIONS 2
    SHARD_ID '1'
    INGRESS_ASSIGNED_PARTITIONS '1'
    INGRESS_NUM_PARTITIONS 2
  3. Working with 4 source partitions and a non-empty shard id, with multiple partitions assigned to each shard:

    SHARD_ID '0'
    INGRESS_ASSIGNED_PARTITIONS '0,1'
    INGRESS_NUM_PARTITIONS 4
    SHARD_ID '1'
    INGRESS_ASSIGNED_PARTITIONS '2,3'
    INGRESS_NUM_PARTITIONS 4

The Kafka sink foreign stream stores metadata in the output sink topic as pairs of the form partitionId, watermark.

For case 1 above, when shard id is empty and there is only a single partition, the metadata would be stored as:

0,watermark_for_0_source_partition

If the shard id is not empty, it is prefixed to the metadata partitionId, separated by an underscore ('_'); the metadata then takes the form shardId_partitionIdAssignedToThisShard, watermark. For case 2 above, when the shard id is not empty and there are multiple source partitions but only one partition is assigned to each shard, the metadata would be stored as:

0_0,watermark_for_0_source_partition
1_1,watermark_for_1_source_partition

For case 3 above, when shard id is not empty and there are multiple source partitions assigned to each shard, the metadata would be stored as:

0_0,watermark_for_0_source_partition
0_1,watermark_for_1_source_partition
1_2,watermark_for_2_source_partition
1_3,watermark_for_3_source_partition

A user can fetch the metadata using any of the available Kafka Watermark Functions.

Kafka Watermark Functions

The Kafka10 plugin supplies the functions below to retrieve source watermark information that has been stored in Kafka.

Please see Storing Watermarks in Kafka for Exactly Once for more information.

For each watermark type there are two functions. The name of the second ends with _with_config_file; it works with secure Kafka by accepting an additional parameter, the name of a Kafka (SSL) properties file.

sys_boot.mgmt.watermark_offsets

This function returns watermark offsets to be used during replay/recovery from the source partitions of a pipeline. The topic passed as a parameter, however, is the name of the sink topic. As transactions are committed to the sink, the commit metadata saves the commit timestamp and, optionally, the offset for the source topic. This function returns these offsets for each partition of the topic.

Note: The consumer_group, for now, should be '_watermark'.

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_offsets(
  bootstrap_servers VARCHAR(128),
  sink_topic VARCHAR(128),
  consumer_group VARCHAR(128)
)
RETURNS TABLE (
  "TOPIC" VARCHAR(128),
  "PARTITION" INTEGER,
  "OFFSET" BIGINT
)

LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkOffsetsFromIndexTopic';
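
A hedged usage sketch follows (the broker address and sink topic name are placeholders, not values taken from this guide):

-- Fetch the per-source-partition offsets recorded against a sink topic.
SELECT * FROM TABLE(sys_boot.mgmt.watermark_offsets(
  'localhost:9092',   -- bootstrap_servers: assumed broker address
  'mySinkTopic',      -- sink_topic: hypothetical sink topic name
  '_watermark'        -- consumer_group, as noted above
));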

sys_boot.mgmt.watermark_offsets_from_index_topic_with_config_file

Same as watermark_offsets, but with added support for secured Kafka.

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_offsets_from_index_topic_with_config_file(
    bootstrap_servers VARCHAR(1024),
    topic VARCHAR(128),
    consumer_group VARCHAR(128),
    config_file_path VARCHAR(128)
)
RETURNS TABLE (
    "KAFKA_TOPIC" VARCHAR(128),
    "KAFKA_PARTITION" INTEGER,
    "KAFKA_OFFSET" BIGINT
)
...
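
As with watermark_offsets, a hedged usage sketch (all argument values are placeholders; the properties file path is hypothetical):

-- Fetch per-partition offsets when the brokers require SSL.
SELECT * FROM TABLE(sys_boot.mgmt.watermark_offsets_from_index_topic_with_config_file(
  'localhost:9092',                  -- bootstrap_servers: assumed broker address
  'mySinkTopic',                     -- topic: hypothetical sink topic name
  '_watermark',                      -- consumer_group, as noted for watermark_offsets
  '/path/to/kafka-ssl.properties'    -- config_file_path: hypothetical properties file
));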

sys_boot.mgmt.watermark_timestamp

This function returns a timestamp value, which is the maximum timestamp committed across all partitions in the topic for the input consumer group. Typically, this would be the ROWTIME of the last row committed in the transaction.

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_timestamp(
    bootstrap_servers VARCHAR(1024),
    topic VARCHAR(128),
    consumer_group_prefix VARCHAR(128),
    partition_id_range VARCHAR(128),
    max_num_partitions INTEGER
)
RETURNS TIMESTAMP

LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkTimestamp';
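
A hedged usage sketch (the broker address and topic name are placeholders; the partition_id_range value mirrors the INGRESS_ASSIGNED_PARTITIONS examples above and is an assumption):

-- Retrieve the maximum committed timestamp for a single-partition pipeline with an empty shard id.
SELECT * FROM (VALUES(sys_boot.mgmt.watermark_timestamp(
  'localhost:9092',   -- bootstrap_servers: assumed broker address
  'mySinkTopic',      -- topic: hypothetical sink topic name
  '',                 -- consumer_group_prefix: empty because SHARD_ID is empty
  ':',                -- partition_id_range: assumed to follow the INGRESS_ASSIGNED_PARTITIONS syntax
  1                   -- max_num_partitions
))) AS t(LAST_COMMITTED_TS);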

For examples of using this function please see:

sys_boot.mgmt.watermark_timestamp_with_config_file

Same as watermark_timestamp, with added support for secured Kafka.

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_timestamp_with_config_file(
    bootstrap_servers VARCHAR(1024),
    topic VARCHAR(128),
    consumer_group_prefix VARCHAR(128),
    partition_id_range VARCHAR(128),
    max_num_partitions INTEGER,
    config_file_path VARCHAR(128)
)
RETURNS TIMESTAMP
...

sys_boot.mgmt.watermark_string

This function returns a watermark string to be used during replay/recovery from the source partitions of a pipeline. The topic passed as a parameter, however, is the name of the sink topic. As data is committed to the sink Kafka topic, the plugin saves the watermark for the source foreign stream. This function returns those watermarks, which can be used in the source to ensure exactly-once semantics. The result is a single string delimited by newline ('\n') characters; each line has the form consumer_group, watermarkString. For sample scenarios, see Storing Watermarks in Kafka for Exactly Once above.

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_string(
    bootstrap_servers VARCHAR(1024),
    topic VARCHAR(128),
    consumer_group_prefix VARCHAR(128),
    partition_id_range VARCHAR(128),
    max_num_partitions INTEGER
)
RETURNS VARCHAR(512)
...

For example, if 2 shards are running on your cluster:

SHARD_ID '0'
INGRESS_ASSIGNED_PARTITIONS '0,1'
INGRESS_NUM_PARTITIONS 4

SHARD_ID '1'
INGRESS_ASSIGNED_PARTITIONS '2,3'
INGRESS_NUM_PARTITIONS 4

The metadata returned from invoking watermark_string would be:

0_0,watermark_for_0_source_partition
0_1,watermark_for_1_source_partition
1_2,watermark_for_2_source_partition
1_3,watermark_for_3_source_partition

Here is sample usage for invoking this UDX:

CREATE OR REPLACE VIEW "sampleschema"."START_POSITION_QUERY" AS
SELECT * FROM (VALUES(sys_boot.mgmt.watermark_string('bootstrap_servers', 'topic_name', 'consumer_group_prefix', 'assigned_partitions', total_partitions))) AS options(STARTING_POSITION);

In the above query, if the shard id is not empty, set consumer_group_prefix to the shard id followed by an underscore - that is, "{{shard_id}}_". If the shard id is empty, set consumer_group_prefix to the empty string ('') when running the above queries.

For a bigger example see Reading from File-VFS in the Exactly Once topic in this guide.

sys_boot.mgmt.watermark_string_with_config_file

Same as watermark_string, with added support for secured kafka.

CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_string_with_config_file(
     bootstrap_servers VARCHAR(1024),
     topic VARCHAR(128),
     consumer_group_prefix VARCHAR(128),
     partition_id_range VARCHAR(128),
     max_num_partitions INTEGER,
     config_file_path VARCHAR(128)
)
RETURNS VARCHAR(512)
...

Here is sample usage for invoking the UDX when working with secure Kafka:

CREATE OR REPLACE VIEW "sampleschema"."START_POSITION_QUERY" AS
SELECT * FROM (VALUES(sys_boot.mgmt.watermark_string_with_config_file('bootstrap_servers', 'topic_name', 'consumer_group_prefix', 'assigned_partitions', total_partitions, 'config_file_path'))) AS options(STARTING_POSITION);

sys_boot.mgmt.pump_mode

This function returns the HA mode (Leader/Follower) of a pump.

CREATE OR REPLACE FUNCTION sys_boot.mgmt.pump_mode(
  pump_name VARCHAR(128)
)
RETURNS VARCHAR(256)

LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getMode';
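
A hedged usage sketch (the pump name, and the exact name format the function expects, are assumptions based on the pumps defined earlier in this topic):

-- Check whether a pump is currently running as Leader or Follower.
SELECT * FROM (VALUES(sys_boot.mgmt.pump_mode('"kafkaAggregation"."rollupPump"'))) AS t(PUMP_MODE);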