Skip to content

rs_server_catalog/middleware/response_manager.md

<< Back to index

Module to process the Responses returned by stac-fastapi for the Catalog middleware.

CatalogResponseManager

Class to process the Responses returned by stac-fastapi for the Catalog middleware. Each type of Response is managed in one of the functions.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/response_manager.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
class CatalogResponseManager:
    """Class to process the Responses returned by stac-fastapi for the Catalog middleware.
    Each type of Response is managed in one of the functions."""

    def __init__(
        self,
        client: CoreCrudClient,
        request_ids: dict[Any, Any],
        s3_files_to_be_deleted: list[str] | None = None,
    ):
        """Store the stac-fastapi client, the request identifiers and the pending S3 files.

        Args:
            client (CoreCrudClient): Client used to list all collections for the catalog landing page.
            request_ids (dict[Any, Any]): Identifiers read (and, for searches, updated) by the handlers,
                under the keys "owner_id", "collection_ids" and "item_id".
            s3_files_to_be_deleted (list[str] | None): S3 files deleted after a successful
                PUT/POST/DELETE response. Defaults to a new empty list.
        """
        self.client = client
        self.request_ids = request_ids
        self.s3_manager = S3Manager()
        self.s3_files_to_be_deleted = s3_files_to_be_deleted or []

    @staticmethod
    async def _read_body(response: StreamingResponse) -> str:
        """Consume the whole body iterator of a response and return it decoded as text.

        The body iterator may yield ``str`` or ``bytes``-like chunks; ``str`` chunks are encoded
        before joining so every handler accepts both kinds.
        WARNING: after this call, the body cannot be read a second time.

        Args:
            response (StreamingResponse): The response whose body is consumed.

        Returns:
            str: The decoded body (empty string when the body is empty).
        """
        chunks = [chunk async for chunk in response.body_iterator]
        return b"".join(chunk.encode() if isinstance(chunk, str) else chunk for chunk in chunks).decode()

    async def manage_responses(
        self,
        request: Request,
        streaming_response: StreamingResponse,
    ) -> Response:
        """Manage responses sent by stac-fastapi after dispatch and before sending them to the user.

        Args:
            request (Request): Original request sent to stac-fastapi
            streaming_response (StreamingResponse): Response returned by stac-fastapi

        Returns:
            Response: HTTP Response
        """

        # Don't forward responses that fail.
        # NOTE: the 30x (redirect responses) are used by the oauth2 authentication.
        status_code = streaming_response.status_code
        if status_code not in (HTTP_200_OK, HTTP_201_CREATED, HTTP_302_FOUND, HTTP_307_TEMPORARY_REDIRECT):

            # Read the body. WARNING: after this, the body cannot be read a second time.
            response_content = json.loads(await self._read_body(streaming_response))
            logger.debug("response: %d - %s", status_code, response_content)
            self.s3_manager.clear_catalog_bucket(response_content)

            # GET: '/catalog/queryables' when no collections in the catalog
            if (
                request.method == "GET"
                and request.scope["path"] == CATALOG_PREFIX + QUERYABLES
                and not self.request_ids["collection_ids"]
                # Error payloads are not guaranteed to carry a "code" key: avoid a KeyError.
                and isinstance(response_content, dict)
                and response_content.get("code") == "NotFoundError"
            ):
                # Return empty list of properties and additionalProperties set to true on /catalog/queryables
                # when there are no collections in catalog.
                return JSONResponse(
                    {
                        "$id": f"{request.url}",
                        "type": "object",
                        "title": "STAC Queryables.",
                        "$schema": "https://json-schema.org/draft-07/schema#",
                        "properties": {},
                        "additionalProperties": True,
                    },
                    HTTP_200_OK,
                    headers_minus_content_length(streaming_response),
                )

            # Return a regular JSON response instead of StreamingResponse because the body cannot be read again.
            return JSONResponse(response_content, status_code, headers_minus_content_length(streaming_response))

        # Handle responses
        response: Response = streaming_response
        if request.scope["path"] == CATALOG_PREFIX + "/search":
            # GET: '/catalog/search'
            response = await self.manage_search_response(request, streaming_response)
        elif request.method == "GET" and "/download" in request.url.path:
            # URL: GET: '/catalog/collections/{USER}:{COLLECTION}/items/{FEATURE_ID}/download/{ASSET_TYPE}
            response = await self.manage_download_response(request, streaming_response)
        elif request.method == "GET" and (
            self.request_ids["owner_id"]
            # NOTE(review): this list contains QUERYABLES while the error branch above compares against
            # CATALOG_PREFIX + QUERYABLES; confirm which path form is intended here.
            or request.scope["path"] in [CATALOG_PREFIX, CATALOG_PREFIX + "/", CATALOG_COLLECTIONS, QUERYABLES]
        ):
            # URL: GET: '/catalog/collections/{USER}:{COLLECTION}'
            # URL: GET: '/catalog/'
            # URL: GET: '/catalog/collections
            response = await self.manage_get_response(request, streaming_response)
        elif request.method in ["POST", "PUT"] and self.request_ids["owner_id"]:
            # URL: POST / PUT: '/catalog/collections/{USER}:{COLLECTION}'
            # or '/catalog/collections/{USER}:{COLLECTION}/items'
            response = await self.manage_put_post_response(request, streaming_response)
        elif request.method == "DELETE" and self.request_ids["owner_id"]:
            response = await self.manage_delete_response(streaming_response, self.request_ids["owner_id"])

        return response

    async def manage_search_response(self, request: Request, response: StreamingResponse) -> GeoJSONResponse:
        """Adapt a '/catalog/search' response.

        The '/catalog/search' endpoint doesn't give the owner_id and collection_ids, so this
        function tries to find them in the search query (text filter for GET, JSON filter for POST,
        and the "collections" parameter). If successful, it removes the owner_id from the
        collection_ids and adapts all links. If not, it leaves the identifiers unchanged and only
        applies the generic link adaptation.

        Args:
            request (Request): The request from the client.
            response (StreamingResponse): The response from the rs server.

        Returns:
            GeoJSONResponse: The updated response.
        """
        owner_id = ""
        # Stays empty for methods other than GET/POST, so the "in query" checks below are safe.
        query: Any = {}
        if request.method == "GET":
            query = parse_qs(request.url.query)
            if "filter" in query:
                owner_id = extract_owner_name_from_text_filter(query["filter"][0])
        elif request.method == "POST":
            query = await request.json()
            if "filter" in query:
                owner_id = extract_owner_name_from_json_filter(query["filter"])

        if owner_id:
            self.request_ids["owner_id"] = owner_id

        # Remove owner_id from the collection name
        if "collections" in query:
            # GET (parse_qs) gives a list of comma-separated strings, e.g. ["owner_a,owner_b"];
            # a POST JSON body gives a list of ids, e.g. ["owner_a", "owner_b"]. Normalise both.
            raw_collections = query["collections"]
            if isinstance(raw_collections, str):
                raw_collections = [raw_collections]
            collection_names = [name for entry in raw_collections for name in entry.split(",")]

            # Extract owner_id from the name of the first collection in the list
            first_source = self.request_ids["collection_ids"] or collection_names
            if first_source:
                self.request_ids["owner_id"] = first_source[0].split("_")[0]
            self.request_ids["collection_ids"] = [
                coll.removeprefix(f"{self.request_ids['owner_id']}_") for coll in collection_names
            ]

        content = json.loads(await self._read_body(response))
        content = adapt_links(content, "features")
        for collection_id in self.request_ids["collection_ids"]:
            content = adapt_links(content, "features", self.request_ids["owner_id"], collection_id)

        # Add the stac authentication extension
        await StacManager.add_authentication_extension(content)

        return GeoJSONResponse(content, response.status_code, headers_minus_content_length(response))

    async def manage_download_response(
        self,
        request: Request,
        response: StreamingResponse,
    ) -> JSONResponse | RedirectResponse:
        """
        Manage download response and handle requests that should generate a presigned URL.

        Args:
            request (starlette.requests.Request): The request object.
            response (starlette.responses.StreamingResponse): The response object received.

        Returns:
            JSONResponse | RedirectResponse: A RedirectResponse when the presigned URL generation
            returns HTTP 302, otherwise a JSONResponse containing either the presigned URL result or
            the response content with the appropriate status code.

        Raises:
            HTTPException: 401 in cluster mode when the user is not authorized to download.
        """
        user_login = ""
        auth_roles = []
        if common_settings.CLUSTER_MODE:  # Get the list of access and the user_login calling the endpoint.
            auth_roles = request.state.auth_roles
            user_login = request.state.user_login
        if (  # If we are in cluster mode and the user_login is not authorized
            # to this endpoint returns a HTTP_401_UNAUTHORIZED status.
            # pylint: disable=duplicate-code
            common_settings.CLUSTER_MODE
            and self.request_ids["collection_ids"]
            and self.request_ids["owner_id"]
            and not get_authorisation(
                self.request_ids["collection_ids"],
                auth_roles,
                "download",
                self.request_ids["owner_id"],
                user_login,
            )
        ):
            raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized access.")
        content = json.loads(await self._read_body(response))
        if content.get("code", True) != "NotFoundError":
            # Only generate presigned url if the item is found
            content, code = self.s3_manager.generate_presigned_url(content, request.url.path)
            if code == HTTP_302_FOUND:
                return RedirectResponse(url=content, status_code=code)
            return JSONResponse(content, code, headers_minus_content_length(response))
        return JSONResponse(content, response.status_code, headers_minus_content_length(response))

    async def manage_get_response(
        self,
        request: Request,
        response: StreamingResponse,
    ) -> Response | JSONResponse:
        """Remove the user name from objects and adapt all links.

        Args:
            request (Request): The client request.
            response (Response | StreamingResponse): The response from the rs-catalog.
        Returns:
            Response: The response updated (an empty body is returned as a null JSON payload).
        """
        # Load content of the response as a dictionary
        dec_content = await self._read_body(response)
        content = await self._manage_get_response_content(request, dec_content) if dec_content else None
        media_type = "application/geo+json" if "/items" in request.scope["path"] else None
        return JSONResponse(content, response.status_code, headers_minus_content_length(response), media_type)

    async def _manage_get_response_content(  # pylint: disable=too-many-locals, too-many-branches, too-many-statements
        self,
        request: Request,
        dec_content: str,
    ) -> Any:
        """Manage content of GET responses with a body

        Args:
            request (Request): The client request.
            dec_content (str): The decoded json content
        Returns:
            Any: the response content

        Raises:
            HTTPException: 401 in cluster mode when the user is not authorized to read the collection.
        """
        content = json.loads(dec_content)
        StacManager.update_stac_catalog_metadata(content)
        auth_roles = []
        user_login = ""

        # Default geometry/bbox were only added for compliance: don't display them.
        if content.get("geometry") == DEFAULT_GEOM:
            content["geometry"] = None
        if content.get("bbox") == DEFAULT_BBOX:
            content["bbox"] = None

        if common_settings.CLUSTER_MODE:  # Get the list of access and the user_login calling the endpoint.
            auth_roles = request.state.auth_roles
            user_login = request.state.user_login

        # Manage local landing page of the catalog
        if request.scope["path"] in (CATALOG_PREFIX, CATALOG_PREFIX + "/"):
            regex_catalog = CATALOG_COLLECTIONS + r"/(?P<owner_id>.+?)_(?P<collection_id>.*)"
            for link in content["links"]:
                link_parser = urlparse(link["href"])

                if match := re.match(regex_catalog, link_parser.path):
                    groups = match.groupdict()
                    new_path = add_user_prefix(link_parser.path, groups["owner_id"], groups["collection_id"])
                    link["href"] = link_parser._replace(path=new_path).geturl()
            url = request.url._url  # pylint: disable=protected-access
            url = url[: len(url) - len(request.url.path)]
            content = add_prefix_link_landing_page(content, url)

            # patch the catalog landing page with "rel": "child" link for each collection
            # limit must be explicitely set, otherwise the default pgstac limit of 10 is used
            collections_resp = await self.client.all_collections(request=request, limit=1000)
            collections = get_all_accessible_collections(
                collections_resp.get("collections", []),
                auth_roles,
                user_login,
            )
            base_url = (
                next((link["href"] for link in content["links"] if link.get("rel") == "self"), "").rstrip("/") + "/"
            )

            for collection in collections:
                collection_id = (
                    collection["id"].removeprefix(f"{collection['owner']}_")
                    if collection["owner"]
                    else collection["id"]
                )
                content["links"].append(
                    {
                        "rel": "child",
                        "type": "application/json",
                        "title": collection.get("title") or collection_id,
                        "href": urljoin(base_url, f"collections/{collection['owner']}:{collection_id}"),
                    },
                )

        elif request.scope["path"] == CATALOG_COLLECTIONS:  # /catalog/collections
            content["collections"] = get_all_accessible_collections(
                content["collections"],
                auth_roles,
                user_login,
            )
            content["collections"] = StacManager.update_links_for_all_collections(content["collections"])

        # If we are in cluster mode and the user_login is not authorized
        # to this endpoint returns a HTTP_401_UNAUTHORIZED status.
        elif (
            common_settings.CLUSTER_MODE
            and self.request_ids["collection_ids"]
            and self.request_ids["owner_id"]
            and not get_authorisation(
                self.request_ids["collection_ids"],
                auth_roles,
                "read",
                self.request_ids["owner_id"],
                user_login,
            )
            # I don't know why but the STAC browser doesn't send authentication for the queryables endpoint.
            # So allow this endpoint without authentication in this specific case.
            and not (common_settings.request_from_stacbrowser(request) and request.url.path.endswith(QUERYABLES))
        ):
            raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized access.")
        elif (
            "/collections" in request.scope["path"] and "/items" not in request.scope["path"]
        ):  # /catalog/collections/owner_id:collection_id
            content = adapt_object_links(content, self.request_ids["owner_id"])
        elif (
            "/items" in request.scope["path"] and not self.request_ids["item_id"]
        ):  # /catalog/owner_id/collections/collection_id/items
            content = adapt_links(
                content,
                "features",
                self.request_ids["owner_id"],
                self.request_ids["collection_ids"][0],
            )
        elif self.request_ids["item_id"]:  # /catalog/owner_id/collections/collection_id/items/item_id
            content = adapt_object_links(content, self.request_ids["owner_id"])
        else:
            logger.debug("No link adaptation performed for %s", request.scope)

        # Add the stac authentication extension
        await StacManager.add_authentication_extension(content)
        return content

    async def manage_put_post_response(self, request: Request, response: StreamingResponse):
        """
        Manage put or post responses.

        Args:
            request (starlette.requests.Request): The client request.
            response (starlette.responses.StreamingResponse): The response object received.

        Returns:
            JSONResponse: Returns a JSONResponse object containing the response content
            with the appropriate status code.

        Raises:
            HTTPException: If there is an error while clearing the temporary bucket,
            raises an HTTPException with a status code of 400 and detailed information.
            If there is a generic exception, raises an HTTPException with a status code
            of 400 and a generic bad request detail.
        """
        try:
            user = self.request_ids["owner_id"]
            response_content = json.loads(await self._read_body(response))
            response_content = adapt_object_links(response_content, self.request_ids["owner_id"])

            # Don't display geometry and bbox for default case since it was added just for compliance.
            if request.scope["path"].startswith(
                f"{CATALOG_COLLECTIONS}/{user}_{self.request_ids['collection_ids'][0]}/items",
            ):
                if response_content.get("geometry") == DEFAULT_GEOM:
                    response_content["geometry"] = None
                if response_content.get("bbox") == DEFAULT_BBOX:
                    response_content["bbox"] = None
            self.s3_manager.delete_s3_files(self.s3_files_to_be_deleted)
            self.s3_files_to_be_deleted.clear()
        except RuntimeError as exc:
            raise log_http_exception(
                status_code=HTTP_400_BAD_REQUEST,
                detail=f"Failed to clean temporary bucket: {exc}",
            ) from exc
        except Exception as exc:  # pylint: disable=broad-except
            raise log_http_exception(status_code=HTTP_400_BAD_REQUEST, detail=f"Bad request: {exc}") from exc
        media_type = "application/geo+json" if "/items" in request.scope["path"] else None
        return JSONResponse(response_content, response.status_code, headers_minus_content_length(response), media_type)

    async def manage_delete_response(self, response: StreamingResponse, user: str) -> Response:
        """Change the name of the deleted collection by removing owner_id.

        The pending S3 files in ``self.s3_files_to_be_deleted`` are also deleted and the list is cleared.

        Args:
            response (StreamingResponse): The client response.
            user (str): The owner id.

        Returns:
            JSONResponse: The new response with the updated collection name (always HTTP 200).
        """
        response_content = json.loads(await self._read_body(response))
        if "deleted collection" in response_content:
            response_content["deleted collection"] = response_content["deleted collection"].removeprefix(f"{user}_")
        # delete the s3 files as well
        self.s3_manager.delete_s3_files(self.s3_files_to_be_deleted)
        self.s3_files_to_be_deleted.clear()
        return JSONResponse(response_content, HTTP_200_OK, headers_minus_content_length(response))

manage_delete_response(response, user) async

Change the name of the deleted collection by removing owner_id.

Parameters:

Name Type Description Default
response StreamingResponse

The client response.

required
user str

The owner id.

required

Returns:

Name Type Description
JSONResponse Response

The new response with the updated collection name.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/response_manager.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
async def manage_delete_response(self, response: StreamingResponse, user: str) -> Response:
    """Change the name of the deleted collection by removing owner_id.

    The pending S3 files in ``self.s3_files_to_be_deleted`` are also deleted and the list is cleared.

    Args:
        response (StreamingResponse): The client response.
        user (str): The owner id.

    Returns:
        JSONResponse: The new response with the updated collection name (always HTTP 200).
    """
    body = [chunk async for chunk in response.body_iterator]
    # The body iterator may yield str or bytes chunks: encode str chunks before joining.
    raw_body = b"".join(chunk.encode() if isinstance(chunk, str) else chunk for chunk in body)
    response_content = json.loads(raw_body.decode())
    if "deleted collection" in response_content:
        response_content["deleted collection"] = response_content["deleted collection"].removeprefix(f"{user}_")
    # delete the s3 files as well
    self.s3_manager.delete_s3_files(self.s3_files_to_be_deleted)
    self.s3_files_to_be_deleted.clear()
    return JSONResponse(response_content, HTTP_200_OK, headers_minus_content_length(response))

manage_download_response(request, response) async

Manage download response and handle requests that should generate a presigned URL.

Parameters:

Name Type Description Default
request Request

The request object.

required
response StreamingResponse

The response object received.

required

Returns:

Name Type Description
JSONResponse JSONResponse | RedirectResponse

Returns a JSONResponse object containing either the presigned URL or the response content with the appropriate status code (a RedirectResponse is returned instead when presigned URL generation yields HTTP 302).

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/response_manager.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
async def manage_download_response(
    self,
    request: Request,
    response: StreamingResponse,
) -> JSONResponse | RedirectResponse:
    """
    Manage download response and handle requests that should generate a presigned URL.

    Args:
        request (starlette.requests.Request): The request object.
        response (starlette.responses.StreamingResponse): The response object received.

    Returns:
        JSONResponse | RedirectResponse: A RedirectResponse when the presigned URL generation
        returns HTTP 302, otherwise a JSONResponse containing either the presigned URL result or
        the response content with the appropriate status code.

    Raises:
        HTTPException: 401 in cluster mode when the user is not authorized to download.
    """
    user_login = ""
    auth_roles = []
    if common_settings.CLUSTER_MODE:  # Get the list of access and the user_login calling the endpoint.
        auth_roles = request.state.auth_roles
        user_login = request.state.user_login
    if (  # If we are in cluster mode and the user_login is not authorized
        # to this endpoint returns a HTTP_401_UNAUTHORIZED status.
        # pylint: disable=duplicate-code
        common_settings.CLUSTER_MODE
        and self.request_ids["collection_ids"]
        and self.request_ids["owner_id"]
        and not get_authorisation(
            self.request_ids["collection_ids"],
            auth_roles,
            "download",
            self.request_ids["owner_id"],
            user_login,
        )
    ):
        raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized access.")
    body = [chunk async for chunk in response.body_iterator]
    # The body iterator may yield str or bytes chunks: encode str chunks before joining.
    raw_body = b"".join(chunk.encode() if isinstance(chunk, str) else chunk for chunk in body)
    content = json.loads(raw_body.decode())
    if content.get("code", True) != "NotFoundError":
        # Only generate presigned url if the item is found
        content, code = self.s3_manager.generate_presigned_url(content, request.url.path)
        if code == HTTP_302_FOUND:
            return RedirectResponse(url=content, status_code=code)
        return JSONResponse(content, code, headers_minus_content_length(response))
    return JSONResponse(content, response.status_code, headers_minus_content_length(response))

manage_get_response(request, response) async

Remove the user name from objects and adapt all links.

Parameters:

Name Type Description Default
request Request

The client request.

required
response Response | StreamingResponse

The response from the rs-catalog.

required

Returns: Response: The response updated.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/response_manager.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
async def manage_get_response(
    self,
    request: Request,
    response: StreamingResponse,
) -> Response | JSONResponse:
    """Strip the owner name from the returned objects and adapt their links.

    Args:
        request (Request): The client request.
        response (Response | StreamingResponse): The response from the rs-catalog.
    Returns:
        Response: The updated response, built as a JSONResponse (null payload for an empty body).
    """
    # Drain the body iterator, turning any str chunk into bytes so everything can be joined.
    encoded_parts = []
    async for part in response.body_iterator:
        encoded_parts.append(part if isinstance(part, bytes) else part.encode())  # type: ignore
    text = b"".join(encoded_parts).decode()

    payload = None
    if text:
        payload = await self._manage_get_response_content(request, text)

    is_items_path = "/items" in request.scope["path"]
    media_type = "application/geo+json" if is_items_path else None
    return JSONResponse(payload, response.status_code, headers_minus_content_length(response), media_type)

manage_put_post_response(request, response) async

Manage put or post responses.

Parameters:

Name Type Description Default
response StreamingResponse

The response object received.

required

Returns:

Name Type Description
JSONResponse

Returns a JSONResponse object containing the response content with the appropriate status code.

Raises:

Type Description
HTTPException

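If there is an error while clearing the temporary bucket, raises an HTTPException with a status code of 400 and detailed information. If there is a generic exception, raises an HTTPException with a status code of 400 and a generic bad request detail.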

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/response_manager.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
async def manage_put_post_response(self, request: Request, response: StreamingResponse):
    """
    Manage put or post responses.

    Args:
        request (starlette.requests.Request): The client request.
        response (starlette.responses.StreamingResponse): The response object received.

    Returns:
        JSONResponse: Returns a JSONResponse object containing the response content
        with the appropriate status code.

    Raises:
        HTTPException: If there is an error while clearing the temporary bucket,
        raises an HTTPException with a status code of 400 and detailed information.
        If there is a generic exception, raises an HTTPException with a status code
        of 400 and a generic bad request detail.
    """
    try:
        user = self.request_ids["owner_id"]
        body = [chunk async for chunk in response.body_iterator]
        # The body iterator may yield str or bytes chunks: encode str chunks before joining.
        raw_body = b"".join(chunk.encode() if isinstance(chunk, str) else chunk for chunk in body)
        response_content = json.loads(raw_body.decode())
        response_content = adapt_object_links(response_content, self.request_ids["owner_id"])

        # Don't display geometry and bbox for default case since it was added just for compliance.
        if request.scope["path"].startswith(
            f"{CATALOG_COLLECTIONS}/{user}_{self.request_ids['collection_ids'][0]}/items",
        ):
            if response_content.get("geometry") == DEFAULT_GEOM:
                response_content["geometry"] = None
            if response_content.get("bbox") == DEFAULT_BBOX:
                response_content["bbox"] = None
        self.s3_manager.delete_s3_files(self.s3_files_to_be_deleted)
        self.s3_files_to_be_deleted.clear()
    except RuntimeError as exc:
        raise log_http_exception(
            status_code=HTTP_400_BAD_REQUEST,
            detail=f"Failed to clean temporary bucket: {exc}",
        ) from exc
    except Exception as exc:  # pylint: disable=broad-except
        raise log_http_exception(status_code=HTTP_400_BAD_REQUEST, detail=f"Bad request: {exc}") from exc
    media_type = "application/geo+json" if "/items" in request.scope["path"] else None
    return JSONResponse(response_content, response.status_code, headers_minus_content_length(response), media_type)

manage_responses(request, streaming_response) async

Manage responses sent by stac-fastapi after dispatch and before sending them to the user.

Parameters:

Name Type Description Default
request Request

Original request sent to stac-fastapi

required
streaming_response StreamingResponse

Response returned by stac-fastapi

required

Returns:

Name Type Description
Response Response

HTTP Response

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/response_manager.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def manage_responses(
    self,
    request: Request,
    streaming_response: StreamingResponse,
) -> Response:
    """Manage responses sent by stac-fastapi after dispatch and before they are sent to the user.

    Responses whose status is not 200, 201, 302 or 307 are read, logged, passed to the S3
    manager's ``clear_catalog_bucket`` and re-emitted as a plain JSON response (with a special
    empty-queryables answer for ``GET /catalog/queryables`` on an empty catalog). Other responses
    are routed to the handler matching the endpoint and method; when none matches, the original
    streaming response is returned unchanged.

    Args:
        request (Request): Original request sent to stac-fastapi
        streaming_response (StreamingResponse): Response returned by stac-fastapi

    Returns:
        Response: HTTP Response
    """

    # Don't forward responses that fail.
    # NOTE: the 30x (redirect responses) are used by the oauth2 authentication.
    status_code = streaming_response.status_code
    if status_code not in (HTTP_200_OK, HTTP_201_CREATED, HTTP_302_FOUND, HTTP_307_TEMPORARY_REDIRECT):

        # Read the body. WARNING: after this, the body cannot be read a second time.
        # The body iterator may yield str, bytes or memoryview chunks: normalize them to bytes
        # before joining, otherwise b"".join raises TypeError on str chunks.
        body = [chunk async for chunk in streaming_response.body_iterator]
        raw_body = b"".join(chunk.encode() if isinstance(chunk, str) else bytes(chunk) for chunk in body)
        response_content = json.loads(raw_body.decode())
        logger.debug("response: %d - %s", status_code, response_content)
        self.s3_manager.clear_catalog_bucket(response_content)

        # GET: '/catalog/queryables' when no collections in the catalog
        if (
            request.method == "GET"
            and request.scope["path"] == CATALOG_PREFIX + QUERYABLES
            and not self.request_ids["collection_ids"]
            # The error payload is not guaranteed to be an object carrying a "code" key:
            # don't let a missing key turn the upstream error into a KeyError.
            and isinstance(response_content, dict)
            and response_content.get("code") == "NotFoundError"
        ):
            # Return empty list of properties and additionalProperties set to true on /catalog/queryables
            # when there are no collections in catalog.
            return JSONResponse(
                {
                    "$id": f"{request.url}",
                    "type": "object",
                    "title": "STAC Queryables.",
                    "$schema": "https://json-schema.org/draft-07/schema#",
                    "properties": {},
                    "additionalProperties": True,
                },
                HTTP_200_OK,
                headers_minus_content_length(streaming_response),
            )

        # Return a regular JSON response instead of StreamingResponse because the body cannot be read again.
        return JSONResponse(response_content, status_code, headers_minus_content_length(streaming_response))

    # Handle responses
    response: Response = streaming_response
    if request.scope["path"] == CATALOG_PREFIX + "/search":
        # GET: '/catalog/search'
        response = await self.manage_search_response(request, streaming_response)
    elif request.method == "GET" and "/download" in request.url.path:
        # URL: GET: '/catalog/collections/{USER}:{COLLECTION}/items/{FEATURE_ID}/download/{ASSET_TYPE}
        response = await self.manage_download_response(request, streaming_response)
    elif request.method == "GET" and (
        self.request_ids["owner_id"]
        or request.scope["path"] in [CATALOG_PREFIX, CATALOG_PREFIX + "/", CATALOG_COLLECTIONS, QUERYABLES]
    ):
        # URL: GET: '/catalog/collections/{USER}:{COLLECTION}'
        # URL: GET: '/catalog/'
        # URL: GET: '/catalog/collections
        # NOTE(review): the list above compares the path against bare QUERYABLES, while the error
        # branch compares against CATALOG_PREFIX + QUERYABLES; confirm which one is intended.
        response = await self.manage_get_response(request, streaming_response)
    elif request.method in ["POST", "PUT"] and self.request_ids["owner_id"]:
        # URL: POST / PUT: '/catalog/collections/{USER}:{COLLECTION}'
        # or '/catalog/collections/{USER}:{COLLECTION}/items'
        response = await self.manage_put_post_response(request, streaming_response)
    elif request.method == "DELETE" and self.request_ids["owner_id"]:
        response = await self.manage_delete_response(streaming_response, self.request_ids["owner_id"])

    return response

manage_search_response(request, response) async

The '/catalog/search' endpoint doesn't provide the owner_id and collection_ids. To get these values, this function tries to find them in the search query (the query string for GET, the JSON body for POST). If successful, it updates the response content by removing the owner_id from the collection_ids and adapting all links. If not successful, it only applies the generic link adaptation and returns the response.

Parameters:

Name Type Description Default
response StreamingResponse

The response from the rs server.

required
request Request

The request from the client.

required

Returns:

Name Type Description
GeoJSONResponse GeoJSONResponse

The updated response.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/response_manager.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
async def manage_search_response(self, request: Request, response: StreamingResponse) -> GeoJSONResponse:
    """Adapt a '/catalog/search' response using owner and collection ids found in the search query.

    The '/catalog/search' endpoint doesn't provide the owner_id and collection_ids, so this
    function looks for them in the search query (the query string for GET, the JSON body for
    POST). When found, they are stored in ``self.request_ids``, the owner prefix is removed from
    the collection ids and the feature links are adapted for each collection. The generic
    feature-link adaptation and the STAC authentication extension are always applied.

    Args:
        response (StreamingResponse): The response from the rs server.
        request (Request): The request from the client.

    Returns:
        GeoJSONResponse: The updated response.
    """
    owner_id = ""
    # Default to an empty query so that methods other than GET/POST don't hit an unbound variable.
    query: dict = {}
    if request.method == "GET":
        query = parse_qs(request.url.query)
        if "filter" in query:
            qs_filter = query["filter"][0]
            owner_id = extract_owner_name_from_text_filter(qs_filter)
    elif request.method == "POST":
        query = await request.json()
        if "filter" in query:
            qs_filter_json = query["filter"]
            owner_id = extract_owner_name_from_json_filter(qs_filter_json)

    if owner_id:
        self.request_ids["owner_id"] = owner_id

    # Remove owner_id from the collection name
    if "collections" in query:
        # Extract owner_id from the first entry of the already-known collection ids (guarded, since
        # indexing an empty list would raise IndexError).
        if self.request_ids["collection_ids"]:
            self.request_ids["owner_id"] = self.request_ids["collection_ids"][0].split("_")[0]
        # GET: parse_qs yields a list of strings, each possibly comma-separated ("a_x,a_y").
        # POST: the JSON body holds a list of collection names (["a_x", "a_y"]) - keep them all,
        # not just the first one. A bare string is tolerated as well.
        raw_collections = query["collections"]
        if isinstance(raw_collections, str):
            raw_collections = [raw_collections]
        self.request_ids["collection_ids"] = [
            coll.removeprefix(f"{self.request_ids['owner_id']}_")
            for item in raw_collections
            for coll in item.split(",")
        ]
    body = [chunk async for chunk in response.body_iterator]
    dec_content = b"".join(map(lambda x: x if isinstance(x, bytes) else x.encode(), body)).decode()  # type: ignore
    content = json.loads(dec_content)
    content = adapt_links(content, "features")
    for collection_id in self.request_ids["collection_ids"]:
        content = adapt_links(content, "features", self.request_ids["owner_id"], collection_id)

    # Add the stac authentication extension
    await StacManager.add_authentication_extension(content)

    return GeoJSONResponse(content, response.status_code, headers_minus_content_length(response))

log_http_exception(*args, **kwargs)

Log an error and return an HTTP exception for the caller to raise.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/response_manager.py
70
71
72
def log_http_exception(*args, **kwargs) -> type[HTTPException]:
    """Log an error, then hand back an HTTP exception for the caller to raise.

    Thin wrapper around ``utils2.log_http_exception`` that prepends this module's ``logger``
    to the positional arguments and forwards everything else unchanged.
    """
    forwarded_args = (logger, *args)
    return utils2.log_http_exception(*forwarded_args, **kwargs)