Skip to content

rs_server_common/authentication/oauth2.md

<< Back to index

Implement OAuth2 authentication to the KeyCloak server.

LoginAndRedirect

Bases: Exception

Used to call the login endpoint and redirect to the calling endpoint. See https://github.com/fastapi/fastapi/discussions/7817#discussioncomment-5144391

Source code in docs/rs-server/services/common/rs_server_common/authentication/oauth2.py
58
59
60
61
62
class LoginAndRedirect(Exception):
    """
    Used to call the login endpoint and redirect to the calling endpoint.
    See https://github.com/fastapi/fastapi/discussions/7817#discussioncomment-5144391
    """

console_logged_message() async

Message sent to the user when they are already logged in from the python console.

Source code in docs/rs-server/services/common/rs_server_common/authentication/oauth2.py
96
97
98
async def console_logged_message() -> HTMLResponse:
    """Message sent to the user when they are already logged in from the python console."""
    return HTMLResponse("You are logged in.")

get_router(app)

Set and return the FastAPI router that implements the endpoints for oauth2 authentication."

Parameters:

Name Type Description Default
app FastAPI

FastAPI application

required

Returns:

Name Type Description
APIRouter APIRouter

FastAPI router, to be added to the FastAPI application.

Source code in docs/rs-server/services/common/rs_server_common/authentication/oauth2.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def get_router(app: FastAPI) -> APIRouter:  # pylint: disable=too-many-locals
    """
    Set and return the FastAPI router that implements the endpoints for oauth2 authentication."

    Args:
        app (FastAPI): FastAPI application

    Returns:
        APIRouter: FastAPI router, to be added to the FastAPI application.
    """

    # Returned router
    router = APIRouter()

    # Read environment variables
    oidc_endpoint = os.environ["OIDC_ENDPOINT"]
    oidc_realm = os.environ["OIDC_REALM"]
    oidc_client_id = os.environ["OIDC_CLIENT_ID"]
    oidc_client_secret = os.environ["OIDC_CLIENT_SECRET"]
    cookie_secret = os.environ["RSPY_COOKIE_SECRET"]

    # Existing middlewares. See: starlette/middleware/__init__.py::Middleware.__repr__
    middleware_names = [getattr(middleware.cls, "__name__", "") for middleware in app.user_middleware]

    # If not already there, add the SessionMiddleware, used to save session cookies.
    # Add it at the end (after the CORS middleware, that must be first)
    # Code copy/pasted from app.add_middleware(SessionMiddleware, secret_key=cookie_secret)
    if "SessionMiddleware" not in middleware_names:
        if app.middleware_stack:
            raise RuntimeError("Cannot add middleware after an application has started")
        app.user_middleware.append(Middleware(SessionMiddleware, secret_key=cookie_secret))

    # Configure the oauth2 authentication

    domain_url = f"{oidc_endpoint}/realms/{oidc_realm}"
    config_data = {
        "KEYCLOAK_CLIENT_ID": oidc_client_id,
        "KEYCLOAK_CLIENT_SECRET": oidc_client_secret,
        "KEYCLOAK_DOMAIN_URL": domain_url,
    }
    config = StarletteConfig(environ=config_data)
    oauth = starlette_client.OAuth(config)

    oidc_metadata_url = domain_url + "/.well-known/openid-configuration"

    global KEYCLOAK  # pylint: disable=global-statement
    KEYCLOAK = oauth.register(
        "keycloak",
        client_id=oidc_client_id,
        client_secret=oidc_client_secret,
        server_metadata_url=oidc_metadata_url,
        client_kwargs={
            "code_challenge_method": "S256",  # Add PKCE for Authorization Code
            "scope": "openid profile email",
        },
    )

    @app.exception_handler(LoginAndRedirect)
    async def login_and_redirect(
        request: Request,
        exc: LoginAndRedirect,  # pylint: disable=unused-argument
    ) -> Response:
        """Used to call the login endpoint and redirect to the calling endpoint."""
        return await login(request)

    @router.get(LOGIN_FROM_BROWSER, include_in_schema=False)
    async def login_from_browser(request: Request):
        """Login to oauth2 from a browser"""
        return await login(request)

    @router.get(LOGIN_FROM_CONSOLE, include_in_schema=False)
    async def login_from_console(request: Request):
        """Login to oauth2 from a python console"""
        return await login(request)

    @router.get("/console_logged_message", include_in_schema=False)
    async def console_logged_message_endpoint() -> HTMLResponse:
        """Send message to the user when they are already logged in from the python console."""
        return await console_logged_message()

    @router.get("/me")
    async def show_my_information(auth_info: Annotated[AuthInfo, Depends(get_user_info)]):
        """Show user information."""
        return {
            "user_login": auth_info.user_login,
            "iam_roles": sorted(auth_info.iam_roles),
        }

    @router.get("/logout", include_in_schema=False)
    async def logout(request: Request):
        """Logout the user."""

        # Remove the cookie
        request.session.pop(COOKIE_NAME, None)

        # Clear the state values used by the oauth2 authentication process
        for key in list(request.session.keys()):
            if key.startswith("_state_"):
                request.session.pop(key, None)

        # Redirect to the keycloak url to logout.
        # This url shows an HTML button that the user has to click manually to logout.
        metadata = await KEYCLOAK.load_server_metadata()
        end_session_endpoint = metadata["end_session_endpoint"]
        return RedirectResponse(end_session_endpoint)

    return router

