Files
WikiJS/IT_Abteilung/Automatisierung/Channable_stack.md

108 KiB
Raw Permalink Blame History

title, description, published, date, tags, editor, dateCreated
title description published date tags editor dateCreated
Channable Stack LT24 Profice Dokumentation true 2026-03-06T11:13:13.929Z markdown 2026-03-06T11:09:09.239Z

Self-Hosted ETL-Pipeline für Google Shopping

Lokale Alternative zu Channable — Technische Dokumentation


Eigenschaft Wert
Dokumentversion 1.0.0
Erstellt am 2026-03-06
Zielplattform Debian 12 (Bookworm) Server
Tech-Stack Python 3.11+ · Polars · DuckDB · n8n (Docker)
Datensatzgröße 200.000+ Artikel, 30+ Attribute pro Artikel
Ziel-API Google Content API for Shopping
Sprache Deutsch

Inhaltsverzeichnis

  1. Systemübersicht
  2. Voraussetzungen
  3. Installation & Setup
  4. Projektstruktur
  5. Konfiguration
  6. Core Script Logic (Beispiel)
  7. n8n Workflow Integration
  8. Wartung & Monitoring

1. Systemübersicht

1.1 Architektur-Überblick

Die Pipeline ersetzt den kommerziellen SaaS-Dienst Channable durch eine vollständig selbst gehostete, Open-Source-basierte Lösung auf einem dedizierten Debian-Server. Der gesamte Datenfluss folgt dem klassischen ETL-Muster (Extract → Transform → Load) und wird täglich automatisiert durch n8n orchestriert.

Architektur-Diagramm

graph TB
    subgraph N8N [n8n - Docker Container]
        CRON[Cron-Trigger - 00:00 Uhr]
        ERR_HANDLER[Error-Handler - E-Mail / Slack]
    end

    subgraph EXTRACT [EXTRACT - Datenquellen]
        JTL[JTL-WAWI CSV-Export]
        GAPI[Google Ads/GA API - 90 Tage]
    end

    subgraph TRANSFORM [TRANSFORM - Verarbeitung]
        DUCKDB_JOIN[DuckDB - LEFT JOIN]
        POLARS[Polars Regel-Engine - rules.yaml]
    end

    subgraph LOAD [LOAD - Upload]
        GCONTENT[Google Content API - Batch-Upload]
    end

    DUCKDB_FILE[(DuckDB - products.duckdb)]
    LOGS[Logs und Monitoring]

    CRON -->|startet| JTL
    CRON -->|startet| GAPI
    JTL -->|CSV Import| DUCKDB_JOIN
    GAPI -->|Sales-Daten| DUCKDB_JOIN
    DUCKDB_JOIN -->|DataFrame| POLARS
    POLARS -->|transformierte Daten| GCONTENT
    DUCKDB_JOIN -.->|persistiert| DUCKDB_FILE
    ERR_HANDLER -.->|bei Fehler| LOGS

    style N8N fill:#2d3436,stroke:#636e72,color:#dfe6e9
    style EXTRACT fill:#0a3d62,stroke:#3c6382,color:#dfe6e9
    style TRANSFORM fill:#1e3799,stroke:#4a69bd,color:#dfe6e9
    style LOAD fill:#6a0572,stroke:#a834a8,color:#dfe6e9
    style JTL fill:#079992,stroke:#38ada9,color:#fff
    style GAPI fill:#079992,stroke:#38ada9,color:#fff
    style DUCKDB_JOIN fill:#0c2461,stroke:#1e3799,color:#fff
    style POLARS fill:#0c2461,stroke:#1e3799,color:#fff
    style GCONTENT fill:#6a0572,stroke:#a834a8,color:#fff
    style DUCKDB_FILE fill:#b71540,stroke:#e55039,color:#fff
    style LOGS fill:#474787,stroke:#706fd3,color:#fff
    style CRON fill:#e58e26,stroke:#fa983a,color:#fff
    style ERR_HANDLER fill:#e55039,stroke:#eb2f06,color:#fff

ETL-Datenfluss (vereinfacht)

graph LR
    A[JTL CSV - 200k+ Artikel] --> C[DuckDB - LEFT JOIN]
    B[Google API - Sales 90 Tage] --> C
    C --> D[Polars - rules.yaml]
    D --> E[Google Content API - Batch-Upload]

    style A fill:#079992,stroke:#38ada9,color:#fff
    style B fill:#079992,stroke:#38ada9,color:#fff
    style C fill:#0c2461,stroke:#1e3799,color:#fff
    style D fill:#1e3799,stroke:#4a69bd,color:#fff
    style E fill:#6a0572,stroke:#a834a8,color:#fff

1.2 Datenfluss im Detail

Der Datenfluss gliedert sich in drei klar voneinander getrennte Phasen:

Phase 1 — Extract (Datenextraktion)

In der Extract-Phase werden die Rohdaten aus zwei unabhängigen Quellen bezogen:

Quelle A — JTL-WAWI CSV-Export: Die ERP-Software JTL-WAWI exportiert den vollständigen Produktkatalog als CSV-Datei. Diese Datei enthält alle Stammdaten wie Artikelnummer, Titel, Beschreibung, Preis, EAN, Bestand, Kategorie, Bilder-URLs und weitere produktspezifische Attribute. Der Export wird automatisch durch einen JTL-Worker oder manuell über den JTL-Ameise-Export auf dem Server abgelegt.

