Webhook and Event Systems

Event Subscription

Universal Event Manager

Event Subscription Framework

/**
 * Registers event subscriptions and wires each one to the listener for every
 * event type it requests.
 */
class EventManager {
  /**
   * Supported event types mapped to the name of the instance method that sets
   * up the matching listener. Subclasses may override this map.
   */
  static LISTENER_METHODS = new Map([
    ['token_transfer', 'setupTokenTransferListener'],
    ['price_change', 'setupPriceChangeListener'],
    ['security_alert', 'setupSecurityAlertListener'],
    ['social_sentiment', 'setupSentimentListener'],
  ]);

  constructor() {
    this.subscriptions = new Map();
    this.eventQueue = new EventQueue();
    this.webhookManager = new WebhookManager();
    this.filters = new EventFilterManager();
  }

  /**
   * Creates a subscription, stores it and sets up one listener per event type.
   *
   * Event types are checked before anything is stored, and the stored entry is
   * rolled back if a listener fails to set up, so a rejected call never leaves
   * an `active` subscription behind.
   *
   * @param {object} subscriptionConfig - Fields read: `id`, `name`, `events`
   *   (iterable of event type names), `filters`, `webhook_url`, `retry_config`,
   *   `user_id`.
   * @returns {Promise<object>} The stored subscription record.
   * @throws {Error} `Unsupported event type: <type>` for an unknown event type;
   *   also rethrows whatever `validateSubscription` or a listener setup throws.
   */
  async subscribe(subscriptionConfig) {
    const {
      id,
      name,
      events,
      filters,
      webhook_url,
      retry_config,
      user_id,
    } = subscriptionConfig;

    // NOTE(review): validateSubscription is defined outside this block and is
    // called synchronously, exactly as before — confirm it is not async.
    this.validateSubscription(subscriptionConfig);

    // Fail fast: reject unknown event types before any state is touched.
    const eventTypes = [...events];
    const listenerMethods = this.constructor.LISTENER_METHODS;
    for (const eventType of eventTypes) {
      if (!listenerMethods.has(eventType)) {
        throw new Error(`Unsupported event type: ${eventType}`);
      }
    }

    const subscription = {
      id: id || this.generateSubscriptionId(),
      name,
      user_id,
      events,
      filters,
      webhook_url,
      retry_config: retry_config || this.getDefaultRetryConfig(),
      created_at: Date.now(),
      status: 'active',
      stats: {
        events_sent: 0,
        events_failed: 0,
        last_event: null,
      },
    };

    // Remember what the id pointed at so a failed setup can restore it.
    const hadPrevious = this.subscriptions.has(subscription.id);
    const previous = this.subscriptions.get(subscription.id);
    this.subscriptions.set(subscription.id, subscription);

    try {
      // Listeners are set up one at a time, in the order the caller listed them.
      for (const eventType of eventTypes) {
        await this.setupEventListener(eventType, subscription);
      }
    } catch (error) {
      // NOTE(review): listeners that were already set up are not torn down here
      // because no teardown API is visible in this block.
      if (hadPrevious) {
        this.subscriptions.set(subscription.id, previous);
      } else {
        this.subscriptions.delete(subscription.id);
      }
      throw error;
    }

    return subscription;
  }

  /**
   * Dispatches to the listener-setup method registered for `eventType`.
   *
   * @param {string} eventType - One of the keys of `LISTENER_METHODS`.
   * @param {object} subscription - Subscription record passed to the setup method.
   * @returns {Promise<*>} Whatever the setup method returns.
   * @throws {Error} `Unsupported event type: <type>` when no method is registered.
   */
  async setupEventListener(eventType, subscription) {
    const methodName = this.constructor.LISTENER_METHODS.get(eventType);
    if (methodName === undefined) {
      throw new Error(`Unsupported event type: ${eventType}`);
    }
    return this[methodName](subscription);
  }
}

Event Types and Schemas

Event Schema Definitions

/**
 * JSON-Schema-style descriptions of the event payloads, keyed by event type.
 * Built through small factories so every property gets its own fresh object.
 */
