agi-distributor API

agi_distributor packages applications, ships the required artefacts to the selected workers, and runs the chosen execution mode across the cluster.

Understanding modes_enabled and mode

The first thing to know before reading the examples is that AGILAB encodes execution toggles as a small bitmask.

Parameter

Used by

Meaning

modes_enabled

AGI.install(...)

Bitmask of the execution capabilities that should be prepared during install.

mode

AGI.run(...)

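The execution mode selected for the current run, expressed with the same toggle bits (the run examples below combine constants such as AGI.PYTHON_MODE | AGI.DASK_MODE).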

The current bit values are:

Toggle

Bit value

Meaning

pool

1

Worker-pool path; backend may be process- or thread-based.

cython

2

Compiled worker path when available.

cluster_enabled

4

Distributed Dask scheduler / remote worker path.

rapids

8

RAPIDS / GPU path when supported.

Prefer the public constants instead of typing numeric bitmasks:

  • AGI.PYTHON_MODE | AGI.DASK_MODE for the Minimal App docs example.

  • AGI.PYTHON_MODE | AGI.DASK_MODE for the Flight docs example.

This is why the install examples below pass named constants through modes_enabled=... and the run examples pass the same intent through a RunRequest. Older snippets may contain raw integers, but public examples should not teach those values directly.

In normal usage, these values are generated from the UI toggles rather than typed manually.

The literal-included Minimal App and Flight snippets below are kept aligned with the packaged examples under src/agilab/examples: they resolve built-in apps from apps/builtin and keep Cython off for the public first-run examples. Cython can still be enabled intentionally later, after the local Python/Dask path is known-good.

Usage Example

Installation

Install the minimal_app example:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


# Project name of the app this example installs; passed to AgiEnv as ``app``.
APP = "minimal_app_project"
# Mode bitmask passed as ``modes_enabled``: PYTHON_MODE combined with DASK_MODE.
# As the name says, Cython is kept off for this first-run example.
NO_CYTHON_RUN_MODES = AGI.PYTHON_MODE | AGI.DASK_MODE


def agilab_apps_path() -> Path:
    """Return the directory that holds the AGILAB built-in apps.

    The AGILAB root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``; the built-in apps are expected
    under ``apps/builtin`` inside that root.

    Returns:
        ``<root>/apps/builtin`` where ``<root>`` is the stripped content of
        the marker file.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve silently to the relative
        # path "apps/builtin" under the current working directory.
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps" / "builtin"


async def main():
    """Install the ``APP`` project with 127.0.0.1 as scheduler and worker host.

    Builds an ``AgiEnv`` rooted at the built-in apps directory, awaits
    ``AGI.install`` with ``NO_CYTHON_RUN_MODES`` as ``modes_enabled``,
    prints the result and returns it.
    """
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    install_options = {
        "modes_enabled": NO_CYTHON_RUN_MODES,
        "scheduler": "127.0.0.1",
        "workers": {"127.0.0.1": 1},
    }
    outcome = await AGI.install(env, **install_options)
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Install the flight example:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


# Project name of the app this example installs; passed to AgiEnv as ``app``.
APP = "flight_telemetry_project"
# Mode bitmask passed as ``modes_enabled``: PYTHON_MODE combined with DASK_MODE.
LOCAL_RUN_MODES = AGI.PYTHON_MODE | AGI.DASK_MODE


def agilab_apps_path() -> Path:
    """Return the directory that holds the AGILAB built-in apps.

    The AGILAB root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``; the built-in apps are expected
    under ``apps/builtin`` inside that root.

    Returns:
        ``<root>/apps/builtin`` where ``<root>`` is the stripped content of
        the marker file.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve silently to the relative
        # path "apps/builtin" under the current working directory.
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps" / "builtin"


async def main():
    """Install the ``APP`` project with 127.0.0.1 as scheduler and worker host.

    Builds an ``AgiEnv`` rooted at the built-in apps directory, awaits
    ``AGI.install`` with ``LOCAL_RUN_MODES`` as ``modes_enabled``, prints
    the result and returns it.
    """
    loopback = "127.0.0.1"
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    outcome = await AGI.install(
        env,
        modes_enabled=LOCAL_RUN_MODES,
        scheduler=loopback,
        workers={loopback: 1},
    )
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Distribute

Create the distribution bundle that will be sent to the workers:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


# Project name of the app whose distribution is computed; passed to AgiEnv as ``app``.
APP = "minimal_app_project"


def agilab_apps_path() -> Path:
    """Return the directory that holds the AGILAB built-in apps.

    The AGILAB root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``; the built-in apps are expected
    under ``apps/builtin`` inside that root.

    Returns:
        ``<root>/apps/builtin`` where ``<root>`` is the stripped content of
        the marker file.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve silently to the relative
        # path "apps/builtin" under the current working directory.
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps" / "builtin"


async def main():
    """Request the distribution for ``APP`` and print it.

    Builds an ``AgiEnv`` rooted at the built-in apps directory and awaits
    ``AGI.get_distrib`` with 127.0.0.1 as scheduler and worker host plus
    the dataset arguments of the minimal app. The result is printed and
    returned.
    """
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    distrib_options = {
        "scheduler": "127.0.0.1",
        "workers": {"127.0.0.1": 1},
        "data_in": "minimal_app/dataset",
        "data_out": "minimal_app/dataframe",
        "files": "*",
        "nfile": 1,
    }
    outcome = await AGI.get_distrib(env, **distrib_options)
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Equivalent example for flight:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI
from agi_env import AgiEnv


# Project name of the app whose distribution is computed; passed to AgiEnv as ``app``.
APP = "flight_telemetry_project"


def agilab_apps_path() -> Path:
    """Return the directory that holds the AGILAB built-in apps.

    The AGILAB root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``; the built-in apps are expected
    under ``apps/builtin`` inside that root.

    Returns:
        ``<root>/apps/builtin`` where ``<root>`` is the stripped content of
        the marker file.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve silently to the relative
        # path "apps/builtin" under the current working directory.
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps" / "builtin"


async def main():
    """Request the distribution for ``APP`` and print it.

    Builds an ``AgiEnv`` rooted at the built-in apps directory and awaits
    ``AGI.get_distrib`` with 127.0.0.1 as scheduler and worker host plus
    the file-based dataset arguments of the flight app. The result is
    printed and returned.
    """
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    distrib_options = {
        "scheduler": "127.0.0.1",
        "workers": {"127.0.0.1": 1},
        "data_source": "file",
        "data_in": "flight_telemetry/dataset",
        "data_out": "flight_telemetry/dataframe",
        "files": "*",
        "nfile": 1,
    }
    outcome = await AGI.get_distrib(env, **distrib_options)
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Run

Start with the simplest mental model:

from agi_cluster.agi_distributor import AGI, RunRequest

# Sketch only, not a standalone script: ``await`` must run inside a coroutine,
# and ``app_env`` is an AgiEnv built as in the complete examples of this section.
request = RunRequest(mode=AGI.PYTHON_MODE)
res = await AGI.run(app_env, request=request)

Then move to the generated examples that add install-time capabilities, distributed hosts, and app-specific arguments:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI, RunRequest
from agi_env import AgiEnv


# Project name of the app this example runs; passed to AgiEnv as ``app``.
APP = "minimal_app_project"
# Mode bitmask passed as ``RunRequest.mode``: PYTHON_MODE combined with DASK_MODE.
# As the name says, Cython is kept off for this first-run example.
NO_CYTHON_RUN_MODES = AGI.PYTHON_MODE | AGI.DASK_MODE


def agilab_apps_path() -> Path:
    """Return the directory that holds the AGILAB built-in apps.

    The AGILAB root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``; the built-in apps are expected
    under ``apps/builtin`` inside that root.

    Returns:
        ``<root>/apps/builtin`` where ``<root>`` is the stripped content of
        the marker file.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve silently to the relative
        # path "apps/builtin" under the current working directory.
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps" / "builtin"


async def main():
    """Run the ``APP`` project with 127.0.0.1 as scheduler and worker host.

    Builds an ``AgiEnv`` rooted at the built-in apps directory, wraps the
    mode and host settings in a ``RunRequest``, awaits ``AGI.run`` with it,
    then prints and returns the result.
    """
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    run_options = {
        "mode": NO_CYTHON_RUN_MODES,
        "scheduler": "127.0.0.1",
        "workers": {"127.0.0.1": 1},
    }
    outcome = await AGI.run(env, request=RunRequest(**run_options))
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Equivalent example for flight:

import asyncio
from pathlib import Path

from agi_cluster.agi_distributor import AGI, RunRequest
from agi_env import AgiEnv


# Project name of the app this example runs; passed to AgiEnv as ``app``.
APP = "flight_telemetry_project"
# Mode bitmask passed as ``RunRequest.mode``: PYTHON_MODE combined with DASK_MODE.
LOCAL_RUN_MODES = AGI.PYTHON_MODE | AGI.DASK_MODE


def agilab_apps_path() -> Path:
    """Return the directory that holds the AGILAB built-in apps.

    The AGILAB root is read from the marker file
    ``~/.local/share/agilab/.agilab-path``; the built-in apps are expected
    under ``apps/builtin`` inside that root.

    Returns:
        ``<root>/apps/builtin`` where ``<root>`` is the stripped content of
        the marker file.

    Raises:
        SystemExit: If the marker file is missing or contains no path.
    """
    marker = Path.home() / ".local/share/agilab/.agilab-path"
    if not marker.is_file():
        raise SystemExit(
            "AGILAB is not initialized. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    root = marker.read_text(encoding="utf-8").strip()
    if not root:
        # An empty marker would otherwise resolve silently to the relative
        # path "apps/builtin" under the current working directory.
        raise SystemExit(
            f"AGILAB marker file {marker} is empty. Run the AGILAB installer or "
            "`agilab first-proof --json` before this example."
        )
    return Path(root) / "apps" / "builtin"


async def main():
    """Run the ``APP`` project with 127.0.0.1 as scheduler and worker host.

    Builds an ``AgiEnv`` rooted at the built-in apps directory, assembles a
    ``RunRequest`` from the app-specific ``params``, the dataset locations
    and the mode/host settings, awaits ``AGI.run`` with it, then prints and
    returns the result.
    """
    env = AgiEnv(apps_path=agilab_apps_path(), app=APP, verbose=1)
    # App-specific arguments, forwarded unchanged through ``params``.
    app_params = {
        "data_source": "file",
        "files": "*",
        "nfile": 1,
        "nskip": 0,
        "nread": 0,
        "sampling_rate": 1.0,
        "datemin": "2020-01-01",
        "datemax": "2021-01-01",
        "output_format": "parquet",
    }
    run_request = RunRequest(
        params=app_params,
        data_in="flight_telemetry/dataset",
        data_out="flight_telemetry/dataframe",
        mode=LOCAL_RUN_MODES,
        scheduler="127.0.0.1",
        workers={"127.0.0.1": 1},
    )
    outcome = await AGI.run(env, request=run_request)
    print(outcome)
    return outcome


if __name__ == "__main__":
    asyncio.run(main())

Reference

Packages diagram for agi_distributor
Classes diagram for agi_distributor

Cluster workplan utilities for distributing AGILAB workloads.