Snowflake コネクタ

Tajo を介して Snowflake を Brevo に接続し、データウェアハウスから顧客セグメントを同期し、分析データで連絡先プロファイルを強化し、ウェアハウス駆動のインサイトでパーソナライズされたマーケティングキャンペーンを強化します。

概要

プロパティ
プラットフォームSnowflake
カテゴリデータウェアハウス(カスタム)
セットアップの複雑さ
公式統合いいえ
同期データ顧客、セグメント、分析、イベント
認証方式キーペア / OAuth 2.0

機能

  • リバース ETL - Snowflake から Brevo 連絡先リストに顧客セグメントをプッシュ
  • オーディエンス同期 - ターゲットキャンペーン用にウェアハウスで計算されたオーディエンスを同期
  • 分析エンリッチメント - 計算されたメトリクス(LTV、RFM スコア)で Brevo 連絡先を強化
  • SQL ベースのクエリ - Snowflake SQL REST API を使用してプログラムでクエリを実行
  • スケジュールされた同期 - 設定可能な間隔で自動データパイプラインを実行
  • マルチステートメントサポート - 単一の API 呼び出しで複雑なデータ変換を実行

前提条件

開始する前に、以下を準備してください。

  1. ACCOUNTADMIN または SYSADMIN ロールを持つ Snowflake アカウント
  2. API アクセス可能な Brevo アカウント
  3. コネクタ権限を持つ Tajo アカウント
  4. 統合クエリ用の専用 Snowflake ウェアハウス
  5. Tajo の IP アドレスを許可するネットワークポリシー

認証

キーペア認証(推奨)

Terminal window
# RSA キーペアを生成
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# 公開鍵を Snowflake ユーザーに割り当て
# Snowflake で:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0 認証

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

SQL API 認証

Terminal window
# SQL API で JWT トークンを使用
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

設定

基本セットアップ

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # 6 時間ごと
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

フィールドマッピング

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API エンドポイント

エンドポイントメソッド説明
/api/v2/statementsPOST実行用 SQL ステートメントを送信
/api/v2/statements/{statementHandle}GET実行ステータスを確認
/api/v2/statements/{statementHandle}/cancelPOST実行中のステートメントをキャンセル
/api/v2/statements/{statementHandle}?partition={id}GET結果パーティションを取得

SQL API パーティション

Snowflake SQL API は大きな結果セットをパーティションで返します。各パーティションには最大約 12MB のデータが含まれます。結果を反復処理するには partition パラメータを使用してください。

コード例

コネクタの初期化

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

SQL API を介して顧客セグメントを同期

// Snowflake SQL REST API で SQL クエリを実行
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// 結果をポーリング
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Tajo を介して Brevo に同期
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

リバース ETL パイプライン

// Snowflake から計算されたオーディエンスを Brevo リストにプッシュ
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

レート制限

リソース制限備考
SQL API 同時クエリユーザーあたり 20Snowflake アカウント単位
SQL API 結果サイズパーティションあたり 12MBパーティション ID でページネーション
ステートメントタイムアウト172,800 秒(48 時間)クエリごとに設定可能
API リクエストプランにより異なるSnowflake エディションに基づく

ウェアハウスコスト

Snowflake はコンピュート時間に基づいて課金します。Tajo クエリには専用で適切にサイジングされたウェアハウスを使用し、自動サスペンドを設定してコストを最小化してください。

トラブルシューティング

問題原因解決策
認証失敗JWT トークンの期限切れ有効な有効期限で JWT を再生成
クエリタイムアウト大きなデータセットフィルタを追加するか増分同期を使用
ネットワークエラーIP が許可リストにないTajo の IP を Snowflake ネットワークポリシーに追加
列の欠落スキーマ変更フィールドマッピング設定を更新
パーティションエラー結果が大きすぎるより小さなパーティションで結果を処理

デバッグモード

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

ベストプラクティス

  1. 専用ウェアハウスを使用する - 本番ワークロードとの競合を避ける
  2. 増分同期を実装する - 前回の同期以降に変更されたレコードのみをクエリ
  3. 自動サスペンドを設定する - 5 分間の非アクティブ後にサスペンドするようウェアハウスを設定
  4. キーペア認証を使用する - パスワード認証よりもキーペアを優先
  5. クエリを最適化する - より高速な同期のため必要な列のみをフィルタおよびプロジェクト
  6. クレジットを監視する - 統合クエリの Snowflake クレジット消費をトラッキング

セキュリティ

  • キーペア認証 - API アクセス用 RSA 2048 ビット暗号化
  • OAuth 2.0 - ロールスコーピング付きトークンベース認証
  • ネットワークポリシー - Tajo サービスエンドポイントの IP 許可リスト
  • ロールベースアクセス - 最小限の必要な権限を持つ専用 Snowflake ロール
  • 暗号化されたデータ転送 - すべての API 通信に TLS 1.2 以上
  • データマスキング - 機密フィールドに Snowflake 動的データマスキングを使用

関連リソース

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AIアシスタント

こんにちは!ドキュメントについて何でもお聞きください。