🌤️ Weather Agent Example

Real-time weather monitoring with distributed agent coordination

Intermediate Java + Kafka IoT Monitoring

📋 Overview

This example demonstrates a distributed weather monitoring system using AMCP agents. Multiple weather stations collect data, process it through specialized agents, and coordinate responses to weather events.

System Architecture

Weather Collector
Data Processor
Alert Manager
Notification Service

✨ Key Features

📡

Real-time Data Collection

Agents collect weather data from multiple sources and sensors

🔄

Event-Driven Processing

Asynchronous processing of weather updates and alerts

🚨

Smart Alerting

Intelligent threshold-based weather alert system

📊

Data Aggregation

Statistical analysis and trend detection

💻 Implementation

Weather Collector Agent

@Agent("weather-collector")
public class WeatherCollectorAgent extends BaseAgent {
    
    private final WeatherAPI weatherAPI;
    private final ScheduledExecutorService scheduler;
    
    @Override
    public void onActivation() {
        // Subscribe to collection requests
        subscribe("weather.collect.*");
        
        // Start periodic collection
        scheduler.scheduleAtFixedRate(
            this::collectWeatherData, 0, 5, TimeUnit.MINUTES);
    }
    
    @EventHandler
    @Subscribe("weather.collect.request")
    public void handleCollectionRequest(Event event) {
        String location = event.getPayload(String.class);
        collectWeatherForLocation(location);
    }
    
    private void collectWeatherData() {
        List<String> locations = getMonitoredLocations();
        
        locations.parallelStream().forEach(location -> {
            try {
                WeatherData data = weatherAPI.getCurrentWeather(location);
                
                Event weatherEvent = Event.builder()
                    .topic("weather.data.raw." + location)
                    .payload(data)
                    .timestamp(Instant.now())
                    .build();
                    
                publish(weatherEvent);
                
            } catch (Exception e) {
                publishError("weather.error.collection", location, e);
            }
        });
    }
}

Weather Processor Agent

@Agent("weather-processor")
public class WeatherProcessorAgent extends BaseAgent {
    
    private final Map<String, WeatherHistory> locationHistory = new ConcurrentHashMap<>();
    
    @Override
    public void onActivation() {
        subscribe("weather.data.raw.*");
        subscribe("weather.analysis.request.*");
    }
    
    @EventHandler
    @Subscribe("weather.data.raw.*")
    public void processWeatherData(Event event) {
        String location = extractLocationFromTopic(event.getTopic());
        WeatherData rawData = event.getPayload(WeatherData.class);
        
        // Process and enrich data
        ProcessedWeatherData processed = enrichWeatherData(rawData);
        
        // Update history
        updateLocationHistory(location, processed);
        
        // Check for alerts
        List<WeatherAlert> alerts = checkAlertConditions(location, processed);
        
        // Publish processed data
        publish(Event.builder()
            .topic("weather.data.processed." + location)
            .payload(processed)
            .build());
            
        // Publish alerts if any
        alerts.forEach(alert -> 
            publish(Event.builder()
                .topic("weather.alert." + alert.getSeverity())
                .payload(alert)
                .build())
        );
    }
    
    private ProcessedWeatherData enrichWeatherData(WeatherData raw) {
        return ProcessedWeatherData.builder()
            .from(raw)
            .heatIndex(calculateHeatIndex(raw))
            .windChill(calculateWindChill(raw))
            .dewPoint(calculateDewPoint(raw))
            .trend(calculateTrend(raw))
            .build();
    }
    
    private List<WeatherAlert> checkAlertConditions(String location, ProcessedWeatherData data) {
        List<WeatherAlert> alerts = new ArrayList<>();
        
        // Temperature alerts
        if (data.getTemperature() > 35.0) {
            alerts.add(WeatherAlert.heatWarning(location, data.getTemperature()));
        }
        
        // Wind alerts
        if (data.getWindSpeed() > 50.0) {
            alerts.add(WeatherAlert.windWarning(location, data.getWindSpeed()));
        }
        
        // Precipitation alerts
        if (data.getPrecipitation() > 25.0) {
            alerts.add(WeatherAlert.rainWarning(location, data.getPrecipitation()));
        }
        
        return alerts;
    }
}

Alert Manager Agent

@Agent("alert-manager")
public class AlertManagerAgent extends BaseAgent {
    
    private final AlertRepository alertRepository;
    private final NotificationService notificationService;
    
    @Override
    public void onActivation() {
        subscribe("weather.alert.*");
        subscribe("alert.acknowledge.*");
    }
    
    @EventHandler
    @Subscribe("weather.alert.*")
    public void handleWeatherAlert(Event event) {
        WeatherAlert alert = event.getPayload(WeatherAlert.class);
        String severity = extractSeverityFromTopic(event.getTopic());
        
        // Store alert
        alertRepository.save(alert);
        
        // Determine notification strategy based on severity
        NotificationStrategy strategy = getNotificationStrategy(severity);
        
        // Send notifications
        strategy.notify(alert);
        
        // Publish alert event for other systems
        publish(Event.builder()
            .topic("system.alert.weather")
            .payload(AlertNotification.from(alert))
            .priority(getPriority(severity))
            .build());
    }
    
    @EventHandler
    @Subscribe("alert.acknowledge.*")
    public void handleAlertAcknowledgment(Event event) {
        String alertId = event.getPayload(String.class);
        
        alertRepository.acknowledge(alertId, event.getSender());
        
        publish(Event.builder()
            .topic("alert.status.acknowledged")
            .payload(Map.of("alertId", alertId, "acknowledgedBy", event.getSender()))
            .build());
    }
    
    private NotificationStrategy getNotificationStrategy(String severity) {
        switch (severity.toLowerCase()) {
            case "critical":
                return new ImmediateNotificationStrategy(
                    List.of(EmailNotifier.class, SMSNotifier.class, PushNotifier.class));
            case "high":
                return new UrgentNotificationStrategy(
                    List.of(EmailNotifier.class, PushNotifier.class));
            case "medium":
                return new StandardNotificationStrategy(
                    List.of(EmailNotifier.class));
            default:
                return new LogOnlyNotificationStrategy();
        }
    }
}

🚀 Running the Example

1. Prerequisites

  • Java 11+ installed
  • Apache Kafka running
  • Weather API key (OpenWeatherMap)

2. Configuration

# application.properties
amcp.broker.url=kafka://localhost:9092
weather.api.key=your_api_key_here
weather.locations=london,paris,tokyo,newyork
weather.collection.interval=300000

3. Start the System

mvn spring-boot:run -Dspring-boot.run.main-class=com.example.WeatherSystemApp

4. Monitor Events

# Watch weather data
kafka-console-consumer --topic weather.data.processed.london

# Watch alerts
kafka-console-consumer --topic weather.alert.high

📊 Expected Results

Data Collection

Weather data collected every 5 minutes from configured locations

~288 data points/day per location

Processing Speed

Sub-second processing of weather data and alert generation

<500ms average latency

Alert Accuracy

Intelligent threshold-based alerting with minimal false positives

95%+ accuracy rate