Integrating MongoDB

The MongoDB ECDA adapter writes batches of data to a MongoDB collection. Currently, the adapter supports only the JSON formatter. To write to a MongoDB collection, you must first create:

  1. A MongoDB server object (s-Server provides a prebuilt one named MONGO_SERVER)
  2. A foreign stream that defines the database and the specific collection to which you are writing

The minimum options required to write to MongoDB are a fully qualified URL, the DB_NAME, and the DATA_COLLECTION. You then create a foreign stream and INSERT INTO it to write data to the MongoDB collection.
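As a minimal sketch, a foreign stream that sets only the required options (plus the JSON formatter) might look like the following. The schema, stream, and column names (MongoMinimalSchema, MinimalWriterStream, "id", "payload") and the connection values are hypothetical placeholders; substitute your own.

```sql
-- Hypothetical schema and stream names; only the required options are set.
CREATE OR REPLACE SCHEMA MongoMinimalSchema;

CREATE OR REPLACE FOREIGN STREAM MongoMinimalSchema.MinimalWriterStream
(
    "id" INTEGER,
    "payload" VARCHAR(128)
)
SERVER MONGO_SERVER
OPTIONS (
    -- The adapter currently supports only the JSON formatter
    "FORMATTER" 'JSON',
    -- Required: where to connect, and which database and collection to write to
    "URL" 'mongodb://localhost:27017/',
    "DB_NAME" 'mongoDb',
    "DATA_COLLECTION" 'stream_data'
);
```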

The MongoDB sink plugin supports exactly-once semantics by combining MongoDB transactions with watermarking. Transactions are committed according to TRANSACTION_ROWTIME_LIMIT, TRANSACTION_ROW_LIMIT, and, if enabled, PREEMPTIVE_TIME_TO_COMMIT_IN_MS. These properties take precedence in the order listed.

Preemptive commit is based on the clock time at which the last input record arrived: if no new record arrives within PREEMPTIVE_TIME_TO_COMMIT_IN_MS, the plugin commits the pending data to MongoDB. To turn preemptive commit on or off, set ENABLE_PREEMPTIVE_COMMIT to 'true' or 'false'.
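For example, the sketch below enables transactions and preemptive commit. The schema, stream, and column names are hypothetical, and the limit values are illustrative rather than recommendations. WTMK_COLLECTION and COMMIT_METADATA_COLUMN_NAME are included because exactly-once delivery relies on watermarking as well as on transactions.

```sql
-- Hypothetical schema and stream; option names are those documented for the sink.
CREATE OR REPLACE SCHEMA MongoTxnSchema;

CREATE OR REPLACE FOREIGN STREAM MongoTxnSchema.TxnWriterStream
(
    "sessionId" VARCHAR(64),
    "KPI" INTEGER
)
SERVER MONGO_SERVER
OPTIONS (
    "FORMATTER" 'JSON',
    "URL" 'mongodb://localhost:27017/',
    "DB_NAME" 'mongoDb',
    "DATA_COLLECTION" 'stream_data',
    -- Watermarks are stored here; the watermark value comes from "sessionId"
    "WTMK_COLLECTION" 'aa1_collection2',
    "COMMIT_METADATA_COLUMN_NAME" 'sessionId',
    -- Write in transactions (the default is false)
    "ENABLE_TRANSACTIONS" 'true',
    -- Batch rows whose ROWTIME values fall within a 10-second range
    "TRANSACTION_ROWTIME_LIMIT" '10000',
    -- Batch at most 5,000 rows per transaction
    "TRANSACTION_ROW_LIMIT" '5000',
    -- Commit pending rows if no input arrives for 15 seconds
    "ENABLE_PREEMPTIVE_COMMIT" 'true',
    "PREEMPTIVE_TIME_TO_COMMIT_IN_MS" '15000'
);
```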

For more information on MongoDB URLs, see https://docs.mongodb.com/manual/reference/connection-string/.

Design of MongoDB Sink

Writing to MongoDB Using SQL

To write to MongoDB, you need to create a foreign stream in SQL that references a prebuilt server object called MONGO_SERVER. In the foreign stream's options, you configure how s-Server connects to MongoDB. For more information on creating foreign streams, see the topic CREATE FOREIGN STREAM in the Streaming SQL Reference Guide.

You also need to specify a formatter for the foreign stream. Setting the FORMATTER option tells s-Server that this foreign stream writes data. See Output Formats for Writing in this guide for more details.

Streams, like most SQL objects (but unlike data wrappers and servers), must be created within a schema. The following code first creates a schema in which to run the rest of the sample code below, then creates a foreign stream named MongoDBWriterSchema.MongoDBWriterStream.

CREATE OR REPLACE SCHEMA MongoDBWriterSchema;

CREATE OR REPLACE FOREIGN STREAM MongoDBWriterSchema.MongoDBWriterStream
(
    "clientOS" VARCHAR(16),
    "playerVersion" VARCHAR(32),
    "sessionId" VARCHAR(64),
    "clientIPAddress" VARCHAR(32),
    "KPI" INTEGER,
    "netmask" VARCHAR(32),
    "inetValue" VARCHAR(32)
)
SERVER MONGO_SERVER
OPTIONS (
    -- Connection, database, and target collections
    "FORMATTER" 'JSON',
    "URL" 'mongodb://user:password@localhost:27017/',
    "DB_NAME" 'mongoDb',
    "DATA_COLLECTION" 'stream_data',
    "WTMK_COLLECTION" 'aa1_collection2',
    -- Connection pool settings
    "MAX_CONN_POOL_SIZE" '50',
    "MIN_CONN_POOL_SIZE" '2',
    "MAX_CONN_POOL_WAIT_TIME_SEC" '60',
    "MAX_CONN_POOL_IDLE_TIME_SEC" '60',
    -- Transactions and commit policy (set ENABLE_TRANSACTIONS only once)
    "ENABLE_TRANSACTIONS" 'true',
    "TRANSACTION_ROWTIME_LIMIT" '30000',
    "TRANSACTION_ROW_LIMIT" '50000',
    "ENABLE_PREEMPTIVE_COMMIT" 'true',
    "PREEMPTIVE_TIME_TO_COMMIT_IN_MS" '70000',
    -- Upsert keys, security properties file, and watermark column
    "UPSERT_FIND_QUERY_COLUMN_NAMES" 'clientOS,clientIPAddress,eth0.inet.inetValue',
    "CONFIG_PROPERTIES_FILE_PATH" '/tmp/mongo.properties',
    "COMMIT_METADATA_COLUMN_NAME" 'sessionId'
);

To begin writing data to MongoDB, you INSERT into MongoDBWriterSchema.MongoDBWriterStream. When MongoDBWriterSchema.MongoDBWriterStream receives rows, s-Server writes data to the MongoDB you have configured in the foreign stream options.

In most cases, you will want to set up a pump that writes data to MongoDBWriterSchema.MongoDBWriterStream. Pumps are INSERT macros that continually pass data from one point in a streaming pipeline to another. See the topic CREATE PUMP in the s-Server Streaming SQL Reference Guide for more details.

You can create such a pump with code along the following lines:

CREATE OR REPLACE SCHEMA "Pumps";
SET SCHEMA '"Pumps"';

-- We recommend creating pumps as STOPPED and then starting them with
-- ALTER PUMP "Pumps"."writerPump" START
CREATE OR REPLACE PUMP "writerPump" STOPPED AS
INSERT INTO MongoDBWriterSchema.MongoDBWriterStream
-- "MyStream" must be an existing stream whose columns match the foreign stream
SELECT STREAM * FROM "MyStream";

To start writing data, use the following code:

ALTER PUMP "Pumps"."writerPump" START;
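If the source stream's columns do not line up one-to-one with the foreign stream, you can name the target columns explicitly instead of using SELECT *. In this sketch, the pump name "kpiWriterPump" and the source columns "os", "sid", and "kpi" of "MyStream" are hypothetical.

```sql
-- Hypothetical source stream "Pumps"."MyStream" with columns "os", "sid", "kpi".
CREATE OR REPLACE PUMP "Pumps"."kpiWriterPump" STOPPED AS
INSERT INTO MongoDBWriterSchema.MongoDBWriterStream
    ("clientOS", "sessionId", "KPI")
SELECT STREAM "os", "sid", "kpi"
FROM "Pumps"."MyStream";

-- Start the pump once the source and target streams are in place
ALTER PUMP "Pumps"."kpiWriterPump" START;
```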

Options for Writing to a MongoDB Collection

Option | Definition | Required | Default
URL | Fully qualified URL that includes, at minimum, a host name (or IP address or UNIX domain socket). The URL can also include a username and password (passed to the MongoDB instance), a port number, and MongoDB Connection String Options. | Yes |
DB_NAME | MongoDB database that contains the collections. | Yes |
DATA_COLLECTION | MongoDB collection to which data is written. | Yes |
WTMK_COLLECTION | MongoDB collection to which watermarks are written. | No |
COMMIT_METADATA_COLUMN_NAME | Name of the column that contains the watermark value. | No |
CONFIG_PROPERTIES_FILE_PATH | Path to a properties file that sets Java system properties for the connection, such as keystore and truststore paths and passwords. See Security in MongoDB. | No |
MAX_CONN_POOL_IDLE_TIME_SEC | Maximum time, in seconds, that a pooled connection can remain idle. A value of 0 means no limit. | No | 60
MAX_CONN_POOL_SIZE | Maximum number of connections allowed. Idle connections are kept in the pool. | No | 50
MIN_CONN_POOL_SIZE | Minimum number of connections. Idle connections are kept in the pool, and the pool always contains at least this many connections. | No | 2
MAX_CONN_POOL_WAIT_TIME_SEC | Maximum time, in seconds, that a thread can wait for a connection to become available. | No | 60
ENABLE_TRANSACTIONS | Whether to write data in transactions, which are needed to achieve exactly-once semantics. | No | false
TRANSACTION_ROWTIME_LIMIT | Range in milliseconds. All rows received from the input query whose ROWTIME values fall within this range are committed to the MongoDB server in a single batch (transaction). | No | 60000
TRANSACTION_ROW_LIMIT | Number of rows to batch before committing. | No | 20000
ENABLE_PREEMPTIVE_COMMIT | Whether to commit pending data preemptively when no new records arrive. | No | false
PREEMPTIVE_TIME_TO_COMMIT_IN_MS | Maximum time, in milliseconds, to wait after the last input record before committing to the MongoDB server. | No | 60000
OPTIONS_QUERY | Lets you query a table to update adapter options at runtime. For example, you can set the URL option from a table with select myUrl as URL from TEST.mongo_options. For details, see the topic Using the Options Query Property. | No |
UPSERT_FIND_QUERY_COLUMN_NAMES | Comma-separated list of JSON field references that MongoDB uses to match documents for upserts (see Upserting Documents in MongoDB). Each reference is either the name of a root element or a dot-separated path to a nested element. | No |
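As a sketch of OPTIONS_QUERY, the stream below takes its URL from the table TEST.mongo_options cited above. It assumes that table already exists and has a column named myUrl. The stream name is hypothetical, and the URL in OPTIONS is a placeholder on the assumption that the value returned by the query replaces it at runtime.

```sql
-- Assumes TEST.mongo_options exists and has a column named myUrl.
CREATE OR REPLACE FOREIGN STREAM MongoDBWriterSchema.OptionsQueryStream
(
    "sessionId" VARCHAR(64),
    "KPI" INTEGER
)
SERVER MONGO_SERVER
OPTIONS (
    "FORMATTER" 'JSON',
    -- Placeholder; assumed to be replaced by the URL value from OPTIONS_QUERY
    "URL" 'mongodb://localhost:27017/',
    "DB_NAME" 'mongoDb',
    "DATA_COLLECTION" 'stream_data',
    -- The column alias (URL) names the option that the query result sets
    "OPTIONS_QUERY" 'select myUrl as URL from TEST.mongo_options'
);
```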

Upserting Documents in MongoDB

This plugin supports "upserting" into MongoDB: if a matching document does not exist, the plugin inserts it; otherwise, it replaces the existing document.

To upsert into MongoDB, set the UPSERT_FIND_QUERY_COLUMN_NAMES option to a comma-separated list of JSON field paths. For a nested field, separate the levels of the path with dots.

Example | UPSERT_FIND_QUERY_COLUMN_NAMES value
Root fields only | 'clientOS,clientIPAddress,eth0'
Including a nested field | 'clientOS,clientIPAddress,eth0.inet.inetValue'

Consider an incoming document like the following, as produced by the JSON formatter:

{
    "clientOS": "Ubuntu",
    "playerVersion": "3.0.16",
    "sessionId": "sess_84266fdbd31d4c2c6d0665f7e8380fa3",
    "clientIPAddress": "192.168.43.5",
    "KPI": 78639,
    "eth0": {
        "netmask": "255.255.240.0",
        "inet": {
            "inetValue": "192.168.143.115"
        }
    }
}
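  • The sink plugin looks in the MongoDB collection for stored documents whose values for the specified fields match those of the input document. With 'clientOS,clientIPAddress,eth0.inet.inetValue', for example, a stored document matches only if its clientOS is "Ubuntu", its clientIPAddress is "192.168.43.5", and its eth0.inet.inetValue is "192.168.143.115".
    • If no document matches, the plugin inserts the input document into the collection.
    • If one or more documents match, the plugin replaces the first one it finds.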
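As a simpler sketch that matches on a single root field, the stream below replaces a stored document that has the same sessionId as the incoming row and inserts the row otherwise. The stream name is hypothetical; the other values follow the earlier example.

```sql
-- Hypothetical stream that upserts on the root-level "sessionId" field.
CREATE OR REPLACE FOREIGN STREAM MongoDBWriterSchema.SessionUpsertStream
(
    "clientOS" VARCHAR(16),
    "sessionId" VARCHAR(64),
    "clientIPAddress" VARCHAR(32),
    "KPI" INTEGER
)
SERVER MONGO_SERVER
OPTIONS (
    "FORMATTER" 'JSON',
    "URL" 'mongodb://localhost:27017/',
    "DB_NAME" 'mongoDb',
    "DATA_COLLECTION" 'stream_data',
    -- Match stored documents by sessionId: replace the first match, else insert
    "UPSERT_FIND_QUERY_COLUMN_NAMES" 'sessionId'
);
```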

To find out more about upserts in MongoDB, see Bulk.find.upsert() in the MongoDB documentation.

Security in MongoDB

To secure data in transit with Transport Layer Security (TLS), you can supply the settings used to build the SSLContext for the MongoDB connection. MongoDB supports one-way SSL (only the MongoDB server presents a certificate) and two-way SSL (both the MongoDB server and the client application present certificates). You set these options in the properties file named by CONFIG_PROPERTIES_FILE_PATH, which supports the following properties:

Security Options for Connecting to MongoDB Collection

Option | Definition | Default
APP_USER | User name that the application uses to connect to MongoDB. |
APP_PASSWORD | Password of the application user, encoded in Base64. |
MONGO_TRUST_STORE_PATH | Path to the client truststore. |
MONGO_TRUST_STORE_PASSWORD | Truststore password, encoded in Base64. |
MONGO_KEY_STORE_PATH | Path to the client keystore. |
MONGO_KEY_STORE_PASSWORD | Keystore password, encoded in Base64. |
AUTH_SOURCE_DB | Database against which the user's credentials are authenticated (MongoDB's authSource). | admin
AUTH_MECHANISM | Authentication mechanism that the driver and server use to confirm the client's identity before the connection is established. | SCRAM-SHA-1
SSL_ENABLED | Whether the connection to MongoDB is secured with SSL/TLS. | false
TWO_WAY_SSL_ENABLED | Whether the connection to MongoDB uses two-way SSL. | false
INVALID_HOSTNAME_ALLOWED | Whether invalid host names are allowed. | false

If a username and password are defined in the properties file, they override any credentials given in the URL. To keep passwords out of your SQL, leave the credentials out of the URL option, supply them as APP_USER and APP_PASSWORD in the properties file, and point CONFIG_PROPERTIES_FILE_PATH at that file (for example, /tmp/mongo.properties).
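A sketch of that arrangement: the URL below carries only the host and port, and the credentials and any TLS settings are expected to come from the properties file. The stream name is hypothetical, and /tmp/mongo.properties is a placeholder path.

```sql
-- No credentials in the URL; they are read from the properties file instead.
CREATE OR REPLACE FOREIGN STREAM MongoDBWriterSchema.SecureWriterStream
(
    "sessionId" VARCHAR(64),
    "KPI" INTEGER
)
SERVER MONGO_SERVER
OPTIONS (
    "FORMATTER" 'JSON',
    "URL" 'mongodb://localhost:27017/',
    "DB_NAME" 'mongoDb',
    "DATA_COLLECTION" 'stream_data',
    -- File that sets APP_USER, APP_PASSWORD (Base64), SSL_ENABLED, truststore, etc.
    "CONFIG_PROPERTIES_FILE_PATH" '/tmp/mongo.properties'
);
```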

Watermarking with MongoDB

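The MongoDB adapter supports "at least once" semantics through watermarking. The following view calls mongo_watermark_string() to read the watermarks stored in MongoDB and return them as a STARTING_POSITION value. In this example, the arguments 'mongoDb' and 'aa1_collection2' match the DB_NAME and WTMK_COLLECTION values of the foreign stream shown earlier, and '/tmp/mongo.properties' matches its CONFIG_PROPERTIES_FILE_PATH.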

CREATE OR REPLACE VIEW "DI"."START_POSITION_QUERY" AS
    SELECT * FROM (
        VALUES(
            sys_boot.mgmt.mongo_watermark_string(
                'mongoDb', 
                'aa1_collection2', 
                '{{.Values.mongo.PUMP_NAME}}',
                '0:2',
                2,
                'localhost:27017',
                '/tmp/mongo.properties'
                )
            )
        ) AS options("STARTING_POSITION");

If you need only a single timestamp watermark, you can fetch the minimum timestamp among the watermarks available for the partitions. The following example differs from the previous one only in that it calls min_watermark_timestamp() instead of mongo_watermark_string():

CREATE OR REPLACE VIEW "DI"."START_POSITION_QUERY" AS 
    SELECT * FROM (
        VALUES(
            sys_boot.mgmt.min_watermark_timestamp(
                'mongoDb', 
                'aa1_collection2', 
                '{{.Values.mongo.PUMP_NAME}}',
                '0:2',
                2,
                'localhost:27017',
                '/tmp/mongo.properties'
                )
            )
        ) AS options("STARTING_POSITION");
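Both views are named "DI"."START_POSITION_QUERY", so creating the second replaces the first. Each returns a single row with one column, STARTING_POSITION. To check the value that would be used, you can query the view directly. This sketch assumes the "DI" schema exists and that the {{.Values.mongo.PUMP_NAME}} template placeholder has been replaced with the actual pump name.

```sql
-- Inspect the starting position computed from the stored watermarks.
SELECT "STARTING_POSITION"
FROM "DI"."START_POSITION_QUERY";
```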