Skip to content

rs_workflows/flow_utils.md

<< Back to index

Utility module for the Prefect flows.

AdfProcessIn

Bases: BaseModel

Input parameters for executing the 'adf_conversion' flow.

This model defines all the configuration needed to run an ADF conversion script.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
class AdfProcessIn(BaseModel):
    """
    Input parameters for executing the 'adf_conversion' flow.

    This model defines all the configuration needed to run an ADF conversion script.
    """

    env: FlowEnvArgs = Field(
        title="Flow Environment",
        description="Environment configuration for Prefect flow. Includes identifiers like owner_id.",
    )
    adf_type: str | AdfType = Field(
        title="ADF Type",
        description="Name of the ADF type to generate. Can be a string or AdfType enum.",
    )
    auxiliary_product_to_collection_identifier: list[AuxiliaryProductMapping] = Field(
        title="Auxiliary Product Mapping",
        description=(
            "Mapping of auxiliary product types to collections. "
            "Use '*' as a wildcard to map all other auxiliary products."
        ),
        min_length=1,
    )
    start_datetime: datetime | None = Field(
        default=None,
        title="Start Datetime",
        description="Start datetime for retrieving auxiliary data. ISO format.",
    )
    end_datetime: datetime | None = Field(
        default=None,
        title="End Datetime",
        description="End datetime for retrieving auxiliary data. ISO format.",
    )
    satellite: str | SentinelSatellite | None = Field(
        default=None,
        title="Satellite",
        description="Satellite identifier used in certain queries. Can be a string or SentinelSatellite enum.",
    )
    cql2_filter: dict | None = Field(
        default=None,
        title="CQL2 Filter",
        description="CQL2 filter for retrieving auxiliary data."
        "If provided, start_datetime, end_datetime, and satellite will be ignored for auxiliary data retrieval.",
    )

    @field_validator("adf_type", mode="before")
    @classmethod
    def normalize_adf_type(cls, v):
        """Normalize ADF type to string."""
        return v.value if hasattr(v, "value") else v

    @field_validator("satellite", mode="before")
    @classmethod
    def normalize_satellite_name(cls, v):
        """Normalize satellite name to string."""
        return v.value if hasattr(v, "value") else v

normalize_adf_type(v) classmethod

Normalize ADF type to string.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
509
510
511
512
513
@field_validator("adf_type", mode="before")
@classmethod
def normalize_adf_type(cls, v):
    """Normalize ADF type to string."""
    return v.value if hasattr(v, "value") else v

normalize_satellite_name(v) classmethod

Normalize satellite name to string.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
515
516
517
518
519
@field_validator("satellite", mode="before")
@classmethod
def normalize_satellite_name(cls, v):
    """Normalize satellite name to string."""
    return v.value if hasattr(v, "value") else v

AdfType

Bases: str, Enum

ADF type

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class AdfType(str, Enum):
    """ADF type"""

    S00__ADF_ECMWA = "S00__ADF_ECMWA"
    S00__ADF_ECMWF = "S00__ADF_ECMWF"
    S00__ADF_GETAS = "S00__ADF_GETAS"
    S00__ADF_WATER = "S00__ADF_WATER"
    S03_ADF_OLCAL = "S03_ADF_OLCAL"
    S03_ADF_OLEOP = "S03_ADF_OLEOP"
    S03_ADF_OLINS = "S03_ADF_OLINS"
    S03_ADF_OLLUT = "S03_ADF_OLLUT"
    S03_ADF_OLPRG = "S03_ADF_OLPRG"
    S03_ADF_OLRAC = "S03_ADF_OLRAC"
    S03_ADF_OLSPC = "S03_ADF_OLSPC"

AuxiliaryProductMapping

Bases: BaseModel

Represents mapping for auxiliary products.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class AuxiliaryProductMapping(BaseModel):
    """Represents mapping for auxiliary products."""

    product_type: str = Field(description="Product type or '*' wildcard.")
    collection_name: str = Field(description="Collection name.")
    source: AuxiliarySource = Field(
        default=AuxiliarySource.AUXIP,
        description="STAC source where auxiliary products are searched.",
    )
    selected_assets: list[str] | None = Field(
        default=None,
        description="Optional asset keys to stage.",
    )

    def items(self):
        """Helper method to return the model fields as items, useful for logging."""
        return self.model_dump().items()

