# Data & Automation

Intelligent agents that build and maintain data pipelines, monitor quality, and automate reporting.

## Data & Automation Agents

These agents handle the unglamorous but critical work of data engineering — building reliable pipelines, catching anomalies, and delivering insights without human intervention.

## 🛠 Active Use Cases


### 🔄 ETL Pipeline Orchestrator Agent

::: {.callout-note collapse="true"}
## Description
An agent that monitors multiple data source schemas, detects breaking changes (renamed columns, type changes, new nulls), automatically adapts transformation logic, and alerts the data team. It acts as a self-healing data pipeline layer that reduces downtime.
:::

::: {.callout-tip collapse="true"}
## Implementation — Python
````python
from anthropic import Anthropic

client = Anthropic()

def detect_schema_changes(old_schema: dict, new_schema: dict) -> str:
    changes = []
    for col in old_schema:
        if col not in new_schema:
            changes.append(f"REMOVED column: {col}")
        elif old_schema[col] != new_schema[col]:
            changes.append(f"TYPE CHANGE: {col} ({old_schema[col]} -> {new_schema[col]})")
    for col in new_schema:
        if col not in old_schema:
            changes.append(f"NEW column: {col} ({new_schema[col]})")
    return "\n".join(changes) if changes else "No changes detected."

def fix_pipeline(current_transform_code: str, schema_changes: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        system="You are a senior data engineer. Fix Python/pandas transformation code to handle schema changes. Return only valid Python code.",
        messages=[{
            "role": "user",
            "content": f"""Schema changes detected:
{schema_changes}

Current transformation code:
```python
{current_transform_code}
```

Rewrite the code to handle these changes with backward compatibility."""
        }]
    )
    return response.content[0].text
````
:::

::: {.callout-warning collapse="true"}
## Implementation — Low/NoCode
**Stack:** Airbyte + n8n + Claude API + PagerDuty

1. **Monitor**: n8n polls Airbyte sync logs every hour for failures or schema warnings.
2. **Diagnose**: On failure, the error log is sent to Claude, which identifies the root cause and suggests a fix.
3. **Alert**: A structured alert (cause + suggested fix + affected tables) is sent to PagerDuty and Slack.
4. **Auto-fix**: For simple cases (null handling, type casting), Claude generates a dbt patch for review.
:::
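The triage implied by step 4, deciding which schema changes are safe to auto-patch and which must go straight to a human, can be sketched as a small pure function. This is a minimal sketch assuming change strings in the format produced by `detect_schema_changes` above; the category rules themselves are hypothetical, not part of the original design:

```python
# Hypothetical triage for schema changes: additive or castable changes are
# candidates for an auto-generated patch; destructive ones always escalate.
AUTO_FIXABLE_PREFIXES = ("NEW column:", "TYPE CHANGE:")  # additive / castable
ESCALATE_PREFIXES = ("REMOVED column:",)                 # data-loss risk

def triage_changes(changes: list[str]) -> dict:
    """Split schema-change strings into auto-fix vs. escalate buckets."""
    result = {"auto_fix": [], "escalate": []}
    for change in changes:
        if change.startswith(ESCALATE_PREFIXES):
            result["escalate"].append(change)
        elif change.startswith(AUTO_FIXABLE_PREFIXES):
            result["auto_fix"].append(change)
        else:
            # Unknown change types always go to a human.
            result["escalate"].append(change)
    return result

changes = [
    "NEW column: signup_source (object)",
    "TYPE CHANGE: amount (int64 -> float64)",
    "REMOVED column: legacy_id",
]
buckets = triage_changes(changes)
# buckets["escalate"] == ["REMOVED column: legacy_id"]
```

Only the `auto_fix` bucket would then be handed to `fix_pipeline` for patch generation; anything else raises a PagerDuty alert.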

---

### 🧹 Data Quality Monitor & Anomaly Detector

::: {.callout-note collapse="true"}
## Description
An agent that runs automated data quality checks on key datasets (completeness, uniqueness, referential integrity, statistical anomalies), explains issues in plain language, traces them to upstream causes, and prioritizes fixes based on business impact.
:::

::: {.callout-tip collapse="true"}
## Implementation — Python
```python
import pandas as pd
import numpy as np
from anthropic import Anthropic

client = Anthropic()

def run_quality_checks(df: pd.DataFrame) -> dict:
    checks = {
        "row_count": len(df),
        "null_percentages": (df.isnull().sum() / len(df) * 100).to_dict(),
        "duplicate_rows": int(df.duplicated().sum()),
    }
    for col in df.select_dtypes(include=[np.number]).columns:
        mean, std = df[col].mean(), df[col].std()
        outliers = df[abs(df[col] - mean) > 3 * std][col].count()
        checks[f"{col}_outliers_3sigma"] = int(outliers)
    return checks

def explain_quality_issues(checks: dict, table_name: str, business_context: str) -> str:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1500,
        system="You are a data quality expert. Explain issues in plain English, prioritized by business impact.",
        messages=[{
            "role": "user",
            "content": f"Table: {table_name}\nContext: {business_context}\n\nQuality checks:\n{checks}\n\nExplain the top issues and their likely causes."
        }]
    )
    return response.content[0].text

Stack: Great Expectations (GX Cloud) + n8n + Claude API + Notion

  1. Daily Run: n8n triggers a Great Expectations validation suite on key tables each morning.
  2. Failed Checks: n8n fetches the GX validation report for any failing expectations.
  3. Explanation: Failed checks are sent to Claude, which produces a plain-language summary with severity and root causes.
  4. Tracking: Issues are logged in a Notion Data Quality Dashboard with status (New/In Progress/Resolved).

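The description above also names referential integrity, which `run_quality_checks` does not cover. A minimal sketch of such a check, counting child rows whose foreign key has no match in the parent table; the table and column names here are hypothetical:

```python
import pandas as pd

def check_referential_integrity(child: pd.DataFrame, parent: pd.DataFrame,
                                fk: str, pk: str) -> dict:
    """Count child rows whose foreign key value is absent from the parent."""
    orphan_mask = ~child[fk].isin(parent[pk])
    return {
        "orphan_rows": int(orphan_mask.sum()),
        "orphan_pct": round(float(orphan_mask.mean()) * 100, 2),
    }

# Hypothetical tables: one order references a customer that does not exist.
orders = pd.DataFrame({"order_id": [1, 2, 3], "customer_id": [10, 11, 99]})
customers = pd.DataFrame({"customer_id": [10, 11, 12]})
result = check_referential_integrity(orders, customers, "customer_id", "customer_id")
# result["orphan_rows"] == 1  (customer_id 99 has no parent row)
```

The resulting counts can be merged into the `checks` dict before it is sent to `explain_quality_issues`.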
---

### 📊 Automated KPI Reporting Agent

::: {.callout-note collapse="true"}
## Description
An agent that fetches KPI data from multiple sources (databases, analytics platforms, CRMs), compares current performance against targets and historical benchmarks, identifies significant deviations, and writes a narrative executive summary — replacing the manual weekly reporting cycle.
:::

::: {.callout-tip collapse="true"}
## Implementation — Python
```python
from anthropic import Anthropic

client = Anthropic()

def generate_kpi_report(kpis: dict, period: str, targets: dict) -> str:
    deviations = {}
    for metric, value in kpis.items():
        if metric in targets:
            deviation_pct = ((value - targets[metric]) / targets[metric]) * 100
            deviations[metric] = {
                "value": value,
                "target": targets[metric],
                "deviation_pct": round(deviation_pct, 1)
            }

    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=2000,
        system="You are a business analyst writing concise executive reports. Highlight wins, concerns, and recommendations.",
        messages=[{
            "role": "user",
            "content": f"""Period: {period}
KPI Performance vs Targets: {deviations}

Write an executive report with:
1. Executive Summary (3 sentences max)
2. Green metrics (on/above target)
3. Red metrics with likely causes
4. Top 3 recommended actions"""
        }]
    )
    return response.content[0].text

kpis = {"MRR": 48200, "Churn_Rate": 3.2, "NPS": 42, "CAC": 380}
targets = {"MRR": 50000, "Churn_Rate": 2.5, "NPS": 45, "CAC": 350}
report = generate_kpi_report(kpis, "February 2026", targets)
```
:::

::: {.callout-warning collapse="true"}
## Implementation — Low/NoCode
**Stack:** Google Looker Studio + Make + Claude API + Google Slides + Slack

1. **Schedule**: Make runs every Monday at 7 AM.
2. **Fetch**: Data is pulled from BigQuery, Stripe, and HubSpot via API connectors.
3. **Analyze**: KPIs vs targets are calculated in Make, then sent to Claude for narrative generation.
4. **Build**: A Google Slides presentation is auto-generated with the narrative inserted.
5. **Deliver**: The PDF report link is posted to the #leadership Slack channel.
:::