Skip to content

rs_workflows/adf_flow.md

<< Back to index

ADF conversion flow implementation.

AdfConversionConfig

Bases: NamedTuple

Configuration needed to run one ADF conversion type.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
59
60
61
62
63
64
class AdfConversionConfig(NamedTuple):
    """Configuration needed to run one ADF conversion type.

    The ``adf_conversion`` flow looks one of these up from ``ADF_TYPE_CONFIG``
    using the requested ``adf_type`` and uses its three fields to drive staging,
    conversion and publication.
    """

    # Auxiliary product types to stage before conversion; the flow runs one
    # catalog search / staging call per entry.
    required_types: list[str]
    # Product type given to every product generated by the conversion
    # (written to the STAC item's ``product:type`` property).
    generated_prod_type: str
    # Either a Path to a Python conversion script (run with the current
    # interpreter) or the STB_CONVERT_PRODUCTS marker, which selects the
    # ``stb_convert_products`` executable instead.
    script_path: Path | str

S03OlAdfConfig

Bases: NamedTuple

Configuration specific to one S03 OL ADF type.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
67
68
69
70
71
class S03OlAdfConfig(NamedTuple):
    """Configuration specific to one S03 OL ADF type.

    NOTE(review): this type is not used in the visible part of the module;
    "S03 OL" presumably refers to Sentinel-3 OLCI ADFs — confirm against callers.
    """

    # Auxiliary product type used as input for this ADF type (assumed from the
    # field name — verify against the code that consumes this config).
    required_type: str
    # Product type assigned to the generated ADF (presumably same meaning as
    # AdfConversionConfig.generated_prod_type — TODO confirm).
    generated_prod_type: str

SafeDict

Bases: dict

Dict subclass that returns {key} if key is missing or its value is None.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
377
378
379
380
381
382
383
384
385
386
387
class SafeDict(dict):
    """Lookup table for ``str.format_map`` that keeps unresolved placeholders.

    Reading a key that is absent, or whose stored value is ``None``, gives back
    the literal text ``"{key}"``. The placeholder therefore survives formatting
    unchanged instead of raising ``KeyError`` or being rendered as ``"None"``.
    """

    def __missing__(self, key):
        # dict.__getitem__ delegates here when the key is absent.
        return "".join(("{", key, "}"))

    def __getitem__(self, key):
        found = super().__getitem__(key)
        # An explicit None is treated exactly like a missing key.
        if found is not None:
            return found
        return "".join(("{", key, "}"))

adf_conversion(adf_input) async

Prefect flow for ADF conversion.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
def _default_aux_cql2_filter(prod_type: str, start_datetime, end_datetime) -> dict:
    """Build the default CQL2-JSON filter used to search one auxiliary product type.

    An item matches when its ``[start_datetime, end_datetime]`` interval intersects
    the requested interval and its ``product:type`` equals *prod_type*.

    A missing bound is encoded as ``".."``, the CQL2 marker for an open-ended
    interval. ``null`` is not a valid interval bound in CQL2-JSON.
    """

    def _bound(value) -> str:
        # Datetime bounds are sent as ISO 8601 strings; absent ones become open bounds.
        return value.isoformat() if value else ".."

    return {
        "filter": {
            "op": "and",
            "args": [
                {
                    "op": "t_intersects",
                    "args": [
                        {"interval": [{"property": "start_datetime"}, {"property": "end_datetime"}]},
                        {"interval": [_bound(start_datetime), _bound(end_datetime)]},
                    ],
                },
                {"op": "=", "args": [{"property": "product:type"}, prod_type]},
            ],
        },
    }