items()

Helper method to return the model fields as items, useful for logging.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
274
275
276
def items(self):
    """Helper method to return the model fields as items, useful for logging."""
    return self.model_dump().items()

AuxiliarySource

Bases: str, Enum

STAC source for auxiliary product search.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
118
119
120
121
122
123
124
125
126
127
class AuxiliarySource(str, Enum):
    """STAC source for auxiliary product search."""

    LTA = "lta"
    PRIP = "prip"
    CADIP = "cadip"
    AUXIP = "auxip"
    CDSE = "cdse"
    CATALOG = "catalog"
    EARTHDATAHUB = "earthdatahub"

ConversionIn

Bases: BaseModel

Input parameters for executing the 'on_demand_conversion' flow.

This model defines all the configuration needed to run an on-demand conversion flow, including input datasets, generated outputs, and scheduling parameters.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
class ConversionIn(BaseModel):
    """
    Input parameters for executing the 'on_demand_conversion' flow.

    This model defines all the configuration needed to run an on-demand conversion flow,
    including input datasets, generated outputs, and scheduling parameters.
    """

    env: FlowEnvArgs = Field(
        title="Flow Environment",
        description="Environment configuration for Prefect flow. Includes identifiers like owner_id.",
    )
    stac_input: str | dict = Field(
        title="STAC Input Product",
        description=("Input product for the conversion. Specifies the STAC item or href."),
    )
    generated_product_to_collection_identifier: FlowGeneratedProduct = Field(
        title="Generated Product",
        description=(
            "Generated product. Specifies a name, the product type, and the collection where the output will be stored."
        ),
    )
    owner_id: str = Field(
        title="Owner ID",
        description="User/owner ID necessary to retrieve the user info from the right Prefect block.",
    )
    dask_cluster_label: str = Field(
        title="Dask Cluster Label",
        description="Label of the Dask cluster to use for SAFE conversion.",
    )
    dask_cluster_instance: str | None = Field(
        default=None,
        title="Dask Cluster Instance",
        description="Optional Dask cluster instance ID used by the DPR conversion service.",
    )

    selected_assets: list[str] | None = Field(
        default=None,
        title="Selected Assets",
        description=("Set of selected asset keys to stage. If not provided, all assets will be converted"),
    )

DprProcessIn

Bases: BaseModel

Input parameters for executing the 'dpr-process' flow.

