Technical Infrastructure

Frontend Architecture

Component Overview

Kaizen AI's frontend architecture leverages modern web technologies to deliver a responsive, real-time user experience across web and mobile platforms. The architecture emphasizes performance, accessibility, and seamless integration with the backend analytical services through WebSocket connections and RESTful APIs.

Technology Stack:

  • Framework: Next.js 14 with App Router for server-side rendering and optimization

  • Styling: TailwindCSS 3.4 for utility-first CSS with custom design system

  • UI Components: ShadCN/UI for consistent, accessible component library

  • State Management: Zustand for lightweight global state management

  • Real-time Communication: Socket.io for live updates and chat functionality

  • Charts & Visualizations: Recharts and D3.js for data visualization

Next.js Application Structure

Project Architecture

kaizen-frontend/
├── app/                          # App Router directory (Next.js 14)
│   ├── (auth)/                   # Route groups for authentication
│   │   ├── login/
│   │   └── register/
│   ├── (dashboard)/              # Protected dashboard routes
│   │   ├── analyze/
│   │   ├── portfolio/
│   │   ├── chat/
│   │   └── settings/
│   ├── api/                      # API routes
│   │   ├── auth/
│   │   ├── projects/
│   │   └── websocket/
│   ├── globals.css
│   ├── layout.tsx
│   └── page.tsx
├── components/                   # Reusable UI components
│   ├── ui/                       # ShadCN/UI components
│   ├── charts/                   # Data visualization components
│   ├── chat/                     # Chat interface components
│   └── analysis/                 # Analysis-specific components
├── lib/                          # Utility functions and configurations
│   ├── api.ts                    # API client configuration
│   ├── auth.ts                   # Authentication utilities
│   ├── utils.ts                  # Common utilities
│   └── validations.ts            # Form validation schemas
├── hooks/                        # Custom React hooks
├── store/                        # Zustand store definitions
├── types/                        # TypeScript type definitions
└── styles/                       # Additional styling files

Next.js Configuration

// next.config.js
/** @type {import('next').NextConfig} */
const nextConfig = {
  experimental: {
    // `appDir: true` removed: the App Router is stable in Next.js 14, so
    // the flag is no longer a recognized experimental option and only
    // produced a startup warning.
    serverComponentsExternalPackages: ['@solana/web3.js']
  },
  
  // Performance optimizations: restrict remote images to known CDNs and
  // let Next serve modern formats where the browser supports them.
  images: {
    domains: ['assets.coingecko.com', 'logos.covalenthq.com'],
    formats: ['image/webp', 'image/avif'],
  },
  
  // Security headers applied to every route (clickjacking, MIME sniffing,
  // referrer leakage).
  async headers() {
    return [
      {
        source: '/(.*)',
        headers: [
          {
            key: 'X-Frame-Options',
            value: 'DENY',
          },
          {
            key: 'X-Content-Type-Options',
            value: 'nosniff',
          },
          {
            key: 'Referrer-Policy',
            value: 'strict-origin-when-cross-origin',
          },
        ],
      },
    ];
  },
  
  // Variables exposed to the browser bundle (NEXT_PUBLIC_* only — never
  // put secrets here).
  env: {
    NEXT_PUBLIC_API_URL: process.env.NEXT_PUBLIC_API_URL,
    NEXT_PUBLIC_WS_URL: process.env.NEXT_PUBLIC_WS_URL,
    NEXT_PUBLIC_SENTRY_DSN: process.env.NEXT_PUBLIC_SENTRY_DSN,
  },
  
  // Build optimizations
  swcMinify: true,
  compiler: {
    // Strip console.* calls from production bundles.
    removeConsole: process.env.NODE_ENV === 'production',
  },
  
  // Client bundles must not try to resolve Node-only modules pulled in
  // transitively (e.g. by @solana/web3.js); stub them out on the client.
  webpack: (config, { isServer }) => {
    if (!isServer) {
      config.resolve.fallback = {
        ...config.resolve.fallback,
        fs: false,
        net: false,
        tls: false,
      };
    }
    return config;
  },
};

module.exports = nextConfig;

State Management Architecture

Zustand Store Implementation

// store/analysis.ts
import { create } from 'zustand';
import { subscribeWithSelector } from 'zustand/middleware';

/**
 * Global store for token-analysis results: the analysis currently on
 * screen plus a bounded history of previously viewed analyses.
 */
interface AnalysisState {
  currentAnalysis: ProjectAnalysis | null;
  analysisHistory: ProjectAnalysis[];
  isLoading: boolean;
  error: string | null;
  
  // Actions
  setCurrentAnalysis: (analysis: ProjectAnalysis) => void;
  addToHistory: (analysis: ProjectAnalysis) => void;
  setLoading: (loading: boolean) => void;
  setError: (error: string | null) => void;
  clearAnalysis: () => void;
}

export const useAnalysisStore = create<AnalysisState>()(
  subscribeWithSelector((set, get) => ({
    currentAnalysis: null,
    analysisHistory: [],
    isLoading: false,
    error: null,

    // Show a new analysis and clear any stale error; record it in the
    // history unless that contract address has already been seen.
    setCurrentAnalysis: (analysis) => {
      set({ currentAnalysis: analysis, error: null });

      const seenBefore = get().analysisHistory.some(
        (entry) => entry.contractAddress === analysis.contractAddress
      );
      if (!seenBefore) {
        get().addToHistory(analysis);
      }
    },

    // Prepend the newest entry, capping the history at 50 items.
    addToHistory: (analysis) =>
      set((state) => ({
        analysisHistory: [analysis, ...state.analysisHistory.slice(0, 49)],
      })),

    setLoading: (loading) => set({ isLoading: loading }),
    setError: (error) => set({ error }),
    clearAnalysis: () => set({ currentAnalysis: null, error: null })
  }))
);

// store/chat.ts

/** State and actions for the live chat panel. */
interface ChatState {
  messages: ChatMessage[];
  isTyping: boolean;
  connectionStatus: 'connected' | 'disconnected' | 'connecting';
  
  // Actions
  addMessage: (message: ChatMessage) => void;
  setTyping: (typing: boolean) => void;
  setConnectionStatus: (status: 'connected' | 'disconnected' | 'connecting') => void;
  clearMessages: () => void;
}

// Monotonic counter so client-assigned ids stay unique even when several
// messages arrive within the same millisecond — `Date.now().toString()`
// alone can collide, which breaks React list keys and id-based lookups.
let messageSeq = 0;

export const useChatStore = create<ChatState>((set) => ({
  messages: [],
  isTyping: false,
  connectionStatus: 'disconnected',
  
  // Append a message, assigning a collision-free client-side id.
  addMessage: (message) => set((state) => ({
    messages: [...state.messages, { ...message, id: `${Date.now()}-${messageSeq++}` }]
  })),
  
  setTyping: (typing) => set({ isTyping: typing }),
  setConnectionStatus: (status) => set({ connectionStatus: status }),
  clearMessages: () => set({ messages: [] })
}));

Real-Time Communication

WebSocket Integration

// lib/websocket.ts
import io, { Socket } from 'socket.io-client';
import { useChatStore } from '@/store/chat';
import { useAnalysisStore } from '@/store/analysis';

/**
 * Singleton wrapper around the socket.io client.
 *
 * Owns the socket lifecycle (connect / reconnect / disconnect) and fans
 * incoming server events out into the Zustand stores, so React components
 * only read store state and never touch the socket directly.
 */
class WebSocketManager {
  private socket: Socket | null = null;
  // Manual reconnect bookkeeping for the "io server disconnect" case,
  // which socket.io does not retry on its own.
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  
  /**
   * Open the socket (no-op if already connected) and register listeners.
   * Credentials travel in the socket.io `auth` handshake payload.
   */
  connect(userId: string, authToken: string) {
    if (this.socket?.connected) return;
    
    this.socket = io(process.env.NEXT_PUBLIC_WS_URL!, {
      auth: {
        token: authToken,
        userId: userId
      },
      // Prefer a real WebSocket, fall back to HTTP long-polling.
      transports: ['websocket', 'polling'],
      timeout: 20000,
      // NOTE(review): the `retries` option requires socket.io-client >= 4.4;
      // confirm the installed client version supports it.
      retries: 3
    });
    
    this.setupEventListeners();
  }
  
  // Wire every server event to its store update. Safe to call only after
  // connect() has created the socket.
  private setupEventListeners() {
    if (!this.socket) return;
    
    // Connection events
    this.socket.on('connect', () => {
      console.log('WebSocket connected');
      useChatStore.getState().setConnectionStatus('connected');
      // Successful connect resets the manual backoff counter.
      this.reconnectAttempts = 0;
    });
    
    this.socket.on('disconnect', (reason) => {
      console.log('WebSocket disconnected:', reason);
      useChatStore.getState().setConnectionStatus('disconnected');
      
      // The server deliberately dropped us; socket.io will not auto-retry
      // in this case, so schedule our own reconnect.
      if (reason === 'io server disconnect') {
        this.reconnect();
      }
    });
    
    // Chat events
    this.socket.on('chat_message', (message: ChatMessage) => {
      useChatStore.getState().addMessage(message);
    });
    
    this.socket.on('typing_start', () => {
      useChatStore.getState().setTyping(true);
    });
    
    this.socket.on('typing_stop', () => {
      useChatStore.getState().setTyping(false);
    });
    
    // Analysis events
    this.socket.on('analysis_update', (analysis: ProjectAnalysis) => {
      useAnalysisStore.getState().setCurrentAnalysis(analysis);
    });
    
    this.socket.on('score_update', (update: ScoreUpdate) => {
      // Handle real-time score updates
      this.handleScoreUpdate(update);
    });
    
    // Error handling
    this.socket.on('connect_error', (error) => {
      console.error('WebSocket connection error:', error);
      useChatStore.getState().setConnectionStatus('disconnected');
    });
  }
  
  // Emit a chat message; silently dropped if the socket is not connected.
  sendMessage(message: string) {
    if (this.socket?.connected) {
      this.socket.emit('chat_message', { content: message, timestamp: Date.now() });
    }
  }
  
  // Ask the backend to (re)analyze a contract; dropped when disconnected.
  requestAnalysis(contractAddress: string) {
    if (this.socket?.connected) {
      this.socket.emit('request_analysis', { contractAddress });
    }
  }
  
