Conector Snowflake
Conecte o Snowflake ao Brevo através do Tajo para sincronizar segmentos de clientes do seu data warehouse, enriquecer perfis de contato com dados de analytics e potencializar campanhas de marketing personalizadas com insights orientados pelo warehouse.
Visão geral
| Propriedade | Valor |
|---|---|
| Plataforma | Snowflake |
| Categoria | Data Warehouse (Personalizado) |
| Complexidade de configuração | Média |
| Integração oficial | Não |
| Dados sincronizados | Clientes, Segmentos, Analytics, Eventos |
| Método de autenticação | Key Pair / OAuth 2.0 |
Recursos
- Reverse ETL - Envie segmentos de clientes do Snowflake para listas de contatos do Brevo
- Sincronização de audiências - Sincronize audiências computadas no warehouse para campanhas direcionadas
- Enriquecimento de analytics - Enriqueça contatos do Brevo com métricas computadas (LTV, scores RFM)
- Consultas baseadas em SQL - Use a Snowflake SQL REST API para executar consultas programaticamente
- Sincronização agendada - Execute pipelines de dados automatizados em intervalos configuráveis
- Suporte multi-statement - Execute transformações de dados complexas em chamadas de API únicas
Pré-requisitos
Antes de começar, certifique-se de ter:
- Uma conta Snowflake com role ACCOUNTADMIN ou SYSADMIN
- Uma conta Brevo com acesso à API
- Uma conta Tajo com permissões de conector
- Um warehouse Snowflake dedicado para consultas de integração
- Política de rede permitindo os endereços IP do Tajo
Autenticação
Autenticação por Key Pair (Recomendado)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';Autenticação OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Autenticação da SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Configuração
Configuração básica
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncMapeamento de campos
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEEndpoints da API
| Endpoint | Método | Descrição |
|---|---|---|
/api/v2/statements | POST | Enviar statements SQL para execução |
/api/v2/statements/{statementHandle} | GET | Verificar status de execução |
/api/v2/statements/{statementHandle}/cancel | POST | Cancelar um statement em execução |
/api/v2/statements/{statementHandle}?partition={id} | GET | Recuperar partições de resultados |
Partições da SQL API
A SQL API do Snowflake retorna grandes conjuntos de resultados em partições. Cada partição contém até aproximadamente 12MB de dados. Use o parâmetro partition para iterar pelos resultados.
Exemplos de código
Inicializar o conector
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Sincronizar segmentos de clientes via SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Pipeline de Reverse ETL
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Limites de taxa
| Recurso | Limite | Observações |
|---|---|---|
| Consultas simultâneas da SQL API | 20 por usuário | Por conta Snowflake |
| Tamanho do resultado da SQL API | 12MB por partição | Pagine com IDs de partição |
| Timeout de statement | 172.800 seg (48h) | Configurável por consulta |
| Requisições à API | Varia por plano | Com base na edição do Snowflake |
Custos do warehouse
O Snowflake cobra com base no tempo de compute. Use um warehouse dedicado e apropriadamente dimensionado para consultas do Tajo e configure auto-suspend para minimizar custos.
Solução de problemas
| Problema | Causa | Solução |
|---|---|---|
| Falha de autenticação | Token JWT expirado | Regere o JWT com expiração válida |
| Timeout de consulta | Grande volume de dados | Adicione filtros ou use sincronização incremental |
| Erro de rede | IP não permitido | Adicione os IPs do Tajo à política de rede do Snowflake |
| Colunas ausentes | Mudança de schema | Atualize a configuração de mapeamento de campos |
| Erro de partição | Resultado muito grande | Processe resultados em partições menores |
Modo de depuração
connectors: snowflake: debug: true log_level: verbose log_queries: trueMelhores práticas
- Use um warehouse dedicado - Evite contenção com cargas de trabalho de produção
- Implemente sincronização incremental - Consulte apenas registros alterados desde a última sincronização
- Configure auto-suspend - Configure o warehouse para suspender após 5 minutos de inatividade
- Use autenticação por key pair - Prefira key pair em vez de autenticação por senha
- Otimize as consultas - Filtre e projete apenas as colunas necessárias para sincronizações mais rápidas
- Monitore os créditos - Acompanhe o consumo de créditos do Snowflake para consultas de integração
Segurança
- Autenticação por key pair - Criptografia RSA 2048-bit para acesso à API
- OAuth 2.0 - Autenticação baseada em token com escopo de role
- Políticas de rede - Lista de permissões de IP para endpoints do serviço Tajo
- Acesso baseado em role - Role Snowflake dedicada com privilégios mínimos necessários
- Transferência de dados criptografada - TLS 1.2+ para todas as comunicações de API
- Mascaramento de dados - Use mascaramento dinâmico de dados do Snowflake para campos sensíveis