This model defines all the configuration needed to run a DPR processor, including input datasets, generated outputs, auxiliary data mapping, processing modes, and scheduling parameters.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
class DprProcessIn(BaseModel):
    """
    Input parameters for executing the 'dpr-process' flow.

    This model defines all the configuration needed to run a DPR processor,
    including input datasets, generated outputs, auxiliary data mapping,
    processing modes, and scheduling parameters.
    """

    env: FlowEnvArgs = Field(
        title="Flow Environment",
        description="Environment configuration for Prefect flow. Includes identifiers like owner_id.",
    )

    processor_name: str | DprProcessor = Field(
        title="DPR Processor Name",
        description="Name of the DPR processor to run. Can be a string or DprProcessor enum.",
    )

    processor_version: str = Field(
        title="Processor Version",
        description="Version of the processor. If not relevant, can be empty string.",
    )

    dask_cluster_label: str = Field(
        title="Dask Cluster Label",
        description='Label of the Dask cluster to use, e.g. "dask-l0" for local testing.',
    )

    dask_cluster_instance: str | None = Field(
        default=None,
        title="Dask Cluster Instance",
        description="Optional Dask cluster instance ID used to build a direct dashboard URL.",
    )

    dask_task_timeout: int | None = Field(
        default=None,
        title="Dask task timeout",
        description="Default timeout on a submitted task",
    )

    logging_level: LoggingLevel = Field(
        default=LoggingLevel.INFO,
        title="Overall EOPF logging level",
        description="Overall EOPF logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
    )

    temporary_shared: bool = Field(
        default=False,
        title="Temporary folder shared",
        description="Whether the temporary folder is reachable from the workers",
    )

    temporary_folder: str | None = Field(
        default=None,
        title="Temporary folder",
        description="Temporary folder path",
    )

    s3_payload_file: str = Field(
        title="S3 Payload File",
        description="S3 path where the processor payload (JSON) will be written for execution.",
    )

    pipeline: str | DprPipeline | None = Field(
        default=None,
        title="Pipeline Name",
        description="Name of the processing pipeline. Must be provided if `unit` is not set.",
    )

    unit: str | None = Field(
        default=None,
        title="Unit Name",
        description="Processing unit name. Must be provided if `pipeline` is not set.",
    )

    priority: Priority = Field(
        default=Priority.LOW,
        title="Processing Priority",
        description="Priority to assign for processing on the Dask cluster.",
    )

    workflow_type: WorkflowType = Field(
        default=WorkflowType.ON_DEMAND,
        title="Workflow Type",
        description="Type of workflow: ON_DEMAND, BENCHMARKING, SYSTEMATIC.",
    )

    input_products: list[FlowInputProduct] = Field(
        title="Input Products",
        description=(
            "List of input products for the processor. Each item specifies the product name, "
            "the STAC item identifier, and the collection it belongs to."
        ),
        min_length=1,
    )

    generated_product_to_collection_identifier: list[FlowGeneratedProduct] = Field(
        title="Generated Products",
        description=(
            "List of generated products. Each item specifies a name, the product type, "
            "and the collection where the output will be stored."
        ),
        min_length=1,
    )

    auxiliary_product_to_collection_identifier: list[AuxiliaryProductMapping] = Field(
        title="Auxiliary Product Mapping",
        description=(
            "Mapping of auxiliary product types to collections. "
            "Use '*' as a wildcard to map all other auxiliary products."
        ),
        min_length=1,
    )

    processing_mode: list[ProcessingMode] = Field(
        default_factory=list,
        title="Processing Modes",
        description="List of processing modes that control DPR behavior, e.g., ALWAYS, CONDITIONAL.",
    )

    start_datetime: datetime | None = Field(
        default=None,
        title="Start Datetime",
        description="Start datetime for retrieving auxiliary data. ISO format.",
    )

    end_datetime: datetime | None = Field(
        default=None,
        title="End Datetime",
        description="End datetime for retrieving auxiliary data. ISO format.",
    )

    satellite: str | SentinelSatellite | None = Field(
        default=None,
        title="Satellite",
        description="Satellite identifier used in certain queries. Can be a string or SentinelSatellite enum.",
    )

    reference_date: date | None = Field(
        default=None,
        title="Reference Date",
        description="Date used to identify a specific reference/master input product within the list of inputs.",
    )

    instrument_mode: str | InstrumentMode | None = Field(
        default=None,
        title="Instrument Mode",
        description="Instrument mode used in certain queries. Can be a string or InstrumentMode enum.",
    )

    edh_api_key: str | None = Field(
        default=None,
        title="EarthDataHub Standard API key",
        description="Destination Earth / EarthDataHub standard API key used to access Copernicus DEM",
    )

    # -----------------------
    # Validators
    # -----------------------

    @field_validator("processor_name", mode="before")
    @classmethod
    def normalize_processor_name(cls, v):
        """Normalize processor name to string."""
        return v.value if hasattr(v, "value") else v

    @field_validator("satellite", mode="before")
    @classmethod
    def normalize_satellite_name(cls, v):
        """Normalize satellite name to string."""
        return v.value if hasattr(v, "value") else v

    @model_validator(mode="after")
    def check_model(self):
        """Ensure mutual exclusivity between pipeline and unit."""
        has_pipeline = bool(self.pipeline)
        has_unit = bool(self.unit)

        if has_pipeline == has_unit:
            raise ValueError("Exactly one of 'pipeline' or 'unit' must be provided.")

        return self

check_model()

