📡 Advanced Event Communication

Master pub/sub patterns and event orchestration

Advanced 30 minutes Java

🎯 Communication Patterns

Learn advanced event communication patterns for complex distributed systems.

Request-Response Pattern

@EventHandler
@Subscribe("order.process.request")
public void handleOrderRequest(Event event) {
    OrderRequest request = event.getPayload(OrderRequest.class);
    
    try {
        OrderResult result = processOrder(request);
        
        // Send response with correlation ID
        Event response = Event.builder()
            .topic("order.process.response")
            .payload(result)
            .correlationId(event.getCorrelationId())
            .build();
            
        publish(response);
        
    } catch (Exception e) {
        // Send error response
        Event errorResponse = Event.builder()
            .topic("order.process.error")
            .payload(ErrorDetails.from(e))
            .correlationId(event.getCorrelationId())
            .build();
            
        publish(errorResponse);
    }
}

Saga Pattern Implementation

@Agent("order-saga")
public class OrderSagaAgent extends Agent {
    
    private final Map<String, SagaState> sagaStates = new ConcurrentHashMap<>();
    
    @EventHandler
    @Subscribe("order.created")
    public void startOrderSaga(Event event) {
        Order order = event.getPayload(Order.class);
        String sagaId = UUID.randomUUID().toString();
        
        // Initialize saga state
        SagaState state = new SagaState(sagaId, order);
        sagaStates.put(sagaId, state);
        
        // Start first step: reserve inventory
        publishSagaStep("inventory.reserve", order, sagaId, 1);
    }
    
    @EventHandler
    @Subscribe("inventory.reserved")
    public void handleInventoryReserved(Event event) {
        String sagaId = event.getCorrelationId();
        SagaState state = sagaStates.get(sagaId);
        
        if (state != null) {
            state.markStepCompleted(1);
            // Next step: process payment
            publishSagaStep("payment.process", state.getOrder(), sagaId, 2);
        }
    }
    
    @EventHandler
    @Subscribe("payment.processed")
    public void handlePaymentProcessed(Event event) {
        String sagaId = event.getCorrelationId();
        SagaState state = sagaStates.get(sagaId);
        
        if (state != null) {
            state.markStepCompleted(2);
            // Final step: confirm order
            publishSagaStep("order.confirm", state.getOrder(), sagaId, 3);
        }
    }
    
    // Compensation handlers for rollback
    @EventHandler
    @Subscribe("inventory.reservation.failed")
    public void handleInventoryFailed(Event event) {
        String sagaId = event.getCorrelationId();
        compensateSaga(sagaId, "inventory_failed");
    }
}

🛤️ Advanced Event Routing

Implement sophisticated routing logic for complex event flows.

Content-Based Routing

@Agent("smart-router")
public class SmartRouterAgent extends Agent {
    
    @EventHandler
    @Subscribe("data.incoming.*")
    public void routeIncomingData(Event event) {
        DataPayload data = event.getPayload(DataPayload.class);
        String sourceType = extractSourceFromTopic(event.getTopic());
        
        // Route based on data characteristics
        if (data.getPriority() == Priority.HIGH) {
            routeToHighPriorityProcessors(data, sourceType);
        } else if (data.getSize() > LARGE_DATA_THRESHOLD) {
            routeToBatchProcessors(data, sourceType);
        } else {
            routeToStandardProcessors(data, sourceType);
        }
        
        // Also route based on content type
        switch (data.getContentType()) {
            case "image":
                routeToImageProcessors(data);
                break;
            case "text":
                routeToTextProcessors(data);
                break;
            case "video":
                routeToVideoProcessors(data);
                break;
        }
    }
    
    private void routeToHighPriorityProcessors(DataPayload data, String source) {
        Event priorityEvent = Event.builder()
            .topic("processing.high_priority." + source)
            .payload(data)
            .priority(EventPriority.HIGH)
            .build();
            
        publish(priorityEvent);
    }
}

Dynamic Topic Routing

@Agent("dynamic-router")
public class DynamicRouterAgent extends Agent {
    
    private final RoutingRuleEngine ruleEngine;
    
    @EventHandler
    @Subscribe("route.dynamic.*")
    public void dynamicRoute(Event event) {
        RoutingContext context = RoutingContext.builder()
            .event(event)
            .timestamp(Instant.now())
            .sourceAgent(event.getSender())
            .build();
        
        // Get routing decisions from rule engine
        List<RoutingDecision> decisions = ruleEngine.evaluate(context);
        
        for (RoutingDecision decision : decisions) {
            Event routedEvent = Event.builder()
                .from(event)
                .topic(decision.getTargetTopic())
                .metadata("routing_rule", decision.getRuleName())
                .metadata("routing_score", decision.getConfidence())
                .build();
                
            if (decision.shouldTransform()) {
                routedEvent = transformEvent(routedEvent, decision.getTransformation());
            }
            
            publish(routedEvent);
        }
    }
    
    @EventHandler
    @Subscribe("routing.rules.update")
    public void updateRoutingRules(Event event) {
        RoutingRules newRules = event.getPayload(RoutingRules.class);
        ruleEngine.updateRules(newRules);
        
        log("Routing rules updated: " + newRules.size() + " rules loaded");
    }
}

🎼 Event Orchestration

Coordinate complex workflows using event orchestration patterns.

Workflow Orchestrator

@Agent("workflow-orchestrator")
public class WorkflowOrchestratorAgent extends Agent {
    
    private final Map<String, WorkflowInstance> activeWorkflows = new ConcurrentHashMap<>();
    
