Snowflake Connector

Poveži Snowflake z Brevo prek Tajo za sinhronizacijo segmentov strank iz podatkovnega skladišča, obogatitev profilov stikov z analitičnimi podatki in napajanje personaliziranih marketinških kampanj z vpogledi iz skladišča.

Pregled

LastnostVrednost
PlatformaSnowflake
KategorijaPodatkovno skladišče (po meri)
Zahtevnost nastavitveSrednja
Uradna integracijaNe
Sinhronizirani podatkiStranke, segmenti, analitika, dogodki
Metoda avtentikacijeKey Pair / OAuth 2.0

Funkcionalnosti

  • Reverse ETL – potisni segmente strank iz Snowflake v sezname stikov Brevo
  • Sinhronizacija občinstva – sinhroniziraj občinstva, izračunana v skladišču, za ciljane kampanje
  • Obogatitev z analitiko – obogati stike Brevo z izračunanimi metrikami (LTV, RFM ocene)
  • Poizvedbe na osnovi SQL – z REST API Snowflake SQL programsko izvajaj poizvedbe
  • Načrtovana sinhronizacija – poganjaj avtomatizirane podatkovne cevovode v nastavljenih intervalih
  • Podpora za več stavkov – v enem klicu API izvedi kompleksne pretvorbe podatkov

Predpogoji

Preden začneš, se prepričaj, da imaš:

  1. Račun Snowflake z vlogo ACCOUNTADMIN ali SYSADMIN
  2. Brevo račun z dostopom do API
  3. Tajo račun z dovoljenji za konektorje
  4. Namensko skladišče Snowflake za integracijske poizvedbe
  5. Omrežno politiko, ki dovoljuje IP naslove Tajo

Avtentikacija

Avtentikacija z gonilnim parom ključev (priporočeno)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

Avtentikacija OAuth 2.0

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Avtentikacija SQL API

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Konfiguracija

Osnovna nastavitev

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Preslikava polj

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

Končne točke API

Končna točkaMetodaOpis
/api/v2/statementsPOSTPošlji SQL stavke za izvajanje
/api/v2/statements/{statementHandle}GETPreveri status izvajanja
/api/v2/statements/{statementHandle}/cancelPOSTPrekliči tekoči stavek
/api/v2/statements/{statementHandle}?partition={id}GETPridobi particije rezultatov

Particije SQL API

Snowflake SQL API vrne velike nabore rezultatov v particijah. Vsaka particija vsebuje do približno 12 MB podatkov. Za iteracijo skozi rezultate uporabi parameter particije.

Primeri kode

Inicializacija konektorja

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Sinhronizacija segmentov strank prek SQL API

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse ETL cevovod

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Omejitve hitrosti

VirOmejitevOpombe
Vzporedne poizvedbe SQL API20 na uporabnikaNa račun Snowflake
Velikost rezultatov SQL API12 MB na particijoPaginiraj z ID-ji particij
Čakalna doba stavka172.800 s (48 ur)Nastavljivo na poizvedbo
Zahtevki APIOdvisno od planaGlede na izdajo Snowflake

Stroški skladišča

Snowflake zaračunava glede na čas računanja. Za poizvedbe Tajo uporabi namensko, primerno veliko skladišče in nastavi samodejno prekinitev za zmanjšanje stroškov.

Odpravljanje težav

TežavaVzrokRešitev
Avtentikacija neuspešnaPotekel JWT žetonZnova ustvari JWT z veljavnim rokom poteka
Časovna omejitev poizvedbeVelik nabor podatkovDodaj filtre ali uporabi postopno sinhronizacijo
Omrežna napakaIP ni na belem seznamuDodaj IP naslove Tajo v omrežno politiko Snowflake
Manjkajoči stolpciSprememba shemePosodobi konfiguracijo preslikave polj
Napaka particijePreveč velik rezultatObdeluj rezultate v manjših particijah

Način odpravljanja napak

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Najboljše prakse

  1. Uporabi namensko skladišče – izogni se tekmovanju s produkcijskimi obremenitvami
  2. Implementiraj postopno sinhronizacijo – poizveduj samo zapise, ki so se spremenili od zadnje sinhronizacije
  3. Nastavi samodejno prekinitev – konfiguriraj skladišče, da se po 5 minutah nedejavnosti prekine
  4. Uporabi avtentikacijo z gonilnim parom – daj prednost paru ključev pred avtentikacijo z geslom
  5. Optimiziraj poizvedbe – za hitrejšo sinhronizacijo filtriraj in projiciraj samo potrebne stolpce
  6. Spremljaj kredite – sledi porabi kreditov Snowflake za integracijske poizvedbe

Varnost

  • Avtentikacija z gonilnim parom – RSA 2048-bitno šifriranje za dostop do API
  • OAuth 2.0 – avtentikacija na osnovi žetonov z obsegom vlog
  • Omrežne politike – seznam dovoljenih IP za končne točke storitve Tajo
  • Dostop na osnovi vlog – namenjena vloga Snowflake z minimalnimi zahtevanimi pravicami
  • Šifrirani prenos podatkov – TLS 1.2+ za vse komunikacije API
  • Maskiranje podatkov – za občutljiva polja uporabi dinamično maskiranje podatkov Snowflake

Povezani viri

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI pomočnik

Živjo! Vprašajte me o dokumentaciji.