  // Retry the existing socket with exponential backoff (2s, 4s, 8s, …),
  // giving up after maxReconnectAttempts tries.
  private reconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      setTimeout(() => {
        this.socket?.connect();
      }, Math.pow(2, this.reconnectAttempts) * 1000); // Exponential backoff
    }
  }
  
  // Patch a live score change into the currently displayed analysis; updates
  // for other contracts are ignored.
  private handleScoreUpdate(update: ScoreUpdate) {
    // Update current analysis if it matches
    const currentAnalysis = useAnalysisStore.getState().currentAnalysis;
    if (currentAnalysis?.contractAddress === update.contractAddress) {
      useAnalysisStore.getState().setCurrentAnalysis({
        ...currentAnalysis,
        kaizenScore: update.newScore,
        lastUpdated: update.timestamp
      });
    }
  }
  
  // Tear down the socket; a later connect() builds a fresh one.
  disconnect() {
    this.socket?.disconnect();
    this.socket = null;
  }
}

// Module-level singleton shared by the whole app.
export const wsManager = new WebSocketManager();

Component Architecture

Analysis Dashboard Component

// components/analysis/AnalysisDashboard.tsx
'use client';

import React, { useEffect, useState } from 'react';
import { useAnalysisStore } from '@/store/analysis';
import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card';
import { Badge } from '@/components/ui/badge';
import { Progress } from '@/components/ui/progress';
import { AlertTriangle, CheckCircle, XCircle } from 'lucide-react';
import { ScoreChart } from './ScoreChart';
import { RiskBreakdown } from './RiskBreakdown';

interface AnalysisDashboardProps {
  // Contract to analyze; when omitted the component renders an empty prompt.
  contractAddress?: string;
}

/**
 * Main dashboard view for one project's analysis.
 *
 * Renders, in order of precedence: a skeleton while loading, an error card,
 * an empty-state prompt when nothing is selected, and otherwise the full
 * score + breakdown + liquidity/distribution/sentiment cards. All data
 * comes from useAnalysisStore, populated here via fetchAnalysis.
 */
export function AnalysisDashboard({ contractAddress }: AnalysisDashboardProps) {
  const { currentAnalysis, isLoading, error, setLoading } = useAnalysisStore();
  // NOTE(review): `refreshing`/`setRefreshing` are never used below — either
  // wire them to a refresh control or remove them.
  const [refreshing, setRefreshing] = useState(false);
  
  // Fetch once when an address is supplied and no analysis is loaded yet.
  // NOTE(review): `currentAnalysis` and `fetchAnalysis` are omitted from the
  // dependency array — confirm this is intentional (it prevents refetching
  // when the store already holds a different project's analysis).
  useEffect(() => {
    if (contractAddress && !currentAnalysis) {
      fetchAnalysis(contractAddress);
    }
  }, [contractAddress]);
  
  // Load the analysis from the backend and push the result (or the error)
  // into the store; the loading flag brackets the whole request.
  const fetchAnalysis = async (address: string) => {
    setLoading(true);
    try {
      const response = await fetch(`/api/analysis/${address}`);
      const data = await response.json();
      
      if (response.ok) {
        useAnalysisStore.getState().setCurrentAnalysis(data);
      } else {
        useAnalysisStore.getState().setError(data.error);
      }
    } catch (err) {
      useAnalysisStore.getState().setError('Failed to fetch analysis');
    } finally {
      setLoading(false);
    }
  };
  
  // Map a 0-100 Kaizen score to a display band (label, badge color, icon).
  // Bands: 81+ low, 61-80 moderate, 31-60 high, below 31 very high risk.
  const getRiskLevel = (score: number) => {
    if (score >= 81) return { level: 'Low Risk', color: 'green', icon: CheckCircle };
    if (score >= 61) return { level: 'Moderate Risk', color: 'yellow', icon: AlertTriangle };
    if (score >= 31) return { level: 'High Risk', color: 'orange', icon: AlertTriangle };
    return { level: 'Very High Risk', color: 'red', icon: XCircle };
  };
  
  // Loading state: six pulsing skeleton cards.
  if (isLoading) {
    return (
      <div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-6">
        {[...Array(6)].map((_, i) => (
          <Card key={i} className="animate-pulse">
            <CardHeader>
              <div className="h-4 bg-gray-200 rounded w-3/4"></div>
            </CardHeader>
            <CardContent>
              <div className="h-8 bg-gray-200 rounded w-1/2 mb-2"></div>
              <div className="h-3 bg-gray-200 rounded w-full"></div>
            </CardContent>
          </Card>
        ))}
      </div>
    );
  }
  
  // Error state: single red error card.
  if (error) {
    return (
      <Card className="border-red-200 bg-red-50">
        <CardContent className="pt-6">
          <div className="flex items-center space-x-2 text-red-600">
            <XCircle className="h-5 w-5" />
            <span>{error}</span>
          </div>
        </CardContent>
      </Card>
    );
  }
  
  // Empty state: no analysis loaded yet.
  if (!currentAnalysis) {
    return (
      <Card>
        <CardContent className="pt-6 text-center text-gray-500">
          Enter a contract address to begin analysis
        </CardContent>
      </Card>
    );
  }
  
  const riskInfo = getRiskLevel(currentAnalysis.kaizenScore);
  const RiskIcon = riskInfo.icon;
  
  return (
    <div className="space-y-6">
      {/* Main Score Display */}
      <Card className="border-2">
        <CardHeader>
          <CardTitle className="flex items-center justify-between">
            <span>Kaizen Security Score</span>
            <Badge variant={riskInfo.color as any} className="text-sm">
              <RiskIcon className="w-4 h-4 mr-1" />
              {riskInfo.level}
            </Badge>
          </CardTitle>
        </CardHeader>
        <CardContent>
          <div className="flex items-center space-x-4">
            <div className="text-4xl font-bold">{currentAnalysis.kaizenScore}</div>
            <div className="flex-1">
              <Progress value={currentAnalysis.kaizenScore} className="h-3" />
              <p className="text-sm text-gray-600 mt-1">
                Out of 100 (Higher is safer)
              </p>
            </div>
          </div>
          
          {/* Four sub-score columns from the score breakdown */}
          <div className="mt-4 grid grid-cols-2 md:grid-cols-4 gap-4 text-sm">
            <div>
              <span className="text-gray-600">Contract Security</span>
              <div className="font-semibold">{currentAnalysis.breakdown.technical}/100</div>
            </div>
            <div>
              <span className="text-gray-600">Economic Health</span>
              <div className="font-semibold">{currentAnalysis.breakdown.economic}/100</div>
            </div>
            <div>
              <span className="text-gray-600">Social Score</span>
              <div className="font-semibold">{currentAnalysis.breakdown.social}/100</div>
            </div>
            <div>
              <span className="text-gray-600">Governance</span>
              <div className="font-semibold">{currentAnalysis.breakdown.governance}/100</div>
            </div>
          </div>
        </CardContent>
      </Card>
      
      {/* Risk Breakdown */}
      <div className="grid grid-cols-1 lg:grid-cols-2 gap-6">
        <RiskBreakdown analysis={currentAnalysis} />
        <ScoreChart analysis={currentAnalysis} />
      </div>
      
      {/* Additional Analysis Cards */}
      <div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-6">
        <Card>
          <CardHeader>
            <CardTitle className="text-lg">Liquidity Analysis</CardTitle>
          </CardHeader>
          <CardContent>
            <div className="space-y-3">
              <div className="flex justify-between">
                <span>LP Locked</span>
                <span className="font-semibold">
                  {currentAnalysis.liquidity.locked ? '✅ Yes' : '❌ No'}
                </span>
              </div>
              <div className="flex justify-between">
                <span>Lock Duration</span>
                <span className="font-semibold">
                  {currentAnalysis.liquidity.lockDuration || 'N/A'}
                </span>
              </div>
              <div className="flex justify-between">
                <span>Total Liquidity</span>
                <span className="font-semibold">
                  ${currentAnalysis.liquidity.totalUSD?.toLocaleString() || 'N/A'}
                </span>
              </div>
            </div>
          </CardContent>
        </Card>
        
        <Card>
          <CardHeader>
            <CardTitle className="text-lg">Token Distribution</CardTitle>
          </CardHeader>
          <CardContent>
            <div className="space-y-3">
              <div className="flex justify-between">
                <span>Total Holders</span>
                <span className="font-semibold">
                  {currentAnalysis.distribution.holderCount?.toLocaleString() || 'N/A'}
                </span>
              </div>
              <div className="flex justify-between">
                <span>Top 10 Holdings</span>
                <span className="font-semibold">
                  {currentAnalysis.distribution.top10Percentage}%
                </span>
              </div>
              <Progress 
                value={100 - currentAnalysis.distribution.top10Percentage} 
                className="h-2"
              />
              <p className="text-xs text-gray-600">
                Lower concentration = better distribution
              </p>
            </div>
          </CardContent>
        </Card>
        
        <Card>
          <CardHeader>
            <CardTitle className="text-lg">Social Sentiment</CardTitle>
          </CardHeader>
          <CardContent>
            <div className="space-y-3">
              <div className="flex justify-between">
                <span>Overall Sentiment</span>
                {/* Sentiment is a 0-1 value: >0.6 positive, >0.4 neutral, else negative */}
                <Badge variant={
                  currentAnalysis.social.sentiment > 0.6 ? 'default' : 
                  currentAnalysis.social.sentiment > 0.4 ? 'secondary' : 'destructive'
                }>
                  {currentAnalysis.social.sentiment > 0.6 ? 'Positive' :
                   currentAnalysis.social.sentiment > 0.4 ? 'Neutral' : 'Negative'}
                </Badge>
              </div>
              <div className="flex justify-between">
                <span>Mentions (24h)</span>
                <span className="font-semibold">
                  {currentAnalysis.social.mentions24h || 0}
                </span>
              </div>
              <div className="flex justify-between">
                <span>Manipulation Risk</span>
                {/* Risk is a 0-1 value: <0.3 low, <0.6 medium, else high */}
                <Badge variant={
                  currentAnalysis.social.manipulationRisk < 0.3 ? 'default' :
                  currentAnalysis.social.manipulationRisk < 0.6 ? 'secondary' : 'destructive'
                }>
                  {currentAnalysis.social.manipulationRisk < 0.3 ? 'Low' :
                   currentAnalysis.social.manipulationRisk < 0.6 ? 'Medium' : 'High'}
                </Badge>
              </div>
            </div>
          </CardContent>
        </Card>
      </div>
    </div>
  );
}

