agi-node API

Path handling

Workers resolve any dataset or workspace URI through BaseWorker.normalize_data_uri before touching the filesystem. The helper accepts str or pathlib.Path inputs and normalises them as follows:

  • UNC-style shares on Windows (for example \\server\share) are preserved by wrapping them in PureWindowsPath and returning the os.path.normpath representation so network drives keep their double backslashes.

  • Every other value is first joined against env.agi_share_path_abs when the environment exposes one; otherwise the helper falls back to the current home directory. This lets configuration files ship entries such as <app>/dataset while still resolving to the correct worker share ($HOME/clustershare/<app>/dataset on macOS, the mounted share on Linux remotes, and so on).

  • Path.resolve(strict=False) is applied so symlinks or bind mounts created by installers are resolved consistently, while directories that do not yet exist (because they are created later during the run) are still accepted.

  • On non-managed Windows installs the helper also attempts to map the dataset under \\127.0.0.1\… via net use so local workers gain access to the same share paths used on Linux.

  • The function emits POSIX-style strings on Unix-like systems and uses os.path.normpath on Windows so downstream code can hand the value to shell commands or pathlib without further tweaks.

All built-in workers, including flight_worker and mycode_worker, call this helper in start and work_pool to populate self.args.data_uri and any per-file path passed to the pool. Workers that extend them inherit consistent path semantics across local and distributed executions without copy-pasting platform-specific logic.
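
A minimal sketch of reusing that normalisation in a custom worker, based on the normalize_dataset_path static method listed in the reference below; the MyAppWorker class and its data_uri argument are illustrative, and the same call applies to per-file paths handed to the pool:

    from agi_node.dag_worker.dag_worker import DagWorker


    class MyAppWorker(DagWorker):  # hypothetical app worker
        def start(self):
            # A "<app>/dataset" entry from app_settings.toml resolves to the worker
            # share ($HOME/clustershare/<app>/dataset, a mounted share on remotes, ...).
            self.args.data_uri = self.normalize_dataset_path(self.args.data_uri)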

Argument helpers

Recent updates to BaseWorker standardise how workers load, merge, and persist their argument models. Every subclass can opt into the following hooks:

  • default_settings_path and default_settings_section control the TOML source used by from_toml / to_toml.

  • args_loader and args_merger are callables that fetch and combine raw settings with user overrides before instantiating the worker.

  • args_ensure_defaults lets workers patch derived values (for example, normalising paths) after the merge but before instantiation.

  • args_dumper and args_dump_mode define how to_toml emits the active configuration, enabling round-trips back into app_settings.toml.

If these helpers live in the worker module (for example load_args or dump_args defined alongside the class) or inside a sibling *_args/app_args module, BaseWorker auto-binds them during class creation. That lets most apps drop explicit args_loader-style assignments while still allowing manual overrides for custom integrations.
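
A sketch of the auto-binding in a hypothetical app module; the load_args name follows the convention above, but its exact signature and the MyAppArgs model are assumptions:

    from dataclasses import dataclass

    from agi_node.dag_worker.dag_worker import DagWorker


    @dataclass
    class MyAppArgs:  # illustrative argument model
        data_uri: str = "my_app/dataset"
        nfile: int = 1


    def load_args(settings):
        # Defined alongside the worker class, so BaseWorker auto-binds it as args_loader.
        return MyAppArgs(**settings)


    class MyAppWorker(DagWorker):
        # Explicit assignments remain possible when auto-binding is not wanted:
        # args_loader = staticmethod(load_args)
        default_settings_path = "app_settings.toml"
        default_settings_section = "args"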

Managed PC path remapping

  • managed_pc_path_fields lists argument attributes that should be remapped to the managed-machine workspace (~/MyApp by default) when AgiEnv reports a managed PC.

  • managed_pc_home_suffix customises the managed workspace folder name if your deployment uses something other than MyApp.

  • BaseWorker.from_toml applies the remapping automatically; when instantiating a worker manually, use setup_args to apply defaults and remap paths in a single call.

  • setup_args optionally accepts output_field (e.g. "data_uri") along with output_subdir, output_attr, output_clean and output_parents_up so managers can prepare their output directories without repeating boilerplate, as the sketch below shows.
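
A sketch of both mechanisms in a hypothetical worker; the report_path field and the reassignment of the returned model are assumptions:

    from agi_node.dag_worker.dag_worker import DagWorker


    class MyAppWorker(DagWorker):  # hypothetical worker
        # Argument fields remapped under the managed workspace (~/MyApp) on managed PCs.
        managed_pc_path_fields = ("data_uri", "report_path")
        managed_pc_home_suffix = "MyApp"

        def start(self):
            # Manual path: apply defaults, remap managed-PC fields and prepare the
            # output directory (stored on self.data_out by default) in one call.
            self.args = self.setup_args(
                self.args,
                env=self.env,
                output_field="data_uri",
                output_subdir="dataframe",
                output_clean=True,
            )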

Output directory helpers

  • prepare_output_dir centralises the setup of manager-side output folders (the subdirectory defaults to dataframe). Hand it the base path you want to target and it normalises the path, clears old contents when auto_clean_data_out is enabled, creates the directory, and stores it on self.data_out unless you override the target attribute.

With these attributes in place, BaseWorker.from_toml produces a configured instance and BaseWorker.to_toml writes the updated schema without each app copying boilerplate. BaseWorker.as_dict exposes a serialisable payload for Streamlit pages and API consumers, while _extend_payload stays available for apps that need to enrich the exported structure.
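
A round-trip sketch under those defaults; env is an AgiEnv instance supplied by the application, and the nfile override plus the data_uri field on the argument model are illustrative:

    from agi_node.pandas_worker.pandas_worker import PandasWorker


    class MyAppWorker(PandasWorker):  # hypothetical app worker
        default_settings_path = "app_settings.toml"
        default_settings_section = "args"


    worker = MyAppWorker.from_toml(env, nfile=2)     # load [args], apply overrides
    worker.prepare_output_dir(worker.args.data_uri)  # output dir; subdir defaults to "dataframe"
    worker.to_toml()                                 # persist the active configuration
    payload = worker.as_dict()                       # serialisable view for UI/API consumers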

Reference

dag_worker

Class diagram for dag_worker
class agi_node.dag_worker.dag_worker.DagWorker[source]

Bases: BaseWorker

Minimal-change DAG worker:
  • Keeps your existing structure

  • Adds a tiny signature-aware _invoke() so custom methods can vary in signature

  • Uses _invoke() at the single call site in ._exec_multi_process()
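
The _invoke helper itself is internal; the sketch below only illustrates the signature-aware idea (inspect the target's parameters and forward prev_result only when it is accepted), not the actual implementation:

    import inspect


    def invoke(method, args, prev_result):
        # Illustrative: forward prev_result only to methods whose signature accepts it.
        params = inspect.signature(method).parameters
        if "prev_result" in params:
            return method(*args, prev_result=prev_result)
        return method(*args)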

args_dump_mode: ClassVar[str] = 'json'
args_dumper: ClassVar[Callable[..., None] | None] = None
args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
args_loader: ClassVar[Callable[..., Any] | None] = None
args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
as_dict(mode=None)
Return type:

dict[str, Any]

break()

Signal the service loop to exit on this worker.

Return type:

bool

static break_loop()

Signal the service loop to exit on this worker.

Return type:

bool

default_settings_path: ClassVar[str] = 'app_settings.toml'
default_settings_section: ClassVar[str] = 'args'
env: Optional[AgiEnv] = None
static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str
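
A small usage sketch of the two path utilities; the concrete results depend on the platform and the current user's home directory:

    from agi_node.dag_worker.dag_worker import DagWorker

    # '~' expands to the user's home directory; relative inputs resolve against
    # base_directory when one is given.
    dataset = DagWorker.expand("~/clustershare/flight/dataset")
    report = DagWorker.expand_and_join("~/clustershare", "flight/report.csv")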

classmethod from_toml(env, settings_path=None, section=None, **overrides)
Return type:

BaseWorker

get_work(fn_name, args, prev_result)[source]

Back-compat: delegate to the signature-aware invoker.

static loop(*, poll_interval=None)

Run a long-lived service loop on this worker until signalled to stop.

The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.

Return type:

Dict[str, Any]
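
A sketch of the two accepted shapes for the derived hook; the worker names and process_next_batch are illustrative:

    from agi_node.dag_worker.dag_worker import DagWorker


    class PollingWorker(DagWorker):
        def loop(self):
            # Zero-argument form: the base implementation re-invokes this method,
            # sleeping poll_interval between calls; returning False stops the service.
            return self.process_next_batch()


    class EventAwareWorker(DagWorker):
        def loop(self, stop_event):
            # stop_event form: this implementation owns the loop and must honour the event.
            while not stop_event.is_set():
                self.process_next_batch()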

managed_pc_home_suffix: ClassVar[str] = 'MyApp'
managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
static normalize_dataset_path(data_path)

Normalise any dataset directory input so workers rely on consistent paths.

Return type:

str

prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)

Create (and optionally reset) a deterministic output directory.

Return type:

Path

setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
Return type:

Any

setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)

Prepare normalised input/output dataset paths without relying on worker args.

Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.

Return type:

SimpleNamespace
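
A usage sketch from inside a hypothetical worker; the source path and the attribute names stored on the instance are illustrative:

    from agi_node.dag_worker.dag_worker import DagWorker


    class MyAppWorker(DagWorker):  # hypothetical worker
        def start(self):
            dirs = self.setup_data_directories(
                source_path="~/data/flight/dataset",  # illustrative input location
                target_subdir="dataframe",
                reset_target=True,                    # clear and recreate the output dir
            )
            self.input_uri = dirs.normalized_input    # string handed to readers
            self.output_dir = dirs.output_path        # output directory as a Path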

static start(worker_inst)

Invoke the concrete worker’s start hook once initialised.

stop()

Returns:

to_toml(settings_path=None, section=None, create_missing=True)
Return type:

None

verbose = 1
works(workers_plan, workers_plan_metadata)[source]

Your existing entry point; keep it as-is and simply call the multiprocess path for mode 4, etc.

pandas_worker

Class diagram for pandas_worker

pandas_worker Framework Callback Functions Module

This module provides the PandasWorker class, which extends the foundational functionalities of BaseWorker for processing data using multiprocessing or single-threaded approaches with pandas.

Classes:

PandasWorker: Worker class for data processing tasks using pandas.

Internal Libraries:

os, warnings

External Libraries:

concurrent.futures.ProcessPoolExecutor, pathlib.Path, time, pandas as pd, and BaseWorker from the node package

class agi_node.pandas_worker.pandas_worker.PandasWorker[source]

Bases: BaseWorker

PandasWorker Class

Inherits from BaseWorker to provide extended data processing functionalities using pandas.

verbose

Verbosity level for logging.

Type:

int

data_out

Path to the output directory.

Type:

str

worker_id

Identifier for the worker instance.

Type:

int

args

Configuration arguments for the worker.

Type:

dict

args_dump_mode: ClassVar[str] = 'json'
args_dumper: ClassVar[Callable[..., None] | None] = None
args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
args_loader: ClassVar[Callable[..., Any] | None] = None
args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
as_dict(mode=None)
Return type:

dict[str, Any]

break()

Signal the service loop to exit on this worker.

Return type:

bool

static break_loop()

Signal the service loop to exit on this worker.

Return type:

bool

default_settings_path: ClassVar[str] = 'app_settings.toml'
default_settings_section: ClassVar[str] = 'args'
env: Optional[AgiEnv] = None
static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

classmethod from_toml(env, settings_path=None, section=None, **overrides)
Return type:

BaseWorker

static loop(*, poll_interval=None)

Run a long-lived service loop on this worker until signalled to stop.

The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.

Return type:

Dict[str, Any]

managed_pc_home_suffix: ClassVar[str] = 'MyApp'
managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
static normalize_dataset_path(data_path)

Normalise any dataset directory input so workers rely on consistent paths.

Return type:

str

prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)

Create (and optionally reset) a deterministic output directory.

Return type:

Path

setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
Return type:

Any

setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)

Prepare normalised input/output dataset paths without relying on worker args.

Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.

Return type:

SimpleNamespace

static start(worker_inst)

Invoke the concrete worker’s start hook once initialised.

stop()

Returns:

to_toml(settings_path=None, section=None, create_missing=True)
Return type:

None

verbose = 1
work_done(df=None)[source]

Handles the post-processing of the DataFrame after work_pool execution.

Parameters:

df (pd.DataFrame, optional) – The pandas DataFrame to process. Defaults to None.

Raises:

ValueError – If an unsupported output format is specified.

Return type:

None

work_pool(x=None)[source]

Processes a single task.

Parameters:

x (any, optional) – The task to process. Defaults to None.

Returns:

A pandas DataFrame with the processed results.

Return type:

pd.DataFrame

works(workers_plan, workers_plan_metadata)[source]

Executes worker tasks based on the distribution tree.

Parameters:
  • workers_plan (any) – Distribution tree structure.

  • workers_plan_metadata (any) – Additional information about the workers.

Returns:

Execution time in seconds.

Return type:

float
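
A minimal sketch of the callback contract; the CSV/Parquet handling and the work_done override are illustrative rather than the base class's built-in output handling:

    import pandas as pd

    from agi_node.pandas_worker.pandas_worker import PandasWorker


    class MyAppWorker(PandasWorker):  # hypothetical app worker
        def work_pool(self, x=None):
            # One task from the distribution tree (here assumed to be a CSV path).
            return pd.read_csv(self.normalize_dataset_path(x))

        def work_done(self, df=None):
            # Post-process the frame returned by work_pool, e.g. write it under data_out.
            if df is not None:
                df.to_parquet(f"{self.data_out}/result_{self.worker_id}.parquet")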

polars_worker

Class diagram for polars_worker

polars_worker Framework Callback Functions Module

This module provides the PolarsWorker class, which extends the foundational functionalities of BaseWorker for processing data using multiprocessing or single-threaded approaches.

Classes:

PolarsWorker: Worker class for data processing tasks.

Internal Libraries:

os, warnings

External Libraries:

concurrent.futures.ProcessPoolExecutor, pathlib.Path, time, polars as pl, and BaseWorker from the node package

class agi_node.polars_worker.polars_worker.PolarsWorker[source]

Bases: BaseWorker

PolarsWorker Class

Inherits from BaseWorker to provide extended data processing functionalities.

verbose

Verbosity level for logging.

Type:

int

data_out

Path to the output directory.

Type:

str

worker_id

Identifier for the worker instance.

Type:

int

args

Configuration arguments for the worker.

Type:

dict

args_dump_mode: ClassVar[str] = 'json'
args_dumper: ClassVar[Callable[..., None] | None] = None
args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
args_loader: ClassVar[Callable[..., Any] | None] = None
args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
as_dict(mode=None)
Return type:

dict[str, Any]

break()

Signal the service loop to exit on this worker.

Return type:

bool

static break_loop()

Signal the service loop to exit on this worker.

Return type:

bool

default_settings_path: ClassVar[str] = 'app_settings.toml'
default_settings_section: ClassVar[str] = 'args'
env: Optional[AgiEnv] = None
static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

classmethod from_toml(env, settings_path=None, section=None, **overrides)
Return type:

BaseWorker

static loop(*, poll_interval=None)

Run a long-lived service loop on this worker until signalled to stop.

The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.

Return type:

Dict[str, Any]

managed_pc_home_suffix: ClassVar[str] = 'MyApp'
managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
static normalize_dataset_path(data_path)

Normalise any dataset directory input so workers rely on consistent paths.

Return type:

str

prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)

Create (and optionally reset) a deterministic output directory.

Return type:

Path

setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
Return type:

Any

setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)

Prepare normalised input/output dataset paths without relying on worker args.

Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.

Return type:

SimpleNamespace

static start(worker_inst)

Invoke the concrete worker’s start hook once initialised.

stop()

Returns:

to_toml(settings_path=None, section=None, create_missing=True)
Return type:

None

verbose = 1
work_done(df=None)[source]

Handles the post-processing of the DataFrame after work_pool execution.

Parameters:

df (pl.DataFrame, optional) – The Polars DataFrame to process. Defaults to None.

Raises:

ValueError – If an unsupported output format is specified.

Return type:

None

work_pool(x=None)[source]

Processes a single task.

Parameters:

x (any, optional) – The task to process. Defaults to None.

Returns:

A Polars DataFrame with the processed results.

Return type:

pl.DataFrame

works(workers_plan, workers_plan_metadata)[source]

Executes worker tasks based on the distribution tree.

Parameters:
  • workers_plan (any) – Distribution tree structure.

  • workers_plan_metadata (any) – Additional information about the workers.

Returns:

Execution time in seconds.

Return type:

float
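
The same contract applies here with Polars frames; a brief sketch, assuming each task is a Parquet file path:

    import polars as pl

    from agi_node.polars_worker.polars_worker import PolarsWorker


    class MyAppWorker(PolarsWorker):  # hypothetical app worker
        def work_pool(self, x=None):
            # One task per input file; work_done post-processes the returned frame.
            return pl.read_parquet(self.normalize_dataset_path(x))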