Skip to content

rs_server_catalog/middleware/request_manager.md

<< Back to index

Module to process the Requests sent by users to the Catalog before routing them to stac-fastapi.

CatalogRequestManager

Class to process the requests sent by users to the Catalog before routing them to stac-fastapi. Each type of request is handled by one of its methods.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
class CatalogRequestManager:
    """Class to process the Requests sent by users to the Catalog before routing them to stac-fastapi.
    Each type of Response is managed in one of the functions."""

    def __init__(self, client: CoreCrudClient, request_ids: dict[Any, Any]):
        """Store the collaborators used while pre-processing one request.

        Args:
            client (CoreCrudClient): Client used by this class to look up collections and items.
            request_ids (dict): Identifiers attached to the request. The methods of this class
                read (and sometimes update) keys such as "owner_id", "collection_ids",
                "item_id" and "user_login".
        """
        # S3 hrefs collected while pre-processing the request; starts empty.
        self.s3_files_to_be_deleted: list = []
        self.s3_manager = S3Manager()
        self.request_ids = request_ids
        self.client = client

    def _override_request_body(self, request: Request, content: Any) -> Request:
        """Replace the body of the request with the JSON serialization of ``content``.

        Args:
            request (Request): Request whose private ``_body`` and ``_json`` attributes are overwritten.
            content (Any): JSON-serializable payload installed as the new body.

        Returns:
            Request: The same request object, modified in place.
        """
        # pylint: disable=protected-access
        encoded_body = json.dumps(content).encode("utf-8")
        request._body = encoded_body
        request._json = content
        logger.debug("new request body and json: %s", encoded_body)
        return request

    def _override_request_query_string(self, request: Request, query_params: dict) -> Request:
        """Replace the raw query string stored in the request scope.

        Args:
            request (Request): Request whose ``scope["query_string"]`` is overwritten.
            query_params (dict): Parameters to encode; sequence values are expanded into
                repeated keys (``doseq=True``).

        Returns:
            Request: The same request object, modified in place.
        """
        encoded_query = urlencode(query_params, doseq=True).encode("utf-8")
        request.scope["query_string"] = encoded_query
        logger.debug("new request query_string: %s", encoded_query)
        return request

    async def _collection_exists(self, request: Request, collection_id: str) -> bool:
        """Tell whether the client can retrieve the given collection.

        Any exception raised by the lookup is logged as an error and reported as "not found".

        Args:
            request (Request): The request forwarded to the client lookup.
            collection_id (str): Identifier of the collection to look for.

        Returns:
            bool: True if the lookup succeeded, False if it raised.
        """
        try:
            await self.client.get_collection(collection_id, request)
        except Exception as lookup_error:  # pylint: disable=broad-exception-caught
            logger.error("Collection %s not found: %s", collection_id, lookup_error)
            return False
        return True

    async def _get_item_from_collection(self, request: Request):
        """Fetch the item targeted by the request from its owner-prefixed collection.

        The item id, the owner id and the first collection id are read from ``self.request_ids``.

        Args:
            request (Request): The request object.

        Returns:
            The item returned by the client, or None when the client raises NotFoundError.

        Raises:
            The exception built by ``log_http_exception`` (HTTP 400) for any other failure.
        """
        item_id = self.request_ids["item_id"]
        owner_id = self.request_ids["owner_id"]
        first_collection = self.request_ids["collection_ids"][0]
        collection_id = f"{owner_id}_{first_collection}"

        try:
            return await self.client.get_item(item_id=item_id, collection_id=collection_id, request=request)
        except NotFoundError:
            # A missing item is an expected outcome; it is reported as None below.
            logger.info(f"The item '{item_id}' does not exist in collection '{collection_id}'")
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.exception(f"Exception: {e}")
            raise log_http_exception(
                detail=f"Exception when trying to get the item {item_id} from the collection '{collection_id}'",
                status_code=HTTP_400_BAD_REQUEST,
            ) from e
        return None

    async def build_filelist_to_be_deleted(self, request):
        """Collect the asset hrefs of the items targeted by a DELETE request.

        For each id in ``self.request_ids["collection_ids"]`` the matching items are fetched
        (every item of the collection, or only the requested item when the path contains
        "/items") and the "href" of each of their assets is appended to
        ``self.s3_files_to_be_deleted``. The method stops at the first lookup failure,
        keeping whatever was collected so far.

        Args:
            request: The DELETE request being pre-processed.
        """
        for ci in self.request_ids["collection_ids"]:
            collection_id = f"{self.request_ids['owner_id']}_{ci}"
            items = []
            try:
                if "/items" in request.scope["path"]:
                    # delete endpoint /collections/<collection_name>/items/<item_name>: one item only
                    items = [
                        await self.client.get_item(
                            item_id=self.request_ids["item_id"],
                            collection_id=collection_id,
                            request=request,
                        ),
                    ]
                else:
                    # delete endpoint /collections/<collection_name>: walk the pages 100 items at
                    # a time, otherwise only the default limit (10) of items would be returned.
                    # NOTE: Unable to use the pagination from pgstac client. Temporary, use a limit of 100
                    next_token = None
                    more_pages = True
                    while more_pages:
                        page = await self.client.item_collection(
                            request=request,
                            collection_id=collection_id,
                            limit=100,
                            token=next_token,
                        )
                        items.extend(page.get("features", []))
                        # An empty token means the last page has been reached
                        next_token = get_token_for_pagination(page)
                        more_pages = bool(next_token)
            except NotFoundError as nfe:
                logger.error(f"Failed to find the requested object to be deleted. {nfe}")
                return
            except KeyError as e:
                logger.error(f"Failed to build the list of items to be deleted due to missing key: {e}")
                return

            logger.debug(f"Found {len(items)} items: {items}")
            try:
                for item in items:
                    for asset_info in item.get("assets", {}).values():
                        s3_href = asset_info.get("href")
                        if s3_href:
                            self.s3_files_to_be_deleted.append(s3_href)
            except KeyError as e:
                logger.error(
                    f"Failed to build the list of S3 files to be deleted due to missing key in dictionary: {e}",
                )
                return

            logger.info(
                "Successfully built the list of S3 files to be deleted. "
                f"There are {len(self.s3_files_to_be_deleted)} files to be deleted",
            )

    async def manage_requests(self, request: Request) -> Request | Response:
        """Route the incoming request to the pre-processing step matching its method and path.

        Branches are tried in this order: POST / PUT outside of "/search", DELETE, any path
        containing "/search", GET on the collections root, and finally PATCH. A request
        matching none of them is returned untouched.

        Args:
            request (Request): request received by the Catalog.

        Returns:
            Request|Response: the pre-processed request to forward to stac-fastapi, or the
                object returned by a handler when it exposes a ``status_code`` attribute.
        """
        http_method = request.method
        path = request.scope["path"]
        is_search = "/search" in path

        # Handlers following the "return a request or a response" convention are called below.
        handler = None
        if http_method in ("POST", "PUT") and not is_search:
            # URL: POST / PUT: '/catalog/collections/{USER}:{COLLECTION}'
            # or '/catalog/collections/{USER}:{COLLECTION}/items'
            handler = self.manage_put_post_request
        elif http_method == "DELETE":
            deletion_allowed = await self.manage_delete_request(request)
            if not deletion_allowed:
                raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail="Deletion not allowed.")
        elif is_search:
            # URL: GET: '/catalog/search'
            handler = self.manage_search_request
        elif http_method == "GET" and path == CATALOG_COLLECTIONS:
            # override default pgstac limit of 10 items if not explicitly set
            if "limit" not in request.query_params:
                with_limit = {**request.query_params, "limit": 1000}
                request = self._override_request_query_string(request, with_limit)
        elif http_method == "PATCH":
            handler = self.manage_patch_request

        if handler is None:
            return request

        outcome = await handler(request)
        if hasattr(outcome, "status_code"):  # Unauthorized
            return cast(Response, outcome)
        return outcome

    async def manage_put_post_request(  # pylint: disable=too-many-statements,too-many-return-statements,too-many-branches  # noqa: E501
        self,
        request: Request,
    ) -> Request | JSONResponse:
        """Adapt the body of a POST / PUT request before it is routed to the STAC endpoint.

        Two kinds of requests are handled:
          * collection creation / update: the collection id is prefixed with the owner id, the
            "owner" field is filled when missing and the collection timestamps are set;
          * item creation / update (path containing "/items"): the item goes through the S3
            publication step, its timestamps are set and a default geometry / bbox is added
            when missing.

        The request body is only rewritten when the content actually changed.

        Args:
            request (Request): The Client request to be updated.

        Returns:
            Request | JSONResponse: The updated request, or the response object handed back by
                the S3 publication step (anything exposing ``status_code``).

        Raises:
            The exception built by ``log_http_exception``: 400 when zero or several collections
            are targeted, when a key is missing, or when the item to update has neither a
            "published" nor an "expires" property; 401 when, in cluster mode, the user is not the
            owner of the collection; 404 when the collection of the item does not exist.
        """
        try:
            original_content = await request.json()
            content = copy.deepcopy(original_content)

            check_user_authorization(self.request_ids)

            collection_ids = self.request_ids["collection_ids"]
            if len(collection_ids) > 1:
                raise log_http_exception(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update more than one collection !",
                )

            if not collection_ids:
                raise log_http_exception(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update -> no collection specified !",
                )

            collection = collection_ids[0]
            owner_id = self.request_ids["owner_id"]
            path = request.scope["path"]

            # POST collection (collections root) or PUT collection (owner-prefixed collection path)
            if path in (CATALOG_COLLECTIONS, f"{CATALOG_COLLECTIONS}/{owner_id}_{collection}"):
                # Manage a collection creation. The apikey user should be the same as the owner
                # field in the body request. In other words, an apikey user cannot create a
                # collection owned by another user.
                # We don't care for local mode, any user may create / delete collection owned by another user
                if common_settings.CLUSTER_MODE and (owner_id != self.request_ids["user_login"]):
                    error = (
                        f"The '{self.request_ids['user_login']}' user cannot create a "
                        f"collection owned by the '{owner_id}' user. Additionally, modifying the 'owner' "
                        "field is not permitted also."
                    )
                    raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail=error)

                content["id"] = owner_id_and_collection_id(owner_id, content["id"])
                if not content.get("owner"):
                    content["owner"] = owner_id

                # See if there is already a collection with this ID. If yes, retrieve its "created" value.
                try:
                    existing_collection = await self.client.get_collection(content["id"], request)
                    date_of_creation = existing_collection.get("created", "")
                except Exception as e:  # pylint: disable=broad-exception-caught
                    logger.debug("Collection %s doesn't exist and will be created: %s", content["id"], e)
                    date_of_creation = ""

                # Update timestamps ("updated", and "created" if it's a new collection)
                content = timestamps_extension.set_timestamps_to_collection(content, original_created=date_of_creation)
                logger.debug(f"Handling for collection {content['id']}")
                # TODO update the links also?

            # The following section handles the request to create/update an item
            elif "/items" in path:
                # first check if the collection exists
                if not await self._collection_exists(request, f"{owner_id}_{collection}"):
                    raise log_http_exception(
                        status_code=HTTP_404_NOT_FOUND,
                        detail=f"Collection {collection} does not exist.",
                    )

                # try to get the item if it is already part from the collection
                item = await self._get_item_from_collection(request)

                content, self.s3_files_to_be_deleted = self.s3_manager.update_stac_item_publication(
                    content,
                    request,
                    self.request_ids,
                    item,
                )

                # A response object must be handed back untouched. This check has to come
                # before the item processing below, which treats `content` as a dictionary.
                if hasattr(content, "status_code"):
                    return content

                if content:
                    if request.method == "POST":
                        content = timestamps_extension.set_timestamps_for_creation(content)
                        content = timestamps_extension.set_timestamps_for_insertion(content)
                    else:  # PUT
                        published = expires = ""
                        if item and item.get("properties"):
                            published = item["properties"].get("published", "")
                            expires = item["properties"].get("expires", "")
                        if not published and not expires:
                            raise log_http_exception(
                                status_code=HTTP_400_BAD_REQUEST,
                                detail=f"Item {content['id']} not found.",
                            )
                        content = timestamps_extension.set_timestamps_for_update(
                            content,
                            original_published=published,
                            original_expires=expires,
                        )
                    # If item doesn't contain a geometry/bbox, just fill with a default one.
                    if not content.get("geometry", None):
                        content["geometry"] = DEFAULT_GEOM
                    if not content.get("bbox", None):
                        content["bbox"] = DEFAULT_BBOX

            # update request body if needed
            if content != original_content:
                request = self._override_request_body(request, content)

            return request
        except KeyError as kerr_msg:
            raise log_http_exception(
                detail=f"Missing key in request body! {kerr_msg}",
                status_code=HTTP_400_BAD_REQUEST,
            ) from kerr_msg

    async def manage_delete_request(self, request: Request):
        """Check if the deletion is allowed and, if so, collect the S3 files to delete.

        In cluster mode the deletion is refused when ``get_authorisation`` denies the "write"
        access, or when the request targets a whole collection whose owner is not the calling
        user. In local mode every deletion is allowed.

        This method does not raise an HTTP error itself: a refusal is reported by returning False.

        Args:
            request (Request): The client request.

        Returns:
            bool: Return True if the deletion is allowed, False otherwise.
        """
        cluster_mode = common_settings.CLUSTER_MODE

        if cluster_mode:
            # Get the list of access and the user_login calling the endpoint.
            auth_roles = request.state.auth_roles
            user_login = request.state.user_login
        else:
            # The local user is only needed (and only resolved) in local mode: getpass.getuser()
            # may fail when the process user has no name, e.g. an arbitrary UID in a container.
            auth_roles = []
            user_login = getpass.getuser()

        collection_ids = self.request_ids["collection_ids"]
        owner_id = self.request_ids["owner_id"]

        # If we are in cluster mode and the user_login is not authorized to this endpoint,
        # the deletion is refused.
        if (
            cluster_mode
            and collection_ids
            and owner_id
            and not get_authorisation(
                collection_ids,
                auth_roles,
                "write",
                owner_id,
                user_login,
            )
        ):
            return False

        # Manage a collection deletion. The apikey user should be the same as the owner of the
        # collection. In other words, the apikey user cannot delete a collection owned by another user.
        # We don't care for local mode, any user may create / delete collection owned by another user.
        # `collection_ids` is tested first so that an empty list cannot raise an IndexError.
        if (
            cluster_mode
            and collection_ids
            and owner_id != user_login
            # DELETE collection
            and request.scope["path"] == f"{CATALOG_COLLECTIONS}/{owner_id}_{collection_ids[0]}"
        ):
            logger.error(
                f"The '{user_login}' user cannot delete a "
                f"collection owned by the '{owner_id}' user",
            )
            return False

        await self.build_filelist_to_be_deleted(request)
        return True

    async def manage_search_request(  # pylint: disable=too-many-statements,too-many-branches
        self,
        request: Request,
    ) -> Request | JSONResponse:
        """Pre-process a search request: resolve the owner and the owner-prefixed collection names.

        The owner id, when not already known, is taken from the filter, then from the "owner"
        field / query parameter, then from ``get_user``. Each requested collection that does not
        exist under its raw name is replaced by "<owner_id>_<name>". For POST requests the
        filter is pre-processed and a "filter-lang" entry is placed before it; the request body
        is rewritten whenever the content was modified.

        Args:
            request Request: the client request.

        Returns:
            Request: the new request with the collection names updated.

        Raises:
            The exception built by ``log_http_exception``: 404 when a collection does not exist,
            401 when, in cluster mode, the "read" access is denied.
        """
        # ---------- POST requests
        if request.method == "POST":
            content = await request.json()
            body_changed = False

            # Pre-processing of filter extensions
            if "filter" in content:
                content["filter"] = process_filter_extensions(content["filter"])

            # Management of priority for the assignation of the owner_id
            if not self.request_ids["owner_id"]:
                self.request_ids["owner_id"] = (
                    (extract_owner_name_from_json_filter(content["filter"]) if "filter" in content else None)
                    or content.get("owner")
                    or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
                )

            # Add filter-lang option to the content if it doesn't already exist
            if "filter" in content:
                filter_lang = {"filter-lang": content.get("filter-lang", "cql2-json")}
                stac_filter = content.pop("filter")
                content = {
                    **content,
                    **filter_lang,
                    "filter": stac_filter,
                }  # The "filter_lang" field has to be placed BEFORE the filter.
                body_changed = True

            # ----- Call /catalog/search with POST method endpoint
            if "collections" in content:
                await self._prefix_unknown_collections(request, content["collections"])
                self.request_ids["collection_ids"] = content["collections"]
                body_changed = True

            # Write the content back even when only the filter was modified, otherwise the
            # pre-processed filter and the "filter-lang" entry would be silently dropped.
            if body_changed:
                request = self._override_request_body(request, content)

        # ---------- GET requests
        elif request.method == "GET":
            # Get dictionary of query parameters
            query_params_dict = dict(request.query_params)

            # Update owner_id if it is not already defined from path parameters
            if not self.request_ids["owner_id"]:
                self.request_ids["owner_id"] = (
                    (
                        extract_owner_name_from_text_filter(query_params_dict["filter"])
                        if "filter" in query_params_dict
                        else ""
                    )
                    or query_params_dict.get("owner")
                    or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
                )

            # ----- Catch endpoint catalog/search + query parameters (e.g. /search?ids=S3_OLC&collections=titi)
            if "collections" in query_params_dict:
                coll_list = query_params_dict["collections"].split(",")
                await self._prefix_unknown_collections(request, coll_list)

                self.request_ids["collection_ids"] = coll_list
                query_params_dict["collections"] = ",".join(coll_list)
                request = self._override_request_query_string(request, query_params_dict)

        # Check that the collection from the request exists
        for collection in self.request_ids["collection_ids"]:
            if not await self._collection_exists(request, collection):
                raise log_http_exception(status_code=HTTP_404_NOT_FOUND, detail=f"Collection {collection} not found.")

        # Check authorisation in cluster mode
        if common_settings.CLUSTER_MODE and not get_authorisation(
            self.request_ids["collection_ids"],
            self.request_ids["auth_roles"],
            "read",
            self.request_ids["owner_id"],
            self.request_ids["user_login"],
            # When calling the /search endpoints, the catalog ids are always prefixed by their <owner>_
            owner_prefix=True,
        ):
            raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized access.")
        return request

    async def _prefix_unknown_collections(self, request: Request, collections: list) -> None:
        """Replace in place each collection name that does not exist by "<owner_id>_<name>".

        Args:
            request (Request): The request forwarded to the existence checks.
            collections (list): Collection names to resolve; modified in place.

        Raises:
            The exception built by ``log_http_exception`` (HTTP 404) when a collection exists
            neither under its raw name nor under its owner-prefixed name.
        """
        for i, collection in enumerate(collections):
            # A collection found under its raw name is kept as it is
            if await self._collection_exists(request, collection):
                continue
            collections[i] = f"{self.request_ids['owner_id']}_{collection}"
            logger.debug(f"Using collection name: {collections[i]}")
            # Check the existence of the collection after concatenation of owner_id
            if not await self._collection_exists(request, collections[i]):
                raise log_http_exception(
                    status_code=HTTP_404_NOT_FOUND,
                    detail=f"Collection {collection} not found.",
                )

    async def manage_patch_request(self, request: Request):
        """Pre-process a PATCH request sent to the Catalog.

        Runs the user authorization check, then refreshes the "updated" timestamp of the
        patched content (the field differs for an item and for a collection) and installs
        the result as the new request body.

        Args:
            request (Request): The request from the Client

        Returns:
            Request: Updated request

        Raises:
            The exception built by ``log_http_exception`` (HTTP 400) when a KeyError occurs.
        """
        try:
            # Work on a deep copy of the parsed body
            patch_body = copy.deepcopy(await request.json())

            check_user_authorization(self.request_ids)

            # A path containing "/items/" targets an item, anything else a collection
            targets_item = "/items/" in request.scope["path"]
            patch_body = timestamps_extension.set_updated_timestamp_to_now(patch_body, is_item=targets_item)

            return self._override_request_body(request, patch_body)

        except KeyError as kerr_msg:
            raise log_http_exception(
                detail=f"Missing key in request body! {kerr_msg}",
                status_code=HTTP_400_BAD_REQUEST,
            ) from kerr_msg

