2838 lines
108 KiB
Markdown
2838 lines
108 KiB
Markdown
---
|
||
title: Channable Stack LT24 Profice Dokumentation
|
||
description:
|
||
published: true
|
||
date: 2026-03-06T11:13:13.929Z
|
||
tags:
|
||
editor: markdown
|
||
dateCreated: 2026-03-06T11:09:09.239Z
|
||
---
|
||
|
||
# Self-Hosted ETL-Pipeline für Google Shopping
|
||
|
||
## Lokale Alternative zu Channable — Technische Dokumentation
|
||
|
||
---
|
||
|
||
| Eigenschaft | Wert |
|
||
|--------------------|------------------------------------------------|
|
||
| **Dokumentversion** | 1.0.0 |
|
||
| **Erstellt am** | 2026-03-06 |
|
||
| **Zielplattform** | Debian 12 (Bookworm) Server |
|
||
| **Tech-Stack** | Python 3.11+ · Polars · DuckDB · n8n (Docker) |
|
||
| **Datensatzgröße** | 200.000+ Artikel, 30+ Attribute pro Artikel |
|
||
| **Ziel-API** | Google Content API for Shopping |
|
||
| **Sprache** | Deutsch |
|
||
|
||
---
|
||
|
||
## Inhaltsverzeichnis
|
||
|
||
1. [Systemübersicht](#1-systemübersicht)
|
||
2. [Voraussetzungen](#2-voraussetzungen)
|
||
3. [Installation & Setup](#3-installation--setup)
|
||
4. [Projektstruktur](#4-projektstruktur)
|
||
5. [Konfiguration](#5-konfiguration)
|
||
6. [Core Script Logic (Beispiel)](#6-core-script-logic-beispiel)
|
||
7. [n8n Workflow Integration](#7-n8n-workflow-integration)
|
||
8. [Wartung & Monitoring](#8-wartung--monitoring)
|
||
|
||
---
|
||
|
||
# 1. Systemübersicht
|
||
|
||
## 1.1 Architektur-Überblick
|
||
|
||
Die Pipeline ersetzt den kommerziellen SaaS-Dienst **Channable** durch eine vollständig selbst gehostete, Open-Source-basierte Lösung auf einem dedizierten Debian-Server. Der gesamte Datenfluss folgt dem klassischen **ETL-Muster** (Extract → Transform → Load) und wird täglich automatisiert durch **n8n** orchestriert.
|
||
|
||
### Architektur-Diagramm
|
||
|
||
```mermaid
|
||
graph TB
|
||
subgraph N8N [n8n - Docker Container]
|
||
CRON[Cron-Trigger - 00:00 Uhr]
|
||
ERR_HANDLER[Error-Handler - E-Mail / Slack]
|
||
end
|
||
|
||
subgraph EXTRACT [EXTRACT - Datenquellen]
|
||
JTL[JTL-WAWI CSV-Export]
|
||
GAPI[Google Ads/GA API - 90 Tage]
|
||
end
|
||
|
||
subgraph TRANSFORM [TRANSFORM - Verarbeitung]
|
||
DUCKDB_JOIN[DuckDB - LEFT JOIN]
|
||
POLARS[Polars Regel-Engine - rules.yaml]
|
||
end
|
||
|
||
subgraph LOAD [LOAD - Upload]
|
||
GCONTENT[Google Content API - Batch-Upload]
|
||
end
|
||
|
||
DUCKDB_FILE[(DuckDB - products.duckdb)]
|
||
LOGS[Logs und Monitoring]
|
||
|
||
CRON -->|startet| JTL
|
||
CRON -->|startet| GAPI
|
||
JTL -->|CSV Import| DUCKDB_JOIN
|
||
GAPI -->|Sales-Daten| DUCKDB_JOIN
|
||
DUCKDB_JOIN -->|DataFrame| POLARS
|
||
POLARS -->|transformierte Daten| GCONTENT
|
||
DUCKDB_JOIN -.->|persistiert| DUCKDB_FILE
|
||
ERR_HANDLER -.->|bei Fehler| LOGS
|
||
|
||
style N8N fill:#2d3436,stroke:#636e72,color:#dfe6e9
|
||
style EXTRACT fill:#0a3d62,stroke:#3c6382,color:#dfe6e9
|
||
style TRANSFORM fill:#1e3799,stroke:#4a69bd,color:#dfe6e9
|
||
style LOAD fill:#6a0572,stroke:#a834a8,color:#dfe6e9
|
||
style JTL fill:#079992,stroke:#38ada9,color:#fff
|
||
style GAPI fill:#079992,stroke:#38ada9,color:#fff
|
||
style DUCKDB_JOIN fill:#0c2461,stroke:#1e3799,color:#fff
|
||
style POLARS fill:#0c2461,stroke:#1e3799,color:#fff
|
||
style GCONTENT fill:#6a0572,stroke:#a834a8,color:#fff
|
||
style DUCKDB_FILE fill:#b71540,stroke:#e55039,color:#fff
|
||
style LOGS fill:#474787,stroke:#706fd3,color:#fff
|
||
style CRON fill:#e58e26,stroke:#fa983a,color:#fff
|
||
style ERR_HANDLER fill:#e55039,stroke:#eb2f06,color:#fff
|
||
```
|
||
|
||
### ETL-Datenfluss (vereinfacht)
|
||
|
||
```mermaid
|
||
graph LR
|
||
A[JTL CSV - 200k+ Artikel] --> C[DuckDB - LEFT JOIN]
|
||
B[Google API - Sales 90 Tage] --> C
|
||
C --> D[Polars - rules.yaml]
|
||
D --> E[Google Content API - Batch-Upload]
|
||
|
||
style A fill:#079992,stroke:#38ada9,color:#fff
|
||
style B fill:#079992,stroke:#38ada9,color:#fff
|
||
style C fill:#0c2461,stroke:#1e3799,color:#fff
|
||
style D fill:#1e3799,stroke:#4a69bd,color:#fff
|
||
style E fill:#6a0572,stroke:#a834a8,color:#fff
|
||
```
|
||
|
||
## 1.2 Datenfluss im Detail
|
||
|
||
Der Datenfluss gliedert sich in drei klar voneinander getrennte Phasen:
|
||
|
||
### Phase 1 — Extract (Datenextraktion)
|
||
|
||
In der Extract-Phase werden die Rohdaten aus zwei unabhängigen Quellen bezogen:
|
||
|
||
**Quelle A — JTL-WAWI CSV-Export:** Die ERP-Software JTL-WAWI exportiert den vollständigen Produktkatalog als CSV-Datei. Diese Datei enthält alle Stammdaten wie Artikelnummer, Titel, Beschreibung, Preis, EAN, Bestand, Kategorie, Bilder-URLs und weitere produktspezifische Attribute. Der Export wird automatisch durch einen JTL-Worker oder manuell über den JTL-Ameise-Export auf dem Server abgelegt.
|
||
|
||
**Quelle B — Google Ads/Analytics Reporting API:** Über die Google API werden die Verkaufsperformance-Daten der letzten 90 Tage abgerufen. Dazu gehören Metriken wie Verkaufsanzahl pro Artikel (`sales_quantity`), Umsatz pro Artikel (`revenue`), Klicks, Impressionen und Conversion-Rate. Diese Daten dienen als Grundlage für die regelbasierte Klassifizierung (z. B. „Bestseller"-Labels).
|
||
|
||
Beide Datenquellen werden als separate Tabellen in **DuckDB** importiert.
|
||
|
||
### Phase 2 — Transform (Datentransformation)
|
||
|
||
In der Transform-Phase werden die Daten verknüpft und angereichert:
|
||
|
||
Zunächst wird in **DuckDB** ein `LEFT JOIN` zwischen der Produktstammdaten-Tabelle und der Verkaufsdaten-Tabelle durchgeführt. Die Verknüpfung erfolgt über eine gemeinsame Artikelnummer (z. B. `sku` oder `offer_id`). Das Ergebnis ist eine konsolidierte Tabelle, die sowohl Stamm- als auch Performance-Daten enthält.
|
||
|
||
Der resultierende DataFrame wird anschließend in **Polars** geladen. Hier werden regelbasierte Transformationen auf Grundlage der Datei `rules.yaml` angewandt. Typische Transformationen umfassen:
|
||
|
||
- Zuweisung von Custom Labels (`custom_label_0` bis `custom_label_4`) basierend auf Verkaufszahlen, Marge oder Kategorie.
|
||
- Preisanpassungen und Sale-Preis-Logik.
|
||
- Titeloptimierung (z. B. Hinzufügen von Marke oder Farbe).
|
||
- Filterung von Artikeln, die nicht auf Google Shopping erscheinen sollen (z. B. Bestand = 0).
|
||
- Anreicherung mit berechneten Feldern (z. B. `margin_percent`, `performance_tier`).
|
||
|
||
### Phase 3 — Load (Daten-Upload)
|
||
|
||
Die transformierten Produktdaten werden über die **Google Content API for Shopping** als Batch-Request an das Google Merchant Center übertragen. Der Upload-Prozess berücksichtigt dabei die API-Quotas und implementiert ein Rate-Limiting mit exponentiellem Backoff, um `429 Too Many Requests`-Fehler zu vermeiden. Fehlerhafte Einzelprodukte werden in einer separaten Fehler-Log-Datei protokolliert und blockieren nicht den gesamten Upload.
|
||
|
||
---
|
||
|
||
# 2. Voraussetzungen
|
||
|
||
## 2.1 Hardware-Anforderungen
|
||
|
||
Die folgenden Spezifikationen gelten für die zuverlässige Verarbeitung von **200.000+ Artikeln** mit jeweils **30+ Attributen**. Die Werte sind konservativ kalkuliert und berücksichtigen die parallele Ausführung von n8n (Docker) und der Python-Pipeline.
|
||
|
||
| Ressource | Minimum | Empfohlen | Begründung |
|
||
|-------------------|----------------------|------------------------|--------------------------------------------------------------|
|
||
| **CPU** | 4 Kerne (x86_64) | 8+ Kerne (x86_64) | Polars nutzt Multi-Threading automatisch aus |
|
||
| **RAM** | 8 GB | 16 GB | DuckDB + Polars halten Daten im Arbeitsspeicher |
|
||
| **Festplatte** | 40 GB SSD | 100 GB NVMe SSD | DuckDB-Dateien + CSV-Exporte + Logs + Docker-Images |
|
||
| **Netzwerk** | 100 Mbit/s | 1 Gbit/s | Google API Batch-Uploads erfordern stabile Bandbreite |
|
||
| **OS** | Debian 11 (Bullseye) | Debian 12 (Bookworm) | Aktuelle LTS-Basis mit langfristigem Sicherheits-Support |
|
||
|
||
### Speicherberechnung (Orientierungswerte)
|
||
|
||
Für eine Kalkulation mit 200.000 Zeilen × 30 Spalten (durchschnittlich ~100 Bytes pro Zelle):
|
||
|
||
| Komponente | Geschätzter Bedarf |
|
||
|--------------------------------------|-------------------------|
|
||
| Rohdaten CSV auf Disk | ca. 550 – 650 MB |
|
||
| DuckDB komprimiert (auf Disk) | ca. 150 – 250 MB |
|
||
| Polars DataFrame im RAM | ca. 800 MB – 1,2 GB |
|
||
| Peak Memory (JOIN + Transformation) | ca. 2,5 – 4 GB |
|
||
| n8n Docker Container | ca. 300 – 500 MB RAM |
|
||
| **Gesamt Peak-RAM** | **ca. 4 – 6 GB** |
|
||
|
||
> **Empfehlung:** Bei Datensätzen über 500.000 Artikeln sollte auf 32 GB RAM und Streaming-Verarbeitung (Polars Lazy-Mode) umgestellt werden.
|
||
|
||
## 2.2 Erforderliche Debian-Pakete
|
||
|
||
Die folgenden Pakete müssen auf dem Debian-Server installiert sein, bevor die Pipeline eingerichtet wird.
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# System-Updates durchführen
|
||
# ──────────────────────────────────────────────────────────
|
||
sudo apt update && sudo apt upgrade -y
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Grundlegende Build-Tools und System-Abhängigkeiten
|
||
# ──────────────────────────────────────────────────────────
|
||
sudo apt install -y \
|
||
build-essential \
|
||
python3.11 \
|
||
python3.11-venv \
|
||
python3.11-dev \
|
||
python3-pip \
|
||
curl \
|
||
wget \
|
||
git \
|
||
jq \
|
||
unzip \
|
||
ca-certificates \
|
||
gnupg \
|
||
lsb-release \
|
||
cron \
|
||
logrotate \
|
||
htop \
|
||
tree
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies sind Beispielbefehle. Falls Python 3.11 nicht in den Standard-Repositories Ihrer Debian-Version verfügbar ist, muss es über das `deadsnakes`-PPA, einen Quellcode-Build oder das Paket `python3` (in Debian 12 standardmäßig Python 3.11) installiert werden. Passen Sie die Versionsnummern an Ihre Umgebung an.
|
||
|
||
## 2.3 Software-Voraussetzungen
|
||
|
||
| Software | Mindestversion | Zweck |
|
||
|---------------------------------|----------------|---------------------------------------------|
|
||
| Python | 3.11+ | Hauptlaufzeit für ETL-Skripte |
|
||
| Docker Engine | 24.0+ | Container-Runtime für n8n |
|
||
| Docker Compose Plugin | 2.20+ | Multi-Container-Orchestrierung |
|
||
| n8n | 1.30+ | Workflow-Automatisierung und Scheduling |
|
||
| DuckDB (Python-Modul) | 0.10+ | Analytische In-Process-Datenbank |
|
||
| Polars (Python-Modul) | 0.20+ | Schnelle DataFrame-Transformationen |
|
||
| google-api-python-client | 2.100+ | Google API-Zugriff |
|
||
| google-auth / google-auth-oauthlib | 2.20+ | Authentifizierung für Google APIs |
|
||
| PyYAML | 6.0+ | Regel-Konfiguration im YAML-Format |
|
||
|
||
---
|
||
|
||
# 3. Installation & Setup
|
||
|
||
## 3.1 Docker und n8n einrichten
|
||
|
||
### 3.1.1 Docker Engine installieren
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# Offizielle Docker GPG-Schlüssel hinzufügen
|
||
# ──────────────────────────────────────────────────────────
|
||
sudo install -m 0755 -d /etc/apt/keyrings
|
||
|
||
curl -fsSL https://download.docker.com/linux/debian/gpg | \
|
||
sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
|
||
|
||
sudo chmod a+r /etc/apt/keyrings/docker.gpg
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Docker-Repository einrichten
|
||
# ──────────────────────────────────────────────────────────
|
||
echo \
|
||
"deb [arch=$(dpkg --print-architecture) \
|
||
signed-by=/etc/apt/keyrings/docker.gpg] \
|
||
https://download.docker.com/linux/debian \
|
||
$(. /etc/os-release && echo "$VERSION_CODENAME") stable" | \
|
||
sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Docker Engine und Compose Plugin installieren
|
||
# ──────────────────────────────────────────────────────────
|
||
sudo apt update
|
||
sudo apt install -y \
|
||
docker-ce \
|
||
docker-ce-cli \
|
||
containerd.io \
|
||
docker-buildx-plugin \
|
||
docker-compose-plugin
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Docker ohne sudo ermöglichen (Relogin erforderlich)
|
||
# ──────────────────────────────────────────────────────────
|
||
sudo usermod -aG docker $USER
|
||
newgrp docker
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Installation verifizieren
|
||
# ──────────────────────────────────────────────────────────
|
||
docker --version
|
||
docker compose version
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies sind Beispielbefehle basierend auf der offiziellen Docker-Dokumentation für Debian. Prüfen Sie stets die aktuelle Installationsanleitung unter [docs.docker.com](https://docs.docker.com/engine/install/debian/).
|
||
|
||
### 3.1.2 n8n per Docker Compose bereitstellen
|
||
|
||
Erstellen Sie das Verzeichnis und die Konfiguration:
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# Verzeichnisstruktur für n8n anlegen
|
||
# ──────────────────────────────────────────────────────────
|
||
sudo mkdir -p /opt/n8n/data
|
||
sudo chown -R $USER:$USER /opt/n8n
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispielbefehl. Passen Sie den Benutzer und die Berechtigungen an Ihre Server-Konfiguration an.
|
||
|
||
Erstellen Sie die Datei `/opt/n8n/.env` mit den Zugangsdaten:
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# /opt/n8n/.env — Umgebungsvariablen für n8n
|
||
# ──────────────────────────────────────────────────────────
|
||
N8N_BASIC_AUTH_USER=admin
|
||
N8N_BASIC_AUTH_PASSWORD=IhrSicheresPasswortHier123!
|
||
N8N_ENCRYPTION_KEY=ein-langer-zufaelliger-encryption-key-hier
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfiguration. Ersetzen Sie alle Platzhalter-Werte durch Ihre eigenen sicheren Passwörter und Schlüssel. Verwenden Sie `openssl rand -hex 32` zur Generierung sicherer Schlüssel.
|
||
|
||
Erstellen Sie die Datei `/opt/n8n/docker-compose.yml`:
|
||
|
||
```yaml
|
||
# ──────────────────────────────────────────────────────────
|
||
# /opt/n8n/docker-compose.yml — n8n Workflow-Engine
|
||
# ──────────────────────────────────────────────────────────
|
||
version: "3.8"
|
||
|
||
services:
|
||
n8n:
|
||
image: n8nio/n8n:latest
|
||
container_name: n8n
|
||
restart: unless-stopped
|
||
ports:
|
||
- "5678:5678"
|
||
environment:
|
||
- N8N_BASIC_AUTH_ACTIVE=true
|
||
- N8N_BASIC_AUTH_USER=${N8N_BASIC_AUTH_USER}
|
||
- N8N_BASIC_AUTH_PASSWORD=${N8N_BASIC_AUTH_PASSWORD}
|
||
- N8N_ENCRYPTION_KEY=${N8N_ENCRYPTION_KEY}
|
||
- GENERIC_TIMEZONE=Europe/Berlin
|
||
- TZ=Europe/Berlin
|
||
- N8N_LOG_LEVEL=info
|
||
- N8N_LOG_OUTPUT=console,file
|
||
- N8N_LOG_FILE_LOCATION=/home/node/.n8n/logs/n8n.log
|
||
- N8N_DIAGNOSTICS_ENABLED=false
|
||
- N8N_HIRING_BANNER_ENABLED=false
|
||
volumes:
|
||
# n8n-Daten persistent speichern
|
||
- /opt/n8n/data:/home/node/.n8n
|
||
# Zugriff auf ETL-Pipeline-Verzeichnis (Read-Only für Skripte)
|
||
- /opt/etl-pipeline:/opt/etl-pipeline:ro
|
||
# Zugriff auf ETL-Logs (Read-Write für Monitoring)
|
||
- /opt/etl-pipeline/logs:/opt/etl-pipeline/logs:rw
|
||
healthcheck:
|
||
test: ["CMD-SHELL", "wget -qO- http://localhost:5678/healthz || exit 1"]
|
||
interval: 30s
|
||
timeout: 10s
|
||
retries: 3
|
||
start_period: 30s
|
||
deploy:
|
||
resources:
|
||
limits:
|
||
memory: 1G
|
||
cpus: "2.0"
|
||
reservations:
|
||
memory: 256M
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfiguration. Die Volume-Pfade, Ressourcen-Limits und Ports müssen an Ihre Server-Umgebung angepasst werden.
|
||
|
||
n8n starten und Status prüfen:
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# n8n Container starten
|
||
# ──────────────────────────────────────────────────────────
|
||
cd /opt/n8n
|
||
docker compose up -d
|
||
|
||
# Status prüfen
|
||
docker compose ps
|
||
docker compose logs -f --tail=50
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispielbefehle. Nach dem Start ist n8n unter `http://<Server-IP>:5678` erreichbar.
|
||
|
||
## 3.2 Python-Umgebung einrichten
|
||
|
||
### 3.2.1 Virtual Environment erstellen
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# Projektverzeichnis anlegen
|
||
# ──────────────────────────────────────────────────────────
|
||
sudo mkdir -p /opt/etl-pipeline
|
||
sudo chown -R $USER:$USER /opt/etl-pipeline
|
||
cd /opt/etl-pipeline
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Python Virtual Environment erstellen und aktivieren
|
||
# ──────────────────────────────────────────────────────────
|
||
python3.11 -m venv venv
|
||
source venv/bin/activate
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# pip aktualisieren
|
||
# ──────────────────────────────────────────────────────────
|
||
pip install --upgrade pip setuptools wheel
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies sind Beispielbefehle. Stellen Sie sicher, dass `python3.11` auf Ihrem System korrekt installiert ist (siehe Abschnitt 2.2).
|
||
|
||
### 3.2.2 Abhängigkeiten installieren
|
||
|
||
Erstellen Sie die Datei `/opt/etl-pipeline/requirements.txt`:
|
||
|
||
```txt
|
||
# ──────────────────────────────────────────────────────────
|
||
# /opt/etl-pipeline/requirements.txt
|
||
# ETL-Pipeline Abhängigkeiten
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
# DataFrame-Engine für schnelle Transformationen
|
||
polars>=0.20.0,<2.0.0
|
||
|
||
# Analytische In-Process SQL-Datenbank
|
||
duckdb>=0.10.0,<2.0.0
|
||
|
||
# Google API Client Libraries
|
||
google-api-python-client>=2.100.0
|
||
google-auth>=2.20.0
|
||
google-auth-oauthlib>=1.2.0
|
||
google-auth-httplib2>=0.2.0
|
||
|
||
# Konfiguration
|
||
PyYAML>=6.0
|
||
|
||
# Utilities
|
||
requests>=2.31.0
|
||
python-dateutil>=2.8.0
|
||
tenacity>=8.2.0
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist eine Beispiel-Datei. Prüfen Sie die aktuellen Versionen der Pakete und passen Sie die Versionsbeschränkungen an Ihre Kompatibilitätsanforderungen an.
|
||
|
||
Abhängigkeiten installieren:
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# Abhängigkeiten in das Virtual Environment installieren
|
||
# ──────────────────────────────────────────────────────────
|
||
cd /opt/etl-pipeline
|
||
source venv/bin/activate
|
||
pip install -r requirements.txt
|
||
|
||
# Installation verifizieren
|
||
python -c "import polars; print(f'Polars: {polars.__version__}')"
|
||
python -c "import duckdb; print(f'DuckDB: {duckdb.__version__}')"
|
||
python -c "import googleapiclient; print('Google API Client: OK')"
|
||
python -c "import yaml; print(f'PyYAML: {yaml.__version__}')"
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispielbefehle zur Verifikation. Die Ausgaben variieren je nach installierten Versionen.
|
||
|
||
---
|
||
|
||
# 4. Projektstruktur
|
||
|
||
## 4.1 Empfohlene Verzeichnisstruktur
|
||
|
||
Die folgende Struktur organisiert alle Komponenten der Pipeline in logisch getrennte Verzeichnisse. Diese Trennung erleichtert Wartung, Backups und Zugriffssteuerung.
|
||
|
||
```
|
||
/opt/etl-pipeline/
|
||
│
|
||
├── venv/ # Python Virtual Environment
|
||
│ └── ... # (von venv automatisch verwaltet)
|
||
│
|
||
├── requirements.txt # Python-Abhängigkeiten
|
||
│
|
||
├── scripts/ # Alle Python-Skripte
|
||
│ ├── __init__.py
|
||
│ ├── main.py # Hauptskript (Entry Point)
|
||
│ ├── extract.py # Modul: Datenextraktion (CSV + Google API)
|
||
│ ├── transform.py # Modul: DuckDB JOIN + Polars Transformationen
|
||
│ ├── load.py # Modul: Google Content API Upload
|
||
│ ├── rules_engine.py # Modul: YAML-Regel-Engine
|
||
│ └── utils.py # Hilfsfunktionen (Logging, Config-Laden)
|
||
│
|
||
├── config/ # Konfigurationsdateien
|
||
│ ├── rules.yaml # Channable-ähnliche Transformationsregeln
|
||
│ ├── settings.yaml # Allgemeine Pipeline-Einstellungen
|
||
│ └── column_mapping.yaml # Spalten-Mapping JTL → Google Shopping
|
||
│
|
||
├── credentials/ # Sensible Zugangsdaten (restriktive Rechte!)
|
||
│ ├── google_service_account.json # Google Service Account Key
|
||
│ └── .gitignore # NIEMALS in Git committen!
|
||
│
|
||
├── data/ # Datendateien
|
||
│ ├── input/ # Eingangsdaten
|
||
│ │ └── jtl_export.csv # Aktueller JTL-WAWI CSV-Export
|
||
│ ├── output/ # Verarbeitete Ausgabedaten
|
||
│ │ └── google_feed_YYYYMMDD.csv
|
||
│ ├── archive/ # Archivierte ältere Exporte
|
||
│ │ └── jtl_export_20260301.csv
|
||
│ └── db/ # DuckDB-Datenbankdateien
|
||
│ └── products.duckdb # Persistente DuckDB-Datenbank
|
||
│
|
||
├── logs/ # Log-Dateien
|
||
│ ├── etl_pipeline.log # Haupt-Log (rotating)
|
||
│ ├── etl_pipeline.log.1 # Rotiertes Log
|
||
│ ├── error.log # Nur Fehler-Einträge
|
||
│ └── upload_errors/ # Detaillierte Upload-Fehler pro Lauf
|
||
│ └── errors_20260306.json
|
||
│
|
||
├── tests/ # Unit- und Integrationstests
|
||
│ ├── __init__.py
|
||
│ ├── test_extract.py
|
||
│ ├── test_transform.py
|
||
│ └── test_rules_engine.py
|
||
│
|
||
└── docs/ # Zusätzliche Projektdokumentation
|
||
└── CHANGELOG.md
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist eine Beispiel-Struktur. Passen Sie die Verzeichnisstruktur an die spezifischen Anforderungen Ihres Projekts und Ihrer Organisation an.
|
||
|
||
## 4.2 Verzeichnisstruktur anlegen
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# Vollständige Verzeichnisstruktur anlegen
|
||
# ──────────────────────────────────────────────────────────
|
||
cd /opt/etl-pipeline
|
||
|
||
mkdir -p scripts
|
||
mkdir -p config
|
||
mkdir -p credentials
|
||
mkdir -p data/{input,output,archive,db}
|
||
mkdir -p logs/upload_errors
|
||
mkdir -p tests
|
||
mkdir -p docs
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Berechtigungen für sensible Verzeichnisse setzen
|
||
# ──────────────────────────────────────────────────────────
|
||
chmod 700 credentials/
|
||
chmod 750 config/
|
||
chmod 755 logs/
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# .gitignore für credentials anlegen
|
||
# ──────────────────────────────────────────────────────────
|
||
echo "*" > credentials/.gitignore
|
||
echo "!.gitignore" >> credentials/.gitignore
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Python-Modul-Initialisierung
|
||
# ──────────────────────────────────────────────────────────
|
||
touch scripts/__init__.py
|
||
touch tests/__init__.py
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispielbefehle. Stellen Sie sicher, dass der Benutzer, unter dem die Pipeline läuft, die entsprechenden Lese- und Schreibrechte auf allen Verzeichnissen besitzt.
|
||
|
||
---
|
||
|
||
# 5. Konfiguration
|
||
|
||
## 5.1 Google API Credentials sicher speichern
|
||
|
||
### 5.1.1 Service Account erstellen
|
||
|
||
Für die Kommunikation mit der Google Content API for Shopping wird ein **Service Account** empfohlen. Dieser ermöglicht eine automatisierte Authentifizierung ohne manuellen OAuth2-Flow.
|
||
|
||
**Schritte in der Google Cloud Console:**
|
||
|
||
1. Navigieren Sie zur [Google Cloud Console](https://console.cloud.google.com/).
|
||
2. Erstellen Sie ein neues Projekt oder wählen Sie ein bestehendes aus.
|
||
3. Aktivieren Sie die folgenden APIs:
|
||
- *Content API for Shopping*
|
||
- *Google Ads API* (falls Verkaufsdaten direkt aus Google Ads bezogen werden)
|
||
- *Google Analytics Data API* (falls GA4-Daten verwendet werden)
|
||
4. Navigieren Sie zu **IAM & Verwaltung → Dienstkonten**.
|
||
5. Erstellen Sie einen neuen Service Account mit einem aussagekräftigen Namen.
|
||
6. Laden Sie den JSON-Key herunter.
|
||
7. Fügen Sie den Service Account als Nutzer in Ihrem **Google Merchant Center** hinzu (mit der Rolle „Standard" oder „Admin").
|
||
|
||
### 5.1.2 Credentials sicher ablegen
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# Service Account JSON-Key sicher auf dem Server ablegen
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
# Datei in das credentials-Verzeichnis kopieren
|
||
cp /pfad/zur/heruntergeladenen/datei.json \
|
||
/opt/etl-pipeline/credentials/google_service_account.json
|
||
|
||
# Restriktive Berechtigungen setzen (nur Owner lesen)
|
||
chmod 600 /opt/etl-pipeline/credentials/google_service_account.json
|
||
|
||
# Eigentümerschaft prüfen
|
||
ls -la /opt/etl-pipeline/credentials/
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispielbefehle. Ersetzen Sie den Pfad durch den tatsächlichen Speicherort Ihrer heruntergeladenen JSON-Datei.
|
||
|
||
### 5.1.3 Credentials als Umgebungsvariable referenzieren (empfohlen)
|
||
|
||
Es ist empfehlenswert, den Pfad zur Credentials-Datei nicht hart im Skript zu kodieren, sondern über eine Umgebungsvariable bereitzustellen.
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# In /opt/etl-pipeline/.env oder ~/.bashrc hinzufügen:
|
||
# ──────────────────────────────────────────────────────────
|
||
export GOOGLE_APPLICATION_CREDENTIALS="/opt/etl-pipeline/credentials/google_service_account.json"
|
||
export MERCHANT_CENTER_ID="123456789"
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispielkonfiguration. Ersetzen Sie die `MERCHANT_CENTER_ID` durch Ihre tatsächliche Merchant-Center-ID.
|
||
|
||
### 5.1.4 Sicherheitshinweise
|
||
|
||
- Die JSON-Datei darf **niemals** in ein Git-Repository committed werden.
|
||
- Der Zugriff auf das `credentials/`-Verzeichnis sollte auf den Pipeline-Benutzer beschränkt sein (`chmod 700`).
|
||
- Rotieren Sie Service Account Keys regelmäßig (mindestens alle 90 Tage).
|
||
- Verwenden Sie nach Möglichkeit **Workload Identity Federation** anstelle von heruntergeladenen JSON-Keys.
|
||
- Protokollieren Sie niemals den Inhalt der Credentials-Datei in Log-Dateien.
|
||
|
||
## 5.2 Regel-Konfiguration (rules.yaml)
|
||
|
||
Die Datei `rules.yaml` ist das Herzstück der Channable-ähnlichen Funktionalität. Sie definiert regelbasierte Transformationen, die auf jeden Artikel angewandt werden. Die Regeln werden in der definierten Reihenfolge sequenziell abgearbeitet.
|
||
|
||
### 5.2.1 Struktur und Syntax
|
||
|
||
Erstellen Sie die Datei `/opt/etl-pipeline/config/rules.yaml`:
|
||
|
||
```yaml
|
||
# ══════════════════════════════════════════════════════════
|
||
# /opt/etl-pipeline/config/rules.yaml
|
||
#
|
||
# Channable-ähnliche Regel-Engine für Google Shopping
|
||
# Regeln werden sequenziell von oben nach unten abgearbeitet.
|
||
#
|
||
# Unterstützte Operatoren:
|
||
# Vergleich: >, <, >=, <=, ==, !=
|
||
# Text: contains, not_contains, starts_with, ends_with
|
||
# Logik: and, or (innerhalb von conditions)
|
||
# Existenz: is_empty, is_not_empty
|
||
#
|
||
# Unterstützte Aktionen:
|
||
# set_value: Festes Wert setzen
|
||
# copy_field: Wert aus anderem Feld kopieren
|
||
# prepend: Text am Anfang hinzufügen
|
||
# append: Text am Ende hinzufügen
|
||
# replace: Text ersetzen
|
||
# calculate: Berechnung durchführen
|
||
# exclude: Artikel vom Feed ausschließen
|
||
# map_category: Kategorie-Mapping anwenden
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Globale Einstellungen
|
||
# ──────────────────────────────────────────────────────────
|
||
settings:
|
||
feed_name: "Google Shopping DE"
|
||
target_country: "DE"
|
||
content_language: "de"
|
||
currency: "EUR"
|
||
# Artikel mit Bestand 0 automatisch ausschließen
|
||
exclude_out_of_stock: true
|
||
# Mindestpreis für den Feed (in EUR)
|
||
minimum_price: 1.00
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Regel-Definitionen
|
||
# ──────────────────────────────────────────────────────────
|
||
rules:
|
||
|
||
# ────────────────────────────────────────────────────────
|
||
# REGEL 1: Bestseller-Label basierend auf Verkaufszahlen
|
||
# ────────────────────────────────────────────────────────
|
||
- name: "Bestseller-Klassifizierung"
|
||
description: "Artikel mit mehr als 100 Verkäufen in 90 Tagen als Bestseller markieren"
|
||
enabled: true
|
||
priority: 10
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "sales_quantity_90d"
|
||
operator: ">"
|
||
value: 100
|
||
- field: "stock_quantity"
|
||
operator: ">"
|
||
value: 0
|
||
actions:
|
||
- target_field: "custom_label_0"
|
||
action: "set_value"
|
||
value: "Bestseller"
|
||
|
||
# ────────────────────────────────────────────────────────
|
||
# REGEL 2: Slow Mover identifizieren
|
||
# ────────────────────────────────────────────────────────
|
||
- name: "Slow-Mover-Klassifizierung"
|
||
description: "Artikel mit weniger als 5 Verkäufen in 90 Tagen als Slow Mover markieren"
|
||
enabled: true
|
||
priority: 20
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "sales_quantity_90d"
|
||
operator: "<"
|
||
value: 5
|
||
- field: "days_in_catalog"
|
||
operator: ">"
|
||
value: 30
|
||
actions:
|
||
- target_field: "custom_label_0"
|
||
action: "set_value"
|
||
value: "Slow Mover"
|
||
|
||
# ────────────────────────────────────────────────────────
|
||
# REGEL 3: Performance-Tier (custom_label_1)
|
||
# ────────────────────────────────────────────────────────
|
||
- name: "Performance-Tier High"
|
||
description: "Umsatzbasierte Tier-Einteilung für Bidding-Strategien"
|
||
enabled: true
|
||
priority: 30
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "revenue_90d"
|
||
operator: ">="
|
||
value: 1000
|
||
actions:
|
||
- target_field: "custom_label_1"
|
||
action: "set_value"
|
||
value: "Tier-1-High-Revenue"
|
||
|
||
- name: "Performance-Tier Mittel"
|
||
enabled: true
|
||
priority: 31
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "revenue_90d"
|
||
operator: ">="
|
||
value: 200
|
||
- field: "revenue_90d"
|
||
operator: "<"
|
||
value: 1000
|
||
actions:
|
||
- target_field: "custom_label_1"
|
||
action: "set_value"
|
||
value: "Tier-2-Mid-Revenue"
|
||
|
||
- name: "Performance-Tier Niedrig"
|
||
enabled: true
|
||
priority: 32
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "revenue_90d"
|
||
operator: "<"
|
||
value: 200
|
||
actions:
|
||
- target_field: "custom_label_1"
|
||
action: "set_value"
|
||
value: "Tier-3-Low-Revenue"
|
||
|
||
# ────────────────────────────────────────────────────────
|
||
# REGEL 4: Margen-Klassifizierung (custom_label_2)
|
||
# ────────────────────────────────────────────────────────
|
||
- name: "Hohe Marge"
|
||
enabled: true
|
||
priority: 40
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "margin_percent"
|
||
operator: ">="
|
||
value: 40
|
||
actions:
|
||
- target_field: "custom_label_2"
|
||
action: "set_value"
|
||
value: "High-Margin"
|
||
|
||
- name: "Niedrige Marge"
|
||
enabled: true
|
||
priority: 41
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "margin_percent"
|
||
operator: "<"
|
||
value: 15
|
||
actions:
|
||
- target_field: "custom_label_2"
|
||
action: "set_value"
|
||
value: "Low-Margin"
|
||
|
||
# ────────────────────────────────────────────────────────
|
||
# REGEL 5: Preiskategorie (custom_label_3)
|
||
# ────────────────────────────────────────────────────────
|
||
- name: "Premium-Preis"
|
||
enabled: true
|
||
priority: 50
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "price"
|
||
operator: ">="
|
||
value: 100
|
||
actions:
|
||
- target_field: "custom_label_3"
|
||
action: "set_value"
|
||
value: "Premium"
|
||
|
||
- name: "Budget-Preis"
|
||
enabled: true
|
||
priority: 51
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "price"
|
||
operator: "<"
|
||
value: 20
|
||
actions:
|
||
- target_field: "custom_label_3"
|
||
action: "set_value"
|
||
value: "Budget"
|
||
|
||
# ────────────────────────────────────────────────────────
|
||
# REGEL 6: Titel-Optimierung
|
||
# ────────────────────────────────────────────────────────
|
||
- name: "Marke im Titel voranstellen"
|
||
description: "Markenname am Anfang des Titels hinzufügen, falls nicht vorhanden"
|
||
enabled: true
|
||
priority: 60
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "brand"
|
||
operator: "is_not_empty"
|
||
- field: "title"
|
||
operator: "not_contains"
|
||
value_from_field: "brand"
|
||
actions:
|
||
- target_field: "title"
|
||
action: "prepend"
|
||
value_template: "{brand} - "
|
||
|
||
# ────────────────────────────────────────────────────────
|
||
# REGEL 7: Sale-Preis-Logik
|
||
# ────────────────────────────────────────────────────────
|
||
- name: "Sale-Preis setzen"
|
||
description: "Wenn ein UVP vorhanden und höher als der VK-Preis ist, als Sale kennzeichnen"
|
||
enabled: true
|
||
priority: 70
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "uvp"
|
||
operator: "is_not_empty"
|
||
- field: "uvp"
|
||
operator: ">"
|
||
value_from_field: "price"
|
||
actions:
|
||
- target_field: "sale_price"
|
||
action: "copy_field"
|
||
source_field: "price"
|
||
- target_field: "price"
|
||
action: "copy_field"
|
||
source_field: "uvp"
|
||
|
||
# ────────────────────────────────────────────────────────
|
||
# REGEL 8: Ausschluss-Regeln
|
||
# ────────────────────────────────────────────────────────
|
||
- name: "Artikel ohne Bild ausschließen"
|
||
enabled: true
|
||
priority: 80
|
||
conditions:
|
||
logic: "or"
|
||
rules:
|
||
- field: "image_url"
|
||
operator: "is_empty"
|
||
- field: "image_url"
|
||
operator: "=="
|
||
value: ""
|
||
actions:
|
||
- action: "exclude"
|
||
reason: "Kein Produktbild vorhanden"
|
||
|
||
- name: "Testprodukte ausschließen"
|
||
enabled: true
|
||
priority: 81
|
||
conditions:
|
||
logic: "or"
|
||
rules:
|
||
- field: "title"
|
||
operator: "contains"
|
||
value: "TEST"
|
||
- field: "sku"
|
||
operator: "starts_with"
|
||
value: "ZZ-"
|
||
actions:
|
||
- action: "exclude"
|
||
reason: "Testprodukt erkannt"
|
||
|
||
# ────────────────────────────────────────────────────────
|
||
# REGEL 9: Versandkosten basierend auf Preis
|
||
# ────────────────────────────────────────────────────────
|
||
- name: "Kostenloser Versand ab 49 EUR"
|
||
enabled: true
|
||
priority: 90
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "price"
|
||
operator: ">="
|
||
value: 49
|
||
actions:
|
||
- target_field: "shipping_cost"
|
||
action: "set_value"
|
||
value: "0.00"
|
||
|
||
- name: "Standard-Versandkosten"
|
||
enabled: true
|
||
priority: 91
|
||
conditions:
|
||
logic: "and"
|
||
rules:
|
||
- field: "price"
|
||
operator: "<"
|
||
value: 49
|
||
actions:
|
||
- target_field: "shipping_cost"
|
||
action: "set_value"
|
||
value: "4.95"
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist eine Beispiel-Konfigurationsdatei. Alle Schwellenwerte, Feldnamen und Regeln müssen an Ihre tatsächlichen Geschäftsanforderungen, JTL-Exportfelder und Google-Shopping-Attribute angepasst werden.
|
||
|
||
### 5.2.2 Spalten-Mapping (column_mapping.yaml)
|
||
|
||
Erstellen Sie zusätzlich die Datei `/opt/etl-pipeline/config/column_mapping.yaml`:
|
||
|
||
```yaml
|
||
# ══════════════════════════════════════════════════════════
|
||
# /opt/etl-pipeline/config/column_mapping.yaml
|
||
#
|
||
# Mapping: JTL-WAWI Exportfeld → Google Shopping Attribut
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
mapping:
|
||
# Pflichtfelder Google Shopping
|
||
Artikelnummer: "offerId"
|
||
Artikelname: "title"
|
||
Beschreibung: "description"
|
||
URL: "link"
|
||
BildURL: "imageLink"
|
||
VK_Brutto: "price"
|
||
Waehrung: "priceCurrency"
|
||
Verfuegbarkeit: "availability"
|
||
Marke: "brand"
|
||
GTIN: "gtin"
|
||
MPN: "mpn"
|
||
Zustand: "condition"
|
||
Kategorie_Google: "googleProductCategory"
|
||
|
||
# Optionale Felder
|
||
Farbe: "color"
|
||
Groesse: "size"
|
||
Material: "material"
|
||
Geschlecht: "gender"
|
||
Altersgruppe: "ageGroup"
|
||
EK_Netto: "cost_of_goods_sold"
|
||
UVP: "uvp"
|
||
Lagerbestand: "stock_quantity"
|
||
|
||
# Berechnete Felder (werden durch Pipeline erzeugt)
|
||
# sales_quantity_90d → aus Google API
|
||
# revenue_90d → aus Google API
|
||
# margin_percent → berechnet aus VK und EK
|
||
# custom_label_0-4 → aus rules.yaml
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist eine Beispiel-Zuordnung. Die tatsächlichen Spaltennamen in Ihrem JTL-WAWI-Export können abweichen. Prüfen Sie die Header-Zeile Ihrer CSV-Datei und passen Sie das Mapping entsprechend an.
|
||
|
||
---
|
||
|
||
# 6. Core Script Logic (Beispiel)
|
||
|
||
Dieses Kapitel zeigt den konzeptionellen Aufbau der Python-Skripte. Der Code ist als **funktionales Boilerplate** zu verstehen, das als Grundlage für die eigene Implementierung dient.
|
||
|
||
## 6.1 Hilfsfunktionen (utils.py)
|
||
|
||
```python
|
||
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
/opt/etl-pipeline/scripts/utils.py
|
||
|
||
Hilfsfunktionen für Logging, Konfiguration und gemeinsam genutzte Utilities.
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import yaml
|
||
import logging
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
from logging.handlers import RotatingFileHandler
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Pfad-Konstanten
|
||
# ──────────────────────────────────────────────────────────
|
||
BASE_DIR = Path("/opt/etl-pipeline")
|
||
CONFIG_DIR = BASE_DIR / "config"
|
||
DATA_DIR = BASE_DIR / "data"
|
||
LOG_DIR = BASE_DIR / "logs"
|
||
CREDENTIALS_DIR = BASE_DIR / "credentials"
|
||
|
||
|
||
def setup_logging(
|
||
log_name: str = "etl_pipeline",
|
||
log_level: int = logging.INFO,
|
||
max_bytes: int = 10 * 1024 * 1024, # 10 MB
|
||
backup_count: int = 5,
|
||
) -> logging.Logger:
|
||
"""
|
||
Konfiguriert das Logging mit Rotation und separatem Error-Log.
|
||
|
||
Args:
|
||
log_name: Name des Loggers und der Log-Datei.
|
||
log_level: Logging-Level (default: INFO).
|
||
max_bytes: Maximale Größe einer Log-Datei vor Rotation.
|
||
backup_count: Anzahl der aufzubewahrenden rotierten Log-Dateien.
|
||
|
||
Returns:
|
||
Konfigurierter Logger.
|
||
"""
|
||
LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||
|
||
logger = logging.getLogger(log_name)
|
||
logger.setLevel(log_level)
|
||
|
||
# Verhindere doppelte Handler bei mehrfachem Aufruf
|
||
if logger.handlers:
|
||
return logger
|
||
|
||
# Formatter
|
||
formatter = logging.Formatter(
|
||
fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(funcName)s:%(lineno)d | %(message)s",
|
||
datefmt="%Y-%m-%d %H:%M:%S",
|
||
)
|
||
|
||
# Haupt-Log (Rotation)
|
||
main_handler = RotatingFileHandler(
|
||
LOG_DIR / f"{log_name}.log",
|
||
maxBytes=max_bytes,
|
||
backupCount=backup_count,
|
||
encoding="utf-8",
|
||
)
|
||
main_handler.setLevel(log_level)
|
||
main_handler.setFormatter(formatter)
|
||
|
||
# Error-Log (nur ERROR und höher)
|
||
error_handler = RotatingFileHandler(
|
||
LOG_DIR / "error.log",
|
||
maxBytes=max_bytes,
|
||
backupCount=backup_count,
|
||
encoding="utf-8",
|
||
)
|
||
error_handler.setLevel(logging.ERROR)
|
||
error_handler.setFormatter(formatter)
|
||
|
||
# Console-Handler
|
||
console_handler = logging.StreamHandler(sys.stdout)
|
||
console_handler.setLevel(log_level)
|
||
console_handler.setFormatter(formatter)
|
||
|
||
logger.addHandler(main_handler)
|
||
logger.addHandler(error_handler)
|
||
logger.addHandler(console_handler)
|
||
|
||
return logger
|
||
|
||
|
||
def load_yaml_config(config_filename: str) -> dict:
|
||
"""
|
||
Lädt eine YAML-Konfigurationsdatei aus dem config-Verzeichnis.
|
||
|
||
Args:
|
||
config_filename: Name der YAML-Datei (z. B. 'rules.yaml').
|
||
|
||
Returns:
|
||
Dictionary mit dem Inhalt der YAML-Datei.
|
||
|
||
Raises:
|
||
FileNotFoundError: Wenn die Datei nicht existiert.
|
||
yaml.YAMLError: Wenn die YAML-Syntax ungültig ist.
|
||
"""
|
||
config_path = CONFIG_DIR / config_filename
|
||
if not config_path.exists():
|
||
raise FileNotFoundError(
|
||
f"Konfigurationsdatei nicht gefunden: {config_path}"
|
||
)
|
||
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
config = yaml.safe_load(f)
|
||
|
||
return config
|
||
|
||
|
||
def get_timestamp() -> str:
|
||
"""Gibt einen formatierten Zeitstempel zurück (YYYYMMDD_HHMMSS)."""
|
||
return datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
|
||
|
||
def get_date_stamp() -> str:
|
||
"""Gibt einen formatierten Datumsstempel zurück (YYYYMMDD)."""
|
||
return datetime.now().strftime("%Y%m%d")
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist ein Beispiel-Skript. Alle Pfade, Log-Konfigurationen und Funktionen müssen an Ihre tatsächliche Serverumgebung und Anforderungen angepasst werden.
|
||
|
||
## 6.2 Extract-Modul (extract.py)
|
||
|
||
```python
|
||
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
/opt/etl-pipeline/scripts/extract.py
|
||
|
||
Modul für die Datenextraktion aus JTL-WAWI CSV und Google APIs.
|
||
Beide Datenquellen werden als Tabellen in DuckDB importiert.
|
||
"""
|
||
|
||
import os
|
||
import shutil
|
||
import duckdb
|
||
import polars as pl
|
||
from pathlib import Path
|
||
from datetime import datetime, timedelta
|
||
|
||
from google.oauth2 import service_account
|
||
from googleapiclient.discovery import build
|
||
|
||
from scripts.utils import (
|
||
setup_logging,
|
||
BASE_DIR,
|
||
DATA_DIR,
|
||
CREDENTIALS_DIR,
|
||
get_date_stamp,
|
||
)
|
||
|
||
logger = setup_logging("extract")
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════
|
||
# JTL-WAWI CSV-Import
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
def load_jtl_csv_to_duckdb(
|
||
con: duckdb.DuckDBPyConnection,
|
||
csv_path: str | Path | None = None,
|
||
table_name: str = "products_raw",
|
||
) -> int:
|
||
"""
|
||
Liest die JTL-WAWI CSV-Exportdatei und importiert sie als Tabelle
|
||
in die DuckDB-Datenbank.
|
||
|
||
Args:
|
||
con: Aktive DuckDB-Verbindung.
|
||
csv_path: Pfad zur CSV-Datei. Falls None, wird der Standardpfad verwendet.
|
||
table_name: Name der Zieltabelle in DuckDB.
|
||
|
||
Returns:
|
||
Anzahl der importierten Zeilen.
|
||
"""
|
||
if csv_path is None:
|
||
csv_path = DATA_DIR / "input" / "jtl_export.csv"
|
||
|
||
csv_path = Path(csv_path)
|
||
if not csv_path.exists():
|
||
raise FileNotFoundError(f"JTL-CSV nicht gefunden: {csv_path}")
|
||
|
||
file_size_mb = csv_path.stat().st_size / (1024 * 1024)
|
||
logger.info(f"Lade JTL-CSV: {csv_path} ({file_size_mb:.1f} MB)")
|
||
|
||
# Bestehende Tabelle löschen und neu erstellen
|
||
con.execute(f"DROP TABLE IF EXISTS {table_name}")
|
||
|
||
# CSV mit DuckDB's optimiertem CSV-Reader importieren
|
||
# auto_detect=true erkennt Trennzeichen, Encoding und Datentypen
|
||
con.execute(f"""
|
||
CREATE TABLE {table_name} AS
|
||
SELECT *
|
||
FROM read_csv_auto(
|
||
'{csv_path}',
|
||
header = true,
|
||
delim = ';',
|
||
quote = '"',
|
||
escape = '"',
|
||
encoding = 'utf-8',
|
||
sample_size = 10000,
|
||
ignore_errors = false
|
||
)
|
||
""")
|
||
|
||
row_count = con.execute(
|
||
f"SELECT COUNT(*) FROM {table_name}"
|
||
).fetchone()[0]
|
||
|
||
logger.info(
|
||
f"JTL-CSV erfolgreich geladen: "
|
||
f"{row_count:,} Zeilen in Tabelle '{table_name}'"
|
||
)
|
||
|
||
# Archivierung des Imports
|
||
archive_path = DATA_DIR / "archive" / f"jtl_export_{get_date_stamp()}.csv"
|
||
if not archive_path.exists():
|
||
shutil.copy2(csv_path, archive_path)
|
||
logger.info(f"CSV archiviert: {archive_path}")
|
||
|
||
return row_count
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════
|
||
# Google Sales-Daten abrufen
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
def fetch_google_sales_data(
|
||
con: duckdb.DuckDBPyConnection,
|
||
table_name: str = "sales_90d",
|
||
lookback_days: int = 90,
|
||
) -> int:
|
||
"""
|
||
Ruft Verkaufsdaten der letzten N Tage über die Google API ab
|
||
und importiert sie als Tabelle in DuckDB.
|
||
|
||
Args:
|
||
con: Aktive DuckDB-Verbindung.
|
||
table_name: Name der Zieltabelle in DuckDB.
|
||
lookback_days: Anzahl der zurückliegenden Tage (Standard: 90).
|
||
|
||
Returns:
|
||
Anzahl der importierten Zeilen.
|
||
"""
|
||
logger.info(
|
||
f"Rufe Google Sales-Daten ab (letzte {lookback_days} Tage)..."
|
||
)
|
||
|
||
# ── Google API Authentifizierung ──────────────────────
|
||
credentials_path = os.environ.get(
|
||
"GOOGLE_APPLICATION_CREDENTIALS",
|
||
str(CREDENTIALS_DIR / "google_service_account.json"),
|
||
)
|
||
merchant_id = os.environ.get("MERCHANT_CENTER_ID")
|
||
|
||
if not merchant_id:
|
||
raise ValueError(
|
||
"Umgebungsvariable MERCHANT_CENTER_ID ist nicht gesetzt"
|
||
)
|
||
|
||
credentials = service_account.Credentials.from_service_account_file(
|
||
credentials_path,
|
||
scopes=["https://www.googleapis.com/auth/content"],
|
||
)
|
||
|
||
service = build("content", "v2.1", credentials=credentials)
|
||
|
||
# ── Zeitraum berechnen ────────────────────────────────
|
||
end_date = datetime.now()
|
||
start_date = end_date - timedelta(days=lookback_days)
|
||
|
||
# ── Performance-Daten aus dem Merchant Center abrufen ─
|
||
# HINWEIS: Der tatsächliche API-Aufruf hängt von der
|
||
# verwendeten Google API ab (Merchant Center Reports,
|
||
# Google Ads API, GA4 Data API, etc.)
|
||
# Hier wird ein konzeptionelles Beispiel gezeigt.
|
||
|
||
sales_records = []
|
||
|
||
try:
|
||
# Beispiel: Merchant Center Performance Report
|
||
request_body = {
|
||
"query": f"""
|
||
SELECT
|
||
segments.offer_id,
|
||
metrics.impressions,
|
||
metrics.clicks,
|
||
metrics.conversions,
|
||
metrics.conversion_value
|
||
FROM MerchantPerformanceView
|
||
WHERE segments.date BETWEEN
|
||
'{start_date.strftime('%Y-%m-%d')}'
|
||
AND '{end_date.strftime('%Y-%m-%d')}'
|
||
"""
|
||
}
|
||
|
||
response = (
|
||
service.reports()
|
||
.search(merchantId=merchant_id, body=request_body)
|
||
.execute()
|
||
)
|
||
|
||
for row in response.get("results", []):
|
||
segments = row.get("segments", {})
|
||
metrics = row.get("metrics", {})
|
||
sales_records.append({
|
||
"offer_id": segments.get("offerId", ""),
|
||
"impressions": int(metrics.get("impressions", 0)),
|
||
"clicks": int(metrics.get("clicks", 0)),
|
||
"sales_quantity_90d": int(
|
||
metrics.get("conversions", 0)
|
||
),
|
||
"revenue_90d": float(
|
||
metrics.get("conversionValue", 0.0)
|
||
),
|
||
})
|
||
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Fehler beim Abrufen der Google Sales-Daten: {e}"
|
||
)
|
||
raise
|
||
|
||
# ── Daten in DuckDB importieren ───────────────────────
|
||
if sales_records:
|
||
df_sales = pl.DataFrame(sales_records)
|
||
con.execute(f"DROP TABLE IF EXISTS {table_name}")
|
||
con.execute(
|
||
f"CREATE TABLE {table_name} AS SELECT * FROM df_sales"
|
||
)
|
||
row_count = con.execute(
|
||
f"SELECT COUNT(*) FROM {table_name}"
|
||
).fetchone()[0]
|
||
logger.info(
|
||
f"Google Sales-Daten geladen: "
|
||
f"{row_count:,} Zeilen in '{table_name}'"
|
||
)
|
||
return row_count
|
||
else:
|
||
logger.warning(
|
||
"Keine Sales-Daten von der Google API erhalten."
|
||
)
|
||
# Leere Tabelle erstellen, damit der JOIN nicht fehlschlägt
|
||
con.execute(f"DROP TABLE IF EXISTS {table_name}")
|
||
con.execute(f"""
|
||
CREATE TABLE {table_name} (
|
||
offer_id VARCHAR,
|
||
impressions INTEGER DEFAULT 0,
|
||
clicks INTEGER DEFAULT 0,
|
||
sales_quantity_90d INTEGER DEFAULT 0,
|
||
revenue_90d DOUBLE DEFAULT 0.0
|
||
)
|
||
""")
|
||
return 0
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Google API-Aufrufe, insbesondere der Report-Query und die Antwortstruktur, müssen an die tatsächlich verwendete API-Version und Ihr Merchant-Center-Setup angepasst werden. Konsultieren Sie die offizielle Google API-Dokumentation für die aktuellen Endpunkte und Antwortformate.
|
||
|
||
## 6.3 Transform-Modul (transform.py)
|
||
|
||
```python
|
||
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
/opt/etl-pipeline/scripts/transform.py
|
||
|
||
Modul für die Datentransformation:
|
||
1. LEFT JOIN in DuckDB (Produkte ⟕ Sales)
|
||
2. Polars-Transformationen basierend auf rules.yaml
|
||
"""
|
||
|
||
import duckdb
|
||
import polars as pl
|
||
from typing import Any
|
||
|
||
from scripts.utils import setup_logging, load_yaml_config
|
||
|
||
logger = setup_logging("transform")
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════
|
||
# DuckDB LEFT JOIN
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
def join_products_with_sales(
|
||
con: duckdb.DuckDBPyConnection,
|
||
products_table: str = "products_raw",
|
||
sales_table: str = "sales_90d",
|
||
joined_table: str = "products_enriched",
|
||
join_key_products: str = "Artikelnummer",
|
||
join_key_sales: str = "offer_id",
|
||
) -> pl.DataFrame:
|
||
"""
|
||
Führt einen LEFT JOIN zwischen Produktstammdaten und
|
||
Verkaufsdaten in DuckDB durch und gibt das Ergebnis als
|
||
Polars DataFrame zurück.
|
||
|
||
Ein LEFT JOIN stellt sicher, dass alle Produkte erhalten
|
||
bleiben, auch wenn keine Verkaufsdaten vorliegen
|
||
(NULL-Werte werden mit 0 gefüllt).
|
||
|
||
Args:
|
||
con: Aktive DuckDB-Verbindung.
|
||
products_table: Name der Produkttabelle.
|
||
sales_table: Name der Verkaufstabelle.
|
||
joined_table: Name der Ergebnistabelle.
|
||
join_key_products: JOIN-Schlüssel in der Produkttabelle.
|
||
join_key_sales: JOIN-Schlüssel in der Verkaufstabelle.
|
||
|
||
Returns:
|
||
Polars DataFrame mit den verknüpften Daten.
|
||
"""
|
||
logger.info(
|
||
f"Führe LEFT JOIN durch: {products_table} ⟕ {sales_table} "
|
||
f"ON {join_key_products} = {join_key_sales}"
|
||
)
|
||
|
||
# ── LEFT JOIN mit COALESCE für NULL-Behandlung ────────
|
||
con.execute(f"DROP TABLE IF EXISTS {joined_table}")
|
||
con.execute(f"""
|
||
CREATE TABLE {joined_table} AS
|
||
SELECT
|
||
p.*,
|
||
COALESCE(s.impressions, 0)
|
||
AS impressions,
|
||
COALESCE(s.clicks, 0)
|
||
AS clicks,
|
||
COALESCE(s.sales_quantity_90d, 0)
|
||
AS sales_quantity_90d,
|
||
COALESCE(s.revenue_90d, 0.0)
|
||
AS revenue_90d
|
||
FROM {products_table} AS p
|
||
LEFT JOIN {sales_table} AS s
|
||
ON CAST(p."{join_key_products}" AS VARCHAR)
|
||
= CAST(s.{join_key_sales} AS VARCHAR)
|
||
""")
|
||
|
||
row_count = con.execute(
|
||
f"SELECT COUNT(*) FROM {joined_table}"
|
||
).fetchone()[0]
|
||
logger.info(
|
||
f"LEFT JOIN abgeschlossen: "
|
||
f"{row_count:,} Zeilen in '{joined_table}'"
|
||
)
|
||
|
||
# ── Ergebnis als Polars DataFrame exportieren ─────────
|
||
result = con.execute(f"SELECT * FROM {joined_table}").pl()
|
||
logger.info(
|
||
f"Polars DataFrame erstellt: "
|
||
f"{result.shape[0]:,} Zeilen, {result.shape[1]} Spalten"
|
||
)
|
||
|
||
return result
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════
|
||
# Berechnete Felder hinzufügen
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
def add_calculated_fields(df: pl.DataFrame) -> pl.DataFrame:
|
||
"""
|
||
Fügt berechnete Felder hinzu, die als Grundlage für die
|
||
Regel-Engine dienen.
|
||
|
||
Args:
|
||
df: Polars DataFrame mit den verknüpften Rohdaten.
|
||
|
||
Returns:
|
||
Polars DataFrame mit zusätzlichen berechneten Spalten.
|
||
"""
|
||
logger.info("Füge berechnete Felder hinzu...")
|
||
|
||
df = df.with_columns([
|
||
# Marge in Prozent (wenn EK vorhanden)
|
||
pl.when(
|
||
(pl.col("EK_Netto").is_not_null())
|
||
& (pl.col("VK_Brutto") > 0)
|
||
)
|
||
.then(
|
||
(
|
||
(pl.col("VK_Brutto") - pl.col("EK_Netto"))
|
||
/ pl.col("VK_Brutto")
|
||
* 100
|
||
).round(2)
|
||
)
|
||
.otherwise(pl.lit(None))
|
||
.alias("margin_percent"),
|
||
|
||
# Click-Through-Rate
|
||
pl.when(pl.col("impressions") > 0)
|
||
.then(
|
||
(pl.col("clicks") / pl.col("impressions") * 100)
|
||
.round(2)
|
||
)
|
||
.otherwise(pl.lit(0.0))
|
||
.alias("ctr_percent"),
|
||
|
||
# Conversion-Rate
|
||
pl.when(pl.col("clicks") > 0)
|
||
.then(
|
||
(
|
||
pl.col("sales_quantity_90d")
|
||
/ pl.col("clicks")
|
||
* 100
|
||
).round(2)
|
||
)
|
||
.otherwise(pl.lit(0.0))
|
||
.alias("conversion_rate_percent"),
|
||
|
||
# Custom Label Spalten initialisieren (leer)
|
||
pl.lit("").alias("custom_label_0"),
|
||
pl.lit("").alias("custom_label_1"),
|
||
pl.lit("").alias("custom_label_2"),
|
||
pl.lit("").alias("custom_label_3"),
|
||
pl.lit("").alias("custom_label_4"),
|
||
|
||
# Versandkosten initialisieren
|
||
pl.lit("4.95").alias("shipping_cost"),
|
||
|
||
# Exclude-Flag initialisieren
|
||
pl.lit(False).alias("_excluded"),
|
||
pl.lit("").alias("_exclude_reason"),
|
||
])
|
||
|
||
logger.info(
|
||
f"Berechnete Felder hinzugefügt. "
|
||
f"DataFrame: {df.shape[1]} Spalten"
|
||
)
|
||
return df
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════
|
||
# YAML Regel-Engine
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
def evaluate_condition(
|
||
df: pl.DataFrame,
|
||
condition: dict,
|
||
) -> pl.Series:
|
||
"""
|
||
Evaluiert eine einzelne Bedingung und gibt eine
|
||
boolesche Maske zurück.
|
||
|
||
Args:
|
||
df: Polars DataFrame.
|
||
condition: Dictionary mit field, operator, value.
|
||
|
||
Returns:
|
||
Polars Series (Boolean) als Filtermaske.
|
||
"""
|
||
field = condition["field"]
|
||
operator = condition["operator"]
|
||
value = condition.get("value")
|
||
value_from_field = condition.get("value_from_field")
|
||
|
||
# Prüfen ob das Feld existiert
|
||
if field not in df.columns:
|
||
logger.warning(
|
||
f"Feld '{field}' existiert nicht im DataFrame. "
|
||
f"Bedingung übersprungen."
|
||
)
|
||
return pl.Series("mask", [False] * df.height)
|
||
|
||
col = pl.col(field)
|
||
|
||
# Vergleichswert: fester Wert oder aus anderem Feld
|
||
if value_from_field:
|
||
compare_value = pl.col(value_from_field)
|
||
else:
|
||
compare_value = value
|
||
|
||
# ── Operator-Mapping ──────────────────────────────────
|
||
operator_map = {
|
||
">": col > compare_value,
|
||
"<": col < compare_value,
|
||
">=": col >= compare_value,
|
||
"<=": col <= compare_value,
|
||
"==": col == compare_value,
|
||
"!=": col != compare_value,
|
||
"contains": col.cast(pl.Utf8).str.contains(
|
||
str(compare_value), literal=True
|
||
),
|
||
"not_contains": ~col.cast(pl.Utf8).str.contains(
|
||
str(compare_value), literal=True
|
||
),
|
||
"starts_with": col.cast(pl.Utf8).str.starts_with(
|
||
str(compare_value)
|
||
),
|
||
"ends_with": col.cast(pl.Utf8).str.ends_with(
|
||
str(compare_value)
|
||
),
|
||
"is_empty": (
|
||
col.is_null() | (col.cast(pl.Utf8) == "")
|
||
),
|
||
"is_not_empty": (
|
||
col.is_not_null() & (col.cast(pl.Utf8) != "")
|
||
),
|
||
}
|
||
|
||
if operator not in operator_map:
|
||
logger.error(f"Unbekannter Operator: '{operator}'")
|
||
return pl.Series("mask", [False] * df.height)
|
||
|
||
return df.select(operator_map[operator]).to_series()
|
||
|
||
|
||
def apply_rules(
|
||
df: pl.DataFrame,
|
||
rules_config: dict,
|
||
) -> pl.DataFrame:
|
||
"""
|
||
Wendet alle aktivierten Regeln aus der rules.yaml auf den
|
||
DataFrame an. Regeln werden nach Priorität sortiert und
|
||
sequenziell abgearbeitet.
|
||
|
||
Args:
|
||
df: Polars DataFrame mit den angereicherten Produktdaten.
|
||
rules_config: Geladene rules.yaml als Dictionary.
|
||
|
||
Returns:
|
||
Polars DataFrame nach Anwendung aller Regeln.
|
||
"""
|
||
rules = rules_config.get("rules", [])
|
||
settings = rules_config.get("settings", {})
|
||
|
||
# Regeln nach Priorität sortieren
|
||
# (niedrigere Zahl = höhere Priorität)
|
||
rules_sorted = sorted(
|
||
[r for r in rules if r.get("enabled", True)],
|
||
key=lambda r: r.get("priority", 999),
|
||
)
|
||
|
||
logger.info(f"Wende {len(rules_sorted)} aktive Regeln an...")
|
||
|
||
# ── Globale Ausschlüsse (Settings) ────────────────────
|
||
if settings.get("exclude_out_of_stock", False):
|
||
df = df.with_columns(
|
||
pl.when(pl.col("stock_quantity") <= 0)
|
||
.then(pl.lit(True))
|
||
.otherwise(pl.col("_excluded"))
|
||
.alias("_excluded")
|
||
)
|
||
excluded = df.filter(pl.col("_excluded")).height
|
||
logger.info(
|
||
f"Globaler Ausschluss (Bestand=0): "
|
||
f"{excluded:,} Artikel markiert"
|
||
)
|
||
|
||
min_price = settings.get("minimum_price", 0)
|
||
if min_price > 0:
|
||
df = df.with_columns(
|
||
pl.when(pl.col("VK_Brutto") < min_price)
|
||
.then(pl.lit(True))
|
||
.otherwise(pl.col("_excluded"))
|
||
.alias("_excluded")
|
||
)
|
||
|
||
# ── Regeln sequenziell anwenden ───────────────────────
|
||
for rule in rules_sorted:
|
||
rule_name = rule.get("name", "Unbenannt")
|
||
conditions_config = rule.get("conditions", {})
|
||
actions = rule.get("actions", [])
|
||
logic = conditions_config.get("logic", "and")
|
||
sub_rules = conditions_config.get("rules", [])
|
||
|
||
logger.debug(
|
||
f"Verarbeite Regel: '{rule_name}' "
|
||
f"(Priorität: {rule.get('priority', '?')})"
|
||
)
|
||
|
||
# Alle Teilbedingungen evaluieren
|
||
masks = [
|
||
evaluate_condition(df, cond) for cond in sub_rules
|
||
]
|
||
|
||
if not masks:
|
||
logger.warning(
|
||
f"Regel '{rule_name}' hat keine Bedingungen. "
|
||
f"Übersprungen."
|
||
)
|
||
continue
|
||
|
||
# Logische Verknüpfung der Teilbedingungen
|
||
if logic == "and":
|
||
combined_mask = masks[0]
|
||
for m in masks[1:]:
|
||
combined_mask = combined_mask & m
|
||
elif logic == "or":
|
||
combined_mask = masks[0]
|
||
for m in masks[1:]:
|
||
combined_mask = combined_mask | m
|
||
else:
|
||
logger.error(
|
||
f"Unbekannte Logik '{logic}' in Regel "
|
||
f"'{rule_name}'. Übersprungen."
|
||
)
|
||
continue
|
||
|
||
matching_count = combined_mask.sum()
|
||
logger.info(
|
||
f"Regel '{rule_name}': "
|
||
f"{matching_count:,} Artikel treffen zu"
|
||
)
|
||
|
||
# Aktionen auf übereinstimmende Zeilen anwenden
|
||
for action_def in actions:
|
||
action_type = action_def.get("action", "set_value")
|
||
target_field = action_def.get("target_field", "")
|
||
|
||
if action_type == "set_value":
|
||
value = action_def.get("value", "")
|
||
df = df.with_columns(
|
||
pl.when(combined_mask)
|
||
.then(pl.lit(value))
|
||
.otherwise(pl.col(target_field))
|
||
.alias(target_field)
|
||
)
|
||
|
||
elif action_type == "copy_field":
|
||
source = action_def.get("source_field", "")
|
||
if source in df.columns:
|
||
df = df.with_columns(
|
||
pl.when(combined_mask)
|
||
.then(pl.col(source))
|
||
.otherwise(pl.col(target_field))
|
||
.alias(target_field)
|
||
)
|
||
|
||
elif action_type == "prepend":
|
||
template = action_def.get(
|
||
"value_template", ""
|
||
)
|
||
for col_name in df.columns:
|
||
placeholder = f"{{{col_name}}}"
|
||
if placeholder in template:
|
||
suffix = template.replace(
|
||
placeholder, ""
|
||
)
|
||
df = df.with_columns(
|
||
pl.when(combined_mask)
|
||
.then(
|
||
pl.col(col_name)
|
||
.cast(pl.Utf8)
|
||
+ pl.lit(suffix)
|
||
+ pl.col(target_field)
|
||
.cast(pl.Utf8)
|
||
)
|
||
.otherwise(pl.col(target_field))
|
||
.alias(target_field)
|
||
)
|
||
break
|
||
|
||
elif action_type == "append":
|
||
value = action_def.get("value", "")
|
||
df = df.with_columns(
|
||
pl.when(combined_mask)
|
||
.then(
|
||
pl.col(target_field).cast(pl.Utf8)
|
||
+ pl.lit(value)
|
||
)
|
||
.otherwise(pl.col(target_field))
|
||
.alias(target_field)
|
||
)
|
||
|
||
elif action_type == "exclude":
|
||
reason = action_def.get(
|
||
"reason", "Durch Regel ausgeschlossen"
|
||
)
|
||
df = df.with_columns(
|
||
pl.when(combined_mask)
|
||
.then(pl.lit(True))
|
||
.otherwise(pl.col("_excluded"))
|
||
.alias("_excluded"),
|
||
pl.when(combined_mask)
|
||
.then(pl.lit(reason))
|
||
.otherwise(pl.col("_exclude_reason"))
|
||
.alias("_exclude_reason"),
|
||
)
|
||
|
||
# ── Statistik ─────────────────────────────────────────
|
||
total = df.height
|
||
excluded_count = df.filter(pl.col("_excluded")).height
|
||
active_count = total - excluded_count
|
||
logger.info(
|
||
f"Transformation abgeschlossen: {total:,} Gesamt | "
|
||
f"{active_count:,} Aktiv | "
|
||
f"{excluded_count:,} Ausgeschlossen"
|
||
)
|
||
|
||
return df
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Regel-Engine ist bewusst einfach gehalten und muss für den Produktionseinsatz um Fehlerbehandlung, Validierung der Eingabedaten und weitere Operatoren erweitert werden. Die Feldnamen (`EK_Netto`, `VK_Brutto`, `stock_quantity` usw.) müssen mit den tatsächlichen Spalten Ihres JTL-Exports übereinstimmen.
|
||
|
||
## 6.4 Load-Modul (load.py)
|
||
|
||
```python
|
||
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
/opt/etl-pipeline/scripts/load.py
|
||
|
||
Modul für den Batch-Upload transformierter Produktdaten
|
||
an die Google Content API for Shopping.
|
||
Implementiert Rate-Limiting und exponentielles Backoff.
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import time
|
||
import polars as pl
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
|
||
from google.oauth2 import service_account
|
||
from googleapiclient.discovery import build
|
||
from googleapiclient.errors import HttpError
|
||
from tenacity import (
|
||
retry,
|
||
stop_after_attempt,
|
||
wait_exponential,
|
||
retry_if_exception_type,
|
||
)
|
||
|
||
from scripts.utils import (
|
||
setup_logging,
|
||
load_yaml_config,
|
||
CREDENTIALS_DIR,
|
||
DATA_DIR,
|
||
LOG_DIR,
|
||
get_date_stamp,
|
||
)
|
||
|
||
logger = setup_logging("load")
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════
|
||
# Konstanten
|
||
# ══════════════════════════════════════════════════════════
|
||
BATCH_SIZE = 1000 # Produkte pro Batch-Request
|
||
RATE_LIMIT_DELAY = 1.0 # Sekunden Pause zwischen Batches
|
||
MAX_RETRIES = 5 # Maximale Wiederholungsversuche
|
||
BACKOFF_MULTIPLIER = 2 # Exponentieller Backoff-Faktor
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════
|
||
# Google Content API Client
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
def get_content_api_service():
|
||
"""
|
||
Erstellt einen authentifizierten Google Content API Service.
|
||
|
||
Returns:
|
||
Google API Service-Objekt für Content API v2.1.
|
||
"""
|
||
credentials_path = os.environ.get(
|
||
"GOOGLE_APPLICATION_CREDENTIALS",
|
||
str(CREDENTIALS_DIR / "google_service_account.json"),
|
||
)
|
||
|
||
credentials = service_account.Credentials.from_service_account_file(
|
||
credentials_path,
|
||
scopes=["https://www.googleapis.com/auth/content"],
|
||
)
|
||
|
||
service = build("content", "v2.1", credentials=credentials)
|
||
logger.info("Google Content API Service erstellt.")
|
||
return service
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════
|
||
# Produkt-Payload-Erstellung
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
def build_product_payload(
|
||
row: dict,
|
||
column_mapping: dict,
|
||
settings: dict,
|
||
) -> dict:
|
||
"""
|
||
Erstellt den API-Payload für ein einzelnes Produkt nach
|
||
dem Google Shopping Content API Schema.
|
||
|
||
Args:
|
||
row: Dictionary mit den Produktdaten einer Zeile.
|
||
column_mapping: Spalten-Mapping (JTL → Google).
|
||
settings: Globale Pipeline-Einstellungen.
|
||
|
||
Returns:
|
||
Dictionary im Google Content API Produktformat.
|
||
"""
|
||
target_country = settings.get("target_country", "DE")
|
||
content_language = settings.get("content_language", "de")
|
||
currency = settings.get("currency", "EUR")
|
||
mapping = column_mapping.get("mapping", {})
|
||
|
||
product = {
|
||
"offerId": str(row.get(
|
||
mapping.get("Artikelnummer", "Artikelnummer"), ""
|
||
)),
|
||
"title": str(row.get("title", row.get(
|
||
mapping.get("Artikelname", ""), ""
|
||
))),
|
||
"description": str(row.get(
|
||
mapping.get("Beschreibung", "Beschreibung"), ""
|
||
)),
|
||
"link": str(row.get(
|
||
mapping.get("URL", "URL"), ""
|
||
)),
|
||
"imageLink": str(row.get(
|
||
mapping.get("BildURL", "BildURL"), ""
|
||
)),
|
||
"contentLanguage": content_language,
|
||
"targetCountry": target_country,
|
||
"channel": "online",
|
||
"availability": (
|
||
"in stock"
|
||
if row.get("stock_quantity", 0) > 0
|
||
else "out of stock"
|
||
),
|
||
"condition": "new",
|
||
"price": {
|
||
"value": str(row.get("VK_Brutto", "0.00")),
|
||
"currency": currency,
|
||
},
|
||
}
|
||
|
||
# Optionale Felder hinzufügen
|
||
brand = row.get("Marke", row.get("brand", ""))
|
||
if brand:
|
||
product["brand"] = str(brand)
|
||
|
||
gtin = row.get("GTIN", row.get("gtin", ""))
|
||
if gtin:
|
||
product["gtin"] = str(gtin)
|
||
|
||
# Custom Labels (0-4)
|
||
for i in range(5):
|
||
label_value = row.get(f"custom_label_{i}", "")
|
||
if label_value:
|
||
product[f"customLabel{i}"] = str(label_value)
|
||
|
||
# Sale-Preis
|
||
sale_price = row.get("sale_price", "")
|
||
if sale_price and float(sale_price) > 0:
|
||
product["salePrice"] = {
|
||
"value": str(sale_price),
|
||
"currency": currency,
|
||
}
|
||
|
||
# Versandkosten
|
||
shipping_cost = row.get("shipping_cost", "")
|
||
if shipping_cost is not None and shipping_cost != "":
|
||
product["shipping"] = [{
|
||
"country": target_country,
|
||
"price": {
|
||
"value": str(shipping_cost),
|
||
"currency": currency,
|
||
},
|
||
}]
|
||
|
||
return product
|
||
|
||
|
||
# ══════════════════════════════════════════════════════════
|
||
# Batch-Upload mit Rate Limiting
|
||
# ══════════════════════════════════════════════════════════
|
||
|
||
@retry(
|
||
stop=stop_after_attempt(MAX_RETRIES),
|
||
wait=wait_exponential(
|
||
multiplier=BACKOFF_MULTIPLIER, min=2, max=60
|
||
),
|
||
retry=retry_if_exception_type(
|
||
(HttpError, ConnectionError, TimeoutError)
|
||
),
|
||
before_sleep=lambda retry_state: logger.warning(
|
||
f"Retry {retry_state.attempt_number}/{MAX_RETRIES} "
|
||
f"nach Fehler. "
|
||
f"Warte {retry_state.next_action.sleep:.1f}s..."
|
||
),
|
||
)
|
||
def execute_batch_request(
|
||
service,
|
||
merchant_id: str,
|
||
batch_entries: list[dict],
|
||
) -> dict:
|
||
"""
|
||
Führt einen einzelnen Batch-Request an die Content API aus.
|
||
Implementiert exponentielles Backoff bei Rate-Limit-Fehlern.
|
||
|
||
Args:
|
||
service: Google Content API Service.
|
||
merchant_id: Merchant Center ID.
|
||
batch_entries: Liste der Batch-Einträge.
|
||
|
||
Returns:
|
||
API-Antwort als Dictionary.
|
||
"""
|
||
body = {"entries": batch_entries}
|
||
|
||
response = (
|
||
service.products()
|
||
.custombatch(body=body)
|
||
.execute()
|
||
)
|
||
|
||
return response
|
||
|
||
|
||
def upload_products_to_google(
|
||
df: pl.DataFrame,
|
||
batch_size: int = BATCH_SIZE,
|
||
rate_limit_delay: float = RATE_LIMIT_DELAY,
|
||
) -> dict:
|
||
"""
|
||
Lädt alle aktiven (nicht ausgeschlossenen) Produkte als
|
||
Batch-Requests an die Google Content API hoch.
|
||
|
||
Args:
|
||
df: Polars DataFrame mit transformierten Produktdaten.
|
||
batch_size: Anzahl der Produkte pro Batch-Request.
|
||
rate_limit_delay: Pause in Sekunden zwischen Batches.
|
||
|
||
Returns:
|
||
Dictionary mit Upload-Statistiken.
|
||
"""
|
||
merchant_id = os.environ.get("MERCHANT_CENTER_ID")
|
||
if not merchant_id:
|
||
raise ValueError("MERCHANT_CENTER_ID ist nicht gesetzt")
|
||
|
||
# Konfiguration laden
|
||
column_mapping = load_yaml_config("column_mapping.yaml")
|
||
rules_config = load_yaml_config("rules.yaml")
|
||
settings = rules_config.get("settings", {})
|
||
|
||
# Nur aktive (nicht ausgeschlossene) Produkte uploaden
|
||
df_active = df.filter(~pl.col("_excluded"))
|
||
total_products = df_active.height
|
||
|
||
logger.info(
|
||
f"Starte Upload: {total_products:,} aktive Produkte "
|
||
f"in Batches von {batch_size}"
|
||
)
|
||
|
||
# Service erstellen
|
||
service = get_content_api_service()
|
||
|
||
# ── Statistiken ───────────────────────────────────────
|
||
stats = {
|
||
"total": total_products,
|
||
"success": 0,
|
||
"errors": 0,
|
||
"batches": 0,
|
||
"start_time": datetime.now().isoformat(),
|
||
"error_details": [],
|
||
}
|
||
|
||
# ── Produkte in Batches aufteilen und hochladen ───────
|
||
rows_as_dicts = df_active.to_dicts()
|
||
total_batches = (
|
||
(total_products + batch_size - 1) // batch_size
|
||
)
|
||
|
||
for batch_start in range(0, total_products, batch_size):
|
||
batch_end = min(batch_start + batch_size, total_products)
|
||
batch_rows = rows_as_dicts[batch_start:batch_end]
|
||
batch_number = (batch_start // batch_size) + 1
|
||
|
||
logger.info(
|
||
f"Batch {batch_number}/{total_batches}: "
|
||
f"Produkte {batch_start + 1:,} – {batch_end:,}"
|
||
)
|
||
|
||
# Batch-Einträge erstellen
|
||
batch_entries = []
|
||
for idx, row in enumerate(batch_rows):
|
||
product_payload = build_product_payload(
|
||
row, column_mapping, settings
|
||
)
|
||
batch_entries.append({
|
||
"batchId": batch_start + idx,
|
||
"merchantId": merchant_id,
|
||
"method": "insert",
|
||
"product": product_payload,
|
||
})
|
||
|
||
# Batch-Request ausführen (mit Retry/Backoff)
|
||
try:
|
||
response = execute_batch_request(
|
||
service, merchant_id, batch_entries
|
||
)
|
||
|
||
# Antwort auswerten
|
||
for entry in response.get("entries", []):
|
||
if (
|
||
"errors" in entry
|
||
and entry["errors"].get("errors")
|
||
):
|
||
stats["errors"] += 1
|
||
batch_idx = entry.get("batchId", 0)
|
||
local_idx = batch_idx - batch_start
|
||
offer_id = "unbekannt"
|
||
if 0 <= local_idx < len(batch_rows):
|
||
offer_id = batch_rows[local_idx].get(
|
||
"Artikelnummer", "unbekannt"
|
||
)
|
||
error_info = {
|
||
"batchId": batch_idx,
|
||
"offerId": offer_id,
|
||
"errors": entry["errors"]["errors"],
|
||
}
|
||
stats["error_details"].append(error_info)
|
||
logger.warning(
|
||
f"Fehler bei Produkt {offer_id}: "
|
||
f"{entry['errors']['errors']}"
|
||
)
|
||
else:
|
||
stats["success"] += 1
|
||
|
||
except HttpError as e:
|
||
if e.resp.status == 429:
|
||
logger.warning(
|
||
f"Rate Limit erreicht bei Batch "
|
||
f"{batch_number}. Erhöhe Wartezeit..."
|
||
)
|
||
time.sleep(rate_limit_delay * 5)
|
||
else:
|
||
logger.error(
|
||
f"HTTP-Fehler bei Batch "
|
||
f"{batch_number}: {e}"
|
||
)
|
||
stats["errors"] += len(batch_rows)
|
||
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Unerwarteter Fehler bei Batch "
|
||
f"{batch_number}: {e}"
|
||
)
|
||
stats["errors"] += len(batch_rows)
|
||
|
||
stats["batches"] += 1
|
||
|
||
# ── Rate Limiting: Pause zwischen Batches ─────────
|
||
if batch_end < total_products:
|
||
logger.debug(
|
||
f"Rate Limiting: {rate_limit_delay}s Pause..."
|
||
)
|
||
time.sleep(rate_limit_delay)
|
||
|
||
# ── Upload-Statistiken loggen ─────────────────────────
|
||
stats["end_time"] = datetime.now().isoformat()
|
||
stats["duration_seconds"] = (
|
||
datetime.fromisoformat(stats["end_time"])
|
||
- datetime.fromisoformat(stats["start_time"])
|
||
).total_seconds()
|
||
|
||
logger.info(
|
||
f"Upload abgeschlossen: "
|
||
f"{stats['success']:,} erfolgreich | "
|
||
f"{stats['errors']:,} Fehler | "
|
||
f"{stats['batches']} Batches | "
|
||
f"{stats['duration_seconds']:.1f}s Gesamtdauer"
|
||
)
|
||
|
||
# ── Fehlerdetails als JSON speichern ──────────────────
|
||
if stats["error_details"]:
|
||
error_dir = LOG_DIR / "upload_errors"
|
||
error_dir.mkdir(parents=True, exist_ok=True)
|
||
error_file = (
|
||
error_dir / f"errors_{get_date_stamp()}.json"
|
||
)
|
||
with open(error_file, "w", encoding="utf-8") as f:
|
||
json.dump(
|
||
stats["error_details"],
|
||
f,
|
||
indent=2,
|
||
ensure_ascii=False,
|
||
)
|
||
logger.info(f"Fehlerdetails gespeichert: {error_file}")
|
||
|
||
return stats
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Die Batch-Größe, Rate-Limiting-Werte und API-Payloads müssen an Ihre spezifischen API-Quotas und Produktdatenstrukturen angepasst werden. Testen Sie den Upload zunächst mit einer kleinen Teilmenge (z. B. 10 Produkte) im Testmodus der Google Content API, bevor Sie den vollständigen Datensatz hochladen.
|
||
|
||
## 6.5 Hauptskript (main.py)
|
||
|
||
```python
|
||
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
/opt/etl-pipeline/scripts/main.py
|
||
|
||
Hauptskript (Entry Point) für die ETL-Pipeline.
|
||
Orchestriert den gesamten Ablauf: Extract → Transform → Load.
|
||
|
||
Aufruf:
|
||
cd /opt/etl-pipeline
|
||
source venv/bin/activate
|
||
python -m scripts.main
|
||
"""
|
||
|
||
import sys
|
||
import time
|
||
import duckdb
|
||
from pathlib import Path
|
||
|
||
from scripts.utils import (
|
||
setup_logging,
|
||
load_yaml_config,
|
||
BASE_DIR,
|
||
DATA_DIR,
|
||
get_timestamp,
|
||
)
|
||
from scripts.extract import (
|
||
load_jtl_csv_to_duckdb,
|
||
fetch_google_sales_data,
|
||
)
|
||
from scripts.transform import (
|
||
join_products_with_sales,
|
||
add_calculated_fields,
|
||
apply_rules,
|
||
)
|
||
from scripts.load import upload_products_to_google
|
||
|
||
logger = setup_logging("main")
|
||
|
||
|
||
def run_pipeline() -> dict:
|
||
"""
|
||
Führt die vollständige ETL-Pipeline aus.
|
||
|
||
Returns:
|
||
Dictionary mit Statistiken aller drei Phasen.
|
||
|
||
Raises:
|
||
SystemExit: Bei kritischen Fehlern mit Exit-Code 1.
|
||
"""
|
||
pipeline_start = time.time()
|
||
run_id = get_timestamp()
|
||
|
||
logger.info(f"{'═' * 60}")
|
||
logger.info(f"ETL-Pipeline gestartet — Run-ID: {run_id}")
|
||
logger.info(f"{'═' * 60}")
|
||
|
||
stats = {
|
||
"run_id": run_id,
|
||
"extract": {},
|
||
"transform": {},
|
||
"load": {},
|
||
}
|
||
|
||
# ══════════════════════════════════════════════════════
|
||
# DuckDB-Verbindung herstellen
|
||
# ══════════════════════════════════════════════════════
|
||
db_path = DATA_DIR / "db" / "products.duckdb"
|
||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||
|
||
con = duckdb.connect(str(db_path))
|
||
logger.info(f"DuckDB-Verbindung hergestellt: {db_path}")
|
||
|
||
try:
|
||
# ══════════════════════════════════════════════════
|
||
# PHASE 1: EXTRACT
|
||
# ══════════════════════════════════════════════════
|
||
logger.info(f"{'─' * 40}")
|
||
logger.info("PHASE 1: EXTRACT")
|
||
logger.info(f"{'─' * 40}")
|
||
|
||
# JTL CSV laden
|
||
jtl_rows = load_jtl_csv_to_duckdb(con)
|
||
stats["extract"]["jtl_rows"] = jtl_rows
|
||
|
||
# Google Sales-Daten abrufen
|
||
sales_rows = fetch_google_sales_data(con)
|
||
stats["extract"]["sales_rows"] = sales_rows
|
||
|
||
# ══════════════════════════════════════════════════
|
||
# PHASE 2: TRANSFORM
|
||
# ══════════════════════════════════════════════════
|
||
logger.info(f"{'─' * 40}")
|
||
logger.info("PHASE 2: TRANSFORM")
|
||
logger.info(f"{'─' * 40}")
|
||
|
||
# LEFT JOIN: Produkte ⟕ Sales
|
||
df = join_products_with_sales(con)
|
||
|
||
# Berechnete Felder hinzufügen
|
||
df = add_calculated_fields(df)
|
||
|
||
# YAML-Regeln anwenden
|
||
rules_config = load_yaml_config("rules.yaml")
|
||
df = apply_rules(df, rules_config)
|
||
|
||
stats["transform"]["total_products"] = df.height
|
||
stats["transform"]["active_products"] = df.filter(
|
||
~df["_excluded"]
|
||
).height
|
||
stats["transform"]["excluded_products"] = df.filter(
|
||
df["_excluded"]
|
||
).height
|
||
|
||
# ══════════════════════════════════════════════════
|
||
# PHASE 3: LOAD
|
||
# ══════════════════════════════════════════════════
|
||
logger.info(f"{'─' * 40}")
|
||
logger.info("PHASE 3: LOAD")
|
||
logger.info(f"{'─' * 40}")
|
||
|
||
upload_stats = upload_products_to_google(df)
|
||
stats["load"] = upload_stats
|
||
|
||
except Exception as e:
|
||
logger.critical(
|
||
f"Pipeline fehlgeschlagen: {e}", exc_info=True
|
||
)
|
||
# Exit-Code 1 signalisiert n8n einen Fehler
|
||
sys.exit(1)
|
||
|
||
finally:
|
||
con.close()
|
||
logger.info("DuckDB-Verbindung geschlossen.")
|
||
|
||
# ══════════════════════════════════════════════════════
|
||
# Zusammenfassung
|
||
# ══════════════════════════════════════════════════════
|
||
pipeline_duration = time.time() - pipeline_start
|
||
stats["duration_seconds"] = round(pipeline_duration, 2)
|
||
|
||
logger.info(f"{'═' * 60}")
|
||
logger.info(
|
||
f"ETL-Pipeline abgeschlossen — "
|
||
f"Dauer: {pipeline_duration:.1f}s"
|
||
)
|
||
logger.info(
|
||
f" Extract: "
|
||
f"{stats['extract'].get('jtl_rows', 0):,} JTL, "
|
||
f"{stats['extract'].get('sales_rows', 0):,} Sales"
|
||
)
|
||
logger.info(
|
||
f" Transform: "
|
||
f"{stats['transform'].get('active_products', 0):,} aktiv, "
|
||
f"{stats['transform'].get('excluded_products', 0):,} "
|
||
f"ausgeschlossen"
|
||
)
|
||
logger.info(
|
||
f" Load: "
|
||
f"{stats['load'].get('success', 0):,} OK, "
|
||
f"{stats['load'].get('errors', 0):,} Fehler"
|
||
)
|
||
logger.info(f"{'═' * 60}")
|
||
|
||
return stats
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────
|
||
# Entry Point
|
||
# ──────────────────────────────────────────────────────────
|
||
if __name__ == "__main__":
|
||
run_pipeline()
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist ein konzeptionelles Beispiel-Skript. Es dient als Ausgangspunkt und Entry Point für die Pipeline. Vor dem produktiven Einsatz sollten alle Module gründlich mit Testdaten validiert und die Fehlerbehandlung an Ihre Anforderungen angepasst werden.
|
||
|
||
---
|
||
|
||
# 7. n8n Workflow Integration
|
||
|
||
## 7.1 Übersicht
|
||
|
||
Der n8n Workflow übernimmt zwei zentrale Aufgaben:
|
||
|
||
1. **Scheduling:** Tägliche Ausführung der Python-Pipeline um **00:00 Uhr** (Mitternacht).
|
||
2. **Error Handling:** Benachrichtigung per E-Mail (oder Slack/Telegram) bei Fehlern.
|
||
|
||
Die Kommunikation zwischen n8n (Docker-Container) und der Python-Pipeline auf dem Host-System erfolgt über den n8n-Node **Execute Command**, der Shell-Befehle im Container ausführt. Da das Pipeline-Verzeichnis als Volume gemountet ist, hat n8n Zugriff auf die Skripte.
|
||
|
||
## 7.2 Workflow-Architektur
|
||
|
||
```mermaid
|
||
graph TD
|
||
CRON[Cron Trigger - 00:00 Uhr]
|
||
EXEC[Execute Command - python -m scripts.main]
|
||
CHECK{Exit-Code pruefen}
|
||
SUCCESS[Erfolgs-Meldung - Log + E-Mail]
|
||
ERROR[Fehler-Alarm - E-Mail / Slack]
|
||
|
||
CRON --> EXEC
|
||
EXEC --> CHECK
|
||
CHECK -->|Exit 0 - OK| SUCCESS
|
||
CHECK -->|Exit 1 - Fehler| ERROR
|
||
|
||
style CRON fill:#e58e26,stroke:#fa983a,color:#fff
|
||
style EXEC fill:#0c2461,stroke:#1e3799,color:#fff
|
||
style CHECK fill:#474787,stroke:#706fd3,color:#fff
|
||
style SUCCESS fill:#079992,stroke:#38ada9,color:#fff
|
||
style ERROR fill:#b71540,stroke:#e55039,color:#fff
|
||
```
|
||
|
||
## 7.3 Workflow als n8n-JSON importieren
|
||
|
||
Der folgende JSON-Export kann direkt in n8n importiert werden unter **Workflow → Import from File**. Speichern Sie den Inhalt als `.json`-Datei.
|
||
|
||
```json
|
||
{
|
||
"name": "ETL Pipeline - Google Shopping (Daily)",
|
||
"nodes": [
|
||
{
|
||
"parameters": {
|
||
"rule": {
|
||
"interval": [
|
||
{
|
||
"triggerAtHour": 0,
|
||
"triggerAtMinute": 0
|
||
}
|
||
]
|
||
}
|
||
},
|
||
"name": "Cron Trigger (00:00)",
|
||
"type": "n8n-nodes-base.scheduleTrigger",
|
||
"typeVersion": 1.2,
|
||
"position": [240, 300]
|
||
},
|
||
{
|
||
"parameters": {
|
||
"command": "cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main 2>&1",
|
||
"timeout": 3600
|
||
},
|
||
"name": "Execute ETL Pipeline",
|
||
"type": "n8n-nodes-base.executeCommand",
|
||
"typeVersion": 1,
|
||
"position": [480, 300]
|
||
},
|
||
{
|
||
"parameters": {
|
||
"conditions": {
|
||
"number": [
|
||
{
|
||
"value1": "={{ $json.exitCode }}",
|
||
"operation": "equal",
|
||
"value2": 0
|
||
}
|
||
]
|
||
}
|
||
},
|
||
"name": "Check Exit Code",
|
||
"type": "n8n-nodes-base.if",
|
||
"typeVersion": 1,
|
||
"position": [720, 300]
|
||
},
|
||
{
|
||
"parameters": {
|
||
"fromEmail": "etl-pipeline@ihre-domain.de",
|
||
"toEmail": "admin@ihre-domain.de",
|
||
"subject": "✅ ETL Pipeline — Erfolg ({{ $now.format('yyyy-MM-dd') }})",
|
||
"text": "Die ETL-Pipeline wurde erfolgreich ausgeführt.\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\n\nStdout:\n{{ $json.stdout }}"
|
||
},
|
||
"name": "Success E-Mail",
|
||
"type": "n8n-nodes-base.emailSend",
|
||
"typeVersion": 2,
|
||
"position": [960, 200]
|
||
},
|
||
{
|
||
"parameters": {
|
||
"fromEmail": "etl-pipeline@ihre-domain.de",
|
||
"toEmail": "admin@ihre-domain.de",
|
||
"subject": "❌ ETL Pipeline — FEHLER ({{ $now.format('yyyy-MM-dd') }})",
|
||
"text": "ACHTUNG: Die ETL-Pipeline ist fehlgeschlagen!\n\nZeitstempel: {{ $now.format('yyyy-MM-dd HH:mm:ss') }}\nExit-Code: {{ $json.exitCode }}\n\nStderr:\n{{ $json.stderr }}\n\nStdout:\n{{ $json.stdout }}\n\nBitte prüfen: /opt/etl-pipeline/logs/error.log"
|
||
},
|
||
"name": "Error E-Mail",
|
||
"type": "n8n-nodes-base.emailSend",
|
||
"typeVersion": 2,
|
||
"position": [960, 400]
|
||
}
|
||
],
|
||
"connections": {
|
||
"Cron Trigger (00:00)": {
|
||
"main": [
|
||
[{ "node": "Execute ETL Pipeline", "type": "main", "index": 0 }]
|
||
]
|
||
},
|
||
"Execute ETL Pipeline": {
|
||
"main": [
|
||
[{ "node": "Check Exit Code", "type": "main", "index": 0 }]
|
||
]
|
||
},
|
||
"Check Exit Code": {
|
||
"main": [
|
||
[{ "node": "Success E-Mail", "type": "main", "index": 0 }],
|
||
[{ "node": "Error E-Mail", "type": "main", "index": 0 }]
|
||
]
|
||
}
|
||
},
|
||
"settings": {
|
||
"timezone": "Europe/Berlin",
|
||
"saveExecutionProgress": true,
|
||
"saveManualExecutions": true,
|
||
"executionTimeout": 7200
|
||
}
|
||
}
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist ein Beispiel-Workflow für n8n. Die E-Mail-Adressen, SMTP-Einstellungen und Timeouts müssen an Ihre Umgebung angepasst werden. Konfigurieren Sie die SMTP-Credentials in n8n unter **Settings → Credentials** bevor Sie den Workflow aktivieren.
|
||
|
||
## 7.4 Wichtige n8n-Konfigurationshinweise
|
||
|
||
### Timeout-Einstellung
|
||
|
||
Da die Verarbeitung von 200.000+ Artikeln je nach Serverhardware zwischen **5 und 30 Minuten** dauern kann, muss der Timeout im `Execute Command`-Node ausreichend hoch gesetzt werden:
|
||
|
||
```
|
||
timeout: 3600 # 1 Stunde (in Sekunden)
|
||
```
|
||
|
||
### Volume-Mount beachten
|
||
|
||
Der `Execute Command`-Node führt Befehle **innerhalb des n8n Docker-Containers** aus. Damit die Python-Skripte und das Virtual Environment erreichbar sind, muss der Pfad `/opt/etl-pipeline` als Volume im `docker-compose.yml` gemountet sein (siehe Abschnitt 3.1.2).
|
||
|
||
### Alternative: Host-Befehl über SSH
|
||
|
||
Falls die Pipeline nicht innerhalb des Containers laufen soll, kann alternativ ein SSH-Node verwendet werden, der den Befehl auf dem Host ausführt:
|
||
|
||
```bash
|
||
# Befehl für n8n SSH-Node:
|
||
ssh pipeline-user@localhost \
|
||
"cd /opt/etl-pipeline && source venv/bin/activate && python -m scripts.main"
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispielbefehl. Für die SSH-Variante muss ein SSH-Key ohne Passphrase konfiguriert und die SSH-Credentials in n8n hinterlegt werden.
|
||
|
||
## 7.5 Manueller Pipeline-Start (Debugging)
|
||
|
||
Für Debugging und Tests kann die Pipeline jederzeit manuell gestartet werden:
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# Pipeline manuell ausführen
|
||
# ──────────────────────────────────────────────────────────
|
||
cd /opt/etl-pipeline
|
||
source venv/bin/activate
|
||
python -m scripts.main
|
||
|
||
# Alternativ: Nur den Exit-Code prüfen
|
||
python -m scripts.main; echo "Exit-Code: $?"
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispielbefehle für den manuellen Start. Stellen Sie sicher, dass die Umgebungsvariablen (`GOOGLE_APPLICATION_CREDENTIALS`, `MERCHANT_CENTER_ID`) in der Shell-Session gesetzt sind.
|
||
|
||
---
|
||
|
||
# 8. Wartung & Monitoring
|
||
|
||
## 8.1 Log-Management
|
||
|
||
### 8.1.1 Log-Struktur
|
||
|
||
Die Pipeline erzeugt folgende Log-Dateien:
|
||
|
||
| Datei | Inhalt | Rotation |
|
||
|----------------------------------------------|--------------------------------------|----------------------|
|
||
| `logs/etl_pipeline.log` | Alle Log-Einträge (INFO+) | 10 MB, 5 Backups |
|
||
| `logs/error.log` | Nur Fehler (ERROR+) | 10 MB, 5 Backups |
|
||
| `logs/upload_errors/errors_YYYYMMDD.json` | Detail-Fehler pro Upload-Lauf | Täglich, eine Datei |
|
||
|
||
### 8.1.2 Logrotate-Konfiguration
|
||
|
||
Zusätzlich zur integrierten Python-Rotation empfiehlt sich eine systemweite Logrotate-Konfiguration als Sicherheitsnetz.
|
||
|
||
Erstellen Sie die Datei `/etc/logrotate.d/etl-pipeline`:
|
||
|
||
```
|
||
/opt/etl-pipeline/logs/*.log {
|
||
daily
|
||
missingok
|
||
rotate 30
|
||
compress
|
||
delaycompress
|
||
notifempty
|
||
create 0640 root root
|
||
sharedscripts
|
||
postrotate
|
||
# Optional: Signal an laufende Prozesse
|
||
/bin/true
|
||
endscript
|
||
}
|
||
|
||
/opt/etl-pipeline/logs/upload_errors/*.json {
|
||
daily
|
||
missingok
|
||
rotate 90
|
||
compress
|
||
delaycompress
|
||
notifempty
|
||
create 0640 root root
|
||
}
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist eine Beispiel-Logrotate-Konfiguration. Passen Sie den Benutzer, die Aufbewahrungsdauer (`rotate`) und die Dateiberechtigungen an Ihre Sicherheitsrichtlinien an.
|
||
|
||
### 8.1.3 Log-Bereinigung (Cron)
|
||
|
||
Alte Log-Dateien und Upload-Fehlerprotokolle können zusätzlich per Cron bereinigt werden:
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# Crontab-Eintrag: Logs älter als 90 Tage löschen
|
||
# ──────────────────────────────────────────────────────────
|
||
# crontab -e
|
||
0 3 * * 0 find /opt/etl-pipeline/logs/ -name "*.log.*" -mtime +90 -delete
|
||
0 3 * * 0 find /opt/etl-pipeline/logs/upload_errors/ -name "*.json" -mtime +90 -delete
|
||
0 3 * * 0 find /opt/etl-pipeline/data/archive/ -name "*.csv" -mtime +180 -delete
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispiel-Crontab-Einträge. Passen Sie die Aufbewahrungsfristen (hier: 90 bzw. 180 Tage) an Ihre Compliance-Anforderungen an.
|
||
|
||
## 8.2 Performance-Tipps für 200.000+ Artikel
|
||
|
||
### 8.2.1 DuckDB-Optimierungen
|
||
|
||
```python
|
||
# ──────────────────────────────────────────────────────────
|
||
# DuckDB Performance-Konfiguration
|
||
# (am Anfang der Pipeline setzen)
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
con = duckdb.connect("products.duckdb")
|
||
|
||
# Verfügbaren Arbeitsspeicher für DuckDB erhöhen
|
||
con.execute("SET memory_limit = '4GB'")
|
||
|
||
# Alle verfügbaren CPU-Kerne nutzen
|
||
con.execute("SET threads TO 8")
|
||
|
||
# Temporäre Dateien auf schnelle SSD legen
|
||
con.execute("SET temp_directory = '/opt/etl-pipeline/data/tmp'")
|
||
|
||
# Fortschrittsanzeige für lange Queries
|
||
con.execute("SET enable_progress_bar = true")
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispiel-Konfiguration. Die Werte für `memory_limit` und `threads` müssen an die tatsächliche Serverausstattung angepasst werden.
|
||
|
||
### 8.2.2 Polars-Optimierungen
|
||
|
||
```python
|
||
# ──────────────────────────────────────────────────────────
|
||
# Polars Performance-Tipps
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
import polars as pl
|
||
|
||
# 1. Lazy Evaluation verwenden (bei sehr großen Datensätzen)
|
||
# Polars optimiert den Query-Plan automatisch
|
||
df_lazy = pl.scan_csv(
|
||
"data/input/jtl_export.csv", separator=";"
|
||
)
|
||
df_result = (
|
||
df_lazy
|
||
.filter(pl.col("Lagerbestand") > 0)
|
||
.with_columns([
|
||
pl.col("VK_Brutto").cast(pl.Float64),
|
||
])
|
||
.collect() # Erst hier werden die Daten materialisiert
|
||
)
|
||
|
||
# 2. Streaming-Modus für Datensätze > verfügbares RAM
|
||
df_result = (
|
||
df_lazy
|
||
.filter(pl.col("Lagerbestand") > 0)
|
||
.collect(streaming=True) # Verarbeitet in Chunks
|
||
)
|
||
|
||
# 3. Datentypen optimieren (reduziert RAM-Verbrauch)
|
||
df = df.with_columns([
|
||
pl.col("Lagerbestand").cast(pl.Int32), # statt Int64
|
||
pl.col("clicks").cast(pl.Int32), # statt Int64
|
||
pl.col("impressions").cast(pl.Int32), # statt Int64
|
||
pl.col("custom_label_0").cast(pl.Categorical), # statt Utf8
|
||
pl.col("custom_label_1").cast(pl.Categorical), # statt Utf8
|
||
])
|
||
|
||
# 4. Spaltenauswahl: Nur benötigte Spalten laden
|
||
df = pl.read_csv(
|
||
"data.csv",
|
||
columns=["sku", "title", "price", "stock"],
|
||
)
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies sind Beispiel-Codeausschnitte zur Performance-Optimierung. Die tatsächlichen Spaltennamen und Datentypen müssen an Ihren Datensatz angepasst werden.
|
||
|
||
### 8.2.3 Google API Upload-Optimierungen
|
||
|
||
| Parameter | Empfehlung | Begründung |
|
||
|--------------------------|--------------------------|--------------------------------------------------|
|
||
| `BATCH_SIZE` | 1.000 | Google empfiehlt max. 1.000 Einträge pro Batch |
|
||
| `RATE_LIMIT_DELAY` | 1,0 – 2,0 Sekunden | Verhindert 429-Fehler bei Standard-Quotas |
|
||
| `MAX_RETRIES` | 5 | Ausreichend für temporäre Fehler |
|
||
| Parallele Requests | Nicht empfohlen | Erhöht 429-Risiko, sequenziell ist sicherer |
|
||
| Geschätzte Upload-Dauer | 10 – 20 Minuten | Bei 200 Batches × 1s Delay + API-Latenz |
|
||
|
||
### 8.2.4 Allgemeine Server-Monitoring-Befehle
|
||
|
||
```bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# System-Monitoring während Pipeline-Lauf
|
||
# ──────────────────────────────────────────────────────────
|
||
|
||
# RAM- und CPU-Auslastung in Echtzeit beobachten
|
||
htop
|
||
|
||
# Festplatten-I/O überwachen
|
||
iostat -x 5
|
||
|
||
# Pipeline-Prozess gezielt überwachen
|
||
watch -n 2 "ps aux | grep 'python.*scripts.main' | grep -v grep"
|
||
|
||
# Festplattennutzung des Pipeline-Verzeichnisses
|
||
du -sh /opt/etl-pipeline/data/*
|
||
|
||
# DuckDB-Dateigröße prüfen
|
||
ls -lh /opt/etl-pipeline/data/db/products.duckdb
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispielbefehle für das System-Monitoring. Diese dienen zur Orientierung und sollten je nach Ihren Monitoring-Tools (z. B. Prometheus, Grafana, Zabbix) ergänzt oder ersetzt werden.
|
||
|
||
## 8.3 Backup-Empfehlungen
|
||
|
||
Die folgenden Dateien und Verzeichnisse sollten regelmäßig gesichert werden:
|
||
|
||
| Pfad | Priorität | Frequenz |
|
||
|------------------------------------------------|-----------|----------------|
|
||
| `/opt/etl-pipeline/config/` | Hoch | Bei Änderung |
|
||
| `/opt/etl-pipeline/credentials/` | Kritisch | Bei Änderung |
|
||
| `/opt/etl-pipeline/scripts/` | Hoch | Bei Änderung |
|
||
| `/opt/etl-pipeline/data/db/products.duckdb` | Mittel | Täglich |
|
||
| `/opt/n8n/data/` | Hoch | Wöchentlich |
|
||
|
||
```bash
|
||
#!/bin/bash
|
||
# ──────────────────────────────────────────────────────────
|
||
# Einfaches Backup-Skript (Beispiel)
|
||
# ──────────────────────────────────────────────────────────
|
||
BACKUP_DIR="/backup/etl-pipeline/$(date +%Y%m%d)"
|
||
mkdir -p "$BACKUP_DIR"
|
||
|
||
# Konfiguration und Skripte sichern
|
||
tar czf "$BACKUP_DIR/config_scripts.tar.gz" \
|
||
/opt/etl-pipeline/config/ \
|
||
/opt/etl-pipeline/scripts/ \
|
||
/opt/etl-pipeline/requirements.txt
|
||
|
||
# DuckDB sichern
|
||
cp /opt/etl-pipeline/data/db/products.duckdb \
|
||
"$BACKUP_DIR/products.duckdb"
|
||
|
||
# n8n Workflows sichern
|
||
tar czf "$BACKUP_DIR/n8n_data.tar.gz" /opt/n8n/data/
|
||
|
||
echo "Backup erstellt: $BACKUP_DIR"
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Dies ist ein vereinfachtes Beispiel-Backup-Skript. Für den Produktionseinsatz sollte eine vollwertige Backup-Lösung (z. B. BorgBackup, Restic) mit Verschlüsselung und Off-Site-Speicherung verwendet werden.
|
||
|
||
## 8.4 Checkliste für den Produktivbetrieb
|
||
|
||
Vor der Aktivierung der automatisierten Pipeline sollten folgende Punkte abgearbeitet sein:
|
||
|
||
- [ ] Python Virtual Environment erstellt und alle Abhängigkeiten installiert
|
||
- [ ] DuckDB-Datenbank initialisiert und CSV-Import getestet
|
||
- [ ] Google Service Account erstellt und in Merchant Center eingeladen
|
||
- [ ] `GOOGLE_APPLICATION_CREDENTIALS` und `MERCHANT_CENTER_ID` als Umgebungsvariablen gesetzt
|
||
- [ ] `rules.yaml` mit initialen Regeln konfiguriert
|
||
- [ ] `column_mapping.yaml` an JTL-Export-Spalten angepasst
|
||
- [ ] Pipeline manuell mit Testdaten durchgelaufen (alle 3 Phasen)
|
||
- [ ] n8n Workflow importiert und Cron-Trigger konfiguriert
|
||
- [ ] E-Mail-/Slack-Benachrichtigung bei Fehlern getestet
|
||
- [ ] Logrotate-Konfiguration aktiv
|
||
- [ ] Backup-Strategie implementiert
|
||
- [ ] Firewall-Regeln: Ausgehender Traffic zu Google APIs erlaubt
|
||
- [ ] Monitoring-Dashboard eingerichtet (optional)
|
||
|
||
---
|
||
|
||
# Anhang
|
||
|
||
## A. Nützliche Befehle (Kurzreferenz)
|
||
|
||
```bash
|
||
# ── Pipeline ──────────────────────────────────────────────
|
||
cd /opt/etl-pipeline && source venv/bin/activate
|
||
python -m scripts.main # Pipeline starten
|
||
python -m scripts.main 2>&1 | tee run.log # Mit Log-Ausgabe
|
||
|
||
# ── n8n ───────────────────────────────────────────────────
|
||
cd /opt/n8n
|
||
docker compose up -d # Starten
|
||
docker compose down # Stoppen
|
||
docker compose logs -f --tail=100 # Logs anzeigen
|
||
docker compose restart # Neustart
|
||
|
||
# ── DuckDB CLI ────────────────────────────────────────────
|
||
cd /opt/etl-pipeline && source venv/bin/activate
|
||
python -c "
|
||
import duckdb
|
||
con = duckdb.connect('data/db/products.duckdb')
|
||
print(con.execute('SHOW TABLES').fetchall())
|
||
print(con.execute('SELECT COUNT(*) FROM products_raw').fetchone())
|
||
con.close()
|
||
"
|
||
|
||
# ── Logs prüfen ──────────────────────────────────────────
|
||
tail -f /opt/etl-pipeline/logs/etl_pipeline.log
|
||
tail -100 /opt/etl-pipeline/logs/error.log
|
||
cat /opt/etl-pipeline/logs/upload_errors/errors_$(date +%Y%m%d).json | jq .
|
||
|
||
# ── Disk-Usage ────────────────────────────────────────────
|
||
du -sh /opt/etl-pipeline/data/db/*
|
||
du -sh /opt/etl-pipeline/logs/*
|
||
df -h /opt/etl-pipeline/
|
||
```
|
||
|
||
> ⚠️ **Hinweis:** Beispiel-Befehlsreferenz. Alle Pfade und Befehle müssen an Ihre Serverumgebung angepasst werden.
|
||
|
||
## B. Fehlerbehandlung (Häufige Probleme)
|
||
|
||
| Problem | Mögliche Ursache | Lösung |
|
||
|------------------------------------------------------|------------------------------------------|--------------------------------------------------------|
|
||
| `FileNotFoundError: jtl_export.csv` | CSV nicht am erwarteten Pfad | JTL-Export-Pfad in `settings.yaml` prüfen |
|
||
| `google.auth.exceptions.DefaultCredentialsError` | Credentials nicht gefunden | `GOOGLE_APPLICATION_CREDENTIALS` prüfen |
|
||
| `HttpError 429: Too Many Requests` | API-Quote überschritten | `RATE_LIMIT_DELAY` erhöhen (z. B. auf 2–3s) |
|
||
| `MemoryError` bei Polars | RAM nicht ausreichend | Polars Streaming-Modus oder RAM aufstocken |
|
||
| n8n: Timeout nach 600s | Pipeline dauert zu lang | Timeout im Execute Command auf 3600s erhöhen |
|
||
| DuckDB: `IO Error` | Festplatte voll | Archiv- und Temp-Dateien bereinigen |
|
||
| `ModuleNotFoundError` | venv nicht aktiviert | `source venv/bin/activate` vor Ausführung |
|
||
|
||
---
|
||
|
||
*Dokumentation erstellt am 2026-03-06 | Version 1.0.0*
|
||
*Letzte Aktualisierung: 2026-03-06*
|