@flow(name="convert-adf")
async def adf_conversion(adf_input: AdfProcessIn):
    """
    Prefect flow for ADF conversion.

    Steps:
      1. Look up the conversion configuration for ``adf_input.adf_type``.
      2. For each required auxiliary product type, build (or substitute) a CQL2
         filter and stage the matching items into their target collection.
      3. Download and extract the staged assets into a temporary INPUT directory.
      4. Run the conversion tool, which produces ZARR directories and/or JSON files.
      5. Build one STAC item per product, upload the product to S3 and publish
         all items to the catalog.

    Returns ``None``. The flow returns early, without raising, when the ADF type
    is unsupported or when no auxiliary item could be staged.

    Raises:
        RuntimeError: if no target collection can be resolved for a required
            or generated product type.
        KeyError: if the ``RSPY_HOST_OSAM`` environment variable is not set.
    """
    logger = get_run_logger()
    logger.setLevel(logging.DEBUG)
    logger.info(f"Starting adf_conversion flow for adf_type: {adf_input.adf_type}")

    flow_env = FlowEnv(adf_input.env)
    with flow_env.start_span(__name__, "adf_conversion"):
        # 1. Look up the conversion configuration for the requested ADF type
        adf_config = ADF_TYPE_CONFIG.get(adf_input.adf_type)
        if adf_config is None:
            logger.error(f"Unsupported adf_type: {adf_input.adf_type}")
            return

        required_types = adf_config.required_types
        generated_prod_type = adf_config.generated_prod_type
        script_path = adf_config.script_path

        # 2. Build CQL2 filters and stage every required auxiliary product type
        staged_items: list[Item] = []
        for prod_type in required_types:
            if adf_input.cql2_filter is not None:
                logger.info("Use the provided CQL2 filter for auxiliary data retrieval")
                # Placeholders such as {product_type} in the user filter are filled per type
                cql2_filter = substitute_values(adf_input.cql2_filter, {"product_type": prod_type})
                logger.debug(f"Substituted CQL2 filter for product type {prod_type}: {cql2_filter}")
            else:
                logger.info("Build a default CQL2 filter for auxiliary data retrieval with operator t_intersects")
                cql2_filter = _default_aux_cql2_filter(
                    prod_type,
                    adf_input.start_datetime,
                    adf_input.end_datetime,
                )
            logger.info(f"Built CQL2 filter for product type {prod_type}: {cql2_filter}")

            # find target collection from mapping, preferring an exact product type match
            target_collection = resolve_collection_name(
                adf_input.auxiliary_product_to_collection_identifier,
                prod_type,
            )
            if target_collection is None:
                raise RuntimeError(
                    "❌ No target collection found for input product type "
                    f"{prod_type!r} in auxiliary_product_to_collection_identifier.",
                )

            logger.info(f"Staging {prod_type} to collection {target_collection}")
            success, items = await aux_staging_task(
                env=adf_input.env,
                cql2_filter=cql2_filter,
                catalog_collection_identifier=target_collection,
            )
            logger.debug(f"Staging result for product type {prod_type}: success={success}, items={items}")
            if success and items:
                staged_items.extend(items)
            elif not success:
                # Surface staging failures instead of silently dropping them
                logger.warning(f"⚠️ Staging failed for product type {prod_type} into collection {target_collection}")

        if not staged_items:
            logger.warning("⚠️ No staged items found to process.")
            return

        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            input_dir = temp_path / "INPUT"
            work_dir = temp_path / "WORK"
            output_dir = temp_path / "OUTPUT"

            input_dir.mkdir()
            work_dir.mkdir()
            output_dir.mkdir()

            # 3. Download and unzip assets locally
            await download_and_extract_assets_task(staged_items, input_dir)  # type: ignore[arg-type]

            # 4. Call the conversion tool; inputs are removed as soon as it finishes
            try:
                zarr_product_paths = run_adf_script(script_path, input_dir, work_dir, output_dir)
            finally:
                logger.info(f"Cleaning up input directory {input_dir}")
                shutil.rmtree(input_dir, ignore_errors=True)

            # 5. Resolve the destination once: it depends only on the generated product type
            generated_collection = resolve_collection_name(
                adf_input.auxiliary_product_to_collection_identifier,
                generated_prod_type,
            )
            if generated_collection is None:
                raise RuntimeError(
                    "❌ No target collection found for generated product type "
                    f"{generated_prod_type!r} in auxiliary_product_to_collection_identifier.",
                )

            bucket_configuration = fetch_csv_from_endpoint(os.environ["RSPY_HOST_OSAM"] + "/internal/configuration")
            owner_id = flow_env.owner_id
            bucket_name = find_s3_output_bucket(
                bucket_configuration,
                owner_id,
                generated_collection,
                generated_prod_type,
            )

            items_metadata: list[DprProcessedItemMetadata] = []
            publish_mapping: list[FlowGeneratedProduct] = []

            for zarr_product_path in zarr_product_paths:
                # 6. Create STAC item for this product
                stac_item = create_stac_item_from_zarr(zarr_product_path, generated_prod_type)
                stac_item.properties["product:type"] = generated_prod_type

                # 7. Upload product to S3 and point the STAC asset to the uploaded copy
                if zarr_product_path.suffix == ".json":
                    s3_dest = f"s3://{bucket_name}/{owner_id}/{generated_collection}/{zarr_product_path.name}"
                    logger.info(f"Uploading JSON to {s3_dest}")
                    await s3_upload_file(zarr_product_path, s3_dest)
                    stac_item.assets["data"].href = s3_dest
                else:
                    zarr_suffix = ".zarr" if zarr_product_path.suffix == ".zarr" else ""
                    s3_dest_prefix = (
                        f"s3://{bucket_name}/{owner_id}/{generated_collection}/{stac_item.id}{zarr_suffix}/"
                    )
                    logger.info(f"Uploading ZARR to {s3_dest_prefix}")
                    await s3_upload_dir(zarr_product_path, s3_dest_prefix)
                    stac_item.assets["data"].href = s3_dest_prefix

                items_metadata.append(
                    DprProcessedItemMetadata(
                        output_product_id=stac_item.id,
                        product_type=generated_prod_type,
                        stac_item=stac_item,
                    ),
                )
                publish_mapping.append(
                    FlowGeneratedProduct(
                        name=stac_item.id,
                        product_type=generated_prod_type,
                        collection_name=generated_collection,
                    ),
                )

            # 8. Publish all items to catalog
            try:
                await publish(
                    adf_input.env,
                    publish_mapping,
                    items_metadata,
                )
            finally:
                logger.info(f"Cleaning up output directory {output_dir}")
                shutil.rmtree(output_dir, ignore_errors=True)

