diff --git a/IT_Abteilung/Automatisierung/Channable_stack.md b/IT_Abteilung/Automatisierung/Channable_stack.md
new file mode 100644
index 0000000..5ec290a
--- /dev/null
+++ b/IT_Abteilung/Automatisierung/Channable_stack.md
@@ -0,0 +1,2847 @@
+---
+title: Channable Stack LT24 Profice Dokumentation
+description:
+published: true
+date: 2026-03-06T11:09:09.239Z
+tags:
+editor: markdown
+dateCreated: 2026-03-06T11:09:09.239Z
+---
+
+# Self-Hosted ETL-Pipeline für Google Shopping
+
+## Lokale Alternative zu Channable — Technische Dokumentation
+
+---
+
+| Eigenschaft | Wert |
+|--------------------|------------------------------------------------|
+| **Dokumentversion** | 1.0.0 |
+| **Erstellt am** | 2026-03-06 |
+| **Zielplattform** | Debian 12 (Bookworm) Server |
+| **Tech-Stack** | Python 3.11+ · Polars · DuckDB · n8n (Docker) |
+| **Datensatzgröße** | 200.000+ Artikel, 30+ Attribute pro Artikel |
+| **Ziel-API** | Google Content API for Shopping |
+| **Sprache** | Deutsch |
+
+---
+
+## Inhaltsverzeichnis
+
+1. [Systemübersicht](#1-systemübersicht)
+2. [Voraussetzungen](#2-voraussetzungen)
+3. [Installation & Setup](#3-installation--setup)
+4. [Projektstruktur](#4-projektstruktur)
+5. [Konfiguration](#5-konfiguration)
+6. [Core Script Logic (Beispiel)](#6-core-script-logic-beispiel)
+7. [n8n Workflow Integration](#7-n8n-workflow-integration)
+8. [Wartung & Monitoring](#8-wartung--monitoring)
+
+---
+
+# 1. Systemübersicht
+
+## 1.1 Architektur-Überblick
+
+Die Pipeline ersetzt den kommerziellen SaaS-Dienst **Channable** durch eine vollständig selbst gehostete, Open-Source-basierte Lösung auf einem dedizierten Debian-Server. Der gesamte Datenfluss folgt dem klassischen **ETL-Muster** (Extract → Transform → Load) und wird täglich automatisiert durch **n8n** orchestriert.
+
+### Architektur-Diagramm
+
+```mermaid
+graph TB
+ subgraph SERVER["🖥️ DEBIAN SERVER — Docker-Host"]
+ direction TB
+
+ subgraph N8N["⚙️ n8n — Docker Container"]
+ CRON["⏰ Cron-Trigger
täglich 00:00 Uhr"]
+ ERR_HANDLER["🔔 Error-Handler
E-Mail / Slack Alert"]
+ end
+
+ subgraph PYTHON["🐍 PYTHON 3.11+ — Virtual Environment"]
+ direction LR
+
+ subgraph EXTRACT["📥 EXTRACT"]
+ JTL["📄 JTL-WAWI
CSV-Export"]
+ GAPI["☁️ Google Ads/GA
API — 90 Tage"]
+ end
+
+ subgraph TRANSFORM["🔄 TRANSFORM"]
+ DUCKDB_JOIN["🦆 DuckDB
LEFT JOIN
Produkte ⟕ Sales"]
+ POLARS["⚡ Polars
Regel-Engine
rules.yaml"]
+ end
+
+ subgraph LOAD["📤 LOAD"]
+ GCONTENT["🛒 Google
Content API
Batch-Upload
mit Rate-Limiting"]
+ end
+ end
+
+ DUCKDB_FILE[("💾 DuckDB
products.duckdb")]
+ LOGS["📋 Logs & Monitoring
/opt/etl-pipeline/logs/"]
+ end
+
+ CRON -->|"startet Pipeline"| JTL
+ CRON -->|"startet Pipeline"| GAPI
+ JTL -->|"CSV Import"| DUCKDB_JOIN
+ GAPI -->|"Sales-Daten"| DUCKDB_JOIN
+ DUCKDB_JOIN -->|"DataFrame"| POLARS
+ POLARS -->|"transformierte Daten"| GCONTENT
+ DUCKDB_JOIN -.->|"persistiert"| DUCKDB_FILE
+ ERR_HANDLER -.->|"bei Exit-Code 1"| LOGS
+
+ style SERVER fill:#1a1a2e,stroke:#16213e,color:#e0e0e0
+ style N8N fill:#2d3436,stroke:#636e72,color:#dfe6e9
+ style PYTHON fill:#2d3436,stroke:#636e72,color:#dfe6e9
+ style EXTRACT fill:#0a3d62,stroke:#3c6382,color:#dfe6e9
+ style TRANSFORM fill:#1e3799,stroke:#4a69bd,color:#dfe6e9
+ style LOAD fill:#6a0572,stroke:#a834a8,color:#dfe6e9
+ style JTL fill:#079992,stroke:#38ada9,color:#fff
+ style GAPI fill:#079992,stroke:#38ada9,color:#fff
+ style DUCKDB_JOIN fill:#0c2461,stroke:#1e3799,color:#fff
+ style POLARS fill:#0c2461,stroke:#1e3799,color:#fff
+ style GCONTENT fill:#6a0572,stroke:#a834a8,color:#fff
+ style DUCKDB_FILE fill:#b71540,stroke:#e55039,color:#fff
+ style LOGS fill:#474787,stroke:#706fd3,color:#fff
+ style CRON fill:#e58e26,stroke:#fa983a,color:#fff
+ style ERR_HANDLER fill:#e55039,stroke:#eb2f06,color:#fff
+```
+
+### ETL-Datenfluss (vereinfacht)
+
+```mermaid
+flowchart LR
+ A["📄 JTL CSV
200k+ Artikel"] --> C["🦆 DuckDB
LEFT JOIN"]
+ B["☁️ Google API
Sales 90 Tage"] --> C
+ C --> D["⚡ Polars
rules.yaml
Custom Labels
Titel-Optimierung
Preis-Logik
Ausschlüsse"]
+ D --> E["🛒 Google Content API
Batch-Upload
Rate-Limiting"]
+
+ style A fill:#079992,stroke:#38ada9,color:#fff
+ style B fill:#079992,stroke:#38ada9,color:#fff
+ style C fill:#0c2461,stroke:#1e3799,color:#fff
+ style D fill:#1e3799,stroke:#4a69bd,color:#fff
+ style E fill:#6a0572,stroke:#a834a8,color:#fff
+```
+
+## 1.2 Datenfluss im Detail
+
+Der Datenfluss gliedert sich in drei klar voneinander getrennte Phasen:
+
+### Phase 1 — Extract (Datenextraktion)
+
+In der Extract-Phase werden die Rohdaten aus zwei unabhängigen Quellen bezogen:
+
+**Quelle A — JTL-WAWI CSV-Export:** Die ERP-Software JTL-WAWI exportiert den vollständigen Produktkatalog als CSV-Datei. Diese Datei enthält alle Stammdaten wie Artikelnummer, Titel, Beschreibung, Preis, EAN, Bestand, Kategorie, Bilder-URLs und weitere produktspezifische Attribute. Der Export wird automatisch durch einen JTL-Worker oder manuell über den JTL-Ameise-Export auf dem Server abgelegt.
+
+**Quelle B — Google Ads/Analytics Reporting API:** Über die Google API werden die Verkaufsperformance-Daten der letzten 90 Tage abgerufen. Dazu gehören Metriken wie Verkaufsanzahl pro Artikel (`sales_quantity`), Umsatz pro Artikel (`revenue`), Klicks, Impressionen und Conversion-Rate. Diese Daten dienen als Grundlage für die regelbasierte Klassifizierung (z. B. „Bestseller"-Labels).
+
+Beide Datenquellen werden als separate Tabellen in **DuckDB** importiert.
+
+### Phase 2 — Transform (Datentransformation)
+
+In der Transform-Phase werden die Daten verknüpft und angereichert:
+
+Zunächst wird in **DuckDB** ein `LEFT JOIN` zwischen der Produktstammdaten-Tabelle und der Verkaufsdaten-Tabelle durchgeführt. Die Verknüpfung erfolgt über eine gemeinsame Artikelnummer (z. B. `sku` oder `offer_id`). Das Ergebnis ist eine konsolidierte Tabelle, die sowohl Stamm- als auch Performance-Daten enthält.
+
+Der resultierende DataFrame wird anschließend in **Polars** geladen. Hier werden regelbasierte Transformationen auf Grundlage der Datei `rules.yaml` angewandt. Typische Transformationen umfassen:
+
+- Zuweisung von Custom Labels (`custom_label_0` bis `custom_label_4`) basierend auf Verkaufszahlen, Marge oder Kategorie.
+- Preisanpassungen und Sale-Preis-Logik.
+- Titeloptimierung (z. B. Hinzufügen von Marke oder Farbe).
+- Filterung von Artikeln, die nicht auf Google Shopping erscheinen sollen (z. B. Bestand = 0).
+- Anreicherung mit berechneten Feldern (z. B. `margin_percent`, `performance_tier`).
+
+### Phase 3 — Load (Daten-Upload)
+
+Die transformierten Produktdaten werden über die **Google Content API for Shopping** als Batch-Request an das Google Merchant Center übertragen. Der Upload-Prozess berücksichtigt dabei die API-Quotas und implementiert ein Rate-Limiting mit exponentiellem Backoff, um `429 Too Many Requests`-Fehler zu vermeiden. Fehlerhafte Einzelprodukte werden in einer separaten Fehler-Log-Datei protokolliert und blockieren nicht den gesamten Upload.
+
+---
+
+# 2. Voraussetzungen
+
+## 2.1 Hardware-Anforderungen
+
+Die folgenden Spezifikationen gelten für die zuverlässige Verarbeitung von **200.000+ Artikeln** mit jeweils **30+ Attributen**. Die Werte sind konservativ kalkuliert und berücksichtigen die parallele Ausführung von n8n (Docker) und der Python-Pipeline.
+
+| Ressource | Minimum | Empfohlen | Begründung |
+|-------------------|----------------------|------------------------|--------------------------------------------------------------|
+| **CPU** | 4 Kerne (x86_64) | 8+ Kerne (x86_64) | Polars nutzt Multi-Threading automatisch aus |
+| **RAM** | 8 GB | 16 GB | DuckDB + Polars halten Daten im Arbeitsspeicher |
+| **Festplatte** | 40 GB SSD | 100 GB NVMe SSD | DuckDB-Dateien + CSV-Exporte + Logs + Docker-Images |
+| **Netzwerk** | 100 Mbit/s | 1 Gbit/s | Google API Batch-Uploads erfordern stabile Bandbreite |
+| **OS** | Debian 11 (Bullseye) | Debian 12 (Bookworm) | Aktuelle LTS-Basis mit langfristigem Sicherheits-Support |
+
+### Speicherberechnung (Orientierungswerte)
+
+Für eine Kalkulation mit 200.000 Zeilen × 30 Spalten (durchschnittlich ~100 Bytes pro Zelle):
+
+| Komponente | Geschätzter Bedarf |
+|--------------------------------------|-------------------------|
+| Rohdaten CSV auf Disk | ca. 550 – 650 MB |
+| DuckDB komprimiert (auf Disk) | ca. 150 – 250 MB |
+| Polars DataFrame im RAM | ca. 800 MB – 1,2 GB |
+| Peak Memory (JOIN + Transformation) | ca. 2,5 – 4 GB |
+| n8n Docker Container | ca. 300 – 500 MB RAM |
+| **Gesamt Peak-RAM** | **ca. 4 – 6 GB** |
+
+> **Empfehlung:** Bei Datensätzen über 500.000 Artikeln sollte auf 32 GB RAM und Streaming-Verarbeitung (Polars Lazy-Mode) umgestellt werden.
+
+## 2.2 Erforderliche Debian-Pakete
+
+Die folgenden Pakete müssen auf dem Debian-Server installiert sein, bevor die Pipeline eingerichtet wird.
+
+```bash
+# ──────────────────────────────────────────────────────────
+# System-Updates durchführen
+# ──────────────────────────────────────────────────────────
+sudo apt update && sudo apt upgrade -y
+
+# ──────────────────────────────────────────────────────────
+# Grundlegende Build-Tools und System-Abhängigkeiten
+# ──────────────────────────────────────────────────────────
+sudo apt install -y \
+ build-essential \
+ python3.11 \
+ python3.11-venv \
+ python3.11-dev \
+ python3-pip \
+ curl \
+ wget \
+ git \
+ jq \
+ unzip \
+ ca-certificates \
+ gnupg \
+ lsb-release \
+ cron \
+ logrotate \
+ htop \
+ tree
+```
+
+> ⚠️ **Hinweis:** Dies sind Beispielbefehle. Falls Python 3.11 nicht in den Standard-Repositories Ihrer Debian-Version verfügbar ist, muss es über das `deadsnakes`-PPA, einen Quellcode-Build oder das Paket `python3` (in Debian 12 standardmäßig Python 3.11) installiert werden. Passen Sie die Versionsnummern an Ihre Umgebung an.
+
+## 2.3 Software-Voraussetzungen
+
+| Software | Mindestversion | Zweck |
+|---------------------------------|----------------|---------------------------------------------|
+| Python | 3.11+ | Hauptlaufzeit für ETL-Skripte |
+| Docker Engine | 24.0+ | Container-Runtime für n8n |
+| Docker Compose Plugin | 2.20+ | Multi-Container-Orchestrierung |
+| n8n | 1.30+ | Workflow-Automatisierung und Scheduling |
+| DuckDB (Python-Modul) | 0.10+ | Analytische In-Process-Datenbank |
+| Polars (Python-Modul) | 0.20+ | Schnelle DataFrame-Transformationen |
+| google-api-python-client | 2.100+ | Google API-Zugriff |
+| google-auth / google-auth-oauthlib | 2.20+ | Authentifizierung für Google APIs |
+| PyYAML | 6.0+ | Regel-Konfiguration im YAML-Format |
+
+---
+
+# 3. Installation & Setup
+
+## 3.1 Docker und n8n einrichten
+
+### 3.1.1 Docker Engine installieren
+
+```bash
+# ──────────────────────────────────────────────────────────
+# Offizielle Docker GPG-Schlüssel hinzufügen
+# ──────────────────────────────────────────────────────────
+sudo install -m 0755 -d /etc/apt/keyrings
+
+curl -fsSL https://download.docker.com/linux/debian/gpg | \
+ sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
+
+sudo chmod a+r /etc/apt/keyrings/docker.gpg
+
+# ──────────────────────────────────────────────────────────
+# Docker-Repository einrichten
+# ──────────────────────────────────────────────────────────
+echo \
+ "deb [arch=$(dpkg --print-architecture) \
+ signed-by=/etc/apt/keyrings/docker.gpg] \
+ https://download.docker.com/linux/debian \
+ $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
+ sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
+
+# ──────────────────────────────────────────────────────────
+# Docker Engine und Compose Plugin installieren
+# ──────────────────────────────────────────────────────────
+sudo apt update
+sudo apt install -y \
+ docker-ce \
+ docker-ce-cli \
+ containerd.io \
+ docker-buildx-plugin \
+ docker-compose-plugin
+
+# ──────────────────────────────────────────────────────────
+# Docker ohne sudo ermöglichen (Relogin erforderlich)
+# ──────────────────────────────────────────────────────────
+sudo usermod -aG docker $USER
+newgrp docker
+
+# ──────────────────────────────────────────────────────────
+# Installation verifizieren
+# ──────────────────────────────────────────────────────────
+docker --version
+docker compose version
+```
+
+> ⚠️ **Hinweis:** Dies sind Beispielbefehle basierend auf der offiziellen Docker-Dokumentation für Debian. Prüfen Sie stets die aktuelle Installationsanleitung unter [docs.docker.com](https://docs.docker.com/engine/install/debian/).
+
+### 3.1.2 n8n per Docker Compose bereitstellen
+
+Erstellen Sie das Verzeichnis und die Konfiguration:
+
+```bash
+# ──────────────────────────────────────────────────────────
+# Verzeichnisstruktur für n8n anlegen
+# ──────────────────────────────────────────────────────────
+sudo mkdir -p /opt/n8n/data
+sudo chown -R $USER:$USER /opt/n8n
+```
+
+> ⚠️ **Hinweis:** Beispielbefehl. Passen Sie den Benutzer und die Berechtigungen an Ihre Server-Konfiguration an.
+
+Erstellen Sie die Datei `/opt/n8n/.env` mit den Zugangsdaten:
+
+```bash
+# ──────────────────────────────────────────────────────────
+# /opt/n8n/.env — Umgebungsvariablen für n8n
+# ──────────────────────────────────────────────────────────
+N8N_BASIC_AUTH_USER=admin
+N8N_BASIC_AUTH_PASSWORD=IhrSicheresPasswortHier123!
+N8N_ENCRYPTION_KEY=ein-langer-zufaelliger-encryption-key-hier
+```
+
+> ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfiguration. Ersetzen Sie alle Platzhalter-Werte durch Ihre eigenen sicheren Passwörter und Schlüssel. Verwenden Sie `openssl rand -hex 32` zur Generierung sicherer Schlüssel.
+
+Erstellen Sie die Datei `/opt/n8n/docker-compose.yml`:
+
+```yaml
+# ──────────────────────────────────────────────────────────
+# /opt/n8n/docker-compose.yml — n8n Workflow-Engine
+# ──────────────────────────────────────────────────────────
+version: "3.8"
+
+services:
+ n8n:
+ image: n8nio/n8n:latest
+ container_name: n8n
+ restart: unless-stopped
+ ports:
+ - "5678:5678"
+ environment:
+ - N8N_BASIC_AUTH_ACTIVE=true
+ - N8N_BASIC_AUTH_USER=${N8N_BASIC_AUTH_USER}
+ - N8N_BASIC_AUTH_PASSWORD=${N8N_BASIC_AUTH_PASSWORD}
+ - N8N_ENCRYPTION_KEY=${N8N_ENCRYPTION_KEY}
+ - GENERIC_TIMEZONE=Europe/Berlin
+ - TZ=Europe/Berlin
+ - N8N_LOG_LEVEL=info
+ - N8N_LOG_OUTPUT=console,file
+ - N8N_LOG_FILE_LOCATION=/home/node/.n8n/logs/n8n.log
+ - N8N_DIAGNOSTICS_ENABLED=false
+ - N8N_HIRING_BANNER_ENABLED=false
+ volumes:
+ # n8n-Daten persistent speichern
+ - /opt/n8n/data:/home/node/.n8n
+ # Zugriff auf ETL-Pipeline-Verzeichnis (Read-Only für Skripte)
+ - /opt/etl-pipeline:/opt/etl-pipeline:ro
+ # Zugriff auf ETL-Logs (Read-Write für Monitoring)
+ - /opt/etl-pipeline/logs:/opt/etl-pipeline/logs:rw
+ healthcheck:
+ test: ["CMD-SHELL", "wget -qO- http://localhost:5678/healthz || exit 1"]
+ interval: 30s
+ timeout: 10s
+ retries: 3
+ start_period: 30s
+ deploy:
+ resources:
+ limits:
+ memory: 1G
+ cpus: "2.0"
+ reservations:
+ memory: 256M
+```
+
+> ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfiguration. Die Volume-Pfade, Ressourcen-Limits und Ports müssen an Ihre Server-Umgebung angepasst werden.
+
+n8n starten und Status prüfen:
+
+```bash
+# ──────────────────────────────────────────────────────────
+# n8n Container starten
+# ──────────────────────────────────────────────────────────
+cd /opt/n8n
+docker compose up -d
+
+# Status prüfen
+docker compose ps
+docker compose logs -f --tail=50
+```
+
+> ⚠️ **Hinweis:** Beispielbefehle. Nach dem Start ist n8n unter `http://:5678` erreichbar.
+
+## 3.2 Python-Umgebung einrichten
+
+### 3.2.1 Virtual Environment erstellen
+
+```bash
+# ──────────────────────────────────────────────────────────
+# Projektverzeichnis anlegen
+# ──────────────────────────────────────────────────────────
+sudo mkdir -p /opt/etl-pipeline
+sudo chown -R $USER:$USER /opt/etl-pipeline
+cd /opt/etl-pipeline
+
+# ──────────────────────────────────────────────────────────
+# Python Virtual Environment erstellen und aktivieren
+# ──────────────────────────────────────────────────────────
+python3.11 -m venv venv
+source venv/bin/activate
+
+# ──────────────────────────────────────────────────────────
+# pip aktualisieren
+# ──────────────────────────────────────────────────────────
+pip install --upgrade pip setuptools wheel
+```
+
+> ⚠️ **Hinweis:** Dies sind Beispielbefehle. Stellen Sie sicher, dass `python3.11` auf Ihrem System korrekt installiert ist (siehe Abschnitt 2.2).
+
+### 3.2.2 Abhängigkeiten installieren
+
+Erstellen Sie die Datei `/opt/etl-pipeline/requirements.txt`:
+
+```txt
+# ──────────────────────────────────────────────────────────
+# /opt/etl-pipeline/requirements.txt
+# ETL-Pipeline Abhängigkeiten
+# ──────────────────────────────────────────────────────────
+
+# DataFrame-Engine für schnelle Transformationen
+polars>=0.20.0,<2.0.0
+
+# Analytische In-Process SQL-Datenbank
+duckdb>=0.10.0,<2.0.0
+
+# Google API Client Libraries
+google-api-python-client>=2.100.0
+google-auth>=2.20.0
+google-auth-oauthlib>=1.2.0
+google-auth-httplib2>=0.2.0
+
+# Konfiguration
+PyYAML>=6.0
+
+# Utilities
+requests>=2.31.0
+python-dateutil>=2.8.0
+tenacity>=8.2.0
+```
+
+> ⚠️ **Hinweis:** Dies ist eine Beispiel-Datei. Prüfen Sie die aktuellen Versionen der Pakete und passen Sie die Versionsbeschränkungen an Ihre Kompatibilitätsanforderungen an.
+
+Abhängigkeiten installieren:
+
+```bash
+# ──────────────────────────────────────────────────────────
+# Abhängigkeiten in das Virtual Environment installieren
+# ──────────────────────────────────────────────────────────
+cd /opt/etl-pipeline
+source venv/bin/activate
+pip install -r requirements.txt
+
+# Installation verifizieren
+python -c "import polars; print(f'Polars: {polars.__version__}')"
+python -c "import duckdb; print(f'DuckDB: {duckdb.__version__}')"
+python -c "import googleapiclient; print('Google API Client: OK')"
+python -c "import yaml; print(f'PyYAML: {yaml.__version__}')"
+```
+
+> ⚠️ **Hinweis:** Beispielbefehle zur Verifikation. Die Ausgaben variieren je nach installierten Versionen.
+
+---
+
+# 4. Projektstruktur
+
+## 4.1 Empfohlene Verzeichnisstruktur
+
+Die folgende Struktur organisiert alle Komponenten der Pipeline in logisch getrennte Verzeichnisse. Diese Trennung erleichtert Wartung, Backups und Zugriffssteuerung.
+
+```
+/opt/etl-pipeline/
+│
+├── venv/ # Python Virtual Environment
+│ └── ... # (von venv automatisch verwaltet)
+│
+├── requirements.txt # Python-Abhängigkeiten
+│
+├── scripts/ # Alle Python-Skripte
+│ ├── __init__.py
+│ ├── main.py # Hauptskript (Entry Point)
+│ ├── extract.py # Modul: Datenextraktion (CSV + Google API)
+│ ├── transform.py # Modul: DuckDB JOIN + Polars Transformationen
+│ ├── load.py # Modul: Google Content API Upload
+│ ├── rules_engine.py # Modul: YAML-Regel-Engine
+│ └── utils.py # Hilfsfunktionen (Logging, Config-Laden)
+│
+├── config/ # Konfigurationsdateien
+│ ├── rules.yaml # Channable-ähnliche Transformationsregeln
+│ ├── settings.yaml # Allgemeine Pipeline-Einstellungen
+│ └── column_mapping.yaml # Spalten-Mapping JTL → Google Shopping
+│
+├── credentials/ # Sensible Zugangsdaten (restriktive Rechte!)
+│ ├── google_service_account.json # Google Service Account Key
+│ └── .gitignore # NIEMALS in Git committen!
+│
+├── data/ # Datendateien
+│ ├── input/ # Eingangsdaten
+│ │ └── jtl_export.csv # Aktueller JTL-WAWI CSV-Export
+│ ├── output/ # Verarbeitete Ausgabedaten
+│ │ └── google_feed_YYYYMMDD.csv
+│ ├── archive/ # Archivierte ältere Exporte
+│ │ └── jtl_export_20260301.csv
+│ └── db/ # DuckDB-Datenbankdateien
+│ └── products.duckdb # Persistente DuckDB-Datenbank
+│
+├── logs/ # Log-Dateien
+│ ├── etl_pipeline.log # Haupt-Log (rotating)
+│ ├── etl_pipeline.log.1 # Rotiertes Log
+│ ├── error.log # Nur Fehler-Einträge
+│ └── upload_errors/ # Detaillierte Upload-Fehler pro Lauf
+│ └── errors_20260306.json
+│
+├── tests/ # Unit- und Integrationstests
+│ ├── __init__.py
+│ ├── test_extract.py
+│ ├── test_transform.py
+│ └── test_rules_engine.py
+│
+└── docs/ # Zusätzliche Projektdokumentation
+ └── CHANGELOG.md
+```
+
+> ⚠️ **Hinweis:** Dies ist eine Beispiel-Struktur. Passen Sie die Verzeichnisstruktur an die spezifischen Anforderungen Ihres Projekts und Ihrer Organisation an.
+
+## 4.2 Verzeichnisstruktur anlegen
+
+```bash
+# ──────────────────────────────────────────────────────────
+# Vollständige Verzeichnisstruktur anlegen
+# ──────────────────────────────────────────────────────────
+cd /opt/etl-pipeline
+
+mkdir -p scripts
+mkdir -p config
+mkdir -p credentials
+mkdir -p data/{input,output,archive,db}
+mkdir -p logs/upload_errors
+mkdir -p tests
+mkdir -p docs
+
+# ──────────────────────────────────────────────────────────
+# Berechtigungen für sensible Verzeichnisse setzen
+# ──────────────────────────────────────────────────────────
+chmod 700 credentials/
+chmod 750 config/
+chmod 755 logs/
+
+# ──────────────────────────────────────────────────────────
+# .gitignore für credentials anlegen
+# ──────────────────────────────────────────────────────────
+echo "*" > credentials/.gitignore
+echo "!.gitignore" >> credentials/.gitignore
+
+# ──────────────────────────────────────────────────────────
+# Python-Modul-Initialisierung
+# ──────────────────────────────────────────────────────────
+touch scripts/__init__.py
+touch tests/__init__.py
+```
+
+> ⚠️ **Hinweis:** Beispielbefehle. Stellen Sie sicher, dass der Benutzer, unter dem die Pipeline läuft, die entsprechenden Lese- und Schreibrechte auf allen Verzeichnissen besitzt.
+
+---
+
+# 5. Konfiguration
+
+## 5.1 Google API Credentials sicher speichern
+
+### 5.1.1 Service Account erstellen
+
+Für die Kommunikation mit der Google Content API for Shopping wird ein **Service Account** empfohlen. Dieser ermöglicht eine automatisierte Authentifizierung ohne manuellen OAuth2-Flow.
+
+**Schritte in der Google Cloud Console:**
+
+1. Navigieren Sie zur [Google Cloud Console](https://console.cloud.google.com/).
+2. Erstellen Sie ein neues Projekt oder wählen Sie ein bestehendes aus.
+3. Aktivieren Sie die folgenden APIs:
+ - *Content API for Shopping*
+ - *Google Ads API* (falls Verkaufsdaten direkt aus Google Ads bezogen werden)
+ - *Google Analytics Data API* (falls GA4-Daten verwendet werden)
+4. Navigieren Sie zu **IAM & Verwaltung → Dienstkonten**.
+5. Erstellen Sie einen neuen Service Account mit einem aussagekräftigen Namen.
+6. Laden Sie den JSON-Key herunter.
+7. Fügen Sie den Service Account als Nutzer in Ihrem **Google Merchant Center** hinzu (mit der Rolle „Standard" oder „Admin").
+
+### 5.1.2 Credentials sicher ablegen
+
+```bash
+# ──────────────────────────────────────────────────────────
+# Service Account JSON-Key sicher auf dem Server ablegen
+# ──────────────────────────────────────────────────────────
+
+# Datei in das credentials-Verzeichnis kopieren
+cp /pfad/zur/heruntergeladenen/datei.json \
+ /opt/etl-pipeline/credentials/google_service_account.json
+
+# Restriktive Berechtigungen setzen (nur Owner lesen)
+chmod 600 /opt/etl-pipeline/credentials/google_service_account.json
+
+# Eigentümerschaft prüfen
+ls -la /opt/etl-pipeline/credentials/
+```
+
+> ⚠️ **Hinweis:** Beispielbefehle. Ersetzen Sie den Pfad durch den tatsächlichen Speicherort Ihrer heruntergeladenen JSON-Datei.
+
+### 5.1.3 Credentials als Umgebungsvariable referenzieren (empfohlen)
+
+Es ist empfehlenswert, den Pfad zur Credentials-Datei nicht hart im Skript zu kodieren, sondern über eine Umgebungsvariable bereitzustellen.
+
+```bash
+# ──────────────────────────────────────────────────────────
+# In /opt/etl-pipeline/.env oder ~/.bashrc hinzufügen:
+# ──────────────────────────────────────────────────────────
+export GOOGLE_APPLICATION_CREDENTIALS="/opt/etl-pipeline/credentials/google_service_account.json"
+export MERCHANT_CENTER_ID="123456789"
+```
+
+> ⚠️ **Hinweis:** Beispielkonfiguration. Ersetzen Sie die `MERCHANT_CENTER_ID` durch Ihre tatsächliche Merchant-Center-ID.
+
+### 5.1.4 Sicherheitshinweise
+
+- Die JSON-Datei darf **niemals** in ein Git-Repository committed werden.
+- Der Zugriff auf das `credentials/`-Verzeichnis sollte auf den Pipeline-Benutzer beschränkt sein (`chmod 700`).
+- Rotieren Sie Service Account Keys regelmäßig (mindestens alle 90 Tage).
+- Verwenden Sie nach Möglichkeit **Workload Identity Federation** anstelle von heruntergeladenen JSON-Keys.
+- Protokollieren Sie niemals den Inhalt der Credentials-Datei in Log-Dateien.
+
+## 5.2 Regel-Konfiguration (rules.yaml)
+
+Die Datei `rules.yaml` ist das Herzstück der Channable-ähnlichen Funktionalität. Sie definiert regelbasierte Transformationen, die auf jeden Artikel angewandt werden. Die Regeln werden in der definierten Reihenfolge sequenziell abgearbeitet.
+
+### 5.2.1 Struktur und Syntax
+
+Erstellen Sie die Datei `/opt/etl-pipeline/config/rules.yaml`:
+
+```yaml
+# ══════════════════════════════════════════════════════════
+# /opt/etl-pipeline/config/rules.yaml
+#
+# Channable-ähnliche Regel-Engine für Google Shopping
+# Regeln werden sequenziell von oben nach unten abgearbeitet.
+#
+# Unterstützte Operatoren:
+# Vergleich: >, <, >=, <=, ==, !=
+# Text: contains, not_contains, starts_with, ends_with
+# Logik: and, or (innerhalb von conditions)
+# Existenz: is_empty, is_not_empty
+#
+# Unterstützte Aktionen:
+# set_value: Festes Wert setzen
+# copy_field: Wert aus anderem Feld kopieren
+# prepend: Text am Anfang hinzufügen
+# append: Text am Ende hinzufügen
+# replace: Text ersetzen
+# calculate: Berechnung durchführen
+# exclude: Artikel vom Feed ausschließen
+# map_category: Kategorie-Mapping anwenden
+# ══════════════════════════════════════════════════════════
+
+# ──────────────────────────────────────────────────────────
+# Globale Einstellungen
+# ──────────────────────────────────────────────────────────
+settings:
+ feed_name: "Google Shopping DE"
+ target_country: "DE"
+ content_language: "de"
+ currency: "EUR"
+ # Artikel mit Bestand 0 automatisch ausschließen
+ exclude_out_of_stock: true
+ # Mindestpreis für den Feed (in EUR)
+ minimum_price: 1.00
+
+# ──────────────────────────────────────────────────────────
+# Regel-Definitionen
+# ──────────────────────────────────────────────────────────
+rules:
+
+ # ────────────────────────────────────────────────────────
+ # REGEL 1: Bestseller-Label basierend auf Verkaufszahlen
+ # ────────────────────────────────────────────────────────
+ - name: "Bestseller-Klassifizierung"
+ description: "Artikel mit mehr als 100 Verkäufen in 90 Tagen als Bestseller markieren"
+ enabled: true
+ priority: 10
+ conditions:
+ logic: "and"
+ rules:
+ - field: "sales_quantity_90d"
+ operator: ">"
+ value: 100
+ - field: "stock_quantity"
+ operator: ">"
+ value: 0
+ actions:
+ - target_field: "custom_label_0"
+ action: "set_value"
+ value: "Bestseller"
+
+ # ────────────────────────────────────────────────────────
+ # REGEL 2: Slow Mover identifizieren
+ # ────────────────────────────────────────────────────────
+ - name: "Slow-Mover-Klassifizierung"
+ description: "Artikel mit weniger als 5 Verkäufen in 90 Tagen als Slow Mover markieren"
+ enabled: true
+ priority: 20
+ conditions:
+ logic: "and"
+ rules:
+ - field: "sales_quantity_90d"
+ operator: "<"
+ value: 5
+ - field: "days_in_catalog"
+ operator: ">"
+ value: 30
+ actions:
+ - target_field: "custom_label_0"
+ action: "set_value"
+ value: "Slow Mover"
+
+ # ────────────────────────────────────────────────────────
+ # REGEL 3: Performance-Tier (custom_label_1)
+ # ────────────────────────────────────────────────────────
+ - name: "Performance-Tier High"
+ description: "Umsatzbasierte Tier-Einteilung für Bidding-Strategien"
+ enabled: true
+ priority: 30
+ conditions:
+ logic: "and"
+ rules:
+ - field: "revenue_90d"
+ operator: ">="
+ value: 1000
+ actions:
+ - target_field: "custom_label_1"
+ action: "set_value"
+ value: "Tier-1-High-Revenue"
+
+ - name: "Performance-Tier Mittel"
+ enabled: true
+ priority: 31
+ conditions:
+ logic: "and"
+ rules:
+ - field: "revenue_90d"
+ operator: ">="
+ value: 200
+ - field: "revenue_90d"
+ operator: "<"
+ value: 1000
+ actions:
+ - target_field: "custom_label_1"
+ action: "set_value"
+ value: "Tier-2-Mid-Revenue"
+
+ - name: "Performance-Tier Niedrig"
+ enabled: true
+ priority: 32
+ conditions:
+ logic: "and"
+ rules:
+ - field: "revenue_90d"
+ operator: "<"
+ value: 200
+ actions:
+ - target_field: "custom_label_1"
+ action: "set_value"
+ value: "Tier-3-Low-Revenue"
+
+ # ────────────────────────────────────────────────────────
+ # REGEL 4: Margen-Klassifizierung (custom_label_2)
+ # ────────────────────────────────────────────────────────
+ - name: "Hohe Marge"
+ enabled: true
+ priority: 40
+ conditions:
+ logic: "and"
+ rules:
+ - field: "margin_percent"
+ operator: ">="
+ value: 40
+ actions:
+ - target_field: "custom_label_2"
+ action: "set_value"
+ value: "High-Margin"
+
+ - name: "Niedrige Marge"
+ enabled: true
+ priority: 41
+ conditions:
+ logic: "and"
+ rules:
+ - field: "margin_percent"
+ operator: "<"
+ value: 15
+ actions:
+ - target_field: "custom_label_2"
+ action: "set_value"
+ value: "Low-Margin"
+
+ # ────────────────────────────────────────────────────────
+ # REGEL 5: Preiskategorie (custom_label_3)
+ # ────────────────────────────────────────────────────────
+ - name: "Premium-Preis"
+ enabled: true
+ priority: 50
+ conditions:
+ logic: "and"
+ rules:
+ - field: "price"
+ operator: ">="
+ value: 100
+ actions:
+ - target_field: "custom_label_3"
+ action: "set_value"
+ value: "Premium"
+
+ - name: "Budget-Preis"
+ enabled: true
+ priority: 51
+ conditions:
+ logic: "and"
+ rules:
+ - field: "price"
+ operator: "<"
+ value: 20
+ actions:
+ - target_field: "custom_label_3"
+ action: "set_value"
+ value: "Budget"
+
+ # ────────────────────────────────────────────────────────
+ # REGEL 6: Titel-Optimierung
+ # ────────────────────────────────────────────────────────
+ - name: "Marke im Titel voranstellen"
+ description: "Markenname am Anfang des Titels hinzufügen, falls nicht vorhanden"
+ enabled: true
+ priority: 60
+ conditions:
+ logic: "and"
+ rules:
+ - field: "brand"
+ operator: "is_not_empty"
+ - field: "title"
+ operator: "not_contains"
+ value_from_field: "brand"
+ actions:
+ - target_field: "title"
+ action: "prepend"
+ value_template: "{brand} - "
+
+ # ────────────────────────────────────────────────────────
+ # REGEL 7: Sale-Preis-Logik
+ # ────────────────────────────────────────────────────────
+ - name: "Sale-Preis setzen"
+ description: "Wenn ein UVP vorhanden und höher als der VK-Preis ist, als Sale kennzeichnen"
+ enabled: true
+ priority: 70
+ conditions:
+ logic: "and"
+ rules:
+ - field: "uvp"
+ operator: "is_not_empty"
+ - field: "uvp"
+ operator: ">"
+ value_from_field: "price"
+ actions:
+ - target_field: "sale_price"
+ action: "copy_field"
+ source_field: "price"
+ - target_field: "price"
+ action: "copy_field"
+ source_field: "uvp"
+
+ # ────────────────────────────────────────────────────────
+ # REGEL 8: Ausschluss-Regeln
+ # ────────────────────────────────────────────────────────
+ - name: "Artikel ohne Bild ausschließen"
+ enabled: true
+ priority: 80
+ conditions:
+ logic: "or"
+ rules:
+ - field: "image_url"
+ operator: "is_empty"
+ - field: "image_url"
+ operator: "=="
+ value: ""
+ actions:
+ - action: "exclude"
+ reason: "Kein Produktbild vorhanden"
+
+ - name: "Testprodukte ausschließen"
+ enabled: true
+ priority: 81
+ conditions:
+ logic: "or"
+ rules:
+ - field: "title"
+ operator: "contains"
+ value: "TEST"
+ - field: "sku"
+ operator: "starts_with"
+ value: "ZZ-"
+ actions:
+ - action: "exclude"
+ reason: "Testprodukt erkannt"
+
+ # ────────────────────────────────────────────────────────
+ # REGEL 9: Versandkosten basierend auf Preis
+ # ────────────────────────────────────────────────────────
+ - name: "Kostenloser Versand ab 49 EUR"
+ enabled: true
+ priority: 90
+ conditions:
+ logic: "and"
+ rules:
+ - field: "price"
+ operator: ">="
+ value: 49
+ actions:
+ - target_field: "shipping_cost"
+ action: "set_value"
+ value: "0.00"
+
+ - name: "Standard-Versandkosten"
+ enabled: true
+ priority: 91
+ conditions:
+ logic: "and"
+ rules:
+ - field: "price"
+ operator: "<"
+ value: 49
+ actions:
+ - target_field: "shipping_cost"
+ action: "set_value"
+ value: "4.95"
+```
+
+> ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfigurationsdatei. Alle Schwellenwerte, Feldnamen und Regeln müssen an Ihre tatsächlichen Geschäftsanforderungen, JTL-Exportfelder und Google-Shopping-Attribute angepasst werden.
+
+### 5.2.2 Spalten-Mapping (column_mapping.yaml)
+
+Erstellen Sie zusätzlich die Datei `/opt/etl-pipeline/config/column_mapping.yaml`:
+
+```yaml
+# ══════════════════════════════════════════════════════════
+# /opt/etl-pipeline/config/column_mapping.yaml
+#
+# Mapping: JTL-WAWI Exportfeld → Google Shopping Attribut
+# ══════════════════════════════════════════════════════════
+
+mapping:
+ # Pflichtfelder Google Shopping
+ Artikelnummer: "offerId"
+ Artikelname: "title"
+ Beschreibung: "description"
+ URL: "link"
+ BildURL: "imageLink"
+ VK_Brutto: "price"
+ Waehrung: "priceCurrency"
+ Verfuegbarkeit: "availability"
+ Marke: "brand"
+ GTIN: "gtin"
+ MPN: "mpn"
+ Zustand: "condition"
+ Kategorie_Google: "googleProductCategory"
+
+ # Optionale Felder
+ Farbe: "color"
+ Groesse: "size"
+ Material: "material"
+ Geschlecht: "gender"
+ Altersgruppe: "ageGroup"
+ EK_Netto: "cost_of_goods_sold"
+ UVP: "uvp"
+ Lagerbestand: "stock_quantity"
+
+ # Berechnete Felder (werden durch Pipeline erzeugt)
+ # sales_quantity_90d → aus Google API
+ # revenue_90d → aus Google API
+ # margin_percent → berechnet aus VK und EK
+ # custom_label_0-4 → aus rules.yaml
+```
+
+> ⚠️ **Hinweis:** Dies ist eine Beispiel-Zuordnung. Die tatsächlichen Spaltennamen in Ihrem JTL-WAWI-Export können abweichen. Prüfen Sie die Header-Zeile Ihrer CSV-Datei und passen Sie das Mapping entsprechend an.
+
+---
+
+# 6. Core Script Logic (Beispiel)
+
+Dieses Kapitel zeigt den konzeptionellen Aufbau der Python-Skripte. Der Code ist als **funktionales Boilerplate** zu verstehen, das als Grundlage für die eigene Implementierung dient.
+
+## 6.1 Hilfsfunktionen (utils.py)
+
+```python
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+/opt/etl-pipeline/scripts/utils.py
+
+Hilfsfunktionen für Logging, Konfiguration und gemeinsam genutzte Utilities.
+"""
+
+import os
+import sys
+import yaml
+import logging
+from pathlib import Path
+from datetime import datetime
+from logging.handlers import RotatingFileHandler
+
+
+# ──────────────────────────────────────────────────────────
+# Pfad-Konstanten
+# ──────────────────────────────────────────────────────────
+BASE_DIR = Path("/opt/etl-pipeline")
+CONFIG_DIR = BASE_DIR / "config"
+DATA_DIR = BASE_DIR / "data"
+LOG_DIR = BASE_DIR / "logs"
+CREDENTIALS_DIR = BASE_DIR / "credentials"
+
+
+def setup_logging(
+ log_name: str = "etl_pipeline",
+ log_level: int = logging.INFO,
+ max_bytes: int = 10 * 1024 * 1024, # 10 MB
+ backup_count: int = 5,
+) -> logging.Logger:
+ """
+ Konfiguriert das Logging mit Rotation und separatem Error-Log.
+
+ Args:
+ log_name: Name des Loggers und der Log-Datei.
+ log_level: Logging-Level (default: INFO).
+ max_bytes: Maximale Größe einer Log-Datei vor Rotation.
+ backup_count: Anzahl der aufzubewahrenden rotierten Log-Dateien.
+
+ Returns:
+ Konfigurierter Logger.
+ """
+ LOG_DIR.mkdir(parents=True, exist_ok=True)
+
+ logger = logging.getLogger(log_name)
+ logger.setLevel(log_level)
+
+ # Verhindere doppelte Handler bei mehrfachem Aufruf
+ if logger.handlers:
+ return logger
+
+ # Formatter
+ formatter = logging.Formatter(
+ fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s",
+ datefmt="%Y-%m-%d %H:%M:%S",
+ )
+
+ # Haupt-Log (Rotation)
+ main_handler = RotatingFileHandler(
+ LOG_DIR / f"{log_name}.log",
+ maxBytes=max_bytes,
+ backupCount=backup_count,
+ encoding="utf-8",
+ )
+ main_handler.setLevel(log_level)
+ main_handler.setFormatter(formatter)
+
+ # Error-Log (nur ERROR und höher)
+ error_handler = RotatingFileHandler(
+ LOG_DIR / "error.log",
+ maxBytes=max_bytes,
+ backupCount=backup_count,
+ encoding="utf-8",
+ )
+ error_handler.setLevel(logging.ERROR)
+ error_handler.setFormatter(formatter)
+
+ # Console-Handler
+ console_handler = logging.StreamHandler(sys.stdout)
+ console_handler.setLevel(log_level)
+ console_handler.setFormatter(formatter)
+
+ logger.addHandler(main_handler)
+ logger.addHandler(error_handler)
+ logger.addHandler(console_handler)
+
+ return logger
+
+
+def load_yaml_config(config_filename: str) -> dict:
+ """
+ Lädt eine YAML-Konfigurationsdatei aus dem config-Verzeichnis.
+
+ Args:
+ config_filename: Name der YAML-Datei (z. B. 'rules.yaml').
+
+ Returns:
+ Dictionary mit dem Inhalt der YAML-Datei.
+
+ Raises:
+ FileNotFoundError: Wenn die Datei nicht existiert.
+ yaml.YAMLError: Wenn die YAML-Syntax ungültig ist.
+ """
+ config_path = CONFIG_DIR / config_filename
+ if not config_path.exists():
+ raise FileNotFoundError(
+ f"Konfigurationsdatei nicht gefunden: {config_path}"
+ )
+
+ with open(config_path, "r", encoding="utf-8") as f:
+ config = yaml.safe_load(f)
+
+ return config
+
+
+def get_timestamp() -> str:
+ """Gibt einen formatierten Zeitstempel zurück (YYYYMMDD_HHMMSS)."""
+ return datetime.now().strftime("%Y%m%d_%H%M%S")
+
+
+def get_date_stamp() -> str:
+ """Gibt einen formatierten Datumsstempel zurück (YYYYMMDD)."""
+ return datetime.now().strftime("%Y%m%d")
+```
+
+> ⚠️ **Hinweis:** Dies ist ein Beispiel-Skript. Alle Pfade, Log-Konfigurationen und Funktionen müssen an Ihre tatsächliche Serverumgebung und Anforderungen angepasst werden.
+
+## 6.2 Extract-Modul (extract.py)
+
+```python
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+/opt/etl-pipeline/scripts/extract.py
+
+Modul für die Datenextraktion aus JTL-WAWI CSV und Google APIs.
+Beide Datenquellen werden als Tabellen in DuckDB importiert.
+"""
+
+import os
+import shutil
+import duckdb
+import polars as pl
+from pathlib import Path
+from datetime import datetime, timedelta
+
+from google.oauth2 import service_account
+from googleapiclient.discovery import build
+
+from scripts.utils import (
+ setup_logging,
+ BASE_DIR,
+ DATA_DIR,
+ CREDENTIALS_DIR,
+ get_date_stamp,
+)
+
+logger = setup_logging("extract")
+
+
+# ══════════════════════════════════════════════════════════
+# JTL-WAWI CSV-Import
+# ══════════════════════════════════════════════════════════
+
+def load_jtl_csv_to_duckdb(
+ con: duckdb.DuckDBPyConnection,
+ csv_path: str | Path | None = None,
+ table_name: str = "products_raw",
+) -> int:
+ """
+ Liest die JTL-WAWI CSV-Exportdatei und importiert sie als Tabelle
+ in die DuckDB-Datenbank.
+
+ Args:
+ con: Aktive DuckDB-Verbindung.
+ csv_path: Pfad zur CSV-Datei. Falls None, wird der Standardpfad verwendet.
+ table_name: Name der Zieltabelle in DuckDB.
+
+ Returns:
+ Anzahl der importierten Zeilen.
+ """
+ if csv_path is None:
+ csv_path = DATA_DIR / "input" / "jtl_export.csv"
+
+ csv_path = Path(csv_path)
+ if not csv_path.exists():
+ raise FileNotFoundError(f"JTL-CSV nicht gefunden: {csv_path}")
+
+ file_size_mb = csv_path.stat().st_size / (1024 * 1024)
+ logger.info(f"Lade JTL-CSV: {csv_path} ({file_size_mb:.1f} MB)")
+
+ # Bestehende Tabelle löschen und neu erstellen
+ con.execute(f"DROP TABLE IF EXISTS {table_name}")
+
+ # CSV mit DuckDB's optimiertem CSV-Reader importieren
+ # auto_detect=true erkennt Trennzeichen, Encoding und Datentypen
+ con.execute(f"""
+ CREATE TABLE {table_name} AS
+ SELECT *
+ FROM read_csv_auto(
+ '{csv_path}',
+ header = true,
+ delim = ';',
+ quote = '"',
+ escape = '"',
+ encoding = 'utf-8',
+ sample_size = 10000,
+ ignore_errors = false
+ )
+ """)
+
+ row_count = con.execute(
+ f"SELECT COUNT(*) FROM {table_name}"
+ ).fetchone()[0]
+
+ logger.info(
+ f"JTL-CSV erfolgreich geladen: "
+ f"{row_count:,} Zeilen in Tabelle '{table_name}'"
+ )
+
+ # Archivierung des Imports
+ archive_path = DATA_DIR / "archive" / f"jtl_export_{get_date_stamp()}.csv"
+ if not archive_path.exists():
+ shutil.copy2(csv_path, archive_path)
+ logger.info(f"CSV archiviert: {archive_path}")
+
+ return row_count
+
+
+# ══════════════════════════════════════════════════════════
+# Google Sales-Daten abrufen
+# ══════════════════════════════════════════════════════════
+
+def fetch_google_sales_data(
+ con: duckdb.DuckDBPyConnection,
+ table_name: str = "sales_90d",
+ lookback_days: int = 90,
+) -> int:
+ """
+ Ruft Verkaufsdaten der letzten N Tage über die Google API ab
+ und importiert sie als Tabelle in DuckDB.
+
+ Args:
+ con: Aktive DuckDB-Verbindung.
+ table_name: Name der Zieltabelle in DuckDB.
+ lookback_days: Anzahl der zurückliegenden Tage (Standard: 90).
+
+ Returns:
+ Anzahl der importierten Zeilen.
+ """
+ logger.info(
+ f"Rufe Google Sales-Daten ab (letzte {lookback_days} Tage)..."
+ )
+
+ # ── Google API Authentifizierung ──────────────────────
+ credentials_path = os.environ.get(
+ "GOOGLE_APPLICATION_CREDENTIALS",
+ str(CREDENTIALS_DIR / "google_service_account.json"),
+ )
+ merchant_id = os.environ.get("MERCHANT_CENTER_ID")
+
+ if not merchant_id:
+ raise ValueError(
+ "Umgebungsvariable MERCHANT_CENTER_ID ist nicht gesetzt"
+ )
+
+ credentials = service_account.Credentials.from_service_account_file(
+ credentials_path,
+ scopes=["https://www.googleapis.com/auth/content"],
+ )
+
+ service = build("content", "v2.1", credentials=credentials)
+
+ # ── Zeitraum berechnen ────────────────────────────────
+ end_date = datetime.now()
+ start_date = end_date - timedelta(days=lookback_days)
+
+ # ── Performance-Daten aus dem Merchant Center abrufen ─
+ # HINWEIS: Der tatsächliche API-Aufruf hängt von der
+ # verwendeten Google API ab (Merchant Center Reports,
+ # Google Ads API, GA4 Data API, etc.)
+ # Hier wird ein konzeptionelles Beispiel gezeigt.
+
+ sales_records = []
+
+ try:
+ # Beispiel: Merchant Center Performance Report
+ request_body = {
+ "query": f"""
+ SELECT
+ segments.offer_id,
+ metrics.impressions,
+ metrics.clicks,
+ metrics.conversions,
+ metrics.conversion_value
+ FROM MerchantPerformanceView
+ WHERE segments.date BETWEEN
+ '{start_date.strftime('%Y-%m-%d')}'
+ AND '{end_date.strftime('%Y-%m-%d')}'
+ """
+ }
+
+ response = (
+ service.reports()
+ .search(merchantId=merchant_id, body=request_body)
+ .execute()
+ )
+
+ for row in response.get("results", []):
+ segments = row.get("segments", {})
+ metrics = row.get("metrics", {})
+ sales_records.append({
+ "offer_id": segments.get("offerId", ""),
+ "impressions": int(metrics.get("impressions", 0)),
+ "clicks": int(metrics.get("clicks", 0)),
+ "sales_quantity_90d": int(
+ metrics.get("conversions", 0)
+ ),
+ "revenue_90d": float(
+ metrics.get("conversionValue", 0.0)
+ ),
+ })
+
+ except Exception as e:
+ logger.error(
+ f"Fehler beim Abrufen der Google Sales-Daten: {e}"
+ )
+ raise
+
+ # ── Daten in DuckDB importieren ───────────────────────
+ if sales_records:
+ df_sales = pl.DataFrame(sales_records)
+ con.execute(f"DROP TABLE IF EXISTS {table_name}")
+ con.execute(
+ f"CREATE TABLE {table_name} AS SELECT * FROM df_sales"
+ )
+ row_count = con.execute(
+ f"SELECT COUNT(*) FROM {table_name}"
+ ).fetchone()[0]
+ logger.info(
+ f"Google Sales-Daten geladen: "
+ f"{row_count:,} Zeilen in '{table_name}'"
+ )
+ return row_count
+ else:
+ logger.warning(
+ "Keine Sales-Daten von der Google API erhalten."
+ )
+ # Leere Tabelle erstellen, damit der JOIN nicht fehlschlägt
+ con.execute(f"DROP TABLE IF EXISTS {table_name}")
+ con.execute(f"""
+ CREATE TABLE {table_name} (
+ offer_id VARCHAR,
+ impressions INTEGER DEFAULT 0,
+ clicks INTEGER DEFAULT 0,
+ sales_quantity_90d INTEGER DEFAULT 0,
+ revenue_90d DOUBLE DEFAULT 0.0
+ )
+ """)
+ return 0
+```
+
+> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Google API-Aufrufe, insbesondere der Report-Query und die Antwortstruktur, müssen an die tatsächlich verwendete API-Version und Ihr Merchant-Center-Setup angepasst werden. Konsultieren Sie die offizielle Google API-Dokumentation für die aktuellen Endpunkte und Antwortformate.
+
+## 6.3 Transform-Modul (transform.py)
+
+```python
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+/opt/etl-pipeline/scripts/transform.py
+
+Modul für die Datentransformation:
+1. LEFT JOIN in DuckDB (Produkte ⟕ Sales)
+2. Polars-Transformationen basierend auf rules.yaml
+"""
+
+import duckdb
+import polars as pl
+from typing import Any
+
+from scripts.utils import setup_logging, load_yaml_config
+
+logger = setup_logging("transform")
+
+
+# ══════════════════════════════════════════════════════════
+# DuckDB LEFT JOIN
+# ══════════════════════════════════════════════════════════
+
+def join_products_with_sales(
+ con: duckdb.DuckDBPyConnection,
+ products_table: str = "products_raw",
+ sales_table: str = "sales_90d",
+ joined_table: str = "products_enriched",
+ join_key_products: str = "Artikelnummer",
+ join_key_sales: str = "offer_id",
+) -> pl.DataFrame:
+ """
+ Führt einen LEFT JOIN zwischen Produktstammdaten und
+ Verkaufsdaten in DuckDB durch und gibt das Ergebnis als
+ Polars DataFrame zurück.
+
+ Ein LEFT JOIN stellt sicher, dass alle Produkte erhalten
+ bleiben, auch wenn keine Verkaufsdaten vorliegen
+ (NULL-Werte werden mit 0 gefüllt).
+
+ Args:
+ con: Aktive DuckDB-Verbindung.
+ products_table: Name der Produkttabelle.
+ sales_table: Name der Verkaufstabelle.
+ joined_table: Name der Ergebnistabelle.
+ join_key_products: JOIN-Schlüssel in der Produkttabelle.
+ join_key_sales: JOIN-Schlüssel in der Verkaufstabelle.
+
+ Returns:
+ Polars DataFrame mit den verknüpften Daten.
+ """
+ logger.info(
+ f"Führe LEFT JOIN durch: {products_table} ⟕ {sales_table} "
+ f"ON {join_key_products} = {join_key_sales}"
+ )
+
+ # ── LEFT JOIN mit COALESCE für NULL-Behandlung ────────
+ con.execute(f"DROP TABLE IF EXISTS {joined_table}")
+ con.execute(f"""
+ CREATE TABLE {joined_table} AS
+ SELECT
+ p.*,
+ COALESCE(s.impressions, 0)
+ AS impressions,
+ COALESCE(s.clicks, 0)
+ AS clicks,
+ COALESCE(s.sales_quantity_90d, 0)
+ AS sales_quantity_90d,
+ COALESCE(s.revenue_90d, 0.0)
+ AS revenue_90d
+ FROM {products_table} AS p
+ LEFT JOIN {sales_table} AS s
+ ON CAST(p."{join_key_products}" AS VARCHAR)
+ = CAST(s.{join_key_sales} AS VARCHAR)
+ """)
+
+ row_count = con.execute(
+ f"SELECT COUNT(*) FROM {joined_table}"
+ ).fetchone()[0]
+ logger.info(
+ f"LEFT JOIN abgeschlossen: "
+ f"{row_count:,} Zeilen in '{joined_table}'"
+ )
+
+ # ── Ergebnis als Polars DataFrame exportieren ─────────
+ result = con.execute(f"SELECT * FROM {joined_table}").pl()
+ logger.info(
+ f"Polars DataFrame erstellt: "
+ f"{result.shape[0]:,} Zeilen, {result.shape[1]} Spalten"
+ )
+
+ return result
+
+
+# ══════════════════════════════════════════════════════════
+# Berechnete Felder hinzufügen
+# ══════════════════════════════════════════════════════════
+
+def add_calculated_fields(df: pl.DataFrame) -> pl.DataFrame:
+ """
+ Fügt berechnete Felder hinzu, die als Grundlage für die
+ Regel-Engine dienen.
+
+ Args:
+ df: Polars DataFrame mit den verknüpften Rohdaten.
+
+ Returns:
+ Polars DataFrame mit zusätzlichen berechneten Spalten.
+ """
+ logger.info("Füge berechnete Felder hinzu...")
+
+ df = df.with_columns([
+ # Marge in Prozent (wenn EK vorhanden)
+ pl.when(
+ (pl.col("EK_Netto").is_not_null())
+ & (pl.col("VK_Brutto") > 0)
+ )
+ .then(
+ (
+ (pl.col("VK_Brutto") - pl.col("EK_Netto"))
+ / pl.col("VK_Brutto")
+ * 100
+ ).round(2)
+ )
+ .otherwise(pl.lit(None))
+ .alias("margin_percent"),
+
+ # Click-Through-Rate
+ pl.when(pl.col("impressions") > 0)
+ .then(
+ (pl.col("clicks") / pl.col("impressions") * 100)
+ .round(2)
+ )
+ .otherwise(pl.lit(0.0))
+ .alias("ctr_percent"),
+
+ # Conversion-Rate
+ pl.when(pl.col("clicks") > 0)
+ .then(
+ (
+ pl.col("sales_quantity_90d")
+ / pl.col("clicks")
+ * 100
+ ).round(2)
+ )
+ .otherwise(pl.lit(0.0))
+ .alias("conversion_rate_percent"),
+
+ # Custom Label Spalten initialisieren (leer)
+ pl.lit("").alias("custom_label_0"),
+ pl.lit("").alias("custom_label_1"),
+ pl.lit("").alias("custom_label_2"),
+ pl.lit("").alias("custom_label_3"),
+ pl.lit("").alias("custom_label_4"),
+
+ # Versandkosten initialisieren
+ pl.lit("4.95").alias("shipping_cost"),
+
+ # Exclude-Flag initialisieren
+ pl.lit(False).alias("_excluded"),
+ pl.lit("").alias("_exclude_reason"),
+ ])
+
+ logger.info(
+ f"Berechnete Felder hinzugefügt. "
+ f"DataFrame: {df.shape[1]} Spalten"
+ )
+ return df
+
+
+# ══════════════════════════════════════════════════════════
+# YAML Regel-Engine
+# ══════════════════════════════════════════════════════════
+
+def evaluate_condition(
+ df: pl.DataFrame,
+ condition: dict,
+) -> pl.Series:
+ """
+ Evaluiert eine einzelne Bedingung und gibt eine
+ boolesche Maske zurück.
+
+ Args:
+ df: Polars DataFrame.
+ condition: Dictionary mit field, operator, value.
+
+ Returns:
+ Polars Series (Boolean) als Filtermaske.
+ """
+ field = condition["field"]
+ operator = condition["operator"]
+ value = condition.get("value")
+ value_from_field = condition.get("value_from_field")
+
+ # Prüfen ob das Feld existiert
+ if field not in df.columns:
+ logger.warning(
+ f"Feld '{field}' existiert nicht im DataFrame. "
+ f"Bedingung übersprungen."
+ )
+ return pl.Series("mask", [False] * df.height)
+
+ col = pl.col(field)
+
+ # Vergleichswert: fester Wert oder aus anderem Feld
+ if value_from_field:
+ compare_value = pl.col(value_from_field)
+ else:
+ compare_value = value
+
+ # ── Operator-Mapping ──────────────────────────────────
+ operator_map = {
+ ">": col > compare_value,
+ "<": col < compare_value,
+ ">=": col >= compare_value,
+ "<=": col <= compare_value,
+ "==": col == compare_value,
+ "!=": col != compare_value,
+ "contains": col.cast(pl.Utf8).str.contains(
+ str(compare_value), literal=True
+ ),
+ "not_contains": ~col.cast(pl.Utf8).str.contains(
+ str(compare_value), literal=True
+ ),
+ "starts_with": col.cast(pl.Utf8).str.starts_with(
+ str(compare_value)
+ ),
+ "ends_with": col.cast(pl.Utf8).str.ends_with(
+ str(compare_value)
+ ),
+ "is_empty": (
+ col.is_null() | (col.cast(pl.Utf8) == "")
+ ),
+ "is_not_empty": (
+ col.is_not_null() & (col.cast(pl.Utf8) != "")
+ ),
+ }
+
+ if operator not in operator_map:
+ logger.error(f"Unbekannter Operator: '{operator}'")
+ return pl.Series("mask", [False] * df.height)
+
+ return df.select(operator_map[operator]).to_series()
+
+
+def apply_rules(
+ df: pl.DataFrame,
+ rules_config: dict,
+) -> pl.DataFrame:
+ """
+ Wendet alle aktivierten Regeln aus der rules.yaml auf den
+ DataFrame an. Regeln werden nach Priorität sortiert und
+ sequenziell abgearbeitet.
+
+ Args:
+ df: Polars DataFrame mit den angereicherten Produktdaten.
+ rules_config: Geladene rules.yaml als Dictionary.
+
+ Returns:
+ Polars DataFrame nach Anwendung aller Regeln.
+ """
+ rules = rules_config.get("rules", [])
+ settings = rules_config.get("settings", {})
+
+ # Regeln nach Priorität sortieren
+ # (niedrigere Zahl = höhere Priorität)
+ rules_sorted = sorted(
+ [r for r in rules if r.get("enabled", True)],
+ key=lambda r: r.get("priority", 999),
+ )
+
+ logger.info(f"Wende {len(rules_sorted)} aktive Regeln an...")
+
+ # ── Globale Ausschlüsse (Settings) ────────────────────
+ if settings.get("exclude_out_of_stock", False):
+ df = df.with_columns(
+ pl.when(pl.col("stock_quantity") <= 0)
+ .then(pl.lit(True))
+ .otherwise(pl.col("_excluded"))
+ .alias("_excluded")
+ )
+ excluded = df.filter(pl.col("_excluded")).height
+ logger.info(
+ f"Globaler Ausschluss (Bestand=0): "
+ f"{excluded:,} Artikel markiert"
+ )
+
+ min_price = settings.get("minimum_price", 0)
+ if min_price > 0:
+ df = df.with_columns(
+ pl.when(pl.col("VK_Brutto") < min_price)
+ .then(pl.lit(True))
+ .otherwise(pl.col("_excluded"))
+ .alias("_excluded")
+ )
+
+ # ── Regeln sequenziell anwenden ───────────────────────
+ for rule in rules_sorted:
+ rule_name = rule.get("name", "Unbenannt")
+ conditions_config = rule.get("conditions", {})
+ actions = rule.get("actions", [])
+ logic = conditions_config.get("logic", "and")
+ sub_rules = conditions_config.get("rules", [])
+
+ logger.debug(
+ f"Verarbeite Regel: '{rule_name}' "
+ f"(Priorität: {rule.get('priority', '?')})"
+ )
+
+ # Alle Teilbedingungen evaluieren
+ masks = [
+ evaluate_condition(df, cond) for cond in sub_rules
+ ]
+
+ if not masks:
+ logger.warning(
+ f"Regel '{rule_name}' hat keine Bedingungen. "
+ f"Übersprungen."
+ )
+ continue
+
+ # Logische Verknüpfung der Teilbedingungen
+ if logic == "and":
+ combined_mask = masks[0]
+ for m in masks[1:]:
+ combined_mask = combined_mask & m
+ elif logic == "or":
+ combined_mask = masks[0]
+ for m in masks[1:]:
+ combined_mask = combined_mask | m
+ else:
+ logger.error(
+ f"Unbekannte Logik '{logic}' in Regel "
+ f"'{rule_name}'. Übersprungen."
+ )
+ continue
+
+ matching_count = combined_mask.sum()
+ logger.info(
+ f"Regel '{rule_name}': "
+ f"{matching_count:,} Artikel treffen zu"
+ )
+
+ # Aktionen auf übereinstimmende Zeilen anwenden
+ for action_def in actions:
+ action_type = action_def.get("action", "set_value")
+ target_field = action_def.get("target_field", "")
+
+ if action_type == "set_value":
+ value = action_def.get("value", "")
+ df = df.with_columns(
+ pl.when(combined_mask)
+ .then(pl.lit(value))
+ .otherwise(pl.col(target_field))
+ .alias(target_field)
+ )
+
+ elif action_type == "copy_field":
+ source = action_def.get("source_field", "")
+ if source in df.columns:
+ df = df.with_columns(
+ pl.when(combined_mask)
+ .then(pl.col(source))
+ .otherwise(pl.col(target_field))
+ .alias(target_field)
+ )
+
+ elif action_type == "prepend":
+ template = action_def.get(
+ "value_template", ""
+ )
+ for col_name in df.columns:
+ placeholder = f"{{{col_name}}}"
+ if placeholder in template:
+ suffix = template.replace(
+ placeholder, ""
+ )
+ df = df.with_columns(
+ pl.when(combined_mask)
+ .then(
+ pl.col(col_name)
+ .cast(pl.Utf8)
+ + pl.lit(suffix)
+ + pl.col(target_field)
+ .cast(pl.Utf8)
+ )
+ .otherwise(pl.col(target_field))
+ .alias(target_field)
+ )
+ break
+
+ elif action_type == "append":
+ value = action_def.get("value", "")
+ df = df.with_columns(
+ pl.when(combined_mask)
+ .then(
+ pl.col(target_field).cast(pl.Utf8)
+ + pl.lit(value)
+ )
+ .otherwise(pl.col(target_field))
+ .alias(target_field)
+ )
+
+ elif action_type == "exclude":
+ reason = action_def.get(
+ "reason", "Durch Regel ausgeschlossen"
+ )
+ df = df.with_columns(
+ pl.when(combined_mask)
+ .then(pl.lit(True))
+ .otherwise(pl.col("_excluded"))
+ .alias("_excluded"),
+ pl.when(combined_mask)
+ .then(pl.lit(reason))
+ .otherwise(pl.col("_exclude_reason"))
+ .alias("_exclude_reason"),
+ )
+
+ # ── Statistik ─────────────────────────────────────────
+ total = df.height
+ excluded_count = df.filter(pl.col("_excluded")).height
+ active_count = total - excluded_count
+ logger.info(
+ f"Transformation abgeschlossen: {total:,} Gesamt | "
+ f"{active_count:,} Aktiv | "
+ f"{excluded_count:,} Ausgeschlossen"
+ )
+
+ return df
+```
+
+> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Regel-Engine ist bewusst einfach gehalten und muss für den Produktionseinsatz um Fehlerbehandlung, Validierung der Eingabedaten und weitere Operatoren erweitert werden. Die Feldnamen (`EK_Netto`, `VK_Brutto`, `stock_quantity` usw.) müssen mit den tatsächlichen Spalten Ihres JTL-Exports übereinstimmen.
+
+## 6.4 Load-Modul (load.py)
+
+```python
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+/opt/etl-pipeline/scripts/load.py
+
+Modul für den Batch-Upload transformierter Produktdaten
+an die Google Content API for Shopping.
+Implementiert Rate-Limiting und exponentielles Backoff.
+"""
+
+import os
+import json
+import time
+import polars as pl
+from pathlib import Path
+from datetime import datetime
+
+from google.oauth2 import service_account
+from googleapiclient.discovery import build
+from googleapiclient.errors import HttpError
+from tenacity import (
+ retry,
+ stop_after_attempt,
+ wait_exponential,
+ retry_if_exception_type,
+)
+
+from scripts.utils import (
+ setup_logging,
+ load_yaml_config,
+ CREDENTIALS_DIR,
+ DATA_DIR,
+ LOG_DIR,
+ get_date_stamp,
+)
+
+logger = setup_logging("load")
+
+
+# ══════════════════════════════════════════════════════════
+# Konstanten
+# ══════════════════════════════════════════════════════════
+BATCH_SIZE = 1000 # Produkte pro Batch-Request
+RATE_LIMIT_DELAY = 1.0 # Sekunden Pause zwischen Batches
+MAX_RETRIES = 5 # Maximale Wiederholungsversuche
+BACKOFF_MULTIPLIER = 2 # Exponentieller Backoff-Faktor
+
+
+# ══════════════════════════════════════════════════════════
+# Google Content API Client
+# ══════════════════════════════════════════════════════════
+
+def get_content_api_service():
+ """
+ Erstellt einen authentifizierten Google Content API Service.
+
+ Returns:
+ Google API Service-Objekt für Content API v2.1.
+ """
+ credentials_path = os.environ.get(
+ "GOOGLE_APPLICATION_CREDENTIALS",
+ str(CREDENTIALS_DIR / "google_service_account.json"),
+ )
+
+ credentials = service_account.Credentials.from_service_account_file(
+ credentials_path,
+ scopes=["https://www.googleapis.com/auth/content"],
+ )
+
+ service = build("content", "v2.1", credentials=credentials)
+ logger.info("Google Content API Service erstellt.")
+ return service
+
+
+# ══════════════════════════════════════════════════════════
+# Produkt-Payload-Erstellung
+# ══════════════════════════════════════════════════════════
+
+def build_product_payload(
+ row: dict,
+ column_mapping: dict,
+ settings: dict,
+) -> dict:
+ """
+ Erstellt den API-Payload für ein einzelnes Produkt nach
+ dem Google Shopping Content API Schema.
+
+ Args:
+ row: Dictionary mit den Produktdaten einer Zeile.
+ column_mapping: Spalten-Mapping (JTL → Google).
+ settings: Globale Pipeline-Einstellungen.
+
+ Returns:
+ Dictionary im Google Content API Produktformat.
+ """
+ target_country = settings.get("target_country", "DE")
+ content_language = settings.get("content_language", "de")
+ currency = settings.get("currency", "EUR")
+ mapping = column_mapping.get("mapping", {})
+
+ product = {
+ "offerId": str(row.get(
+ mapping.get("Artikelnummer", "Artikelnummer"), ""
+ )),
+ "title": str(row.get("title", row.get(
+ mapping.get("Artikelname", ""), ""
+ ))),
+ "description": str(row.get(
+ mapping.get("Beschreibung", "Beschreibung"), ""
+ )),
+ "link": str(row.get(
+ mapping.get("URL", "URL"), ""
+ )),
+ "imageLink": str(row.get(
+ mapping.get("BildURL", "BildURL"), ""
+ )),
+ "contentLanguage": content_language,
+ "targetCountry": target_country,
+ "channel": "online",
+ "availability": (
+ "in stock"
+ if row.get("stock_quantity", 0) > 0
+ else "out of stock"
+ ),
+ "condition": "new",
+ "price": {
+ "value": str(row.get("VK_Brutto", "0.00")),
+ "currency": currency,
+ },
+ }
+
+ # Optionale Felder hinzufügen
+ brand = row.get("Marke", row.get("brand", ""))
+ if brand:
+ product["brand"] = str(brand)
+
+ gtin = row.get("GTIN", row.get("gtin", ""))
+ if gtin:
+ product["gtin"] = str(gtin)
+
+ # Custom Labels (0-4)
+ for i in range(5):
+ label_value = row.get(f"custom_label_{i}", "")
+ if label_value:
+ product[f"customLabel{i}"] = str(label_value)
+
+ # Sale-Preis
+ sale_price = row.get("sale_price", "")
+ if sale_price and float(sale_price) > 0:
+ product["salePrice"] = {
+ "value": str(sale_price),
+ "currency": currency,
+ }
+
+ # Versandkosten
+ shipping_cost = row.get("shipping_cost", "")
+ if shipping_cost is not None and shipping_cost != "":
+ product["shipping"] = [{
+ "country": target_country,
+ "price": {
+ "value": str(shipping_cost),
+ "currency": currency,
+ },
+ }]
+
+ return product
+
+
+# ══════════════════════════════════════════════════════════
+# Batch-Upload mit Rate Limiting
+# ══════════════════════════════════════════════════════════
+
+@retry(
+ stop=stop_after_attempt(MAX_RETRIES),
+ wait=wait_exponential(
+ multiplier=BACKOFF_MULTIPLIER, min=2, max=60
+ ),
+ retry=retry_if_exception_type(
+ (HttpError, ConnectionError, TimeoutError)
+ ),
+ before_sleep=lambda retry_state: logger.warning(
+ f"Retry {retry_state.attempt_number}/{MAX_RETRIES} "
+ f"nach Fehler. "
+ f"Warte {retry_state.next_action.sleep:.1f}s..."
+ ),
+)
+def execute_batch_request(
+ service,
+ merchant_id: str,
+ batch_entries: list[dict],
+) -> dict:
+ """
+ Führt einen einzelnen Batch-Request an die Content API aus.
+ Implementiert exponentielles Backoff bei Rate-Limit-Fehlern.
+
+ Args:
+ service: Google Content API Service.
+ merchant_id: Merchant Center ID.
+ batch_entries: Liste der Batch-Einträge.
+
+ Returns:
+ API-Antwort als Dictionary.
+ """
+ body = {"entries": batch_entries}
+
+ response = (
+ service.products()
+ .custombatch(body=body)
+ .execute()
+ )
+
+ return response
+
+
+def upload_products_to_google(
+ df: pl.DataFrame,
+ batch_size: int = BATCH_SIZE,
+ rate_limit_delay: float = RATE_LIMIT_DELAY,
+) -> dict:
+ """
+ Lädt alle aktiven (nicht ausgeschlossenen) Produkte als
+ Batch-Requests an die Google Content API hoch.
+
+ Args:
+ df: Polars DataFrame mit transformierten Produktdaten.
+ batch_size: Anzahl der Produkte pro Batch-Request.
+ rate_limit_delay: Pause in Sekunden zwischen Batches.
+
+ Returns:
+ Dictionary mit Upload-Statistiken.
+ """
+ merchant_id = os.environ.get("MERCHANT_CENTER_ID")
+ if not merchant_id:
+ raise ValueError("MERCHANT_CENTER_ID ist nicht gesetzt")
+
+ # Konfiguration laden
+ column_mapping = load_yaml_config("column_mapping.yaml")
+ rules_config = load_yaml_config("rules.yaml")
+ settings = rules_config.get("settings", {})
+
+ # Nur aktive (nicht ausgeschlossene) Produkte uploaden
+ df_active = df.filter(~pl.col("_excluded"))
+ total_products = df_active.height
+
+ logger.info(
+ f"Starte Upload: {total_products:,} aktive Produkte "
+ f"in Batches von {batch_size}"
+ )
+
+ # Service erstellen
+ service = get_content_api_service()
+
+ # ── Statistiken ───────────────────────────────────────
+ stats = {
+ "total": total_products,
+ "success": 0,
+ "errors": 0,
+ "batches": 0,
+ "start_time": datetime.now().isoformat(),
+ "error_details": [],
+ }
+
+ # ── Produkte in Batches aufteilen und hochladen ───────
+ rows_as_dicts = df_active.to_dicts()
+ total_batches = (
+ (total_products + batch_size - 1) // batch_size
+ )
+
+ for batch_start in range(0, total_products, batch_size):
+ batch_end = min(batch_start + batch_size, total_products)
+ batch_rows = rows_as_dicts[batch_start:batch_end]
+ batch_number = (batch_start // batch_size) + 1
+
+ logger.info(
+ f"Batch {batch_number}/{total_batches}: "
+ f"Produkte {batch_start + 1:,} – {batch_end:,}"
+ )
+
+ # Batch-Einträge erstellen
+ batch_entries = []
+ for idx, row in enumerate(batch_rows):
+ product_payload = build_product_payload(
+ row, column_mapping, settings
+ )
+ batch_entries.append({
+ "batchId": batch_start + idx,
+ "merchantId": merchant_id,
+ "method": "insert",
+ "product": product_payload,
+ })
+
+ # Batch-Request ausführen (mit Retry/Backoff)
+ try:
+ response = execute_batch_request(
+ service, merchant_id, batch_entries
+ )
+
+ # Antwort auswerten
+ for entry in response.get("entries", []):
+ if (
+ "errors" in entry
+ and entry["errors"].get("errors")
+ ):
+ stats["errors"] += 1
+ batch_idx = entry.get("batchId", 0)
+ local_idx = batch_idx - batch_start
+ offer_id = "unbekannt"
+ if 0 <= local_idx < len(batch_rows):
+ offer_id = batch_rows[local_idx].get(
+ "Artikelnummer", "unbekannt"
+ )
+ error_info = {
+ "batchId": batch_idx,
+ "offerId": offer_id,
+ "errors": entry["errors"]["errors"],
+ }
+ stats["error_details"].append(error_info)
+ logger.warning(
+ f"Fehler bei Produkt {offer_id}: "
+ f"{entry['errors']['errors']}"
+ )
+ else:
+ stats["success"] += 1
+
+ except HttpError as e:
+ if e.resp.status == 429:
+ logger.warning(
+ f"Rate Limit erreicht bei Batch "
+ f"{batch_number}. Erhöhe Wartezeit..."
+ )
+ time.sleep(rate_limit_delay * 5)
+ else:
+ logger.error(
+ f"HTTP-Fehler bei Batch "
+ f"{batch_number}: {e}"
+ )
+ stats["errors"] += len(batch_rows)
+
+ except Exception as e:
+ logger.error(
+ f"Unerwarteter Fehler bei Batch "
+ f"{batch_number}: {e}"
+ )
+ stats["errors"] += len(batch_rows)
+
+ stats["batches"] += 1
+
+ # ── Rate Limiting: Pause zwischen Batches ─────────
+ if batch_end < total_products:
+ logger.debug(
+ f"Rate Limiting: {rate_limit_delay}s Pause..."
+ )
+ time.sleep(rate_limit_delay)
+
+ # ── Upload-Statistiken loggen ─────────────────────────
+ stats["end_time"] = datetime.now().isoformat()
+ stats["duration_seconds"] = (
+ datetime.fromisoformat(stats["end_time"])
+ - datetime.fromisoformat(stats["start_time"])
+ ).total_seconds()
+
+ logger.info(
+ f"Upload abgeschlossen: "
+ f"{stats['success']:,} erfolgreich | "
+ f"{stats['errors']:,} Fehler | "
+ f"{stats['batches']} Batches | "
+ f"{stats['duration_seconds']:.1f}s Gesamtdauer"
+ )
+
+ # ── Fehlerdetails als JSON speichern ──────────────────
+ if stats["error_details"]:
+ error_dir = LOG_DIR / "upload_errors"
+ error_dir.mkdir(parents=True, exist_ok=True)
+ error_file = (
+ error_dir / f"errors_{get_date_stamp()}.json"
+ )
+ with open(error_file, "w", encoding="utf-8") as f:
+ json.dump(
+ stats["error_details"],
+ f,
+ indent=2,
+ ensure_ascii=False,
+ )
+ logger.info(f"Fehlerdetails gespeichert: {error_file}")
+
+ return stats
+```
+
+> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Batch-Größe, Rate-Limiting-Werte und API-Payloads müssen an Ihre spezifischen API-Quotas und Produktdatenstrukturen angepasst werden. Testen Sie den Upload zunächst mit einer kleinen Teilmenge (z. B. 10 Produkte) im Testmodus der Google Content API, bevor Sie den vollständigen Datensatz hochladen.
+
+## 6.5 Hauptskript (main.py)
+
+```python
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+/opt/etl-pipeline/scripts/main.py
+
+Hauptskript (Entry Point) für die ETL-Pipeline.
+Orchestriert den gesamten Ablauf: Extract → Transform → Load.
+
+Aufruf:
+ cd /opt/etl-pipeline
+ source venv/bin/activate
+ python -m scripts.main
+"""
+
+import sys
+import time
+import duckdb
+from pathlib import Path
+
+from scripts.utils import (
+ setup_logging,
+ load_yaml_config,
+ BASE_DIR,
+ DATA_DIR,
+ get_timestamp,
+)
+from scripts.extract import (
+ load_jtl_csv_to_duckdb,
+ fetch_google_sales_data,
+)
+from scripts.transform import (
+ join_products_with_sales,
+ add_calculated_fields,
+ apply_rules,
+)
+from scripts.load import upload_products_to_google
+
+logger = setup_logging("main")
+
+
+def run_pipeline() -> dict:
+ """
+ Führt die vollständige ETL-Pipeline aus.
+
+ Returns:
+ Dictionary mit Statistiken aller drei Phasen.
+
+ Raises:
+ SystemExit: Bei kritischen Fehlern mit Exit-Code 1.
+ """
+ pipeline_start = time.time()
+ run_id = get_timestamp()
+
+ logger.info(f"{'═' * 60}")
+ logger.info(f"ETL-Pipeline gestartet — Run-ID: {run_id}")
+ logger.info(f"{'═' * 60}")
+
+ stats = {
+ "run_id": run_id,
+ "extract": {},
+ "transform": {},
+ "load": {},
+ }
+
+ # ══════════════════════════════════════════════════════
+ # DuckDB-Verbindung herstellen
+ # ══════════════════════════════════════════════════════
+ db_path = DATA_DIR / "db" / "products.duckdb"
+ db_path.parent.mkdir(parents=True, exist_ok=True)
+
+ con = duckdb.connect(str(db_path))
+ logger.info(f"DuckDB-Verbindung hergestellt: {db_path}")
+
+ try:
+ # ══════════════════════════════════════════════════
+ # PHASE 1: EXTRACT
+ # ══════════════════════════════════════════════════
+ logger.info(f"{'─' * 40}")
+ logger.info("PHASE 1: EXTRACT")
+ logger.info(f"{'─' * 40}")
+
+ # JTL CSV laden
+ jtl_rows = load_jtl_csv_to_duckdb(con)
+ stats["extract"]["jtl_rows"] = jtl_rows
+
+ # Google Sales-Daten abrufen
+ sales_rows = fetch_google_sales_data(con)
+ stats["extract"]["sales_rows"] = sales_rows
+
+ # ══════════════════════════════════════════════════
+ # PHASE 2: TRANSFORM
+ # ══════════════════════════════════════════════════
+ logger.info(f"{'─' * 40}")
+ logger.info("PHASE 2: TRANSFORM")
+ logger.info(f"{'─' * 40}")
+
+ # LEFT JOIN: Produkte ⟕ Sales
+ df = join_products_with_sales(con)
+
+ # Berechnete Felder hinzufügen
+ df = add_calculated_fields(df)
+
+ # YAML-Regeln anwenden
+ rules_config = load_yaml_config("rules.yaml")
+ df = apply_rules(df, rules_config)
+
+ stats["transform"]["total_products"] = df.height
+ stats["transform"]["active_products"] = df.filter(
+ ~df["_excluded"]
+ ).height
+ stats["transform"]["excluded_products"] = df.filter(
+ df["_excluded"]
+ ).height
+
+ # ══════════════════════════════════════════════════
+ # PHASE 3: LOAD
+ # ══════════════════════════════════════════════════
+ logger.info(f"{'─' * 40}")
+ logger.info("PHASE 3: LOAD")
+ logger.info(f"{'─' * 40}")
+
+ upload_stats = upload_products_to_google(df)
+ stats["load"] = upload_stats
+
+ except Exception as e:
+ logger.critical(
+ f"Pipeline fehlgeschlagen: {e}", exc_info=True
+ )
+ # Exit-Code 1 signalisiert n8n einen Fehler
+ sys.exit(1)
+
+ finally:
+ con.close()
+ logger.info("DuckDB-Verbindung geschlossen.")
+
+ # ══════════════════════════════════════════════════════
+ # Zusammenfassung
+ # ══════════════════════════════════════════════════════
+ pipeline_duration = time.time() - pipeline_start
+ stats["duration_seconds"] = round(pipeline_duration, 2)
+
+ logger.info(f"{'═' * 60}")
+ logger.info(
+ f"ETL-Pipeline abgeschlossen — "
+ f"Dauer: {pipeline_duration:.1f}s"
+ )
+ logger.info(
+ f" Extract: "
+ f"{stats['extract'].get('jtl_rows', 0):,} JTL, "
+ f"{stats['extract'].get('sales_rows', 0):,} Sales"
+ )
+ logger.info(
+ f" Transform: "
+ f"{stats['transform'].get('active_products', 0):,} aktiv, "
+ f"{stats['transform'].get('excluded_products', 0):,} "
+ f"ausgeschlossen"
+ )
+ logger.info(
+ f" Load: "
+ f"{stats['load'].get('success', 0):,} OK, "
+ f"{stats['load'].get('errors', 0):,} Fehler"
+ )
+ logger.info(f"{'═' * 60}")
+
+ return stats
+
+
+# ──────────────────────────────────────────────────────────
+# Entry Point
+# ──────────────────────────────────────────────────────────
+if __name__ == "__main__":
+ run_pipeline()
+```
+
+> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Es dient als Ausgangspunkt und Entry Point für die Pipeline. Vor dem produktiven Einsatz sollten alle Module gründlich mit Testdaten validiert und die Fehlerbehandlung an Ihre Anforderungen angepasst werden.
+
+---
+
+# 7. n8n Workflow Integration
+
+## 7.1 Übersicht
+
+Der n8n Workflow übernimmt zwei zentrale Aufgaben:
+
+1. **Scheduling:** Tägliche Ausführung der Python-Pipeline um **00:00 Uhr** (Mitternacht).
+2. **Error Handling:** Benachrichtigung per E-Mail (oder Slack/Telegram) bei Fehlern.
+
+Die Kommunikation zwischen n8n (Docker-Container) und der Python-Pipeline auf dem Host-System erfolgt über den n8n-Node **Execute Command**, der Shell-Befehle im Container ausführt. Da das Pipeline-Verzeichnis als Volume gemountet ist, hat n8n Zugriff auf die Skripte.
+
+## 7.2 Workflow-Architektur
+
+```mermaid
+flowchart TD
+ CRON["⏰ Cron Trigger
täglich 00:00 Uhr"]
+ EXEC["⚡ Execute Command
/opt/etl-pipeline/venv/bin/python
-m scripts.main"]
+ CHECK{"🔍 Exit-Code
prüfen"}
+ SUCCESS["✅ Erfolgs-Meldung
Log + optionale E-Mail"]
+ ERROR["❌ Fehler-Alarm
E-Mail / Slack
Benachrichtigung"]
+
+ CRON --> EXEC
+ EXEC --> CHECK
+ CHECK -->|"Exit = 0 — OK"| SUCCESS
+ CHECK -->|"Exit = 1 — Fehler"| ERROR
+
+ style CRON fill:#e58e26,stroke:#fa983a,color:#fff
+ style EXEC fill:#0c2461,stroke:#1e3799,color:#fff
+ style CHECK fill:#474787,stroke:#706fd3,color:#fff
+ style SUCCESS fill:#079992,stroke:#38ada9,color:#fff
+ style ERROR fill:#b71540,stroke:#e55039,color:#fff
+```
+
+## 7.3 Workflow als n8n-JSON importieren
+
+Der folgende JSON-Export kann direkt in n8n importiert werden unter **Workflow → Import from File**. Speichern Sie den Inhalt als `.json`-Datei.
+
+```json
+{
+ "name": "ETL Pipeline - Google Shopping (Daily)",
+ "nodes": [
+ {
+ "parameters": {
+ "rule": {
+ "interval": [
+ {
+ "triggerAtHour": 0,
+ "triggerAtMinute": 0
+ }
+ ]
+ }
+ },
+ "name": "Cron Trigger (00:00)",
+ "type": "n8n-nodes-base.scheduleTrigger",
+ "typeVersion": 1.2,
+ "position": [240, 300]
+ },
+ {
+ "parameters": {
+ "command": "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main 2>&1",
+ "timeout": 3600
+ },
+ "name": "Execute ETL Pipeline",
+ "type": "n8n-nodes-base.executeCommand",
+ "typeVersion": 1,
+ "position": [480, 300]
+ },
+ {
+ "parameters": {
+ "conditions": {
+ "number": [
+ {
+ "value1": "={{ $json.exitCode }}",
+ "operation": "equal",
+ "value2": 0
+ }
+ ]
+ }
+ },
+ "name": "Check Exit Code",
+ "type": "n8n-nodes-base.if",
+ "typeVersion": 1,
+ "position": [720, 300]
+ },
+ {
+ "parameters": {
+ "fromEmail": "etl-pipeline@ihre-domain.de",
+ "toEmail": "admin@ihre-domain.de",
+ "subject": "✅ ETL Pipeline — Erfolg ({{ $now.format('yyyy-MM-dd') }})",
+ "text": "Die ETL-Pipeline wurde erfolgreich ausgeführt.\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\n\nStdout:\n{{ $json.stdout }}"
+ },
+ "name": "Success E-Mail",
+ "type": "n8n-nodes-base.emailSend",
+ "typeVersion": 2,
+ "position": [960, 200]
+ },
+ {
+ "parameters": {
+ "fromEmail": "etl-pipeline@ihre-domain.de",
+ "toEmail": "admin@ihre-domain.de",
+ "subject": "❌ ETL Pipeline — FEHLER ({{ $now.format('yyyy-MM-dd') }})",
+ "text": "ACHTUNG: Die ETL-Pipeline ist fehlgeschlagen!\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\nExit-Code: {{ $json.exitCode }}\n\nStderr:\n{{ $json.stderr }}\n\nStdout:\n{{ $json.stdout }}\n\nBitte prüfen: /opt/etl-pipeline/logs/error.log"
+ },
+ "name": "Error E-Mail",
+ "type": "n8n-nodes-base.emailSend",
+ "typeVersion": 2,
+ "position": [960, 400]
+ }
+ ],
+ "connections": {
+ "Cron Trigger (00:00)": {
+ "main": [
+ [{ "node": "Execute ETL Pipeline", "type": "main", "index": 0 }]
+ ]
+ },
+ "Execute ETL Pipeline": {
+ "main": [
+ [{ "node": "Check Exit Code", "type": "main", "index": 0 }]
+ ]
+ },
+ "Check Exit Code": {
+ "main": [
+ [{ "node": "Success E-Mail", "type": "main", "index": 0 }],
+ [{ "node": "Error E-Mail", "type": "main", "index": 0 }]
+ ]
+ }
+ },
+ "settings": {
+ "timezone": "Europe/Berlin",
+ "saveExecutionProgress": true,
+ "saveManualExecutions": true,
+ "executionTimeout": 7200
+ }
+}
+```
+
+> ⚠️ **Hinweis:** Dies ist ein Beispiel-Workflow für n8n. Die E-Mail-Adressen, SMTP-Einstellungen und Timeouts müssen an Ihre Umgebung angepasst werden. Konfigurieren Sie die SMTP-Credentials in n8n unter **Settings → Credentials** bevor Sie den Workflow aktivieren.
+
+## 7.4 Wichtige n8n-Konfigurationshinweise
+
+### Timeout-Einstellung
+
+Da die Verarbeitung von 200.000+ Artikeln je nach Serverhardware zwischen **5 und 30 Minuten** dauern kann, muss der Timeout im `Execute Command`-Node ausreichend hoch gesetzt werden:
+
+```
+timeout: 3600 # 1 Stunde (in Sekunden)
+```
+
+### Volume-Mount beachten
+
+Der `Execute Command`-Node führt Befehle **innerhalb des n8n Docker-Containers** aus. Damit die Python-Skripte und das Virtual Environment erreichbar sind, muss der Pfad `/opt/etl-pipeline` als Volume im `docker-compose.yml` gemountet sein (siehe Abschnitt 3.1.2).
+
+### Alternative: Host-Befehl über SSH
+
+Falls die Pipeline nicht innerhalb des Containers laufen soll, kann alternativ ein SSH-Node verwendet werden, der den Befehl auf dem Host ausführt:
+
+```bash
+# Befehl für n8n SSH-Node:
+ssh pipeline-user@localhost \
+ "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main"
+```
+
+> ⚠️ **Hinweis:** Beispielbefehl. Für die SSH-Variante muss ein SSH-Key ohne Passphrase konfiguriert und die SSH-Credentials in n8n hinterlegt werden.
+
+## 7.5 Manueller Pipeline-Start (Debugging)
+
+Für Debugging und Tests kann die Pipeline jederzeit manuell gestartet werden:
+
+```bash
+# ──────────────────────────────────────────────────────────
+# Pipeline manuell ausführen
+# ──────────────────────────────────────────────────────────
+cd /opt/etl-pipeline
+source venv/bin/activate
+python -m scripts.main
+
+# Alternativ: Nur den Exit-Code prüfen
+python -m scripts.main; echo "Exit-Code: $?"
+```
+
+> ⚠️ **Hinweis:** Beispielbefehle für den manuellen Start. Stellen Sie sicher, dass die Umgebungsvariablen (`GOOGLE_APPLICATION_CREDENTIALS`, `MERCHANT_CENTER_ID`) in der Shell-Session gesetzt sind.
+
+---
+
+# 8. Wartung & Monitoring
+
+## 8.1 Log-Management
+
+### 8.1.1 Log-Struktur
+
+Die Pipeline erzeugt folgende Log-Dateien:
+
+| Datei | Inhalt | Rotation |
+|----------------------------------------------|--------------------------------------|----------------------|
+| `logs/etl_pipeline.log` | Alle Log-Einträge (INFO+) | 10 MB, 5 Backups |
+| `logs/error.log` | Nur Fehler (ERROR+) | 10 MB, 5 Backups |
+| `logs/upload_errors/errors_YYYYMMDD.json` | Detail-Fehler pro Upload-Lauf | Täglich, eine Datei |
+
+### 8.1.2 Logrotate-Konfiguration
+
+Zusätzlich zur integrierten Python-Rotation empfiehlt sich eine systemweite Logrotate-Konfiguration als Sicherheitsnetz.
+
+Erstellen Sie die Datei `/etc/logrotate.d/etl-pipeline`:
+
+```
+/opt/etl-pipeline/logs/*.log {
+ daily
+ missingok
+ rotate 30
+ compress
+ delaycompress
+ notifempty
+ create 0640 root root
+ sharedscripts
+ postrotate
+ # Optional: Signal an laufende Prozesse
+ /bin/true
+ endscript
+}
+
+/opt/etl-pipeline/logs/upload_errors/*.json {
+ daily
+ missingok
+ rotate 90
+ compress
+ delaycompress
+ notifempty
+ create 0640 root root
+}
+```
+
+> ⚠️ **Hinweis:** Dies ist eine Beispiel-Logrotate-Konfiguration. Passen Sie den Benutzer, die Aufbewahrungsdauer (`rotate`) und die Dateiberechtigungen an Ihre Sicherheitsrichtlinien an.
+
+### 8.1.3 Log-Bereinigung (Cron)
+
+Alte Log-Dateien und Upload-Fehlerprotokolle können zusätzlich per Cron bereinigt werden:
+
+```bash
+# ──────────────────────────────────────────────────────────
+# Crontab-Eintrag: Logs älter als 90 Tage löschen
+# ──────────────────────────────────────────────────────────
+# crontab -e
+0 3 * * 0 find /opt/etl-pipeline/logs/ -name "*.log.*" -mtime +90 -delete
+0 3 * * 0 find /opt/etl-pipeline/logs/upload_errors/ -name "*.json" -mtime +90 -delete
+0 3 * * 0 find /opt/etl-pipeline/data/archive/ -name "*.csv" -mtime +180 -delete
+```
+
+> ⚠️ **Hinweis:** Beispiel-Crontab-Einträge. Passen Sie die Aufbewahrungsfristen (hier: 90 bzw. 180 Tage) an Ihre Compliance-Anforderungen an.
+
+## 8.2 Performance-Tipps für 200.000+ Artikel
+
+### 8.2.1 DuckDB-Optimierungen
+
+```python
+# ──────────────────────────────────────────────────────────
+# DuckDB Performance-Konfiguration
+# (am Anfang der Pipeline setzen)
+# ──────────────────────────────────────────────────────────
+
+con = duckdb.connect("products.duckdb")
+
+# Verfügbaren Arbeitsspeicher für DuckDB erhöhen
+con.execute("SET memory_limit = '4GB'")
+
+# Alle verfügbaren CPU-Kerne nutzen
+con.execute("SET threads TO 8")
+
+# Temporäre Dateien auf schnelle SSD legen
+con.execute("SET temp_directory = '/opt/etl-pipeline/data/tmp'")
+
+# Fortschrittsanzeige für lange Queries
+con.execute("SET enable_progress_bar = true")
+```
+
+> ⚠️ **Hinweis:** Beispiel-Konfiguration. Die Werte für `memory_limit` und `threads` müssen an die tatsächliche Serverausstattung angepasst werden.
+
+### 8.2.2 Polars-Optimierungen
+
+```python
+# ──────────────────────────────────────────────────────────
+# Polars Performance-Tipps
+# ──────────────────────────────────────────────────────────
+
+import polars as pl
+
+# 1. Lazy Evaluation verwenden (bei sehr großen Datensätzen)
+# Polars optimiert den Query-Plan automatisch
+df_lazy = pl.scan_csv(
+ "data/input/jtl_export.csv", separator=";"
+)
+df_result = (
+ df_lazy
+ .filter(pl.col("Lagerbestand") > 0)
+ .with_columns([
+ pl.col("VK_Brutto").cast(pl.Float64),
+ ])
+ .collect() # Erst hier werden die Daten materialisiert
+)
+
+# 2. Streaming-Modus für Datensätze > verfügbares RAM
+df_result = (
+ df_lazy
+ .filter(pl.col("Lagerbestand") > 0)
+ .collect(streaming=True) # Verarbeitet in Chunks
+)
+
+# 3. Datentypen optimieren (reduziert RAM-Verbrauch)
+df = df.with_columns([
+ pl.col("Lagerbestand").cast(pl.Int32), # statt Int64
+ pl.col("clicks").cast(pl.Int32), # statt Int64
+ pl.col("impressions").cast(pl.Int32), # statt Int64
+ pl.col("custom_label_0").cast(pl.Categorical), # statt Utf8
+ pl.col("custom_label_1").cast(pl.Categorical), # statt Utf8
+])
+
+# 4. Spaltenauswahl: Nur benötigte Spalten laden
+df = pl.read_csv(
+ "data.csv",
+ columns=["sku", "title", "price", "stock"],
+)
+```
+
+> ⚠️ **Hinweis:** Dies sind Beispiel-Codeausschnitte zur Performance-Optimierung. Die tatsächlichen Spaltennamen und Datentypen müssen an Ihren Datensatz angepasst werden.
+
+### 8.2.3 Google API Upload-Optimierungen
+
+| Parameter | Empfehlung | Begründung |
+|--------------------------|--------------------------|--------------------------------------------------|
+| `BATCH_SIZE` | 1.000 | Google empfiehlt max. 1.000 Einträge pro Batch |
+| `RATE_LIMIT_DELAY` | 1,0 – 2,0 Sekunden | Verhindert 429-Fehler bei Standard-Quotas |
+| `MAX_RETRIES` | 5 | Ausreichend für temporäre Fehler |
+| Parallele Requests | Nicht empfohlen | Erhöht 429-Risiko, sequenziell ist sicherer |
+| Geschätzte Upload-Dauer | 10 – 20 Minuten | Bei 200 Batches × 1s Delay + API-Latenz |
+
+### 8.2.4 Allgemeine Server-Monitoring-Befehle
+
+```bash
+# ──────────────────────────────────────────────────────────
+# System-Monitoring während Pipeline-Lauf
+# ──────────────────────────────────────────────────────────
+
+# RAM- und CPU-Auslastung in Echtzeit beobachten
+htop
+
+# Festplatten-I/O überwachen
+iostat -x 5
+
+# Pipeline-Prozess gezielt überwachen
+watch -n 2 "ps aux | grep 'python.*scripts.main' | grep -v grep"
+
+# Festplattennutzung des Pipeline-Verzeichnisses
+du -sh /opt/etl-pipeline/data/*
+
+# DuckDB-Dateigröße prüfen
+ls -lh /opt/etl-pipeline/data/db/products.duckdb
+```
+
+> ⚠️ **Hinweis:** Beispielbefehle für das System-Monitoring. Diese dienen zur Orientierung und sollten je nach Ihren Monitoring-Tools (z. B. Prometheus, Grafana, Zabbix) ergänzt oder ersetzt werden.
+
+## 8.3 Backup-Empfehlungen
+
+Die folgenden Dateien und Verzeichnisse sollten regelmäßig gesichert werden:
+
+| Pfad | Priorität | Frequenz |
+|------------------------------------------------|-----------|----------------|
+| `/opt/etl-pipeline/config/` | Hoch | Bei Änderung |
+| `/opt/etl-pipeline/credentials/` | Kritisch | Bei Änderung |
+| `/opt/etl-pipeline/scripts/` | Hoch | Bei Änderung |
+| `/opt/etl-pipeline/data/db/products.duckdb` | Mittel | Täglich |
+| `/opt/n8n/data/` | Hoch | Wöchentlich |
+
+```bash
+#!/bin/bash
+# ──────────────────────────────────────────────────────────
+# Einfaches Backup-Skript (Beispiel)
+# ──────────────────────────────────────────────────────────
+BACKUP_DIR="/backup/etl-pipeline/$(date +%Y%m%d)"
+mkdir -p "$BACKUP_DIR"
+
+# Konfiguration und Skripte sichern
+tar czf "$BACKUP_DIR/config_scripts.tar.gz" \
+ /opt/etl-pipeline/config/ \
+ /opt/etl-pipeline/scripts/ \
+ /opt/etl-pipeline/requirements.txt
+
+# DuckDB sichern
+cp /opt/etl-pipeline/data/db/products.duckdb \
+ "$BACKUP_DIR/products.duckdb"
+
+# n8n Workflows sichern
+tar czf "$BACKUP_DIR/n8n_data.tar.gz" /opt/n8n/data/
+
+echo "Backup erstellt: $BACKUP_DIR"
+```
+
+> ⚠️ **Hinweis:** Dies ist ein vereinfachtes Beispiel-Backup-Skript. Für den Produktionseinsatz sollte eine vollwertige Backup-Lösung (z. B. BorgBackup, Restic) mit Verschlüsselung und Off-Site-Speicherung verwendet werden.
+
+## 8.4 Checkliste für den Produktivbetrieb
+
+Vor der Aktivierung der automatisierten Pipeline sollten folgende Punkte abgearbeitet sein:
+
+- [ ] Python Virtual Environment erstellt und alle Abhängigkeiten installiert
+- [ ] DuckDB-Datenbank initialisiert und CSV-Import getestet
+- [ ] Google Service Account erstellt und in Merchant Center eingeladen
+- [ ] `GOOGLE_APPLICATION_CREDENTIALS` und `MERCHANT_CENTER_ID` als Umgebungsvariablen gesetzt
+- [ ] `rules.yaml` mit initialen Regeln konfiguriert
+- [ ] `column_mapping.yaml` an JTL-Export-Spalten angepasst
+- [ ] Pipeline manuell mit Testdaten durchgelaufen (alle 3 Phasen)
+- [ ] n8n Workflow importiert und Cron-Trigger konfiguriert
+- [ ] E-Mail-/Slack-Benachrichtigung bei Fehlern getestet
+- [ ] Logrotate-Konfiguration aktiv
+- [ ] Backup-Strategie implementiert
+- [ ] Firewall-Regeln: Ausgehender Traffic zu Google APIs erlaubt
+- [ ] Monitoring-Dashboard eingerichtet (optional)
+
+---
+
+# Anhang
+
+## A. Nützliche Befehle (Kurzreferenz)
+
+```bash
+# ── Pipeline ──────────────────────────────────────────────
+cd /opt/etl-pipeline && source venv/bin/activate
+python -m scripts.main # Pipeline starten
+python -m scripts.main 2>&1 | tee run.log # Mit Log-Ausgabe
+
+# ── n8n ───────────────────────────────────────────────────
+cd /opt/n8n
+docker compose up -d # Starten
+docker compose down # Stoppen
+docker compose logs -f --tail=100 # Logs anzeigen
+docker compose restart # Neustart
+
+# ── DuckDB CLI ────────────────────────────────────────────
+cd /opt/etl-pipeline && source venv/bin/activate
+python -c "
+import duckdb
+con = duckdb.connect('data/db/products.duckdb')
+print(con.execute('SHOW TABLES').fetchall())
+print(con.execute('SELECT COUNT(*) FROM products_raw').fetchone())
+con.close()
+"
+
+# ── Logs prüfen ──────────────────────────────────────────
+tail -f /opt/etl-pipeline/logs/etl_pipeline.log
+tail -100 /opt/etl-pipeline/logs/error.log
+cat /opt/etl-pipeline/logs/upload_errors/errors_$(date +%Y%m%d).json | jq .
+
+# ── Disk-Usage ────────────────────────────────────────────
+du -sh /opt/etl-pipeline/data/db/*
+du -sh /opt/etl-pipeline/logs/*
+df -h /opt/etl-pipeline/
+```
+
+> ⚠️ **Hinweis:** Beispiel-Befehlsreferenz. Alle Pfade und Befehle müssen an Ihre Serverumgebung angepasst werden.
+
+## B. Fehlerbehandlung (Häufige Probleme)
+
+| Problem | Mögliche Ursache | Lösung |
+|------------------------------------------------------|------------------------------------------|--------------------------------------------------------|
+| `FileNotFoundError: jtl_export.csv` | CSV nicht am erwarteten Pfad | JTL-Export-Pfad in `settings.yaml` prüfen |
+| `google.auth.exceptions.DefaultCredentialsError` | Credentials nicht gefunden | `GOOGLE_APPLICATION_CREDENTIALS` prüfen |
+| `HttpError 429: Too Many Requests` | API-Quote überschritten | `RATE_LIMIT_DELAY` erhöhen (z. B. auf 2–3s) |
+| `MemoryError` bei Polars | RAM nicht ausreichend | Polars Streaming-Modus oder RAM aufstocken |
+| n8n: Timeout nach 600s | Pipeline dauert zu lang | Timeout im Execute Command auf 3600s erhöhen |
+| DuckDB: `IO Error` | Festplatte voll | Archiv- und Temp-Dateien bereinigen |
+| `ModuleNotFoundError` | venv nicht aktiviert | `source venv/bin/activate` vor Ausführung |
+
+---
+
+*Dokumentation erstellt am 2026-03-06 | Version 1.0.0*
+*Letzte Aktualisierung: 2026-03-06*