# Data Agent

## On-Chain Data Collection

#### Component Overview

The Data Agent serves as the foundational intelligence layer of Kaizen AI, responsible for real-time collection, processing, and normalization of blockchain data across multiple networks. This agent operates as a high-throughput, low-latency data pipeline that transforms raw blockchain events into structured, analyzable information for downstream analytical components.

**Core Responsibilities**:

* Real-time blockchain event monitoring and capture
* Smart contract interaction analysis and state tracking
* Token transfer and liquidity movement detection
* Market data aggregation from decentralized exchanges
* Cross-chain data correlation and synchronization
* Data validation, enrichment, and quality assurance

**Architecture Philosophy**: The Data Agent implements a modular, event-driven architecture that can scale horizontally to handle increasing blockchain activity while maintaining sub-second latency for critical events. The design emphasizes fault tolerance, data integrity, and seamless integration with analytical downstream systems.

#### Multi-Chain Collection Framework

**Network Support Architecture**

```typescript
// Core interfaces for multi-chain data collection
interface BlockchainNetwork {
  readonly name: string;
  readonly chainId: number | string;
  readonly type: 'evm' | 'solana' | 'bitcoin';
  readonly rpcEndpoints: RPCEndpoint[];
  readonly wsEndpoints: WebSocketEndpoint[];
  readonly explorerAPI?: ExplorerAPI;
}

interface DataCollector {
  network: BlockchainNetwork;
  isConnected: boolean;
  lastBlockProcessed: number | string;
  
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  subscribe(filters: EventFilter[]): Promise<void>;
  getLatestBlock(): Promise<Block>;
  processBlock(block: Block): Promise<ProcessedEvent[]>;
  validateData(data: RawBlockchainData): ValidationResult;
}

// Multi-chain coordinator
export class MultiChainDataCollector {
  private collectors: Map<string, DataCollector> = new Map();
  private eventBus: EventEmitter;
  private dataProcessor: DataProcessor;
  private healthMonitor: HealthMonitor;

  constructor(
    private config: DataAgentConfig,
    private database: DatabaseInterface,
    private messageQueue: MessageQueueInterface
  ) {
    this.eventBus = new EventEmitter();
    this.dataProcessor = new DataProcessor(config.processing);
    this.healthMonitor = new HealthMonitor(config.monitoring);
    
    this.initializeCollectors();
    this.setupEventHandlers();
  }

  private async initializeCollectors() {
    const networks = [
      new EthereumCollector(this.config.ethereum),
      new SolanaCollector(this.config.solana),
      new PolygonCollector(this.config.polygon),
      new ArbitrumCollector(this.config.arbitrum)
    ];

    for (const collector of networks) {
      try {
        await collector.connect();
        this.collectors.set(collector.network.name, collector);
        
        // Set up event subscriptions
        await this.subscribeToNetworkEvents(collector);
        
        logger.info(`Connected to ${collector.network.name} network`);
      } catch (error) {
        logger.error(`Failed to connect to ${collector.network.name}:`, error);
        
        // Schedule retry
        setTimeout(() => this.retryConnection(collector), 5000);
      }
    }
  }

  private async subscribeToNetworkEvents(collector: DataCollector) {
    const eventFilters = this.buildEventFilters(collector.network);
    
    await collector.subscribe(eventFilters);
    
    // Set up event listeners
    collector.on('newBlock', (block) => this.handleNewBlock(collector.network, block));
    collector.on('newTransaction', (tx) => this.handleNewTransaction(collector.network, tx));
    collector.on('contractEvent', (event) => this.handleContractEvent(collector.network, event));
    collector.on('error', (error) => this.handleCollectorError(collector.network, error));
  }
}
```

#### Ethereum Data Collection

**Advanced Ethereum Integration**

