Conector Snowflake
Conectați Snowflake la Brevo prin Tajo pentru a sincroniza segmentele de clienți din depozitul de date, a îmbogăți profilurile de contact cu date analitice și a alimenta campanii de marketing personalizate cu informații bazate pe depozit.
Prezentare generală
| Proprietate | Valoare |
|---|---|
| Platformă | Snowflake |
| Categorie | Depozit de date (Personalizat) |
| Complexitate configurare | Medie |
| Integrare oficială | Nu |
| Date sincronizate | Clienți, Segmente, Analize, Evenimente |
| Metodă de autentificare | Pereche de chei / OAuth 2.0 |
Funcționalități
- Reverse ETL - Transmiteți segmentele de clienți din Snowflake la listele de contacte Brevo
- Sincronizare audiențe - Sincronizați audiențele calculate în depozit pentru campanii targetate
- Îmbogățire cu analize - Îmbogățiți contactele Brevo cu metrici calculate (LTV, scoruri RFM)
- Interogări bazate pe SQL - Utilizați Snowflake SQL REST API pentru a executa interogări programatic
- Sincronizare programată - Rulați pipeline-uri de date automatizate la intervale configurabile
- Suport instrucțiuni multiple - Executați transformări complexe de date într-un singur apel API
Cerințe preliminare
Înainte de a începe, asigurați-vă că aveți:
- Un cont Snowflake cu rolul ACCOUNTADMIN sau SYSADMIN
- Un cont Brevo cu acces API
- Un cont Tajo cu permisiuni de conector
- Un depozit Snowflake dedicat pentru interogările de integrare
- Politică de rețea care permite adresele IP Tajo
Autentificare
Autentificare cu pereche de chei (Recomandat)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';Autentificare OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Autentificare SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Configurare
Configurare de bază
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncMapare câmpuri
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEPuncte finale API
| Punct final | Metodă | Descriere |
|---|---|---|
/api/v2/statements | POST | Trimite instrucțiuni SQL pentru execuție |
/api/v2/statements/{statementHandle} | GET | Verifică statusul execuției |
/api/v2/statements/{statementHandle}/cancel | POST | Anulează o instrucțiune în curs |
/api/v2/statements/{statementHandle}?partition={id} | GET | Recuperează partițiile de rezultate |
Partiții SQL API
Snowflake SQL API returnează seturi de rezultate mari în partiții. Fiecare partiție conține aproximativ 12MB de date. Utilizați parametrul partition pentru a itera prin rezultate.
Exemple de cod
Inițializare conector
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Sincronizare segmente clienți prin SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Pipeline Reverse ETL
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Limite de rată
| Resursă | Limită | Note |
|---|---|---|
| Interogări concurente SQL API | 20 per utilizator | Per cont Snowflake |
| Dimensiune rezultat SQL API | 12MB per partiție | Paginați cu ID-uri de partiție |
| Timeout instrucțiune | 172.800 sec (48h) | Configurabil per interogare |
| Cereri API | Variază după plan | Bazat pe ediția Snowflake |
Costuri depozit
Snowflake taxează în funcție de timpul de calcul. Utilizați un depozit dedicat, de dimensiune corespunzătoare pentru interogările Tajo și setați auto-suspend pentru a minimiza costurile.
Depanare
| Problemă | Cauză | Soluție |
|---|---|---|
| Autentificare eșuată | Token JWT expirat | Regenerați JWT cu expirare validă |
| Timeout interogare | Set de date mare | Adăugați filtre sau utilizați sincronizare incrementală |
| Eroare de rețea | IP neadăugat la lista albă | Adăugați IP-urile Tajo la politica de rețea Snowflake |
| Coloane lipsă | Modificare schemă | Actualizați configurația mapării câmpurilor |
| Eroare partiție | Rezultate prea mari | Procesați rezultatele în partiții mai mici |
Modul de depanare
connectors: snowflake: debug: true log_level: verbose log_queries: trueBune practici
- Utilizați un depozit dedicat - Evitați contenciunea cu volumele de lucru de producție
- Implementați sincronizare incrementală - Interogați doar înregistrările modificate de la ultima sincronizare
- Setați auto-suspend - Configurați depozitul să se suspende după 5 minute de inactivitate
- Utilizați autentificarea cu pereche de chei - Preferați perechea de chei față de autentificarea cu parolă
- Optimizați interogările - Filtrați și proiectați doar coloanele necesare pentru sincronizări mai rapide
- Monitorizați creditele - Urmăriți consumul de credite Snowflake pentru interogările de integrare
Securitate
- Autentificare cu pereche de chei - Criptare RSA 2048-bit pentru accesul API
- OAuth 2.0 - Autentificare bazată pe token cu scopare pe rol
- Politici de rețea - Allowlisting IP pentru punctele finale ale serviciului Tajo
- Acces bazat pe rol - Rol Snowflake dedicat cu privilegii minime necesare
- Transfer de date criptat - TLS 1.2+ pentru toate comunicațiile API
- Mascare date - Utilizați mascarea dinamică a datelor Snowflake pentru câmpurile sensibile