Data Agent
On-Chain Data Collection
Component Overview
The Data Agent serves as the foundational intelligence layer of Kaizen AI, responsible for real-time collection, processing, and normalization of blockchain data across multiple networks. This agent operates as a high-throughput, low-latency data pipeline that transforms raw blockchain events into structured, analyzable information for downstream analytical components.
Core Responsibilities:
Real-time blockchain event monitoring and capture
Smart contract interaction analysis and state tracking
Token transfer and liquidity movement detection
Market data aggregation from decentralized exchanges
Cross-chain data correlation and synchronization
Data validation, enrichment, and quality assurance
Architecture Philosophy: The Data Agent implements a modular, event-driven architecture that can scale horizontally to handle increasing blockchain activity while maintaining sub-second latency for critical events. The design emphasizes fault tolerance, data integrity, and seamless integration with downstream analytical systems.
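The collectors and processors below exchange a small set of shared types (ProcessedEvent, ValidationResult, RPCEndpoint, and so on) that are referenced throughout this section but defined elsewhere in the codebase. The sketch below shows one plausible shape for the most important of them, inferred from how the code uses them; the exact field names are assumptions, not the canonical Kaizen AI definitions.
// Inferred shapes for the shared Data Agent types (illustrative only)
interface RPCEndpoint {
  url: string;
  weight?: number;
}

interface WebSocketEndpoint {
  url: string;
}

interface ProcessedEvent {
  type: string;                  // e.g. 'ERC20_TRANSFER', 'UNISWAP_V2_SWAP', 'SPL_TOKEN_TRANSFER'
  contractAddress: string;
  transactionHash: string;
  blockNumber: number | string;  // block number on EVM chains, slot on Solana
  timestamp: number;             // unix seconds
  logIndex?: number;
  data: Record<string, any>;     // event-specific decoded payload
  network: string;
  gasLimit?: string;
  gasUsed?: string;
  gasPrice?: string;
  qualityScore?: number;         // attached by the DataProcessor quality check
}

interface ValidationResult {
  isValid: boolean;
  error?: string;
}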
Multi-Chain Collection Framework
Network Support Architecture
// Core interfaces for multi-chain data collection
import { EventEmitter } from 'events';

interface BlockchainNetwork {
readonly name: string;
readonly chainId: number | string;
readonly type: 'evm' | 'solana' | 'bitcoin';
readonly rpcEndpoints: RPCEndpoint[];
readonly wsEndpoints: WebSocketEndpoint[];
readonly explorerAPI?: ExplorerAPI;
}
interface DataCollector {
network: BlockchainNetwork;
isConnected: boolean;
lastBlockProcessed: number | string;
connect(): Promise<void>;
disconnect(): Promise<void>;
subscribe(filters: EventFilter[]): Promise<void>;
getLatestBlock(): Promise<Block>;
processBlock(block: Block): Promise<ProcessedEvent[]>;
validateData(data: RawBlockchainData): ValidationResult;
// Collectors are event emitters so the coordinator can attach listeners
on(event: string, listener: (...args: any[]) => void): this;
emit(event: string, ...args: any[]): boolean;
}
// Multi-chain coordinator
export class MultiChainDataCollector {
private collectors: Map<string, DataCollector> = new Map();
private eventBus: EventEmitter;
private dataProcessor: DataProcessor;
private healthMonitor: HealthMonitor;
constructor(
private config: DataAgentConfig,
private database: DatabaseInterface,
private messageQueue: MessageQueueInterface
) {
this.eventBus = new EventEmitter();
this.dataProcessor = new DataProcessor(config.processing);
this.healthMonitor = new HealthMonitor(config.monitoring);
this.initializeCollectors().catch((error) => logger.error('Collector initialization failed:', error));
this.setupEventHandlers();
}
private async initializeCollectors() {
const networks = [
new EthereumCollector(this.config.ethereum),
new SolanaCollector(this.config.solana),
new PolygonCollector(this.config.polygon),
new ArbitrumCollector(this.config.arbitrum)
];
for (const collector of networks) {
try {
await collector.connect();
this.collectors.set(collector.network.name, collector);
// Set up event subscriptions
await this.subscribeToNetworkEvents(collector);
logger.info(`Connected to ${collector.network.name} network`);
} catch (error) {
logger.error(`Failed to connect to ${collector.network.name}:`, error);
// Schedule retry
setTimeout(() => this.retryConnection(collector), 5000);
}
}
}
private async subscribeToNetworkEvents(collector: DataCollector) {
const eventFilters = this.buildEventFilters(collector.network);
await collector.subscribe(eventFilters);
// Set up event listeners
collector.on('newBlock', (block) => this.handleNewBlock(collector.network, block));
collector.on('newTransaction', (tx) => this.handleNewTransaction(collector.network, tx));
collector.on('contractEvent', (event) => this.handleContractEvent(collector.network, event));
collector.on('error', (error) => this.handleCollectorError(collector.network, error));
}
}
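For orientation, the sketch below shows how the coordinator might be wired up at service start. The config literal and the in-memory stand-ins are illustrative assumptions, not Kaizen AI's actual configuration or adapters; in production the DatabaseInterface and MessageQueueInterface implementations would come from the platform's infrastructure layer.
// Hypothetical bootstrap for the Data Agent (shapes are assumptions; see note above)
import { MultiChainDataCollector } from './multi-chain-data-collector'; // hypothetical module path

const config = {
  ethereum: {
    alchemyKey: process.env.ALCHEMY_KEY,
    infuraKey: process.env.INFURA_KEY,
    customRPCUrl: process.env.ETH_RPC_URL,
    rpcEndpoints: [{ url: process.env.ETH_RPC_URL }],
    wsEndpoints: [{ url: process.env.ETH_WS_URL }]
  },
  solana: {
    rpcEndpoints: [{ url: process.env.SOL_RPC_URL }],
    wsEndpoints: [{ url: process.env.SOL_WS_URL }]
  },
  polygon: {},
  arbitrum: {},
  processing: {},
  monitoring: {}
} as any; // DataAgentConfig is defined elsewhere in the project

// Minimal stand-ins so the example is self-contained
const database = { save: async () => undefined } as any; // DatabaseInterface
const messageQueue = {
  publish: async (topic: string, payload: unknown) => {
    console.log(`published to ${topic}`, payload);
  }
} as any; // MessageQueueInterface

const coordinator = new MultiChainDataCollector(config, database, messageQueue);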
Ethereum Data Collection
Advanced Ethereum Integration
// Ethereum-specific data collector implementation
// (logger, error classes, and config/queue interfaces are assumed to come from shared project modules)
import { EventEmitter } from 'events';
import { ethers } from 'ethers';

export class EthereumCollector extends EventEmitter implements DataCollector {
public readonly network: BlockchainNetwork;
public isConnected = false;
public lastBlockProcessed: number = 0;
private providers: Map<string, ethers.Provider> = new Map();
private currentProvider: ethers.Provider;
private wsProvider: ethers.WebSocketProvider;
private eventFilters: Map<string, ethers.Filter> = new Map();
private messageQueue?: MessageQueueInterface; // optionally wired in for queue publishing
constructor(private config: EthereumConfig) {
super();
this.network = {
name: 'ethereum',
chainId: 1,
type: 'evm',
rpcEndpoints: config.rpcEndpoints,
wsEndpoints: config.wsEndpoints
};
this.initializeProviders();
}
private initializeProviders() {
// Initialize multiple providers for redundancy
const providers = [
new ethers.AlchemyProvider('mainnet', this.config.alchemyKey),
new ethers.InfuraProvider('mainnet', this.config.infuraKey),
new ethers.JsonRpcProvider(this.config.customRPCUrl)
];
providers.forEach((provider, index) => {
this.providers.set(`provider_${index}`, provider);
});
// Primary provider with automatic failover
this.currentProvider = new ethers.FallbackProvider(
Array.from(this.providers.values()).map((provider, index) => ({
provider,
priority: index + 1,
weight: 1
}))
);
// WebSocket provider for real-time events
this.wsProvider = new ethers.WebSocketProvider(
this.config.wsEndpoints[0].url,
'mainnet'
);
}
async connect(): Promise<void> {
try {
// Test connection
const blockNumber = await this.currentProvider.getBlockNumber();
const network = await this.currentProvider.getNetwork();
logger.info(`Connected to Ethereum network ${network.name}, block: ${blockNumber}`);
// Set up WebSocket connection
await this.setupWebSocketConnection();
this.isConnected = true;
this.lastBlockProcessed = blockNumber;
} catch (error) {
logger.error('Failed to connect to Ethereum network:', error);
throw new ConnectionError('Ethereum connection failed', error);
}
}
private async setupWebSocketConnection() {
this.wsProvider.on('block', (blockNumber) => {
this.handleNewBlock(blockNumber);
});
this.wsProvider.on('error', (error) => {
logger.error('WebSocket error:', error);
this.handleWebSocketError(error);
});
// Set up specific event filters
await this.setupEventFilters();
}
private async setupEventFilters() {
const commonFilters = [
// ERC-20 Transfer events
{
name: 'ERC20_Transfer',
topics: ['0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'],
handler: this.handleERC20Transfer.bind(this)
},
// Uniswap V2 Swap events (emitted by individual pair contracts, so filter by topic only)
{
name: 'UniswapV2_Swap',
topics: ['0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'],
handler: this.handleUniswapSwap.bind(this)
},
// Uniswap V3 Swap events
{
name: 'UniswapV3_Swap',
topics: ['0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67'],
handler: this.handleUniswapV3Swap.bind(this)
},
// Liquidity events
{
name: 'Liquidity_Add',
topics: ['0x4c209b5fc8ad50758f13e2e1088ba56a560dff690a1c6fef26394f4c03821c4f'],
handler: this.handleLiquidityAdd.bind(this)
}
];
for (const filter of commonFilters) {
const ethersFilter = {
address: filter.address,
topics: filter.topics
};
this.eventFilters.set(filter.name, ethersFilter);
this.wsProvider.on(ethersFilter, filter.handler);
}
}
async processBlock(blockNumber: number): Promise<ProcessedEvent[]> {
const startTime = Date.now();
try {
// Get full block with prefetched transactions
// (in ethers v6, block.transactions only holds hashes; full objects are on prefetchedTransactions)
const block = await this.currentProvider.getBlock(blockNumber, true);
if (!block) {
return [];
}
const processedEvents: ProcessedEvent[] = [];
// Process all transactions in parallel
const transactionPromises = block.prefetchedTransactions.map(async (tx) =>
this.processTransaction(tx, block)
);
const transactionResults = await Promise.all(transactionPromises);
// Flatten results
for (const events of transactionResults) {
processedEvents.push(...events);
}
// Update metrics
const processingTime = Date.now() - startTime;
this.updateMetrics({
blockNumber,
transactionCount: block.transactions.length,
eventCount: processedEvents.length,
processingTime
});
this.lastBlockProcessed = blockNumber;
return processedEvents;
} catch (error) {
logger.error(`Failed to process block ${blockNumber}:`, error);
throw new ProcessingError(`Block processing failed: ${blockNumber}`, error);
}
}
private async processTransaction(
tx: ethers.TransactionResponse,
block: ethers.Block
): Promise<ProcessedEvent[]> {
const events: ProcessedEvent[] = [];
try {
// Get transaction receipt for events
const receipt = await this.currentProvider.getTransactionReceipt(tx.hash);
if (!receipt) return events;
// Process each log in the transaction
for (const log of receipt.logs) {
const processedEvent = await this.processLog(log, tx, block);
if (processedEvent) {
events.push(processedEvent);
}
}
// Check for failed transactions
if (receipt.status === 0) {
events.push({
type: 'TRANSACTION_FAILED',
contractAddress: tx.to || '',
transactionHash: tx.hash,
blockNumber: block.number,
timestamp: block.timestamp,
data: {
from: tx.from,
to: tx.to,
value: tx.value.toString(),
gasUsed: receipt.gasUsed.toString(),
gasPrice: tx.gasPrice?.toString()
},
network: 'ethereum'
});
}
return events;
} catch (error) {
logger.warn(`Failed to process transaction ${tx.hash}:`, error);
return events;
}
}
private async processLog(
log: ethers.Log,
tx: ethers.TransactionResponse,
block: ethers.Block
): Promise<ProcessedEvent | null> {
try {
const eventSignature = log.topics[0];
// Match against known event signatures
switch (eventSignature) {
case '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef':
return await this.parseERC20Transfer(log, tx, block);
case '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822':
return await this.parseUniswapV2Swap(log, tx, block);
case '0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67':
return await this.parseUniswapV3Swap(log, tx, block);
default:
// Try to decode using contract ABI if available
return await this.tryDecodeGenericEvent(log, tx, block);
}
} catch (error) {
logger.debug(`Failed to parse log ${log.transactionHash}:${log.logIndex}`, error);
return null;
}
}
private async parseERC20Transfer(
log: ethers.Log,
tx: ethers.TransactionResponse,
block: ethers.Block
): Promise<ProcessedEvent> {
// Decode Transfer event: Transfer(address indexed from, address indexed to, uint256 value)
const fromAddress = ethers.getAddress('0x' + log.topics[1].slice(26));
const toAddress = ethers.getAddress('0x' + log.topics[2].slice(26));
const value = BigInt(log.data);
// Get token information
const tokenInfo = await this.getTokenInfo(log.address);
return {
type: 'ERC20_TRANSFER',
contractAddress: log.address,
transactionHash: log.transactionHash,
blockNumber: block.number,
timestamp: block.timestamp,
logIndex: log.index,
data: {
from: fromAddress,
to: toAddress,
value: value.toString(),
valueFormatted: ethers.formatUnits(value, tokenInfo?.decimals || 18),
token: {
address: log.address,
symbol: tokenInfo?.symbol,
name: tokenInfo?.name,
decimals: tokenInfo?.decimals
}
},
network: 'ethereum',
gasLimit: tx.gasLimit?.toString(),
gasPrice: tx.gasPrice?.toString()
};
}
private async parseUniswapV2Swap(
log: ethers.Log,
tx: ethers.TransactionResponse,
block: ethers.Block
): Promise<ProcessedEvent> {
// Decode Swap event: Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)
const iface = new ethers.Interface([
'event Swap(address indexed sender, uint amount0In, uint amount1In, uint amount0Out, uint amount1Out, address indexed to)'
]);
const decoded = iface.parseLog({
topics: log.topics,
data: log.data
});
// Get pair information
const pairInfo = await this.getUniswapV2PairInfo(log.address);
return {
type: 'UNISWAP_V2_SWAP',
contractAddress: log.address,
transactionHash: log.transactionHash,
blockNumber: block.number,
timestamp: block.timestamp,
logIndex: log.index,
data: {
sender: decoded!.args.sender,
recipient: decoded!.args.to,
amount0In: decoded!.args.amount0In.toString(),
amount1In: decoded!.args.amount1In.toString(),
amount0Out: decoded!.args.amount0Out.toString(),
amount1Out: decoded!.args.amount1Out.toString(),
pair: pairInfo
},
network: 'ethereum'
};
}
// Token information caching
private tokenInfoCache: Map<string, TokenInfo> = new Map();
private async getTokenInfo(tokenAddress: string): Promise<TokenInfo | null> {
// Check cache first
if (this.tokenInfoCache.has(tokenAddress)) {
return this.tokenInfoCache.get(tokenAddress)!;
}
try {
const contract = new ethers.Contract(tokenAddress, [
'function name() view returns (string)',
'function symbol() view returns (string)',
'function decimals() view returns (uint8)',
'function totalSupply() view returns (uint256)'
], this.currentProvider);
const [name, symbol, decimals, totalSupply] = await Promise.all([
contract.name(),
contract.symbol(),
contract.decimals(),
contract.totalSupply()
]);
const tokenInfo: TokenInfo = {
address: tokenAddress,
name,
symbol,
decimals,
totalSupply: totalSupply.toString()
};
// Cache for 1 hour
this.tokenInfoCache.set(tokenAddress, tokenInfo);
setTimeout(() => this.tokenInfoCache.delete(tokenAddress), 3600000);
return tokenInfo;
} catch (error) {
logger.warn(`Failed to get token info for ${tokenAddress}:`, error);
return null;
}
}
// Event handlers
private async handleNewBlock(blockNumber: number) {
try {
const events = await this.processBlock(blockNumber);
if (events.length > 0) {
// Emit to event bus for downstream processing
this.emit('blockProcessed', {
network: 'ethereum',
blockNumber,
eventCount: events.length,
events
});
// Forward to the message queue for async processing (when a queue has been wired in, e.g. by the coordinator)
if (this.messageQueue) {
await this.messageQueue.publish('blockchain-events', {
network: 'ethereum',
events
});
}
}
} catch (error) {
logger.error(`Failed to handle new block ${blockNumber}:`, error);
this.emit('error', error);
}
}
private handleWebSocketError(error: Error) {
logger.error('WebSocket connection error:', error);
// Attempt reconnection
setTimeout(async () => {
try {
await this.reconnectWebSocket();
} catch (reconnectError) {
logger.error('WebSocket reconnection failed:', reconnectError);
}
}, 5000);
}
private async reconnectWebSocket() {
this.wsProvider.removeAllListeners();
this.wsProvider = new ethers.WebSocketProvider(
this.config.wsEndpoints[0].url,
'mainnet'
);
await this.setupWebSocketConnection();
logger.info('WebSocket reconnected successfully');
}
}
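The hard-coded topic hashes in setupEventFilters and processLog are keccak-256 digests of the canonical event signatures. Rather than maintaining them as opaque constants, they can be derived with ethers; the snippet below reproduces the three topics used above, and the derived values match the literals in the collector.
import { ethers } from 'ethers';

// topic[0] of a log is keccak256 of the event's canonical signature
const ERC20_TRANSFER_TOPIC = ethers.id('Transfer(address,address,uint256)');
// => 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef

const UNISWAP_V2_SWAP_TOPIC = ethers.id('Swap(address,uint256,uint256,uint256,uint256,address)');
// => 0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822

const UNISWAP_V3_SWAP_TOPIC = ethers.id('Swap(address,address,int256,int256,uint160,uint128,int24)');
// => 0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67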
Solana Data Collection
Solana-Specific Implementation
// Solana data collector with program-specific monitoring
// (ProgramWatcher, logger, and the error classes are assumed to come from shared project modules)
import { EventEmitter } from 'events';
import { Connection, PublicKey, Commitment, SlotInfo } from '@solana/web3.js';

export class SolanaCollector extends EventEmitter implements DataCollector {
public readonly network: BlockchainNetwork;
public isConnected = false;
public lastBlockProcessed: number = 0;
private connection: Connection;
private commitment: Commitment = 'confirmed';
private subscriptions: Map<string, number> = new Map();
private programWatchers: Map<string, ProgramWatcher> = new Map();
constructor(private config: SolanaConfig) {
super();
this.network = {
name: 'solana',
chainId: 'mainnet-beta',
type: 'solana',
rpcEndpoints: config.rpcEndpoints,
wsEndpoints: config.wsEndpoints
};
this.connection = new Connection(
config.rpcEndpoints[0].url,
{
commitment: this.commitment,
wsEndpoint: config.wsEndpoints[0].url,
confirmTransactionInitialTimeout: 60000
}
);
}
async connect(): Promise<void> {
try {
// Test connection
const slot = await this.connection.getSlot();
const epochInfo = await this.connection.getEpochInfo();
logger.info(`Connected to Solana network, slot: ${slot}, epoch: ${epochInfo.epoch}`);
// Set up program watchers
await this.setupProgramWatchers();
this.isConnected = true;
this.lastBlockProcessed = slot;
} catch (error) {
logger.error('Failed to connect to Solana network:', error);
throw new ConnectionError('Solana connection failed', error);
}
}
private async setupProgramWatchers() {
const programsToWatch = [
// SPL Token Program
{
programId: 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA',
name: 'SPL_Token',
handler: this.handleTokenProgramInstruction.bind(this)
},
// Serum DEX
{
programId: '9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin',
name: 'Serum_DEX',
handler: this.handleSerumInstruction.bind(this)
},
// Raydium AMM
{
programId: '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8',
name: 'Raydium_AMM',
handler: this.handleRaydiumInstruction.bind(this)
},
// Jupiter Aggregator
{
programId: 'JUP6LkbZbjS1jKKwapdHNy74zcZ3tLUZoi5QNyVTaV4',
name: 'Jupiter',
handler: this.handleJupiterInstruction.bind(this)
}
];
for (const program of programsToWatch) {
try {
const watcher = new ProgramWatcher(
this.connection,
new PublicKey(program.programId),
program.handler
);
await watcher.start();
this.programWatchers.set(program.name, watcher);
logger.info(`Started watching ${program.name} program`);
} catch (error) {
logger.error(`Failed to set up watcher for ${program.name}:`, error);
}
}
// Set up slot subscription
const slotSubscription = this.connection.onSlotChange((slotInfo) => {
this.handleNewSlot(slotInfo);
});
this.subscriptions.set('slot', slotSubscription);
}
async processSlot(slot: number): Promise<ProcessedEvent[]> {
const startTime = Date.now();
try {
// Get block with full transaction details
const block = await this.connection.getBlock(slot, {
commitment: this.commitment,
maxSupportedTransactionVersion: 0,
transactionDetails: 'full',
rewards: false
});
if (!block || !block.transactions) {
return [];
}
const processedEvents: ProcessedEvent[] = [];
// Process all transactions
for (const tx of block.transactions) {
const events = await this.processTransaction(tx, block, slot);
processedEvents.push(...events);
}
// Update metrics
const processingTime = Date.now() - startTime;
this.updateMetrics({
slot,
transactionCount: block.transactions.length,
eventCount: processedEvents.length,
processingTime
});
this.lastBlockProcessed = slot;
return processedEvents;
} catch (error) {
logger.error(`Failed to process slot ${slot}:`, error);
throw new ProcessingError(`Slot processing failed: ${slot}`, error);
}
}
private async processTransaction(
tx: VersionedTransactionWithMeta,
block: BlockResponse,
slot: number
): Promise<ProcessedEvent[]> {
const events: ProcessedEvent[] = [];
if (!tx.meta || tx.meta.err) {
// Handle failed transactions
if (tx.meta?.err) {
events.push({
type: 'TRANSACTION_FAILED',
contractAddress: '',
transactionHash: tx.transaction.signatures[0],
blockNumber: slot,
timestamp: block.blockTime || 0,
data: {
error: tx.meta.err,
signature: tx.transaction.signatures[0]
},
network: 'solana'
});
}
return events;
}
// Process instructions
const transaction = tx.transaction;
const message = transaction.message;
for (let i = 0; i < message.compiledInstructions.length; i++) {
const instruction = message.compiledInstructions[i];
const programId = message.staticAccountKeys[instruction.programIdIndex];
const event = await this.processInstruction(
instruction,
programId,
message.staticAccountKeys,
tx.meta,
slot,
block.blockTime || 0,
tx.transaction.signatures[0]
);
if (event) {
events.push(event);
}
}
return events;
}
private async processInstruction(
instruction: CompiledInstruction,
programId: PublicKey,
accountKeys: PublicKey[],
meta: TransactionMeta,
slot: number,
blockTime: number,
signature: string
): Promise<ProcessedEvent | null> {
const programIdString = programId.toString();
switch (programIdString) {
case 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA':
return await this.parseTokenInstruction(
instruction, accountKeys, meta, slot, blockTime, signature
);
case '9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin':
return await this.parseSerumInstruction(
instruction, accountKeys, meta, slot, blockTime, signature
);
case '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8':
return await this.parseRaydiumInstruction(
instruction, accountKeys, meta, slot, blockTime, signature
);
default:
return null;
}
}
private async parseTokenInstruction(
instruction: CompiledInstruction,
accountKeys: PublicKey[],
meta: TransactionMeta,
slot: number,
blockTime: number,
signature: string
): Promise<ProcessedEvent | null> {
try {
// Decode instruction data
const instructionType = instruction.data[0];
switch (instructionType) {
case 3: // Transfer
return this.parseTokenTransfer(instruction, accountKeys, meta, slot, blockTime, signature);
case 7: // MintTo
return this.parseTokenMint(instruction, accountKeys, meta, slot, blockTime, signature);
case 8: // Burn
return this.parseTokenBurn(instruction, accountKeys, meta, slot, blockTime, signature);
default:
return null;
}
} catch (error) {
logger.debug(`Failed to parse token instruction:`, error);
return null;
}
}
private parseTokenTransfer(
instruction: CompiledInstruction,
accountKeys: PublicKey[],
meta: TransactionMeta,
slot: number,
blockTime: number,
signature: string
): ProcessedEvent {
// Parse transfer instruction: source, destination, authority, amount
const sourceAccount = accountKeys[instruction.accounts[0]];
const destinationAccount = accountKeys[instruction.accounts[1]];
const authority = accountKeys[instruction.accounts[2]];
// Decode amount from instruction data (bytes 1-8)
const amountBuffer = Buffer.from(instruction.data.slice(1, 9));
const amount = amountBuffer.readBigUInt64LE(0);
return {
type: 'SPL_TOKEN_TRANSFER',
contractAddress: sourceAccount.toString(),
transactionHash: signature,
blockNumber: slot,
timestamp: blockTime,
data: {
source: sourceAccount.toString(),
destination: destinationAccount.toString(),
authority: authority.toString(),
amount: amount.toString()
},
network: 'solana'
};
}
// Program-specific watchers
private async handleTokenProgramInstruction(instruction: ParsedInstruction) {
// Handle SPL Token program instructions
this.emit('tokenInstruction', {
network: 'solana',
instruction
});
}
private async handleSerumInstruction(instruction: ParsedInstruction) {
// Handle Serum DEX instructions
this.emit('serumInstruction', {
network: 'solana',
instruction
});
}
private async handleRaydiumInstruction(instruction: ParsedInstruction) {
// Handle Raydium AMM instructions
this.emit('raydiumInstruction', {
network: 'solana',
instruction
});
}
private async handleJupiterInstruction(instruction: ParsedInstruction) {
// Handle Jupiter aggregator instructions
this.emit('jupiterInstruction', {
network: 'solana',
instruction
});
}
private handleNewSlot(slotInfo: SlotInfo) {
// Process new slot
this.processSlot(slotInfo.slot).catch(error => {
logger.error(`Failed to process new slot ${slotInfo.slot}:`, error);
});
}
}
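One caveat on parseTokenTransfer above: it records the source token account as contractAddress, but the token's identity on Solana is its mint, which takes one extra lookup. A minimal sketch of that resolution using getParsedAccountInfo is shown below, assuming the standard spl-token parsed account layout; production code would batch and cache these lookups alongside the other enrichment steps.
import { Connection, PublicKey } from '@solana/web3.js';

// Resolve the mint behind an SPL token account (sketch)
async function resolveMint(connection: Connection, tokenAccount: PublicKey): Promise<string | null> {
  const accountInfo = await connection.getParsedAccountInfo(tokenAccount, 'confirmed');
  const data = accountInfo.value?.data;

  // jsonParsed token accounts expose the owning mint under parsed.info.mint
  if (data && 'parsed' in data && data.program === 'spl-token') {
    return data.parsed.info.mint as string;
  }
  return null;
}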
Event Processing and Validation
Data Processing Pipeline
// Advanced data processing and validation system
export class DataProcessor {
private validators: Map<string, DataValidator> = new Map();
private enrichers: Map<string, DataEnricher> = new Map();
private normalizers: Map<string, DataNormalizer> = new Map();
constructor(private config: ProcessingConfig) {
this.initializeProcessors();
}
private initializeProcessors() {
// Initialize validators
this.validators.set('ethereum', new EthereumValidator());
this.validators.set('solana', new SolanaValidator());
this.validators.set('polygon', new PolygonValidator());
// Initialize enrichers
this.enrichers.set('token', new TokenDataEnricher());
this.enrichers.set('price', new PriceDataEnricher());
this.enrichers.set('liquidity', new LiquidityDataEnricher());
// Initialize normalizers
this.normalizers.set('transfer', new TransferNormalizer());
this.normalizers.set('swap', new SwapNormalizer());
this.normalizers.set('liquidity', new LiquidityNormalizer());
}
async processEvents(events: RawBlockchainEvent[]): Promise<ProcessedEvent[]> {
const processedEvents: ProcessedEvent[] = [];
for (const event of events) {
try {
// Step 1: Validate raw event data
const validationResult = await this.validateEvent(event);
if (!validationResult.isValid) {
logger.warn(`Event validation failed: ${validationResult.error}`, { event });
continue;
}
// Step 2: Normalize event format
const normalizedEvent = await this.normalizeEvent(event);
if (!normalizedEvent) {
logger.warn('Event normalization failed', { event });
continue;
}
// Step 3: Enrich with additional data
const enrichedEvent = await this.enrichEvent(normalizedEvent);
// Step 4: Final validation and quality check
const qualityScore = this.calculateQualityScore(enrichedEvent);
enrichedEvent.qualityScore = qualityScore;
processedEvents.push(enrichedEvent);
} catch (error) {
logger.error('Failed to process event:', error, { event });
}
}
return processedEvents;
}
private async validateEvent(event: RawBlockchainEvent): Promise<ValidationResult> {
const validator = this.validators.get(event.network);
if (!validator) {
return {
isValid: false,
error: `No validator found for network: ${event.network}`
};
}
return await validator.validate(event);
}
private async normalizeEvent(event: RawBlockchainEvent): Promise<NormalizedEvent | null> {
const normalizer = this.normalizers.get(event.type);
if (!normalizer) {
// Use generic normalizer
return this.genericNormalize(event);
}
return await normalizer.normalize(event);
}
private async enrichEvent(event: NormalizedEvent): Promise<ProcessedEvent> {
let enrichedEvent = { ...event } as ProcessedEvent;
// Apply all relevant enrichers
for (const [type, enricher] of this.enrichers) {
if (enricher.canEnrich(event)) {
enrichedEvent = await enricher.enrich(enrichedEvent);
}
}
return enrichedEvent;
}
private calculateQualityScore(event: ProcessedEvent): number {
let score = 100;
// Check data completeness
if (!event.contractAddress) score -= 20;
if (!event.transactionHash) score -= 15;
if (!event.timestamp) score -= 10;
if (!event.data || Object.keys(event.data).length === 0) score -= 25;
// Check data validity
if (event.contractAddress && !this.isValidAddress(event.contractAddress, event.network)) {
score -= 30;
}
// Check consistency
if (event.timestamp && Math.abs(event.timestamp - Date.now() / 1000) > 86400) {
score -= 10; // Event more than 1 day old/future
}
return Math.max(0, score);
}
private isValidAddress(address: string, network: string): boolean {
switch (network) {
case 'ethereum':
case 'polygon':
case 'arbitrum':
return /^0x[a-fA-F0-9]{40}$/.test(address);
case 'solana':
try {
new PublicKey(address);
return true;
} catch {
return false;
}
default:
return false;
}
}
}
// Token data enricher implementation
export class TokenDataEnricher implements DataEnricher {
private tokenCache: Map<string, TokenMetadata> = new Map();
private priceCache: Map<string, PriceData> = new Map();
canEnrich(event: NormalizedEvent): boolean {
// Match token transfers plus any swap variant (e.g. UNISWAP_V2_SWAP, UNISWAP_V3_SWAP)
return ['ERC20_TRANSFER', 'SPL_TOKEN_TRANSFER'].includes(event.type) || event.type.includes('SWAP');
}
async enrich(event: ProcessedEvent): Promise<ProcessedEvent> {
if (event.type === 'ERC20_TRANSFER' || event.type === 'SPL_TOKEN_TRANSFER') {
return await this.enrichTokenTransfer(event);
} else if (event.type.includes('SWAP')) {
return await this.enrichSwapEvent(event);
}
return event;
}
private async enrichTokenTransfer(event: ProcessedEvent): Promise<ProcessedEvent> {
const tokenAddress = event.contractAddress;
// Get token metadata
const tokenMetadata = await this.getTokenMetadata(tokenAddress, event.network);
if (tokenMetadata) {
event.data.token = {
...event.data.token,
...tokenMetadata
};
}
// Get price data
const priceData = await this.getPriceData(tokenAddress, event.network);
if (priceData) {
event.data.priceUSD = priceData.price;
event.data.marketCap = priceData.marketCap;
event.data.volume24h = priceData.volume24h;
// Calculate USD value
if (event.data.valueFormatted && priceData.price) {
event.data.valueUSD = parseFloat(event.data.valueFormatted) * priceData.price;
}
}
return event;
}
private async getTokenMetadata(address: string, network: string): Promise<TokenMetadata | null> {
const cacheKey = `${network}:${address}`;
if (this.tokenCache.has(cacheKey)) {
return this.tokenCache.get(cacheKey)!;
}
try {
// Implementation depends on network and data sources
const metadata = await this.fetchTokenMetadata(address, network);
if (metadata) {
this.tokenCache.set(cacheKey, metadata);
// Cache for 1 hour
setTimeout(() => this.tokenCache.delete(cacheKey), 3600000);
}
return metadata;
} catch (error) {
logger.warn(`Failed to get token metadata for ${address}:`, error);
return null;
}
}
private async fetchTokenMetadata(address: string, network: string): Promise<TokenMetadata | null> {
// Implementation would call external APIs like CoinGecko, CoinMarketCap, etc.
// This is a placeholder for the actual implementation
return {
address,
name: 'Unknown Token',
symbol: 'UNK',
decimals: 18,
totalSupply: '0',
logoURI: null,
website: null,
description: null
};
}
}
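DataProcessor composes three small contracts (validators, normalizers, and enrichers) whose interfaces are referenced but not shown. One plausible shape, inferred from how processEvents and TokenDataEnricher use them; the exact signatures are assumptions:
// Inferred pipeline contracts (illustrative; the project's definitions may differ)
interface DataValidator {
  validate(event: RawBlockchainEvent): Promise<ValidationResult>;
}

interface DataNormalizer {
  normalize(event: RawBlockchainEvent): Promise<NormalizedEvent | null>;
}

interface DataEnricher {
  canEnrich(event: NormalizedEvent): boolean;
  enrich(event: ProcessedEvent): Promise<ProcessedEvent>;
}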
Performance Monitoring and Optimization
Real-Time Performance Tracking
// Comprehensive performance monitoring system
export class DataAgentMonitor {
private metrics: Map<string, MetricCollector> = new Map();
private alertManager: AlertManager;
private dashboardUpdater: DashboardUpdater;
constructor(private config: MonitoringConfig) {
this.alertManager = new AlertManager(config.alerts);
this.dashboardUpdater = new DashboardUpdater(config.dashboard);
this.initializeMetrics();
this.startMonitoring();
}
private initializeMetrics() {
// Network-specific metrics
for (const network of ['ethereum', 'solana', 'polygon']) {
this.metrics.set(`${network}_latency`, new LatencyMetric());
this.metrics.set(`${network}_throughput`, new ThroughputMetric());
this.metrics.set(`${network}_error_rate`, new ErrorRateMetric());
this.metrics.set(`${network}_connection_health`, new ConnectionHealthMetric());
}
// Processing metrics
this.metrics.set('processing_queue_depth', new QueueDepthMetric());
this.metrics.set('data_quality_score', new DataQualityMetric());
this.metrics.set('cache_hit_rate', new CacheHitRateMetric());
this.metrics.set('memory_usage', new MemoryUsageMetric());
}
private startMonitoring() {
// Collect metrics every 30 seconds
setInterval(() => {
this.collectMetrics();
}, 30000);
// Update dashboard every minute
setInterval(() => {
this.updateDashboard();
}, 60000);
// Check alerts every 10 seconds
setInterval(() => {
this.checkAlerts();
}, 10000);
}
private async collectMetrics() {
const timestamp = Date.now();
for (const [name, metric] of this.metrics) {
try {
const value = await metric.collect();
// Store metric value
await this.storeMetric(name, value, timestamp);
// Check for anomalies
if (this.isAnomalous(name, value)) {
this.handleAnomaly(name, value);
}
} catch (error) {
logger.error(`Failed to collect metric ${name}:`, error);
}
}
}
recordEventProcessing(network: string, processingTime: number, eventCount: number) {
// Update throughput metrics
const throughputMetric = this.metrics.get(`${network}_throughput`);
if (throughputMetric) {
throughputMetric.record(eventCount);
}
// Update latency metrics
const latencyMetric = this.metrics.get(`${network}_latency`);
if (latencyMetric) {
latencyMetric.record(processingTime);
}
// Check for performance issues
if (processingTime > this.config.latencyThresholds[network]) {
this.alertManager.trigger({
type: 'HIGH_LATENCY',
network,
value: processingTime,
threshold: this.config.latencyThresholds[network]
});
}
}
recordError(network: string, error: Error, context?: any) {
const errorMetric = this.metrics.get(`${network}_error_rate`);
if (errorMetric) {
errorMetric.record(1);
}
// Log error with context
logger.error(`Data Agent error on ${network}:`, error, context);
// Check error rate threshold
const errorRate = errorMetric?.getCurrentRate() || 0;
if (errorRate > this.config.errorRateThreshold) {
this.alertManager.trigger({
type: 'HIGH_ERROR_RATE',
network,
value: errorRate,
threshold: this.config.errorRateThreshold
});
}
}
getNetworkStatus(network: string): NetworkStatus {
const latency = this.metrics.get(`${network}_latency`)?.getCurrentValue() || 0;
const throughput = this.metrics.get(`${network}_throughput`)?.getCurrentValue() || 0;
const errorRate = this.metrics.get(`${network}_error_rate`)?.getCurrentRate() || 0;
const connectionHealth = this.metrics.get(`${network}_connection_health`)?.getCurrentValue() || 0;
let status: 'healthy' | 'degraded' | 'critical' = 'healthy';
if (errorRate > 0.1 || latency > 5000 || connectionHealth < 0.8) {
status = 'critical';
} else if (errorRate > 0.05 || latency > 2000 || connectionHealth < 0.9) {
status = 'degraded';
}
return {
network,
status,
metrics: {
latency,
throughput,
errorRate,
connectionHealth
},
lastUpdate: Date.now()
};
}
generatePerformanceReport(): PerformanceReport {
const report: PerformanceReport = {
timestamp: Date.now(),
networks: {},
overall: {
totalEventsProcessed: 0,
averageLatency: 0,
overallErrorRate: 0,
systemHealth: 'healthy'
}
};
// Collect network-specific data
for (const network of ['ethereum', 'solana', 'polygon']) {
report.networks[network] = this.getNetworkStatus(network);
report.overall.totalEventsProcessed += report.networks[network].metrics.throughput;
}
// Calculate overall metrics
const networkCount = Object.keys(report.networks).length;
report.overall.averageLatency = Object.values(report.networks)
.reduce((sum, net) => sum + net.metrics.latency, 0) / networkCount;
report.overall.overallErrorRate = Object.values(report.networks)
.reduce((sum, net) => sum + net.metrics.errorRate, 0) / networkCount;
// Determine overall system health
const criticalNetworks = Object.values(report.networks)
.filter(net => net.status === 'critical').length;
if (criticalNetworks > 0) {
report.overall.systemHealth = 'critical';
} else if (Object.values(report.networks).some(net => net.status === 'degraded')) {
report.overall.systemHealth = 'degraded';
}
return report;
}
}
// Metric collector implementations
class LatencyMetric implements MetricCollector {
private samples: number[] = [];
private maxSamples = 100;
async collect(): Promise<number> {
return this.getCurrentValue();
}
record(value: number) {
this.samples.push(value);
if (this.samples.length > this.maxSamples) {
this.samples.shift();
}
}
getCurrentValue(): number {
if (this.samples.length === 0) return 0;
return this.samples.reduce((sum, val) => sum + val, 0) / this.samples.length;
}
getPercentile(p: number): number {
if (this.samples.length === 0) return 0;
const sorted = [...this.samples].sort((a, b) => a - b);
const index = Math.ceil((p / 100) * sorted.length) - 1;
return sorted[Math.max(0, index)];
}
}
class ThroughputMetric implements MetricCollector {
private events: Array<{ count: number; timestamp: number }> = [];
private windowSize = 60000; // 1 minute window
async collect(): Promise<number> {
this.cleanOldEvents();
return this.getCurrentValue();
}
record(eventCount: number) {
this.events.push({
count: eventCount,
timestamp: Date.now()
});
}
getCurrentValue(): number {
this.cleanOldEvents();
return this.events.reduce((sum, event) => sum + event.count, 0);
}
private cleanOldEvents() {
const cutoff = Date.now() - this.windowSize;
this.events = this.events.filter(event => event.timestamp > cutoff);
}
}
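ErrorRateMetric is registered in initializeMetrics but not shown above. One possible shape, in the same sliding-window style as ThroughputMetric, is sketched below; it assumes callers record successes with record(0) as well as failures with record(1) so that getCurrentRate() yields a failure ratio. If only failures are recorded, the value degenerates to a failure count and the thresholds in getNetworkStatus would need to change accordingly.
// Possible ErrorRateMetric implementation (sketch, not the canonical one)
class ErrorRateMetric implements MetricCollector {
  private samples: Array<{ failed: boolean; timestamp: number }> = [];
  private windowSize = 60000; // 1 minute window

  async collect(): Promise<number> {
    return this.getCurrentRate();
  }

  record(errorCount: number) {
    this.samples.push({ failed: errorCount > 0, timestamp: Date.now() });
  }

  // Fraction of recorded operations in the window that failed
  getCurrentRate(): number {
    const cutoff = Date.now() - this.windowSize;
    this.samples = this.samples.filter(sample => sample.timestamp > cutoff);
    if (this.samples.length === 0) return 0;
    return this.samples.filter(sample => sample.failed).length / this.samples.length;
  }

  getCurrentValue(): number {
    return this.getCurrentRate();
  }
}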
This comprehensive Data Agent documentation provides the technical foundation for understanding and implementing the blockchain data collection system that powers Kaizen AI's analytical capabilities across multiple networks and data sources.