Snowflake konektor

Poveži Snowflake sa Brevo putem Tajo da sinhronizuješ korisničke segmente iz svog skladišta podataka, obogatiš profile kontakata analitičkim podacima i pokrećeš personalizovane marketing kampanje uvidima iz warehouse-a.

Pregled

SvojstvoVrednost
PlatformaSnowflake
KategorijaSkladište podataka (Prilagođeno)
Složenost podešavanjaSrednje
Zvanična integracijaNe
Sinhronizovani podaciKupci, Segmenti, Analitika, Događaji
Metod autentifikacijeKey Pair / OAuth 2.0

Karakteristike

  • Reverse ETL - Prosleđivanje korisničkih segmenata iz Snowflake na Brevo liste kontakata
  • Sinhronizacija publike - Sinhronizacija publike izračunate u warehouse-u za ciljane kampanje
  • Obogaćivanje analitikom - Obogaćivanje Brevo kontakata izračunatim metrikama (LTV, RFM rezultati)
  • SQL-bazirani upiti - Korišćenje Snowflake SQL REST API-ja za programsko izvršavanje upita
  • Zakazana sinhronizacija - Pokretanje automatizovanih pipeline-ova podataka na podesivim intervalima
  • Podrška za više iskaza - Izvršavanje složenih transformacija podataka u jednom API pozivu

Preduslovi

Pre nego što započneš, proveri da imaš:

  1. Snowflake nalog sa ACCOUNTADMIN ili SYSADMIN ulogom
  2. Brevo nalog sa API pristupom
  3. Tajo nalog sa dozvolama konektora
  4. Namenski Snowflake warehouse za upite integracije
  5. Mrežnu politiku koja dozvoljava Tajo IP adrese

Autentifikacija

Autentifikacija ključnim parom (preporučeno)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0 autentifikacija

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Autentifikacija SQL API-ja

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Konfiguracija

Osnovno podešavanje

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Mapiranje polja

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API endpointi

EndpointMetodaOpis
/api/v2/statementsPOSTSlanje SQL iskaza za izvršavanje
/api/v2/statements/{statementHandle}GETProvera statusa izvršavanja
/api/v2/statements/{statementHandle}/cancelPOSTOtkazivanje aktivnog iskaza
/api/v2/statements/{statementHandle}?partition={id}GETPreuzimanje particija rezultata

SQL API particije

Snowflake SQL API vraća velike skupove rezultata u particijama. Svaka particija sadrži do oko 12MB podataka. Koristi parametar partition za iteraciju kroz rezultate.

Primeri koda

Inicijalizacija konektora

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Sinhronizacija korisničkih segmenata putem SQL API-ja

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse ETL pipeline

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Ograničenja brzine

ResursLimitNapomene
SQL API istovremeni upiti20 po korisnikuPo Snowflake nalogu
SQL API veličina rezultata12MB po particijiPaginiraj sa ID-ovima particija
Timeout iskaza172.800 sek (48h)Podesivo po upitu
API zahteviZavisi od planaPrema Snowflake izdanju

Troškovi warehouse-a

Snowflake naplaćuje prema vremenu računanja. Koristi namenski, odgovarajuće veličine warehouse za Tajo upite i postavi auto-suspend da minimizuješ troškove.

Rešavanje problema

ProblemUzrokRešenje
Autentifikacija neuspešnaIstekli JWT tokenRegeneriši JWT sa važećim rokom isteka
Timeout upitaVeliki skup podatakaDodaj filtere ili koristi inkrementalnu sinhronizaciju
Greška mrežeIP nije na listi dozvoljenihDodaj Tajo IP-ove u Snowflake mrežnu politiku
Nedostaju kolonePromena šemeAžuriraj konfiguraciju mapiranja polja
Greška particijeRezultat prevelikiObradi rezultate u manjim particijama

Debug režim

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Najbolje prakse

  1. Koristi namenski warehouse - Izbegni sukob sa produkcijskim opterećenjima
  2. Implementiraj inkrementalnu sinhronizaciju - Upiši samo izmenjene zapise od poslednje sinhronizacije
  3. Postavi auto-suspend - Konfiguriši warehouse da se suspenduje nakon 5 minuta neaktivnosti
  4. Koristi autentifikaciju ključnim parom - Preferiraj ključni par umesto lozinke
  5. Optimizuj upite - Filtriraj i projektuj samo potrebne kolone za brže sinhronizacije
  6. Prati kredite - Prati potrošnju Snowflake kredita za upite integracije

Bezbednost

  • Autentifikacija ključnim parom - RSA 2048-bit enkripcija za API pristup
  • OAuth 2.0 - Autentifikacija zasnovana na tokenima sa skopovanjem uloga
  • Mrežne politike - IP allowlisting za Tajo servisne endpointe
  • Pristup zasnovan na ulogama - Namenski Snowflake ulog sa minimalnim potrebnim privilegijama
  • Enkriptovani prenos podataka - TLS 1.2+ za svu API komunikaciju
  • Maskiranje podataka - Korišćenje Snowflake dinamičkog maskiranja podataka za osetljiva polja

Povezani resursi

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI асистент

Здраво! Питајте ме о документацији.