build_filelist_to_be_deleted(request) async

Build the list of the S3 files that will be deleted if the request is successful.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
async def build_filelist_to_be_deleted(self, request):
    """Collect the asset hrefs of the items targeted by a DELETE request.

    For each id in ``self.request_ids["collection_ids"]`` the matching items are fetched
    (every item of the collection, or only the requested item when the path contains
    "/items") and the "href" of each of their assets is appended to
    ``self.s3_files_to_be_deleted``. The method stops at the first lookup failure,
    keeping whatever was collected so far.

    Args:
        request: The DELETE request being pre-processed.
    """
    for ci in self.request_ids["collection_ids"]:
        collection_id = f"{self.request_ids['owner_id']}_{ci}"
        items = []
        try:
            if "/items" in request.scope["path"]:
                # delete endpoint /collections/<collection_name>/items/<item_name>: one item only
                items = [
                    await self.client.get_item(
                        item_id=self.request_ids["item_id"],
                        collection_id=collection_id,
                        request=request,
                    ),
                ]
            else:
                # delete endpoint /collections/<collection_name>: walk the pages 100 items at
                # a time, otherwise only the default limit (10) of items would be returned.
                # NOTE: Unable to use the pagination from pgstac client. Temporary, use a limit of 100
                next_token = None
                more_pages = True
                while more_pages:
                    page = await self.client.item_collection(
                        request=request,
                        collection_id=collection_id,
                        limit=100,
                        token=next_token,
                    )
                    items.extend(page.get("features", []))
                    # An empty token means the last page has been reached
                    next_token = get_token_for_pagination(page)
                    more_pages = bool(next_token)
        except NotFoundError as nfe:
            logger.error(f"Failed to find the requested object to be deleted. {nfe}")
            return
        except KeyError as e:
            logger.error(f"Failed to build the list of items to be deleted due to missing key: {e}")
            return

        logger.debug(f"Found {len(items)} items: {items}")
        try:
            for item in items:
                for asset_info in item.get("assets", {}).values():
                    s3_href = asset_info.get("href")
                    if s3_href:
                        self.s3_files_to_be_deleted.append(s3_href)
        except KeyError as e:
            logger.error(
                f"Failed to build the list of S3 files to be deleted due to missing key in dictionary: {e}",
            )
            return

        logger.info(
            "Successfully built the list of S3 files to be deleted. "
            f"There are {len(self.s3_files_to_be_deleted)} files to be deleted",
        )

