Skip to content

rs_workflows/flow_utils.md

<< Back to index

Utility module for the Prefect flows.

AuxiliaryProductMapping

Bases: BaseModel

Represents mapping for auxiliary products.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
206
207
208
209
210
211
212
213
214
class AuxiliaryProductMapping(BaseModel):
    """Represents mapping for auxiliary products.

    Maps a product type (or the '*' wildcard) to a collection name.
    """

    # Product type to match; '*' acts as a wildcard for all other auxiliary products.
    product_type: str = Field(description="Product type or '*' wildcard.")
    # Collection associated with the matched product type.
    collection_name: str = Field(description="Collection name.")

    def items(self):
        """Helper method to return the model fields as items, useful for logging."""
        return self.model_dump().items()

items()

Helper method to return the model fields as items, useful for logging.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
212
213
214
def items(self):
    """Expose the model's fields as (name, value) pairs, handy for logging."""
    dumped = self.model_dump()
    return dumped.items()

DprProcessIn

Bases: BaseModel

Input parameters for executing the 'dpr-process' flow.

This model defines all the configuration needed to run a DPR processor, including input datasets, generated outputs, auxiliary data mapping, processing modes, and scheduling parameters.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
class DprProcessIn(BaseModel):
    """
    Input parameters for executing the 'dpr-process' flow.

    This model defines all the configuration needed to run a DPR processor,
    including input datasets, generated outputs, auxiliary data mapping,
    processing modes, and scheduling parameters.
    """

    env: FlowEnvArgs = Field(
        title="Flow Environment",
        description="Environment configuration for Prefect flow. Includes identifiers like owner_id.",
    )

    # Normalized to a plain string by normalize_processor_name() below.
    processor_name: str | DprProcessor = Field(
        title="DPR Processor Name",
        description="Name of the DPR processor to run. Can be a string or DprProcessor enum.",
    )

    processor_version: str = Field(
        title="Processor Version",
        description="Version of the processor. If not relevant, can be empty string.",
    )

    dask_cluster_label: str = Field(
        title="Dask Cluster Label",
        description='Label of the Dask cluster to use, e.g. "dask-l0" for local testing.',
    )

    dask_cluster_instance: str | None = Field(
        default=None,
        title="Dask Cluster Instance",
        description="Optional Dask cluster instance ID used to build a direct dashboard URL.",
    )

    s3_payload_file: str = Field(
        title="S3 Payload File",
        description="S3 path where the processor payload (JSON) will be written for execution.",
    )

    # Exactly one of `pipeline` / `unit` must be set — enforced by check_model() below.
    pipeline: str | DprPipeline | None = Field(
        default=None,
        title="Pipeline Name",
        description="Name of the processing pipeline. Must be provided if `unit` is not set.",
    )

    unit: str | None = Field(
        default=None,
        title="Unit Name",
        description="Processing unit name. Must be provided if `pipeline` is not set.",
    )

    priority: Priority = Field(
        default=Priority.LOW,
        title="Processing Priority",
        description="Priority to assign for processing on the Dask cluster.",
    )

    workflow_type: WorkflowType = Field(
        default=WorkflowType.ON_DEMAND,
        title="Workflow Type",
        description="Type of workflow: ON_DEMAND, BENCHMARKING, SYSTEMATIC.",
    )

    input_products: list[FlowInputProduct] = Field(
        title="Input Products",
        description=(
            "List of input products for the processor. Each item specifies the product name, "
            "the STAC item identifier, and the collection it belongs to."
        ),
        min_length=1,
    )

    generated_product_to_collection_identifier: list[FlowGeneratedProduct] = Field(
        title="Generated Products",
        description=(
            "List of generated products. Each item specifies a name, the product type, "
            "and the collection where the output will be stored."
        ),
        min_length=1,
    )

    auxiliary_product_to_collection_identifier: list[AuxiliaryProductMapping] = Field(
        title="Auxiliary Product Mapping",
        description=(
            "Mapping of auxiliary product types to collections. "
            "Use '*' as a wildcard to map all other auxiliary products."
        ),
        min_length=1,
    )

    processing_mode: list[ProcessingMode] = Field(
        default_factory=list,
        title="Processing Modes",
        description="List of processing modes that control DPR behavior, e.g., ALWAYS, CONDITIONAL.",
    )

    start_datetime: datetime | None = Field(
        default=None,
        title="Start Datetime",
        description="Start datetime for retrieving auxiliary data. ISO format.",
    )

    end_datetime: datetime | None = Field(
        default=None,
        title="End Datetime",
        description="End datetime for retrieving auxiliary data. ISO format.",
    )

    # Normalized to a plain string by normalize_satellite_name() below.
    satellite: str | SentinelSatellite | None = Field(
        default=None,
        title="Satellite",
        description="Satellite identifier used in certain queries. Can be a string or SentinelSatellite enum.",
    )

    # -----------------------
    # Validators
    # -----------------------

    @field_validator("processor_name", mode="before")
    @classmethod
    def normalize_processor_name(cls, v):
        """Normalize processor name to string."""
        # Enum members expose their string via .value; plain strings pass through.
        return v.value if hasattr(v, "value") else v

    @field_validator("satellite", mode="before")
    @classmethod
    def normalize_satellite_name(cls, v):
        """Normalize satellite name to string."""
        # Enum members expose their string via .value; plain strings pass through.
        return v.value if hasattr(v, "value") else v

    @model_validator(mode="after")
    def check_model(self):
        """Ensure mutual exclusivity between pipeline and unit."""
        # Truthiness check: None and empty strings both count as "not provided".
        has_pipeline = bool(self.pipeline)
        has_unit = bool(self.unit)

        if has_pipeline == has_unit:
            raise ValueError("Exactly one of 'pipeline' or 'unit' must be provided.")

        return self

