Configuring External Stream Sinks

External Stream sinks make use of s-Server's Extensible Common Data framework. This framework allows you to write rows of data in a range of forms over a range of input/output formats, including:

Other sinks are available in SQLstream - ee the complete list of s-Server sinks in the Integration Guide.

All data is sent as a string in CSV, XML, or JSON format.

Using the File System as a External Stream Sink

To read streaming data over the file system, you need two pieces of information:

  • The directory in which the file resides.
  • A pattern for the file's name. Here, you enter part of the file's name, such as output, csv, or log. No quotation marks are necessary.
  • If desired, enter a prefix and suffix for the file.
  • If desired, change the Filename Date Format for the file. (Files are renamed as they are rotated.)
  • Indicate how the file should be rotated, by entering either:
    • Maximum Bytes per File. Files will be rotated once they reach this size.
    • Maximum Time per File. Files will be rotated once you reach the time limit.

Foreign Stream Options for Writing to Hadoop/HDFS

Option Name Scope Required? Description
HDFS_OUTPUT_DIR All Required Address for name node of HDFS, such as hdfs://storm-s3.disruptivetech.com:8020/user/sqlstream/ or for ADLS Gen2 abfs://@.dfs.core.windows.net/path/to/destination . This is where data will be written to on the HDFS server.
ABFS_CONFIG_FILE ABFS Required Path of the ABFS config file with credentials (only yaml file)
ABFS_AUTH_METHOD ABFS Required Authentication method used to upload data - either "SharedKey" or "OAuth" - which indicates OAuth 2.0. These values are case sensitive.
REFRESH_PROPS ABFS Optional If set ‘true’, config file will be read again and new instance of configurations will be used, if set 'false' or not used, any change in config file will not be detected unless s-server is restarted.
AUTH_METHOD HDFS, Hive Optional If desired, specify 'kerberos' Requires AUTH_USERNAME and AUTH_KEYTAB (the latter should be implemented using JNDI properties. See Hadoop Options for Use with JNDI Properties below
AUTH_USERNAME HDFS, Hive Optional User name for HDFS. Required if AUTH_METHOD is specified.
CONFIG_PATH HDFS, Hive Optional Specifies path to an HDFS client configuration file. This will be loaded and used by s-Server's HDFS client in it’s entire life cycle. Example: /home/me/work/kerberos/core-default.xml
AUTH_KEYTAB HDFS, Hive Optional Path to file containing pairs of Kerberos principals and encrypted keys, such as /tmp/nn.service.keytab Required if AUTH_METHOD is specified.
HIVE_TABLE_NAME Hive Required Table name inside HIVE_SCHEMA_NAME.
HIVE_SCHEMA_NAME Hive Required Schema (database) containing the table. Must be specified if HIVE_TABLE_NAME is specified.
AUTH_METASTORE_PRINCIPAL Hive Required Defaults to null. Required if HIVE_TABLE_NAME is specified. 3-part name of the Kerberos principal which can read the Hive metastore. This is the value of the hive.metastore.kerberos.principal property set in the Hive installation's hive-site.xml descriptor file.
HIVE_URI Hive Required JDBC URL for accessing the Hive server when loading data into Hive tables. Must be specified if HIVE_TABLE_NAME is specified.
HIVE_METASTORE_URIS Hive Optional Location of the Hive metastore for loading data into Hive tables. Required if HIVE_TABLE_NAME is specified.
$columnName_HIVE_PARTITION_NAME_FORMAT Hive Optional This option specifies custom formatting directives for partition key values when they are used to construct the names of HDFS directory levels. $columnName must be the name of a partition key of type DATE or TIMESTAMP. The value bound to this option must be a valid format string as understood by java.text.SimpleDateFormat.
OPTIONS_QUERY All Optional Lets you query a table to update one or more adapter options at runtime. You can use this, for example, to set the HDFS_OUTPUT_DIR option from a table that contains options, as in select lastOffset as HDFS_OUTPUT_DIR from TEST.hdfs_options. For more details see the topic Using the Options Query Property.

Hadoop options for use with JNDI properties

We recommend that you supply any options related to access credentials, such as those for SSL or Kerberos credentials, using the following .properties files:

  • $SQLSTREAM_HOME/plugin/jndi/.properties (Applies to all foreign streams that use this foreign server definition.)
  • $SQLSTREAM_HOME/plugin/jndi/LOCALDB...properties (Applies only to the foreign stream.)

Foreign Stream Options for Writing to Hive

Option Description
HIVE_TABLE_NAME Defaults to null. Table name inside HIVE_SCHEMA_NAME.
HIVE_SCHEMA_NAME Defaults to null. Schema (database) containing the table. Must be specified if HIVE_TABLE_NAME is specified.
AUTH_METASTORE_PRINCIPAL Defaults to null. Required if HIVE_TABLE_NAME is specified. 3-part name of the Kerberos principal which can read the Hive metastore. This is the value of the hive.metastore.kerberos.principal property set in the Hive installation's hive-site.xml descriptor file.
HIVE_URI Defaults to null. JDBC URL for accessing the Hive server. Must be specified if HIVE_TABLE_NAME is specified.
HIVE_METASTORE_URIS Defaults to null. Location of the Hive metastore. Required if HIVE_TABLE_NAME is specified.
$columnName_HIVE_PARTITION_NAME_FORMAT This option specifies custom formatting directives for partition key values when they are used to construct the names of HDFS directory levels. $columnName must be the name of a partition key of type DATE or TIMESTAMP. The value bound to this option must be a valid format string as understood by java.text.SimpleDateFormat.

Note: In order to include Kerberos options for an HDFS server, you need to configure your own server object. This server will allow all foreign tables or streams that reference this server to inherit Kerberos options.

Using HTTP as a External Stream Sink

To write data to HTTP, you need to configure the external stream. StreamLab uses this information to implement an HTTP client that writes data to the remote server.

Foreign Stream Options for Writing to a Snowflake Warehouse

Option Name Description
ACCOUNT The name assigned to your account by Snowflake.
USER The user name for your Snowflake account.
PASSWORD The password for your Snowflake account.
DB The database to write to in Snowflake. This should be an existing database for which user/password has privileges.
SCHEMA The schema to write to in the Snowflake database. This should be an existing schema for which user/password has privileges.
WAREHOUSE The Snowflake warehouse to write to. This should be an existing warehouse for which user/password has privileges.
DTABLE The table to write to in Snowflake. This should be an existing table for which user/password has privileges.
OPTIONS_QUERY Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the SCHEMA option from a table that contains the schema, as in select schemaToUse as SCHEMA from TEST.snowflakeOptions. For more details see the topic Using the Options Query Property.

The minimum configuration options required to write to a Snowflake warehouse are the warehouse name, user name/password, account, database, schema, and stream/table.

As well as these Snowflake options, you must also specify:

Using a Network Socket as a External Stream Sink

To read from a line, CSV, XML, or JSON file over a network socket, you need to configure the socket connection. You may want to consult with whoever has set up the application with which StreamLab will connect over the socket. Network sockets initiate a connection over TCP or UDP. Connections can be initiated by remote applications or by StreamLab itself. To tell StreamLab listen to a remote host, use the Remote Host and Remote Port fields. Connections can optionally be made using IPV6.

Name Description
Remote Host Hostname to send rows to or receive rows from. You can override this to 'LOCALHOST' to listen to only local connections, or a specific ip address, such as <168.212.226.204>.
Remote Port Port to send rows to or receive rows from.
Server Host The hostname or IP address to listen upon to send/receive rows (defaults to 0.0.0.0).
Server Port The port to listen upon to send/receive rows.
Socket uses TCP? Whether the socket uses TCP (True) or UDP (False). Default is false (UDP).
Socket uses IPV6? Whether or not the socket uses IPV6. Default is false.
Skip Header True or false; defaults to false. Specifies if the parser should skip the first row of input files. Usually, you would do this if the first row of data contains column headers.

Using MongoDB as a External Stream Sink

Options for Writing to a MongoDB Collection

Option Definition
URL Fully qualified URL starting with *mongodb:// and including, at minimum, a host name (or IP address or UNIX domain socket). URL can also include a username and password (these are passed to the MongoDB instance) and a port number. See https://docs.mongodb.com/manual/reference/connection-string/ for more information.

Using AMQP as a External Stream Sink

To write to an External Stream sink over AMQP, you need to configure the AMQP connection. AMQP stands for Advanced Message Queuing Protocol, and is an Open Standard for Messaging Middleware. For more information, see http://www.amqp.org/about/what. You may want to consult with whoever has set up AMQP in your environment.

AMQP 0.9.1 vs 1.0

There are distinct differences between the way AMQP up to 0.9.1 works and the way AMQP 1.0 works. Roughly, AMQP 1.0 only provides network wire-level protocol for the exchange of messages, while up to 0.9.1, AMQP employed a system of exchanges, queues and bindings to exchange messages. As a result of this change, you configure StreamLab for AMQP differently for 1.0 than for up to 0.9.1

AMQP_LEGACY (AMQP protocol Version 0.9, e.g., RabbitMQ)

The AMQP option lets you define a sink with an AMQP 0.9.1 message bus. AMQP stands for Advanced Message Queuing Protocol, and is an Open Standard for Messaging Middleware. For more information, see http://www.amqp.org/about/what. As with other input formats, AMQP simply intakes rows as strings in CSV, XML, or JSON format. To set up an AMQP 0.9.1 sink, you need the following pieces of information:

URL Format:

amqp://<username>:<password>@<clientid>/<profile>?brokerlist='tcp://<hostname>:<portNumber>'&[ <optionsList> ]

Example:

amqp://guest:guest@clientid/default?brokerlist='tcp://localhost:5672'

AMQP10 (AMQP protocol Version 1.0) - connectionfactory.localhost:

To set up an AMQP 1.0 sink, you need the following pieces of information:

URL Format:

amqp://<username>:<password>@<hostname>:<portNumber>?<optionsList>'

Example:

amqp://guest:guest@localhost:5672?clientid=test-client&remote-host=default

Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation.Single quotes must be doubled for SQL formatting. You may need to do additional URL escaping depending on your AMQP implementation. The site https://azure.microsoft.com/en-us/documentation/articles/service-bus-java-how-to-use-jms-api-amqp/ offers an example of formatting a connection URL.|

Option Name Description
Partition Expression You should only use this if DESTINATION includes "{PARTITION}". This should be a dk.brics regular expression, such as *<0-3>.
Acknowledge Mode Optional. Acknowledgment mode that ECDA communicates to the AMQP 1.0 server. Options are AUTO, MANUAL, or NONE; defaults to AUTO. Details on these modes are available at http://docs.spring.io/spring-integration/reference/html/amqp.html# amqp-inbound-ack

Roughly, AUTO asks the container on the AMQP server to acknowledge once message is completely delivered. MANUAL means that delivery tags and channels are provided in message headers, and NONE means no acknowledgments.

Using Kafka as a External Stream Sink

When you point a sink to Kafka, you can configure a wide range of options for delivering rows to Kafka.

Using EGRESS Columns

These special egress columns are supported by the KAFKA10 plugin. Data arriving in these columns is not included in the outgoing Kafka message payload; instead they are used to set other message variables.

Name Data Type Description Name in s-Server 6.0.0
SQLSTREAM_EGRESS_KAFKA_PARTITION INTEGER This column is used to determine to which partition the message will be written.

NOTE: SQLstream uses this approach and does not support use of the Kafka producer property "partitioner.class"
KAFKA_PARTITION
SQLSTREAM_EGRESS_KAFKA_TIMESTAMP TIMESTAMP This column is used to set the Kafka message timestamp KAFKA_TIMESTAMP
SQLSTREAM_EGRESS_KAFKA_KEY VARBINARY(256) This column is used as the message key KAFKA_KEY
SQLSTREAM_EGRESS_WATERMARK VARCHAR(256) This column is used as a watermark, and is saved when committing to the sink Kafka topic. The developer should set it to whatever watermark format is supported by the source plugin. The format of the stored watermark string is as follows:
<CommitRowtimeInMillisFromEpoch>,<watermark_column_value>.
-

Foreign Stream Options for Writing to Kafka

Some of the following options, which are passed to the Kafka broker, are described in more detail at http://kafka.apache.org/documentation/#producerconfigs. Where appropriate, information in this table is drawn from this page.

Options shown in lower case are case sensitive; they must be lowercase and double-quoted. Options shown in upper case may be entered either unquoted in any case, or double-quoted in upper case. So TOPIC, Topic and topic are all valid; "TOPIC" is valid but "topic" is not.

Some options may not apply to all versions of the Kafka plugin. Each option below is marked as 'Kafka10', 'Legacy' or 'All':

  • 'Kafka10' - the option only applies to the Kafka10 plugin and KAFKA10_SERVER
  • 'Legacy' - the option only applies to the legacy adapter for Kafka up to 0.8, and KAFKA_SERVER
  • 'All' - the option applies to both plugin versions.
Option Name Adapter version Description
TOPIC All Kafka topic
"bootstrap.servers" Kafka10 hostname:port[,hostname:port] of the Kafka broker(s). Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
SEED_BROKERS Legacy A comma separated list of broker identifiers in the format "<broker_host_name>:<port>". Defaults to "localhost".
"metadata.broker.list" Legacy hostname:port of the Kafka broker. Defaults to localhost:9092. Used for getting metadata (topics, partitions and replicas). Actual socket connections are established using the information from this metadata. Use commas to separate multiple brokers.
"key.serializer" Kafka10 Names a serializer class for keys. If no class is given, Kafka uses value.serializer.
"key.serializer.class" Legacy Names a serializer class for keys. If no class is given, Kafka uses serializer.class.
"value.serializer" Kafka10 Names a serializer class for values.
"serializer.class" Legacy Names a serializer class for values. The default encoder takes a byte[] and returns the same byte[].
"partitioner.class" All Fully qualified Java classname of Kafka partitioner. Defaults to com.sqlstream.aspen.namespace.kafka.KafkaOutputSink$RoundRobinPartitioner
"compression.type" Kafka10 If desired, specifies the final compression type for a given topic. Defaults to 'none'. Possible values: 'snappy', 'gzip'
"retry.backoff.ms" All Producers refresh metadata to see if a new leader has been elected. This option specifies the amount of time to wait before refreshing.
"request.timeout.ms" All When request.required.acks is enabled, this lets you specify how long the broker should try to bundle the specified number of messages before sending back an error to the client.
"send.buffer.bytes" All Socket write buffer size.
"client.id" All Using this option, you can specify a string to help you identify the application making calls to the Kafka server.
"transactional.id" Kafka10 This is the transaction ID used by the KafkaWriter instance for the given foreign stream. Each foreign stream should use a unique transactional.id to publish messages to the topic using transactions. Transactions are used only if Kafka brokers are v0.11.2. These support transactional semantics.

NOTE: You need to create a separate foreign stream definition for each pump that inserts (publishes) messages to a given topic. Each of these foreign streams needs to use a unique "transactional.id" for itself. The foreign stream option "pump.name", defined below, needs to match the name of the pump that inserts into the foreign stream.

(new in s-Server version 6.0.1) If you set transactional.id = 'auto', when a pump begins running, s-Server automatically sets transactional.id to '<fully_qualified_pump_name>_Pump', where <fully_qualified_pump_name> is the name of the pump that instantiates the sink.
"pump.name" Kafka10 Deprecated since version 6.0.1.

Fully qualified name of the pump that will process rows for this foreign stream. You must set transactional.id in order to use this option. s-Server uses this setting to determine the mode in which the pump instance itself is running (Leader or Follower) when you configure the Kafka adapter to run in High Availability (HA) mode. The pump.name needs to be fully qualified pump name of the format:<catalog_name>.<schema_name>.<pump_name>

For example:'LOCALDB.mySchema.ProcessedOrdersPump'
"linger.ms" All In cases where batch.size has not been reached, number of milliseconds that the Kafka producer will wait before batching sends . Defaults to '100', in milliseconds)
"batch.size" All Number of messages sent as a batch. Defaults to '1000'
"kafka.producer.config" Kafka10 Lets you specify the name of a properties file that contains a set of Kafka producer configurations. For example, you could use such a file to set all the properties needed for a SSL/SASL connection that the producer will invoke. Kafka offers a wide range of config properties.

