Skip to content

rs_server_catalog/middleware/request_manager.md

<< Back to index

Module to process the Requests sent by users to the Catalog before routing them to stac-fastapi.

CatalogRequestManager

Class to process the requests sent by users to the Catalog before routing them to stac-fastapi. Each type of request is handled by one of the class methods.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
class CatalogRequestManager:
    """Class to process the Requests sent by users to the Catalog before routing them to stac-fastapi.
    Each type of Response is managed in one of the functions."""

    def __init__(self, client: CoreCrudClient, request_ids: dict[Any, Any]):
        self.client = client
        self.request_ids = request_ids
        self.s3_files_to_be_deleted: list = []

    def s3_manager(self):
        """Return the S3Manager bound to this instance, creating it on first use.

        The manager is stored on the instance itself instead of in a ``functools.lru_cache``
        wrapped around the method: such a cache is keyed on ``self`` and keeps a strong
        reference to every instance it has seen, which delays the garbage collection of the
        request manager and of everything it references.

        Returns:
            S3Manager: The same object for every call made on this instance.
        """
        try:
            return self._s3_manager
        except AttributeError:
            # First call on this instance: build the manager once and remember it.
            self._s3_manager = S3Manager()  # pylint: disable=attribute-defined-outside-init
            return self._s3_manager

    def _override_request_body(self, request: Request, content: Any) -> Request:
        """Replace the body cached on the request with the given content.

        Two private attributes of the request are written: ``_body`` receives the UTF-8 encoded
        JSON serialization of ``content`` and ``_json`` receives ``content`` itself.

        Args:
            request (Request): The request to update (modified in place).
            content (Any): JSON-serializable object to use as the new body.

        Returns:
            Request: The same request object, updated.
        """
        # pylint: disable=protected-access
        serialized = json.dumps(content).encode("utf-8")
        request._body = serialized
        request._json = content
        logger.debug("new request body and json: %s", request._body)
        return request

    def _override_request_query_string(self, request: Request, query_params: dict) -> Request:
        """Replace the query string stored in the request scope.

        Args:
            request (Request): The request to update (its scope is modified in place).
            query_params (dict): Query parameters to encode. Sequence values are expanded into
                repeated keys (``doseq=True``).

        Returns:
            Request: The same request object, updated.
        """
        encoded_query = urlencode(query_params, doseq=True).encode("utf-8")
        request.scope["query_string"] = encoded_query
        logger.debug("new request query_string: %s", request.scope["query_string"])
        return request

    async def _collection_exists(self, request: Request, collection_id: str) -> bool:
        """Tell whether the client can return the given collection.

        Any exception raised by the client is logged as an error and reported as
        "collection does not exist".

        Args:
            request (Request): The request forwarded to the client.
            collection_id (str): Identifier of the collection to look for.

        Returns:
            bool: True if the collection exists, False otherwise
        """
        found = True
        try:
            await self.client.get_collection(collection_id, request)
        except Exception as error:  # pylint: disable=broad-exception-caught
            logger.error("Collection %s not found: %s", collection_id, error)
            found = False
        return found

    async def _get_item_from_collection(self, request: Request):
        """Fetch the item designated by the request identifiers.

        The item id is read from ``request_ids["item_id"]`` and the collection id is built as
        ``<owner_id>_<first collection id>``.

        Args:
            request (Request): The request object.

        Returns:
            Optional[Dict]: The item from the collection if found, else None.

        Raises:
            HTTPException: 400 if the client fails with anything other than NotFoundError.
        """
        item_id = self.request_ids["item_id"]
        owner_id = self.request_ids["owner_id"]
        first_collection = self.request_ids["collection_ids"][0]
        collection_id = f"{owner_id}_{first_collection}"
        try:
            return await self.client.get_item(item_id=item_id, collection_id=collection_id, request=request)
        except NotFoundError:
            # A missing item is an expected situation: report it and fall through to "None".
            logger.info(f"The item '{item_id}' does not exist in collection '{collection_id}'")
        except Exception as error:  # pylint: disable=broad-exception-caught
            logger.exception(f"Exception: {error}")
            raise HTTPException(
                status_code=HTTP_400_BAD_REQUEST,
                detail=f"Exception when trying to get the item {item_id} from the collection '{collection_id}'",
            ) from error
        return None

    async def build_filelist_to_be_deleted(self, request):
        """Build the list of the s3 files that will be deleted if the request is successful.

        For every collection id of the request, the items concerned by the deletion are read from
        the client and the "href" of each of their assets is appended to
        ``self.s3_files_to_be_deleted``. The method stops (without raising) as soon as an object
        is not found or a key is missing.

        Args:
            request (Request): The DELETE request being pre-processed.
        """
        for short_id in self.request_ids["collection_ids"]:
            collection_id = f"{self.request_ids['owner_id']}_{short_id}"
            items = []
            try:
                if "/items" in request.scope["path"]:
                    # Endpoint /collections/<collection_name>/items/<item_name>: a single item.
                    items = [
                        await self.client.get_item(
                            item_id=self.request_ids["item_id"],
                            collection_id=collection_id,
                            request=request,
                        ),
                    ]
                else:
                    # Endpoint /collections/<collection_name>: every item of the collection.
                    # Pages are requested one after the other, otherwise only the default
                    # limit (10) of items would be returned.
                    # NOTE: Unable to use the pagination from pgstac client. Temporary, use a limit of 100
                    token = None
                    more_pages = True
                    while more_pages:
                        page = await self.client.item_collection(
                            request=request,
                            collection_id=collection_id,
                            limit=100,
                            token=token,
                        )
                        items.extend(page.get("features", []))
                        # An empty token means that the last page has been read.
                        token = get_token_for_pagination(page)
                        more_pages = bool(token)
            except NotFoundError as not_found:
                logger.error(f"Failed to find the requested object to be deleted. {not_found}")
                return
            except KeyError as missing_key:
                logger.error(f"Failed to build the list of items to be deleted due to missing key: {missing_key}")
                return
            logger.debug(f"Found {len(items)} items: {items}")
            try:
                for item in items:
                    for asset_info in item.get("assets", {}).values():
                        if s3_href := asset_info.get("href"):
                            self.s3_files_to_be_deleted.append(s3_href)
            except KeyError as missing_key:
                logger.error(
                    f"Failed to build the list of S3 files to be deleted due to missing key in dictionary: {missing_key}",
                )
                return
            logger.info(
                "Successfully built the list of S3 files to be deleted. "
                f"There are {len(self.s3_files_to_be_deleted)} files to be deleted",
            )

    async def manage_requests(self, request: Request) -> Request | Response:
        """Main function to dispatch the request pre-processing depending on which endpoint is called.
        Will pre-process the request using the function associated to the path called and return it.

        Args:
            request (Request): request received by the Catalog.

        Returns:
            Request|Response: Request processed to be sent to stac-fastapi OR a response if the operation
                is not authorized
        """
        if request.method in ("POST", "PUT") and "/search" not in request.scope["path"]:
            # URL: POST / PUT: '/catalog/collections/{USER}:{COLLECTION}'
            # or '/catalog/collections/{USER}:{COLLECTION}/items'
            request_or_response = await self.manage_put_post_request(request)
            if hasattr(request_or_response, "status_code"):  # Unauthorized
                return cast(Response, request_or_response)
            request = request_or_response

        elif request.method == "DELETE":
            if not await self.manage_delete_request(request):
                raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Deletion not allowed.")

        elif "/search" in request.scope["path"]:
            # URL: GET: '/catalog/search'
            request_or_response = await self.manage_search_request(request)
            if hasattr(request_or_response, "status_code"):  # Unauthorized
                return cast(Response, request_or_response)
            request = request_or_response

        elif request.method == "GET" and request.scope["path"] == CATALOG_COLLECTIONS:
            # override default pgstac limit of 10 items if not explicitely set
            if "limit" not in request.query_params:
                request = self._override_request_query_string(request, {**request.query_params, "limit": 1000})

        elif request.method == "PATCH":
            request_or_response = await self.manage_patch_request(request)
            if hasattr(request_or_response, "status_code"):  # Unauthorized
                return cast(Response, request_or_response)
            request = request_or_response

        return request

    async def manage_put_post_request(  # pylint: disable=too-many-statements,too-many-return-statements,too-many-branches  # noqa: E501
        self,
        request: Request,
    ) -> Request | JSONResponse:
        """Adapt the body of a POST / PUT request before it is routed to the STAC endpoint.

        Two kinds of requests are handled:

        * creation / update of a collection: the collection id is rewritten with
          ``owner_id_and_collection_id``, the "owner" field is filled if missing and the
          timestamps are set;
        * creation / update of an item (path containing "/items"): the collection must exist,
          the item goes through ``S3Manager.update_stac_item_publication``, its timestamps are
          set and a default geometry / bbox is used when none is given.

        Args:
            request (Request): The Client request to be updated.

        Returns:
            Request | JSONResponse: The updated request, or the object returned by
                ``update_stac_item_publication`` when it carries a ``status_code``.

        Raises:
            HTTPException: 400 if zero or several collections are targeted, if a key is missing in
                the request body or if a PUT targets an item without "published" / "expires"
                values; 401 if, in cluster mode, the user is not the owner of the collection;
                404 if the collection of the item does not exist.
        """
        try:
            original_content = await request.json()
            content = copy.deepcopy(original_content)

            check_user_authorization(self.request_ids)

            collection_ids = self.request_ids["collection_ids"]
            if not collection_ids:
                raise HTTPException(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update -> no collection specified !",
                )
            if len(collection_ids) > 1:
                raise HTTPException(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update more than one collection !",
                )

            collection = collection_ids[0]
            owner_id = self.request_ids["owner_id"]
            path = request.scope["path"]

            # POST collection (first path) or PUT collection (second path)
            if path in (CATALOG_COLLECTIONS, f"{CATALOG_COLLECTIONS}/{owner_id}_{collection}"):
                # Manage a collection creation. The apikey user should be the same as the owner
                # field in the body request. In other words, an apikey user cannot create a
                # collection owned by another user.
                # We don't care for local mode, any user may create / delete collection owned by another user
                if common_settings.CLUSTER_MODE and (owner_id != self.request_ids["user_login"]):
                    error = (
                        f"The '{self.request_ids['user_login']}' user cannot create a "
                        f"collection owned by the '{owner_id}' user. Additionally, modifying the 'owner' "
                        "field is not permitted also."
                    )
                    raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=error)

                content["id"] = owner_id_and_collection_id(owner_id, content["id"])
                if not content.get("owner"):
                    content["owner"] = owner_id

                # See if there is already a collection with this ID. If yes, retrieve its "created" value.
                try:
                    existing_collection = await self.client.get_collection(content["id"], request)
                    date_of_creation = existing_collection.get("created", "")
                except Exception as e:  # pylint: disable=broad-exception-caught
                    logger.debug("Collection %s doesn't exist and will be created: %s", content["id"], e)
                    date_of_creation = ""

                # Update timestamps ("updated", and "created" if it's a new collection)
                content = timestamps_extension.set_timestamps_to_collection(content, original_created=date_of_creation)
                logger.debug(f"Handling for collection {content['id']}")
                # TODO update the links also?

            # The following section handles the request to create/update an item
            elif "/items" in path:
                # first check if the collection exists
                if not await self._collection_exists(request, f"{owner_id}_{collection}"):
                    raise HTTPException(
                        status_code=HTTP_404_NOT_FOUND,
                        detail=f"Collection {collection} does not exist.",
                    )

                # try to get the item if it is already part from the collection
                item = await self._get_item_from_collection(request)
                logger.debug("Starting the update_stac_item_publication thread")
                content, self.s3_files_to_be_deleted = await asyncio.to_thread(
                    self.s3_manager().update_stac_item_publication,
                    content,
                    request,
                    self.request_ids,
                    item,
                )
                logger.debug("The update_stac_item_publication thread finished")

                # A response-like result must be returned untouched. This check has to come BEFORE
                # the dict handling below, which would otherwise fail on (or alter) the response.
                if hasattr(content, "status_code"):
                    return content

                if content:
                    if request.method == "POST":
                        content = timestamps_extension.set_timestamps_for_creation(content)
                        content = timestamps_extension.set_timestamps_for_insertion(content)
                    else:  # PUT
                        published = expires = ""
                        if item and item.get("properties"):
                            published = item["properties"].get("published", "")
                            expires = item["properties"].get("expires", "")
                        if not published and not expires:
                            raise HTTPException(
                                status_code=HTTP_400_BAD_REQUEST,
                                detail=f"Item {content['id']} not found.",
                            )
                        content = timestamps_extension.set_timestamps_for_update(
                            content,
                            original_published=published,
                            original_expires=expires,
                        )
                    # If item doesn't contain a geometry/bbox, just fill with a default one.
                    if not content.get("geometry", None):
                        content["geometry"] = DEFAULT_GEOM
                    if not content.get("bbox", None):
                        content["bbox"] = DEFAULT_BBOX

            # update request body if needed
            if content != original_content:
                request = self._override_request_body(request, content)

            logger.debug(f"Sending back the response for {request.method} {request.scope['path']}")
            return request
        except KeyError as kerr_msg:
            raise HTTPException(
                detail=f"Missing key in request body! {kerr_msg}",
                status_code=HTTP_400_BAD_REQUEST,
            ) from kerr_msg

    async def manage_delete_request(self, request: Request) -> bool:
        """Check if the deletion is allowed and, if so, list the S3 files it will remove.

        In cluster mode the deletion is refused when ``get_authorisation`` denies the "write"
        access, or when the request deletes a whole collection whose owner is not the calling
        user. In local mode every deletion is allowed. When the deletion is allowed,
        ``build_filelist_to_be_deleted`` is called before returning.

        Args:
            request (Request): The client request.

        Returns:
            bool: Return True if the deletion is allowed, False otherwise.
        """
        cluster_mode = common_settings.CLUSTER_MODE
        if cluster_mode:
            # Get the list of access and the user_login calling the endpoint.
            auth_roles = request.state.auth_roles
            user_login = request.state.user_login
        else:
            # The local user is only looked up when it is really needed: getpass.getuser() may
            # fail when the process runs under a UID without an entry in the password database.
            auth_roles = []
            user_login = getpass.getuser()

        collection_ids = self.request_ids["collection_ids"]
        owner_id = self.request_ids["owner_id"]

        # Both checks only apply in cluster mode; without any collection id there is nothing to
        # check (and no first collection id to build the collection path from).
        if cluster_mode and collection_ids:
            # The user must have the "write" access on the collection(s).
            if owner_id and not get_authorisation(
                collection_ids,
                auth_roles,
                "write",
                owner_id,
                user_login,
            ):
                return False

            # Manage a collection deletion. The apikey user should be the same as the owner of
            # the collection. In other words, the apikey user cannot delete a collection owned
            # by another user.
            collection_path = f"{CATALOG_COLLECTIONS}/{owner_id}_{collection_ids[0]}"
            if request.scope["path"] == collection_path and owner_id != user_login:
                logger.error(
                    f"The '{user_login}' user cannot delete a collection owned by the '{owner_id}' user",
                )
                return False

        await self.build_filelist_to_be_deleted(request)
        return True

    async def manage_search_request(  # pylint: disable=too-many-statements,too-many-branches
        self,
        request: Request,
    ) -> Request | JSONResponse:
        """Pre-process a request sent to a search endpoint (POST body or GET query parameters).

        For both methods the function:
          * normalizes the "externalIds" filters and merges a dedicated "externalIds" filter
            with the filter already given, if any;
          * resolves ``request_ids["owner_id"]`` when it is not already set, with this priority:
            owner found in the filter, "owner" field / parameter, then ``get_user``;
          * replaces each collection name that does not exist as such by
            ``<owner_id>_<collection>`` and stores the resulting list in
            ``request_ids["collection_ids"]``;
          * rewrites the request body / query string only if something changed.

        Finally every collection of ``request_ids["collection_ids"]`` must exist and, in cluster
        mode, the "read" authorisation is checked.

        Args:
            request Request: the client request.

        Returns:
            Request: the new request with the collection name updated.

        Raises:
            HTTPException: 404 if a collection is not found, neither with its raw name nor with
                the owner prefix.
        """
        # NOTE(review): the return annotation mentions JSONResponse but every return statement
        # visible here returns the request; get_authorisation(raise_if_unauthorized=True)
        # presumably raises instead of returning a response - confirm.

        # ---------- POST requests
        if request.method == "POST":
            content = await request.json()
            # Snapshot used at the end of the branch to know if the body must be rewritten
            original_content = copy.deepcopy(content)

            # Normalize externalIds filters coming from UI (e.g., "=" -> a_overlaps).
            normalized_filter, normalized_lang, changed = normalize_external_ids_filter_value(
                content.get("filter"),
                content.get("filter-lang", "cql2-json"),
            )
            if changed:
                content["filter"] = normalized_filter
                content["filter-lang"] = normalized_lang

            # Build a CQL2 filter for externalIds (array of objects) if requested.
            # The "externalIds" key is removed from the body in the same operation.
            external_ids_filter = build_external_ids_filter(content.pop("externalIds", None))
            if external_ids_filter is not None:
                existing_filter = parse_filter_to_json(
                    content.get("filter"),
                    content.get("filter-lang", "cql2-json"),
                )
                content["filter"] = combine_filters(existing_filter, external_ids_filter)
                content["filter-lang"] = "cql2-json"

            # Pre-processing of filter extensions
            # NOTE(review): this step has no counterpart in the GET branch below - confirm
            # that it is intended.
            if "filter" in content:
                content["filter"] = process_filter_extensions(content["filter"])

            # Management of priority for the assignation of the owner_id:
            # owner found in the filter > "owner" field of the body > get_user()
            if not self.request_ids["owner_id"]:
                self.request_ids["owner_id"] = (
                    (extract_owner_name_from_json_filter(content["filter"]) if "filter" in content else None)
                    or content.get("owner")
                    or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
                )

            # Ensure normalized filters are serialized in request body.
            # Add filter-lang option to the content if it doesn't already exist
            if "filter" in content:
                filter_lang = {"filter-lang": content.get("filter-lang", "cql2-json")}
                # "filter" is popped then re-inserted last so that it comes after "filter-lang"
                stac_filter = content.pop("filter")
                content = {
                    **content,
                    **filter_lang,
                    "filter": stac_filter,
                }  # The "filter_lang" field has to be placed BEFORE the filter.

            # ----- Call /catalog/search with POST method endpoint
            if "collections" in content:
                # Check if each collection exist with their raw name, if not concatenate owner_id to the collection name
                for i, collection in enumerate(content["collections"]):
                    if not await self._collection_exists(request, collection):
                        content["collections"][i] = f"{self.request_ids['owner_id']}_{collection}"
                        logger.debug(f"Using collection name: {content['collections'][i]}")
                        # Check the existence of the collection after concatenation of owner_id
                        if not await self._collection_exists(request, content["collections"][i]):
                            raise HTTPException(
                                status_code=HTTP_404_NOT_FOUND,
                                detail=f"Collection {collection} not found.",
                            )

                # The resolved names replace the collection ids known for this request
                self.request_ids["collection_ids"] = content["collections"]
            # Rewrite the body only if the pre-processing changed something
            if content != original_content:
                request = self._override_request_body(request, content)

        # ---------- GET requests
        elif request.method == "GET":
            # Get dictionary of query parameters
            query_params_dict = dict(request.query_params)
            # Snapshot used at the end of the branch to know if the query string must be rewritten
            original_query_params = dict(query_params_dict)

            # Update owner_id if it is not already defined from path parameters:
            # owner found in the text filter > "owner" parameter > get_user()
            if not self.request_ids["owner_id"]:
                self.request_ids["owner_id"] = (
                    (
                        extract_owner_name_from_text_filter(query_params_dict["filter"])
                        if "filter" in query_params_dict
                        else ""
                    )
                    or query_params_dict.get("owner")
                    or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
                )

            # Normalize externalIds filters coming from UI (e.g., "=" -> a_overlaps).
            normalized_filter, normalized_lang, changed = normalize_external_ids_filter_value(
                query_params_dict.get("filter"),
                query_params_dict.get("filter-lang", "cql2-json"),
            )
            if changed:
                # Query parameters are strings: the normalized filter is serialized to JSON
                query_params_dict["filter"] = json.dumps(normalized_filter)
                query_params_dict["filter-lang"] = normalized_lang

            # Build a CQL2 filter for externalIds (array of objects) if requested.
            # The "externalIds" parameter is removed from the query in the same operation.
            external_ids_filter = build_external_ids_filter(query_params_dict.pop("externalIds", None))
            if external_ids_filter is not None:
                existing_filter = parse_filter_to_json(
                    query_params_dict.get("filter"),
                    query_params_dict.get("filter-lang", "cql2-json"),
                )
                combined_filter = combine_filters(existing_filter, external_ids_filter)
                query_params_dict["filter"] = json.dumps(combined_filter)
                query_params_dict["filter-lang"] = "cql2-json"

            # ----- Catch endpoint catalog/search + query parameters (e.g. /search?ids=S3_OLC&collections=titi)
            if "collections" in query_params_dict:
                # The collections are given as a comma-separated list
                coll_list = query_params_dict["collections"].split(",")

                # Check if each collection exist with their raw name, if not concatenate owner_id to the collection name
                for i, collection in enumerate(coll_list):
                    if not await self._collection_exists(request, collection):
                        coll_list[i] = f"{self.request_ids['owner_id']}_{collection}"
                        logger.debug(f"Using collection name: {coll_list[i]}")
                        # Check the existence of the collection after concatenation of owner_id
                        if not await self._collection_exists(request, coll_list[i]):
                            raise HTTPException(
                                status_code=HTTP_404_NOT_FOUND,
                                detail=f"Collection {collection} not found.",
                            )

                # The resolved names replace the collection ids known for this request
                self.request_ids["collection_ids"] = coll_list
                query_params_dict["collections"] = ",".join(coll_list)
            # Rewrite the query string only if the pre-processing changed something
            if query_params_dict != original_query_params:
                request = self._override_request_query_string(request, query_params_dict)

        # Check that the collection from the request exists
        # (runs for every method; the ids may have been replaced by one of the branches above)
        for collection in self.request_ids["collection_ids"]:
            if not await self._collection_exists(request, collection):
                raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Collection {collection} not found.")

        # Check authorisation in cluster mode
        if common_settings.CLUSTER_MODE:
            get_authorisation(
                self.request_ids["collection_ids"],
                self.request_ids["auth_roles"],
                "read",
                self.request_ids["owner_id"],
                self.request_ids["user_login"],
                # When calling the /search endpoints, the catalog ids are always prefixed by their <owner>_
                owner_prefix=True,
                raise_if_unauthorized=True,
            )
        return request

    async def manage_patch_request(self, request: Request):
        """Pre-process a PATCH request sent to the Catalog.

        Runs the user authorization check, then sets the "updated" timestamp of the patched
        object (the field differs for an item and for a collection) and rewrites the body.

        Args:
            request (Request): The request from the Client

        Returns:
            Request: Updated request

        Raises:
            HTTPException: 400 if a key is missing while processing the request.
        """
        try:
            patch_body = copy.deepcopy(await request.json())

            check_user_authorization(self.request_ids)

            # An item is targeted when the path goes through ".../items/<item id>"
            targets_item = "/items/" in request.scope["path"]
            stamped_body = timestamps_extension.set_updated_timestamp_to_now(patch_body, is_item=targets_item)

            return self._override_request_body(request, stamped_body)
        except KeyError as kerr_msg:
            raise HTTPException(
                status_code=HTTP_400_BAD_REQUEST,
                detail=f"Missing key in request body! {kerr_msg}",
            ) from kerr_msg

