Oct 13, 2024

How Can Delta Live Tables' APPLY CHANGES API Simplify Your CDC Workflows?

Introduction

Data is constantly evolving, and businesses must keep up with these changes in real time. A common challenge faced by organizations is how to manage and capture changes in their data—this is where Change Data Capture (CDC) comes into play. CDC is a set of processes used to track and manage changes in databases. It is a critical need in modern data architectures that rely on timely and accurate information to fuel decision-making, analytics, and operations.

With Delta Live Tables (DLT), a framework in Databricks for building and managing data pipelines, the introduction of the APPLY CHANGES and APPLY CHANGES FROM SNAPSHOT APIs simplifies the CDC process significantly. These APIs offer a cleaner, more efficient way to manage changes in data, removing the complexity often associated with older methods like the MERGE INTO statement.

In this blog, we'll explore how these APIs work, their key benefits, and how to implement them in Delta Live Tables. The walkthroughs below use Python; an equivalent declarative SQL syntax (APPLY CHANGES INTO) is also available.

What is Change Data Capture (CDC)?

CDC refers to a process used to identify and track changes in a database over time. It is crucial for keeping data in sync between systems and ensuring that the latest data is always available for analytics, reporting, and decision-making. Historically, MERGE INTO statements have been used to apply these changes. However, as datasets grow, MERGE INTO becomes harder to get right: records that arrive out of order are easy to mishandle, and compensating for them requires complicated deduplication and reordering logic.
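
To make the contrast concrete, here is a minimal sketch of that traditional approach using the Delta Lake Python API. The table names (silver.users, cdc_data.users) and columns (userId, operation, sequenceNum) are hypothetical, chosen to match the examples later in this post; the point is how much of the logic you have to own yourself.

# A hand-rolled MERGE: you own deduplication, ordering, and delete handling.
from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "silver.users")

# Out-of-order and duplicate events must be reduced to one row per key by hand,
# e.g. by keeping only the row with the highest sequenceNum per userId, before merging.
latest_changes = spark.read.table("cdc_data.users")

(target.alias("t")
  .merge(latest_changes.alias("s"), "t.userId = s.userId")
  .whenMatchedDelete(condition="s.operation = 'DELETE'")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute())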

The APPLY CHANGES and APPLY CHANGES FROM SNAPSHOT APIs in Databricks remove these complications and provide a more streamlined approach to CDC.

The Two APIs: APPLY CHANGES and APPLY CHANGES FROM SNAPSHOT

1. APPLY CHANGES API

The APPLY CHANGES API is designed for handling CDC from a Change Data Feed (CDF). CDFs are continuous streams of changes that happen in your database. This API allows you to apply changes directly to your target tables, using Slowly Changing Dimension (SCD) Type 1 or Type 2 methods to manage how changes are applied.

  • SCD Type 1: Updates existing records directly without keeping a history of past values.

  • SCD Type 2: Keeps a history of changes by adding new records with appropriate timestamps (or sequencing information), so you can track changes over time.

2. APPLY CHANGES FROM SNAPSHOT API (Public Preview)

The APPLY CHANGES FROM SNAPSHOT API, currently in public preview, deals with data snapshots instead of a continuous change feed. This method is ideal for systems where you take periodic snapshots of the data. Snapshots are essentially full versions of your database at specific points in time. The API allows for processing changes between these snapshots efficiently.

Benefits of the APPLY CHANGES APIs

  1. Handles Out-of-Order Data: One of the biggest challenges with traditional methods like MERGE INTO is dealing with records arriving out of sequence. The APPLY CHANGES API takes care of this automatically, ensuring your data stays consistent without having to write custom code.

  2. Simpler Syntax: By abstracting the complexity, these APIs offer a more intuitive and declarative way to handle CDC in both SQL and Python.

  3. Support for SCD Type 1 and 2: Whether you need to overwrite data with the latest changes (SCD Type 1) or keep a history of changes (SCD Type 2), these APIs have you covered.

  4. Automatic Sequencing: You can specify a column in the source data to be used as the sequencing value (a monotonically increasing value such as a timestamp or log sequence number). This ensures that changes are applied in the correct order, even if records arrive late or out of sequence.

  5. Support for Deletes and Truncates: These APIs can handle delete and truncate operations seamlessly, which is often tricky in traditional data pipelines.

How to Implement the APPLY CHANGES API with Delta Live Tables

Let’s look at how you can use the APPLY CHANGES API in practice. We will walk through an example using Python in Delta Live Tables (DLT).

Example: Handling SCD Type 1 with Python

SCD Type 1 is the simplest way to apply changes: existing records are overwritten with the latest values, and no history of previous versions is kept. Here's how you can apply it:

import dlt
from pyspark.sql.functions import col, expr

# Define the source table that reads from a change data feed (CDF)
@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

# Create a target table that will store the changes
dlt.create_streaming_table("target")

# Apply the changes using the APPLY CHANGES API
dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],   # Define the key column(s)
  sequence_by = col("sequenceNum"),  # Sequence column for ordering changes
  apply_as_deletes = expr("operation = 'DELETE'"),  # Apply delete operations
  apply_as_truncates = expr("operation = 'TRUNCATE'"),  # Apply truncate operations
  except_column_list = ["operation", "sequenceNum"],  # Columns to ignore
  stored_as_scd_type = 1  # Use SCD Type 1
)

In this example:

  • Source: The data is read from a stream, representing the changes.

  • Target: The final table that will store the updated records.

  • Keys: The column(s) that uniquely identify records (in this case, userId).

  • Sequence_by: We order changes by the sequenceNum column to ensure changes are applied in the correct order.

  • SCD Type 1: Updates are applied directly without keeping a history.

After running the above code, the target table will have the latest data based on the changes provided.
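
To make this concrete, here is a small, hypothetical batch of change records you might append to the cdc_data.users table that feeds the pipeline above. The values are made up for illustration; note that the rows are listed out of sequence order on purpose, which APPLY CHANGES resolves using sequenceNum.

from pyspark.sql import Row

# Hypothetical change records; deliberately not listed in sequence order.
sample_changes = [
  Row(userId=2, name=None,    city=None,     operation="DELETE", sequenceNum=4),
  Row(userId=1, name="Alice", city="Oslo",   operation="INSERT", sequenceNum=1),
  Row(userId=1, name="Alice", city="Bergen", operation="UPDATE", sequenceNum=2),
  Row(userId=2, name="Bob",   city="Paris",  operation="INSERT", sequenceNum=3),
]

# With SCD Type 1, once all four changes are processed the target holds a single
# row for userId 1 (city = "Bergen") and no row for userId 2 (inserted, then deleted).
spark.createDataFrame(sample_changes).write.mode("append").saveAsTable("cdc_data.users")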

SCD Type 2 Example with Python

In contrast to SCD Type 1, SCD Type 2 keeps a history of changes, allowing you to track data over time. Here’s how to handle SCD Type 2 with Python:

import dlt
from pyspark.sql.functions import col, expr

# Define the source table that reads from a change data feed (CDF)
@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

# Create a target table that will store the changes
dlt.create_streaming_table("target")

# Apply the changes using the APPLY CHANGES API with SCD Type 2
dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],   # Define the key column(s)
  sequence_by = col("sequenceNum"),  # Sequence column for ordering changes
  apply_as_deletes = expr("operation = 'DELETE'"),  # Apply delete operations
  except_column_list = ["operation", "sequenceNum"],  # Columns to ignore
  stored_as_scd_type = 2  # Use SCD Type 2
)

In this case, we are using SCD Type 2, which adds a new record each time a key changes instead of overwriting the old one, stamping every version with start and end values derived from the sequencing column so the full history is preserved.
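
Once the pipeline has run, that history can be queried from the published target table. A minimal sketch, assuming the default SCD Type 2 layout in which DLT adds __START_AT and __END_AT columns derived from the sequencing column (the table name and userId value here are hypothetical):

# Current version of every key: the open-ended record has __END_AT = NULL.
current_users = spark.read.table("target").where("__END_AT IS NULL")

# Full change history for one key, ordered by when each version became active.
history = (
  spark.read.table("target")
  .where("userId = 1")
  .orderBy("__START_AT")
)
history.show()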

APPLY CHANGES FROM SNAPSHOT API

The APPLY CHANGES FROM SNAPSHOT API is ideal when your data comes in snapshots (periodic captures of a table’s state). This API compares the latest snapshot with the previous one to identify changes.

Example: Processing Data from Snapshots

import dlt

# Define a function that reads from a table containing snapshots
@dlt.view
def source():
    return spark.read.table("mycatalog.myschema.mytable")

# Create a target table for storing changes
dlt.create_streaming_table("target")

# Apply the changes from snapshots using the API
dlt.apply_changes_from_snapshot(
  target="target",
  source="source",
  keys=["Key"],  # Define key column(s)
  stored_as_scd_type=2  # Use SCD Type 2 for history tracking
)

This API simplifies the process of identifying and applying changes between snapshots, making it a good fit for source systems that don't emit a change feed but are instead refreshed or exported periodically.
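
The example above reprocesses whatever the source view currently returns. Per the public preview documentation, the source argument can instead be a Python function that returns the next snapshot to process together with its version, which is useful when historical snapshots are stored side by side. A minimal sketch, where the storage path, file format, and versioning scheme are assumptions for illustration:

import dlt

def next_snapshot_and_version(latest_snapshot_version):
  # Called by DLT with the last processed version (None on the first run).
  version = 1 if latest_snapshot_version is None else latest_snapshot_version + 1
  path = f"/Volumes/mycatalog/myschema/snapshots/v{version}"  # hypothetical layout
  try:
    return (spark.read.format("parquet").load(path), version)
  except Exception:
    return None  # no newer snapshot available yet

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
  target="target",
  source=next_snapshot_and_version,
  keys=["Key"],
  stored_as_scd_type=2
)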

Conclusion

The APPLY CHANGES and APPLY CHANGES FROM SNAPSHOT APIs are game-changers for managing Change Data Capture (CDC) in Delta Live Tables. These tools streamline the process, automatically handle out-of-order data, and provide a simple yet powerful interface for managing updates in your data pipelines. Whether you’re dealing with continuous change feeds or periodic snapshots, these APIs provide the flexibility and control needed to keep your data pipelines accurate, efficient, and up to date.

By using these APIs, organizations can ensure that their data pipelines are not only easier to manage but also capable of delivering timely and accurate insights across various use cases.


Make your data engineering process efficient and cost effective. Feel free to reach out for a data infrastructure audit.

How WTD Can help

- Data experts for implementing projects

- On-demand data team for support
