agi_cluster package
Cluster workplan utilities for distributing AGILab workloads.
- class agi_cluster.agi_distributor.agi_distributor.AGI(target, verbose=1)[source]
Bases: object
Coordinate installation, scheduling, and execution of AGILab workloads.
- CYTHON_MODE = 2
- DASK_MODE = 4
- PYTHON_MODE = 1
- RAPIDS_MODE = 16
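The four constants above are distinct powers of two, so they can be combined with bitwise OR and tested with bitwise AND. The following is a minimal sketch that uses local copies of the documented values instead of importing `AGI`; the `FLAGS` table and the `enabled_names` helper are hypothetical names introduced only for illustration, and how AGI itself interprets a given combination is not specified here.

```python
from typing import List

# Local copies of the values documented above (stand-ins, not the AGI class).
PYTHON_MODE = 1
CYTHON_MODE = 2
DASK_MODE = 4
RAPIDS_MODE = 16

# Hypothetical lookup table used only by this sketch.
FLAGS = {
    "PYTHON_MODE": PYTHON_MODE,
    "CYTHON_MODE": CYTHON_MODE,
    "DASK_MODE": DASK_MODE,
    "RAPIDS_MODE": RAPIDS_MODE,
}


def enabled_names(mask: int) -> List[str]:
    """Return the names of the flags whose bit is set in ``mask``."""
    return [name for name, bit in FLAGS.items() if mask & bit]


mask = CYTHON_MODE | DASK_MODE      # combine two flags
print(mask)                         # 6
print(enabled_names(mask))          # ['CYTHON_MODE', 'DASK_MODE']
```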
- __init__(target, verbose=1)[source]
Initialize an AGI object with a target and verbosity level.
- Parameters:
target (str) – The target for the env object.
verbose (int) – Verbosity level (0-3).
- Returns:
None
- Raises:
None –
- debug: Optional[bool] = None
- async static distribute(env, scheduler=None, workers=None, verbose=0, **args)[source]
- Return type:
Any
- async static exec_ssh_async(ip, cmd)[source]
Execute a remote command via SSH and return the last line of its stdout output.
- Return type:
str
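The "last line of stdout" behaviour can be illustrated without a remote host. The sketch below runs a local command through `asyncio.create_subprocess_exec` as a stand-in for the SSH call; `last_line` and `run_and_get_last_line` are hypothetical helpers, and raising `RuntimeError` on a non-zero exit code is an assumption of this sketch, not documented behaviour of `exec_ssh_async`.

```python
import asyncio
import sys


def last_line(stdout_text: str) -> str:
    """Return the last non-empty trailing line of the text, or '' if there is none."""
    lines = stdout_text.strip().splitlines()
    return lines[-1] if lines else ""


async def run_and_get_last_line(*argv: str) -> str:
    """Run a local command (stand-in for SSH) and return the last stdout line."""
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        # Assumption of this sketch: surface stderr when the command fails.
        raise RuntimeError(stderr.decode(errors="replace").strip())
    return last_line(stdout.decode(errors="replace"))


if __name__ == "__main__":
    result = asyncio.run(
        run_and_get_last_line(sys.executable, "-c", "print('first'); print('last')")
    )
    print(result)
```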
- static get_default_local_ip()[source]
Get the default local IP address of the machine.
- Returns:
The default local IP address.
- Return type:
str
- Raises:
Exception – If unable to determine the local IP address.
- async static get_distrib(env, scheduler=None, workers=None, verbose=0, **args)[source]
Check the distribution with a dry run.
- Parameters:
package – Any AGI target app or project created by AGILAB.
list_ip – Any IPv4 address with SSH access and Python (it is up to you to link it to python3), with psutil and uv synced.
verbose (int) – Verbosity level [0-3].
- Returns:
The distribution tree.
- Return type:
Any
- async static install(env, scheduler=None, workers=None, modes_enabled=15, verbose=None, **args)[source]
Update the cluster’s virtual environment.
- Parameters:
project_path (Path) – The name of the module to install or the path to the module.
list_ip (List[str], optional) – A list of IPv4 addresses with SSH access. Each IP should have Python, psutil, and pdm installed. Defaults to None.
modes_enabled (int, optional) – Bitmask indicating enabled modes. Defaults to 15 (0b1111).
verbose (int, optional) – Verbosity level (0-3). Higher numbers increase the verbosity of the output. Defaults to 1.
**args (Any) – Additional keyword arguments.
- Returns:
True if the installation was successful, False otherwise.
- Return type:
bool
- Raises:
ValueError – If module_name_or_path is invalid.
ConnectionError –
- async static run(env, scheduler=None, workers=None, verbose=0, mode=None, rapids_enabled=False, **args)[source]
Compiles the target module in Cython and runs it on the cluster.
- Parameters:
target (str) – The target Python module to run.
scheduler (str, optional) – IP and port address of the Dask scheduler. Defaults to ‘127.0.0.1:8786’.
workers (dict, optional) – Dictionary of worker IPs and their counts. Defaults to workers_default.
verbose (int, optional) – Verbosity level. Defaults to 0.
mode (int | list[int] | str | None, optional) – Mode(s) for execution. Defaults to None. When an int is provided, it is treated as a 4-bit mask controlling RAPIDS/Dask/Cython/Pool features. When a string is provided, it must match r"^[dcrp]+$" (letters enable features). When a list is provided, the modes are benchmarked sequentially.
rapids_enabled (bool, optional) – Flag to enable RAPIDS. Defaults to False.
**args (Any) – Additional keyword arguments.
- Returns:
Result of the execution.
- Return type:
Any
- Raises:
ValueError – If mode is invalid.
RuntimeError – If the target module fails to load.
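The accepted shapes of `mode` can be checked before calling `run`. This is a minimal sketch of such a check, not the validation AGI performs: `check_mode` is a hypothetical helper, the integer range 0 to 15 is inferred from the phrase "4-bit mask", and the mapping from letters to bits is deliberately left out because it is not documented here.

```python
import re

# Pattern quoted in the parameter description above.
MODE_PATTERN = re.compile(r"^[dcrp]+$")


def _check_int(value: int) -> None:
    # Assumption: "4-bit mask" means any integer from 0 to 15 inclusive.
    if not 0 <= value <= 0b1111:
        raise ValueError(f"mode mask out of range: {value}")


def check_mode(mode) -> None:
    """Raise ValueError unless ``mode`` has one of the documented shapes."""
    if mode is None:
        return
    if isinstance(mode, int):
        _check_int(mode)
    elif isinstance(mode, str):
        if not MODE_PATTERN.match(mode):
            raise ValueError(f"mode string must match {MODE_PATTERN.pattern}: {mode!r}")
    elif isinstance(mode, list):
        for item in mode:
            if not isinstance(item, int):
                raise ValueError(f"mode list items must be int: {item!r}")
            _check_int(item)
    else:
        raise ValueError(f"unsupported mode type: {type(mode).__name__}")


check_mode("dc")        # accepted: only letters from d, c, r, p
check_mode([0, 4, 15])  # accepted: a list of 4-bit masks
```

A call such as `check_mode("dx")` or `check_mode(16)` raises `ValueError` under these assumptions.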
- async static serve(env, scheduler=None, workers=None, verbose=0, mode=None, rapids_enabled=False, action='start', poll_interval=None, shutdown_on_stop=True, stop_timeout=30.0, **args)[source]
Manage persistent worker services without invoking the run workplan.
action="start" provisions workers and submits BaseWorker.loop as a long-lived service task pinned to each Dask worker. action="stop" signals the loop through BaseWorker.break and optionally tears down the Dask cluster when shutdown_on_stop is true.
- Return type:
Dict[str, Any]
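The start/stop lifecycle described above can be pictured with a plain thread and an event. The sketch below is only an analogy: `LoopService` is a hypothetical class that does not use Dask or `BaseWorker`, and the keys of the returned dictionaries are invented for illustration. It shows a long-lived loop that polls at `poll_interval` and a stop call that signals the loop and waits up to `stop_timeout` seconds.

```python
import threading
import time


class LoopService:
    """Hypothetical stand-in for a long-lived worker loop (not the AGI API)."""

    def __init__(self, poll_interval: float = 0.01):
        self.poll_interval = poll_interval
        self.ticks = 0
        self._stop = threading.Event()
        self._thread = None

    def _loop(self) -> None:
        # Event.wait returns False on timeout, so the loop ticks once per
        # interval and exits as soon as the stop signal is set.
        while not self._stop.wait(self.poll_interval):
            self.ticks += 1

    def start(self) -> dict:
        self._stop.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()
        return {"status": "running"}

    def stop(self, stop_timeout: float = 30.0) -> dict:
        if self._thread is None:
            return {"status": "not-started", "ticks": self.ticks}
        self._stop.set()                         # signal the loop to break
        self._thread.join(timeout=stop_timeout)  # wait at most stop_timeout
        stopped = not self._thread.is_alive()
        return {"status": "stopped" if stopped else "timeout", "ticks": self.ticks}


service = LoopService(poll_interval=0.001)
service.start()
time.sleep(0.05)                  # let the loop run for a short while
print(service.stop(stop_timeout=1.0)["status"])
```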
- async static update(env=None, scheduler=None, workers=None, modes_enabled=15, verbose=None, **args)[source]
Install the cluster virtual environment.
- Parameters:
package – Any AGI target app or project created with AGILAB.
list_ip – Any IPv4 address with SSH access and Python (it is up to you to link it to python3), with psutil and uv synced.
mode_enabled – Typically a mode mask indicating, for example, whether Cython or RAPIDS is required.
force_update – Run Spud.update before the installation. Defaults to True.
verbose (Optional[int]) – Verbosity level [0-3].
- Return type:
None
- verbose: Optional[int] = None