build_filelist_to_be_deleted(request) async

Build the list of the S3 files that will be deleted if the request is successful.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
async def build_filelist_to_be_deleted(self, request):
    """Build the list of the s3 files that will be deleted if the request is successful.

    For each collection id of the current request, the items concerned by the deletion
    are fetched and the "href" of each of their assets is appended to
    ``self.s3_files_to_be_deleted``. The method returns as soon as a lookup fails
    (object not found or missing key); the failure is only logged.
    """
    for short_id in self.request_ids["collection_ids"]:
        collection_id = f"{self.request_ids['owner_id']}_{short_id}"
        found = []
        try:
            if "/items" in request.scope["path"]:
                # DELETE /collections/<collection_name>/items/<item_name>: a single item
                found = [
                    await self.client.get_item(
                        item_id=self.request_ids["item_id"],
                        collection_id=collection_id,
                        request=request,
                    ),
                ]
            else:
                # DELETE /collections/<collection_name>: every item of the collection.
                # Pages are requested explicitly, otherwise only the default limit (10)
                # of items is returned.
                # NOTE: Unable to use the pagination from pgstac client. Temporary, use a limit of 100
                page_token = None
                more_pages = True
                while more_pages:
                    page = await self.client.item_collection(
                        request=request,
                        collection_id=collection_id,
                        limit=100,
                        token=page_token,
                    )
                    found.extend(page.get("features", []))
                    # A falsy token means that the last page was reached
                    page_token = get_token_for_pagination(page)
                    more_pages = bool(page_token)
        except NotFoundError as nfe:
            logger.error(f"Failed to find the requested object to be deleted. {nfe}")
            return
        except KeyError as e:
            logger.error(f"Failed to build the list of items to be deleted due to missing key: {e}")
            return
        logger.debug(f"Found {len(found)} items: {found}")
        try:
            for entry in found:
                for asset in entry.get("assets", {}).values():
                    href = asset.get("href")
                    if href:
                        self.s3_files_to_be_deleted.append(href)
        except KeyError as e:
            logger.error(
                f"Failed to build the list of S3 files to be deleted due to missing key in dictionary: {e}",
            )
            return
        logger.info(
            "Successfully built the list of S3 files to be deleted. "
            f"There are {len(self.s3_files_to_be_deleted)} files to be deleted",
        )

