Integrating Snowflake

You can write data out of s-Server to a file in a Snowflake warehouse. s-Server formats files for Snowflake in the same way as it does for writing to the local file system. See Writing to the File System for more details. You can specify how files rotate according to time, size, or both, and you can specify how rotated files are named.

Subjects covered in this topic are:

Writing to Snowflake Using SQL

To write to Snowflake using an adapter, you need to create a foreign stream in SQL that references the Snowflake plugin - either by using the prebuilt server object called SNOWFLAKE_SERVER, or by creating your own specific foreign SERVER (see CREATE SERVER. In the foreign stream's options, you configure how s-Server connects to Snowflake. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.

Foreign Stream Options for Writing to a Snowflake Warehouse

Option Name Description
ACCOUNT The name assigned to your account by Snowflake.
USER The user name for your Snowflake account.
PASSWORD The password for your Snowflake account.
DB The database to write to in Snowflake. This should be an existing database for which user/password has privileges.
SCHEMA The schema to write to in the Snowflake database. This should be an existing schema for which user/password has privileges.
WAREHOUSE The Snowflake warehouse to write to. This should be an existing warehouse for which user/password has privileges.
DTABLE The table to write to in Snowflake. This should be an existing table for which user/password has privileges.
OPTIONS_QUERY Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the SCHEMA option from a table that contains the schema, as in select schemaToUse as SCHEMA from TEST.snowflakeOptions. For more details see the topic Using the Options Query Property.

The minimum configuration options required to write to a Snowflake warehouse are the warehouse name, user name/password, account, database, schema, and stream/table.

As well as these Snowflake options, you must also specify:

Rotating Files using FILE_ROTATION options

s-Server rotates files according to options set for FILE_ROTATION_TIME, FILE_ROTATION_SIZE and FILE_ROTATION_RESPECT_ROWTIME.

File rotation options work together in the following ways:

Rotate the file if (the FILE_ROTATION_TIME condition is satisfied OR the FILE_ROTATION_SIZE condition is satisfied) AND (the FILE_ROTATION_RESPECT_ROWTIME condition is satisfied).

Using FILE_ROTATION_TIME

You can use FILE_ROTATION_TIME to rotate files based on time elapsed since Unix epoch time. You set FILE_ROTATION_TIME as a positive integer with a unit of either milliseconds (ms), seconds (s), minutes (m), hours (h), or days (d). These express intervals of time from 1970-01-01: an interval might be 15 minutes, 1 hour, or 1 day. Files rotate once a row arrives with a ROWTIME that passes the specified interval.

Examples:

  • FILE_ROTATION_TIME '15m' rotates files every fifteen minutes from the top of the hour (1:00, 1:15, 1:30, and so on).
  • FILE_ROTATION_TIME '1h' rotates files every hour at the top of the hour.
  • FILE_ROTATION_TIME '1d' rotates files every day at 12:00 AM.

More technically, FILE_ROTATION_TIME works as follows:

  • Let $timePeriod be the number of milliseconds in the time unit bound to FILE_ROTATION_TIME.
  • Let $lastWrittenRowtime be the ROWTIME of the last row in the file.
  • Let $currentRowTime be the ROWTIME of the row about to be written.

s-Server rotates to the next file when:

  • integerPart($lastWrittenRowtime / $timePeriod) < integerPart($currentRowTime / $timePeriod)

Using FILE_ROTATION_SIZE

You can use FILE_ROTATION_SIZE to rotate files based on their size. You specify a file size in kilobytes (k), megabytes (m), or gigabytes (g). Expressed as a positive integer followed by a byte measurement: Defaults to 0. That means "don't use file size to trigger file rotation".

Note: You cannot use FILE_ROTATION_SIZE for writing to Hive tables.

Examples:

  • FILE_ROTATION_SIZE '20k' means "rotate files when they reach or exceed a size of 20 kilobytes"
  • FILE_ROTATION_SIZE '20m' means "rotate files when they reach or exceed a size of 20 megabytes"
  • FILE_ROTATION_SIZE '1g' means "rotate files when they reach or exceed a size of 1 gigabyte"

When using FILE_ROTATION_SIZE, you can specify that files wait to rotate until all rows with the same ROWTIME have arrived. See FILE_ROTATION_RESPECT_ROWTIME below.

Using FILE_ROTATION_RESPECT_ROWTIME

Setting FILE_ROTATION_RESPECT_ROWTIME to true ensures that rows with the same rowtime will not be split between two files. For example, if you have set FILE_ROTATION_SIZE to 1m (1 megabyte), and a new row arrives that causes the file to go over the 1 megabyte threshold, if FILE_ROTATION_RESPECT_ROWTIME is set to true, s-Server waits until all rows with the same ROWTIME have arrived. That is, s-Server waits until a new row arrives with a different ROWTIME, even if accepting rows with the same ROWTIME causes the file to grow larger than 1 megabyte.

If you set FILE_ROTATION_RESPECT_ROWTIME to true, you cannot write files from tables, whose rows lack rowtimes. s-Server will raise an error if you try to insert into a file writer foreign stream that has FILE_ROTATION_RESPECT_ROWTIME set to true. That means that if you are planning to write rows from a table into a file, you must set FILE_ROTATION_RESPECT_ROWTIME to false.

Configuring File Names

You can specify options for how files are named as they rotate. In setting options for rotated files' names, you can specify a prefix, suffix, and date format for the file name. You can also specify a watermark. Watermarks are drawn from a column in the source from which the file is written.

At minimum, you must specify either FILENAME_PREFIX or FILENAME_SUFFIX. All rotated files include a timestamp.

If you set ORIGINAL_FILENAME, then its value will serve as a temporary file name to use while data is being actively written.

When this file is rotated out, rotated files are named as follows:

<prefix>-<timestamp>-<watermark>-<sequenceNumber><suffix> depending on the options you specify.

If you do not set ORIGINAL_FILENAME, then the file being actively written to is given the form using the first rowtime in the file set.

Note: if you are writing from a table, and do not specify ORIGINAL_FILENAME, <date> will be the system time when the file began writing.

When this file is rotated out, rotated files are named as follows:

<prefix>-<timestamp>-<watermark>-<sequenceNumber><suffix>

For example, the following options:

filename_prefix 'test-',
filename_date_format 'yyyy-MM-dd_HH:mm:ss',
filename_suffix '.csv',                       --note that you need to specify a period for filename_suffix if desired.

produce rotating file names like this:

test-2020-05-23_19:44:00.csv

And the following options:

filename_date_format 'yyyy-MM-dd_HH:mm:ss',
filename_suffix '.csv',                      --note that you need to specify a period for filename_suffix if desired.

produce rotating file names like this:

2020-05-23_19:44:00.csv

Explanation of Elements

  • <prefix> is the string specified by the FILENAME_PREFIX option. If you did not specify FILENAME_PREFIX, then <prefix> is omitted from the filename.
  • <timestamp> is the ROWTIME of the last row written to the file. This element is always present if the source is a stream, but is omitted if the source is a table. s-Server formats <timestamp> as specified by FILENAME_DATE_FORMAT. If you do not specify FILENAME_DATE_FORMAT, then is formatted as yyyy-MM-dd_HH-mm-ss-SSS.
  • <watermark> is the value of a watermark column in the last row of the source. You use FILE_ROTATION_WATERMARK_COLUMN to specify a source watermark column corresponding to the last row of the file. If you do not specify a FILE_ROTATION_WATERMARK_COLUMN, then <watermark> is omitted from the filename.
  • <sequenceNumber> is added only if the you specify the FILE_ROTATION_RESPECT_ROWTIME option as false. In that case, files may have the same terminal <timestamp>, because s-Server can write rows with the same rowtime across multiple files. In these cases, file names in the series with the same terminal <timestamp> are distinguished from one another by a monotonically increasing 7 digit sequence number, starting at 0000001.
  • <suffix> is the string specified by the FILENAME_SUFFIX option. If you did not specify FILENAME_SUFFIX, then <suffix> is omitted from the filename.

Example using the pre-defined SNOWFLAKE_SERVER

If you use the pre-built SNOWFLAKE_SERVER, you need to supply all options for each and every foreign stream, as in this example:

CREATE OR REPLACE SCHEMA SnowflakeWriterSchema;
SET SCHEMA 'SnowflakeWriterSchema';

CREATE OR REPLACE FOREIGN STREAM SnowflakeWriterSchema.SnowflakeWriterStream (
    "id" INTEGER,
    "shift_no" DOUBLE,
    "reported_at" TIMESTAMP NOT NULL,
    "trip_no" VARCHAR(10),
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "speed" INTEGER,
    "bearing" INTEGER,
    "active" BOOLEAN
)
SERVER SNOWFLAKE_SERVER
OPTIONS (
                         -- Snowflake options    
    "USER" 'myname',
    "PASSWORD" 'password',
    "ACCOUNT" 'sqlstream',
    "WAREHOUSE" 'DEMO_WH',
    "DB" 'TEST_DB',
    "SCHEMA" 'PUBLIC',
    "DTABLE" 'demo',
                         -- formatting options - always CSV
    "FORMATTER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '',
    "SEPARATOR" ',',
    "WRITE_HEADER" 'false', 
                         -- file writer options
    "DIRECTORY" '/tmp',
    "ORIGINAL_FILENAME" 'bus-output.csv',
    "FILENAME_PREFIX" 'output-',
    "FILENAME_SUFFIX" '.csv',
    "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss',
    "FILE_ROTATION_SIZE" '20K'
    "FORMATTER_INCLUDE_ROWTIME" 'true'
  );

Example using a user-defined SERVER

If you are writing to several Snowflake tables, it can be helpful to define your own server, and associate it with all the options that would be shared by all these Snowflake tables (represented as SQLstream foreign streams).

The precise choice of which options to define at the server level and which at the foreign stream level is up to the application developer.

CREATE OR REPLACE SCHEMA SnowflakeWriterSchema;
SET SCHEMA 'SnowflakeWriterSchema';

-- Create a server that encapsulates all the shared / common options

CREATE OR REPLACE SERVER "Snowflake_Demo_Test_DB" TYPE 'SNOWFLAKE'
FOREIGN DATA WRAPPER ECDA
OPTIONS  (
                            -- Snowflake-specific options    
    "USER" 'myname',
    "PASSWORD" 'password',
    "ACCOUNT" 'sqlstream',
    "WAREHOUSE" 'DEMO_WH',
    "DB" 'TEST_DB',
    "SCHEMA" 'PUBLIC',
                            -- formatting options - always CSV
    "FORMATTER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '',
    "SEPARATOR" ',',
    "WRITE_HEADER" 'false'
); 

-- now create several foreign streams that inherit some of their options from the server

CREATE OR REPLACE FOREIGN STREAM SnowflakeWriterSchema.SnowflakeWriterStream (
    "id" INTEGER,
    "shift_no" DOUBLE,
    "reported_at" TIMESTAMP NOT NULL,
    "trip_no" VARCHAR(10),
    "latitude" DOUBLE,
    "longitude" DOUBLE,
    "speed" INTEGER,
    "bearing" INTEGER,
    "active" BOOLEAN
)
SERVER "Snowflake_Demo_Test_DB"  -- using the user-defined server
OPTIONS (
                            -- remaining Snowflake options    
    "DTABLE" 'demo',        -- this would always be per-table
                            -- no need for any CSV formatting options
                            -- file writer options
    "DIRECTORY" '/tmp',
    "ORIGINAL_FILENAME" 'bus-output.csv',
    "FILENAME_PREFIX" 'output-',
    "FILENAME_SUFFIX" '.csv',
    "FILENAME_DATE_FORMAT" 'yyyy-MM-dd-HH:mm:ss.SSS',
    "FILE_ROTATION_SIZE" '20K',
    "FORMATTER_INCLUDE_ROWTIME" 'true'
);

An example pipeline

To begin writing data to Snowflake, you INSERT into SnowflakeWriterSchema.SnowflakeWriterStream. When SnowflakeWriterSchema.SnowflakeWriterStream receives rows, s-Server writes data to the Snowflake warehouse you have configured in the foreign stream options.

In most cases, you will want to set up a pump that writes data to SnowflakeWriterSchema.SnowflakeWriterStream. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

Note On Writing Pumps

Because of the nature of streaming data, you will need to set up a pump in order to move rows continually from an s-Server stream to another stream, file, Kafka topic, RDBMS table or other location. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to the other. A model for setting up a pump is provided below. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

You do so with code along the following lines:

CREATE OR REPLACE SCHEMA Pumps;
SET SCHEMA 'Pumps';

CREATE OR REPLACE PUMP writerPump STOPPED AS
--We recommend creating pumps as stopped
--then using ALTER PUMP "Pumps"."writerPump" START to start it
INSERT INTO SnowflakeWriterSchema.SnowflakeWriterStream
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream

To start writing data, use the following code:

ALTER PUMP Pumps.writerPump START;

Writing to Snowflake Using the ECD Agent

You can use the ECD agent to write files to Snowflake from remote locations. See Writing Data to Remote Locations for more details. The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

Note: Before using the ECD agent, you need to create a source stream for it. In the below example, you would need to create the foreign stream SnowflakeWriterSchema.SnowflakeWriterStream.

Sample Property File for ECD Agent

#                                   SQLstream source schema and stream
SCHEMA_NAME="SnowflakeWriterSchema"
TABLE_NAME="SnowflakeWriterStream"
#                                   Snowflake sink options
USER=myname
PASSWORD=password
ACCOUNT=sqlstream
WAREHOUSE=DEMO_WH
DB=TEST_DB
SCHEMA=PUBLIC
DTABLE=demo 
#                                   CSV formatting options
--formatting for CSV data
FORMATTER=CSV
CHARACTER_ENCODING=UTF-8
QUOTE_CHARACTER=
SEPARATOR=,
WRITE_HEADER=false    
#                                   File writer options
DIRECTORY=/tmp
ORIGINAL_FILENAME=bus-output.csv
FILENAME_PREFIX=output-
FILENAME_SUFFIX=.csv
FILENAME_DATE_FORMAT=yyyy-MM-dd-HH:mm:ss
FILE_ROTATION_SIZE=20K'
FORMATTER_INCLUDE_ROWTIME=true

To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line:

$ ./commondataagent.sh --output --props sample.test.properties --io snowflake