manage_delete_request(request) async

Check if the deletion is allowed.

Parameters:

Name Type Description Default
request Request

The client request.

required

Raises:

Type Description
HTTPException

If the user is not authenticated.

Returns:

Name Type Description
bool

Return True if the deletion is allowed, False otherwise.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
    async def manage_delete_request(self, request: Request):
        """Check if the deletion is allowed and, if so, collect the S3 files to delete.

        In cluster mode the deletion is refused when ``get_authorisation`` denies the "write"
        access, or when the request targets a whole collection whose owner is not the calling
        user. In local mode every deletion is allowed.

        This method does not raise an HTTP error itself: a refusal is reported by returning False.

        Args:
            request (Request): The client request.

        Returns:
            bool: Return True if the deletion is allowed, False otherwise.
        """
        cluster_mode = common_settings.CLUSTER_MODE

        if cluster_mode:
            # Get the list of access and the user_login calling the endpoint.
            auth_roles = request.state.auth_roles
            user_login = request.state.user_login
        else:
            # The local user is only needed (and only resolved) in local mode: getpass.getuser()
            # may fail when the process user has no name, e.g. an arbitrary UID in a container.
            auth_roles = []
            user_login = getpass.getuser()

        collection_ids = self.request_ids["collection_ids"]
        owner_id = self.request_ids["owner_id"]

        # If we are in cluster mode and the user_login is not authorized to this endpoint,
        # the deletion is refused.
        if (
            cluster_mode
            and collection_ids
            and owner_id
            and not get_authorisation(
                collection_ids,
                auth_roles,
                "write",
                owner_id,
                user_login,
            )
        ):
            return False

        # Manage a collection deletion. The apikey user should be the same as the owner of the
        # collection. In other words, the apikey user cannot delete a collection owned by another user.
        # We don't care for local mode, any user may create / delete collection owned by another user.
        # `collection_ids` is tested first so that an empty list cannot raise an IndexError.
        if (
            cluster_mode
            and collection_ids
            and owner_id != user_login
            # DELETE collection
            and request.scope["path"] == f"{CATALOG_COLLECTIONS}/{owner_id}_{collection_ids[0]}"
        ):
            logger.error(
                f"The '{user_login}' user cannot delete a "
                f"collection owned by the '{owner_id}' user",
            )
            return False

        await self.build_filelist_to_be_deleted(request)
        return True