Ensure mutual exclusivity between pipeline and unit.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
452
453
454
455
456
457
458
459
460
461
@model_validator(mode="after")
def check_model(self):
    """Ensure mutual exclusivity between pipeline and unit."""
    has_pipeline = bool(self.pipeline)
    has_unit = bool(self.unit)

    if has_pipeline == has_unit:
        raise ValueError("Exactly one of 'pipeline' or 'unit' must be provided.")

    return self

normalize_processor_name(v) classmethod

Normalize processor name to string.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
440
441
442
443
444
@field_validator("processor_name", mode="before")
@classmethod
def normalize_processor_name(cls, v):
    """Normalize processor name to string."""
    return v.value if hasattr(v, "value") else v

normalize_satellite_name(v) classmethod

Normalize satellite name to string.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
446
447
448
449
450
@field_validator("satellite", mode="before")
@classmethod
def normalize_satellite_name(cls, v):
    """Normalize satellite name to string."""
    return v.value if hasattr(v, "value") else v

DprProcessOut dataclass

Output parameters for the 'dpr-process' flow

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
522
523
524
525
526
527
528
529
@dataclass
class DprProcessOut:
    """
    Output parameters for the 'dpr-process' flow
    """

    status: bool
    product_identifier: list[Item] = field(default_factory=list)

DprProcessedItemMetadata dataclass

Metadata for a DPR processed item.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
532
533
534
535
536
537
538
@dataclass
class DprProcessedItemMetadata:
    """Metadata for a DPR processed item."""

    output_product_id: str
    product_type: str | None
    stac_item: Item

FlowEnv

Prefect flow environment and reusable objects.

Attributes:

Name Type Description
owner_id str

User/owner ID

calling_span SpanContext | None

OpenTelemetry span of the calling flow, if any.

this_span SpanContext | None

Current OpenTelemetry span.

rs_client RsClient

RsClient instance

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class FlowEnv:
    """
    Prefect flow environment and reusable objects.

    Attributes:
        owner_id (str): User/owner ID
        calling_span (SpanContext | None): OpenTelemetry span of the calling flow, if any.
        this_span (SpanContext | None): Current OpenTelemetry span.
        rs_client (RsClient): RsClient instance
    """

    def __init__(self, args: FlowEnvArgs):
        """Constructor."""
        self.owner_id: str = args.owner_id
        self.calling_span: SpanContext | None = None
        self.this_span: SpanContext | None = None

        # Deserialize the calling span, if any
        if args.calling_span:
            self.calling_span = SpanContext(*args.calling_span)

        # Read prefect blocks into env vars
        prefect_utils.read_prefect_blocks(self.owner_id, _sync=True)  # type: ignore

        # Init opentelemetry traces
        init_opentelemetry.init_traces(args.service_name)

        # Init the RsClient instance from the env vars
        self.rs_client = RsClient(
            rs_server_href=os.getenv("RSPY_WEBSITE"),
            rs_server_api_key=os.getenv("RSPY_APIKEY"),
            owner_id=self.owner_id,
            logger=get_run_logger(),  # type: ignore
        )

    def serialize(self) -> FlowEnvArgs:
        """Serialize this object with Pydantic."""

        # The serialized object will be used by a new opentelemetry span.
        # Its calling span will be either the current span, or the current calling span.
        new_calling_span = self.this_span or self.calling_span
        if new_calling_span:
            # Only keep the first n attributes, the other need custom serialization
            serialized_span = tuple(new_calling_span)[:3]
        else:
            serialized_span = None

        return FlowEnvArgs(owner_id=self.owner_id, calling_span=serialized_span)  # type: ignore

    @_agnosticcontextmanager
    def start_span(
        self,
        instrumenting_module_name: str,
        name: str,
    ) -> Iterator[Span]:
        """
        Context manager for creating a new main or child OpenTelemetry span and set it
        as the current span in this tracer's context.

        Args:
            instrumenting_module_name: Caller module name, just pass __name__
            name: The name of the span to be created (use a custom name)

        Yields:
            The newly-created span.
        """
        # Create new span and save it
        with init_opentelemetry.start_span(  # pylint: disable=contextmanager-generator-missing-cleanup
            instrumenting_module_name,
            name,
            self.calling_span,
        ) as span:
            self.this_span = trace.get_current_span().get_span_context()
            yield span

