Skip to content

rs_server_catalog/middleware/catalog_middleware.md

<< Back to index

A BaseHTTPMiddleware to handle the user multi catalog.

The stac-fastapi software doesn't handle multi catalog. In the rs-server we need to handle user-based catalogs.

The rs-server uses only one catalog but the collections are prefixed by the user name. The middleware is used to hide this mechanism.

The middleware: * redirect the user-specific request to the common stac api endpoint * modifies the request to add the user prefix in the collection name * modifies the response to remove the user prefix in the collection name * modifies the response to update the links.

CatalogMiddleware

Bases: BaseHTTPMiddleware

The user catalog middleware.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/catalog_middleware.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
class CatalogMiddleware(BaseHTTPMiddleware):  # pylint: disable=too-few-public-methods
    """The user catalog middleware."""

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        """Redirect the user catalog specific endpoint and adapt the response content."""
        try:
            response = await UserCatalog(api.client).dispatch(request, call_next)
            return response
        except (HTTPException, StarletteHTTPException) as exc:
            phrase = HTTPStatus(exc.status_code).phrase
            code = "".join(word.title() for word in phrase.split())
            return JSONResponse(
                status_code=exc.status_code,
                content=ErrorResponse(code=code, description=str(exc.detail)),
            )

dispatch(request, call_next) async

Redirect the user catalog specific endpoint and adapt the response content.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/catalog_middleware.py
61
62
63
64
65
66
67
68
69
70
71
72
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
    """Redirect the user catalog specific endpoint and adapt the response content."""
    try:
        response = await UserCatalog(api.client).dispatch(request, call_next)
        return response
    except (HTTPException, StarletteHTTPException) as exc:
        phrase = HTTPStatus(exc.status_code).phrase
        code = "".join(word.title() for word in phrase.split())
        return JSONResponse(
            status_code=exc.status_code,
            content=ErrorResponse(code=code, description=str(exc.detail)),
        )

UserCatalog

The user catalog middleware handler.

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/catalog_middleware.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class UserCatalog:  # pylint: disable=too-few-public-methods
    """The user catalog middleware handler."""

    def __init__(self, client: CoreCrudClient):
        """Constructor, called from the middleware"""
        self.request_ids: dict[Any, Any] = {}
        self.client = client

    async def dispatch(
        self,
        request: Request,
        call_next: RequestResponseEndpoint,
    ) -> Response:
        """
        Redirect the user catalog specific endpoint and adapt the response content.

        Args:
            request (Request): Initial request
            call_next: next call to apply

        Returns:
            response (Response): Response to the current request
        """
        request_body = None if request.method not in ["PATCH", "POST", "PUT"] else await request.json()
        auth_roles = user_login = owner_id = None

        # ---------- Management of  authentification (retrieve user_login + default owner_id)
        if common_settings.CLUSTER_MODE:  # Get the list of access and the user_login calling the endpoint.
            try:
                auth_roles = request.state.auth_roles
                user_login = request.state.user_login
            # Case of endpoints that do not call the authenticate function
            # Get the the user_login calling the endpoint. If this is not set (the authentication.authenticate function
            # is not called), the local user shall be used (later on, in rereoute_url)
            # The common_settings.CLUSTER_MODE may not be used because for some endpoints like /api
            # the authenticate is not called even if common_settings.CLUSTER_MODE is True. Thus, the presence of
            # user_login has to be checked instead
            except (NameError, AttributeError):
                auth_roles = []
                user_login = get_user(None, None)  # Get default local or cluster user
        elif common_settings.LOCAL_MODE:
            user_login = get_user(None, None)
        owner_id = ""  # Default owner_id is empty
        logger.debug(
            f"Received {request.method} from '{user_login}' | {request.url.path}?{request.query_params}",
        )

        # ---------- Request rerouting
        # Dictionary to easily access main data from the request
        self.request_ids = {
            "auth_roles": auth_roles,
            "user_login": user_login,
            "owner_id": owner_id,
            "collection_ids": [],
            "item_id": "",
        }
        reroute_url(request, self.request_ids)
        if not request.scope["path"]:  # Invalid endpoint
            raise log_http_exception(status_code=HTTP_400_BAD_REQUEST, detail="Invalid endpoint.")
        logger.debug(f"path = {request.scope['path']} | requests_ids = {self.request_ids}")

        # Ensure that user_login is not null after rerouting
        if not self.request_ids["user_login"]:
            raise log_http_exception(
                status_code=HTTP_500_INTERNAL_SERVER_ERROR,
                detail="user_login is not defined !",
            )

        # ---------- Body data recovery
        # Recover user and collection id with the ones provided in the request body
        # (if the corresponding parameters have not been recovered from the url)
        # This is available in POST/PUT/PATCH methods only
        if request_body:
            # Edit owner_id with the corresponding body content if exist
            if not self.request_ids["owner_id"]:
                self.request_ids["owner_id"] = request_body.get("owner")
            # received a POST/PUT/PATCH for a STAC item or
            # a STAC collection is created
            if len(self.request_ids["collection_ids"]) == 0:
                collections = request_body.get("collections") or request_body.get("id")
                if collections:
                    self.request_ids["collection_ids"] = collections if isinstance(collections, list) else [collections]

            if not self.request_ids["item_id"] and request_body.get("type") == "Feature":
                self.request_ids["item_id"] = request_body.get("id")

        # ---------- Apply specific changes for each endpoint

        request_manager = CatalogRequestManager(self.client, self.request_ids)
        request = await request_manager.manage_requests(request)
        # If the request manager returns a response, it usually means the user is not authorized
        # to do the operation received, so we directly return the response
        if isinstance(request, Response):
            return request

        response = await call_next(request)

        response_manager = CatalogResponseManager(
            request_manager.client,
            request_manager.request_ids,
            request_manager.s3_files_to_be_deleted,
        )
        return await response_manager.manage_responses(request, cast(StreamingResponse, response))