    @EventHandler
    @Subscribe("workflow.start.*")
    public void startWorkflow(Event event) {
        String workflowType = extractWorkflowType(event.getTopic());
        WorkflowDefinition definition = getWorkflowDefinition(workflowType);
        
        String instanceId = UUID.randomUUID().toString();
        WorkflowInstance instance = new WorkflowInstance(instanceId, definition);
        instance.setInput(event.getPayload());
        
        activeWorkflows.put(instanceId, instance);
        
        // Start first step
        executeNextStep(instance);
    }
    
    @EventHandler
    @Subscribe("workflow.step.completed")
    public void handleStepCompleted(Event event) {
        String instanceId = event.getCorrelationId();
        WorkflowInstance instance = activeWorkflows.get(instanceId);
        
        if (instance != null) {
            StepResult result = event.getPayload(StepResult.class);
            instance.completeCurrentStep(result);
            
            if (instance.hasMoreSteps()) {
                executeNextStep(instance);
            } else {
                completeWorkflow(instance);
            }
        }
    }
    
    private void executeNextStep(WorkflowInstance instance) {
        WorkflowStep nextStep = instance.getNextStep();
        
        Event stepEvent = Event.builder()
            .topic("workflow.step.execute." + nextStep.getType())
            .payload(nextStep.getInput())
            .correlationId(instance.getId())
            .metadata("step_name", nextStep.getName())
            .metadata("step_index", String.valueOf(nextStep.getIndex()))
            .build();
            
        publish(stepEvent);
        
        // Set timeout for step completion
        scheduleTimeout(instance.getId(), nextStep.getTimeoutMs());
    }
    
    @EventHandler
    @Subscribe("workflow.step.timeout")
    public void handleStepTimeout(Event event) {
        String instanceId = event.getPayload(String.class);
        WorkflowInstance instance = activeWorkflows.get(instanceId);
        
        if (instance != null && !instance.isCurrentStepCompleted()) {
            // Handle timeout - retry or fail workflow
            if (instance.getCurrentStep().canRetry()) {
                retryCurrentStep(instance);
            } else {
                failWorkflow(instance, "Step timeout exceeded");
            }
        }
    }
}

🛡️ Error Handling & Recovery

Implement robust error handling and recovery mechanisms.

Circuit Breaker Pattern

@Agent("resilient-processor")
public class ResilientProcessorAgent extends Agent {
    
    private final CircuitBreaker circuitBreaker;
    private final RetryPolicy retryPolicy;
    
    public ResilientProcessorAgent() {
        this.circuitBreaker = CircuitBreaker.builder()
            .failureThreshold(5)
            .recoveryTimeout(Duration.ofMinutes(1))
            .build();
            
        this.retryPolicy = RetryPolicy.builder()
            .maxRetries(3)
            .backoff(Duration.ofSeconds(1), Duration.ofSeconds(10))
            .build();
    }
    
    @EventHandler
    @Subscribe("data.process.request")
    public void processData(Event event) {
        String requestId = event.getCorrelationId();
        
        try {
            // Execute with circuit breaker protection
            ProcessingResult result = circuitBreaker.execute(() -> {
                return retryPolicy.execute(() -> {
                    return performDataProcessing(event.getPayload());
                });
            });
            
            // Publish success result
            publishResult(requestId, result);
            
        } catch (CircuitBreakerOpenException e) {
            // Circuit breaker is open - service unavailable
            publishError(requestId, "Service temporarily unavailable", 503);
            
        } catch (RetryExhaustedException e) {
            // All retries exhausted
            publishError(requestId, "Processing failed after retries", 500);
            
        } catch (Exception e) {
            // Unexpected error
            publishError(requestId, "Unexpected processing error", 500);
        }
    }
    
    private void publishError(String requestId, String message, int errorCode) {
        ErrorResponse error = ErrorResponse.builder()
            .message(message)
            .errorCode(errorCode)
            .timestamp(Instant.now())
            .build();
            
        Event errorEvent = Event.builder()
            .topic("data.process.error")
            .payload(error)
            .correlationId(requestId)
            .build();
            
        publish(errorEvent);
    }
}

⚡ Performance Optimization

Optimize event processing for high-throughput scenarios.

Batch Event Processing

@Agent("high-throughput-processor")
public class HighThroughputProcessorAgent extends Agent {
    
    private final BlockingQueue<Event> eventQueue = new ArrayBlockingQueue<>(10000);
    private final ScheduledExecutorService batchProcessor = Executors.newScheduledThreadPool(4);
    
    @Override
    public void onActivation() {
        subscribe("data.high_volume.*");
        
        // Start batch processing threads
        for (int i = 0; i < 4; i++) {
            batchProcessor.scheduleAtFixedRate(this::processBatch, 0, 100, TimeUnit.MILLISECONDS);
        }
    }
    
    @EventHandler
    @Subscribe("data.high_volume.*")
    public void queueEvent(Event event) {
        if (!eventQueue.offer(event)) {
            // Queue is full - apply backpressure
            publishBackpressureSignal(event);
        }
    }
    
    private void processBatch() {
        List<Event> batch = new ArrayList<>();
        eventQueue.drainTo(batch, 100); // Process up to 100 events per batch
        
        if (!batch.isEmpty()) {
            try {
                // Process batch efficiently
                List<ProcessingResult> results = processBatchEfficiently(batch);
                
                // Publish results in batch
                publishBatchResults(results);
                
                // Update metrics
                updateThroughputMetrics(batch.size());
                
            } catch (Exception e) {
                // Handle batch processing error
                handleBatchError(batch, e);
            }
        }
    }
    
    private List<ProcessingResult> processBatchEfficiently(List<Event> events) {
        // Use parallel processing for CPU-intensive operations
        return events.parallelStream()
            .map(this::processEvent)
            .collect(Collectors.toList());
    }
}