Skip to content

rs_server_catalog/data_management/s3_manager.md

<< Back to index

Module handling all operations on S3 bucket.

S3Manager

Tool class to handle all operations on S3 bucket.

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/s3_manager.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
class S3Manager:
    """Tool class to handle all operations on S3 bucket.

    The S3 handler is created once in the constructor and may be ``None`` when its
    creation failed; the methods that need it check for that case. When the
    ``RSPY_LOCAL_CATALOG_MODE`` environment variable is set to 1, the operations on
    the S3 bucket are skipped.
    """

    def __init__(self, s3_credentials: S3Credentials):
        """
        Constructor.

        Args:
            s3_credentials: S3 credentials
        """
        # May be None: _get_s3_handler returns None when the handler cannot be created
        self.s3_handler: "S3StorageHandler | None" = self._get_s3_handler(s3_credentials)
        # If we are in local mode, operations on S3 bucket will be skipped
        self.is_catalog_local_mode = int(os.environ.get("RSPY_LOCAL_CATALOG_MODE", 0)) == 1

    def _get_s3_handler(self, s3_credentials: S3Credentials) -> "S3StorageHandler | None":
        """
        Used to create the s3_handler to be used with s3 buckets.

        Args:
            s3_credentials: S3 credentials
        Returns:
            The S3 handler, or None if its creation raised a RuntimeError
            (the failure is logged as a warning).
        """
        try:
            s3_handler = S3StorageHandler(s3_credentials)
        except RuntimeError:
            logger.warning(f"Failed to create the s3 handler: {traceback.format_exc()}")
            return None

        return s3_handler

    def clear_catalog_bucket(self, content: dict) -> None:
        """Used to clear specific files from catalog bucket.

        Every asset href found in ``content["assets"]`` is deleted from the bucket
        resolved from the item owner, collection and eopf type. Nothing is done in
        local mode, when ``content`` is not dict-like, when it has no assets, or when
        no S3 handler is available.

        Args:
            content (dict): Item whose asset files have to be deleted
        """
        if self.is_catalog_local_mode or (not hasattr(content, "get")):
            return
        assets = content.get("assets", {})
        if not assets:
            return
        if not self.s3_handler:
            logger.warning("No s3 handler available, the files from the catalog bucket are not deleted")
            return
        # don't delete files if we are in local mode (flag re-read at call time, checked once)
        if int(os.environ.get("RSPY_LOCAL_CATALOG_MODE", 0)):
            return

        # Retrieve bucket name from config using what's in content.
        # It only depends on the item, not on the asset, so it is resolved once.
        item_owner = content["properties"].get("owner", "")
        item_collection = content.get("collection", "").removeprefix(f"{item_owner}_")
        item_eopf_type = content["properties"].get("eopf:type", "")
        bucket_name = get_bucket_name_from_config(item_owner, item_collection, item_eopf_type)
        for asset in assets:
            # For catalog bucket, data is already stored into href field (from an asset)
            file_key = assets[asset]["href"]
            self.s3_handler.delete_key_from_s3(bucket_name, file_key)

    def check_s3_key(self, item: dict, asset_name: str, s3_key: str) -> tuple[bool, int]:
        """Check if the given S3 key exists and matches the expected path.

        Args:
            item (dict): The item from the catalog (if it does exist) containing the asset.
            asset_name (str): The name of the asset to check.
            s3_key (str): The S3 key path to check against.

        Returns:
            tuple[bool, int]: (True, size) when the key exists on the bucket, with the size
            reported by the S3 handler; (False, -1) when there is no item, when in local
            mode, when the asset is not part of the item or when the key is not found.

        Raises:
            HTTPException: If the s3_handler is not available, if S3 paths cannot be retrieved,
                        if the S3 paths do not match, if the key is malformed, or if there is
                        an error checking the key.
        """
        if not item or self.is_catalog_local_mode:
            return False, -1
        # update an item
        existing_asset = item["assets"].get(asset_name)
        if not existing_asset:
            return False, -1

        # check if the new s3_href is the same as the existing one
        try:
            item_s3_path = existing_asset["href"]
        except KeyError as exc:
            raise HTTPException(
                detail=f"Failed to get the s3 path for the asset {asset_name}",
                status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            ) from exc
        if item_s3_path != s3_key:
            raise HTTPException(
                detail=(
                    f"Received an updated path for the asset {asset_name} of item {item['id']}. "
                    f"The current path is {item_s3_path}, and the new path is {s3_key}. "
                    "However, changing an existing path of an asset is not allowed."
                ),
                status_code=HTTP_400_BAD_REQUEST,
            )
        if not self.s3_handler:
            raise HTTPException(
                detail="Failed to find s3 credentials",
                status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            )
        # "s3://bucket/path/to/key" -> ["s3:", "", "bucket", "path", "to", "key"]
        s3_key_array = s3_key.split("/")
        if len(s3_key_array) < 3:
            raise HTTPException(
                detail=f"The s3 key {s3_key} is malformed, the bucket name can't be retrieved from it",
                status_code=HTTP_400_BAD_REQUEST,
            )
        bucket = s3_key_array[2]
        key_path = "/".join(s3_key_array[3:])

        # check the presence of the key
        try:
            s3_key_exists, size = self.s3_handler.check_s3_key_on_bucket(bucket, key_path)
        except RuntimeError as rte:
            raise HTTPException(
                detail=f"When checking the presence of the {s3_key} key, an error has been raised: {rte}",
                status_code=HTTP_400_BAD_REQUEST,
            ) from rte
        # A missing key is reported to the caller instead of being raised
        if not s3_key_exists:
            return False, -1
        return True, size

    def update_stac_item_publication(  # pylint: disable=too-many-locals,too-many-branches,too-many-nested-blocks
        self,
        content: dict,
        request: Request,
        request_ids: dict,
        item: dict,
    ) -> dict:
        """Update the JSON body of a feature with new stac extensions and owner information.

        Args:
            content (dict): The content to update (modified in place).
            request (Request): The HTTP request object.
            request_ids (dict): IDs associated to the given request
            item (dict): The item from the catalog (if exists) to update.

        Returns:
            dict: The updated content.

        Raises:
            HTTPException: If the owner or the collection can't be read from request_ids.
        """
        collection_ids = request_ids.get("collection_ids", [])
        user = request_ids.get("owner_id")
        logger.debug(f"Update item for user: {user}")
        if not isinstance(collection_ids, list) or not collection_ids or not user:
            raise HTTPException(
                detail="Failed to get the user or the name of the collection!",
                status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            )
        collection_id = collection_ids[0]
        verify_existing_item_from_catalog(request.method, item, content.get("id", "Unknown"), f"{user}_{collection_id}")

        # include new stac extensions if not present
        # ("stac_extensions" may be absent from the received item, start from an empty list then)
        if content.get("stac_extensions") is None:
            content["stac_extensions"] = []
        for new_stac_extension in [
            "https://home.rs-python.eu/ownership-stac-extension/v1.1.0/schema.json",
            "https://stac-extensions.github.io/alternate-assets/v1.1.0/schema.json",
            "https://stac-extensions.github.io/file/v2.1.0/schema.json",
        ]:
            if new_stac_extension not in content["stac_extensions"]:
                content["stac_extensions"].append(new_stac_extension)

        # add owner data
        content["properties"].update({"owner": user})
        content.update({"collection": f"{user}_{collection_id}"})
        logger.debug(f"The updated item for user: {user} ended")
        return content

    def generate_presigned_url(self, content: dict, path: str) -> tuple[str, int]:
        """This function is used to generate a time-limited download url

        Args:
            content (dict): STAC description of the item to generate an URL for
            path (str): Current path to this object

        Returns:
            str: Presigned URL, or an error message when the URL can't be generated
            int: HTTP return code (302 on success, 404 if the asset is not found,
                400 if the S3 client fails to generate the URL)

        Raises:
            HTTPException: If no S3 handler is available.
        """
        # Assume that pgstac already selected the correct asset id
        # just check type, generate and return url
        path_splitted = path.split("/")
        asset_id = path_splitted[-1]
        # The item id is only used in the error message: don't fail on a short path
        item_id = path_splitted[-3] if len(path_splitted) >= 3 else "Unknown"
        # Retrieve bucket name from config using what's in content
        item_owner = content["properties"].get("owner", "")
        item_collection = content.get("collection", "").removeprefix(f"{item_owner}_")
        item_eopf_type = content["properties"].get("eopf:type", "")
        bucket_name = get_bucket_name_from_config(item_owner, item_collection, item_eopf_type)
        try:
            s3_path = content["assets"][asset_id]["href"].removeprefix(f"s3://{bucket_name}/")
        except KeyError:
            return f"Failed to find asset named '{asset_id}' from item '{item_id}'", HTTP_404_NOT_FOUND
        # Checked outside of the try block: this error is not related to the S3 client call
        if not self.s3_handler:
            raise HTTPException(
                status_code=HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to find s3 credentials",
            )
        try:
            response = self.s3_handler.s3_client.generate_presigned_url(
                "get_object",
                Params={"Bucket": bucket_name, "Key": s3_path},
                ExpiresIn=PRESIGNED_URL_EXPIRATION_TIME,
            )
        except botocore.exceptions.ClientError:
            return "Failed to generate presigned url", HTTP_400_BAD_REQUEST
        return response, HTTP_302_FOUND

    def check_if_item_can_be_published(self, content: dict) -> bool:
        """
        Check if all assets of a given catalog item exist on S3 and are valid for publishing.

        Iterates through each asset in the `content["assets"]` dictionary and verifies
        the presence of the S3 key (or folder/prefix) using `check_s3_key`. Logs the
        results and any errors encountered. Returns True only if all assets exist;
        returns False if at least one asset is missing or cannot be verified.

        Args:
            content (dict): A catalog item dictionary containing asset information
                            under the "assets" key.

        Returns:
            bool: True if all assets exist on S3 and can be published, False otherwise.
            Always True in local mode, and True for an item without assets.

        Notes:
            - Handles the HTTPException raised by `check_s3_key` and logs errors without stopping iteration.
            - An href is accepted only if it starts with `s3://<bucket>/`, where the bucket
              is the one resolved from the item owner, collection and eopf type.
        """
        # (don't do anything if in local mode)
        if self.is_catalog_local_mode:
            return True

        user = content["properties"].get("owner", "")
        collection_id = content.get("collection", "").removeprefix(f"{user}_")
        item_eopf_type = content["properties"].get("eopf:type", "")
        bucket_name = get_bucket_name_from_config(user, collection_id, item_eopf_type)
        # A substring test would also accept another bucket whose name or key
        # merely contains the expected bucket name, so the full prefix is compared.
        expected_prefix = f"s3://{bucket_name}/"
        exist_list = []
        assets_to_check = []

        # First do the cheap validations locally so we avoid scheduling S3 calls for
        # assets that are already invalid from the STAC payload itself.
        for asset_name, asset_info in content.get("assets", {}).items():
            if not (s3_key := asset_info.get("href")):
                logger.error(f"Asset: {asset_name}, No href key found for this asset")
                exist_list.append(False)
                continue

            # We only allow publication from the bucket resolved from the item metadata.
            # If the href points to a different bucket, we can reject it immediately.
            if not s3_key.startswith(expected_prefix):
                logger.error(
                    f"Asset: {asset_name}, The s3 key {s3_key} should contain the bucket name {bucket_name}",
                )
                exist_list.append(False)
                continue

            # Keep only the assets that require a real S3 existence check.
            assets_to_check.append((asset_name, s3_key))

        def _check_asset(asset: tuple[str, str]) -> bool:
            # This helper runs inside the thread pool so each asset can be checked
            # independently without blocking the whole publication flow.
            asset_name, s3_key = asset
            try:
                exists, size = self.check_s3_key(content, asset_name, s3_key)
                logger.info(f"Asset: {asset_name}, Found on bucket: {exists}, Size: {size}")
                return exists
            except HTTPException as e:
                logger.error(f"Asset: {asset_name}, Error: {e.detail}")
                return False

        if assets_to_check:
            # boto3 does not provide a generic bulk "exists" API for arbitrary keys,
            # so the best low-risk optimization here is to fan out the checks in parallel.
            # The number of workers is capped to avoid overwhelming the S3 endpoint.
            max_workers = min(len(assets_to_check), max(1, PUBLISH_CHECK_MAX_WORKERS))
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                exist_list.extend(executor.map(_check_asset, assets_to_check))

        return all(exist_list)

    def update_assets_checksums(self, content: dict) -> dict:
        """Update each asset with the checksum returned by S3 GetObjectAttributes.

        Args:
            content (dict): The item whose assets are updated in place. The first non-empty
                ``Checksum*`` value returned for an asset is stored in its ``file:checksum`` field.

        Returns:
            dict: The same content object. It is returned untouched in local mode or when
            no S3 handler is available; assets without href, or whose attributes can't
            be read, are skipped with a warning.
        """
        if self.is_catalog_local_mode:
            return content
        if not self.s3_handler:
            logger.warning("No s3 handler available; skipping checksum update")
            return content

        user = content["properties"].get("owner", "")
        collection_id = content.get("collection", "").removeprefix(f"{user}_")
        item_eopf_type = content["properties"].get("eopf:type", "")
        bucket_name = get_bucket_name_from_config(user, collection_id, item_eopf_type)

        for asset_name, asset_info in content.get("assets", {}).items():
            href = asset_info.get("href")
            if not href:
                logger.warning("Asset %s has no href; skipping checksum update", asset_name)
                continue

            key = href.removeprefix(f"s3://{bucket_name}/")
            try:
                object_attributes = self.s3_handler.get_object_attributes(bucket_name, key)
            except RuntimeError as error:
                logger.warning("Failed to get checksum attributes for asset %s: %s", asset_name, error)
                continue

            # GetObjectAttributes returns the checksum values in the S3/AWS format:
            # a "Checksum" dict containing one or more algorithm-specific base64 values
            # such as ChecksumCRC32, ChecksumCRC32C, ChecksumSHA1 or ChecksumSHA256.
            # For now we store the first checksum value returned by the object storage
            # into the STAC asset field; the multihash/STAC-normalized conversion can
            # build on this once we preserve the selected algorithm alongside the value.
            # (an empty answer or a missing "Checksum" entry simply leaves the asset untouched)
            checksum = (object_attributes or {}).get("Checksum") or {}
            for checksum_key, checksum_value in checksum.items():
                if checksum_key.startswith("Checksum") and checksum_value:
                    asset_info["file:checksum"] = checksum_value
                    break

        return content

    async def delete_s3_files(self, s3_files_to_be_deleted: list[str]) -> bool:
        """Used to clear specific files from temporary bucket or from catalog bucket.

        Args:
            s3_files_to_be_deleted (list[str]): list of files to delete from the S3 bucket

        Returns:
            bool: False only when no S3 handler is available. True when there is nothing to
            delete or once the deletion has been attempted: a RuntimeError raised by the
            deletion is logged and deliberately does not stop the process.
        """
        if not s3_files_to_be_deleted:
            logger.info("No files to be deleted from bucket")
            return True
        if not self.s3_handler:
            logger.error("Failed to create the s3 handler when trying to delete the s3 files")
            return False

        try:
            await self.s3_handler.adelete_keys_from_s3(s3_files_to_be_deleted)
        except RuntimeError as rte:
            logger.exception(
                f"Failed to delete file from s3 bucket. Reason: {rte}. However, the process will still continue !",
            )
        return True

__init__(s3_credentials)

Constructor.

Parameters:

Name Type Description Default
s3_credentials S3Credentials

S3 credentials

required
Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/s3_manager.py
52
53
54
55
56
57
58
59
60
61
def __init__(self, s3_credentials: S3Credentials):
    """
    Build the manager: create the S3 handler, then read the local mode flag.

    Args:
        s3_credentials: S3 credentials
    """
    handler = self._get_s3_handler(s3_credentials)
    self.s3_handler: S3StorageHandler = handler
    # Operations on the S3 bucket are skipped when the local mode flag is exactly 1
    local_mode_flag = int(os.environ.get("RSPY_LOCAL_CATALOG_MODE", 0))
    self.is_catalog_local_mode = local_mode_flag == 1

check_if_item_can_be_published(content)

Check if all assets of a given catalog item exist on S3 and are valid for publishing.

Iterates through each asset in the content["assets"] dictionary and verifies the presence of the S3 key (or folder/prefix) using check_s3_key. Logs the results and any errors encountered. Returns True only if all assets exist; returns False if at least one asset is missing or cannot be verified.

Parameters:

Name Type Description Default
content dict

A catalog item dictionary containing asset information under the "assets" key.

required

Returns:

Name Type Description
bool bool

True if all assets exist on S3 and can be published, False otherwise.

Notes
  • Handles exceptions raised by check_s3_key and logs errors without stopping iteration.
  • For folder/prefix assets, the size returned is ignored (-1), but existence is still validated.
Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/s3_manager.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def check_if_item_can_be_published(self, content: dict) -> bool:
    """
    Check if all assets of a given catalog item exist on S3 and are valid for publishing.

    Iterates through each asset in the `content["assets"]` dictionary and verifies
    the presence of the S3 key (or folder/prefix) using `check_s3_key`. Logs the
    results and any errors encountered. Returns True only if all assets exist;
    returns False if at least one asset is missing or cannot be verified.

    Args:
        content (dict): A catalog item dictionary containing asset information
                        under the "assets" key.

    Returns:
        bool: True if all assets exist on S3 and can be published, False otherwise.
        Always True in local mode, and True for an item without assets.

    Notes:
        - Handles the HTTPException raised by `check_s3_key` and logs errors without stopping iteration.
        - An href is accepted only if it starts with `s3://<bucket>/`, where the bucket
          is the one resolved from the item owner, collection and eopf type.
    """
    # (don't do anything if in local mode)
    if self.is_catalog_local_mode:
        return True

    user = content["properties"].get("owner", "")
    collection_id = content.get("collection", "").removeprefix(f"{user}_")
    item_eopf_type = content["properties"].get("eopf:type", "")
    bucket_name = get_bucket_name_from_config(user, collection_id, item_eopf_type)
    # A substring test would also accept another bucket whose name or key
    # merely contains the expected bucket name, so the full prefix is compared.
    expected_prefix = f"s3://{bucket_name}/"
    exist_list = []
    assets_to_check = []

    # First do the cheap validations locally so we avoid scheduling S3 calls for
    # assets that are already invalid from the STAC payload itself.
    for asset_name, asset_info in content.get("assets", {}).items():
        if not (s3_key := asset_info.get("href")):
            logger.error(f"Asset: {asset_name}, No href key found for this asset")
            exist_list.append(False)
            continue

        # We only allow publication from the bucket resolved from the item metadata.
        # If the href points to a different bucket, we can reject it immediately.
        if not s3_key.startswith(expected_prefix):
            logger.error(
                f"Asset: {asset_name}, The s3 key {s3_key} should contain the bucket name {bucket_name}",
            )
            exist_list.append(False)
            continue

        # Keep only the assets that require a real S3 existence check.
        assets_to_check.append((asset_name, s3_key))

    def _check_asset(asset: tuple[str, str]) -> bool:
        # This helper runs inside the thread pool so each asset can be checked
        # independently without blocking the whole publication flow.
        asset_name, s3_key = asset
        try:
            exists, size = self.check_s3_key(content, asset_name, s3_key)
            logger.info(f"Asset: {asset_name}, Found on bucket: {exists}, Size: {size}")
            return exists
        except HTTPException as e:
            logger.error(f"Asset: {asset_name}, Error: {e.detail}")
            return False

    if assets_to_check:
        # boto3 does not provide a generic bulk "exists" API for arbitrary keys,
        # so the best low-risk optimization here is to fan out the checks in parallel.
        # The number of workers is capped to avoid overwhelming the S3 endpoint.
        max_workers = min(len(assets_to_check), max(1, PUBLISH_CHECK_MAX_WORKERS))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            exist_list.extend(executor.map(_check_asset, assets_to_check))

    return all(exist_list)

check_s3_key(item, asset_name, s3_key)

Check if the given S3 key exists and matches the expected path.

Parameters:

Name Type Description Default
item dict

The item from the catalog (if it does exist) containing the asset.

required
asset_name str

The name of the asset to check.

required
s3_key str

The S3 key path to check against.

required

Returns:

Name Type Description
bool bool

True if the S3 key is valid and exists, otherwise False.

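int int

Second element of the returned tuple: the size reported by the S3 handler for the key, or -1 when the key was not checked or not found. Note: when RSPY_LOCAL_CATALOG_MODE is on, the check is skipped and (False, -1) is returned.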

Raises:

Type Description
HTTPException

If the s3_handler is not available, if S3 paths cannot be retrieved, if the S3 paths do not match, or if there is an error checking the key.

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/s3_manager.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def check_s3_key(self, item: dict, asset_name: str, s3_key: str) -> tuple[bool, int]:
    """Check if the given S3 key exists and matches the expected path.

    Args:
        item (dict): The item from the catalog (if it does exist) containing the asset.
        asset_name (str): The name of the asset to check.
        s3_key (str): The S3 key path to check against.

    Returns:
        tuple[bool, int]: (True, size) when the key exists on the bucket, with the size
        reported by the S3 handler; (False, -1) when there is no item, when in local
        mode, when the asset is not part of the item or when the key is not found.

    Raises:
        HTTPException: If the s3_handler is not available, if S3 paths cannot be retrieved,
                    if the S3 paths do not match, if the key is malformed, or if there is
                    an error checking the key.
    """
    if not item or self.is_catalog_local_mode:
        return False, -1
    # update an item
    existing_asset = item["assets"].get(asset_name)
    if not existing_asset:
        return False, -1

    # check if the new s3_href is the same as the existing one
    try:
        item_s3_path = existing_asset["href"]
    except KeyError as exc:
        raise HTTPException(
            detail=f"Failed to get the s3 path for the asset {asset_name}",
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
        ) from exc
    if item_s3_path != s3_key:
        raise HTTPException(
            detail=(
                f"Received an updated path for the asset {asset_name} of item {item['id']}. "
                f"The current path is {item_s3_path}, and the new path is {s3_key}. "
                "However, changing an existing path of an asset is not allowed."
            ),
            status_code=HTTP_400_BAD_REQUEST,
        )
    if not self.s3_handler:
        raise HTTPException(
            detail="Failed to find s3 credentials",
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
        )
    # "s3://bucket/path/to/key" -> ["s3:", "", "bucket", "path", "to", "key"]
    s3_key_array = s3_key.split("/")
    if len(s3_key_array) < 3:
        raise HTTPException(
            detail=f"The s3 key {s3_key} is malformed, the bucket name can't be retrieved from it",
            status_code=HTTP_400_BAD_REQUEST,
        )
    bucket = s3_key_array[2]
    key_path = "/".join(s3_key_array[3:])

    # check the presence of the key
    try:
        s3_key_exists, size = self.s3_handler.check_s3_key_on_bucket(bucket, key_path)
    except RuntimeError as rte:
        raise HTTPException(
            detail=f"When checking the presence of the {s3_key} key, an error has been raised: {rte}",
            status_code=HTTP_400_BAD_REQUEST,
        ) from rte
    # A missing key is reported to the caller instead of being raised
    if not s3_key_exists:
        return False, -1
    return True, size

clear_catalog_bucket(content)

Used to clear specific files from catalog bucket.

Parameters:

Name Type Description Default
content dict

Files to delete

required
s3_handler S3StorageHandler

S3 handler to use. If None given, will do nothing

required
Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/s3_manager.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def clear_catalog_bucket(self, content: dict) -> None:
    """Used to clear specific files from catalog bucket.

    Every asset href found in ``content["assets"]`` is deleted from the bucket
    resolved from the item owner, collection and eopf type. Nothing is done in
    local mode, when ``content`` is not dict-like, when it has no assets, or when
    no S3 handler is available.

    Args:
        content (dict): Item whose asset files have to be deleted
    """
    if self.is_catalog_local_mode or (not hasattr(content, "get")):
        return
    assets = content.get("assets", {})
    if not assets:
        return
    if not self.s3_handler:
        logger.warning("No s3 handler available, the files from the catalog bucket are not deleted")
        return
    # don't delete files if we are in local mode (flag re-read at call time, checked once)
    if int(os.environ.get("RSPY_LOCAL_CATALOG_MODE", 0)):
        return

    # Retrieve bucket name from config using what's in content.
    # It only depends on the item, not on the asset, so it is resolved once.
    item_owner = content["properties"].get("owner", "")
    item_collection = content.get("collection", "").removeprefix(f"{item_owner}_")
    item_eopf_type = content["properties"].get("eopf:type", "")
    bucket_name = get_bucket_name_from_config(item_owner, item_collection, item_eopf_type)
    for asset in assets:
        # For catalog bucket, data is already stored into href field (from an asset)
        file_key = assets[asset]["href"]
        self.s3_handler.delete_key_from_s3(bucket_name, file_key)

delete_s3_files(s3_files_to_be_deleted) async

Used to clear specific files from temporary bucket or from catalog bucket.

Parameters:

Name Type Description Default
s3_files_to_be_deleted list[str]

list of files to delete from the S3 bucket

required

Returns:

Name Type Description
bool bool

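True if there was nothing to delete or if the deletion was attempted (a RuntimeError raised during the deletion is logged and does not change the result); False if no S3 handler is available.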

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/s3_manager.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
async def delete_s3_files(self, s3_files_to_be_deleted: list[str]) -> bool:
    """Delete the given files from the temporary bucket or from the catalog bucket.

    Args:
        s3_files_to_be_deleted (list[str]): list of files to delete from the S3 bucket

    Returns:
        bool: False only when no S3 handler is available. True when the list is empty or
        once the deletion has been attempted; a RuntimeError raised by the deletion is
        logged and the process continues.
    """
    if s3_files_to_be_deleted:
        handler = self.s3_handler
        if handler:
            try:
                await handler.adelete_keys_from_s3(s3_files_to_be_deleted)
            except RuntimeError as rte:
                # Best effort: the failure is logged with its traceback and ignored
                logger.exception(
                    f"Failed to delete file from s3 bucket. Reason: {rte}. However, the process will still continue !",
                )
            return True
        logger.error("Failed to create the s3 handler when trying to delete the s3 files")
        return False

    logger.info("No files to be deleted from bucket")
    return True

generate_presigned_url(content, path)

This function is used to generate a time-limited download url

Parameters:

Name Type Description Default
content dict

STAC description of the item to generate an URL for

required
path str

Current path to this object

required

Returns:

Name Type Description
str str

Presigned URL

int int

HTTP return code

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/s3_manager.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def generate_presigned_url(self, content: dict, path: str) -> tuple[str, int]:
    """This function is used to generate a time-limited download url

    Args:
        content (dict): STAC description of the item to generate an URL for
        path (str): Current path to this object

    Returns:
        str: Presigned URL, or an error message when the URL can't be generated
        int: HTTP return code (302 on success, 404 if the asset is not found,
            400 if the S3 client fails to generate the URL)

    Raises:
        HTTPException: If no S3 handler is available.
    """
    # Assume that pgstac already selected the correct asset id
    # just check type, generate and return url
    path_splitted = path.split("/")
    asset_id = path_splitted[-1]
    # The item id is only used in the error message: don't fail on a short path
    item_id = path_splitted[-3] if len(path_splitted) >= 3 else "Unknown"
    # Retrieve bucket name from config using what's in content
    item_owner = content["properties"].get("owner", "")
    item_collection = content.get("collection", "").removeprefix(f"{item_owner}_")
    item_eopf_type = content["properties"].get("eopf:type", "")
    bucket_name = get_bucket_name_from_config(item_owner, item_collection, item_eopf_type)
    try:
        s3_path = content["assets"][asset_id]["href"].removeprefix(f"s3://{bucket_name}/")
    except KeyError:
        return f"Failed to find asset named '{asset_id}' from item '{item_id}'", HTTP_404_NOT_FOUND
    # Checked outside of the try block: this error is not related to the S3 client call
    if not self.s3_handler:
        raise HTTPException(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to find s3 credentials",
        )
    try:
        response = self.s3_handler.s3_client.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket_name, "Key": s3_path},
            ExpiresIn=PRESIGNED_URL_EXPIRATION_TIME,
        )
    except botocore.exceptions.ClientError:
        return "Failed to generate presigned url", HTTP_400_BAD_REQUEST
    return response, HTTP_302_FOUND

update_assets_checksums(content)

Update each asset with the checksum returned by S3 GetObjectAttributes.

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/s3_manager.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def update_assets_checksums(self, content: dict) -> dict:
    """Update each asset with the checksum returned by S3 GetObjectAttributes.

    Args:
        content (dict): The item whose assets are updated in place. The first non-empty
            ``Checksum*`` value returned for an asset is stored in its ``file:checksum`` field.

    Returns:
        dict: The same content object. It is returned untouched in local mode or when
        no S3 handler is available; assets without href, or whose attributes can't
        be read, are skipped with a warning.
    """
    if self.is_catalog_local_mode:
        return content
    if not self.s3_handler:
        logger.warning("No s3 handler available; skipping checksum update")
        return content

    user = content["properties"].get("owner", "")
    collection_id = content.get("collection", "").removeprefix(f"{user}_")
    item_eopf_type = content["properties"].get("eopf:type", "")
    bucket_name = get_bucket_name_from_config(user, collection_id, item_eopf_type)

    for asset_name, asset_info in content.get("assets", {}).items():
        href = asset_info.get("href")
        if not href:
            logger.warning("Asset %s has no href; skipping checksum update", asset_name)
            continue

        key = href.removeprefix(f"s3://{bucket_name}/")
        try:
            object_attributes = self.s3_handler.get_object_attributes(bucket_name, key)
        except RuntimeError as error:
            logger.warning("Failed to get checksum attributes for asset %s: %s", asset_name, error)
            continue

        # GetObjectAttributes returns the checksum values in the S3/AWS format:
        # a "Checksum" dict containing one or more algorithm-specific base64 values
        # such as ChecksumCRC32, ChecksumCRC32C, ChecksumSHA1 or ChecksumSHA256.
        # For now we store the first checksum value returned by the object storage
        # into the STAC asset field; the multihash/STAC-normalized conversion can
        # build on this once we preserve the selected algorithm alongside the value.
        # (an empty answer or a missing "Checksum" entry simply leaves the asset untouched)
        checksum = (object_attributes or {}).get("Checksum") or {}
        for checksum_key, checksum_value in checksum.items():
            if checksum_key.startswith("Checksum") and checksum_value:
                asset_info["file:checksum"] = checksum_value
                break

    return content

update_stac_item_publication(content, request, request_ids, item)

Update the JSON body of a feature with new stac extensions and owner information.

Parameters:

Name Type Description Default
content dict

The content to update.

required
request Request

The HTTP request object.

required
request_ids dict

IDs associated to the given request

required
item dict

The item from the catalog (if exists) to update.

required

Returns:

Name Type Description
dict dict

The updated content.

list dict

List of files to delete from the S3 bucket

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/s3_manager.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def update_stac_item_publication(  # pylint: disable=too-many-locals,too-many-branches,too-many-nested-blocks
    self,
    content: dict,
    request: Request,
    request_ids: dict,
    item: dict,
) -> dict:
    """Update the JSON body of a feature with new stac extensions and owner information.

    Args:
        content (dict): The content to update (modified in place).
        request (Request): The HTTP request object.
        request_ids (dict): IDs associated to the given request
        item (dict): The item from the catalog (if exists) to update.

    Returns:
        dict: The updated content.

    Raises:
        HTTPException: If the owner or the collection can't be read from request_ids.
    """
    collection_ids = request_ids.get("collection_ids", [])
    user = request_ids.get("owner_id")
    logger.debug(f"Update item for user: {user}")
    if not isinstance(collection_ids, list) or not collection_ids or not user:
        raise HTTPException(
            detail="Failed to get the user or the name of the collection!",
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
        )
    collection_id = collection_ids[0]
    verify_existing_item_from_catalog(request.method, item, content.get("id", "Unknown"), f"{user}_{collection_id}")

    # include new stac extensions if not present
    # ("stac_extensions" may be absent from the received item, start from an empty list then)
    if content.get("stac_extensions") is None:
        content["stac_extensions"] = []
    for new_stac_extension in [
        "https://home.rs-python.eu/ownership-stac-extension/v1.1.0/schema.json",
        "https://stac-extensions.github.io/alternate-assets/v1.1.0/schema.json",
        "https://stac-extensions.github.io/file/v2.1.0/schema.json",
    ]:
        if new_stac_extension not in content["stac_extensions"]:
            content["stac_extensions"].append(new_stac_extension)

    # add owner data
    content["properties"].update({"owner": user})
    content.update({"collection": f"{user}_{collection_id}"})
    logger.debug(f"The updated item for user: {user} ended")
    return content