Social Intelligence Agent

Social Media Integration

Component Overview

The Social Intelligence Agent serves as Kaizen AI's comprehensive social media monitoring and analysis system, responsible for collecting, processing, and analyzing social signals across multiple platforms to assess community health, detect manipulation, and evaluate genuine sentiment around cryptocurrency projects.

Core Responsibilities:

  • Multi-platform social media data collection and aggregation

  • Advanced natural language processing for sentiment and context analysis

  • Real-time manipulation detection and coordinated behavior identification

  • Community health assessment and engagement authenticity verification

  • Influencer impact analysis and network mapping

  • Viral content analysis and organic growth validation

Architecture Philosophy: The Social Intelligence Agent employs a distributed, fault-tolerant architecture that can process thousands of social media posts per minute while maintaining accuracy in sentiment analysis and manipulation detection. The system prioritizes real-time processing, scalable data collection, and sophisticated linguistic analysis to provide actionable insights for risk assessment.

Multi-Platform Data Collection

Platform Integration Architecture

// Core interfaces for multi-platform social media integration

/**
 * Static description of one social platform, exposed by a collector through
 * its `platform` property.
 */
interface SocialPlatform {
  /** Platform identifier (e.g. 'twitter', 'telegram'); the orchestrator uses it as the key when registering a collector. */
  readonly name: string;
  /** Version label of the remote API the collector targets (e.g. 'v2'). */
  readonly apiVersion: string;
  /** Per-operation rate limits; TwitterCollector passes this object to its RateLimiter. */
  readonly rateLimits: RateLimitConfig;
  /** Credentials/config for the platform; collectors copy it from their own config's `auth` field. */
  readonly authConfig: AuthenticationConfig;
  /** Feature tags the collector advertises (e.g. 'mentions', 'sentiment'). */
  readonly capabilities: PlatformCapability[];
}

/**
 * Contract every platform collector is expected to fulfil so the orchestrator
 * can treat all platforms uniformly.
 */
interface SocialMediaCollector {
  /** Descriptor of the platform this collector talks to. */
  platform: SocialPlatform;
  /** Connection flag; the visible collectors set it to true at the end of a successful connect(). */
  isConnected: boolean;
  /** Timestamp the visible collectors refresh when connect() succeeds. */
  lastSync: Date;
  
  /** Establishes the platform connection; the visible implementations throw ConnectionError on failure. */
  connect(): Promise<void>;
  /** Tears the connection down. No implementation is visible here — exact semantics to be confirmed. */
  disconnect(): Promise<void>;
  /** Fetches posts matching the query (tokens, contract addresses, timeframe). */
  collectMentions(query: SocialQuery): Promise<SocialPost[]>;
  /**
   * Opens a live stream of posts matching the filters.
   * NOTE(review): TwitterCollector.streamRealTime is `async` and therefore
   * returns Promise<AsyncIterator<SocialPost>>, which does not match this
   * signature — the two need to be reconciled.
   */
  streamRealTime(filters: StreamFilter[]): AsyncIterator<SocialPost>;
  /** Looks up a user's profile by platform user id. */
  getUserInfo(userId: string): Promise<UserProfile>;
  /** Presumably a health check for the current connection — no implementation visible; confirm. */
  validateConnection(): Promise<boolean>;
}

/**
 * Owns one collector per platform and runs collected posts through the
 * analysis pipeline: processing -> sentiment -> manipulation -> community ->
 * overall social score.
 */
export class SocialIntelligenceOrchestrator {
  /** Connected collectors keyed by `platform.name`; filled asynchronously after construction. */
  private collectors: Map<string, SocialMediaCollector> = new Map();
  private dataProcessor: SocialDataProcessor;
  private sentimentAnalyzer: SentimentAnalyzer;
  private manipulationDetector: ManipulationDetector;
  private communityAnalyzer: CommunityAnalyzer;
  private realTimeStreams: Map<string, StreamManager> = new Map();
  /**
   * Settles (never rejects) once the initial connection pass over all
   * platforms has finished. Awaited by analyzeSocialSignals so a call made
   * right after construction does not see an empty collector map.
   */
  private ready: Promise<void>;

  /**
   * Builds the analysis components from their config sections and starts
   * connecting the platform collectors in the background.
   */
  constructor(
    private config: SocialIntelligenceConfig,
    private database: DatabaseInterface,
    private messageQueue: MessageQueueInterface,
    private cache: CacheInterface
  ) {
    this.dataProcessor = new SocialDataProcessor(config.processing);
    this.sentimentAnalyzer = new SentimentAnalyzer(config.sentiment);
    this.manipulationDetector = new ManipulationDetector(config.manipulation);
    this.communityAnalyzer = new CommunityAnalyzer(config.community);

    // A constructor cannot await; keep the promise and attach a handler so a
    // failure is logged instead of surfacing as an unhandled rejection.
    this.ready = this.initializePlatformCollectors().catch((error: unknown) => {
      logger.error('Platform collector initialization failed:', error);
    });
    this.setupRealTimeStreaming();
  }

  /**
   * Creates and connects every platform collector, one after another.
   * A collector that fails to construct is logged and skipped; one that fails
   * to connect is handed to scheduleRetryConnection. Neither aborts the others.
   */
  private async initializePlatformCollectors(): Promise<void> {
    // Factories rather than instances, so one throwing constructor does not
    // prevent the remaining platforms from being set up.
    const factories = [
      () => new TwitterCollector(this.config.twitter),
      () => new TelegramCollector(this.config.telegram),
      () => new DiscordCollector(this.config.discord),
      () => new FarcasterCollector(this.config.farcaster),
      () => new RedditCollector(this.config.reddit)
    ];

    for (const createCollector of factories) {
      let collector;
      try {
        collector = createCollector();
      } catch (error: unknown) {
        logger.error('Failed to construct platform collector:', error);
        continue;
      }

      try {
        await collector.connect();
        this.collectors.set(collector.platform.name, collector);

        logger.info(`Connected to ${collector.platform.name} API`);

        // Set up error handling and reconnection
        this.setupCollectorMonitoring(collector);
      } catch (error: unknown) {
        logger.error(`Failed to connect to ${collector.platform.name}:`, error);

        // Schedule retry with exponential backoff (attempt counter starts at 1)
        this.scheduleRetryConnection(collector, 1);
      }
    }
  }

  /**
   * Collects mentions from a single platform. Never rejects: a failure is
   * logged and reported as an empty post list plus an error message, so one
   * platform outage cannot fail the whole analysis.
   */
  private async collectFromPlatform(collector: SocialMediaCollector, projectQuery: ProjectSocialQuery) {
    const platform = collector.platform.name;
    try {
      const posts = await collector.collectMentions({
        tokens: projectQuery.tokenSymbols,
        contractAddresses: projectQuery.contractAddresses,
        timeframe: projectQuery.timeframe,
        includeReplies: true,
        includeRetweets: false
      });

      return { platform, posts, error: null };
    } catch (error: unknown) {
      logger.warn(`Data collection failed for ${platform}:`, error);
      // The thrown value is not guaranteed to be an Error (or even an object).
      const message = error instanceof Error ? error.message : String(error);
      return { platform, posts: [], error: message };
    }
  }

  /**
   * Runs the full social analysis for a project.
   *
   * @param projectQuery Token symbols, contract addresses and timeframe to search for.
   * @returns The combined score, the per-analyzer results, a per-platform
   *   summary and run metadata; or the result of generateEmptyResult when no
   *   platform returned any post.
   * @throws SocialIntelligenceError when any pipeline stage fails.
   */
  async analyzeSocialSignals(projectQuery: ProjectSocialQuery): Promise<SocialIntelligenceResult> {
    const startTime = Date.now();

    try {
      // Wait for the initial connection pass kicked off by the constructor.
      await this.ready;

      // Collect data from all platforms in parallel
      const collectionResults = await Promise.all(
        Array.from(this.collectors.values(), collector =>
          this.collectFromPlatform(collector, projectQuery)
        )
      );

      // Aggregate all collected posts
      const allPosts = collectionResults.flatMap(result => result.posts);

      if (allPosts.length === 0) {
        return this.generateEmptyResult(projectQuery, 'No social media mentions found');
      }

      // Process collected data through analysis pipeline
      const processedData = await this.dataProcessor.processRawPosts(allPosts);

      const sentimentResult = await this.sentimentAnalyzer.analyzeBatch(processedData);
      const manipulationResult = await this.manipulationDetector.analyzeManipulation(processedData);
      const communityResult = await this.communityAnalyzer.assessCommunityHealth(processedData);

      // Calculate overall social intelligence score
      const socialScore = this.calculateSocialScore(
        sentimentResult,
        manipulationResult,
        communityResult
      );

      const processingTime = Date.now() - startTime;

      return {
        projectQuery,
        socialScore,
        sentiment: sentimentResult,
        manipulation: manipulationResult,
        community: communityResult,
        platforms: this.generatePlatformSummary(collectionResults),
        metadata: {
          totalPosts: allPosts.length,
          processingTime,
          dataQuality: this.assessDataQuality(processedData),
          analysisTimestamp: new Date().toISOString()
        }
      };
    } catch (error: unknown) {
      logger.error('Social intelligence analysis failed:', error);
      throw new SocialIntelligenceError('Social analysis failed', error);
    }
  }
}

Twitter Integration

Advanced Twitter Data Collection

/**
 * Twitter API v2 integration with enhanced features: mention search,
 * heuristic author-authenticity scoring and a filtered real-time stream.
 */
export class TwitterCollector implements SocialMediaCollector {
  public readonly platform: SocialPlatform;
  /** False until connect() has verified the credentials. */
  public isConnected = false;
  /** Time of the last successful connect(); the epoch means "never". */
  public lastSync: Date = new Date(0);
  private client: TwitterApi;
  private rateLimiter: RateLimiter;
  private streamConnection: TwitterStream | null = null;
  private userCache: Map<string, TwitterUser> = new Map();