For details, see Kafka documentation at https://kafka.apache.org/0100/documentation.html#producerconfigs.

NOTE: In some cases, you can use these property files to override Foreign Stream options. For example, the setting for bootstrap.servers will override the Foreign Stream option "bootstrap.servers". This can be helpful in dynamic environments (AWS, Docker, Kubernetes and so on) where you do not know the specific locations of the Kafka brokers until runtime.
"security.protocol" Kafka10 Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
"transaction.timeout.ms" Kafka10 The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction.
HA_ROLLOVER_TIMEOUT Kafka10 Time in milliseconds. Defaults to 5000. You must set transactional.id and pump.name in order to use this option. When the pump is configured to run in High Availability mode, and the pump is running as a "Follower", it waits for this amount of time for a lack of commits from the "Leader".

After the timeout, the "Follower" attempts to takeover as the "Leader". There may be multiple follower instances running in a cluster. Only one of these followers succeeds to be designated as the new "Leader". All other pump instances using the same transactional.id continue "following". If the earlier "Leader" was still alive when one of the followers took over as the new "Leader", the earlier leader demotes itself as the "Follower" if possible.
COMMIT_METADATA_COLUMN_NAME Kafka10 The name of the column that contains watermark values - default SQLSTREAM_EGRESS_WATERMARK.
SQLSTREAM_EGRESS_WATERMARK_SORT_FUNCTION Kafka10 If the SQLSTREAM_EGRESS_WATERMARK egress column is being used, the value chosen depends on the value of this option. Possible values are:
  • MAX - use the lexical maximum value of all watermarks in the commit batch
  • NONE - (the default) - use the watermark value from the last row in the commit batch
