Skip to content

rs_server_common/authentication/keycloak_util.md

<< Back to index

Utility module to get user information from the KeyCloak server.

KCInfo dataclass

User information from KeyCloak.

Attributes:

Name Type Description
is_enabled bool

is the user enabled in KeyCloak ?

roles list[str]

IAM roles given to the user in KeyCloak.

Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
29
30
31
32
33
34
35
36
37
38
39
40
@dataclass
class KCInfo:
    """
    User information from KeyCloak.

    Attributes:
        is_enabled (bool): is the user enabled in KeyCloak ?
        roles (list[str]): IAM roles given to the user in KeyCloak.
    """

    is_enabled: bool
    roles: list[str]

KCUtil

Utility class to get user information from the KeyCloak server.

Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class KCUtil:  # pylint: disable=too-few-public-methods
    """Utility class to get user information from the KeyCloak server."""

    def __init__(self) -> None:
        """Constructor."""
        self.keycloak_admin = self.__get_keycloak_admin()

    def __get_keycloak_admin(self) -> KeycloakAdmin:
        """Init and return an admin KeyCloak connection."""

        oidc_endpoint = os.environ["OIDC_ENDPOINT"]
        oidc_realm = os.environ["OIDC_REALM"]
        oidc_client_id = os.environ["OIDC_CLIENT_ID"]
        oidc_client_secret = os.environ["OIDC_CLIENT_SECRET"]

        logger.debug(f"Connecting to the keycloak server {oidc_endpoint} ...")
        try:
            keycloak_connection = KeycloakOpenIDConnection(
                server_url=oidc_endpoint,
                realm_name=oidc_realm,
                client_id=oidc_client_id,
                client_secret_key=oidc_client_secret,
                verify=True,
            )
            logger.debug("Connected to the keycloak server")
            return KeycloakAdmin(connection=keycloak_connection)

        except KeycloakError as error:
            raise RuntimeError(
                f"Error connecting with keycloak to '{oidc_endpoint}', "
                f"realm_name={oidc_realm} with client_id="
                f"{oidc_client_id}.",
            ) from error

    def get_user_info(self, user_id: str) -> KCInfo:
        """Get user information from the KeyCloak server."""

        try:
            kadm = self.keycloak_admin
            user = kadm.get_user(user_id)
            iam_roles = [role["name"] for role in kadm.get_composite_realm_roles_of_user(user_id)]
            return KCInfo(user["enabled"], iam_roles)

        except KeycloakGetError as error:

            # If the user is not found, this means he was removed from keycloak.
            # Thus we must remove all his api keys from the database.
            if (
                error.response_code == HTTP_404_NOT_FOUND
                and error.response_body is not None
                and "User not found" in error.response_body.decode("utf-8")
            ):
                logger.warning(f"User '{user_id}' not found in keycloak.")
                return KCInfo(False, [])

            # Raise other exceptions
            raise

__get_keycloak_admin()

Init and return an admin KeyCloak connection.

Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def __get_keycloak_admin(self) -> KeycloakAdmin:
    """Init and return an admin KeyCloak connection."""

    oidc_endpoint = os.environ["OIDC_ENDPOINT"]
    oidc_realm = os.environ["OIDC_REALM"]
    oidc_client_id = os.environ["OIDC_CLIENT_ID"]
    oidc_client_secret = os.environ["OIDC_CLIENT_SECRET"]

    logger.debug(f"Connecting to the keycloak server {oidc_endpoint} ...")
    try:
        keycloak_connection = KeycloakOpenIDConnection(
            server_url=oidc_endpoint,
            realm_name=oidc_realm,
            client_id=oidc_client_id,
            client_secret_key=oidc_client_secret,
            verify=True,
        )
        logger.debug("Connected to the keycloak server")
        return KeycloakAdmin(connection=keycloak_connection)

    except KeycloakError as error:
        raise RuntimeError(
            f"Error connecting with keycloak to '{oidc_endpoint}', "
            f"realm_name={oidc_realm} with client_id="
            f"{oidc_client_id}.",
        ) from error

__init__()

Constructor.

Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
46
47
48
def __init__(self) -> None:
    """Constructor."""
    self.keycloak_admin = self.__get_keycloak_admin()

get_user_info(user_id)

Get user information from the KeyCloak server.

Source code in docs/rs-server/services/common/rs_server_common/authentication/keycloak_util.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def get_user_info(self, user_id: str) -> KCInfo:
    """Get user information from the KeyCloak server."""

    try:
        kadm = self.keycloak_admin
        user = kadm.get_user(user_id)
        iam_roles = [role["name"] for role in kadm.get_composite_realm_roles_of_user(user_id)]
        return KCInfo(user["enabled"], iam_roles)

    except KeycloakGetError as error:

        # If the user is not found, this means he was removed from keycloak.
        # Thus we must remove all his api keys from the database.
        if (
            error.response_code == HTTP_404_NOT_FOUND
            and error.response_body is not None
            and "User not found" in error.response_body.decode("utf-8")
        ):
            logger.warning(f"User '{user_id}' not found in keycloak.")
            return KCInfo(False, [])

        # Raise other exceptions
        raise