Коннектор Snowflake

Подключите Snowflake к Brevo через Tajo для синхронизации клиентских сегментов из хранилища данных, обогащения профилей контактов аналитическими данными и проведения персонализированных маркетинговых кампаний на основе инсайтов из хранилища.

Обзор

СвойствоЗначение
ПлатформаSnowflake
КатегорияData Warehouse (Custom)
Сложность настройкиСредняя
Официальная интеграцияНет
Синхронизируемые данныеКлиенты, сегменты, аналитика, события
Метод аутентификацииKey Pair / OAuth 2.0

Возможности

  • Reverse ETL, передача клиентских сегментов из Snowflake в листы контактов Brevo
  • Синхронизация аудиторий, синхронизация аудиторий, вычисленных в хранилище, для таргетированных кампаний
  • Обогащение аналитикой, обогащение контактов Brevo вычисленными метриками (LTV, RFM-оценки)
  • SQL-запросы, использование Snowflake SQL REST API для программного выполнения запросов
  • Синхронизация по расписанию, автоматические конвейеры данных с настраиваемыми интервалами
  • Поддержка нескольких операторов, выполнение сложных трансформаций данных в одном API-вызове

Предварительные требования

Прежде чем начать, убедитесь, что у вас есть:

  1. Аккаунт Snowflake с ролью ACCOUNTADMIN или SYSADMIN
  2. Аккаунт Brevo с доступом к API
  3. Аккаунт Tajo с правами на управление коннекторами
  4. Выделенный warehouse Snowflake для интеграционных запросов
  5. Сетевая политика, разрешающая IP-адреса Tajo

Аутентификация

Аутентификация по Key Pair (рекомендуется)

Terminal window
# Генерация пары RSA-ключей
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Назначение публичного ключа пользователю Snowflake
# В Snowflake:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

Аутентификация SQL API

Terminal window
# Использование JWT-токена с SQL API
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

Конфигурация

Базовая настройка

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # Каждые 6 часов
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Сопоставление полей

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

Эндпоинты API

ЭндпоинтМетодОписание
/api/v2/statementsPOSTОтправка SQL-операторов для выполнения
/api/v2/statements/{statementHandle}GETПроверка статуса выполнения
/api/v2/statements/{statementHandle}/cancelPOSTОтмена выполняющегося оператора
/api/v2/statements/{statementHandle}?partition={id}GETПолучение разделов результатов

Разделы SQL API

Snowflake SQL API возвращает большие наборы данных в разделах. Каждый раздел содержит около 12 МБ данных. Используйте параметр partition для перебора результатов.

Примеры кода

Инициализация коннектора

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

Синхронизация клиентских сегментов через SQL API

// Выполнение SQL-запроса через Snowflake SQL REST API
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// Опрос для получения результатов
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Синхронизация с Brevo через Tajo
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Конвейер Reverse ETL

// Передача вычисленных аудиторий из Snowflake в листы Brevo
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Ограничения скорости

РесурсЛимитПримечания
Параллельные запросы SQL API20 на пользователяНа аккаунт Snowflake
Размер результата SQL API12 МБ на разделПостраничная обработка по ID разделов
Таймаут оператора172 800 сек (48 ч)Настраивается для каждого запроса
API-запросыЗависит от планаПо редакции Snowflake

Стоимость warehouse

Snowflake тарифицирует по времени работы вычислительных ресурсов. Используйте выделенный warehouse подходящего размера для запросов Tajo и настройте авто-приостановку для минимизации затрат.

Устранение неполадок

ПроблемаПричинаРешение
Ошибка аутентификацииИстёкший JWT-токенПерегенерируйте JWT с корректным сроком действия
Таймаут запросаБольшой набор данныхДобавьте фильтры или используйте инкрементальную синхронизацию
Сетевая ошибкаIP не в белом спискеДобавьте IP-адреса Tajo в сетевую политику Snowflake
Отсутствующие столбцыИзменение схемыОбновите конфигурацию сопоставления полей
Ошибка разделаРезультат слишком большойОбрабатывайте результаты меньшими разделами

Режим отладки

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

Лучшие практики

  1. Используйте выделенный warehouse, избегайте конкуренции с production-нагрузками
  2. Реализуйте инкрементальную синхронизацию, запрашивайте только изменённые записи с момента последней синхронизации
  3. Настройте авто-приостановку, конфигурируйте warehouse на приостановку через 5 минут бездействия
  4. Используйте Key Pair auth, предпочтительнее аутентификации по паролю
  5. Оптимизируйте запросы, фильтруйте и проецируйте только нужные столбцы для ускорения синхронизации
  6. Мониторинг кредитов, отслеживайте потребление кредитов Snowflake для интеграционных запросов

Безопасность

  • Key Pair аутентификация, шифрование RSA 2048 бит для доступа к API
  • OAuth 2.0, аутентификация на основе токенов с ограничением роли
  • Сетевые политики, IP-разрешение для сервисных эндпоинтов Tajo
  • Доступ на основе ролей, выделенная роль Snowflake с минимально необходимыми привилегиями
  • Зашифрованная передача данных, TLS 1.2+ для всех API-коммуникаций
  • Маскировка данных, динамическая маскировка данных Snowflake для чувствительных полей

Связанные ресурсы

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI-ассистент

Привет! Спрашивайте меня о документации.