manage_delete_request(request) async

Check if the deletion is allowed.

Parameters:

Name Type Description Default
request Request

The client request.

required

Raises:

Type Description
HTTPException

If the user is not authenticated.

Returns:

Name Type Description
bool

Return True if the deletion is allowed, False otherwise.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
    async def manage_delete_request(self, request: Request):
        """Check if the deletion is allowed and, if so, collect the S3 files to delete.

        Args:
            request (Request): The client request.

        Returns:
            bool: Return True if the deletion is allowed, False otherwise. A refusal is
                reported through the return value: this method does not itself raise
                an HTTPException.
        """
        user_login = getpass.getuser()
        auth_roles = []

        if common_settings.CLUSTER_MODE:  # Get the list of access and the user_login calling the endpoint.
            auth_roles = request.state.auth_roles
            user_login = request.state.user_login

        owner_id = self.request_ids["owner_id"]
        collection_ids = self.request_ids["collection_ids"]

        # In cluster mode, the user needs the "write" authorisation on the targeted collections.
        if (
            common_settings.CLUSTER_MODE
            and collection_ids
            and owner_id
            and not get_authorisation(
                collection_ids,
                auth_roles,
                "write",
                owner_id,
                user_login,
            )
        ):
            return False

        # Manage a collection deletion. The apikey user (or local user if in local mode)
        # should be the same as the owner field in the body request. In other words, the
        # apikey user cannot delete a collection owned by another user
        # we don't care for local mode, any user may create / delete collection owned by another user
        # The list of collection ids may be empty: only index it once we know it is not,
        # otherwise the path comparison below would raise an IndexError.
        if (
            common_settings.CLUSTER_MODE
            and collection_ids
            and request.scope["path"] == f"{CATALOG_COLLECTIONS}/{owner_id}_{collection_ids[0]}"  # DELETE collection
            and owner_id != user_login
        ):
            logger.error(
                f"The '{user_login}' user cannot delete a "
                f"collection owned by the '{owner_id}' user",
            )
            return False

        await self.build_filelist_to_be_deleted(request)
        return True