  /** Builds the platform descriptor, the API client and the rate limiter from config. */
  constructor(private config: TwitterConfig) {
    this.platform = {
      name: 'twitter',
      apiVersion: 'v2',
      rateLimits: {
        searchTweets: { requests: 300, window: 900000 }, // 15 minutes
        userLookup: { requests: 300, window: 900000 },
        streaming: { connections: 1, tweets: 1000000 } // per month
      },
      authConfig: config.auth,
      capabilities: ['mentions', 'sentiment', 'engagement', 'user_metrics']
    };

    this.client = new TwitterApi({
      appKey: config.auth.apiKey,
      appSecret: config.auth.apiSecret,
      accessToken: config.auth.accessToken,
      accessSecret: config.auth.accessTokenSecret,
      bearerToken: config.auth.bearerToken
    });

    this.rateLimiter = new RateLimiter(this.platform.rateLimits);
  }

  /**
   * Verifies the credentials by fetching the current user, then marks the
   * collector as connected.
   *
   * @throws ConnectionError when the credential check fails.
   */
  async connect(): Promise<void> {
    try {
      // Verify credentials
      const me = await this.client.currentUser();
      logger.info(`Connected to Twitter as @${me.username}`);

      this.isConnected = true;
      this.lastSync = new Date();
    } catch (error: unknown) {
      logger.error('Twitter connection failed:', error);
      throw new ConnectionError('Twitter API connection failed', error);
    }
  }

  /**
   * Searches tweets matching the query and converts them to SocialPost.
   * Tweets whose author is missing from the response expansions are skipped.
   *
   * @param query Tokens/addresses, timeframe and optional `limit` (default 100).
   * @returns At most `limit` posts (and never more than one page of 100).
   * @throws DataCollectionError when the search or post processing fails.
   */
  async collectMentions(query: SocialQuery): Promise<SocialPost[]> {
    try {
      const searchQuery = this.buildTwitterQuery(query);
      const requestedLimit = query.limit && query.limit > 0 ? query.limit : 100;

      // Rate limit check
      await this.rateLimiter.waitForAvailability('searchTweets');

      // Search tweets with comprehensive fields
      const searchResults = await this.client.v2.search(searchQuery, {
        'tweet.fields': [
          'created_at',
          'author_id',
          'public_metrics',
          'context_annotations',
          'conversation_id',
          'in_reply_to_user_id',
          'referenced_tweets',
          'lang',
          'possibly_sensitive',
          'reply_settings'
        ],
        'user.fields': [
          'created_at',
          'public_metrics',
          'verified',
          'description',
          'location',
          'profile_image_url'
        ],
        'expansions': [
          'author_id',
          'referenced_tweets.id',
          'referenced_tweets.id.author_id'
        ],
        // The recent-search endpoint only accepts 10..100; clamp so a small
        // requested limit does not turn into an API error.
        max_results: Math.max(10, Math.min(requestedLimit, 100)),
        start_time: query.timeframe.start.toISOString(),
        end_time: query.timeframe.end.toISOString()
      });

      const posts: SocialPost[] = [];

      for (const tweet of searchResults.data || []) {
        const author = searchResults.includes?.users?.find(u => u.id === tweet.author_id);
        if (!author) continue;

        posts.push(await this.processTwitterPost(tweet, author, searchResults.includes));
      }

      // The page may be larger than requested because of the clamp above.
      return posts.length > requestedLimit ? posts.slice(0, requestedLimit) : posts;
    } catch (error: unknown) {
      logger.error('Twitter mention collection failed:', error);
      throw new DataCollectionError('Twitter data collection failed', error);
    }
  }

  /**
   * Builds the search query string.
   *
   * Cashtags/hashtags for each token and quoted contract addresses are put
   * into ONE parenthesised OR group: whitespace between groups means AND in
   * Twitter search, so separate groups would only match tweets containing
   * both a symbol and a contract address. English-only, plus retweet/reply
   * exclusions unless the query opts in.
   */
  private buildTwitterQuery(query: SocialQuery): string {
    const matchTerms: string[] = [];

    for (const token of query.tokens ?? []) {
      matchTerms.push(`$${token}`, `#${token}`);
    }
    for (const address of query.contractAddresses ?? []) {
      matchTerms.push(`"${address}"`);
    }

    const queryParts: string[] = [];
    if (matchTerms.length > 0) {
      queryParts.push(`(${matchTerms.join(' OR ')})`);
    }

    // Language filter
    queryParts.push('lang:en');

    if (!query.includeRetweets) {
      queryParts.push('-is:retweet');
    }
    if (!query.includeReplies) {
      queryParts.push('-is:reply');
    }

    return queryParts.join(' ');
  }

  /**
   * Maps a tweet and its author to the platform-neutral SocialPost shape.
   * `includes` is accepted for call-site symmetry but not read here.
   */
  private async processTwitterPost(
    tweet: TweetV2,
    author: UserV2,
    includes?: Includes2
  ): Promise<SocialPost> {
    const authorAuthenticity = this.calculateUserAuthenticity(author);
    const entities = this.extractTwitterEntities(tweet);
    const engagementScore = this.calculateEngagementScore(tweet.public_metrics!);

    return {
      id: tweet.id,
      platform: 'twitter',
      content: tweet.text,
      author: {
        id: author.id,
        username: author.username,
        displayName: author.name,
        verified: author.verified || false,
        followerCount: author.public_metrics?.followers_count || 0,
        followingCount: author.public_metrics?.following_count || 0,
        createdAt: author.created_at,
        profileDescription: author.description,
        authenticity: authorAuthenticity
      },
      timestamp: new Date(tweet.created_at!),
      metrics: {
        likes: tweet.public_metrics?.like_count || 0,
        shares: tweet.public_metrics?.retweet_count || 0,
        replies: tweet.public_metrics?.reply_count || 0,
        quotes: tweet.public_metrics?.quote_count || 0,
        engagement: engagementScore
      },
      entities,
      metadata: {
        conversationId: tweet.conversation_id,
        language: tweet.lang,
        possiblySensitive: tweet.possibly_sensitive,
        contextAnnotations: tweet.context_annotations
      }
    };
  }

  /**
   * Heuristic authenticity score in [0, 1], starting from 0.5:
   * account age (+0.2 over a year, +0.1 over six months, -0.2 under 30 days),
   * follower/following ratio (+0.1 balanced, -0.2 extreme), verification
   * (+0.3) and profile completeness (up to +0.2).
   */
  private calculateUserAuthenticity(user: UserV2): number {
    const MS_PER_DAY = 1000 * 60 * 60 * 24;
    let score = 0.5; // Base score

    if (user.created_at) {
      const ageInDays = (Date.now() - new Date(user.created_at).getTime()) / MS_PER_DAY;

      if (ageInDays > 365) score += 0.2; // 1+ years
      else if (ageInDays > 180) score += 0.1; // 6+ months
      else if (ageInDays < 30) score -= 0.2; // Less than 30 days
    }

    if (user.public_metrics) {
      const followers = user.public_metrics.followers_count || 0;
      const following = user.public_metrics.following_count || 1; // Avoid division by zero
      const ratio = followers / following;

      if (ratio > 0.1 && ratio < 10) score += 0.1; // Balanced ratio
      else if (ratio < 0.01 || ratio > 100) score -= 0.2; // Suspicious ratio
    }

    if (user.verified) score += 0.3;

    // Profile completeness
    if (user.description && user.description.length > 20) score += 0.1;
    if (user.location) score += 0.05;
    if (user.profile_image_url && !user.profile_image_url.includes('default')) score += 0.05;

    return Math.max(0, Math.min(1, score));
  }

  /**
   * Registers one stream rule per filter, opens the filtered stream and
   * returns an iterator over processed posts.
   *
   * NOTE(review): SocialMediaCollector declares this method as returning
   * AsyncIterator<SocialPost> directly; this implementation returns a Promise
   * of it. Kept as-is so existing `await` callers are unaffected.
   *
   * @throws StreamingError when rule registration or stream creation fails.
   */
  async streamRealTime(filters: StreamFilter[]): Promise<AsyncIterator<SocialPost>> {
    try {
      const createdAt = Date.now();
      const rules = filters.map((filter, index) => ({
        value: this.buildTwitterQuery(filter.query),
        // Index suffix keeps generated tags distinct within one call.
        tag: filter.tag || `filter_${createdAt}_${index}`
      }));

      // Set up streaming rules
      await this.client.v2.updateStreamRules({
        add: rules
      });

      // Create stream
      this.streamConnection = await this.client.v2.searchStream({
        'tweet.fields': [
          'created_at',
          'author_id',
          'public_metrics',
          'context_annotations',
          'lang'
        ],
        'user.fields': [
          'public_metrics',
          'verified',
          'created_at'
        ],
        expansions: ['author_id']
      });

      return this.createAsyncIterator();
    } catch (error: unknown) {
      logger.error('Twitter streaming setup failed:', error);
      throw new StreamingError('Twitter streaming failed', error);
    }
  }

  /**
   * Yields a processed post for every streamed tweet whose author is present
   * in the payload's expansions; other events are dropped. Stream errors are
   * logged and rethrown to the consumer.
   */
  private async *createAsyncIterator(): AsyncIterator<SocialPost> {
    if (!this.streamConnection) {
      throw new Error('Stream connection not established');
    }

    try {
      for await (const { data: tweet, includes } of this.streamConnection) {
        if (!tweet || !includes?.users) continue;

        const author = includes.users.find(u => u.id === tweet.author_id);
        if (!author) continue;

        yield await this.processTwitterPost(tweet, author, includes);
      }
    } catch (error: unknown) {
      logger.error('Twitter stream error:', error);
      throw error;
    }
  }
}