Backend Services

Component Overview

The backend infrastructure of Kaizen AI is built on a microservices architecture using Node.js and Fastify, designed for high performance, scalability, and maintainability. The services are containerized using Docker and orchestrated through Google Kubernetes Engine (GKE) for optimal resource management and horizontal scaling.

Technology Stack:

  • Runtime: Node.js 20 LTS with TypeScript for type safety

  • Framework: Fastify 4.x for high-performance HTTP server

  • Database: PostgreSQL 15 for relational data, TimescaleDB for time-series

  • Cache: Redis 7 for session management and high-speed data access

  • Message Queue: BullMQ with Redis for job processing

  • Container: Docker with multi-stage builds for optimization

  • Orchestration: Google Kubernetes Engine (GKE) with Istio service mesh

Microservices Architecture

Service Decomposition

Backend Services Architecture:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   API Gateway   │    │  Auth Service   │    │  User Service   │
│                 │    │                 │    │                 │
│ • Rate Limiting │    │ • JWT Tokens    │    │ • Profile Mgmt  │
│ • Load Balance  │    │ • OAuth         │    │ • Preferences   │
│ • SSL Term      │    │ • Session Mgmt  │    │ • Notifications │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         └───────────────────────┼───────────────────────┘

    ┌────────────────────────────┼────────────────────────────┐
    │                           │                             │
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ Analysis Service│    │  Data Service   │    │ Social Service  │
│                 │    │                 │    │                 │
│ • Score Calc    │    │ • Blockchain    │    │ • Twitter API   │
│ • Risk Assess   │    │ • Contract Data │    │ • Telegram Bot  │
│ • ML Models     │    │ • Token Info    │    │ • Sentiment     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         └───────────────────────┼───────────────────────┘

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ Intel Service   │    │  Chat Service   │    │ Notification    │
│                 │    │                 │    │ Service         │
│ • Arkham API    │    │ • WebSocket     │    │                 │
│ • Fund Tracking │    │ • LLM Router    │    │ • Email         │
│ • Entity Res    │    │ • Context Mgmt  │    │ • Push Notifs   │
└─────────────────┘    └─────────────────┘    └─────────────────┘

Fastify Application Setup

// src/app.ts
import Fastify, { FastifyInstance } from 'fastify';
import { TypeBoxTypeProvider } from '@fastify/type-provider-typebox';
import cors from '@fastify/cors';
import helmet from '@fastify/helmet';
import rateLimit from '@fastify/rate-limit';
import jwt from '@fastify/jwt';
import websocket from '@fastify/websocket';

/**
 * Build and configure the Fastify application: logging, security
 * middleware, CORS, rate limiting, JWT auth, WebSocket support,
 * infrastructure plugins and all route modules.
 *
 * @returns the fully configured (not yet listening) Fastify instance
 * @throws Error if JWT_SECRET is not set — refusing to boot is safer than
 *         signing tokens with a hard-coded, publicly visible fallback.
 */
export async function buildApp(): Promise<FastifyInstance> {
  const app = Fastify({
    logger: {
      level: process.env.LOG_LEVEL || 'info',
      // Trim request/response log output to the fields we actually need.
      serializers: {
        req: (req) => ({
          method: req.method,
          url: req.url,
          headers: req.headers,
          hostname: req.hostname,
          remoteAddress: req.ip,
        }),
        res: (res) => ({
          statusCode: res.statusCode,
        }),
      },
    },
    requestTimeout: 30000,
    bodyLimit: 1048576, // 1MB
  }).withTypeProvider<TypeBoxTypeProvider>();

  // Security headers. NOTE: 'unsafe-inline' weakens the CSP; it is kept
  // because the current frontend relies on inline scripts/styles.
  await app.register(helmet, {
    contentSecurityPolicy: {
      directives: {
        defaultSrc: ["'self'"],
        scriptSrc: ["'self'", "'unsafe-inline'"],
        styleSrc: ["'self'", "'unsafe-inline'"],
        imgSrc: ["'self'", "data:", "https:"],
      },
    },
  });

  // CORS: allow-list from ALLOWED_ORIGINS (comma-separated). Requests
  // without an Origin header (curl, server-to-server) are permitted.
  await app.register(cors, {
    origin: (origin, callback) => {
      const allowedOrigins = process.env.ALLOWED_ORIGINS?.split(',') || [];
      if (!origin || allowedOrigins.includes(origin)) {
        callback(null, true);
      } else {
        callback(new Error('Not allowed by CORS'), false);
      }
    },
    credentials: true,
  });

  // Rate limiting keyed by client IP: 100 requests per minute.
  await app.register(rateLimit, {
    max: 100,
    timeWindow: '1 minute',
    keyGenerator: (request) => {
      return request.ip;
    },
    errorResponseBuilder: (request, context) => {
      return {
        code: 429,
        error: 'Too Many Requests',
        message: `Rate limit exceeded, retry in ${context.ttl}. Max: ${context.max}.`,
      };
    },
  });

  // JWT authentication — fail fast when the signing secret is missing
  // instead of falling back to a well-known default ('your-secret-key'),
  // which would let anyone forge valid tokens.
  const jwtSecret = process.env.JWT_SECRET;
  if (!jwtSecret) {
    throw new Error('JWT_SECRET environment variable must be set');
  }
  await app.register(jwt, {
    secret: jwtSecret,
    sign: {
      expiresIn: '24h',
    },
  });

  // WebSocket support
  await app.register(websocket);

  // Infrastructure plugins (DB pool, Redis client, job queue).
  await app.register(require('./plugins/database'));
  await app.register(require('./plugins/redis'));
  await app.register(require('./plugins/queue'));

  // Route modules, each mounted under its API prefix.
  await app.register(require('./routes/health'));
  await app.register(require('./routes/auth'), { prefix: '/api/auth' });
  await app.register(require('./routes/analysis'), { prefix: '/api/analysis' });
  await app.register(require('./routes/projects'), { prefix: '/api/projects' });
  await app.register(require('./routes/chat'), { prefix: '/api/chat' });
  await app.register(require('./routes/websocket'), { prefix: '/ws' });

  // Central error handler: rate-limit and validation errors get tailored
  // responses; everything else falls through to a generic payload.
  app.setErrorHandler(async (error, request, reply) => {
    app.log.error(error);
    
    if (error.statusCode === 429) {
      return reply.status(429).send({
        error: 'Too Many Requests',
        message: 'Rate limit exceeded',
      });
    }
    
    if (error.validation) {
      return reply.status(400).send({
        error: 'Validation Error',
        message: error.message,
        details: error.validation,
      });
    }

    return reply.status(error.statusCode || 500).send({
      error: error.name || 'Internal Server Error',
      message: error.message,
    });
  });

  return app;
}

Analysis Service Implementation

Core Analysis Service

// services/analysis/src/analysisService.ts
import { FastifyInstance } from 'fastify';
import { Type } from '@sinclair/typebox';
import { ScoringEngine } from './scoring/scoringEngine';
import { DataCollector } from './data/dataCollector';
import { RiskAssessment } from './risk/riskAssessment';

// Request body schema for the analysis endpoints.
// NOTE(review): 'address' is not a built-in JSON-Schema/AJV string format —
// validation will reject every request unless a custom "address" format is
// registered with the validator; confirm that registration exists elsewhere.
const AnalysisRequestSchema = Type.Object({
  contractAddress: Type.String({ format: 'address' }),
  network: Type.Union([Type.Literal('ethereum'), Type.Literal('solana')]),
  // Optional analysis depth; the service defaults this to 'standard'.
  depth: Type.Optional(Type.Union([
    Type.Literal('quick'),
    Type.Literal('standard'),
    Type.Literal('comprehensive')
  ]))
});

// Shape of a completed analysis as returned to clients (and as cached
// in Redis / persisted in analysis_history).
const AnalysisResponseSchema = Type.Object({
  contractAddress: Type.String(),
  network: Type.String(),
  kaizenScore: Type.Number({ minimum: 0, maximum: 100 }),
  riskLevel: Type.String(),
  confidence: Type.Number({ minimum: 0, maximum: 1 }),
  // Per-dimension sub-scores feeding the overall kaizenScore.
  breakdown: Type.Object({
    technical: Type.Number(),
    economic: Type.Number(),
    social: Type.Number(),
    governance: Type.Number()
  }),
  alerts: Type.Array(Type.Object({
    level: Type.String(),
    message: Type.String(),
    category: Type.String()
  })),
  metadata: Type.Object({
    analyzedAt: Type.String({ format: 'date-time' }),
    processingTime: Type.Number(),
    dataQuality: Type.Number()
  })
});

/**
 * HTTP facade for project analysis.
 *
 * Registers three endpoints on the given Fastify instance (/analyze,
 * /cached/:address, /batch) and coordinates the scoring engine, data
 * collector and risk assessor, with Redis caching and a BullMQ-style
 * queue for deferred work.
 */
export class AnalysisService {
  private scoringEngine: ScoringEngine;
  private dataCollector: DataCollector;
  private riskAssessment: RiskAssessment;

  constructor(
    private app: FastifyInstance,
    private database: any,
    private redis: any,
    private queue: any
  ) {
    this.scoringEngine = new ScoringEngine(database, redis);
    this.dataCollector = new DataCollector(database, redis);
    this.riskAssessment = new RiskAssessment(database);
    
    // Routes are attached immediately on construction.
    this.registerRoutes();
  }

  // Attach all analysis routes. /analyze and /batch require a valid JWT
  // (this.app.authenticate preHandler); /cached/:address is public.
  private registerRoutes() {
    // Analyze project endpoint
    this.app.post('/analyze', {
      schema: {
        body: AnalysisRequestSchema,
        response: {
          200: AnalysisResponseSchema
        }
      },
      preHandler: [this.app.authenticate],
      handler: this.analyzeProject.bind(this)
    });

    // Get cached analysis
    this.app.get('/cached/:address', {
      schema: {
        params: Type.Object({
          address: Type.String({ format: 'address' })
        }),
        response: {
          200: AnalysisResponseSchema
        }
      },
      handler: this.getCachedAnalysis.bind(this)
    });

    // Batch analysis endpoint
    this.app.post('/batch', {
      schema: {
        body: Type.Object({
          contracts: Type.Array(AnalysisRequestSchema, { maxItems: 10 })
        })
      },
      preHandler: [this.app.authenticate],
      handler: this.batchAnalysis.bind(this)
    });
  }

