agi_node package

class agi_node.dag_worker.dag_worker.DagWorker[source]

Bases: BaseWorker

Minimal-change DAG worker:
  • Keeps your existing structure

  • Adds a tiny signature-aware _invoke() so custom methods can vary in signature

  • Uses _invoke() at the single call site in ._exec_multi_process()

args_dump_mode: ClassVar[str] = 'json'
args_dumper: ClassVar[Callable[..., None] | None] = None
args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
args_loader: ClassVar[Callable[..., Any] | None] = None
args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
as_dict(mode=None)
Return type:

dict[str, Any]

break()

Signal the service loop to exit on this worker.

Return type:

bool

static break_loop()

Signal the service loop to exit on this worker.

Return type:

bool

default_settings_path: ClassVar[str] = 'app_settings.toml'
default_settings_section: ClassVar[str] = 'args'
env: Optional[AgiEnv] = None
static expand(path, base_directory=None)

Expand a given path to an absolute path. :type path: str :param path: The path to expand. :type path: str :type base_directory: str, optional :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

classmethod from_toml(env, settings_path=None, section=None, **overrides)
Return type:

BaseWorker

get_work(fn_name, args, prev_result)[source]

Back-compat: delegate to the signature-aware invoker.

static loop(*, poll_interval=None)

Run a long-lived service loop on this worker until signalled to stop.

The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.

Return type:

Dict[str, Any]

managed_pc_home_suffix: ClassVar[str] = 'MyApp'
managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
static normalize_dataset_path(data_path)

Normalise any dataset directory input so workers rely on consistent paths.

Return type:

str

prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)

Create (and optionally reset) a deterministic output directory.

Return type:

Path

setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
Return type:

Any

setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)

Prepare normalised input/output dataset paths without relying on worker args.

Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.

Return type:

SimpleNamespace

static start(worker_inst)

Invoke the concrete worker’s start hook once initialised.

stop()

Returns:

to_toml(settings_path=None, section=None, create_missing=True)
Return type:

None

verbose = 1
works(workers_plan, workers_plan_metadata)[source]

Your existing entry point; keep as-is, just call multiprocess path for mode 4, etc.

pandas_worker Framework Callback Functions Module

This module provides the PandasWorker class, which extends the foundational functionalities of BaseWorker for processing data using multiprocessing or single-threaded approaches with pandas.

Classes:

PandasWorker: Worker class for data processing tasks using pandas.

Internal Libraries:

os, warnings

External Libraries:

concurrent.futures.ProcessPoolExecutor pathlib.Path time pandas as pd BaseWorker from node import BaseWorker.node

class agi_node.pandas_worker.pandas_worker.PandasWorker[source]

Bases: BaseWorker

PandasWorker Class

Inherits from BaseWorker to provide extended data processing functionalities using pandas.

verbose

Verbosity level for logging.

Type:

int

data_out

Path to the output directory.

Type:

str

worker_id

Identifier for the worker instance.

Type:

int

args

Configuration arguments for the worker.

Type:

dict

args_dump_mode: ClassVar[str] = 'json'
args_dumper: ClassVar[Callable[..., None] | None] = None
args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
args_loader: ClassVar[Callable[..., Any] | None] = None
args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
as_dict(mode=None)
Return type:

dict[str, Any]

break()

Signal the service loop to exit on this worker.

Return type:

bool

static break_loop()

Signal the service loop to exit on this worker.

Return type:

bool

default_settings_path: ClassVar[str] = 'app_settings.toml'
default_settings_section: ClassVar[str] = 'args'
env: Optional[AgiEnv] = None
static expand(path, base_directory=None)

Expand a given path to an absolute path. :type path: str :param path: The path to expand. :type path: str :type base_directory: str, optional :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

classmethod from_toml(env, settings_path=None, section=None, **overrides)
Return type:

BaseWorker

static loop(*, poll_interval=None)

Run a long-lived service loop on this worker until signalled to stop.

The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.

Return type:

Dict[str, Any]

managed_pc_home_suffix: ClassVar[str] = 'MyApp'
managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
static normalize_dataset_path(data_path)

Normalise any dataset directory input so workers rely on consistent paths.

Return type:

str

prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)

Create (and optionally reset) a deterministic output directory.

Return type:

Path

setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
Return type:

Any

setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)

Prepare normalised input/output dataset paths without relying on worker args.

Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.

Return type:

SimpleNamespace

static start(worker_inst)

Invoke the concrete worker’s start hook once initialised.

stop()

Returns:

to_toml(settings_path=None, section=None, create_missing=True)
Return type:

None

verbose = 1
work_done(df=None)[source]

Handles the post-processing of the DataFrame after work_pool execution.

Parameters:

df (pd.DataFrame, optional) – The pandas DataFrame to process. Defaults to None.

Raises:

ValueError – If an unsupported output format is specified.

Return type:

None

work_pool(x=None)[source]

Processes a single task.

Parameters:

x (any, optional) – The task to process. Defaults to None.

Returns:

A pandas DataFrame with the processed results.

Return type:

pd.DataFrame

works(workers_plan, workers_plan_metadata)[source]

Executes worker tasks based on the distribution tree.

Parameters:
  • workers_plan (any) – Distribution tree structure.

  • workers_plan_metadata (any) – Additional information about the workers.

Returns:

Execution time in seconds.

Return type:

float

data_worker Framework Callback Functions Module

This module provides the PolarsWorker class, which extends the foundational functionalities of BaseWorker for processing data using multiprocessing or single-threaded approaches.

Classes:

PolarsWorker: Worker class for data processing tasks.

Internal Libraries:

os, warnings

External Libraries:

concurrent.futures.ProcessPoolExecutor pathlib.Path time polars as pl BaseWorker from node import BaseWorker

class agi_node.polars_worker.polars_worker.PolarsWorker[source]

Bases: BaseWorker

PolarsWorker Class

Inherits from BaseWorker to provide extended data processing functionalities.

verbose

Verbosity level for logging.

Type:

int

data_out

Path to the output directory.

Type:

str

worker_id

Identifier for the worker instance.

Type:

int

args

Configuration arguments for the worker.

Type:

dict

args_dump_mode: ClassVar[str] = 'json'
args_dumper: ClassVar[Callable[..., None] | None] = None
args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
args_loader: ClassVar[Callable[..., Any] | None] = None
args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
as_dict(mode=None)
Return type:

dict[str, Any]

break()

Signal the service loop to exit on this worker.

Return type:

bool

static break_loop()

Signal the service loop to exit on this worker.

Return type:

bool

default_settings_path: ClassVar[str] = 'app_settings.toml'
default_settings_section: ClassVar[str] = 'args'
env: Optional[AgiEnv] = None
static expand(path, base_directory=None)

Expand a given path to an absolute path. :type path: str :param path: The path to expand. :type path: str :type base_directory: str, optional :param base_directory: The base directory to use for expanding the path. Defaults to None. :type base_directory: str, optional

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

classmethod from_toml(env, settings_path=None, section=None, **overrides)
Return type:

BaseWorker

static loop(*, poll_interval=None)

Run a long-lived service loop on this worker until signalled to stop.

The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.

Return type:

Dict[str, Any]

managed_pc_home_suffix: ClassVar[str] = 'MyApp'
managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
static normalize_dataset_path(data_path)

Normalise any dataset directory input so workers rely on consistent paths.

Return type:

str

prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)

Create (and optionally reset) a deterministic output directory.

Return type:

Path

setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
Return type:

Any

setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)

Prepare normalised input/output dataset paths without relying on worker args.

Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.

Return type:

SimpleNamespace

static start(worker_inst)

Invoke the concrete worker’s start hook once initialised.

stop()

Returns:

to_toml(settings_path=None, section=None, create_missing=True)
Return type:

None

verbose = 1
work_done(df=None)[source]

Handles the post-processing of the DataFrame after work_pool execution.

Parameters:

df (pl.DataFrame, optional) – The Polars DataFrame to process. Defaults to None.

Raises:

ValueError – If an unsupported output format is specified.

Return type:

None

work_pool(x=None)[source]

Processes a single task.

Parameters:

x (any, optional) – The task to process. Defaults to None.

Returns:

A Polars DataFrame with the processed results.

Return type:

pl.DataFrame

works(workers_plan, workers_plan_metadata)[source]

Executes worker tasks based on the distribution tree.

Parameters:
  • workers_plan (any) – Distribution tree structure.

  • workers_plan_metadata (any) – Additional information about the workers.

Returns:

Execution time in seconds.

Return type:

float