Snowflake Connector

Verbinde Snowflake über Tajo mit Brevo, um Kund:innen-Segmente aus deinem Data Warehouse zu synchronisieren, Kontaktprofile mit Analytics-Daten anzureichern und personalisierte Marketingkampagnen mit Warehouse-gestützten Insights umzusetzen.

Überblick

EigenschaftWert
PlattformSnowflake
KategorieData Warehouse (Custom)
EinrichtungsaufwandMittel
Offizielle IntegrationNein
Synchronisierte DatenKund:innen, Segmente, Analytics, Events
AuthentifizierungKey Pair / OAuth 2.0

Funktionen

  • Reverse ETL - Übertrage Kund:innen-Segmente aus Snowflake in Brevo-Kontaktlisten
  • Audience-Synchronisation - Synchronisiere Warehouse-berechnete Audiences für zielgerichtete Kampagnen
  • Analytics-Anreicherung - Reichere Brevo-Kontakte mit berechneten Metriken an (LTV, RFM-Scores)
  • SQL-basierte Queries - Nutze die Snowflake SQL REST API, um Queries programmatisch auszuführen
  • Geplante Synchronisation - Automatisierte Daten-Pipelines in konfigurierbaren Intervallen
  • Multi-Statement-Support - Führe komplexe Datentransformationen in einzelnen API-Aufrufen aus

Voraussetzungen

Bevor du beginnst, stelle sicher, dass du Folgendes hast:

  1. Ein Snowflake-Konto mit der Rolle ACCOUNTADMIN oder SYSADMIN
  2. Ein Brevo-Konto mit API-Zugriff
  3. Ein Tajo-Konto mit Connector-Berechtigungen
  4. Ein dediziertes Snowflake-Warehouse für Integrations-Queries
  5. Eine Network Policy, die Tajo-IP-Adressen zulässt

Authentifizierung

Key-Pair-Authentifizierung (empfohlen)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth-2.0-Authentifizierung

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

SQL-API-Authentifizierung

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Konfiguration

Grundeinrichtung

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Feld-Mapping

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API-Endpoints

EndpointMethodeBeschreibung
/api/v2/statementsPOSTSQL-Statements zur Ausführung übermitteln
/api/v2/statements/{statementHandle}GETAusführungsstatus prüfen
/api/v2/statements/{statementHandle}/cancelPOSTLaufendes Statement abbrechen
/api/v2/statements/{statementHandle}?partition={id}GETErgebnis-Partitionen abrufen

SQL-API-Partitionen

Die Snowflake SQL API liefert große Ergebnismengen in Partitionen. Jede Partition enthält bis zu ca. 12 MB an Daten. Nutze den Partition-Parameter, um durch Ergebnisse zu iterieren.

Code-Beispiele

Connector initialisieren

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Kund:innen-Segmente via SQL-API synchronisieren

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse-ETL-Pipeline

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Rate Limits

RessourceLimitHinweise
SQL-API gleichzeitige Queries20 pro UserPro Snowflake-Konto
SQL-API Ergebnisgröße12 MB pro PartitionPaginierung über Partition-IDs
Statement-Timeout172.800 Sek. (48 Std.)Pro Query konfigurierbar
API-AnfragenVariabel je TarifAbhängig von der Snowflake-Edition

Warehouse-Kosten

Snowflake rechnet nach Compute-Zeit ab. Nutze ein dediziertes, passend dimensioniertes Warehouse für Tajo-Queries und setze Auto-Suspend, um Kosten zu minimieren.

Fehlerbehebung

ProblemUrsacheLösung
Authentifizierung fehlgeschlagenAbgelaufener JWT-TokenJWT mit gültiger Ablaufzeit neu generieren
Query-TimeoutGroßer DatensatzFilter hinzufügen oder inkrementelle Synchronisation nutzen
NetzwerkfehlerIP nicht allowlistedTajo-IPs zur Snowflake Network Policy hinzufügen
Fehlende SpaltenSchema-ÄnderungFeld-Mapping-Konfiguration aktualisieren
Partitions-FehlerErgebnis zu großErgebnisse in kleineren Partitionen verarbeiten

Debug-Modus

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Best Practices

  1. Dediziertes Warehouse nutzen - Vermeide Konkurrenz mit Produktiv-Workloads
  2. Inkrementelle Synchronisation - Frage nur Datensätze ab, die sich seit der letzten Synchronisation geändert haben
  3. Auto-Suspend setzen - Konfiguriere das Warehouse so, dass es nach 5 Minuten Inaktivität suspendiert
  4. Key-Pair-Auth nutzen - Bevorzuge Key Pair gegenüber Passwort-Authentifizierung
  5. Queries optimieren - Filtere und projiziere nur benötigte Spalten für schnellere Syncs
  6. Credits überwachen - Verfolge den Snowflake-Credit-Verbrauch für Integrations-Queries

Sicherheit

  • Key-Pair-Authentifizierung - RSA-2048-Bit-Verschlüsselung für API-Zugriff
  • OAuth 2.0 - Token-basierte Authentifizierung mit Rollen-Scoping
  • Network Policies - IP-Allowlisting für Tajo-Service-Endpoints
  • Rollenbasierter Zugriff - Dedizierte Snowflake-Rolle mit minimal nötigen Rechten
  • Verschlüsselte Datenübertragung - TLS 1.2+ für alle API-Kommunikationen
  • Data Masking - Nutze dynamisches Snowflake-Data-Masking für sensible Felder

Verwandte Ressourcen

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI-Assistent

Hallo! Fragen Sie mich alles über die Dokumentation.