Quelle B — Google Ads/Analytics Reporting API: Über die Google API werden die Verkaufsperformance-Daten der letzten 90 Tage abgerufen. Dazu gehören Metriken wie Verkaufsanzahl pro Artikel (sales_quantity), Umsatz pro Artikel (revenue), Klicks, Impressionen und Conversion-Rate. Diese Daten dienen als Grundlage für die regelbasierte Klassifizierung (z. B. „Bestseller"-Labels).

Beide Datenquellen werden als separate Tabellen in DuckDB importiert.

Phase 2 — Transform (Datentransformation)

In der Transform-Phase werden die Daten verknüpft und angereichert:

Zunächst wird in DuckDB ein LEFT JOIN zwischen der Produktstammdaten-Tabelle und der Verkaufsdaten-Tabelle durchgeführt. Die Verknüpfung erfolgt über eine gemeinsame Artikelnummer (z. B. sku oder offer_id). Das Ergebnis ist eine konsolidierte Tabelle, die sowohl Stamm- als auch Performance-Daten enthält.

Der resultierende DataFrame wird anschließend in Polars geladen. Hier werden regelbasierte Transformationen auf Grundlage der Datei rules.yaml angewandt. Typische Transformationen umfassen:

  • Zuweisung von Custom Labels (custom_label_0 bis custom_label_4) basierend auf Verkaufszahlen, Marge oder Kategorie.
  • Preisanpassungen und Sale-Preis-Logik.
  • Titeloptimierung (z. B. Hinzufügen von Marke oder Farbe).
  • Filterung von Artikeln, die nicht auf Google Shopping erscheinen sollen (z. B. Bestand = 0).
  • Anreicherung mit berechneten Feldern (z. B. margin_percent, performance_tier).

Phase 3 — Load (Daten-Upload)

Die transformierten Produktdaten werden über die Google Content API for Shopping als Batch-Request an das Google Merchant Center übertragen. Der Upload-Prozess berücksichtigt dabei die API-Quotas und implementiert ein Rate-Limiting mit exponentiellem Backoff, um 429 Too Many Requests-Fehler zu vermeiden. Fehlerhafte Einzelprodukte werden in einer separaten Fehler-Log-Datei protokolliert und blockieren nicht den gesamten Upload.


2. Voraussetzungen

2.1 Hardware-Anforderungen

Die folgenden Spezifikationen gelten für die zuverlässige Verarbeitung von 200.000+ Artikeln mit jeweils 30+ Attributen. Die Werte sind konservativ kalkuliert und berücksichtigen die parallele Ausführung von n8n (Docker) und der Python-Pipeline.

Ressource Minimum Empfohlen Begründung
CPU 4 Kerne (x86_64) 8+ Kerne (x86_64) Polars nutzt Multi-Threading automatisch aus
RAM 8 GB 16 GB DuckDB + Polars halten Daten im Arbeitsspeicher
Festplatte 40 GB SSD 100 GB NVMe SSD DuckDB-Dateien + CSV-Exporte + Logs + Docker-Images
Netzwerk 100 Mbit/s 1 Gbit/s Google API Batch-Uploads erfordern stabile Bandbreite
OS Debian 11 (Bullseye) Debian 12 (Bookworm) Aktuelle LTS-Basis mit langfristigem Sicherheits-Support

Speicherberechnung (Orientierungswerte)

Für eine Kalkulation mit 200.000 Zeilen × 30 Spalten (durchschnittlich ~100 Bytes pro Zelle):

Komponente Geschätzter Bedarf
Rohdaten CSV auf Disk ca. 550 650 MB
DuckDB komprimiert (auf Disk) ca. 150 250 MB
Polars DataFrame im RAM ca. 800 MB 1,2 GB
Peak Memory (JOIN + Transformation) ca. 2,5 4 GB
n8n Docker Container ca. 300 500 MB RAM
Gesamt Peak-RAM ca. 4 6 GB

Empfehlung: Bei Datensätzen über 500.000 Artikeln sollte auf 32 GB RAM und Streaming-Verarbeitung (Polars Lazy-Mode) umgestellt werden.

2.2 Erforderliche Debian-Pakete

Die folgenden Pakete müssen auf dem Debian-Server installiert sein, bevor die Pipeline eingerichtet wird.

# ──────────────────────────────────────────────────────────
# System-Updates durchführen
# ──────────────────────────────────────────────────────────
sudo apt update && sudo apt upgrade -y

# ──────────────────────────────────────────────────────────
# Grundlegende Build-Tools und System-Abhängigkeiten
# ──────────────────────────────────────────────────────────
sudo apt install -y \
    build-essential \
    python3.11 \
    python3.11-venv \
    python3.11-dev \
    python3-pip \
    curl \
    wget \
    git \
    jq \
    unzip \
    ca-certificates \
    gnupg \
    lsb-release \
    cron \
    logrotate \
    htop \
    tree

⚠️ Hinweis: Dies sind Beispielbefehle. Falls Python 3.11 nicht in den Standard-Repositories Ihrer Debian-Version verfügbar ist, muss es über das deadsnakes-PPA, einen Quellcode-Build oder das Paket python3 (in Debian 12 standardmäßig Python 3.11) installiert werden. Passen Sie die Versionsnummern an Ihre Umgebung an.

2.3 Software-Voraussetzungen

Software Mindestversion Zweck
Python 3.11+ Hauptlaufzeit für ETL-Skripte
Docker Engine 24.0+ Container-Runtime für n8n
Docker Compose Plugin 2.20+ Multi-Container-Orchestrierung
n8n 1.30+ Workflow-Automatisierung und Scheduling
DuckDB (Python-Modul) 0.10+ Analytische In-Process-Datenbank
Polars (Python-Modul) 0.20+ Schnelle DataFrame-Transformationen
google-api-python-client 2.100+ Google API-Zugriff
google-auth / google-auth-oauthlib 2.20+ Authentifizierung für Google APIs
PyYAML 6.0+ Regel-Konfiguration im YAML-Format

3. Installation & Setup

3.1 Docker und n8n einrichten

3.1.1 Docker Engine installieren

# ──────────────────────────────────────────────────────────
# Offizielle Docker GPG-Schlüssel hinzufügen
# ──────────────────────────────────────────────────────────
sudo install -m 0755 -d /etc/apt/keyrings

curl -fsSL https://download.docker.com/linux/debian/gpg | \
    sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg

sudo chmod a+r /etc/apt/keyrings/docker.gpg

# ──────────────────────────────────────────────────────────
# Docker-Repository einrichten
# ──────────────────────────────────────────────────────────
echo \
  "deb [arch=$(dpkg --print-architecture) \
  signed-by=/etc/apt/keyrings/docker.gpg] \
  https://download.docker.com/linux/debian \
  $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

# ──────────────────────────────────────────────────────────
# Docker Engine und Compose Plugin installieren
# ──────────────────────────────────────────────────────────
sudo apt update
sudo apt install -y \
    docker-ce \
    docker-ce-cli \
    containerd.io \
    docker-buildx-plugin \
    docker-compose-plugin

# ──────────────────────────────────────────────────────────
# Docker ohne sudo ermöglichen (Relogin erforderlich)
# ──────────────────────────────────────────────────────────
sudo usermod -aG docker $USER
newgrp docker

# ──────────────────────────────────────────────────────────
# Installation verifizieren
# ──────────────────────────────────────────────────────────
docker --version
docker compose version

⚠️ Hinweis: Dies sind Beispielbefehle basierend auf der offiziellen Docker-Dokumentation für Debian. Prüfen Sie stets die aktuelle Installationsanleitung unter docs.docker.com.

3.1.2 n8n per Docker Compose bereitstellen

Erstellen Sie das Verzeichnis und die Konfiguration:

# ──────────────────────────────────────────────────────────
# Verzeichnisstruktur für n8n anlegen
# ──────────────────────────────────────────────────────────
sudo mkdir -p /opt/n8n/data
sudo chown -R $USER:$USER /opt/n8n

⚠️ Hinweis: Beispielbefehl. Passen Sie den Benutzer und die Berechtigungen an Ihre Server-Konfiguration an.

Erstellen Sie die Datei /opt/n8n/.env mit den Zugangsdaten:

# ──────────────────────────────────────────────────────────
# /opt/n8n/.env — Umgebungsvariablen für n8n
# ──────────────────────────────────────────────────────────
N8N_BASIC_AUTH_USER=admin
N8N_BASIC_AUTH_PASSWORD=IhrSicheresPasswortHier123!
N8N_ENCRYPTION_KEY=ein-langer-zufaelliger-encryption-key-hier

⚠️ Hinweis: Dies ist eine Beispiel-Konfiguration. Ersetzen Sie alle Platzhalter-Werte durch Ihre eigenen sicheren Passwörter und Schlüssel. Verwenden Sie openssl rand -hex 32 zur Generierung sicherer Schlüssel.

Erstellen Sie die Datei /opt/n8n/docker-compose.yml:

# ──────────────────────────────────────────────────────────
# /opt/n8n/docker-compose.yml — n8n Workflow-Engine
# ──────────────────────────────────────────────────────────
version: "3.8"

services:
  n8n:
    image: n8nio/n8n:latest
    container_name: n8n
    restart: unless-stopped
    ports:
      - "5678:5678"
    environment:
      - N8N_BASIC_AUTH_ACTIVE=true
      - N8N_BASIC_AUTH_USER=${N8N_BASIC_AUTH_USER}
      - N8N_BASIC_AUTH_PASSWORD=${N8N_BASIC_AUTH_PASSWORD}
      - N8N_ENCRYPTION_KEY=${N8N_ENCRYPTION_KEY}
      - GENERIC_TIMEZONE=Europe/Berlin
      - TZ=Europe/Berlin
      - N8N_LOG_LEVEL=info
      - N8N_LOG_OUTPUT=console,file
      - N8N_LOG_FILE_LOCATION=/home/node/.n8n/logs/n8n.log
      - N8N_DIAGNOSTICS_ENABLED=false
      - N8N_HIRING_BANNER_ENABLED=false
    volumes:
      # n8n-Daten persistent speichern
      - /opt/n8n/data:/home/node/.n8n
      # Zugriff auf ETL-Pipeline-Verzeichnis (Read-Only für Skripte)
      - /opt/etl-pipeline:/opt/etl-pipeline:ro
      # Zugriff auf ETL-Logs (Read-Write für Monitoring)
      - /opt/etl-pipeline/logs:/opt/etl-pipeline/logs:rw
    healthcheck:
      test: ["CMD-SHELL", "wget -qO- http://localhost:5678/healthz || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 30s
    deploy:
      resources:
        limits:
          memory: 1G
          cpus: "2.0"
        reservations:
          memory: 256M

⚠️ Hinweis: Dies ist eine Beispiel-Konfiguration. Die Volume-Pfade, Ressourcen-Limits und Ports müssen an Ihre Server-Umgebung angepasst werden.

n8n starten und Status prüfen:

# ──────────────────────────────────────────────────────────
# n8n Container starten
# ──────────────────────────────────────────────────────────
cd /opt/n8n
docker compose up -d

# Status prüfen
docker compose ps
docker compose logs -f --tail=50

⚠️ Hinweis: Beispielbefehle. Nach dem Start ist n8n unter http://<Server-IP>:5678 erreichbar.

3.2 Python-Umgebung einrichten

3.2.1 Virtual Environment erstellen

# ──────────────────────────────────────────────────────────
# Projektverzeichnis anlegen
# ──────────────────────────────────────────────────────────
sudo mkdir -p /opt/etl-pipeline
sudo chown -R $USER:$USER /opt/etl-pipeline
cd /opt/etl-pipeline

# ──────────────────────────────────────────────────────────
# Python Virtual Environment erstellen und aktivieren
# ──────────────────────────────────────────────────────────
python3.11 -m venv venv
source venv/bin/activate

# ──────────────────────────────────────────────────────────
# pip aktualisieren
# ──────────────────────────────────────────────────────────
pip install --upgrade pip setuptools wheel

⚠️ Hinweis: Dies sind Beispielbefehle. Stellen Sie sicher, dass python3.11 auf Ihrem System korrekt installiert ist (siehe Abschnitt 2.2).

3.2.2 Abhängigkeiten installieren

Erstellen Sie die Datei /opt/etl-pipeline/requirements.txt:

# ──────────────────────────────────────────────────────────
# /opt/etl-pipeline/requirements.txt
# ETL-Pipeline Abhängigkeiten
# ──────────────────────────────────────────────────────────

# DataFrame-Engine für schnelle Transformationen
polars>=0.20.0,<2.0.0

# Analytische In-Process SQL-Datenbank
duckdb>=0.10.0,<2.0.0

# Google API Client Libraries
google-api-python-client>=2.100.0
google-auth>=2.20.0
google-auth-oauthlib>=1.2.0
google-auth-httplib2>=0.2.0

# Konfiguration
PyYAML>=6.0

# Utilities
requests>=2.31.0
python-dateutil>=2.8.0
tenacity>=8.2.0

⚠️ Hinweis: Dies ist eine Beispiel-Datei. Prüfen Sie die aktuellen Versionen der Pakete und passen Sie die Versionsbeschränkungen an Ihre Kompatibilitätsanforderungen an.

Abhängigkeiten installieren:

# ──────────────────────────────────────────────────────────
# Abhängigkeiten in das Virtual Environment installieren
# ──────────────────────────────────────────────────────────
cd /opt/etl-pipeline
source venv/bin/activate
pip install -r requirements.txt

# Installation verifizieren
python -c "import polars; print(f'Polars: {polars.__version__}')"
python -c "import duckdb; print(f'DuckDB: {duckdb.__version__}')"
python -c "import googleapiclient; print('Google API Client: OK')"
python -c "import yaml; print(f'PyYAML: {yaml.__version__}')"

⚠️ Hinweis: Beispielbefehle zur Verifikation. Die Ausgaben variieren je nach installierten Versionen.


4. Projektstruktur

4.1 Empfohlene Verzeichnisstruktur

Die folgende Struktur organisiert alle Komponenten der Pipeline in logisch getrennte Verzeichnisse. Diese Trennung erleichtert Wartung, Backups und Zugriffssteuerung.

/opt/etl-pipeline/
│
├── venv/                           # Python Virtual Environment
│   └── ...                         # (von venv automatisch verwaltet)
│
├── requirements.txt                # Python-Abhängigkeiten
│
├── scripts/                        # Alle Python-Skripte
│   ├── __init__.py
│   ├── main.py                     # Hauptskript (Entry Point)
│   ├── extract.py                  # Modul: Datenextraktion (CSV + Google API)
│   ├── transform.py                # Modul: DuckDB JOIN + Polars Transformationen
│   ├── load.py                     # Modul: Google Content API Upload
│   ├── rules_engine.py             # Modul: YAML-Regel-Engine
│   └── utils.py                    # Hilfsfunktionen (Logging, Config-Laden)
│
├── config/                         # Konfigurationsdateien
│   ├── rules.yaml                  # Channable-ähnliche Transformationsregeln
│   ├── settings.yaml               # Allgemeine Pipeline-Einstellungen
│   └── column_mapping.yaml         # Spalten-Mapping JTL → Google Shopping
│
├── credentials/                    # Sensible Zugangsdaten (restriktive Rechte!)
│   ├── google_service_account.json # Google Service Account Key
│   └── .gitignore                  # NIEMALS in Git committen!
│
├── data/                           # Datendateien
│   ├── input/                      # Eingangsdaten
│   │   └── jtl_export.csv          # Aktueller JTL-WAWI CSV-Export
│   ├── output/                     # Verarbeitete Ausgabedaten
│   │   └── google_feed_YYYYMMDD.csv
│   ├── archive/                    # Archivierte ältere Exporte
│   │   └── jtl_export_20260301.csv
│   └── db/                         # DuckDB-Datenbankdateien
│       └── products.duckdb         # Persistente DuckDB-Datenbank
│
├── logs/                           # Log-Dateien
│   ├── etl_pipeline.log            # Haupt-Log (rotating)
│   ├── etl_pipeline.log.1          # Rotiertes Log
│   ├── error.log                   # Nur Fehler-Einträge
│   └── upload_errors/              # Detaillierte Upload-Fehler pro Lauf
│       └── errors_20260306.json
│
├── tests/                          # Unit- und Integrationstests
│   ├── __init__.py
│   ├── test_extract.py
│   ├── test_transform.py
│   └── test_rules_engine.py
│
└── docs/                           # Zusätzliche Projektdokumentation
    └── CHANGELOG.md

⚠️ Hinweis: Dies ist eine Beispiel-Struktur. Passen Sie die Verzeichnisstruktur an die spezifischen Anforderungen Ihres Projekts und Ihrer Organisation an.

4.2 Verzeichnisstruktur anlegen

# ──────────────────────────────────────────────────────────
# Vollständige Verzeichnisstruktur anlegen
# ──────────────────────────────────────────────────────────
cd /opt/etl-pipeline

mkdir -p scripts
mkdir -p config
mkdir -p credentials
mkdir -p data/{input,output,archive,db}
mkdir -p logs/upload_errors
mkdir -p tests
mkdir -p docs

# ──────────────────────────────────────────────────────────
# Berechtigungen für sensible Verzeichnisse setzen
# ──────────────────────────────────────────────────────────
chmod 700 credentials/
chmod 750 config/
chmod 755 logs/

# ──────────────────────────────────────────────────────────
# .gitignore für credentials anlegen
# ──────────────────────────────────────────────────────────
echo "*" > credentials/.gitignore
echo "!.gitignore" >> credentials/.gitignore

# ──────────────────────────────────────────────────────────
# Python-Modul-Initialisierung
# ──────────────────────────────────────────────────────────
touch scripts/__init__.py
touch tests/__init__.py

⚠️ Hinweis: Beispielbefehle. Stellen Sie sicher, dass der Benutzer, unter dem die Pipeline läuft, die entsprechenden Lese- und Schreibrechte auf allen Verzeichnissen besitzt.


5. Konfiguration

5.1 Google API Credentials sicher speichern

5.1.1 Service Account erstellen

Für die Kommunikation mit der Google Content API for Shopping wird ein Service Account empfohlen. Dieser ermöglicht eine automatisierte Authentifizierung ohne manuellen OAuth2-Flow.

Schritte in der Google Cloud Console:

  1. Navigieren Sie zur Google Cloud Console.
  2. Erstellen Sie ein neues Projekt oder wählen Sie ein bestehendes aus.
  3. Aktivieren Sie die folgenden APIs:
    • Content API for Shopping
    • Google Ads API (falls Verkaufsdaten direkt aus Google Ads bezogen werden)
    • Google Analytics Data API (falls GA4-Daten verwendet werden)
  4. Navigieren Sie zu IAM & Verwaltung → Dienstkonten.
  5. Erstellen Sie einen neuen Service Account mit einem aussagekräftigen Namen.
  6. Laden Sie den JSON-Key herunter.
  7. Fügen Sie den Service Account als Nutzer in Ihrem Google Merchant Center hinzu (mit der Rolle „Standard" oder „Admin").

5.1.2 Credentials sicher ablegen

# ──────────────────────────────────────────────────────────
# Service Account JSON-Key sicher auf dem Server ablegen
# ──────────────────────────────────────────────────────────

# Datei in das credentials-Verzeichnis kopieren
cp /pfad/zur/heruntergeladenen/datei.json \
    /opt/etl-pipeline/credentials/google_service_account.json

# Restriktive Berechtigungen setzen (nur Owner lesen)
chmod 600 /opt/etl-pipeline/credentials/google_service_account.json

# Eigentümerschaft prüfen
ls -la /opt/etl-pipeline/credentials/

⚠️ Hinweis: Beispielbefehle. Ersetzen Sie den Pfad durch den tatsächlichen Speicherort Ihrer heruntergeladenen JSON-Datei.

5.1.3 Credentials als Umgebungsvariable referenzieren (empfohlen)

Es ist empfehlenswert, den Pfad zur Credentials-Datei nicht hart im Skript zu kodieren, sondern über eine Umgebungsvariable bereitzustellen.

# ──────────────────────────────────────────────────────────
# In /opt/etl-pipeline/.env oder ~/.bashrc hinzufügen:
# ──────────────────────────────────────────────────────────
export GOOGLE_APPLICATION_CREDENTIALS="/opt/etl-pipeline/credentials/google_service_account.json"
export MERCHANT_CENTER_ID="123456789"

⚠️ Hinweis: Beispielkonfiguration. Ersetzen Sie die MERCHANT_CENTER_ID durch Ihre tatsächliche Merchant-Center-ID.

5.1.4 Sicherheitshinweise

  • Die JSON-Datei darf niemals in ein Git-Repository committed werden.
  • Der Zugriff auf das credentials/-Verzeichnis sollte auf den Pipeline-Benutzer beschränkt sein (chmod 700).
  • Rotieren Sie Service Account Keys regelmäßig (mindestens alle 90 Tage).
  • Verwenden Sie nach Möglichkeit Workload Identity Federation anstelle von heruntergeladenen JSON-Keys.
  • Protokollieren Sie niemals den Inhalt der Credentials-Datei in Log-Dateien.

5.2 Regel-Konfiguration (rules.yaml)

Die Datei rules.yaml ist das Herzstück der Channable-ähnlichen Funktionalität. Sie definiert regelbasierte Transformationen, die auf jeden Artikel angewandt werden. Die Regeln werden in der definierten Reihenfolge sequenziell abgearbeitet.

5.2.1 Struktur und Syntax

Erstellen Sie die Datei /opt/etl-pipeline/config/rules.yaml:

# ══════════════════════════════════════════════════════════
# /opt/etl-pipeline/config/rules.yaml
#
# Channable-ähnliche Regel-Engine für Google Shopping
# Regeln werden sequenziell von oben nach unten abgearbeitet.
#
# Unterstützte Operatoren:
#   Vergleich:  >, <, >=, <=, ==, !=
#   Text:       contains, not_contains, starts_with, ends_with
#   Logik:      and, or (innerhalb von conditions)
#   Existenz:   is_empty, is_not_empty
#
# Unterstützte Aktionen:
#   set_value:        Festes Wert setzen
#   copy_field:       Wert aus anderem Feld kopieren
#   prepend:          Text am Anfang hinzufügen
#   append:           Text am Ende hinzufügen
#   replace:          Text ersetzen
#   calculate:        Berechnung durchführen
#   exclude:          Artikel vom Feed ausschließen
#   map_category:     Kategorie-Mapping anwenden
# ══════════════════════════════════════════════════════════

# ──────────────────────────────────────────────────────────
# Globale Einstellungen
# ──────────────────────────────────────────────────────────
settings:
  feed_name: "Google Shopping DE"
  target_country: "DE"
  content_language: "de"
  currency: "EUR"
  # Artikel mit Bestand 0 automatisch ausschließen
  exclude_out_of_stock: true
  # Mindestpreis für den Feed (in EUR)
  minimum_price: 1.00

# ──────────────────────────────────────────────────────────
# Regel-Definitionen
# ──────────────────────────────────────────────────────────
rules:

  # ────────────────────────────────────────────────────────
  # REGEL 1: Bestseller-Label basierend auf Verkaufszahlen
  # ────────────────────────────────────────────────────────
  - name: "Bestseller-Klassifizierung"
    description: "Artikel mit mehr als 100 Verkäufen in 90 Tagen als Bestseller markieren"
    enabled: true
    priority: 10
    conditions:
      logic: "and"
      rules:
        - field: "sales_quantity_90d"
          operator: ">"
          value: 100
        - field: "stock_quantity"
          operator: ">"
          value: 0
    actions:
      - target_field: "custom_label_0"
        action: "set_value"
        value: "Bestseller"

  # ────────────────────────────────────────────────────────
  # REGEL 2: Slow Mover identifizieren
  # ────────────────────────────────────────────────────────
  - name: "Slow-Mover-Klassifizierung"
    description: "Artikel mit weniger als 5 Verkäufen in 90 Tagen als Slow Mover markieren"
    enabled: true
    priority: 20
    conditions:
      logic: "and"
      rules:
        - field: "sales_quantity_90d"
          operator: "<"
          value: 5
        - field: "days_in_catalog"
          operator: ">"
          value: 30
    actions:
      - target_field: "custom_label_0"
        action: "set_value"
        value: "Slow Mover"

  # ────────────────────────────────────────────────────────
  # REGEL 3: Performance-Tier (custom_label_1)
  # ────────────────────────────────────────────────────────
  - name: "Performance-Tier High"
    description: "Umsatzbasierte Tier-Einteilung für Bidding-Strategien"
    enabled: true
    priority: 30
    conditions:
      logic: "and"
      rules:
        - field: "revenue_90d"
          operator: ">="
          value: 1000
    actions:
      - target_field: "custom_label_1"
        action: "set_value"
        value: "Tier-1-High-Revenue"

  - name: "Performance-Tier Mittel"
    enabled: true
    priority: 31
    conditions:
      logic: "and"
      rules:
        - field: "revenue_90d"
          operator: ">="
          value: 200
        - field: "revenue_90d"
          operator: "<"
          value: 1000
    actions:
      - target_field: "custom_label_1"
        action: "set_value"
        value: "Tier-2-Mid-Revenue"

  - name: "Performance-Tier Niedrig"
    enabled: true
    priority: 32
    conditions:
      logic: "and"
      rules:
        - field: "revenue_90d"
          operator: "<"
          value: 200
    actions:
      - target_field: "custom_label_1"
        action: "set_value"
        value: "Tier-3-Low-Revenue"

  # ────────────────────────────────────────────────────────
  # REGEL 4: Margen-Klassifizierung (custom_label_2)
  # ────────────────────────────────────────────────────────
  - name: "Hohe Marge"
    enabled: true
    priority: 40
    conditions:
      logic: "and"
      rules:
        - field: "margin_percent"
          operator: ">="
          value: 40
    actions:
      - target_field: "custom_label_2"
        action: "set_value"
        value: "High-Margin"

  - name: "Niedrige Marge"
    enabled: true
    priority: 41
    conditions:
      logic: "and"
      rules:
        - field: "margin_percent"
          operator: "<"
          value: 15
    actions:
      - target_field: "custom_label_2"
        action: "set_value"
        value: "Low-Margin"

  # ────────────────────────────────────────────────────────
  # REGEL 5: Preiskategorie (custom_label_3)
  # ────────────────────────────────────────────────────────
  - name: "Premium-Preis"
    enabled: true
    priority: 50
    conditions:
      logic: "and"
      rules:
        - field: "price"
          operator: ">="
          value: 100
    actions:
      - target_field: "custom_label_3"
        action: "set_value"
        value: "Premium"

  - name: "Budget-Preis"
    enabled: true
    priority: 51
    conditions:
      logic: "and"
      rules:
        - field: "price"
          operator: "<"
          value: 20
    actions:
      - target_field: "custom_label_3"
        action: "set_value"
        value: "Budget"

  # ────────────────────────────────────────────────────────
  # REGEL 6: Titel-Optimierung
  # ────────────────────────────────────────────────────────
  - name: "Marke im Titel voranstellen"
    description: "Markenname am Anfang des Titels hinzufügen, falls nicht vorhanden"
    enabled: true
    priority: 60
    conditions:
      logic: "and"
      rules:
        - field: "brand"
          operator: "is_not_empty"
        - field: "title"
          operator: "not_contains"
          value_from_field: "brand"
    actions:
      - target_field: "title"
        action: "prepend"
        value_template: "{brand} - "

  # ────────────────────────────────────────────────────────
  # REGEL 7: Sale-Preis-Logik
  # ────────────────────────────────────────────────────────
  - name: "Sale-Preis setzen"
    description: "Wenn ein UVP vorhanden und höher als der VK-Preis ist, als Sale kennzeichnen"
    enabled: true
    priority: 70
    conditions:
      logic: "and"
      rules:
        - field: "uvp"
          operator: "is_not_empty"
        - field: "uvp"
          operator: ">"
          value_from_field: "price"
    actions:
      - target_field: "sale_price"
        action: "copy_field"
        source_field: "price"
      - target_field: "price"
        action: "copy_field"
        source_field: "uvp"

  # ────────────────────────────────────────────────────────
  # REGEL 8: Ausschluss-Regeln
  # ────────────────────────────────────────────────────────
  - name: "Artikel ohne Bild ausschließen"
    enabled: true
    priority: 80
    conditions:
      logic: "or"
      rules:
        - field: "image_url"
          operator: "is_empty"
        - field: "image_url"
          operator: "=="
          value: ""
    actions:
      - action: "exclude"
        reason: "Kein Produktbild vorhanden"

  - name: "Testprodukte ausschließen"
    enabled: true
    priority: 81
    conditions:
      logic: "or"
      rules:
        - field: "title"
          operator: "contains"
          value: "TEST"
        - field: "sku"
          operator: "starts_with"
          value: "ZZ-"
    actions:
      - action: "exclude"
        reason: "Testprodukt erkannt"

  # ────────────────────────────────────────────────────────
  # REGEL 9: Versandkosten basierend auf Preis
  # ────────────────────────────────────────────────────────
  - name: "Kostenloser Versand ab 49 EUR"
    enabled: true
    priority: 90
    conditions:
      logic: "and"
      rules:
        - field: "price"
          operator: ">="
          value: 49
    actions:
      - target_field: "shipping_cost"
        action: "set_value"
        value: "0.00"

  - name: "Standard-Versandkosten"
    enabled: true
    priority: 91
    conditions:
      logic: "and"
      rules:
        - field: "price"
          operator: "<"
          value: 49
    actions:
      - target_field: "shipping_cost"
        action: "set_value"
        value: "4.95"

⚠️ Hinweis: Dies ist eine Beispiel-Konfigurationsdatei. Alle Schwellenwerte, Feldnamen und Regeln müssen an Ihre tatsächlichen Geschäftsanforderungen, JTL-Exportfelder und Google-Shopping-Attribute angepasst werden.

5.2.2 Spalten-Mapping (column_mapping.yaml)

Erstellen Sie zusätzlich die Datei /opt/etl-pipeline/config/column_mapping.yaml:

# ══════════════════════════════════════════════════════════
# /opt/etl-pipeline/config/column_mapping.yaml
#
# Mapping: JTL-WAWI Exportfeld → Google Shopping Attribut
# ══════════════════════════════════════════════════════════

mapping:
  # Pflichtfelder Google Shopping
  Artikelnummer:          "offerId"
  Artikelname:            "title"
  Beschreibung:           "description"
  URL:                    "link"
  BildURL:                "imageLink"
  VK_Brutto:              "price"
  Waehrung:               "priceCurrency"
  Verfuegbarkeit:         "availability"
  Marke:                  "brand"
  GTIN:                   "gtin"
  MPN:                    "mpn"
  Zustand:                "condition"
  Kategorie_Google:       "googleProductCategory"

  # Optionale Felder
  Farbe:                  "color"
  Groesse:                "size"
  Material:               "material"
  Geschlecht:             "gender"
  Altersgruppe:           "ageGroup"
  EK_Netto:               "cost_of_goods_sold"
  UVP:                    "uvp"
  Lagerbestand:           "stock_quantity"

  # Berechnete Felder (werden durch Pipeline erzeugt)
  # sales_quantity_90d     → aus Google API
  # revenue_90d            → aus Google API
  # margin_percent         → berechnet aus VK und EK
  # custom_label_0-4       → aus rules.yaml

⚠️ Hinweis: Dies ist eine Beispiel-Zuordnung. Die tatsächlichen Spaltennamen in Ihrem JTL-WAWI-Export können abweichen. Prüfen Sie die Header-Zeile Ihrer CSV-Datei und passen Sie das Mapping entsprechend an.


6. Core Script Logic (Beispiel)

Dieses Kapitel zeigt den konzeptionellen Aufbau der Python-Skripte. Der Code ist als funktionales Boilerplate zu verstehen, das als Grundlage für die eigene Implementierung dient.

6.1 Hilfsfunktionen (utils.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/utils.py

Hilfsfunktionen für Logging, Konfiguration und gemeinsam genutzte Utilities.
"""

import os
import sys
import yaml
import logging
from pathlib import Path
from datetime import datetime
from logging.handlers import RotatingFileHandler


# ──────────────────────────────────────────────────────────
# Pfad-Konstanten
# ──────────────────────────────────────────────────────────
BASE_DIR = Path("/opt/etl-pipeline")
CONFIG_DIR = BASE_DIR / "config"
DATA_DIR = BASE_DIR / "data"
LOG_DIR = BASE_DIR / "logs"
CREDENTIALS_DIR = BASE_DIR / "credentials"


def setup_logging(
    log_name: str = "etl_pipeline",
    log_level: int = logging.INFO,
    max_bytes: int = 10 * 1024 * 1024,  # 10 MB
    backup_count: int = 5,
) -> logging.Logger:
    """
    Konfiguriert das Logging mit Rotation und separatem Error-Log.

    Args:
        log_name: Name des Loggers und der Log-Datei.
        log_level: Logging-Level (default: INFO).
        max_bytes: Maximale Größe einer Log-Datei vor Rotation.
        backup_count: Anzahl der aufzubewahrenden rotierten Log-Dateien.

    Returns:
        Konfigurierter Logger.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger(log_name)
    logger.setLevel(log_level)

    # Verhindere doppelte Handler bei mehrfachem Aufruf
    if logger.handlers:
        return logger

    # Formatter
    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Haupt-Log (Rotation)
    main_handler = RotatingFileHandler(
        LOG_DIR / f"{log_name}.log",
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )
    main_handler.setLevel(log_level)
    main_handler.setFormatter(formatter)

    # Error-Log (nur ERROR und höher)
    error_handler = RotatingFileHandler(
        LOG_DIR / "error.log",
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(formatter)

    # Console-Handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(log_level)
    console_handler.setFormatter(formatter)

    logger.addHandler(main_handler)
    logger.addHandler(error_handler)
    logger.addHandler(console_handler)

    return logger


def load_yaml_config(config_filename: str) -> dict:
    """
    Lädt eine YAML-Konfigurationsdatei aus dem config-Verzeichnis.

    Args:
        config_filename: Name der YAML-Datei (z. B. 'rules.yaml').

    Returns:
        Dictionary mit dem Inhalt der YAML-Datei.

    Raises:
        FileNotFoundError: Wenn die Datei nicht existiert.
        yaml.YAMLError: Wenn die YAML-Syntax ungültig ist.
    """
    config_path = CONFIG_DIR / config_filename
    if not config_path.exists():
        raise FileNotFoundError(
            f"Konfigurationsdatei nicht gefunden: {config_path}"
        )

    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    return config


def get_timestamp() -> str:
    """Gibt einen formatierten Zeitstempel zurück (YYYYMMDD_HHMMSS)."""
    return datetime.now().strftime("%Y%m%d_%H%M%S")


def get_date_stamp() -> str:
    """Gibt einen formatierten Datumsstempel zurück (YYYYMMDD)."""
    return datetime.now().strftime("%Y%m%d")

⚠️ Hinweis: Dies ist ein Beispiel-Skript. Alle Pfade, Log-Konfigurationen und Funktionen müssen an Ihre tatsächliche Serverumgebung und Anforderungen angepasst werden.

6.2 Extract-Modul (extract.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/extract.py

Modul für die Datenextraktion aus JTL-WAWI CSV und Google APIs.
Beide Datenquellen werden als Tabellen in DuckDB importiert.
"""

import os
import shutil
import duckdb
import polars as pl
from pathlib import Path
from datetime import datetime, timedelta

from google.oauth2 import service_account
from googleapiclient.discovery import build

from scripts.utils import (
    setup_logging,
    BASE_DIR,
    DATA_DIR,
    CREDENTIALS_DIR,
    get_date_stamp,
)

logger = setup_logging("extract")


# ══════════════════════════════════════════════════════════
# JTL-WAWI CSV-Import
# ══════════════════════════════════════════════════════════

def load_jtl_csv_to_duckdb(
    con: duckdb.DuckDBPyConnection,
    csv_path: str | Path | None = None,
    table_name: str = "products_raw",
) -> int:
    """
    Liest die JTL-WAWI CSV-Exportdatei und importiert sie als Tabelle
    in die DuckDB-Datenbank.

    Args:
        con: Aktive DuckDB-Verbindung.
        csv_path: Pfad zur CSV-Datei. Falls None, wird der Standardpfad verwendet.
        table_name: Name der Zieltabelle in DuckDB.

    Returns:
        Anzahl der importierten Zeilen.
    """
    if csv_path is None:
        csv_path = DATA_DIR / "input" / "jtl_export.csv"

    csv_path = Path(csv_path)
    if not csv_path.exists():
        raise FileNotFoundError(f"JTL-CSV nicht gefunden: {csv_path}")

    file_size_mb = csv_path.stat().st_size / (1024 * 1024)
    logger.info(f"Lade JTL-CSV: {csv_path} ({file_size_mb:.1f} MB)")

    # Bestehende Tabelle löschen und neu erstellen
    con.execute(f"DROP TABLE IF EXISTS {table_name}")

    # CSV mit DuckDB's optimiertem CSV-Reader importieren
    # auto_detect=true erkennt Trennzeichen, Encoding und Datentypen
    con.execute(f"""
        CREATE TABLE {table_name} AS
        SELECT *
        FROM read_csv_auto(
            '{csv_path}',
            header = true,
            delim = ';',
            quote = '"',
            escape = '"',
            encoding = 'utf-8',
            sample_size = 10000,
            ignore_errors = false
        )
    """)

    row_count = con.execute(
        f"SELECT COUNT(*) FROM {table_name}"
    ).fetchone()[0]

    logger.info(
        f"JTL-CSV erfolgreich geladen: "
        f"{row_count:,} Zeilen in Tabelle '{table_name}'"
    )

    # Archivierung des Imports
    archive_path = DATA_DIR / "archive" / f"jtl_export_{get_date_stamp()}.csv"
    if not archive_path.exists():
        shutil.copy2(csv_path, archive_path)
        logger.info(f"CSV archiviert: {archive_path}")

    return row_count


# ══════════════════════════════════════════════════════════
# Google Sales-Daten abrufen
# ══════════════════════════════════════════════════════════

def fetch_google_sales_data(
    con: duckdb.DuckDBPyConnection,
    table_name: str = "sales_90d",
    lookback_days: int = 90,
) -> int:
    """
    Ruft Verkaufsdaten der letzten N Tage über die Google API ab
    und importiert sie als Tabelle in DuckDB.

    Args:
        con: Aktive DuckDB-Verbindung.
        table_name: Name der Zieltabelle in DuckDB.
        lookback_days: Anzahl der zurückliegenden Tage (Standard: 90).

    Returns:
        Anzahl der importierten Zeilen.
    """
    logger.info(
        f"Rufe Google Sales-Daten ab (letzte {lookback_days} Tage)..."
    )

    # ── Google API Authentifizierung ──────────────────────
    credentials_path = os.environ.get(
        "GOOGLE_APPLICATION_CREDENTIALS",
        str(CREDENTIALS_DIR / "google_service_account.json"),
    )
    merchant_id = os.environ.get("MERCHANT_CENTER_ID")

    if not merchant_id:
        raise ValueError(
            "Umgebungsvariable MERCHANT_CENTER_ID ist nicht gesetzt"
        )

    credentials = service_account.Credentials.from_service_account_file(
        credentials_path,
        scopes=["https://www.googleapis.com/auth/content"],
    )

    service = build("content", "v2.1", credentials=credentials)

    # ── Zeitraum berechnen ────────────────────────────────
    end_date = datetime.now()
    start_date = end_date - timedelta(days=lookback_days)

    # ── Performance-Daten aus dem Merchant Center abrufen ─
    # HINWEIS: Der tatsächliche API-Aufruf hängt von der
    # verwendeten Google API ab (Merchant Center Reports,
    # Google Ads API, GA4 Data API, etc.)
    # Hier wird ein konzeptionelles Beispiel gezeigt.

    sales_records = []

    try:
        # Beispiel: Merchant Center Performance Report
        request_body = {
            "query": f"""
                SELECT
                    segments.offer_id,
                    metrics.impressions,
                    metrics.clicks,
                    metrics.conversions,
                    metrics.conversion_value
                FROM MerchantPerformanceView
                WHERE segments.date BETWEEN
                    '{start_date.strftime('%Y-%m-%d')}'
                    AND '{end_date.strftime('%Y-%m-%d')}'
            """
        }

        response = (
            service.reports()
            .search(merchantId=merchant_id, body=request_body)
            .execute()
        )

        for row in response.get("results", []):
            segments = row.get("segments", {})
            metrics = row.get("metrics", {})
            sales_records.append({
                "offer_id": segments.get("offerId", ""),
                "impressions": int(metrics.get("impressions", 0)),
                "clicks": int(metrics.get("clicks", 0)),
                "sales_quantity_90d": int(
                    metrics.get("conversions", 0)
                ),
                "revenue_90d": float(
                    metrics.get("conversionValue", 0.0)
                ),
            })

    except Exception as e:
        logger.error(
            f"Fehler beim Abrufen der Google Sales-Daten: {e}"
        )
        raise

    # ── Daten in DuckDB importieren ───────────────────────
    if sales_records:
        df_sales = pl.DataFrame(sales_records)
        con.execute(f"DROP TABLE IF EXISTS {table_name}")
        con.execute(
            f"CREATE TABLE {table_name} AS SELECT * FROM df_sales"
        )
        row_count = con.execute(
            f"SELECT COUNT(*) FROM {table_name}"
        ).fetchone()[0]
        logger.info(
            f"Google Sales-Daten geladen: "
            f"{row_count:,} Zeilen in '{table_name}'"
        )
        return row_count
    else:
        logger.warning(
            "Keine Sales-Daten von der Google API erhalten."
        )
        # Leere Tabelle erstellen, damit der JOIN nicht fehlschlägt
        con.execute(f"DROP TABLE IF EXISTS {table_name}")
        con.execute(f"""
            CREATE TABLE {table_name} (
                offer_id VARCHAR,
                impressions INTEGER DEFAULT 0,
                clicks INTEGER DEFAULT 0,
                sales_quantity_90d INTEGER DEFAULT 0,
                revenue_90d DOUBLE DEFAULT 0.0
            )
        """)
        return 0

⚠️ Hinweis: Dies ist ein konzeptionelles Beispiel-Skript. Die Google API-Aufrufe, insbesondere der Report-Query und die Antwortstruktur, müssen an die tatsächlich verwendete API-Version und Ihr Merchant-Center-Setup angepasst werden. Konsultieren Sie die offizielle Google API-Dokumentation für die aktuellen Endpunkte und Antwortformate.

6.3 Transform-Modul (transform.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/transform.py

Modul für die Datentransformation:
1. LEFT JOIN in DuckDB (Produkte ⟕ Sales)
2. Polars-Transformationen basierend auf rules.yaml
"""

import duckdb
import polars as pl
from typing import Any

from scripts.utils import setup_logging, load_yaml_config

logger = setup_logging("transform")


# ══════════════════════════════════════════════════════════
# DuckDB LEFT JOIN
# ══════════════════════════════════════════════════════════

def join_products_with_sales(
    con: duckdb.DuckDBPyConnection,
    products_table: str = "products_raw",
    sales_table: str = "sales_90d",
    joined_table: str = "products_enriched",
    join_key_products: str = "Artikelnummer",
    join_key_sales: str = "offer_id",
) -> pl.DataFrame:
    """
    Führt einen LEFT JOIN zwischen Produktstammdaten und
    Verkaufsdaten in DuckDB durch und gibt das Ergebnis als
    Polars DataFrame zurück.

    Ein LEFT JOIN stellt sicher, dass alle Produkte erhalten
    bleiben, auch wenn keine Verkaufsdaten vorliegen
    (NULL-Werte werden mit 0 gefüllt).

    Args:
        con: Aktive DuckDB-Verbindung.
        products_table: Name der Produkttabelle.
        sales_table: Name der Verkaufstabelle.
        joined_table: Name der Ergebnistabelle.
        join_key_products: JOIN-Schlüssel in der Produkttabelle.
        join_key_sales: JOIN-Schlüssel in der Verkaufstabelle.

    Returns:
        Polars DataFrame mit den verknüpften Daten.
    """
    logger.info(
        f"Führe LEFT JOIN durch: {products_table}{sales_table} "
        f"ON {join_key_products} = {join_key_sales}"
    )

    # ── LEFT JOIN mit COALESCE für NULL-Behandlung ────────
    con.execute(f"DROP TABLE IF EXISTS {joined_table}")
    con.execute(f"""
        CREATE TABLE {joined_table} AS
        SELECT
            p.*,
            COALESCE(s.impressions, 0)
                AS impressions,
            COALESCE(s.clicks, 0)
                AS clicks,
            COALESCE(s.sales_quantity_90d, 0)
                AS sales_quantity_90d,
            COALESCE(s.revenue_90d, 0.0)
                AS revenue_90d
        FROM {products_table} AS p
        LEFT JOIN {sales_table} AS s
            ON CAST(p."{join_key_products}" AS VARCHAR)
             = CAST(s.{join_key_sales} AS VARCHAR)
    """)

    row_count = con.execute(
        f"SELECT COUNT(*) FROM {joined_table}"
    ).fetchone()[0]
    logger.info(
        f"LEFT JOIN abgeschlossen: "
        f"{row_count:,} Zeilen in '{joined_table}'"
    )

    # ── Ergebnis als Polars DataFrame exportieren ─────────
    result = con.execute(f"SELECT * FROM {joined_table}").pl()
    logger.info(
        f"Polars DataFrame erstellt: "
        f"{result.shape[0]:,} Zeilen, {result.shape[1]} Spalten"
    )

    return result


# ══════════════════════════════════════════════════════════
# Berechnete Felder hinzufügen
# ══════════════════════════════════════════════════════════

def add_calculated_fields(df: pl.DataFrame) -> pl.DataFrame:
    """
    Fügt berechnete Felder hinzu, die als Grundlage für die
    Regel-Engine dienen.

    Args:
        df: Polars DataFrame mit den verknüpften Rohdaten.

    Returns:
        Polars DataFrame mit zusätzlichen berechneten Spalten.
    """
    logger.info("Füge berechnete Felder hinzu...")

    df = df.with_columns([
        # Marge in Prozent (wenn EK vorhanden)
        pl.when(
            (pl.col("EK_Netto").is_not_null())
            & (pl.col("VK_Brutto") > 0)
        )
        .then(
            (
                (pl.col("VK_Brutto") - pl.col("EK_Netto"))
                / pl.col("VK_Brutto")
                * 100
            ).round(2)
        )
        .otherwise(pl.lit(None))
        .alias("margin_percent"),

        # Click-Through-Rate
        pl.when(pl.col("impressions") > 0)
        .then(
            (pl.col("clicks") / pl.col("impressions") * 100)
            .round(2)
        )
        .otherwise(pl.lit(0.0))
        .alias("ctr_percent"),

        # Conversion-Rate
        pl.when(pl.col("clicks") > 0)
        .then(
            (
                pl.col("sales_quantity_90d")
                / pl.col("clicks")
                * 100
            ).round(2)
        )
        .otherwise(pl.lit(0.0))
        .alias("conversion_rate_percent"),

        # Custom Label Spalten initialisieren (leer)
        pl.lit("").alias("custom_label_0"),
        pl.lit("").alias("custom_label_1"),
        pl.lit("").alias("custom_label_2"),
        pl.lit("").alias("custom_label_3"),
        pl.lit("").alias("custom_label_4"),

        # Versandkosten initialisieren
        pl.lit("4.95").alias("shipping_cost"),

        # Exclude-Flag initialisieren
        pl.lit(False).alias("_excluded"),
        pl.lit("").alias("_exclude_reason"),
    ])

    logger.info(
        f"Berechnete Felder hinzugefügt. "
        f"DataFrame: {df.shape[1]} Spalten"
    )
    return df


# ══════════════════════════════════════════════════════════
# YAML Regel-Engine
# ══════════════════════════════════════════════════════════

def evaluate_condition(
    df: pl.DataFrame,
    condition: dict,
) -> pl.Series:
    """
    Evaluiert eine einzelne Bedingung und gibt eine
    boolesche Maske zurück.

    Args:
        df: Polars DataFrame.
        condition: Dictionary mit field, operator, value.

    Returns:
        Polars Series (Boolean) als Filtermaske.
    """
    field = condition["field"]
    operator = condition["operator"]
    value = condition.get("value")
    value_from_field = condition.get("value_from_field")

    # Prüfen ob das Feld existiert
    if field not in df.columns:
        logger.warning(
            f"Feld '{field}' existiert nicht im DataFrame. "
            f"Bedingung übersprungen."
        )
        return pl.Series("mask", [False] * df.height)

    col = pl.col(field)

    # Vergleichswert: fester Wert oder aus anderem Feld
    if value_from_field:
        compare_value = pl.col(value_from_field)
    else:
        compare_value = value

    # ── Operator-Mapping ──────────────────────────────────
    operator_map = {
        ">":    col > compare_value,
        "<":    col < compare_value,
        ">=":   col >= compare_value,
        "<=":   col <= compare_value,
        "==":   col == compare_value,
        "!=":   col != compare_value,
        "contains": col.cast(pl.Utf8).str.contains(
            str(compare_value), literal=True
        ),
        "not_contains": ~col.cast(pl.Utf8).str.contains(
            str(compare_value), literal=True
        ),
        "starts_with": col.cast(pl.Utf8).str.starts_with(
            str(compare_value)
        ),
        "ends_with": col.cast(pl.Utf8).str.ends_with(
            str(compare_value)
        ),
        "is_empty": (
            col.is_null() | (col.cast(pl.Utf8) == "")
        ),
        "is_not_empty": (
            col.is_not_null() & (col.cast(pl.Utf8) != "")
        ),
    }

    if operator not in operator_map:
        logger.error(f"Unbekannter Operator: '{operator}'")
        return pl.Series("mask", [False] * df.height)

    return df.select(operator_map[operator]).to_series()


def apply_rules(
    df: pl.DataFrame,
    rules_config: dict,
) -> pl.DataFrame:
    """
    Wendet alle aktivierten Regeln aus der rules.yaml auf den
    DataFrame an. Regeln werden nach Priorität sortiert und
    sequenziell abgearbeitet.

    Args:
        df: Polars DataFrame mit den angereicherten Produktdaten.
        rules_config: Geladene rules.yaml als Dictionary.

    Returns:
        Polars DataFrame nach Anwendung aller Regeln.
    """
    rules = rules_config.get("rules", [])
    settings = rules_config.get("settings", {})

    # Regeln nach Priorität sortieren
    # (niedrigere Zahl = höhere Priorität)
    rules_sorted = sorted(
        [r for r in rules if r.get("enabled", True)],
        key=lambda r: r.get("priority", 999),
    )

    logger.info(f"Wende {len(rules_sorted)} aktive Regeln an...")

    # ── Globale Ausschlüsse (Settings) ────────────────────
    if settings.get("exclude_out_of_stock", False):
        df = df.with_columns(
            pl.when(pl.col("stock_quantity") <= 0)
            .then(pl.lit(True))
            .otherwise(pl.col("_excluded"))
            .alias("_excluded")
        )
        excluded = df.filter(pl.col("_excluded")).height
        logger.info(
            f"Globaler Ausschluss (Bestand=0): "
            f"{excluded:,} Artikel markiert"
        )

    min_price = settings.get("minimum_price", 0)
    if min_price > 0:
        df = df.with_columns(
            pl.when(pl.col("VK_Brutto") < min_price)
            .then(pl.lit(True))
            .otherwise(pl.col("_excluded"))
            .alias("_excluded")
        )

    # ── Regeln sequenziell anwenden ───────────────────────
    for rule in rules_sorted:
        rule_name = rule.get("name", "Unbenannt")
        conditions_config = rule.get("conditions", {})
        actions = rule.get("actions", [])
        logic = conditions_config.get("logic", "and")
        sub_rules = conditions_config.get("rules", [])

        logger.debug(
            f"Verarbeite Regel: '{rule_name}' "
            f"(Priorität: {rule.get('priority', '?')})"
        )

        # Alle Teilbedingungen evaluieren
        masks = [
            evaluate_condition(df, cond) for cond in sub_rules
        ]

        if not masks:
            logger.warning(
                f"Regel '{rule_name}' hat keine Bedingungen. "
                f"Übersprungen."
            )
            continue

        # Logische Verknüpfung der Teilbedingungen
        if logic == "and":
            combined_mask = masks[0]
            for m in masks[1:]:
                combined_mask = combined_mask & m
        elif logic == "or":
            combined_mask = masks[0]
            for m in masks[1:]:
                combined_mask = combined_mask | m
        else:
            logger.error(
                f"Unbekannte Logik '{logic}' in Regel "
                f"'{rule_name}'. Übersprungen."
            )
            continue

        matching_count = combined_mask.sum()
        logger.info(
            f"Regel '{rule_name}': "
            f"{matching_count:,} Artikel treffen zu"
        )

        # Aktionen auf übereinstimmende Zeilen anwenden
        for action_def in actions:
            action_type = action_def.get("action", "set_value")
            target_field = action_def.get("target_field", "")

            if action_type == "set_value":
                value = action_def.get("value", "")
                df = df.with_columns(
                    pl.when(combined_mask)
                    .then(pl.lit(value))
                    .otherwise(pl.col(target_field))
                    .alias(target_field)
                )

            elif action_type == "copy_field":
                source = action_def.get("source_field", "")
                if source in df.columns:
                    df = df.with_columns(
                        pl.when(combined_mask)
                        .then(pl.col(source))
                        .otherwise(pl.col(target_field))
                        .alias(target_field)
                    )

            elif action_type == "prepend":
                template = action_def.get(
                    "value_template", ""
                )
                for col_name in df.columns:
                    placeholder = f"{{{col_name}}}"
                    if placeholder in template:
                        suffix = template.replace(
                            placeholder, ""
                        )
                        df = df.with_columns(
                            pl.when(combined_mask)
                            .then(
                                pl.col(col_name)
                                .cast(pl.Utf8)
                                + pl.lit(suffix)
                                + pl.col(target_field)
                                .cast(pl.Utf8)
                            )
                            .otherwise(pl.col(target_field))
                            .alias(target_field)
                        )
                        break

            elif action_type == "append":
                value = action_def.get("value", "")
                df = df.with_columns(
                    pl.when(combined_mask)
                    .then(
                        pl.col(target_field).cast(pl.Utf8)
                        + pl.lit(value)
                    )
                    .otherwise(pl.col(target_field))
                    .alias(target_field)
                )

            elif action_type == "exclude":
                reason = action_def.get(
                    "reason", "Durch Regel ausgeschlossen"
                )
                df = df.with_columns(
                    pl.when(combined_mask)
                    .then(pl.lit(True))
                    .otherwise(pl.col("_excluded"))
                    .alias("_excluded"),
                    pl.when(combined_mask)
                    .then(pl.lit(reason))
                    .otherwise(pl.col("_exclude_reason"))
                    .alias("_exclude_reason"),
                )

    # ── Statistik ─────────────────────────────────────────
    total = df.height
    excluded_count = df.filter(pl.col("_excluded")).height
    active_count = total - excluded_count
    logger.info(
        f"Transformation abgeschlossen: {total:,} Gesamt | "
        f"{active_count:,} Aktiv | "
        f"{excluded_count:,} Ausgeschlossen"
    )

    return df

⚠️ Hinweis: Dies ist ein konzeptionelles Beispiel-Skript. Die Regel-Engine ist bewusst einfach gehalten und muss für den Produktionseinsatz um Fehlerbehandlung, Validierung der Eingabedaten und weitere Operatoren erweitert werden. Die Feldnamen (EK_Netto, VK_Brutto, stock_quantity usw.) müssen mit den tatsächlichen Spalten Ihres JTL-Exports übereinstimmen.

6.4 Load-Modul (load.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/load.py

Modul für den Batch-Upload transformierter Produktdaten
an die Google Content API for Shopping.
Implementiert Rate-Limiting und exponentielles Backoff.
"""

import os
import json
import time
import polars as pl
from pathlib import Path
from datetime import datetime

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)

from scripts.utils import (
    setup_logging,
    load_yaml_config,
    CREDENTIALS_DIR,
    DATA_DIR,
    LOG_DIR,
    get_date_stamp,
)

logger = setup_logging("load")


# ══════════════════════════════════════════════════════════
# Konstanten
# ══════════════════════════════════════════════════════════
BATCH_SIZE = 1000         # Produkte pro Batch-Request
RATE_LIMIT_DELAY = 1.0    # Sekunden Pause zwischen Batches
MAX_RETRIES = 5           # Maximale Wiederholungsversuche
BACKOFF_MULTIPLIER = 2    # Exponentieller Backoff-Faktor


# ══════════════════════════════════════════════════════════
# Google Content API Client
# ══════════════════════════════════════════════════════════

def get_content_api_service():
    """
    Erstellt einen authentifizierten Google Content API Service.

    Returns:
        Google API Service-Objekt für Content API v2.1.
    """
    credentials_path = os.environ.get(
        "GOOGLE_APPLICATION_CREDENTIALS",
        str(CREDENTIALS_DIR / "google_service_account.json"),
    )

    credentials = service_account.Credentials.from_service_account_file(
        credentials_path,
        scopes=["https://www.googleapis.com/auth/content"],
    )

    service = build("content", "v2.1", credentials=credentials)
    logger.info("Google Content API Service erstellt.")
    return service


# ══════════════════════════════════════════════════════════
# Produkt-Payload-Erstellung
# ══════════════════════════════════════════════════════════

def build_product_payload(
    row: dict,
    column_mapping: dict,
    settings: dict,
) -> dict:
    """
    Erstellt den API-Payload für ein einzelnes Produkt nach
    dem Google Shopping Content API Schema.

    Args:
        row: Dictionary mit den Produktdaten einer Zeile.
        column_mapping: Spalten-Mapping (JTL → Google).
        settings: Globale Pipeline-Einstellungen.

    Returns:
        Dictionary im Google Content API Produktformat.
    """
    target_country = settings.get("target_country", "DE")
    content_language = settings.get("content_language", "de")
    currency = settings.get("currency", "EUR")
    mapping = column_mapping.get("mapping", {})

    product = {
        "offerId": str(row.get(
            mapping.get("Artikelnummer", "Artikelnummer"), ""
        )),
        "title": str(row.get("title", row.get(
            mapping.get("Artikelname", ""), ""
        ))),
        "description": str(row.get(
            mapping.get("Beschreibung", "Beschreibung"), ""
        )),
        "link": str(row.get(
            mapping.get("URL", "URL"), ""
        )),
        "imageLink": str(row.get(
            mapping.get("BildURL", "BildURL"), ""
        )),
        "contentLanguage": content_language,
        "targetCountry": target_country,
        "channel": "online",
        "availability": (
            "in stock"
            if row.get("stock_quantity", 0) > 0
            else "out of stock"
        ),
        "condition": "new",
        "price": {
            "value": str(row.get("VK_Brutto", "0.00")),
            "currency": currency,
        },
    }

    # Optionale Felder hinzufügen
    brand = row.get("Marke", row.get("brand", ""))
    if brand:
        product["brand"] = str(brand)

    gtin = row.get("GTIN", row.get("gtin", ""))
    if gtin:
        product["gtin"] = str(gtin)

    # Custom Labels (0-4)
    for i in range(5):
        label_value = row.get(f"custom_label_{i}", "")
        if label_value:
            product[f"customLabel{i}"] = str(label_value)

    # Sale-Preis
    sale_price = row.get("sale_price", "")
    if sale_price and float(sale_price) > 0:
        product["salePrice"] = {
            "value": str(sale_price),
            "currency": currency,
        }

    # Versandkosten
    shipping_cost = row.get("shipping_cost", "")
    if shipping_cost is not None and shipping_cost != "":
        product["shipping"] = [{
            "country": target_country,
            "price": {
                "value": str(shipping_cost),
                "currency": currency,
            },
        }]

    return product


# ══════════════════════════════════════════════════════════
# Batch-Upload mit Rate Limiting
# ══════════════════════════════════════════════════════════

@retry(
    stop=stop_after_attempt(MAX_RETRIES),
    wait=wait_exponential(
        multiplier=BACKOFF_MULTIPLIER, min=2, max=60
    ),
    retry=retry_if_exception_type(
        (HttpError, ConnectionError, TimeoutError)
    ),
    before_sleep=lambda retry_state: logger.warning(
        f"Retry {retry_state.attempt_number}/{MAX_RETRIES} "
        f"nach Fehler. "
        f"Warte {retry_state.next_action.sleep:.1f}s..."
    ),
)
def execute_batch_request(
    service,
    merchant_id: str,
    batch_entries: list[dict],
) -> dict:
    """
    Führt einen einzelnen Batch-Request an die Content API aus.
    Implementiert exponentielles Backoff bei Rate-Limit-Fehlern.

    Args:
        service: Google Content API Service.
        merchant_id: Merchant Center ID.
        batch_entries: Liste der Batch-Einträge.

    Returns:
        API-Antwort als Dictionary.
    """
    body = {"entries": batch_entries}

    response = (
        service.products()
        .custombatch(body=body)
        .execute()
    )

    return response


def upload_products_to_google(
    df: pl.DataFrame,
    batch_size: int = BATCH_SIZE,
    rate_limit_delay: float = RATE_LIMIT_DELAY,
) -> dict:
    """
    Lädt alle aktiven (nicht ausgeschlossenen) Produkte als
    Batch-Requests an die Google Content API hoch.

    Args:
        df: Polars DataFrame mit transformierten Produktdaten.
        batch_size: Anzahl der Produkte pro Batch-Request.
        rate_limit_delay: Pause in Sekunden zwischen Batches.

    Returns:
        Dictionary mit Upload-Statistiken.
    """
    merchant_id = os.environ.get("MERCHANT_CENTER_ID")
    if not merchant_id:
        raise ValueError("MERCHANT_CENTER_ID ist nicht gesetzt")

    # Konfiguration laden
    column_mapping = load_yaml_config("column_mapping.yaml")
    rules_config = load_yaml_config("rules.yaml")
    settings = rules_config.get("settings", {})

    # Nur aktive (nicht ausgeschlossene) Produkte uploaden
    df_active = df.filter(~pl.col("_excluded"))
    total_products = df_active.height

    logger.info(
        f"Starte Upload: {total_products:,} aktive Produkte "
        f"in Batches von {batch_size}"
    )

    # Service erstellen
    service = get_content_api_service()

    # ── Statistiken ───────────────────────────────────────
    stats = {
        "total": total_products,
        "success": 0,
        "errors": 0,
        "batches": 0,
        "start_time": datetime.now().isoformat(),
        "error_details": [],
    }

    # ── Produkte in Batches aufteilen und hochladen ───────
    rows_as_dicts = df_active.to_dicts()
    total_batches = (
        (total_products + batch_size - 1) // batch_size
    )

    for batch_start in range(0, total_products, batch_size):
        batch_end = min(batch_start + batch_size, total_products)
        batch_rows = rows_as_dicts[batch_start:batch_end]
        batch_number = (batch_start // batch_size) + 1

        logger.info(
            f"Batch {batch_number}/{total_batches}: "
            f"Produkte {batch_start + 1:,}  {batch_end:,}"
        )

        # Batch-Einträge erstellen
        batch_entries = []
        for idx, row in enumerate(batch_rows):
            product_payload = build_product_payload(
                row, column_mapping, settings
            )
            batch_entries.append({
                "batchId": batch_start + idx,
                "merchantId": merchant_id,
                "method": "insert",
                "product": product_payload,
            })

        # Batch-Request ausführen (mit Retry/Backoff)
        try:
            response = execute_batch_request(
                service, merchant_id, batch_entries
            )

            # Antwort auswerten
            for entry in response.get("entries", []):
                if (
                    "errors" in entry
                    and entry["errors"].get("errors")
                ):
                    stats["errors"] += 1
                    batch_idx = entry.get("batchId", 0)
                    local_idx = batch_idx - batch_start
                    offer_id = "unbekannt"
                    if 0 <= local_idx < len(batch_rows):
                        offer_id = batch_rows[local_idx].get(
                            "Artikelnummer", "unbekannt"
                        )
                    error_info = {
                        "batchId": batch_idx,
                        "offerId": offer_id,
                        "errors": entry["errors"]["errors"],
                    }
                    stats["error_details"].append(error_info)
                    logger.warning(
                        f"Fehler bei Produkt {offer_id}: "
                        f"{entry['errors']['errors']}"
                    )
                else:
                    stats["success"] += 1

        except HttpError as e:
            if e.resp.status == 429:
                logger.warning(
                    f"Rate Limit erreicht bei Batch "
                    f"{batch_number}. Erhöhe Wartezeit..."
                )
                time.sleep(rate_limit_delay * 5)
            else:
                logger.error(
                    f"HTTP-Fehler bei Batch "
                    f"{batch_number}: {e}"
                )
                stats["errors"] += len(batch_rows)

        except Exception as e:
            logger.error(
                f"Unerwarteter Fehler bei Batch "
                f"{batch_number}: {e}"
            )
            stats["errors"] += len(batch_rows)

        stats["batches"] += 1

        # ── Rate Limiting: Pause zwischen Batches ─────────
        if batch_end < total_products:
            logger.debug(
                f"Rate Limiting: {rate_limit_delay}s Pause..."
            )
            time.sleep(rate_limit_delay)

    # ── Upload-Statistiken loggen ─────────────────────────
    stats["end_time"] = datetime.now().isoformat()
    stats["duration_seconds"] = (
        datetime.fromisoformat(stats["end_time"])
        - datetime.fromisoformat(stats["start_time"])
    ).total_seconds()

    logger.info(
        f"Upload abgeschlossen: "
        f"{stats['success']:,} erfolgreich | "
        f"{stats['errors']:,} Fehler | "
        f"{stats['batches']} Batches | "
        f"{stats['duration_seconds']:.1f}s Gesamtdauer"
    )

    # ── Fehlerdetails als JSON speichern ──────────────────
    if stats["error_details"]:
        error_dir = LOG_DIR / "upload_errors"
        error_dir.mkdir(parents=True, exist_ok=True)
        error_file = (
            error_dir / f"errors_{get_date_stamp()}.json"
        )
        with open(error_file, "w", encoding="utf-8") as f:
            json.dump(
                stats["error_details"],
                f,
                indent=2,
                ensure_ascii=False,
            )
        logger.info(f"Fehlerdetails gespeichert: {error_file}")

    return stats

⚠️ Hinweis: Dies ist ein konzeptionelles Beispiel-Skript. Die Batch-Größe, Rate-Limiting-Werte und API-Payloads müssen an Ihre spezifischen API-Quotas und Produktdatenstrukturen angepasst werden. Testen Sie den Upload zunächst mit einer kleinen Teilmenge (z. B. 10 Produkte) im Testmodus der Google Content API, bevor Sie den vollständigen Datensatz hochladen.

6.5 Hauptskript (main.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/main.py

Hauptskript (Entry Point) für die ETL-Pipeline.
Orchestriert den gesamten Ablauf: Extract → Transform → Load.

Aufruf:
    cd /opt/etl-pipeline
    source venv/bin/activate
    python -m scripts.main
"""

import sys
import time
import duckdb
from pathlib import Path

from scripts.utils import (
    setup_logging,
    load_yaml_config,
    BASE_DIR,
    DATA_DIR,
    get_timestamp,
)
from scripts.extract import (
    load_jtl_csv_to_duckdb,
    fetch_google_sales_data,
)
from scripts.transform import (
    join_products_with_sales,
    add_calculated_fields,
    apply_rules,
)
from scripts.load import upload_products_to_google

logger = setup_logging("main")


def run_pipeline() -> dict:
    """
    Führt die vollständige ETL-Pipeline aus.

    Returns:
        Dictionary mit Statistiken aller drei Phasen.

    Raises:
        SystemExit: Bei kritischen Fehlern mit Exit-Code 1.
    """
    pipeline_start = time.time()
    run_id = get_timestamp()

    logger.info(f"{'═' * 60}")
    logger.info(f"ETL-Pipeline gestartet — Run-ID: {run_id}")
    logger.info(f"{'═' * 60}")

    stats = {
        "run_id": run_id,
        "extract": {},
        "transform": {},
        "load": {},
    }

    # ══════════════════════════════════════════════════════
    # DuckDB-Verbindung herstellen
    # ══════════════════════════════════════════════════════
    db_path = DATA_DIR / "db" / "products.duckdb"
    db_path.parent.mkdir(parents=True, exist_ok=True)

    con = duckdb.connect(str(db_path))
    logger.info(f"DuckDB-Verbindung hergestellt: {db_path}")

    try:
        # ══════════════════════════════════════════════════
        # PHASE 1: EXTRACT
        # ══════════════════════════════════════════════════
        logger.info(f"{'─' * 40}")
        logger.info("PHASE 1: EXTRACT")
        logger.info(f"{'─' * 40}")

        # JTL CSV laden
        jtl_rows = load_jtl_csv_to_duckdb(con)
        stats["extract"]["jtl_rows"] = jtl_rows

        # Google Sales-Daten abrufen
        sales_rows = fetch_google_sales_data(con)
        stats["extract"]["sales_rows"] = sales_rows

        # ══════════════════════════════════════════════════
        # PHASE 2: TRANSFORM
        # ══════════════════════════════════════════════════
        logger.info(f"{'─' * 40}")
        logger.info("PHASE 2: TRANSFORM")
        logger.info(f"{'─' * 40}")

        # LEFT JOIN: Produkte ⟕ Sales
        df = join_products_with_sales(con)

        # Berechnete Felder hinzufügen
        df = add_calculated_fields(df)

        # YAML-Regeln anwenden
        rules_config = load_yaml_config("rules.yaml")
        df = apply_rules(df, rules_config)

        stats["transform"]["total_products"] = df.height
        stats["transform"]["active_products"] = df.filter(
            ~df["_excluded"]
        ).height
        stats["transform"]["excluded_products"] = df.filter(
            df["_excluded"]
        ).height

        # ══════════════════════════════════════════════════
        # PHASE 3: LOAD
        # ══════════════════════════════════════════════════
        logger.info(f"{'─' * 40}")
        logger.info("PHASE 3: LOAD")
        logger.info(f"{'─' * 40}")

        upload_stats = upload_products_to_google(df)
        stats["load"] = upload_stats

    except Exception as e:
        logger.critical(
            f"Pipeline fehlgeschlagen: {e}", exc_info=True
        )
        # Exit-Code 1 signalisiert n8n einen Fehler
        sys.exit(1)

    finally:
        con.close()
        logger.info("DuckDB-Verbindung geschlossen.")

    # ══════════════════════════════════════════════════════
    # Zusammenfassung
    # ══════════════════════════════════════════════════════
    pipeline_duration = time.time() - pipeline_start
    stats["duration_seconds"] = round(pipeline_duration, 2)

    logger.info(f"{'═' * 60}")
    logger.info(
        f"ETL-Pipeline abgeschlossen — "
        f"Dauer: {pipeline_duration:.1f}s"
    )
    logger.info(
        f"  Extract:   "
        f"{stats['extract'].get('jtl_rows', 0):,} JTL, "
        f"{stats['extract'].get('sales_rows', 0):,} Sales"
    )
    logger.info(
        f"  Transform: "
        f"{stats['transform'].get('active_products', 0):,} aktiv, "
        f"{stats['transform'].get('excluded_products', 0):,} "
        f"ausgeschlossen"
    )
    logger.info(
        f"  Load:      "
        f"{stats['load'].get('success', 0):,} OK, "
        f"{stats['load'].get('errors', 0):,} Fehler"
    )
    logger.info(f"{'═' * 60}")

    return stats


# ──────────────────────────────────────────────────────────
# Entry Point
# ──────────────────────────────────────────────────────────
if __name__ == "__main__":
    run_pipeline()

⚠️ Hinweis: Dies ist ein konzeptionelles Beispiel-Skript. Es dient als Ausgangspunkt und Entry Point für die Pipeline. Vor dem produktiven Einsatz sollten alle Module gründlich mit Testdaten validiert und die Fehlerbehandlung an Ihre Anforderungen angepasst werden.


7. n8n Workflow Integration

7.1 Übersicht

Der n8n Workflow übernimmt zwei zentrale Aufgaben:

  1. Scheduling: Tägliche Ausführung der Python-Pipeline um 00:00 Uhr (Mitternacht).
  2. Error Handling: Benachrichtigung per E-Mail (oder Slack/Telegram) bei Fehlern.

Die Kommunikation zwischen n8n (Docker-Container) und der Python-Pipeline auf dem Host-System erfolgt über den n8n-Node Execute Command, der Shell-Befehle im Container ausführt. Da das Pipeline-Verzeichnis als Volume gemountet ist, hat n8n Zugriff auf die Skripte.

7.2 Workflow-Architektur

graph TD
    CRON[Cron Trigger - 00:00 Uhr]
    EXEC[Execute Command - python -m scripts.main]
    CHECK{Exit-Code pruefen}
    SUCCESS[Erfolgs-Meldung - Log + E-Mail]
    ERROR[Fehler-Alarm - E-Mail / Slack]

    CRON --> EXEC
    EXEC --> CHECK
    CHECK -->|Exit 0 - OK| SUCCESS
    CHECK -->|Exit 1 - Fehler| ERROR

    style CRON fill:#e58e26,stroke:#fa983a,color:#fff
    style EXEC fill:#0c2461,stroke:#1e3799,color:#fff
    style CHECK fill:#474787,stroke:#706fd3,color:#fff
    style SUCCESS fill:#079992,stroke:#38ada9,color:#fff
    style ERROR fill:#b71540,stroke:#e55039,color:#fff

7.3 Workflow als n8n-JSON importieren

Der folgende JSON-Export kann direkt in n8n importiert werden unter Workflow → Import from File. Speichern Sie den Inhalt als .json-Datei.

{
  "name": "ETL Pipeline - Google Shopping (Daily)",
  "nodes": [
    {
      "parameters": {
        "rule": {
          "interval": [
            {
              "triggerAtHour": 0,
              "triggerAtMinute": 0
            }
          ]
        }
      },
      "name": "Cron Trigger (00:00)",
      "type": "n8n-nodes-base.scheduleTrigger",
      "typeVersion": 1.2,
      "position": [240, 300]
    },
    {
      "parameters": {
        "command": "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main 2>&1",
        "timeout": 3600
      },
      "name": "Execute ETL Pipeline",
      "type": "n8n-nodes-base.executeCommand",
      "typeVersion": 1,
      "position": [480, 300]
    },
    {
      "parameters": {
        "conditions": {
          "number": [
            {
              "value1": "={{ $json.exitCode }}",
              "operation": "equal",
              "value2": 0
            }
          ]
        }
      },
      "name": "Check Exit Code",
      "type": "n8n-nodes-base.if",
      "typeVersion": 1,
      "position": [720, 300]
    },
    {
      "parameters": {
        "fromEmail": "etl-pipeline@ihre-domain.de",
        "toEmail": "admin@ihre-domain.de",
        "subject": "✅ ETL Pipeline — Erfolg ({{ $now.format('yyyy-MM-dd') }})",
        "text": "Die ETL-Pipeline wurde erfolgreich ausgeführt.\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\n\nStdout:\n{{ $json.stdout }}"
      },
      "name": "Success E-Mail",
      "type": "n8n-nodes-base.emailSend",
      "typeVersion": 2,
      "position": [960, 200]
    },
    {
      "parameters": {
        "fromEmail": "etl-pipeline@ihre-domain.de",
        "toEmail": "admin@ihre-domain.de",
        "subject": "❌ ETL Pipeline — FEHLER ({{ $now.format('yyyy-MM-dd') }})",
        "text": "ACHTUNG: Die ETL-Pipeline ist fehlgeschlagen!\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\nExit-Code: {{ $json.exitCode }}\n\nStderr:\n{{ $json.stderr }}\n\nStdout:\n{{ $json.stdout }}\n\nBitte prüfen: /opt/etl-pipeline/logs/error.log"
      },
      "name": "Error E-Mail",
      "type": "n8n-nodes-base.emailSend",
      "typeVersion": 2,
      "position": [960, 400]
    }
  ],
  "connections": {
    "Cron Trigger (00:00)": {
      "main": [
        [{ "node": "Execute ETL Pipeline", "type": "main", "index": 0 }]
      ]
    },
    "Execute ETL Pipeline": {
      "main": [
        [{ "node": "Check Exit Code", "type": "main", "index": 0 }]
      ]
    },
    "Check Exit Code": {
      "main": [
        [{ "node": "Success E-Mail", "type": "main", "index": 0 }],
        [{ "node": "Error E-Mail", "type": "main", "index": 0 }]
      ]
    }
  },
  "settings": {
    "timezone": "Europe/Berlin",
    "saveExecutionProgress": true,
    "saveManualExecutions": true,
    "executionTimeout": 7200
  }
}

⚠️ Hinweis: Dies ist ein Beispiel-Workflow für n8n. Die E-Mail-Adressen, SMTP-Einstellungen und Timeouts müssen an Ihre Umgebung angepasst werden. Konfigurieren Sie die SMTP-Credentials in n8n unter Settings → Credentials bevor Sie den Workflow aktivieren.

7.4 Wichtige n8n-Konfigurationshinweise

Timeout-Einstellung

Da die Verarbeitung von 200.000+ Artikeln je nach Serverhardware zwischen 5 und 30 Minuten dauern kann, muss der Timeout im Execute Command-Node ausreichend hoch gesetzt werden:

timeout: 3600   # 1 Stunde (in Sekunden)

Volume-Mount beachten

Der Execute Command-Node führt Befehle innerhalb des n8n Docker-Containers aus. Damit die Python-Skripte und das Virtual Environment erreichbar sind, muss der Pfad /opt/etl-pipeline als Volume im docker-compose.yml gemountet sein (siehe Abschnitt 3.1.2).

Alternative: Host-Befehl über SSH

Falls die Pipeline nicht innerhalb des Containers laufen soll, kann alternativ ein SSH-Node verwendet werden, der den Befehl auf dem Host ausführt:

# Befehl für n8n SSH-Node:
ssh pipeline-user@localhost \
    "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main"

⚠️ Hinweis: Beispielbefehl. Für die SSH-Variante muss ein SSH-Key ohne Passphrase konfiguriert und die SSH-Credentials in n8n hinterlegt werden.

7.5 Manueller Pipeline-Start (Debugging)

Für Debugging und Tests kann die Pipeline jederzeit manuell gestartet werden:

# ──────────────────────────────────────────────────────────
# Pipeline manuell ausführen
# ──────────────────────────────────────────────────────────
cd /opt/etl-pipeline
source venv/bin/activate
python -m scripts.main

# Alternativ: Nur den Exit-Code prüfen
python -m scripts.main; echo "Exit-Code: $?"

⚠️ Hinweis: Beispielbefehle für den manuellen Start. Stellen Sie sicher, dass die Umgebungsvariablen (GOOGLE_APPLICATION_CREDENTIALS, MERCHANT_CENTER_ID) in der Shell-Session gesetzt sind.


8. Wartung & Monitoring

8.1 Log-Management

8.1.1 Log-Struktur

Die Pipeline erzeugt folgende Log-Dateien:

Datei Inhalt Rotation
logs/etl_pipeline.log Alle Log-Einträge (INFO+) 10 MB, 5 Backups
logs/error.log Nur Fehler (ERROR+) 10 MB, 5 Backups
logs/upload_errors/errors_YYYYMMDD.json Detail-Fehler pro Upload-Lauf Täglich, eine Datei

8.1.2 Logrotate-Konfiguration

Zusätzlich zur integrierten Python-Rotation empfiehlt sich eine systemweite Logrotate-Konfiguration als Sicherheitsnetz.

Erstellen Sie die Datei /etc/logrotate.d/etl-pipeline:

/opt/etl-pipeline/logs/*.log {
    daily
    missingok
    rotate 30
    compress
    delaycompress
    notifempty
    create 0640 root root
    sharedscripts
    postrotate
        # Optional: Signal an laufende Prozesse
        /bin/true
    endscript
}

/opt/etl-pipeline/logs/upload_errors/*.json {
    daily
    missingok
    rotate 90
    compress
    delaycompress
    notifempty
    create 0640 root root
}

⚠️ Hinweis: Dies ist eine Beispiel-Logrotate-Konfiguration. Passen Sie den Benutzer, die Aufbewahrungsdauer (rotate) und die Dateiberechtigungen an Ihre Sicherheitsrichtlinien an.

8.1.3 Log-Bereinigung (Cron)

Alte Log-Dateien und Upload-Fehlerprotokolle können zusätzlich per Cron bereinigt werden:

# ──────────────────────────────────────────────────────────
# Crontab-Eintrag: Logs älter als 90 Tage löschen
# ──────────────────────────────────────────────────────────
# crontab -e
0 3 * * 0 find /opt/etl-pipeline/logs/ -name "*.log.*" -mtime +90 -delete
0 3 * * 0 find /opt/etl-pipeline/logs/upload_errors/ -name "*.json" -mtime +90 -delete
0 3 * * 0 find /opt/etl-pipeline/data/archive/ -name "*.csv" -mtime +180 -delete

⚠️ Hinweis: Beispiel-Crontab-Einträge. Passen Sie die Aufbewahrungsfristen (hier: 90 bzw. 180 Tage) an Ihre Compliance-Anforderungen an.

8.2 Performance-Tipps für 200.000+ Artikel

8.2.1 DuckDB-Optimierungen

# ──────────────────────────────────────────────────────────
# DuckDB Performance-Konfiguration
# (am Anfang der Pipeline setzen)
# ──────────────────────────────────────────────────────────

con = duckdb.connect("products.duckdb")

# Verfügbaren Arbeitsspeicher für DuckDB erhöhen
con.execute("SET memory_limit = '4GB'")

# Alle verfügbaren CPU-Kerne nutzen
con.execute("SET threads TO 8")

# Temporäre Dateien auf schnelle SSD legen
con.execute("SET temp_directory = '/opt/etl-pipeline/data/tmp'")

# Fortschrittsanzeige für lange Queries
con.execute("SET enable_progress_bar = true")

⚠️ Hinweis: Beispiel-Konfiguration. Die Werte für memory_limit und threads müssen an die tatsächliche Serverausstattung angepasst werden.

8.2.2 Polars-Optimierungen

# ──────────────────────────────────────────────────────────
# Polars Performance-Tipps
# ──────────────────────────────────────────────────────────

import polars as pl

# 1. Lazy Evaluation verwenden (bei sehr großen Datensätzen)
#    Polars optimiert den Query-Plan automatisch
df_lazy = pl.scan_csv(
    "data/input/jtl_export.csv", separator=";"
)
df_result = (
    df_lazy
    .filter(pl.col("Lagerbestand") > 0)
    .with_columns([
        pl.col("VK_Brutto").cast(pl.Float64),
    ])
    .collect()  # Erst hier werden die Daten materialisiert
)

# 2. Streaming-Modus für Datensätze > verfügbares RAM
df_result = (
    df_lazy
    .filter(pl.col("Lagerbestand") > 0)
    .collect(streaming=True)  # Verarbeitet in Chunks
)

# 3. Datentypen optimieren (reduziert RAM-Verbrauch)
df = df.with_columns([
    pl.col("Lagerbestand").cast(pl.Int32),        # statt Int64
    pl.col("clicks").cast(pl.Int32),               # statt Int64
    pl.col("impressions").cast(pl.Int32),           # statt Int64
    pl.col("custom_label_0").cast(pl.Categorical), # statt Utf8
    pl.col("custom_label_1").cast(pl.Categorical), # statt Utf8
])

# 4. Spaltenauswahl: Nur benötigte Spalten laden
df = pl.read_csv(
    "data.csv",
    columns=["sku", "title", "price", "stock"],
)

⚠️ Hinweis: Dies sind Beispiel-Codeausschnitte zur Performance-Optimierung. Die tatsächlichen Spaltennamen und Datentypen müssen an Ihren Datensatz angepasst werden.

8.2.3 Google API Upload-Optimierungen

Parameter Empfehlung Begründung
BATCH_SIZE 1.000 Google empfiehlt max. 1.000 Einträge pro Batch
RATE_LIMIT_DELAY 1,0 2,0 Sekunden Verhindert 429-Fehler bei Standard-Quotas
MAX_RETRIES 5 Ausreichend für temporäre Fehler
Parallele Requests Nicht empfohlen Erhöht 429-Risiko, sequenziell ist sicherer
Geschätzte Upload-Dauer 10 20 Minuten Bei 200 Batches × 1s Delay + API-Latenz

8.2.4 Allgemeine Server-Monitoring-Befehle

# ──────────────────────────────────────────────────────────
# System-Monitoring während Pipeline-Lauf
# ──────────────────────────────────────────────────────────

# RAM- und CPU-Auslastung in Echtzeit beobachten
htop

# Festplatten-I/O überwachen
iostat -x 5

# Pipeline-Prozess gezielt überwachen
watch -n 2 "ps aux | grep 'python.*scripts.main' | grep -v grep"

# Festplattennutzung des Pipeline-Verzeichnisses
du -sh /opt/etl-pipeline/data/*

# DuckDB-Dateigröße prüfen
ls -lh /opt/etl-pipeline/data/db/products.duckdb

⚠️ Hinweis: Beispielbefehle für das System-Monitoring. Diese dienen zur Orientierung und sollten je nach Ihren Monitoring-Tools (z. B. Prometheus, Grafana, Zabbix) ergänzt oder ersetzt werden.

8.3 Backup-Empfehlungen

Die folgenden Dateien und Verzeichnisse sollten regelmäßig gesichert werden:

Pfad Priorität Frequenz
/opt/etl-pipeline/config/ Hoch Bei Änderung
/opt/etl-pipeline/credentials/ Kritisch Bei Änderung
/opt/etl-pipeline/scripts/ Hoch Bei Änderung
/opt/etl-pipeline/data/db/products.duckdb Mittel Täglich
/opt/n8n/data/ Hoch Wöchentlich
#!/bin/bash
# ──────────────────────────────────────────────────────────
# Einfaches Backup-Skript (Beispiel)
# ──────────────────────────────────────────────────────────
BACKUP_DIR="/backup/etl-pipeline/$(date +%Y%m%d)"
mkdir -p "$BACKUP_DIR"

# Konfiguration und Skripte sichern
tar czf "$BACKUP_DIR/config_scripts.tar.gz" \
    /opt/etl-pipeline/config/ \
    /opt/etl-pipeline/scripts/ \
    /opt/etl-pipeline/requirements.txt

# DuckDB sichern
cp /opt/etl-pipeline/data/db/products.duckdb \
   "$BACKUP_DIR/products.duckdb"

# n8n Workflows sichern
tar czf "$BACKUP_DIR/n8n_data.tar.gz" /opt/n8n/data/

echo "Backup erstellt: $BACKUP_DIR"

⚠️ Hinweis: Dies ist ein vereinfachtes Beispiel-Backup-Skript. Für den Produktionseinsatz sollte eine vollwertige Backup-Lösung (z. B. BorgBackup, Restic) mit Verschlüsselung und Off-Site-Speicherung verwendet werden.

8.4 Checkliste für den Produktivbetrieb

Vor der Aktivierung der automatisierten Pipeline sollten folgende Punkte abgearbeitet sein:

  • Python Virtual Environment erstellt und alle Abhängigkeiten installiert
  • DuckDB-Datenbank initialisiert und CSV-Import getestet
  • Google Service Account erstellt und in Merchant Center eingeladen
  • GOOGLE_APPLICATION_CREDENTIALS und MERCHANT_CENTER_ID als Umgebungsvariablen gesetzt
  • rules.yaml mit initialen Regeln konfiguriert
  • column_mapping.yaml an JTL-Export-Spalten angepasst
  • Pipeline manuell mit Testdaten durchgelaufen (alle 3 Phasen)
  • n8n Workflow importiert und Cron-Trigger konfiguriert
  • E-Mail-/Slack-Benachrichtigung bei Fehlern getestet
  • Logrotate-Konfiguration aktiv
  • Backup-Strategie implementiert
  • Firewall-Regeln: Ausgehender Traffic zu Google APIs erlaubt
  • Monitoring-Dashboard eingerichtet (optional)

Anhang

A. Nützliche Befehle (Kurzreferenz)

# ── Pipeline ──────────────────────────────────────────────
cd /opt/etl-pipeline && source venv/bin/activate
python -m scripts.main                    # Pipeline starten
python -m scripts.main 2>&1 | tee run.log # Mit Log-Ausgabe

# ── n8n ───────────────────────────────────────────────────
cd /opt/n8n
docker compose up -d                      # Starten
docker compose down                       # Stoppen
docker compose logs -f --tail=100         # Logs anzeigen
docker compose restart                    # Neustart

# ── DuckDB CLI ────────────────────────────────────────────
cd /opt/etl-pipeline && source venv/bin/activate
python -c "
import duckdb
con = duckdb.connect('data/db/products.duckdb')
print(con.execute('SHOW TABLES').fetchall())
print(con.execute('SELECT COUNT(*) FROM products_raw').fetchone())
con.close()
"

# ── Logs prüfen ──────────────────────────────────────────
tail -f /opt/etl-pipeline/logs/etl_pipeline.log
tail -100 /opt/etl-pipeline/logs/error.log
cat /opt/etl-pipeline/logs/upload_errors/errors_$(date +%Y%m%d).json | jq .

# ── Disk-Usage ────────────────────────────────────────────
du -sh /opt/etl-pipeline/data/db/*
du -sh /opt/etl-pipeline/logs/*
df -h /opt/etl-pipeline/

⚠️ Hinweis: Beispiel-Befehlsreferenz. Alle Pfade und Befehle müssen an Ihre Serverumgebung angepasst werden.

B. Fehlerbehandlung (Häufige Probleme)

Problem Mögliche Ursache Lösung
FileNotFoundError: jtl_export.csv CSV nicht am erwarteten Pfad JTL-Export-Pfad in settings.yaml prüfen
google.auth.exceptions.DefaultCredentialsError Credentials nicht gefunden GOOGLE_APPLICATION_CREDENTIALS prüfen
HttpError 429: Too Many Requests API-Quote überschritten RATE_LIMIT_DELAY erhöhen (z. B. auf 23s)
MemoryError bei Polars RAM nicht ausreichend Polars Streaming-Modus oder RAM aufstocken
n8n: Timeout nach 600s Pipeline dauert zu lang Timeout im Execute Command auf 3600s erhöhen
DuckDB: IO Error Festplatte voll Archiv- und Temp-Dateien bereinigen
ModuleNotFoundError venv nicht aktiviert source venv/bin/activate vor Ausführung

Dokumentation erstellt am 2026-03-06 | Version 1.0.0 Letzte Aktualisierung: 2026-03-06