Skip to content

rs_workflows/payload_template.md

<< Back to index

This file defines the schema for the payload template. It was written using the following reference: https://cpm.pages.eopf.copernicus.eu/eopf-cpm/main/processor-orchestration-guide/triggering-usage.html. The schema is based on Pydantic (a standard for schema definition, validation, and autocompletion).

AdfConfig

Bases: BasePayloadModel

Definition of an ADF configuration entry

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
242
243
244
245
246
247
class AdfConfig(BasePayloadModel):
    """Definition of an ADF configuration entry.

    ``id`` and ``path`` are required; ``store_params`` is optional.
    """

    # Required identifier of the ADF entry
    id: str
    # Required path of the ADF entry
    path: str
    # Optional store parameters (see StoreParams); None when not provided
    store_params: StoreParams | None = None

BasePayloadModel

Bases: BaseModel

Base class shared by all the schema models

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class BasePayloadModel(BaseModel):
    """Base class shared by all the schema models.

    It provides:
    - a common Pydantic configuration (``model_config``),
    - a "before" validator that writes the declared defaults into the incoming payload,
    - a ``dump`` helper with payload-friendly serialization defaults.
    """

    model_config = ConfigDict(
        # Allow using field names even when aliases are set
        populate_by_name=True,
        # Do not re-run validation when an attribute is assigned after creation
        validate_assignment=False,
        # Accept field types that Pydantic has no built-in support for
        arbitrary_types_allowed=True,
        # Keep unknown fields on the model instead of rejecting them
        extra="allow",
    )

    @model_validator(mode="before")
    @classmethod
    def fill_defaults(cls, values):
        """Write the declared defaults into the raw payload before validation.

        For every optional field of the model, the default value (or the result of
        its default factory) is written into the payload when the field is missing
        or explicitly set to None. Because the defaults are then part of the input,
        they count as "set" and are kept by ``dump`` (unless they are None).

        Args:
            values: Raw input given to the model. Anything that is not a dict is
                returned untouched.

        Returns:
            A new dict holding the input plus the filled-in defaults. The caller's
            dict is left unmodified.
        """
        if not isinstance(values, dict):
            return values

        # Work on a shallow copy so the caller's dictionary is never modified
        payload = dict(values)

        # model_fields is read from the class, as recommended by the migration guide:
        # https://docs.pydantic.dev/latest/migration/#model-fields
        for name, field in cls.model_fields.items():
            # Required fields have no default to write: leave them out so that
            # Pydantic reports them as missing instead of receiving a sentinel value
            if field.is_required():
                continue

            # The value may be supplied under the field name or under its alias
            keys = [name]
            for alias in (field.alias, field.validation_alias):
                if isinstance(alias, str) and alias not in keys:
                    keys.append(alias)
            provided = [key for key in keys if key in payload]

            # Value missing → set default under the field name
            if not provided:
                payload[name] = field.get_default(call_default_factory=True)
                continue

            # Value is explicitly None → also replace with default
            for key in provided:
                if payload[key] is None:
                    payload[key] = field.get_default(call_default_factory=True)

        return payload

    def dump(self, **kwargs):
        """Serialize the model with payload-friendly defaults.

        By default the dump:
        - uses the field aliases (``by_alias=True``),
        - skips None fields (``exclude_none=True``),
        - skips unset fields (``exclude_unset=True``).

        Any keyword argument is forwarded to ``model_dump`` and takes precedence
        over these defaults, so each of them can be overridden by the caller.

        Returns:
            Whatever ``model_dump`` returns for the resulting options.
        """
        options = {"by_alias": True, "exclude_none": True, "exclude_unset": True}
        # Caller-supplied options win over the defaults instead of clashing with them
        options.update(kwargs)
        return self.model_dump(**options)

dump(**kwargs)

Custom dump that: - skips None fields by default. - skips all unset fields. - uses the field aliases by default.

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
87
88
89
90
91
92
93
def dump(self, **kwargs):
    """Serialize the model with payload-friendly defaults.

    By default the dump:
    - uses the field aliases (``by_alias=True``),
    - skips None fields (``exclude_none=True``),
    - skips unset fields (``exclude_unset=True``).

    Any keyword argument is forwarded to ``model_dump`` and takes precedence
    over these defaults, so each of them can be overridden by the caller.

    Returns:
        Whatever ``model_dump`` returns for the resulting options.
    """
    options = {"by_alias": True, "exclude_none": True, "exclude_unset": True}
    # Caller-supplied options win over the defaults instead of clashing with them
    options.update(kwargs)
    return self.model_dump(**options)

