Skip to content

rs_workflows/on_demand_processing.md

<< Back to index

Prefect flows and tasks for on-demand processing

build_dask_dashboard_url_message(cluster_instance)

Build the Dask dashboard log message from the configured public gateway endpoint.

Source code in docs/rs-client-libraries/rs_workflows/on_demand_processing.py
53
54
55
56
57
58
59
60
61
def build_dask_dashboard_url_message(cluster_instance: str | None) -> str:
    """Build the Dask dashboard log message from the configured public gateway endpoint."""
    public_base = os.getenv("DASK_GATEWAY_PUBLIC", "")

    if not public_base or not cluster_instance:
        return "Dask cluster dashboard URL is unavailable"

    dashboard_url = f"{public_base.rstrip('/')}/clusters/{cluster_instance}/status"
    return f"Dask cluster dashboard URL: {dashboard_url}"

dpr_processing(dpr_input, retry_config=RetryConfig()) async

Prefect flow for dpr-process.

Parameters:

Name Type Description Default
dpr_input DprProcessIn

Input parameters for executing this flow

required
retry_config RetryConfig

Staging retry config

RetryConfig()
Source code in docs/rs-client-libraries/rs_workflows/on_demand_processing.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
@flow(name="process-generic")
async def dpr_processing(
    dpr_input: DprProcessIn,
    retry_config: RetryConfig = RetryConfig(),  # type: ignore
):
    """
    Prefect flow for dpr-process.

    The flow reads the processor task table, submits one ADFS staging task per
    required input, generates the DPR payload, uploads it to S3, runs the
    processor and finally publishes the processed items to the catalog.

    Args:
        dpr_input: Input parameters for executing this flow
        retry_config: Staging retry config

    Raises:
        ValueError: If a staged ADFS item is reported with an unsuccessful status.
    """
    logger = get_run_logger()
    logger.info(f"Starting the DPR processing flow with processor: {dpr_input.processor_name}")
    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(dpr_input.env)

    with flow_env.start_span(__name__, "dpr-processing"):

        # Create cluster info from JUPYTERHUB_API_TOKEN env var (only in cluster mode, read from the
        # prefect blocks) and Dask cluster label.
        cluster_info = ClusterInfo(
            jupyter_token=os.environ["JUPYTERHUB_API_TOKEN"] if prefect_utils.CLUSTER_MODE else "",
            cluster_label=dpr_input.dask_cluster_label,
            cluster_instance=dpr_input.dask_cluster_instance or "",
        )

        # read tasktable and construct list of processing units
        task_table: dict[str, Any] = flow_env.rs_client.get_dpr_client().get_process(
            dpr_input.processor_name,
            cluster_info,
        )

        # Persist the full task table as a Prefect artifact for later investigation.
        md = "# Task table\n\n```json\n" + json.dumps(task_table, indent=2) + "\n```"
        artifact_key_name: str = "dpr-task-table"
        await acreate_markdown_artifact(key=artifact_key_name, markdown=md, description="DPR task table")
        logger.info(f"📌 Artifact named '{artifact_key_name}' has been linked to this flow.")
        # Log the public Dask dashboard URL when the flow input provides the cluster instance.
        logger.info(build_dask_dashboard_url_message(cluster_info.cluster_instance))

        processing_mode = list(dpr_input.processing_mode) if dpr_input.processing_mode else None
        unit_list = build_unit_list(
            tasktable=task_table,
            pipeline=dpr_input.pipeline,
            unit=dpr_input.unit,
            processing_mode=processing_mode,
            external_variables={
                "start_datetime": dpr_input.start_datetime,
                "end_datetime": dpr_input.end_datetime,
                "reference_date": dpr_input.reference_date,
                "instrument_mode": dpr_input.instrument_mode,
                "satellite": dpr_input.satellite,
            },
        )

        # Submit one staging task per (unit, input_adfs, specific input product) combination.
        tasks = []
        for unit in unit_list:
            # For each input_adfs element computed on STEP 1
            for input_adfs in unit["input_adfs"]:
                # For each specific input in case of multiplicity=one_per_input
                specific_input_name, product_stac_items = _resolve_specific_input_product_stac_items(
                    input_adfs,
                    task_table,
                    unit,
                    dpr_input.input_products,
                    flow_env.rs_client,
                )
                for specific_input_product_stac_item in product_stac_items:
                    if specific_input_product_stac_item:
                        logger.info(
                            f"Submitting {input_adfs['name']} ADFS task for input {specific_input_product_stac_item}",
                        )
                    tasks.append(
                        process_input_adfs.submit(
                            input_adfs,
                            dpr_input,
                            task_table,
                            (specific_input_name, specific_input_product_stac_item),
                            retry_config.staging_retries,
                            retry_config.staging_retry_delay,
                        ),
                    )

        # Wait for every staging task; .result() unwraps and propagates task exceptions as-is.
        aux_items: list[tuple[str, str, tuple[bool, ItemCollection]]] = [t.result() for t in tasks]

        # Set of ADFS. Each tuple includes the adfs name, type and the s3/https storage path
        source_items: list[Item] = []
        adfs: set[tuple[str, str, str]] = set()
        for name, adf_type, (status, item_collection) in aux_items:
            for item in item_collection.items:
                # list with links to be added in derived_from
                source_items.append(item)

                if status:
                    # Only the first asset of the item is used as the ADFS location.
                    asset = next(iter(item.assets.values()))
                    logger.info(f"ADFS '{name}' of type '{adf_type}': {asset.href}")
                    adfs.add((name, adf_type, asset.href))
                else:
                    raise ValueError(f"The adf input files {next(iter(item.assets.values()))} was not correctly staged")
        # generate the dpr payload file
        task_future = generate_payload.submit(flow_env, unit_list, list(adfs), dpr_input)
        # get the payload generation result
        generated_payload_res = task_future.result()
        # create the generated payload as a dictionary, as it will be used for
        # the prefect artifact. the SecretStr will be masked here
        generated_payload_res_as_dict = generated_payload_res.dump()
        # create the YAML string first (synchronous). This will be used for writing both the artifact as well
        # as the tmp file
        yaml_str = yaml.dump(generated_payload_res_as_dict, default_flow_style=False, sort_keys=False)
        # Write the payload as prefect artifact
        pretty_markdown = f"```yaml\n{yaml_str}\n```"
        artifact_key_name = "dpr-payload"
        await acreate_markdown_artifact(
            key=artifact_key_name,
            markdown=pretty_markdown,
            description="DPR Payload file",
        )
        logger.info(f"📌 Artifact named '{artifact_key_name}' has been linked to this flow.")

        # re-create the generated payload as a dictionary, as it will be used for
        # the payload file to upload to S3. here, the secrets are revealed
        generated_payload_res_with_secrets = generated_payload_res.dump(reveal_secrets=True)
        yaml_str = yaml.dump(generated_payload_res_with_secrets, default_flow_style=False, sort_keys=False)
        # upload the config payload file to S3
        tmp_dir = std_tempfile.gettempdir()
        tmp_file_path = os.path.join(tmp_dir, f"dpr_payload_{datetime.datetime.now().timestamp()}.yaml")
        try:
            async with await anyio.open_file(tmp_file_path, "w", encoding="utf-8") as tmp_file:
                await tmp_file.write(yaml_str)
                # flush to be extra-safe
                await tmp_file.flush()
            logger.debug(f"Writing the payload to file :\n {dpr_input.s3_payload_file}")
            await prefect_utils.s3_upload_file(tmp_file_path, dpr_input.s3_payload_file)
        finally:
            # The temp file holds revealed secrets: always remove it, even when the write or the
            # upload fails. missing_ok avoids masking the original error if the file was never created.
            await anyio.Path(tmp_file_path).unlink(missing_ok=True)

        # Run the DPR processor
        processed_items = run_processor.submit(
            flow_env.serialize(),
            dpr_input.processor_name,
            generated_payload_res,
            cluster_info,
            dpr_input.s3_payload_file,
            dpr_input.input_products,
            wait_for=[task_future],
        )
        try:
            processed = processed_items.result()
        finally:
            # Remove the uploaded payload from S3 whether or not the processor succeeded.
            prefect_utils.s3_delete(dpr_input.s3_payload_file)

        # add derived_from link
        logger.debug(f"processed_items: {processed}")

        for processed_item in processed:
            processed_item.stac_item.add_derived_from(*source_items)

        # Publish processed items to the catalog
        # NOTE(review): the future `processed_items` is passed here, not the `processed` list mutated
        # above - confirm that the derived_from links actually reach the published items.
        published = catalog_flow.publish.submit(
            flow_env.serialize(),
            dpr_input.generated_product_to_collection_identifier,
            processed_items,
        )

        # Wait for last task to end.
        # NOTE: use .result() and not .wait() to unwrap and propagate exceptions, if any.
        published.result()  # type: ignore[unused-coroutine]

