Using s-Server, you can read from and write to IBM MQ. To do so, you configure and launch the adapter in SQL, using foreign stream options.
Using IBM MQ, you can achieve exactly-once semantics. For more detail, see the topic Exactly once in IBM MQ in this guide.
The IBM MQ Adapter is based on the IBM MQ allclient 9.2.4.0 and uses the JMS 2.0.1 API.
s-Server's IBM MQ Source plugin instantiates a JMS consumer, which consumes messages either from a queue (point-to-point) or from a topic using a durable subscriber (pub/sub), and can be configured using SQL options. The adapter reads from a queue or topic that belongs to a particular queue manager running on the given host. Data can be ingested using any supported parser, such as CSV, Avro, XML, or JSON; you specify the parser as part of the configuration options. See Reading from IBM MQ Using SQL below.
The IBM MQ Source plugin supports exactly-once semantics using a persistent volume.
To prevent message loss, the plugin writes the messages read from an MQ queue or topic to a file in the following format:
<JMS_Message_ID_Of_first_message>,<count_of_messages_in_file>
Only when the file has been created and saved to the persistent volume does the plugin acknowledge the messages and remove them from IBM MQ's queue or durable topic subscription. A file thread then reads the files created and sends them to the SQLstream pipeline for processing.
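The read-and-acknowledge sequence above can be sketched as follows. This is an illustrative Python model of the plugin's behavior, not its implementation; the `drain_batch` helper and the use of the `<message-ID>,<count>` string as a file name are assumptions made for the sketch.

```python
import os

def drain_batch(messages, batch_dir, acknowledge):
    """Persist a batch of MQ messages to a file, then acknowledge them.

    Each message is modeled as a (jms_message_id, payload) tuple. The
    file is keyed by the JMS message ID of the first message plus the
    message count, mirroring the
    <JMS_Message_ID_Of_first_message>,<count_of_messages_in_file> format.
    """
    if not messages:
        return None
    first_id, _ = messages[0]
    path = os.path.join(batch_dir, f"{first_id},{len(messages)}")
    # Write every payload to the persistent volume first...
    with open(path, "w", encoding="utf-8") as f:
        for _, payload in messages:
            f.write(payload + "\n")
    # ...and only then acknowledge, so that if the process dies before
    # the file is saved, MQ redelivers the unacknowledged messages.
    for msg_id, _ in messages:
        acknowledge(msg_id)
    return path
```

Because acknowledgement happens strictly after the file is durable, a crash between the two steps causes redelivery rather than loss.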
To read from IBM MQ, you need to create a foreign stream in SQL that references a prebuilt server object called IBMMQ_SERVER. The foreign stream's definition contains connection information for the IBM MQ server. You can also define provenance columns that let you pass metadata of the files created into s-Server. See Using Provenance Columns below.
You will also need to specify a parser for the foreign stream, such as 'CSV'. Specifying "PARSER" as a foreign stream option tells s-Server that this foreign stream reads data.
CREATE OR REPLACE FOREIGN STREAM "DI"."data_edrflow_fs"
(
"test" VARCHAR(32),
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
"SQLSTREAM_PROV_ROW_PAYLOAD" VARCHAR(4096) NOT NULL,
"SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
"SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT
)
SERVER "IBMMQ_SERVER"
OPTIONS (
"QUEUE_MANAGER" 'QM1',
"SERVER_HOST" 'localhost',
"SERVER_PORT" '1414',
"APPLICATION_USER" 'app',
"APPLICATION_PASSWORD" 'cGFzc3cwcmQ=',
"QUEUE" 'DEV.QUEUE.1',
"SERVER_CHANNEL" 'DEV.APP.SVRCONN',
"PARSER" 'CSV',
"CHARACTER_ENCODING" 'UTF-8',
"TRANSACTION_ROW_LIMIT" '10000',
"TRANSACTION_ROWTIME_LIMIT" '20000',
"INGRESS_TEMP_FILE_RETENTION_IN_MIN" '1',
"INGRESS_TEMP_FILE_PATH" '/tmp',
"CONFIG_FILE_PATH" '/home/sqlstream/keystore/ssl.properties',
"CIPHER_SUITE" 'TLS_RSA_WITH_AES_128_CBC_SHA256',
"WATERMARK_QUERY" 'SELECT * FROM "DI".START_POSITION_QUERY'
);
When reading from IBM MQ, you can declare provenance columns. These return metadata for the file created by the MQ foreign stream from which you are reading.
These are as follows:
Option | Data type | Description |
---|---|---|
SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER | BIGINT | Adds the current line number to the output columns. Line number starts from 0. |
SQLSTREAM_PROV_FILE_MODIFIED_TIME | BIGINT | Adds the modified time of the file being read to the output columns. |
SQLSTREAM_PROV_FILE_SOURCE_FILE | VARCHAR | Adds the name of the file being read to the output columns. |
SQLSTREAM_PROV_ROW_PAYLOAD | VARCHAR | Adds the complete line present in the file to the output columns. |
The foreign stream supports the following options when reading from IBM MQ:
Option | Description | Required | Default |
---|---|---|---|
QUEUE_MANAGER | Name of the IBM MQ queue manager. | True | |
SERVER_HOST | Hostname of the IBM MQ Server | True | |
SERVER_PORT | Listener Port of the IBM MQ Server | True | |
SERVER_CHANNEL | Channel name used by the queue with data. | True | |
APPLICATION_USER | User name that application uses to connect to MQ | False | |
APPLICATION_PASSWORD | Password of the application user encoded in base64 format. | False | |
QUEUE | Name of the queue from which you are receiving data. Must specify either QUEUE or TOPIC but not both. | False | |
TOPIC | Name of the IBM MQ topic from which you are receiving data. Must specify either QUEUE or TOPIC but not both. | False | |
TRANSACTION_ROW_LIMIT | Commit batch size for the files created from the messages. | False | 2000 |
TRANSACTION_ROWTIME_LIMIT | Commit time interval in ms after which the batch is created and acknowledged if the message count has not reached the batch size. | False | 20000 |
WATERMARK_QUERY | Watermark query that returns a string in the MQ watermark format from the sink. | False | |
CHARACTER_ENCODING | Character encoding of the data | False | UTF-8 |
INGRESS_TEMP_FILE_PATH | Persistent volume path where the files are created. | False | /tmp |
INGRESS_TEMP_FILE_RETENTION_IN_MIN | Retention (in minutes) of files in the persistent path when no watermark query is provided. | False | 1 |
INGRESS_TEMP_FILE_LIMIT | Maximum number of temp files allowed in the persistent path. | False | 100 |
CIPHER_SUITE | MQ Supported Cipher Spec corresponding to the channel used. | False | TLS_RSA_WITH_AES_128_CBC_SHA256 |
CONFIG_FILE_PATH | Path to a properties file supplying Java system properties for the keystore and truststore paths and passwords. | False | |
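APPLICATION_PASSWORD (and the password fields in the TLS properties file described later) are supplied base64-encoded. A value can be produced with, for example, Python's standard library; the sample value used throughout these examples decodes to `passw0rd`:

```python
import base64

# Encode a plain-text password for use in APPLICATION_PASSWORD.
encoded = base64.b64encode(b"passw0rd").decode("ascii")
print(encoded)  # cGFzc3cwcmQ= (the value used in the examples)

# Decoding works the same way in reverse.
assert base64.b64decode(encoded) == b"passw0rd"
```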
The IBM MQ adapter writes batches of data to an IBM MQ queue, or publishes batches of data to an MQ topic. To write to an MQ queue or topic, you must first define a server object for the IBM MQ server with information on its host, port, queue name or topic name, channel, and queue manager. s-Server can write data to IBM MQ formatted using any of the supported formatters, such as CSV, JSON, XML, or Avro.
s-Server's IBM MQ Sink plugin instantiates a JMS producer, which sends messages to a queue (point-to-point) or publishes to a topic (pub/sub) in batches. You configure and launch the adapter in SQL using a foreign stream.
The sink plugin uses session transactions, which allow exactly-once semantics between s-Server and IBM MQ.
When writing data to IBM MQ, you use one topic or queue for the data and, if you need exactly-once semantics, one queue for storing the watermark. If you do not provide a watermark queue, the adapter provides at-most-once semantics. A batch of messages is written when either the batch-size or the time condition is reached, whichever comes first. The adapter uses session transactions in the JMS client to produce a batch of messages, delete the old watermark from the watermark queue, and add the new watermark to the watermark queue; JMS transactions make these steps atomic.
SQLSTREAM_POSITION_KEY or ROWTIME is used as the watermark column. If SQLSTREAM_POSITION_KEY is present in the sink foreign stream, it is used as the watermark column; otherwise the ROWTIME column is used as the default watermark column from which the watermark is fetched.
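The transactional write described above can be sketched as follows. This is an illustrative Python model, with an invented `FakeTransactedSession` standing in for a transacted JMS session; the real plugin uses the IBM MQ JMS client, not this code.

```python
class FakeTransactedSession:
    """Minimal stand-in for a transacted JMS session (illustrative).

    Operations are buffered and take effect only on commit();
    rollback() discards all of them together.
    """
    def __init__(self):
        self.queues = {}       # committed state: queue name -> messages
        self._pending = []     # buffered operations until commit()

    def send(self, queue, body):
        self._pending.append(("send", queue, body))

    def delete_messages(self, queue):
        self._pending.append(("clear", queue, None))

    def commit(self):
        for op, queue, body in self._pending:
            q = self.queues.setdefault(queue, [])
            if op == "send":
                q.append(body)
            else:
                q.clear()
        self._pending = []

    def rollback(self):
        self._pending = []


def write_batch(session, rows, data_queue, watermark_queue, position_key):
    """Send a batch and replace the watermark in one transaction."""
    try:
        for row in rows:
            session.send(data_queue, row["payload"])
        # Replace the watermark inside the same transaction as the data,
        # so a crash can never leave the two out of step.
        session.delete_messages(watermark_queue)
        session.send(watermark_queue, rows[-1][position_key])
        session.commit()
    except Exception:
        session.rollback()
        raise
```

Because the data messages and the watermark replacement commit (or roll back) as one unit, a replayed batch after a failure overwrites the watermark consistently rather than duplicating it.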
Like all streams, foreign streams must be defined within schemas. The following code first creates a schema then creates a foreign stream called ibm_output_fs with the predefined server IBMMQ_SERVER as a server option. To transfer data into IBM MQ using this stream, you will need to INSERT into it using a pump.
CREATE OR REPLACE FOREIGN STREAM "DI"."ibm_output_fs"
(
"test" VARCHAR(128),
"SQLSTREAM_POSITION_KEY" VARCHAR(512)
)
SERVER "IBMMQ_SERVER"
OPTIONS (
"FORMATTER" 'CSV',
"FORMATTER_INCLUDE_ROWTIME" 'false',
"SERVER_HOST" 'localhost',
"SERVER_PORT" '1414',
"SERVER_CHANNEL" 'DEV.APP.SVRCONN',
"QUEUE_MANAGER" 'QM1',
"APPLICATION_USER" 'app',
"APPLICATION_PASSWORD" 'cGFzc3cwcmQ=',
"QUEUE" 'DEV.QUEUE.1',
"WATERMARK_QUEUE_NAME" 'DEV.QUEUE.1',
"TRANSACTION_ROW_LIMIT" '5000',
"TRANSACTION_ROWTIME_LIMIT" '30000',
"ENABLE_IBM_MESSAGE_PERSISTENCE" 'false',
"CONFIG_FILE_PATH" '/home/sqlstream/keystore/ssl.properties',
"CIPHER_SUITE" 'TLS_RSA_WITH_AES_128_CBC_SHA256'
);
The foreign stream supports the following options when writing to IBM MQ:
Option | Description | Required | Default |
---|---|---|---|
QUEUE_MANAGER | Name of the IBM MQ queue manager. | True | |
SERVER_HOST | Hostname of the IBM MQ Server | True | |
SERVER_PORT | Listener Port of the IBM MQ Server | True | |
SERVER_CHANNEL | Channel name used by the queue where data needs to be written. | True | |
APPLICATION_USER | User name that application uses to connect to IBM MQ | False | |
APPLICATION_PASSWORD | Password of the application user encoded in base64 format. | False | |
QUEUE | Name of the queue to which you are sending data. Must specify either QUEUE or TOPIC but not both. | False | |
TOPIC | Name of the IBM MQ topic to which you are sending data. Must specify either QUEUE or TOPIC but not both. | False | |
WATERMARK_QUEUE_NAME | Queue name to where the watermark will be stored. | False | |
ASYNC_DISABLED | If set to true, disables asynchronous put for the producer. | False | |
TRANSACTION_ROW_LIMIT | Commit batch size for the messages produced. | False | 5000 |
TRANSACTION_ROWTIME_LIMIT | Commit time interval in ms after which the batch is committed. | False | 20000 |
SQLSTREAM_POSITION_KEY | Name of the column in the row that provides the watermark to the plugin. | False | |
ENABLE_IBM_MESSAGE_PERSISTENCE | Enables IBM MQ message persistence for the messages produced to the queue. | False | False |
CIPHER_SUITE | MQ Supported Cipher Spec corresponding to the channel used. | False | TLS_RSA_WITH_AES_128_CBC_SHA256 |
CONFIG_FILE_PATH | Path to a properties file supplying Java system properties for the keystore and truststore paths and passwords. | False | |
To secure messages in transit using Transport Layer Security (TLS) with MQ, provide the CIPHER_SUITE option together with CONFIG_FILE_PATH. The properties file at that path can configure anonymous authentication (where only the MQ server provides a certificate) or mutual authentication (where both the MQ server and your client application provide certificates), depending on whether a keystore is provided in the properties. The properties file supports the following properties.
Property | Description |
---|---|
trustStore | Path to the client truststore |
trustStorePassword | Truststore password encoded in base64 |
keyStore | Path to the client keystore |
keyStorePassword | Keystore password encoded in base64 |
username | User name that application uses to connect to MQ |
password | Password of the application user encoded in base64 format. |
The username and password options are overridden if they are also defined in the config properties. If you do not want passwords to appear in the SQL, you can supply the username and password in the config file instead of using the option parameters. The following is an example ssl.properties file (assuming the CONFIG_FILE_PATH option points to the ssl.properties file).
trustStore = /home/sqlstream/keystore/clientTruststore.p12
trustStorePassword = cGFzc3cwcmQ=
keyStore = /home/sqlstream/keystore/clientKeystore.p12
keyStorePassword = cGFzc3cwcmQ=
username = app
password = cGFzc3cwcmQ=
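A properties file like the one above can be checked with a short script. This is an illustrative Python sketch; the parsing here is a simplified reading of the Java properties format, and the property names are those from the table above.

```python
import base64

def load_properties(text):
    """Parse simple `key = value` lines into a dict, skipping blank
    lines and comments (a simplified reading of the Java properties
    format used by ssl.properties)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

example = """\
trustStore = /home/sqlstream/keystore/clientTruststore.p12
trustStorePassword = cGFzc3cwcmQ=
username = app
password = cGFzc3cwcmQ=
"""

props = load_properties(example)
# The password fields are stored base64-encoded, per the table above.
print(base64.b64decode(props["password"]).decode("utf-8"))  # passw0rd
```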
Exactly-once semantics can be achieved using the SQLstream IBM MQ plugin, provided the following conditions are satisfied in the foreign stream configuration:
We must define a persistent volume path with INGRESS_TEMP_FILE_PATH in the foreign stream; if the path provided is not on reliable storage, data can be lost on disk failure.
We must provide a WATERMARK_QUERY in the foreign stream. Without it, if the retention time is less than the processing time of the pipeline, the retention policy may delete the file where the messages are stored before it is sent to the pipeline.
The watermark provided through the watermark query must be in following format:
CAST("SQLSTREAM_PROV_FILE_MODIFIED_TIME" AS VARCHAR(150)) || '-' || CAST("SQLSTREAM_PROV_FILE_SOURCE_FILE" AS VARCHAR(256)) || '-' || CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(150)) as "SQLSTREAM_POSITION_KEY"
We must provide a watermark column. If SQLSTREAM_POSITION_KEY is present in the foreign stream, it is used as the watermark column; otherwise ROWTIME is used as the default watermark column where the watermark is stored.
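The CAST/concatenation expression above produces a simple '-'-separated string. As an illustrative Python equivalent of that SQL expression:

```python
def position_key(file_modified_time, source_file, line_number):
    """Build the SQLSTREAM_POSITION_KEY watermark string in the
    <modified-time>-<source-file>-<line-number> format, mirroring
    the CAST/|| concatenation in the SQL above."""
    return f"{file_modified_time}-{source_file}-{line_number}"

print(position_key(1700000000000, "batchfile", 42))  # 1700000000000-batchfile-42
```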
Below is an example pipeline that uses the IBM MQ source and sink with exactly-once semantics. Here the same host is used for both source and sink, but this is not necessary; you can read from and write to separate MQ servers if needed.
--
-- CREATE OR REPLACE VIEW "DI"."START_POSITION_QUERY" AS
-- SELECT * FROM (VALUES(sys_boot.mgmt.ibmmq_watermark_string_with_config('host',
-- port, 'channel','queue manager','watermark queue',
-- 'cipher','config path'))) AS options("STARTING_POSITION");
CREATE OR REPLACE VIEW "DI"."START_POSITION_QUERY" AS
SELECT * FROM (VALUES(sys_boot.mgmt.ibmmq_watermark_string_with_config(
    'localhost', 1414, 'DEV.APP1.SVRCONN', 'QM1', 'DEV.QUEUE.3',
    'TLS_RSA_WITH_AES_128_CBC_SHA256', '/home/sqlstream/keystore/ssl.properties'
))) AS options("STARTING_POSITION");
CREATE OR REPLACE FOREIGN STREAM "DI"."ibm_input_fs"
(
"test" VARCHAR(32),
"SQLSTREAM_PROV_FILE_SOURCE_FILE" VARCHAR(256),
"SQLSTREAM_PROV_ROW_PAYLOAD" VARCHAR(4096) NOT NULL,
"SQLSTREAM_PROV_FILE_MODIFIED_TIME" BIGINT,
"SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" BIGINT,
"unparsed_attributes" VARCHAR(4096)
)
SERVER "IBMMQ_SERVER"
OPTIONS (
"QUEUE_MANAGER" 'QM1',
"SERVER_HOST" 'localhost',
"SERVER_PORT" '1414',
"APPLICATION_USER" 'app',
"APPLICATION_PASSWORD" 'cGFzc3cwcmQ=',
"QUEUE" 'DEV.QUEUE.1',
"SERVER_CHANNEL" 'DEV.APP1.SVRCONN',
"PARSER" 'CSV',
"CHARACTER_ENCODING" 'UTF-8',
"TRANSACTION_ROW_LIMIT" '10000',
"TRANSACTION_ROWTIME_LIMIT" '20000',
"INGRESS_TEMP_FILE_RETENTION_IN_MIN" '1',
"INGRESS_TEMP_FILE_PATH" '/tmp',
"CONFIG_FILE_PATH" '/home/sqlstream/keystore/ssl.properties',
"CIPHER_SUITE" 'TLS_RSA_WITH_AES_128_CBC_SHA256',
"WATERMARK_QUERY" 'SELECT * FROM "DI".START_POSITION_QUERY'
);
CREATE OR REPLACE VIEW "DI"."view_1" AS
SELECT STREAM
"test",
CAST("SQLSTREAM_PROV_FILE_MODIFIED_TIME" AS VARCHAR(150)) || '-' || CAST("SQLSTREAM_PROV_FILE_SOURCE_FILE" AS VARCHAR(256)) || '-' || CAST("SQLSTREAM_PROV_TEXT_PARSE_LINE_NUMBER" AS VARCHAR(150)) as "SQLSTREAM_POSITION_KEY"
FROM "DI"."ibm_input_fs" AS "input";
CREATE OR REPLACE FOREIGN STREAM "DI"."ibm_output_fs"
(
"test" VARCHAR(128),
"SQLSTREAM_POSITION_KEY" VARCHAR(512)
)
SERVER "IBMMQ_SERVER"
OPTIONS (
"FORMATTER" 'CSV',
"FORMATTER_INCLUDE_ROWTIME" 'false',
"SERVER_HOST" 'localhost',
"SERVER_PORT" '1414',
"SERVER_CHANNEL" 'DEV.APP1.SVRCONN',
"QUEUE_MANAGER" 'QM1',
"APPLICATION_USER" 'app',
"APPLICATION_PASSWORD" 'cGFzc3cwcmQ=',
"QUEUE" 'DEV.QUEUE.2',
"WATERMARK_QUEUE_NAME" 'DEV.QUEUE.3',
"TRANSACTION_ROW_LIMIT" '5000',
"TRANSACTION_ROWTIME_LIMIT" '30000',
"ENABLE_IBM_MESSAGE_PERSISTENCE" 'false',
"CONFIG_FILE_PATH" '/home/sqlstream/keystore/ssl.properties',
"CIPHER_SUITE" 'TLS_RSA_WITH_AES_128_CBC_SHA256'
);
CREATE OR REPLACE PUMP "DI"."FS_PUMP" STOPPED AS
INSERT INTO "DI"."ibm_output_fs"
SELECT STREAM "test"
FROM "DI"."view_1" AS "input";
ALTER PUMP "DI".* START;