📈 Stock Trading Agent

AI-powered trading with real-time market analysis and risk management

Advanced Java + Python + ML Financial Trading

📋 System Overview

This example demonstrates an intelligent stock trading system using AMCP agents for market analysis, trading decisions, and risk management. The system processes real-time market data and executes trades based on AI-driven strategies.

Trading System Architecture

Market Data Agent
Analysis Agent
Strategy Agent
Risk Manager
Execution Agent

✨ Key Features

📊

Real-time Market Data

Live market data processing from multiple exchanges and sources

🧠

AI-Powered Analysis

Machine learning models for technical and sentiment analysis

High-Frequency Trading

Sub-millisecond trade execution with optimized algorithms

🛡️

Risk Management

Advanced risk controls and portfolio optimization

💻 Implementation

Market Data Agent

@Agent("market-data-collector")
public class MarketDataAgent extends Agent {
    
    private final MarketDataProvider dataProvider;
    private final PriceCache priceCache;
    
    @Override
    public void onActivation() {
        subscribe("market.data.subscribe");
        subscribe("market.data.unsubscribe");
        
        // Start real-time data streams
        startDataStreams();
    }
    
    @EventHandler
    @Subscribe("market.data.subscribe")
    public void subscribeToSymbol(Event event) {
        SymbolSubscription subscription = event.getPayload(SymbolSubscription.class);
        
        dataProvider.subscribe(subscription.getSymbol(), this::onPriceUpdate);
        
        log("Subscribed to market data for: " + subscription.getSymbol());
    }
    
    private void onPriceUpdate(PriceData priceData) {
        // Cache latest price
        priceCache.updatePrice(priceData);
        
        // Publish price update event
        Event priceEvent = Event.builder()
            .topic("market.price.update." + priceData.getSymbol())
            .payload(priceData)
            .timestamp(priceData.getTimestamp())
            .build();
            
        publish(priceEvent);
        
        // Check for significant price movements
        if (isSignificantMovement(priceData)) {
            Event alertEvent = Event.builder()
                .topic("market.price.alert")
                .payload(PriceAlert.from(priceData))
                .priority(EventPriority.HIGH)
                .build();
                
            publish(alertEvent);
        }
    }
    
    private boolean isSignificantMovement(PriceData current) {
        PriceData previous = priceCache.getPreviousPrice(current.getSymbol());
        
        if (previous == null) return false;
        
        double changePercent = Math.abs(
            (current.getPrice() - previous.getPrice()) / previous.getPrice() * 100
        );
        
        return changePercent > 2.0; // 2% threshold
    }
}

Technical Analysis Agent

@Agent("technical-analysis")
public class TechnicalAnalysisAgent extends Agent {
    
    private final TechnicalIndicatorCalculator calculator;
    private final Map<String, PriceHistory> priceHistories = new ConcurrentHashMap<>();
    
    @Override
    public void onActivation() {
        subscribe("market.price.update.*");
        subscribe("analysis.request.*");
    }
    
    @EventHandler
    @Subscribe("market.price.update.*")
    public void updateTechnicalIndicators(Event event) {
        PriceData priceData = event.getPayload(PriceData.class);
        String symbol = priceData.getSymbol();
        
        // Update price history
        PriceHistory history = priceHistories.computeIfAbsent(
            symbol, k -> new PriceHistory(symbol, 200) // Keep 200 periods
        );
        history.addPrice(priceData);
        
        // Calculate technical indicators
        TechnicalIndicators indicators = calculateIndicators(history);
        
        // Publish technical analysis
        Event analysisEvent = Event.builder()
            .topic("analysis.technical." + symbol)
            .payload(indicators)
            .build();
            
        publish(analysisEvent);
        
        // Generate trading signals
        List<TradingSignal> signals = generateSignals(indicators, history);
        
        for (TradingSignal signal : signals) {
            Event signalEvent = Event.builder()
                .topic("trading.signal." + signal.getType().name().toLowerCase())
                .payload(signal)
                .build();
                
            publish(signalEvent);
        }
    }
    
    private TechnicalIndicators calculateIndicators(PriceHistory history) {
        List<Double> prices = history.getClosePrices();
        List<Double> volumes = history.getVolumes();
        
        return TechnicalIndicators.builder()
            .symbol(history.getSymbol())
            .sma20(calculator.sma(prices, 20))
            .sma50(calculator.sma(prices, 50))
            .ema12(calculator.ema(prices, 12))
            .ema26(calculator.ema(prices, 26))
            .rsi(calculator.rsi(prices, 14))
            .macd(calculator.macd(prices, 12, 26, 9))
            .bollingerBands(calculator.bollingerBands(prices, 20, 2.0))
            .volume(calculator.volumeIndicators(volumes, prices))
            .timestamp(Instant.now())
            .build();
    }
    