check_model()

Ensure mutual exclusivity between pipeline and unit.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
348
349
350
351
352
353
354
355
356
357
@model_validator(mode="after")
def check_model(self):
    """Ensure mutual exclusivity between pipeline and unit."""
    # Truthiness is deliberate: None and "" both count as "not provided".
    # Equality of the two flags means both set or both missing — invalid either way.
    if bool(self.pipeline) == bool(self.unit):
        raise ValueError("Exactly one of 'pipeline' or 'unit' must be provided.")
    return self

normalize_processor_name(v) classmethod

Normalize processor name to string.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
336
337
338
339
340
@field_validator("processor_name", mode="before")
@classmethod
def normalize_processor_name(cls, v):
    """Normalize processor name to string."""
    # Enum members carry their raw string in .value; anything else passes through.
    if hasattr(v, "value"):
        return v.value
    return v

normalize_satellite_name(v) classmethod

Normalize satellite name to string.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
342
343
344
345
346
@field_validator("satellite", mode="before")
@classmethod
def normalize_satellite_name(cls, v):
    """Normalize satellite name to string."""
    # Enum members carry their raw string in .value; anything else passes through.
    if hasattr(v, "value"):
        return v.value
    return v

DprProcessOut dataclass

Output parameters for the 'dpr-process' flow

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
360
361
362
363
364
365
366
367
@dataclass
class DprProcessOut:
    """
    Output parameters for the 'dpr-process' flow
    """

    # Overall flow status — presumably True on success; confirm against the flow code.
    status: bool
    # Items produced by the flow (empty by default) — presumably STAC Items,
    # the Item type is imported from outside this module.
    product_identifier: list[Item] = field(default_factory=list)

DprProcessedItemMetadata dataclass

Metadata for a DPR processed item.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
370
371
372
373
374
375
376
@dataclass
class DprProcessedItemMetadata:
    """Metadata for a DPR processed item."""

    # Identifier of the generated output product.
    output_product_id: str
    # Product type, or None when not known.
    product_type: str | None
    # Associated item — the Item type is imported from outside this module.
    stac_item: Item

