Pipelines as Microapps

Introduction

Here we discuss decomposing SQLstream applications into elastic microapps for deployment. This means subdividing a whole project into smaller components, whose “sharded” pipelines are then distributed across s-Server instances running in a Kubernetes cluster.

Example

Consider the following streaming application, depicted using the schematic or “network” diagram found in StreamLab, SQLstream’s interactive, low-code GUI development environment.

Diagram: Pipelines as Microapps

Note: You don’t need to use StreamLab for what follows. SQLLine, our command-line utility, would be sufficient. Primarily, this StreamLab screenshot is used to illustrate pipeline decomposition. That said, StreamLab makes it easy to generate DDL for creating and modifying SQLstream pipelines. So you may wish to try it out.

In this view of a StreamLab project, the entire streaming application consists of interconnected sources (outlined in green), sinks (outlined in red), pipelines and native streams (both outlined in blue). Note: In this example, there is also a single visualization dashboard (outlined in purple), found in the upper right.

Pipelines (shown with rounded corners) connect all other components. Each pipeline consists of a SQL script that transforms input data. Sources and sinks are streaming abstractions of external data entities (e.g. file systems, RDBMS, Kafka, cloud storage, and so forth). In contrast, native streams act as inputs / outputs internal to the application: each is a nexus between the pipelines on either side, consumed by some and produced by others. For instance, the native stream all_edrs is the output of the ingestion pipeline. Thereafter, all_edrs bifurcates as the input to 2 different pipelines: sla_edrs and sla_alerts.
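
For reference, a native stream is an ordinary stream object inside s-Server. Below is a minimal sketch; the schema and column names are illustrative assumptions, not taken from the example project.

  CREATE OR REPLACE SCHEMA "edr_app";

  -- A native stream lives entirely inside s-Server; multiple pipelines
  -- can read from it, and each reader sees every row.
  CREATE OR REPLACE STREAM "edr_app"."all_edrs" (
      "deviceId"  VARCHAR(32),
      "bytesUp"   BIGINT,
      "bytesDown" BIGINT
  );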

Decomposition

Every pipeline begins with a source and ends in a sink, proceeding along the way through various internal streams and cascaded VIEWs that transform the data. Data “moves” through a pipeline thanks to a PUMP, which functions like a continuously running query that SELECTs … FROM a source stream and INSERTs INTO a target stream.
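
As a minimal sketch (the schema, pump, and stream names are assumptions for illustration), a pipeline’s pump looks like this:

  -- A pump is typically created STOPPED, then started at deployment time.
  -- "source_stream" and "target_stream" stand in for the pipeline's real
  -- endpoints; the SELECT can read from a cascade of transforming VIEWs.
  CREATE OR REPLACE PUMP "edr_app"."ingestion_pump" STOPPED AS
  INSERT INTO "edr_app"."target_stream"
  SELECT STREAM * FROM "edr_app"."source_stream";

  -- Deploying the pipeline amounts to starting its pump(s):
  ALTER PUMP "edr_app".* START;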

Each pipeline in a streaming application has its own PUMP, defined after the DDL statements that create the other SQL objects. Essentially, deploying that pipeline means starting its pump in an s-Server instance.

Thus, each pipeline within a streaming application can be treated as a single logical unit for deployment. And we can divide the entire project into a set of independent pipelines, each to be deployed as a scalable microapp.

Diagram: Pipelines as Microapps

Prior to decomposition, this example application contained 3 native streams: all_edrs, vlogins, and sla_chart. Notice how these have changed. Previously, all_edrs functioned as both input and output. Now it has been replaced by all_edrs_i (reading input) and all_edrs_o (writing output). Instead of being native streams (outlined in blue) as before, both of these are now foreign streams — one a source (outlined in green) and one a sink (outlined in red). The same occurred with vlogins. Note: This has not been done for sla_chart because the only consumer of this native stream is a dashboard query.
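
As a hedged sketch, the replacement pair might be defined over Kafka as follows. The server name KAFKA10_SERVER and the TOPIC / SEED_BROKERS options follow common s-Server usage, but the exact option names and columns here are assumptions; consult the Kafka integration guide for your release.

  -- all_edrs_o: the sink written by the ingestion pipeline.
  CREATE OR REPLACE FOREIGN STREAM "edr_app"."all_edrs_o" (
      "deviceId"  VARCHAR(32),
      "bytesUp"   BIGINT,
      "bytesDown" BIGINT
  )
  SERVER "KAFKA10_SERVER"
  OPTIONS ("TOPIC" 'all_edrs', "SEED_BROKERS" 'kafka:9092');

  -- all_edrs_i: the source read by sla_edrs and sla_alerts.
  CREATE OR REPLACE FOREIGN STREAM "edr_app"."all_edrs_i" (
      "deviceId"  VARCHAR(32),
      "bytesUp"   BIGINT,
      "bytesDown" BIGINT
  )
  SERVER "KAFKA10_SERVER"
  OPTIONS ("TOPIC" 'all_edrs', "SEED_BROKERS" 'kafka:9092');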

In general, when decomposing a streaming application, the native streams can be handled in either of 2 ways:

  • Replace the native stream with a “persistent” stream. This could be a foreign stream based on Kafka or the file system, for instance. That foreign stream is then referenced as a source and as a sink. (This approach was used in the example above.)

  • Short-circuit the native stream. This would merge the 2 pipelines on either side of the native stream, effectively eliminating it.

Following the second approach, consider what would happen to the all_edrs native stream. In the example above, it had 1 upstream pipeline (ingestion) and 2 downstream pipelines (sla_edrs and sla_alerts). To eliminate or bypass this native stream, you could merge ingestion and sla_edrs into a single pipeline called ingestion_plus_sla_edrs. Similarly, you would merge ingestion and sla_alerts into a single pipeline called ingestion_plus_sla_alerts.
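
A sketch of the first merge, assuming ingestion’s logic can be expressed as a cascaded view (all names here are illustrative):

  -- ingestion's transformations, formerly feeding all_edrs, become a view.
  CREATE OR REPLACE VIEW "edr_app"."ingested_edrs" AS
  SELECT STREAM *                          -- ingestion transformations here
  FROM "edr_app"."raw_edrs";

  -- sla_edrs' logic now reads the view directly; no intermediate stream.
  CREATE OR REPLACE PUMP "edr_app"."ingestion_plus_sla_edrs_pump" STOPPED AS
  INSERT INTO "edr_app"."sla_edrs_sink"
  SELECT STREAM *                          -- sla_edrs transformations here
  FROM "edr_app"."ingested_edrs";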

Although this results in a longer SQL script for the merged pipeline, it can yield advantages:

  • Reduce IO Footprint — Avoid persistent storage, which would be needed if converting all_edrs to a foreign stream. This makes sense if the ingestion pipeline does not involve compute-intensive operations such as UDXs or UDAs.

  • Low Latency — If sla_edrs and sla_alerts (and consumers further downstream) need to maintain very low latency, then writing to and reading from an intermediate entity should be avoided.

  • Reduce Checkpointing Overhead — With no intermediate foreign stream, the merged pipeline checkpoints as a single unit instead of persisting and restoring state on both sides of that stream.

All pipelines can now be deployed independently, each in a separate container. Yet there is no requirement to allocate exactly 1 container per pipeline. As needed, a given pipeline can be sharded across multiple containers by assigning each container a range of source partitions.

Deployment

A DevOps engineer will access the catalog of these sharded pipelines and deploy them through Kubernetes using Helm charts or other methods. In addition to pipeline parameters, which are the same across all shards, each shard will have its own shard parameters (a parameterized sketch follows the list below). Among these are the following:

  • A unique shard ID

  • Partition range for the pipeline’s source(s)

  • Connection credentials for data sources and sinks

  • Number of shards

  • Latency and heartbeat semantics for each shard
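
As a sketch, shard parameters can be substituted into the pipeline’s DDL when each container starts, for example with envsubst in the entrypoint. The ${...} placeholders and the PARTITION option below are assumptions for illustration; check the adapter documentation for the exact option names.

  -- Rendered once per shard by the deployment tooling.
  CREATE OR REPLACE FOREIGN STREAM "edr_app"."edr_source_shard_${SHARD_ID}" (
      "deviceId" VARCHAR(32),
      "bytesUp"  BIGINT
  )
  SERVER "KAFKA10_SERVER"
  OPTIONS (
      "TOPIC"        'all_edrs',
      "SEED_BROKERS" '${KAFKA_BROKERS}',    -- connection endpoint/credentials
      "PARTITION"    '${PARTITION_RANGE}'   -- e.g. '0-3' for this shard's slice
  );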

The shard ID along with the pipeline name will be used for transaction semantics, watermarking and checkpointing. For more information, see the section Implementing Federation for s-Server.

Pipelines and shards can be templatized using Ansible or any similar templating framework. You can monitor deployed pipelines by inspecting telemetry information from running s-Server instances. For practical guidelines, see the section Supplying Info to Containers.

Scaling

You should determine the appropriate degree of scaling for your streaming application based on data rates, payload sizes, degree of pipeline parallelism, resource requirements, and other factors.

Currently, scaling is static and performed manually at the time of deployment. To re-scale, all shards must be stopped and then re-deployed using new shard parameters. Auto-scaling is planned for a future SQLstream release.
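
For example, the stop step for a shard’s pumps is a one-liner (the schema name is assumed):

  -- Quiesce every pump in the shard's schema; re-deploy with new
  -- shard parameters, then ALTER PUMP ... START again.
  ALTER PUMP "edr_app".* STOP;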

Distribution Models

Depending on the kind of operations performed within a pipeline, you will want to consider various models for combining the outputs of shards. Generally speaking, this is done with UNION ALL.
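
For instance, a combining step downstream of two shards might look like this (stream names assumed):

  -- Merge the outputs of two shards back into one logical stream.
  CREATE OR REPLACE VIEW "edr_app"."edrs_combined" AS
  SELECT STREAM * FROM "edr_app"."edrs_shard_0"
  UNION ALL
  SELECT STREAM * FROM "edr_app"."edrs_shard_1";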

Stateless / Ingestion

The diagram below shows a sharded ingestion pipeline. The degree of scale-out can be configured during deployment.

Diagram: Pipelines as Microapps

Sliding Windows

Distributing aggregations or JOINs in a partitioned parallel mode requires shuffling data from source partitions to the appropriate shards, and this shuffle operation demands significant network bandwidth. However, if the partition key of the aggregation is the same as the partition key of the source, then no shuffle is needed.
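
For example, if the source is partitioned by deviceId and the aggregation also partitions by deviceId, each shard can compute its sliding window locally, with no shuffle (all names here are assumptions):

  -- Correct without a shuffle only because every row for a given
  -- deviceId arrives at this shard (source partition key = deviceId).
  CREATE OR REPLACE VIEW "edr_app"."sliding_usage" AS
  SELECT STREAM
      "deviceId",
      SUM("bytesUp") OVER (
          PARTITION BY "deviceId"
          RANGE INTERVAL '1' MINUTE PRECEDING
      ) AS "bytesUpLastMinute"
  FROM "edr_app"."edr_source_shard_0";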

Diagram: Pipelines as Microapps

Tumbling Windows

This model employs a rollup aggregation as an additional step:

Diagram: Pipelines as Microapps
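
Here is a hedged sketch of the two stages over a 1-minute tumbling window. All names are assumptions, and the rollup’s GROUP BY relies on each partial row carrying the ROWTIME of its window close; verify that semantics against your s-Server release.

  -- Stage 1 (runs inside each shard): partial per-minute totals.
  CREATE OR REPLACE VIEW "edr_app"."local_usage" AS
  SELECT STREAM
      FLOOR("s".ROWTIME TO MINUTE) AS "minuteStart",
      "deviceId",
      SUM("bytesUp") AS "partialBytesUp"
  FROM "edr_app"."edr_source_shard_0" AS "s"
  GROUP BY FLOOR("s".ROWTIME TO MINUTE), "deviceId";

  -- Stage 2 (the additional rollup step): combine partials from all shards.
  -- In practice each shard publishes its local_usage via a stream, here
  -- named local_usage_shard_N.
  CREATE OR REPLACE VIEW "edr_app"."rollup_usage" AS
  SELECT STREAM
      "minuteStart",
      "deviceId",
      SUM("partialBytesUp") AS "bytesUp"
  FROM (
      SELECT STREAM * FROM "edr_app"."local_usage_shard_0"
      UNION ALL
      SELECT STREAM * FROM "edr_app"."local_usage_shard_1"
  ) AS "u"
  GROUP BY FLOOR("u".ROWTIME TO MINUTE), "minuteStart", "deviceId";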

Advantages

  • Does not require shuffle.
  • Performs well when the domain of GROUP BY keys is small and good compression is obtained. In that case, memory usage in individual shards remains reasonable, and the rollup aggregation can run at low throughput.

Disadvantages

  • Memory usage will be much larger than when shuffle is used, where each local aggregation operates on only a slice of the domain.
  • If the domain is large and compression is low, the rollup aggregation may have to run at close to (or even higher than) the undistributed throughput of the aggregation.

JOIN

Pipelines containing a stream-table or stream-stream JOIN can follow this distribution model:

Diagram: Pipelines as Microapps

Note: At least one side of the JOIN must be sharded on the JOIN key. In the case of an OUTER JOIN, both sides must be sharded on the JOIN key.
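
As an illustrative sketch, here is one shard of a stream-table JOIN, with the stream sharded on the JOIN key (the lookup table and all names are assumptions):

  -- Each shard enriches only its slice of the stream. This is correct
  -- because the stream is sharded on the JOIN key (deviceId), so every
  -- row for a given deviceId lands on the same shard.
  CREATE OR REPLACE VIEW "edr_app"."enriched_edrs" AS
  SELECT STREAM
      "e"."deviceId",
      "e"."bytesUp",
      "c"."customerName"
  FROM "edr_app"."edr_source_shard_0" AS "e"
  JOIN "edr_app"."customers" AS "c"
    ON "e"."deviceId" = "c"."deviceId";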