Example 2: Flight Project
manager: flight

Package flight
flight: module examples. Author: Jean-Pierre Morard. Copyright: Thales SIX GTS France SAS
- class flight.flight.Flight(env, **args)[source]
Bases:
AgiManager
The Flight class provides methods to orchestrate the run.
- __init__(env, **args)[source]
Initialize a Flight object with provided arguments.
- Parameters:
**args (Unpack[FlightArgs]) –
Keyword arguments to configure the Flight object. Possible arguments include:
data_source (str): Source of the data, either ‘file’ or ‘hawk’.
files (str): Path pattern or file name.
path (str): Path to store data files. Note: src/flight_worker/dataset.7z is also available for dataset replication per worker.
nfile (int): Maximum number of files to process.
datemin (str): Minimum date for data processing.
datemax (str): Maximum date for data processing.
output_format (str): Output format for processed data, either ‘parquet’ or ‘csv’.
- Raises:
ValueError – If an invalid input mode is provided for data_source.
- args = {}
- build_distribution()[source]
build_distrib: provides the list of files per plane (level 1) and per worker (level 2). Level 1 was designed to prevent a job that requires all the output data of one plane from having to wait for another worker, which would collapse overall performance.
Args:
Returns:
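The two-level scheme described above can be sketched as follows. This is a minimal illustration, not the actual implementation: the `plane_<digits>` filename pattern and the round-robin worker assignment are assumptions.

```python
import re
from collections import defaultdict

def build_distribution(files, n_workers):
    """Sketch of a two-level distribution: level 1 groups files by plane so
    one plane's outputs never span workers; level 2 spreads plane groups
    over workers round-robin."""
    # Level 1: group files by plane id (hypothetical 'plane_<digits>' pattern).
    by_plane = defaultdict(list)
    for f in files:
        m = re.search(r"plane_(\d+)", f)
        by_plane[m.group(1)].append(f)
    # Level 2: assign each plane group to a worker, round-robin.
    workers = [[] for _ in range(n_workers)]
    for i, (_, group) in enumerate(sorted(by_plane.items())):
        workers[i % n_workers].append(group)
    return workers

files = ["plane_1_a.log", "plane_2_a.log", "plane_1_b.log", "plane_3_a.log"]
tree = build_distribution(files, 2)
```

All files of plane 1 land on the same worker, so no job ever waits on another worker for that plane's output.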
- static convert_functions_to_names(workers_tree)
Converts functions in a nested structure to their names.
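A plausible sketch of this conversion over nested lists, tuples, and dicts (the exact set of container types handled by the real method is an assumption):

```python
def convert_functions_to_names(workers_tree):
    """Recursively replace callables in a nested structure by their
    __name__, leaving all other values untouched (sketch)."""
    if callable(workers_tree):
        return workers_tree.__name__
    if isinstance(workers_tree, (list, tuple)):
        return type(workers_tree)(convert_functions_to_names(x) for x in workers_tree)
    if isinstance(workers_tree, dict):
        return {k: convert_functions_to_names(v) for k, v in workers_tree.items()}
    return workers_tree

def parse():
    pass

names = convert_functions_to_names([{"w1": [parse, "file.log"]}])
```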
- data_out
Remove dataframe files from a previous run.
- static do_distrib(inst, agi_env, workers)
Build the distribution tree.
- Parameters:
inst – The instance for building the distribution tree.
- Returns:
None
- static extract_plane_from_file_name(file_path)[source]
Provide the airplane ID from a log file name.
- Parameters:
file_path
Returns:
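A minimal sketch of such an extraction; the `plane_<digits>` naming convention is an assumption, the real log file naming scheme may differ:

```python
import re

def extract_plane_from_file_name(file_path):
    """Pull an airplane id out of a log file name (hypothetical pattern)."""
    m = re.search(r"plane_(\d+)", file_path)
    if m is None:
        raise ValueError(f"no plane id found in {file_path!r}")
    return int(m.group(1))

pid = extract_plane_from_file_name("data/plane_42_2021.log")  # 42
```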
- get_partition_by_planes(df)[source]
Build the first level of the distribution tree, with planes as atomic partitions.
- Parameters:
df – DataFrame containing the output data to partition.
Returns:
- ivq_logs = None
- static onerror(func, path, exc_info)
Error handler for shutil.rmtree.
If the error is due to an access error (read-only file), it attempts to add write permission and then retries.
If the error is for another reason, it re-raises the error.
Usage: shutil.rmtree(path, onerror=onerror)
- Parameters:
func (function) – The function that raised the error.
path (str) – The path name passed to the function.
exc_info (tuple) – The exception information returned by sys.exc_info().
- Returns:
None
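The handler described above follows a well-known pattern; a self-contained sketch (on POSIX a read-only file in a writable directory may be removed without the handler ever firing, but the tree is deleted either way):

```python
import os
import shutil
import stat
import tempfile

def onerror(func, path, exc_info):
    """shutil.rmtree error handler: on an access error (read-only file),
    add write permission and retry; otherwise re-raise the error."""
    if not os.access(path, os.W_OK):
        os.chmod(path, stat.S_IWUSR)
        func(path)
    else:
        raise exc_info[1]

# Usage: a tree containing a read-only file still gets removed.
root = tempfile.mkdtemp()
ro_file = os.path.join(root, "ro.txt")
open(ro_file, "w").close()
os.chmod(ro_file, stat.S_IRUSR)
shutil.rmtree(root, onerror=onerror)
```

Note that `onerror` is deprecated in favor of `onexc` since Python 3.12, though it still works.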
- verbose = None
- class flight.flight.FlightArgs(**data)[source]
Bases:
BaseModel
FlightArgs contains the arguments for Flight.
- __init__(**data)
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- classmethod check_date(v, values)[source]
Check if the given date is greater than a specified minimum date.
- Parameters:
v (datetime.date) – The date to be validated.
values (dict) – A dictionary containing the values of all fields.
- Returns:
The validated date.
- Return type:
datetime.date
- Raises:
ValueError – If the input date is not greater than “2020/01/01”.
- classmethod check_date_order(v, values)[source]
Check the order of dates and validate the ‘datemax’ value.
- Parameters:
cls – The class itself.
v – The ‘datemax’ value to be validated.
values (dict) – A dictionary containing the input values.
- Returns:
The validated ‘datemax’ value.
- Return type:
Any
- Raises:
ValueError – If ‘datemax’ is not after ‘datemin’ or after “2021/06/01”.
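The two date checks above can be sketched in plain Python (the exact boundary semantics and the Pydantic validator wiring are assumptions; only the documented limits 2020/01/01 and 2021/06/01 come from the source):

```python
from datetime import date

DATE_MIN = date(2020, 1, 1)   # lower bound from check_date
DATE_MAX = date(2021, 6, 1)   # upper bound from check_date_order

def check_date(v):
    """The date must be strictly after 2020/01/01."""
    if v <= DATE_MIN:
        raise ValueError(f"date must be after {DATE_MIN}")
    return v

def check_date_order(datemax, datemin):
    """datemax must be after datemin and not after 2021/06/01."""
    if datemax <= datemin or datemax > DATE_MAX:
        raise ValueError("datemax must be after datemin and not after 2021/06/01")
    return datemax
```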
- classmethod check_valid_regex(value)[source]
Check if the input string is a valid regular expression.
- Parameters:
value (str) – The string to be validated as a regex.
- Returns:
The input string if it is a valid regex.
- Return type:
str
- Raises:
ValueError – If the input string is not a valid regex.
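The standard way to perform this check is to attempt compilation; a sketch of the validation logic (detached from the Pydantic validator machinery):

```python
import re

def check_valid_regex(value):
    """Return value unchanged if it compiles as a regex, else raise ValueError."""
    try:
        re.compile(value)
    except re.error as exc:
        raise ValueError(f"invalid regex {value!r}: {exc}") from exc
    return value
```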
- classmethod construct(_fields_set=None, **values)
- Return type:
Self
- copy(*, include=None, exclude=None, update=None, deep=False)
Returns a copy of the model.
- !!! warning “Deprecated”
This method is now deprecated; use model_copy instead.
If you need include or exclude, use:
`data = self.model_dump(include=include, exclude=exclude, round_trip=True)`
`data = {**data, **(update or {})}`
`copied = self.model_validate(data)`
- Parameters:
include – Optional set or mapping specifying which fields to include in the copied model.
exclude – Optional set or mapping specifying which fields to exclude in the copied model.
update – Optional dictionary of field-value pairs to override field values in the copied model.
deep – If True, the values of fields that are Pydantic models will be deep-copied.
- Returns:
A copy of the model with included, excluded and updated fields as specified.
- data_source: Literal['file', 'hawk']
- datemax: date
- datemin: date
- dict(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False)
- Return type:
Dict[str, Any]
- files: str
- classmethod from_orm(obj)
- Return type:
Self
- json(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False, encoder=PydanticUndefined, models_as_dict=PydanticUndefined, **dumps_kwargs)
- Return type:
str
- model_computed_fields = {}
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod model_construct(_fields_set=None, **values)
Creates a new instance of the Model class with validated data.
Creates a new model setting __dict__ and __pydantic_fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed.
- Return type:
Self
- !!! note
model_construct() generally respects the model_config.extra setting on the provided model. That is, if model_config.extra == ‘allow’, then all extra passed values are added to the model instance’s __dict__ and __pydantic_extra__ fields. If model_config.extra == ‘ignore’ (the default), then all extra passed values are ignored. Because no validation is performed with a call to model_construct(), having model_config.extra == ‘forbid’ does not result in an error if extra values are passed, but they will be ignored.
- Parameters:
_fields_set – A set of field names that were originally explicitly set during instantiation. If provided, this is directly used for the [model_fields_set][pydantic.BaseModel.model_fields_set] attribute. Otherwise, the field names from the values argument will be used.
values – Trusted or pre-validated data dictionary.
- Returns:
A new instance of the Model class with validated data.
- model_copy(*, update=None, deep=False)
- Return type:
Self
- !!! abstract “Usage Documentation”
[model_copy](../concepts/serialization.md#model_copy)
Returns a copy of the model.
- !!! note
The underlying instance’s [__dict__][object.__dict__] attribute is copied. This might have unexpected side effects if you store anything in it, on top of the model fields (e.g. the value of [cached properties][functools.cached_property]).
- Parameters:
update – Values to change/add in the new model. Note: the data is not validated before creating the new model. You should trust this data.
deep – Set to True to make a deep copy of the model.
- Returns:
New model instance.
- model_dump(*, mode='python', include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
- Return type:
dict[str, Any]
- !!! abstract “Usage Documentation”
[model_dump](../concepts/serialization.md#modelmodel_dump)
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
- Parameters:
mode – The mode in which to_python should run. If mode is ‘json’, the output will only contain JSON serializable types. If mode is ‘python’, the output may contain non-JSON-serializable Python objects.
include – A set of fields to include in the output.
exclude – A set of fields to exclude from the output.
context – Additional context to pass to the serializer.
by_alias – Whether to use the field’s alias in the dictionary key if defined.
exclude_unset – Whether to exclude fields that have not been explicitly set.
exclude_defaults – Whether to exclude fields that are set to their default value.
exclude_none – Whether to exclude fields that have a value of None.
round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].
warnings – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].
fallback – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.
serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.
- Returns:
A dictionary representation of the model.
- model_dump_json(*, indent=None, include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
- Return type:
str
- !!! abstract “Usage Documentation”
[model_dump_json](../concepts/serialization.md#modelmodel_dump_json)
Generates a JSON representation of the model using Pydantic’s to_json method.
- Parameters:
indent – Indentation to use in the JSON output. If None is passed, the output will be compact.
include – Field(s) to include in the JSON output.
exclude – Field(s) to exclude from the JSON output.
context – Additional context to pass to the serializer.
by_alias – Whether to serialize using field aliases.
exclude_unset – Whether to exclude fields that have not been explicitly set.
exclude_defaults – Whether to exclude fields that are set to their default value.
exclude_none – Whether to exclude fields that have a value of None.
round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].
warnings – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].
fallback – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.
serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.
- Returns:
A JSON string representation of the model.
- property model_extra: dict[str, Any] | None
Get extra fields set during validation.
- Returns:
A dictionary of extra fields, or None if config.extra is not set to “allow”.
- model_fields = {'data_source': FieldInfo(annotation=Literal['file', 'hawk'], required=True), 'datemax': FieldInfo(annotation=date, required=True), 'datemin': FieldInfo(annotation=date, required=True), 'files': FieldInfo(annotation=str, required=True), 'nfile': FieldInfo(annotation=int, required=False, default=<function conint>), 'nread': FieldInfo(annotation=int, required=False, default=typing.Annotated[int, None, Interval(gt=None, ge=0, lt=None, le=None), None]), 'nskip': FieldInfo(annotation=int, required=False, default=<function conint>), 'output_format': FieldInfo(annotation=Literal['parquet', 'csv'], required=True), 'path': FieldInfo(annotation=Path, required=True), 'sampling_rate': FieldInfo(annotation=float, required=False, default=typing.Annotated[float, None, Interval(gt=None, ge=0, lt=None, le=None), None, None])}
- property model_fields_set: set[str]
Returns the set of fields that have been explicitly set on this model instance.
- Returns:
A set of strings representing the fields that have been set, i.e. that were not filled from defaults.
- classmethod model_json_schema(by_alias=True, ref_template='#/$defs/{model}', schema_generator=<class 'pydantic.json_schema.GenerateJsonSchema'>, mode='validation')
Generates a JSON schema for a model class.
- Return type:
dict[str, Any]
- Parameters:
by_alias – Whether to use attribute aliases or not.
ref_template – The reference template.
schema_generator – To override the logic used to generate the JSON schema, as a subclass of GenerateJsonSchema with your desired modifications
mode – The mode in which to generate the schema.
- Returns:
The JSON schema for the given model class.
- classmethod model_parametrized_name(params)
Compute the class name for parametrizations of generic classes.
This method can be overridden to achieve a custom naming scheme for generic BaseModels.
- Return type:
str
- Parameters:
params – Tuple of types of the class. Given a generic class Model with 2 type variables and a concrete model Model[str, int], the value (str, int) would be passed to params.
- Returns:
String representing the new class where params are passed to cls as type variables.
- Raises:
TypeError – Raised when trying to generate concrete names for non-generic models.
- model_post_init(context, /)
Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.
- Return type:
None
- classmethod model_rebuild(*, force=False, raise_errors=True, _parent_namespace_depth=2, _types_namespace=None)
Try to rebuild the pydantic-core schema for the model.
This may be necessary when one of the annotations is a ForwardRef which could not be resolved during the initial attempt to build the schema, and automatic rebuilding fails.
- Return type:
bool | None
- Parameters:
force – Whether to force the rebuilding of the model schema, defaults to False.
raise_errors – Whether to raise errors, defaults to True.
_parent_namespace_depth – The depth level of the parent namespace, defaults to 2.
_types_namespace – The types namespace, defaults to None.
- Returns:
Returns None if the schema is already “complete” and rebuilding was not required. If rebuilding _was_ required, returns True if rebuilding was successful, otherwise False.
- classmethod model_validate(obj, *, strict=None, from_attributes=None, context=None, by_alias=None, by_name=None)
Validate a pydantic model instance.
- Return type:
Self
- Parameters:
obj – The object to validate.
strict – Whether to enforce types strictly.
from_attributes – Whether to extract data from object attributes.
context – Additional context to pass to the validator.
by_alias – Whether to use the field’s alias when validating against the provided input data.
by_name – Whether to use the field’s name when validating against the provided input data.
- Raises:
ValidationError – If the object could not be validated.
- Returns:
The validated model instance.
- classmethod model_validate_json(json_data, *, strict=None, context=None, by_alias=None, by_name=None)
- Return type:
Self
- !!! abstract “Usage Documentation”
[JSON Parsing](../concepts/json.md#json-parsing)
Validate the given JSON data against the Pydantic model.
- Parameters:
json_data – The JSON data to validate.
strict – Whether to enforce types strictly.
context – Extra variables to pass to the validator.
by_alias – Whether to use the field’s alias when validating against the provided input data.
by_name – Whether to use the field’s name when validating against the provided input data.
- Returns:
The validated Pydantic model.
- Raises:
ValidationError – If json_data is not a JSON string or the object could not be validated.
- classmethod model_validate_strings(obj, *, strict=None, context=None, by_alias=None, by_name=None)
Validate the given object with string data against the Pydantic model.
- Return type:
Self
- Parameters:
obj – The object containing string data to validate.
strict – Whether to enforce types strictly.
context – Extra variables to pass to the validator.
by_alias – Whether to use the field’s alias when validating against the provided input data.
by_name – Whether to use the field’s name when validating against the provided input data.
- Returns:
The validated Pydantic model.
- nfile: int
- nread: int
- nskip: int
- output_format: Literal['parquet', 'csv']
- classmethod parse_file(path, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
- Return type:
Self
- classmethod parse_obj(obj)
- Return type:
Self
- classmethod parse_raw(b, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
- Return type:
Self
- path: Path
- sampling_rate: float
- classmethod schema(by_alias=True, ref_template='#/$defs/{model}')
- Return type:
Dict[str, Any]
- classmethod schema_json(*, by_alias=True, ref_template='#/$defs/{model}', **dumps_kwargs)
- Return type:
str
- classmethod update_forward_refs(**localns)
- Return type:
None
- classmethod validate(value)
- Return type:
Self

worker: flight_worker

Package flight_worker
flight_worker: module examples. Author: Jean-Pierre Morard. Copyright: Thales SIX GTS France SAS
- class flight_worker.flight_worker.FlightWorker[source]
Bases:
AgiDataWorker
Class derived from AgiDataWorker
- static build(app, target_worker, dask_home, worker, mode=0, verbose=0)
Build the target code on a my_code_AgiWorker.
- Parameters:
app (str) – app to build
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
Returns:
- calculate_speed(new_column_name, df)[source]
Compute the speed (in meters) between consecutive coordinate pairs and add it to the DataFrame under the provided column name. Assumes that the previous coordinate columns are already present.
- Return type:
DataFrame
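The per-row computation behind this method can be sketched with a haversine distance over consecutive coordinate pairs. This is an illustration only: the real method works on DataFrame columns, and the haversine formula with an Earth radius of 6 371 000 m is an assumption about how the distance is obtained.

```python
import math

def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two (lat, lon) points."""
    r = 6371000.0
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp = math.radians(lat2 - lat1)
    dl = math.radians(lon2 - lon1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))

def distances(rows):
    """Meters covered between each pair of consecutive (lat, lon) rows."""
    return [haversine_m(a[0], a[1], b[0], b[1]) for a, b in zip(rows, rows[1:])]

# One degree of longitude at the equator is roughly 111 km.
d = distances([(0.0, 0.0), (0.0, 1.0)])[0]
```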
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)
Run the workers.
- Parameters:
workers_tree – distribution tree structure.
workers_tree_info – additional information about the workers.
Returns:
- static exec(cmd, path, worker)
Execute a command within a subprocess.
- Parameters:
cmd – the command string.
path – the path from which to launch the command.
worker
Returns:
- exec_mono_process(workers_tree, workers_tree_info)
Executes tasks in single-threaded mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- exec_multi_process(workers_tree, workers_tree_info)
Executes tasks in multiprocessing mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None –
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
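A minimal sketch of such an expansion using the standard library; how the real method combines `base_directory` with relative paths is an assumption:

```python
import os

def expand(path, base_directory=None):
    """Expand '~' and make the path absolute, optionally resolving
    relative paths against base_directory (sketch)."""
    p = os.path.expanduser(path)
    if not os.path.isabs(p) and base_directory:
        p = os.path.join(os.path.expanduser(base_directory), p)
    return os.path.abspath(p)
```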
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- static get_worker_info(worker_id)
Get information about the given worker.
- Parameters:
worker_id
Returns:
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None –
- logs = None
- mode = None
- static new(target_module, target_class, target_package, mode=None, verbose=0, worker_id=0, worker='localhost', env=None, args=None)
Create a new worker instance: instantiate and load the target my_code_worker module.
- Parameters:
target_module – the module to load.
target_class – the class to instantiate.
target_package – the package containing the module.
mode – (Default value = None)
verbose – (Default value = 0)
worker_id – (Default value = 0)
worker – (Default value = ‘localhost’)
env – (Default value = None)
args – (Default value = None)
Returns:
- static normalize_path(path)
- onerror(path, exc_info)
Error handler for shutil.rmtree.
If the error is due to an access error (read-only file), it attempts to add write permission and then retries.
If the error is for another reason, it re-raises the error.
Usage: shutil.rmtree(path, onerror=onerror)
- Parameters:
func – the function that raised the error.
path – the path name passed to the function.
exc_info – the exception information returned by sys.exc_info().
- Returns:
None
- pool_init(worker_vars)[source]
Initialize the pool with worker variables.
- Parameters:
worker_vars (dict) – Variables specific to the worker.
- pool_vars = {}
- preprocess_df(df)[source]
Preprocess the DataFrame by parsing the date column and creating previous coordinate columns. This operation is done once per file.
- Return type:
DataFrame
- static run(app, workers={'127.0.0.1': 1}, mode=0, verbose=3, args=None)
- t0 = None
- verbose = 1
- work_done(worker_df)[source]
Concatenate the dataframes, if any, and save the results.
- Parameters:
worker_df (pl.DataFrame) – Output dataframe for one plane.
- work_pool(file)[source]
Parse IVQ log files.
- Parameters:
file (str) – The log file to parse.
- Returns:
Parsed data.
- Return type:
pl.DataFrame
- worker = None
- worker_id = None
- works(workers_tree, workers_tree_info)
Executes worker tasks based on the distribution tree.
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- Returns:
Execution time in seconds.
- Return type:
float
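The dispatch-and-time pattern behind works can be sketched as below. The mode-flag test and the stand-in executors are assumptions; only the "run tasks, return elapsed seconds" contract comes from the documentation above.

```python
import time

def exec_mono_process(workers_tree, workers_tree_info):
    """Stand-in for the real single-threaded executor."""
    for chunk in workers_tree:
        pass  # the real method would process each chunk here

def exec_multi_process(workers_tree, workers_tree_info):
    """Stand-in for the real multiprocessing executor."""
    pass

def works(workers_tree, workers_tree_info, mode=0):
    """Dispatch to mono- or multi-process execution and return the
    elapsed wall-clock time in seconds (sketch)."""
    t0 = time.perf_counter()
    if mode & 1:  # hypothetical flag: odd modes mean multiprocessing
        exec_multi_process(workers_tree, workers_tree_info)
    else:
        exec_mono_process(workers_tree, workers_tree_info)
    return time.perf_counter() - t0

elapsed = works([["f1"], ["f2"]], None)
```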
