Skip to content

rs_workflows/payload_template.md

<< Back to index

This file defines the schema for the payload template. It was written following the EOPF CPM triggering guide: https://cpm.pages.eopf.copernicus.eu/eopf-cpm/main/processor-orchestration-guide/triggering-usage.html. The schema is based on Pydantic (the standard for schema definition, validation and autocompletion).

AdfConfig

Bases: BasePayloadModel

Definition of an ADF configuration entry

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
285
286
287
288
289
290
class AdfConfig(BasePayloadModel):
    """Definition of an ADF configuration entry.

    ``id`` and ``path`` are required; ``store_params`` is optional.
    """

    # Required fields: no default value is declared
    id: str
    path: str
    # Optional store parameters for this entry; defaults to None
    store_params: StoreParams | None = None

BasePayloadModel

Bases: BaseModel

Base class shared by all the schema models

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class BasePayloadModel(BaseModel):
    """Base class shared by all the schema models.

    It provides:
    - a common Pydantic configuration,
    - a "before" validator that writes field defaults into the raw payload,
    - a ``dump`` helper that serializes the model and either masks or
      reveals ``SecretStr`` values.
    """

    model_config = ConfigDict(
        # Allow populating a field by its name even when an alias is set
        populate_by_name=True,
        # Do not re-run validation when an attribute is assigned after creation
        validate_assignment=False,
        arbitrary_types_allowed=True,
        # Keep unknown fields on the model instead of rejecting them
        extra="allow",
    )

    @model_validator(mode="before")
    @classmethod
    def fill_defaults(cls, values):
        """Write field defaults into the raw payload before validation.

        For every field that declares a default (or a default factory) and
        that is missing from the payload, or explicitly set to ``None``, the
        default is stored in the payload so that it is handled like a value
        provided by the caller. Defaults that are ``None`` are written too;
        ``dump`` drops them later through ``exclude_none``.

        Args:
            values: Raw input given to the model. Anything that is not a
                ``dict`` is returned unchanged.

        Returns:
            A new ``dict`` holding the input plus the filled-in defaults.
            The caller's ``dict`` is left untouched.
        """
        if not isinstance(values, dict):
            return values

        # Work on a shallow copy: the input may be a dict owned by the caller
        # (e.g. passed to ``model_validate``) and must not be modified.
        filled = dict(values)

        # ``model_fields`` is read from the class, as recommended by
        # https://docs.pydantic.dev/latest/migration/#model-fields
        for name, field in cls.model_fields.items():
            # Required fields have no default. Leave them alone so that a
            # missing value is reported by Pydantic as a regular "missing"
            # error instead of being replaced by an undefined placeholder.
            if field.is_required():
                continue

            # When the payload uses the field alias (e.g. "I/O"), read and
            # write under that key rather than adding a second, name-keyed
            # entry next to it.
            alias = field.alias
            key = alias if isinstance(alias, str) and alias in filled else name

            # Value missing or explicitly None → replace with the default
            if filled.get(key) is None:
                filled[key] = field.get_default(call_default_factory=True)

        return filled

    @classmethod
    def _mask_secrets(cls, obj: Any) -> Any:
        """Return a copy of ``obj`` where every ``SecretStr`` is replaced by a mask.

        Only ``dict`` values and ``list`` items are walked recursively; any
        other object is returned as is.
        """
        if isinstance(obj, SecretStr):
            return "********"

        if isinstance(obj, dict):
            return {k: cls._mask_secrets(v) for k, v in obj.items()}

        if isinstance(obj, list):
            return [cls._mask_secrets(v) for v in obj]

        return obj

    @classmethod
    def _unwrap_secrets(cls, obj: Any) -> Any:
        """Return a copy of ``obj`` where every ``SecretStr`` is replaced by its value.

        Only ``dict`` values and ``list`` items are walked recursively; any
        other object is returned as is.
        """
        if isinstance(obj, SecretStr):
            return obj.get_secret_value()

        if isinstance(obj, dict):
            return {k: cls._unwrap_secrets(v) for k, v in obj.items()}

        if isinstance(obj, list):
            return [cls._unwrap_secrets(v) for v in obj]

        return obj

    def dump(self, reveal_secrets: bool = False, **kwargs):
        """Serialize the model with payload-friendly defaults.

        By default the dump:
        - uses the field aliases,
        - skips fields whose value is None,
        - skips fields that were never set,
        - serializes values according to their runtime type.

        Args:
            reveal_secrets: When True, ``SecretStr`` values are replaced by
                their plain value; otherwise they are replaced by a mask.
            **kwargs: Extra arguments forwarded to ``model_dump``. They take
                precedence over the defaults listed above.

        Returns:
            The serialized data, with secrets masked or revealed.
        """
        options = {
            "by_alias": True,
            "exclude_none": True,
            "exclude_unset": True,
            "serialize_as_any": True,
        }
        # Merging (instead of passing both explicit keywords and **kwargs)
        # lets callers override a default without a "multiple values for
        # keyword argument" TypeError.
        options.update(kwargs)
        data = self.model_dump(**options)
        if reveal_secrets:
            return self._unwrap_secrets(data)

        return self._mask_secrets(data)

