Snowflake कनेक्टर

अपने data warehouse से customer segments sync करने, analytics data के साथ contact profiles को समृद्ध करने, और warehouse-driven insights के साथ personalized marketing campaigns चलाने हेतु Tajo के माध्यम से Snowflake को Brevo से कनेक्ट करें।

अवलोकन

PropertyValue
PlatformSnowflake
CategoryData Warehouse (Custom)
Setup ComplexityMedium
Official IntegrationNo
Data SyncedCustomers, Segments, Analytics, Events
Auth MethodKey Pair / OAuth 2.0

विशेषताएं

  • Reverse ETL - Snowflake से customer segments को Brevo contact lists में push करें
  • Audience sync - targeted campaigns के लिए warehouse-computed audiences sync करें
  • Analytics enrichment - computed metrics (LTV, RFM scores) के साथ Brevo contacts को समृद्ध करें
  • SQL-आधारित queries - queries को programmatically execute करने के लिए Snowflake SQL REST API का उपयोग करें
  • Scheduled sync - configurable intervals पर automated data pipelines चलाएं
  • Multi-statement support - एकल API calls में जटिल data transformations execute करें

पूर्वावश्यकताएं

शुरू करने से पहले, सुनिश्चित करें कि आपके पास हैं:

  1. ACCOUNTADMIN या SYSADMIN role वाला एक Snowflake account
  2. API access वाला एक Brevo account
  3. connector permissions वाला एक Tajo account
  4. integration queries के लिए एक समर्पित Snowflake warehouse
  5. Tajo IP addresses की अनुमति देने वाली Network policy

प्रमाणीकरण

Key Pair Authentication (अनुशंसित)

Terminal window
# RSA key pair generate करें
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Snowflake user को public key assign करें
# Snowflake में:
# ALTER USER tajo_integration SET RSA_PUBLIC_KEY='MII...';

OAuth 2.0 Authentication

const tokenResponse = await fetch(
'https://<account>.snowflakecomputing.com/oauth/token-request',
{
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: new URLSearchParams({
grant_type: 'client_credentials',
client_id: process.env.SNOWFLAKE_CLIENT_ID,
client_secret: process.env.SNOWFLAKE_CLIENT_SECRET,
scope: 'session:role:TAJO_ROLE'
})
}
);

SQL API Authentication

Terminal window
# SQL API के साथ JWT token का उपयोग
curl -X POST \
'https://<account>.snowflakecomputing.com/api/v2/statements' \
-H 'Authorization: Bearer <jwt_token>' \
-H 'Content-Type: application/json' \
-H 'X-Snowflake-Authorization-Token-Type: KEYPAIR_JWT' \
-d '{"statement": "SELECT * FROM customers LIMIT 10", "warehouse": "TAJO_WH"}'

कॉन्फ़िगरेशन

बेसिक सेटअप

connectors:
snowflake:
enabled: true
account: "your-account.snowflakecomputing.com"
warehouse: "TAJO_WH"
database: "MARKETING_DB"
schema: "PUBLIC"
role: "TAJO_ROLE"
sync:
customers: true
segments: true
analytics: true
schedule: "0 */6 * * *" # हर 6 घंटे में
queries:
customer_segments: |
SELECT email, segment_name, ltv_score, rfm_class
FROM customer_segments
WHERE updated_at > :last_sync

Field Mapping

field_mapping:
email: email
first_name: FIRSTNAME
last_name: LASTNAME
ltv_score: LTV_SCORE
rfm_class: RFM_SEGMENT
total_orders: ORDER_COUNT
last_purchase_date: LAST_ORDER_DATE
predicted_churn: CHURN_RISK
customer_segment: SEGMENT_NAME

API Endpoints

EndpointMethodविवरण
/api/v2/statementsPOSTexecution के लिए SQL statements submit करें
/api/v2/statements/{statementHandle}GETexecution status जांचें
/api/v2/statements/{statementHandle}/cancelPOSTrunning statement cancel करें
/api/v2/statements/{statementHandle}?partition={id}GETresult partitions पुनः प्राप्त करें

SQL API Partitions

Snowflake SQL API बड़े result sets को partitions में लौटाता है। प्रत्येक partition में लगभग 12MB तक data होता है। results के माध्यम से iterate करने के लिए partition parameter का उपयोग करें।

कोड उदाहरण

कनेक्टर शुरू करें

import { TajoClient } from '@tajo/sdk';
const tajo = new TajoClient({
apiKey: process.env.TAJO_API_KEY,
brevoApiKey: process.env.BREVO_API_KEY
});
await tajo.connectors.connect('snowflake', {
account: process.env.SNOWFLAKE_ACCOUNT,
privateKey: process.env.SNOWFLAKE_PRIVATE_KEY,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC'
});

