# BSD 3-Clause License
#
# [License Text Remains Unchanged]
#
# (Include the full BSD 3-Clause License text here)
"""
pandas_worker: pandas-based worker module
=========================================
This module provides the `PandasWorker` class, which extends the foundational
functionalities of `BaseWorker` for processing data using multiprocessing or
single-threaded approaches with pandas.
Classes:
PandasWorker: Worker class for data processing tasks using pandas.
Standard Library:
logging
multiprocessing
concurrent.futures.ProcessPoolExecutor
pathlib.Path
typing
Third-party Libraries:
numpy as np
pandas as pd
Project Modules:
agi_node.agi_dispatcher.BaseWorker
agi_node.agi_dispatcher.worker_pool_support
"""
# Standard library:
import logging
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import Any, Optional

# Third-party libraries:
import numpy as np
import pandas as pd

# Project modules:
from agi_node.agi_dispatcher import BaseWorker
from agi_node.agi_dispatcher import worker_pool_support
# Logger named after this module, so its output can be configured separately
# from the root logger.
logger = logging.getLogger(name=__name__)
def _process_pool_factory(*, max_workers, initializer, initargs):
    """Create the ``ProcessPoolExecutor`` used by the pool execution path.

    The executor class is looked up in this module's globals at call time, so
    tests can monkeypatch ``pandas_worker.ProcessPoolExecutor``. Child
    processes are always started with the ``spawn`` method, which keeps their
    startup behaviour the same on every platform.

    Args:
        max_workers: Maximum number of worker processes in the pool.
        initializer: Callable run in each child process at startup, or None.
        initargs: Positional arguments passed to ``initializer``.

    Returns:
        ProcessPoolExecutor: A new executor bound to a spawn context.
    """
    spawn_ctx = multiprocessing.get_context("spawn")
    pool_kwargs = {
        "max_workers": max_workers,
        "initializer": initializer,
        "initargs": initargs,
        "mp_context": spawn_ctx,
    }
    return ProcessPoolExecutor(**pool_kwargs)
def _concat_labeled(frames, labels):
    """Concatenate *frames* row-wise and tag every row with its frame's label.

    The ``worker_id`` column is built in one allocation rather than per frame.

    Args:
        frames: Iterable of pandas DataFrames to stack vertically. It may be a
            one-shot iterator (e.g. a generator).
        labels: Iterable with exactly one label per frame, in the same order.
            Every row that came from frame ``i`` gets label ``i``.

    Returns:
        pd.DataFrame: The concatenated frame with a fresh integer index and an
        object-dtype ``worker_id`` column. Any ``worker_id`` column already
        present in the inputs is overwritten.

    Raises:
        ValueError: If the number of labels differs from the number of frames,
            or if *frames* is empty (raised by ``pd.concat``).
    """
    # Materialise both inputs exactly once. The frames are needed twice (for
    # the concat and for the per-frame row counts), and consuming a generator
    # twice would silently yield no row counts on the second pass.
    frames = list(frames)
    labels = list(labels)
    if len(labels) != len(frames):
        raise ValueError(
            f"Expected one label per frame, got {len(labels)} labels "
            f"for {len(frames)} frames"
        )
    df = pd.concat(frames, axis=0, ignore_index=True)
    # Repeat each frame's label once per row that frame contributed.
    df["worker_id"] = np.repeat(
        np.asarray(labels, dtype=object),
        [len(frame) for frame in frames],
    )
    return df