dump(reveal_secrets=False, **kwargs)

Custom dump that, by default, skips None fields, skips all unset fields, and uses the alias of each field.

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def dump(self, reveal_secrets: bool = False, **kwargs):
    """Serialize the model with payload-friendly defaults.

    By default the dump:
    - uses the field aliases,
    - skips fields whose value is None,
    - skips fields that were never set,
    - serializes values according to their runtime type.

    Args:
        reveal_secrets: When True, the data goes through ``_unwrap_secrets``;
            otherwise it goes through ``_mask_secrets``.
        **kwargs: Extra arguments forwarded to ``model_dump``. They take
            precedence over the defaults listed above.

    Returns:
        The serialized data, with secrets masked or revealed.
    """
    options = {
        "by_alias": True,
        "exclude_none": True,
        "exclude_unset": True,
        "serialize_as_any": True,
    }
    # Merging (instead of passing both explicit keywords and **kwargs) lets
    # callers override a default without a "multiple values for keyword
    # argument" TypeError.
    options.update(kwargs)
    data = self.model_dump(**options)
    if reveal_secrets:
        return self._unwrap_secrets(data)

    return self._mask_secrets(data)

fill_defaults(values)

Ensure defaults and nested models are properly initialized. When any model is created, the default of each field is written into the payload if the field is not provided and if a default (other than None) exists.

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@model_validator(mode="before")
@classmethod
def fill_defaults(cls, values):
    """Write field defaults into the raw payload before validation.

    For every field that declares a default (or a default factory) and that
    is missing from the payload, or explicitly set to ``None``, the default
    is stored in the payload so that it is handled like a value provided by
    the caller. Defaults that are ``None`` are written too.

    Args:
        values: Raw input given to the model. Anything that is not a
            ``dict`` is returned unchanged.

    Returns:
        A new ``dict`` holding the input plus the filled-in defaults. The
        caller's ``dict`` is left untouched.
    """
    if not isinstance(values, dict):
        return values

    # Work on a shallow copy: the input may be a dict owned by the caller
    # (e.g. passed to ``model_validate``) and must not be modified.
    filled = dict(values)

    # ``model_fields`` is read from the class, as recommended by
    # https://docs.pydantic.dev/latest/migration/#model-fields
    for name, field in cls.model_fields.items():
        # Required fields have no default. Leave them alone so that a missing
        # value is reported by Pydantic as a regular "missing" error instead
        # of being replaced by an undefined placeholder.
        if field.is_required():
            continue

        # When the payload uses the field alias, read and write under that
        # key rather than adding a second, name-keyed entry next to it.
        alias = field.alias
        key = alias if isinstance(alias, str) and alias in filled else name

        # Value missing or explicitly None → replace with the default
        if filled.get(key) is None:
            filled[key] = field.get_default(call_default_factory=True)

    return filled

Breakpoints

Bases: BasePayloadModel

Configuration for debugging breakpoints

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
234
235
236
237
238
239
240
class Breakpoints(BasePayloadModel):
    """Configuration for debugging breakpoints.

    Every field is optional and defaults to None.
    """

    activate_all: bool | None = None
    folder: str | None = None
    # Optional store parameters; defaults to None
    store_params: StoreParams | None = None
    # Optional list of string identifiers; defaults to None
    ids: list[str] | None = None

DaskContext

Bases: BasePayloadModel

Configuration for the DaskContext

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
301
302
303
304
305
306
307
308
309
class DaskContext(BasePayloadModel):
    """Configuration for the DaskContext.

    Every field is optional; most of them declare a non-None default.
    """

    # Defaults to "local". Original note: if no cluster type is available,
    # "address" is mandatory.
    # NOTE(review): this class defines no validator enforcing that rule.
    cluster_type: str | None = "local"
    address: str | None = None
    # DEFAULT_CLUSTER_CONFIG and DEFAULT_DASK_CONFIG are defined outside this class
    cluster_config: dict[str, str | int | bool] | None = DEFAULT_CLUSTER_CONFIG
    # Defaults to an empty dict
    client_config: dict[str, str | int | bool] | None = {}
    dask_config: dict[str, str | int | bool] | None = DEFAULT_DASK_CONFIG
    # Defaults to "report.html"
    performance_report_file: str | None = "report.html"

EOQCConfig

Bases: BasePayloadModel

