Skip to content

rs_workflows/on_demand_conversion.md

<< Back to index

Prefect flows and tasks for on-demand SAFE-to-Zarr conversion.

cleanup_staged_safe_item_task(env, collection_id, item_id) async

Remove the staged SAFE item from the catalog after conversion.

Source code in docs/rs-client-libraries/rs_workflows/on_demand_conversion.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@task(name="Cleanup staged SAFE item")
async def cleanup_staged_safe_item_task(
    env: FlowEnvArgs,
    collection_id: str,
    item_id: str,
) -> None:
    """Delete the staged SAFE item once the conversion no longer needs it.

    Args:
        env: Serialized flow environment used to rebuild the rs-client.
        collection_id: Catalog collection holding the staged item.
        item_id: Identifier of the staged SAFE item to remove.
    """
    run_logger = get_run_logger()
    env_wrapper = FlowEnv(env)
    with env_wrapper.start_span(__name__, "cleanup-staged-safe-item"):
        client: CatalogClient = env_wrapper.rs_client.get_catalog_client()
        run_logger.info(f"Removing staged SAFE item {item_id!r} from output collection {collection_id!r}.")
        # raise_for_status=False: the removal is requested without failing the task on an
        # error response. NOTE(review): the "Removed" message below is logged regardless
        # of the actual outcome.
        client.remove_item(collection_id, item_id, raise_for_status=False)
        run_logger.info(f"Removed staged SAFE item {item_id!r} from output collection {collection_id!r}.")

on_demand_conversion(conversion_input, retry_config=RetryConfig()) async

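Convert one legacy SAFE product to Zarr on demand. The flow stages the SAFE product, runs the DPR SAFE-to-Zarr conversion, publishes the resulting Zarr STAC item and removes the staged SAFE item.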

Source code in docs/rs-client-libraries/rs_workflows/on_demand_conversion.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
@flow
async def on_demand_conversion(
    conversion_input: ConversionIn,
    retry_config: RetryConfig = RetryConfig(),  # type: ignore
):
    """Convert one legacy SAFE product to Zarr on demand and publish the result.

    Steps:
        1. Stage the SAFE product into the catalog collection named by
           ``generated_product_to_collection_identifier.collection_name``.
        2. Extract archived assets, if any.
        3. Map the legacy product type to its EOPF output product type.
        4. Compute the S3 output directory.
        5. Run the DPR SAFE-to-Zarr conversion.
        6. Build the STAC item from the generated Zarr metadata.
        7. Publish that item to the catalog.
        8. Remove the staged SAFE item.

    Args:
        conversion_input: Flow input. Provides the environment, the STAC input
            (item URL or FeatureCollection dict), the selected assets, the owner,
            the output collection mapping and the Dask cluster selection.
        retry_config: Retry count and delay applied to the staging task.

    Raises:
        ValueError: If no staging collection name is provided, or if no generated
            product mapping matches the computed output product type.
        RuntimeError: If any of these fail:
            - a staging job;
            - retrieval of the staged item;
            - asset extraction;
            - product-type mapping;
            - derivation of the SAFE root path from the staged asset.
    """
    logger = get_run_logger()
    logger.info(f"Starting on-demand conversion flow with input: {conversion_input}")
    flow_env = FlowEnv(conversion_input.env)
    staging_collection = conversion_input.generated_product_to_collection_identifier.collection_name
    # An empty name cannot identify a catalog collection either, so reject it like None.
    if not staging_collection:
        raise ValueError("collection_name is required to stage and retrieve the SAFE item")

    with flow_env.start_span(__name__, "legacy-conversion"):
        # 1. stage
        # stac_input is either an item URL (str) or a FeatureCollection dict. Derive the
        # item id and a reference to the original item once, for both shapes.
        stac_item: str | dict[str, Any] = conversion_input.stac_input
        if isinstance(stac_item, str):
            # Ignore a trailing slash so ".../items/ID/" yields "ID" rather than "".
            stac_item_id = stac_item.rstrip("/").split("/")[-1]
            original_stac_href = stac_item
        else:
            input_feature = stac_item["features"][0]
            stac_item_id = input_feature["id"]
            original_stac_href = next(
                (link["href"] for link in input_feature.get("links", []) if link.get("rel") == "self"),
                input_feature["id"],
            )

        legacy_product = staging_task.with_options(
            retries=retry_config.staging_retries,
            retry_delay_seconds=retry_config.staging_retry_delay,
        ).submit(
            flow_env.serialize(),
            stac_input=conversion_input.stac_input,
            catalog_collection_identifier=staging_collection,
            asset_names=conversion_input.selected_assets or {"product"},
            poll_interval=10,
        )
        logger.info("Staging task submitted, waiting for completion...")
        staging_results = legacy_product.result()  # type: ignore[unused-coroutine]

        for job_name, job_result in staging_results.items():
            if job_result.get("status") != "successful":
                raise RuntimeError(
                    f"Staging job {job_name!r} failed with status {job_result.get('status')!r}: "
                    f"{job_result.get('message')}",
                )
        catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()
        catalog_items = ItemCollection(
            catalog_client.get_items(
                collection_id=staging_collection,
                items_ids=[stac_item_id],
            ),
        )
        logger.info(f"Retrieved catalog items after staging: {catalog_items.to_dict()}")
        # Fail with an explicit message instead of a bare IndexError below.
        if not catalog_items.items:
            raise RuntimeError(
                f"Staged SAFE item {stac_item_id!r} not found in collection {staging_collection!r} after staging",
            )

        # Start from the staged catalog item; if it contains an archived SAFE asset,
        # step 2 will replace this with the uncompressed item.
        safe_item = catalog_items.items[0]

        # 2. Prepare assets for conversion (e.g. unzip if needed)
        try:
            for idx in get_archived_item_indexes(catalog_items):
                safe_zipped_item = catalog_items.items[idx]
                logger.info(f"Processing item {safe_zipped_item.id} for asset extraction...")
                safe_unzipped_item = asset_unzip_decompress_task.submit(safe_zipped_item, True)
                safe_item = safe_unzipped_item.result()  # type: ignore[assignment]
                # Drop the archive asset so that later asset selection uses the extracted files.
                safe_item.assets.pop("product", None)
                catalog_client.update_item(safe_item)
        except Exception as err:
            raise RuntimeError(
                "Error while trying to update the item collection with the uncompressed/unzipped items. "
                "This error is likely due to a failure in the asset_unzip_decompress_task. "
                "Check previous logs for more details.",
            ) from err
        logger.info(f"Asset preparation completed, proceeding with conversion... {safe_item.to_dict()}")
        logger.info("Staging task completed, proceeding with conversion...")

        # 3. compute the output product type from the product type mapping
        legacy_product_type = safe_item.properties["product:type"]  # ex: IW_SLC__1S
        logger.info(f"Legacy SAFE product type used for mapping: {legacy_product_type}")
        no_mapping_message = f"No product type mapping found for legacy product type {legacy_product_type!r}"
        mapping = find_product_type(legacy_product_type)
        try:
            output_product_type = mapping["productType"]  # ex: S01SIWSLC
        except (KeyError, TypeError) as err:
            # No mapping entry (or no mapping at all): report it as the intended RuntimeError.
            raise RuntimeError(no_mapping_message) from err
        if not output_product_type:
            raise RuntimeError(no_mapping_message)
        logger.info(f"Resolved SAFE conversion output product type: {output_product_type}")

        # 4. compute the output bucket from the provided generated_product_to_collection_identifier mapping

        # Match the computed output product type with the flow input mapping.
        # This gives us the output collection requested by the caller.
        generated_product = resolve_generated_product(
            output_product_type,
            [conversion_input.generated_product_to_collection_identifier],
        )

        output_collection = generated_product.collection_name or generated_product.product_type

        # Read the owner/collection/product-type to S3 bucket rules from OSAM.
        bucket_configuration = fetch_csv_from_endpoint(os.environ["RSPY_HOST_OSAM"] + "/internal/configuration")

        # Resolve the final S3 bucket using the same rules as generic DPR processing.
        output_bucket = find_s3_output_bucket(
            bucket_configuration,
            conversion_input.owner_id,
            output_collection,
            output_product_type,
        )
        logger.info(f"Computed SAFE conversion output bucket: {output_bucket}")

        # DPR receives the output directory; EOPF appends the generated product name under it.
        # Build the URI with "/" explicitly. os.path.join uses the OS separator and silently
        # drops earlier parts when a component starts with "/".
        output_zarr_dir_path = "s3://" + "/".join(
            part.strip("/") for part in (output_bucket, conversion_input.owner_id, output_collection)
        )

        # 5. convert to zarr (submitted through safe_conversion_task below)
        # The staged catalog item must expose the SAFE product asset expected by the conversion step.
        input_asset = safe_item.assets.get("product") or next(iter(safe_item.assets.values()), None)
        if input_asset is None:
            raise RuntimeError(f"No SAFE asset found for item {safe_item.id!r}")

        # Derive the SAFE root directory from the asset href. The original rules are tried
        # first; the endswith() branches cover an href that already is the SAFE root, whose
        # trailing "/" was removed by rstrip().
        href = input_asset.href.rstrip("/")
        item_marker = f"/{safe_item.id}/"
        if ".SAFE/" in href:
            input_safe_path = href.split(".SAFE/", 1)[0] + ".SAFE"
        elif href.endswith(".SAFE"):
            input_safe_path = href
        elif item_marker in href:
            input_safe_path = href.split(item_marker, 1)[0] + f"/{safe_item.id}"
        elif href.endswith(f"/{safe_item.id}"):
            input_safe_path = href
        else:
            raise RuntimeError(
                f"Cannot derive SAFE root path from asset href {href!r} and item id {safe_item.id!r}",
            )
        logger.info(f"Using input SAFE path for conversion: {input_safe_path}")

        # NOTE: temporary local workaround kept for reference. Use the original input SAFE
        # location until staging keeps file:local_path.
        # input_safe_path = original_input_safe_path
        # logger.info(f"Using original input SAFE path for conversion: {input_safe_path}")

        payload = {
            "input_safe_path": input_safe_path,
            "output_zarr_dir_path": output_zarr_dir_path,
        }

        # Create cluster info from the JUPYTERHUB_API_TOKEN env var and the Dask cluster label.
        # The token is read only in cluster mode (from the prefect blocks).
        cluster_info = ClusterInfo(
            jupyter_token=os.environ["JUPYTERHUB_API_TOKEN"] if prefect_utils.CLUSTER_MODE else "",
            cluster_label=conversion_input.dask_cluster_label,
            cluster_instance=conversion_input.dask_cluster_instance or "",
        )

        conversion = safe_conversion_task.submit(
            flow_env.serialize(),
            payload,
            cluster_info,
        )
        conversion_result: dict[str, Any] = conversion.result()  # type: ignore[assignment]

        # 6. Read .zattrs to get stac item
        converted_zarr_uri = conversion_result["zarr_uri"]
        converted_item = read_zarr_stac_item(converted_zarr_uri)
        logger.info(f"Staged SAFE item geometry: {safe_item.geometry}")
        logger.info(f"Staged SAFE item bbox: {safe_item.bbox}")
        # The converted item inherits its footprint from the staged SAFE item.
        converted_item.geometry = safe_item.geometry
        converted_item.bbox = safe_item.bbox
        converted_item.properties["product:type"] = output_product_type

        # Keep a reference to the staged SAFE item used as conversion input.
        staged_item = catalog_items.to_dict()["features"][0]
        derived_from_href = next(
            (link["href"] for link in staged_item.get("links", []) if link.get("rel") == "self"),
            original_stac_href or staged_item["id"],
        )
        converted_item.add_link(Link(rel="derived_from", target=derived_from_href, media_type="application/geo+json"))
        logger.info(f"Created STAC item from converted Zarr metadata: {converted_item.to_dict()}")

        # 7. upload to S3
        # 8. post / put to catalog
        processed_item = DprProcessedItemMetadata(
            output_product_id=generated_product.name,
            product_type=output_product_type,
            stac_item=converted_item,
        )
        published = catalog_flow.publish.submit(
            flow_env.serialize(),
            [conversion_input.generated_product_to_collection_identifier],
            [processed_item],
        )
        published.result()  # type: ignore[unused-coroutine]

        # 9. cleanup (legacy files, staging area)
        # Remove the SAFE item from the collection it was staged into and read back from.
        cleanup = cleanup_staged_safe_item_task.submit(flow_env.serialize(), staging_collection, safe_item.id)
        cleanup.result()  # type: ignore[unused-coroutine]

        logger.info("On-demand conversion flow completed successfully.")

read_zarr_stac_item(zarr_uri)

Read the Zarr .zattrs file and build the STAC item generated by EOPF.

Source code in docs/rs-client-libraries/rs_workflows/on_demand_conversion.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def read_zarr_stac_item(zarr_uri: str) -> Item:
    """Build the STAC item that EOPF embedded in a Zarr product's root ``.zattrs``.

    Args:
        zarr_uri: Location of the generated Zarr product (local path or ``s3://`` URI).
            Trailing slashes are ignored.

    Returns:
        The item produced by ``create_stac_item`` from the ``stac_discovery`` metadata.
        It is named after the Zarr product without its ``.zarr`` suffix.

    Raises:
        RuntimeError: If ``stac_discovery`` is absent, not a dict, or lacks ``properties``.
        KeyError: If the URI uses ``s3`` and one of the ``S3_*`` environment variables is unset.
    """
    root_uri = zarr_uri.rstrip("/")
    # EOPF stores the catalog discovery metadata in the product's root .zattrs file.
    zattrs_uri = f"{root_uri}/.zattrs"

    # Object storage needs the workflow's S3 credentials; other locations are opened as-is.
    fs_kwargs = {}
    if urlparse(zattrs_uri).scheme == "s3":
        access_key = os.environ["S3_ACCESSKEY"]
        secret_key = os.environ["S3_SECRETKEY"]
        endpoint = os.environ["S3_ENDPOINT"]
        region = os.environ["S3_REGION"]
        fs_kwargs = {
            "key": access_key,
            "secret": secret_key,
            "client_kwargs": {"endpoint_url": endpoint, "region_name": region},
        }

    with fsspec.open(zattrs_uri, "r", encoding="utf-8", **fs_kwargs) as handle:
        root_attrs = json.load(handle)

    discovery = root_attrs.get("stac_discovery")
    if isinstance(discovery, dict) and "properties" in discovery:
        # Delegate to the shared DPR builder so converted products match other DPR items.
        product_name = os.path.basename(root_uri.removesuffix(".zarr"))
        return create_stac_item(
            eopf_origin_datetime=None,
            eopf_feature=discovery,
            s3_data_location=zattrs_uri,
            product_name=product_name,
            dpr_processor="safe_to_zarr",
        )
    raise RuntimeError(f"Missing 'stac_discovery' metadata in {zattrs_uri}")

resolve_generated_product(output_product_type, generated_products)

Find the generated product mapping for the computed output product type.

Source code in docs/rs-client-libraries/rs_workflows/on_demand_conversion.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def resolve_generated_product(
    output_product_type: str,
    generated_products: list[FlowGeneratedProduct],
) -> FlowGeneratedProduct:
    """Select the generated-product mapping that applies to ``output_product_type``.

    Candidates are entries whose ``name`` equals the computed EOPF product type.
    Among them, the first one whose ``product_type`` also equals it is returned.
    Otherwise the first one with the ``"*"`` wildcard product type is used. That
    entry must then carry a ``collection_name``, following the catalog-publishing
    convention.

    Args:
        output_product_type: EOPF product type computed for the conversion output.
        generated_products: Mappings supplied to the flow.

    Returns:
        The matching generated-product mapping.

    Raises:
        ValueError: If the selected wildcard mapping has no ``collection_name``, or
            if no mapping matches at all.
    """

    def _named_with(product_type: str):
        # Lazily yield entries named after the output type with the given product type.
        return (
            candidate
            for candidate in generated_products
            if candidate.name == output_product_type and candidate.product_type == product_type
        )

    # Strict mapping (exact name and exact type) takes precedence over any wildcard.
    exact_match = next(_named_with(output_product_type), None)
    if exact_match is not None:
        return exact_match

    wildcard_match = next(_named_with("*"), None)
    if not wildcard_match:
        raise ValueError(
            f"No generated product mapping found for output product type {output_product_type!r}: {generated_products}",
        )
    if not wildcard_match.collection_name:
        raise ValueError(
            f"collection_name is mandatory when product_type is '*' for {output_product_type!r}",
        )
    return wildcard_match

safe_conversion_task(env, payload, cluster_info) async

Submit and monitor the SAFE-to-Zarr DPR conversion job.

Source code in docs/rs-client-libraries/rs_workflows/on_demand_conversion.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@task(name="SAFE conversion")
async def safe_conversion_task(
    env: FlowEnvArgs,
    payload: dict,
    cluster_info: ClusterInfo,
) -> dict[str, Any]:
    """Run the DPR SAFE-to-Zarr conversion job and block until it finishes.

    Args:
        env: Serialized flow environment used to rebuild the rs-client.
        payload: Conversion request (input SAFE path and output Zarr directory).
        cluster_info: Dask cluster selection and credentials forwarded to DPR.

    Returns:
        The single result dictionary reported by the finished job.
    """
    task_logger = get_run_logger()
    env_wrapper = FlowEnv(env)
    with env_wrapper.start_span(__name__, "safe-conversion"):
        client: DprClient = env_wrapper.rs_client.get_dpr_client()
        task_logger.info(f"Triggering SAFE conversion with payload: {payload}")
        submitted_job = client.run_conv_safe_zarr(payload, cluster_info)
        job_outcome = client.wait_for_job(submitted_job, task_logger, "SAFE conversion")
        task_logger.info(f"SAFE conversion completed with result: {job_outcome}")
    # wait_for_job is annotated as list[dict], but conv_safe_zarr yields one result dict.
    return cast(dict[str, Any], job_outcome)