
Lakeflow Declarative Pipelines

Overview

Lakeflow Declarative Pipelines (LDP) in Databricks is a declarative framework for building and managing reliable, scalable, and maintainable data pipelines.

Installation

  1. Copy the Package ID: @coalesce/databricks/lakeflow-declarative-pipelines
  2. In Coalesce, open the Workspace where you wish to install the package.
  3. Go to the Build Settings of the Workspace, open the Packages tab, and click the Install button at the top right of the page.
  4. Paste the Package ID, and proceed with the installation process.

Description

Lakeflow Declarative Pipelines

Brief Summary

The Coalesce LDP node type allows you to create a streaming table. A streaming table is a form of Unity Catalog managed table that is also a streaming target for Lakeflow Declarative Pipelines.

Lakeflow Declarative Pipelines is a declarative framework for developing and running batch and streaming data pipelines in SQL and Python. Lakeflow Declarative Pipelines runs on the performance-optimized Databricks Runtime (DBR), and the Lakeflow Declarative Pipelines flows API uses the same DataFrame API as Apache Spark and Structured Streaming. Common use cases include:

- Incremental data ingestion from sources such as cloud storage (Amazon S3, Azure ADLS Gen2, Google Cloud Storage) and message buses (Apache Kafka, Amazon Kinesis, Google Pub/Sub, Azure Event Hubs, Apache Pulsar).
- Incremental batch and streaming transformations with stateless and stateful operators.
- Real-time stream processing between transactional stores such as message buses and databases.

A streaming table is a Delta table with additional support for streaming or incremental data processing. A streaming table can be targeted by one or more flows in an ETL pipeline.
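For orientation, below is a minimal sketch of the kind of SQL that defines a streaming table fed by a single flow; the target name orders_bronze and the source table main.raw.orders are illustrative placeholders, not names produced by this package.

```sql
-- Streaming table with one flow reading incrementally from a source table.
-- All object names here are placeholders for illustration only.
CREATE OR REFRESH STREAMING TABLE orders_bronze
AS SELECT *
FROM STREAM(main.raw.orders);
```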

Key points

1. By default, Databricks creates tables with lowercase names, so it is best to keep table names in lowercase. See https://docs.databricks.com/en/sql/language-manual/sql-ref-names.html

2. A node that loads from a file creates a streaming table. For further processing, re-sync the columns in the mapping grid using the Re-Sync Columns button. The streaming table can then be re-created with the inferred columns using the Include Columns Inferred option.


3. The streaming table can be re-created and refreshed if you need to drop the inferred columns or add transformations to columns inferred in the previous step. The structure of the streaming table is refreshed only when the Refresh Stream option is enabled.

LDP Node Configuration

The LDP node has the following configuration groups:

LDP Node Properties

There are four configs within the Node Properties group.

- Storage Location: Storage location where the Streaming Table will be created.
- Node Type: Name of the template used to create node objects.
- Description: A description of the node's purpose.
- Deploy Enabled: If TRUE, the node will be deployed / redeployed when changes are detected. If FALSE, the node will not be deployed, or will be dropped during redeployment.

General Options

- Create As: Choose the materialization type; by default it is STREAMING TABLE.
- Schedule refresh: True / False toggle.
  - True: the Schedule options will be visible.
  - False: the Schedule options will be disabled.

LDP options

- Type of Lakeflow Declarative Pipeline: Streaming Table.
- Read Files: True / False toggle.
  - True: load from an external cloud location into a streaming table.
  - False: load from a table source.
- Include Columns Inferred: Enables re-creating the streaming table with added transformations after syncing columns.
- Table Properties: Defines table properties such as quality.
- Refresh Stream: Enables a full refresh of the table if its structure changes.
- Partition By: Specifies the columns used for partitioning the streaming table.
- Table Constraints: Primary Key, Foreign Key.
- Other Constraints: Column Name, Expectation Expression, On Violation Action.
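As a hedged illustration of how several of the options above surface in SQL (Table Properties, Partition By, and an expectation from Other Constraints), consider the following sketch; the table names and the amount and sale_date columns are placeholders, and the exact SQL generated by the node may differ.

```sql
-- Streaming table with an expectation, partitioning, and table properties.
-- Names and columns are placeholders for illustration only.
CREATE OR REFRESH STREAMING TABLE sales_silver (
  -- Other Constraints: expectation expression with an ON VIOLATION action
  CONSTRAINT positive_amount EXPECT (amount > 0) ON VIOLATION DROP ROW
)
PARTITIONED BY (sale_date)           -- Partition By
TBLPROPERTIES ('quality' = 'silver') -- Table Properties
AS SELECT *
FROM STREAM(main.raw.sales);
```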

File options

- File Location: External cloud location of the file.
- File Name/File Pattern: The pattern used to identify the files to be processed.
- File Type: Options: CSV, JSON, Parquet, Avro, ORC, Text, XML.
- Advanced File Load Options: Toggle to enable additional loading options for files.
- Advanced File Format Options: Toggle to enable additional formatting options for files.
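A rough sketch of the Read Files path is shown below, assuming JSON files in a placeholder external location; the storage account, container, and file pattern are illustrative only.

```sql
-- Load files from an external cloud location into a streaming table.
-- The abfss:// path is a placeholder.
CREATE OR REFRESH STREAMING TABLE raw_events
AS SELECT *
FROM STREAM read_files(
  'abfss://landing@examplestorage.dfs.core.windows.net/events/*.json',
  format => 'json'
);
```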

General File Load options

- Schema Definition: The schema defining the structure of the input data.
- Override inferred schema columns: Schema hints used to override the data types of inferred columns.
- Infer Exact Column Types: Toggles whether to infer column data types exactly.
- Ignore Corrupt Files: Toggle to ignore files that are corrupt during processing.
- Ignore Missing Files: Toggle to ignore files that are missing during processing.
- Partition Columns: Comma-separated list of Hive-style partition columns to include.
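The sketch below suggests how schema-related load options can map onto read_files options such as schemaHints and ignoreMissingFiles; the path, column hints, and option choices are assumptions for illustration.

```sql
-- Override the types of inferred columns and tolerate missing files.
-- Path and column names are placeholders.
CREATE OR REFRESH STREAMING TABLE raw_orders
AS SELECT *
FROM STREAM read_files(
  's3://example-bucket/orders/',
  format => 'csv',
  header => true,
  schemaHints => 'order_id BIGINT, order_ts TIMESTAMP',
  ignoreMissingFiles => true
);
```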

Advanced File Load options

- Process files modified after (e.g. '2024-01-01T00:00:00Z'): Only process files modified after this timestamp.
- Process files modified before (e.g. '2024-12-31T23:59:59Z'): Only process files modified before this timestamp.
- Recursive file lookup: Search nested directories regardless of naming scheme. Default: false
- Use strict glob pattern matching: Use strict glob pattern matching. Default: true
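These advanced options correspond to reader options along the lines of modifiedAfter, modifiedBefore, and recursiveFileLookup; the following is an illustrative sketch with placeholder values.

```sql
-- Only pick up files modified inside a time window, searching nested folders.
-- Path and timestamps are placeholders.
CREATE OR REFRESH STREAMING TABLE recent_files
AS SELECT *
FROM STREAM read_files(
  's3://example-bucket/landing/',
  format => 'parquet',
  modifiedAfter => '2024-01-01T00:00:00Z',
  modifiedBefore => '2024-12-31T23:59:59Z',
  recursiveFileLookup => true
);
```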

File Format options

PARQUET Options

Options:
- Merge Schema: Infer and merge the schema across multiple files. Default: false
- Column name for rescued data column: Column name used to collect data that doesn't match the schema.

Advanced:
- Datetime rebase mode: Rebasing of DATE/TIMESTAMP values between the Julian and Proleptic Gregorian calendars. Options: EXCEPTION, LEGACY, CORRECTED. Default: LEGACY
- Integer rebase mode: Rebasing of INT96 timestamps. Options: EXCEPTION, LEGACY, CORRECTED. Default: LEGACY
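A brief sketch of the Parquet options in use, assuming a placeholder path; mergeSchema and rescuedDataColumn are the underlying reader option names assumed here.

```sql
-- Merge schemas across Parquet files and keep non-conforming data
-- in a rescued data column. Path is a placeholder.
CREATE OR REFRESH STREAMING TABLE parquet_bronze
AS SELECT *
FROM STREAM read_files(
  's3://example-bucket/parquet/',
  format => 'parquet',
  mergeSchema => true,
  rescuedDataColumn => '_rescued_data'
);
```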
AVRO Options

Options:
- avroSchema: User-provided Avro schema (can be an evolved schema).
- Merge Schema: Infer and merge the schema across files. Default: false
- Column name for rescued data column: Column name used to collect data that doesn't match the schema.

Advanced:
- Datetime rebase mode: Rebasing of DATE/TIMESTAMP values between the Julian and Proleptic Gregorian calendars. Options: EXCEPTION, LEGACY, CORRECTED. Default: LEGACY
- Integer rebase mode: Rebasing of INT96 timestamps. Options: EXCEPTION, LEGACY, CORRECTED. Default: LEGACY
ORC Options

Options:
- Merge Schema: Infer and merge the schema across files. Default: false
XML Options

Options:
- Row Tag: Required. XML element to treat as a row (e.g. 'book' for <books><book>...</book></books>).
- Encoding: Character encoding. Default: UTF-8
- String representation of null: String representation of null. Default: null
- Column name for rescued data column: Column name used to collect data that doesn't match the schema.

Advanced:
- Prefix for attribute: Prefix for attribute field names. Default: _
- valueTag: Tag for character data in elements that also have attributes. Default: _VALUE
- Exclude attributes from elements: Exclude attributes from elements. Default: false
- Skip surrounding whitespace: Skip surrounding whitespace. Default: true
- Ignore namespace: Ignore namespace prefixes. Default: false
- Sampling Ratio: Fraction of rows used for schema inference. Default: 1.0
- Mode: Corrupt record handling. Options: PERMISSIVE, DROPMALFORMED, FAILFAST. Default: PERMISSIVE
- Date Parsing Format: Date parsing format. Default: yyyy-MM-dd
- Timestamp Parsing Format: Timestamp parsing format. Default: yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]
- Path to XSD for row validation: Path to an XSD file used for row validation.
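A short sketch of the required Row Tag option, reusing the 'book' element from the example above; the path is a placeholder and the example assumes a runtime where the XML reader is available.

```sql
-- Treat each <book> element as one row. Path is a placeholder.
CREATE OR REFRESH STREAMING TABLE books_bronze
AS SELECT *
FROM STREAM read_files(
  'abfss://landing@examplestorage.dfs.core.windows.net/books/*.xml',
  format => 'xml',
  rowTag => 'book'
);
```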
TEXT Options

Options:
- Encoding: Character encoding. Default: UTF-8
- Line Separator: Line separator string. Default: auto-detects \r, \r\n, \n

Advanced:
- Whole text: Read the entire file as a single record. Default: false
JSON Options

Options:
- Encoding: Character encoding. Default: UTF-8
- quote: Quote character. Default: "
- escape: Escape character. Default: \
- Number of rows to skip from beginning: Number of rows to skip from the beginning. Default: 0
- Column name for rescued data column: Column name for rescued data.

Advanced:
- Skip surrounding whitespace: Skip surrounding whitespace. Default: true
- MultiLine: Records span multiple lines. Default: false
- Mode: Options: PERMISSIVE, DROPMALFORMED, FAILFAST. Default: PERMISSIVE
- Character indicating line comment: Character indicating a line comment. Default: disabled
- dateFormat: Date parsing format. Default: yyyy-MM-dd
- timestampFormat: Timestamp parsing format.
- Ignore Leading WhiteSpaces: True / False toggle. Default: false
- Ignore Trailing WhiteSpaces: True / False toggle.
- Allow comments: Allow comments in JSON data. Default: false
- Allow Unquoted Field Names: Allow unquoted field names. Default: false
- Infer Primitive Types: Infer primitive types correctly. Default: true
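A minimal sketch of two common JSON options (MultiLine and Mode), using a placeholder path.

```sql
-- Read multi-line JSON documents and keep malformed records permissively.
-- Path is a placeholder.
CREATE OR REFRESH STREAMING TABLE json_bronze
AS SELECT *
FROM STREAM read_files(
  's3://example-bucket/json/',
  format => 'json',
  multiLine => true,
  mode => 'PERMISSIVE'
);
```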
CSV Options

Options:
- Delimiter: Delimiter used in CSV files.
- Header added: Whether a header row is present in the CSV file.
- Encoding: Character encoding. Default: UTF-8
- quote: Quote character. Default: "
- escape: Escape character. Default: \
- Number of rows to skip from beginning: Number of rows to skip from the beginning. Default: 0
- Column name for rescued data column: Column name for rescued data.

Advanced:
- Skip surrounding whitespace: Skip surrounding whitespace. Default: true
- MultiLine: Records span multiple lines. Default: false
- Mode: Options: PERMISSIVE, DROPMALFORMED, FAILFAST. Default: PERMISSIVE
- Character indicating line comment: Character indicating a line comment. Default: disabled
- dateFormat: Date parsing format. Default: yyyy-MM-dd
- timestampFormat: Timestamp parsing format.
- Ignore Leading WhiteSpaces: True / False toggle. Default: false
- Ignore Trailing WhiteSpaces: True / False toggle.
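A minimal sketch of common CSV options (header, delimiter, rows to skip), using a placeholder path; sep and skipRows are the reader option names assumed here for the Delimiter and rows-to-skip settings.

```sql
-- Pipe-delimited CSV with a header row, skipping one leading row.
-- Path and option values are placeholders.
CREATE OR REFRESH STREAMING TABLE csv_bronze
AS SELECT *
FROM STREAM read_files(
  's3://example-bucket/csv/',
  format => 'csv',
  header => true,
  sep => '|',
  skipRows => 1
);
```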

Schedule Options

Schedule Options are available only when the Schedule Refresh toggle is set to True.

- Task Schedule: Options: Periodic Schedule, CRON.
- Schedule refresh-time period: Available when Task Schedule is set to Periodic Schedule. Options: Every Hours, Every Days, Every Weeks.
- Specific interval of periodic refresh (integer value): Available when Task Schedule is set to Periodic Schedule.
- CRON string: Available when Task Schedule is set to CRON.
- CRON TIME ZONE: Available when Task Schedule is set to CRON.
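These options roughly correspond to the SCHEDULE clause of the streaming table definition; the sketch below uses placeholder table names and an assumed source path, and the SQL generated by the node may differ.

```sql
-- Periodic schedule: refresh every hour. Names and paths are placeholders.
CREATE OR REFRESH STREAMING TABLE orders_hourly
  SCHEDULE EVERY 1 HOUR
AS SELECT *
FROM STREAM read_files('s3://example-bucket/orders/', format => 'json');

-- CRON schedule with a time zone: refresh daily at 02:00 UTC.
CREATE OR REFRESH STREAMING TABLE orders_nightly
  SCHEDULE CRON '0 0 2 * * ?' AT TIME ZONE 'UTC'
AS SELECT *
FROM STREAM read_files('s3://example-bucket/orders/', format => 'json');
```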

LDP Deployment

Initial Deployment

When deployed for the first time into an environment, the LDP node will execute the following stage:

- Create Streaming Table: Executes a CREATE OR REFRESH statement and creates a Streaming table in the target environment.

Redeployment

After the Streaming table has been deployed for the first time into a target environment, subsequent deployments may result in either altering the Streaming table or recreating it.

If a Streaming table is to be altered, the following stage will run:

Altering the Streaming table

The following config changes trigger ALTER statements:

  1. Add schedule
  2. Alter schedule
  3. Drop schedule

These execute the following stage:

- Alter Streaming Table: Executes an ALTER statement to modify the schedule parameters.
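As a hedged sketch, the schedule changes above map onto ALTER statements of roughly this shape; the table name is a placeholder.

```sql
-- Add, change, or remove the refresh schedule of an existing streaming table.
-- orders_hourly is a placeholder name.
ALTER STREAMING TABLE orders_hourly ADD SCHEDULE EVERY 1 DAY;
ALTER STREAMING TABLE orders_hourly ALTER SCHEDULE CRON '0 0 6 * * ?' AT TIME ZONE 'UTC';
ALTER STREAMING TABLE orders_hourly DROP SCHEDULE;
```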

Recreating the Streaming table

If anything other than the configuration options specified above changes, the Streaming table will be recreated by running a CREATE OR REPLACE statement.

Undeployment

If a Streaming table is deleted from a Workspace, that Workspace is committed to Git, and that commit is deployed to a higher-level environment, then the Streaming table in the target environment will be dropped.

This is executed as a single stage:

- Drop Streaming Table: Removes the Streaming table from the target environment.
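A minimal sketch of the drop, assuming a placeholder three-level table name.

```sql
-- Drop the streaming table in the target environment. Name is a placeholder.
DROP TABLE IF EXISTS main.analytics.orders_hourly;
```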

Code

Versions

Available versions of the package.

- 1.1.0 (February 12, 2026): File format and advanced file format options; support added for Avro, ORC, Text, and XML.
- 1.0.1 (January 09, 2026): Parquet file type support added.
- 1.0.0 (August 01, 2025): LDP node type added.