Skip to content

rs_server_catalog/data_management/stac_manager.md

<< Back to index

Module grouping functions dedicated to the manipulation of STAC data

StacManager

Class grouping functions dedicated to the manipulation of STAC data

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/stac_manager.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
class StacManager:
    """Class grouping functions dedicated to the manipulation of STAC data"""

    @staticmethod
    async def add_authentication_extension(content: dict) -> None:
        """Add the STAC authentication extension to a STAC object, in place.

        See: https://github.com/stac-extensions/authentication

        Nothing is done unless ``common_settings.CLUSTER_MODE`` is set. Otherwise:
          - the extension schema URL is registered once in "stac_extensions",
          - the "auth:schemes" are written at the root of the object, or under
            "properties" when the object type is "Feature",
          - an "auth:refs" entry is set on every link and every asset,
          - the same processing is applied recursively to every object listed under
            "collections" and "features".

        Args:
            content (dict): STAC description of the object to which the authentication
                extension is added. It is modified in place.

        Raises:
            KeyError: If one of the OIDC_ENDPOINT, OIDC_REALM or RSPY_UAC_HOMEPAGE
                environment variables is not set.
        """

        # Only on cluster mode
        if not common_settings.CLUSTER_MODE:
            return

        # Read environment variables
        oidc_endpoint = os.environ["OIDC_ENDPOINT"]
        oidc_realm = os.environ["OIDC_REALM"]
        oidc_metadata_url = f"{oidc_endpoint}/realms/{oidc_realm}/.well-known/openid-configuration"

        # Add the STAC extension at the root (only once, so repeated calls do not duplicate it)
        extensions = content.setdefault("stac_extensions", [])
        url = "https://stac-extensions.github.io/authentication/v1.1.0/schema.json"
        if url not in extensions:
            extensions.append(url)

        # Add the authentication schemes under the root or "properties" (for the items)
        parent = content
        if content.get("type") == "Feature":
            parent = content.setdefault("properties", {})
        oidc = await oauth2.KEYCLOAK.load_server_metadata()
        # Existing schemes are kept; the four schemes below are added or overwritten
        parent.setdefault("auth:schemes", {}).update(
            {
                "apikey": {
                    "type": "apiKey",
                    "description": f"API key generated using {os.environ['RSPY_UAC_HOMEPAGE']}"  # link to /docs
                    # add anchor to the "new api key" endpoint
                    "#/Manage%20API%20keys/get_new_api_key_auth_api_key_new_get",
                    "name": "x-api-key",
                    "in": "header",
                },
                "openid": {
                    "type": "openIdConnect",
                    "description": "OpenID Connect",
                    "openIdConnectUrl": oidc_metadata_url,
                },
                "oauth2": {
                    "type": "oauth2",
                    "description": "OAuth2+PKCE Authorization Code Flow",
                    "flows": {
                        "authorizationCode": {
                            "authorizationUrl": oidc["authorization_endpoint"],
                            "tokenUrl": oidc["token_endpoint"],
                            "scopes": {},
                        },
                    },
                },
                "s3": {
                    "type": "s3",
                    "description": "S3",
                },
            },
        )

        # Add the authentication reference to each link and asset.
        # A new list is created for every entry so that no two entries share the same object.
        for link in content.get("links", []):
            link["auth:refs"] = ["apikey", "openid", "oauth2"]
        # Only the asset dicts are modified, not the "assets" mapping itself, so the
        # values view can be iterated directly without copying it into a list.
        for asset in content.get("assets", {}).values():
            asset["auth:refs"] = ["s3"]
            if ALTERNATE_STRING in asset:
                asset[ALTERNATE_STRING].update({"auth:refs": ["apikey", "openid", "oauth2"]})
        # Add the extension to the response root and to nested collections, items, ...
        # Do recursive calls to all nested fields, if defined
        for nested_field in ["collections", "features"]:
            for nested_content in content.get(nested_field, []):
                await StacManager.add_authentication_extension(nested_content)

    @staticmethod
    def update_stac_catalog_metadata(metadata: dict) -> None:
        """Update, in place, the metadata fields of a catalog from environment variables.

        Only objects whose "type" is "Catalog" are touched. For each of the fields id, title,
        description and stac_version that is already present in the metadata, the value is
        replaced by the CATALOG_METADATA_<FIELD> environment variable (field name in upper
        case) when that variable is set. Fields absent from the metadata are never created.

        Args:
            metadata (dict): The metadata that has to be updated. The env vars have to be set
                            before starting the app/pod. The existing values are kept if
                            the env vars are not found
        """
        if metadata.get("type") == "Catalog":
            for key in ["id", "title", "description", "stac_version"]:
                if key in metadata:
                    metadata[key] = os.environ.get(f"CATALOG_METADATA_{key.upper()}", metadata[key])

    @staticmethod
    def update_links_for_all_collections(collections: list[dict]) -> list[dict]:
        """Update the links for the endpoint /catalog/collections.

        For each collection, the "<owner>_" prefix is removed from its "id", then the path
        of every link "href" is replaced by the result of ``add_user_prefix`` called with the
        original path, the owner and the shortened collection id. The collections are
        modified in place.

        Args:
            collections (list[dict]): all the collections to be updated. Each collection is
                read for its "owner", "id" and "links" keys.

        Returns:
            list[dict]: the same list, with the links of all the collections updated.
        """
        for collection in collections:
            owner_id = collection["owner"]
            collection["id"] = collection["id"].removeprefix(f"{owner_id}_")
            for link in collection["links"]:
                link_parser = urlparse(link["href"])
                new_path = add_user_prefix(link_parser.path, owner_id, collection["id"])
                # Only the path component changes; scheme, host, query, ... are kept
                link["href"] = link_parser._replace(path=new_path).geturl()
        return collections

    @staticmethod
    def get_s3_filename_from_asset(asset: dict) -> tuple[str, bool]:
        """
        Retrieve the S3 path from the "href" field of the asset content.

        During the staging process, the content of the asset should be:
            "filename": {
                "href": "s3://temp_catalog/path/to/filename",
            }

        Once the asset is inserted in the catalog, the content typically looks like this:
            "filename": {
                "alternate": {
                    "https": {
                        "https://127.0.0.1:8083/catalog/collections/user:collection_name/items/filename/download/file",
                    }
                },
                "href": "s3://rs-dev-cluster-catalog/path/to/filename",
            }

        In both cases the S3 path is read from "href"; the "alternate" field is only
        checked for presence.

        Args:
            asset (dict): The content of the asset.

        Returns:
            tuple[str, bool]: A tuple containing the value of the asset "href" field and a
                            boolean indicating whether the asset has a non-empty "alternate" field.

        Raises:
            HTTPException: With status HTTP_400_BAD_REQUEST, if the "href" field is missing
                            or is not accepted by ``is_s3_path``.
        """
        # The S3 path always comes from 'href' (empty string when the field is missing)
        s3_filename = asset.get("href", "")
        # NOTE(review): the literal "alternate" is used here whereas add_authentication_extension
        # uses ALTERNATE_STRING - confirm that both designate the same key
        alternate_field = bool(asset.get("alternate", None))

        # Validate that the S3 key was successfully retrieved and has the correct format
        if not is_s3_path(s3_filename):
            raise HTTPException(
                detail=f"Failed to load the S3 key from the asset content {asset}",
                status_code=HTTP_400_BAD_REQUEST,
            )

        return s3_filename, alternate_field