get_user_info(request) async

Get user information from the OAuth2 authentication and the KeyCloak server.

Parameters:

Name Type Description Default
request Request

HTTP request

required
is_endpoint_dependency bool

is this function called as an endpoint dependency ?

required

Returns:

Name Type Description
tuple AuthInfo

A tuple containing user IAM roles, configuration data, and user login information.

Source code in docs/rs-server/services/common/rs_server_common/authentication/oauth2.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
async def get_user_info(request: Request) -> AuthInfo:
    """
    Get user information from the OAuth2 authentication and the KeyCloak server.

    Args:
        request (Request): HTTP request
        is_endpoint_dependency (bool): is this function called as an endpoint dependency ?

    Returns:
        tuple: A tuple containing user IAM roles, configuration data, and user login information.
    """

    # Read user information from cookies to see if he's logged in.
    # NOTE: the cookie is read from SessionMiddleware
    user = request.session.get(COOKIE_NAME)
    if not user:
        # We can login then redirect to this endpoint, but this is not possible to make redirection from the Swagger.
        # In this case, referer = http://<domain>:<port>/docs
        referer = request.headers.get("referer")
        if referer and (urlparse(referer).path.rstrip("/") == SWAGGER_HOMEPAGE):
            raise HTTPException(
                status.HTTP_401_UNAUTHORIZED,
                "You must first login by clicking the 'Login' link on top of this Swagger page.",
            )

        # Else, if the request comes from a browser, we login, then redirect (in the same webpage)
        # to the calling endpoint.
        if await is_from_browser(request):
            raise LoginAndRedirect

        # Else, the request comes from a console (curl, python console, ...).
        # It's not possible to redirect so send an HTTP error.
        # logger.debug(
        #     "Error login from a console using: "
        #     f"{str(request.url)!r} with headers:\n{json.dumps(dict(request.headers),indent=2)}",
        # )
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            f"Error connecting to {str(request.url)!r} without a valid API key or OAuth2 session cookie.",
        )

    # Read the user ID and name from the cookie = from the OAuth2 authentication process
    user_id = user.get("sub")
    user_login = user.get("preferred_username")

    if KCUTIL is None:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="get_user_info method called in local mode",
        )

    # Now call the KeyCloak server again to get the user information (IAM roles, ...) from the user ID
    user_info = KCUTIL.get_user_info(user_id)  # pylint: disable=possibly-used-before-assignment

    # If the user is still enabled in KeyCloak
    if user_info.is_enabled:
        # The configuration dict is only set with the API key, not with the OAuth2 authentication.
        return AuthInfo(user_login=user_login, iam_roles=user_info.roles, apikey_config={})

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=f"User {user_login!r} not found in KeyCloak.",
    )