const EventSchemas = (() => {
  // Leaf property factories; each call returns a new object (no shared references).
  const text = () => ({ type: 'string' });
  const numeric = () => ({ type: 'number' });
  const oneOf = (...allowed) => ({ type: 'string', enum: allowed });

  // Wraps a property map and its required-key list into an object schema.
  const objectSchema = (properties, required) => ({
    type: 'object',
    properties,
    required,
  });

  const tokenTransfer = objectSchema(
    {
      event_type: oneOf('token_transfer'),
      token_address: text(),
      chain: text(),
      from_address: text(),
      to_address: text(),
      amount: text(),
      amount_usd: numeric(),
      transaction_hash: text(),
      block_number: numeric(),
      timestamp: numeric(),
      gas_used: numeric(),
      gas_price: text(),
    },
    ['event_type', 'token_address', 'chain', 'from_address', 'to_address', 'amount']
  );

  const priceChange = objectSchema(
    {
      event_type: oneOf('price_change'),
      token_address: text(),
      chain: text(),
      old_price: numeric(),
      new_price: numeric(),
      change_percentage: numeric(),
      volume_24h: numeric(),
      market_cap: numeric(),
      timestamp: numeric(),
      trigger_reason: text(),
    },
    ['event_type', 'token_address', 'old_price', 'new_price', 'change_percentage']
  );

  const securityAlert = objectSchema(
    {
      event_type: oneOf('security_alert'),
      alert_type: oneOf('honeypot', 'rug_pull', 'suspicious_activity', 'contract_vulnerability'),
      token_address: text(),
      chain: text(),
      severity: oneOf('low', 'medium', 'high', 'critical'),
      description: text(),
      evidence: { type: 'array' },
      risk_score: { type: 'number', minimum: 0, maximum: 100 },
      timestamp: numeric(),
      recommended_action: text(),
    },
    ['event_type', 'alert_type', 'token_address', 'severity', 'description']
  );

  return {
    token_transfer: tokenTransfer,
    price_change: priceChange,
    security_alert: securityAlert,
  };
})();

Real-Time Notifications

Notification Delivery System

Multi-Channel Notification Manager

/**
 * Fans a notification out to the delivery channels resolved for a user and
 * hands failed deliveries to the retry manager.
 */
class NotificationManager {
  constructor() {
    this.channels = {
      webhook: new WebhookChannel(),
      email: new EmailChannel(),
      slack: new SlackChannel(),
      discord: new DiscordChannel(),
      telegram: new TelegramChannel(),
      push: new PushNotificationChannel()
    };

    this.messageQueue = new MessageQueue();
    this.retryManager = new RetryManager();
  }

  /**
   * Delivers `notification` on every resolved channel concurrently.
   *
   * @param {object} notification - Fields read: `id`, `user_id`, `channels`,
   *   `content`, `priority`, `template`, `metadata`.
   * @returns {Promise<{notification_id: *, delivery_results: object[], success_rate: number}>}
   *   `success_rate` is the fraction of results with a truthy `success`; it is
   *   0 (not NaN) when no channel was resolved, since nothing was delivered.
   */
  async sendNotification(notification) {
    const { user_id, channels, priority } = notification;

    const userPreferences = await this.getUserPreferences(user_id);
    const deliveryChannels = this.resolveDeliveryChannels(channels, userPreferences, priority);

    // Each delivery settles to a result object, so Promise.all never rejects here.
    const results = await Promise.all(
      deliveryChannels.map((channel) => this.deliverToChannel(channel, notification))
    );

    const failures = results.filter((result) => !result.success);
    if (failures.length > 0) {
      await this.retryManager.scheduleRetries(notification, failures);
    }

    const delivered = results.length - failures.length;
    return {
      notification_id: notification.id,
      delivery_results: results,
      // Guard against 0/0 when there were no channels to deliver to.
      success_rate: results.length === 0 ? 0 : delivered / results.length
    };
  }

  /**
   * Formats and sends the notification on one channel, never throwing.
   *
   * @param {string} channel - Key into `this.channels`.
   * @param {object} notification - Same object passed to `sendNotification`.
   * @returns {Promise<object>} The channel's own result object, or
   *   `{ channel, success: false, error }` when the channel is unknown, throws,
   *   or resolves to something that is not an object.
   */
  async deliverToChannel(channel, notification) {
    const { user_id, content, template, metadata } = notification;

    try {
      const transport = this.channels[channel];
      if (!transport) {
        throw new Error(`Unknown notification channel: ${channel}`);
      }

      const formattedContent = await this.formatContent(content, channel, template);
      const outcome = await transport.send(user_id, formattedContent, metadata);

      // Only a truthy `success` counts as delivered, so a missing result is
      // reported as a failure instead of crashing the caller's filter.
      // NOTE(review): the channel `send` contract is not visible here — confirm
      // every channel resolves to an object carrying `success`.
      if (outcome === null || typeof outcome !== 'object') {
        return { channel, success: false, error: 'Channel returned no delivery result' };
      }
      return outcome;
    } catch (error) {
      return { channel, error: error?.message ?? String(error), success: false };
    }
  }

