Formatting Data as ORC

(new in s-Server version 6.0.1)

The ORC formatter writes s-Server rows in the Optimized Row Columnar (ORC) file format, based on options supplied through the OPTIONS clause of a foreign table or foreign stream, as well as on table properties introspected from the target Hive table.

To format data as ORC, you need to set FORMATTER to ORC. ORC data can only be written to the file system or HDFS as flat files, or to Hive tables as directory trees.

If you write ORC data into Hive tables, ORC-formatted data will be written as a series of directory trees that each correspond to the directory layout of the target Hive table. Otherwise, ORC data will be written to a flat file. See Writing to Hive Tables for more details.
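For example, the following is a minimal sketch of a foreign stream that writes ORC flat files to the local file system. It assumes a prebuilt server object named FILE_SERVER for file-system sinks and a target directory of /tmp/orc_out (both are illustrative assumptions, not taken from this topic); the options used are described in the options tables later in this topic.

CREATE OR REPLACE FOREIGN STREAM orc_file_sink
(
  "id" BIGINT,
  "reported_at" TIMESTAMP,
  "speed" INTEGER
)
SERVER FILE_SERVER  -- assumed prebuilt server object for file-system sinks
OPTIONS
(
  FORMATTER 'ORC',
  "orc.version" 'V_0_12',
  DIRECTORY '/tmp/orc_out',
  FILENAME_SUFFIX '.orc',
  FILE_ROTATION_TIME '1h'
);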

For more information on ORC, see https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-ORCFiles

The ORC formatter has the following limitations:

  • The ORC formatter does not support column encryption or Hive indexes.
  • The ORC formatter does not provide general support for HA watermarks. Watermarks (for HA recovery) are well-defined only when a stream is used as the source for an INSERT into an ORC-formatted sink; in that case, the ROWTIME of the last row INSERTed into a file is recorded in the file name (or in the directory name if you are writing ORC-formatted data to Hive tables). In all other cases, you need to include watermarks in the target rows or build some other mechanism for tracking recovery points.

When using SQL, the ORC formatter converts the rows that you define in a foreign table or foreign stream to ORC, outputting these rows to the data writer that you have defined for that foreign table or stream. When you INSERT into the foreign table or stream, s-Server begins writing ORC data to the data writer that you specify.

When using the ECD agent, the ORC formatter converts the rows that you define in a properties file to ORC, outputting these rows to the data writer that you have defined in that properties file.

For a list of writers, and details on how to use them, see Writing to Other Destinations.

For performance reasons, most formatting should be done in native SQL and passed into ECDA, not pushed into ECDA formatters.
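One way to follow this guideline is to compute derived or reformatted columns in the SQL that feeds the sink, rather than relying on the formatter. The sketch below is illustrative only; it assumes the orc_sink stream defined later in this topic and a hypothetical source stream "bus_raw" that lacks the partition-key columns, and derives them with standard EXTRACT expressions:

INSERT INTO orc_sink
SELECT STREAM
  "id", "reported_at", "speed", "driver_no", "gps", "highway",
  CAST(EXTRACT(YEAR FROM "reported_at") AS INTEGER) AS "event_year",
  CAST(EXTRACT(MONTH FROM "reported_at") AS INTEGER) AS "event_month"
FROM "bus_raw";
-- "bus_raw" is a hypothetical source stream used only for this sketch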

Partitioning

The ORC format stores data in columnar form, which can be further partitioned, clustered, and sorted based on column values. Hive scans only the relevant partitions, allowing jobs to execute in parallel. The SQLstream file writers produce a sequence of files determined by the File Rotation Policy; each file name encodes the position of the file in that sequence, usually represented by a TIMESTAMP or WATERMARK field.

If the target Hive table is partitioned and clustered, then instead of producing a single file per cadence point, the ORC writer generates micro-batches as a directory tree of files, organized into partition levels and cluster buckets. The stream must be sorted so that the rows in each bucket appear in sorted order in the Hive table. To achieve this, the first column of the sort key is a monotonic expression of ROWTIME, and the driving stream is sorted according to the sort keys declared on the target Hive table, as sketched below.
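Here is a hedged sketch of such a sorted driving statement. It assumes the orc_sink and "buses_stream" objects defined later in this topic and hypothetical Hive sort keys "driver_no" and "highway"; the leading ORDER BY key is a monotonic expression of ROWTIME:

INSERT INTO orc_sink
SELECT STREAM "id", "reported_at", "speed", "driver_no", "gps", "highway", "event_year", "event_month"
FROM "buses_stream" AS s
-- leading sort key is monotonic in ROWTIME; remaining keys follow the Hive table's declared sort keys
ORDER BY FLOOR(s.ROWTIME TO HOUR), "driver_no", "highway";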

For the top-level directory naming convention of the files produced by the FILE, HDFS, and Hive sinks, refer to Integrating the File System; the sub-directories below it follow the conventions used by Hive to represent partition keys and cluster buckets. If a Hive table is specified, then the names of the partition levels incorporate the names of the partition keys in the target Hive table. Here is a sketch of what one of these trees looks like:

Example

busStats-2019-02-22_08:05:19.123.orc          // top-level directory name
  event_year=2019                             // partition keys
    event_month=11
      driver_no=12345
        highway=101
          000001_0_2019-02-22_08-05-19-123    // data files (cluster buckets)
          000002_0_2019-02-22_08-05-19-123
          000003_0_2019-02-22_08-05-19-123

The leaf files are the cluster buckets. Each leaf file is ORC-formatted, and it contains a sorted set of rows for a single cluster key. Buckets are named as follows:

<bucketID>_<lastRowtime> 

where:

<bucketID> is a conventional Hive bucket identifier (a 0-padded six-digit number followed by _0), and <lastRowtime> is the last rowtime in the entire batch, pretty-printed using SQLstream's default timestamp format yyyy-MM-dd_HH-mm-ss-SSS. In the tree above, for example, 000001_0 is the bucket identifier and 2019-02-22_08-05-19-123 is the rowtime of the last row in the batch.

Using SQL to Write ORC

To write ORC data, you create a foreign table or stream that references one of s-Server's prebuilt server objects. Like all tables and streams, foreign tables and streams must be created within a schema. The example below creates and sets a schema called orc_data and creates a foreign stream called orc_sink that writes ORC data to a Hive table.

CREATE OR REPLACE SCHEMA orc_data;
SET SCHEMA 'orc_data';

CREATE or REPLACE FOREIGN STREAM orc_sink
(
  "id" BIGINT,
  "reported_at" TIMESTAMP,
  "speed" INTEGER,
  "driver_no" BIGINT,
  "gps" VARCHAR(128),
  "highway" VARCHAR(8)
  "event_year" INTEGER,
  "event_month" INTEGER
)
SERVER HIVE_SERVER
OPTIONS
(
  FORMATTER 'ORC',
  DIRECTORY '/tmp/busStats',
  CONFIG_PATH '/home/hive/kerberos/core-default.xml',
  AUTH_METHOD 'kerberos',
  AUTH_USERNAME 'sqlstream_guavus@my.GGN',
  AUTH_KEYTAB '/home/hive/kerberos/sqlstream_guavus.keytab',
  AUTH_METASTORE_PRINCIPAL 'hive/_HOST@my.GGN',
  HDFS_OUTPUT_DIR 'hdfs:///hiveInstallation/data/svc_sqlstream_guavus/busStats',
  FILENAME_SUFFIX '.orc',
  FILE_ROTATION_TIME '1h',
  --the following options are specific to writing to a Hive table
  HIVE_SCHEMA_NAME 'trafficApp',
  HIVE_TABLE_NAME 'hive_bus_stats',
  "orc.version" 'V_0_12',
  "orc.user.metadata_com.acme.watermark" 'AC3E'
  );

To actually write data, you need to write a pump containing an INSERT statement along the following lines. For more information on pumps, see the topic CREATE PUMP in the Guavus s-Server Streaming SQL Reference Manual.

CREATE OR REPLACE PUMP writer_pump STOPPED AS
INSERT INTO orc_sink
(
"id",
"reported_at",
"speed",
"driver_no",
"gps",
"highway",
"event_year",
"event_month"
)
SELECT STREAM
"id",
"reported_at",
"speed",
"driver_no",
"gps",
"highway",
"event_year",
"event_month"
from "buses_stream";
--this assumes that a stream called "buses_stream" exists in the same schema
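The pump is created in a STOPPED state. Once the foreign stream, pump, and source stream all exist, you start the pump to begin writing. A minimal sketch, using the pump name from the example above and the ALTER PUMP syntax described in the CREATE PUMP topic:

ALTER PUMP writer_pump START;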

Foreign Stream Options for Formatting ORC

Option Default Value Required Description
FORMATTER none yes This needs to be ORC.
"orc.version" none yes Currently supported values are 'V_0_11' and 'V_0_12'.
"orc.block.padding" true no Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks. Padding improves locality and thus the speed of reading, but costs space.
"orc.block.size" 268,435,456 no Set the file system block size for the file. For optimal performance, set the block size to be multiple factors of stripe size.
"orc.direct.encoding.columns" null no Set the comma-separated list of case-sensitive names of columns that should be direct encoded.
"orc.batch.size" 10,000 no Number of rows to batch together before issuing an ORC write.
"orc.user.metadata_XXX" none no Where XXX is a string. XXX is the name of an application-specific metadata key. Therefore, it should be clear that it lives in a namespace unique to the application. There can be many of these options. The value of each option is a SQL BINARY literal with the leading X' stripped off.
STARTING_OUTPUT_ROWTIME If this is set, then all rows with ROWTIME less than the specified value are silently dropped. It is specified using an ISO 8601 format like '2020-02-06T12:34:56.789'.

The options below apply only to cases when you are writing to the file system or HDFS. You cannot use these options when writing ORC data to a Hive table.

Option Default Value Description
"orc.bloom.filter.columns" null Column-separated list of bloom filter columns. These are names from the foreign table/stream's column signature.
"orc.bloom.filter.fpp" 0.05 Bloom filter false-positive probability.
"orc.compress" ZLIB Compression algorithm. See Appendix A for legal values.
"orc.compress.size" 262,144 Compression chunk size.
"orc.row.index.stride" 10,000 Number of rows between index entries.
"orc.stripe.size" 67,108,864 Size of in-memory write buffer.

Using the ECD Agent to Write ORC

You can use the ECD agent to write ORC data to remote locations. See Writing Data to Remote Locations for more details.

The ECD agent takes similar options, but these options need to be formatted in a properties file along the lines of the following. These properties correspond to those defined for the adapter above.

# Column types for the source stream
ROWTYPE=RecordType(BIGINT id, TIMESTAMP reported_at, INTEGER speed, BIGINT driver_no, VARCHAR(128) gps, VARCHAR(8) highway, INTEGER event_year, INTEGER event_month)
FORMATTER=ORC
DIRECTORY=/tmp/busStats
CONFIG_PATH=/home/hive/kerberos/core-default.xml
AUTH_METHOD=kerberos
AUTH_USERNAME=sqlstream_guavus@my.GGN
AUTH_KEYTAB=/home/hive/kerberos/sqlstream_guavus.keytab
AUTH_METASTORE_PRINCIPAL=hive/_HOST@my.GGN
HDFS_OUTPUT_DIR=hdfs:///hiveInstallation/data/svc_sqlstream_guavus/busStats
FILENAME_SUFFIX=.orc
FILE_ROTATION_TIME=1h
# the following options are specific to writing to a Hive table
HIVE_SCHEMA_NAME=trafficApp
HIVE_TABLE_NAME=hive_bus_stats
ORC_VERSION=V_0_12
ORC.USER.METADATA_COM.ACME.WATERMARK=AC3E

To invoke the agent, from the directory $SQLSTREAM_HOME/../clienttools/EcdaAgent/ enter the following at the command line:

$ ./commondataagent.sh --output --props sample.properties --io hive