158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def _stac_input_reference(stac_input: str | dict[str, Any]) -> tuple[str, str]:
    """Return the ``(item_id, href)`` pair identifying the flow's STAC input.

    ``stac_input`` is either a string, used as the item href with its last path
    segment taken as the item id, or a FeatureCollection-like dict whose first
    feature is the item. For the dict form the href is the feature's
    ``rel="self"`` link, falling back to the feature id when there is none.
    """
    if isinstance(stac_input, str):
        # rstrip so a trailing slash does not produce an empty item id.
        return stac_input.rstrip("/").split("/")[-1], stac_input
    feature = stac_input["features"][0]
    self_href = next(
        (link["href"] for link in feature.get("links", []) if link.get("rel") == "self"),
        feature["id"],
    )
    return feature["id"], self_href


def _safe_root_path(href: str, item_id: str) -> str:
    """Derive the SAFE product root directory from one of the item's asset hrefs.

    The root is the path up to and including the first ``.SAFE`` component or,
    failing that, up to and including the ``/<item_id>`` component.

    Raises:
        RuntimeError: if neither a ``.SAFE`` component nor the item id can be
            located in ``href``.
    """
    href = href.rstrip("/")
    if ".SAFE/" in href:
        # Asset points inside the SAFE directory: cut everything below it.
        return href.split(".SAFE/", 1)[0] + ".SAFE"
    if href.endswith(".SAFE"):
        # Asset points at the SAFE directory itself. Any trailing slash was
        # stripped above, so the ".SAFE/" test cannot match this case.
        return href
    item_dir = f"/{item_id}"
    if f"{item_dir}/" in href:
        return href.split(f"{item_dir}/", 1)[0] + item_dir
    if href.endswith(item_dir):
        return href
    raise RuntimeError(
        f"Cannot derive SAFE root path from asset href {href!r} and item id {item_id!r}",
    )


def _join_s3_uri(bucket: str, *parts: str) -> str:
    """Build ``s3://<bucket>/<part>/...`` from its components.

    Plain "/" joining is used instead of ``os.path.join``. The latter is
    platform dependent and drops every earlier component, including the
    ``s3://`` scheme, when a later part starts with "/". Surrounding slashes
    are trimmed and empty components skipped, so no "//" segments appear.
    """
    segments = (segment.strip("/") for segment in (bucket.removeprefix("s3://"), *parts))
    return "s3://" + "/".join(segment for segment in segments if segment)


