Integrating Prometheus Pushgateway

Introduction

Real-time application metrices or counters generated from SQLstream pipeline can be pushed to Prometheus pushgateway which is scraped by Prometheus and the user can further visualize those metrices in form of graphs for analysis purpose.

This topic contains the following subtopics:

How it works

Prometheus is a metric system that works on a pull model. Data from Kafka, local file system or SFTP arrives at the SQLstream engine and various metrics are calculated and emitted by s-Server - such as total number of records dropped, total number of records processed and so on. SQLstream's Prometheus plugin uses the push gateway to send metrics to Prometheus.

It is common for SQLstream and the push gateway to be implemented as containers in the same Kubernetes pod.

For example, if you have a view containing the following counter metrics as columns in a SQLstream pipeline:

rat_type_total_unknown app_protocol_total_unknown ip_protocol_total_unknown
100 3 3

We would pivot each metric into a separate row in pivot_view(NAME, VAL) as shown below:

NAME VAL
rat_type_total_unknown 100
app_protocol_total_unknown 3
ip_protocol_total_unknown 3
  • This pivot_view view can be passed to push gateway and used for visualizations on Prometheus UI.
  • The Prometheus push gateway sink plugin actually expects three inputs: NAME, VAL and LABELS such as '*validation=unknown;component=7stream;*' respectively.

Example

The application pipeline

-- Input FS

CREATE OR REPLACE FOREIGN STREAM "demo"."input_data_fs"
(
 "msisdn" VARCHAR(32),
 "app_protocol" VARCHAR(16),
 "ip_protocol" VARCHAR(16),
 "rat_type" VARCHAR(16)
)
SERVER "FILE_SERVER"
OPTIONS (
 "PARSER" 'CSV',
 "CHARACTER_ENCODING" 'UTF-8',
 "ROW_SEPARATOR" u&'\000A',
 "SEPARATOR" ',',
 "SKIP_HEADER" 'false',
 "DIRECTORY" '/sample/7s/data',
 "FILENAME_PATTERN" '.+(\.csv)'
);

-- ghenerating some counter metrics
-- "DUMMY" column is used in the pivot below

CREATE OR REPLACE VIEW "demo"."v1_view" AS 
SELECT STREAM COUNT(CASE WHEN "rat_type" = 'Unknown' THEN 1 END) AS "rat_type_total_unknown",
       COUNT(CASE WHEN "app_protocol"='Unknown' THEN 1 END) AS "app_protocol_total_unknown" ,
       COUNT(CASE WHEN "ip_protocol"='Unknown'  THEN 1 END) AS "ip_protocol_total_unknown",
       1 AS "DUMMY"
FROM "demo"."input_data_fs" AS "input"
GROUP BY STEP("input"."ROWTIME" BY INTERVAL '1' MINUTE) ;

Pivoting the metrics

-- Make a view listing the metric names (one row per name)
-- be sure to TRIM the name, as VALUES space-extends all values to the same length as the longest in the set

CREATE OR REPLACE VIEW "demo"."metric_names"
AS SELECT 1 AS DUMMY, TRIM(EXPR$0) AS NAME
FROM (VALUES('rat_type_total_unknown'),('app_protocol_total_unknown'),('ip_protocol_total_unknown'));

--Transpose metric columns to rows - use the dummy column equi-join to simulate CROSS JOIN

CREATE OR REPLACE VIEW "demo"."pivot_view" AS 
SELECT STREAM N.NAME,
     , CASE N.NAME
         WHEN 'rat_type_total_unknown' THEN "rat_type_total_unknown" 
         WHEN 'app_protocol_total_unknown' THEN "app_protocol_total_unknown"
         WHEN 'ip_protocol_total_unknown' THEN "ip_protocol_total_unknown"
       END AS VAL
FROM "demo"."v1_view" V
JOIN "demo"."metric_names" N USING ("DUMMY");

Defining the Prometheus sink objects

-- Prometheus Server Definition : There is no pre-defined server so define it here

CREATE OR REPLACE JAR sys_boot.sys_boot.prometheus_plugin 
LIBRARY 'file:plugin/prometheus/prometheus.jar' OPTIONS(0);

alter system add catalog jar sys_boot.sys_boot.prometheus_plugin;

CREATE OR REPLACE SERVER Prom_Server
TYPE 'Prometheus'  FOREIGN DATA WRAPPER ECDA
OPTIONS (
  URL 'http://<ip address>:<port no>',
) ;

-- Using Prom_Server, inheriting the URL

CREATE OR REPLACE FOREIGN STREAM "demo"."pushgateway" (
  NAME VARCHAR(200),
  VAL DECIMAL,
  LABELS VARCHAR(500) options (header 'metric_labels')
) SERVER prom_server
OPTIONS
( JOB 'prometheus_job',
  METRIC_TYPE 'counter',
  NAMESPACE 'sample:enrichment',
  LABELS 'component=7stream;',
  formatter 'CSV'
);

Delivering metrics to the push gateway using a pump

CREATE OR REPLACE PUMP "demo"."pushgateway_pump" STOPPED AS
INSERT INTO "demo"."pushgateway"  
SELECT STREAM "NAME","VAL",'validation=unknown' AS "LABELS" 
FROM "demo"."pivot_view" 
;

Foreign Stream Options for Writing to a Prometheus pushgateway

Option Name Description
URL Required - denotes the Prometheus push gateway URL endpoint to which the metrices generated by the pipeline are sent.
JOB Required - An application identifier used to denote a specific application pipeline.
METRIC_TYPE Required - represents the type of metric, either 'counter' or 'gauge'. For more details, see https://www.prometheus.io/docs/concepts/metric_types/
Namespace A prefix that helps to identify multiple metrics name used across the application or a system. For example, in the _projectname:module:invalid_recordtotal metric, projectname:module: indicates a namespace.
Labels A global label that is applied to all the metrics using the specific Prometheus sink and can annotate a metric with very specific information such as component=7stream;. For more details, see https://prometheus.io/docs/practices/naming/