agi-node API
Path handling
Workers resolve any dataset or workspace URI through
BaseWorker.normalize_data_uri before touching the filesystem. The helper
accepts str or pathlib.Path inputs and normalises them as follows:
- UNC-style shares on Windows (for example \\server\share) are preserved by wrapping them in PureWindowsPath and returning the os.path.normpath representation, so network drives keep their double backslashes.
- Every other value is first joined against env.agi_share_path_abs when the environment exposes one; otherwise the helper falls back to the current home directory. This lets configuration files ship entries such as <app>/dataset while still resolving to the correct worker share ($HOME/clustershare/<app>/dataset on macOS, the mounted share on Linux remotes, and so on).
- Path.resolve(strict=False) is applied so that symlinks or bind mounts created by installers are honoured, while strict=False still accepts directories that are only created later during the run.
- On non-managed Windows installs the helper also attempts to map the dataset under \\127.0.0.1\… via net use, so local workers gain access to the same share paths used on Linux.
- The function emits POSIX-style strings on Unix-like systems and uses os.path.normpath on Windows, so downstream code can hand the value to shell commands or pathlib without further tweaks.
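These rules can be approximated in a standalone function. This is a minimal sketch, not BaseWorker's implementation: the name normalize_data_uri_sketch and its share_root parameter (standing in for env.agi_share_path_abs) are hypothetical, and the net use mapping step is left out.

```python
import os
from pathlib import Path, PureWindowsPath


def normalize_data_uri_sketch(data_uri, share_root=None):
    """Hypothetical re-creation of the documented rules, not BaseWorker's code."""
    raw = str(data_uri)

    # UNC shares (\\server\share): keep the double backslashes intact.
    if raw.startswith("\\\\"):
        return os.path.normpath(str(PureWindowsPath(raw)))

    # Join against the share root (stand-in for env.agi_share_path_abs),
    # falling back to the home directory. Absolute inputs replace the base,
    # which is how pathlib's "/" operator behaves.
    base = Path(share_root) if share_root else Path.home()
    path = base / raw

    # strict=False accepts directories that will only be created later.
    path = path.resolve(strict=False)

    # normpath on Windows, POSIX-style strings everywhere else.
    return os.path.normpath(str(path)) if os.name == "nt" else path.as_posix()
```

On a Linux host, a relative entry such as myapp/dataset with share_root="/srv/share" ends up as /srv/share/myapp/dataset after symlink resolution.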
All built-in workers, including flight_worker and mycode_worker, call
this helper in start and work_pool to populate self.args.data_uri
and any per-file path passed to the pool. Extending these workers means you get
consistent path semantics across local and distributed executions without
copy-pasting platform-specific logic.
Argument helpers
Recent updates to BaseWorker standardise how workers load, merge, and persist
their argument models. Every subclass can opt into the following hooks:
- default_settings_path and default_settings_section control the TOML source used by from_toml/to_toml.
- args_loader and args_merger are callables that fetch and combine raw settings with user overrides before instantiating the worker.
- args_ensure_defaults lets workers patch derived values (for example, normalising paths) after the merge but before instantiation.
- args_dumper and args_dump_mode define how to_toml emits the active configuration, enabling round-trips back into app_settings.toml.
If these helpers live in the worker module (for example load_args or
dump_args defined alongside the class) or inside a sibling *_args/app_args
module, BaseWorker auto-binds them during class creation. That lets most apps
drop the explicit args_loader = … boilerplate while still allowing manual
overrides for custom integrations.
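The hook order described above (load, merge, ensure defaults, instantiate, dump) can be illustrated with a small standalone class. This is a sketch under assumptions: HookedWorker, Args, build and dump are hypothetical stand-ins, TOML file I/O is replaced by an in-memory dict, and the auto-binding from sibling modules is not reproduced.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class Args:  # hypothetical argument model
    data_uri: str = "dataset"
    nfile: int = 1


def load_args(section: dict) -> Args:
    # Loader: turn the raw settings section into the argument model.
    return Args(**section)


def merge_args(base: Args, overrides: Optional[dict]) -> Args:
    # Merger: user overrides win over values from the settings source.
    return Args(**{**asdict(base), **(overrides or {})})


def ensure_defaults(args: Args) -> Args:
    # Post-merge patch of derived values (here: drop trailing slashes).
    args.data_uri = args.data_uri.rstrip("/")
    return args


class HookedWorker:
    # Explicitly bound hooks; staticmethod keeps them plain callables.
    args_loader = staticmethod(load_args)
    args_merger = staticmethod(merge_args)
    args_ensure_defaults = staticmethod(ensure_defaults)

    def __init__(self, args: Args):
        self.args = args

    @classmethod
    def build(cls, settings: dict, section: str = "args", **overrides):
        # load -> merge -> ensure defaults -> instantiate
        args = cls.args_loader(settings.get(section, {}))
        args = cls.args_merger(args, overrides)
        args = cls.args_ensure_defaults(args)
        return cls(args)

    def dump(self, section: str = "args") -> dict:
        # Dumper stand-in: emit a structure that build() can read back.
        return {section: asdict(self.args)}
```

Keeping each stage a separate class attribute is what allows a subclass to replace one step, such as the merger, without re-implementing the rest of the pipeline.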
Managed PC path remapping
- managed_pc_path_fields lists argument attributes that should be remapped to the managed-machine workspace (~/MyApp by default) when AgiEnv reports a managed PC.
- managed_pc_home_suffix customises the managed workspace folder name if your deployment uses something other than MyApp.
- BaseWorker.from_toml applies the remapping automatically; when instantiating a worker manually, use setup_args to apply defaults and remap paths in a single call.
- setup_args optionally accepts output_field (e.g. "data_uri") along with output_subdir, output_attr, output_clean and output_parents_up, so managers can prepare their output directories without repeating boilerplate.
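The remapping can be pictured as re-rooting selected fields under ~/<managed_pc_home_suffix>. The exact rule BaseWorker applies is not spelled out here, so this sketch assumes that absolute paths under the home directory are re-rooted under the managed workspace, relative paths are placed inside it, and anything else is left alone; remap_managed_paths and its parameters are hypothetical.

```python
from pathlib import Path
from types import SimpleNamespace
from typing import Iterable


def remap_managed_paths(args, fields: Iterable[str], *, is_managed_pc: bool,
                        home: Path, suffix: str = "MyApp"):
    """Hypothetical remapping rule; BaseWorker's exact behaviour may differ."""
    if not is_managed_pc:
        return args  # Unmanaged machines keep their paths.
    workspace = home / suffix
    for name in fields:
        value = getattr(args, name, None)
        if value is None:
            continue
        path = Path(value)
        if path.is_absolute():
            if path == workspace or workspace in path.parents:
                continue  # Already inside the managed workspace.
            try:
                path = path.relative_to(home)  # Re-root paths under home.
            except ValueError:
                continue  # Paths outside home are left untouched.
        setattr(args, name, str(workspace / path))
    return args


args = SimpleNamespace(data_uri="/home/alice/flight/dataset", output="results")
remap_managed_paths(args, ("data_uri", "output"), is_managed_pc=True,
                    home=Path("/home/alice"))
```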
Output directory helpers
prepare_output_dir centralises the setup of manager-side output folders (defaulting to a dataframe subdirectory). Hand it the base path you want to target and it normalises the path, clears old contents when auto_clean_data_out is enabled, creates the directory, and stores it on self.data_out unless you override the target attribute.
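A minimal standalone version of those steps follows, assuming the target is root / subdir and that cleaning is a plain recursive delete. OutputDirMixin is a hypothetical name, the clean keyword from the reference signature stands in for auto_clean_data_out, and the real method's path normalisation may differ.

```python
import shutil
import tempfile
from pathlib import Path


class OutputDirMixin:
    """Hypothetical mixin mirroring the documented prepare_output_dir steps."""

    def prepare_output_dir(self, root, *, subdir="dataframe",
                           attribute="data_out", clean=True) -> Path:
        # Normalise: expand "~" and resolve to an absolute path.
        target = (Path(root).expanduser() / subdir).resolve(strict=False)
        if clean and target.exists():
            shutil.rmtree(target)  # Drop results from a previous run.
        target.mkdir(parents=True, exist_ok=True)
        setattr(self, attribute, target)  # self.data_out by default
        return target


worker = OutputDirMixin()
base = Path(tempfile.mkdtemp())
out = worker.prepare_output_dir(base)
(out / "stale.csv").write_text("old")
out = worker.prepare_output_dir(base)  # clean=True wipes stale.csv
```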
With these attributes in place, BaseWorker.from_toml produces a configured
instance and BaseWorker.to_toml writes the updated schema without each app
copying boilerplate. BaseWorker.as_dict exposes a serialisable payload for
Streamlit pages and API consumers, while _extend_payload stays available for
apps that need to enrich the exported structure.
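The split between a base payload and an optional enrichment hook can be sketched as follows. Only the as_dict and _extend_payload names come from the text; FlightArgs, PayloadWorker, EnrichedWorker and the Path-to-string conversion are assumptions made for illustration.

```python
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, Optional


@dataclass
class FlightArgs:  # hypothetical argument model
    data_uri: Path
    nfile: int = 1


class PayloadWorker:
    def __init__(self, args: FlightArgs):
        self.args = args

    def as_dict(self, mode: Optional[str] = None) -> Dict[str, Any]:
        # `mode` mirrors the documented signature but is unused in this sketch.
        # Paths become strings so the payload is JSON-serialisable.
        payload = {key: str(value) if isinstance(value, Path) else value
                   for key, value in asdict(self.args).items()}
        return self._extend_payload(payload)

    def _extend_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        return payload  # Default hook: nothing to add.


class EnrichedWorker(PayloadWorker):
    def _extend_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        payload["worker"] = type(self).__name__  # App-specific enrichment
        return payload


payload = EnrichedWorker(FlightArgs(Path("/data/flight"), nfile=2)).as_dict()
print(json.dumps(payload))
```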
Reference
dag_worker
- class agi_node.dag_worker.dag_worker.DagWorker
Bases: BaseWorker
Minimal-change DAG worker. It keeps your existing structure, adds a tiny signature-aware _invoke() so custom methods can vary in signature, and uses _invoke() at the single call site in ._exec_multi_process().
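A signature-aware call of this kind can be built on inspect.signature: pass only the keyword arguments a method declares, or everything when it accepts **kwargs. The invoke helper below is a hypothetical sketch, not DagWorker's _invoke source; positional-only parameters are deliberately not filled.

```python
import inspect
from typing import Any, Callable


def invoke(method: Callable[..., Any], **available: Any) -> Any:
    """Call `method` with the subset of `available` it can accept."""
    params = inspect.signature(method).parameters.values()
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        return method(**available)  # **kwargs: forward everything.
    # Only parameters that can be passed by keyword are candidates.
    accepted = {
        p.name for p in params
        if p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD,
                      inspect.Parameter.KEYWORD_ONLY)
    }
    return method(**{k: v for k, v in available.items() if k in accepted})
```

Because inspect.signature drops self on bound methods, the same helper works for worker methods such as self.step(worker_id) and for free functions.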
- args_dump_mode: ClassVar[str] = 'json'
- args_dumper: ClassVar[Callable[..., None] | None] = None
- args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
- args_loader: ClassVar[Callable[..., Any] | None] = None
- args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
- as_dict(mode=None)
- Return type:
dict[str, Any]
- break()
Signal the service loop to exit on this worker.
- Return type:
bool
- static break_loop()
Signal the service loop to exit on this worker.
- Return type:
bool
- default_settings_path: ClassVar[str] = 'app_settings.toml'
- default_settings_section: ClassVar[str] = 'args'
- env: Optional[AgiEnv] = None
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None.
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- classmethod from_toml(env, settings_path=None, section=None, **overrides)
- Return type:
BaseWorker
- static loop(*, poll_interval=None)
Run a long-lived service loop on this worker until signalled to stop.
The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.
- Return type:
Dict[str, Any]
- managed_pc_home_suffix: ClassVar[str] = 'MyApp'
- managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
- static normalize_dataset_path(data_path)
Normalise any dataset directory input so workers rely on consistent paths.
- Return type:
str
- prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)
Create (and optionally reset) a deterministic output directory.
- Return type:
Path
- setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
- Return type:
Any
- setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)
Prepare normalised input/output dataset paths without relying on worker args.
Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.
- Return type:
SimpleNamespace
- static start(worker_inst)
Invoke the concrete worker’s start hook once initialised.
- stop()
- to_toml(settings_path=None, section=None, create_missing=True)
- Return type:
None
- verbose = 1
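The loop contract documented for DagWorker above (zero-argument methods are polled, methods taking stop_event or should_stop manage the event themselves, and returning False requests a stop) can be sketched with threading.Event. run_service_loop is a hypothetical driver, and the keys of its returned dict are assumptions; the contents of the real loop's Dict[str, Any] result are not described here.

```python
import inspect
import threading
from typing import Any, Callable, Dict, Optional


def run_service_loop(method: Callable[..., Any],
                     stop_event: Optional[threading.Event] = None,
                     poll_interval: float = 1.0) -> Dict[str, Any]:
    """Hypothetical driver for the documented loop contract."""
    if stop_event is None:
        stop_event = threading.Event()
    params = inspect.signature(method).parameters
    for name in ("stop_event", "should_stop"):
        if name in params:
            # The worker runs its own loop and must honour the event.
            method(**{name: stop_event})
            return {"mode": "worker-managed", "iterations": 1}
    iterations = 0
    while not stop_event.is_set():
        iterations += 1
        if method() is False:
            break  # Returning False requests termination.
        # Sleep between calls, waking early if the event is set.
        stop_event.wait(poll_interval)
    return {"mode": "polled", "iterations": iterations}
```

Using Event.wait instead of time.sleep lets another thread (for example, one calling break_loop) interrupt the pause immediately rather than after a full poll interval.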
pandas_worker
pandas_worker Framework Callback Functions Module
This module provides the PandasWorker class, which extends the foundational functionalities of BaseWorker for processing data using multiprocessing or single-threaded approaches with pandas.
- Classes:
PandasWorker: Worker class for data processing tasks using pandas.
- Internal Libraries:
os, warnings
- External Libraries:
concurrent.futures.ProcessPoolExecutor, pathlib.Path, time, pandas (as pd), BaseWorker (from node import BaseWorker)
- class agi_node.pandas_worker.pandas_worker.PandasWorker
Bases: BaseWorker
PandasWorker Class
Inherits from BaseWorker to provide extended data processing functionalities using pandas.
- verbose
Verbosity level for logging.
- Type:
int
- data_out
Path to the output directory.
- Type:
str
- worker_id
Identifier for the worker instance.
- Type:
int
- args
Configuration arguments for the worker.
- Type:
dict
- args_dump_mode: ClassVar[str] = 'json'
- args_dumper: ClassVar[Callable[..., None] | None] = None
- args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
- args_loader: ClassVar[Callable[..., Any] | None] = None
- args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
- as_dict(mode=None)
- Return type:
dict[str, Any]
- break()
Signal the service loop to exit on this worker.
- Return type:
bool
- static break_loop()
Signal the service loop to exit on this worker.
- Return type:
bool
- default_settings_path: ClassVar[str] = 'app_settings.toml'
- default_settings_section: ClassVar[str] = 'args'
- env: Optional[AgiEnv] = None
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None.
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- classmethod from_toml(env, settings_path=None, section=None, **overrides)
- Return type:
BaseWorker
- static loop(*, poll_interval=None)
Run a long-lived service loop on this worker until signalled to stop.
The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.
- Return type:
Dict[str, Any]
- managed_pc_home_suffix: ClassVar[str] = 'MyApp'
- managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
- static normalize_dataset_path(data_path)
Normalise any dataset directory input so workers rely on consistent paths.
- Return type:
str
- prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)
Create (and optionally reset) a deterministic output directory.
- Return type:
Path
- setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
- Return type:
Any
- setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)
Prepare normalised input/output dataset paths without relying on worker args.
Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.
- Return type:
SimpleNamespace
- static start(worker_inst)
Invoke the concrete worker’s start hook once initialised.
- stop()
- to_toml(settings_path=None, section=None, create_missing=True)
- Return type:
None
- verbose = 1
- work_done(df=None)
Handles the post-processing of the DataFrame after work_pool execution.
- Parameters:
df (pd.DataFrame, optional) – The pandas DataFrame to process. Defaults to None.
- Raises:
ValueError – If an unsupported output format is specified.
- Return type:
None
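A post-processing step of this shape usually dispatches on a configured output format and raises ValueError for anything else. The sketch assumes an output_format setting with csv and json options and a result file written into data_out; work_done_sketch, these format names and the file names are hypothetical, not PandasWorker's actual configuration.

```python
import tempfile
from pathlib import Path
from typing import Optional

import pandas as pd


def work_done_sketch(df: Optional[pd.DataFrame], data_out: Path,
                     output_format: str = "csv") -> None:
    """Hypothetical post-processing step; format names are assumptions."""
    if df is None or df.empty:
        return  # Nothing was produced by work_pool.
    data_out.mkdir(parents=True, exist_ok=True)
    if output_format == "csv":
        df.to_csv(data_out / "result.csv", index=False)
    elif output_format == "json":
        df.to_json(data_out / "result.json", orient="records")
    else:
        raise ValueError(f"Unsupported output format: {output_format!r}")


out_dir = Path(tempfile.mkdtemp())
work_done_sketch(pd.DataFrame({"a": [1, 2]}), out_dir)
```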
polars_worker
polars_worker Framework Callback Functions Module
This module provides the PolarsWorker class, which extends the foundational functionalities of BaseWorker for processing data using multiprocessing or single-threaded approaches.
- Classes:
PolarsWorker: Worker class for data processing tasks.
- Internal Libraries:
os, warnings
- External Libraries:
concurrent.futures.ProcessPoolExecutor, pathlib.Path, time, polars (as pl), BaseWorker (from node import BaseWorker)
- class agi_node.polars_worker.polars_worker.PolarsWorker
Bases: BaseWorker
PolarsWorker Class
Inherits from BaseWorker to provide extended data processing functionalities.
- verbose
Verbosity level for logging.
- Type:
int
- data_out
Path to the output directory.
- Type:
str
- worker_id
Identifier for the worker instance.
- Type:
int
- args
Configuration arguments for the worker.
- Type:
dict
- args_dump_mode: ClassVar[str] = 'json'
- args_dumper: ClassVar[Callable[..., None] | None] = None
- args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
- args_loader: ClassVar[Callable[..., Any] | None] = None
- args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
- as_dict(mode=None)
- Return type:
dict[str, Any]
- break()
Signal the service loop to exit on this worker.
- Return type:
bool
- static break_loop()
Signal the service loop to exit on this worker.
- Return type:
bool
- default_settings_path: ClassVar[str] = 'app_settings.toml'
- default_settings_section: ClassVar[str] = 'args'
- env: Optional[AgiEnv] = None
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None.
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- classmethod from_toml(env, settings_path=None, section=None, **overrides)
- Return type:
BaseWorker
- static loop(*, poll_interval=None)
Run a long-lived service loop on this worker until signalled to stop.
The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.
- Return type:
Dict[str, Any]
- managed_pc_home_suffix: ClassVar[str] = 'MyApp'
- managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
- static normalize_dataset_path(data_path)
Normalise any dataset directory input so workers rely on consistent paths.
- Return type:
str
- prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)
Create (and optionally reset) a deterministic output directory.
- Return type:
Path
- setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
- Return type:
Any
- setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)
Prepare normalised input/output dataset paths without relying on worker args.
Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.
- Return type:
SimpleNamespace
- static start(worker_inst)
Invoke the concrete worker’s start hook once initialised.
- stop()
- to_toml(settings_path=None, section=None, create_missing=True)
- Return type:
None
- verbose = 1
- work_done(df=None)
Handles the post-processing of the DataFrame after work_pool execution.
- Parameters:
df (pl.DataFrame, optional) – The Polars DataFrame to process. Defaults to None.
- Raises:
ValueError – If an unsupported output format is specified.
- Return type:
None