```typescript
// Ethereum-specific data collector implementation
export class EthereumCollector implements DataCollector {
  public readonly network: BlockchainNetwork;
  private providers: Map<string, ethers.Provider> = new Map();
  private currentProvider: ethers.Provider;
  private wsProvider: ethers.WebSocketProvider;
  private eventFilters: Map<string, ethers.Filter> = new Map();
  
  constructor(private config: EthereumConfig) {
    this.network = {
      name: 'ethereum',
      chainId: 1,
      type: 'evm',
      rpcEndpoints: config.rpcEndpoints,
      wsEndpoints: config.wsEndpoints
    };
    
    this.initializeProviders();
  }

  private initializeProviders() {
    // Initialize multiple providers for redundancy
    const providers = [
      new ethers.AlchemyProvider('homestead', this.config.alchemyKey),
      new ethers.InfuraProvider('homestead', this.config.infuraKey),
      new ethers.JsonRpcProvider(this.config.customRPCUrl)
    ];

    providers.forEach((provider, index) => {
      this.providers.set(`provider_${index}`, provider);
    });

    // Primary provider with automatic failover
    this.currentProvider = new ethers.FallbackProvider(
      Array.from(this.providers.values()).map((provider, index) => ({
        provider,
        priority: index + 1,
        weight: 1
      }))
    );

    // WebSocket provider for real-time events
    this.wsProvider = new ethers.WebSocketProvider(
      this.config.wsEndpoints[0].url,
      'homestead'
    );
  }

  async connect(): Promise<void> {
    try {
      // Test connection
      const blockNumber = await this.currentProvider.getBlockNumber();
      const network = await this.currentProvider.getNetwork();
      
      logger.info(`Connected to Ethereum network ${network.name}, block: ${blockNumber}`);
      
      // Set up WebSocket connection
      await this.setupWebSocketConnection();
      
      this.isConnected = true;
      this.lastBlockProcessed = blockNumber;
      
    } catch (error) {
      logger.error('Failed to connect to Ethereum network:', error);
      throw new ConnectionError('Ethereum connection failed', error);
    }
  }

  private async setupWebSocketConnection() {
    this.wsProvider.on('block', (blockNumber) => {
      this.handleNewBlock(blockNumber);
    });

    this.wsProvider.on('error', (error) => {
      logger.error('WebSocket error:', error);
      this.handleWebSocketError(error);
    });

    // Set up specific event filters
    await this.setupEventFilters();
  }

  private async setupEventFilters() {
    const commonFilters = [
      // ERC-20 Transfer events
      {
        name: 'ERC20_Transfer',
        topics: ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],
        handler: this.handleERC20Transfer.bind(this)
      },
      
      // Uniswap V2 Swap events
      {
        name: 'UniswapV2_Swap',
        address: '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f',
        topics: ['0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'],
        handler: this.handleUniswapSwap.bind(this)
      },
      
      // Uniswap V3 Swap events
      {
        name: 'UniswapV3_Swap',
        topics: ['0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'],
        handler: this.handleUniswapV3Swap.bind(this)
      },
      
      // Liquidity events
      {
        name: 'Liquidity_Add',
        topics: ['0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f'],
        handler: this.handleLiquidityAdd.bind(this)
      }
    ];

    for (const filter of commonFilters) {
      const ethersFilter = {
        address: filter.address,
        topics: filter.topics
      };
      
      this.eventFilters.set(filter.name, ethersFilter);
      this.wsProvider.on(ethersFilter, filter.handler);
    }
  }

  async processBlock(blockNumber: number): Promise<ProcessedEvent[]> {
    const startTime = Date.now();
    
    try {
      // Get full block with transactions
      const block = await this.currentProvider.getBlock(blockNumber, true);
      if (!block || !block.transactions) {
        return [];
      }

      const processedEvents: ProcessedEvent[] = [];
      
      // Process all transactions in parallel
      const transactionPromises = block.transactions.map(async (tx) => {
        if (typeof tx === 'string') return []; // Skip if only hash
        
        return await this.processTransaction(tx as ethers.TransactionResponse, block);
      });

      const transactionResults = await Promise.all(transactionPromises);
      
      // Flatten results
      for (const events of transactionResults) {
        processedEvents.push(...events);
      }

      // Update metrics
      const processingTime = Date.now() - startTime;
      this.updateMetrics({
        blockNumber,
        transactionCount: block.transactions.length,
        eventCount: processedEvents.length,
        processingTime
      });

      this.lastBlockProcessed = blockNumber;
      
      return processedEvents;
      
    } catch (error) {
      logger.error(`Failed to process block ${blockNumber}:`, error);
      throw new ProcessingError(`Block processing failed: ${blockNumber}`, error);
    }
  }

  private async processTransaction(
    tx: ethers.TransactionResponse, 
    block: ethers.Block
  ): Promise<ProcessedEvent[]> {
    const events: ProcessedEvent[] = [];
    
    try {
      // Get transaction receipt for events
      const receipt = await this.currentProvider.getTransactionReceipt(tx.hash);
      if (!receipt) return events;

      // Process each log in the transaction
      for (const log of receipt.logs) {
        const processedEvent = await this.processLog(log, tx, block);
        if (processedEvent) {
          events.push(processedEvent);
        }
      }

      // Check for failed transactions
      if (receipt.status === 0) {
        events.push({
          type: 'TRANSACTION_FAILED',
          contractAddress: tx.to || '',
          transactionHash: tx.hash,
          blockNumber: block.number,
          timestamp: block.timestamp,
          data: {
            from: tx.from,
            to: tx.to,
            value: tx.value.toString(),
            gasUsed: receipt.gasUsed.toString(),
            gasPrice: tx.gasPrice?.toString()
          },
          network: 'ethereum'
        });
      }

      return events;
      
    } catch (error) {
      logger.warn(`Failed to process transaction ${tx.hash}:`, error);
      return events;
    }
  }

  private async processLog(
    log: ethers.Log, 
    tx: ethers.TransactionResponse, 
    block: ethers.Block
  ): Promise<ProcessedEvent | null> {
    try {
      const eventSignature = log.topics[0];
      
      // Match against known event signatures
      switch (eventSignature) {
        case '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef':
          return await this.parseERC20Transfer(log, tx, block);
          
        case '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822':
          return await this.parseUniswapV2Swap(log, tx, block);
          
        case '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67':
          return await this.parseUniswapV3Swap(log, tx, block);
          
        default:
          // Try to decode using contract ABI if available
          return await this.tryDecodeGenericEvent(log, tx, block);
      }
      
    } catch (error) {
      logger.debug(`Failed to parse log ${log.transactionHash}:${log.logIndex}`, error);
      return null;
    }
  }

  private async parseERC20Transfer(
    log: ethers.Log, 
    tx: ethers.TransactionResponse, 
    block: ethers.Block
  ): Promise<ProcessedEvent> {
    // Decode Transfer event: Transfer(address indexed from, address indexed to, uint256 value)
    const fromAddress = ethers.getAddress('0x' + log.topics[1].slice(26));
    const toAddress = ethers.getAddress('0x' + log.topics[2].slice(26));
    const value = BigInt(log.data);

    // Get token information
    const tokenInfo = await this.getTokenInfo(log.address);

    return {
      type: 'ERC20_TRANSFER',
      contractAddress: log.address,
      transactionHash: log.transactionHash,
      blockNumber: block.number,
      timestamp: block.timestamp,
      logIndex: log.index,
      data: {
        from: fromAddress,
        to: toAddress,
        value: value.toString(),
        valueFormatted: ethers.formatUnits(value, tokenInfo?.decimals || 18),
        token: {
          address: log.address,
          symbol: tokenInfo?.symbol,
          name: tokenInfo?.name,
          decimals: tokenInfo?.decimals
        }
      },
      network: 'ethereum',
      gasUsed: tx.gasLimit?.toString(),
      gasPrice: tx.gasPrice?.toString()
    };
  }

  private async parseUniswapV2Swap(
    log: ethers.Log, 
    tx: ethers.TransactionResponse, 
    block: ethers.Block
  ): Promise<ProcessedEvent> {
    // Decode Swap event: Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)
    const iface = new ethers.Interface([
      'event Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)'
    ]);
    
    const decoded = iface.parseLog({
      topics: log.topics,
      data: log.data
    });

    // Get pair information
    const pairInfo = await this.getUniswapV2PairInfo(log.address);

    return {
      type: 'UNISWAP_V2_SWAP',
      contractAddress: log.address,
      transactionHash: log.transactionHash,
      blockNumber: block.number,
      timestamp: block.timestamp,
      logIndex: log.index,
      data: {
        sender: decoded!.args.sender,
        recipient: decoded!.args.to,
        amount0In: decoded!.args.amount0In.toString(),
        amount1In: decoded!.args.amount1In.toString(),
        amount0Out: decoded!.args.amount0Out.toString(),
        amount1Out: decoded!.args.amount1Out.toString(),
        pair: pairInfo
      },
      network: 'ethereum'
    };
  }

  // Token information caching
  private tokenInfoCache: Map<string, TokenInfo> = new Map();
  
  private async getTokenInfo(tokenAddress: string): Promise<TokenInfo | null> {
    // Check cache first
    if (this.tokenInfoCache.has(tokenAddress)) {
      return this.tokenInfoCache.get(tokenAddress)!;
    }

    try {
      const contract = new ethers.Contract(tokenAddress, [
        'function name() view returns (string)',
        'function symbol() view returns (string)',
        'function decimals() view returns (uint8)',
        'function totalSupply() view returns (uint256)'
      ], this.currentProvider);

      const [name, symbol, decimals, totalSupply] = await Promise.all([
        contract.name(),
        contract.symbol(),
        contract.decimals(),
        contract.totalSupply()
      ]);

      const tokenInfo: TokenInfo = {
        address: tokenAddress,
        name,
        symbol,
        decimals,
        totalSupply: totalSupply.toString()
      };

      // Cache for 1 hour
      this.tokenInfoCache.set(tokenAddress, tokenInfo);
      setTimeout(() => this.tokenInfoCache.delete(tokenAddress), 3600000);

      return tokenInfo;
      
    } catch (error) {
      logger.warn(`Failed to get token info for ${tokenAddress}:`, error);
      return null;
    }
  }

  // Event handlers
  private async handleNewBlock(blockNumber: number) {
    try {
      const events = await this.processBlock(blockNumber);
      
      if (events.length > 0) {
        // Emit to event bus for downstream processing
        this.emit('blockProcessed', {
          network: 'ethereum',
          blockNumber,
          eventCount: events.length,
          events
        });

        // Send to message queue for async processing
        await this.messageQueue.publish('blockchain-events', {
          network: 'ethereum',
          events
        });
      }
      
    } catch (error) {
      logger.error(`Failed to handle new block ${blockNumber}:`, error);
      this.emit('error', error);
    }
  }

  private handleWebSocketError(error: Error) {
    logger.error('WebSocket connection error:', error);
    
    // Attempt reconnection
    setTimeout(async () => {
      try {
        await this.reconnectWebSocket();
      } catch (reconnectError) {
        logger.error('WebSocket reconnection failed:', reconnectError);
      }
    }, 5000);
  }

  private async reconnectWebSocket() {
    this.wsProvider.removeAllListeners();
    
    this.wsProvider = new ethers.WebSocketProvider(
      this.config.wsEndpoints[0].url,
      'homestead'
    );
    
    await this.setupWebSocketConnection();
    logger.info('WebSocket reconnected successfully');
  }
}
```

