Skip to content

rs_workflows/utils/dpr.md

<< Back to index

Helper task to interact with the DPR as a service.

call_dpr_flow(env, input_products, external_variables, dask_cluster_label, processor_name, processor_version, pipeline, unit, priority, processing_mode, workflow, generated_product_to_collection_identifier, auxiliary_product_to_collection_identifier, logging_level=LoggingLevel.INFO, dask_task_timeout=None, temporary_folder=None, temporary_shared=False) async

Call any DPR processing flow with a set of default parameters. When an optional parameter is not set, its value is taken from the Prefect Variable named 'prefect_settings'. The payload is stored in an S3 bucket.

Source code in docs/rs-client-libraries/rs_workflows/utils/dpr.py
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
async def call_dpr_flow(
    env: FlowEnvArgs,
    input_products: list[FlowInputProduct],
    external_variables: dict[str, Any],
    dask_cluster_label: str,
    processor_name: str,
    processor_version: str,
    pipeline: DprPipeline | str | None,
    unit: str | None,
    priority: Priority | None,
    processing_mode: list[ProcessingMode],
    workflow: WorkflowType | None,
    generated_product_to_collection_identifier: list[FlowGeneratedProduct],
    auxiliary_product_to_collection_identifier: list[AuxiliaryProductMapping],
    logging_level: LoggingLevel = LoggingLevel.INFO,
    dask_task_timeout: int | None = None,
    temporary_folder: str | None = None,
    temporary_shared: bool = False,
) -> None:
    """
    Call any DPR processing flow with a set of default parameters.

    Optional parameters left unset (``None``) are forwarded unchanged. Their values are
    expected to be taken from the Prefect Variable named 'prefect_settings'; that lookup
    is not performed in this function.
    The payload file path is built under the S3 location returned by
    ``generate_payload_path(env.owner_id)``.

    Args:
        env: Flow environment; ``env.owner_id`` is used to build the payload location.
        input_products: Input products forwarded to the DPR process description.
        external_variables: Extra keyword arguments spread into ``DprProcessIn``.
        dask_cluster_label: Label of the Dask cluster, forwarded as-is.
        processor_name: Processor name; converted with ``DprProcessor(...)`` and also
            used in the payload file name.
        processor_version: Processor version, forwarded as-is.
        pipeline: Pipeline to run. A value known to ``DprPipeline`` is converted to the
            enum member; any other value (including ``None``) is forwarded unchanged.
        unit: Processing unit, forwarded as-is.
        priority: Priority, or ``None`` when not set.
        processing_mode: Processing modes, forwarded as-is.
        workflow: Workflow type, or ``None`` when not set.
        generated_product_to_collection_identifier: Forwarded as-is.
        auxiliary_product_to_collection_identifier: Forwarded as-is.
        logging_level: Logging level, forwarded as-is.
        dask_task_timeout: Forwarded as-is.
        temporary_folder: Forwarded as-is.
        temporary_shared: Forwarded as-is.

    Raises:
        ValueError: presumably, when ``processor_name``, a non-None ``priority`` or a
            non-None ``workflow`` is not a valid value of its enum (assumes standard
            ``Enum`` lookup semantics - TODO confirm).
    """
    s3_payload: str = generate_payload_path(env.owner_id)

    # Promote a known pipeline value to its enum member; keep custom names and None untouched.
    resolved_pipeline = (
        DprPipeline(pipeline) if pipeline in DprPipeline._value2member_map_ else pipeline  # pylint: disable=W0212
    )

    a_process: DprProcessIn = DprProcessIn(
        env=env,
        processor_name=DprProcessor(processor_name),
        processor_version=processor_version,
        dask_cluster_label=dask_cluster_label,
        s3_payload_file=f"{s3_payload}/payload_{processor_name}.yaml",
        pipeline=resolved_pipeline,
        unit=unit,
        # The signature allows None for these two parameters: forward None unchanged instead of
        # passing it to the enum constructor, which would reject it with standard Enum lookup.
        priority=Priority(priority) if priority is not None else None,
        workflow_type=WorkflowType(workflow) if workflow is not None else None,
        input_products=input_products,
        generated_product_to_collection_identifier=generated_product_to_collection_identifier,
        auxiliary_product_to_collection_identifier=auxiliary_product_to_collection_identifier,
        logging_level=logging_level,
        dask_task_timeout=dask_task_timeout,
        temporary_folder=temporary_folder,
        temporary_shared=temporary_shared,
        processing_mode=processing_mode,
        **external_variables,
    )

    # Dump the full process description before submitting it.
    print(a_process.model_dump_json(indent=2))
    await dpr_processing_task(a_process)

dpr_processing_task(*args, **kwargs) async

See: dpr_processing

Source code in docs/rs-client-libraries/rs_workflows/utils/dpr.py
104
105
106
107
@task(name="dpr processing")
async def dpr_processing_task(*args, **kwargs) -> tuple[bool, ItemCollection | None]:
    """
    See: dpr_processing

    Task wrapper that awaits ``dpr_processing.fn`` with the given positional and
    keyword arguments and returns its result unchanged.
    """
    outcome = await dpr_processing.fn(*args, **kwargs)
    return outcome

generate_payload_path(owner_id)

Generate a hard-coded path in which to store the payload. This is a workaround until a shared-disk solution is available.

Source code in docs/rs-client-libraries/rs_workflows/utils/dpr.py
41
42
43
44
45
46
47
48
def generate_payload_path(owner_id: str) -> str:
    """
    Build the hard-coded S3 location in which the payload is stored.

    The path has the form ``s3://prip-rs-playground/<owner_id>/<timestamp>``, where the
    timestamp is the current local time formatted as ``%Y-%m-%d--%H-%M-%S``.
    This is a workaround until a shared-disk solution is available.
    """
    # TODO : use a local path on the share disk
    timestamp = time.strftime('%Y-%m-%d--%H-%M-%S')
    return f"s3://prip-rs-playground/{owner_id}/{timestamp}"