manage_patch_request(request) async

Pre-processing of a PATCH request to the Catalog. Does authorization checks and updates the "updated" field of the item to patch.

Parameters:

Name Type Description Default
request Request

The request from the Client

required

Returns:

Name Type Description
Request

Updated request

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
async def manage_patch_request(self, request: Request):
    """
    Prepare a PATCH request before it is forwarded to the Catalog.

    Runs the user authorization check on ``self.request_ids``, then stamps the
    "updated" timestamp on a copy of the body and re-injects that copy into the request.

    Args:
        request (Request): The request from the Client

    Returns:
        Request: The request carrying the updated body

    Raises:
        The exception built by ``log_http_exception`` (HTTP 400) when a ``KeyError``
        is raised while processing the request.
    """
    try:
        # Work on a deep copy so the parsed JSON held by the request is left untouched.
        patched_body = copy.deepcopy(await request.json())

        check_user_authorization(self.request_ids)

        # Items and collections do not store "updated" in the same field,
        # so tell the timestamps extension which kind of resource is patched.
        targets_item = "/items/" in request.scope["path"]
        patched_body = timestamps_extension.set_updated_timestamp_to_now(patched_body, is_item=targets_item)

        return self._override_request_body(request, patched_body)

    except KeyError as missing_key:
        raise log_http_exception(
            detail=f"Missing key in request body! {missing_key}",
            status_code=HTTP_400_BAD_REQUEST,
        ) from missing_key

