Snowflake コネクタ
Tajo を介して Snowflake を Brevo に接続し、データウェアハウスから顧客セグメントを同期し、分析データで連絡先プロファイルを強化し、ウェアハウス駆動のインサイトでパーソナライズされたマーケティングキャンペーンを強化します。
概要
| プロパティ | 値 |
|---|---|
| プラットフォーム | Snowflake |
| カテゴリ | データウェアハウス(カスタム) |
| セットアップの複雑さ | 中 |
| 公式統合 | いいえ |
| 同期データ | 顧客、セグメント、分析、イベント |
| 認証方式 | キーペア / OAuth 2.0 |
機能
- リバース ETL - Snowflake から Brevo 連絡先リストに顧客セグメントをプッシュ
- オーディエンス同期 - ターゲットキャンペーン用にウェアハウスで計算されたオーディエンスを同期
- 分析エンリッチメント - 計算されたメトリクス(LTV、RFM スコア)で Brevo 連絡先を強化
- SQL ベースのクエリ - Snowflake SQL REST API を使用してプログラムでクエリを実行
- スケジュールされた同期 - 設定可能な間隔で自動データパイプラインを実行
- マルチステートメントサポート - 単一の API 呼び出しで複雑なデータ変換を実行
前提条件
開始する前に、以下を準備してください。
- ACCOUNTADMIN または SYSADMIN ロールを持つ Snowflake アカウント
- API アクセス可能な Brevo アカウント
- コネクタ権限を持つ Tajo アカウント
- 統合クエリ用の専用 Snowflake ウェアハウス
- Tajo の IP アドレスを許可するネットワークポリシー
認証
キーペア認証(推奨)
# RSA キーペアを生成openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# 公開鍵を Snowflake ユーザーに割り当て# Snowflake で:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';OAuth 2.0 認証
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });SQL API 認証
# SQL API で JWT トークンを使用curl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'設定
基本セットアップ
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # 6 時間ごと
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncフィールドマッピング
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI エンドポイント
| エンドポイント | メソッド | 説明 |
|---|---|---|
/api/v2/statements | POST | 実行用 SQL ステートメントを送信 |
/api/v2/statements/{statementHandle} | GET | 実行ステータスを確認 |
/api/v2/statements/{statementHandle}/cancel | POST | 実行中のステートメントをキャンセル |
/api/v2/statements/{statementHandle}?partition={id} | GET | 結果パーティションを取得 |
SQL API パーティション
Snowflake SQL API は大きな結果セットをパーティションで返します。各パーティションには最大約 12MB のデータが含まれます。結果を反復処理するには partition パラメータを使用してください。
コード例
コネクタの初期化
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});SQL API を介して顧客セグメントを同期
// Snowflake SQL REST API で SQL クエリを実行const response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// 結果をポーリングlet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Tajo を介して Brevo に同期for (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}リバース ETL パイプライン
// Snowflake から計算されたオーディエンスを Brevo リストにプッシュawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});レート制限
| リソース | 制限 | 備考 |
|---|---|---|
| SQL API 同時クエリ | ユーザーあたり 20 | Snowflake アカウント単位 |
| SQL API 結果サイズ | パーティションあたり 12MB | パーティション ID でページネーション |
| ステートメントタイムアウト | 172,800 秒(48 時間) | クエリごとに設定可能 |
| API リクエスト | プランにより異なる | Snowflake エディションに基づく |
ウェアハウスコスト
Snowflake はコンピュート時間に基づいて課金します。Tajo クエリには専用で適切にサイジングされたウェアハウスを使用し、自動サスペンドを設定してコストを最小化してください。
トラブルシューティング
| 問題 | 原因 | 解決策 |
|---|---|---|
| 認証失敗 | JWT トークンの期限切れ | 有効な有効期限で JWT を再生成 |
| クエリタイムアウト | 大きなデータセット | フィルタを追加するか増分同期を使用 |
| ネットワークエラー | IP が許可リストにない | Tajo の IP を Snowflake ネットワークポリシーに追加 |
| 列の欠落 | スキーマ変更 | フィールドマッピング設定を更新 |
| パーティションエラー | 結果が大きすぎる | より小さなパーティションで結果を処理 |
デバッグモード
connectors: snowflake: debug: true log_level: verbose log_queries: trueベストプラクティス
- 専用ウェアハウスを使用する - 本番ワークロードとの競合を避ける
- 増分同期を実装する - 前回の同期以降に変更されたレコードのみをクエリ
- 自動サスペンドを設定する - 5 分間の非アクティブ後にサスペンドするようウェアハウスを設定
- キーペア認証を使用する - パスワード認証よりもキーペアを優先
- クエリを最適化する - より高速な同期のため必要な列のみをフィルタおよびプロジェクト
- クレジットを監視する - 統合クエリの Snowflake クレジット消費をトラッキング
セキュリティ
- キーペア認証 - API アクセス用 RSA 2048 ビット暗号化
- OAuth 2.0 - ロールスコーピング付きトークンベース認証
- ネットワークポリシー - Tajo サービスエンドポイントの IP 許可リスト
- ロールベースアクセス - 最小限の必要な権限を持つ専用 Snowflake ロール
- 暗号化されたデータ転送 - すべての API 通信に TLS 1.2 以上
- データマスキング - 機密フィールドに Snowflake 動的データマスキングを使用