agi-distributor API

agi_distributor packages applications, ships the required artefacts to the selected workers, and runs the chosen execution mode across the cluster.

Understanding modes_enabled and mode

The first thing to know before reading the examples is that AGILAB encodes execution toggles as a small bitmask.

Parameter

Used by

Meaning

modes_enabled

AGI.install(...)

Bitmask of the execution capabilities that should be prepared during install.

mode

AGI.run(...)

One concrete execution mode selected for the current run.

The current bit values are:

Toggle

Bit value

Meaning

pool

1

Worker-pool path; backend may be process- or thread-based.

cython

2

Compiled worker path when available.

cluster_enabled

4

Distributed Dask scheduler / remote worker path.

rapids

8

RAPIDS / GPU path when supported.

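Prefer the public constants instead of typing numeric bitmasks (literal values can drift: the reference below lists AGI.RAPIDS_MODE = 16, while the table above shows 8 for rapids — confirm against the source before relying on either literal):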

  • AGI.PYTHON_MODE | AGI.DASK_MODE for the MyCode docs example.

  • AGI.PYTHON_MODE | AGI.CYTHON_MODE | AGI.DASK_MODE for the Flight docs example.

This is why the install examples below pass named constants through modes_enabled=... and the run examples pass the same intent through a RunRequest. Older snippets may contain raw integers, but public examples should not teach those values directly.

In normal usage, these values are generated from the UI toggles rather than typed manually.

Usage Example

Installation

Install the mycode example:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


APP = "mycode_project"
NO_CYTHON_RUN_MODES = AGI.PYTHON_MODE | AGI.DASK_MODE


def agilab_apps_path() -> Path:
    """Return the ``apps`` directory under the path recorded in the AGILAB marker file.

    The marker file is ``~/.local/share/agilab/.agilab-path``; its stripped
    content is used as the base directory.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve to the relative path "apps".
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps"


async def main():
    """Install APP with 127.0.0.1 as scheduler and worker host, then print and return the result."""
    local_host = "127.0.0.1"
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    outcome = await AGI.install(
        env,
        modes_enabled=NO_CYTHON_RUN_MODES,
        scheduler=local_host,
        workers={local_host: 1},
    )
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Install the flight example:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


APP = "flight_project"
LOCAL_RUN_MODES = AGI.PYTHON_MODE | AGI.CYTHON_MODE | AGI.DASK_MODE


def agilab_apps_path() -> Path:
    """Return the ``apps`` directory under the path recorded in the AGILAB marker file.

    The marker file is ``~/.local/share/agilab/.agilab-path``; its stripped
    content is used as the base directory.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve to the relative path "apps".
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps"


async def main():
    """Install APP with 127.0.0.1 as scheduler and worker host, then print and return the result."""
    local_host = "127.0.0.1"
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    outcome = await AGI.install(
        env,
        modes_enabled=LOCAL_RUN_MODES,
        scheduler=local_host,
        workers={local_host: 1},
    )
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Distribute

Create the distribution bundle that will be sent to the workers:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


APP = "mycode_project"


def agilab_apps_path() -> Path:
    """Return the ``apps`` directory under the path recorded in the AGILAB marker file.

    The marker file is ``~/.local/share/agilab/.agilab-path``; its stripped
    content is used as the base directory.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve to the relative path "apps".
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps"


async def main():
    """Request the distribution for APP on 127.0.0.1, then print and return it."""
    local_host = "127.0.0.1"
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    # data_in / data_out / files / nfile are not named parameters of
    # get_distrib; they are passed as extra keyword arguments.
    distribution = await AGI.get_distrib(
        env,
        scheduler=local_host,
        workers={local_host: 1},
        data_in="mycode/dataset",
        data_out="mycode/dataframe",
        files="*",
        nfile=1,
    )
    print(distribution)
    return distribution


if __name__ == "__main__":
    asyncio.run(main())

Equivalent example for flight:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


APP = "flight_project"


def agilab_apps_path() -> Path:
    """Return the ``apps`` directory under the path recorded in the AGILAB marker file.

    The marker file is ``~/.local/share/agilab/.agilab-path``; its stripped
    content is used as the base directory.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve to the relative path "apps".
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps"


async def main():
    """Request the distribution for APP on 127.0.0.1, then print and return it."""
    local_host = "127.0.0.1"
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    # data_source / data_in / data_out / files / nfile are not named parameters
    # of get_distrib; they are passed as extra keyword arguments.
    distribution = await AGI.get_distrib(
        env,
        scheduler=local_host,
        workers={local_host: 1},
        data_source="file",
        data_in="flight/dataset",
        data_out="flight/dataframe",
        files="*",
        nfile=1,
    )
    print(distribution)
    return distribution


if __name__ == "__main__":
    asyncio.run(main())

Run

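Start with the simplest mental model (the snippet assumes it runs inside an async function and that app_env was created as in the examples above):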

from agi_cluster.agi_distributor import AGI, RunRequest

request = RunRequest(mode=AGI.PYTHON_MODE)
res = await AGI.run(app_env, request=request)

Then move to the generated examples that add install-time capabilities, distributed hosts, and app-specific arguments:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI, RunRequest
from agi_env import AgiEnv


APP = "mycode_project"
NO_CYTHON_RUN_MODES = AGI.PYTHON_MODE | AGI.DASK_MODE


def agilab_apps_path() -> Path:
    """Return the ``apps`` directory under the path recorded in the AGILAB marker file.

    The marker file is ``~/.local/share/agilab/.agilab-path``; its stripped
    content is used as the base directory.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve to the relative path "apps".
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps"


async def main():
    """Run APP through a RunRequest targeting 127.0.0.1, then print and return the result."""
    local_host = "127.0.0.1"
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    run_request = RunRequest(
        mode=NO_CYTHON_RUN_MODES,
        scheduler=local_host,
        workers={local_host: 1},
    )
    outcome = await AGI.run(env, request=run_request)
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Equivalent example for flight:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI, RunRequest
from agi_env import AgiEnv


APP = "flight_project"
LOCAL_RUN_MODES = AGI.PYTHON_MODE | AGI.CYTHON_MODE | AGI.DASK_MODE


def agilab_apps_path() -> Path:
    """Return the ``apps`` directory under the path recorded in the AGILAB marker file.

    The marker file is ``~/.local/share/agilab/.agilab-path``; its stripped
    content is used as the base directory.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve to the relative path "apps".
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps"


async def main():
    """Run APP through a RunRequest targeting 127.0.0.1, then print and return the result."""
    local_host = "127.0.0.1"
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    # App-specific arguments go in ``params``, separate from the run settings.
    app_params = {
        "data_source": "file",
        "files": "*",
        "nfile": 1,
    }
    run_request = RunRequest(
        params=app_params,
        data_in="flight/dataset",
        data_out="flight/dataframe",
        mode=LOCAL_RUN_MODES,
        scheduler=local_host,
        workers={local_host: 1},
    )
    outcome = await AGI.run(env, request=run_request)
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Reference

Packages diagram for agi_distributor
Classes diagram for agi_distributor

Cluster workplan utilities for distributing AGILAB workloads.

class agi_cluster.agi_distributor.agi_distributor.AGI(target, verbose=1)[source]

Bases: object

Coordinate installation, scheduling, and execution of AGILAB workloads.

CYTHON_MODE = 2
DASK_MODE = 4
PYTHON_MODE = 1
RAPIDS_MODE = 16
__init__(target, verbose=1)[source]

Initialize an AGI object with a target and verbosity level.

Parameters:
  • target (str) – The target for the env object.

  • verbose (int) – Verbosity level (0-3).

Returns:

None

Raises:

None

debug = None
async static distribute(env, scheduler=None, workers=None, verbose=0, **args)[source]
Return type:

Any

env = None
async static exec_ssh(ip, cmd)[source]
Return type:

str

async static exec_ssh_async(ip, cmd)[source]
Return type:

str

static find_free_port(start=5000, end=10000, attempts=100)[source]
Return type:

int

static get_default_local_ip()[source]
Return type:

str

async static get_distrib(env, scheduler=None, workers=None, verbose=0, **args)[source]
Return type:

Any

get_ssh_connection(timeout_sec=5)[source]
async static install(env, scheduler=None, workers=None, workers_data_path=None, modes_enabled=15, verbose=None, **args)[source]
Return type:

None

async static run(env, request)[source]

Runs the target application on the cluster in the execution mode selected by the request, compiling the target module with Cython when the Cython mode is enabled.

Parameters:
  • env (AgiEnv) – AGILAB environment to execute.

  • request (RunRequest) – Typed execution request. App params and workflow stages are kept separate.

Returns:

Result of the execution.

Return type:

Any

Raises:
  • ValueError – If mode is invalid.

  • RuntimeError – If the target module fails to load.

async static send_file(env, ip, local_path, remote_path, user=None, password=None)[source]
async static send_files(env, ip, files, remote_dir, user=None)[source]
async static serve(env, scheduler=None, workers=None, verbose=0, mode=None, rapids_enabled=False, action='start', poll_interval=None, shutdown_on_stop=True, stop_timeout=30.0, service_queue_dir=None, heartbeat_timeout=None, cleanup_done_ttl_sec=None, cleanup_failed_ttl_sec=None, cleanup_heartbeat_ttl_sec=None, cleanup_done_max_files=None, cleanup_failed_max_files=None, cleanup_heartbeat_max_files=None, health_output_path=None, **args)[source]
Return type:

Dict[str, Any]

async static submit(env=None, workers=None, work_plan=None, work_plan_metadata=None, task_id=None, task_name=None, **args)[source]
Return type:

Dict[str, Any]

async static update(env=None, scheduler=None, workers=None, modes_enabled=15, verbose=None, **args)[source]
Return type:

None

verbose = None