موصل Snowflake
اربط Snowflake بـ Brevo من خلال Tajo لمزامنة شرائح العملاء من مستودع البيانات الخاص بك، وإثراء ملفات جهات الاتصال ببيانات التحليلات، وتشغيل حملات تسويقية مخصصة مدعومة برؤى من المستودع.
نظرة عامة
| الخاصية | القيمة |
|---|---|
| المنصة | Snowflake |
| الفئة | مستودع بيانات (مخصص) |
| تعقيد الإعداد | متوسط |
| تكامل رسمي | لا |
| البيانات المُزامَنة | عملاء، شرائح، تحليلات، أحداث |
| طريقة المصادقة | زوج مفاتيح / OAuth 2.0 |
الميزات
- ETL العكسي - دفع شرائح العملاء من Snowflake إلى قوائم جهات اتصال Brevo
- مزامنة الجماهير - مزامنة الجماهير المحسوبة من المستودع للحملات المستهدفة
- إثراء التحليلات - إثراء جهات اتصال Brevo بمقاييس محسوبة (LTV، درجات RFM)
- الاستعلامات القائمة على SQL - استخدام Snowflake SQL REST API لتنفيذ الاستعلامات برمجيًا
- مزامنة مجدولة - تشغيل خطوط أنابيب بيانات آلية على فترات قابلة للتخصيص
- دعم العبارات المتعددة - تنفيذ تحويلات بيانات معقدة في استدعاءات API فردية
المتطلبات الأساسية
قبل البدء، تأكد من توفر ما يلي:
- حساب Snowflake مع دور ACCOUNTADMIN أو SYSADMIN
- حساب Brevo مع وصول إلى API
- حساب Tajo مع صلاحيات الموصلات
- مستودع Snowflake مخصص لاستعلامات التكامل
- سياسة شبكة تسمح بعناوين IP الخاصة بـ Tajo
المصادقة
مصادقة زوج المفاتيح (موصى بها)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';مصادقة OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });مصادقة SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'الإعداد
الإعداد الأساسي
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncتعيين الحقول
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEنقاط نهاية واجهة البرمجة
| نقطة النهاية | الطريقة | الوصف |
|---|---|---|
/api/v2/statements | POST | تقديم عبارات SQL للتنفيذ |
/api/v2/statements/{statementHandle} | GET | التحقق من حالة التنفيذ |
/api/v2/statements/{statementHandle}/cancel | POST | إلغاء عبارة قيد التشغيل |
/api/v2/statements/{statementHandle}?partition={id} | GET | استرجاع أقسام النتائج |
أقسام SQL API
يُعيد Snowflake SQL API مجموعات النتائج الكبيرة في أقسام. يحتوي كل قسم على ما يصل إلى 12 ميجابايت تقريبًا من البيانات. استخدم معامل القسم للتنقل عبر النتائج.
أمثلة على الكود
تهيئة الموصل
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});مزامنة شرائح العملاء عبر SQL API
// تنفيذ استعلام SQL عبر Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// استطلاع النتائجlet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// مزامنة مع Brevo عبر Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}خط أنابيب ETL العكسي
// دفع الجماهير المحسوبة من Snowflake إلى قوائم Brevoawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});حدود المعدل
| المورد | الحد | ملاحظات |
|---|---|---|
| استعلامات SQL API المتزامنة | 20 لكل مستخدم | لكل حساب Snowflake |
| حجم نتيجة SQL API | 12 ميجابايت لكل قسم | قسّم النتائج بمعرّفات الأقسام |
| مهلة العبارة | 172,800 ثانية (48 ساعة) | قابلة للتكوين لكل استعلام |
| طلبات API | تختلف حسب الخطة | استنادًا إلى إصدار Snowflake |
تكاليف المستودع
يفرض Snowflake رسومًا بناءً على وقت الحوسبة. استخدم مستودعًا مخصصًا بحجم مناسب لاستعلامات Tajo واضبط التعليق التلقائي لتقليل التكاليف.
استكشاف الأخطاء وإصلاحها
| المشكلة | السبب | الحل |
|---|---|---|
| فشلت المصادقة | انتهاء رمز JWT | أعد توليد JWT بصلاحية صالحة |
| انتهاء مهلة الاستعلام | مجموعة بيانات كبيرة | أضف عوامل تصفية أو استخدم المزامنة التزايدية |
| خطأ في الشبكة | IP غير مدرج في القائمة البيضاء | أضف عناوين IP الخاصة بـ Tajo إلى سياسة شبكة Snowflake |
| أعمدة مفقودة | تغيير في المخطط | حدّث إعدادات تعيين الحقول |
| خطأ في القسم | النتيجة كبيرة جدًا | عالج النتائج في أقسام أصغر |
وضع التصحيح
connectors: snowflake: debug: true log_level: verbose log_queries: trueأفضل الممارسات
- استخدم مستودعًا مخصصًا - تجنب التعارض مع أحمال عمل الإنتاج
- طبّق المزامنة التزايدية - استعلم فقط عن السجلات المتغيرة منذ آخر مزامنة
- اضبط التعليق التلقائي - اضبط المستودع للتعليق بعد 5 دقائق من عدم النشاط
- استخدم مصادقة زوج المفاتيح - فضّل زوج المفاتيح على مصادقة كلمة المرور
- حسّن الاستعلامات - صفِّ واعرض فقط الأعمدة اللازمة لمزامنة أسرع
- راقب الأرصدة - تتبع استهلاك أرصدة Snowflake لاستعلامات التكامل
الأمان
- مصادقة زوج المفاتيح - تشفير RSA 2048-bit للوصول إلى API
- OAuth 2.0 - مصادقة قائمة على الرموز مع تحديد نطاق الدور
- سياسات الشبكة - قائمة IP المسموح بها لنقاط نهاية خدمة Tajo
- الوصول القائم على الدور - دور Snowflake مخصص بأدنى الصلاحيات المطلوبة
- نقل البيانات المشفرة - TLS 1.2+ لجميع اتصالات API
- إخفاء البيانات - استخدم إخفاء البيانات الديناميكي في Snowflake للحقول الحساسة