A foreign stream is an instance of a foreign data wrapper that gives s-Server access to a flow of data from or to an external system. Foreign streams provide interfaces for sources of data coming into s-Server and for sinks of data being written out of s-Server.
For example, the Extensible Common Data Adapter supports the creation of a foreign stream that will take rows (from a local stream) and send them by email to a recipient.
CREATE [OR REPLACE] FOREIGN STREAM foreign-stream-name (<column_list>) SERVER <server_name> [OPTIONS <options_specification>] [DESCRIPTION string_literal]
For detailed examples of using CREATE FOREIGN STREAM and stream options, see the topics Reading from Other Sources and Writing to Other Sources in the Integrating with Other Systems guide.
The OPTIONS required to create a foreign stream depend upon the server that the foreign stream references. Servers are often defined for an adapter, such as the SQL/MED Adapter or Extensible Common Data Adapter. See the s-Server Integrations page in the Integrating Guavus SQLstream with Other Systems guide for more details.
S-Server ships with a number of prebuilt server objects for the Extensible Common Data Adapter. In many cases, these objects will work fine for creating foreign streams for ECDA sources and sinks. You generally only need to create your own server object for ECDA sources and sinks if you know you will have multiple foreign streams that share options. In this case, it may be more efficient to define a custom server object with these shared options. Doing so allows foreign streams that invoke the custom server object to inherit options. See Using SQL/MED Inheritance with Server objects.
This table lists the ECD plugin types, and the corresponding prebuilt servers.
The 'Type' is used in the TYPE clause when creating a SERVER, and with the --io switch when running an ECD agent.
Type | Plugin Description | Name of Prebuilt Server for Foreign Streams | Notes |
---|---|---|---|
amqp_legacy | AMQP Legacy: Reads from and writes to an AMQP message bus for AMQP 0.9. See the topics Reading from AMQP and Writing to AMQP for more details. | AMQP_LEGACY_SERVER | |
amqp10 | AMQP 1.0: Reads from and writes to an AMQP message bus for AMQP 1.0. See the topics Reading from AMQP and Writing to AMQP for more details. | AMQP10_SERVER | |
hdfs | Allows s-Server to write to the Hadoop/HDFS file system (also supports ADLS Gen 2). See Writing to Hadoop for more details. | HDFS_SERVER | sink only |
hive | Allows s-Server to write to Hadoop/Hive tables. See Writing to Hive Tables for more details. | HIVE_SERVER | sink only; ORC format only |
http | Allows s-Server to read and write data over HTTP / HTTPS. See the topics Reading over HTTP and Writing over HTTP for more details. | HTTP_SERVER | |
file | File System: Reading and writing over the file system. See the topics Reading from the File System and Writing to the File System for more details. | FILE_SERVER | |
filevfs | Allows s-Server to read files, including compressed files, from ADLS Gen2, HDFS, the local file system, S3 and SFTP. See the topic Integrating Files using VFS for more details. | FILE_VFS_SERVER | source only |
ibmmq | Allows s-Server to read from or write to queues and topics in IBM MQ. See Reading from IBM MQ and Writing to IBM MQ for more details. | IBMMQ_SERVER | |
kafka | Allows s-Server to exchange data with Kafka clusters. See the topics Reading from Kafka and Writing to Kafka for more details. | KAFKA_SERVER | non-transactional |
kafka10 | Allows s-Server to exchange data with Kafka clusters, versions 0.10.2 and above. See the topics Reading from Kafka and Writing to Kafka for more details. Also supports Azure Event Hubs. | KAFKA10_SERVER | versions since 0.10.2 |
kinesis | Allows s-Server to exchange data with Kinesis streams. See the topics Reading from Kinesis and Writing to Kinesis for more details. | KINESIS_SERVER | |
pulsar | Allows s-Server to read from and write to Pulsar streams. See the topic Integrating Pulsar for more details. | PULSAR_SERVER | |
| | Allows s-Server to connect to an SMTP server in order to send emails. See the topic Writing to Mail Servers for details. | MAIL_SERVER | sink only |
mongodb | Allows s-Server to write to MongoDB. See the topic Writing to MongoDB for more details. | MONGO_SERVER | sink only |
mqtt | Allows s-Server to exchange data with MQTT brokers. See the topics Reading from MQTT and Writing to MQTT for more details. | MQTT_SERVER | |
snowflake | Allows s-Server to write to Snowflake warehouses. See the topic Writing to Snowflake for more details. | SNOWFLAKE_SERVER | sink only; uploads files |
net | Network Sockets (TCP/UDP): Configured for a socket. Reads or writes data streamed from a client program using TCP or UDP. See the topics Reading from Network Sockets and Writing to Network Sockets for more details. | NET_SERVER | |
websocket | Allows s-Server to read and write data over web sockets. See the topics Reading from Websockets and Writing to Websockets for more details. | WEBSOCKET_SERVER | |
NOTES:

To list the server objects defined in your s-Server installation, you can run:
sqllineClient --run=$SQLSTREAM_HOME/support/sql/showForeignServers
You declare the columns of a foreign stream explicitly, giving each column a name and a data type.
The following example uses the ECDA file writer to write CSV data to the file system. In order to read from or write to the file system, you must reference a prebuilt server object called FILE_SERVER. In the foreign stream's options, you configure how s-Server connects to the file system.
Foreign streams must be created in a schema, which is what we create next. This foreign stream declares columns explicitly, but in some cases, you can derive column names from the external source.
See the topic Writing to the File System in the Integrating with Other Systems Guide for more information about this example.
CREATE OR REPLACE SCHEMA "FileWriterSchema";
SET SCHEMA 'FileWriterSchema';
CREATE OR REPLACE FOREIGN STREAM "FileWriterStream"
("recNo" INTEGER,
"ts" TIMESTAMP,
"accountNumber" INTEGER,
"loginSuccessful" BOOLEAN,
"sourceIP" VARCHAR(32),
"destIP" VARCHAR(32),
"customerId" INTEGER)
--Columns for the new stream
SERVER FILE_SERVER
OPTIONS
(directory 'myDirectory',
--directory for the file
formatter 'CSV',
filename_pattern 'myRecord.csv',
--regex for filename pattern to look for
character_encoding 'UTF-8',
write_header 'true');
To start writing, you need to create a pump to insert data into the foreign stream. Because of the nature of streaming data, a pump is needed to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. A model for setting up a pump appears below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
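For the file writer example above, the pump might look like the following sketch. The source stream "SourceStream" is hypothetical; it stands in for whatever native stream produces rows with columns matching "FileWriterStream".

```sql
-- Pumps are created STOPPED; they move no rows until started.
CREATE OR REPLACE PUMP "FileWriterSchema"."FileWriterPump" STOPPED AS
INSERT INTO "FileWriterSchema"."FileWriterStream"
SELECT STREAM * FROM "FileWriterSchema"."SourceStream";

-- Start the pump to begin moving rows into the foreign stream.
ALTER PUMP "FileWriterSchema"."FileWriterPump" START;
```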
You can specify an OPTIONS clause to provide parameters for a foreign stream or for any column.
Each OPTIONS specification is an option-value pair that names the option and gives its intended value.
No two options in the same clause may have the same name.
Options vary greatly depending on the kind of source for which you are setting up the foreign stream. The uses of foreign streams are not limited to the below, but these do represent common uses for foreign streams in s-Server.
See the ADLS section of the topic Reading from Files using VFS in the Integration Guide for more details or see Options for Reading from Files using VFS below.
There are distinct differences between the way AMQP up to 0.9.1 works and the way AMQP 1.0 works. Roughly, AMQP 1.0 only provides a network wire-level protocol for the exchange of messages, while versions up to 0.9.1 employed a system of exchanges, queues and bindings to exchange messages. As a result of this change, you configure the connection URL differently for AMQP 1.0 than for versions up to 0.9.1.
Option | Description |
---|---|
CONNECTION_URL | Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port and so on. This will differ depending on whether you are using AMQP 1.0 or a legacy version. |
DESTINATION | The AMQP destination. This can be in the form <destination prefix>{PARTITION}. |
PARTITION_EXPRESSION | You should only use this if DESTINATION includes "{PARTITION}". This should be a dk.brics regular expression, such as <0-3>. |
PARSER_QUEUE_SIZE | Queue size for parser. Reading only. Defaults to 2. In most cases, you will not want to change this number. |
ACKNOWLEDGE_MODE | Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack. Roughly, AUTO asks the container on the AMQP server to acknowledge once a message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments. |
OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set one or more options, for example using select DESTINATION from TEST.amqp_options. For more details see the topic Using the Options Query Property. |
Format (AMQP legacy):
amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList> ]
Example:
amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'
Format (AMQP 1.0):
amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>
Example:
amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default
Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.
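For example, a foreign stream over the prebuilt AMQP legacy server might look like the following sketch. The column list, destination name and parser choice are hypothetical; note how the single quotes around the brokerlist are doubled for the SQL string literal.

```sql
CREATE OR REPLACE FOREIGN STREAM "AmqpReaderStream"
("ts" TIMESTAMP,
 "message" VARCHAR(1024))
SERVER AMQP_LEGACY_SERVER
OPTIONS (
  -- Single quotes inside the URL are doubled for SQL formatting
  CONNECTION_URL 'amqp://guest:guest@clientid/default?brokerlist=''tcp://localhost:5672''',
  DESTINATION 'myQueue',
  PARSER 'CSV');
```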
See the topic Reading from AMQP in the Integration Guide for more details.
For reading from Azure Event Hubs we use the Kafka plugin - please see Foreign Stream Options for Reading from Kafka below. For more information see the topic Reading from Azure Event Hubs in the Integration Guide.
Option | Description |
---|---|
DIRECTORY | Directory containing the file(s) from which you are reading. |
FILENAME_PATTERN | Input only. Java regular expression defining which files to read. See https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html for more information on Java regular expressions. |
FILE_COMPRESSION | Inflate / de-compress incoming files using the supplied codec. Currently only 'gzip' is supported (case is not significant). Any other value, or no value, means the file is not de-compressed. Default 'none'. |
FILE_COMPRESSION_BUFFER | Size in bytes of decompression buffer. Default '65536'. |
STATIC_FILES | Defaults to false. When you set this to true, you indicate that no files will be added or changed after the file reader is started. The file reader will exit after the last file is read. This lets you use the file reader as a foreign table, which is finite (as opposed to a foreign stream, which is infinite, and handles files that are continually written to). |
REPEAT | Defaults to 0, meaning do not repeat. Can be a positive whole number, a negative number, or 'FOREVER'. For positive numbers, after processing all files that match FILENAME_PATTERN, start over again, repeating the specified number of times. If negative or 'FOREVER', keep reprocessing all files that match FILENAME_PATTERN forever. You must set STATIC_FILES to true in order to use this option. |
OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set options using select DIRECTORY, FILENAME_PATTERN from TEST.file_options. For more details see the topic Using the Options Query Property. Often used to supply STARTING_POSITION or STARTING_TIME (see below). |
SORT_BY_TIMESTAMP | 'true' or 'false'. If 'true', s-Server searches the file for the first timestamp that matches STARTING_TIME. If 'false', the STARTING_POSITION watermark is used. |
STARTING_POSITION | The watermark in the form 'filename:linenumber' from which to start reading. When used, this option is normally retrieved dynamically using an OPTIONS_QUERY. See Using Exactly Once with File as a source. |
STARTING_TIME | The starting time in the format defined by FILENAME_DATE_FORMAT. Only used if SORT_BY_TIMESTAMP is true. When set, extract the file timestamp from incoming file names using the FILENAME_PATTERN option. The file timestamp is assumed to be extracted as group 1 of the pattern. Skip the file if its timestamp is less than the watermark timestamp. See Using Exactly Once with File as a source. |
FILENAME_DATE_FORMAT | The format of the date that is embedded in incoming filenames. Only used if SORT_BY_TIMESTAMP is true, to extract the time from the filename for matching to STARTING_TIME. |
WATERMARKS | Name of the watermark table or view used to retrieve the watermark for the source. |
POSTPROCESS_COMMAND | Optional. Input only. The name of a shell script present in the $SQLSTREAM_HOME/userbin directory. The shell script receives the watermark from the sink(s) of the pipeline through the WATERMARKS query; users can then archive all files before that watermark. |
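Putting these options together, a file-reading foreign stream might look like the following sketch. The directory, filename pattern and columns are hypothetical, and the PARSER option is assumed to be the reading counterpart of the formatter option shown in the file writer example above.

```sql
CREATE OR REPLACE FOREIGN STREAM "FileReaderStream"
("recNo" INTEGER,
 "ts" TIMESTAMP,
 "accountNumber" INTEGER)
SERVER FILE_SERVER
OPTIONS (
  DIRECTORY '/tmp/incoming',          -- directory to read files from
  FILENAME_PATTERN 'myRecord.*\.csv', -- Java regex selecting input files
  PARSER 'CSV',
  CHARACTER_ENCODING 'UTF-8',
  STATIC_FILES 'false');              -- keep watching for new files
```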
See the topic Reading From the File System in the Integration Guide for more details.
Option | Description | Default |
---|---|---|
FILE_LOCATION | (Mandatory). The URI of the target directory containing the files to be read - for example file:///tmp/newdirectory represents a directory on the local file system. For more information see URI Formats Supported by FILE-VFS. | |
FILENAME_PATTERN | (Mandatory). A Java regular expression selecting which files to read from the FILE_LOCATION directory. When the SORT_FIELD option has the value TIME_IN_FILENAME or LEXICAL_IN_FILENAME, the first capture group in the regex identifies the sub-expression on which files are sorted. Example: buses_(\d{14})_[a-z]+.* - files matching this regex will be picked and the group \d{14} (14 digits) will be used to sort files. | |
FILE_TYPE | The type of file you wish to read. See the topic Reading from Files using VFS for the supported options. | none |
SORT_FIELD | The file sorting mechanism you wish to use: MODIFIED_FILE_TIME, TIME_IN_FILENAME or LEXICAL_IN_FILENAME. | MODIFIED_FILE_TIME |
FILENAME_TIME_FORMAT | The format of the timestamp when the SORT_FIELD option is set to TIME_IN_FILENAME. NOTE: This option is mandatory when using TIME_IN_FILENAME. | |
STARTING_FILE_NAME_AND_OFFSET | Reads start with the given file name and offset when the SORT_FIELD option is set to LEXICAL_IN_FILENAME. The default value EARLIEST means all files will be picked. | EARLIEST |
STARTING_TIME | The starting time when the SORT_FIELD option is set to TIME_IN_FILENAME - in the format yyyy-MM-dd HH:mm:ss.SSS. | 1752-09-14 00:00:00.000 |
STARTING_POSITION | The starting position for file reads when the SORT_FIELD option is set to MODIFIED_FILE_TIME. The format is modified_time_epoch[:file_name[:offset]] - file_name and offset are optional. | 1752-09-14 00:00:00.000 |
INGRESS_NUM_PARTITIONS | Used in scaling. The number of logical partitions into which you want to divide your input data. A partition is a logical grouping of input data files based on hash computation; files with the same partition value belong to the same partition. The partition value for a file is computed as hash(filename) % INGRESS_NUM_PARTITIONS; see Scaling in File-VFS for more information. | |
INGRESS_ASSIGNED_PARTITIONS | Used in scaling. Identifies the partitions assigned to the current shard. A shard only processes these partitions and skips data from other partitions. Partitions are 0-indexed and can be comma-separated values (0,1,2), range-based (4:7, equivalent to 4,5,6), a mix of both (0,1,4:7, equivalent to 0,1,4,5,6), or all (:). | |
SHARD_ID | Used in scaling. Optional; used only when you are running multiple shards of a pipeline. Identifies the current shard, counting from 0. | |
INGRESS_DELAY_INTERVAL | The period (in milliseconds) for the file-based T-sort to wait for late files - applies only if SORT_FIELD is set to MODIFIED_FILE_TIME or TIME_IN_FILENAME. | 0 |
INGRESS_FILE_SCAN_WAIT | The interval (in milliseconds) at which the reader thread checks the queue for new files to be read. | 2000 |
IGNORE_CORRUPT_FILES | Whether the VFS reader should ignore a file when a fatal IOException comes up while reading it. If true, the corrupt file is skipped; if false, the pump is terminated and no more data will flow. | False |
NUMBER_OF_BUFFERS | The number of buffers the File-VFS plugin can use to read data before it needs to recycle them. Increasing the number of buffers may improve the plugin's performance at the cost of memory. | 2 |
INGRESS_RECURSIVE_LOOKUP | Enables or disables recursive file lookups within subdirectories of the provided directory. When 'true', the plugin checks each directory entry to distinguish files from sub-directories. If you do not need to include files from child sub-directories, and you know that all directory entries matching FILENAME_PATTERN will be files, you can safely set INGRESS_RECURSIVE_LOOKUP to 'false' to skip this check and save the latency of a round trip to the remote file system for each file in the FILE_LOCATION directory. | true |
Depending on the settings of SORT_FIELD and INGRESS_DELAY_INTERVAL, the file-based T-sort feature may come into operation.
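For example, a File-VFS foreign stream sorted by a timestamp embedded in the filename might look like the following sketch. The columns, file location and timestamp format are hypothetical, based on the buses_(\d{14})_[a-z]+.* pattern above; the PARSER option is assumed to work as in other ECDA sources.

```sql
CREATE OR REPLACE FOREIGN STREAM "BusReaderStream"
("id" INTEGER,
 "reported_at" TIMESTAMP)
SERVER FILE_VFS_SERVER
OPTIONS (
  FILE_LOCATION 'file:///tmp/newdirectory',
  FILENAME_PATTERN 'buses_(\d{14})_[a-z]+.*',
  SORT_FIELD 'TIME_IN_FILENAME',
  FILENAME_TIME_FORMAT 'yyyyMMddHHmmss', -- matches the 14-digit capture group
  STARTING_TIME '2020-01-01 00:00:00.000',
  PARSER 'CSV');
```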
Option | File System | Default | Description |
---|---|---|---|
"vfs.hdfs.kerberos.enabled" | HDFS | 'false' | In order to read from HDFS with kerberos authentication, the value for this option should be 'true' |
"vfs.hdfs.kerberos.keytab" | HDFS | no default value | In order to read from HDFS with kerberos authentication, the user must provide the location of the keytab here as a string |
"vfs.hdfs.kerberos.principal" | HDFS | no default value | In order to read from HDFS with kerberos authentication, the user must provide the value of the principal here as a string |
"vfs.s3.useHttps" | S3 | 'true' | In order to read from s3 or minio over HTTP (and not HTTPS) the value for this option should be 'false' |
AUTH_TYPE_ADLS | ADLS Gen2 | no default value | (Mandatory when using ADLS Gen2). Specify the authentication mechanism. Supported types are 'SharedKey' and 'OAuth'. OAuth2.0 support is provided via Client Credentials |
KEY_FILE | ADLS Gen2 | no default value | (Mandatory when using ADLS Gen2). Provides path to the configuration file containing credentials for the appropriate authentication mechanism. Example here |
REFRESH_CRED | ADLS Gen2 | no default value | (Optional). Supports values 'yes' and 'no' (case insensitive). When set to yes, any changes made to the KEY_FILE will be used to establish a new connection with ADLS Gen2. If set to no or not provided changed credentials in the KEY_FILE won’t be used to establish connection. |
When reading from files, you can define provenance columns. These return metadata for the file from which you are reading.
Option | Data type | Description |
---|---|---|
SQLSTREAM_PROV_FILE_SOURCE_FILE | VARCHAR | Adds the name of the file being read to the output columns |
SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER | BIGINT | Adds the current line number to the output columns. Line number starts from 0. |
SQLSTREAM_PROV_FILE_MODIFIED_TIME | BIGINT | Adds the modified time of the file being read to the output columns. |
SQLSTREAM_PROV_INGRES_PARTITION_ID | INT | Adds the partition from which this row was read, calculated by hashing the incoming filename as described in Scaling in File-VFS. |
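To use provenance columns, you declare them alongside your data columns and the reader fills them in for each row. A hypothetical sketch (the data column, location and pattern are placeholders):

```sql
CREATE OR REPLACE FOREIGN STREAM "ProvenanceReaderStream"
("line" VARCHAR(1024),                           -- data column (hypothetical)
 "SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256), -- name of the file being read
 "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT) -- line number, starting at 0
SERVER FILE_VFS_SERVER
OPTIONS (
  FILE_LOCATION 'file:///tmp/newdirectory',
  FILENAME_PATTERN '.*\.log',
  PARSER 'CSV');
```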
See the topic Reading from Files using VFS in the Integration Guide for more details.
See the HDFS section of the topic Reading from HDFS using VFS in the Integration Guide for more details or see Options for Reading from Files using VFS above.
Option | Description |
---|---|
URL | URL for HTTP feed. |
HEADER_<name_of_header> | Tells HTTP reader to add a header to the request called <name_of_header>, with the option value as the value. |
PARAM_<name_of_param> | Tells HTTP reader to add a query parameter to the request called <name_of_param> with the option value as the value. |
POLL_IN_MILLIS | How often to request new data, in milliseconds. |
OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx and PARAM_xxx options using select HEADER_ABC, PARAM_XYZ from TEST.http_options. For more details see the topic Using the Options Query Property. |
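Putting these options together, an HTTP-reading foreign stream might look like the following sketch. The URL, header, parameter and columns are hypothetical, and the PARSER option is assumed to work as in other ECDA sources.

```sql
CREATE OR REPLACE FOREIGN STREAM "HttpReaderStream"
("ts" TIMESTAMP,
 "payload" VARCHAR(2048))
SERVER HTTP_SERVER
OPTIONS (
  URL 'https://example.com/feed',   -- hypothetical feed URL
  HEADER_Accept 'application/json', -- sent as the Accept request header
  PARAM_limit '100',                -- sent as the query parameter limit=100
  POLL_IN_MILLIS '5000',            -- request new data every five seconds
  PARSER 'JSON');
```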
See the topic Reading from HTTP in the Integration Guide for more details.
Option | Description | Required | Default |
---|---|---|---|
QUEUE_MANAGER | Queue manager name of the IBM MQ server. | True | |
SERVER_HOST | Hostname of the IBM MQ server. | True | |
SERVER_PORT | Listener port of the IBM MQ server. | True | |
SERVER_CHANNEL | Channel name used by the queue with data. | True | |
APPLICATION_USER | User name that the application uses to connect to MQ. | False | |
APPLICATION_PASSWORD | Password of the application user encoded in base64 format. | False | |
QUEUE | Name of the queue from which you are receiving data. Must specify either QUEUE or TOPIC but not both. | False | |
TOPIC | Name of the IBM MQ topic from which you are receiving data. Must specify either QUEUE or TOPIC but not both. | False | |
TRANSACTION_ROW_LIMIT | Commit batch size for the files created from the messages. | False | 2000 |
TRANSACTION_ROWTIME_LIMIT | Commit time interval in ms, after which the batch is acknowledged and created even if the count of messages doesn't reach the batch size. | False | 20000 |
WATERMARK_QUERY | Watermark query, which returns a string in the MQ watermark format from the sink. | False | |
CHARACTER_ENCODING | Character encoding of the data | False | UTF-8 |
INGRESS_TEMP_FILE_PATH | Persistent volume path where the files will get created. | False | /tmp |
INGRESS_TEMP_FILE_RETENTION_IN_MIN | Retention of files in the persistent path if watermark query is not provided. | False | 1 |
INGRESS_TEMP_FILE_LIMIT | Limiting maximum number of temp files in persistent path. | False | 100 |
CIPHER_SUITE | MQ Supported Cipher Spec corresponding to the channel used. | False | TLS_RSA_WITH_AES_128_CBC_SHA256 |
CONFIG_FILE_PATH | Path of properties file for system properties of Java application for keystore, truststore path and password | False |
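Putting these options together, an IBM MQ-reading foreign stream might look like the following sketch. The queue manager, host, channel, queue and columns are hypothetical, and the PARSER option is assumed to work as in other ECDA sources.

```sql
CREATE OR REPLACE FOREIGN STREAM "MqReaderStream"
("ts" TIMESTAMP,
 "message" VARCHAR(1024))
SERVER IBMMQ_SERVER
OPTIONS (
  QUEUE_MANAGER 'QM1',              -- hypothetical queue manager name
  SERVER_HOST 'mq.example.com',
  SERVER_PORT '1414',
  SERVER_CHANNEL 'DEV.APP.SVRCONN',
  QUEUE 'DEV.QUEUE.1',              -- specify QUEUE or TOPIC, not both
  PARSER 'CSV');
```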
See the topic Reading from IBM MQ in the Integration Guide for more details.
In reading from Kafka, you can declare provenance columns. These return metadata for the Kafka topic from which you are reading.
These are as follows:
Data Type | Name in s-Server 6.0.0 | Name in s-Server 6.0.1+ | Kafka version | Description |
---|---|---|---|---|
VARCHAR(256) | KAFKA_TOPIC | SQLSTREAM_PROV_KAFKA_TOPIC | Kafka v.0.10.2 or later | Returns name of Kafka topic. |
INTEGER | KAFKA_PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v.0.10.2 or later | Returns value of current Kafka partition. |
INTEGER | PARTITION | SQLSTREAM_PROV_KAFKA_PARTITION | Kafka v.0.8.2 | Returns value of current Kafka partition. |
BIGINT | KAFKA_OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v.0.10.2 or later | Returns value of current Kafka offset. |
BIGINT | OFFSET | SQLSTREAM_PROV_KAFKA_OFFSET | Kafka v.0.8.2 | Returns value of current Kafka offset. |
TIMESTAMP | KAFKA_TIMESTAMP | SQLSTREAM_PROV_KAFKA_TIMESTAMP | Kafka v.0.10.2 or later | Returns current Kafka_timestamp. |
VARBINARY(256) | KAFKA_KEY | SQLSTREAM_PROV_KAFKA_KEY | Kafka v.0.10.2 or later | Returns key value for message. |
BINARY(32) | PAYLOAD_PREFIX | SQLSTREAM_PROV_KAFKA_PAYLOAD_PREFIX | Kafka v.0.10.2 or later | Returns the first 32 bytes of the message payload. |
VARCHAR(1000) | N/A | SQLSTREAM_PROV_KAFKA_HEADERS | Kafka v.0.10.2 or later | Returns any headers for Kafka message topics as key-value pairs in serialized text format. See Reading and Parsing Headers. |
Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#consumerconfigs. Where appropriate, information in this table is drawn from this page.
Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.
Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':
Option | Adapter version | Description |
---|---|---|
TOPIC | All | Required. Kafka Topic. You can use a regular expression as a "topic wild card". (This is not supported by legacy versions of the adapter.) |
SEED_BROKERS | All | A comma separated list of broker identifiers in the format <broker_hostname>:<port>. For the legacy adapter, this is a comma separated list of broker hosts. Defaults to "localhost". |
PARTITION | All | Partition number to read from. If reading from a Kafka topic with multiple partitions and PARTITION is omitted or blank, all partitions will be read. You can specify a single partition with PARTITION, or a range such as 1-3 (a range needs two partition numbers and is inclusive). Note: partition numbers are 0-based. |
PORT | Legacy | Deprecated since 6.0.1. Port for Kafka seed brokers. |
STARTING_TIME | All | Either EARLIEST, LATEST, or a timestamp in the format 'yyyy-MM-dd HH:mm:ss.SSS', such as '2018-02-01 22:23:45.892'. Default LATEST. When STARTING_TIME is a timestamp, the Kafka adapter "seeks" to offsets within all topic partitions where the message timestamp for all subsequent messages is greater than or equal to the specified STARTING_TIME. Requires Kafka v0.10.2 or later. For the legacy Kafka adapter, options are EARLIEST or LATEST. |
STARTING_OFFSET | All | Where to start reading from (default is -1, the oldest offset) as a long int representing the physical offset within a partition |
INDEX_TOPIC_NAME | Kafka10 | This option specifies the name of the index topic to be used for mapping message offsets to timestamps. For more details, refer to Building and Using an Index Topic in the Reading from Kafka topic of the s-Server Integration Guide. Index topic may be created on a separate Kafka cluster. |
INDEX_TOPIC_SEED_BROKERS | Kafka10 | The broker(s) where the index topic referenced by INDEX_TOPIC_NAME is managed |
MAX_POLL_RECORDS | Kafka10 | Maximum number of records to be polled (fetched) through the new KafkaConsumer.poll() API call. For best throughput during replay, this needs to be set such that you get a "page" full (1MB is the default) of kafka messages from each partition of the topic(s). It can be roughly calculated as: (numPartitions * 1 MB) / typicalMessageSize |
PARTITION_OFFSET_QUERY | Kafka10 | A SQL query that fetches starting offsets, Kafka partition and Kafka topic for all topic partitions. For both the legacy Kafka adapter and the Kafka10 adapter, you can use a query along the following lines: PARTITION_OFFSET_QUERY 'SELECT "KAFKA_PARTITION","KAFKA_TOPIC","KAFKA_OFFSET" FROM stored_offsets'. PARTITION should be of type INTEGER. OFFSET should be of type BIGINT. Any partition for which the query does not return anything will use either STARTING_OFFSET or STARTING_TIME to determine where to start. |
CLIENT_ID | All | For the Kafka10 adapter, it is vital to understand that this is the consumer group id (it is used to set the consumer group property "group.id"). It is also the client key for Yammer metrics. CLIENT_ID defaults to CLIENT_{TOPIC}, or CLIENT_{TOPIC}_{PARTITION} if PARTITION is specified. CLIENT_ID and METRICS_PER_PARTITION affect Kafka Yammer metrics reporting. CLIENT_ID does not apply unless METRICS_PER_PARTITION is set to true. See http://docs.confluent.io/1.0/kafka/monitoring.html for more information on Kafka Yammer metrics. |
OPTIONS_QUERY | All | Lets you query a table to update adapter options at runtime. You can use this, for example, to set the STARTING_OFFSET option from a table that contains the last offset, as in select lastOffset as STARTING_OFFSET from TEST.committedOffset; |
SCHEMA_ID | All | If set >= 0, the reader will treat the first byte of data as magic byte, and the next 4 bytes as a schema ID, and will discard any record where the first byte does not match the SCHEMA_ID. Default -1. |
PARSER_PAYLOAD_OFFSET | Kafka10 | The number of bytes to skip at the start of the record to get the value - in case the data is offset (by a schema ID or some other fixed-length data). Default 0. |
"kafka.consumer.config" | Kafka10 | Lets you specify the name of a properties file that contains a set of Kafka consumer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the consumer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#newconsumerconfigs Note: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "SEED_BROKERS". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime. |
"isolation.level" | Kafka10 | Lets you specify whether s-Server should read all Kafka messages or only committed messages. Options are read_uncommitted or read_committed. This option lets you account for transactions, a Kafka 0.11.0 feature whereby applications can write to multiple topics and partitions atomically. To use atomic commits, you need to configure the Kafka adapter to read only committed data - that is, read_committed. |
METRICS_PER_PARTITION | Legacy | True or False. If METRICS_PER_PARTITION is false, then CLIENT_ID will be used as the client key for all Yammer metrics reporting for that adapter. If METRICS_PER_PARTITION is true, then the actual partition number will be appended to each client_id (and finer grained metrics will be reported). |
FETCH_SIZE | Legacy | Fetch size. Defaults to 1000000. |
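Putting these options together, a Kafka-reading foreign stream might look like the following sketch. The topic, broker list and data columns are hypothetical, and the PARSER option is assumed to work as in other ECDA sources; the provenance columns are taken from the table above.

```sql
CREATE OR REPLACE FOREIGN STREAM "KafkaReaderStream"
("ts" TIMESTAMP,
 "payload" VARCHAR(2048),
 "SQLSTREAM_PROV_KAFKA_PARTITION" INTEGER, -- provenance column
 "SQLSTREAM_PROV_KAFKA_OFFSET" BIGINT)     -- provenance column
SERVER KAFKA10_SERVER
OPTIONS (
  TOPIC 'myTopic',                          -- hypothetical topic
  SEED_BROKERS 'broker1:9092,broker2:9092', -- hypothetical broker list
  STARTING_TIME 'LATEST',
  PARSER 'CSV');
```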
You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should implement them using properties files, so that the passwords are not visible in the SQL.
The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.
We therefore recommend that you supply any options related to access credentials, such as those for SSL or Kerberos, using one of the .properties mechanisms described above. The SSL-related options are:
Option Name | Adapter version | Description |
---|---|---|
"ssl.truststore.location" | Kafka10 | The location of the trust store file. |
"ssl.truststore.password" | Kafka10 | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. |
"ssl.keystore.location" | Kafka10 | The location of the key store file. This is optional for client and can be used for two-way authentication for client. |
"ssl.keystore.password" | Kafka10 | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. |
"ssl.key.password" | Kafka10 | The password of the private key in the key store file. This is optional for client. |
See the topic Reading from Kafka in the Integration Guide for more details.
Reading from Amazon Kinesis Streams uses an agent framework and does not require creation of a foreign stream. Please see Reading from Kinesis.
Option | Description |
---|---|
CONNECTION_URL | URL of the MQTT broker, for example 'tcp://127.0.0.1:1883'. |
TOPIC | MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client. |
QOS | Defines the guarantee of delivery for a specific message. Either at most once (0), at least once (1), exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/ |
CLIENT_ID | s-Server implements an MQTT client to connect to the MQTT broker. This setting provides an MQTT ClientID for that client. The MQTT broker uses the ClientID to identify the client and its current state. You should therefore use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to a randomly generated value. |
USERNAME | Optional. User name for MQTT connection. Note: s-Server sends user names and passwords as plain text. |
PASSWORD | Optional. Password for MQTT connection. Note: s-Server sends user names and passwords as plain text. |
KEEP_ALIVE_INTERVAL | Optional. Interval in seconds sent to the MQTT broker when s-Server establishes a connection. Specifies the longest period of time that broker and client can go without sending a message. Defaults to 60. |
CONNECTION_TIMEOUT | Optional. Defaults to 30. |
OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this to set options using select USERNAME, PASSWORD from TEST.mqtt_options . For more details see the topic Using the Options Query Property. |
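As a minimal sketch, the options above might be combined in a foreign stream declaration like the following. The server name MQTT_SERVER, the schema and column names, and all option values are illustrative assumptions, not taken from this guide:

```sql
-- Hypothetical MQTT source stream; server name and options are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."mqtt_source"
(
    "message" VARCHAR(1024)
)
SERVER "MQTT_SERVER"                      -- assumed prebuilt server name
OPTIONS (
    "CONNECTION_URL" 'tcp://127.0.0.1:1883',
    "TOPIC" 'sensors/temperature',        -- assumed topic
    "QOS" '1',                            -- at least once delivery
    "CLIENT_ID" 'sserver-mqtt-reader-1'   -- distinct per foreign stream
);
```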
See the topic Reading from MQTT in the Integration Guide for more details.
The ECD framework can act as a server or a client. When it acts as a client, set REMOTE_HOST and REMOTE_PORT. When it acts as a server, set SERVER_PORT and, if desired, SERVER_HOST.
Option | Description |
---|---|
IS_IPV6 | Whether or not the socket uses IPV6. Default is false. |
PROTOCOL | Whether the socket uses TCP or UDP. Default is TCP. |
REMOTE_HOST | Hostname to receive tuples from, when ECDA is acting as a client. You can set this to 'LOCALHOST' to use only local connections, or to a specific IP address, such as 168.212.226.204. Specifying REMOTE_HOST and REMOTE_PORT tells the ECD socket code to start the network connection as a client. |
REMOTE_PORT | Port to receive tuples from when ECDA is acting as a client. The REMOTE_ and SERVER_ options tell ECDA's socket code how to start the network connection (as a server or a client). |
SERVER_HOST | The hostname or IP address to listen upon to receive tuples, when ECDA is acting as a server (defaults to 0.0.0.0). Specifying SERVER_HOST and SERVER_PORT tells the ECD socket code to start the network connection as a server. |
SERVER_PORT | The port to listen upon to receive tuples when ECDA is acting as a server. |
OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set the REMOTE_HOST and REMOTE_PORT options using select REMOTE_HOST, REMOTE_PORT from TEST.socket_options . For more details see the topic Using the Options Query Property. |
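For illustration, a foreign stream reading from a socket in client mode might look like the following sketch. The server name NET_SERVER, schema, column, and all option values are assumptions:

```sql
-- Hypothetical socket source in client mode (REMOTE_ options set).
CREATE OR REPLACE FOREIGN STREAM "MySchema"."socket_source"
(
    "line" VARCHAR(4096)
)
SERVER "NET_SERVER"                   -- assumed prebuilt server name
OPTIONS (
    "PROTOCOL" 'TCP',
    "REMOTE_HOST" '168.212.226.204',  -- connect out as a client
    "REMOTE_PORT" '5400'              -- assumed port
);
```

For server mode, you would instead set SERVER_HOST and SERVER_PORT, and ECDA would listen for incoming connections.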
In reading from network sockets, you can declare provenance columns. These return metadata for the network socket from which you are reading.
These are as follows:
Data Type | Name | Value |
---|---|---|
VARCHAR(1024) | SQLSTREAM_PROV_SOCKET_SOURCE_HOST | Returns host name for socket. |
INTEGER | SQLSTREAM_PROV_SOCKET_SOURCE_PORT | Returns port for socket. |
See the topic Reading from Network Sockets in the Integration Guide for more details.
See the S3 section of the topic Reading from Files using VFS in the Integration Guide for more details or see Options for Reading from Files using VFS above.
See the SFTP section of the topic Reading from Files using VFS in the Integration Guide for more details or see Options for Reading from Files using VFS above.
Option | Description |
---|---|
URL | URL for web socket. |
HEADER_<name_of_header> | Tells the WebSocket reader to add a header called <name_of_header> to the request. |
OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx options using select HEADER_ABC, HEADER_XYZ from TEST.http_options . For more details see the topic Using the Options Query Property. |
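A WebSocket source using these options might be sketched as follows. The server name WEBSOCKET_SERVER, the URL, the header name, and the column definition are all assumptions for illustration:

```sql
-- Hypothetical WebSocket source; server name and values are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."ws_source"
(
    "payload" VARCHAR(4096)
)
SERVER "WEBSOCKET_SERVER"                 -- assumed prebuilt server name
OPTIONS (
    "URL" 'wss://example.com/feed',
    "HEADER_Authorization" 'Bearer my-token'  -- adds an Authorization header
);
```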
See the topic Reading from WebSockets in the Integration Guide for more details.
See the ADLS section of the topic Writing to Hadoop in the Integration Guide for more details, or see Options for Hadoop/HDFS below.
There are distinct differences between the way AMQP up to 0.9.1 works and the way AMQP 1.0 works. Roughly, AMQP 1.0 provides only a network wire-level protocol for the exchange of messages, while versions up to 0.9.1 employed a system of exchanges, queues and bindings to exchange messages. As a result of this change, you configure the connection URL differently for AMQP 1.0 than for versions up to 0.9.1.
Name | Description |
---|---|
CONNECTION_URL | Required. Connection URL for the AMQP server. This includes the server's hostname, user, password, port and so on. This will differ depending on whether you are using AMQP 1.0 or a legacy version. AMQP_LEGACY (AMQP protocol Version 0.9, e.g., RabbitMQ) Format: amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList>] Example: amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672' AMQP10 (AMQP protocol Version 1.0) - connectionfactory.localhost: Format: amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>' Example: amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL. |
DESTINATION | Required. AMQP 1.0 queue or topic identifier. In general, the destination qualification syntax may be specific to the AMQP message broker implementation. The examples here are specific to ActiveMQ. You can fully qualify the AMQP destination by identifying the destination as a topic or a queue. ActiveMQ supports such qualification. For a topic: "DESTINATION" 'topic://testTopic' For a queue: "DESTINATION" 'queue://testTopic' ActiveMQ treats an unqualified destination as a queue. In other words, for ActiveMQ, DESTINATION 'foo' is equivalent to DESTINATION 'queue://foo' See http://camel.apache.org/activemq.html for more details. In s-Server 6.0, DESTINATION can be either an absolute destination or be in the form: <destination prefix>{PARTITION}<destination suffix> Example: /new/ConsumerGroups/$Default/Partitions/{PARTITION} |
ACKNOWLEDGE_MODE | Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack Roughly, AUTO asks the container on the AMQP server to acknowledge once a message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments. |
DELIVERY_MODE | Optional. Delivery mode for messages that ECDA communicates to the AMQP 1.0 server. Options are NON-PERSISTENT or PERSISTENT; defaults to NON-PERSISTENT. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-outbound-channel-adapter-xml-7a |
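Putting these options together, an AMQP 1.0 sink might be declared as in the following sketch. The server name AMQP10_SERVER, the schema and column names, and the destination are assumptions for illustration:

```sql
-- Hypothetical AMQP 1.0 sink; server name and values are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."amqp_sink"
(
    "message" VARCHAR(2048)
)
SERVER "AMQP10_SERVER"      -- assumed prebuilt server name
OPTIONS (
    "CONNECTION_URL" 'amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default',
    "DESTINATION" 'queue://testQueue',   -- ActiveMQ-style queue qualification
    "ACKNOWLEDGE_MODE" 'AUTO'
);
```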
See the topic Writing to AMQP in the Integration Guide for more details.
For writing to Azure Event Hubs we use the Kafka plugin - please see Foreign Stream Options for Writing to Kafka below. For more information see the topic Writing to Azure Event Hubs in the Integration Guide.
Option | Description |
---|---|
DIRECTORY | Directory to which you are writing. s-Server needs permission to write to this directory. |
ORIGINAL_FILENAME | Name of file to write before rotation. This will be the name of the file that s-Server is currently writing. |
FILENAME_PREFIX | Prefix of the final output file name, such as "test-". |
FILENAME_DATE_FORMAT | Java time format to use in the final output file name, for example yyyy-MM-dd_HH:mm:ss. Uses java SimpleDateFormat. This specifies how to format a timestamp that appears between the prefix and the suffix. This timestamp is the ROWTIME of the last row in the file. |
FILE_ROTATION_WATERMARK_COLUMN | This declares the name of a source column whose contents will be used to further distinguish files in the series. |
FILENAME_SUFFIX | Suffix of the final output file name. If you want this suffix to include a period, you must specify it, e.g. ".csv" |
FILE_ROTATION_TIME | Determines when files rotate based on time elapsed since Unix epoch time. Defaults to 0. That means "don't use ROWTIME to trigger file rotation". You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE as an option. You can choose to specify both. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d) - for example '15s', '20m', '2h' or '3d'. These express intervals of time from 1970-01-01; files rotate once a row arrives with a ROWTIME that passes the specified interval. See Using FILE_ROTATION_TIME. |
FILE_ROTATION_SIZE | Determines when files rotate based on file size. s-Server rotates to the next file once a row arrives that brings the file size over the byte threshold specified by FILE_ROTATION_SIZE. You must specify either FILE_ROTATION_TIME or FILE_ROTATION_SIZE. You can choose to specify both. Lets you specify a file size in kilobytes (k), megabytes (m), or gigabytes (g). Expressed as a positive integer followed by a byte measurement, for example '64k', '100m', '8g'. Defaults to 0 - which means "don't use file size to trigger file rotation". See Using FILE_ROTATION_SIZE. |
FILE_ROTATION_RESPECT_ROWTIME | 'true' or 'false', case-insensitive. When you use FILE_ROTATION_SIZE, this option lets you specify whether files wait to rotate until all rows with the same ROWTIME have arrived. Defaults to 'true', which means "always respect rowtime". See Using FILE_ROTATION_RESPECT_ROWTIME. |
ESCAPE_<column name> | True or false; defaults to true. Causes strings to be escaped before being written. |
POSTPROCESS_COMMAND | The POSTPROCESS_COMMAND option lets you run a script after each output file is written. To use this option, enter the path to the script, along with any parameters needed by the script, substituting <input> for the name of the file. When the file is complete, the script will execute with parameters, and <input> will be replaced by the name of the file. Example: '/home/sqlstream/scripts/send-to-destination.sh <input> sftp://some.sftp.server.com' |
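The file rotation options above can be sketched in a sink declaration like the following. The server name FILE_SERVER, the FORMATTER option, and the schema, columns, and paths are assumptions, not values from this guide:

```sql
-- Hypothetical file sink rotating every 15 minutes of ROWTIME.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."file_sink"
(
    "ts" TIMESTAMP,
    "reading" DOUBLE
)
SERVER "FILE_SERVER"                      -- assumed prebuilt server name
OPTIONS (
    "DIRECTORY" '/var/sqlstream/output',  -- s-Server must be able to write here
    "ORIGINAL_FILENAME" 'current.csv',    -- file being written before rotation
    "FILENAME_PREFIX" 'test-',
    "FILENAME_DATE_FORMAT" 'yyyy-MM-dd_HH-mm-ss',
    "FILENAME_SUFFIX" '.csv',             -- period must be included explicitly
    "FILE_ROTATION_TIME" '15m',
    "FORMATTER" 'CSV'                     -- assumed output format option
);
```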
See the topic Writing to the File System in the Integration Guide for more details.
Option Name | Scope | Required? | Description |
---|---|---|---|
HDFS_OUTPUT_DIR | All | Required | Address for the name node of HDFS, such as hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/ or, for ADLS Gen2, an abfs:// URL. |
ABFS_CONFIG_FILE | ABFS | Required | Path of the ABFS config file with credentials (only yaml file) |
ABFS_AUTH_METHOD | ABFS | Required | Authentication method used to upload data - either "SharedKey" or "OAuth" - which indicates OAuth 2.0. These values are case sensitive. |
REFRESH_PROPS | ABFS | Optional | If set to 'true', the config file will be re-read and a new instance of the configuration will be used. If set to 'false' or omitted, changes to the config file will not be detected unless s-Server is restarted. |
AUTH_METHOD | HDFS, Hive | Optional | If desired, specify 'kerberos'. Requires AUTH_USERNAME and AUTH_KEYTAB (the latter should be implemented using JNDI properties). See Hadoop Options for Use with JNDI Properties below. |
AUTH_USERNAME | HDFS, Hive | Optional | User name for HDFS. Required if AUTH_METHOD is specified. |
CONFIG_PATH | HDFS, Hive | Optional | Specifies the path to an HDFS client configuration file. This will be loaded and used by s-Server's HDFS client throughout its entire life cycle. Example: /home/me/work/kerberos/core-default.xml |
AUTH_KEYTAB | HDFS, Hive | Optional | Path to file containing pairs of Kerberos principals and encrypted keys, such as /tmp/nn.service.keytab Required if AUTH_METHOD is specified. |
HIVE_TABLE_NAME | Hive | Required | Table name inside HIVE_SCHEMA_NAME. |
HIVE_SCHEMA_NAME | Hive | Required | Schema (database) containing the table. Must be specified if HIVE_TABLE_NAME is specified. |
AUTH_METASTORE_PRINCIPAL | Hive | Required | Defaults to null. Required if HIVE_TABLE_NAME is specified. 3-part name of the Kerberos principal which can read the Hive metastore. This is the value of the hive.metastore.kerberos.principal property set in the Hive installation's hive-site.xml descriptor file. |
HIVE_URI | Hive | Required | JDBC URL for accessing the Hive server when loading data into Hive tables. Must be specified if HIVE_TABLE_NAME is specified. |
HIVE_METASTORE_URIS | Hive | Optional | Location of the Hive metastore for loading data into Hive tables. Required if HIVE_TABLE_NAME is specified. |
$columnName_HIVE_PARTITION_NAME_FORMAT | Hive | Optional | This option specifies custom formatting directives for partition key values when they are used to construct the names of HDFS directory levels. $columnName must be the name of a partition key of type DATE or TIMESTAMP. The value bound to this option must be a valid format string as understood by java.text.SimpleDateFormat. |
OPTIONS_QUERY | All | Optional | Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set the HDFS_OUTPUT_DIR option from a table that contains options, as in select lastOffset as HDFS_OUTPUT_DIR from TEST.hdfs_options . For more details see the topic Using the Options Query Property. |
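As a sketch, an HDFS sink using Kerberos might be declared as follows. The server name HDFS_SERVER, the FORMATTER option, and all paths and names are assumptions; AUTH_KEYTAB is omitted here because, as noted above, it should be supplied through JNDI properties:

```sql
-- Hypothetical HDFS sink with Kerberos authentication.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."hdfs_sink"
(
    "ts" TIMESTAMP,
    "reading" DOUBLE
)
SERVER "HDFS_SERVER"        -- assumed prebuilt server name
OPTIONS (
    "HDFS_OUTPUT_DIR" 'hdfs://namenode.example.com:8020/user/sqlstream/',
    "AUTH_METHOD" 'kerberos',
    "AUTH_USERNAME" 'sqlstream',
    "CONFIG_PATH" '/home/me/work/kerberos/core-default.xml',
    "FORMATTER" 'CSV'       -- assumed output format option
);
```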
We recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using the following .properties files:
See the topic Writing to Hadoop in the Integration Guide for more details.
Option | Description |
---|---|
HIVE_TABLE_NAME | Defaults to null. Table name inside HIVE_SCHEMA_NAME. |
HIVE_SCHEMA_NAME | Defaults to null. Schema (database) containing the table. Must be specified if HIVE_TABLE_NAME is specified. |
AUTH_METASTORE_PRINCIPAL | Defaults to null. Required if HIVE_TABLE_NAME is specified. 3-part name of the Kerberos principal which can read the Hive metastore. This is the value of the hive.metastore.kerberos.principal property set in the Hive installation's hive-site.xml descriptor file. |
HIVE_URI | Defaults to null. JDBC URL for accessing the Hive server. Must be specified if HIVE_TABLE_NAME is specified. |
HIVE_METASTORE_URIS | Defaults to null. Location of the Hive metastore. Required if HIVE_TABLE_NAME is specified. |
$columnName_HIVE_PARTITION_NAME_FORMAT | This option specifies custom formatting directives for partition key values when they are used to construct the names of HDFS directory levels. $columnName must be the name of a partition key of type DATE or TIMESTAMP. The value bound to this option must be a valid format string as understood by java.text.SimpleDateFormat. |
Note: In order to include Kerberos options for an HDFS server, you need to configure your own server object. This server will allow all foreign tables or streams that reference this server to inherit Kerberos options.
See the topic Writing to Hive in the Integration Guide for more details.
Option | Description |
---|---|
URL | URL for HTTP feed. |
HEADER_<name_of_header> | Tells HTTP writer to add a header to the request called <name_of_header>, with the option value as the value. |
PARAM_<name_of_param> | Tells HTTP writer to add a query parameter to the request called <name_of_param> with the option value as the value. |
OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx and PARAM_xxx options using select HEADER_ABC, PARAM_XYZ from TEST.http_options . For more details see the topic Using the Options Query Property. |
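An HTTP sink using these options might be sketched as follows. The server name HTTP_SERVER, the URL, and the header and parameter names are assumptions for illustration:

```sql
-- Hypothetical HTTP sink; server name and values are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."http_sink"
(
    "payload" VARCHAR(4096)
)
SERVER "HTTP_SERVER"        -- assumed prebuilt server name
OPTIONS (
    "URL" 'https://example.com/ingest',
    "HEADER_Content-Type" 'application/json',  -- sent as a request header
    "PARAM_apikey" 'my-api-key'                -- sent as a query parameter
);
```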
See the topic Writing to HTTP in the Integration Guide for more details.
Option | Description | Required | Default |
---|---|---|---|
QUEUE_MANAGER | Queue manager name of the IBM MQ server. | True |
SERVER_HOST | Hostname of the IBM MQ server. | True |
SERVER_PORT | Listener port of the IBM MQ server. | True |
SERVER_CHANNEL | Channel name used by the queue where data needs to be written. | True |
APPLICATION_USER | User name that the application uses to connect to IBM MQ. | False |
APPLICATION_PASSWORD | Password of the application user, encoded in base64 format. | False |
QUEUE | Name of the queue to which you are sending data. Must specify either QUEUE or TOPIC but not both. | False | |
TOPIC | Name of the IBM MQ topic to which you are sending data. Must specify either QUEUE or TOPIC but not both. | False | |
WATERMARK_QUEUE_NAME | Queue name to where the watermark will be stored. | False | |
ASYNC_DISABLED | If set to true, asynchronous put is disabled for the producer. | False |
TRANSACTION_ROW_LIMIT | Commit Batch size of the messages produced. | False | 5000 |
TRANSACTION_ROWTIME_LIMIT | Commit time interval in ms after which batch is committed. | False | 20000 |
SQLSTREAM_POSITION_KEY | Column name from row where the watermark is provided to the plugin. | False | |
ENABLE_IBM_MESSAGE_PERSISTENCE | Persistence for the messages produced to the queue. | False | False |
CIPHER_SUITE | MQ Supported Cipher Spec corresponding to the channel used. | False | TLS_RSA_WITH_AES_128_CBC_SHA256 |
CONFIG_FILE_PATH | Path of properties file for system properties of java application for keystore, truststore path and password | False |
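The required options above might be combined as in the following sketch. The server name IBMMQ_SERVER and all queue manager, channel, and queue names are assumptions, not values from this guide:

```sql
-- Hypothetical IBM MQ sink writing to a queue (not a topic).
CREATE OR REPLACE FOREIGN STREAM "MySchema"."ibmmq_sink"
(
    "message" VARCHAR(2048)
)
SERVER "IBMMQ_SERVER"                  -- assumed prebuilt server name
OPTIONS (
    "QUEUE_MANAGER" 'QM1',             -- assumed queue manager name
    "SERVER_HOST" 'mq.example.com',
    "SERVER_PORT" '1414',
    "SERVER_CHANNEL" 'DEV.APP.SVRCONN',
    "QUEUE" 'DEV.QUEUE.1',             -- specify QUEUE or TOPIC, not both
    "TRANSACTION_ROW_LIMIT" '5000'
);
```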
See the topic Writing to IBM MQ in the Integration Guide for more details.
These special egress columns are supported by the KAFKA10 plugin. Data arriving in these columns is not included in the outgoing Kafka message payload; instead they are used to set other message variables.
Name | Data Type | Description | Name in s-Server 6.0.0 |
---|---|---|---|
SQLSTREAM_EGRESS_KAFKA_PARTITION | INTEGER | This column is used to determine to which partition the message will be written. NOTE: SQLstream uses this approach and does not support use of the Kafka producer property "partitioner.class" | KAFKA_PARTITION |
SQLSTREAM_EGRESS_KAFKA_TIMESTAMP | TIMESTAMP | This column is used to set the Kafka message timestamp | KAFKA_TIMESTAMP |
SQLSTREAM_EGRESS_KAFKA_KEY | VARBINARY(256) | This column is used as the message key | KAFKA_KEY |
SQLSTREAM_EGRESS_WATERMARK | VARCHAR(256) | This column is used as a watermark, and is saved when committing to the sink Kafka topic. The developer should set it to whatever watermark format is supported by the source plugin. The format of the stored watermark string is as follows: <CommitRowtimeInMillisFromEpoch>,<watermark_column_value>. | - |
Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#producerconfigs. Where appropriate, information in this table is drawn from this page.
Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.
Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':
Option Name | Adapter version | Description | |
---|---|---|---|
TOPIC | All | Kafka topic | |
"bootstrap.servers" | Kafka10 | hostname:port[,hostname:port] of the Kafka broker(s). Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers. | |
SEED_BROKERS | Legacy | A comma separated list of broker identifiers in the format "<broker_host_name>:<port>". Defaults to "localhost". | |
"metadata.broker.list" | Legacy | hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers. | |
"key.serializer" | Kafka10 | Names a serializer class for keys. If no class is given, Kafka uses value.serializer. | |
"key.serializer.class" | Legacy | Names a serializer class for keys. If no class is given, Kafka uses serializer.class. | |
"value.serializer" | Kafka10 | Names a serializer class for values. | |
"serializer.class" | Legacy | Names a serializer class for values. The default encoder takes a byte[] and returns the same byte[]. | |
"partitioner.class" | All | Fully qualified Java classname of Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner | |
"compression.type" | Kafka10 | If desired, specifies the final compression type for a given topic. Defaults to 'none'. Possible values: 'snappy', 'gzip' | |
"retry.backoff.ms" | All | Producers refresh metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing. | |
"request.timeout.ms" | All | When request.required.acks is enabled, this lets you specify how long the broker should try to bundle the specified number of messages before sending back an error to the client. | |
"send.buffer.bytes" | All | Socket write buffer size. | |
"client.id" | All | Using this option, you can specify a string to help you identify the application making calls to the Kafka server. | |
"transactional.id" | Kafka10 | This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique transactional.id to publish messages to the topic using transactions. Transactions are used only if Kafka brokers are v0.11.2. These support transactional semantics. NOTE: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" for itself. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream. (new in s-Server version 6.0.1) If you set transactional.id = 'auto', when a pump begins running, s-Server automatically sets transactional.id to '<fully_qualified_pump_name>_Pump', where <fully_qualified_pump_name> is the name of the pump that instantiates the sink. |
|
"pump.name" | Kafka10 | Deprecated since version 6.0.1. Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance itself is running (Leader or Follower) when you configure the Kafka adapter to run in High Availability (HA) mode. The pump.name needs to be fully qualified pump name of the format:<catalog_name>.<schema_name>.<pump_name> For example:'LOCALDB.mySchema.ProcessedOrdersPump' |
|
"linger.ms" | All | In cases where batch.size has not been reached, number of milliseconds that the Kafka producer will wait before batching sends . Defaults to '100', in milliseconds) | |
"batch.size" | All | Number of messages sent as a batch. Defaults to '1000' | |
"kafka.producer.config" | Kafka10 | Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties. For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs. NOTE: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "bootstrap.servers". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime. |
|
"security.protocol" | Kafka10 | Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL | |
"transaction.timeout.ms" | Kafka10 | The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. | |
HA_ROLLOVER_TIMEOUT | Kafka10 | Time in milliseconds. Defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode, and the pump is running as a "Follower", it waits for this amount of time for a lack of commits from the "Leader". After the timeout, the "Follower" attempts to take over as the "Leader". There may be multiple follower instances running in a cluster. Only one of these followers succeeds in being designated as the new "Leader". All other pump instances using the same transactional.id continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself to "Follower" if possible. | |
COMMIT_METADATA_COLUMN_NAME | Kafka10 | The name of the column that contains watermark values - default SQLSTREAM_EGRESS_WATERMARK. | |
SQLSTREAM_EGRESS_WATERMARK_SORT_FUNCTION | Kafka10 | If the SQLSTREAM_EGRESS_WATERMARK egress column is being used, the value chosen depends on the value of this option. Possible values are: | |
HEADERS_COLUMNS | Kafka10 | Deprecated - please use HEADERS_COLUMN_LIST | |
HEADERS_COLUMN_LIST | Kafka10 | Comma-separated list of foreign stream columns that will be mapped as outgoing headers, rather than to the record value itself. See Writing Headers to Kafka based on Column Data | |
OPTIONS_QUERY | All | Lets you query a table to update adapter options at runtime. You can use this, for example, to set the "bootstrap.servers" option from a table, as in select lastOffset as "bootstrap.servers" from TEST.kafka_write_options; | |
POLL_TIMEOUT | Legacy | This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms. | |
PORT | Legacy | Deprecated option. From s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option. | |
"producer.type" | Legacy | This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are 'async' (asynchronous) and 'sync' (synchronous). Default 'sync' | |
"compression.codec" | Legacy | The compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". Default 'none'. | |
"compressed.topics" | Legacy | If the compression codec is anything other than 'none', enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics. | |
"message.send.max.retries" | Legacy | This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Default 3. | |
"topic.metadata.refresh.interval.ms" | Legacy | The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...). It will also poll regularly (default: every 10min so 600,000ms). If you set this to a negative value, metadata will only get refreshed on failure. Default 600*1000 (10 minutes). | |
"request.required.acks" | Legacy | How many other brokers must have committed the data to their log and acknowledged this to the leader? 0 means never wait; 1 means wait for the leader to acknowledge; -1 means wait for all replicas to acknowledge. Default 0 (never wait). | |
"queue.buffering.max.ms" | Legacy | Maximum time to buffer data when using async mode. Default 5000. | |
"queue.buffering.max.messages" | Legacy | The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. Default 10000. | |
"queue.enqueue.timeout.ms" | Legacy | The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. Default -1. | |
"batch.num.messages" | Legacy | The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached. Default 200. | |
"send.buffer.bytes" | Legacy | Socket write buffer size. Default 100x1024. | |
"SHARD_ID" | kafka10 | This optional parameter is used in scaling, when you are running multiple shards of a pipeline. It uniquely identifies the current shard. When writing metadata/watermarks to kafka, the SHARD_ID is added as a prefix to the SQLSTREAM_POSITION_KEY. It is also included in the automatically generated transaction id. When creating a restartable pipeline, this SHARD_ID is expected to be the same as the SHARD_ID of the source foreign stream. |
You can also enable security for the Kafka10 plugin by configuring a truststore, keystore, and password for the Kafka broker used by s-Server. Because these options involve storing passwords, you should supply them using properties files, so that the passwords are not visible in the SQL.
The SQL/MED framework used by s-Server supports loading options through JNDI .properties files. SQLstream s-Server looks for these .properties files in $SQLSTREAM_HOME/plugin/jndi/. See Using Property Files. You can also use the "kafka.consumer.config" or "kafka.producer.config" foreign stream options to reference a Kafka properties file that contains these or other kafka consumer / producer properties.
We therefore recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using one of the following .properties files:
Option Name | Adapter version | Description |
---|---|---|
"ssl.truststore.location" | Kafka10 | The location of the trust store file. |
"ssl.truststore.password" | Kafka10 | The password for the trust store file. If a password is not set access to the truststore is still available, but integrity checking is disabled. |
"ssl.keystore.location" | Kafka10 | The location of the key store file. This is optional for client and can be used for two-way authentication for client. |
"ssl.keystore.password" | Kafka10 | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. |
"ssl.key.password" | Kafka10 | The password of the private key in the key store file. This is optional for client. |
See the topic Writing to Kafka in the Integration Guide for more details.
Option Name | Description |
---|---|
KINESIS_STREAM_NAME | Required. Name of Kinesis stream to write to. No default. |
AWS_REGION | Required. Region id of Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more details. |
AWS_PROFILE_PATH | See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide. Must point to a credential file on the s-Server file system with the following format: [default] aws_access_key_id = xxx aws_secret_access_key = yyy. This defaults to '', which resolves to ~/.aws/credentials. Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. |
AWS_PROFILE_NAME | Optional. Profile name to use within credentials file. Amazon supports multiple named profiles within a configuration file. If you have a named profile, you can reference it here. Defaults to default. See http://docs.aws.amazon.com/cli/latest/userguide/cli-multiple-profiles.html |
INITIAL_BACKOFF | Optional. If insert fails due to throttling, how many milliseconds to back off initially. Defaults to 20. |
MAX_BACKOFF | Optional. If insert fails due to throttling, how many milliseconds to back off as a maximum. Defaults to 20480. |
MAX_RETRIES | Optional. If insert fails due to throttling, how many retries should be made. Backoff is doubled for each retry up to max. Default is 10. |
BUFFER_SIZE | Optional. Maximum number of bytes per update request. Defaults to 4194304. |
MAX_RECORDS_PER_REQUEST | Optional. Maximum number of records per update request. Defaults to 500. |
REPORT_FREQUENCY | Optional. How often to log (in milliseconds) statistics. Defaults to 0, which means "never". |
KINESIS_DEFAULT_PARTITION_ID | Optional. Partition id of shard to write to. Defaults to ''. |
See the topic Writing to Kinesis Streams in the Integration Guide for more details.
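The Kinesis options above might be combined as in the following sketch. The prebuilt server name (assumed here to be KINESIS_SERVER; check the prebuilt-servers table above), stream name, region, and formatter choice are all illustrative assumptions.

```sql
-- Hedged sketch: server name, stream name, and region are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."KinesisSink"
    ("id" INTEGER, "reading" DOUBLE)
SERVER "KINESIS_SERVER"
OPTIONS (
    "KINESIS_STREAM_NAME" 'my-kinesis-stream',
    "AWS_REGION" 'us-west-2',
    "AWS_PROFILE_PATH" '',          -- '' falls back to ~/.aws/credentials
    "AWS_PROFILE_NAME" 'default',
    "FORMATTER" 'JSON'              -- formatter choice is an assumption
);
```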
The SQLstream Pulsar Sink Plugin supports all the configurations mentioned in the official Apache Pulsar documentation. For more information, refer to http://pulsar.apache.org/docs/en/2.5.0/client-libraries-java/#client.
There are three sets of configurations used by the Pulsar Sink Plugin: client configurations, producer configurations, and additional producer configurations, described in the tables below.
All of these configurations can be added either as foreign stream options in the SQL, or in a Pulsar properties file referenced through the PULSAR_CONFIG_FILE option.
NOTE: As well as providing the configuration options described here for the Pulsar sink, you will also have to define a value for the FORMATTER option; see Output Formats for Writing and also the Pulsar SQL pipeline example.
This table lists various Client configurations which can be used to write data to the pulsar sink.
Name | Type | Description | Default Value |
---|---|---|---|
serviceUrl | String | Service URL provider for the Pulsar service. It is the URL of the Pulsar web service or broker service. If the Pulsar cluster is not enabled for TLS, enter either the web service or broker service URL in the following format: Web service URL: http://<host name>:<web service port>, for example http://localhost:8080. Broker service URL: pulsar://<host name>:<broker service port>, for example pulsar://localhost:6650. If the Pulsar cluster is enabled for TLS, enter the secure broker service URL in the following format: pulsar+ssl://<host name>:<broker service TLS port>, for example pulsar+ssl://localhost:6651. | If useTls is set to true: pulsar+ssl://localhost:6651. If useTls is set to false: pulsar://localhost:6650 |
useTls | boolean | Whether to use TLS encryption on the connection. | false |
tlsTrustCertsFilePath | String | Path to the trusted TLS certificate file. It is required if useTls is set to true. | None |
authPluginClassName | String | Name of the authentication plugin. For example, for TLS authentication: org.apache.pulsar.client.impl.auth.AuthenticationTls | None |
authParams | String | String representing parameters for the authentication plugin, for example key1:val1,key2:val2. For TLS authentication: tlsCertFile:/home/sqlstream/Downloads/my-ca/admin.cert.pem,tlsKeyFile:/home/sqlstream/Downloads/my-ca/admin.key-pk8.pem | None |
This table lists various producer configurations which can be used to write data to the pulsar sink.
Name | Type | Description | Default Value |
---|---|---|---|
topicName | String | Represents the topic name. Pass the topicName with the topic namespace as follows: {persistent|non-persistent}://<tenant>/<namespace>/<topic>. If you enter a topic name only and no topic namespace is specified, then Pulsar uses the default location persistent://public/default/. To publish to a persistent topic belonging to the public tenant in the default namespace, simply enter the topic name, for example: my-topic. If the specified topic does not exist, Pulsar creates the topic when the pipeline starts. You can use expressions to define the topic name. For example, if the my-topic field in the record contains the topic name, enter the following as the topic name: persistent://my-tenant/my-namespace/${record:value("/my-topic")}. The namespace needs to be created explicitly; if the namespace does not exist, Pulsar throws an exception. | None |
producerName | String | Represents the producer name. | SQLStreamProducer |
sendTimeoutMs | long | Represents message send timeout in milliseconds. If a message is not acknowledged by a server before the sendTimeout expires, an error occurs. | 30000 |
blockIfQueueFull | boolean | If set to true, when the outgoing message queue is full, the Send and SendAsync methods of the producer block rather than failing and throwing errors. If set to false, when the outgoing message queue is full, the Send and SendAsync methods of the producer fail and ProducerQueueIsFullError exceptions occur. The maxPendingMessages parameter determines the size of the outgoing message queue. | false |
maxPendingMessages | int | Represents the maximum size of the queue holding pending messages, that is, messages waiting to receive an acknowledgment from a broker. By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set blockIfQueueFull to true. | 1000 |
messageRoutingMode | MessageRoutingMode | Message routing logic for producers on partitioned topics. Note: this logic applies only when no key is set on messages. Available options: pulsar.RoundRobinDistribution (round robin), pulsar.UseSinglePartition (publish all messages to a single partition), pulsar.CustomPartition (a custom partitioning scheme). For more information on custom routers, see https://pulsar.apache.org/docs/en/2.5.1/cookbooks-partitioned/ | pulsar.RoundRobinDistribution |
batchingEnabled | boolean | Enable batching of messages. | true |
batchingMaxMessages | int | The maximum number of messages permitted in a batch. | 1000 |
compressionType | CompressionType | Message data compression type used by the producer. Available options: LZ4, ZLIB, ZSTD, SNAPPY. | No compression |
This table lists some additional producer configurations which can be used to write data to the pulsar sink.
Name | Type | Description | Default Value |
---|---|---|---|
sendAsync | boolean | Enable Asynchronous publishing of messages. | true |
messageEncryption | boolean | Enable end-to-end message encryption. If enabled, providing publicKeyFile is mandatory. | false |
publicKeyFile | String | The absolute path to the RSA or ECDSA public key which is required to encrypt the messages. Also, make sure that you have specified the corresponding privateKeyFile at the consumer end for the messages to be consumed successfully. | null |
encryptionKey | String | The encryption key name. | sqlstream.key |
COUNT_SCHEDULER_TIME | long | Use to schedule a counter that prints the total number of messages whose acknowledgement has been received by the client, in the SQLstream trace log files. This is calculated in milliseconds. Set the value to 0 to disable the counter. | 0 |
PULSAR_CONFIG_FILE | String | Contains the absolute path of the pulsar properties file. If this option is not provided, the Pulsar plugin prints a WARNING and proceeds with the default configurations or the ones passed in the sink options. | None |
In writing to the Pulsar sink, you can declare and use these special egress columns.
Name | Type | Description | Default Value |
---|---|---|---|
PULSAR_PARTITION | INTEGER | When writing rows out of s-Server to a Pulsar topic, you can set this column to direct each message to the indicated topic partition. If the value is NULL for a particular row, that message is allocated to a random partition. If this column is not declared, data is written in round robin routing mode. | RoundRobin |
PULSAR_KEY | VARBINARY | Messages can optionally be tagged with keys, which can be useful for things like topic compaction. Keys are also used for routing messages to partitions. For more information, see https://pulsar.apache.org/docs/en/concepts-messaging/#routing-modes | None |
PULSAR_TIMESTAMP | TIMESTAMP | This column is used to add EventTime to the message. It is a mandatory (non-nullable) value. If the PULSAR_TIMESTAMP column value for a particular row is NULL, then that row is discarded. | System’s Current Time |
See the topic Writing to Pulsar in the Integration Guide for more details.
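A Pulsar sink definition might look like the following sketch. The server name PULSAR_SERVER is an assumption, as are the URLs and topic; the FORMATTER option is required (as noted above), and PULSAR_TIMESTAMP is declared because it is a mandatory, non-nullable egress column.

```sql
-- Hedged sketch: server name, service URL, and topic are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."PulsarSink"
    ("id" INTEGER,
     "reading" DOUBLE,
     "PULSAR_TIMESTAMP" TIMESTAMP NOT NULL)  -- mandatory event-time egress column
SERVER "PULSAR_SERVER"
OPTIONS (
    "serviceUrl" 'pulsar://localhost:6650',
    "topicName" 'persistent://public/default/my-topic',
    "producerName" 'SQLStreamProducer',
    "batchingEnabled" 'true',
    "FORMATTER" 'JSON'                       -- FORMATTER is required for the Pulsar sink
);
```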
Option | Definition |
---|---|
USERNAME | Required. User name for the SMTP server defined in HOST. |
HOST | Required. Host name for the SMTP server. |
PASSWORD | Optional. Password to use for the SMTP server defined in HOST. s-Server uses this option with USERNAME when authenticating to the SMTP server. If this option is empty, s-Server will not attempt authentication. Defaults to none. |
PORT | Optional. Port for SMTP server defined by HOST. Defaults to none. |
CONNECTION_SECURITY | Optional. Security used to connect to SMTP server. SSL, STARTTLS or NONE. Defaults to NONE. |
SENDER | Optional. Used in the “Sender” header when sending the email. Can also be specified as a special column in the foreign stream. |
RECEIVER | Optional. Address used in the “To” header when sending the email. Can also be specified as a special column in the foreign stream. You can specify multiple addresses separated by commas. |
SUBJECT | Optional. Subject for email. Can also be specified as a special column in the foreign stream. |
REPLY_TO | Optional. Address used in the "Reply-To" header when sending the email. Can also be specified as a special column in the foreign stream. |
CC | Optional. Address used in the “CC” header when sending the email. Can also be specified as a special column in the foreign stream. You can specify multiple addresses separated by commas. |
BCC | Optional. Address used in the “BCC” header when sending the email. Can also be specified as a special column in the foreign stream. You can specify multiple addresses separated by commas. |
TIMEOUT | Optional. Socket read timeout value in milliseconds. Defaults to 30000. |
CONN_TIMEOUT | Optional. Socket connection timeout value in milliseconds. Defaults to 30000. |
FORMAT_CHARSET_KEY | Optional. Charset for formatting mail. Defaults to UTF-8. See https://docs.oracle.com/javase/8/docs/api/java/nio/charset/StandardCharsets.html |
LOCALHOST | Optional. Used to set the mail.smtp.localhost option in the JavaMail API. See https://javaee.github.io/javamail/docs/api/com/sun/mail/smtp/package-summary.html for more information. It is generally best to leave this blank. |
OPTIONS_QUERY | Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the PORT option using select PORT from TEST.mail_options . For more details see the topic Using the Options Query Property. |
See the topic Writing to Mail in the Integration Guide for more details.
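The mail options above might be combined as in the following sketch. The server name MAIL_SERVER, host, addresses, and credentials are all placeholder assumptions.

```sql
-- Hedged sketch: server name, SMTP host, addresses, and credentials are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."AlertMailSink"
    ("alertBody" VARCHAR(1024))
SERVER "MAIL_SERVER"
OPTIONS (
    "HOST" 'smtp.example.com',
    "PORT" '587',
    "USERNAME" 'alerts@example.com',
    "PASSWORD" 'secret',
    "CONNECTION_SECURITY" 'STARTTLS',
    "SENDER" 'alerts@example.com',
    "RECEIVER" 'oncall@example.com,backup@example.com',
    "SUBJECT" 's-Server alert'
);
```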
Option | Definition |
---|---|
URL | Fully qualified URL starting with mongodb:// and including, at minimum, a host name (or IP address or UNIX domain socket). The URL can also include a username and password (these are passed to the MongoDB instance) and a port number. See https://docs.mongodb.com/manual/reference/connection-string/ for more information. |
COLLECTION | MongoDB collection to which data will be written. |
OPTIONS_QUERY | Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the URL option from a table, as in select myUrl as URL from TEST.mongo_options . For more details see the topic Using the Options Query Property. |
See the topic Writing to MongoDB in the Integration Guide for more details.
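A MongoDB sink definition might look like the following sketch. The server name MONGODB_SERVER, connection URL, and collection name are placeholder assumptions.

```sql
-- Hedged sketch: server name, URL, and collection are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."MongoSink"
    ("id" INTEGER, "reading" DOUBLE)
SERVER "MONGODB_SERVER"
OPTIONS (
    "URL" 'mongodb://user:password@localhost:27017',
    "COLLECTION" 'readings'
);
```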
Option | Description |
---|---|
CONNECTION_URL | URL used to connect to the MQTT broker, for example 'tcp://127.0.0.1:1883'. |
TOPIC | MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client. |
CLIENT_ID | s-Server implements an MQTT client to connect to the MQTT server. This setting provides a MQTT ClientID for this client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to randomly generated. |
QOS | Defines the guarantee of delivery for a specific message. Either at most once (0), at least once (1), exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/ |
USERNAME | Optional. User name for MQTT connection. Note: s-Server sends user names and passwords as plain text. |
PASSWORD | Optional. Password for MQTT connection. Note: s-Server sends user names and passwords as plain text. |
KEEP_ALIVE_INTERVAL | Optional. Interval in seconds sent to the MQTT broker when s-Server establishes a connection. Specifies the longest period of time that the broker and client can go without sending a message. Defaults to 60. |
CONNECTION_TIMEOUT | Optional. Connection timeout in seconds. Defines the maximum time interval the client will wait for the network connection to the MQTT server to be established. If you set this value to 0, s-Server disables timeout processing, and the client will wait until the network connection is made successfully or fails. Defaults to 30. |
RETAINED | Output only. True or False. If set to true, tells the broker to store the last retained message and the QOS for this topic. Defaults to false. |
MAX_IN_FLIGHT | Output only. When QOS is set to 1 or 2, the maximum number of outgoing messages that can be in the process of being transmitted at the same time. This number includes messages currently in handshakes and messages being retried. Defaults to 10. |
See the topic Writing to MQTT in the Integration Guide for more details.
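An MQTT sink definition might look like the following sketch. The server name MQTT_SERVER, broker URL, topic, and formatter choice are placeholder assumptions.

```sql
-- Hedged sketch: server name, broker URL, and topic are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."MqttSink"
    ("id" INTEGER, "reading" DOUBLE)
SERVER "MQTT_SERVER"
OPTIONS (
    "CONNECTION_URL" 'tcp://127.0.0.1:1883',
    "TOPIC" 'sensors/readings',
    "CLIENT_ID" 'sqlstream-mqtt-sink-1',  -- keep distinct per foreign stream
    "QOS" '1',
    "FORMATTER" 'JSON'                    -- formatter choice is an assumption
);
```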
The ECD framework can act as a server or client. When it acts as a client, set REMOTE_HOST and REMOTE_PORT. When it acts as a server, set SERVER_PORT and, if desired, SERVER_HOST.
Name | Description |
---|---|
IS_IPV6 | Whether or not the socket uses IPV6. Default is false. |
PROTOCOL | Whether the socket uses TCP or UDP. Default is TCP. |
REMOTE_HOST | Hostname to send tuples to when ECDA is acting as a client. You can override this to 'LOCALHOST' to listen to only local connections, or a specific IP address, such as 168.212.226.204. When you specify REMOTE_HOST and REMOTE_PORT, this tells the ECD socket code to start the network connection as a client. |
REMOTE_PORT | Port to send tuples to when ECDA is acting as a client. REMOTE_ and SERVER_ tells ECDA's socket code how to start the network connection (as a server or a client). |
SERVER_HOST | The hostname or IP address to listen on when ECDA is acting as a server (defaults to 0.0.0.0). When you specify SERVER_HOST and SERVER_PORT, this tells the ECD socket code to start the network connection as a server. |
SERVER_PORT | The port to listen on when ECDA is acting as a server. |
OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set the REMOTE_HOST and REMOTE_PORT options using select REMOTE_HOST, REMOTE_PORT from TEST.socket_options . For more details see the topic Using the Options Query Property. |
See the topic Writing to Network Sockets in the Integrating with Other Systems guide for more details.
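A socket sink acting as a client might be defined as in the following sketch. The server name NET_SERVER, remote address, and formatter choice are placeholder assumptions; to act as a server instead, replace the REMOTE_* options with SERVER_HOST and SERVER_PORT.

```sql
-- Hedged sketch: server name, remote address, and formatter are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."SocketClientSink"
    ("line" VARCHAR(1024))
SERVER "NET_SERVER"
OPTIONS (
    "PROTOCOL" 'TCP',
    "REMOTE_HOST" '168.212.226.204',  -- REMOTE_* => connect as a client
    "REMOTE_PORT" '5555',
    "FORMATTER" 'CSV'
);
```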
Option Name | Description |
---|---|
URL | Required. Denotes the Prometheus push gateway URL endpoint to which the metrics generated by the pipeline are sent. |
JOB | Required. An application identifier used to denote a specific application pipeline. |
METRIC_TYPE | Required. Represents the type of metric, either 'counter' or 'gauge'. For more details, see https://www.prometheus.io/docs/concepts/metric_types/ |
Namespace | A prefix that helps to identify multiple metric names used across an application or system. For example, in the projectname:module:invalid_record_total metric, projectname:module: is the namespace. |
Labels | A global label that is applied to all the metrics using the specific Prometheus sink and can annotate a metric with very specific information such as component=7stream;. For more details, see https://prometheus.io/docs/practices/naming/ |
See the topic Writing to Prometheus Pushgateway in the Integration Guide for more details.
Option Name | Description |
---|---|
ACCOUNT | The name assigned to your account by Snowflake. |
USER | The user name for your Snowflake account. |
PASSWORD | The password for your Snowflake account. |
DB | The database to write to in Snowflake. This should be an existing database for which user/password has privileges. |
SCHEMA | The schema to write to in the Snowflake database. This should be an existing schema for which user/password has privileges. |
WAREHOUSE | The Snowflake warehouse to write to. This should be an existing warehouse for which user/password has privileges. |
DTABLE | The table to write to in Snowflake. This should be an existing table for which user/password has privileges. |
OPTIONS_QUERY | Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the SCHEMA option from a table that contains the schema, as in select schemaToUse as SCHEMA from TEST.snowflakeOptions . For more details see the topic Using the Options Query Property. |
The minimum configuration options required to write to a Snowflake warehouse are the warehouse name, user name/password, account, database, schema, and stream/table.
As well as these Snowflake options, you must also specify:
See the topic Writing to Snowflake in the Integration Guide for more details.
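The minimum Snowflake configuration might look like the following sketch. The server name SNOWFLAKE_SERVER and all account values are placeholder assumptions; the database, schema, warehouse, and table must already exist with privileges for the given user, and the additional non-Snowflake options referred to above are omitted here.

```sql
-- Hedged sketch: server name and all account values are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."SnowflakeSink"
    ("id" INTEGER, "reading" DOUBLE)
SERVER "SNOWFLAKE_SERVER"
OPTIONS (
    "ACCOUNT" 'myaccount',
    "USER" 'loader',
    "PASSWORD" 'secret',
    "DB" 'ANALYTICS',          -- must already exist
    "SCHEMA" 'PUBLIC',         -- must already exist
    "WAREHOUSE" 'LOAD_WH',     -- must already exist
    "DTABLE" 'READINGS'        -- must already exist
);
```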
Option Name | Description |
---|---|
URL | URL for web socket. |
HEADER_<name_of_header> | Tells Web Socket writer to add a header called <name_of_header> to the request. |
OPTIONS_QUERY | Optional. Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set HEADER_xxx options using select HEADER_ABC, HEADER_XYZ from TEST.http_options . For more details see the topic Using the Options Query Property. |
See the topic Writing to WebSockets in the Integration Guide for more details.
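A WebSocket sink definition might look like the following sketch. The server name WEBSOCKET_SERVER, URL, header value, and formatter choice are placeholder assumptions; note how the HEADER_ prefix turns an option into a request header.

```sql
-- Hedged sketch: server name, URL, and header value are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."WsSink"
    ("id" INTEGER, "reading" DOUBLE)
SERVER "WEBSOCKET_SERVER"
OPTIONS (
    "URL" 'wss://example.com/ingest',
    "HEADER_Authorization" 'Bearer my-token',  -- sent as an "Authorization" header
    "FORMATTER" 'JSON'                         -- formatter choice is an assumption
);
```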
Option | Definition |
---|---|
FORMATTER | This needs to be CSV. |
CHARACTER_ENCODING | Character set for data. |
FORMATTER_INCLUDE_ROWTIME | Whether or not to include rowtime when writing data. Defaults to 'true'. |
WRITE_HEADER | Whether or not to write header information (a header row) into the CSV data. Defaults to 'false'. |
ROW_SEPARATOR | Character(s) separating rows in CSV data. Defaults to '\n' (a newline). Supports multiple characters, and supports use of Unicode literals such as U&'\000D\000A' |
SEPARATOR | Character(s) separating field values within a row. Defaults to ','. Supports multi-character strings like '$$' or '@!@', and single or multi-character Unicode literals. |
QUOTE_CHARACTER | Lets you specify a quotation character used to wrap the output value if the SEPARATOR string is present in the column value. There is no default quote character. Only a single one-byte character may be used, which limits it to code points between 0 and 127. |
STARTING_OUTPUT_ROWTIME | If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'. |
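The CSV formatter options are supplied alongside whatever transport options the chosen server requires. A hedged sketch, assuming a file-style sink server named FILE_SERVER (the transport options for that server are elided):

```sql
-- Hedged sketch: server name is an assumption; transport options elided.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."CsvFormattedSink"
    ("id" INTEGER, "reading" DOUBLE)
SERVER "FILE_SERVER"
OPTIONS (
    -- ... transport options for the chosen server go here ...
    "FORMATTER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "WRITE_HEADER" 'true',
    "SEPARATOR" '|',
    "ROW_SEPARATOR" U&'\000D\000A',     -- CR/LF as a Unicode literal
    "FORMATTER_INCLUDE_ROWTIME" 'false'
);
```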
Option Name | Description |
---|---|
FORMATTER | This needs to be XML. |
DOC_ELEMENTS | Specifies a list of elements, separated by slashes ( / ), to use as the root of the XML document to write. Defaults to "batch". |
ROW_ELEMENTS | Specifies a list of elements, separated by slashes ( / ), to add for each row of the XML document's DOM. Defaults to "row". |
DATA_ELEMENTS | Specifies a list of elements, separated by slashes ( / ), to add for each datum in a row/tuple. You must define DATA_ELEMENTS or DATA_ATTRIBUTES but not both. Using both will produce inconsistent XML. |
DATA_ATTRIBUTES | Specifies the name of an attribute to add for each datum in a row/tuple. You must define DATA_ELEMENTS or DATA_ATTRIBUTES but not both. Using both will produce inconsistent XML. |
<column name>_ELEMENTS | Specifies a list of elements, separated by slashes ( / ), to add for a specific datum in each row/tuple. |
<column name>_ATTRIBUTES | Specifies the name of an attribute to add for a specific column's datum in each row/tuple. |
CHARACTER_ENCODING | Character set for data. |
FORMATTER_INCLUDE_ROWTIME | Whether or not to include rowtime when writing data. Defaults to 'true'. |
STARTING_OUTPUT_ROWTIME | If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'. |
Option | Definition |
---|---|
FORMATTER | This needs to be JSON. |
CHARACTER_ENCODING | Character set for data. |
FORMATTER_INCLUDE_ROWTIME | Whether or not to include rowtime when writing data. Defaults to 'true'. |
NESTED_JSON | Whether or not to write data in nested json format. Defaults to 'false'. |
SCHEMA_FILE_LOCATION | Format and validate the output json based on json schema file. |
PATH_COLUMN_NAME | Path of the duplicate fields, if any. |
STARTING_OUTPUT_ROWTIME | If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'. |
Option | Definition |
---|---|
FORMATTER | This needs to be BSON. |
CHARACTER_ENCODING | Character set for data. |
FORMATTER_INCLUDE_ROWTIME | Whether or not to include rowtime when writing data. Defaults to 'true'. |
STARTING_OUTPUT_ROWTIME | If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'. |
Option | Definition |
---|---|
FORMATTER | This needs to be AVRO. |
AVRO_SCHEMA_LOCATION | Required option to specify the path for the schema file to be used for formatting the Avro payload. |
FORMATTER_INCLUDE_ROWTIME | Whether or not to include rowtime when writing data. Defaults to 'true'. |
STARTING_OUTPUT_ROWTIME | If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'. |
Option | Default Value | Required | Description |
---|---|---|---|
FORMATTER | none | yes | This needs to be ORC. |
"orc.version" | none | yes | Currently supported values are 'V_0_11' and 'V_0_12'. |
"orc.block.padding" | true | no | Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks. Padding improves locality and thus the speed of reading, but costs space. |
"orc.block.size" | 268,435,456 | no | Set the file system block size for the file. For optimal performance, set the block size to be multiple factors of stripe size. |
"orc.direct.encoding.columns" | null | no | Set the comma-separated list of case-sensitive names of columns that should be direct encoded. |
"orc.batch.size" | 10,000 | no | Number of rows to batch together before issuing an ORC write. |
"orc.user.metadata_XXX" | none | no | Where XXX is a string. XXX is the name of an application-specific metadata key. Therefore, it should be clear that it lives in a namespace unique to the application. There can be many of these options. The value of each option is a SQL BINARY literal with the leading X' stripped off. |
STARTING_OUTPUT_ROWTIME | none | no | If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'. |
The options below apply only to cases when you are writing to the file system or HDFS. You cannot use these options when writing ORC data to a Hive table.
Option | Default Value | Description |
---|---|---|
"orc.bloom.filter.columns" | null | Comma-separated list of bloom filter columns. These are names from the foreign table/stream's column signature. |
"orc.bloom.filter.fpp" | 0.05 | Bloom filter false-positive probability. |
"orc.compress" | ZLIB | Compression algorithm. See Appendix A for legal values. |
"orc.compress.size" | 262,144 | Compression chunk size. |
"orc.row.index.stride" | 10,000 | Number of rows between index entries. |
"orc.stripe.size" | 67,108,864 | Size of in-memory write buffer. |
Option | Definition |
---|---|
SKIP_HEADER | True or false; defaults to false. Specifies if the parser should skip the first row of input files. Usually, you would do this if the first row of data contains column headers. |
DISALLOW_QUOTED_ROW_SEPARATOR | True or false; defaults to false. If true, parsing will not search for the row separator within a quoted field, allowing multi-line field values to be ingested. If false, finding the row separator terminates field parsing even if there is an unmatched start quote. |
ROW_SEPARATOR | Character(s) separating rows in CSV data. Defaults to '\n' (a newline). Supports multi-character values, and supports use of Unicode literals such as U&'\000D\000A' (CR/LF - for reading from Windows / DOS files). |
SEPARATOR | Character(s) separating field values within a row. Defaults to ','. Supports multi-character strings like '$$' or '@!@', and single or multi-character Unicode literals. |
QUOTE_CHARACTER | Lets you specify an expected quotation character (which may be applied to any incoming field, or may be present only when a field value includes the SEPARATOR string). There is no default for quote character. Only a single one-byte character may be used, which limits to code points between 0 and 127. |
QUOTED_COLUMNS | If set to anything non-blank, sets the quote character to a double quote. |
COLUMN_MAPPING | Allows the extraction and re-ordering of a subset of fields from the CSV record. This may not be combined with UNPARSED_TEXT. |
UNPARSED_TEXT | What to do with additional trailing data in the CSV record. Options are 'TRUNCATE' (the default), 'LAST COLUMN', or 'NEW ROW'. This may not be combined with COLUMN_MAPPING. |
When parsing data, you can define provenance columns for your foreign stream. These return metadata for the parsed data.
For CSV, these are as follows:
Data Type | Name | Value |
---|---|---|
BIGINT | SQLSTREAM_PROV_PARSE_POSITION | Parser position within message of last parse error. |
VARCHAR(65535) | SQLSTREAM_PROV_PARSE_ERROR | Description of parser error. |
BIGINT | SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER | How many lines have been parsed so far. This value is not reset per message or file. |
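Provenance columns are declared like ordinary columns in the foreign stream, and s-Server fills them in as it parses. A hedged sketch, assuming a file-style source server named FILE_SERVER (transport options elided):

```sql
-- Hedged sketch: server name is an assumption; transport options elided.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."CsvWithProvenance"
    ("id" INTEGER,
     "reading" DOUBLE,
     -- provenance columns populated by the parser:
     "SQLSTREAM_PROV_PARSE_POSITION" BIGINT,
     "SQLSTREAM_PROV_PARSE_ERROR" VARCHAR(65535),
     "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT)
SERVER "FILE_SERVER"
OPTIONS (
    -- ... transport options for the chosen server go here ...
    "PARSER" 'CSV',
    "SKIP_HEADER" 'true'
);
```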
Option | Definition |
---|---|
PARSER | This needs to be JSON. |
ROW_PATH | This is the JSON path for the row to be found. The JsonPath parser uses a row path to find JSON objects containing a row, then the path for each column is used to find the value to be extracted. |
<COLUMN_NAME>_PATH | Optional for each column in the ECDA column set. This defaults to $..<COLUMN_NAME>. For example, a column named 'FOO' would have an option named FOO_PATH that defaults to $..FOO, which returns the first property named FOO under the JSON object found by the ROW_PATH. |
CHARACTER_ENCODING | Character set for data. |
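The ROW_PATH / <COLUMN_NAME>_PATH pattern might be used as in the following sketch. The server name FILE_SERVER, the transport options, and the JSON paths are all illustrative assumptions.

```sql
-- Hedged sketch: server name and JSON paths are assumptions.
CREATE OR REPLACE FOREIGN STREAM "MySchema"."JsonSource"
    ("ZIPCODE" VARCHAR(10), "CITY" VARCHAR(50))
SERVER "FILE_SERVER"
OPTIONS (
    -- ... transport options for the chosen server go here ...
    "PARSER" 'JSON',
    "ROW_PATH" '$.locations[*]',          -- one row per element of "locations"
    "ZIPCODE_PATH" '$.locations[*].zip',  -- overrides the default $..ZIPCODE
    "CITY_PATH" '$.locations[*].city',
    "CHARACTER_ENCODING" 'UTF-8'
);
```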
Option | Definition |
---|---|
PARSER | This needs to be XML. |
CHARACTER_ENCODING | Character set for data. |
PARSER_XML_ROW_TAGS | An absolute XPATH query that finds the XML element that becomes one row. No default. |
<column name>_XPATH | An XPATH that finds the text that becomes the value of the column of the same name. Examples include "RetailStoreID_XPATH" '/POSLog/Body/tri:RetailTransaction/RetailStoreID' and "WorkstationID_XPATH" '/POSLog/Body/tri:RetailTransaction/WorkstationID'. |
PARSER_XML_USE_ATTRIBUTES | True/false (default is false). Controls the default XPATH query used to find each column's data in a tuple. If false, the default matches a child element whose tag name is the column name. If true, the default is @<column_name>, meaning an attribute of the XML row element. |
CUSTOM_TYPE_PARSER_<column name> | Allows overriding of an individual column's parsing. Specifies a fully qualified Java classname that implements com.sqlstream.aspen.namespace.common.TypeParser. |
When parsing data, you can declare provenance columns. These return metadata for the parsed data.
For XML, these are as follows:
Data Type | Name in s-Server 6.0.0 | Name in s-Server 6.0.1 | Value |
---|---|---|---|
BIGINT | PARSE_POSITION | SQLSTREAM_PROV_PARSE_POSITION | Parser position within message of last parse error. |
BIGINT | PARSE_LINE_NUMBER | SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER | How many lines have been parsed so far. This value is not reset per message or file. |
Option | Definition |
---|---|
PARSER | This needs to be KV. |
QUOTE_CHARACTER | Lets you specify a different quote character, such as a single quote ('). Default is double quote ("). |
KEY_VALUE_SEPARATOR_CHARACTER | Lets you specify the character that connects keys and values. Default is the equals symbol (=). |
SEPARATOR | Lets you specify a character that separates key-value pairs. Default is comma (,). |
ROW_SEPARATOR | Lets you specify a character that splits lines in the key-value source. Default is \n. |
When parsing data, you can declare provenance columns. These return metadata for the parsed data.
For Key Values, these are as follows:
Data Type | Name | Description |
---|---|---|
BIGINT | SQLSTREAM_PROV_PARSE_POSITION | Parser position within message of last parse error. |
VARCHAR(65535) | SQLSTREAM_PROV_PARSE_ERROR | Description of parser error. |
BIGINT | SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER | How many lines have been parsed so far. This value is not reset per message or file. |
Option | Definition |
---|---|
PARSER | This needs to be AVRO. |
AVRO_SCHEMA_LOCATION | This option can either be an HTTP URL from which to fetch the schema, or a path to a file on the s-Server host machine or VM. |
SCHEMA_HEADER | Required. Indicates whether the Avro schema is embedded in the Avro data. This option needs to be set to false for data sources like Kafka or AMQP, where each message can be one or more serialized Avro records without a schema. |
ROW_PATH | This is the AVRO path for the row to be found. The Avro parser uses a row path to find Avro objects containing a row, then the path for each column is used to find the value to be extracted. |
<COLUMN_NAME>_PATH | Path for each column in the ECDA column set. This defaults to $..<COLUMN_NAME>. For example, a column named 'FOO' would have an option named FOO_PATH that defaults to $..FOO, which returns the first property named FOO under the Avro object found by the ROW_PATH. |
Option | Definition |
---|---|
PARSER | 'PROTOBUF' Required. Indicates that ECD parser will parse files as protobuf. |
SCHEMA_JAR | Required. Jar containing compiled Java classes created with the Google protocol buffer compiler (protoc command), such as unitsql/concurrent/plugins/common/protobufData/protobufpackage.jar. Locations are relative to $SQLSTREAM_HOME. See https://developers.google.com/protocol-buffers/docs/javatutorial for more details. Note: $SQLSTREAM_HOME refers to the installation directory for s-Server, such as /opt/sqlstream/. |
SCHEMA_CLASS | Required. Class name of the outer protobuf record created with the Google protocol buffer compiler (protoc command), such as protobuf.PackageProto$protobufPackage. Locations are relative to $SQLSTREAM_HOME. See https://developers.google.com/protocol-buffers/docs/javatutorial for more details. |
<column name>_PATH | Not required. Lets you specify a path within the schema that maps to a column in the foreign stream. If these are not specified, s-Server populates a column using a field with the same name in the outer-level record. |
MESSAGE_STREAM | True or false (defaults to false). This option tells the ProtoBuf parser to expect multiple messages within a single payload, each preceded by a bytes field containing their encoded length. When you set MESSAGE_STREAM to true, s-Server will expect that messages will be prefixed by this bytes field. |
Option Name | Description |
---|---|
PARSER | Must be set to NONE to use the None parser. |
CHARACTER_ENCODING | Only applies with VARCHAR. Defaults to UTF-8. See https://docs.oracle.com/javase/8/docs/api/java/nio/charset/StandardCharsets.html |
ROW_SEPARATOR | If ROW_SEPARATOR is specified, it should either be a string of HEX digit pairs when going to VARBINARY or a string that encodes properly in the character encoding being used. If ROW_SEPARATOR is not specified (or is empty) then each row will contain the entire content of one file or one message. |