# agi-distributor API

`agi_distributor` packages applications, ships the required artefacts to the
selected workers, and runs the chosen execution mode across the cluster.

## Understanding `modes_enabled` and `mode`

Before reading the examples, note that AGILAB encodes execution toggles as a small bitmask.
| Parameter | Used by | Meaning |
|---|---|---|
| `modes_enabled` | `AGI.install` | Bitmask of the execution capabilities that should be prepared during install. |
| `mode` | `AGI.run` | One concrete execution mode selected for the current run. |
The current bit values are:

| Toggle | Bit value | Meaning |
|---|---|---|
| pool | 1 | Multiprocessing / worker-pool path. |
| cython | 2 | Compiled worker path when available. |
| cluster | 4 | Distributed Dask scheduler / remote worker path. |
| rapids | 8 | RAPIDS / GPU path when supported. |
Examples:

- 13 = 4 + 8 + 1 = cluster + rapids + pool
- 15 = 4 + 8 + 2 + 1 = cluster + rapids + cython + pool

This is why the install examples below use `modes_enabled=13` or
`modes_enabled=15`, while the run examples use the matching `mode=13` or
`mode=15` values.

In normal usage, these values are generated from the UI toggles rather than typed manually.
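To make the bitmask arithmetic concrete, here is a small sketch that decodes a mode value back into its toggle names. The constants and the `decode_mode` helper are illustrative only (the bit values follow the table above); they are not part of the AGI API.

```python
# Bit values from the table above. These constants and the helper below are
# illustrative only; they are not part of the AGI API.
POOL, CYTHON, CLUSTER, RAPIDS = 1, 2, 4, 8

NAMES = {POOL: "pool", CYTHON: "cython", CLUSTER: "cluster", RAPIDS: "rapids"}


def decode_mode(mode: int) -> list[str]:
    """Return the names of the toggles enabled in a mode bitmask."""
    return [name for bit, name in sorted(NAMES.items()) if mode & bit]


print(decode_mode(13))  # pool, cluster and rapids bits are set
print(decode_mode(15))  # all four toggles
```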
## Usage Example

### Installation
Install the mycode example:
```python
import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv

AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "mycode_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.install(
        app_env,
        modes_enabled=13,  # cluster + pool + rapids
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
```
Install the flight example:
```python
import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv

AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "flight_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.install(
        app_env,
        modes_enabled=15,  # cluster + pool + cython + rapids
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
```
### Distribute
Create the distribution bundle that will be sent to the workers:
```python
import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv

AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "mycode_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.get_distrib(
        app_env,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_in="mycode/dataset",
        data_out="mycode/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
```
Equivalent example for flight:
```python
import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv

AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "flight_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.get_distrib(
        app_env,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_source="file",
        data_in="flight/dataset",
        data_out="flight/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
```
### Run
Start with the simplest mental model first:
```python
res = await AGI.run(
    app_env,
    mode=0,  # plain local Python execution
)
```
Then move to the generated examples that add install-time capabilities, distributed hosts, and app-specific arguments:
```python
import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv

AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "mycode_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.run(
        app_env,
        mode=13,  # cluster + pool + rapids
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_in="mycode/dataset",
        data_out="mycode/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
```
Equivalent example for flight:
```python
import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv

AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "flight_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.run(
        app_env,
        mode=15,  # cluster + pool + cython + rapids
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_source="file",
        data_in="flight/dataset",
        data_out="flight/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
```
## Reference

Cluster workplan utilities for distributing AGILab workloads.

### class agi_cluster.agi_distributor.agi_distributor.AGI(target, verbose=1)

Bases: `object`

Coordinate installation, scheduling, and execution of AGILab workloads.

Class attributes:

- `PYTHON_MODE = 1`
- `CYTHON_MODE = 2`
- `DASK_MODE = 4`
- `RAPIDS_MODE = 16`
- `__init__(target, verbose=1)`

  Initialize an AGI object with a target and verbosity level.

  Parameters:
  - target (`str`) – The target for the env object.
  - verbose (`int`) – Verbosity level (0–3).

  Returns: `None`

- `debug = None`
- `async static distribute(env, scheduler=None, workers=None, verbose=0, **args)`

  Return type: `Any`

- `env = None`

- `async static get_distrib(env, scheduler=None, workers=None, verbose=0, **args)`

  Return type: `Any`
- `async static install(env, scheduler=None, workers=None, workers_data_path=None, modes_enabled=15, verbose=None, **args)`

  Return type: `None`
- `async static run(env, scheduler=None, workers=None, workers_data_path=None, verbose=0, mode=None, rapids_enabled=False, **args)`

  Compiles the target module in Cython and runs it on the cluster.

  Parameters:
  - target (`str`) – The target Python module to run.
  - scheduler (`Optional[str]`) – IP and port address of the Dask scheduler. Defaults to `'127.0.0.1:8786'`.
  - workers (`Optional[Dict[str, int]]`) – Dictionary of worker IPs and their counts. Defaults to `workers_default`.
  - verbose (`int`) – Verbosity level. Defaults to 0.
  - mode (`Union[int, List[int], str, None]`) – Mode(s) for execution. Defaults to `None`. When an int is provided, it is treated as a 4-bit mask controlling RAPIDS/Dask/Cython/Pool features. When a string is provided, it must match `r"^[dcrp]+$"` (letters enable features). When a list is provided, the modes are benchmarked sequentially.
  - rapids_enabled (`bool`) – Flag to enable RAPIDS. Defaults to `False`.
  - **args (`Any`) – Additional keyword arguments.

  Returns: Result of the execution.

  Return type: `Any`

  Raises:
  - `ValueError` – If mode is invalid.
  - `RuntimeError` – If the target module fails to load.
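The string form of `mode` can be sketched as a simple letter-to-bit translation. The letter mapping below (p=pool, c=cython, d=dask, r=rapids) is an assumption inferred from the 4-bit mask description and the bit values used in this page's examples; the real parsing happens inside `AGI.run`.

```python
import re

# Assumed letter-to-bit mapping (p=pool, c=cython, d=dask, r=rapids), matching
# the bit values used in this page's examples; the real parser is internal to AGI.run.
LETTER_BITS = {"p": 1, "c": 2, "d": 4, "r": 8}


def mode_string_to_mask(mode: str) -> int:
    """Translate a mode string such as 'drp' into the equivalent int bitmask."""
    if not re.fullmatch(r"[dcrp]+", mode):
        raise ValueError(f"invalid mode string: {mode!r}")
    mask = 0
    for letter in mode:
        mask |= LETTER_BITS[letter]
    return mask


print(mode_string_to_mask("drp"))  # same bitmask as mode=13 in the run example above
```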
- `async static serve(env, scheduler=None, workers=None, verbose=0, mode=None, rapids_enabled=False, action='start', poll_interval=None, shutdown_on_stop=True, stop_timeout=30.0, service_queue_dir=None, heartbeat_timeout=None, cleanup_done_ttl_sec=None, cleanup_failed_ttl_sec=None, cleanup_heartbeat_ttl_sec=None, cleanup_done_max_files=None, cleanup_failed_max_files=None, cleanup_heartbeat_max_files=None, health_output_path=None, **args)`

  Return type: `Dict[str, Any]`
- `async static submit(env=None, workers=None, work_plan=None, work_plan_metadata=None, task_id=None, task_name=None, **args)`

  Return type: `Dict[str, Any]`

- `async static update(env=None, scheduler=None, workers=None, modes_enabled=15, verbose=None, **args)`

  Return type: `None`

- `verbose = None`