agi_core.workers API

Workers Reference

agi_core.workers.agi-worker

Packages diagram for agi_core.workers.agi_worker
Classes diagram for agi_core.workers.agi_worker

agi_worker module

Auteur: Jean-Pierre Morard

class agi_core.workers.agi_worker.agi_worker.AgiWorker[source]

Bases: ABC

class AgiWorker v1.0

static build(target_worker, dask_home, worker, mode=0, verbose=0)[source]

Function to build target code on a target Worker.

Parameters:
  • target_worker (str) – module to build

  • dask_home (str) – path to dask home

  • worker – current worker

  • mode – (Default value = 0)

  • verbose – (Default value = 0)

cython_decorators = ['njit']
dask_home = None
static do_works(workers_tree, workers_tree_info)[source]

run of workers

Parameters:
  • chunk – distribution tree

  • chunks

Returns:

env = None
static exec(cmd, path, worker)[source]

execute a command within a subprocess

Parameters:
  • cmd – the str of the command

  • path – the path where to lunch the command

  • worker

Returns:

static expand(path, base_directory=None)[source]

Expand a given path to an absolute path. :param path: The path to expand. :type path: str :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)[source]

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

get_logs_and_result(*args, verbosity=50, **kwargs)[source]
static get_worker_info(worker_id)[source]

def get_worker_info():

Parameters:

worker_id

Returns:

home_dir = None
is_managed_pc = False
static join(path1, path2)[source]

Join two file paths.

Parameters:
  • path1 (str) – The first file path.

  • path2 (str) – The second file path.

Returns:

The combined file path.

Return type:

str

Raises:

None

logs = None
mode = None
static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)[source]

new worker instance :param module: instanciate and load target my_code_worker module :param target_worker: :param target_worker_class: :param target_package: :param mode: (Default value = mode) :param verbose: (Default value = 0) :param worker_id: (Default value = 0) :param worker: (Default value = ‘localhost’) :param args: (Default value = None)

Returns:

static onerror(func, path, exc_info)[source]

Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.

static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)[source]
Parameters:
  • app

  • workers

  • mode

  • verbose

  • args

Returns:

share_path = None
start()[source]

Start the worker and print out a message if verbose mode is enabled.

Parameters:

None

Returns:

None

stop()[source]

Returns:

t0 = None
verbose = 1
worker = None
worker_id = None

agi_core.workers.agent-worker

Packages diagram for agi_core.workers.agent_worker
Classes diagram for agi_core.workers.agent_worker

agi_core.workers.dag-worker

Packages diagram for agi_core.workers.dag_worker
Classes diagram for agi_core.workers.dag_worker
class agi_core.workers.dag_worker.dag_worker.DagWorker[source]

Bases: AgiWorker

DagWorker Class

Inherits from:

AgiWorker: Provides foundational worker functionalities.

static build(target_worker, dask_home, worker, mode=0, verbose=0)

Function to build target code on a target Worker.

Parameters:
  • target_worker (str) – module to build

  • dask_home (str) – path to dask home

  • worker – current worker

  • mode – (Default value = 0)

  • verbose – (Default value = 0)

cython_decorators = ['njit']
dask_home = None
static do_works(workers_tree, workers_tree_info)

run of workers

Parameters:
  • chunk – distribution tree

  • chunks

Returns:

env = None
static exec(cmd, path, worker)

execute a command within a subprocess

Parameters:
  • cmd – the str of the command

  • path – the path where to lunch the command

  • worker

Returns:

exec_mono_process(workers_tree, workers_tree_info)[source]

Execute tasks in a single process, respecting dependencies, but only for branches assigned to this worker via round-robin.

exec_multi_process(workers_tree, workers_tree_info)[source]

Execute tasks in multiple threads, distributing branches to workers in round‑robin, then honoring dependencies per worker.

static expand(path, base_directory=None)

Expand a given path to an absolute path. :param path: The path to expand. :type path: str :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

get_logs_and_result(*args, verbosity=50, **kwargs)
static get_worker_info(worker_id)

def get_worker_info():

Parameters:

worker_id

Returns:

home_dir = None
is_managed_pc = False
static join(path1, path2)

Join two file paths.

Parameters:
  • path1 (str) – The first file path.

  • path2 (str) – The second file path.

Returns:

The combined file path.

Return type:

str

Raises:

None

logs = None
mode = None
static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)

new worker instance :param module: instanciate and load target my_code_worker module :param target_worker: :param target_worker_class: :param target_package: :param mode: (Default value = mode) :param verbose: (Default value = 0) :param worker_id: (Default value = 0) :param worker: (Default value = ‘localhost’) :param args: (Default value = None)