FlowEnv

Prefect flow environment and reusable objects.

Attributes:

Name Type Description
owner_id str

User/owner ID

calling_span SpanContext | None

OpenTelemetry span of the calling flow, if any.

this_span SpanContext | None

Current OpenTelemetry span.

rs_client RsClient

RsClient instance

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
class FlowEnv:
    """
    Prefect flow environment and reusable objects.

    Attributes:
        owner_id (str): User/owner ID
        calling_span (SpanContext | None): OpenTelemetry span of the calling flow, if any.
        this_span (SpanContext | None): Current OpenTelemetry span.
        rs_client (RsClient): RsClient instance
    """

    def __init__(self, args: FlowEnvArgs):
        """Constructor."""
        self.owner_id: str = args.owner_id
        self.calling_span: SpanContext | None = None
        self.this_span: SpanContext | None = None

        # Deserialize the calling span, if any
        # (args.calling_span holds the first SpanContext attributes as a tuple,
        # produced by serialize() below).
        if args.calling_span:
            self.calling_span = SpanContext(*args.calling_span)

        # Read prefect blocks into env vars
        # NOTE(review): presumably sets the RSPY_* env vars read below when
        # creating the RsClient, so the call order matters — confirm.
        prefect_utils.read_prefect_blocks(self.owner_id, _sync=True)  # type: ignore

        # Init opentelemetry traces
        init_opentelemetry.init_traces(args.service_name)

        # Init the RsClient instance from the env vars
        self.rs_client = RsClient(
            rs_server_href=os.getenv("RSPY_WEBSITE"),
            rs_server_api_key=os.getenv("RSPY_APIKEY"),
            owner_id=self.owner_id,
            logger=get_run_logger(),  # type: ignore
        )

    def serialize(self) -> FlowEnvArgs:
        """Serialize this object with Pydantic."""

        # The serialized object will be used by a new opentelemetry span.
        # Its calling span will be either the current span, or the current calling span.
        new_calling_span = self.this_span or self.calling_span
        if new_calling_span:
            # Only keep the first n attributes, the other need custom serialization
            serialized_span = tuple(new_calling_span)[:3]
        else:
            serialized_span = None

        return FlowEnvArgs(owner_id=self.owner_id, calling_span=serialized_span)  # type: ignore

    @_agnosticcontextmanager
    def start_span(
        self,
        instrumenting_module_name: str,
        name: str,
    ) -> Iterator[Span]:
        """
        Context manager for creating a new main or child OpenTelemetry span and set it
        as the current span in this tracer's context.

        Args:
            instrumenting_module_name: Caller module name, just pass __name__
            name: The name of the span to be created (use a custom name)

        Yields:
            The newly-created span.
        """
        # Create new span and save it
        with init_opentelemetry.start_span(  # pylint: disable=contextmanager-generator-missing-cleanup
            instrumenting_module_name,
            name,
            self.calling_span,
        ) as span:
            # Remember the current span context so serialize() can propagate it
            # to child flows.
            self.this_span = trace.get_current_span().get_span_context()
            yield span

__init__(args)

Constructor.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def __init__(self, args: FlowEnvArgs):
    """Constructor."""
    self.owner_id: str = args.owner_id
    self.calling_span: SpanContext | None = None
    self.this_span: SpanContext | None = None

    # Deserialize the calling span, if any
    # (args.calling_span holds the first SpanContext attributes as a tuple).
    if args.calling_span:
        self.calling_span = SpanContext(*args.calling_span)

    # Read prefect blocks into env vars
    # NOTE(review): presumably sets the RSPY_* env vars read below when
    # creating the RsClient, so the call order matters — confirm.
    prefect_utils.read_prefect_blocks(self.owner_id, _sync=True)  # type: ignore

    # Init opentelemetry traces
    init_opentelemetry.init_traces(args.service_name)

    # Init the RsClient instance from the env vars
    self.rs_client = RsClient(
        rs_server_href=os.getenv("RSPY_WEBSITE"),
        rs_server_api_key=os.getenv("RSPY_APIKEY"),
        owner_id=self.owner_id,
        logger=get_run_logger(),  # type: ignore
    )

