Skip to content

rs_server_osam/utils/keycloak_handler.md

<< Back to index

Class to handle connection and requests to Keycloak

KeycloakHandler

Class to handle connection and requests to Keycloak

Source code in docs/rs-server/services/osam/rs_server_osam/utils/keycloak_handler.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class KeycloakHandler:
    """Class to handle connection and requests to Keycloak.

    The admin connection is opened once, when the handler is constructed, from the
    ``OIDC_ENDPOINT``, ``OIDC_REALM``, ``OIDC_CLIENT_ID`` and ``OIDC_CLIENT_SECRET``
    environment variables, and is kept in ``self.keycloak_admin``.
    """

    def __init__(self) -> None:
        """Open the Keycloak admin connection and store it on the instance."""
        self.keycloak_admin = self.__open_keycloak_connection()

    def __open_keycloak_connection(self) -> KeycloakAdmin:
        """Build a ``KeycloakAdmin`` client from the OIDC_* environment variables.

        Returns:
            KeycloakAdmin: Admin client bound to the configured server and realm

        Raises:
            KeyError: If one of the four OIDC_* environment variables is not set
            RuntimeError: If Keycloak raises a ``KeycloakError`` while connecting
        """
        server_url = os.environ["OIDC_ENDPOINT"]
        realm_name = os.environ["OIDC_REALM"]
        client_id = os.environ["OIDC_CLIENT_ID"]
        client_secret_key = os.environ["OIDC_CLIENT_SECRET"]

        logger.debug("Connecting to the keycloak server %s ...", server_url)

        try:
            keycloak_connection = KeycloakOpenIDConnection(
                server_url=server_url,
                realm_name=realm_name,
                client_id=client_id,
                client_secret_key=client_secret_key,
                verify=True,
            )
            logger.debug("Connected to the Keycloak server")
            return KeycloakAdmin(connection=keycloak_connection)

        except KeycloakError as error:
            # The client secret is deliberately left out of the message.
            raise RuntimeError(
                f"Error connecting with keycloak to '{server_url}', "
                f"realm_name={realm_name} with client_id={client_id}.",
            ) from error

    def get_keycloak_user_roles(self, user_id: str) -> list[dict]:
        """Returns the realm roles of a user: those of its groups plus those assigned directly.
        RoleRepresentation: https://www.keycloak.org/docs-api/latest/rest-api/index.html#RoleRepresentation

        Roles are de-duplicated by their ``name``. Directly assigned roles are read last, so
        their representation replaces a group role with the same name.

        Args:
            user_id (str): ID of the user whose roles are wanted

        Returns:
            list[dict]: List of RoleRepresentation as dicts, one per distinct role name
        """
        all_roles: dict[str, dict] = {}

        # Every lookup is guarded with ``or []`` so that an empty (None) answer from
        # Keycloak is treated as "no roles" instead of raising a TypeError.
        for group in self.keycloak_admin.get_user_groups(user_id) or []:
            for role in self.keycloak_admin.get_group_realm_roles(group["id"]) or []:
                all_roles[role["name"]] = role

        for role in self.keycloak_admin.get_realm_roles_of_user(user_id) or []:
            all_roles[role["name"]] = role

        return list(all_roles.values())

    def get_keycloak_users(self) -> list[dict]:
        """Returns the list of all Keycloak users
        UserRepresentation: https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation

        Returns:
            list[dict]: List of UserRepresentation as dicts
        """
        # An empty query means that no filter is applied.
        return self.keycloak_admin.get_users({})

    def get_obs_user_from_keycloak_user(self, keycloak_user: dict) -> str | None:
        """Retrieves the attribute 'obs-user' from the given Keycloak user.
        Returns None if the 'attributes' field or its 'obs-user' entry doesn't exist.

        Args:
            keycloak_user (dict): UserRepresentation
                (https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation)

        Returns:
            str | None: The stored 'obs-user' value, returned as is, or None
        """
        # NOTE(review): set_obs_user_in_keycloak_user stores this attribute as a one-element
        # list, so the value returned here is presumably a list, not a str - confirm with callers.
        try:
            return keycloak_user["attributes"]["obs-user"]
        except KeyError:
            return None

    def get_obs_user_from_keycloak_username(self, keycloak_user: str) -> str | None:
        """
        Fetches the 'obs-user' attribute from Keycloak for the given keycloak_user.

        Args:
            keycloak_user (str): Keycloak username to look up

        Returns:
            str or None: The 'obs-user' value (first element when it is stored as a
            non-empty list), otherwise None.

        Raises:
            KeycloakConnectionError, KeycloakAuthenticationError: For critical Keycloak issues.
            KeycloakError: For any other Keycloak failure.
            Exception: Any other error is logged and re-raised unchanged.
        """
        # sanitize the variable before logging, otherwise SonarCloud will complain
        # allow common username chars only
        log_keycloak_user = re.sub(r"[^\w.@+-]", "_", str(keycloak_user))
        try:
            # NOTE(review): if get_user_id finds no match the ID may be None and the
            # following lookup is expected to fail with a KeycloakError - confirm.
            user_id = self.keycloak_admin.get_user_id(keycloak_user)
            user = self.keycloak_admin.get_user(user_id)  # type: ignore
            attributes = user.get("attributes", {}) if user else {}

            obs_user = attributes.get("obs-user")
            if isinstance(obs_user, list) and obs_user:
                return obs_user[0]
            if isinstance(obs_user, str):
                return obs_user

            # Lazy %-style arguments, as in the connection method: the message is only
            # rendered when a handler actually emits the record.
            logger.warning(
                "Unexpected or missing 'obs-user' for '%s' (ID: %s). Type: %s Value: %s",
                log_keycloak_user,
                user_id,
                type(obs_user),
                obs_user,
            )
        except (KeycloakConnectionError, KeycloakAuthenticationError) as e:
            logger.error("Keycloak critical error for '%s': %s", log_keycloak_user, e)
            raise
        except KeycloakError as e:
            logger.error("Keycloak error retrieving user '%s': %s", log_keycloak_user, e)
            raise
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.error("Unexpected error for user '%s': %s", log_keycloak_user, e)
            raise

        return None

    def set_obs_user_in_keycloak_user(self, keycloak_user: dict, obs_user: str):
        """Sets the attribute 'obs-user' in the given Keycloak user.

        The user's existing ``attributes`` dict (when present) is updated in place and
        sent to Keycloak as the only field of the update payload.

        Args:
            keycloak_user (dict): UserRepresentation; its ``id`` and ``attributes`` are read
                (https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation)
            obs_user (str): Value to store in the 'obs-user' attribute

        Returns:
            None: Nothing is returned.
        """
        attributes = keycloak_user.get("attributes", {})
        attributes["obs-user"] = [obs_user]  # Must be a list

        payload = {"attributes": attributes}

        self.keycloak_admin.update_user(user_id=keycloak_user["id"], payload=payload)

    def update_keycloak_user(self, user_id: str, payload: dict):
        """Updates the Keycloak user linked to the given user_id with the given payload.
        The payload must follow Keycloak's UserRepresentation:
        https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation

        Args:
            user_id (str): ID of the Keycloak user to update
            payload (dict): UserRepresentation with the up-to-date data

        Raises:
            RuntimeError: If Keycloak rejects the update with a ``KeycloakPutError``
        """
        try:
            self.keycloak_admin.update_user(user_id=user_id, payload=payload)
        except KeycloakPutError as kpe:
            # The failing entity is a user, not a client: say so and name it.
            raise RuntimeError(f"Could not update user {user_id!r}, {kpe}") from kpe
