The world’s leading publication for data science, AI, and ML professionals.

Mastering Data Streaming in Python

Best Practices for Real-Time Analytics

AI-generated image using Kandinsky
AI-generated image using Kandinsky

In this article, I will address the key challenges data engineers may encounter when designing streaming data pipelines. We’ll explore use case scenarios, provide Python code examples, discuss windowed calculations using streaming frameworks, and share best practices related to these topics.

In many applications, having access to real-time and continuously updated data is crucial. Fraud detection, churn prevention and recommendations are the best candidates for streaming. These data pipelines process data from various sources to multiple target destinations in real time, capturing events as they occur and enabling their transformation, enrichment, and analysis.


Streaming data pipeline

In one of my previous articles, I described the most common data pipeline design patterns and when to use them [1].

Data pipeline design patterns

A data pipeline is a sequence of data processing steps, where each stage’s output becomes the input for the next, creating a logical flow of data.

A data pipeline exists whenever data is processed between two points, such as from source to destination

The three key components of a data pipeline are the source, the processing step(s), and the destination. For example, data extracted from an external API (source) can be loaded into a data warehouse (destination), illustrating a common scenario where the source and destination are distinct.

In streaming, the source is typically a publisher service, while the destination is a consumer – such as an application or another endpoint – of the processed data. This data often undergoes transformations using windowed calculations. A great example would be a session window defined by an inactivity period following the last event (Google Analytics 4, etc).

Conceptual data pipeline design. Image by author
Conceptual data pipeline design. Image by author

Actual application example

Consider a simple stream data processing application example built with Python, Kafka and Faust below. High-level application logic would be the following:

  1. API service app/app_main.py allows to POST valid user engagement events to Kafka producer topic. These events can be collected either from the website, a mobile application or sent by another service such as a data publisher of some sort.
{
    "event_type": "page_view",
    "user_id": "e659e3e7-22e1-4a6b",
    "action": "alternative_handset",
    "timestamp": "2024-06-27T15:43:43.315342",
    "metadata": {
        "session_id": "4b481fd1-9973-4498-89fb",
        "page": "/search",
        "item_id": "05efee91",
        "user_agent": "Opera/8.81.(X11; Linux x86_64; hi-IN) Presto/2.9.181 Version/12.00"
    }
}

Event validation can be performed by pydantic and accepts events with valid types and actions, etc. [2]

Our application constantly consumes processed events from consumer topics and sends them to WebSocket so real-time processing can be visualized.

Python for Data Engineers

  1. Data processing service consumes raw events from producer topic, then applies a window calculation (tumbling table) and sends aggregated results by user to consumer topic every 10 seconds. This can be some sort of a streaming framework like kafka-streams or faust .
  2. This stream of constantly processed data can be instantly consumed by API service and visualized at localhost:8000/monitor.

For example, we can process raw user engagement events every 10 seconds to generate a user leaderboard based on a simple count of events per user

Streaming leaderboard app example. Image by author.
Streaming leaderboard app example. Image by author.
Python">import os
import json
import logging
import asyncio
import uvicorn
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.staticfiles import StaticFiles
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from app.models import Event

from .http import events, users
from dotenv import load_dotenv
load_dotenv()

logging.basicConfig(level=logging.INFO)

# kafka_brokers = os.getenv("REDPANDA_BROKERS")
kafka_brokers = (
    "redpanda-0:9092"
    if os.getenv("RUNTIME_ENVIRONMENT") == "DOCKER"
    else "localhost:19092"
)
consumer_topic = os.getenv("CONSUMER_TOPIC")
producer_topic = os.getenv("PRODUCER_TOPIC")
error_topic = os.getenv("ERROR_TOPIC")

def kafka_json_deserializer(serialized):
    return json.loads(serialized)

@asynccontextmanager
async def startup(app):
    app.producer = AIOKafkaProducer(
        bootstrap_servers=[kafka_brokers],)
    await app.producer.start()

    app.consumer = AIOKafkaConsumer(
        consumer_topic,
        # "agg-events",
        # group_id="demo-group",
        # loop=loop,
        bootstrap_servers=[kafka_brokers],
        enable_auto_commit=True,
        auto_commit_interval_ms=1000,  # commit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
        value_deserializer=kafka_json_deserializer,
    )
    await app.consumer.start()

    yield

app = FastAPI(lifespan=startup)
app.mount("/static", StaticFiles(directory="static"), name="static")

app.include_router(events.router)
app.include_router(users.router)

