# BSD 3-Clause License
#
# [Full BSD 3-Clause License text omitted]
"""
pandas_worker Framework Callback Functions Module
===============================================
This module provides the `PandasWorker` class, which extends the foundational
functionalities of `BaseWorker` for processing data using multiprocessing or
single-threaded approaches with pandas.
Classes:
PandasWorker: Worker class for data processing tasks using pandas.
Internal Libraries:
os, warnings
External Libraries:
concurrent.futures.ProcessPoolExecutor
pathlib.Path
time
pandas as pd
BaseWorker from node import BaseWorker.node
"""
# Standard Libraries:
import logging
import os
import time
import warnings
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import Any

# External Libraries:
import pandas as pd

from agi_env import AgiEnv, normalize_path
from agi_node.agi_dispatcher import BaseWorker

warnings.filterwarnings("ignore")
logger = logging.getLogger(__name__)
class PandasWorker(BaseWorker):
"""
PandasWorker Class
--------------------
Inherits from :class:`BaseWorker` to provide extended data processing functionalities using pandas.
Attributes:
verbose (int): Verbosity level for logging.
data_out (str): Path to the output directory.
worker_id (int): Identifier for the worker instance.
args (dict): Configuration arguments for the worker.
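
    Example:
        A minimal subclassing sketch (``CsvWorker`` and the file-path task
        type are illustrative assumptions, not part of this module)::

            class CsvWorker(PandasWorker):
                def _actual_work_pool(self, path):
                    # Assumption: each task is a CSV file path.
                    return pd.read_csv(path)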
"""
    def work_pool(self, x: Any = None) -> pd.DataFrame:
"""
Processes a single task.
Args:
x (any, optional): The task to process. Defaults to None.
Returns:
pd.DataFrame: A pandas DataFrame with the processed results.
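
        Example:
            Illustrative call (assumes a subclass implementing
            ``_actual_work_pool`` and a file-path task)::

                df = worker.work_pool("part_0.csv")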
"""
logging.info("work_pool")
# Call the actual work_pool method, which should return a pandas DataFrame.
# Ensure that the original _actual_work_pool method is refactored accordingly.
return self._actual_work_pool(x)
    def work_done(self, df: pd.DataFrame = None) -> None:
"""
Handles the post-processing of the DataFrame after `work_pool` execution.
Args:
df (pd.DataFrame, optional): The pandas DataFrame to process. Defaults to None.
Raises:
ValueError: If an unsupported output format is specified.
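
        Example:
            Configuration sketch (the ``output_format`` key is read by this
            method; the values shown are illustrative)::

                worker.args = {"output_format": "parquet"}
                worker.work_done(df)  # writes <data_out>/<worker_id>_output.parquet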
"""
logging.info("work_done")
if df is None or df.empty:
return
output_format = self.args.get("output_format")
output_filename = f"{self._worker_id}_output"
output_path = Path(self.data_out) / f"{output_filename}"
if output_format == "parquet":
df.to_parquet(output_path.with_suffix(".parquet"))
elif output_format == "csv":
df.to_csv(output_path.with_suffix(".csv"), index=False)
else:
raise ValueError("Unsupported output format")
    def works(self, workers_plan: Any, workers_plan_metadata: Any) -> float:
"""
Executes worker tasks based on the distribution tree.
Args:
workers_plan (any): Distribution tree structure.
workers_plan_metadata (any): Additional information about the workers.
Returns:
float: Execution time in seconds.
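
        Example:
            Hypothetical plan layout (illustrative only; the real structure
            is produced by the dispatching framework)::

                workers_plan = [
                    [["part_0.csv", "part_1.csv"]],  # chunks for worker 0
                    [["part_2.csv"]],                # chunks for worker 1
                ]
                elapsed = worker.works(workers_plan, metadata)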
"""
        if workers_plan:
            # Bit value 4 in the mode mask selects multiprocessing execution.
            if self._mode & 4:
                self._exec_multi_process(workers_plan, workers_plan_metadata)
            else:
                self._exec_mono_process(workers_plan, workers_plan_metadata)
        self.stop()
        if BaseWorker._t0 is None:
            BaseWorker._t0 = time.time()
        return time.time() - BaseWorker._t0
    def _exec_multi_process(self, workers_plan: Any, workers_plan_metadata: Any) -> None:
        """
        Executes tasks in multiprocessing mode.

        Args:
            workers_plan (Any): Distribution tree structure.
            workers_plan_metadata (Any): Additional information about the workers.
        """
        works = []
        if isinstance(workers_plan, list):
            # Flatten this worker's chunks to size the process pool.
            for i in workers_plan[self._worker_id]:
                works += i
            ncore = max(min(len(works), int(os.cpu_count())), 1)
        else:
            ncore = 1
        logger.info(
            f"PandasWorker.work - ncore {ncore} - mycode_worker #{self._worker_id}"
            f" - work_pool x {len(works)}"
        )
        self.work_init()
        for work_id, work in enumerate(workers_plan[self._worker_id]):
            list_df = []
            df = pd.DataFrame()
            ncore = max(min(len(work), int(os.cpu_count())), 1)
            # "spawn" is used on all platforms (kept for the commented-out
            # multiprocessing context below).
            process_factory_type = "spawn"
            # mp_ctx = multiprocessing.get_context(process_factory_type)
            with ProcessPoolExecutor(
                # mp_context=mp_ctx,
                max_workers=ncore,
                initializer=self.pool_init,
                initargs=(self.pool_vars,),
            ) as executor:
                dfs = executor.map(self.work_pool, work)
                for df_result in dfs:
                    if not df_result.empty:
                        list_df.append(df_result)
            if list_df:
                # Tag each partial result with its (worker, chunk) origin
                # before concatenating.
                for idx, df_result in enumerate(list_df):
                    df_result = df_result.copy()
                    df_result["worker_id"] = str((self._worker_id, idx))
                    list_df[idx] = df_result
                df = pd.concat(list_df, axis=0, ignore_index=True)
            self.work_done(df if not df.empty else pd.DataFrame())
    def _exec_mono_process(self, workers_plan: Any, workers_plan_metadata: Any) -> None:
        """
        Executes tasks in single-process mode.

        Args:
            workers_plan (Any): Distribution tree structure.
            workers_plan_metadata (Any): Additional information about the workers.
        """
        self.work_init()
        for work_id, work in enumerate(workers_plan[self._worker_id]):
            list_df = []
            df = pd.DataFrame()
            logger.info(
                f"PandasWorker.work - monoprocess work #{work_id} - work_pool x {len(work)}"
            )
            dfs = [self.work_pool(file) for file in work]
            if dfs and isinstance(dfs[0], pd.DataFrame):
                for df_result in dfs:
                    if not df_result.empty:
                        list_df.append(df_result)
            if list_df:
                for idx, df_result in enumerate(list_df):
                    df_result = df_result.copy()
                    df_result["worker_id"] = str((self._worker_id, 0))
                    list_df[idx] = df_result
                df = pd.concat(list_df, axis=0, ignore_index=True)
            self.work_done(df)