Коннектор Snowflake
Подключите Snowflake к Brevo через Tajo для синхронизации клиентских сегментов из хранилища данных, обогащения профилей контактов аналитическими данными и проведения персонализированных маркетинговых кампаний на основе инсайтов из хранилища.
Обзор
| Свойство | Значение |
|---|---|
| Платформа | Snowflake |
| Категория | Data Warehouse (Custom) |
| Сложность настройки | Средняя |
| Официальная интеграция | Нет |
| Синхронизируемые данные | Клиенты, сегменты, аналитика, события |
| Метод аутентификации | Key Pair / OAuth 2.0 |
Возможности
- Reverse ETL, передача клиентских сегментов из Snowflake в листы контактов Brevo
- Синхронизация аудиторий, синхронизация аудиторий, вычисленных в хранилище, для таргетированных кампаний
- Обогащение аналитикой, обогащение контактов Brevo вычисленными метриками (LTV, RFM-оценки)
- SQL-запросы, использование Snowflake SQL REST API для программного выполнения запросов
- Синхронизация по расписанию, автоматические конвейеры данных с настраиваемыми интервалами
- Поддержка нескольких операторов, выполнение сложных трансформаций данных в одном API-вызове
Предварительные требования
Прежде чем начать, убедитесь, что у вас есть:
- Аккаунт Snowflake с ролью ACCOUNTADMIN или SYSADMIN
- Аккаунт Brevo с доступом к API
- Аккаунт Tajo с правами на управление коннекторами
- Выделенный warehouse Snowflake для интеграционных запросов
- Сетевая политика, разрешающая IP-адреса Tajo
Аутентификация
Аутентификация по Key Pair (рекомендуется)
# Генерация пары RSA-ключейopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Назначение публичного ключа пользователю Snowflake# В Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Аутентификация SQL API
# Использование JWT-токена с SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Конфигурация
Базовая настройка
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Каждые 6 часов
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncСопоставление полей
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEЭндпоинты API
| Эндпоинт | Метод | Описание |
|---|---|---|
/api/v2/statements | POST | Отправка SQL-операторов для выполнения |
/api/v2/statements/{statementHandle} | GET | Проверка статуса выполнения |
/api/v2/statements/{statementHandle}/cancel | POST | Отмена выполняющегося оператора |
/api/v2/statements/{statementHandle}?partition={id} | GET | Получение разделов результатов |
Разделы SQL API
Snowflake SQL API возвращает большие наборы данных в разделах. Каждый раздел содержит около 12 МБ данных. Используйте параметр partition для перебора результатов.
Примеры кода
Инициализация коннектора
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Синхронизация клиентских сегментов через SQL API
// Выполнение SQL-запроса через Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Опрос для получения результатовlet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Синхронизация с Brevo через Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Конвейер Reverse ETL
// Передача вычисленных аудиторий из Snowflake в листы Brevoawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Ограничения скорости
| Ресурс | Лимит | Примечания |
|---|---|---|
| Параллельные запросы SQL API | 20 на пользователя | На аккаунт Snowflake |
| Размер результата SQL API | 12 МБ на раздел | Постраничная обработка по ID разделов |
| Таймаут оператора | 172 800 сек (48 ч) | Настраивается для каждого запроса |
| API-запросы | Зависит от плана | По редакции Snowflake |
Стоимость warehouse
Snowflake тарифицирует по времени работы вычислительных ресурсов. Используйте выделенный warehouse подходящего размера для запросов Tajo и настройте авто-приостановку для минимизации затрат.
Устранение неполадок
| Проблема | Причина | Решение |
|---|---|---|
| Ошибка аутентификации | Истёкший JWT-токен | Перегенерируйте JWT с корректным сроком действия |
| Таймаут запроса | Большой набор данных | Добавьте фильтры или используйте инкрементальную синхронизацию |
| Сетевая ошибка | IP не в белом списке | Добавьте IP-адреса Tajo в сетевую политику Snowflake |
| Отсутствующие столбцы | Изменение схемы | Обновите конфигурацию сопоставления полей |
| Ошибка раздела | Результат слишком большой | Обрабатывайте результаты меньшими разделами |
Режим отладки
connectors: snowflake: debug: true log_level: verbose log_queries: trueЛучшие практики
- Используйте выделенный warehouse, избегайте конкуренции с production-нагрузками
- Реализуйте инкрементальную синхронизацию, запрашивайте только изменённые записи с момента последней синхронизации
- Настройте авто-приостановку, конфигурируйте warehouse на приостановку через 5 минут бездействия
- Используйте Key Pair auth, предпочтительнее аутентификации по паролю
- Оптимизируйте запросы, фильтруйте и проецируйте только нужные столбцы для ускорения синхронизации
- Мониторинг кредитов, отслеживайте потребление кредитов Snowflake для интеграционных запросов
Безопасность
- Key Pair аутентификация, шифрование RSA 2048 бит для доступа к API
- OAuth 2.0, аутентификация на основе токенов с ограничением роли
- Сетевые политики, IP-разрешение для сервисных эндпоинтов Tajo
- Доступ на основе ролей, выделенная роль Snowflake с минимально необходимыми привилегиями
- Зашифрованная передача данных, TLS 1.2+ для всех API-коммуникаций
- Маскировка данных, динамическая маскировка данных Snowflake для чувствительных полей