# pandas-specific hooks handed to ``worker_pool_support``. Both execution paths
# of PandasWorker (_exec_multi_process and _exec_mono_process) pass them in.
_PANDAS_POOL_HOOKS = worker_pool_support.PoolFrameHooks(
    family="PandasWorker",
    # Pool construction: tasks are dispatched to a process pool created by
    # _process_pool_factory (spawn context).
    executor_kind="process",
    executor_factory=_process_pool_factory,
    # Result predicates: a result is a frame only if it is a pandas DataFrame,
    # and emptiness is read from DataFrame.empty.
    is_empty=lambda frame: frame.empty,
    is_frame=lambda candidate: isinstance(candidate, pd.DataFrame),
    # Result assembly: calling the DataFrame class with no arguments yields an
    # empty frame; labelled concatenation is delegated to _concat_labeled.
    empty_frame=pd.DataFrame,
    concat_labeled=_concat_labeled,
)
class PandasWorker(BaseWorker):
    """
    PandasWorker Class
    --------------------
    Inherits from ``BaseWorker`` to process data with pandas, either in a
    single process or across a process pool.

    Attributes:
        verbose (int): Verbosity level for logging.
        data_out (str): Path to the output directory that ``work_done``
            writes into.
        _worker_id (int): Identifier for the worker instance, used as the
            output file-name prefix (presumably set by ``BaseWorker`` --
            confirm).
        args (dict): Configuration arguments for the worker; ``work_done``
            reads the ``"output_format"`` key.
    """

    def work_pool(self, x: Any = None) -> pd.DataFrame:
        """
        Process a single task.

        Delegates to ``self._actual_work_pool``, which is not defined on this
        class (presumably supplied by a subclass -- confirm) and is expected
        to return a pandas DataFrame.

        Args:
            x (Any, optional): The task to process. Defaults to None.

        Returns:
            pd.DataFrame: A pandas DataFrame with the processed results.
        """
        logger.info("work_pool")
        return self._actual_work_pool(x)  # ty: ignore[unresolved-attribute]

    def work_done(self, df: Optional[pd.DataFrame] = None) -> None:
        """
        Write the DataFrame produced by one ``work_pool`` chunk to disk.

        ``None`` or empty frames are skipped. The first written chunk goes to
        ``<data_out>/<_worker_id>_output.<format>``. Later chunks get a
        ``_<n>`` suffix (n = 1, 2, ...) so they do not overwrite earlier
        output.

        Args:
            df (pd.DataFrame, optional): The pandas DataFrame to write.
                Defaults to None.

        Raises:
            ValueError: If ``args["output_format"]`` is neither ``"parquet"``
                nor ``"csv"``.
        """
        logger.info("work_done")
        if df is None or df.empty:
            return
        output_format = self.args.get("output_format")
        # Validate before touching the chunk counter so that a bad
        # configuration does not consume a chunk index.
        if output_format not in ("parquet", "csv"):
            raise ValueError(f"Unsupported output format: {output_format!r}")
        # work_done is called once per work chunk; suffix subsequent chunks so
        # they do not overwrite the first chunk's output file.
        chunk_index = getattr(self, "_work_done_chunk", 0)
        self._work_done_chunk = chunk_index + 1
        stem = (
            f"{self._worker_id}_output"
            if chunk_index == 0
            else f"{self._worker_id}_output_{chunk_index}"
        )
        # Append the extension directly. Path.with_suffix would replace any
        # dot-separated tail already in the stem (e.g. a worker id containing
        # "."), truncating the file name.
        output_path = Path(self.data_out) / f"{stem}.{output_format}"
        if output_format == "parquet":
            df.to_parquet(output_path)
        else:
            df.to_csv(output_path, index=False)

    def works(self, workers_plan: Any, workers_plan_metadata: Any) -> float:
        """
        Execute worker tasks based on the distribution tree.

        Delegates to ``worker_pool_support.run_works``.

        Args:
            workers_plan (Any): Distribution tree structure.
            workers_plan_metadata (Any): Additional information about the
                workers.

        Returns:
            float: Execution time of this works() call in seconds.
        """
        return worker_pool_support.run_works(self, workers_plan, workers_plan_metadata)

    def _exec_multi_process(self, workers_plan: Any, workers_plan_metadata: Any) -> None:
        """
        Execute tasks with a process pool shared across all plan chunks.

        Uses the module's pandas hooks (``_PANDAS_POOL_HOOKS``).

        Args:
            workers_plan (Any): Distribution tree structure.
            workers_plan_metadata (Any): Additional information about the
                workers.
        """
        worker_pool_support.exec_multi_process(
            self, workers_plan, workers_plan_metadata, _PANDAS_POOL_HOOKS
        )

    def _exec_mono_process(self, workers_plan: Any, workers_plan_metadata: Any) -> None:
        """
        Execute tasks in single-threaded mode.

        Uses the module's pandas hooks (``_PANDAS_POOL_HOOKS``).

        Args:
            workers_plan (Any): Distribution tree structure.
            workers_plan_metadata (Any): Additional information about the
                workers.
        """
        worker_pool_support.exec_mono_process(
            self, workers_plan, workers_plan_metadata, _PANDAS_POOL_HOOKS
        )