  /**
   * Renders `content` with the channel's formatter, choosing the formatter
   * method by `template` and falling back to the generic one.
   *
   * @param {*} content - Raw notification content.
   * @param {string} channel - Channel name passed to `getFormatter`.
   * @param {string} [template] - `security_alert`, `price_alert`,
   *   `portfolio_update`, or anything else for generic formatting.
   * @returns {Promise<*>} Whatever the formatter method returns.
   */
  async formatContent(content, channel, template) {
    const formatter = this.getFormatter(channel);

    switch (template) {
      case 'security_alert':
        return formatter.formatSecurityAlert(content);
      case 'price_alert':
        return formatter.formatPriceAlert(content);
      case 'portfolio_update':
        return formatter.formatPortfolioUpdate(content);
      default:
        return formatter.formatGeneric(content);
    }
  }
}

Real-Time Alert Processing

Alert Processing Pipeline

/**
 * Runs an incoming alert through validation, rule matching, cooldown
 * filtering and aggregation, then creates one notification per surviving rule.
 */
class AlertProcessor {
  constructor() {
    this.alertRules = new Map();
    this.cooldownManager = new CooldownManager();
    this.aggregator = new AlertAggregator();
  }

  /**
   * Processes one raw alert end to end.
   *
   * @param {*} rawAlert - Alert as received; passed to `validateAlert`.
   * @returns {Promise<object>} `{ processed: false, reason }` when no rule
   *   matched or every matched rule was filtered out by cooldown; otherwise
   *   `{ processed: true, alert_id, notifications, rules_triggered }`.
   */
  async processAlert(rawAlert) {
    const alert = await this.validateAlert(rawAlert);

    // Stage 1: which rules does this alert satisfy?
    const matchedRules = await this.evaluateRules(alert);
    if (matchedRules.length === 0) {
      return { processed: false, reason: 'No rules triggered' };
    }

    // Stage 2: drop rules that are still cooling down.
    const eligibleRules = await this.filterByCooldown(matchedRules, alert);
    if (eligibleRules.length === 0) {
      return { processed: false, reason: 'All rules in cooldown' };
    }

    // Stage 3: aggregate, then start every notification before awaiting any.
    const combinedAlert = await this.aggregator.aggregate(alert, eligibleRules);
    const pending = [];
    for (const rule of eligibleRules) {
      pending.push(this.createNotification(combinedAlert, rule));
    }
    const notifications = await Promise.all(pending);

    // Stage 4: cooldowns are refreshed only after all notifications resolved.
    await this.cooldownManager.updateCooldowns(eligibleRules, alert);

    return {
      processed: true,
      alert_id: alert.id,
      notifications,
      rules_triggered: eligibleRules.length
    };
  }

  /**
   * Evaluates every registered rule against `alert`, one rule at a time.
   * A rule whose evaluation throws is logged and skipped.
   *
   * @param {object} alert - Validated alert.
   * @returns {Promise<object[]>} Copies of the matching rules, each with `id`
   *   set to its map key and `triggered_at` set to the current time in ms.
   */
  async evaluateRules(alert) {
    const matches = [];

    for (const [key, definition] of this.alertRules) {
      let satisfied;
      try {
        satisfied = await this.evaluateConditions(alert, definition.conditions);
      } catch (failure) {
        console.error(`Rule evaluation failed for ${key}: ${failure.message}`);
        continue;
      }

      if (!satisfied) {
        continue;
      }

      matches.push({ ...definition, id: key, triggered_at: Date.now() });
    }

    return matches;
  }
}

Alert Configuration

Dynamic Alert Rules

Rule Configuration System

/**
 * Creates and stores alert rules and provides example rule configurations.
 */
class AlertRuleManager {
  /** Cooldown applied when a rule does not specify one (300 = "5 minutes"). */
  static DEFAULT_COOLDOWN = 300;

  constructor() {
    this.rules = new Map();
    this.ruleValidator = new RuleValidator();
    this.conditionEvaluator = new ConditionEvaluator();
  }

