Using s-Server, you can read from and write to Kafka. To read and write from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. To read and write from remote locations, you configure such options using a properties file and launch the Extensible Common Data agent at the command line.
Using Kafka, you can achieve exactly-once semantics. For more detail, see the topic on exactly-once semantics in this guide.
This topic contains the following subtopics:
s-Server's Kafka adapter instantiates a Kafka consumer, configured using either SQL or a properties file. To read from local locations, you configure and launch the adapter in SQL, using either server or foreign stream/table options. Data can be in CSV, Avro, XML, JSON, or BSON, or other formats. You specify a parser as part of configuration options. See Reading from Kafka Using SQL below. To read from remote locations, you configure such options using a properties file and launch the agent at the command line. See Reading from Kafka Using the ECD Agent below. See the overview at Reading from Other Sources for more details.
The Kafka_10 adapter is based on the Kafka client API v0.10.0.X and leverages features available on Kafka brokers up to v0.10.0.X. This API works with Kafka v1.0 and with the Confluent distribution from at least v3.3 to v4.0.
As of Kafka 0.10, every Kafka message has a timestamp field, specifying the time when the message was produced. The new adapter uses this feature to let you start processing at a given timestamp. Basically, the adapter identifies all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME.
Once you set up a foreign stream for Kafka in s-Server, each query on the stream reads from a Kafka topic via a Kafka consumer. If multiple queries are run on the same stream, and the Kafka topic has partitions, Kafka will distribute data among these queries according to partition.
In many cases, you may need to aggregate multiple queries in order to achieve the right results. You can also specify a partition in foreign stream options. See Using the Kafka Adapter to Process Partitioned Streams below.
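For illustration, here is a minimal sketch of a foreign stream pinned to a subset of a topic's partitions using the PARTITION option; the stream and topic names are placeholders:

```sql
-- Sketch: read only partitions 0 through 2 of "mytopic".
-- Partition numbers are 0-based; a range such as '0-2' is inclusive.
CREATE OR REPLACE FOREIGN STREAM KafkaSchema.PartitionedReader
("id" BIGINT,
 "speed" INTEGER)
SERVER KAFKA10_SERVER
OPTIONS
(TOPIC 'mytopic',
 SEED_BROKERS 'localhost:9092',
 PARTITION '0-2',
 PARSER 'JSON');
```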
The s-Server trace log includes information on readers' and parsers' progress. See Periodic Parser Statistics Logging in the Administering Guavus SQLstream guide. These errors are also logged in the Global Error Stream.
To read from Kafka, you need to create a foreign stream in SQL that references a prebuilt server object called KAFKA_SERVER. The foreign stream's definition contains connection information for the Kafka server. In Kafka's case, you can also define provenance columns that let you pass metadata on the Kafka source into s-Server. See Using Provenance Columns below. For more details on creating a foreign stream, see the CREATE FOREIGN STREAM topic in the SQLstream Streaming SQL Reference Guide.
You will also need to specify a parser for the foreign stream, such as 'CSV'. Specifying "parser" as a foreign stream option tells s-Server that this foreign stream reads data. See Parsers for Reading in this guide for more details.
Streams, like most SQL objects (but unlike data wrappers and servers), should be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named KafkaReaderStream.
CREATE OR REPLACE SCHEMA KafkaSchema;
SET SCHEMA 'KafkaSchema';
CREATE OR REPLACE FOREIGN STREAM KafkaReaderStream
("id" BIGINT,
"reported_at" TIMESTAMP,
"lat" DOUBLE,
"lon" DOUBLE,
"speed" INTEGER,
"bearing" INTEGER,
"driver_no" BIGINT)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'mytopic',
"SEED_BROKERS" 'localhost:9092',
"STARTING_TIME" 'LATEST',
"PARSER" 'JSON',
"ROW_PATH" '$',
character_encoding 'UTF-8'
)
;
As with all ECDA adapters, data is only moved from Kafka to s-Server once you execute a SELECT statement on the foreign stream that you have defined for the adapter. (Often, you will define a SELECT statement on the foreign stream as part of a pump, in which case data moves once you start the pump.) For example, the following code block copies the values of the columns "speed" and "driver_no" into s-Server using the foreign stream defined above, KafkaReaderStream.
SELECT STREAM "speed", "driver_no" FROM KafkaSchema.KafkaReaderStream;
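As noted above, reading is often driven by a pump. The following sketch (the sink stream and pump names are illustrative) moves the same two columns into a native stream continuously:

```sql
-- Sketch: a native sink stream and a pump that reads from the foreign stream.
CREATE OR REPLACE STREAM KafkaSchema.SpeedSink
("speed" INTEGER,
 "driver_no" BIGINT);

CREATE OR REPLACE PUMP KafkaSchema.kafka_read_pump STOPPED AS
INSERT INTO KafkaSchema.SpeedSink
SELECT STREAM "speed", "driver_no" FROM KafkaSchema.KafkaReaderStream;

-- Data begins to flow from Kafka once the pump is started.
ALTER PUMP KafkaSchema.kafka_read_pump START;
```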
In reading from Kafka, you can declare provenance columns. These return metadata for the Kafka topic from which you are reading.
These are as follows:
Data Type | Name in s-Server 6.0.0 | Name in s-Server 6.0.1+ | Kafka version | Description |
---|---|---|---|---|
VARCHAR(256) | KAFKA_TOPIC | SQLSTREAM_PROV_KAFKA_TOPIC | Kafka v.0.10.2 or later | Returns name of Kafka topic. |
INTEGER | KAFKA_PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v.0.10.2 or later | Returns value of current Kafka partition. |
INTEGER | PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v.0.8.2 | Returns value of current Kafka partition. |
BIGINT | KAFKA_OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v.0.10.2 or later | Returns value of current Kafka offset. |
BIGINT | OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v.0.8.2 | Returns value of current Kafka offset. |
TIMESTAMP | KAFKA_TIMESTAMP | SQLSTREAM_PROV_KAFKA_TIMESTAMP | Kafka v.0.10.2 or later | Returns the Kafka message timestamp. |
VARBINARY(256) | KAFKA_KEY | SQLSTREAM_PROV_KAFKA_KEY | Kafka v.0.10.2 or later | Returns key value for message. |
BINARY(32) | PAYLOAD_PREFIX | SQLSTREAM_PROV_KAFKA_PAYLOAD_PREFIX | Kafka v.0.10.2 or later | Returns the first 32 bytes of the message payload. |
VARCHAR(1000) | N/A | SQLSTREAM_PROV_KAFKA_HEADERS | Kafka v.0.10.2 or later | Returns any headers for Kafka message topics as key-value pairs in serialized text format. See Reading and Parsing Headers. |
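For example, a foreign stream can mix payload columns with provenance columns. The sketch below uses the s-Server 6.0.1+ column names; the stream and topic names are illustrative:

```sql
-- Sketch: payload columns plus Kafka provenance metadata.
CREATE OR REPLACE FOREIGN STREAM KafkaSchema.KafkaProvenanceStream
("id" BIGINT,
 "speed" INTEGER,
 "SQLSTREAM_PROV_KAFKA_PARTITION" INTEGER,
 "SQLSTREAM_PROV_KAFKA_OFFSET" BIGINT,
 "SQLSTREAM_PROV_KAFKA_TIMESTAMP" TIMESTAMP)
SERVER KAFKA10_SERVER
OPTIONS
(TOPIC 'mytopic',
 SEED_BROKERS 'localhost:9092',
 "PARSER" 'JSON',
 "ROW_PATH" '$');
```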
Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#consumerconfigs. Where appropriate, information in this table is drawn from this page.
Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.
Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':
Option | Adapter version | Description |
---|---|---|
TOPIC | All | Required. Kafka Topic. You can use a regular expression as a "topic wild card". (This is not supported by legacy versions of the adapter.) |
SEED_BROKERS | All | A comma-separated list of broker identifiers in the format <broker_hostname>:<port>. For the legacy adapter, this is a comma-separated list of broker hosts. Defaults to "localhost". |
PARTITION | All | Partition number to read from. If you are reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions will be read. You can specify a single partition with PARTITION, or a range such as 1-3 (a range takes two partition numbers and is inclusive). Note: partition numbers are 0-based. |
PORT | Legacy | Deprecated since 6.0.1. Port for Kafka seed brokers. |
STARTING_TIME | All | Either EARLIEST, LATEST, or a timestamp in the format 'yyyy-MM-dd HH:mm:ss.SSS', such as '2018-02-01 22:23:45.892'. Default LATEST. When STARTING_TIME is a timestamp, the Kafka adapter "seeks" to offsets within all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME. Requires Kafka v0.10.2 or later. For the legacy Kafka adapter, the options are EARLIEST or LATEST. |
STARTING_OFFSET | All | Where to start reading from (default is -1, the oldest offset) as a long int representing the physical offset within a partition |
INDEX_TOPIC_NAME | Kafka10 | This option specifies the name of the index topic to be used for mapping message offsets to timestamps. For more details, refer to Building and Using an Index Topic in the Reading from Kafka topic of the s-Server Integration Guide. Index topic may be created on a separate Kafka cluster. |
INDEX_TOPIC_SEED_BROKERS | Kafka10 | The broker(s) where the index topic referenced by INDEX_TOPIC_NAME is managed |
MAX_POLL_RECORDS | Kafka10 | Maximum number of records to be polled (fetched) through the new KafkaConsumer.poll() API call. For best throughput during replay, this needs to be set such that you get a "page" full (1MB is the default) of kafka messages from each partition of the topic(s). It can be roughly calculated as: (numPartitions * 1 MB) / typicalMessageSize |
PARTITION_OFFSET_QUERY | Kafka10 | A SQL query that fetches starting offsets, Kafka partition and Kafka topic for all topic partitions. For both the legacy Kafka adapter and the Kafka10 adapter, you can use a query along the following lines: PARTITION_OFFSET_QUERY 'SELECT "KAFKA_PARTITION","KAFKA_TOPIC","KAFKA_OFFSET" FROM stored_offsets'. PARTITION should be of type INTEGER. OFFSET should be of type BIGINT. Any partition for which the query does not return anything will use either STARTING_OFFSET or STARTING_TIME to determine where to start. |
CLIENT_ID | All | For the Kafka10 adapter, it is vital to understand that this is the consumer group id (it is used to set the consumer group property "group.id"). For the legacy adapter, CLIENT_ID is the client key for Yammer metrics; it defaults to client{TOPIC}, or client{TOPIC}{PARTITION} if PARTITION is specified. CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting; CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true. See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics. |
OPTIONS_QUERY | All | Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in select lastOffset as STARTING_OFFSET from TEST.committedOffset; |
SCHEMA_ID | All | If set >= 0, the reader will treat the first byte of data as magic byte, and the next 4 bytes as a schema ID, and will discard any record where the first byte does not match the SCHEMA_ID. Default -1. |
PARSER_PAYLOAD_OFFSET | Kafka10 | The number of bytes to skip at the start of the record to get the value - in case the data is offset (by a schema ID or some other fixed-length data). Default 0. |
"kafka.consumer.config" | Kafka10 | Lets you specify the name of a properties file that contains a set of Kafka consumer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the consumer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#newconsumerconfigs Note: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "SEED_BROKERS". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime. |
"isolation.level" | Kafka10 | Lets you specify whether s-Server should read all Kafka messages or only committed messages. Options are read_uncommitted, or read_committed This option lets you account for transactions, a Kafka 0.11.0 feature whereby applications can write to multiple topics and partitions atomically. To use atomic commitments, you need to configure the Kafka adapter to only read committed data--this is, read_committed. |
METRICS_PER_PARTITION | Legacy | True or False. If METRICS_PER_PARTITION is false, then CLIENT_ID will be used as the client key for all Yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number will be appended to each client_id (and finer-grained metrics will be reported). |
FETCH_SIZE | Legacy | Fetch size. Defaults to 1000000. |
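To illustrate PARTITION_OFFSET_QUERY, the following sketch resumes reading from offsets stored in a hypothetical table named stored_offsets; all names are placeholders:

```sql
-- Sketch: a table of last-committed offsets, one row per topic partition.
CREATE TABLE KafkaSchema.stored_offsets
("KAFKA_TOPIC" VARCHAR(256),
 "KAFKA_PARTITION" INTEGER,
 "KAFKA_OFFSET" BIGINT);

-- Partitions absent from the query result fall back to
-- STARTING_OFFSET or STARTING_TIME.
CREATE OR REPLACE FOREIGN STREAM KafkaSchema.ResumingReader
("id" BIGINT)
SERVER KAFKA10_SERVER
OPTIONS
(TOPIC 'mytopic',
 SEED_BROKERS 'localhost:9092',
 PARSER 'JSON',
 PARTITION_OFFSET_QUERY
   'SELECT "KAFKA_PARTITION","KAFKA_TOPIC","KAFKA_OFFSET" FROM KafkaSchema.stored_offsets');
```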
You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so that the passwords are not visible in the SQL.
The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.
We therefore recommend that you supply any options related to access credentials, such as those for SSL or Kerberos, in a .properties file. The relevant options are as follows:
Option Name | Adapter version | Description |
---|---|---|
"ssl.truststore.location" | Kafka10 | The location of the trust store file. |
"ssl.truststore.password" | Kafka10 | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. |
"ssl.keystore.location" | Kafka10 | The location of the key store file. This is optional for client and can be used for two-way authentication for client. |
"ssl.keystore.password" | Kafka10 | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. |
"ssl.key.password" | Kafka10 | The password of the private key in the key store file. This is optional for client. |
On the Kafka side, the KafkaConsumer API uses the property group.id as the consumer group id to support shared subscriptions among multiple consumers. Kafka uses group.id to identify consumers belonging to the same consumer group. Each group's consumers divide the topic partitions among themselves, such that each partition is only consumed by a single consumer.
All consumer instances with the same value for group.id share a single logical Kafka subscription. That is, each consumer instance reads messages from a subset of partitions of subscribed topic(s).
In order to have s-Server read from the same logical Kafka subscription, you need to configure how s-Server handles group.id. In s-Server 6.0.0, s-Server uses the value for the option CLIENT_ID in a foreign stream as the value for group.id for each KafkaConsumer instance.
As of s-Server v6.0.1, s-Server no longer uses CLIENT_ID as group.id for the Kafka consumer API.
Instead, s-Server determines the value for group.id in the following way:
Since group.id is set to sessionID at runtime, queries running on the foreign stream will be considered to be a distributed query instance if they have the same sessionID.
When you read from Kafka using a pump, s-Server assigns group.id based on the name of the pump and the name of the schema in which the pump is running (<fully_qualified_pump_name>). For example, for a pump called "myschema"."my_kafka_source_pump", s-Server sets sessionID = 'LOCALDB.myschema.my_kafka_source_pump_Pump'.
When using pumps, in order to distribute data across queries running on multiple instances of s-Server, you need to create schemas with the same names (such as "myschema") that contain pumps with the same names (such as "my_kafka_source_pump").
If you create the schema myschema on each node in a federation of s-Servers, the pump, my_kafka_source_pump, starting on each of those nodes will get the same sessionID when the pump is started (at runtime) and each one of those pumps will be processing data from a subset of topic partitions.
You can append numbers to pump names. These will be chopped off at run time. For example, pumps with names such as "my_kafka_source_pump1", "my_kafka_source_pump2", ..., "my_kafka_source_pumpN" will generate the same group.id. By using trailing numbers, you can create distributed pipelines on a single node and a single schema. In other words, you can run multiple pumps in the same schema with appended numbers, and these pumps will be assigned the same sessionID. (This is how StreamLab currently handles pumps.)
If the query is not running as part of a pump, then s-Server assigns a unique sessionID at runtime. That is, queries running through sqllineClient and WebAgent are considered independent queries. These create and use their own group.id every time a query is submitted.
You can also explicitly set group.id using the foreign stream option group.id. This ensures that all foreign stream queries (which are processed as Kafka subscriptions) against all topics are synchronized. This ensures, for example, that stream-stream joins advance all streams' clocks in a synchronized manner.
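For example, the following sketch pins the consumer group explicitly (the stream, topic, and group names are placeholders), so every query against this stream joins the same logical subscription:

```sql
-- Sketch: all queries on this stream share one Kafka consumer group.
CREATE OR REPLACE FOREIGN STREAM KafkaSchema.SharedSubscription
("id" BIGINT)
SERVER KAFKA10_SERVER
OPTIONS
(TOPIC 'mytopic',
 SEED_BROKERS 'localhost:9092',
 PARSER 'JSON',
 "group.id" 'my_shared_group');
```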
You can use the ECD agent to read data from remote locations. See Reading Data from Remote Locations for more details.
The ECD agent takes options similar to the ones you specify in SQL, but these options need to be formatted in a properties file along the lines of the following.
TOPIC=mytopic
SEED_BROKERS=localhost:9092
STARTING_TIME=LATEST
MAX_POLL_RECORDS=2MB
PARSER=JSON
ROW_PATH=$
CHARACTER_ENCODING=utf-8
SCHEMA_NAME="KafkaSource"
TABLE_NAME="KafkaSourceStream"
ROWTYPE=RECORDTYPE(BIGINT NOT NULL kafka_offset, TIMESTAMP NOT NULL ts, INT NOT NULL KAFKA_PARTITION, CHAR(10) NOT NULL zipcode, DOUBLE NOT NULL transactionTotal, INT NOT NULL transactionCount)
The Kafka ECDA adapter writes batches of data to a Kafka topic. In order to write to a Kafka topic, you must first define a server object for the Kafka server with information on its seed broker(s) and topic (at minimum). s-Server writes data to Kafka formatted as CSV, JSON, XML, or BSON.
The Kafka10 adapter works with Kafka 0.10. It uses atomic commitments, which let you achieve exactly-once semantics between s-Server and Kafka. See the topic on exactly-once semantics.
For adapters, you configure and launch the adapter in SQL, using either server or foreign stream/table options. For agents, you configure such options using a properties file and launch the agent at the command line. The topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide has a complete list of options for the ECD adapter. To actually write data, you INSERT into the foreign stream.
Note: Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
Like all streams, foreign streams must be defined within schemas. The following code first creates a schema called KafkaSchema, then creates a foreign stream called buses_kafka_egress with the predefined server KAFKA10_SERVER as a server option. To transfer data into Kafka using this stream, you will need to INSERT into it. This step simply sets up the stream, with named columns and Kafka-specific options. (These options are discussed below.)
You can pass many of the options described at http://kafka.apache.org/documentation/#brokerconfigs, such as compression.type, to the Kafka writer. See Foreign Stream Options for Writing to Kafka below.
The example below uses a source stream created using a demonstration script located at $SQLSTREAM_HOME/demo/data/buses/StreamXmlBusData.sh.
Here is an example of the SQL used to define a foreign stream for the Kafka adapter. The code first creates and sets a schema. If you have run the code to create the source stream, you should omit the lines that create and set the schema.
--omit the two lines below if you have run the code to create the source stream
CREATE OR REPLACE SCHEMA KafkaSchema;
SET SCHEMA 'KafkaSchema';
CREATE OR REPLACE FOREIGN STREAM KafkaSchema.buses_kafka_egress
(
"id" BIGINT,
"reported_at" VARCHAR(32),
"speed" INTEGER,
"driver_no" BIGINT,
"prescribed" BOOLEAN,
"gps" VARCHAR(128),
"highway" VARCHAR(8)
)
--this is the prebuilt server for Kafka10
SERVER KAFKA10_SERVER
OPTIONS (
"FORMATTER" 'JSON',
"FORMATTER_INCLUDE_ROWTIME" 'false',
"bootstrap.servers" 'localhost:9092',
"TOPIC" 'buses'
);
CREATE OR REPLACE PUMP KafkaSchema.source_to_buses_pump STOPPED AS
INSERT INTO KafkaSchema.buses_kafka_egress
SELECT STREAM * FROM KafkaSchema.buses;
To start writing data, use the following code:
ALTER PUMP KafkaSchema.* START;
To test the stream, use the following code:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic buses --from-beginning
Note: This code assumes that you have added KAFKA_HOME to your path.
These special egress columns are supported by the KAFKA10 plugin. Data arriving in these columns is not included in the outgoing Kafka message payload; instead they are used to set other message variables.
Name | Data Type | Description | Name in s-Server 6.0.0 |
---|---|---|---|
SQLSTREAM_EGRESS_KAFKA_PARTITION | INTEGER | This column is used to determine to which partition the message will be written. NOTE: SQLstream uses this approach and does not support use of the Kafka producer property "partitioner.class". | KAFKA_PARTITION |
SQLSTREAM_EGRESS_KAFKA_TIMESTAMP | TIMESTAMP | This column is used to set the Kafka message timestamp. | KAFKA_TIMESTAMP |
SQLSTREAM_EGRESS_KAFKA_KEY | VARBINARY(256) | This column is used as the message key. | KAFKA_KEY |
SQLSTREAM_EGRESS_WATERMARK | VARCHAR(256) | This column is used as a watermark, and is saved when committing to the sink Kafka topic. The developer should set it to whatever watermark format is supported by the source plugin. The format of the stored watermark string is: <CommitRowtimeInMillisFromEpoch>,<watermark_column_value> | - |
Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#producerconfigs. Where appropriate, information in this table is drawn from this page.
Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.
Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':
Option Name | Adapter version | Description | |
---|---|---|---|
TOPIC | All | Kafka topic | |
"bootstrap.servers" | Kafka10 | hostname:port[,hostname:port] of the Kafka broker(s). Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers. | |
SEED_BROKERS | Legacy | A comma separated list of broker identifiers in the format "<broker_host_name>:<port>". Defaults to "localhost". | |
"metadata.broker.list" | Legacy | hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers. | |
"key.serializer" | Kafka10 | Names a serializer class for keys. If no class is given, Kafka uses value.serializer. | |
"key.serializer.class" | Legacy | Names a serializer class for keys. If no class is given, Kafka uses serializer.class. | |
"value.serializer" | Kafka10 | Names a serializer class for values. | |
"serializer.class" | Legacy | Names a serializer class for values. The default encoder takes a byte[] and returns the same byte[]. | |
"partitioner.class" | All | Fully qualified Java classname of Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner | |
"compression.type" | Kafka10 | If desired, specifies the final compression type for a given topic. Defaults to 'none'. Possible values: 'snappy', 'gzip' | |
"retry.backoff.ms" | All | Producers refresh metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing. | |
"request.timeout.ms" | All | When request.required.acks is enabled, this lets you specify how long the broker should try to bundle the specified number of messages before sending back an error to the client. | |
"send.buffer.bytes" | All | Socket write buffer size. | |
"client.id" | All | Using this option, you can specify a string to help you identify the application making calls to the Kafka server. | |
"transactional.id" | Kafka10 | This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique transactional.id to publish messages to the topic using transactions. Transactions are used only if Kafka brokers are v0.11.2. These support transactional semantics. NOTE: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" for itself. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream. (new in s-Server version 6.0.1) If you set transactional.id = 'auto', when a pump begins running, s-Server automatically sets transactional.id to '<fully_qualified_pump_name>_Pump', where <fully_qualified_pump_name> is the name of the pump that instantiates the sink. |
|
"pump.name" | Kafka10 | Deprecated since version 6.0.1. Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance itself is running (Leader or Follower) when you configure the Kafka adapter to run in High Availability (HA) mode. The pump.name needs to be fully qualified pump name of the format:<catalog_name>.<schema_name>.<pump_name> For example:'LOCALDB.mySchema.ProcessedOrdersPump' |
|
"linger.ms" | All | In cases where batch.size has not been reached, number of milliseconds that the Kafka producer will wait before batching sends . Defaults to '100', in milliseconds) | |
"batch.size" | All | Number of messages sent as a batch. Defaults to '1000' | |
"kafka.producer.config" | Kafka10 | Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs. NOTE: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "bootstrap.servers". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime. |
|
"security.protocol" | Kafka10 | Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL | |
"transaction.timeout.ms" | Kafka10 | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. | |
HA_ROLLOVER_TIMEOUT | Kafka10 | Time in milliseconds. Defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode, and the pump is running as a "Follower", it waits for this amount of time for a lack of commits from the "Leader". After the timeout, the "Follower" attempts to take over as the "Leader". There may be multiple follower instances running in a cluster. Only one of these followers succeeds in being designated as the new "Leader". All other pump instances using the same transactional.id continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself to "Follower" if possible. | |
COMMIT_METADATA_COLUMN_NAME | Kafka10 | The name of the column that contains watermark values - default SQLSTREAM_EGRESS_WATERMARK. | |
SQLSTREAM_EGRESS_WATERMARK_SORT_FUNCTION | Kafka10 | If the SQLSTREAM_EGRESS_WATERMARK egress column is being used, the value chosen depends on the value of this option. | |
HEADERS_COLUMNS | Kafka10 | Deprecated - please use HEADERS_COLUMN_LIST | |
HEADERS_COLUMN_LIST | Kafka10 | Comma-separated list of foreign stream columns that will be mapped as outgoing headers, rather than to the record value itself. See Writing Headers to Kafka based on Column Data | |
OPTIONS_QUERY | All | Lets you query a table to update adapter options at runtime. You can use this, for example, to set the "bootstrap.servers" option from a table, as in select lastOffset as "bootstrap.servers" from TEST.kafka_write_options; | |
POLL_TIMEOUT | Legacy | This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms. | |
PORT | Legacy | Deprecated option. From s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option. | |
"producer.type" | Legacy | This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are 'async' (asynchronous) and 'sync' (synchronous). Default 'sync' | |
"compression.codec" | Legacy | The compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". Default 'none'. | |
"compressed.topics" | Legacy | If the compression codec is anything other than 'none', enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics. | |
"message.send.max.retries" | Legacy | This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Default 3. | |
"topic.metadata.refresh.interval.ms" | Legacy | The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...). It will also poll regularly (default: every 10min so 600,000ms). If you set this to a negative value, metadata will only get refreshed on failure. Default 600*1000 (10 minutes). | |
"request.required.acks" | Legacy | How many other brokers must have committed the data to their log and acknowledged this to the leader? 0 means never wait; 1 means wait for the leader to acknowledge; -1 means wait for all replicas to acknowledge. Default 0 (never wait). | |
"queue.buffering.max.ms" | Legacy | Maximum time to buffer data when using async mode. Default 5000. | |
"queue.buffering.max.messages" | Legacy | The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. Default 10000. | |
"queue.enqueue.timeout.ms" | Legacy | The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. Default -1. | |
"batch.num.messages" | Legacy | The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached. Default 200. | |
"send.buffer.bytes" | Legacy | Socket write buffer size. Default 100x1024. | |
"SHARD_ID" | kafka10 | This optional parameter is used in scaling, when you are running multiple shards of a pipeline. It uniquely identifies the current shard. When writing metadata/watermarks to kafka, the SHARD_ID is added as a prefix to the SQLSTREAM_POSITION_KEY. It is also included in the automatically generated transaction id. When creating a restartable pipeline, this SHARD_ID is expected to be the same as the SHARD_ID of the source foreign stream. |
You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so that the passwords are not visible in the SQL.
The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other Kafka consumer/producer properties.
We therefore recommend that you supply any options related to access credentials, such as SSL or Kerberos settings, through a .properties file. The following options can be supplied this way:
Option Name | Adapter version | Description |
---|---|---|
"ssl.truststore.location" | Kafka10 | The location of the trust store file. |
"ssl.truststore.password" | Kafka10 | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. |
"ssl.keystore.location" | Kafka10 | The location of the key store file. This is optional for client and can be used for two-way authentication for client. |
"ssl.keystore.password" | Kafka10 | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. |
"ssl.key.password" | Kafka10 | The password of the private key in the key store file. This is optional for client. |
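For example, a Kafka client .properties file placed in $SQLSTREAM_HOME/plugin/jndi/ (or referenced through the "kafka.consumer.config" / "kafka.producer.config" options) might look like the following sketch. All paths and passwords here are placeholders, not defaults:
ssl.truststore.location=/var/kafka/ssl/client.truststore.jks
ssl.truststore.password=truststore-password-here
ssl.keystore.location=/var/kafka/ssl/client.keystore.jks
ssl.keystore.password=keystore-password-here
ssl.key.password=key-password-here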
When writing rows out of s-Server to a Kafka topic, you can specify 1) partition and 2) key by including egress columns named, respectively, SQLSTREAM_EGRESS_KAFKA_PARTITION and SQLSTREAM_EGRESS_KAFKA_KEY. Data will be written as a message to the indicated partition in the topic, and SQLSTREAM_EGRESS_KAFKA_KEY will serve as the key part of the key-value pair that constitutes a Kafka message.
If you do not include a SQLSTREAM_EGRESS_KAFKA_PARTITION column in the foreign stream, Kafka assigns a partition number to the message being published. If you have included a SQLSTREAM_EGRESS_KAFKA_KEY column, the partition number is determined by hashing that key column. If you have not included the SQLSTREAM_EGRESS_KAFKA_KEY column either, Kafka assigns partition numbers in a round-robin fashion.
If you do not include a SQLSTREAM_EGRESS_KAFKA_KEY column in the foreign stream, the (key, value) pair will be (null, value), where value is the row serialized using the FORMATTER option. The KAFKA10 plugin excludes the SQLSTREAM_EGRESS_KAFKA_KEY and SQLSTREAM_EGRESS_KAFKA_PARTITION columns from the serialized value.
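As an illustrative sketch (the topic, broker, and data column names are assumptions, not defaults), a sink foreign stream that controls both key and partition might look like this:
CREATE OR REPLACE FOREIGN STREAM KafkaSchema.KafkaWriterStream
( "id" BIGINT,
"speed" INTEGER,
"SQLSTREAM_EGRESS_KAFKA_KEY" VARCHAR(32),
"SQLSTREAM_EGRESS_KAFKA_PARTITION" INTEGER
)
SERVER KAFKA10_SERVER
OPTIONS
("TOPIC" 'buses',
"SEED_BROKERS" 'localhost:9092',
"FORMATTER" 'JSON'
);
Here only "id" and "speed" are serialized into the message value; the two egress columns steer the key and partition of each outgoing message.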
As of version 6.0.0, s-Server writes messages to Kafka atomically. A multi-step operation (such as writing multiple records as one transaction) is atomic when either all the steps in the transaction complete successfully or all changes are rolled back. In a ten-row insertion, for example, either all ten rows are inserted successfully or none are.
In order to commit insertions atomically, our adapter makes use of Kafka broker support for transactional semantics. As a result, in order to implement atomic commits, you must be using Kafka v1.0 or later. When creating a foreign stream that connects to Kafka v1.0 or later, use the prebuilt server KAFKA10_SERVER in the foreign stream definition. (KAFKA_SERVER is the legacy Kafka plugin.)
To implement atomic commits:
You also need to use an OPTIONS_QUERY in the foreign stream, along the following lines:
CREATE OR REPLACE FOREIGN STREAM KafkaSchema.KafkaReaderStream
( "id" BIGINT,
"reported_at" TIMESTAMP,
"lat" DOUBLE,
"lon" DOUBLE,
"speed" INTEGER,
"bearing" INTEGER,
"driver_no" BIGINT,
"SQLSTREAM_EGRESS_KAFKA_TIMESTAMP" TIMESTAMP
)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'buses',
"SEED_BROKERS" 'localhost:9092',
"STARTING_TIME" 'LATEST',
"PARSER" 'JSON',
"ROW_PATH" '$',
TRANSACTION_ROWTIME_LIMIT '60000',
character_encoding 'UTF-8',
"OPTIONS_QUERY" 'SELECT * FROM (VALUES(sys_boot.mgmt.watermark_timestamp(''localhost:9092'', ''buses'', ''buses_watermark'', '''',0) - INTERVAL ''6'' SECOND)) AS options(STARTING_TIME)')
;
At times, a pipeline may fail. In these cases, s-Server needs to be able to replay the pipeline from the last committed transaction (bounded by TRANSACTION_ROWTIME_LIMIT) in order to avoid data loss. For high availability situations, we recommend creating duplicate pipelines. Duplicate pipelines process data in the same order, but only one of them (the leader) commits.
You set up duplicate pipelines using Kafka's transactional.id option. These pipelines write through redundant sessions of a single logical Kafka producer while ensuring that rows are processed only once. This setup provides a "hot-cold", high availability configuration: when the "hot", or live, node fails, the "cold" node takes over. This configuration requires recovery and replay to rebuild any state before s-Server begins writing data to the Kafka sink.
To configure high availability mode in s-Server 6.0.0, you must identify:
To configure high availability in s-Server 6.0.1, you can set transactional.id to 'auto'. If you set transactional.id = 'auto', then when a pump begins running, s-Server automatically sets transactional.id to <fully_qualified_pump_name>_Pump, where <fully_qualified_pump_name> is the name of the pump that instantiates the sink.
When a single instance of a Kafka egress pipeline is run with the option transactional.id specified for the Kafka sink, the pipeline runs in transactional mode. If s-Server identifies two foreign streams/pipelines that use the same transactional.id, one foreign stream/pipeline is designated the leader and all others are designated followers. s-Server will actively read from the leader only. If the leader fails, one of the followers will be designated the leader, and so on.
If a follower catches up to the leader and the leader has not committed, the follower "sleeps" for a few seconds.
If, after waking, the follower still does not see the leader commit, the follower concludes that the leader has probably failed. The follower then topples the leader and starts committing rows itself.
Here, transactional.id functions as a "baton": it is what lets a follower declare itself the leader. With three redundant pipelines running, you have one leader and two followers. If the leader fails, one of the other two becomes the leader.
Note: If you do not supply transactional.id as an option in the CREATE FOREIGN STREAM statement, transactional semantics are disabled.
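As a sketch (the topic, broker, and transactional.id values are placeholders, not defaults), a sink foreign stream enabling transactional semantics might be declared as:
CREATE OR REPLACE FOREIGN STREAM KafkaSchema.TransactionalSink
( "id" BIGINT,
"speed" INTEGER
)
SERVER KAFKA10_SERVER
OPTIONS
("TOPIC" 'buses_out',
"SEED_BROKERS" 'localhost:9092',
"transactional.id" 'buses_out_writer',
"TRANSACTION_ROWTIME_LIMIT" '60000',
"FORMATTER" 'JSON'
);
Running a second copy of the pipeline with the same transactional.id value would make it a follower for this sink.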
In order to automatically rebalance assigned partitions or implement exactly-once semantics, s-Server relies on a Kafka v0.10.2 feature called Time Index. This feature provides the timestamps that you promote to rowtime, which serve as a natural watermark for the pipeline.
But what if you're working with an earlier version of Kafka, or have other trouble getting timestamps into your pipeline?
In these cases, you can still get automatic partition rebalancing and exactly-once semantics, but you will need to create your own timestamp for each message. You can do so by building an index topic.
To do so, you set up a stream that records the first and last offsets of the source topic and applies a timestamp at build time. Using the offset range for the actual source topic, s-Server reads records from the source topic and returns the timestamp from the index topic for all records (messages) in that offset range.
MESSAGE_TIMESTAMP | START_OFFSET | END_OFFSET |
---|---|---|
2018-01-01 00:00:01 | 64000 | 67903 |
2018-01-01 00:00:02 | 67904 | 68112 |
2018-01-01 00:00:04 | 68113 | 72892 |
For example:
CREATE OR REPLACE SCHEMA "IndexBuilder";
SET SCHEMA '"IndexBuilder"';
CREATE OR REPLACE FOREIGN STREAM "IndexBuilder"."OrdersTopicIndex" (
"PARTITION" INTEGER,
"MESSAGE_TIMESTAMP" TIMESTAMP,
"START_OFFSET" BIGINT,
"END_OFFSET" BIGINT
)
SERVER KAFKA10_SERVER
OPTIONS (
SEED_BROKERS 'localhost:9092',
TOPIC 'OrdersTopic_index',
"COMMIT_METADATA_COLUMN_NAME" 'END_OFFSET',
"transactional.id" 'OrdersTopic_index_builder',
"pump.name" 'LOCALDB.IndexBuilder.Index1Pump',
"TRANSACTION_ROWTIME_LIMIT" '1000', -- 1 second
FORMATTER 'CSV'
);
CREATE OR REPLACE PUMP "IndexBuilder"."Index1Pump" STOPPED AS
INSERT INTO "IndexBuilder"."OrdersTopicIndex"
SELECT STREAM * FROM "IndexBuilder".AggregationView;
Once the index topic is built, you can use it as follows:
CREATE OR REPLACE FOREIGN STREAM OrdersStream (
MESSAGE_TIMESTAMP TIMESTAMP,
...
ORDER_ID BIGINT,
...
)
SERVER KAFKA10_SERVER
OPTIONS (
SEED_BROKERS 'localhost:9092',
TOPIC 'OrdersTopic',
INDEX_TOPIC_NAME_SUFFIX '_index', -- use OrdersTopic_index to determine starting position
STARTING_TIME '2018-02-01 00:00:00',
...,
PARSER 'JSON'
);
The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.
FORMATTER=CSV
TOPIC=AggregatedData
"metadata.broker.list"=localhost:9092
ROW_SEPARATOR=''
CHARACTER_ENCODING=UTF-8
SCHEMA_NAME=KAFKASOURCE
TABLE_NAME=KAFKAWRITER_STREAM
ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)
(new in s-Server version 6.0.1) Messages published to a Kafka topic can have associated headers. s-Server lets you both read and write these headers.
For Kafka10 Sinks, the KAFKA10 adapter supports an option called HEADERS_COLUMN_LIST. This is a comma-separated list of column names (case sensitive) that are turned into Kafka headers in the following form:
column_name1=column_value1,...,column_nameN=column_valueN
For example, the following stream specifies that headers will be written to Kafka using the prescribed and highway columns:
--this line creates a schema for use in the two examples that follow
CREATE OR REPLACE SCHEMA headers_example;
CREATE OR REPLACE FOREIGN STREAM headers_example.headers_egress
(
"id" BIGINT,
"reported_at" VARCHAR(32),
"speed" INTEGER,
"driver_no" BIGINT,
"prescribed" BOOLEAN,
"highway" VARCHAR(8)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
"FORMATTER" 'JSON',
"FORMATTER_INCLUDE_ROWTIME" 'false',
"bootstrap.servers" 'localhost:9092',
--option for writing headers
"HEADERS_COLUMN_LIST" 'prescribed,highway',
"TOPIC" 'headers_test'
);
Once you have written these headers to a Kafka message, you can read them by specifying a stream column called SQLSTREAM_PROV_KAFKA_HEADERS, defined as VARCHAR. (This column is known as a provenance column because it tells you something about the source of your data.)
Headers do not need to have been written by s-Server; the KAFKA10 adapter will read any headers present in a Kafka topic's messages. If there are no headers for the Kafka message, the column SQLSTREAM_PROV_KAFKA_HEADERS returns null.
Limitations:
When you specify this column and select it in a query, it returns headers as key-value pairs in serialized text format. If a Kafka message has two headers ("key1", "value1") and ("key2", "value2"), the value of SQLSTREAM_PROV_KAFKA_HEADERS will be 'key1=value1<newline-character>key2=value2', where <newline-character> is the actual newline character.
Each Kafka message may have an arbitrary number of headers (key-value pairs). You can use the Parser UDX to parse SQLSTREAM_PROV_KAFKA_HEADERS in a pipeline using PARSER = 'KV' as a setting for the UDX.
The following is an example of a KAFKA10 foreign stream with SQLSTREAM_PROV_KAFKA_HEADERS specified as a provenance column:
CREATE OR REPLACE FOREIGN STREAM headers_example.headers_ingest
(
"id" BIGINT,
"reported_at" TIMESTAMP,
"prescribed" BOOLEAN,
"highway" VARCHAR(8),
"SQLSTREAM_PROV_KAFKA_HEADERS" VARCHAR(256)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
"PARSER" 'JSON',
"ROW_PATH" '$',
"SEED_BROKERS" 'localhost:9092',
"STARTING_TIME" 'LATEST',
"isolation.level" 'read_uncommitted',
"TOPIC" 'headers_test'
);
For a stream with headers written by s-Server as "HEADERS_COLUMN_LIST" 'prescribed,highway' (as we defined above), the headers might look like the following. Note that you reference "SQLSTREAM_PROV_KAFKA_HEADERS" using quotation marks. Also note that headers are returned with a line break.
SELECT STREAM "id", "SQLSTREAM_PROV_KAFKA_HEADERS" FROM headers_example.headers_ingest;
'id','SQLSTREAM_PROV_KAFKA_HEADERS'
'50115871536','prescribed=false
highway=MR549'
'346866848365','prescribed=false
highway=MR620'
'50116198282','prescribed=false
highway=MR184'
To use the Key Value parser to parse the resulting Kafka headers, you can use code along the following lines.
First, create a view that reads headers:
CREATE OR REPLACE VIEW header_kv_options
( "SEPARATOR"
)
AS
SELECT * FROM (VALUES(u&'\000a')); -- use newline character as the separator
Next, create a function that calls the Parser UDX:
CREATE OR REPLACE FUNCTION headerParser (
input cursor,
columnName varchar(256),
parserType varchar(256),
options cursor
)
--columns need to be declared with types that match
--data to be parsed
RETURNS TABLE(
"id" BIGINT, -- original columns
"reported_at" TIMESTAMP,
"prescribed" BOOLEAN, --columns extracted from the headers
"highway" VARCHAR(8)
)
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'class com.sqlstream.aspen.namespace.common.ParserUdx.parseColumn';
Finally, create a view that uses the function to parse header information:
CREATE OR REPLACE VIEW headers_exposed as
SELECT STREAM * FROM STREAM(headerParser(
--uses the ingest stream defined above as input for the function
CURSOR(SELECT STREAM * FROM headers_example.headers_ingest),
'SQLSTREAM_PROV_KAFKA_HEADERS',
'KV',
CURSOR(SELECT * FROM header_kv_options)));
This topic describes an example of setting up SQL to process a pipeline handling one partition. For each partition you can have N pipelines (often on separate instances of s-Server) listening to that partition, where N is your redundancy level. It assumes two Kafka topics have been set up:
Each pipeline will
In order to ensure all pipelines for the same partition output the same data, the code discards data for the first second's aggregation. This lets you restart an instance of s-Server running pipelines at any time without affecting results.
Results are written to the AggregatedData topic. One or more instances of s-Server will then read that AggregatedData topic, discarding duplicate rows. Aggregates are then rolled up and written to a stream.
CREATE OR REPLACE SCHEMA "kafkaAggregation";
SET SCHEMA '"kafkaAggregation"';
SET PATH '"kafkaAggregation"';
CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA10'
FOREIGN DATA WRAPPER ECDA;
CREATE OR REPLACE FOREIGN STREAM "KafkaPartitionedInputStream"
(KAFKA_OFFSET BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"cardNumber" BIGINT NOT NULL,
"zipcode" CHAR(10) NOT NULL,
"transactionAmount" DOUBLE NOT NULL,
"recipientId" BIGINT NOT NULL,
"transactionId" BIGINT NOT NULL)
SERVER KAFKA10_SERVER
OPTIONS
(topic 'TransactionData',
"PARTITION" '1',
"SEED_BROKERS" 'localhost',
"PORT" '9092',
"STARTING_TIME" 'latest',
parser 'CSV',
character_encoding 'UTF-8'
);
CREATE OR REPLACE FOREIGN STREAM "KafkaOutputStream"
(
"ts" TIMESTAMP NOT NULL,
"partition" INT NOT NULL,
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DOUBLE NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'AggregatedData',
"metadata.broker.list" 'localhost:9092',
parser 'CSV',
character_encoding 'UTF-8');
-- Source columns we're interested in
CREATE OR REPLACE VIEW "sourceData" AS
SELECT STREAM "ts" AS ROWTIME, "zipcode", "transactionAmount"
FROM "KafkaPartitionedInputStream";
CREATE OR REPLACE FUNCTION "getZipcode"(
inputRows CURSOR,
dsName VARCHAR(64),
tableName VARCHAR(128),
colName VARCHAR(128),
cacheSize INTEGER,
prefetchRows BOOLEAN,
fuzzyLookups BOOLEAN)
RETURNS TABLE(
inputRows.*,
"zipcode" CHAR(5))
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';
-- adorn with zipcode using in memory lookup if possible
CREATE OR REPLACE VIEW "transactionAndZipcode" AS
SELECT STREAM *
FROM TABLE("getZipcode"(CURSOR(SELECT STREAM * FROM "sourceData"), 'customerDb', 'address', 'recipientId', 10000000, true, false));
CREATE OR REPLACE VIEW "aggregatedData" AS
SELECT STREAM "zipcode", SUM("transactionAmount") AS "transactionTotal", COUNT(*) AS "transactionCount"
FROM "sourceData"
GROUP BY FLOOR((("sourceData".ROWTIME - timestamp '1970-01-01 00:00:00') second)/10 to second), "zipcode";
-- Creates output pump
-- Does not output first group of rows (all rows in group will have same rowtime)
-- as this group may be partial if restarting after failure.
CREATE OR REPLACE PUMP "aggregatedDataOutputPump" STOPPED AS
INSERT INTO "kafkaAggregation"."KafkaOutputStream"
SELECT STREAM ROWTIME AS "ts", 1 AS "partition", "zipcode", "transactionTotal", "transactionCount"
FROM (SELECT STREAM *, a.ROWTIME AS thistime, FIRST_VALUE(a.ROWTIME) OVER (ROWS UNBOUNDED PRECEDING) AS firsttime FROM "kafkaAggregation"."aggregatedData" a) b
WHERE firsttime <> thistime;
ALTER PUMP "aggregatedDataOutputPump" START;
You can use the Options Query to read from a configuration table. You can then use this table to update adapter options at SQL start time. Usually, this will be when you start application pumps.
You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset.
You can create a configuration table (or view) and read from it using the OPTIONS_QUERY option (not property).
The Options Query must return exactly one row. Column names map to property names, and the row's values supply the corresponding property values.
If you set the OPTIONS_QUERY property to
select * from conf
and that query returns one row with one column called TOPIC containing the value "topic1", then the adapter is configured with the TOPIC property set to "topic1". Each time the adapter runs, its configuration is computed dynamically from the conf table. You can also use views for the OPTIONS_QUERY.
CREATE OR REPLACE FOREIGN STREAM testOut (
KAFKA_OFFSET BIGINT NOT NULL,
line VARCHAR(4096))
SERVER KAFKASERVER
OPTIONS (TOPIC 'testThroughput', OPTIONS_QUERY 'select lastOffset as STARTING_OFFSET from TEST.committedOffset');
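The TEST.committedOffset table referenced above is an ordinary table that your pipeline maintains; a minimal sketch (the table and column names match the example, but the maintenance strategy is up to you):
CREATE TABLE TEST.committedOffset (lastOffset BIGINT);
INSERT INTO TEST.committedOffset VALUES (0);
Each time the foreign stream starts, the OPTIONS_QUERY reads the current lastOffset value and the adapter resumes from that offset.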
OPTIONS_QUERY is supported for all sources, sinks, formatters, and parsers.
To ensure fault tolerance, you can set up multiple instances of s-Server to listen to each partition, using a program like Puppet to start servers and pipelines. You can run more than one pipeline on each instance of s-Server, but you cannot have multiple pipelines on the same server listening to the same partition.
The diagram below shows multiple instances of s-Server listening to Kafka partitions. This prevents data loss if an s-Server instance goes down.
After aggregating data in multiple instances of s-Server, you can create one or more s-Server pumps in order to write to Kafka topics, and add and remove these pumps using the ALTER PUMP command.
Every time you use an adapter, you need to implement it within an s-Server schema. The following code first creates a schema and implements the Kafka ECDA adapter.
CREATE OR REPLACE SCHEMA "kafkaAggregation";
SET SCHEMA '"kafkaAggregation"';
SET PATH '"kafkaAggregation"';
CREATE OR REPLACE SERVER "KafkaServer" TYPE 'KAFKA'
FOREIGN DATA WRAPPER ECDA;
CREATE OR REPLACE FOREIGN STREAM "KafkaAggregatedData"
("offset" BIGINT NOT NULL,
"ts" TIMESTAMP NOT NULL,
"PARTITION" INT NOT NULL,
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DECIMAL(18,2) NOT NULL,
"transactionCount" INT NOT NULL)
SERVER "KafkaServer"
OPTIONS
(topic 'AggregatedData',
seed_brokers 'localhost',
starting_time 'latest',
parser 'CSV',
character_encoding 'UTF-8'
);
The code below creates a stream with three columns, zipcode, transactionTotal, and transactionCount. This stream will be used to pump data from a Kafka topic.
CREATE OR REPLACE STREAM "AggregatedData"(
"zipcode" CHAR(5) NOT NULL,
"transactionTotal" DECIMAL(18,2) NOT NULL,
"transactionCount" INT NOT NULL);
The next code block creates a view on the foreign stream KafkaAggregatedData, ordered by timestamp ("ts") and selecting the columns PARTITION, zipcode, transactionTotal, and transactionCount.
CREATE OR REPLACE VIEW "AggregatedDataWithRowTime" AS
SELECT STREAM "ts" AS ROWTIME, "PARTITION", "zipcode", "transactionTotal", "transactionCount"
FROM "KafkaAggregatedData"
ORDER BY "ts" WITHIN INTERVAL '2' SECOND;
The next code block uses a WHERE clause to identify and discard duplicate rows.
CREATE OR REPLACE VIEW "AggregatedDataDeDuped" AS
SELECT STREAM "PARTITION", "zipcode", "transactionTotal", "transactionCount"
FROM (SELECT STREAM *,
COUNT(*) OVER (PARTITION BY "PARTITION", "zipcode" RANGE INTERVAL '0' SECOND PRECEDING) AS c
FROM "AggregatedDataWithRowTime") AS dd
WHERE c = 1;
The next code block creates a pump that inserts zipcode, together with the sums of transactionTotal and transactionCount, into the AggregatedData stream.
CREATE OR REPLACE PUMP "rollupPump" STOPPED AS
INSERT INTO "AggregatedData"
SELECT STREAM "zipcode", sum("transactionTotal"), sum("transactionCount")
FROM "AggregatedDataDeDuped"
GROUP BY "AggregatedDataDeDuped".ROWTIME, "zipcode";
You can then start and stop the pump using the ALTER PUMP command:
ALTER PUMP "rollupPump" START;
ALTER PUMP "rollupPump" STOP;
See the topic ALTER PUMP in the SQLstream Streaming SQL Reference Guide for more details on starting and stopping pumps.
Once you have read in data from Kafka and created a view of this data, as in Using the Kafka ECDA for Fault Tolerance, you can use the TableLookup UDX to prefetch a partitioned portion of a database to form a preloaded cache. The code below prefetches zip codes.
CREATE OR REPLACE FUNCTION "getZipcode"(
inputRows CURSOR,
dsName VARCHAR(64),
tableName VARCHAR(128),
colName VARCHAR(128),
cacheSize INTEGER,
prefetchRows BOOLEAN,
fuzzyLookups BOOLEAN)
RETURNS TABLE(
inputRows.*,
"zipcode" CHAR(5))
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME '"TableLookup":com.sqlstream.plugin.tablelookup.TableLookup.lookup';
You can then use the function to prefetch zip code from a partitioned portion of a database:
CREATE OR REPLACE VIEW "transactionAndZipcode" AS
SELECT STREAM *
FROM TABLE("getZipcode"(CURSOR(SELECT STREAM * FROM "sourceData"), 'customerDb', 'address', 'recipientId', 10000000, true, false));
The source partition need not be a Kafka partition - it can be whatever kind of partitioning is supported by a Kafka or non-Kafka plugin (such as File-VFS).
Please see Scaling in File-VFS for more information about partitioning in the File-VFS plugin and also see Using Exactly Once with VFS or S3 as a Source for a full example.
For Kafka to Kafka, see Using Exactly Once with Kafka as a Source.
If the source plugin is File-VFS, the Kafka plugin supports storing metadata in the following ways:
Kafka stores watermarks as pairs: sourcePartitionId, sourcePartitionWatermark - see the glossary term shard. The File-VFS source foreign stream could be defined as below.
Working with a single source partition and an empty shard id:
SHARD_ID ''
INGRESS_ASSIGNED_PARTITIONS ':'
INGRESS_NUM_PARTITIONS 1
Working with 2 source partitions and a non-empty shard id, where only one partition is assigned to each shard:
SHARD_ID '0'
INGRESS_ASSIGNED_PARTITIONS '0'
INGRESS_NUM_PARTITIONS 2
SHARD_ID '1'
INGRESS_ASSIGNED_PARTITIONS '1'
INGRESS_NUM_PARTITIONS 2
Working with 4 source partitions and a non-empty shard id, where multiple partitions are assigned to each shard:
SHARD_ID '0'
INGRESS_ASSIGNED_PARTITIONS '0,1'
INGRESS_NUM_PARTITIONS 4
SHARD_ID '1'
INGRESS_ASSIGNED_PARTITIONS '2,3'
INGRESS_NUM_PARTITIONS 4
The Kafka sink foreign stream stores metadata in the output sink topic in this way: partitionId, watermark.
For case 1 above, when shard id is empty and there is only a single partition, the metadata would be stored as:
0,watermark_for_0_source_partition
If the shard id is not empty, it is prepended to the metadata partitionId along with an underscore ('_') character, so the metadata becomes shardId_partitionIdAssignedToThisShard, watermark. For case 2 above, when the shard id is not empty and there are multiple source partitions but only one partition is assigned to each shard, the metadata would be stored as:
0_0,watermark_for_0_source_partition
1_1,watermark_for_1_source_partition
For case 3 above, when shard id is not empty and there are multiple source partitions assigned to each shard, the metadata would be stored as:
0_0,watermark_for_0_source_partition
0_1,watermark_for_1_source_partition
1_2,watermark_for_2_source_partition
1_3,watermark_for_3_source_partition
A user can fetch the metadata using any of the available Kafka Watermark Functions.
The Kafka10 plugin supplies the following functions to retrieve source watermark information that has been stored in Kafka:
Please see Storing Watermarks in Kafka for Exactly Once for more information.
For each watermark type there are two functions; the second, whose name ends with _with_config_file, works with secure Kafka by accepting an additional parameter - the name of a Kafka (SSL) properties file.
This function returns watermark offsets to be used during replay/recovery from the source partitions of a pipeline. The topic passed as a parameter, however, is the name of the sink topic. As transactions are committed to the sink, the commit metadata saves the commit timestamp and, optionally, the offset for the source topic. This function returns these offsets for each partition of the topic.
Note: The consumer_group, for now, should be '
CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_offsets(
bootstrap_servers VARCHAR(128),
sink_topic VARCHAR(128),
consumer_group VARCHAR(128)
)
RETURNS TABLE (
"TOPIC" VARCHAR(128),
"PARTITION" INTEGER,
"OFFSET" BIGINT
)
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkOffsetsFromIndexTopic';
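A hypothetical invocation (the broker address and topic name are placeholders; see the note above about consumer_group):
SELECT * FROM TABLE(sys_boot.mgmt.watermark_offsets(
'localhost:9092', -- bootstrap_servers
'buses_watermark', -- sink topic holding the commit metadata
'' -- consumer_group
));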
Same as watermark_offsets, but with added support for secured Kafka.
CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_offsets_from_index_topic_with_config_file(
bootstrap_servers VARCHAR(1024),
topic VARCHAR(128),
consumer_group VARCHAR(128),
config_file_path VARCHAR(128)
)
RETURNS TABLE (
"KAFKA_TOPIC" VARCHAR(128),
"KAFKA_PARTITION" INTEGER,
"KAFKA_OFFSET" BIGINT
)
...
This function returns a timestamp value, which is the maximum timestamp committed across all partitions in the topic for the input consumer group. Typically, this would be the ROWTIME of the last row committed in the transaction.
CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_timestamp(
bootstrap_servers VARCHAR(1024),
topic VARCHAR(128),
consumer_group_prefix VARCHAR(128),
partition_id_range VARCHAR(128),
max_num_partitions INTEGER
)
RETURNS TIMESTAMP
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getWatermarkTimestamp';
For an example of using this function, see the OPTIONS_QUERY in the atomic-commit example earlier in this topic.
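One such use, drawn from the atomic-commit foreign stream earlier in this topic, embeds the function in an OPTIONS_QUERY to compute a STARTING_TIME (the broker, topic, and consumer-group values are example placeholders):
SELECT * FROM (VALUES(
sys_boot.mgmt.watermark_timestamp('localhost:9092', 'buses', 'buses_watermark', '', 0)
- INTERVAL '6' SECOND)) AS options(STARTING_TIME);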
Same as watermark_timestamp, with added support for secured Kafka.
CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_timestamp_with_config_file(
bootstrap_servers VARCHAR(1024),
topic VARCHAR(128),
consumer_group_prefix VARCHAR(128),
partition_id_range VARCHAR(128),
max_num_partitions INTEGER,
config_file_path VARCHAR(128)
)
RETURNS TIMESTAMP
...
This function returns a watermark string to be used during replay/recovery from the source partitions of a pipeline. The topic passed as a parameter, however, is the name of the sink topic. As data is committed to the sink Kafka topic, the plugin saves the watermark for the source foreign stream. This function returns these watermarks, which can be used in the source to ensure exactly-once semantics. The function returns a string delimited by newline ('\n') characters; each line represents the pair 'consumer_group, watermarkString'. Please refer to the Kafka sink metadata storage discussion above for sample scenarios.
CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_string(
bootstrap_servers VARCHAR(1024),
topic VARCHAR(128),
consumer_group_prefix VARCHAR(128),
partition_id_range VARCHAR(128),
max_num_partitions INTEGER
)
RETURNS VARCHAR(512)
...
For example, if 2 shards are running on your cluster:
SHARD_ID '0'
INGRESS_ASSIGNED_PARTITIONS '0,1'
INGRESS_NUM_PARTITIONS 4
SHARD_ID '1'
INGRESS_ASSIGNED_PARTITIONS '2,3'
INGRESS_NUM_PARTITIONS 4
The metadata returned from invoking watermark_string would be:
0_0,watermark_for_0_source_partition
0_1,watermark_for_1_source_partition
1_2,watermark_for_2_source_partition
1_3,watermark_for_3_source_partition
Here is sample usage for invoking the watermark_string UDX:
CREATE OR REPLACE VIEW "sampleschema"."START_POSITION_QUERY" AS
SELECT * FROM (VALUES(sys_boot.mgmt.watermark_string('bootstrap_servers', 'topic_name', 'consumer_group_prefix','assigned_partitions',total_partitions))) AS options(STARTING_POSITION);
In the above query, set consumer_group_prefix to the shard id followed by an underscore, i.e. '<shard_id>_', if shard_id is not empty. If shard_id is empty, then consumer_group_prefix should be set to the empty string ('') when running the above queries.
For a bigger example see Reading from File-VFS in the Exactly Once topic in this guide.
Same as watermark_string, with added support for secured Kafka.
CREATE OR REPLACE FUNCTION sys_boot.mgmt.watermark_string_with_config_file(
bootstrap_servers VARCHAR(1024),
topic VARCHAR(128),
consumer_group_prefix VARCHAR(128),
partition_id_range VARCHAR(128),
max_num_partitions INTEGER,
config_file_path VARCHAR(128)
)
RETURNS VARCHAR(512)
...
Here is sample usage for invoking the UDX when working with a secure Kafka cluster:
CREATE OR REPLACE VIEW "sampleschema"."START_POSITION_QUERY" AS
SELECT * FROM (VALUES(sys_boot.mgmt.watermark_string_with_config_file('bootstrap_servers', 'topic_name', 'consumer_group_prefix','assigned_partitions',total_partitions, 'config_file_path'))) AS options(STARTING_POSITION);
This function returns the HA mode (Leader/Follower) of a pump.
CREATE OR REPLACE FUNCTION sys_boot.mgmt.pump_mode(
pump_name VARCHAR(128)
)
RETURNS VARCHAR(256)
LANGUAGE JAVA
PARAMETER STYLE SYSTEM DEFINED JAVA
NO SQL
EXTERNAL NAME 'sys_boot.sys_boot.kafka10:com.sqlstream.aspen.namespace.kafka.KafkaWatermark.getMode';
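For example, to check whether a particular pump is currently the leader or a follower (the pump name here is a placeholder):
SELECT * FROM (VALUES(sys_boot.mgmt.pump_mode('LOCALDB.kafkaAggregation.aggregatedDataOutputPump'))) AS t(HA_MODE);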