From 0e3b3b020378d1978854dcb4afd6ca38a66a6965 Mon Sep 17 00:00:00 2001 From: Administrator Date: Fri, 6 Mar 2026 11:09:11 +0000 Subject: [PATCH] docs: create IT_Abteilung/Automatisierung/Channable_stack --- .../Automatisierung/Channable_stack.md | 2847 +++++++++++++++++ 1 file changed, 2847 insertions(+) create mode 100644 IT_Abteilung/Automatisierung/Channable_stack.md diff --git a/IT_Abteilung/Automatisierung/Channable_stack.md b/IT_Abteilung/Automatisierung/Channable_stack.md new file mode 100644 index 0000000..5ec290a --- /dev/null +++ b/IT_Abteilung/Automatisierung/Channable_stack.md @@ -0,0 +1,2847 @@ +--- +title: Channable Stack LT24 Profice Dokumentation +description: +published: true +date: 2026-03-06T11:09:09.239Z +tags: +editor: markdown +dateCreated: 2026-03-06T11:09:09.239Z +--- + +# Self-Hosted ETL-Pipeline für Google Shopping + +## Lokale Alternative zu Channable — Technische Dokumentation + +--- + +| Eigenschaft | Wert | +|--------------------|------------------------------------------------| +| **Dokumentversion** | 1.0.0 | +| **Erstellt am** | 2026-03-06 | +| **Zielplattform** | Debian 12 (Bookworm) Server | +| **Tech-Stack** | Python 3.11+ · Polars · DuckDB · n8n (Docker) | +| **Datensatzgröße** | 200.000+ Artikel, 30+ Attribute pro Artikel | +| **Ziel-API** | Google Content API for Shopping | +| **Sprache** | Deutsch | + +--- + +## Inhaltsverzeichnis + +1. [Systemübersicht](#1-systemübersicht) +2. [Voraussetzungen](#2-voraussetzungen) +3. [Installation & Setup](#3-installation--setup) +4. [Projektstruktur](#4-projektstruktur) +5. [Konfiguration](#5-konfiguration) +6. [Core Script Logic (Beispiel)](#6-core-script-logic-beispiel) +7. [n8n Workflow Integration](#7-n8n-workflow-integration) +8. [Wartung & Monitoring](#8-wartung--monitoring) + +--- + +# 1. Systemübersicht + +## 1.1 Architektur-Überblick + +Die Pipeline ersetzt den kommerziellen SaaS-Dienst **Channable** durch eine vollständig selbst gehostete, Open-Source-basierte Lösung auf einem dedizierten Debian-Server. Der gesamte Datenfluss folgt dem klassischen **ETL-Muster** (Extract → Transform → Load) und wird täglich automatisiert durch **n8n** orchestriert. + +### Architektur-Diagramm + +```mermaid +graph TB + subgraph SERVER["🖥️ DEBIAN SERVER — Docker-Host"] + direction TB + + subgraph N8N["⚙️ n8n — Docker Container"] + CRON["⏰ Cron-Trigger
täglich 00:00 Uhr"] + ERR_HANDLER["🔔 Error-Handler
E-Mail / Slack Alert"] + end + + subgraph PYTHON["🐍 PYTHON 3.11+ — Virtual Environment"] + direction LR + + subgraph EXTRACT["📥 EXTRACT"] + JTL["📄 JTL-WAWI
CSV-Export"] + GAPI["☁️ Google Ads/GA
API — 90 Tage"] + end + + subgraph TRANSFORM["🔄 TRANSFORM"] + DUCKDB_JOIN["🦆 DuckDB
LEFT JOIN
Produkte ⟕ Sales"] + POLARS["⚡ Polars
Regel-Engine
rules.yaml"] + end + + subgraph LOAD["📤 LOAD"] + GCONTENT["🛒 Google
Content API
Batch-Upload
mit Rate-Limiting
"] + end + end + + DUCKDB_FILE[("💾 DuckDB
products.duckdb")] + LOGS["📋 Logs & Monitoring
/opt/etl-pipeline/logs/"] + end + + CRON -->|"startet Pipeline"| JTL + CRON -->|"startet Pipeline"| GAPI + JTL -->|"CSV Import"| DUCKDB_JOIN + GAPI -->|"Sales-Daten"| DUCKDB_JOIN + DUCKDB_JOIN -->|"DataFrame"| POLARS + POLARS -->|"transformierte Daten"| GCONTENT + DUCKDB_JOIN -.->|"persistiert"| DUCKDB_FILE + ERR_HANDLER -.->|"bei Exit-Code 1"| LOGS + + style SERVER fill:#1a1a2e,stroke:#16213e,color:#e0e0e0 + style N8N fill:#2d3436,stroke:#636e72,color:#dfe6e9 + style PYTHON fill:#2d3436,stroke:#636e72,color:#dfe6e9 + style EXTRACT fill:#0a3d62,stroke:#3c6382,color:#dfe6e9 + style TRANSFORM fill:#1e3799,stroke:#4a69bd,color:#dfe6e9 + style LOAD fill:#6a0572,stroke:#a834a8,color:#dfe6e9 + style JTL fill:#079992,stroke:#38ada9,color:#fff + style GAPI fill:#079992,stroke:#38ada9,color:#fff + style DUCKDB_JOIN fill:#0c2461,stroke:#1e3799,color:#fff + style POLARS fill:#0c2461,stroke:#1e3799,color:#fff + style GCONTENT fill:#6a0572,stroke:#a834a8,color:#fff + style DUCKDB_FILE fill:#b71540,stroke:#e55039,color:#fff + style LOGS fill:#474787,stroke:#706fd3,color:#fff + style CRON fill:#e58e26,stroke:#fa983a,color:#fff + style ERR_HANDLER fill:#e55039,stroke:#eb2f06,color:#fff +``` + +### ETL-Datenfluss (vereinfacht) + +```mermaid +flowchart LR + A["📄 JTL CSV
200k+ Artikel"] --> C["🦆 DuckDB
LEFT JOIN"] + B["☁️ Google API
Sales 90 Tage"] --> C + C --> D["⚡ Polars
rules.yaml
Custom Labels
Titel-Optimierung
Preis-Logik
Ausschlüsse
"] + D --> E["🛒 Google Content API
Batch-Upload
Rate-Limiting"] + + style A fill:#079992,stroke:#38ada9,color:#fff + style B fill:#079992,stroke:#38ada9,color:#fff + style C fill:#0c2461,stroke:#1e3799,color:#fff + style D fill:#1e3799,stroke:#4a69bd,color:#fff + style E fill:#6a0572,stroke:#a834a8,color:#fff +``` + +## 1.2 Datenfluss im Detail + +Der Datenfluss gliedert sich in drei klar voneinander getrennte Phasen: + +### Phase 1 — Extract (Datenextraktion) + +In der Extract-Phase werden die Rohdaten aus zwei unabhängigen Quellen bezogen: + +**Quelle A — JTL-WAWI CSV-Export:** Die ERP-Software JTL-WAWI exportiert den vollständigen Produktkatalog als CSV-Datei. Diese Datei enthält alle Stammdaten wie Artikelnummer, Titel, Beschreibung, Preis, EAN, Bestand, Kategorie, Bilder-URLs und weitere produktspezifische Attribute. Der Export wird automatisch durch einen JTL-Worker oder manuell über den JTL-Ameise-Export auf dem Server abgelegt. + +**Quelle B — Google Ads/Analytics Reporting API:** Über die Google API werden die Verkaufsperformance-Daten der letzten 90 Tage abgerufen. Dazu gehören Metriken wie Verkaufsanzahl pro Artikel (`sales_quantity`), Umsatz pro Artikel (`revenue`), Klicks, Impressionen und Conversion-Rate. Diese Daten dienen als Grundlage für die regelbasierte Klassifizierung (z. B. „Bestseller"-Labels). + +Beide Datenquellen werden als separate Tabellen in **DuckDB** importiert. + +### Phase 2 — Transform (Datentransformation) + +In der Transform-Phase werden die Daten verknüpft und angereichert: + +Zunächst wird in **DuckDB** ein `LEFT JOIN` zwischen der Produktstammdaten-Tabelle und der Verkaufsdaten-Tabelle durchgeführt. Die Verknüpfung erfolgt über eine gemeinsame Artikelnummer (z. B. `sku` oder `offer_id`). Das Ergebnis ist eine konsolidierte Tabelle, die sowohl Stamm- als auch Performance-Daten enthält. + +Der resultierende DataFrame wird anschließend in **Polars** geladen. Hier werden regelbasierte Transformationen auf Grundlage der Datei `rules.yaml` angewandt. Typische Transformationen umfassen: + +- Zuweisung von Custom Labels (`custom_label_0` bis `custom_label_4`) basierend auf Verkaufszahlen, Marge oder Kategorie. +- Preisanpassungen und Sale-Preis-Logik. +- Titeloptimierung (z. B. Hinzufügen von Marke oder Farbe). +- Filterung von Artikeln, die nicht auf Google Shopping erscheinen sollen (z. B. Bestand = 0). +- Anreicherung mit berechneten Feldern (z. B. `margin_percent`, `performance_tier`). + +### Phase 3 — Load (Daten-Upload) + +Die transformierten Produktdaten werden über die **Google Content API for Shopping** als Batch-Request an das Google Merchant Center übertragen. Der Upload-Prozess berücksichtigt dabei die API-Quotas und implementiert ein Rate-Limiting mit exponentiellem Backoff, um `429 Too Many Requests`-Fehler zu vermeiden. Fehlerhafte Einzelprodukte werden in einer separaten Fehler-Log-Datei protokolliert und blockieren nicht den gesamten Upload. + +--- + +# 2. Voraussetzungen + +## 2.1 Hardware-Anforderungen + +Die folgenden Spezifikationen gelten für die zuverlässige Verarbeitung von **200.000+ Artikeln** mit jeweils **30+ Attributen**. Die Werte sind konservativ kalkuliert und berücksichtigen die parallele Ausführung von n8n (Docker) und der Python-Pipeline. + +| Ressource | Minimum | Empfohlen | Begründung | +|-------------------|----------------------|------------------------|--------------------------------------------------------------| +| **CPU** | 4 Kerne (x86_64) | 8+ Kerne (x86_64) | Polars nutzt Multi-Threading automatisch aus | +| **RAM** | 8 GB | 16 GB | DuckDB + Polars halten Daten im Arbeitsspeicher | +| **Festplatte** | 40 GB SSD | 100 GB NVMe SSD | DuckDB-Dateien + CSV-Exporte + Logs + Docker-Images | +| **Netzwerk** | 100 Mbit/s | 1 Gbit/s | Google API Batch-Uploads erfordern stabile Bandbreite | +| **OS** | Debian 11 (Bullseye) | Debian 12 (Bookworm) | Aktuelle LTS-Basis mit langfristigem Sicherheits-Support | + +### Speicherberechnung (Orientierungswerte) + +Für eine Kalkulation mit 200.000 Zeilen × 30 Spalten (durchschnittlich ~100 Bytes pro Zelle): + +| Komponente | Geschätzter Bedarf | +|--------------------------------------|-------------------------| +| Rohdaten CSV auf Disk | ca. 550 – 650 MB | +| DuckDB komprimiert (auf Disk) | ca. 150 – 250 MB | +| Polars DataFrame im RAM | ca. 800 MB – 1,2 GB | +| Peak Memory (JOIN + Transformation) | ca. 2,5 – 4 GB | +| n8n Docker Container | ca. 300 – 500 MB RAM | +| **Gesamt Peak-RAM** | **ca. 4 – 6 GB** | + +> **Empfehlung:** Bei Datensätzen über 500.000 Artikeln sollte auf 32 GB RAM und Streaming-Verarbeitung (Polars Lazy-Mode) umgestellt werden. + +## 2.2 Erforderliche Debian-Pakete + +Die folgenden Pakete müssen auf dem Debian-Server installiert sein, bevor die Pipeline eingerichtet wird. + +```bash +# ────────────────────────────────────────────────────────── +# System-Updates durchführen +# ────────────────────────────────────────────────────────── +sudo apt update && sudo apt upgrade -y + +# ────────────────────────────────────────────────────────── +# Grundlegende Build-Tools und System-Abhängigkeiten +# ────────────────────────────────────────────────────────── +sudo apt install -y \ + build-essential \ + python3.11 \ + python3.11-venv \ + python3.11-dev \ + python3-pip \ + curl \ + wget \ + git \ + jq \ + unzip \ + ca-certificates \ + gnupg \ + lsb-release \ + cron \ + logrotate \ + htop \ + tree +``` + +> ⚠️ **Hinweis:** Dies sind Beispielbefehle. Falls Python 3.11 nicht in den Standard-Repositories Ihrer Debian-Version verfügbar ist, muss es über das `deadsnakes`-PPA, einen Quellcode-Build oder das Paket `python3` (in Debian 12 standardmäßig Python 3.11) installiert werden. Passen Sie die Versionsnummern an Ihre Umgebung an. + +## 2.3 Software-Voraussetzungen + +| Software | Mindestversion | Zweck | +|---------------------------------|----------------|---------------------------------------------| +| Python | 3.11+ | Hauptlaufzeit für ETL-Skripte | +| Docker Engine | 24.0+ | Container-Runtime für n8n | +| Docker Compose Plugin | 2.20+ | Multi-Container-Orchestrierung | +| n8n | 1.30+ | Workflow-Automatisierung und Scheduling | +| DuckDB (Python-Modul) | 0.10+ | Analytische In-Process-Datenbank | +| Polars (Python-Modul) | 0.20+ | Schnelle DataFrame-Transformationen | +| google-api-python-client | 2.100+ | Google API-Zugriff | +| google-auth / google-auth-oauthlib | 2.20+ | Authentifizierung für Google APIs | +| PyYAML | 6.0+ | Regel-Konfiguration im YAML-Format | + +--- + +# 3. Installation & Setup + +## 3.1 Docker und n8n einrichten + +### 3.1.1 Docker Engine installieren + +```bash +# ────────────────────────────────────────────────────────── +# Offizielle Docker GPG-Schlüssel hinzufügen +# ────────────────────────────────────────────────────────── +sudo install -m 0755 -d /etc/apt/keyrings + +curl -fsSL https://download.docker.com/linux/debian/gpg | \ + sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg + +sudo chmod a+r /etc/apt/keyrings/docker.gpg + +# ────────────────────────────────────────────────────────── +# Docker-Repository einrichten +# ────────────────────────────────────────────────────────── +echo \ + "deb [arch=$(dpkg --print-architecture) \ + signed-by=/etc/apt/keyrings/docker.gpg] \ + https://download.docker.com/linux/debian \ + $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \ + sudo tee /etc/apt/sources.list.d/docker.list > /dev/null + +# ────────────────────────────────────────────────────────── +# Docker Engine und Compose Plugin installieren +# ────────────────────────────────────────────────────────── +sudo apt update +sudo apt install -y \ + docker-ce \ + docker-ce-cli \ + containerd.io \ + docker-buildx-plugin \ + docker-compose-plugin + +# ────────────────────────────────────────────────────────── +# Docker ohne sudo ermöglichen (Relogin erforderlich) +# ────────────────────────────────────────────────────────── +sudo usermod -aG docker $USER +newgrp docker + +# ────────────────────────────────────────────────────────── +# Installation verifizieren +# ────────────────────────────────────────────────────────── +docker --version +docker compose version +``` + +> ⚠️ **Hinweis:** Dies sind Beispielbefehle basierend auf der offiziellen Docker-Dokumentation für Debian. Prüfen Sie stets die aktuelle Installationsanleitung unter [docs.docker.com](https://docs.docker.com/engine/install/debian/). + +### 3.1.2 n8n per Docker Compose bereitstellen + +Erstellen Sie das Verzeichnis und die Konfiguration: + +```bash +# ────────────────────────────────────────────────────────── +# Verzeichnisstruktur für n8n anlegen +# ────────────────────────────────────────────────────────── +sudo mkdir -p /opt/n8n/data +sudo chown -R $USER:$USER /opt/n8n +``` + +> ⚠️ **Hinweis:** Beispielbefehl. Passen Sie den Benutzer und die Berechtigungen an Ihre Server-Konfiguration an. + +Erstellen Sie die Datei `/opt/n8n/.env` mit den Zugangsdaten: + +```bash +# ────────────────────────────────────────────────────────── +# /opt/n8n/.env — Umgebungsvariablen für n8n +# ────────────────────────────────────────────────────────── +N8N_BASIC_AUTH_USER=admin +N8N_BASIC_AUTH_PASSWORD=IhrSicheresPasswortHier123! +N8N_ENCRYPTION_KEY=ein-langer-zufaelliger-encryption-key-hier +``` + +> ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfiguration. Ersetzen Sie alle Platzhalter-Werte durch Ihre eigenen sicheren Passwörter und Schlüssel. Verwenden Sie `openssl rand -hex 32` zur Generierung sicherer Schlüssel. + +Erstellen Sie die Datei `/opt/n8n/docker-compose.yml`: + +```yaml +# ────────────────────────────────────────────────────────── +# /opt/n8n/docker-compose.yml — n8n Workflow-Engine +# ────────────────────────────────────────────────────────── +version: "3.8" + +services: + n8n: + image: n8nio/n8n:latest + container_name: n8n + restart: unless-stopped + ports: + - "5678:5678" + environment: + - N8N_BASIC_AUTH_ACTIVE=true + - N8N_BASIC_AUTH_USER=${N8N_BASIC_AUTH_USER} + - N8N_BASIC_AUTH_PASSWORD=${N8N_BASIC_AUTH_PASSWORD} + - N8N_ENCRYPTION_KEY=${N8N_ENCRYPTION_KEY} + - GENERIC_TIMEZONE=Europe/Berlin + - TZ=Europe/Berlin + - N8N_LOG_LEVEL=info + - N8N_LOG_OUTPUT=console,file + - N8N_LOG_FILE_LOCATION=/home/node/.n8n/logs/n8n.log + - N8N_DIAGNOSTICS_ENABLED=false + - N8N_HIRING_BANNER_ENABLED=false + volumes: + # n8n-Daten persistent speichern + - /opt/n8n/data:/home/node/.n8n + # Zugriff auf ETL-Pipeline-Verzeichnis (Read-Only für Skripte) + - /opt/etl-pipeline:/opt/etl-pipeline:ro + # Zugriff auf ETL-Logs (Read-Write für Monitoring) + - /opt/etl-pipeline/logs:/opt/etl-pipeline/logs:rw + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:5678/healthz || exit 1"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 30s + deploy: + resources: + limits: + memory: 1G + cpus: "2.0" + reservations: + memory: 256M +``` + +> ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfiguration. Die Volume-Pfade, Ressourcen-Limits und Ports müssen an Ihre Server-Umgebung angepasst werden. + +n8n starten und Status prüfen: + +```bash +# ────────────────────────────────────────────────────────── +# n8n Container starten +# ────────────────────────────────────────────────────────── +cd /opt/n8n +docker compose up -d + +# Status prüfen +docker compose ps +docker compose logs -f --tail=50 +``` + +> ⚠️ **Hinweis:** Beispielbefehle. Nach dem Start ist n8n unter `http://:5678` erreichbar. + +## 3.2 Python-Umgebung einrichten + +### 3.2.1 Virtual Environment erstellen + +```bash +# ────────────────────────────────────────────────────────── +# Projektverzeichnis anlegen +# ────────────────────────────────────────────────────────── +sudo mkdir -p /opt/etl-pipeline +sudo chown -R $USER:$USER /opt/etl-pipeline +cd /opt/etl-pipeline + +# ────────────────────────────────────────────────────────── +# Python Virtual Environment erstellen und aktivieren +# ────────────────────────────────────────────────────────── +python3.11 -m venv venv +source venv/bin/activate + +# ────────────────────────────────────────────────────────── +# pip aktualisieren +# ────────────────────────────────────────────────────────── +pip install --upgrade pip setuptools wheel +``` + +> ⚠️ **Hinweis:** Dies sind Beispielbefehle. Stellen Sie sicher, dass `python3.11` auf Ihrem System korrekt installiert ist (siehe Abschnitt 2.2). + +### 3.2.2 Abhängigkeiten installieren + +Erstellen Sie die Datei `/opt/etl-pipeline/requirements.txt`: + +```txt +# ────────────────────────────────────────────────────────── +# /opt/etl-pipeline/requirements.txt +# ETL-Pipeline Abhängigkeiten +# ────────────────────────────────────────────────────────── + +# DataFrame-Engine für schnelle Transformationen +polars>=0.20.0,<2.0.0 + +# Analytische In-Process SQL-Datenbank +duckdb>=0.10.0,<2.0.0 + +# Google API Client Libraries +google-api-python-client>=2.100.0 +google-auth>=2.20.0 +google-auth-oauthlib>=1.2.0 +google-auth-httplib2>=0.2.0 + +# Konfiguration +PyYAML>=6.0 + +# Utilities +requests>=2.31.0 +python-dateutil>=2.8.0 +tenacity>=8.2.0 +``` + +> ⚠️ **Hinweis:** Dies ist eine Beispiel-Datei. Prüfen Sie die aktuellen Versionen der Pakete und passen Sie die Versionsbeschränkungen an Ihre Kompatibilitätsanforderungen an. + +Abhängigkeiten installieren: + +```bash +# ────────────────────────────────────────────────────────── +# Abhängigkeiten in das Virtual Environment installieren +# ────────────────────────────────────────────────────────── +cd /opt/etl-pipeline +source venv/bin/activate +pip install -r requirements.txt + +# Installation verifizieren +python -c "import polars; print(f'Polars: {polars.__version__}')" +python -c "import duckdb; print(f'DuckDB: {duckdb.__version__}')" +python -c "import googleapiclient; print('Google API Client: OK')" +python -c "import yaml; print(f'PyYAML: {yaml.__version__}')" +``` + +> ⚠️ **Hinweis:** Beispielbefehle zur Verifikation. Die Ausgaben variieren je nach installierten Versionen. + +--- + +# 4. Projektstruktur + +## 4.1 Empfohlene Verzeichnisstruktur + +Die folgende Struktur organisiert alle Komponenten der Pipeline in logisch getrennte Verzeichnisse. Diese Trennung erleichtert Wartung, Backups und Zugriffssteuerung. + +``` +/opt/etl-pipeline/ +│ +├── venv/ # Python Virtual Environment +│ └── ... # (von venv automatisch verwaltet) +│ +├── requirements.txt # Python-Abhängigkeiten +│ +├── scripts/ # Alle Python-Skripte +│ ├── __init__.py +│ ├── main.py # Hauptskript (Entry Point) +│ ├── extract.py # Modul: Datenextraktion (CSV + Google API) +│ ├── transform.py # Modul: DuckDB JOIN + Polars Transformationen +│ ├── load.py # Modul: Google Content API Upload +│ ├── rules_engine.py # Modul: YAML-Regel-Engine +│ └── utils.py # Hilfsfunktionen (Logging, Config-Laden) +│ +├── config/ # Konfigurationsdateien +│ ├── rules.yaml # Channable-ähnliche Transformationsregeln +│ ├── settings.yaml # Allgemeine Pipeline-Einstellungen +│ └── column_mapping.yaml # Spalten-Mapping JTL → Google Shopping +│ +├── credentials/ # Sensible Zugangsdaten (restriktive Rechte!) +│ ├── google_service_account.json # Google Service Account Key +│ └── .gitignore # NIEMALS in Git committen! +│ +├── data/ # Datendateien +│ ├── input/ # Eingangsdaten +│ │ └── jtl_export.csv # Aktueller JTL-WAWI CSV-Export +│ ├── output/ # Verarbeitete Ausgabedaten +│ │ └── google_feed_YYYYMMDD.csv +│ ├── archive/ # Archivierte ältere Exporte +│ │ └── jtl_export_20260301.csv +│ └── db/ # DuckDB-Datenbankdateien +│ └── products.duckdb # Persistente DuckDB-Datenbank +│ +├── logs/ # Log-Dateien +│ ├── etl_pipeline.log # Haupt-Log (rotating) +│ ├── etl_pipeline.log.1 # Rotiertes Log +│ ├── error.log # Nur Fehler-Einträge +│ └── upload_errors/ # Detaillierte Upload-Fehler pro Lauf +│ └── errors_20260306.json +│ +├── tests/ # Unit- und Integrationstests +│ ├── __init__.py +│ ├── test_extract.py +│ ├── test_transform.py +│ └── test_rules_engine.py +│ +└── docs/ # Zusätzliche Projektdokumentation + └── CHANGELOG.md +``` + +> ⚠️ **Hinweis:** Dies ist eine Beispiel-Struktur. Passen Sie die Verzeichnisstruktur an die spezifischen Anforderungen Ihres Projekts und Ihrer Organisation an. + +## 4.2 Verzeichnisstruktur anlegen + +```bash +# ────────────────────────────────────────────────────────── +# Vollständige Verzeichnisstruktur anlegen +# ────────────────────────────────────────────────────────── +cd /opt/etl-pipeline + +mkdir -p scripts +mkdir -p config +mkdir -p credentials +mkdir -p data/{input,output,archive,db} +mkdir -p logs/upload_errors +mkdir -p tests +mkdir -p docs + +# ────────────────────────────────────────────────────────── +# Berechtigungen für sensible Verzeichnisse setzen +# ────────────────────────────────────────────────────────── +chmod 700 credentials/ +chmod 750 config/ +chmod 755 logs/ + +# ────────────────────────────────────────────────────────── +# .gitignore für credentials anlegen +# ────────────────────────────────────────────────────────── +echo "*" > credentials/.gitignore +echo "!.gitignore" >> credentials/.gitignore + +# ────────────────────────────────────────────────────────── +# Python-Modul-Initialisierung +# ────────────────────────────────────────────────────────── +touch scripts/__init__.py +touch tests/__init__.py +``` + +> ⚠️ **Hinweis:** Beispielbefehle. Stellen Sie sicher, dass der Benutzer, unter dem die Pipeline läuft, die entsprechenden Lese- und Schreibrechte auf allen Verzeichnissen besitzt. + +--- + +# 5. Konfiguration + +## 5.1 Google API Credentials sicher speichern + +### 5.1.1 Service Account erstellen + +Für die Kommunikation mit der Google Content API for Shopping wird ein **Service Account** empfohlen. Dieser ermöglicht eine automatisierte Authentifizierung ohne manuellen OAuth2-Flow. + +**Schritte in der Google Cloud Console:** + +1. Navigieren Sie zur [Google Cloud Console](https://console.cloud.google.com/). +2. Erstellen Sie ein neues Projekt oder wählen Sie ein bestehendes aus. +3. Aktivieren Sie die folgenden APIs: + - *Content API for Shopping* + - *Google Ads API* (falls Verkaufsdaten direkt aus Google Ads bezogen werden) + - *Google Analytics Data API* (falls GA4-Daten verwendet werden) +4. Navigieren Sie zu **IAM & Verwaltung → Dienstkonten**. +5. Erstellen Sie einen neuen Service Account mit einem aussagekräftigen Namen. +6. Laden Sie den JSON-Key herunter. +7. Fügen Sie den Service Account als Nutzer in Ihrem **Google Merchant Center** hinzu (mit der Rolle „Standard" oder „Admin"). + +### 5.1.2 Credentials sicher ablegen + +```bash +# ────────────────────────────────────────────────────────── +# Service Account JSON-Key sicher auf dem Server ablegen +# ────────────────────────────────────────────────────────── + +# Datei in das credentials-Verzeichnis kopieren +cp /pfad/zur/heruntergeladenen/datei.json \ + /opt/etl-pipeline/credentials/google_service_account.json + +# Restriktive Berechtigungen setzen (nur Owner lesen) +chmod 600 /opt/etl-pipeline/credentials/google_service_account.json + +# Eigentümerschaft prüfen +ls -la /opt/etl-pipeline/credentials/ +``` + +> ⚠️ **Hinweis:** Beispielbefehle. Ersetzen Sie den Pfad durch den tatsächlichen Speicherort Ihrer heruntergeladenen JSON-Datei. + +### 5.1.3 Credentials als Umgebungsvariable referenzieren (empfohlen) + +Es ist empfehlenswert, den Pfad zur Credentials-Datei nicht hart im Skript zu kodieren, sondern über eine Umgebungsvariable bereitzustellen. + +```bash +# ────────────────────────────────────────────────────────── +# In /opt/etl-pipeline/.env oder ~/.bashrc hinzufügen: +# ────────────────────────────────────────────────────────── +export GOOGLE_APPLICATION_CREDENTIALS="/opt/etl-pipeline/credentials/google_service_account.json" +export MERCHANT_CENTER_ID="123456789" +``` + +> ⚠️ **Hinweis:** Beispielkonfiguration. Ersetzen Sie die `MERCHANT_CENTER_ID` durch Ihre tatsächliche Merchant-Center-ID. + +### 5.1.4 Sicherheitshinweise + +- Die JSON-Datei darf **niemals** in ein Git-Repository committed werden. +- Der Zugriff auf das `credentials/`-Verzeichnis sollte auf den Pipeline-Benutzer beschränkt sein (`chmod 700`). +- Rotieren Sie Service Account Keys regelmäßig (mindestens alle 90 Tage). +- Verwenden Sie nach Möglichkeit **Workload Identity Federation** anstelle von heruntergeladenen JSON-Keys. +- Protokollieren Sie niemals den Inhalt der Credentials-Datei in Log-Dateien. + +## 5.2 Regel-Konfiguration (rules.yaml) + +Die Datei `rules.yaml` ist das Herzstück der Channable-ähnlichen Funktionalität. Sie definiert regelbasierte Transformationen, die auf jeden Artikel angewandt werden. Die Regeln werden in der definierten Reihenfolge sequenziell abgearbeitet. + +### 5.2.1 Struktur und Syntax + +Erstellen Sie die Datei `/opt/etl-pipeline/config/rules.yaml`: + +```yaml +# ══════════════════════════════════════════════════════════ +# /opt/etl-pipeline/config/rules.yaml +# +# Channable-ähnliche Regel-Engine für Google Shopping +# Regeln werden sequenziell von oben nach unten abgearbeitet. +# +# Unterstützte Operatoren: +# Vergleich: >, <, >=, <=, ==, != +# Text: contains, not_contains, starts_with, ends_with +# Logik: and, or (innerhalb von conditions) +# Existenz: is_empty, is_not_empty +# +# Unterstützte Aktionen: +# set_value: Festes Wert setzen +# copy_field: Wert aus anderem Feld kopieren +# prepend: Text am Anfang hinzufügen +# append: Text am Ende hinzufügen +# replace: Text ersetzen +# calculate: Berechnung durchführen +# exclude: Artikel vom Feed ausschließen +# map_category: Kategorie-Mapping anwenden +# ══════════════════════════════════════════════════════════ + +# ────────────────────────────────────────────────────────── +# Globale Einstellungen +# ────────────────────────────────────────────────────────── +settings: + feed_name: "Google Shopping DE" + target_country: "DE" + content_language: "de" + currency: "EUR" + # Artikel mit Bestand 0 automatisch ausschließen + exclude_out_of_stock: true + # Mindestpreis für den Feed (in EUR) + minimum_price: 1.00 + +# ────────────────────────────────────────────────────────── +# Regel-Definitionen +# ────────────────────────────────────────────────────────── +rules: + + # ──────────────────────────────────────────────────────── + # REGEL 1: Bestseller-Label basierend auf Verkaufszahlen + # ──────────────────────────────────────────────────────── + - name: "Bestseller-Klassifizierung" + description: "Artikel mit mehr als 100 Verkäufen in 90 Tagen als Bestseller markieren" + enabled: true + priority: 10 + conditions: + logic: "and" + rules: + - field: "sales_quantity_90d" + operator: ">" + value: 100 + - field: "stock_quantity" + operator: ">" + value: 0 + actions: + - target_field: "custom_label_0" + action: "set_value" + value: "Bestseller" + + # ──────────────────────────────────────────────────────── + # REGEL 2: Slow Mover identifizieren + # ──────────────────────────────────────────────────────── + - name: "Slow-Mover-Klassifizierung" + description: "Artikel mit weniger als 5 Verkäufen in 90 Tagen als Slow Mover markieren" + enabled: true + priority: 20 + conditions: + logic: "and" + rules: + - field: "sales_quantity_90d" + operator: "<" + value: 5 + - field: "days_in_catalog" + operator: ">" + value: 30 + actions: + - target_field: "custom_label_0" + action: "set_value" + value: "Slow Mover" + + # ──────────────────────────────────────────────────────── + # REGEL 3: Performance-Tier (custom_label_1) + # ──────────────────────────────────────────────────────── + - name: "Performance-Tier High" + description: "Umsatzbasierte Tier-Einteilung für Bidding-Strategien" + enabled: true + priority: 30 + conditions: + logic: "and" + rules: + - field: "revenue_90d" + operator: ">=" + value: 1000 + actions: + - target_field: "custom_label_1" + action: "set_value" + value: "Tier-1-High-Revenue" + + - name: "Performance-Tier Mittel" + enabled: true + priority: 31 + conditions: + logic: "and" + rules: + - field: "revenue_90d" + operator: ">=" + value: 200 + - field: "revenue_90d" + operator: "<" + value: 1000 + actions: + - target_field: "custom_label_1" + action: "set_value" + value: "Tier-2-Mid-Revenue" + + - name: "Performance-Tier Niedrig" + enabled: true + priority: 32 + conditions: + logic: "and" + rules: + - field: "revenue_90d" + operator: "<" + value: 200 + actions: + - target_field: "custom_label_1" + action: "set_value" + value: "Tier-3-Low-Revenue" + + # ──────────────────────────────────────────────────────── + # REGEL 4: Margen-Klassifizierung (custom_label_2) + # ──────────────────────────────────────────────────────── + - name: "Hohe Marge" + enabled: true + priority: 40 + conditions: + logic: "and" + rules: + - field: "margin_percent" + operator: ">=" + value: 40 + actions: + - target_field: "custom_label_2" + action: "set_value" + value: "High-Margin" + + - name: "Niedrige Marge" + enabled: true + priority: 41 + conditions: + logic: "and" + rules: + - field: "margin_percent" + operator: "<" + value: 15 + actions: + - target_field: "custom_label_2" + action: "set_value" + value: "Low-Margin" + + # ──────────────────────────────────────────────────────── + # REGEL 5: Preiskategorie (custom_label_3) + # ──────────────────────────────────────────────────────── + - name: "Premium-Preis" + enabled: true + priority: 50 + conditions: + logic: "and" + rules: + - field: "price" + operator: ">=" + value: 100 + actions: + - target_field: "custom_label_3" + action: "set_value" + value: "Premium" + + - name: "Budget-Preis" + enabled: true + priority: 51 + conditions: + logic: "and" + rules: + - field: "price" + operator: "<" + value: 20 + actions: + - target_field: "custom_label_3" + action: "set_value" + value: "Budget" + + # ──────────────────────────────────────────────────────── + # REGEL 6: Titel-Optimierung + # ──────────────────────────────────────────────────────── + - name: "Marke im Titel voranstellen" + description: "Markenname am Anfang des Titels hinzufügen, falls nicht vorhanden" + enabled: true + priority: 60 + conditions: + logic: "and" + rules: + - field: "brand" + operator: "is_not_empty" + - field: "title" + operator: "not_contains" + value_from_field: "brand" + actions: + - target_field: "title" + action: "prepend" + value_template: "{brand} - " + + # ──────────────────────────────────────────────────────── + # REGEL 7: Sale-Preis-Logik + # ──────────────────────────────────────────────────────── + - name: "Sale-Preis setzen" + description: "Wenn ein UVP vorhanden und höher als der VK-Preis ist, als Sale kennzeichnen" + enabled: true + priority: 70 + conditions: + logic: "and" + rules: + - field: "uvp" + operator: "is_not_empty" + - field: "uvp" + operator: ">" + value_from_field: "price" + actions: + - target_field: "sale_price" + action: "copy_field" + source_field: "price" + - target_field: "price" + action: "copy_field" + source_field: "uvp" + + # ──────────────────────────────────────────────────────── + # REGEL 8: Ausschluss-Regeln + # ──────────────────────────────────────────────────────── + - name: "Artikel ohne Bild ausschließen" + enabled: true + priority: 80 + conditions: + logic: "or" + rules: + - field: "image_url" + operator: "is_empty" + - field: "image_url" + operator: "==" + value: "" + actions: + - action: "exclude" + reason: "Kein Produktbild vorhanden" + + - name: "Testprodukte ausschließen" + enabled: true + priority: 81 + conditions: + logic: "or" + rules: + - field: "title" + operator: "contains" + value: "TEST" + - field: "sku" + operator: "starts_with" + value: "ZZ-" + actions: + - action: "exclude" + reason: "Testprodukt erkannt" + + # ──────────────────────────────────────────────────────── + # REGEL 9: Versandkosten basierend auf Preis + # ──────────────────────────────────────────────────────── + - name: "Kostenloser Versand ab 49 EUR" + enabled: true + priority: 90 + conditions: + logic: "and" + rules: + - field: "price" + operator: ">=" + value: 49 + actions: + - target_field: "shipping_cost" + action: "set_value" + value: "0.00" + + - name: "Standard-Versandkosten" + enabled: true + priority: 91 + conditions: + logic: "and" + rules: + - field: "price" + operator: "<" + value: 49 + actions: + - target_field: "shipping_cost" + action: "set_value" + value: "4.95" +``` + +> ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfigurationsdatei. Alle Schwellenwerte, Feldnamen und Regeln müssen an Ihre tatsächlichen Geschäftsanforderungen, JTL-Exportfelder und Google-Shopping-Attribute angepasst werden. + +### 5.2.2 Spalten-Mapping (column_mapping.yaml) + +Erstellen Sie zusätzlich die Datei `/opt/etl-pipeline/config/column_mapping.yaml`: + +```yaml +# ══════════════════════════════════════════════════════════ +# /opt/etl-pipeline/config/column_mapping.yaml +# +# Mapping: JTL-WAWI Exportfeld → Google Shopping Attribut +# ══════════════════════════════════════════════════════════ + +mapping: + # Pflichtfelder Google Shopping + Artikelnummer: "offerId" + Artikelname: "title" + Beschreibung: "description" + URL: "link" + BildURL: "imageLink" + VK_Brutto: "price" + Waehrung: "priceCurrency" + Verfuegbarkeit: "availability" + Marke: "brand" + GTIN: "gtin" + MPN: "mpn" + Zustand: "condition" + Kategorie_Google: "googleProductCategory" + + # Optionale Felder + Farbe: "color" + Groesse: "size" + Material: "material" + Geschlecht: "gender" + Altersgruppe: "ageGroup" + EK_Netto: "cost_of_goods_sold" + UVP: "uvp" + Lagerbestand: "stock_quantity" + + # Berechnete Felder (werden durch Pipeline erzeugt) + # sales_quantity_90d → aus Google API + # revenue_90d → aus Google API + # margin_percent → berechnet aus VK und EK + # custom_label_0-4 → aus rules.yaml +``` + +> ⚠️ **Hinweis:** Dies ist eine Beispiel-Zuordnung. Die tatsächlichen Spaltennamen in Ihrem JTL-WAWI-Export können abweichen. Prüfen Sie die Header-Zeile Ihrer CSV-Datei und passen Sie das Mapping entsprechend an. + +--- + +# 6. Core Script Logic (Beispiel) + +Dieses Kapitel zeigt den konzeptionellen Aufbau der Python-Skripte. Der Code ist als **funktionales Boilerplate** zu verstehen, das als Grundlage für die eigene Implementierung dient. + +## 6.1 Hilfsfunktionen (utils.py) + +```python +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +/opt/etl-pipeline/scripts/utils.py + +Hilfsfunktionen für Logging, Konfiguration und gemeinsam genutzte Utilities. +""" + +import os +import sys +import yaml +import logging +from pathlib import Path +from datetime import datetime +from logging.handlers import RotatingFileHandler + + +# ────────────────────────────────────────────────────────── +# Pfad-Konstanten +# ────────────────────────────────────────────────────────── +BASE_DIR = Path("/opt/etl-pipeline") +CONFIG_DIR = BASE_DIR / "config" +DATA_DIR = BASE_DIR / "data" +LOG_DIR = BASE_DIR / "logs" +CREDENTIALS_DIR = BASE_DIR / "credentials" + + +def setup_logging( + log_name: str = "etl_pipeline", + log_level: int = logging.INFO, + max_bytes: int = 10 * 1024 * 1024, # 10 MB + backup_count: int = 5, +) -> logging.Logger: + """ + Konfiguriert das Logging mit Rotation und separatem Error-Log. + + Args: + log_name: Name des Loggers und der Log-Datei. + log_level: Logging-Level (default: INFO). + max_bytes: Maximale Größe einer Log-Datei vor Rotation. + backup_count: Anzahl der aufzubewahrenden rotierten Log-Dateien. + + Returns: + Konfigurierter Logger. + """ + LOG_DIR.mkdir(parents=True, exist_ok=True) + + logger = logging.getLogger(log_name) + logger.setLevel(log_level) + + # Verhindere doppelte Handler bei mehrfachem Aufruf + if logger.handlers: + return logger + + # Formatter + formatter = logging.Formatter( + fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + # Haupt-Log (Rotation) + main_handler = RotatingFileHandler( + LOG_DIR / f"{log_name}.log", + maxBytes=max_bytes, + backupCount=backup_count, + encoding="utf-8", + ) + main_handler.setLevel(log_level) + main_handler.setFormatter(formatter) + + # Error-Log (nur ERROR und höher) + error_handler = RotatingFileHandler( + LOG_DIR / "error.log", + maxBytes=max_bytes, + backupCount=backup_count, + encoding="utf-8", + ) + error_handler.setLevel(logging.ERROR) + error_handler.setFormatter(formatter) + + # Console-Handler + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setLevel(log_level) + console_handler.setFormatter(formatter) + + logger.addHandler(main_handler) + logger.addHandler(error_handler) + logger.addHandler(console_handler) + + return logger + + +def load_yaml_config(config_filename: str) -> dict: + """ + Lädt eine YAML-Konfigurationsdatei aus dem config-Verzeichnis. + + Args: + config_filename: Name der YAML-Datei (z. B. 'rules.yaml'). + + Returns: + Dictionary mit dem Inhalt der YAML-Datei. + + Raises: + FileNotFoundError: Wenn die Datei nicht existiert. + yaml.YAMLError: Wenn die YAML-Syntax ungültig ist. + """ + config_path = CONFIG_DIR / config_filename + if not config_path.exists(): + raise FileNotFoundError( + f"Konfigurationsdatei nicht gefunden: {config_path}" + ) + + with open(config_path, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) + + return config + + +def get_timestamp() -> str: + """Gibt einen formatierten Zeitstempel zurück (YYYYMMDD_HHMMSS).""" + return datetime.now().strftime("%Y%m%d_%H%M%S") + + +def get_date_stamp() -> str: + """Gibt einen formatierten Datumsstempel zurück (YYYYMMDD).""" + return datetime.now().strftime("%Y%m%d") +``` + +> ⚠️ **Hinweis:** Dies ist ein Beispiel-Skript. Alle Pfade, Log-Konfigurationen und Funktionen müssen an Ihre tatsächliche Serverumgebung und Anforderungen angepasst werden. + +## 6.2 Extract-Modul (extract.py) + +```python +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +/opt/etl-pipeline/scripts/extract.py + +Modul für die Datenextraktion aus JTL-WAWI CSV und Google APIs. +Beide Datenquellen werden als Tabellen in DuckDB importiert. +""" + +import os +import shutil +import duckdb +import polars as pl +from pathlib import Path +from datetime import datetime, timedelta + +from google.oauth2 import service_account +from googleapiclient.discovery import build + +from scripts.utils import ( + setup_logging, + BASE_DIR, + DATA_DIR, + CREDENTIALS_DIR, + get_date_stamp, +) + +logger = setup_logging("extract") + + +# ══════════════════════════════════════════════════════════ +# JTL-WAWI CSV-Import +# ══════════════════════════════════════════════════════════ + +def load_jtl_csv_to_duckdb( + con: duckdb.DuckDBPyConnection, + csv_path: str | Path | None = None, + table_name: str = "products_raw", +) -> int: + """ + Liest die JTL-WAWI CSV-Exportdatei und importiert sie als Tabelle + in die DuckDB-Datenbank. + + Args: + con: Aktive DuckDB-Verbindung. + csv_path: Pfad zur CSV-Datei. Falls None, wird der Standardpfad verwendet. + table_name: Name der Zieltabelle in DuckDB. + + Returns: + Anzahl der importierten Zeilen. + """ + if csv_path is None: + csv_path = DATA_DIR / "input" / "jtl_export.csv" + + csv_path = Path(csv_path) + if not csv_path.exists(): + raise FileNotFoundError(f"JTL-CSV nicht gefunden: {csv_path}") + + file_size_mb = csv_path.stat().st_size / (1024 * 1024) + logger.info(f"Lade JTL-CSV: {csv_path} ({file_size_mb:.1f} MB)") + + # Bestehende Tabelle löschen und neu erstellen + con.execute(f"DROP TABLE IF EXISTS {table_name}") + + # CSV mit DuckDB's optimiertem CSV-Reader importieren + # auto_detect=true erkennt Trennzeichen, Encoding und Datentypen + con.execute(f""" + CREATE TABLE {table_name} AS + SELECT * + FROM read_csv_auto( + '{csv_path}', + header = true, + delim = ';', + quote = '"', + escape = '"', + encoding = 'utf-8', + sample_size = 10000, + ignore_errors = false + ) + """) + + row_count = con.execute( + f"SELECT COUNT(*) FROM {table_name}" + ).fetchone()[0] + + logger.info( + f"JTL-CSV erfolgreich geladen: " + f"{row_count:,} Zeilen in Tabelle '{table_name}'" + ) + + # Archivierung des Imports + archive_path = DATA_DIR / "archive" / f"jtl_export_{get_date_stamp()}.csv" + if not archive_path.exists(): + shutil.copy2(csv_path, archive_path) + logger.info(f"CSV archiviert: {archive_path}") + + return row_count + + +# ══════════════════════════════════════════════════════════ +# Google Sales-Daten abrufen +# ══════════════════════════════════════════════════════════ + +def fetch_google_sales_data( + con: duckdb.DuckDBPyConnection, + table_name: str = "sales_90d", + lookback_days: int = 90, +) -> int: + """ + Ruft Verkaufsdaten der letzten N Tage über die Google API ab + und importiert sie als Tabelle in DuckDB. + + Args: + con: Aktive DuckDB-Verbindung. + table_name: Name der Zieltabelle in DuckDB. + lookback_days: Anzahl der zurückliegenden Tage (Standard: 90). + + Returns: + Anzahl der importierten Zeilen. + """ + logger.info( + f"Rufe Google Sales-Daten ab (letzte {lookback_days} Tage)..." + ) + + # ── Google API Authentifizierung ────────────────────── + credentials_path = os.environ.get( + "GOOGLE_APPLICATION_CREDENTIALS", + str(CREDENTIALS_DIR / "google_service_account.json"), + ) + merchant_id = os.environ.get("MERCHANT_CENTER_ID") + + if not merchant_id: + raise ValueError( + "Umgebungsvariable MERCHANT_CENTER_ID ist nicht gesetzt" + ) + + credentials = service_account.Credentials.from_service_account_file( + credentials_path, + scopes=["https://www.googleapis.com/auth/content"], + ) + + service = build("content", "v2.1", credentials=credentials) + + # ── Zeitraum berechnen ──────────────────────────────── + end_date = datetime.now() + start_date = end_date - timedelta(days=lookback_days) + + # ── Performance-Daten aus dem Merchant Center abrufen ─ + # HINWEIS: Der tatsächliche API-Aufruf hängt von der + # verwendeten Google API ab (Merchant Center Reports, + # Google Ads API, GA4 Data API, etc.) + # Hier wird ein konzeptionelles Beispiel gezeigt. + + sales_records = [] + + try: + # Beispiel: Merchant Center Performance Report + request_body = { + "query": f""" + SELECT + segments.offer_id, + metrics.impressions, + metrics.clicks, + metrics.conversions, + metrics.conversion_value + FROM MerchantPerformanceView + WHERE segments.date BETWEEN + '{start_date.strftime('%Y-%m-%d')}' + AND '{end_date.strftime('%Y-%m-%d')}' + """ + } + + response = ( + service.reports() + .search(merchantId=merchant_id, body=request_body) + .execute() + ) + + for row in response.get("results", []): + segments = row.get("segments", {}) + metrics = row.get("metrics", {}) + sales_records.append({ + "offer_id": segments.get("offerId", ""), + "impressions": int(metrics.get("impressions", 0)), + "clicks": int(metrics.get("clicks", 0)), + "sales_quantity_90d": int( + metrics.get("conversions", 0) + ), + "revenue_90d": float( + metrics.get("conversionValue", 0.0) + ), + }) + + except Exception as e: + logger.error( + f"Fehler beim Abrufen der Google Sales-Daten: {e}" + ) + raise + + # ── Daten in DuckDB importieren ─────────────────────── + if sales_records: + df_sales = pl.DataFrame(sales_records) + con.execute(f"DROP TABLE IF EXISTS {table_name}") + con.execute( + f"CREATE TABLE {table_name} AS SELECT * FROM df_sales" + ) + row_count = con.execute( + f"SELECT COUNT(*) FROM {table_name}" + ).fetchone()[0] + logger.info( + f"Google Sales-Daten geladen: " + f"{row_count:,} Zeilen in '{table_name}'" + ) + return row_count + else: + logger.warning( + "Keine Sales-Daten von der Google API erhalten." + ) + # Leere Tabelle erstellen, damit der JOIN nicht fehlschlägt + con.execute(f"DROP TABLE IF EXISTS {table_name}") + con.execute(f""" + CREATE TABLE {table_name} ( + offer_id VARCHAR, + impressions INTEGER DEFAULT 0, + clicks INTEGER DEFAULT 0, + sales_quantity_90d INTEGER DEFAULT 0, + revenue_90d DOUBLE DEFAULT 0.0 + ) + """) + return 0 +``` + +> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Google API-Aufrufe, insbesondere der Report-Query und die Antwortstruktur, müssen an die tatsächlich verwendete API-Version und Ihr Merchant-Center-Setup angepasst werden. Konsultieren Sie die offizielle Google API-Dokumentation für die aktuellen Endpunkte und Antwortformate. + +## 6.3 Transform-Modul (transform.py) + +```python +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +/opt/etl-pipeline/scripts/transform.py + +Modul für die Datentransformation: +1. LEFT JOIN in DuckDB (Produkte ⟕ Sales) +2. Polars-Transformationen basierend auf rules.yaml +""" + +import duckdb +import polars as pl +from typing import Any + +from scripts.utils import setup_logging, load_yaml_config + +logger = setup_logging("transform") + + +# ══════════════════════════════════════════════════════════ +# DuckDB LEFT JOIN +# ══════════════════════════════════════════════════════════ + +def join_products_with_sales( + con: duckdb.DuckDBPyConnection, + products_table: str = "products_raw", + sales_table: str = "sales_90d", + joined_table: str = "products_enriched", + join_key_products: str = "Artikelnummer", + join_key_sales: str = "offer_id", +) -> pl.DataFrame: + """ + Führt einen LEFT JOIN zwischen Produktstammdaten und + Verkaufsdaten in DuckDB durch und gibt das Ergebnis als + Polars DataFrame zurück. + + Ein LEFT JOIN stellt sicher, dass alle Produkte erhalten + bleiben, auch wenn keine Verkaufsdaten vorliegen + (NULL-Werte werden mit 0 gefüllt). + + Args: + con: Aktive DuckDB-Verbindung. + products_table: Name der Produkttabelle. + sales_table: Name der Verkaufstabelle. + joined_table: Name der Ergebnistabelle. + join_key_products: JOIN-Schlüssel in der Produkttabelle. + join_key_sales: JOIN-Schlüssel in der Verkaufstabelle. + + Returns: + Polars DataFrame mit den verknüpften Daten. + """ + logger.info( + f"Führe LEFT JOIN durch: {products_table} ⟕ {sales_table} " + f"ON {join_key_products} = {join_key_sales}" + ) + + # ── LEFT JOIN mit COALESCE für NULL-Behandlung ──────── + con.execute(f"DROP TABLE IF EXISTS {joined_table}") + con.execute(f""" + CREATE TABLE {joined_table} AS + SELECT + p.*, + COALESCE(s.impressions, 0) + AS impressions, + COALESCE(s.clicks, 0) + AS clicks, + COALESCE(s.sales_quantity_90d, 0) + AS sales_quantity_90d, + COALESCE(s.revenue_90d, 0.0) + AS revenue_90d + FROM {products_table} AS p + LEFT JOIN {sales_table} AS s + ON CAST(p."{join_key_products}" AS VARCHAR) + = CAST(s.{join_key_sales} AS VARCHAR) + """) + + row_count = con.execute( + f"SELECT COUNT(*) FROM {joined_table}" + ).fetchone()[0] + logger.info( + f"LEFT JOIN abgeschlossen: " + f"{row_count:,} Zeilen in '{joined_table}'" + ) + + # ── Ergebnis als Polars DataFrame exportieren ───────── + result = con.execute(f"SELECT * FROM {joined_table}").pl() + logger.info( + f"Polars DataFrame erstellt: " + f"{result.shape[0]:,} Zeilen, {result.shape[1]} Spalten" + ) + + return result + + +# ══════════════════════════════════════════════════════════ +# Berechnete Felder hinzufügen +# ══════════════════════════════════════════════════════════ + +def add_calculated_fields(df: pl.DataFrame) -> pl.DataFrame: + """ + Fügt berechnete Felder hinzu, die als Grundlage für die + Regel-Engine dienen. + + Args: + df: Polars DataFrame mit den verknüpften Rohdaten. + + Returns: + Polars DataFrame mit zusätzlichen berechneten Spalten. + """ + logger.info("Füge berechnete Felder hinzu...") + + df = df.with_columns([ + # Marge in Prozent (wenn EK vorhanden) + pl.when( + (pl.col("EK_Netto").is_not_null()) + & (pl.col("VK_Brutto") > 0) + ) + .then( + ( + (pl.col("VK_Brutto") - pl.col("EK_Netto")) + / pl.col("VK_Brutto") + * 100 + ).round(2) + ) + .otherwise(pl.lit(None)) + .alias("margin_percent"), + + # Click-Through-Rate + pl.when(pl.col("impressions") > 0) + .then( + (pl.col("clicks") / pl.col("impressions") * 100) + .round(2) + ) + .otherwise(pl.lit(0.0)) + .alias("ctr_percent"), + + # Conversion-Rate + pl.when(pl.col("clicks") > 0) + .then( + ( + pl.col("sales_quantity_90d") + / pl.col("clicks") + * 100 + ).round(2) + ) + .otherwise(pl.lit(0.0)) + .alias("conversion_rate_percent"), + + # Custom Label Spalten initialisieren (leer) + pl.lit("").alias("custom_label_0"), + pl.lit("").alias("custom_label_1"), + pl.lit("").alias("custom_label_2"), + pl.lit("").alias("custom_label_3"), + pl.lit("").alias("custom_label_4"), + + # Versandkosten initialisieren + pl.lit("4.95").alias("shipping_cost"), + + # Exclude-Flag initialisieren + pl.lit(False).alias("_excluded"), + pl.lit("").alias("_exclude_reason"), + ]) + + logger.info( + f"Berechnete Felder hinzugefügt. " + f"DataFrame: {df.shape[1]} Spalten" + ) + return df + + +# ══════════════════════════════════════════════════════════ +# YAML Regel-Engine +# ══════════════════════════════════════════════════════════ + +def evaluate_condition( + df: pl.DataFrame, + condition: dict, +) -> pl.Series: + """ + Evaluiert eine einzelne Bedingung und gibt eine + boolesche Maske zurück. + + Args: + df: Polars DataFrame. + condition: Dictionary mit field, operator, value. + + Returns: + Polars Series (Boolean) als Filtermaske. + """ + field = condition["field"] + operator = condition["operator"] + value = condition.get("value") + value_from_field = condition.get("value_from_field") + + # Prüfen ob das Feld existiert + if field not in df.columns: + logger.warning( + f"Feld '{field}' existiert nicht im DataFrame. " + f"Bedingung übersprungen." + ) + return pl.Series("mask", [False] * df.height) + + col = pl.col(field) + + # Vergleichswert: fester Wert oder aus anderem Feld + if value_from_field: + compare_value = pl.col(value_from_field) + else: + compare_value = value + + # ── Operator-Mapping ────────────────────────────────── + operator_map = { + ">": col > compare_value, + "<": col < compare_value, + ">=": col >= compare_value, + "<=": col <= compare_value, + "==": col == compare_value, + "!=": col != compare_value, + "contains": col.cast(pl.Utf8).str.contains( + str(compare_value), literal=True + ), + "not_contains": ~col.cast(pl.Utf8).str.contains( + str(compare_value), literal=True + ), + "starts_with": col.cast(pl.Utf8).str.starts_with( + str(compare_value) + ), + "ends_with": col.cast(pl.Utf8).str.ends_with( + str(compare_value) + ), + "is_empty": ( + col.is_null() | (col.cast(pl.Utf8) == "") + ), + "is_not_empty": ( + col.is_not_null() & (col.cast(pl.Utf8) != "") + ), + } + + if operator not in operator_map: + logger.error(f"Unbekannter Operator: '{operator}'") + return pl.Series("mask", [False] * df.height) + + return df.select(operator_map[operator]).to_series() + + +def apply_rules( + df: pl.DataFrame, + rules_config: dict, +) -> pl.DataFrame: + """ + Wendet alle aktivierten Regeln aus der rules.yaml auf den + DataFrame an. Regeln werden nach Priorität sortiert und + sequenziell abgearbeitet. + + Args: + df: Polars DataFrame mit den angereicherten Produktdaten. + rules_config: Geladene rules.yaml als Dictionary. + + Returns: + Polars DataFrame nach Anwendung aller Regeln. + """ + rules = rules_config.get("rules", []) + settings = rules_config.get("settings", {}) + + # Regeln nach Priorität sortieren + # (niedrigere Zahl = höhere Priorität) + rules_sorted = sorted( + [r for r in rules if r.get("enabled", True)], + key=lambda r: r.get("priority", 999), + ) + + logger.info(f"Wende {len(rules_sorted)} aktive Regeln an...") + + # ── Globale Ausschlüsse (Settings) ──────────────────── + if settings.get("exclude_out_of_stock", False): + df = df.with_columns( + pl.when(pl.col("stock_quantity") <= 0) + .then(pl.lit(True)) + .otherwise(pl.col("_excluded")) + .alias("_excluded") + ) + excluded = df.filter(pl.col("_excluded")).height + logger.info( + f"Globaler Ausschluss (Bestand=0): " + f"{excluded:,} Artikel markiert" + ) + + min_price = settings.get("minimum_price", 0) + if min_price > 0: + df = df.with_columns( + pl.when(pl.col("VK_Brutto") < min_price) + .then(pl.lit(True)) + .otherwise(pl.col("_excluded")) + .alias("_excluded") + ) + + # ── Regeln sequenziell anwenden ─────────────────────── + for rule in rules_sorted: + rule_name = rule.get("name", "Unbenannt") + conditions_config = rule.get("conditions", {}) + actions = rule.get("actions", []) + logic = conditions_config.get("logic", "and") + sub_rules = conditions_config.get("rules", []) + + logger.debug( + f"Verarbeite Regel: '{rule_name}' " + f"(Priorität: {rule.get('priority', '?')})" + ) + + # Alle Teilbedingungen evaluieren + masks = [ + evaluate_condition(df, cond) for cond in sub_rules + ] + + if not masks: + logger.warning( + f"Regel '{rule_name}' hat keine Bedingungen. " + f"Übersprungen." + ) + continue + + # Logische Verknüpfung der Teilbedingungen + if logic == "and": + combined_mask = masks[0] + for m in masks[1:]: + combined_mask = combined_mask & m + elif logic == "or": + combined_mask = masks[0] + for m in masks[1:]: + combined_mask = combined_mask | m + else: + logger.error( + f"Unbekannte Logik '{logic}' in Regel " + f"'{rule_name}'. Übersprungen." + ) + continue + + matching_count = combined_mask.sum() + logger.info( + f"Regel '{rule_name}': " + f"{matching_count:,} Artikel treffen zu" + ) + + # Aktionen auf übereinstimmende Zeilen anwenden + for action_def in actions: + action_type = action_def.get("action", "set_value") + target_field = action_def.get("target_field", "") + + if action_type == "set_value": + value = action_def.get("value", "") + df = df.with_columns( + pl.when(combined_mask) + .then(pl.lit(value)) + .otherwise(pl.col(target_field)) + .alias(target_field) + ) + + elif action_type == "copy_field": + source = action_def.get("source_field", "") + if source in df.columns: + df = df.with_columns( + pl.when(combined_mask) + .then(pl.col(source)) + .otherwise(pl.col(target_field)) + .alias(target_field) + ) + + elif action_type == "prepend": + template = action_def.get( + "value_template", "" + ) + for col_name in df.columns: + placeholder = f"{{{col_name}}}" + if placeholder in template: + suffix = template.replace( + placeholder, "" + ) + df = df.with_columns( + pl.when(combined_mask) + .then( + pl.col(col_name) + .cast(pl.Utf8) + + pl.lit(suffix) + + pl.col(target_field) + .cast(pl.Utf8) + ) + .otherwise(pl.col(target_field)) + .alias(target_field) + ) + break + + elif action_type == "append": + value = action_def.get("value", "") + df = df.with_columns( + pl.when(combined_mask) + .then( + pl.col(target_field).cast(pl.Utf8) + + pl.lit(value) + ) + .otherwise(pl.col(target_field)) + .alias(target_field) + ) + + elif action_type == "exclude": + reason = action_def.get( + "reason", "Durch Regel ausgeschlossen" + ) + df = df.with_columns( + pl.when(combined_mask) + .then(pl.lit(True)) + .otherwise(pl.col("_excluded")) + .alias("_excluded"), + pl.when(combined_mask) + .then(pl.lit(reason)) + .otherwise(pl.col("_exclude_reason")) + .alias("_exclude_reason"), + ) + + # ── Statistik ───────────────────────────────────────── + total = df.height + excluded_count = df.filter(pl.col("_excluded")).height + active_count = total - excluded_count + logger.info( + f"Transformation abgeschlossen: {total:,} Gesamt | " + f"{active_count:,} Aktiv | " + f"{excluded_count:,} Ausgeschlossen" + ) + + return df +``` + +> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Regel-Engine ist bewusst einfach gehalten und muss für den Produktionseinsatz um Fehlerbehandlung, Validierung der Eingabedaten und weitere Operatoren erweitert werden. Die Feldnamen (`EK_Netto`, `VK_Brutto`, `stock_quantity` usw.) müssen mit den tatsächlichen Spalten Ihres JTL-Exports übereinstimmen. + +## 6.4 Load-Modul (load.py) + +```python +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +/opt/etl-pipeline/scripts/load.py + +Modul für den Batch-Upload transformierter Produktdaten +an die Google Content API for Shopping. +Implementiert Rate-Limiting und exponentielles Backoff. +""" + +import os +import json +import time +import polars as pl +from pathlib import Path +from datetime import datetime + +from google.oauth2 import service_account +from googleapiclient.discovery import build +from googleapiclient.errors import HttpError +from tenacity import ( + retry, + stop_after_attempt, + wait_exponential, + retry_if_exception_type, +) + +from scripts.utils import ( + setup_logging, + load_yaml_config, + CREDENTIALS_DIR, + DATA_DIR, + LOG_DIR, + get_date_stamp, +) + +logger = setup_logging("load") + + +# ══════════════════════════════════════════════════════════ +# Konstanten +# ══════════════════════════════════════════════════════════ +BATCH_SIZE = 1000 # Produkte pro Batch-Request +RATE_LIMIT_DELAY = 1.0 # Sekunden Pause zwischen Batches +MAX_RETRIES = 5 # Maximale Wiederholungsversuche +BACKOFF_MULTIPLIER = 2 # Exponentieller Backoff-Faktor + + +# ══════════════════════════════════════════════════════════ +# Google Content API Client +# ══════════════════════════════════════════════════════════ + +def get_content_api_service(): + """ + Erstellt einen authentifizierten Google Content API Service. + + Returns: + Google API Service-Objekt für Content API v2.1. + """ + credentials_path = os.environ.get( + "GOOGLE_APPLICATION_CREDENTIALS", + str(CREDENTIALS_DIR / "google_service_account.json"), + ) + + credentials = service_account.Credentials.from_service_account_file( + credentials_path, + scopes=["https://www.googleapis.com/auth/content"], + ) + + service = build("content", "v2.1", credentials=credentials) + logger.info("Google Content API Service erstellt.") + return service + + +# ══════════════════════════════════════════════════════════ +# Produkt-Payload-Erstellung +# ══════════════════════════════════════════════════════════ + +def build_product_payload( + row: dict, + column_mapping: dict, + settings: dict, +) -> dict: + """ + Erstellt den API-Payload für ein einzelnes Produkt nach + dem Google Shopping Content API Schema. + + Args: + row: Dictionary mit den Produktdaten einer Zeile. + column_mapping: Spalten-Mapping (JTL → Google). + settings: Globale Pipeline-Einstellungen. + + Returns: + Dictionary im Google Content API Produktformat. + """ + target_country = settings.get("target_country", "DE") + content_language = settings.get("content_language", "de") + currency = settings.get("currency", "EUR") + mapping = column_mapping.get("mapping", {}) + + product = { + "offerId": str(row.get( + mapping.get("Artikelnummer", "Artikelnummer"), "" + )), + "title": str(row.get("title", row.get( + mapping.get("Artikelname", ""), "" + ))), + "description": str(row.get( + mapping.get("Beschreibung", "Beschreibung"), "" + )), + "link": str(row.get( + mapping.get("URL", "URL"), "" + )), + "imageLink": str(row.get( + mapping.get("BildURL", "BildURL"), "" + )), + "contentLanguage": content_language, + "targetCountry": target_country, + "channel": "online", + "availability": ( + "in stock" + if row.get("stock_quantity", 0) > 0 + else "out of stock" + ), + "condition": "new", + "price": { + "value": str(row.get("VK_Brutto", "0.00")), + "currency": currency, + }, + } + + # Optionale Felder hinzufügen + brand = row.get("Marke", row.get("brand", "")) + if brand: + product["brand"] = str(brand) + + gtin = row.get("GTIN", row.get("gtin", "")) + if gtin: + product["gtin"] = str(gtin) + + # Custom Labels (0-4) + for i in range(5): + label_value = row.get(f"custom_label_{i}", "") + if label_value: + product[f"customLabel{i}"] = str(label_value) + + # Sale-Preis + sale_price = row.get("sale_price", "") + if sale_price and float(sale_price) > 0: + product["salePrice"] = { + "value": str(sale_price), + "currency": currency, + } + + # Versandkosten + shipping_cost = row.get("shipping_cost", "") + if shipping_cost is not None and shipping_cost != "": + product["shipping"] = [{ + "country": target_country, + "price": { + "value": str(shipping_cost), + "currency": currency, + }, + }] + + return product + + +# ══════════════════════════════════════════════════════════ +# Batch-Upload mit Rate Limiting +# ══════════════════════════════════════════════════════════ + +@retry( + stop=stop_after_attempt(MAX_RETRIES), + wait=wait_exponential( + multiplier=BACKOFF_MULTIPLIER, min=2, max=60 + ), + retry=retry_if_exception_type( + (HttpError, ConnectionError, TimeoutError) + ), + before_sleep=lambda retry_state: logger.warning( + f"Retry {retry_state.attempt_number}/{MAX_RETRIES} " + f"nach Fehler. " + f"Warte {retry_state.next_action.sleep:.1f}s..." + ), +) +def execute_batch_request( + service, + merchant_id: str, + batch_entries: list[dict], +) -> dict: + """ + Führt einen einzelnen Batch-Request an die Content API aus. + Implementiert exponentielles Backoff bei Rate-Limit-Fehlern. + + Args: + service: Google Content API Service. + merchant_id: Merchant Center ID. + batch_entries: Liste der Batch-Einträge. + + Returns: + API-Antwort als Dictionary. + """ + body = {"entries": batch_entries} + + response = ( + service.products() + .custombatch(body=body) + .execute() + ) + + return response + + +def upload_products_to_google( + df: pl.DataFrame, + batch_size: int = BATCH_SIZE, + rate_limit_delay: float = RATE_LIMIT_DELAY, +) -> dict: + """ + Lädt alle aktiven (nicht ausgeschlossenen) Produkte als + Batch-Requests an die Google Content API hoch. + + Args: + df: Polars DataFrame mit transformierten Produktdaten. + batch_size: Anzahl der Produkte pro Batch-Request. + rate_limit_delay: Pause in Sekunden zwischen Batches. + + Returns: + Dictionary mit Upload-Statistiken. + """ + merchant_id = os.environ.get("MERCHANT_CENTER_ID") + if not merchant_id: + raise ValueError("MERCHANT_CENTER_ID ist nicht gesetzt") + + # Konfiguration laden + column_mapping = load_yaml_config("column_mapping.yaml") + rules_config = load_yaml_config("rules.yaml") + settings = rules_config.get("settings", {}) + + # Nur aktive (nicht ausgeschlossene) Produkte uploaden + df_active = df.filter(~pl.col("_excluded")) + total_products = df_active.height + + logger.info( + f"Starte Upload: {total_products:,} aktive Produkte " + f"in Batches von {batch_size}" + ) + + # Service erstellen + service = get_content_api_service() + + # ── Statistiken ─────────────────────────────────────── + stats = { + "total": total_products, + "success": 0, + "errors": 0, + "batches": 0, + "start_time": datetime.now().isoformat(), + "error_details": [], + } + + # ── Produkte in Batches aufteilen und hochladen ─────── + rows_as_dicts = df_active.to_dicts() + total_batches = ( + (total_products + batch_size - 1) // batch_size + ) + + for batch_start in range(0, total_products, batch_size): + batch_end = min(batch_start + batch_size, total_products) + batch_rows = rows_as_dicts[batch_start:batch_end] + batch_number = (batch_start // batch_size) + 1 + + logger.info( + f"Batch {batch_number}/{total_batches}: " + f"Produkte {batch_start + 1:,} – {batch_end:,}" + ) + + # Batch-Einträge erstellen + batch_entries = [] + for idx, row in enumerate(batch_rows): + product_payload = build_product_payload( + row, column_mapping, settings + ) + batch_entries.append({ + "batchId": batch_start + idx, + "merchantId": merchant_id, + "method": "insert", + "product": product_payload, + }) + + # Batch-Request ausführen (mit Retry/Backoff) + try: + response = execute_batch_request( + service, merchant_id, batch_entries + ) + + # Antwort auswerten + for entry in response.get("entries", []): + if ( + "errors" in entry + and entry["errors"].get("errors") + ): + stats["errors"] += 1 + batch_idx = entry.get("batchId", 0) + local_idx = batch_idx - batch_start + offer_id = "unbekannt" + if 0 <= local_idx < len(batch_rows): + offer_id = batch_rows[local_idx].get( + "Artikelnummer", "unbekannt" + ) + error_info = { + "batchId": batch_idx, + "offerId": offer_id, + "errors": entry["errors"]["errors"], + } + stats["error_details"].append(error_info) + logger.warning( + f"Fehler bei Produkt {offer_id}: " + f"{entry['errors']['errors']}" + ) + else: + stats["success"] += 1 + + except HttpError as e: + if e.resp.status == 429: + logger.warning( + f"Rate Limit erreicht bei Batch " + f"{batch_number}. Erhöhe Wartezeit..." + ) + time.sleep(rate_limit_delay * 5) + else: + logger.error( + f"HTTP-Fehler bei Batch " + f"{batch_number}: {e}" + ) + stats["errors"] += len(batch_rows) + + except Exception as e: + logger.error( + f"Unerwarteter Fehler bei Batch " + f"{batch_number}: {e}" + ) + stats["errors"] += len(batch_rows) + + stats["batches"] += 1 + + # ── Rate Limiting: Pause zwischen Batches ───────── + if batch_end < total_products: + logger.debug( + f"Rate Limiting: {rate_limit_delay}s Pause..." + ) + time.sleep(rate_limit_delay) + + # ── Upload-Statistiken loggen ───────────────────────── + stats["end_time"] = datetime.now().isoformat() + stats["duration_seconds"] = ( + datetime.fromisoformat(stats["end_time"]) + - datetime.fromisoformat(stats["start_time"]) + ).total_seconds() + + logger.info( + f"Upload abgeschlossen: " + f"{stats['success']:,} erfolgreich | " + f"{stats['errors']:,} Fehler | " + f"{stats['batches']} Batches | " + f"{stats['duration_seconds']:.1f}s Gesamtdauer" + ) + + # ── Fehlerdetails als JSON speichern ────────────────── + if stats["error_details"]: + error_dir = LOG_DIR / "upload_errors" + error_dir.mkdir(parents=True, exist_ok=True) + error_file = ( + error_dir / f"errors_{get_date_stamp()}.json" + ) + with open(error_file, "w", encoding="utf-8") as f: + json.dump( + stats["error_details"], + f, + indent=2, + ensure_ascii=False, + ) + logger.info(f"Fehlerdetails gespeichert: {error_file}") + + return stats +``` + +> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Batch-Größe, Rate-Limiting-Werte und API-Payloads müssen an Ihre spezifischen API-Quotas und Produktdatenstrukturen angepasst werden. Testen Sie den Upload zunächst mit einer kleinen Teilmenge (z. B. 10 Produkte) im Testmodus der Google Content API, bevor Sie den vollständigen Datensatz hochladen. + +## 6.5 Hauptskript (main.py) + +```python +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +/opt/etl-pipeline/scripts/main.py + +Hauptskript (Entry Point) für die ETL-Pipeline. +Orchestriert den gesamten Ablauf: Extract → Transform → Load. + +Aufruf: + cd /opt/etl-pipeline + source venv/bin/activate + python -m scripts.main +""" + +import sys +import time +import duckdb +from pathlib import Path + +from scripts.utils import ( + setup_logging, + load_yaml_config, + BASE_DIR, + DATA_DIR, + get_timestamp, +) +from scripts.extract import ( + load_jtl_csv_to_duckdb, + fetch_google_sales_data, +) +from scripts.transform import ( + join_products_with_sales, + add_calculated_fields, + apply_rules, +) +from scripts.load import upload_products_to_google + +logger = setup_logging("main") + + +def run_pipeline() -> dict: + """ + Führt die vollständige ETL-Pipeline aus. + + Returns: + Dictionary mit Statistiken aller drei Phasen. + + Raises: + SystemExit: Bei kritischen Fehlern mit Exit-Code 1. + """ + pipeline_start = time.time() + run_id = get_timestamp() + + logger.info(f"{'═' * 60}") + logger.info(f"ETL-Pipeline gestartet — Run-ID: {run_id}") + logger.info(f"{'═' * 60}") + + stats = { + "run_id": run_id, + "extract": {}, + "transform": {}, + "load": {}, + } + + # ══════════════════════════════════════════════════════ + # DuckDB-Verbindung herstellen + # ══════════════════════════════════════════════════════ + db_path = DATA_DIR / "db" / "products.duckdb" + db_path.parent.mkdir(parents=True, exist_ok=True) + + con = duckdb.connect(str(db_path)) + logger.info(f"DuckDB-Verbindung hergestellt: {db_path}") + + try: + # ══════════════════════════════════════════════════ + # PHASE 1: EXTRACT + # ══════════════════════════════════════════════════ + logger.info(f"{'─' * 40}") + logger.info("PHASE 1: EXTRACT") + logger.info(f"{'─' * 40}") + + # JTL CSV laden + jtl_rows = load_jtl_csv_to_duckdb(con) + stats["extract"]["jtl_rows"] = jtl_rows + + # Google Sales-Daten abrufen + sales_rows = fetch_google_sales_data(con) + stats["extract"]["sales_rows"] = sales_rows + + # ══════════════════════════════════════════════════ + # PHASE 2: TRANSFORM + # ══════════════════════════════════════════════════ + logger.info(f"{'─' * 40}") + logger.info("PHASE 2: TRANSFORM") + logger.info(f"{'─' * 40}") + + # LEFT JOIN: Produkte ⟕ Sales + df = join_products_with_sales(con) + + # Berechnete Felder hinzufügen + df = add_calculated_fields(df) + + # YAML-Regeln anwenden + rules_config = load_yaml_config("rules.yaml") + df = apply_rules(df, rules_config) + + stats["transform"]["total_products"] = df.height + stats["transform"]["active_products"] = df.filter( + ~df["_excluded"] + ).height + stats["transform"]["excluded_products"] = df.filter( + df["_excluded"] + ).height + + # ══════════════════════════════════════════════════ + # PHASE 3: LOAD + # ══════════════════════════════════════════════════ + logger.info(f"{'─' * 40}") + logger.info("PHASE 3: LOAD") + logger.info(f"{'─' * 40}") + + upload_stats = upload_products_to_google(df) + stats["load"] = upload_stats + + except Exception as e: + logger.critical( + f"Pipeline fehlgeschlagen: {e}", exc_info=True + ) + # Exit-Code 1 signalisiert n8n einen Fehler + sys.exit(1) + + finally: + con.close() + logger.info("DuckDB-Verbindung geschlossen.") + + # ══════════════════════════════════════════════════════ + # Zusammenfassung + # ══════════════════════════════════════════════════════ + pipeline_duration = time.time() - pipeline_start + stats["duration_seconds"] = round(pipeline_duration, 2) + + logger.info(f"{'═' * 60}") + logger.info( + f"ETL-Pipeline abgeschlossen — " + f"Dauer: {pipeline_duration:.1f}s" + ) + logger.info( + f" Extract: " + f"{stats['extract'].get('jtl_rows', 0):,} JTL, " + f"{stats['extract'].get('sales_rows', 0):,} Sales" + ) + logger.info( + f" Transform: " + f"{stats['transform'].get('active_products', 0):,} aktiv, " + f"{stats['transform'].get('excluded_products', 0):,} " + f"ausgeschlossen" + ) + logger.info( + f" Load: " + f"{stats['load'].get('success', 0):,} OK, " + f"{stats['load'].get('errors', 0):,} Fehler" + ) + logger.info(f"{'═' * 60}") + + return stats + + +# ────────────────────────────────────────────────────────── +# Entry Point +# ────────────────────────────────────────────────────────── +if __name__ == "__main__": + run_pipeline() +``` + +> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Es dient als Ausgangspunkt und Entry Point für die Pipeline. Vor dem produktiven Einsatz sollten alle Module gründlich mit Testdaten validiert und die Fehlerbehandlung an Ihre Anforderungen angepasst werden. + +--- + +# 7. n8n Workflow Integration + +## 7.1 Übersicht + +Der n8n Workflow übernimmt zwei zentrale Aufgaben: + +1. **Scheduling:** Tägliche Ausführung der Python-Pipeline um **00:00 Uhr** (Mitternacht). +2. **Error Handling:** Benachrichtigung per E-Mail (oder Slack/Telegram) bei Fehlern. + +Die Kommunikation zwischen n8n (Docker-Container) und der Python-Pipeline auf dem Host-System erfolgt über den n8n-Node **Execute Command**, der Shell-Befehle im Container ausführt. Da das Pipeline-Verzeichnis als Volume gemountet ist, hat n8n Zugriff auf die Skripte. + +## 7.2 Workflow-Architektur + +```mermaid +flowchart TD + CRON["⏰ Cron Trigger
täglich 00:00 Uhr"] + EXEC["⚡ Execute Command
/opt/etl-pipeline/venv/bin/python
-m scripts.main
"] + CHECK{"🔍 Exit-Code
prüfen
"} + SUCCESS["✅ Erfolgs-Meldung
Log + optionale E-Mail"] + ERROR["❌ Fehler-Alarm
E-Mail / Slack
Benachrichtigung
"] + + CRON --> EXEC + EXEC --> CHECK + CHECK -->|"Exit = 0 — OK"| SUCCESS + CHECK -->|"Exit = 1 — Fehler"| ERROR + + style CRON fill:#e58e26,stroke:#fa983a,color:#fff + style EXEC fill:#0c2461,stroke:#1e3799,color:#fff + style CHECK fill:#474787,stroke:#706fd3,color:#fff + style SUCCESS fill:#079992,stroke:#38ada9,color:#fff + style ERROR fill:#b71540,stroke:#e55039,color:#fff +``` + +## 7.3 Workflow als n8n-JSON importieren + +Der folgende JSON-Export kann direkt in n8n importiert werden unter **Workflow → Import from File**. Speichern Sie den Inhalt als `.json`-Datei. + +```json +{ + "name": "ETL Pipeline - Google Shopping (Daily)", + "nodes": [ + { + "parameters": { + "rule": { + "interval": [ + { + "triggerAtHour": 0, + "triggerAtMinute": 0 + } + ] + } + }, + "name": "Cron Trigger (00:00)", + "type": "n8n-nodes-base.scheduleTrigger", + "typeVersion": 1.2, + "position": [240, 300] + }, + { + "parameters": { + "command": "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main 2>&1", + "timeout": 3600 + }, + "name": "Execute ETL Pipeline", + "type": "n8n-nodes-base.executeCommand", + "typeVersion": 1, + "position": [480, 300] + }, + { + "parameters": { + "conditions": { + "number": [ + { + "value1": "={{ $json.exitCode }}", + "operation": "equal", + "value2": 0 + } + ] + } + }, + "name": "Check Exit Code", + "type": "n8n-nodes-base.if", + "typeVersion": 1, + "position": [720, 300] + }, + { + "parameters": { + "fromEmail": "etl-pipeline@ihre-domain.de", + "toEmail": "admin@ihre-domain.de", + "subject": "✅ ETL Pipeline — Erfolg ({{ $now.format('yyyy-MM-dd') }})", + "text": "Die ETL-Pipeline wurde erfolgreich ausgeführt.\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\n\nStdout:\n{{ $json.stdout }}" + }, + "name": "Success E-Mail", + "type": "n8n-nodes-base.emailSend", + "typeVersion": 2, + "position": [960, 200] + }, + { + "parameters": { + "fromEmail": "etl-pipeline@ihre-domain.de", + "toEmail": "admin@ihre-domain.de", + "subject": "❌ ETL Pipeline — FEHLER ({{ $now.format('yyyy-MM-dd') }})", + "text": "ACHTUNG: Die ETL-Pipeline ist fehlgeschlagen!\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\nExit-Code: {{ $json.exitCode }}\n\nStderr:\n{{ $json.stderr }}\n\nStdout:\n{{ $json.stdout }}\n\nBitte prüfen: /opt/etl-pipeline/logs/error.log" + }, + "name": "Error E-Mail", + "type": "n8n-nodes-base.emailSend", + "typeVersion": 2, + "position": [960, 400] + } + ], + "connections": { + "Cron Trigger (00:00)": { + "main": [ + [{ "node": "Execute ETL Pipeline", "type": "main", "index": 0 }] + ] + }, + "Execute ETL Pipeline": { + "main": [ + [{ "node": "Check Exit Code", "type": "main", "index": 0 }] + ] + }, + "Check Exit Code": { + "main": [ + [{ "node": "Success E-Mail", "type": "main", "index": 0 }], + [{ "node": "Error E-Mail", "type": "main", "index": 0 }] + ] + } + }, + "settings": { + "timezone": "Europe/Berlin", + "saveExecutionProgress": true, + "saveManualExecutions": true, + "executionTimeout": 7200 + } +} +``` + +> ⚠️ **Hinweis:** Dies ist ein Beispiel-Workflow für n8n. Die E-Mail-Adressen, SMTP-Einstellungen und Timeouts müssen an Ihre Umgebung angepasst werden. Konfigurieren Sie die SMTP-Credentials in n8n unter **Settings → Credentials** bevor Sie den Workflow aktivieren. + +## 7.4 Wichtige n8n-Konfigurationshinweise + +### Timeout-Einstellung + +Da die Verarbeitung von 200.000+ Artikeln je nach Serverhardware zwischen **5 und 30 Minuten** dauern kann, muss der Timeout im `Execute Command`-Node ausreichend hoch gesetzt werden: + +``` +timeout: 3600 # 1 Stunde (in Sekunden) +``` + +### Volume-Mount beachten + +Der `Execute Command`-Node führt Befehle **innerhalb des n8n Docker-Containers** aus. Damit die Python-Skripte und das Virtual Environment erreichbar sind, muss der Pfad `/opt/etl-pipeline` als Volume im `docker-compose.yml` gemountet sein (siehe Abschnitt 3.1.2). + +### Alternative: Host-Befehl über SSH + +Falls die Pipeline nicht innerhalb des Containers laufen soll, kann alternativ ein SSH-Node verwendet werden, der den Befehl auf dem Host ausführt: + +```bash +# Befehl für n8n SSH-Node: +ssh pipeline-user@localhost \ + "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main" +``` + +> ⚠️ **Hinweis:** Beispielbefehl. Für die SSH-Variante muss ein SSH-Key ohne Passphrase konfiguriert und die SSH-Credentials in n8n hinterlegt werden. + +## 7.5 Manueller Pipeline-Start (Debugging) + +Für Debugging und Tests kann die Pipeline jederzeit manuell gestartet werden: + +```bash +# ────────────────────────────────────────────────────────── +# Pipeline manuell ausführen +# ────────────────────────────────────────────────────────── +cd /opt/etl-pipeline +source venv/bin/activate +python -m scripts.main + +# Alternativ: Nur den Exit-Code prüfen +python -m scripts.main; echo "Exit-Code: $?" +``` + +> ⚠️ **Hinweis:** Beispielbefehle für den manuellen Start. Stellen Sie sicher, dass die Umgebungsvariablen (`GOOGLE_APPLICATION_CREDENTIALS`, `MERCHANT_CENTER_ID`) in der Shell-Session gesetzt sind. + +--- + +# 8. Wartung & Monitoring + +## 8.1 Log-Management + +### 8.1.1 Log-Struktur + +Die Pipeline erzeugt folgende Log-Dateien: + +| Datei | Inhalt | Rotation | +|----------------------------------------------|--------------------------------------|----------------------| +| `logs/etl_pipeline.log` | Alle Log-Einträge (INFO+) | 10 MB, 5 Backups | +| `logs/error.log` | Nur Fehler (ERROR+) | 10 MB, 5 Backups | +| `logs/upload_errors/errors_YYYYMMDD.json` | Detail-Fehler pro Upload-Lauf | Täglich, eine Datei | + +### 8.1.2 Logrotate-Konfiguration + +Zusätzlich zur integrierten Python-Rotation empfiehlt sich eine systemweite Logrotate-Konfiguration als Sicherheitsnetz. + +Erstellen Sie die Datei `/etc/logrotate.d/etl-pipeline`: + +``` +/opt/etl-pipeline/logs/*.log { + daily + missingok + rotate 30 + compress + delaycompress + notifempty + create 0640 root root + sharedscripts + postrotate + # Optional: Signal an laufende Prozesse + /bin/true + endscript +} + +/opt/etl-pipeline/logs/upload_errors/*.json { + daily + missingok + rotate 90 + compress + delaycompress + notifempty + create 0640 root root +} +``` + +> ⚠️ **Hinweis:** Dies ist eine Beispiel-Logrotate-Konfiguration. Passen Sie den Benutzer, die Aufbewahrungsdauer (`rotate`) und die Dateiberechtigungen an Ihre Sicherheitsrichtlinien an. + +### 8.1.3 Log-Bereinigung (Cron) + +Alte Log-Dateien und Upload-Fehlerprotokolle können zusätzlich per Cron bereinigt werden: + +```bash +# ────────────────────────────────────────────────────────── +# Crontab-Eintrag: Logs älter als 90 Tage löschen +# ────────────────────────────────────────────────────────── +# crontab -e +0 3 * * 0 find /opt/etl-pipeline/logs/ -name "*.log.*" -mtime +90 -delete +0 3 * * 0 find /opt/etl-pipeline/logs/upload_errors/ -name "*.json" -mtime +90 -delete +0 3 * * 0 find /opt/etl-pipeline/data/archive/ -name "*.csv" -mtime +180 -delete +``` + +> ⚠️ **Hinweis:** Beispiel-Crontab-Einträge. Passen Sie die Aufbewahrungsfristen (hier: 90 bzw. 180 Tage) an Ihre Compliance-Anforderungen an. + +## 8.2 Performance-Tipps für 200.000+ Artikel + +### 8.2.1 DuckDB-Optimierungen + +```python +# ────────────────────────────────────────────────────────── +# DuckDB Performance-Konfiguration +# (am Anfang der Pipeline setzen) +# ────────────────────────────────────────────────────────── + +con = duckdb.connect("products.duckdb") + +# Verfügbaren Arbeitsspeicher für DuckDB erhöhen +con.execute("SET memory_limit = '4GB'") + +# Alle verfügbaren CPU-Kerne nutzen +con.execute("SET threads TO 8") + +# Temporäre Dateien auf schnelle SSD legen +con.execute("SET temp_directory = '/opt/etl-pipeline/data/tmp'") + +# Fortschrittsanzeige für lange Queries +con.execute("SET enable_progress_bar = true") +``` + +> ⚠️ **Hinweis:** Beispiel-Konfiguration. Die Werte für `memory_limit` und `threads` müssen an die tatsächliche Serverausstattung angepasst werden. + +### 8.2.2 Polars-Optimierungen + +```python +# ────────────────────────────────────────────────────────── +# Polars Performance-Tipps +# ────────────────────────────────────────────────────────── + +import polars as pl + +# 1. Lazy Evaluation verwenden (bei sehr großen Datensätzen) +# Polars optimiert den Query-Plan automatisch +df_lazy = pl.scan_csv( + "data/input/jtl_export.csv", separator=";" +) +df_result = ( + df_lazy + .filter(pl.col("Lagerbestand") > 0) + .with_columns([ + pl.col("VK_Brutto").cast(pl.Float64), + ]) + .collect() # Erst hier werden die Daten materialisiert +) + +# 2. Streaming-Modus für Datensätze > verfügbares RAM +df_result = ( + df_lazy + .filter(pl.col("Lagerbestand") > 0) + .collect(streaming=True) # Verarbeitet in Chunks +) + +# 3. Datentypen optimieren (reduziert RAM-Verbrauch) +df = df.with_columns([ + pl.col("Lagerbestand").cast(pl.Int32), # statt Int64 + pl.col("clicks").cast(pl.Int32), # statt Int64 + pl.col("impressions").cast(pl.Int32), # statt Int64 + pl.col("custom_label_0").cast(pl.Categorical), # statt Utf8 + pl.col("custom_label_1").cast(pl.Categorical), # statt Utf8 +]) + +# 4. Spaltenauswahl: Nur benötigte Spalten laden +df = pl.read_csv( + "data.csv", + columns=["sku", "title", "price", "stock"], +) +``` + +> ⚠️ **Hinweis:** Dies sind Beispiel-Codeausschnitte zur Performance-Optimierung. Die tatsächlichen Spaltennamen und Datentypen müssen an Ihren Datensatz angepasst werden. + +### 8.2.3 Google API Upload-Optimierungen + +| Parameter | Empfehlung | Begründung | +|--------------------------|--------------------------|--------------------------------------------------| +| `BATCH_SIZE` | 1.000 | Google empfiehlt max. 1.000 Einträge pro Batch | +| `RATE_LIMIT_DELAY` | 1,0 – 2,0 Sekunden | Verhindert 429-Fehler bei Standard-Quotas | +| `MAX_RETRIES` | 5 | Ausreichend für temporäre Fehler | +| Parallele Requests | Nicht empfohlen | Erhöht 429-Risiko, sequenziell ist sicherer | +| Geschätzte Upload-Dauer | 10 – 20 Minuten | Bei 200 Batches × 1s Delay + API-Latenz | + +### 8.2.4 Allgemeine Server-Monitoring-Befehle + +```bash +# ────────────────────────────────────────────────────────── +# System-Monitoring während Pipeline-Lauf +# ────────────────────────────────────────────────────────── + +# RAM- und CPU-Auslastung in Echtzeit beobachten +htop + +# Festplatten-I/O überwachen +iostat -x 5 + +# Pipeline-Prozess gezielt überwachen +watch -n 2 "ps aux | grep 'python.*scripts.main' | grep -v grep" + +# Festplattennutzung des Pipeline-Verzeichnisses +du -sh /opt/etl-pipeline/data/* + +# DuckDB-Dateigröße prüfen +ls -lh /opt/etl-pipeline/data/db/products.duckdb +``` + +> ⚠️ **Hinweis:** Beispielbefehle für das System-Monitoring. Diese dienen zur Orientierung und sollten je nach Ihren Monitoring-Tools (z. B. Prometheus, Grafana, Zabbix) ergänzt oder ersetzt werden. + +## 8.3 Backup-Empfehlungen + +Die folgenden Dateien und Verzeichnisse sollten regelmäßig gesichert werden: + +| Pfad | Priorität | Frequenz | +|------------------------------------------------|-----------|----------------| +| `/opt/etl-pipeline/config/` | Hoch | Bei Änderung | +| `/opt/etl-pipeline/credentials/` | Kritisch | Bei Änderung | +| `/opt/etl-pipeline/scripts/` | Hoch | Bei Änderung | +| `/opt/etl-pipeline/data/db/products.duckdb` | Mittel | Täglich | +| `/opt/n8n/data/` | Hoch | Wöchentlich | + +```bash +#!/bin/bash +# ────────────────────────────────────────────────────────── +# Einfaches Backup-Skript (Beispiel) +# ────────────────────────────────────────────────────────── +BACKUP_DIR="/backup/etl-pipeline/$(date +%Y%m%d)" +mkdir -p "$BACKUP_DIR" + +# Konfiguration und Skripte sichern +tar czf "$BACKUP_DIR/config_scripts.tar.gz" \ + /opt/etl-pipeline/config/ \ + /opt/etl-pipeline/scripts/ \ + /opt/etl-pipeline/requirements.txt + +# DuckDB sichern +cp /opt/etl-pipeline/data/db/products.duckdb \ + "$BACKUP_DIR/products.duckdb" + +# n8n Workflows sichern +tar czf "$BACKUP_DIR/n8n_data.tar.gz" /opt/n8n/data/ + +echo "Backup erstellt: $BACKUP_DIR" +``` + +> ⚠️ **Hinweis:** Dies ist ein vereinfachtes Beispiel-Backup-Skript. Für den Produktionseinsatz sollte eine vollwertige Backup-Lösung (z. B. BorgBackup, Restic) mit Verschlüsselung und Off-Site-Speicherung verwendet werden. + +## 8.4 Checkliste für den Produktivbetrieb + +Vor der Aktivierung der automatisierten Pipeline sollten folgende Punkte abgearbeitet sein: + +- [ ] Python Virtual Environment erstellt und alle Abhängigkeiten installiert +- [ ] DuckDB-Datenbank initialisiert und CSV-Import getestet +- [ ] Google Service Account erstellt und in Merchant Center eingeladen +- [ ] `GOOGLE_APPLICATION_CREDENTIALS` und `MERCHANT_CENTER_ID` als Umgebungsvariablen gesetzt +- [ ] `rules.yaml` mit initialen Regeln konfiguriert +- [ ] `column_mapping.yaml` an JTL-Export-Spalten angepasst +- [ ] Pipeline manuell mit Testdaten durchgelaufen (alle 3 Phasen) +- [ ] n8n Workflow importiert und Cron-Trigger konfiguriert +- [ ] E-Mail-/Slack-Benachrichtigung bei Fehlern getestet +- [ ] Logrotate-Konfiguration aktiv +- [ ] Backup-Strategie implementiert +- [ ] Firewall-Regeln: Ausgehender Traffic zu Google APIs erlaubt +- [ ] Monitoring-Dashboard eingerichtet (optional) + +--- + +# Anhang + +## A. Nützliche Befehle (Kurzreferenz) + +```bash +# ── Pipeline ────────────────────────────────────────────── +cd /opt/etl-pipeline && source venv/bin/activate +python -m scripts.main # Pipeline starten +python -m scripts.main 2>&1 | tee run.log # Mit Log-Ausgabe + +# ── n8n ─────────────────────────────────────────────────── +cd /opt/n8n +docker compose up -d # Starten +docker compose down # Stoppen +docker compose logs -f --tail=100 # Logs anzeigen +docker compose restart # Neustart + +# ── DuckDB CLI ──────────────────────────────────────────── +cd /opt/etl-pipeline && source venv/bin/activate +python -c " +import duckdb +con = duckdb.connect('data/db/products.duckdb') +print(con.execute('SHOW TABLES').fetchall()) +print(con.execute('SELECT COUNT(*) FROM products_raw').fetchone()) +con.close() +" + +# ── Logs prüfen ────────────────────────────────────────── +tail -f /opt/etl-pipeline/logs/etl_pipeline.log +tail -100 /opt/etl-pipeline/logs/error.log +cat /opt/etl-pipeline/logs/upload_errors/errors_$(date +%Y%m%d).json | jq . + +# ── Disk-Usage ──────────────────────────────────────────── +du -sh /opt/etl-pipeline/data/db/* +du -sh /opt/etl-pipeline/logs/* +df -h /opt/etl-pipeline/ +``` + +> ⚠️ **Hinweis:** Beispiel-Befehlsreferenz. Alle Pfade und Befehle müssen an Ihre Serverumgebung angepasst werden. + +## B. Fehlerbehandlung (Häufige Probleme) + +| Problem | Mögliche Ursache | Lösung | +|------------------------------------------------------|------------------------------------------|--------------------------------------------------------| +| `FileNotFoundError: jtl_export.csv` | CSV nicht am erwarteten Pfad | JTL-Export-Pfad in `settings.yaml` prüfen | +| `google.auth.exceptions.DefaultCredentialsError` | Credentials nicht gefunden | `GOOGLE_APPLICATION_CREDENTIALS` prüfen | +| `HttpError 429: Too Many Requests` | API-Quote überschritten | `RATE_LIMIT_DELAY` erhöhen (z. B. auf 2–3s) | +| `MemoryError` bei Polars | RAM nicht ausreichend | Polars Streaming-Modus oder RAM aufstocken | +| n8n: Timeout nach 600s | Pipeline dauert zu lang | Timeout im Execute Command auf 3600s erhöhen | +| DuckDB: `IO Error` | Festplatte voll | Archiv- und Temp-Dateien bereinigen | +| `ModuleNotFoundError` | venv nicht aktiviert | `source venv/bin/activate` vor Ausführung | + +--- + +*Dokumentation erstellt am 2026-03-06 | Version 1.0.0* +*Letzte Aktualisierung: 2026-03-06*