Programmatic API¶
This page documents the Python API for creating, submitting, and monitoring jobs from code (without using the CLI).
Installation¶
pip install hpc-runner
Quick start¶
from hpc_runner import Job
job = Job(command="python my_script.py", cpu=4, mem="8G", time="1:00:00")
result = job.submit() # auto-detect scheduler
final = result.wait() # block until complete
print(final, result.returncode)
Creating jobs¶
Job is the core unit of work. It’s a scheduler-agnostic data container; a
scheduler implementation is responsible for translating fields into submission
flags/directives.
from hpc_runner import Job
job = Job(
    command="python train.py",
    name="training_job",
    cpu=4,
    mem="16G",
    time="4:00:00",
    queue="gpu.q",
    modules=["python/3.11", "cuda/12.0"],
    workdir="/path/to/project",
    stdout="train.out",
    stderr=None,  # None means "merge stderr into stdout" for most schedulers
)
Submitting jobs¶
Submit to an auto-detected scheduler:
result = job.submit()
Or explicitly select a scheduler:
from hpc_runner import get_scheduler
scheduler = get_scheduler("sge")
result = scheduler.submit(job)
Monitoring jobs¶
Job.submit() returns a JobResult which can be polled or waited on.
import time

from hpc_runner import Job, JobStatus

result = Job("python train.py").submit()
while not result.is_complete:
    print(result.job_id, result.status)
    time.sleep(5)  # poll at a sane interval instead of busy-waiting

if result.status == JobStatus.COMPLETED:
    print("ok:", result.read_stdout(tail=20))
else:
    print("failed:", result.read_stderr(tail=50))
Cancel a job:
result.cancel()
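cancel() pairs naturally with a deadline. The helper below is our own sketch, not part of hpc-runner; it builds only on the documented is_complete attribute and cancel() method, cancelling the job if it has not finished within a timeout:

```python
import time

def wait_with_timeout(result, timeout_s: float, poll_s: float = 5.0) -> bool:
    """Poll result.is_complete until done or timeout; cancel on timeout.

    Returns True if the job finished on its own, False if it was cancelled.
    """
    deadline = time.monotonic() + timeout_s
    while not result.is_complete:
        if time.monotonic() >= deadline:
            result.cancel()
            return False
        # Sleep at most poll_s, but never past the deadline
        time.sleep(min(poll_s, max(0.0, deadline - time.monotonic())))
    return True
```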
Configuration-driven jobs¶
Job() is config-aware by default. It auto-consults the TOML config
hierarchy, merging [defaults] with a matched [tools.<name>] or
[types.<name>] section, then applies any explicit keyword arguments you
pass. The tool name is auto-detected from the command (first word, path
stripped). Use the job_type keyword to look up a [types.*] entry
instead (this skips tool auto-detection).
If the tool has [tools.<name>.options] entries defined in the config, the
full command string (not just the first word) is used to match
argument-specific overrides. See Tool option specialisation for the
matching rules.
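The hierarchy can be illustrated with a sketch of such a config file. The section names ([defaults], [tools.<name>], [types.<name>], [tools.<name>.options]) come from this page; the specific keys and values below are assumptions chosen to mirror the Job fields shown earlier, not a verbatim schema.

```toml
# hpc-runner.toml: illustrative only; keys mirror the Job fields shown
# on this page (cpu, mem, time, queue, modules).

[defaults]
cpu = 1
mem = "4G"
time = "1:00:00"

[tools.python]
# Matched when the command's first word (path stripped) is "python"
modules = ["python/3.11"]
cpu = 4

[types.gpu]
# Selected explicitly via Job(..., job_type="gpu")
queue = "gpu.q"
cpu = 8
mem = "16G"

# Argument-specific override: matched against the full command string
[tools.fusesoc.options."--tool slang"]
mem = "32G"
```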
from hpc_runner import Job, reload_config
# Explicitly load a config file for this process (optional)
reload_config("./hpc-runner.toml")
# Auto-detects "python" from command → looks up [tools.python]
job = Job("python train.py", cpu=8)
# Look up [types.gpu], merge with [defaults], override cpu
job = Job("python train.py", job_type="gpu", cpu=8)
# No matching tool — just [defaults]
job = Job("echo hello")
# With [tools.fusesoc.options."--tool slang"] in config, the command
# arguments trigger option-specific overrides automatically:
job = Job("fusesoc run --tool slang core:v:n")
result = job.submit()
Job dependencies¶
At the low level, you can chain jobs by attaching a dependency to a new job:
from hpc_runner import Job
r1 = Job("python preprocess.py", name="pre").submit()
j2 = Job("python train.py", name="train")
j2.dependencies = [r1] # programmatic dependency
j2.dependency_type = "afterok"
r2 = j2.submit()
Pipelines (multi-step workflows)¶
For larger workflows, use Pipeline to define steps and dependencies by
name. When used as a context manager, the pipeline auto-submits on exit and
results are available via the results property.
from hpc_runner import Pipeline
with Pipeline("ml") as p:
    p.add("python preprocess.py", name="preprocess", cpu=8, mem="32G")
    p.add("python train.py", name="train", depends_on=["preprocess"])
    p.add("python evaluate.py", name="evaluate", depends_on=["train"])

# Auto-submitted when the with-block exits cleanly.
# Results are accessible on the pipeline object.
for name, result in p.results.items():
    print(name, result.job_id, result.status)
p.wait()
Without a context manager, call submit() explicitly:
p = Pipeline("ml")
p.add("make build", name="build")
p.add("make test", name="test", depends_on=["build"])
results = p.submit() # must call manually
p.wait()
Config-aware pipeline jobs¶
Pipeline.add() creates jobs through Job(), so every step picks up
[defaults] automatically and the tool is auto-detected from the command.
Use the job_type keyword to pull in [types.*] config instead:
with Pipeline("ml") as p:
    # Auto-detects "python" → picks up [tools.python] config
    p.add("python preprocess.py", name="preprocess")
    # Picks up [types.gpu] config (queue, resources, etc.)
    p.add("python train.py", name="train",
          depends_on=["preprocess"], job_type="gpu")
    # Only [defaults]; no matching tool or type
    p.add("echo done", name="notify", depends_on=["train"])
Keyword arguments override whatever comes from config:
# [types.gpu] sets cpu=8, but we want 16 for this step
p.add("python big_train.py", name="train", job_type="gpu", cpu=16)
Per-job dependency types¶
By default every dependency uses AFTEROK (run only if all parents
succeed). You can set a different dependency type per step:
from hpc_runner import DependencyType, Pipeline
with Pipeline("robust") as p:
    p.add("python train.py", name="train")
    # Only runs if train succeeds
    p.add("python evaluate.py", name="evaluate",
          depends_on=["train"],
          dependency_type=DependencyType.AFTEROK)
    # Runs regardless of success/failure (cleanup, notifications, etc.)
    p.add("python notify.py", name="notify",
          depends_on=["train"],
          dependency_type=DependencyType.AFTERANY)
Available types: AFTEROK (run only if the parents complete successfully), AFTERANY (run once the parents finish, regardless of exit status), AFTER (run once the parents have started), AFTERNOTOK (run only if the parents fail).
Choosing a scheduler¶
By default the scheduler is auto-detected at submit time. You can pin it at construction:
from hpc_runner import Pipeline, get_scheduler
sge = get_scheduler("sge")
with Pipeline("build", scheduler=sge) as p:
    p.add("make build", name="build")
Or pass it to submit() directly:
p = Pipeline("build")
p.add("make build", name="build")
p.submit(scheduler=sge)
Handling submission failures¶
If a scheduler error interrupts submission partway through, the successfully
submitted jobs are preserved. Call submit() again to retry only the
remaining jobs:
p = Pipeline("etl")
p.add("python extract.py", name="extract")
p.add("python transform.py", name="transform", depends_on=["extract"])
p.add("python load.py", name="load", depends_on=["transform"])
try:
    p.submit()
except RuntimeError:
    # extract submitted, transform failed; fix the issue, then:
    p.submit()  # skips extract, retries transform and load
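If transient scheduler errors are common in your environment, the retry can be automated. The helper below is our own sketch, not part of hpc-runner; it relies only on the resubmission behaviour described above (a retried submit() skips steps that already went through):

```python
import time

def submit_with_retries(pipeline, attempts: int = 3, backoff_s: float = 2.0):
    """Call pipeline.submit() up to `attempts` times with linear backoff.

    Relies on the documented behaviour that a retried submit() only
    resubmits the steps that have not been submitted yet.
    """
    for attempt in range(1, attempts + 1):
        try:
            return pipeline.submit()
        except RuntimeError:
            if attempt == attempts:
                raise  # out of attempts; surface the scheduler error
            time.sleep(backoff_s * attempt)
```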
Array jobs¶
Use JobArray when you want a scheduler array job (SGE: qsub -t):
from hpc_runner import Job, JobArray
base = Job("python work.py", name="work", cpu=2, time="0:30:00")
array = JobArray(job=base, start=1, end=100, max_concurrent=10)
array_result = array.submit()
statuses = array_result.wait()
print("completed:", sum(1 for s in statuses.values() if s.name == "COMPLETED"))
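Each array task typically discovers its own index from the environment. This work.py sketch is not part of hpc-runner; it assumes the conventional variable names (SGE sets SGE_TASK_ID, and the string "undefined" outside an array job; Slurm sets SLURM_ARRAY_TASK_ID):

```python
# work.py: a minimal per-task worker for the array job above. Assumes the
# scheduler exposes the 1-based task index via an environment variable:
# SGE_TASK_ID on SGE (the string "undefined" for non-array jobs),
# SLURM_ARRAY_TASK_ID on Slurm. Not part of hpc-runner itself.
import os

def task_index() -> int:
    """Return this task's index within the array (1-based)."""
    for var in ("SGE_TASK_ID", "SLURM_ARRAY_TASK_ID"):
        value = os.environ.get(var)
        if value and value != "undefined":
            return int(value)
    raise RuntimeError("no array task index found in the environment")

# With start=1, end=100 above, task_index() returns 1..100, so each task
# can pick its own slice of the work, e.g. inputs[task_index() - 1].
```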