get_keycloak_user_roles(user_id)

Returns the list of roles for a given user. RoleRepresentation: https://www.keycloak.org/docs-api/latest/rest-api/index.html#RoleRepresentation

Parameters:

Name Type Description Default
user_id str

ID of the user whose roles we want

required

Returns:

Type Description
list[dict]

list[dict]: List of RoleRepresentation as dicts

Source code in docs/rs-server/services/osam/rs_server_osam/utils/keycloak_handler.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def get_keycloak_user_roles(self, user_id: str) -> list[dict]:
    """Returns the realm roles of a user: those of its groups plus those assigned directly.
    RoleRepresentation: https://www.keycloak.org/docs-api/latest/rest-api/index.html#RoleRepresentation

    Roles are de-duplicated by their ``name``. Directly assigned roles are read last, so
    their representation replaces a group role with the same name.

    Args:
        user_id (str): ID of the user whose roles are wanted

    Returns:
        list[dict]: List of RoleRepresentation as dicts, one per distinct role name
    """
    roles_by_name: dict[str, dict] = {}

    # Every lookup is guarded with ``or []`` so that an empty (None) answer from
    # Keycloak is treated as "no roles" instead of raising a TypeError.
    for group in self.keycloak_admin.get_user_groups(user_id) or []:
        for role in self.keycloak_admin.get_group_realm_roles(group["id"]) or []:
            roles_by_name[role["name"]] = role

    for role in self.keycloak_admin.get_realm_roles_of_user(user_id) or []:
        roles_by_name[role["name"]] = role

    return list(roles_by_name.values())

get_keycloak_users()

Returns the list of all Keycloak users. UserRepresentation: https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation

Returns:

Type Description
list[dict]

list[dict]: List of UserRepresentation as dicts

Source code in docs/rs-server/services/osam/rs_server_osam/utils/keycloak_handler.py
84
85
86
87
88
89
90
91
def get_keycloak_users(self) -> list[dict]:
    """Fetch the Keycloak users through the admin client, without any filter.
    UserRepresentation: https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation

    Returns:
        list[dict]: List of UserRepresentation as dicts
    """
    # An empty query means that no search criteria are applied.
    no_filter: dict = {}
    every_user = self.keycloak_admin.get_users(no_filter)
    return every_user

get_obs_user_from_keycloak_user(keycloak_user)

Retrieves the attribute 'obs-user' from the given Keycloak user. Returns None if the field doesn't exist.

Parameters:

Name Type Description Default
keycloak_user dict

