Skip to content

rs_workflows/on_demand/common/l0.md

<< Back to index

common Level-0 processing.

process_l0(session, flow_params=None, verbose=False) async

This is the generic l0 processing flow. It performs common L0 task like retrieving session from catalog an staging it from cadip if needed. It call process_s1l0, process_s2l0 or process_s3l0

Only session parameter is mandatory. All other parameters get their default values from Prefect variable but can be overriden on demand.

Source code in docs/rs-client-libraries/rs_workflows/on_demand/common/l0.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@flow(name="process level-0")
async def process_l0(
    session: str,
    flow_params: Level0FlowParams | None = None,
    verbose: bool = False,
) -> None:
    """
    This is the generic l0 processing flow.
    It performs common L0 task like retrieving session from catalog an staging it from cadip if needed.
    It call process_s1l0, process_s2l0 or process_s3l0

    Only session parameter is mandatory.
    All other parameters get their default values from Prefect variable but can be overriden on demand.

    """
    logger = get_run_logger()
    logger.info(f"Mode verbose is set to {verbose}")

    # Check session name format
    pattern = re.compile(r"^S[123]._")
    if not pattern.match(session):
        logger.error("❌ Bad Sentinel-1,2,3 session name.")
        raise ValueError(f"Invalid session name: '{session}'")

    # We detect the mission
    mission: str = session[1]
    logger.info(f"✔️ Sentinel-{mission} session name is correct.")

    # Override of some parameters with default configuration
    if flow_params is None:
        flow_params = Level0FlowParams()
    p: Level0FlowParams = await flow_params.resolve(mission, level="0")

    flow_env = FlowEnv(FlowEnvArgs(owner_id=p.owner_identifier))
    with flow_env.start_span(__name__, "level0-processing"):
        found = False

        # Check that the chosen dask_cluster_label is deployed
        if await is_dask_cluster_running(p.dask_cluster_label) is False:
            raise ValueError(f"❌ '{p.dask_cluster_label}' is unknown or not ready.")

        # Try to retrieve the session on the collection
        item_session: Item | None = await get_single_catalog_item(flow_env, session, [p.session_collection])

        # If the session is not on the rs-catalog, we will try to stage it
        if item_session:
            found = True
            evicted, eviction_date = is_evicted(item_session)
            if evicted:
                logger.error(f"❌ The session '{session}' has been evicted (eviction date = {eviction_date}) ")
                raise ValueError(f"'{session}' has been evicted")
            if is_published(item_session) is False:
                logger.error(f"❌ The session '{session}' has not been published yet")
                raise ValueError(f"'{session}' has not been publised")
        else:
            logger.info(f"Try to stage session  {session} from {mission} stations :{p.cadip_collections}")
            station = await get_cadip_station(
                flow_env,
                session,
                p.cadip_collections,
            )
            if station is not None:
                found = await stage_session_common(flow_env, station, session)

        # The session is stagged at this step.
        # We can call the flow
        logger.info(f"We start Sentinel-{mission} processing.")
        if found:
            match int(mission):
                case 1:
                    await process_s1l0_task(session=session, flow_params=p, verbose=verbose)
                case 3:
                    await process_s3l0_task(session=session, flow_params=p, verbose=verbose)