agi_core.workers API
Workers Reference
agi_core.workers.agi-worker


agi_worker module
Auteur: Jean-Pierre Morard
- class agi_core.workers.agi_worker.agi_worker.AgiWorker[source]
Bases:
ABC
class AgiWorker v1.0
- static build(target_worker, dask_home, worker, mode=0, verbose=0)[source]
Function to build target code on a target Worker.
- Parameters:
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)[source]
run of workers
- Parameters:
chunk – distribution tree
chunks
Returns:
- env = None
- static exec(cmd, path, worker)[source]
execute a command within a subprocess
- Parameters:
cmd – the str of the command
path – the path where to lunch the command
worker
Returns:
- static expand(path, base_directory=None)[source]
Expand a given path to an absolute path. :param path: The path to expand. :type path: str :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)[source]
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)[source]
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None –
- logs = None
- mode = None
- static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)[source]
new worker instance :param module: instanciate and load target my_code_worker module :param target_worker: :param target_worker_class: :param target_package: :param mode: (Default value = mode) :param verbose: (Default value = 0) :param worker_id: (Default value = 0) :param worker: (Default value = ‘localhost’) :param args: (Default value = None)
Returns:
- static onerror(func, path, exc_info)[source]
Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.
- static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)[source]
- Parameters:
app
workers
mode
verbose
args
- Returns:
- start()[source]
Start the worker and print out a message if verbose mode is enabled.
- Parameters:
None
- Returns:
None
- t0 = None
- verbose = 1
- worker = None
- worker_id = None
agi_core.workers.agent-worker


agi_core.workers.dag-worker


- class agi_core.workers.dag_worker.dag_worker.DagWorker[source]
Bases:
AgiWorker
DagWorker Class
- Inherits from:
AgiWorker: Provides foundational worker functionalities.
- static build(target_worker, dask_home, worker, mode=0, verbose=0)
Function to build target code on a target Worker.
- Parameters:
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)
run of workers
- Parameters:
chunk – distribution tree
chunks
Returns:
- env = None
- static exec(cmd, path, worker)
execute a command within a subprocess
- Parameters:
cmd – the str of the command
path – the path where to lunch the command
worker
Returns:
- exec_mono_process(workers_tree, workers_tree_info)[source]
Execute tasks in a single process, respecting dependencies, but only for branches assigned to this worker via round-robin.
- exec_multi_process(workers_tree, workers_tree_info)[source]
Execute tasks in multiple threads, distributing branches to workers in round‑robin, then honoring dependencies per worker.
- static expand(path, base_directory=None)
Expand a given path to an absolute path. :param path: The path to expand. :type path: str :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- get_logs_and_result(*args, verbosity=50, **kwargs)
- static get_worker_info(worker_id)
def get_worker_info():
- Parameters:
worker_id
Returns:
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None –
- logs = None
- mode = None
- static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)
new worker instance :param module: instanciate and load target my_code_worker module :param target_worker: :param target_worker_class: :param target_package: :param mode: (Default value = mode) :param verbose: (Default value = 0) :param worker_id: (Default value = 0) :param worker: (Default value = ‘localhost’) :param args: (Default value = None)
Returns:
- static onerror(func, path, exc_info)
Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.
- static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)
- Parameters:
app
workers
mode
verbose
args
- Returns:
- start()
Start the worker and print out a message if verbose mode is enabled.
- Parameters:
None
- Returns:
None
- stop()
Returns:
- t0 = None
- topological_sort(dependency_graph)[source]
Perform a topological sort on the dependency graph. Raises ValueError on cycles.
- verbose = 1
- worker = None
- worker_id = None
agi_core.workers.pandas-worker


pandas_worker Framework Callback Functions Module
This module provides the PandasWorker class, which extends the foundational functionalities of AgiWorker for processing data using multiprocessing or single-threaded approaches with pandas.
- Classes:
PandasWorker: Worker class for data processing tasks using pandas.
- Internal Libraries:
os, warnings
- External Libraries:
concurrent.futures.ProcessPoolExecutor pathlib.Path time pandas as pd AgiWorker from agi_worker.agi_worker
- class agi_core.workers.pandas_worker.pandas_worker.PandasWorker[source]
Bases:
AgiWorker
PandasWorker Class
Inherits from
AgiWorker
to provide extended data processing functionalities using pandas.- verbose
Verbosity level for logging.
- Type:
int
- data_out
Path to the output directory.
- Type:
str
- worker_id
Identifier for the worker instance.
- Type:
int
- args
Configuration arguments for the worker.
- Type:
dict
- static build(target_worker, dask_home, worker, mode=0, verbose=0)
Function to build target code on a target Worker.
- Parameters:
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)
run of workers
- Parameters:
chunk – distribution tree
chunks
Returns:
- env = None
- static exec(cmd, path, worker)
execute a command within a subprocess
- Parameters:
cmd – the str of the command
path – the path where to lunch the command
worker
Returns:
- exec_mono_process(workers_tree, workers_tree_info)[source]
Executes tasks in single-threaded mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- exec_multi_process(workers_tree, workers_tree_info)[source]
Executes tasks in multiprocessing mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- static expand(path, base_directory=None)
Expand a given path to an absolute path. :param path: The path to expand. :type path: str :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- get_logs_and_result(*args, verbosity=50, **kwargs)
- static get_worker_info(worker_id)
def get_worker_info():
- Parameters:
worker_id
Returns:
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None –
- logs = None
- mode = None
- static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)
new worker instance :param module: instanciate and load target my_code_worker module :param target_worker: :param target_worker_class: :param target_package: :param mode: (Default value = mode) :param verbose: (Default value = 0) :param worker_id: (Default value = 0) :param worker: (Default value = ‘localhost’) :param args: (Default value = None)
Returns:
- static onerror(func, path, exc_info)
Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.
- static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)
- Parameters:
app
workers
mode
verbose
args
- Returns:
- start()
Start the worker and print out a message if verbose mode is enabled.
- Parameters:
None
- Returns:
None
- stop()
Returns:
- t0 = None
- verbose = 1
- work_done(df=None)[source]
Handles the post-processing of the DataFrame after work_pool execution.
- Return type:
None
- Parameters:
df (pd.DataFrame, optional) – The pandas DataFrame to process. Defaults to None.
- Raises:
ValueError – If an unsupported output format is specified.
- work_pool(x=None)[source]
Processes a single task.
- Return type:
DataFrame
- Parameters:
x (any, optional) – The task to process. Defaults to None.
- Returns:
A pandas DataFrame with the processed results.
- Return type:
pd.DataFrame
- worker = None
- worker_id = None
- works(workers_tree, workers_tree_info)[source]
Executes worker tasks based on the distribution tree.
- Return type:
float
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- Returns:
Execution time in seconds.
- Return type:
float
agi_core.workers.polars-worker


data_worker Framework Callback Functions Module
This module provides the PolarsWorker class, which extends the foundational functionalities of AgiWorker for processing data using multiprocessing or single-threaded approaches.
- Classes:
PolarsWorker: Worker class for data processing tasks.
- Internal Libraries:
os, warnings
- External Libraries:
concurrent.futures.ProcessPoolExecutor pathlib.Path time polars as pl AgiWorker from agi_worker.agi_worker
- class agi_core.workers.polars_worker.polars_worker.PolarsWorker[source]
Bases:
AgiWorker
PolarsWorker Class
Inherits from
AgiWorker
to provide extended data processing functionalities.- verbose
Verbosity level for logging.
- Type:
int
- data_out
Path to the output directory.
- Type:
str
- worker_id
Identifier for the worker instance.
- Type:
int
- args
Configuration arguments for the worker.
- Type:
dict
- static build(target_worker, dask_home, worker, mode=0, verbose=0)
Function to build target code on a target Worker.
- Parameters:
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)
run of workers
- Parameters:
chunk – distribution tree
chunks
Returns:
- env = None
- static exec(cmd, path, worker)
execute a command within a subprocess
- Parameters:
cmd – the str of the command
path – the path where to lunch the command
worker
Returns:
- exec_mono_process(workers_tree, workers_tree_info)[source]
Executes tasks in single-threaded mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- exec_multi_process(workers_tree, workers_tree_info)[source]
Executes tasks in multiprocessing mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- static expand(path, base_directory=None)
Expand a given path to an absolute path. :param path: The path to expand. :type path: str :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- get_logs_and_result(*args, verbosity=50, **kwargs)
- static get_worker_info(worker_id)
def get_worker_info():
- Parameters:
worker_id
Returns:
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None –
- logs = None
- mode = None
- static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)
new worker instance :param module: instanciate and load target my_code_worker module :param target_worker: :param target_worker_class: :param target_package: :param mode: (Default value = mode) :param verbose: (Default value = 0) :param worker_id: (Default value = 0) :param worker: (Default value = ‘localhost’) :param args: (Default value = None)
Returns:
- static onerror(func, path, exc_info)
Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.
- static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)
- Parameters:
app
workers
mode
verbose
args
- Returns:
- start()
Start the worker and print out a message if verbose mode is enabled.
- Parameters:
None
- Returns:
None
- stop()
Returns:
- t0 = None
- verbose = 1
- work_done(df=None)[source]
Handles the post-processing of the DataFrame after work_pool execution.
- Return type:
None
- Parameters:
df (pl.DataFrame, optional) – The Polars DataFrame to process. Defaults to None.
- Raises:
ValueError – If an unsupported output format is specified.
- work_pool(x=None)[source]
Processes a single task.
- Return type:
DataFrame
- Parameters:
x (any, optional) – The task to process. Defaults to None.
- Returns:
A Polars DataFrame with the processed results.
- Return type:
pl.DataFrame
- worker = None
- worker_id = None
- works(workers_tree, workers_tree_info)[source]
Executes worker tasks based on the distribution tree.
- Return type:
float
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- Returns:
Execution time in seconds.
- Return type:
float