(new in s-Server version 6.0.1)
The ORC formatter writes data from s-Server rows into the Optimized Row Columnar (ORC) file format, based on options supplied through the OPTIONS clause of a foreign table or foreign stream, as well as table properties introspected from the target Hive table.
To format data as ORC, set FORMATTER to 'ORC'. ORC data can be written only to the file system as flat files, to HDFS as flat files, or to Hive tables as directory trees.
If you write ORC data into Hive tables, the data is written as a series of directory trees, each corresponding to the directory layout of the target Hive table. Otherwise, ORC data is written to a flat file. See Writing to Hive Tables for more details.
For more information on ORC, see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-ORCFiles
When using SQL, the ORC formatter converts the rows that you define in a foreign table or foreign stream to ORC, outputting them to the data writer that you have defined for that table or stream. When you INSERT into the foreign table or stream, s-Server begins writing ORC to the data writer that you specify.
When using the ECD agent, the ORC formatter converts the rows that you define in a properties file to ORC, outputting them to the data writer that you have defined.
For a list of writers, and details on how to use them, see Writing to Other Destinations.
For performance reasons, most formatting should be done in native SQL and passed into ECDA, rather than pushed into ECDA formatters.
The ORC format stores data in columnar form, which can be further partitioned, clustered, and sorted based on column values; Hive then scans only the relevant partitions and executes jobs in parallel. The current SQLstream file writers produce a sequence of files determined by the file rotation policy, and each file name encodes the file's position in that sequence, usually represented by a TIMESTAMP or WATERMARK field.
If the target Hive table is partitioned and clustered, then instead of producing a single file per cadence point, the ORC writer generates micro-batches as directory trees: partition keys become directory levels and cluster buckets become leaf files. The driving stream must be sorted so that the rows in a bucket appear in sorted order in the Hive table; to achieve this, the sort key's first column is a monotonic expression of ROWTIME, and the stream is sorted according to the sort keys declared on the target Hive table.
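To make the clustering idea concrete, here is a minimal sketch (not s-Server code) of Hive-style bucket assignment, assuming an integer cluster key. For integer columns, Hive's hash function is the integer value itself, so the bucket is simply the value modulo the bucket count. The column name and bucket count below are hypothetical.

```python
# Illustrative sketch of Hive-style bucket assignment; not part of s-Server.
# For an integer cluster key, Hive's hash is the value itself, so the
# bucket is value % num_buckets.

NUM_BUCKETS = 4  # hypothetical bucket count declared on the Hive table

def bucket_for(driver_no: int, num_buckets: int = NUM_BUCKETS) -> int:
    """Assign a row to a cluster bucket from its integer cluster key."""
    return driver_no % num_buckets

# Group some sample rows into buckets; within each bucket, rows would be
# kept in ROWTIME order so they land sorted in the Hive table.
buckets = {}
for driver_no in (12345, 12346, 12347, 12348):
    buckets.setdefault(bucket_for(driver_no), []).append(driver_no)
```

Rows sharing a bucket are then written together as one sorted leaf file per micro-batch.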
For the top-level directory naming convention used by the FILE, HDFS, and Hive sinks, refer to Integrating the File System; the subdirectories below it follow the conventions Hive uses to represent partition keys and cluster buckets. If a Hive table is specified, the names of the partition levels incorporate the names of the partition keys in the target Hive table. Here is a sketch of what one of these trees will look like:
busStats-2019-02-22_08:05:19.123.orc                   // top-level directory
    event_year=2019                                    // partition keys
        event_month=11
            driver_no=12345
                highway=101
                    000001_0_2019-02-22_08-05-19-123   // data files (cluster buckets)
                    000002_0_2019-02-22_08-05-19-123
                    000003_0_2019-02-22_08-05-19-123
The leaf files are the cluster buckets. Each leaf file is ORC-formatted and contains a sorted set of rows for a single cluster key. Buckets are named as follows:
<bucketID>_<lastRowtime>
where <bucketID> is a conventional Hive bucket identifier (a 0-padded six-digit number followed by _0) and <lastRowtime> is the last rowtime in the entire batch, pretty-printed using SQLstream's default timestamp format, yyyy-MM-dd_HH-mm-ss-SSS.
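As a sketch of the naming convention just described, the helper below reproduces a leaf-file name from a bucket ID and a last rowtime (the function and its inputs are hypothetical; s-Server generates these names internally):

```python
from datetime import datetime

def bucket_file_name(bucket_id: int, last_rowtime: datetime) -> str:
    """Build a leaf-file name <bucketID>_<lastRowtime>: bucketID is a
    zero-padded six-digit number followed by _0, and lastRowtime uses
    SQLstream's default timestamp format yyyy-MM-dd_HH-mm-ss-SSS."""
    ts = last_rowtime.strftime("%Y-%m-%d_%H-%M-%S")
    millis = last_rowtime.microsecond // 1000
    return f"{bucket_id:06d}_0_{ts}-{millis:03d}"

print(bucket_file_name(1, datetime(2019, 2, 22, 8, 5, 19, 123000)))
# -> 000001_0_2019-02-22_08-05-19-123
```

This matches the data-file names shown in the directory tree sketch above.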
To write ORC data, you create a foreign table or stream that references one of s-Server's prebuilt server objects. Like all tables and streams, foreign tables and streams must be created within a schema. The example below creates and sets a schema called orc_data and creates a foreign stream called orc_sink that writes data to the file system.
CREATE OR REPLACE SCHEMA orc_data;
SET SCHEMA 'orc_data';
CREATE or REPLACE FOREIGN STREAM orc_sink
(
"id" BIGINT,
"reported_at" TIMESTAMP,
"speed" INTEGER,
"driver_no" BIGINT,
"gps" VARCHAR(128),
"highway" VARCHAR(8),
"event_year" INTEGER,
"event_month" INTEGER
)
SERVER HIVE_SERVER
OPTIONS
(
FORMATTER 'ORC',
DIRECTORY '/tmp/busStats',
CONFIG_PATH '/home/hive/kerberos/core-default.xml',
AUTH_METHOD 'kerberos',
AUTH_USERNAME 'sqlstream_guavus@my.GGN',
AUTH_KEYTAB '/home/hive/kerberos/sqlstream_guavus.keytab',
AUTH_METASTORE_PRINCIPAL 'hive/_HOST@my.GGN',
HDFS_OUTPUT_DIR 'hdfs:///hiveInstallation/data/svc_sqlstream_guavus/busStats',
FILENAME_SUFFIX '.orc',
FILE_ROTATION_TIME '1h',
--the following options are specific to writing to a Hive table
HIVE_SCHEMA_NAME 'trafficApp',
HIVE_TABLE_NAME 'hive_bus_stats',
"orc.version" 'V_0_12',
"orc.user.metadata_com.acme.watermark" 'AC3E'
);
To actually write data, you create a pump containing an INSERT statement along the following lines. (Because the pump below is created STOPPED, you must start it, for example with ALTER PUMP, before data flows.) For more information on pumps, see the topic CREATE PUMP in the Guavus s-Server Streaming SQL Reference Manual.
CREATE OR REPLACE PUMP writer_pump STOPPED AS
INSERT INTO orc_sink
(
"id",
"reported_at",
"speed",
"driver_no",
"gps",
"highway",
"event_year",
"event_month"
)
SELECT STREAM
"id",
"reported_at",
"speed",
"driver_no",
"gps",
"highway",
"event_year",
"event_month"
FROM "buses_stream";
--this assumes that a stream called "buses_stream" exists in the same schema
Option | Default Value | Required | Description |
---|---|---|---|
FORMATTER | none | yes | This needs to be ORC. |
"orc.version" | none | yes | Currently supported values are 'V_0_11' and 'V_0_12'. |
"orc.block.padding" | true | no | Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks. Padding improves locality and thus the speed of reading, but costs space. |
"orc.block.size" | 268,435,456 | no | Set the file system block size for the file. For optimal performance, set the block size to be multiple factors of stripe size. |
"orc.direct.encoding.columns" | null | no | Set the comma-separated list of case-sensitive names of columns that should be direct encoded. |
"orc.batch.size" | 10,000 | no | Number of rows to batch together before issuing an ORC write. |
"orc.user.metadata_XXX" | none | no | XXX is the name of an application-specific metadata key; choose names that live in a namespace unique to your application. There can be many of these options. The value of each option is a SQL BINARY literal with the X'…' wrapper stripped off; for example, X'AC3E' is written as 'AC3E'. |
STARTING_OUTPUT_ROWTIME | none | no | If this is set, then all rows with ROWTIME less than the specified value are silently dropped. Specify the value using an ISO 8601 format, such as '2020-02-06T12:34:56.789'. |
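To illustrate the "orc.user.metadata_XXX" value encoding described above: the option value is the hex-digit body of a SQL BINARY literal with the X'…' wrapper removed. The sketch below (not s-Server code) shows the bytes that such a value denotes:

```python
# The option value 'AC3E' (as used in the example foreign stream above)
# corresponds to the SQL BINARY literal X'AC3E', i.e. the bytes 0xAC 0x3E.
option_value = "AC3E"
metadata = bytes.fromhex(option_value)
print(metadata.hex().upper())  # -> AC3E
print(len(metadata))           # -> 2
```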
The options below apply only to cases when you are writing to the file system or HDFS. You cannot use these options when writing ORC data to a Hive table.
Option | Default Value | Description |
---|---|---|
"orc.bloom.filter.columns" | null | Comma-separated list of bloom filter columns. These are names from the foreign table/stream's column signature. |
"orc.bloom.filter.fpp" | 0.05 | Bloom filter false-positive probability. |
"orc.compress" | ZLIB | Compression algorithm. See Appendix A for legal values. |
"orc.compress.size" | 262,144 | Compression chunk size. |
"orc.row.index.stride" | 10,000 | Number of rows between index entries. |
"orc.stripe.size" | 67,108,864 | Size of in-memory write buffer. |
You can use the ECD agent to write ORC data to remote locations. See Writing Data to Remote Locations for more details.
The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.
# Column types for the source stream
ROWTYPE=RecordType(INTEGER COL1,TIMESTAMP COL2, INTEGER COL3, BOOLEAN COL4, VARCHAR(32) COL5, VARCHAR(32) COL6, INTEGER COL7)
FORMATTER=ORC
DIRECTORY=/tmp/busStats
CONFIG_PATH=/home/hive/kerberos/core-default.xml
AUTH_METHOD=kerberos
AUTH_USERNAME=sqlstream_guavus@my.GGN
AUTH_KEYTAB=/home/hive/kerberos/sqlstream_guavus.keytab
AUTH_METASTORE_PRINCIPAL=hive/_HOST@my.GGN
HDFS_OUTPUT_DIR=hdfs:///hiveInstallation/data/svc_sqlstream_guavus/busStats
FILENAME_SUFFIX=.orc
FILE_ROTATION_TIME=1h
# the following options are specific to writing to a Hive table
HIVE_SCHEMA_NAME=trafficApp
HIVE_TABLE_NAME=hive_bus_stats
ORC_VERSION=V_0_12
ORC.USER.METADATA_COM.ACME.WATERMARK=AC3E
To invoke the agent, navigate to the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ and enter the following at the command line:
$ ./commondataagent.sh --output --props sample.properties --io hive