Source code for mycode.mycode

"""Minimal manager implementation for the mycode sample project."""

from __future__ import annotations

import logging
import shutil
from pathlib import Path
from typing import Any, List, Tuple

from pydantic import ValidationError

from agi_node.agi_dispatcher import BaseWorker, WorkDispatcher

from .app_args import ArgsOverrides, MycodeArgs, dump_args, ensure_defaults, load_args, merge_args

logger = logging.getLogger(__name__)


[docs] class Mycode(BaseWorker): """Lightweight orchestration surface for the mycode example.""" worker_vars: dict[str, Any] = {}
[docs] def __init__( self, env, args: MycodeArgs | None = None, **kwargs: ArgsOverrides, ) -> None: self.env = env self._ensure_managed_pc_share_dir(env) # Allow caller-provided verbosity flag even though the Pydantic model forbids extras. self.verbose = bool(kwargs.pop("verbose", env.verbose)) if args is None: try: args = MycodeArgs(**kwargs) except ValidationError as exc: raise ValueError(f"Invalid Mycode arguments: {exc}") from exc self.args = args self.args.data_in = env.resolve_share_path(self.args.data_in) self.args.data_out = env.resolve_share_path(self.args.data_out) self.data_out = self.args.data_out # The mycode tests expect the data source directory to exist immediately # after instantiation so fixtures can write files into it. logger.info(f"mkdir {self.args.data_in}") self.args.data_in.mkdir(parents=True, exist_ok=True) WorkDispatcher.args = self.args.model_dump(mode="json") reset_target = getattr(self.args, "reset_target", False) try: if reset_target and self.data_out.exists(): shutil.rmtree( self.data_out, ignore_errors=True, onerror=WorkDispatcher._onerror, ) logger.info(f"mkdir {self.data_out}") self.data_out.mkdir(parents=True, exist_ok=True) except Exception as exc: # pragma: no cover - defensive guard logger.warning( "Issue while preparing dataframe directory %s: %s", self.data_out, exc, )
[docs] @classmethod def from_toml( cls, env, settings_path: str | Path = "app_settings.toml", section: str = "args", **overrides: ArgsOverrides, ) -> "Mycode": base = load_args(settings_path, section=section) merged = ensure_defaults(merge_args(base, overrides or None), env=env) return cls(env, args=merged)
[docs] def to_toml( self, settings_path: str | Path = "app_settings.toml", section: str = "args", create_missing: bool = True, ) -> None: dump_args(self.args, settings_path, section=section, create_missing=create_missing)
[docs] def as_dict(self) -> dict[str, Any]: payload = self.args.model_dump(mode="json") payload["dir_path"] = str(self.args.data_in) return payload
[docs] @staticmethod def pool_init(vars: dict[str, Any]) -> None: Mycode.worker_vars = vars
[docs] def work_pool(self, _: Any = None) -> None: # pragma: no cover - template hook pass
[docs] def work_done(self, _: Any) -> None: # pragma: no cover - template hook pass
[docs] def stop(self) -> None: if self.verbose > 0: print("Mycode worker completed.\n", end="") super().stop()
[docs] def build_distribution( self, _workers: dict | None = None, ) -> Tuple[List[List], List[List[Tuple[int, int]]], str, str, str]: # pragma: no cover - template hook return [], [], "id", "nb_fct", ""
[docs] class MycodeApp(Mycode): """Alias retaining the historical suffix for compatibility."""
__all__ = ["Mycode", "MycodeApp"]