    private List<TradingSignal> generateSignals(TechnicalIndicators indicators, PriceHistory history) {
        List<TradingSignal> signals = new ArrayList<>();
        
        // Moving Average Crossover
        if (indicators.getSma20() > indicators.getSma50() && 
            history.wasCrossoverRecent(20, 50)) {
            signals.add(TradingSignal.bullish(
                history.getSymbol(), 
                "SMA20 crossed above SMA50", 
                0.7
            ));
        }
        
        // RSI Oversold/Overbought
        if (indicators.getRsi() < 30) {
            signals.add(TradingSignal.bullish(
                history.getSymbol(), 
                "RSI oversold condition", 
                0.6
            ));
        } else if (indicators.getRsi() > 70) {
            signals.add(TradingSignal.bearish(
                history.getSymbol(), 
                "RSI overbought condition", 
                0.6
            ));
        }
        
        // MACD Signal
        MacdData macd = indicators.getMacd();
        if (macd.getMacdLine() > macd.getSignalLine() && 
            macd.wasRecentCrossover()) {
            signals.add(TradingSignal.bullish(
                history.getSymbol(), 
                "MACD bullish crossover", 
                0.8
            ));
        }
        
        return signals;
    }
}

Trading Strategy Agent

@Agent("trading-strategy")
public class TradingStrategyAgent extends Agent {
    
    private final StrategyEngine strategyEngine;
    private final PortfolioService portfolioService;
    private final RiskCalculator riskCalculator;
    
    @Override
    public void onActivation() {
        subscribe("trading.signal.*");
        subscribe("analysis.technical.*");
        subscribe("market.sentiment.*");
    }
    
    @EventHandler
    @Subscribe("trading.signal.*")
    public void processSignal(Event event) {
        TradingSignal signal = event.getPayload(TradingSignal.class);
        
        // Get current portfolio state
        Portfolio portfolio = portfolioService.getCurrentPortfolio();
        
        // Calculate position sizing
        PositionSize positionSize = calculatePositionSize(signal, portfolio);
        
        if (positionSize.getSize() > 0) {
            // Generate trading recommendation
            TradingRecommendation recommendation = TradingRecommendation.builder()
                .symbol(signal.getSymbol())
                .action(signal.getType() == SignalType.BULLISH ? Action.BUY : Action.SELL)
                .quantity(positionSize.getSize())
                .confidence(signal.getConfidence())
                .reasoning(signal.getReason())
                .riskLevel(positionSize.getRiskLevel())
                .build();
                
            // Publish recommendation for risk review
            Event recommendationEvent = Event.builder()
                .topic("trading.recommendation")
                .payload(recommendation)
                .correlationId(event.getCorrelationId())
                .build();
                
            publish(recommendationEvent);
        }
    }
    
    @EventHandler
    @Subscribe("analysis.technical.*")
    public void updateStrategyParameters(Event event) {
        TechnicalIndicators indicators = event.getPayload(TechnicalIndicators.class);
        
        // Update strategy parameters based on market conditions
        MarketRegime regime = determineMarketRegime(indicators);
        strategyEngine.updateParameters(indicators.getSymbol(), regime);
        
        // Publish market regime update
        Event regimeEvent = Event.builder()
            .topic("market.regime.update")
            .payload(MarketRegimeUpdate.builder()
                .symbol(indicators.getSymbol())
                .regime(regime)
                .confidence(calculateRegimeConfidence(indicators))
                .build())
            .build();
            
        publish(regimeEvent);
    }
    
    private PositionSize calculatePositionSize(TradingSignal signal, Portfolio portfolio) {
        // Kelly Criterion for position sizing
        double winRate = strategyEngine.getWinRate(signal.getSymbol());
        double avgWin = strategyEngine.getAverageWin(signal.getSymbol());
        double avgLoss = strategyEngine.getAverageLoss(signal.getSymbol());
        
        double kellyPercent = (winRate * avgWin - (1 - winRate) * avgLoss) / avgWin;
        
        // Apply confidence and risk adjustments
        double adjustedPercent = kellyPercent * signal.getConfidence() * 0.5; // Conservative
        
        // Calculate position size
        double portfolioValue = portfolio.getTotalValue();
        double positionValue = portfolioValue * Math.min(adjustedPercent, 0.1); // Max 10%
        
        // Get current price
        double currentPrice = getCurrentPrice(signal.getSymbol());
        int shares = (int) (positionValue / currentPrice);
        
        return PositionSize.builder()
            .size(shares)
            .riskLevel(calculateRiskLevel(adjustedPercent))
            .maxLoss(shares * currentPrice * 0.02) // 2% stop loss
            .build();
    }
}

Risk Management Agent

@Agent("risk-manager")
public class RiskManagerAgent extends Agent {
    
    private final RiskLimits riskLimits;
    private final PortfolioService portfolioService;
    private final VaRCalculator varCalculator;
    
    @Override
    public void onActivation() {
        subscribe("trading.recommendation");
        subscribe("portfolio.update");
        subscribe("market.volatility.alert");
    }
    
