Example1: My-Code-Project

manager: my_code

Package your_code

my_code: module my_code

Author: Jean-Pierre Morard

Copyright: Thales SIX GTS France SAS

class my_code.my_code.MyCode(env, **args)[source]

Bases: AgiManager

Class MyCode provides methods to orchestrate the run

__init__(env, **args)[source]

Initialize the object with the provided keyword arguments.

Parameters:

**args (Unpack[MyCodeArgs]) – Keyword arguments to initialize the object.

Returns:

None

args = {}
build_distribution()[source]

Build distribution as a calling graph.

static convert_functions_to_names(workers_tree)

Converts functions in a nested structure to their names.
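The conversion described above can be sketched as a small recursive walk. This is only an illustration: the shape of `workers_tree` (nested lists and tuples of callables) is an assumption, and the helper below is a stand-in, not the code behind `MyCode.convert_functions_to_names`.

```python
def convert_functions_to_names(tree):
    """Replace every callable in a nested structure with its __name__ (sketch)."""
    if callable(tree):
        return tree.__name__
    if isinstance(tree, dict):
        return {
            convert_functions_to_names(key): convert_functions_to_names(value)
            for key, value in tree.items()
        }
    if isinstance(tree, (list, tuple)):
        # Rebuild the same container type with converted items.
        return type(tree)(convert_functions_to_names(item) for item in tree)
    return tree  # Leave plain values (str, int, ...) untouched.


def algo_A():
    pass


def algo_B():
    pass


# Hypothetical tree: one worker, one branch, algo_B depending on algo_A.
workers_tree = [[(algo_B, [algo_A])]]
names = convert_functions_to_names(workers_tree)
```

Turning functions into names keeps the tree serializable, so it can be logged or sent to another process without pickling function objects.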

static do_distrib(inst, agi_env, workers)

Build the distribution tree.

Parameters:

inst – The instance for building the distribution tree.

Returns:

None

static onerror(func, path, exc_info)

Error handler for shutil.rmtree.

If the error is due to an access error (read-only file), it attempts to add write permission and then retries.

If the error is for another reason, it re-raises the error.

Usage: shutil.rmtree(path, onerror=onerror)

Parameters:
  • func (function) – The function that raised the error.

  • path (str) – The path name passed to the function.

  • exc_info (tuple) – The exception information returned by sys.exc_info().

Returns:

None
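A handler with the behaviour described above might look like the following sketch. It is not the source of `MyCode.onerror`; the temporary directory and the file name `locked.txt` are made up for the demonstration.

```python
import os
import shutil
import stat
import tempfile


def onerror(func, path, exc_info):
    """Retry a failed removal after adding write permission (sketch)."""
    if not os.access(path, os.W_OK):
        os.chmod(path, stat.S_IWUSR)  # make the entry writable
        func(path)                    # retry the operation that failed
    else:
        raise exc_info[1]             # unrelated error: propagate it


# Build a throwaway tree that contains a read-only file, then remove it.
root = tempfile.mkdtemp()
target = os.path.join(root, "data")
os.mkdir(target)
locked = os.path.join(target, "locked.txt")
with open(locked, "w") as handle:
    handle.write("read-only")
os.chmod(locked, stat.S_IREAD)

shutil.rmtree(target, onerror=onerror)
removed = not os.path.exists(target)
shutil.rmtree(root, ignore_errors=True)
```

On POSIX systems a read-only file inside a writable directory is usually removed without calling the handler at all; the retry path matters mostly on Windows. Note that Python 3.12 deprecates the `onerror` argument of `shutil.rmtree` in favour of `onexc`.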

verbose = None
class my_code.my_code.MyCodeArgs(**data)[source]

Bases: BaseModel

Class MyCodeArgs contains the arguments for MyCode.

__init__(**data)

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

classmethod construct(_fields_set=None, **values)
Return type:

Self

copy(*, include=None, exclude=None, update=None, deep=False)

Returns a copy of the model.

Deprecated

This method is now deprecated; use model_copy instead.

If you need include or exclude, use:

```python
data = self.model_dump(include=include, exclude=exclude, round_trip=True)
data = {**data, **(update or {})}
copied = self.model_validate(data)
```

Parameters:
  • include – Optional set or mapping specifying which fields to include in the copied model.

  • exclude – Optional set or mapping specifying which fields to exclude in the copied model.

  • update – Optional dictionary of field-value pairs to override field values in the copied model.

  • deep – If True, the values of fields that are Pydantic models will be deep-copied.

Returns:

A copy of the model with included, excluded and updated fields as specified.

dict(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False)
Return type:

Dict[str, Any]

classmethod from_orm(obj)
Return type:

Self

json(*, include=None, exclude=None, by_alias=False, exclude_unset=False, exclude_defaults=False, exclude_none=False, encoder=PydanticUndefined, models_as_dict=PydanticUndefined, **dumps_kwargs)
Return type:

str

model_computed_fields = {}
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

classmethod model_construct(_fields_set=None, **values)

Creates a new instance of the Model class with validated data.

Creates a new model setting __dict__ and __pydantic_fields_set__ from trusted or pre-validated data. Default values are respected, but no other validation is performed.

Return type:

Self

Note

model_construct() generally respects the model_config.extra setting on the provided model. That is, if model_config.extra == ‘allow’, then all extra passed values are added to the model instance’s __dict__ and __pydantic_extra__ fields. If model_config.extra == ‘ignore’ (the default), then all extra passed values are ignored. Because no validation is performed with a call to model_construct(), having model_config.extra == ‘forbid’ does not result in an error if extra values are passed, but they will be ignored.

Parameters:
  • _fields_set – A set of field names that were originally explicitly set during instantiation. If provided, this is directly used for the model_fields_set attribute (pydantic.BaseModel.model_fields_set). Otherwise, the field names from the values argument will be used.

  • values – Trusted or pre-validated data dictionary.

Returns:

A new instance of the Model class with validated data.

model_copy(*, update=None, deep=False)
Return type:

Self

Returns a copy of the model.

Note

The underlying instance’s __dict__ attribute is copied. This might have unexpected side effects if you store anything in it, on top of the model fields (e.g. the value of cached properties, see functools.cached_property).

Parameters:
  • update – Values to change/add in the new model. Note: the data is not validated before creating the new model. You should trust this data.

  • deep – Set to True to make a deep copy of the model.

Returns:

New model instance.
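A minimal sketch of `model_copy`, assuming Pydantic v2 is installed. `DemoArgs` is a hypothetical stand-in for `MyCodeArgs`, and its default of `0` is an assumption made only for this example.

```python
from pydantic import BaseModel


class DemoArgs(BaseModel):
    # Hypothetical stand-in for MyCodeArgs; the default value is assumed.
    mycode_param1: int = 0


original = DemoArgs(mycode_param1=1)

# The copy gets the updated value; the original instance is left unchanged.
updated = original.model_copy(update={"mycode_param1": 2})
```

Because `update` is applied without validation, it is safest to pass only values that already have the declared field types.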

model_dump(*, mode='python', include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
Return type:

dict[str, Any]

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

Parameters:
  • mode – The mode in which to_python should run. If mode is ‘json’, the output will only contain JSON serializable types. If mode is ‘python’, the output may contain non-JSON-serializable Python objects.

  • include – A set of fields to include in the output.

  • exclude – A set of fields to exclude from the output.

  • context – Additional context to pass to the serializer.

  • by_alias – Whether to use the field’s alias in the dictionary key if defined.

  • exclude_unset – Whether to exclude fields that have not been explicitly set.

  • exclude_defaults – Whether to exclude fields that are set to their default value.

  • exclude_none – Whether to exclude fields that have a value of None.

  • round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].

  • warnings – How to handle serialization errors. False/“none” ignores them, True/“warn” logs errors, “error” raises a PydanticSerializationError (pydantic_core.PydanticSerializationError).

  • fallback – A function to call when an unknown value is encountered. If not provided, a PydanticSerializationError is raised.

  • serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.

Returns:

A dictionary representation of the model.

model_dump_json(*, indent=None, include=None, exclude=None, context=None, by_alias=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, round_trip=False, warnings=True, fallback=None, serialize_as_any=False)
Return type:

str

Generates a JSON representation of the model using Pydantic’s to_json method.

Parameters:
  • indent – Indentation to use in the JSON output. If None is passed, the output will be compact.

  • include – Field(s) to include in the JSON output.

  • exclude – Field(s) to exclude from the JSON output.

  • context – Additional context to pass to the serializer.

  • by_alias – Whether to serialize using field aliases.

  • exclude_unset – Whether to exclude fields that have not been explicitly set.

  • exclude_defaults – Whether to exclude fields that are set to their default value.

  • exclude_none – Whether to exclude fields that have a value of None.

  • round_trip – If True, dumped values should be valid as input for non-idempotent types such as Json[T].

  • warnings – How to handle serialization errors. False/“none” ignores them, True/“warn” logs errors, “error” raises a PydanticSerializationError (pydantic_core.PydanticSerializationError).

  • fallback – A function to call when an unknown value is encountered. If not provided, a PydanticSerializationError is raised.

  • serialize_as_any – Whether to serialize fields with duck-typing serialization behavior.

Returns:

A JSON string representation of the model.
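The difference between `model_dump`, its `exclude_unset` option, and `model_dump_json` can be seen on a small model. This sketch assumes Pydantic v2; `DemoArgs` and its `label` field are hypothetical and do not exist in `MyCodeArgs`.

```python
import json

from pydantic import BaseModel


class DemoArgs(BaseModel):
    # Hypothetical stand-in for MyCodeArgs; `label` is added for illustration.
    mycode_param1: int = 0
    label: str = "demo"


args = DemoArgs(mycode_param1=3)

full = args.model_dump()                        # every field, defaults included
explicit = args.model_dump(exclude_unset=True)  # only fields passed by the caller
as_json = args.model_dump_json()                # same data, as a JSON string

# The JSON string decodes back to the same dictionary.
round_trip_ok = json.loads(as_json) == full
```

`exclude_unset=True` is handy when only the arguments the user actually supplied should be forwarded, leaving defaults to the receiving side.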

property model_extra: dict[str, Any] | None

Get extra fields set during validation.

Returns:

A dictionary of extra fields, or None if config.extra is not set to “allow”.

model_fields = {'mycode_param1': FieldInfo(annotation=int, required=False, default=<function conint>)}
property model_fields_set: set[str]

Returns the set of fields that have been explicitly set on this model instance.

Returns:

A set of strings representing the fields that have been set, i.e. that were not filled from defaults.

classmethod model_json_schema(by_alias=True, ref_template='#/$defs/{model}', schema_generator=<class 'pydantic.json_schema.GenerateJsonSchema'>, mode='validation')

Generates a JSON schema for a model class.

Return type:

dict[str, Any]

Parameters:
  • by_alias – Whether to use attribute aliases or not.

  • ref_template – The reference template.

  • schema_generator – To override the logic used to generate the JSON schema, as a subclass of GenerateJsonSchema with your desired modifications.

  • mode – The mode in which to generate the schema.

Returns:

The JSON schema for the given model class.

classmethod model_parametrized_name(params)

Compute the class name for parametrizations of generic classes.

This method can be overridden to achieve a custom naming scheme for generic BaseModels.

Return type:

str

Parameters:

params – Tuple of types of the class. Given a generic class Model with 2 type variables and a concrete model Model[str, int], the value (str, int) would be passed to params.

Returns:

String representing the new class where params are passed to cls as type variables.

Raises:

TypeError – Raised when trying to generate concrete names for non-generic models.

model_post_init(context, /)

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Return type:

None

classmethod model_rebuild(*, force=False, raise_errors=True, _parent_namespace_depth=2, _types_namespace=None)

Try to rebuild the pydantic-core schema for the model.

This may be necessary when one of the annotations is a ForwardRef which could not be resolved during the initial attempt to build the schema, and automatic rebuilding fails.

Return type:

bool | None

Parameters:
  • force – Whether to force the rebuilding of the model schema, defaults to False.

  • raise_errors – Whether to raise errors, defaults to True.

  • _parent_namespace_depth – The depth level of the parent namespace, defaults to 2.

  • _types_namespace – The types namespace, defaults to None.

Returns:

Returns None if the schema is already “complete” and rebuilding was not required. If rebuilding was required, returns True if rebuilding was successful, otherwise False.

classmethod model_validate(obj, *, strict=None, from_attributes=None, context=None, by_alias=None, by_name=None)

Validate a pydantic model instance.

Return type:

Self

Parameters:
  • obj – The object to validate.

  • strict – Whether to enforce types strictly.

  • from_attributes – Whether to extract data from object attributes.

  • context – Additional context to pass to the validator.

  • by_alias – Whether to use the field’s alias when validating against the provided input data.

  • by_name – Whether to use the field’s name when validating against the provided input data.

Raises:

ValidationError – If the object could not be validated.

Returns:

The validated model instance.

classmethod model_validate_json(json_data, *, strict=None, context=None, by_alias=None, by_name=None)
Return type:

Self

Validate the given JSON data against the Pydantic model.

Parameters:
  • json_data – The JSON data to validate.

  • strict – Whether to enforce types strictly.

  • context – Extra variables to pass to the validator.

  • by_alias – Whether to use the field’s alias when validating against the provided input data.

  • by_name – Whether to use the field’s name when validating against the provided input data.

Returns:

The validated Pydantic model.

Raises:

ValidationError – If json_data is not a JSON string or the object could not be validated.
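As a short illustration of the two validation entry points, assuming Pydantic v2 and a hypothetical `DemoArgs` model standing in for `MyCodeArgs`:

```python
from pydantic import BaseModel, ValidationError


class DemoArgs(BaseModel):
    # Hypothetical stand-in for MyCodeArgs; the default value is assumed.
    mycode_param1: int = 0


# Validate a JSON document directly.
parsed = DemoArgs.model_validate_json('{"mycode_param1": 7}')

# Validate a Python object; in the default (lax) mode a numeric string is coerced.
coerced = DemoArgs.model_validate({"mycode_param1": "7"})

# Data that cannot be converted to int raises ValidationError.
try:
    DemoArgs.model_validate_json('{"mycode_param1": "seven"}')
    rejected = False
except ValidationError:
    rejected = True
```

Passing `strict=True` to either method disables the string-to-int coercion shown for `coerced`.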

classmethod model_validate_strings(obj, *, strict=None, context=None, by_alias=None, by_name=None)

Validate the given object with string data against the Pydantic model.

Return type:

Self

Parameters:
  • obj – The object containing string data to validate.

  • strict – Whether to enforce types strictly.

  • context – Extra variables to pass to the validator.

  • by_alias – Whether to use the field’s alias when validating against the provided input data.

  • by_name – Whether to use the field’s name when validating against the provided input data.

Returns:

The validated Pydantic model.

mycode_param1: int
classmethod parse_file(path, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
Return type:

Self

classmethod parse_obj(obj)
Return type:

Self

classmethod parse_raw(b, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False)
Return type:

Self

classmethod schema(by_alias=True, ref_template='#/$defs/{model}')
Return type:

Dict[str, Any]

classmethod schema_json(*, by_alias=True, ref_template='#/$defs/{model}', **dumps_kwargs)
Return type:

str

classmethod update_forward_refs(**localns)
Return type:

None

classmethod validate(value)
Return type:

Self

worker: my_code_worker

Module my_code_worker, an extension of your_code

Author: yourself

class my_code_worker.my_code_worker.MyCodeWorker[source]

Bases: DagWorker

Class derived from DagWorker.

algo_A()[source]

Perform algorithm A.

This method belongs to MyCodeWorker class.

Parameters:

self – Instance of MyCodeWorker.

Returns:

None

Prints:

str: Print statement indicating the execution of algorithm A.

algo_B()[source]

Prints a message indicating the execution of algo_B method in MyCodeWorker class.

Parameters:

self – Reference to the instance of MyCodeWorker class.

Returns:

None

algo_C()[source]

Prints a message indicating that the algo_C method of MyCodeWorker has been called.

Parameters:

self – The MyCodeWorker instance on which the method is called.

Returns:

None

algo_X()[source]

Perform a specific algorithm X.

This method prints a message indicating the execution of algorithm X.

Parameters:

self – The object instance.

Returns:

None

algo_Y()[source]

Perform algorithm Y.

This method is a part of the MyCodeWorker class.

Parameters:

self – The instance of the MyCodeWorker class.

Returns:

None

algo_Z()[source]

Perform a specific algorithm Z.

This function is part of the MyCodeWorker class.

Returns:

None

static build(target_worker, dask_home, worker, mode=0, verbose=0)

Function to build target code on a target Worker.

Parameters:
  • target_worker (str) – module to build

  • dask_home (str) – path to dask home

  • worker – current worker

  • mode – (Default value = 0)

  • verbose – (Default value = 0)

cython_decorators = ['njit']
dask_home = None
static do_works(workers_tree, workers_tree_info)

Run the workers.

Parameters:
  • chunk – distribution tree

  • chunks

Returns:

env = None
static exec(cmd, path, worker)

Execute a command within a subprocess.

Parameters:
  • cmd – the command, as a string

  • path – the path from which to launch the command

  • worker

Returns:
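Running a command from a given directory can be sketched with the standard `subprocess` module. The helper name `run_command` is hypothetical, and passing the command as a list (rather than the single string the signature above documents) is an assumption made to avoid shell quoting issues; this is not the body of `exec`.

```python
import os
import subprocess
import sys
import tempfile


def run_command(cmd, path):
    """Run cmd with path as working directory and return its stdout (sketch)."""
    completed = subprocess.run(
        cmd,
        cwd=path,             # directory the command is launched from
        capture_output=True,  # collect stdout and stderr
        text=True,            # decode output as str
        check=True,           # raise CalledProcessError on a non-zero exit code
    )
    return completed.stdout


workdir = tempfile.mkdtemp()
output = run_command(
    [sys.executable, "-c", "import os; print(os.getcwd())"],
    workdir,
)
# The child process reports the directory it was started in.
same_dir = os.path.realpath(output.strip()) == os.path.realpath(workdir)
os.rmdir(workdir)
```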

exec_mono_process(workers_tree, workers_tree_info)

Execute tasks in a single process, respecting dependencies, but only for branches assigned to this worker via round-robin.

exec_multi_process(workers_tree, workers_tree_info)

Execute tasks in multiple threads, distributing branches to workers in round‑robin, then honoring dependencies per worker.
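The round-robin assignment mentioned for both execution modes can be illustrated as follows. The helper `assign_round_robin` and the flat list of branch names are assumptions for the sake of the example; the actual layout of `workers_tree` may differ.

```python
def assign_round_robin(branches, n_workers):
    """Deal branches out to workers in turn: 0, 1, ..., n-1, 0, 1, ... (sketch)."""
    assignment = {worker_id: [] for worker_id in range(n_workers)}
    for index, branch in enumerate(branches):
        assignment[index % n_workers].append(branch)
    return assignment


branches = ["algo_A", "algo_B", "algo_C", "algo_X", "algo_Y"]
plan = assign_round_robin(branches, 2)
# Worker 0 receives algo_A, algo_C, algo_Y; worker 1 receives algo_B, algo_X.
```

Each worker can then select only its own entry of `plan` and run those branches in dependency order.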

static expand(path, base_directory=None)

Expand a given path to an absolute path.

Parameters:
  • path (str) – The path to expand.

  • base_directory (str, optional) – The base directory to use for expanding the path. Defaults to None.

Returns:

The expanded absolute path.

Return type:

str

Raises:

None

Note

This method handles both Unix and Windows paths and expands ‘~’ notation to the user’s home directory.

static expand_and_join(path1, path2)

Join two paths after expanding the first path.

Parameters:
  • path1 (str) – The first path to expand and join.

  • path2 (str) – The second path to join with the expanded first path.

Returns:

The joined path.

Return type:

str
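The behaviour documented for `expand` and `expand_and_join` can be approximated with `os.path`. The two functions below are sketches written for this page, not the worker's implementation; in particular, falling back to the current working directory when `base_directory` is None is an assumption.

```python
import os


def expand(path, base_directory=None):
    """Expand '~' and anchor relative paths at base_directory (sketch)."""
    expanded = os.path.expanduser(path)
    if not os.path.isabs(expanded):
        # Assumption: relative paths resolve against base_directory, else the cwd.
        base = os.path.expanduser(base_directory) if base_directory else os.getcwd()
        expanded = os.path.join(base, expanded)
    return os.path.abspath(expanded)


def expand_and_join(path1, path2):
    """Expand the first path, then append the second (sketch)."""
    return os.path.join(expand(path1), path2)


home = os.path.abspath(os.path.expanduser("~"))
home_data = expand_and_join("~", "data")
relative = expand("logs", base_directory="~")
```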

get_logs_and_result(*args, verbosity=50, **kwargs)
get_work(work)[source]
Parameters:

work (str) – Name of the worker function called by AgiWorker.do_work. It is passed as a string rather than as a function so that the manager (e.g. My_code) does not depend on MyCodeWorker.
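Passing the work as a name keeps the manager free of any import of the worker class. One way such string-based dispatch could work is shown below; `DemoWorker` is hypothetical, and returning the bound method is an assumption, since the return value of `get_work` is not documented here.

```python
class DemoWorker:
    """Hypothetical worker used only to illustrate dispatch by name."""

    def algo_A(self):
        return "algo_A done"

    def get_work(self, work):
        # Look the method up by name; fail clearly if it does not exist.
        method = getattr(self, work, None)
        if not callable(method):
            raise AttributeError(f"unknown work: {work!r}")
        return method


worker = DemoWorker()
result = worker.get_work("algo_A")()  # the manager only knows the string "algo_A"
```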

static get_worker_info(worker_id)

Get information about a worker.

Parameters:

worker_id

Returns:

home_dir = None
is_managed_pc = False
static join(path1, path2)

Join two file paths.

Parameters:
  • path1 (str) – The first file path.

  • path2 (str) – The second file path.

Returns:

The combined file path.

Return type:

str

Raises:

None

logs = None
mode = None
static new(app, mode=None, env=None, verbose=0, worker_id=0, worker='localhost', args=None)

Create a new worker instance.

Parameters:
  • module – instantiate and load the target my_code_worker module

  • target_worker

  • target_worker_class

  • target_package

  • mode – (Default value = mode)

  • verbose – (Default value = 0)

  • worker_id – (Default value = 0)

  • worker – (Default value = ‘localhost’)

  • args – (Default value = None)

Returns:

static onerror(func, path, exc_info)

Error handler for shutil.rmtree. If it’s a permission error, make it writable and retry. Otherwise re-raise.

static run(workers={'127.0.0.1': 1}, mode=0, env=None, verbose=None, args=None)
Parameters:
  • app

  • workers

  • mode

  • verbose

  • args

Returns:

share_path = None
start()[source]

Start the function.

This function prints the file name if the ‘verbose’ attribute is greater than 0.

Parameters:

self – The current instance of the class.

Returns:

None

stop()[source]

Stop the current action.

Raises:

NotImplementedError – This method needs to be implemented in a subclass.

t0 = None
topological_sort(dependency_graph)

Perform a topological sort on the dependency graph. Raises ValueError on cycles.
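A topological sort that raises ValueError on cycles can be written with Kahn's algorithm. The sketch below assumes `dependency_graph` maps each task name to the list of tasks it depends on; the format used by the worker may be different, and this is not its implementation.

```python
from collections import deque


def topological_sort(dependency_graph):
    """Order tasks so that every dependency comes first (Kahn's algorithm)."""
    nodes = set(dependency_graph)
    for deps in dependency_graph.values():
        nodes.update(deps)

    # Number of unmet dependencies per task, and the reverse edges.
    remaining = {node: len(set(dependency_graph.get(node, ()))) for node in nodes}
    dependents = {node: [] for node in nodes}
    for node, deps in dependency_graph.items():
        for dep in set(deps):
            dependents[dep].append(node)

    ready = deque(sorted(node for node in nodes if remaining[node] == 0))
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in sorted(dependents[node]):
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    # Tasks that never became ready are part of a cycle.
    if len(order) != len(nodes):
        raise ValueError("dependency graph contains a cycle")
    return order


graph = {"algo_C": ["algo_A", "algo_B"], "algo_B": ["algo_A"], "algo_A": []}
order = topological_sort(graph)
```

The standard library offers `graphlib.TopologicalSorter` (Python 3.9+) for the same purpose; its `CycleError` is a subclass of ValueError.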

verbose = 1
worker = None
worker_id = None
works(workers_tree, workers_tree_info)

Run the worker tasks.
