Plugin Event HandlingΒΆ
Learn how to use events in your Nexus plugins for decoupled, scalable communication.
π― OverviewΒΆ
Events are the backbone of plugin communication in Nexus. They enable loose coupling between components, allowing plugins to react to system changes and communicate with each other without direct dependencies. This guide covers everything you need to know about handling events in your plugins.
ποΈ Event ArchitectureΒΆ
graph TB
subgraph "Event Publishers"
A[System Events]
B[Plugin A]
C[Plugin B]
D[HTTP Endpoints]
end
subgraph "Event Bus"
E[Event Router]
F[Event Queue]
G[Event Store]
H[Priority Manager]
end
subgraph "Event Subscribers"
I[Plugin C]
J[Plugin D]
K[Monitoring]
L[Audit Logger]
end
A --> E
B --> E
C --> E
D --> E
E --> F
E --> G
E --> H
F --> I
F --> J
F --> K
F --> L
π‘ Event BasicsΒΆ
Event StructureΒΆ
Every event in Nexus follows a consistent structure:
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
import uuid
@dataclass
class Event:
"""Base event structure."""
event_type: str
event_id: str = None
timestamp: datetime = None
source: Optional[str] = None
correlation_id: Optional[str] = None
data: Dict[str, Any] = None
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.event_id is None:
self.event_id = str(uuid.uuid4())
if self.timestamp is None:
self.timestamp = datetime.utcnow()
if self.data is None:
self.data = {}
if self.metadata is None:
self.metadata = {}
Common Event TypesΒΆ
# System events
SYSTEM_STARTED = "system.started"
SYSTEM_SHUTDOWN = "system.shutdown"
PLUGIN_LOADED = "plugin.loaded"
PLUGIN_UNLOADED = "plugin.unloaded"
# User events
USER_CREATED = "user.created"
USER_UPDATED = "user.updated"
USER_DELETED = "user.deleted"
USER_LOGIN = "user.login"
USER_LOGOUT = "user.logout"
# Data events
DATA_CREATED = "data.created"
DATA_UPDATED = "data.updated"
DATA_DELETED = "data.deleted"
# Plugin-specific events
TASK_COMPLETED = "task.completed"
NOTIFICATION_SENT = "notification.sent"
ERROR_OCCURRED = "error.occurred"
π Event HandlersΒΆ
Basic Event HandlerΒΆ
from nexus.events import event_handler
from nexus.plugin import Plugin
class MyPlugin(Plugin):
"""Plugin with event handlers."""
@event_handler("user.created")
async def on_user_created(self, event):
"""Handle user creation events."""
user_id = event.data.get("user_id")
username = event.data.get("username")
self.logger.info(f"New user created: {username} (ID: {user_id})")
# Process the new user
await self.process_new_user(user_id, username)
@event_handler("user.updated")
async def on_user_updated(self, event):
"""Handle user update events."""
user_id = event.data.get("user_id")
changes = event.data.get("changes", {})
self.logger.info(f"User {user_id} updated: {changes}")
# Update related data
await self.update_user_cache(user_id, changes)
async def process_new_user(self, user_id: str, username: str):
"""Process new user registration."""
# Create user profile
await self.create_user_profile(user_id)
# Send welcome email
await self.send_welcome_email(user_id)
# Add to analytics
await self.track_user_registration(user_id, username)
Event Handler with FiltersΒΆ
from nexus.events import event_handler, EventFilter
class UserManagementPlugin(Plugin):
"""Plugin with filtered event handlers."""
@event_handler("user.updated")
@EventFilter(lambda event: event.data.get("field") == "email")
async def on_user_email_changed(self, event):
"""Handle email change events specifically."""
user_id = event.data.get("user_id")
old_email = event.data.get("old_value")
new_email = event.data.get("new_value")
# Verify new email
await self.send_email_verification(user_id, new_email)
# Update email subscriptions
await self.update_email_subscriptions(old_email, new_email)
@event_handler("user.login")
@EventFilter(lambda event: event.data.get("failed", False))
async def on_failed_login(self, event):
"""Handle failed login attempts."""
username = event.data.get("username")
ip_address = event.data.get("ip_address")
# Track failed login attempts
await self.track_failed_login(username, ip_address)
# Check for brute force attacks
await self.check_brute_force_attack(ip_address)
@event_handler("data.*") # Wildcard pattern
async def on_any_data_event(self, event):
"""Handle any data-related event."""
if event.event_type.startswith("data."):
await self.log_data_activity(event)
Priority Event HandlersΒΆ
class PriorityHandlerPlugin(Plugin):
"""Plugin demonstrating event handler priorities."""
@event_handler("user.created", priority=100) # Highest priority
async def validate_user_creation(self, event):
"""Validate user creation (runs first)."""
user_data = event.data
# Perform validation
if not self.is_valid_user_data(user_data):
# Cancel event processing
event.cancel = True
self.logger.error("Invalid user data, cancelling creation")
return
self.logger.info("User data validation passed")
@event_handler("user.created", priority=50) # Medium priority
async def setup_user_resources(self, event):
"""Set up user resources (runs after validation)."""
if event.cancelled:
return
user_id = event.data.get("user_id")
# Create user directory
await self.create_user_directory(user_id)
# Initialize user settings
await self.initialize_user_settings(user_id)
@event_handler("user.created", priority=10) # Low priority
async def send_notifications(self, event):
"""Send notifications (runs last)."""
if event.cancelled:
return
user_id = event.data.get("user_id")
# Send admin notification
await self.notify_admin_new_user(user_id)
# Send welcome notification to user
await self.send_welcome_notification(user_id)
π Emitting EventsΒΆ
Basic Event EmissionΒΆ
from nexus.events import emit_event
class UserServicePlugin(Plugin):
"""Plugin that emits user-related events."""
async def create_user(self, user_data):
"""Create user and emit creation event."""
# Create user in database
user = await self.database.create_user(user_data)
# Emit user creation event
await emit_event({
"event_type": "user.created",
"source": self.name,
"data": {
"user_id": user.id,
"username": user.username,
"email": user.email,
"created_at": user.created_at.isoformat()
}
})
return user
async def update_user(self, user_id, updates):
"""Update user and emit update event."""
# Get current user data
old_user = await self.database.get_user(user_id)
if not old_user:
raise ValueError("User not found")
# Update user
updated_user = await self.database.update_user(user_id, updates)
# Emit update event with changes
changes = {}
for field, new_value in updates.items():
old_value = getattr(old_user, field)
if old_value != new_value:
changes[field] = {
"old_value": old_value,
"new_value": new_value
}
if changes:
await emit_event({
"event_type": "user.updated",
"source": self.name,
"data": {
"user_id": user_id,
"changes": changes
}
})
return updated_user
async def delete_user(self, user_id):
"""Delete user and emit deletion event."""
# Get user data before deletion
user = await self.database.get_user(user_id)
if not user:
return False
# Delete user
success = await self.database.delete_user(user_id)
if success:
# Emit deletion event
await emit_event({
"event_type": "user.deleted",
"source": self.name,
"data": {
"user_id": user_id,
"username": user.username,
"deleted_at": datetime.utcnow().isoformat()
}
})
return success
Event CorrelationΒΆ
class WorkflowPlugin(Plugin):
"""Plugin demonstrating event correlation."""
async def start_user_onboarding(self, user_id):
"""Start user onboarding workflow."""
correlation_id = str(uuid.uuid4())
# Emit workflow started event
await emit_event({
"event_type": "workflow.started",
"source": self.name,
"correlation_id": correlation_id,
"data": {
"workflow_type": "user_onboarding",
"user_id": user_id,
"steps": ["profile_setup", "email_verification", "tutorial"]
}
})
# Start first step
await self.start_profile_setup(user_id, correlation_id)
async def start_profile_setup(self, user_id, correlation_id):
"""Start profile setup step."""
await emit_event({
"event_type": "workflow.step.started",
"source": self.name,
"correlation_id": correlation_id,
"data": {
"step": "profile_setup",
"user_id": user_id
}
})
@event_handler("user.profile.completed")
async def on_profile_completed(self, event):
"""Handle profile completion."""
user_id = event.data.get("user_id")
correlation_id = event.correlation_id
if correlation_id:
# Emit step completion
await emit_event({
"event_type": "workflow.step.completed",
"source": self.name,
"correlation_id": correlation_id,
"data": {
"step": "profile_setup",
"user_id": user_id
}
})
# Start next step
await self.start_email_verification(user_id, correlation_id)
Batch Event EmissionΒΆ
class AnalyticsPlugin(Plugin):
"""Plugin demonstrating batch event emission."""
def __init__(self, config):
super().__init__(config)
self.pending_events = []
self.batch_size = config.get("batch_size", 50)
self.flush_interval = config.get("flush_interval", 30) # seconds
async def initialize(self):
"""Initialize plugin with periodic batch flushing."""
await super().initialize()
# Start periodic flush task
self.flush_task = asyncio.create_task(self.periodic_flush())
async def cleanup(self):
"""Clean up plugin resources."""
# Cancel flush task and flush remaining events
if hasattr(self, 'flush_task'):
self.flush_task.cancel()
await self.flush_pending_events()
await super().cleanup()
async def track_event(self, event_type, data):
"""Track analytics event."""
event = {
"event_type": f"analytics.{event_type}",
"source": self.name,
"timestamp": datetime.utcnow().isoformat(),
"data": data
}
self.pending_events.append(event)
# Flush if batch is full
if len(self.pending_events) >= self.batch_size:
await self.flush_pending_events()
async def flush_pending_events(self):
"""Flush pending events as a batch."""
if not self.pending_events:
return
events_to_flush = self.pending_events.copy()
self.pending_events.clear()
# Emit batch event
await emit_event({
"event_type": "analytics.batch",
"source": self.name,
"data": {
"events": events_to_flush,
"count": len(events_to_flush)
}
})
async def periodic_flush(self):
"""Periodically flush pending events."""
while True:
try:
await asyncio.sleep(self.flush_interval)
await self.flush_pending_events()
except asyncio.CancelledError:
break
except Exception as e:
self.logger.error(f"Error in periodic flush: {e}")
π Event PatternsΒΆ
Request-Response PatternΒΆ
class RequestResponsePlugin(Plugin):
"""Plugin demonstrating request-response event pattern."""
async def request_user_data(self, user_id):
"""Request user data from another plugin."""
request_id = str(uuid.uuid4())
# Emit data request event
await emit_event({
"event_type": "data.request",
"source": self.name,
"data": {
"request_id": request_id,
"resource_type": "user",
"resource_id": user_id,
"fields": ["profile", "preferences", "settings"]
}
})
# Wait for response
try:
response = await self.wait_for_response(request_id, timeout=10.0)
return response.data.get("user_data")
except asyncio.TimeoutError:
self.logger.warning(f"Timeout waiting for user data: {user_id}")
return None
@event_handler("data.request")
async def handle_data_request(self, event):
"""Handle data request from other plugins."""
request_id = event.data.get("request_id")
resource_type = event.data.get("resource_type")
resource_id = event.data.get("resource_id")
if resource_type == "user" and self.can_provide_user_data():
# Get user data
user_data = await self.get_user_data(resource_id)
# Send response
await emit_event({
"event_type": "data.response",
"source": self.name,
"data": {
"request_id": request_id,
"user_data": user_data,
"status": "success"
}
})
async def wait_for_response(self, request_id, timeout=30.0):
"""Wait for a response to a request."""
response_event = None
response_received = asyncio.Event()
async def response_handler(event):
nonlocal response_event
if event.data.get("request_id") == request_id:
response_event = event
response_received.set()
# Subscribe to response events
subscription_id = await self.event_bus.subscribe(
"data.response",
response_handler
)
try:
# Wait for response
await asyncio.wait_for(response_received.wait(), timeout=timeout)
return response_event
finally:
# Unsubscribe
await self.event_bus.unsubscribe(subscription_id)
Saga PatternΒΆ
class SagaPlugin(Plugin):
"""Plugin implementing saga pattern for distributed transactions."""
def __init__(self, config):
super().__init__(config)
self.active_sagas = {}
@event_handler("order.created")
async def start_order_saga(self, event):
"""Start order processing saga."""
order_id = event.data.get("order_id")
correlation_id = str(uuid.uuid4())
# Initialize saga state
saga_state = {
"saga_id": correlation_id,
"order_id": order_id,
"current_step": "payment",
"completed_steps": [],
"compensation_steps": []
}
self.active_sagas[correlation_id] = saga_state
# Start payment step
await emit_event({
"event_type": "payment.request",
"source": self.name,
"correlation_id": correlation_id,
"data": {
"order_id": order_id,
"amount": event.data.get("amount")
}
})
@event_handler("payment.completed")
async def handle_payment_completed(self, event):
"""Handle payment completion."""
correlation_id = event.correlation_id
saga_state = self.active_sagas.get(correlation_id)
if not saga_state:
return
# Update saga state
saga_state["completed_steps"].append("payment")
saga_state["current_step"] = "inventory"
# Start inventory reservation
await emit_event({
"event_type": "inventory.reserve",
"source": self.name,
"correlation_id": correlation_id,
"data": {
"order_id": saga_state["order_id"],
"items": event.data.get("items", [])
}
})
@event_handler("payment.failed")
async def handle_payment_failed(self, event):
"""Handle payment failure."""
correlation_id = event.correlation_id
saga_state = self.active_sagas.get(correlation_id)
if not saga_state:
return
# Cancel order
await emit_event({
"event_type": "order.cancelled",
"source": self.name,
"correlation_id": correlation_id,
"data": {
"order_id": saga_state["order_id"],
"reason": "payment_failed"
}
})
# Clean up saga
del self.active_sagas[correlation_id]
@event_handler("inventory.failed")
async def handle_inventory_failed(self, event):
"""Handle inventory reservation failure - trigger compensation."""
correlation_id = event.correlation_id
saga_state = self.active_sagas.get(correlation_id)
if not saga_state:
return
# Start compensation - refund payment
await emit_event({
"event_type": "payment.refund",
"source": self.name,
"correlation_id": correlation_id,
"data": {
"order_id": saga_state["order_id"],
"reason": "inventory_unavailable"
}
})
Event AggregationΒΆ
class EventAggregatorPlugin(Plugin):
"""Plugin that aggregates related events."""
def __init__(self, config):
super().__init__(config)
self.event_buffer = {}
self.aggregation_window = config.get("aggregation_window", 5.0) # seconds
@event_handler("user.activity.*")
async def aggregate_user_activity(self, event):
"""Aggregate user activity events."""
user_id = event.data.get("user_id")
if not user_id:
return
# Initialize buffer for user if not exists
if user_id not in self.event_buffer:
self.event_buffer[user_id] = {
"events": [],
"timer": None
}
# Add event to buffer
self.event_buffer[user_id]["events"].append(event)
# Cancel existing timer
if self.event_buffer[user_id]["timer"]:
self.event_buffer[user_id]["timer"].cancel()
# Set new timer
self.event_buffer[user_id]["timer"] = asyncio.create_task(
self.flush_user_activities(user_id)
)
async def flush_user_activities(self, user_id):
"""Flush aggregated user activities."""
await asyncio.sleep(self.aggregation_window)
if user_id not in self.event_buffer:
return
events = self.event_buffer[user_id]["events"]
if not events:
return
# Aggregate events
activity_summary = self.create_activity_summary(events)
# Emit aggregated event
await emit_event({
"event_type": "user.activity.summary",
"source": self.name,
"data": {
"user_id": user_id,
"time_window": self.aggregation_window,
"event_count": len(events),
"summary": activity_summary
}
})
# Clean up buffer
del self.event_buffer[user_id]
def create_activity_summary(self, events):
"""Create summary from activity events."""
summary = {
"actions": {},
"pages_visited": set(),
"total_time": 0,
"first_activity": None,
"last_activity": None
}
for event in events:
# Count actions
action = event.data.get("action")
if action:
summary["actions"][action] = summary["actions"].get(action, 0) + 1
# Track pages
page = event.data.get("page")
if page:
summary["pages_visited"].add(page)
# Track time
duration = event.data.get("duration", 0)
summary["total_time"] += duration
# Track first/last activity
if not summary["first_activity"] or event.timestamp < summary["first_activity"]:
summary["first_activity"] = event.timestamp
if not summary["last_activity"] or event.timestamp > summary["last_activity"]:
summary["last_activity"] = event.timestamp
# Convert set to list for JSON serialization
summary["pages_visited"] = list(summary["pages_visited"])
return summary
π Event SecurityΒΆ
Event ValidationΒΆ
from nexus.security import validate_event_source, check_event_permissions
class SecureEventPlugin(Plugin):
"""Plugin with secure event handling."""
@event_handler("sensitive.data.updated")
async def handle_sensitive_data(self, event):
"""Handle sensitive data with security checks."""
# Validate event source
if not validate_event_source(event.source, "trusted_plugins"):
self.logger.warning(f"Untrusted event source: {event.source}")
return
# Check permissions
if not check_event_permissions(event, "sensitive_data:read"):
self.logger.warning(f"Insufficient permissions for event: {event.event_type}")
return
# Validate event data
if not self.validate_event_data(event):
self.logger.error(f"Invalid event data: {event.event_id}")
return
# Process event
await self.process_sensitive_data(event)
def validate_event_data(self, event):
"""Validate event data structure and content."""
required_fields = ["user_id", "data_type", "action"]
for field in required_fields:
if field not in event.data:
return False
# Validate user_id format
user_id = event.data["user_id"]
if not isinstance(user_id, str) or len(user_id) < 3:
return False
# Validate data_type
allowed_types = ["profile", "settings", "preferences"]
if event.data["data_type"] not in allowed_types:
return False
return True
async def emit_secure_event(self, event_type, data, security_level="normal"):
"""Emit event with security metadata."""
await emit_event({
"event_type": event_type,
"source": self.name,
"data": data,
"metadata": {
"security_level": security_level,
"plugin_version": self.version,
"authentication_required": security_level == "high"
}
})
Event EncryptionΒΆ
from nexus.crypto import encrypt_data, decrypt_data
class EncryptedEventPlugin(Plugin):
"""Plugin handling encrypted events."""
def __init__(self, config):
super().__init__(config)
self.encryption_key = config.get("encryption_key")
async def emit_encrypted_event(self, event_type, sensitive_data):
"""Emit event with encrypted sensitive data."""
# Encrypt sensitive data
encrypted_data = encrypt_data(sensitive_data, self.encryption_key)
await emit_event({
"event_type": event_type,
"source": self.name,
"data": {
"encrypted_payload": encrypted_data,
"encryption_method": "AES-256-GCM"
},
"metadata": {
"encrypted": True,
"requires_decryption": True
}
})
@event_handler("*.encrypted")
async def handle_encrypted_event(self, event):
"""Handle encrypted events."""
if not event.metadata.get("encrypted"):
return
try:
# Decrypt data
encrypted_payload = event.data.get("encrypted_payload")
decrypted_data = decrypt_data(encrypted_payload, self.encryption_key)
# Process decrypted data
await self.process_decrypted_data(decrypted_data)
except Exception as e:
self.logger.error(f"Failed to decrypt event {event.event_id}: {e}")
π§ͺ Testing EventsΒΆ
Event TestingΒΆ
# tests/test_events.py
import pytest
from unittest.mock import AsyncMock
from nexus.testing import TestEventBus
from my_plugin.plugin import MyPlugin
class TestEventHandling:
"""Test event handling in plugins."""
@pytest.fixture
async def test_event_bus(self):
"""Create test event bus."""
event_bus = TestEventBus()
await event_bus.setup()
yield event_bus
await event_bus.cleanup()
@pytest.fixture
async def plugin(self, test_event_bus):
"""Create plugin instance."""
config = {"enabled": True}
plugin = MyPlugin(config)
plugin.event_bus = test_event_bus
await plugin.initialize()
yield plugin
await plugin.cleanup()
async def test_event_handler_called(self, plugin, test_event_bus):
"""Test that event handler is called."""
# Mock the handler method
plugin.on_user_created = AsyncMock()
# Emit event
await test_event_bus.emit({
"event_type": "user.created",
"data": {"user_id": "123", "username": "testuser"}
})
# Wait for event processing
await test_event_bus.process_events()
# Verify handler was called
plugin.on_user_created.assert_called_once()
# Check event data
call_args = plugin.on_user_created.call_args[0][0]
assert call_args.data["user_id"] == "123"
async def test_event_emission(self, plugin, test_event_bus):
"""Test event emission from plugin."""
# Subscribe to events
received_events = []
async def event_collector(event):
received_events.append(event)
await test_event_bus.subscribe("user.updated", event_collector)
# Trigger event emission
await plugin.update_user("123", {"name": "Updated Name"})
# Process events
await test_event_bus.process_events()
# Verify event was emitted
assert len(received_events) == 1
assert received_events[0].data["user_id"] == "123"
async def test_event_handler_error(self, plugin, test_event_bus):
"""Test event handler error handling."""
# Make handler raise an exception
plugin.on_user_created = AsyncMock(side_effect=Exception("Test error"))
# Emit event
await test_event_bus.emit({
"event_type": "user.created",
"data": {"user_id": "123"}
})
# Process events (should not raise exception)
await test_event_bus.process_events()
# Verify error was logged
assert test_event_bus.get_error_count() == 1
async def test_event_priority(self, plugin, test_event_bus):
"""Test event handler priority."""
execution_order = []
# High priority handler
async def high_priority_handler(event):
execution_order.append("high")
# Low priority handler
async def low_priority_handler(event):
execution_order.append("low")
# Subscribe with different priorities
await test_event_bus.subscribe("test.priority", low_priority_handler, priority=1)
await test_event_bus.subscribe("test.priority", high_priority_handler, priority=10)
# Emit event
await test_event_bus.emit({
"event_type": "test.priority",
"data": {}
})
await test_event_bus.process_events()
# Verify execution order
assert execution_order == ["high", "low"]
π― Best PracticesΒΆ
1. Event DesignΒΆ
# Good: Clear, descriptive event types
"user.profile.updated"
"order.payment.completed"
"notification.email.sent"
# Bad: Vague event types
"user.changed"
"something.happened"
"data.event"
2. Event Data StructureΒΆ
# Good: Consistent, well-structured data
{
"event_type": "user.created",
"data": {
"user_id": "123",
"username": "john_doe",
"email": "john@example.com",
"metadata": {
"registration_source": "web",
"user_agent": "Mozilla/5.0..."
}
}
}
# Bad: Inconsistent, unclear data
{
"event_type": "user_thing",
"data": {
"id": 123,
"stuff": ["thing1", "thing2"],
"other": {"random": "data"}
}
}