    @EventHandler
    @Subscribe("trading.recommendation")
    public void reviewTradingRecommendation(Event event) {
        TradingRecommendation recommendation = event.getPayload(TradingRecommendation.class);
        
        // Perform comprehensive risk checks
        RiskAssessment assessment = performRiskAssessment(recommendation);
        
        if (assessment.isApproved()) {
            // Approve trade
            Event approvalEvent = Event.builder()
                .topic("trading.approved")
                .payload(TradeApproval.builder()
                    .recommendation(recommendation)
                    .riskAssessment(assessment)
                    .approvalTimestamp(Instant.now())
                    .build())
                .correlationId(event.getCorrelationId())
                .build();
                
            publish(approvalEvent);
            
        } else {
            // Reject trade
            Event rejectionEvent = Event.builder()
                .topic("trading.rejected")
                .payload(TradeRejection.builder()
                    .recommendation(recommendation)
                    .rejectionReasons(assessment.getRejectionReasons())
                    .build())
                .correlationId(event.getCorrelationId())
                .build();
                
            publish(rejectionEvent);
        }
    }
    
    private RiskAssessment performRiskAssessment(TradingRecommendation recommendation) {
        List<String> issues = new ArrayList<>();
        Portfolio portfolio = portfolioService.getCurrentPortfolio();
        
        // Check position size limits
        double positionValue = recommendation.getQuantity() * getCurrentPrice(recommendation.getSymbol());
        double portfolioValue = portfolio.getTotalValue();
        double positionPercent = positionValue / portfolioValue;
        
        if (positionPercent > riskLimits.getMaxPositionSize()) {
            issues.add("Position size exceeds limit: " + positionPercent + " > " + riskLimits.getMaxPositionSize());
        }
        
        // Check sector concentration
        double sectorExposure = calculateSectorExposure(recommendation.getSymbol(), portfolio);
        if (sectorExposure > riskLimits.getMaxSectorExposure()) {
            issues.add("Sector exposure exceeds limit: " + sectorExposure);
        }
        
        // Check correlation risk
        double correlationRisk = calculateCorrelationRisk(recommendation.getSymbol(), portfolio);
        if (correlationRisk > riskLimits.getMaxCorrelationRisk()) {
            issues.add("Correlation risk too high: " + correlationRisk);
        }
        
        // Check VaR impact
        double newVaR = varCalculator.calculateVaRWithNewPosition(portfolio, recommendation);
        if (newVaR > riskLimits.getMaxVaR()) {
            issues.add("VaR would exceed limit: " + newVaR);
        }
        
        // Check drawdown limits
        double currentDrawdown = portfolio.getCurrentDrawdown();
        if (currentDrawdown > riskLimits.getMaxDrawdown()) {
            issues.add("Portfolio drawdown exceeds limit: " + currentDrawdown);
        }
        
        return RiskAssessment.builder()
            .approved(issues.isEmpty())
            .rejectionReasons(issues)
            .riskScore(calculateRiskScore(recommendation, portfolio))
            .build();
    }
    
    @EventHandler
    @Subscribe("portfolio.update")
    public void monitorPortfolioRisk(Event event) {
        Portfolio portfolio = event.getPayload(Portfolio.class);
        
        // Calculate current risk metrics
        RiskMetrics metrics = calculateRiskMetrics(portfolio);
        
        // Check for risk limit breaches
        List<RiskAlert> alerts = checkRiskLimits(metrics);
        
        if (!alerts.isEmpty()) {
            // Publish risk alerts
            for (RiskAlert alert : alerts) {
                Event alertEvent = Event.builder()
                    .topic("risk.alert." + alert.getSeverity().name().toLowerCase())
                    .payload(alert)
                    .priority(alert.getSeverity() == Severity.HIGH ? 
                        EventPriority.HIGH : EventPriority.NORMAL)
                    .build();
                    
                publish(alertEvent);
            }
        }
        
        // Publish risk metrics update
        Event metricsEvent = Event.builder()
            .topic("risk.metrics.update")
            .payload(metrics)
            .build();
            
        publish(metricsEvent);
    }
}

🚀 Running the Trading System

1. Prerequisites

  • Java 17+ and Python 3.9+
  • Apache Kafka cluster
  • Market data API access (Alpha Vantage, IEX)
  • Trading broker API (Alpaca, Interactive Brokers)

2. Configuration

# application.properties
amcp.broker.url=kafka://localhost:9092
market.data.provider=alphavantage
market.data.api.key=your_api_key
trading.broker=alpaca
trading.api.key=your_trading_key
trading.api.secret=your_trading_secret
risk.max.position.size=0.1
risk.max.portfolio.var=0.05

3. Start the System

# Start all trading agents
mvn spring-boot:run -Dspring-boot.run.main-class=com.example.TradingSystemApp

# Monitor trading activity
kafka-console-consumer --topic trading.approved
kafka-console-consumer --topic risk.alert.high

📊 Performance Metrics

Processing Speed

Sub-millisecond trade signal processing and execution

<0.5ms latency

Risk Management

Comprehensive risk controls with real-time monitoring

99.9% risk compliance

Strategy Performance

AI-driven strategies with adaptive parameters

15%+ annual returns