HEADERS_COLUMNS Kafka10 Deprecated - please use HEADERS_COLUMN_LIST
HEADERS_COLUMN_LIST Kafka10 Comma-separated list of foreign stream columns that will be mapped as outgoing headers, rather than to the record value itself. See Writing Headers to Kafka based on Column Data
OPTIONS_QUERY All Lets you query a table to update adapter options at runtime. You can use this, for example, to set the "bootstraop.servers" option from a table , as in select lastOffset as "bootstrap.servers" from TEST.kafka_write_options;
POLL_TIMEOUT Legacy This option specifies the timeout value in milliseconds to be passed as a parameter to the KafkaConsumer.poll() API call. The default is 100ms.
PORT Legacy Deprecated option. From s-Server 6.0.0, port numbers are specified in the SEED_BROKERS option.
"producer.type" Legacy This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are 'async' (asynchronous) and 'sync' (synchronous). Default 'sync'
"compression.codec" Legacy The compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy". Default 'none'.
"compressed.topics" Legacy If the compression codec is anything other than 'none', enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics.
"message.send.max.retries" Legacy This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Default 3.
"topic.metadata.refresh.interval.ms" Legacy The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...). It will also poll regularly (default: every 10min so 600,000ms). If you set this to a negative value, metadata will only get refreshed on failure. Default 600*1000 (10 minutes).
"request.required.acks" Legacy How many other brokers must have committed the data to their log and acknowledged this to the leader? 0 means never wait; 1 means wait for the leader to acknowledge; -1 means wait for all replicas to acknowledge. Default 0 (never wait).
"queue.buffering.max.ms" Legacy Maximum time to buffer data when using async mode. Default 5000.
"queue.buffering.max.messages" Legacy The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped. Default 10000.
"queue.enqueue.timeout.ms" Legacy The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send. Default -1.
"batch.num.messages" Legacy The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached. Default 200.
"send.buffer.bytes" Legacy Socket write buffer size. Default 100x1024.
"SHARD_ID" kafka10 This optional parameter is used in scaling, when you are running multiple shards of a pipeline. It uniquely identifies the current shard. When writing metadata/watermarks to kafka, the SHARD_ID is added as a prefix to the SQLSTREAM_POSITION_KEY. It is also included in the automatically generated transaction id. When creating a restartable pipeline, this SHARD_ID is expected to be the same as the SHARD_ID of the source foreign stream.