add_authentication_extension(content) async staticmethod

Add the stac authentication extension, see: https://github.com/stac-extensions/authentication

Parameters:

Name Type Description Default
content dict

STAC description of the object to which the authentication extension is added

required
Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/stac_manager.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@staticmethod
async def add_authentication_extension(content: dict) -> None:
    """Decorate a STAC object, in place, with the STAC authentication extension.

    See: https://github.com/stac-extensions/authentication

    The function returns immediately when ``common_settings.CLUSTER_MODE`` is not set.
    Otherwise it registers the extension schema in "stac_extensions", writes the
    "auth:schemes" at the root (or under "properties" for a "Feature"), sets "auth:refs"
    on every link and asset, then recurses into "collections" and "features".

    Args:
        content (dict): STAC description of the object to which the authentication
            extension is added
    """
    if not common_settings.CLUSTER_MODE:
        # The extension is only added on cluster mode
        return

    # URL of the OpenID configuration, built from the environment
    oidc_endpoint = os.environ["OIDC_ENDPOINT"]
    oidc_realm = os.environ["OIDC_REALM"]
    openid_config_url = f"{oidc_endpoint}/realms/{oidc_realm}/.well-known/openid-configuration"

    # Declare the extension schema, unless it is already listed
    schema_url = "https://stac-extensions.github.io/authentication/v1.1.0/schema.json"
    declared = content.setdefault("stac_extensions", [])
    if schema_url not in declared:
        declared.append(schema_url)

    # Items ("Feature") hold the schemes under "properties", any other object at its root
    is_item = content.get("type") == "Feature"
    scheme_holder = content.setdefault("properties", {}) if is_item else content

    server_metadata = await oauth2.KEYCLOAK.load_server_metadata()
    schemes = scheme_holder.setdefault("auth:schemes", {})

    # Link to /docs, with an anchor to the "new api key" endpoint
    homepage = os.environ["RSPY_UAC_HOMEPAGE"]
    new_key_anchor = "#/Manage%20API%20keys/get_new_api_key_auth_api_key_new_get"
    apikey_scheme = {
        "type": "apiKey",
        "description": f"API key generated using {homepage}{new_key_anchor}",
        "name": "x-api-key",
        "in": "header",
    }
    openid_scheme = {
        "type": "openIdConnect",
        "description": "OpenID Connect",
        "openIdConnectUrl": openid_config_url,
    }
    authorization_code_flow = {
        "authorizationUrl": server_metadata["authorization_endpoint"],
        "tokenUrl": server_metadata["token_endpoint"],
        "scopes": {},
    }
    oauth2_scheme = {
        "type": "oauth2",
        "description": "OAuth2+PKCE Authorization Code Flow",
        "flows": {"authorizationCode": authorization_code_flow},
    }
    s3_scheme = {"type": "s3", "description": "S3"}
    schemes.update(
        {
            "apikey": apikey_scheme,
            "openid": openid_scheme,
            "oauth2": oauth2_scheme,
            "s3": s3_scheme,
        },
    )

    # Reference the schemes from each link and asset (a new list for every entry)
    service_refs = ("apikey", "openid", "oauth2")
    for link in content.get("links", []):
        link["auth:refs"] = list(service_refs)
    for asset in content.get("assets", {}).values():
        asset["auth:refs"] = ["s3"]
        if ALTERNATE_STRING in asset:
            asset[ALTERNATE_STRING].update({"auth:refs": list(service_refs)})

    # Apply the same processing to the nested collections and items, if any
    for nested_key in ("collections", "features"):
        for child in content.get(nested_key, []):
            await StacManager.add_authentication_extension(child)

