# BSD 3-Clause License
#
# Copyright (c) 2025, Jean-Pierre Morard, THALES SIX GTS France SAS
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
# 3. Neither the name of Jean-Pierre Morard nor the names of its contributors, or THALES SIX GTS France SAS, may be used to endorse or promote products derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import re
import traceback
from pydantic import BaseModel, validator, conint, confloat
import shutil
import warnings
from pathlib import Path
from typing import Unpack, Literal
import py7zr
import polars as pl
from datetime import date
from agi_core.managers.agi_runner import AGI
from agi_core.managers.agi_manager import AgiManager
from agi_env import AgiEnv
# Suppress every warning category for the whole process once this module is
# imported (the filter is global, not scoped to this file).
warnings.filterwarnings("ignore")
class FlightArgs(BaseModel):
    """Validated set of arguments accepted by the Flight manager.

    The numeric constraints are expressed in the field *annotations*
    (``conint`` / ``confloat``) so that pydantic actually enforces them;
    assigning the constraint factory as a default value would only make it
    the field's default and validate nothing.
    """

    # Where the data comes from: local files or the HAWK/ELK backend.
    data_source: Literal["file", "hawk"]
    # Base directory holding the data files.
    path: Path
    # File-name pattern; must compile as a regular expression (see validator).
    files: str
    # Maximum number of files to process. Flight treats 0 as "no limit".
    nfile: conint(ge=0) = 0
    # NOTE(review): the defaults of nskip, nread and sampling_rate are neutral
    # placeholders that keep the fields optional, as they were before -
    # confirm the intended values.
    nskip: conint(ge=0) = 0
    nread: conint(ge=0) = 0
    sampling_rate: confloat(ge=0) = 0.0
    # Inclusive date window; datemin is declared first so that it is already
    # validated when the datemax validator runs.
    datemin: date
    datemax: date
    output_format: Literal["parquet", "csv"]

    @validator("datemax")
    def check_date_order(cls, v, values):
        """Validate ``datemax`` against ``datemin`` and the upper date limit.

        Args:
            cls: The model class.
            v (datetime.date): The ``datemax`` value to validate.
            values (dict): Fields validated so far; holds ``datemin`` only
                if that field passed its own validation.

        Returns:
            datetime.date: The validated ``datemax`` value.

        Raises:
            ValueError: If ``datemax`` is earlier than ``datemin`` or later
                than 2021-06-01.
        """
        datemin = values.get("datemin")
        # Check the two bounds independently: a chained comparison such as
        # ``limit >= v < datemin`` only fires when BOTH hold, which lets any
        # date beyond the limit through.
        too_late = v > date(2021, 6, 1)
        too_early = datemin is not None and v < datemin
        if too_late or too_early:
            raise ValueError('datemax must be after datemin and before "2021/06/01"')
        return v

    @validator("datemin")
    def check_date(cls, v, values):
        """Reject a ``datemin`` earlier than 2020-01-01.

        Args:
            cls: The model class.
            v (datetime.date): The ``datemin`` value to validate.
            values (dict): Fields validated so far (unused).

        Returns:
            datetime.date: The validated ``datemin`` value.

        Raises:
            ValueError: If ``v`` is before 2020-01-01.
        """
        if v < date(2020, 1, 1):
            raise ValueError('datemin must be greater than "2020/01/01"')
        return v

    @validator("files")
    def check_valid_regex(cls, value):
        """Ensure the ``files`` pattern compiles as a regular expression.

        A leading ``*`` (glob style) is rewritten to ``.*`` before compiling,
        and the rewritten pattern is what gets stored on the model.

        Args:
            cls: The model class.
            value (str): The pattern to validate.

        Returns:
            str: The (possibly rewritten) pattern.

        Raises:
            ValueError: If the pattern is not a valid regular expression.
        """
        if value.startswith("*"):
            value = "." + value
        try:
            re.compile(value)
        except re.error as err:
            raise ValueError(
                f"The provided string '{value}' is not a valid regex."
            ) from err
        return value