Telegram Integration

Telegram Bot and Channel Monitoring

/**
 * Telegram data collection through the bot API and channel monitoring.
 */
export class TelegramCollector implements SocialMediaCollector {
  public readonly platform: SocialPlatform;
  /** False until connect() has verified the bot token and initialised channel monitoring. */
  public isConnected = false;
  /** Time of the last successful connect(); the epoch means "never". */
  public lastSync: Date = new Date(0);
  private bot: TelegramBot;
  private channelMonitor: ChannelMonitor;
  private messageProcessor: TelegramMessageProcessor;

  /** Builds the platform descriptor, a non-polling bot client and the channel monitor. */
  constructor(private config: TelegramConfig) {
    this.platform = {
      name: 'telegram',
      apiVersion: 'bot_api_6.0',
      rateLimits: {
        messages: { requests: 30, window: 1000 }, // 30 per second
        channelInfo: { requests: 20, window: 60000 } // 20 per minute
      },
      authConfig: config.auth,
      capabilities: ['channels', 'groups', 'mentions', 'sentiment']
    };

    this.bot = new TelegramBot(config.auth.botToken, { polling: false });
    this.channelMonitor = new ChannelMonitor(config.channels);
    this.messageProcessor = new TelegramMessageProcessor();
  }

  /**
   * Verifies the bot token, initialises channel monitoring and marks the
   * collector as connected.
   *
   * @throws ConnectionError when either step fails.
   */
  async connect(): Promise<void> {
    try {
      // Verify bot token
      const botInfo = await this.bot.getMe();
      logger.info(`Connected to Telegram as @${botInfo.username}`);

      // Initialize channel monitoring
      await this.channelMonitor.initialize(this.bot);

      this.isConnected = true;
      this.lastSync = new Date();
    } catch (error: unknown) {
      logger.error('Telegram connection failed:', error);
      throw new ConnectionError('Telegram API connection failed', error);
    }
  }

  /**
   * Collects matching messages from every active monitored channel.
   * A channel that fails is logged and skipped; the others are still read.
   *
   * @throws DataCollectionError when the channel list itself cannot be fetched.
   */
  async collectMentions(query: SocialQuery): Promise<SocialPost[]> {
    try {
      const collected: SocialPost[] = [];
      const channels = await this.channelMonitor.getActiveChannels();

      for (const channel of channels) {
        try {
          collected.push(...(await this.collectChannelMessages(channel, query)));
        } catch (error: unknown) {
          logger.warn(`Failed to collect from channel ${channel.id}:`, error);
        }
      }

      return collected;
    } catch (error: unknown) {
      logger.error('Telegram mention collection failed:', error);
      throw new DataCollectionError('Telegram data collection failed', error);
    }
  }

  /**
   * Reads recent history of one channel and keeps the messages that fall in
   * the query timeframe and contain the query keywords.
   * Errors are logged and rethrown to the caller.
   */
  private async collectChannelMessages(
    channel: TelegramChannel,
    query: SocialQuery
  ): Promise<SocialPost[]> {
    const MS_PER_HOUR = 1000 * 60 * 60;
    const posts: SocialPost[] = [];

    try {
      // Budget 5 messages per hour of timeframe, clamped to 10..100.
      const timeDiffHours =
        (query.timeframe.end.getTime() - query.timeframe.start.getTime()) / MS_PER_HOUR;
      const messageLimit = Math.min(Math.max(Math.floor(timeDiffHours * 5), 10), 100);

      const messages = await this.bot.getChatHistory(channel.id, messageLimit);

      for (const message of messages) {
        const relevant =
          this.isMessageInTimeframe(message, query.timeframe) &&
          this.messageContainsKeywords(message, query);
        if (!relevant) continue;

        posts.push(await this.processTelegramMessage(message, channel));
      }
    } catch (error: unknown) {
      logger.error(`Channel message collection failed for ${channel.id}:`, error);
      throw error;
    }

    return posts;
  }

  /**
   * Maps a Telegram message to the platform-neutral SocialPost shape.
   * Fields Telegram does not expose (likes, followers, verification) are
   * filled with zero/false.
   */
  private async processTelegramMessage(
    message: TelegramMessage,
    channel: TelegramChannel
  ): Promise<SocialPost> {
    const author = await this.extractUserInfo(message.from);
    const processedContent = this.messageProcessor.processContent(message.text || '');
    const entities = this.messageProcessor.extractEntities(message);
    // Engagement is derived from views and forwards
    const engagement = this.calculateTelegramEngagement(message, channel);

    return {
      id: `tg_${channel.id}_${message.message_id}`,
      platform: 'telegram',
      content: processedContent.text,
      author: {
        id: author.id.toString(),
        username: author.username || `user_${author.id}`,
        displayName: author.first_name + (author.last_name ? ` ${author.last_name}` : ''),
        verified: false, // Telegram doesn't have verification
        followerCount: 0, // Not available
        followingCount: 0, // Not available
        authenticity: this.calculateTelegramUserAuthenticity(author)
      },
      timestamp: new Date(message.date * 1000), // message.date is in seconds
      metrics: {
        likes: 0, // Not available in Telegram
        shares: message.forward_from ? 1 : 0,
        replies: 0, // Would need to track separately
        views: message.views || 0,
        engagement
      },
      entities,
      metadata: {
        channelId: channel.id,
        channelTitle: channel.title,
        messageId: message.message_id,
        hasMedia: !!(message.photo || message.video || message.document),
        isForwarded: !!message.forward_from
      }
    };
  }

  /**
   * Heuristic authenticity score in [0, 1], starting from 0.5:
   * +0.2 username, +0.1 profile photo, +0.1 first and last name, -0.3 bot.
   */
  private calculateTelegramUserAuthenticity(user: TelegramUser): number {
    let score = 0.5; // Base score

    if (user.username) score += 0.2;
    if (user.photo) score += 0.1;
    if (user.first_name && user.last_name) score += 0.1;
    // Bots are less authentic for organic sentiment
    if (user.is_bot) score -= 0.3;

    return Math.max(0, Math.min(1, score));
  }

  /**
   * Engagement in [0, 100]: views as a percentage of channel members
   * (1000 assumed when unknown), boosted 20% for forwarded messages and
   * decayed for messages older than 24 hours and again past one week.
   * Returns 0 when the message has no views.
   */
  private calculateTelegramEngagement(message: TelegramMessage, channel: TelegramChannel): number {
    if (!message.views) return 0;

    const channelMembers = channel.member_count || 1000; // Default if unknown
    let engagement = Math.min((message.views / channelMembers) * 100, 100); // Cap at 100%

    // Bonus for forwards
    if (message.forward_from) engagement *= 1.2;

    // Penalty for old messages (engagement typically peaks early)
    const hoursOld = (Date.now() - message.date * 1000) / (1000 * 60 * 60);
    if (hoursOld > 24) engagement *= 0.8; // 20% penalty after 24 hours
    if (hoursOld > 168) engagement *= 0.5; // Additional penalty after 1 week

    return Math.max(0, Math.min(100, engagement));
  }
}

Natural Language Processing Pipeline

Advanced Sentiment Analysis and Entity Recognition

/**
 * Comprehensive NLP pipeline for social media content analysis: per-post
 * sentiment from an ensemble of models, crypto-specific lexicon adjustments,
 * keyword-based emotion scores and batch-level aggregates.
 */
export class SentimentAnalyzer {
  /** Crypto slang that nudges the score upwards. */
  private static readonly POSITIVE_INDICATORS: readonly string[] = [
    'moon', 'bullish', 'pump', 'gem', 'diamond hands', 'hodl',
    '100x', 'to the moon', 'buy the dip', 'strong community',
    'good fundamentals', 'solid project', 'great team'
  ];

  /** Crypto slang that pulls the score downwards. */
  private static readonly NEGATIVE_INDICATORS: readonly string[] = [
    'rug pull', 'scam', 'dump', 'exit scam', 'honeypot',
    'paper hands', 'dead project', 'abandoned', 'suspicious',
    'red flag', 'avoid', 'warning', 'failed'
  ];

  /** Keywords counted towards each emotion (0.2 per hit, capped at 1). */
  private static readonly EMOTION_KEYWORDS: Readonly<Record<string, readonly string[]>> = {
    joy: ['happy', 'excited', 'great', 'awesome', 'amazing', 'love', 'moon'],
    anger: ['angry', 'furious', 'mad', 'hate', 'terrible', 'awful'],
    fear: ['scared', 'afraid', 'worried', 'nervous', 'risky', 'dangerous'],
    sadness: ['sad', 'disappointed', 'depressed', 'crying', 'lost'],
    surprise: ['surprised', 'shocked', 'unexpected', 'wow', 'incredible'],
    disgust: ['disgusting', 'gross', 'sick', 'revolting'],
    trust: ['trust', 'reliable', 'solid', 'confidence', 'believe'],
    anticipation: ['excited', 'waiting', 'upcoming', 'soon', 'expecting']
  };

  /** Compiled term patterns, built lazily and shared by all instances. */
  private static readonly termPatterns: Map<string, RegExp> = new Map();

  private models: Map<string, SentimentModel> = new Map();
  private entityExtractor: EntityExtractor;
  private contextAnalyzer: ContextAnalyzer;
  private cryptoLexicon: CryptoLexicon;
  private languageDetector: LanguageDetector;
  /** Settles when model loading has finished; awaited before any batch is analysed. */
  private modelsReady: Promise<void>;