UserRepresentation (https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation)

required

Returns:

Type Description
str | None

str | None: obs user ID or None

Source code in docs/rs-server/services/osam/rs_server_osam/utils/keycloak_handler.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def get_obs_user_from_keycloak_user(self, keycloak_user: dict) -> str | None:
    """Retrieves the attribute 'obs-user' from the given Keycloak user.
    Returns None if the field doesn't exist.

    Args:
        keycloak_user (dict): UserRepresentation
        (https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation)

    Returns:
        str | None: obs user ID or None
    """
    try:
        return keycloak_user["attributes"]["obs-user"]
    except KeyError:
        return None

get_obs_user_from_keycloak_username(keycloak_user)

Fetches the 'obs-user' attribute from Keycloak for the given keycloak_user.

Returns:

Type Description
str | None

str or None: The 'obs-user' value if available, otherwise None.

Raises:

Type Description
(KeycloakConnectionError, KeycloakAuthenticationError)

For critical Keycloak issues.

Source code in docs/rs-server/services/osam/rs_server_osam/utils/keycloak_handler.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def get_obs_user_from_keycloak_username(self, keycloak_user: str) -> str | None:
    """
    Fetches the 'obs-user' attribute from Keycloak for the given keycloak_user.

    Returns:
        str or None: The 'obs-user' value if available, otherwise None.

    Raises:
        KeycloakConnectionError, KeycloakAuthenticationError: For critical Keycloak issues.
    """
    # sanitize the variable before logging, otherwise SonarCloud will complain
    # allow common username chars only
    log_keycloak_user = re.sub(r"[^\w.@+-]", "_", str(keycloak_user))
    try:
        user_id = self.keycloak_admin.get_user_id(keycloak_user)
        user = self.keycloak_admin.get_user(user_id)  # type: ignore
        attributes = user.get("attributes", {}) if user else {}

        obs_user = attributes.get("obs-user")
        if isinstance(obs_user, list) and obs_user:
            return obs_user[0]
        if isinstance(obs_user, str):
            return obs_user

        logger.warning(
            f"Unexpected or missing 'obs-user' for '{log_keycloak_user}' (ID: {user_id}). "
            f"Type: {type(obs_user)} Value: {obs_user}",
        )
    except (KeycloakConnectionError, KeycloakAuthenticationError) as e:
        logger.error(f"Keycloak critical error for '{log_keycloak_user}': {e}")
        raise
    except KeycloakError as e:
        logger.error(f"Keycloak error retrieving user '{log_keycloak_user}': {e}")
        raise
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error(f"Unexpected error for user '{log_keycloak_user}': {e}")
        raise

    return None

set_obs_user_in_keycloak_user(keycloak_user, obs_user)

Sets the attribute 'obs-user' in the given Keycloak user.

Parameters:

Name Type Description Default
keycloak_user dict

UserRepresentation (https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation)

required

Returns:

Name Type Description
dict

UserRepresentation (https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation)

Source code in docs/rs-server/services/osam/rs_server_osam/utils/keycloak_handler.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def set_obs_user_in_keycloak_user(self, keycloak_user: dict, obs_user: str):
    """Store ``obs_user`` as the 'obs-user' attribute of the given Keycloak user.

    The user's existing ``attributes`` dict (when present) is updated in place and
    sent to Keycloak as the only field of the update payload.

    Args:
        keycloak_user (dict): UserRepresentation; its ``id`` and ``attributes`` are read
            (https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation)
        obs_user (str): Value to store in the 'obs-user' attribute

    Returns:
        None: Nothing is returned.
    """
    user_attributes = keycloak_user.get("attributes", {})
    # Must be a list
    user_attributes["obs-user"] = [obs_user]

    self.keycloak_admin.update_user(
        user_id=keycloak_user["id"],
        payload={"attributes": user_attributes},
    )

update_keycloak_user(user_id, payload)

Updates the Keycloak user linked to the given user_id with the given payload. The payload must follow Keycloak's UserRepresentation: https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation

Parameters:

Name Type Description Default
user_id str

ID of the Keycloak user to update

required
payload dict

UserRepresentation with the up-to-date data

required
Source code in docs/rs-server/services/osam/rs_server_osam/utils/keycloak_handler.py
166
167
168
169
170
171
172
173
174
175
176
177
178
def update_keycloak_user(self, user_id: str, payload: dict):
    """Updates the Keycloak user linked to the given user_id with the given payload.
    The payload must follow Keycloak's UserRepresentation:
    https://www.keycloak.org/docs-api/latest/rest-api/index.html#UserRepresentation

    Args:
        user_id (str): ID of the Keycloak user to update
        payload (dict): UserRepresentation with the up-to-date data

    Raises:
        RuntimeError: If Keycloak rejects the update with a ``KeycloakPutError``
    """
    try:
        self.keycloak_admin.update_user(user_id=user_id, payload=payload)
    except KeycloakPutError as kpe:
        # The failing entity is a user, not a client: say so and name it.
        raise RuntimeError(f"Could not update user {user_id!r}, {kpe}") from kpe