Using Amazon Kinesis as a External Stream sink

To read from a External Stream sink over Amazon Kinesis, you need to configure the Amazon Kinesis connection. You need to specify at minimum the Kinesis stream name.

Option Name Description
Kinesis Stream Name Required. Name of Kinesis stream to write to. No default.
Kinesis Region Required. Region id of Kinesis region. See http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region for more detalis.
Partition ID Optional. Partition id of shard to write to. Defaults to ''. Can be overwritten by a stream column named PARTITION_ID.
Buffer Size Maximum number of bytes per update request.
Max Retries Optional. If insert fails due to throttling, how many retries should be made. Backoff is doubled for each retry up to max. Default is 10.
Initial Backoff Optional. If insert fails due to throttling, how many milliseconds to back off initially. Default to 20.
Max Backoff Optional. If insert fails due to throttling, how many milliseconds to back off as a maximum. Defaults to 20480.
Max Records Per Request Optional. maximum number of records per update request. Defaults to 500.
AWS Profile Path See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide . Must point to a credential file on the s-Server file system with the following format:

[default]aws_access_key_id = xxxaws_secret_access_key = yyy
This defaults to '' - which goes to ~/.aws/credentials.

Note: You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the topic Reading from Kinesis Streams in the Integration Guide .
AWS Profile Name Optional. Profile name to use within credentials file. Defaults to default.
Report Frequency Optional. How often to log (in milliseconds) statistics. Defaults to 0, which means "never".
[default]
aws_access_key_id = xxx
aws_secret_access_key = yyy