  /**
   * Validates `ruleConfig`, builds the rule record and stores it by id.
   *
   * @param {object} ruleConfig - Fields read: `name`, `description`,
   *   `conditions`, `actions`, `cooldown`, `priority`, `user_id`, `tokens`,
   *   `chains`.
   * @returns {Promise<object>} The stored rule.
   * @throws Whatever `ruleValidator.validate` rejects with.
   */
  async createRule(ruleConfig) {
    const {
      name,
      description,
      conditions,
      actions,
      cooldown,
      priority,
      user_id,
      tokens,
      chains
    } = ruleConfig;

    await this.ruleValidator.validate(ruleConfig);

    // One clock read so a new rule's created_at and updated_at always match.
    const now = Date.now();

    const rule = {
      id: this.generateRuleId(),
      name,
      description,
      conditions: this.parseConditions(conditions),
      actions: this.parseActions(actions),
      // `??` rather than `||`: an explicit cooldown of 0 must not be replaced
      // by the default.
      cooldown: cooldown ?? AlertRuleManager.DEFAULT_COOLDOWN,
      priority: priority || 'medium',
      user_id,
      tokens: tokens || [],
      chains: chains || [],
      created_at: now,
      updated_at: now,
      status: 'active',
      stats: {
        triggered_count: 0,
        last_triggered: null
      }
    };

    this.rules.set(rule.id, rule);
    return rule;
  }

  /**
   * Copies the recognised fields of each condition into a new object,
   * dropping any other properties.
   *
   * @param {object[]} conditions - Raw condition objects.
   * @returns {object[]} One object per condition with exactly the keys `type`,
   *   `field`, `operator`, `value`, `timeframe`, `aggregation` (absent fields
   *   are present as `undefined`).
   */
  parseConditions(conditions) {
    return conditions.map(({ type, field, operator, value, timeframe, aggregation }) => ({
      type,
      field,
      operator,
      value,
      timeframe,
      aggregation
    }));
  }

  /**
   * Returns three sample rule configurations (price drop, low liquidity,
   * security risk) illustrating the condition and action shapes.
   *
   * @returns {object} Fresh object keyed by `priceDropAlert`,
   *   `liquidityAlert` and `securityAlert`.
   */
  getExampleRules() {
    return {
      priceDropAlert: {
        name: "Price Drop Alert",
        conditions: [
          {
            type: "price_change",
            field: "change_percentage",
            operator: "less_than",
            value: -10,
            timeframe: "1h"
          }
        ],
        actions: [
          {
            type: "webhook",
            url: "https://api.example.com/webhook",
            template: "price_alert"
          },
          {
            type: "email",
            template: "price_drop_notification"
          }
        ]
      },

      liquidityAlert: {
        name: "Low Liquidity Warning",
        conditions: [
          {
            type: "liquidity_change",
            field: "liquidity_usd",
            operator: "less_than",
            value: 10000
          },
          {
            type: "volume_change",
            field: "volume_24h",
            operator: "greater_than",
            value: 100000,
            timeframe: "24h"
          }
        ],
        actions: [
          {
            type: "webhook",
            url: "https://api.example.com/urgent",
            template: "liquidity_warning"
          }
        ]
      },

      securityAlert: {
        name: "Security Risk Detection",
        conditions: [
          {
            type: "security_score",
            field: "score",
            operator: "less_than",
            value: 30
          }
        ],
        actions: [
          {
            type: "webhook",
            url: "https://api.example.com/security",
            template: "security_alert",
            priority: "high"
          },
          {
            type: "slack",
            channel: "#security-alerts",
            template: "security_notification"
          }
        ]
      }
    };
  }
}

Callback Management

Webhook Delivery System

Reliable Webhook Delivery

/**
 * Delivers webhooks over HTTP with rate limiting, a per-URL circuit breaker,
 * optional HMAC signing and retry scheduling.
 */
class WebhookManager {
  /** Maximum allowed clock skew between signer and verifier, in ms (5 minutes). */
  static TIMESTAMP_TOLERANCE_MS = 300000;

  /**
   * Timestamps below this value are read as epoch seconds, anything else as
   * epoch milliseconds (1e11 ms is 1973; 1e11 s is far in the future).
   */
  static EPOCH_MS_CUTOFF = 1e11;

  constructor() {
    this.deliveryQueue = new PriorityQueue();
    this.retryScheduler = new RetryScheduler();
    this.circuitBreaker = new CircuitBreaker();
    this.rateLimiter = new RateLimiter();
  }

