
Batch And Streaming Demystified For Unification

Understand how batch can be considered a subset of streaming and why data engineering should simplify its usage significantly

Photo by Felix Mittermeier on Unsplash

Table Of Contents

  • Principles
  • Blocking and stateful operators
  • Streaming is batching and batching is streaming?
  • Data windows
  • Event time and processing time
  • Exactly-once semantics
  • Scaling to the enterprise level

Much of the discussion around batch versus stream processing centers on their high-level differences. However, the real distinction is more nuanced if we dig deeper. By closely examining the underlying principles of both data processing approaches, we can recognize their similarities. In fact, they have so much in common that we can largely abstract away the technical differences.

This means that while application developers still need to choose whether their use case is better served by a stream or batch processing style, they should no longer have to think much about the differing technical implementations. That lets them focus on implementing business logic.

Let’s examine the basics of streaming for very low latency requirements and compare it to the more familiar batch processing that is applicable to more tolerant latency requirements. I’ll use tools available on all Unix-like systems and simple Python programs to illustrate the principles. This will allow you to play with the concepts to better understand the commonalities, why it is so beneficial to unify batch and streaming, and how it can be implemented. Finally, I’ll cover some popular systems available for scaling the concepts to the enterprise level.

Principles

The core concept of streaming is to consume data as quickly as possible as it becomes available from the producer.

Batch and Streaming – Image by author

To achieve this low-latency processing, streaming typically reads data from a stream input, which functions like a FIFO (first in, first out) pipe. This pipe continuously supplies data as the producer writes into it.

The data transformation logic of programs like stream 1 and stream 2 can be applied as soon as the data is available, with the output immediately written to another stream input/output. As a result, the data stream is directly passed through the chain of programs, all running concurrently.

A basic coordination between the producer and consumer is necessary to enable this processing style. If the producer writes data faster than the consumer can read, the operating system will block the producer. Conversely, if the consumer reads faster than the producer can supply data, the consumer will be blocked. In this context, ‘blocked’ means the processes passively wait without consuming computing resources until the operating system signals them to continue. The application remains unaware of this coordination, as the operating system handles it entirely.
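
If you want to observe this blocking behavior yourself, here is a minimal sketch (producer.py and the slow consumer command are purely illustrative, not part of the example that follows). The producer's progress messages on stderr stall as soon as the pipe buffer is full and resume only when the slow consumer drains data:

# producer.py - a minimal sketch to observe OS-level pipe blocking (backpressure)
# run it against a deliberately slow consumer, for example:
#   python producer.py | python -c "import sys,time; [time.sleep(1) for _ in sys.stdin]"
import sys
import time

chunk = "x" * 1024 + "\n"        # roughly 1 KiB per line
written = 0
while True:
    sys.stdout.write(chunk)
    sys.stdout.flush()           # blocks here once the pipe buffer (typically ~64 KiB) is full
    written += 1
    print(f"{written} KiB written", file=sys.stderr)
    time.sleep(0.01)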

Batch processing, on the other hand, involves periodically working with bounded datasets or files that have been fully written by producers. The batch process reads the data from these files, applies transformation logic, and writes the results to another data file. Program batch 2 cannot start until batch 1 has completed.

Intuitively, the transformation logic for both processing styles could be implemented similarly. Indeed, when abstracting away the difference between reading from a bounded data file and an unbounded data stream, the business logic can be identical.

Illustrative example

To illustrate this valuable insight, consider a simple example. We have a Python program, simple.py, running on a Unix-like system. This program reads numbers from standard input, multiplies them by 100 if they are less than 10, and writes the results to standard output as floating-point numbers. Invalid numbers are filtered out and sent to a separate error stream.

# simple.py
import sys

def process_numbers():
    for line in sys.stdin:
        try:
            number = float(line.strip())
            if number < 10:
                number *= 100
            print(number, flush=True)  # flush so downstream consumers see results without buffering delays
        except ValueError:
            # if the source does not contain a valid number, ignore it
            print("Invalid input:", line.strip(), file=sys.stderr)

if __name__ == "__main__":
    process_numbers()

Since Unix-like systems elegantly abstract files as persisted byte streams, we can use the same logic irrespective of whether we read from a file or from a streaming input. A data file can always be redirected into the standard input pipe, and the standard output pipe can in turn be redirected to a file again:

#!/bin/bash
#batch.sh
# create input.txt, read it with simple.py and
# write result to output.txt, sort the output.txt
# numerically and finally create sorted-output.txt.
# Invalid numbers will be printed to the console

# remember to make your scripts executable with 
# chmod +x before you can directly execute them 
# from the shell like so "./batch.sh"
cat - > input.txt
python simple.py < input.txt > output.txt
sort -n output.txt > sorted-output.txt

We’ve just set up a batch process with three steps. First, the cat program simulates an interactive source system where you input numbers via the console, pressing ENTER to separate them by new lines. Press CTRL+D to end the input and create the file input.txt. Next, data from input.txt is redirected as the standard input to simple.py, which applies the business logic and writes the results to output.txt. Finally, the sort program sorts the data from output.txt and writes the sorted results to sorted-output.txt.

Exactly the same programs can be reused in a streaming processor setup like this:

#!/bin/bash
#stream.sh
cat input.txt | python simple.py | sort -n > sorted-output.txt
# Data can also be streamed from the console using '-' as input file
cat - | python simple.py | sort -n > sorted-output.txt

In this streaming setup, no temporary files are created. Instead, data from cat is directly streamed into simple.py, then through sort, and finally redirected to sorted-output.txt. Each number produced by cat flows directly through the chain of programs.

Blocking and stateful operators

You can monitor sorted-output.txt in a second terminal with the following command to continuously display the file’s tail, waiting indefinitely for new content to be added. To stop the program, press CTRL+C:

tail -f sorted-output.txt

In the streaming setup, you might expect the numbers to appear in sorted-output.txt as soon as you press ENTER, given the streaming nature of the process. However, until you end the stream with CTRL+D, nothing is shown by the tail command.

Why is that so?

In a streaming chain, cat and simple.py can continuously process data. However, sort is a special case because it acts as a blocking operator. It requires a bounded dataset to produce correct results since it needs to know the entire content to sort it properly. Therefore, sort needs a clear beginning and end of data before it can produce a meaningful output.

Blocking operators need to keep state and only output data when the end of the stream is reached. To verify that data is otherwise streamed properly, you can remove the sort step and have simple.py write directly to sorted-output.txt. While this will result in unsorted data, the output will appear immediately after you press ENTER in the console. Use the following script:

#!/bin/bash
#stream-nonblocking.sh
cat - | python simple.py > sorted-output.txt

Sorting is not the only blocking operator; aggregate operators with grouping and set operators like intersect and except also show this behavior. These operators need to wait until the end of the stream to produce complete and correct results. However, aggregate or set operators can provide meaningful intermediate results, incrementally updating them as new data elements are provided.
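
To see such an incrementally updating operator in action, here is a minimal sketch (running_agg.py is a hypothetical helper, not part of the original example). Unlike sort, it keeps simple state and emits a meaningful intermediate result after every element:

# running_agg.py - a stateful but non-blocking aggregate:
# it updates its state (count and sum) per element and immediately
# emits the current intermediate result, unlike the blocking sort
import sys

count, total = 0, 0.0
for line in sys.stdin:
    try:
        number = float(line.strip())
    except ValueError:
        continue                 # ignore invalid input, simple.py already reports it
    count += 1
    total += number
    print(f"count={count} sum={total} avg={total / count}", flush=True)

You can chain it into the streaming setup, for example with cat - | python simple.py | python running_agg.py, and the totals will update as soon as you press ENTER.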

The sort operator is unique in that it requires the entire dataset to be known before it can produce meaningful output. Partial knowledge of the dataset would lead to a preliminary sort that needs constant revision and restructuring. Thus, there is no practically useful way to display incremental intermediate results for a sort, as each new data element could entirely alter the previous order. This makes any interim status not only provisional but potentially misleading and unusable.

Sorting operators face an interesting challenge when dealing with endless data streams. They would block indefinitely if the stream never ends, which isn’t practical for streaming processes with unlimited data.

If you want to observe this behavior, you can feed a very large file into the sort program to simulate an endless stream. The sort program will process until it has accumulated enough data to exhaust available memory (RAM) and temporary disk space that is used to swap out memory.

In practice, the sort program will then output a "sorted window" based on the data it has accumulated up to the point where resources are exhausted. It would indeed produce unsorted data in this case – and it won’t even warn you about this.

Hence, to make blocking operators practical, we use ‘data windows’, which are bounded sections of an endless data stream. This is essentially what batch processing achieves when dividing an endless stream into manageable, bounded data files. Each file represents a window on which blocking operators, such as sorting and aggregation, can operate effectively.

Streaming is batching and batching is streaming?

Streaming and batching can indeed be seen as complementary approaches. From a logical perspective, a batch process that continuously loops over a series of data files is functionally equivalent to a continuous stream process where the windows are defined by the boundaries of those data files. This means batch processing can be seen as a specific subset of stream processing. Consequently, business logic can be implemented independently of whether it is applied to bounded data files or unbounded data streams.

The key exception is the sort operator, which allows for specific optimizations when dealing with bounded data where the size is known before the start. These optimizations are just not feasible for endless data streams.

So to further challenge the misconception that streaming is fundamentally different from batch processing, let’s explore how we can modify our batch process to function more like a stream processor.

We defined the core concept of streaming as processing data as soon as it becomes available, aiming for very low latency to produce immediate results. Obviously, a batch process that gathers data over a long period of time before writing it to a file does not meet this requirement.

But if we collect data for only a short period before writing it to data files and then process these smaller files more frequently, we move closer to the near real-time processing goal of a streaming system.

By reducing the time period even further, we create what is known as a microbatch streaming processor. This system uses batch processing with very short time intervals, effectively functioning like a streaming processor with sufficiently low latency suitable for most use cases. The popular Apache Spark processing engine implements this style of streaming.

The following example illustrates how we can transform our initial batch process into a microbatch streaming process. We simulate a very short time period before data is written using the split utility, which creates a new file every time you enter two new rows in the console (-l 2 option).

Remember that sort is a blocking operator and needs to wait for the end of the stream to produce meaningful output. To display a meaningful intermediate state, we use the uniq program to print the unique numbers observed so far while the batch processes the stream – the implementation is certainly inefficient, but good enough to illustrate the principle.

Start the microbatch process so that it runs in the background, waiting for your input; cat-parts.sh writes your input to a new file every time you enter two more numbers. Enter the string EOF and press ENTER followed by CTRL+D to stop both cat-parts.sh and the background micro-batch.sh process.

#!/bin/bash
#cat-parts.sh
# instead of using cat we use split to
# continuously write your input in max 2 rows simulating smaller batches
split -l 2 - input_
#!/bin/bash
#micro-batch.sh 
# start it with "./micro-batch.sh &" to constantly run 
# in the background waiting for new files to arrive

# start tidy 
rm -f processed-* input_* output.txt

# process any data file "input_*"
keep_running=1
while [ $keep_running == 1 ]
do
  # Loop over all "input_*" files in current directory
  for file in input_*
  do
    if [ -f "$file" ]; then
      grep "^EOF" $file
      keep_running=$?
      if [ $keep_running == 0 ]; then
        mv "$file" "processed-$file"
        break
      fi

      if [ $(wc -l $file | awk '{print $1}') == 2 ]; then
        extension="${file##*_}"

        python simple.py < "$file" >> output.txt

        echo "UNIQUE NUMBERS SO FAR============="
        sort -n output.txt | uniq
        echo "=================================="
        mv "$file" "processed-$file"
      fi
    fi
  done
  sleep 1 # wait a second before next loop 
done

# finally sort over the complete input
sort -n output.txt > sorted-output.txt

In scenarios where very low latency is not required, you can also use the available streaming processor to handle data files in batches rather than continuously processing an endless stream. This approach allows for sorting optimizations that are only feasible with bounded datasets. Apache Flink, another popular stream processing engine, implemented this approach to support batch-style processing.

I might provoke strong reactions from the talented developers behind both open-source streaming projects, but from a practical standpoint, there is often no significant difference between the two approaches for most data engineering use cases. The choice between them generally matters only in specific scenarios with stringent latency requirements.

Data windows

Now that we’ve clarified that the technical differences between streaming and batching can often be abstracted away, let’s take a closer look at how data windows can be defined in a way applicable for both data processing styles.

We defined a window as a bounded section of an endless stream of data. In batch processing we typically split an endless stream into data files with a specific window size, such as one day’s worth of data. However, this batch window mainly helps manage the complexities of processing endless streams. The transformation logic typically needs to work with more granular and differently bounded windows, which often extend beyond the boundaries of the data files.

Hence, we need to define windows independently of whether we’re using batch or stream processing and completely independent of file boundaries. Since SQL is a descriptive transformation language that abstracts away the details of data processing, let’s explore how SQL supports defining a logical data window.

SELECT {window_func} OVER (PARTITION BY {partition_key} ORDER BY {order_key})
FROM table1

In this SQL statement, window_func (such as AVG or SUM) is calculated over a window of data defined by partition_key and ordered by order_key. Both partition_key and order_key can be any attributes from table1.

Let’s use the simple example table product to demonstrate the functionality:

+------------+-------------+-----+
|PRODUCT_NAME|PRODUCT_CLASS|PRICE|
+------------+-------------+-----+
|Laptop      |Electronics  |1200 |
|Smartphone  |Electronics  |800  |
|Desk        |Furniture    |300  |
|Chair       |Furniture    |150  |
+------------+-------------+-----+

To add the class_total price for every product_class, a running_total ordered by product_name, and the grand_total across all products:

select p.*,
   sum(price) over (partition by product_class) as class_total,
   sum(price) over (partition by product_class order by product_name) as running_total,
   sum(price) over () as grand_total
from product p
---
+------------+-------------+-----+-----------+-------------+-----------+
|PRODUCT_NAME|PRODUCT_CLASS|PRICE|class_total|running_total|grand_total|
+------------+-------------+-----+-----------+-------------+-----------+
|Laptop      |Electronics  |1200 |2000       |1200         |2450       |
|Smartphone  |Electronics  |800  |2000       |2000         |2450       |
|Chair       |Furniture    |150  |450        |150          |2450       |
|Desk        |Furniture    |300  |450        |450          |2450       |
+------------+-------------+-----+-----------+-------------+-----------+

Without going into further details on window frames, which provide even more features for defining calculation windows, the principle should be clear – for more information, refer for instance to the PostgreSQL documentation on window functions. These versatile possibilities to define windows allow your application to calculate a wide range of variations based on your data, completely independent of the underlying data processing style.

Especially for stream processing, versatile calculations based on time intervals are very important. SQL supports the definition of ranges around a time attribute like this:

select sum(price) over (partition by product_class
                        order by event_time
                        range between interval '1 day' preceding
                                  and interval '2 days' following)
       as sliding_price
from product

However, in stream processing we often need to define tumbling or hopping windows that cannot be expressed with this SQL syntax. Apache Spark, for example, supports the following function to define even more versatile windows:

def window(timeColumn: org.apache.spark.sql.Column,
           windowDuration: String,
           slideDuration: String,
           startTime: String)

# Example calls

# Nonoverlapping or tumbling windows with just the windowDuration
# it results in time intervals like: 10:00-10:05, 10:05-10:10, 10:10-10:15
window( col("event_time"), "5 minutes" )

# Overlapping or sliding windows with slideDuration
# produces windows of 5 minutes size, every minute
# it results in time intervals like: 10:00-10:05, 10:01-10:06, 10:02-10:07
window( col("event_time"), "5 minutes", "1 minute" )

# This sliding window is identical to the above tumbling window
window( col("event_time"), "5 minutes", "5 minutes" )

# This is a window with 2 minutes offset at the start
# it results in time intervals like: 10:02-10:07, 10:03-10:08, 10:04-10:09
window( col("event_time"), "5 minutes", "1 minute", "2 minutes" )

Practically all processing engines, whether in databases or as streaming processors, offer these sophisticated window definition capabilities or at least a significant subset thereof – many by also supporting SQL statements to be applied to the stream.
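
As a brief, hedged illustration of that last point (the streaming DataFrame events and its columns event_time and price are assumptions, not part of the earlier examples), Spark Structured Streaming lets you register a streaming DataFrame as a view and apply a SQL window definition to it:

# a minimal sketch: applying SQL with a time window to a streaming DataFrame
events.createOrReplaceTempView("events")

windowed = spark.sql("""
    SELECT window(event_time, '5 minutes') AS time_window,
           sum(price)                      AS total_price
    FROM events
    GROUP BY window(event_time, '5 minutes')
""")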

Event time and processing time

When working with endless streams of data, it’s crucial to understand how to handle time. In any data processing system, there are typically two key domains of time to consider:

  • Event time is the time when events actually occurred in reality.
  • Processing time is the time when events are observed in the system. This could be further distinguished into source processing time, which is when events are initially observed, and further downstream processing times as data flows through the chain of processes in the enterprise.

In an ideal real-time streaming scenario, event time and processing time would be the same, with events processed immediately as they occur. However, in reality, there is often a significant skew between event time and processing time, even if we aim for near real-time streaming throughout our systems. In a batch system, the processing-time lag will at least equal the batch cycle duration before the batch starts. This lag decreases as the batch progresses, bringing processing time closer to event time until the batch is finished and waits again for its next cycle.

Processing Time vs Event Time – Image by author

We can avoid many correctness issues by consistently using event time in our business logic. Unfortunately, many early streaming systems did not make this distinction, leading to the misconception that streaming systems are less reliable than the seemingly accurate batch systems. This challenge even led to the development of the Lambda architecture, which aimed to combine separate batch and streaming setups. It added a supposedly less reliable streaming path to meet low latency requirements on top of a batch path as the foundation with greater reliability.

However, even batch systems can fail to correctly account for event-time windows if relevant events arrive after the batch snapshot has been extracted from the source system. If we calculate based solely on the data in the batch window, late-arriving events will be ignored, resulting in inaccurate results.

The problem of straggler events must be addressed as soon as we have defined time windows or decide to publish the information accumulated up to the current reporting date. Because the skew between event time and processing time can vary significantly, we need a function that sets a watermark (a point in time) to decide when a window shall be considered complete.
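
To make the watermark idea concrete, here is a deliberately simplified sketch in plain Python (not how any particular engine implements it): the watermark trails the maximum event time seen by an allowed lateness, and a tumbling window is emitted and closed once the watermark passes its end; anything arriving later is a straggler.

# watermark_sketch.py - a simplified illustration of watermark-based window closing
from collections import defaultdict

WINDOW_SIZE = 60        # tumbling windows of 60 seconds
ALLOWED_LATENESS = 30   # how much event-time/processing-time skew we tolerate

windows = defaultdict(list)   # window start -> collected values
closed = set()
max_event_time = 0

def on_event(event_time, value):
    global max_event_time
    window_start = (event_time // WINDOW_SIZE) * WINDOW_SIZE
    if window_start in closed:
        print(f"straggler ignored: {value} (event_time={event_time})")
        return
    windows[window_start].append(value)
    max_event_time = max(max_event_time, event_time)
    watermark = max_event_time - ALLOWED_LATENESS
    # emit and close every window whose end lies at or before the watermark
    for start in sorted(w for w in windows if w + WINDOW_SIZE <= watermark):
        print(f"window {start}-{start + WINDOW_SIZE}: sum={sum(windows.pop(start))}")
        closed.add(start)

# the late event (15, 5.0) arrives after its window has already been closed
for t, v in [(10, 1.0), (20, 2.0), (65, 3.0), (95, 4.0), (15, 5.0)]:
    on_event(t, v)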

The key takeaway is that achieving accurate results doesn’t depend on the processing style – whether batch or streaming – but rather on reliable data processing and the correct application of event time and processing time in our business applications.

Exactly-once semantics

Regardless of which processing style is used, we need to ensure the reliability of our data systems. Beyond correct time handling, there is a second technical issue that many data processing systems do not handle correctly, but which can be solved by robust exactly-once semantics.

It’s very clear that any risk of data loss in a processing pipeline is unacceptable. Therefore, many systems implement at-least-once guarantees, ensuring that no data element gets lost and each is processed at least once. But this does not prevent another subtle issue that can lead to duplicate processing of the same data elements. If a data processor fails for any reason, gets restarted, and accidentally re-executes already processed data elements, we will produce incorrect results. These seemingly small technical issues can quickly accumulate into unacceptable quality problems within your data systems.

Reliable data processing systems therefore need to guarantee that every data element gets processed exactly once (or better, effectively once), even in situations where systems crash because of hardware failures or other software-related errors.

Unfortunately the convenient pipe abstraction from the operating system does not help to provide such a guarantee. If a process writes data to a pipe and the consumer or the producer crashes or fails for some reason, the data may remain in an inconsistent state. If an error occurs the consumer may have only read part of the data. The same is true for writing to files – if a batch process fails to completely write the transformed source data to the target file, the data may be in an inconsistent state.

In a streaming setup without an overall system providing the important exactly-once guarantee, every program involved in the flow would therefore have to implement respective error handling. While this is possible, it transfers a large part of the technical effort to the applications. Therefore data engineering should provide the missing exactly-once semantics to the basic streaming functionality as offered by the operating system. That’s exactly what professional streaming engines offer on top of their capabilities to scale.

Even the seemingly simpler batch processing style on files does not come with built-in exactly-once guarantees. It’s up to the batch programs to either use transactions (ACID in databases) or implement idempotent processes that always produce the same output, regardless of how often and at what time they are started. Although it’s best practice to design idempotent processes, I have had to fix many batch systems that suffered from inconsistency problems caused by missing exactly-once guarantees.
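
As a minimal sketch of one such idempotent pattern (file and function names are assumptions for illustration), the batch result is written to a temporary file and atomically renamed, so a crashed or repeated run either publishes the complete target file or leaves the previous one untouched:

# idempotent_write.py - write-then-rename: the target never contains partial results
import os

def write_batch_atomically(lines, target="sorted-output.txt"):
    tmp = target + ".tmp"
    with open(tmp, "w") as f:        # write the complete result to a temporary file
        for line in lines:
            f.write(line + "\n")
        f.flush()
        os.fsync(f.fileno())         # make sure the data actually reached the disk
    os.replace(tmp, target)          # atomic on POSIX: all-or-nothing publish

# re-running the whole batch simply overwrites the target with the same result
write_batch_atomically(["150.0", "300.0", "800.0", "1200.0"])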

Scaling to the enterprise level

We have seen that data engineering can provide reliable data systems offering both processing styles – batch as well as streaming – without application developers having to reason about the technicalities.

We have also seen that basic abstractions offered by the operating system simplify our implementation for streaming but do not provide the important exactly-once guarantee needed when working with huge volumes of data on the enterprise level.

What we can still do to simplify application developers’ work is to enable the implemented logic to scale without any changes to the business logic itself.

Vertical scaling on single systems

At the level of a single system, scaling can largely be handled by the operating system itself through the abstraction of files as redirectable byte-stream pipes. Streaming applications can even use several pipes if the standard pipes (input, output, and error) are supplemented by named pipes. These behave like files in that they exist in the file system under a name, but they function like streaming pipes with full coordination between producers and consumers, as sketched below.
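
As a small sketch (pipe and file names are assumptions), a named pipe created with mkfifo can replace the intermediate output.txt from our batch example, letting simple.py and sort run concurrently while still looking like a file in the file system:

#!/bin/bash
#named-pipe-demo.sh
# the named pipe exists in the file system like a file,
# but behaves like a pipe with blocking coordination
mkfifo valid_numbers
python simple.py < input.txt > valid_numbers &
sort -n < valid_numbers > sorted-output.txt
rm valid_numbers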

In general it’s advisable to scale single systems vertically to the limits before trying to scale out horizontally using cluster solutions. As long as the data volumes can be handled by a single system, it’s way simpler to work on the well-known abstractions from the operating system. Today’s computing power in a single system actually makes it possible to process truly large amounts of data. I know of many projects where complicated cluster configurations were used because the client simply had defined this to be the standard. In most cases, they would have been better advised to use a single, more powerful computer instead.

The popular streaming engines Apache Spark and Apache Flink both provide sophisticated windowing capabilities and exactly-once guarantees for improved fault tolerance. They can also be started in local mode on a single system and perfectly scale vertically without a complicated cluster setup.

They both provide unified APIs for batch and streaming so that we can implement in the processing-agnostic style described. With the Apache Beam programming model, we even have an overarching unified API that abstracts from the concrete processing engines configured. By using Beam, processing engines like Spark, Flink, and others can transparently be configured as so-called ‘runners’, allowing an even more decoupled and flexible architecture.
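
To illustrate the idea, here is a minimal, hedged sketch of the simple.py logic expressed with Beam’s Python SDK (file names and the runner choice are assumptions): switching from the local DirectRunner to, say, a Spark or Flink runner is a matter of configuration, not code changes.

# beam_simple.py - the same business logic, decoupled from the processing engine
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def transform(line):
    number = float(line.strip())
    return number * 100 if number < 10 else number

def is_number(line):
    try:
        float(line.strip())
        return True
    except ValueError:
        return False

# the runner (DirectRunner, SparkRunner, FlinkRunner, ...) is pure configuration;
# the transformation steps below stay untouched
with beam.Pipeline(options=PipelineOptions(runner="DirectRunner")) as p:
    (p
     | "Read" >> beam.io.ReadFromText("input.txt")
     | "KeepNumbers" >> beam.Filter(is_number)
     | "Transform" >> beam.Map(transform)
     | "Format" >> beam.Map(str)
     | "Write" >> beam.io.WriteToText("output"))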

However, these solutions, for all their quality, are unfortunately relatively poorly integrated into the surrounding Unix-like system. There is actually no easy and direct way to consume data from the standard input/output that we have recognized as the main operating system abstraction for data pipes. We can work around that restriction by streaming the standard input to a local socket and reading from the local socket, but Spark will warn you with this message:

WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.

That is Spark’s way to inform you that this mode won’t support the exactly-once guarantee and thus should be avoided for a production setup. We will nevertheless use it for illustration of the concept and output to the console to see Spark Streaming in action. As a replacement for standard input/output and sockets on the enterprise level, we will later see that Apache Kafka or Pulsar can be used for horizontal scaling and also for more fault tolerant processing on a single system.

We can use the netcat (nc) tool to redirect the stream of data to the local network socket 9999 to be consumed by Spark – instead of standard input ‘-‘ any large data file can be used as well:

# Start your "source system" for interactive input of numbers
# and stream them directly to the local socket 9999
cat - | netcat -l -c -p 9999

The following Python program spark-stdin.py starts Spark in batch or streaming mode to either read from the file input.txt or from the local socket 9999, consuming the numbers that you entered in the other terminal.

#spark-stdin.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when, window
import sys

def init_spark():
    # same logic for batch and streaming 
    spark = (SparkSession.builder 
        .appName("ReadFromStdinAndWriteToConsole")
        .getOrCreate())

    spark.sparkContext.setLogLevel("ERROR")
    return spark

def read_data_stream(spark):
    nums = (spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load())
    return nums

def read_data_batch(spark):
    nums = (spark.read
        .format("text")
        .load("input.txt"))
    return nums

def process_data(nums):
    result_df = (nums.withColumn("num", when(col("value").rlike("^[+-]?([0-9]*[.])?[0-9]+$"), col("value").cast("float"))
                                        .otherwise(None))
                    .filter(col("num").isNotNull())
                    .withColumn("res", expr("case when num < 10.0 then num * 100 else num end"))
                    .selectExpr("res", "current_timestamp() as event_time"))
    return result_df

def write_data_stream(df):
    query = (df.withWatermark("event_time", "10 seconds")
        .groupBy("res").count()
        .writeStream.outputMode("update")
        .option("truncate", "false")
        .format("console").start())
    query.awaitTermination()

def write_data_batch(df):
    # remember that Spark writes into directories. The actual file will be found in the directory
    (df.sort("res")
       .coalesce(1)
       .write
       .format("csv")
       .mode("overwrite")
       .save("sorted-output.txt"))

if __name__ == "__main__":
    if len(sys.argv) != 2 or sys.argv[1] not in ["STREAM", "BATCH"]:
        print(f"Usage: python {sys.argv[0]} [STREAM|BATCH]")
        sys.exit(1)

    mode = sys.argv[1]
    if mode == "STREAM":
        read_data = read_data_stream
        write_data = write_data_stream
    else:
        read_data = read_data_batch
        write_data = write_data_batch

    df = process_data(read_data(init_spark()))
    write_data(df)

Start your Spark streaming job using spark-submit for local mode – the local[*] indicates to use all CPUs available in your system:

spark-submit --master local[*] spark-stdin.py STREAM

As the batch process can directly read from the data file, we use different Spark functions for reading and writing (read_data* and write_data*), but identical logic for transforming the data; only the batch writer additionally applies the final sort over the bounded result. Just use "BATCH" instead of "STREAM" as the parameter to run it as a batch process.

To avoid the more complicated setup of large-scale processing engines, our dynamic data engineering world has created a new in-process system called DuckDB, which is currently gaining traction in the community. DuckDB is designed as an analytical processing engine with database capabilities, including straightforward integration with standard pipes and a very easy setup. It can efficiently use all available resources and therefore scales perfectly on single systems for batch processing.

However, the system does not yet have a stable streaming mode. But we can use our self-engineered microbatch setup to run it as a streaming system for illustration. We would need to implement exactly-once guarantees ourselves for fault tolerance in production setups, which is of course not an easy task. But as DuckDB development is very active and the system’s memory management addresses the needs of streaming systems well, let’s hope for more streaming support to be provided soon.

Just to demonstrate the idea, here is DuckDB used as the processing engine to replace simple.py, sort, and uniq in our illustrative microbatch setup:

#!/bin/bash
#micro-batch-duckdb.sh 
# start it with "./micro-batch-duckdb.sh &" to constantly run 
# in the background waiting for new files to arrive

# start tidy 
rm -f processed-* input_* output_* output.txt

# process any data file "input_*"
keep_running=1
while [ $keep_running == 1 ]
do
  # Loop over all "input_*" files in current directory
  for file in input_*
  do
    if [ -f "$file" ]; then
      grep "^EOF" $file
      keep_running=$?
      if [ $keep_running == 0 ]; then
        mv "$file" "processed-$file"
        break
      fi

      if [ $(wc -l $file | awk '{print $1}') == 2 ]; then
        extension="${file##*_}"

        # replace python simple.py
        duckdb -noheader -csv -cmd "SET enable_progress_bar=False;" -c \
           "select case when num < 10.0 then num * 100 else num end as num2 
            from read_csv('$file', header=False, columns={'num': 'FLOAT'},
                         auto_detect=False, ignore_errors=True, store_rejects=True);
            select 'Invalid input:'; select csv_line, error_message from reject_errors;" | 
            split -p '"Invalid input:"' - output_
        cat output_aa >> output.txt
        cat output_ab

        echo "UNIQUE NUMBERS SO FAR============="
        duckdb -noheader -csv -cmd "SET enable_progress_bar=False;" -c \
           "select distinct num 
            from read_csv('output.txt', header=False, columns={'num': 'FLOAT'},
                         auto_detect=False, ignore_errors=True, store_rejects=True)
            order by 1;"
        echo "=================================="
        mv "$file" "processed-$file"
      fi
    fi
  done
  sleep 1 # wait a second before next loop 
done

# finally sort over the complete input
duckdb -noheader -csv -cmd "SET enable_progress_bar=False;" -c \
   "from read_csv('output.txt', header=False, columns={'num': 'FLOAT'},
                 auto_detect=False, ignore_errors=True, store_rejects=True)
    order by 1;" > sorted-output.txt

Horizontal scaling with clusters

If the data volumes have grown so big that we cannot process them on a single system anymore, we can scale out horizontally using clusters. If further extended to the cloud, we practically have no limits regarding the volumes that can be handled. However, as mentioned in the section on single systems, this comes with additional setup and maintenance effort. If your company uses cloud infrastructure entirely, or allows hybrid use together with on-premises systems, you may be able to streamline the setup and maintenance effort by using pre-configured IaaS/PaaS/DPaaS solutions offered by your vendors.

We recognized that the popular unified processing engines Apache Spark and Flink can be run in local mode on single systems. These systems can also be configured to run in very large cluster setups, and this works without recoding any of your applications. There is, however, the issue of the data pipes provided as standard input/output on local systems. We do not have a direct replacement for these pipes at the enterprise level that integrates transparently with cluster systems like Kubernetes or Hadoop/YARN.

For batch processing, the well-known Hadoop/HDFS offers the replacement closest to the file systems available on your single system. If you use Beam, Spark, and/or Flink, you won’t need to change your code beyond configuring your file paths in a system-independent way.
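
For illustration (the hdfs:// path below is an assumption), the only change to read_data_batch from our spark-stdin.py example when moving to a cluster with HDFS would be the path:

def read_data_batch(spark):
    # identical logic - only the path scheme changes when moving to the cluster
    nums = (spark.read
        .format("text")
        .load("hdfs:///data/streams/input.txt"))   # was: "input.txt" on the single system
    return nums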

For streaming pipelines, Apache Kafka and more recently Apache Pulsar have emerged as log-based message broker systems that can act as a direct replacement for the Unix-like pipes between nodes in the cluster. They also provide the fault-tolerance functionality we found missing with pipes. Unfortunately, this functionality is not tightly integrated into the operating system: Kafka pipes (or topics, as they are called there) cannot be used as a transparent abstraction in the way distributed file systems like HDFS replace local files.

But as many streaming systems have interfaces to Kafka (Spark and Flink have it), it is quite easy to replace that specific part of your code. For our illustrative spark-stdin.py we would only have to change the read_data_stream function, given a Kafka system (here on localhost:9092) is up and running:

def read_data_stream(spark):
    nums = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "stdin")
        .load()
        )
    return nums

This would also be the preferable operating mode for streaming on single systems, as we have seen that the built-in Unix-like pipes and sockets do not provide any fault-tolerance. We can instead use Apache Kafka or Pulsar to get pipelines that scale well and also support the exactly-once semantics implemented in streaming engines like Apache Spark.
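
As a hedged sketch of what the matching output side could look like (the topic name and checkpoint path are assumptions), the checkpoint location is what lets Spark recover its progress after a failure instead of starting from scratch:

def write_data_stream_kafka(df):
    query = (df.selectExpr("CAST(res AS STRING) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "results")
        .option("checkpointLocation", "/tmp/checkpoints/results")
        .start())
    query.awaitTermination()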

The full potential of these systems can be exploited when used in cluster setups by scaling horizontally and thereby providing extended fault tolerance and improved network efficiency.

To scale the business application from a single system to a distributed cluster setup, we would only have to change the kafka.bootstrap.servers option. Of course, managing a horizontally scaling cluster is much more complicated and requires the configuration of security, further partitioning of topics, monitoring, high availability, network and latency considerations, to name a few.

But if you designed your application in the processing-style-agnostic way described, you won’t have to change anything in your business logic, and both processing styles can still be combined to allow for truly unified processing of data at the enterprise level.


We have covered a lot of ground, and hopefully this helps you see the end-to-end possibilities and the benefits of unifying batch and stream processing.

Unix-like systems did a great job on abstracting away the differences between files and pipes to allow for a processing style agnostic implementation of business logic. We should retain this simplifying abstraction when we scale our systems to the enterprise level and add exactly-once semantics to further increase fault tolerance and reliability in our systems.

This setup is also a perfect architectural fit for the data infrastructure in the adapted Data Mesh described in my three-part series "Challenges and Solutions in Data Mesh". Such a data infrastructure needs to scale out horizontally and, at the same time, abstract away whether data is processed in a batch or streaming manner. This greatly simplifies the applications’ task of making data publicly available, which consumers can then use completely transparently, whether they have low or more tolerant latency requirements.



If you find this information useful, please consider clapping. I would be more than happy to receive your feedback with your opinions and questions.

