Skip to content

rs_workflows/catalog_flow.md

<< Back to index

Catalog flow implementation

Search Catalog items.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
catalog_cql2 dict

CQL2 filter.

required
error_if_empty bool

Raise a ValueError if the results are empty.

False
Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@flow(name="Catalog search")
async def catalog_search(
    env: FlowEnvArgs,
    catalog_cql2: dict,
    error_if_empty: bool = False,
) -> ItemCollection | None:
    """
    Search Catalog items.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        catalog_cql2: CQL2 search request. Its "filter", "limit" and "sortby" entries are
            forwarded to the catalog client; missing entries are forwarded as None.
        error_if_empty: Raise a ValueError if the results are empty.

    Returns:
        The value returned by the catalog client search, unchanged (may be empty or None).

    Raises:
        ValueError: If the search result is empty or None and error_if_empty is True.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "catalog-search"):

        logger.info("Start Catalog search")
        catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()
        found = catalog_client.search(
            method="POST",
            stac_filter=catalog_cql2.get("filter"),
            max_items=catalog_cql2.get("limit"),
            sortby=catalog_cql2.get("sortby"),
        )
        if (not found) and error_if_empty:
            raise ValueError(
                f"No Catalog item found for CQL2 filter: {json.dumps(catalog_cql2, indent=2)}",
            )

        # The result is typed as optional: calling len() on None would raise a TypeError
        # and hide the fact that the search simply returned nothing.
        found_count = 0 if found is None else len(found)
        logger.info(f"Catalog search found {found_count} results: {found}")
        return found

catalog_search_task(*args, **kwargs) async

See: catalog_search

Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
145
146
147
148
@task(name="Catalog search")
async def catalog_search_task(*args, **kwargs) -> ItemCollection | None:
    """Run the catalog search as a task by awaiting `catalog_search.fn` with the same arguments."""
    search_result = await catalog_search.fn(*args, **kwargs)
    return search_result

get_item(env, target_collection, item) async

Get a catalog item by its ID.

Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
151
152
153
154
155
156
157
158
159
160
161
162
@task(name="Get catalog item")
async def get_item(
    env: FlowEnvArgs,
    target_collection,
    item,
):
    """
    Fetch a single catalog item by its ID.

    Args:
        env: Prefect flow environment
        target_collection: Collection forwarded to the catalog client lookup
        item: Item reference forwarded to the catalog client lookup

    Returns:
        Whatever the catalog client `get_item` call returns.
    """
    # Build the environment and its catalog client in one go, then delegate the lookup
    client: CatalogClient = FlowEnv(env).rs_client.get_catalog_client()
    return client.get_item(target_collection, item)

publish(env, target_collections, items) async

Publish items to the catalog

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
target_collections

Catalog collection identifier where the items are published

required
items

Items to publish, as STAC dicts or pystac.Items

required
Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
@task(name="Publish to catalog")
async def publish(
    env: FlowEnvArgs,
    target_collections,
    items,
):
    """
    Publish items to the catalog

    Args:
        env: Prefect flow environment
        target_collections: Target collections, passed to resolve_collection together with
            each item's "product:type" property to pick the collection the item is published to
        items: Items to publish, as STAC dicts or pystac.Items

    Raises:
        RuntimeError: If anything fails while processing an item. The message contains the
            item dumped as JSON and the original exception is chained as the cause.
    """
    logger = get_run_logger()
    flow_env = FlowEnv(env)

    catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()

    # TODO: adjust timeout as needed. Current value is 6 hours.
    publish_timeout = 21600

    with flow_env.start_span(__name__, "publish-to-catalog"):
        for item in items:
            try:
                # Extract product type from STAC item
                product_type = (
                    item.properties["product:type"] if isinstance(item, Item) else item["properties"]["product:type"]
                )
                # Resolve destination collection
                target_collection = resolve_collection(product_type, target_collections)
                # TEMPFIX: adapt items generated by DPR so that the catalog ingestion accepts them
                if isinstance(item, Item):
                    # Items generated by DPR may contain empty geometry lists
                    if not item.geometry:
                        item.geometry = None

                    # A single "instruments" value is wrapped into a list
                    instruments = item.properties.get("instruments")
                    if instruments is not None and not isinstance(instruments, list):
                        item.properties["instruments"] = [instruments]
                # TEMPFIX END

                # Computed once, used by both log messages below
                item_id = item.id if isinstance(item, Item) else item["id"]
                logger.info(
                    "Writing product %s to %s. This may take a while...",
                    item_id,
                    target_collection,
                )
                # Publish item to catalog
                logger.info(
                    "Using timeout of %s seconds for catalog item publishing",
                    publish_timeout,
                )
                response = catalog_client.add_item(target_collection, item, timeout=publish_timeout)
                logger.info(
                    "Writing product %s to %s done. Response %s with message: %s",
                    item_id,
                    target_collection,
                    response.status_code,
                    response.text,
                )

            except Exception as e:
                # Re-raise with full item context for easier debugging.
                # default=str keeps the dump from failing on values that are not JSON
                # serializable, which would otherwise replace this error with a TypeError.
                item_as_dict = item.to_dict() if hasattr(item, "to_dict") else item
                raise RuntimeError(
                    f"Exception while publishing item: {json.dumps(item_as_dict, indent=2, default=str)}",
                ) from e

    # list collections for logging
    collections = catalog_client.get_collections()
    logger.info("\nCollections response:")
    for collection in collections:
        logger.info(f"ID: {collection.id}, Title: {collection.title}")

    logger.info("End catalog publishing")

resolve_collection(product_type, target_collections)

Resolve the target collection for a given product type.

Supports input values as strings or tuples (product_type, collection), and input as dict or list of dicts.

Parameters:

Name Type Description Default
product_type str

STAC product type.

required
target_collections

Dict or list of dicts with target collections.

required

Returns:

Type Description
str

The resolved target collection.

Raises:

Type Description
ValueError

If the product type cannot be resolved.

Source code in docs/rs-client-libraries/rs_workflows/catalog_flow.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def resolve_collection(product_type: str, target_collections) -> str:
    """Resolve the target collection for a given product type.

    Only the values of the input dict(s) are used; their keys are ignored.
    Each value is either:

    - a pair (product_type, collection), given as a tuple or a list, which maps
      that product type to that collection;
    - any other value (typically a string), which maps to itself.

    The product type "*" acts as a fallback for product types that are not listed.
    When the same product type appears several times, the last one wins.

    Args:
        product_type: STAC product type.
        target_collections: Dict or list of dicts with target collections.

    Returns:
        The resolved target collection.

    Raises:
        ValueError: If the product type cannot be resolved (no matching entry and
            no "*" fallback, or the resolved collection is empty).
    """
    # A single dict is handled as a one-element list of dicts, so that both
    # input shapes share the same normalization loop.
    mappings = [target_collections] if isinstance(target_collections, dict) else target_collections

    collections = {}
    for mapping in mappings:
        for value in mapping.values():
            # Lists are accepted as pairs too: a tuple becomes a list after a JSON
            # round-trip, and a list cannot be used as a dict key in the other branch.
            if isinstance(value, (tuple, list)):
                key, collection = value
            else:  # string case
                key = collection = value
            collections[key] = collection

    # lookup product_type, falling back to the wildcard entry
    target_collection = collections.get(product_type, collections.get("*"))
    if not target_collection:
        raise ValueError(f"Product type unknown: {product_type}")

    return target_collection