__init__(args)

Constructor.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def __init__(self, args: FlowEnvArgs):
    """Constructor."""
    self.owner_id: str = args.owner_id
    self.calling_span: SpanContext | None = None
    self.this_span: SpanContext | None = None

    # Deserialize the calling span, if any
    if args.calling_span:
        self.calling_span = SpanContext(*args.calling_span)

    # Read prefect blocks into env vars
    prefect_utils.read_prefect_blocks(self.owner_id, _sync=True)  # type: ignore

    # Init opentelemetry traces
    init_opentelemetry.init_traces(args.service_name)

    # Init the RsClient instance from the env vars
    self.rs_client = RsClient(
        rs_server_href=os.getenv("RSPY_WEBSITE"),
        rs_server_api_key=os.getenv("RSPY_APIKEY"),
        owner_id=self.owner_id,
        logger=get_run_logger(),  # type: ignore
    )

serialize()

Serialize this object with Pydantic.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
192
193
194
195
196
197
198
199
200
201
202
203
204
def serialize(self) -> FlowEnvArgs:
    """Serialize this object with Pydantic."""

    # The serialized object will be used by a new opentelemetry span.
    # Its calling span will be either the current span, or the current calling span.
    new_calling_span = self.this_span or self.calling_span
    if new_calling_span:
        # Only keep the first n attributes, the other need custom serialization
        serialized_span = tuple(new_calling_span)[:3]
    else:
        serialized_span = None

    return FlowEnvArgs(owner_id=self.owner_id, calling_span=serialized_span)  # type: ignore

start_span(instrumenting_module_name, name)

Context manager for creating a new main or child OpenTelemetry span and set it as the current span in this tracer's context.

Parameters:

Name Type Description Default
instrumenting_module_name str

Caller module name, just pass name

required
name str

The name of the span to be created (use a custom name)

required

Yields:

Type Description
Span

The newly-created span.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
@_agnosticcontextmanager
def start_span(
    self,
    instrumenting_module_name: str,
    name: str,
) -> Iterator[Span]:
    """
    Context manager for creating a new main or child OpenTelemetry span and set it
    as the current span in this tracer's context.

    Args:
        instrumenting_module_name: Caller module name, just pass __name__
        name: The name of the span to be created (use a custom name)

    Yields:
        The newly-created span.
    """
    # Create new span and save it
    with init_opentelemetry.start_span(  # pylint: disable=contextmanager-generator-missing-cleanup
        instrumenting_module_name,
        name,
        self.calling_span,
    ) as span:
        self.this_span = trace.get_current_span().get_span_context()
        yield span

FlowEnvArgs

Bases: BaseModel

Prefect flow environment arguments.

Attributes:

Name Type Description
owner_id str

User/owner ID (necessary to retrieve the user info: API key and OAuth2 cookie)

from the right Prefect block. NOTE

may be useless after each user has their own prefect

calling_span tuple

Serialized OpenTelemetry span of the calling flow, if any.

service_name str

OpenTelemetry service name

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class FlowEnvArgs(BaseModel):
    """
    Prefect flow environment arguments.

    Attributes:
        owner_id: User/owner ID (necessary to retrieve the user info: API key and OAuth2 cookie)
        from the right Prefect block. NOTE: may be useless after each user has their own prefect
        server because there will be only one block.
        calling_span (tuple): Serialized OpenTelemetry span of the calling flow, if any.
        service_name: OpenTelemetry service name
    """

    owner_id: str = Field(
        description="User/owner ID (necessary to retrieve the user info) from the right Prefect block",
    )
    calling_span: tuple[int, int, bool] | None = Field(
        default=None,
        description="Serialized OpenTelemetry span of the calling flow, if any",
    )
    service_name: str = Field(default="rs.workflows", description="OpenTelemetry service name")
    called_by: str = Field(
        default="",
        title="Called by",
        description="Any additional information on who or what called the flow",
    )

