64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
class CatalogRequestManager:
    """Pre-process requests sent by users to the Catalog before routing them to stac-fastapi.

    ``manage_requests`` dispatches each request to a dedicated method depending on its
    HTTP method and path (POST/PUT, DELETE, search, GET collections, PATCH).

    Attributes:
        client: stac-fastapi CRUD client used to look up collections and items.
        request_ids: identifiers extracted from the incoming request. Keys read by this
            class: "owner_id", "user_login", "collection_ids", "item_id", "auth_roles".
        s3_manager: helper used to update the publication of STAC items.
        s3_files_to_be_deleted: S3 hrefs to remove once the request succeeds; filled by
            ``build_filelist_to_be_deleted`` or by the item publication step.
    """

    def __init__(self, client: CoreCrudClient, request_ids: dict[Any, Any]):
        self.client = client
        self.request_ids = request_ids
        self.s3_manager = S3Manager()
        self.s3_files_to_be_deleted: list = []

    def _override_request_body(self, request: Request, content: Any) -> Request:
        """Replace the body of ``request`` with ``content`` serialized as UTF-8 JSON.

        Overwrites both the private ``_body`` and ``_json`` caches of the request
        (presumably what later ``request.body()`` / ``request.json()`` calls read back).
        """
        request._body = json.dumps(content).encode("utf-8")  # pylint: disable=protected-access
        request._json = content  # pylint: disable=protected-access
        logger.debug("new request body and json: %s", request._body)  # pylint: disable=protected-access
        return request

    def _override_request_query_string(self, request: Request, query_params: dict) -> Request:
        """Replace the query string of ``request`` with ``query_params`` URL-encoded.

        ``doseq=True`` expands sequence values into repeated keys.
        """
        request.scope["query_string"] = urlencode(query_params, doseq=True).encode("utf-8")
        logger.debug("new request query_string: %s", request.scope["query_string"])
        return request

    async def _collection_exists(self, request: Request, collection_id: str) -> bool:
        """Check if the collection exists.

        Any exception raised by the client lookup is logged and treated as "does not exist".

        Returns:
            bool: True if the collection exists, False otherwise
        """
        try:
            await self.client.get_collection(collection_id, request)
            return True
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.error("Collection %s not found: %s", collection_id, e)
            return False

    async def _resolve_collection_name(self, request: Request, collection: str) -> str:
        """Return the catalog name under which a searched collection actually exists.

        The raw name is tried first; if it is unknown, the name prefixed with
        ``<owner_id>_`` is tried.

        Raises:
            The exception built by ``log_http_exception`` (HTTP 404) if neither form exists.
        """
        if await self._collection_exists(request, collection):
            return collection
        prefixed = f"{self.request_ids['owner_id']}_{collection}"
        logger.debug("Using collection name: %s", prefixed)
        if not await self._collection_exists(request, prefixed):
            raise log_http_exception(
                status_code=HTTP_404_NOT_FOUND,
                detail=f"Collection {collection} not found.",
            )
        return prefixed

    async def _get_item_from_collection(self, request: Request):
        """Fetch the item targeted by the request from its owner-prefixed collection.

        Args:
            request (Request): The request object.

        Returns:
            The item returned by the client, or None if the client raises NotFoundError.

        Raises:
            The exception built by ``log_http_exception`` (HTTP 400) for any other client error.
        """
        item_id = self.request_ids["item_id"]
        collection_id = f"{self.request_ids['owner_id']}_{self.request_ids['collection_ids'][0]}"
        try:
            return await self.client.get_item(item_id=item_id, collection_id=collection_id, request=request)
        except NotFoundError:
            logger.info("The item '%s' does not exist in collection '%s'", item_id, collection_id)
            return None
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.exception("Exception: %s", e)
            raise log_http_exception(
                detail=f"Exception when trying to get the item {item_id} from the collection '{collection_id}'",
                status_code=HTTP_400_BAD_REQUEST,
            ) from e

    async def build_filelist_to_be_deleted(self, request: Request) -> None:
        """Build the list of the S3 files that will be deleted if the request is successful.

        For ``/collections/<collection>`` every item of the collection is listed (paginated);
        for ``/collections/<collection>/items/<item>`` only that item is considered. The
        non-empty asset hrefs are appended to ``self.s3_files_to_be_deleted``.
        A NotFoundError or KeyError is logged and aborts the whole method, so remaining
        collections are skipped.
        """
        for ci in self.request_ids["collection_ids"]:
            collection_id = f"{self.request_ids['owner_id']}_{ci}"
            items: list = []
            try:
                if "/items" not in request.scope["path"]:
                    # Delete endpoint /collections/<collection_name>: list every item.
                    # Use pagination, otherwise at most the default limit (10) items is returned.
                    # NOTE: Unable to use the pagination from pgstac client. Temporary, use a limit of 100
                    token = None
                    while True:
                        page = await self.client.item_collection(
                            request=request,
                            collection_id=collection_id,
                            limit=100,
                            token=token,
                        )
                        items.extend(page.get("features", []))
                        token = get_token_for_pagination(page)
                        if not token:  # no more pages left
                            break
                else:
                    # Delete endpoint /collections/<collection_name>/items/<item_name>
                    item = await self.client.get_item(
                        item_id=self.request_ids["item_id"],
                        collection_id=collection_id,
                        request=request,
                    )
                    items = [item]
            except NotFoundError as nfe:
                logger.error("Failed to find the requested object to be deleted. %s", nfe)
                return
            except KeyError as e:
                logger.error("Failed to build the list of items to be deleted due to missing key: %s", e)
                return

            # Lazy formatting: the item list can be large and is only rendered at DEBUG level.
            logger.debug("Found %d items: %s", len(items), items)
            try:
                for item in items:
                    for asset_info in item.get("assets", {}).values():
                        s3_href = asset_info.get("href")
                        if s3_href:
                            self.s3_files_to_be_deleted.append(s3_href)
            except KeyError as e:
                logger.error(
                    "Failed to build the list of S3 files to be deleted due to missing key in dictionary: %s",
                    e,
                )
                return

        logger.info(
            "Successfully built the list of S3 files to be deleted. There are %d files to be deleted",
            len(self.s3_files_to_be_deleted),
        )

    async def manage_requests(self, request: Request) -> Request | Response:
        """Dispatch the request pre-processing depending on which endpoint is called.

        Args:
            request (Request): request received by the Catalog.

        Returns:
            Request|Response: Request processed to be sent to stac-fastapi OR a response if the
            operation is not authorized.

        Raises:
            The exception built by ``log_http_exception`` (HTTP 401) when a deletion is refused.
        """
        path = request.scope["path"]
        if request.method in ("POST", "PUT") and "/search" not in path:
            # URL: POST / PUT: '/catalog/collections/{USER}:{COLLECTION}'
            # or '/catalog/collections/{USER}:{COLLECTION}/items'
            request_or_response = await self.manage_put_post_request(request)
            if hasattr(request_or_response, "status_code"):  # Unauthorized
                return cast(Response, request_or_response)
            request = request_or_response
        elif request.method == "DELETE":
            if not await self.manage_delete_request(request):
                raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail="Deletion not allowed.")
        elif "/search" in path:
            # URL: GET: '/catalog/search'
            request_or_response = await self.manage_search_request(request)
            if hasattr(request_or_response, "status_code"):  # Unauthorized
                return cast(Response, request_or_response)
            request = request_or_response
        elif request.method == "GET" and path == CATALOG_COLLECTIONS:
            # Override the default pgstac limit of 10 items if not explicitly set.
            if "limit" not in request.query_params:
                request = self._override_request_query_string(request, {**request.query_params, "limit": 1000})
        elif request.method == "PATCH":
            request_or_response = await self.manage_patch_request(request)
            if hasattr(request_or_response, "status_code"):  # Unauthorized
                return cast(Response, request_or_response)
            request = request_or_response
        return request

    async def manage_put_post_request(  # pylint: disable=too-many-statements,too-many-return-statements,too-many-branches # noqa: E501
        self,
        request: Request,
    ) -> Request | JSONResponse:
        """Adapt the body of a collection / item creation or update for the STAC endpoint.

        Collection (POST /collections or PUT /collections/<owner>_<collection>): the id is
        prefixed with the owner, "owner" is filled if missing and the timestamps are set,
        keeping the "created" value of an already existing collection.
        Item (path containing "/items"): the collection must exist, the item publication is
        updated through the S3 manager, the timestamps are set and a default geometry/bbox
        is added when missing.

        Args:
            request (Request): The Client request to be updated.

        Returns:
            Request | JSONResponse: The request (body replaced only if it changed), or the object
            with a ``status_code`` returned by the item publication step.

        Raises:
            The exception built by ``log_http_exception``: 400 for a missing key, zero or several
            target collections, or a PUT on an item without published/expires dates; 401 when,
            in cluster mode, a user creates a collection owned by someone else; 404 when the
            collection of an item does not exist.
        """
        try:
            original_content = await request.json()
            content = copy.deepcopy(original_content)
            check_user_authorization(self.request_ids)

            collection_ids = self.request_ids["collection_ids"]
            if len(collection_ids) > 1:
                raise log_http_exception(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update more than one collection !",
                )
            if len(collection_ids) == 0:
                raise log_http_exception(
                    status_code=HTTP_400_BAD_REQUEST,
                    detail="Cannot create or update -> no collection specified !",
                )
            collection = collection_ids[0]
            owner_id = self.request_ids["owner_id"]
            path = request.scope["path"]

            # POST collection / PUT collection
            if path in (CATALOG_COLLECTIONS, f"{CATALOG_COLLECTIONS}/{owner_id}_{collection}"):
                # The apikey user must be the owner of the collection it creates. Local mode is not
                # checked: any user may create / delete a collection owned by another user.
                if common_settings.CLUSTER_MODE and owner_id != self.request_ids["user_login"]:
                    error = (
                        f"The '{self.request_ids['user_login']}' user cannot create a "
                        f"collection owned by the '{owner_id}' user. Additionally, modifying the 'owner' "
                        "field is not permitted also."
                    )
                    raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail=error)
                content["id"] = owner_id_and_collection_id(owner_id, content["id"])
                if not content.get("owner"):
                    content["owner"] = owner_id
                # Keep the "created" value of an already existing collection with this ID.
                try:
                    existing_collection = await self.client.get_collection(content["id"], request)
                    date_of_creation = existing_collection.get("created", "")
                except Exception as e:  # pylint: disable=broad-exception-caught
                    logger.debug("Collection %s doesn't exist and will be created: %s", content["id"], e)
                    date_of_creation = ""
                # Update timestamps ("updated", and "created" if it's a new collection)
                content = timestamps_extension.set_timestamps_to_collection(content, original_created=date_of_creation)
                logger.debug("Handling for collection %s", content["id"])
                # TODO update the links also?

            # Create / update an item
            elif "/items" in path:
                if not await self._collection_exists(request, f"{owner_id}_{collection}"):
                    raise log_http_exception(
                        status_code=HTTP_404_NOT_FOUND,
                        detail=f"Collection {collection} does not exist.",
                    )
                # Existing version of the item, if any (None otherwise).
                item = await self._get_item_from_collection(request)
                content, self.s3_files_to_be_deleted = self.s3_manager.update_stac_item_publication(
                    content,
                    request,
                    self.request_ids,
                    item,
                )
                # If the publication step produced a response object, forward it untouched
                # instead of timestamping / indexing it as a dict.
                if hasattr(content, "status_code"):
                    return content
                if content:
                    if request.method == "POST":
                        content = timestamps_extension.set_timestamps_for_creation(content)
                        content = timestamps_extension.set_timestamps_for_insertion(content)
                    else:  # PUT
                        published = expires = ""
                        if item and item.get("properties"):
                            published = item["properties"].get("published", "")
                            expires = item["properties"].get("expires", "")
                        if not published and not expires:
                            raise log_http_exception(
                                status_code=HTTP_400_BAD_REQUEST,
                                detail=f"Item {content['id']} not found.",
                            )
                        content = timestamps_extension.set_timestamps_for_update(
                            content,
                            original_published=published,
                            original_expires=expires,
                        )
                    # If item doesn't contain a geometry/bbox, just fill with a default one.
                    if not content.get("geometry", None):
                        content["geometry"] = DEFAULT_GEOM
                    if not content.get("bbox", None):
                        content["bbox"] = DEFAULT_BBOX

            # Update the request body only if the pre-processing changed something.
            if content != original_content:
                request = self._override_request_body(request, content)
            return request
        except KeyError as kerr_msg:
            raise log_http_exception(
                detail=f"Missing key in request body! {kerr_msg}",
                status_code=HTTP_400_BAD_REQUEST,
            ) from kerr_msg

    async def manage_delete_request(self, request: Request) -> bool:
        """Check if the deletion is allowed and, if so, collect the S3 files to delete.

        Args:
            request (Request): The client request.

        Returns:
            bool: False when, in cluster mode, the user lacks "write" authorisation or tries to
            delete a collection owned by another user; True otherwise.
        """
        user_login = getpass.getuser()
        auth_roles = []
        if common_settings.CLUSTER_MODE:  # Get the list of access and the user_login calling the endpoint.
            auth_roles = request.state.auth_roles
            user_login = request.state.user_login

        collection_ids = self.request_ids["collection_ids"]
        owner_id = self.request_ids["owner_id"]

        # In cluster mode, a user without "write" authorisation on the collection is refused.
        if (
            common_settings.CLUSTER_MODE
            and collection_ids
            and owner_id
            and not get_authorisation(collection_ids, auth_roles, "write", owner_id, user_login)
        ):
            return False

        # Manage a collection deletion: in cluster mode the apikey user must be the owner of the
        # collection. Local mode is not checked. The emptiness guard avoids an IndexError on
        # collection_ids[0] when the request targets no collection.
        is_collection_deletion = bool(collection_ids) and (
            request.scope["path"] == f"{CATALOG_COLLECTIONS}/{owner_id}_{collection_ids[0]}"
        )
        if is_collection_deletion and common_settings.CLUSTER_MODE and owner_id != user_login:
            logger.error(
                "The '%s' user cannot delete a collection owned by the '%s' user",
                user_login,
                owner_id,
            )
            return False

        await self.build_filelist_to_be_deleted(request)
        return True

    async def manage_search_request(  # pylint: disable=too-many-statements,too-many-branches
        self,
        request: Request,
    ) -> Request | JSONResponse:
        """Resolve the owner and the collection names of a /search request.

        The owner is taken from the path if set, else from the filter, the "owner" field /
        query parameter, or ``get_user``. Each searched collection is looked up with its raw
        name, then with the ``<owner_id>_`` prefix. The request body (POST) or query string
        (GET) is rewritten accordingly.

        Args:
            request Request: the client request.

        Returns:
            Request: the new request with the collection names updated.

        Raises:
            The exception built by ``log_http_exception``: 404 for an unknown collection,
            401 when the user is not authorised (cluster mode only).
        """
        # Names already confirmed to exist while resolving them; not looked up a second time.
        verified_collections: set[str] = set()

        # ---------- POST requests
        if request.method == "POST":
            content = await request.json()
            # Pre-processing of filter extensions
            if "filter" in content:
                content["filter"] = process_filter_extensions(content["filter"])
            # Priority for the owner_id: path, filter, "owner" field, then get_user.
            if not self.request_ids["owner_id"]:
                self.request_ids["owner_id"] = (
                    (extract_owner_name_from_json_filter(content["filter"]) if "filter" in content else None)
                    or content.get("owner")
                    or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
                )
            # Add the filter-lang option if missing; it has to be placed BEFORE the filter.
            if "filter" in content:
                filter_lang = {"filter-lang": content.get("filter-lang", "cql2-json")}
                stac_filter = content.pop("filter")
                content = {**content, **filter_lang, "filter": stac_filter}
            # ----- Call /catalog/search with POST method endpoint
            if "collections" in content:
                collections = content["collections"]
                for index, collection in enumerate(collections):
                    resolved = await self._resolve_collection_name(request, collection)
                    verified_collections.add(resolved)
                    if resolved != collection:
                        collections[index] = resolved
                self.request_ids["collection_ids"] = collections
            request = self._override_request_body(request, content)

        # ---------- GET requests
        elif request.method == "GET":
            query_params_dict = dict(request.query_params)
            # Update owner_id if it is not already defined from path parameters
            if not self.request_ids["owner_id"]:
                self.request_ids["owner_id"] = (
                    (
                        extract_owner_name_from_text_filter(query_params_dict["filter"])
                        if "filter" in query_params_dict
                        else ""
                    )
                    or query_params_dict.get("owner")
                    or get_user(self.request_ids["owner_id"], self.request_ids["user_login"])
                )
            # ----- Catch endpoint catalog/search + query parameters (e.g. /search?ids=S3_OLC&collections=titi)
            if "collections" in query_params_dict:
                coll_list = [
                    await self._resolve_collection_name(request, collection)
                    for collection in query_params_dict["collections"].split(",")
                ]
                verified_collections.update(coll_list)
                self.request_ids["collection_ids"] = coll_list
                query_params_dict["collections"] = ",".join(coll_list)
                request = self._override_request_query_string(request, query_params_dict)

        # Check that every collection of the request exists (skip the ones resolved above).
        for collection in self.request_ids["collection_ids"]:
            if collection in verified_collections:
                continue
            if not await self._collection_exists(request, collection):
                raise log_http_exception(status_code=HTTP_404_NOT_FOUND, detail=f"Collection {collection} not found.")

        # Check authorisation in cluster mode
        if common_settings.CLUSTER_MODE and not get_authorisation(
            self.request_ids["collection_ids"],
            self.request_ids["auth_roles"],
            "read",
            self.request_ids["owner_id"],
            self.request_ids["user_login"],
            # When calling the /search endpoints, the catalog ids are always prefixed by their <owner>_
            owner_prefix=True,
        ):
            raise log_http_exception(status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized access.")
        return request

    async def manage_patch_request(self, request: Request) -> Request:
        """Pre-process a PATCH request to the Catalog.

        Runs the authorization check and refreshes the "updated" timestamp of the patched
        item or collection (an item when the path contains "/items/").

        Args:
            request (Request): The request from the Client

        Returns:
            Request: Updated request

        Raises:
            The exception built by ``log_http_exception`` (HTTP 400) for a missing key.
        """
        try:
            original_content = await request.json()
            content = copy.deepcopy(original_content)
            check_user_authorization(self.request_ids)
            # Update "updated" timestamp (different field if it is an item or a collection)
            is_item = "/items/" in request.scope["path"]
            content = timestamps_extension.set_updated_timestamp_to_now(content, is_item=is_item)
            return self._override_request_body(request, content)
        except KeyError as kerr_msg:
            raise log_http_exception(
                detail=f"Missing key in request body! {kerr_msg}",
                status_code=HTTP_400_BAD_REQUEST,
            ) from kerr_msg
|