Implement data quality validation with Great Expectations, dbt tests, and data contracts. Use when building data quality pipelines, implementing validation rules, or establishing data contracts.
Inherits all available tools
Additional assets for this skill
This skill inherits all available tools. When active, it can use any tool Claude has access to.
Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.
| Dimension | Description | Example Check |
|---|---|---|
| Completeness | No missing values | expect_column_values_to_not_be_null |
| Uniqueness | No duplicates | expect_column_values_to_be_unique |
| Validity | Values in expected range | expect_column_values_to_be_in_set |
| Accuracy | Data matches reality | Cross-reference validation |
| Consistency | No contradictions | expect_column_pair_values_A_to_be_greater_than_B |
| Timeliness | Data is recent | expect_column_max_to_be_between |
            /\
           /  \        Integration Tests (cross-table)
          /────\
         /      \      Unit Tests (single column)
        /────────\
       /          \    Schema Tests (structure)
      /────────────\
# Install the Great Expectations package into the active Python environment
pip install great_expectations
# Initialize project (creates the great_expectations/ directory that the
# checkpoint paths used elsewhere in this guide live under)
# NOTE(review): the `great_expectations` CLI belongs to the legacy 0.x releases — confirm the installed version provides it
great_expectations init
# Create datasource (interactive; registers the data source that checkpoints validate against)
great_expectations datasource new
# great_expectations/checkpoints/daily_validation.yml
import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration

# Create context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("orders_suite")

# Add expectations.
# Uses ExpectationConfiguration so the quick start matches the legacy (0.x)
# API that the rest of this guide (run_checkpoint, YAML checkpoints) relies on.
suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"},
    )
)
suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"},
    )
)

# Persist the suite: add_expectation() only mutates the in-memory object, and
# the checkpoint loads the suite from the expectation store by name.
context.add_or_update_expectation_suite(expectation_suite=suite)

# Validate (the "daily_orders" checkpoint must already be configured)
results = context.run_checkpoint(checkpoint_name="daily_orders")
# expectations/orders_suite.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration
def build_orders_suite() -> ExpectationSuite:
    """Build the comprehensive ``orders_suite`` expectation suite.

    The suite covers schema, primary/foreign keys, categorical and numeric
    validity, date parseability, freshness, row-count sanity and a simple
    statistical check on ``amount``.

    Returns:
        An ``ExpectationSuite`` named ``"orders_suite"`` with all expectations
        added in the order listed below.
    """
    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # (expectation_type, kwargs) pairs, added to the suite in this order.
    expectations = [
        # Schema expectations
        (
            "expect_table_columns_to_match_set",
            {
                "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
                "exact_match": False,  # Allow additional columns
            },
        ),
        # Primary key
        ("expect_column_values_to_not_be_null", {"column": "order_id"}),
        ("expect_column_values_to_be_unique", {"column": "order_id"}),
        # Foreign key
        ("expect_column_values_to_not_be_null", {"column": "customer_id"}),
        # Categorical values
        (
            "expect_column_values_to_be_in_set",
            {
                "column": "status",
                "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"],
            },
        ),
        # Numeric ranges
        (
            "expect_column_values_to_be_between",
            {
                "column": "amount",
                "min_value": 0,
                "max_value": 100000,
                "strict_min": True,  # amount > 0
            },
        ),
        # Date validity
        ("expect_column_values_to_be_dateutil_parseable", {"column": "created_at"}),
        # Freshness - data should be recent.
        # `now()` must be written as a function call: a bare `now` is not a
        # valid term in the evaluation-parameter expression grammar.
        (
            "expect_column_max_to_be_between",
            {
                "column": "created_at",
                "min_value": {"$PARAMETER": "now() - timedelta(days=1)"},
                "max_value": {"$PARAMETER": "now()"},
            },
        ),
        # Row count sanity
        (
            "expect_table_row_count_to_be_between",
            {
                "min_value": 1000,  # Expect at least 1000 rows
                "max_value": 10000000,
            },
        ),
        # Statistical expectations
        (
            "expect_column_mean_to_be_between",
            {"column": "amount", "min_value": 50, "max_value": 500},
        ),
    ]

    for expectation_type, kwargs in expectations:
        suite.add_expectation(
            ExpectationConfiguration(expectation_type=expectation_type, kwargs=kwargs)
        )

    return suite