fill_defaults(values)

Ensure defaults and nested models are properly initialized. Thus, when creating any model, the defaults should be written in the payload IF not provided and IF a default (except NONE) exists

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@model_validator(mode="before")
@classmethod
def fill_defaults(cls, values):
    """Write the declared defaults into the raw payload before validation.

    For every optional field of the model, the default value (or the result of
    its default factory) is written into the payload when the field is missing
    or explicitly set to None. Because the defaults are then part of the input,
    they count as "set" when the model is serialized.

    Args:
        values: Raw input given to the model. Anything that is not a dict is
            returned untouched.

    Returns:
        A new dict holding the input plus the filled-in defaults. The caller's
        dict is left unmodified.
    """
    if not isinstance(values, dict):
        return values

    # Work on a shallow copy so the caller's dictionary is never modified
    payload = dict(values)

    # model_fields is read from the class, as recommended by the migration guide:
    # https://docs.pydantic.dev/latest/migration/#model-fields
    for name, field in cls.model_fields.items():
        # Required fields have no default to write: leave them out so that
        # Pydantic reports them as missing instead of receiving a sentinel value
        if field.is_required():
            continue

        # The value may be supplied under the field name or under its alias
        keys = [name]
        for alias in (field.alias, field.validation_alias):
            if isinstance(alias, str) and alias not in keys:
                keys.append(alias)
        provided = [key for key in keys if key in payload]

        # Value missing → set default under the field name
        if not provided:
            payload[name] = field.get_default(call_default_factory=True)
            continue

        # Value is explicitly None → also replace with default
        for key in provided:
            if payload[key] is None:
                payload[key] = field.get_default(call_default_factory=True)

    return payload

Breakpoints

Bases: BasePayloadModel

Configuration for debugging breakpoints

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
191
192
193
194
195
196
197
class Breakpoints(BasePayloadModel):
    """Configuration for debugging breakpoints.

    Every field is optional and defaults to None.
    """

    # Optional flag; judging by its name it activates every breakpoint - TODO confirm
    activate_all: bool | None = None
    # Optional folder, given as a string
    folder: str | None = None
    # Optional store parameters (see StoreParams)
    store_params: StoreParams | None = None
    # Optional list of identifiers, given as strings
    ids: list[str] | None = None

DaskContext

Bases: BasePayloadModel

Configuration for the DaskContext

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
258
259
260
261
262
263
264
265
266
class DaskContext(BasePayloadModel):
    """Configuration for the DaskContext.

    Every field accepts None. All of them except ``address`` carry a non-None default.
    """

    # Optional, but "address" is mandatory when the cluster type is not available
    # NOTE(review): this constraint is not enforced by a validator in this class
    cluster_type: str | None = "local"  # Optional but if not available "address" is mandatory
    # Optional address; None when not provided
    address: str | None = None
    # Defaults to the module-level DEFAULT_CLUSTER_CONFIG (defined outside this class)
    cluster_config: dict[str, str | int | bool] | None = DEFAULT_CLUSTER_CONFIG
    # Defaults to an empty mapping
    client_config: dict[str, str | int | bool] | None = {}
    # Defaults to the module-level DEFAULT_DASK_CONFIG (defined outside this class)
    dask_config: dict[str, str | int | bool] | None = DEFAULT_DASK_CONFIG
    # Defaults to "report.html"
    performance_report_file: str | None = "report.html"

EOQCConfig

Bases: BasePayloadModel

Configuration for the EOQC processor

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
269
270
271
272
273
274
275
276
277
class EOQCConfig(BasePayloadModel):
    """Configuration for the EOQC processor.

    Every field accepts None. The first three carry explicit defaults; the
    others default to None.
    """

    # Defaults to the string "default"
    config_folder: str | None = Field(default="default")
    # Defaults to a new empty dict for each instance (default_factory)
    parameters: dict[str, str | int | float | bool] | None = Field(default_factory=dict)
    # Defaults to True
    update_attrs: bool | None = Field(default=True)
    # Optional report path; None when not provided
    report_path: str | None = None
    # Optional configuration path; None when not provided
    config_path: str | None = None
    # Optional list of extra configuration folders; None when not provided
    additional_config_folders: list[str] | None = None

ExternalModule

Bases: BasePayloadModel