  /**
   * POSTs `webhook.payload` as JSON to `webhook.url`.
   *
   * @param {object} webhook - Fields read: `id`, `url`, `payload`, `headers`,
   *   `timeout` (ms, default 30000), `retry_config`, `user_id`,
   *   `signature_secret`.
   * @returns {Promise<{success: true, status_code: *, response_time: *, webhook_id: *}>}
   * @throws {Error} `Circuit breaker open for <url>` when the breaker is open;
   *   otherwise rethrows the delivery error after recording the failure and,
   *   if `retry_config.enabled`, scheduling a retry.
   */
  async deliverWebhook(webhook) {
    const {
      url,
      payload,
      headers,
      timeout,
      retry_config,
      user_id,
      signature_secret
    } = webhook;

    await this.rateLimiter.checkLimit(user_id);

    if (this.circuitBreaker.isOpen(url)) {
      throw new Error(`Circuit breaker open for ${url}`);
    }

    try {
      // The body is always JSON, so unsigned deliveries also declare it.
      const requestHeaders = signature_secret
        ? this.signPayload(payload, signature_secret, headers)
        : this.withJsonContentType(headers);

      const response = await this.makeRequest({
        url,
        method: 'POST',
        headers: requestHeaders,
        body: JSON.stringify(payload),
        timeout: timeout || 30000
      });

      this.circuitBreaker.recordSuccess(url);

      return {
        success: true,
        status_code: response.status,
        response_time: response.responseTime,
        webhook_id: webhook.id
      };
    } catch (error) {
      this.circuitBreaker.recordFailure(url);

      if (retry_config && retry_config.enabled) {
        await this.retryScheduler.schedule(webhook, error);
      }

      throw error;
    }
  }

  /**
   * Returns `headers` with a JSON `Content-Type` added, unless the caller
   * already supplied one under any capitalisation.
   *
   * @param {object} [headers] - Caller-supplied headers; never mutated.
   * @returns {object} Header object to send.
   */
  withJsonContentType(headers) {
    const alreadyDeclared = Object.keys(headers ?? {}).some(
      (name) => name.toLowerCase() === 'content-type'
    );
    return alreadyDeclared ? headers : { ...headers, 'Content-Type': 'application/json' };
  }

  /**
   * Builds the headers for a signed delivery.
   *
   * @param {*} payload - Value that will be sent as the JSON body.
   * @param {string} secret - HMAC key.
   * @param {object} [headers={}] - Extra headers to keep.
   * @returns {object} `headers` plus `X-Kaizen-Timestamp` (epoch milliseconds
   *   as a string), `X-Kaizen-Signature` and a JSON `Content-Type`.
   */
  signPayload(payload, secret, headers = {}) {
    const timestamp = Date.now();
    const signature = this.generateSignature(payload, secret, timestamp);

    return {
      ...headers,
      'X-Kaizen-Timestamp': timestamp.toString(),
      'X-Kaizen-Signature': signature,
      'Content-Type': 'application/json'
    };
  }

  /**
   * Computes `v1=<hex HMAC-SHA256>` over `"<timestamp>.<JSON payload>"`.
   *
   * @param {*} payload - Serialised with `JSON.stringify`.
   * @param {string} secret - HMAC key.
   * @param {number|string} timestamp - Interpolated into the message verbatim.
   * @returns {string} Versioned signature string.
   */
  generateSignature(payload, secret, timestamp) {
    const crypto = require('crypto');
    const message = `${timestamp}.${JSON.stringify(payload)}`;
    const digest = crypto
      .createHmac('sha256', secret)
      .update(message)
      .digest('hex');

    return `v1=${digest}`;
  }

  /**
   * Verifies a signature produced by `generateSignature`.
   *
   * The timestamp is accepted in epoch milliseconds (what `signPayload`
   * emits) or epoch seconds, and must be within 5 minutes of the local clock.
   *
   * @param {*} payload - Parsed request body.
   * @param {string} signature - Value of the signature header.
   * @param {string} secret - HMAC key.
   * @param {number|string} timestamp - Value of the timestamp header.
   * @returns {Promise<true>} Resolves to `true` when the signature is valid.
   * @throws {Error} `Invalid request timestamp`, `Request timestamp too old`
   *   or `Invalid signature`.
   */
  async verifyWebhookSignature(payload, signature, secret, timestamp) {
    const crypto = require('crypto');

    // A NaN timestamp would slip through the freshness comparison below,
    // so reject anything that does not parse.
    const parsedTimestamp = Number.parseInt(timestamp, 10);
    if (!Number.isFinite(parsedTimestamp)) {
      throw new Error('Invalid request timestamp');
    }

    const timestampMs = parsedTimestamp < WebhookManager.EPOCH_MS_CUTOFF
      ? parsedTimestamp * 1000
      : parsedTimestamp;

    if (Math.abs(Date.now() - timestampMs) > WebhookManager.TIMESTAMP_TOLERANCE_MS) {
      throw new Error('Request timestamp too old');
    }

    // Sign over the timestamp exactly as received so both sides hash the same bytes.
    const expected = Buffer.from(this.generateSignature(payload, secret, timestamp));
    const received = Buffer.from(String(signature));

    // timingSafeEqual requires equal lengths; compare in constant time so the
    // check does not leak how many leading characters matched.
    if (received.length !== expected.length || !crypto.timingSafeEqual(received, expected)) {
      throw new Error('Invalid signature');
    }

    return true;
  }
}

