Source code for agi_env.agi_env

# BSD 3-Clause License
#
# Copyright (c) 2025, Jean-Pierre Morard, THALES SIX GTS France SAS
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
# following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the
#    following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
#    following disclaimer in the documentation and/or other materials provided with the distribution.
# 3. Neither the name of Jean-Pierre Morard nor the names of its contributors, or THALES SIX GTS France SAS,
#    may be used to endorse or promote products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import ast
import cmd
import asyncio
import getpass
import os
import subprocess
import threading
import queue
import traceback
import time
if os.name == "nt":
    import winreg
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
import re
import sys
import shutil
from pathlib import Path, PureWindowsPath, PurePosixPath
from dotenv import dotenv_values, set_key
from pathspec import PathSpec
import tomli
import tomli_w


[docs] class JumpToMain(Exception): """ Custom exception to jump back to the main execution flow. """ pass
[docs] class ContentRenamer(ast.NodeTransformer): """ A class that renames identifiers in an abstract syntax tree (AST). Attributes: rename_map (dict): A mapping of old identifiers to new identifiers. """
[docs] def __init__(self, rename_map): """ Initialize the ContentRenamer with the rename_map. Args: rename_map (dict): Mapping of old names to new names. """ self.rename_map = rename_map
[docs] def visit_Name(self, node): # Rename variable and function names """ Visit and potentially rename a Name node in the abstract syntax tree. Args: self: The current object instance. node: The Name node in the abstract syntax tree. Returns: ast.Node: The modified Name node after potential renaming. Note: This function modifies the Name node in place. Raises: None """ if node.id in self.rename_map: print(f"Renaming Name: {node.id}{self.rename_map[node.id]}") node.id = self.rename_map[node.id] self.generic_visit(node) # Ensure child nodes are visited return node
[docs] def visit_Attribute(self, node): # Rename attributes """ Visit and potentially rename an attribute in a node. Args: node: A node representing an attribute. Returns: node: The visited node with potential attribute renamed. Raises: None. """ if node.attr in self.rename_map: print(f"Renaming Attribute: {node.attr}{self.rename_map[node.attr]}") node.attr = self.rename_map[node.attr] self.generic_visit(node) return node
[docs] def visit_FunctionDef(self, node): # Rename function names """ Rename a function node based on a provided mapping. Args: node (ast.FunctionDef): The function node to be processed. Returns: ast.FunctionDef: The function node with potential name change. """ if node.name in self.rename_map: print(f"Renaming Function: {node.name}{self.rename_map[node.name]}") node.name = self.rename_map[node.name] self.generic_visit(node) return node
[docs] def visit_ClassDef(self, node): # Rename class names """ Visit and potentially rename a ClassDef node. Args: node (ast.ClassDef): The ClassDef node to visit. Returns: ast.ClassDef: The potentially modified ClassDef node. """ if node.name in self.rename_map: print(f"Renaming Class: {node.name}{self.rename_map[node.name]}") node.name = self.rename_map[node.name] self.generic_visit(node) return node
[docs] def visit_arg(self, node): # Rename function argument names """ Visit and potentially rename an argument node. Args: self: The instance of the class. node: The argument node to visit and possibly rename. Returns: ast.AST: The modified argument node. Notes: Modifies the argument node in place if its name is found in the rename map. Raises: None. """ if node.arg in self.rename_map: print(f"Renaming Argument: {node.arg}{self.rename_map[node.arg]}") node.arg = self.rename_map[node.arg] self.generic_visit(node) return node
[docs] def visit_Global(self, node): # Rename global variable names """ Visit and potentially rename global variables in the AST node. Args: self: The instance of the class that contains the renaming logic. node: The AST node to visit and potentially rename global variables. Returns: AST node: The modified AST node with global variable names potentially renamed. """ new_names = [] for name in node.names: if name in self.rename_map: print(f"Renaming Global Variable: {name}{self.rename_map[name]}") new_names.append(self.rename_map[name]) else: new_names.append(name) node.names = new_names self.generic_visit(node) return node
[docs] def visit_nonlocal(self, node): # Rename nonlocal variable names """ Visit and potentially rename nonlocal variables in the AST node. Args: self: An instance of the class containing the visit_nonlocal method. node: The AST node to visit and potentially modify. Returns: ast.AST: The modified AST node after visiting and potentially renaming nonlocal variables. """ new_names = [] for name in node.names: if name in self.rename_map: print( f"Renaming Nonlocal Variable: {name}{self.rename_map[name]}" ) new_names.append(self.rename_map[name]) else: new_names.append(name) node.names = new_names self.generic_visit(node) return node
[docs] def visit_Assign(self, node): # Rename assigned variable names """ Visit and process an assignment node. Args: self: The instance of the visitor class. node: The assignment node to be visited. Returns: ast.Node: The visited assignment node. """ self.generic_visit(node) return node
[docs] def visit_AnnAssign(self, node): # Rename annotated assignments """ Visit and process an AnnAssign node in an abstract syntax tree. Args: self: The AST visitor object. node: The AnnAssign node to be visited. Returns: AnnAssign: The visited AnnAssign node. """ self.generic_visit(node) return node
[docs] def visit_For(self, node): # Rename loop variable names """ Visit and potentially rename the target variable in a For loop node. Args: node (ast.For): The For loop node to visit. Returns: ast.For: The modified For loop node. Note: This function may modify the target variable in the For loop node if it exists in the rename map. """ if isinstance(node.target, ast.Name) and node.target.id in self.rename_map: print( f"Renaming For Loop Variable: {node.target.id}{self.rename_map[node.target.id]}" ) node.target.id = self.rename_map[node.target.id] self.generic_visit(node) return node
[docs] def visit_Import(self, node): """ Rename imported modules in 'import module' statements. Args: node (ast.Import): The import node. """ for alias in node.names: original_name = alias.name if original_name in self.rename_map: print( f"Renaming Import Module: {original_name}{self.rename_map[original_name]}" ) alias.name = self.rename_map[original_name] else: # Handle compound module names if necessary for old, new in self.rename_map.items(): if original_name.startswith(old): print( f"Renaming Import Module: {original_name}{original_name.replace(old, new, 1)}" ) alias.name = original_name.replace(old, new, 1) break self.generic_visit(node) return node
[docs] def visit_ImportFrom(self, node): """ Rename modules and imported names in 'from module import name' statements. Args: node (ast.ImportFrom): The import from node. """ # Rename the module being imported from if node.module in self.rename_map: print( f"Renaming ImportFrom Module: {node.module}{self.rename_map[node.module]}" ) node.module = self.rename_map[node.module] else: for old, new in self.rename_map.items(): if node.module and node.module.startswith(old): new_module = node.module.replace(old, new, 1) print( f"Renaming ImportFrom Module: {node.module}{new_module}" ) node.module = new_module break # Rename the imported names for alias in node.names: if alias.name in self.rename_map: print( f"Renaming Imported Name: {alias.name}{self.rename_map[alias.name]}" ) alias.name = self.rename_map[alias.name] else: for old, new in self.rename_map.items(): if alias.name.startswith(old): print( f"Renaming Imported Name: {alias.name}{alias.name.replace(old, new, 1)}" ) alias.name = alias.name.replace(old, new, 1) break self.generic_visit(node) return node
[docs] class AgiEnv: """ AgiEnv manages paths and environment variables within the agiFramework. """ install_type = None apps_dir = None app = None module = None GUI_NROW = None
[docs] def __init__(self, install_type: int=None, apps_dir: Path = None, active_app: Path | str = None, active_module: Path = None, verbose: int = 0): """ Initialize the AgiEnv instance parameters: - install_type: 0: end-user, 1: dev, 2: api - apps_dir: path to apps directory - active_app: name or path of the active app - active_module: path of the active module - verbose: verbosity level """ if install_type and not isinstance(install_type, int): Print("install_type should be a int") exit(1) self.verbose = verbose self.is_managed_pc = getpass.getuser().startswith("T0") self.agi_resources = Path("resources/.agilab") self.home_abs = Path.home() / "MyApp" if self.is_managed_pc else Path.home() self.resource_path = self.home_abs / self.agi_resources.name env_path = self.resource_path / ".env" self.envars = dotenv_values(dotenv_path=env_path, verbose=verbose) envars = self.envars if not install_type: cwd = os.getcwd() # If we're not in site-packages, or we're in the "gui" folder under site-packages → 1, else 0 install_type = 1 if ("site-packages" not in __file__ and sys.prefix.endswith("gui/.venv")) else 0 if self.verbose: print("install_type", install_type) self.agi_root = AgiEnv.locate_agi_installation() if install_type: self.agi_fwk_env_path = self.agi_root / "fwk/env" resource_path = self.agi_fwk_env_path / "src/agi_env" / self.agi_resources else: head, sep, _ = __file__.partition("site-packages") if not sep: raise ValueError("site-packages not in", __file__) self.agi_fwk_env_path = Path(head + sep) #self.agi_root = self.agi_fwk_env_path / "agilab" resource_path = self.agi_fwk_env_path / "agi_env" / self.agi_resources if not self.agi_fwk_env_path.exists(): raise JumpToMain("your Agilab installation is not valid") self.install_type = install_type # Initialize .agilab resources self._init_resources(resource_path) self.set_env_var("INSTALL_TYPE", install_type) # check validity of active_module if any and set the apps_dir if active_module: if isinstance(active_module, Path): self.module = active_module.stem appsdir = self._determine_apps_dir(active_module) if apps_dir: print("warning apps_dir will be determine from active_module path") apps_dir = appsdir app = apps_dir.name if active_app: print("app will be determined from active_module path") active_app = app else: print("active_module must be of type 'Path'") exit(1) else: self.module = None # self.set_env_var("INSTALL_TYPE", install_type) # if apps_dir is not provided or can't be guess from modul_path then take from envars if not apps_dir: apps_dir = envars.get("APPS_DIR", '.') else: set_key(dotenv_path=env_path, key_to_set="APPS_DIR", value_to_set=str(apps_dir)) apps_dir = Path(apps_dir) # check validity of apps_dir if any try: if apps_dir.exists(): self.apps_dir = apps_dir elif install_type: self.apps_dir = self.agi_root / apps_dir else: os.makedirs(str(apps_dir), exist_ok=True) except FileNotFoundError: print("app_dir not found:/n", apps_dir) exit(1) self.GUI_NROW = int(envars.get("GUI_NROW", 1000)) if not active_app: active_app = envars.get("APP_DEFAULT", 'flight_project') # check validity of active_app and set module if isinstance(active_app, str): active_app = active_app if not active_app.endswith('_project'): active_app = active_app + '_project' app_path = apps_dir / active_app if app_path.exists(): self.app = active_app src_apps = self.agi_root / "apps" if not install_type: if not apps_dir.exists(): shutil.copytree(src_apps, apps_dir) else: self.copy_missing(src_apps, apps_dir) module = active_app.replace("_project", "").replace("-", "_") else: apps_dir = self._determine_apps_dir(active_app) module = apps_dir.name.replace("_project", "").replace("-", "_") AgiEnv.resolve_packages_path_in_toml(module, self.agi_root, apps_dir) self.projects = self.get_projects(self.apps_dir) if not self.projects: print(f"Could not find any target project app in {self.agi_root / "apps"}.") if not self.module: self.module = module AgiEnv.apps_dir = self.apps_dir # Initialize environment variables self._init_envars() self.app_path = self.apps_dir / active_app self.setup_app = self.app_path / "setup" self.setup_core = self.core_src / "agi_core/workers/agi_worker/setup" self.target_worker = f"{self.module}_worker" self.worker_path = ( self.app_path / "src" / self.target_worker / f"{self.target_worker}.py" ) self.module_path = self.app_path / "src" / self.module / f"{self.module}.py" self.worker_pyproject = self.worker_path.parent / "pyproject.toml" target_class = "".join(x.title() for x in self.target.split("_")) worker_class = target_class + "Worker" self.target_class = target_class self.target_worker_class = worker_class # Call the new base class parser to get both class name and module name. self.base_worker_cls, self.base_worker_module = self.get_base_worker_cls( self.worker_path, worker_class ) self.workers_packages_prefix = "agi_core.workers." if not self.worker_path.exists(): print( f"Missing {self.target_worker_class} definition; should be in {self.worker_path} but it does not exist" ) exit(1) app_src_path = self.app_path / "src" app_src = str(app_src_path) if app_src not in sys.path: sys.path.insert(0, app_src) app_src_path.mkdir(parents=True, exist_ok=True) self.app_src_path = self.agi_root / app_src_path # Initialize worker environment self._init_worker_env() # Initialize projects and LAB if required if AgiEnv.install_type != 3: self.init_envars_app(self.envars) self._init_apps() if not self.wenv_abs.exists(): os.makedirs(self.wenv_abs) # Set export_local_bin based on the OS if os.name == "nt": self.export_local_bin = 'set PATH=%USERPROFILE%\\.local\\bin;%PATH% &&' else: self.export_local_bin = 'export PATH="$HOME/.local/bin:$PATH";'
[docs] def copy_missing(self, src: Path, dst: Path): # Ensure the destination directory exists dst.mkdir(parents=True, exist_ok=True) for item in src.iterdir(): src_item = item dst_item = dst / item.name if src_item.is_dir(): # Recursively copy the directory if it's missing entirely, # or copy missing files inside it self.copy_missing(src_item, dst_item) else: # Copy file if it does not exist in destination if not dst_item.exists(): shutil.copy2(src_item, dst_item)
[docs] def active(self, target, install_type): if self.module != target: self.change_active_app(target + '_project', install_type)
# ---------------------------------------------- # Base class parsing methods (integrated) # ----------------------------------------------
[docs] def get_base_worker_cls(self, module_path, class_name): """ Retrieves the first base class ending with 'Worker' from the specified module. Returns a tuple: (base_class_name, module_name) """ base_info_list = self.get_base_classes(module_path, class_name) try: # Retrieve the first base whose name ends with 'Worker' base_class, module_name = next( (base, mod) for base, mod in base_info_list if base.endswith("Worker") ) return base_class, module_name except StopIteration: # workaroud # todo change logic for AgiEnv instanciation into wenv #raise ValueError( # f"class {class_name}([Dag|Data|Agent]Worker): not found in {module_path}." #) return None, None
[docs] def get_base_classes(self, module_path, class_name): """ Parses the module at module_path and returns a list of tuples for the base classes of the specified class. Each tuple is (base_class_name, module_name). """ try: with open(module_path, "r", encoding="utf-8") as file: source = file.read() except (IOError, FileNotFoundError) as e: if self.verbose: print(f"Error reading module file {module_path}: {e}") return [] try: tree = ast.parse(source) except SyntaxError as e: if self.verbose: print(f"Syntax error parsing {module_path}: {e}") raise RuntimeError(f"Syntax error parsing {module_path}: {e}") # Build mapping of imported names/aliases to modules import_mapping = self.get_import_mapping(source) base_classes = [] for node in ast.walk(tree): if isinstance(node, ast.ClassDef) and node.name == class_name: for base in node.bases: base_info = self.extract_base_info(base, import_mapping) if base_info: base_classes.append(base_info) break # Found our target class return base_classes
[docs] def get_import_mapping(self, source): """ Parses the source code and builds a mapping of imported names/aliases to module names. """ mapping = {} try: tree = ast.parse(source) except SyntaxError as e: if self.verbose: print(f"Syntax error during import mapping: {e}") raise for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: mapping[alias.asname or alias.name] = alias.name elif isinstance(node, ast.ImportFrom): module = node.module for alias in node.names: mapping[alias.asname or alias.name] = module return mapping
[docs] def extract_base_info(self, base, import_mapping): """ Extracts the base class name and attempts to determine the module name from the import mapping. Returns: Tuple[str, Optional[str]]: (base_class_name, module_name) """ if isinstance(base, ast.Name): # For a simple name like "MyClassFoo", try to get the module from the import mapping. module_name = import_mapping.get(base.id) return base.id, module_name elif isinstance(base, ast.Attribute): # For an attribute like dag_worker.DagWorker, reconstruct the full dotted name. full_name = self.get_full_attribute_name(base) parts = full_name.split(".") if len(parts) > 1: # Assume the first part is the alias from the import alias = parts[0] module_name = import_mapping.get(alias, alias) return parts[-1], module_name return base.attr, None return None
[docs] def get_full_attribute_name(self, node): """ Recursively retrieves the full dotted name from an attribute node. """ if isinstance(node, ast.Name): return node.id elif isinstance(node, ast.Attribute): return self.get_full_attribute_name(node.value) + "." + node.attr return ""
# ---------------------------------------------- # Updated method using tomli instead of toml # ----------------------------------------------
[docs] def mode2str(self, mode): import tomli # Use tomli for reading TOML files chars = ["p", "c", "d", "r"] reversed_chars = reversed(list(enumerate(chars))) # Open in binary mode for tomli with open(self.app_path / "pyproject.toml", "rb") as file: pyproject_data = tomli.load(file) dependencies = pyproject_data.get("project", {}).get("dependencies", []) if len([dep for dep in dependencies if dep.lower().startswith("cu")]) > 0: mode += 8 mode_str = "".join( "_" if (mode & (1 << i)) == 0 else v for i, v in reversed_chars ) return mode_str
[docs] @staticmethod def mode2int(mode): mode_int = 0 set_rm = set(mode) for i, v in enumerate(["p", "c", "d"]): if v in set_rm: mode_int += 2 ** (len(["p", "c", "d"]) - 1 - i) return mode_int
[docs] @staticmethod def locate_agi_installation(): if os.name == "nt": where_is_agi = Path(os.getenv("LOCALAPPDATA")) / "agilab/.agi-path" else: where_is_agi = Path.home() / ".local/share/agilab/.agi-path" if where_is_agi.exists(): try: with where_is_agi.open("r", encoding="utf-8-sig") as f: install_path = f.read().strip() agilab_path = Path(install_path) if install_path and agilab_path.exists(): print("Run Agilab:", install_path) return agilab_path else: raise ValueError("Installation path file is empty.") where_is_agi.unlink() if self.verbose: print(f"Agilab Installation found at: {self.home_abs}") except FileNotFoundError: print(f"File {where_is_agi} does not exist.") except PermissionError: print(f"Permission denied when accessing {where_is_agi}.") except Exception as e: print(f"An error occurred: {e}") else: raise RuntimeError("agilab dir not found in local folder (.local on posix and %LOCALAPPDATA% on Windows).")
def _check_module_path(self, module: Path): module = module.expanduser() if not module.exists(): print(f"Warning Module source '{module}' does not exist") return module def _determine_module_path(self, project_or_module_name): parts = project_or_module_name.rsplit("-", 1) suffix = parts[-1] name = parts[0].split(os.sep)[-1] module_name = name.replace("-", "_") # Moved this up if suffix.startswith("project"): name = name.replace("-" + suffix, "") project_name = name + "_project" else: project_name = name.replace("_", "-") + "_project" module_path = ( self.apps_dir / project_name / "src" / module_name / (module_name + ".py") ).resolve() return module_path def _determine_apps_dir(self, module_path): path_str = str(module_path) index = path_str.index("_project") return Path(path_str[:index]).parent def _init_apps(self): app_settings_file = self.app_src_path / "app_settings.toml" app_settings_file.touch(exist_ok=True) self.app_settings_file = app_settings_file args_ui_snippet = self.app_src_path / "args_ui_snippet.py" args_ui_snippet.touch(exist_ok=True) self.args_ui_snippet = args_ui_snippet self.gitignore_file = self.app_path / ".gitignore" dest = self.resource_path if self.install_type: shutil.copytree(self.agi_root / "fwk/gui/src/agi_gui" / self.agi_resources, dest, dirs_exist_ok=True) else: shutil.copytree(self.agi_root.parent / "agi_gui" / self.agi_resources, dest, dirs_exist_ok=True) def _update_env_file(self, updates: dict): """ Updates the .agilab/.env file with the key/value pairs from updates. Reads the current file (if any), updates the keys, and writes back all key/value pairs. """ env_file = self.resource_path / ".env" env_data = {} if env_file.exists(): with env_file.open("r") as f: for line in f: if '=' in line: k, v = line.strip().split("=", 1) env_data[k] = v # Update with the new key/value pairs. env_data.update(updates) with env_file.open("w") as f: for k, v in env_data.items(): f.write(f"{k}={v}\n")
[docs] def set_env_var(self, key: str, value: str): """ General setter: Updates the AgiEnv internal environment dictionary, the process environment, and persists the change in the .agilab/.env file. """ self.envars[key] = value os.environ[key] = str(value) self._update_env_file({key: value})
[docs] def set_cluster_credentials(self, credentials: str): """Set the AGI_CREDENTIALS environment variable.""" self.CLUSTER_CREDENTIALS = credentials # maintain internal state self.set_env_var("CLUSTER_CREDENTIALS", credentials)
[docs] def set_openai_api_key(self, api_key: str): """Set the OPENAI_API_KEY environment variable.""" self.OPENAI_API_KEY = api_key self.set_env_var("OPENAI_API_KEY", api_key)
[docs] def set_install_type(self, install_type: int): self.install_type = install_type self.set_env_var("INSTALL_TYPE", str(install_type))
[docs] def set_apps_dir(self, apps_dir: Path): self.apps_dir =apps_dir self.set_env_var("APPS_DIR", apps_dir)
[docs] @staticmethod def get_venv_root(): p = Path(sys.prefix).resolve() # If .venv exists in the path parts, slice the path up to it if ".venv" in p.parts: index = p.parts.index(".venv") return Path(*p.parts[:index]) return p
[docs] def has_admin_rights(): """ Check if the current process has administrative rights on Windows. Returns: bool: True if admin, False otherwise. """ try: return ctypes.windll.shell32.IsUserAnAdmin() except: return False
[docs] def create_junction_windows(source: Path, dest: Path): """ Create a directory junction on Windows. Args: source (Path): The target directory path. dest (Path): The destination junction path. """ try: # Using the mklink command to create a junction (/J) which doesn't require admin rights. subprocess.check_call(['cmd', '/c', 'mklink', '/J', str(dest), str(source)]) print(f"Created junction: {dest} -> {source}") except subprocess.CalledProcessError as e: print(f"Failed to create junction. Error: {e}")
# -------------------- Handling .venv Directory -------------------- #
[docs] def handle_venv_directory(self, source_venv: Path, dest_venv: Path): """ Create a symbolic link for the .venv directory instead of copying it. Args: source_venv (Path): Source .venv directory path. dest_venv (Path): Destination .venv symbolic link path. """ try: if os.name == "nt": create_symlink_windows(source_venv, dest_venv) else: # For Unix-like systems os.symlink(source_venv, dest_venv, target_is_directory=True) print(f"Created symbolic link for .venv: {dest_venv} -> {source_venv}") except OSError as e: print(f"Failed to create symbolic link for .venv: {e}")
# -------------------- Rename Map Creator -------------------- #
[docs] def create_rename_map(self, target_project: Path, dest_project: Path) -> dict: """ Create a mapping of old → new names for cloning. Includes project names, top-level src folders, worker folders, in-file identifiers and class names. """ def cap(s: str) -> str: return "".join(p.capitalize() for p in s.split("_")) name_tp = target_project.name # e.g. "flight_project" name_dp = dest_project.name # e.g. "tata_project" tp = name_tp[:-8] # strip "_project" → "flight" dp = name_dp[:-8] # → "tata" tm = tp.replace("-", "_") dm = dp.replace("-", "_") tc = cap(tm) # "Flight" dc = cap(dm) # "Tata" return { # project-level name_tp: name_dp, # folder-level (longest keys first) f"src/{tm}_worker": f"src/{dm}_worker", f"src/{tm}": f"src/{dm}", # sibling-level f"{tm}_worker": f"{dm}_worker", tm: dm, # class-level f"{tc}Worker": f"{dc}Worker", f"{tc}Args": f"{dc}Args", tc: dc, }
[docs] def clone_project(self, target_project: Path, dest_project: Path): """ Clone a project by copying files and directories, applying renaming, then cleaning up any leftovers. Args: target_project: Path under self.apps_dir (e.g. Path("flight_project")) dest_project: Path under self.apps_dir (e.g. Path("tata_project")) """ # Lazy import heavy deps import shutil, ast, os, astor from pathspec import PathSpec from pathspec.patterns import GitWildMatchPattern # normalize names if not target_project.name.endswith("_project"): target_project = target_project.with_name(target_project.name + "_project") if not dest_project.name.endswith("_project"): dest_project = dest_project.with_name(dest_project.name + "_project") rename_map = self.create_rename_map(target_project, dest_project) source_root = self.apps_dir / target_project dest_root = self.apps_dir / dest_project if not source_root.exists(): print(f"Source project '{target_project}' does not exist.") return if dest_root.exists(): print(f"Destination project '{dest_project}' already exists.") return gitignore = source_root / ".gitignore" if not gitignore.exists(): print(f"No .gitignore at '{gitignore}'.") return spec = PathSpec.from_lines(GitWildMatchPattern, gitignore.read_text().splitlines()) try: dest_root.mkdir(parents=True, exist_ok=False) except Exception as e: print(f"Could not create '{dest_root}': {e}") return # 1) Recursive clone self.clone_directory(source_root, dest_root, rename_map, spec, source_root) # 2) Final cleanup self._cleanup_rename(dest_root, rename_map) self.projects.insert(0, dest_project)
[docs] def clone_directory(self, source_dir: Path, dest_dir: Path, rename_map: dict, spec: PathSpec, source_root: Path): """ Recursively copy + rename directories, files, and contents. """ import astor for item in source_dir.iterdir(): # inside your clone_directory loop, after you've computed `rel` rel = item.relative_to(source_root).as_posix() if spec.match_file(rel + ("/" if item.is_dir() else "")): continue # split into segments parts = rel.split("/") # map each segment exactly via your rename_map (falling back to itself) parts = [rename_map.get(seg, seg) for seg in parts] # now reconstruct the destination path dst_item = dest_dir.joinpath(*parts) dst_item.parent.mkdir(parents=True, exist_ok=True) if item.is_dir(): if item.name == ".venv": # keep venv as a symlink os.symlink(item, dst_item, target_is_directory=True) else: self.clone_directory(item, dest_dir, rename_map, spec, source_root) elif item.is_file(): suf = item.suffix.lower() # first, if the **basename** matches an old→new, rename the file itself base = item.stem if base in rename_map: dst_item = dst_item.with_name(rename_map[base] + item.suffix) # archives if suf in (".7z", ".zip"): shutil.copy2(item, dst_item) # Python → AST rename + whole‑word replace elif suf == ".py": src = item.read_text(encoding="utf-8") try: tree = ast.parse(src) renamer = ContentRenamer(rename_map) new_tree = renamer.visit(tree) ast.fix_missing_locations(new_tree) out = astor.to_source(new_tree) except SyntaxError: out = src # apply any leftover whole‑word replaces for old, new in rename_map.items(): out = re.sub(rf"\b{re.escape(old)}\b", new, out) dst_item.write_text(out, encoding="utf-8") # text files → whole‑word replace elif suf in (".toml", ".md", ".txt", ".json", ".yaml", ".yml"): txt = item.read_text(encoding="utf-8") for old, new in rename_map.items(): txt = re.sub(rf"\b{re.escape(old)}\b", new, txt) dst_item.write_text(txt, encoding="utf-8") # everything else else: shutil.copy2(item, dst_item) elif item.is_symlink(): target = os.readlink(item) os.symlink(target, dst_item, target_is_directory=item.is_dir())
def _cleanup_rename(self, root: Path, rename_map: dict): """ 1) Rename any leftover file/dir basenames (including .py) that exactly match a key. 2) Rewrite text files for any straggler content references. """ # build simple name→new map (no slashes) simple_map = {old: new for old, new in rename_map.items() if "/" not in old} # sort longest first sorted_simple = sorted(simple_map.items(), key=lambda kv: len(kv[0]), reverse=True) # -- step 1: rename basenames (dirs & files) bottom‑up -- for path in sorted(root.rglob("*"), key=lambda p: len(p.parts), reverse=True): old = path.name for o, n in sorted_simple: # directory exactly "flight" → "truc", or "flight_worker" → "truc_worker" if old == o or old == f"{o}_worker" or old == f"{o}_project": new_name = old.replace(o, n, 1) path.rename(path.with_name(new_name)) break # file like "flight.py" → "truc.py" if path.is_file() and old.startswith(o + "."): new_name = n + old[len(o):] path.rename(path.with_name(new_name)) break # -- step 2: rewrite any lingering text references -- exts = {".py", ".toml", ".md", ".txt", ".json", ".yaml", ".yml"} for file in root.rglob("*"): if not file.is_file() or file.suffix.lower() not in exts: continue txt = file.read_text(encoding="utf-8") new_txt = txt for old, new in rename_map.items(): new_txt = re.sub(rf"\b{re.escape(old)}\b", new, new_txt) if new_txt != txt: file.write_text(new_txt, encoding="utf-8")
[docs] def replace_content(self, txt: str, rename_map: dict) -> str: for old, new in sorted(rename_map.items(), key=lambda kv: len(kv[0]), reverse=True): # only match whole‐word occurrences of `old` pattern = re.compile(rf"\b{re.escape(old)}\b") txt = pattern.sub(new, txt) return txt
[docs] def read_gitignore(self, gitignore_path: Path) -> 'PathSpec': from pathspec import PathSpec from pathspec.patterns import GitWildMatchPattern lines = gitignore_path.read_text(encoding="utf-8").splitlines() return PathSpec.from_lines(GitWildMatchPattern, lines)
def _init_envars(self): envars = self.envars self.credantials = envars.get("CLUSTER_CREDENTIALS", getpass.getuser()) credantials = self.credantials.split(":") self.user = credantials[0] if len(credantials) > 1: self.password = credantials[1] else: self.password = None self.python_version = envars.get("AGI_PYTHON_VERSION", "3.12.9") os.makedirs(AgiEnv.apps_dir, exist_ok=True) if self.install_type: self.core_src = self.agi_root / "fwk/core/src" else: self.core_src = self.agi_root self.core_root = self.core_src.parent self.workers_root = self.core_src / "agi_core/workers" self.manager_root = self.core_src / "agi_core/managers/" path = str(self.core_src) if path not in sys.path: sys.path.insert(0, path) # Determine module path and set target. if isinstance(self.module, Path): self.module_path = self.module.expanduser().resolve() else: self.module_path = self._determine_module_path(self.module) self.target = self.module_path.stem # Define self.target here self.AGILAB_SHARE_ABS = Path( envars.get("AGI_SHARE_DIR", self.home_abs / "data") ) self.dataframes_path = self.AGILAB_SHARE_ABS / self.target / "dataframes" # Now that target is defined, we can use it for further assignments. self._init_projects() self.WORKER_VENV_REL = Path(envars.get("WORKER_VENV_DIR", "wenv")) self.scheduler_ip = envars.get("AGI_SCHEDULER_IP", "127.0.0.1") if not self.is_valid_ip(self.scheduler_ip): raise ValueError(f"Invalid scheduler IP address: {self.scheduler_ip}") if self.install_type: self.help_path = str(self.agi_root / "../docs/html") else: self.help_path = "https://thalesgroup.github.io/agilab" self.AGILAB_SHARE_ABS = Path( envars.get("AGI_SHARE_DIR", self.home_abs / "data") )
[docs] def is_valid_ip(self, ip: str) -> bool: pattern = re.compile(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$") if pattern.match(ip): parts = ip.split(".") return all(0 <= int(part) <= 255 for part in parts) return False
[docs] def init_envars_app(self, envars): self.CLUSTER_CREDENTIALS = envars.get("CLUSTER_CREDENTIALS", None) self.OPENAI_API_KEY = envars.get("OPENAI_API_KEY", None) AGILAB_LOG_ABS = Path(envars.get("AGI_LOG_DIR", self.home_abs / "log")) if not AGILAB_LOG_ABS.exists(): AGILAB_LOG_ABS.mkdir(parents=True) self.AGILAB_LOG_ABS = AGILAB_LOG_ABS self.runenv = self.AGILAB_LOG_ABS AGILAB_EXPORT_ABS = Path(envars.get("AGI_EXPORT_DIR", self.home_abs / "export")) if not AGILAB_EXPORT_ABS.exists(): AGILAB_EXPORT_ABS.mkdir(parents=True) self.AGILAB_EXPORT_ABS = AGILAB_EXPORT_ABS self.export_apps = AGILAB_EXPORT_ABS / "apps" if not self.export_apps.exists(): os.makedirs(str(self.export_apps), exist_ok=True) self.MLFLOW_TRACKING_DIR = Path( envars.get("MLFLOW_TRACKING_DIR", self.home_abs / ".mlflow") ) self.AGILAB_VIEWS_ABS = Path( envars.get("AGI_VIEWS_DIR", self.agi_root / "views") ) self.AGILAB_VIEWS_REL = Path(envars.get("AGI_VIEWS_DIR", "agi/_")) if self.install_type == 0: self.copilot_file = self.agi_root / "agi_gui/agi_copilot.py" else: self.copilot_file = self.agi_root / "fwk/gui/src/agi_gui/agi_copilot.py"
def _init_resources(self, resources_path): src_env_path = resources_path / ".env" dest_env_file = self.resource_path / ".env" if not src_env_path.exists(): msg = f"Installation issue: {src_env_path} is missing!" print(msg) raise RuntimeError(msg) if not dest_env_file.exists(): os.makedirs(dest_env_file.parent, exist_ok=True) shutil.copy(src_env_path, dest_env_file) for root, dirs, files in os.walk(resources_path): for file in files: src_file = Path(root) / file relative_path = src_file.relative_to(resources_path) dest_file = self.resource_path / relative_path dest_file.parent.mkdir(parents=True, exist_ok=True) if not dest_file.exists(): os.makedirs(dest_env_file.parent, exist_ok=True) shutil.copy(src_file, dest_file) def _init_worker_env(self): self.wenv_rel = self.WORKER_VENV_REL / self.target_worker self.wenv_abs = self.home_abs / self.wenv_rel self.wenv_target_worker = self.wenv_abs distribution_tree = self.wenv_abs / "distribution_tree.json" self.cyprepro = self.core_src / "agi_core/workers/agi_worker/cyprepro.py" self.post_install_script = self.wenv_abs / "src" / self.target_worker / "post_install.py" if distribution_tree.exists(): distribution_tree.unlink() self.distribution_tree = distribution_tree def _init_projects(self): for idx, project in enumerate(self.projects): if self.target == project[:-8].replace("-", "_"): self.app_path = AgiEnv.apps_dir / project self.project_index = idx self.app = project break
[docs] def get_projects(self, path:Path): return [p.name for p in path.glob("*project")]
[docs] def get_modules(self, target=None): pattern = "_project" modules = [ re.sub(f"^{pattern}|{pattern}$", "", project).replace("-", "_") for project in self.get_projects(AgiEnv.apps_dir) ] return modules
@property def scheduler_ip_address(self): return self.scheduler_ip
[docs] def change_active_module(self, module_path, install_type): if module_path != self.module_path: self.__init__(active_module=module_path, install_type=install_type, verbose=self.verbose)
[docs] def change_active_app(self, app, install_type=1): if isinstance(app, str): app_name = app elif isinstance(app, Path): app_name = app.name else: raise TypeError(f"Invalid app type: {type(app)}\nSupported type are <str> and <Path>") if app_name != self.app: self.__init__(active_app=app_name, install_type=install_type, verbose=self.verbose)
[docs] def check_args(self, target_args_class, target_args): try: validated_args = target_args_class.parse_obj(target_args) validation_errors = None except Exception as e: import humanize validation_errors = self.humanize_validation_errors(e) return validation_errors
[docs] def humanize_validation_errors(self, error): formatted_errors = [] for err in error.errors(): field = ".".join(str(loc) for loc in err["loc"]) message = err["msg"] error_type = err.get("type", "unknown_error") input_value = err.get("ctx", {}).get("input_value", None) user_message = f"❌ **{field}**: {message}" if input_value is not None: user_message += f" (Received: `{input_value}`)" user_message += f"\n*Error Type:* `{error_type}`\n" formatted_errors.append(user_message) return formatted_errors
@staticmethod def _build_env(venv=None): proc_env = os.environ.copy() if venv is not None: venv_path = Path(venv) / ".venv" proc_env["VIRTUAL_ENV"] = str(venv_path) bin_path = "Scripts" if os.name == "nt" else "bin" venv_bin = venv_path / bin_path proc_env["PATH"] = str(venv_bin) + os.pathsep + proc_env.get("PATH", "") return proc_env
[docs] class JumpToMain(Exception): pass
@staticmethod async def _run_bg(cmd, cwd=".", venv=None, timeout=None, log_callback=None): """ Run the given command asynchronously, reading stdout and stderr line by line and passing them to the log_callback. """ proc_env = AgiEnv._build_env(venv) proc_env["PYTHONUNBUFFERED"] = "1" proc = await asyncio.create_subprocess_shell( cmd, cwd=os.path.abspath(cwd), env=proc_env, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) async def read_stream(stream, callback): while True: line = await stream.readline() if not line: break decoded_line = line.decode().rstrip() if callback: callback(decoded_line) else: print(decoded_line) tasks = [] if proc.stdout: tasks.append(asyncio.create_task( read_stream(proc.stdout, lambda msg: log_callback(msg) if log_callback else print(msg)) )) if proc.stderr: tasks.append(asyncio.create_task( read_stream(proc.stderr, lambda msg: log_callback(msg) if log_callback else print(msg)) )) try: await asyncio.wait_for(proc.wait(), timeout=timeout) except asyncio.TimeoutError as err: proc.kill() raise RuntimeError(f"Timeout expired for command: {cmd}") from err await asyncio.gather(*tasks) stdout, stderr = await proc.communicate() return stdout.decode(), stderr.decode()
[docs] async def run_agi(self, code, log_callback=None, venv: Path = None, type=None): """ Asynchronous version of run_agi for use within an async context. """ pattern = r"await\s+(?:Agi\.)?([^\(]+)\(" matches = re.findall(pattern, code) if not matches: message = "Could not determine snippet name from code." if log_callback: log_callback(message) else: print(message) return "", "" snippet_file = os.path.join(self.runenv, f"{matches[0]}-{self.target}.py") with open(snippet_file, "w") as file: file.write(code) cmd = f"uv run python {snippet_file}" # Await _run_bg directly without asyncio.run() result = await AgiEnv._run_bg(cmd, venv=venv, log_callback=log_callback) if log_callback: log_callback(f"Process finished with output: {result}") return result
[docs] @staticmethod async def run_async(cmd, venv=None, cwd=None, timeout=None, log_callback=None): if not cwd: cwd = venv process_env = os.environ.copy() venv_path = Path(venv) / ".venv" process_env["VIRTUAL_ENV"] = str(venv_path) bin_dir = "Scripts" if os.name == "nt" else "bin" venv_bin = venv_path / bin_dir process_env["PATH"] = str(venv_bin) + os.pathsep + process_env.get("PATH", "") shell_executable = "/bin/bash" if os.name != "nt" else None # If cmd is a list, join it for shell=True. if isinstance(cmd, list): cmd = " ".join(cmd) process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(cwd), env=process_env, executable=shell_executable ) async def read_stream(stream, callback): while True: line = await stream.readline() if not line: break decoded_line = line.decode().rstrip() callback(decoded_line) # Start a task for reading stderr concurrently. stderr_task = asyncio.create_task( read_stream(process.stderr, log_callback if log_callback else print) )
[docs] @staticmethod def run(cmd, venv=None, cwd=None, timeout=None, wait=True, log_callback=None): if not cwd: cwd = venv process_env = os.environ.copy() venv_path = Path(venv) / ".venv" process_env["VIRTUAL_ENV"] = str(venv_path) bin_dir = "Scripts" if os.name == "nt" else "bin" venv_bin = venv_path / bin_dir process_env["PATH"] = str(venv_bin) + os.pathsep + process_env.get("PATH", "") shell_executable = "/bin/bash" if os.name != "nt" else None if wait: try: process = subprocess.Popen( cmd, shell=True, cwd=str(venv) if not cwd else str(cwd), env=process_env, text=True, executable=shell_executable, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) output_lines = [] while True: if process.stderr: line = process.stderr.readline().rstrip() if line: if log_callback: log_callback(line) else: print(line) if line == '' and process.poll() is not None: break else: break process.wait(timeout=timeout) return process.stdout.read() if process.stdout else "" except Exception as e: print(traceback.format_exc()) raise RuntimeError(f"Command execution error: {e}") from e else: return ""
[docs] @staticmethod def normalize_path(path): return ( str(PureWindowsPath(Path(path))) if os.name == "nt" else str(PurePosixPath(Path(path))) )
[docs] @staticmethod def resolve_packages_path_in_toml(module, agi_root, apps_dir): """ Updates the 'agi-core' package path in the pyproject.toml file for a given module. Args: module (str): The module name (using underscore as separator). Raises: FileNotFoundError: If the pyproject.toml file cannot be found. RuntimeError: If an error occurs during reading or writing the TOML file. """ # Convert agi_root to POSIX string agi_root_str = agi_root.as_posix() # Build the module path based on naming conventions (underscores to hyphens) module_path = agi_root / apps_dir / (module + "_project") pyproject_file = module_path / "pyproject.toml" if not pyproject_file.exists(): raise FileNotFoundError(f"pyproject.toml not found in {module_path}") try: with pyproject_file.open("rb") as f: content = tomli.load(f) except Exception as e: raise RuntimeError(f"Error loading TOML from {pyproject_file}: {e}") # On non-Windows, ensure agi_root_str ends with a slash if not agi_root_str.endswith("/"): agi_root_str += "/" # Compute the agi-core path agi_core = f"{agi_root_str}fwk/core" # Safely retrieve (or create) the nested structure for tool/uv/sources sources = content.setdefault("tool", {}).setdefault("uv", {}).setdefault("sources", {}) # Update the 'agi-core' entry if it exists and is a dict if isinstance(sources.get("agi-core"), dict) and "path" in sources["agi-core"]: sources["agi-core"]["path"] = agi_core else: print(f"Warning: 'agi-core' entry not found or invalid in {pyproject_file}; skipping update.") try: with pyproject_file.open("wb") as f: tomli_w.dump(content, f) except Exception as e: raise RuntimeError(f"Error writing updated TOML to {pyproject_file}: {e}") print("Updated", pyproject_file)