Skip to content

rs_workflows/utils/stac.md

<< Back to index

STAC utilities

search(env, cql2, span_name, stac_client_selector, error_if_empty=False, start_log_message=None) async

Search items in a STAC catalogue.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
cql2 dict

CQL2 filter read from the processor tasktable.

required
span_name str

Name of the OpenTelemetry span.

required
stac_client_selector StacClientSelector

Function receiving the flow environment and returning the STAC client plus source-specific search keyword arguments.

required
error_if_empty bool

Raise a ValueError if the results are empty.

False
start_log_message str | None

Optional search start log message.

None
Source code in docs/rs-client-libraries/rs_workflows/utils/stac.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
async def search(
    env: FlowEnvArgs,
    cql2: dict,
    span_name: str,
    stac_client_selector: StacClientSelector,
    error_if_empty: bool = False,
    start_log_message: str | None = None,
) -> ItemCollection | None:
    """
    Search items in a STAC catalogue.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        cql2: CQL2 filter read from the processor tasktable. The optional keys
            "filter", "limit", "collections", "sortby" and "timestamp" are read
            from it and forwarded to the STAC client search.
        span_name: Name of the OpenTelemetry span.
        stac_client_selector: Function receiving the flow environment and returning the STAC client
            plus source-specific search keyword arguments.
        error_if_empty: Raise a ValueError if the results are empty.
        start_log_message: Optional search start log message.

    Returns:
        The value returned by the STAC client search: the found items, or None
        if the client returned None and ``error_if_empty`` is False.

    Raises:
        ValueError: If the search result is empty (or None) and
            ``error_if_empty`` is True.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, span_name):
        logger.info(start_log_message or f"Start STAC search using CQL2: {cql2}")

        # The selector decides which catalogue is queried and may add
        # source-specific keyword arguments to the search call.
        stac_client, search_kwargs = stac_client_selector(flow_env)
        found = stac_client.search(
            method="POST",
            stac_filter=cql2.get("filter"),
            max_items=cql2.get("limit"),
            collections=cql2.get("collections"),
            sortby=cql2.get("sortby"),
            timestamp=cql2.get("timestamp"),
            **search_kwargs,
        )

        # A falsy result covers both None and an empty collection.
        if not found and error_if_empty:
            raise ValueError(
                f"No item found for CQL2: {json.dumps(cql2, indent=2)}",
            )

        # The return type allows None: do not call len()/to_dict() on it,
        # otherwise the "no result" case would crash instead of returning.
        if found is None:
            logger.info("STAC search found 0 result(s)")
            return None

        logger.info(f"STAC search found {len(found)} result(s): {found.to_dict()}")
        return found