manage_patch_request(request) async

Pre-processing of a PATCH request to the Catalog. Does authorization checks and updates the "updated" field of the item to patch.

Parameters:

Name Type Description Default
request Request

The request from the Client

required

Returns:

Name Type Description
Request

Updated request

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
async def manage_patch_request(self, request: Request):
    """
    Pre-process a PATCH request before it is forwarded by the Catalog.
    Runs the user authorization check, then refreshes the "updated" timestamp
    of the item or collection described by the request body.

    Args:
        request (Request): The request from the Client

    Returns:
        Request: The request, with its body replaced by the updated content

    Raises:
        HTTPException: 400 when a KeyError occurs while processing the body.
    """
    try:
        patch_body = copy.deepcopy(await request.json())
        check_user_authorization(self.request_ids)

        # Items and collections do not keep the "updated" timestamp in the same field
        patch_body = timestamps_extension.set_updated_timestamp_to_now(
            patch_body,
            is_item="/items/" in request.scope["path"],
        )

        return self._override_request_body(request, patch_body)
    except KeyError as missing_key:
        raise HTTPException(
            detail=f"Missing key in request body! {missing_key}",
            status_code=HTTP_400_BAD_REQUEST,
        ) from missing_key

manage_put_post_request(request) async

Adapt the request body for the STAC endpoint.

