Konektor Snowflake
Propojte Snowflake s Brevo přes Tajo pro synchronizaci zákaznických segmentů z datového skladu, obohacení profilů kontaktů o analytická data a napájení personalizovaných marketingových kampaní poznatky řízenými skladem.
Přehled
| Vlastnost | Hodnota |
|---|---|
| Platforma | Snowflake |
| Kategorie | Datový sklad (Vlastní) |
| Složitost nastavení | Střední |
| Oficiální integrace | Ne |
| Synchronizovaná data | Zákazníci, Segmenty, Analytika, Události |
| Metoda ověření | Párování klíčů / OAuth 2.0 |
Funkce
- Reverse ETL – Přesúvejte zákaznické segmenty ze Snowflake do seznamů kontaktů Brevo
- Synchronizace publika – Synchronizujte publika vypočtená skladem pro cílené kampaně
- Obohacení analytiky – Obohacujte kontakty Brevo o vypočítané metriky (LTV, RFM skóre)
- Dotazy na bázi SQL – Používejte Snowflake SQL REST API pro programové spouštění dotazů
- Plánovaná synchronizace – Spouštějte automatizované datové pipeline v konfigurovatelných intervalech
- Podpora více příkazů – Spouštějte složité transformace dat v jediném volání API
Předpoklady
Než začnete, ujistěte se, že máte:
- Účet Snowflake s rolí ACCOUNTADMIN nebo SYSADMIN
- Účet Brevo s přístupem k API
- Účet Tajo s oprávněními ke konektoru
- Vyhrazený warehouse Snowflake pro integrační dotazy
- Síťovou politiku povolující IP adresy Tajo
Ověření
Ověření párováním klíčů (doporučeno)
# Vygenerování páru RSA klíčůopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Přiřazení veřejného klíče uživateli Snowflake# Ve Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';Ověření OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Ověření SQL API
# Použití JWT tokenu s SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Konfigurace
Základní nastavení
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Každých 6 hodin
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncMapování polí
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEKoncové body API
| Koncový bod | Metoda | Popis |
|---|---|---|
/api/v2/statements | POST | Odeslání SQL příkazů k provedení |
/api/v2/statements/{statementHandle} | GET | Kontrola stavu provedení |
/api/v2/statements/{statementHandle}/cancel | POST | Zrušení běžícího příkazu |
/api/v2/statements/{statementHandle}?partition={id} | GET | Načtení oddílů výsledků |
Oddíly SQL API
Snowflake SQL API vrací velké sady výsledků v oddílech. Každý oddíl obsahuje přibližně 12 MB dat. Používejte parametr partition pro iteraci výsledků.
Příklady kódu
Inicializace konektoru
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Synchronizace zákaznických segmentů přes SQL API
// Spuštění SQL dotazu přes Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Pollování výsledkůlet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Synchronizace do Brevo přes Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Pipeline Reverse ETL
// Přesun vypočítaných publik ze Snowflake do seznamů Brevoawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Omezení rychlosti
| Prostředek | Limit | Poznámky |
|---|---|---|
| Souběžné dotazy SQL API | 20 na uživatele | Na účet Snowflake |
| Velikost výsledku SQL API | 12 MB na oddíl | Stránkovat pomocí ID oddílů |
| Timeout příkazu | 172 800 s (48 h) | Konfigurovatelné na dotaz |
| API požadavky | Závisí na plánu | Na základě edice Snowflake |
Náklady na warehouse
Snowflake účtuje na základě doby výpočtu. Používejte vyhrazený, přiměřeně dimenzovaný warehouse pro dotazy Tajo a nastavte automatické pozastavení pro minimalizaci nákladů.
Řešení problémů
| Problém | Příčina | Řešení |
|---|---|---|
| Ověření selhalo | Vypršelý JWT token | Obnovte JWT s platným vypršením |
| Timeout dotazu | Velká datová sada | Přidejte filtry nebo používejte přírůstkovou synchronizaci |
| Chyba sítě | IP není na whitelistu | Přidejte IP adresy Tajo do síťové politiky Snowflake |
| Chybějící sloupce | Změna schématu | Aktualizujte konfiguraci mapování polí |
| Chyba oddílu | Výsledek příliš velký | Zpracovávejte výsledky v menších oddílech |
Režim ladění
connectors: snowflake: debug: true log_level: verbose log_queries: trueOsvědčené postupy
- Používejte vyhrazený warehouse – Vyhněte se soupeření s produkčními pracovními zátěžemi
- Implementujte přírůstkovou synchronizaci – Dotazujte pouze změněné záznamy od poslední synchronizace
- Nastavte automatické pozastavení – Konfigurujte warehouse pro pozastavení po 5 minutách nečinnosti
- Používejte ověření párováním klíčů – Upřednostňujte párování klíčů před ověřením heslem
- Optimalizujte dotazy – Filtrujte a projektujte pouze potřebné sloupce pro rychlejší synchronizace
- Monitorujte kredity – Sledujte spotřebu kreditů Snowflake pro integrační dotazy
Zabezpečení
- Ověření párováním klíčů – RSA šifrování 2048 bitů pro přístup k API
- OAuth 2.0 – Ověřování na základě tokenů s rozsahováním rolí
- Síťové politiky – Whitelisting IP pro koncové body služby Tajo
- Přístup na základě rolí – Vyhrazená role Snowflake s minimálními požadovanými oprávněními
- Šifrovaný přenos dat – TLS 1.2+ pro veškerou komunikaci API
- Maskování dat – Používejte dynamické maskování dat Snowflake pro citlivá pole