Configuration for the EOQC processor

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
312
313
314
315
316
317
318
319
320
class EOQCConfig(BasePayloadModel):
    """Configuration for the EOQC processor.

    Every field is optional. The first three declare a non-None default,
    the others default to None.
    """

    # Defaults to the string "default"
    config_folder: str | None = Field(default="default")
    # Defaults to a new empty dict for each instance
    parameters: dict[str, str | int | float | bool] | None = Field(default_factory=dict)
    # Defaults to True
    update_attrs: bool | None = Field(default=True)
    report_path: str | None = None
    config_path: str | None = None
    additional_config_folders: list[str] | None = None

ExternalModule

Bases: BasePayloadModel

Definition of an external module to import dynamically

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
225
226
227
228
229
230
231
class ExternalModule(BasePayloadModel):
    """Definition of an external module to import dynamically.

    Only ``name`` is required; the other fields default to None.
    """

    # Required field: no default value is declared
    name: str
    # This is a regular field called "alias", not a Pydantic field alias
    alias: str | None = None
    nested: bool | None = None
    folder: str | None = None

GeneralConfiguration

Bases: BasePayloadModel

General configuration options for EOConfiguration behavior

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
class GeneralConfiguration(BasePayloadModel):
    """General configuration options for EOConfiguration behavior.

    Every field is optional. Only the first three declare a non-None
    default; all the others default to None.
    NOTE(review): the double underscores in the field names presumably mirror
    nested configuration keys of the processor — confirm against the guide.
    """

    # Defaults to a LoggingConfig with level "DEBUG"
    logging: LoggingConfig | None = LoggingConfig(level="DEBUG")
    # Defaults to True
    triggering__use_basic_logging: bool | None = True
    # Defaults to 10
    triggering__wait_before_exit: int | None = 10
    # All the fields below default to None
    dask__export_graphs: str | None = None
    breakpoints__folder: str | None = None
    triggering__create_temporary: bool | None = None
    triggering__temporary_shared: bool | None = None
    triggering__validate_run: bool | None = None
    triggering__validate_mode: str | None = None
    triggering__error_policy: str | None = None
    temporary__folder: str | None = None
    temporary__folder_s3_secret: str | None = None
    temporary__folder_create_folder: bool | None = None
    triggering__dask_monitor__enabled: bool | None = None
    triggering__dask_monitor__cancel: bool | None = None
    triggering__dask_monitor__cancel_state: str | None = None

IOConfig

Bases: BasePayloadModel

Input/output configuration

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
293
294
295
296
297
298
class IOConfig(BasePayloadModel):
    """Input/output configuration.

    Holds three lists, each defaulting to an empty list.
    """

    # Input product definitions; defaults to an empty list
    input_products: list[InputProduct] = []
    # Output product definitions; defaults to an empty list
    output_products: list[OutputProduct] = []
    # ADF configuration entries; defaults to an empty list
    adfs: list[AdfConfig] = []

InputProduct

Bases: BasePayloadModel

Definition of an input product in the I/O configuration

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
263
264
265
266
267
268
269
270
class InputProduct(BasePayloadModel):
    """Definition of an input product in the I/O configuration.

    ``id``, ``path`` and ``store_type`` are required.
    """

    # Required fields: no default value is declared
    id: str
    path: str
    # Optional; defaults to "filename"
    type: str | None = Field(default="filename")
    # Required field: no default value is declared
    store_type: str
    # Optional store parameters; defaults to None
    store_params: StoreParams | None = None

LoggingConfig

Bases: BasePayloadModel

Logging configuration used in the general_configuration section

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
195
196
197
198
class LoggingConfig(BasePayloadModel):
    """Logging configuration used in the general_configuration section."""

    # Optional logging level; defaults to "INFO"
    level: str | None = Field(default="INFO", description="Logging level")

OutputProduct

Bases: BasePayloadModel

Definition of an output product in the I/O configuration

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
273
274
275
276
277
278
279
280
281
282
class OutputProduct(BasePayloadModel):
    """Definition of an output product in the I/O configuration.

    ``id``, ``path`` and ``store_type`` are required.
    """

    # Required fields: no default value is declared
    id: str
    path: str
    store_type: str
    # Optional store parameters; defaults to None
    store_params: StoreParams | None = None
    # Optional; defaults to "filename"
    type: str | None = Field(default="filename")
    # Optional; defaults to "CREATE"
    opening_mode: str | None = Field(default="CREATE")
    # Optional; defaults to False
    apply_eoqc: bool | None = Field(default=False)

PayloadSchema

Bases: BasePayloadModel