# great_expectations/checkpoints/orders_checkpoint.yml
# Checkpoint that validates the `orders` asset against `orders_suite`
# and then runs the actions listed under `action_list`.
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
# strftime-style template used to name each run
run_name_template: "%Y%m%d-%H%M%S-orders-validation"
validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1  # Latest batch
    expectation_suite_name: orders_suite
# Actions executed for each validation, in the order listed
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
  # Slack notification on failure
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      # NOTE(review): presumably substituted from an environment/config variable — confirm it is set where this runs
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer
# Run checkpoint
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    # Each run_results value is a dict; the suite-level outcome is stored under
    # "validation_result" (accessing `.success` on the dict itself would raise
    # AttributeError). Collect the types of the individual failed expectations
    # so the error message is readable.
    failed_expectations = [
        outcome.expectation_config.expectation_type
        for run_result in result.run_results.values()
        for outcome in run_result["validation_result"].results
        if not outcome.success
    ]
    raise ValueError(f"Data quality check failed: {failed_expectations}")
# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: Order fact table
    tests:
      # Table-level tests
      # Newest created_at must be within the last day
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
          # at_least_one is a column-level test (it needs a column_name), so it
          # lives here rather than under the model-level tests; on a not-null
          # key it asserts that the table is not empty.
          - dbt_utils.at_least_one
      - name: customer_id
        description: Foreign key to dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled']
      - name: total_amount
        tests:
          - not_null
          # Column-level form: the expression is appended to the column name
          - dbt_utils.expression_is_true:
              expression: ">= 0"
      - name: created_at
        tests:
          - not_null
          # No timestamps in the future
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: email
        tests:
          - unique
          - not_null
          # Custom regex test
          # NOTE(review): `~` is the PostgreSQL regex-match operator — adapt for other warehouses
          - dbt_utils.expression_is_true:
              expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}
{# Returns the row count (one failing row) when it falls outside
   [min_count, max_count]; returns no rows when the count is in range. #}
select cnt
from (
    select count(*) as cnt
    from {{ model }}
) counted
where not (cnt >= {{ min_count }} and cnt <= {{ max_count }})
{% endtest %}
-- Usage in schema.yml:
-- tests:
-- - row_count_in_range:
-- min_count: 1000
-- max_count: 10000000
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}
{# Returns every row whose value differs from the previous value (ordered by
   the column itself) by anything other than `interval`. The first row has no
   predecessor and is never reported. #}
with with_previous as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)
select *
from with_previous
where prev_value is not null
  and {{ column_name }} - prev_value <> {{ interval }}
{% endtest %}
-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule
-- Every distinct customer_id referenced by fct_orders must exist in
-- dim_customers; each customer_id without a match is returned as a failure.
with order_customers as (
    select distinct customer_id
    from {{ ref('fct_orders') }}
)
select oc.customer_id
from order_customers oc
where not exists (
    select 1
    from {{ ref('dim_customers') }} dc
    where dc.customer_id = oc.customer_id
)
-- Test passes if this returns 0 rows
# contracts/orders_contract.yaml
# Data contract for the `orders` dataset: ownership, server location,
# usage terms, field-level schema, quality checks and SLA.
apiVersion: datacontract.com/v1.0.0
kind: DataContract

metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-team@company.com

info:
  title: Orders Data Contract
  description: Contract for order event data from the ecommerce platform
  purpose: Analytics, reporting, and ML features

# Where the data is served from
servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE

terms:
  usage: Internal analytics only
  limitations: PII must not be exposed in downstream marts
  billing: Charged per query TB scanned

# Field-level schema; `pii` flags mark fields that fall under the PII limitation above
schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Unique order identifier
      required: true
      unique: true
      pii: false
    customer_id:
      type: string
      format: uuid
      description: Customer identifier
      required: true
      pii: true
      piiClassification: indirect
    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Order total in USD
    created_at:
      type: string
      format: date-time
      description: Order creation timestamp
      required: true
    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Current order status

