Snowflake-connector
Verbind Snowflake met Brevo via Tajo om klantsegmenten uit je data warehouse te synchroniseren, contactprofielen te verrijken met analyticsdata en gepersonaliseerde marketingcampagnes aan te sturen met warehouse-gedreven inzichten.
Overzicht
| Eigenschap | Waarde |
|---|---|
| Platform | Snowflake |
| Categorie | Data Warehouse (Custom) |
| Setupcomplexiteit | Gemiddeld |
| Officiële integratie | Nee |
| Gesynchroniseerde data | Klanten, Segmenten, Analytics, Events |
| Auth-methode | Key Pair / OAuth 2.0 |
Functies
- Reverse ETL - Push klantsegmenten vanuit Snowflake naar Brevo-contactlijsten
- Audience-synchronisatie - Synchroniseer warehouse-berekende audiences voor gerichte campagnes
- Analytics-verrijking - Verrijk Brevo-contacten met berekende metrics (LTV, RFM-scores)
- SQL-gebaseerde queries - Gebruik de Snowflake SQL REST API om queries programmatisch uit te voeren
- Geplande synchronisatie - Voer geautomatiseerde data-pipelines uit op configureerbare intervallen
- Multi-statement-ondersteuning - Voer complexe datatransformaties uit in één API-call
Vereisten
Voordat je begint, zorg dat je beschikt over:
- Een Snowflake-account met de rol ACCOUNTADMIN of SYSADMIN
- Een Brevo-account met API-toegang
- Een Tajo-account met connector-rechten
- Een dedicated Snowflake-warehouse voor integratiequeries
- Network policy die Tajo IP-adressen toestaat
Authenticatie
Key Pair-authenticatie (aanbevolen)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0-authenticatie
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });SQL API-authenticatie
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Configuratie
Basisinstelling
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncVeldmapping
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI-endpoints
| Endpoint | Method | Beschrijving |
|---|---|---|
/api/v2/statements | POST | SQL-statements indienen voor uitvoering |
/api/v2/statements/{statementHandle} | GET | Uitvoeringsstatus controleren |
/api/v2/statements/{statementHandle}/cancel | POST | Een lopend statement annuleren |
/api/v2/statements/{statementHandle}?partition={id} | GET | Resultaatpartities ophalen |
SQL API-partities
De Snowflake SQL API retourneert grote resultaatsets in partities. Elke partitie bevat tot ongeveer 12MB aan data. Gebruik de partition-parameter om door resultaten te itereren.
Codevoorbeelden
Connector initialiseren
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Klantsegmenten synchroniseren via SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL-pipeline
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Rate limits
| Resource | Limiet | Opmerkingen |
|---|---|---|
| Gelijktijdige SQL API-queries | 20 per gebruiker | Per Snowflake-account |
| SQL API-resultaatgrootte | 12MB per partitie | Pagineer met partition-ID’s |
| Statement-timeout | 172.800 sec (48u) | Configureerbaar per query |
| API-requests | Variabel per plan | Afhankelijk van Snowflake-editie |
Warehouse-kosten
Snowflake rekent op basis van rekentijd. Gebruik een dedicated, passend gedimensioneerd warehouse voor Tajo-queries en stel auto-suspend in om kosten te minimaliseren.
Probleemoplossing
| Probleem | Oorzaak | Oplossing |
|---|---|---|
| Authenticatie mislukt | Verlopen JWT-token | Genereer nieuwe JWT met geldige expiratie |
| Query-timeout | Grote dataset | Voeg filters toe of gebruik incrementele synchronisatie |
| Netwerkfout | IP niet whitelisted | Voeg Tajo-IP’s toe aan Snowflake network policy |
| Ontbrekende kolommen | Schema-wijziging | Werk veldmapping-configuratie bij |
| Partitiefout | Resultaat te groot | Verwerk resultaten in kleinere partities |
Debugmodus
connectors: snowflake: debug: true log_level: verbose log_queries: trueBest practices
- Gebruik een dedicated warehouse - Voorkom contentie met productieworkloads
- Implementeer incrementele synchronisatie - Vraag alleen gewijzigde records sinds laatste synchronisatie op
- Stel auto-suspend in - Configureer warehouse om na 5 minuten inactiviteit te suspenden
- Gebruik key pair-auth - Geef voorkeur aan key pair boven wachtwoordauthenticatie
- Optimaliseer queries - Filter en projecteer alleen benodigde kolommen voor snellere syncs
- Monitor credits - Volg het verbruik van Snowflake-credits voor integratiequeries
Beveiliging
- Key pair-authenticatie - RSA 2048-bit-encryptie voor API-toegang
- OAuth 2.0 - Token-gebaseerde authenticatie met rolscoping
- Network policies - IP-allowlisting voor Tajo-service-endpoints
- Rolgebaseerde toegang - Dedicated Snowflake-rol met minimaal vereiste rechten
- Versleutelde dataoverdracht - TLS 1.2+ voor alle API-communicatie
- Data masking - Gebruik Snowflake dynamic data masking voor gevoelige velden