close
Skip to content

Handle cluster slot migration in ClusterPubSub (shard subscription reconciliation)#4044

Open
petyaslavova wants to merge 3 commits intomasterfrom
ps_fox_no_slot_migration_handling
Open

Handle cluster slot migration in ClusterPubSub (shard subscription reconciliation)#4044
petyaslavova wants to merge 3 commits intomasterfrom
ps_fox_no_slot_migration_handling

Conversation

@petyaslavova
Copy link
Copy Markdown
Collaborator

@petyaslavova petyaslavova commented Apr 22, 2026

Summary

Adds shard-PubSub reconciliation on cluster topology changes. Before this change, when a slot holding an active SSUBSCRIBE migrated to a different node (CLUSTER SETSLOT, failover, or rebalance), the client kept reading from the old owner and silently missed messages — the reconciliation hook simply did not exist. This PR wires an observer pattern between NodesManager and ClusterPubSub so that every slots-cache update triggers a reconciliation pass: for each tracked shard channel we re-resolve the owning node and, if it has moved, perform SUNSUBSCRIBE on the old per-node pubsub and SSUBSCRIBE on the new one, preserving any registered handler. Implemented with strict sync/async parity in redis/cluster.py and redis/asyncio/cluster.py.

Design choices

  • Reverse index (_shard_channel_to_node) records the node that actually holds each subscription, so SUNSUBSCRIBE routes to the holding node even after the cluster's slot map has moved on — cluster.get_node_from_key by then points to the new owner and would miss the live subscription.
  • Handling for transiently uncovered slots: during migration a slot may briefly be uncovered. The reconciler catches SlotNotCoveredError per channel, continues with coverable siblings, then raises a single summary SlotNotCoveredError at the end so callers know the pass was incomplete. The next slots-cache change triggers a retry.
  • Thread-safety: the sync NodesManager observer registry is guarded by self._lock; callbacks are invoked outside the lock so observers can safely call back into the NodesManager. Async relies on the event loop's single-threaded guarantee.
  • Circular MOVED redirects in move_slot are treated as a no-op and skip observer notification, avoiding reconciliation storms.

Resource management

Two connection-release points ensure per-node pubsubs do not leak their dedicated connections when dropped from node_pubsub_mapping (PubSub.__del__ does not release connections back to the pool): (1) the migration-driven SUNSUBSCRIBE confirmation branch in get_sharded_message calls aclose() (async) / reset() (sync) on the per-node pubsub before the mapping drop; (2) the GC block at the end of reinitialize_shard_subscriptions does the same for any per-node pubsub emptied by the reconciliation pass. On the async side, _on_slots_changed schedules reconciliation as a fire-and-forget task with a done-callback (_log_reconcile_task_exception) that consumes the task's exception and routes it through logger.error, preventing "Task exception was never retrieved" warnings and giving the same observability as the sync path's _notify_pubsub_observers try/except.

Tests

New deterministic (mock-based) test classes TestClusterPubSubSlotMigration on both sync and async sides cover: channel moves to new owner, no-op when owner unchanged, tolerance to old-node disconnect, SUNSUBSCRIBE routing via the reverse index after migration, handler-override semantics on lazy reroute, aclose teardown (including cancellation of in-flight reconcile tasks, async), per-node pubsub drop + connection release on migration-driven SUNSUBSCRIBE, partial-progress semantics when one slot is uncovered, and (async) consumption+logging of the reconcile task's exception. NodesManager observer register/notify/failover/circular-redirect paths have four new mock-only tests each side.
All new tests are marked @pytest.mark.fixed_client and run outside a live cluster; class-level @pytest.mark.onlycluster on TestNodesManager was refactored to test-level so the existing live-cluster coverage is unchanged.


Note

Medium Risk
Changes cluster PubSub behavior during slot migration/failover by automatically moving shard subscriptions between nodes and cleaning up per-node pubsub connections; mistakes could cause missed messages or connection churn under topology flaps.

Overview
Adds a slots-cache observer mechanism in NodesManager (sync + asyncio) and wires ClusterPubSub into it so slot-map refreshes and real MOVED-driven slot changes trigger shard-subscription reconciliation.

ClusterPubSub now tracks a reverse index (_shard_channel_to_node) to route SUNSUBSCRIBE to the node that actually holds the subscription, migrates shard channels to the current slot owner (preserving handlers, handling transient SlotNotCoveredError), and aggressively closes/drops empty per-node pubsubs to avoid leaked dedicated connections. Async reconciliation runs as a background task with exception consumption/logging, and aclose() now cancels in-flight reconcile tasks and clears the reverse index.

Adds deterministic unit tests for observer registration/notification and shard-subscription migration behavior (sync + async), and adjusts some cluster tests’ markers to run under the fixed mocked client.

Reviewed by Cursor Bugbot for commit 160544f. Bugbot is set up for automated code reviews on this repo. Configure here.

@jit-ci
Copy link
Copy Markdown

jit-ci Bot commented Apr 22, 2026

🛡️ Jit Security Scan Results

CRITICAL HIGH MEDIUM

✅ No security findings were detected in this PR


Security scan by Jit

Comment thread redis/cluster.py
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 2 potential issues.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit b18552b. Configure here.

Comment thread redis/cluster.py Outdated
Comment thread redis/asyncio/cluster.py
@petyaslavova petyaslavova added techdebt Things that can be improved or refactored maintenance Maintenance (CI, Releases, etc) labels Apr 23, 2026
Comment thread redis/asyncio/cluster.py
if not self.shard_channels:
return
try:
loop = asyncio.get_running_loop()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this method is not async? I believe it's possible to convert it to async and do not interact with loop itself

Comment thread redis/asyncio/cluster.py
# Notify observers so shard-pubsub reconciliation can run; skipped on
# the no-op branch to avoid needless walks under MOVED storms.
if node_changed:
self._notify_pubsub_observers()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should leverage EventDispatcher for the events handling, for the alignment

Comment thread redis/asyncio/cluster.py
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

maintenance Maintenance (CI, Releases, etc) techdebt Things that can be improved or refactored

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants