agi_core.workers API
Workers Reference
agi_core.workers.agi-worker


agi_worker module
Author: Jean-Pierre Morard
- class agi_core.workers.agi_worker.agi_worker.AgiWorker[source]
Bases:
ABC
class AgiWorker v1.0
- static build(app, target_worker, dask_home, worker, mode=0, verbose=0)[source]
Build the target code for a my_code_AgiWorker.
- Parameters:
app (str) – app to build
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
Returns:
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)[source]
Run the workers.
- Parameters:
workers_tree – distribution tree
workers_tree_info
Returns:
- static exec(cmd, path, worker)[source]
Execute a command within a subprocess.
- Parameters:
cmd – the command string
path – the path from which to launch the command
worker
Returns:
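The entry above gives only the parameters of exec. As a rough illustration of running a command string from a given directory, here is a minimal sketch built on the standard subprocess module. The name run_command, the use of shell=True, and the RuntimeError on a non-zero exit code are assumptions made for this example, not the behaviour of AgiWorker.exec; the worker argument is left out because its role is not documented here.

```python
import subprocess


def run_command(cmd, path):
    """Hypothetical stand-in: run a command string with `path` as working directory."""
    # shell=True because the reference describes cmd as a single string.
    result = subprocess.run(
        cmd, shell=True, cwd=path, capture_output=True, text=True
    )
    if result.returncode != 0:
        # Assumption: surface failures as an exception carrying stderr.
        raise RuntimeError(
            f"command {cmd!r} failed with code {result.returncode}: {result.stderr}"
        )
    return result.stdout
```

Capturing the output as text keeps the helper easy to log from, which matters when the command runs on a remote worker.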
- static expand(path, base_directory=None)[source]
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
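To make the described behaviour concrete, the following is a minimal sketch of path expansion using only the standard library. The function name expand_path and the rule that a relative path is resolved against base_directory (or the current directory when it is None) are assumptions for illustration; the actual implementation of AgiWorker.expand may differ.

```python
import os
from pathlib import Path


def expand_path(path, base_directory=None):
    """Hypothetical stand-in: expand '~' and return an absolute path as a str."""
    expanded = Path(os.path.expanduser(path))
    if not expanded.is_absolute():
        # Assumption: relative paths are resolved against base_directory,
        # falling back to the current working directory.
        base = Path(os.path.expanduser(base_directory)) if base_directory else Path.cwd()
        expanded = base / expanded
    # abspath also normalises separators and removes '.' and '..' segments.
    return os.path.abspath(expanded)
```

For example, expand_path("~/data") yields a path under the user's home directory, while expand_path("out", base_directory="/srv/app") resolves "out" below that base.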
- static expand_and_join(path1, path2)[source]
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)[source]
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None –
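The two joining helpers, join and expand_and_join, can be pictured with the short sketch below. It assumes they behave like os.path.join, with the first path expanded beforehand in the second case; the names join_paths and expand_then_join are hypothetical and chosen to avoid implying these are the library's own functions.

```python
import os


def join_paths(path1, path2):
    """Hypothetical stand-in for join: combine two file paths."""
    return os.path.join(path1, path2)


def expand_then_join(path1, path2):
    """Hypothetical stand-in for expand_and_join: expand path1, then join path2."""
    # Assumption: expansion means '~' substitution plus conversion to absolute.
    expanded = os.path.abspath(os.path.expanduser(path1))
    return os.path.join(expanded, path2)
```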
- logs = None
- mode = None
- static new(target_module, target_class, target_package, mode=None, verbose=0, worker_id=0, worker='localhost', env=None, args=None)[source]
Create a new worker instance: instantiate and load the target my_code_worker module.
- Parameters:
target_module
target_class
target_package
mode – (Default value = None)
verbose – (Default value = 0)
worker_id – (Default value = 0)
worker – (Default value = 'localhost')
env – (Default value = None)
args – (Default value = None)
Returns:
- onerror(path, exc_info)[source]
Error handler for shutil.rmtree.
If the error is due to an access error (read only file) it attempts to add write permission and then retries.
If the error is for another reason it re-raises the error.
Usage : shutil.rmtree(path, onerror=onerror)
- Parameters:
func
path
exc_info
Returns:
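The handler described above follows a well-known pattern for shutil.rmtree. Here is a minimal sketch of that pattern, assuming the access error shows up as a PermissionError; the name on_rm_error is hypothetical and the code is not taken from AgiWorker. Note that from Python 3.12 the onerror argument of shutil.rmtree is deprecated in favour of onexc.

```python
import os
import stat


def on_rm_error(func, path, exc_info):
    """Hypothetical handler in the (func, path, exc_info) shape used by onerror."""
    error = exc_info[1]
    if isinstance(error, PermissionError):
        # Access error: add owner write permission, then retry the failed call.
        os.chmod(path, os.stat(path).st_mode | stat.S_IWUSR)
        func(path)
    else:
        # Any other failure is re-raised unchanged.
        raise error


# Usage, in the form given in the reference text:
#     shutil.rmtree(some_directory, onerror=on_rm_error)
```

Here func is the function that failed (for example os.unlink), so calling it again after the permission change retries exactly the operation that was refused.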
- start()[source]
Start the worker and print out a message if verbose mode is enabled.
- Parameters:
None
- Returns:
None
- t0 = None
- verbose = 1
- worker = None
- worker_id = None
agi_core.workers.agent-worker


agi_core.workers.dag-worker


- class agi_core.workers.dag_worker.dag_worker.AgiDagWorker[source]
Bases:
AgiWorker
AgiDagWorker Class
- Inherits from:
AgiWorker: Provides foundational worker functionalities.
- static build(app, target_worker, dask_home, worker, mode=0, verbose=0)
Build the target code for a my_code_AgiWorker.
- Parameters:
app (str) – app to build
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
Returns:
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)
Run the workers.
- Parameters:
workers_tree – distribution tree
workers_tree_info
Returns:
- static exec(cmd, path, worker)
Execute a command within a subprocess.
- Parameters:
cmd – the command string
path – the path from which to launch the command
worker
Returns:
- exec_mono_process(workers_tree, workers_tree_info)[source]
Execute tasks in a single process, respecting dependencies.
- exec_multi_process(workers_tree, workers_tree_info)[source]
Execute tasks in multiple threads, respecting dependencies.
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- static get_worker_info(worker_id)
def get_worker_info():
- Parameters:
worker_id
Returns:
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None –
- logs = None
- mode = None
- static new(target_module, target_class, target_package, mode=None, verbose=0, worker_id=0, worker='localhost', env=None, args=None)
Create a new worker instance: instantiate and load the target my_code_worker module.
- Parameters:
target_module
target_class
target_package
mode – (Default value = None)
verbose – (Default value = 0)
worker_id – (Default value = 0)
worker – (Default value = 'localhost')
env – (Default value = None)
args – (Default value = None)
Returns:
- static normalize_path(path)
- onerror(path, exc_info)
Error handler for shutil.rmtree.
If the error is due to an access error (read only file) it attempts to add write permission and then retries.
If the error is for another reason it re-raises the error.
Usage : shutil.rmtree(path, onerror=onerror)
- Parameters:
func
path
exc_info
Returns:
- static run(app, workers={'127.0.0.1': 1}, mode=0, verbose=3, args=None)
- start()
Start the worker and print out a message if verbose mode is enabled.
- Parameters:
None
- Returns:
None
- stop()
Returns:
- t0 = None
- topological_sort(dependency_graph)[source]
Perform a topological sort on the dependency graph.
- Parameters:
dependency_graph (dict) – A dictionary where keys are functions and values are lists of dependent functions.
- Returns:
A list of functions in topologically sorted order.
- Return type:
list
- Raises:
ValueError – If a cycle is detected in the dependencies.
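A topological sort with cycle detection can be sketched with Kahn's algorithm, as below. This is an illustration only, not the code of AgiDagWorker.topological_sort. It assumes that each key maps to the functions that depend on it, so a key is ordered before its values; if the real graph stores prerequisites instead, the edges would be reversed. Strings stand in for functions in the example.

```python
from collections import deque


def topological_sort(dependency_graph):
    """Sketch of Kahn's algorithm; raises ValueError when a cycle exists."""
    # Collect every node, including ones that appear only as dependents.
    nodes = dict.fromkeys(dependency_graph)
    for dependents in dependency_graph.values():
        nodes.update(dict.fromkeys(dependents))

    # Count incoming edges for each node.
    in_degree = dict.fromkeys(nodes, 0)
    for dependents in dependency_graph.values():
        for dependent in dependents:
            in_degree[dependent] += 1

    # Start with the nodes that wait on nothing.
    ready = deque(node for node in nodes if in_degree[node] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for dependent in dependency_graph.get(node, []):
            in_degree[dependent] -= 1
            if in_degree[dependent] == 0:
                ready.append(dependent)

    # Nodes left unprocessed can only be part of a cycle.
    if len(order) != len(nodes):
        raise ValueError("cycle detected in the dependencies")
    return order


graph = {"load": ["clean"], "clean": ["report"], "report": []}
order = topological_sort(graph)
```

With the graph above, "load" is placed before "clean", which is placed before "report". The standard library's graphlib.TopologicalSorter offers the same service, with the opposite edge convention (each key maps to its predecessors).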
- verbose = 1
- worker = None
- worker_id = None
agi_core.workers.data-worker


data_worker Framework Callback Functions Module
This module provides the AgiDataWorker class, which extends the foundational functionalities of AgiWorker for processing data using multiprocessing or single-threaded approaches.
- Classes:
AgiDataWorker: Worker class for data processing tasks.
- Internal Libraries:
os, warnings
- External Libraries:
concurrent.futures.ProcessPoolExecutor, pathlib.Path, time, polars (as pl), AgiWorker (from agi_worker.agi_worker)
- class agi_core.workers.data_worker.data_worker.AgiDataWorker[source]
Bases:
AgiWorker
AgiDataWorker Class
Inherits from AgiWorker to provide extended data processing functionalities.
- verbose
Verbosity level for logging.
- Type:
int
- data_out
Path to the output directory.
- Type:
str
- worker_id
Identifier for the worker instance.
- Type:
int
- args
Configuration arguments for the worker.
- Type:
dict
- static build(app, target_worker, dask_home, worker, mode=0, verbose=0)
Build the target code for a my_code_AgiWorker.
- Parameters:
app (str) – app to build
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
Returns:
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)
Run the workers.
- Parameters:
workers_tree – distribution tree
workers_tree_info
Returns:
- static exec(cmd, path, worker)
Execute a command within a subprocess.
- Parameters:
cmd – the command string
path – the path from which to launch the command
worker
Returns:
- exec_mono_process(workers_tree, workers_tree_info)[source]
Executes tasks in single-threaded mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- exec_multi_process(workers_tree, workers_tree_info)[source]
Executes tasks in multiprocessing mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- static get_worker_info(worker_id)
def get_worker_info():
- Parameters:
worker_id
Returns:
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None –
- logs = None
- mode = None
- static new(target_module, target_class, target_package, mode=None, verbose=0, worker_id=0, worker='localhost', env=None, args=None)
Create a new worker instance: instantiate and load the target my_code_worker module.
- Parameters:
target_module
target_class
target_package
mode – (Default value = None)
verbose – (Default value = 0)
worker_id – (Default value = 0)
worker – (Default value = 'localhost')
env – (Default value = None)
args – (Default value = None)
Returns:
- static normalize_path(path)
- onerror(path, exc_info)
Error handler for shutil.rmtree.
If the error is due to an access error (read only file) it attempts to add write permission and then retries.
If the error is for another reason it re-raises the error.
Usage : shutil.rmtree(path, onerror=onerror)
- Parameters:
func
path
exc_info
Returns:
- static run(app, workers={'127.0.0.1': 1}, mode=0, verbose=3, args=None)
- start()
Start the worker and print out a message if verbose mode is enabled.
- Parameters:
None
- Returns:
None
- stop()
Returns:
- t0 = None
- verbose = 1
- work_done(df=None)[source]
Handles the post-processing of the DataFrame after work_pool execution.
- Return type:
None
- Parameters:
df (pl.DataFrame, optional) – The Polars DataFrame to process. Defaults to None.
- Raises:
ValueError – If an unsupported output format is specified.
- work_pool(x=None)[source]
Processes a single task.
- Parameters:
x (any, optional) – The task to process. Defaults to None.
- Returns:
A Polars DataFrame with the processed results.
- Return type:
pl.DataFrame
- worker = None
- worker_id = None
- works(workers_tree, workers_tree_info)[source]
Executes worker tasks based on the distribution tree.
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- Returns:
Execution time in seconds.
- Return type:
float
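Since works reports its execution time in seconds, the timing part can be pictured with the small sketch below. The helper name timed_works and the idea of passing the executor as a callable are assumptions for illustration; how AgiDataWorker.works chooses between exec_mono_process and exec_multi_process is not stated in this reference.

```python
import time


def timed_works(executor, workers_tree, workers_tree_info):
    """Hypothetical helper: run `executor` on the tree and return elapsed seconds."""
    start = time.perf_counter()
    # Assumption: executor has the (workers_tree, workers_tree_info) signature
    # shared by exec_mono_process and exec_multi_process.
    executor(workers_tree, workers_tree_info)
    return time.perf_counter() - start
```

time.perf_counter is used here because it is monotonic, so the measured duration is not affected by system clock adjustments.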