This defaults to blank, which goes to ~/.aws/credentials.

You need to have an AWS profile set up, and a configuration file stored on your system, in order to read from or write to Kinesis. See Setting Up an AWS Profile Path in the SQLstream Integration Guide .

Using MQTT as a External Stream Sink

To write data to MQTT, you need to configure the external stream. StreamLab uses this information to implement an MQTT client that writes data from the foreign stream. Minimum options required are TOPIC and CONNECTION_URL.

Foreign Stream Options for Reading from MQTT

Option Description
CONNECTION_URL 'tcp://127.0.0.1:1883',
TOPIC MQTT topic. UTF-8 string that the MQTT broker uses to filter messages for each connected client.
CLIENT_ID s-Server implements an MQTT client to connect to the MQTT server. This setting provides a MQTT ClientID for this client. The MQTT broker uses the ClientID to identify the client and its current state. As a result, if used, you should use a distinct CLIENT_ID for each foreign stream defined for MQTT. Defaults to randomly generated.
QOS Defines the guarantee of delivery for a specific message. Either at most once (0), at least once (1), exactly once (2). Defaults to 1. For more information on QOS, see https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/
USERNAME Optional. User name for MQTT connection. Note: s-Server sends user names and passwords as plain text.
PASSWORD Optional. Password for MQTT connection. Note: s-Server sends user names and passwords as plain text.
KEEP_ALIVE_INTERVAL Optional. Interval in seconds sent to MQTT broker when s-Server establishes a connection. Specifies the longest time period of time that broker and client persist without sending a message. Defaults to 60.
CONNECTION_TIMEOUT Optional. Connection timeout in seconds. Defines the maximum time interval the client will wait for the network