  /**
   * Run (or serve from cache) a full analysis for one contract.
   *
   * Flow: check Redis cache (honored for 'quick'/'standard' depths within
   * a TTL) → collect project data → score + assess risks → cache for
   * 15 minutes → persist to analysis_history → optionally queue deeper
   * follow-up work. Responds 404 when no project data is found, 500 on
   * any internal failure.
   */
  async analyzeProject(request: any, reply: any) {
    const startTime = Date.now();
    const { contractAddress, network, depth = 'standard' } = request.body;
    const userId = request.user.id;

    try {
      // Check cache first
      const cacheKey = `analysis:${network}:${contractAddress}`;
      const cached = await this.redis.get(cacheKey);
      
      // 'comprehensive' always bypasses the cache to force fresh data.
      if (cached && depth !== 'comprehensive') {
        const cachedAnalysis = JSON.parse(cached);
        
        // Check if cache is still valid (15 minutes for standard, 5 minutes for quick)
        const cacheAge = Date.now() - new Date(cachedAnalysis.metadata.analyzedAt).getTime();
        const maxAge = depth === 'quick' ? 5 * 60 * 1000 : 15 * 60 * 1000;
        
        if (cacheAge < maxAge) {
          return reply.send(cachedAnalysis);
        }
      }

      // Collect project data
      const projectData = await this.dataCollector.collectProjectData(
        contractAddress,
        network,
        depth
      );

      if (!projectData) {
        return reply.status(404).send({
          error: 'Project Not Found',
          message: 'Could not find project data for the specified contract address'
        });
      }

      // Calculate Kaizen score
      const scoringResult = await this.scoringEngine.calculateScore(projectData);

      // Perform risk assessment
      const riskAssessment = await this.riskAssessment.assessRisks(projectData);

      // Combine results into the AnalysisResponseSchema shape.
      const analysis = {
        contractAddress: projectData.contractAddress,
        network: projectData.network,
        kaizenScore: scoringResult.score,
        riskLevel: this.determineRiskLevel(scoringResult.score),
        confidence: scoringResult.confidence,
        breakdown: scoringResult.breakdown,
        alerts: riskAssessment.alerts,
        metadata: {
          analyzedAt: new Date().toISOString(),
          processingTime: Date.now() - startTime,
          dataQuality: projectData.qualityScore
        }
      };

      // Cache the result
      await this.redis.setex(cacheKey, 900, JSON.stringify(analysis)); // 15 minutes

      // Store in database for historical tracking
      await this.database.query(`
        INSERT INTO analysis_history (
          user_id, contract_address, network, kaizen_score, 
          analysis_data, created_at
        ) VALUES ($1, $2, $3, $4, $5, NOW())
      `, [userId, contractAddress, network, analysis.kaizenScore, analysis]);

      // Queue follow-up analysis if needed
      if (depth === 'comprehensive') {
        await this.queue.add('comprehensive-analysis', {
          contractAddress,
          network,
          userId,
          analysisId: analysis.metadata.analyzedAt
        });
      }

      return reply.send(analysis);

    } catch (error) {
      // NOTE(review): with pino, the second positional argument is treated
      // as an interpolation value, so the error object's details may be
      // dropped — consider log.error({ err: error }, 'Analysis error').
      this.app.log.error('Analysis error:', error);
      
      return reply.status(500).send({
        error: 'Analysis Failed',
        message: 'An error occurred during project analysis'
      });
    }
  }

  /**
   * Return a cached analysis for an address without triggering new work.
   * The network is unknown from the path alone, so both network cache keys
   * are probed (Ethereum first). 404 when neither key is populated.
   */
  async getCachedAnalysis(request: any, reply: any) {
    const { address } = request.params;
    
    try {
      // Try Ethereum first, then Solana
      for (const network of ['ethereum', 'solana']) {
        const cacheKey = `analysis:${network}:${address}`;
        const cached = await this.redis.get(cacheKey);
        
        if (cached) {
          return reply.send(JSON.parse(cached));
        }
      }
      
      return reply.status(404).send({
        error: 'Not Found',
        message: 'No cached analysis found for this address'
      });
      
    } catch (error) {
      this.app.log.error('Cache retrieval error:', error);
      return reply.status(500).send({
        error: 'Cache Error',
        message: 'Failed to retrieve cached analysis'
      });
    }
  }

  /**
   * Queue up to 10 contracts (enforced by the route schema) for deferred
   * analysis. Each contract is reported back individually as 'queued'
   * with its job id, or 'error' when enqueueing failed — one bad entry
   * does not fail the whole batch.
   */
  async batchAnalysis(request: any, reply: any) {
    const { contracts } = request.body;
    const userId = request.user.id;

    try {
      const analysisPromises = contracts.map(async (contract: any) => {
        try {
          // Add to queue for processing
          const job = await this.queue.add('batch-analysis', {
            ...contract,
            userId
          });

          return {
            contractAddress: contract.contractAddress,
            network: contract.network,
            status: 'queued',
            jobId: job.id
          };
        } catch (error) {
          // NOTE(review): accessing error.message on an untyped catch value
          // assumes useUnknownInCatchVariables is off — confirm tsconfig.
          return {
            contractAddress: contract.contractAddress,
            network: contract.network,
            status: 'error',
            error: error.message
          };
        }
      });

      const results = await Promise.all(analysisPromises);

      return reply.send({
        message: 'Batch analysis queued',
        results
      });

    } catch (error) {
      this.app.log.error('Batch analysis error:', error);
      return reply.status(500).send({
        error: 'Batch Analysis Failed',
        message: 'Failed to queue batch analysis'
      });
    }
  }

  // Score → label mapping; thresholds mirror the frontend's getRiskLevel
  // bands (81+, 61+, 31+, below).
  private determineRiskLevel(score: number): string {
    if (score >= 81) return 'Low Risk';
    if (score >= 61) return 'Moderate Risk';
    if (score >= 31) return 'High Risk';
    return 'Very High Risk';
  }
}

Database Plugin

PostgreSQL Integration

// plugins/database.ts
import fp from 'fastify-plugin';
import { Pool, PoolClient } from 'pg';
import { FastifyInstance } from 'fastify';

// Connection settings for the pg Pool. Every field is optional; when a
// field is omitted the plugin falls back to the matching environment
// variable (DATABASE_URL, DB_HOST, DB_PORT, DB_USER, ...) — see
// databasePlugin below.
interface DatabaseOptions {
  connectionString?: string;         // full URL; takes precedence in pg when set
  host?: string;
  port?: number;
  user?: string;
  password?: string;
  database?: string;
  ssl?: boolean;                     // defaults to true only in production
  max?: number;                      // max pooled connections (default 20)
  idleTimeoutMillis?: number;        // idle client reap time (default 30s)
  connectionTimeoutMillis?: number;  // connect timeout (default 2s)
}

// Module augmentation: adds the `db` decorator to FastifyInstance so
// `app.db.query(...)` / `app.db.transaction(...)` are typed everywhere
// the plugin is registered.
declare module 'fastify' {
  interface FastifyInstance {
    db: {
      pool: Pool;
      query: (text: string, params?: any[]) => Promise<any>;
      getClient: () => Promise<PoolClient>;
      transaction: <T>(callback: (client: PoolClient) => Promise<T>) => Promise<T>;
    };
  }
}

/**
 * Fastify plugin wiring a pg connection pool onto `fastify.db`.
 *
 * Exposes three helpers: `query` (pooled single statement with duration
 * logging), `getClient` (dedicated client — caller must release), and
 * `transaction` (BEGIN/COMMIT with rollback on error). The pool is
 * drained on server shutdown via the onClose hook.
 */
async function databasePlugin(
  fastify: FastifyInstance,
  options: DatabaseOptions
) {
  // Explicit options win over environment variables. Numeric settings
  // use ?? so an explicit 0 is not clobbered the way || would.
  const pool = new Pool({
    connectionString: options.connectionString || process.env.DATABASE_URL,
    host: options.host || process.env.DB_HOST,
    port: options.port ?? parseInt(process.env.DB_PORT || '5432', 10),
    user: options.user || process.env.DB_USER,
    password: options.password || process.env.DB_PASSWORD,
    database: options.database || process.env.DB_NAME,
    ssl: options.ssl !== undefined ? options.ssl : process.env.NODE_ENV === 'production',
    max: options.max ?? 20,
    idleTimeoutMillis: options.idleTimeoutMillis ?? 30000,
    connectionTimeoutMillis: options.connectionTimeoutMillis ?? 2000,
  });

  // Fail fast at startup if the database is unreachable.
  try {
    const client = await pool.connect();
    fastify.log.info('Database connected successfully');
    client.release();
  } catch (error) {
    fastify.log.error('Database connection failed:', error);
    throw error;
  }

  const db = {
    pool,

    /** Runs one statement on the pool; logs query text, duration, rows. */
    async query(text: string, params?: any[]) {
      const start = Date.now();
      try {
        const result = await pool.query(text, params);
        fastify.log.debug({
          query: text,
          duration: Date.now() - start,
          rows: result.rowCount
        }, 'Database query executed');

        return result;
      } catch (error) {
        fastify.log.error({
          query: text,
          error: error instanceof Error ? error.message : String(error)
        }, 'Database query failed');
        throw error;
      }
    },

    /** Checks out a dedicated client; the caller MUST release() it. */
    async getClient() {
      return await pool.connect();
    },

    /**
     * Runs `callback` inside a transaction, rolling back on any thrown
     * error. The client is always released.
     */
    async transaction<T>(callback: (client: PoolClient) => Promise<T>): Promise<T> {
      const client = await pool.connect();

      try {
        await client.query('BEGIN');
        const result = await callback(client);
        await client.query('COMMIT');
        return result;
      } catch (error) {
        // Best-effort rollback: if ROLLBACK itself fails (e.g. the
        // connection died), surface the ORIGINAL error, not the
        // rollback failure.
        try {
          await client.query('ROLLBACK');
        } catch (rollbackError) {
          fastify.log.error('Transaction rollback failed:', rollbackError);
        }
        throw error;
      } finally {
        client.release();
      }
    }
  };

  fastify.decorate('db', db);

  // Drain the pool when the server shuts down.
  fastify.addHook('onClose', async () => {
    await pool.end();
    fastify.log.info('Database pool closed');
  });
}

