Snowflake Connector
Poveži Snowflake z Brevo prek Tajo za sinhronizacijo segmentov strank iz podatkovnega skladišča, obogatitev profilov stikov z analitičnimi podatki in napajanje personaliziranih marketinških kampanj z vpogledi iz skladišča.
Pregled
| Lastnost | Vrednost |
|---|---|
| Platforma | Snowflake |
| Kategorija | Podatkovno skladišče (po meri) |
| Zahtevnost nastavitve | Srednja |
| Uradna integracija | Ne |
| Sinhronizirani podatki | Stranke, segmenti, analitika, dogodki |
| Metoda avtentikacije | Key Pair / OAuth 2.0 |
Funkcionalnosti
- Reverse ETL – potisni segmente strank iz Snowflake v sezname stikov Brevo
- Sinhronizacija občinstva – sinhroniziraj občinstva, izračunana v skladišču, za ciljane kampanje
- Obogatitev z analitiko – obogati stike Brevo z izračunanimi metrikami (LTV, RFM ocene)
- Poizvedbe na osnovi SQL – z REST API Snowflake SQL programsko izvajaj poizvedbe
- Načrtovana sinhronizacija – poganjaj avtomatizirane podatkovne cevovode v nastavljenih intervalih
- Podpora za več stavkov – v enem klicu API izvedi kompleksne pretvorbe podatkov
Predpogoji
Preden začneš, se prepričaj, da imaš:
- Račun Snowflake z vlogo ACCOUNTADMIN ali SYSADMIN
- Brevo račun z dostopom do API
- Tajo račun z dovoljenji za konektorje
- Namensko skladišče Snowflake za integracijske poizvedbe
- Omrežno politiko, ki dovoljuje IP naslove Tajo
Avtentikacija
Avtentikacija z gonilnim parom ključev (priporočeno)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';Avtentikacija OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Avtentikacija SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Konfiguracija
Osnovna nastavitev
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncPreslikava polj
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEKončne točke API
| Končna točka | Metoda | Opis |
|---|---|---|
/api/v2/statements | POST | Pošlji SQL stavke za izvajanje |
/api/v2/statements/{statementHandle} | GET | Preveri status izvajanja |
/api/v2/statements/{statementHandle}/cancel | POST | Prekliči tekoči stavek |
/api/v2/statements/{statementHandle}?partition={id} | GET | Pridobi particije rezultatov |
Particije SQL API
Snowflake SQL API vrne velike nabore rezultatov v particijah. Vsaka particija vsebuje do približno 12 MB podatkov. Za iteracijo skozi rezultate uporabi parameter particije.
Primeri kode
Inicializacija konektorja
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Sinhronizacija segmentov strank prek SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL cevovod
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Omejitve hitrosti
| Vir | Omejitev | Opombe |
|---|---|---|
| Vzporedne poizvedbe SQL API | 20 na uporabnika | Na račun Snowflake |
| Velikost rezultatov SQL API | 12 MB na particijo | Paginiraj z ID-ji particij |
| Čakalna doba stavka | 172.800 s (48 ur) | Nastavljivo na poizvedbo |
| Zahtevki API | Odvisno od plana | Glede na izdajo Snowflake |
Stroški skladišča
Snowflake zaračunava glede na čas računanja. Za poizvedbe Tajo uporabi namensko, primerno veliko skladišče in nastavi samodejno prekinitev za zmanjšanje stroškov.
Odpravljanje težav
| Težava | Vzrok | Rešitev |
|---|---|---|
| Avtentikacija neuspešna | Potekel JWT žeton | Znova ustvari JWT z veljavnim rokom poteka |
| Časovna omejitev poizvedbe | Velik nabor podatkov | Dodaj filtre ali uporabi postopno sinhronizacijo |
| Omrežna napaka | IP ni na belem seznamu | Dodaj IP naslove Tajo v omrežno politiko Snowflake |
| Manjkajoči stolpci | Sprememba sheme | Posodobi konfiguracijo preslikave polj |
| Napaka particije | Preveč velik rezultat | Obdeluj rezultate v manjših particijah |
Način odpravljanja napak
connectors: snowflake: debug: true log_level: verbose log_queries: trueNajboljše prakse
- Uporabi namensko skladišče – izogni se tekmovanju s produkcijskimi obremenitvami
- Implementiraj postopno sinhronizacijo – poizveduj samo zapise, ki so se spremenili od zadnje sinhronizacije
- Nastavi samodejno prekinitev – konfiguriraj skladišče, da se po 5 minutah nedejavnosti prekine
- Uporabi avtentikacijo z gonilnim parom – daj prednost paru ključev pred avtentikacijo z geslom
- Optimiziraj poizvedbe – za hitrejšo sinhronizacijo filtriraj in projiciraj samo potrebne stolpce
- Spremljaj kredite – sledi porabi kreditov Snowflake za integracijske poizvedbe
Varnost
- Avtentikacija z gonilnim parom – RSA 2048-bitno šifriranje za dostop do API
- OAuth 2.0 – avtentikacija na osnovi žetonov z obsegom vlog
- Omrežne politike – seznam dovoljenih IP za končne točke storitve Tajo
- Dostop na osnovi vlog – namenjena vloga Snowflake z minimalnimi zahtevanimi pravicami
- Šifrirani prenos podatkov – TLS 1.2+ za vse komunikacije API
- Maskiranje podatkov – za občutljiva polja uporabi dinamično maskiranje podatkov Snowflake