  /** Builds the helper components and starts loading the models in the background. */
  constructor(private config: SentimentConfig) {
    this.entityExtractor = new EntityExtractor(config.entities);
    this.contextAnalyzer = new ContextAnalyzer(config.context);
    this.cryptoLexicon = new CryptoLexicon(config.lexicon);
    this.languageDetector = new LanguageDetector();

    // A constructor cannot await. Keep the promise so analyzeBatch can wait
    // for it, and attach a handler so a load failure is logged rather than
    // becoming an unhandled rejection (it still rejects when awaited).
    this.modelsReady = this.initializeModels();
    this.modelsReady.catch((error: unknown) => {
      logger.error('Sentiment model initialization failed:', error);
    });
  }

  /**
   * True when `term` occurs in `contentLower` starting at a word boundary.
   * A plain substring test would count "management" as "gem" or "ambassador"
   * as "sad"; inflected forms such as "scammer" or "pumping" still match.
   */
  private static containsTerm(contentLower: string, term: string): boolean {
    let pattern = SentimentAnalyzer.termPatterns.get(term);
    if (!pattern) {
      const escaped = term.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
      pattern = new RegExp('\\b' + escaped);
      SentimentAnalyzer.termPatterns.set(term, pattern);
    }
    return pattern.test(contentLower);
  }

  /**
   * Loads the three base models and registers an ensemble over them.
   * The ensemble weights follow insertion order: crypto_bert, finbert,
   * general_sentiment.
   */
  private async initializeModels(): Promise<void> {
    // Load pre-trained sentiment models
    this.models.set('crypto_bert', await this.loadModel('crypto-bert-sentiment', {
      modelPath: 'models/crypto-bert-sentiment',
      tokenizer: 'crypto-bert-tokenizer',
      maxLength: 512
    }));

    this.models.set('finbert', await this.loadModel('finbert-sentiment', {
      modelPath: 'models/finbert',
      tokenizer: 'finbert-tokenizer',
      maxLength: 512
    }));

    this.models.set('general_sentiment', await this.loadModel('twitter-roberta-sentiment', {
      modelPath: 'models/twitter-roberta-base-sentiment',
      tokenizer: 'roberta-tokenizer',
      maxLength: 280
    }));

    // Ensemble model for improved accuracy
    this.models.set('ensemble', new EnsembleSentimentModel(
      Array.from(this.models.values()),
      { weights: [0.4, 0.35, 0.25] } // Crypto-bert gets highest weight
    ));
  }

  /**
   * Analyses all posts (in slices of 32) and aggregates the results.
   *
   * @returns Overall weighted sentiment, confidence, distribution, temporal
   *   trends, a manipulation score, the per-post results and metadata.
   * @throws SentimentAnalysisError when model loading or aggregation fails.
   *   A failure on a single post does not throw; that post is recorded as
   *   neutral with low confidence.
   */
  async analyzeBatch(posts: SocialPost[]): Promise<SentimentAnalysisResult> {
    try {
      // Make sure the models kicked off by the constructor are available.
      await this.modelsReady;

      const results: PostSentimentResult[] = [];
      const batchSize = 32;

      for (let offset = 0; offset < posts.length; offset += batchSize) {
        const batchResults = await this.processBatch(posts.slice(offset, offset + batchSize));
        results.push(...batchResults);
      }

      const aggregateMetrics = this.calculateAggregateMetrics(results);
      const manipulationScore = await this.detectSentimentManipulation(results, posts);

      return {
        overallSentiment: aggregateMetrics.weighted,
        confidence: aggregateMetrics.confidence,
        distribution: aggregateMetrics.distribution,
        temporalTrends: this.analyzeTemporalTrends(results),
        manipulationScore,
        postResults: results,
        metadata: {
          totalPosts: posts.length,
          processedPosts: results.length,
          averageConfidence: aggregateMetrics.confidence,
          languageDistribution: this.calculateLanguageDistribution(posts)
        }
      };
    } catch (error: unknown) {
      logger.error('Batch sentiment analysis failed:', error);
      throw new SentimentAnalysisError('Sentiment analysis failed', error);
    }
  }

  /**
   * Analyses the posts one by one, substituting a neutral, low-confidence
   * result for any post whose analysis throws.
   */
  private async processBatch(posts: SocialPost[]): Promise<PostSentimentResult[]> {
    const results: PostSentimentResult[] = [];

    for (const post of posts) {
      try {
        results.push(await this.analyzePost(post));
      } catch (error: unknown) {
        logger.warn(`Failed to analyze post ${post.id}:`, error);
        // Add neutral sentiment for failed analysis
        results.push({
          postId: post.id,
          sentiment: 'neutral',
          score: 0.5,
          confidence: 0.1,
          emotions: {},
          entities: [],
          explanation: 'Analysis failed'
        });
      }
    }

    return results;
  }

  /**
   * Full analysis of one post: language gate (English only), entity
   * extraction, per-model and ensemble predictions, crypto adjustment,
   * context analysis and emotion scores.
   *
   * @throws Error when the ensemble model is not registered.
   */
  private async analyzePost(post: SocialPost): Promise<PostSentimentResult> {
    const preprocessedContent = this.preprocessContent(post.content);

    const language = await this.languageDetector.detect(preprocessedContent);

    if (language !== 'en') {
      // For now, only process English content
      return {
        postId: post.id,
        sentiment: 'neutral',
        score: 0.5,
        confidence: 0.1,
        emotions: {},
        entities: [],
        explanation: `Non-English content (${language})`
      };
    }

    const entities = await this.entityExtractor.extract(preprocessedContent);

    // Individual model predictions are reported alongside the final result;
    // a model that fails is logged and simply left out.
    const modelPredictions: Map<string, SentimentPrediction> = new Map();

    for (const [modelName, model] of this.models) {
      if (modelName === 'ensemble') continue;

      try {
        modelPredictions.set(modelName, await model.predict(preprocessedContent));
      } catch (error: unknown) {
        logger.warn(`Model prediction failed: ${modelName}`, error);
      }
    }

    const ensembleModel = this.models.get('ensemble');
    if (!ensembleModel) {
      throw new Error('Ensemble sentiment model is not loaded');
    }
    const ensemblePrediction = await ensembleModel.predict(preprocessedContent);

    // Apply crypto-specific adjustments
    const cryptoAdjustment = this.applyCryptoSentimentAdjustment(
      preprocessedContent,
      ensemblePrediction,
      entities
    );

    const contextAnalysis = await this.contextAnalyzer.analyze(preprocessedContent, post);
    const emotions = this.extractEmotions(preprocessedContent, ensemblePrediction);

    return {
      postId: post.id,
      sentiment: this.mapScoreToSentiment(cryptoAdjustment.score),
      score: cryptoAdjustment.score,
      confidence: cryptoAdjustment.confidence,
      emotions,
      entities: entities.map(e => ({
        text: e.text,
        type: e.type,
        confidence: e.confidence,
        sentiment: e.sentiment
      })),
      context: contextAnalysis,
      modelPredictions: Object.fromEntries(modelPredictions),
      explanation: cryptoAdjustment.explanation
    };
  }

  /**
   * Adjusts a model prediction with crypto-specific signals.
   *
   * - each positive indicator: score +0.1, confidence +0.05
   * - each negative indicator: score -0.2, confidence +0.1
   * - each CRYPTO_TOKEN / CONTRACT_ADDRESS entity: confidence +0.05, and a
   *   strong entity sentiment (more than 0.2 away from 0.5) shifts the score
   *   by a tenth of its distance from 0.5
   *
   * Score and confidence are kept within [0, 1]. The explanation lists every
   * factor that fired, or 'Standard sentiment analysis' when none did.
   */
  private applyCryptoSentimentAdjustment(
    content: string,
    prediction: SentimentPrediction,
    entities: ExtractedEntity[]
  ): CryptoSentimentResult {
    let adjustedScore = prediction.score;
    let confidence = prediction.confidence;
    const adjustmentFactors: string[] = [];

    const contentLower = content.toLowerCase();

    for (const indicator of SentimentAnalyzer.POSITIVE_INDICATORS) {
      if (!SentimentAnalyzer.containsTerm(contentLower, indicator)) continue;

      adjustedScore = Math.min(1.0, adjustedScore + 0.1);
      adjustmentFactors.push(`Positive: ${indicator}`);
      confidence = Math.min(1.0, confidence + 0.05);
    }

    for (const indicator of SentimentAnalyzer.NEGATIVE_INDICATORS) {
      if (!SentimentAnalyzer.containsTerm(contentLower, indicator)) continue;

      adjustedScore = Math.max(0.0, adjustedScore - 0.2);
      adjustmentFactors.push(`Negative: ${indicator}`);
      confidence = Math.min(1.0, confidence + 0.1); // More confident about negative indicators
    }

    for (const entity of entities) {
      if (entity.type !== 'CRYPTO_TOKEN' && entity.type !== 'CONTRACT_ADDRESS') continue;

      // Presence of crypto entities increases relevance and confidence
      confidence = Math.min(1.0, confidence + 0.05);

      // Explicit number check: a sentiment of exactly 0 is the strongest
      // negative signal and must not be skipped as falsy.
      if (typeof entity.sentiment === 'number' && Math.abs(entity.sentiment - 0.5) > 0.2) {
        const entityInfluence = (entity.sentiment - 0.5) * 0.1;
        adjustedScore = Math.max(0, Math.min(1, adjustedScore + entityInfluence));
        adjustmentFactors.push(`Entity influence: ${entity.text}`);
      }
    }

    return {
      score: adjustedScore,
      confidence,
      explanation: adjustmentFactors.length > 0
        ? `Crypto-adjusted: ${adjustmentFactors.join(', ')}`
        : 'Standard sentiment analysis'
    };
  }