Returns:

static onerror(func, path, exc_info)

Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.

static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)
Parameters:
  • app

  • workers

  • mode

  • verbose

  • args

Returns:

share_path = None
start()

Start the worker and print out a message if verbose mode is enabled.

Parameters:

None

Returns:

None

stop()

Returns:

t0 = None
topological_sort(dependency_graph)[source]

Perform a topological sort on the dependency graph. Raises ValueError on cycles.

verbose = 1
worker = None
worker_id = None
works(workers_tree, workers_tree_info)[source]

Run the worker tasks.


agi_core.workers.pandas-worker

Packages diagram for agi_core.workers.pandas_workers
Classes diagram for agi_core.workers.pandas_worker

pandas_worker Framework Callback Functions Module

This module provides the PandasWorker class, which extends the foundational functionalities of AgiWorker for processing data using multiprocessing or single-threaded approaches with pandas.

Classes:

PandasWorker: Worker class for data processing tasks using pandas.

Internal Libraries:

os, warnings

External Libraries:

concurrent.futures.ProcessPoolExecutor pathlib.Path time pandas as pd AgiWorker from agi_worker.agi_worker

class agi_core.workers.pandas_worker.pandas_worker.PandasWorker[source]

Bases: AgiWorker

PandasWorker Class

Inherits from AgiWorker to provide extended data processing functionalities using pandas.

verbose

Verbosity level for logging.

Type:

int

data_out

Path to the output directory.

Type:

str

worker_id

Identifier for the worker instance.

Type:

int

args

Configuration arguments for the worker.

Type:

dict

static build(target_worker, dask_home, worker, mode=0, verbose=0)

Function to build target code on a target Worker.

Parameters:
  • target_worker (str) – module to build

  • dask_home (str) – path to dask home

  • worker – current worker

  • mode – (Default value = 0)

  • verbose – (Default value = 0)

cython_decorators = ['njit']
dask_home = None
static do_works(workers_tree, workers_tree_info)

run of workers

Parameters:
  • chunk – distribution tree

  • chunks

Returns:

env = None
static exec(cmd, path, worker)

execute a command within a subprocess

Parameters:
  • cmd – the str of the command

  • path – the path where to lunch the command

  • worker

Returns:

exec_mono_process(workers_tree, workers_tree_info)[source]

Executes tasks in single-threaded mode.

Return type:

None

Parameters:
  • workers_tree (any) – Distribution tree structure.

  • workers_tree_info (any) – Additional information about the workers.

exec_multi_process(workers_tree, workers_tree_info)[source]

Executes tasks in multiprocessing mode.

Return type:

None

Parameters:
  • workers_tree (any) – Distribution tree structure.

  • workers_tree_info (any) – Additional information about the workers.

static expand(path, base_directory=None)

Expand a given path to an absolute path. :param path: The path to expand. :type path: str :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

get_logs_and_result(*args, verbosity=50, **kwargs)
static get_worker_info(worker_id)

def get_worker_info():

Parameters:

worker_id

Returns:

home_dir = None
is_managed_pc = False
static join(path1, path2)

Join two file paths.

Parameters:
  • path1 (str) – The first file path.

  • path2 (str) – The second file path.

Returns:

The combined file path.

Return type:

str

Raises:

None

logs = None
mode = None
static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)

new worker instance :param module: instanciate and load target my_code_worker module :param target_worker: :param target_worker_class: :param target_package: :param mode: (Default value = mode) :param verbose: (Default value = 0) :param worker_id: (Default value = 0) :param worker: (Default value = ‘localhost’) :param args: (Default value = None)

Returns:

static onerror(func, path, exc_info)

Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.

static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)
Parameters:
  • app

  • workers

  • mode

  • verbose

  • args

Returns:

share_path = None
start()

Start the worker and print out a message if verbose mode is enabled.

Parameters:

None

Returns:

None

stop()

Returns:

t0 = None
verbose = 1
work_done(df=None)[source]

Handles the post-processing of the DataFrame after work_pool execution.

Return type:

None

Parameters:

df (pd.DataFrame, optional) – The pandas DataFrame to process. Defaults to None.

Raises:

ValueError – If an unsupported output format is specified.

work_pool(x=None)[source]

Processes a single task.

Return type:

DataFrame

Parameters:

x (any, optional) – The task to process. Defaults to None.

Returns:

A pandas DataFrame with the processed results.

