Webhook and Event Systems
Event Subscription
Universal Event Manager
Event Subscription Framework
class EventManager {
constructor() {
this.subscriptions = new Map();
this.eventQueue = new EventQueue();
this.webhookManager = new WebhookManager();
this.filters = new EventFilterManager();
}
async subscribe(subscriptionConfig) {
const {
id,
name,
events,
filters,
webhook_url,
retry_config,
user_id
} = subscriptionConfig;
// Validate subscription
this.validateSubscription(subscriptionConfig);
const subscription = {
id: id || this.generateSubscriptionId(),
name: name,
user_id: user_id,
events: events,
filters: filters,
webhook_url: webhook_url,
retry_config: retry_config || this.getDefaultRetryConfig(),
created_at: Date.now(),
status: 'active',
stats: {
events_sent: 0,
events_failed: 0,
last_event: null
}
};
this.subscriptions.set(subscription.id, subscription);
// Set up event listeners for each event type
for (const eventType of events) {
await this.setupEventListener(eventType, subscription);
}
return subscription;
}
async setupEventListener(eventType, subscription) {
switch (eventType) {
case 'token_transfer':
return this.setupTokenTransferListener(subscription);
case 'price_change':
return this.setupPriceChangeListener(subscription);
case 'security_alert':
return this.setupSecurityAlertListener(subscription);
case 'social_sentiment':
return this.setupSentimentListener(subscription);
default:
throw new Error(`Unsupported event type: ${eventType}`);
}
}
}
Event Types and Schemas
Event Schema Definitions
const EventSchemas = {
token_transfer: {
type: 'object',
properties: {
event_type: { type: 'string', enum: ['token_transfer'] },
token_address: { type: 'string' },
chain: { type: 'string' },
from_address: { type: 'string' },
to_address: { type: 'string' },
amount: { type: 'string' },
amount_usd: { type: 'number' },
transaction_hash: { type: 'string' },
block_number: { type: 'number' },
timestamp: { type: 'number' },
gas_used: { type: 'number' },
gas_price: { type: 'string' }
},
required: ['event_type', 'token_address', 'chain', 'from_address', 'to_address', 'amount']
},
price_change: {
type: 'object',
properties: {
event_type: { type: 'string', enum: ['price_change'] },
token_address: { type: 'string' },
chain: { type: 'string' },
old_price: { type: 'number' },
new_price: { type: 'number' },
change_percentage: { type: 'number' },
volume_24h: { type: 'number' },
market_cap: { type: 'number' },
timestamp: { type: 'number' },
trigger_reason: { type: 'string' }
},
required: ['event_type', 'token_address', 'old_price', 'new_price', 'change_percentage']
},
security_alert: {
type: 'object',
properties: {
event_type: { type: 'string', enum: ['security_alert'] },
alert_type: { type: 'string', enum: ['honeypot', 'rug_pull', 'suspicious_activity', 'contract_vulnerability'] },
token_address: { type: 'string' },
chain: { type: 'string' },
severity: { type: 'string', enum: ['low', 'medium', 'high', 'critical'] },
description: { type: 'string' },
evidence: { type: 'array' },
risk_score: { type: 'number', minimum: 0, maximum: 100 },
timestamp: { type: 'number' },
recommended_action: { type: 'string' }
},
required: ['event_type', 'alert_type', 'token_address', 'severity', 'description']
}
};
Real-Time Notifications
Notification Delivery System
Multi-Channel Notification Manager
class NotificationManager {
constructor() {
this.channels = {
webhook: new WebhookChannel(),
email: new EmailChannel(),
slack: new SlackChannel(),
discord: new DiscordChannel(),
telegram: new TelegramChannel(),
push: new PushNotificationChannel()
};
this.messageQueue = new MessageQueue();
this.retryManager = new RetryManager();
}
async sendNotification(notification) {
const {
user_id,
channels,
content,
priority,
template,
metadata
} = notification;
// Get user preferences
const userPreferences = await this.getUserPreferences(user_id);
// Determine delivery channels
const deliveryChannels = this.resolveDeliveryChannels(channels, userPreferences, priority);
// Format content for each channel
const deliveryPromises = deliveryChannels.map(async (channel) => {
try {
const formattedContent = await this.formatContent(content, channel, template);
return await this.channels[channel].send(user_id, formattedContent, metadata);
} catch (error) {
return { channel, error: error.message, success: false };
}
});
const results = await Promise.all(deliveryPromises);
// Handle failures with retry logic
const failures = results.filter(result => !result.success);
if (failures.length > 0) {
await this.retryManager.scheduleRetries(notification, failures);
}
return {
notification_id: notification.id,
delivery_results: results,
success_rate: (results.length - failures.length) / results.length
};
}
async formatContent(content, channel, template) {
const formatter = this.getFormatter(channel);
switch (template) {
case 'security_alert':
return formatter.formatSecurityAlert(content);
case 'price_alert':
return formatter.formatPriceAlert(content);
case 'portfolio_update':
return formatter.formatPortfolioUpdate(content);
default:
return formatter.formatGeneric(content);
}
}
}
Real-Time Alert Processing
Alert Processing Pipeline
class AlertProcessor {
constructor() {
this.alertRules = new Map();
this.cooldownManager = new CooldownManager();
this.aggregator = new AlertAggregator();
}
async processAlert(rawAlert) {
// Validate alert data
const validatedAlert = await this.validateAlert(rawAlert);
// Check alert rules and conditions
const triggeredRules = await this.evaluateRules(validatedAlert);
if (triggeredRules.length === 0) {
return { processed: false, reason: 'No rules triggered' };
}
// Check cooldown periods
const allowedRules = await this.filterByCooldown(triggeredRules, validatedAlert);
if (allowedRules.length === 0) {
return { processed: false, reason: 'All rules in cooldown' };
}
// Aggregate similar alerts
const aggregatedAlert = await this.aggregator.aggregate(validatedAlert, allowedRules);
// Process notifications for each rule
const notificationPromises = allowedRules.map(rule =>
this.createNotification(aggregatedAlert, rule)
);
const notifications = await Promise.all(notificationPromises);
// Update cooldown timers
await this.cooldownManager.updateCooldowns(allowedRules, validatedAlert);
return {
processed: true,
alert_id: validatedAlert.id,
notifications: notifications,
rules_triggered: allowedRules.length
};
}
async evaluateRules(alert) {
const triggeredRules = [];
for (const [ruleId, rule] of this.alertRules) {
try {
const conditionsMet = await this.evaluateConditions(alert, rule.conditions);
if (conditionsMet) {
triggeredRules.push({
...rule,
id: ruleId,
triggered_at: Date.now()
});
}
} catch (error) {
console.error(`Rule evaluation failed for ${ruleId}: ${error.message}`);
}
}
return triggeredRules;
}
}
Alert Configuration
Dynamic Alert Rules
Rule Configuration System
class AlertRuleManager {
constructor() {
this.rules = new Map();
this.ruleValidator = new RuleValidator();
this.conditionEvaluator = new ConditionEvaluator();
}
async createRule(ruleConfig) {
const {
name,
description,
conditions,
actions,
cooldown,
priority,
user_id,
tokens,
chains
} = ruleConfig;
// Validate rule configuration
await this.ruleValidator.validate(ruleConfig);
const rule = {
id: this.generateRuleId(),
name: name,
description: description,
conditions: this.parseConditions(conditions),
actions: this.parseActions(actions),
cooldown: cooldown || 300, // 5 minutes default
priority: priority || 'medium',
user_id: user_id,
tokens: tokens || [],
chains: chains || [],
created_at: Date.now(),
updated_at: Date.now(),
status: 'active',
stats: {
triggered_count: 0,
last_triggered: null
}
};
this.rules.set(rule.id, rule);
return rule;
}
parseConditions(conditions) {
return conditions.map(condition => ({
type: condition.type,
field: condition.field,
operator: condition.operator,
value: condition.value,
timeframe: condition.timeframe,
aggregation: condition.aggregation
}));
}
// Example rule conditions
getExampleRules() {
return {
priceDropAlert: {
name: "Price Drop Alert",
conditions: [
{
type: "price_change",
field: "change_percentage",
operator: "less_than",
value: -10,
timeframe: "1h"
}
],
actions: [
{
type: "webhook",
url: "https://api.example.com/webhook",
template: "price_alert"
},
{
type: "email",
template: "price_drop_notification"
}
]
},
liquidityAlert: {
name: "Low Liquidity Warning",
conditions: [
{
type: "liquidity_change",
field: "liquidity_usd",
operator: "less_than",
value: 10000
},
{
type: "volume_change",
field: "volume_24h",
operator: "greater_than",
value: 100000,
timeframe: "24h"
}
],
actions: [
{
type: "webhook",
url: "https://api.example.com/urgent",
template: "liquidity_warning"
}
]
},
securityAlert: {
name: "Security Risk Detection",
conditions: [
{
type: "security_score",
field: "score",
operator: "less_than",
value: 30
}
],
actions: [
{
type: "webhook",
url: "https://api.example.com/security",
template: "security_alert",
priority: "high"
},
{
type: "slack",
channel: "#security-alerts",
template: "security_notification"
}
]
}
};
}
}
Callback Management
Webhook Delivery System
Reliable Webhook Delivery
class WebhookManager {
constructor() {
this.deliveryQueue = new PriorityQueue();
this.retryScheduler = new RetryScheduler();
this.circuitBreaker = new CircuitBreaker();
this.rateLimiter = new RateLimiter();
}
async deliverWebhook(webhook) {
const {
url,
payload,
headers,
timeout,
retry_config,
user_id,
signature_secret
} = webhook;
// Check rate limits
await this.rateLimiter.checkLimit(user_id);
// Check circuit breaker
if (this.circuitBreaker.isOpen(url)) {
throw new Error(`Circuit breaker open for ${url}`);
}
try {
// Sign payload if secret provided
const signedHeaders = signature_secret
? this.signPayload(payload, signature_secret, headers)
: headers;
// Make HTTP request
const response = await this.makeRequest({
url: url,
method: 'POST',
headers: signedHeaders,
body: JSON.stringify(payload),
timeout: timeout || 30000
});
// Record success
this.circuitBreaker.recordSuccess(url);
return {
success: true,
status_code: response.status,
response_time: response.responseTime,
webhook_id: webhook.id
};
} catch (error) {
// Record failure
this.circuitBreaker.recordFailure(url);
// Schedule retry if configured
if (retry_config && retry_config.enabled) {
await this.retryScheduler.schedule(webhook, error);
}
throw error;
}
}
signPayload(payload, secret, headers = {}) {
const timestamp = Date.now();
const signature = this.generateSignature(payload, secret, timestamp);
return {
...headers,
'X-Kaizen-Timestamp': timestamp.toString(),
'X-Kaizen-Signature': signature,
'Content-Type': 'application/json'
};
}
generateSignature(payload, secret, timestamp) {
const crypto = require('crypto');
const message = `${timestamp}.${JSON.stringify(payload)}`;
const signature = crypto
.createHmac('sha256', secret)
.update(message)
.digest('hex');
return `v1=${signature}`;
}
async verifyWebhookSignature(payload, signature, secret, timestamp) {
const currentTime = Date.now();
const timestampMs = parseInt(timestamp) * 1000;
// Check timestamp tolerance (5 minutes)
if (Math.abs(currentTime - timestampMs) > 300000) {
throw new Error('Request timestamp too old');
}
const expectedSignature = this.generateSignature(payload, secret, timestamp);
if (signature !== expectedSignature) {
throw new Error('Invalid signature');
}
return true;
}
}
Error Handling
Comprehensive Error Management
Error Handling Framework
class ErrorHandler {
constructor() {
this.errorCategories = {
NETWORK_ERROR: 'network',
AUTHENTICATION_ERROR: 'auth',
RATE_LIMIT_ERROR: 'rate_limit',
VALIDATION_ERROR: 'validation',
PROCESSING_ERROR: 'processing',
EXTERNAL_API_ERROR: 'external_api'
};
this.retryStrategies = new Map();
this.errorMetrics = new ErrorMetrics();
}
async handleError(error, context) {
// Categorize error
const category = this.categorizeError(error);
// Log error with context
await this.logError(error, category, context);
// Update metrics
this.errorMetrics.recordError(category, context);
// Determine retry strategy
const retryStrategy = this.getRetryStrategy(category, error);
// Create error response
const errorResponse = {
error_id: this.generateErrorId(),
category: category,
message: this.sanitizeErrorMessage(error.message),
timestamp: Date.now(),
context: this.sanitizeContext(context),
retry_strategy: retryStrategy,
recoverable: this.isRecoverable(category, error)
};
// Handle specific error types
switch (category) {
case this.errorCategories.RATE_LIMIT_ERROR:
return this.handleRateLimitError(error, context, errorResponse);
case this.errorCategories.NETWORK_ERROR:
return this.handleNetworkError(error, context, errorResponse);
case this.errorCategories.EXTERNAL_API_ERROR:
return this.handleExternalAPIError(error, context, errorResponse);
default:
return this.handleGenericError(error, context, errorResponse);
}
}
categorizeError(error) {
if (error.code === 'ENOTFOUND' || error.code === 'ECONNREFUSED') {
return this.errorCategories.NETWORK_ERROR;
}
if (error.status === 401 || error.status === 403) {
return this.errorCategories.AUTHENTICATION_ERROR;
}
if (error.status === 429) {
return this.errorCategories.RATE_LIMIT_ERROR;
}
if (error.status >= 400 && error.status < 500) {
return this.errorCategories.VALIDATION_ERROR;
}
if (error.status >= 500) {
return this.errorCategories.EXTERNAL_API_ERROR;
}
return this.errorCategories.PROCESSING_ERROR;
}
getRetryStrategy(category, error) {
const defaultStrategy = {
max_attempts: 3,
base_delay: 1000,
backoff_multiplier: 2,
max_delay: 30000,
jitter: true
};
switch (category) {
case this.errorCategories.RATE_LIMIT_ERROR:
return {
...defaultStrategy,
max_attempts: 5,
base_delay: 5000,
respect_retry_after: true
};
case this.errorCategories.NETWORK_ERROR:
return {
...defaultStrategy,
max_attempts: 4,
base_delay: 2000
};
case this.errorCategories.EXTERNAL_API_ERROR:
return {
...defaultStrategy,
max_attempts: 3,
base_delay: 1000
};
default:
return defaultStrategy;
}
}
async handleRateLimitError(error, context, errorResponse) {
const retryAfter = this.extractRetryAfter(error);
if (retryAfter) {
errorResponse.retry_after = retryAfter;
errorResponse.recommended_action = `Wait ${retryAfter}ms before retrying`;
}
// Automatically throttle future requests
await this.throttleRequests(context.api_endpoint, retryAfter || 60000);
return errorResponse;
}
extractRetryAfter(error) {
// Try different headers and response formats
const headers = error.response?.headers || {};
if (headers['retry-after']) {
const retryAfter = parseInt(headers['retry-after']);
return isNaN(retryAfter) ? null : retryAfter * 1000; // Convert to ms
}
if (headers['x-ratelimit-reset']) {
const resetTime = parseInt(headers['x-ratelimit-reset']);
return isNaN(resetTime) ? null : Math.max(0, resetTime - Date.now());
}
return null;
}
}
Recovery and Resilience
System Recovery Mechanisms
class ResilienceManager {
constructor() {
this.circuitBreakers = new Map();
this.healthChecks = new Map();
this.fallbackStrategies = new Map();
}
async setupHealthChecks() {
// API endpoint health checks
this.healthChecks.set('kaito', {
url: 'https://api.kaito.ai/health',
interval: 30000,
timeout: 10000,
retries: 3
});
this.healthChecks.set('arkham', {
url: 'https://api.arkhamintelligence.com/health',
interval: 30000,
timeout: 10000,
retries: 3
});
// Start health check monitoring
for (const [service, config] of this.healthChecks) {
this.startHealthCheck(service, config);
}
}
async startHealthCheck(service, config) {
setInterval(async () => {
try {
const response = await fetch(config.url, {
timeout: config.timeout
});
if (response.ok) {
this.recordHealthCheckSuccess(service);
} else {
this.recordHealthCheckFailure(service, `HTTP ${response.status}`);
}
} catch (error) {
this.recordHealthCheckFailure(service, error.message);
}
}, config.interval);
}
setupFallbackStrategies() {
// Price data fallback
this.fallbackStrategies.set('price_data', [
'coingecko',
'coinmarketcap',
'dextools',
'cached_data'
]);
// Social sentiment fallback
this.fallbackStrategies.set('social_sentiment', [
'kaito',
'twitter_direct',
'reddit_direct',
'cached_sentiment'
]);
// Blockchain data fallback
this.fallbackStrategies.set('blockchain_data', [
'alchemy_primary',
'alchemy_backup',
'infura',
'public_rpc'
]);
}
async executeWithFallback(operation, strategyName) {
const strategy = this.fallbackStrategies.get(strategyName);
if (!strategy) {
throw new Error(`No fallback strategy found for ${strategyName}`);
}
let lastError;
for (const provider of strategy) {
try {
const result = await operation(provider);
// Log successful fallback if not primary
if (provider !== strategy[0]) {
console.warn(`Used fallback provider ${provider} for ${strategyName}`);
}
return result;
} catch (error) {
lastError = error;
console.error(`Provider ${provider} failed for ${strategyName}: ${error.message}`);
continue;
}
}
throw new Error(`All fallback providers failed for ${strategyName}. Last error: ${lastError.message}`);
}
}
Last updated