agi_core.managers API

Usage Example

Installation


import asyncio
from agi_core.managers.agi_runner import AGI
from agi_env import AgiEnv

async def main():
    env = AgiEnv(install_type=1)
    res = await AGI.install('my_code', env, modes_enabled=0,
                            verbose=True, 
                            scheduler=None, workers=None)
    print(res)
    return res

if __name__ == '__main__':
    asyncio.run(main())
            

Update


Distribute


import asyncio
from agi_core.managers.agi_runner import AGI
from agi_env import AgiEnv

async def main():
    env = AgiEnv(install_type=1)
    res = await AGI.distribute('flight', env, verbose=True, 
                                scheduler=None, workers=None, data_source="file", path="/home/jpm/data/flight", files="*", nfile=1, nskip=0, nread=0, sampling_rate=1.0, datemin="2020-01-01", datemax="2021-01-01", output_format="parquet")
    print(res)
    return res

if __name__ == '__main__':
    asyncio.run(main())
            


import asyncio
from agi_core.managers.agi_runner import AGI
from agi_env import AgiEnv

async def main():
    env = AgiEnv(install_type=1)
    res = await AGI.distribute('my_code', env, verbose=True, 
                                scheduler=None, workers=None, data_source="file", path="/home/jpm/data/flight", files="*", nfile=1, nskip=0, nread=0, sampling_rate=1.0, datemin="2020-01-01", datemax="2021-01-01", output_format="parquet")
    print(res)
    return res

if __name__ == '__main__':
    asyncio.run(main())
            

Run


import asyncio
from agi_core.managers.agi_runner import AGI
from agi_env import AgiEnv

async def main():
    env = AgiEnv(install_type=1)
    res = await AGI.run('my_code', env, mode=0, 
                        scheduler=None, workers=None, 
                        verbose=True, data_source="file", path="/home/jpm/data/flight", files="*", nfile=1, nskip=0, nread=0, sampling_rate=1.0, datemin="2020-01-01", datemax="2021-01-01", output_format="parquet")
    print(res)
    return res

if __name__ == '__main__':
    asyncio.run(main())
            


import asyncio
from agi_core.managers.agi_runner import AGI
from agi_env import AgiEnv

async def main():
    env = AgiEnv(install_type=1)
    res = await AGI.run('flight', env, mode=0, 
                        scheduler=None, workers=None, 
                        verbose=True, data_source="file", path="/home/jpm/data/flight", files="*", nfile=1, nskip=0, nread=0, sampling_rate=1.0, datemin="2020-01-01", datemax="2021-01-01", output_format="parquet")
    print(res)
    return res

if __name__ == '__main__':
    asyncio.run(main())
            

Reference

Packages diagram for agi_runner
Classes diagram for agi_core.managers.agi_runner
class agi_core.managers.agi_runner.agi_runner.AGI(target, verbose=1)[source]

Bases: object

Agi Class.

Agi (Speedy-Python-Dask) is a scalable fwk based on Cython, Dask, and a pool of processes that supports High-Performance Computing (HPC) with or without output data. It offers a command-line interface in Python and an optional LAB with Streamlit, featuring advanced capabilities like embedded ChatGPT and visualizations.

Agi stands for Speedy-Python-Dask.

To run on a cluster:
  1. Create a Agi account on each host with SSH access.

  2. Copy your project’s pyproject.toml to each host.

  3. Run uv sync before using AGI.

  4. To run with output data, provide a shared directory accessible from all hosts. Use this directory in your Python target code as both input and output.

Remarks:
  • Interactive Matplotlib graphics are not supported by Jupyter Lab. Use Jupyter Notebook instead.

  • While debugging in a Jupyter cell, it’s better to comment out #%%time to allow line numbers to display correctly.

CYTHON_MODE = 2
DASK_MODE = 4
DASK_RESET = 59
DEPLOYEMENT_MASK = 48
INSTALL_MASK = 48
INSTALL_MODE = 16
PYTHON_MODE = 1
RAPIDS_MODE = 16
RAPIDS_RESET = 55
RAPIDS_SET = 63
RUN_MASK = 15
SIMULATE_MODE = 48
TIMEOUT = 10
UPDATE_MODE = 32
__init__(target, verbose=1)[source]

Initialize a Agi object with a target and verbosity level.

Parameters:
  • target (str) – The target for the env object.

  • verbose (int) – Verbosity level (0-3).

Returns:

None

Raises:

None

best_mode: Dict[str, Any] = {}
debug: Optional[bool] = None
async static distribute(app, env, scheduler=None, workers=None, verbose=0, **args)[source]

check the distribution with a dry run :param package: :type package: any Agi target apps or project created by AGILAB :param list_ip: :type list_ip: any ip V4 with ssh access and python (upto you to link it to python3) with psutil and uv synced :param verbose: :type verbose: verbosity [0-3] :param Returns: :param the distribution tree: :param ——-:

env: Optional[AgiEnv] = None
static find_free_port(start=5000, end=10000, attempts=100)[source]
static get_default_local_ip()[source]

Get the default local IP address of the machine.

Returns:

The default local IP address.

Return type:

str

Raises:

Exception – If unable to determine the local IP address.

async static install(module_name, env, scheduler=None, workers=None, modes_enabled=15, verbose=1, **args)[source]

Update the cluster’s virtual environment.

Parameters:
  • module_name_or_path (str) – The name of the module to install or the path to the module.

  • list_ip (List[str], optional) – A list of IPv4 addresses with SSH access. Each IP should have Python, psutil, and pdm installed. Defaults to None.

  • modes_enabled (int, optional) – Bitmask indicating enabled modes. Defaults to 0b0111.

  • verbose (int, optional) – Verbosity level (0-3). Higher numbers increase the verbosity of the output. Defaults to 1.

  • **args – Additional keyword arguments.

Returns:

True if the installation was successful, False otherwise.

Return type:

bool

Raises:
  • ValueError – If module_name_or_path is invalid.

  • ConnectionError

async static main(scheduler)[source]
static make_chunks(nchunk2, weights, capacities=None, verbose=0, threshold=12)[source]

Partitions the nchunk2 weighted into n chuncks, in a smart way chunks and chunks_sizes must be left to None

Parameters:
  • nchunk2 – list of number of chunks level 2

  • weights – the list of weight level2

  • capacities – the lnewist of workers capacity (Default value = None)

  • verbose – whether to display run detail or not (Default value = 0)

  • threshold – the number of nchunk2 max to run the optimal algo otherwise downgrade to suboptimal one (Default value = 12)

  • weights – list:

Returns:

list of chunk per my_code_worker containing list of works per my_code_worker containing list of chunks level 1

async static run(target, env, scheduler=None, workers=None, verbose=0, mode=None, rapids_enabled=False, **args)[source]

Compiles the target module in Cython and runs it on the cluster.

Return type:

Any

Parameters:
  • target (str) – The target Python module to run.

  • scheduler (str, optional) – IP and port address of the Dask scheduler. Defaults to ‘127.0.0.1:8786’.

  • workers (dict, optional) – Dictionary of worker IPs and their counts. Defaults to workers_default.

  • verbose (int, optional) – Verbosity level. Defaults to 0.

  • mode (int or list, optional) –

    Mode(s) for execution. Defaults to None. - Bitmask 0b—- (4 bits) where each bit enables/disables specific features:

    • 1—: Rapids

    • -1–: Dask

    • –1-: Cython

    • —1: Pool

    • mode can also be a list of modes to chain for the run.

  • rapids_enabled (bool, optional) – Flag to enable RAPIDS. Defaults to False.

  • **args (Any) – Additional keyword arguments.

Returns:

Result of the execution.

Return type:

Any

Raises:
  • ValueError – If mode is invalid.

  • RuntimeError – If the target module fails to load.

async static update(module_name, module_path, scheduler=None, workers=None, modes_enabled=15, verbose=1, **args)[source]

install cluster virtual environment :param package: :type package: any Agi target apps or project created with AGILAB :param list_ip: :type list_ip: any ip V4 with ssh access and python (upto you to link it to python3) with psutil and uv synced :param mode_enabled: :type mode_enabled: this is typically a mode mask to know for example if cython or rapids are required :param force_update: :type force_update: make a Spud.update before the installation, default is True :param verbose: :type verbose: verbosity [0-3]

workers: Optional[Dict[str, int]] = None
workers_tree: Optional[Any] = None
workers_tree_info: Optional[Any] = None

Packages diagram for agi_core.managers.agi_manager
Classes diagram for agi_core.managers.agi_manager
class agi_core.managers.agi_manager.agi_manager.AgiManager(args=None)[source]

Bases: object

Class AgiManager for orchestration of jobs by the target.

__init__(args=None)[source]

Initialize the AgiManager with input arguments.

Parameters:

args – The input arguments for initializing the AgiManager.

Returns:

None

args = {}
static convert_functions_to_names(workers_tree)[source]

Converts functions in a nested structure to their names.

static do_distrib(inst, agi_env, workers)[source]

Build the distribution tree.

Parameters:

inst – The instance for building the distribution tree.

Returns:

None

static onerror(func, path, exc_info)[source]

Error handler for shutil.rmtree.

If the error is due to an access error (read-only file), it attempts to add write permission and then retries.

If the error is for another reason, it re-raises the error.

Usage: shutil.rmtree(path, onerror=onerror)

Parameters:
  • func (function) – The function that raised the error.

  • path (str) – The path name passed to the function.

  • exc_info (tuple) – The exception information returned by sys.exc_info().

Returns:

None

verbose = None