Definition of an external module to import dynamically

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
182
183
184
185
186
187
188
class ExternalModule(BasePayloadModel):
    """Definition of an external module to import dynamically.

    Only ``name`` is required; the other fields default to None.
    """

    # Required name of the module
    name: str
    # Optional alias, given as a string
    alias: str | None = None
    # Optional boolean flag; exact meaning is not visible here - TODO confirm
    nested: bool | None = None
    # Optional folder, given as a string
    folder: str | None = None

GeneralConfiguration

Bases: BasePayloadModel

General configuration options for EOConfiguration behavior

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class GeneralConfiguration(BasePayloadModel):
    """General configuration options for EOConfiguration behavior.

    Every field accepts None. The first three carry non-None defaults; all
    the others default to None.

    NOTE(review): the double underscores in the field names presumably mirror
    nested configuration keys (section__option) - confirm against the
    triggering documentation.
    """

    # Defaults to a LoggingConfig whose level is "DEBUG"
    logging: LoggingConfig | None = LoggingConfig(level="DEBUG")
    # Defaults to True
    triggering__use_basic_logging: bool | None = True
    # Defaults to 10 (unit not visible here - TODO confirm)
    triggering__wait_before_exit: int | None = 10
    # The remaining options stay None unless they are provided
    dask__export_graphs: str | None = None
    breakpoints__folder: str | None = None
    triggering__create_temporary: bool | None = None
    triggering__temporary_shared: bool | None = None
    triggering__validate_run: bool | None = None
    triggering__validate_mode: str | None = None
    triggering__error_policy: str | None = None
    temporary__folder: str | None = None
    temporary__folder_s3_secret: str | None = None
    temporary__folder_create_folder: bool | None = None
    triggering__dask_monitor__enabled: bool | None = None
    triggering__dask_monitor__cancel: bool | None = None
    triggering__dask_monitor__cancel_state: str | None = None

IOConfig

Bases: BasePayloadModel

Input/output configuration

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
250
251
252
253
254
255
class IOConfig(BasePayloadModel):
    """Input/output configuration.

    Each of the three lists defaults to an empty list.
    """

    # Input product definitions (see InputProduct)
    input_products: list[InputProduct] = []
    # Output product definitions (see OutputProduct)
    output_products: list[OutputProduct] = []
    # ADF configuration entries (see AdfConfig)
    adfs: list[AdfConfig] = []

InputProduct

Bases: BasePayloadModel

Definition of an input product in the I/O configuration

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
220
221
222
223
224
225
226
227
class InputProduct(BasePayloadModel):
    """Definition of an input product in the I/O configuration.

    ``id``, ``path`` and ``store_type`` are required.
    """

    # Required identifier of the input product
    id: str
    # Required path of the input product
    path: str
    # Optional type; defaults to "filename"
    type: str | None = Field(default="filename")
    # Required store type, given as a string
    store_type: str
    # Optional store parameters (see StoreParams); None when not provided
    store_params: StoreParams | None = None

LoggingConfig

Bases: BasePayloadModel

Logging configuration used in the general_configuration section

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
152
153
154
155
class LoggingConfig(BasePayloadModel):
    """Logging configuration used in the general_configuration section."""

    # Logging level as a string; defaults to "INFO". No set of allowed values is enforced here.
    level: str | None = Field(default="INFO", description="Logging level")

OutputProduct

Bases: BasePayloadModel

Definition of an output product in the I/O configuration

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
230
231
232
233
234
235
236
237
238
239
class OutputProduct(BasePayloadModel):
    """Definition of an output product in the I/O configuration.

    ``id``, ``path`` and ``store_type`` are required.
    """

    # Required identifier of the output product
    id: str
    # Required path of the output product
    path: str
    # Required store type, given as a string
    store_type: str
    # Optional store parameters (see StoreParams); None when not provided
    store_params: StoreParams | None = None
    # Optional type; defaults to "filename"
    type: str | None = Field(default="filename")
    # Optional opening mode; defaults to "CREATE"
    opening_mode: str | None = Field(default="CREATE")
    # Optional flag; defaults to False
    apply_eoqc: bool | None = Field(default=False)

PayloadSchema

Bases: BasePayloadModel