get_s3_filename_from_asset(asset) staticmethod

Retrieve the S3 key from the asset content.

During the staging process, the content of the asset should be: "filename": { "href": "s3://temp_catalog/path/to/filename", }

Once the asset is inserted in the catalog, the content typically looks like this: "filename": { "alternate": { "https": { "https://127.0.0.1:8083/catalog/collections/user:collection_name/items/filename/download/file", } }, "href": "s3://rs-dev-cluster-catalog/path/to/filename", }

Parameters:

Name Type Description Default
asset dict

The content of the asset.

required

Returns:

Type Description
tuple[str, bool]

tuple[str, bool]: A tuple containing the full S3 path of the object (the value of the asset's 'href' field) and a boolean indicating whether the asset has a non-empty 'alternate' field.

Raises:

Type Description
HTTPException

If the S3 key could not be loaded or is invalid.

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/stac_manager.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
@staticmethod
def get_s3_filename_from_asset(asset: dict) -> tuple[str, bool]:
    """
    Read the S3 path of an asset from its "href" field.

    While the asset is being staged, its content is expected to be:
        "filename": {
            "href": "s3://temp_catalog/path/to/filename",
        }

    After insertion in the catalog, it typically also carries an "alternate" field:
        "filename": {
            "alternate": {
                "https": {
                    "https://127.0.0.1:8083/catalog/collections/user:collection_name/items/filename/download/file",
                }
            },
            "href": "s3://rs-dev-cluster-catalog/path/to/filename",
        }

    Args:
        asset (dict): The content of the asset.

    Returns:
        tuple[str, bool]: The value of the asset "href" field, and a boolean telling
                        whether the asset has a non-empty "alternate" field.

    Raises:
        HTTPException: With status HTTP_400_BAD_REQUEST, if "href" is missing or is
                        not accepted by ``is_s3_path``.
    """
    # A missing 'href' is handled as an empty string, which is then validated like any other value
    href = asset.get("href", "")

    if is_s3_path(href):
        # Valid S3 path: also report whether an 'alternate' field is filled in
        return href, bool(asset.get("alternate", None))

    raise HTTPException(
        detail=f"Failed to load the S3 key from the asset content {asset}",
        status_code=HTTP_400_BAD_REQUEST,
    )

update_links_for_all_collections(collections) staticmethod

Update the links for the endpoint /catalog/collections.

Parameters:

Name Type Description Default
collections list[dict]

all the collections to be updated.

required

Returns:

Type Description
list[dict]

list[dict]: all the collections, with their links updated.

Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/stac_manager.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@staticmethod
def update_links_for_all_collections(collections: list[dict]) -> list[dict]:
    """Update the links for the endpoint /catalog/collections.

    Args:
        collections (list[dict]): all the collections to be updated.

    Returns:
        list[dict]: all the collections after the links updated.
    """
    for collection in collections:
        owner_id = collection["owner"]
        collection["id"] = collection["id"].removeprefix(f"{owner_id}_")
        for link in collection["links"]:
            link_parser = urlparse(link["href"])
            new_path = add_user_prefix(link_parser.path, owner_id, collection["id"])
            link["href"] = link_parser._replace(path=new_path).geturl()
    return collections

update_stac_catalog_metadata(metadata) staticmethod

Update the metadata fields from a catalog

Parameters:

Name Type Description Default
metadata dict

The metadata that has to be updated. The fields id, title, description and stac_version are to be updated, by using the env vars which have to be set before starting the app/pod. The existing values are used if the env vars are not found

required
Source code in docs/rs-server/services/catalog/rs_server_catalog/data_management/stac_manager.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@staticmethod
def update_stac_catalog_metadata(metadata: dict) -> None:
    """Update the metadata fields from a catalog

    Args:
        metadata (dict): The metadata that has to be updated. The fields id, title,
                        description and stac_version are to be updated, by using the env vars which have
                        to be set before starting the app/pod. The existing values are used if
                        the env vars are not found
    """
    if metadata.get("type") == "Catalog":
        for key in ["id", "title", "description", "stac_version"]:
            if key in metadata:
                metadata[key] = os.environ.get(f"CATALOG_METADATA_{key.upper()}", metadata[key])