#### Solana Data Collection

**Solana-Specific Implementation**

```typescript
// Solana data collector with program-specific monitoring
export class SolanaCollector implements DataCollector {
  public readonly network: BlockchainNetwork;
  private connection: Connection;
  private commitment: Commitment = 'confirmed';
  private subscriptions: Map<string, number> = new Map();
  private programWatchers: Map<string, ProgramWatcher> = new Map();

  constructor(private config: SolanaConfig) {
    this.network = {
      name: 'solana',
      chainId: 'mainnet-beta',
      type: 'solana',
      rpcEndpoints: config.rpcEndpoints,
      wsEndpoints: config.wsEndpoints
    };

    this.connection = new Connection(
      config.rpcEndpoints[0].url,
      {
        commitment: this.commitment,
        wsEndpoint: config.wsEndpoints[0].url,
        confirmTransactionInitialTimeout: 60000
      }
    );
  }

  async connect(): Promise<void> {
    try {
      // Test connection
      const slot = await this.connection.getSlot();
      const epochInfo = await this.connection.getEpochInfo();
      
      logger.info(`Connected to Solana network, slot: ${slot}, epoch: ${epochInfo.epoch}`);
      
      // Set up program watchers
      await this.setupProgramWatchers();
      
      this.isConnected = true;
      this.lastBlockProcessed = slot;
      
    } catch (error) {
      logger.error('Failed to connect to Solana network:', error);
      throw new ConnectionError('Solana connection failed', error);
    }
  }

  private async setupProgramWatchers() {
    const programsToWatch = [
      // SPL Token Program
      {
        programId: 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA',
        name: 'SPL_Token',
        handler: this.handleTokenProgramInstruction.bind(this)
      },
      
      // Serum DEX
      {
        programId: '9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin',
        name: 'Serum_DEX',
        handler: this.handleSerumInstruction.bind(this)
      },
      
      // Raydium AMM
      {
        programId: '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8',
        name: 'Raydium_AMM',
        handler: this.handleRaydiumInstruction.bind(this)
      },
      
      // Jupiter Aggregator
      {
        programId: 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4',
        name: 'Jupiter',
        handler: this.handleJupiterInstruction.bind(this)
      }
    ];

    for (const program of programsToWatch) {
      try {
        const watcher = new ProgramWatcher(
          this.connection,
          new PublicKey(program.programId),
          program.handler
        );
        
        await watcher.start();
        this.programWatchers.set(program.name, watcher);
        
        logger.info(`Started watching ${program.name} program`);
      } catch (error) {
        logger.error(`Failed to set up watcher for ${program.name}:`, error);
      }
    }

    // Set up slot subscription
    const slotSubscription = this.connection.onSlotChange((slotInfo) => {
      this.handleNewSlot(slotInfo);
    });
    
    this.subscriptions.set('slot', slotSubscription);
  }

  async processSlot(slot: number): Promise<ProcessedEvent[]> {
    const startTime = Date.now();
    
    try {
      // Get block with full transaction details
      const block = await this.connection.getBlock(slot, {
        commitment: this.commitment,
        maxSupportedTransactionVersion: 0,
        transactionDetails: 'full',
        rewards: false
      });

      if (!block || !block.transactions) {
        return [];
      }

      const processedEvents: ProcessedEvent[] = [];
      
      // Process all transactions
      for (const tx of block.transactions) {
        const events = await this.processTransaction(tx, block, slot);
        processedEvents.push(...events);
      }

      // Update metrics
      const processingTime = Date.now() - startTime;
      this.updateMetrics({
        slot,
        transactionCount: block.transactions.length,
        eventCount: processedEvents.length,
        processingTime
      });

      this.lastBlockProcessed = slot;
      
      return processedEvents;
      
    } catch (error) {
      logger.error(`Failed to process slot ${slot}:`, error);
      throw new ProcessingError(`Slot processing failed: ${slot}`, error);
    }
  }

  private async processTransaction(
    tx: VersionedTransactionWithMeta,
    block: BlockResponse,
    slot: number
  ): Promise<ProcessedEvent[]> {
    const events: ProcessedEvent[] = [];
    
    if (!tx.meta || tx.meta.err) {
      // Handle failed transactions
      if (tx.meta?.err) {
        events.push({
          type: 'TRANSACTION_FAILED',
          contractAddress: '',
          transactionHash: '',
          blockNumber: slot,
          timestamp: block.blockTime || 0,
          data: {
            error: tx.meta.err,
            signature: tx.transaction.signatures[0]
          },
          network: 'solana'
        });
      }
      return events;
    }

    // Process instructions
    const transaction = tx.transaction;
    const message = transaction.message;
    
    for (let i = 0; i < message.compiledInstructions.length; i++) {
      const instruction = message.compiledInstructions[i];
      const programId = message.staticAccountKeys[instruction.programIdIndex];
      
      const event = await this.processInstruction(
        instruction,
        programId,
        message.staticAccountKeys,
        tx.meta,
        slot,
        block.blockTime || 0,
        tx.transaction.signatures[0]
      );
      
      if (event) {
        events.push(event);
      }
    }

    return events;
  }

  private async processInstruction(
    instruction: CompiledInstruction,
    programId: PublicKey,
    accountKeys: PublicKey[],
    meta: TransactionMeta,
    slot: number,
    blockTime: number,
    signature: string
  ): Promise<ProcessedEvent | null> {
    const programIdString = programId.toString();
    
    switch (programIdString) {
      case 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA':
        return await this.parseTokenInstruction(
          instruction, accountKeys, meta, slot, blockTime, signature
        );
        
      case '9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin':
        return await this.parseSerumInstruction(
          instruction, accountKeys, meta, slot, blockTime, signature
        );
        
      case '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8':
        return await this.parseRaydiumInstruction(
          instruction, accountKeys, meta, slot, blockTime, signature
        );
        
      default:
        return null;
    }
  }

  private async parseTokenInstruction(
    instruction: CompiledInstruction,
    accountKeys: PublicKey[],
    meta: TransactionMeta,
    slot: number,
    blockTime: number,
    signature: string
  ): Promise<ProcessedEvent | null> {
    try {
      // Decode instruction data
      const instructionType = instruction.data[0];
      
      switch (instructionType) {
        case 3: // Transfer
          return this.parseTokenTransfer(instruction, accountKeys, meta, slot, blockTime, signature);
        case 7: // MintTo
          return this.parseTokenMint(instruction, accountKeys, meta, slot, blockTime, signature);
        case 8: // Burn
          return this.parseTokenBurn(instruction, accountKeys, meta, slot, blockTime, signature);
        default:
          return null;
      }
      
    } catch (error) {
      logger.debug(`Failed to parse token instruction:`, error);
      return null;
    }
  }

  private parseTokenTransfer(
    instruction: CompiledInstruction,
    accountKeys: PublicKey[],
    meta: TransactionMeta,
    slot: number,
    blockTime: number,
    signature: string
  ): ProcessedEvent {
    // Parse transfer instruction: source, destination, authority, amount
    const sourceAccount = accountKeys[instruction.accounts[0]];
    const destinationAccount = accountKeys[instruction.accounts[1]];
    const authority = accountKeys[instruction.accounts[2]];
    
    // Decode amount from instruction data (bytes 1-8)
    const amountBuffer = Buffer.from(instruction.data.slice(1, 9));
    const amount = amountBuffer.readBigUInt64LE(0);

    return {
      type: 'SPL_TOKEN_TRANSFER',
      contractAddress: sourceAccount.toString(),
      transactionHash: signature,
      blockNumber: slot,
      timestamp: blockTime,
      data: {
        source: sourceAccount.toString(),
        destination: destinationAccount.toString(),
        authority: authority.toString(),
        amount: amount.toString()
      },
      network: 'solana'
    };
  }

  // Program-specific watchers
  private async handleTokenProgramInstruction(instruction: ParsedInstruction) {
    // Handle SPL Token program instructions
    this.emit('tokenInstruction', {
      network: 'solana',
      instruction
    });
  }

  private async handleSerumInstruction(instruction: ParsedInstruction) {
    // Handle Serum DEX instructions
    this.emit('serumInstruction', {
      network: 'solana',
      instruction
    });
  }

  private async handleRaydiumInstruction(instruction: ParsedInstruction) {
    // Handle Raydium AMM instructions
    this.emit('raydiumInstruction', {
      network: 'solana',
      instruction
    });
  }

  private async handleJupiterInstruction(instruction: ParsedInstruction) {
    // Handle Jupiter aggregator instructions
    this.emit('jupiterInstruction', {
      network: 'solana',
      instruction
    });
  }

  private handleNewSlot(slotInfo: SlotInfo) {
    // Process new slot
    this.processSlot(slotInfo.slot).catch(error => {
      logger.error(`Failed to process new slot ${slotInfo.slot}:`, error);
    });
  }
}
```