class Flight(AgiManager):
    """Manager that orchestrates a Flight run.

    It inventories the input files, groups them by plane and distributes the
    planes (and their files) across the workers.
    """

    # Declared here but never assigned by this class; the file inventory is
    # stored on instances as ``logs_ivq`` by get_data_from_files().
    ivq_logs = None

    # Value used for ``nfile`` when no per-plane file limit is requested.
    _NO_FILE_LIMIT = 999_999_999_999

    def __init__(self, env, **args: Unpack[FlightArgs]):
        """Initialize a Flight object from keyword arguments.

        Args:
            env: Environment object; only ``env.is_managed_pc`` is read here.
            **args (Unpack[FlightArgs]): Keyword arguments configuring the run:
                - data_source (str): 'file' (default) or 'hawk'.
                - files (str): File pattern, defaults to '*'.
                - path (str): Data directory, defaults to '~/data/flight'.
                - nfile (int): Maximum number of files per plane; missing or
                  0 means no limit.
                - datemin, datemax, output_format: passed through untouched.

        Raises:
            ValueError: If ``data_source`` is neither 'file' nor 'hawk'.

        Side effects:
            Stores the completed ``args`` on ``AgiManager.args`` (class
            level) and recreates an empty ``<path>/dataframes`` directory.
        """
        data_source = args.setdefault("data_source", "file")
        if data_source not in ("file", "hawk"):
            raise ValueError(
                f"invalid data_source '{data_source}': expected 'file' or 'hawk'"
            )
        self.data_source = data_source

        # Defaults are applied for every data source so that the attributes
        # assigned below always exist (the 'hawk' source has no dedicated
        # logic yet).
        args.setdefault("files", "*")
        path = args.get("path", os.path.join("~/data", "flight"))
        if env.is_managed_pc:
            # str() first: on a pathlib.Path, ``.replace`` would be a file
            # rename, not a text substitution.
            path = str(path).replace("~", "~/MyApp")
        args["path"] = path
        # A missing, None or 0 limit all mean "no limit".
        args["nfile"] = args.get("nfile") or self._NO_FILE_LIMIT

        # NOTE: dataset extraction from the application's data.7z archive used
        # to be attempted here; it is disabled (linking the data directory
        # was reported not to work on Thales managed PCs).

        self.path = AgiEnv.normalize_path(Path(path).expanduser())
        self.files = args["files"]
        self.nfile = args["nfile"]
        AgiManager.args = args
        self.data_out = os.path.join(self.path, "dataframes")

        # Remove dataframe files left by a previous run; best effort only.
        try:
            if os.path.exists(self.data_out):
                shutil.rmtree(
                    self.data_out, ignore_errors=False, onerror=AgiManager.onerror
                )
            os.makedirs(self.data_out, exist_ok=True)
        except Exception as e:
            print(f"warning issue while trying to remove directory: {e}")

    def build_distribution(self):
        """Build the two-level work distribution: workers -> planes -> files.

        Planes are the atomic unit of work, so all files of one plane go to
        the same worker. Planes are weighted by their total file size and
        chunked per worker by ``AGI.make_chunks``.

        Returns:
            tuple: ``(workers_planes_dist, workers_chunks, "plane", "files", "ko")``
            where ``workers_planes_dist`` is a list (per worker) of lists
            (per plane) of file paths, capped to ``self.nfile`` files per
            plane, and ``workers_chunks`` is a list (per worker) of
            ``(plane_id, size / 1000)`` tuples. Both are empty lists when the
            distribution could not be built. The last three items are fixed
            labels (presumably work name, item name and size unit - confirm
            against the caller).
        """
        # Initialised up front so the return below is valid even when an
        # exception is caught or no chunk is produced.
        workers_planes_dist = []
        workers_chunks = []
        try:
            # First level: planes weighted by the size of their files.
            planes_partition, planes_partition_size, df = self.get_partition_by_planes(
                self.get_data_from_files()
            )
            # Second level: dispatch the planes to the workers.
            workers_chunks = (
                AGI.make_chunks(
                    len(planes_partition),
                    planes_partition_size,
                    self.verbose,
                    threshold=12,
                )
                or []
            )
            if workers_chunks:
                df = df.with_columns([pl.col("id_plane").cast(pl.Int64)])
                # Tree: workers -> planes -> files (list of list of list).
                workers_planes_dist = [
                    [
                        df.filter(pl.col("id_plane") == plane_id)["files"]
                        .head(self.nfile)
                        .to_list()
                        for plane_id, _ in planes
                    ]
                    for planes in workers_chunks
                ]
                workers_chunks = [
                    [(plane, round(size / 1000, 3)) for plane, size in chunk]
                    for chunk in workers_chunks
                ]
        except Exception as e:
            print(traceback.format_exc())
            print(f"warning issue while trying to build distribution: {e}")
        return workers_planes_dist, workers_chunks, "plane", "files", "ko"

    def get_data_from_hawk(self):
        """Get the data inventory from ELK/HAWK.

        Placeholder: not implemented yet, currently returns None.
        """
        # implement your hawk logic
        pass

    def get_data_from_files(self):
        """Inventory the input data as a DataFrame of file paths and sizes.

        For the 'file' source, every regular file under ``self.path``
        matching the ``self.files`` glob pattern (recursive search) is
        listed. Paths are stored relative to the current user's home
        directory and sizes are in units of 1000 bytes. The inventory is
        also kept on ``self.logs_ivq``.

        Returns:
            polars.DataFrame: Columns ``files`` and ``size`` for the 'file'
            source; whatever ``get_data_from_hawk`` returns for 'hawk'.

        Raises:
            FileNotFoundError: If no file matches.
            ValueError: If ``data_source`` is unknown, or if a matched file
                is not located under the home directory.
        """
        if self.data_source == "file":
            home_dir = Path.home()
            self.logs_ivq = {
                str(f.relative_to(home_dir)): os.path.getsize(f) // 1000
                for f in Path(self.path).rglob(self.files)
                if f.is_file()
            }
            if not self.logs_ivq:
                raise FileNotFoundError(
                    f"Error in make_chunk: no files found with Path('{self.path}').rglob('{self.files}')"
                )
            # Each (path, size) tuple is one row; state the orientation
            # explicitly rather than relying on inference.
            return pl.DataFrame(
                list(self.logs_ivq.items()), schema=["files", "size"], orient="row"
            )
        if self.data_source == "hawk":
            return self.get_data_from_hawk()
        raise ValueError(f"unsupported data_source '{self.data_source}'")

    def get_partition_by_planes(self, df):
        """Build the first level of the distribution tree, one partition per plane.

        The plane id is taken from the file name: the two digits that
        precede an underscore, optionally after a directory prefix.

        Args:
            df (polars.DataFrame): Inventory with ``files`` and ``size``
                columns.

        Returns:
            tuple: ``(planes_partition, planes_partition_size, df)`` where
            ``planes_partition`` is a DataFrame with the total ``size`` per
            ``id_plane`` sorted by decreasing size, ``planes_partition_size``
            is the same data as a list of ``(id_plane, size)`` tuples, and
            ``df`` is the inventory limited to ``self.nfile`` files per plane
            and sorted by ``id_plane``.
        """
        df = df.with_columns(
            pl.col("files")
            # Optional directory prefix, then capture two digits followed by
            # an underscore.
            .str.extract(r"(?:.*/)?(\d{2})_")
            .cast(pl.Int32)
            .alias("id_plane")
        )
        # Keep at most 'nfile' rows per plane, then order by plane.
        df = df.group_by("id_plane").head(self.nfile)
        df = df.sort("id_plane")
        # Total size per plane, heaviest first.
        planes_partition = (
            df.group_by("id_plane")
            .agg(pl.col("size").sum().alias("size"))
            .sort("size", descending=True)
        )
        planes_partition_size = list(
            zip(
                planes_partition["id_plane"].to_list(),
                planes_partition["size"].to_list(),
            )
        )
        return planes_partition, planes_partition_size, df