Skip to content

rs_workflows/cadip_flow.md

<< Back to index

Cadip flow implementation

search(env, cadip_collection_identifier, session_identifier, error_if_empty=False) async

Search Cadip sessions.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
cadip_collection_identifier str

CADIP collection identifier (to know the station)

required
session_identifier str

Session identifier

required
error_if_empty bool

Raise a ValueError if the results are empty.

False
Source code in docs/rs-client-libraries/rs_workflows/cadip_flow.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@flow(name="Cadip search")
async def search(
    env: FlowEnvArgs,
    cadip_collection_identifier: str,
    session_identifier: str,
    error_if_empty: bool = False,
) -> ItemCollection:
    """
    Search Cadip sessions.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        cadip_collection_identifier: CADIP collection identifier (to know the station)
        session_identifier: Session identifier
        error_if_empty: Raise a ValueError if the results are empty.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "cadip-search"):

        logger.info("Start Cadip search")
        cadip_client: CadipClient = flow_env.rs_client.get_cadip_client()
        found = cadip_client.search(
            method="GET",
            ids=[session_identifier],
            collections=[cadip_collection_identifier],
        )
        if (not found) and error_if_empty:
            raise ValueError(
                f"No Cadip session found for id={session_identifier!r} collection={cadip_collection_identifier!r}",
            )
        logger.info(f"Cadip search found {len(found)} results: {found}")
        return found

search_task(*args, **kwargs) async

See: search

Source code in docs/rs-client-libraries/rs_workflows/cadip_flow.py
66
67
68
69
@task(name="Cadip search")
async def search_task(*args, **kwargs) -> ItemCollection | None:
    """See: search"""
    return await search.fn(*args, **kwargs)