Parameters:

Name Type Description Default
request Request

The Client request to be updated.

required

Returns:

Name Type Description
Request Request | JSONResponse

The request updated.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
    async def manage_put_post_request(  # pylint: disable=too-many-statements,too-many-return-statements,too-many-branches  # noqa: E501
        self,
        request: Request,
    ) -> Request | JSONResponse:
        """Adapt the request body for the STAC endpoint.

        Args:
            request (Request): The Client request to be updated.

        Returns:
            Request: The request updated.
        """
        try:
            original_content = await request.json()
            content = copy.deepcopy(original_content)

            check_user_authorization(self.request_ids)

            if len(self.request_ids["collection_ids"]) > 1:
                raise HTTPException(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update more than one collection !",
                )

            if len(self.request_ids["collection_ids"]) == 0:
                raise HTTPException(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update -> no collection specified !",
                )

            collection = self.request_ids["collection_ids"][0]
            if (
                # POST collection
                request.scope["path"]
                == CATALOG_COLLECTIONS
            ) or (
                # PUT collection
                request.scope["path"]
                == f"{CATALOG_COLLECTIONS}/{self.request_ids['owner_id']}_{collection}"
            ):
                # Manage a collection creation. The apikey user should be the same as the owner
                # field in the body request. In other words, an apikey user cannot create a
                # collection owned by another user.
                # We don't care for local mode, any user may create / delete collection owned by another user
                if common_settings.CLUSTER_MODE and (self.request_ids["owner_id"] != self.request_ids["user_login"]):
                    error = f"The '{self.request_ids['user_login']}' user cannot create a \
collection owned by the '{self.request_ids['owner_id']}' user. Additionally, modifying the 'owner' \
field is not permitted also."
                    raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=error)

                content["id"] = owner_id_and_collection_id(self.request_ids["owner_id"], content["id"])
                if not content.get("owner"):
                    content["owner"] = self.request_ids["owner_id"]

                # See if there is already a collection with this ID. If yes, retrieve its "created" value.
                try:
                    existing_collection = await self.client.get_collection(content["id"], request)
                    date_of_creation = existing_collection.get("created", "")
                except Exception as e:  # pylint: disable=broad-exception-caught
                    logger.debug("Collection %s doesn't exist and will be created: %s", content["id"], e)
                    date_of_creation = ""

                # Update timestamps ("updated", and "created" if it's a new collection)
                content = timestamps_extension.set_timestamps_to_collection(content, original_created=date_of_creation)
                logger.debug(f"Handling for collection {content['id']}")
                # TODO update the links also?

            # The following section handles the request to create/update an item
            elif "/items" in request.scope["path"]:
                # first check if the collection exists
                if not await self._collection_exists(request, f"{self.request_ids['owner_id']}_{collection}"):
                    raise HTTPException(
                        status_code=HTTP_404_NOT_FOUND,
                        detail=f"Collection {collection} does not exist.",
                    )

                # try to get the item if it is already part from the collection
                item = await self._get_item_from_collection(request)
                logger.debug("Starting the update_stac_item_publication thread")
                content, self.s3_files_to_be_deleted = await asyncio.to_thread(
                    self.s3_manager().update_stac_item_publication,
                    content,
                    request,
                    self.request_ids,
                    item,
                )
                logger.debug("The update_stac_item_publication thread finished")
                if content:
                    if request.method == "POST":
                        content = timestamps_extension.set_timestamps_for_creation(content)
                        content = timestamps_extension.set_timestamps_for_insertion(content)
                    else:  # PUT
                        published = expires = ""
                        if item and item.get("properties"):
                            published = item["properties"].get("published", "")
                            expires = item["properties"].get("expires", "")
                        if not published and not expires:
                            raise HTTPException(
                                status_code=HTTP_400_BAD_REQUEST,
                                detail=f"Item {content['id']} not found.",
                            )
                        content = timestamps_extension.set_timestamps_for_update(
                            content,
                            original_published=published,
                            original_expires=expires,
                        )
                    # If item doesn't contain a geometry/bbox, just fill with a default one.
                    if not content.get("geometry", None):
                        content["geometry"] = DEFAULT_GEOM
                    if not content.get("bbox", None):
                        content["bbox"] = DEFAULT_BBOX
                if hasattr(content, "status_code"):
                    return content

            # update request body if needed
            if content != original_content:
                request = self._override_request_body(request, content)

            logger.debug(f"Sending back the response for {request.method} {request.scope['path']}")
            return request  # pylint: disable=protected-access
        except KeyError as kerr_msg:
            raise HTTPException(
                detail=f"Missing key in request body! {kerr_msg}",
                status_code=HTTP_400_BAD_REQUEST,
            ) from kerr_msg

