Skip to content

rs_server_catalog/utils.md

<< Back to index

This library contains functions used in handling the user catalog.

delete_s3_files(s3_files_to_be_deleted)

Used to clear specific files from temporary bucket or from catalog bucket.

Source code in docs/rs-server/services/catalog/rs_server_catalog/utils.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def delete_s3_files(s3_files_to_be_deleted):
    """Delete the given keys from an S3 bucket (temporary bucket or catalog bucket).

    Args:
        s3_files_to_be_deleted: The S3 keys to delete; forwarded unchanged to the
            handler's ``delete_keys_from_s3`` method.

    Returns:
        bool: False only when the S3 handler could not be created. True in every
        other case, including when there is nothing to delete and when the
        deletion itself raised a ``RuntimeError`` (which is logged, not propagated).
    """
    if s3_files_to_be_deleted:
        handler = get_s3_handler()
        if not handler:
            logger.error("Failed to create the s3 handler when trying to delete the s3 files")
            return False

        try:
            handler.delete_keys_from_s3(s3_files_to_be_deleted)
        except RuntimeError as rte:
            # Best effort: a failed cleanup is reported but must not stop the caller.
            logger.exception(
                f"Failed to delete keys from s3 bucket. Reason: {rte}. However, the process will still continue !",
            )
    else:
        logger.info("No files to be deleted from bucket")

    return True

get_s3_filename_from_asset(asset)

Retrieve the S3 key from the asset content.

During the staging process, the content of the asset should be: "filename": { "href": "s3://temp_catalog/path/to/filename", }

Once the asset is inserted in the catalog, the content typically looks like this: "filename": { "alternate": { "https": { "https://127.0.0.1:8083/catalog/collections/user:collection_name/items/filename/download/file", } }, "href": "s3://rs-dev-cluster-catalog/path/to/filename", }

Parameters:

Name Type Description Default
asset dict

The content of the asset.

required

Returns:

Type Description
tuple[str, bool]