Root payload schema containing all configuration sections

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
283
284
285
286
287
288
289
290
291
292
293
294
295
class PayloadSchema(BasePayloadModel):
    """Root payload schema containing all configuration sections.

    Every section is optional and defaults to None.
    """

    # Optional list of strings
    dotenv: list[str] | None = None
    # Optional general configuration section (see GeneralConfiguration)
    general_configuration: GeneralConfiguration | None = None
    # Optional list of external modules (see ExternalModule)
    external_modules: list[ExternalModule] | None = None
    # Optional breakpoints section (see Breakpoints)
    breakpoints: Breakpoints | None = None
    # Optional list of workflow steps (see WorkflowStep)
    workflow: list[WorkflowStep] | None = None
    # Optional I/O section; the attribute is "io" and its alias is "I/O"
    io: IOConfig | None = Field(None, alias="I/O")
    # Optional Dask context section (see DaskContext)
    dask_context: DaskContext | None = None
    # Optional list of strings
    logging: list[str] | None = None
    # Optional list of strings
    config: list[str] | None = None
    # Optional EOQC section (see EOQCConfig)
    eoqc: EOQCConfig | None = None

StorageOptions

Bases: BasePayloadModel

Options to access a storage backend

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
 99
100
101
102
103
104
105
class StorageOptions(BasePayloadModel):
    """Options to access a storage backend.

    Only ``name`` is required; the other fields default to None.
    """

    # Required name of the storage options entry
    name: str
    # Optional key, given as a string
    key: str | None = None
    # Optional secret, given as a string
    secret: str | None = None
    # Optional string-to-string mapping of client keyword arguments
    client_kwargs: dict[str, str] | None = None

StoreParams

Bases: BasePayloadModel

Flexible store_params representation for payloads

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class StoreParams(BasePayloadModel):
    """Flexible store_params representation for payloads.

    Every field is optional and defaults to None. A payload is expected to use
    one of the forms below.
    """

    # Form 1: a simple S3 secret alias
    s3_secret_alias: str | None = None
    # Form 2: a list of storage options
    storage_options: list[StorageOptions] | None = None
    # Form 3: a regex together with a multiplicity
    regex: str | None = None
    multiplicity: str | int | None = None

    @field_validator("multiplicity")
    @classmethod
    def validate_multiplicity(cls, v):
        """Check the multiplicity field.

        None and integers are accepted as they are. A string is accepted only
        when it is one of the three supported keywords.

        Raises:
            ValueError: for an unsupported string, or for a value that is
                neither a string nor an integer.
        """
        if v is None:
            return v
        if isinstance(v, str):
            if v in {"exactly_one", "at_least_one", "more_than_one"}:
                return v
            raise ValueError('multiplicity must be "exactly_one", "at_least_one", "more_than_one" or an integer')
        if isinstance(v, int):
            return v
        raise ValueError("multiplicity must be a string or an integer")

validate_multiplicity(v) classmethod

Validation of multiplicity field

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
126
127
128
129
130
131
132
133
134
135
136
@field_validator("multiplicity")
@classmethod
def validate_multiplicity(cls, v):
    """Check the multiplicity field.

    None and integers are accepted as they are. A string is accepted only
    when it is one of the three supported keywords.

    Raises:
        ValueError: for an unsupported string, or for a value that is
            neither a string nor an integer.
    """
    if v is None:
        return v
    if isinstance(v, str):
        if v in {"exactly_one", "at_least_one", "more_than_one"}:
            return v
        raise ValueError('multiplicity must be "exactly_one", "at_least_one", "more_than_one" or an integer')
    if isinstance(v, int):
        return v
    raise ValueError("multiplicity must be a string or an integer")

WorkflowStep

Bases: BasePayloadModel

Definition of a workflow step (processing unit)

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class WorkflowStep(BasePayloadModel):
    """Definition of a workflow step (processing unit).

    Only ``name`` is required. ``active`` defaults to True; every other field
    defaults to None.
    """

    # Required name of the step
    name: str
    # Optional flag; defaults to True
    active: bool | None = True
    # The attribute is "validate_output" and its alias is "validate"
    # NOTE(review): presumably renamed to avoid clashing with a BaseModel attribute - confirm
    validate_output: bool | None = Field(default=None, alias="validate")
    # Optional integer step value
    step: int | None = None
    # Optional module, given as a string
    module: str | None = None
    # Optional processing unit, given as a string
    processing_unit: str | None = None
    # Optional lists of string-to-string mappings
    inputs: list[dict[str, str]] | None = None
    outputs: list[dict[str, str]] | None = None
    adfs: list[dict[str, str]] | None = None
    # Optional mapping from parameter name to a scalar (str, int, float, bool)
    # or to a list of int / list of str
    parameters: None | (
        dict[
            str,
            (str | int | float | bool | list[int] | list[str]),
        ]
    ) = None