#### Event Processing and Validation

**Data Processing Pipeline**

```typescript
// Advanced data processing and validation system
export class DataProcessor {
  private validators: Map<string, DataValidator> = new Map();
  private enrichers: Map<string, DataEnricher> = new Map();
  private normalizers: Map<string, DataNormalizer> = new Map();
  
  constructor(private config: ProcessingConfig) {
    this.initializeProcessors();
  }

  private initializeProcessors() {
    // Initialize validators
    this.validators.set('ethereum', new EthereumValidator());
    this.validators.set('solana', new SolanaValidator());
    this.validators.set('polygon', new PolygonValidator());

    // Initialize enrichers
    this.enrichers.set('token', new TokenDataEnricher());
    this.enrichers.set('price', new PriceDataEnricher());
    this.enrichers.set('liquidity', new LiquidityDataEnricher());

    // Initialize normalizers
    this.normalizers.set('transfer', new TransferNormalizer());
    this.normalizers.set('swap', new SwapNormalizer());
    this.normalizers.set('liquidity', new LiquidityNormalizer());
  }

  async processEvents(events: RawBlockchainEvent[]): Promise<ProcessedEvent[]> {
    const processedEvents: ProcessedEvent[] = [];
    
    for (const event of events) {
      try {
        // Step 1: Validate raw event data
        const validationResult = await this.validateEvent(event);
        if (!validationResult.isValid) {
          logger.warn(`Event validation failed: ${validationResult.error}`, { event });
          continue;
        }

        // Step 2: Normalize event format
        const normalizedEvent = await this.normalizeEvent(event);
        if (!normalizedEvent) {
          logger.warn('Event normalization failed', { event });
          continue;
        }

        // Step 3: Enrich with additional data
        const enrichedEvent = await this.enrichEvent(normalizedEvent);

        // Step 4: Final validation and quality check
        const qualityScore = this.calculateQualityScore(enrichedEvent);
        enrichedEvent.qualityScore = qualityScore;

        processedEvents.push(enrichedEvent);
        
      } catch (error) {
        logger.error('Failed to process event:', error, { event });
      }
    }

    return processedEvents;
  }

  private async validateEvent(event: RawBlockchainEvent): Promise<ValidationResult> {
    const validator = this.validators.get(event.network);
    if (!validator) {
      return {
        isValid: false,
        error: `No validator found for network: ${event.network}`
      };
    }

    return await validator.validate(event);
  }

  private async normalizeEvent(event: RawBlockchainEvent): Promise<NormalizedEvent | null> {
    const normalizer = this.normalizers.get(event.type);
    if (!normalizer) {
      // Use generic normalizer
      return this.genericNormalize(event);
    }

    return await normalizer.normalize(event);
  }

  private async enrichEvent(event: NormalizedEvent): Promise<ProcessedEvent> {
    let enrichedEvent = { ...event } as ProcessedEvent;

    // Apply all relevant enrichers
    for (const [type, enricher] of this.enrichers) {
      if (enricher.canEnrich(event)) {
        enrichedEvent = await enricher.enrich(enrichedEvent);
      }
    }

    return enrichedEvent;
  }

  private calculateQualityScore(event: ProcessedEvent): number {
    let score = 100;
    
    // Check data completeness
    if (!event.contractAddress) score -= 20;
    if (!event.transactionHash) score -= 15;
    if (!event.timestamp) score -= 10;
    if (!event.data || Object.keys(event.data).length === 0) score -= 25;

    // Check data validity
    if (event.contractAddress && !this.isValidAddress(event.contractAddress, event.network)) {
      score -= 30;
    }

    // Check consistency
    if (event.timestamp && Math.abs(event.timestamp - Date.now() / 1000) > 86400) {
      score -= 10; // Event more than 1 day old/future
    }

    return Math.max(0, score);
  }

  private isValidAddress(address: string, network: string): boolean {
    switch (network) {
      case 'ethereum':
      case 'polygon':
      case 'arbitrum':
        return /^0x[a-fA-F0-9]{40}$/.test(address);
      case 'solana':
        try {
          new PublicKey(address);
          return true;
        } catch {
          return false;
        }
      default:
        return false;
    }
  }
}

// Token data enricher implementation
export class TokenDataEnricher implements DataEnricher {
  private tokenCache: Map<string, TokenMetadata> = new Map();
  private priceCache: Map<string, PriceData> = new Map();

  canEnrich(event: NormalizedEvent): boolean {
    return ['ERC20_TRANSFER', 'SPL_TOKEN_TRANSFER', 'UNISWAP_SWAP'].includes(event.type);
  }

  async enrich(event: ProcessedEvent): Promise<ProcessedEvent> {
    if (event.type === 'ERC20_TRANSFER' || event.type === 'SPL_TOKEN_TRANSFER') {
      return await this.enrichTokenTransfer(event);
    } else if (event.type.includes('SWAP')) {
      return await this.enrichSwapEvent(event);
    }

    return event;
  }

  private async enrichTokenTransfer(event: ProcessedEvent): Promise<ProcessedEvent> {
    const tokenAddress = event.contractAddress;
    
    // Get token metadata
    const tokenMetadata = await this.getTokenMetadata(tokenAddress, event.network);
    if (tokenMetadata) {
      event.data.token = {
        ...event.data.token,
        ...tokenMetadata
      };
    }

    // Get price data
    const priceData = await this.getPriceData(tokenAddress, event.network);
    if (priceData) {
      event.data.priceUSD = priceData.price;
      event.data.marketCap = priceData.marketCap;
      event.data.volume24h = priceData.volume24h;
      
      // Calculate USD value
      if (event.data.valueFormatted && priceData.price) {
        event.data.valueUSD = parseFloat(event.data.valueFormatted) * priceData.price;
      }
    }

    return event;
  }

  private async getTokenMetadata(address: string, network: string): Promise<TokenMetadata | null> {
    const cacheKey = `${network}:${address}`;
    
    if (this.tokenCache.has(cacheKey)) {
      return this.tokenCache.get(cacheKey)!;
    }

    try {
      // Implementation depends on network and data sources
      const metadata = await this.fetchTokenMetadata(address, network);
      
      if (metadata) {
        this.tokenCache.set(cacheKey, metadata);
        // Cache for 1 hour
        setTimeout(() => this.tokenCache.delete(cacheKey), 3600000);
      }

      return metadata;
      
    } catch (error) {
      logger.warn(`Failed to get token metadata for ${address}:`, error);
      return null;
    }
  }

  private async fetchTokenMetadata(address: string, network: string): Promise<TokenMetadata | null> {
    // Implementation would call external APIs like CoinGecko, CoinMarketCap, etc.
    // This is a placeholder for the actual implementation
    return {
      address,
      name: 'Unknown Token',
      symbol: 'UNK',
      decimals: 18,
      totalSupply: '0',
      logoURI: null,
      website: null,
      description: null
    };
  }
}
```

