Sep 29, 2024

Comprehensive Guide to the Databricks Python SDK

Manage Workspace Permissions

Managing workspace permissions in Databricks is essential to control who has access to specific resources. The Workspace Assignment API allows you to manage these permissions for users, groups, or service principals.

The WorkspaceAssignmentAPI provides several key methods for handling permissions (a combined end-to-end sketch follows this list):

  1. Delete Workspace Assignment:

    • Removes the permissions for a specific user or group from a workspace.

    • Method: delete(workspace_id: int, principal_id: int)

      a.workspace_assignment.delete(workspace_id=workspace_id, principal_id=principal_id)

  2. Get Workspace Permissions:

    • Fetches a list of permissions for a specific workspace.

    • Method: get(workspace_id: int)

      permissions = a.workspace_assignment.get(workspace_id=workspace_id)
      print(permissions)

  3. List Permission Assignments:

    • Retrieves the permission assignments for all principals in a specific workspace.

    • Method: list(workspace_id: int)

      all_permissions = a.workspace_assignment.list(workspace_id=workspace_id)
      for perm in all_permissions:
          print(perm)

  4. Update/Create Workspace Permissions:

    • Adds or updates the workspace permissions for a user, group, or service principal.

    • Method: update(workspace_id: int, principal_id: int, permissions: List[WorkspacePermission])

      from databricks.sdk.service import iam

      a.workspace_assignment.update(
          workspace_id=workspace_id,
          principal_id=principal_id,
          permissions=[iam.WorkspacePermission.USER]
      )
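
The snippets above assume an AccountClient named a plus existing workspace_id and principal_id values. Here is a minimal end-to-end sketch that ties the methods together; the numeric IDs are placeholders:

from databricks.sdk import AccountClient
from databricks.sdk.service import iam

# Account-level client; credentials come from environment variables or ~/.databrickscfg
a = AccountClient()

workspace_id = 1234567890  # placeholder workspace ID
principal_id = 987654321   # placeholder user, group, or service principal ID

# Grant USER-level access to the workspace
a.workspace_assignment.update(
    workspace_id=workspace_id,
    principal_id=principal_id,
    permissions=[iam.WorkspacePermission.USER],
)

# Inspect the current assignments
for assignment in a.workspace_assignment.list(workspace_id=workspace_id):
    print(assignment)

# Remove the assignment again
a.workspace_assignment.delete(workspace_id=workspace_id, principal_id=principal_id)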

How to List and Manage Jobs with the Databricks Python SDK

  • Databricks' Python SDK enables us to programmatically list, manage, and automate jobs in our Databricks workspace.

  • With this, we can streamline workflows, automate monitoring, and manage jobs at scale without relying on the UI.

How to implement it:

  1. Install the Databricks SDK and set up authentication using a Databricks access token or OAuth.

  2. We can list jobs, including details such as name, job ID, and settings.

  3. We can programmatically trigger or delete a specific job.

  4. We can apply filters to list only specific jobs, such as jobs created by a certain user or jobs in a particular state (running, failed, etc.); a short filtering sketch follows this list.
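
A minimal sketch of step 4, filtering client-side on the fields returned by jobs.list(); the creator email and name prefix are placeholders:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Keep only jobs created by a specific user whose name matches a prefix (both placeholders)
etl_jobs = [
    j for j in w.jobs.list()
    if j.creator_user_name == "data.engineer@example.com"
    and j.settings.name.startswith("Daily_")
]

for job in etl_jobs:
    print(f"{job.settings.name} ({job.job_id})")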

Real-Life Example: Imagine we’re managing multiple ETL jobs for a daily data processing pipeline. Instead of manually checking each job in the UI, we can use Python to list and manage them all in a centralized script. This makes it easier to track job statuses and troubleshoot errors quickly.

  • Manage Jobs with the Databricks Python SDK


from databricks.sdk import WorkspaceClient

# Initialize Databricks workspace client
w = WorkspaceClient()

# Listing all jobs in the workspace
jobs = w.jobs.list()

# Display job details along with the most recent run state, if any
for job in jobs:
    runs = list(w.jobs.list_runs(job_id=job.job_id, limit=1))
    status = runs[0].state.life_cycle_state.value if runs and runs[0].state else "NO_RUNS"
    print(f"Job Name: {job.settings.name}, Job ID: {job.job_id}, Status: {status}")

# Trigger a specific job by its ID
triggered_job = w.jobs.run_now(job_id=12345)
print(f"Triggered Job: {triggered_job.run_id}")

# Delete a job by its ID
w.jobs.delete(job_id=12345)
print("Job deleted successfully.")

Output

Job Name: Daily_ETL_Pipeline, Job ID: 12345, Status: RUNNING
Job Name: Data_Cleanup, Job ID: 67890, Status: TERMINATED
Triggered Job: 112233
Job deleted successfully

Key Benefits:

Automate job management, save time on tracking jobs, and easily handle large data pipelines without extra effort.

How to Manage Groups and User Access Using the Databricks Python SDK

Managing groups and user access in Databricks is crucial for ensuring secure and organized access control.

With the Databricks SDK, you can easily retrieve the list of groups and their members in your Databricks account.

Here's a step-by-step guide:

  1. Set up the SDK and authenticate it with Databricks.

  2. Use the list method from the AccountGroupsAPI to get all the groups associated with your Databricks account.

  3. Once you have a group ID, use the get method to fetch the group's details and the members associated with it.

Real-Life Example:

Imagine you're managing a large team with different roles in your organization. You need to retrieve group details to assign appropriate permissions. For instance:

  • Data Scientists should have access to datasets but not production jobs.

  • Data Engineers need broader access to manage the data pipeline.

By running this SDK script, you can list all groups, verify which users belong to each, and confirm their access aligns with the organization's policies.

Manage Groups and User Access Using the Databricks Python SDK

from databricks.sdk import AccountClient

# Initialize the Databricks client
client = AccountClient()

# Retrieve and print group details
groups = client.groups.list()
for group in groups:
    print(f"Group Name: {group.display_name}, Group ID: {group.id}")

    # Get members of each group
    group_info = client.groups.get(id=group.id)
    print(f"Members of {group_info.display_name}:")
    for member in (group_info.members or []):
        print(f"  - Member ID: {member.value}")

Output:

Group Name: Data Engineers, Group ID: 12345
Members of Data Engineers:
  - Member ID: user1@example.com
  - Member ID: user2@example.com

Group Name: Data Scientists, Group ID: 67890
Members of Data Scientists:
  - Member ID: user3@example.com
  - Member ID: user4@example.com

Explanation: This method helps you easily manage user roles and groups, automate tasks, and improve security by making sure each group has the right members with the proper permissions.
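
Listing is read-only; to actually change membership you can use the SCIM patch operation on the same API. A minimal sketch, assuming the group and user IDs below are placeholders:

from databricks.sdk import AccountClient
from databricks.sdk.service import iam

client = AccountClient()

group_id = "12345"  # placeholder group ID
user_id = "67890"   # placeholder user ID

# Add a user to the group via a SCIM patch operation
client.groups.patch(
    id=group_id,
    operations=[iam.Patch(op=iam.PatchOp.ADD, value={"members": [{"value": user_id}]})],
    schemas=[iam.PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP],
)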

Here’s how you can use the AccountServicePrincipalsAPI to manage service principals in Databricks. Service principals are useful when running production jobs, CI/CD pipelines, or automated scripts in Databricks.

Managing Databricks Service Principals using SDK

What: Service Principals in Databricks are identities used by automated tools, scripts, apps, or CI/CD platforms to interact with Databricks. By using service principals, you can restrict interactive users' access to production systems, lowering the risk of unintended data modification.

How:

The AccountServicePrincipalsAPI provides methods to create, update, list, and delete service principals in your Databricks account.

Key Methods:

  1. Create Service Principal:

    • Create a new service principal with optional parameters like active status and display name.

    • Method: create

    import time
    from databricks.sdk import AccountClient

    a = AccountClient()

    # Create a new service principal with active status and a unique display name
    sp_create = a.service_principals.create(active=True, display_name=f'sdk-{time.time_ns()}')
    print(f"Service Principal Created: {sp_create.display_name}, ID: {sp_create.id}")


  2. Get Service Principal:

    • Fetch details of an existing service principal using its unique ID.

    • Method: get

    # Fetch service principal details
    sp = a.service_principals.get(id=sp_create.id)
    print(f"Service Principal Details: {sp.display_name}, Active: {sp.active}")


  3. Update Service Principal:

    • Update an existing service principal, such as changing its active status or display name.

    • Method: update

    # Update the service principal, changing its active status to False
    a.service_principals.update(id=sp_create.id, active=False, display_name=sp_create.display_name)
    updated_sp = a.service_principals.get(id=sp_create.id)
    print(f"Updated Service Principal: {updated_sp.display_name}, Active: {updated_sp.active}")


  4. List Service Principals:

    • List all service principals in your Databricks account with filters and sorting options.

    • Method: list

    # List all service principals with a filter for display name
    sp_list = a.service_principals.list(filter=f"displayName eq '{sp_create.display_name}'")
    for sp in sp_list:
        print(f"Found Service Principal: {sp.display_name}, ID: {sp.id}")


  5. Delete Service Principal:

    • Delete a service principal using its ID.

    • Method: delete

    # Delete the service principal
    a.service_principals.delete(id=sp_create.id)
    print(f"Service Principal {sp_create.display_name} deleted.")

Real-Life Example:

You are tasked with managing CI/CD pipelines for Databricks in a large organization. For security reasons, all jobs should run using service principals instead of individual user accounts. This way, users won’t accidentally modify production data.

With this SDK, you can automate the creation of service principals for each project or tool, allowing specific permissions for jobs to run securely in the production environment.

Code Example:

Here’s how to create, update, list, and delete a service principal using the Databricks SDK:

import time
from databricks.sdk import AccountClient

# Initialize the Databricks client
a = AccountClient()

# 1. Create a new service principal
sp_create = a.service_principals.create(active=True, display_name=f'sdk-{time.time_ns()}')
print(f"Created Service Principal: {sp_create.display_name}, ID: {sp_create.id}")

# 2. Get details of the created service principal
sp = a.service_principals.get(id=sp_create.id)
print(f"Service Principal Details: {sp.display_name}, Active: {sp.active}")

# 3. Update the service principal to deactivate it
a.service_principals.update(id=sp_create.id, active=False)
updated_sp = a.service_principals.get(id=sp_create.id)
print(f"Updated Service Principal: {updated_sp.display_name}, Active: {updated_sp.active}")

# 4. List all service principals, filtered by display name
sp_list = a.service_principals.list(filter=f"displayName eq '{sp_create.display_name}'")
for sp in sp_list:
    print(f"Listed Service Principal: {sp.display_name}, ID: {sp.id}")

# 5. Delete the service principal
a.service_principals.delete(id=sp_create.id)
print(f"Deleted Service Principal: {sp_create.display_name}")


Output:

Created Service Principal: sdk-1695640136898, ID: 12345
Service Principal Details: sdk-1695640136898, Active: True
Updated Service Principal: sdk-1695640136898, Active: False
Listed Service Principal: sdk-1695640136898, ID: 12345
Deleted Service Principal: sdk-1695640136898

Explanation:

  • Security: By using service principals instead of user accounts for production tasks, you can minimize risks and control access to sensitive data.

  • Automation: Automating service principal creation ensures that your production jobs and scripts run with the appropriate security context.

  • Scalability: You can scale up service principal management across multiple workspaces and jobs efficiently.

Using Databricks SDK with service principals simplifies account management and enforces better security for production environments.
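
Tying this back to the workspace permissions section above, here is a hedged sketch of granting a newly created service principal USER access to a workspace; the workspace ID and display name are placeholders:

from databricks.sdk import AccountClient
from databricks.sdk.service import iam

a = AccountClient()

# Create a service principal for a CI/CD pipeline (display name is illustrative)
sp = a.service_principals.create(active=True, display_name="ci-cd-runner")

# Grant it USER access to a workspace so jobs can run under its identity
workspace_id = 1234567890  # placeholder workspace ID
a.workspace_assignment.update(
    workspace_id=workspace_id,
    principal_id=int(sp.id),
    permissions=[iam.WorkspacePermission.USER],
)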

How to Manage Files in a Unity Catalog Volume Using the Databricks Python SDK

  • Unity Catalog Volumes allow you to manage data files within the Databricks Lakehouse.

  • Using Databricks Python SDK, you can interact with these files, organize them, and automate management tasks.

How to implement it:

  1. Install the Databricks SDK and set your DATABRICKS_HOST and DATABRICKS_TOKEN environment variables for authentication.

  2. Access Unity Catalog Volumes:

    • Use the Python SDK to access and manage files stored in Unity Catalog volumes.

    • Unity Catalog Volumes are treated like object stores, so file handling becomes easier within Databricks.

  3. Retrieve a list of all files stored in a Unity Catalog volume using files.list_directory_contents().

  4. Automate the uploading of files into a specific volume using files.upload().

  5. Use files.delete() to remove unnecessary files and manage storage efficiently.

  6. Read data directly from the volume path into your notebook or program for processing with Spark (see the sketch after this list).
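
A minimal sketch of step 6, assuming you are running inside a Databricks notebook or job where a spark session is already available; the catalog, schema, and volume names are placeholders:

# Inside a Databricks notebook or job, `spark` is already defined
volume_path = "/Volumes/my_catalog/my_schema/my_volume"

# Read a CSV file stored in the volume into a Spark DataFrame
df = spark.read.csv(f"{volume_path}/dataset1.csv", header=True, inferSchema=True)
df.show(5)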

Real-Life Example: Suppose your team is working with datasets stored in a Unity Catalog volume for a large analytics project. You need to regularly add new datasets, clean up outdated files, and ensure that your environment is well-organized.

Instead of manually uploading and deleting files via the UI, you can automate this entire process using the Databricks Python SDK. This lets you maintain an organized file structure and focus on analysis rather than file management.


Manage Files in a Unity Catalog Volume Using the Databricks SDK

from databricks.sdk import WorkspaceClient

# Initialize the workspace client
w = WorkspaceClient()

# Path to the Unity Catalog volume: /Volumes/<catalog>/<schema>/<volume>
volume_path = '/Volumes/my_catalog/my_schema/my_volume'

# List all files in the volume and print their paths
for file in w.files.list_directory_contents(volume_path):
    print(f"File Name: {file.path}")

# Upload a new file to the volume
with open('/path/to/file.csv', 'rb') as f:
    w.files.upload(f"{volume_path}/file.csv", f, overwrite=True)
print("File uploaded successfully: file.csv")

# Delete an unnecessary file from the volume
w.files.delete(f"{volume_path}/old_file.csv")
print("Old file deleted.")

Output:

File Name: /Volumes/my_catalog/my_schema/my_volume/dataset1.csv
File Name: /Volumes/my_catalog/my_schema/my_volume/dataset2.csv
File uploaded successfully: file.csv
Old file deleted.

Key Benefits:

Automate file management, organize datasets, and scale easily in Unity Catalog without using the UI.

Here’s how you can use the QueriesAPI to create, update, list, and delete queries in Databricks using the Python SDK.

Managing Databricks SQL Queries using SDK

What: The Queries API allows you to manage SQL queries in Databricks. You can create, update, delete, and list queries. Queries are SQL objects that target a SQL warehouse and can be scheduled using the Jobs API for automated execution.

How:

The QueriesAPI provides methods to perform various operations on queries, such as creating new queries, listing existing ones, or updating/deleting them.

Key Methods:

  1. Create a Query:

    • You can create a new query by specifying the SQL text, warehouse ID, and other optional details like description.

    • Method: create

      import time
      from databricks.sdk import WorkspaceClient
      from databricks.sdk.service import sql
      
      # Initialize the Workspace client
      w = WorkspaceClient()
      
      # Get the available data sources (SQL warehouses)
      srcs = w.data_sources.list()
      
      # Create a new query
      query = w.queries.create(query=sql.CreateQueryRequestQuery(
          display_name=f'sdk-{time.time_ns()}',
          warehouse_id=srcs[0].warehouse_id,
          description="test query from SDK",
          query_text="SHOW TABLES"
      ))
      
      print(f"Created Query: {query.display_name}, ID: {query.id}")
      
      


  2. Get a Query:

    • Fetch details of a specific query using its ID.

    • Method: get

      # Fetch the query details by ID
      fetched_query = w.queries.get(id=query.id)
      print(f"Fetched Query: {fetched_query.display_name}, Text: {fetched_query.query_text}")
      
      


  3. Update a Query:

    • Update an existing query by specifying the query's ID and fields to modify.

    • Method: update

      # Update the query by changing its description and SQL text
      updated_query = w.queries.update(
          id=query.id,
          query=sql.UpdateQueryRequestQuery(
              display_name=f'sdk-updated-{time.time_ns()}',
              description="UPDATED: test query",
              query_text="SELECT 2 + 2"
          ),
          update_mask="display_name,description,query_text"
      )
      
      print(f"Updated Query: {updated_query.display_name}, New Text: {updated_query.query_text}")
  4. List Queries:

    • List all queries accessible to the user, optionally paginated.

    • Method: list

      # List all queries created by the user
      query_list = w.queries.list(page_size=5)
      for q in query_list:
          print(f"Query ID: {q.id}, Name: {q.display_name}")

  5. Delete a Query:

    • Move a query to the trash. The query can still be restored from the UI for 30 days before permanent deletion.

    • Method: delete

      # Delete the created query
      w.queries.delete(id=query.id)
      print(f"Deleted Query: {query.display_name}")

Real-Life Example:

Let’s say you’re managing a SQL analysis team that regularly creates and schedules SQL queries for data insights. By automating query management through the Databricks SDK, you can dynamically create, update, or delete queries, and schedule them in a CI/CD pipeline.

For instance, you can automate query updates when business logic changes, ensuring that your scheduled jobs always run the latest version of the SQL query.

Code Example:

Here’s an example that demonstrates creating, fetching, updating, listing, and deleting a query:

import time
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import sql

# Initialize the Workspace client
w = WorkspaceClient()

# 1. Get the available data sources (SQL warehouses)
srcs = w.data_sources.list()

# 2. Create a new query
query = w.queries.create(query=sql.CreateQueryRequestQuery(
    display_name=f'sdk-{time.time_ns()}',
    warehouse_id=srcs[0].warehouse_id,
    description="test query from SDK",
    query_text="SHOW TABLES"
))

print(f"Created Query: {query.display_name}, ID: {query.id}")

# 3. Fetch the query details
fetched_query = w.queries.get(id=query.id)
print(f"Fetched Query: {fetched_query.display_name}, Text: {fetched_query.query_text}")

# 4. Update the query
updated_query = w.queries.update(
    id=query.id,
    query=sql.UpdateQueryRequestQuery(
        display_name=f'sdk-updated-{time.time_ns()}',
        description="UPDATED: test query",
        query_text="SELECT 2 + 2"
    ),
    update_mask="display_name,description,query_text"
)
print(f"Updated Query: {updated_query.display_name}, New Text: {updated_query.query_text}")

# 5. List queries
query_list = w.queries.list(page_size=5)
for q in query_list:
    print(f"Query ID: {q.id}, Name: {q.display_name}")

# 6. Delete the query
w.queries.delete(id=query.id)
print(f"Deleted Query: {query.display_name}")

Output:

Created Query: sdk-1695640136898, ID: 12345
Fetched Query: sdk-1695640136898, Text: SHOW TABLES
Updated Query: sdk-updated-1695640139898, New Text: SELECT 2 + 2
Query ID: 12345, Name: sdk-updated-1695640139898
Deleted Query: sdk-updated-1695640139898

Explanation:

  • Automation: This approach simplifies query management in a scalable way, allowing you to programmatically create, update, and delete SQL queries in Databricks.

  • Scheduling Queries: You can use the Jobs API to schedule the queries created via the Queries API, making it easy to run them at regular intervals (see the sketch below).

  • Real-Time Control: The SDK allows you to modify queries dynamically based on changing requirements, ensuring your data pipelines and reports remain up-to-date.
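
Reusing the query and srcs objects created in the code example above (before the query is deleted), here is a hedged sketch of that scheduling pattern; the job name, task key, and cron expression are placeholders, and it assumes the Jobs API's SQL task type:

from databricks.sdk.service import jobs

# Schedule the SQL query created above to run daily at 06:00 UTC
scheduled_job = w.jobs.create(
    name="daily-sql-report",                           # placeholder job name
    tasks=[
        jobs.Task(
            task_key="run_query",                      # placeholder task key
            sql_task=jobs.SqlTask(
                query=jobs.SqlTaskQuery(query_id=query.id),
                warehouse_id=srcs[0].warehouse_id,     # warehouse from the example above
            ),
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 6 * * ?",          # placeholder cron expression
        timezone_id="UTC",
    ),
)
print(f"Created scheduled job: {scheduled_job.job_id}")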

Using the Databricks SDK for managing queries enables greater flexibility in SQL operations, especially in automated environments like CI/CD and large-scale data operations.

Here’s a structured overview of the Databricks Vector Search Endpoints and Indexes APIs, which allow you to create and manage vector search indexes for efficient querying.

Managing Vector Search Endpoints and Indexes in Databricks

What: The Vector Search APIs in Databricks provide a way to manage compute resources (endpoints) for hosting vector search indexes. These indexes allow for efficient approximate nearest neighbor (ANN) search queries on embedding vectors, which is critical for applications like recommendation systems, image search, and natural language processing.

How: You can perform CRUD operations on endpoints and indexes, including creating, deleting, listing, and querying indexes.

Key Methods:

  • Vector Search Endpoints API

  1. Create an Endpoint:

    • Method: create_endpoint

    • Parameters:

      • name: Name of the endpoint

      • endpoint_type: Type of endpoint (currently STANDARD)

    • Usage:

      from databricks.sdk import WorkspaceClient
      from databricks.sdk.service.vectorsearch import EndpointType

      w = WorkspaceClient()

      # Create a vector search endpoint (STANDARD is currently the supported type)
      endpoint = w.vector_search_endpoints.create_endpoint(
          name="my-vector-endpoint",
          endpoint_type=EndpointType.STANDARD
      )
  2. Get an Endpoint:

    • Method: get_endpoint

    • Parameters:

      • endpoint_name: Name of the endpoint

    • Usage:

    endpoint_info = w.vector_search_endpoints.get_endpoint("my-vector-endpoint")
    print(endpoint_info)

  3. List Endpoints:

    • Method: list_endpoints

    • Usage:

    for ep in w.vector_search_endpoints.list_endpoints():
        print(ep.name)

  4. Delete an Endpoint:

    • Method: delete_endpoint

    • Parameters:

      • endpoint_name: Name of the endpoint to delete

    • Usage:

    w.vector_search_endpoints.delete_endpoint("my-vector-endpoint")

  • Vector Search Indexes API

  1. Create an Index:

    • Method: create_index

    • Parameters:

      • name: Name of the index

      • endpoint_name: Name of the endpoint for serving the index

      • primary_key: Primary key of the index

      • index_type: Type of index (DELTA_SYNC or DIRECT_ACCESS)

    • Usage:

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.vectorsearch import VectorIndexType

    w = WorkspaceClient()

    # Create an index served by the endpoint created earlier
    # (a DELTA_SYNC index also needs a delta_sync_index_spec; see the full example below)
    index = w.vector_search_indexes.create_index(
        name="my-vector-index",
        endpoint_name="my-vector-endpoint",
        primary_key="id",
        index_type=VectorIndexType.DELTA_SYNC
    )

  2. Get an Index:

    • Method: get_index

    • Parameters:

      • index_name: Name of the index

    • Usage:

    index_info = w.vector_search_indexes.get_index("my-vector-index")
    print(index_info)

  3. List Indexes:

    • Method: list_indexes

    • Parameters:

      • endpoint_name: Name of the endpoint

    • Usage:

    for idx in w.vector_search_indexes.list_indexes("my-vector-endpoint"):
        print(idx.name)

  4. Delete an Index:

    • Method: delete_index

    • Parameters:

      • index_name: Name of the index

    • Usage:

    w.vector_search_indexes.delete_index("my-vector-index")

  5. Query an Index:

    • Method: query_index

    • Parameters:

      • index_name: Name of the index to query

      • columns: List of columns to include in the response

      • query_vector: List of floats representing the query vector

    • Usage:

    results = w.vector_search_indexes.query_index(
        index_name="my-vector-index",
        columns=["id", "embedding"],
        query_vector=[0.1, 0.2, 0.3]  # example vector
    )

Real-Life Example:

Imagine you’re building a recommendation system for an e-commerce platform. You can use vector search to efficiently find similar products based on user preferences. By creating a Delta Sync index, your system will automatically update the index as product details change, ensuring that recommendations are always relevant.

For instance, when a new product is added or an existing one is modified, the Delta Sync index will keep your vector search optimized and responsive, helping customers discover products more easily.

Code Example:

Here’s a complete example of creating an endpoint, creating an index, querying it, and then deleting them.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.vectorsearch import (
    DeltaSyncVectorIndexSpecRequest,
    EmbeddingSourceColumn,
    EndpointType,
    PipelineType,
    VectorIndexType,
)

# Initialize the Workspace client
w = WorkspaceClient()

# 1. Create a new vector search endpoint and wait for it to come online
endpoint = w.vector_search_endpoints.create_endpoint(
    name="my-vector-endpoint",
    endpoint_type=EndpointType.STANDARD
).result()

# 2. Create a new Delta Sync index on the endpoint
# (the source table and embedding-model endpoint names are placeholders)
index = w.vector_search_indexes.create_index(
    name="my-vector-index",
    endpoint_name="my-vector-endpoint",
    primary_key="id",
    index_type=VectorIndexType.DELTA_SYNC,
    delta_sync_index_spec=DeltaSyncVectorIndexSpecRequest(
        source_table="catalog.schema.products",
        pipeline_type=PipelineType.TRIGGERED,
        embedding_source_columns=[
            EmbeddingSourceColumn(
                name="description",
                embedding_model_endpoint_name="databricks-gte-large-en",
            )
        ],
    ),
)

# 3. Query the index with a sample vector
query_results = w.vector_search_indexes.query_index(
    index_name="my-vector-index",
    columns=["id", "embedding"],
    query_vector=[0.1, 0.2, 0.3]  # Example query vector
)
print("Query Results:", query_results)

# 4. Clean up: delete the index and endpoint
w.vector_search_indexes.delete_index("my-vector-index")
w.vector_search_endpoints.delete_endpoint("my-vector-endpoint")

Output:

Query Results: <list of matching results based on the query vector>

Explanation:

  • Efficiency: By utilizing the Vector Search APIs, you can efficiently manage and query large datasets, making your applications responsive and scalable.

  • Automatic Updates: Delta Sync indexes automatically update as data changes, ensuring real-time accuracy in search results.

  • Flexible Management: You can create and manage multiple endpoints and indexes to tailor your vector search capabilities to different use cases.

Using the Databricks Vector Search APIs helps streamline the management of vector data, enhancing the performance of machine learning applications and data retrieval processes.

Want to hear about our latest Datalakehouse and Databricks learnings?

Subscribe to get notified.

Make your data engineering process efficient and cost effective. Feel free to reach out for a data infrastructure audit.

How WTD Can help

- Data experts for implementing projects

- On-demand data team for support