tuple[str, bool]: A tuple containing the full S3 path of the object (read from the asset's 'href' field) and a boolean indicating whether the asset contains a non-empty 'alternate' field.

Raises:

Type Description
HTTPException

If the S3 key could not be loaded or is invalid.

Source code in docs/rs-server/services/catalog/rs_server_catalog/utils.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def get_s3_filename_from_asset(asset: dict) -> tuple[str, bool]:
    """
    Read the S3 path of an asset from its 'href' field.

    During the staging process, the content of the asset should be:
        "filename": {
            "href": "s3://temp_catalog/path/to/filename",
        }

    Once the asset is inserted in the catalog, the content additionally carries
    an 'alternate' entry next to the 'href', for instance:
        "filename": {
            "alternate": {...},
            "href": "s3://rs-dev-cluster-catalog/path/to/filename",
        }

    Args:
        asset (dict): The content of the asset.

    Returns:
        tuple[str, bool]: The value of the 'href' field (the full S3 path of the object)
                          and a boolean that is True when the asset holds a non-empty
                          'alternate' field.

    Raises:
        HTTPException: With a 400 status if the 'href' field is missing or is not
                       accepted by is_s3_path.
    """
    # The S3 path is always taken from 'href'; 'alternate' is only checked for presence.
    s3_filename = asset.get("href", "")
    has_alternate = bool(asset.get("alternate"))

    if is_s3_path(s3_filename):
        return s3_filename, has_alternate

    # Missing or malformed S3 path: reject the request.
    raise HTTPException(
        detail=f"Failed to load the S3 key from the asset content {asset}",
        status_code=HTTP_400_BAD_REQUEST,
    )

get_s3_handler()

Used to create the s3_handler to be used with s3 buckets.

Source code in docs/rs-server/services/catalog/rs_server_catalog/utils.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def get_s3_handler():
    """Create the S3 storage handler used to access the S3 buckets.

    The constructor arguments are read from the S3_ACCESSKEY, S3_SECRETKEY,
    S3_ENDPOINT and S3_REGION environment variables.

    Returns:
        The S3StorageHandler instance, or None when a KeyError (e.g. a missing
        environment variable) or a RuntimeError is raised while creating it.
        Failures are reported through the module logger, like the other
        functions of this module, instead of being printed to stdout.
    """
    try:
        s3_handler = S3StorageHandler(
            os.environ["S3_ACCESSKEY"],
            os.environ["S3_SECRETKEY"],
            os.environ["S3_ENDPOINT"],
            os.environ["S3_REGION"],
        )
    except KeyError:
        # os.environ[...] raises KeyError when a variable is not set.
        logger.error("Failed to find s3 credentials when trying to create the s3 handler")
        return None
    except RuntimeError:
        logger.error("Failed to create the s3 handler")
        return None

    return s3_handler

get_temp_bucket_name(files_s3_key)

Retrieve the temporary bucket name from a list of S3 keys.

Parameters:

Name Type Description Default
files_s3_key list[str]

A list of S3 key strings.

required

Returns:

Type Description
str | None

str | None: The name of the temporary S3 bucket if valid, otherwise None.

Raises:

Type Description
RuntimeError

If the S3 key does not match the expected pattern, or if multiple buckets are used.

Source code in docs/rs-server/services/catalog/rs_server_catalog/utils.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def get_temp_bucket_name(files_s3_key: list[str]) -> str | None:
    """
    Retrieve the temporary bucket name from a list of S3 keys.

    Args:
        files_s3_key (list[str]): A list of S3 key strings.

    Returns:
        str | None: The name of the temporary S3 bucket if valid, otherwise None.

    Raises:
        HTTPException: If the S3 key does not match the expected pattern, or if multiple buckets are used.
    """
    if not files_s3_key:
        return None

    bucket_names = set()

    for s3_key in files_s3_key:
        if not is_s3_path(s3_key):
            raise RuntimeError(
                f"The S3 key '{s3_key}' does not match the correct S3 path pattern " "(s3://bucket_name/path/to/obj)",
            )
        # Extract and add the bucket name to the set
        bucket_names.add(s3_key.split("/")[2])

    if len(bucket_names) != 1:
        raise RuntimeError(f"A single temporary S3 bucket should be used in the assets: {bucket_names!r}")

    return bucket_names.pop()

get_token_for_pagination(items_dic)

Used to get the token to be used when calling functions from the stac-fastapi-pgstac object.

Source code in docs/rs-server/services/catalog/rs_server_catalog/utils.py
186
187
188
189
190
191
192
def get_token_for_pagination(items_dic: dict[Any, Any]):
    """Return the pagination token to pass to the stac-fastapi-pgstac functions.

    The token is the 'href' of the last link of ``items_dic["links"]`` whose
    'rel' is "next". None is returned when there is no such link, or when that
    link has no 'href'.
    """
    next_links = [link for link in items_dic.get("links", []) if link.get("rel") == "next"]
    if not next_links:
        return None
    # When several "next" links are present, the last one wins.
    return next_links[-1].get("href", None)

headers_minus_content_length(response)

Returns response headers without Content-Length

Source code in docs/rs-server/services/catalog/rs_server_catalog/utils.py
195
196
197
def headers_minus_content_length(response: Response) -> dict[str, str]:
    """Copy the response headers into a dict, leaving out Content-Length.

    The header name is compared case-insensitively.
    """
    kept_headers = {}
    for name, value in response.headers.items():
        if name.lower() == "content-length":
            continue
        kept_headers[name] = value
    return kept_headers

is_s3_path(s3_key)

Function to check if a string matches the S3 pattern

Source code in docs/rs-server/services/catalog/rs_server_catalog/utils.py
128
129
130
131
132
def is_s3_path(s3_key):
    """Tell whether a value is a string matching the module's S3 path pattern.

    Anything that is not a ``str`` is rejected without being matched.
    """
    return isinstance(s3_key, str) and bool(s3_pattern.match(s3_key))

verify_existing_item_from_catalog(method, item, content_id_str, user_collection_str)

Verify whether an existing item from the catalog may be created or updated.

Parameters:

Name Type Description Default
method str

The HTTP method used in the request (e.g., "POST", "PUT", "PATCH").

required
item dict

The item from the catalog to check.

required
content_id_str str

The name of the item, used for generating an error message

required
user_collection_str str

The collection identifier including the user.

required

Raises:

Type Description
HTTPException

If a POST request is made for an existing item, or if a PUT/PATCH request is made for a non-existent item.

Source code in docs/rs-server/services/catalog/rs_server_catalog/utils.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def verify_existing_item_from_catalog(method: str, item: dict, content_id_str: str, user_collection_str: str):
    """Check that the request method is compatible with the item being present or not in the catalog.

    Args:
        method (str): The HTTP method used in the request (e.g., "POST", "PUT", "PATCH").
        item (dict): The item found in the catalog; falsy (None or empty) when it does not exist.
        content_id_str (str): The name of the item, used in the error message when it does not exist.
        user_collection_str (str): The collection identifier including the user.

    Raises:
        HTTPException: With a 409 status if a POST request is made for an existing item,
                       or with a 400 status if a PUT/PATCH request is made for a non-existent item.
    """
    if item:
        # Creating an item whose name already exists in the database is a conflict.
        if method == "POST":
            raise HTTPException(
                detail=f"The item {item['id']} already exists in the {user_collection_str} collection",
                status_code=HTTP_409_CONFLICT,
            )
        return

    # An update can only target an item that already exists in the database.
    if method in {"PUT", "PATCH"}:
        raise HTTPException(
            detail=(
                f"The item {content_id_str} "
                f"does not exist in the {user_collection_str} collection for an update (PUT / PATCH request received)"
            ),
            status_code=HTTP_400_BAD_REQUEST,
        )