manage_requests(request) async

Main function to dispatch the request pre-processing depending on which endpoint is called. Will pre-process the request using the function associated to the path called and return it.

Parameters:

Name Type Description Default
request Request

request received by the Catalog.

required

Returns:

Type Description
Request | Response

Request|Response: Request processed to be sent to stac-fastapi OR a response if the operation is not authorized

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
async def manage_requests(self, request: Request) -> Request | Response:
    """Dispatch the pre-processing of a request according to the endpoint called.

    The request is handed to the pre-processing function matching its method and path,
    and the outcome of that function is returned.

    Args:
        request (Request): request received by the Catalog.

    Returns:
        Request|Response: Request processed to be sent to stac-fastapi OR a response if the operation
            is not authorized

    Raises:
        HTTPException: 401 if a DELETE request is not allowed.
    """
    method = request.method
    handler = None

    if method in ("POST", "PUT") and "/search" not in request.scope["path"]:
        # URL: POST / PUT: '/catalog/collections/{USER}:{COLLECTION}'
        # or '/catalog/collections/{USER}:{COLLECTION}/items'
        handler = self.manage_put_post_request
    elif method == "DELETE":
        allowed = await self.manage_delete_request(request)
        if not allowed:
            raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Deletion not allowed.")
    elif "/search" in request.scope["path"]:
        # URL: GET: '/catalog/search'
        handler = self.manage_search_request
    elif method == "GET" and request.scope["path"] == CATALOG_COLLECTIONS:
        # override default pgstac limit of 10 items if not explicitely set
        if "limit" not in request.query_params:
            request = self._override_request_query_string(request, {**request.query_params, "limit": 1000})
    elif method == "PATCH":
        handler = self.manage_patch_request

    if handler is None:
        # Nothing else to pre-process for this endpoint
        return request

    request_or_response = await handler(request)
    if hasattr(request_or_response, "status_code"):  # Unauthorized
        return cast(Response, request_or_response)
    return request_or_response

manage_search_request(request) async

Find the user in the filter parameter and add it to the collection name.

Parameters:

Name Type Description Default
request Request

the client request.

required

Returns:

Name Type Description
Request Request | JSONResponse