# WebSocket endpoint
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    async def send_message_to_websocket(msg):
        text = str(msg.value)
        await websocket.send_text(text)

    async def consume_from_topic(topic, callback):
        print(f"Consuming from {topic}")
        async for msg in app.consumer:
            print(f"Received message: {msg.value}")
            await callback(msg)

    # Start consuming
    asyncio.create_task(consume_from_topic(consumer_topic, send_message_to_websocket))

    # Keep the connection open
    while True:
        await asyncio.sleep(3)

@app.post("/track")
async def send_event_to_topic(event: Event):

    try:
        data = event.model_dump_json()
        data = data.encode()

        # # Validate the presence of required fields
        # We could do something like this but Pydantic will do
        # everything for us.
        # if "user_id" not in data or "action" not in data:
        #     raise HTTPException(
        #         status_code=422, detail="Incomplete data provided")
        user_id = event.user_id

        # Send filename to Redpanda
        await app.producer.send(producer_topic, data)
        # Returning a confirmation message
        return {"message": "User data submitted successfully!",
                "user_data": {"user_id": user_id}}

    except HTTPException as e:
        # Re-raise HTTPException to return the specified
        # status code and detail
        print(e)
        raise e
    except Exception as e:
        # Handle other unexpected exceptions and return a
        # 500 Internal Server Error
        print(e)
        raise HTTPException(
            status_code=500, detail=f"An error occurred: {str(e)}")

if __name__ == "__main__":
    uvicorn.run("app.app_main:app", host="0.0.0.0", port=8000)

# Run:
# uvicorn app.app_main:app --reload --host 0.0.0.0 --port 8000
# python -m app.app_main api -l info

The code for the stream-processing service can be found further down below.

Kafka, Kinesis, RabbitMQ and other stream-processing tools

Let’s take a look into popular data Streaming platforms and frameworks that proved themselves most useful over the last couple of years.

  • Apache Spark – a framework for distributed data computing for large-scale analytics and complex data transformations.
  • Apache Kafka – a real-time data pipeline tool with a distributed messaging system for applications. It uses a publish-subscribe model where producers send data to topics, and consumers pull data from those topics. Each topic is split into partitions that are replicated across different servers for better availability and to balance the load. Plus, Kafka has built-in fault tolerance, so you can set the replication factor and how many in-sync replicas (ISRs) you want for each topic. This means your data stays accessible, even if some servers go down.
  • AWS Kinesis is a real-time streaming platform for analytics and applications. I previously wrote about it here [3].

Building a Streaming Data Pipeline with Redshift Serverless and Kinesis

  • Google Cloud Dataflow – Google’s streaming platform for real-time event processing and analytics pipelines.
  • Apache Flink – a distributed streaming data platform designed for low-latency data processing.
  • RabbitMQ is an open-source message broker that facilitates communication between applications. It uses a queuing system based on the Advanced Message Queuing Protocol (AMQP), allowing you to send, receive, and route messages efficiently. RabbitMQ helps decouple application components, enabling them to work independently and scale easily.

Kafka is one of my favourite distributed streaming platforms that lets you publish and subscribe to data streams, process them in real time, and store them reliably. It was built for Java but is now also available for Python developers (kafka-python)).

One thing I like about it is a built-in window methods that simplify session calculations.

For example, using a faust-streamingframework this can be achieved with ease. Consider this code below. It demonstrates how our application would calculate a windowed aggregation for each user:


import os
import random
from datetime import datetime, timedelta
import faust
SINK = os.getenv("CONSUMER_TOPIC")
TOPIC = os.getenv("PRODUCER_TOPIC")

BROKER = (
    "redpanda-0:9092"
    if os.getenv("RUNTIME_ENVIRONMENT") == "DOCKER"
    else "localhost:19092"
)

TABLE = "tumbling-events"
CLEANUP_INTERVAL = 1.0
WINDOW = 10  # 10 seconds window
WINDOW_EXPIRES = 10
PARTITIONS = 1

app = faust.App("event-stream", broker=f"kafka://{BROKER}")

app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=Event)
sink = app.topic(SINK, value_type=UserStats)

@app.timer(interval=3.0, on_leader=True)
async def generate_event_data():
    events_topic = app.topic(TOPIC, key_type=str, value_type=Event)
    allowed_events = [e.value for e in AllowedEvents]
    allowed_actions = [e.value for e in AllowedActions]

    # Create a loop to send data to the Redpanda topic
    # Send 20 messages every time the timer is triggered (every 5 seconds)
    for i in range(20):
        # Send the data to the Redpanda topic

        await events_topic.send(
            key=random.choice(["User1", "User2", "User3", "User4", "User5"]),
            value=Event(
                event_type=random.choice(["page_view", "scroll"]),
                user_id=random.choice(["User1", "User2", "User3", "User4", "User5"]),  # noqa: E501
                action=random.choice(["action_1", "action_2"]),
                timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),

            ),
        )

    print("Producer is sleeping for 3 seconds 😴 ")

