Example1: My-Code-Project

manager: my_code

Alternative text

Package your_code

my_code: module my_code Auteur: Jean-Pierre Morard Copyright: Thales SIX GTS France SAS

class my_code.my_code.MyCode(env, **args)[source]

Bases: AgiManager

Class MyCode provides methods to orchestrate the run

__init__(env, **args)[source]

Initialize the object with the provided keyword arguments.

Parameters:

**args (Unpack[MyCodeArgs]) – Keyword arguments to initialize the object.

Returns:

None

args = {}
build_distribution()[source]

Build distribution as a calling graph.

static convert_functions_to_names(workers_tree)

Converts functions in a nested structure to their names.

static do_distrib(inst, agi_env, workers)

Build the distribution tree.

Parameters:

inst – The instance for building the distribution tree.

Returns:

None

static onerror(func, path, exc_info)

Error handler for shutil.rmtree.

If the error is due to an access error (read-only file), it attempts to add write permission and then retries.

If the error is for another reason, it re-raises the error.

Usage: shutil.rmtree(path, onerror=onerror)

Parameters:
  • func (function) – The function that raised the error.

  • path (str) – The path name passed to the function.

  • exc_info (tuple) – The exception information returned by sys.exc_info().

Returns:

None

verbose = None
class my_code.my_code.MyCodeArgs(**data)[source]

Bases: BaseModel

Class MyCodeArgs contains Arguments for MyCode

__init__(**data)

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

classmethod construct(_fields_set=None, **values)
Return type:

Self

copy(*, include=None, exclude=None, update=None, deep=False)

Returns a copy of the model.

!!! warning “Deprecated”

This method is now deprecated; use model_copy instead.

If you need include or exclude, use:

`python {test="skip" lint="skip"} data = self.model_dump(include=include, exclude=exclude, round_trip=True) data = {**data, **(update or {})} copied = self.model_validate(data) `

Parameters:
  • include – Optional set or mapping specifying which fields to include in the copied model.

  • exclude – Optional set or mapping specifying which fields to exclude in the copied model.

  • update – Optional dictionary of field-value pairs to override field values in the copied model.

  • deep – If True, the values of fields that are Pydantic models will be deep-copied.

Returns:

A copy of the model with included, excluded and updated fields as specified.

dict(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False)
Return type:

Dict[str, Any]

classmethod from_orm(obj)
Return type:

Self

json(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False, encoder=PydanticUndefined, models_as_dict=PydanticUndefined, **dumps_kwargs)
Return type:

str

model_computed_fields = {}
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

classmethod model_construct(_fields_set=None, **values)

Creates a new instance of the Model class with validated data.

Creates a new model setting __dict__ and __pydantic_fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed.

Return type:

Self

!!! note

model_construct() generally respects the model_config.extra setting on the provided model. That is, if model_config.extra == ‘allow’, then all extra passed values are added to the model instance’s __dict__ and __pydantic_extra__ fields. If model_config.extra == ‘ignore’ (the default), then all extra passed values are ignored. Because no validation is performed with a call to model_construct(), having model_config.extra == ‘forbid’ does not result in an error if extra values are passed, but they will be ignored.

Parameters:
  • _fields_set – A set of field names that were originally explicitly set during instantiation. If provided, this is directly used for the [model_fields_set][pydantic.BaseModel.model_fields_set] attribute. Otherwise, the field names from the values argument will be used.

  • values – Trusted or pre-validated data dictionary.

Returns:

A new instance of the Model class with validated data.

model_copy(*, update=None, deep=False)
Return type:

Self

!!! abstract “Usage Documentation”

