(new in SQLstream s-Server version 7.2.1)
Using s-Server, you can read from and write to Pulsar topics. For more information on Pulsar client configurations, refer to http://pulsar.apache.org/docs/en/2.5.0/client-libraries-java/#client.
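Run the following script in sqllineClient to install the Pulsar plugin:

```sql
-- Register the Pulsar plugin JAR with s-Server
CREATE OR REPLACE JAR sys_boot.sys_boot.pulsar LIBRARY 'file:plugin/pulsar/pulsar.jar' OPTIONS(0);
-- Add the JAR to the system catalog
ALTER SYSTEM ADD CATALOG JAR sys_boot.sys_boot.pulsar;
-- Create the server object that Pulsar foreign streams reference
CREATE OR REPLACE SERVER PULSAR_SERVER TYPE 'PULSAR' FOREIGN DATA WRAPPER ECDA;
```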
The SQLstream Pulsar Sink Plugin supports all the configurations mentioned in the official documentation of Apache Pulsar. For more information, refer to http://pulsar.apache.org/docs/en/2.5.0/client-libraries-java/#client.
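There are three sets of configurations used by the Pulsar sink plugin: client configurations, producer configurations, and additional producer configurations. Each set is described in its own table below.
All of these configurations can be supplied either as options in the OPTIONS clause of the sink foreign stream, or in a Pulsar properties file whose path is given by the PULSAR_CONFIG_FILE option.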
NOTE: As well as providing the configuration options described here for the Pulsar sink, you will also have to define a value for the FORMATTER option; see Output Formats for Writing and also the Pulsar SQL pipeline example.
This table lists the client configurations that can be used when writing data to a Pulsar sink.
Name | Type | Description | Default Value |
---|---|---|---|
serviceUrl | String | Service URL of the Pulsar cluster: the URL of either the Pulsar web service or the broker service. If TLS is not enabled on the cluster, use either a web service URL, http://<host name>:<web service port> (for example, http://localhost:8080), or a broker service URL, pulsar://<host name>:<broker service port> (for example, pulsar://localhost:6650). If TLS is enabled, use the secure broker service URL, pulsar+ssl://<host name>:<broker service TLS port> (for example, pulsar+ssl://localhost:6651). | pulsar+ssl://localhost:6651 if useTls is true; pulsar://localhost:6650 if useTls is false |
useTls | boolean | Whether to use TLS encryption on the connection. | false |
tlsTrustCertsFilePath | String | Path to the trusted TLS certificate file. It is required if useTls is set to true. | None |
authPluginClassName | String | Name of the authentication plugin class. For example, for TLS authentication: org.apache.pulsar.client.impl.auth.AuthenticationTls | None |
authParams | String | Parameters for the authentication plugin, as comma-separated key:value pairs, for example key1:val1,key2:val2. For TLS authentication: tlsCertFile:/home/sqlstream/Downloads/my-ca/admin.cert.pem,tlsKeyFile:/home/sqlstream/Downloads/my-ca/admin.key-pk8.pem | None |
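As a minimal sketch, assuming the client options in the table above can be passed directly in the OPTIONS clause of the sink foreign stream under exactly the names shown (quoted to preserve their case), a TLS-enabled connection could be declared without a properties file. The stream name "tls_out_fs" is hypothetical, the "miq_pcmd" schema is assumed to exist (it is created in the pipeline example), and the certificate paths are the sample paths used elsewhere on this page. topicName, a producer option, is included because the sink always needs a topic.

```sql
-- Hypothetical sink: client configuration passed as sink options
-- instead of through PULSAR_CONFIG_FILE (assumes schema "miq_pcmd" exists).
CREATE OR REPLACE FOREIGN STREAM "miq_pcmd"."tls_out_fs"
(
    "radius_calling_station_id" VARCHAR(16),
    "transaction_uplink_bytes" BIGINT
)
SERVER "PULSAR_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',                                   -- required for every Pulsar sink
    "topicName" 'persistent://public/sqlstream/trial-1',
    -- Client configuration: TLS is on, so use the pulsar+ssl:// broker URL
    "serviceUrl" 'pulsar+ssl://localhost:6651',
    "useTls" 'true',
    "tlsTrustCertsFilePath" '/home/sqlstream/Downloads/my-ca/certs/ca.cert.pem',
    "authPluginClassName" 'org.apache.pulsar.client.impl.auth.AuthenticationTls',
    "authParams" 'tlsCertFile:/home/sqlstream/Downloads/my-ca/admin.cert.pem,tlsKeyFile:/home/sqlstream/Downloads/my-ca/admin.key-pk8.pem'
);
```

Because useTls is true here, serviceUrl must use the pulsar+ssl:// scheme and the TLS port, and tlsTrustCertsFilePath becomes mandatory.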
This table lists the producer configurations that can be used when writing data to a Pulsar sink.
Name | Type | Description | Default Value |
---|---|---|---|
topicName | String | Name of the topic to publish to. Pass the full topic name, including the topic type, tenant and namespace, in the form persistent://tenant/namespace/topic or non-persistent://tenant/namespace/topic. If you enter only a topic name with no namespace, Pulsar uses the default location persistent://public/default/; for example, to publish to a persistent topic in the default namespace of the public tenant, simply enter my-topic. If the specified topic does not exist, Pulsar creates it when the pipeline starts. The namespace, however, must be created explicitly: if it does not exist, Pulsar throws an exception. You can use expressions to define the topic name; for example, if the my-topic field in the record contains the topic name, enter persistent://my-tenant/my-namespace/${record:value("/my-topic")} | None |
producerName | String | Name of the producer. | SQLStreamProducer |
sendTimeoutMs | long | Message send timeout, in milliseconds. If a message is not acknowledged by the server before the timeout expires, an error occurs. Set to 0 for no timeout. | 30000 |
blockIfQueueFull | boolean | Controls what happens when the outgoing message queue is full. If true, the producer's send and sendAsync calls block until space is available. If false, they fail with a ProducerQueueIsFullError exception. The size of the queue is set by maxPendingMessages. | false |
maxPendingMessages | int | Maximum size of the queue holding pending messages, that is, messages waiting for an acknowledgment from a broker. By default, when the queue is full, all send and sendAsync calls fail unless blockIfQueueFull is set to true. | 1000 |
messageRoutingMode | MessageRoutingMode | Message routing logic for producers on partitioned topics. This logic applies only to messages that have no key. Available options: RoundRobinPartition (round robin), SinglePartition (publish all messages to a single partition) and CustomPartition (a custom partitioning scheme). For more information on custom routers, see https://pulsar.apache.org/docs/en/2.5.1/cookbooks-partitioned/ | RoundRobinPartition |
batchingEnabled | boolean | Enable batching of messages. | true |
batchingMaxMessages | int | The maximum number of messages permitted in a batch. | 1000 |
compressionType | CompressionType | Compression type applied to message data by the producer. Available options: LZ4, ZLIB, ZSTD and SNAPPY. | No compression |
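Producer options can be given the same way. The sketch below assumes, as above, that the table names are accepted verbatim as quoted sink options; the stream name "tuned_out_fs" is hypothetical and the "miq_pcmd" schema is assumed to exist (it is created in the pipeline example). It connects without TLS and chooses back-pressure over errors: with blockIfQueueFull set to true, once maxPendingMessages messages are awaiting acknowledgment the producer blocks instead of failing.

```sql
-- Hypothetical sink: producer tuning passed as sink options
-- (assumes schema "miq_pcmd" exists).
CREATE OR REPLACE FOREIGN STREAM "miq_pcmd"."tuned_out_fs"
(
    "radius_calling_station_id" VARCHAR(16),
    "transaction_uplink_bytes" BIGINT
)
SERVER "PULSAR_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',
    "serviceUrl" 'pulsar://localhost:6650',               -- TLS disabled
    "topicName" 'persistent://public/sqlstream/trial-1',  -- namespace must already exist
    "producerName" 'SQLStreamProducer',
    "sendTimeoutMs" '30000',         -- error if not acknowledged within 30 s
    "blockIfQueueFull" 'true',       -- block rather than fail when the queue is full
    "maxPendingMessages" '20000',    -- size of that queue
    "batchingEnabled" 'true',
    "batchingMaxMessages" '1000',    -- at most 1000 messages per batch
    "compressionType" 'LZ4'
);
```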
This table lists additional producer configurations that can be used when writing data to a Pulsar sink.
Name | Type | Description | Default Value |
---|---|---|---|
sendAsync | boolean | Enables asynchronous publishing of messages. | true |
messageEncryption | boolean | Enables end-to-end message encryption. If enabled, publicKeyFile must also be provided. | false |
publicKeyFile | String | The absolute path to the RSA or ECDSA public key which is required to encrypt the messages. Also, make sure that you have specified the corresponding privateKeyFile at the consumer end for the messages to be consumed successfully. | null |
encryptionKey | String | The encryption key name. | sqlstream.key |
COUNT_SCHEDULER_TIME | long | Interval, in milliseconds, of a scheduled counter that writes to the SQLstream trace log files the total number of messages for which the client has received an acknowledgement. Set the value to 0 to disable the counter. | 0 |
PULSAR_CONFIG_FILE | String | Absolute path of the Pulsar properties file. If this option is not provided, the Pulsar plugin logs a WARNING and proceeds with the default configurations or with those passed in the sink options. | None |
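The additional options can be combined in one sink. This sketch makes the same assumption that table names are accepted verbatim as quoted sink options; the stream name "encrypted_out_fs" is hypothetical, the "miq_pcmd" schema is assumed to exist, and the public key path is the sample ECDSA key from the properties file on this page. It enables end-to-end encryption and writes the acknowledged-message count to the trace log every five seconds. Consumers need the matching privateKeyFile to read these messages.

```sql
-- Hypothetical sink: encryption and acknowledgement counter as sink options
-- (assumes schema "miq_pcmd" exists).
CREATE OR REPLACE FOREIGN STREAM "miq_pcmd"."encrypted_out_fs"
(
    "radius_calling_station_id" VARCHAR(16),
    "transaction_uplink_bytes" BIGINT
)
SERVER "PULSAR_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',
    "serviceUrl" 'pulsar://localhost:6650',
    "topicName" 'persistent://public/sqlstream/trial-1',
    "sendAsync" 'true',
    "messageEncryption" 'true',      -- requires publicKeyFile
    "publicKeyFile" '/home/sqlstream/Downloads/my-ca/test_ecdsa_pubkey.pem',
    "encryptionKey" 'sqlstream.key', -- key name (same as the default)
    "COUNT_SCHEDULER_TIME" '5000'    -- log the acknowledged count every 5000 ms
);
```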
When writing to a Pulsar sink, you can declare and use the following special egress columns.
Name | Type | Description | Default Value |
---|---|---|---|
PULSAR_PARTITION | INTEGER | Partition of the topic to which the row is written as a message. If the value is NULL for a row, that message is assigned to a random partition. If this column is not referenced, messages are routed in round-robin mode. | RoundRobin |
PULSAR_KEY | VARBINARY | Optional message key. Keys are useful for features such as topic compaction, and are also used to route messages to partitions. For more information, see https://pulsar.apache.org/docs/en/concepts-messaging/#routing-modes | None |
PULSAR_TIMESTAMP | TIMESTAMP | Event time of the message. The value is mandatory (non-nullable): if PULSAR_TIMESTAMP is NULL for a row, that row is discarded. | Current system time |
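The sketch below shows one way to populate the egress columns. It assumes the "miq_pcmd" schema and the "http_input_fs" file stream from the pipeline example, a hypothetical stream and pump name, and a topic with at least four partitions; spreading rows by MOD of the byte count is purely illustrative. The row's ROWTIME is used for PULSAR_TIMESTAMP because it is always a non-null TIMESTAMP, so no row is discarded. PULSAR_KEY (VARBINARY) can be declared and filled in the same way.

```sql
-- Hypothetical sink that sets the partition and event time of each message
-- (assumes "miq_pcmd"."http_input_fs" from the pipeline example exists).
CREATE OR REPLACE FOREIGN STREAM "miq_pcmd"."partitioned_out_fs"
(
    "PULSAR_PARTITION" INTEGER,   -- target partition; NULL means a random partition
    "PULSAR_TIMESTAMP" TIMESTAMP, -- event time; rows with NULL are discarded
    "radius_calling_station_id" VARCHAR(16),
    "transaction_uplink_bytes" BIGINT
)
SERVER "PULSAR_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "PULSAR_CONFIG_FILE" '/home/sqlstream/Downloads/pulsar.properties'
);

CREATE OR REPLACE PUMP "miq_pcmd"."partitioned_out_pump" STOPPED AS
INSERT INTO "miq_pcmd"."partitioned_out_fs"
    ("PULSAR_PARTITION", "PULSAR_TIMESTAMP",
     "radius_calling_station_id", "transaction_uplink_bytes")
SELECT STREAM
    CAST(MOD(s."transaction_uplink_bytes", 4) AS INTEGER),  -- partition 0..3
    s.ROWTIME,                                               -- never NULL
    s."radius_calling_station_id",
    s."transaction_uplink_bytes"
FROM "miq_pcmd"."http_input_fs" AS s;
```

If "transaction_uplink_bytes" is NULL for a row, the computed partition is NULL as well, so that message goes to a random partition as described in the table.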
The following Pulsar SQL pipeline example reads '^'-separated CSV rows from /home/sqlstream/Downloads/data.csv and writes them to a Pulsar topic in CSV format. Connection and producer settings are taken from the properties file named in the PULSAR_CONFIG_FILE option.

```sql
-- Utility schema
CREATE OR REPLACE SCHEMA UTILS;
SET SCHEMA 'UTILS';
SET PATH 'UTILS';

-- Recreate the pipeline schema from scratch: make sure it exists,
-- stop any pumps running in it, then drop and recreate it
CREATE OR REPLACE SCHEMA "miq_pcmd";
ALTER PUMP "miq_pcmd".* STOP;
DROP SCHEMA "miq_pcmd" CASCADE;
CREATE OR REPLACE SCHEMA "miq_pcmd";

-- File input foreign stream
CREATE OR REPLACE FOREIGN STREAM "miq_pcmd"."http_input_fs"
(
    "sn_start_time" BIGINT,
    "sn_end_time" BIGINT,
    "radius_calling_station_id" VARCHAR(16),
    "transaction_uplink_bytes" BIGINT
)
SERVER "FILE_SERVER"
OPTIONS (
    "PARSER" 'CSV',
    "CHARACTER_ENCODING" 'UTF-8',
    "QUOTE_CHARACTER" '"',
    "SEPARATOR" '^',
    "SKIP_HEADER" 'true',
    "DIRECTORY" '/home/sqlstream/Downloads/',
    "FILENAME_PATTERN" 'data.csv'
);

----------
-- pipeline_1: route to Pulsar
----------
-- Pulsar sink foreign stream; FORMATTER is required
CREATE OR REPLACE FOREIGN STREAM "miq_pcmd"."http_out_fs"
(
    "sn_start_time" BIGINT,
    "sn_end_time" BIGINT,
    "radius_calling_station_id" VARCHAR(16),
    "transaction_uplink_bytes" BIGINT
)
SERVER "PULSAR_SERVER"
OPTIONS (
    "FORMATTER" 'CSV',
    "COUNT_SCHEDULER_TIME" '1000', -- milliseconds
    "CHARACTER_ENCODING" 'UTF-8',
    "PULSAR_CONFIG_FILE" '/home/sqlstream/Downloads/pulsar.properties'
);

-- Pump that copies every input row to the Pulsar sink; created stopped, then started
CREATE OR REPLACE PUMP "miq_pcmd"."http_out_fs_pump" STOPPED AS
INSERT INTO "miq_pcmd"."http_out_fs"
SELECT STREAM * FROM "miq_pcmd"."http_input_fs";

ALTER PUMP "miq_pcmd".* START;
```
The following pulsar.properties file is the one referenced by the PULSAR_CONFIG_FILE option of the example pipeline:

```properties
###############
# User-Defined
###############
#topicNamespace = persistent://public/sqlstream/
sendAsync = true
# End-to-End Encryption
# To enable end-to-end encryption, set messageEncryption = true
# Default value: false
# messageEncryption = true
# Default value for encryptionKey: sqlstream.key
# It is the name of the key
# encryptionKey = sqlstream_1.key
# RSA or ECDSA public and private key
# For the producer, publicKeyFile is mandatory
# publicKeyFile = /home/sqlstream/Downloads/my-ca/test_ecdsa_pubkey.pem
# privateKeyFile = /home/sqlstream/Downloads/my-ca/test_ecdsa_privkey.pem
##############
# Pre-Defined
##############
# topicName is a required property
# Name of the topic, including its namespace
# In case the namespace is not specified, the default namespace persistent://public/default/ is used
topicName = persistent://public/sqlstream/trial-1
# Service URL
# When TLS is enabled, use pulsar+ssl://127.0.0.1:6651
# When TLS is disabled, use pulsar://127.0.0.1:6650
# e.g. pulsar://apachepulsar-02.cloud.in.guavus.com:6650
serviceUrl = pulsar+ssl://127.0.0.1:6651
# TLS Encryption
useTls = true
tlsTrustCertsFilePath = /home/sqlstream/Downloads/my-ca/certs/ca.cert.pem
# Authentication
authPluginClassName = org.apache.pulsar.client.impl.auth.AuthenticationTls
authParams = tlsCertFile:/home/sqlstream/Downloads/my-ca/admin.cert.pem,tlsKeyFile:/home/sqlstream/Downloads/my-ca/admin.key-pk8.pem
# Batching and queue size
# By default, batching is enabled with batchingMaxMessages = 1000
# batchingEnabled = false
# batchingMaxMessages = 1000
# batchingMaxPublishDelayMicros = 1000
# Message Routing Mode
# Default: RoundRobinPartition
messageRoutingMode = CustomPartition
# Max pending messages queue size
# Default: 1000
maxPendingMessages = 20000
# Compression Type
# Default: no compression. Supported: ZLIB, LZ4, ZSTD and SNAPPY (ver 2.4*)
compressionType = ZLIB
# Message send timeout in ms
# If the message is not acknowledged by the broker before
# the sendTimeout expires, an error occurs
# Set to 0 for no timeout. Default: 30000
sendTimeoutMs = 0
```