
[Data] Dead actor occupies actor pool slots, which severely affects resource usage #62746

@zhangsikai123

Description


What happened + What you expected to happen

What happened

Using ray.data.map_batches with compute=ActorPoolStrategy(size=1) and
DataContext.max_errored_blocks = -1, if the actor process exits mid-pipeline
(e.g. sys.exit(0) from inside the UDF), the pipeline silently hangs forever:

  • The dead actor is removed from the alive-actor scheduling heap, so no new
    tasks are dispatched to it.
  • Because max_errored_blocks = -1, the failed block is ignored instead of
    aborting the job.
  • The pool with min_size == max_size == 1 is not replaced: the dead actor
    entry stays in _running_actors, no new actor is created, and the input
    queue never drains.
  • Result: 0 schedulable actors, queued blocks stuck > 0, no exception raised,
    materialize() blocks indefinitely.
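The stuck state described above can be modeled with a toy version of the pool bookkeeping. All class and attribute names below are illustrative only, not Ray's actual internals; the point is that removing a dead actor from scheduling without releasing it from the "running" set leaves a fixed-size pool permanently at zero capacity:

```python
# Toy model of the stalled pool: a fixed-size actor pool where a dead
# actor is removed from scheduling but never released or replaced.
# Illustrative names only -- this is not Ray's _ActorPool.

class ToyActorPool:
    def __init__(self, size):
        self.size = size                      # min_size == max_size
        self.running = {f"actor-{i}" for i in range(size)}
        self.schedulable = set(self.running)  # actors eligible for new tasks

    def mark_dead(self, actor):
        # Mirrors the observed behavior: the dead actor leaves the
        # scheduling heap but stays in the pool's "running" bookkeeping.
        self.schedulable.discard(actor)

    def maybe_scale_up(self):
        # Fixed-size pool: len(running) still equals size, so the
        # autoscaler sees no reason to create a replacement.
        while len(self.running) < self.size:
            new = f"actor-{len(self.running)}"
            self.running.add(new)
            self.schedulable.add(new)

pool = ToyActorPool(size=1)
pool.mark_dead("actor-0")
pool.maybe_scale_up()
print(len(pool.schedulable))  # 0 -> queued blocks can never be dispatched
```

With zero schedulable actors and a non-empty queue, no exception is raised and nothing ever makes progress, which matches the observed indefinite hang.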

My job runs on a large, high-efficiency cluster with a huge number of spot instances. I need to tolerate random node failures while keeping CPU/GPU utilization high. These "zombie actors" waste my cores.

Versions

  • Ray: 2.55 (latest)
  • Python: 3.12.6
  • OS: Linux

Reproduction

from typing import Dict, List
import sys
import time

import ray
from ray.data import DataContext


class EchoPredictor:
    def __init__(self):
        self.counter = 0

    def __call__(self, batch: Dict[str, List[int]]) -> Dict[str, List[int]]:
        time.sleep(0.01)
        batch["output"] = [x * 2 for x in batch["data"]]
        self.counter += 1
        if self.counter == 3:
            sys.exit(0)  # simulate the actor process dying mid-pipeline
        return batch


ctx = DataContext.get_current()
ctx.max_errored_blocks = -1

ds = (
    ray.data.from_items([{"data": i} for i in range(3200)])
    .map_batches(
        EchoPredictor,
        batch_size=1,
        compute=ray.data.ActorPoolStrategy(size=1),
    )
    .materialize()  # hangs forever: 0 schedulable actors, queue never drains
)

Expected behavior

A dead (zombie) actor should be released from the actor pool so that a new, healthy actor can be created to replace it.

Where the issue seems to originate

In python/ray/data/_internal/execution/operators/actor_pool_map_operator.py:

  • _ActorPool.refresh_actor_state correctly detects the actor as DEAD and
    _update_rank removes it from _alive_actors_to_in_flight_tasks_heap.
  • However, _release_running_actor is not called on DEAD; the entry
    stays in _running_actors so the pool's view of "running" is stale.
  • For a fixed-size pool (min_size == max_size), the autoscaler does not
    scale up to replace dead actors.
  • Combined with max_errored_blocks=-1 (which makes StreamingExecutor ignore
    block failures), this leaves the pipeline in an unrecoverable, silent state.
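One possible fix direction, sketched on the same kind of toy model (illustrative names, not Ray's real `_ActorPool` API): when an actor is detected DEAD, release it from the running set as well, so a fixed-size pool sees itself as under capacity and starts a replacement.

```python
# Sketch of the proposed fix on a toy pool model: releasing the DEAD
# entry from "running" lets the fixed-size pool create a replacement.
# Illustrative names only -- not Ray's actual internals.

import itertools

class ToyActorPool:
    def __init__(self, size):
        self.size = size
        self._ids = itertools.count()
        self.running = set()
        self.schedulable = set()
        for _ in range(size):
            self._start_actor()

    def _start_actor(self):
        actor = f"actor-{next(self._ids)}"
        self.running.add(actor)
        self.schedulable.add(actor)

    def refresh_actor_state(self, actor, state):
        if state == "DEAD":
            self.schedulable.discard(actor)
            self.running.discard(actor)  # proposed: release the dead entry
        # The pool is now below its fixed size, so a replacement is created.
        while len(self.running) < self.size:
            self._start_actor()

pool = ToyActorPool(size=1)
pool.refresh_actor_state("actor-0", "DEAD")
print(sorted(pool.schedulable))  # ['actor-1'] -> the pipeline can progress
```

In the real code this would presumably mean calling `_release_running_actor` (or equivalent) from the DEAD branch of `_ActorPool.refresh_actor_state`, and ensuring the scale-up path replaces dead actors even when `min_size == max_size`.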

Are you willing to submit a PR?

  • [x] Yes


Issue Severity

Medium: It is a significant difficulty but I can work around it.

    Labels

    bug (Something that is supposed to be working; but isn't), community-backlog, data (Ray Data-related issues), stability, triage (Needs triage)