Root payload schema containing all configuration sections

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
326
327
328
329
330
331
332
333
334
335
336
337
338
class PayloadSchema(BasePayloadModel):
    """Root payload schema containing all configuration sections.

    Every section is optional and defaults to None.
    """

    dotenv: list[str] | None = None
    general_configuration: GeneralConfiguration | None = None
    external_modules: list[ExternalModule] | None = None
    breakpoints: Breakpoints | None = None
    workflow: list[WorkflowStep] | None = None
    # Declared with the alias "I/O"
    io: IOConfig | None = Field(None, alias="I/O")
    dask_context: DaskContext | None = None
    # Plain string here, unlike GeneralConfiguration.logging which is a LoggingConfig
    logging: str | None = None
    config: list[str] | None = None
    eoqc: EOQCConfig | None = None

StorageOptions

Bases: BasePayloadModel

Options to access a storage backend

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
136
137
138
139
140
141
142
143
144
class StorageOptions(BasePayloadModel):
    """Options to access a storage backend.

    All four fields are required.
    """

    # The "name" field is excluded from serialization so that it does not end
    # up in the payload. Otherwise, the processor yields an error when trying
    # to parse the store_params.
    name: str = Field(exclude=True)
    # Credentials are held as SecretStr
    key: SecretStr
    secret: SecretStr
    # Mapping of string keys to SecretStr values
    client_kwargs: dict[str, SecretStr]

StoragePath

Bases: BasePayloadModel

Wrapper for a list of storage options

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
147
148
149
150
151
152
153
class StoragePath(BasePayloadModel):
    """Wrapper for a list of storage options.

    NOTE(review): the fields below describe a single named relative path with
    an opening mode rather than a list — confirm the intended description.
    """

    # "name" is excluded from serialization.
    # TODO: check if we need to exclude name here as well as for StorageOptions
    name: str = Field(exclude=True)
    # Optional; defaults to "CREATE_OVERWRITE"
    opening_mode: str | None = Field(default="CREATE_OVERWRITE")
    # Required field: no default value is declared
    relative_path: str

StoreParams

Bases: BasePayloadModel

Flexible store_params representation for payloads

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class StoreParams(BasePayloadModel):
    """Flexible store_params representation for payloads.

    Every field is optional and defaults to None; the comments below list
    the alternative ways of describing a store.
    """

    # Alternative 1: a simple S3 secret alias
    s3_secret_alias: str | None = None
    # Alternative 2: storage options used for s3
    storage_options: StorageOptions | None = None
    # Alternative 3: a disk path
    storage_path: StoragePath | None = None
    # Alternative 4: a regex together with a multiplicity
    regex: str | None = None
    multiplicity: str | int | None = None

    @field_validator("multiplicity")
    @classmethod
    def validate_multiplicity(cls, v):
        """Check that multiplicity is None, an integer or an accepted keyword"""
        if v is None:
            return v

        if isinstance(v, str):
            # Strings are restricted to a closed set of keywords
            accepted_keywords = {"exactly_one", "at_least_one", "more_than_one"}
            if v in accepted_keywords:
                return v
            raise ValueError('multiplicity must be "exactly_one", "at_least_one", "more_than_one" or an integer')

        if isinstance(v, int):
            return v

        # Neither None, nor a string, nor an integer
        raise ValueError("multiplicity must be a string or an integer")

validate_multiplicity(v) classmethod

Validation of multiplicity field

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
169
170
171
172
173
174
175
176
177
178
179
@field_validator("multiplicity")
@classmethod
def validate_multiplicity(cls, v):
    """Check that multiplicity is None, an integer or an accepted keyword"""
    if v is None:
        return v

    if isinstance(v, str):
        # Strings are restricted to a closed set of keywords
        accepted_keywords = {"exactly_one", "at_least_one", "more_than_one"}
        if v in accepted_keywords:
            return v
        raise ValueError('multiplicity must be "exactly_one", "at_least_one", "more_than_one" or an integer')

    if isinstance(v, int):
        return v

    # Neither None, nor a string, nor an integer
    raise ValueError("multiplicity must be a string or an integer")

WorkflowStep

Bases: BasePayloadModel

Definition of a workflow step (processing unit)

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
class WorkflowStep(BasePayloadModel):
    """Definition of a workflow step (processing unit).

    Only ``name`` is required.
    """

    # Required field: no default value is declared
    name: str
    # Optional; defaults to True
    active: bool | None = True
    # Declared with the alias "validate"; defaults to None
    validate_output: bool | None = Field(default=None, alias="validate")
    step: int | None = None
    module: str | None = None
    processing_unit: str | None = None
    # Optional string-to-string mappings; all default to None
    inputs: dict[str, str] | None = None
    outputs: dict[str, str] | None = None
    adfs: dict[str, str] | None = None
    # Optional mapping from string keys to a scalar (str, int, float, bool)
    # or to a list of int / list of str; defaults to None
    parameters: None | (
        dict[
            str,
            (str | int | float | bool | list[int] | list[str]),
        ]
    ) = None