Skip to content

rs_workflows/utils/dpr.md

<< Back to index

Helper task to interact with the DPR as a service.

call_dpr_flow(env, input_products, start_datetime, end_datetime, satellite_identifier, dask_cluster_label, processor_name, processor_version, pipeline, unit, priority, processing_mode, workflow, generated_product_to_collection_identifier, auxiliary_product_to_collection_identifier) async

Call any DPR processing flow with a set of default parameters. If an optional parameter is not set, its value is taken from the Prefect Variable named 'prefect_settings'. The payload is stored on an S3 bucket.

Source code in docs/rs-client-libraries/rs_workflows/utils/dpr.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def call_dpr_flow(
    env: FlowEnvArgs,
    input_products: list[FlowInputProduct],
    start_datetime: datetime,
    end_datetime: datetime,
    satellite_identifier: str,
    dask_cluster_label: str,
    processor_name: str,
    processor_version: str,
    pipeline: DprPipeline | None,
    unit: str,
    priority: Priority | None,
    processing_mode: list[ProcessingMode],
    workflow: WorkflowType | None,
    generated_product_to_collection_identifier: list[FlowGeneratedProduct],
    auxiliary_product_to_collection_identifier: list[AuxiliaryProductMapping],
) -> None:
    """
    Call any DPR processing flow with a set of default parameters.

    If an optional parameter (``pipeline``, ``priority``, ``workflow``) is not
    set, its value is resolved downstream from the Prefect Variable named
    'prefect_settings'. The payload file is stored on an S3 bucket under a
    per-owner, timestamped prefix (see ``generate_payload_path``).
    """
    # Unique S3 prefix (owner id + timestamp) under which the payload is written.
    s3_payload: str = generate_payload_path(env.owner_id)

    # BUGFIX: the original code called DprPipeline(pipeline), Priority(priority)
    # and WorkflowType(workflow) unconditionally. An enum lookup with None
    # raises ValueError, even though the signature explicitly allows None
    # (defaults are resolved later from 'prefect_settings'). Guard the enum
    # coercions so None is passed through unchanged.
    a_process: DprProcessIn = DprProcessIn(
        env=env,
        processor_name=DprProcessor(processor_name),
        processor_version=processor_version,
        dask_cluster_label=dask_cluster_label,
        s3_payload_file=f"{s3_payload}/payload_{processor_name}.yaml",
        pipeline=DprPipeline(pipeline) if pipeline is not None else None,
        unit=unit,
        priority=Priority(priority) if priority is not None else None,
        workflow_type=WorkflowType(workflow) if workflow is not None else None,
        input_products=input_products,
        generated_product_to_collection_identifier=generated_product_to_collection_identifier,
        auxiliary_product_to_collection_identifier=auxiliary_product_to_collection_identifier,
        processing_mode=processing_mode,
        start_datetime=start_datetime,
        end_datetime=end_datetime,
        satellite=satellite_identifier,
    )

    # Dump the fully-resolved request for traceability, then run the task.
    print(a_process.model_dump_json(indent=2))
    await dpr_processing_task(a_process)

dpr_processing_task(*args, **kwargs) async

See: dpr_processing

Source code in docs/rs-client-libraries/rs_workflows/utils/dpr.py
 97
 98
 99
100
@task(name="dpr processing")
async def dpr_processing_task(*args, **kwargs) -> tuple[bool, ItemCollection | None]:
    """Prefect task wrapper around ``dpr_processing``.

    Delegates to the underlying function (``.fn``) so the call executes
    inside this task's context. See: dpr_processing
    """
    outcome = await dpr_processing.fn(*args, **kwargs)
    return outcome

generate_payload_path(owner_id)

Generate a hard-coded path to store the payload. This is a workaround while waiting for a shared-disk solution.

Source code in docs/rs-client-libraries/rs_workflows/utils/dpr.py
40
41
42
43
44
45
46
47
def generate_payload_path(owner_id: str) -> str:
    """Build the hard-coded S3 prefix under which the payload is stored.

    The prefix is namespaced by owner id and the current local timestamp.
    This is a workaround while waiting for a shared-disk solution.
    """
    # TODO : use a local path on the share disk
    timestamp = time.strftime('%Y-%m-%d--%H-%M-%S')
    return f"s3://prip-rs-playground/{owner_id}/{timestamp}"