  /**
   * Simple keyword-based emotion detection: each matching keyword adds 0.2 to
   * its emotion, capped at 1. `prediction` is accepted but not used.
   */
  private extractEmotions(content: string, prediction: SentimentPrediction): EmotionScores {
    const emotions: EmotionScores = {
      joy: 0,
      anger: 0,
      fear: 0,
      sadness: 0,
      surprise: 0,
      disgust: 0,
      trust: 0,
      anticipation: 0
    };

    const contentLower = content.toLowerCase();

    for (const [emotion, keywords] of Object.entries(SentimentAnalyzer.EMOTION_KEYWORDS)) {
      const hits = keywords.filter(keyword =>
        SentimentAnalyzer.containsTerm(contentLower, keyword)
      ).length;
      emotions[emotion as keyof EmotionScores] = Math.min(1, hits * 0.2);
    }

    return emotions;
  }

  /**
   * Aggregates per-post results: confidence-weighted and simple mean score,
   * mean confidence, and the share of positive/neutral/negative posts.
   * Returns neutral defaults (0.5 scores, zero confidence) for empty input.
   */
  private calculateAggregateMetrics(results: PostSentimentResult[]): AggregateMetrics {
    const total = results.length;

    if (total === 0) {
      return {
        weighted: 0.5,
        simple: 0.5,
        confidence: 0,
        distribution: { positive: 0, neutral: 0, negative: 0 }
      };
    }

    let weightedSum = 0;
    let scoreSum = 0;
    let confidenceSum = 0; // doubles as the total weight
    const counts = { positive: 0, neutral: 0, negative: 0 };

    for (const result of results) {
      weightedSum += result.score * result.confidence; // Weight by confidence
      scoreSum += result.score;
      confidenceSum += result.confidence;

      if (result.sentiment === 'positive') counts.positive++;
      else if (result.sentiment === 'negative') counts.negative++;
      else counts.neutral++;
    }

    return {
      // Fall back to neutral when every result has zero confidence.
      weighted: confidenceSum > 0 ? weightedSum / confidenceSum : 0.5,
      simple: scoreSum / total,
      confidence: confidenceSum / total,
      distribution: {
        positive: counts.positive / total,
        neutral: counts.neutral / total,
        negative: counts.negative / total
      }
    };
  }
}

Manipulation Detection

Advanced Coordination and Shill Detection

// Sophisticated manipulation detection system
/**
 * Runs five independent manipulation detectors (shill, coordination, bot,
 * astroturf, network) over a batch of posts and merges their results into a
 * single score, risk level, alerts and recommendations.
 *
 * NOTE(review): `generateManipulationAlerts`, `categorizeRisk` and
 * `generateRecommendations` are called below but are not defined in this
 * excerpt — confirm they exist on the class in the real source.
 */
export class ManipulationDetector {
  private shillDetector: ShillDetector;
  private coordinationAnalyzer: CoordinationAnalyzer;
  private botDetector: BotDetector;
  private astroturfDetector: AstroturfDetector;
  private networkAnalyzer: NetworkAnalyzer;

  /**
   * @param config - Per-detector configuration; each sub-section is handed
   *   to the matching detector's constructor.
   */
  constructor(private config: ManipulationConfig) {
    this.shillDetector = new ShillDetector(config.shill);
    this.coordinationAnalyzer = new CoordinationAnalyzer(config.coordination);
    this.botDetector = new BotDetector(config.bot);
    this.astroturfDetector = new AstroturfDetector(config.astroturf);
    this.networkAnalyzer = new NetworkAnalyzer(config.network);
  }

  /**
   * Analyses `posts` with every detector concurrently and assembles the
   * combined report.
   *
   * @param posts - Posts to analyse.
   * @returns Overall score and confidence, risk level, the raw result of each
   *   detector, alerts, recommendations and run metadata.
   * @throws ManipulationAnalysisError wrapping the underlying error when any
   *   detector or aggregation step fails (the error is logged first).
   */
  async analyzeManipulation(posts: SocialPost[]): Promise<ManipulationAnalysisResult> {
    try {
      // Run all detection algorithms in parallel
      const [
        shillResult,
        coordinationResult,
        botResult,
        astroturfResult,
        networkResult
      ] = await Promise.all([
        this.shillDetector.detectShilling(posts),
        this.coordinationAnalyzer.detectCoordination(posts),
        this.botDetector.detectBots(posts),
        this.astroturfDetector.detectAstroturfing(posts),
        this.networkAnalyzer.analyzeNetworkManipulation(posts)
      ]);

      // Built once and reused. The order (shill, coordination, bot,
      // astroturf, network) is significant: calculateOverallManipulationScore
      // assigns weights by position.
      const detectorResults = [
        shillResult,
        coordinationResult,
        botResult,
        astroturfResult,
        networkResult
      ];

      // Calculate overall manipulation score
      const overallScore = this.calculateOverallManipulationScore(detectorResults);

      // Generate alerts and recommendations
      const alerts = this.generateManipulationAlerts(overallScore, detectorResults);

      return {
        overallManipulationScore: overallScore.score,
        confidence: overallScore.confidence,
        riskLevel: this.categorizeRisk(overallScore.score),
        detectionResults: {
          shilling: shillResult,
          coordination: coordinationResult,
          botActivity: botResult,
          astroturfing: astroturfResult,
          networkManipulation: networkResult
        },
        alerts,
        recommendations: this.generateRecommendations(overallScore.score, alerts),
        metadata: {
          totalPosts: posts.length,
          analysisTimestamp: new Date().toISOString(),
          algorithmsUsed: ['shill', 'coordination', 'bot', 'astroturf', 'network']
        }
      };

    } catch (error) {
      logger.error('Manipulation analysis failed:', error);
      throw new ManipulationAnalysisError('Manipulation analysis failed', error);
    }
  }

  /**
   * Combines detector results into one score.
   *
   * Each result's base weight is chosen by its position in `results`
   * (shill, coordination, bot, astroturf, network) and then scaled by that
   * result's own confidence, so low-confidence detectors pull the score less.
   *
   * @param results - Detector results in the positional order above.
   * @returns `score`: the confidence-scaled weighted mean of the result
   *   scores (0 when the total effective weight is 0); `confidence`: the
   *   plain mean of the result confidences. An empty list yields
   *   `{ score: 0, confidence: 0 }`. Results beyond the fifth position have
   *   no base weight and therefore do not affect `score`, though they still
   *   count toward `confidence`.
   */
  private calculateOverallManipulationScore(results: DetectionResult[]): OverallScore {
    // Guard against 0/0 producing a NaN confidence.
    if (results.length === 0) {
      return { score: 0, confidence: 0 };
    }

    // Base weights as an explicit ordered list rather than relying on the
    // property order of an object literal.
    const detectorWeights: readonly number[] = [
      0.25, // shill
      0.25, // coordination
      0.20, // bot
      0.15, // astroturf
      0.15  // network
    ];

    let weightedScore = 0;
    let totalWeight = 0;
    let totalConfidence = 0;

    results.forEach((result, index) => {
      // A position without a configured weight contributes nothing to the
      // score instead of poisoning it with NaN.
      const baseWeight = detectorWeights[index] ?? 0;
      const weight = baseWeight * result.confidence;

      weightedScore += result.score * weight;
      totalWeight += weight;
      totalConfidence += result.confidence;
    });

    return {
      score: totalWeight > 0 ? weightedScore / totalWeight : 0,
      confidence: totalConfidence / results.length
    };
  }
}

// Shill detection implementation
/**
 * Detects paid or scripted promotion ("shilling") by looking at per-author
 * posting behaviour, near-duplicate content and timing patterns.
 *
 * NOTE(review): `groupPostsByAuthor`, `detectTimingPatterns`,
 * `calculateShillConfidence`, `getAffectedPosts` and
 * `generateShillExplanation` are called below but are not defined in this
 * excerpt — confirm they exist on the class in the real source.
 */
export class ShillDetector {
  constructor(private config: ShillConfig) {}

  /**
   * Scores a batch of posts for shilling.
   *
   * @param posts - Posts to analyse.
   * @returns A score in [0, 1], a confidence, the suspicious patterns found,
   *   the posts they touch and an explanation.
   * @throws Rethrows any error raised during detection after logging it.
   */
  async detectShilling(posts: SocialPost[]): Promise<ShillDetectionResult> {
    try {
      const suspiciousPatterns: ShillPattern[] = [];
      let overallShillScore = 0;

      // Group posts by author
      const postsByAuthor = this.groupPostsByAuthor(posts);

      // Analyze each author's posting patterns
      for (const [authorId, authorPosts] of postsByAuthor) {
        const authorAnalysis = await this.analyzeAuthorPattern(authorPosts);

        if (authorAnalysis.isSuspicious) {
          suspiciousPatterns.push({
            type: 'suspicious_author',
            authorId,
            suspicionScore: authorAnalysis.suspicionScore,
            evidence: authorAnalysis.evidence,
            postCount: authorPosts.length
          });
        }

        // Each author contributes in proportion to their share of the batch.
        overallShillScore += authorAnalysis.suspicionScore * (authorPosts.length / posts.length);
      }

      // Detect content similarity patterns
      const similarityPatterns = await this.detectContentSimilarity(posts);
      suspiciousPatterns.push(...similarityPatterns);

      // Detect timing patterns
      const timingPatterns = await this.detectTimingPatterns(posts);
      suspiciousPatterns.push(...timingPatterns);

      // Every pattern found adds a flat 0.1 on top of the author-based score;
      // the total is capped at 1.
      const finalScore = Math.min(1, overallShillScore + (suspiciousPatterns.length * 0.1));

      return {
        score: finalScore,
        confidence: this.calculateShillConfidence(suspiciousPatterns, posts.length),
        suspiciousPatterns,
        affectedPosts: this.getAffectedPosts(posts, suspiciousPatterns),
        explanation: this.generateShillExplanation(suspiciousPatterns, finalScore)
      };

    } catch (error) {
      logger.error('Shill detection failed:', error);
      throw error;
    }
  }

