108 KiB
title, description, published, date, tags, editor, dateCreated
| title | description | published | date | tags | editor | dateCreated |
|---|---|---|---|---|---|---|
| Channable Stack LT24 Profice Dokumentation | true | 2026-03-06T11:13:13.929Z | markdown | 2026-03-06T11:09:09.239Z |
Self-Hosted ETL-Pipeline für Google Shopping
Lokale Alternative zu Channable — Technische Dokumentation
| Eigenschaft | Wert |
|---|---|
| Dokumentversion | 1.0.0 |
| Erstellt am | 2026-03-06 |
| Zielplattform | Debian 12 (Bookworm) Server |
| Tech-Stack | Python 3.11+ · Polars · DuckDB · n8n (Docker) |
| Datensatzgröße | 200.000+ Artikel, 30+ Attribute pro Artikel |
| Ziel-API | Google Content API for Shopping |
| Sprache | Deutsch |
Inhaltsverzeichnis
- Systemübersicht
- Voraussetzungen
- Installation & Setup
- Projektstruktur
- Konfiguration
- Core Script Logic (Beispiel)
- n8n Workflow Integration
- Wartung & Monitoring
1. Systemübersicht
1.1 Architektur-Überblick
Die Pipeline ersetzt den kommerziellen SaaS-Dienst Channable durch eine vollständig selbst gehostete, Open-Source-basierte Lösung auf einem dedizierten Debian-Server. Der gesamte Datenfluss folgt dem klassischen ETL-Muster (Extract → Transform → Load) und wird täglich automatisiert durch n8n orchestriert.
Architektur-Diagramm
graph TB
subgraph N8N [n8n - Docker Container]
CRON[Cron-Trigger - 00:00 Uhr]
ERR_HANDLER[Error-Handler - E-Mail / Slack]
end
subgraph EXTRACT [EXTRACT - Datenquellen]
JTL[JTL-WAWI CSV-Export]
GAPI[Google Ads/GA API - 90 Tage]
end
subgraph TRANSFORM [TRANSFORM - Verarbeitung]
DUCKDB_JOIN[DuckDB - LEFT JOIN]
POLARS[Polars Regel-Engine - rules.yaml]
end
subgraph LOAD [LOAD - Upload]
GCONTENT[Google Content API - Batch-Upload]
end
DUCKDB_FILE[(DuckDB - products.duckdb)]
LOGS[Logs und Monitoring]
CRON -->|startet| JTL
CRON -->|startet| GAPI
JTL -->|CSV Import| DUCKDB_JOIN
GAPI -->|Sales-Daten| DUCKDB_JOIN
DUCKDB_JOIN -->|DataFrame| POLARS
POLARS -->|transformierte Daten| GCONTENT
DUCKDB_JOIN -.->|persistiert| DUCKDB_FILE
ERR_HANDLER -.->|bei Fehler| LOGS
style N8N fill:#2d3436,stroke:#636e72,color:#dfe6e9
style EXTRACT fill:#0a3d62,stroke:#3c6382,color:#dfe6e9
style TRANSFORM fill:#1e3799,stroke:#4a69bd,color:#dfe6e9
style LOAD fill:#6a0572,stroke:#a834a8,color:#dfe6e9
style JTL fill:#079992,stroke:#38ada9,color:#fff
style GAPI fill:#079992,stroke:#38ada9,color:#fff
style DUCKDB_JOIN fill:#0c2461,stroke:#1e3799,color:#fff
style POLARS fill:#0c2461,stroke:#1e3799,color:#fff
style GCONTENT fill:#6a0572,stroke:#a834a8,color:#fff
style DUCKDB_FILE fill:#b71540,stroke:#e55039,color:#fff
style LOGS fill:#474787,stroke:#706fd3,color:#fff
style CRON fill:#e58e26,stroke:#fa983a,color:#fff
style ERR_HANDLER fill:#e55039,stroke:#eb2f06,color:#fff
ETL-Datenfluss (vereinfacht)
graph LR
A[JTL CSV - 200k+ Artikel] --> C[DuckDB - LEFT JOIN]
B[Google API - Sales 90 Tage] --> C
C --> D[Polars - rules.yaml]
D --> E[Google Content API - Batch-Upload]
style A fill:#079992,stroke:#38ada9,color:#fff
style B fill:#079992,stroke:#38ada9,color:#fff
style C fill:#0c2461,stroke:#1e3799,color:#fff
style D fill:#1e3799,stroke:#4a69bd,color:#fff
style E fill:#6a0572,stroke:#a834a8,color:#fff
1.2 Datenfluss im Detail
Der Datenfluss gliedert sich in drei klar voneinander getrennte Phasen:
Phase 1 — Extract (Datenextraktion)
In der Extract-Phase werden die Rohdaten aus zwei unabhängigen Quellen bezogen:
Quelle A — JTL-WAWI CSV-Export: Die ERP-Software JTL-WAWI exportiert den vollständigen Produktkatalog als CSV-Datei. Diese Datei enthält alle Stammdaten wie Artikelnummer, Titel, Beschreibung, Preis, EAN, Bestand, Kategorie, Bilder-URLs und weitere produktspezifische Attribute. Der Export wird automatisch durch einen JTL-Worker oder manuell über den JTL-Ameise-Export auf dem Server abgelegt.
Quelle B — Google Ads/Analytics Reporting API: Über die Google API werden die Verkaufsperformance-Daten der letzten 90 Tage abgerufen. Dazu gehören Metriken wie Verkaufsanzahl pro Artikel (sales_quantity), Umsatz pro Artikel (revenue), Klicks, Impressionen und Conversion-Rate. Diese Daten dienen als Grundlage für die regelbasierte Klassifizierung (z. B. „Bestseller"-Labels).
Beide Datenquellen werden als separate Tabellen in DuckDB importiert.
Phase 2 — Transform (Datentransformation)
In der Transform-Phase werden die Daten verknüpft und angereichert:
Zunächst wird in DuckDB ein LEFT JOIN zwischen der Produktstammdaten-Tabelle und der Verkaufsdaten-Tabelle durchgeführt. Die Verknüpfung erfolgt über eine gemeinsame Artikelnummer (z. B. sku oder offer_id). Das Ergebnis ist eine konsolidierte Tabelle, die sowohl Stamm- als auch Performance-Daten enthält.
Der resultierende DataFrame wird anschließend in Polars geladen. Hier werden regelbasierte Transformationen auf Grundlage der Datei rules.yaml angewandt. Typische Transformationen umfassen:
- Zuweisung von Custom Labels (
custom_label_0biscustom_label_4) basierend auf Verkaufszahlen, Marge oder Kategorie. - Preisanpassungen und Sale-Preis-Logik.
- Titeloptimierung (z. B. Hinzufügen von Marke oder Farbe).
- Filterung von Artikeln, die nicht auf Google Shopping erscheinen sollen (z. B. Bestand = 0).
- Anreicherung mit berechneten Feldern (z. B.
margin_percent,performance_tier).
Phase 3 — Load (Daten-Upload)
Die transformierten Produktdaten werden über die Google Content API for Shopping als Batch-Request an das Google Merchant Center übertragen. Der Upload-Prozess berücksichtigt dabei die API-Quotas und implementiert ein Rate-Limiting mit exponentiellem Backoff, um 429 Too Many Requests-Fehler zu vermeiden. Fehlerhafte Einzelprodukte werden in einer separaten Fehler-Log-Datei protokolliert und blockieren nicht den gesamten Upload.
2. Voraussetzungen
2.1 Hardware-Anforderungen
Die folgenden Spezifikationen gelten für die zuverlässige Verarbeitung von 200.000+ Artikeln mit jeweils 30+ Attributen. Die Werte sind konservativ kalkuliert und berücksichtigen die parallele Ausführung von n8n (Docker) und der Python-Pipeline.
| Ressource | Minimum | Empfohlen | Begründung |
|---|---|---|---|
| CPU | 4 Kerne (x86_64) | 8+ Kerne (x86_64) | Polars nutzt Multi-Threading automatisch aus |
| RAM | 8 GB | 16 GB | DuckDB + Polars halten Daten im Arbeitsspeicher |
| Festplatte | 40 GB SSD | 100 GB NVMe SSD | DuckDB-Dateien + CSV-Exporte + Logs + Docker-Images |
| Netzwerk | 100 Mbit/s | 1 Gbit/s | Google API Batch-Uploads erfordern stabile Bandbreite |
| OS | Debian 11 (Bullseye) | Debian 12 (Bookworm) | Aktuelle LTS-Basis mit langfristigem Sicherheits-Support |
Speicherberechnung (Orientierungswerte)
Für eine Kalkulation mit 200.000 Zeilen × 30 Spalten (durchschnittlich ~100 Bytes pro Zelle):
| Komponente | Geschätzter Bedarf |
|---|---|
| Rohdaten CSV auf Disk | ca. 550 – 650 MB |
| DuckDB komprimiert (auf Disk) | ca. 150 – 250 MB |
| Polars DataFrame im RAM | ca. 800 MB – 1,2 GB |
| Peak Memory (JOIN + Transformation) | ca. 2,5 – 4 GB |
| n8n Docker Container | ca. 300 – 500 MB RAM |
| Gesamt Peak-RAM | ca. 4 – 6 GB |
Empfehlung: Bei Datensätzen über 500.000 Artikeln sollte auf 32 GB RAM und Streaming-Verarbeitung (Polars Lazy-Mode) umgestellt werden.
2.2 Erforderliche Debian-Pakete
Die folgenden Pakete müssen auf dem Debian-Server installiert sein, bevor die Pipeline eingerichtet wird.
# ──────────────────────────────────────────────────────────
# System-Updates durchführen
# ──────────────────────────────────────────────────────────
sudo apt update && sudo apt upgrade -y
# ──────────────────────────────────────────────────────────
# Grundlegende Build-Tools und System-Abhängigkeiten
# ──────────────────────────────────────────────────────────
sudo apt install -y \
build-essential \
python3.11 \
python3.11-venv \
python3.11-dev \
python3-pip \
curl \
wget \
git \
jq \
unzip \
ca-certificates \
gnupg \
lsb-release \
cron \
logrotate \
htop \
tree
⚠️ Hinweis: Dies sind Beispielbefehle. Falls Python 3.11 nicht in den Standard-Repositories Ihrer Debian-Version verfügbar ist, muss es über das
deadsnakes-PPA, einen Quellcode-Build oder das Paketpython3(in Debian 12 standardmäßig Python 3.11) installiert werden. Passen Sie die Versionsnummern an Ihre Umgebung an.
2.3 Software-Voraussetzungen
| Software | Mindestversion | Zweck |
|---|---|---|
| Python | 3.11+ | Hauptlaufzeit für ETL-Skripte |
| Docker Engine | 24.0+ | Container-Runtime für n8n |
| Docker Compose Plugin | 2.20+ | Multi-Container-Orchestrierung |
| n8n | 1.30+ | Workflow-Automatisierung und Scheduling |
| DuckDB (Python-Modul) | 0.10+ | Analytische In-Process-Datenbank |
| Polars (Python-Modul) | 0.20+ | Schnelle DataFrame-Transformationen |
| google-api-python-client | 2.100+ | Google API-Zugriff |
| google-auth / google-auth-oauthlib | 2.20+ | Authentifizierung für Google APIs |
| PyYAML | 6.0+ | Regel-Konfiguration im YAML-Format |
3. Installation & Setup
3.1 Docker und n8n einrichten
3.1.1 Docker Engine installieren
# ──────────────────────────────────────────────────────────
# Offizielle Docker GPG-Schlüssel hinzufügen
# ──────────────────────────────────────────────────────────
sudo install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/debian/gpg | \
sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
sudo chmod a+r /etc/apt/keyrings/docker.gpg
# ──────────────────────────────────────────────────────────
# Docker-Repository einrichten
# ──────────────────────────────────────────────────────────
echo \
"deb [arch=$(dpkg --print-architecture) \
signed-by=/etc/apt/keyrings/docker.gpg] \
https://download.docker.com/linux/debian \
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
# ──────────────────────────────────────────────────────────
# Docker Engine und Compose Plugin installieren
# ──────────────────────────────────────────────────────────
sudo apt update
sudo apt install -y \
docker-ce \
docker-ce-cli \
containerd.io \
docker-buildx-plugin \
docker-compose-plugin
# ──────────────────────────────────────────────────────────
# Docker ohne sudo ermöglichen (Relogin erforderlich)
# ──────────────────────────────────────────────────────────
sudo usermod -aG docker $USER
newgrp docker
# ──────────────────────────────────────────────────────────
# Installation verifizieren
# ──────────────────────────────────────────────────────────
docker --version
docker compose version
⚠️ Hinweis: Dies sind Beispielbefehle basierend auf der offiziellen Docker-Dokumentation für Debian. Prüfen Sie stets die aktuelle Installationsanleitung unter docs.docker.com.
3.1.2 n8n per Docker Compose bereitstellen
Erstellen Sie das Verzeichnis und die Konfiguration:
# ──────────────────────────────────────────────────────────
# Verzeichnisstruktur für n8n anlegen
# ──────────────────────────────────────────────────────────
sudo mkdir -p /opt/n8n/data
sudo chown -R $USER:$USER /opt/n8n
⚠️ Hinweis: Beispielbefehl. Passen Sie den Benutzer und die Berechtigungen an Ihre Server-Konfiguration an.
Erstellen Sie die Datei /opt/n8n/.env mit den Zugangsdaten:
# ──────────────────────────────────────────────────────────
# /opt/n8n/.env — Umgebungsvariablen für n8n
# ──────────────────────────────────────────────────────────
N8N_BASIC_AUTH_USER=admin
N8N_BASIC_AUTH_PASSWORD=IhrSicheresPasswortHier123!
N8N_ENCRYPTION_KEY=ein-langer-zufaelliger-encryption-key-hier
⚠️ Hinweis: Dies ist eine Beispiel-Konfiguration. Ersetzen Sie alle Platzhalter-Werte durch Ihre eigenen sicheren Passwörter und Schlüssel. Verwenden Sie
openssl rand -hex 32zur Generierung sicherer Schlüssel.
Erstellen Sie die Datei /opt/n8n/docker-compose.yml:
# ──────────────────────────────────────────────────────────
# /opt/n8n/docker-compose.yml — n8n Workflow-Engine
# ──────────────────────────────────────────────────────────
version: "3.8"
services:
n8n:
image: n8nio/n8n:latest
container_name: n8n
restart: unless-stopped
ports:
- "5678:5678"
environment:
- N8N_BASIC_AUTH_ACTIVE=true
- N8N_BASIC_AUTH_USER=${N8N_BASIC_AUTH_USER}
- N8N_BASIC_AUTH_PASSWORD=${N8N_BASIC_AUTH_PASSWORD}
- N8N_ENCRYPTION_KEY=${N8N_ENCRYPTION_KEY}
- GENERIC_TIMEZONE=Europe/Berlin
- TZ=Europe/Berlin
- N8N_LOG_LEVEL=info
- N8N_LOG_OUTPUT=console,file
- N8N_LOG_FILE_LOCATION=/home/node/.n8n/logs/n8n.log
- N8N_DIAGNOSTICS_ENABLED=false
- N8N_HIRING_BANNER_ENABLED=false
volumes:
# n8n-Daten persistent speichern
- /opt/n8n/data:/home/node/.n8n
# Zugriff auf ETL-Pipeline-Verzeichnis (Read-Only für Skripte)
- /opt/etl-pipeline:/opt/etl-pipeline:ro
# Zugriff auf ETL-Logs (Read-Write für Monitoring)
- /opt/etl-pipeline/logs:/opt/etl-pipeline/logs:rw
healthcheck:
test: ["CMD-SHELL", "wget -qO- http://localhost:5678/healthz || exit 1"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
deploy:
resources:
limits:
memory: 1G
cpus: "2.0"
reservations:
memory: 256M
⚠️ Hinweis: Dies ist eine Beispiel-Konfiguration. Die Volume-Pfade, Ressourcen-Limits und Ports müssen an Ihre Server-Umgebung angepasst werden.
n8n starten und Status prüfen:
# ──────────────────────────────────────────────────────────
# n8n Container starten
# ──────────────────────────────────────────────────────────
cd /opt/n8n
docker compose up -d
# Status prüfen
docker compose ps
docker compose logs -f --tail=50
⚠️ Hinweis: Beispielbefehle. Nach dem Start ist n8n unter
http://<Server-IP>:5678erreichbar.
3.2 Python-Umgebung einrichten
3.2.1 Virtual Environment erstellen
# ──────────────────────────────────────────────────────────
# Projektverzeichnis anlegen
# ──────────────────────────────────────────────────────────
sudo mkdir -p /opt/etl-pipeline
sudo chown -R $USER:$USER /opt/etl-pipeline
cd /opt/etl-pipeline
# ──────────────────────────────────────────────────────────
# Python Virtual Environment erstellen und aktivieren
# ──────────────────────────────────────────────────────────
python3.11 -m venv venv
source venv/bin/activate
# ──────────────────────────────────────────────────────────
# pip aktualisieren
# ──────────────────────────────────────────────────────────
pip install --upgrade pip setuptools wheel
⚠️ Hinweis: Dies sind Beispielbefehle. Stellen Sie sicher, dass
python3.11auf Ihrem System korrekt installiert ist (siehe Abschnitt 2.2).
3.2.2 Abhängigkeiten installieren
Erstellen Sie die Datei /opt/etl-pipeline/requirements.txt:
# ──────────────────────────────────────────────────────────
# /opt/etl-pipeline/requirements.txt
# ETL-Pipeline Abhängigkeiten
# ──────────────────────────────────────────────────────────
# DataFrame-Engine für schnelle Transformationen
polars>=0.20.0,<2.0.0
# Analytische In-Process SQL-Datenbank
duckdb>=0.10.0,<2.0.0
# Google API Client Libraries
google-api-python-client>=2.100.0
google-auth>=2.20.0
google-auth-oauthlib>=1.2.0
google-auth-httplib2>=0.2.0
# Konfiguration
PyYAML>=6.0
# Utilities
requests>=2.31.0
python-dateutil>=2.8.0
tenacity>=8.2.0
⚠️ Hinweis: Dies ist eine Beispiel-Datei. Prüfen Sie die aktuellen Versionen der Pakete und passen Sie die Versionsbeschränkungen an Ihre Kompatibilitätsanforderungen an.
Abhängigkeiten installieren:
# ──────────────────────────────────────────────────────────
# Abhängigkeiten in das Virtual Environment installieren
# ──────────────────────────────────────────────────────────
cd /opt/etl-pipeline
source venv/bin/activate
pip install -r requirements.txt
# Installation verifizieren
python -c "import polars; print(f'Polars: {polars.__version__}')"
python -c "import duckdb; print(f'DuckDB: {duckdb.__version__}')"
python -c "import googleapiclient; print('Google API Client: OK')"
python -c "import yaml; print(f'PyYAML: {yaml.__version__}')"
⚠️ Hinweis: Beispielbefehle zur Verifikation. Die Ausgaben variieren je nach installierten Versionen.
4. Projektstruktur
4.1 Empfohlene Verzeichnisstruktur
Die folgende Struktur organisiert alle Komponenten der Pipeline in logisch getrennte Verzeichnisse. Diese Trennung erleichtert Wartung, Backups und Zugriffssteuerung.
/opt/etl-pipeline/
│
├── venv/ # Python Virtual Environment
│ └── ... # (von venv automatisch verwaltet)
│
├── requirements.txt # Python-Abhängigkeiten
│
├── scripts/ # Alle Python-Skripte
│ ├── __init__.py
│ ├── main.py # Hauptskript (Entry Point)
│ ├── extract.py # Modul: Datenextraktion (CSV + Google API)
│ ├── transform.py # Modul: DuckDB JOIN + Polars Transformationen
│ ├── load.py # Modul: Google Content API Upload
│ ├── rules_engine.py # Modul: YAML-Regel-Engine
│ └── utils.py # Hilfsfunktionen (Logging, Config-Laden)
│
├── config/ # Konfigurationsdateien
│ ├── rules.yaml # Channable-ähnliche Transformationsregeln
│ ├── settings.yaml # Allgemeine Pipeline-Einstellungen
│ └── column_mapping.yaml # Spalten-Mapping JTL → Google Shopping
│
├── credentials/ # Sensible Zugangsdaten (restriktive Rechte!)
│ ├── google_service_account.json # Google Service Account Key
│ └── .gitignore # NIEMALS in Git committen!
│
├── data/ # Datendateien
│ ├── input/ # Eingangsdaten
│ │ └── jtl_export.csv # Aktueller JTL-WAWI CSV-Export
│ ├── output/ # Verarbeitete Ausgabedaten
│ │ └── google_feed_YYYYMMDD.csv
│ ├── archive/ # Archivierte ältere Exporte
│ │ └── jtl_export_20260301.csv
│ └── db/ # DuckDB-Datenbankdateien
│ └── products.duckdb # Persistente DuckDB-Datenbank
│
├── logs/ # Log-Dateien
│ ├── etl_pipeline.log # Haupt-Log (rotating)
│ ├── etl_pipeline.log.1 # Rotiertes Log
│ ├── error.log # Nur Fehler-Einträge
│ └── upload_errors/ # Detaillierte Upload-Fehler pro Lauf
│ └── errors_20260306.json
│
├── tests/ # Unit- und Integrationstests
│ ├── __init__.py
│ ├── test_extract.py
│ ├── test_transform.py
│ └── test_rules_engine.py
│
└── docs/ # Zusätzliche Projektdokumentation
└── CHANGELOG.md
⚠️ Hinweis: Dies ist eine Beispiel-Struktur. Passen Sie die Verzeichnisstruktur an die spezifischen Anforderungen Ihres Projekts und Ihrer Organisation an.
4.2 Verzeichnisstruktur anlegen
# ──────────────────────────────────────────────────────────
# Vollständige Verzeichnisstruktur anlegen
# ──────────────────────────────────────────────────────────
cd /opt/etl-pipeline
mkdir -p scripts
mkdir -p config
mkdir -p credentials
mkdir -p data/{input,output,archive,db}
mkdir -p logs/upload_errors
mkdir -p tests
mkdir -p docs
# ──────────────────────────────────────────────────────────
# Berechtigungen für sensible Verzeichnisse setzen
# ──────────────────────────────────────────────────────────
chmod 700 credentials/
chmod 750 config/
chmod 755 logs/
# ──────────────────────────────────────────────────────────
# .gitignore für credentials anlegen
# ──────────────────────────────────────────────────────────
echo "*" > credentials/.gitignore
echo "!.gitignore" >> credentials/.gitignore
# ──────────────────────────────────────────────────────────
# Python-Modul-Initialisierung
# ──────────────────────────────────────────────────────────
touch scripts/__init__.py
touch tests/__init__.py
⚠️ Hinweis: Beispielbefehle. Stellen Sie sicher, dass der Benutzer, unter dem die Pipeline läuft, die entsprechenden Lese- und Schreibrechte auf allen Verzeichnissen besitzt.
5. Konfiguration
5.1 Google API Credentials sicher speichern
5.1.1 Service Account erstellen
Für die Kommunikation mit der Google Content API for Shopping wird ein Service Account empfohlen. Dieser ermöglicht eine automatisierte Authentifizierung ohne manuellen OAuth2-Flow.
Schritte in der Google Cloud Console:
- Navigieren Sie zur Google Cloud Console.
- Erstellen Sie ein neues Projekt oder wählen Sie ein bestehendes aus.
- Aktivieren Sie die folgenden APIs:
- Content API for Shopping
- Google Ads API (falls Verkaufsdaten direkt aus Google Ads bezogen werden)
- Google Analytics Data API (falls GA4-Daten verwendet werden)
- Navigieren Sie zu IAM & Verwaltung → Dienstkonten.
- Erstellen Sie einen neuen Service Account mit einem aussagekräftigen Namen.
- Laden Sie den JSON-Key herunter.
- Fügen Sie den Service Account als Nutzer in Ihrem Google Merchant Center hinzu (mit der Rolle „Standard" oder „Admin").
5.1.2 Credentials sicher ablegen
# ──────────────────────────────────────────────────────────
# Service Account JSON-Key sicher auf dem Server ablegen
# ──────────────────────────────────────────────────────────
# Datei in das credentials-Verzeichnis kopieren
cp /pfad/zur/heruntergeladenen/datei.json \
/opt/etl-pipeline/credentials/google_service_account.json
# Restriktive Berechtigungen setzen (nur Owner lesen)
chmod 600 /opt/etl-pipeline/credentials/google_service_account.json
# Eigentümerschaft prüfen
ls -la /opt/etl-pipeline/credentials/
⚠️ Hinweis: Beispielbefehle. Ersetzen Sie den Pfad durch den tatsächlichen Speicherort Ihrer heruntergeladenen JSON-Datei.
5.1.3 Credentials als Umgebungsvariable referenzieren (empfohlen)
Es ist empfehlenswert, den Pfad zur Credentials-Datei nicht hart im Skript zu kodieren, sondern über eine Umgebungsvariable bereitzustellen.
# ──────────────────────────────────────────────────────────
# In /opt/etl-pipeline/.env oder ~/.bashrc hinzufügen:
# ──────────────────────────────────────────────────────────
export GOOGLE_APPLICATION_CREDENTIALS="/opt/etl-pipeline/credentials/google_service_account.json"
export MERCHANT_CENTER_ID="123456789"
⚠️ Hinweis: Beispielkonfiguration. Ersetzen Sie die
MERCHANT_CENTER_IDdurch Ihre tatsächliche Merchant-Center-ID.
5.1.4 Sicherheitshinweise
- Die JSON-Datei darf niemals in ein Git-Repository committed werden.
- Der Zugriff auf das
credentials/-Verzeichnis sollte auf den Pipeline-Benutzer beschränkt sein (chmod 700). - Rotieren Sie Service Account Keys regelmäßig (mindestens alle 90 Tage).
- Verwenden Sie nach Möglichkeit Workload Identity Federation anstelle von heruntergeladenen JSON-Keys.
- Protokollieren Sie niemals den Inhalt der Credentials-Datei in Log-Dateien.
5.2 Regel-Konfiguration (rules.yaml)
Die Datei rules.yaml ist das Herzstück der Channable-ähnlichen Funktionalität. Sie definiert regelbasierte Transformationen, die auf jeden Artikel angewandt werden. Die Regeln werden in der definierten Reihenfolge sequenziell abgearbeitet.
5.2.1 Struktur und Syntax
Erstellen Sie die Datei /opt/etl-pipeline/config/rules.yaml:
# ══════════════════════════════════════════════════════════
# /opt/etl-pipeline/config/rules.yaml
#
# Channable-ähnliche Regel-Engine für Google Shopping
# Regeln werden sequenziell von oben nach unten abgearbeitet.
#
# Unterstützte Operatoren:
# Vergleich: >, <, >=, <=, ==, !=
# Text: contains, not_contains, starts_with, ends_with
# Logik: and, or (innerhalb von conditions)
# Existenz: is_empty, is_not_empty
#
# Unterstützte Aktionen:
# set_value: Festes Wert setzen
# copy_field: Wert aus anderem Feld kopieren
# prepend: Text am Anfang hinzufügen
# append: Text am Ende hinzufügen
# replace: Text ersetzen
# calculate: Berechnung durchführen
# exclude: Artikel vom Feed ausschließen
# map_category: Kategorie-Mapping anwenden
# ══════════════════════════════════════════════════════════
# ──────────────────────────────────────────────────────────
# Globale Einstellungen
# ──────────────────────────────────────────────────────────
settings:
feed_name: "Google Shopping DE"
target_country: "DE"
content_language: "de"
currency: "EUR"
# Artikel mit Bestand 0 automatisch ausschließen
exclude_out_of_stock: true
# Mindestpreis für den Feed (in EUR)
minimum_price: 1.00
# ──────────────────────────────────────────────────────────
# Regel-Definitionen
# ──────────────────────────────────────────────────────────
rules:
# ────────────────────────────────────────────────────────
# REGEL 1: Bestseller-Label basierend auf Verkaufszahlen
# ────────────────────────────────────────────────────────
- name: "Bestseller-Klassifizierung"
description: "Artikel mit mehr als 100 Verkäufen in 90 Tagen als Bestseller markieren"
enabled: true
priority: 10
conditions:
logic: "and"
rules:
- field: "sales_quantity_90d"
operator: ">"
value: 100
- field: "stock_quantity"
operator: ">"
value: 0
actions:
- target_field: "custom_label_0"
action: "set_value"
value: "Bestseller"
# ────────────────────────────────────────────────────────
# REGEL 2: Slow Mover identifizieren
# ────────────────────────────────────────────────────────
- name: "Slow-Mover-Klassifizierung"
description: "Artikel mit weniger als 5 Verkäufen in 90 Tagen als Slow Mover markieren"
enabled: true
priority: 20
conditions:
logic: "and"
rules:
- field: "sales_quantity_90d"
operator: "<"
value: 5
- field: "days_in_catalog"
operator: ">"
value: 30
actions:
- target_field: "custom_label_0"
action: "set_value"
value: "Slow Mover"
# ────────────────────────────────────────────────────────
# REGEL 3: Performance-Tier (custom_label_1)
# ────────────────────────────────────────────────────────
- name: "Performance-Tier High"
description: "Umsatzbasierte Tier-Einteilung für Bidding-Strategien"
enabled: true
priority: 30
conditions:
logic: "and"
rules:
- field: "revenue_90d"
operator: ">="
value: 1000
actions:
- target_field: "custom_label_1"
action: "set_value"
value: "Tier-1-High-Revenue"
- name: "Performance-Tier Mittel"
enabled: true
priority: 31
conditions:
logic: "and"
rules:
- field: "revenue_90d"
operator: ">="
value: 200
- field: "revenue_90d"
operator: "<"
value: 1000
actions:
- target_field: "custom_label_1"
action: "set_value"
value: "Tier-2-Mid-Revenue"
- name: "Performance-Tier Niedrig"
enabled: true
priority: 32
conditions:
logic: "and"
rules:
- field: "revenue_90d"
operator: "<"
value: 200
actions:
- target_field: "custom_label_1"
action: "set_value"
value: "Tier-3-Low-Revenue"
# ────────────────────────────────────────────────────────
# REGEL 4: Margen-Klassifizierung (custom_label_2)
# ────────────────────────────────────────────────────────
- name: "Hohe Marge"
enabled: true
priority: 40
conditions:
logic: "and"
rules:
- field: "margin_percent"
operator: ">="
value: 40
actions:
- target_field: "custom_label_2"
action: "set_value"
value: "High-Margin"
- name: "Niedrige Marge"
enabled: true
priority: 41
conditions:
logic: "and"
rules:
- field: "margin_percent"
operator: "<"
value: 15
actions:
- target_field: "custom_label_2"
action: "set_value"
value: "Low-Margin"
# ────────────────────────────────────────────────────────
# REGEL 5: Preiskategorie (custom_label_3)
# ────────────────────────────────────────────────────────
- name: "Premium-Preis"
enabled: true
priority: 50
conditions:
logic: "and"
rules:
- field: "price"
operator: ">="
value: 100
actions:
- target_field: "custom_label_3"
action: "set_value"
value: "Premium"
- name: "Budget-Preis"
enabled: true
priority: 51
conditions:
logic: "and"
rules:
- field: "price"
operator: "<"
value: 20
actions:
- target_field: "custom_label_3"
action: "set_value"
value: "Budget"
# ────────────────────────────────────────────────────────
# REGEL 6: Titel-Optimierung
# ────────────────────────────────────────────────────────
- name: "Marke im Titel voranstellen"
description: "Markenname am Anfang des Titels hinzufügen, falls nicht vorhanden"
enabled: true
priority: 60
conditions:
logic: "and"
rules:
- field: "brand"
operator: "is_not_empty"
- field: "title"
operator: "not_contains"
value_from_field: "brand"
actions:
- target_field: "title"
action: "prepend"
value_template: "{brand} - "
# ────────────────────────────────────────────────────────
# REGEL 7: Sale-Preis-Logik
# ────────────────────────────────────────────────────────
- name: "Sale-Preis setzen"
description: "Wenn ein UVP vorhanden und höher als der VK-Preis ist, als Sale kennzeichnen"
enabled: true
priority: 70
conditions:
logic: "and"
rules:
- field: "uvp"
operator: "is_not_empty"
- field: "uvp"
operator: ">"
value_from_field: "price"
actions:
- target_field: "sale_price"
action: "copy_field"
source_field: "price"
- target_field: "price"
action: "copy_field"
source_field: "uvp"
# ────────────────────────────────────────────────────────
# REGEL 8: Ausschluss-Regeln
# ────────────────────────────────────────────────────────
- name: "Artikel ohne Bild ausschließen"
enabled: true
priority: 80
conditions:
logic: "or"
rules:
- field: "image_url"
operator: "is_empty"
- field: "image_url"
operator: "=="
value: ""
actions:
- action: "exclude"
reason: "Kein Produktbild vorhanden"
- name: "Testprodukte ausschließen"
enabled: true
priority: 81
conditions:
logic: "or"
rules:
- field: "title"
operator: "contains"
value: "TEST"
- field: "sku"
operator: "starts_with"
value: "ZZ-"
actions:
- action: "exclude"
reason: "Testprodukt erkannt"
# ────────────────────────────────────────────────────────
# REGEL 9: Versandkosten basierend auf Preis
# ────────────────────────────────────────────────────────
- name: "Kostenloser Versand ab 49 EUR"
enabled: true
priority: 90
conditions:
logic: "and"
rules:
- field: "price"
operator: ">="
value: 49
actions:
- target_field: "shipping_cost"
action: "set_value"
value: "0.00"
- name: "Standard-Versandkosten"
enabled: true
priority: 91
conditions:
logic: "and"
rules:
- field: "price"
operator: "<"
value: 49
actions:
- target_field: "shipping_cost"
action: "set_value"
value: "4.95"
⚠️ Hinweis: Dies ist eine Beispiel-Konfigurationsdatei. Alle Schwellenwerte, Feldnamen und Regeln müssen an Ihre tatsächlichen Geschäftsanforderungen, JTL-Exportfelder und Google-Shopping-Attribute angepasst werden.
5.2.2 Spalten-Mapping (column_mapping.yaml)
Erstellen Sie zusätzlich die Datei /opt/etl-pipeline/config/column_mapping.yaml:
# ══════════════════════════════════════════════════════════
# /opt/etl-pipeline/config/column_mapping.yaml
#
# Mapping: JTL-WAWI Exportfeld → Google Shopping Attribut
# ══════════════════════════════════════════════════════════
mapping:
# Pflichtfelder Google Shopping
Artikelnummer: "offerId"
Artikelname: "title"
Beschreibung: "description"
URL: "link"
BildURL: "imageLink"
VK_Brutto: "price"
Waehrung: "priceCurrency"
Verfuegbarkeit: "availability"
Marke: "brand"
GTIN: "gtin"
MPN: "mpn"
Zustand: "condition"
Kategorie_Google: "googleProductCategory"
# Optionale Felder
Farbe: "color"
Groesse: "size"
Material: "material"
Geschlecht: "gender"
Altersgruppe: "ageGroup"
EK_Netto: "cost_of_goods_sold"
UVP: "uvp"
Lagerbestand: "stock_quantity"
# Berechnete Felder (werden durch Pipeline erzeugt)
# sales_quantity_90d → aus Google API
# revenue_90d → aus Google API
# margin_percent → berechnet aus VK und EK
# custom_label_0-4 → aus rules.yaml
⚠️ Hinweis: Dies ist eine Beispiel-Zuordnung. Die tatsächlichen Spaltennamen in Ihrem JTL-WAWI-Export können abweichen. Prüfen Sie die Header-Zeile Ihrer CSV-Datei und passen Sie das Mapping entsprechend an.
6. Core Script Logic (Beispiel)
Dieses Kapitel zeigt den konzeptionellen Aufbau der Python-Skripte. Der Code ist als funktionales Boilerplate zu verstehen, das als Grundlage für die eigene Implementierung dient.
6.1 Hilfsfunktionen (utils.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/utils.py
Hilfsfunktionen für Logging, Konfiguration und gemeinsam genutzte Utilities.
"""
import os
import sys
import yaml
import logging
from pathlib import Path
from datetime import datetime
from logging.handlers import RotatingFileHandler
# ──────────────────────────────────────────────────────────
# Pfad-Konstanten
# ──────────────────────────────────────────────────────────
BASE_DIR = Path("/opt/etl-pipeline")
CONFIG_DIR = BASE_DIR / "config"
DATA_DIR = BASE_DIR / "data"
LOG_DIR = BASE_DIR / "logs"
CREDENTIALS_DIR = BASE_DIR / "credentials"
def setup_logging(
log_name: str = "etl_pipeline",
log_level: int = logging.INFO,
max_bytes: int = 10 * 1024 * 1024, # 10 MB
backup_count: int = 5,
) -> logging.Logger:
"""
Konfiguriert das Logging mit Rotation und separatem Error-Log.
Args:
log_name: Name des Loggers und der Log-Datei.
log_level: Logging-Level (default: INFO).
max_bytes: Maximale Größe einer Log-Datei vor Rotation.
backup_count: Anzahl der aufzubewahrenden rotierten Log-Dateien.
Returns:
Konfigurierter Logger.
"""
LOG_DIR.mkdir(parents=True, exist_ok=True)
logger = logging.getLogger(log_name)
logger.setLevel(log_level)
# Verhindere doppelte Handler bei mehrfachem Aufruf
if logger.handlers:
return logger
# Formatter
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# Haupt-Log (Rotation)
main_handler = RotatingFileHandler(
LOG_DIR / f"{log_name}.log",
maxBytes=max_bytes,
backupCount=backup_count,
encoding="utf-8",
)
main_handler.setLevel(log_level)
main_handler.setFormatter(formatter)
# Error-Log (nur ERROR und höher)
error_handler = RotatingFileHandler(
LOG_DIR / "error.log",
maxBytes=max_bytes,
backupCount=backup_count,
encoding="utf-8",
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
# Console-Handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
logger.addHandler(main_handler)
logger.addHandler(error_handler)
logger.addHandler(console_handler)
return logger
def load_yaml_config(config_filename: str) -> dict:
"""
Lädt eine YAML-Konfigurationsdatei aus dem config-Verzeichnis.
Args:
config_filename: Name der YAML-Datei (z. B. 'rules.yaml').
Returns:
Dictionary mit dem Inhalt der YAML-Datei.
Raises:
FileNotFoundError: Wenn die Datei nicht existiert.
yaml.YAMLError: Wenn die YAML-Syntax ungültig ist.
"""
config_path = CONFIG_DIR / config_filename
if not config_path.exists():
raise FileNotFoundError(
f"Konfigurationsdatei nicht gefunden: {config_path}"
)
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
return config
def get_timestamp() -> str:
"""Gibt einen formatierten Zeitstempel zurück (YYYYMMDD_HHMMSS)."""
return datetime.now().strftime("%Y%m%d_%H%M%S")
def get_date_stamp() -> str:
"""Gibt einen formatierten Datumsstempel zurück (YYYYMMDD)."""
return datetime.now().strftime("%Y%m%d")
⚠️ Hinweis: Dies ist ein Beispiel-Skript. Alle Pfade, Log-Konfigurationen und Funktionen müssen an Ihre tatsächliche Serverumgebung und Anforderungen angepasst werden.
6.2 Extract-Modul (extract.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/extract.py
Modul für die Datenextraktion aus JTL-WAWI CSV und Google APIs.
Beide Datenquellen werden als Tabellen in DuckDB importiert.
"""
import os
import shutil
import duckdb
import polars as pl
from pathlib import Path
from datetime import datetime, timedelta
from google.oauth2 import service_account
from googleapiclient.discovery import build
from scripts.utils import (
setup_logging,
BASE_DIR,
DATA_DIR,
CREDENTIALS_DIR,
get_date_stamp,
)
logger = setup_logging("extract")
# ══════════════════════════════════════════════════════════
# JTL-WAWI CSV-Import
# ══════════════════════════════════════════════════════════
def load_jtl_csv_to_duckdb(
con: duckdb.DuckDBPyConnection,
csv_path: str | Path | None = None,
table_name: str = "products_raw",
) -> int:
"""
Liest die JTL-WAWI CSV-Exportdatei und importiert sie als Tabelle
in die DuckDB-Datenbank.
Args:
con: Aktive DuckDB-Verbindung.
csv_path: Pfad zur CSV-Datei. Falls None, wird der Standardpfad verwendet.
table_name: Name der Zieltabelle in DuckDB.
Returns:
Anzahl der importierten Zeilen.
"""
if csv_path is None:
csv_path = DATA_DIR / "input" / "jtl_export.csv"
csv_path = Path(csv_path)
if not csv_path.exists():
raise FileNotFoundError(f"JTL-CSV nicht gefunden: {csv_path}")
file_size_mb = csv_path.stat().st_size / (1024 * 1024)
logger.info(f"Lade JTL-CSV: {csv_path} ({file_size_mb:.1f} MB)")
# Bestehende Tabelle löschen und neu erstellen
con.execute(f"DROP TABLE IF EXISTS {table_name}")
# CSV mit DuckDB's optimiertem CSV-Reader importieren
# auto_detect=true erkennt Trennzeichen, Encoding und Datentypen
con.execute(f"""
CREATE TABLE {table_name} AS
SELECT *
FROM read_csv_auto(
'{csv_path}',
header = true,
delim = ';',
quote = '"',
escape = '"',
encoding = 'utf-8',
sample_size = 10000,
ignore_errors = false
)
""")
row_count = con.execute(
f"SELECT COUNT(*) FROM {table_name}"
).fetchone()[0]
logger.info(
f"JTL-CSV erfolgreich geladen: "
f"{row_count:,} Zeilen in Tabelle '{table_name}'"
)
# Archivierung des Imports
archive_path = DATA_DIR / "archive" / f"jtl_export_{get_date_stamp()}.csv"
if not archive_path.exists():
shutil.copy2(csv_path, archive_path)
logger.info(f"CSV archiviert: {archive_path}")
return row_count
# ══════════════════════════════════════════════════════════
# Google Sales-Daten abrufen
# ══════════════════════════════════════════════════════════
def fetch_google_sales_data(
con: duckdb.DuckDBPyConnection,
table_name: str = "sales_90d",
lookback_days: int = 90,
) -> int:
"""
Ruft Verkaufsdaten der letzten N Tage über die Google API ab
und importiert sie als Tabelle in DuckDB.
Args:
con: Aktive DuckDB-Verbindung.
table_name: Name der Zieltabelle in DuckDB.
lookback_days: Anzahl der zurückliegenden Tage (Standard: 90).
Returns:
Anzahl der importierten Zeilen.
"""
logger.info(
f"Rufe Google Sales-Daten ab (letzte {lookback_days} Tage)..."
)
# ── Google API Authentifizierung ──────────────────────
credentials_path = os.environ.get(
"GOOGLE_APPLICATION_CREDENTIALS",
str(CREDENTIALS_DIR / "google_service_account.json"),
)
merchant_id = os.environ.get("MERCHANT_CENTER_ID")
if not merchant_id:
raise ValueError(
"Umgebungsvariable MERCHANT_CENTER_ID ist nicht gesetzt"
)
credentials = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/content"],
)
service = build("content", "v2.1", credentials=credentials)
# ── Zeitraum berechnen ────────────────────────────────
end_date = datetime.now()
start_date = end_date - timedelta(days=lookback_days)
# ── Performance-Daten aus dem Merchant Center abrufen ─
# HINWEIS: Der tatsächliche API-Aufruf hängt von der
# verwendeten Google API ab (Merchant Center Reports,
# Google Ads API, GA4 Data API, etc.)
# Hier wird ein konzeptionelles Beispiel gezeigt.
sales_records = []
try:
# Beispiel: Merchant Center Performance Report
request_body = {
"query": f"""
SELECT
segments.offer_id,
metrics.impressions,
metrics.clicks,
metrics.conversions,
metrics.conversion_value
FROM MerchantPerformanceView
WHERE segments.date BETWEEN
'{start_date.strftime('%Y-%m-%d')}'
AND '{end_date.strftime('%Y-%m-%d')}'
"""
}
response = (
service.reports()
.search(merchantId=merchant_id, body=request_body)
.execute()
)
for row in response.get("results", []):
segments = row.get("segments", {})
metrics = row.get("metrics", {})
sales_records.append({
"offer_id": segments.get("offerId", ""),
"impressions": int(metrics.get("impressions", 0)),
"clicks": int(metrics.get("clicks", 0)),
"sales_quantity_90d": int(
metrics.get("conversions", 0)
),
"revenue_90d": float(
metrics.get("conversionValue", 0.0)
),
})
except Exception as e:
logger.error(
f"Fehler beim Abrufen der Google Sales-Daten: {e}"
)
raise
# ── Daten in DuckDB importieren ───────────────────────
if sales_records:
df_sales = pl.DataFrame(sales_records)
con.execute(f"DROP TABLE IF EXISTS {table_name}")
con.execute(
f"CREATE TABLE {table_name} AS SELECT * FROM df_sales"
)
row_count = con.execute(
f"SELECT COUNT(*) FROM {table_name}"
).fetchone()[0]
logger.info(
f"Google Sales-Daten geladen: "
f"{row_count:,} Zeilen in '{table_name}'"
)
return row_count
else:
logger.warning(
"Keine Sales-Daten von der Google API erhalten."
)
# Leere Tabelle erstellen, damit der JOIN nicht fehlschlägt
con.execute(f"DROP TABLE IF EXISTS {table_name}")
con.execute(f"""
CREATE TABLE {table_name} (
offer_id VARCHAR,
impressions INTEGER DEFAULT 0,
clicks INTEGER DEFAULT 0,
sales_quantity_90d INTEGER DEFAULT 0,
revenue_90d DOUBLE DEFAULT 0.0
)
""")
return 0
⚠️ Hinweis: Dies ist ein konzeptionelles Beispiel-Skript. Die Google API-Aufrufe, insbesondere der Report-Query und die Antwortstruktur, müssen an die tatsächlich verwendete API-Version und Ihr Merchant-Center-Setup angepasst werden. Konsultieren Sie die offizielle Google API-Dokumentation für die aktuellen Endpunkte und Antwortformate.
6.3 Transform-Modul (transform.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/transform.py
Modul für die Datentransformation:
1. LEFT JOIN in DuckDB (Produkte ⟕ Sales)
2. Polars-Transformationen basierend auf rules.yaml
"""
import duckdb
import polars as pl
from typing import Any
from scripts.utils import setup_logging, load_yaml_config
logger = setup_logging("transform")
# ══════════════════════════════════════════════════════════
# DuckDB LEFT JOIN
# ══════════════════════════════════════════════════════════
def join_products_with_sales(
con: duckdb.DuckDBPyConnection,
products_table: str = "products_raw",
sales_table: str = "sales_90d",
joined_table: str = "products_enriched",
join_key_products: str = "Artikelnummer",
join_key_sales: str = "offer_id",
) -> pl.DataFrame:
"""
Führt einen LEFT JOIN zwischen Produktstammdaten und
Verkaufsdaten in DuckDB durch und gibt das Ergebnis als
Polars DataFrame zurück.
Ein LEFT JOIN stellt sicher, dass alle Produkte erhalten
bleiben, auch wenn keine Verkaufsdaten vorliegen
(NULL-Werte werden mit 0 gefüllt).
Args:
con: Aktive DuckDB-Verbindung.
products_table: Name der Produkttabelle.
sales_table: Name der Verkaufstabelle.
joined_table: Name der Ergebnistabelle.
join_key_products: JOIN-Schlüssel in der Produkttabelle.
join_key_sales: JOIN-Schlüssel in der Verkaufstabelle.
Returns:
Polars DataFrame mit den verknüpften Daten.
"""
logger.info(
f"Führe LEFT JOIN durch: {products_table} ⟕ {sales_table} "
f"ON {join_key_products} = {join_key_sales}"
)
# ── LEFT JOIN mit COALESCE für NULL-Behandlung ────────
con.execute(f"DROP TABLE IF EXISTS {joined_table}")
con.execute(f"""
CREATE TABLE {joined_table} AS
SELECT
p.*,
COALESCE(s.impressions, 0)
AS impressions,
COALESCE(s.clicks, 0)
AS clicks,
COALESCE(s.sales_quantity_90d, 0)
AS sales_quantity_90d,
COALESCE(s.revenue_90d, 0.0)
AS revenue_90d
FROM {products_table} AS p
LEFT JOIN {sales_table} AS s
ON CAST(p."{join_key_products}" AS VARCHAR)
= CAST(s.{join_key_sales} AS VARCHAR)
""")
row_count = con.execute(
f"SELECT COUNT(*) FROM {joined_table}"
).fetchone()[0]
logger.info(
f"LEFT JOIN abgeschlossen: "
f"{row_count:,} Zeilen in '{joined_table}'"
)
# ── Ergebnis als Polars DataFrame exportieren ─────────
result = con.execute(f"SELECT * FROM {joined_table}").pl()
logger.info(
f"Polars DataFrame erstellt: "
f"{result.shape[0]:,} Zeilen, {result.shape[1]} Spalten"
)
return result
# ══════════════════════════════════════════════════════════
# Berechnete Felder hinzufügen
# ══════════════════════════════════════════════════════════
def add_calculated_fields(df: pl.DataFrame) -> pl.DataFrame:
"""
Fügt berechnete Felder hinzu, die als Grundlage für die
Regel-Engine dienen.
Args:
df: Polars DataFrame mit den verknüpften Rohdaten.
Returns:
Polars DataFrame mit zusätzlichen berechneten Spalten.
"""
logger.info("Füge berechnete Felder hinzu...")
df = df.with_columns([
# Marge in Prozent (wenn EK vorhanden)
pl.when(
(pl.col("EK_Netto").is_not_null())
& (pl.col("VK_Brutto") > 0)
)
.then(
(
(pl.col("VK_Brutto") - pl.col("EK_Netto"))
/ pl.col("VK_Brutto")
* 100
).round(2)
)
.otherwise(pl.lit(None))
.alias("margin_percent"),
# Click-Through-Rate
pl.when(pl.col("impressions") > 0)
.then(
(pl.col("clicks") / pl.col("impressions") * 100)
.round(2)
)
.otherwise(pl.lit(0.0))
.alias("ctr_percent"),
# Conversion-Rate
pl.when(pl.col("clicks") > 0)
.then(
(
pl.col("sales_quantity_90d")
/ pl.col("clicks")
* 100
).round(2)
)
.otherwise(pl.lit(0.0))
.alias("conversion_rate_percent"),
# Custom Label Spalten initialisieren (leer)
pl.lit("").alias("custom_label_0"),
pl.lit("").alias("custom_label_1"),
pl.lit("").alias("custom_label_2"),
pl.lit("").alias("custom_label_3"),
pl.lit("").alias("custom_label_4"),
# Versandkosten initialisieren
pl.lit("4.95").alias("shipping_cost"),
# Exclude-Flag initialisieren
pl.lit(False).alias("_excluded"),
pl.lit("").alias("_exclude_reason"),
])
logger.info(
f"Berechnete Felder hinzugefügt. "
f"DataFrame: {df.shape[1]} Spalten"
)
return df
# ══════════════════════════════════════════════════════════
# YAML Regel-Engine
# ══════════════════════════════════════════════════════════
def evaluate_condition(
df: pl.DataFrame,
condition: dict,
) -> pl.Series:
"""
Evaluiert eine einzelne Bedingung und gibt eine
boolesche Maske zurück.
Args:
df: Polars DataFrame.
condition: Dictionary mit field, operator, value.
Returns:
Polars Series (Boolean) als Filtermaske.
"""
field = condition["field"]
operator = condition["operator"]
value = condition.get("value")
value_from_field = condition.get("value_from_field")
# Prüfen ob das Feld existiert
if field not in df.columns:
logger.warning(
f"Feld '{field}' existiert nicht im DataFrame. "
f"Bedingung übersprungen."
)
return pl.Series("mask", [False] * df.height)
col = pl.col(field)
# Vergleichswert: fester Wert oder aus anderem Feld
if value_from_field:
compare_value = pl.col(value_from_field)
else:
compare_value = value
# ── Operator-Mapping ──────────────────────────────────
operator_map = {
">": col > compare_value,
"<": col < compare_value,
">=": col >= compare_value,
"<=": col <= compare_value,
"==": col == compare_value,
"!=": col != compare_value,
"contains": col.cast(pl.Utf8).str.contains(
str(compare_value), literal=True
),
"not_contains": ~col.cast(pl.Utf8).str.contains(
str(compare_value), literal=True
),
"starts_with": col.cast(pl.Utf8).str.starts_with(
str(compare_value)
),
"ends_with": col.cast(pl.Utf8).str.ends_with(
str(compare_value)
),
"is_empty": (
col.is_null() | (col.cast(pl.Utf8) == "")
),
"is_not_empty": (
col.is_not_null() & (col.cast(pl.Utf8) != "")
),
}
if operator not in operator_map:
logger.error(f"Unbekannter Operator: '{operator}'")
return pl.Series("mask", [False] * df.height)
return df.select(operator_map[operator]).to_series()
def apply_rules(
df: pl.DataFrame,
rules_config: dict,
) -> pl.DataFrame:
"""
Wendet alle aktivierten Regeln aus der rules.yaml auf den
DataFrame an. Regeln werden nach Priorität sortiert und
sequenziell abgearbeitet.
Args:
df: Polars DataFrame mit den angereicherten Produktdaten.
rules_config: Geladene rules.yaml als Dictionary.
Returns:
Polars DataFrame nach Anwendung aller Regeln.
"""
rules = rules_config.get("rules", [])
settings = rules_config.get("settings", {})
# Regeln nach Priorität sortieren
# (niedrigere Zahl = höhere Priorität)
rules_sorted = sorted(
[r for r in rules if r.get("enabled", True)],
key=lambda r: r.get("priority", 999),
)
logger.info(f"Wende {len(rules_sorted)} aktive Regeln an...")
# ── Globale Ausschlüsse (Settings) ────────────────────
if settings.get("exclude_out_of_stock", False):
df = df.with_columns(
pl.when(pl.col("stock_quantity") <= 0)
.then(pl.lit(True))
.otherwise(pl.col("_excluded"))
.alias("_excluded")
)
excluded = df.filter(pl.col("_excluded")).height
logger.info(
f"Globaler Ausschluss (Bestand=0): "
f"{excluded:,} Artikel markiert"
)
min_price = settings.get("minimum_price", 0)
if min_price > 0:
df = df.with_columns(
pl.when(pl.col("VK_Brutto") < min_price)
.then(pl.lit(True))
.otherwise(pl.col("_excluded"))
.alias("_excluded")
)
# ── Regeln sequenziell anwenden ───────────────────────
for rule in rules_sorted:
rule_name = rule.get("name", "Unbenannt")
conditions_config = rule.get("conditions", {})
actions = rule.get("actions", [])
logic = conditions_config.get("logic", "and")
sub_rules = conditions_config.get("rules", [])
logger.debug(
f"Verarbeite Regel: '{rule_name}' "
f"(Priorität: {rule.get('priority', '?')})"
)
# Alle Teilbedingungen evaluieren
masks = [
evaluate_condition(df, cond) for cond in sub_rules
]
if not masks:
logger.warning(
f"Regel '{rule_name}' hat keine Bedingungen. "
f"Übersprungen."
)
continue
# Logische Verknüpfung der Teilbedingungen
if logic == "and":
combined_mask = masks[0]
for m in masks[1:]:
combined_mask = combined_mask & m
elif logic == "or":
combined_mask = masks[0]
for m in masks[1:]:
combined_mask = combined_mask | m
else:
logger.error(
f"Unbekannte Logik '{logic}' in Regel "
f"'{rule_name}'. Übersprungen."
)
continue
matching_count = combined_mask.sum()
logger.info(
f"Regel '{rule_name}': "
f"{matching_count:,} Artikel treffen zu"
)
# Aktionen auf übereinstimmende Zeilen anwenden
for action_def in actions:
action_type = action_def.get("action", "set_value")
target_field = action_def.get("target_field", "")
if action_type == "set_value":
value = action_def.get("value", "")
df = df.with_columns(
pl.when(combined_mask)
.then(pl.lit(value))
.otherwise(pl.col(target_field))
.alias(target_field)
)
elif action_type == "copy_field":
source = action_def.get("source_field", "")
if source in df.columns:
df = df.with_columns(
pl.when(combined_mask)
.then(pl.col(source))
.otherwise(pl.col(target_field))
.alias(target_field)
)
elif action_type == "prepend":
template = action_def.get(
"value_template", ""
)
for col_name in df.columns:
placeholder = f"{{{col_name}}}"
if placeholder in template:
suffix = template.replace(
placeholder, ""
)
df = df.with_columns(
pl.when(combined_mask)
.then(
pl.col(col_name)
.cast(pl.Utf8)
+ pl.lit(suffix)
+ pl.col(target_field)
.cast(pl.Utf8)
)
.otherwise(pl.col(target_field))
.alias(target_field)
)
break
elif action_type == "append":
value = action_def.get("value", "")
df = df.with_columns(
pl.when(combined_mask)
.then(
pl.col(target_field).cast(pl.Utf8)
+ pl.lit(value)
)
.otherwise(pl.col(target_field))
.alias(target_field)
)
elif action_type == "exclude":
reason = action_def.get(
"reason", "Durch Regel ausgeschlossen"
)
df = df.with_columns(
pl.when(combined_mask)
.then(pl.lit(True))
.otherwise(pl.col("_excluded"))
.alias("_excluded"),
pl.when(combined_mask)
.then(pl.lit(reason))
.otherwise(pl.col("_exclude_reason"))
.alias("_exclude_reason"),
)
# ── Statistik ─────────────────────────────────────────
total = df.height
excluded_count = df.filter(pl.col("_excluded")).height
active_count = total - excluded_count
logger.info(
f"Transformation abgeschlossen: {total:,} Gesamt | "
f"{active_count:,} Aktiv | "
f"{excluded_count:,} Ausgeschlossen"
)
return df
⚠️ Hinweis: Dies ist ein konzeptionelles Beispiel-Skript. Die Regel-Engine ist bewusst einfach gehalten und muss für den Produktionseinsatz um Fehlerbehandlung, Validierung der Eingabedaten und weitere Operatoren erweitert werden. Die Feldnamen (
EK_Netto,VK_Brutto,stock_quantityusw.) müssen mit den tatsächlichen Spalten Ihres JTL-Exports übereinstimmen.
6.4 Load-Modul (load.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/load.py
Modul für den Batch-Upload transformierter Produktdaten
an die Google Content API for Shopping.
Implementiert Rate-Limiting und exponentielles Backoff.
"""
import os
import json
import time
import polars as pl
from pathlib import Path
from datetime import datetime
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
from scripts.utils import (
setup_logging,
load_yaml_config,
CREDENTIALS_DIR,
DATA_DIR,
LOG_DIR,
get_date_stamp,
)
logger = setup_logging("load")
# ══════════════════════════════════════════════════════════
# Konstanten
# ══════════════════════════════════════════════════════════
BATCH_SIZE = 1000 # Produkte pro Batch-Request
RATE_LIMIT_DELAY = 1.0 # Sekunden Pause zwischen Batches
MAX_RETRIES = 5 # Maximale Wiederholungsversuche
BACKOFF_MULTIPLIER = 2 # Exponentieller Backoff-Faktor
# ══════════════════════════════════════════════════════════
# Google Content API Client
# ══════════════════════════════════════════════════════════
def get_content_api_service():
"""
Erstellt einen authentifizierten Google Content API Service.
Returns:
Google API Service-Objekt für Content API v2.1.
"""
credentials_path = os.environ.get(
"GOOGLE_APPLICATION_CREDENTIALS",
str(CREDENTIALS_DIR / "google_service_account.json"),
)
credentials = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/content"],
)
service = build("content", "v2.1", credentials=credentials)
logger.info("Google Content API Service erstellt.")
return service
# ══════════════════════════════════════════════════════════
# Produkt-Payload-Erstellung
# ══════════════════════════════════════════════════════════
def build_product_payload(
row: dict,
column_mapping: dict,
settings: dict,
) -> dict:
"""
Erstellt den API-Payload für ein einzelnes Produkt nach
dem Google Shopping Content API Schema.
Args:
row: Dictionary mit den Produktdaten einer Zeile.
column_mapping: Spalten-Mapping (JTL → Google).
settings: Globale Pipeline-Einstellungen.
Returns:
Dictionary im Google Content API Produktformat.
"""
target_country = settings.get("target_country", "DE")
content_language = settings.get("content_language", "de")
currency = settings.get("currency", "EUR")
mapping = column_mapping.get("mapping", {})
product = {
"offerId": str(row.get(
mapping.get("Artikelnummer", "Artikelnummer"), ""
)),
"title": str(row.get("title", row.get(
mapping.get("Artikelname", ""), ""
))),
"description": str(row.get(
mapping.get("Beschreibung", "Beschreibung"), ""
)),
"link": str(row.get(
mapping.get("URL", "URL"), ""
)),
"imageLink": str(row.get(
mapping.get("BildURL", "BildURL"), ""
)),
"contentLanguage": content_language,
"targetCountry": target_country,
"channel": "online",
"availability": (
"in stock"
if row.get("stock_quantity", 0) > 0
else "out of stock"
),
"condition": "new",
"price": {
"value": str(row.get("VK_Brutto", "0.00")),
"currency": currency,
},
}
# Optionale Felder hinzufügen
brand = row.get("Marke", row.get("brand", ""))
if brand:
product["brand"] = str(brand)
gtin = row.get("GTIN", row.get("gtin", ""))
if gtin:
product["gtin"] = str(gtin)
# Custom Labels (0-4)
for i in range(5):
label_value = row.get(f"custom_label_{i}", "")
if label_value:
product[f"customLabel{i}"] = str(label_value)
# Sale-Preis
sale_price = row.get("sale_price", "")
if sale_price and float(sale_price) > 0:
product["salePrice"] = {
"value": str(sale_price),
"currency": currency,
}
# Versandkosten
shipping_cost = row.get("shipping_cost", "")
if shipping_cost is not None and shipping_cost != "":
product["shipping"] = [{
"country": target_country,
"price": {
"value": str(shipping_cost),
"currency": currency,
},
}]
return product
# ══════════════════════════════════════════════════════════
# Batch-Upload mit Rate Limiting
# ══════════════════════════════════════════════════════════
@retry(
stop=stop_after_attempt(MAX_RETRIES),
wait=wait_exponential(
multiplier=BACKOFF_MULTIPLIER, min=2, max=60
),
retry=retry_if_exception_type(
(HttpError, ConnectionError, TimeoutError)
),
before_sleep=lambda retry_state: logger.warning(
f"Retry {retry_state.attempt_number}/{MAX_RETRIES} "
f"nach Fehler. "
f"Warte {retry_state.next_action.sleep:.1f}s..."
),
)
def execute_batch_request(
service,
merchant_id: str,
batch_entries: list[dict],
) -> dict:
"""
Führt einen einzelnen Batch-Request an die Content API aus.
Implementiert exponentielles Backoff bei Rate-Limit-Fehlern.
Args:
service: Google Content API Service.
merchant_id: Merchant Center ID.
batch_entries: Liste der Batch-Einträge.
Returns:
API-Antwort als Dictionary.
"""
body = {"entries": batch_entries}
response = (
service.products()
.custombatch(body=body)
.execute()
)
return response
def upload_products_to_google(
df: pl.DataFrame,
batch_size: int = BATCH_SIZE,
rate_limit_delay: float = RATE_LIMIT_DELAY,
) -> dict:
"""
Lädt alle aktiven (nicht ausgeschlossenen) Produkte als
Batch-Requests an die Google Content API hoch.
Args:
df: Polars DataFrame mit transformierten Produktdaten.
batch_size: Anzahl der Produkte pro Batch-Request.
rate_limit_delay: Pause in Sekunden zwischen Batches.
Returns:
Dictionary mit Upload-Statistiken.
"""
merchant_id = os.environ.get("MERCHANT_CENTER_ID")
if not merchant_id:
raise ValueError("MERCHANT_CENTER_ID ist nicht gesetzt")
# Konfiguration laden
column_mapping = load_yaml_config("column_mapping.yaml")
rules_config = load_yaml_config("rules.yaml")
settings = rules_config.get("settings", {})
# Nur aktive (nicht ausgeschlossene) Produkte uploaden
df_active = df.filter(~pl.col("_excluded"))
total_products = df_active.height
logger.info(
f"Starte Upload: {total_products:,} aktive Produkte "
f"in Batches von {batch_size}"
)
# Service erstellen
service = get_content_api_service()
# ── Statistiken ───────────────────────────────────────
stats = {
"total": total_products,
"success": 0,
"errors": 0,
"batches": 0,
"start_time": datetime.now().isoformat(),
"error_details": [],
}
# ── Produkte in Batches aufteilen und hochladen ───────
rows_as_dicts = df_active.to_dicts()
total_batches = (
(total_products + batch_size - 1) // batch_size
)
for batch_start in range(0, total_products, batch_size):
batch_end = min(batch_start + batch_size, total_products)
batch_rows = rows_as_dicts[batch_start:batch_end]
batch_number = (batch_start // batch_size) + 1
logger.info(
f"Batch {batch_number}/{total_batches}: "
f"Produkte {batch_start + 1:,} – {batch_end:,}"
)
# Batch-Einträge erstellen
batch_entries = []
for idx, row in enumerate(batch_rows):
product_payload = build_product_payload(
row, column_mapping, settings
)
batch_entries.append({
"batchId": batch_start + idx,
"merchantId": merchant_id,
"method": "insert",
"product": product_payload,
})
# Batch-Request ausführen (mit Retry/Backoff)
try:
response = execute_batch_request(
service, merchant_id, batch_entries
)
# Antwort auswerten
for entry in response.get("entries", []):
if (
"errors" in entry
and entry["errors"].get("errors")
):
stats["errors"] += 1
batch_idx = entry.get("batchId", 0)
local_idx = batch_idx - batch_start
offer_id = "unbekannt"
if 0 <= local_idx < len(batch_rows):
offer_id = batch_rows[local_idx].get(
"Artikelnummer", "unbekannt"
)
error_info = {
"batchId": batch_idx,
"offerId": offer_id,
"errors": entry["errors"]["errors"],
}
stats["error_details"].append(error_info)
logger.warning(
f"Fehler bei Produkt {offer_id}: "
f"{entry['errors']['errors']}"
)
else:
stats["success"] += 1
except HttpError as e:
if e.resp.status == 429:
logger.warning(
f"Rate Limit erreicht bei Batch "
f"{batch_number}. Erhöhe Wartezeit..."
)
time.sleep(rate_limit_delay * 5)
else:
logger.error(
f"HTTP-Fehler bei Batch "
f"{batch_number}: {e}"
)
stats["errors"] += len(batch_rows)
except Exception as e:
logger.error(
f"Unerwarteter Fehler bei Batch "
f"{batch_number}: {e}"
)
stats["errors"] += len(batch_rows)
stats["batches"] += 1
# ── Rate Limiting: Pause zwischen Batches ─────────
if batch_end < total_products:
logger.debug(
f"Rate Limiting: {rate_limit_delay}s Pause..."
)
time.sleep(rate_limit_delay)
# ── Upload-Statistiken loggen ─────────────────────────
stats["end_time"] = datetime.now().isoformat()
stats["duration_seconds"] = (
datetime.fromisoformat(stats["end_time"])
- datetime.fromisoformat(stats["start_time"])
).total_seconds()
logger.info(
f"Upload abgeschlossen: "
f"{stats['success']:,} erfolgreich | "
f"{stats['errors']:,} Fehler | "
f"{stats['batches']} Batches | "
f"{stats['duration_seconds']:.1f}s Gesamtdauer"
)
# ── Fehlerdetails als JSON speichern ──────────────────
if stats["error_details"]:
error_dir = LOG_DIR / "upload_errors"
error_dir.mkdir(parents=True, exist_ok=True)
error_file = (
error_dir / f"errors_{get_date_stamp()}.json"
)
with open(error_file, "w", encoding="utf-8") as f:
json.dump(
stats["error_details"],
f,
indent=2,
ensure_ascii=False,
)
logger.info(f"Fehlerdetails gespeichert: {error_file}")
return stats
⚠️ Hinweis: Dies ist ein konzeptionelles Beispiel-Skript. Die Batch-Größe, Rate-Limiting-Werte und API-Payloads müssen an Ihre spezifischen API-Quotas und Produktdatenstrukturen angepasst werden. Testen Sie den Upload zunächst mit einer kleinen Teilmenge (z. B. 10 Produkte) im Testmodus der Google Content API, bevor Sie den vollständigen Datensatz hochladen.
6.5 Hauptskript (main.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
/opt/etl-pipeline/scripts/main.py
Hauptskript (Entry Point) für die ETL-Pipeline.
Orchestriert den gesamten Ablauf: Extract → Transform → Load.
Aufruf:
cd /opt/etl-pipeline
source venv/bin/activate
python -m scripts.main
"""
import sys
import time
import duckdb
from pathlib import Path
from scripts.utils import (
setup_logging,
load_yaml_config,
BASE_DIR,
DATA_DIR,
get_timestamp,
)
from scripts.extract import (
load_jtl_csv_to_duckdb,
fetch_google_sales_data,
)
from scripts.transform import (
join_products_with_sales,
add_calculated_fields,
apply_rules,
)
from scripts.load import upload_products_to_google
logger = setup_logging("main")
def run_pipeline() -> dict:
"""
Führt die vollständige ETL-Pipeline aus.
Returns:
Dictionary mit Statistiken aller drei Phasen.
Raises:
SystemExit: Bei kritischen Fehlern mit Exit-Code 1.
"""
pipeline_start = time.time()
run_id = get_timestamp()
logger.info(f"{'═' * 60}")
logger.info(f"ETL-Pipeline gestartet — Run-ID: {run_id}")
logger.info(f"{'═' * 60}")
stats = {
"run_id": run_id,
"extract": {},
"transform": {},
"load": {},
}
# ══════════════════════════════════════════════════════
# DuckDB-Verbindung herstellen
# ══════════════════════════════════════════════════════
db_path = DATA_DIR / "db" / "products.duckdb"
db_path.parent.mkdir(parents=True, exist_ok=True)
con = duckdb.connect(str(db_path))
logger.info(f"DuckDB-Verbindung hergestellt: {db_path}")
try:
# ══════════════════════════════════════════════════
# PHASE 1: EXTRACT
# ══════════════════════════════════════════════════
logger.info(f"{'─' * 40}")
logger.info("PHASE 1: EXTRACT")
logger.info(f"{'─' * 40}")
# JTL CSV laden
jtl_rows = load_jtl_csv_to_duckdb(con)
stats["extract"]["jtl_rows"] = jtl_rows
# Google Sales-Daten abrufen
sales_rows = fetch_google_sales_data(con)
stats["extract"]["sales_rows"] = sales_rows
# ══════════════════════════════════════════════════
# PHASE 2: TRANSFORM
# ══════════════════════════════════════════════════
logger.info(f"{'─' * 40}")
logger.info("PHASE 2: TRANSFORM")
logger.info(f"{'─' * 40}")
# LEFT JOIN: Produkte ⟕ Sales
df = join_products_with_sales(con)
# Berechnete Felder hinzufügen
df = add_calculated_fields(df)
# YAML-Regeln anwenden
rules_config = load_yaml_config("rules.yaml")
df = apply_rules(df, rules_config)
stats["transform"]["total_products"] = df.height
stats["transform"]["active_products"] = df.filter(
~df["_excluded"]
).height
stats["transform"]["excluded_products"] = df.filter(
df["_excluded"]
).height
# ══════════════════════════════════════════════════
# PHASE 3: LOAD
# ══════════════════════════════════════════════════
logger.info(f"{'─' * 40}")
logger.info("PHASE 3: LOAD")
logger.info(f"{'─' * 40}")
upload_stats = upload_products_to_google(df)
stats["load"] = upload_stats
except Exception as e:
logger.critical(
f"Pipeline fehlgeschlagen: {e}", exc_info=True
)
# Exit-Code 1 signalisiert n8n einen Fehler
sys.exit(1)
finally:
con.close()
logger.info("DuckDB-Verbindung geschlossen.")
# ══════════════════════════════════════════════════════
# Zusammenfassung
# ══════════════════════════════════════════════════════
pipeline_duration = time.time() - pipeline_start
stats["duration_seconds"] = round(pipeline_duration, 2)
logger.info(f"{'═' * 60}")
logger.info(
f"ETL-Pipeline abgeschlossen — "
f"Dauer: {pipeline_duration:.1f}s"
)
logger.info(
f" Extract: "
f"{stats['extract'].get('jtl_rows', 0):,} JTL, "
f"{stats['extract'].get('sales_rows', 0):,} Sales"
)
logger.info(
f" Transform: "
f"{stats['transform'].get('active_products', 0):,} aktiv, "
f"{stats['transform'].get('excluded_products', 0):,} "
f"ausgeschlossen"
)
logger.info(
f" Load: "
f"{stats['load'].get('success', 0):,} OK, "
f"{stats['load'].get('errors', 0):,} Fehler"
)
logger.info(f"{'═' * 60}")
return stats
# ──────────────────────────────────────────────────────────
# Entry Point
# ──────────────────────────────────────────────────────────
if __name__ == "__main__":
run_pipeline()
⚠️ Hinweis: Dies ist ein konzeptionelles Beispiel-Skript. Es dient als Ausgangspunkt und Entry Point für die Pipeline. Vor dem produktiven Einsatz sollten alle Module gründlich mit Testdaten validiert und die Fehlerbehandlung an Ihre Anforderungen angepasst werden.
7. n8n Workflow Integration
7.1 Übersicht
Der n8n Workflow übernimmt zwei zentrale Aufgaben:
- Scheduling: Tägliche Ausführung der Python-Pipeline um 00:00 Uhr (Mitternacht).
- Error Handling: Benachrichtigung per E-Mail (oder Slack/Telegram) bei Fehlern.
Die Kommunikation zwischen n8n (Docker-Container) und der Python-Pipeline auf dem Host-System erfolgt über den n8n-Node Execute Command, der Shell-Befehle im Container ausführt. Da das Pipeline-Verzeichnis als Volume gemountet ist, hat n8n Zugriff auf die Skripte.
7.2 Workflow-Architektur
graph TD
CRON[Cron Trigger - 00:00 Uhr]
EXEC[Execute Command - python -m scripts.main]
CHECK{Exit-Code pruefen}
SUCCESS[Erfolgs-Meldung - Log + E-Mail]
ERROR[Fehler-Alarm - E-Mail / Slack]
CRON --> EXEC
EXEC --> CHECK
CHECK -->|Exit 0 - OK| SUCCESS
CHECK -->|Exit 1 - Fehler| ERROR
style CRON fill:#e58e26,stroke:#fa983a,color:#fff
style EXEC fill:#0c2461,stroke:#1e3799,color:#fff
style CHECK fill:#474787,stroke:#706fd3,color:#fff
style SUCCESS fill:#079992,stroke:#38ada9,color:#fff
style ERROR fill:#b71540,stroke:#e55039,color:#fff
7.3 Workflow als n8n-JSON importieren
Der folgende JSON-Export kann direkt in n8n importiert werden unter Workflow → Import from File. Speichern Sie den Inhalt als .json-Datei.
{
"name": "ETL Pipeline - Google Shopping (Daily)",
"nodes": [
{
"parameters": {
"rule": {
"interval": [
{
"triggerAtHour": 0,
"triggerAtMinute": 0
}
]
}
},
"name": "Cron Trigger (00:00)",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1.2,
"position": [240, 300]
},
{
"parameters": {
"command": "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main 2>&1",
"timeout": 3600
},
"name": "Execute ETL Pipeline",
"type": "n8n-nodes-base.executeCommand",
"typeVersion": 1,
"position": [480, 300]
},
{
"parameters": {
"conditions": {
"number": [
{
"value1": "={{ $json.exitCode }}",
"operation": "equal",
"value2": 0
}
]
}
},
"name": "Check Exit Code",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [720, 300]
},
{
"parameters": {
"fromEmail": "etl-pipeline@ihre-domain.de",
"toEmail": "admin@ihre-domain.de",
"subject": "✅ ETL Pipeline — Erfolg ({{ $now.format('yyyy-MM-dd') }})",
"text": "Die ETL-Pipeline wurde erfolgreich ausgeführt.\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\n\nStdout:\n{{ $json.stdout }}"
},
"name": "Success E-Mail",
"type": "n8n-nodes-base.emailSend",
"typeVersion": 2,
"position": [960, 200]
},
{
"parameters": {
"fromEmail": "etl-pipeline@ihre-domain.de",
"toEmail": "admin@ihre-domain.de",
"subject": "❌ ETL Pipeline — FEHLER ({{ $now.format('yyyy-MM-dd') }})",
"text": "ACHTUNG: Die ETL-Pipeline ist fehlgeschlagen!\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\nExit-Code: {{ $json.exitCode }}\n\nStderr:\n{{ $json.stderr }}\n\nStdout:\n{{ $json.stdout }}\n\nBitte prüfen: /opt/etl-pipeline/logs/error.log"
},
"name": "Error E-Mail",
"type": "n8n-nodes-base.emailSend",
"typeVersion": 2,
"position": [960, 400]
}
],
"connections": {
"Cron Trigger (00:00)": {
"main": [
[{ "node": "Execute ETL Pipeline", "type": "main", "index": 0 }]
]
},
"Execute ETL Pipeline": {
"main": [
[{ "node": "Check Exit Code", "type": "main", "index": 0 }]
]
},
"Check Exit Code": {
"main": [
[{ "node": "Success E-Mail", "type": "main", "index": 0 }],
[{ "node": "Error E-Mail", "type": "main", "index": 0 }]
]
}
},
"settings": {
"timezone": "Europe/Berlin",
"saveExecutionProgress": true,
"saveManualExecutions": true,
"executionTimeout": 7200
}
}
⚠️ Hinweis: Dies ist ein Beispiel-Workflow für n8n. Die E-Mail-Adressen, SMTP-Einstellungen und Timeouts müssen an Ihre Umgebung angepasst werden. Konfigurieren Sie die SMTP-Credentials in n8n unter Settings → Credentials bevor Sie den Workflow aktivieren.
7.4 Wichtige n8n-Konfigurationshinweise
Timeout-Einstellung
Da die Verarbeitung von 200.000+ Artikeln je nach Serverhardware zwischen 5 und 30 Minuten dauern kann, muss der Timeout im Execute Command-Node ausreichend hoch gesetzt werden:
timeout: 3600 # 1 Stunde (in Sekunden)
Volume-Mount beachten
Der Execute Command-Node führt Befehle innerhalb des n8n Docker-Containers aus. Damit die Python-Skripte und das Virtual Environment erreichbar sind, muss der Pfad /opt/etl-pipeline als Volume im docker-compose.yml gemountet sein (siehe Abschnitt 3.1.2).
Alternative: Host-Befehl über SSH
Falls die Pipeline nicht innerhalb des Containers laufen soll, kann alternativ ein SSH-Node verwendet werden, der den Befehl auf dem Host ausführt:
# Befehl für n8n SSH-Node:
ssh pipeline-user@localhost \
"cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main"
⚠️ Hinweis: Beispielbefehl. Für die SSH-Variante muss ein SSH-Key ohne Passphrase konfiguriert und die SSH-Credentials in n8n hinterlegt werden.
7.5 Manueller Pipeline-Start (Debugging)
Für Debugging und Tests kann die Pipeline jederzeit manuell gestartet werden:
# ──────────────────────────────────────────────────────────
# Pipeline manuell ausführen
# ──────────────────────────────────────────────────────────
cd /opt/etl-pipeline
source venv/bin/activate
python -m scripts.main
# Alternativ: Nur den Exit-Code prüfen
python -m scripts.main; echo "Exit-Code: $?"
⚠️ Hinweis: Beispielbefehle für den manuellen Start. Stellen Sie sicher, dass die Umgebungsvariablen (
GOOGLE_APPLICATION_CREDENTIALS,MERCHANT_CENTER_ID) in der Shell-Session gesetzt sind.
8. Wartung & Monitoring
8.1 Log-Management
8.1.1 Log-Struktur
Die Pipeline erzeugt folgende Log-Dateien:
| Datei | Inhalt | Rotation |
|---|---|---|
logs/etl_pipeline.log |
Alle Log-Einträge (INFO+) | 10 MB, 5 Backups |
logs/error.log |
Nur Fehler (ERROR+) | 10 MB, 5 Backups |
logs/upload_errors/errors_YYYYMMDD.json |
Detail-Fehler pro Upload-Lauf | Täglich, eine Datei |
8.1.2 Logrotate-Konfiguration
Zusätzlich zur integrierten Python-Rotation empfiehlt sich eine systemweite Logrotate-Konfiguration als Sicherheitsnetz.
Erstellen Sie die Datei /etc/logrotate.d/etl-pipeline:
/opt/etl-pipeline/logs/*.log {
daily
missingok
rotate 30
compress
delaycompress
notifempty
create 0640 root root
sharedscripts
postrotate
# Optional: Signal an laufende Prozesse
/bin/true
endscript
}
/opt/etl-pipeline/logs/upload_errors/*.json {
daily
missingok
rotate 90
compress
delaycompress
notifempty
create 0640 root root
}
⚠️ Hinweis: Dies ist eine Beispiel-Logrotate-Konfiguration. Passen Sie den Benutzer, die Aufbewahrungsdauer (
rotate) und die Dateiberechtigungen an Ihre Sicherheitsrichtlinien an.
8.1.3 Log-Bereinigung (Cron)
Alte Log-Dateien und Upload-Fehlerprotokolle können zusätzlich per Cron bereinigt werden:
# ──────────────────────────────────────────────────────────
# Crontab-Eintrag: Logs älter als 90 Tage löschen
# ──────────────────────────────────────────────────────────
# crontab -e
0 3 * * 0 find /opt/etl-pipeline/logs/ -name "*.log.*" -mtime +90 -delete
0 3 * * 0 find /opt/etl-pipeline/logs/upload_errors/ -name "*.json" -mtime +90 -delete
0 3 * * 0 find /opt/etl-pipeline/data/archive/ -name "*.csv" -mtime +180 -delete
⚠️ Hinweis: Beispiel-Crontab-Einträge. Passen Sie die Aufbewahrungsfristen (hier: 90 bzw. 180 Tage) an Ihre Compliance-Anforderungen an.
8.2 Performance-Tipps für 200.000+ Artikel
8.2.1 DuckDB-Optimierungen
# ──────────────────────────────────────────────────────────
# DuckDB Performance-Konfiguration
# (am Anfang der Pipeline setzen)
# ──────────────────────────────────────────────────────────
con = duckdb.connect("products.duckdb")
# Verfügbaren Arbeitsspeicher für DuckDB erhöhen
con.execute("SET memory_limit = '4GB'")
# Alle verfügbaren CPU-Kerne nutzen
con.execute("SET threads TO 8")
# Temporäre Dateien auf schnelle SSD legen
con.execute("SET temp_directory = '/opt/etl-pipeline/data/tmp'")
# Fortschrittsanzeige für lange Queries
con.execute("SET enable_progress_bar = true")
⚠️ Hinweis: Beispiel-Konfiguration. Die Werte für
memory_limitundthreadsmüssen an die tatsächliche Serverausstattung angepasst werden.
8.2.2 Polars-Optimierungen
# ──────────────────────────────────────────────────────────
# Polars Performance-Tipps
# ──────────────────────────────────────────────────────────
import polars as pl
# 1. Lazy Evaluation verwenden (bei sehr großen Datensätzen)
# Polars optimiert den Query-Plan automatisch
df_lazy = pl.scan_csv(
"data/input/jtl_export.csv", separator=";"
)
df_result = (
df_lazy
.filter(pl.col("Lagerbestand") > 0)
.with_columns([
pl.col("VK_Brutto").cast(pl.Float64),
])
.collect() # Erst hier werden die Daten materialisiert
)
# 2. Streaming-Modus für Datensätze > verfügbares RAM
df_result = (
df_lazy
.filter(pl.col("Lagerbestand") > 0)
.collect(streaming=True) # Verarbeitet in Chunks
)
# 3. Datentypen optimieren (reduziert RAM-Verbrauch)
df = df.with_columns([
pl.col("Lagerbestand").cast(pl.Int32), # statt Int64
pl.col("clicks").cast(pl.Int32), # statt Int64
pl.col("impressions").cast(pl.Int32), # statt Int64
pl.col("custom_label_0").cast(pl.Categorical), # statt Utf8
pl.col("custom_label_1").cast(pl.Categorical), # statt Utf8
])
# 4. Spaltenauswahl: Nur benötigte Spalten laden
df = pl.read_csv(
"data.csv",
columns=["sku", "title", "price", "stock"],
)
⚠️ Hinweis: Dies sind Beispiel-Codeausschnitte zur Performance-Optimierung. Die tatsächlichen Spaltennamen und Datentypen müssen an Ihren Datensatz angepasst werden.
8.2.3 Google API Upload-Optimierungen
| Parameter | Empfehlung | Begründung |
|---|---|---|
BATCH_SIZE |
1.000 | Google empfiehlt max. 1.000 Einträge pro Batch |
RATE_LIMIT_DELAY |
1,0 – 2,0 Sekunden | Verhindert 429-Fehler bei Standard-Quotas |
MAX_RETRIES |
5 | Ausreichend für temporäre Fehler |
| Parallele Requests | Nicht empfohlen | Erhöht 429-Risiko, sequenziell ist sicherer |
| Geschätzte Upload-Dauer | 10 – 20 Minuten | Bei 200 Batches × 1s Delay + API-Latenz |
8.2.4 Allgemeine Server-Monitoring-Befehle
# ──────────────────────────────────────────────────────────
# System-Monitoring während Pipeline-Lauf
# ──────────────────────────────────────────────────────────
# RAM- und CPU-Auslastung in Echtzeit beobachten
htop
# Festplatten-I/O überwachen
iostat -x 5
# Pipeline-Prozess gezielt überwachen
watch -n 2 "ps aux | grep 'python.*scripts.main' | grep -v grep"
# Festplattennutzung des Pipeline-Verzeichnisses
du -sh /opt/etl-pipeline/data/*
# DuckDB-Dateigröße prüfen
ls -lh /opt/etl-pipeline/data/db/products.duckdb
⚠️ Hinweis: Beispielbefehle für das System-Monitoring. Diese dienen zur Orientierung und sollten je nach Ihren Monitoring-Tools (z. B. Prometheus, Grafana, Zabbix) ergänzt oder ersetzt werden.
8.3 Backup-Empfehlungen
Die folgenden Dateien und Verzeichnisse sollten regelmäßig gesichert werden:
| Pfad | Priorität | Frequenz |
|---|---|---|
/opt/etl-pipeline/config/ |
Hoch | Bei Änderung |
/opt/etl-pipeline/credentials/ |
Kritisch | Bei Änderung |
/opt/etl-pipeline/scripts/ |
Hoch | Bei Änderung |
/opt/etl-pipeline/data/db/products.duckdb |
Mittel | Täglich |
/opt/n8n/data/ |
Hoch | Wöchentlich |
#!/bin/bash
# ──────────────────────────────────────────────────────────
# Einfaches Backup-Skript (Beispiel)
# ──────────────────────────────────────────────────────────
BACKUP_DIR="/backup/etl-pipeline/$(date +%Y%m%d)"
mkdir -p "$BACKUP_DIR"
# Konfiguration und Skripte sichern
tar czf "$BACKUP_DIR/config_scripts.tar.gz" \
/opt/etl-pipeline/config/ \
/opt/etl-pipeline/scripts/ \
/opt/etl-pipeline/requirements.txt
# DuckDB sichern
cp /opt/etl-pipeline/data/db/products.duckdb \
"$BACKUP_DIR/products.duckdb"
# n8n Workflows sichern
tar czf "$BACKUP_DIR/n8n_data.tar.gz" /opt/n8n/data/
echo "Backup erstellt: $BACKUP_DIR"
⚠️ Hinweis: Dies ist ein vereinfachtes Beispiel-Backup-Skript. Für den Produktionseinsatz sollte eine vollwertige Backup-Lösung (z. B. BorgBackup, Restic) mit Verschlüsselung und Off-Site-Speicherung verwendet werden.
8.4 Checkliste für den Produktivbetrieb
Vor der Aktivierung der automatisierten Pipeline sollten folgende Punkte abgearbeitet sein:
- Python Virtual Environment erstellt und alle Abhängigkeiten installiert
- DuckDB-Datenbank initialisiert und CSV-Import getestet
- Google Service Account erstellt und in Merchant Center eingeladen
GOOGLE_APPLICATION_CREDENTIALSundMERCHANT_CENTER_IDals Umgebungsvariablen gesetztrules.yamlmit initialen Regeln konfiguriertcolumn_mapping.yamlan JTL-Export-Spalten angepasst- Pipeline manuell mit Testdaten durchgelaufen (alle 3 Phasen)
- n8n Workflow importiert und Cron-Trigger konfiguriert
- E-Mail-/Slack-Benachrichtigung bei Fehlern getestet
- Logrotate-Konfiguration aktiv
- Backup-Strategie implementiert
- Firewall-Regeln: Ausgehender Traffic zu Google APIs erlaubt
- Monitoring-Dashboard eingerichtet (optional)
Anhang
A. Nützliche Befehle (Kurzreferenz)
# ── Pipeline ──────────────────────────────────────────────
cd /opt/etl-pipeline && source venv/bin/activate
python -m scripts.main # Pipeline starten
python -m scripts.main 2>&1 | tee run.log # Mit Log-Ausgabe
# ── n8n ───────────────────────────────────────────────────
cd /opt/n8n
docker compose up -d # Starten
docker compose down # Stoppen
docker compose logs -f --tail=100 # Logs anzeigen
docker compose restart # Neustart
# ── DuckDB CLI ────────────────────────────────────────────
cd /opt/etl-pipeline && source venv/bin/activate
python -c "
import duckdb
con = duckdb.connect('data/db/products.duckdb')
print(con.execute('SHOW TABLES').fetchall())
print(con.execute('SELECT COUNT(*) FROM products_raw').fetchone())
con.close()
"
# ── Logs prüfen ──────────────────────────────────────────
tail -f /opt/etl-pipeline/logs/etl_pipeline.log
tail -100 /opt/etl-pipeline/logs/error.log
cat /opt/etl-pipeline/logs/upload_errors/errors_$(date +%Y%m%d).json | jq .
# ── Disk-Usage ────────────────────────────────────────────
du -sh /opt/etl-pipeline/data/db/*
du -sh /opt/etl-pipeline/logs/*
df -h /opt/etl-pipeline/
⚠️ Hinweis: Beispiel-Befehlsreferenz. Alle Pfade und Befehle müssen an Ihre Serverumgebung angepasst werden.
B. Fehlerbehandlung (Häufige Probleme)
| Problem | Mögliche Ursache | Lösung |
|---|---|---|
FileNotFoundError: jtl_export.csv |
CSV nicht am erwarteten Pfad | JTL-Export-Pfad in settings.yaml prüfen |
google.auth.exceptions.DefaultCredentialsError |
Credentials nicht gefunden | GOOGLE_APPLICATION_CREDENTIALS prüfen |
HttpError 429: Too Many Requests |
API-Quote überschritten | RATE_LIMIT_DELAY erhöhen (z. B. auf 2–3s) |
MemoryError bei Polars |
RAM nicht ausreichend | Polars Streaming-Modus oder RAM aufstocken |
| n8n: Timeout nach 600s | Pipeline dauert zu lang | Timeout im Execute Command auf 3600s erhöhen |
DuckDB: IO Error |
Festplatte voll | Archiv- und Temp-Dateien bereinigen |
ModuleNotFoundError |
venv nicht aktiviert | source venv/bin/activate vor Ausführung |
Dokumentation erstellt am 2026-03-06 | Version 1.0.0 Letzte Aktualisierung: 2026-03-06