Skip to content

rs_workflows/catalog_flow.md

<< Back to index

Catalog flow implementation

Search Catalog items.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
catalog_cql2 dict

CQL2 filter.

required
error_if_empty bool

Raise a ValueError if the results are empty.

False
Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@flow(name="Catalog search")
async def catalog_search(
    env: FlowEnvArgs,
    catalog_cql2: dict,
    error_if_empty: bool = False,
) -> ItemCollection | None:
    """
    Search Catalog items.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        catalog_cql2: CQL2 filter. Its "filter", "limit" and "sortby" entries are
            forwarded to the catalog client search; missing entries are passed as None.
        error_if_empty: Raise a ValueError if the results are empty.

    Returns:
        The result of the catalog client search. It may be None or empty when
        error_if_empty is False.

    Raises:
        ValueError: If nothing was found and error_if_empty is True.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "catalog-search"):

        logger.info("Start Catalog search")
        catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()
        found = catalog_client.search(
            method="POST",
            stac_filter=catalog_cql2.get("filter"),
            max_items=catalog_cql2.get("limit"),
            sortby=catalog_cql2.get("sortby"),
        )
        if (not found) and error_if_empty:
            raise ValueError(
                f"No Catalog item found for CQL2 filter: {json.dumps(catalog_cql2, indent=2)}",
            )

        # The declared return type allows None: calling len() on it would raise a
        # TypeError when error_if_empty is False, so count defensively.
        nb_found = len(found) if found is not None else 0
        logger.info(f"Catalog search found {nb_found} results: {found}")
        return found

catalog_search_task(*args, **kwargs) async

See: catalog_search

Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
168
169
170
171
@task(name="Catalog search")
async def catalog_search_task(*args, **kwargs) -> ItemCollection | None:
    """
    Task flavour of catalog_search.

    All positional and keyword arguments are forwarded unchanged to
    catalog_search.fn, and its awaited result is returned as is.
    See: catalog_search
    """
    search_fn = catalog_search.fn
    result = await search_fn(*args, **kwargs)
    return result

publish(env, catalog_collection_identifier, payload_file, items) async

Publish items to the catalog

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
catalog_collection_identifier list[dict]

Catalog collection identifier where the items are staged

required
payload_file PayloadSchema

Payload file configuration for the dpr processor

required
items list[dict]

Items to publish, as STAC dicts

required
Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@task(name="Publish to catalog")
async def publish(
    env: FlowEnvArgs,
    catalog_collection_identifier: list[dict],
    payload_file: PayloadSchema,
    items: list[dict],
):
    """
    Publish items to the catalog

    For each item, every entry of catalog_collection_identifier is examined. An entry
    is used when its product type equals the item "product:type" (case-insensitive)
    and its first key matches the id of one of the payload output products. The item
    is then added to the corresponding collection with a single zarr asset located
    under the output product path.

    Args:
        env: Prefect flow environment
        catalog_collection_identifier: Catalog collection identifier where the items are staged.
            Only the first key/value pair of each dict is read: the key is compared to the
            output product ids, the value is either a (product type, collection) tuple or a
            single value used as both.
        payload_file: Payload file configuration for the dpr processor
        items: Items to publish, as STAC dicts. Each one must have a "stac_discovery" entry
            holding "id", "geometry", "bbox" and "properties" (with "datetime" and "product:type").

    Raises:
        RuntimeError: If anything fails while handling an item. The original exception
            is chained as the cause.
    """
    logger = get_run_logger()
    flow_env = FlowEnv(env)

    def extract_product_type_and_collection(cci: dict):
        """Return (product type, collection) from the first value of the identifier dict."""
        cci_tuple = next(iter(cci.values()))
        if isinstance(cci_tuple, tuple):
            return cci_tuple[0], cci_tuple[1]
        # A single value stands for both the product type and the collection
        return cci_tuple, cci_tuple

    def find_matching_output_product(output_dir: str):
        """Return the first payload output product whose id equals output_dir, else None."""
        if not payload_file.io:
            return None

        for output_ps in payload_file.io.output_products:
            if output_dir == output_ps.id:
                return output_ps
        return None

    def build_item(feature_dict: dict) -> Item:
        """Build the Item from the "stac_discovery" part of the feature."""
        sd = feature_dict["stac_discovery"]
        return Item(
            id=sd["id"],
            geometry=sd["geometry"],
            bbox=sd["bbox"],
            datetime=datetime.fromisoformat(sd["properties"]["datetime"]),
            properties=sd["properties"],
        )

    def build_asset(path: str, title: str) -> Asset:
        """Build the zarr Asset pointing to the given path."""
        return Asset(
            href=path,
            title=title,
            media_type="application/vnd+zarr",
            roles=["data", "metadata"],
            extra_fields={
                "file:local_path": path,
                "auth:ref": "should be filled thanks to story RSPY-280",
            },
        )

    catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()

    with flow_env.start_span(__name__, "publish-to-catalog"):
        for feature_dict in items:
            try:
                sd_props = feature_dict["stac_discovery"]["properties"]
                feature_product_type = sd_props["product:type"].upper()
                published = False

                for cci in catalog_collection_identifier:
                    product_type, collection = extract_product_type_and_collection(cci)

                    if feature_product_type != product_type.upper():
                        continue

                    # The first key of the identifier is the output product id
                    output_dir = next(iter(cci))
                    output_ps = find_matching_output_product(output_dir)
                    if not output_ps:
                        continue

                    item = build_item(feature_dict)

                    title = f"{item.id}.zarr"
                    output_path = os.path.join(output_ps.path, title)

                    item.assets = {title: build_asset(output_path, title)}
                    catalog_client.add_item(collection, item)
                    published = True

                if not published:
                    # Do not drop the item silently: leave a trace for the operator
                    item_id = feature_dict["stac_discovery"].get("id")
                    logger.warning(
                        f"Item {item_id!r} with product type {feature_product_type!r} was not published: "
                        "no matching catalog collection identifier / output product",
                    )

            except Exception as e:
                # default=str keeps the message buildable even if the feature holds values
                # that are not JSON serializable; otherwise json.dumps would raise here and
                # replace this RuntimeError.
                raise RuntimeError(
                    f"Exception while publishing: {json.dumps(feature_dict, indent=2, default=str)}",
                ) from e

    # list collections for logging
    collections = catalog_client.get_collections()
    logger.info("\nCollections response:")
    for collection in collections:
        logger.info(f"ID: {collection.id}, Title: {collection.title}")

    logger.info("End catalog publishing")