Konektor Snowflake
Połącz Snowflake z Brevo przez Tajo, aby synchronizować segmenty klientów z hurtowni danych, wzbogacać profile kontaktów danymi analitycznymi i zasilać spersonalizowane kampanie marketingowe wglądami opartymi na hurtowni.
Przegląd
| Właściwość | Wartość |
|---|---|
| Platforma | Snowflake |
| Kategoria | Hurtownia danych (Niestandardowa) |
| Poziom konfiguracji | Średni |
| Integracja oficjalna | Nie |
| Synchronizowane dane | Klienci, Segmenty, Analityka, Zdarzenia |
| Metoda uwierzytelniania | Para kluczy / OAuth 2.0 |
Funkcje
- Reverse ETL - Wypychaj segmenty klientów ze Snowflake do list kontaktów Brevo
- Synchronizacja odbiorców - Synchronizuj odbiorców obliczonych w hurtowni dla ukierunkowanych kampanii
- Wzbogacanie analityczne - Wzbogacaj kontakty Brevo obliczonymi metrykami (LTV, wyniki RFM)
- Zapytania SQL - Używaj Snowflake SQL REST API do wykonywania zapytań programistycznie
- Zaplanowana synchronizacja - Uruchamiaj zautomatyzowane pipeline danych w konfigurowalnych interwałach
- Obsługa wielu instrukcji - Wykonuj złożone transformacje danych w pojedynczych wywołaniach API
Wymagania wstępne
Zanim zaczniesz, upewnij się, że masz:
- Konto Snowflake z rolą ACCOUNTADMIN lub SYSADMIN
- Konto Brevo z dostępem do API
- Konto Tajo z uprawnieniami konektora
- Dedykowany magazyn Snowflake dla zapytań integracyjnych
- Politykę sieciową zezwalającą na adresy IP Tajo
Uwierzytelnianie
Uwierzytelnianie parą kluczy (zalecane)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';Uwierzytelnianie OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Uwierzytelnianie SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Konfiguracja
Podstawowa konfiguracja
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncMapowanie pól
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEEndpointy API
| Endpoint | Metoda | Opis |
|---|---|---|
/api/v2/statements | POST | Wyślij instrukcje SQL do wykonania |
/api/v2/statements/{statementHandle} | GET | Sprawdź status wykonania |
/api/v2/statements/{statementHandle}/cancel | POST | Anuluj uruchomioną instrukcję |
/api/v2/statements/{statementHandle}?partition={id} | GET | Pobierz partycje wyników |
Partycje SQL API
Snowflake SQL API zwraca duże zestawy wyników w partycjach. Każda partycja zawiera do około 12 MB danych. Używaj parametru partition do iteracji przez wyniki.
Przykłady kodu
Inicjalizacja konektora
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Synchronizacja segmentów klientów przez SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Pipeline Reverse ETL
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Limity API
| Zasób | Limit | Uwagi |
|---|---|---|
| Jednoczesne zapytania SQL API | 20 na użytkownika | Na konto Snowflake |
| Rozmiar wyników SQL API | 12 MB na partycję | Paginacja z ID partycji |
| Timeout instrukcji | 172 800 sek (48h) | Konfigurowalny na zapytanie |
| Żądania API | Zależy od planu | Zależy od edycji Snowflake |
Koszty magazynu
Snowflake nalicza opłaty na podstawie czasu obliczeń. Używaj dedykowanego, odpowiednio dobranego magazynu dla zapytań Tajo i ustaw auto-zawieszanie, aby minimalizować koszty.
Rozwiązywanie problemów
| Problem | Przyczyna | Rozwiązanie |
|---|---|---|
| Błąd uwierzytelniania | Wygasły token JWT | Wygeneruj ponownie JWT z ważną datą wygaśnięcia |
| Timeout zapytania | Duży zestaw danych | Dodaj filtry lub użyj synchronizacji przyrostowej |
| Błąd sieci | IP nie na białej liście | Dodaj IP Tajo do polityki sieciowej Snowflake |
| Brakujące kolumny | Zmiana schematu | Zaktualizuj konfigurację mapowania pól |
| Błąd partycji | Zbyt duże wyniki | Przetwarzaj wyniki w mniejszych partycjach |
Tryb debug
connectors: snowflake: debug: true log_level: verbose log_queries: trueNajlepsze praktyki
- Używaj dedykowanego magazynu - Unikaj rywalizacji z obciążeniami produkcyjnymi
- Wdrożyj synchronizację przyrostową - Zapytuj tylko o zmienione rekordy od ostatniej synchronizacji
- Ustaw auto-zawieszanie - Skonfiguruj magazyn do zawieszenia po 5 minutach bezczynności
- Używaj uwierzytelniania parą kluczy - Preferuj parę kluczy zamiast uwierzytelniania hasłem
- Optymalizuj zapytania - Filtruj i projektuj tylko potrzebne kolumny dla szybszych synchronizacji
- Monitoruj kredyty - Śledź zużycie kredytów Snowflake dla zapytań integracyjnych
Bezpieczeństwo
- Uwierzytelnianie parą kluczy - Szyfrowanie RSA 2048-bit dla dostępu do API
- OAuth 2.0 - Uwierzytelnianie oparte na tokenach z zakresem roli
- Polityki sieciowe - Allowlisting IP dla punktów końcowych usługi Tajo
- Dostęp oparty na rolach - Dedykowana rola Snowflake z minimalnymi wymaganymi uprawnieniami
- Szyfrowany transfer danych - TLS 1.2+ dla całej komunikacji API
- Maskowanie danych - Używaj dynamicznego maskowania danych Snowflake dla wrażliwych pól