Building Scalable Data Platforms
Data Mesh trends in data platform design
By Mike Shakhomirov, Towards Data Science, Sun, 01 Sep 2024
https://towardsdatascience.com/building-scalable-data-platforms-6621c9bde515/

In this article, I aim to delve into the various types of data platform architectures, taking a closer look at their evolution, strengths, weaknesses, and practical applications. A key focus will be the Data Mesh architecture and its role in the Modern Data Stack (MDS) and today's data-driven landscape.

It’s a well-known fact that the architecture of a data platform profoundly affects its performance and scalability. The challenge often lies in selecting an architecture that best aligns with your specific business needs.

Given the overwhelming multitude of data tools available in the market today, it’s easy to get lost. The Internet articles I see now and then on this topic are often highly speculative. Questions about which tools are best, who leads the industry, and how to make the right choice can be very frustrating. This story is for data practitioners who would like to learn more about data platform design and which one to choose in each scenario.

Modern data stack

I keep hearing this term on almost every data-related website on the Internet. Every single LinkedIn data group offers a dozen posts on this topic. However, the majority of those cover just the data tools and don't emphasize the importance of strategy considerations related to the data platform design process.

So what is a "Modern Data Stack" and how modern is it?

In essence, it's a set of tools designed to help you work with data. Depending on your business goals, these tools might include managed ETL/ELT data connectors, Business Intelligence (BI) solutions, data modelling tools (Dataform, DBT, etc.) and more. At times, the focus isn't necessarily on how modern these tools are, but on how effectively they meet your needs.

As data flows through the pipeline it is processed, ingested into the data warehouse or a data lake, transformed and visualised. All these steps help decision makers get data insights and act on them promptly – whether it's a user retention story [1] or a fraud detection algorithm in a banking system.

Modern data tools and data flow. Image by author.

I previously described how to act on real-time user engagement data in this story:

User Churn Prediction

Scalability

Modern Data Stack (MDS) is all about simplicity, convenience for end users and scalability. Indeed, it doesn't matter how great your BI tool features are if your major stakeholders don't like it or don't know how to use it. Among other great features, it may have Git integration and robust CI/CD [2] capabilities for data pipelines, but ultimately it doesn't matter if it can't render your dashboard or report into a PDF document. Data modelling is important and we create data models using modern tools like DBT and Dataform (GCP). Don't repeat yourself – use the DRY [2] approach everywhere you can.

Data modelling is useless if it's done the wrong way.

These data models must be reusable and unit-tested. According to DBT:

DRY is a software development principle that stands for "Don’t Repeat Yourself." Living by this principle means that your aim is to reduce repetitive patterns and duplicate code and logic in favor of modular and referenceable code.

The DRY approach not only saves you time and effort but also enhances the maintainability and scalability of your code. Perhaps most importantly, mastering this principle can be the key factor that elevates you from being a good Analytics engineer to a truly exceptional one. This principle is something I consistently aim to achieve. It makes my code reusable and reliable.
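As a quick illustration of the DRY idea, a reusable DBT model might reference shared staging models via ref() instead of copy-pasting their logic. The model and column names below (stg_orders, stg_payments, fct_orders) are hypothetical, not taken from any specific project:

-- models/fct_orders.sql (hypothetical model and column names)
-- The staging logic lives in stg_orders / stg_payments and is referenced, never duplicated
with orders as (
    select * from {{ ref('stg_orders') }}
),

payments as (
    select * from {{ ref('stg_payments') }}
)

select
    orders.order_id,
    orders.customer_id,
    sum(payments.amount) as order_revenue
from orders
left join payments using (order_id)
group by 1, 2

Any downstream model can then pull in the same tested logic with ref('fct_orders') instead of re-implementing it, which is exactly what the DRY principle is about.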

Data Lake vs Data Warehouse vs Data Mesh

In my experience, it was never just a single type of data platform architecture. Data lakes are a better fit for users with coding skills and are usually an ideal choice for unstructured and semi-structured data processing. Images, text documents, voice messages, etc. – everything can be stored in the data lake and therefore, can be processed. This makes this architecture extremely useful.

Dealing with Big Data is usually easier within data lakes, which makes them the top choice for extra-large data volumes.

The downside is that our data users must be capable of working with one of the popular data lake tools – Dataproc, EMR, Databricks or Galaxy. Usually, companies struggle to find good data developers with great Python, SQL and data modelling skills. This is what makes the Data Engineer role so popular [2].

How to Become a Data Engineer

These distributed computing tools adapt well to growing data workloads and scale to ensure the data is processed by the time it is needed.

For companies with fairly simple data pipeline designs and adequate data volumes, a hybrid architecture that blends elements of both data warehouses and data lakes might be the optimal choice.

It combines the advantages of each data platform architecture and in my projects, I use it most often. Data analysts are happy as it offers the ability to perform interactive SQL queries while maintaining a high level of flexibility for customization in case my data pipelines need to scale the processing resources. Modern data warehouse solutions often support interactive queries on data stored in a data lake, such as through external tables. For example, a data pipeline might be designed as follows:

Lake house pipeline example. Image by author.
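As a rough sketch of that external-table pattern in BigQuery (the dataset name and GCS bucket below are placeholders), it might look like this:

-- Query Parquet files sitting in the data lake directly from the warehouse
CREATE OR REPLACE EXTERNAL TABLE analytics.raw_events
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://my-data-lake/events/*.parquet']
);

-- The external table can then be joined with native warehouse tables in plain SQL
SELECT event_name, COUNT(*) AS events
FROM analytics.raw_events
GROUP BY event_name;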

It helps a lot with data migration as well. Indeed, any data model can be built on top of the data lake instead. Tools like DBT and Dataform still use an ANSI-SQL dialect in the end, and the migration task would be a simple change in the data adaptor.

Data Mesh platforms are designed to support the ability to share data across different departments seamlessly. This is a typical use-case scenario for companies going through a merger or acquisition process. Indeed, consider a company with a data warehouse where data is ingested from the data lake and then processed using DBT to visualise the reports in Looker. Now consider it is being acquired by a bigger company whose data stack is mainly a data lake with streaming components. Data is being processed in Airflow pipelines (batch) and in real-time standalone data services (streaming).

Data Mesh helps to integrate different platforms and data domains.

A data mesh architecture represents a decentralized approach to data management that empowers your company to handle data autonomously and conduct cross-team and cross-domain analyses.

In a data mesh setup, each business unit may possess a diverse set of programming skills, such as SQL or Python, and have varying data processing needs, from flexible data handling to interactive SQL queries. As a result, each unit has the freedom to select its preferred data warehouse or data lake solution tailored to its specific requirements. Despite these individual choices, the architecture facilitates seamless data sharing across units without the need for data movement, ensuring that data remains accessible and integrated across the organization.

Easier said than done.

In one of my previous stories, I looked into the evolution of Data Engineering [3].

How Data Engineering Evolved since 2014

We can witness that the data engineering landscape has significantly transformed since 2014 (when Airflow was introduced to a wider community), now addressing more sophisticated use cases and requirements, including support for multiple programming languages, integrations, and enhanced scalability.

This collaborative "Data Mesh" setup is crucial as it allows each team to operate within the same environment while avoiding disruptions to each other’s workflows, thus fostering a more integrated and efficient approach to data handling and analysis.

My research underscores the importance of several key trends in data pipeline orchestration. These include a focus on heterogeneity, support for a range of programming languages, effective utilization of metadata, and the adoption of data mesh architecture. Embracing these trends is crucial for developing data platforms that can adapt to diverse needs and scale efficiently.

Heterogeneity is crucial for creating modern, robust, and scalable data platforms.

Implementing a Data Mesh architecture

The easiest way to implement a data mesh is to follow these principles:

  • Use microservices: Data Mesh is distributed by definition, it's not monolithic. Creating a dedicated API service/orchestrator makes perfect sense. It can act as a data hub for all other data processing services. They can invoke the data hub service whenever needed and vice versa.
  • Split data environments: Creating separate environments for development, production and automated testing is important. It simplifies the new data model and pipeline deployments and makes our code tested and reusable.
  • Consider the "DRY" approach always: Using other data projects and their repositories as packages makes life easier.
  • Use infrastructure as code: This is a must and helps to maintain our pipelines [4]. Infrastructure as code is becoming an increasingly popular approach for managing data platform resources. Among software engineers, it is pretty much a standard these days.

Infrastructure as Code for Beginners

Consider the example below and how I implemented Data Mesh using the Mage orchestrator tool. I’ve created a service responsible for the data orchestration of all my data pipelines from other data projects I have.

For instance, it is implemented in one of my data pipelines where it pulls another GitHub repository with a DBT project in it. This project then becomes a package that I use in my orchestrator:

Mage Data Mesh. Image by author.

Then if I run my pipeline it will install the erd project as a dependency:

Dependency. Image by author.

This approach helps me to use other DBT projects in my Data Hub project. Every time I run the data pipeline, it pulls the code from package repositories to ensure everything is up-to-date.

How do we deploy it?

Okay, we ran it locally but how do we deploy it?

While this task can be complex, employing infrastructure as code can significantly enhance the scalability and maintainability of the code. I previously covered this topic in a tutorial here [5].

Orchestrate Machine Learning Pipelines with AWS Step Functions

This represents a typical data flow or data platform setup for many companies. The real challenge lies in deploying data pipelines and managing the associated resources effectively. For example, deploying a machine learning pipeline across production and staging environments involves careful resource management.

I deployed my Data Hub project using Terraform [6]. A few simple commands like the ones below will do the job:

terraform init
terraform plan -out=PLAN_dev
terraform apply -input=false PLAN_dev

The output will be something like this:

Apply complete! Resources: 16 added, 0 changed, 0 destroyed.

Outputs:

load_balancer_dns_name = "http://datahub-dev-alb-123.eu-west-1.elb.amazonaws.com"

As you can see, infrastructure as code makes it really easy to deploy the required resources, including VPCs, subnets, load balancers and others.

For instance, my Terraform folder structure where I describe the resources I need will look like this:

.
β”œβ”€β”€ PLAN_dev
β”œβ”€β”€ alb.tf
β”œβ”€β”€ backend.tf
β”œβ”€β”€ efs.tf
β”œβ”€β”€ env_vars.json
β”œβ”€β”€ iam.tf
β”œβ”€β”€ main.tf
β”œβ”€β”€ provider.tf
β”œβ”€β”€ variables.tf
└── versions.tf

My main terraform definition file main.tf looks like this. It creates the ECS cluster, the task definition and all other required resources:


resource "aws_ecs_cluster" "aws-ecs-cluster" {
  name = "${var.app_name}-${var.env}-cluster"

  setting {
    name  = "containerInsights"
    value = "enabled"
  }

  tags = {
    Name        = "${var.app_name}-ecs"
    Environment = var.env
  }
}

// To delete an existing log group, run the cli command:
// aws logs delete-log-group --log-group-name app-name-production-logs
resource "aws_cloudwatch_log_group" "log-group" {
  name = "${var.app_name}-${var.env}-logs"

  tags = {
    Application = var.app_name
    Environment = var.env
  }
}

resource "aws_ecs_task_definition" "aws-ecs-task" {
  family = "${var.app_name}-task"

  container_definitions = <<DEFINITION
  [
    {
      "name": "${var.app_name}-${var.env}-container",
      "image": "${var.docker_image}",
      "environment": [
        {"name": "ENV", "value": "dev"},
        {"name": "MAGE_EC2_SUBNET_ID", "value": "${var.mage_ec2_subnet_id}"}
      ],
      "command": [
        "mage", "start", "datahub"
      ],
      "essential": true,
      "mountPoints": [
        {
          "readOnly": false,
          "containerPath": "/home/src",
          "sourceVolume": "${var.app_name}-fs"
        }
      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "${aws_cloudwatch_log_group.log-group.id}",
          "awslogs-region": "${var.aws_region}",
          "awslogs-stream-prefix": "${var.app_name}-${var.env}"
        }
      },
      "portMappings": [
        {
          "containerPort": 6789,
          "hostPort": 6789
        }
      ],
      "cpu": ${var.ecs_task_cpu},
      "memory": ${var.ecs_task_memory},
      "networkMode": "awsvpc",
      "ulimits": [
        {
          "name": "nofile",
          "softLimit": 16384,
          "hardLimit": 32768
        }
      ],
      "healthCheck": {
        "command": ["CMD-SHELL", "curl -f http://localhost:6789/api/status || exit 1"],
        "interval": 30,
        "timeout": 5,
        "retries": 3,
        "startPeriod": 10
      }
    }
  ]
  DEFINITION

  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc"
  memory                   = var.ecs_task_memory
  cpu                      = var.ecs_task_cpu
  execution_role_arn       = aws_iam_role.ecsTaskExecutionRole.arn
  task_role_arn            = aws_iam_role.ecsTaskExecutionRole.arn

  volume {
    name = "${var.app_name}-fs"

    efs_volume_configuration {
      file_system_id     = aws_efs_file_system.file_system.id
      transit_encryption = "ENABLED"
    }
  }

  tags = {
    Name        = "${var.app_name}-ecs-td"
    Environment = var.env
  }

  # depends_on = [aws_lambda_function.terraform_lambda_func]
}

data "aws_ecs_task_definition" "main" {
  task_definition = aws_ecs_task_definition.aws-ecs-task.family
}

resource "aws_ecs_service" "aws-ecs-service" {
  name                 = "${var.app_name}-${var.env}-ecs-service"
  cluster              = aws_ecs_cluster.aws-ecs-cluster.id
  task_definition      = "${aws_ecs_task_definition.aws-ecs-task.family}:${max(aws_ecs_task_definition.aws-ecs-task.revision, data.aws_ecs_task_definition.main.revision)}"
  launch_type          = "FARGATE"
  scheduling_strategy  = "REPLICA"
  desired_count        = 1
  force_new_deployment = true

  network_configuration {
    # subnets          = aws_subnet.public.*.id
    subnets          = var.public_subnets
    assign_public_ip = true
    security_groups = [
      aws_security_group.service_security_group.id,
      aws_security_group.load_balancer_security_group.id
    ]
  }

  load_balancer {
    target_group_arn = aws_lb_target_group.target_group.arn
    container_name   = "${var.app_name}-${var.env}-container"
    container_port   = 6789
  }

  depends_on = [aws_lb_listener.listener]
}

resource "aws_security_group" "service_security_group" {
  # vpc_id = aws_vpc.aws-vpc.id
  vpc_id = var.vpc_id
  ingress {
    from_port       = 6789
    to_port         = 6789
    protocol        = "tcp"
    cidr_blocks     = var.allowed_ips
    security_groups = [aws_security_group.load_balancer_security_group.id]
  }

  egress {
    from_port        = 0
    to_port          = 0
    protocol         = "-1"
    cidr_blocks      = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }

  tags = {
    Name        = "${var.app_name}-service-sg"
    Environment = var.env
  }
}

Read more about it here [7].

The main project has a data environment split based on the ENV environment variable used in all my projects. Running the DBT model from a package repository will look like this:

--select my_second_dbt_model --target {{ env_var('ENV') }}

This is what truly makes it powerful. Now, data analysts and data scientists across different departments can collaborate seamlessly, running SQL, Python, and R in a unified environment. This integration allows them to focus their development efforts together, streamlining their workflows and boosting productivity.

This example highlights the integration of different data domains.

The next steps

The next steps would be to expand this approach further and start integrating data connectors and pipelines in one place. Modern data-driven applications require a robust transactional database to manage current application data. When implementing such applications, consider utilizing OLTP databases, for example a managed RDS instance. Data Mesh helps to integrate these resources as well. In our example, we would want to create a Python data connector that would extract data from RDS.
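A minimal sketch of such a connector is shown below. It assumes a Postgres-flavoured RDS instance plus the SQLAlchemy and pandas libraries; the connection string, table and column names are placeholders rather than anything from a real project:

# rds_connector.py -- hypothetical incremental extract from an RDS Postgres instance
import pandas as pd
from sqlalchemy import create_engine, text

# Placeholder connection string; in practice pull credentials from a secrets manager
engine = create_engine(
    "postgresql+psycopg2://analytics_user:secret@my-rds-host.eu-west-1.rds.amazonaws.com:5432/appdb"
)

def extract_orders(updated_since: str) -> pd.DataFrame:
    """Extract only the rows updated after the given timestamp."""
    query = text("""
        SELECT order_id, customer_id, amount, updated_at
        FROM orders
        WHERE updated_at > :updated_since
    """)
    return pd.read_sql(query, engine, params={"updated_since": updated_since})

if __name__ == "__main__":
    df = extract_orders("2024-01-01")
    print(df.head())

The orchestrator can then schedule this extract alongside the DBT runs, so the transactional data lands in the lake or warehouse on the same cadence as everything else.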

Each component in the data platform ecosystem – data lakes, data warehouses, lake houses, and databases – offers unique advantages and serves distinct purposes but it is rarely just the one.

Generally, the optimal choice will depend on cost efficiency and compatibility with your development stack.

Testing various options can reveal how well a data source or a project integrates into your data platform, whether it's a data lake or a data warehouse. Numerous data connectors can be managed with ease in a Data Mesh tool to facilitate seamless data extraction, regardless of the underlying architecture.

However, several considerations are crucial:

  • Alignment with Business Needs: Evaluate how well data tools align with your specific business requirements. For instance, some business intelligence (BI) tools may have a pay-per-user pricing model, which might not be ideal for sharing dashboards with external users.
  • Functionality Overlap: Assess whether there is an overlap in functionality between tools. For example, determine if you need a BI solution that performs data modelling within its own OLAP cube when this is already handled by your data warehouse.
  • Cost Efficiency: If cost savings are a priority, it may be advantageous to choose data tools that are provided by the same cloud vendor as your development stack.
  • Data Modeling Importance: Efficient data modelling is crucial, as it affects the frequency of data processing and subsequently affects processing costs.

The choice between a data lake and a data warehouse often hinges on the skill set of your users. A data warehouse solution typically offers greater interactivity and is suited for SQL-centric products such as Snowflake or BigQuery. Conversely, data lakes are ideal for users with programming expertise, making Python-focused platforms like Databricks, Galaxy, Dataproc, or EMR more suitable.

Conclusion

In the era of Data Mesh, successful collaboration is crucial for thriving in the data domain. The Modern Data Stack stems from our data platform architecture, which plays the foundational role here. Simply put, a modern data stack is often described as a set of tools that assist with managing and working with data. This is the typical explanation you'll find in many articles online. However, it overlooks the crucial role of strategy: a well-articulated strategy for data platform design is far more critical than the individual features of any given data tool. I always begin a new data warehouse project with comprehensive planning and design sessions.

Proper organization of data environments also supports automated testing and continuous integration (CI) pipelines, ensuring that your data transformation scripts execute correctly according to the logic outlined in your business requirements. There are various methods for deploying data platform resources that will feed data into your data platform, and it’s beneficial to document these methods using metadata. This approach helps maintain clarity and efficiency throughout the project.

My experience suggests that it’s never just one data platform architecture type but a combination of all three that works best for your business goals.

Recommended read:

[1] https://towardsdatascience.com/user-churn-prediction-d43c53e6f6df

[2] https://towardsdatascience.com/how-to-become-a-data-engineer-c0319cb226c2

[3] https://medium.com/towards-data-science/how-data-engineering-evolved-since-2014-9cc85f37fea6

[4] https://medium.com/gitconnected/infrastructure-as-code-for-beginners-a4e36c805316

[5] https://pub.towardsai.net/orchestrate-machine-learning-pipelines-with-aws-step-functions-d8216a899bd5

[6] https://github.com/mage-ai/mage-ai-terraform-templates/blob/master/README.md

[7] https://docs.mage.ai/production/deploying-to-cloud/aws/setup

Advanced SQL for Data Science
Expert techniques to elevate your analysis
By Mike Shakhomirov, Towards Data Science, Sat, 24 Aug 2024
https://towardsdatascience.com/advanced-sql-for-data-science-43a045ae4143/

This story delves into advanced SQL techniques that will be useful for data science practitioners. In this piece, I will provide a detailed exploration of expert-grade SQL queries I use daily in my Analytics projects. SQL, along with modern data warehouses, forms the backbone of data science. It is an indispensable tool for data manipulation and user behaviour analytics. The techniques I am going to talk about are designed to be practical and beneficial from the data science perspective. Mastery of SQL is a valuable skill, crucial for a wide range of projects, and these techniques have significantly streamlined my daily work. I hope it will be useful for you as well.


Given that SQL is the primary language used by data warehouse and business intelligence professionals, it’s an ideal choice for sharing data across data platforms. Its robust features facilitate seamless data modelling and visualization. It remains the most popular means of communication for any data team and nearly every data platform available in the market.

We will use BigQuery’s standard SQL dialect. It’s free and easy to run the queries I wrote and provided below.

Recursive CTEs

Similarly to how we would use Python's faker library, we can mock test data using recursive CTEs in SQL.

    WITH RECURSIVE
    CTE_1 AS (
        (SELECT 0 AS iteration)
        UNION ALL
        SELECT iteration + 1 AS iteration FROM CTE_1 WHERE iteration < 3
    )
    SELECT iteration FROM CTE_1
    ORDER BY 1 ASC

The output would be this:

Image by author.

In BigQuery and many other data warehouse solutions CTEs can be either non-recursive or recursive. The RECURSIVE keyword allows for recursion within the WITH clause (e.g., WITH RECURSIVE).

Recursive CTEs continue to execute until no new results are produced, making them well-suited for querying hierarchical and graph data. In our case, the stopping condition is defined by the WHERE clause: FROM CTE_1 WHERE iteration < 3

In contrast, non-recursive CTEs execute only once.

The main difference is that a non-recursive CTE can only reference preceding CTEs and cannot reference itself, whereas a recursive CTE can reference itself, as well as preceding or subsequent CTEs.

Working with graphs

Using recursive CTE to work with graph data is very handy. In the data science world graphs are a pretty neat concept used almost everywhere. In Data Engineering I use dependency graphs a lot to demonstrate data lineage in my data pipeline.

We can use recursive SQL techniques to evaluate reachability in graphs. In the code snippet below, we will find the nodes that can be reached from node table_5 in a graph called SampleGraph.

WITH RECURSIVE
  SampleGraph AS (
    --      table_1               table_5
    --      /     \               /     \
    --  table_2 - table_3     table_6   table_7
    --              |             \     /
    --           table_4          table_8
    SELECT 'table_1' AS from_node, 'table_2' AS to_node UNION ALL
    SELECT 'table_1', 'table_3' UNION ALL
    SELECT 'table_2', 'table_3' UNION ALL
    SELECT 'table_3', 'table_4' UNION ALL
    SELECT 'table_5', 'table_6' UNION ALL
    SELECT 'table_5', 'table_7' UNION ALL
    SELECT 'table_6', 'table_8' UNION ALL
    SELECT 'table_7', 'table_8'
  ),
  R AS (
    (SELECT 'table_5' AS node)
    UNION ALL
    (
      SELECT SampleGraph.to_node AS node
      FROM R
      INNER JOIN SampleGraph
        ON (R.node = SampleGraph.from_node)
    )
  )
SELECT DISTINCT node FROM R ORDER BY node;

Output:

image by author

Recursive CTEs are quite expensive and we would want to make sure they are used for the intended purpose. If your query doesn’t involve graphs or hierarchical data, it may be more efficient to explore alternatives, such as employing a LOOP statement in conjunction with a non-recursive CTE.
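As a rough sketch, the three iterations from the first snippet could also be produced with BigQuery scripting instead of recursion; the temporary table and variable names below are arbitrary:

-- Generate the iterations with a scripting LOOP instead of a recursive CTE
DECLARE i INT64 DEFAULT 0;
CREATE TEMP TABLE iterations (iteration INT64);

LOOP
  INSERT INTO iterations VALUES (i);
  SET i = i + 1;
  IF i > 3 THEN
    LEAVE;
  END IF;
END LOOP;

SELECT * FROM iterations ORDER BY iteration;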

Also, be aware of infinite recursion. We wouldn’t want our SQL to run forever.

Fuzzy matching and approximate joins

Fuzzy matching proves to be exceptionally useful in situations where we need to join two datasets with values that, while not identical, share a close resemblance. These scenarios require more sophisticated approaches to ensure accurate data matching, and fuzzy matching is a great example of the advanced SQL methods that data analysts often rely on for approximate JOINs.

To illustrate this, consider the following SQL snippet:

with people as (
select 'gmail' as domain, 'john.adams@gmail.com' as email
union all
select 'gmail' as domain, 'dave.robinson@gmail.com' as email
)

, linkedin as (
select
 'gmail'          as domain
,'Dave Robinson'  as name  
)

, similarity as (
select 
  linkedin.name   as name 
, linkedin.domain as domain
, people.email
, fhoffa.x.levenshtein(linkedin.name, people.email) similarity_score
from linkedin 
join people
 on linkedin.domain = people.domain
)

select
*
, row_number() over (partition by name order by similarity_score) as best_match
from 
similarity

We can apply proximity functions such as ngramdistance (available in Clickhouse) and levenshtein (available in BigQuery via the public fhoffa.x.levenshtein UDF used above) to identify emails that resemble each other.

A lower score indicates a better match:

Image by author

This approach proved to be very useful in entity-matching tasks, e.g. matching individuals from two separate datasets using attributes such as email addresses. This is a straightforward scenario when dealing with data from platforms like LinkedIn, Crunchbase, and similar sources where we need to align user information.

Calculating user activity and sessions using LEAD and LAG operators

Window functions proved to be very useful in Data Science.

Often we need to calculate sessions to aggregate user activity. The example below demonstrates how to do it in SQL.

-- models/sessions.sql
-- mock some data
with raw_event_data as (
    select 'A' as user_id, timestamp_add(current_timestamp(), interval -1 minute) as timestamp union all
    select 'A' as user_id, timestamp_add(current_timestamp(), interval -3 minute) as timestamp union all
    select 'A' as user_id, timestamp_add(current_timestamp(), interval -5 minute) as timestamp union all
    select 'A' as user_id, timestamp_add(current_timestamp(), interval -36 minute) as timestamp union all
    select 'A' as user_id, timestamp_add(current_timestamp(), interval -75 minute) as timestamp

)
-- calculate sessions:
SELECT
    event.user_id || '-' || row_number() over(partition by event.user_id order by event.timestamp) as session_id
    , event.user_id
    , event.timestamp as session_start_at
    , lead(timestamp) over(partition by event.user_id order by event.timestamp) as next_session_start_at
FROM (
    SELECT
        e.user_id
        , e.timestamp
        , TIMESTAMP_DIFF(  -- timestamps, so use TIMESTAMP_DIFF (not DATE_DIFF) in BigQuery
             e.timestamp
            ,LAG(e.timestamp) OVER(
                PARTITION BY e.user_id ORDER BY e.timestamp
                )
            , minute
            ) AS inactivity_time  
        FROM raw_event_data AS e
      ) as event
    WHERE (event.inactivity_time > 30 OR event.inactivity_time is null)

The output would be the following:

Calculating user sessions with SQL. Image by author.

This is a widely used approach to aggregate user activity the right way in a scenario where the data science team has to deal with raw user engagement event data.

The benefit of this approach is that we don’t need to rely on data engineers with their streaming techniques or maintain a Kafka server [1].

Mastering Data Streaming in Python

With this data model in place, answering the user analytics questions becomes straightforward. It can still be a simple event count, but now it's session analytics. For instance, to compute the average session duration, we can utilize the following SQL:

SELECT
  COUNT(*) AS sessions_count,
  AVG(duration) AS average_session_duration
FROM (
  SELECT sessions.session_id
        , TIMESTAMP_DIFF(MAX(events.timestamp), MIN(events.timestamp), MINUTE) AS duration
  FROM sessions
  LEFT JOIN events on events.user_id = sessions.user_id
        AND events.timestamp >= sessions.session_start_at
        AND (events.timestamp < sessions.next_session_start_at OR sessions.next_session_start_at is null)
  GROUP BY 1
)

Using NTILE()

NTILE() is a useful numbering function typically used in analytics to get a distribution of a metric, e.g. sales, revenue, etc. The most common SQL using NTILE() would look like this:

SELECT
    NTILE(4) OVER ( ORDER BY amount ) AS sale_group,
    product_id,
    product_category,
    soccer_team,
    amount as sales_amount
FROM sales
WHERE sale_date >= '2024-12-01' AND sale_date <= '2024-12-31';

It returns the distribution of sales ordered by amount in 4 even buckets.

I find it particularly handy for tracking metrics such as login duration in seconds for a mobile app. For instance, with my app connected to Firebase, I can monitor how long each user’s login process takes.

Image by author

This function divides the rows into the specified number of buckets based on row ordering and returns the 1-based bucket number assigned to each row. The number of rows in each bucket can differ by at most one. Any remainder from dividing the total number of rows by the number of buckets is distributed across the buckets, one per bucket, starting with bucket 1. If the specified number of buckets is NULL, 0, or negative, an error will be generated. The SQL below explains how I calculate median login duration times:

-- Looker Studio dataset:
select (case when tile = 50 then 'median' when tile = 95 then '95%' else '5%' end) as tile
    , dt
    , max(cast( round(duration/1000) as numeric)/1000 ) max_duration_s
    , min(cast( round(duration/1000) as numeric)/1000 ) min_duration_s

from (
    select 
         trace_info.duration_us duration
        , ntile(100) over (partition by (date(event_timestamp)) order by trace_info.duration_us) tile
        , date(event_timestamp) dt

    from firebase_performance.my_mobile_app 
    where 
        date(_partitiontime) >= parse_date('%y%m%d', @ds_start_date) and date(_partitiontime) <= parse_date('%y%m%d', @ds_end_date)
        and 
        date(event_timestamp) >= parse_date('%y%m%d', @ds_start_date)
        and 
        date(event_timestamp) <= parse_date('%y%m%d', @ds_end_date)
    and lower(event_type) = "duration_trace"
    and lower(event_name) = 'logon'
) x
WHERE tile in (5, 50, 95)
group by dt, tile
order by dt
;

Median and k-th percentile are valuable statistics for analyzing data

Using FOLLOWING AND PRECEDING

FOLLOWING and PRECEDING are SQL operators we would want to use when we need to look at a window before or after a particular record.

Moving average

This is often used to calculate a moving (rolling) average. Consider the SQL below. It explains how to do it and this is a standard task in data analysis.

-- mock data
with temperatures as (
    select 'A' as city, timestamp_add(current_timestamp(), interval -1 day) as timestamp  ,15 as temperature union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -3 day) as timestamp  ,15 union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -5 day) as timestamp  ,15 union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -36 day) as timestamp ,20 union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -75 day) as timestamp ,25

)

SELECT
  city,
  day,
  AVG(temperature) OVER(PARTITION BY city ORDER BY UNIX_DATE(date(timestamp)) 
                RANGE BETWEEN 14 PRECEDING AND CURRENT ROW) AS rolling_avg_14_days,
  AVG(temperature) OVER(PARTITION BY city ORDER BY UNIX_DATE(date(timestamp)) 
                RANGE BETWEEN 30 PRECEDING AND CURRENT ROW) AS rolling_avg_30_days
FROM (
  SELECT date(timestamp) day, city, temperature, timestamp
  FROM temperatures
)

We’ve mocked some data to illustrate the calculation and the output would be the following:

Moving average. Image by author.

Indeed, it is easy to verify that it worked with a quick glance at the image above.

Calculating Moving Average Convergence Divergence (MACD)

Widely used by investors, the Moving Average Convergence Divergence (MACD) is a technical indicator to pinpoint optimal market entry points for buying or selling.

MACD can also be calculated using PRECEDING

We will need a 26-period exponential moving average (EMA) and then subtract it from the 12-period EMA. The signal line, which helps to interpret the MACD, is a nine-period EMA of the MACD line itself.

The SQL below explains how to calculate it:

-- mock data
with temperatures as (
    select 'A' as city, timestamp_add(current_timestamp(), interval -1 day) as timestamp  ,15 as temperature union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -3 day) as timestamp  ,15 union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -5 day) as timestamp  ,15 union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -12 day) as timestamp ,20 union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -26 day) as timestamp ,25
)
, data as (
SELECT
  city,
  day,
  temperature,
  AVG(temperature) OVER(PARTITION BY city ORDER BY UNIX_DATE(date(timestamp)) 
                RANGE BETWEEN 12 PRECEDING AND CURRENT ROW) AS rolling_avg_12_days,
  AVG(temperature) OVER(PARTITION BY city ORDER BY UNIX_DATE(date(timestamp)) 
                RANGE BETWEEN 26 PRECEDING AND CURRENT ROW) AS rolling_avg_26_days
FROM (
  SELECT date(timestamp) day, city, temperature, timestamp
  FROM temperatures
)
)

select s.day,
    s.temperature,
    s.rolling_avg_12_days,
    s.rolling_avg_26_days,
    s.rolling_avg_12_days - l.rolling_avg_26_days as macd
from
 data s
join 
 data l
on
 s.day = l.day

Output:

Image by author.

Percentage change

This standard indicator can also be calculated using LEAD and LAG. The SQL below explains how to do it.

-- mock data
with temperatures as (
    select 'A' as city, timestamp_add(current_timestamp(), interval -1 day) as timestamp  ,15 as temperature union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -3 day) as timestamp  ,15 union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -5 day) as timestamp  ,15 union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -12 day) as timestamp ,20 union all
    select 'A' as city, timestamp_add(current_timestamp(), interval -26 day) as timestamp ,25
)

SELECT
  city,
  day,
  temperature,

    (temperature - lag(temperature) over (order by day))*1.0/lag(temperature) over (order by day)*100 as percentage_change
FROM (
  SELECT date(timestamp) day, city, temperature, timestamp
  FROM temperatures
)

Output:

Image by author.

Marketing analytics using FOLLOWING AND UNBOUNDED FOLLOWING

Similar to PRECEDING, this is particularly useful when we need to compile a list of items, such as events or purchases, to create a funnel dataset. Using PARTITION BY allows you to group all subsequent events within each partition, regardless of their quantity.

A good example of this concept is marketing funnels.

Our dataset might include a series of recurring events of the same type, but ideally, you want to link each event to the subsequent one of a different type.

Let's imagine we need to get all events that happened after a user's join_group event in their funnel. The code below explains how to do it:

-- mock some data
with d as (
select * from unnest([
  struct('0003f' as user_pseudo_id, 12322175 as user_id, timestamp_add(current_timestamp(), interval -1 minute) as event_timestamp, 'join_group' as event_name),
  ('0003',12,timestamp_add(current_timestamp(), interval -1 minute),'set_avatar'),
  ('0003',12,timestamp_add(current_timestamp(), interval -2 minute),'set_avatar'),
  ('0003',12,timestamp_add(current_timestamp(), interval -3 minute),'set_avatar'),
  ('0003',12,timestamp_add(current_timestamp(), interval -4 minute),'join_group'),
  ('0003',12,timestamp_add(current_timestamp(), interval -5 minute),'create_group'),
  ('0003',12,timestamp_add(current_timestamp(), interval -6 minute),'create_group'),
  ('0003',12,timestamp_add(current_timestamp(), interval -7 minute),'in_app_purchase'),
  ('0003',12,timestamp_add(current_timestamp(), interval -8 minute),'spend_virtual_currency'),
  ('0003',12,timestamp_add(current_timestamp(), interval -9 minute),'create_group'),
  ('0003',12,timestamp_add(current_timestamp(), interval -10 minute),'set_avatar')
  ]
  ) as t)

  , event_data as (
SELECT 
    user_pseudo_id
  , user_id
  , event_timestamp
  , event_name
  , ARRAY_AGG(
        STRUCT(
              event_name AS event_name
            , event_timestamp AS event_timestamp
        )
    ) 
    OVER(PARTITION BY user_pseudo_id ORDER BY event_timestamp ROWS BETWEEN 1 FOLLOWING AND  UNBOUNDED FOLLOWING ) as next_events

FROM d
WHERE
DATE(event_timestamp) = current_date()

)

-- Get events after each `join_group` event per user
select
*
from event_data t
where event_name = 'join_group'
;

I previously wrote about it here [2] and there is a more advanced example of marketing funnels:

Advanced SQL techniques for beginners

Exploratory data analysis

It’s often more efficient to conduct analysis directly on the data within your tables using SQL before progressing to ML, AI, data science, or engineering tasks. In fact, you can now even build machine learning models using SQL – BigQuery ML is a prime example of this capability. The trend is clear: everything is increasingly shifting towards data warehouses.
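As a hedged illustration, training a churn classifier on the churn table used later in this section could be a single statement; the label column name (Exited) is an assumption about that table:

-- Hypothetical BigQuery ML model; assumes the churn table has a label column called Exited
CREATE OR REPLACE MODEL `your-client.staging.churn_model`
OPTIONS (
  model_type = 'logistic_reg',
  input_label_cols = ['Exited']
) AS
SELECT * FROM `your-client.staging.churn`;

-- Evaluation is plain SQL too
SELECT * FROM ML.EVALUATE(MODEL `your-client.staging.churn_model`);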

EDA. Image by author.

Getting unique column values is easily done using Pandas but can we do it in SQL?

The SQL snippet below provides a handy script to achieve this. Run this code in BigQuery (replace ‘your-client’ with your project name):

DECLARE columns ARRAY<STRING>;
DECLARE query STRING;
SET columns = (
  WITH all_columns AS (
    SELECT column_name
    FROM `your-client.staging.INFORMATION_SCHEMA.COLUMNS`
    WHERE table_name = 'churn'
  )
  SELECT ARRAY_AGG((column_name) ) AS columns
  FROM all_columns
);

SET query = (select STRING_AGG('(select count(distinct '||x||')  from `your-client.staging.churn`) '||x ) AS string_agg from unnest(columns) x );
EXECUTE IMMEDIATE 
"SELECT  "|| query
;

Output:

EDA. Image by author.

Describing the datasets

We can use SQL scripts to describe tables we have in our data warehouse. I will slightly change the SQL mentioned above and add mean, max, min, median, and the 0.25 and 0.75 percentiles, so the final SQL looks like this:

DECLARE columns ARRAY<STRING>;
DECLARE query1, query2, query3, query4, query5, query6, query7 STRING;
SET columns = (
  WITH all_columns AS (
    SELECT column_name
    FROM `your-client.staging.INFORMATION_SCHEMA.COLUMNS`
    WHERE table_name = 'churn' 
        and  data_type IN ('INT64','FLOAT64')
  )
  SELECT ARRAY_AGG((column_name) ) AS columns
  FROM all_columns
);

SET query1 = (select STRING_AGG('(select stddev( '||x||')  from `your-client.staging.churn`) '||x ) AS string_agg from unnest(columns) x );
SET query2 = (select STRING_AGG('(select avg( '||x||')  from `your-client.staging.churn`) '||x ) AS string_agg from unnest(columns) x );
SET query3 = (select STRING_AGG('(select PERCENTILE_CONT( '||x||', 0.5) over()  from `your-client.staging.churn` limit 1) '||x ) AS string_agg from unnest(columns) x );
SET query4 = (select STRING_AGG('(select PERCENTILE_CONT( '||x||', 0.25) over()  from `your-client.staging.churn` limit 1) '||x ) AS string_agg from unnest(columns) x );
SET query5 = (select STRING_AGG('(select PERCENTILE_CONT( '||x||', 0.75) over()  from `your-client.staging.churn` limit 1) '||x ) AS string_agg from unnest(columns) x );
SET query6 = (select STRING_AGG('(select max( '||x||')  from `your-client.staging.churn`) '||x ) AS string_agg from unnest(columns) x );
SET query7 = (select STRING_AGG('(select min( '||x||')  from `your-client.staging.churn`) '||x ) AS string_agg from unnest(columns) x );

EXECUTE IMMEDIATE (
"SELECT 'stddev' ,"|| query1 || " UNION ALL " ||
"SELECT 'mean'   ,"|| query2 || " UNION ALL " ||
"SELECT 'median' ,"|| query3 || " UNION ALL " ||
"SELECT '0.25' ,"|| query4 || " UNION ALL " ||
"SELECT '0.75' ,"|| query5 || " UNION ALL " ||
"SELECT 'max' ,"|| query6 || " UNION ALL " ||
"SELECT 'min' ,"|| query7
)
;

It generates all standard EDA metrics:

Describe() in SQL. Image by author.

EDA can be easily performed using SQL

For instance, we can apply SQL to analyse the correlation between two variables, e.g. CreditScore and Balance. The beauty of SQL-based solutions is that we can easily visualize the results and create scatterplots between all variables using modern BI tools.
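As a small sketch, reusing the churn table from above, the correlation itself is a one-liner:

-- Pearson correlation between two numeric columns
SELECT CORR(CreditScore, Balance) AS creditscore_balance_corr
FROM `your-client.staging.churn`;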

Variable distribution. Image by author.

For instance, in one of my previous stories I compared EDA in SQL and Pandas to calculate such metrics as Standard Deviation and Correlation Matrix [3].

Exploratory Data Analysis with BigQuery SQL? Easy!

Conclusion

Time series analytics is an essential part of data science. In this story, I've covered the most popular SQL techniques for data science use cases. I use these queries quite often, and I hope they will be useful for you in your data science projects.

With SQL scripting, we can automate queries, perform Exploratory Data Analysis, and visualize results directly in any Business Intelligence tool. Modern data warehouses have built-in machine learning tools, e.g. BigQuery ML, which simplify ML modelling too.

User Churn Prediction

While Python remains a powerful tool for data scientists, offering robust scripting features, SQL can efficiently handle EDA tasks as well. For visualizing results, an SQL-like setup offers a superior dashboarding experience. Once the dashboard is configured, there's no need to rerun queries or notebooks, making it a one-time setup that streamlines the process. Adding a modern data modelling tool to this environment will take everything to an even higher level of automation with robust data quality checks and unit testing.

Advanced Data Modelling

Recommended read:

[1] https://medium.com/towards-data-science/mastering-data-streaming-in-python-a88d4b3abf8b

[2] https://towardsdatascience.com/advanced-sql-techniques-for-beginners-211851a28488

[3] https://medium.com/towards-data-science/exploratory-data-analysis-with-bigquery-sql-easy-69895ac4eb9e

[4] https://www.mssqltips.com/sqlservertip/7164/sql-while-loop-examples-alternatives/

[5] https://cloud.google.com/bigquery/docs/recursive-ctes

[6] https://medium.com/towards-data-science/advanced-data-modelling-1e496578bc91

The Evolution of SQL
Unlocking the power of large language models
By Mike Shakhomirov, Towards Data Science, Sun, 18 Aug 2024
https://towardsdatascience.com/the-evolution-of-sql-8d017ce566ff/

In this article, I will examine how large language models (LLMs) can convert natural language into SQL, making query writing more accessible to non-technical users. The discussion will include practical examples that showcase the ease of developing LLM-based solutions. We’ll also cover various use cases and demonstrate the process by creating a simple Slack application. Building an AI-driven database querying system involves several critical considerations, including maintaining security, ensuring data relevance, managing errors, and properly training the AI. In this story, I explored the quickest way to tackle these challenges and shared some tips for setting up a solid and efficient text-to-SQL query system.


Lately, it's hard to think of any technology more impactful and widely discussed than large language models. LLM-based applications are now the latest trend, much like the surge of Apple or Android apps that once flooded the market. They are used everywhere in the BI space, and I previously wrote about it here [1]:

Artificial Intelligence in Analytics

Creating an AI-powered database querying system is a complex task. You’ll need to deal with many important factors like keeping things secure, ensuring data is relevant, handling errors, and training the AI properly. In this story, I explored the quickest way to tackle these challenges.

For example, I built this AI chatbot for Slack in 15 minutes using my old repository template for Serverless API and AWS Lambda function:

Image by author

Text-to-SQL, reliability and RAGs

Generally speaking, simple Text-to-SQL models are absolutely unreliable, and this has been the most common complaint I've seen from AI developers in the past:

It looks right, but in real life, this SQL is complete nonsense.

If we’re working with an LLM that generates SQL queries, it can significantly improve if it has access to your database’s Data Definition Language (DDL), metadata, and a collection of well-crafted, optimized queries. By integrating this data, the LLM can produce SQL queries that are not only more reliable but also safer and better optimized for your specific database.

To boost the reliability of SQL generation, one effective approach is to use retrieval-augmented generation (RAG).

In simple terms, RAG allows an LLM to enhance its responses by pulling in additional, relevant data.

This means the model doesn’t just rely on its pre-existing knowledge but can access extra information to tailor its output more accurately to your needs. This method helps ensure that the generated queries align with the actual structure and requirements of your database, making them more effective and reducing the risk of errors.

Text-to-SQL model pain points and limitations

Providing an LLM with written instructions and context is a basic example of in-context learning, where the model derives its output based on the input provided during inference. However, this approach has inherent limitations:

Prompt Sensitivity: Since LLMs predict the next token based on the given input, slight variations in wording can lead to significantly different responses. The output of an LLM is highly dependent on the exact phrasing of the input. This sensitivity to phrasing rather than meaning can result in inconsistent outputs.

Reliability: Simple prompt-based SQL generators are generally unsuitable for enterprise use due to their unreliability. LLMs are prone to generating plausible-sounding but entirely fabricated information. In SQL generation, this can result in queries that appear correct but are fundamentally flawed, often creating fictitious tables, columns, or values.

It might look right but in real life, it will be complete nonsense.

Context Windows: LLMs have a limited capacity for input text or tokens, constrained by their architecture. For instance, ChatGPT 3.5 has a token limit of 4096, which may not suffice for comprehensively understanding a large SQL database with hundreds of tables and columns.

How to build a RAG

There are several robust Python libraries designed for general-purpose applications based on language models, such as LangChain and LlamaIndex. These libraries are great but there are others that are tailored specifically for Text-to-SQL needs, e.g. WrenAI and Vanna. For instance, Vanna.ai offers a targeted approach and is designed to simplify the integration of LLMs with your database, providing secure connections and options for self-hosting. This tool removes much of the complexity, making it easier to leverage LLMs for your specific application without the overhead of more general-purpose libraries.

RAG development process. Source: Vanna.ai

It works in two steps:

  1. Train a RAG "model" on your data using any LLM (below). All you need is the API key
  2. Start asking questions.
LLM models. Image by author
Model training. Image by author.

Alternatively, you can use a pre-trained chinook Vanna model like so:

# create a Python environment
python3 -m venv env
source env/bin/activate
pip3 install --upgrade pip
pip3 install vanna
# Run get_sql.py
import vanna as vn
from vanna.remote import VannaDefault
# Get your API key from vanna.ai and replace it here:
vn = VannaDefault(model='chinook', api_key='your-api-key')
vn.connect_to_sqlite('https://vanna.ai/Chinook.sqlite')
vn.ask('What are the top 10 artists by sales?')

The terminal output will be the following:

...
LLM Response: SELECT a.ArtistId, a.Name, SUM(il.Quantity) AS TotalSales
FROM Artist a
JOIN Album al ON a.ArtistId = al.ArtistId
JOIN Track t ON al.AlbumId = t.AlbumId
JOIN InvoiceLine il ON t.TrackId = il.TrackId
GROUP BY a.ArtistId, a.Name
ORDER BY TotalSales DESC
LIMIT 10;
Extracted SQL: SELECT a.ArtistId, a.Name, SUM(il.Quantity) AS TotalSales
FROM Artist a
JOIN Album al ON a.ArtistId = al.ArtistId
JOIN Track t ON al.AlbumId = t.AlbumId
JOIN InvoiceLine il ON t.TrackId = il.TrackId
GROUP BY a.ArtistId, a.Name
ORDER BY TotalSales DESC
LIMIT 10;
SELECT a.ArtistId, a.Name, SUM(il.Quantity) AS TotalSales
FROM Artist a
JOIN Album al ON a.ArtistId = al.ArtistId
JOIN Track t ON al.AlbumId = t.AlbumId
JOIN InvoiceLine il ON t.TrackId = il.TrackId
GROUP BY a.ArtistId, a.Name
ORDER BY TotalSales DESC
LIMIT 10;
   ArtistId                     Name  TotalSales
0        90              Iron Maiden         140
1       150                       U2         107
2        50                Metallica          91
3        22             Led Zeppelin          87
4       113  Os Paralamas Do Sucesso          45
5        58              Deep Purple          44
6        82            Faith No More          42
7       149                     Lost          41
8        81             Eric Clapton          40
9       124                   R.E.M.          39
Image by author.

WrenAI is another great open-source tool doing a similar thing. It is designed to streamline the process of querying data by converting natural language into SQL. WrenAI is compatible with various data sources, including DuckDB, MySQL, Microsoft SQL Server, and BigQuery. Additionally, it supports both open and local LLM inference endpoints, such as OpenAI's GPT-3.5-turbo, GPT-4, and local LLM servers via Ollama. We can use entity relationships to train the model.

In this case, our model becomes more accurate as we provide extra data about our database:

Image by author

This drag-and-drop style UI simplifies and improves model training a lot.

In each relationship, you can edit, add, or delete semantic connections between models, enabling the LLM to understand whether the relationships are one-to-one, one-to-many, or many-to-many.

Indeed, we don’t need to worry about our SQL semantics once it is defined.

Semantic layer and model training

Another critical consideration in developing an AI-powered database querying system is determining the appropriate tables and columns to which the AI should be granted access.

The selection of these data sources is pivotal, as it directly affects the accuracy and performance of the generated queries, as well as the overall system efficiency.

As I mentioned earlier, providing more detailed information about your database is crucial for accuracy and reliability. Data Definition Language (DDL) captures the structural aspects of a database, detailing elements such as tables, columns, and their interrelationships. Vanna excels in this domain compared to standard prompt-based SQL engines. The following code demonstrates how to retrieve DDL statements for SQLite.
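A minimal sketch of that step might look like the snippet below. It reuses the vn object from the Chinook example above and assumes Vanna's train(ddl=...) interface:

# Pull the CREATE TABLE statements out of SQLite and feed them to Vanna as training data
vn.connect_to_sqlite('https://vanna.ai/Chinook.sqlite')

# sqlite_master keeps the DDL for every table and index in the database
df_ddl = vn.run_sql("SELECT type, sql FROM sqlite_master WHERE sql IS NOT NULL")

for ddl in df_ddl['sql'].to_list():
    vn.train(ddl=ddl)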

Consider this code snippet below. It explains how to connect to your database and train your RAG model. In my case it will be BigQuery:

# train.py
# Connect to BigQuery
vn.connect_to_bigquery(project_id='my-project')
# The information schema query may need some tweaking depending on your database. This is a good starting point.
df_information_schema = vn.run_sql("SELECT * FROM INFORMATION_SCHEMA.COLUMNS")

# This will break up the information schema into bite-sized chunks that can be referenced by the LLM
plan = vn.get_training_plan_generic(df_information_schema)
plan

# If you like the plan, then uncomment this and run it to train
# vn.train(plan=plan)

# Training on SQL queries:
question = "How many albums did each customer buy?"
sql = vn.generate_sql(question)

display(sql)

#Optional if the response by Vanna is exactly as you intend, you can add in the training data
vn.train(question = question, sql=sql)

By using this code above, you can iteratively input queries and assess their outputs. You can then choose to either let Vanna learn from the results or specify preferred queries for it to adapt.

Overloading the system with too many tables and columns can result in increased token counts, elevated costs, and potentially diminished accuracy due to the language model’s risk of confusion or loss of key details. Conversely, insufficient data access limits the AI’s ability to generate precise and effective queries.

This is why this approach is very useful.

A few things to consider:

  • Data Quality and Consistency: Select well-maintained, consistently updated data. Inconsistent or incomplete data can lead to inaccurate results and erode user trust.
  • Security and PII: Ensure sensitive data is protected. Implement measures like data masking or tokenization to secure confidential information while enabling the AI to access relevant data (a minimal sketch follows after this list).
  • Relevance to Users: Choose tables and columns that are most relevant to the users’ likely questions. This ensures the AI has the necessary data to produce accurate and useful queries.
  • SQL query Performance: Large or complex tables can slow down the AI’s query performance. Select tables and columns that are indexed and optimized to maintain efficient query generation and execution.
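On the PII point above, a simple pattern is to expose only a masked view to the AI service and keep the raw table out of its reach. The table, view and column names below are hypothetical:

-- The LLM (and most analysts) query the masked view; only the raw table holds real emails
CREATE OR REPLACE VIEW analytics.customers_masked AS
SELECT
  customer_id,
  TO_HEX(SHA256(LOWER(email))) AS email_hash,  -- pseudonymised but still joinable
  country,
  signup_date
FROM analytics.customers_raw;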

Maintaining the history of interactions

This is another common pain point in LLM development. Contrary to common misconceptions, LLMs do not remember or learn about your specific data or system from individual queries unless they are explicitly trained with that information.

Each request to an untrained LLM is processed based on its most recent training data, not previous user interactions.

To generate accurate queries, it’s essential to provide the LLM with your chat history with each request. This should include relevant details about your schema and example queries, ensuring that the LLM can generate precise queries tailored to your data.
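As a rough sketch, not tied to any particular framework, carrying that history with the OpenAI Python client could look like the snippet below; the model name and the schema string are placeholders:

# Each request re-sends the schema context plus the previous turns, since the model keeps no state
from openai import OpenAI

client = OpenAI()  # assumes OPENAI_API_KEY is set in the environment

history = [
    {"role": "system",
     "content": "You generate BigQuery SQL. Schema: orders(order_id, customer_id, amount, created_at)"},
]

def ask(question: str) -> str:
    history.append({"role": "user", "content": question})
    response = client.chat.completions.create(model="gpt-4o", messages=history)
    answer = response.choices[0].message.content
    history.append({"role": "assistant", "content": answer})
    return answer

print(ask("Total revenue per customer in 2024?"))
print(ask("Now only for customers with more than 10 orders."))  # relies on the turn above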

Training an AI-driven query system involves an iterative process of refinement and enhancement.

Text-to-SQL development best practices

A major concern with text-to-SQL in AI-powered database querying is the risk of unintended modifications to the database. To address this, it is crucial to implement measures that ensure the AI does not alter the underlying data.

  • Make sure your generated SQL is validated: Introduce a query validation layer that reviews AI-generated queries before they are executed. This layer should filter out any commands that could modify the database, such as INSERT, UPDATE, DROP, etc. By validating queries before they are processed, you can prevent unintended changes to the database.
  • AI service access permissions: Ensure that the AI system is granted only read-only access to the database. This restriction prevents any accidental or malicious changes to the data, preserving database integrity while still allowing the AI to generate queries for data retrieval.
  • Monitor SQL query performance: Keeping an eye on usage and query performance metrics is always a good idea.
  • Focus on insights: AI-generated SQL queries are excellent for data retrieval, but their true potential is realized when paired with advanced data analysis. By integrating these queries with analytical tools and workflows, you can uncover deeper insights and make more informed, data-driven decisions.
  • Custom error handling: Even with meticulous model training and optimization of your text-to-SQL system, there may still be scenarios where generated queries contain parsing errors or return no results. Therefore, it is crucial to implement a mechanism for retrying query generation and offering constructive feedback to the user in such cases. This will enhance the effectiveness and resilience of your text-to-SQL model and improve the user experience.

By incorporating these validation mechanisms, you can ensure that AI-generated queries are both secure and reliable, reducing the risk of unintended database modifications and avoiding common query-related issues. This approach not only saves time and resources but also promotes a data-driven culture, where decisions are grounded in accurate and current insights.
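
As an illustration of the query validation layer mentioned above, the check can be as simple as parsing the generated statement and rejecting anything that is not a single SELECT. A minimal sketch using the sqlparse library (one possible choice):

# validate.py
# Minimal sketch of a read-only validation layer for AI-generated SQL.
import sqlparse

FORBIDDEN = {"INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "CREATE", "MERGE", "GRANT"}

def is_safe_select(query: str) -> bool:
    statements = sqlparse.parse(query)
    if len(statements) != 1:  # reject multi-statement payloads
        return False
    statement = statements[0]
    if statement.get_type() != "SELECT":
        return False
    tokens = {token.value.upper() for token in statement.flatten()}
    return not (tokens & FORBIDDEN)  # reject anything containing a write keyword

# Usage: only execute queries that pass validation,
# ideally through a read-only database connection as well.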

With these policies in place, your organization can efficiently harness the full potential of its data, allowing non-technical users to access and analyze information on their own.

Building an AI-powered Slack bot assistant

For this, we will need our OpenAI API key, Slack account and AWS account to deploy the serverless API with a Lambda function.

High-level application logic:

  1. The Slack application will POST text messages to our API
  2. Our serverless API deployed in AWS will forward the Slack message to an AWS Lambda function
  3. AWS Lambda will ask the OpenAI API for a response and send it back to Slack

Go to Slack Apps and create a new application: https://api.slack.com/apps

Image by author

Click "From scratch" and give it a name:

Image by author

Next, let’s add a slash command to trigger our bot:

Image by author

In the Edit Command section provide our serverless API endpoint (Request URL):

Image by author

Finally, let’s Install it to our Slack workspace:

Image by author

Let’s create a microservice to process Slack messages received by our bot. Our AWS Lambda application code can look like this if written in Node.js:

// app.js
const AWS = require('aws-sdk');
AWS.config.update({region: "eu-west-1"});

const axios = require('axios');

const OPENAI_API_KEY = process.env.openApiKey || 'OpenAI API key';

exports.handler = async (event, context) => {
    console.log("app.handler invoked with event "+ JSON.stringify(event,null,2));
    try {
        context.succeed( await processEvent(event) );
    } catch (e) {
        console.log("Error: "+JSON.stringify(e));
        context.done(e)
    }
};

let processEvent = async (event) => {

    /**
     * Adding a command parser for Slack slash commands
     */
    function commandParser(slashCommand) {
        let hash;
        let myJson = {};
        let hashes = slashCommand.slice(slashCommand.indexOf('?') + 1).split('&');
        for (let i = 0; i < hashes.length; i++) {
            hash = hashes[i].split('=');
            myJson[hash[0]] = hash[1];

        }
        myJson.timestamp = Date.now();
        return myJson;
    };

    try {
        let channel_id = commandParser(event.body).channel_id;
        let user_name = commandParser(event.body).user_name;
        let txt = commandParser(event.body).text;
        // Get response:
        let message =  await processMessageText(txt,user_name,channel_id);
        return {
            "statusCode": 200,
            "headers": {},
            "body": JSON.stringify(message), 
            "isBase64Encoded": false
        };

    } catch (err) {
        console.log('Error handling event', err);
        return {
            "statusCode": 500,
            "headers": {},
            "body": '{}',
            "isBase64Encoded": false
        };
    }

};

const processMessageText = async(txt, user_name, channel_id) => {

    let Response = await fetchAi(txt);
    let message = {
        response_type: 'in_channel',
        text: `@${user_name} , ${Response} `
    };

    return message;
};

const fetchAi = async(prompt) => {

    try {

        const response = await axios.post(
            'https://api.openai.com/v1/chat/completions',
            {
              model: "gpt-3.5-turbo",
              messages: [{ role: "user", content: prompt }],
              max_tokens: 150,
              temperature: 0.7,
            },
            {
              headers: {
                'Authorization': `Bearer ${OPENAI_API_KEY}`,
                'Content-Type': 'application/json',
              },
            }
          );

          const generatedText = response.data.choices[0].message.content;

    return generatedText;
    } catch (e) {
        console.log('OpenAI request failed', e);
        return 'Sorry, I am unable to get a response at the moment.';
    }

};

Deploy the API and the serverless application with AWS CloudFormation or Terraform and we are ready to go!

AI Slack bot. Image by author.

In one of my previous articles, I discussed the benefits of deploying applications with Infrastructure as Code [2].

Infrastructure as Code for Beginners

Conclusion

Lately, it’s hard to think of any technology more impactful and widely discussed than large language models. LLM-based applications are now the latest trend, much like the surge of Apple or Android apps that once flooded the market. Training models with DDL statements, custom queries, metadata or documents to refine definitions simplifies LLM development. For instance, if your business uses custom metrics, supplying this additional context will help the model generate more accurate and relevant outputs.

Building an AI-powered database querying system is not a trivial task. You’ll need to deal with many important factors like keeping things secure, ensuring data is relevant, handling errors, and training the AI properly. Training an AI-driven query system involves an iterative process of refinement and enhancement. By developing a comprehensive reference guide, supplying diverse example queries, and managing context windows effectively, you can build a robust system that facilitates quick and accurate data retrieval for users.

In this story, I explored the quickest way to tackle these challenges and shared some tips for setting up a solid and efficient AI query system.

Recommended read:

[1] https://medium.com/towards-data-science/artificial-intelligence-in-analytics-f11d2deafdf0

[2] https://medium.com/gitconnected/infrastructure-as-code-for-beginners-a4e36c805316

[3] https://platform.openai.com/docs/quickstart?context=node

[4] https://github.com/openai/openai-node/tree/master/examples

[5] https://api.slack.com/legacy/enabling-bot-users

The post The Evolution of SQL appeared first on Towards Data Science.

Mastering Data Streaming in Python: Best Practices for Real-Time Analytics
https://towardsdatascience.com/mastering-data-streaming-in-python-a88d4b3abf8b/

In this article, I will address the key challenges data engineers may encounter when designing streaming data pipelines. We’ll explore use case scenarios, provide Python code examples, discuss windowed calculations using streaming frameworks, and share best practices related to these topics.

In many applications, having access to real-time and continuously updated data is crucial. Fraud detection, churn prevention and recommendations are the best candidates for streaming. These data pipelines process data from various sources to multiple target destinations in real time, capturing events as they occur and enabling their transformation, enrichment, and analysis.


Streaming data pipeline

In one of my previous articles, I described the most common data pipeline design patterns and when to use them [1].

Data pipeline design patterns

A data pipeline is a sequence of data processing steps, where each stage’s output becomes the input for the next, creating a logical flow of data.

A data pipeline exists whenever data is processed between two points, such as from source to destination

The three key components of a data pipeline are the source, the processing step(s), and the destination. For example, data extracted from an external API (source) can be loaded into a data warehouse (destination), illustrating a common scenario where the source and destination are distinct.

In streaming, the source is typically a publisher service, while the destination is a consumer – such as an application or another endpoint – of the processed data. This data often undergoes transformations using windowed calculations. A great example would be a session window defined by an inactivity period following the last event (Google Analytics 4, etc).
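
To make the session window idea concrete, here is a minimal, framework-free sketch that closes a session after 30 minutes of inactivity (30 minutes is simply the default GA4 uses; treat the value as an assumption to tune):

# sessionization.py
# Minimal sketch: assign events to sessions using an inactivity gap,
# which is exactly what a session window in a streaming framework does.
from datetime import datetime, timedelta

SESSION_TIMEOUT = timedelta(minutes=30)

def sessionize(events):
    """events: iterable of (user_id, timestamp) tuples sorted by timestamp."""
    last_seen = {}    # user_id -> timestamp of the previous event
    session_no = {}   # user_id -> current session counter
    for user_id, ts in events:
        gap = ts - last_seen.get(user_id, datetime.min)
        if gap > SESSION_TIMEOUT:
            session_no[user_id] = session_no.get(user_id, 0) + 1  # start a new session
        last_seen[user_id] = ts
        yield user_id, f"{user_id}-session-{session_no[user_id]}", ts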

Conceptual data pipeline design. Image by author

Actual application example

Consider a simple stream data processing application example built with Python, Kafka and Faust below. High-level application logic would be the following:

  1. The API service app/app_main.py accepts valid user engagement events POSTed to the Kafka producer topic. These events can be collected from a website or a mobile application, or sent by another service such as a data publisher.
{
    "event_type": "page_view",
    "user_id": "e659e3e7-22e1-4a6b",
    "action": "alternative_handset",
    "timestamp": "2024-06-27T15:43:43.315342",
    "metadata": {
        "session_id": "4b481fd1-9973-4498-89fb",
        "page": "/search",
        "item_id": "05efee91",
        "user_agent": "Opera/8.81.(X11; Linux x86_64; hi-IN) Presto/2.9.181 Version/12.00"
    }
}

Event validation is performed with Pydantic, which only accepts events with valid types, actions, etc. [2]

Our application constantly consumes processed events from the consumer topic and sends them to a WebSocket so the real-time processing can be visualized.

Python for Data Engineers

  1. The data processing service consumes raw events from the producer topic, applies a window calculation (a tumbling table) and sends results aggregated by user to the consumer topic every 10 seconds. This is where a streaming framework like kafka-streams or faust comes in.
  2. This stream of constantly processed data can be instantly consumed by the API service and visualized at localhost:8000/monitor.

For example, we can process raw user engagement events every 10 seconds to generate a user leaderboard based on a simple count of events per user.

Streaming leaderboard app example. Image by author.
import os
import json
import logging
import asyncio
import uvicorn
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.staticfiles import StaticFiles
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from app.models import Event

from .http import events, users
from dotenv import load_dotenv
load_dotenv()

logging.basicConfig(level=logging.INFO)

# kafka_brokers = os.getenv("REDPANDA_BROKERS")
kafka_brokers = (
    "redpanda-0:9092"
    if os.getenv("RUNTIME_ENVIRONMENT") == "DOCKER"
    else "localhost:19092"
)
consumer_topic = os.getenv("CONSUMER_TOPIC")
producer_topic = os.getenv("PRODUCER_TOPIC")
error_topic = os.getenv("ERROR_TOPIC")

def kafka_json_deserializer(serialized):
    return json.loads(serialized)

@asynccontextmanager
async def startup(app):
    app.producer = AIOKafkaProducer(
        bootstrap_servers=[kafka_brokers],)
    await app.producer.start()

    app.consumer = AIOKafkaConsumer(
        consumer_topic,
        # "agg-events",
        # group_id="demo-group",
        # loop=loop,
        bootstrap_servers=[kafka_brokers],
        enable_auto_commit=True,
        auto_commit_interval_ms=1000,  # commit every second
        auto_offset_reset="earliest",  # If committed offset not found, start from beginning
        value_deserializer=kafka_json_deserializer,
    )
    await app.consumer.start()

    yield

app = FastAPI(lifespan=startup)
app.mount("/static", StaticFiles(directory="static"), name="static")

app.include_router(events.router)
app.include_router(users.router)

# WebSocket endpoint
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    async def send_message_to_websocket(msg):
        text = str(msg.value)
        await websocket.send_text(text)

    async def consume_from_topic(topic, callback):
        print(f"Consuming from {topic}")
        async for msg in app.consumer:
            print(f"Received message: {msg.value}")
            await callback(msg)

    # Start consuming
    asyncio.create_task(consume_from_topic(consumer_topic, send_message_to_websocket))

    # Keep the connection open
    while True:
        await asyncio.sleep(3)

@app.post("/track")
async def send_event_to_topic(event: Event):

    try:
        data = event.model_dump_json()
        data = data.encode()

        # # Validate the presence of required fields
        # We could do something like this but Pydantic will do
        # everything for us.
        # if "user_id" not in data or "action" not in data:
        #     raise HTTPException(
        #         status_code=422, detail="Incomplete data provided")
        user_id = event.user_id

        # Send filename to Redpanda
        await app.producer.send(producer_topic, data)
        # Returning a confirmation message
        return {"message": "User data submitted successfully!",
                "user_data": {"user_id": user_id}}

    except HTTPException as e:
        # Re-raise HTTPException to return the specified
        # status code and detail
        print(e)
        raise e
    except Exception as e:
        # Handle other unexpected exceptions and return a
        # 500 Internal Server Error
        print(e)
        raise HTTPException(
            status_code=500, detail=f"An error occurred: {str(e)}")

if __name__ == "__main__":
    uvicorn.run("app.app_main:app", host="0.0.0.0", port=8000)

# Run:
# uvicorn app.app_main:app --reload --host 0.0.0.0 --port 8000
# python -m app.app_main api -l info

The code for the stream-processing service can be found further down below.

Kafka, Kinesis, RabbitMQ and other stream-processing tools

Let’s take a look at the popular data streaming platforms and frameworks that have proved most useful over the last couple of years.

  • Apache Spark – a framework for distributed data computing for large-scale analytics and complex data transformations.
  • Apache Kafka – a real-time data pipeline tool with a distributed messaging system for applications. It uses a publish-subscribe model where producers send data to topics, and consumers pull data from those topics. Each topic is split into partitions that are replicated across different servers for better availability and to balance the load. Plus, Kafka has built-in fault tolerance, so you can set the replication factor and how many in-sync replicas (ISRs) you want for each topic. This means your data stays accessible, even if some servers go down.
  • AWS Kinesis is a real-time streaming platform for analytics and applications. I previously wrote about it here [3].

Building a Streaming Data Pipeline with Redshift Serverless and Kinesis

  • Google Cloud Dataflow – Google’s streaming platform for real-time event processing and analytics pipelines.
  • Apache Flink – a distributed streaming data platform designed for low-latency data processing.
  • RabbitMQ is an open-source message broker that facilitates communication between applications. It uses a queuing system based on the Advanced Message Queuing Protocol (AMQP), allowing you to send, receive, and route messages efficiently. RabbitMQ helps decouple application components, enabling them to work independently and scale easily.

Kafka is one of my favourite distributed streaming platforms: it lets you publish and subscribe to data streams, process them in real time, and store them reliably. It was originally built for the JVM, but it is also available to Python developers (kafka-python).

One thing I like about this ecosystem is the built-in windowing methods that simplify session and window calculations.

For example, using the faust-streaming framework this can be achieved with ease. Consider the code below. It demonstrates how our application calculates a windowed aggregation for each user:


import os
import random
from collections import Counter
from datetime import datetime, timedelta

import faust

# Event, UserStats, AllowedEvents and AllowedActions are assumed to be defined
# elsewhere in the project (e.g. faust Records / enums in app.models)
from app.models import Event, UserStats, AllowedEvents, AllowedActions

SINK = os.getenv("CONSUMER_TOPIC")
TOPIC = os.getenv("PRODUCER_TOPIC")

BROKER = (
    "redpanda-0:9092"
    if os.getenv("RUNTIME_ENVIRONMENT") == "DOCKER"
    else "localhost:19092"
)

TABLE = "tumbling-events"
CLEANUP_INTERVAL = 1.0
WINDOW = 10  # 10 seconds window
WINDOW_EXPIRES = 10
PARTITIONS = 1

app = faust.App("event-stream", broker=f"kafka://{BROKER}")

app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=Event)
sink = app.topic(SINK, value_type=UserStats)

@app.timer(interval=3.0, on_leader=True)
async def generate_event_data():
    events_topic = app.topic(TOPIC, key_type=str, value_type=Event)
    allowed_events = [e.value for e in AllowedEvents]
    allowed_actions = [e.value for e in AllowedActions]

    # Create a loop to send data to the Redpanda topic
    # Send 20 messages every time the timer is triggered (every 3 seconds)
    for i in range(20):
        # Send the data to the Redpanda topic

        await events_topic.send(
            key=random.choice(["User1", "User2", "User3", "User4", "User5"]),
            value=Event(
                event_type=random.choice(["page_view", "scroll"]),
                user_id=random.choice(["User1", "User2", "User3", "User4", "User5"]),  # noqa: E501
                action=random.choice(["action_1", "action_2"]),
                timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),

            ),
        )

    print("Producer is sleeping for 3 seconds 😴 ")

def window_processor(key, events):
    try:
        timestamp = key[1][0]  # key[1] is the tuple (ts, ts + window)
        print(f'key:: {key}')

        users = [event.user_id for event in events]
        count = len(users)

        user_counter = Counter(event.user_id for event in events)
        for i, (user, count) in enumerate(user_counter.items()):
            print(f"{i}. {user}: {count}")
            aggregated_event = UserStats(
                timestamp=timestamp, user_id=user, count=count
            )
            print(
                f"Processing window: {len(users)} events, Aggreged results: {aggregated_event}"  # noqa: E501
            )
            sink.send_soon(value=aggregated_event)

    except Exception as e:
        print(e)

tumbling_table = (
    app.Table(
        TABLE,
        default=list,
        key_type=str,
        value_type=Event,
        partitions=PARTITIONS,
        on_window_close=window_processor,
    )
    .tumbling(WINDOW, expires=timedelta(seconds=WINDOW_EXPIRES))
    .relative_to_field(Event.timestamp)
)

@app.agent(app.topic(TOPIC, key_type=str, value_type=Event))
async def calculate_tumbling_events(events):
    async for event in events:
        value_list = tumbling_table["events"].value()
        value_list.append(event)
        tumbling_table["events"] = value_list

if __name__ == "__main__":
    app.main()

Now type this in the command line to launch our streaming module:

faust -A streamer.event_stream worker -l info

As a result, we should see event aggregation happening in real time:

Image by author

Choosing the right Python client

Some of the popular Python clients for Kafka are confluent-kafka, pykafka and kafka-python.

confluent-kafka is a Python wrapper for the high-performance librdkafka C library, offering full Kafka feature support along with extras like AVRO serialization and schema registry integration. It includes both high-level and low-level APIs for greater flexibility and control over your Kafka applications.

Big Data File Formats, Explained

pykafka and kafka-python are widely used pure-Python clients designed with simplicity in mind. They cover the core Kafka features and include extras like balanced consumers, ZooKeeper integration, and partition management.

Each Python library has its own pros and cons.

Simplicity of use can be important for beginner-level practitioners. This would be all about documentation, code readability, API design, and error handling. Pykafka is often considered the simplest, with a clear API, good documentation, and user-friendly error messages. Conversely, kafka-python and confluent-kafka-python tend to be less simple, featuring more complex APIs, less comprehensive documentation, and less clear error messages.

On the other hand, confluent-kafka-python and kafka-python provide the most comprehensive Kafka feature support, covering all Kafka capabilities and offering both high-level and low-level APIs. In contrast, pykafka has limited feature support and only provides a high-level API. Generally, confluent-kafka-python offers the best performance due to its foundation on the optimized librdkafka C library, while kafka-python and pykafka have lower performance because they are pure Python implementations with more overhead.

You can install kafka-python using pip: pip install kafka-python and our application file would look like this:

# app.py

from kafka import KafkaProducer, KafkaConsumer

import json
producer = KafkaProducer(bootstrap_servers=['brokerA:9092', 'brokerB:9092'],
                         key_serializer=lambda k: json.dumps(k).encode(),
                         value_serializer=lambda v: json.dumps(v).encode())

# Send data to Kafka topic 'my_topic'
future = producer.send('my_topic', key='hello', value='world')
try:
    result = future.get(timeout=10)
    print(result)
except Exception as e:
    print(e)

To consume data from a topic we can simply iterate over the consumer; alternatively, its poll method returns our JSON records grouped by topic partition:

import json
consumer = KafkaConsumer('my_topic',
                         bootstrap_servers=['brokerA:9092', 'brokerB:9092'],
                         group_id='my-group',
                         key_deserializer=lambda k: json.loads(k.decode()),
                         value_deserializer=lambda v: json.loads(v.decode()),
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         max_poll_records=10)

for record in consumer:
    print(record.key, record.value)
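
If you prefer explicit batches over the iterator, the same consumer also exposes a poll method, which returns records grouped by topic partition:

# Poll explicitly instead of iterating: returns a {TopicPartition: [records]} dict
batches = consumer.poll(timeout_ms=1000, max_records=10)
for topic_partition, records in batches.items():
    for record in records:
        print(topic_partition.topic, record.key, record.value)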

Kafka Best Practices

So how do we improve our Kafka application reliability and performance?

There is no universal solution for optimizing Kafka applications

The performance and reliability of our Kafka applications can be affected by various factors, including the following:

  • configuration and number of producers and consumers,
  • message size and frequency,
  • network bandwidth and latency,
  • hardware and software resources,
  • potential failure scenarios.

Typically, these are the general recommendations for fault-tolerant and performant design:

Use Replication: Replication creates multiple data copies across brokers, ensuring accessibility during failures and improving performance through load balancing. However, it can lead to increased disk usage and network traffic. Set your replication factor and the number of in-sync replicas (ISRs) based on your availability and consistency requirements.

Enable Batching: Batching groups multiple messages into a single request, reducing network overhead and increasing throughput. It also improves compression efficiency. Enable batching on both producers and consumers, adjusting the batch size and linger time to meet your latency and throughput goals.

Partition it wisely: The number of partitions for each topic affects scalability, parallelism, and performance. More partitions allow for greater producer and consumer capacity, distributing load effectively across brokers. However, excessive partitions can increase network traffic. Thus, select partition counts based on your expected throughput and latency.

Enable Compression: Compression reduces message size. It’s as simple as that. It can reduce disk space usage and network data transfer. While it incurs some CPU overhead, choosing an appropriate compression algorithm and level is considered best practice as well.
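
With kafka-python, batching and compression are just producer parameters. A minimal sketch (broker addresses and the exact values are placeholders to tune against your own latency and throughput goals):

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['brokerA:9092', 'brokerB:9092'],
    acks='all',               # wait for all in-sync replicas to acknowledge writes
    compression_type='gzip',  # 'snappy' and 'lz4' are also supported
    batch_size=32 * 1024,     # batch up to 32 KB of messages per partition
    linger_ms=20,             # wait up to 20 ms to fill a batch before sending
    retries=5,
)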

Error handling

Python Kafka clients offer a range of methods and options for managing errors and exceptions, including:

  • Error Handling Strategies: Strategies relate to retrying, ignoring, or failing fast on errors. Data engineers can determine how to address different error types while ensuring reliability and data consistency. For example, parameters such as max_in_flight_requests_per_connection, retries and retry_backoff_ms can be used for the kafka-python producer. In the confluent-kafka-python consumer, we could use parameters like enable.auto.commit, auto.commit.interval.ms, and enable.auto.offset.store.
  • Exception Handling: The try, except, and finally blocks can be employed to handle various exception types and execute cleanup or recovery procedures. Standard Python exception handling. For example, there is a KafkaError class in the kafka-python and confluent-kafka-python clients. In the pykafka client we have PyKafkaException class.
  • Error Callbacks: Callbacks can be useful when we need to log errors, raise exceptions, or implement recovery actions. Callback functions can be assigned to the constructors of producers and consumers, triggering whenever an error or exception occurs at the client level. For instance, the error_cb parameter is available for both the kafka-python and confluent-kafka-python clients, while the on_error parameter is used in the pykafka client.

For example, one of the most common error-handling patterns is a dead-letter queue. Events from the source topic can diverge into two separate paths:

  1. The application successfully processes each event in the source topic and publishes them to the target topic (a sink) if everything is okay.
  2. Events that cannot be processed – such as those lacking the expected format or missing required attributes – are directed to the error topic.

Another useful error-handling pattern is to implement a retry queue. For instance, events could be routed to the retry topic if an item’s attribute is unavailable at that moment, for example because it is still being computed by another service and wasn’t ready at the time of the request.

This recoverable condition should not be classified as an error

Instead, it should be periodically retried until the required conditions are satisfied. Introducing a retry topic allows for immediate processing of most events while deferring the handling of certain events until the necessary conditions are fulfilled.
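
A minimal sketch of this routing logic with kafka-python is shown below. The topic names and the validate_event / dependencies_ready / process functions are placeholders for your own pipeline logic:

# Dead-letter and retry routing sketch; helper functions are placeholders.
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer('events', bootstrap_servers=['brokerA:9092'],
                         value_deserializer=lambda v: json.loads(v.decode()))
producer = KafkaProducer(bootstrap_servers=['brokerA:9092'],
                         value_serializer=lambda v: json.dumps(v).encode())

for record in consumer:
    event = record.value
    if not validate_event(event):          # malformed: send to the dead-letter topic
        producer.send('events.deadletter', event)
    elif not dependencies_ready(event):    # recoverable: park it on the retry topic
        producer.send('events.retry', event)
    else:
        producer.send('events.processed', process(event))  # happy path: the sink topic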

Conclusion

Streaming techniques (tumbling and hopping windows, sessions, etc.) combined with Python is a versatile solution. Tools like Kafka help data engineers create streaming applications that can power real-time analytics, messaging, and event-driven systems. For example, Kafka’s key benefits include high throughput and scalability making it extremely powerful for its purpose. Kafka processes millions of messages per second with low latency by using batch writing and reading, reducing disk I/O and network overhead, and leveraging data compression. It can scale both horizontally by adding brokers and vertically by increasing partitions and replicas. It supports consumer groups to distribute workloads efficiently. Kafka ensures data availability even during failures through a leader-follower model, where followers replicate data. If a leader fails, a follower takes over, and Kafka’s commit log allows for durable and replayable data storage.

Choosing the right Python library would be the first thing to start. If performance metrics like throughput, latency, CPU usage, and memory consumption matter most to you then go for confluent-kafka-python. If you are looking for simplicity then pykafka or kafka-python would be a better solution.

Adequate error handling and designing our streaming application in a way that minimizes latency while maximizing throughput and availability is another thing to consider. I hope the tips from this story will help you configure your Kafka cluster the right way and enable the optimal settings for replication, partitioning, batching and compression.

Streaming in Data Engineering

Recommended read

[1] https://towardsdatascience.com/data-pipeline-design-patterns-100afa4b93e3

[2] https://towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd

[3] https://towardsdatascience.com/streaming-in-data-engineering-2bb2b9b3b603

[4] https://towardsdatascience.com/streaming-in-data-engineering-2bb2b9b3b603

[5] https://medium.com/towards-data-science/big-data-file-formats-explained-275876dc1fc9

The post Mastering Data Streaming in Python appeared first on Towards Data Science.

Top Career Websites for Data Engineers: How to find fantastic remote jobs and get hired
https://towardsdatascience.com/top-career-websites-for-data-engineers-6b3ca8205b26/

Data Engineering is super hot right now, which means there are tons of job opportunities out there. In this piece, I want to explore career websites specifically for data engineers. We’ll dive into the main aspects of the data engineering roles currently available and what you’ll need to break into this field by 2025. This will be helpful for entry-level IT folks and mid-level software engineers looking to make a career switch.


Why data engineering?

Data engineering is a dynamic and highly rewarding field, offering the opportunity to engage with all aspects of data, from APIs and data connectors to data platforms, business intelligence, and a wide array of tools available in the market.

The role is very secure, as the ongoing generation of data by businesses ensures a continuous demand for data engineers. This role is also closely intertwined with machine learning (ML), as you’ll be responsible for creating and deploying various data and ML pipelines. So it won’t be boring!

The field is far from mundane and offers lucrative compensation. The high pay reflects the complexity of building robust data platforms. It begins with gathering requirements and designing the system, demanding significant experience and advanced programming skills.

According to tech job research conducted by DICE, data engineering is the fastest-growing tech occupation:

Source: DICE

Let’s see which platforms offer data engineer career opportunities.

Wellfound (ANGELLIST)

AngelList Talent (the jobs marketplace) is now Wellfound (https://wellfound.com/jobs) and is accessible from the Wellfound website by clicking the "Jobs" tab at the top of your homepage. At Wellfound, a leading source for startup job listings, you can search for jobs and select the "Remote OK" filter.

This way, regardless of where you live, you can find a startup that suits you.

Have you always wanted to work for a tech startup but don’t live in a tech hub? That’s the advantage of remote work – your location doesn’t matter!

Image by author

In a previous article, I discussed the essential skill set for data engineers and what it takes to enter this exciting field.

How to Become a Data Engineer

LinkedIn

LinkedIn is an absolute leader here as we can easily create job alerts and use their fantastic "Easy apply" feature. It also gives you insight into your connections who work at that company, which is very useful. We can simply message the recruiter and ask about the opportunity we spotted.

What’s more, I use LinkedIn as a blogging platform. It provides further marketing benefits for me as a data engineer. With my articles and reposts, I tell recruiters about my skills and expertise.

By using LinkedIn strategically, you can secure a higher salary without jeopardizing your current job.

Image by author

Glassdoor

Glassdoor is a popular career portal that provides insights into companies, job opportunities, and workplace cultures. It’s known for offering a platform where current and former employees can anonymously share reviews about their employers, including information on company culture, management, salary, and benefits. This transparency helps job seekers make informed decisions about potential employers.

In addition to company reviews, Glassdoor provides job listings, salary information, interview questions, and insights into company benefits. Employers can also use the platform to post jobs and attract potential candidates. Glassdoor’s combination of job listings and insider company information makes it a valuable resource for both job seekers and employers.

Image by author

Career Vault

Career Vault is a platform that aggregates job listings including remote ones from various sources, providing users with a comprehensive database of career opportunities across different industries and locations. It serves as a one-stop resource for data engineering job seekers to find and apply for positions.

Image by author

Want to develop your data skills and be promoted? Websites like Career Vault and LinkedIn can help with that. I discussed the key competencies required for Lead-level roles here:

Lead Data Engineer Career Guide

TheMuse.com

The Muse offers a beautiful user interface and comprehensive details on companies and jobs, making the search for remote work feel effortless.

I noticed a very good selection of remote opportunities here.

Image by author

It also makes it very easy to apply quickly. We just need to upload the CV.

Image by author

Some job postings only allow you to apply through the company website, though.

Remote OK

Remote OK is a remote job site that categorizes all job listings. You can filter listings based on specific criteria, such as recruiter-posted jobs, experience level (junior, senior, etc.), job type (sales, marketing, design, development, and more), and whether they are tech or non-tech roles. Even non-tech jobs, like analyst and marketing positions, can benefit from basic tech skills.

Image by author

Working Nomads

Working Nomads is a website that curates and lists remote job opportunities for professionals seeking flexible work options. It offers a subscription-based newsletter that delivers a curated selection of remote jobs directly to users’ inboxes, allowing them to browse and apply for positions across various industries while maintaining a location-independent lifestyle.

Basically, it is a newsletter tailored for busy digital nomads. By signing up, you’ll receive a curated list of remote jobs delivered straight to your inbox. You can opt for daily or weekly emails, allowing you to continue your travels while the remote job hunt comes to you.

Image by author

Remotive

Remotive is a fairly simple job board with lots of free remote listings. It features a comprehensive list of remote jobs categorized by fields like sales, support, product engineering, marketing, and more, making it super easy to find the exact type of job you want. Remotive also has a bi-monthly newsletter for anyone looking to work remotely.

Image by author

Remote.co

Remote.co is a website dedicated to remote work, offering a wide range of resources and job listings for individuals seeking remote employment.

remote.co has a section with remote-only jobs too. I used it last year and got a few interviews lined up.

It features job opportunities across various industries and provides valuable insights on remote work practices, team management, and productivity. Remote.co is designed to help both job seekers and employers navigate the remote work landscape effectively.

Image by author

It is also a good idea to check all international remote openings.

Check companies’ websites for any remote jobs that might be of interest to you. Sometimes companies have remote roles on their websites that are not listed on this job board.

Remote International Jobs – Work From Home

We Work Remotely

Featuring a straightforward design, this virtual job board provides a broad selection of work-from-home positions, from customer service to web design and programming. We Work Remotely attracts over 100,000 users monthly, making it a reliable resource for swiftly finding remote job opportunities.

Image by author

Skip the Drive

True to its name, Skip the Drive offers a convenient resources tab and a dependable list of remote jobs.

By utilizing the resources provided, you can easily trade in your stressful morning commute for a relaxed environment.

It is really focused on remote job listings, offering users a curated selection of work-from-home opportunities across various industries. The platform simplifies the job search process by allowing users to filter listings based on their preferences, and it also provides resources to help individuals transition to and thrive in remote work. Skip the Drive is designed to help users avoid the daily commute by finding flexible, location-independent jobs.

Image by author

Power To Fly

Power To Fly is an online platform aimed at career growth for diverse professionals, featuring a job board, virtual job fairs, and networking events. This one has a good selection of jobs available remotely. By completing your profile, you can connect with recruiters and hiring managers from various industries worldwide.

I used this one as well and got a few opportunities from there.

Image by author

Landing.jobs

Although Landing Jobs doesn’t have a large selection of remote tech job opportunities, they meticulously curate their listings. A unique feature of their site is the ability to filter your search for jobs that are fully remote, partially remote, or remote but within commuting distance.

They focus on tech careers, connecting professionals with opportunities in the tech industry across Europe and beyond. The platform curates job listings, helping users find roles that match their skills and career goals. Landing.jobs also offers resources for career development and provides a supportive community for tech talent. While it primarily features on-site positions, it includes options for fully remote, partially remote, and locally remote jobs, catering to a variety of work preferences.

Image by author

Job boards that MIGHT be useful

There are a lot of other job boards like Pangian, Intch and Flexjobs that lack free options. In my opinion, these websites still might be useful to anyone looking for a new role in IT.

For example, Pangian has a nice selection of worldwide remote roles at https://pangian.com/job-travel-remote/ but it is available for a fee only ($14.99).

Personally, I didn’t find it very useful.

Image by author

Conclusion

In this article, I’ve rounded up some top websites for finding data-related career opportunities, extending beyond just data engineering roles. Data engineering often overlaps with positions like BI developer and Machine Learning architect, giving job seekers more flexibility in finding the right fit.

Data engineering is a hot trend right now, with LinkedIn job postings surging. Previously, I discussed the essential skillset for aspiring data engineers and what it takes to break into this dynamic field.

How to Become a Data Engineer

To land the ideal data engineering role, I suggest beginning with job boards that offer free options. LinkedIn, Glassdoor, and Indeed are among the best choices. Review the required skills, customize your CV to match, and apply with confidence.

Lead Data Engineer Career Guide

Recommended read

[1] https://towardsdatascience.com/lead-data-engineer-career-guide-699e806111b4

[2] https://towardsdatascience.com/how-to-become-a-data-engineer-c0319cb226c2

[3] https://towardsdatascience.com/data-platform-architecture-types-f255ac6e0b7

[4] https://towardsdatascience.com/data-pipeline-design-patterns-100afa4b93e3

[5] https://towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd

The post Top Career Websites for Data Engineers appeared first on Towards Data Science.

The Top 10 Data Lifecycle Problems that Data Engineering Solves: Clear strategies for addressing key pain points
https://towardsdatascience.com/the-top-10-data-lifecycle-problems-that-data-engineering-solves-7735781959d5/

In this article, I want to tackle some of the biggest challenges data engineers face when working with pipelines throughout the data lifecycle. Understanding how to manage the data lifecycle is key in our constantly changing field. As a data engineer, I often deal with huge volumes of different types of data, including unstructured data, coming from various sources like databases, data lakes, and third-party APIs. These factors can make managing critical data really tough. We’ll cover all the important stages of data processing, from collection and analysis to storage and destruction, and I’ll share the best practices I use every day.


Data lifecycle management

Data lifecycle management provides businesses with a strategic and regulated approach to organising and managing data from source to destination or its final state, such as archiving or destruction.

Essentially, this is a set of policies to maximise the value of data throughout its useful life, from creation to the point where it becomes obsolete or must be destroyed to comply with regulations.

The typical data lifecycle follows a well-known ETL pattern.

  1. Data Sources: Data is being created somewhere. This is a data creation stage. It can be external and internal data sources – APIs (CRM systems, Accounting software, etc.), Relational databases (MySQL, Postgres), cloud storage and many others we might want to create ourselves.
  2. Data Collection: Extract ("E" in ETL). We would want to extract data from the data source and do something with it – either load "as is" or transform first and then load.
  3. Data Ingestion: ( "T" and "L" in ETL). We are talking about ELT / ETL services that can transform our data, handle different file formats and orchestrate data ingestion into a data warehouse solution (DWH) or data lake.
  4. Data Storage: it all depends on our data pipeline design pattern [2] and can be a data lake, DWH or OLAP database. It would typically perform a storage function, i.e., a landing area for data files and be a proxy stage of many other pipelines.
  5. Data Governance: a set of strategic policies to make our data always available to end users, secure and accurate.
  6. Business Intelligence and Reporting – creating reports is obviously another challenging task. BI developers usually do this.
  7. Data Orchestration: Finally we would want to orchestrate all this madness effectively.

According to this, our data platform infrastructure would often look very complex:

Simplified Data platform blueprint. Image by author.

Data creation stage

Pain point 1: Data source management and data observability

Data lineage looks okay but where is this data coming from?

Managing multiple data sources can become a significant challenge when orchestrating data extraction from numerous origins. Consider the complexity of integrating data from a dozen APIs, relational databases, and native platforms like Google Analytics 4 via Firebase into your data warehouse. Effective management of these diverse inputs is crucial, and we should focus on the meticulous declaration of these database entities.

It’s all about data sources and their declarations.

Solution:

  • Use Git and metadata to declare sources.
  • Add data source descriptions, data origins and owners.
  • Add sources to your data lineage graphs so everyone can find the information about them.
  • Create a single source of truth for all data pipelines using a modern data modelling tool.
  • Deploy data observability tests to understand data health and the state of your data better.

Advanced tools such as Dataform (by Google) and DBT offer comprehensive features to streamline and optimize the management of these data sources, ensuring efficient and organized data integration. Consider this snippet below:

sources:
  - name: sales_data
    description: Data source description and origins of data.
    owner: data source owner
    database: analytics
    schema: raw_sales
    tables:
      - name: transactions
      - name: orders
        columns:
          - name: order_id
            tests:
              - unique
              - not_null
          - name: status
            tests:
              - accepted_values:
                  values: ['placed', 'shipped', 'completed', 'returned']
          - name: customer_id
      - name: customers

Data collection and ingestion

Indeed, these two steps are often combined as they are performed by the same data service. It can be anything that performs efficient data manipulation, e.g. a PySpark application built in Databricks or an AWS Glue job. It can also be a tiny cloud function invoked by message queue events (RabbitMQ, SQS, Pub/Sub, Kafka, etc.).

Pain point 2:

Lack of functionality in out-of-the-box (OOTB) solutions available in the market.

In data collection or extraction we can always work with managed tools like Fivetran or Stitch but sometimes they don’t have the capabilities you need.

For example, consider a data pipeline that generates invoices using sales data from DWH. These invoices must be uploaded into your accounting system. This is not an OOTB integration and it doesn’t exist. You would want to build your own data service and orchestrate invoice uploads as needed. You would want to keep it warm and refresh access tokens constantly to be able to authenticate the service successfully.
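
As a sketch, a tiny uploader that refreshes its OAuth token before each call could look like this. The endpoints, credentials and payload shape are purely illustrative placeholders, not a real accounting API:

# invoice_uploader.py
# Minimal sketch of a self-authenticating uploader; all URLs and fields are placeholders.
import time
import requests

TOKEN_URL = "https://accounting.example.com/oauth/token"
UPLOAD_URL = "https://accounting.example.com/api/invoices"

_token = {"value": None, "expires_at": 0}

def get_token(client_id: str, client_secret: str) -> str:
    if time.time() < _token["expires_at"] - 60:  # reuse the token, refresh a minute early
        return _token["value"]
    resp = requests.post(TOKEN_URL, data={
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    })
    resp.raise_for_status()
    payload = resp.json()
    _token["value"] = payload["access_token"]
    _token["expires_at"] = time.time() + payload["expires_in"]
    return _token["value"]

def upload_invoice(invoice: dict, client_id: str, client_secret: str) -> None:
    token = get_token(client_id, client_secret)
    resp = requests.post(UPLOAD_URL, json=invoice,
                         headers={"Authorization": f"Bearer {token}"})
    resp.raise_for_status()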

Solution:

If you are a data engineer then you are the solution.

Why would someone need to pay for any third-party service if they have a data engineer with relevant programming skills? Check out these ETL techniques that I use daily to design and deploy durable data pipelines.

Python for Data Engineers

Pain point 3:

Third-party tools are too expensive

Whether it’s a Fivetran, Stitch or anything else their pricing models are usually row-based for data we need to extract. This would typically result in a thousand-dollar bill monthly.

Solution:

  • Build robust, durable and cost-effective data pipelines which are unit-tested for side effects.
  • Make sure your code is idempotent, reusable and easy to maintain.
  • Provision resources effectively with infrastructure as code.

Building Durable Data Pipelines

Pain point 4: Scalability, Insight Delays and Operational Overheads

Choosing the right architecture for your data pipelines is the key to success

Designing robust data pipelines is a complex task that demands thorough requirements analysis. Focus on business needs to determine the "what" and "why" of data processing. The "how" is defined through functional requirements, developed in collaboration with technical teams. For instance, if business units require real-time analytics, data freshness will dictate whether to use batch or streaming processing.

A deep understanding of business logic is essential for selecting the appropriate tools. Consider a simple pipeline that transfers data from the data warehouse business or mart model into a Google spreadsheet – this is straightforward for any data engineer.

However, when dealing with highly sensitive data in a transaction monitoring compliance pipeline, more sophisticated solutions are necessary to meet regulatory requirements for Personally Identifiable Information (PII) and effectively manage data input/output features.

Solution:

  • Focus on high-level business requirements in the first plan, be project-focused and understand the limitations and deadlines.
  • Collect all functional requirements to fulfil what business needs.
  • Ensure you understand the skillset of your end users and main stakeholders – this will define the data tools.
  • Finally, choose the data pipeline design pattern that fits well into this framework.

Data pipeline design patterns

Data storage

In many companies data storage is the key data platform component used to stage, preserve, categorise and archive the data we extract from other systems and use in analytics.

Data pipeline example. Image by author.

Pain point 5:

Storage costs, schema drifts, file formats and partitioning layout – how to optimise and which ones to use?

Even a data warehouse solution has a storage component which translates into associated costs. It’s a known fact that modern DWH tools have decoupled computing and storage resources. Even though storage costs are relatively low, they might accumulate over time. Consider a super large dataset with an activity schema and terabytes of user engagement events generated daily. Now imagine this being loaded into your BigQuery data warehouse. The good thing is that partitions left untouched for 90 days drop into the cheaper long-term storage tier, but everything that keeps being updated stays in active storage, which is noticeably more expensive and adds up month after month.

Datalake architecture. Image by author.

ORC vs Parquet vs AVRO. No JSON?

In the application world, a huge amount of data is gathered and kept in JSON format. So why not store it as JSON? Simply because JSON doesn’t carry any schema information on board, and dealing with it in Big Data and Hadoop tools can be slow. Basically, it’s a no-go for Big Data processing frameworks. This is the main reason the Parquet and ORC formats were created.

Solution:

  • Storing data in an optimized storage class is essential for efficiency. Utilizing cloud storage provides the flexibility to process data across various platforms within the data lake using Spark, AWS Glue, or Databricks. It facilitates reloading data into the data warehouse if necessary.
  • Parquet and ORC store data in columns and offer a compression ratio higher than AVRO. If your data structure might change over time and you need schema evolution support then choose AVRO. ORC is usually considered the best file format option for HIVE, whereas Parquet is considered the optimal solution across the Hadoop ecosystem.
  • Create a cloud storage bucket for an archive with a HIVE partitioning layout using one of the major cloud providers (Google, AWS, or Azure); a minimal example of this layout is sketched right after this list.
  • Establish a routine to monitor and offload obsolete data, archiving it to reduce costs.
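
As a sketch of the HIVE-style layout, PyArrow can write a date-partitioned Parquet archive that engines like Spark, Athena or BigQuery external tables can prune efficiently. The path and column names below are placeholders:

# archive.py
# Minimal sketch: write a HIVE-partitioned Parquet archive (dt=YYYY-MM-DD/ folders).
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({
    "dt": ["2024-06-01", "2024-06-01", "2024-06-02"],
    "user_id": ["u1", "u2", "u1"],
    "event_type": ["page_view", "scroll", "page_view"],
})

# Produces events-archive/dt=2024-06-01/<file>.parquet, dt=2024-06-02/..., etc.
# Point root_path at a cloud bucket (with the matching filesystem) for a real archive.
pq.write_to_dataset(table, root_path="events-archive", partition_cols=["dt"])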

Consider this code below. It shows how easy it is to unload the data and then load it back:

-- export data from BigQuery to Google Cloud Storage
EXPORT DATA
  OPTIONS (
    uri = 'gs://events-export/public-project/events-*.json', -- the URI must contain a single wildcard
    format = 'JSON', -- or CSV, Parquet
    overwrite = true
)
AS (
SELECT *
FROM `firebase-public-project.analytics_153293282.events_20181001`
);

Load data back if needed:

LOAD DATA INTO source.json_external_test
FROM FILES(
  format='JSON',
  uris = ['gs://events-export/public-project/*']
)

When your Stack is a Lake House

Pain point 6:

Security, major access controls and data retention policies

If we are dealing with sensitive company data then it must be protected. We want to ensure that only authorized users have access and that the data we hold can be treated as a single source of truth for every dataset and model we have.

Solution:

  • Use VPC to restrict access. Virtual private clouds are a good practice to ensure your data is isolated from the outer world.
  • Apply versioning to your data. This helps ensure data consistency in case of unintended changes by any user or service working with the data lake.
  • Back up data regularly and introduce data loss prevention policies.

Consider the Terraform example below. It enables versioning on an AWS S3 bucket. It’s a simple thing, but it helps keep your data lineage clean and protected, ensuring that the data lake remains a single source of truth even when database replication pipelines could easily manipulate, update or erase data.

resource "aws_s3_bucket" "example" {
  bucket = "example-bucket"
}

resource "aws_s3_bucket_acl" "example" {
  bucket = aws_s3_bucket.example.id
  acl    = "private"
}

resource "aws_s3_bucket_versioning" "versioning_example" {
  bucket = aws_s3_bucket.example.id
  versioning_configuration {
    status = "Enabled"
  }
}

Data governance

Data Governance is all about making data accurate, secure and available to main stakeholders.

Pain point 7: Granular access controls for major stakeholders and data availability.

Sometimes it is a good idea to place a DWH solution into a separate data processing stage. Indeed, data management, role-based access controls and robust data governance features are what make these tools so practical in modern data stacks [3]. I discussed it here:

Modern Data Warehousing

Solution:

  • If not using a data warehouse apply identity-based access controls for data lake datasets.
  • Apply row/column-based access controls for your DWH data where needed.
  • Apply object tagging and tag-based masking policies.
  • Implement alerting using access history. If you are a DBA then you must have it under control.
  • Apply regular data quality checks. They can be scheduled or run in CI for every pull request.

For example, DBT has a set of generic and singular tests that we might want to run on data regularly. This is very easy to implement like so:

version: 2

models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'returned']
      - name: customer_id
        tests:
          - relationships:
              to: ref('customers')
              field: id

As an example, in one of my previous stories I wrote how to create email alerts in BigQuery using row conditions for data quality:

Automated emails and data quality checks for your data

Pain point 8:

Data quality and testing

This is what matters in data development. You would want to use the "Don’t Repeat Yourself" (DRY) modular approach and create data models that are templated, easy to maintain and reuse, and unit-tested.

  • Introduce separate data environments and separate production, development and testing areas in both DWH and data lake solutions.
  • Use data modelling and data lineage graphs for your data pipelines.
  • Run unit tests for every data transformation you have.

Splitting data environments makes perfect sense as you wouldn’t want staging data to mix up with your production one. In this light, it would be best to design your data platform to have data environment layers split like in this example below:

DATABASE_NAME   SCHEMA_NAME     
-------------------------------
RAW_DEV         SERVER_DB_1     -- mocked data
RAW_DEV         SERVER_DB_2     -- mocked data
RAW_DEV         EVENTS          -- mocked data
RAW_PROD        SERVER_DB_1     -- real production data from pipelines
RAW_PROD        SERVER_DB_2     -- real production data from pipelines
RAW_PROD        EVENTS          -- real production data from pipelines
...                             
BASE_PROD       EVENTS          -- enriched data
BASE_DEV        EVENTS          -- enriched data
...                             
ANALYTICS_PROD       REPORTING  -- materialised queries and aggregates
ANALYTICS_DEV        REPORTING  
ANALYTICS_PROD       AD_HOC     -- ad-hoc queries and views

Database Data Transformation for Data Engineers

SQL unit tests are very useful: they make sure your data transformation logic is persisted and remains the same unless you change it intentionally.

I previously wrote about it here:

Advanced Data Modelling

Think of data governance as a set of policies and tools designed to guide key data stakeholders, especially management, on the proper use of data. Some data warehouse solutions, like Snowflake, offer object categorization for potentially personal or sensitive data. This feature can help ensure compliance with privacy regulations when necessary.

Business Intelligence and Reporting

This is the most interesting one, as I keep seeing BI tools becoming increasingly packed with features that are typically implemented in earlier data lifecycle stages.

Pain point 9: My BI tool is too expensive

Indeed, modern reporting solutions are expensive, especially if they are meant for enterprise-level companies. What should we do if we are an SME?

Well, I would say one single data engineer can replace numerous features in modern BI tools.

Solution:

  • Optimise data modelling: BI with data modelling capabilities seems like overkill in the DBT era. All SQL data transformations should be implemented before they reach your BI solution. In this case, paying for two Looker instances (development and production) doesn’t make much sense to me, and we can save a lot of money.
  • BI and dedicated OLAP cubes: Why would you need something that processes your data outside your data warehouse? It doesn’t make sense to pay double for a BI tool with this feature.
  • Caching and data freshness: Even free community-based BI solutions like Looker Studio have this functionality. Data freshness tests can be implemented using either row conditions or DBT generic tests. A data engineer would schedule analytics data model update materialization and implement a full data refresh if needed.
  • Semantic layer: This is a great example. Think of it as a model with dimensions, metrics or measures for each dimension, and clear instructions on how they all connect. That’s why I like Looker; it takes data modelling to the next level. But what if I told you that you can get this for free? Try the DBT semantic layer with dbt-metricflow and you’ll understand that it is not that difficult to do it yourself.
  • Git and source control: This is my favourite one. At the moment there are just a few BI tools with this feature. The most popular ones are Looker and Sisense, but what if I told you that we can go open-source and do it ourselves? Try Apache Superset: create virtual datasets, source-control them in your repository and, finally, deploy them using its API – see the sketch below.
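To illustrate the Superset route, here is a minimal sketch that deploys a virtual (SQL-defined) dataset through Superset’s REST API with plain requests. The URL, credentials, database id and SQL are placeholders, and the exact endpoints, payload fields and CSRF requirements vary between Superset versions, so treat this as a starting point rather than a definitive integration.

import requests

SUPERSET_URL = "http://localhost:8088"  # placeholder: your Superset instance

def login(session: requests.Session) -> str:
    """Exchange credentials for an access token."""
    resp = session.post(
        f"{SUPERSET_URL}/api/v1/security/login",
        json={"username": "admin", "password": "admin", "provider": "db", "refresh": True},
    )
    resp.raise_for_status()
    return resp.json()["access_token"]

def create_virtual_dataset(session: requests.Session, token: str) -> dict:
    """Register a SQL-backed dataset so it can be reused in charts and dashboards."""
    payload = {
        "database": 1,  # placeholder: id of a database already registered in Superset
        "schema": "analytics",
        "table_name": "daily_revenue",
        "sql": "select order_date, sum(total) as revenue from orders group by 1",
    }
    resp = session.post(
        f"{SUPERSET_URL}/api/v1/dataset/",
        json=payload,
        headers={"Authorization": f"Bearer {token}"},
    )
    resp.raise_for_status()
    return resp.json()

if __name__ == "__main__":
    with requests.Session() as s:
        print(create_virtual_dataset(s, login(s)))

Because the dataset definition lives in your repository, it can be reviewed in a pull request and redeployed by CI, which is exactly the Git workflow that expensive BI tools charge for.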

Data pipeline orchestration

Now we need to orchestrate all these modern data stack tools and the fancy pipelines we built.

Pain point 10: Orchestration

My previous research highlights that an emphasis on heterogeneity, support for multiple programming languages, effective use of metadata, and the adoption of data mesh architecture are essential trends in data pipeline orchestration for building modern, robust, and scalable data platforms.

How Data Engineering Evolved since 2014

Solution:

  • If you rely on legacy architecture built with Apache Airflow or AWS Step Functions then I would recommend sticking with that stack. However, if you’re looking for a more robust and heterogeneous solution then the Mage orchestrator might be a better fit.
  • If you need to orchestrate a lot of machine learning pipelines I would highly recommend trying Flyte.

The support for multiple programming languages is expected to be a significant trend in the coming years. For example, Temporal accommodates various languages and runtimes, whereas Airflow primarily emphasizes Python.

Conclusion

Third-party tools for data extraction often fail to meet specific needs, and their pricing models – typically based on the volume of data extracted – can be prohibitively expensive. As a data engineer, it is important to know how to build and deploy durable, cost-effective data pipelines.

Advanced ETL Techniques for Beginners

Data storage presents another challenge that may not be immediately apparent. Utilizing more efficient storage classes is essential. Cloud storage offers the flexibility to process data in various environments within the data lake, such as Spark, AWS Glue, or Databricks, and facilitates reloading data into the data warehouse when necessary. Implementing versioning and protection for your data lake buckets is a best practice, as is establishing data retention policies that specify which data should be archived and for how long. Ensure archived data is stored in standard big data formats to maintain accessibility as technology evolves.

Big Data File Formats, Explained

Data governance should be viewed as a comprehensive set of policies and tools guiding key data stakeholders, particularly management, in the proper usage of data. This strategy ensures high-quality data is available to all involved parties.

Business intelligence (BI) tools with data modelling capabilities may seem excessive in the DBT era. All SQL data transformations should be completed before they reach your BI solution, aligning with the principle that everything should be unit-tested prior to deployment. This approach exemplifies data governance and data modelling at their finest.

Recommended read:

[1] https://towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd

[2] https://towardsdatascience.com/building-durable-data-pipelines-cf3cbf68a7e6

[3] https://towardsdatascience.com/data-pipeline-design-patterns-100afa4b93e3

[4] https://pub.towardsai.net/when-your-stack-is-a-lake-house-6bcb17f9bff6

[5] https://towardsdatascience.com/modern-data-warehousing-2b1b0486ce4a

[6] https://towardsdatascience.com/automated-emails-and-data-quality-checks-for-your-data-1de86ed47cf0

[7] https://towardsdatascience.com/how-data-engineering-evolved-since-2014-9cc85f37fea6

[8] https://towardsdatascience.com/advanced-data-modelling-1e496578bc91

[10] https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket_versioning

[11] https://docs.getdbt.com/docs/build/data-tests

[12] https://towardsdatascience.com/database-data-transformation-for-data-engineers-6404ed8e6000

The post The Top 10 Data Lifecycle Problems that Data Engineering Solves appeared first on Towards Data Science.

]]>
How Data Engineering Evolved since 2014 https://towardsdatascience.com/how-data-engineering-evolved-since-2014-9cc85f37fea6/ Sat, 27 Jul 2024 15:32:10 +0000 https://towardsdatascience.com/how-data-engineering-evolved-since-2014-9cc85f37fea6/ Top trends to help your data pipelines scale with ease

The post How Data Engineering Evolved since 2014 appeared first on Towards Data Science.

]]>
In this discussion, I aim to explore the evolving trends in data orchestration and data modelling, highlighting the advancements in tools and their core benefits for data engineers. While Airflow has been the dominant player since 2014, the Data Engineering landscape has significantly transformed, now addressing more sophisticated use cases and requirements, including support for multiple programming languages, integrations, and enhanced scalability. I will examine contemporary and perhaps unconventional tools that streamline my data engineering processes, enabling me to effortlessly create, manage, and orchestrate robust, durable, and scalable data pipelines.


During the last decade we witnessed a "Cambrian explosion" of various ETL frameworks for data extraction, transformation and orchestration. It’s not a surprise that many of them are open-source and are Python-based.

The most popular ones:

  • Airflow, 2014
  • Luigi, 2014
  • Prefect, 2018
  • Temporal, 2019
  • Flyte, 2020
  • Dagster, 2020
  • Mage, 2021
  • Orchestra, 2023

Apache Airflow, 2014

Apache Airflow, created by Airbnb in 2014, went open-source in 2016 and joined the Apache Software Foundation in 2018. It’s a platform where you can programmatically create, schedule, and monitor workflows. It helped popularize the DAG concept: using Python, engineers define DAGs (Directed Acyclic Graphs) to organize and visualize tasks, an idea now used in almost every orchestrator tool. For example, a "Prefect DAG" looks almost identical. The interface is simple and allows you to monitor data pipelines with ease:

Image by author

DAGs are easy to understand and are written in Python:

import json
from airflow.decorators import dag, task

default_args = {'retries': 1}
# sample payload: order_id -> order value
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

@dag(default_args=default_args, tags=['etl'])
def etl_pipeline():

    @task()
    def extract():
        return json.loads(data_string)

    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        return {"total_count": len(order_data_dict)}

    @task()
    def load(total_count: int):
        print(f"Total order count is: {total_count}")

    extracted = extract()
    transformed = transform(extracted)
    load(transformed["total_count"])

etl_pipeline()

In one of my previous stories I wrote about a more advanced Airflow example:

How to Become a Data Engineer

Advantages:

  • Simple and Robust UI: Detailed views of DAGs, retries, task durations and failures – everything looks so familiar after all these years.
  • Community Support: After all these years, the open-source project has a massive community of followers. It is very popular and has lots of plugins for various data sources and destinations. Combined with regular updates, this makes it an obvious choice for many data devs.
  • Python: Python means custom and scriptable – everything looks very flexible here. In one of my stories many years ago I managed to customize an ML connector and it was very easy to do [1].

Training your ML model using Google AI Platform and Custom Environment containers

  • Jinja and Parameters: This feature will be familiar to many DBT users, allowing us to create dynamic, templated pipelines much like templated models in DBT – see the sketch below.
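Here is a minimal sketch of that templating in action (assuming Airflow 2.4+, where the schedule argument replaced schedule_interval); Jinja renders the logical date into the task at runtime, much like variables in a templated DBT model:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="templated_export",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    # {{ ds }} and {{ dag.dag_id }} are rendered by Airflow's Jinja engine per run
    export_partition = BashOperator(
        task_id="export_partition",
        bash_command="echo 'exporting partition {{ ds }} for {{ dag.dag_id }}'",
    )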

Luigi, 2014

Another orchestrator for our ETL pipelines. Developed by Spotify to handle massive data processing workloads, Luigi features a command-line interface and excellent visualization capabilities. However, even simple ETL pipelines require some Python programming skills. Nonetheless, Luigi works well in many situations.

Advantages

  • HDFS Support: Luigi gives teams a handy toolbox with task templates, including file system abstractions for HDFS and local files. This helps keep operations atomic and data pipelines consistent, making everything run smoothly.
  • Easy to use: Strong community of followers and contributors. The project is maintained.
  • Nice UI: The web UI allows you to search, filter and prioritize DAGs, which makes it easy to work with pipeline dependencies.
  • Robust infrastructure: Luigi supports complex pipelines with multiple tools, A/B tests, recommendations and external reports.
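To give a feel for the framework, here is a minimal sketch with two dependent Luigi tasks; the file names and the toy aggregation are just placeholders.

import datetime
import luigi

class ExtractOrders(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget(f"orders_{self.date}.csv")

    def run(self):
        # placeholder extract step: write a tiny CSV atomically via the target
        with self.output().open("w") as f:
            f.write("order_id,total\n1,10.99\n2,4.99\n")

class SumOrders(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return ExtractOrders(date=self.date)

    def output(self):
        return luigi.LocalTarget(f"orders_{self.date}_total.txt")

    def run(self):
        with self.input().open() as src:
            total = sum(float(line.split(",")[1]) for line in src.readlines()[1:])
        with self.output().open("w") as dst:
            dst.write(str(total))

if __name__ == "__main__":
    luigi.build([SumOrders(date=datetime.date(2024, 1, 1))], local_scheduler=True)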

From my experience, it’s excellent for strict and straightforward pipelines, though implementing complex branching logic can be challenging. Even Spotify themselves moved away from it in 2019, saying it was not easy to maintain and that they needed multiple-language support [2].

Why We Switched Our Data Orchestration Service – Spotify Engineering

They moved to Flyte (more below).

Prefect, 2018

Established in 2018, Prefect aims to overcome some of the challenges engineers face with other orchestration tools like Airflow. Prefect employs a much simpler system than Airflow. Instead of using a DAG structure, you can easily convert your code into a "Prefect DAG" using Python decorators.

It offers a hybrid execution model, allowing tasks to be executed locally or on any cloud platform.

Prefect promotes itself as "the new standard in dataflow automation."

Advantages:

  1. Open-source: Flexibility and multiple deployment options.
  2. Lightweight: With just one command we can set up our development environment for orchestration.
  3. Dynamic Mapping: Enables dynamic task execution based on another task’s output.
  4. Python-first: With Python-first experience Prefect provides cleaner abstractions, similar to Airflow.
  5. Monitoring: Monitor key operational metrics and run failures using Prefect’s Cloud UI. It simply looks intuitive and nice.
  6. Alerts and Notifications: Discord, Email, Slack and many more channels for our pipeline alerts.
Image by author

The open-source version provides everything we typically use in Airflow including features like scheduling, native secrets, retries, concurrency management, scaling functionality and great logging.

With Prefect Cloud we get a few extras, e.g. Automations, Webhooks, Event feeds and Workspaces and Organisations tailored for the hosted environment we might need.

Key components would be the tasks and flows we build. We can describe each step of the pipeline with Prefect’s @task decorator – the base unit of a flow. For each task, we can supply settings using arguments. It can be anything – tags, descriptions, retries, cache settings, etc. Consider this @task below.

@task
def extract_data(source, pipe):
    ...
    return result

@task
def load_data_to_dwh(data, database: bigquery):
    ...
    return table_name

@task
def run_data_checks(dataset, row_conditions: qa_dataset):
    ...

In the example below, we create a Flow using the @flow decorator. The flow will execute all tasks in order, generating inputs and outputs and passing them from one task to another.

@flow
def etl_workflow():
    s3_data = extract_data(google_spreadsheet, pipe_object_with_settings)
    dataset = load_data_to_dwh(s3_data, database)
    run_data_checks(dataset, rules)

if __name__ == "__main__":
    etl_workflow()

Prefect uses Work Pools to manage work allocation efficiently and prioritize tasks in a required environment (test, dev, prod) for optimal performance and automated testing. We can create Workers (agents) locally or in the cloud.

Prefect can be installed with pip:

pip install -U "prefect==2.17.1"
# or get the latest version from prefect
pip install -U git+https://github.com/PrefectHQ/prefect

Here is a simple script to extract data from the NASA API:


# ./asteroids.py
import requests
API_KEY="fsMlsu69Y7KdMNB4P2m9sqIpw5TGuF9IuYkhURzW"
ASTEROIDS_API_URL="https://api.nasa.gov/neo/rest/v1/feed"

def get_asteroids_data():
    print('Fetching data from NASA Asteroids API...')
    session = requests.Session()
    url=ASTEROIDS_API_URL
    apiKey=API_KEY
    requestParams = {
        'api_key': apiKey,
        'start_date': '2023-04-20',
        'end_date': '2023-04-21'
    }
    response = session.get(url, params=requestParams)
    print(response.status_code)
    near_earth_objects = (response.json())['near_earth_objects']
    return near_earth_objects

if __name__ == "__main__":
    get_asteroids_data()

We can turn it into a flow like so:

# my_nasa_pipeline.py
import requests   # an HTTP client library and dependency of Prefect
from prefect import flow, task

API_KEY = "fsMlsu69Y7KdMNB4P2m9sqIpw5TGuF9IuYkhURzW"
ASTEROIDS_API_URL = "https://api.nasa.gov/neo/rest/v1/feed"

@task(retries=2)
def get_asteroids_data(api_key: str, url: str):
    """Get asteroids close to Earth for specific datess
    - will retry twice after failing"""
    print('Fetching data from NASA Asteroids API...')
    session = requests.Session()
    url = ASTEROIDS_API_URL
    apiKey = API_KEY
    requestParams = {
        'api_key': apiKey,
        'start_date': '2023-04-20',
        'end_date': '2023-04-21'
    }
    response = session.get(url, params=requestParams)
    print(response.status_code)
    near_earth_objects = (response.json())['near_earth_objects']
    return near_earth_objects

@task
def save_to_s3(data):
    """Save data to S3 storage (stubbed: prints the payload and returns a mock location)"""
    # Do some ETL here, then upload with boto3 in a real pipeline
    print(data)
    return "s3://my-bucket/asteroids/2023-04-21.json"  # placeholder location

@flow(log_prints=True)
def asteroids_info(date: str = "2023-04-21"):
    """
    Given a date, saves data to S3 storage
    """
    asteroids_data = get_asteroids_data(API_KEY, ASTEROIDS_API_URL)
    print(f"Close to Eart asteroids: {asteroids_data}")

    s3_location = save_to_s3(asteroids_data)
    print(f"Saved to: {s3_location}")

if __name__ == "__main__":
    asteroids_info("2023-04-21")

Run the flow:

python my_nasa_pipeline.py
Image by author

Creating a Prefect server

Now we would want to create a deployment to schedule our flow run:

# create_deployment.py
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        # source="https://github.com/your_repo/prefect_nasa.git",
        source="./",
        entrypoint="my_nasa_pipeline.py:asteroids_info",
    ).deploy(
        name="my-first-deployment",
        work_pool_name="test-pool",
        cron="0 1 * * *",
    )

In your command line run:

python create_deployment.py
# Run the workflow manually
# prefect deployment run 'repo-info/my-first-deployment'
Image by author

In order to run our scheduled flow we would want to create a worker pool:

prefect work-pool create test-pool
prefect worker start --pool 'test-pool'
prefect server start
Image by author
Image by author

Alternatively, we can use this command and use the prompt to create a deployment:


prefect deploy ./my_nasa_pipeline.py:asteroids_info -n my-first-deployment
Image by author

Prefect integrations

Prefect becomes even more powerful with its robust set of integrations, a full list of which can be found here [3].

For example, we can use Prefect with DBT which makes our data modelling even more powerful. In your command line run:

pip install -U prefect-dbt
# register blocks to start using them in Prefect
prefect block register -m prefect_dbt

And now we can use Prefect with dbt-core:

from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation

@flow
def trigger_dbt_flow() -> str:
    result = DbtCoreOperation(
        commands=["pwd", "dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER"
    ).run()
    return result

if __name__ == "__main__":
    trigger_dbt_flow()

In one of my stories I wrote how to configure DBT for convenient execution from any environment:

Database Data Transformation for Data Engineers

Temporal, 2019

Temporal supports workflow triggering through APIs and allows for multiple concurrent workflow executions.

Advantages:

  • Fault-tolerance and Retries: Fault tolerance with automatic retries for any tasks that fail. The ability to manage long-running workflows that may last for several days or even months.
  • Scalability: Capabilities to accommodate high-throughput workloads.
  • Enhanced monitoring: Insight into workflow execution and historical data.
  • Support for temporal queries and event-driven workflows.
  • Heterogeneity: Multiple programming language support.

Temporal can orchestrate intricate data processing workflows, managing failures effectively while maintaining data consistency and reliability throughout the entire process. Its capability to manage states across distributed systems allows Temporal to enhance resource allocation.

Temporal accommodates continuously running workflows, enabling the modelling of the lifecycles of various entities. Temporal workflows are inherently dynamic, and capable of executing multi-step core business logic. They can send or await signals from external processes, facilitating notifications to humans or initiating processes for intervention.
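To make this concrete, here is a minimal sketch using Temporal’s Python SDK (temporalio). It assumes a Temporal server running locally on the default port, and the activity body is a placeholder for a real extraction step.

import asyncio
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

@activity.defn
async def extract(source: str) -> str:
    # placeholder for a real extraction step; Temporal retries it on failure
    return f"rows extracted from {source}"

@workflow.defn
class EtlWorkflow:
    @workflow.run
    async def run(self, source: str) -> str:
        return await workflow.execute_activity(
            extract, source, start_to_close_timeout=timedelta(minutes=5)
        )

async def main() -> None:
    client = await Client.connect("localhost:7233")  # assumes a local Temporal server
    async with Worker(
        client, task_queue="etl-queue", workflows=[EtlWorkflow], activities=[extract]
    ):
        result = await client.execute_workflow(
            EtlWorkflow.run, "orders_db", id="etl-demo", task_queue="etl-queue"
        )
        print(result)

if __name__ == "__main__":
    asyncio.run(main())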

Flyte, 2020

Flyte is an open-source orchestration tool for managing Machine Learning and AI workflows, built on Kubernetes. With data being crucial for businesses, running large-scale compute jobs is essential but operationally challenging. Scaling, monitoring, and managing clusters can burden product teams, slowing innovation. These workflows also have complex data dependencies, making collaboration difficult without platform abstraction.

Image by author

Flyte aims to boost development speed for machine learning and data processing by simplifying these tasks. It ensures reliable, scalable compute, letting teams focus on business logic. Additionally, it promotes sharing and reuse across teams, solving problems once and enhancing collaboration as data and machine learning roles merge.

Advantages

  • End-to-end Data Lineage: This allows you to monitor the health of your data and ML workflows throughout each execution stage. Easily analyze data flows to pinpoint the source of any errors.
  • Parameters and Caching: Designed with ML in mind it allows dynamic pipelines and use of cached pre-computed artifacts. For example, during hyperparameter optimization, you can easily apply different parameters for each run. If a task has already been computed in a previous execution, Flyte will efficiently use the cached output, saving time and resources.
  • Multi-tenant and Serverless: Flyte eliminates the need to manage infrastructure, letting you focus on business challenges instead of machines. As a multi-tenant service, it provides an isolated repository for your work, allowing you to deploy and scale independently. Your code is versioned, and containerized with its dependencies, and every execution is reproducible.
  • Extensible: Flyte tasks can be highly complex. A task can be a simple ETL job, a call to a remote Hive cluster or a distributed Spark execution. The best solution might be hosted elsewhere, so task extensibility allows you to integrate external solutions into Flyte and your infrastructure.
  • Heterogeneous and multi-language support: Data pipelines can be complex. Each step can be written in a different language and use various frameworks. One step could use Spark to prepare data, while the next step trains a deep learning model.
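A minimal flytekit sketch of such a pipeline is below; the task bodies are placeholders, and cache_version is an assumed value you would bump whenever the task logic changes.

from typing import List
from flytekit import task, workflow

@task(cache=True, cache_version="1.0")
def extract_features(rows: int) -> List[float]:
    # placeholder feature extraction; the output is cached by Flyte across runs
    return [float(i) for i in range(rows)]

@task
def train_model(features: List[float]) -> float:
    # placeholder "training" step returning a toy metric
    return sum(features) / len(features)

@workflow
def ml_pipeline(rows: int = 100) -> float:
    return train_model(features=extract_features(rows=rows))

if __name__ == "__main__":
    # workflows can also run locally for quick iteration before a cluster deployment
    print(ml_pipeline(rows=10))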

Dagster, 2020

Dagster is an open-source data orchestrator that facilitates the development, management, and monitoring of data pipelines. It supports job monitoring, debugging, data asset inspection, and backfill execution.

Essentially, Dagster serves as a framework for building data pipelines, acting as the abstraction layer within your data ecosystem.

Heterogeneous Everything

Advantages:

  • Heterogeneous: Dagster offers a comprehensive interface that enables users to build, test, deploy, run, and refine their data pipelines all in one place. This unified approach streamlines the workflow for data engineers, making it easier to manage the entire lifecycle of data processing.
  • Improved Abstractions: It employs declarative programming through software-defined assets (SDAs), which enhances abstraction and simplifies pipeline design. Users benefit from shared, reusable, and configurable components that facilitate efficient data processing. Additionally, Dagster includes declarative scheduling features that allow users to implement freshness policies, ensuring data is up-to-date.
  • Testing Capabilities: To ensure the integrity of data, Dagster incorporates quality checks using types that define the acceptable values for the inputs and outputs of your pipelines. It also supports asset versioning for both code and data, along with caching mechanisms that enhance performance.
  • Great Monitoring Features: Dagster comes equipped with a built-in observability dashboard, providing real-time monitoring of pipeline performance and health. Its built-in testability feature allows users to verify the functionality of their pipelines seamlessly.
  • Robust Integrations: Dagster offers robust integrations with various tools in your data ecosystem, including Airflow, dbt, Databricks, Snowflake, Fivetran, Great Expectations, Spark, and Pandas.
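A minimal sketch of software-defined assets is below; the asset bodies are placeholders, and the dependency is declared simply by naming the upstream asset as a function argument.

from dagster import asset, materialize

@asset
def raw_orders():
    # placeholder extract step
    return [{"id": 1, "status": "placed"}, {"id": 2, "status": "shipped"}]

@asset
def shipped_orders(raw_orders):
    # depends on raw_orders because the argument name matches the upstream asset
    return [order for order in raw_orders if order["status"] == "shipped"]

if __name__ == "__main__":
    # materialize both assets in-process, handy for a quick local test
    result = materialize([raw_orders, shipped_orders])
    print(result.success)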

Mage, 2021

It looks like Mage was created with a focus on speed and scalability, tailored specifically for containerized environments such as Kubernetes. Created in 2021, Mage was developed to meet the increasing need for real-time data processing in microservice architectures.

With Mage, we can use multiple programming languages such as R, Python and SQL combined with robust template functionality. Mage definitely introduced a couple of new things in the orchestration space.

Mage’s DBT support. Image by author.

Advantages:

  • DRY Components and DBT support: Built with a focus on modular design – for example, we can integrate different repos, packages and components with ease, and build, run and manage dbt models just as easily. Very useful for Data Mesh platforms.
  • Cost Effectiveness: Claims to optimize resource provisioning and consumption with obvious cost-effective benefits included.
  • Native Kubernetes: Easy deployments for modern data platform architectures.
  • Real-time data pipelines: Works beautifully with real-time data. Ingesting and transforming streaming data is a real pain point for many companies.
  • Built-in Integrations: Supports dozens of sources and destinations, e.g. Amazon S3, BigQuery, Redshift, PowerBI, Tableau, Salesforce, Snowflake, and more.

Mage’s user interface also offers the capability to preview the pipelines you create prior to deployment.

This feature allows users to visualize and examine the structure and functionality of their pipelines, ensuring everything is set up correctly before going live. By providing this preview option, Mage helps users identify any potential issues or adjustments that may be needed, ultimately leading to a smoother deployment process and enhancing overall workflow efficiency.

Mage’s key concepts are pretty much the same as in Prefect: projects, blocks, pipeline, data product, backfill, trigger and runs.

Interactive notebook UI allowing me to preview my code’s output is one of my favourite features.
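For a flavour of how a block looks, here is a sketch of a data loader block roughly following the template Mage generates; the decorator import path and the CSV URL are assumptions to adapt to your own project.

import io
import pandas as pd
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

@data_loader
def load_orders(*args, **kwargs) -> pd.DataFrame:
    """Load a CSV from an HTTP endpoint into a DataFrame."""
    url = 'https://example.com/orders.csv'  # placeholder source
    response = requests.get(url)
    return pd.read_csv(io.StringIO(response.text), sep=',')

@test
def test_output(output, *args) -> None:
    assert output is not None, 'The output is undefined'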

Orchestra, 2023

Created in 2023 by Hugo Lu, Orchestra is a new-generation orchestrator solution focusing on serverless architecture to unify everything in one platform.

Advantages:

  • Serverless Modular Architecture: You don’t need a Kubernetes cluster anymore.
  • Deliver fast: Build enterprise-grade orchestration in minutes.
  • Enhanced Monitoring: Don’t waste your time identifying bugs.
  • State of the Art integrations: Hundreds of out-of-the-box integrations.

Conclusion

This research highlights that an emphasis on heterogeneity, support for multiple programming languages, effective use of metadata, and the adoption of data mesh architecture are essential trends in data pipeline orchestration for building modern, robust, and scalable data platforms.

For instance, Apache Airflow provides a variety of pre-built data connectors that facilitate seamless ETL tasks across various cloud vendors, including AWS, GCP, and Azure. However, it lacks features such as intra-task checkpointing and caching, and it is not optimized for machine learning pipelines.

The support for multiple programming languages is expected to be a significant trend in the coming years. For example, Temporal accommodates various languages and runtimes, whereas Airflow primarily emphasizes Python.

Collaboration in the Data Mesh era is the key to success in the data space. Your organization may have separate teams for data management, classification models, and forecasting models, all of which can collaboratively utilize the same platform – Mage or Flyte, for example. This allows them to operate within the same workspace without interfering with one another’s work.

Although the evolution of tools is truly fascinating, the basic principles of data engineering remain the same:

Python for Data Engineers

We can witness a "Cambrian explosion" of various ETL frameworks for data extraction and transformation. It’s not a surprise that many of them are open-source and are Python-based.

Modern Data Engineering

Recommended read

[1] https://www.datacamp.com/tutorial/ml-workflow-orchestration-with-prefect

[2] https://docs.prefect.io/latest/concepts/work-pools/

[3] https://docs.prefect.io/latest/integrations/prefect-dbt/

[4] https://docs.prefect.io/latest/integrations/prefect-dbt/

[5] https://engineering.atspotify.com/2022/03/why-we-switched-our-data-orchestration-service/

[6] https://thenewstack.io/flyte-an-open-source-orchestrator-for-ml-ai-workflows/?utm_referrer=https%3A%2F%2Fwww.google.com%2F

[7] https://atlan.com/dagster-data-orchestration/

The post How Data Engineering Evolved since 2014 appeared first on Towards Data Science.

]]>
Advanced Data Modelling https://towardsdatascience.com/advanced-data-modelling-1e496578bc91/ Sun, 21 Jul 2024 19:54:50 +0000 https://towardsdatascience.com/advanced-data-modelling-1e496578bc91/ Data model layers, environments, tests and data quality explained

The post Advanced Data Modelling appeared first on Towards Data Science.

]]>
Data modelling is an essential part of Data engineering. I would say this is a must if you want to become a successful data practitioner. Building SQL transformation pipelines with multiple layers is a challenging task. During the process, it is important to keep things organised. In this story, I tried to summarise some techniques for convenient data structuring and describe the modelling techniques I use daily. It often helps me to design and develop a great data platform or a data warehouse which is accurate, easy to navigate and user-friendly.

Naming convention

Using a well-designed naming convention provides a very clear and unambiguous sense and meaning regarding the content of a given database object. It is always good to have naming policies for tables and columns in place. It simply demonstrates how mature your data warehouse is and helps a lot during the development.

Database entity names must be human-readable – at a minimum.

Maintaining the database or a DWH with this in mind improves user experience and simply makes it look more user-friendly.

The naming convention must look solid. At a minimum, it should be consistent with standard Kimball techniques, where we prefix dimension tables with dim_ and fact tables with fact_, i.e. dim_[Dimension Name] / fact_[Fact Table Name].

This is a good practice.

Semantic layer names must look consistent and logically coherent providing a solid foundation for robust data modelling.

Data lineage example for marketing attribution model. Image by author.

I’ll try to be concise here as this topic is very well covered by renowned scholars per se (Inmon, Kimball, et al.). I’ll just focus on things that I believe are crucial and impact DWH scalability significantly.

Abbreviations

  • I would recommend in general avoiding abbreviations that might sound misleading (stg_, int_) and using full English words in database object names where possible.

Most modern data warehouse (DWH) solutions comfortably support identifiers of 30 characters or more (Postgres allows up to 63 bytes), so longer descriptive names are usually an option. There is no right or wrong solution here and the optimal naming format depends on the nature of the database entity, end users and data model. For example, I often abbreviate common terms to a minimum of three letters for visibility and better discovery, i.e. dev for development, prod for production, etc.

From this point of view, stg instead of staging doesn’t make much sense.

It’s okay though, we can live with that.

  • I recommend using singular names when possible, for example – _transaction not _transactions.
  • I recommend using lower_snake_case with no quotes around database identifier – again for consistency and convenience.

I understand though that in many companies naming conventions might be inherited from legacy projects and databases. So it’s absolutely fine if we see something conflicting with the typical Kimball guide, e.g.

cst_ instead of customer_

There has to be a reason why these names were introduced. One such reason is the platform and microservice architecture in the company. Server-side development, e.g. existing implementations of event streams, connectors, SDK wrappers, etc., might rely heavily on these names, and obviously refactoring is not worth the risk in such a case.

Often we face the scenario where we have an OLTP database somewhere with a naming convention that doesn’t look like best practice either. When contemplating adjustments to the DWH tool, it is good to keep in mind that the new naming convention might not work well with the legacy one we have in the OLTP database.

It might be simply very difficult to join the DWH schema back on OLTP.

This also might have further adverse implications on ad-hoc Analytics and required data deep dives and investigations.

  • I would recommend using the consistent naming conventions for DWH objects enriching them with metadata as necessary. This would give a hint to end users and serve as a single source of truth but will also make the solution maintainable and user-friendly.

Data marts

Denormalized data is great and we can take full advantage of it using modern DWH tools. Maintenance might become a pain point when we have to work with a considerable number of different data marts. We can end up with multiple star schemas. In this case, we would want to ensure that a single data item (a dimension, a column, etc.) is used consistently and in the same way across all data marts. So it has to be designed as a standard basic dimension to be shared consistently in all data marts. It would be nice to have a matrix of conformed dimensions and facts (if not in place already). This will ensure the DWH solution is business process-oriented and predictable, and that query performance tweaks can be done easily if needed.

Primary and surrogate keys

Using the surrogate keys is important too. Typically surrogate keys don’t have any contextual or business meaning but become useful if designed right. Natural keys have a tendency to cause problems when scaling the data model. They also can deteriorate over time becoming meaningless. A good example can be a stock price ticker after an M&A or a company reorganisation (GOOG/GOOGL, etc.).
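As a quick illustration, a surrogate key can be a deterministic hash of whichever natural key parts you choose – essentially what warehouse-side helpers such as dbt_utils' surrogate key macro do. The key parts below are just examples.

import hashlib

def surrogate_key(*key_parts) -> str:
    """Build a deterministic surrogate key from one or more natural key parts."""
    normalised = "||".join(str(part).strip().lower() for part in key_parts)
    return hashlib.md5(normalised.encode("utf-8")).hexdigest()

# a stable internal id keeps the key valid even if the public ticker changes (GOOG -> GOOGL)
print(surrogate_key("company_0042", "2024-01-01"))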

DWH/database structure

Naming data sources with the src_ prefix makes sense if you have only one project or database. In this case, it would be a clear indication that this particular entity is a declared data source and no transformation is applied in the first place. I previously wrote about it here:

Building a Data Warehouse

A data location like analytics_src.* suggests the nature (data origin) of a business function (analytics). However, this might seem a little confusing as analytics is typically an entity that normally contains multitudes of ad-hoc queries, materializations and/or reporting objects, i.e. analytics per se shouldn’t have the _src suffix. int_analytics or simply analytics would suffice.

Remember the rule though:

Never use `select *` on raw data

Staging data model layer: stg_

We can often see a couple of great things like "explicit declaration of fields" and "Fields named according to naming convention". This is a really good practice.

Table name example stg_raw_customer_data clearly indicates the staging development area.

  • I would consider removing the stg_raw_ prefix from table names. Ideally, I would remove it from the table name and create a dedicated database for staging/dev and one more for automated testing – with _test.* suffix.

Data environments are very important and I tried to cover them here:

Database Data Transformation for Data Engineers

The following transformations should not be expected in this data layer:

  • no filter (no "where")
  • no joins
  • no group by
  • no calculations

It is a named implementation of a base model with only basic transformations applied where no complex logic is expected.

Intermediate models: int_ layer

What I keep seeing very often is that data analysts and BI developers use ephemeral materialisations in their model’s lineage.

Using ephemerals is recommended by dbt labs. It’s recommended at early stages as it makes simple CTEs reusable.

  • I wouldn’t recommend using ephemeral constructs as they don’t produce an output, are hard to unit test for data transformation logic and don’t support model contracts. Writing unit tests for any ephemeral CTE inside a data transformation pipeline is problematic.
  • int_ abbreviation might seem confusing for data developers as analysts often use it to name internal data sources. I wouldn’t use it as my naming convention. However, it’s a matter of taste and some analysts would differ.

According to common sense, complex transformations are not expected in this data model layer.

The models in this layer should be relatively simple. If the model includes many complex transformations, it may be more suitable for a business logic layer (biz_) or a data mart layer (mrt_).

Usually, this layer is used for temporary transformations. If the model is not materialized for the sake of query optimisation, it should be ephemeral (100% of cases).

This is ultimately a good thing as it reduces the database clutter and allows reusable logic for simple tasks.

Ephemerals are a good choice for basic data preparation, deduplication and JSON extracts early in the pipeline, but only if there is no need for unit testing (as opposed to data testing) of the actual data transformation logic.

Dimension layer: dim_

Input model layer types:

  • int
  • dim

This layer presumes that fact data also flows in it. All common data transformations are allowed here, e.g. joins, aggregations and filtering.

We can create persisted fact tables for improved query performance if needed.

I prefer naming dimension entities with the dim_ prefix. Fact tables are fact by default so no need to prefix them.

I previously wrote about the benefits of the dimensional approach here:

Data Modelling For Data Engineers

Mart and Biz layers: mrt or biz prefix (or suffix)

It’s a data model layer with materialized entities containing some complex data transformation logic. Using incremental strategies is considered best practice.

Often the mrt_ layer provides an input for biz_ and in this case, materialization or data precalculation is not necessary and a simple view is enough. However, it all would depend on the data volumes we process.

A good example of an incremental table update can be found here:

Advanced SQL techniques for beginners

Data platform and/or data warehouse architecture

For data modelling, I would recommend templating your dbt / Dataform project and using custom database names to split the data environment between production, dev and test data.

In this case, the data model naming convention can be simplified by moving the raw_ and base_ prefixes into the database names, for example:

DATABASE_NAME   SCHEMA_NAME     
-------------------------------
RAW_DEV         SERVER_DB_1     -- mocked data
RAW_DEV         SERVER_DB_2     -- mocked data
RAW_DEV         EVENTS          -- mocked data
RAW_PROD        SERVER_DB_1     -- real production data from pipelines
RAW_PROD        SERVER_DB_2     -- real production data from pipelines
RAW_PROD        EVENTS          -- real production data from pipelines
...                             
BASE_PROD       EVENTS          -- enriched data
BASE_DEV        EVENTS          -- enriched data
...                             
ANALYTICS_PROD       REPORTING  -- materialised queries and aggregates
ANALYTICS_DEV        REPORTING  
ANALYTICS_PROD       AD_HOC     -- ad-hoc queries and views

Using custom database names always helps to simplify this task. To dynamically inject custom database names we just need to create this macro:

-- ./macros/generate_database_name.sql
{% macro generate_database_name(custom_database_name=none, node=none) -%}
    {%- set default_database = target.database -%}
    {%- if custom_database_name is none -%}
        {{ default_database }}
    {%- else -%}
        {{ custom_database_name | trim }}
    {%- endif -%}
{%- endmacro %}

So now whenever we compile our models it will apply a custom database name from the model’s config, i.e.:

  • dbt run -t dev -> select * from raw_dev.schema.table
  • dbt run -t prod -> select * from raw_prod.schema.table
  • dbt run -t test -> select * from raw_test.schema.table

Personally, I try to design and introduce a base_ data model layer with minimal data manipulation at the column level. Sometimes it is worth doing to get better query performance:

-- simplified project structure:
.
└── models
    └── some_data_source
        β”œβ”€β”€ _data_source_model__docs.md
        β”œβ”€β”€ _data_source__models.yml
        β”œβ”€β”€ _sources.yml  -- raw data table declarations
        └── base -- base transformations, e.g. JSON to cols
        |   β”œβ”€β”€ base_transactions.sql
        |   └── base_orders.sql
        └── analytics -- deeply enriched data prod grade data, QA'ed
            β”œβ”€β”€ _analytics__models.yml
            β”œβ”€β”€ some_model.sql
            └── some_other_model.sql
-- extended example with various data sources and marts:
└── models
    β”œβ”€β”€ int -- only if required and 100% necessary for reusable logic
    β”‚   └── finance
    β”‚       β”œβ”€β”€ _int_finance__models.yml
    β”‚       └── int_payments_pivoted_to_orders.sql
    β”œβ”€β”€ marts -- deeply enriched, QAed data with complex transformations
    β”‚   β”œβ”€β”€ finance
    β”‚   β”‚   β”œβ”€β”€ _finance__models.yml
    β”‚   β”‚   β”œβ”€β”€ orders.sql
    β”‚   β”‚   └── payments.sql
    β”‚   └── marketing
    β”‚       β”œβ”€β”€ _marketing__models.yml
    β”‚       └── customers.sql
    └── src (or staging) -- raw data with basic transformations applied
        β”œβ”€β”€ some_data_source
        β”‚   β”œβ”€β”€ _data_source_model__docs.md
        β”‚   β”œβ”€β”€ _data_source__models.yml
        β”‚   β”œβ”€β”€ _sources.yml
        β”‚   └── base
        β”‚       β”œβ”€β”€ base_transactions.sql
        β”‚       └── base_orders.sql
        └── another_data_source
            β”œβ”€β”€ _data_source_model__docs.md
            β”œβ”€β”€ _data_source__models.yml
            β”œβ”€β”€ _sources.yml
            └── base
                β”œβ”€β”€ base_marketing.sql
                └── base_events.sql

A good practice would be to use the following techniques:

  • Use persisted materialization and clustering for biz_ layer objects if needed
  • Avoid using Google Sheets as a source
  • Avoid using dbt seeds. This allows only CSV and shouldn’t be used to populate tables. Try seeding _test database tables using custom materialization instead, e.g. a SQL which generates an output and can be referenced (also visible on the data lineage graph):
--./macros/operation.sql
{%- materialization operation, default  -%}
    {%- set identifier = model['alias'] -%}
  {%- set target_relation = api.Relation.create(
        identifier=identifier, schema=schema, database=database,
        type='table') -%}
  -- ... setup database ...
  -- ... run pre-hooks...
  -- build model
  {% call statement('main') -%}
    {{ sql }}
  {%- endcall %}

  -- ... run post-hooks ...
  -- ... clean up the database...
    -- `COMMIT` happens here
    {{ adapter.commit() }}
  -- Return the relations created in this materialization
  {{ return({'relations': [target_relation]}) }}
{%- endmaterialization -%}

Now if we add it to our model config as an operation it will simply run the SQL and we will be able to reference it using the ref() function:

{{ config(
    materialized='operation',
    alias='table_a',
    schema='schema',
    tags=["tag"]
) }}
create or replace table {{this.database}}.{{this.schema}}.{{this.name}} (
     id      number(38,0)   
    ,comments varchar(100)  
);
# dbt run --select table_a.sql
19:10:08  Concurrency: 1 threads (target='dev')
19:10:08  
19:10:08  1 of 1 START sql operation model schema.table_a ................................ [RUN]
19:10:09  1 of 1 OK created sql operation model schema.table_a ........................... [SUCCESS 1 in 1.10s]
19:10:09  
19:10:09  Finished running 1 operation model in 0 hours 0 minutes and 4.16 seconds (4.16s).
19:10:09  
19:10:09  Completed successfully
19:10:09  
19:10:09  Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1

Now if we want to have another table or a view that can reference this operation we can use the standard ref() function and our table_a would appear as a dependency in data lineage:

Image by author
  • I would recommend using incremental updates with clustering and incremental predicates, e.g.
-- Sample SQL for standard transaction table
-- will use conditional ref in incremental strategy
-- depends_on: {{ ref('transactions_view') }}
{{
    config(
        materialized='incremental',
        unique_key='id',
        on_schema_change='append_new_columns',
        incremental_strategy='merge',
        cluster_by = ['updated_at'],
        incremental_predicates = [
            "DBT_INTERNAL_SOURCE.load_ts > dateadd(day, -30, current_date)"
        ],
        schema='source_db',
        alias='transactions_',
        query_tag = 'transactions__incremental',
        tags=["incremental", "transactions", "unit_testing"]
    )
}}
select
    *
{% if is_incremental() %}
    from {{ref('transactions_view')}}
{% else %}
    from {{ref('transactions__log')}}
{% endif %}
where
    id is not null
qualify
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at::TIMESTAMP DESC ) = 1
  • Avoid patterns with select * and consider moving attribution_with_paid_ads and paid_ads_without_attribution into separate model files. Unit test them.

Testing the model logic

This is a very important part where we can run tests for our data model logic. Do you unit test your Python functions? I tend to do it in a similar way.

Consider this example below. I use the dbt_utils.equality test to compare the expected dataset and the table we get after we run our model:

version: 2

models:
  - name: my_model_i_want_to_test
    database: |
        {%- if  target.name == "dev" -%} raw_dev
        {%- elif target.name == "prod"  -%} raw_prod
        {%- elif target.name == "test"  -%} raw_test
        {%- else -%} invalid_database
        {%- endif -%}
    enabled: true
    description: 
    tests:
      - dbt_utils.equality:
          tags: ['unit_testing']
          compare_model: ref('my_model_i_want_to_test_test_expected')
          compare_columns:
            - col_1
            - col_2
            ...
    config:
      alias: my_model_i_want_to_test
      schema: marketing

Now if we run dbt build --select tag:unit_testing -t test in our command line, dbt will build our model (along with any other models tagged unit_testing, such as the transactions model above) and compare it with the expected table built in the test environment. This table is called my_model_i_want_to_test_test_expected and is referenced using our favourite ref() function.

-- my_model_i_want_to_test_test_expected.sql
{{ config(
    materialized='table',
    schema='some_schema',
    query_tag = 'tag',
    tags=["unit_testing"]
) }}
select
'{
  "data": {
    ...
  }
}'...
...
...
dbt build --select tag:unit_testing -t test
[RUN]
19:40:35  2 of 2 PASS dbt_utils_source_equality__db_my_model_i_want_to_test_test_expected_..._col1__ref_..._test_expected_  [PASS in 1.53s]
19:40:35  
19:40:35  Finished running 1 table model, 1 test in 0 hours 0 minutes and 10.97 seconds (10.97s).
19:40:35  
19:40:35  Completed successfully
19:40:35  
19:40:35  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

dbt_utils.equality will compile our test into something like this to check that the actual table generated after the run equals the expected table we defined for the test:

with a as (
    select * from raw_test.schema.my_model_i_want_to_test
),

b as (
    select * from raw_test.schema.my_model_i_want_to_test_test_expected
),

a_minus_b as (

    select filename from a

    except
    select filename from b

),

b_minus_a as (
    select filename from b

    except
    select filename from a

),

unioned as (
    select 'a_minus_b' as which_diff, a_minus_b.* from a_minus_b
    union all
    select 'b_minus_a' as which_diff, b_minus_a.* from b_minus_a

)

select * from unioned

    ) dbt_internal_test

This feature is very useful, and this way we can test any model with just one extra package (dbt_utils).

DBT goes even further: starting with version 1.8, dbt-core ships native unit tests that look even better.

Unit tests | dbt Developer Hub

Conclusion

It’s always great to have an opportunity to design our DWH solution from scratch. In this case, we are lucky not to rely on any legacy processes and OLTP databases. Often this is not the case and the world might seem a bit more complicated. It’s good to bear in mind that the new naming convention must be usable and easily applicable to any existing infrastructure resources we have.

I would recommend using consistent naming conventions for DWH objects, enriching them with metadata as necessary. At least data marts must follow the naming convention with meaningful business names. Other raw_/source_ and live_/prod_ schema object names can keep the existing naming convention, as long as they are enriched with metadata and flow clearly downstream into data mart entities on the data lineage graph. This gives a hint to end users and serves as a single source of truth, but also makes the solution maintainable and user-friendly.

If we need to use names longer than 30 characters then it would be probably a good idea to come up with abbreviation rules. That would make development easier in the future.

Building a SQL transformation pipeline with multiple models is a challenging task. It is not only about data frames and CTEs but also about the various data model layers, because they help to manage your code in a way that is easy to maintain and to split the logic between different data environments. During the process, it is important to keep things organised. I summarised some techniques for convenient structuring of our data transformation steps that result in a nice DWH design.

Data Warehouse Design Patterns

Model metadata is very important. No matter what data platform architecture we use I would always recommend using the consistent naming conventions for DWH objects enriching them with metadata as necessary. This would give a hint to end users and serve as a single source of truth but will also make the solution maintainable and user-friendly.

The final data model might look very complex and difficult to read. In some companies, this can be considered an anti-pattern in SQL development. Breaking that SQL into smaller parts makes our code reusable and easier to maintain. Modern data build tools (Dataform, DBT, etc.) offer a set of useful features to improve this process. We can power our SQL templates by injecting pieces of code using macros, variables and constants. From my experience, this feature combined with infrastructure as code helps to ensure adequate CI/CD processes, which saves time during development. It helps to organise and simplify the data environment split between dev, live and automated testing so we can focus on business logic and continuous improvement of our state-of-the-art data platform.

Recommended read:

[1] https://medium.com/towards-data-science/building-a-data-warehouse-9696b238b2da

[2] https://towardsdatascience.com/database-data-transformation-for-data-engineers-6404ed8e6000

[3] https://towardsdatascience.com/data-modelling-for-data-engineers-93d058efa302

[4] https://towardsdatascience.com/advanced-sql-techniques-for-beginners-211851a28488

[5] https://docs.getdbt.com/docs/build/unit-tests

The post Advanced Data Modelling appeared first on Towards Data Science.

]]>
Four Data Engineering Projects That Look Great on your CV https://towardsdatascience.com/four-data-engineering-projects-that-look-great-on-your-cv-069dffae95e0/ Sat, 23 Mar 2024 15:06:59 +0000 https://towardsdatascience.com/four-data-engineering-projects-that-look-great-on-your-cv-069dffae95e0/ Data pipelines that would turn you into a decorated data professional

The post Four Data Engineering Projects That Look Great on your CV appeared first on Towards Data Science.

]]>
AI-generated image using Kandinsky

In this story, I would like to speak about Data Engineering career paths and data projects that look great on any CV. If you are an aspiring data practitioner not only willing to learn new tools and techniques but also aiming to build your own data projects portfolio – this article is for you. During my more than 15 years career in data and analytics, I witnessed good and bad CVs showcasing data engineering skills. Data engineering projects you were involved in or responsible for are the ultimate compass that tells a recruiter about your experience, how good you are and why they should hire you. This story is about how to present your data engineering experience in CVs and deliver that sense of professionalism and confidence that wins buy-in.


Starting a new data engineering project is always challenging as data engineering is probably the most difficult job role in the data space. You need to be a software engineer – to know how to build data pipelines, then you need to be a data analyst – to communicate efficiently with analytics teams using SQL, and in the end, you need to be an experienced data platform architect to manage all required infrastructure resources. It’s definitely worth the risk to start learning it! Data engineering is the fastest growing occupation according to DICE research conducted in 2023 and I previously wrote about it in one of my previous articles [1].

Source: DICE

How to Become a Data Engineer

If you are learning data engineering this article might be useful for you because choosing the right set of projects helps to develop required skills faster. In this article above I wrote about the skills that look great on your CV. I think now is the time to speak about the data engineering project portfolio. It is so easy to get stuck choosing the right tools or finding the right data for your project. Let’s see what we can do to make your CV look more professional data-wise and maybe create a mini data engineering roadmap to learn a few things this year.

I noticed that beginner and even intermediate-level users often struggle in these three main areas while contemplating a new data engineering project:

  • Choosing the right dataset for your project
  • Picking the right tools to work with data
  • Data pipeline orchestration

When you are thinking of starting a new data engineering project I recommend considering it as a data platform you are building from scratch (or at least a part of it). Imagine that you’ve been just hired and onboarded. Try to think of questions you would want to ask your new employers about the data stack, and business and functional requirements, and try to envision all the potential pitfalls and challenges we might face during the data platform design. In any data platform data flow is predictable:

Typical data flow and tools. Image by author.

Ultimately there are a few main things to focus on while architecting the data platform [2]:

  • Data sources (APIs, relational and non-relational databases, integrations, event streams)
  • Data ingestion – this is complicated, more ideas can be found below.
  • Data storage – mostly for lake house architecture types.
  • Data orchestration – data pipeline triggers, flow charts and directed acyclic graphs (DAGs)
  • Data resources – provisioning and management.
  • Data warehousing – tools and techniques.
  • Business intelligence (BI) – reporting and dashboards.
  • Machine learning and MLOps

Each of these areas must be mentioned in your data engineering CV to make it look complete. It would be great to create some sort of a visual to present the final deliverable too. It can be a website or a simple dashboard, e.g. Looker Studio dashboard that we can share. It is always a great idea to supply it with comments and annotations.

Sample Looker Studio dashboard. Image by author.

Data Platform Architecture Types

Data sources

It all starts with data sources. Indeed, any data pipeline starts somewhere. Whenever we move data from point A to point B, there is a data pipeline [3]. A data pipeline has three major parts: a source, a processing step or steps, and a destination. For example, we can extract data from an external API (the source) and then load it into the data warehouse (the destination). This is the most common type of data pipeline, where the source and destination are different.

Data pipeline. Image by author

Data pipeline design patterns

A simple data connector built for a MySQL database can demonstrate that you know how to extract data – the basic ETL technique. Very often data engineers are tasked with extracting really huge amounts of data from databases (or any other data source). It is crucial to convey that you know how to do it efficiently in code, e.g. using a Python generator and yield. Consider the code snippet below; it explains how to process the data using generators:

# Create a file first: ./very_big_file.csv as:
# transaction_id,user_id,total_cost,dt
# 1,John,10.99,2023-04-15
# 2,Mary, 4.99,2023-04-12

# example.py
def etl(item):
    # Do some ETL here, e.g. mask personally identifiable information
    return item.replace("John", '****')

# Create a generator
def batch_read_file(file_object, batch_size=1024):
    """Lazy function (generator) that reads a file in chunks.
    Default chunk size: 1024 bytes."""
    while True:
        data = file_object.read(batch_size)
        if not data:
            break
        yield data

# and read in chunks
with open('very_big_file.csv') as f:
    for batch in batch_read_file(f):
        print(etl(batch))

# In the command line run:
# python example.py

So the project can look like this:

Data pipeline from MySQL to data warehouse. Image by author.

MySQL data connector for your data warehouse solution

The tutorial explains how to:

  • export data from MySQL efficiently as a stream and save it locally as a CSV file (a minimal sketch of this step is shown after this list)
  • export data from MySQL and pipe that stream to GCP's Cloud Storage or AWS S3
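Here is a minimal sketch of the first step, assuming a pymysql connection with a server-side (unbuffered) cursor; the connection details, table and column names are hypothetical and only for illustration:

# Hedged sketch: stream rows out of MySQL with an unbuffered (server-side) cursor
# and write them to a local CSV file without loading the whole table into memory.
# Connection details, table and column names are assumptions for illustration.
import csv
import pymysql
import pymysql.cursors

connection = pymysql.connect(
    host="localhost",
    user="root",
    password="your_password",
    database="your_database",
    cursorclass=pymysql.cursors.SSCursor,  # rows are fetched lazily from the server
)

cursor = connection.cursor()
cursor.execute("SELECT transaction_id, user_id, total_cost, dt FROM transactions")

with open("transactions.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["transaction_id", "user_id", "total_cost", "dt"])
    for row in cursor:  # iterate the result set row by row
        writer.writerow(row)

cursor.close()
connection.close()

The same iterator can be piped into a cloud storage upload instead of a local file, which covers the second bullet point.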

This is one of the most common data pipeline designs, and MySQL is probably the most popular relational database; it can be easily deployed locally or in the cloud. You can follow one of my previous stories to do it.

Create MySQL and Postgres instances using AWS Cloudformation

Data ingestion, streams and data warehousing

The way we load data into the data warehouse or into the landing area of our data lake is probably the most difficult part of any data engineering project, as it involves a producer. Basically, we need something to generate data – it can be a microservice or a containerised application (e.g. Docker). It can even be a simple AWS Lambda function sending event data. For example, consider the code snippet below. It mocks some fake data and forwards it into a Kinesis stream [4].

# Make sure boto3 is installed locally, i.e. pip install boto3
import json
import random
import boto3
from datetime import datetime  # needed by the get_data() helper below

kinesis_client = boto3.client('kinesis', region_name='eu-west-1')
# Constants:
STREAM_NAME = "your-data-stream-staging"

def lambda_handler(event, context):
    processed = 0
    print(STREAM_NAME)
    try:
        print('Trying to send events to Kinesis...')
        for i in range(0, 5):
            data = get_data()
            print(i, " : ", data)
            kinesis_client.put_record(
                StreamName=STREAM_NAME,
                Data=json.dumps(data),
                PartitionKey="partitionkey")
            processed += 1
    except Exception as e:
        print(e)
    message = 'Successfully processed {} events.'.format(processed)
    return {
        'statusCode': 200,
        'body': { 'lambdaResult': message }
    }

We would like to add a helper function to generate some random event data. For instance:

# Helpers:
def get_data():
    return {
        'event_time': datetime.now().isoformat(),
        'event_name': random.choice(['JOIN', 'LEAVE', 'OPEN_CHAT', 'SUBSCRIBE', 'SEND_MESSAGE']),
        'user': round(random.random() * 100)}

Here we go! This is our second data engineering project that demonstrates the following:

  • we know how to mock data using Python
  • we are familiar with serverless architecture
  • we are confident with event streams
  • we know basic ETL techniques

Building a Streaming Data Pipeline with Redshift Serverless and Kinesis

Example view with events from our application. Image by author.

There is an Infrastructure as Code part of this project too. I think this is one of the main things I would look into when hiring mid-level data engineers.
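As a hedged sketch of that part, the Kinesis stream used above could be provisioned from a small CloudFormation template deployed with boto3; the stack name, stream name and shard count below are illustrative assumptions:

# Hedged sketch: provision the Kinesis data stream with CloudFormation via boto3.
# Stack name, stream name and shard count are assumptions for illustration.
import json
import boto3

cloudformation = boto3.client('cloudformation', region_name='eu-west-1')

template = {
    "AWSTemplateFormatVersion": "2010-09-09",
    "Resources": {
        "DataStream": {
            "Type": "AWS::Kinesis::Stream",
            "Properties": {
                "Name": "your-data-stream-staging",
                "ShardCount": 1
            }
        }
    }
}

response = cloudformation.create_stack(
    StackName="your-data-stream-stack-staging",
    TemplateBody=json.dumps(template)
)
print(response["StackId"])

Keeping resources in templates like this makes the whole project reproducible, which is exactly the signal a recruiter is looking for.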

Data storage, machine learning and orchestration

Here I would demonstrate that I am familiar with different data platform architecture types, e.g. data warehouse, lake house, data lake and data mesh. I remember a couple of years ago the internet was boiling with "Hadoop is dead" type stories, and there was a noticeable shift towards the data warehouse architecture. In 2024 everyone seems to be obsessed with real-time data streaming and scalability, suggesting that Spark and Kafka will soon become the public benchmark leaders. Indeed, processing huge amounts of data in the data lake might be way more efficient using distributed computing.

All we need is to demonstrate that we are familiar with Spark (PySpark), for example, and that we have experience working with cloud storage providers. The main three are AWS, Google and Azure.
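For example, a hedged PySpark sketch that reads a tab-separated ratings file from cloud storage and computes a simple aggregation might look like this (the bucket name, file path and column names are assumptions for illustration):

# Hedged sketch: read a tab-separated ratings file from cloud storage with PySpark
# and compute a simple aggregation. Bucket, path and column names are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ratings-aggregation").getOrCreate()

ratings = (
    spark.read
    .option("header", True)
    .option("sep", "\t")
    .option("inferSchema", True)
    .csv("s3://your-data-lake-staging/ratings/ratings.tsv")
)

# Average rating and number of ratings per movie
top_movies = (
    ratings.groupBy("movieID")
    .agg(F.avg("rating").alias("avg_rating"), F.count("*").alias("num_ratings"))
    .orderBy(F.desc("num_ratings"))
)
top_movies.show(10)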

So our third project might look like this [5]:

Orchestrate Machine Learning Pipelines with AWS Step Functions

In this tutorial, we will use the public MovieLens dataset with movie ratings to build a recommendation system. So the steps would be the following:

  • preserve the movie data in cloud storage, e.g.:

# user_ratedmovies-timestamp.dat
userID movieID rating timestamp
75 3 1 1162160236000
75 32 4.5 1162160624000
75 110 4 1162161008000

  • transform the datasets into a conformed model using AWS Glue

AWS Glue main page. Image by author.

  • orchestrate the pipeline using AWS Step Functions (a minimal sketch of this step follows this list)

CreateBatchInferenceJob node. Image by author.
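As promised above, here is a hedged sketch of the orchestration step: a single-state Step Functions state machine that runs a Glue job, registered with boto3. The job name, role ARN and state machine name are assumptions; the real tutorial's state machine has more steps (e.g. the CreateBatchInferenceJob node shown above).

# Hedged sketch: register a minimal Step Functions state machine that runs a Glue job.
# Job name, role ARN and state machine name are assumptions for illustration.
import json
import boto3

sfn_client = boto3.client('stepfunctions', region_name='eu-west-1')

definition = {
    "Comment": "Run a Glue transformation job as a single step",
    "StartAt": "TransformMovieData",
    "States": {
        "TransformMovieData": {
            "Type": "Task",
            "Resource": "arn:aws:states:::glue:startJobRun.sync",
            "Parameters": {"JobName": "movielens-transform-job"},
            "End": True
        }
    }
}

response = sfn_client.create_state_machine(
    name="movielens-ml-pipeline-staging",
    definition=json.dumps(definition),
    roleArn="arn:aws:iam::123456789012:role/stepfunctions-execution-role"
)
print(response["stateMachineArn"])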

This is not only a good project to demonstrate the knowledge of data platform and data pipeline design but also to learn infrastructure as code (AWS CloudFormation) and Machine Learning (ML) techniques with AWS Personalize.

Data warehousing and machine learning

Machine learning is an important part of data engineering. MLOps and skills required to manage machine learning pipelines are essential for data engineers. So why not demonstrate that we are confident with both?

A good example of such a project might include the analysis of user behaviour data to predict users' propensity to churn. This is not a trivial task per se, and building a project like this requires a good understanding of marketing event data and the basic concepts of user retention. So if you are capable of finishing it, then you'll have it all! We can focus on dataset preparation and model training using standard SQL, which demonstrates good knowledge of SQL techniques.

Advanced SQL techniques for beginners

Indeed, retention is an important business metric that helps us understand the mechanics of user behaviour. It provides a high-level overview by answering one simple question: is our app good enough at retaining users? It is a well-known fact that it's cheaper to retain an existing user than to acquire a new one. So building a data pipeline like this might be a great way to learn these concepts:

Dataset prep and model training diagram
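As a hedged illustration of the dataset preparation part, a simple day-1 retention query over a Google Analytics 4-style events export might look like the one below; the project, dataset and table names are assumptions:

-- Hedged sketch: day-1 retention from a GA4-style events export.
-- Project, dataset and table names are assumptions for illustration.
WITH first_seen AS (
  SELECT
    user_pseudo_id,
    MIN(PARSE_DATE('%Y%m%d', event_date)) AS first_date
  FROM `your-project.analytics.events`
  GROUP BY user_pseudo_id
),
activity AS (
  SELECT DISTINCT
    user_pseudo_id,
    PARSE_DATE('%Y%m%d', event_date) AS activity_date
  FROM `your-project.analytics.events`
)
SELECT
  f.first_date,
  COUNT(DISTINCT f.user_pseudo_id) AS new_users,
  COUNT(DISTINCT IF(a.activity_date = DATE_ADD(f.first_date, INTERVAL 1 DAY),
                    a.user_pseudo_id, NULL)) AS day_1_retained_users
FROM first_seen AS f
LEFT JOIN activity AS a USING (user_pseudo_id)
GROUP BY f.first_date
ORDER BY f.first_date;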

Then we can train a simple classification model using BigQuery ML. A minimal sketch of the training step might look like the following (the table and label column names are assumptions based on the prediction query below):
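-- Hedged sketch: train a logistic regression churn classifier with BigQuery ML.
-- The training table sample_churn_model.churn and its columns are assumptions;
-- in practice you would train on a dedicated training split.
CREATE OR REPLACE MODEL sample_churn_model.churn_model
OPTIONS(
  model_type = 'logistic_reg',
  input_label_cols = ['churned']
) AS
SELECT
  * EXCEPT(user_pseudo_id)
FROM
  sample_churn_model.churn;

Once the model is trained, we can query its predictions like so: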

SELECT
  user_pseudo_id,
  churned,
  predicted_churned,
  predicted_churned_probs[OFFSET(0)].prob as probability_churned
FROM
  ML.PREDICT(MODEL sample_churn_model.churn_model,
  (SELECT * FROM sample_churn_model.churn)) #can be replaced with a proper test dataset
order by 3 desc
;

It will forecast the probability (propensity to churn) – the closer this probability is to 1, the more likely it is that this user will not return to the app, according to the model's prediction:

Predictions. Image by author.

Acting on machine learning (ML) model output to retain users has proved extremely useful and might help to gain a competitive advantage in a fast-changing market environment.

That’s why it is a great candidate to include in our CV!

Conclusion

Starting a new data engineering project is always challenging, and in this story, I tried to focus on some ideas that might look great on any CV. Often I find myself struggling with the choice of a dataset. When I get stuck, I simply start to mock the data myself – this should not be a blocker. Alternatively, we can use datasets that are publicly available for free, e.g. MovieLens and Google Analytics events. They are just great for staging purposes due to their size. Choosing the right tools for your data engineering projects depends on functional and business requirements. Here I would recommend coming up with a scenario and playing with your imagination. That's why I love tech – everything is entirely possible!

In this article, I shared four data engineering projects that cover some crucial areas of data platform design – data pipeline orchestration, data warehousing, data ingestion and machine learning. These are real projects I was involved in, and I have translated them into tutorials. I hope you find them useful.

Recommended read

[1] https://towardsdatascience.com/how-to-become-a-data-engineer-c0319cb226c2

[2] https://towardsdatascience.com/data-platform-architecture-types-f255ac6e0b7

[3] https://towardsdatascience.com/data-pipeline-design-patterns-100afa4b93e3

[4] https://towardsdatascience.com/building-a-streaming-data-pipeline-with-redshift-serverless-and-kinesis-04e09d7e85b2

[5] https://medium.com/towards-artificial-intelligence/orchestrate-machine-learning-pipelines-with-aws-step-functions-d8216a899bd5

[6] https://medium.com/towards-data-science/python-for-data-engineers-f3d5db59b6dd

The post Four Data Engineering Projects That Look Great on your CV appeared first on Towards Data Science.