adf_conversion_task(*args, **kwargs) async

See: adf_conversion

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
572
573
574
575
@task(name="convert-adf-task")
async def adf_conversion_task(*args, **kwargs) -> None:
    """See: adf_conversion

    Prefect task wrapper that runs the undecorated body of the
    ``adf_conversion`` flow, forwarding all arguments unchanged.
    """
    flow_body = adf_conversion.fn
    outcome = await flow_body(*args, **kwargs)
    return outcome

create_stac_item_from_zarr(zarr_path, generated_prod_type)

Create a STAC Item from a generated ZARR directory or JSON file.

For ZARR directories, metadata is read from .zattrs or zarr.json. For JSON files, the file itself is the metadata and the item ID is taken from stac_discovery.id when present, otherwise it is derived from the filename (minus the .json extension).

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
@task(name="Create STAC Item from ZARR metadata")
def create_stac_item_from_zarr(zarr_path: Path, generated_prod_type: str) -> Item:
    """
    Create a STAC Item from a generated ZARR directory or JSON file.

    For ZARR directories, metadata is read from .zattrs or zarr.json.
    For JSON files, the file itself is the metadata and the item ID is
    taken from stac_discovery.id when present, otherwise it is derived
    from the filename (minus the .json extension).

    Args:
        zarr_path: Path of the generated ZARR directory or JSON file.
        generated_prod_type: Product type stored in the item's ``product:type``
            property, overriding any value written by the conversion script.

    Returns:
        A pystac ``Item`` with a single ``data`` asset whose href is the local
        ``zarr_path``.

    Raises:
        RuntimeError: if the metadata file cannot be found, or if start/end
            datetimes are available neither in the metadata properties nor in
            the item ID.
    """
    logger = get_run_logger()
    is_json_product = zarr_path.suffix == ".json"

    if is_json_product:
        logger.info(f"Creating STAC item from JSON file: {zarr_path}")
        metadata_path = zarr_path
    else:
        logger.info(f"Creating STAC item from ZARR: {zarr_path}")
        # read .zattrs for global attributes, falling back to zarr.json
        metadata_path = zarr_path / ".zattrs"
        if not metadata_path.exists():
            metadata_path = zarr_path / "zarr.json"

    if not metadata_path.exists():
        raise RuntimeError(f"Metadata file (.zattrs or zarr.json) not found in {zarr_path}")

    with open(metadata_path, encoding="utf-8") as f:
        metadata = json.load(f)

    raw_discovery = metadata.get("stac_discovery")
    stac_discovery = raw_discovery if isinstance(raw_discovery, dict) else {}
    # JSON products carry their STAC fields under "stac_discovery"; ZARR metadata is used as-is
    stac_metadata = stac_discovery if is_json_product and stac_discovery else metadata

    # extract STAC properties from metadata (the scripts put them in the 'properties' attribute).
    # An explicit null is treated like a missing key so normalization does not crash.
    product_props = stac_metadata.get("properties") or {}
    logger.info(f"Stac discovery metadata: {stac_discovery}")
    logger.info(f"Product metadata: {product_props}")
    stac_props = normalize_stac_properties_datetimes(product_props)
    logger.info(f"Extracted STAC properties from product metadata: {stac_props}")

    # requirement: "Create a STAC item ... with generated product type as product:type"
    # but the script may set it differently
    stac_props["product:type"] = generated_prod_type

    # For JSON files prefer the STAC discovery item ID; for ZARR directories use the
    # metadata ID. In both cases fall back to the file/directory stem when absent or empty.
    if is_json_product:
        item_id = stac_discovery.get("id") or zarr_path.stem
    else:
        item_id = metadata.get("id") or zarr_path.stem
    logger.info(f"Setting item_id to {item_id}")

    # pystac needs start/end datetimes when datetime is None.
    # priority: metadata properties > datetimes embedded in item_id
    start_dt_str = stac_props.get("start_datetime")
    end_dt_str = stac_props.get("end_datetime")
    if not start_dt_str or not end_dt_str:
        id_datetimes = extract_datetimes_from_item_id(item_id)
        if id_datetimes:
            start_dt_str = start_dt_str or id_datetimes[0]
            end_dt_str = end_dt_str or id_datetimes[1]
            logger.info(f"Extracted start/end datetimes from item_id: {id_datetimes[0]} / {id_datetimes[1]}")
        else:
            msg = f"Could not extract datetimes from metadata properties or from item_id {item_id!r}"
            logger.error(msg)
            raise RuntimeError(msg)

    # If platform is not part of the STAC properties, derive it from characters 2-3 of the item name
    if not stac_props.get("platform"):
        value_platform: str = item_id[2:4].lower()
        # these codes denote ADFs that are not specific to a platform
        if value_platform not in ("0_", "__", "00"):
            stac_props["platform"] = f"sentinel-{value_platform}"
            logger.info(
                "'platform' property has been computed from the item name. "
                f"Value is '{stac_props['platform']}'",
            )
        else:
            logger.info("'platform' is not added to properties because ADF is not specific to a platform.")

    start_dt = parse_date(start_dt_str) if start_dt_str else None
    end_dt = parse_date(end_dt_str) if end_dt_str else None

    # build basic STAC item
    item = Item(
        id=item_id,
        geometry=stac_metadata.get("geometry", None),
        bbox=stac_metadata.get("bbox", None),
        datetime=None,
        start_datetime=start_dt,
        end_datetime=end_dt,
        properties=stac_props,
    )

    # add the product itself as the "data" asset; only the media type differs by product kind
    media_type = "application/json" if is_json_product else "application/vnd+zarr"
    logger.info(f"Adding asset to STAC item with href {str(zarr_path)}")
    item.add_asset(
        key="data",
        asset=Asset(
            href=str(zarr_path),
            title=item_id,
            media_type=media_type,
            roles=["data", "metadata"],
        ),
    )

    return item

