From dqx
Defines DQX data quality rules for PySpark DataFrames or Delta tables using Python classes (DQRowRule, DQDatasetRule, DQForEachColRule) or YAML/JSON metadata. Supports filters, custom checks, and criticality levels.
Slash command: /dqx:dqx-define-checks
DQX rules come in two interchangeable forms. **Pick based on where the checks will live.**
- Python classes (DQRowRule, DQDatasetRule, DQForEachColRule): use when checks are authored in code next to the pipeline. Static typing and IDE autocomplete.
- YAML/JSON metadata: use when checks live outside the code as list[dict] data and are applied through the apply_checks_by_metadata* path.

Every check has a criticality of error (failing row quarantined) or warn (failing row passes but flagged). Default is error.
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.rule import DQRowRule, DQDatasetRule, DQForEachColRule

checks = [
    # row-level: one column
    DQRowRule(
        name="col3_is_not_null",
        criticality="warn",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="col3",
    ),
    # same check across many columns
    *DQForEachColRule(
        columns=["col1", "col2"],
        criticality="error",
        check_func=check_funcs.is_not_null,
    ).get_rules(),
    # dataset-level: uniqueness across a composite key
    DQDatasetRule(
        criticality="error",
        check_func=check_funcs.is_unique,
        columns=["order_id", "line_item_id"],
    ),
]
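To make the two criticality levels concrete, here is a minimal plain-Python sketch of the routing they describe: a failing error check quarantines the row, a failing warn check only flags it. It is not DQX code. The helper `split_by_criticality`, the predicate-based check tuples, and the `_errors` / `_warnings` keys are illustrative assumptions, and rows are modeled as dicts rather than DataFrame rows.

```python
from typing import Callable

Row = dict
# (name, criticality, predicate); predicate(row) returns True when the row passes
Check = tuple[str, str, Callable[[Row], bool]]


def split_by_criticality(rows: list[Row], checks: list[Check]) -> tuple[list[Row], list[Row]]:
    """Return (passed, quarantined); every row carries its own findings."""
    passed, quarantined = [], []
    for row in rows:
        errors = [name for name, crit, ok in checks if crit == "error" and not ok(row)]
        warnings = [name for name, crit, ok in checks if crit == "warn" and not ok(row)]
        annotated = {**row, "_errors": errors, "_warnings": warnings}
        # Any failing "error" check quarantines the row; "warn" failures only flag it.
        (quarantined if errors else passed).append(annotated)
    return passed, quarantined


checks = [
    ("col1_is_not_null", "error", lambda r: r["col1"] is not None),
    ("col3_is_not_empty", "warn", lambda r: r["col3"] not in (None, "")),
]
rows = [
    {"col1": 1, "col3": "a"},
    {"col1": 2, "col3": ""},
    {"col1": None, "col3": "c"},
]
passed, quarantined = split_by_criticality(rows, checks)
```

The second row stays in `passed` with a warning attached, while the third lands in `quarantined`.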
Load into Python via yaml.safe_load(...), then pass the resulting list[dict] to any apply_checks_by_metadata* call, or save through a storage config (see dqx-storage).
- name: col3_is_not_null
  criticality: warn
  check:
    function: is_not_null_and_not_empty
    arguments:
      column: col3
- criticality: error
  check:
    function: is_not_null
    for_each_column: [col1, col2]
- criticality: error
  check:
    function: is_unique
    arguments:
      columns: [order_id, line_item_id]
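The same metadata can be written as JSON. The sketch below loads the three checks above with the standard-library `json` module, which yields the same list[dict] shape that `yaml.safe_load` would. The helper `expand_for_each_column` is hypothetical and only illustrates how a `for_each_column` entry can be read as one check per column, mirroring the `DQForEachColRule(...).get_rules()` pattern in the class form. It handles single-column checks only and makes no claim about how DQX expands entries internally.

```python
import json

checks_json = """
[
  {"name": "col3_is_not_null", "criticality": "warn",
   "check": {"function": "is_not_null_and_not_empty", "arguments": {"column": "col3"}}},
  {"criticality": "error",
   "check": {"function": "is_not_null", "for_each_column": ["col1", "col2"]}},
  {"criticality": "error",
   "check": {"function": "is_unique", "arguments": {"columns": ["order_id", "line_item_id"]}}}
]
"""

checks_metadata = json.loads(checks_json)  # list[dict]


def expand_for_each_column(entry: dict) -> list[dict]:
    """Hypothetical helper: fan one for_each_column entry out into one entry per column."""
    check = entry["check"]
    columns = check.get("for_each_column")
    if not columns:
        return [entry]
    expanded = []
    for col in columns:
        # Keep any shared arguments and set the target column for this copy.
        args = {**check.get("arguments", {}), "column": col}
        expanded.append({**entry, "check": {"function": check["function"], "arguments": args}})
    return expanded


flat = [e for entry in checks_metadata for e in expand_for_each_column(entry)]
```

Here `flat` holds four entries: the two `is_not_null` copies replace the single `for_each_column` entry, and the other two checks pass through unchanged.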
- Filter: filter="col1 < 3" (class) or filter: "col1 < 3" (YAML).
- Positional args: check_func_args=[[1, 2]]; keyword args: check_func_kwargs={"allowed": [1, 2]}.
- Nested or complex columns: pass F.try_element_at(...) or a dotted path (col7.field1) as the column value.
- User metadata: attach a user_metadata dict (e.g. {"check_type": "completeness"}) that flows into the result struct.
- Custom checks: pass a function that returns a Column as check_func. For inline SQL, use the fallback section below, only after confirming no built-in fits.
- Aggregate checks: is_aggr_not_greater_than, is_aggr_not_less_than, is_aggr_equal, is_aggr_not_equal; supply aggr_type (count, avg, stddev, percentile, count_distinct…), optional group_by, and limit.
- Uniqueness: is_unique, with columns, nulls_distinct (bool), and optional row_filter. Not an aggregate check, so it takes no aggr_type.

Full reference: https://databrickslabs.github.io/dqx/docs/reference/quality_checks.
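As a sketch of how several of these options combine in metadata form, the snippet below builds two checks as plain Python dicts and round-trips them through JSON. The check names, the columns `col1` and `line_item_id`, the limit of 10, and the `column` argument key are illustrative assumptions. `is_in_list` is assumed to be the built-in that takes the `allowed` keyword shown above, so confirm it against the reference before relying on it.

```python
import json

checks_metadata = [
    {
        "name": "col1_in_allowed_set",
        "criticality": "warn",
        "filter": "col1 < 3",  # the check applies only to rows matching this filter
        "user_metadata": {"check_type": "completeness"},
        "check": {
            "function": "is_in_list",  # assumed built-in name
            "arguments": {"column": "col1", "allowed": [1, 2]},
        },
    },
    {
        "name": "at_most_10_lines_per_order",
        "criticality": "error",
        "check": {
            "function": "is_aggr_not_greater_than",
            "arguments": {
                "column": "line_item_id",  # assumed argument key
                "aggr_type": "count",
                "group_by": ["order_id"],
                "limit": 10,
            },
        },
    },
]

# Metadata is plain data, so it survives a JSON round trip unchanged.
serialized = json.dumps(checks_metadata, indent=2)
restored = json.loads(serialized)
```

Because the rules are plain data, the same structure can be kept in a file or table and handed to an apply_checks_by_metadata* call after loading.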
Search check_funcs first — the built-ins cover null/empty, range, set membership, regex, referential, aggregate, uniqueness, schema, freshness, comparison, and outlier cases with typed error messages and tested edge handling. Drop down to SQL only when no built-in fits.
- sql_expression: row-level SQL boolean expression. Use when one row's validity depends on its own columns.
- sql_query: dataset-level SQL query against {{ input_view }}. Use for cross-row aggregates, joins to reference DataFrames, or anything needing GROUP BY. Queries are validated by is_sql_query_safe(): read-only SELECT, no DDL/DML.

# row-level: SQL expression evaluated per row
- name: amount_positive_or_refunded
  criticality: error
  check:
    function: sql_expression
    arguments:
      expression: amount > 0 OR refunded = true
      msg: amount must be positive unless refunded

# dataset-level: SQL query, joined back to rows via merge_columns
- name: order_total_matches_lines
  criticality: error
  check:
    function: sql_query
    arguments:
      query: |
        SELECT order_id,
               SUM(line_amount) <> order_total AS condition
        FROM {{ input_view }}
        GROUP BY order_id, order_total
      merge_columns: [order_id]      # row-level: joins back per order_id
      condition_column: condition    # column in query output; true = fail
      # omit merge_columns for dataset-level (one verdict applies to every row)
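The role of merge_columns in the sql_query example can be pictured in plain Python: the query produces one verdict per order, and that verdict is joined back to every row sharing the same order_id. This is a conceptual sketch with made-up sample rows and an illustrative `failed` key, not DQX's implementation.

```python
from collections import defaultdict

rows = [
    {"order_id": 1, "line_amount": 40.0, "order_total": 100.0},
    {"order_id": 1, "line_amount": 60.0, "order_total": 100.0},
    {"order_id": 2, "line_amount": 25.0, "order_total": 30.0},
]

# Step 1, the "query": one verdict per (order_id, order_total) group.
sums = defaultdict(float)
for r in rows:
    sums[(r["order_id"], r["order_total"])] += r["line_amount"]
query_result = [
    {"order_id": order_id, "condition": total != order_total}  # True = fail
    for (order_id, order_total), total in sums.items()
]

# Step 2, the "merge": join each verdict back to its rows on the merge column.
verdict_by_order = {q["order_id"]: q["condition"] for q in query_result}
flagged = [{**r, "failed": verdict_by_order[r["order_id"]]} for r in rows]
```

Both rows of order 1 pass because their lines sum to the order total, while the single row of order 2 is flagged.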
For the equivalent class form, use DQRowRule(check_func=check_funcs.sql_expression, check_func_kwargs={"expression": "..."}) or DQDatasetRule(check_func=check_funcs.sql_query, check_func_kwargs={...}).
from databricks.labs.dqx.checks_serializer import serialize_checks, deserialize_checks
checks_metadata = serialize_checks(checks) # classes → list[dict]
checks_classes = deserialize_checks(checks_metadata) # list[dict] → classes
Catch syntax errors without running the pipeline:
from databricks.labs.dqx.engine import DQEngine
status = DQEngine.validate_checks(checks) # raises / returns ValidationStatus
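For intuition about what a structural pre-check can catch before any data is touched, here is a small standard-library lint over metadata entries. `lint_checks` is a hypothetical helper that covers only two conditions (criticality value and presence of check.function). It does not reproduce what DQEngine.validate_checks verifies and is not a substitute for it.

```python
ALLOWED_CRITICALITY = {"error", "warn"}


def lint_checks(checks: list[dict]) -> list[str]:
    """Return human-readable problems; an empty list means the structure looks sane."""
    problems = []
    for i, entry in enumerate(checks):
        label = entry.get("name", f"check #{i}")
        crit = entry.get("criticality", "error")  # criticality defaults to error
        if crit not in ALLOWED_CRITICALITY:
            problems.append(f"{label}: unknown criticality {crit!r}")
        check = entry.get("check")
        if not isinstance(check, dict) or "function" not in check:
            problems.append(f"{label}: missing check.function")
    return problems


good = [{"criticality": "warn",
         "check": {"function": "is_not_null", "arguments": {"column": "col1"}}}]
bad = [{"name": "typo_rule", "criticality": "warning", "check": {"arguments": {}}}]

good_problems = lint_checks(good)
bad_problems = lint_checks(bad)
```

A lint like this fits in a unit test or pre-commit hook, with the DQX validation call still run as the authoritative check.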
- Give every check a name: it ends up in result columns and dashboards.
- For larger rule sets, prefer stored metadata (see dqx-storage): classes are fine for a handful, metadata scales.
- Avoid check_funcs.sql_expression / sql_query when a built-in covers the case: they bypass typed error messages and security guards. Search check_funcs first.
- Custom check_func: it must return a Column expression only.

Canonical docs: https://databrickslabs.github.io/dqx/docs/guide/quality_checks_definition.
npx claudepluginhub databrickslabs/dqx --plugin dqx