Error Handling

Comprehensive Error Management

Error Handling Framework

/**
 * Categorises errors, records them, and builds an error response that carries
 * a retry strategy for the caller.
 */
class ErrorHandler {
  /** System error codes (`error.code`) that indicate a network-level failure. */
  static NETWORK_ERROR_CODES = new Set([
    'ENOTFOUND',
    'ECONNREFUSED',
    'ECONNRESET',
    'ETIMEDOUT',
    'EAI_AGAIN',
    'EHOSTUNREACH',
    'ENETUNREACH',
  ]);

  /**
   * `x-ratelimit-reset` values below this are read as epoch seconds, anything
   * else as epoch milliseconds (1e11 ms is 1973; 1e11 s is far in the future).
   */
  static EPOCH_MS_CUTOFF = 1e11;

  constructor() {
    this.errorCategories = {
      NETWORK_ERROR: 'network',
      AUTHENTICATION_ERROR: 'auth',
      RATE_LIMIT_ERROR: 'rate_limit',
      VALIDATION_ERROR: 'validation',
      PROCESSING_ERROR: 'processing',
      EXTERNAL_API_ERROR: 'external_api'
    };

    this.retryStrategies = new Map();
    this.errorMetrics = new ErrorMetrics();
  }

  /**
   * Categorises, logs and records `error`, then delegates to the handler for
   * its category.
   *
   * @param {Error} error - The failure; `code`, `status`, `response` and
   *   `message` are read.
   * @param {object} context - Caller-supplied context, passed to logging,
   *   metrics and the category handler.
   * @returns {Promise<*>} Whatever the selected category handler returns.
   */
  async handleError(error, context) {
    const category = this.categorizeError(error);

    await this.logError(error, category, context);
    this.errorMetrics.recordError(category, context);

    const retryStrategy = this.getRetryStrategy(category, error);

    const errorResponse = {
      error_id: this.generateErrorId(),
      category: category,
      message: this.sanitizeErrorMessage(error.message),
      timestamp: Date.now(),
      context: this.sanitizeContext(context),
      retry_strategy: retryStrategy,
      recoverable: this.isRecoverable(category, error)
    };

    switch (category) {
      case this.errorCategories.RATE_LIMIT_ERROR:
        return this.handleRateLimitError(error, context, errorResponse);

      case this.errorCategories.NETWORK_ERROR:
        return this.handleNetworkError(error, context, errorResponse);

      case this.errorCategories.EXTERNAL_API_ERROR:
        return this.handleExternalAPIError(error, context, errorResponse);

      default:
        return this.handleGenericError(error, context, errorResponse);
    }
  }

  /**
   * Maps an error to one of `this.errorCategories`.
   *
   * Network error codes are checked first, then the HTTP status taken from
   * `error.status` or, failing that, `error.response.status` — the same
   * `response` object `extractRetryAfter` reads headers from.
   *
   * @param {object} error - Error-like object.
   * @returns {string} Category value; `processing` when nothing matches.
   */
  categorizeError(error) {
    if (ErrorHandler.NETWORK_ERROR_CODES.has(error.code)) {
      return this.errorCategories.NETWORK_ERROR;
    }

    const status = error.status ?? error.response?.status;

    if (status === 401 || status === 403) {
      return this.errorCategories.AUTHENTICATION_ERROR;
    }

    if (status === 429) {
      return this.errorCategories.RATE_LIMIT_ERROR;
    }

    if (status >= 400 && status < 500) {
      return this.errorCategories.VALIDATION_ERROR;
    }

    if (status >= 500) {
      return this.errorCategories.EXTERNAL_API_ERROR;
    }

    return this.errorCategories.PROCESSING_ERROR;
  }

