Skip to content

rs_workflows/on_demand/common/l0_last_steps.md

<< Back to index

common Level-0 processing.

process_l0_last_steps(mission, session, flow_params, input_products, verbose) async

Final processing steps that are common to all missions. Raises: ValueError: description

Source code in docs/rs-client-libraries/rs_workflows/on_demand/common/l0_last_steps.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
async def process_l0_last_steps(
    mission: str,
    session: str,
    flow_params: Level0FlowParams,
    input_products: list[FlowInputProduct],
    verbose: bool,
):
    """
    Final processing steps that are common to all missions.
    Raises:
        ValueError: _description_
    """
    logger = get_run_logger()
    logger.info(f"Mode verbose is set to {verbose}")

    # Resolve parameters
    flow_params = flow_params or Level0FlowParams()
    p = await flow_params.resolve(mission)

    flow_env = FlowEnv(FlowEnvArgs(owner_id=p.owner_identifier))

    with flow_env.start_span(__name__, f"sentinel{mission}-level0-processing"):
        item_session: Item | None = await get_single_catalog_item(flow_env, session, [p.session_collection])

        if not item_session:
            logger.error("❌ The processing cannot be launched.")
            return

        # Satellite identifier
        satellite_identifier = f"sentinel-{mission}{session[2].lower()}"

        # Published date
        published = item_session.properties.get("published")
        if not isinstance(published, str):
            raise ValueError("Missing or invalid 'published' property in item_session")

        end_datetime = datetime.fromisoformat(published)
        start_datetime = end_datetime

        # Call DPR flow
        await call_dpr_flow(
            FlowEnvArgs(owner_id=p.owner_identifier),
            input_products=input_products,
            external_variables={
                "start_datetime": start_datetime,
                "end_datetime": end_datetime,
                "satellite_identifier": satellite_identifier,
            },
            dask_cluster_label=p.dask_cluster_label,
            processor_name=p.processor_name,
            processor_version=p.processor_version,
            pipeline=p.pipeline,
            unit=p.unit,
            priority=p.priority,
            processing_mode=p.processing_mode,
            workflow=p.workflow,
            generated_product_to_collection_identifier=p.generated_product_to_collection_identifier or [],
            auxiliary_product_to_collection_identifier=p.auxiliary_product_to_collection_identifier or [],
            logging_level=p.logging_level,
        )