the new request with the collection name updated.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
async def manage_search_request(  # pylint: disable=too-many-statements,too-many-branches
    self,
    request: Request,
) -> Request | JSONResponse:
    """Pre-process a /search request, sent either as a POST body or as a GET query string.

    The externalIds filters are normalized, the owner id is determined when it is not
    already set, the collection names that do not exist under their raw name are prefixed
    with "<owner_id>_", and the read authorisation is checked in cluster mode.
    The request is overridden only when its body / query string was modified.

    Args:
        request (Request): the client request.

    Returns:
        Request: the new request with the collection name updated.

    Raises:
        HTTPException: 404 if one of the requested collections is not found.
    """
    # ---------- POST requests
    if request.method == "POST":
        content = await request.json()
        # Untouched copy, used at the end to detect whether the body must be overridden
        original_content = copy.deepcopy(content)

        # Normalize externalIds filters coming from UI (e.g., "=" -> a_overlaps).
        normalized_filter, normalized_lang, changed = normalize_external_ids_filter_value(
            content.get("filter"),
            content.get("filter-lang", "cql2-json"),
        )
        if changed:
            content["filter"] = normalized_filter
            content["filter-lang"] = normalized_lang

        # Build a CQL2 filter for externalIds (array of objects) if requested.
        # The "externalIds" key is removed from the body and merged into the filter.
        external_ids_filter = build_external_ids_filter(content.pop("externalIds", None))
        if external_ids_filter is not None:
            existing_filter = parse_filter_to_json(
                content.get("filter"),
                content.get("filter-lang", "cql2-json"),
            )
            content["filter"] = combine_filters(existing_filter, external_ids_filter)
            content["filter-lang"] = "cql2-json"

        # Pre-processing of filter extensions
        if "filter" in content:
            content["filter"] = process_filter_extensions(content["filter"])

        # Management of priority for the assignation of the owner_id:
        # owner found in the filter, then "owner" field of the body, then get_user(...)
        if not self.request_ids["owner_id"]:
            self.request_ids["owner_id"] = (
                (extract_owner_name_from_json_filter(content["filter"]) if "filter" in content else None)
                or content.get("owner")
                or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
            )

        # Ensure normalized filters are serialized in request body.
        # Add filter-lang option to the content if it doesn't already exist
        if "filter" in content:
            filter_lang = {"filter-lang": content.get("filter-lang", "cql2-json")}
            stac_filter = content.pop("filter")
            content = {
                **content,
                **filter_lang,
                "filter": stac_filter,
            }  # The "filter_lang" field has to be placed BEFORE the filter.

        # ----- Call /catalog/search with POST method endpoint
        if "collections" in content:
            # Check if each collection exist with their raw name, if not concatenate owner_id to the collection name
            for i, collection in enumerate(content["collections"]):
                if not await self._collection_exists(request, collection):
                    content["collections"][i] = f"{self.request_ids['owner_id']}_{collection}"
                    logger.debug(f"Using collection name: {content['collections'][i]}")
                    # Check the existence of the collection after concatenation of owner_id
                    if not await self._collection_exists(request, content["collections"][i]):
                        raise HTTPException(
                            status_code=HTTP_404_NOT_FOUND,
                            detail=f"Collection {collection} not found.",
                        )

            self.request_ids["collection_ids"] = content["collections"]
        # Override the request body only if something was changed above
        if content != original_content:
            request = self._override_request_body(request, content)

    # ---------- GET requests
    elif request.method == "GET":
        # Get dictionary of query parameters
        query_params_dict = dict(request.query_params)
        # Untouched copy, used at the end to detect whether the query string must be overridden
        original_query_params = dict(query_params_dict)

        # Update owner_id if it is not already defined from path parameters:
        # owner found in the filter, then "owner" query parameter, then get_user(...)
        if not self.request_ids["owner_id"]:
            self.request_ids["owner_id"] = (
                (
                    extract_owner_name_from_text_filter(query_params_dict["filter"])
                    if "filter" in query_params_dict
                    else ""
                )
                or query_params_dict.get("owner")
                or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
            )

        # Normalize externalIds filters coming from UI (e.g., "=" -> a_overlaps).
        normalized_filter, normalized_lang, changed = normalize_external_ids_filter_value(
            query_params_dict.get("filter"),
            query_params_dict.get("filter-lang", "cql2-json"),
        )
        if changed:
            # Query parameters are strings: the normalized filter is serialized to JSON
            query_params_dict["filter"] = json.dumps(normalized_filter)
            query_params_dict["filter-lang"] = normalized_lang

        # Build a CQL2 filter for externalIds (array of objects) if requested.
        # The "externalIds" parameter is removed and merged into the filter.
        external_ids_filter = build_external_ids_filter(query_params_dict.pop("externalIds", None))
        if external_ids_filter is not None:
            existing_filter = parse_filter_to_json(
                query_params_dict.get("filter"),
                query_params_dict.get("filter-lang", "cql2-json"),
            )
            combined_filter = combine_filters(existing_filter, external_ids_filter)
            query_params_dict["filter"] = json.dumps(combined_filter)
            query_params_dict["filter-lang"] = "cql2-json"

        # ----- Catch endpoint catalog/search + query parameters (e.g. /search?ids=S3_OLC&collections=titi)
        if "collections" in query_params_dict:
            coll_list = query_params_dict["collections"].split(",")

            # Check if each collection exist with their raw name, if not concatenate owner_id to the collection name
            for i, collection in enumerate(coll_list):
                if not await self._collection_exists(request, collection):
                    coll_list[i] = f"{self.request_ids['owner_id']}_{collection}"
                    logger.debug(f"Using collection name: {coll_list[i]}")
                    # Check the existence of the collection after concatenation of owner_id
                    if not await self._collection_exists(request, coll_list[i]):
                        raise HTTPException(
                            status_code=HTTP_404_NOT_FOUND,
                            detail=f"Collection {collection} not found.",
                        )

            self.request_ids["collection_ids"] = coll_list
            query_params_dict["collections"] = ",".join(coll_list)
        # Override the query string only if something was changed above
        if query_params_dict != original_query_params:
            request = self._override_request_query_string(request, query_params_dict)

    # Requests with another method skip both branches above and only go through the checks below.

    # Check that the collection from the request exists
    for collection in self.request_ids["collection_ids"]:
        if not await self._collection_exists(request, collection):
            raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Collection {collection} not found.")

    # Check authorisation in cluster mode
    if common_settings.CLUSTER_MODE:
        get_authorisation(
            self.request_ids["collection_ids"],
            self.request_ids["auth_roles"],
            "read",
            self.request_ids["owner_id"],
            self.request_ids["user_login"],
            # When calling the /search endpoints, the catalog ids are always prefixed by their <owner>_
            owner_prefix=True,
            raise_if_unauthorized=True,
        )
    return request

s3_manager() cached

Creates a cached instance of S3Manager for this class instance (self).

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
215
216
217
218
@lru_cache
def s3_manager(self):
    """Return the S3Manager of this class instance (self), memoized through ``lru_cache``."""
    # NOTE(review): lru_cache on a method uses ``self`` as cache key, so the cache keeps a
    # reference to every instance it holds - confirm that this retention is acceptable.
    manager = S3Manager()
    return manager

build_external_ids_filter(raw)

Create a CQL2 filter (a_overlaps) for externalIds tokens.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
101
102
103
104
105
106
107
def build_external_ids_filter(raw: Any) -> dict | None:
    """Build the CQL2 ``a_overlaps`` filter matching the externalIds tokens of ``raw``.

    Returns None when no token can be extracted from ``raw``.
    """
    tokens = build_external_ids_tokens(raw)
    # pgstac expects array overlap when querying token arrays.
    return {"op": "a_overlaps", "args": [{"property": "externalIds"}, tokens]} if tokens else None

build_external_ids_tokens(raw)

Normalize externalIds values into tokens used by pgstac filtering.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def build_external_ids_tokens(raw: Any) -> list[str]:
    """Turn externalIds values into the de-duplicated tokens used by pgstac filtering.

    The tokens are returned in order of first appearance.
    """
    # A dict keeps the insertion order and drops the duplicates at the same time.
    unique: dict[str, None] = {}
    for part in iter_external_id_parts(raw):
        if ":" not in part:
            # No scheme provided, keep raw value.
            candidate = part
        else:
            scheme, _, value = part.partition(":")
            scheme, value = scheme.strip(), value.strip()
            # Keep scheme:value, scheme-only, or value-only depending on input form.
            # When both sides are empty the candidate is empty and gets skipped below.
            candidate = f"{scheme}:{value}" if scheme and value else (scheme or value)
        if candidate:
            unique.setdefault(candidate, None)
    return list(unique)

