Skip to content

rs_workflows/dpr_flow.md

<< Back to index

DPR flow implementation

run_processor(env, processor, cluster_info, s3_payload_run) async

Run the DPR processor.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
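processor DprProcessor

DPR processor name

required
cluster_info ClusterInfo

Cluster information passed to the DPR service when triggering the processor run.

required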
s3_payload_run str

S3 bucket location of the output final DPR payload file.

required
Source code in docs/rs-client-libraries/rs_workflows/dpr_flow.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@task(name="Run DPR processor")
async def run_processor(
    env: FlowEnvArgs,
    processor: DprProcessor,
    # payload: dict,
    cluster_info: ClusterInfo,
    s3_payload_run: str,
) -> list[dict]:
    """
    Run the DPR processor.

    Triggers the processor run through the DPR client, waits for the
    resulting job and returns what the client reports for it.

    Args:
        env: Prefect flow environment
        processor: DPR processor name
        cluster_info: Cluster information passed to the DPR client when
            triggering the processor run.
        s3_payload_run: S3 bucket location of the output final DPR payload file.
            Its directory part is used as both the configuration directory and
            the report directory; its file name is used as the payload subpath.

    Returns:
        The value returned by ``DprClient.wait_for_job`` for the triggered job.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "run-processor"):
        # Performance indicator recording is currently disabled:
        # record_performance_indicators(
        #     start_date=datetime.datetime.now(),
        #     status="OK",
        #     dpr_processing_input_stac_items=s3_payload_run,
        #     payload=payload,
        #     dpr_processor_name=processor.value,
        # )
        dpr_client: DprClient = flow_env.rs_client.get_dpr_client()

        # The directory holding the payload file is used for both the
        # configuration and the report location, so compute it once.
        s3_payload_dir = osp.dirname(s3_payload_run)

        # Trigger the processor run from the dpr service
        # NOTE(review): run_process and wait_for_job are called without `await`
        # inside this coroutine; if they block, the event loop is blocked for
        # the duration of the job - confirm this is intended.
        job_status = dpr_client.run_process(
            process=processor,
            cluster_info=cluster_info,
            s3_config_dir=s3_payload_dir,
            payload_subpath=osp.basename(s3_payload_run),
            s3_report_dir=s3_payload_dir,
        )

        # Wait for the job to finish
        dpr_job = dpr_client.wait_for_job(job_status, logger, f"{processor.value!r} processor")
        logger.info(f"DPR processor output {dpr_job}")
        # record_performance_indicators(stop_date=datetime.datetime.now(), status="OK", stac_items=dpr_job)
        return dpr_job