Parsing CSV

To read CSV-formatted data, you specify a separator character for values (a comma, a pipe, or any other character you designate) and, optionally, whether or not the file has a header. When the Extensible Common Data Framework parses CSV, each input row becomes a row in the stream, with columns split at the separator character. s-Server matches data types according to its Ingestion Rules for CSV, described below.

Note: You can also input data in larger chunks and parse it later using the Parser UDX. This UDX calls the parsers listed above in a function. For more information on using functions, see the topic Transforming Data in s-Server.

The s-Server trace log includes information on readers' and parsers' progress. See Periodic Parser Statistics Logging in the Administering Guavus SQLstream guide. Parsing errors are also logged to the Global Error Stream.

Sample Foreign Stream Implementing ECD Adapter to Parse CSV Files

The following example creates a schema called schema_csv and sets up a foreign stream that parses columns called id, reported_at, shift_no, trip_no, lat, lon, speed, bearing, driver_no, and highway from a file in /tmp/ called buses.log.

Note: The examples below use the file system as an input system. To parse CSV over other input/output systems, such as Kafka, Kinesis, a network socket, a WebSocket, HTTP, or AMQP, you would need to specify options for those systems. See [Reading from Other Sources](/integrating-sqlstream/reading-data/) for more details.

create or replace schema schema_csv;
set schema 'schema_csv';
CREATE OR REPLACE FOREIGN STREAM read_from_csv
(
    "id" BIGINT,
    "reported_at" TIMESTAMP,
    "shift_no" VARCHAR(8),
    "trip_no" VARCHAR(4),
    "lat" DOUBLE,
    "lon" DOUBLE,
    "speed" INTEGER,
    "bearing" INTEGER,
    "driver_no" BIGINT,
    "highway" VARCHAR(8)
)
    SERVER FILE_SERVER
    OPTIONS (
        "PARSER" 'CSV',
        "CHARACTER_ENCODING" 'UTF-8',
        "SEPARATOR" ',',
        "SKIP_HEADER" 'false',
        "DIRECTORY" '/tmp/',
        "FILENAME_PATTERN" 'buses\.log'
    );

To actually begin reading from the file, use a statement along the following lines:

SELECT STREAM * FROM read_from_csv;
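
In a full pipeline you would typically pump the parsed rows into a native stream rather than querying the foreign stream directly. The following is a minimal sketch; the native stream and pump names (buses_positions, buses_pump) are illustrative only:

CREATE OR REPLACE STREAM schema_csv.buses_positions (
    "id" BIGINT,
    "lat" DOUBLE,
    "lon" DOUBLE
);

-- The pump continuously inserts parsed CSV rows into the native stream.
CREATE OR REPLACE PUMP schema_csv.buses_pump STOPPED AS
    INSERT INTO schema_csv.buses_positions ("id", "lat", "lon")
    SELECT STREAM "id", "lat", "lon" FROM schema_csv.read_from_csv;

ALTER PUMP schema_csv.buses_pump START;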

Foreign Stream Options for Parsing CSV Data

| Option | Definition |
| --- | --- |
| SKIP_HEADER | True or false; defaults to false. Specifies whether the parser should skip the first row of input files. Usually, you would do this if the first row of data contains column headers. |
| DISALLOW_QUOTED_ROW_SEPARATOR | True or false; defaults to false. If true, the parser will not search for the row separator within a quoted field, so multi-line field values can be ingested. If false, finding the row separator terminates field parsing even if there is an unmatched start quote. |
| ROW_SEPARATOR | Character(s) separating rows in CSV data. Defaults to '\n' (a newline). Supports multi-character values and Unicode literals such as U&'\000D\000A' (CR/LF, for reading from Windows / DOS files). |
| SEPARATOR | Character(s) separating field values within a row. Defaults to ','. Supports multi-character strings like '$$' or '@!@', and single- or multi-character Unicode literals. |
| QUOTE_CHARACTER | Lets you specify an expected quotation character (which may be applied to any incoming field, or may be present only when a field value includes the SEPARATOR string). There is no default quote character. Only a single one-byte character may be used, which limits it to code points between 0 and 127. |
| QUOTED_COLUMNS | If set to anything non-blank, sets the quote character to a double quote. |
| COLUMN_MAPPING | Allows the extraction and re-ordering of a subset of fields from the CSV record. May not be combined with UNPARSED_TEXT. |
| UNPARSED_TEXT | What to do with additional trailing data in the CSV record. Options are 'TRUNCATE' (the default), 'LAST COLUMN', or 'NEW ROW'. May not be combined with COLUMN_MAPPING. |
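
As an illustration of combining several of these options, the following sketch reads a hypothetical pipe-separated file (orders.psv and its columns are assumed names) with a header row, DOS-style CR/LF row endings, and double-quoted fields:

CREATE OR REPLACE FOREIGN STREAM read_from_psv
(
    "order_id" BIGINT,
    "customer" VARCHAR(32),
    "amount" DOUBLE
)
    SERVER FILE_SERVER
    OPTIONS (
        "PARSER" 'CSV',
        "CHARACTER_ENCODING" 'UTF-8',
        "SEPARATOR" '|',                 -- fields separated by a pipe
        "ROW_SEPARATOR" U&'\000D\000A',  -- CR/LF row endings
        "QUOTE_CHARACTER" '"',           -- fields may be double-quoted
        "SKIP_HEADER" 'true',            -- first row contains column headers
        "DIRECTORY" '/tmp/',
        "FILENAME_PATTERN" 'orders\.psv'
    );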

Provenance Columns for Parsers

When parsing data, you can define provenance columns for your foreign stream. These return metadata for the parsed data.

For CSV, these are as follows:

| Data Type | Name | Value |
| --- | --- | --- |
| BIGINT | SQLSTREAM_PROV_PARSE_POSITION | Parser position within the message of the last parse error. |
| VARCHAR(65535) | SQLSTREAM_PROV_PARSE_ERROR | Description of the parser error. |
| BIGINT | SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER | How many lines have been parsed so far. This value is not reset per message or file. |
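
For example, a minimal sketch based on the buses.log stream above, keeping two of the data columns and adding the three provenance columns (the options are unchanged):

CREATE OR REPLACE FOREIGN STREAM read_from_csv_with_provenance
(
    "id" BIGINT,
    "reported_at" TIMESTAMP,
    -- provenance columns populated by the CSV parser
    "SQLSTREAM_PROV_PARSE_POSITION" BIGINT,
    "SQLSTREAM_PROV_PARSE_ERROR" VARCHAR(65535),
    "SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT
)
    SERVER FILE_SERVER
    OPTIONS (
        "PARSER" 'CSV',
        "CHARACTER_ENCODING" 'UTF-8',
        "SEPARATOR" ',',
        "SKIP_HEADER" 'false',
        "DIRECTORY" '/tmp/',
        "FILENAME_PATTERN" 'buses\.log'
    );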

COLUMN_MAPPING: Mapping Columns with the CSV Parser

You can use the COLUMN_MAPPING option to parse only some of the columns in a CSV file, or to reorder the columns from CSV data.

To implement this option, use code along the following lines:

CREATE OR REPLACE FOREIGN STREAM my_stream (
      COMMON_NAME VARCHAR(64),
      SPECIES VARCHAR(64),  
      GENUS VARCHAR(64)
)
SERVER FILE_SERVER
OPTIONS(
   DIRECTORY '/animals/',
   FILENAME_PATTERN 'animals.csv',
   COLUMN_MAPPING ',COMMON_NAME,GENUS,SPECIES',
   PARSER 'CSV',
   STATIC_FILES 'true',
   SKIP_HEADER 'true'
);

When you run a SELECT against this stream, the parser will take the following actions (see the sample input line after this list):

  • It will not parse the first column in the CSV file (note leading comma).
  • It will map the second column in the CSV file to COMMON_NAME.
  • It will map the third column in the CSV file to GENUS (even though the order of stream columns differs from the CSV column order).
  • It will map the fourth column in the CSV file to SPECIES (even though the order of stream columns differs from the CSV column order).
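
For example, given a hypothetical input line such as

  mammal,Lion,Panthera,leo

the resulting row would be COMMON_NAME = 'Lion', SPECIES = 'leo', GENUS = 'Panthera': the first field ('mammal') is skipped, and the remaining fields are assigned to columns by name, regardless of the column order in the stream definition.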

UNPARSED_TEXT: Dealing with extra trailing text in the CSV data

The UNPARSED_TEXT option allows the following actions to be performed on any unparsed data read by the CSV parser:

  • 'TRUNCATE': (the default). Removes any trailing data from a record
  • 'LAST COLUMN': Maps any trailing data into the last column of the stream - which must be a VARCHAR column.
  • 'NEW ROW': Creates a new row and adds the unparsed data into this new row - repeats until record data runs out.

Example

We can create a foreign table (the same rules apply for foreign streams):

CREATE OR REPLACE FOREIGN TABLE unparsed_text_demo_in 
( 
 C1 VARCHAR(16), 
 C2 VARCHAR(16),
 C3 VARCHAR(16)
) 
SERVER FILE_SERVER 
OPTIONS 
( 
 PARSER 'CSV', 
 SKIP_HEADER 'true', 
 UNPARSED_TEXT 'NEW ROW', 
 DIRECTORY '/tmp', 
 FILENAME_PATTERN 'unparsed_text_example.csv' 
);

Input file: /tmp/unparsed_text_example.csv

col1, col2, col3, col4, col5, col6
a,b,c,d
1,2,3,4,5,6
11,22,33,44
111,222,333,444,555
aaa,bbb,ccc,ddd,eee,fff,ggg,hhh

Outputs

UNPARSED_TEXT 'TRUNCATE'

+------------------+------------------+------------------+
|        C1        |        C2        |        C3        |
+------------------+------------------+------------------+
| a                | b                | c                |
| 1                | 2                | 3                |
| 11               | 22               | 33               |
| 111              | 222              | 333              |
| aaa              | bbb              | ccc              |
+------------------+------------------+------------------+

The first 3 fields of each record are mapped to the 3 foreign table columns; all additional fields in each record are dropped.

UNPARSED_TEXT 'LAST COLUMN'

+------------------+------------------+------------------+
|        C1        |        C2        |        C3        |
+------------------+------------------+------------------+
| a                | b                | c,d              |
| 1                | 2                | 3,4,5,6          |
| 11               | 22               | 33,44            |
| 111              | 222              | 333,444,555      |
| aaa              | bbb              | ccc,ddd,eee,fff, |
+------------------+------------------+------------------+

The first N-1 fields are mapped to the first N-1 columns; the remainder of the record is copied to the final column (including the separator characters). The last column must be a VARCHAR.

UNPARSED_TEXT 'NEW ROW'

+------------------+------------------+------------------+
|        C1        |        C2        |        C3        |
+------------------+------------------+------------------+
| a                | b                | c                |
| d                |                  |                  |
| 1                | 2                | 3                |
| 4                | 5                | 6                |
| 11               | 22               | 33               |
| 44               |                  |                  |
| 111              | 222              | 333              |
| 444              | 555              |                  |
| aaa              | bbb              | ccc              |
| ddd              | eee              | fff              |
| ggg              | hhh              |                  |
+------------------+------------------+------------------+

If there are more fields in the record than there are columns in the table, each file record is split into multiple rows in the foreign stream. If the number of fields is NF, and the number of columns is NC, then each record is split into CEIL(NF/NC) rows.

So we see that:

  • the first record has 4 fields and is split into 2 rows - the second row only consumes one field so the remaining columns are left NULL
  • the second record has 6 fields which are mapped to 2 rows, completely filling both rows
  • the final record has 8 fields which are mapped in turn to 3 rows, with the final column in the final row left null.

If the trailing fields cannot be cast into the data type of the columns they are mapped to, the usual ingest errors will be reported.

Sample Properties Implementing ECD Agent to Parse CSV Files

To parse CSV files with the ECD Agent, configure the options above using the ECD Agent property file with properties similar to the following:

ROWTYPE=RECORDTYPE(VARCHAR(2040) id, VARCHAR(2040) reported_at, VARCHAR(2040) shift_no, VARCHAR(2040) trip_no, VARCHAR(2040) route_variant_id)
DIRECTORY=/TMP
FILENAME_PATTERN=TRANSACTIONS\.LOG
PARSER=CSV
CHARACTER_ENCODING=UTF-8
SKIP_HEADER=TRUE

Ingestion Rules for CSV

s-Server applies the following coercion rules when parsing CSV data. All empty cells are cast as NULL.

| Target Column Type | CSV cell contains a numeric string | CSV cell contains a non-numeric string |
| --- | --- | --- |
| BIGINT, DECIMAL, DOUBLE, INT, SMALLINT, REAL, TINYINT | Raise an exception if the number lies beyond the maximum or minimum boundary of the target type. If the number has any decimal digits (including the vacuous .0) and it is being ingested into an integer column, then raise an error. | Raise an error. |
| VARBINARY | Raise an error. | If the string, without quotes, is a valid SQL Standard BINARY literal, then ingest it, truncating and 0-filling as necessary. A Standard BINARY literal has the form X'...' where ... is a sequence of (case-insensitive) hex digits. |
| CHAR, VARCHAR | Put double-quotes around the number and then ingest it subject to the truncation/padding rules for strings. | If the string won't fit in the target SQL character value, then excess trailing characters are discarded. CHAR values are space-padded up to their declared length. |
| TIME, TIMESTAMP | Raise an error. | OK if the string parses as a DATE/TIME/TIMESTAMP; otherwise, raise an error. Strings are parsed per the ISO standard at https://www.w3.org/TR/NOTE-datetime. |
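
As a hypothetical illustration of these rules: for target columns declared INTEGER, VARCHAR(5), TIMESTAMP, and VARBINARY(4), an input line such as

  42,hello world,2020-01-01T12:00:00,X'0A0B'

would ingest 42 into the INTEGER column, truncate 'hello world' to 'hello', parse the ISO-format timestamp, and ingest X'0A0B' as binary. An empty cell in any of these positions would become NULL, and a value such as 42.5 in the INTEGER column would raise an error.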

Logging CSV Parser Errors

When the CSV parser cannot parse the raw data, it fills the destination column with a NULL value and logs an error at one of the following levels:

  1. SEVERE
  2. INFO
  3. WARNING

The SOURCE_POSITION_KEY column can help identify the row that caused an error; however, it does not identify which foreign stream field contains the problem. To address this, you can configure the following properties in /var/log/sqlstream/Trace.properties, which change the server's behavior dynamically.

| Property Name | Default Value | Description |
| --- | --- | --- |
| com.sqlstream.aspen.namespace.common.parse.details | 'false' | Setting this property to true enables the behavior described below. |
| com.sqlstream.aspen.namespace.common.parse.buffer.size | '1' | Determines how many errors are collected before an error description is logged to ALL_TRACE. An attempt to set this property to an invalid value will result in the property taking its default value. |
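
For example, a minimal sketch of the corresponding lines in /var/log/sqlstream/Trace.properties (the buffer size of 10 is illustrative only):

com.sqlstream.aspen.namespace.common.parse.details=true
com.sqlstream.aspen.namespace.common.parse.buffer.size=10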

com.sqlstream.aspen.namespace.common.parse.details

When this property is set to true, the following information is logged for CSV parser errors:

  • When the parser can’t parse a field in the raw input, the CSV parser will log an explanatory WARNING to the error stream with the following column values:

| ALL_TRACE Column | Data Type | Value / Example |
| --- | --- | --- |
| ERROR_LEVEL | VARCHAR(10) NOT NULL | WARNING |
| IS_ERROR | BOOLEAN NOT NULL | True |
| ERROR_CLASS | INTEGER | 1 (for ingestion errors); 8 (for mid-pipeline errors) |
| SQL_STATE | VARCHAR(5) | A five-digit alphanumeric code describing the error. Wherever possible, these SQLSTATEs are errors defined by the 2016 SQL Standard, part 2 (Foundation), subclause 24.1 (SQLSTATE). Example: '22018' |
| ERROR_NAME | VARCHAR(32) | The corresponding Subcondition text from the SQL Standard. Example: 'Invalid character value for cast' |
| MESSAGE | VARCHAR(4096) | Detailed error message. Example: 'Invalid character in number 'a'' |
| DATA_ROW | VARBINARY(32768) | The unparsable gibberish. Example: X'61' |
| SOURCE_POSITION_KEY | VARCHAR(4096) | For ingestion errors, a string of the form <streamName>:<columnName>:<inputStreamLocation>:<rowNumber>. For mid-pipeline errors, this column is NULL. Example: '"LOCALDB"."TEST"."SHORTROWCSV":"SMALLINTCOL":sql-1143-short.csv:00000000000000002' |

  • This log also includes the unparsable bytes and the name of the destination column which will be set to NULL.
  • If the destination column was declared NOT NULL, then the row will be discarded.
  • If the destination column is nullable, then the parser will continue and attempt to parse the next field.

com.sqlstream.aspen.namespace.common.parse.buffer.size

The first error for a given (columnName, sql_state) pair is always logged. When com.sqlstream.aspen.namespace.common.parse.buffer.size is set to N, where N > 1, each subsequent WARNING for that (columnName, sql_state) pair is logged only after N further instances of the error have been collected.

Being binary, the DATA_ROW column is not directly readable. You may decode it using the VARBINARY_TO_STRING function:

SELECT STREAM ...,VARBINARY_TO_STRING(DATA_ROW, 'UTF-8') ... FROM SYS_BOOT.MGMT.ALL_TRACE
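
For example, a sketch of a fuller query over the trace stream, using the columns listed in the table above:

SELECT STREAM ERROR_NAME, SQL_STATE, MESSAGE,
       VARBINARY_TO_STRING(DATA_ROW, 'UTF-8') AS GIBBERISH,
       SOURCE_POSITION_KEY
FROM SYS_BOOT.MGMT.ALL_TRACE
WHERE IS_ERROR;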

The table below lists some of the SQLSTATEs that the parser may log:

| SQLSTATE | Error Name | Example Detailed Message |
| --- | --- | --- |
| '01004' | 'String data, right truncation' | 'String data truncated at the end' |
| '22003' | 'Numeric value out of range' | 'Value '-2147483649' is out of range for parameter of type INTEGER' |
| '22004' | 'Null value not allowed' | 'Cannot assign NULL to non-nullable parameter 'TINYINTCOL' of type TINYINT' |
| '22007' | 'Invalid datetime format' | 'Invalid date/time data' |
| '22018' | 'Invalid character value for cast' | 'Invalid character in number z' |
| '50000' | 'Coercion error' | 'Binary literal string must contain an even number of hexits' |
| '50008' | 'Regular expression match failure' | 'Cannot match pattern '([0-9])111([0-9])222([^0-9]*)'' |

Example

41/52 CALL sys_boot.mgmt.clearSavedTrace(); 
No rows affected (0.269 seconds) 
42/52 SELECT * FROM shortRowCsv; 
'SMALLINTCOL','VARCHARCOL' 
'1','1' 
'','2'
'','three'
3 rows selected (0.189 seconds) 
43/52 SELECT error_level,  error_class,  error_name,  sql_state, message, 
varbinary_to_string(data_row, 'UTF-8') AS gibberish,  source_position_key 
FROM sys_boot.mgmt.saved_trace WHERE is_error; 

The query above produces a list of errors like the following:

| ERROR_LEVEL | ERROR_CLASS | ERROR_NAME | SQL_STATE | MESSAGE | GIBBERISH | SOURCE_POSITION_KEY |
| --- | --- | --- | --- | --- | --- | --- |
| WARNING | 1 | Invalid character value for cast | 22018 | Invalid character in number 'a' | a | "LOCALDB"."TEST"."SHORTROWNOTNULLCSV":"SMALLINTCOL":sql-1143-short.csv:000000000000000001 |
| WARNING | 1 | Null value not allowed | 22004 | Cannot assign NULL to non-nullable parameter 'SMALLINTCOL' of type SMALLINT | | "LOCALDB"."TEST"."SHORTROWNOTNULLCSV":"SMALLINTCOL":sql-1143-short.csv:000000000000000001 |