FlowGeneratedProduct

Bases: BaseModel

Represents one generated output product.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
245
246
247
248
249
250
251
252
253
254
255
256
257
class FlowGeneratedProduct(BaseModel):
    """Represents one generated output product."""

    name: str = Field(description="Output product name.")
    product_type: str = Field(description="Product type.")
    collection_name: str | None = Field(
        default=None,
        description="Collection name. If not provided, it defaults to product_type.",
    )

    def items(self):
        """Helper method to return the model fields as items, useful for logging."""
        return self.model_dump().items()

items()

Helper method to return the model fields as items, useful for logging.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
255
256
257
def items(self):
    """Helper method to return the model fields as items, useful for logging."""
    return self.model_dump().items()

FlowInputProduct

Bases: BaseModel

Represents one input product for the processor.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
233
234
235
236
237
238
239
240
241
242
class FlowInputProduct(BaseModel):
    """Represents one input product for the processor."""

    name: str = Field(description="Input product name.")
    item_id: str = Field(description="STAC item identifier.")
    collection_name: str = Field(description="Collection name.")

    def items(self):
        """Helper method to return the model fields as items, useful for logging."""
        return self.model_dump().items()

items()

Helper method to return the model fields as items, useful for logging.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
240
241
242
def items(self):
    """Helper method to return the model fields as items, useful for logging."""
    return self.model_dump().items()

InstrumentMode

Bases: str, Enum

Instrument mode

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
69
70
71
72
73
74
class InstrumentMode(str, Enum):
    """Instrument mode"""

    EW = "EW"
    IW = "IW"
    SM = "SM"

LoggingLevel

Bases: str, Enum

Logging level allowed by eopf.logging module

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
108
109
110
111
112
113
114
115
class LoggingLevel(str, Enum):
    """Logging level allowed by eopf.logging module"""

    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

Priority

Bases: str, Enum

Priority for the cluster dask to be able to prioritise task execution.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
37
38
39
40
41
42
43
44
class Priority(str, Enum):
    """
    Priority for the cluster dask to be able to prioritise task execution.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

ProcessingMode

Bases: str, Enum

List of mode to be applied when calling the DPR processor.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
57
58
59
60
61
62
63
64
65
66
class ProcessingMode(str, Enum):
    """
    List of mode to be applied when calling the DPR processor.
    """

    NRT = "nrt"
    NTC = "ntc"
    REPROCESSING = "reprocessing"
    SUBS = "subs"
    ALWAYS = "always"

RetryConfig

Bases: BaseModel

Parameters:

Name Type Description Default
staging_retries

Number of retry attempts for staging operations

required
staging_retry_delay

Delay in seconds between retry attempts.

required
Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
541
542
543
544
545
546
547
548
549
class RetryConfig(BaseModel):
    """
    Args:
        staging_retries: Number of retry attempts for staging operations
        staging_retry_delay: Delay in seconds between retry attempts.
    """

    staging_retries: int = Field(3, description="Number of retry attempts for staging operations.")
    staging_retry_delay: int = Field(60, description="Delay in seconds between retry attempts.")

SentinelSatellite

Bases: str, Enum

Sentinel satellite name

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
77
78
79
80
81
82
83
84
85
86
87
88
89
class SentinelSatellite(str, Enum):
    """Sentinel satellite name"""

    # String value = STAC standardized value
    S1A = "sentinel-1a"
    S1B = "sentinel-1b"
    S1C = "sentinel-1c"
    S1D = "sentinel-1d"
    S2A = "sentinel-2a"
    S2B = "sentinel-2b"
    S2C = "sentinel-2c"
    S3A = "sentinel-3a"
    S3B = "sentinel-3b"

WorkflowType

Bases: str, Enum

Workflow type.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
47
48
49
50
51
52
53
54
class WorkflowType(str, Enum):
    """
    Workflow type.
    """

    BENCHMARKING = "benchmarking"
    ON_DEMAND = "on-demand"
    SYSTEMATIC = "systematic"