extract_datetimes_from_item_id(item_id)

Extract start and end datetime ISO strings from an item_id.

The expected item_id format is `<prefix>_<start_YYYYMMDDTHHMMSS>_<end_YYYYMMDDTHHMMSS>_<creation_YYYYMMDDTHHMMSS>`.

Returns a (start_datetime, end_datetime) tuple on success, or None when the pattern does not match.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def extract_datetimes_from_item_id(item_id: str) -> tuple[str, str] | None:
    """Extract start and end datetime ISO strings from an item_id.

    The expected item_id format is:
        <prefix>_<start_YYYYMMDDTHHMMSS>_<end_YYYYMMDDTHHMMSS>_<creation_YYYYMMDDTHHMMSS>

    The two datetimes are interpreted as UTC and returned as RFC 3339 strings
    with a ``Z`` suffix.

    Returns a (start_datetime, end_datetime) tuple on success, or None when the
    pattern does not match or the matched digits are not a valid date/time.
    """
    match = _ITEM_ID_DT_RE.search(item_id)
    if not match:
        return None

    def _to_utc_iso(raw: str) -> str:
        """Convert a YYYYMMDDTHHMMSS string to a UTC RFC 3339 string ending in 'Z'."""
        return datetime.strptime(raw, "%Y%m%dT%H%M%S").replace(tzinfo=timezone.utc).isoformat().replace("+00:00", "Z")

    try:
        return _to_utc_iso(match.group(1)), _to_utc_iso(match.group(2))
    except ValueError:
        # The digits matched the regex but do not form a valid calendar date/time
        # (e.g. month 13); report "no datetimes" instead of leaking a ValueError.
        return None

