agi-distributor API

agi_distributor packages applications, ships the required artefacts to the selected workers, and runs the chosen execution mode across the cluster.

Understanding modes_enabled and mode

Before reading the examples, note that AGILAB encodes execution toggles as a small bitmask: each toggle owns one bit, and a mode value is the sum of the bits that are switched on.

Parameter      Used by           Meaning
-------------  ----------------  ------------------------------------------------------------
modes_enabled  AGI.install(...)  Bitmask of the execution capabilities that should be prepared during install.
mode           AGI.run(...)      One concrete execution mode selected for the current run.

The current bit values are:

Toggle           Bit value  Meaning
---------------  ---------  ------------------------------------------------
pool             1          Multiprocessing / worker-pool path.
cython           2          Compiled worker path when available.
cluster_enabled  4          Distributed Dask scheduler / remote worker path.
rapids           8          RAPIDS / GPU path when supported.

Examples:

  • 13 = 4 + 8 + 1 = cluster + rapids + pool

  • 15 = 4 + 8 + 2 + 1 = cluster + rapids + cython + pool
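The same arithmetic can be reproduced in a few lines of Python. This is an illustrative sketch only: the names POOL, CYTHON, CLUSTER, RAPIDS, TOGGLES, encode_mode and decode_mode are hypothetical and not part of the AGI API. Only the bit values are taken from the table.

```python
# Bit values copied from the toggle table; the names are illustrative only.
POOL, CYTHON, CLUSTER, RAPIDS = 1, 2, 4, 8

TOGGLES = {"pool": POOL, "cython": CYTHON, "cluster": CLUSTER, "rapids": RAPIDS}


def encode_mode(*names: str) -> int:
    """OR together the bits of the named toggles."""
    mask = 0
    for name in names:
        mask |= TOGGLES[name]
    return mask


def decode_mode(mask: int) -> list[str]:
    """List the toggle names whose bit is set in mask."""
    return [name for name, bit in TOGGLES.items() if mask & bit]


print(encode_mode("cluster", "rapids", "pool"))  # 13
print(decode_mode(15))  # ['pool', 'cython', 'cluster', 'rapids']
```

Using bitwise OR rather than addition means that naming a toggle twice does not change the result.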

This is why the install examples below use modes_enabled=13 or modes_enabled=15, while the run examples use matching mode=13 or mode=15 values.

In normal usage, these values are generated from the UI toggles rather than typed manually.
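When a value is typed by hand, one possible sanity check is that the run mode only uses bits that were prepared at install time. The sketch below assumes that mode should be a subset of modes_enabled. The matching values in the examples suggest this, but the text does not state it outright. missing_capabilities is a hypothetical helper, not an AGI function.

```python
def missing_capabilities(mode: int, modes_enabled: int) -> int:
    """Return the bits set in mode but absent from modes_enabled (0 if none)."""
    return mode & ~modes_enabled


# mode=13 after an install with modes_enabled=15: nothing is missing.
print(missing_capabilities(13, 15))  # 0
# mode=15 after an install with modes_enabled=13: the cython bit (2) was not prepared.
print(missing_capabilities(15, 13))  # 2
```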

Usage Example

Installation

Install the mycode example:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "mycode_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.install(
        app_env,
        modes_enabled=13,  # cluster + pool + rapids
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())
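The scripts on this page read the AGILAB location from a marker file and assume it exists. A slightly more defensive lookup might look like the sketch below. read_agilab_path is a hypothetical helper, not something shipped with AGILAB, and the checks it performs are assumptions about what a useful error would be.

```python
from pathlib import Path


def read_agilab_path(marker: Path) -> Path:
    """Return the directory recorded in the marker file, or raise a clear error."""
    if not marker.is_file():
        raise FileNotFoundError(f"AGILAB path marker not found: {marker}")
    root = Path(marker.read_text().strip())
    if not root.is_dir():
        raise NotADirectoryError(f"{marker} points to a missing directory: {root}")
    return root


# Usage, with the marker location taken from the examples on this page:
# AGILAB_PATH = read_agilab_path(Path.home() / ".local/share/agilab/.agilab-path")
```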

Install the flight example:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "flight_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.install(
        app_env,
        modes_enabled=15,  # cluster + pool + cython + rapids
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())

Distribute

Create the distribution bundle that will be sent to the workers:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "mycode_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.get_distrib(
        app_env,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_in="mycode/dataset",
        data_out="mycode/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())

Equivalent example for flight:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "flight_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.get_distrib(
        app_env,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_source="file",
        data_in="flight/dataset",
        data_out="flight/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())

Run

Start with the simplest case, a plain local run:

res = await AGI.run(
    app_env,
    mode=0,  # plain local Python execution
)

Then move to the generated examples that add install-time capabilities, distributed hosts, and app-specific arguments:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "mycode_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.run(
        app_env,
        mode=13,  # cluster + pool + rapids
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_in="mycode/dataset",
        data_out="mycode/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())

Equivalent example for flight:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


AGILAB_PATH = Path((Path.home() / ".local/share/agilab/.agilab-path").read_text().strip())
APPS_PATH = AGILAB_PATH / "apps"
APP = "flight_project"


async def main():
    app_env = AgiEnv(apps_path=APPS_PATH, app=APP, verbose=1)
    res = await AGI.run(
        app_env,
        mode=15,  # cluster + pool + cython + rapids
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
        data_source="file",
        data_in="flight/dataset",
        data_out="flight/dataframe",
        files="*",
        nfile=1,
    )
    print(res)
    return res


if __name__ == "__main__":
    asyncio.run(main())

Reference

Packages diagram for agi_distributor
Classes diagram for agi_distributor

Cluster workplan utilities for distributing AGILab workloads.

class agi_cluster.agi_distributor.agi_distributor.AGI(target, verbose=1)

Bases: object

Coordinate installation, scheduling, and execution of AGILab workloads.

CYTHON_MODE = 2
DASK_MODE = 4
PYTHON_MODE = 1
RAPIDS_MODE = 16
__init__(target, verbose=1)

Initialize an AGI object with a target and verbosity level.

Parameters:
  • target (str) – The target for the env object.

  • verbose (int) – Verbosity level (0-3).

Returns:

None

Raises:

None

debug = None
async static distribute(env, scheduler=None, workers=None, verbose=0, **args)
Return type:

Any

env = None
async static exec_ssh(ip, cmd)
Return type:

str

async static exec_ssh_async(ip, cmd)
Return type:

str

static find_free_port(start=5000, end=10000, attempts=100)
Return type:

int

static get_default_local_ip()
Return type:

str

async static get_distrib(env, scheduler=None, workers=None, verbose=0, **args)
Return type:

Any

get_ssh_connection(timeout_sec=5)

async static install(env, scheduler=None, workers=None, workers_data_path=None, modes_enabled=15, verbose=None, **args)
Return type:

None

async static run(env, scheduler=None, workers=None, workers_data_path=None, verbose=0, mode=None, rapids_enabled=False, **args)

Compiles the target module in Cython and runs it on the cluster.

Parameters:
  • env (AgiEnv) – The application environment describing the app to run.

  • scheduler (Optional[str]) – IP and port address of the Dask scheduler. Defaults to ‘127.0.0.1:8786’.

  • workers (Optional[Dict[str, int]]) – Dictionary of worker IPs and their counts. Defaults to workers_default.

  • verbose (int) – Verbosity level. Defaults to 0.

  • mode (Union[int, List[int], str, None]) – Mode(s) for execution. Defaults to None. When an int is provided, it is treated as a 4-bit mask controlling RAPIDS/Dask/Cython/Pool features. When a string is provided, it must match r"^[dcrp]+$" (letters enable features). When a list is provided, the modes are benchmarked sequentially.

  • rapids_enabled (bool) – Flag to enable RAPIDS. Defaults to False.

  • **args (Any) – Additional keyword arguments.

Returns:

Result of the execution.

Return type:

Any

Raises:
  • ValueError – If mode is invalid.

  • RuntimeError – If the target module fails to load.
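To illustrate the string form of mode, the sketch below validates a letter string against the documented pattern and folds it into an integer mask. The letter-to-bit mapping (p = pool, c = Cython, d = Dask, r = RAPIDS, with the bit values from the toggle table at the top of this page) is an assumption inferred from the initials. mode_from_letters is a hypothetical helper and does not describe how AGI.run parses the string internally.

```python
import re

# Pattern quoted from the mode parameter description.
MODE_PATTERN = re.compile(r"^[dcrp]+$")

# Assumed mapping from letters to the bit values in the toggle table.
LETTER_BITS = {"p": 1, "c": 2, "d": 4, "r": 8}


def mode_from_letters(letters: str) -> int:
    """Validate a letter string and fold it into an integer bitmask."""
    # fullmatch also rejects a trailing newline, which "$" alone would accept.
    if not MODE_PATTERN.fullmatch(letters):
        raise ValueError(f"invalid mode string: {letters!r}")
    mask = 0
    for letter in letters:
        mask |= LETTER_BITS[letter]
    return mask


print(mode_from_letters("drp"))   # 13
print(mode_from_letters("dcrp"))  # 15
```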

async static send_file(env, ip, local_path, remote_path, user=None, password=None)

async static send_files(env, ip, files, remote_dir, user=None)

async static serve(env, scheduler=None, workers=None, verbose=0, mode=None, rapids_enabled=False, action='start', poll_interval=None, shutdown_on_stop=True, stop_timeout=30.0, service_queue_dir=None, heartbeat_timeout=None, cleanup_done_ttl_sec=None, cleanup_failed_ttl_sec=None, cleanup_heartbeat_ttl_sec=None, cleanup_done_max_files=None, cleanup_failed_max_files=None, cleanup_heartbeat_max_files=None, health_output_path=None, **args)
Return type:

Dict[str, Any]

async static submit(env=None, workers=None, work_plan=None, work_plan_metadata=None, task_id=None, task_name=None, **args)
Return type:

Dict[str, Any]

async static update(env=None, scheduler=None, workers=None, modes_enabled=15, verbose=None, **args)
Return type:

None

verbose = None