--- title: Channable Stack LT24 Profice Dokumentation description: published: true date: 2026-03-06T11:09:09.239Z tags: editor: markdown dateCreated: 2026-03-06T11:09:09.239Z --- # Self-Hosted ETL-Pipeline für Google Shopping ## Lokale Alternative zu Channable — Technische Dokumentation --- | Eigenschaft | Wert | |--------------------|------------------------------------------------| | **Dokumentversion** | 1.0.0 | | **Erstellt am** | 2026-03-06 | | **Zielplattform** | Debian 12 (Bookworm) Server | | **Tech-Stack** | Python 3.11+ · Polars · DuckDB · n8n (Docker) | | **Datensatzgröße** | 200.000+ Artikel, 30+ Attribute pro Artikel | | **Ziel-API** | Google Content API for Shopping | | **Sprache** | Deutsch | --- ## Inhaltsverzeichnis 1. [Systemübersicht](#1-systemübersicht) 2. [Voraussetzungen](#2-voraussetzungen) 3. [Installation & Setup](#3-installation--setup) 4. [Projektstruktur](#4-projektstruktur) 5. [Konfiguration](#5-konfiguration) 6. [Core Script Logic (Beispiel)](#6-core-script-logic-beispiel) 7. [n8n Workflow Integration](#7-n8n-workflow-integration) 8. [Wartung & Monitoring](#8-wartung--monitoring) --- # 1. Systemübersicht ## 1.1 Architektur-Überblick Die Pipeline ersetzt den kommerziellen SaaS-Dienst **Channable** durch eine vollständig selbst gehostete, Open-Source-basierte Lösung auf einem dedizierten Debian-Server. Der gesamte Datenfluss folgt dem klassischen **ETL-Muster** (Extract → Transform → Load) und wird täglich automatisiert durch **n8n** orchestriert. ### Architektur-Diagramm ```mermaid graph TB subgraph SERVER["🖥️ DEBIAN SERVER — Docker-Host"] direction TB subgraph N8N["⚙️ n8n — Docker Container"] CRON["⏰ Cron-Trigger
täglich 00:00 Uhr"] ERR_HANDLER["🔔 Error-Handler
E-Mail / Slack Alert"] end subgraph PYTHON["🐍 PYTHON 3.11+ — Virtual Environment"] direction LR subgraph EXTRACT["📥 EXTRACT"] JTL["📄 JTL-WAWI
CSV-Export"] GAPI["☁️ Google Ads/GA
API — 90 Tage"] end subgraph TRANSFORM["🔄 TRANSFORM"] DUCKDB_JOIN["🦆 DuckDB
LEFT JOIN
Produkte ⟕ Sales"] POLARS["⚡ Polars
Regel-Engine
rules.yaml"] end subgraph LOAD["📤 LOAD"] GCONTENT["🛒 Google
Content API
Batch-Upload
mit Rate-Limiting
"] end end DUCKDB_FILE[("💾 DuckDB
products.duckdb")] LOGS["📋 Logs & Monitoring
/opt/etl-pipeline/logs/"] end CRON -->|"startet Pipeline"| JTL CRON -->|"startet Pipeline"| GAPI JTL -->|"CSV Import"| DUCKDB_JOIN GAPI -->|"Sales-Daten"| DUCKDB_JOIN DUCKDB_JOIN -->|"DataFrame"| POLARS POLARS -->|"transformierte Daten"| GCONTENT DUCKDB_JOIN -.->|"persistiert"| DUCKDB_FILE ERR_HANDLER -.->|"bei Exit-Code 1"| LOGS style SERVER fill:#1a1a2e,stroke:#16213e,color:#e0e0e0 style N8N fill:#2d3436,stroke:#636e72,color:#dfe6e9 style PYTHON fill:#2d3436,stroke:#636e72,color:#dfe6e9 style EXTRACT fill:#0a3d62,stroke:#3c6382,color:#dfe6e9 style TRANSFORM fill:#1e3799,stroke:#4a69bd,color:#dfe6e9 style LOAD fill:#6a0572,stroke:#a834a8,color:#dfe6e9 style JTL fill:#079992,stroke:#38ada9,color:#fff style GAPI fill:#079992,stroke:#38ada9,color:#fff style DUCKDB_JOIN fill:#0c2461,stroke:#1e3799,color:#fff style POLARS fill:#0c2461,stroke:#1e3799,color:#fff style GCONTENT fill:#6a0572,stroke:#a834a8,color:#fff style DUCKDB_FILE fill:#b71540,stroke:#e55039,color:#fff style LOGS fill:#474787,stroke:#706fd3,color:#fff style CRON fill:#e58e26,stroke:#fa983a,color:#fff style ERR_HANDLER fill:#e55039,stroke:#eb2f06,color:#fff ``` ### ETL-Datenfluss (vereinfacht) ```mermaid flowchart LR A["📄 JTL CSV
200k+ Artikel"] --> C["🦆 DuckDB
LEFT JOIN"] B["☁️ Google API
Sales 90 Tage"] --> C C --> D["⚡ Polars
rules.yaml
Custom Labels
Titel-Optimierung
Preis-Logik
Ausschlüsse
"] D --> E["🛒 Google Content API
Batch-Upload
Rate-Limiting"] style A fill:#079992,stroke:#38ada9,color:#fff style B fill:#079992,stroke:#38ada9,color:#fff style C fill:#0c2461,stroke:#1e3799,color:#fff style D fill:#1e3799,stroke:#4a69bd,color:#fff style E fill:#6a0572,stroke:#a834a8,color:#fff ``` ## 1.2 Datenfluss im Detail Der Datenfluss gliedert sich in drei klar voneinander getrennte Phasen: ### Phase 1 — Extract (Datenextraktion) In der Extract-Phase werden die Rohdaten aus zwei unabhängigen Quellen bezogen: **Quelle A — JTL-WAWI CSV-Export:** Die ERP-Software JTL-WAWI exportiert den vollständigen Produktkatalog als CSV-Datei. Diese Datei enthält alle Stammdaten wie Artikelnummer, Titel, Beschreibung, Preis, EAN, Bestand, Kategorie, Bilder-URLs und weitere produktspezifische Attribute. Der Export wird automatisch durch einen JTL-Worker oder manuell über den JTL-Ameise-Export auf dem Server abgelegt. **Quelle B — Google Ads/Analytics Reporting API:** Über die Google API werden die Verkaufsperformance-Daten der letzten 90 Tage abgerufen. Dazu gehören Metriken wie Verkaufsanzahl pro Artikel (`sales_quantity`), Umsatz pro Artikel (`revenue`), Klicks, Impressionen und Conversion-Rate. Diese Daten dienen als Grundlage für die regelbasierte Klassifizierung (z. B. „Bestseller"-Labels). Beide Datenquellen werden als separate Tabellen in **DuckDB** importiert. ### Phase 2 — Transform (Datentransformation) In der Transform-Phase werden die Daten verknüpft und angereichert: Zunächst wird in **DuckDB** ein `LEFT JOIN` zwischen der Produktstammdaten-Tabelle und der Verkaufsdaten-Tabelle durchgeführt. Die Verknüpfung erfolgt über eine gemeinsame Artikelnummer (z. B. `sku` oder `offer_id`). Das Ergebnis ist eine konsolidierte Tabelle, die sowohl Stamm- als auch Performance-Daten enthält. Der resultierende DataFrame wird anschließend in **Polars** geladen. Hier werden regelbasierte Transformationen auf Grundlage der Datei `rules.yaml` angewandt. Typische Transformationen umfassen: - Zuweisung von Custom Labels (`custom_label_0` bis `custom_label_4`) basierend auf Verkaufszahlen, Marge oder Kategorie. - Preisanpassungen und Sale-Preis-Logik. - Titeloptimierung (z. B. Hinzufügen von Marke oder Farbe). - Filterung von Artikeln, die nicht auf Google Shopping erscheinen sollen (z. B. Bestand = 0). - Anreicherung mit berechneten Feldern (z. B. `margin_percent`, `performance_tier`). ### Phase 3 — Load (Daten-Upload) Die transformierten Produktdaten werden über die **Google Content API for Shopping** als Batch-Request an das Google Merchant Center übertragen. Der Upload-Prozess berücksichtigt dabei die API-Quotas und implementiert ein Rate-Limiting mit exponentiellem Backoff, um `429 Too Many Requests`-Fehler zu vermeiden. Fehlerhafte Einzelprodukte werden in einer separaten Fehler-Log-Datei protokolliert und blockieren nicht den gesamten Upload. --- # 2. Voraussetzungen ## 2.1 Hardware-Anforderungen Die folgenden Spezifikationen gelten für die zuverlässige Verarbeitung von **200.000+ Artikeln** mit jeweils **30+ Attributen**. Die Werte sind konservativ kalkuliert und berücksichtigen die parallele Ausführung von n8n (Docker) und der Python-Pipeline. | Ressource | Minimum | Empfohlen | Begründung | |-------------------|----------------------|------------------------|--------------------------------------------------------------| | **CPU** | 4 Kerne (x86_64) | 8+ Kerne (x86_64) | Polars nutzt Multi-Threading automatisch aus | | **RAM** | 8 GB | 16 GB | DuckDB + Polars halten Daten im Arbeitsspeicher | | **Festplatte** | 40 GB SSD | 100 GB NVMe SSD | DuckDB-Dateien + CSV-Exporte + Logs + Docker-Images | | **Netzwerk** | 100 Mbit/s | 1 Gbit/s | Google API Batch-Uploads erfordern stabile Bandbreite | | **OS** | Debian 11 (Bullseye) | Debian 12 (Bookworm) | Aktuelle LTS-Basis mit langfristigem Sicherheits-Support | ### Speicherberechnung (Orientierungswerte) Für eine Kalkulation mit 200.000 Zeilen × 30 Spalten (durchschnittlich ~100 Bytes pro Zelle): | Komponente | Geschätzter Bedarf | |--------------------------------------|-------------------------| | Rohdaten CSV auf Disk | ca. 550 – 650 MB | | DuckDB komprimiert (auf Disk) | ca. 150 – 250 MB | | Polars DataFrame im RAM | ca. 800 MB – 1,2 GB | | Peak Memory (JOIN + Transformation) | ca. 2,5 – 4 GB | | n8n Docker Container | ca. 300 – 500 MB RAM | | **Gesamt Peak-RAM** | **ca. 4 – 6 GB** | > **Empfehlung:** Bei Datensätzen über 500.000 Artikeln sollte auf 32 GB RAM und Streaming-Verarbeitung (Polars Lazy-Mode) umgestellt werden. ## 2.2 Erforderliche Debian-Pakete Die folgenden Pakete müssen auf dem Debian-Server installiert sein, bevor die Pipeline eingerichtet wird. ```bash # ────────────────────────────────────────────────────────── # System-Updates durchführen # ────────────────────────────────────────────────────────── sudo apt update && sudo apt upgrade -y # ────────────────────────────────────────────────────────── # Grundlegende Build-Tools und System-Abhängigkeiten # ────────────────────────────────────────────────────────── sudo apt install -y \ build-essential \ python3.11 \ python3.11-venv \ python3.11-dev \ python3-pip \ curl \ wget \ git \ jq \ unzip \ ca-certificates \ gnupg \ lsb-release \ cron \ logrotate \ htop \ tree ``` > ⚠️ **Hinweis:** Dies sind Beispielbefehle. Falls Python 3.11 nicht in den Standard-Repositories Ihrer Debian-Version verfügbar ist, muss es über das `deadsnakes`-PPA, einen Quellcode-Build oder das Paket `python3` (in Debian 12 standardmäßig Python 3.11) installiert werden. Passen Sie die Versionsnummern an Ihre Umgebung an. ## 2.3 Software-Voraussetzungen | Software | Mindestversion | Zweck | |---------------------------------|----------------|---------------------------------------------| | Python | 3.11+ | Hauptlaufzeit für ETL-Skripte | | Docker Engine | 24.0+ | Container-Runtime für n8n | | Docker Compose Plugin | 2.20+ | Multi-Container-Orchestrierung | | n8n | 1.30+ | Workflow-Automatisierung und Scheduling | | DuckDB (Python-Modul) | 0.10+ | Analytische In-Process-Datenbank | | Polars (Python-Modul) | 0.20+ | Schnelle DataFrame-Transformationen | | google-api-python-client | 2.100+ | Google API-Zugriff | | google-auth / google-auth-oauthlib | 2.20+ | Authentifizierung für Google APIs | | PyYAML | 6.0+ | Regel-Konfiguration im YAML-Format | --- # 3. Installation & Setup ## 3.1 Docker und n8n einrichten ### 3.1.1 Docker Engine installieren ```bash # ────────────────────────────────────────────────────────── # Offizielle Docker GPG-Schlüssel hinzufügen # ────────────────────────────────────────────────────────── sudo install -m 0755 -d /etc/apt/keyrings curl -fsSL https://download.docker.com/linux/debian/gpg | \ sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg sudo chmod a+r /etc/apt/keyrings/docker.gpg # ────────────────────────────────────────────────────────── # Docker-Repository einrichten # ────────────────────────────────────────────────────────── echo \ "deb [arch=$(dpkg --print-architecture) \ signed-by=/etc/apt/keyrings/docker.gpg] \ https://download.docker.com/linux/debian \ $(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \ sudo tee /etc/apt/sources.list.d/docker.list > /dev/null # ────────────────────────────────────────────────────────── # Docker Engine und Compose Plugin installieren # ────────────────────────────────────────────────────────── sudo apt update sudo apt install -y \ docker-ce \ docker-ce-cli \ containerd.io \ docker-buildx-plugin \ docker-compose-plugin # ────────────────────────────────────────────────────────── # Docker ohne sudo ermöglichen (Relogin erforderlich) # ────────────────────────────────────────────────────────── sudo usermod -aG docker $USER newgrp docker # ────────────────────────────────────────────────────────── # Installation verifizieren # ────────────────────────────────────────────────────────── docker --version docker compose version ``` > ⚠️ **Hinweis:** Dies sind Beispielbefehle basierend auf der offiziellen Docker-Dokumentation für Debian. Prüfen Sie stets die aktuelle Installationsanleitung unter [docs.docker.com](https://docs.docker.com/engine/install/debian/). ### 3.1.2 n8n per Docker Compose bereitstellen Erstellen Sie das Verzeichnis und die Konfiguration: ```bash # ────────────────────────────────────────────────────────── # Verzeichnisstruktur für n8n anlegen # ────────────────────────────────────────────────────────── sudo mkdir -p /opt/n8n/data sudo chown -R $USER:$USER /opt/n8n ``` > ⚠️ **Hinweis:** Beispielbefehl. Passen Sie den Benutzer und die Berechtigungen an Ihre Server-Konfiguration an. Erstellen Sie die Datei `/opt/n8n/.env` mit den Zugangsdaten: ```bash # ────────────────────────────────────────────────────────── # /opt/n8n/.env — Umgebungsvariablen für n8n # ────────────────────────────────────────────────────────── N8N_BASIC_AUTH_USER=admin N8N_BASIC_AUTH_PASSWORD=IhrSicheresPasswortHier123! N8N_ENCRYPTION_KEY=ein-langer-zufaelliger-encryption-key-hier ``` > ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfiguration. Ersetzen Sie alle Platzhalter-Werte durch Ihre eigenen sicheren Passwörter und Schlüssel. Verwenden Sie `openssl rand -hex 32` zur Generierung sicherer Schlüssel. Erstellen Sie die Datei `/opt/n8n/docker-compose.yml`: ```yaml # ────────────────────────────────────────────────────────── # /opt/n8n/docker-compose.yml — n8n Workflow-Engine # ────────────────────────────────────────────────────────── version: "3.8" services: n8n: image: n8nio/n8n:latest container_name: n8n restart: unless-stopped ports: - "5678:5678" environment: - N8N_BASIC_AUTH_ACTIVE=true - N8N_BASIC_AUTH_USER=${N8N_BASIC_AUTH_USER} - N8N_BASIC_AUTH_PASSWORD=${N8N_BASIC_AUTH_PASSWORD} - N8N_ENCRYPTION_KEY=${N8N_ENCRYPTION_KEY} - GENERIC_TIMEZONE=Europe/Berlin - TZ=Europe/Berlin - N8N_LOG_LEVEL=info - N8N_LOG_OUTPUT=console,file - N8N_LOG_FILE_LOCATION=/home/node/.n8n/logs/n8n.log - N8N_DIAGNOSTICS_ENABLED=false - N8N_HIRING_BANNER_ENABLED=false volumes: # n8n-Daten persistent speichern - /opt/n8n/data:/home/node/.n8n # Zugriff auf ETL-Pipeline-Verzeichnis (Read-Only für Skripte) - /opt/etl-pipeline:/opt/etl-pipeline:ro # Zugriff auf ETL-Logs (Read-Write für Monitoring) - /opt/etl-pipeline/logs:/opt/etl-pipeline/logs:rw healthcheck: test: ["CMD-SHELL", "wget -qO- http://localhost:5678/healthz || exit 1"] interval: 30s timeout: 10s retries: 3 start_period: 30s deploy: resources: limits: memory: 1G cpus: "2.0" reservations: memory: 256M ``` > ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfiguration. Die Volume-Pfade, Ressourcen-Limits und Ports müssen an Ihre Server-Umgebung angepasst werden. n8n starten und Status prüfen: ```bash # ────────────────────────────────────────────────────────── # n8n Container starten # ────────────────────────────────────────────────────────── cd /opt/n8n docker compose up -d # Status prüfen docker compose ps docker compose logs -f --tail=50 ``` > ⚠️ **Hinweis:** Beispielbefehle. Nach dem Start ist n8n unter `http://:5678` erreichbar. ## 3.2 Python-Umgebung einrichten ### 3.2.1 Virtual Environment erstellen ```bash # ────────────────────────────────────────────────────────── # Projektverzeichnis anlegen # ────────────────────────────────────────────────────────── sudo mkdir -p /opt/etl-pipeline sudo chown -R $USER:$USER /opt/etl-pipeline cd /opt/etl-pipeline # ────────────────────────────────────────────────────────── # Python Virtual Environment erstellen und aktivieren # ────────────────────────────────────────────────────────── python3.11 -m venv venv source venv/bin/activate # ────────────────────────────────────────────────────────── # pip aktualisieren # ────────────────────────────────────────────────────────── pip install --upgrade pip setuptools wheel ``` > ⚠️ **Hinweis:** Dies sind Beispielbefehle. Stellen Sie sicher, dass `python3.11` auf Ihrem System korrekt installiert ist (siehe Abschnitt 2.2). ### 3.2.2 Abhängigkeiten installieren Erstellen Sie die Datei `/opt/etl-pipeline/requirements.txt`: ```txt # ────────────────────────────────────────────────────────── # /opt/etl-pipeline/requirements.txt # ETL-Pipeline Abhängigkeiten # ────────────────────────────────────────────────────────── # DataFrame-Engine für schnelle Transformationen polars>=0.20.0,<2.0.0 # Analytische In-Process SQL-Datenbank duckdb>=0.10.0,<2.0.0 # Google API Client Libraries google-api-python-client>=2.100.0 google-auth>=2.20.0 google-auth-oauthlib>=1.2.0 google-auth-httplib2>=0.2.0 # Konfiguration PyYAML>=6.0 # Utilities requests>=2.31.0 python-dateutil>=2.8.0 tenacity>=8.2.0 ``` > ⚠️ **Hinweis:** Dies ist eine Beispiel-Datei. Prüfen Sie die aktuellen Versionen der Pakete und passen Sie die Versionsbeschränkungen an Ihre Kompatibilitätsanforderungen an. Abhängigkeiten installieren: ```bash # ────────────────────────────────────────────────────────── # Abhängigkeiten in das Virtual Environment installieren # ────────────────────────────────────────────────────────── cd /opt/etl-pipeline source venv/bin/activate pip install -r requirements.txt # Installation verifizieren python -c "import polars; print(f'Polars: {polars.__version__}')" python -c "import duckdb; print(f'DuckDB: {duckdb.__version__}')" python -c "import googleapiclient; print('Google API Client: OK')" python -c "import yaml; print(f'PyYAML: {yaml.__version__}')" ``` > ⚠️ **Hinweis:** Beispielbefehle zur Verifikation. Die Ausgaben variieren je nach installierten Versionen. --- # 4. Projektstruktur ## 4.1 Empfohlene Verzeichnisstruktur Die folgende Struktur organisiert alle Komponenten der Pipeline in logisch getrennte Verzeichnisse. Diese Trennung erleichtert Wartung, Backups und Zugriffssteuerung. ``` /opt/etl-pipeline/ │ ├── venv/ # Python Virtual Environment │ └── ... # (von venv automatisch verwaltet) │ ├── requirements.txt # Python-Abhängigkeiten │ ├── scripts/ # Alle Python-Skripte │ ├── __init__.py │ ├── main.py # Hauptskript (Entry Point) │ ├── extract.py # Modul: Datenextraktion (CSV + Google API) │ ├── transform.py # Modul: DuckDB JOIN + Polars Transformationen │ ├── load.py # Modul: Google Content API Upload │ ├── rules_engine.py # Modul: YAML-Regel-Engine │ └── utils.py # Hilfsfunktionen (Logging, Config-Laden) │ ├── config/ # Konfigurationsdateien │ ├── rules.yaml # Channable-ähnliche Transformationsregeln │ ├── settings.yaml # Allgemeine Pipeline-Einstellungen │ └── column_mapping.yaml # Spalten-Mapping JTL → Google Shopping │ ├── credentials/ # Sensible Zugangsdaten (restriktive Rechte!) │ ├── google_service_account.json # Google Service Account Key │ └── .gitignore # NIEMALS in Git committen! │ ├── data/ # Datendateien │ ├── input/ # Eingangsdaten │ │ └── jtl_export.csv # Aktueller JTL-WAWI CSV-Export │ ├── output/ # Verarbeitete Ausgabedaten │ │ └── google_feed_YYYYMMDD.csv │ ├── archive/ # Archivierte ältere Exporte │ │ └── jtl_export_20260301.csv │ └── db/ # DuckDB-Datenbankdateien │ └── products.duckdb # Persistente DuckDB-Datenbank │ ├── logs/ # Log-Dateien │ ├── etl_pipeline.log # Haupt-Log (rotating) │ ├── etl_pipeline.log.1 # Rotiertes Log │ ├── error.log # Nur Fehler-Einträge │ └── upload_errors/ # Detaillierte Upload-Fehler pro Lauf │ └── errors_20260306.json │ ├── tests/ # Unit- und Integrationstests │ ├── __init__.py │ ├── test_extract.py │ ├── test_transform.py │ └── test_rules_engine.py │ └── docs/ # Zusätzliche Projektdokumentation └── CHANGELOG.md ``` > ⚠️ **Hinweis:** Dies ist eine Beispiel-Struktur. Passen Sie die Verzeichnisstruktur an die spezifischen Anforderungen Ihres Projekts und Ihrer Organisation an. ## 4.2 Verzeichnisstruktur anlegen ```bash # ────────────────────────────────────────────────────────── # Vollständige Verzeichnisstruktur anlegen # ────────────────────────────────────────────────────────── cd /opt/etl-pipeline mkdir -p scripts mkdir -p config mkdir -p credentials mkdir -p data/{input,output,archive,db} mkdir -p logs/upload_errors mkdir -p tests mkdir -p docs # ────────────────────────────────────────────────────────── # Berechtigungen für sensible Verzeichnisse setzen # ────────────────────────────────────────────────────────── chmod 700 credentials/ chmod 750 config/ chmod 755 logs/ # ────────────────────────────────────────────────────────── # .gitignore für credentials anlegen # ────────────────────────────────────────────────────────── echo "*" > credentials/.gitignore echo "!.gitignore" >> credentials/.gitignore # ────────────────────────────────────────────────────────── # Python-Modul-Initialisierung # ────────────────────────────────────────────────────────── touch scripts/__init__.py touch tests/__init__.py ``` > ⚠️ **Hinweis:** Beispielbefehle. Stellen Sie sicher, dass der Benutzer, unter dem die Pipeline läuft, die entsprechenden Lese- und Schreibrechte auf allen Verzeichnissen besitzt. --- # 5. Konfiguration ## 5.1 Google API Credentials sicher speichern ### 5.1.1 Service Account erstellen Für die Kommunikation mit der Google Content API for Shopping wird ein **Service Account** empfohlen. Dieser ermöglicht eine automatisierte Authentifizierung ohne manuellen OAuth2-Flow. **Schritte in der Google Cloud Console:** 1. Navigieren Sie zur [Google Cloud Console](https://console.cloud.google.com/). 2. Erstellen Sie ein neues Projekt oder wählen Sie ein bestehendes aus. 3. Aktivieren Sie die folgenden APIs: - *Content API for Shopping* - *Google Ads API* (falls Verkaufsdaten direkt aus Google Ads bezogen werden) - *Google Analytics Data API* (falls GA4-Daten verwendet werden) 4. Navigieren Sie zu **IAM & Verwaltung → Dienstkonten**. 5. Erstellen Sie einen neuen Service Account mit einem aussagekräftigen Namen. 6. Laden Sie den JSON-Key herunter. 7. Fügen Sie den Service Account als Nutzer in Ihrem **Google Merchant Center** hinzu (mit der Rolle „Standard" oder „Admin"). ### 5.1.2 Credentials sicher ablegen ```bash # ────────────────────────────────────────────────────────── # Service Account JSON-Key sicher auf dem Server ablegen # ────────────────────────────────────────────────────────── # Datei in das credentials-Verzeichnis kopieren cp /pfad/zur/heruntergeladenen/datei.json \ /opt/etl-pipeline/credentials/google_service_account.json # Restriktive Berechtigungen setzen (nur Owner lesen) chmod 600 /opt/etl-pipeline/credentials/google_service_account.json # Eigentümerschaft prüfen ls -la /opt/etl-pipeline/credentials/ ``` > ⚠️ **Hinweis:** Beispielbefehle. Ersetzen Sie den Pfad durch den tatsächlichen Speicherort Ihrer heruntergeladenen JSON-Datei. ### 5.1.3 Credentials als Umgebungsvariable referenzieren (empfohlen) Es ist empfehlenswert, den Pfad zur Credentials-Datei nicht hart im Skript zu kodieren, sondern über eine Umgebungsvariable bereitzustellen. ```bash # ────────────────────────────────────────────────────────── # In /opt/etl-pipeline/.env oder ~/.bashrc hinzufügen: # ────────────────────────────────────────────────────────── export GOOGLE_APPLICATION_CREDENTIALS="/opt/etl-pipeline/credentials/google_service_account.json" export MERCHANT_CENTER_ID="123456789" ``` > ⚠️ **Hinweis:** Beispielkonfiguration. Ersetzen Sie die `MERCHANT_CENTER_ID` durch Ihre tatsächliche Merchant-Center-ID. ### 5.1.4 Sicherheitshinweise - Die JSON-Datei darf **niemals** in ein Git-Repository committed werden. - Der Zugriff auf das `credentials/`-Verzeichnis sollte auf den Pipeline-Benutzer beschränkt sein (`chmod 700`). - Rotieren Sie Service Account Keys regelmäßig (mindestens alle 90 Tage). - Verwenden Sie nach Möglichkeit **Workload Identity Federation** anstelle von heruntergeladenen JSON-Keys. - Protokollieren Sie niemals den Inhalt der Credentials-Datei in Log-Dateien. ## 5.2 Regel-Konfiguration (rules.yaml) Die Datei `rules.yaml` ist das Herzstück der Channable-ähnlichen Funktionalität. Sie definiert regelbasierte Transformationen, die auf jeden Artikel angewandt werden. Die Regeln werden in der definierten Reihenfolge sequenziell abgearbeitet. ### 5.2.1 Struktur und Syntax Erstellen Sie die Datei `/opt/etl-pipeline/config/rules.yaml`: ```yaml # ══════════════════════════════════════════════════════════ # /opt/etl-pipeline/config/rules.yaml # # Channable-ähnliche Regel-Engine für Google Shopping # Regeln werden sequenziell von oben nach unten abgearbeitet. # # Unterstützte Operatoren: # Vergleich: >, <, >=, <=, ==, != # Text: contains, not_contains, starts_with, ends_with # Logik: and, or (innerhalb von conditions) # Existenz: is_empty, is_not_empty # # Unterstützte Aktionen: # set_value: Festes Wert setzen # copy_field: Wert aus anderem Feld kopieren # prepend: Text am Anfang hinzufügen # append: Text am Ende hinzufügen # replace: Text ersetzen # calculate: Berechnung durchführen # exclude: Artikel vom Feed ausschließen # map_category: Kategorie-Mapping anwenden # ══════════════════════════════════════════════════════════ # ────────────────────────────────────────────────────────── # Globale Einstellungen # ────────────────────────────────────────────────────────── settings: feed_name: "Google Shopping DE" target_country: "DE" content_language: "de" currency: "EUR" # Artikel mit Bestand 0 automatisch ausschließen exclude_out_of_stock: true # Mindestpreis für den Feed (in EUR) minimum_price: 1.00 # ────────────────────────────────────────────────────────── # Regel-Definitionen # ────────────────────────────────────────────────────────── rules: # ──────────────────────────────────────────────────────── # REGEL 1: Bestseller-Label basierend auf Verkaufszahlen # ──────────────────────────────────────────────────────── - name: "Bestseller-Klassifizierung" description: "Artikel mit mehr als 100 Verkäufen in 90 Tagen als Bestseller markieren" enabled: true priority: 10 conditions: logic: "and" rules: - field: "sales_quantity_90d" operator: ">" value: 100 - field: "stock_quantity" operator: ">" value: 0 actions: - target_field: "custom_label_0" action: "set_value" value: "Bestseller" # ──────────────────────────────────────────────────────── # REGEL 2: Slow Mover identifizieren # ──────────────────────────────────────────────────────── - name: "Slow-Mover-Klassifizierung" description: "Artikel mit weniger als 5 Verkäufen in 90 Tagen als Slow Mover markieren" enabled: true priority: 20 conditions: logic: "and" rules: - field: "sales_quantity_90d" operator: "<" value: 5 - field: "days_in_catalog" operator: ">" value: 30 actions: - target_field: "custom_label_0" action: "set_value" value: "Slow Mover" # ──────────────────────────────────────────────────────── # REGEL 3: Performance-Tier (custom_label_1) # ──────────────────────────────────────────────────────── - name: "Performance-Tier High" description: "Umsatzbasierte Tier-Einteilung für Bidding-Strategien" enabled: true priority: 30 conditions: logic: "and" rules: - field: "revenue_90d" operator: ">=" value: 1000 actions: - target_field: "custom_label_1" action: "set_value" value: "Tier-1-High-Revenue" - name: "Performance-Tier Mittel" enabled: true priority: 31 conditions: logic: "and" rules: - field: "revenue_90d" operator: ">=" value: 200 - field: "revenue_90d" operator: "<" value: 1000 actions: - target_field: "custom_label_1" action: "set_value" value: "Tier-2-Mid-Revenue" - name: "Performance-Tier Niedrig" enabled: true priority: 32 conditions: logic: "and" rules: - field: "revenue_90d" operator: "<" value: 200 actions: - target_field: "custom_label_1" action: "set_value" value: "Tier-3-Low-Revenue" # ──────────────────────────────────────────────────────── # REGEL 4: Margen-Klassifizierung (custom_label_2) # ──────────────────────────────────────────────────────── - name: "Hohe Marge" enabled: true priority: 40 conditions: logic: "and" rules: - field: "margin_percent" operator: ">=" value: 40 actions: - target_field: "custom_label_2" action: "set_value" value: "High-Margin" - name: "Niedrige Marge" enabled: true priority: 41 conditions: logic: "and" rules: - field: "margin_percent" operator: "<" value: 15 actions: - target_field: "custom_label_2" action: "set_value" value: "Low-Margin" # ──────────────────────────────────────────────────────── # REGEL 5: Preiskategorie (custom_label_3) # ──────────────────────────────────────────────────────── - name: "Premium-Preis" enabled: true priority: 50 conditions: logic: "and" rules: - field: "price" operator: ">=" value: 100 actions: - target_field: "custom_label_3" action: "set_value" value: "Premium" - name: "Budget-Preis" enabled: true priority: 51 conditions: logic: "and" rules: - field: "price" operator: "<" value: 20 actions: - target_field: "custom_label_3" action: "set_value" value: "Budget" # ──────────────────────────────────────────────────────── # REGEL 6: Titel-Optimierung # ──────────────────────────────────────────────────────── - name: "Marke im Titel voranstellen" description: "Markenname am Anfang des Titels hinzufügen, falls nicht vorhanden" enabled: true priority: 60 conditions: logic: "and" rules: - field: "brand" operator: "is_not_empty" - field: "title" operator: "not_contains" value_from_field: "brand" actions: - target_field: "title" action: "prepend" value_template: "{brand} - " # ──────────────────────────────────────────────────────── # REGEL 7: Sale-Preis-Logik # ──────────────────────────────────────────────────────── - name: "Sale-Preis setzen" description: "Wenn ein UVP vorhanden und höher als der VK-Preis ist, als Sale kennzeichnen" enabled: true priority: 70 conditions: logic: "and" rules: - field: "uvp" operator: "is_not_empty" - field: "uvp" operator: ">" value_from_field: "price" actions: - target_field: "sale_price" action: "copy_field" source_field: "price" - target_field: "price" action: "copy_field" source_field: "uvp" # ──────────────────────────────────────────────────────── # REGEL 8: Ausschluss-Regeln # ──────────────────────────────────────────────────────── - name: "Artikel ohne Bild ausschließen" enabled: true priority: 80 conditions: logic: "or" rules: - field: "image_url" operator: "is_empty" - field: "image_url" operator: "==" value: "" actions: - action: "exclude" reason: "Kein Produktbild vorhanden" - name: "Testprodukte ausschließen" enabled: true priority: 81 conditions: logic: "or" rules: - field: "title" operator: "contains" value: "TEST" - field: "sku" operator: "starts_with" value: "ZZ-" actions: - action: "exclude" reason: "Testprodukt erkannt" # ──────────────────────────────────────────────────────── # REGEL 9: Versandkosten basierend auf Preis # ──────────────────────────────────────────────────────── - name: "Kostenloser Versand ab 49 EUR" enabled: true priority: 90 conditions: logic: "and" rules: - field: "price" operator: ">=" value: 49 actions: - target_field: "shipping_cost" action: "set_value" value: "0.00" - name: "Standard-Versandkosten" enabled: true priority: 91 conditions: logic: "and" rules: - field: "price" operator: "<" value: 49 actions: - target_field: "shipping_cost" action: "set_value" value: "4.95" ``` > ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfigurationsdatei. Alle Schwellenwerte, Feldnamen und Regeln müssen an Ihre tatsächlichen Geschäftsanforderungen, JTL-Exportfelder und Google-Shopping-Attribute angepasst werden. ### 5.2.2 Spalten-Mapping (column_mapping.yaml) Erstellen Sie zusätzlich die Datei `/opt/etl-pipeline/config/column_mapping.yaml`: ```yaml # ══════════════════════════════════════════════════════════ # /opt/etl-pipeline/config/column_mapping.yaml # # Mapping: JTL-WAWI Exportfeld → Google Shopping Attribut # ══════════════════════════════════════════════════════════ mapping: # Pflichtfelder Google Shopping Artikelnummer: "offerId" Artikelname: "title" Beschreibung: "description" URL: "link" BildURL: "imageLink" VK_Brutto: "price" Waehrung: "priceCurrency" Verfuegbarkeit: "availability" Marke: "brand" GTIN: "gtin" MPN: "mpn" Zustand: "condition" Kategorie_Google: "googleProductCategory" # Optionale Felder Farbe: "color" Groesse: "size" Material: "material" Geschlecht: "gender" Altersgruppe: "ageGroup" EK_Netto: "cost_of_goods_sold" UVP: "uvp" Lagerbestand: "stock_quantity" # Berechnete Felder (werden durch Pipeline erzeugt) # sales_quantity_90d → aus Google API # revenue_90d → aus Google API # margin_percent → berechnet aus VK und EK # custom_label_0-4 → aus rules.yaml ``` > ⚠️ **Hinweis:** Dies ist eine Beispiel-Zuordnung. Die tatsächlichen Spaltennamen in Ihrem JTL-WAWI-Export können abweichen. Prüfen Sie die Header-Zeile Ihrer CSV-Datei und passen Sie das Mapping entsprechend an. --- # 6. Core Script Logic (Beispiel) Dieses Kapitel zeigt den konzeptionellen Aufbau der Python-Skripte. Der Code ist als **funktionales Boilerplate** zu verstehen, das als Grundlage für die eigene Implementierung dient. ## 6.1 Hilfsfunktionen (utils.py) ```python #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ /opt/etl-pipeline/scripts/utils.py Hilfsfunktionen für Logging, Konfiguration und gemeinsam genutzte Utilities. """ import os import sys import yaml import logging from pathlib import Path from datetime import datetime from logging.handlers import RotatingFileHandler # ────────────────────────────────────────────────────────── # Pfad-Konstanten # ────────────────────────────────────────────────────────── BASE_DIR = Path("/opt/etl-pipeline") CONFIG_DIR = BASE_DIR / "config" DATA_DIR = BASE_DIR / "data" LOG_DIR = BASE_DIR / "logs" CREDENTIALS_DIR = BASE_DIR / "credentials" def setup_logging( log_name: str = "etl_pipeline", log_level: int = logging.INFO, max_bytes: int = 10 * 1024 * 1024, # 10 MB backup_count: int = 5, ) -> logging.Logger: """ Konfiguriert das Logging mit Rotation und separatem Error-Log. Args: log_name: Name des Loggers und der Log-Datei. log_level: Logging-Level (default: INFO). max_bytes: Maximale Größe einer Log-Datei vor Rotation. backup_count: Anzahl der aufzubewahrenden rotierten Log-Dateien. Returns: Konfigurierter Logger. """ LOG_DIR.mkdir(parents=True, exist_ok=True) logger = logging.getLogger(log_name) logger.setLevel(log_level) # Verhindere doppelte Handler bei mehrfachem Aufruf if logger.handlers: return logger # Formatter formatter = logging.Formatter( fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) # Haupt-Log (Rotation) main_handler = RotatingFileHandler( LOG_DIR / f"{log_name}.log", maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8", ) main_handler.setLevel(log_level) main_handler.setFormatter(formatter) # Error-Log (nur ERROR und höher) error_handler = RotatingFileHandler( LOG_DIR / "error.log", maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8", ) error_handler.setLevel(logging.ERROR) error_handler.setFormatter(formatter) # Console-Handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(log_level) console_handler.setFormatter(formatter) logger.addHandler(main_handler) logger.addHandler(error_handler) logger.addHandler(console_handler) return logger def load_yaml_config(config_filename: str) -> dict: """ Lädt eine YAML-Konfigurationsdatei aus dem config-Verzeichnis. Args: config_filename: Name der YAML-Datei (z. B. 'rules.yaml'). Returns: Dictionary mit dem Inhalt der YAML-Datei. Raises: FileNotFoundError: Wenn die Datei nicht existiert. yaml.YAMLError: Wenn die YAML-Syntax ungültig ist. """ config_path = CONFIG_DIR / config_filename if not config_path.exists(): raise FileNotFoundError( f"Konfigurationsdatei nicht gefunden: {config_path}" ) with open(config_path, "r", encoding="utf-8") as f: config = yaml.safe_load(f) return config def get_timestamp() -> str: """Gibt einen formatierten Zeitstempel zurück (YYYYMMDD_HHMMSS).""" return datetime.now().strftime("%Y%m%d_%H%M%S") def get_date_stamp() -> str: """Gibt einen formatierten Datumsstempel zurück (YYYYMMDD).""" return datetime.now().strftime("%Y%m%d") ``` > ⚠️ **Hinweis:** Dies ist ein Beispiel-Skript. Alle Pfade, Log-Konfigurationen und Funktionen müssen an Ihre tatsächliche Serverumgebung und Anforderungen angepasst werden. ## 6.2 Extract-Modul (extract.py) ```python #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ /opt/etl-pipeline/scripts/extract.py Modul für die Datenextraktion aus JTL-WAWI CSV und Google APIs. Beide Datenquellen werden als Tabellen in DuckDB importiert. """ import os import shutil import duckdb import polars as pl from pathlib import Path from datetime import datetime, timedelta from google.oauth2 import service_account from googleapiclient.discovery import build from scripts.utils import ( setup_logging, BASE_DIR, DATA_DIR, CREDENTIALS_DIR, get_date_stamp, ) logger = setup_logging("extract") # ══════════════════════════════════════════════════════════ # JTL-WAWI CSV-Import # ══════════════════════════════════════════════════════════ def load_jtl_csv_to_duckdb( con: duckdb.DuckDBPyConnection, csv_path: str | Path | None = None, table_name: str = "products_raw", ) -> int: """ Liest die JTL-WAWI CSV-Exportdatei und importiert sie als Tabelle in die DuckDB-Datenbank. Args: con: Aktive DuckDB-Verbindung. csv_path: Pfad zur CSV-Datei. Falls None, wird der Standardpfad verwendet. table_name: Name der Zieltabelle in DuckDB. Returns: Anzahl der importierten Zeilen. """ if csv_path is None: csv_path = DATA_DIR / "input" / "jtl_export.csv" csv_path = Path(csv_path) if not csv_path.exists(): raise FileNotFoundError(f"JTL-CSV nicht gefunden: {csv_path}") file_size_mb = csv_path.stat().st_size / (1024 * 1024) logger.info(f"Lade JTL-CSV: {csv_path} ({file_size_mb:.1f} MB)") # Bestehende Tabelle löschen und neu erstellen con.execute(f"DROP TABLE IF EXISTS {table_name}") # CSV mit DuckDB's optimiertem CSV-Reader importieren # auto_detect=true erkennt Trennzeichen, Encoding und Datentypen con.execute(f""" CREATE TABLE {table_name} AS SELECT * FROM read_csv_auto( '{csv_path}', header = true, delim = ';', quote = '"', escape = '"', encoding = 'utf-8', sample_size = 10000, ignore_errors = false ) """) row_count = con.execute( f"SELECT COUNT(*) FROM {table_name}" ).fetchone()[0] logger.info( f"JTL-CSV erfolgreich geladen: " f"{row_count:,} Zeilen in Tabelle '{table_name}'" ) # Archivierung des Imports archive_path = DATA_DIR / "archive" / f"jtl_export_{get_date_stamp()}.csv" if not archive_path.exists(): shutil.copy2(csv_path, archive_path) logger.info(f"CSV archiviert: {archive_path}") return row_count # ══════════════════════════════════════════════════════════ # Google Sales-Daten abrufen # ══════════════════════════════════════════════════════════ def fetch_google_sales_data( con: duckdb.DuckDBPyConnection, table_name: str = "sales_90d", lookback_days: int = 90, ) -> int: """ Ruft Verkaufsdaten der letzten N Tage über die Google API ab und importiert sie als Tabelle in DuckDB. Args: con: Aktive DuckDB-Verbindung. table_name: Name der Zieltabelle in DuckDB. lookback_days: Anzahl der zurückliegenden Tage (Standard: 90). Returns: Anzahl der importierten Zeilen. """ logger.info( f"Rufe Google Sales-Daten ab (letzte {lookback_days} Tage)..." ) # ── Google API Authentifizierung ────────────────────── credentials_path = os.environ.get( "GOOGLE_APPLICATION_CREDENTIALS", str(CREDENTIALS_DIR / "google_service_account.json"), ) merchant_id = os.environ.get("MERCHANT_CENTER_ID") if not merchant_id: raise ValueError( "Umgebungsvariable MERCHANT_CENTER_ID ist nicht gesetzt" ) credentials = service_account.Credentials.from_service_account_file( credentials_path, scopes=["https://www.googleapis.com/auth/content"], ) service = build("content", "v2.1", credentials=credentials) # ── Zeitraum berechnen ──────────────────────────────── end_date = datetime.now() start_date = end_date - timedelta(days=lookback_days) # ── Performance-Daten aus dem Merchant Center abrufen ─ # HINWEIS: Der tatsächliche API-Aufruf hängt von der # verwendeten Google API ab (Merchant Center Reports, # Google Ads API, GA4 Data API, etc.) # Hier wird ein konzeptionelles Beispiel gezeigt. sales_records = [] try: # Beispiel: Merchant Center Performance Report request_body = { "query": f""" SELECT segments.offer_id, metrics.impressions, metrics.clicks, metrics.conversions, metrics.conversion_value FROM MerchantPerformanceView WHERE segments.date BETWEEN '{start_date.strftime('%Y-%m-%d')}' AND '{end_date.strftime('%Y-%m-%d')}' """ } response = ( service.reports() .search(merchantId=merchant_id, body=request_body) .execute() ) for row in response.get("results", []): segments = row.get("segments", {}) metrics = row.get("metrics", {}) sales_records.append({ "offer_id": segments.get("offerId", ""), "impressions": int(metrics.get("impressions", 0)), "clicks": int(metrics.get("clicks", 0)), "sales_quantity_90d": int( metrics.get("conversions", 0) ), "revenue_90d": float( metrics.get("conversionValue", 0.0) ), }) except Exception as e: logger.error( f"Fehler beim Abrufen der Google Sales-Daten: {e}" ) raise # ── Daten in DuckDB importieren ─────────────────────── if sales_records: df_sales = pl.DataFrame(sales_records) con.execute(f"DROP TABLE IF EXISTS {table_name}") con.execute( f"CREATE TABLE {table_name} AS SELECT * FROM df_sales" ) row_count = con.execute( f"SELECT COUNT(*) FROM {table_name}" ).fetchone()[0] logger.info( f"Google Sales-Daten geladen: " f"{row_count:,} Zeilen in '{table_name}'" ) return row_count else: logger.warning( "Keine Sales-Daten von der Google API erhalten." ) # Leere Tabelle erstellen, damit der JOIN nicht fehlschlägt con.execute(f"DROP TABLE IF EXISTS {table_name}") con.execute(f""" CREATE TABLE {table_name} ( offer_id VARCHAR, impressions INTEGER DEFAULT 0, clicks INTEGER DEFAULT 0, sales_quantity_90d INTEGER DEFAULT 0, revenue_90d DOUBLE DEFAULT 0.0 ) """) return 0 ``` > ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Google API-Aufrufe, insbesondere der Report-Query und die Antwortstruktur, müssen an die tatsächlich verwendete API-Version und Ihr Merchant-Center-Setup angepasst werden. Konsultieren Sie die offizielle Google API-Dokumentation für die aktuellen Endpunkte und Antwortformate. ## 6.3 Transform-Modul (transform.py) ```python #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ /opt/etl-pipeline/scripts/transform.py Modul für die Datentransformation: 1. LEFT JOIN in DuckDB (Produkte ⟕ Sales) 2. Polars-Transformationen basierend auf rules.yaml """ import duckdb import polars as pl from typing import Any from scripts.utils import setup_logging, load_yaml_config logger = setup_logging("transform") # ══════════════════════════════════════════════════════════ # DuckDB LEFT JOIN # ══════════════════════════════════════════════════════════ def join_products_with_sales( con: duckdb.DuckDBPyConnection, products_table: str = "products_raw", sales_table: str = "sales_90d", joined_table: str = "products_enriched", join_key_products: str = "Artikelnummer", join_key_sales: str = "offer_id", ) -> pl.DataFrame: """ Führt einen LEFT JOIN zwischen Produktstammdaten und Verkaufsdaten in DuckDB durch und gibt das Ergebnis als Polars DataFrame zurück. Ein LEFT JOIN stellt sicher, dass alle Produkte erhalten bleiben, auch wenn keine Verkaufsdaten vorliegen (NULL-Werte werden mit 0 gefüllt). Args: con: Aktive DuckDB-Verbindung. products_table: Name der Produkttabelle. sales_table: Name der Verkaufstabelle. joined_table: Name der Ergebnistabelle. join_key_products: JOIN-Schlüssel in der Produkttabelle. join_key_sales: JOIN-Schlüssel in der Verkaufstabelle. Returns: Polars DataFrame mit den verknüpften Daten. """ logger.info( f"Führe LEFT JOIN durch: {products_table} ⟕ {sales_table} " f"ON {join_key_products} = {join_key_sales}" ) # ── LEFT JOIN mit COALESCE für NULL-Behandlung ──────── con.execute(f"DROP TABLE IF EXISTS {joined_table}") con.execute(f""" CREATE TABLE {joined_table} AS SELECT p.*, COALESCE(s.impressions, 0) AS impressions, COALESCE(s.clicks, 0) AS clicks, COALESCE(s.sales_quantity_90d, 0) AS sales_quantity_90d, COALESCE(s.revenue_90d, 0.0) AS revenue_90d FROM {products_table} AS p LEFT JOIN {sales_table} AS s ON CAST(p."{join_key_products}" AS VARCHAR) = CAST(s.{join_key_sales} AS VARCHAR) """) row_count = con.execute( f"SELECT COUNT(*) FROM {joined_table}" ).fetchone()[0] logger.info( f"LEFT JOIN abgeschlossen: " f"{row_count:,} Zeilen in '{joined_table}'" ) # ── Ergebnis als Polars DataFrame exportieren ───────── result = con.execute(f"SELECT * FROM {joined_table}").pl() logger.info( f"Polars DataFrame erstellt: " f"{result.shape[0]:,} Zeilen, {result.shape[1]} Spalten" ) return result # ══════════════════════════════════════════════════════════ # Berechnete Felder hinzufügen # ══════════════════════════════════════════════════════════ def add_calculated_fields(df: pl.DataFrame) -> pl.DataFrame: """ Fügt berechnete Felder hinzu, die als Grundlage für die Regel-Engine dienen. Args: df: Polars DataFrame mit den verknüpften Rohdaten. Returns: Polars DataFrame mit zusätzlichen berechneten Spalten. """ logger.info("Füge berechnete Felder hinzu...") df = df.with_columns([ # Marge in Prozent (wenn EK vorhanden) pl.when( (pl.col("EK_Netto").is_not_null()) & (pl.col("VK_Brutto") > 0) ) .then( ( (pl.col("VK_Brutto") - pl.col("EK_Netto")) / pl.col("VK_Brutto") * 100 ).round(2) ) .otherwise(pl.lit(None)) .alias("margin_percent"), # Click-Through-Rate pl.when(pl.col("impressions") > 0) .then( (pl.col("clicks") / pl.col("impressions") * 100) .round(2) ) .otherwise(pl.lit(0.0)) .alias("ctr_percent"), # Conversion-Rate pl.when(pl.col("clicks") > 0) .then( ( pl.col("sales_quantity_90d") / pl.col("clicks") * 100 ).round(2) ) .otherwise(pl.lit(0.0)) .alias("conversion_rate_percent"), # Custom Label Spalten initialisieren (leer) pl.lit("").alias("custom_label_0"), pl.lit("").alias("custom_label_1"), pl.lit("").alias("custom_label_2"), pl.lit("").alias("custom_label_3"), pl.lit("").alias("custom_label_4"), # Versandkosten initialisieren pl.lit("4.95").alias("shipping_cost"), # Exclude-Flag initialisieren pl.lit(False).alias("_excluded"), pl.lit("").alias("_exclude_reason"), ]) logger.info( f"Berechnete Felder hinzugefügt. " f"DataFrame: {df.shape[1]} Spalten" ) return df # ══════════════════════════════════════════════════════════ # YAML Regel-Engine # ══════════════════════════════════════════════════════════ def evaluate_condition( df: pl.DataFrame, condition: dict, ) -> pl.Series: """ Evaluiert eine einzelne Bedingung und gibt eine boolesche Maske zurück. Args: df: Polars DataFrame. condition: Dictionary mit field, operator, value. Returns: Polars Series (Boolean) als Filtermaske. """ field = condition["field"] operator = condition["operator"] value = condition.get("value") value_from_field = condition.get("value_from_field") # Prüfen ob das Feld existiert if field not in df.columns: logger.warning( f"Feld '{field}' existiert nicht im DataFrame. " f"Bedingung übersprungen." ) return pl.Series("mask", [False] * df.height) col = pl.col(field) # Vergleichswert: fester Wert oder aus anderem Feld if value_from_field: compare_value = pl.col(value_from_field) else: compare_value = value # ── Operator-Mapping ────────────────────────────────── operator_map = { ">": col > compare_value, "<": col < compare_value, ">=": col >= compare_value, "<=": col <= compare_value, "==": col == compare_value, "!=": col != compare_value, "contains": col.cast(pl.Utf8).str.contains( str(compare_value), literal=True ), "not_contains": ~col.cast(pl.Utf8).str.contains( str(compare_value), literal=True ), "starts_with": col.cast(pl.Utf8).str.starts_with( str(compare_value) ), "ends_with": col.cast(pl.Utf8).str.ends_with( str(compare_value) ), "is_empty": ( col.is_null() | (col.cast(pl.Utf8) == "") ), "is_not_empty": ( col.is_not_null() & (col.cast(pl.Utf8) != "") ), } if operator not in operator_map: logger.error(f"Unbekannter Operator: '{operator}'") return pl.Series("mask", [False] * df.height) return df.select(operator_map[operator]).to_series() def apply_rules( df: pl.DataFrame, rules_config: dict, ) -> pl.DataFrame: """ Wendet alle aktivierten Regeln aus der rules.yaml auf den DataFrame an. Regeln werden nach Priorität sortiert und sequenziell abgearbeitet. Args: df: Polars DataFrame mit den angereicherten Produktdaten. rules_config: Geladene rules.yaml als Dictionary. Returns: Polars DataFrame nach Anwendung aller Regeln. """ rules = rules_config.get("rules", []) settings = rules_config.get("settings", {}) # Regeln nach Priorität sortieren # (niedrigere Zahl = höhere Priorität) rules_sorted = sorted( [r for r in rules if r.get("enabled", True)], key=lambda r: r.get("priority", 999), ) logger.info(f"Wende {len(rules_sorted)} aktive Regeln an...") # ── Globale Ausschlüsse (Settings) ──────────────────── if settings.get("exclude_out_of_stock", False): df = df.with_columns( pl.when(pl.col("stock_quantity") <= 0) .then(pl.lit(True)) .otherwise(pl.col("_excluded")) .alias("_excluded") ) excluded = df.filter(pl.col("_excluded")).height logger.info( f"Globaler Ausschluss (Bestand=0): " f"{excluded:,} Artikel markiert" ) min_price = settings.get("minimum_price", 0) if min_price > 0: df = df.with_columns( pl.when(pl.col("VK_Brutto") < min_price) .then(pl.lit(True)) .otherwise(pl.col("_excluded")) .alias("_excluded") ) # ── Regeln sequenziell anwenden ─────────────────────── for rule in rules_sorted: rule_name = rule.get("name", "Unbenannt") conditions_config = rule.get("conditions", {}) actions = rule.get("actions", []) logic = conditions_config.get("logic", "and") sub_rules = conditions_config.get("rules", []) logger.debug( f"Verarbeite Regel: '{rule_name}' " f"(Priorität: {rule.get('priority', '?')})" ) # Alle Teilbedingungen evaluieren masks = [ evaluate_condition(df, cond) for cond in sub_rules ] if not masks: logger.warning( f"Regel '{rule_name}' hat keine Bedingungen. " f"Übersprungen." ) continue # Logische Verknüpfung der Teilbedingungen if logic == "and": combined_mask = masks[0] for m in masks[1:]: combined_mask = combined_mask & m elif logic == "or": combined_mask = masks[0] for m in masks[1:]: combined_mask = combined_mask | m else: logger.error( f"Unbekannte Logik '{logic}' in Regel " f"'{rule_name}'. Übersprungen." ) continue matching_count = combined_mask.sum() logger.info( f"Regel '{rule_name}': " f"{matching_count:,} Artikel treffen zu" ) # Aktionen auf übereinstimmende Zeilen anwenden for action_def in actions: action_type = action_def.get("action", "set_value") target_field = action_def.get("target_field", "") if action_type == "set_value": value = action_def.get("value", "") df = df.with_columns( pl.when(combined_mask) .then(pl.lit(value)) .otherwise(pl.col(target_field)) .alias(target_field) ) elif action_type == "copy_field": source = action_def.get("source_field", "") if source in df.columns: df = df.with_columns( pl.when(combined_mask) .then(pl.col(source)) .otherwise(pl.col(target_field)) .alias(target_field) ) elif action_type == "prepend": template = action_def.get( "value_template", "" ) for col_name in df.columns: placeholder = f"{{{col_name}}}" if placeholder in template: suffix = template.replace( placeholder, "" ) df = df.with_columns( pl.when(combined_mask) .then( pl.col(col_name) .cast(pl.Utf8) + pl.lit(suffix) + pl.col(target_field) .cast(pl.Utf8) ) .otherwise(pl.col(target_field)) .alias(target_field) ) break elif action_type == "append": value = action_def.get("value", "") df = df.with_columns( pl.when(combined_mask) .then( pl.col(target_field).cast(pl.Utf8) + pl.lit(value) ) .otherwise(pl.col(target_field)) .alias(target_field) ) elif action_type == "exclude": reason = action_def.get( "reason", "Durch Regel ausgeschlossen" ) df = df.with_columns( pl.when(combined_mask) .then(pl.lit(True)) .otherwise(pl.col("_excluded")) .alias("_excluded"), pl.when(combined_mask) .then(pl.lit(reason)) .otherwise(pl.col("_exclude_reason")) .alias("_exclude_reason"), ) # ── Statistik ───────────────────────────────────────── total = df.height excluded_count = df.filter(pl.col("_excluded")).height active_count = total - excluded_count logger.info( f"Transformation abgeschlossen: {total:,} Gesamt | " f"{active_count:,} Aktiv | " f"{excluded_count:,} Ausgeschlossen" ) return df ``` > ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Regel-Engine ist bewusst einfach gehalten und muss für den Produktionseinsatz um Fehlerbehandlung, Validierung der Eingabedaten und weitere Operatoren erweitert werden. Die Feldnamen (`EK_Netto`, `VK_Brutto`, `stock_quantity` usw.) müssen mit den tatsächlichen Spalten Ihres JTL-Exports übereinstimmen. ## 6.4 Load-Modul (load.py) ```python #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ /opt/etl-pipeline/scripts/load.py Modul für den Batch-Upload transformierter Produktdaten an die Google Content API for Shopping. Implementiert Rate-Limiting und exponentielles Backoff. """ import os import json import time import polars as pl from pathlib import Path from datetime import datetime from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.errors import HttpError from tenacity import ( retry, stop_after_attempt, wait_exponential, retry_if_exception_type, ) from scripts.utils import ( setup_logging, load_yaml_config, CREDENTIALS_DIR, DATA_DIR, LOG_DIR, get_date_stamp, ) logger = setup_logging("load") # ══════════════════════════════════════════════════════════ # Konstanten # ══════════════════════════════════════════════════════════ BATCH_SIZE = 1000 # Produkte pro Batch-Request RATE_LIMIT_DELAY = 1.0 # Sekunden Pause zwischen Batches MAX_RETRIES = 5 # Maximale Wiederholungsversuche BACKOFF_MULTIPLIER = 2 # Exponentieller Backoff-Faktor # ══════════════════════════════════════════════════════════ # Google Content API Client # ══════════════════════════════════════════════════════════ def get_content_api_service(): """ Erstellt einen authentifizierten Google Content API Service. Returns: Google API Service-Objekt für Content API v2.1. """ credentials_path = os.environ.get( "GOOGLE_APPLICATION_CREDENTIALS", str(CREDENTIALS_DIR / "google_service_account.json"), ) credentials = service_account.Credentials.from_service_account_file( credentials_path, scopes=["https://www.googleapis.com/auth/content"], ) service = build("content", "v2.1", credentials=credentials) logger.info("Google Content API Service erstellt.") return service # ══════════════════════════════════════════════════════════ # Produkt-Payload-Erstellung # ══════════════════════════════════════════════════════════ def build_product_payload( row: dict, column_mapping: dict, settings: dict, ) -> dict: """ Erstellt den API-Payload für ein einzelnes Produkt nach dem Google Shopping Content API Schema. Args: row: Dictionary mit den Produktdaten einer Zeile. column_mapping: Spalten-Mapping (JTL → Google). settings: Globale Pipeline-Einstellungen. Returns: Dictionary im Google Content API Produktformat. """ target_country = settings.get("target_country", "DE") content_language = settings.get("content_language", "de") currency = settings.get("currency", "EUR") mapping = column_mapping.get("mapping", {}) product = { "offerId": str(row.get( mapping.get("Artikelnummer", "Artikelnummer"), "" )), "title": str(row.get("title", row.get( mapping.get("Artikelname", ""), "" ))), "description": str(row.get( mapping.get("Beschreibung", "Beschreibung"), "" )), "link": str(row.get( mapping.get("URL", "URL"), "" )), "imageLink": str(row.get( mapping.get("BildURL", "BildURL"), "" )), "contentLanguage": content_language, "targetCountry": target_country, "channel": "online", "availability": ( "in stock" if row.get("stock_quantity", 0) > 0 else "out of stock" ), "condition": "new", "price": { "value": str(row.get("VK_Brutto", "0.00")), "currency": currency, }, } # Optionale Felder hinzufügen brand = row.get("Marke", row.get("brand", "")) if brand: product["brand"] = str(brand) gtin = row.get("GTIN", row.get("gtin", "")) if gtin: product["gtin"] = str(gtin) # Custom Labels (0-4) for i in range(5): label_value = row.get(f"custom_label_{i}", "") if label_value: product[f"customLabel{i}"] = str(label_value) # Sale-Preis sale_price = row.get("sale_price", "") if sale_price and float(sale_price) > 0: product["salePrice"] = { "value": str(sale_price), "currency": currency, } # Versandkosten shipping_cost = row.get("shipping_cost", "") if shipping_cost is not None and shipping_cost != "": product["shipping"] = [{ "country": target_country, "price": { "value": str(shipping_cost), "currency": currency, }, }] return product # ══════════════════════════════════════════════════════════ # Batch-Upload mit Rate Limiting # ══════════════════════════════════════════════════════════ @retry( stop=stop_after_attempt(MAX_RETRIES), wait=wait_exponential( multiplier=BACKOFF_MULTIPLIER, min=2, max=60 ), retry=retry_if_exception_type( (HttpError, ConnectionError, TimeoutError) ), before_sleep=lambda retry_state: logger.warning( f"Retry {retry_state.attempt_number}/{MAX_RETRIES} " f"nach Fehler. " f"Warte {retry_state.next_action.sleep:.1f}s..." ), ) def execute_batch_request( service, merchant_id: str, batch_entries: list[dict], ) -> dict: """ Führt einen einzelnen Batch-Request an die Content API aus. Implementiert exponentielles Backoff bei Rate-Limit-Fehlern. Args: service: Google Content API Service. merchant_id: Merchant Center ID. batch_entries: Liste der Batch-Einträge. Returns: API-Antwort als Dictionary. """ body = {"entries": batch_entries} response = ( service.products() .custombatch(body=body) .execute() ) return response def upload_products_to_google( df: pl.DataFrame, batch_size: int = BATCH_SIZE, rate_limit_delay: float = RATE_LIMIT_DELAY, ) -> dict: """ Lädt alle aktiven (nicht ausgeschlossenen) Produkte als Batch-Requests an die Google Content API hoch. Args: df: Polars DataFrame mit transformierten Produktdaten. batch_size: Anzahl der Produkte pro Batch-Request. rate_limit_delay: Pause in Sekunden zwischen Batches. Returns: Dictionary mit Upload-Statistiken. """ merchant_id = os.environ.get("MERCHANT_CENTER_ID") if not merchant_id: raise ValueError("MERCHANT_CENTER_ID ist nicht gesetzt") # Konfiguration laden column_mapping = load_yaml_config("column_mapping.yaml") rules_config = load_yaml_config("rules.yaml") settings = rules_config.get("settings", {}) # Nur aktive (nicht ausgeschlossene) Produkte uploaden df_active = df.filter(~pl.col("_excluded")) total_products = df_active.height logger.info( f"Starte Upload: {total_products:,} aktive Produkte " f"in Batches von {batch_size}" ) # Service erstellen service = get_content_api_service() # ── Statistiken ─────────────────────────────────────── stats = { "total": total_products, "success": 0, "errors": 0, "batches": 0, "start_time": datetime.now().isoformat(), "error_details": [], } # ── Produkte in Batches aufteilen und hochladen ─────── rows_as_dicts = df_active.to_dicts() total_batches = ( (total_products + batch_size - 1) // batch_size ) for batch_start in range(0, total_products, batch_size): batch_end = min(batch_start + batch_size, total_products) batch_rows = rows_as_dicts[batch_start:batch_end] batch_number = (batch_start // batch_size) + 1 logger.info( f"Batch {batch_number}/{total_batches}: " f"Produkte {batch_start + 1:,} – {batch_end:,}" ) # Batch-Einträge erstellen batch_entries = [] for idx, row in enumerate(batch_rows): product_payload = build_product_payload( row, column_mapping, settings ) batch_entries.append({ "batchId": batch_start + idx, "merchantId": merchant_id, "method": "insert", "product": product_payload, }) # Batch-Request ausführen (mit Retry/Backoff) try: response = execute_batch_request( service, merchant_id, batch_entries ) # Antwort auswerten for entry in response.get("entries", []): if ( "errors" in entry and entry["errors"].get("errors") ): stats["errors"] += 1 batch_idx = entry.get("batchId", 0) local_idx = batch_idx - batch_start offer_id = "unbekannt" if 0 <= local_idx < len(batch_rows): offer_id = batch_rows[local_idx].get( "Artikelnummer", "unbekannt" ) error_info = { "batchId": batch_idx, "offerId": offer_id, "errors": entry["errors"]["errors"], } stats["error_details"].append(error_info) logger.warning( f"Fehler bei Produkt {offer_id}: " f"{entry['errors']['errors']}" ) else: stats["success"] += 1 except HttpError as e: if e.resp.status == 429: logger.warning( f"Rate Limit erreicht bei Batch " f"{batch_number}. Erhöhe Wartezeit..." ) time.sleep(rate_limit_delay * 5) else: logger.error( f"HTTP-Fehler bei Batch " f"{batch_number}: {e}" ) stats["errors"] += len(batch_rows) except Exception as e: logger.error( f"Unerwarteter Fehler bei Batch " f"{batch_number}: {e}" ) stats["errors"] += len(batch_rows) stats["batches"] += 1 # ── Rate Limiting: Pause zwischen Batches ───────── if batch_end < total_products: logger.debug( f"Rate Limiting: {rate_limit_delay}s Pause..." ) time.sleep(rate_limit_delay) # ── Upload-Statistiken loggen ───────────────────────── stats["end_time"] = datetime.now().isoformat() stats["duration_seconds"] = ( datetime.fromisoformat(stats["end_time"]) - datetime.fromisoformat(stats["start_time"]) ).total_seconds() logger.info( f"Upload abgeschlossen: " f"{stats['success']:,} erfolgreich | " f"{stats['errors']:,} Fehler | " f"{stats['batches']} Batches | " f"{stats['duration_seconds']:.1f}s Gesamtdauer" ) # ── Fehlerdetails als JSON speichern ────────────────── if stats["error_details"]: error_dir = LOG_DIR / "upload_errors" error_dir.mkdir(parents=True, exist_ok=True) error_file = ( error_dir / f"errors_{get_date_stamp()}.json" ) with open(error_file, "w", encoding="utf-8") as f: json.dump( stats["error_details"], f, indent=2, ensure_ascii=False, ) logger.info(f"Fehlerdetails gespeichert: {error_file}") return stats ``` > ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Batch-Größe, Rate-Limiting-Werte und API-Payloads müssen an Ihre spezifischen API-Quotas und Produktdatenstrukturen angepasst werden. Testen Sie den Upload zunächst mit einer kleinen Teilmenge (z. B. 10 Produkte) im Testmodus der Google Content API, bevor Sie den vollständigen Datensatz hochladen. ## 6.5 Hauptskript (main.py) ```python #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ /opt/etl-pipeline/scripts/main.py Hauptskript (Entry Point) für die ETL-Pipeline. Orchestriert den gesamten Ablauf: Extract → Transform → Load. Aufruf: cd /opt/etl-pipeline source venv/bin/activate python -m scripts.main """ import sys import time import duckdb from pathlib import Path from scripts.utils import ( setup_logging, load_yaml_config, BASE_DIR, DATA_DIR, get_timestamp, ) from scripts.extract import ( load_jtl_csv_to_duckdb, fetch_google_sales_data, ) from scripts.transform import ( join_products_with_sales, add_calculated_fields, apply_rules, ) from scripts.load import upload_products_to_google logger = setup_logging("main") def run_pipeline() -> dict: """ Führt die vollständige ETL-Pipeline aus. Returns: Dictionary mit Statistiken aller drei Phasen. Raises: SystemExit: Bei kritischen Fehlern mit Exit-Code 1. """ pipeline_start = time.time() run_id = get_timestamp() logger.info(f"{'═' * 60}") logger.info(f"ETL-Pipeline gestartet — Run-ID: {run_id}") logger.info(f"{'═' * 60}") stats = { "run_id": run_id, "extract": {}, "transform": {}, "load": {}, } # ══════════════════════════════════════════════════════ # DuckDB-Verbindung herstellen # ══════════════════════════════════════════════════════ db_path = DATA_DIR / "db" / "products.duckdb" db_path.parent.mkdir(parents=True, exist_ok=True) con = duckdb.connect(str(db_path)) logger.info(f"DuckDB-Verbindung hergestellt: {db_path}") try: # ══════════════════════════════════════════════════ # PHASE 1: EXTRACT # ══════════════════════════════════════════════════ logger.info(f"{'─' * 40}") logger.info("PHASE 1: EXTRACT") logger.info(f"{'─' * 40}") # JTL CSV laden jtl_rows = load_jtl_csv_to_duckdb(con) stats["extract"]["jtl_rows"] = jtl_rows # Google Sales-Daten abrufen sales_rows = fetch_google_sales_data(con) stats["extract"]["sales_rows"] = sales_rows # ══════════════════════════════════════════════════ # PHASE 2: TRANSFORM # ══════════════════════════════════════════════════ logger.info(f"{'─' * 40}") logger.info("PHASE 2: TRANSFORM") logger.info(f"{'─' * 40}") # LEFT JOIN: Produkte ⟕ Sales df = join_products_with_sales(con) # Berechnete Felder hinzufügen df = add_calculated_fields(df) # YAML-Regeln anwenden rules_config = load_yaml_config("rules.yaml") df = apply_rules(df, rules_config) stats["transform"]["total_products"] = df.height stats["transform"]["active_products"] = df.filter( ~df["_excluded"] ).height stats["transform"]["excluded_products"] = df.filter( df["_excluded"] ).height # ══════════════════════════════════════════════════ # PHASE 3: LOAD # ══════════════════════════════════════════════════ logger.info(f"{'─' * 40}") logger.info("PHASE 3: LOAD") logger.info(f"{'─' * 40}") upload_stats = upload_products_to_google(df) stats["load"] = upload_stats except Exception as e: logger.critical( f"Pipeline fehlgeschlagen: {e}", exc_info=True ) # Exit-Code 1 signalisiert n8n einen Fehler sys.exit(1) finally: con.close() logger.info("DuckDB-Verbindung geschlossen.") # ══════════════════════════════════════════════════════ # Zusammenfassung # ══════════════════════════════════════════════════════ pipeline_duration = time.time() - pipeline_start stats["duration_seconds"] = round(pipeline_duration, 2) logger.info(f"{'═' * 60}") logger.info( f"ETL-Pipeline abgeschlossen — " f"Dauer: {pipeline_duration:.1f}s" ) logger.info( f" Extract: " f"{stats['extract'].get('jtl_rows', 0):,} JTL, " f"{stats['extract'].get('sales_rows', 0):,} Sales" ) logger.info( f" Transform: " f"{stats['transform'].get('active_products', 0):,} aktiv, " f"{stats['transform'].get('excluded_products', 0):,} " f"ausgeschlossen" ) logger.info( f" Load: " f"{stats['load'].get('success', 0):,} OK, " f"{stats['load'].get('errors', 0):,} Fehler" ) logger.info(f"{'═' * 60}") return stats # ────────────────────────────────────────────────────────── # Entry Point # ────────────────────────────────────────────────────────── if __name__ == "__main__": run_pipeline() ``` > ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Es dient als Ausgangspunkt und Entry Point für die Pipeline. Vor dem produktiven Einsatz sollten alle Module gründlich mit Testdaten validiert und die Fehlerbehandlung an Ihre Anforderungen angepasst werden. --- # 7. n8n Workflow Integration ## 7.1 Übersicht Der n8n Workflow übernimmt zwei zentrale Aufgaben: 1. **Scheduling:** Tägliche Ausführung der Python-Pipeline um **00:00 Uhr** (Mitternacht). 2. **Error Handling:** Benachrichtigung per E-Mail (oder Slack/Telegram) bei Fehlern. Die Kommunikation zwischen n8n (Docker-Container) und der Python-Pipeline auf dem Host-System erfolgt über den n8n-Node **Execute Command**, der Shell-Befehle im Container ausführt. Da das Pipeline-Verzeichnis als Volume gemountet ist, hat n8n Zugriff auf die Skripte. ## 7.2 Workflow-Architektur ```mermaid flowchart TD CRON["⏰ Cron Trigger
täglich 00:00 Uhr"] EXEC["⚡ Execute Command
/opt/etl-pipeline/venv/bin/python
-m scripts.main
"] CHECK{"🔍 Exit-Code
prüfen
"} SUCCESS["✅ Erfolgs-Meldung
Log + optionale E-Mail"] ERROR["❌ Fehler-Alarm
E-Mail / Slack
Benachrichtigung
"] CRON --> EXEC EXEC --> CHECK CHECK -->|"Exit = 0 — OK"| SUCCESS CHECK -->|"Exit = 1 — Fehler"| ERROR style CRON fill:#e58e26,stroke:#fa983a,color:#fff style EXEC fill:#0c2461,stroke:#1e3799,color:#fff style CHECK fill:#474787,stroke:#706fd3,color:#fff style SUCCESS fill:#079992,stroke:#38ada9,color:#fff style ERROR fill:#b71540,stroke:#e55039,color:#fff ``` ## 7.3 Workflow als n8n-JSON importieren Der folgende JSON-Export kann direkt in n8n importiert werden unter **Workflow → Import from File**. Speichern Sie den Inhalt als `.json`-Datei. ```json { "name": "ETL Pipeline - Google Shopping (Daily)", "nodes": [ { "parameters": { "rule": { "interval": [ { "triggerAtHour": 0, "triggerAtMinute": 0 } ] } }, "name": "Cron Trigger (00:00)", "type": "n8n-nodes-base.scheduleTrigger", "typeVersion": 1.2, "position": [240, 300] }, { "parameters": { "command": "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main 2>&1", "timeout": 3600 }, "name": "Execute ETL Pipeline", "type": "n8n-nodes-base.executeCommand", "typeVersion": 1, "position": [480, 300] }, { "parameters": { "conditions": { "number": [ { "value1": "={{ $json.exitCode }}", "operation": "equal", "value2": 0 } ] } }, "name": "Check Exit Code", "type": "n8n-nodes-base.if", "typeVersion": 1, "position": [720, 300] }, { "parameters": { "fromEmail": "etl-pipeline@ihre-domain.de", "toEmail": "admin@ihre-domain.de", "subject": "✅ ETL Pipeline — Erfolg ({{ $now.format('yyyy-MM-dd') }})", "text": "Die ETL-Pipeline wurde erfolgreich ausgeführt.\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\n\nStdout:\n{{ $json.stdout }}" }, "name": "Success E-Mail", "type": "n8n-nodes-base.emailSend", "typeVersion": 2, "position": [960, 200] }, { "parameters": { "fromEmail": "etl-pipeline@ihre-domain.de", "toEmail": "admin@ihre-domain.de", "subject": "❌ ETL Pipeline — FEHLER ({{ $now.format('yyyy-MM-dd') }})", "text": "ACHTUNG: Die ETL-Pipeline ist fehlgeschlagen!\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\nExit-Code: {{ $json.exitCode }}\n\nStderr:\n{{ $json.stderr }}\n\nStdout:\n{{ $json.stdout }}\n\nBitte prüfen: /opt/etl-pipeline/logs/error.log" }, "name": "Error E-Mail", "type": "n8n-nodes-base.emailSend", "typeVersion": 2, "position": [960, 400] } ], "connections": { "Cron Trigger (00:00)": { "main": [ [{ "node": "Execute ETL Pipeline", "type": "main", "index": 0 }] ] }, "Execute ETL Pipeline": { "main": [ [{ "node": "Check Exit Code", "type": "main", "index": 0 }] ] }, "Check Exit Code": { "main": [ [{ "node": "Success E-Mail", "type": "main", "index": 0 }], [{ "node": "Error E-Mail", "type": "main", "index": 0 }] ] } }, "settings": { "timezone": "Europe/Berlin", "saveExecutionProgress": true, "saveManualExecutions": true, "executionTimeout": 7200 } } ``` > ⚠️ **Hinweis:** Dies ist ein Beispiel-Workflow für n8n. Die E-Mail-Adressen, SMTP-Einstellungen und Timeouts müssen an Ihre Umgebung angepasst werden. Konfigurieren Sie die SMTP-Credentials in n8n unter **Settings → Credentials** bevor Sie den Workflow aktivieren. ## 7.4 Wichtige n8n-Konfigurationshinweise ### Timeout-Einstellung Da die Verarbeitung von 200.000+ Artikeln je nach Serverhardware zwischen **5 und 30 Minuten** dauern kann, muss der Timeout im `Execute Command`-Node ausreichend hoch gesetzt werden: ``` timeout: 3600 # 1 Stunde (in Sekunden) ``` ### Volume-Mount beachten Der `Execute Command`-Node führt Befehle **innerhalb des n8n Docker-Containers** aus. Damit die Python-Skripte und das Virtual Environment erreichbar sind, muss der Pfad `/opt/etl-pipeline` als Volume im `docker-compose.yml` gemountet sein (siehe Abschnitt 3.1.2). ### Alternative: Host-Befehl über SSH Falls die Pipeline nicht innerhalb des Containers laufen soll, kann alternativ ein SSH-Node verwendet werden, der den Befehl auf dem Host ausführt: ```bash # Befehl für n8n SSH-Node: ssh pipeline-user@localhost \ "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main" ``` > ⚠️ **Hinweis:** Beispielbefehl. Für die SSH-Variante muss ein SSH-Key ohne Passphrase konfiguriert und die SSH-Credentials in n8n hinterlegt werden. ## 7.5 Manueller Pipeline-Start (Debugging) Für Debugging und Tests kann die Pipeline jederzeit manuell gestartet werden: ```bash # ────────────────────────────────────────────────────────── # Pipeline manuell ausführen # ────────────────────────────────────────────────────────── cd /opt/etl-pipeline source venv/bin/activate python -m scripts.main # Alternativ: Nur den Exit-Code prüfen python -m scripts.main; echo "Exit-Code: $?" ``` > ⚠️ **Hinweis:** Beispielbefehle für den manuellen Start. Stellen Sie sicher, dass die Umgebungsvariablen (`GOOGLE_APPLICATION_CREDENTIALS`, `MERCHANT_CENTER_ID`) in der Shell-Session gesetzt sind. --- # 8. Wartung & Monitoring ## 8.1 Log-Management ### 8.1.1 Log-Struktur Die Pipeline erzeugt folgende Log-Dateien: | Datei | Inhalt | Rotation | |----------------------------------------------|--------------------------------------|----------------------| | `logs/etl_pipeline.log` | Alle Log-Einträge (INFO+) | 10 MB, 5 Backups | | `logs/error.log` | Nur Fehler (ERROR+) | 10 MB, 5 Backups | | `logs/upload_errors/errors_YYYYMMDD.json` | Detail-Fehler pro Upload-Lauf | Täglich, eine Datei | ### 8.1.2 Logrotate-Konfiguration Zusätzlich zur integrierten Python-Rotation empfiehlt sich eine systemweite Logrotate-Konfiguration als Sicherheitsnetz. Erstellen Sie die Datei `/etc/logrotate.d/etl-pipeline`: ``` /opt/etl-pipeline/logs/*.log { daily missingok rotate 30 compress delaycompress notifempty create 0640 root root sharedscripts postrotate # Optional: Signal an laufende Prozesse /bin/true endscript } /opt/etl-pipeline/logs/upload_errors/*.json { daily missingok rotate 90 compress delaycompress notifempty create 0640 root root } ``` > ⚠️ **Hinweis:** Dies ist eine Beispiel-Logrotate-Konfiguration. Passen Sie den Benutzer, die Aufbewahrungsdauer (`rotate`) und die Dateiberechtigungen an Ihre Sicherheitsrichtlinien an. ### 8.1.3 Log-Bereinigung (Cron) Alte Log-Dateien und Upload-Fehlerprotokolle können zusätzlich per Cron bereinigt werden: ```bash # ────────────────────────────────────────────────────────── # Crontab-Eintrag: Logs älter als 90 Tage löschen # ────────────────────────────────────────────────────────── # crontab -e 0 3 * * 0 find /opt/etl-pipeline/logs/ -name "*.log.*" -mtime +90 -delete 0 3 * * 0 find /opt/etl-pipeline/logs/upload_errors/ -name "*.json" -mtime +90 -delete 0 3 * * 0 find /opt/etl-pipeline/data/archive/ -name "*.csv" -mtime +180 -delete ``` > ⚠️ **Hinweis:** Beispiel-Crontab-Einträge. Passen Sie die Aufbewahrungsfristen (hier: 90 bzw. 180 Tage) an Ihre Compliance-Anforderungen an. ## 8.2 Performance-Tipps für 200.000+ Artikel ### 8.2.1 DuckDB-Optimierungen ```python # ────────────────────────────────────────────────────────── # DuckDB Performance-Konfiguration # (am Anfang der Pipeline setzen) # ────────────────────────────────────────────────────────── con = duckdb.connect("products.duckdb") # Verfügbaren Arbeitsspeicher für DuckDB erhöhen con.execute("SET memory_limit = '4GB'") # Alle verfügbaren CPU-Kerne nutzen con.execute("SET threads TO 8") # Temporäre Dateien auf schnelle SSD legen con.execute("SET temp_directory = '/opt/etl-pipeline/data/tmp'") # Fortschrittsanzeige für lange Queries con.execute("SET enable_progress_bar = true") ``` > ⚠️ **Hinweis:** Beispiel-Konfiguration. Die Werte für `memory_limit` und `threads` müssen an die tatsächliche Serverausstattung angepasst werden. ### 8.2.2 Polars-Optimierungen ```python # ────────────────────────────────────────────────────────── # Polars Performance-Tipps # ────────────────────────────────────────────────────────── import polars as pl # 1. Lazy Evaluation verwenden (bei sehr großen Datensätzen) # Polars optimiert den Query-Plan automatisch df_lazy = pl.scan_csv( "data/input/jtl_export.csv", separator=";" ) df_result = ( df_lazy .filter(pl.col("Lagerbestand") > 0) .with_columns([ pl.col("VK_Brutto").cast(pl.Float64), ]) .collect() # Erst hier werden die Daten materialisiert ) # 2. Streaming-Modus für Datensätze > verfügbares RAM df_result = ( df_lazy .filter(pl.col("Lagerbestand") > 0) .collect(streaming=True) # Verarbeitet in Chunks ) # 3. Datentypen optimieren (reduziert RAM-Verbrauch) df = df.with_columns([ pl.col("Lagerbestand").cast(pl.Int32), # statt Int64 pl.col("clicks").cast(pl.Int32), # statt Int64 pl.col("impressions").cast(pl.Int32), # statt Int64 pl.col("custom_label_0").cast(pl.Categorical), # statt Utf8 pl.col("custom_label_1").cast(pl.Categorical), # statt Utf8 ]) # 4. Spaltenauswahl: Nur benötigte Spalten laden df = pl.read_csv( "data.csv", columns=["sku", "title", "price", "stock"], ) ``` > ⚠️ **Hinweis:** Dies sind Beispiel-Codeausschnitte zur Performance-Optimierung. Die tatsächlichen Spaltennamen und Datentypen müssen an Ihren Datensatz angepasst werden. ### 8.2.3 Google API Upload-Optimierungen | Parameter | Empfehlung | Begründung | |--------------------------|--------------------------|--------------------------------------------------| | `BATCH_SIZE` | 1.000 | Google empfiehlt max. 1.000 Einträge pro Batch | | `RATE_LIMIT_DELAY` | 1,0 – 2,0 Sekunden | Verhindert 429-Fehler bei Standard-Quotas | | `MAX_RETRIES` | 5 | Ausreichend für temporäre Fehler | | Parallele Requests | Nicht empfohlen | Erhöht 429-Risiko, sequenziell ist sicherer | | Geschätzte Upload-Dauer | 10 – 20 Minuten | Bei 200 Batches × 1s Delay + API-Latenz | ### 8.2.4 Allgemeine Server-Monitoring-Befehle ```bash # ────────────────────────────────────────────────────────── # System-Monitoring während Pipeline-Lauf # ────────────────────────────────────────────────────────── # RAM- und CPU-Auslastung in Echtzeit beobachten htop # Festplatten-I/O überwachen iostat -x 5 # Pipeline-Prozess gezielt überwachen watch -n 2 "ps aux | grep 'python.*scripts.main' | grep -v grep" # Festplattennutzung des Pipeline-Verzeichnisses du -sh /opt/etl-pipeline/data/* # DuckDB-Dateigröße prüfen ls -lh /opt/etl-pipeline/data/db/products.duckdb ``` > ⚠️ **Hinweis:** Beispielbefehle für das System-Monitoring. Diese dienen zur Orientierung und sollten je nach Ihren Monitoring-Tools (z. B. Prometheus, Grafana, Zabbix) ergänzt oder ersetzt werden. ## 8.3 Backup-Empfehlungen Die folgenden Dateien und Verzeichnisse sollten regelmäßig gesichert werden: | Pfad | Priorität | Frequenz | |------------------------------------------------|-----------|----------------| | `/opt/etl-pipeline/config/` | Hoch | Bei Änderung | | `/opt/etl-pipeline/credentials/` | Kritisch | Bei Änderung | | `/opt/etl-pipeline/scripts/` | Hoch | Bei Änderung | | `/opt/etl-pipeline/data/db/products.duckdb` | Mittel | Täglich | | `/opt/n8n/data/` | Hoch | Wöchentlich | ```bash #!/bin/bash # ────────────────────────────────────────────────────────── # Einfaches Backup-Skript (Beispiel) # ────────────────────────────────────────────────────────── BACKUP_DIR="/backup/etl-pipeline/$(date +%Y%m%d)" mkdir -p "$BACKUP_DIR" # Konfiguration und Skripte sichern tar czf "$BACKUP_DIR/config_scripts.tar.gz" \ /opt/etl-pipeline/config/ \ /opt/etl-pipeline/scripts/ \ /opt/etl-pipeline/requirements.txt # DuckDB sichern cp /opt/etl-pipeline/data/db/products.duckdb \ "$BACKUP_DIR/products.duckdb" # n8n Workflows sichern tar czf "$BACKUP_DIR/n8n_data.tar.gz" /opt/n8n/data/ echo "Backup erstellt: $BACKUP_DIR" ``` > ⚠️ **Hinweis:** Dies ist ein vereinfachtes Beispiel-Backup-Skript. Für den Produktionseinsatz sollte eine vollwertige Backup-Lösung (z. B. BorgBackup, Restic) mit Verschlüsselung und Off-Site-Speicherung verwendet werden. ## 8.4 Checkliste für den Produktivbetrieb Vor der Aktivierung der automatisierten Pipeline sollten folgende Punkte abgearbeitet sein: - [ ] Python Virtual Environment erstellt und alle Abhängigkeiten installiert - [ ] DuckDB-Datenbank initialisiert und CSV-Import getestet - [ ] Google Service Account erstellt und in Merchant Center eingeladen - [ ] `GOOGLE_APPLICATION_CREDENTIALS` und `MERCHANT_CENTER_ID` als Umgebungsvariablen gesetzt - [ ] `rules.yaml` mit initialen Regeln konfiguriert - [ ] `column_mapping.yaml` an JTL-Export-Spalten angepasst - [ ] Pipeline manuell mit Testdaten durchgelaufen (alle 3 Phasen) - [ ] n8n Workflow importiert und Cron-Trigger konfiguriert - [ ] E-Mail-/Slack-Benachrichtigung bei Fehlern getestet - [ ] Logrotate-Konfiguration aktiv - [ ] Backup-Strategie implementiert - [ ] Firewall-Regeln: Ausgehender Traffic zu Google APIs erlaubt - [ ] Monitoring-Dashboard eingerichtet (optional) --- # Anhang ## A. Nützliche Befehle (Kurzreferenz) ```bash # ── Pipeline ────────────────────────────────────────────── cd /opt/etl-pipeline && source venv/bin/activate python -m scripts.main # Pipeline starten python -m scripts.main 2>&1 | tee run.log # Mit Log-Ausgabe # ── n8n ─────────────────────────────────────────────────── cd /opt/n8n docker compose up -d # Starten docker compose down # Stoppen docker compose logs -f --tail=100 # Logs anzeigen docker compose restart # Neustart # ── DuckDB CLI ──────────────────────────────────────────── cd /opt/etl-pipeline && source venv/bin/activate python -c " import duckdb con = duckdb.connect('data/db/products.duckdb') print(con.execute('SHOW TABLES').fetchall()) print(con.execute('SELECT COUNT(*) FROM products_raw').fetchone()) con.close() " # ── Logs prüfen ────────────────────────────────────────── tail -f /opt/etl-pipeline/logs/etl_pipeline.log tail -100 /opt/etl-pipeline/logs/error.log cat /opt/etl-pipeline/logs/upload_errors/errors_$(date +%Y%m%d).json | jq . # ── Disk-Usage ──────────────────────────────────────────── du -sh /opt/etl-pipeline/data/db/* du -sh /opt/etl-pipeline/logs/* df -h /opt/etl-pipeline/ ``` > ⚠️ **Hinweis:** Beispiel-Befehlsreferenz. Alle Pfade und Befehle müssen an Ihre Serverumgebung angepasst werden. ## B. Fehlerbehandlung (Häufige Probleme) | Problem | Mögliche Ursache | Lösung | |------------------------------------------------------|------------------------------------------|--------------------------------------------------------| | `FileNotFoundError: jtl_export.csv` | CSV nicht am erwarteten Pfad | JTL-Export-Pfad in `settings.yaml` prüfen | | `google.auth.exceptions.DefaultCredentialsError` | Credentials nicht gefunden | `GOOGLE_APPLICATION_CREDENTIALS` prüfen | | `HttpError 429: Too Many Requests` | API-Quote überschritten | `RATE_LIMIT_DELAY` erhöhen (z. B. auf 2–3s) | | `MemoryError` bei Polars | RAM nicht ausreichend | Polars Streaming-Modus oder RAM aufstocken | | n8n: Timeout nach 600s | Pipeline dauert zu lang | Timeout im Execute Command auf 3600s erhöhen | | DuckDB: `IO Error` | Festplatte voll | Archiv- und Temp-Dateien bereinigen | | `ModuleNotFoundError` | venv nicht aktiviert | `source venv/bin/activate` vor Ausführung | --- *Dokumentation erstellt am 2026-03-06 | Version 1.0.0* *Letzte Aktualisierung: 2026-03-06*