Skip to content

rs_workflows/prip_flow.md

<< Back to index

Prip flow implementation

on_demand_prip_staging(env, start_datetime, end_datetime, product_type, prip_collection, catalog_collection_identifier, retry_config=RetryConfig()) async

Flow to retrieve Prip files with the given time interval defined by start_datetime and end_datetime, select only the type of files wanted, stage the files and add STAC items into the catalog.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
start_datetime datetime | str

Start datetime for the time interval used to filter the files (date or timestamp, e.g. "2025-08-07T11:51:12.509000Z")

required
end_datetime datetime | str

End datetime for the time interval used to filter the files (date or timestamp, e.g. "2025-08-10T14:00:00.509000Z")

required
product_type str

Prip product type wanted

required
prip_collection str

PRIP collection identifier (station)

required
catalog_collection_identifier str

Catalog collection identifier where PRIP data are staged

required
Source code in docs/rs-client-libraries/rs_workflows/prip_flow.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@flow(name="On-demand Prip staging")
async def on_demand_prip_staging(
    env: FlowEnvArgs,
    start_datetime: datetime.datetime | str,
    end_datetime: datetime.datetime | str,
    product_type: str,
    prip_collection: str,
    catalog_collection_identifier: str,
    retry_config: RetryConfig = RetryConfig(),  # type: ignore
):
    """
    Search a PRIP collection for products of one type within a time interval,
    then stage the search results and wait for the staging task to finish.

    Args:
        env: Prefect flow environment
        start_datetime: Lower bound of the time interval used to filter the files
            (date or timestamp, e.g. "2025-08-07T11:51:12.509000Z")
        end_datetime: Upper bound of the time interval used to filter the files
            (date or timestamp, e.g. "2025-08-10T14:00:00.509000Z")
        product_type: Prip product type wanted
        prip_collection: PRIP collection identifier (station)
        catalog_collection_identifier: Catalog collection identifier where PRIP data are staged
        retry_config: Retry settings. Its ``staging_retries`` and ``staging_retry_delay``
            values are applied to both the search task and the staging task.
    """

    # Flow environment + opentelemetry span wrapping the whole flow body
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "on-demand-prip-staging"):

        # Search request body: a CQL2 filter on product type and time interval
        search_request = {
            "filter": create_valcover_filter(start_datetime, end_datetime, product_type),
        }

        # The same retry policy is used for both submitted tasks
        max_retries = retry_config.staging_retries
        delay_between_retries = retry_config.staging_retry_delay

        # Submit the Prip search; no error is raised on an empty result
        search_with_retries = search_task.with_options(
            retries=max_retries,
            retry_delay_seconds=delay_between_retries,
        )
        search_future = search_with_retries.submit(
            flow_env.serialize(),
            prip_cql2=search_request,
            prip_collection=prip_collection,
            error_if_empty=False,
        )

        # Submit the staging of the search output (passed as a future)
        staging_with_retries = staging_task.with_options(
            retries=max_retries,
            retry_delay_seconds=delay_between_retries,
        )
        staging_future = staging_with_retries.submit(
            flow_env.serialize(),
            search_future,
            catalog_collection_identifier,
        )

        # Block until the final task is done; this also surfaces its exception, if any
        staging_future.result()  # type: ignore[unused-coroutine]

search(env, prip_cql2, prip_collection='', error_if_empty=False) async

Search Prip products.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
prip_cql2 dict

PRIP CQL2 filter.

required
prip_collection str

PRIP collection identifier (to know the station)

''
error_if_empty bool

Raise a ValueError if the results are empty.

False
Source code in docs/rs-client-libraries/rs_workflows/prip_flow.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@flow(name="Prip search")
async def search(
    env: FlowEnvArgs,
    prip_cql2: dict,
    prip_collection: str = "",
    error_if_empty: bool = False,
) -> ItemCollection | None:
    """
    Search Prip products.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        prip_cql2: PRIP CQL2 search parameters. The "filter" entry is passed as the
            STAC filter. The optional "limit" entry caps the number of items
            (10 when absent) and the optional "sortby" entry sets the ordering
            ("-created" when absent).
        prip_collection: PRIP collection identifier (to know the station)
        error_if_empty: Raise a ValueError if the results are empty.

    Returns:
        The value returned by the PRIP client search. Per the return annotation
        this may be None; it is returned unchanged.

    Raises:
        ValueError: If the search result is falsy and error_if_empty is True.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "prip-search"):

        logger.info("Start PRIP search")
        prip_client: PripClient = flow_env.rs_client.get_prip_client()
        found = prip_client.search(
            method="POST",
            stac_filter=prip_cql2.get("filter"),
            max_items=prip_cql2.get("limit", 10),
            sortby=prip_cql2.get("sortby", "-created"),
            collections=[prip_collection],
        )
        if (not found) and error_if_empty:
            raise ValueError("No PRIP products found")

        # The declared return type allows None: calling len() on it would raise
        # a TypeError, so report zero results in that case instead of crashing.
        result_count = len(found) if found is not None else 0
        logger.info(f"PRIP search found {result_count} results: {found}")
        return found

search_task(*args, **kwargs) async

See: search

Source code in docs/rs-client-libraries/rs_workflows/prip_flow.py
128
129
130
131
@task(name="PRIP search")
async def search_task(*args, **kwargs) -> ItemCollection | None:
    """
    Task counterpart of the ``search`` flow.

    Awaits ``search.fn`` with the same positional and keyword arguments and
    returns its result. See: search
    """
    search_result = await search.fn(*args, **kwargs)
    return search_result