Connettore Snowflake

Collega Snowflake a Brevo tramite Tajo per sincronizzare i segmenti cliente dal tuo data warehouse, arricchire i profili dei contatti con dati analitici e alimentare campagne marketing personalizzate con insight dal warehouse.

Panoramica

ProprietàValore
PiattaformaSnowflake
CategoriaData Warehouse (Custom)
Complessità di setupMedia
Integrazione ufficialeNo
Dati sincronizzatiClienti, Segmenti, Analytics, Eventi
Metodo di autenticazioneKey Pair / OAuth 2.0

Funzionalità

  • Reverse ETL - Invia segmenti cliente da Snowflake alle liste contatti Brevo
  • Sync delle audience - Sincronizza audience computate nel warehouse per campagne mirate
  • Arricchimento analytics - Arricchisci i contatti Brevo con metriche computate (LTV, score RFM)
  • Query SQL-based - Usa la REST API SQL di Snowflake per eseguire query in modo programmatico
  • Sync pianificato - Esegui pipeline dati automatizzate a intervalli configurabili
  • Supporto multi-statement - Esegui trasformazioni dati complesse in singole chiamate API

Prerequisiti

Prima di iniziare, assicurati di avere:

  1. Un account Snowflake con ruolo ACCOUNTADMIN o SYSADMIN
  2. Un account Brevo con accesso API
  3. Un account Tajo con permessi sui connettori
  4. Un warehouse Snowflake dedicato per le query di integrazione
  5. Una network policy che consente gli indirizzi IP Tajo

Autenticazione

Autenticazione Key Pair (consigliata)

Terminal window
# Genera la coppia di chiavi RSA
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assegna la chiave pubblica all'utente Snowflake
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

Autenticazione OAuth 2.0

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Autenticazione SQL API

Terminal window
# Usando un token JWT con la SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Configurazione

Setup di base

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Ogni 6 ore
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Mapping dei campi

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

Endpoint API

EndpointMetodoDescrizione
/api/v2/statementsPOSTInvia statement SQL per l’esecuzione
/api/v2/statements/{statementHandle}GETControlla lo stato di esecuzione
/api/v2/statements/{statementHandle}/cancelPOSTAnnulla uno statement in esecuzione
/api/v2/statements/{statementHandle}?partition={id}GETRecupera partizioni di risultato

Partizioni SQL API

La SQL API di Snowflake restituisce result set grandi in partizioni. Ogni partizione contiene fino a circa 12MB di dati. Usa il parametro partition per iterare sui risultati.

Esempi di codice

Inizializzare il connettore

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Sincronizzare i segmenti cliente tramite SQL API

// Esegue una query SQL tramite la REST API SQL di Snowflake
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Polling per i risultati
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sincronizza su Brevo tramite Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Pipeline Reverse ETL

// Invia audience computate da Snowflake alle liste Brevo
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Limiti di rate

RisorsaLimiteNote
Query concorrenti SQL API20 per utentePer account Snowflake
Dimensione risultato SQL API12MB per partizionePagina con ID partizione
Statement timeout172.800 sec (48h)Configurabile per query
Richieste APIVariabile per pianoIn base all’edizione Snowflake

Costi del warehouse

Snowflake addebita in base al tempo di compute. Usa un warehouse dedicato e dimensionato adeguatamente per le query Tajo e imposta l’auto-suspend per minimizzare i costi.

Risoluzione dei problemi

ProblemaCausaSoluzione
Autenticazione fallitaToken JWT scadutoRigenera il JWT con scadenza valida
Timeout della queryDataset grandeAggiungi filtri o usa sync incrementale
Errore di reteIP non in whitelistAggiungi gli IP Tajo alla network policy di Snowflake
Colonne mancantiCambio di schemaAggiorna la configurazione del field mapping
Errore di partizioneRisultato troppo grandeElabora i risultati in partizioni più piccole

Modalità debug

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Best practice

  1. Usa un warehouse dedicato - Evita contesa con i workload di produzione
  2. Implementa sync incrementale - Interroga solo i record modificati dal sync precedente
  3. Imposta auto-suspend - Configura il warehouse per sospendersi dopo 5 minuti di inattività
  4. Usa autenticazione key pair - Preferisci key pair all’autenticazione via password
  5. Ottimizza le query - Filtra e proietta solo le colonne necessarie per sync più veloci
  6. Monitora i crediti - Traccia il consumo di crediti Snowflake per le query di integrazione

Sicurezza

  • Autenticazione key pair - Cifratura RSA 2048-bit per l’accesso API
  • OAuth 2.0 - Autenticazione basata su token con scope di ruolo
  • Network policy - IP allowlisting per gli endpoint del servizio Tajo
  • Accesso basato su ruolo - Ruolo Snowflake dedicato con privilegi minimi necessari
  • Trasferimento dati cifrato - TLS 1.2+ per tutte le comunicazioni API
  • Data masking - Usa il dynamic data masking di Snowflake per campi sensibili

Risorse correlate

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
Assistente AI

Ciao! Chiedimi qualsiasi cosa sulla documentazione.