manage_put_post_request(request) async

Adapt the request body for the STAC endpoint.

Parameters:

Name Type Description Default
request Request

The Client request to be updated.

required

Returns:

Name Type Description
Request Request | JSONResponse

The request updated.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
    async def manage_put_post_request(  # pylint: disable=too-many-statements,too-many-return-statements,too-many-branches  # noqa: E501
        self,
        request: Request,
    ) -> Request | JSONResponse:
        """Adapt the request body for the STAC endpoint.

        Args:
            request (Request): The Client request to be updated.

        Returns:
            Request: The request updated.
        """
        try:
            original_content = await request.json()
            content = copy.deepcopy(original_content)

            check_user_authorization(self.request_ids)

            if len(self.request_ids["collection_ids"]) > 1:
                raise log_http_exception(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update more than one collection !",
                )

            if len(self.request_ids["collection_ids"]) == 0:
                raise log_http_exception(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update -> no collection specified !",
                )

            collection = self.request_ids["collection_ids"][0]
            if (
                # POST collection
                request.scope["path"]
                == CATALOG_COLLECTIONS
            ) or (
                # PUT collection
                request.scope["path"]
                == f"{CATALOG_COLLECTIONS}/{self.request_ids['owner_id']}_{collection}"
            ):
                # Manage a collection creation. The apikey user should be the same as the owner
                # field in the body request. In other words, an apikey user cannot create a
                # collection owned by another user.
                # We don't care for local mode, any user may create / delete collection owned by another user
                if common_settings.CLUSTER_MODE and (self.request_ids["owner_id"] != self.request_ids["user_login"]):
                    error = f"The '{self.request_ids['user_login']}' user cannot create a \
collection owned by the '{self.request_ids['owner_id']}' user. Additionally, modifying the 'owner' \
field is not permitted also."
                    raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail=error)

                content["id"] = owner_id_and_collection_id(self.request_ids["owner_id"], content["id"])
                if not content.get("owner"):
                    content["owner"] = self.request_ids["owner_id"]

                # See if there is already a collection with this ID. If yes, retrieve its "created" value.
                try:
                    existing_collection = await self.client.get_collection(content["id"], request)
                    date_of_creation = existing_collection.get("created", "")
                except Exception as e:  # pylint: disable=broad-exception-caught
                    logger.debug("Collection %s doesn't exist and will be created: %s", content["id"], e)
                    date_of_creation = ""

                # Update timestamps ("updated", and "created" if it's a new collection)
                content = timestamps_extension.set_timestamps_to_collection(content, original_created=date_of_creation)
                logger.debug(f"Handling for collection {content['id']}")
                # TODO update the links also?

            # The following section handles the request to create/update an item
            elif "/items" in request.scope["path"]:
                # first check if the collection exists
                if not await self._collection_exists(request, f"{self.request_ids['owner_id']}_{collection}"):
                    raise log_http_exception(
                        status_code=HTTP_404_NOT_FOUND,
                        detail=f"Collection {collection} does not exist.",
                    )

                # try to get the item if it is already part from the collection
                item = await self._get_item_from_collection(request)

                content, self.s3_files_to_be_deleted = self.s3_manager.update_stac_item_publication(
                    content,
                    request,
                    self.request_ids,
                    item,
                )
                if content:
                    if request.method == "POST":
                        content = timestamps_extension.set_timestamps_for_creation(content)
                        content = timestamps_extension.set_timestamps_for_insertion(content)
                    else:  # PUT
                        published = expires = ""
                        if item and item.get("properties"):
                            published = item["properties"].get("published", "")
                            expires = item["properties"].get("expires", "")
                        if not published and not expires:
                            raise log_http_exception(
                                status_code=HTTP_400_BAD_REQUEST,
                                detail=f"Item {content['id']} not found.",
                            )
                        content = timestamps_extension.set_timestamps_for_update(
                            content,
                            original_published=published,
                            original_expires=expires,
                        )
                    # If item doesn't contain a geometry/bbox, just fill with a default one.
                    if not content.get("geometry", None):
                        content["geometry"] = DEFAULT_GEOM
                    if not content.get("bbox", None):
                        content["bbox"] = DEFAULT_BBOX
                if hasattr(content, "status_code"):
                    return content

            # update request body if needed
            if content != original_content:
                request = self._override_request_body(request, content)

            return request  # pylint: disable=protected-access
        except KeyError as kerr_msg:
            raise log_http_exception(
                detail=f"Missing key in request body! {kerr_msg}",
                status_code=HTTP_400_BAD_REQUEST,
            ) from kerr_msg