  /**
   * Scores one author's posts on four signals: posting frequency, content
   * repetition, the author's authenticity score and promotional language.
   *
   * @param posts - Posts by a single author, in any order.
   * @returns The capped suspicion score, whether it crosses the 0.4
   *   threshold, and human-readable evidence strings. An empty list is
   *   reported as not suspicious.
   */
  private async analyzeAuthorPattern(posts: SocialPost[]): Promise<AuthorAnalysis> {
    if (posts.length === 0) {
      return { isSuspicious: false, suspicionScore: 0, evidence: [] };
    }

    let suspicionScore = 0;
    const evidence: string[] = [];

    // Check posting frequency. The time span is taken from the earliest and
    // latest timestamps rather than the first and last array elements, so the
    // result does not depend on the caller having sorted the posts.
    let earliest = Infinity;
    let latest = -Infinity;
    for (const post of posts) {
      const time = post.timestamp.getTime();
      if (time < earliest) earliest = time;
      if (time > latest) latest = time;
    }
    const hoursSpan = (latest - earliest) / (1000 * 60 * 60);
    // Spans shorter than one hour are treated as one hour so a couple of
    // quick posts do not produce an enormous rate.
    const postsPerHour = posts.length / Math.max(hoursSpan, 1);

    if (postsPerHour > 5) { // More than 5 posts per hour
      suspicionScore += 0.3;
      evidence.push(`High posting frequency: ${postsPerHour.toFixed(1)} posts/hour`);
    }

    // Check content repetition
    const uniqueContent = new Set(posts.map(p => this.normalizeContent(p.content)));
    const repetitionRatio = 1 - (uniqueContent.size / posts.length);

    if (repetitionRatio > 0.5) { // More than 50% content repetition
      suspicionScore += 0.4;
      evidence.push(`High content repetition: ${(repetitionRatio * 100).toFixed(1)}%`);
    }

    // Check author authenticity (all posts share one author, so the first
    // post's author record is used).
    const author = posts[0].author;
    if (author.authenticity < 0.3) {
      suspicionScore += 0.2;
      evidence.push('Low author authenticity score');
    }

    // Check for excessive promotional language
    const promotionalScore = this.calculatePromotionalScore(posts);
    if (promotionalScore > 0.7) {
      suspicionScore += 0.3;
      evidence.push('Excessive promotional language detected');
    }

    return {
      isSuspicious: suspicionScore > 0.4,
      suspicionScore: Math.min(1, suspicionScore),
      evidence
    };
  }

  /**
   * Compares every pair of posts and reports pairs whose token-level Jaccard
   * similarity exceeds 0.8.
   *
   * @param posts - Posts to compare.
   * @returns One `content_similarity` pattern per qualifying pair.
   */
  private async detectContentSimilarity(posts: SocialPost[]): Promise<ShillPattern[]> {
    const patterns: ShillPattern[] = [];
    const similarityThreshold = 0.8;

    // Tokenize each post once up front instead of once per pair.
    const tokenSets = posts.map(post => this.toTokenSet(post.content));

    for (let i = 0; i < posts.length; i++) {
      for (let j = i + 1; j < posts.length; j++) {
        const similarity = this.jaccardSimilarity(tokenSets[i], tokenSets[j]);

        if (similarity > similarityThreshold) {
          patterns.push({
            type: 'content_similarity',
            postIds: [posts[i].id, posts[j].id],
            suspicionScore: similarity,
            evidence: [`Content similarity: ${(similarity * 100).toFixed(1)}%`],
            similarity
          });
        }
      }
    }

    return patterns;
  }

  /**
   * Case-insensitive Jaccard similarity of the word sets of two texts.
   *
   * @returns A value in [0, 1]; 0 when neither text contains any token.
   */
  private calculateContentSimilarity(content1: string, content2: string): number {
    return this.jaccardSimilarity(this.toTokenSet(content1), this.toTokenSet(content2));
  }

  /** Lower-cases `text` and returns its distinct tokens. */
  private toTokenSet(text: string): Set<string> {
    return new Set(this.tokenize(text.toLowerCase()));
  }

  /**
   * |A ∩ B| / |A ∪ B| for two token sets. Two empty sets give 0 rather than
   * NaN, so token-less posts are never reported as similar.
   */
  private jaccardSimilarity(tokens1: Set<string>, tokens2: Set<string>): number {
    if (tokens1.size === 0 && tokens2.size === 0) {
      return 0;
    }

    let shared = 0;
    for (const token of tokens1) {
      if (tokens2.has(token)) shared++;
    }

    const unionSize = tokens1.size + tokens2.size - shared;
    return shared / unionSize;
  }

  /** Splits text into runs of `\w` characters (ASCII letters, digits, `_`). */
  private tokenize(text: string): string[] {
    return text.match(/\b\w+\b/g) || [];
  }

  /** Lower-cases, strips punctuation and collapses whitespace. */
  private normalizeContent(content: string): string {
    return content
      .toLowerCase()
      .replace(/[^\w\s]/g, '') // Remove punctuation
      .replace(/\s+/g, ' ') // Normalize whitespace
      .trim();
  }

  /**
   * Mean promotional score over `posts`. Each post earns 0.1 per promotional
   * phrase found, plus 0.1 for more than five emoji and 0.1 when over 30% of
   * its characters are ASCII capitals, capped at 1 per post.
   *
   * @returns A value in [0, 1]; 0 for an empty list.
   */
  private calculatePromotionalScore(posts: SocialPost[]): number {
    if (posts.length === 0) {
      return 0;
    }

    const promotionalKeywords = [
      'buy now', 'don\'t miss', 'last chance', 'limited time',
      'guaranteed', 'easy money', 'get rich', 'moon shot',
      'next big thing', 'explosive growth', '1000x', 'lambo'
    ];

    let totalScore = 0;

    for (const post of posts) {
      const contentLower = post.content.toLowerCase();
      let postScore = 0;

      for (const keyword of promotionalKeywords) {
        if (contentLower.includes(keyword)) {
          postScore += 0.1;
        }
      }

      // Check for excessive use of emojis and capitalization
      const emojiCount = (post.content.match(/[\u{1F600}-\u{1F64F}]|[\u{1F300}-\u{1F5FF}]|[\u{1F680}-\u{1F6FF}]|[\u{1F1E0}-\u{1F1FF}]/gu) || []).length;
      // Empty content has no capitals; avoid 0/0.
      const capsRatio = post.content.length === 0
        ? 0
        : (post.content.match(/[A-Z]/g) || []).length / post.content.length;

      if (emojiCount > 5) postScore += 0.1;
      if (capsRatio > 0.3) postScore += 0.1;

      totalScore += Math.min(1, postScore);
    }

    return totalScore / posts.length;
  }
}

// Coordination detection implementation
/**
 * Detects coordinated posting: bursts of posts from several authors inside a
 * short time window, and groups of near-identical posts from more than one
 * author.
 *
 * NOTE(review): `detectNetworkCoordination`, `calculateCoordinationScore`,
 * `identifySuspiciousGroups`, `generateCoordinationExplanation`,
 * `groupBySimilarity` and `calculateGroupSimilarity` are called below but are
 * not defined in this excerpt — confirm they exist on the class in the real
 * source.
 */
export class CoordinationAnalyzer {
  constructor(private config: CoordinationConfig) {}

  /**
   * Runs the temporal, content and network coordination checks in turn and
   * summarises the combined patterns.
   *
   * @param posts - Posts to analyse.
   * @returns Score, confidence, all patterns found, the suspicious groups
   *   derived from them and an explanation.
   * @throws Rethrows any error raised during detection after logging it.
   */
  async detectCoordination(posts: SocialPost[]): Promise<CoordinationDetectionResult> {
    try {
      const coordinationPatterns: CoordinationPattern[] = [];

      // Temporal coordination detection
      const temporalPatterns = await this.detectTemporalCoordination(posts);
      coordinationPatterns.push(...temporalPatterns);

      // Content coordination detection
      const contentPatterns = await this.detectContentCoordination(posts);
      coordinationPatterns.push(...contentPatterns);

      // Network coordination detection
      const networkPatterns = await this.detectNetworkCoordination(posts);
      coordinationPatterns.push(...networkPatterns);

      // Calculate overall coordination score
      const coordinationScore = this.calculateCoordinationScore(coordinationPatterns);

      return {
        score: coordinationScore.score,
        confidence: coordinationScore.confidence,
        patterns: coordinationPatterns,
        suspiciousGroups: this.identifySuspiciousGroups(coordinationPatterns),
        explanation: this.generateCoordinationExplanation(coordinationPatterns)
      };

    } catch (error) {
      logger.error('Coordination detection failed:', error);
      throw error;
    }
  }

