Skip to content

rs_workflows/cadip_flow.md

<< Back to index

Cadip flow implementation

on_demand_cadip_staging(env, cadip_collection_identifier, session_identifier, catalog_collection_identifier, staging_retries=3, staging_retry_delay=60) async

Flow to retrieve a session, stage it and add the STAC item into the catalog.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
cadip_collection_identifier str

CADIP collection identifier that contains the mission and station (e.g. s1_ins for Sentinel-1 sessions from the Inuvik station)

required
session_identifier str

Session identifier

required
catalog_collection_identifier str

Catalog collection identifier where CADIP sessions and AUX data are staged

required
Source code in docs/rs-client-libraries/rs_workflows/cadip_flow.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@flow(name="On-demand Cadip staging")
async def on_demand_cadip_staging(
    env: FlowEnvArgs,
    cadip_collection_identifier: str,
    session_identifier: str,
    catalog_collection_identifier: str,
    staging_retries: int = 3,
    staging_retry_delay: int = 60,
):
    """
    Flow to retrieve a session, stage it and add the STAC item into the catalog.

    Args:
        env: Prefect flow environment
        cadip_collection_identifier: CADIP collection identifier that contains the mission and station
            (e.g. s1_ins for Sentinel-1 sessions from the Inuvik station)
        session_identifier: Session identifier
        catalog_collection_identifier: Catalog collection identifier where CADIP sessions and AUX data are staged
    """

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "on-demand-cadip-staging"):

        # Search Cadip sessions
        cadip_items = search_task.with_options(
            retries=3,
            retry_delay_seconds=60,
        ).submit(
            flow_env.serialize(),
            cadip_collection_identifier,
            session_identifier,
            error_if_empty=True,
        )

        # Stage Cadip items.
        staged = staging_task.with_options(
            retries=staging_retries,
            retry_delay_seconds=staging_retry_delay,
        ).submit(
            flow_env.serialize(),
            cadip_items,
            catalog_collection_identifier,
        )

        # Wait for last task to end.
        # NOTE: use .result() and not .wait() to unwrap and propagate exceptions, if any.
        staged.result()  # type: ignore[unused-coroutine]

search(env, cadip_collection_identifier, session_identifier, error_if_empty=False) async

Search Cadip sessions.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
cadip_collection_identifier str

CADIP collection identifier (to know the station)

required
session_identifier str

Session identifier

required
error_if_empty bool

Raise a ValueError if the results are empty.

False
Source code in docs/rs-client-libraries/rs_workflows/cadip_flow.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@flow(name="Cadip search")
async def search(
    env: FlowEnvArgs,
    cadip_collection_identifier: str,
    session_identifier: str,
    error_if_empty: bool = False,
) -> ItemCollection:
    """
    Search Cadip sessions.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        cadip_collection_identifier: CADIP collection identifier (to know the station)
        session_identifier: Session identifier
        error_if_empty: Raise a ValueError if the results are empty.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "cadip-search"):

        logger.info("Start Cadip search")
        cadip_client: CadipClient = flow_env.rs_client.get_cadip_client()
        found = cadip_client.search(
            method="GET",
            ids=[session_identifier],
            collections=[cadip_collection_identifier],
        )
        if (not found) and error_if_empty:
            raise ValueError(
                f"No Cadip session found for id={session_identifier!r} collection={cadip_collection_identifier!r}",
            )
        logger.info(f"Cadip search found {len(found)} results: {found}")
        return found

search_task(*args, **kwargs) async

See: search

Source code in docs/rs-client-libraries/rs_workflows/cadip_flow.py
117
118
119
120
@task(name="Cadip search")
async def search_task(*args, **kwargs) -> ItemCollection | None:
    """See: search"""
    return await search.fn(*args, **kwargs)