manage_requests(request) async

Main function to dispatch the request pre-processing depending on which endpoint is called. Will pre-process the request using the function associated to the path called and return it.

Parameters:

Name Type Description Default
request Request

request received by the Catalog.

required

Returns:

Type Description
Request | Response

Request|Response: Request processed to be sent to stac-fastapi OR a response if the operation is not authorized

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
async def manage_requests(self, request: Request) -> Request | Response:
    """Dispatch the pre-processing of a request according to its method and path.

    The handler matching the endpoint is awaited and its result is returned.

    Args:
        request (Request): request received by the Catalog.

    Returns:
        Request|Response: Request processed to be sent to stac-fastapi OR a response if the operation
            is not authorized
    """
    verb = request.method

    # DELETE has no body to rewrite: either the deletion is allowed or we stop here.
    if verb == "DELETE":
        if not await self.manage_delete_request(request):
            raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail="Deletion not allowed.")
        return request

    path = request.scope["path"]
    is_search = "/search" in path

    if verb in ("POST", "PUT") and not is_search:
        # URL: POST / PUT: '/catalog/collections/{USER}:{COLLECTION}'
        # or '/catalog/collections/{USER}:{COLLECTION}/items'
        outcome = await self.manage_put_post_request(request)
    elif is_search:
        # URL: GET: '/catalog/search'
        outcome = await self.manage_search_request(request)
    elif verb == "GET" and path == CATALOG_COLLECTIONS:
        # override default pgstac limit of 10 items if not explicitly set
        if "limit" in request.query_params:
            return request
        return self._override_request_query_string(request, {**request.query_params, "limit": 1000})
    elif verb == "PATCH":
        outcome = await self.manage_patch_request(request)
    else:
        # Nothing to pre-process for this endpoint.
        return request

    # A handler may return a response instead of a request (e.g. unauthorized):
    # it is recognised by its "status_code" attribute and returned as is.
    if hasattr(outcome, "status_code"):
        return cast(Response, outcome)
    return outcome