  /**
   * Returns the retry parameters for a category as a fresh object.
   *
   * @param {string} category - Value from `this.errorCategories`.
   * @param {object} error - Unused here; kept for interface compatibility.
   * @returns {{max_attempts: number, base_delay: number, backoff_multiplier: number,
   *   max_delay: number, jitter: boolean, respect_retry_after?: boolean}}
   */
  getRetryStrategy(category, error) {
    const defaultStrategy = {
      max_attempts: 3,
      base_delay: 1000,
      backoff_multiplier: 2,
      max_delay: 30000,
      jitter: true
    };

    switch (category) {
      case this.errorCategories.RATE_LIMIT_ERROR:
        return {
          ...defaultStrategy,
          max_attempts: 5,
          base_delay: 5000,
          respect_retry_after: true
        };

      case this.errorCategories.NETWORK_ERROR:
        return {
          ...defaultStrategy,
          max_attempts: 4,
          base_delay: 2000
        };

      case this.errorCategories.EXTERNAL_API_ERROR:
        return {
          ...defaultStrategy,
          max_attempts: 3,
          base_delay: 1000
        };

      default:
        return defaultStrategy;
    }
  }

  /**
   * Adds retry-after details to the response and throttles the endpoint.
   *
   * @param {object} error - Rate-limit error; headers are read from it.
   * @param {object} context - Must carry `api_endpoint`.
   * @param {object} errorResponse - Mutated in place and returned.
   * @returns {Promise<object>} `errorResponse`, with `retry_after` and
   *   `recommended_action` set when a non-zero delay was found.
   */
  async handleRateLimitError(error, context, errorResponse) {
    const retryAfter = this.extractRetryAfter(error);

    if (retryAfter) {
      errorResponse.retry_after = retryAfter;
      errorResponse.recommended_action = `Wait ${retryAfter}ms before retrying`;
    }

    // Fall back to a 60 s throttle when no usable delay was supplied.
    await this.throttleRequests(context.api_endpoint, retryAfter || 60000);

    return errorResponse;
  }

  /**
   * Derives how long to wait, in milliseconds, from the response headers.
   *
   * `retry-after` is tried first, as a number of seconds or as an HTTP date.
   * If it is absent or unparseable, `x-ratelimit-reset` is read as an absolute
   * reset time in epoch seconds or epoch milliseconds.
   *
   * @param {object} error - Error whose `response.headers` (lower-case keys)
   *   may carry rate-limit headers.
   * @returns {number|null} Non-negative delay in ms, or `null` when neither
   *   header yields one.
   */
  extractRetryAfter(error) {
    const headers = error.response?.headers || {};
    const now = Date.now();

    const retryAfter = headers['retry-after'];
    if (retryAfter) {
      const delaySeconds = Number.parseInt(retryAfter, 10);
      if (!Number.isNaN(delaySeconds)) {
        return Math.max(0, delaySeconds * 1000);
      }

      // Retry-After may also be an absolute HTTP date.
      const retryAt = Date.parse(String(retryAfter));
      if (!Number.isNaN(retryAt)) {
        return Math.max(0, retryAt - now);
      }
      // Unparseable: fall through and try the reset header instead.
    }

    const reset = headers['x-ratelimit-reset'];
    if (reset) {
      const resetTime = Number.parseInt(reset, 10);
      if (Number.isNaN(resetTime)) {
        return null;
      }
      const resetAtMs = resetTime < ErrorHandler.EPOCH_MS_CUTOFF ? resetTime * 1000 : resetTime;
      return Math.max(0, resetAtMs - now);
    }

    return null;
  }
}

Recovery and Resilience

System Recovery Mechanisms

/**
 * Periodic health checks for upstream services and ordered provider fallback
 * for operations that can be served by several backends.
 */
class ResilienceManager {
  constructor() {
    this.circuitBreakers = new Map();
    this.healthChecks = new Map();
    this.fallbackStrategies = new Map();
    // Interval handles by service name, so checks can be replaced or stopped.
    this.healthCheckTimers = new Map();
  }

