📈 Stock Trading Agent
AI-powered trading with real-time market analysis and risk management
📋 System Overview
This example demonstrates an intelligent stock trading system using AMCP agents for market analysis, trading decisions, and risk management. The system processes real-time market data and executes trades based on AI-driven strategies.
Trading System Architecture
Market Data Agent
→
Analysis Agent
→
Strategy Agent
→
Risk Manager
→
Execution Agent
✨ Key Features
Real-time Market Data
Live market data processing from multiple exchanges and sources
AI-Powered Analysis
Machine learning models for technical and sentiment analysis
High-Frequency Trading
Sub-millisecond trade execution with optimized algorithms
Risk Management
Advanced risk controls and portfolio optimization
💻 Implementation
Market Data Agent
@Agent("market-data-collector")
public class MarketDataAgent extends Agent {
private final MarketDataProvider dataProvider;
private final PriceCache priceCache;
@Override
public void onActivation() {
subscribe("market.data.subscribe");
subscribe("market.data.unsubscribe");
// Start real-time data streams
startDataStreams();
}
@EventHandler
@Subscribe("market.data.subscribe")
public void subscribeToSymbol(Event event) {
SymbolSubscription subscription = event.getPayload(SymbolSubscription.class);
dataProvider.subscribe(subscription.getSymbol(), this::onPriceUpdate);
log("Subscribed to market data for: " + subscription.getSymbol());
}
private void onPriceUpdate(PriceData priceData) {
// Cache latest price
priceCache.updatePrice(priceData);
// Publish price update event
Event priceEvent = Event.builder()
.topic("market.price.update." + priceData.getSymbol())
.payload(priceData)
.timestamp(priceData.getTimestamp())
.build();
publish(priceEvent);
// Check for significant price movements
if (isSignificantMovement(priceData)) {
Event alertEvent = Event.builder()
.topic("market.price.alert")
.payload(PriceAlert.from(priceData))
.priority(EventPriority.HIGH)
.build();
publish(alertEvent);
}
}
private boolean isSignificantMovement(PriceData current) {
PriceData previous = priceCache.getPreviousPrice(current.getSymbol());
if (previous == null) return false;
double changePercent = Math.abs(
(current.getPrice() - previous.getPrice()) / previous.getPrice() * 100
);
return changePercent > 2.0; // 2% threshold
}
}
Technical Analysis Agent
@Agent("technical-analysis")
public class TechnicalAnalysisAgent extends Agent {
private final TechnicalIndicatorCalculator calculator;
private final Map<String, PriceHistory> priceHistories = new ConcurrentHashMap<>();
@Override
public void onActivation() {
subscribe("market.price.update.*");
subscribe("analysis.request.*");
}
@EventHandler
@Subscribe("market.price.update.*")
public void updateTechnicalIndicators(Event event) {
PriceData priceData = event.getPayload(PriceData.class);
String symbol = priceData.getSymbol();
// Update price history
PriceHistory history = priceHistories.computeIfAbsent(
symbol, k -> new PriceHistory(symbol, 200) // Keep 200 periods
);
history.addPrice(priceData);
// Calculate technical indicators
TechnicalIndicators indicators = calculateIndicators(history);
// Publish technical analysis
Event analysisEvent = Event.builder()
.topic("analysis.technical." + symbol)
.payload(indicators)
.build();
publish(analysisEvent);
// Generate trading signals
List<TradingSignal> signals = generateSignals(indicators, history);
for (TradingSignal signal : signals) {
Event signalEvent = Event.builder()
.topic("trading.signal." + signal.getType().name().toLowerCase())
.payload(signal)
.build();
publish(signalEvent);
}
}
private TechnicalIndicators calculateIndicators(PriceHistory history) {
List<Double> prices = history.getClosePrices();
List<Double> volumes = history.getVolumes();
return TechnicalIndicators.builder()
.symbol(history.getSymbol())
.sma20(calculator.sma(prices, 20))
.sma50(calculator.sma(prices, 50))
.ema12(calculator.ema(prices, 12))
.ema26(calculator.ema(prices, 26))
.rsi(calculator.rsi(prices, 14))
.macd(calculator.macd(prices, 12, 26, 9))
.bollingerBands(calculator.bollingerBands(prices, 20, 2.0))
.volume(calculator.volumeIndicators(volumes, prices))
.timestamp(Instant.now())
.build();
}
private List<TradingSignal> generateSignals(TechnicalIndicators indicators, PriceHistory history) {
List<TradingSignal> signals = new ArrayList<>();
// Moving Average Crossover
if (indicators.getSma20() > indicators.getSma50() &&
history.wasCrossoverRecent(20, 50)) {
signals.add(TradingSignal.bullish(
history.getSymbol(),
"SMA20 crossed above SMA50",
0.7
));
}
// RSI Oversold/Overbought
if (indicators.getRsi() < 30) {
signals.add(TradingSignal.bullish(
history.getSymbol(),
"RSI oversold condition",
0.6
));
} else if (indicators.getRsi() > 70) {
signals.add(TradingSignal.bearish(
history.getSymbol(),
"RSI overbought condition",
0.6
));
}
// MACD Signal
MacdData macd = indicators.getMacd();
if (macd.getMacdLine() > macd.getSignalLine() &&
macd.wasRecentCrossover()) {
signals.add(TradingSignal.bullish(
history.getSymbol(),
"MACD bullish crossover",
0.8
));
}
return signals;
}
}
Trading Strategy Agent
@Agent("trading-strategy")
public class TradingStrategyAgent extends Agent {
private final StrategyEngine strategyEngine;
private final PortfolioService portfolioService;
private final RiskCalculator riskCalculator;
@Override
public void onActivation() {
subscribe("trading.signal.*");
subscribe("analysis.technical.*");
subscribe("market.sentiment.*");
}
@EventHandler
@Subscribe("trading.signal.*")
public void processSignal(Event event) {
TradingSignal signal = event.getPayload(TradingSignal.class);
// Get current portfolio state
Portfolio portfolio = portfolioService.getCurrentPortfolio();
// Calculate position sizing
PositionSize positionSize = calculatePositionSize(signal, portfolio);
if (positionSize.getSize() > 0) {
// Generate trading recommendation
TradingRecommendation recommendation = TradingRecommendation.builder()
.symbol(signal.getSymbol())
.action(signal.getType() == SignalType.BULLISH ? Action.BUY : Action.SELL)
.quantity(positionSize.getSize())
.confidence(signal.getConfidence())
.reasoning(signal.getReason())
.riskLevel(positionSize.getRiskLevel())
.build();
// Publish recommendation for risk review
Event recommendationEvent = Event.builder()
.topic("trading.recommendation")
.payload(recommendation)
.correlationId(event.getCorrelationId())
.build();
publish(recommendationEvent);
}
}
@EventHandler
@Subscribe("analysis.technical.*")
public void updateStrategyParameters(Event event) {
TechnicalIndicators indicators = event.getPayload(TechnicalIndicators.class);
// Update strategy parameters based on market conditions
MarketRegime regime = determineMarketRegime(indicators);
strategyEngine.updateParameters(indicators.getSymbol(), regime);
// Publish market regime update
Event regimeEvent = Event.builder()
.topic("market.regime.update")
.payload(MarketRegimeUpdate.builder()
.symbol(indicators.getSymbol())
.regime(regime)
.confidence(calculateRegimeConfidence(indicators))
.build())
.build();
publish(regimeEvent);
}
private PositionSize calculatePositionSize(TradingSignal signal, Portfolio portfolio) {
// Kelly Criterion for position sizing
double winRate = strategyEngine.getWinRate(signal.getSymbol());
double avgWin = strategyEngine.getAverageWin(signal.getSymbol());
double avgLoss = strategyEngine.getAverageLoss(signal.getSymbol());
double kellyPercent = (winRate * avgWin - (1 - winRate) * avgLoss) / avgWin;
// Apply confidence and risk adjustments
double adjustedPercent = kellyPercent * signal.getConfidence() * 0.5; // Conservative
// Calculate position size
double portfolioValue = portfolio.getTotalValue();
double positionValue = portfolioValue * Math.min(adjustedPercent, 0.1); // Max 10%
// Get current price
double currentPrice = getCurrentPrice(signal.getSymbol());
int shares = (int) (positionValue / currentPrice);
return PositionSize.builder()
.size(shares)
.riskLevel(calculateRiskLevel(adjustedPercent))
.maxLoss(shares * currentPrice * 0.02) // 2% stop loss
.build();
}
}
Risk Management Agent
@Agent("risk-manager")
public class RiskManagerAgent extends Agent {
private final RiskLimits riskLimits;
private final PortfolioService portfolioService;
private final VaRCalculator varCalculator;
@Override
public void onActivation() {
subscribe("trading.recommendation");
subscribe("portfolio.update");
subscribe("market.volatility.alert");
}
@EventHandler
@Subscribe("trading.recommendation")
public void reviewTradingRecommendation(Event event) {
TradingRecommendation recommendation = event.getPayload(TradingRecommendation.class);
// Perform comprehensive risk checks
RiskAssessment assessment = performRiskAssessment(recommendation);
if (assessment.isApproved()) {
// Approve trade
Event approvalEvent = Event.builder()
.topic("trading.approved")
.payload(TradeApproval.builder()
.recommendation(recommendation)
.riskAssessment(assessment)
.approvalTimestamp(Instant.now())
.build())
.correlationId(event.getCorrelationId())
.build();
publish(approvalEvent);
} else {
// Reject trade
Event rejectionEvent = Event.builder()
.topic("trading.rejected")
.payload(TradeRejection.builder()
.recommendation(recommendation)
.rejectionReasons(assessment.getRejectionReasons())
.build())
.correlationId(event.getCorrelationId())
.build();
publish(rejectionEvent);
}
}
private RiskAssessment performRiskAssessment(TradingRecommendation recommendation) {
List<String> issues = new ArrayList<>();
Portfolio portfolio = portfolioService.getCurrentPortfolio();
// Check position size limits
double positionValue = recommendation.getQuantity() * getCurrentPrice(recommendation.getSymbol());
double portfolioValue = portfolio.getTotalValue();
double positionPercent = positionValue / portfolioValue;
if (positionPercent > riskLimits.getMaxPositionSize()) {
issues.add("Position size exceeds limit: " + positionPercent + " > " + riskLimits.getMaxPositionSize());
}
// Check sector concentration
double sectorExposure = calculateSectorExposure(recommendation.getSymbol(), portfolio);
if (sectorExposure > riskLimits.getMaxSectorExposure()) {
issues.add("Sector exposure exceeds limit: " + sectorExposure);
}
// Check correlation risk
double correlationRisk = calculateCorrelationRisk(recommendation.getSymbol(), portfolio);
if (correlationRisk > riskLimits.getMaxCorrelationRisk()) {
issues.add("Correlation risk too high: " + correlationRisk);
}
// Check VaR impact
double newVaR = varCalculator.calculateVaRWithNewPosition(portfolio, recommendation);
if (newVaR > riskLimits.getMaxVaR()) {
issues.add("VaR would exceed limit: " + newVaR);
}
// Check drawdown limits
double currentDrawdown = portfolio.getCurrentDrawdown();
if (currentDrawdown > riskLimits.getMaxDrawdown()) {
issues.add("Portfolio drawdown exceeds limit: " + currentDrawdown);
}
return RiskAssessment.builder()
.approved(issues.isEmpty())
.rejectionReasons(issues)
.riskScore(calculateRiskScore(recommendation, portfolio))
.build();
}
@EventHandler
@Subscribe("portfolio.update")
public void monitorPortfolioRisk(Event event) {
Portfolio portfolio = event.getPayload(Portfolio.class);
// Calculate current risk metrics
RiskMetrics metrics = calculateRiskMetrics(portfolio);
// Check for risk limit breaches
List<RiskAlert> alerts = checkRiskLimits(metrics);
if (!alerts.isEmpty()) {
// Publish risk alerts
for (RiskAlert alert : alerts) {
Event alertEvent = Event.builder()
.topic("risk.alert." + alert.getSeverity().name().toLowerCase())
.payload(alert)
.priority(alert.getSeverity() == Severity.HIGH ?
EventPriority.HIGH : EventPriority.NORMAL)
.build();
publish(alertEvent);
}
}
// Publish risk metrics update
Event metricsEvent = Event.builder()
.topic("risk.metrics.update")
.payload(metrics)
.build();
publish(metricsEvent);
}
}
🚀 Running the Trading System
1. Prerequisites
- Java 17+ and Python 3.9+
- Apache Kafka cluster
- Market data API access (Alpha Vantage, IEX)
- Trading broker API (Alpaca, Interactive Brokers)
2. Configuration
# application.properties
amcp.broker.url=kafka://localhost:9092
market.data.provider=alphavantage
market.data.api.key=your_api_key
trading.broker=alpaca
trading.api.key=your_trading_key
trading.api.secret=your_trading_secret
risk.max.position.size=0.1
risk.max.portfolio.var=0.05
3. Start the System
# Start all trading agents
mvn spring-boot:run -Dspring-boot.run.main-class=com.example.TradingSystemApp
# Monitor trading activity
kafka-console-consumer --topic trading.approved
kafka-console-consumer --topic risk.alert.high
📊 Performance Metrics
Processing Speed
Sub-millisecond trade signal processing and execution
<0.5ms latency
Risk Management
Comprehensive risk controls with real-time monitoring
99.9% risk compliance
Strategy Performance
AI-driven strategies with adaptive parameters
15%+ annual returns