normalize_stac_datetime_value(value)

Normalize a datetime string to an RFC 3339 UTC representation.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
174
175
176
177
178
179
180
181
def normalize_stac_datetime_value(value: str) -> str:
    """Return *value* re-rendered as an RFC 3339 UTC timestamp ending in ``Z``.

    Naive datetimes are taken to already be in UTC and are only tagged as such.
    Timezone-aware datetimes are converted to UTC first.
    """
    moment = parse_date(value)
    moment = (
        moment.astimezone(timezone.utc)
        if moment.tzinfo is not None
        else moment.replace(tzinfo=timezone.utc)
    )
    rendered = moment.isoformat()
    return rendered.replace("+00:00", "Z")

normalize_stac_properties_datetimes(stac_props)

Normalize STAC datetime-like property values to include an explicit timezone.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
184
185
186
187
188
189
190
191
def normalize_stac_properties_datetimes(stac_props: dict) -> dict:
    """Return a copy of *stac_props* whose datetime properties carry an explicit timezone.

    Only keys listed in ``STAC_DATETIME_PROPERTY_NAMES`` whose values are strings
    are rewritten, via ``normalize_stac_datetime_value``. Every other entry is
    copied unchanged, and the input mapping is never modified.
    """
    result = dict(stac_props)
    result.update(
        {
            name: normalize_stac_datetime_value(result[name])
            for name in STAC_DATETIME_PROPERTY_NAMES
            if isinstance(result.get(name), str)
        },
    )
    return result

resolve_collection_name(mappings, product_type)