def window_processor(key, events):
    try:
        timestamp = key[1][0]  # key[1] is the tuple (ts, ts + window)
        print(f'key:: {key}')

        users = [event.user_id for event in events]
        count = len(users)

        user_counter = Counter(event.user_id for event in events)
        for i, (user, count) in enumerate(user_counter.items()):
            print(f"{i}. {user}: {count}")
            aggregated_event = UserStats(
                timestamp=timestamp, user_id=user, count=count
            )
            print(
                f"Processing window: {len(users)} events, Aggreged results: {aggregated_event}"  # noqa: E501
            )
            sink.send_soon(value=aggregated_event)

    except Exception as e:
        print(e)

tumbling_table = (
    app.Table(
        TABLE,
        default=list,
        key_type=str,
        value_type=Event,
        partitions=PARTITIONS,
        on_window_close=window_processor,
    )
    .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
    .relative_to_field(Event.timestamp)
)

@app.agent(app.topic(TOPIC, key_type=str, value_type=Event))
async def calculate_tumbling_events(events):
    async for event in events:
        value_list = tumbling_table["events"].value()
        value_list.append(event)
        tumbling_table["events"] = value_list

if __name__ == "__main__":
    app.main()

Now type this in the command line to launch our steaming module:

faust -A streamer.event_stream worker -l info

As a result, we should see event aggregation happening in real time:

Image by author
Image by author

Choosing the right Python client

Some of the popular Python clients for Kafka are confluent-kafka, pykafka and kafka-python.

confluent-kafka is a Python wrapper for the high-performance librdkafka C library, offering full Kafka feature support along with extras like AVRO serialization and schema registry integration. It includes both high-level and low-level APIs for greater flexibility and control over your Kafka applications.

Big Data File Formats, Explained

pykafka and kafka-python clients are the most widely used and were designed with performance and simplicity in mind. They support all Kafka features and include extras like balanced consumers, Zookeeper integration, and partition management.

Each Python library has its own pros and cons.

Simplicity of use can be important for beginner-level practitioners. This would be all about documentation, code readability, API design, and error handling. Pykafka is often considered the simplest, with a clear API, good documentation, and user-friendly error messages. Conversely, kafka-python and confluent-kafka-python tend to be less simple, featuring more complex APIs, less comprehensive documentation, and less clear error messages.

On the other hand, confluent-kafka-python and kafka-python provide the most comprehensive Kafka feature support, covering all Kafka capabilities and offering both high-level and low-level APIs. In contrast, pykafka has limited feature support and only provides a high-level API. Generally, confluent-kafka-python offers the best performance due to its foundation on the optimized librdkafka C library, while kafka-python and pykafka have lower performance because they are pure Python implementations with more overhead.

You can install kafka-python using pip: pip install kafka-python and our application file would look like this:

# app.py

from kafka import KafkaProducer, KafkaConsumer

import json
producer = KafkaProducer(bootstrap_servers=['brokerA:9092', 'brokerB:9092'],
                         key_serializer=lambda k: json.dumps(k).encode(),
                         value_serializer=lambda v: json.dumps(v).encode())

# Send data to Kafka topic 'my_topic'
future = producer.send('my_topic', key='hello', value='world')
try:
    result = future.get(timeout=10)
    print(result)
except Exception as e:
    print(e)

To consume data from a topic we would want to use poll method. It will return a list of topic partitions with our JSON records:

import json
consumer = KafkaConsumer(bootstrap_servers=['brokerA:9092', 'brokerB:9092'],
                         group_id='my-group',
                         topics=['my_topic'],
                         key_deserializer=lambda k: json.loads(k.decode()),
                         value_deserializer=lambda v: json.loads(v.decode()),
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         max_poll_records=10)

for record in consumer:
    print(record.key, record.value)

Kafka Best Practices

So how do we improve our Kafka application reliability and performance?

There is no universal solution for optimizing Kafka applications

The performance and reliability of our Kafka applications can be affected by various factors, including the following:

  • configuration and number of producers and consumers,
  • message size and frequency,
  • network bandwidth and latency,
  • hardware and software resources,
  • potential failure scenarios.

