
Releases: pypatterns/python-cqrs

Request and event handler fallbacks with optional circuit breaker

21 Feb 20:37
95aed36


🚀 Release notes – v4.10.0

Fallbacks for requests and events

TL;DR — Request and event handler fallbacks with optional circuit breaker; unified ICircuitBreaker protocol; new examples and tests.

This release adds fallback support for request handlers and event handlers, with optional circuit breaker integration. You can now define primary and fallback handlers for commands, queries, streaming handlers, and domain events, and optionally protect them with a shared circuit breaker protocol.


🆕 What's new

📨 Request handler fallbacks

  • RequestHandlerFallback — wrap a primary and fallback request handler so that when the primary fails (or the circuit is open), the fallback is used.
  • Works with both sync RequestHandler[Req, Res] and streaming StreamingRequestHandler[Req, Res].
  • Optional failure_exceptions to trigger fallback only for specific exception types (e.g. ConnectionError, TimeoutError).
  • Optional circuit breaker so that after N failures the primary is skipped and the fallback runs directly.

Example:

from cqrs import RequestHandlerFallback
from cqrs.adapters.circuit_breaker import AioBreakerAdapter

request_cb = AioBreakerAdapter(fail_max=5, timeout_duration=60)
request_map.bind(
    GetOrderCommand,
    RequestHandlerFallback(
        GetOrderHandler,
        GetOrderFallbackHandler,
        failure_exceptions=(ConnectionError, TimeoutError),
        circuit_breaker=request_cb,
    ),
)

📢 Event handler fallbacks

  • EventHandlerFallback — wrap a primary and fallback event handler so that when the primary fails (or the circuit is open), the fallback is used.
  • Same options: failure_exceptions and optional circuit breaker.
  • Supported in both event dispatcher and event emitter (handlers registered in EventMap).

Example:

from cqrs import EventHandlerFallback

event_cb = AioBreakerAdapter(fail_max=5, timeout_duration=60)
event_map.bind(
    OrderCreatedEvent,
    EventHandlerFallback(
        SendEmailHandler,
        SendEmailFallbackHandler,
        circuit_breaker=event_cb,
    ),
)

🔌 Unified circuit breaker protocol

  • ICircuitBreaker – a single protocol used by:
    • Saga step fallbacks (existing)
    • Request handler fallbacks (new)
    • Event handler fallbacks (new)
  • AioBreakerAdapter now implements ICircuitBreaker (and still ISagaStepCircuitBreaker for backward compatibility).
  • One adapter instance per “domain” (e.g. one for requests, one for events) is enough; each handler/step is namespaced by type.
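The protocol surface listed in the API summary (call(identifier, func, *args, **kwargs) plus is_circuit_breaker_error(exc)) is small enough to implement yourself. Below is a minimal in-memory sketch assuming a simple count-based policy; the class and error names are hypothetical and not part of the library:

```python
import typing


class CircuitOpenError(Exception):
    """Hypothetical error raised when the circuit for an identifier is open."""


class CountingCircuitBreaker:
    """Minimal in-memory breaker matching the ICircuitBreaker shape:
    after fail_max consecutive failures for an identifier, calls are rejected."""

    def __init__(self, fail_max: int = 5) -> None:
        self._fail_max = fail_max
        self._failures: dict[str, int] = {}

    async def call(self, identifier: str, func: typing.Callable, *args, **kwargs):
        if self._failures.get(identifier, 0) >= self._fail_max:
            # Circuit is open: skip the primary so the fallback runs directly.
            raise CircuitOpenError(identifier)
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._failures[identifier] = self._failures.get(identifier, 0) + 1
            raise
        self._failures[identifier] = 0  # success resets the counter
        return result

    def is_circuit_breaker_error(self, exc: BaseException) -> bool:
        return isinstance(exc, CircuitOpenError)
```

Because each handler/step is namespaced by type, one such instance can safely be shared across all handlers in a domain.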

📡 Streaming handlers

  • RequestHandlerFallback is supported for streaming handlers: when the primary streaming handler fails, the fallback stream is used from the start (no resume from the same stream).

🛠️ Helpers

  • get_generic_args_for_origin() in cqrs.generic_utils – used to validate that primary and fallback handlers handle the same request/event and (for requests) the same response type.
  • should_use_fallback() in cqrs.circuit_breaker – central logic to decide whether to run the fallback after a primary failure (circuit open or exception in failure_exceptions, or any exception if failure_exceptions is empty).
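The decision rules described for should_use_fallback() can be expressed in a few lines. The sketch below is an illustrative equivalent, not the library source; the name should_run_fallback is made up to avoid confusion with the real helper:

```python
def should_run_fallback(
    exc: BaseException,
    failure_exceptions: tuple[type[BaseException], ...] = (),
    circuit_open: bool = False,
) -> bool:
    """Mirror of the documented decision rules (illustrative only)."""
    if circuit_open:
        return True  # circuit breaker rejected the primary outright
    if not failure_exceptions:
        return True  # no filter configured: any primary failure triggers the fallback
    return isinstance(exc, failure_exceptions)
```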

📚 Documentation and examples

  • Examples added:
    • examples/request_fallback.py – request handler fallback with optional circuit breaker
    • examples/cor_request_fallback.py – CoR (chain of responsibility) with fallback
    • examples/event_fallback.py – event handler fallback
    • examples/streaming_handler_fallback.py – streaming request handler fallback
  • Unit tests: test_request_fallback.py, test_event_fallback.py; integration tests for the circuit breaker adapter updated.

📋 API summary

  • cqrs.RequestHandlerFallback — fallback wrapper for request/streaming handlers (primary, fallback, optional failure_exceptions, optional circuit_breaker).
  • cqrs.EventHandlerFallback — fallback wrapper for event handlers (same options).
  • cqrs.ICircuitBreaker — protocol with call(identifier, func, *args, **kwargs) and is_circuit_breaker_error(exc).
  • cqrs.circuit_breaker.should_use_fallback — helper to decide whether the fallback should run after a primary error.
  • cqrs.generic_utils.get_generic_args_for_origin — extracts generic type args from handler classes (for validation).

✅ Compatibility

  • Backward compatible: Existing saga fallbacks and AioBreakerAdapter usage continue to work. AioBreakerAdapter now implements ICircuitBreaker in addition to ISagaStepCircuitBreaker.
  • Python: 3.10+
  • Optional: aiobreaker for circuit breaker support (pip install python-cqrs[aiobreaker]).

Reduce Saga Storage Overhead

20 Feb 11:46
37e723a


🚀 Release notes: Reduce Saga Storage Overhead

📋 Summary

This release reduces saga storage load by introducing checkpoint commits: one session per saga run with explicit commits only at key points (after each step, after each compensation step, etc.) instead of committing after every storage call. Fewer commits, shorter lock hold time, and lower risk of deadlocks when using SQLAlchemy.


✨ What's New

🎯 Checkpoint commits & SagaStorageRun protocol

  • New protocol SagaStorageRun (src/cqrs/saga/storage/protocol.py) — scoped “session” for a single saga. Methods do not commit; the caller calls commit() at checkpoints.
  • New method ISagaStorage.create_run() — returns a context manager that yields a SagaStorageRun. Storages that don’t support it can raise NotImplementedError and execution falls back to the previous behaviour.
  • SagaTransaction — when create_run() is available, the saga runs in one session and commits only at checkpoints (after create + RUNNING, after each step, after each compensation step, at completion/failure). Storages without create_run() keep working as before.
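The checkpoint pattern above can be sketched with a stand-in run object. FakeRun and this create_run are illustrative stand-ins, not library types; the point is one session per saga, with commit() called only at the documented checkpoints:

```python
import contextlib


class FakeRun:
    """Stand-in for a SagaStorageRun: mutations buffer until commit()."""

    def __init__(self) -> None:
        self.pending: list[str] = []
        self.committed: list[str] = []

    def record(self, op: str) -> None:
        self.pending.append(op)

    def commit(self) -> None:
        self.committed.extend(self.pending)
        self.pending.clear()


@contextlib.contextmanager
def create_run():
    # One "session" for the whole saga run; the caller commits at checkpoints.
    yield FakeRun()


steps = ["reserve_stock", "charge_card", "send_receipt"]
with create_run() as run:
    run.record("saga RUNNING")
    run.commit()                 # checkpoint: saga created and marked RUNNING
    for step in steps:
        run.record(f"step {step} COMPLETED")
        run.commit()             # checkpoint: one commit per step, not per storage call
    run.record("saga COMPLETED")
    run.commit()                 # checkpoint: final status
```

Compared with committing on every storage call, each checkpoint batches the writes made since the previous one, which is where the reduced commit count comes from.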

📦 Storage implementations

  • Memory (MemorySagaStorage) — implements create_run() via _MemorySagaStorageRun; commit/rollback are no-ops but the protocol matches SQLAlchemy.
  • SQLAlchemy (SqlAlchemySagaStorage) — implements create_run() via _SqlAlchemySagaStorageRun with one AsyncSession per run; mutations are committed only when the saga calls run.commit().

🔒 Deadlock mitigation

  • load_saga_state(..., read_for_update=True) — when loading for recovery or exclusive update, the row can be locked (e.g. SELECT ... FOR UPDATE in SQLAlchemy). Together with checkpoint commits, this shortens lock duration and reduces deadlock risk.

♻️ Compensation

  • SagaCompensator — new optional on_after_compensate_step callback, invoked after each successfully compensated step. When using a run, the saga passes run.commit so each compensation step is committed at a checkpoint.

📝 Documentation & types

  • Docstrings added/updated for the storage protocol, SagaStorageRun, memory and SQLAlchemy storage, SagaTransaction, execution and recovery managers, and compensator. “Strict Backward Recovery” and checkpoint behaviour are described in code.

🐛 Fixes

  • Deadlocks — checkpoint commits and read_for_update reduce long-held locks and concurrent-update conflicts.

🧪 Tests & infrastructure

Unit tests

  • New tests/unit/test_saga/test_saga_storage_run.py — tests create_run() and checkpoint path (memory), and fallback when storage does not implement create_run().

Integration tests & databases

  • PostgreSQL — integration tests now run against PostgreSQL (port 5433) in addition to MySQL.
  • Tests split into test_saga_storage_sqlalchemy_postgres.py / _mysql.py and test_saga_mediator_sqlalchemy_postgres.py / _mysql.py; conftest/fixtures extended with DATABASE_DSN_POSTGRESQL.

🔧 CI

  • tests.yml — start and wait for PostgreSQL; set DATABASE_DSN_POSTGRESQL; run tests with MySQL and PostgreSQL.
  • New workflow codspeed.yml — run benchmarks via CodSpeed (MySQL, PostgreSQL, Redis; pytest tests/benchmarks/ --codspeed).
  • pytest-config.ini — added DATABASE_DSN_POSTGRESQL to env.

📦 Docker & dependencies

  • docker-compose-test.yml — new service postgres_tests (PostgreSQL 15.4, port 5433).
  • pyproject.toml — version 4.9.0; dev dependency pytest-codspeed==4.2.0 added.

⚡ Benchmarks

  • Benchmarks in tests/benchmarks/ updated and extended (conftest, dataclasses/default for memory and SQLAlchemy) to measure the impact of the new storage behaviour.

✅ Compatibility

  • Saga examples (saga.py, saga_recovery.py, saga_sqlalchemy_storage.py, saga_fallback.py, saga_recovery_scheduler.py, etc.) updated to the current storage API and checkpoint path where applicable.
  • Backward compatibility — code using ISagaStorage without create_run() continues to work. The new path is used only when storage implements create_run() (Memory and SQLAlchemy do by default).

📌 Miscellaneous

  • README.md — updated for structure and usage.
  • Post-review: ruff format, pre-commit, and “banchmarks” → benchmarks rename.

🎉 Summary of benefits

  • Fewer DB commits per saga run
  • Shorter lock hold time and lower deadlock risk
  • Backward compatible; custom storages can add create_run() like Memory/SQLAlchemy

🚀 Saga Step Identity

02 Feb 11:00
ce4dbdf


✨ Added

  • Saga step result includes saga ID — Each SagaStepResult yielded from saga execution now carries a saga_id field (uuid.UUID | None). This lets client code identify which saga a step belongs to and, if desired, trigger compensation immediately when a saga fails mid-way (e.g. via a dedicated API that runs compensation for a given saga_id).

🔧 Changed

  • Stricter typing for SagaStepResult.step_type — The step_type field is now typed as type[SagaStepHandler[ContextT, Resp]] instead of typing.Any, improving type safety and IDE support without introducing new dependencies or circular imports (both types live in the same module).

📋 Technical details

  • SagaTransaction sets saga_id on each yielded SagaStepResult via dataclasses.replace() so that every step result has the current saga's ID when consumed from mediator.stream() or saga.transaction().
  • saga_id is optional (None by default) so that results created outside the execution layer (e.g. in step handlers via _generate_step_result()) remain valid; the execution layer fills it in when yielding to the client.

⚠️ Compatibility

  • Backward compatible — New field saga_id has a default of None. Existing code that does not use saga_id continues to work unchanged. The change to step_type is annotation-only; runtime behavior is unchanged.

📢 Event propagation in event handlers

02 Feb 07:05
976b3ba


This release introduces follow-up event propagation for event handlers and unifies how events flow through the pipeline. You can now build multi-level event chains (e.g. L1 → L2 → L3) where each handler can emit new domain events that are processed in the same run.


✨ What changed

Event handlers

  • EventHandler has an optional events property. Override it in your handler to return a sequence of follow-up events after handle() runs. Default is () (no follow-ups).
  • EventEmitter.emit() now returns Sequence[IEvent]: the follow-up events from all handlers for the given event.
  • EventProcessor.emit_events() processes not only the initial events but all follow-ups in the same pipeline:
    • Sequential mode (concurrent_event_handle_enable=False): BFS over events and follow-ups.
    • Parallel mode (concurrent_event_handle_enable=True): events and follow-ups run under the same semaphore; as soon as one task completes, its follow-ups are queued (FIRST_COMPLETED), without waiting for siblings.
  • EventDispatcher now dispatches follow-ups: after each handler's handle(), it dispatches every event from handler.events recursively.
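The parallel mode described above (queue a task's follow-ups as soon as it completes, without waiting for siblings) maps naturally onto asyncio.wait(..., return_when=FIRST_COMPLETED). A self-contained sketch with string "events" and a toy handler; none of these names come from the library:

```python
import asyncio


async def handle(event: str) -> list[str]:
    """Toy handler: 'L1' events produce one 'L2' follow-up, others produce none."""
    await asyncio.sleep(0)
    return [event.replace("L1", "L2")] if "L1" in event else []


async def emit_events(initial: list[str], max_concurrency: int = 4) -> list[str]:
    processed: list[str] = []
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run(event: str) -> tuple[str, list[str]]:
        async with semaphore:  # events and follow-ups share one semaphore
            return event, await handle(event)

    pending = {asyncio.ensure_future(run(e)) for e in initial}
    while pending:
        # As soon as one task completes, queue its follow-ups without waiting for siblings.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            event, follow_ups = task.result()
            processed.append(event)
            pending |= {asyncio.ensure_future(run(f)) for f in follow_ups}
    return processed


events = asyncio.run(emit_events(["a-L1", "b-L1"]))
```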

Request handlers

  • RequestHandler and CORRequestHandler have an optional events property (default ()). You can override it to return events produced by the handler; the request mediator passes them to EventProcessor.emit_events(). If you don't emit events, you don't need to implement or override events.

🔄 How event processing works now

  1. Request path
    A command/query is sent → RequestDispatcher runs the handler → the mediator collects handler.events and calls EventProcessor.emit_events(events).

  2. Event path
    EventProcessor.emit_events(events) runs:

    • For each event, EventEmitter.emit(event) is called.
    • For domain events: all registered handlers are invoked; after each handle(), follow-ups are taken from handler.events and aggregated.
    • For notification events: the event is sent to the message broker; no follow-ups.
    • Follow-ups are then processed in the same way (same pipeline, BFS or parallel), until there are no more.
  3. Event-only path (e.g. EventMediator)
    EventDispatcher.dispatch(event) runs the handler(s) and then dispatches each handler.events follow-up recursively.

So: one entry point (e.g. one command or one emit_events call) can trigger a whole tree of events and follow-ups, either sequentially (BFS) or in parallel under a semaphore.

💻 Code examples

Event handler without follow-ups (no events override)

Same as before: implement only handle(). The default events returns ().

import cqrs

class UserJoined(cqrs.DomainEvent, frozen=True):
    user_id: str
    meeting_id: str

class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
    async def handle(self, event: UserJoined) -> None:
        # Handle the event; no follow-ups.
        await self._notify_room(event.meeting_id, f"User {event.user_id} joined")

Event handler with follow-ups (override events)

Override events to return new domain events; the pipeline will process them in the same run (BFS or parallel).

import typing
from cqrs.events.event import IEvent

class EventL1(cqrs.DomainEvent, frozen=True):
    seed: str

class EventL2(cqrs.DomainEvent, frozen=True):
    seed: str

class HandlerL1(cqrs.EventHandler[EventL1]):
    def __init__(self) -> None:
        self._follow_ups: list[IEvent] = []

    @property
    def events(self) -> typing.Sequence[IEvent]:
        return tuple(self._follow_ups)

    async def handle(self, event: EventL1) -> None:
        # Do work, then emit follow-up event.
        self._follow_ups.append(EventL2(seed=event.seed))

Request handler without events (no events override)

Use when the handler does not emit domain events. Only handle() is required; the default events is ().

import cqrs

class ReadMeetingQuery(cqrs.Request):
    meeting_id: str

class ReadMeetingQueryResult(cqrs.Response):
    link: str
    meeting_id: str

class ReadMeetingQueryHandler(cqrs.RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):
    def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
        self._meetings_api = meetings_api

    async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
        link = await self._meetings_api.get_link(request.meeting_id)
        return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)

No events property and no self.events list: the base RequestHandler.events returns () and the mediator simply passes an empty sequence to EventProcessor.emit_events().
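For the opposite case, a request handler that does emit events can mirror the event-handler pattern shown earlier: collect events during handle() and expose them via an events property. Sketched here with plain stand-in classes instead of the cqrs base types; all names are illustrative:

```python
import typing


class MeetingCreated:
    """Stand-in for a cqrs.DomainEvent subclass."""

    def __init__(self, meeting_id: str) -> None:
        self.meeting_id = meeting_id


class CreateMeetingCommandHandler:
    """Stand-in for a cqrs.RequestHandler: collects domain events during
    handle() and exposes them via the `events` property."""

    def __init__(self) -> None:
        self._events: list[MeetingCreated] = []

    @property
    def events(self) -> typing.Sequence[MeetingCreated]:
        return tuple(self._events)

    async def handle(self, meeting_id: str) -> None:
        # Do the work, then record the event; the mediator would pass
        # handler.events to EventProcessor.emit_events() after handle() returns.
        self._events.append(MeetingCreated(meeting_id))
```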


📋 Summary

  • EventHandler — optional events property; override to return follow-up events.
  • EventEmitter.emit() — returns Sequence[IEvent] (the follow-ups from handlers).
  • EventProcessor — processes initial events and all follow-ups (BFS or parallel).
  • EventDispatcher — dispatches each handler's follow-ups recursively.
  • RequestHandler — optional events property; override to return events from commands/queries.
  • CORRequestHandler — same optional events property.

You can keep existing handlers as-is (they behave like "no follow-ups"). To add multi-level event chains, override events in your event handlers and, if needed, in request handlers.


Happy event-driven building.

🚀 Streaming & Saga Typing Fixes

31 Jan 12:18
245d268


📋 Summary

Typing for streaming handlers and the stream methods in mediators and sagas has been corrected. Previously you had to suppress type checks (# type: ignore[override], # pyright: ignore[reportIncompatibleMethodOverride]); the types are now aligned, so these suppressions are no longer needed.


🔧 What Was Wrong

StreamingRequestHandler

  • The base method was declared as async def handle(...) -> AsyncIterator[ResT] with only raise NotImplementedError (no yield).
  • For the type checker that is a coroutine function: calling handle(request) returns Coroutine[..., AsyncIterator[ResT]].
  • Implementations in subclasses are async generators (async def handle(...): ... yield ...): calling them returns AsyncGenerator / AsyncIterator.
  • Pyright treated the override as incompatible (base returns a coroutine, override returns an iterator), so type ignores were required (# type: ignore[override] / # pyright: ignore[reportIncompatibleMethodOverride]).

Stream Methods in Mediators and Dispatch in Dispatchers

  • StreamingRequestMediator.stream, SagaMediator.stream, StreamingRequestDispatcher.dispatch, and SagaDispatcher.dispatch were declared as async def ... -> AsyncIterator[...] with bodies that used yield.
  • The intended contract is that calling these without await returns an AsyncIterator. The public API has been aligned with that contract.

✅ What Was Changed

1. 🎯 StreamingRequestHandler (Base Class)

File: src/cqrs/requests/request_handler.py

  • The abstract method was changed from async def handle(...) to def handle(...) -> AsyncIterator[ResT].
  • Subclasses still implement async def handle(...): ... yield ... (async generator).
  • Calling handler.handle(request) is now typed as returning an AsyncIterator, with no incompatible override.

2. 📡 Stream and Dispatch Methods

Files: src/cqrs/mediator.py, src/cqrs/dispatcher/streaming.py, src/cqrs/dispatcher/saga.py

  • StreamingRequestMediator.stream and SagaMediator.stream: declared as def stream(...) -> AsyncIterator[...] and implemented via return self._stream_impl(...); the actual logic lives in async def _stream_impl(...) with yield.
  • StreamingRequestDispatcher.dispatch and SagaDispatcher.dispatch: declared as def dispatch(...) -> AsyncIterator[...] and implemented via return self._dispatch_impl(...); logic in async def _dispatch_impl(...) with yield.
  • Usage is unchanged: you still write async for result in mediator.stream(request) and async for result in mediator.stream(context, saga_id=...).
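The delegation pattern is easy to replicate in your own streaming facades; a minimal sketch (class and method names are illustrative):

```python
import typing


class Mediator:
    """Sketch of the typing fix: `stream` is a plain method that returns the
    async generator produced by `_stream_impl`, so callers get an AsyncIterator
    without awaiting, and type checkers see a compatible signature."""

    def stream(self, request: str) -> typing.AsyncIterator[str]:
        return self._stream_impl(request)   # no await: returns the iterator itself

    async def _stream_impl(self, request: str) -> typing.AsyncIterator[str]:
        for i in range(3):
            yield f"{request}-{i}"          # actual logic lives in the async generator
```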

3. 🧹 Type Ignores Removed

  • Examples: examples/streaming_handler_parallel_events.py, examples/fastapi_sse_streaming.py — removed # type: ignore[override] from handle.
  • Tests: All unit and integration tests for streaming and saga no longer use # type: ignore[override] or # type: ignore on streaming handler handle methods.
  • Benchmarks: tests/benchmarks/dataclasses/test_benchmark_stream_request_handler.py and tests/benchmarks/pydantic/test_benchmark_stream_request_handler.py — removed # pyright: ignore[reportIncompatibleMethodOverride] from async def handle.

Before, these spots required type ignores. After, types are consistent and no ignores are needed.

4. 🧪 Contract Tests

  • New tests ensure that calling without await returns an AsyncIterator and that it can be consumed with async for:
    • test_streaming_handler_handle_returns_async_iterator_consumable_with_async_for
    • test_streaming_mediator_stream_returns_async_iterator_consumable_with_async_for
    • test_saga_mediator_stream_returns_async_iterator_consumable_with_async_for

5. 📝 Other

  • saga/bootstrap.py — Docstring example fixed to use result.step_type instead of result.step_result.step_type.
  • examples/streaming_handler_parallel_events.py — Added a short delay before assertions to account for fire-and-forget event handling, and corrected assertion expectations (one handler invocation per order, not doubled).

📌 Summary for Library Users

Before:
  • Subclasses of StreamingRequestHandler and benchmark handlers needed # type: ignore[override] or # pyright: ignore[reportIncompatibleMethodOverride] on handle.
  • The “call without await → AsyncIterator” contract was not reflected in the types for stream / dispatch.

After:
  • Types are compatible; no type ignores required.
  • stream and dispatch are typed to return AsyncIterator; type checkers are satisfied.

Runtime behavior and usage (async for result in mediator.stream(...)) are unchanged.

Extend Saga Storage interface with new improvements

28 Jan 15:38
935ec01


🎯 Overview

This release extends the Saga Storage interface with two improvements for recovery workflows: explicit control over the recovery attempt counter and optional filtering of recovery candidates by saga name. All changes are backward compatible.


✨ What's New

🔢 set_recovery_attempts — Explicit Recovery Counter

A new method on ISagaStorage and its implementations lets you set the recovery attempt counter to any value instead of only incrementing it.

Use cases:

  • Reset after successful step recovery — after you successfully resume one step, reset the counter so the saga stays eligible for recovery: await storage.set_recovery_attempts(saga_id, 0)
  • Exclude from recovery without changing status — mark a saga as "no more retries" by setting the counter to the max: await storage.set_recovery_attempts(saga_id, max_recovery_attempts); it will no longer appear in get_sagas_for_recovery()

Signature:

async def set_recovery_attempts(self, saga_id: uuid.UUID, attempts: int) -> None

Implemented in: MemorySagaStorage, SqlAlchemySagaStorage.


🏷️ get_sagas_for_recovery — Optional Filter by Saga Name

get_sagas_for_recovery() now accepts an optional saga_name parameter. You can run separate recovery jobs per saga type and only fetch the sagas that job is responsible for.

  • None (default) — returns all saga types; same as before, fully backward compatible
  • "OrderSaga" — returns only sagas with name == "OrderSaga"

Example — one job per saga type:

# Job 1: only OrderSaga
ids = await storage.get_sagas_for_recovery(
    limit=50,
    max_recovery_attempts=5,
    saga_name="OrderSaga",
)

# Job 2: only PaymentSaga
ids = await storage.get_sagas_for_recovery(
    limit=50,
    max_recovery_attempts=5,
    saga_name="PaymentSaga",
)

Updated signature:

async def get_sagas_for_recovery(
    self,
    limit: int,
    max_recovery_attempts: int = 5,
    stale_after_seconds: int | None = None,
    saga_name: str | None = None,  # NEW
) -> list[uuid.UUID]

📋 Summary of Changes

  • Protocol (ISagaStorage) — new method set_recovery_attempts(saga_id, attempts); get_sagas_for_recovery gains optional saga_name=None
  • SqlAlchemySagaStorage — implements set_recovery_attempts; adds WHERE name = :saga_name when saga_name is set
  • MemorySagaStorage — implements set_recovery_attempts; filters in-memory by data["name"] == saga_name when saga_name is set
  • Tests — new integration tests for set_recovery_attempts (set value, exclude from recovery, not found) and for saga_name (filter by name, None returns all types) in both storage backends

🔄 Migration & Compatibility

  • Existing code that calls get_sagas_for_recovery(limit=..., max_recovery_attempts=..., stale_after_seconds=...) continues to work unchanged; saga_name defaults to None.
  • Custom storage implementations of ISagaStorage must implement the new abstract method set_recovery_attempts(saga_id, attempts) and add the optional saga_name parameter to get_sagas_for_recovery to satisfy the interface.

📦 Full Changelog

Added

  • Saga Storage: method set_recovery_attempts(saga_id, attempts) to set recovery attempt counter explicitly
  • Saga Storage: optional parameter saga_name in get_sagas_for_recovery() for filtering recovery candidates by saga name
  • Integration tests for set_recovery_attempts and saga_name filtering (Memory and SqlAlchemy)

Changed

  • get_sagas_for_recovery() signature extended with saga_name: str | None = None (backward compatible)

🐛 Fixed: sagas that failed before any step completed

28 Jan 11:37
05ee3e0


🐛 Saga recovery: sagas that failed before any step completed

Problem: Sagas that failed on the first step (no steps with act + COMPLETED in the log) were still selected by the recovery task every run. For them, compensation was never executed (there were no completed steps to compensate), but they stayed in a "recoverable" state and were picked again on the next run, causing repeated "recovered" logs and unnecessary DB load.

Changes:

  1. Exclude FAILED sagas from recovery
    get_sagas_for_recovery() now returns only sagas in RUNNING or COMPENSATING status. Sagas already in FAILED are no longer returned, so they are not retried every minute.

    • Updated in: SqlAlchemySagaStorage, MemorySagaStorage, and the storage protocol docstring.
  2. Explicit handling when there are no steps to compensate
    When compensation runs with an empty list of completed steps (saga failed before completing any step):

    • compensation.py: At the start of compensate_steps(), if completed_steps is empty, the saga status is set to FAILED, a short info log is written, and the function returns (no compensate() calls).
    • saga.py: When recovering a saga in COMPENSATING/FAILED with no completed steps, a warning is logged that the saga failed before any step completed and is being marked FAILED without calling compensate().

Result: Sagas that fail on the first step are marked FAILED once and no longer appear in recovery. Sagas that fail in the middle (e.g. external service down) continue to be recovered as RUNNING or COMPENSATING until compensation finishes or the saga is marked FAILED.

Tests: Integration tests for get_sagas_for_recovery now assert that only RUNNING and COMPENSATING sagas are returned, and that FAILED sagas are excluded.

Saga recovery attempts

28 Jan 10:12
a1427f3


🎉 New Features

  • ISagaStorage.get_sagas_for_recovery() — returns saga IDs that need recovery (status RUNNING, COMPENSATING, or FAILED) with optional filters:
    • limit — maximum number of IDs to return
    • max_recovery_attempts (default: 5) — only sagas with recovery_attempts strictly less than this value; excludes repeatedly failing sagas from retry
    • stale_after_seconds (optional) — only sagas whose updated_at is older than now - stale_after_seconds; avoids picking sagas currently being executed by another worker
  • ISagaStorage.increment_recovery_attempts() — atomically increments recovery_attempts and optionally updates saga status (e.g. to FAILED). Intended for use after a failed recovery; recover_saga() calls it automatically on exception, so callers do not need to call it manually.
  • recovery_attempts field in saga storage — each saga execution now has a counter of failed recovery attempts. Used by get_sagas_for_recovery() to limit retries and by increment_recovery_attempts() on recovery failure.

Implemented in both MemorySagaStorage and SqlAlchemySagaStorage.

Changed

  • recover_saga() — on recovery failure (any exception during resume), the storage's increment_recovery_attempts(saga_id, new_status=SagaStatus.FAILED) is invoked automatically. Sagas can then be retried until max_recovery_attempts or excluded from future recovery runs via get_sagas_for_recovery(max_recovery_attempts=...).

Documentation

  • Recovery and storage docs now describe recovery attempts, get_sagas_for_recovery(), and increment_recovery_attempts().
  • Example saga_recovery_scheduler.py demonstrates a recovery loop using get_sagas_for_recovery(limit, max_recovery_attempts, stale_after_seconds) and recover_saga() without manual increment_recovery_attempts calls.

Upgrade notes

  • Storage interface: If you implement a custom ISagaStorage, you must add:
    • get_sagas_for_recovery(limit, max_recovery_attempts=5, stale_after_seconds=None) -> list[uuid.UUID]
    • increment_recovery_attempts(saga_id, new_status=None) -> None
  • SqlAlchemy: The saga_executions table gains a new column recovery_attempts (INTEGER, default 0). For existing databases, add the column and backfill if needed, for example:
    ALTER TABLE saga_executions ADD COLUMN recovery_attempts INTEGER NOT NULL DEFAULT 0;
  • Recovery jobs: Prefer storage.get_sagas_for_recovery(limit=..., max_recovery_attempts=..., stale_after_seconds=...) instead of custom queries to select sagas for recovery.

🐛 Fixed: Saga Context Serialization and Recovery

27 Jan 15:58
69e0c9b


Bug Fixes

Saga Context Serialization and Recovery

Fixed context not being saved to storage after saga completion

  • Issue: When a saga completed successfully, the context was not being persisted to storage before marking the saga as COMPLETED. This could lead to data loss if the context was modified during the last step execution.
  • Fix: Added final context update before setting saga status to COMPLETED in SagaTransaction.__aiter__().
  • Impact: Saga context is now properly persisted to storage after all steps complete successfully, ensuring data consistency.

Improved field filtering in SagaContext.from_dict()

  • Issue: When deserializing context from dictionary, unknown fields in the input data could cause issues or unexpected behavior.
  • Fix: Added field filtering in SagaContext.from_dict() to only include known dataclass fields before deserialization.
  • Impact: More robust context reconstruction that ignores unknown fields, preventing potential errors during saga recovery.

Test Improvements

Fixed test expectation for context reconstruction failures

  • Issue: Test test_recover_saga_raises_on_context_reconstruction_failure was expecting TypeError when required fields were missing, but dataclass_wizard actually raises MissingFields exception.
  • Fix: Updated test to expect the correct exception type (MissingFields from dataclass_wizard.errors).
  • Impact: Tests now correctly validate error handling during context reconstruction.

Changes

cqrs.saga.saga.SagaTransaction

  • Added final context update before marking saga as COMPLETED to ensure context persistence.

cqrs.saga.models.SagaContext

  • Enhanced from_dict() method to filter input data to only include known dataclass fields, improving robustness during deserialization.

Technical Details

Context Serialization

  • SagaContext.to_dict() uses dataclass_wizard.asdict() which converts field names to camelCase (e.g., order_id → orderId).
  • SagaContext.from_dict() now filters input data to only include fields that exist in the dataclass definition before deserialization.
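The filtering step can be approximated with dataclasses.fields(); note that the real implementation goes through dataclass_wizard (including the camelCase conversion), while this sketch shows only the unknown-field filtering idea:

```python
import dataclasses


@dataclasses.dataclass
class OrderContext:
    """Stand-in for a SagaContext subclass."""
    order_id: str
    amount: int = 0


def from_dict_filtered(cls, data: dict):
    # Keep only keys that match declared dataclass fields; unknown keys are
    # dropped so stale or extra data in storage cannot break reconstruction.
    known = {f.name for f in dataclasses.fields(cls)}
    return cls(**{k: v for k, v in data.items() if k in known})


ctx = from_dict_filtered(OrderContext, {"order_id": "o-1", "amount": 5, "legacy_flag": True})
```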

Saga Completion Flow

The saga completion process now follows this sequence:

  1. Execute all saga steps
  2. Update context in storage (final update)
  3. Mark saga as COMPLETED in storage

This ensures that any context modifications made during the last step are properly persisted.

Testing

All existing tests pass, including:

  • test_recover_saga_raises_on_context_reconstruction_failure - now correctly expects MissingFields exception
  • test_recover_saga_updates_context_during_recovery - validates context persistence after recovery
  • All other saga recovery and execution tests (85 tests total)

Migration Notes

No breaking changes. This is a bug fix release that improves data consistency and error handling.

If you're using SagaContext.from_dict() with data containing unknown fields, those fields will now be automatically filtered out. This should not affect normal usage but may change behavior if you were relying on unknown fields being passed through.

Support for customizing saga storage table names through environment variables

27 Jan 15:24
88a410d


Support for customizing saga storage table names through environment variables

Summary

This release adds support for customizing saga storage table names through environment variables, providing better flexibility for multi-tenant deployments and service-specific naming conventions.

Added

Saga Storage Configuration

  • Environment Variable Support for Saga Table Names: Added support for customizing saga execution and log table names via environment variables, similar to the existing outbox table configuration.

    • CQRS_SAGA_EXECUTION_TABLE_NAME: Environment variable to customize the saga execution table name (default: "saga_executions")
    • CQRS_SAGA_LOG_TABLE_NAME: Environment variable to customize the saga log table name (default: "saga_logs")

    This enhancement allows different services or deployments to use service-specific table names (e.g., service_manager_saga_executions, payment_service_saga_executions) while maintaining the same codebase.

    Usage:

    # In .env file or environment variables
    export CQRS_SAGA_EXECUTION_TABLE_NAME=service_manager_saga_executions
    export CQRS_SAGA_LOG_TABLE_NAME=service_manager_saga_logs

    The table names are automatically loaded from environment variables when the cqrs.saga.storage.sqlalchemy module is imported, using python-dotenv for .env file support.

Technical Details

Changes in cqrs.saga.storage.sqlalchemy

  • Added os and dotenv imports
  • Added dotenv.load_dotenv() call to load environment variables from .env files
  • Modified DEFAULT_SAGA_EXECUTION_TABLE_NAME and DEFAULT_SAGA_LOG_TABLE_NAME to be read from environment variables with fallback to default values
  • Updated SagaExecutionModel and SagaLogModel to use the configurable table names
  • Updated ForeignKey reference in SagaLogModel to use the configurable execution table name

Backward Compatibility

  • Fully backward compatible: If environment variables are not set, the default table names (saga_executions and saga_logs) are used
  • No breaking changes: Existing code will continue to work without modifications
  • Optional feature: The feature is opt-in via environment variables

Migration Guide

For Existing Users

No migration required. The feature is backward compatible and uses default table names if environment variables are not set.

For New Users or Custom Table Names

  1. Set environment variables in your .env file or deployment configuration:

    CQRS_SAGA_EXECUTION_TABLE_NAME=your_custom_saga_executions
    CQRS_SAGA_LOG_TABLE_NAME=your_custom_saga_logs
  2. Ensure your database migration creates tables with the custom names:

    CREATE TABLE IF NOT EXISTS `your_custom_saga_executions` (
        -- table definition
    );
    
    CREATE TABLE IF NOT EXISTS `your_custom_saga_logs` (
        -- table definition
    );
  3. The SqlAlchemySagaStorage will automatically use the custom table names when initialized.

Examples

Example: Service-Specific Table Names

# .env file
CQRS_SAGA_EXECUTION_TABLE_NAME=service_manager_saga_executions
CQRS_SAGA_LOG_TABLE_NAME=service_manager_saga_logs

# Python code (no changes needed)
from cqrs.saga.storage.sqlalchemy import SqlAlchemySagaStorage

storage = SqlAlchemySagaStorage(session_factory)
# Automatically uses service_manager_saga_executions and service_manager_saga_logs

Related Issues

  • Enables service-specific saga table naming for multi-service deployments
  • Provides consistency with existing outbox table configuration pattern
  • Improves flexibility for database schema customization