is_from_browser(request) async

Return True if the request comes from a browser, False if from a console (curl, python console, ...)

Source code in docs/rs-server/services/common/rs_server_common/authentication/oauth2.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def is_from_browser(request: Request) -> bool:
    """Return True if the request comes from a browser, False if from a console (curl, python console, ...)"""

    # See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent
    try:
        user_agent = request.headers["user-agent"]
    except KeyError:
        return False
    return any(
        browser in user_agent
        for browser in (
            "Mozilla/",
            "Gecko/",
            "Firefox/",
            "AppleWebKit/",
            "Chrome/",
            "Safari/",
            "OPR/",
            "Opera/",
            "Presto/",
            "Edg/",
        )
    )

is_logged_in(request) async

Return True if the user is logged in.

Source code in docs/rs-server/services/common/rs_server_common/authentication/oauth2.py
90
91
92
93
async def is_logged_in(request: Request) -> bool:
    """Return True if the user is logged in."""
    # Check if the session cookie exists
    return COOKIE_NAME in request.session

login(request) async

Login using oauth2 from either a browser or a python console.

Source code in docs/rs-server/services/common/rs_server_common/authentication/oauth2.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def login(request: Request):
    """
    Login using oauth2 from either a browser or a python console.
    """
    calling_endpoint = request.url
    called_from_console = calling_endpoint.path.rstrip("/") == f"{AUTH_PREFIX}{LOGIN_FROM_CONSOLE}"

    # If the user is already logged in
    if await is_logged_in(request):
        if called_from_console:
            return await console_logged_message()

        # If the /login endpoint was called from the browser, redirect to the Swagger UI
        if calling_endpoint.path.rstrip("/") == f"{AUTH_PREFIX}{LOGIN_FROM_BROWSER}":
            return RedirectResponse(SWAGGER_HOMEPAGE)

        # For other endpoints called from the browser, redirect to this endpoint
        return RedirectResponse(calling_endpoint)

    # Code and state coming from keycloak
    code = request.query_params.get("code")
    state = request.query_params.get("state")

    # If they are not set, then we need to call keycloak,
    # which then will call again this endpiont.
    if (not code) and (not state):
        response = await KEYCLOAK.authorize_redirect(request, calling_endpoint)

        # If called from a console, return the login page url so the caller can display it itself.
        if called_from_console:
            return response.headers["location"]

        # From a browser, make the redirection in the current browser tab
        return response

    # Else we are called from keycloak.
    token = await KEYCLOAK.authorize_access_token(request)
    userinfo = dict(token["userinfo"])

    # Check that the user is enabled in keycloak
    user_id = userinfo.get("sub")
    user_login = userinfo.get("preferred_username")
    if KCUTIL is None:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="login method called in local mode",
        )
    if not KCUTIL.get_user_info(user_id).is_enabled:  # pylint: disable=possibly-used-before-assignment
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"User {user_login!r} is disabled from KeyCloak.",
        )

    # In a session cookie, we save the user information received from keycloak.
    request.session[COOKIE_NAME] = userinfo

    # Redirect to the calling endpoint after removing the authentication query parameters from the URL.
    # See: https://stackoverflow.com/a/7734686
    url = urlparse(str(calling_endpoint))
    query = parse_qs(url.query, keep_blank_values=True)
    for param in ["state", "session_state", "iss", "code"]:
        query.pop(param, None)
    url = url._replace(query=urlencode(query, True))
    return RedirectResponse(urlunparse(url))