  /**
   * Finds bursts of at least 5 posts from at least 3 distinct authors inside
   * a 5-minute window.
   *
   * A window is anchored at each post in chronological order and extends five
   * minutes forward (inclusive). A window is reported only if it reaches past
   * the end of the most recently reported one; windows that are merely a
   * later-starting subset of an already reported burst are skipped so a
   * single burst is not counted several times. Posts whose timestamp is not
   * a valid date are ignored.
   *
   * @param posts - Posts in any order; the input array is not mutated.
   * @returns One `temporal_coordination` pattern per reported window.
   */
  private async detectTemporalCoordination(posts: SocialPost[]): Promise<CoordinationPattern[]> {
    const patterns: CoordinationPattern[] = [];
    const timeWindow = 5 * 60 * 1000; // 5 minutes
    const minimumPostsInWindow = 5;

    // Sort a filtered copy by timestamp
    const sortedPosts = posts
      .filter(post => !Number.isNaN(post.timestamp.getTime()))
      .sort((a, b) => a.timestamp.getTime() - b.timestamp.getTime());

    // Exclusive end index of the current window. Window ends only move
    // forward as the start advances through sorted data, so one pointer
    // swept across the array replaces a rescan per start.
    let windowEndIndex = 0;
    // Exclusive end index of the last window that produced a pattern.
    let lastReportedEndIndex = 0;

    for (let i = 0; i < sortedPosts.length; i++) {
      const windowStart = sortedPosts[i].timestamp.getTime();
      const windowEnd = windowStart + timeWindow;

      while (
        windowEndIndex < sortedPosts.length &&
        sortedPosts[windowEndIndex].timestamp.getTime() <= windowEnd
      ) {
        windowEndIndex++;
      }

      if (windowEndIndex - i < minimumPostsInWindow) continue;

      // Every post in this window was already part of the previous report.
      if (windowEndIndex <= lastReportedEndIndex) continue;

      const postsInWindow = sortedPosts.slice(i, windowEndIndex);

      // Check if these posts come from different authors
      const uniqueAuthors = new Set(postsInWindow.map(p => p.author.id));

      if (uniqueAuthors.size >= 3) { // At least 3 different authors
        patterns.push({
          type: 'temporal_coordination',
          postIds: postsInWindow.map(p => p.id),
          authorIds: Array.from(uniqueAuthors),
          suspicionScore: Math.min(1, postsInWindow.length / 10), // Higher score for more posts
          timeWindow: {
            start: new Date(windowStart),
            end: new Date(windowEnd)
          },
          evidence: [`${postsInWindow.length} posts from ${uniqueAuthors.size} authors in 5-minute window`]
        });
        lastReportedEndIndex = windowEndIndex;
      }
    }

    return patterns;
  }

  /**
   * Reports groups of at least 3 similar posts (similarity threshold 0.7)
   * that were written by at least 2 distinct authors.
   *
   * @param posts - Posts to group.
   * @returns One `content_coordination` pattern per qualifying group.
   */
  private async detectContentCoordination(posts: SocialPost[]): Promise<CoordinationPattern[]> {
    const patterns: CoordinationPattern[] = [];
    const similarityThreshold = 0.7;
    const minGroupSize = 3;

    // Group posts by content similarity
    const similarityGroups = this.groupBySimilarity(posts, similarityThreshold);

    for (const group of similarityGroups) {
      if (group.length >= minGroupSize) {
        const uniqueAuthors = new Set(group.map(p => p.author.id));

        if (uniqueAuthors.size >= 2) { // Multiple authors posting similar content
          patterns.push({
            type: 'content_coordination',
            postIds: group.map(p => p.id),
            authorIds: Array.from(uniqueAuthors),
            suspicionScore: Math.min(1, group.length / 10),
            evidence: [`${group.length} similar posts from ${uniqueAuthors.size} authors`],
            similarity: this.calculateGroupSimilarity(group)
          });
        }
      }
    }

    return patterns;
  }
}

Real-Time Social Monitoring

Live Social Media Streaming and Alert System

// Real-time social media monitoring and alert system
/**
 * Real-time monitoring front end: creates one StreamManager per configured
 * platform, reacts to the events they emit (new posts, trends, viral content,
 * suspicious activity) and exposes project monitoring and reporting.
 *
 * NOTE(review): `handleSuspiciousActivity`, `createStreamFilters`,
 * `sendAlerts`, `quickSentimentAnalysis`, `quickEntityExtraction`,
 * `quickManipulationCheck`, `calculateAnalysisConfidence`,
 * `calculateOverallSocialHealth` and `generateRecommendations` are called
 * below but are not defined in this excerpt — confirm they exist on the class
 * in the real source.
 */
export class RealTimeSocialMonitor {
  private streamManagers: Map<string, StreamManager> = new Map();
  private alertEngine: SocialAlertEngine;
  private trendDetector: TrendDetector;
  private viralAnalyzer: ViralAnalyzer;
  private eventEmitter: EventEmitter;
  /**
   * Settles once every platform's stream manager has been initialised (or has
   * failed and been logged). Public operations await it so they never run
   * against a half-populated `streamManagers` map.
   */
  private readonly ready: Promise<void>;

  /**
   * @param config - Monitor settings, including the alert / trend / viral
   *   sub-configurations and per-platform alert thresholds.
   * @param socialCollectors - One collector per platform name.
   * @param notificationService - Sink for trend and manipulation alerts.
   */
  constructor(
    private config: RealTimeMonitorConfig,
    private socialCollectors: Map<string, SocialMediaCollector>,
    private notificationService: NotificationService
  ) {
    this.alertEngine = new SocialAlertEngine(config.alerts);
    this.trendDetector = new TrendDetector(config.trends);
    this.viralAnalyzer = new ViralAnalyzer(config.viral);
    this.eventEmitter = new EventEmitter();

    // A constructor cannot await, so keep the promise instead of dropping it.
    // setupRealTimeStreaming handles per-platform failures itself.
    this.ready = this.setupRealTimeStreaming();
  }

  /**
   * Creates, initialises and wires up a StreamManager for every collector.
   * A platform that fails to initialise is logged and skipped; the remaining
   * platforms are still set up.
   */
  private async setupRealTimeStreaming() {
    for (const [platform, collector] of this.socialCollectors) {
      try {
        const streamManager = new StreamManager(platform, collector, {
          bufferSize: this.config.bufferSize,
          processingInterval: this.config.processingInterval,
          alertThresholds: this.config.alertThresholds[platform]
        });

        await streamManager.initialize();
        this.streamManagers.set(platform, streamManager);

        // Set up event handlers
        streamManager.on('newPost', (post) => this.handleNewPost(post));
        streamManager.on('trendDetected', (trend) => this.handleTrendDetection(trend));
        streamManager.on('viralContent', (content) => this.handleViralContent(content));
        streamManager.on('suspiciousActivity', (activity) => this.handleSuspiciousActivity(activity));

        logger.info(`Real-time streaming initialized for ${platform}`);

      } catch (error) {
        logger.error(`Failed to initialize streaming for ${platform}:`, error);
      }
    }
  }

  /**
   * Starts monitoring a project on every initialised platform and registers
   * its alert rules.
   *
   * @param projectId - Project to monitor.
   * @param monitoringConfig - Source of the stream filters and alert rules.
   * @returns The generated monitoring id (`monitor_<projectId>_<timestamp>`).
   * @throws Rethrows any failure from a stream manager or the alert engine
   *   after logging it.
   */
  async monitorProject(projectId: string, monitoringConfig: ProjectMonitoringConfig): Promise<string> {
    const monitoringId = `monitor_${projectId}_${Date.now()}`;

    try {
      // Stream managers are registered asynchronously from the constructor;
      // wait for that to finish so no platform is silently skipped.
      await this.ready;

      // Create stream filters for the project
      const streamFilters = this.createStreamFilters(monitoringConfig);

      // Start monitoring on all platforms
      for (const streamManager of this.streamManagers.values()) {
        await streamManager.addProjectMonitoring(monitoringId, streamFilters);
      }

      // Set up project-specific alert rules
      await this.alertEngine.addProjectAlerts(monitoringId, monitoringConfig.alertRules);

      logger.info(`Started monitoring project ${projectId} with ID ${monitoringId}`);

      return monitoringId;

    } catch (error) {
      logger.error(`Failed to start project monitoring for ${projectId}:`, error);
      throw error;
    }
  }

  /**
   * Event handler for a newly streamed post: analyses it, evaluates alert
   * rules, emits `postAnalyzed`, sends any alerts and feeds the trend
   * detector. Failures are logged and not rethrown.
   */
  private async handleNewPost(post: SocialPost) {
    try {
      // Process post through analysis pipeline
      const analysis = await this.analyzePostRealTime(post);

      // Check for alerts
      const alerts = await this.alertEngine.checkPostAlerts(post, analysis);

      // Emit events
      this.eventEmitter.emit('postAnalyzed', { post, analysis, alerts });

      // Send alerts if necessary
      if (alerts.length > 0) {
        await this.sendAlerts(alerts, post);
      }

      // Update trend detection
      await this.trendDetector.updateWithPost(post, analysis);

    } catch (error) {
      logger.error('Failed to handle new post:', error, { postId: post.id });
    }
  }

  /**
   * Lightweight per-post analysis: sentiment, entities and a manipulation
   * check, plus a combined confidence.
   */
  private async analyzePostRealTime(post: SocialPost): Promise<RealTimeAnalysis> {
    // The three checks take only the post (or its content) as input, so they
    // are started together rather than one after another.
    const [sentiment, entities, manipulation] = await Promise.all([
      this.quickSentimentAnalysis(post.content),
      this.quickEntityExtraction(post.content),
      this.quickManipulationCheck(post)
    ]);

    return {
      sentiment,
      entities,
      manipulation,
      timestamp: new Date(),
      confidence: this.calculateAnalysisConfidence(sentiment, entities, manipulation)
    };
  }

  /**
   * Event handler for a detected trend: when the trend detector rates it as
   * significant, sends a trend alert and emits `significantTrend`. Failures
   * are logged and not rethrown.
   */
  private async handleTrendDetection(trend: DetectedTrend) {
    try {
      logger.info(`Trend detected: ${trend.type}`, trend);

      // Analyze trend significance
      const significance = await this.trendDetector.analyzeTrendSignificance(trend);

      if (significance.isSignificant) {
        // Create trend alert
        const trendAlert: TrendAlert = {
          type: 'trend_detected',
          trend,
          significance,
          timestamp: new Date(),
          platforms: trend.platforms,
          affectedProjects: trend.affectedProjects
        };

        // Send trend notifications
        await this.notificationService.sendTrendAlert(trendAlert);

        this.eventEmitter.emit('significantTrend', trendAlert);
      }

    } catch (error) {
      logger.error('Failed to handle trend detection:', error);
    }
  }

