Konektor Snowflake

Połącz Snowflake z Brevo przez Tajo, aby synchronizować segmenty klientów z hurtowni danych, wzbogacać profile kontaktów danymi analitycznymi i zasilać spersonalizowane kampanie marketingowe wglądami opartymi na hurtowni.

Przegląd

WłaściwośćWartość
PlatformaSnowflake
KategoriaHurtownia danych (Niestandardowa)
Poziom konfiguracjiŚredni
Integracja oficjalnaNie
Synchronizowane daneKlienci, Segmenty, Analityka, Zdarzenia
Metoda uwierzytelnianiaPara kluczy / OAuth 2.0

Funkcje

  • Reverse ETL - Wypychaj segmenty klientów ze Snowflake do list kontaktów Brevo
  • Synchronizacja odbiorców - Synchronizuj odbiorców obliczonych w hurtowni dla ukierunkowanych kampanii
  • Wzbogacanie analityczne - Wzbogacaj kontakty Brevo obliczonymi metrykami (LTV, wyniki RFM)
  • Zapytania SQL - Używaj Snowflake SQL REST API do wykonywania zapytań programistycznie
  • Zaplanowana synchronizacja - Uruchamiaj zautomatyzowane pipeline danych w konfigurowalnych interwałach
  • Obsługa wielu instrukcji - Wykonuj złożone transformacje danych w pojedynczych wywołaniach API

Wymagania wstępne

Zanim zaczniesz, upewnij się, że masz:

  1. Konto Snowflake z rolą ACCOUNTADMIN lub SYSADMIN
  2. Konto Brevo z dostępem do API
  3. Konto Tajo z uprawnieniami konektora
  4. Dedykowany magazyn Snowflake dla zapytań integracyjnych
  5. Politykę sieciową zezwalającą na adresy IP Tajo

Uwierzytelnianie

Uwierzytelnianie parą kluczy (zalecane)

Terminal window
# Generate RSA key pair
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user
# In Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

Uwierzytelnianie OAuth 2.0

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Uwierzytelnianie SQL API

Terminal window
# Using JWT token with the SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Konfiguracja

Podstawowa konfiguracja

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Every 6 hours
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Mapowanie pól

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

Endpointy API

EndpointMetodaOpis
/api/v2/statementsPOSTWyślij instrukcje SQL do wykonania
/api/v2/statements/{statementHandle}GETSprawdź status wykonania
/api/v2/statements/{statementHandle}/cancelPOSTAnuluj uruchomioną instrukcję
/api/v2/statements/{statementHandle}?partition={id}GETPobierz partycje wyników

Partycje SQL API

Snowflake SQL API zwraca duże zestawy wyników w partycjach. Każda partycja zawiera do około 12 MB danych. Używaj parametru partition do iteracji przez wyniki.

Przykłady kodu

Inicjalizacja konektora

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Synchronizacja segmentów klientów przez SQL API

// Execute a SQL query via Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Poll for results
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Sync to Brevo via Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Pipeline Reverse ETL

// Push computed audiences from Snowflake to Brevo lists
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Limity API

ZasóbLimitUwagi
Jednoczesne zapytania SQL API20 na użytkownikaNa konto Snowflake
Rozmiar wyników SQL API12 MB na partycjęPaginacja z ID partycji
Timeout instrukcji172 800 sek (48h)Konfigurowalny na zapytanie
Żądania APIZależy od planuZależy od edycji Snowflake

Koszty magazynu

Snowflake nalicza opłaty na podstawie czasu obliczeń. Używaj dedykowanego, odpowiednio dobranego magazynu dla zapytań Tajo i ustaw auto-zawieszanie, aby minimalizować koszty.

Rozwiązywanie problemów

ProblemPrzyczynaRozwiązanie
Błąd uwierzytelnianiaWygasły token JWTWygeneruj ponownie JWT z ważną datą wygaśnięcia
Timeout zapytaniaDuży zestaw danychDodaj filtry lub użyj synchronizacji przyrostowej
Błąd sieciIP nie na białej liścieDodaj IP Tajo do polityki sieciowej Snowflake
Brakujące kolumnyZmiana schematuZaktualizuj konfigurację mapowania pól
Błąd partycjiZbyt duże wynikiPrzetwarzaj wyniki w mniejszych partycjach

Tryb debug

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Najlepsze praktyki

  1. Używaj dedykowanego magazynu - Unikaj rywalizacji z obciążeniami produkcyjnymi
  2. Wdrożyj synchronizację przyrostową - Zapytuj tylko o zmienione rekordy od ostatniej synchronizacji
  3. Ustaw auto-zawieszanie - Skonfiguruj magazyn do zawieszenia po 5 minutach bezczynności
  4. Używaj uwierzytelniania parą kluczy - Preferuj parę kluczy zamiast uwierzytelniania hasłem
  5. Optymalizuj zapytania - Filtruj i projektuj tylko potrzebne kolumny dla szybszych synchronizacji
  6. Monitoruj kredyty - Śledź zużycie kredytów Snowflake dla zapytań integracyjnych

Bezpieczeństwo

  • Uwierzytelnianie parą kluczy - Szyfrowanie RSA 2048-bit dla dostępu do API
  • OAuth 2.0 - Uwierzytelnianie oparte na tokenach z zakresem roli
  • Polityki sieciowe - Allowlisting IP dla punktów końcowych usługi Tajo
  • Dostęp oparty na rolach - Dedykowana rola Snowflake z minimalnymi wymaganymi uprawnieniami
  • Szyfrowany transfer danych - TLS 1.2+ dla całej komunikacji API
  • Maskowanie danych - Używaj dynamicznego maskowania danych Snowflake dla wrażliwych pól

Powiązane zasoby

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
Asystent AI

Cześć! Zapytaj mnie o dokumentację.