combine_filters(existing, extra)

Combine two CQL2 filters with AND.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
141
142
143
144
145
def combine_filters(existing: dict | None, extra: dict) -> dict:
    """Combine two CQL2 filters with AND."""
    if existing is None:
        return extra
    return {"op": "and", "args": [existing, extra]}

filter_has_external_ids(filter_json)

Check if a CQL2 filter tree references the externalIds property.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
148
149
150
151
152
153
154
155
156
157
158
def filter_has_external_ids(filter_json: Any) -> bool:
    """Tell whether the externalIds property is referenced anywhere in a CQL2 filter tree."""
    if isinstance(filter_json, dict):
        if filter_json.get("property") == "externalIds":
            return True
        # Nested operations/arguments are found among the values of the node.
        children = filter_json.values()
    elif isinstance(filter_json, list):
        # Lists can hold nested filter nodes.
        children = filter_json
    else:
        # Scalars (strings, numbers, None...) cannot reference a property.
        return False
    return any(filter_has_external_ids(child) for child in children)

iter_external_id_parts(raw)

Split externalIds input (string/list) into clean, comma-separated tokens.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
62
63
64
65
66
67
68
69
70
71
72
def iter_external_id_parts(raw: Any) -> list[str]:
    """Flatten an externalIds input (single value or list) into stripped, non-empty parts.

    Each value is converted to a string and split on commas, so that callers may pass
    a single string holding comma-separated ids. Falsy values yield no part.
    """
    candidates = raw if isinstance(raw, list) else [raw]
    stripped = (piece.strip() for candidate in candidates for piece in str(candidate or "").split(","))
    return [piece for piece in stripped if piece]

normalize_external_ids_filter_value(raw_filter, filter_lang)

Normalize externalIds filters and return (filter, lang, changed).

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
191
192
193
194
195
196
197
198
199
200
201
202
203
def normalize_external_ids_filter_value(raw_filter: Any, filter_lang: str) -> tuple[Any, str, bool]:
    """Normalize the externalIds comparisons of a filter.

    Returns:
        A ``(filter, lang, changed)`` tuple. When the filter does not need to be rewritten,
        the inputs are returned as they are with ``changed`` set to False; otherwise the
        normalized filter is returned with the "cql2-json" language and ``changed`` set to True.
    """
    unchanged = (raw_filter, filter_lang, False)

    # Cheap exits: no filter at all, or a text filter that never mentions externalIds.
    if raw_filter is None or (isinstance(raw_filter, str) and "externalIds" not in raw_filter):
        return unchanged

    # Parse to JSON so we can rewrite externalIds operators for pgstac.
    parsed = parse_filter_to_json(raw_filter, filter_lang)
    if parsed is not None and filter_has_external_ids(parsed):
        # Convert externalIds comparisons to array-overlap filters (a_overlaps).
        return normalize_external_ids_in_filter(parsed), "cql2-json", True
    return unchanged

normalize_external_ids_in_filter(filter_json)

Rewrite externalIds comparisons into array-overlap filters.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def normalize_external_ids_in_filter(filter_json: dict) -> dict:
    """Return the filter with externalIds equality tests turned into ``a_overlaps``.

    Boolean nodes (``and`` / ``or`` / ``not``) are walked recursively. An
    equality-like node (``=``, ``==``, ``eq``, ``in``) with exactly two
    arguments, one of which is the ``externalIds`` property, is replaced by an
    ``a_overlaps`` node on the tokens built from the other argument. Anything
    else, including non-dict input, is returned as is.
    """
    if not isinstance(filter_json, dict):
        return filter_json

    operator = filter_json.get("op")
    operands = filter_json.get("args", [])

    if operator in ("and", "or", "not"):
        # Only the leaves can be externalIds comparisons: recurse into every child.
        if not isinstance(operands, list):
            return filter_json
        rewritten = [normalize_external_ids_in_filter(child) for child in operands]
        return {**filter_json, "args": rewritten}

    if operator not in ("=", "==", "eq", "in") or not isinstance(operands, list) or len(operands) != 2:
        return filter_json

    # STAC Browser sends "externalIds = <uuid>", but pgstac stores externalIds as an array of tokens.
    # "=" against an array yields no matches, hence the conversion to a_overlaps on a token list.
    first, second = operands
    # Try (property, value) first, then the reversed (value, property) order.
    for candidate, value in ((first, second), (second, first)):
        if isinstance(candidate, dict) and candidate.get("property") == "externalIds":
            # Raw values (string, list, comma-separated) are normalized to tokens.
            tokens = build_external_ids_tokens(value)
            if tokens:
                return {"op": "a_overlaps", "args": [{"property": "externalIds"}, tokens]}
    return filter_json

parse_filter_to_json(raw_filter, filter_lang)

Normalize a CQL2 filter (text or json) to CQL2-JSON.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/request_manager.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def parse_filter_to_json(raw_filter: Any, filter_lang: str) -> dict | None:
    """Normalize a CQL2 filter (text or json) to CQL2-JSON."""
    if raw_filter is None:
        return None
    if isinstance(raw_filter, dict):
        return raw_filter
    if isinstance(raw_filter, str):
        try:
            if filter_lang == "cql2-text":
                # cql2 exposes either parse_text or parse depending on version.
                parser = getattr(cql2, "parse_text", None) or getattr(cql2, "parse", None)
                if parser is None or not callable(parser):
                    raise HTTPException(
                        status_code=HTTP_400_BAD_REQUEST,
                        detail="CQL2 text parser is not available.",
                    )
                cql2_text_parser = cast(Callable[[str], Any], parser)
                # pylint can't infer the callable from getattr; runtime is safe after callable() check.
                return cql2_text_parser(raw_filter).to_json()  # pylint: disable=not-callable
            return json.loads(raw_filter)
        except Exception as exc:  # pylint: disable=broad-exception-caught
            raise HTTPException(
                status_code=HTTP_400_BAD_REQUEST,
                detail=(
                    "Invalid filter format for externalIds search: "
                    f"raw_filter={raw_filter!r}, filter_lang={filter_lang!r}"
                ),
            ) from exc
    return None