agi_node package
- class agi_node.dag_worker.dag_worker.DagWorker
Bases:
BaseWorker
Minimal-change DAG worker:
- Keeps your existing structure
- Adds a tiny signature-aware _invoke() so custom methods can vary in signature
- Uses _invoke() at the single call site in ._exec_multi_process()
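The idea of a signature-aware call can be sketched as follows. This is a minimal illustration, not the code of DagWorker._invoke(): the helper name invoke and the sample functions are hypothetical, and the real method may select arguments differently. The sketch inspects the target's signature and passes only as many positional candidates as the target declares.

```python
import inspect


def invoke(func, *candidates):
    """Call func with only as many positional arguments as it declares.

    Hypothetical helper for illustration; not the agi_node implementation.
    """
    params = list(inspect.signature(func).parameters.values())
    # A *args parameter can absorb every candidate.
    if any(p.kind is inspect.Parameter.VAR_POSITIONAL for p in params):
        return func(*candidates)
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    n_positional = sum(p.kind in positional_kinds for p in params)
    # Drop the trailing candidates the function has no slot for.
    return func(*candidates[:n_positional])


# Hypothetical custom methods with differing signatures.
def no_args():
    return "no_args"


def one_arg(task):
    return f"one_arg({task})"


def two_args(task, worker_id):
    return f"two_args({task}, {worker_id})"


for fn in (no_args, one_arg, two_args):
    print(invoke(fn, "t", 3))
```

With a single call site of this shape, each custom method receives only the arguments it asks for, so existing methods need no signature changes.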
- args_dump_mode: ClassVar[str] = 'json'
- args_dumper: ClassVar[Callable[..., None] | None] = None
- args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
- args_loader: ClassVar[Callable[..., Any] | None] = None
- args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
- as_dict(mode=None)
- Return type:
dict[str,Any]
- break()
Signal the service loop to exit on this worker.
- Return type:
bool
- static break_loop()
Signal the service loop to exit on this worker.
- Return type:
bool
- default_settings_path: ClassVar[str] = 'app_settings.toml'
- default_settings_section: ClassVar[str] = 'args'
- env: Optional[AgiEnv] = None
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
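The behaviour described above can be approximated with the standard library. The function below is a hypothetical stand-in named expand_path, not the body of expand(); in particular, resolving relative paths against the current working directory when base_directory is None is an assumption, and Windows separator handling is left out.

```python
import os
from pathlib import Path


def expand_path(path, base_directory=None):
    """Return an absolute version of path (illustrative sketch only)."""
    # Expand a leading '~' to the user's home directory.
    expanded = Path(os.path.expanduser(path))
    if not expanded.is_absolute():
        # Assumption: fall back to the current working directory.
        if base_directory:
            base = Path(os.path.expanduser(base_directory))
        else:
            base = Path.cwd()
        expanded = base / expanded
    # abspath also collapses '.' and '..' components.
    return os.path.abspath(expanded)


print(expand_path("~"))
print(expand_path("data", base_directory="~"))
```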
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
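A minimal sketch of the expand-then-join step, using only os.path. The function name join_expanded is hypothetical and the real method may normalise the result differently.

```python
import os


def join_expanded(path1, path2):
    """Expand path1 to an absolute path, then append path2 (sketch only)."""
    expanded = os.path.abspath(os.path.expanduser(path1))
    # os.path.join discards 'expanded' when path2 is itself absolute.
    return os.path.join(expanded, path2)


print(join_expanded("~/data", "out.csv"))
```

Because os.path.join restarts from any absolute component, path2 is best passed as a relative path.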
- classmethod from_toml(env, settings_path=None, section=None, **overrides)
- Return type:
BaseWorker
- static loop(*, poll_interval=None)
Run a long-lived service loop on this worker until signalled to stop.
The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.
- Return type:
Dict[str,Any]
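The two dispatch modes described above can be sketched like this. The driver run_service_loop, its integer return value, and the sample worker functions are hypothetical; the real loop() returns a Dict[str, Any] whose contents are not documented here, and its stop mechanism may differ from the threading.Event assumed below.

```python
import inspect
import threading


def run_service_loop(worker_loop, stop_event, poll_interval=0.01):
    """Drive worker_loop until stopped; return how often it was invoked."""
    params = inspect.signature(worker_loop).parameters
    for name in ("stop_event", "should_stop"):
        if name in params:
            # The worker accepts the event, so it owns the loop.
            worker_loop(**{name: stop_event})
            return 1
    count = 0
    while not stop_event.is_set():
        count += 1
        # Returning False requests termination of the loop.
        if worker_loop() is False:
            break
        # Interruptible sleep between polls.
        stop_event.wait(poll_interval)
    return count


calls = []


def polled():
    """Zero-argument worker: stops itself on the third call."""
    calls.append(len(calls))
    return len(calls) < 3


def event_aware(stop_event):
    """Event-aware worker: runs its own loop and honours the event."""
    seen = 0
    while not stop_event.is_set():
        seen += 1
        if seen == 2:
            stop_event.set()


print(run_service_loop(polled, threading.Event()))
print(run_service_loop(event_aware, threading.Event()))
```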
- managed_pc_home_suffix: ClassVar[str] = 'MyApp'
- managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
- static normalize_dataset_path(data_path)
Normalise any dataset directory input so workers rely on consistent paths.
- Return type:
str
- prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)
Create (and optionally reset) a deterministic output directory.
- Return type:
Path
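A sketch of "create and optionally reset" using pathlib and shutil. The name prepare_dir_sketch is hypothetical. The documented method also takes an attribute parameter (default 'data_out'), which presumably names the worker attribute that receives the path; the sketch omits that step.

```python
import shutil
import tempfile
from pathlib import Path


def prepare_dir_sketch(root, subdir="dataframe", clean=True):
    """Return root/subdir, emptied first when clean is True (sketch only)."""
    target = Path(root) / subdir
    if clean and target.exists():
        # Reset: drop any output left over from a previous run.
        shutil.rmtree(target)
    target.mkdir(parents=True, exist_ok=True)
    return target


demo_root = tempfile.mkdtemp()
out_dir = prepare_dir_sketch(demo_root)
print(out_dir.name, out_dir.is_dir())
```

The directory is deterministic in the sense that the same root and subdir always yield the same location, so repeated runs overwrite rather than accumulate output.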
- setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
- Return type:
Any
- setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)
Prepare normalised input/output dataset paths without relying on worker args.
Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.
- Return type:
SimpleNamespace
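The shape of the returned namespace can be illustrated as follows. Only the four field names come from the description above; the function name setup_dirs_sketch is hypothetical, and two behaviours are assumptions: that the default output directory sits next to the input under target_subdir, and that "normalised" means a POSIX-style string.

```python
import shutil
import tempfile
from pathlib import Path
from types import SimpleNamespace


def setup_dirs_sketch(source_path, target_path=None,
                      target_subdir="dataframe", reset_target=False):
    """Resolve input/output directories into a namespace (sketch only)."""
    input_path = Path(source_path).expanduser().resolve()
    if target_path is not None:
        output_path = Path(target_path).expanduser().resolve()
    else:
        # Assumption: default output lives beside the input directory.
        output_path = input_path.parent / target_subdir
    if reset_target and output_path.exists():
        shutil.rmtree(output_path)
    output_path.mkdir(parents=True, exist_ok=True)
    return SimpleNamespace(
        input_path=input_path,
        normalized_input=input_path.as_posix(),   # assumed normal form
        output_path=output_path,
        normalized_output=output_path.as_posix(),
    )


demo_src = Path(tempfile.mkdtemp()) / "dataset"
demo_src.mkdir()
dirs = setup_dirs_sketch(demo_src)
print(dirs.output_path.name, dirs.output_path.is_dir())
```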
- static start(worker_inst)
Invoke the concrete worker’s start hook once initialised.
- stop()
- to_toml(settings_path=None, section=None, create_missing=True)
- Return type:
None
- verbose = 1
pandas_worker Framework Callback Functions Module
This module provides the PandasWorker class, which extends the foundational functionalities of BaseWorker for processing data using multiprocessing or single-threaded approaches with pandas.
- Classes:
PandasWorker: Worker class for data processing tasks using pandas.
- Internal Libraries:
os, warnings
- External Libraries:
concurrent.futures.ProcessPoolExecutor, pathlib.Path, time, pandas as pd, BaseWorker (from node import BaseWorker)
- class agi_node.pandas_worker.pandas_worker.PandasWorker
Bases:
BaseWorker
PandasWorker Class
Inherits from BaseWorker to provide extended data processing functionalities using pandas.
- verbose
Verbosity level for logging.
- Type:
int
- data_out
Path to the output directory.
- Type:
str
- worker_id
Identifier for the worker instance.
- Type:
int
- args
Configuration arguments for the worker.
- Type:
dict
- args_dump_mode: ClassVar[str] = 'json'
- args_dumper: ClassVar[Callable[..., None] | None] = None
- args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
- args_loader: ClassVar[Callable[..., Any] | None] = None
- args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
- as_dict(mode=None)
- Return type:
dict[str,Any]
- break()
Signal the service loop to exit on this worker.
- Return type:
bool
- static break_loop()
Signal the service loop to exit on this worker.
- Return type:
bool
- default_settings_path: ClassVar[str] = 'app_settings.toml'
- default_settings_section: ClassVar[str] = 'args'
- env: Optional[AgiEnv] = None
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- classmethod from_toml(env, settings_path=None, section=None, **overrides)
- Return type:
BaseWorker
- static loop(*, poll_interval=None)
Run a long-lived service loop on this worker until signalled to stop.
The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.
- Return type:
Dict[str,Any]
- managed_pc_home_suffix: ClassVar[str] = 'MyApp'
- managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
- static normalize_dataset_path(data_path)
Normalise any dataset directory input so workers rely on consistent paths.
- Return type:
str
- prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)
Create (and optionally reset) a deterministic output directory.
- Return type:
Path
- setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
- Return type:
Any
- setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)
Prepare normalised input/output dataset paths without relying on worker args.
Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.
- Return type:
SimpleNamespace
- static start(worker_inst)
Invoke the concrete worker’s start hook once initialised.
- stop()
- to_toml(settings_path=None, section=None, create_missing=True)
- Return type:
None
- verbose = 1
- work_done(df=None)
Handles the post-processing of the DataFrame after work_pool execution.
- Parameters:
df (pd.DataFrame, optional) – The pandas DataFrame to process. Defaults to None.
- Raises:
ValueError – If an unsupported output format is specified.
- Return type:
None
polars_worker Framework Callback Functions Module
This module provides the PolarsWorker class, which extends the foundational functionalities of BaseWorker for processing data using multiprocessing or single-threaded approaches.
- Classes:
PolarsWorker: Worker class for data processing tasks.
- Internal Libraries:
os, warnings
- External Libraries:
concurrent.futures.ProcessPoolExecutor, pathlib.Path, time, polars as pl, BaseWorker (from node import BaseWorker)
- class agi_node.polars_worker.polars_worker.PolarsWorker
Bases:
BaseWorker
PolarsWorker Class
Inherits from BaseWorker to provide extended data processing functionalities.
- verbose
Verbosity level for logging.
- Type:
int
- data_out
Path to the output directory.
- Type:
str
- worker_id
Identifier for the worker instance.
- Type:
int
- args
Configuration arguments for the worker.
- Type:
dict
- args_dump_mode: ClassVar[str] = 'json'
- args_dumper: ClassVar[Callable[..., None] | None] = None
- args_ensure_defaults: ClassVar[Callable[..., Any] | None] = None
- args_loader: ClassVar[Callable[..., Any] | None] = None
- args_merger: ClassVar[Callable[[Any, Optional[Any]], Any] | None] = None
- as_dict(mode=None)
- Return type:
dict[str,Any]
- break()
Signal the service loop to exit on this worker.
- Return type:
bool
- static break_loop()
Signal the service loop to exit on this worker.
- Return type:
bool
- default_settings_path: ClassVar[str] = 'app_settings.toml'
- default_settings_section: ClassVar[str] = 'args'
- env: Optional[AgiEnv] = None
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- classmethod from_toml(env, settings_path=None, section=None, **overrides)
- Return type:
BaseWorker
- static loop(*, poll_interval=None)
Run a long-lived service loop on this worker until signalled to stop.
The derived worker can implement a loop method accepting either zero arguments or a single stop_event argument. When the method signature accepts stop_event (keyword stop_event or should_stop), the worker implementation is responsible for honouring the event. Otherwise the base implementation repeatedly invokes the method and sleeps for the configured poll interval between calls. Returning False from the worker method requests termination of the loop.
- Return type:
Dict[str,Any]
- managed_pc_home_suffix: ClassVar[str] = 'MyApp'
- managed_pc_path_fields: ClassVar[tuple[str, ...]] = ()
- static normalize_dataset_path(data_path)
Normalise any dataset directory input so workers rely on consistent paths.
- Return type:
str
- prepare_output_dir(root, *, subdir='dataframe', attribute='data_out', clean=True)
Create (and optionally reset) a deterministic output directory.
- Return type:
Path
- setup_args(args, *, env=None, error=None, output_field=None, output_subdir='dataframe', output_attr='data_out', output_clean=True, output_parents_up=0)
- Return type:
Any
- setup_data_directories(*, source_path, target_path=None, target_subdir='dataframe', reset_target=False)
Prepare normalised input/output dataset paths without relying on worker args.
Returns a namespace with the resolved input path (input_path), the normalised string used by downstream readers (normalized_input), the output directory as a Path (output_path), and its normalised string representation (normalized_output). Optionally clears and recreates the output directory.
- Return type:
SimpleNamespace
- static start(worker_inst)
Invoke the concrete worker’s start hook once initialised.
- stop()
- to_toml(settings_path=None, section=None, create_missing=True)
- Return type:
None
- verbose = 1
- work_done(df=None)
Handles the post-processing of the DataFrame after work_pool execution.
- Parameters:
df (pl.DataFrame, optional) – The Polars DataFrame to process. Defaults to None.
- Raises:
ValueError – If an unsupported output format is specified.
- Return type:
None