Σύνδεσμος Snowflake
Συνδέστε το Snowflake με το Brevo μέσω Tajo για συγχρονισμό τμημάτων πελατών από την αποθήκη δεδομένων σας, εμπλουτισμό προφίλ επαφών με δεδομένα ανάλυσης και τροφοδότηση εξατομικευμένων καμπανιών marketing με πληροφορίες από την αποθήκη.
Επισκόπηση
| Ιδιότητα | Τιμή |
|---|---|
| Πλατφόρμα | Snowflake |
| Κατηγορία | Αποθήκη Δεδομένων (Προσαρμοσμένο) |
| Πολυπλοκότητα Ρύθμισης | Μεσαία |
| Επίσημη Ενσωμάτωση | Όχι |
| Δεδομένα που Συγχρονίζονται | Πελάτες, Τμήματα, Αναλυτικά, Εκδηλώσεις |
| Μέθοδος Πιστοποίησης | Ζεύγος Κλειδιών / OAuth 2.0 |
Χαρακτηριστικά
- Reverse ETL - Προώθηση τμημάτων πελατών από Snowflake σε λίστες επαφών Brevo
- Συγχρονισμός κοινού - Συγχρονισμός κοινών υπολογισμένων από αποθήκη για στοχευμένες καμπάνιες
- Εμπλουτισμός αναλυτικών - Εμπλουτισμός επαφών Brevo με υπολογισμένες μετρικές (LTV, βαθμολογίες RFM)
- Ερωτήματα SQL - Χρήση Snowflake SQL REST API για εκτέλεση ερωτημάτων μέσω προγράμματος
- Προγραμματισμένος συγχρονισμός - Εκτέλεση αυτοματοποιημένων pipelines δεδομένων σε ρυθμιζόμενα διαστήματα
- Υποστήριξη πολλαπλών εντολών - Εκτέλεση σύνθετων μετασχηματισμών δεδομένων σε μεμονωμένες κλήσεις API
Προαπαιτούμενα
Πριν ξεκινήσετε, βεβαιωθείτε ότι διαθέτετε:
- Λογαριασμό Snowflake με ρόλο ACCOUNTADMIN ή SYSADMIN
- Λογαριασμό Brevo με πρόσβαση API
- Λογαριασμό Tajo με δικαιώματα συνδέσμου
- Αποκλειστική αποθήκη Snowflake για ερωτήματα ενσωμάτωσης
- Πολιτική δικτύου που επιτρέπει τις διευθύνσεις IP Tajo
Πιστοποίηση
Πιστοποίηση Ζεύγους Κλειδιών (Συνιστάται)
# Generate RSA key pairopenssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocryptopenssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Assign public key to Snowflake user# In Snowflake:# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';Πιστοποίηση OAuth 2.0
const tokenResponse = await fetch( 'https://<account>.snowflakecomputing.com/oauth/token-request', { method: 'POST', headers: { 'Content-Type': 'application/x-www-form-urlencoded' }, body: new URLSearchParams({ grant_type: 'client_credentials', client_id: process.env.SNOWFLAKE_CLIENT_ID, client_secret: process.env.SNOWFLAKE_CLIENT_SECRET, scope: 'session:role:TAJO_ROLE' }) });Πιστοποίηση SQL API
# Using JWT token with the SQL APIcurl -X POST \ 'https://<account>.snowflakecomputing.com/api/v2/statements' \ -H 'Authorization: Bearer <jwt_token>' \ -H 'Content-Type: application/json' \ -H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \ -d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'Διαμόρφωση
Βασική Ρύθμιση
connectors: snowflake: enabled: true account: "your-account.snowflakecomputing.com" warehouse: "TAJO_WH" database: "MARKETING_DB" schema: "PUBLIC" role: "TAJO_ROLE"
sync: customers: true segments: true analytics: true schedule: "0 */6 * * *" # Every 6 hours
queries: customer_segments: | SELECT email, segment_name, ltv_score, rfm_class FROM customer_segments WHERE updated_at > :last_syncΑντιστοίχιση Πεδίων
field_mapping: email: email first_name: FIRSTNAME last_name: LASTNAME ltv_score: LTV_SCORE rfm_class: RFM_SEGMENT total_orders: ORDER_COUNT last_purchase_date: LAST_ORDER_DATE predicted_churn: CHURN_RISK customer_segment: SEGMENT_NAMEEndpoints API
| Endpoint | Μέθοδος | Περιγραφή |
|---|---|---|
/api/v2/statements | POST | Υποβολή εντολών SQL για εκτέλεση |
/api/v2/statements/{statementHandle} | GET | Έλεγχος κατάστασης εκτέλεσης |
/api/v2/statements/{statementHandle}/cancel | POST | Ακύρωση εκτελούμενης εντολής |
/api/v2/statements/{statementHandle}?partition={id} | GET | Ανάκτηση τμημάτων αποτελέσματος |
Τμήματα SQL API
Το Snowflake SQL API επιστρέφει μεγάλα σύνολα αποτελεσμάτων σε τμήματα. Κάθε τμήμα περιέχει έως περίπου 12MB δεδομένων. Χρησιμοποιήστε την παράμετρο partition για επανάληψη μέσω αποτελεσμάτων.
Παραδείγματα Κώδικα
Αρχικοποίηση Συνδέσμου
import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({ apiKey: process.env.TAJO_API_KEY, brevoApiKey: process.env.BREVO_API_KEY});
await tajo.connectors.connect('snowflake', { account: process.env.SNOWFLAKE_ACCOUNT, privateKey: process.env.SNOWFLAKE_PRIVATE_KEY, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC'});Συγχρονισμός Τμημάτων Πελατών μέσω SQL API
// Execute a SQL query via Snowflake SQL REST APIconst response = await fetch( `https://${account}.snowflakecomputing.com/api/v2/statements`, { method: 'POST', headers: { 'Authorization': `Bearer ${jwtToken}`, 'Content-Type': 'application/json', 'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT' }, body: JSON.stringify({ statement: `SELECT email, segment, ltv FROM customer_segments WHERE updated_at > '${lastSync}'`, warehouse: 'TAJO_WH', database: 'MARKETING_DB', schema: 'PUBLIC', timeout: 120 }) });
const result = await response.json();const statementHandle = result.statementHandle;
// Poll for resultslet status = result.statementStatusUrl;while (result.code !== '090001') { const check = await fetch(status, { headers: { 'Authorization': `Bearer ${jwtToken}` } }); result = await check.json();}
// Sync to Brevo via Tajofor (const row of result.data) { await tajo.contacts.sync({ email: row[0], attributes: { SEGMENT: row[1], LTV: row[2] } });}Pipeline Reverse ETL
// Push computed audiences from Snowflake to Brevo listsawait tajo.connectors.sync('snowflake', { type: 'reverse-etl', query: ` SELECT email, first_name, last_name, predicted_ltv, churn_score FROM ml_predictions.customer_scores WHERE score_date = CURRENT_DATE() `, destination: { list_id: 42, attribute_mapping: { predicted_ltv: 'PREDICTED_LTV', churn_score: 'CHURN_SCORE' } }});Όρια Ρυθμού
| Πόρος | Όριο | Σημειώσεις |
|---|---|---|
| Ταυτόχρονα ερωτήματα SQL API | 20 ανά χρήστη | Ανά λογαριασμό Snowflake |
| Μέγεθος αποτελέσματος SQL API | 12MB ανά τμήμα | Σελιδοποίηση με IDs τμήματος |
| Λήξη εντολής | 172.800 δευτ. (48ώρ) | Ρυθμιζόμενο ανά ερώτημα |
| Αιτήματα API | Ποικίλει βάσει πλάνου | Βάσει έκδοσης Snowflake |
Κόστος Αποθήκης
Το Snowflake χρεώνει βάσει χρόνου υπολογισμού. Χρησιμοποιήστε αποκλειστική αποθήκη κατάλληλου μεγέθους για ερωτήματα Tajo και ορίστε αυτόματη αναστολή για ελαχιστοποίηση κόστους.
Αντιμετώπιση Προβλημάτων
| Πρόβλημα | Αιτία | Λύση |
|---|---|---|
| Αποτυχία πιστοποίησης | Ληγμένο JWT token | Αναγέννηση JWT με έγκυρη λήξη |
| Λήξη ερωτήματος | Μεγάλο σύνολο δεδομένων | Προσθέστε φίλτρα ή χρησιμοποιήστε σταδιακό συγχρονισμό |
| Σφάλμα δικτύου | Η IP δεν βρίσκεται στη whitelist | Προσθέστε IPs Tajo στην πολιτική δικτύου Snowflake |
| Λείπουν στήλες | Αλλαγή σχήματος | Ενημερώστε τη διαμόρφωση αντιστοίχησης πεδίων |
| Σφάλμα τμήματος | Το αποτέλεσμα είναι πολύ μεγάλο | Επεξεργαστείτε αποτελέσματα σε μικρότερα τμήματα |
Λειτουργία Εντοπισμού Σφαλμάτων
connectors: snowflake: debug: true log_level: verbose log_queries: trueΒέλτιστες Πρακτικές
- Χρησιμοποιήστε αποκλειστική αποθήκη - Αποφύγετε ανταγωνισμό με φόρτο εργασίας παραγωγής
- Εφαρμόστε σταδιακό συγχρονισμό - Ερωτήστε μόνο τροποποιημένες εγγραφές από τον τελευταίο συγχρονισμό
- Ορίστε αυτόματη αναστολή - Ρυθμίστε αποθήκη να αναστέλλεται μετά από 5 λεπτά αδράνειας
- Χρησιμοποιήστε πιστοποίηση ζεύγους κλειδιών - Προτιμήστε ζεύγος κλειδιών έναντι πιστοποίησης με κωδικό
- Βελτιστοποιήστε ερωτήματα - Φιλτράρετε και προβάλετε μόνο τις απαραίτητες στήλες για ταχύτερους συγχρονισμούς
- Παρακολουθήστε credits - Παρακολουθήστε κατανάλωση credit Snowflake για ερωτήματα ενσωμάτωσης
Ασφάλεια
- Πιστοποίηση ζεύγους κλειδιών - Κρυπτογράφηση RSA 2048-bit για πρόσβαση API
- OAuth 2.0 - Πιστοποίηση βάσει token με εύρος ρόλου
- Πολιτικές δικτύου - Allowlisting IP για endpoints υπηρεσίας Tajo
- Πρόσβαση βάσει ρόλου - Αποκλειστικός ρόλος Snowflake με ελάχιστα απαιτούμενα δικαιώματα
- Κρυπτογραφημένη μεταφορά δεδομένων - TLS 1.2+ για όλες τις επικοινωνίες API
- Απόκρυψη δεδομένων - Χρήση Snowflake dynamic data masking για ευαίσθητα πεδία