Snowflake Connector
เชื่อมต่อ Snowflake กับ Brevo ผ่าน Tajo เพื่อซิงค์ customer segments จาก data warehouse เพิ่มความสมบูรณ์ให้โปรไฟล์ผู้ติดต่อด้วยข้อมูล analytics และขับเคลื่อนแคมเปญการตลาดที่เป็นส่วนตัวด้วยข้อมูลเชิงลึกจาก warehouse
ภาพรวม
| คุณสมบัติ | ค่า |
|---|---|
| แพลตฟอร์ม | Snowflake |
| หมวดหมู่ | Data Warehouse (แบบกำหนดเอง) |
| ความซับซ้อนในการตั้งค่า | ปานกลาง |
| การผสานรวมอย่างเป็นทางการ | ไม่ |
| ข้อมูลที่ซิงค์ | ลูกค้า Segments Analytics เหตุการณ์ |
| วิธีการยืนยันตัวตน | Key Pair / OAuth 2.0 |
ฟีเจอร์
- Reverse ETL - ส่ง customer segments จาก Snowflake ไปยังรายการผู้ติดต่อ Brevo
- การซิงค์ audience - ซิงค์ audiences ที่คำนวณโดย warehouse สำหรับแคมเปญที่กำหนดเป้าหมาย
- การเพิ่มความสมบูรณ์ analytics - เพิ่มความสมบูรณ์ให้ผู้ติดต่อ Brevo ด้วย metrics ที่คำนวณแล้ว (LTV, RFM scores)
- SQL-based queries - ใช้ Snowflake SQL REST API เพื่อรัน queries แบบ programmatic
- การซิงค์ตามกำหนดเวลา - รัน data pipelines อัตโนมัติตามช่วงเวลาที่กำหนดได้
- รองรับหลาย statement - รัน data transformations ที่ซับซ้อนในการเรียก API เดียว
ข้อกำหนดเบื้องต้น
ก่อนเริ่มต้น ตรวจสอบให้แน่ใจว่าคุณมี:
- บัญชี Snowflake ที่มีบทบาท ACCOUNTADMIN หรือ SYSADMIN
- บัญชี Brevo ที่มีสิทธิ์เข้าถึง API
- บัญชี Tajo ที่มีสิทธิ์ connector
- Snowflake warehouse เฉพาะสำหรับ integration queries
- Network policy ที่อนุญาต IP addresses ของ Tajo
การยืนยันตัวตน
การยืนยันตัวตนด้วย Key Pair (แนะนำ)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';การยืนยันตัวตนด้วย OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });การยืนยันตัวตน SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'การกำหนดค่า
การตั้งค่าพื้นฐาน
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncการแมปฟิลด์
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEAPI Endpoints
| Endpoint | เมธอด | คำอธิบาย |
|---|---|---|
/api/v2/statements | POST | ส่ง SQL statements สำหรับการรัน |
/api/v2/statements/{statementHandle} | GET | ตรวจสอบสถานะการรัน |
/api/v2/statements/{statementHandle}/cancel | POST | ยกเลิก statement ที่กำลังรัน |
/api/v2/statements/{statementHandle}?partition={id} | GET | ดึง result partitions |
SQL API Partitions
Snowflake SQL API ส่งคืน result sets ขนาดใหญ่เป็น partitions แต่ละ partition มีข้อมูลประมาณ 12MB ใช้พารามิเตอร์ partition เพื่อวนซ้ำผ่านผลลัพธ์
ตัวอย่างโค้ด
เริ่มต้น Connector
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});ซิงค์ Customer Segments ผ่าน SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Reverse ETL Pipeline
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});ขีดจำกัดอัตรา
| ทรัพยากร | ขีดจำกัด | หมายเหตุ |
|---|---|---|
| SQL API concurrent queries | 20 ต่อผู้ใช้ | ต่อบัญชี Snowflake |
| ขนาด SQL API result | 12MB ต่อ partition | แบ่งหน้าด้วย partition IDs |
| Statement timeout | 172,800 วินาที (48 ชั่วโมง) | กำหนดค่าได้ต่อ query |
| คำขอ API | แตกต่างตามแผน | ขึ้นอยู่กับ Snowflake edition |
ค่าใช้จ่าย Warehouse
Snowflake เรียกเก็บเงินตามเวลาคำนวณ ใช้ warehouse ขนาดที่เหมาะสมสำหรับ Tajo queries และตั้งค่า auto-suspend เพื่อลดค่าใช้จ่าย
การแก้ไขปัญหา
| ปัญหา | สาเหตุ | วิธีแก้ |
|---|---|---|
| การยืนยันตัวตนล้มเหลว | JWT token หมดอายุ | สร้าง JWT ใหม่พร้อมวันหมดอายุที่ถูกต้อง |
| Query หมดเวลา | Dataset ขนาดใหญ่ | เพิ่มตัวกรองหรือใช้ incremental sync |
| Network error | IP ไม่ได้อยู่ใน whitelist | เพิ่ม Tajo IPs ใน Snowflake network policy |
| คอลัมน์หายไป | Schema เปลี่ยนแปลง | อัปเดตการกำหนดค่าการแมปฟิลด์ |
| Partition error | ผลลัพธ์ใหญ่เกินไป | ประมวลผลผลลัพธ์เป็น partitions เล็กกว่า |
โหมด Debug
connectors: snowflake: debug: true log_level: verbose log_queries: trueแนวทางปฏิบัติที่ดีที่สุด
- ใช้ warehouse เฉพาะ - หลีกเลี่ยงการแย่งทรัพยากรกับ workloads production
- ใช้ incremental sync - Query เฉพาะ records ที่เปลี่ยนแปลงตั้งแต่ sync ครั้งล่าสุด
- ตั้งค่า auto-suspend - กำหนดค่า warehouse ให้หยุดทำงานหลังจาก 5 นาทีที่ไม่มีกิจกรรม
- ใช้การยืนยันตัวตนด้วย key pair - ต้องการ key pair มากกว่าการยืนยันตัวตนด้วยรหัสผ่าน
- ปรับแต่ง queries - กรองและ project เฉพาะคอลัมน์ที่จำเป็นสำหรับการซิงค์ที่เร็วขึ้น
- ตรวจสอบ credits - ติดตามการบริโภค Snowflake credits สำหรับ integration queries
ความปลอดภัย
- การยืนยันตัวตนด้วย key pair - การเข้ารหัส RSA 2048-bit สำหรับการเข้าถึง API
- OAuth 2.0 - การยืนยันตัวตนแบบ token พร้อม role scoping
- Network policies - IP allowlisting สำหรับ service endpoints ของ Tajo
- การควบคุมการเข้าถึงตามบทบาท - บทบาท Snowflake เฉพาะพร้อมสิทธิ์ขั้นต่ำที่จำเป็น
- การถ่ายโอนข้อมูลที่เข้ารหัส - TLS 1.2+ สำหรับการสื่อสาร API ทั้งหมด
- Data masking - ใช้ Snowflake dynamic data masking สำหรับฟิลด์ที่ละเอียดอ่อน