Data Pipelines with AI Enrichment
Build intelligent ETL pipelines that enrich, transform, and analyze data using SteadyText's AI capabilities directly in PostgreSQL.
Overview
This tutorial shows how to create data pipelines that:

- Enrich raw data with AI-generated insights
- Transform unstructured data into structured formats
- Monitor data quality with AI validation
- Generate automated reports and summaries
- Create real-time data enrichment streams
Prerequisites
# Start PostgreSQL with SteadyText
docker run -d -p 5432:5432 --name steadytext-etl julep/pg-steadytext
# Connect and setup
psql -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS pg_steadytext CASCADE;"
psql -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS pg_cron;" # For scheduling
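Before going further, a quick smoke test confirms the extension responds (the exact completion text depends on the model version):
# Sanity check: should return a short generated completion
psql -h localhost -U postgres -c "SELECT steadytext_generate('Say hello', max_tokens := 16);"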
Pipeline Architecture
Create a flexible pipeline schema:
-- Pipeline definitions
CREATE TABLE data_pipelines (
id SERIAL PRIMARY KEY,
pipeline_name VARCHAR(100) UNIQUE NOT NULL,
description TEXT,
source_table VARCHAR(100),
target_table VARCHAR(100),
transform_function VARCHAR(100),
schedule_cron VARCHAR(50),
is_active BOOLEAN DEFAULT TRUE,
last_run TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Pipeline execution log
CREATE TABLE pipeline_runs (
id SERIAL PRIMARY KEY,
pipeline_id INTEGER REFERENCES data_pipelines(id),
start_time TIMESTAMPTZ NOT NULL,
end_time TIMESTAMPTZ,
status VARCHAR(20), -- 'running', 'completed', 'failed'
records_processed INTEGER,
records_enriched INTEGER,
error_message TEXT,
execution_stats JSONB
);
-- Raw data staging table
CREATE TABLE raw_data_staging (
id SERIAL PRIMARY KEY,
source_system VARCHAR(50),
raw_content TEXT,
metadata JSONB,
processed BOOLEAN DEFAULT FALSE,
ingested_at TIMESTAMPTZ DEFAULT NOW()
);
-- Enriched data warehouse
CREATE TABLE enriched_data (
id SERIAL PRIMARY KEY,
source_id INTEGER,
source_system VARCHAR(50),
original_content TEXT,
ai_summary TEXT,
extracted_entities JSONB,
sentiment_analysis JSONB,
categories TEXT[],
quality_score DECIMAL(3, 2),
enriched_at TIMESTAMPTZ DEFAULT NOW()
);
-- Data quality monitoring
CREATE TABLE data_quality_issues (
id SERIAL PRIMARY KEY,
pipeline_id INTEGER REFERENCES data_pipelines(id),
record_id INTEGER,
issue_type VARCHAR(50),
severity VARCHAR(20),
description TEXT,
ai_recommendation TEXT,
detected_at TIMESTAMPTZ DEFAULT NOW()
);
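The batch and streaming paths below repeatedly scan the staging table for unprocessed rows, so it pays to add a few indexes up front. These are suggested starting points, not requirements:
-- Partial index for the "WHERE NOT processed" backlog scans
CREATE INDEX idx_staging_unprocessed ON raw_data_staging (ingested_at)
WHERE NOT processed;
-- Common lookup paths used by monitoring and reporting
CREATE INDEX idx_enriched_source_id ON enriched_data (source_id);
CREATE INDEX idx_enriched_at ON enriched_data (enriched_at);
CREATE INDEX idx_runs_pipeline ON pipeline_runs (pipeline_id, start_time);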
Real-Time Data Enrichment Pipeline
Create a pipeline that enriches incoming data in real-time:
-- Main enrichment function
CREATE OR REPLACE FUNCTION enrich_raw_data()
RETURNS TRIGGER AS $$
DECLARE
v_summary TEXT;
v_entities JSONB;
v_sentiment JSONB;
v_categories TEXT[];
v_quality_score DECIMAL(3, 2);
BEGIN
-- Skip rows that were already enriched. Checking OLD on UPDATE means
-- that flipping processed to TRUE (as the batch pipeline does) still
-- runs enrichment exactly once.
IF (TG_OP = 'UPDATE' AND OLD.processed)
OR (TG_OP = 'INSERT' AND NEW.processed) THEN
RETURN NEW;
END IF;
-- Generate AI summary
v_summary := steadytext_generate(
format('Summarize this data in 2 sentences: %s',
LEFT(NEW.raw_content, 1000)),
max_tokens := 100
);
-- Extract entities (the schema must be a complete JSON Schema object,
-- not just a bare properties map)
v_entities := steadytext_generate_json(
format('Extract entities from: %s', LEFT(NEW.raw_content, 500)),
'{
"type": "object",
"properties": {
"people": {"type": "array", "items": {"type": "string"}},
"organizations": {"type": "array", "items": {"type": "string"}},
"locations": {"type": "array", "items": {"type": "string"}},
"dates": {"type": "array", "items": {"type": "string"}},
"amounts": {"type": "array", "items": {"type": "string"}}
}
}'::json
)::jsonb;
-- Sentiment analysis
v_sentiment := jsonb_build_object(
'overall', steadytext_generate_choice(
'Overall sentiment: ' || LEFT(NEW.raw_content, 500),
ARRAY['positive', 'neutral', 'negative']
),
'confidence', 0.85 + random() * 0.15 -- Placeholder only: random() is nondeterministic; substitute a real signal
);
-- Categorization (regexp split tolerates variable whitespace around commas)
v_categories := regexp_split_to_array(
btrim(steadytext_generate(
format('List up to 3 categories for this content (comma-separated): %s',
LEFT(NEW.raw_content, 500)),
max_tokens := 30
)),
'\s*,\s*'
);
-- Calculate quality score (heuristic; the random() jitter stands in
-- for a real scoring model)
v_quality_score := CASE
WHEN length(NEW.raw_content) < 50 THEN 0.3
WHEN v_entities IS NULL OR jsonb_typeof(v_entities) != 'object' THEN 0.5
ELSE 0.7 + random() * 0.3
END;
-- Insert enriched data
INSERT INTO enriched_data (
source_id, source_system, original_content,
ai_summary, extracted_entities, sentiment_analysis,
categories, quality_score
) VALUES (
NEW.id, NEW.source_system, NEW.raw_content,
v_summary, v_entities, v_sentiment,
v_categories, v_quality_score
);
-- Mark as processed
NEW.processed := TRUE;
-- Check for quality issues
IF v_quality_score < 0.5 THEN
INSERT INTO data_quality_issues (
record_id, issue_type, severity, description, ai_recommendation
) VALUES (
NEW.id,
'low_quality_content',
'medium',
format('Quality score %s is below threshold', v_quality_score),
steadytext_generate(
'Suggest how to improve data quality for: ' || LEFT(NEW.raw_content, 200),
max_tokens := 100
)
);
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create trigger for real-time enrichment
CREATE TRIGGER enrich_on_insert
BEFORE INSERT OR UPDATE ON raw_data_staging
FOR EACH ROW
EXECUTE FUNCTION enrich_raw_data();
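A single insert is enough to verify the trigger fires. Generation runs synchronously inside the INSERT, so expect extra latency on the first call while models load:
-- Insert one row; the trigger enriches it inline
INSERT INTO raw_data_staging (source_system, raw_content)
VALUES ('test', 'Acme Corp signed a $50,000 contract with Jane Doe in Berlin on 2024-03-01.');

SELECT ai_summary, extracted_entities, sentiment_analysis->>'overall' AS sentiment
FROM enriched_data
ORDER BY enriched_at DESC
LIMIT 1;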
Batch Processing Pipeline
Create a batch pipeline for large-scale data processing:
-- Batch enrichment function
CREATE OR REPLACE FUNCTION batch_enrich_pipeline(
p_pipeline_name VARCHAR,
p_batch_size INTEGER DEFAULT 100
)
RETURNS TABLE (
processed_count INTEGER,
enriched_count INTEGER,
error_count INTEGER,
execution_time INTERVAL
) AS $$
DECLARE
v_pipeline data_pipelines%ROWTYPE;
v_run_id INTEGER;
v_start_time TIMESTAMPTZ;
v_processed INTEGER := 0;
v_enriched INTEGER := 0;
v_errors INTEGER := 0;
r RECORD; -- loop variable; query FOR loops require an explicit declaration
BEGIN
v_start_time := NOW();
-- Get pipeline configuration
SELECT * INTO v_pipeline FROM data_pipelines
WHERE pipeline_name = p_pipeline_name AND is_active;
IF NOT FOUND THEN
RAISE EXCEPTION 'Pipeline % not found or inactive', p_pipeline_name;
END IF;
-- Create pipeline run record
INSERT INTO pipeline_runs (pipeline_id, start_time, status)
VALUES (v_pipeline.id, v_start_time, 'running')
RETURNING id INTO v_run_id;
-- Process in batches
FOR r IN
SELECT * FROM raw_data_staging
WHERE NOT processed
ORDER BY ingested_at
LIMIT p_batch_size
LOOP
BEGIN
-- Marking the row processed fires the BEFORE UPDATE trigger,
-- which performs the actual enrichment
UPDATE raw_data_staging SET processed = TRUE WHERE id = r.id;
v_processed := v_processed + 1;
v_enriched := v_enriched + 1;
EXCEPTION WHEN OTHERS THEN
v_errors := v_errors + 1;
INSERT INTO data_quality_issues (
pipeline_id, record_id, issue_type, severity, description
) VALUES (
v_pipeline.id, r.id, 'processing_error', 'high', SQLERRM
);
END;
END LOOP;
-- Update pipeline run status
UPDATE pipeline_runs
SET end_time = NOW(),
status = 'completed',
records_processed = v_processed,
records_enriched = v_enriched,
execution_stats = jsonb_build_object(
'errors', v_errors,
'avg_processing_time_ms',
-- EPOCH yields total elapsed seconds; MILLISECONDS would only
-- cover the seconds field of the interval
EXTRACT(EPOCH FROM (NOW() - v_start_time)) * 1000 / NULLIF(v_processed, 0)
)
WHERE id = v_run_id;
-- Update pipeline last run
UPDATE data_pipelines
SET last_run = NOW()
WHERE id = v_pipeline.id;
RETURN QUERY SELECT
v_processed,
v_enriched,
v_errors,
NOW() - v_start_time;
END;
$$ LANGUAGE plpgsql;
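Note that with the real-time trigger active, rows are enriched at insert time and the batch function finds an empty backlog. One pattern for bulk loads is to disable the trigger during ingestion and let the batch function sweep up afterwards; the UPDATE it issues re-fires the trigger per row. A sketch (the CSV path is a placeholder; server-side COPY needs file access, so use \copy from psql otherwise):
-- Bulk-load pattern: defer enrichment to the batch pipeline
ALTER TABLE raw_data_staging DISABLE TRIGGER enrich_on_insert;
COPY raw_data_staging (source_system, raw_content) FROM '/path/to/bulk_load.csv' CSV;
ALTER TABLE raw_data_staging ENABLE TRIGGER enrich_on_insert;

-- Sweep the backlog in batches
SELECT * FROM batch_enrich_pipeline('hourly_enrichment', 500);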
Data Quality Monitoring
Implement AI-powered data quality checks:
-- Data quality monitoring function
CREATE OR REPLACE FUNCTION monitor_data_quality(
p_hours_back INTEGER DEFAULT 24
)
RETURNS TABLE (
quality_metric VARCHAR,
score DECIMAL,
issues_found INTEGER,
ai_insights TEXT
) AS $$
BEGIN
RETURN QUERY
WITH quality_metrics AS (
SELECT
'completeness' AS metric,
AVG(CASE
WHEN ai_summary IS NOT NULL
AND extracted_entities IS NOT NULL
AND categories IS NOT NULL
THEN 1.0 ELSE 0.0
END) AS score,
COUNT(*) FILTER (WHERE quality_score < 0.5) AS issues
FROM enriched_data
WHERE enriched_at > NOW() - (p_hours_back || ' hours')::INTERVAL
UNION ALL
SELECT
'accuracy' AS metric,
AVG(quality_score) AS score,
COUNT(DISTINCT dqi.id) AS issues
FROM enriched_data ed
LEFT JOIN data_quality_issues dqi ON dqi.record_id = ed.source_id
WHERE ed.enriched_at > NOW() - (p_hours_back || ' hours')::INTERVAL
UNION ALL
SELECT
'timeliness' AS metric,
CASE
WHEN AVG(EXTRACT(EPOCH FROM (ed.enriched_at - rs.ingested_at))) < 300
THEN 1.0
ELSE 0.5
END AS score,
COUNT(*) FILTER (
WHERE EXTRACT(EPOCH FROM (ed.enriched_at - rs.ingested_at)) > 600
) AS issues
FROM enriched_data ed
JOIN raw_data_staging rs ON ed.source_id = rs.id
WHERE ed.enriched_at > NOW() - (p_hours_back || ' hours')::INTERVAL
)
SELECT
metric::VARCHAR,
ROUND(score, 2),
issues::INTEGER, -- COUNT() returns bigint; cast to match the declared return type
steadytext_generate(
format('Analyze data quality: %s score is %s with %s issues',
metric, ROUND(score, 2), issues),
max_tokens := 100
) AS ai_insights
FROM quality_metrics;
END;
$$ LANGUAGE plpgsql;
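One way to act on these metrics is a thin alerting wrapper that raises a notification whenever a metric dips below a threshold. A minimal sketch (the 0.8 threshold and channel name are arbitrary choices):
CREATE OR REPLACE FUNCTION alert_on_quality_drop()
RETURNS VOID AS $$
DECLARE
v_row RECORD;
BEGIN
FOR v_row IN SELECT * FROM monitor_data_quality(24) WHERE score < 0.8
LOOP
PERFORM pg_notify('quality_alerts',
json_build_object(
'metric', v_row.quality_metric,
'score', v_row.score,
'issues', v_row.issues_found
)::text
);
END LOOP;
END;
$$ LANGUAGE plpgsql;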
Automated Report Generation
Generate intelligent reports from pipeline data:
-- Automated report generation
CREATE OR REPLACE FUNCTION generate_pipeline_report(
p_pipeline_id INTEGER,
p_period INTERVAL DEFAULT INTERVAL '1 day'
)
RETURNS TABLE (
report_section VARCHAR,
content TEXT,
metrics JSONB
) AS $$
DECLARE
v_pipeline data_pipelines%ROWTYPE;
BEGIN
SELECT * INTO v_pipeline FROM data_pipelines WHERE id = p_pipeline_id;
RETURN QUERY
-- A WITH clause cannot follow UNION ALL, so all three CTEs are
-- declared up front and each report section selects from them
WITH pipeline_stats AS (
SELECT
COUNT(*) AS total_runs,
SUM(records_processed) AS total_processed,
SUM(records_enriched) AS total_enriched,
AVG(EXTRACT(EPOCH FROM (end_time - start_time))) AS avg_duration_seconds,
COUNT(*) FILTER (WHERE status = 'failed') AS failed_runs
FROM pipeline_runs
WHERE pipeline_id = p_pipeline_id
AND start_time > NOW() - p_period
),
quality_analysis AS (
SELECT
AVG(quality_score) AS avg_quality,
COUNT(*) FILTER (WHERE quality_score < 0.5) AS low_quality_count
FROM enriched_data
WHERE enriched_at > NOW() - p_period
),
category_list AS (
-- unnest() cannot appear inside array_agg(); flatten one level down
SELECT array_agg(DISTINCT c) AS all_categories
FROM enriched_data, unnest(categories) AS c
WHERE enriched_at > NOW() - p_period
)
-- Executive Summary
SELECT
'executive_summary'::VARCHAR AS report_section,
steadytext_generate(
format('Pipeline %s processed %s records in %s runs over the past %s. Average duration: %s seconds. Failed runs: %s',
v_pipeline.pipeline_name,
total_processed,
total_runs,
p_period,
ROUND(avg_duration_seconds::numeric, 2),
failed_runs
),
max_tokens := 200
) AS content,
to_jsonb(pipeline_stats.*) AS metrics
FROM pipeline_stats
UNION ALL
-- Data Quality Analysis
SELECT
'quality_analysis',
steadytext_generate(
format('Data quality analysis: Average score %s. Low quality records: %s. Categories covered: %s',
ROUND(qa.avg_quality, 2),
qa.low_quality_count,
array_to_string(cl.all_categories[1:5], ', ')
),
max_tokens := 150
),
jsonb_build_object(
'avg_quality_score', qa.avg_quality,
'low_quality_count', qa.low_quality_count,
'category_count', array_length(cl.all_categories, 1)
)
FROM quality_analysis qa CROSS JOIN category_list cl
UNION ALL
-- Trend Analysis
SELECT
'trend_analysis',
steadytext_generate(
format('Analyze trends for pipeline %s based on: %s',
v_pipeline.pipeline_name,
jsonb_pretty(
jsonb_build_object(
'processing_volume',
(SELECT array_agg(records_processed ORDER BY start_time)
FROM pipeline_runs
WHERE pipeline_id = p_pipeline_id
AND start_time > NOW() - p_period)
)
)
),
max_tokens := 200
),
NULL::jsonb;
END;
$$ LANGUAGE plpgsql;
Stream Processing Integration
Handle real-time data streams:
-- Streaming data handler
CREATE OR REPLACE FUNCTION process_data_stream(
p_stream_data JSONB
)
RETURNS VOID AS $$
DECLARE
v_record JSONB;
v_source_system VARCHAR;
BEGIN
-- Extract source system
v_source_system := p_stream_data->>'source_system';
-- Process each record in the stream
FOR v_record IN SELECT * FROM jsonb_array_elements(p_stream_data->'records')
LOOP
INSERT INTO raw_data_staging (
source_system,
raw_content,
metadata
) VALUES (
v_source_system,
v_record->>'content',
v_record->'metadata'
);
END LOOP;
-- Trigger batch processing if a backlog accumulates (e.g., when the
-- real-time trigger is disabled during bulk loads)
IF (SELECT COUNT(*) FROM raw_data_staging WHERE NOT processed) > 1000 THEN
PERFORM batch_enrich_pipeline('hourly_enrichment', 1000);
END IF;
END;
$$ LANGUAGE plpgsql;
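For example, a two-record payload from a hypothetical CRM feed:
SELECT process_data_stream('{
"source_system": "crm",
"records": [
{"content": "Customer requested a plan upgrade.", "metadata": {"customer_id": "C456"}},
{"content": "Invoice dispute resolved by support.", "metadata": {"ticket_id": "T101"}}
]
}'::jsonb);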
-- API endpoint for streaming
CREATE OR REPLACE FUNCTION api_ingest_stream(
p_api_key VARCHAR,
p_data JSONB
)
RETURNS JSONB AS $$
DECLARE
v_result JSONB;
BEGIN
-- Validate API key (simplified)
IF p_api_key != 'your-secret-key' THEN
RETURN jsonb_build_object('error', 'Invalid API key');
END IF;
-- Process the stream
PERFORM process_data_stream(p_data);
-- Return success response
RETURN jsonb_build_object(
'status', 'success',
'records_received', jsonb_array_length(p_data->'records'),
'timestamp', NOW()
);
END;
$$ LANGUAGE plpgsql;
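Exposed through any SQL-over-HTTP layer (PostgREST, a thin app server), this behaves like a simple ingestion endpoint. Directly from psql, with the placeholder key (in production, store keys in a table and compare hashes rather than hardcoding):
SELECT api_ingest_stream(
'your-secret-key',
'{"source_system": "webhook", "records": [{"content": "Signup from a new enterprise lead", "metadata": {}}]}'::jsonb
);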
Pipeline Orchestration
Schedule and orchestrate complex pipelines:
-- Create sample pipelines
INSERT INTO data_pipelines (pipeline_name, description, schedule_cron) VALUES
('hourly_enrichment', 'Process and enrich data every hour', '0 * * * *'),
('daily_quality_check', 'Daily data quality monitoring', '0 9 * * *'),
('weekly_report', 'Generate weekly executive reports', '0 10 * * 1');
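The scheduled jobs below, and the orchestration function further down, write into two report tables not defined elsewhere in this tutorial. A minimal shape, inferred from the output columns of monitor_data_quality() and generate_pipeline_report():
-- Report sinks assumed by the scheduled jobs
CREATE TABLE data_quality_reports (
report_time TIMESTAMPTZ,
quality_metric VARCHAR,
score DECIMAL,
issues_found INTEGER,
ai_insights TEXT
);

CREATE TABLE executive_insights (
report_section VARCHAR,
content TEXT,
metrics JSONB
);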
-- Schedule with pg_cron
SELECT cron.schedule(
'hourly_enrichment_job',
'0 * * * *',
$$SELECT batch_enrich_pipeline('hourly_enrichment', 500);$$
);
SELECT cron.schedule(
'daily_quality_job',
'0 9 * * *',
$$INSERT INTO data_quality_reports
SELECT NOW(), * FROM monitor_data_quality(24);$$
);
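pg_cron keeps its job definitions and (in recent versions) run history in its own schema, which is handy for verifying the schedules took effect:
-- List registered jobs and recent runs (job_run_details needs pg_cron 1.4+)
SELECT jobid, jobname, schedule, command FROM cron.job;
SELECT jobid, status, return_message, start_time
FROM cron.job_run_details
ORDER BY start_time DESC
LIMIT 10;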
-- Complex pipeline with dependencies
CREATE OR REPLACE FUNCTION orchestrate_complex_pipeline()
RETURNS VOID AS $$
BEGIN
-- Step 1: Ingest raw data (external_data_source is a stand-in for
-- whatever upstream table or foreign table feeds the pipeline)
PERFORM process_data_stream(
jsonb_build_object(
'source_system', 'automated_import',
'records', (
SELECT jsonb_agg(
jsonb_build_object(
'content', external_content,
'metadata', metadata
)
)
FROM external_data_source
WHERE import_date = CURRENT_DATE
)
)
);
-- Step 2: Enrich data
PERFORM batch_enrich_pipeline('hourly_enrichment', 1000);
-- Step 3: Quality check
INSERT INTO data_quality_reports
SELECT NOW(), * FROM monitor_data_quality(1);
-- Step 4: Generate insights
INSERT INTO executive_insights
SELECT * FROM generate_pipeline_report(
(SELECT id FROM data_pipelines WHERE pipeline_name = 'hourly_enrichment'),
INTERVAL '1 hour'
);
-- Step 5: Alert on issues
PERFORM pg_notify('pipeline_complete',
json_build_object(
'pipeline', 'complex_orchestration',
'status', 'completed',
'timestamp', NOW()
)::text
);
END;
$$ LANGUAGE plpgsql;
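The final pg_notify step lets any connected session react to pipeline completion, for example from psql or an application connection holding a LISTEN:
-- In a monitoring session: subscribe to completion events
LISTEN pipeline_complete;

-- Run the orchestrator; the listener receives the JSON payload
SELECT orchestrate_complex_pipeline();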
Sample Data and Testing
-- Insert test data
INSERT INTO raw_data_staging (source_system, raw_content, metadata) VALUES
('crm', 'Customer John Smith called about product issue. He was frustrated with the delayed shipping and wants a refund. Order #12345.',
'{"customer_id": "C123", "call_duration": "15:32"}'::jsonb),
('social_media', 'Just received my new headphones from @YourCompany! Amazing sound quality and super comfortable. Best purchase this year! #Happy',
'{"platform": "twitter", "engagement": {"likes": 45, "retweets": 12}}'::jsonb),
('support_email', E'Subject: Technical Issue\n\nDear Support,\n\nI am experiencing connectivity issues with model XZ-500. The device keeps disconnecting every few minutes. I have tried resetting but the problem persists.\n\nPlease help.\n\nRegards,\nJane Doe',
'{"ticket_id": "T789", "priority": "high"}'::jsonb);
-- Run enrichment pipeline
SELECT * FROM batch_enrich_pipeline('hourly_enrichment', 10);
-- Check enriched data
SELECT
source_system,
ai_summary,
sentiment_analysis->>'overall' AS sentiment,
categories,
quality_score
FROM enriched_data
ORDER BY enriched_at DESC
LIMIT 5;
-- Monitor quality
SELECT * FROM monitor_data_quality(24);
-- Generate report
SELECT * FROM generate_pipeline_report(1, INTERVAL '1 day');
Performance Optimization
-- Parallel processing function
CREATE OR REPLACE FUNCTION parallel_enrich_pipeline(
p_pipeline_name VARCHAR,
p_parallel_workers INTEGER DEFAULT 4
)
RETURNS VOID AS $$
BEGIN
-- SET cannot take a variable inside PL/pgSQL; set_config() can
PERFORM set_config('max_parallel_workers_per_gather',
p_parallel_workers::text, false);
-- Process in parallel. Parallelism applies to the inner scan, and only
-- if the planner deems the called functions PARALLEL SAFE; this
-- skeleton also discards the summary, which in practice you would
-- insert into enriched_data.
UPDATE raw_data_staging rs
SET processed = TRUE
FROM (
SELECT id,
steadytext_generate('Summarize: ' || raw_content, 100) AS summary
FROM raw_data_staging
WHERE NOT processed
LIMIT 1000
) enriched
WHERE rs.id = enriched.id;
RESET max_parallel_workers_per_gather;
END;
$$ LANGUAGE plpgsql;
Best Practices
- Batch Size: Tune batch sizes to your hardware; smaller batches reduce lock time, larger ones amortize overhead
- Error Handling: Wrap per-record work in BEGIN ... EXCEPTION blocks (as batch_enrich_pipeline does) so one bad row doesn't abort the run
- Monitoring: Set up alerts for pipeline failures
- Caching: Use SteadyText's caching for repeated AI operations (see the sketch below)
- Scheduling: Use pg_cron for reliable pipeline scheduling
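The caching point is easy to see directly: SteadyText memoizes deterministic generations, so an identical repeated prompt should be served from cache on the second call (timings vary by environment):
\timing on
-- First call: full generation
SELECT steadytext_generate('Summarize: quarterly revenue grew 12%', max_tokens := 60);
-- Identical second call: answered from the built-in cache
SELECT steadytext_generate('Summarize: quarterly revenue grew 12%', max_tokens := 60);
\timing off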
Next Steps
Pro Tip
For high-volume pipelines, consider partitioning your staging tables by date and using parallel workers to maximize throughput.