Conector de Snowflake
Conecta Snowflake con Brevo a través de Tajo para sincronizar segmentos de clientes desde tu data warehouse, enriquecer perfiles de contacto con datos analíticos e impulsar campañas de marketing personalizadas basadas en insights del warehouse.
Resumen
| Propiedad | Valor |
|---|---|
| Plataforma | Snowflake |
| Categoría | Data Warehouse (Personalizada) |
| Complejidad de configuración | Media |
| Integración oficial | No |
| Datos sincronizados | Clientes, Segmentos, Analíticas, Eventos |
| Método de autenticación | Key Pair / OAuth 2.0 |
Funcionalidades
- Reverse ETL - Envía los segmentos de clientes de Snowflake a las listas de contactos de Brevo
- Sincronización de audiencias - Sincroniza audiencias calculadas en el warehouse para campañas dirigidas
- Enriquecimiento analítico - Enriquece los contactos de Brevo con métricas calculadas (LTV, puntuaciones RFM)
- Consultas basadas en SQL - Usa la Snowflake SQL REST API para ejecutar consultas de forma programática
- Sincronización programada - Ejecuta pipelines de datos automatizados con intervalos configurables
- Soporte multisentencia - Ejecuta transformaciones de datos complejas en una sola llamada a la API
Requisitos previos
Antes de empezar, asegúrate de tener:
- Una cuenta de Snowflake con rol ACCOUNTADMIN o SYSADMIN
- Una cuenta de Brevo con acceso a la API
- Una cuenta de Tajo con permisos de conector
- Un warehouse de Snowflake dedicado a las consultas de integración
- Una política de red que permita las direcciones IP de Tajo
Autenticación
Autenticación por par de claves (recomendado)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';Autenticación OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Autenticación de la SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Configuración
Configuración básica
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncAsignación de campos
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEEndpoints de la API
| Endpoint | Método | Descripción |
|---|---|---|
/api/v2/statements | POST | Enviar sentencias SQL para su ejecución |
/api/v2/statements/{statementHandle} | GET | Comprobar el estado de ejecución |
/api/v2/statements/{statementHandle}/cancel | POST | Cancelar una sentencia en ejecución |
/api/v2/statements/{statementHandle}?partition={id} | GET | Obtener particiones de resultados |
Particiones de la SQL API
La SQL API de Snowflake devuelve los conjuntos de resultados grandes en particiones. Cada partición contiene hasta aproximadamente 12 MB de datos. Usa el parámetro partition para iterar por los resultados.
Ejemplos de código
Inicializar el conector
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Sincronizar segmentos de clientes vía SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Pipeline de Reverse ETL
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Límites de velocidad
| Recurso | Límite | Notas |
|---|---|---|
| Consultas concurrentes de SQL API | 20 por usuario | Por cuenta de Snowflake |
| Tamaño de resultado SQL API | 12 MB por partición | Paginar con IDs de partición |
| Timeout de sentencia | 172.800 s (48 h) | Configurable por consulta |
| Peticiones a la API | Varía según el plan | Depende de la edición de Snowflake |
Costes del warehouse
Snowflake factura en función del tiempo de cómputo. Usa un warehouse dedicado y dimensionado adecuadamente para las consultas de Tajo y configura auto-suspend para minimizar costes.
Resolución de problemas
| Problema | Causa | Solución |
|---|---|---|
| Authentication failed | JWT caducado | Regenera el JWT con una expiración válida |
| Timeout en la consulta | Conjunto de datos grande | Añade filtros o usa sincronización incremental |
| Error de red | IP no incluida en la allowlist | Añade las IPs de Tajo a la política de red de Snowflake |
| Columnas ausentes | Cambio de esquema | Actualiza la configuración de asignación de campos |
| Error de partición | Resultado demasiado grande | Procesa los resultados en particiones más pequeñas |
Modo depuración
connectors: snowflake: debug: true log_level: verbose log_queries: trueBuenas prácticas
- Usa un warehouse dedicado - Evita la competencia con cargas de trabajo de producción
- Implementa sincronización incremental - Consulta solo los registros modificados desde la última sincronización
- Configura auto-suspend - Configura el warehouse para suspenderse tras 5 minutos de inactividad
- Usa autenticación por par de claves - Prefiere par de claves antes que contraseña
- Optimiza las consultas - Filtra y selecciona solo las columnas necesarias para acelerar las sincronizaciones
- Monitoriza los créditos - Haz seguimiento del consumo de créditos de Snowflake en las consultas de integración
Seguridad
- Autenticación por par de claves - Cifrado RSA 2048 bits para el acceso a la API
- OAuth 2.0 - Autenticación basada en token con limitación por rol
- Políticas de red - IP allowlisting para los endpoints del servicio de Tajo
- Control de acceso por rol - Rol dedicado de Snowflake con los privilegios mínimos necesarios
- Transferencia de datos cifrada - TLS 1.2+ para todas las comunicaciones con la API
- Enmascaramiento de datos - Usa el enmascaramiento dinámico de Snowflake para campos sensibles