Snowflake konektor
Poveži Snowflake sa Brevo putem Tajo da sinhronizuješ korisničke segmente iz svog skladišta podataka, obogatiš profile kontakata analitičkim podacima i pokrećeš personalizovane marketing kampanje uvidima iz warehouse-a.
Pregled
| Svojstvo | Vrednost |
|---|---|
| Platforma | Snowflake |
| Kategorija | Skladište podataka (Prilagođeno) |
| Složenost podešavanja | Srednje |
| Zvanična integracija | Ne |
| Sinhronizovani podaci | Kupci, Segmenti, Analitika, Događaji |
| Metod autentifikacije | Key Pair / OAuth 2.0 |
Karakteristike
- Reverse ETL - Prosleđivanje korisničkih segmenata iz Snowflake na Brevo liste kontakata
- Sinhronizacija publike - Sinhronizacija publike izračunate u warehouse-u za ciljane kampanje
- Obogaćivanje analitikom - Obogaćivanje Brevo kontakata izračunatim metrikama (LTV, RFM rezultati)
- SQL-bazirani upiti - Korišćenje Snowflake SQL REST API-ja za programsko izvršavanje upita
- Zakazana sinhronizacija - Pokretanje automatizovanih pipeline-ova podataka na podesivim intervalima
- Podrška za više iskaza - Izvršavanje složenih transformacija podataka u jednom API pozivu
Preduslovi
Pre nego što započneš, proveri da imaš:
- Snowflake nalog sa ACCOUNTADMIN ili SYSADMIN ulogom
- Brevo nalog sa API pristupom
- Tajo nalog sa dozvolama konektora
- Namenski Snowflake warehouse za upite integracije
- Mrežnu politiku koja dozvoljava Tajo IP adrese
Autentifikacija
Autentifikacija ključnim parom (preporučeno)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0 autentifikacija
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Autentifikacija SQL API-ja
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Konfiguracija
Osnovno podešavanje
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncMapiranje polja
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI endpointi
| Endpoint | Metoda | Opis |
|---|---|---|
/api/v2/statements | POST | Slanje SQL iskaza za izvršavanje |
/api/v2/statements/{statementHandle} | GET | Provera statusa izvršavanja |
/api/v2/statements/{statementHandle}/cancel | POST | Otkazivanje aktivnog iskaza |
/api/v2/statements/{statementHandle}?partition={id} | GET | Preuzimanje particija rezultata |
SQL API particije
Snowflake SQL API vraća velike skupove rezultata u particijama. Svaka particija sadrži do oko 12MB podataka. Koristi parametar partition za iteraciju kroz rezultate.
Primeri koda
Inicijalizacija konektora
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Sinhronizacija korisničkih segmenata putem SQL API-ja
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL pipeline
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Ograničenja brzine
| Resurs | Limit | Napomene |
|---|---|---|
| SQL API istovremeni upiti | 20 po korisniku | Po Snowflake nalogu |
| SQL API veličina rezultata | 12MB po particiji | Paginiraj sa ID-ovima particija |
| Timeout iskaza | 172.800 sek (48h) | Podesivo po upitu |
| API zahtevi | Zavisi od plana | Prema Snowflake izdanju |
Troškovi warehouse-a
Snowflake naplaćuje prema vremenu računanja. Koristi namenski, odgovarajuće veličine warehouse za Tajo upite i postavi auto-suspend da minimizuješ troškove.
Rešavanje problema
| Problem | Uzrok | Rešenje |
|---|---|---|
| Autentifikacija neuspešna | Istekli JWT token | Regeneriši JWT sa važećim rokom isteka |
| Timeout upita | Veliki skup podataka | Dodaj filtere ili koristi inkrementalnu sinhronizaciju |
| Greška mreže | IP nije na listi dozvoljenih | Dodaj Tajo IP-ove u Snowflake mrežnu politiku |
| Nedostaju kolone | Promena šeme | Ažuriraj konfiguraciju mapiranja polja |
| Greška particije | Rezultat preveliki | Obradi rezultate u manjim particijama |
Debug režim
connectors: snowflake: debug: true log_level: verbose log_queries: trueNajbolje prakse
- Koristi namenski warehouse - Izbegni sukob sa produkcijskim opterećenjima
- Implementiraj inkrementalnu sinhronizaciju - Upiši samo izmenjene zapise od poslednje sinhronizacije
- Postavi auto-suspend - Konfiguriši warehouse da se suspenduje nakon 5 minuta neaktivnosti
- Koristi autentifikaciju ključnim parom - Preferiraj ključni par umesto lozinke
- Optimizuj upite - Filtriraj i projektuj samo potrebne kolone za brže sinhronizacije
- Prati kredite - Prati potrošnju Snowflake kredita za upite integracije
Bezbednost
- Autentifikacija ključnim parom - RSA 2048-bit enkripcija za API pristup
- OAuth 2.0 - Autentifikacija zasnovana na tokenima sa skopovanjem uloga
- Mrežne politike - IP allowlisting za Tajo servisne endpointe
- Pristup zasnovan na ulogama - Namenski Snowflake ulog sa minimalnim potrebnim privilegijama
- Enkriptovani prenos podataka - TLS 1.2+ za svu API komunikaciju
- Maskiranje podataka - Korišćenje Snowflake dinamičkog maskiranja podataka za osetljiva polja