connection to the MQTT server to be established. If you set this value to 0, s-Server disables timeout processing, and the client will wait until the network connection is made successfully or fails. Defaults to 30. | | RETAINED | Output only. True or False. If set to true, tells broker to store the last retained message and the QOS for this topic. Defaults to false. | | MAX_IN_FLIGHT | Output only. When QOS is set to 1 or 2, this is the Maximum number of outgoing messages that can be in the process of being transmitted at the same time. This number includes messages currently going in handshakes and messages being retried. Defaults to 10. |

Using Snowflake as a External Stream Sink

To writeswrite data to Snowflake, you need to configure the external strean. StreamLab uses this information to implement a Snowflake client that writes data into the foreign stream. Minimum options required are TOPIC and CONNECTION_URL.

Foreign Stream Options for Writing to a Snowflake Warehouse

Option Name Description
ACCOUNT The name assigned to your account by Snowflake.
USER The user name for your Snowflake account.
PASSWORD The password for your Snowflake account.
DB The database to write to in Snowflake. This should be an existing database for which user/password has privileges.
SCHEMA The schema to write to in the Snowflake database. This should be an existing schema for which user/password has privileges.
WAREHOUSE The Snowflake warehouse to write to. This should be an existing warehouse for which user/password has privileges.
DTABLE The table to write to in Snowflake. This should be an existing table for which user/password has privileges.
OPTIONS_QUERY Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the SCHEMA option from a table that contains the schema, as in select schemaToUse as SCHEMA from TEST.snowflakeOptions. For more details see the topic Using the Options Query Property.

The minimum configuration options required to write to a Snowflake warehouse are the warehouse name, user name/password, account, database, schema, and stream/table.

As well as these Snowflake options, you must also specify: