agi-distributor API
agi_distributor packages applications, ships the required artefacts to the
selected workers, and runs the chosen execution mode across the cluster.
Understanding modes_enabled and mode
The first thing to know before reading the examples is that AGILAB encodes execution toggles as a small bitmask.
| Parameter | Used by | Meaning |
|---|---|---|
| `modes_enabled` | `AGI.install`, `AGI.update` | Bitmask of the execution capabilities that should be prepared during install. |
| `mode` | `RunRequest` (passed to `AGI.run`) | One concrete execution mode selected for the current run. |
The current bit values are:
| Toggle | Bit value | Meaning |
|---|---|---|
| `AGI.PYTHON_MODE` | 1 | Worker-pool path; backend may be process- or thread-based. |
| `AGI.CYTHON_MODE` | 2 | Compiled worker path when available. |
| `AGI.DASK_MODE` | 4 | Distributed Dask scheduler / remote worker path. |
| `AGI.RAPIDS_MODE` | 16 | RAPIDS / GPU path when supported. |
Prefer the public constants instead of typing numeric bitmasks:
- `AGI.PYTHON_MODE | AGI.DASK_MODE` for the MyCode docs example.
- `AGI.PYTHON_MODE | AGI.CYTHON_MODE | AGI.DASK_MODE` for the Flight docs example.
This is why the install examples below pass named constants through
modes_enabled=... and the run examples pass the same intent through a
RunRequest. Older snippets may contain raw integers, but public examples
should not teach those values directly.
In normal usage, these values are generated from the UI toggles rather than typed manually.
Usage Example
Installation
Install the mycode example:
import asyncio
from pathlib import Path
from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv
APP = "mycode_project"
NO_CYTHON_RUN_MODES = AGI.PYTHON_MODE | AGI.DASK_MODE
def agilab_apps_path() -> Path:
    """Return the ``apps`` directory of the local AGILAB installation.

    The installation root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``.

    Raises:
        SystemExit: If the marker file does not exist.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    # The marker stores the installation root as text; drop surrounding whitespace.
    return Path(marker.read_text(encoding="utf-8").strip()) / "apps"
async def main():
    """Install ``APP`` with the modes in ``NO_CYTHON_RUN_MODES``.

    Builds the ``AgiEnv`` for ``APP``, calls ``AGI.install`` against the
    ``127.0.0.1`` scheduler and workers mapping, prints the result and
    returns it.
    """
    app_env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    res = await AGI.install(
        app_env,
        modes_enabled=NO_CYTHON_RUN_MODES,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
Install the flight example:
import asyncio
from pathlib import Path
from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv
APP = "flight_project"
LOCAL_RUN_MODES = AGI.PYTHON_MODE | AGI.CYTHON_MODE | AGI.DASK_MODE
def agilab_apps_path() -> Path:
    """Return the ``apps`` directory of the local AGILAB installation.

    The installation root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``.

    Raises:
        SystemExit: If the marker file does not exist.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    # The marker stores the installation root as text; drop surrounding whitespace.
    return Path(marker.read_text(encoding="utf-8").strip()) / "apps"
async def main():
    """Install ``APP`` with the modes in ``LOCAL_RUN_MODES``.

    Builds the ``AgiEnv`` for ``APP``, calls ``AGI.install`` against the
    ``127.0.0.1`` scheduler and workers mapping, prints the result and
    returns it.
    """
    app_env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    res = await AGI.install(
        app_env,
        modes_enabled=LOCAL_RUN_MODES,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
Distribute
Create the distribution bundle that will be sent to the workers:
import asyncio
from pathlib import Path
from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv
APP = "mycode_project"
def agilab_apps_path() -> Path:
    """Return the ``apps`` directory of the local AGILAB installation.

    The installation root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``.

    Raises:
        SystemExit: If the marker file does not exist.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    # The marker stores the installation root as text; drop surrounding whitespace.
    return Path(marker.read_text(encoding="utf-8").strip()) / "apps"
async def main():
    """Request the distribution for ``APP`` and return it.

    Builds the ``AgiEnv`` for ``APP``, calls ``AGI.get_distrib`` with the
    ``127.0.0.1`` scheduler and workers mapping plus the app arguments
    (``data_in``, ``data_out``, ``files``, ``nfile``), prints the result and
    returns it.
    """
    app_env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    res = await AGI.get_distrib(
        app_env,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_in="mycode/dataset",
        data_out="mycode/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
Equivalent example for flight:
import asyncio
from pathlib import Path
from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv
APP = "flight_project"
def agilab_apps_path() -> Path:
    """Return the ``apps`` directory of the local AGILAB installation.

    The installation root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``.

    Raises:
        SystemExit: If the marker file does not exist.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    # The marker stores the installation root as text; drop surrounding whitespace.
    return Path(marker.read_text(encoding="utf-8").strip()) / "apps"
async def main():
    """Request the distribution for ``APP`` and return it.

    Builds the ``AgiEnv`` for ``APP``, calls ``AGI.get_distrib`` with the
    ``127.0.0.1`` scheduler and workers mapping plus the app arguments
    (``data_source``, ``data_in``, ``data_out``, ``files``, ``nfile``),
    prints the result and returns it.
    """
    app_env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    res = await AGI.get_distrib(
        app_env,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_source="file",
        data_in="flight/dataset",
        data_out="flight/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
Run
Start with the simplest mental model first:
# Minimal run: a RunRequest carrying only a mode, with no scheduler or workers.
from agi_cluster.agi_distributor import AGI, RunRequest
# The mode is given as a public AGI constant rather than a raw integer.
request = RunRequest(mode=AGI.PYTHON_MODE)
# `app_env` is assumed to be an AgiEnv built as in the complete examples;
# `await` means this line has to run inside a coroutine.
res = await AGI.run(app_env, request=request)
Then move to the generated examples that add install-time capabilities, distributed hosts, and app-specific arguments:
import asyncio
from pathlib import Path
from agi_cluster.agi_distributor import AGI, RunRequest
from agi_env import AgiEnv
APP = "mycode_project"
NO_CYTHON_RUN_MODES = AGI.PYTHON_MODE | AGI.DASK_MODE
def agilab_apps_path() -> Path:
    """Return the ``apps`` directory of the local AGILAB installation.

    The installation root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``.

    Raises:
        SystemExit: If the marker file does not exist.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    # The marker stores the installation root as text; drop surrounding whitespace.
    return Path(marker.read_text(encoding="utf-8").strip()) / "apps"
async def main():
    """Run ``APP`` with a ``RunRequest`` built from ``NO_CYTHON_RUN_MODES``.

    Builds the ``AgiEnv`` for ``APP``, wraps the mode, scheduler and workers
    mapping in a ``RunRequest``, awaits ``AGI.run``, prints the result and
    returns it.
    """
    app_env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    request = RunRequest(
        mode=NO_CYTHON_RUN_MODES,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
    )
    res = await AGI.run(app_env, request=request)
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
Equivalent example for flight:
import asyncio
from pathlib import Path
from agi_cluster.agi_distributor import AGI, RunRequest
from agi_env import AgiEnv
APP = "flight_project"
LOCAL_RUN_MODES = AGI.PYTHON_MODE | AGI.CYTHON_MODE | AGI.DASK_MODE
def agilab_apps_path() -> Path:
    """Return the ``apps`` directory of the local AGILAB installation.

    The installation root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``.

    Raises:
        SystemExit: If the marker file does not exist.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    # The marker stores the installation root as text; drop surrounding whitespace.
    return Path(marker.read_text(encoding="utf-8").strip()) / "apps"
async def main():
    """Run ``APP`` with a ``RunRequest`` built from ``LOCAL_RUN_MODES``.

    Builds the ``AgiEnv`` for ``APP``, then a ``RunRequest`` that keeps the
    app arguments in ``params`` separate from the data paths, mode,
    scheduler and workers mapping. Awaits ``AGI.run``, prints the result and
    returns it.
    """
    app_env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    request = RunRequest(
        params={
            "data_source": "file",
            "files": "*",
            "nfile": 1,
        },
        data_in="flight/dataset",
        data_out="flight/dataframe",
        mode=LOCAL_RUN_MODES,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
    )
    res = await AGI.run(app_env, request=request)
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
Reference
Cluster workplan utilities for distributing AGILab workloads.
- class agi_cluster.agi_distributor.agi_distributor.AGI(target, verbose=1)[source]
Bases: object
Coordinate installation, scheduling, and execution of AGILab workloads.
- CYTHON_MODE = 2
- DASK_MODE = 4
- PYTHON_MODE = 1
- RAPIDS_MODE = 16
- __init__(target, verbose=1)[source]
Initialize an AGI object with a target and verbosity level.
- Parameters:
target (str) – The target for the env object.
verbose (int) – Verbosity level (0-3).
- Returns:
None
- Raises:
None –
- debug = None
- async static distribute(env, scheduler=None, workers=None, verbose=0, **args)[source]
- Return type:
Any
- env = None
- async static get_distrib(env, scheduler=None, workers=None, verbose=0, **args)[source]
- Return type:
Any
- async static install(env, scheduler=None, workers=None, workers_data_path=None, modes_enabled=15, verbose=None, **args)[source]
- Return type:
None
- async static run(env, request)[source]
Compiles the target module in Cython and runs it on the cluster.
- Parameters:
env (AgiEnv) – AGILAB environment to execute.
request (RunRequest) – Typed execution request. App params and workflow stages are kept separate.
- Returns:
Result of the execution.
- Return type:
Any
- Raises:
ValueError – If mode is invalid.
RuntimeError – If the target module fails to load.
- async static serve(env, scheduler=None, workers=None, verbose=0, mode=None, rapids_enabled=False, action='start', poll_interval=None, shutdown_on_stop=True, stop_timeout=30.0, service_queue_dir=None, heartbeat_timeout=None, cleanup_done_ttl_sec=None, cleanup_failed_ttl_sec=None, cleanup_heartbeat_ttl_sec=None, cleanup_done_max_files=None, cleanup_failed_max_files=None, cleanup_heartbeat_max_files=None, health_output_path=None, **args)[source]
- Return type:
Dict[str,Any]
- async static submit(env=None, workers=None, work_plan=None, work_plan_metadata=None, task_id=None, task_name=None, **args)[source]
- Return type:
Dict[str,Any]
- async static update(env=None, scheduler=None, workers=None, modes_enabled=15, verbose=None, **args)[source]
- Return type:
None
- verbose = None