Lakeflow Declarative Pipelines
Overview
Lakeflow Declarative Pipelines (LDP) is Databricks' declarative framework for building and managing reliable, scalable, and maintainable data pipelines.
Installation
- Copy the Package ID: @coalesce/databricks/lakeflow-declarative-pipelines
- In Coalesce, open the Workspace where you wish to install the package.
- Go to the Build Settings of the Workspace, open the Packages tab, and click the Install button at the top right of the page.
- Paste the Package ID and proceed with the installation process.
Description
Lakeflow Declarative Pipelines
Brief Summary
The Coalesce LDP node type allows you to create a streaming table. A streaming table is a form of Unity Catalog managed table that is also a streaming target for Lakeflow Declarative Pipelines.
Lakeflow Declarative Pipelines is a declarative framework for developing and running batch and streaming data pipelines in SQL and Python. Lakeflow Declarative Pipelines runs on the performance-optimized Databricks Runtime (DBR), and the Lakeflow Declarative Pipelines flows API uses the same DataFrame API as Apache Spark and Structured Streaming. Common use cases for Lakeflow Declarative Pipelines include incremental data ingestion from sources such as cloud storage (including Amazon S3, Azure ADLS Gen2, and Google Cloud Storage) and message buses (such as Apache Kafka, Amazon Kinesis, Google Pub/Sub, Azure EventHub, and Apache Pulsar), incremental batch and streaming transformations with stateless and stateful operators, and real-time stream processing between transactional stores like message buses and databases.
A streaming table is a Delta table with additional support for streaming or incremental data processing. A streaming table can be targeted by one or more flows in an ETL pipeline.
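For orientation, here is a minimal, hedged sketch of the kind of SQL a streaming-table flow corresponds to on the Databricks side; the table and source names are placeholders, and the node generates the actual DDL for you.

```sql
-- Minimal sketch (assumed names): a streaming table fed incrementally from another table.
-- STREAM(...) marks the source as a streaming read, so only newly arrived rows are processed.
CREATE OR REFRESH STREAMING TABLE sales_bronze
COMMENT 'Incrementally loaded sales records'
AS SELECT *
FROM STREAM(raw.sales_events);
```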
Key points
1. Databricks creates tables with lowercase names by default, so it is better to keep table names in lowercase. See https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html
2. A node that loads from a file creates a streaming table. For further processing, re-sync the columns in the mapping grid using the Re-Sync Columns button. The streaming table can then be re-created with the inferred columns using the Include Columns Inferred option.
3. Streaming tables can be re-created and refreshed if inferred columns need to be dropped or transformations need to be added to columns inferred in the previous step. The structure of the streaming table is refreshed only when the Refresh Stream option is enabled.
LDP Node Configuration
The LDP node type has the following configuration groups:
- LDP Node Properties
- General Options
- LDP Options
- File Options
- General File Load Options
- Advanced File Load Options
- File Format Options
- Schedule Options
LDP Node Properties
There are four configs within the Node Properties group.
| Property | Description |
|---|---|
| Storage Location | Storage Location where the Streaming Table will be created |
| Node Type | Name of template used to create node objects |
| Description | A description of the node's purpose |
| Deploy Enabled | If TRUE, the node will be deployed or redeployed when changes are detected. If FALSE, the node will not be deployed, or it will be dropped during redeployment. |
General Options
| Option | Description |
|---|---|
| Create As | Choose the materialization type. By default it is STREAMING TABLE. |
| Schedule refresh | True / False toggle. True: Schedule Options are visible. False: Schedule Options are disabled. |
LDP Options
| Option | Description |
|---|---|
| Type of Lakeflow Declarative Pipeline | - Streaming Table |
| Read Files | True / False toggle. True: allows loading from an external cloud location into a streaming table. False: allows loading from a table source. |
| Include Columns Inferred | Enables recreating the streaming table with added transformations after syncing columns. |
| Table Properties | Defines table properties like quality. |
| Refresh Stream | Enables full refresh of the table if any changes in structure occur. |
| Partition By | Specifies the columns used for partitioning the streaming table. |
| Table Constraints | - Primary Key - Foreign Key |
| Other Constraints | - Column Name - Expectation Expression - On Violation Action |
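As a rough, hedged illustration of how these options surface in the generated DDL (table, column, and property names below are placeholders, not the node's exact output):

```sql
-- Illustrative sketch only; the node assembles the real statement from the options above.
CREATE OR REFRESH STREAMING TABLE orders_bronze (
  -- Other Constraints: expectation name, expression, and on-violation action
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW
)
PARTITIONED BY (order_date)               -- Partition By
TBLPROPERTIES ('quality' = 'bronze')      -- Table Properties
AS SELECT * FROM STREAM(raw.orders);      -- Read Files = false: load from a table source
```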
File Options
| Option | Description |
|---|---|
| File Location | External cloud location of the file. |
| File Name/File Pattern | The pattern to identify files to be processed. |
| File Type | Options: CSV, JSON, Parquet, Avro, ORC, Text, XML. |
| Advanced File Load Options | Toggle to enable additional loading options for files. |
| Advanced File Format Options | Toggle to enable additional formatting options for files. |
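With Read Files enabled, these options feed a read_files() call along the lines of the hedged sketch below; the path, pattern, and format are placeholders, and pathGlobFilter is an assumed mapping for the file pattern option.

```sql
-- Illustrative sketch: loading a streaming table from an external cloud location.
CREATE OR REFRESH STREAMING TABLE customers_raw
AS SELECT *
FROM STREAM read_files(
  's3://my-bucket/landing/customers/',  -- File Location (placeholder path)
  format => 'csv',                      -- File Type
  pathGlobFilter => '*.csv'             -- File Name/File Pattern (assumed option name)
);
```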
General File Load Options
| Option | Description |
|---|---|
| Schema Definition | The schema to define the structure of the input data. |
| Override inferred schema columns | Overrides specific columns of the inferred schema with user-provided column definitions. |
| Infer Exact Column Types | Toggles whether to infer column data types accurately. |
| Ignore Corrupt Files | Toggle to ignore files that are corrupt during processing. |
| Ignore Missing Files | Toggle to ignore files that are missing during processing. |
| Partition Columns | Comma-separated list of Hive-style partition columns to include |
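A hedged sketch of how the general file load options appear to map onto read_files() options; the option names follow the standard Spark/Auto Loader options and are assumptions, as are the path and format.

```sql
-- Illustrative sketch only.
SELECT *
FROM STREAM read_files(
  's3://my-bucket/landing/orders/',
  format => 'json',
  inferColumnTypes => true,          -- Infer Exact Column Types
  ignoreCorruptFiles => true,        -- Ignore Corrupt Files
  ignoreMissingFiles => true,        -- Ignore Missing Files
  partitionColumns => 'year,month'   -- Partition Columns (Hive-style)
);
```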
Advanced File Load Options
| Option | Description |
|---|---|
| Process files modified after (e.g. '2024-01-01T00:00:00Z') | Only process files modified after this timestamp |
| Process files modified before (e.g. '2024-12-31T23:59:59Z') | Only process files modified before this timestamp |
| Recursive file lookup | Search nested directories regardless of naming scheme. Default: false |
| Use strict glob pattern matching | Use strict glob pattern matching. Default: true |
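Similarly, the advanced file load options correspond to the following read_files() options in this hedged sketch (timestamps and path are placeholders):

```sql
-- Illustrative sketch only.
SELECT *
FROM STREAM read_files(
  's3://my-bucket/landing/orders/',
  format => 'parquet',
  modifiedAfter => '2024-01-01T00:00:00Z',    -- Process files modified after
  modifiedBefore => '2024-12-31T23:59:59Z',   -- Process files modified before
  recursiveFileLookup => true                 -- Recursive file lookup
);
```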
File Format Options
PARQUET Options
| Group | Option | Description |
|---|---|---|
| Options | Merge Schema | Infer and merge schema across multiple files. Default: false |
| Options | Column name for rescued data column | Column name to collect data that doesn't match schema |
| Advanced | Datetime rebase mode | Rebasing DATE/TIMESTAMP between Julian and Proleptic Gregorian calendars. Options: EXCEPTION, LEGACY, CORRECTED. Default: LEGACY |
| Advanced | Integer rebase mode | Rebasing INT96 timestamps. Options: EXCEPTION, LEGACY, CORRECTED. Default: LEGACY |
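A hedged sketch for the Parquet options above (AVRO and ORC follow the same pattern for Merge Schema and the rescued data column); the path and values are placeholders.

```sql
-- Illustrative sketch only.
SELECT *
FROM STREAM read_files(
  's3://my-bucket/landing/events/',
  format => 'parquet',
  mergeSchema => true,                    -- Merge Schema
  rescuedDataColumn => '_rescued_data',   -- Column name for rescued data column
  datetimeRebaseMode => 'LEGACY'          -- Datetime rebase mode
);
```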
AVRO Options
| Group | Option | Description |
|---|---|---|
| Options | avroSchema | User-provided Avro schema (can be evolved schema) |
| Options | Merge Schema | Infer and merge schema across files. Default: false |
| Options | Column name for rescued data column | Column name to collect data that doesn't match schema |
| Advanced | Datetime rebase mode | Rebasing DATE/TIMESTAMP between Julian and Proleptic Gregorian calendars. Options: EXCEPTION, LEGACY, CORRECTED. Default: LEGACY |
| Advanced | Integer rebase mode | Rebasing INT96 timestamps. Options: EXCEPTION, LEGACY, CORRECTED. Default: LEGACY |
ORC Options
| Group | Option | Description |
|---|---|---|
| Options | Merge Schema | Infer and merge schema across files. Default: false |
XML Options
| Group | Option | Description |
|---|---|---|
| Options | Row Tag | Required. XML element to treat as a row (e.g., 'book' for <books><book>...</book></books>) |
| Options | Encoding | Character encoding. Default: UTF-8 |
| Options | String representation of null | String representation of null. Default: null |
| Options | Column name for rescued data column | Column name to collect data that doesn't match schema |
| Advanced | Prefix for attribute | Prefix for attribute field names. Default: _ |
| Advanced | valueTag | Tag for character data in elements with attributes. Default: _VALUE |
| Advanced | Exclude attributes from elements | Exclude attributes from elements. Default: false |
| Advanced | Skip surrounding whitespace | Skip surrounding whitespace. Default: true |
| Advanced | Ignore namespace | Ignore namespace prefixes. Default: false |
| Advanced | Sampling Ratio | Fraction of rows for schema inference. Default: 1.0 |
| Advanced | Mode | Corrupt record handling. Options: PERMISSIVE, DROPMALFORMED, FAILFAST. Default: PERMISSIVE |
| Advanced | Date Parsing Format | Date parsing format. Default: yyyy-MM-dd |
| Advanced | Timestamp Parsing Format | Timestamp parsing format. Default: yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] |
| Advanced | Path to XSD for row validation | Path to XSD for row validation |
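A hedged sketch for the XML options above; rowTag is the only required option, and the path and tag name are placeholders.

```sql
-- Illustrative sketch only.
SELECT *
FROM STREAM read_files(
  's3://my-bucket/landing/books/',
  format => 'xml',
  rowTag => 'book',        -- Row Tag (required)
  encoding => 'UTF-8',     -- Encoding
  mode => 'PERMISSIVE'     -- Corrupt record handling
);
```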
TEXT Options
| Group | Option | Description |
|---|---|---|
| Options | encoding | Character encoding. Default: UTF-8 |
| Options | Line Separator | Line separator string. Default: auto-detects \r, \r\n, \n |
| Advanced | Whole text | Read entire file as single record. Default: false |
JSON Options
| Group | Option | Description |
|---|---|---|
| Options | Encoding | Character encoding. Default: UTF-8 |
| Options | quote | Quote character. Default: " |
| Options | escape | Escape character. Default: \ |
| Advanced | Skip surrounding whitespace | Skip surrounding whitespace. Default: true |
| Options | Number of rows to skip from beginning | Number of rows to skip from beginning. Default: 0 |
| Options | Column name for rescued data column | Column name for rescued data |
| Advanced | MultiLine | Records span multiple lines. Default: false |
| Advanced | Mode | Options: PERMISSIVE, DROPMALFORMED, FAILFAST. Default: PERMISSIVE |
| Advanced | Character indicating line comment | Character indicating line comment. Default: disabled |
| Advanced | dateFormat | Date parsing format. Default: yyyy-MM-dd |
| Advanced | timestampFormat | Timestamp parsing format |
| Advanced | Ignore Leading WhiteSpaces | Ignore leading whitespaces. Default: false |
| Advanced | Ignore Trailing WhiteSpaces | Toggle to ignore trailing whitespaces. |
| Advanced | Allowcomments | Allow comments in JSON data. Default: false |
| Advanced | AllowUnquotedFieldNames | Allow unquoted field names. Default: false |
| Advanced | InferPrimitiveTypes | Infer primitive types correctly. Default: true |
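A hedged sketch for the JSON options above (path and values are placeholders):

```sql
-- Illustrative sketch only.
SELECT *
FROM STREAM read_files(
  's3://my-bucket/landing/clicks/',
  format => 'json',
  multiLine => true,            -- MultiLine: records span multiple lines
  mode => 'DROPMALFORMED',      -- Mode: drop malformed records
  dateFormat => 'yyyy-MM-dd'    -- dateFormat
);
```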
CSV Options
| Group | Option | Description |
|---|---|---|
| Options | Delimiter | Delimiter used in CSV files |
| Options | Header added | Indicates whether a header row is present in the CSV file |
| Options | Encoding | Character encoding. Default: UTF-8 |
| Options | quote | Quote character. Default: " |
| Options | escape | Escape character. Default: \ |
| Advanced | Skip surrounding whitespace | Skip surrounding whitespace. Default: true |
| Options | Number of rows to skip from beginning | Number of rows to skip from beginning. Default: 0 |
| Options | Column name for rescued data column | Column name for rescued data |
| Advanced | MultiLine | Records span multiple lines. Default: false |
| Advanced | Mode | Options: PERMISSIVE, DROPMALFORMED, FAILFAST. Default: PERMISSIVE |
| Advanced | Character indicating line comment | Character indicating line comment. Default: disabled |
| Advanced | dateFormat | Date parsing format. Default: yyyy-MM-dd |
| Advanced | timestampFormat | Timestamp parsing format |
| Advanced | Ignore Leading WhiteSpaces | Ignore leading whitespaces. Default: false |
| Advanced | Ignore Trailing WhiteSpaces | Toggle to ignore trailing whitespaces. |
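A hedged sketch for the CSV options above (path, delimiter, and skip count are placeholders):

```sql
-- Illustrative sketch only.
SELECT *
FROM STREAM read_files(
  's3://my-bucket/landing/sales/',
  format => 'csv',
  header => true,       -- Header added
  delimiter => ';',     -- Delimiter
  skipRows => 1,        -- Number of rows to skip from beginning
  encoding => 'UTF-8'   -- Encoding
);
```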
Schedule Options
Schedule Options are available only when the Schedule Refresh toggle is set to True.
| Option | Description |
|---|---|
| Task Schedule | Options: Periodic Schedule, CRON |
| Schedule refresh-time period | Available when Task Schedule is set to Periodic Schedule. Options: Every Hours, Every Days, Every Weeks |
| Specific interval of periodic refresh (integer value) | Available when Task Schedule is set to Periodic Schedule |
| CRON string | Available when Task Schedule is set to CRON |
| CRON TIME ZONE | Available when Task Schedule is set to CRON |
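On the Databricks side, a scheduled refresh roughly corresponds to a SCHEDULE clause on the streaming table; the sketch below is illustrative, and the cron string, time zone, and names are placeholders.

```sql
-- Illustrative sketch: refresh every day at 02:00 UTC via a CRON task schedule.
CREATE OR REFRESH STREAMING TABLE orders_bronze
SCHEDULE CRON '0 0 2 * * ?' AT TIME ZONE 'UTC'
AS SELECT * FROM STREAM read_files('s3://my-bucket/landing/orders/', format => 'parquet');
```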
LDP Deployment
Initial Deployment
When deployed for the first time into an environment, the LDP node will execute the following stage:
| Stage | Description |
|---|---|
| Create Streaming table | This stage will execute a CREATE OR REFRESH statement and create a Streaming table in the target environment |
Redeployment
After the Streaming table has deployed for the first time into a target environment, subsequent deployments may result in either altering the Streaming table or recreating the Streaming table.
If a Streaming table is to be altered, this will run the following stage:
Altering the Streaming table
The following config changes trigger ALTER statements:
- Add schedule
- Alter schedule
- Drop schedule
These changes execute the following stage:
| Stage | Description |
|---|---|
| Alter Streaming table | Executes ALTER to modify parameters |
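A hedged sketch of the kind of ALTER statements these stages run (table name and schedules are placeholders):

```sql
-- Illustrative sketch only.
ALTER STREAMING TABLE orders_bronze ADD SCHEDULE CRON '0 0 2 * * ?' AT TIME ZONE 'UTC';    -- Add schedule
ALTER STREAMING TABLE orders_bronze ALTER SCHEDULE CRON '0 0 4 * * ?' AT TIME ZONE 'UTC';  -- Alter schedule
ALTER STREAMING TABLE orders_bronze DROP SCHEDULE;                                         -- Drop schedule
```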
Recreating the Streaming table
If anything changes other than the configuration options specified above, then the Streaming table will be recreated by running a CREATE OR REPLACE statement.
Undeployment
If a Streaming table is deleted from a Workspace, that Workspace is committed to Git, and that commit is deployed to a higher-level environment, then the Streaming table in the target environment will be dropped.
This is executed as a single stage:
| Stage | Description |
|---|---|
| Drop Streaming table | Removes the Streaming table |
Code
Versions
Available versions of the package.
| Version # | Release Date | Notes |
|---|---|---|
| 1.1.0 | February 12, 2026 | File format and Advanced file format options; support added for AVRO, ORC, TEXT, and XML |
| 1.0.1 | January 09, 2026 | Parquet file type support added |
| 1.0.0 | August 01, 2025 | LDP node type added |