Releases: pypatterns/python-cqrs
Request and event handler fallbacks with optional circuit breaker
🚀 Release notes – v4.10.0
Fallbacks for requests and events
TL;DR — Request and event handler fallbacks with optional circuit breaker; a unified `ICircuitBreaker` protocol; new examples and tests.
This release adds fallback support for request handlers and event handlers, with optional circuit breaker integration. You can now define primary and fallback handlers for commands, queries, streaming handlers, and domain events, and optionally protect them with a shared circuit breaker protocol.
🆕 What's new
📨 Request handler fallbacks
- `RequestHandlerFallback` — wraps a primary and a fallback request handler so that when the primary fails (or the circuit is open), the fallback is used.
  - Works with both non-streaming `RequestHandler[Req, Res]` and streaming `StreamingRequestHandler[Req, Res]` handlers.
  - Optional `failure_exceptions` to trigger the fallback only for specific exception types (e.g. `ConnectionError`, `TimeoutError`).
  - Optional circuit breaker so that after N failures the primary is skipped and the fallback runs directly.
Example:

```python
from cqrs import RequestHandlerFallback
from cqrs.adapters.circuit_breaker import AioBreakerAdapter

request_cb = AioBreakerAdapter(fail_max=5, timeout_duration=60)

request_map.bind(
    GetOrderCommand,
    RequestHandlerFallback(
        GetOrderHandler,
        GetOrderFallbackHandler,
        failure_exceptions=(ConnectionError, TimeoutError),
        circuit_breaker=request_cb,
    ),
)
```

📢 Event handler fallbacks
- `EventHandlerFallback` — wraps a primary and a fallback event handler so that when the primary fails (or the circuit is open), the fallback is used.
  - Same options: `failure_exceptions` and an optional circuit breaker.
  - Supported in both the event dispatcher and the event emitter (handlers registered in `EventMap`).
Example:

```python
from cqrs import EventHandlerFallback

event_cb = AioBreakerAdapter(fail_max=5, timeout_duration=60)

event_map.bind(
    OrderCreatedEvent,
    EventHandlerFallback(
        SendEmailHandler,
        SendEmailFallbackHandler,
        circuit_breaker=event_cb,
    ),
)
```

🔌 Unified circuit breaker protocol
- `ICircuitBreaker` — a single protocol used by:
  - Saga step fallbacks (existing)
  - Request handler fallbacks (new)
  - Event handler fallbacks (new)
- `AioBreakerAdapter` now implements `ICircuitBreaker` (and still `ISagaStepCircuitBreaker` for backward compatibility).
- One adapter instance per "domain" (e.g. one for requests, one for events) is enough; each handler/step is namespaced by type.
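The per-identifier namespacing can be sketched with a toy breaker. The method names (`call`, `is_circuit_breaker_error`) follow the API summary in these notes; the counting logic, the `CircuitOpenError` type, and the exact signatures are illustrative assumptions, not the library's implementation:

```python
import asyncio
import typing

class CircuitOpenError(Exception):
    pass

class CountingBreaker:
    """Toy breaker: one instance, per-identifier failure state."""

    def __init__(self, fail_max: int) -> None:
        self._fail_max = fail_max
        self._failures: dict[str, int] = {}  # namespaced per handler/step type

    async def call(
        self,
        identifier: str,
        func: typing.Callable[..., typing.Awaitable[typing.Any]],
        *args: typing.Any,
        **kwargs: typing.Any,
    ) -> typing.Any:
        if self._failures.get(identifier, 0) >= self._fail_max:
            raise CircuitOpenError(identifier)  # circuit open: skip the primary
        try:
            return await func(*args, **kwargs)
        except Exception:
            self._failures[identifier] = self._failures.get(identifier, 0) + 1
            raise

    def is_circuit_breaker_error(self, exc: Exception) -> bool:
        return isinstance(exc, CircuitOpenError)

async def main() -> bool:
    cb = CountingBreaker(fail_max=2)

    async def flaky() -> str:
        raise ConnectionError("backend down")

    for _ in range(2):  # two primary failures trip the breaker
        try:
            await cb.call("GetOrderHandler", flaky)
        except ConnectionError:
            pass
    try:
        await cb.call("GetOrderHandler", flaky)
    except Exception as exc:
        return cb.is_circuit_breaker_error(exc)
    return False

print(asyncio.run(main()))  # True: the third call hit an open circuit
```

Because state is keyed by identifier, failures of one handler never open the circuit for another, which is why a single adapter per "domain" suffices.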
📡 Streaming handlers

`RequestHandlerFallback` is supported for streaming handlers: when the primary streaming handler fails, the fallback stream is used from the start (there is no resume from the failed stream's position).
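A minimal sketch of that semantic, using plain async generators rather than the library's wrapper (the `primary`/`fallback` handlers are hypothetical stand-ins):

```python
import asyncio
import typing

# If the primary stream fails mid-way, the fallback stream is consumed from
# its beginning — items already yielded by the primary are not resumed or
# deduplicated.
async def primary() -> typing.AsyncIterator[str]:
    yield "p1"
    raise ConnectionError("primary stream broke")

async def fallback() -> typing.AsyncIterator[str]:
    yield "f1"
    yield "f2"

async def stream_with_fallback() -> list[str]:
    results: list[str] = []
    try:
        async for item in primary():
            results.append(item)
    except ConnectionError:
        async for item in fallback():  # restart from the fallback's start
            results.append(item)
    return results

print(asyncio.run(stream_with_fallback()))  # ['p1', 'f1', 'f2']
```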
🛠️ Helpers

- `get_generic_args_for_origin()` in `cqrs.generic_utils` — used to validate that the primary and fallback handlers handle the same request/event type and (for requests) the same response type.
- `should_use_fallback()` in `cqrs.circuit_breaker` — central logic to decide whether to run the fallback after a primary failure (circuit open, exception in `failure_exceptions`, or any exception if `failure_exceptions` is empty).
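The decision rule described above can be sketched as follows (the real helper lives in `cqrs.circuit_breaker`; its exact signature is an assumption here):

```python
def should_use_fallback(
    exc: Exception,
    failure_exceptions: tuple[type[Exception], ...] = (),
    circuit_breaker=None,
) -> bool:
    # An open circuit always routes to the fallback.
    if circuit_breaker is not None and circuit_breaker.is_circuit_breaker_error(exc):
        return True
    # Empty failure_exceptions means "fall back on any exception".
    if not failure_exceptions:
        return True
    return isinstance(exc, failure_exceptions)

print(should_use_fallback(TimeoutError(), (ConnectionError, TimeoutError)))  # True
print(should_use_fallback(ValueError(), (ConnectionError, TimeoutError)))    # False
print(should_use_fallback(ValueError()))  # True: empty tuple matches any error
```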
📚 Documentation and examples

- Examples added:
  - `examples/request_fallback.py` — request handler fallback with optional circuit breaker
  - `examples/cor_request_fallback.py` — CoR (chain of responsibility) with fallback
  - `examples/event_fallback.py` — event handler fallback
  - `examples/streaming_handler_fallback.py` — streaming request handler fallback
- Unit tests: `test_request_fallback.py`, `test_event_fallback.py`; integration tests for the circuit breaker adapter updated.
📋 API summary
| Item | Description |
|---|---|
| `cqrs.RequestHandlerFallback` | Fallback wrapper for request/streaming handlers (primary, fallback, optional `failure_exceptions`, optional `circuit_breaker`). |
| `cqrs.EventHandlerFallback` | Fallback wrapper for event handlers (same options). |
| `cqrs.ICircuitBreaker` | Protocol: `call(identifier, func, *args, **kwargs)` and `is_circuit_breaker_error(exc)`. |
| `cqrs.circuit_breaker.should_use_fallback` | Helper to decide if the fallback should run after a primary error. |
| `cqrs.generic_utils.get_generic_args_for_origin` | Extract generic type args from handler classes (for validation). |
✅ Compatibility

- Backward compatible: existing saga fallbacks and `AioBreakerAdapter` usage continue to work. `AioBreakerAdapter` now implements `ICircuitBreaker` in addition to `ISagaStepCircuitBreaker`.
- Python: 3.10+
- Optional: `aiobreaker` for circuit breaker support (`pip install python-cqrs[aiobreaker]`).
Reduce Saga Storage Overhead
🚀 Release notes: Reduce Saga Storage Overhead
📋 Summary
This release reduces saga storage load by introducing checkpoint commits: one session per saga run with explicit commits only at key points (after each step, after each compensation step, etc.) instead of committing after every storage call. Fewer commits, shorter lock hold time, and lower risk of deadlocks when using SQLAlchemy.
✨ What's New
🎯 Checkpoint commits & SagaStorageRun protocol
- New protocol `SagaStorageRun` (`src/cqrs/saga/storage/protocol.py`) — a scoped "session" for a single saga. Its methods do not commit; the caller calls `commit()` at checkpoints.
- New method `ISagaStorage.create_run()` — returns a context manager that yields a `SagaStorageRun`. Storages that don't support it can raise `NotImplementedError`, and execution falls back to the previous behaviour.
- `SagaTransaction` — when `create_run()` is available, the saga runs in one session and commits only at checkpoints (after create + RUNNING, after each step, after each compensation step, at completion/failure). Storages without `create_run()` keep working as before.
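The checkpoint-commit idea can be sketched with a synchronous, in-memory stand-in (the real `SagaStorageRun` is async and storage-backed; all names below are illustrative):

```python
import contextlib
import typing
import uuid

class InMemoryRun:
    """Run-scoped 'session': mutations are buffered until commit()."""

    def __init__(self, store: dict) -> None:
        self._store = store                             # committed state
        self._pending: dict[uuid.UUID, list[str]] = {}  # uncommitted mutations

    def save_step(self, saga_id: uuid.UUID, step: str) -> None:
        self._pending.setdefault(saga_id, []).append(step)  # no commit here

    def commit(self) -> None:
        # Checkpoint: flush all buffered mutations in one go.
        for saga_id, steps in self._pending.items():
            self._store.setdefault(saga_id, []).extend(steps)
        self._pending.clear()

@contextlib.contextmanager
def create_run(store: dict) -> typing.Iterator[InMemoryRun]:
    run = InMemoryRun(store)
    try:
        yield run
    finally:
        run._pending.clear()  # anything not committed is discarded

store: dict = {}
saga_id = uuid.uuid4()
with create_run(store) as run:
    run.save_step(saga_id, "reserve_stock")
    run.commit()                        # checkpoint after the step
    run.save_step(saga_id, "charge_card")
    # no commit: this mutation is discarded when the run closes

print(store[saga_id])  # ['reserve_stock'] — only the committed step survives
```

The point of the pattern: one session per saga run, with the commit decision moved from the storage methods to the execution layer.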
📦 Storage implementations
- Memory (`MemorySagaStorage`) — implements `create_run()` via `_MemorySagaStorageRun`; commit/rollback are no-ops, but the protocol matches SQLAlchemy.
- SQLAlchemy (`SqlAlchemySagaStorage`) — implements `create_run()` via `_SqlAlchemySagaStorageRun` with one `AsyncSession` per run; mutations are committed only when the saga calls `run.commit()`.
🔒 Deadlock mitigation
`load_saga_state(..., read_for_update=True)` — when loading for recovery or exclusive update, the row can be locked (e.g. `SELECT ... FOR UPDATE` in SQLAlchemy). Together with checkpoint commits, this shortens lock duration and reduces deadlock risk.
♻️ Compensation
`SagaCompensator` — new optional `on_after_compensate_step` callback, invoked after each successfully compensated step. When using a run, the saga passes `run.commit` so each compensation step is committed at a checkpoint.
📝 Documentation & types
- Docstrings added/updated for the storage protocol, `SagaStorageRun`, the memory and SQLAlchemy storages, `SagaTransaction`, the execution and recovery managers, and the compensator. "Strict Backward Recovery" and checkpoint behaviour are described in code.
🐛 Fixes
- Deadlocks — checkpoint commits and `read_for_update` reduce long-held locks and concurrent-update conflicts.
🧪 Tests & infrastructure
Unit tests
- New `tests/unit/test_saga/test_saga_storage_run.py` — tests `create_run()` and the checkpoint path (memory), and the fallback when storage does not implement `create_run()`.
Integration tests & databases
- PostgreSQL — integration tests now run against PostgreSQL (port 5433) in addition to MySQL.
- Test split — `test_saga_storage_sqlalchemy_postgres.py`/`_mysql.py`, `test_saga_mediator_sqlalchemy_postgres.py`/`_mysql.py`; conftest/fixtures extended with `DATABASE_DSN_POSTGRESQL`.
🔧 CI
- tests.yml — start and wait for PostgreSQL; set `DATABASE_DSN_POSTGRESQL`; run tests with MySQL and PostgreSQL.
- New workflow `codspeed.yml` — run benchmarks via CodSpeed (MySQL, PostgreSQL, Redis; `pytest tests/benchmarks/ --codspeed`).
- pytest-config.ini — added `DATABASE_DSN_POSTGRESQL` to env.
📦 Docker & dependencies
- docker-compose-test.yml — new service postgres_tests (PostgreSQL 15.4, port 5433).
- pyproject.toml — version 4.9.0; dev dependency pytest-codspeed==4.2.0 added.
⚡ Benchmarks
- Benchmarks in `tests/benchmarks/` updated and extended (conftest, dataclasses/default for memory and SQLAlchemy) to measure the impact of the new storage behaviour.
✅ Compatibility
- Saga examples (`saga.py`, `saga_recovery.py`, `saga_sqlalchemy_storage.py`, `saga_fallback.py`, `saga_recovery_scheduler.py`, etc.) updated to the current storage API and checkpoint path where applicable.
- Backward compatibility — code using `ISagaStorage` without `create_run()` continues to work. The new path is used only when storage implements `create_run()` (Memory and SQLAlchemy do by default).
📌 Miscellaneous
- README.md — updated for structure and usage.
- Post-review: ruff format, pre-commit, and “banchmarks” → benchmarks rename.
🎉 Summary of benefits
| | Benefit |
|---|---|
| ⚡ | Fewer DB commits per saga run |
| 🔒 | Shorter lock hold time and lower deadlock risk |
| ✅ | Backward compatible; custom storages can add `create_run()` like Memory/SQLAlchemy |
🚀 Saga Step Identity
✨ Added
- Saga step result includes saga ID — each `SagaStepResult` yielded from saga execution now carries a `saga_id` field (`uuid.UUID | None`). This lets client code identify which saga a step belongs to and, if desired, trigger compensation immediately when a saga fails mid-way (e.g. via a dedicated API that runs compensation for a given `saga_id`).
🔧 Changed
- Stricter typing for `SagaStepResult.step_type` — the `step_type` field is now typed as `type[SagaStepHandler[ContextT, Resp]]` instead of `typing.Any`, improving type safety and IDE support without introducing new dependencies or circular imports (both types live in the same module).
📋 Technical details
- `SagaTransaction` sets `saga_id` on each yielded `SagaStepResult` via `dataclasses.replace()`, so every step result has the current saga's ID when consumed from `mediator.stream()` or `saga.transaction()`.
- `saga_id` is optional (`None` by default) so that results created outside the execution layer (e.g. in step handlers via `_generate_step_result()`) remain valid; the execution layer fills it in when yielding to the client.
⚠️ Compatibility
- Backward compatible — the new field `saga_id` defaults to `None`. Existing code that does not use `saga_id` continues to work unchanged. The change to `step_type` is annotation-only; runtime behavior is unchanged.
📢 Event propagation in event handlers
This release introduces follow-up event propagation for event handlers and unifies how events flow through the pipeline. You can now build multi-level event chains (e.g. L1 → L2 → L3) where each handler can emit new domain events that are processed in the same run.
✨ What changed
Event handlers
- `EventHandler` has an optional `events` property. Override it in your handler to return a sequence of follow-up events after `handle()` runs. The default is `()` (no follow-ups).
- `EventEmitter.emit()` now returns `Sequence[IEvent]`: the follow-up events from all handlers for the given event.
- `EventProcessor.emit_events()` processes not only the initial events but all follow-ups in the same pipeline:
  - Sequential mode (`concurrent_event_handle_enable=False`): BFS over events and follow-ups.
  - Parallel mode (`concurrent_event_handle_enable=True`): events and follow-ups run under the same semaphore; as soon as one task completes, its follow-ups are queued (`FIRST_COMPLETED`), without waiting for siblings.
- `EventDispatcher` now dispatches follow-ups: after each handler's `handle()`, it dispatches every event from `handler.events` recursively.
Request handlers
`RequestHandler` and `CORRequestHandler` have an optional `events` property (default `()`). You can override it to return events produced by the handler; the request mediator passes them to `EventProcessor.emit_events()`. If you don't emit events, you don't need to implement or override `events`.
🔄 How event processing works now
1. Request path — a command/query is sent → `RequestDispatcher` runs the handler → the mediator collects `handler.events` and calls `EventProcessor.emit_events(events)`.
2. Event path — `EventProcessor.emit_events(events)` runs:
   - For each event, `EventEmitter.emit(event)` is called.
   - For domain events: all registered handlers are invoked; after each `handle()`, follow-ups are taken from `handler.events` and aggregated.
   - For notification events: the event is sent to the message broker; no follow-ups.
   - Follow-ups are then processed in the same way (same pipeline, BFS or parallel), until there are no more.
3. Event-only path (e.g. `EventMediator`) — `EventDispatcher.dispatch(event)` runs the handler(s) and then dispatches each `handler.events` follow-up recursively.
So: one entry point (e.g. one command or one emit_events call) can trigger a whole tree of events and follow-ups, either sequentially (BFS) or in parallel under a semaphore.
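The sequential (BFS) mode can be sketched with a toy pipeline, where each "handler" may return follow-up events that are queued behind their siblings (the `emit()` mapping is illustrative; the real pipeline goes through `EventEmitter.emit()`):

```python
import collections

def emit(event: str) -> list[str]:
    # Stand-in for EventEmitter.emit(): returns the follow-ups for one event.
    follow_ups = {"L1": ["L2a", "L2b"], "L2a": ["L3"]}
    return follow_ups.get(event, [])

def emit_events(initial: list[str]) -> list[str]:
    processed: list[str] = []
    queue = collections.deque(initial)
    while queue:
        event = queue.popleft()
        processed.append(event)
        queue.extend(emit(event))  # follow-ups join the same pipeline
    return processed

print(emit_events(["L1"]))  # ['L1', 'L2a', 'L2b', 'L3'] — level by level
```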
💻 Code examples
Event handler without follow-ups (no `events` override)

Same as before: implement only `handle()`. The default `events` returns `()`.

```python
import cqrs

class UserJoined(cqrs.DomainEvent, frozen=True):
    user_id: str
    meeting_id: str

class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
    async def handle(self, event: UserJoined) -> None:
        # Handle the event; no follow-ups.
        await self._notify_room(event.meeting_id, f"User {event.user_id} joined")
```

Event handler with follow-ups (override `events`)
Override `events` to return new domain events; the pipeline will process them in the same run (BFS or parallel).

```python
import typing

import cqrs
from cqrs.events.event import IEvent

class EventL1(cqrs.DomainEvent, frozen=True):
    seed: str

class EventL2(cqrs.DomainEvent, frozen=True):
    seed: str

class HandlerL1(cqrs.EventHandler[EventL1]):
    def __init__(self) -> None:
        self._follow_ups: list[IEvent] = []

    @property
    def events(self) -> typing.Sequence[IEvent]:
        return tuple(self._follow_ups)

    async def handle(self, event: EventL1) -> None:
        # Do work, then emit a follow-up event.
        self._follow_ups.append(EventL2(seed=event.seed))
```

Request handler without events (no `events` override)
Use when the handler does not emit domain events. Only `handle()` is required; the default `events` is `()`.

```python
import cqrs

class ReadMeetingQuery(cqrs.Request):
    meeting_id: str

class ReadMeetingQueryResult(cqrs.Response):
    link: str
    meeting_id: str

class ReadMeetingQueryHandler(cqrs.RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
        link = await self._meetings_api.get_link(request.meeting_id)
        return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)
```

No `events` property and no `self.events` list: the base `RequestHandler.events` returns `()`, and the mediator simply passes an empty sequence to `EventProcessor.emit_events()`.
📋 Summary
| Component | Change |
|---|---|
| `EventHandler` | Optional `events` property; override to return follow-up events. |
| `EventEmitter.emit()` | Returns `Sequence[IEvent]` (follow-ups from handlers). |
| `EventProcessor` | Processes initial events and all follow-ups (BFS or parallel). |
| `EventDispatcher` | Dispatches each handler's follow-ups recursively. |
| `RequestHandler` | Optional `events` property; override to return events from commands/queries. |
| `CORRequestHandler` | Same optional `events` property. |
You can keep existing handlers as-is (they behave like "no follow-ups"). To add multi-level event chains, override events in your event handlers and, if needed, in request handlers.
Happy event-driven building.
🚀 Streaming & Saga Typing Fixes
📋 Summary
Typing for streaming handlers and stream methods in mediators and sagas has been corrected. Previously you had to suppress type checks (# type: ignore[override], # pyright: ignore[reportIncompatibleMethodOverride]). Types are now aligned so you no longer need to ignore typing.
🔧 What Was Wrong
StreamingRequestHandler
- The base method was declared as `async def handle(...) -> AsyncIterator[ResT]` with only `raise NotImplementedError` (no `yield`).
- For the type checker that is a coroutine function: calling `handle(request)` returns `Coroutine[..., AsyncIterator[ResT]]`.
- Implementations in subclasses are async generators (`async def handle(...): ... yield ...`): calling them returns `AsyncGenerator`/`AsyncIterator`.
- Pyright treated the override as incompatible (base returns a coroutine, override returns an iterator), so type ignores were required (`# type: ignore[override]` / `# pyright: ignore[reportIncompatibleMethodOverride]`).
Stream Methods in Mediators and Dispatch in Dispatchers
- `StreamingRequestMediator.stream`, `SagaMediator.stream`, `StreamingRequestDispatcher.dispatch`, and `SagaDispatcher.dispatch` were declared as `async def ... -> AsyncIterator[...]` with bodies that used `yield`.
- The intended contract is that calling these without `await` returns an `AsyncIterator`. The public API has been aligned with that contract.
✅ What Was Changed
1. 🎯 StreamingRequestHandler (Base Class)
File: src/cqrs/requests/request_handler.py
- The abstract method was changed from `async def handle(...)` to `def handle(...) -> AsyncIterator[ResT]`.
- Subclasses still implement `async def handle(...): ... yield ...` (an async generator).
- Calling `handler.handle(request)` is now typed as returning an `AsyncIterator`, with no incompatible override.
2. 📡 Stream and Dispatch Methods
Files: src/cqrs/mediator.py, src/cqrs/dispatcher/streaming.py, src/cqrs/dispatcher/saga.py
- `StreamingRequestMediator.stream` and `SagaMediator.stream`: declared as `def stream(...) -> AsyncIterator[...]` and implemented via `return self._stream_impl(...)`; the actual logic lives in `async def _stream_impl(...)` with `yield`.
- `StreamingRequestDispatcher.dispatch` and `SagaDispatcher.dispatch`: declared as `def dispatch(...) -> AsyncIterator[...]` and implemented via `return self._dispatch_impl(...)`; logic in `async def _dispatch_impl(...)` with `yield`.
- Usage is unchanged: you still write `async for result in mediator.stream(request)` and `async for result in mediator.stream(context, saga_id=...)`.
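The pattern generalizes to any streaming API. A self-contained sketch (class and method names are illustrative, not the library's):

```python
import asyncio
import typing

class Mediator:
    def stream(self, request: str) -> typing.AsyncIterator[str]:
        # Plain `def`: calling stream() returns the async generator directly,
        # so type checkers see AsyncIterator, not Coroutine.
        return self._stream_impl(request)

    async def _stream_impl(self, request: str) -> typing.AsyncIterator[str]:
        for i in range(3):
            yield f"{request}-{i}"  # async-generator body lives here

async def main() -> list[str]:
    results = []
    async for item in Mediator().stream("req"):  # call without await
        results.append(item)
    return results

print(asyncio.run(main()))  # ['req-0', 'req-1', 'req-2']
```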
3. 🧹 Type Ignores Removed
- Examples: `examples/streaming_handler_parallel_events.py`, `examples/fastapi_sse_streaming.py` — removed `# type: ignore[override]` from `handle`.
- Tests: all unit and integration tests for streaming and saga no longer use `# type: ignore[override]` or `# type: ignore` on streaming handler `handle` methods.
- Benchmarks: `tests/benchmarks/dataclasses/test_benchmark_stream_request_handler.py` and `tests/benchmarks/pydantic/test_benchmark_stream_request_handler.py` — removed `# pyright: ignore[reportIncompatibleMethodOverride]` from `async def handle`.
Before, these spots required type ignores. After, types are consistent and no ignores are needed.
4. 🧪 Contract Tests
- New tests ensure that calling without `await` returns an `AsyncIterator` that can be consumed with `async for`:
  - `test_streaming_handler_handle_returns_async_iterator_consumable_with_async_for`
  - `test_streaming_mediator_stream_returns_async_iterator_consumable_with_async_for`
  - `test_saga_mediator_stream_returns_async_iterator_consumable_with_async_for`
5. 📝 Other
- saga/bootstrap.py — docstring example fixed to use `result.step_type` instead of `result.step_result.step_type`.
- examples/streaming_handler_parallel_events.py — added a short delay before assertions to account for fire-and-forget event handling, and corrected assertion expectations (one handler invocation per order, not doubled).
📌 Summary for Library Users
| Before | After |
|---|---|
| Subclasses of `StreamingRequestHandler` and benchmark handlers needed `# type: ignore[override]` or `# pyright: ignore[reportIncompatibleMethodOverride]` on `handle`. | Types are compatible; no type ignores required. |
| The "call without `await` → `AsyncIterator`" contract was not reflected in the types for `stream` / `dispatch`. | `stream` and `dispatch` are typed to return `AsyncIterator`; type checkers are satisfied. |
Runtime behavior and usage (`async for result in mediator.stream(...)`) are unchanged.
Extend Saga Storage interface with new improvements
🎯 Overview
This release extends the Saga Storage interface with two improvements for recovery workflows: explicit control over the recovery attempt counter and optional filtering of recovery candidates by saga name. All changes are backward compatible.
✨ What's New
🔢 set_recovery_attempts — Explicit Recovery Counter
A new method on ISagaStorage and its implementations lets you set the recovery attempt counter to any value instead of only incrementing it.
Use cases:
| Scenario | Example |
|---|---|
| Reset after successful step recovery | After you successfully resume one step, reset the counter so the saga stays eligible for recovery: await storage.set_recovery_attempts(saga_id, 0) |
| Exclude from recovery without changing status | Mark a saga as "no more retries" by setting the counter to the max: await storage.set_recovery_attempts(saga_id, max_recovery_attempts) — it will no longer appear in get_sagas_for_recovery() |
Signature:
```python
async def set_recovery_attempts(self, saga_id: uuid.UUID, attempts: int) -> None
```

Implemented in: `MemorySagaStorage`, `SqlAlchemySagaStorage`.
🏷️ get_sagas_for_recovery — Optional Filter by Saga Name
get_sagas_for_recovery() now accepts an optional saga_name parameter. You can run separate recovery jobs per saga type and only fetch the sagas that job is responsible for.
| `saga_name` | Behavior |
|---|---|
| `None` (default) | Returns all saga types — same as before, fully backward compatible |
| `"OrderSaga"` | Returns only sagas with `name == "OrderSaga"` |
Example — one job per saga type:

```python
# Job 1: only OrderSaga
ids = await storage.get_sagas_for_recovery(
    limit=50,
    max_recovery_attempts=5,
    saga_name="OrderSaga",
)

# Job 2: only PaymentSaga
ids = await storage.get_sagas_for_recovery(
    limit=50,
    max_recovery_attempts=5,
    saga_name="PaymentSaga",
)
```

Updated signature:
```python
async def get_sagas_for_recovery(
    self,
    limit: int,
    max_recovery_attempts: int = 5,
    stale_after_seconds: int | None = None,
    saga_name: str | None = None,  # NEW
) -> list[uuid.UUID]
```

📋 Summary of Changes
| Area | Change |
|---|---|
| Protocol (`ISagaStorage`) | New method `set_recovery_attempts(saga_id, attempts)`; `get_sagas_for_recovery` gains optional `saga_name=None` |
| `SqlAlchemySagaStorage` | Implements `set_recovery_attempts`; adds `WHERE name = :saga_name` when `saga_name` is set |
| `MemorySagaStorage` | Implements `set_recovery_attempts`; filters in-memory by `data["name"] == saga_name` when `saga_name` is set |
| Tests | New integration tests for `set_recovery_attempts` (set value, exclude from recovery, not found) and for `saga_name` (filter by name, `None` returns all types) in both storage backends |
🔄 Migration & Compatibility
- Existing code that calls `get_sagas_for_recovery(limit=..., max_recovery_attempts=..., stale_after_seconds=...)` continues to work unchanged; `saga_name` defaults to `None`.
- Custom storage implementations of `ISagaStorage` must implement the new abstract method `set_recovery_attempts(saga_id, attempts)` and add the optional `saga_name` parameter to `get_sagas_for_recovery` to satisfy the interface.
📦 Full Changelog
Added
- Saga Storage: method `set_recovery_attempts(saga_id, attempts)` to set the recovery attempt counter explicitly
- Saga Storage: optional parameter `saga_name` in `get_sagas_for_recovery()` for filtering recovery candidates by saga name
- Integration tests for `set_recovery_attempts` and `saga_name` filtering (Memory and SqlAlchemy)

Changed

- `get_sagas_for_recovery()` signature extended with `saga_name: str | None = None` (backward compatible)
🐛 Fixed: sagas that failed before any step completed
🐛 Saga recovery: sagas that failed before any step completed
Problem: Sagas that failed on the first step (no steps with act + COMPLETED in the log) were still selected by the recovery task every run. For them, compensation was never executed (there were no completed steps to compensate), but they stayed in a "recoverable" state and were picked again on the next run, causing repeated "recovered" logs and unnecessary DB load.
Changes:
- Exclude FAILED sagas from recovery — `get_sagas_for_recovery()` now returns only sagas in RUNNING or COMPENSATING status. Sagas already in FAILED are no longer returned, so they are not retried every minute.
  - Updated in: `SqlAlchemySagaStorage`, `MemorySagaStorage`, and the storage protocol docstring.
- Explicit handling when there are no steps to compensate — when compensation runs with an empty list of completed steps (the saga failed before completing any step):
  - compensation.py: at the start of `compensate_steps()`, if `completed_steps` is empty, the saga status is set to FAILED, a short info log is written, and the function returns (no `compensate()` calls).
  - saga.py: when recovering a saga in COMPENSATING/FAILED with no completed steps, a warning is logged that the saga failed before any step completed and it is marked FAILED without calling `compensate()`.
Result: Sagas that fail on the first step are marked FAILED once and no longer appear in recovery. Sagas that fail in the middle (e.g. external service down) continue to be recovered as RUNNING or COMPENSATING until compensation finishes or the saga is marked FAILED.
Tests: Integration tests for get_sagas_for_recovery now assert that only RUNNING and COMPENSATING sagas are returned, and that FAILED sagas are excluded.
Saga recovery attempts
🎉 New Features
- `ISagaStorage.get_sagas_for_recovery()` — returns saga IDs that need recovery (status `RUNNING`, `COMPENSATING`, or `FAILED`) with optional filters:
  - `limit` — maximum number of IDs to return
  - `max_recovery_attempts` (default: `5`) — only sagas with `recovery_attempts` strictly less than this value; excludes repeatedly failing sagas from retry
  - `stale_after_seconds` (optional) — only sagas whose `updated_at` is older than `now - stale_after_seconds`; avoids picking sagas currently being executed by another worker
- `ISagaStorage.increment_recovery_attempts()` — atomically increments `recovery_attempts` and optionally updates the saga status (e.g. to `FAILED`). Intended for use after a failed recovery; `recover_saga()` calls it automatically on exception, so callers do not need to call it manually.
- `recovery_attempts` field in saga storage — each saga execution now has a counter of failed recovery attempts, used by `get_sagas_for_recovery()` to limit retries and by `increment_recovery_attempts()` on recovery failure.
Implemented in both `MemorySagaStorage` and `SqlAlchemySagaStorage`.
Changed
- `recover_saga()` — on recovery failure (any exception during resume), the storage's `increment_recovery_attempts(saga_id, new_status=SagaStatus.FAILED)` is invoked automatically. Sagas can then be retried until `max_recovery_attempts` or excluded from future recovery runs via `get_sagas_for_recovery(max_recovery_attempts=...)`.
Documentation
- Recovery and storage docs now describe recovery attempts, `get_sagas_for_recovery()`, and `increment_recovery_attempts()`.
- Example `saga_recovery_scheduler.py` demonstrates a recovery loop using `get_sagas_for_recovery(limit, max_recovery_attempts, stale_after_seconds)` and `recover_saga()` without manual `increment_recovery_attempts` calls.
Upgrade notes
- Storage interface: if you implement a custom `ISagaStorage`, you must add:
  - `get_sagas_for_recovery(limit, max_recovery_attempts=5, stale_after_seconds=None) -> list[uuid.UUID]`
  - `increment_recovery_attempts(saga_id, new_status=None) -> None`
- SqlAlchemy: the `saga_executions` table gains a new column `recovery_attempts` (INTEGER, default 0). For existing databases, add the column and backfill if needed, for example:

  ```sql
  ALTER TABLE saga_executions ADD COLUMN recovery_attempts INTEGER NOT NULL DEFAULT 0;
  ```

- Recovery jobs: prefer `storage.get_sagas_for_recovery(limit=..., max_recovery_attempts=..., stale_after_seconds=...)` over custom queries to select sagas for recovery.
🐛 Fixed: Saga Context Serialization and Recovery
Bug Fixes
Saga Context Serialization and Recovery
Fixed context not being saved to storage after saga completion
- Issue: when a saga completed successfully, the context was not persisted to storage before marking the saga as `COMPLETED`. This could lead to data loss if the context was modified during the last step's execution.
- Fix: added a final context update before setting the saga status to `COMPLETED` in `SagaTransaction.__aiter__()`.
- Impact: the saga context is now properly persisted to storage after all steps complete successfully, ensuring data consistency.
Improved field filtering in SagaContext.from_dict()
- Issue: When deserializing context from dictionary, unknown fields in the input data could cause issues or unexpected behavior.
- Fix: added field filtering in `SagaContext.from_dict()` to only include known dataclass fields before deserialization.
- Impact: more robust context reconstruction that ignores unknown fields, preventing potential errors during saga recovery.
Test Improvements
Fixed test expectation for context reconstruction failures
- Issue: the test `test_recover_saga_raises_on_context_reconstruction_failure` expected `TypeError` when required fields were missing, but `dataclass_wizard` actually raises a `MissingFields` exception.
- Fix: updated the test to expect the correct exception type (`MissingFields` from `dataclass_wizard.errors`).
- Impact: tests now correctly validate error handling during context reconstruction.
Changes
`cqrs.saga.saga.SagaTransaction`

- Added a final context update before marking the saga as `COMPLETED` to ensure context persistence.
`cqrs.saga.models.SagaContext`

- Enhanced the `from_dict()` method to filter input data to only known dataclass fields, improving robustness during deserialization.
Technical Details
Context Serialization
- `SagaContext.to_dict()` uses `dataclass_wizard.asdict()`, which converts field names to camelCase (e.g., `order_id` → `orderId`).
- `SagaContext.from_dict()` now filters input data to only include fields that exist in the dataclass definition before deserialization.
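The filtering step can be shown with a simplified stand-in. The real `SagaContext` uses `dataclass_wizard` (including the camelCase key handling above), which this sketch omits:

```python
import dataclasses

@dataclasses.dataclass
class Context:
    order_id: str
    amount: int

def from_dict(data: dict) -> Context:
    known = {f.name for f in dataclasses.fields(Context)}
    filtered = {k: v for k, v in data.items() if k in known}  # drop unknown fields
    return Context(**filtered)

ctx = from_dict({"order_id": "o-1", "amount": 3, "legacy_field": "ignored"})
print(ctx)  # Context(order_id='o-1', amount=3)
```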
Saga Completion Flow
The saga completion process now follows this sequence:
1. Execute all saga steps
2. Update the context in storage (final update)
3. Mark the saga as `COMPLETED` in storage
This ensures that any context modifications made during the last step are properly persisted.
Testing
All existing tests pass, including:
- `test_recover_saga_raises_on_context_reconstruction_failure` — now correctly expects the `MissingFields` exception
- `test_recover_saga_updates_context_during_recovery` — validates context persistence after recovery
- All other saga recovery and execution tests (85 tests total)
Migration Notes
No breaking changes. This is a bug fix release that improves data consistency and error handling.
If you're using SagaContext.from_dict() with data containing unknown fields, those fields will now be automatically filtered out. This should not affect normal usage but may change behavior if you were relying on unknown fields being passed through.
Support for customizing saga storage table names through environment variables
Summary
This release adds support for customizing saga storage table names through environment variables, providing better flexibility for multi-tenant deployments and service-specific naming conventions.
Added
Saga Storage Configuration
- Environment variable support for saga table names: added support for customizing the saga execution and log table names via environment variables, similar to the existing outbox table configuration.
  - `CQRS_SAGA_EXECUTION_TABLE_NAME`: customizes the saga execution table name (default: `"saga_executions"`)
  - `CQRS_SAGA_LOG_TABLE_NAME`: customizes the saga log table name (default: `"saga_logs"`)

  This enhancement allows different services or deployments to use service-specific table names (e.g., `service_manager_saga_executions`, `payment_service_saga_executions`) while maintaining the same codebase.

  Usage:

  ```shell
  # In a .env file or as environment variables
  export CQRS_SAGA_EXECUTION_TABLE_NAME=service_manager_saga_executions
  export CQRS_SAGA_LOG_TABLE_NAME=service_manager_saga_logs
  ```

  The table names are automatically loaded from environment variables when the `cqrs.saga.storage.sqlalchemy` module is imported, using `python-dotenv` for `.env` file support.
Technical Details
Changes in cqrs.saga.storage.sqlalchemy
- Added `os` and `dotenv` imports
- Added a `dotenv.load_dotenv()` call to load environment variables from `.env` files
- Modified `DEFAULT_SAGA_EXECUTION_TABLE_NAME` and `DEFAULT_SAGA_LOG_TABLE_NAME` to be read from environment variables with fallback to the default values
- Updated `SagaExecutionModel` and `SagaLogModel` to use the configurable table names
- Updated the `ForeignKey` reference in `SagaLogModel` to use the configurable execution table name
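The resolution logic above amounts to an environment lookup with a fallback; a minimal sketch (the real module also calls `dotenv.load_dotenv()` first so a `.env` file can populate the variables — here the environment is set/cleared just for the demonstration):

```python
import os

os.environ["CQRS_SAGA_EXECUTION_TABLE_NAME"] = "service_manager_saga_executions"
os.environ.pop("CQRS_SAGA_LOG_TABLE_NAME", None)  # exercise the fallback path

DEFAULT_SAGA_EXECUTION_TABLE_NAME = os.environ.get(
    "CQRS_SAGA_EXECUTION_TABLE_NAME", "saga_executions"
)
DEFAULT_SAGA_LOG_TABLE_NAME = os.environ.get(
    "CQRS_SAGA_LOG_TABLE_NAME", "saga_logs"
)

print(DEFAULT_SAGA_EXECUTION_TABLE_NAME)  # overridden via the env var
print(DEFAULT_SAGA_LOG_TABLE_NAME)        # falls back to the default
```

Because the lookup happens at module import time, the variables must be set before `cqrs.saga.storage.sqlalchemy` is first imported.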
Backward Compatibility
- ✅ Fully backward compatible: if the environment variables are not set, the default table names (`saga_executions` and `saga_logs`) are used
- ✅ No breaking changes: existing code continues to work without modifications
- ✅ Optional feature: the feature is opt-in via environment variables
Migration Guide
For Existing Users
No migration required. The feature is backward compatible and uses default table names if environment variables are not set.
For New Users or Custom Table Names
1. Set the environment variables in your `.env` file or deployment configuration:

   ```shell
   CQRS_SAGA_EXECUTION_TABLE_NAME=your_custom_saga_executions
   CQRS_SAGA_LOG_TABLE_NAME=your_custom_saga_logs
   ```

2. Ensure your database migration creates tables with the custom names:

   ```sql
   CREATE TABLE IF NOT EXISTS `your_custom_saga_executions` (
       -- table definition
   );
   CREATE TABLE IF NOT EXISTS `your_custom_saga_logs` (
       -- table definition
   );
   ```

3. `SqlAlchemySagaStorage` will automatically use the custom table names when initialized.
Examples
Example: Service-Specific Table Names
```shell
# .env file
CQRS_SAGA_EXECUTION_TABLE_NAME=service_manager_saga_executions
CQRS_SAGA_LOG_TABLE_NAME=service_manager_saga_logs
```

```python
# Python code (no changes needed)
from cqrs.saga.storage.sqlalchemy import SqlAlchemySagaStorage

storage = SqlAlchemySagaStorage(session_factory)
# Automatically uses service_manager_saga_executions and service_manager_saga_logs
```

Related Issues
- Enables service-specific saga table naming for multi-service deployments
- Provides consistency with existing outbox table configuration pattern
- Improves flexibility for database schema customization