The MongoDB ECDA adapter writes batches of data to a MongoDB collection. Currently, the adapter works with the JSON formatter only.
To write to a MongoDB collection, you need, at minimum, a fully qualified URL, the database name (DB_NAME), and the collection name (DATA_COLLECTION). You then CREATE a foreign stream with these options and INSERT INTO it to write data to the collection.
The MongoDB sink plugin supports exactly-once semantics through transactions and watermarking. Transactions are committed based on the TRANSACTION_ROWTIME_LIMIT, TRANSACTION_ROW_LIMIT, and (if enabled) PREEMPTIVE_TIME_TO_COMMIT_IN_MS properties, which take precedence in the order listed here.
Preemptive commit is based on the wall-clock time at which the last input record arrived: if no new data arrives for PREEMPTIVE_TIME_TO_COMMIT_IN_MS milliseconds, the adapter commits the pending data to MongoDB. To enable or disable preemptive commit, set ENABLE_PREEMPTIVE_COMMIT to 'true' or 'false'.
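The following Python sketch models the commit triggers described above. It is a simplified illustration, not the adapter's implementation: the class name `CommitPolicy`, the millisecond timestamps, and the assumption that the ROWTIME range is measured from the first row of the open batch are all hypothetical. The checks run in the documented precedence order.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CommitPolicy:
    # Defaults mirror the documented option defaults.
    rowtime_limit_ms: int = 60000      # TRANSACTION_ROWTIME_LIMIT
    row_limit: int = 20000             # TRANSACTION_ROW_LIMIT
    preemptive_enabled: bool = False   # ENABLE_PREEMPTIVE_COMMIT
    preemptive_ms: int = 60000         # PREEMPTIVE_TIME_TO_COMMIT_IN_MS

    def commit_reason(self, batch_rows: int, first_rowtime_ms: Optional[int],
                      latest_rowtime_ms: Optional[int],
                      idle_ms: int) -> Optional[str]:
        """Return which limit forces a commit of the open batch, or None.

        Checks run in precedence order: ROWTIME range, row count,
        then the preemptive idle timeout (only if enabled).
        """
        if batch_rows == 0:
            return None  # nothing pending, nothing to commit
        if (first_rowtime_ms is not None and latest_rowtime_ms is not None
                and latest_rowtime_ms - first_rowtime_ms >= self.rowtime_limit_ms):
            return "TRANSACTION_ROWTIME_LIMIT"
        if batch_rows >= self.row_limit:
            return "TRANSACTION_ROW_LIMIT"
        if self.preemptive_enabled and idle_ms >= self.preemptive_ms:
            return "PREEMPTIVE_TIME_TO_COMMIT_IN_MS"
        return None


# Limits taken from the sample foreign stream in this topic.
policy = CommitPolicy(rowtime_limit_ms=30000, row_limit=50000,
                      preemptive_enabled=True, preemptive_ms=70000)
print(policy.commit_reason(10, 0, 31000, 0))     # TRANSACTION_ROWTIME_LIMIT
print(policy.commit_reason(50000, 0, 1000, 0))   # TRANSACTION_ROW_LIMIT
print(policy.commit_reason(10, 0, 1000, 70000))  # PREEMPTIVE_TIME_TO_COMMIT_IN_MS
print(policy.commit_reason(10, 0, 1000, 5000))   # None
```

With preemptive commit disabled (the default), a small, quiet batch stays open until one of the two transaction limits is reached.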
For more information on MongoDB URLs, see https://docs.mongodb.com/manual/reference/connection-string/.
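If a username or password contains reserved characters such as `@`, `:`, or `/`, the MongoDB connection string format requires them to be percent-encoded. The following Python sketch builds a URL in the same shape as the one in the sample foreign stream; the helper name `mongo_url` is hypothetical and not part of s-Server.

```python
from urllib.parse import quote


def mongo_url(host: str, port: int = 27017, user: str = "", password: str = "",
              options: str = "") -> str:
    """Build a mongodb:// URL, percent-encoding the credentials."""
    credentials = ""
    if user:
        # safe="" makes quote() encode ':', '/', '@' and other reserved characters.
        credentials = quote(user, safe="")
        if password:
            credentials += ":" + quote(password, safe="")
        credentials += "@"
    url = f"mongodb://{credentials}{host}:{port}/"
    if options:
        url += "?" + options  # connection string options, e.g. authSource=admin
    return url


print(mongo_url("localhost", user="user", password="p@ss:word"))
# mongodb://user:p%40ss%3Aword@localhost:27017/
```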
To write to MongoDB, you need to create a foreign stream in SQL that references a prebuilt server object called MONGO_SERVER. In the foreign stream's options, you configure how s-Server connects to MongoDB. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.
You also need to specify a formatter for the foreign stream; currently, this must be JSON. Specifying "formatter" as a foreign stream option tells s-Server that this foreign stream writes data. See Output Formats for Writing in this guide for more details.
Streams, like most SQL objects (but unlike data wrappers and servers), must be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named MongoDBWriterSchema.MongoDBWriterStream.
CREATE OR REPLACE SCHEMA MongoDBWriterSchema;
CREATE OR REPLACE FOREIGN STREAM MongoDBWriterSchema.MongoDBWriterStream
(
"clientOS" VARCHAR(16),
"playerVersion" VARCHAR(32),
"sessionId" VARCHAR(64),
"clientIPAddress" VARCHAR(32),
"KPI" INTEGER,
"netmask" VARCHAR(32),
"inetValue" VARCHAR(32)
)
SERVER MONGO_SERVER
OPTIONS (
"FORMATTER" 'JSON',
"URL" 'mongodb://user:password@localhost:27017/',
"DB_NAME" 'mongoDb',
"DATA_COLLECTION" 'stream_data',
"WTMK_COLLECTION" 'aa1_collection2',
"MAX_CONN_POOL_SIZE" '50',
"MIN_CONN_POOL_SIZE" '2',
"MAX_CONN_POOL_WAIT_TIME_SEC" '60',
"MAX_CONN_POOL_IDLE_TIME_SEC" '60',
"ENABLE_TRANSACTIONS" 'true',
"TRANSACTION_ROWTIME_LIMIT" '30000',
"TRANSACTION_ROW_LIMIT" '50000',
"ENABLE_PREEMPTIVE_COMMIT" 'true',
"PREEMPTIVE_TIME_TO_COMMIT_IN_MS" '70000',
"UPSERT_FIND_QUERY_COLUMN_NAMES" 'clientOS,clientIPAddress,eth0.inet.inetValue',
"CONFIG_PROPERTIES_FILE_PATH" '/tmp/mongo.properties',
"COMMIT_METADATA_COLUMN_NAME" 'sessionId');
To begin writing data to MongoDB, you INSERT into MongoDBWriterSchema.MongoDBWriterStream. When MongoDBWriterSchema.MongoDBWriterStream receives rows, s-Server writes them to the MongoDB collection configured in the foreign stream options.
In most cases, you will want to set up a pump that writes data to MongoDBWriterSchema.MongoDBWriterStream. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.
You can create such a pump with code along the following lines:
CREATE OR REPLACE SCHEMA "Pumps";
SET SCHEMA '"Pumps"';
CREATE OR REPLACE PUMP "writerPump" STOPPED AS
--We recommend creating pumps as stopped
--then using ALTER PUMP "Pumps"."writerPump" START to start it
INSERT INTO MongoDBWriterSchema.MongoDBWriterStream
SELECT STREAM * FROM "MyStream";
--where "MyStream" is a currently existing stream
To start writing data, use the following code:
ALTER PUMP "Pumps"."writerPump" START;
Option | Definition | Required | Default |
---|---|---|---|
URL | Fully qualified URL that includes, at minimum, a host name (or IP address or UNIX domain socket). The URL can also include a username and password (passed to the MongoDB instance), a port number, and connection string options. | True | |
DB_NAME | MongoDB database containing the collections. | True | |
DATA_COLLECTION | MongoDB collection to which data will be written. | True | |
WTMK_COLLECTION | MongoDB collection to which watermarks will be written. | False | |
COMMIT_METADATA_COLUMN_NAME | Name of the column containing the watermark value. | False | |
CONFIG_PROPERTIES_FILE_PATH | Path to a properties file that sets Java system properties for the application, such as keystore and truststore paths and passwords. | False | |
MAX_CONN_POOL_IDLE_TIME_SEC | The maximum idle time of a pooled connection. A zero value indicates no limit to the idle time. | False | 60 |
MAX_CONN_POOL_SIZE | The maximum number of connections allowed. Those connections will be kept in the pool when idle. | False | 50 |
MIN_CONN_POOL_SIZE | The minimum number of connections. Those connections will be kept in the pool when idle, and the pool will ensure that it contains at least this minimum number. | False | 2 |
MAX_CONN_POOL_WAIT_TIME_SEC | The maximum time that a thread may wait for a connection to become available. | False | 60 |
ENABLE_TRANSACTIONS | Whether to use transactions to achieve exactly-once semantics. | False | False |
TRANSACTION_ROWTIME_LIMIT | Range in milliseconds. All rows received from the input query whose ROWTIME values fall within this range are committed to the MongoDB server in a single batch/transaction. | False | 60000 |
TRANSACTION_ROW_LIMIT | The number of rows to batch up before committing. | False | 20000 |
ENABLE_PREEMPTIVE_COMMIT | If true, commits pending data preemptively when no new records arrive within PREEMPTIVE_TIME_TO_COMMIT_IN_MS. | False | False |
PREEMPTIVE_TIME_TO_COMMIT_IN_MS | The maximum time, in milliseconds, to wait after the last input record before committing to the MongoDB server. | False | 60000 |
OPTIONS_QUERY | Optional. Lets you query a table to update adapter options at runtime. You can use this, for example, to set the URL option from a table, as in select myUrl as URL from TEST.mongo_options. For more details, see the topic Using the Options Query Property. | False | |
UPSERT_FIND_QUERY_COLUMN_NAMES | A comma-separated list of JSON field references used by MongoDB for upserts; see Upserting Documents in MongoDB. Each field reference may be the name of a root element or a dot-separated reference to a nested element. | False | |
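As a rough pre-flight check before creating a foreign stream, the following Python sketch merges a list of OPTIONS pairs with the defaults from the table above and rejects missing required options or an option given twice. The helper `effective_options` is hypothetical and not part of s-Server; s-Server performs its own validation, which may behave differently.

```python
from typing import Dict, Iterable, Tuple

# Required options and defaults, transcribed from the options table.
REQUIRED = ("URL", "DB_NAME", "DATA_COLLECTION")
DEFAULTS: Dict[str, str] = {
    "MAX_CONN_POOL_IDLE_TIME_SEC": "60",
    "MAX_CONN_POOL_SIZE": "50",
    "MIN_CONN_POOL_SIZE": "2",
    "MAX_CONN_POOL_WAIT_TIME_SEC": "60",
    "ENABLE_TRANSACTIONS": "false",
    "TRANSACTION_ROWTIME_LIMIT": "60000",
    "TRANSACTION_ROW_LIMIT": "20000",
    "ENABLE_PREEMPTIVE_COMMIT": "false",
    "PREEMPTIVE_TIME_TO_COMMIT_IN_MS": "60000",
}


def effective_options(pairs: Iterable[Tuple[str, str]]) -> Dict[str, str]:
    """Hypothetical check: merge OPTIONS pairs with the documented defaults."""
    given: Dict[str, str] = {}
    for name, value in pairs:
        if name in given:
            # A repeated option is ambiguous: which value should win?
            raise ValueError(f"option {name} is specified more than once")
        given[name] = value
    missing = [name for name in REQUIRED if name not in given]
    if missing:
        raise ValueError("missing required options: " + ", ".join(missing))
    # Explicitly given values override the defaults.
    return {**DEFAULTS, **given}


opts = effective_options([
    ("FORMATTER", "JSON"),
    ("URL", "mongodb://user:password@localhost:27017/"),
    ("DB_NAME", "mongoDb"),
    ("DATA_COLLECTION", "stream_data"),
    ("TRANSACTION_ROW_LIMIT", "50000"),
])
print(opts["TRANSACTION_ROW_LIMIT"], opts["TRANSACTION_ROWTIME_LIMIT"])  # 50000 60000
```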
This plugin supports "upserting" into MongoDB: if a matching document does not exist, it is inserted; otherwise, it is replaced.
To upsert into MongoDB you must set the UPSERT_FIND_QUERY_COLUMN_NAMES option, providing a comma-separated list of JSON paths. Each multi-level path is dot-separated.
Example | UPSERT_FIND_QUERY_COLUMN_NAMES value |
---|---|
Example with only root fields | 'clientOS,clientIPAddress,eth0' |
Example including a nested field | 'clientOS,clientIPAddress,eth0.inet.inetValue' |
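For example, consider the following incoming document, as formatted by the JSON formatter. With the nested-field setting above, the upsert matches on clientOS, clientIPAddress, and the nested eth0.inet.inetValue field: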
{
"clientOS": "Ubuntu",
"playerVersion": "3.0.16",
"sessionId": "sess_84266fdbd31d4c2c6d0665f7e8380fa3",
"clientIPAddress": "192.168.43.5",
"KPI": 78639,
"eth0":{
"netmask" : "255.255.240.0",
"inet" :{
"inetValue": "192.168.143.115"
}
}
}
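To make the matching concrete, the following Python sketch resolves each dot-separated path against the document above and builds the corresponding filter. In a MongoDB query, a key such as "eth0.inet.inetValue" uses dot notation to match the nested field. The in-memory `upsert` function only models the insert-or-replace behavior; the function names are hypothetical, the real adapter issues the upsert through MongoDB itself, and every listed path is assumed to exist in the incoming document.

```python
from typing import Any, Dict, List


def resolve(doc: Dict[str, Any], path: str) -> Any:
    """Follow a dot-separated path such as 'eth0.inet.inetValue' into nested fields."""
    value: Any = doc
    for part in path.split("."):
        value = value[part]  # raises KeyError/TypeError if the path is absent
    return value


def find_filter(doc: Dict[str, Any], column_names: str) -> Dict[str, Any]:
    """Build a filter keyed by the (possibly dotted) paths, as in MongoDB dot notation."""
    return {path: resolve(doc, path) for path in column_names.split(",")}


def upsert(collection: List[Dict[str, Any]], doc: Dict[str, Any], column_names: str) -> str:
    """Replace the first document matching the filter, or insert doc if none matches."""
    flt = find_filter(doc, column_names)
    for i, existing in enumerate(collection):
        try:
            matches = all(resolve(existing, path) == value for path, value in flt.items())
        except (KeyError, TypeError):
            matches = False  # the existing document lacks one of the paths
        if matches:
            collection[i] = doc
            return "replaced"
    collection.append(doc)
    return "inserted"


incoming = {
    "clientOS": "Ubuntu",
    "playerVersion": "3.0.16",
    "sessionId": "sess_84266fdbd31d4c2c6d0665f7e8380fa3",
    "clientIPAddress": "192.168.43.5",
    "KPI": 78639,
    "eth0": {"netmask": "255.255.240.0", "inet": {"inetValue": "192.168.143.115"}},
}
keys = "clientOS,clientIPAddress,eth0.inet.inetValue"
print(find_filter(incoming, keys))
# {'clientOS': 'Ubuntu', 'clientIPAddress': '192.168.43.5', 'eth0.inet.inetValue': '192.168.143.115'}

collection: List[Dict[str, Any]] = []
print(upsert(collection, incoming, keys))                # inserted
print(upsert(collection, {**incoming, "KPI": 1}, keys))  # replaced
print(len(collection), collection[0]["KPI"])             # 1 1
```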
To find out more about upserts in MongoDB, see Bulk.find.upsert() in the MongoDB documentation.
To secure messages in transit with Transport Layer Security (TLS), you can supply the options used to set up the SSLContext in the properties file specified by CONFIG_PROPERTIES_FILE_PATH. With a MongoDB server, you can set up one-way SSL (where only the MongoDB server provides a certificate) or two-way SSL (where both the MongoDB server and the client application provide a certificate). The properties file supports the following properties:
Option | Definition | Default |
---|---|---|
APP_USER | User name that the application uses to connect to MongoDB. | |
APP_PASSWORD | Password of the application user encoded in Base64 format. | |
MONGO_TRUST_STORE_PATH | Path to the client truststore. | |
MONGO_TRUST_STORE_PASSWORD | Truststore password encoded in Base64 format. | |
MONGO_KEY_STORE_PATH | Path to the client keystore. | |
MONGO_KEY_STORE_PASSWORD | Keystore password encoded in Base64 format. | |
AUTH_SOURCE_DB | Name of the database that holds the user's credentials (the authentication database). | admin |
AUTH_MECHANISM | Authentication mechanism that the driver and server use to confirm the client's identity and establish trust before connecting. | SCRAM-SHA-1 |
SSL_ENABLED | Whether the connection to MongoDB is secured with SSL/TLS. | False |
TWO_WAY_SSL_ENABLED | Whether the MongoDB connection uses two-way SSL. | False |
INVALID_HOSTNAME_ALLOWED | Whether to allow invalid host names, that is, host names that do not match the server's certificate. | False |
If a username and password are defined in the properties file, they override any username and password given in the URL. If you don't want passwords to appear in the SQL, provide the username and password in the properties file instead of in the URL option. The properties file is the one that the CONFIG_PROPERTIES_FILE_PATH option points to, such as /tmp/mongo.properties in the sample foreign stream.
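The following Python sketch renders a mongo.properties file from the properties listed above, Base64-encoding the passwords as the table requires. The helper names, file paths, and credentials are hypothetical placeholders, and writing booleans as lowercase "true" is an assumption. A client keystore is added only for two-way SSL.

```python
import base64
from typing import Optional


def b64(secret: str) -> str:
    """Base64-encode a secret, as the table requires for password properties."""
    return base64.b64encode(secret.encode("utf-8")).decode("ascii")


def mongo_properties(user: str, password: str, truststore: str, truststore_password: str,
                     keystore: Optional[str] = None,
                     keystore_password: Optional[str] = None) -> str:
    """Render key=value lines for one-way SSL, or two-way SSL if a keystore is given."""
    props = {
        "APP_USER": user,
        "APP_PASSWORD": b64(password),
        "SSL_ENABLED": "true",
        "MONGO_TRUST_STORE_PATH": truststore,
        "MONGO_TRUST_STORE_PASSWORD": b64(truststore_password),
        "AUTH_SOURCE_DB": "admin",        # documented default
        "AUTH_MECHANISM": "SCRAM-SHA-1",  # documented default
    }
    if keystore is not None:
        # Two-way SSL: the client also presents a certificate from its keystore.
        props["TWO_WAY_SSL_ENABLED"] = "true"
        props["MONGO_KEY_STORE_PATH"] = keystore
        props["MONGO_KEY_STORE_PASSWORD"] = b64(keystore_password or "")
    return "".join(f"{key}={value}\n" for key, value in props.items())


text = mongo_properties("appuser", "secret", "/path/to/truststore.jks", "changeit")
print(text)
```

To use it, write the returned text to the file that CONFIG_PROPERTIES_FILE_PATH names and keep that file readable only by the s-Server user, since Base64 is an encoding, not encryption.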
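The MongoDB adapter also supports at-least-once semantics through watermarking. The following view uses sys_boot.mgmt.mongo_watermark_string() to read the watermark stored in the watermark collection and expose it as STARTING_POSITION: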
CREATE OR REPLACE VIEW "DI"."START_POSITION_QUERY" AS
SELECT * FROM (
VALUES(
sys_boot.mgmt.mongo_watermark_string(
'mongoDb',
'aa1_collection2',
'{{.Values.mongo.PUMP_NAME}}',
'0:2',
2,
'localhost:27017',
'/tmp/mongo.properties'
)
)
) AS options("STARTING_POSITION");
If you need only a single timestamp watermark, use sys_boot.mgmt.min_watermark_timestamp(), which fetches the minimum timestamp among the watermarks available for each partition. The following example differs from the one above only in that it uses min_watermark_timestamp() rather than mongo_watermark_string():
CREATE OR REPLACE VIEW "DI"."START_POSITION_QUERY" AS
SELECT * FROM (
VALUES(
sys_boot.mgmt.min_watermark_timestamp(
'mongoDb',
'aa1_collection2',
'{{.Values.mongo.PUMP_NAME}}',
'0:2',
2,
'localhost:27017',
'/tmp/mongo.properties'
)
)
) AS options("STARTING_POSITION");
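Why the minimum? The following Python sketch illustrates the reasoning with hypothetical per-partition watermarks (partition number mapped to epoch milliseconds); the data layout is an assumption, not the format stored in the watermark collection. Assuming rows within each partition arrive in timestamp order, restarting every partition from the smallest watermark may replay rows that were already written, which is why the guarantee is at-least-once, but it never skips rows that were not yet committed.

```python
from typing import Dict


def min_watermark(watermarks: Dict[int, int]) -> int:
    """Return the smallest committed timestamp across all partitions."""
    if not watermarks:
        raise ValueError("no watermarks stored")
    return min(watermarks.values())


# Hypothetical per-partition watermarks (partition -> epoch milliseconds).
stored = {0: 1_700_000_050_000, 1: 1_700_000_020_000, 2: 1_700_000_090_000}
print(min_watermark(stored))  # 1700000020000
```

Restarting from this point replays the rows of partitions 0 and 2 committed after 1700000020000; an upsert key (UPSERT_FIND_QUERY_COLUMN_NAMES) can absorb such duplicates by replacing rather than re-inserting documents.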