manage_search_request(request) async

Find the user in the filter parameter and add it as a prefix to the collection name.

Parameters:

Name Type Description Default
request Request

the client request.

required

Returns:

Name Type Description
Request Request | JSONResponse

the new request with the collection name updated.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
async def manage_search_request(  # pylint: disable=too-many-statements,too-many-branches
    self,
    request: Request,
) -> Request | JSONResponse:
    """Resolve the owner and the collection names targeted by a search request.

    The owner id (when not already set in ``self.request_ids``) is taken, by priority,
    from the filter, from the "owner" field / query parameter, then from ``get_user``.
    Each requested collection that does not exist under its raw name is renamed to
    ``<owner_id>_<collection>``.

    Args:
        request (Request): the client request.

    Returns:
        Request: the new request with the collection name updated.

    Raises:
        The exception built by ``log_http_exception``: 404 when a collection cannot be
        found, 401 when, in cluster mode, the read authorisation is refused.
    """

    async def _prefix_unknown_collections(incoming: Request, names: list[str]) -> None:
        """Replace in place every name that does not exist as is by its owner-prefixed form."""
        for position, raw_name in enumerate(names):
            if await self._collection_exists(incoming, raw_name):
                continue
            names[position] = f"{self.request_ids['owner_id']}_{raw_name}"
            logger.debug(f"Using collection name: {names[position]}")
            # Check the existence of the collection after concatenation of owner_id
            if not await self._collection_exists(incoming, names[position]):
                raise log_http_exception(
                    status_code=HTTP_404_NOT_FOUND,
                    detail=f"Collection {raw_name} not found.",
                )

    # ---------- POST requests
    if request.method == "POST":
        content = await request.json()

        # Pre-processing of filter extensions
        if "filter" in content:
            content["filter"] = process_filter_extensions(content["filter"])

        # Management of priority for the assignation of the owner_id
        if not self.request_ids["owner_id"]:
            owner_from_filter = extract_owner_name_from_json_filter(content["filter"]) if "filter" in content else None
            self.request_ids["owner_id"] = (
                owner_from_filter
                or content.get("owner")
                or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
            )

        # Add filter-lang option to the content if it doesn't already exist
        if "filter" in content:
            language = content.get("filter-lang", "cql2-json")
            stac_filter = content.pop("filter")
            # The "filter-lang" field has to be placed BEFORE the filter.
            content = {**content, "filter-lang": language, "filter": stac_filter}

        # ----- Call /catalog/search with POST method endpoint
        # Note: the request body is only overridden when "collections" is present.
        if "collections" in content:
            await _prefix_unknown_collections(request, content["collections"])
            self.request_ids["collection_ids"] = content["collections"]
            request = self._override_request_body(request, content)

    # ---------- GET requests
    elif request.method == "GET":
        # Get dictionary of query parameters
        params = dict(request.query_params)

        # Update owner_id if it is not already defined from path parameters
        if not self.request_ids["owner_id"]:
            owner_from_filter = extract_owner_name_from_text_filter(params["filter"]) if "filter" in params else ""
            self.request_ids["owner_id"] = (
                owner_from_filter
                or params.get("owner")
                or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
            )

        # ----- Catch endpoint catalog/search + query parameters (e.g. /search?ids=S3_OLC&collections=titi)
        if "collections" in params:
            requested = params["collections"].split(",")
            await _prefix_unknown_collections(request, requested)
            self.request_ids["collection_ids"] = requested
            params["collections"] = ",".join(requested)
            request = self._override_request_query_string(request, params)

    # Check that the collection from the request exists
    for known_id in self.request_ids["collection_ids"]:
        if await self._collection_exists(request, known_id):
            continue
        raise log_http_exception(status_code=HTTP_404_NOT_FOUND, detail=f"Collection {known_id} not found.")

    # Authorisation is only checked in cluster mode
    if not common_settings.CLUSTER_MODE:
        return request

    allowed = get_authorisation(
        self.request_ids["collection_ids"],
        self.request_ids["auth_roles"],
        "read",
        self.request_ids["owner_id"],
        self.request_ids["user_login"],
        # When calling the /search endpoints, the catalog ids are always prefixed by their <owner>_
        owner_prefix=True,
    )
    if not allowed:
        raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized access.")
    return request

log_http_exception(*args, **kwargs)

Log error and return an HTTP exception to be raised by the caller

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
59
60
61
def log_http_exception(*args, **kwargs) -> type[HTTPException]:
    """Log the error with this module's logger and return the HTTP exception for the caller to raise.

    All arguments are forwarded as is to ``utils2.log_http_exception``.
    """
    http_error = utils2.log_http_exception(logger, *args, **kwargs)
    return http_error