Skip to content

rs_workflows/catalog_flow.md

<< Back to index

Catalog flow implementation

Search Catalog items.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
catalog_cql2 dict

CQL2 filter.

required
error_if_empty bool

Raise a ValueError if the results are empty.

False
Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@flow(name="Catalog search")
async def catalog_search(
    env: FlowEnvArgs,
    catalog_cql2: dict,
    error_if_empty: bool = False,
) -> ItemCollection | None:
    """
    Search Catalog items.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        catalog_cql2: CQL2 search request. The "filter", "limit" and "sortby" keys are
            read from it and forwarded to the catalog client; a missing key is forwarded as None.
        error_if_empty: Raise a ValueError if the results are empty.

    Returns:
        The value returned by the catalog client search. It may be None or empty
        when nothing was found and error_if_empty is False.

    Raises:
        ValueError: If the search result is empty (or None) and error_if_empty is True.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "catalog-search"):

        logger.info("Start Catalog search")
        catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()
        found = catalog_client.search(
            method="POST",
            stac_filter=catalog_cql2.get("filter"),
            max_items=catalog_cql2.get("limit"),
            sortby=catalog_cql2.get("sortby"),
        )
        if (not found) and error_if_empty:
            raise ValueError(
                f"No Catalog item found for CQL2 filter: {json.dumps(catalog_cql2, indent=2)}",
            )

        # The declared return type allows None: calling len() on it would raise a
        # TypeError and turn a legitimate "nothing found" result into a crash.
        result_count = 0 if found is None else len(found)
        logger.info(f"Catalog search found {result_count} results: {found}")
        return found

catalog_search_task(*args, **kwargs) async

See: catalog_search

Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
151
152
153
154
@task(name="Catalog search")
async def catalog_search_task(*args, **kwargs) -> ItemCollection | None:
    """Task wrapper around the catalog_search flow function. See: catalog_search"""
    # Call the undecorated flow function so the search runs inside this task.
    search_result = await catalog_search.fn(*args, **kwargs)
    return search_result

get_item(env, target_collection, item) async

Get a catalog item by its ID.

Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
157
158
159
160
161
162
163
164
165
166
167
168
@task(name="Get catalog item")
async def get_item(
    env: FlowEnvArgs,
    target_collection,
    item,
):
    """
    Get a catalog item by its ID.

    Args:
        env: Prefect flow environment
        target_collection: Collection forwarded to the catalog client lookup
        item: Item reference forwarded to the catalog client lookup
            (presumably the item ID; confirm against CatalogClient.get_item)

    Returns:
        Whatever the catalog client's get_item call returns.
    """
    # Build the flow environment, obtain its catalog client and delegate the lookup.
    return FlowEnv(env).rs_client.get_catalog_client().get_item(target_collection, item)

publish(env, generated_product_to_collection_identifier, items_metadata) async

Publish items to the catalog

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
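generated_product_to_collection_identifier list[FlowGeneratedProduct]

List of FlowGeneratedProduct entries used to resolve the catalog collection where each item is published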

required
items_metadata list[DprProcessedItemMetadata]

List of DprProcessedItemMetadata containing items to publish

required
Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@task(name="Publish to catalog")
async def publish(
    env: FlowEnvArgs,
    generated_product_to_collection_identifier: list[FlowGeneratedProduct],
    items_metadata: list[DprProcessedItemMetadata],
):
    """
    Publish items to the catalog

    Args:
        env: Prefect flow environment
        generated_product_to_collection_identifier: List of FlowGeneratedProduct entries
            used to resolve the catalog collection where each item is published
        items_metadata: List of DprProcessedItemMetadata containing items to publish

    Raises:
        RuntimeError: If resolving the collection or publishing an item fails. The
            message contains the JSON dump of the item and the original exception is chained.
    """
    logger = get_run_logger()
    flow_env = FlowEnv(env)

    catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()

    # TODO: adjust timeout as needed. Current value is 6 hours.
    publish_timeout = 21600

    with flow_env.start_span(__name__, "publish-to-catalog"):
        for metadata in items_metadata:
            stac_item = metadata.stac_item
            try:
                # The destination collection is resolved from the item metadata
                destination = resolve_collection(
                    metadata,
                    generated_product_to_collection_identifier,
                )

                # TEMPFIX: items generated by DPR (notably by the s1 l0 processor) are not
                # fully compliant with the catalog ingestion, so they are normalized here.
                # - an empty geometry becomes None
                if not stac_item.geometry:
                    stac_item.geometry = None
                # - an empty bbox becomes None
                if not stac_item.bbox:
                    stac_item.bbox = None
                # - a scalar "instruments" property is wrapped into a list
                instruments = stac_item.properties.get("instruments")
                if not (instruments is None or isinstance(instruments, list)):
                    stac_item.properties["instruments"] = [instruments]
                # TEMPFIX END

                logger.info(
                    "Writing product %s to %s. This may take a while...",
                    stac_item.id,
                    destination,
                )
                logger.info(
                    "Using timeout of 21600 seconds for catalog item publishing",
                )
                # Publish item to catalog
                resp = catalog_client.add_item(destination, stac_item, timeout=publish_timeout)
                logger.info(
                    "Writing product %s to %s done. Response %s with message: %s",
                    stac_item.id,
                    destination,
                    resp.status_code,
                    resp.text,
                )

            except Exception as err:
                # Re-raise with the full item dumped in the message for easier debugging
                item_dump = json.dumps(stac_item.to_dict(), indent=2)
                raise RuntimeError(
                    f"Exception while publishing item: {item_dump}",
                ) from err

    # List the catalog collections, for logging purposes only
    all_collections = catalog_client.get_collections()
    logger.info("\nCollections response:")
    for coll in all_collections:
        logger.info(f"ID: {coll.id}, Title: {coll.title}")

    logger.info("End catalog publishing")

resolve_collection(item_metadata, generated_product_to_collection_identifier)

Resolve the target collection for a given product metadata.

The matching logic uses both output_product_id and product_type from the DprProcessedItemMetadata to find the corresponding FlowGeneratedProduct. Wildcards ('*') are supported for product_type in FlowGeneratedProduct.

Returns:

Type Description
str

The resolved target collection name.

Raises:

Type Description
ValueError

If the product cannot be resolved to any collection.

Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def resolve_collection(
    item_metadata: DprProcessedItemMetadata,
    generated_product_to_collection_identifier: list[FlowGeneratedProduct],
) -> str:
    """Find the catalog collection into which a processed product must be published.

    Candidates are the FlowGeneratedProduct entries whose name equals the
    output_product_id of the DprProcessedItemMetadata. Among them:

    1. an entry with the same product_type wins immediately;
    2. otherwise the first entry whose product_type is the wildcard '*' is used.

    Wildcards are not supported for the name. The collection is the entry's
    collection_name, falling back to its product_type when no name is set.

    Returns:
        The resolved target collection name.

    Raises:
        ValueError: If the product cannot be resolved to any collection.
        RuntimeError: If a wildcard entry is selected but it has no collection name.
    """
    logger = get_run_logger()
    logger.info(
        f"Resolving target collection for item metadata: {item_metadata}"
        f" with generated_product_to_collection_identifier: {generated_product_to_collection_identifier}",
    )

    resolved = None
    for candidate in generated_product_to_collection_identifier:
        # Only entries with exactly the same name are eligible
        if candidate.name == item_metadata.output_product_id:
            if item_metadata.product_type == candidate.product_type:
                # Most specific match: same name and same type, stop searching
                resolved = candidate.collection_name or candidate.product_type
                logger.info(f"Exact match (Name & Type) found: {candidate}, resolved collection: {resolved}")
                break

            if candidate.product_type == "*" and resolved is None:
                # Least specific match: kept as a fallback while the search for an
                # exact match continues. Only the first wildcard entry is retained.
                resolved = candidate.collection_name or candidate.product_type
                # The same protection exists in build_output_products from payload_generator.py;
                # it is repeated here in case that logic changes.
                if resolved == "*":
                    raise RuntimeError(
                        f"The product type in generated_product_to_collection_identifier cannot be '*' "
                        f"if the collection name is not specified for product '{candidate.name}'",
                    )
                logger.info(
                    f"Match with Exact Name & Wildcard Type: {candidate}, resolved collection: {resolved}",
                )

    if resolved:
        return resolved

    raise ValueError(
        f"No match for {item_metadata.output_product_id} in "
        f"{generated_product_to_collection_identifier}. "
        f"Could not find a collection to publish the stac_item from: {item_metadata}",
    )