  /**
   * Registers the built-in health-check configurations and starts a periodic
   * check for every registered service.
   *
   * @returns {Promise<void>}
   */
  async setupHealthChecks() {
    this.healthChecks.set('kaito', {
      url: 'https://api.kaito.ai/health',
      interval: 30000,
      timeout: 10000,
      retries: 3
    });

    this.healthChecks.set('arkham', {
      url: 'https://api.arkhamintelligence.com/health',
      interval: 30000,
      timeout: 10000,
      retries: 3
    });

    for (const [service, config] of this.healthChecks) {
      await this.startHealthCheck(service, config);
    }
  }

  /**
   * Starts polling `config.url` every `config.interval` ms. Starting a check
   * for a service that already has one replaces the old timer.
   *
   * @param {string} service - Service name used when recording results.
   * @param {{url: string, interval: number, timeout: number}} config
   * @returns {Promise<*>} The interval handle (also kept in `healthCheckTimers`).
   */
  async startHealthCheck(service, config) {
    const existing = this.healthCheckTimers.get(service);
    if (existing !== undefined) {
      clearInterval(existing);
    }

    const timer = setInterval(() => {
      // Deliberately not awaited: each tick runs independently.
      void this.runHealthCheck(service, config);
    }, config.interval);

    this.healthCheckTimers.set(service, timer);
    return timer;
  }

  /**
   * Performs one health probe and records the outcome via
   * `recordHealthCheckSuccess` / `recordHealthCheckFailure`.
   *
   * @param {string} service - Service name.
   * @param {{url: string, timeout: number}} config - Probe target and time
   *   limit in ms.
   * @returns {Promise<void>}
   */
  async runHealthCheck(service, config) {
    // The standard Fetch API has no `timeout` option, so the time limit is
    // enforced by aborting the request. `timeout` is still passed through for
    // fetch implementations that honour it.
    const controller = new AbortController();
    const abortTimer = config.timeout > 0
      ? setTimeout(() => controller.abort(), config.timeout)
      : undefined;

    try {
      const response = await fetch(config.url, {
        timeout: config.timeout,
        signal: controller.signal
      });

      if (response.ok) {
        this.recordHealthCheckSuccess(service);
      } else {
        this.recordHealthCheckFailure(service, `HTTP ${response.status}`);
      }
    } catch (error) {
      this.recordHealthCheckFailure(service, error.message);
    } finally {
      clearTimeout(abortTimer);
    }
  }

  /**
   * Stops every running health check started by `startHealthCheck`.
   */
  stopHealthChecks() {
    for (const timer of this.healthCheckTimers.values()) {
      clearInterval(timer);
    }
    this.healthCheckTimers.clear();
  }

  /**
   * Registers the provider order for each data category; the first entry is
   * the primary provider.
   */
  setupFallbackStrategies() {
    this.fallbackStrategies.set('price_data', [
      'coingecko',
      'coinmarketcap',
      'dextools',
      'cached_data'
    ]);

    this.fallbackStrategies.set('social_sentiment', [
      'kaito',
      'twitter_direct',
      'reddit_direct',
      'cached_sentiment'
    ]);

    this.fallbackStrategies.set('blockchain_data', [
      'alchemy_primary',
      'alchemy_backup',
      'infura',
      'public_rpc'
    ]);
  }

  /**
   * Calls `operation(provider)` for each provider of the named strategy, in
   * order, and returns the first successful result.
   *
   * @param {(provider: string) => Promise<*>} operation - Work to attempt.
   * @param {string} strategyName - Key into `fallbackStrategies`.
   * @returns {Promise<*>} Result of the first provider that succeeds.
   * @throws {Error} When the strategy is unknown, or when every provider
   *   failed (the last failure is attached as `cause`) or none is configured.
   */
  async executeWithFallback(operation, strategyName) {
    const strategy = this.fallbackStrategies.get(strategyName);

    if (!strategy) {
      throw new Error(`No fallback strategy found for ${strategyName}`);
    }

    let lastError;

    for (const provider of strategy) {
      try {
        const result = await operation(provider);

        // Surface that the primary provider was bypassed.
        if (provider !== strategy[0]) {
          console.warn(`Used fallback provider ${provider} for ${strategyName}`);
        }

        return result;
      } catch (error) {
        lastError = error;
        console.error(`Provider ${provider} failed for ${strategyName}: ${error?.message ?? String(error)}`);
      }
    }

    // With an empty provider list nothing was attempted, so there is no last error.
    const reason = lastError === undefined
      ? 'no providers configured'
      : lastError?.message ?? String(lastError);

    throw new Error(
      `All fallback providers failed for ${strategyName}. Last error: ${reason}`,
      { cause: lastError }
    );
  }
}

Last updated