serialize()

Serialize this object with Pydantic.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
138
139
140
141
142
143
144
145
146
147
148
149
150
def serialize(self) -> FlowEnvArgs:
    """Serialize this object with Pydantic."""

    # The serialized copy is consumed by a new opentelemetry span, whose
    # calling span is our current span when set, else our own calling span.
    span_to_propagate = self.this_span or self.calling_span

    # Only the first 3 attributes serialize naturally; the others would
    # need custom serialization.
    serialized_span = tuple(span_to_propagate)[:3] if span_to_propagate else None

    return FlowEnvArgs(owner_id=self.owner_id, calling_span=serialized_span)  # type: ignore

start_span(instrumenting_module_name, name)

Context manager for creating a new main or child OpenTelemetry span and set it as the current span in this tracer's context.

Parameters:

Name Type Description Default
instrumenting_module_name str

Caller module name, just pass `__name__`

required
name str

The name of the span to be created (use a custom name)

required

Yields:

Type Description
Span

The newly-created span.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@_agnosticcontextmanager
def start_span(
    self,
    instrumenting_module_name: str,
    name: str,
) -> Iterator[Span]:
    """
    Context manager for creating a new main or child OpenTelemetry span and set it
    as the current span in this tracer's context.

    Args:
        instrumenting_module_name: Caller module name, just pass __name__
        name: The name of the span to be created (use a custom name)

    Yields:
        The newly-created span.
    """
    # Create new span and save it
    with init_opentelemetry.start_span(  # pylint: disable=contextmanager-generator-missing-cleanup
        instrumenting_module_name,
        name,
        self.calling_span,
    ) as span:
        # Remember the current span context so serialize() can propagate it
        # to child flows.
        self.this_span = trace.get_current_span().get_span_context()
        yield span

FlowEnvArgs

Bases: BaseModel

Prefect flow environment arguments.

Attributes:

Name Type Description
owner_id str

User/owner ID (necessary to retrieve the user info: API key and OAuth2 cookie) from the right Prefect block. NOTE: may be useless after each user has their own Prefect server, because there will be only one block.

calling_span tuple

Serialized OpenTelemetry span of the calling flow, if any.

service_name str

OpenTelemetry service name

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class FlowEnvArgs(BaseModel):
    """
    Prefect flow environment arguments.

    Attributes:
        owner_id (str): User/owner ID (necessary to retrieve the user info: API key and
            OAuth2 cookie) from the right Prefect block. NOTE: may be useless after each
            user has their own Prefect server because there will be only one block.
        calling_span (tuple): Serialized OpenTelemetry span of the calling flow, if any.
        service_name (str): OpenTelemetry service name.
    """

    owner_id: str = Field(
        description="User/owner ID (necessary to retrieve the user info) from the right Prefect block",
    )
    # Only the first 3 SpanContext attributes are kept — NOTE(review): confirm
    # the attribute order against the opentelemetry SpanContext definition.
    calling_span: tuple[int, int, bool] | None = Field(
        default=None,
        description="Serialized OpenTelemetry span of the calling flow, if any",
    )
    service_name: str = Field(default="rs.workflows", description="OpenTelemetry service name")

FlowGeneratedProduct

Bases: BaseModel

