Snowflake-connector

Verbind Snowflake met Brevo via Tajo om klantsegmenten uit je data warehouse te synchroniseren, contactprofielen te verrijken met analyticsdata en gepersonaliseerde marketingcampagnes aan te sturen met warehouse-gedreven inzichten.

Overzicht

EigenschapWaarde
PlatformSnowflake
CategorieData Warehouse (Custom)
SetupcomplexiteitGemiddeld
Officiële integratieNee
Gesynchroniseerde dataKlanten, Segmenten, Analytics, Events
Auth-methodeKey Pair / OAuth 2.0

Functies

  • Reverse ETL - Push klantsegmenten vanuit Snowflake naar Brevo-contactlijsten
  • Audience-synchronisatie - Synchroniseer warehouse-berekende audiences voor gerichte campagnes
  • Analytics-verrijking - Verrijk Brevo-contacten met berekende metrics (LTV, RFM-scores)
  • SQL-gebaseerde queries - Gebruik de Snowflake SQL REST API om queries programmatisch uit te voeren
  • Geplande synchronisatie - Voer geautomatiseerde data-pipelines uit op configureerbare intervallen
  • Multi-statement-ondersteuning - Voer complexe datatransformaties uit in één API-call

Vereisten

Voordat je begint, zorg dat je beschikt over:

  1. Een Snowflake-account met de rol ACCOUNTADMIN of SYSADMIN
  2. Een Brevo-account met API-toegang
  3. Een Tajo-account met connector-rechten
  4. Een dedicated Snowflake-warehouse voor integratiequeries
  5. Network policy die Tajo IP-adressen toestaat

Authenticatie

Key Pair-authenticatie (aanbevolen)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0-authenticatie

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

SQL API-authenticatie

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Configuratie

Basisinstelling

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Veldmapping

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API-endpoints

EndpointMethodBeschrijving
/api/v2/statementsPOSTSQL-statements indienen voor uitvoering
/api/v2/statements/{statementHandle}GETUitvoeringsstatus controleren
/api/v2/statements/{statementHandle}/cancelPOSTEen lopend statement annuleren
/api/v2/statements/{statementHandle}?partition={id}GETResultaatpartities ophalen

SQL API-partities

De Snowflake SQL API retourneert grote resultaatsets in partities. Elke partitie bevat tot ongeveer 12MB aan data. Gebruik de partition-parameter om door resultaten te itereren.

Codevoorbeelden

Connector initialiseren

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Klantsegmenten synchroniseren via SQL API

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse ETL-pipeline

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Rate limits

ResourceLimietOpmerkingen
Gelijktijdige SQL API-queries20 per gebruikerPer Snowflake-account
SQL API-resultaatgrootte12MB per partitiePagineer met partition-ID’s
Statement-timeout172.800 sec (48u)Configureerbaar per query
API-requestsVariabel per planAfhankelijk van Snowflake-editie

Warehouse-kosten

Snowflake rekent op basis van rekentijd. Gebruik een dedicated, passend gedimensioneerd warehouse voor Tajo-queries en stel auto-suspend in om kosten te minimaliseren.

Probleemoplossing

ProbleemOorzaakOplossing
Authenticatie misluktVerlopen JWT-tokenGenereer nieuwe JWT met geldige expiratie
Query-timeoutGrote datasetVoeg filters toe of gebruik incrementele synchronisatie
NetwerkfoutIP niet whitelistedVoeg Tajo-IP’s toe aan Snowflake network policy
Ontbrekende kolommenSchema-wijzigingWerk veldmapping-configuratie bij
PartitiefoutResultaat te grootVerwerk resultaten in kleinere partities

Debugmodus

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Best practices

  1. Gebruik een dedicated warehouse - Voorkom contentie met productieworkloads
  2. Implementeer incrementele synchronisatie - Vraag alleen gewijzigde records sinds laatste synchronisatie op
  3. Stel auto-suspend in - Configureer warehouse om na 5 minuten inactiviteit te suspenden
  4. Gebruik key pair-auth - Geef voorkeur aan key pair boven wachtwoordauthenticatie
  5. Optimaliseer queries - Filter en projecteer alleen benodigde kolommen voor snellere syncs
  6. Monitor credits - Volg het verbruik van Snowflake-credits voor integratiequeries

Beveiliging

  • Key pair-authenticatie - RSA 2048-bit-encryptie voor API-toegang
  • OAuth 2.0 - Token-gebaseerde authenticatie met rolscoping
  • Network policies - IP-allowlisting voor Tajo-service-endpoints
  • Rolgebaseerde toegang - Dedicated Snowflake-rol met minimaal vereiste rechten
  • Versleutelde dataoverdracht - TLS 1.2+ voor alle API-communicatie
  • Data masking - Gebruik Snowflake dynamic data masking voor gevoelige velden

Gerelateerde bronnen

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI-assistent

Hallo! Stel me vragen over de documentatie.