Skip to content

rs_server_catalog/authentication_catalog.md

<< Back to index

Contains all functions used to manage the authentication in the catalog.

check_user_authorization(request_ids)

Checks that current user/owner is allowed to do operations on catalog objects.

Raises:

Type Description
HTTPException

When the user doesn't have the expected authorizations

Source code in docs/rs-server/services/catalog/rs_server_catalog/authentication_catalog.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def check_user_authorization(request_ids: dict) -> None:
    """
    Checks that current user/owner is allowed to do operations on catalog objects.

    Raises:
        HTTPException: When the user doesn't have the expected authorizations
    """
    # Retrieve owner ID and check authorizations
    if not request_ids["owner_id"]:
        request_ids["owner_id"] = get_user(None, request_ids["user_login"])
    if (  # If we are in cluster mode and the user_login is not authorized
        # to put/post/patch returns a HTTP_401_UNAUTHORIZED status.
        settings.CLUSTER_MODE
        and not get_authorisation(
            request_ids["collection_ids"],
            request_ids["auth_roles"],
            "write",
            request_ids["owner_id"],
            request_ids["user_login"],
        )
    ):
        raise utils2.log_http_exception(logger, status_code=HTTP_401_UNAUTHORIZED, detail="Unauthorized access.")

get_all_accessible_collections(collections, auth_roles, user_login)

Return the list of all collections accessible by the user calling it.

Parameters:

Name Type Description Default
collections dict

List of all collections.

required
auth_roles list

List of roles of the api-key.

required
user_login str

The api-key owner.

required

Returns:

Name Type Description
dict list[dict]

The list of all collections accessible by the user.

Source code in docs/rs-server/services/catalog/rs_server_catalog/authentication_catalog.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def get_all_accessible_collections(collections: dict, auth_roles: list, user_login: str) -> list[dict]:
    """Return the list of all collections accessible by the user calling it.

    Args:
        collections (dict): List of all collections.
        auth_roles (list): List of roles of the api-key.
        user_login (str): The api-key owner.

    Returns:
        dict: The list of all collections accessible by the user.
    """
    # Test user authorization on each collection
    accessible_collections = [
        requested_col
        for requested_col in collections
        if get_authorisation(
            [requested_col["id"]],
            auth_roles,
            "read",
            requested_col["owner"],
            user_login,
            owner_prefix=True,
        )
    ]

    # Return results, sorted by <owner>_<collection_id>
    return sorted(accessible_collections, key=lambda col: col["id"])

get_authorisation(requested_col_ids, auth_roles, requested_action, requested_owner_id, user_login, owner_prefix=False)

Check if the user is authorized to access collections.

Parameters:

Name Type Description Default
requested_col_ids list

IDs of the requested collections.

required
auth_roles list

The list of authorisations for the user_login.

required
requested_action str

Requested action (read, write or download) on the collections.

required
requested_owner_id str

The name of the owner of the collection {collection_id}.

required
user_login str

The owner of the key linked to the request.

required
owner_prefix bool

True if the collection IDs are prefixed by their collection _

False

Returns:

Name Type Description
bool bool

True if the user is authorized, else False

Source code in docs/rs-server/services/catalog/rs_server_catalog/authentication_catalog.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def get_authorisation(
    requested_col_ids: list[str],
    auth_roles: list[str],
    requested_action: str,
    requested_owner_id: str,
    user_login: str,
    owner_prefix: bool = False,
) -> bool:
    """
    Check if the user is authorized to access collections.

    Args:
        requested_col_ids (list): IDs of the requested collections.
        auth_roles (list): The list of authorisations for the user_login.
        requested_action (str): Requested action (read, write or download) on the collections.
        requested_owner_id (str): The name of the owner of the collection {collection_id}.
        user_login (str): The owner of the key linked to the request.
        owner_prefix (bool): True if the collection IDs are prefixed by their collection <owner>_

    Returns:
        bool: True if the user is authorized, else False
    """
    # No authorization needed in local mode
    if settings.LOCAL_MODE:
        return True

    # The UAC/Keycloak user (who is also the owner of the api key and oauth2 cookie)
    # always has all the rights on all the collections he owns.
    if user_login == requested_owner_id:
        return True

    # Parse authorization roles to retrieve the role owner_id, collection_id and action
    auth_role_pattern = (
        r"rs_catalog_(?P<owner_id>.*(?=:)):"  # Group owner_id
        r"(?P<collection_id>.+)_"  # Group collection_id
        r"(?P<action>read|write|download)"  # Group action
        r"(?=$)"  # Lookahead for end of line
    )
    parsed_auth_roles = []
    for role in auth_roles:
        if match := re.match(auth_role_pattern, role):
            parsed_auth_roles.append(match.groupdict())

    # For each requested collection
    for _requested_col_id in requested_col_ids:

        # Does the user have at least one role that authorizes him to request this collection ?
        requested_col_ok = False
        for auth_role in parsed_auth_roles:

            # Remove the owner prefix from the requested collection id, if any
            if owner_prefix:
                requested_col_id = _requested_col_id.removeprefix(f"{requested_owner_id}_")
            else:
                requested_col_id = _requested_col_id

            # Does this role give the authorization to this collection ID ?
            col_id_ok = (auth_role["collection_id"] == "*") or (auth_role["collection_id"] == requested_col_id)

            # Does this role give the authorization to this collection owner ?
            owner_ok = (auth_role["owner_id"] == "*") or (auth_role["owner_id"] == requested_owner_id)

            # Does this role give the authorization to this collection for read/write/download ?
            action_ok = auth_role["action"] == requested_action

            # All conditions must be met for this role to give the authorization to the collection
            if col_id_ok and owner_ok and action_ok:
                requested_col_ok = True
                break  # no need to check other roles

        # The user has no role that authorizes him to request this collection.
        # Return False if the user is not authorized for at least one collection.
        if not requested_col_ok:
            return False

    # Return True if the user is authorized for all collections
    return True