Skip to content

rs_workflows/on_demand/common/l0_last_steps.md

<< Back to index

Common Level-0 processing.

process_l0_last_steps(mission, session, flow_params, input_products, verbose) async

Final processing steps that are common to all missions. Raises: ValueError: if the session item's 'published' property is missing or is not a string.

Source code in docs/rs-client-libraries/rs_workflows/on_demand/common/l0_last_steps.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
async def process_l0_last_steps(
    mission: str,
    session: str,
    flow_params: Level0FlowParams,
    input_products: list[FlowInputProduct],
    verbose: bool,
):
    """
    Run the final Level-0 processing steps shared by every mission.

    Resolves the flow parameters, looks the session up in the catalog, uses the
    session item's ``published`` date as both ends of the processing time range
    and hands everything over to the DPR flow.

    Args:
        mission: Mission identifier; interpolated into the span name and the
            satellite identifier.
        session: Session identifier used for the catalog lookup; its first three
            characters (lower-cased) are appended to the satellite identifier.
        flow_params: Level-0 flow parameters; a default ``Level0FlowParams()``
            is used when this value is falsy.
        input_products: Input products forwarded unchanged to the DPR flow.
        verbose: Verbosity flag; within this function it is only written to the
            run log.

    Returns:
        None. When no catalog item is found for the session, an error is logged
        and the function returns without calling the DPR flow.

    Raises:
        ValueError: If the session item's ``published`` property is missing or
            is not a string.
    """
    run_logger = get_run_logger()
    run_logger.info(f"Mode verbose is set to {verbose}")

    # Fall back to default parameters, then resolve them for this mission at level 0
    if not flow_params:
        flow_params = Level0FlowParams()
    resolved = await flow_params.resolve(mission=mission, level="0")

    env = FlowEnv(FlowEnvArgs(owner_id=resolved.owner_identifier))

    with env.start_span(__name__, f"sentinel{mission}-level0-processing"):
        session_item: Item | None = await get_single_catalog_item(
            env,
            session,
            [resolved.session_collection],
        )

        # Without a session item there is nothing to process
        if not session_item:
            run_logger.error("❌ The processing cannot be launched.")
            return

        # Satellite identifier: mission followed by the lower-cased session prefix
        session_prefix = session[:3].lower()
        satellite_identifier = f"sentinel-{mission}{session_prefix}"

        # The session "published" date is used as both start and end of the time range
        published_raw = session_item.properties.get("published")
        if not isinstance(published_raw, str):
            raise ValueError("Missing or invalid 'published' property in item_session")
        published_at = datetime.fromisoformat(published_raw)

        # Generated products: an unset (None) mapping becomes an empty list
        generated_mapping = resolved.generated_product_to_collection_identifier
        if generated_mapping is None:
            generated_mapping = []

        # Auxiliary products: same None -> empty list substitution
        auxiliary_mapping = resolved.auxiliary_product_to_collection_identifier
        if auxiliary_mapping is None:
            auxiliary_mapping = []

        # Call the DPR flow. A fresh FlowEnvArgs is built here rather than reusing `env`.
        # NOTE(review): confirm whether the active span context should be propagated instead.
        await call_dpr_flow(
            FlowEnvArgs(owner_id=resolved.owner_identifier),
            input_products=input_products,
            start_datetime=published_at,
            end_datetime=published_at,
            satellite_identifier=satellite_identifier,
            dask_cluster_label=resolved.dask_cluster_label,
            processor_name=resolved.processor_name,
            processor_version=resolved.processor_version,
            pipeline=resolved.pipeline,
            unit=resolved.unit,
            priority=resolved.priority,
            processing_mode=resolved.processing_mode,
            workflow=resolved.workflow,
            generated_product_to_collection_identifier=generated_mapping,
            auxiliary_product_to_collection_identifier=auxiliary_mapping,
        )