@flow
async def on_demand_conversion(
    conversion_input: ConversionIn,
    retry_config: RetryConfig = RetryConfig(),  # type: ignore
):
    """Convert a legacy SAFE product to Zarr and publish the result to the catalog.

    Steps:
      1. Stage the input STAC item (selected assets, default ``{"product"}``) into
         the collection named by
         ``conversion_input.generated_product_to_collection_identifier``.
      2. Unzip any archived SAFE asset and update the staged catalog item.
      3. Map the legacy ``product:type`` to the output product type.
      4. Resolve the output collection and S3 bucket from the OSAM configuration.
      5. Run the SAFE -> Zarr conversion task.
      6. Build the output STAC item from the converted Zarr metadata.
      7-8. Publish the item through ``catalog_flow.publish``.
      9. Clean up the staged SAFE item.

    Args:
        conversion_input: Flow input (environment, STAC input, target collection
            mapping, owner id, selected assets, Dask cluster settings).
        retry_config: Retry count and delay applied to the staging task.

    Raises:
        ValueError: if no collection name is provided for staging.
        RuntimeError: if a staging job fails, the staged item cannot be found,
            unzipping fails, the product type cannot be mapped, or no SAFE
            input path can be derived.
    """
    logger = get_run_logger()
    logger.info(f"Starting on-demand conversion flow with input: {conversion_input}")
    flow_env = FlowEnv(conversion_input.env)
    staging_collection = conversion_input.generated_product_to_collection_identifier.collection_name
    if staging_collection is None:
        raise ValueError("collection_name is required to stage and retrieve the SAFE item")

    with flow_env.start_span(__name__, "legacy-conversion"):
        # 1. stage
        # stac_input is either an item href (str) or a FeatureCollection-like dict.
        stac_item_id, original_stac_href = _stac_input_reference(conversion_input.stac_input)
        staging_future = staging_task.with_options(
            retries=retry_config.staging_retries,
            retry_delay_seconds=retry_config.staging_retry_delay,
        ).submit(
            flow_env.serialize(),
            stac_input=conversion_input.stac_input,
            catalog_collection_identifier=staging_collection,
            asset_names=conversion_input.selected_assets or {"product"},
            poll_interval=10,
        )
        logger.info("Staging task submitted, waiting for completion...")
        staging_results = staging_future.result()  # type: ignore[unused-coroutine]
        for job_name, job_result in staging_results.items():
            if job_result.get("status") != "successful":
                raise RuntimeError(
                    f"Staging job {job_name!r} failed with status {job_result.get('status')!r}: "
                    f"{job_result.get('message')}",
                )
        logger.info("Staging task completed, proceeding with conversion...")

        catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()
        catalog_items = ItemCollection(
            catalog_client.get_items(
                collection_id=staging_collection,
                items_ids=[stac_item_id],
            ),
        )
        logger.info(f"Retrieved catalog items after staging: {catalog_items.to_dict()}")
        if not catalog_items.items:
            raise RuntimeError(
                f"Staged item {stac_item_id!r} not found in collection {staging_collection!r}",
            )
        # Start from the staged catalog item; if it contains an archived SAFE asset,
        # step 2 replaces it with the uncompressed item.
        safe_item = catalog_items.items[0]

        # 2. Prepare assets for conversion (e.g. unzip if needed)
        try:
            for idx in get_archived_item_indexes(catalog_items):
                safe_zipped_item = catalog_items.items[idx]
                logger.info(f"Processing item {safe_zipped_item.id} for asset extraction...")
                safe_unzipped_item = asset_unzip_decompress_task.submit(safe_zipped_item, True)
                safe_item = safe_unzipped_item.result()  # type: ignore[assignment]
                # Remove the "product" asset (presumably the original archive; confirm)
                # before writing the extracted item back to the catalog.
                safe_item.assets.pop("product", None)
                catalog_client.update_item(safe_item)
        except Exception as err:
            raise RuntimeError(
                "Error while trying to update the item collection with the uncompressed/unzipped items. "
                "This error is likely due to a failure in the asset_unzip_decompress_task. "
                "Check previous logs for more details.",
            ) from err
        logger.info(f"Asset preparation completed, proceeding with conversion... {safe_item.to_dict()}")

        # 3. compute the output product type from the product type mapping
        legacy_product_type = safe_item.properties.get("product:type")  # ex: IW_SLC__1S
        if legacy_product_type is None:
            raise RuntimeError(f"Staged item {safe_item.id!r} has no 'product:type' property")
        logger.info(f"Legacy SAFE product type used for mapping: {legacy_product_type}")
        mapping = find_product_type(legacy_product_type)
        # Look the key up defensively so an absent mapping reaches the explicit
        # error below instead of surfacing as a bare KeyError/TypeError.
        try:
            output_product_type = mapping["productType"]  # ex: S01SIWSLC
        except (KeyError, TypeError):
            output_product_type = None
        if not output_product_type:
            raise RuntimeError(f"No product type mapping found for legacy product type {legacy_product_type!r}")
        logger.info(f"Resolved SAFE conversion output product type: {output_product_type}")

        # 4. compute the output bucket from the provided generated_product_to_collection_identifier mapping.
        # Match the computed output product type with the flow input mapping; this
        # gives the output collection requested by the caller.
        generated_product = resolve_generated_product(
            output_product_type,
            [conversion_input.generated_product_to_collection_identifier],
        )
        output_collection = generated_product.collection_name or generated_product.product_type
        # Read the owner/collection/product-type -> S3 bucket rules from OSAM.
        bucket_configuration = fetch_csv_from_endpoint(os.environ["RSPY_HOST_OSAM"] + "/internal/configuration")
        # Resolve the final S3 bucket using the same rules as generic DPR processing.
        output_bucket = find_s3_output_bucket(
            bucket_configuration,
            conversion_input.owner_id,
            output_collection,
            output_product_type,
        )
        logger.info(f"Computed SAFE conversion output bucket: {output_bucket}")
        # DPR receives the output directory; EOPF appends the generated product name under it.
        output_zarr_dir_path = _join_s3_uri(output_bucket, conversion_input.owner_id, output_collection)

        # 5. convert to zarr
        # The staged catalog item must expose the SAFE product asset expected by the conversion step.
        input_asset = safe_item.assets.get("product") or next(iter(safe_item.assets.values()), None)
        if input_asset is None:
            raise RuntimeError(f"No SAFE asset found for item {safe_item.id!r}")
        input_safe_path = _safe_root_path(input_asset.href, safe_item.id)
        logger.info(f"Using input SAFE path for conversion: {input_safe_path}")
        # TODO: a temporary local workaround used the original input SAFE location
        # until staging keeps file:local_path; it is currently disabled.
        payload = {
            "input_safe_path": input_safe_path,
            "output_zarr_dir_path": output_zarr_dir_path,
        }
        # The Jupyter token comes from JUPYTERHUB_API_TOKEN only in cluster mode;
        # the Dask cluster label/instance come from the flow input.
        cluster_info = ClusterInfo(
            jupyter_token=os.environ["JUPYTERHUB_API_TOKEN"] if prefect_utils.CLUSTER_MODE else "",
            cluster_label=conversion_input.dask_cluster_label,
            cluster_instance=conversion_input.dask_cluster_instance or "",
        )
        conversion = safe_conversion_task.submit(
            flow_env.serialize(),
            payload,
            cluster_info,
        )
        conversion_result: dict[str, Any] = conversion.result()  # type: ignore[assignment]

        # 6. Read the Zarr metadata to get the STAC item
        converted_zarr_uri = conversion_result["zarr_uri"]
        converted_item = read_zarr_stac_item(converted_zarr_uri)
        logger.info(f"Staged SAFE item geometry: {safe_item.geometry}")
        logger.info(f"Staged SAFE item bbox: {safe_item.bbox}")
        converted_item.geometry = safe_item.geometry
        converted_item.bbox = safe_item.bbox
        converted_item.properties["product:type"] = output_product_type
        # Link back to the staged item used as conversion input: prefer its self
        # link, then the original input href, then its id.
        staged_item = catalog_items.to_dict()["features"][0]
        derived_from_href = next(
            (link["href"] for link in staged_item.get("links", []) if link.get("rel") == "self"),
            original_stac_href or staged_item["id"],
        )
        converted_item.add_link(Link(rel="derived_from", target=derived_from_href, media_type="application/geo+json"))
        logger.info(f"Created STAC item from converted Zarr metadata: {converted_item.to_dict()}")

        # 7-8. publish the converted item to the catalog
        processed_item = DprProcessedItemMetadata(
            output_product_id=generated_product.name,
            product_type=output_product_type,
            stac_item=converted_item,
        )
        published = catalog_flow.publish.submit(
            flow_env.serialize(),
            [conversion_input.generated_product_to_collection_identifier],
            [processed_item],
        )
        published.result()  # type: ignore[unused-coroutine]

        # 9. cleanup (legacy files, staging area)
        # The SAFE item was staged into, read from and updated in staging_collection,
        # so clean it up there rather than in the (possibly different) output collection.
        cleanup = cleanup_staged_safe_item_task.submit(flow_env.serialize(), staging_collection, safe_item.id)
        cleanup.result()  # type: ignore[unused-coroutine]
        logger.info("On-demand conversion flow completed successfully.")
|