Utility module to get user information from the KeyCloak server.
KCInfo
dataclass
User information from KeyCloak.
Attributes:
| Name | Type | Description |
|---|---|---|
| is_enabled | bool | Whether the user is enabled in KeyCloak. |
| roles | list[str] | IAM roles given to the user in KeyCloak. |
Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
29
30
31
32
33
34
35
36
37
38
39
@dataclass
class KCInfo:
    """
    What KeyCloak reports about a single user.

    Attributes:
        is_enabled (bool): True when the user account is enabled in KeyCloak.
        roles (list[str]): Names of the IAM roles granted to the user in KeyCloak.
    """

    is_enabled: bool
    roles: list[str]
|
KCUtil
Utility class to get user information from the KeyCloak server.
Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class KCUtil:  # pylint: disable=too-few-public-methods
    """Utility class to get user information from the KeyCloak server.

    An admin KeyCloak connection is created once, at construction time, and
    stored in ``self.keycloak_admin``; ``get_user_info`` reuses it.
    """

    def __init__(self) -> None:
        """Constructor: create the admin KeyCloak connection."""
        self.keycloak_admin = self.__get_keycloak_admin()

    def __get_keycloak_admin(self) -> KeycloakAdmin:
        """Init and return an admin KeyCloak connection.

        The settings are read from the OIDC_ENDPOINT, OIDC_REALM,
        OIDC_CLIENT_ID and OIDC_CLIENT_SECRET environment variables.

        Returns:
            KeycloakAdmin: admin client built on top of the OpenID connection.

        Raises:
            KeyError: if one of the environment variables is not set.
            RuntimeError: if a KeycloakError is raised while connecting.
        """
        oidc_endpoint = os.environ["OIDC_ENDPOINT"]
        oidc_realm = os.environ["OIDC_REALM"]
        oidc_client_id = os.environ["OIDC_CLIENT_ID"]
        oidc_client_secret = os.environ["OIDC_CLIENT_SECRET"]

        logger.debug(f"Connecting to the keycloak server {oidc_endpoint} ...")

        try:
            keycloak_connection = KeycloakOpenIDConnection(
                server_url=oidc_endpoint,
                realm_name=oidc_realm,
                client_id=oidc_client_id,
                client_secret_key=oidc_client_secret,
                verify=True,
            )
            logger.debug("Connected to the keycloak server")
            return KeycloakAdmin(connection=keycloak_connection)

        except KeycloakError as error:
            # The client secret is deliberately left out of the message.
            raise RuntimeError(
                f"Error connecting with keycloak to '{oidc_endpoint}', "
                f"realm_name={oidc_realm} with client_id="
                f"{oidc_client_id}.",
            ) from error

    def get_user_info(self, user_id: str) -> KCInfo:
        """Get user information from the KeyCloak server.

        Args:
            user_id (str): KeyCloak identifier of the user to look up.

        Returns:
            KCInfo: the user's "enabled" flag and the names of his composite
            realm roles. A user that KeyCloak does not know is reported as
            ``KCInfo(False, [])``.

        Raises:
            KeycloakGetError: for any KeyCloak error other than a 404
                "User not found" response.
        """
        try:
            kadm = self.keycloak_admin
            user = kadm.get_user(user_id)
            iam_roles = [role["name"] for role in kadm.get_composite_realm_roles_of_user(user_id)]
            return KCInfo(user["enabled"], iam_roles)

        except KeycloakGetError as error:
            # Decode defensively: a body that is not valid UTF-8 (or not bytes at all)
            # must not raise a new exception here and hide the original KeyCloak error.
            body = error.response_body
            if isinstance(body, (bytes, bytearray)):
                body = body.decode("utf-8", errors="replace")

            # If the user is not found, this means he was removed from keycloak.
            # He is reported as disabled and without roles; this method itself
            # does not remove anything (e.g. his api keys) from the database.
            if error.response_code == HTTP_404_NOT_FOUND and isinstance(body, str) and "User not found" in body:
                logger.warning(f"User '{user_id}' not found in keycloak.")
                return KCInfo(False, [])

            # Raise other exceptions
            raise
|
__get_keycloak_admin()
Init and return an admin KeyCloak connection.
Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def __get_keycloak_admin(self) -> KeycloakAdmin:
    """Build the admin client used to query the KeyCloak server.

    The connection settings come from the OIDC_ENDPOINT, OIDC_REALM,
    OIDC_CLIENT_ID and OIDC_CLIENT_SECRET environment variables.

    Returns:
        KeycloakAdmin: admin client wrapping the OpenID connection.

    Raises:
        KeyError: if one of the environment variables is not set.
        RuntimeError: if a KeycloakError is raised while connecting.
    """
    # Variables are looked up in the listed order, so the first missing one raises KeyError.
    oidc_endpoint, oidc_realm, oidc_client_id, oidc_client_secret = (
        os.environ[name] for name in ("OIDC_ENDPOINT", "OIDC_REALM", "OIDC_CLIENT_ID", "OIDC_CLIENT_SECRET")
    )

    logger.debug(f"Connecting to the keycloak server {oidc_endpoint} ...")

    try:
        keycloak_connection = KeycloakOpenIDConnection(
            server_url=oidc_endpoint,
            realm_name=oidc_realm,
            client_id=oidc_client_id,
            client_secret_key=oidc_client_secret,
            verify=True,
        )
        logger.debug("Connected to the keycloak server")
        keycloak_admin = KeycloakAdmin(connection=keycloak_connection)
    except KeycloakError as error:
        # Re-raised as RuntimeError, with the KeyCloak error kept as the cause.
        raise RuntimeError(
            f"Error connecting with keycloak to '{oidc_endpoint}', "
            f"realm_name={oidc_realm} with client_id="
            f"{oidc_client_id}.",
        ) from error

    return keycloak_admin
|
__init__()
Constructor.
Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
def __init__(self) -> None:
    """Create the utility and its admin KeyCloak connection."""
    admin_client = self.__get_keycloak_admin()
    # Kept on the instance so that later lookups reuse the same admin client.
    self.keycloak_admin = admin_client
|
get_user_info(user_id)
Get user information from the KeyCloak server.
Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def get_user_info(self, user_id: str) -> KCInfo:
    """Get user information from the KeyCloak server.

    Args:
        user_id (str): KeyCloak identifier of the user to look up.

    Returns:
        KCInfo: the user's "enabled" flag and the names of his composite
        realm roles. A user that KeyCloak does not know is reported as
        ``KCInfo(False, [])``.

    Raises:
        KeycloakGetError: for any KeyCloak error other than a 404
            "User not found" response.
    """
    try:
        kadm = self.keycloak_admin
        user = kadm.get_user(user_id)
        iam_roles = [role["name"] for role in kadm.get_composite_realm_roles_of_user(user_id)]
        return KCInfo(user["enabled"], iam_roles)

    except KeycloakGetError as error:
        # Decode defensively: a body that is not valid UTF-8 (or not bytes at all)
        # must not raise a new exception here and hide the original KeyCloak error.
        body = error.response_body
        if isinstance(body, (bytes, bytearray)):
            body = body.decode("utf-8", errors="replace")

        # If the user is not found, this means he was removed from keycloak.
        # He is reported as disabled and without roles; this method itself
        # does not remove anything (e.g. his api keys) from the database.
        if error.response_code == HTTP_404_NOT_FOUND and isinstance(body, str) and "User not found" in body:
            logger.warning(f"User '{user_id}' not found in keycloak.")
            return KCInfo(False, [])

        # Raise other exceptions
        raise
|