data-quality-frameworks by wshobson
Implement data quality validation with Great Expectations, dbt tests, and data contracts. Use when building data quality pipelines, implementing validation rules, or establishing data contracts.
Data & Analytics
25.1K Stars
2.8K Forks
Updated Jan 9, 2026, 03:41 PM
Why Use This
This skill provides specialized capabilities for wshobson's codebase.
Use Cases
- Developing new features in the wshobson repository
- Refactoring existing code to follow wshobson standards
- Understanding and working with wshobson's codebase structure
Skill Snapshot
Auto scan of skill assets. Informational only.
Valid SKILL.md
Checks against SKILL.md specification
Source & Community
Skill Stats
SKILL.md 588 Lines
Total Files 1
Total Size 0 B
License NOASSERTION
---
name: data-quality-frameworks
description: Implement data quality validation with Great Expectations, dbt tests, and data contracts. Use when building data quality pipelines, implementing validation rules, or establishing data contracts.
---
# Data Quality Frameworks
Production patterns for implementing data quality with Great Expectations, dbt tests, and data contracts to ensure reliable data pipelines.
## When to Use This Skill
- Implementing data quality checks in pipelines
- Setting up Great Expectations validation
- Building comprehensive dbt test suites
- Establishing data contracts between teams
- Monitoring data quality metrics
- Automating data validation in CI/CD
## Core Concepts
### 1. Data Quality Dimensions
| Dimension | Description | Example Check |
|-----------|-------------|---------------|
| **Completeness** | No missing values | `expect_column_values_to_not_be_null` |
| **Uniqueness** | No duplicates | `expect_column_values_to_be_unique` |
| **Validity** | Values in expected range | `expect_column_values_to_be_in_set` |
| **Accuracy** | Data matches reality | Cross-reference validation |
| **Consistency** | No contradictions | `expect_column_pair_values_A_to_be_greater_than_B` |
| **Timeliness** | Data is recent | `expect_column_max_to_be_between` |
### 2. Testing Pyramid for Data
```
/\
/ \ Integration Tests (cross-table)
/────\
/ \ Unit Tests (single column)
/────────\
/ \ Schema Tests (structure)
/────────────\
```
## Quick Start
### Great Expectations Setup
```bash
# Install
pip install great_expectations
# Initialize project
great_expectations init
# Create datasource
great_expectations datasource new
```
```python
# great_expectations/checkpoints/daily_validation.yml
import great_expectations as gx
# Create context
context = gx.get_context()
# Create expectation suite
suite = context.add_expectation_suite("orders_suite")
# Add expectations
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
# Validate
results = context.run_checkpoint(checkpoint_name="daily_orders")
```
## Patterns
### Pattern 1: Great Expectations Suite
```python
# expectations/orders_suite.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration
def build_orders_suite() -> ExpectationSuite:
"""Build comprehensive orders expectation suite"""
suite = ExpectationSuite(expectation_suite_name="orders_suite")
# Schema expectations
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_table_columns_to_match_set",
kwargs={
"column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
"exact_match": False # Allow additional columns
}
))
# Primary key
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "order_id"}
))
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_unique",
kwargs={"column": "order_id"}
))
# Foreign key
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "customer_id"}
))
# Categorical values
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_in_set",
kwargs={
"column": "status",
"value_set": ["pending", "processing", "shipped", "delivered", "cancelled"]
}
))
# Numeric ranges
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={
"column": "amount",
"min_value": 0,
"max_value": 100000,
"strict_min": True # amount > 0
}
))
# Date validity
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_dateutil_parseable",
kwargs={"column": "created_at"}
))
# Freshness - data should be recent
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_max_to_be_between",
kwargs={
"column": "created_at",
"min_value": {"$PARAMETER": "now - timedelta(days=1)"},
"max_value": {"$PARAMETER": "now"}
}
))
# Row count sanity
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_table_row_count_to_be_between",
kwargs={
"min_value": 1000, # Expect at least 1000 rows
"max_value": 10000000
}
))
# Statistical expectations
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_mean_to_be_between",
kwargs={
"column": "amount",
"min_value": 50,
"max_value": 500
}
))
return suite
```
### Pattern 2: Great Expectations Checkpoint
```yaml
# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"
validations:
- batch_request:
datasource_name: warehouse
data_connector_name: default_inferred_data_connector_name
data_asset_name: orders
data_connector_query:
index: -1 # Latest batch
expectation_suite_name: orders_suite
action_list:
- name: store_validation_result
action:
class_name: StoreValidationResultAction
- name: store_evaluation_parameters
action:
class_name: StoreEvaluationParametersAction
- name: update_data_docs
action:
class_name: UpdateDataDocsAction
# Slack notification on failure
- name: send_slack_notification
action:
class_name: SlackNotificationAction
slack_webhook: ${SLACK_WEBHOOK}
notify_on: failure
renderer:
module_name: great_expectations.render.renderer.slack_renderer
class_name: SlackRenderer
```
```python
# Run checkpoint
import great_expectations as gx
context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")
if not result.success:
failed_expectations = [
r for r in result.run_results.values()
if not r.success
]
raise ValueError(f"Data quality check failed: {failed_expectations}")
```
### Pattern 3: dbt Data Tests
```yaml
# models/marts/core/_core__models.yml
version: 2
models:
- name: fct_orders
description: Order fact table
tests:
# Table-level tests
- dbt_utils.recency:
datepart: day
field: created_at
interval: 1
- dbt_utils.at_least_one
- dbt_utils.expression_is_true:
expression: "total_amount >= 0"
columns:
- name: order_id
description: Primary key
tests:
- unique
- not_null
- name: customer_id
description: Foreign key to dim_customers
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: order_status
tests:
- accepted_values:
values: ['pending', 'processing', 'shipped', 'delivered', 'cancelled']
- name: total_amount
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: created_at
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "<= current_timestamp"
- name: dim_customers
columns:
- name: customer_id
tests:
- unique
- not_null
- name: email
tests:
- unique
- not_null
# Custom regex test
- dbt_utils.expression_is_true:
expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
```
### Pattern 4: Custom dbt Tests
```sql
-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}
with row_count as (
select count(*) as cnt from {{ model }}
)
select cnt
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}
{% endtest %}
-- Usage in schema.yml:
-- tests:
-- - row_count_in_range:
-- min_count: 1000
-- max_count: 10000000
```
```sql
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}
with lagged as (
select
{{ column_name }},
lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
from {{ model }}
)
select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
and prev_value is not null
{% endtest %}
```
```sql
-- tests/singular/assert_orders_customers_match.sql
-- Singular test: specific business rule
with orders_customers as (
select distinct customer_id from {{ ref('fct_orders') }}
),
dim_customers as (
select customer_id from {{ ref('dim_customers') }}
),
orphaned_orders as (
select o.customer_id
from orders_customers o
left join dim_customers c using (customer_id)
where c.customer_id is null
)
select * from orphaned_orders
-- Test passes if this returns 0 rows
```
### Pattern 5: Data Contracts
```yaml
# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract
metadata:
name: orders
version: 1.0.0
owner: data-platform-team
contact: [email protected]
info:
title: Orders Data Contract
description: Contract for order event data from the ecommerce platform
purpose: Analytics, reporting, and ML features
servers:
production:
type: snowflake
account: company.us-east-1
database: ANALYTICS
schema: CORE
terms:
usage: Internal analytics only
limitations: PII must not be exposed in downstream marts
billing: Charged per query TB scanned
schema:
type: object
properties:
order_id:
type: string
format: uuid
description: Unique order identifier
required: true
unique: true
pii: false
customer_id:
type: string
format: uuid
description: Customer identifier
required: true
pii: true
piiClassification: indirect
total_amount:
type: number
minimum: 0
maximum: 100000
description: Order total in USD
created_at:
type: string
format: date-time
description: Order creation timestamp
required: true
status:
type: string
enum: [pending, processing, shipped, delivered, cancelled]
description: Current order status
quality:
type: SodaCL
specification:
checks for orders:
- row_count > 0
- missing_count(order_id) = 0
- duplicate_count(order_id) = 0
- invalid_count(status) = 0:
valid values: [pending, processing, shipped, delivered, cancelled]
- freshness(created_at) < 24h
sla:
availability: 99.9%
freshness: 1 hour
latency: 5 minutes
```
### Pattern 6: Automated Quality Pipeline
```python
# quality_pipeline.py
from dataclasses import dataclass
from typing import List, Dict, Any
import great_expectations as gx
from datetime import datetime
@dataclass
class QualityResult:
table: str
passed: bool
total_expectations: int
failed_expectations: int
details: List[Dict[str, Any]]
timestamp: datetime
class DataQualityPipeline:
"""Orchestrate data quality checks across tables"""
def __init__(self, context: gx.DataContext):
self.context = context
self.results: List[QualityResult] = []
def validate_table(self, table: str, suite: str) -> QualityResult:
"""Validate a single table against expectation suite"""
checkpoint_config = {
"name": f"{table}_validation",
"config_version": 1.0,
"class_name": "Checkpoint",
"validations": [{
"batch_request": {
"datasource_name": "warehouse",
"data_asset_name": table,
},
"expectation_suite_name": suite,
}],
}
result = self.context.run_checkpoint(**checkpoint_config)
# Parse results
validation_result = list(result.run_results.values())[0]
results = validation_result.results
failed = [r for r in results if not r.success]
return QualityResult(
table=table,
passed=result.success,
total_expectations=len(results),
failed_expectations=len(failed),
details=[{
"expectation": r.expectation_config.expectation_type,
"success": r.success,
"observed_value": r.result.get("observed_value"),
} for r in results],
timestamp=datetime.now()
)
def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
"""Run validation for all tables"""
results = {}
for table, suite in tables.items():
print(f"Validating {table}...")
results[table] = self.validate_table(table, suite)
return results
def generate_report(self, results: Dict[str, QualityResult]) -> str:
"""Generate quality report"""
report = ["# Data Quality Report", f"Generated: {datetime.now()}", ""]
total_passed = sum(1 for r in results.values() if r.passed)
total_tables = len(results)
report.append(f"## Summary: {total_passed}/{total_tables} tables passed")
report.append("")
for table, result in results.items():
status = "✅" if result.passed else "❌"
report.append(f"### {status} {table}")
report.append(f"- Expectations: {result.total_expectations}")
report.append(f"- Failed: {result.failed_expectations}")
if not result.passed:
report.append("- Failed checks:")
for detail in result.details:
if not detail["success"]:
report.append(f" - {detail['expectation']}: {detail['observed_value']}")
report.append("")
return "\n".join(report)
# Usage
context = gx.get_context()
pipeline = DataQualityPipeline(context)
tables_to_validate = {
"orders": "orders_suite",
"customers": "customers_suite",
"products": "products_suite",
}
results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)
# Fail pipeline if any table failed
if not all(r.passed for r in results.values()):
print(report)
raise ValueError("Data quality checks failed!")
```
## Best Practices
### Do's
- **Test early** - Validate source data before transformations
- **Test incrementally** - Add tests as you find issues
- **Document expectations** - Clear descriptions for each test
- **Alert on failures** - Integrate with monitoring
- **Version contracts** - Track schema changes
### Don'ts
- **Don't test everything** - Focus on critical columns
- **Don't ignore warnings** - They often precede failures
- **Don't skip freshness** - Stale data is bad data
- **Don't hardcode thresholds** - Use dynamic baselines
- **Don't test in isolation** - Test relationships too
## Resources
- [Great Expectations Documentation](https://docs.greatexpectations.io/)
- [dbt Testing Documentation](https://docs.getdbt.com/docs/build/tests)
- [Data Contract Specification](https://datacontract.com/)
- [Soda Core](https://docs.soda.io/soda-core/overview.html)
Name Size