Konektor Snowflake

Propojte Snowflake s Brevo přes Tajo pro synchronizaci zákaznických segmentů z datového skladu, obohacení profilů kontaktů o analytická data a napájení personalizovaných marketingových kampaní poznatky řízenými skladem.

Přehled

VlastnostHodnota
PlatformaSnowflake
KategorieDatový sklad (Vlastní)
Složitost nastaveníStřední
Oficiální integraceNe
Synchronizovaná dataZákazníci, Segmenty, Analytika, Události
Metoda ověřeníPárování klíčů / OAuth 2.0

Funkce

  • Reverse ETL – Přesúvejte zákaznické segmenty ze Snowflake do seznamů kontaktů Brevo
  • Synchronizace publika – Synchronizujte publika vypočtená skladem pro cílené kampaně
  • Obohacení analytiky – Obohacujte kontakty Brevo o vypočítané metriky (LTV, RFM skóre)
  • Dotazy na bázi SQL – Používejte Snowflake SQL REST API pro programové spouštění dotazů
  • Plánovaná synchronizace – Spouštějte automatizované datové pipeline v konfigurovatelných intervalech
  • Podpora více příkazů – Spouštějte složité transformace dat v jediném volání API

Předpoklady

Než začnete, ujistěte se, že máte:

  1. Účet Snowflake s rolí ACCOUNTADMIN nebo SYSADMIN
  2. Účet Brevo s přístupem k API
  3. Účet Tajo s oprávněními ke konektoru
  4. Vyhrazený warehouse Snowflake pro integrační dotazy
  5. Síťovou politiku povolující IP adresy Tajo

Ověření

Ověření párováním klíčů (doporučeno)

Terminal window
# Vygenerování páru RSA klíčů
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Přiřazení veřejného klíče uživateli Snowflake
# Ve Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

Ověření OAuth 2.0

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Ověření SQL API

Terminal window
# Použití JWT tokenu s SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Konfigurace

Základní nastavení

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Každých 6 hodin
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Mapování polí

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

Koncové body API

Koncový bodMetodaPopis
/api/v2/statementsPOSTOdeslání SQL příkazů k provedení
/api/v2/statements/{statementHandle}GETKontrola stavu provedení
/api/v2/statements/{statementHandle}/cancelPOSTZrušení běžícího příkazu
/api/v2/statements/{statementHandle}?partition={id}GETNačtení oddílů výsledků

Oddíly SQL API

Snowflake SQL API vrací velké sady výsledků v oddílech. Každý oddíl obsahuje přibližně 12 MB dat. Používejte parametr partition pro iteraci výsledků.

Příklady kódu

Inicializace konektoru

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Synchronizace zákaznických segmentů přes SQL API

// Spuštění SQL dotazu přes Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Pollování výsledků
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Synchronizace do Brevo přes Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Pipeline Reverse ETL

// Přesun vypočítaných publik ze Snowflake do seznamů Brevo
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Omezení rychlosti

ProstředekLimitPoznámky
Souběžné dotazy SQL API20 na uživateleNa účet Snowflake
Velikost výsledku SQL API12 MB na oddílStránkovat pomocí ID oddílů
Timeout příkazu172 800 s (48 h)Konfigurovatelné na dotaz
API požadavkyZávisí na plánuNa základě edice Snowflake

Náklady na warehouse

Snowflake účtuje na základě doby výpočtu. Používejte vyhrazený, přiměřeně dimenzovaný warehouse pro dotazy Tajo a nastavte automatické pozastavení pro minimalizaci nákladů.

Řešení problémů

ProblémPříčinaŘešení
Ověření selhaloVypršelý JWT tokenObnovte JWT s platným vypršením
Timeout dotazuVelká datová sadaPřidejte filtry nebo používejte přírůstkovou synchronizaci
Chyba sítěIP není na whitelistuPřidejte IP adresy Tajo do síťové politiky Snowflake
Chybějící sloupceZměna schématuAktualizujte konfiguraci mapování polí
Chyba oddíluVýsledek příliš velkýZpracovávejte výsledky v menších oddílech

Režim ladění

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Osvědčené postupy

  1. Používejte vyhrazený warehouse – Vyhněte se soupeření s produkčními pracovními zátěžemi
  2. Implementujte přírůstkovou synchronizaci – Dotazujte pouze změněné záznamy od poslední synchronizace
  3. Nastavte automatické pozastavení – Konfigurujte warehouse pro pozastavení po 5 minutách nečinnosti
  4. Používejte ověření párováním klíčů – Upřednostňujte párování klíčů před ověřením heslem
  5. Optimalizujte dotazy – Filtrujte a projektujte pouze potřebné sloupce pro rychlejší synchronizace
  6. Monitorujte kredity – Sledujte spotřebu kreditů Snowflake pro integrační dotazy

Zabezpečení

  • Ověření párováním klíčů – RSA šifrování 2048 bitů pro přístup k API
  • OAuth 2.0 – Ověřování na základě tokenů s rozsahováním rolí
  • Síťové politiky – Whitelisting IP pro koncové body služby Tajo
  • Přístup na základě rolí – Vyhrazená role Snowflake s minimálními požadovanými oprávněními
  • Šifrovaný přenos dat – TLS 1.2+ pro veškerou komunikaci API
  • Maskování dat – Používejte dynamické maskování dat Snowflake pro citlivá pole

Související zdroje

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI asistent

Ahoj! Zeptejte se mě na dokumentaci.