📡 Advanced Event Communication
Master pub/sub patterns and event orchestration
🎯 Communication Patterns
Learn advanced event communication patterns for complex distributed systems.
Request-Response Pattern
@EventHandler
@Subscribe("order.process.request")
public void handleOrderRequest(Event event) {
OrderRequest request = event.getPayload(OrderRequest.class);
try {
OrderResult result = processOrder(request);
// Send response with correlation ID
Event response = Event.builder()
.topic("order.process.response")
.payload(result)
.correlationId(event.getCorrelationId())
.build();
publish(response);
} catch (Exception e) {
// Send error response
Event errorResponse = Event.builder()
.topic("order.process.error")
.payload(ErrorDetails.from(e))
.correlationId(event.getCorrelationId())
.build();
publish(errorResponse);
}
}
Saga Pattern Implementation
@Agent("order-saga")
public class OrderSagaAgent extends Agent {
private final Map<String, SagaState> sagaStates = new ConcurrentHashMap<>();
@EventHandler
@Subscribe("order.created")
public void startOrderSaga(Event event) {
Order order = event.getPayload(Order.class);
String sagaId = UUID.randomUUID().toString();
// Initialize saga state
SagaState state = new SagaState(sagaId, order);
sagaStates.put(sagaId, state);
// Start first step: reserve inventory
publishSagaStep("inventory.reserve", order, sagaId, 1);
}
@EventHandler
@Subscribe("inventory.reserved")
public void handleInventoryReserved(Event event) {
String sagaId = event.getCorrelationId();
SagaState state = sagaStates.get(sagaId);
if (state != null) {
state.markStepCompleted(1);
// Next step: process payment
publishSagaStep("payment.process", state.getOrder(), sagaId, 2);
}
}
@EventHandler
@Subscribe("payment.processed")
public void handlePaymentProcessed(Event event) {
String sagaId = event.getCorrelationId();
SagaState state = sagaStates.get(sagaId);
if (state != null) {
state.markStepCompleted(2);
// Final step: confirm order
publishSagaStep("order.confirm", state.getOrder(), sagaId, 3);
}
}
// Compensation handlers for rollback
@EventHandler
@Subscribe("inventory.reservation.failed")
public void handleInventoryFailed(Event event) {
String sagaId = event.getCorrelationId();
compensateSaga(sagaId, "inventory_failed");
}
}
🛤️ Advanced Event Routing
Implement sophisticated routing logic for complex event flows.
Content-Based Routing
@Agent("smart-router")
public class SmartRouterAgent extends Agent {
@EventHandler
@Subscribe("data.incoming.*")
public void routeIncomingData(Event event) {
DataPayload data = event.getPayload(DataPayload.class);
String sourceType = extractSourceFromTopic(event.getTopic());
// Route based on data characteristics
if (data.getPriority() == Priority.HIGH) {
routeToHighPriorityProcessors(data, sourceType);
} else if (data.getSize() > LARGE_DATA_THRESHOLD) {
routeToBatchProcessors(data, sourceType);
} else {
routeToStandardProcessors(data, sourceType);
}
// Also route based on content type
switch (data.getContentType()) {
case "image":
routeToImageProcessors(data);
break;
case "text":
routeToTextProcessors(data);
break;
case "video":
routeToVideoProcessors(data);
break;
}
}
private void routeToHighPriorityProcessors(DataPayload data, String source) {
Event priorityEvent = Event.builder()
.topic("processing.high_priority." + source)
.payload(data)
.priority(EventPriority.HIGH)
.build();
publish(priorityEvent);
}
}
Dynamic Topic Routing
@Agent("dynamic-router")
public class DynamicRouterAgent extends Agent {
private final RoutingRuleEngine ruleEngine;
@EventHandler
@Subscribe("route.dynamic.*")
public void dynamicRoute(Event event) {
RoutingContext context = RoutingContext.builder()
.event(event)
.timestamp(Instant.now())
.sourceAgent(event.getSender())
.build();
// Get routing decisions from rule engine
List<RoutingDecision> decisions = ruleEngine.evaluate(context);
for (RoutingDecision decision : decisions) {
Event routedEvent = Event.builder()
.from(event)
.topic(decision.getTargetTopic())
.metadata("routing_rule", decision.getRuleName())
.metadata("routing_score", decision.getConfidence())
.build();
if (decision.shouldTransform()) {
routedEvent = transformEvent(routedEvent, decision.getTransformation());
}
publish(routedEvent);
}
}
@EventHandler
@Subscribe("routing.rules.update")
public void updateRoutingRules(Event event) {
RoutingRules newRules = event.getPayload(RoutingRules.class);
ruleEngine.updateRules(newRules);
log("Routing rules updated: " + newRules.size() + " rules loaded");
}
}
🎼 Event Orchestration
Coordinate complex workflows using event orchestration patterns.
Workflow Orchestrator
@Agent("workflow-orchestrator")
public class WorkflowOrchestratorAgent extends Agent {
private final Map<String, WorkflowInstance> activeWorkflows = new ConcurrentHashMap<>();
@EventHandler
@Subscribe("workflow.start.*")
public void startWorkflow(Event event) {
String workflowType = extractWorkflowType(event.getTopic());
WorkflowDefinition definition = getWorkflowDefinition(workflowType);
String instanceId = UUID.randomUUID().toString();
WorkflowInstance instance = new WorkflowInstance(instanceId, definition);
instance.setInput(event.getPayload());
activeWorkflows.put(instanceId, instance);
// Start first step
executeNextStep(instance);
}
@EventHandler
@Subscribe("workflow.step.completed")
public void handleStepCompleted(Event event) {
String instanceId = event.getCorrelationId();
WorkflowInstance instance = activeWorkflows.get(instanceId);
if (instance != null) {
StepResult result = event.getPayload(StepResult.class);
instance.completeCurrentStep(result);
if (instance.hasMoreSteps()) {
executeNextStep(instance);
} else {
completeWorkflow(instance);
}
}
}
private void executeNextStep(WorkflowInstance instance) {
WorkflowStep nextStep = instance.getNextStep();
Event stepEvent = Event.builder()
.topic("workflow.step.execute." + nextStep.getType())
.payload(nextStep.getInput())
.correlationId(instance.getId())
.metadata("step_name", nextStep.getName())
.metadata("step_index", String.valueOf(nextStep.getIndex()))
.build();
publish(stepEvent);
// Set timeout for step completion
scheduleTimeout(instance.getId(), nextStep.getTimeoutMs());
}
@EventHandler
@Subscribe("workflow.step.timeout")
public void handleStepTimeout(Event event) {
String instanceId = event.getPayload(String.class);
WorkflowInstance instance = activeWorkflows.get(instanceId);
if (instance != null && !instance.isCurrentStepCompleted()) {
// Handle timeout - retry or fail workflow
if (instance.getCurrentStep().canRetry()) {
retryCurrentStep(instance);
} else {
failWorkflow(instance, "Step timeout exceeded");
}
}
}
}
🛡️ Error Handling & Recovery
Implement robust error handling and recovery mechanisms.
Circuit Breaker Pattern
@Agent("resilient-processor")
public class ResilientProcessorAgent extends Agent {
private final CircuitBreaker circuitBreaker;
private final RetryPolicy retryPolicy;
public ResilientProcessorAgent() {
this.circuitBreaker = CircuitBreaker.builder()
.failureThreshold(5)
.recoveryTimeout(Duration.ofMinutes(1))
.build();
this.retryPolicy = RetryPolicy.builder()
.maxRetries(3)
.backoff(Duration.ofSeconds(1), Duration.ofSeconds(10))
.build();
}
@EventHandler
@Subscribe("data.process.request")
public void processData(Event event) {
String requestId = event.getCorrelationId();
try {
// Execute with circuit breaker protection
ProcessingResult result = circuitBreaker.execute(() -> {
return retryPolicy.execute(() -> {
return performDataProcessing(event.getPayload());
});
});
// Publish success result
publishResult(requestId, result);
} catch (CircuitBreakerOpenException e) {
// Circuit breaker is open - service unavailable
publishError(requestId, "Service temporarily unavailable", 503);
} catch (RetryExhaustedException e) {
// All retries exhausted
publishError(requestId, "Processing failed after retries", 500);
} catch (Exception e) {
// Unexpected error
publishError(requestId, "Unexpected processing error", 500);
}
}
private void publishError(String requestId, String message, int errorCode) {
ErrorResponse error = ErrorResponse.builder()
.message(message)
.errorCode(errorCode)
.timestamp(Instant.now())
.build();
Event errorEvent = Event.builder()
.topic("data.process.error")
.payload(error)
.correlationId(requestId)
.build();
publish(errorEvent);
}
}
⚡ Performance Optimization
Optimize event processing for high-throughput scenarios.
Batch Event Processing
@Agent("high-throughput-processor")
public class HighThroughputProcessorAgent extends Agent {
private final BlockingQueue<Event> eventQueue = new ArrayBlockingQueue<>(10000);
private final ScheduledExecutorService batchProcessor = Executors.newScheduledThreadPool(4);
@Override
public void onActivation() {
subscribe("data.high_volume.*");
// Start batch processing threads
for (int i = 0; i < 4; i++) {
batchProcessor.scheduleAtFixedRate(this::processBatch, 0, 100, TimeUnit.MILLISECONDS);
}
}
@EventHandler
@Subscribe("data.high_volume.*")
public void queueEvent(Event event) {
if (!eventQueue.offer(event)) {
// Queue is full - apply backpressure
publishBackpressureSignal(event);
}
}
private void processBatch() {
List<Event> batch = new ArrayList<>();
eventQueue.drainTo(batch, 100); // Process up to 100 events per batch
if (!batch.isEmpty()) {
try {
// Process batch efficiently
List<ProcessingResult> results = processBatchEfficiently(batch);
// Publish results in batch
publishBatchResults(results);
// Update metrics
updateThroughputMetrics(batch.size());
} catch (Exception e) {
// Handle batch processing error
handleBatchError(batch, e);
}
}
}
private List<ProcessingResult> processBatchEfficiently(List<Event> events) {
// Use parallel processing for CPU-intensive operations
return events.parallelStream()
.map(this::processEvent)
.collect(Collectors.toList());
}
}