// Wrapped with fastify-plugin so the `db` decorator escapes plugin
// encapsulation and is visible application-wide.
export default fp(databasePlugin, {
  name: 'database',
  fastify: '4.x'
});

Database Design

Schema Architecture

Kaizen AI utilizes a hybrid database architecture combining PostgreSQL for relational data, TimescaleDB for time-series blockchain data, and Redis for high-performance caching and session management.

Database Structure Overview:

-- Core database schema for Kaizen AI

-- Users and Authentication
-- Account records: credentials, JSONB profile/preference blobs, and the
-- subscription tier used for feature gating.
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) UNIQUE NOT NULL,
    username VARCHAR(50) UNIQUE,
    password_hash VARCHAR(255) NOT NULL,
    profile JSONB DEFAULT '{}',
    preferences JSONB DEFAULT '{}',
    subscription_tier VARCHAR(20) DEFAULT 'free',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_login TIMESTAMP WITH TIME ZONE,
    is_verified BOOLEAN DEFAULT FALSE,
    is_active BOOLEAN DEFAULT TRUE
);

-- The UNIQUE constraints on email and username already create unique
-- B-tree indexes, so separate idx_users_email / idx_users_username
-- indexes would be pure duplication (extra write cost, no read benefit)
-- and are deliberately omitted.
CREATE INDEX idx_users_subscription ON users(subscription_tier);

-- Project tracking
-- One row per tracked token contract per network. The natural key is
-- (contract_address, network); the surrogate UUID is referenced by
-- analysis_results and watchlist_items.
CREATE TABLE projects (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    contract_address VARCHAR(42) NOT NULL,
    network VARCHAR(20) NOT NULL,
    token_symbol VARCHAR(20),
    token_name VARCHAR(100),
    project_metadata JSONB DEFAULT '{}',
    first_seen TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    is_active BOOLEAN DEFAULT TRUE,

    UNIQUE(contract_address, network)
);

-- The UNIQUE(contract_address, network) constraint already provides a
-- composite index usable for (contract_address, network) and
-- contract_address-prefix lookups, so idx_projects_contract_network is
-- deliberately omitted as redundant.
CREATE INDEX idx_projects_symbol ON projects(token_symbol);
CREATE INDEX idx_projects_network ON projects(network);