  /**
   * Event handler for viral content: when the artificial-amplification score
   * exceeds 0.7, sends a manipulation alert and emits `manipulationDetected`.
   * Failures are logged and not rethrown.
   */
  private async handleViralContent(content: ViralContent) {
    try {
      logger.info(`Viral content detected: ${content.postId}`);

      // Analyze viral spread pattern
      const spreadAnalysis = await this.viralAnalyzer.analyzeSpread(content);

      // Check for artificial amplification
      const artificialScore = await this.viralAnalyzer.detectArtificialAmplification(content);

      if (artificialScore > 0.7) {
        // High probability of artificial amplification
        const manipulationAlert: ManipulationAlert = {
          type: 'artificial_viral',
          content,
          artificialScore,
          spreadAnalysis,
          timestamp: new Date()
        };

        await this.notificationService.sendManipulationAlert(manipulationAlert);
        this.eventEmitter.emit('manipulationDetected', manipulationAlert);
      }

    } catch (error) {
      logger.error('Failed to handle viral content:', error);
    }
  }

  /**
   * Builds a report covering the last `timeframe` milliseconds.
   *
   * @param timeframe - Length of the reporting window in milliseconds
   *   (default 3600000, i.e. one hour), ending now.
   * @returns Per-platform metrics, trend / viral / alert summaries, an
   *   overall health figure and recommendations.
   * @throws Rethrows any failure while collecting data after logging it.
   */
  async generateRealTimeReport(timeframe: number = 3600000): Promise<RealTimeReport> {
    const endTime = Date.now();
    const startTime = endTime - timeframe;

    try {
      // Make sure every platform's stream manager has been registered before
      // collecting metrics from them.
      await this.ready;

      // Collect metrics from all stream managers
      const platformMetrics: Record<string, PlatformMetrics> = {};

      for (const [platform, streamManager] of this.streamManagers) {
        platformMetrics[platform] = await streamManager.getMetrics(startTime, endTime);
      }

      // Get trend summary
      const trendSummary = await this.trendDetector.getTrendSummary(startTime, endTime);

      // Get viral content summary
      const viralSummary = await this.viralAnalyzer.getViralSummary(startTime, endTime);

      // Get alert summary
      const alertSummary = await this.alertEngine.getAlertSummary(startTime, endTime);

      return {
        timeframe: { start: new Date(startTime), end: new Date(endTime) },
        platformMetrics,
        trendSummary,
        viralSummary,
        alertSummary,
        overallHealth: this.calculateOverallSocialHealth(platformMetrics),
        recommendations: this.generateRecommendations(alertSummary, trendSummary),
        generatedAt: new Date()
      };

    } catch (error) {
      logger.error('Failed to generate real-time report:', error);
      throw error;
    }
  }
}

// Stream manager for individual platforms
/**
 * Manages the live streams of a single platform: consumes posts from the
 * collector, buffers them, and periodically (or when the buffer fills)
 * processes the buffer, emitting `newPost`, `trendDetected`, `viralContent`
 * and `suspiciousActivity` events.
 *
 * NOTE(review): `restartMonitoring`, `checkForTrends`, `checkForViralContent`
 * and `checkForSuspiciousActivity` are called below but are not defined in
 * this excerpt — confirm they exist on the class in the real source.
 */
export class StreamManager extends EventEmitter {
  private activeStreams: Map<string, AsyncIterator<SocialPost>> = new Map();
  private postBuffer: SocialPost[] = [];
  private processingTimer: ReturnType<typeof setInterval> | null = null;
  private metrics: StreamMetrics;
  /** Set by shutdown() so failed streams are not restarted afterwards. */
  private isShutDown = false;

  /**
   * @param platform - Platform name, used in log messages.
   * @param collector - Source of real-time streams for this platform.
   * @param config - Buffer size and processing interval (milliseconds).
   */
  constructor(
    private platform: string,
    private collector: SocialMediaCollector,
    private config: StreamConfig
  ) {
    super();
    this.metrics = new StreamMetrics();
  }

  /**
   * Starts the periodic buffer-processing timer. Calling it again replaces
   * the previous timer instead of stacking a second one.
   *
   * @throws Rethrows any error raised while starting, after logging it.
   */
  async initialize(): Promise<void> {
    try {
      if (this.processingTimer !== null) {
        clearInterval(this.processingTimer);
      }
      this.isShutDown = false;

      // Start processing timer
      this.processingTimer = setInterval(() => {
        void this.processBufferedPosts();
      }, this.config.processingInterval);

      logger.info(`Stream manager initialized for ${this.platform}`);

    } catch (error) {
      logger.error(`Stream manager initialization failed for ${this.platform}:`, error);
      throw error;
    }
  }

  /**
   * Stops the processing timer, closes every active stream (best effort) and
   * processes whatever is still buffered. Without this the interval timer
   * keeps running for the lifetime of the process.
   */
  async shutdown(): Promise<void> {
    this.isShutDown = true;

    if (this.processingTimer !== null) {
      clearInterval(this.processingTimer);
      this.processingTimer = null;
    }

    const streams = Array.from(this.activeStreams.values());
    this.activeStreams.clear();
    await Promise.all(streams.map((stream) => this.closeStream(stream)));

    await this.processBufferedPosts();
  }

  /**
   * Opens a stream for the given filters and starts consuming it in the
   * background.
   *
   * @param monitoringId - Key under which the stream is tracked.
   * @param filters - Filters passed to the collector.
   * @throws Rethrows any error from opening the stream, after logging it.
   */
  async addProjectMonitoring(monitoringId: string, filters: StreamFilter[]): Promise<void> {
    try {
      // Create stream for project monitoring
      const stream = await this.collector.streamRealTime(filters);
      this.activeStreams.set(monitoringId, stream);

      // Start consuming the stream; consumeStream handles its own errors, so
      // the promise is intentionally not awaited.
      void this.consumeStream(monitoringId, stream);

      logger.info(`Added project monitoring ${monitoringId} for ${this.platform}`);

    } catch (error) {
      logger.error(`Failed to add project monitoring ${monitoringId}:`, error);
      throw error;
    }
  }

  /**
   * Pulls posts from `stream` until it ends, buffering each one and recording
   * it in the metrics. When the buffer reaches the configured size it is
   * processed immediately. On error the stream is closed and, unless the
   * manager has been shut down, a restart is scheduled after five seconds.
   */
  private async consumeStream(monitoringId: string, stream: AsyncIterator<SocialPost>) {
    try {
      // Drive the iterator through next(): `for await` needs an
      // AsyncIterable, which a bare AsyncIterator is not required to be.
      while (true) {
        const step = await stream.next();
        if (step.done) break;
        const post = step.value;

        // Add to buffer
        this.postBuffer.push(post);

        // Update metrics
        this.metrics.recordPost(post);

        // Check buffer size
        if (this.postBuffer.length >= this.config.bufferSize) {
          void this.processBufferedPosts();
        }
      }
    } catch (error) {
      logger.error(`Stream consumption error for ${monitoringId}:`, error);

      // Release the failed stream before a replacement is created.
      await this.closeStream(stream);

      if (this.isShutDown) return;

      // Attempt to restart stream
      setTimeout(() => {
        this.restartMonitoring(monitoringId);
      }, 5000);
    }
  }

  /** Calls the iterator's optional `return()`; failures are only logged. */
  private async closeStream(stream: AsyncIterator<SocialPost>): Promise<void> {
    try {
      await stream.return?.();
    } catch (error) {
      logger.error(`Failed to close stream for ${this.platform}:`, error);
    }
  }

  /**
   * Drains the buffer and, for each post, emits `newPost` and runs the trend,
   * viral and suspicious-activity checks. Each post is handled in its own
   * try/catch: the buffer has already been emptied, so letting one failure
   * abort the loop would silently drop the rest of the batch.
   */
  private async processBufferedPosts() {
    if (this.postBuffer.length === 0) return;

    // Take ownership of everything currently buffered.
    const postsToProcess = this.postBuffer.splice(0);

    for (const post of postsToProcess) {
      try {
        // Emit new post event
        this.emit('newPost', post);

        // Check for trends
        const trendCheck = await this.checkForTrends(post, postsToProcess);
        if (trendCheck.isTrending) {
          this.emit('trendDetected', trendCheck.trend);
        }

        // Check for viral content
        const viralCheck = await this.checkForViralContent(post);
        if (viralCheck.isViral) {
          this.emit('viralContent', viralCheck.content);
        }

        // Check for suspicious activity
        const suspiciousCheck = await this.checkForSuspiciousActivity(post, postsToProcess);
        if (suspiciousCheck.isSuspicious) {
          this.emit('suspiciousActivity', suspiciousCheck.activity);
        }
      } catch (error) {
        logger.error('Failed to process buffered posts:', error);
      }
    }
  }

  /**
   * @param startTime - Window start, epoch milliseconds.
   * @param endTime - Window end, epoch milliseconds.
   * @returns The metrics recorded for this platform in that window.
   */
  async getMetrics(startTime: number, endTime: number): Promise<PlatformMetrics> {
    return this.metrics.getMetrics(startTime, endTime);
  }
}

This comprehensive Social Intelligence Agent documentation provides the technical foundation for understanding and implementing sophisticated social media monitoring, sentiment analysis, and manipulation detection capabilities that power Kaizen AI's social intelligence features.

Last updated