Represents one generated output product.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
191
192
193
194
195
196
197
198
199
200
201
202
203
class FlowGeneratedProduct(BaseModel):
    """Represents one generated output product."""

    name: str = Field(description="Output product name.")
    product_type: str = Field(description="Product type.")
    # NOTE(review): the fallback to product_type is not applied inside this
    # model — presumably done by the consumer; confirm.
    collection_name: str | None = Field(
        default=None,
        description="Collection name. If not provided, it defaults to product_type.",
    )

    def items(self):
        """Helper method to return the model fields as items, useful for logging."""
        return self.model_dump().items()

items()

Helper method to return the model fields as items, useful for logging.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
201
202
203
def items(self):
    """Return the model's fields as (name, value) items, useful for logging."""
    model_as_dict = self.model_dump()
    return model_as_dict.items()

FlowInputProduct

Bases: BaseModel

Represents one input product for the processor.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
179
180
181
182
183
184
185
186
187
188
class FlowInputProduct(BaseModel):
    """Represents one input product for the processor."""

    name: str = Field(description="Input product name.")
    # NOTE(review): the field name suggests a CADIP session ID but the
    # description says "STAC item identifier" — confirm which is intended.
    cadip_session: str = Field(description="STAC item identifier.")
    collection_name: str = Field(description="Collection name.")

    def items(self):
        """Helper method to return the model fields as items, useful for logging."""
        return self.model_dump().items()

items()

Helper method to return the model fields as items, useful for logging.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
186
187
188
def items(self):
    """Yield access to this model's fields as (name, value) items, for logging."""
    serialized = self.model_dump()
    return serialized.items()

Priority

Bases: str, Enum

Priority for the Dask cluster, so that it can prioritise task execution.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
35
36
37
38
39
40
41
42
class Priority(str, Enum):
    """
    Priority for the Dask cluster, so that it can prioritise task execution.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

ProcessingMode

Bases: str, Enum

List of mode to be applied when calling the DPR processor.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
55
56
57
58
59
60
61
62
63
64
class ProcessingMode(str, Enum):
    """
    List of modes to be applied when calling the DPR processor.
    """

    # NOTE(review): NRT/NTC presumably map to the Copernicus near-real-time /
    # not-time-critical timeliness categories — confirm with the DPR docs.
    NRT = "nrt"
    NTC = "ntc"
    REPROCESSING = "reprocessing"
    SUBS = "subs"
    ALWAYS = "always"

RetryConfig

Bases: BaseModel

Attributes:

Name Type Description Default
staging_retries

Number of retry attempts for staging operations

required
staging_retry_delay

Delay in seconds between retry attempts.

required
Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
379
380
381
382
383
384
385
386
387
class RetryConfig(BaseModel):
    """
    Retry configuration for staging operations.

    Attributes:
        staging_retries: Number of retry attempts for staging operations
        staging_retry_delay: Delay in seconds between retry attempts.
    """

    staging_retries: int = Field(3, description="Number of retry attempts for staging operations.")
    staging_retry_delay: int = Field(60, description="Delay in seconds between retry attempts.")

SentinelSatellite

Bases: str, Enum

Sentinel satellite name

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
67
68
69
70
71
72
73
74
75
76
77
78
class SentinelSatellite(str, Enum):
    """Sentinel satellite name."""

    # String value = STAC standardized value
    # Sentinel-1 units
    S1A = "sentinel-1a"
    S1B = "sentinel-1b"
    S1C = "sentinel-1c"
    # Sentinel-2 units
    S2A = "sentinel-2a"
    S2B = "sentinel-2b"
    S2C = "sentinel-2c"
    # Sentinel-3 units
    S3A = "sentinel-3a"
    S3B = "sentinel-3b"

WorkflowType

Bases: str, Enum

Workflow type.

Source code in docs/rs-client-libraries/rs_workflows/flow_utils.py
45
46
47
48
49
50
51
52
class WorkflowType(str, Enum):
    """
    Workflow type.

    Used by DprProcessIn.workflow_type, which defaults to ON_DEMAND.
    """

    BENCHMARKING = "benchmarking"
    ON_DEMAND = "on-demand"
    SYSTEMATIC = "systematic"