-- Analysis results storage
-- One row per completed analysis run. The composite score breakdown is
-- stored in dedicated columns for indexing/aggregation; the complete
-- payload is kept verbatim in full_analysis (JSONB).
CREATE TABLE analysis_results (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
    user_id UUID REFERENCES users(id) ON DELETE SET NULL,  -- keep result if user is deleted
    kaizen_score INTEGER NOT NULL CHECK (kaizen_score >= 0 AND kaizen_score <= 100),
    risk_level VARCHAR(20) NOT NULL,
    confidence DECIMAL(3,2) NOT NULL CHECK (confidence >= 0 AND confidence <= 1),
    
    -- Score breakdown
    technical_score INTEGER NOT NULL,
    economic_score INTEGER NOT NULL,
    social_score INTEGER NOT NULL,
    governance_score INTEGER NOT NULL,
    
    -- Analysis metadata
    analysis_depth VARCHAR(20) DEFAULT 'standard',
    processing_time_ms INTEGER,
    data_quality_score DECIMAL(3,2),
    
    -- Full analysis data
    full_analysis JSONB NOT NULL,
    alerts JSONB DEFAULT '[]',
    
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_analysis_project ON analysis_results(project_id);
CREATE INDEX idx_analysis_user ON analysis_results(user_id);
CREATE INDEX idx_analysis_score ON analysis_results(kaizen_score);
CREATE INDEX idx_analysis_created ON analysis_results(created_at);
CREATE INDEX idx_analysis_risk_level ON analysis_results(risk_level);

-- Time-series tables for blockchain data (TimescaleDB)
CREATE TABLE blockchain_events (
    time TIMESTAMP WITH TIME ZONE NOT NULL,
    network VARCHAR(20) NOT NULL,
    contract_address VARCHAR(42) NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    block_number BIGINT NOT NULL,
    transaction_hash VARCHAR(66) NOT NULL,
    event_data JSONB NOT NULL,
    gas_used INTEGER,
    gas_price BIGINT
);

-- Convert to hypertable for time-series optimization
SELECT create_hypertable('blockchain_events', 'time');

CREATE INDEX idx_blockchain_events_contract ON blockchain_events(contract_address, time DESC);
CREATE INDEX idx_blockchain_events_type ON blockchain_events(event_type, time DESC);
CREATE INDEX idx_blockchain_events_network ON blockchain_events(network, time DESC);

-- Token price and market data
-- Market snapshots per token over time (hypertable). data_source records
-- which upstream feed produced the row so conflicting feeds can coexist.
CREATE TABLE token_prices (
    time TIMESTAMP WITH TIME ZONE NOT NULL,
    contract_address VARCHAR(42) NOT NULL,
    network VARCHAR(20) NOT NULL,
    price_usd DECIMAL(20,8),
    volume_24h DECIMAL(20,2),
    market_cap DECIMAL(20,2),
    liquidity_usd DECIMAL(20,2),
    holder_count INTEGER,
    data_source VARCHAR(50) NOT NULL
);

SELECT create_hypertable('token_prices', 'time');

CREATE INDEX idx_token_prices_contract ON token_prices(contract_address, time DESC);
CREATE INDEX idx_token_prices_network ON token_prices(network, time DESC);

-- Social media data
-- Per-post social media mentions of a contract (hypertable).
CREATE TABLE social_mentions (
    time TIMESTAMP WITH TIME ZONE NOT NULL,
    contract_address VARCHAR(42) NOT NULL,
    platform VARCHAR(20) NOT NULL,
    post_id VARCHAR(255) NOT NULL,
    author_id VARCHAR(255),
    content TEXT,
    sentiment_score DECIMAL(3,2),  -- NOTE(review): presumably in [-1, 1]; confirm with producer
    engagement_metrics JSONB,
    is_suspicious BOOLEAN DEFAULT FALSE,

    -- TimescaleDB requires every UNIQUE constraint on a hypertable to
    -- include the partitioning column ('time'); a bare
    -- UNIQUE(platform, post_id) makes the create_hypertable() call
    -- below fail. Including 'time' satisfies that rule, but it means
    -- true per-post deduplication must be enforced at ingest.
    UNIQUE(platform, post_id, time)
);

SELECT create_hypertable('social_mentions', 'time');

CREATE INDEX idx_social_contract ON social_mentions(contract_address, time DESC);
CREATE INDEX idx_social_platform ON social_mentions(platform, time DESC);
CREATE INDEX idx_social_sentiment ON social_mentions(sentiment_score);

-- Intelligence and attribution data
-- Attribution data keyed by wallet address: known entity, label set, and
-- a 0-100 risk score with provenance kept in intelligence_sources.
CREATE TABLE wallet_intelligence (
    wallet_address VARCHAR(42) PRIMARY KEY,
    entity_name VARCHAR(255),
    entity_type VARCHAR(50),
    risk_score INTEGER CHECK (risk_score >= 0 AND risk_score <= 100),
    labels TEXT[],
    intelligence_sources JSONB,
    confidence DECIMAL(3,2),
    first_seen TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    last_updated TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_wallet_intel_type ON wallet_intelligence(entity_type);
CREATE INDEX idx_wallet_intel_risk ON wallet_intelligence(risk_score);
-- GIN index enables fast containment queries on the labels array
-- (e.g. WHERE labels @> ARRAY['mixer']).
CREATE INDEX idx_wallet_intel_labels ON wallet_intelligence USING GIN(labels);

-- User watchlists
-- Named watchlists owned by a user; rows cascade away with the user.
CREATE TABLE user_watchlists (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    is_public BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_watchlists_user ON user_watchlists(user_id);

-- Watchlist items
-- Join table between watchlists and projects; a project appears at most
-- once per watchlist (enforced by the composite UNIQUE).
CREATE TABLE watchlist_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    watchlist_id UUID REFERENCES user_watchlists(id) ON DELETE CASCADE,
    project_id UUID REFERENCES projects(id) ON DELETE CASCADE,
    alert_threshold INTEGER,  -- score threshold for user alerts; NULL = no alert
    notes TEXT,
    added_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    
    UNIQUE(watchlist_id, project_id)
);

CREATE INDEX idx_watchlist_items_list ON watchlist_items(watchlist_id);
CREATE INDEX idx_watchlist_items_project ON watchlist_items(project_id);

-- User activity logs
-- Audit trail of user actions (logins, searches, analyses) with request
-- metadata for security review.
CREATE TABLE user_activity (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    activity_type VARCHAR(50) NOT NULL,
    activity_data JSONB,
    ip_address INET,
    user_agent TEXT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_activity_user ON user_activity(user_id, created_at DESC);
CREATE INDEX idx_activity_type ON user_activity(activity_type, created_at DESC);

-- API usage tracking
-- Per-request API metrics (hypertable) for rate limiting, billing and
-- latency analysis.
CREATE TABLE api_usage (
    time TIMESTAMP WITH TIME ZONE NOT NULL,
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    endpoint VARCHAR(255) NOT NULL,
    method VARCHAR(10) NOT NULL,
    status_code INTEGER NOT NULL,
    response_time_ms INTEGER,
    request_size INTEGER,
    response_size INTEGER
);

SELECT create_hypertable('api_usage', 'time');

CREATE INDEX idx_api_usage_user ON api_usage(user_id, time DESC);
CREATE INDEX idx_api_usage_endpoint ON api_usage(endpoint, time DESC);

Performance Optimization

Database Optimization Strategies

-- Performance optimization views and functions

-- Materialized view for project statistics
-- Pre-aggregated per-project analysis statistics. Refreshed out of band
-- via refresh_project_stats(), not kept transactionally current.
CREATE MATERIALIZED VIEW project_stats AS
SELECT 
    p.id,
    p.contract_address,
    p.network,
    p.token_symbol,
    COUNT(ar.id) as analysis_count,
    AVG(ar.kaizen_score) as avg_score,
    MAX(ar.created_at) as last_analysis,
    COUNT(DISTINCT ar.user_id) as unique_analyzers
FROM projects p
LEFT JOIN analysis_results ar ON p.id = ar.project_id
WHERE p.is_active = TRUE
GROUP BY p.id, p.contract_address, p.network, p.token_symbol;

-- The unique index is required for REFRESH ... CONCURRENTLY below.
CREATE UNIQUE INDEX idx_project_stats_id ON project_stats(id);
CREATE INDEX idx_project_stats_score ON project_stats(avg_score);

-- Function to refresh project stats
-- CONCURRENTLY allows reads during the refresh at the cost of a slower
-- refresh; it relies on idx_project_stats_id above.
CREATE OR REPLACE FUNCTION refresh_project_stats()
RETURNS VOID AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY project_stats;
END;
$$ LANGUAGE plpgsql;

-- Automated refresh trigger
-- Automated refresh trigger
-- Emits a NOTIFY instead of refreshing inline, so the (slow) refresh
-- does not run inside every write transaction. NOTE(review): this
-- assumes some worker LISTENs on 'refresh_stats' and then calls
-- refresh_project_stats(); no such listener is visible here — verify
-- one exists, otherwise the view is never refreshed automatically.
CREATE OR REPLACE FUNCTION trigger_refresh_project_stats()
RETURNS TRIGGER AS $$
BEGIN
    -- Refresh stats after significant changes
    PERFORM pg_notify('refresh_stats', 'project_stats');
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- FOR EACH STATEMENT: one notification per statement, not per row.
CREATE TRIGGER analysis_results_refresh_stats
AFTER INSERT OR UPDATE OR DELETE ON analysis_results
FOR EACH STATEMENT
EXECUTE FUNCTION trigger_refresh_project_stats();

-- Partitioning strategy for large tables
-- Partition blockchain_events by month
-- Partitioning strategy for large tables
-- Partition blockchain_events by month
-- NOTE(review): blockchain_events is created above WITHOUT
-- 'PARTITION BY' (it is a TimescaleDB hypertable instead), so
-- 'PARTITION OF blockchain_events' would fail. This helper only works
-- against a parent declared with PARTITION BY RANGE (time) — confirm
-- which mechanism (hypertable chunks vs declarative partitions) is
-- actually intended before using it.
CREATE TABLE blockchain_events_template (
    LIKE blockchain_events INCLUDING ALL
);

-- Function to create monthly partitions
-- Generates e.g. blockchain_events_2024_01 covering [start, start+1mo).
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name TEXT, start_date DATE)
RETURNS VOID AS $$
DECLARE
    partition_name TEXT;
    end_date DATE;
BEGIN
    partition_name := table_name || '_' || to_char(start_date, 'YYYY_MM');
    end_date := start_date + INTERVAL '1 month';
    
    -- %I / %L quoting guards identifiers and literals against injection.
    EXECUTE format('CREATE TABLE IF NOT EXISTS %I PARTITION OF %I
                   FOR VALUES FROM (%L) TO (%L)',
                   partition_name, table_name, start_date, end_date);
END;
$$ LANGUAGE plpgsql;

-- Performance monitoring queries
CREATE OR REPLACE VIEW slow_queries AS
SELECT 
    query,
    calls,
    total_time,
    mean_time,
    rows,
    100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) AS hit_percent
FROM pg_stat_statements
WHERE mean_time > 100  -- Queries taking more than 100ms on average
ORDER BY mean_time DESC;

-- Index usage monitoring
CREATE OR REPLACE VIEW unused_indexes AS
SELECT 
    schemaname,
    tablename,
    indexname,
    idx_scan,
    pg_size_pretty(pg_relation_size(i.indexrelid)) AS index_size
FROM pg_stat_user_indexes ui
JOIN pg_index i ON ui.indexrelid = i.indexrelid
WHERE idx_scan = 0
AND NOT i.indisunique
AND NOT i.indisprimary
ORDER BY pg_relation_size(i.indexrelid) DESC;

Message Queue System

BullMQ Implementation

Queue Architecture and Configuration

// queue/queueManager.ts
import { Queue, Worker, QueueScheduler } from 'bullmq';
import IORedis from 'ioredis';
import { FastifyInstance } from 'fastify';

// Configuration for QueueManager: the shared Redis connection settings
// plus BullMQ job defaults applied to every queue it creates.
interface QueueConfig {
  redis: {
    host: string;
    port: number;
    password?: string;
    db: number;                 // Redis logical database index
  };
  defaultJobOptions: {
    removeOnComplete: number;   // keep at most N completed jobs
    removeOnFail: number;       // keep at most N failed jobs
    attempts: number;           // max attempts before a job is failed
    backoff: {
      type: string;             // e.g. 'exponential' or 'fixed'
      delay: number;            // base delay in ms between attempts
    };
  };
}

/**
 * Central owner of all BullMQ queues, workers and schedulers.
 *
 * Creates one queue/worker/scheduler triple per job category
 * (analysis, data-collection, social-intelligence, scoring,
 * notifications, cleanup), routes jobs to per-category processors, and
 * exposes helpers for enqueuing, scheduling and stats. Services are
 * imported lazily inside each processor to avoid circular dependencies.
 */
export class QueueManager {
  // Shared Redis connection reused by every queue, scheduler and worker.
  private redis: IORedis;
  private queues: Map<string, Queue> = new Map();
  private workers: Map<string, Worker> = new Map();
  private schedulers: Map<string, QueueScheduler> = new Map();

  constructor(
    private config: QueueConfig,
    private app: FastifyInstance
  ) {
    // NOTE(review): recent BullMQ versions require
    // maxRetriesPerRequest: null on connections used by Workers, and
    // QueueScheduler was removed in BullMQ v2+ (its duties moved into
    // Worker). Verify against the pinned bullmq/ioredis versions.
    this.redis = new IORedis({
      host: config.redis.host,
      port: config.redis.port,
      password: config.redis.password,
      db: config.redis.db,
      maxRetriesPerRequest: 3,
      retryDelayOnFailover: 100,
      enableReadyCheck: false,
      lazyConnect: true,
    });

    this.initializeQueues();
  }

  /**
   * Builds every queue/scheduler/worker triple from a static definition
   * table. Lower `priority` numbers are more urgent (BullMQ semantics);
   * the limiter caps each worker at 2x its concurrency per second.
   */
  private initializeQueues() {
    const queueDefinitions = [
      {
        name: 'analysis',
        processor: this.processAnalysisJob.bind(this),
        concurrency: 5,
        priority: 1
      },
      {
        name: 'data-collection',
        processor: this.processDataCollectionJob.bind(this),
        concurrency: 10,
        priority: 2
      },
      {
        name: 'social-intelligence',
        processor: this.processSocialIntelligenceJob.bind(this),
        concurrency: 3,
        priority: 3
      },
      {
        name: 'scoring',
        processor: this.processScoringJob.bind(this),
        concurrency: 8,
        priority: 1
      },
      {
        name: 'notifications',
        processor: this.processNotificationJob.bind(this),
        concurrency: 15,
        priority: 4
      },
      {
        name: 'cleanup',
        processor: this.processCleanupJob.bind(this),
        concurrency: 2,
        priority: 5
      }
    ];

    queueDefinitions.forEach(({ name, processor, concurrency, priority }) => {
      // Create queue
      const queue = new Queue(name, {
        connection: this.redis,
        defaultJobOptions: {
          ...this.config.defaultJobOptions,
          priority: priority
        }
      });

      // Create scheduler (handles delayed/stalled jobs in BullMQ v1)
      const scheduler = new QueueScheduler(name, {
        connection: this.redis
      });

      // Create worker
      const worker = new Worker(name, processor, {
        connection: this.redis,
        concurrency: concurrency,
        limiter: {
          max: concurrency * 2,
          duration: 1000
        }
      });

      // Set up event listeners
      this.setupEventListeners(worker, name);

      this.queues.set(name, queue);
      this.workers.set(name, worker);
      this.schedulers.set(name, scheduler);
    });

    this.app.log.info(`Initialized ${this.queues.size} queues`);
  }

  /** Wires structured logging onto a worker's lifecycle events. */
  private setupEventListeners(worker: Worker, queueName: string) {
    worker.on('completed', (job) => {
      this.app.log.info({
        queue: queueName,
        jobId: job.id,
        // job.timestamp is the enqueue time, so this duration includes
        // time spent waiting in the queue, not just processing.
        duration: Date.now() - job.timestamp
      }, 'Job completed');
    });

    worker.on('failed', (job, err) => {
      this.app.log.error({
        queue: queueName,
        jobId: job?.id,
        error: err.message,
        attempts: job?.attemptsMade
      }, 'Job failed');
    });

    worker.on('error', (err) => {
      this.app.log.error({
        queue: queueName,
        error: err.message
      }, 'Worker error');
    });

    worker.on('stalled', (jobId) => {
      this.app.log.warn({
        queue: queueName,
        jobId
      }, 'Job stalled');
    });
  }

  // Job processors

  /**
   * Runs a full contract analysis, then enqueues an
   * 'analysis_complete' notification for the requesting user.
   */
  private async processAnalysisJob(job: any) {
    const { contractAddress, network, userId, analysisType } = job.data;
    
    try {
      this.app.log.info({
        jobId: job.id,
        contractAddress,
        network,
        analysisType
      }, 'Processing analysis job');

      // Import analysis service dynamically to avoid circular dependencies
      const { AnalysisService } = await import('../services/analysisService');
      const analysisService = new AnalysisService(this.app);

      const result = await analysisService.performAnalysis({
        contractAddress,
        network,
        depth: analysisType,
        userId
      });

      // Store result and notify user
      await this.addJob('notifications', {
        userId,
        type: 'analysis_complete',
        data: {
          contractAddress,
          network,
          score: result.kaizenScore,
          riskLevel: result.riskLevel
        }
      });

      return result;

    } catch (error) {
      this.app.log.error({
        jobId: job.id,
        error: error.message,
        contractAddress,
        network
      }, 'Analysis job failed');
      
      // Rethrow so BullMQ applies the retry/backoff policy.
      throw error;
    }
  }

  /**
   * Collects on-chain data for a contract; if the collector flags a
   * significant change, chains a 'scoring' job to re-score the project.
   */
  private async processDataCollectionJob(job: any) {
    const { network, contractAddress, eventType } = job.data;
    
    try {
      const { DataCollectionService } = await import('../services/dataCollectionService');
      const dataService = new DataCollectionService(this.app);

      const data = await dataService.collectData({
        network,
        contractAddress,
        eventType
      });

      // Trigger dependent jobs if needed
      if (data.significantChange) {
        await this.addJob('scoring', {
          contractAddress,
          network,
          triggerReason: 'data_update'
        });
      }

      return data;

    } catch (error) {
      this.app.log.error({
        jobId: job.id,
        error: error.message
      }, 'Data collection job failed');
      
      throw error;
    }
  }

  /** Runs social-media analysis for one contract/platform/timeframe. */
  private async processSocialIntelligenceJob(job: any) {
    const { contractAddress, platform, timeframe } = job.data;
    
    try {
      const { SocialIntelligenceService } = await import('../services/socialIntelligenceService');
      const socialService = new SocialIntelligenceService(this.app);

      const analysis = await socialService.analyzeSocialData({
        contractAddress,
        platform,
        timeframe
      });

      return analysis;

    } catch (error) {
      this.app.log.error({
        jobId: job.id,
        error: error.message
      }, 'Social intelligence job failed');
      
      throw error;
    }
  }

  /**
   * Recalculates a project's score and, when it moved by more than 10
   * points from the previous value, enqueues a 'score_change'
   * notification.
   */
  private async processScoringJob(job: any) {
    const { contractAddress, network, triggerReason } = job.data;
    
    try {
      const { ScoringService } = await import('../services/scoringService');
      const scoringService = new ScoringService(this.app);

      const newScore = await scoringService.calculateScore({
        contractAddress,
        network
      });

      // Check for significant score changes
      const previousScore = await this.getPreviousScore(contractAddress, network);
      
      if (Math.abs(newScore.score - previousScore) > 10) {
        await this.addJob('notifications', {
          type: 'score_change',
          data: {
            contractAddress,
            network,
            oldScore: previousScore,
            newScore: newScore.score,
            triggerReason
          }
        });
      }

      return newScore;

    } catch (error) {
      this.app.log.error({
        jobId: job.id,
        error: error.message
      }, 'Scoring job failed');
      
      throw error;
    }
  }

  /** Delivers one notification to one user via NotificationService. */
  private async processNotificationJob(job: any) {
    const { userId, type, data } = job.data;
    
    try {
      const { NotificationService } = await import('../services/notificationService');
      const notificationService = new NotificationService(this.app);

      await notificationService.sendNotification({
        userId,
        type,
        data
      });

      return { sent: true, timestamp: Date.now() };

    } catch (error) {
      this.app.log.error({
        jobId: job.id,
        error: error.message
      }, 'Notification job failed');
      
      throw error;
    }
  }

  /** Dispatches a maintenance task by `type`; unknown types fail the job. */
  private async processCleanupJob(job: any) {
    const { type, olderThan } = job.data;
    
    try {
      switch (type) {
        case 'old_analysis':
          await this.cleanupOldAnalysis(olderThan);
          break;
        case 'temp_files':
          await this.cleanupTempFiles(olderThan);
          break;
        case 'expired_cache':
          await this.cleanupExpiredCache();
          break;
        default:
          throw new Error(`Unknown cleanup type: ${type}`);
      }

      return { cleaned: true, type, timestamp: Date.now() };

    } catch (error) {
      this.app.log.error({
        jobId: job.id,
        error: error.message
      }, 'Cleanup job failed');
      
      throw error;
    }
  }

  // Public methods

  /** Enqueues one job; throws if the queue name is unknown. */
  async addJob(queueName: string, data: any, options?: any) {
    const queue = this.queues.get(queueName);
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`);
    }

    return await queue.add(queueName, data, options);
  }

  /** Enqueues many jobs in one round trip via addBulk. */
  async addBulkJobs(queueName: string, jobs: Array<{ data: any; options?: any }>) {
    const queue = this.queues.get(queueName);
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`);
    }

    return await queue.addBulk(jobs.map(job => ({
      name: queueName,
      data: job.data,
      opts: job.options
    })));
  }

  /** Enqueues a job to run after `delay` milliseconds. */
  async scheduleJob(queueName: string, data: any, delay: number) {
    return await this.addJob(queueName, data, { delay });
  }

  /**
   * Enqueues a repeating job on a cron expression.
   * NOTE(review): newer BullMQ renamed `repeat.cron` to
   * `repeat.pattern` — verify against the pinned version.
   */
  async scheduleRecurringJob(queueName: string, data: any, cronExpression: string) {
    return await this.addJob(queueName, data, {
      repeat: { cron: cronExpression }
    });
  }

  /**
   * Returns job counts for one queue. Uses getJobCounts() so Redis only
   * returns counters, instead of materializing every job object just to
   * take .length of each list.
   */
  async getQueueStats(queueName: string) {
    const queue = this.queues.get(queueName);
    if (!queue) {
      throw new Error(`Queue ${queueName} not found`);
    }

    const counts = await queue.getJobCounts(
      'waiting', 'active', 'completed', 'failed', 'delayed'
    );

    return {
      waiting: counts.waiting,
      active: counts.active,
      completed: counts.completed,
      failed: counts.failed,
      delayed: counts.delayed
    };
  }

  /**
   * Gracefully closes every worker, scheduler and queue, then quits the
   * shared Redis connection. Call during application shutdown.
   */
  async closeAll() {
    const promises = [];
    
    for (const worker of this.workers.values()) {
      promises.push(worker.close());
    }
    
    for (const scheduler of this.schedulers.values()) {
      promises.push(scheduler.close());
    }
    
    for (const queue of this.queues.values()) {
      promises.push(queue.close());
    }

    await Promise.all(promises);
    await this.redis.quit();
    
    this.app.log.info('All queues and workers closed');
  }

  // Helper methods

  /**
   * Fetches the most recent stored score for a project, defaulting to a
   * neutral 50 when no prior analysis exists.
   */
  private async getPreviousScore(contractAddress: string, network: string): Promise<number> {
    const result = await this.app.db.query(`
      SELECT kaizen_score 
      FROM analysis_results ar
      JOIN projects p ON ar.project_id = p.id
      WHERE p.contract_address = $1 AND p.network = $2
      ORDER BY ar.created_at DESC
      LIMIT 1
    `, [contractAddress, network]);

    return result.rows[0]?.kaizen_score || 50; // Default to neutral score
  }

  /**
   * Deletes analysis rows older than the given interval string
   * (e.g. '90 days'). The value is passed as a bind parameter and cast
   * with ::interval — the previous version interpolated it directly
   * into the SQL text, which was an injection risk for any
   * attacker-influenced job payload.
   */
  private async cleanupOldAnalysis(olderThan: string) {
    const result = await this.app.db.query(`
      DELETE FROM analysis_results 
      WHERE created_at < NOW() - $1::interval
    `, [olderThan]);
    
    this.app.log.info(`Cleaned up ${result.rowCount} old analysis records`);
  }

  /** Placeholder: temp-file cleanup depends on the file storage system. */
  private async cleanupTempFiles(olderThan: string) {
    // Implementation depends on file storage system
    this.app.log.info('Temp files cleanup completed');
  }

  /**
   * Deletes cache:* keys that have NO TTL (ttl === -1), i.e. keys
   * written without an expiry that would otherwise live forever.
   * Iterates with SCAN instead of KEYS so Redis is never blocked on a
   * large keyspace.
   */
  private async cleanupExpiredCache() {
    let cleaned = 0;
    let cursor = '0';

    do {
      const [nextCursor, keys] = await this.redis.scan(
        cursor, 'MATCH', 'cache:*', 'COUNT', 100
      );
      cursor = nextCursor;

      for (const key of keys) {
        const ttl = await this.redis.ttl(key);
        if (ttl === -1) { // persistent key — was never given an expiry
          await this.redis.del(key);
          cleaned++;
        }
      }
    } while (cursor !== '0');

    this.app.log.info(`Cleaned up ${cleaned} persistent cache entries`);
  }
}

