Example 2: Flight Project
manager: flight

- class flight.flight.Flight(env, **args)[source]
Bases:
AgiManager
The Flight class provides methods to orchestrate the run.
- __init__(env, **args)[source]
Initialize a Flight object with provided arguments.
- Parameters:
**args (Unpack[FlightArgs]) –
Keyword arguments to configure the Flight object. Possible arguments include:
data_source (str): Source of the data, either ‘file’ or ‘hawk’.
files (str): Path pattern or file name.
path (str): Path to store data files. Note: src/flight_worker/dataset.7z is also available for per-worker dataset replication.
nfile (int): Maximum number of files to process.
datemin (str): Minimum date for data processing.
datemax (str): Maximum date for data processing.
output_format (str): Output format for processed data, either ‘parquet’ or ‘csv’.
- Raises:
ValueError – If an invalid input mode is provided for data_source.
- args = {}
- build_distribution()[source]
Build the distribution: provide the list of files per plane (level 1) and per worker (level 2). Level 1 is designed so that a job requiring all the output data of one plane does not have to wait on another worker, which would collapse overall performance.
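The two-level scheme described above can be sketched with the standard library alone; the names `build_two_level_distribution` and `plane_of`, and the file names, are hypothetical stand-ins for the real implementation:

```python
from collections import defaultdict

def build_two_level_distribution(files, plane_of, n_workers):
    """Group files by plane (level 1), then spread whole planes across
    workers (level 2) so no plane's output is split between workers."""
    # Level 1: one partition per plane.
    planes = defaultdict(list)
    for f in files:
        planes[plane_of(f)].append(f)
    # Level 2: round-robin whole planes over the workers.
    workers = [[] for _ in range(n_workers)]
    for i, (plane, plane_files) in enumerate(sorted(planes.items())):
        workers[i % n_workers].append((plane, plane_files))
    return workers

dist = build_two_level_distribution(
    ["A320_1.log", "A320_2.log", "B737_1.log"],
    plane_of=lambda f: f.split("_")[0],
    n_workers=2,
)
```

Because planes are assigned whole, a downstream step that needs every file of one plane never blocks on a second worker.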
- static convert_functions_to_names(workers_tree)
Converts functions in a nested structure to their names.
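A plausible stand-alone version of this helper, assuming the nested structure mixes dicts, lists, tuples, and callables (the real workers_tree layout may differ):

```python
def convert_functions_to_names(node):
    """Recursively replace callables in a nested structure with their __name__."""
    if callable(node):
        return node.__name__
    if isinstance(node, dict):
        return {k: convert_functions_to_names(v) for k, v in node.items()}
    if isinstance(node, (list, tuple)):
        return type(node)(convert_functions_to_names(v) for v in node)
    return node

tree = {"worker1": [len, (max, "keep")]}
named = convert_functions_to_names(tree)
```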
- data_out
Remove dataframe files from a previous run.
- static do_distrib(inst, agi_env, workers)
Build the distribution tree.
- Parameters:
inst – The instance for building the distribution tree.
- Returns:
None
- static extract_plane_from_file_name(file_path)[source]
Extract the airplane ID from a log file name.
- Parameters:
file_path – Path to the log file.
- Returns:
The airplane ID.
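A sketch of such an extractor, assuming (hypothetically) that the plane ID is the token before the first underscore in the file name; the real naming pattern may differ:

```python
import re
from pathlib import Path

def extract_plane_from_file_name(file_path):
    """Pull a plane identifier out of a log file name (assumed pattern:
    '<plane>_<rest>'); return None when the name does not match."""
    m = re.match(r"([^_]+)_", Path(file_path).name)
    return m.group(1) if m else None

plane = extract_plane_from_file_name("/data/logs/A320_2021-01-03.log")
```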
- get_partition_by_planes(df)[source]
Build the first level of the distribution tree, with planes as atomic partitions.
- Parameters:
df – DataFrame containing the output data to partition.
- ivq_logs = None
- static onerror(func, path, exc_info)
Error handler for shutil.rmtree.
If the error is due to an access error (read-only file), it attempts to add write permission and then retries.
If the error is for another reason, it re-raises the error.
Usage: shutil.rmtree(path, onerror=onerror)
- Parameters:
func (function) – The function that raised the error.
path (str) – The path name passed to the function.
exc_info (tuple) – The exception information returned by sys.exc_info().
- Returns:
None
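The documented handler can be sketched with the standard library alone; this is an illustration of the technique, not the project's exact code:

```python
import os
import shutil
import stat
import tempfile

def onerror(func, path, exc_info):
    """shutil.rmtree error handler: on an access error (read-only file),
    add write permission and retry; otherwise re-raise the error."""
    if not os.access(path, os.W_OK):
        os.chmod(path, stat.S_IWUSR)
        func(path)          # retry the failing operation
    else:
        raise exc_info[1]

# Usage: remove a tree that contains a read-only file.
root = tempfile.mkdtemp()
locked = os.path.join(root, "locked.txt")
open(locked, "w").close()
os.chmod(locked, stat.S_IRUSR)      # make the file read-only
shutil.rmtree(root, onerror=onerror)
```

On POSIX systems deleting a read-only file from a writable directory may already succeed, so the handler mainly matters on Windows; either way the tree is removed.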
- verbose = None
- class flight.flight.FlightArgs(**data)[source]
Bases:
BaseModel
FlightArgs holds the arguments for Flight.
- __init__(**data)
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- classmethod check_date(v, values)[source]
Check if the given date is greater than a specified minimum date.
- Parameters:
v (datetime.date) – The date to be validated.
values (dict) – A dictionary containing the values of all fields.
- Returns:
The validated date.
- Return type:
datetime.date
- Raises:
ValueError – If the input date is not greater than “2020/01/01”.
- classmethod check_date_order(v, values)[source]
Check the order of dates and validate the ‘datemax’ value.
- Parameters:
cls – The class itself.
v – The ‘datemax’ value to be validated.
values (dict) – A dictionary containing the input values.
- Returns:
The validated ‘datemax’ value.
- Return type:
Any
- Raises:
ValueError – If ‘datemax’ is not after ‘datemin’ or after “2021/06/01”.
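The documented rule can be sketched as a plain function; the `DATE_MAX` constant and the exact comparisons are assumptions based on the bounds quoted above, not the real validator:

```python
from datetime import date

DATE_MAX = date(2021, 6, 1)   # upper bound quoted for 'datemax' (assumed)

def check_date_order(datemin, datemax):
    """Validate that datemax falls after datemin and not past the
    hard upper bound (sketch of the documented rule)."""
    if datemax <= datemin:
        raise ValueError("datemax must be after datemin")
    if datemax > DATE_MAX:
        raise ValueError(f"datemax must not be after {DATE_MAX}")
    return datemax
```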
- classmethod check_valid_regex(value)[source]
Check if the input string is a valid regular expression.
- Parameters:
value (str) – The string to be validated as a regex.
- Returns:
The input string if it is a valid regex.
- Return type:
str
- Raises:
ValueError – If the input string is not a valid regex.
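A stand-alone sketch of this check using only `re.compile`; the real validator lives on the Pydantic model, but the core logic is the same idea:

```python
import re

def check_valid_regex(value):
    """Return value unchanged if it compiles as a regex, else raise ValueError."""
    try:
        re.compile(value)
    except re.error as exc:
        raise ValueError(f"invalid regex {value!r}: {exc}") from exc
    return value
```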
- classmethod construct(_fields_set=None, **values)
- Return type:
Self
- copy(*, include=None, exclude=None, update=None, deep=False)
Returns a copy of the model.
- !!! warning “Deprecated”
This method is now deprecated; use model_copy instead.
If you need include or exclude, use:
```python
data = self.model_dump(include=include, exclude=exclude, round_trip=True)
data = {**data, **(update or {})}
copied = self.model_validate(data)
```
- Parameters:
include – Optional set or mapping specifying which fields to include in the copied model.
exclude – Optional set or mapping specifying which fields to exclude in the copied model.
update – Optional dictionary of field-value pairs to override field values in the copied model.
deep – If True, the values of fields that are Pydantic models will be deep-copied.
- Returns:
A copy of the model with included, excluded and updated fields as specified.
- data_source: Literal['file', 'hawk']
- datemax: date
- datemin: date
- dict(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False)
- Return type:
Dict[str, Any]
- files: str
- classmethod from_orm(obj)
- Return type:
Self
- json(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False, encoder=PydanticUndefined, models_as_dict=PydanticUndefined, **dumps_kwargs)
- Return type:
str
- model_computed_fields = {}
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- classmethod model_construct(_fields_set=None, **values)
Creates a new instance of the Model class with validated data.
Creates a new model setting __dict__ and __pydantic_fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed.
- Return type:
Self
- !!! note
model_construct() generally respects the model_config.extra setting on the provided model. That is, if model_config.extra == ‘allow’, then all extra passed values are added to the model instance’s __dict__ and __pydantic_extra__ fields. If model_config.extra == ‘ignore’ (the default), then all extra passed values are ignored. Because no validation is performed with a call to model_construct(), having model_config.extra == ‘forbid’ does not result in an error if extra values are passed, but they will be ignored.
- Parameters:
_fields_set – A set of field names that were originally explicitly set during instantiation. If provided, this is directly used for the [model_fields_set][pydantic.BaseModel.model_fields_set] attribute. Otherwise, the field names from the values argument will be used.
values – Trusted or pre-validated data dictionary.
- Returns:
A new instance of the Model class with validated data.
- model_copy(*, update=None, deep=False)
- Return type:
Self
- !!! abstract “Usage Documentation”
[model_copy](../concepts/serialization.md#model_copy)
Returns a copy of the model.
- !!! note
The underlying instance’s [__dict__][object.__dict__] attribute is copied. This might have unexpected side effects if you store anything in it, on top of the model fields (e.g. the value of [cached properties][functools.cached_property]).
- Parameters:
update – Values to change/add in the new model. Note: the data is not validated before creating the new model. You should trust this data.
deep – Set to True to make a deep copy of the model.
- Returns:
New model instance.
- model_dump(*, mode='python', include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
- Return type:
dict[str, Any]
- !!! abstract “Usage Documentation”
[model_dump](../concepts/serialization.md#modelmodel_dump)
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
- Parameters:
mode – The mode in which to_python should run. If mode is ‘json’, the output will only contain JSON serializable types. If mode is ‘python’, the output may contain non-JSON-serializable Python objects.
include – A set of fields to include in the output.
exclude – A set of fields to exclude from the output.
context – Additional context to pass to the serializer.
by_alias – Whether to use the field’s alias in the dictionary key if defined.
exclude_unset – Whether to exclude fields that have not been explicitly set.
exclude_defaults – Whether to exclude fields that are set to their default value.
exclude_none – Whether to exclude fields that have a value of None.
round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].
warnings – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].
fallback – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.
serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.
- Returns:
A dictionary representation of the model.
- model_dump_json(*, indent=None, include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
- Return type:
str
- !!! abstract “Usage Documentation”
[model_dump_json](../concepts/serialization.md#modelmodel_dump_json)
Generates a JSON representation of the model using Pydantic’s to_json method.
- Parameters:
indent – Indentation to use in the JSON output. If None is passed, the output will be compact.
include – Field(s) to include in the JSON output.
exclude – Field(s) to exclude from the JSON output.
context – Additional context to pass to the serializer.
by_alias – Whether to serialize using field aliases.
exclude_unset – Whether to exclude fields that have not been explicitly set.
exclude_defaults – Whether to exclude fields that are set to their default value.
exclude_none – Whether to exclude fields that have a value of None.
round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].
warnings – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].
fallback – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.
serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.
- Returns:
A JSON string representation of the model.
- property model_extra: dict[str, Any] | None
Get extra fields set during validation.
- Returns:
A dictionary of extra fields, or None if config.extra is not set to “allow”.
- model_fields = {'data_source': FieldInfo(annotation=Literal['file', 'hawk'], required=True), 'datemax': FieldInfo(annotation=date, required=True), 'datemin': FieldInfo(annotation=date, required=True), 'files': FieldInfo(annotation=str, required=True), 'nfile': FieldInfo(annotation=int, required=False, default=<function conint>), 'nread': FieldInfo(annotation=int, required=False, default=typing.Annotated[int, None, Interval(gt=None, ge=0, lt=None, le=None), None]), 'nskip': FieldInfo(annotation=int, required=False, default=<function conint>), 'output_format': FieldInfo(annotation=Literal['parquet', 'csv'], required=True), 'path': FieldInfo(annotation=Path, required=True), 'sampling_rate': FieldInfo(annotation=float, required=False, default=typing.Annotated[float, None, Interval(gt=None, ge=0, lt=None, le=None), None, None])}
- property model_fields_set: set[str]
Returns the set of fields that have been explicitly set on this model instance.
- Returns:
A set of strings representing the fields that have been set, i.e. that were not filled from defaults.
- classmethod model_json_schema(by_alias=True, ref_template='#/$defs/{model}', schema_generator=<class 'pydantic.json_schema.GenerateJsonSchema'>, mode='validation')
Generates a JSON schema for a model class.
- Return type:
dict[str, Any]
- Parameters:
by_alias – Whether to use attribute aliases or not.
ref_template – The reference template.
schema_generator – To override the logic used to generate the JSON schema, as a subclass of GenerateJsonSchema with your desired modifications
mode – The mode in which to generate the schema.
- Returns:
The JSON schema for the given model class.
- classmethod model_parametrized_name(params)
Compute the class name for parametrizations of generic classes.
This method can be overridden to achieve a custom naming scheme for generic BaseModels.
- Return type:
str
- Parameters:
params – Tuple of types of the class. Given a generic class Model with 2 type variables and a concrete model Model[str, int], the value (str, int) would be passed to params.
- Returns:
String representing the new class where params are passed to cls as type variables.
- Raises:
TypeError – Raised when trying to generate concrete names for non-generic models.
- model_post_init(context, /)
Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.
- Return type:
None
- classmethod model_rebuild(*, force=False, raise_errors=True, _parent_namespace_depth=2, _types_namespace=None)
Try to rebuild the pydantic-core schema for the model.
This may be necessary when one of the annotations is a ForwardRef which could not be resolved during the initial attempt to build the schema, and automatic rebuilding fails.
- Return type:
bool | None
- Parameters:
force – Whether to force the rebuilding of the model schema, defaults to False.
raise_errors – Whether to raise errors, defaults to True.
_parent_namespace_depth – The depth level of the parent namespace, defaults to 2.
_types_namespace – The types namespace, defaults to None.
- Returns:
Returns None if the schema is already “complete” and rebuilding was not required. If rebuilding _was_ required, returns True if rebuilding was successful, otherwise False.
- classmethod model_validate(obj, *, strict=None, from_attributes=None, context=None, by_alias=None, by_name=None)
Validate a pydantic model instance.
- Return type:
Self
- Parameters:
obj – The object to validate.
strict – Whether to enforce types strictly.
from_attributes – Whether to extract data from object attributes.
context – Additional context to pass to the validator.
by_alias – Whether to use the field’s alias when validating against the provided input data.
by_name – Whether to use the field’s name when validating against the provided input data.
- Raises:
ValidationError – If the object could not be validated.
- Returns:
The validated model instance.
- classmethod model_validate_json(json_data, *, strict=None, context=None, by_alias=None, by_name=None)
- Return type:
Self
- !!! abstract “Usage Documentation”
[JSON Parsing](../concepts/json.md#json-parsing)
Validate the given JSON data against the Pydantic model.
- Parameters:
json_data – The JSON data to validate.
strict – Whether to enforce types strictly.
context – Extra variables to pass to the validator.
by_alias – Whether to use the field’s alias when validating against the provided input data.
by_name – Whether to use the field’s name when validating against the provided input data.
- Returns:
The validated Pydantic model.
- Raises:
ValidationError – If json_data is not a JSON string or the object could not be validated.
- classmethod model_validate_strings(obj, *, strict=None, context=None, by_alias=None, by_name=None)
Validate the given object with string data against the Pydantic model.
- Return type:
Self
- Parameters:
obj – The object containing string data to validate.
strict – Whether to enforce types strictly.
context – Extra variables to pass to the validator.
by_alias – Whether to use the field’s alias when validating against the provided input data.
by_name – Whether to use the field’s name when validating against the provided input data.
- Returns:
The validated Pydantic model.
- nfile: int
- nread: int
- nskip: int
- output_format: Literal['parquet', 'csv']
- classmethod parse_file(path, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
- Return type:
Self
- classmethod parse_obj(obj)
- Return type:
Self
- classmethod parse_raw(b, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
- Return type:
Self
- path: Path
- sampling_rate: float
- classmethod schema(by_alias=True, ref_template='#/$defs/{model}')
- Return type:
Dict[str, Any]
- classmethod schema_json(*, by_alias=True, ref_template='#/$defs/{model}', **dumps_kwargs)
- Return type:
str
- classmethod update_forward_refs(**localns)
- Return type:
None
- classmethod validate(value)
- Return type:
Self

worker: flight_worker

- class flight_worker.flight_worker.FlightWorker[source]
Bases:
PolarsWorker
Class derived from AgiDataWorker
- static build(target_worker, dask_home, worker, mode=0, verbose=0)
Function to build target code on a target Worker.
- Parameters:
target_worker (str) – module to build
dask_home (str) – path to dask home
worker – current worker
mode – (Default value = 0)
verbose – (Default value = 0)
- calculate_speed(new_column_name, df)[source]
Compute the speed (in meters) between consecutive coordinate pairs and add it to the DataFrame under the provided column name. Assumes that the previous coordinate columns are already present.
- Return type:
DataFrame
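The per-row value between consecutive coordinate pairs is typically derived with the haversine formula; this stand-alone sketch (`haversine_m` is a hypothetical helper name, not the project's API) shows one plausible basis for the column:

```python
from math import asin, cos, radians, sin, sqrt

def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two (lat, lon) points,
    using a mean Earth radius of 6 371 000 m."""
    r = 6371000.0
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    return 2 * r * asin(sqrt(a))
```

Dividing this distance by the time delta between the two rows would yield an actual speed; the method above stores it under the caller-supplied column name.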
- cython_decorators = ['njit']
- dask_home = None
- static do_works(workers_tree, workers_tree_info)
Run the workers.
- Parameters:
workers_tree – Distribution tree structure.
workers_tree_info – Additional information about the workers.
- env = None
- static exec(cmd, path, worker)
Execute a command within a subprocess.
- Parameters:
cmd – The command string.
path – The path where the command is launched.
worker – The current worker.
- exec_mono_process(workers_tree, workers_tree_info)
Executes tasks in single-threaded mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- exec_multi_process(workers_tree, workers_tree_info)
Executes tasks in multiprocessing mode.
- Return type:
None
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- static expand(path, base_directory=None)
Expand a given path to an absolute path.
- Parameters:
path (str) – The path to expand.
base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.
- Returns:
The expanded absolute path.
- Return type:
str
- Raises:
None
Note
This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.
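A sketch of the described behavior using `os.path` only; `expand` here is an illustrative re-implementation under stated assumptions, not the project's code:

```python
import os

def expand(path, base_directory=None):
    """Expand '~' and make the path absolute, resolving relative paths
    against base_directory when one is given."""
    path = os.path.expanduser(path)
    if base_directory and not os.path.isabs(path):
        path = os.path.join(os.path.expanduser(base_directory), path)
    return os.path.normpath(os.path.abspath(path))
```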
- static expand_and_join(path1, path2)
Join two paths after expanding the first path.
- Parameters:
path1 (str) – The first path to expand and join.
path2 (str) – The second path to join with the expanded first path.
- Returns:
The joined path.
- Return type:
str
- get_logs_and_result(*args, verbosity=50, **kwargs)
- static get_worker_info(worker_id)
Return information about the given worker.
- Parameters:
worker_id – Identifier of the worker.
- home_dir = None
- is_managed_pc = False
- static join(path1, path2)
Join two file paths.
- Parameters:
path1 (str) – The first file path.
path2 (str) – The second file path.
- Returns:
The combined file path.
- Return type:
str
- Raises:
None
- logs = None
- mode = None
- static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)
Create a new worker instance: instantiate and load the target worker module.
- Parameters:
module – The target worker module to instantiate and load.
target_worker
target_worker_class
target_package
mode – (Default value = mode)
verbose – (Default value = 0)
worker_id – (Default value = 0)
worker – (Default value = 'localhost')
args – (Default value = None)
- static onerror(func, path, exc_info)
Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.
- pool_init(worker_vars)[source]
Initialize the pool with worker variables.
- Parameters:
worker_vars (dict) – Variables specific to the worker.
- pool_vars = {}
- preprocess_df(df)[source]
Preprocess the DataFrame by parsing the date column and creating previous coordinate columns. This operation is done once per file.
- Return type:
DataFrame
- static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)
- Parameters:
workers
mode
verbose
args
- Returns:
- t0 = None
- verbose = 1
- work_done(worker_df)[source]
Concatenate dataframe if any and save the results.
- Parameters:
worker_df (pl.DataFrame) – Output dataframe for one plane.
- work_pool(file)[source]
Parse IVQ log files.
- Parameters:
file (str) – The log file to parse.
- Returns:
Parsed data.
- Return type:
pl.DataFrame
- worker = None
- worker_id = None
- works(workers_tree, workers_tree_info)
Executes worker tasks based on the distribution tree.
- Return type:
float
- Parameters:
workers_tree (any) – Distribution tree structure.
workers_tree_info (any) – Additional information about the workers.
- Returns:
Execution time in seconds.
