Skip to content

rs_workflows/payload_template.md

<< Back to index

This file defines the schema for the payload template. It was written from the EOPF CPM triggering guide (https://cpm.pages.eopf.copernicus.eu/eopf-cpm/main/processor-orchestration-guide/triggering-usage.html). The schema is based on Pydantic, which provides the schema definition, validation and autocompletion.

AdfConfig

Bases: BasePayloadModel

Definition of an ADF configuration entry

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
270
271
272
273
274
275
class AdfConfig(BasePayloadModel):
    """Definition of an ADF configuration entry.

    Entries of this type are listed under ``IOConfig.adfs``.
    """

    # Identifier of the ADF entry (required)
    id: str
    # Location of the ADF (required). A SecretStr is accepted so that the value is
    # masked by BasePayloadModel.dump() unless reveal_secrets=True is passed.
    path: str | SecretStr
    # Optional store parameters for this entry
    store_params: StoreParams | None = None

BasePayloadModel

Bases: BaseModel

Base class shared by all the schema models

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
class BasePayloadModel(BaseModel):
    """Base class shared by all the schema models.

    It centralises the Pydantic configuration, writes the declared defaults into
    the incoming payload before validation (``fill_defaults``) and provides a
    secret-aware serializer (``dump``).
    """

    model_config = ConfigDict(
        # Allow using field names even when aliases are set
        populate_by_name=True,
        # Do not re-run validation when an attribute is assigned after creation
        validate_assignment=False,
        arbitrary_types_allowed=True,
        extra="allow",  # keep unknown fields on the model instead of rejecting them
    )

    @model_validator(mode="before")
    @classmethod
    def fill_defaults(cls, values):
        """Ensure defaults and nested models are properly initialized.

        When a model is created, every optional field that is missing from the
        payload, or explicitly set to None, receives its declared default. The
        field then counts as "set", so ``dump()`` writes it to the payload
        unless the default itself is None.

        Args:
            values: Raw input handed to the model. Anything that is not a dict
                is returned untouched.

        Returns:
            A new dict with the defaults filled in; the caller's dict is left
            unmodified.
        """
        if not isinstance(values, dict):
            return values

        # Work on a shallow copy so that the dict given by the caller
        # (e.g. through model_validate) is never modified as a side effect.
        filled = dict(values)

        # model_fields is read from the class itself, not from an instance:
        # https://docs.pydantic.dev/latest/migration/#model-fields
        for name, field in cls.model_fields.items():
            # A required field has no default to inject. Leaving it untouched
            # lets Pydantic apply its own handling of missing values.
            if field.is_required():
                continue

            # Value missing or explicitly None → replace with the default
            if filled.get(name) is None:
                filled[name] = field.get_default(call_default_factory=True)

        return filled

    @classmethod
    def _mask_secrets(cls, obj: Any) -> Any:
        """Return a copy of ``obj`` where every SecretStr is replaced by asterisks.

        Dicts and lists are walked recursively; any other value is returned as is.
        """
        if isinstance(obj, SecretStr):
            return "********"

        if isinstance(obj, dict):
            return {k: cls._mask_secrets(v) for k, v in obj.items()}

        if isinstance(obj, list):
            return [cls._mask_secrets(v) for v in obj]

        return obj

    @classmethod
    def _unwrap_secrets(cls, obj: Any) -> Any:
        """Return a copy of ``obj`` where every SecretStr is replaced by its plain value.

        Dicts and lists are walked recursively; any other value is returned as is.
        """
        if isinstance(obj, SecretStr):
            return obj.get_secret_value()

        if isinstance(obj, dict):
            return {k: cls._unwrap_secrets(v) for k, v in obj.items()}

        if isinstance(obj, list):
            return [cls._unwrap_secrets(v) for v in obj]

        return obj

    def dump(self, reveal_secrets: bool = False, **kwargs: Any) -> Any:
        """Custom dump that:
        - skips None fields by default.
        - skips all unset
        - use the alias for fields by default

        Args:
            reveal_secrets: When True, SecretStr values are written in clear
                text. When False (default) they are replaced by asterisks.
            **kwargs: Extra arguments forwarded to ``model_dump``. They take
                precedence over the defaults listed above.

        Returns:
            The serialized model with secrets either masked or revealed.
        """
        # Merge instead of passing both sets of keywords directly, otherwise
        # overriding one of the defaults would raise a "multiple values" TypeError.
        options = {
            "by_alias": True,
            "exclude_none": True,
            "exclude_unset": True,
            "serialize_as_any": True,
            **kwargs,
        }
        data = self.model_dump(**options)
        if reveal_secrets:
            return self._unwrap_secrets(data)

        return self._mask_secrets(data)

dump(reveal_secrets=False, **kwargs)

Custom dump that skips None fields by default, skips all unset fields, and uses the field aliases by default.

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def dump(self, reveal_secrets: bool = False, **kwargs):
    """Custom dump that:
    - skips None fields by default.
    - skips all unset
    - use the alias for fields by default

    Args:
        reveal_secrets: When True, SecretStr values are written in clear text.
            When False (default) they are replaced by asterisks.
        **kwargs: Extra arguments forwarded to ``model_dump``. They take
            precedence over the defaults listed above.

    Returns:
        The serialized model with secrets either masked or revealed.
    """
    # Merge instead of passing both sets of keywords directly, otherwise
    # overriding one of the defaults would raise a "multiple values" TypeError.
    options = {
        "by_alias": True,
        "exclude_none": True,
        "exclude_unset": True,
        "serialize_as_any": True,
        **kwargs,
    }
    data = self.model_dump(**options)
    if reveal_secrets:
        return self._unwrap_secrets(data)

    return self._mask_secrets(data)

fill_defaults(values)

Ensure defaults and nested models are properly initialized. When any model is created, its defaults are written into the payload if the value is not provided and a default (other than None) exists.

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@model_validator(mode="before")
@classmethod
def fill_defaults(cls, values):
    """Ensure defaults and nested models are properly initialized.

    When a model is created, every optional field that is missing from the
    payload, or explicitly set to None, receives its declared default.

    Args:
        values: Raw input handed to the model. Anything that is not a dict is
            returned untouched.

    Returns:
        A new dict with the defaults filled in; the caller's dict is left
        unmodified.
    """
    if not isinstance(values, dict):
        return values

    # Work on a shallow copy so that the dict given by the caller
    # (e.g. through model_validate) is never modified as a side effect.
    filled = dict(values)

    # model_fields is read from the class itself, not from an instance:
    # https://docs.pydantic.dev/latest/migration/#model-fields
    for name, field in cls.model_fields.items():
        # A required field has no default to inject. Leaving it untouched
        # lets Pydantic apply its own handling of missing values.
        if field.is_required():
            continue

        # Value missing or explicitly None → replace with the default
        if filled.get(name) is None:
            filled[name] = field.get_default(call_default_factory=True)

    return filled

Breakpoints

Bases: BasePayloadModel

Configuration for debugging breakpoints

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
211
212
213
214
215
216
217
class Breakpoints(BasePayloadModel):
    """Configuration for debugging breakpoints.

    Every field is optional and defaults to None.
    """

    # presumably switches on every breakpoint at once — TODO confirm against the triggering guide
    activate_all: bool | None = None
    # presumably the folder used for breakpoint data — TODO confirm
    folder: str | None = None
    # Optional store parameters
    store_params: StoreParams | None = None
    # presumably the identifiers of the individual breakpoints to activate — TODO confirm
    ids: list[str] | None = None

DaskContext

Bases: BasePayloadModel

Configuration for the DaskContext

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
286
287
288
289
290
291
292
293
294
class DaskContext(BasePayloadModel):
    """Configuration for the DaskContext.

    NOTE(review): the cluster_type/address rule described below is not enforced by
    any validator in this class — confirm whether it is checked elsewhere.
    """

    # Optional; when it is not given, "address" is expected to be provided instead
    cluster_type: str | None = "local"
    address: str | None = None
    # Defaults to the module-level DEFAULT_CLUSTER_CONFIG constant
    cluster_config: dict[str, str | int | bool] | None = DEFAULT_CLUSTER_CONFIG
    # Empty by default
    client_config: dict[str, str | int | bool] | None = {}
    # Defaults to the module-level DEFAULT_DASK_CONFIG constant
    dask_config: dict[str, str | int | bool] | None = DEFAULT_DASK_CONFIG
    performance_report_file: str | None = "report.html"

EOQCConfig

Bases: BasePayloadModel

Configuration for the EOQC processor

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
297
298
299
300
301
302
303
304
305
class EOQCConfig(BasePayloadModel):
    """Settings of the EOQC processor section of the payload.

    All fields are declared through ``Field`` so that the defaults are spelled
    out the same way everywhere in the class.
    """

    # Falls back to "default" when nothing is provided
    config_folder: str | None = Field(default="default")
    # A new empty dict is created whenever the value is not provided
    parameters: dict[str, str | int | float | bool] | None = Field(default_factory=dict)
    # Enabled unless stated otherwise
    update_attrs: bool | None = Field(default=True)
    # The remaining options have no default value
    report_path: str | None = Field(default=None)
    config_path: str | None = Field(default=None)
    additional_config_folders: list[str] | None = Field(default=None)

ExternalModule

Bases: BasePayloadModel

Definition of an external module to import dynamically

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
202
203
204
205
206
207
208
class ExternalModule(BasePayloadModel):
    """Definition of an external module to import dynamically.

    Only ``name`` is required; the other fields default to None.
    """

    # Name of the module (required)
    name: str
    # presumably the name under which the module is registered once imported — TODO confirm
    alias: str | None = None
    # NOTE(review): exact meaning of "nested" is not visible here — see the triggering guide
    nested: bool | None = None
    # presumably the folder from which the module is loaded — TODO confirm
    folder: str | None = None

GeneralConfiguration

Bases: BasePayloadModel

General configuration options for EOConfiguration behavior

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class GeneralConfiguration(BasePayloadModel):
    """General configuration options for EOConfiguration behavior.

    Only the first three fields carry a non-None default; every other option
    defaults to None.

    NOTE(review): the double underscore in the field names presumably mirrors the
    section/key layout of the EOConfiguration parameters — confirm against the
    triggering guide.
    """

    # Logging section; the default set here is level "DEBUG"
    logging: LoggingConfig | None = LoggingConfig(level="DEBUG")
    triggering__use_basic_logging: bool | None = True
    triggering__wait_before_exit: int | None = 10

    # Dask related options
    dask__export_graphs: str | None = None
    dask_utils__timeout: int | None = None

    breakpoints__folder: str | None = None

    # Triggering options
    triggering__create_temporary: bool | None = None
    triggering__temporary_shared: bool | None = None
    triggering__validate_run: bool | None = None
    triggering__validate_mode: str | None = None
    triggering__error_policy: str | None = None

    # Temporary folder options
    temporary__folder: str | None = None
    temporary__folder_s3_secret: str | None = None
    temporary__folder_create_folder: bool | None = None

    # Dask monitor options
    triggering__dask_monitor__enabled: bool | None = None
    triggering__dask_monitor__cancel: bool | None = None
    triggering__dask_monitor__cancel_state: str | None = None

IOConfig

Bases: BasePayloadModel

Input/output configuration

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
278
279
280
281
282
283
class IOConfig(BasePayloadModel):
    """Input/output configuration.

    Groups the input products, the output products and the ADF entries of a
    payload. Each list is empty by default.
    """

    # Products read by the workflow
    input_products: list[InputProduct] = []
    # Products written by the workflow
    output_products: list[OutputProduct] = []
    # ADF configuration entries
    adfs: list[AdfConfig] = []

InputProduct

Bases: BasePayloadModel

Definition of an input product in the I/O configuration

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
240
241
242
243
244
245
246
247
248
class InputProduct(BasePayloadModel):
    """Describe one input product of the I/O configuration.

    ``id``, ``path`` and ``store_type`` are required; the other fields are optional.
    """

    # Identifier and location of the product
    id: str
    path: str
    # Falls back to "filename" when nothing is provided
    type: str | None = "filename"
    store_type: str
    # Optional store parameters
    store_params: StoreParams | None = None
    # No opening mode is set by default
    opening_mode: str | None = None

LoggingConfig

Bases: BasePayloadModel

Logging configuration used in the general_configuration section

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
171
172
173
174
class LoggingConfig(BasePayloadModel):
    """Logging settings stored in the general_configuration section."""

    # "INFO" is used when no level is provided
    level: str | None = Field(
        default="INFO",
        description="Logging level",
    )

OutputProduct

Bases: BasePayloadModel

Definition of an output product in the I/O configuration

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
class OutputProduct(BasePayloadModel):
    """Describe one output product of the I/O configuration.

    ``id``, ``path`` and ``store_type`` are required. ``autoclean`` and
    ``final_product`` are kept on the model but never serialized.
    """

    # Identifier and location of the product
    id: str
    path: str
    store_type: str
    # Optional store parameters
    store_params: StoreParams | None = None
    # Falls back to "filename" when nothing is provided
    type: str | None = "filename"
    # Falls back to "CREATE" when nothing is provided
    opening_mode: str | None = "CREATE"
    # Disabled unless stated otherwise
    apply_eoqc: bool | None = False
    # Excluded from serialization
    autoclean: bool | None = Field(default=False, exclude=True)
    # Excluded from serialization as well: the "final product" concept exists only
    # in the tasktable and is not recognized by the processor, which simply fails
    # when it finds an unknown field in the payload. The field is therefore left
    # out of the payload but kept in the model for internal use in the
    # run_processor task.
    final_product: bool | None = Field(default=True, exclude=True)

PayloadSchema

Bases: BasePayloadModel

Root payload schema containing all configuration sections

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
class PayloadSchema(BasePayloadModel):
    """Top-level payload model gathering every configuration section.

    Each section is optional and defaults to None.
    """

    dotenv: list[str] | None = None
    general_configuration: GeneralConfiguration | None = None
    external_modules: list[ExternalModule] | None = None
    breakpoints: Breakpoints | None = None
    # Ordered list of the processing steps
    workflow: list[WorkflowStep] | None = None
    # Written as "I/O" in the payload, hence the alias
    io: IOConfig | None = Field(
        default=None,
        alias="I/O",
    )
    dask_context: DaskContext | None = None
    # Top-level logging entry, a plain string here (unlike general_configuration.logging)
    logging: str | None = None
    config: list[str] | None = None
    secret: list[str] | None = None
    eoqc: EOQCConfig | None = None

StorageOptions

Bases: BasePayloadModel

Options to access a storage backend

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
136
137
138
139
140
141
142
143
144
class StorageOptions(BasePayloadModel):
    """Options to access a storage backend.

    All fields are required. ``key``, ``secret`` and the values of
    ``client_kwargs`` are SecretStr, so BasePayloadModel.dump() masks them unless
    reveal_secrets=True is passed.
    """

    # The field "name" is excluded from serialization so that it never reaches the payload.
    # Otherwise, the processor yields an error when trying to parse the store_params.
    name: str = Field(exclude=True)
    key: SecretStr
    secret: SecretStr
    client_kwargs: dict[str, SecretStr]

StoreParams

Bases: BasePayloadModel

Flexible store_params representation for payloads

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class StoreParams(BasePayloadModel):
    """Store parameters of a payload entry, in one of several optional forms."""

    # Form 1: a plain S3 secret alias
    s3_secret_alias: str | None = None
    # Form 2: explicit storage options used for s3
    storage_options: StorageOptions | None = None
    # Form 3: a regex together with a multiplicity
    regex: str | None = None
    multiplicity: str | int | None = None

    @field_validator("multiplicity")
    @classmethod
    def validate_multiplicity(cls, v):
        """Check that multiplicity is None, an integer or one of the accepted keywords."""
        # Nothing to check when the value is not provided
        if v is None:
            return None

        if isinstance(v, str):
            # Strings are restricted to the three known keywords
            if v in {"exactly_one", "at_least_one", "more_than_one"}:
                return v
            raise ValueError('multiplicity must be "exactly_one", "at_least_one", "more_than_one" or an integer')

        if isinstance(v, int):
            return v

        raise ValueError("multiplicity must be a string or an integer")

validate_multiplicity(v) classmethod

Validation of multiplicity field

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
158
159
160
161
162
163
164
165
166
167
168
@field_validator("multiplicity")
@classmethod
def validate_multiplicity(cls, v):
    """Check that multiplicity is None, an integer or one of the accepted keywords."""
    # Nothing to check when the value is not provided
    if v is None:
        return None

    if isinstance(v, str):
        # Strings are restricted to the three known keywords
        if v in {"exactly_one", "at_least_one", "more_than_one"}:
            return v
        raise ValueError('multiplicity must be "exactly_one", "at_least_one", "more_than_one" or an integer')

    if isinstance(v, int):
        return v

    raise ValueError("multiplicity must be a string or an integer")

WorkflowStep

Bases: BasePayloadModel

Definition of a workflow step (processing unit)

Source code in docs/rs-client-libraries/rs_workflows/payload_template.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
class WorkflowStep(BasePayloadModel):
    """Describe one step (processing unit) of the workflow.

    Only ``name`` is required. ``active`` defaults to True and every other
    field defaults to None.
    """

    name: str
    # A step is active unless stated otherwise
    active: bool | None = True
    # Written as "validate" in the payload, hence the alias
    validate_output: bool | None = Field(
        default=None,
        alias="validate",
    )
    step: int | None = None
    module: str | None = None
    processing_unit: str | None = None
    # String-to-string mappings for the inputs, outputs and ADFs of the step
    inputs: dict[str, str] | None = None
    outputs: dict[str, str] | None = None
    adfs: dict[str, str] | None = None
    # Free-form parameters: scalars, or lists of integers or strings
    parameters: None | dict[str, str | int | float | bool | list[int] | list[str]] = None