Caching Strategy

Redis Implementation

Multi-Layer Caching Architecture

// cache/cacheManager.ts
import IORedis from 'ioredis';
import { FastifyInstance } from 'fastify';

// Configuration for CacheManager's Redis-backed layer.
interface CacheConfig {
  redis: {
    host: string;
    port: number;
    password?: string;
    db: number;          // Redis logical database index
  };
  defaultTTL: number;    // seconds, used when set() gets no explicit TTL
  keyPrefix: string;     // prepended by ioredis to every key
}

/**
 * Two-level cache: an in-process Map (L1, short TTL) in front of Redis (L2).
 * Reads check L1 first and promote Redis hits into L1; writes go to both
 * levels. Also provides pattern invalidation, cache warming, and hit/miss
 * statistics.
 */
export class CacheManager {
  private redis: IORedis;
  // L1 cache: raw value plus absolute expiry timestamp in milliseconds.
  private localCache: Map<string, { value: any; expiry: number }> = new Map();
  private cacheHits = 0;
  private cacheMisses = 0;
  // Handle for the periodic L1 sweep so close() can stop it.
  private cleanupTimer: ReturnType<typeof setInterval>;

  constructor(
    private config: CacheConfig,
    private app: FastifyInstance
  ) {
    this.redis = new IORedis({
      host: config.redis.host,
      port: config.redis.port,
      password: config.redis.password,
      db: config.redis.db,
      keyPrefix: config.keyPrefix,
      // NOTE(review): `retryDelayOnFailover` is not a documented ioredis
      // option (ioredis exposes `retryStrategy`) — confirm it is intentional.
      retryDelayOnFailover: 100,
      enableReadyCheck: false,
      lazyConnect: true,
    });

    // Periodically evict expired L1 entries. The handle is retained so that
    // close() can clear it — previously the interval was never cleared and
    // kept the event loop alive after shutdown.
    this.cleanupTimer = setInterval(() => this.cleanupLocalCache(), 60000); // Every minute
  }