[model_copy](../concepts/serialization.md#model_copy)

Returns a copy of the model.

!!! note

The underlying instance’s [__dict__][object.__dict__] attribute is copied. This might have unexpected side effects if you store anything in it, on top of the model fields (e.g. the value of [cached properties][functools.cached_property]).

Parameters:
  • update – Values to change/add in the new model. Note: the data is not validated before creating the new model. You should trust this data.

  • deep – Set to True to make a deep copy of the model.

Returns:

New model instance.

model_dump(*, mode='python', include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
Return type:

dict[str, Any]

!!! abstract “Usage Documentation”

[model_dump](../concepts/serialization.md#modelmodel_dump)

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

Parameters:
  • mode – The mode in which to_python should run. If mode is ‘json’, the output will only contain JSON serializable types. If mode is ‘python’, the output may contain non-JSON-serializable Python objects.

  • include – A set of fields to include in the output.

  • exclude – A set of fields to exclude from the output.

  • context – Additional context to pass to the serializer.

  • by_alias – Whether to use the field’s alias in the dictionary key if defined.

  • exclude_unset – Whether to exclude fields that have not been explicitly set.

  • exclude_defaults – Whether to exclude fields that are set to their default value.

  • exclude_none – Whether to exclude fields that have a value of None.

  • round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].

  • warnings – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].

  • fallback – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.

  • serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.

Returns:

A dictionary representation of the model.

model_dump_json(*, indent=None, include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
Return type:

str

!!! abstract “Usage Documentation”

[model_dump_json](../concepts/serialization.md#modelmodel_dump_json)

Generates a JSON representation of the model using Pydantic’s to_json method.

Parameters:
  • indent – Indentation to use in the JSON output. If None is passed, the output will be compact.

  • include – Field(s) to include in the JSON output.

  • exclude – Field(s) to exclude from the JSON output.

  • context – Additional context to pass to the serializer.

  • by_alias – Whether to serialize using field aliases.

  • exclude_unset – Whether to exclude fields that have not been explicitly set.

  • exclude_defaults – Whether to exclude fields that are set to their default value.

  • exclude_none – Whether to exclude fields that have a value of None.

  • round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].

  • warnings – How to handle serialization errors. False/”none” ignores them, True/”warn” logs errors, “error” raises a [PydanticSerializationError][pydantic_core.PydanticSerializationError].

  • fallback – A function to call when an unknown value is encountered. If not provided, a [PydanticSerializationError][pydantic_core.PydanticSerializationError] error is raised.

  • serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.

Returns:

A JSON string representation of the model.

property model_extra: dict[str, Any] | None

Get extra fields set during validation.

Returns:

A dictionary of extra fields, or None if config.extra is not set to “allow”.

model_fields = {'mycode_param1': FieldInfo(annotation=int, required=False, default=<function conint>)}
property model_fields_set: set[str]

Returns the set of fields that have been explicitly set on this model instance.

Returns:

A set of strings representing the fields that have been set,

i.e. that were not filled from defaults.

classmethod model_json_schema(by_alias=True, ref_template='#/$defs/{model}', schema_generator=<class 'pydantic.json_schema.GenerateJsonSchema'>, mode='validation')

Generates a JSON schema for a model class.

Return type:

dict[str, Any]

Parameters:
  • by_alias – Whether to use attribute aliases or not.

  • ref_template – The reference template.

  • schema_generator – To override the logic used to generate the JSON schema, as a subclass of GenerateJsonSchema with your desired modifications

  • mode – The mode in which to generate the schema.

Returns:

The JSON schema for the given model class.

classmethod model_parametrized_name(params)

Compute the class name for parametrizations of generic classes.

This method can be overridden to achieve a custom naming scheme for generic BaseModels.

Return type:

str

Parameters:

params – Tuple of types of the class. Given a generic class Model with 2 type variables and a concrete model Model[str, int], the value (str, int) would be passed to params.

Returns:

String representing the new class where params are passed to cls as type variables.

Raises:

TypeError – Raised when trying to generate concrete names for non-generic models.

model_post_init(context, /)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Return type:

None

classmethod model_rebuild(*, force=False, raise_errors=True, _parent_namespace_depth=2, _types_namespace=None)

Try to rebuild the pydantic-core schema for the model.

This may be necessary when one of the annotations is a ForwardRef which could not be resolved during the initial attempt to build the schema, and automatic rebuilding fails.

Return type:

bool | None

Parameters:
  • force – Whether to force the rebuilding of the model schema, defaults to False.

  • raise_errors – Whether to raise errors, defaults to True.

  • _parent_namespace_depth – The depth level of the parent namespace, defaults to 2.

  • _types_namespace – The types namespace, defaults to None.

Returns:

Returns None if the schema is already “complete” and rebuilding was not required. If rebuilding _was_ required, returns True if rebuilding was successful, otherwise False.

classmethod model_validate(obj, *, strict=None, from_attributes=None, context=None, by_alias=None, by_name=None)

Validate a pydantic model instance.

Return type:

Self

Parameters:
  • obj – The object to validate.

  • strict – Whether to enforce types strictly.

  • from_attributes – Whether to extract data from object attributes.

  • context – Additional context to pass to the validator.

  • by_alias – Whether to use the field’s alias when validating against the provided input data.

  • by_name – Whether to use the field’s name when validating against the provided input data.

Raises:

ValidationError – If the object could not be validated.

Returns:

The validated model instance.

classmethod model_validate_json(json_data, *, strict=None, context=None, by_alias=None, by_name=None)
Return type:

Self

!!! abstract “Usage Documentation”

[JSON Parsing](../concepts/json.md#json-parsing)

Validate the given JSON data against the Pydantic model.

Parameters:
  • json_data – The JSON data to validate.

  • strict – Whether to enforce types strictly.

  • context – Extra variables to pass to the validator.

  • by_alias – Whether to use the field’s alias when validating against the provided input data.

  • by_name – Whether to use the field’s name when validating against the provided input data.

Returns:

The validated Pydantic model.

Raises:

ValidationError – If json_data is not a JSON string or the object could not be validated.

classmethod model_validate_strings(obj, *, strict=None, context=None, by_alias=None, by_name=None)

Validate the given object with string data against the Pydantic model.

Return type:

Self

Parameters:
  • obj – The object containing string data to validate.

  • strict – Whether to enforce types strictly.

  • context – Extra variables to pass to the validator.

  • by_alias – Whether to use the field’s alias when validating against the provided input data.

  • by_name – Whether to use the field’s name when validating against the provided input data.

Returns:

The validated Pydantic model.

mycode_param1: int
classmethod parse_file(path, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
Return type:

Self

classmethod parse_obj(obj)
Return type:

Self

classmethod parse_raw(b, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
Return type:

Self

classmethod schema(by_alias=True, ref_template='#/$defs/{model}')
Return type:

Dict[str, Any]

classmethod schema_json(*, by_alias=True, ref_template='#/$defs/{model}', **dumps_kwargs)
Return type:

str

classmethod update_forward_refs(**localns)
Return type:

None

classmethod validate(value)
Return type:

Self

Alternative text

worker: my_code_worker

Alternative text

Module my_code_worker extension of your_code

Auteur: yourself

class my_code_worker.my_code_worker.MyCodeWorker[source]

Bases: AgiDagWorker

class derived from AgiDagWorker

algo_A()[source]

Perform algorithm A.

This method belongs to MyCodeWorker class.

Parameters:

self – Instance of MyCodeWorker.

Returns:

None

Prints:

str: Print statement indicating the execution of algorithm A.

algo_B()[source]

Prints a message indicating the execution of algo_B method in MyCodeWorker class.

Parameters:

self – Reference to the instance of MyCodeWorker class.

Returns:

None

algo_C()[source]

Prints a message indicating that the algo_C method of MyCodeWorker has been called.

Parameters:

() (self) – The MyCodeWorker instance on which the method is called.

Returns:

None

algo_X()[source]

Perform a specific algorithm X.

This method prints a message indicating the execution of algorithm X.

Parameters:

self – The object instance.

Returns:

None

algo_Y()[source]

Perform algorithm Y.

This method is a part of the MyCodeWorker class.

Parameters:

self – The instance of the MyCodeWorker class.

Returns:

None

algo_Z()[source]

Perform a specific algorithm Z.

This function is part of the MyCodeWorker class.

Returns:

None

static build(app, target_worker, dask_home, worker, mode=0, verbose=0)

Function to build target code on a my_code_AgiWorker.

Parameters:
  • app (str) – app to build

  • target_worker (str) – module to build

  • dask_home (str) – path to dask home

  • worker – current worker

  • mode – (Default value = 0)

  • verbose – (Default value = 0)

Returns:

cython_decorators = ['njit']
dask_home = None
static do_works(workers_tree, workers_tree_info)

run of workers

Parameters:
  • chunk – distribution tree

  • chunks

Returns:

exec(algo)[source]

execute a command within a subprocess

Parameters:
  • cmd – the str of the command

  • path – the path where to lunch the command

  • worker

Returns:

exec_mono_process(workers_tree, workers_tree_info)

Execute tasks in a single process, respecting dependencies.

exec_multi_process(workers_tree, workers_tree_info)

Execute tasks in multiple threads, respecting dependencies.

static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str

static get_worker_info(worker_id)

def get_worker_info():

Parameters:

worker_id

Returns:

home_dir = None
is_managed_pc = False
static join(path1, path2)

Join two file paths.

Parameters:
  • path1 (str) – The first file path.

  • path2 (str) – The second file path.

Returns:

The combined file path.

Return type:

str

Raises:

None

logs = None
mode = None
static new(target_module, target_class, target_package, mode=None, verbose=0, worker_id=0, worker='localhost', env=None, args=None)

new worker instance :param module: instanciate and load target my_code_worker module :param target_module: :param target_class: :param target_package: :param mode: (Default value = mode) :param verbose: (Default value = 0) :param worker_id: (Default value = 0) :param worker: (Default value = ‘localhost’) :param args: (Default value = None)

Returns:

static normalize_path(path)
onerror(path, exc_info)

Error handler for shutil.rmtree.

If the error is due to an access error (read only file) it attempts to add write permission and then retries.

If the error is for another reason it re-raises the error.

Usage : shutil.rmtree(path, onerror=onerror)

Parameters:
  • func

  • path

  • exc_info

Returns:

static run(app, workers={'127.0.0.1': 1}, mode=0, verbose=3, args=None)
share_path = None
start()[source]

Start the function.

This function prints the file name if the ‘verbose’ attribute is greater than 0.

Parameters:

self – The current instance of the class.

Returns:

None

stop()[source]

Stop the current action.

Raises:

NotImplementedError – This method needs to be implemented in a subclass.

t0 = None
topological_sort(dependency_graph)

Perform a topological sort on the dependency graph.

Parameters:

dependency_graph (dict) – A dictionary where keys are functions and values are lists of dependent functions.

Returns:

A list of functions in topologically sorted order.

Return type:

list

Raises:

ValueError – If a cycle is detected in the dependencies.

verbose = 1
worker = None
worker_id = None
works(workers_tree, workers_tree_info)

Run the worker tasks.

Alternative text