Return type:

pd.DataFrame

worker = None
worker_id = None
works(workers_tree, workers_tree_info)[source]

Executes worker tasks based on the distribution tree.

Return type:

float

Parameters:
  • workers_tree (any) – Distribution tree structure.

  • workers_tree_info (any) – Additional information about the workers.

Returns:

Execution time in seconds.

Return type:

float


agi_core.workers.polars-worker

Packages diagram for agi_core.workers.polars_workers
Classes diagram for agi_core.workers.polars_worker

data_worker Framework Callback Functions Module

This module provides the PolarsWorker class, which extends the foundational functionalities of AgiWorker for processing data using multiprocessing or single-threaded approaches.

Classes:

PolarsWorker: Worker class for data processing tasks.

Internal Libraries:

os, warnings

External Libraries:

concurrent.futures.ProcessPoolExecutor pathlib.Path time polars as pl AgiWorker from agi_worker.agi_worker

class agi_core.workers.polars_worker.polars_worker.PolarsWorker[source]

Bases: AgiWorker

PolarsWorker Class

Inherits from AgiWorker to provide extended data processing functionalities.

verbose

Verbosity level for logging.

Type:

int

data_out

Path to the output directory.

Type:

str

worker_id

Identifier for the worker instance.

Type:

int

args

Configuration arguments for the worker.

Type:

dict

static build(target_worker, dask_home, worker, mode=0, verbose=0)

Function to build target code on a target Worker.

Parameters:
  • target_worker (str) – module to build

  • dask_home (str) – path to dask home

  • worker – current worker

  • mode – (Default value = 0)

  • verbose – (Default value = 0)

cython_decorators = ['njit']
dask_home = None
static do_works(workers_tree, workers_tree_info)

run of workers

Parameters:
  • chunk – distribution tree

  • chunks

Returns:

env = None
static exec(cmd, path, worker)

execute a command within a subprocess

Parameters:
  • cmd – the str of the command

  • path – the path where to lunch the command

  • worker

Returns:

exec_mono_process(workers_tree, workers_tree_info)[source]

Executes tasks in single-threaded mode.

Return type:

None

Parameters:
  • workers_tree (any) – Distribution tree structure.

  • workers_tree_info (any) – Additional information about the workers.

exec_multi_process(workers_tree, workers_tree_info)[source]

Executes tasks in multiprocessing mode.

Return type:

None

Parameters:
  • workers_tree (any) – Distribution tree structure.

  • workers_tree_info (any) – Additional information about the workers.

static expand(path, base_directory=None)

Expand a given path to an absolute path. :param path: The path to expand. :type path: str :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

get_logs_and_result(*args, verbosity=50, **kwargs)
static get_worker_info(worker_id)

def get_worker_info():

Parameters:

worker_id

Returns:

home_dir = None
is_managed_pc = False
static join(path1, path2)

Join two file paths.

Parameters:
  • path1 (str) – The first file path.

  • path2 (str) – The second file path.

Returns:

The combined file path.

Return type:

str

Raises:

None

logs = None
mode = None
static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)

new worker instance :param module: instanciate and load target my_code_worker module :param target_worker: :param target_worker_class: :param target_package: :param mode: (Default value = mode) :param verbose: (Default value = 0) :param worker_id: (Default value = 0) :param worker: (Default value = ‘localhost’) :param args: (Default value = None)

Returns:

static onerror(func, path, exc_info)

Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.

static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)
Parameters:
  • app

  • workers

  • mode

  • verbose

  • args

Returns:

share_path = None
start()

Start the worker and print out a message if verbose mode is enabled.

Parameters:

None

Returns:

None

stop()

Returns:

t0 = None
verbose = 1
work_done(df=None)[source]

Handles the post-processing of the DataFrame after work_pool execution.

Return type:

None

Parameters:

df (pl.DataFrame, optional) – The Polars DataFrame to process. Defaults to None.

Raises:

ValueError – If an unsupported output format is specified.

work_pool(x=None)[source]

Processes a single task.

Return type:

DataFrame

Parameters:

x (any, optional) – The task to process. Defaults to None.

Returns:

A Polars DataFrame with the processed results.

Return type:

pl.DataFrame

worker = None
worker_id = None
works(workers_tree, workers_tree_info)[source]

Executes worker tasks based on the distribution tree.

Return type:

float

Parameters:
  • workers_tree (any) – Distribution tree structure.

  • workers_tree_info (any) – Additional information about the workers.

Returns:

Execution time in seconds.

Return type:

float