__init__(client)

Constructor, called from the middleware

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/catalog_middleware.py
78
79
80
81
def __init__(self, client: CoreCrudClient):
    """Constructor, called from the middleware"""
    self.request_ids: dict[Any, Any] = {}
    self.client = client

dispatch(request, call_next) async

Redirect the user catalog specific endpoint and adapt the response content.

Parameters:

Name Type Description Default
request Request

Initial request

required
call_next RequestResponseEndpoint

next call to apply

required

Returns:

Name Type Description
response Response

Response to the current request

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/catalog_middleware.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
async def dispatch(
    self,
    request: Request,
    call_next: RequestResponseEndpoint,
) -> Response:
    """
    Redirect the user catalog specific endpoint and adapt the response content.

    Args:
        request (Request): Initial request
        call_next: next call to apply

    Returns:
        response (Response): Response to the current request
    """
    request_body = None if request.method not in ["PATCH", "POST", "PUT"] else await request.json()
    auth_roles = user_login = owner_id = None

    # ---------- Management of  authentification (retrieve user_login + default owner_id)
    if common_settings.CLUSTER_MODE:  # Get the list of access and the user_login calling the endpoint.
        try:
            auth_roles = request.state.auth_roles
            user_login = request.state.user_login
        # Case of endpoints that do not call the authenticate function
        # Get the the user_login calling the endpoint. If this is not set (the authentication.authenticate function
        # is not called), the local user shall be used (later on, in rereoute_url)
        # The common_settings.CLUSTER_MODE may not be used because for some endpoints like /api
        # the authenticate is not called even if common_settings.CLUSTER_MODE is True. Thus, the presence of
        # user_login has to be checked instead
        except (NameError, AttributeError):
            auth_roles = []
            user_login = get_user(None, None)  # Get default local or cluster user
    elif common_settings.LOCAL_MODE:
        user_login = get_user(None, None)
    owner_id = ""  # Default owner_id is empty
    logger.debug(
        f"Received {request.method} from '{user_login}' | {request.url.path}?{request.query_params}",
    )

    # ---------- Request rerouting
    # Dictionary to easily access main data from the request
    self.request_ids = {
        "auth_roles": auth_roles,
        "user_login": user_login,
        "owner_id": owner_id,
        "collection_ids": [],
        "item_id": "",
    }
    reroute_url(request, self.request_ids)
    if not request.scope["path"]:  # Invalid endpoint
        raise log_http_exception(status_code=HTTP_400_BAD_REQUEST, detail="Invalid endpoint.")
    logger.debug(f"path = {request.scope['path']} | requests_ids = {self.request_ids}")

    # Ensure that user_login is not null after rerouting
    if not self.request_ids["user_login"]:
        raise log_http_exception(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            detail="user_login is not defined !",
        )

    # ---------- Body data recovery
    # Recover user and collection id with the ones provided in the request body
    # (if the corresponding parameters have not been recovered from the url)
    # This is available in POST/PUT/PATCH methods only
    if request_body:
        # Edit owner_id with the corresponding body content if exist
        if not self.request_ids["owner_id"]:
            self.request_ids["owner_id"] = request_body.get("owner")
        # received a POST/PUT/PATCH for a STAC item or
        # a STAC collection is created
        if len(self.request_ids["collection_ids"]) == 0:
            collections = request_body.get("collections") or request_body.get("id")
            if collections:
                self.request_ids["collection_ids"] = collections if isinstance(collections, list) else [collections]

        if not self.request_ids["item_id"] and request_body.get("type") == "Feature":
            self.request_ids["item_id"] = request_body.get("id")

    # ---------- Apply specific changes for each endpoint

    request_manager = CatalogRequestManager(self.client, self.request_ids)
    request = await request_manager.manage_requests(request)
    # If the request manager returns a response, it usually means the user is not authorized
    # to do the operation received, so we directly return the response
    if isinstance(request, Response):
        return request

    response = await call_next(request)

    response_manager = CatalogResponseManager(
        request_manager.client,
        request_manager.request_ids,
        request_manager.s3_files_to_be_deleted,
    )
    return await response_manager.manage_responses(request, cast(StreamingResponse, response))

log_http_exception(*args, **kwargs)

Log error and return an HTTP exception to be raised by the caller

Source code in docs/rs-server/services/catalog/rs_server_catalog/middleware/catalog_middleware.py
53
54
55
def log_http_exception(*args, **kwargs) -> type[HTTPException]:
    """Log error and return an HTTP exception to be raised by the caller"""
    return utils2.log_http_exception(logger, *args, **kwargs)