Distributed Computing with Ray
Why this matters
A training job that takes 8 hours on one machine can take 30 minutes with 16 parallel workers — if the job is structured for distribution from the start. The trap is writing code that only runs in-process and then trying to retrofit parallelism later. Ray makes this tractable by providing tasks (stateless functions) and actors (stateful workers) as first-class primitives.
The key discipline: structure pipelines so the same code can run in-process, as a subprocess, or as a Ray remote task — and produce byte-identical output in all three modes. This makes distribution something you switch on, not something you rewrite for.
Install
uv add 'ray[default]>=2.30'
Install the [default] extra (dashboard + autoscaler), not the bare ray wheel.
Ray Core Concepts
See the Ray core concepts docs for the full tasks/actors/object-store API. Here's what matters for our stack:
Apple Silicon + uv hang: always set these env vars before import ray, or the driver hangs on Ray ≥ 2.55 when invoked via uv run:
import os
os.environ.setdefault("RAY_ENABLE_UV_RUN_RUNTIME_ENV", "0")
os.environ.setdefault("RAY_USAGE_STATS_ENABLED", "0")
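import ray
Placement trick, num_gpus=0.01: a fractional GPU request forces a task onto a GPU node while reserving only 1% of one GPU instead of a whole device. The sliver is still a reservation, so a task that asks for num_gpus=1 cannot be scheduled on that GPU until the small task finishes. This is the pattern for sync/upload tasks that need to run co-located with GPU workers (e.g. sync_to_s3 after a training task):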
@ray.remote(num_gpus=0.01)
def sync_to_s3(out_dir: str, s3_uri: str) -> str:
    import subprocess
    subprocess.run(["aws", "s3", "sync", out_dir, s3_uri], check=True)
    return s3_uri
Object store: call ray.put(big_array) once and pass the returned reference to tasks rather than the array itself. Workers on the same node then read the array from shared memory instead of receiving a freshly serialized copy per task. Treat this as mandatory for large NumPy arrays fanned out across many tasks.
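A minimal sketch of the object-store pattern, assuming NumPy is installed on the driver and workers and that a local Ray instance is acceptable. The names split_ranges, chunk_sum, and run_with_ray are illustrative, not Ray APIs. Ray is imported inside the function so the environment variables above can be set first:

```python
def split_ranges(n: int, parts: int) -> list[tuple[int, int]]:
    """Split range(n) into `parts` contiguous (start, stop) slices."""
    step, extra = divmod(n, parts)
    ranges, start = [], 0
    for i in range(parts):
        stop = start + step + (1 if i < extra else 0)
        ranges.append((start, stop))
        start = stop
    return ranges


def chunk_sum(data, start: int, stop: int) -> float:
    """Sum one slice. `data` arrives as the array itself, not as a reference."""
    return float(data[start:stop].sum())


def run_with_ray(n: int = 1_000_000, parts: int = 4) -> float:
    # Deferred imports: set the RAY_* env vars before this function runs.
    import numpy as np
    import ray

    ray.init(ignore_reinit_error=True)
    big_ref = ray.put(np.arange(n, dtype=np.float64))  # stored once
    remote_sum = ray.remote(chunk_sum)
    # Passing the reference (not the array) lets Ray resolve it from the
    # object store when each task starts.
    futures = [remote_sum.remote(big_ref, a, b) for a, b in split_ranges(n, parts)]
    return sum(ray.get(futures))
```

Calling run_with_ray() starts a local Ray instance if none is running. The point of the sketch is the single ray.put followed by many .remote(big_ref, ...) calls.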
Actors: method calls on a single actor execute one at a time, in arrival order (for a default, non-async actor). Use actors for a model server, a connection pool, or a metric accumulator; the actor's state needs no locking.
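A minimal sketch of the metric-accumulator case, assuming a default (non-async, non-threaded) actor. MetricAccumulator and run_with_ray are illustrative names. The class is plain Python and only becomes an actor when wrapped with ray.remote:

```python
class MetricAccumulator:
    """Plain class; wrapping it with ray.remote turns it into an actor."""

    def __init__(self) -> None:
        self.totals: dict[str, float] = {}

    def add(self, name: str, value: float) -> None:
        # No lock needed: a default actor runs one method call at a time.
        self.totals[name] = self.totals.get(name, 0.0) + value

    def snapshot(self) -> dict[str, float]:
        return dict(self.totals)


def run_with_ray(n_tasks: int = 8) -> dict[str, float]:
    import ray  # deferred so the RAY_* env vars can be set first

    ray.init(ignore_reinit_error=True)
    acc = ray.remote(MetricAccumulator).remote()  # one actor process

    @ray.remote
    def work(i: int, sink) -> None:
        import ray
        # Many tasks call the same actor; the actor queues the calls.
        ray.get(sink.add.remote("rows", float(i)))

    ray.get([work.remote(i, acc) for i in range(n_tasks)])
    return ray.get(acc.snapshot.remote())
```

Keeping the class free of Ray imports means the same accumulator can be unit-tested in-process and used as an actor on a cluster.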
Shipping Code — runtime_env.working_dir
Workers in a cluster don't have your local code. working_dir packages a directory of source files and ships it to every worker. You can edit libs/*/src locally and remote workers pick up the changes on the next ray.init, without rebuilding their Docker image.
The cap is 100 MB — stage a trimmed tree, not the repo root:
import shutil, tempfile
from pathlib import Path

import ray  # after setting the RAY_* env vars shown earlier

REPO = Path(__file__).resolve().parent.parent

def stage_working_dir() -> Path:
    # Copy only what workers need into a throwaway directory.
    staging = Path(tempfile.mkdtemp(prefix="ray-stage-"))
    keep = ["pyproject.toml", "libs", "apps", "fixtures"]
    for entry in keep:
        src = REPO / entry
        if not src.exists():
            continue
        dst = staging / entry
        if src.is_dir():
            shutil.copytree(src, dst, ignore=shutil.ignore_patterns(
                "__pycache__", ".pytest_cache", ".ruff_cache",
                ".venv", "dist", "build", "*.egg-info", "out", "node_modules",
            ))
        else:
            shutil.copy2(src, dst)
    return staging

ray.init(
    address="ray://head:10001",
    runtime_env={
        "working_dir": str(stage_working_dir()),
        "env_vars": {"MY_APP_BACKEND": "cuda"},
    },
)
On the worker, prepend each source root to sys.path so local source wins over baked-in wheels:
@ray.remote(num_gpus=1)
def run_on_worker(args: list[str]) -> int:
    import glob, os, sys
    cwd = os.getcwd()  # the unpacked working_dir
    for src in sorted(glob.glob(os.path.join(cwd, "libs", "*", "src"))):
        sys.path.insert(0, src)
    import pytest  # heavy deps from worker venv
    return int(pytest.main(args))
Rebuild the worker image only when pyproject.toml changes. Pure source edits ship for free via working_dir.
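Because the staged tree has to stay under the cap mentioned above, it can help to measure it before calling ray.init. A minimal sketch using only the standard library; tree_size and check_staged_size are hypothetical helpers, the 100 MB figure is taken from this document rather than checked against your Ray version, and the raw byte count is only an approximation of what Ray uploads:

```python
import os
from pathlib import Path

# Cap as stated in this guide; adjust if your Ray version documents another.
CAP_BYTES = 100 * 1024 * 1024


def tree_size(root: Path) -> int:
    """Total size in bytes of regular files under root (symlinks skipped)."""
    total = 0
    for dirpath, _dirs, files in os.walk(root):
        for name in files:
            p = Path(dirpath) / name
            if not p.is_symlink():
                total += p.stat().st_size
    return total


def check_staged_size(root: Path, cap: int = CAP_BYTES) -> int:
    """Return the staged size, or raise before Ray tries to upload it."""
    size = tree_size(root)
    if size > cap:
        raise RuntimeError(
            f"staged working_dir is {size / 1e6:.1f} MB, over the "
            f"{cap / 1e6:.1f} MB cap; trim the keep list or ignore patterns"
        )
    return size
```

A typical call would be check_staged_size(stage_working_dir()) right before ray.init, so an oversized tree fails fast on the laptop instead of during upload.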
Three-Executor Pattern (Local ↔ Distributed Parity)
This is the core architectural pattern for distributable pipelines. Structure every pipeline stage so it can run three ways — in-process (fast, debuggable), subprocess (parity gate), or Ray remote (parallel, distributed) — and all three produce byte-identical output files.
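The subprocess executor is the cheap parity gate that proves a stage is distribution-safe before you add a cluster. If subprocess and in-process agree, the stage already works across a process boundary with only JSON and paths as inputs, which is the same boundary the Ray executor uses.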
from __future__ import annotations

import importlib
import json
import subprocess
from pathlib import Path
from typing import Protocol, runtime_checkable

from my_schemas import JobSpec, RunStats

# stage name -> (module path, attribute); stages are looked up by name,
# never passed around as function objects
_STAGE_REGISTRY: dict[str, tuple[str, str]] = {}


def register_stage(name: str, module: str, attr: str = "run") -> None:
    _STAGE_REGISTRY[name] = (module, attr)


def resolve_stage(name: str):
    module_path, attr = _STAGE_REGISTRY[name]
    return getattr(importlib.import_module(module_path), attr)

@runtime_checkable
class StageExecutor(Protocol):
    def run_stage(self, name: str, inputs: dict[str, Path], output: Path,
                  job: JobSpec, num_gpus: float | None = None,
                  worker_type: str | None = None,
                  stage_kwargs: dict | None = None) -> RunStats: ...


class InProcessExecutor:
    def run_stage(self, name, inputs, output, job, num_gpus=None,
                  worker_type=None, stage_kwargs=None) -> RunStats:
        fn = resolve_stage(name)
        return fn(inputs=inputs, output=output, job=job, **(stage_kwargs or {}))

class SubprocessExecutor:
    def __init__(self, runner: list[str] | None = None) -> None:
        self._runner = runner or ["uv", "run", "my-app-stage"]

    def run_stage(self, name, inputs, output, job, num_gpus=None,
                  worker_type=None, stage_kwargs=None) -> RunStats:
        # Everything crosses the process boundary as JSON on stdin.
        payload = {
            "stage": name,
            "inputs": {k: str(v) for k, v in inputs.items()},
            "output": str(output),
            "job": json.loads(job.model_dump_json()),
            "stage_kwargs": stage_kwargs or {},
        }
        proc = subprocess.run([*self._runner, name],
                              input=json.dumps(payload),
                              capture_output=True, text=True, check=False)
        if proc.returncode != 0:
            raise RuntimeError(
                f"stage '{name}' failed (rc={proc.returncode})\n"
                f"STDOUT:\n{proc.stdout}\nSTDERR:\n{proc.stderr}"
            )
        # The child prints RunStats as a JSON line; take the last such line.
        stats_line = next(
            (ln.strip() for ln in reversed(proc.stdout.splitlines())
             if ln.strip().startswith("{") and "wall_s" in ln),
            None,
        )
        if stats_line is None:
            raise RuntimeError(f"stage '{name}' emitted no RunStats JSON")
        return RunStats.model_validate_json(stats_line)

class RayExecutor:
    def __init__(self, num_workers=1, num_gpus_per_stage=0.0,
                 ray_address: str | None = None, runtime_env: dict | None = None) -> None:
        self.num_workers = num_workers
        self.num_gpus_per_stage = num_gpus_per_stage
        self.ray_address = ray_address
        self.runtime_env = runtime_env
        self._initialised = False

    def _ensure_ray(self) -> None:
        if self._initialised:
            return
        import os, sys
        # Must be set before `import ray` (see the uv hang note earlier).
        os.environ.setdefault("RAY_ENABLE_UV_RUN_RUNTIME_ENV", "0")
        os.environ.setdefault("RAY_USAGE_STATS_ENABLED", "0")
        import ray
        if not ray.is_initialized():
            if self.ray_address:
                # Remote cluster: ship code via the caller's runtime_env.
                ray.init(address=self.ray_address, ignore_reinit_error=True,
                         runtime_env=self.runtime_env or {})
            else:
                # Local Ray: reuse the driver's interpreter on the workers.
                ray.init(ignore_reinit_error=True, log_to_driver=False,
                         include_dashboard=False, num_cpus=max(1, self.num_workers),
                         runtime_env={"py_executable": sys.executable})
        self._initialised = True

    def run_stage(self, name, inputs, output, job, num_gpus=None,
                  worker_type=None, stage_kwargs=None) -> RunStats:
        self._ensure_ray()
        import ray
        module_path, attr = _STAGE_REGISTRY[name]
        gpus = self.num_gpus_per_stage if num_gpus is None else num_gpus
        # vm:<type> only works if the worker registered the resource at start
        extra = {f"vm:{worker_type}": 0.01} if worker_type else None

        @ray.remote(num_gpus=gpus, resources=extra)
        def _run_remote(module_path, attr, inputs_s, output_s, job_json, kwargs_in) -> str:
            # Re-import everything on the worker; only strings/JSON come in.
            import importlib
            from pathlib import Path as _Path
            from my_schemas import JobSpec as _JobSpec
            fn = getattr(importlib.import_module(module_path), attr)
            stats = fn(inputs={k: _Path(v) for k, v in inputs_s.items()},
                       output=_Path(output_s),
                       job=_JobSpec.model_validate_json(job_json),
                       **kwargs_in)
            return stats.model_dump_json()

        handle = _run_remote.remote(
            module_path, attr,
            {k: str(v) for k, v in inputs.items()},
            str(output), job.model_dump_json(), stage_kwargs or {},
        )
        return RunStats.model_validate_json(ray.get(handle))

def make_executor(name: str, ray_address=None, runtime_env=None) -> StageExecutor:
    if name == "inprocess":
        return InProcessExecutor()
    if name == "subprocess":
        return SubprocessExecutor()
    if name == "ray":
        return RayExecutor(ray_address=ray_address, runtime_env=runtime_env)
    raise ValueError(f"unknown executor '{name}'")
The remote function takes only JSON/strings and imports the stage by module path via importlib; it never closes over a function object or in-process state. That's the rule that makes Ray output byte-identical to in-process output.
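SubprocessExecutor assumes a my-app-stage console script that this guide does not show. A minimal sketch of what such an entry point could look like, assuming the payload shape built in run_stage. The name stage_main and its resolve parameter are hypothetical, and plain dicts stand in for JobSpec and RunStats so the block runs on its own:

```python
import json
import sys
from pathlib import Path
from typing import Callable, TextIO


def stage_main(argv: list[str], stdin: TextIO, stdout: TextIO,
               resolve: Callable[[str], Callable]) -> int:
    """Hypothetical body of the `my-app-stage` console script."""
    payload = json.loads(stdin.read())
    name = argv[0] if argv else payload["stage"]
    if name != payload["stage"]:
        print(f"argv stage {name!r} != payload stage {payload['stage']!r}",
              file=sys.stderr)
        return 2
    fn = resolve(name)  # in the real code: resolve_stage
    stats = fn(
        inputs={k: Path(v) for k, v in payload["inputs"].items()},
        output=Path(payload["output"]),
        job=payload["job"],  # real code would rebuild a JobSpec here
        **payload["stage_kwargs"],
    )
    # The parent scans stdout from the end for a JSON line containing
    # "wall_s", so the stats go out last, on a single line.
    stdout.write(json.dumps(stats, sort_keys=True) + "\n")
    return 0
```

Wired up as a script, this would be called as stage_main(sys.argv[1:], sys.stdin, sys.stdout, resolve_stage). Passing the streams and the resolver as arguments keeps the entry point testable without spawning a process.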
The byte-identical gate (test)
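The gate only means something if each stage writes deterministic bytes. A minimal sketch of a stage in the spirit of the trivial one used in the test, assuming JSON output. write_canonical_json and trivial_stage are hypothetical names, and the returned dict stands in for RunStats:

```python
import hashlib
import json
import time
from pathlib import Path


def write_canonical_json(path: Path, obj: dict) -> None:
    """Serialize with a fixed key order, separators, and encoding."""
    text = json.dumps(obj, sort_keys=True, separators=(",", ":"),
                      ensure_ascii=True)
    path.write_bytes((text + "\n").encode("utf-8"))


def trivial_stage(inputs: dict[str, Path], output: Path, job=None) -> dict:
    t0 = time.perf_counter()
    data = inputs["in"].read_bytes()
    write_canonical_json(output, {
        "n_bytes": len(data),
        "sha256": hashlib.sha256(data).hexdigest(),
    })
    # Timings, hostnames, and PIDs belong in the returned stats,
    # never in the output file, or the digests will differ per executor.
    return {"wall_s": time.perf_counter() - t0}
```

With stages written this way, the parity test compares file digests across the three executors: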
import hashlib
from pathlib import Path

# make_job() and the registered "trivial" stage are fixtures not shown here.

def _digest(p: Path) -> str:
    return hashlib.sha256(p.read_bytes()).hexdigest()

def test_three_executors_byte_identical(tmp_path):
    job = make_job()
    inp = tmp_path / "in.txt"
    inp.write_text("parity-payload")
    outs = {n: tmp_path / f"out_{n}.json" for n in ("in", "sub", "ray")}
    InProcessExecutor().run_stage("trivial", {"in": inp}, outs["in"], job)
    SubprocessExecutor().run_stage("trivial", {"in": inp}, outs["sub"], job)
    RayExecutor().run_stage("trivial", {"in": inp}, outs["ray"], job)
    d = {k: _digest(v) for k, v in outs.items()}
    assert d["in"] == d["sub"] == d["ray"], f"hash mismatch: {d}"
AWS Cluster (Head + Spot GPU Workers)
Cluster shape:
- Head (t3.medium/t3.xlarge, on-demand): Ray head container with --autoscaling-config. No GPU. Ports 6379 (GCS) / 8265 (dashboard) / 10001 (Client).
- Workers (g4dn.*/g6.* spot, GPU): not in Terraform state; the Ray autoscaler launches them on demand and terminates them on idle.
- Same container image on both (deps baked into /workspace/.venv).
Autoscaler config (key non-obvious bits when head is created outside ray up):
provider:
  type: aws
  region: us-east-1
  cache_stopped_nodes: false
  disable_launch_config_check: true  # head not made by `ray up`; skip the check
  disable_node_updaters: true        # workers self-bootstrap via EC2 UserData; no SSH
available_node_types:
  ray.head.default:
    resources: {CPU: 2}
    max_workers: 0
  ray.worker.g4dn_2xlarge:
    min_workers: 0
    max_workers: 4
    # vm:<type> is a CUSTOM resource: a task lands here ONLY if the worker's
    # `ray start --resources` registered it too.
    resources: {CPU: 8, GPU: 1, "vm:g4dn.2xlarge": 1}
    node_config:
      InstanceType: g4dn.2xlarge
      ImageId: <gpu-ami>
      InstanceMarketOptions: {MarketType: spot, SpotOptions: {MaxPrice: "0.20"}}
Worker UserData starts Ray and registers the custom resource (load-bearing):
docker run -d --name ray-worker --gpus all --network host --shm-size=2g <image> \
  bash -c "ray start --address=$HEAD_IP:6379 \
    --resources='{\"vm:g4dn.2xlarge\":1}' --block"
Reach the head from your laptop
# SSM port-forward (no public SSH, no inbound 0.0.0.0/0)
aws ssm start-session --target <head_instance_id> --region us-east-1 \
  --document-name AWS-StartPortForwardingSession \
  --parameters '{"portNumber":["10001"],"localPortNumber":["10001"]}'
# In a second terminal (the session above stays in the foreground):
export RAY_ADDRESS_OVERRIDE=ray://127.0.0.1:10001
Submit a job
import os
os.environ.setdefault("RAY_ENABLE_UV_RUN_RUNTIME_ENV", "0")
import ray

ray.init(
    address=os.environ.get("RAY_ADDRESS_OVERRIDE", "ray://<head_private_ip>:10001"),
    runtime_env={"working_dir": "<staged tree>",
                 "env_vars": {"MY_APP_BACKEND": "cuda"}},
)

@ray.remote(num_gpus=1)
def heavy(inp: str, out: str) -> str:
    ...
    return out

ray.get(heavy.remote("in.bin", "out.bin"))
Get artifacts back
@ray.remote(num_gpus=0.01)
def sync_to_s3(out_dir: str, s3_uri: str) -> str:
    import subprocess
    subprocess.run(["aws", "s3", "sync", "--no-progress", out_dir, s3_uri], check=True)
    return s3_uri

# `bucket` is your S3 bucket name, defined by the caller.
ray.get(sync_to_s3.remote("/workspace/out", f"s3://{bucket}/run-123/"))
Then pull the artifacts down to the laptop:
aws s3 sync s3://<bucket>/run-123/ ./out/run-123/
Cost Hygiene (Spot GPUs Bleed Money)
- Spot + price cap on workers (MaxPrice: "0.20")
- Autoscaler idle_timeout_minutes reaps idle workers
- Independent watchdog crons as backstops (worker shuts down on GPU < 5% for N min; head stops after N idle hours)
- Wrap each run in a try/finally cost log; tear down workers (ray down or idle timeout) before destroying the head with Terraform
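The try/finally cost log from the last bullet could be sketched as a context manager. This is an assumption-heavy illustration: cost_log, its JSONL record fields, and the flat usd_per_hour times n_workers estimate are hypothetical, and the estimate is only a rough upper bound that treats every worker as up for the whole run:

```python
import json
import time
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def cost_log(path: Path, run_id: str, usd_per_hour: float, n_workers: int):
    """Append one JSON line per run, even when the run raises."""
    start = time.monotonic()
    status = "failed"
    try:
        yield
        status = "ok"
    finally:
        hours = (time.monotonic() - start) / 3600
        record = {
            "run_id": run_id,
            "status": status,
            "wall_hours": round(hours, 4),
            "n_workers": n_workers,
            # Rough upper bound: all workers billed for the full wall time.
            "est_usd_upper_bound": round(hours * usd_per_hour * n_workers, 4),
        }
        with path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(record, sort_keys=True) + "\n")
```

A run would then be wrapped as `with cost_log(Path("cost.jsonl"), "run-123", 0.20, 4): ...`, so a crashed job still leaves a line in the log.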
Common Pitfalls
- runtime_env.pip for transitive deps: Ray builds a fresh virtualenv for the runtime_env on each cold worker and reinstalls everything, which can take 5–10 min and time out the Ray Client gRPC channel. Bake heavy deps into the worker image; ship source via working_dir. Reserve pip for one or two tiny pure-Python packages.
- Virtualenv bootstrap needs pip: if the worker base venv was created with uv venv (no --seed), the bootstrap fails with No module named pip. Always create it with uv venv --seed.
- Apple Silicon driver hang: Ray >= 2.55 auto-detects uv run on the driver and rewrites py_executable, which then hangs. Set RAY_ENABLE_UV_RUN_RUNTIME_ENV=0 before import ray.
- working_dir cap = 100 MB: stage a trimmed tree (exclude .venv, .git, dist, build); pointing at the repo root will blow past it.
- vm:<type> custom resource must be registered by the worker's ray start --resources='{"vm:<type>":1}'. If it is missing, the autoscaler picks the right node but the raylet has no such resource and the task stays pending forever.
- Absolute laptop paths don't exist on the worker: stage inputs into working_dir, pass Path(local_input.name) (basenames), and let the worker resolve them against the unpacked directory.
- env_vars is not a secret store: non-secret flags only; use instance roles for cloud credentials.
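The path pitfall above can be sketched with two small helpers. stage_input and resolve_on_worker are hypothetical names, and the fixtures subdirectory is an assumption borrowed from the staging example. Unlike the bare-basename advice, this variant passes a short relative path so inputs can live in a subdirectory of the staged tree:

```python
from __future__ import annotations

import shutil
from pathlib import Path


def stage_input(local_input: Path, staging: Path, subdir: str = "fixtures") -> str:
    """Copy a laptop file into the staged tree; return a relative POSIX path."""
    dst_dir = staging / subdir
    dst_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy2(local_input, dst_dir / local_input.name)
    # Only this relative string is sent to the remote task.
    return (Path(subdir) / local_input.name).as_posix()


def resolve_on_worker(rel: str, cwd: Path | None = None) -> Path:
    """Resolve the relative path against the unpacked working_dir."""
    base = cwd or Path.cwd()  # on a Ray worker, cwd is the unpacked working_dir
    path = base / rel
    if not path.exists():
        raise FileNotFoundError(f"{rel} not found under {base}")
    return path
```

Staged inputs count toward the working_dir size cap, so this suits small fixtures; large datasets are better fetched by the worker from shared storage.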