Snowflake Connector
Verbinde Snowflake über Tajo mit Brevo, um Kund:innen-Segmente aus deinem Data Warehouse zu synchronisieren, Kontaktprofile mit Analytics-Daten anzureichern und personalisierte Marketingkampagnen mit Warehouse-gestützten Insights umzusetzen.
Überblick
| Eigenschaft | Wert |
|---|---|
| Plattform | Snowflake |
| Kategorie | Data Warehouse (Custom) |
| Einrichtungsaufwand | Mittel |
| Offizielle Integration | Nein |
| Synchronisierte Daten | Kund:innen, Segmente, Analytics, Events |
| Authentifizierung | Key Pair / OAuth 2.0 |
Funktionen
- Reverse ETL - Übertrage Kund:innen-Segmente aus Snowflake in Brevo-Kontaktlisten
- Audience-Synchronisation - Synchronisiere Warehouse-berechnete Audiences für zielgerichtete Kampagnen
- Analytics-Anreicherung - Reichere Brevo-Kontakte mit berechneten Metriken an (LTV, RFM-Scores)
- SQL-basierte Queries - Nutze die Snowflake SQL REST API, um Queries programmatisch auszuführen
- Geplante Synchronisation - Automatisierte Daten-Pipelines in konfigurierbaren Intervallen
- Multi-Statement-Support - Führe komplexe Datentransformationen in einzelnen API-Aufrufen aus
Voraussetzungen
Bevor du beginnst, stelle sicher, dass du Folgendes hast:
- Ein Snowflake-Konto mit der Rolle ACCOUNTADMIN oder SYSADMIN
- Ein Brevo-Konto mit API-Zugriff
- Ein Tajo-Konto mit Connector-Berechtigungen
- Ein dediziertes Snowflake-Warehouse für Integrations-Queries
- Eine Network Policy, die Tajo-IP-Adressen zulässt
Authentifizierung
Key-Pair-Authentifizierung (empfohlen)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth-2.0-Authentifizierung
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });SQL-API-Authentifizierung
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Konfiguration
Grundeinrichtung
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncFeld-Mapping
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI-Endpoints
| Endpoint | Methode | Beschreibung |
|---|---|---|
/api/v2/statements | POST | SQL-Statements zur Ausführung übermitteln |
/api/v2/statements/{statementHandle} | GET | Ausführungsstatus prüfen |
/api/v2/statements/{statementHandle}/cancel | POST | Laufendes Statement abbrechen |
/api/v2/statements/{statementHandle}?partition={id} | GET | Ergebnis-Partitionen abrufen |
SQL-API-Partitionen
Die Snowflake SQL API liefert große Ergebnismengen in Partitionen. Jede Partition enthält bis zu ca. 12 MB an Daten. Nutze den Partition-Parameter, um durch Ergebnisse zu iterieren.
Code-Beispiele
Connector initialisieren
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Kund:innen-Segmente via SQL-API synchronisieren
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse-ETL-Pipeline
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Rate Limits
| Ressource | Limit | Hinweise |
|---|---|---|
| SQL-API gleichzeitige Queries | 20 pro User | Pro Snowflake-Konto |
| SQL-API Ergebnisgröße | 12 MB pro Partition | Paginierung über Partition-IDs |
| Statement-Timeout | 172.800 Sek. (48 Std.) | Pro Query konfigurierbar |
| API-Anfragen | Variabel je Tarif | Abhängig von der Snowflake-Edition |
Warehouse-Kosten
Snowflake rechnet nach Compute-Zeit ab. Nutze ein dediziertes, passend dimensioniertes Warehouse für Tajo-Queries und setze Auto-Suspend, um Kosten zu minimieren.
Fehlerbehebung
| Problem | Ursache | Lösung |
|---|---|---|
| Authentifizierung fehlgeschlagen | Abgelaufener JWT-Token | JWT mit gültiger Ablaufzeit neu generieren |
| Query-Timeout | Großer Datensatz | Filter hinzufügen oder inkrementelle Synchronisation nutzen |
| Netzwerkfehler | IP nicht allowlisted | Tajo-IPs zur Snowflake Network Policy hinzufügen |
| Fehlende Spalten | Schema-Änderung | Feld-Mapping-Konfiguration aktualisieren |
| Partitions-Fehler | Ergebnis zu groß | Ergebnisse in kleineren Partitionen verarbeiten |
Debug-Modus
connectors: snowflake: debug: true log_level: verbose log_queries: trueBest Practices
- Dediziertes Warehouse nutzen - Vermeide Konkurrenz mit Produktiv-Workloads
- Inkrementelle Synchronisation - Frage nur Datensätze ab, die sich seit der letzten Synchronisation geändert haben
- Auto-Suspend setzen - Konfiguriere das Warehouse so, dass es nach 5 Minuten Inaktivität suspendiert
- Key-Pair-Auth nutzen - Bevorzuge Key Pair gegenüber Passwort-Authentifizierung
- Queries optimieren - Filtere und projiziere nur benötigte Spalten für schnellere Syncs
- Credits überwachen - Verfolge den Snowflake-Credit-Verbrauch für Integrations-Queries
Sicherheit
- Key-Pair-Authentifizierung - RSA-2048-Bit-Verschlüsselung für API-Zugriff
- OAuth 2.0 - Token-basierte Authentifizierung mit Rollen-Scoping
- Network Policies - IP-Allowlisting für Tajo-Service-Endpoints
- Rollenbasierter Zugriff - Dedizierte Snowflake-Rolle mit minimal nötigen Rechten
- Verschlüsselte Datenübertragung - TLS 1.2+ für alle API-Kommunikationen
- Data Masking - Nutze dynamisches Snowflake-Data-Masking für sensible Felder