Typically, these are the general recommendations for fault-tolerant and performant design:

Use Replication: Replication creates multiple data copies across brokers, ensuring accessibility during failures and improving performance through load balancing. However, it can lead to increased disk usage and network traffic. Set your replication factor and the number of in-sync replicas (ISRs) based on your availability and consistency requirements.

Enable Batching: Batching groups multiple messages into a single request, reducing network overhead and increasing throughput. It also improves compression efficiency. Enable batching on both producers and consumers, adjusting the batch size and linger time to meet your latency and throughput goals.

Partition it wisely: The number of partitions for each topic affects scalability, parallelism, and performance. More partitions allow for greater producer and consumer capacity, distributing load effectively across brokers. However, excessive partitions can increase network traffic. Thus, select partition counts based on your expected throughput and latency.

Enable Compression: Compression reduces message size. It’s as simple as that. It can reduce disk space usage and send data transfer. While it incurs some CPU overhead for processing, selecting your compression algorithm and level is considered the best practice as well.

Error handling

Python Kafka clients offer a range of methods and options for managing errors and exceptions, including:

  • Error Handling Strategies: Strategies relate to retrying, ignoring, or failing fast on errors. Data engineers can determine how to address different error types while ensuring reliability and data consistency. For example, parameters such as max_in_flight_requests_per_connectio ,retriesand retry_backoff_ms can be used for the kafka-python producer. In the confluent-kafka-python consumer, we could use parameters like enable.auto.commit, auto.commit.interval.ms, and enable.auto.offset.store.
  • Exception Handling: The try, except, and finally blocks can be employed to handle various exception types and execute cleanup or recovery procedures. Standard Python exception handling. For example, there is a KafkaError class in the kafka-python and confluent-kafka-python clients. In the pykafka client we have PyKafkaException class.
  • Error Callbacks: Callbacks can be useful when we need to log errors, raise exceptions, or implement recovery actions. Callback functions can be assigned to the constructors of producers and consumers, triggering whenever an error or exception occurs at the client level. For instance, the error_cb parameter is available for both the kafka-python and confluent-kafka-python clients, while the on_error parameter is used in the pykafka client.

For example, one of the most common error-handling patterns is a dead-letter queue. Events from the source topic can diverge into two separate paths:

  1. The application successfully processes each event in the source topic and publishes them to the target topic (a sink) if everything is okay.
  2. Events that cannot be processed – such as those lacking the expected format or missing required attributes – are directed to the error topic.

Another useful error-handling pattern is to implement a retry queue. For instance, events could be routed to the retry topic if the item’s parameter is unavailable at that moment. For example, it can happen if this feature is being processed by another service and it wasn’t available at the time of request.

This recoverable condition should not be classified as an error

Instead, it should be periodically retried until the required conditions are satisfied. Introducing a retry topic allows for immediate processing of most events while deferring the handling of certain events until the necessary conditions are fulfilled.

Conclusion

Streaming techniques (tumbling and hopping windows, sessions, etc.) combined with Python is a versatile solution. Tools like Kafka help data engineers create streaming applications that can power real-time analytics, messaging, and event-driven systems. For example, Kafka’s key benefits include high throughput and scalability making it extremely powerful for its purpose. Kafka processes millions of messages per second with low latency by using batch writing and reading, reducing disk I/O and network overhead, and leveraging data compression. It can scale both horizontally by adding brokers and vertically by increasing partitions and replicas. It supports consumer groups to distribute workloads efficiently. Kafka ensures data availability even during failures through a leader-follower model, where followers replicate data. If a leader fails, a follower takes over, and Kafka’s commit log allows for durable and replayable data storage.

Choosing the right Python library would be the first thing to start. If performance metrics like throughput, latency, CPU usage, and memory consumption matter most to you then go for confluent-kafka-python. If you are looking for simplicity then pykafka or kafka-python would be a better solution.

Adequate error handling and designing our streaming application in a way that maximizes the latency, throughput and availability is another thing to consider. I hope that following the tips from this story will help you to configure your Kafka cluster the right way and enable the optimal settings for replication, partitioning, batching and compression.

Streaming in Data Engineering

Recommended read

[1] https://towardsdatascience.com/data-pipeline-design-patterns-100afa4b93e3

[2] https://towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd

[3] https://towardsdatascience.com/streaming-in-data-engineering-2bb2b9b3b603

[4] https://towardsdatascience.com/streaming-in-data-engineering-2bb2b9b3b603

[5] https://medium.com/towards-data-science/big-data-file-formats-explained-275876dc1fc9


Related Articles