#### Performance Monitoring and Optimization

**Real-Time Performance Tracking**

```typescript
// Comprehensive performance monitoring system
export class DataAgentMonitor {
  private metrics: Map<string, MetricCollector> = new Map();
  private alertManager: AlertManager;
  private dashboardUpdater: DashboardUpdater;
  
  constructor(private config: MonitoringConfig) {
    this.alertManager = new AlertManager(config.alerts);
    this.dashboardUpdater = new DashboardUpdater(config.dashboard);
    
    this.initializeMetrics();
    this.startMonitoring();
  }

  private initializeMetrics() {
    // Network-specific metrics
    for (const network of ['ethereum', 'solana', 'polygon']) {
      this.metrics.set(`${network}_latency`, new LatencyMetric());
      this.metrics.set(`${network}_throughput`, new ThroughputMetric());
      this.metrics.set(`${network}_error_rate`, new ErrorRateMetric());
      this.metrics.set(`${network}_connection_health`, new ConnectionHealthMetric());
    }

    // Processing metrics
    this.metrics.set('processing_queue_depth', new QueueDepthMetric());
    this.metrics.set('data_quality_score', new DataQualityMetric());
    this.metrics.set('cache_hit_rate', new CacheHitRateMetric());
    this.metrics.set('memory_usage', new MemoryUsageMetric());
  }

  private startMonitoring() {
    // Collect metrics every 30 seconds
    setInterval(() => {
      this.collectMetrics();
    }, 30000);

    // Update dashboard every minute
    setInterval(() => {
      this.updateDashboard();
    }, 60000);

    // Check alerts every 10 seconds
    setInterval(() => {
      this.checkAlerts();
    }, 10000);
  }

  private async collectMetrics() {
    const timestamp = Date.now();
    
    for (const [name, metric] of this.metrics) {
      try {
        const value = await metric.collect();
        
        // Store metric value
        await this.storeMetric(name, value, timestamp);
        
        // Check for anomalies
        if (this.isAnomalous(name, value)) {
          this.handleAnomaly(name, value);
        }
        
      } catch (error) {
        logger.error(`Failed to collect metric ${name}:`, error);
      }
    }
  }

  recordEventProcessing(network: string, processingTime: number, eventCount: number) {
    // Update throughput metrics
    const throughputMetric = this.metrics.get(`${network}_throughput`);
    if (throughputMetric) {
      throughputMetric.record(eventCount);
    }

    // Update latency metrics
    const latencyMetric = this.metrics.get(`${network}_latency`);
    if (latencyMetric) {
      latencyMetric.record(processingTime);
    }

    // Check for performance issues
    if (processingTime > this.config.latencyThresholds[network]) {
      this.alertManager.trigger({
        type: 'HIGH_LATENCY',
        network,
        value: processingTime,
        threshold: this.config.latencyThresholds[network]
      });
    }
  }

  recordError(network: string, error: Error, context?: any) {
    const errorMetric = this.metrics.get(`${network}_error_rate`);
    if (errorMetric) {
      errorMetric.record(1);
    }

    // Log error with context
    logger.error(`Data Agent error on ${network}:`, error, context);

    // Check error rate threshold
    const errorRate = errorMetric?.getCurrentRate() || 0;
    if (errorRate > this.config.errorRateThreshold) {
      this.alertManager.trigger({
        type: 'HIGH_ERROR_RATE',
        network,
        value: errorRate,
        threshold: this.config.errorRateThreshold
      });
    }
  }

  getNetworkStatus(network: string): NetworkStatus {
    const latency = this.metrics.get(`${network}_latency`)?.getCurrentValue() || 0;
    const throughput = this.metrics.get(`${network}_throughput`)?.getCurrentValue() || 0;
    const errorRate = this.metrics.get(`${network}_error_rate`)?.getCurrentRate() || 0;
    const connectionHealth = this.metrics.get(`${network}_connection_health`)?.getCurrentValue() || 0;

    let status: 'healthy' | 'degraded' | 'critical' = 'healthy';
    
    if (errorRate > 0.1 || latency > 5000 || connectionHealth < 0.8) {
      status = 'critical';
    } else if (errorRate > 0.05 || latency > 2000 || connectionHealth < 0.9) {
      status = 'degraded';
    }

    return {
      network,
      status,
      metrics: {
        latency,
        throughput,
        errorRate,
        connectionHealth
      },
      lastUpdate: Date.now()
    };
  }

  generatePerformanceReport(): PerformanceReport {
    const report: PerformanceReport = {
      timestamp: Date.now(),
      networks: {},
      overall: {
        totalEventsProcessed: 0,
        averageLatency: 0,
        overallErrorRate: 0,
        systemHealth: 'healthy'
      }
    };

    // Collect network-specific data
    for (const network of ['ethereum', 'solana', 'polygon']) {
      report.networks[network] = this.getNetworkStatus(network);
      report.overall.totalEventsProcessed += report.networks[network].metrics.throughput;
    }

    // Calculate overall metrics
    const networkCount = Object.keys(report.networks).length;
    report.overall.averageLatency = Object.values(report.networks)
      .reduce((sum, net) => sum + net.metrics.latency, 0) / networkCount;
    
    report.overall.overallErrorRate = Object.values(report.networks)
      .reduce((sum, net) => sum + net.metrics.errorRate, 0) / networkCount;

    // Determine overall system health
    const criticalNetworks = Object.values(report.networks)
      .filter(net => net.status === 'critical').length;
    
    if (criticalNetworks > 0) {
      report.overall.systemHealth = 'critical';
    } else if (Object.values(report.networks).some(net => net.status === 'degraded')) {
      report.overall.systemHealth = 'degraded';
    }

    return report;
  }
}

// Metric collector implementations
class LatencyMetric implements MetricCollector {
  private samples: number[] = [];
  private maxSamples = 100;

  async collect(): Promise<number> {
    return this.getCurrentValue();
  }

  record(value: number) {
    this.samples.push(value);
    if (this.samples.length > this.maxSamples) {
      this.samples.shift();
    }
  }

  getCurrentValue(): number {
    if (this.samples.length === 0) return 0;
    return this.samples.reduce((sum, val) => sum + val, 0) / this.samples.length;
  }

  getPercentile(p: number): number {
    if (this.samples.length === 0) return 0;
    
    const sorted = [...this.samples].sort((a, b) => a - b);
    const index = Math.ceil((p / 100) * sorted.length) - 1;
    return sorted[Math.max(0, index)];
  }
}

class ThroughputMetric implements MetricCollector {
  private events: Array<{ count: number; timestamp: number }> = [];
  private windowSize = 60000; // 1 minute window

  async collect(): Promise<number> {
    this.cleanOldEvents();
    return this.getCurrentValue();
  }

  record(eventCount: number) {
    this.events.push({
      count: eventCount,
      timestamp: Date.now()
    });
  }

  getCurrentValue(): number {
    this.cleanOldEvents();
    return this.events.reduce((sum, event) => sum + event.count, 0);
  }

  private cleanOldEvents() {
    const cutoff = Date.now() - this.windowSize;
    this.events = this.events.filter(event => event.timestamp > cutoff);
  }
}
```

This comprehensive Data Agent documentation provides the technical foundation for understanding and implementing the blockchain data collection system that powers Kaizen AI's analytical capabilities across multiple networks and data sources.