Resolve a collection name from mappings, preferring an exact match over '*'.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
163
164
165
166
167
168
169
170
171
def resolve_collection_name(mappings, product_type: str) -> str | None:
    """Resolve a collection name from mappings, preferring an exact match over '*'."""
    fallback_collection = None
    for mapping in mappings:
        if mapping.product_type == product_type:
            return mapping.collection_name
        if mapping.product_type == "*" and fallback_collection is None:
            fallback_collection = mapping.collection_name
    return fallback_collection

run_adf_script(script_path, data_dir, working_dir, output_dir)

Run an ADF conversion tool with the given input and output directories.

When script_path is a Path, the corresponding Python script is executed with the current interpreter. When it equals `STB_CONVERT_PRODUCTS`, the `stb_convert_products` executable is called instead.

Returns the list of generated ZARR product directories and JSON files.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
@task(name="Run ADF conversion")
def run_adf_script(script_path: Path | str, data_dir: Path, working_dir: Path, output_dir: Path) -> list[Path]:
    """
    Run an ADF conversion tool and collect the products it generated.

    Two kinds of tool are supported:

    * ``script_path == STB_CONVERT_PRODUCTS``: the ``stb_convert_products``
      executable is called with ``-i <data_dir> -o <output_dir>`` and the
      current environment.
    * otherwise: ``script_path`` is executed as a Python script with the current
      interpreter. It receives ``data_dir`` positionally and
      ``--working_dir <working_dir>``, with ``ADF_OUTPUT`` and ``ADF_INPUT``
      exported in its environment.

    The tool's stdout and stderr are forwarded line by line to the Prefect logger.

    Returns:
        The sorted ``*.zarr`` entries of ``output_dir`` followed by its sorted
        ``*.json`` entries.

    Raises:
        subprocess.CalledProcessError: if the tool exits with a non-zero code.
        RuntimeError: if ``output_dir`` contains neither ZARR nor JSON products.
    """
    logger = get_run_logger()
    logger.info(f"Running ADF conversion: {script_path}")

    def forward(text: str):
        """Relay captured subprocess output to the Prefect logger, one line at a time."""
        for text_line in text.splitlines():
            logger.info(f"Conversion log: {text_line}")

    if script_path != STB_CONVERT_PRODUCTS:
        # ADF_INPUT is read by S00__ADF_GETAS.py; the other scripts get the input dir
        # as a positional argument and simply ignore the extra variable.
        child_env = {**os.environ, "ADF_OUTPUT": str(output_dir), "ADF_INPUT": str(data_dir)}
        argv = [sys.executable, str(script_path), str(data_dir), "--working_dir", str(working_dir)]
    else:
        argv = ["stb_convert_products", "-i", str(data_dir), "-o", str(output_dir)]
        child_env = None  # inherit current environment

    try:
        completed = subprocess.run(  # nosec B603
            # trusted local script / executable with flow-created paths
            argv,
            env=child_env,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as err:
        forward(err.stdout or "")
        forward(err.stderr or "")
        logger.error(f"ADF conversion failed with exit code {err.returncode}")
        raise
    else:
        forward(completed.stdout)
        forward(completed.stderr)

    # ZARR products first, then JSON products, each group sorted
    produced = [*sorted(output_dir.glob("*.zarr")), *sorted(output_dir.glob("*.json"))]
    if produced:
        return produced
    raise RuntimeError(
        f"No ZARR or JSON product generated in {output_dir}. The content of this dir is: "
        f"{list(output_dir.glob('*'))}",
    )

substitute_values(obj, values)

Recursively substitute values in a nested structure of dicts/lists/strings.

Source code in docs/rs-client-libraries/rs_workflows/adf_flow.py
390
391
392
393
394
395
396
397
398
def substitute_values(obj, values):
    """Recursively substitute values in a nested structure of dicts/lists/strings."""
    if isinstance(obj, dict):
        return {k: substitute_values(v, values) for k, v in obj.items()}
    if isinstance(obj, list):
        return [substitute_values(v, values) for v in obj]
    if isinstance(obj, str):
        return obj.format_map(SafeDict(values))
    return obj