Using s-Server, you can read from and write to Azure Event Hubs. SQLstream uses the Kafka API to interact with Event Hubs - see Event Hubs for Kafka.
For more detail on SQLstream's Kafka plugin, please see Integrating Kafka. This section concentrates on the specifics and limitations of the Event Hub API.
Azure Event Hubs does not provide transaction support, so exactly-once semantics cannot be implemented - we cannot commit offsets as we do with Kafka. Instead, we store offsets in a separate "Index Topic" so that we can provide at-least-once semantics. See Using an Index Topic below.
Azure Event Hubs for Kafka requires TLS encryption, as all data in transit with Azure Event Hubs is TLS-encrypted. To enable this, specify the SASL_SSL option in your configuration file. The example below uses a Shared Access Signature (SAS).
The security parameters required by Azure Event Hubs are defined in a properties file. This file can be referenced by the "kafka.consumer.config" option (when reading from Event Hubs) or the "kafka.producer.config" option (when writing to Event Hubs). Other properties may also be included in this file.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ewqsdvasdasdfasdf+s=";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
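For illustration, the properties above can be assembled programmatically from the Event Hubs namespace and SAS key. A minimal sketch in Python - the function name is ours, and the namespace and key values are placeholders, not real credentials:

```python
# Sketch: build the Kafka consumer/producer properties that Event Hubs for
# Kafka expects (SASL_SSL with the connection string as the SASL password).

def event_hubs_kafka_config(namespace: str, sas_key_name: str, sas_key: str) -> str:
    """Return the contents of a Kafka properties file for Azure Event Hubs."""
    conn = (f"Endpoint=sb://{namespace}.servicebus.windows.net/;"
            f"SharedAccessKeyName={sas_key_name};SharedAccessKey={sas_key}")
    return "\n".join([
        "security.protocol=SASL_SSL",
        "sasl.mechanism=PLAIN",
        # Event Hubs uses the literal username "$ConnectionString"
        ('sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule '
         f'required username="$ConnectionString" password="{conn}";'),
    ])

props = event_hubs_kafka_config("myns", "RootManageSharedAccessKey", "abc123=")
print(props)
```

The resulting text would be written to the file referenced by "kafka.consumer.config" or "kafka.producer.config".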
For your convenience we include the Kafka options here. Just remember that transactions are not supported, and please do not use the "legacy" Kafka options - use Kafka10 plugin options only.
In reading from Kafka, you can declare provenance columns. These return metadata for the Kafka topic from which you are reading.
These are as follows:
Data Type | Name in s-Server 6.0.0 | Name in s-Server 6.0.1+ | Kafka version | Description |
---|---|---|---|---|
VARCHAR(256) | KAFKA_TOPIC | SQLSTREAM_PROV_KAFKA_TOPIC | Kafka v.0.10.2 or later | Returns name of Kafka topic. |
INTEGER | KAFKA_PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v.0.10.2 or later | Returns value of current Kafka partition. |
INTEGER | PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v.0.8.2 | Returns value of current Kafka partition. |
BIGINT | KAFKA_OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v.0.10.2 or later | Returns value of current Kafka offset. |
BIGINT | OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v.0.8.2 | Returns value of current Kafka offset. |
TIMESTAMP | KAFKA_TIMESTAMP | SQLSTREAM_PROV_KAFKA_TIMESTAMP | Kafka v.0.10.2 or later | Returns current Kafka_timestamp. |
VARBINARY(256) | KAFKA_KEY | SQLSTREAM_PROV_KAFKA_KEY | Kafka v.0.10.2 or later | Returns key value for message. |
BINARY(32) | PAYLOAD_PREFIX | SQLSTREAM_PROV_KAFKA_PAYLOAD_PREFIX | Kafka v.0.10.2 or later | Returns the first 32 bytes of the message payload. |
VARCHAR(1000) | N/A | SQLSTREAM_PROV_KAFKA_HEADERS | Kafka v.0.10.2 or later | Returns any headers for Kafka message topics as key-value pairs in serialized text format. See Reading and Parsing Headers. |
Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#consumerconfigs. Where appropriate, information in this table is drawn from this page.
Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.
Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':
Option | Adapter version | Description |
---|---|---|
TOPIC | All | Required. Kafka Topic. You can use a regular expression as a "topic wild card". (This is not supported by legacy versions of the adapter.) |
SEED_BROKERS | All | A comma separated list of broker identifiers in the format <broker_hostname>:<port>. For the legacy adapter, this is a comma separated list of broker hosts. Defaults to "localhost". |
PARTITION | All | Partition number to read from. If reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions will be read. You can specify a single partition with PARTITION, or an inclusive range of two partitions, e.g. 1-3. Note: Partition numbers are 0-based. |
PORT | Legacy | Deprecated since 6.0.1 Port for Kafka seed brokers |
STARTING_TIME | All | Either EARLIEST, LATEST, or a timestamp in the format 'yyyy-MM-dd HH:mm:ss.SSS', such as '2018-02-01 22:23:45.892'. Default LATEST. When STARTING_TIME is a timestamp, the Kafka adapter "seeks" to offsets within all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME. Requires Kafka v0.10.2 or later. For the legacy Kafka adapter, options are EARLIEST or LATEST. |
STARTING_OFFSET | All | Where to start reading from (default is -1, the oldest offset) as a long int representing the physical offset within a partition |
INDEX_TOPIC_NAME | Kafka10 | This option specifies the name of the index topic to be used for mapping message offsets to timestamps. For more details, refer to Building and Using an Index Topic in the Reading from Kafka topic of the s-Server Integration Guide. Index topic may be created on a separate Kafka cluster. |
INDEX_TOPIC_SEED_BROKERS | Kafka10 | The broker(s) where the index topic referenced by INDEX_TOPIC_NAME is managed |
MAX_POLL_RECORDS | Kafka10 | Maximum number of records to be polled (fetched) through the new KafkaConsumer.poll() API call. For best throughput during replay, this needs to be set such that you get a "page" full (1MB is the default) of kafka messages from each partition of the topic(s). It can be roughly calculated as: (numPartitions * 1 MB) / typicalMessageSize |
PARTITION_OFFSET_QUERY | Kafka10 | A SQL query that fetches starting offsets, Kafka partition and Kafka topic for all topic partitions. For both the legacy Kafka adapter and the Kafka10 adapter, you can use a query along the following lines: PARTITION_OFFSET_QUERY 'SELECT "KAFKA_PARTITION","KAFKA_TOPIC","KAFKA_OFFSET" FROM stored_offsets'. PARTITION should be of type INTEGER. OFFSET should be of type BIGINT. Any partition for which the query does not return anything will use either STARTING_OFFSET or STARTING_TIME to determine where to start. |
CLIENT_ID | All | For the Kafka10 adapter, it is vital to understand that this is the consumer group id (so is used to set consumer group property "group.id"). Client key for Yammer metrics. CLIENT_ID defaults to client{TOPIC} or CLIENT{TOPIC}{PARTITION} if PARTITION is specified. CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting. CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true. See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics. |
OPTIONS_QUERY | All | Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in select lastOffset as STARTING_OFFSET from TEST.committedOffset; |
SCHEMA_ID | All | If set >= 0, the reader will treat the first byte of data as magic byte, and the next 4 bytes as a schema ID, and will discard any record where the first byte does not match the SCHEMA_ID. Default -1. |
PARSER_PAYLOAD_OFFSET | Kafka10 | The number of bytes to skip at the start of the record to get the value - in case the data is offset (by a schema ID or some other fixed-length data). Default 0. |
"kafka.consumer.config" | Kafka10 | Lets you specify the name of a properties file that contains a set of Kafka consumer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the consumer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#newconsumerconfigs Note: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "SEED_BROKERS". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime. |
"isolation.level" | Kafka10 | Lets you specify whether s-Server should read all Kafka messages or only committed messages. Options are read_uncommitted, or read_committed This option lets you account for transactions, a Kafka 0.11.0 feature whereby applications can write to multiple topics and partitions atomically. To use atomic commitments, you need to configure the Kafka adapter to only read committed data--this is, read_committed. |
METRICS_PER_PARTITION | Legacy | True or False. If METRICS_PER_PARTITION is false, then CLIENT_ID will be used as the client key for all yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number will be appended to each client_id (and finer grained metrics will be reported). |
FETCH_SIZE | Legacy | Fetch size. Defaults to 1000000. |
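The MAX_POLL_RECORDS rule of thumb described above - (numPartitions * 1 MB) / typicalMessageSize - can be sketched as a quick calculation. The function name here is ours, for illustration only:

```python
# Sketch: estimate a MAX_POLL_RECORDS value so each poll can fill roughly one
# default-sized (1 MB) fetch "page" per partition of the topic.

def estimate_max_poll_records(num_partitions: int, typical_message_size: int,
                              page_bytes: int = 1_048_576) -> int:
    """Apply (numPartitions * page) / typicalMessageSize, with a floor of 1."""
    return max(1, (num_partitions * page_bytes) // typical_message_size)

# Example: 3 partitions with ~1 KB messages.
print(estimate_max_poll_records(3, 1024))  # -> 3072
```

The result would then be supplied as the string value of the MAX_POLL_RECORDS foreign stream option.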
You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so the passwords are not visible in the SQL.
The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.
So we recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using one of the following .properties files:
Option Name | Adapter version | Description |
---|---|---|
"ssl.truststore.location" | Kafka10 | The location of the trust store file. |
"ssl.truststore.password" | Kafka10 | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. |
"ssl.keystore.location" | Kafka10 | The location of the key store file. This is optional for client and can be used for two-way authentication for client. |
"ssl.keystore.password" | Kafka10 | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. |
"ssl.key.password" | Kafka10 | The password of the private key in the key store file. This is optional for client. |
CREATE OR REPLACE FOREIGN STREAM "schema"."fs"
(
"SQLSTREAM_PROV_KAFKA_TOPIC" VARCHAR(128)
,"SQLSTREAM_PROV_KAFKA_PARTITION" INTEGER
,"SQLSTREAM_PROV_KAFKA_TIMESTAMP" TIMESTAMP
,"SQLSTREAM_PROV_KAFKA_OFFSET" BIGINT
,"COL0" VARCHAR(4000)
,"unparsed_attributes" VARCHAR(4096)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
"PARTITION" '0-2',
"PARSER" 'CSV',
"CHARACTER_ENCODING" 'UTF-8',
"SEPARATOR" ',',
"SEED_BROKERS" '<namespace>.servicebus.windows.net:9093',
"STARTING_TIME" 'EARLIEST',
"MAX_POLL_RECORDS" '100',
"BUFFER_SIZE" '1048576',
"FETCH_SIZE" '1000000',
"TOPIC" 'topic-03',
"kafka.consumer.config" '/home/sqlstream/kafka.consumer.config',
"CONSUMER_GROUP_ID" 'group-id-01'
);
select stream * from "schema"."fs";
Below is an example that restarts reading from the offsets saved in an index topic. First, create a view over the watermark offsets stored in the index topic:
CREATE OR REPLACE VIEW "schema".KAFKA_OFFSET_PARTITION AS
SELECT * FROM TABLE (
    sys_boot.mgmt.watermark_offsets_from_index_topic_with_config_file(
        '<namespace>.servicebus.windows.net:9093',
        'topic-03-index',
        'group-id-03',
        '/home/sqlstream/kafka.consumer.config'));
Then reference that view in the PARTITION_OFFSET_QUERY option of the foreign stream:
CREATE OR REPLACE FOREIGN STREAM "schema"."fs"
(
"SQLSTREAM_PROV_KAFKA_TOPIC" VARCHAR(128),
"SQLSTREAM_PROV_KAFKA_PARTITION" INTEGER,
"SQLSTREAM_PROV_KAFKA_TIMESTAMP" TIMESTAMP,
"SQLSTREAM_PROV_KAFKA_OFFSET" BIGINT,
"COL0" VARCHAR(4000),
"unparsed_attributes" VARCHAR(4096)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
"PARTITION" '0-2',
"PARSER" 'CSV',
"CHARACTER_ENCODING" 'UTF-8',
"SEPARATOR" ',',
"SEED_BROKERS" '<namespace>.servicebus.windows.net:9093',
"TOPIC" 'topic-03',
"kafka.consumer.config" '/home/sqlstream/kafka.consumer.config',
"CONSUMER_GROUP_ID" 'group-id-03',
"PARTITION_OFFSET_QUERY" 'SELECT * FROM "schema".KAFKA_OFFSET_PARTITION'
);
select stream * from "schema"."fs";
For your convenience we include the Kafka options here. Just remember that transactions are not supported, and please do not use the "legacy" Kafka options - use Kafka10 plugin options only.
These special egress columns are supported by the KAFKA10 plugin. Data arriving in these columns is not included in the outgoing Kafka message payload; instead they are used to set other message variables.
Name | Data Type | Description | Name in s-Server 6.0.0 |
---|---|---|---|
SQLSTREAM_EGRESS_KAFKA_PARTITION | INTEGER | This column is used to determine the partition to which the message will be written. NOTE: SQLstream uses this approach and does not support use of the Kafka producer property "partitioner.class". | KAFKA_PARTITION |
SQLSTREAM_EGRESS_KAFKA_TIMESTAMP | TIMESTAMP | This column is used to set the Kafka message timestamp. | KAFKA_TIMESTAMP |
SQLSTREAM_EGRESS_KAFKA_KEY | VARBINARY(256) | This column is used as the message key. | KAFKA_KEY |
SQLSTREAM_EGRESS_WATERMARK | VARCHAR(256) | This column is used as a watermark, and is saved when committing to the sink Kafka topic. The developer should set it to whatever watermark format is supported by the source plugin. The format of the stored watermark string is: <CommitRowtimeInMillisFromEpoch>,<watermark_column_value> | - |
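A stored watermark string in the documented <CommitRowtimeInMillisFromEpoch>,<watermark_column_value> format can be split back into its two parts. A small sketch - the helper name is ours, not an s-Server API:

```python
# Sketch: decode a SQLSTREAM_EGRESS_WATERMARK string into the commit rowtime
# (milliseconds since the epoch, rendered as a UTC timestamp) and the raw
# watermark column value.
from datetime import datetime, timezone

def parse_watermark(watermark: str) -> tuple[datetime, str]:
    # Split only on the first comma: the watermark value may itself contain commas.
    millis, value = watermark.split(",", 1)
    commit_rowtime = datetime.fromtimestamp(int(millis) / 1000.0, tz=timezone.utc)
    return commit_rowtime, value

rowtime, value = parse_watermark("1518215025892,42")
print(rowtime.isoformat(), value)
```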
Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#producerconfigs. Where appropriate, information in this table is drawn from this page.
Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.
Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':
Option Name | Adapter version | Description |
---|---|---|
TOPIC | All | Kafka topic | |
"bootstrap.servers" | Kafka10 | hostname:port[,hostname:port] of the Kafka broker(s). Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers. | |
SEED_BROKERS | Legacy | A comma separated list of broker identifiers in the format "<broker_host_name>:<port>". Defaults to "localhost". | |
"metadata.broker.list" | Legacy | hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers. | |
"key.serializer" | Kafka10 | Names a serializer class for keys. If no class is given, Kafka uses value.serializer. | |
"key.serializer.class" | Legacy | Names a serializer class for keys. If no class is given, Kafka uses serializer.class. | |
"value.serializer" | Kafka10 | Names a serializer class for values. | |
"serializer.class" | Legacy | Names a serializer class for values. The default encoder takes a byte[] and returns the same byte[]. | |
"partitioner.class" | All | Fully qualified Java classname of Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner | |
"compression.type" | Kafka10 | If desired, specifies the final compression type for a given topic. Defaults to 'none'. Possible values: 'snappy', 'gzip' | |
"retry.backoff.ms" | All | Producers refresh metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing. | |
"request.timeout.ms" | All | When request.required.acks is enabled, this lets you specify how long the broker should try to bundle the specified number of messages before sending back an error to the client. | |
"send.buffer.bytes" | All | Socket write buffer size. | |
"client.id" | All | Using this option, you can specify a string to help you identify the application making calls to the Kafka server. | |
"transactional.id" | Kafka10 | This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique transactional.id to publish messages to the topic using transactions. Transactions are used only if Kafka brokers are v0.11.2. These support transactional semantics. NOTE: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" for itself. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream. (new in s-Server version 6.0.1) If you set transactional.id = 'auto', when a pump begins running, s-Server automatically sets transactional.id to '<fully_qualified_pump_name>_Pump', where <fully_qualified_pump_name> is the name of the pump that instantiates the sink. |
|
"pump.name" | Kafka10 | Deprecated since version 6.0.1. Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance itself is running (Leader or Follower) when you configure the Kafka adapter to run in High Availability (HA) mode. The pump.name needs to be fully qualified pump name of the format:<catalog_name>.<schema_name>.<pump_name> For example:'LOCALDB.mySchema.ProcessedOrdersPump' |
|
"linger.ms" | All | In cases where batch.size has not been reached, number of milliseconds that the Kafka producer will wait before batching sends . Defaults to '100', in milliseconds) | |
"batch.size" | All | Number of messages sent as a batch. Defaults to '1000' | |
"kafka.producer.config" | Kafka10 | Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs. NOTE: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "bootstrap.servers". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime. |
|
"security.protocol" | Kafka10 | Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL | |
"transaction.timeout.ms" | Kafka10 | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. | |
HA_ROLLOVER_TIMEOUT | Kafka10 | Time in milliseconds; defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode and is running as a "Follower", it waits for this amount of time for a lack of commits from the "Leader". After the timeout, the "Follower" attempts to take over as the "Leader". There may be multiple follower instances running in a cluster; only one of them succeeds in being designated as the new "Leader". All other pump instances using the same transactional.id continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself to a "Follower" if possible. |
COMMIT_METADATA_COLUMN_NAME | Kafka10 | The name of the column that contains watermark values - default SQLSTREAM_EGRESS_WATERMARK. | |
SQLSTREAM_EGRESS_WATERMARK_SORT_FUNCTION | Kafka10 | If the SQLSTREAM_EGRESS_WATERMARK egress column is being used, the value chosen depends on the value of this option. |
HEADERS_COLUMNS | Kafka10 | Deprecated - please use HEADERS_COLUMN_LIST | |
HEADERS_COLUMN_LIST | Kafka10 | Comma-separated list of foreign stream columns that will be mapped as outgoing headers, rather than to the record value itself. See Writing Headers to Kafka based on Column Data | |
OPTIONS_QUERY | All | Lets you query a table to update adapter options at runtime. You can use this, for example, to set the "bootstrap.servers" option from a table, as in select lastOffset as "bootstrap.servers" from TEST.kafka_write_options; |
POLL_TIMEOUT | Legacy | This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms. | |
PORT | Legacy | Deprecated option. From s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option. | |
"producer.type" | Legacy | This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are 'async' (asynchronous) and 'sync' (synchronous). Default 'sync' | |
"compression.codec" | Legacy | The compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". Default 'none'. | |
"compressed.topics" | Legacy | If the compression codec is anything other than 'none', enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics. | |
"message.send.max.retries" | Legacy | This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Default 3. | |
"topic.metadata.refresh.interval.ms" | Legacy | The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...). It will also poll regularly (default: every 10min so 600,000ms). If you set this to a negative value, metadata will only get refreshed on failure. Default 600*1000 (10 minutes). | |
"request.required.acks" | Legacy | How many other brokers must have committed the data to their log and acknowledged this to the leader? 0 means never wait; 1 means wait for the leader to acknowledge; -1 means wait for all replicas to acknowledge. Default 0 (never wait). | |
"queue.buffering.max.ms" | Legacy | Maximum time to buffer data when using async mode. Default 5000. | |
"queue.buffering.max.messages" | Legacy | The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. Default 10000. | |
"queue.enqueue.timeout.ms" | Legacy | The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. Default -1. | |
"batch.num.messages" | Legacy | The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached. Default 200. | |
"send.buffer.bytes" | Legacy | Socket write buffer size. Default 100x1024. | |
"SHARD_ID" | kafka10 | This optional parameter is used in scaling, when you are running multiple shards of a pipeline. It uniquely identifies the current shard. When writing metadata/watermarks to kafka, the SHARD_ID is added as a prefix to the SQLSTREAM_POSITION_KEY. It is also included in the automatically generated transaction id. When creating a restartable pipeline, this SHARD_ID is expected to be the same as the SHARD_ID of the source foreign stream. |
You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so the passwords are not visible in the SQL.
The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.
So we recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using one of the following .properties files:
Option Name | Adapter version | Description |
---|---|---|
"ssl.truststore.location" | Kafka10 | The location of the trust store file. |
"ssl.truststore.password" | Kafka10 | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. |
"ssl.keystore.location" | Kafka10 | The location of the key store file. This is optional for client and can be used for two-way authentication for client. |
"ssl.keystore.password" | Kafka10 | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. |
"ssl.key.password" | Kafka10 | The password of the private key in the key store file. This is optional for client. |
CREATE OR REPLACE FOREIGN STREAM "schema"."out_fs"
(
"PARTITION_ID" BIGINT,
"data" VARCHAR(4000),
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
"SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
"SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT,
"unparsed_attributes" VARCHAR(4096),
"SQLSTREAM_POSITION_KEY" VARCHAR(512)
)
SERVER "KAFKA10_SERVER"
OPTIONS (
"FORMATTER" 'CSV',
"bootstrap.servers" '<namespace>.servicebus.windows.net:9093',
"kafka.producer.config" '/home/sqlstream/kafka.producer.config',
"TOPIC" 'topic-03',
"INDEX_TOPIC_NAME" 'topic-03-index',
"COMMIT_METADATA_COLUMN_NAME" 'SQLSTREAM_POSITION_KEY'
);
CREATE OR REPLACE PUMP "schema"."output_pump" STOPPED AS
INSERT INTO "schema"."out_fs"
SELECT STREAM *
,CAST("PARTITION_ID" AS VARCHAR(50)) || ',' || CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(50)) as "SQLSTREAM_POSITION_KEY"
FROM "schema"."data_edrflow_fs";
ALTER PUMP "schema".* START;
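The pump above assembles SQLSTREAM_POSITION_KEY by concatenating the partition id and the source line number with a comma, mirroring the SQL CAST ... || ',' || CAST ... expression. As a sketch of the same composite key (the function name is ours, not an s-Server API):

```python
# Sketch: build the "partition,line-number" composite position key that the
# pump writes into the SQLSTREAM_POSITION_KEY egress column.

def position_key(partition_id: int, line_number: int) -> str:
    """Mirror the SQL expression CAST(p AS VARCHAR) || ',' || CAST(n AS VARCHAR)."""
    return f"{partition_id},{line_number}"

print(position_key(7, 1234))  # -> "7,1234"
```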
As Azure Event Hubs does not support transactions, we cannot rely on committing offset metadata to the sink topic. To achieve at-least-once delivery semantics, offsets are instead saved to a separate index topic, as shown in the writing example above.
For more information, please see Building and Using an Index Topic in the Integrating Kafka topic.
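Under at-least-once delivery, a replay after restart can re-deliver messages that downstream consumers have already processed. One common mitigation is to drop duplicates by tracking the highest offset seen per topic partition. A minimal consumer-side sketch - the message shape and names here are illustrative only, not part of s-Server:

```python
# Sketch: idempotent consumption under at-least-once delivery. Messages are
# (topic, partition, offset, payload) tuples; anything at or below the last
# offset seen for its partition is treated as a replayed duplicate.

def dedupe(messages, seen=None):
    """Yield only messages with offsets above the last one seen per partition."""
    seen = {} if seen is None else seen
    for topic, partition, offset, payload in messages:
        key = (topic, partition)
        if offset <= seen.get(key, -1):
            continue  # replayed duplicate - skip it
        seen[key] = offset
        yield topic, partition, offset, payload

msgs = [("t", 0, 0, "a"), ("t", 0, 1, "b"), ("t", 0, 1, "b"), ("t", 0, 2, "c")]
print([m[3] for m in dedupe(msgs)])  # -> ['a', 'b', 'c']
```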