SQL API के माध्यम से Customer Segments Sync करें

// Snowflake SQL REST API के माध्यम से SQL query execute करें
const response = await fetch(
`https://${account}.snowflakecomputing.com/api/v2/statements`,
{
method: 'POST',
headers: {
'Authorization': `Bearer ${jwtToken}`,
'Content-Type': 'application/json',
'X-Snowflake-Authorization-Token-Type': 'KEYPAIR_JWT'
},
body: JSON.stringify({
statement: `SELECT email, segment, ltv FROM customer_segments
WHERE updated_at > '${lastSync}'`,
warehouse: 'TAJO_WH',
database: 'MARKETING_DB',
schema: 'PUBLIC',
timeout: 120
})
}
);
const result = await response.json();
const statementHandle = result.statementHandle;
// results के लिए poll करें
let status = result.statementStatusUrl;
while (result.code !== '090001') {
const check = await fetch(status, {
headers: { 'Authorization': `Bearer ${jwtToken}` }
});
result = await check.json();
}
// Tajo के माध्यम से Brevo से sync करें
for (const row of result.data) {
await tajo.contacts.sync({
email: row[0],
attributes: { SEGMENT: row[1], LTV: row[2] }
});
}

Reverse ETL Pipeline

// computed audiences को Snowflake से Brevo lists में push करें
await tajo.connectors.sync('snowflake', {
type: 'reverse-etl',
query: `
SELECT email, first_name, last_name, predicted_ltv, churn_score
FROM ml_predictions.customer_scores
WHERE score_date = CURRENT_DATE()
`,
destination: {
list_id: 42,
attribute_mapping: {
predicted_ltv: 'PREDICTED_LTV',
churn_score: 'CHURN_SCORE'
}
}
});

Rate Limits

संसाधनLimitनोट्स
SQL API concurrent queriesप्रति user 20प्रति Snowflake account
SQL API result sizeप्रति partition 12MBpartition IDs के साथ paginate करें
Statement timeout172,800 सेकंड (48h)प्रति query configurable
API requestsPlan के अनुसार भिन्नSnowflake edition पर आधारित

Warehouse Costs

Snowflake compute time के आधार पर charge करता है। Tajo queries के लिए एक समर्पित, उपयुक्त आकार के warehouse का उपयोग करें और लागत को न्यूनतम करने के लिए auto-suspend सेट करें।

समस्या निवारण

समस्याकारणसमाधान
Authentication failedExpired JWT tokenvalid expiration के साथ JWT पुनः generate करें
Query timeoutबड़ा datasetfilters जोड़ें या incremental sync का उपयोग करें
Network errorIP whitelisted नहींSnowflake network policy में Tajo IPs जोड़ें
गायब columnsSchema changefield mapping configuration अपडेट करें
Partition errorResult बहुत बड़ाछोटे partitions में results process करें

Debug Mode

connectors:
snowflake:
debug: true
log_level: verbose
log_queries: true

सर्वोत्तम प्रथाएं

  1. एक समर्पित warehouse का उपयोग करें - production workloads के साथ contention से बचें
  2. Incremental sync लागू करें - केवल last sync के बाद से बदले गए records क्वेरी करें
  3. Auto-suspend सेट करें - warehouse को 5 मिनट की inactivity के बाद suspend होने के लिए कॉन्फ़िगर करें
  4. Key pair auth का उपयोग करें - password authentication की तुलना में key pair को प्राथमिकता दें
  5. Queries को अनुकूलित करें - तेज syncs के लिए केवल आवश्यक columns को filter और project करें
  6. Credits monitor करें - integration queries के लिए Snowflake credit consumption ट्रैक करें

सुरक्षा

  • Key pair authentication - API access के लिए RSA 2048-bit encryption
  • OAuth 2.0 - role scoping के साथ Token-आधारित authentication
  • Network policies - Tajo service endpoints के लिए IP allowlisting
  • Role-आधारित access - न्यूनतम आवश्यक privileges के साथ समर्पित Snowflake role
  • Encrypted data transfer - सभी API communications के लिए TLS 1.2+
  • Data masking - sensitive fields के लिए Snowflake dynamic data masking का उपयोग करें

संबंधित संसाधन

Subscribe to updates

developer-docs

Drop your email or phone number — we'll send you what matters next.

auto-detect
AI सहायक

नमस्ते! डॉक्यूमेंटेशन के बारे में कुछ भी पूछें।