Oct 27, 2024

Incremental Data Processing with Delta Live Tables Flows

Introduction

As businesses grow and scale, managing data effectively becomes a crucial task, especially when dealing with real-time streaming data from multiple sources. Incrementally processing this data allows organizations to handle large datasets more efficiently, ensuring that only new or updated information is processed instead of repeatedly analyzing entire datasets. This is where Delta Live Tables (DLT) comes in, providing a framework within Databricks for managing and processing data pipelines incrementally.

One of the key features of DLT is the use of flows, which allow incremental updates to target streaming tables. In this blog, we will dive into the concept of flows in Delta Live Tables, discuss how they work, and provide practical examples of how to set them up using Python and SQL.

What is a Flow in Delta Live Tables?

A flow in Delta Live Tables refers to a streaming query that processes source data incrementally to update a target streaming table. Flows help ensure that only the new data or changes are processed in real time, reducing the workload on your data pipelines.

Flows can be defined in two ways in DLT:

  1. Implicit Flows: These are automatically created when you define a query that updates a streaming table.

  2. Explicit Flows: These are defined manually to handle more complex operations, such as appending data from multiple streaming sources to a single target table.

In most cases, you do not need to explicitly define a flow. When you create a streaming table in Delta Live Tables, the flow is defined as part of the query.

Implicit Flow Example

Let’s look at a basic example where we create a streaming table without needing to explicitly define the flow.

CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files("/path/to/source/data");

In the SQL statement above, the streaming query reads newly arriving files from the source path with read_files and updates the raw_data table incrementally. This is an example of an implicit flow: the flow is defined automatically as part of the query.
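
The same implicit flow can be expressed in Python. Below is a minimal sketch; the Auto Loader source format and the /path/to/source/data directory are assumptions for illustration:

import dlt

# The @dlt.table decorator on a streaming read creates a streaming table;
# the flow that feeds it is defined implicitly by the query itself.
@dlt.table(name="raw_data")
def raw_data():
    return (
        spark.readStream
            .format("cloudFiles")                  # Auto Loader
            .option("cloudFiles.format", "json")   # assumed source format
            .load("/path/to/source/data")          # assumed source path
    )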

Explicit Flow Definition for Complex Operations

While implicit flows handle most cases, there are scenarios where you might want to define a flow explicitly. For example, when you need to append data from multiple sources to a single streaming table, or when you need to backfill historical data into a table.

The append flow functionality lets you read from multiple streaming sources and append their data to a single target table. This is especially useful when combining data from multiple regions or Kafka topics, or when backfilling missing data.
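
If you prefer SQL, newer DLT releases also expose append flows through CREATE FLOW ... INSERT INTO ... BY NAME. The sketch below assumes that syntax is available in your workspace; the table and directory names are placeholders:

-- Target streaming table shared by every flow
CREATE OR REFRESH STREAMING TABLE central_target;

-- Each flow appends one source into the shared target, matching columns by name
CREATE FLOW source_a_flow
AS INSERT INTO central_target BY NAME
SELECT * FROM STREAM read_files("/path/to/source_a");

CREATE FLOW source_b_flow
AS INSERT INTO central_target BY NAME
SELECT * FROM STREAM read_files("/path/to/source_b");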

Use Cases for Append Flows

1. Appending Data from Multiple Streaming Sources

Let’s say you have a global data pipeline with multiple regions, each streaming data into a central table. Using append flows, you can add new regions to the pipeline without requiring a full refresh.

2. Backfilling Historical Data

When you have an existing streaming table and need to append historical data (which is not streaming), append flows allow you to backfill that data without interrupting the existing pipeline.

3. Incremental Data Union

Rather than using a traditional UNION query to combine data from multiple sources, append flows let you append data incrementally, avoiding the need to refresh the entire table with each new data source.

Creating a Flow in Python

Example 1: Writing to a Streaming Table from Multiple Kafka Topics

In this example, we are going to create a streaming table called kafka_target and append data to it from two different Kafka topics.

import dlt

# Create the target streaming table
dlt.create_streaming_table("kafka_target")

# Append data from the first Kafka topic
@dlt.append_flow(target="kafka_target")
def topic1():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "host1:port1")
            .option("subscribe", "topic1")
            .load()
    )

# Append data from the second Kafka topic
@dlt.append_flow(target="kafka_target")
def topic2():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "host2:port2")
            .option("subscribe", "topic2")
            .load()
    )

In this case, data from both topic1 and topic2 Kafka streams is incrementally appended to the kafka_target streaming table. The @dlt.append_flow decorator is used to ensure that both streams are processed incrementally without needing to merge the data manually.
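
Note that the raw Kafka source exposes binary key and value columns plus topic, partition, offset, and timestamp metadata, so in practice you will usually decode the payload before appending. The sketch below is a variant of the topic1 flow (a replacement, not an additional flow, so the target keeps a single schema); the JSON payload schema is an assumption for illustration:

import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Hypothetical payload schema -- adjust to the actual message format
payload_schema = StructType([
    StructField("event_id", StringType()),
    StructField("event_type", StringType()),
])

@dlt.append_flow(target="kafka_target")
def topic1():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "host1:port1")
            .option("subscribe", "topic1")
            .load()
            # Decode the binary Kafka value and expand the JSON payload into columns
            .select(from_json(col("value").cast("string"), payload_schema).alias("payload"))
            .select("payload.*")
    )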

Example 2: Backfilling Historical Data

Sometimes you may have an existing streaming table but need to add historical data that wasn’t included in the original stream. Here’s an example of how to backfill historical data into a streaming table.

import dlt

# Define the target streaming table from the primary streaming source
@dlt.table()
def csv_target():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("path/to/sourceDir")
    )

# Backfill the historical data into the streaming table
@dlt.append_flow(target="csv_target")
def backfill():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("path/to/backfill/data/dir")
    )

The backfill function reads historical data from a specific directory and appends it to the csv_target table. This is a one-time operation: once the historical data has been loaded, the flow definition can be removed to prevent reprocessing in future runs, and the records it already appended remain in the table.

Example 3: Append Flow Instead of UNION

In this scenario, instead of using a UNION query to combine data from multiple sources, we can use append flows to process the data incrementally.

Here’s an example using a traditional UNION query:

@dlt.table(name="raw_orders")
def unioned_raw_orders():
    raw_orders_us = (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/us")
    )

    raw_orders_eu = (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/eu")
    )

    return raw_orders_us.union(raw_orders_eu)

While this approach works, adding or removing a source later means rewriting the union and fully refreshing the table. Instead, we can attach each source as its own append flow:

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_orders_us():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/us")
    )

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/eu")
    )

In this case, new records from the US and EU order streams are appended to the raw_orders table incrementally, making it more efficient than the UNION query approach. Additional regions can be added using more @dlt.append_flow decorators without needing a full table refresh.
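
Downstream tables can consume the combined raw_orders stream like any other streaming table. A minimal sketch, assuming Auto Loader's default _rescued_data column is used to drop malformed rows (the cleaning rule is illustrative):

import dlt
from pyspark.sql.functions import col

# A downstream table reading the combined stream produced by all append flows
@dlt.table(name="orders_cleaned")
def orders_cleaned():
    return (
        dlt.read_stream("raw_orders")
            .where(col("_rescued_data").isNull())  # assumed cleaning rule
    )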

Managing Data Quality and Checkpoints

While defining flows, it's essential to ensure data quality through expectations. However, you cannot define data quality constraints inside an @dlt.append_flow definition. Instead, you set expectations on the target table when it is created.
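
For example, dlt.create_streaming_table() accepts expectation arguments, so a rule declared once on the target applies to records arriving from every flow. A sketch, assuming a hypothetical order_id column on the raw_orders table:

import dlt

# Expectations live on the target table, not on the individual flows;
# every append flow writing into raw_orders is checked against them.
dlt.create_streaming_table(
    name="raw_orders",
    expect_all_or_drop={"valid_order_id": "order_id IS NOT NULL"},  # assumed column
)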

Additionally, each flow has a flow name, which is used to manage streaming checkpoints. Renaming a flow will reset the checkpoint, effectively creating a new flow. It’s important to use consistent flow names to ensure that checkpoints are maintained properly.
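
The @dlt.append_flow decorator accepts an optional name argument, which lets you pin the flow name independently of the Python function. A sketch reusing the US orders flow from above; the explicit name is the only change:

@dlt.append_flow(target="raw_orders", name="raw_orders_us")
def append_us_orders():
    # The checkpoint is keyed on the flow name "raw_orders_us",
    # so renaming this function later will not restart the stream from scratch.
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load("/path/to/orders/us")
    )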

Conclusion

Delta Live Tables provide a powerful and efficient way to manage incremental data processing, especially when handling real-time streaming data. With flows and append flows, DLT makes it easy to define and manage streaming pipelines, allowing data engineers to focus on business logic without worrying about complex merging and refreshing tasks.

Whether you're working with multiple Kafka topics, backfilling historical data, or appending data from various regions, append flows simplify your data pipelines, ensuring that only the necessary data is processed incrementally. By using the examples in this blog, you can start building scalable, efficient, and robust data pipelines with Delta Live Tables today.

Make your data engineering process efficient and cost effective. Feel free to reach out for a data infrastructure audit.

How WTD Can help

- Data experts for implementing projects

- On-demand data team for support