# Executable quality checks, written in SodaCL
quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h

# NOTE(review): the SLA promises 1-hour freshness while the check above only
# enforces < 24h — confirm which threshold is intended
sla:
  availability: 99.9%
  freshness: 1 hour
  latency: 5 minutes
# quality_pipeline.py
from dataclasses import dataclass
from typing import List, Dict, Any
import great_expectations as gx
from datetime import datetime
@dataclass
class QualityResult:
    """Outcome of validating one table against one expectation suite."""

    # Name of the validated table.
    table: str
    # Overall success flag for the validation run.
    passed: bool
    # Number of expectations evaluated.
    total_expectations: int
    # Number of expectations that did not succeed.
    failed_expectations: int
    # One dict per evaluated expectation.
    details: List[Dict[str, Any]]
    # When this result object was created.
    timestamp: datetime
class DataQualityPipeline:
    """Orchestrate data quality checks across tables.

    Wraps a Great Expectations data context: each table is validated through
    a checkpoint named ``"<table>_validation"`` and summarised as a
    ``QualityResult``.
    """

    def __init__(self, context: gx.DataContext):
        """Store the data context used to register and run checkpoints."""
        self.context = context
        # Every result produced by run_all(), in execution order.
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Validate a single table against an expectation suite.

        Args:
            table: Data asset name in the ``warehouse`` datasource.
            suite: Name of the expectation suite to validate against.

        Returns:
            A ``QualityResult`` with one ``details`` entry per expectation.
        """
        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }
        # run_checkpoint() looks a checkpoint up by name and does not accept a
        # full config, so register (or refresh) the checkpoint, then run it.
        checkpoint = self.context.add_or_update_checkpoint(**checkpoint_config)
        checkpoint_result = checkpoint.run()

        # Each run_results value is a dict whose "validation_result" entry
        # holds the per-expectation outcomes; flatten across all validations.
        outcomes = [
            outcome
            for run_result in checkpoint_result.run_results.values()
            for outcome in run_result["validation_result"].results
        ]
        failed = [outcome for outcome in outcomes if not outcome.success]

        return QualityResult(
            table=table,
            passed=checkpoint_result.success,
            total_expectations=len(outcomes),
            failed_expectations=len(failed),
            details=[
                {
                    "expectation": outcome.expectation_config.expectation_type,
                    "success": outcome.success,
                    "observed_value": outcome.result.get("observed_value"),
                }
                for outcome in outcomes
            ],
            timestamp=datetime.now(),
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Run validation for all tables.

        Args:
            tables: Mapping of table name to expectation suite name.

        Returns:
            Mapping of table name to its ``QualityResult``. Each result is
            also appended to ``self.results``.
        """
        results = {}
        for table, suite in tables.items():
            print(f"Validating {table}...")
            outcome = self.validate_table(table, suite)
            results[table] = outcome
            self.results.append(outcome)
        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Generate a Markdown quality report.

        Args:
            results: Mapping of table name to ``QualityResult``, as returned
                by ``run_all``.

        Returns:
            The report as a single newline-joined string.
        """
        report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)
        report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
        report.append("")

        for table, result in results.items():
            status = "✅" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Expectations: {result.total_expectations}")
            report.append(f"- Failed: {result.failed_expectations}")

            if not result.passed:
                report.append("- Failed checks:")
                report.extend(
                    f" - {detail['expectation']}: {detail['observed_value']}"
                    for detail in result.details
                    if not detail["success"]
                )

            report.append("")

        return "\n".join(report)
# Usage: validate every table, then fail the run if any table did not pass.
context = gx.get_context()
pipeline = DataQualityPipeline(context)

# Table name -> expectation suite used to validate it
tables_to_validate = dict(
    orders="orders_suite",
    customers="customers_suite",
    products="products_suite",
)

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Fail pipeline if any table failed (the report is printed only in that case)
any_failed = any(not outcome.passed for outcome in results.values())
if any_failed:
    print(report)
    raise ValueError("Data quality checks failed!")