Skip to content

rs_workflows/prip_flow.md

<< Back to index

Prip flow implementation

search(env, prip_cql2, prip_collection='', error_if_empty=False) async

Search Prip products.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
prip_cql2 dict

PRIP CQL2 filter.

required
prip_collection str

PRIP collection identifier (to know the station)

''
error_if_empty bool

Raise a ValueError if the results are empty.

False
Source code in docs/rs-client-libraries/rs_workflows/prip_flow.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@flow(name="Prip search")
async def search(
    env: FlowEnvArgs,
    prip_cql2: dict,
    prip_collection: str = "",
    error_if_empty: bool = False,
) -> ItemCollection | None:
    """
    Search Prip products.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        prip_cql2: PRIP CQL2 filter. The "filter", "limit" (default 10) and
            "sortby" (default "-created") keys are read from this dict.
        prip_collection: PRIP collection identifier (to know the station)
        error_if_empty: Raise a ValueError if the results are empty.

    Returns:
        The search results exactly as returned by the PRIP client. The
        return annotation allows None, so callers should handle that case.

    Raises:
        ValueError: If the results are empty and error_if_empty is True.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "prip-search"):

        logger.info("Start PRIP search")
        prip_client: PripClient = flow_env.rs_client.get_prip_client()
        found = prip_client.search(
            method="POST",
            stac_filter=prip_cql2.get("filter"),
            max_items=prip_cql2.get("limit", 10),
            sortby=prip_cql2.get("sortby", "-created"),
            # NOTE(review): with the default prip_collection this passes [""]
            # to the client - confirm that is the intended "no collection" value.
            collections=[prip_collection],
        )
        if (not found) and error_if_empty:
            raise ValueError("No PRIP products found")

        # The declared return type allows None: guard the count so that the
        # log statement cannot raise a TypeError on len(None).
        count = len(found) if found is not None else 0
        logger.info(f"PRIP search found {count} results: {found}")
        return found

search_task(*args, **kwargs) async

See: search

Source code in docs/rs-client-libraries/rs_workflows/prip_flow.py
65
66
67
68
@task(name="PRIP search")
async def search_task(*args, **kwargs) -> ItemCollection | None:
    """
    Task variant of the `search` flow. See: search

    Every positional and keyword argument is forwarded unchanged to
    `search.fn`, and the awaited result is returned as-is.
    """
    outcome = await search.fn(*args, **kwargs)
    return outcome