agi_core.workers API

Workers Reference

agi_core.workers.agi-worker

Packages diagram for agi_core.workers.agi_worker
Classes diagram for agi_core.workers.agi_worker

agi_worker module

Author: Jean-Pierre Morard

class agi_core.workers.agi_worker.agi_worker.AgiWorker[source]

Bases: ABC

class AgiWorker v1.0

static build(app, target_worker, dask_home, worker, mode=0, verbose=0)[source]

Build the target code on a my_code_AgiWorker.

Parameters:
  • app (str) – app to build

  • target_worker (str) – module to build

  • dask_home (str) – path to dask home

  • worker – current worker

  • mode – (Default value = 0)

  • verbose – (Default value = 0)

Returns:

cython_decorators = ['njit']
dask_home = None
static do_works(workers_tree, workers_tree_info)[source]

Run the workers.

Parameters:
  • workers_tree – distribution tree

  • workers_tree_info – additional information about the workers

Returns:

static exec(cmd, path, worker)[source]

Execute a command in a subprocess.

Parameters:
  • cmd – the command string

  • path – the directory in which to launch the command

  • worker

Returns:
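The following is a minimal sketch of launching a command in a subprocess from a given directory, using only the standard-library subprocess module. The helper name run_command is hypothetical, the worker argument is left out, and the actual body of AgiWorker.exec may differ.

```python
import shlex
import subprocess
import sys


def run_command(cmd, path):
    """Run `cmd` in directory `path` and return its standard output.

    Hypothetical helper for illustration; not the AgiWorker.exec source.
    """
    # Accept either a command string or an already-split argument list.
    args = shlex.split(cmd) if isinstance(cmd, str) else list(cmd)
    result = subprocess.run(
        args,
        cwd=path,             # directory in which the command is launched
        capture_output=True,  # collect stdout and stderr
        text=True,            # decode bytes to str
        check=True,           # raise CalledProcessError on a non-zero exit
    )
    return result.stdout


# Usage: run the current Python interpreter in the current directory.
output = run_command([sys.executable, "-c", "print('hello')"], ".")
```

Passing an argument list avoids shell quoting issues; a plain string is split with shlex.split rather than handed to a shell.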

static expand(path, base_directory=None)[source]

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
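The expansion described above can be sketched with the standard-library os.path functions. The name expand_path is hypothetical, and the assumption that a relative path is resolved against base_directory (or the current directory when it is None) is an illustration, not a statement about the actual AgiWorker.expand code.

```python
import os


def expand_path(path, base_directory=None):
    """Return an absolute version of `path`, expanding a leading '~'.

    Hypothetical sketch; the real AgiWorker.expand may behave differently.
    """
    expanded = os.path.expanduser(path)  # '~' -> user's home directory
    if not os.path.isabs(expanded):
        # Assumption: relative paths are resolved against base_directory.
        base = os.path.expanduser(base_directory) if base_directory else os.getcwd()
        expanded = os.path.join(base, expanded)
    # abspath also normalizes separators and '..' components.
    return os.path.abspath(expanded)
```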

static expand_and_join(path1, path2)[source]

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str
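As a rough illustration of "expand, then join", the sketch below expands only the first path and appends the second unchanged. The function name expand_and_join_sketch is hypothetical and the real method may normalize its inputs differently.

```python
import os


def expand_and_join_sketch(path1, path2):
    """Expand `path1` to an absolute path, then join `path2` onto it.

    Hypothetical sketch, not the AgiWorker.expand_and_join source.
    """
    expanded = os.path.abspath(os.path.expanduser(path1))
    return os.path.join(expanded, path2)
```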

static get_worker_info(worker_id)[source]

Return information about the worker identified by worker_id.

Parameters:

worker_id

Returns:

home_dir = None
is_managed_pc = False
static join(path1, path2)[source]

Join two file paths.

Parameters:
  • path1 (str) – The first file path.

  • path2 (str) – The second file path.

Returns:

The combined file path.

Return type:

str

Raises:

None

logs = None
mode = None
static new(target_module, target_class, target_package, mode=None, verbose=0, worker_id=0, worker='localhost', env=None, args=None)[source]

Create a new worker instance: instantiate and load the target my_code_worker module.

Parameters:
  • target_module

  • target_class

  • target_package

  • mode – (Default value = None)

  • verbose – (Default value = 0)

  • worker_id – (Default value = 0)

  • worker – (Default value = 'localhost')

  • env – (Default value = None)

  • args – (Default value = None)

Returns:

static normalize_path(path)[source]
onerror(path, exc_info)[source]

Error handler for shutil.rmtree.

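If the error is caused by an access problem (for example, a read-only file), the handler adds write permission and retries the operation.

For any other error, it re-raises the exception.

Usage: shutil.rmtree(path, onerror=onerror)

Parameters:
  • func – the function that raised the exception

  • path – the path that was passed to func

  • exc_info – the exception information, as returned by sys.exc_info()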

Returns:
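A handler of this kind is commonly written as follows. This is a sketch of the well-known pattern, not the AgiWorker.onerror source, and the name on_rm_error is hypothetical. Note that since Python 3.12 shutil.rmtree deprecates the onerror argument in favor of onexc.

```python
import os
import shutil
import stat
import tempfile


def on_rm_error(func, path, exc_info):
    """shutil.rmtree error handler: make a read-only path writable and retry."""
    if not os.access(path, os.W_OK):
        os.chmod(path, stat.S_IWUSR)  # add write permission for the owner
        func(path)                    # retry the operation that failed
    else:
        raise exc_info[1]             # unrelated error: propagate it


# Usage: remove a directory that contains a read-only file.
demo_dir = tempfile.mkdtemp()
demo_file = os.path.join(demo_dir, "locked.txt")
with open(demo_file, "w") as fh:
    fh.write("data")
os.chmod(demo_file, stat.S_IREAD)
shutil.rmtree(demo_dir, onerror=on_rm_error)
```

On POSIX systems deleting a read-only file usually succeeds without invoking the handler, because the permission that matters is the one on the containing directory; the handler mostly matters on Windows.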

static run(app, workers={'127.0.0.1': 1}, mode=0, verbose=3, args=None)[source]
share_path = None
start()[source]

Start the worker and print out a message if verbose mode is enabled.

Parameters:

None

Returns:

None

stop()[source]

Returns:

t0 = None
verbose = 1
worker = None
worker_id = None

agi_core.workers.agent-worker

Packages diagram for agi_core.workers.agent_worker
Classes diagram for agi_core.workers.agent_worker

agi_core.workers.dag-worker

Packages diagram for agi_core.workers.dag_worker
Classes diagram for agi_core.workers.dag_worker
class agi_core.workers.dag_worker.dag_worker.AgiDagWorker[source]

Bases: AgiWorker

AgiDagWorker Class

Inherits from:

AgiWorker: Provides foundational worker functionalities.

static build(app, target_worker, dask_home, worker, mode=0, verbose=0)

Build the target code on a my_code_AgiWorker.

Parameters:
  • app (str) – app to build

  • target_worker (str) – module to build

  • dask_home (str) – path to dask home

  • worker – current worker

  • mode – (Default value = 0)

  • verbose – (Default value = 0)

Returns:

cython_decorators = ['njit']
dask_home = None
static do_works(workers_tree, workers_tree_info)

Run the workers.

Parameters:
  • workers_tree – distribution tree

  • workers_tree_info – additional information about the workers

Returns:

static exec(cmd, path, worker)

Execute a command in a subprocess.

Parameters:
  • cmd – the command string

  • path – the directory in which to launch the command

  • worker

Returns:

exec_mono_process(workers_tree, workers_tree_info)[source]

Execute tasks in a single process, respecting dependencies.

exec_multi_process(workers_tree, workers_tree_info)[source]

Execute tasks in multiple threads, respecting dependencies.

static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

static get_worker_info(worker_id)

Return information about the worker identified by worker_id.

Parameters:

worker_id

Returns:

home_dir = None
is_managed_pc = False
static join(path1, path2)

Join two file paths.

Parameters:
  • path1 (str) – The first file path.

  • path2 (str) – The second file path.

Returns:

The combined file path.

Return type:

str

Raises:

None

logs = None
mode = None
static new(target_module, target_class, target_package, mode=None, verbose=0, worker_id=0, worker='localhost', env=None, args=None)

Create a new worker instance: instantiate and load the target my_code_worker module.

Parameters:
  • target_module

  • target_class

  • target_package

  • mode – (Default value = None)

  • verbose – (Default value = 0)

  • worker_id – (Default value = 0)

  • worker – (Default value = 'localhost')

  • env – (Default value = None)

  • args – (Default value = None)

Returns:

static normalize_path(path)
onerror(path, exc_info)

Error handler for shutil.rmtree.

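If the error is caused by an access problem (for example, a read-only file), the handler adds write permission and retries the operation.

For any other error, it re-raises the exception.

Usage: shutil.rmtree(path, onerror=onerror)

Parameters:
  • func – the function that raised the exception

  • path – the path that was passed to func

  • exc_info – the exception information, as returned by sys.exc_info()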

Returns:

static run(app, workers={'127.0.0.1': 1}, mode=0, verbose=3, args=None)
share_path = None
start()

Start the worker and print out a message if verbose mode is enabled.

Parameters:

None

Returns:

None

stop()

Returns:

t0 = None
topological_sort(dependency_graph)[source]

Perform a topological sort on the dependency graph.

Parameters:

dependency_graph (dict) – A dictionary where keys are functions and values are lists of dependent functions.

Returns:

A list of functions in topologically sorted order.

Return type:

list

Raises:

ValueError – If a cycle is detected in the dependencies.
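One common way to implement such a sort is Kahn's algorithm, sketched below. This is an illustration, not the AgiDagWorker source: the name topological_sort_sketch is hypothetical, and the sketch assumes each value lists the prerequisites that must run before its key.

```python
from collections import deque


def topological_sort_sketch(dependency_graph):
    """Return the nodes ordered so that prerequisites come first.

    Assumption: dependency_graph maps each node to the nodes it depends on.
    Raises ValueError if the dependencies contain a cycle.
    """
    # Collect every node, including those that appear only as a prerequisite.
    nodes = set(dependency_graph)
    for deps in dependency_graph.values():
        nodes.update(deps)

    # Count unmet prerequisites and record who is waiting on each node.
    remaining = {n: len(set(dependency_graph.get(n, ()))) for n in nodes}
    dependents = {n: [] for n in nodes}
    for node, deps in dependency_graph.items():
        for dep in set(deps):
            dependents[dep].append(node)

    # Start with the nodes that have no prerequisites.
    ready = deque(sorted((n for n in nodes if remaining[n] == 0), key=str))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in dependents[node]:
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    # Any node left unprocessed is part of a cycle.
    if len(order) != len(nodes):
        raise ValueError("cycle detected in dependencies")
    return order
```

In the sketch the keys are strings for readability; function objects work the same way because they are hashable.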

verbose = 1
worker = None
worker_id = None
works(workers_tree, workers_tree_info)[source]

Run the worker tasks.


agi_core.workers.data-worker

Packages diagram for agi_core.workers.data_worker
Classes diagram for agi_core.workers.data_worker

data_worker Framework Callback Functions Module

This module provides the AgiDataWorker class, which extends the foundational functionalities of AgiWorker for processing data using multiprocessing or single-threaded approaches.

Classes:

AgiDataWorker: Worker class for data processing tasks.

Internal Libraries:

os, warnings

External Libraries:

concurrent.futures.ProcessPoolExecutor, pathlib.Path, time, polars (as pl), AgiWorker (from agi_worker.agi_worker)

class agi_core.workers.data_worker.data_worker.AgiDataWorker[source]

Bases: AgiWorker

AgiDataWorker Class

Inherits from AgiWorker to provide extended data processing functionalities.

verbose

Verbosity level for logging.

Type:

int

data_out

Path to the output directory.

Type:

str

worker_id

Identifier for the worker instance.

Type:

int

args

Configuration arguments for the worker.

Type:

dict

static build(app, target_worker, dask_home, worker, mode=0, verbose=0)

Build the target code on a my_code_AgiWorker.

Parameters:
  • app (str) – app to build

  • target_worker (str) – module to build

  • dask_home (str) – path to dask home

  • worker – current worker

  • mode – (Default value = 0)

  • verbose – (Default value = 0)

Returns:

cython_decorators = ['njit']
dask_home = None
static do_works(workers_tree, workers_tree_info)

Run the workers.

Parameters:
  • workers_tree – distribution tree

  • workers_tree_info – additional information about the workers

Returns:

static exec(cmd, path, worker)

Execute a command in a subprocess.

Parameters:
  • cmd – the command string

  • path – the directory in which to launch the command

  • worker

Returns:

exec_mono_process(workers_tree, workers_tree_info)[source]

Executes tasks in single-threaded mode.

Return type:

None

Parameters:
  • workers_tree (any) – Distribution tree structure.

  • workers_tree_info (any) – Additional information about the workers.

exec_multi_process(workers_tree, workers_tree_info)[source]

Executes tasks in multiprocessing mode.

Return type:

None

Parameters:
  • workers_tree (any) – Distribution tree structure.

  • workers_tree_info (any) – Additional information about the workers.
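The difference between the two execution modes can be illustrated with concurrent.futures.ProcessPoolExecutor, which the module lists among its imports. The sketch is hypothetical: square stands in for a per-task method such as work_pool, and run_tasks is not part of the AgiDataWorker API.

```python
from concurrent.futures import ProcessPoolExecutor


def square(x):
    """Hypothetical task; stands in for the per-task work."""
    return x * x


def run_tasks(tasks, processes=1):
    """Apply `square` to every task, serially or in a process pool."""
    if processes <= 1:
        # Single-process path: tasks run one after another.
        return [square(t) for t in tasks]
    # Multi-process path: tasks are distributed over worker processes.
    with ProcessPoolExecutor(max_workers=processes) as pool:
        return list(pool.map(square, tasks))  # results keep the input order
```

On platforms that start worker processes by spawning a new interpreter, a call with processes greater than 1 should be placed under an `if __name__ == "__main__":` guard.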

static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

static get_worker_info(worker_id)

Return information about the worker identified by worker_id.

Parameters:

worker_id

Returns:

home_dir = None
is_managed_pc = False
static join(path1, path2)

Join two file paths.

Parameters:
  • path1 (str) – The first file path.

  • path2 (str) – The second file path.

Returns:

The combined file path.

Return type:

str

Raises:

None

logs = None
mode = None
static new(target_module, target_class, target_package, mode=None, verbose=0, worker_id=0, worker='localhost', env=None, args=None)

Create a new worker instance: instantiate and load the target my_code_worker module.

Parameters:
  • target_module

  • target_class

  • target_package

  • mode – (Default value = None)

  • verbose – (Default value = 0)

  • worker_id – (Default value = 0)

  • worker – (Default value = 'localhost')

  • env – (Default value = None)

  • args – (Default value = None)

Returns:

static normalize_path(path)
onerror(path, exc_info)

Error handler for shutil.rmtree.

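If the error is caused by an access problem (for example, a read-only file), the handler adds write permission and retries the operation.

For any other error, it re-raises the exception.

Usage: shutil.rmtree(path, onerror=onerror)

Parameters:
  • func – the function that raised the exception

  • path – the path that was passed to func

  • exc_info – the exception information, as returned by sys.exc_info()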

Returns:

static run(app, workers={'127.0.0.1': 1}, mode=0, verbose=3, args=None)
share_path = None
start()

Start the worker and print out a message if verbose mode is enabled.

Parameters:

None

Returns:

None

stop()

Returns:

t0 = None
verbose = 1
work_done(df=None)[source]

Handles the post-processing of the DataFrame after work_pool execution.

Return type:

None

Parameters:

df (pl.DataFrame, optional) – The Polars DataFrame to process. Defaults to None.

Raises:

ValueError – If an unsupported output format is specified.

work_pool(x=None)[source]

Processes a single task.

Parameters:

x (any, optional) – The task to process. Defaults to None.

Returns:

A Polars DataFrame with the processed results.

Return type:

pl.DataFrame

worker = None
worker_id = None
works(workers_tree, workers_tree_info)[source]

Executes worker tasks based on the distribution tree.

Parameters:
  • workers_tree (any) – Distribution tree structure.

  • workers_tree_info (any) – Additional information about the workers.

Returns:

Execution time in seconds.

Return type:

float
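Returning the execution time in seconds can be sketched with time.perf_counter. The helper timed is hypothetical and only illustrates the measurement; it is not how works is necessarily implemented.

```python
import time


def timed(func, *args, **kwargs):
    """Call `func` and return (result, elapsed time in seconds)."""
    start = time.perf_counter()  # monotonic, high-resolution clock
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Usage: time a built-in call.
total, seconds = timed(sum, [1, 2, 3])
```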