process_input_adfs(input_adfs, dpr_input, task_table, specific_input_product=(None, None), staging_retries=3, staging_retry_delay=60) async

Stage the ADFS inputs described in the task table for one processing unit input.

The task iterates through the ordered alternatives defined for a given ADFS input and stops at the first alternative that produces staged items.

For each alternative, the task:

- builds the final AUX CQL2 request from the task table definition
- resolves the target AUX collection identifier
- runs AUX staging with retries
- normalizes archived outputs when the staged assets still point to compressed archives
- updates the catalog entries after normalization so downstream payload generation sees the final asset hrefs

Returns:

Type Description
tuple[str, str, tuple[bool, ItemCollection]]

tuple[str, str, tuple[bool, ItemCollection]]: The input ADFS name and type together with the original staging status/item collection tuple shape expected by downstream code.

Raises:

Type Description
RuntimeError

If no alternative returns staged data, or if the task table content cannot be read as expected.

Source code in docs/rs-client-libraries/rs_workflows/on_demand_processing.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
@task(name="Process input ADFS")
async def process_input_adfs(
    input_adfs: dict[str, Any],
    dpr_input: DprProcessIn,
    task_table: dict[str, Any],
    specific_input_product: tuple[str | None, Item | None] = (None, None),
    staging_retries: int = 3,
    staging_retry_delay: int = 60,
) -> tuple[str, str, tuple[bool, ItemCollection]]:
    """
    Stage one ADFS input of a processing unit, trying its alternatives in turn.

    The alternatives listed under ``input_adfs["alternatives"]`` are tried in
    the order they appear; the first one for which the staging helper returns
    a result wins and its result is returned unchanged.

    Args:
        input_adfs: ADFS input description taken from the task table.
        dpr_input: Flow input parameters, forwarded to the staging helper.
        task_table: Full task table, forwarded to the staging helper.
        specific_input_product: Optional (input name, STAC item) pair, forwarded
            to the staging helper.
        staging_retries: Retry count forwarded to the staging helper.
        staging_retry_delay: Retry delay forwarded to the staging helper.

    Returns:
        tuple[str, str, tuple[bool, ItemCollection]]:
            The first non-None result produced by the staging helper.

    Raises:
        RuntimeError:
            If no alternative returns a result, or if a KeyError is raised
            while reading the task table content.
    """
    run_logger = get_run_logger()
    run_logger.info(f"🚧 Starting processing input ADFS for {input_adfs}")
    try:
        candidates = input_adfs.get("alternatives", [])
        for candidate in candidates:
            staged = await _stage_input_adfs_alternative(
                candidate,
                input_adfs,
                dpr_input,
                task_table,
                specific_input_product,
                staging_retries,
                staging_retry_delay,
            )
            # A None outcome means this alternative yielded nothing: move on to the next one.
            if staged is None:
                continue
            return staged

        # Every alternative was exhausted without a usable result.
        raise RuntimeError(f"Searching for adfs input {input_adfs['name']} did not return any result")

    except KeyError as kerr:
        # Any missing key (here or in the helper) is reported as a task table problem.
        raise RuntimeError(
            f"Unable to read / process tasktable and build cql2-json for: {json.dumps(input_adfs)}",
        ) from kerr