agi_cluster package

Cluster workplan utilities for distributing AGILab workloads.

class agi_cluster.agi_distributor.agi_distributor.AGI(target, verbose=1)[source]

Bases: object

Coordinate installation, scheduling, and execution of AGILab workloads.

CYTHON_MODE = 2
DASK_MODE = 4
PYTHON_MODE = 1
RAPIDS_MODE = 16
__init__(target, verbose=1)[source]

Initialize an AGI object with a target and verbosity level.

Parameters:
  • target (str) – The target for the env object.

  • verbose (int) – Verbosity level (0-3).

Returns:

None

Raises:

None

debug: Optional[bool] = None
async static distribute(env, scheduler=None, workers=None, verbose=0, **args)[source]
Return type:

Any

env: Optional[AgiEnv] = None
async static exec_ssh(ip, cmd)[source]
Return type:

str

async static exec_ssh_async(ip, cmd)[source]

Execute a remote command via SSH and return the last line of its stdout output.

Return type:

str
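
The "last line of stdout" behaviour described for exec_ssh_async can be illustrated without an SSH server. This is a minimal sketch, not the AGI implementation: a local subprocess stands in for the remote command, run_and_get_last_line is a hypothetical helper name, and skipping blank trailing lines is an assumption.

```python
import asyncio
import sys


async def run_and_get_last_line(*argv: str) -> str:
    """Run a command and return the last non-empty line of its stdout.

    Hypothetical helper: a local subprocess replaces the SSH call here.
    """
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        # Surface the error text instead of returning partial output.
        raise RuntimeError(stderr.decode(errors="replace").strip())
    # Assumption: blank lines are ignored when picking the last line.
    lines = [line for line in stdout.decode().splitlines() if line.strip()]
    return lines[-1] if lines else ""


if __name__ == "__main__":
    # The child prints two lines; only the final one is returned.
    cmd = [sys.executable, "-c", "print('first'); print('second')"]
    print(asyncio.run(run_and_get_last_line(*cmd)))
```

Returning only the final line suits commands that log progress and then print a single result, such as a path or a port number.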

static find_free_port(start=5000, end=10000, attempts=100)[source]
Return type:

int

static get_default_local_ip()[source]

Get the default local IP address of the machine.

Returns:

The default local IP address.

Return type:

str

Raises:

Exception – If unable to determine the local IP address.

async static get_distrib(env, scheduler=None, workers=None, verbose=0, **args)[source]

Check the distribution with a dry run.

Parameters:
  • package – Any Agi target app or project created by AGILAB.

  • list_ip – Any IPv4 address with SSH access and Python (it is up to you to link it to python3), with psutil and uv synced.

  • verbose (int) – Verbosity level (0-3).

Returns:

The distribution tree.

Return type:

Any

get_ssh_connection(timeout_sec=5)[source]
async static install(env, scheduler=None, workers=None, modes_enabled=15, verbose=None, **args)[source]

Update the cluster’s virtual environment.

Parameters:
  • project_path (Path) – The name of the module to install or the path to the module.

  • list_ip (List[str], optional) – A list of IPv4 addresses with SSH access. Each IP should have Python, psutil, and pdm installed. Defaults to None.

  • modes_enabled (int, optional) – Bitmask indicating enabled modes. Defaults to 15 (0b1111), as shown in the signature.

  • verbose (int, optional) – Verbosity level (0-3). Higher numbers increase the verbosity of the output. Defaults to 1.

  • **args (Any) – Additional keyword arguments.

Returns:

True if the installation was successful, False otherwise.

Return type:

bool

Raises:
  • ValueError – If module_name_or_path is invalid.

  • ConnectionError
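
The modes_enabled bitmask can be read against the mode constants listed for the AGI class. The sketch below copies those constant values locally so it runs without the package; enabled_flags is a hypothetical helper, and how AGI itself interprets each bit is not stated on this page.

```python
# Values copied from the AGI class attributes listed on this page.
PYTHON_MODE = 1
CYTHON_MODE = 2
DASK_MODE = 4
RAPIDS_MODE = 16

NAMED_FLAGS = {
    "PYTHON_MODE": PYTHON_MODE,
    "CYTHON_MODE": CYTHON_MODE,
    "DASK_MODE": DASK_MODE,
    "RAPIDS_MODE": RAPIDS_MODE,
}


def enabled_flags(mask: int) -> list[str]:
    """Return the names of the listed constants whose bit is set in mask.

    Hypothetical helper for illustration only.
    """
    return [name for name, bit in NAMED_FLAGS.items() if mask & bit]


if __name__ == "__main__":
    # Combine flags with bitwise OR.
    mask = PYTHON_MODE | CYTHON_MODE | DASK_MODE
    print(mask, enabled_flags(mask))
    # The signature default of 15 also sets bit 8, which has no named
    # constant on this page, and leaves RAPIDS_MODE (16) unset.
    print(enabled_flags(15))
```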

async static run(env, scheduler=None, workers=None, verbose=0, mode=None, rapids_enabled=False, **args)[source]

Compiles the target module in Cython and runs it on the cluster.

Parameters:
  • target (str) – The target Python module to run.

  • scheduler (str, optional) – IP and port address of the Dask scheduler. Defaults to '127.0.0.1:8786'.

  • workers (dict, optional) – Dictionary of worker IPs and their counts. Defaults to workers_default.

  • verbose (int, optional) – Verbosity level. Defaults to 0.

  • mode (int | list[int] | str | None, optional) – Mode(s) for execution. Defaults to None. When an int is provided, it is treated as a 4-bit mask controlling RAPIDS/Dask/Cython/Pool features. When a string is provided, it must match r"^[dcrp]+$" (letters enable features). When a list is provided, the modes are benchmarked sequentially.

  • rapids_enabled (bool, optional) – Flag to enable RAPIDS. Defaults to False.

  • **args (Any) – Additional keyword arguments.

Returns:

Result of the execution.

Return type:

Any

Raises:
  • ValueError – If mode is invalid.

  • RuntimeError – If the target module fails to load.
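
The accepted shapes of the mode argument can be sketched as a small classifier. This is an illustration under stated assumptions, not the check AGI.run performs: classify_mode and its return labels are hypothetical, and only the regular expression is taken from the parameter description. The mapping from letters to bits is not given on this page, so the sketch does not attempt it.

```python
import re

# Pattern quoted in the description of the mode parameter.
MODE_PATTERN = re.compile(r"^[dcrp]+$")


def classify_mode(mode):
    """Label a mode value by the form it takes (hypothetical helper)."""
    if mode is None:
        return "default"
    if isinstance(mode, bool):
        # bool is a subclass of int; reject it to avoid surprises.
        raise ValueError("mode must not be a bool")
    if isinstance(mode, int):
        return "bitmask"
    if isinstance(mode, str):
        if MODE_PATTERN.match(mode):
            return "letters"
        raise ValueError(f"mode string {mode!r} must match ^[dcrp]+$")
    if isinstance(mode, list) and all(
        isinstance(m, int) and not isinstance(m, bool) for m in mode
    ):
        # A list means each mode is benchmarked in turn.
        return "benchmark"
    raise ValueError(f"unsupported mode: {mode!r}")


if __name__ == "__main__":
    for value in (None, 5, "dc", [0, 1, 3]):
        print(repr(value), "->", classify_mode(value))
```

Raising ValueError for unrecognised input mirrors the exception documented for an invalid mode.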

async static send_file(env, ip, local_path, remote_path, user=None, password=None)[source]
async static send_files(env, ip, files, remote_dir, user=None)[source]
async static serve(env, scheduler=None, workers=None, verbose=0, mode=None, rapids_enabled=False, action='start', poll_interval=None, shutdown_on_stop=True, stop_timeout=30.0, **args)[source]

Manage persistent worker services without invoking the run workplan.

action="start" provisions workers and submits BaseWorker.loop as a long-lived service task pinned to each Dask worker. action="stop" signals the loop through BaseWorker.break and optionally tears down the Dask cluster when shutdown_on_stop is true.

Return type:

Dict[str, Any]
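
The start/stop life cycle described for serve can be pictured with a toy asyncio service. This sketch makes several assumptions: it uses no Dask cluster, service_loop and demo are hypothetical names, an asyncio.Event stands in for the stop signal, and the returned dictionary keys are invented for illustration.

```python
import asyncio


async def service_loop(stop_event, poll_interval, ticks):
    """Long-lived loop: do one unit of work, then wait for a stop signal."""
    while not stop_event.is_set():
        ticks.append(1)  # placeholder for real work
        try:
            # Sleep for poll_interval, waking early if stop is requested.
            await asyncio.wait_for(stop_event.wait(), timeout=poll_interval)
        except asyncio.TimeoutError:
            pass


async def demo(poll_interval=0.01, stop_timeout=1.0):
    stop_event = asyncio.Event()
    ticks = []
    # "start": submit the loop as a background task.
    task = asyncio.create_task(service_loop(stop_event, poll_interval, ticks))
    await asyncio.sleep(0.05)
    # "stop": signal the loop, then wait up to stop_timeout for it to exit.
    stop_event.set()
    await asyncio.wait_for(task, timeout=stop_timeout)
    return {"status": "stopped", "ticks": len(ticks)}


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Waiting on the event with a timeout lets the loop react to a stop request without sleeping through a full poll interval.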

async static update(env=None, scheduler=None, workers=None, modes_enabled=15, verbose=None, **args)[source]

Install the cluster virtual environment.

Parameters:
  • package – Any Agi target app or project created with AGILAB.

  • list_ip – Any IPv4 address with SSH access and Python (it is up to you to link it to python3), with psutil and uv synced.

  • mode_enabled – Typically a mode mask indicating, for example, whether Cython or RAPIDS is required.

  • force_update – Run Spud.update before the installation. Defaults to True.

  • verbose (Optional[int]) – Verbosity level (0-3).

Return type:

None

verbose: Optional[int] = None