  /**
   * Multi-level read: L1 first, then Redis. A Redis hit is promoted into L1
   * with a short TTL.
   *
   * @returns The cached value, or null on a miss (or Redis error).
   */
  async get<T = any>(key: string): Promise<T | null> {
    // Level 1: local cache (fastest). The wrapper object distinguishes a
    // genuine miss (undefined) from a cached falsy value such as null or 0.
    const localHit = this.getFromLocalCache(key);
    if (localHit !== undefined) {
      this.cacheHits++;
      return localHit.value;
    }

    // Level 2: Redis cache
    try {
      const redisResult = await this.redis.get(key);
      if (redisResult !== null) {
        const parsed = JSON.parse(redisResult);

        // Promote into the local cache for subsequent requests.
        this.setInLocalCache(key, parsed, 300); // 5 minutes local TTL

        this.cacheHits++;
        return parsed;
      }
    } catch (error) {
      // A Redis outage degrades to a cache miss instead of failing the caller.
      this.app.log.error('Redis get error:', error);
    }

    this.cacheMisses++;
    return null;
  }

  /**
   * Multi-level write: Redis with the full TTL, L1 with a TTL capped at
   * 5 minutes so stale local copies age out quickly.
   *
   * @throws Re-throws Redis errors so callers know the write did not stick.
   */
  async set(key: string, value: any, ttl?: number): Promise<void> {
    const actualTTL = ttl || this.config.defaultTTL;

    try {
      // Set in Redis with TTL (setex takes seconds).
      await this.redis.setex(key, actualTTL, JSON.stringify(value));

      // Mirror into the local cache with a shorter TTL.
      const localTTL = Math.min(actualTTL, 300); // Max 5 minutes local
      this.setInLocalCache(key, value, localTTL);

    } catch (error) {
      this.app.log.error('Cache set error:', error);
      throw error;
    }
  }

  // --- Specialized cache helpers for the domain data types ---

  /** Caches an analysis result under `analysis:<network>:<address>` (default 15 min). */
  async cacheAnalysisResult(contractAddress: string, network: string, result: any, ttl = 900) {
    const key = `analysis:${network}:${contractAddress}`;
    await this.set(key, result, ttl);
  }

  /** Reads a cached analysis result, or null when absent. */
  async getCachedAnalysisResult(contractAddress: string, network: string) {
    const key = `analysis:${network}:${contractAddress}`;
    return await this.get(key);
  }

  /** Caches price data under `price:<network>:<address>` (default 5 min). */
  async cacheTokenPrice(contractAddress: string, network: string, priceData: any, ttl = 300) {
    const key = `price:${network}:${contractAddress}`;
    await this.set(key, priceData, ttl);
  }

  /** Reads cached price data, or null when absent. */
  async getCachedTokenPrice(contractAddress: string, network: string) {
    const key = `price:${network}:${contractAddress}`;
    return await this.get(key);
  }

  /** Caches social data under `social:<platform>:<address>` (default 10 min). */
  async cacheSocialData(contractAddress: string, platform: string, data: any, ttl = 600) {
    const key = `social:${platform}:${contractAddress}`;
    await this.set(key, data, ttl);
  }

  /** Reads cached social data, or null when absent. */
  async getCachedSocialData(contractAddress: string, platform: string) {
    const key = `social:${platform}:${contractAddress}`;
    return await this.get(key);
  }

  // --- Batch operations (Redis only; the local cache is not consulted) ---

  /**
   * Bulk read. On any Redis error the whole batch degrades to nulls.
   *
   * @returns One parsed value (or null) per requested key, in order.
   */
  async mget(keys: string[]): Promise<Array<any | null>> {
    try {
      const results = await this.redis.mget(...keys);
      return results.map(result => result ? JSON.parse(result) : null);
    } catch (error) {
      this.app.log.error('Redis mget error:', error);
      return keys.map(() => null);
    }
  }

  /**
   * Bulk write via a single pipeline round-trip; each entry may carry its own
   * TTL, falling back to the configured default.
   *
   * @throws Re-throws pipeline errors.
   */
  async mset(keyValuePairs: Array<{ key: string; value: any; ttl?: number }>): Promise<void> {
    const pipeline = this.redis.pipeline();

    keyValuePairs.forEach(({ key, value, ttl }) => {
      const actualTTL = ttl || this.config.defaultTTL;
      pipeline.setex(key, actualTTL, JSON.stringify(value));
    });

    try {
      await pipeline.exec();
    } catch (error) {
      this.app.log.error('Redis mset error:', error);
      throw error;
    }
  }

  /**
   * Deletes every key matching a Redis glob pattern from both Redis and the
   * local cache.
   *
   * @returns Number of Redis keys deleted (0 on error or no match).
   */
  async invalidate(pattern: string): Promise<number> {
    try {
      // NOTE: KEYS blocks Redis on large keyspaces; acceptable for the
      // narrow patterns used here, but switch to SCAN if patterns widen.
      const keys = await this.redis.keys(pattern);
      if (keys.length === 0) return 0;

      // ioredis returns KEYS replies with the keyPrefix still attached, yet
      // re-applies the prefix to each key argument of DEL. Strip it first so
      // we do not attempt to delete doubly-prefixed (nonexistent) keys.
      const prefix = this.config.keyPrefix;
      const unprefixed = prefix
        ? keys.map(k => (k.startsWith(prefix) ? k.slice(prefix.length) : k))
        : keys;

      const result = await this.redis.del(...unprefixed);

      // Mirror the invalidation into the local cache.
      for (const [localKey] of this.localCache) {
        if (this.matchesPattern(localKey, pattern)) {
          this.localCache.delete(localKey);
        }
      }

      return result;
    } catch (error) {
      this.app.log.error('Cache invalidation error:', error);
      return 0;
    }
  }

  /** Invalidates every cached artifact (analysis, price, social, intel) for one project. */
  async invalidateProject(contractAddress: string, network: string): Promise<void> {
    const patterns = [
      `analysis:${network}:${contractAddress}`,
      `price:${network}:${contractAddress}`,
      `social:*:${contractAddress}`,
      `intel:${contractAddress}`
    ];

    for (const pattern of patterns) {
      await this.invalidate(pattern);
    }
  }

  /**
   * Pre-populates the analysis cache for popular projects with an extended
   * TTL. Failures are logged per project and do not abort the warm-up.
   */
  async warmAnalysisCache(popularProjects: Array<{ contractAddress: string; network: string }>) {
    // Dynamic import avoids a circular dependency at module load time.
    const { AnalysisService } = await import('../services/analysisService');
    const analysisService = new AnalysisService(this.app);

    for (const project of popularProjects) {
      try {
        const result = await analysisService.getQuickAnalysis(
          project.contractAddress,
          project.network
        );

        await this.cacheAnalysisResult(
          project.contractAddress,
          project.network,
          result,
          1800 // 30 minutes for popular projects
        );

        this.app.log.debug(`Warmed cache for ${project.contractAddress}`);
      } catch (error) {
        this.app.log.warn(`Failed to warm cache for ${project.contractAddress}:`, error);
      }
    }
  }

  // --- Local (L1) cache internals ---

  /**
   * Returns `{ value }` on a live hit, `undefined` on a miss. The wrapper
   * exists because cached values may legitimately be null/falsy; the previous
   * bare-null sentinel made those indistinguishable from a miss.
   */
  private getFromLocalCache(key: string): { value: any } | undefined {
    const entry = this.localCache.get(key);
    if (!entry) return undefined;

    if (Date.now() > entry.expiry) {
      // Lazily evict on read.
      this.localCache.delete(key);
      return undefined;
    }

    return { value: entry.value };
  }

  /** Stores a value in L1 with a TTL given in seconds. */
  private setInLocalCache(key: string, value: any, ttl: number): void {
    const expiry = Date.now() + (ttl * 1000);
    this.localCache.set(key, { value, expiry });
  }

  /** Periodic sweep that evicts expired L1 entries (driven by cleanupTimer). */
  private cleanupLocalCache(): void {
    const now = Date.now();
    let cleaned = 0;

    for (const [key, entry] of this.localCache) {
      if (now > entry.expiry) {
        this.localCache.delete(key);
        cleaned++;
      }
    }

    if (cleaned > 0) {
      this.app.log.debug(`Cleaned ${cleaned} expired local cache entries`);
    }
  }

  /**
   * Matches a key against a Redis glob pattern (only `*` is supported).
   * Metacharacters are escaped and the expression anchored; the previous
   * unescaped, unanchored regex let `.` match any character and treated
   * every pattern as a substring match.
   */
  private matchesPattern(key: string, pattern: string): boolean {
    const escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
    const regex = new RegExp(`^${escaped.replace(/\*/g, '.*')}$`);
    return regex.test(key);
  }

  // --- Monitoring and statistics ---

  /** Snapshot of hit/miss counters, L1 size, and Redis connection state. */
  getCacheStats() {
    const total = this.cacheHits + this.cacheMisses;

    return {
      hits: this.cacheHits,
      misses: this.cacheMisses,
      // Guard the zero-total case explicitly instead of relying on NaN || 0.
      hitRate: total > 0 ? this.cacheHits / total : 0,
      localCacheSize: this.localCache.size,
      redisConnected: this.redis.status === 'ready'
    };
  }

  /** Fetches and parses Redis INFO sections (memory, keyspace); null on error. */
  async getRedisInfo(): Promise<any> {
    try {
      const info = await this.redis.info('memory');
      const keyspace = await this.redis.info('keyspace');

      return {
        memory: this.parseRedisInfo(info),
        keyspace: this.parseRedisInfo(keyspace)
      };
    } catch (error) {
      this.app.log.error('Failed to get Redis info:', error);
      return null;
    }
  }

  /** Parses the `key:value` lines of a Redis INFO reply into a flat record. */
  private parseRedisInfo(infoString: string): Record<string, string> {
    const result: Record<string, string> = {};

    infoString.split('\n').forEach(line => {
      const [key, value] = line.split(':');
      if (key && value) {
        result[key.trim()] = value.trim();
      }
    });

    return result;
  }

  /** Stops the L1 sweep timer, closes the Redis connection, and clears L1. */
  async close(): Promise<void> {
    // Stop the periodic sweep first — leaving it running leaked the timer
    // and kept the process alive after shutdown.
    clearInterval(this.cleanupTimer);
    await this.redis.quit();
    this.localCache.clear();
    this.app.log.info('Cache manager closed');
  }
}

This comprehensive technical infrastructure documentation provides the foundation for understanding, deploying, and maintaining Kaizen AI's scalable architecture across frontend, backend, database, messaging, and caching layers.

Last updated