Skip to content

rs_server_osam/main.md

<< Back to index

osam main module.

__get_user_rights(user)

Retrieves and constructs the S3 access rights policy for a specified user.

This function
  • Looks up the user's Keycloak roles from the in-memory user store.
  • Parses the roles to determine S3 access permissions (read, read+download, write+download).
  • Generates a full S3 access policy document using predefined templates.

Parameters:

Name Type Description Default
user str

Username of the account for which to retrieve access rights.

required

Returns:

Name Type Description
dict

The constructed S3 access policy for the specified user.

Raises:

Type Description
HTTPException

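Not raised by this function itself: when the user is not found in the in-memory Keycloak user store, the function returns None and the calling endpoints answer with an HTTP 404.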

Source code in docs/rs-server/services/osam/rs_server_osam/main.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def __get_user_rights(user):
    """
    Builds the S3 access rights policy for a specified user.

    This function:
      - Builds the in-memory users map (`app.extra["users_info"]`) on first use, if it has not been
        calculated yet.
      - Looks up the user in that map.
      - Builds the S3 rights from the user's entry (`build_s3_rights`) and turns them into the final
        access policy (`update_s3_rights_lists`).

    Args:
        user (str): Username of the account for which to retrieve access rights.

    Returns:
        dict | None: The constructed S3 access policy for the specified user, or None if the user is
        not present in the in-memory user store. This function does not raise an HTTP error itself:
        the calling endpoints translate a missing result into an HTTP 404.
    """

    # If the users info have not been calculated yet (by calling '/storage/accounts/update')
    if app.extra["users_info"] is None:
        # We calculate them in a threading lock (so several threads won't call this at the same time)
        with LOCK:
            # Check a second time, in case another thread updated the value
            if app.extra["users_info"] is None:
                app.extra["users_info"] = build_users_data_map()

    # Work on a single snapshot of the map: the background sync task replaces app.extra["users_info"]
    # with a new object, so testing membership and then indexing app.extra again could raise a KeyError
    # if the user disappeared in between.
    users_info = app.extra["users_info"]
    if user not in users_info:
        return None

    logger.debug(f"Building the rights for user {user}")
    s3_rights = build_s3_rights(users_info[user])
    return update_s3_rights_lists(s3_rights)

app_lifespan(fastapi_app) async

Lifespan of the app: implements the start-up / shutdown logic.

Source code in docs/rs-server/services/osam/rs_server_osam/main.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@asynccontextmanager
async def app_lifespan(fastapi_app: FastAPI):
    """
    Lifespan of the app: start-up logic before the `yield`, shutdown logic after it.

    Start-up:
      - Creates the `shutdown_event` and `users_sync_trigger` threading events and resets the cached
        `users_info` in `fastapi_app.extra`.
      - Starts `main_osam_task` in a worker thread (through `asyncio.to_thread`).
      - Registers the shared HTTP client used for dependency injection.

    Shutdown (always executed, even if an exception or a cancellation is thrown in while the
    application runs):
      - Sets both events so the worker thread wakes up from its wait and exits.
      - Waits for the worker thread to finish and closes the shared HTTP client.

    Args:
        fastapi_app (FastAPI): The application whose `extra` dict stores the shared state.
    """

    logger.info("Starting up the application...")
    fastapi_app.extra["shutdown_event"] = threading.Event()
    # the trigger for running the logic in the background task
    fastapi_app.extra["users_sync_trigger"] = threading.Event()
    # save info for future requests of endpoint /storage/account/{user}/rights
    fastapi_app.extra["users_info"] = None  # dict[str, Any] | None
    # start the background task in a thread using asyncio.to_thread
    fastapi_app.extra["refresh_task"] = asyncio.create_task(
        asyncio.to_thread(main_osam_task, DEFAULT_OSAM_FREQUENCY_SYNC),
    )
    # NOTE: the first sync run is intentionally NOT triggered at start-up (request from ops).
    # To re-enable it, set fastapi_app.extra["users_sync_trigger"] here.

    # Init objects for dependency injection
    settings.set_http_client(httpx.AsyncClient(timeout=DEFAULT_TIMEOUT_CONFIG))

    try:
        # Yield control back to the application (this is where the app will run)
        yield
    finally:
        # The cleanup lives in a 'finally' block: the worker thread waits on users_sync_trigger without
        # any timeout, so if the events were not set on an abnormal exit it would stay blocked forever.
        logger.info("Shutting down the application...")
        # ask the background thread to stop ...
        fastapi_app.extra["shutdown_event"].set()
        # ... and wake it up from its wait so that it notices the shutdown event
        fastapi_app.extra["users_sync_trigger"].set()

        refresh_task = fastapi_app.extra.get("refresh_task")
        if refresh_task:
            try:
                await refresh_task  # Ensure the task exits
            except Exception as e:  # pylint: disable=broad-exception-caught
                logger.exception(f"Exception during shutdown of background thread: {e}")

        # Close objects for dependency injection
        await settings.del_http_client()

        logger.info("Application gracefully stopped...")

auth_validation(request)

Authorization validation: check that the user has the right role for a specific action.

Parameters:

Name Type Description Default
request Request

HTTP request

required
Source code in docs/rs-server/services/osam/rs_server_osam/main.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def auth_validation(request: Request):
    """
    Check that the caller owns the role required by the protected OSAM endpoints.

    Nothing is checked in local mode. Otherwise the roles and the login are read from
    `request.state`, and the (lower-cased) roles must contain `rs_osam_update`.

    Args:
        request: HTTP request

    Raises:
        HTTPException: with status 500 if the authorization information is missing from the request
            state, or with status 401 if the user does not have the required role.
    """

    # Local mode: no authorization at all
    if settings.LOCAL_MODE:
        return

    required_role = "rs_osam_update"  # always compared in lower case
    logger.debug(f"Requested role: {required_role!r}")

    try:
        login = request.state.user_login
        granted_roles = [granted.lower() for granted in request.state.auth_roles]
    except AttributeError as missing_info:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Authorization information is missing",
        ) from missing_info

    # Happy path first: the role is there, nothing else to do
    if required_role in granted_roles:
        return

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=f"Missing authorization role {required_role!r} for user {login!r} with roles: {granted_roles}",
    )

create_and_delete_obs_accounts(request) async

This endpoint is called by an RS operator with the rs_osam_update role. It triggers the synchronization of the creation and deletion of S3 Object Storage (OBS) accounts for all RS users, associated to their Keycloak account.

How it works:

  1. When a new Keycloak user account is created, an associated OBS access account with no rights is created and linked to it.

  2. When a Keycloak user account is deleted, the associated OBS access account is also deleted.

NOTE: to synchronize OBS user rights from Keycloak you then need to call the endpoint /storage/account/{user}/update.

Returns:

JSONResponse — Always a success message saying that the synchronization algorithm of the accounts started.

Source code in docs/rs-server/services/osam/rs_server_osam/main.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
@router.post("/storage/accounts/update")
async def create_and_delete_obs_accounts(request: Request):
    """
    This endpoint is called by an RS operator with the *rs_osam_update* role.
    It triggers the synchronization of the creation and deletion of S3 Object Storage (OBS) accounts for all RS users,
    associated to their Keycloak account.

    How it works:

    1. When a new Keycloak user account is created, an associated OBS access account **with no rights** is created
    and linked to it.

    2. When a Keycloak user account is deleted, the associated OBS access account is also deleted.

    NOTE: to synchronize OBS user rights from Keycloak you then need to call the endpoint
    */storage/account/{user}/update*.

    ### Returns:
    JSONResponse — Always a success message saying that the synchronization algorithm of the accounts started.
    """

    # Only operators owning the required role may start the synchronization
    auth_validation(request)

    # The actual work is done by the background task (`main_osam_task`), which waits on this event.
    # As requested by the operations team, the endpoint does not wait for the task to complete:
    # it only raises the flag and answers right away.
    sync_trigger = app.extra["users_sync_trigger"]
    sync_trigger.set()

    started_message = (
        "The algorithm for updating the Keycloak and OVH accounts has been initiated. "
        "The process duration may vary depending on the number of accounts to be updated."
    )
    return JSONResponse(status_code=HTTP_200_OK, content=started_message)

get_obs_user_rights(request, user) async

This endpoint is called by an RS operator with the rs_osam_update role. It returns the S3 Object Storage (OBS) rights of any user, calculated from their associated Keycloak account.

When called, this endpoint will:

  1. Read the user's roles from their Keycloak account.

  2. Calculate the associated OBS access policy rights: they describe the buckets, paths, and permission levels (such as read, write and download) that the user has access to.

  3. Return the access policy in the OBS JSON format, without applying them to the OBS user account.

Args

user (str) — The Keycloak username for which the access policy should be returned.

Returns

JSONResponse — The computed OBS access policy for the user.

Raises

404 — If the user does not exist in Keycloak.

Source code in docs/rs-server/services/osam/rs_server_osam/main.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
@router.get("/storage/account/{user}/rights", include_in_schema=False)
async def get_obs_user_rights(request: Request, user: str):
    """
    Return the S3 Object Storage (OBS) rights of any user, calculated from their associated Keycloak account.
    This endpoint is meant for an RS operator with the *rs_osam_update* role.

    When called, this endpoint will:

    1. Read the user's roles from their Keycloak account.

    2. Calculate the associated OBS access policy rights: they describe the buckets, paths, and permission levels
    (such as read, write and download) that the user has access to.

    3. Return the access policy in the OBS JSON format, **without** applying it to the OBS user account.

    ### Args
    user (str) — The Keycloak username for which the access policy should be returned.

    ### Returns
    JSONResponse — The computed OBS access policy for the user.

    ### Raises
    404 — If the user does not exist in Keycloak.
    """
    # Operators only
    auth_validation(request)

    logger.debug("Endpoint for getting the user rights")
    user_policy = __get_user_rights(user)

    if user_policy:
        # The policy goes through a JSON dump/load round trip before being sent back
        # (presumably to end up with plain JSON-compatible types only — TODO confirm).
        serializable_policy = json.loads(json.dumps(user_policy))
        return JSONResponse(status_code=HTTP_200_OK, content=serializable_policy)

    # No (or empty) policy: the user is unknown to the in-memory user store
    raise HTTPException(
        HTTP_404_NOT_FOUND,
        f"User '{user}' does not exist in keycloak. Try to call '/storage/accounts/update' first.",
    )

get_storage_configuration()

This endpoint returns the bucket configuration configmap. This is used by different services in different namespaces.

This endpoint reads the CSV-based configuration file stored in Object Storage and returns it as a JSON array of arrays. Each inner array represents a row of the configuration file. If the configuration file is missing or cannot be read, an error response is returned.

Returns

list[list[str]] — The parsed configuration file as a JSON array.

Raises

404 — If the configuration file does not exist.
500 — If an unexpected error occurs while reading the file

Source code in docs/rs-server/services/osam/rs_server_osam/main.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
@technical_router.get("/internal/configuration", include_in_schema=False)
def get_storage_configuration() -> list[list[str]]:
    """
    Return the bucket configuration configmap, which is used by different services
    in different namespaces.

    The CSV-based configuration file stored in Object Storage is read through
    `load_configmap_data()` and returned as a JSON array of arrays, each inner
    array being one row of the configuration file. If the configuration file is
    missing or cannot be read, an error response is returned instead.

    ### Returns
    list[list[str]] — The parsed configuration file as a JSON array.

    ### Raises
    404 — If the configuration file does not exist.<br>
    500 — If an unexpected error occurs while reading the file
    """

    try:
        configmap_rows = load_configmap_data()
    except FileNotFoundError as missing_file:
        # No configuration file -> "not found"
        raise HTTPException(status_code=404, detail=str(missing_file)) from missing_file
    except RuntimeError as read_failure:
        # The file could not be read -> server error
        raise HTTPException(status_code=500, detail=str(read_failure)) from read_failure
    return configmap_rows

get_your_s3_credentials(request) async

This endpoint is called by any authenticated user. It returns your personal S3 credentials, so you can connect to the bucket where your products have been generated.

Returns

dict — A dictionary containing 'access_key', 'secret_key', 'endpoint', 'region' for the user's S3 storage.

Source code in docs/rs-server/services/osam/rs_server_osam/main.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
@router.get("/storage/account/credentials")
async def get_your_s3_credentials(request: Request) -> dict:
    """
    This endpoint is called by any authenticated user.
    It returns your personal S3 credentials, so you can connect to the bucket where your products have been generated.

    ### Returns
    dict — A dictionary containing 'access_key', 'secret_key', 'endpoint', 'region' for the user's S3 storage.

    ### Raises
    500 — If the authorization information (user login) is missing from the request.
    """
    # In local mode, just return the common bucket credentials, read from the environment.
    if settings.LOCAL_MODE:
        env_var_by_key = (
            ("access_key", "S3_ACCESSKEY"),
            ("secret_key", "S3_SECRETKEY"),
            ("endpoint", "S3_ENDPOINT"),
            ("region", "S3_REGION"),
        )
        return {key: os.environ[env_var] for key, env_var in env_var_by_key}

    # Cluster mode: the credentials are those of the user who sent the request
    try:
        user_login = request.state.user_login
    except AttributeError as exc:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Authorization information is missing",
        ) from exc
    logger.info(f"Getting ovh s3 credentials for keycloak user {user_login}")
    return get_user_s3_credentials(user_login)

main_osam_task(timeout=60)

Background task, run in a worker thread, that links RS-Python users to OBS (Object Storage) users each time a synchronization is triggered.

This function continuously waits for either a shutdown signal or an external trigger (users_sync_trigger) to perform synchronization of Keycloak user attributes using link_rspython_users_and_obs_users(). The loop exits gracefully on shutdown signal.

Parameters:

Name Type Description Default
timeout int

Number of seconds to wait before checking for shutdown or trigger events. Defaults to 60 seconds.

60

Returns:

Type Description

None

Raises:

Type Description
RuntimeError

This function does not explicitly raise RuntimeError, but any internal failure is logged, and the task continues unless a shutdown signal is received.

Source code in docs/rs-server/services/osam/rs_server_osam/main.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def main_osam_task(timeout: int = 60) -> None:
    """
    Background loop that links RS-Python users to OBS (Object Storage) users each time a sync is triggered.

    This is a plain blocking function (not a coroutine): `app_lifespan` runs it in a worker thread
    through `asyncio.to_thread`.

    Each iteration of the loop:
      1. Blocks on `app.extra["users_sync_trigger"]` until the event is set (by the
         '/storage/accounts/update' endpoint, or by the shutdown logic).
      2. Exits the loop if `app.extra["shutdown_event"]` is set.
      3. Otherwise clears the trigger, calls `link_rspython_users_and_obs_users()` and refreshes
         `app.extra["users_info"]` with `build_users_data_map()`.

    Args:
        timeout (int, optional): Originally the number of seconds to wait before running a sync without
                                 any trigger. The periodic wake-up is currently disabled (request from
                                 the ops team), so the value is only written to the log.
                                 Defaults to 60 seconds.

    Returns:
        None

    Raises:
        Nothing is propagated for `Exception` subclasses: any failure inside an iteration is logged
        and the loop keeps waiting for the next trigger, until the shutdown event is set.
    """
    logger.info("Starting the main background thread ")
    logger.info(f"Timeout {timeout} for triggering the sync of keycloak and ovh accounts is disabled")
    while True:
        try:
            # Wait for the trigger action (from the endpoint or from the shutdown logic) before starting
            # the refresh process for getting attributes from keycloak.
            # Later edit: the timeout was disabled because of the request from ops team.
            # If this is wanted later, just add `timeout=timeout` as input param in wait().
            # Without a timeout, wait() only returns once the event is set, so `triggered` is always
            # True for now.
            triggered = app.extra["users_sync_trigger"].wait()
            if app.extra["shutdown_event"].is_set():  # If shutting down, exit loop
                logger.info("Shutting down background thread and exit")
                break

            # If triggered manually (i.e. by calling .set() and not by the timeout), prepare for the next one.
            # The trigger is cleared BEFORE the sync runs, so a request arriving during the sync sets it
            # again and causes one more run.
            if triggered:
                logger.debug("Releasing users_sync_trigger")
                app.extra["users_sync_trigger"].clear()

            logger.info("Starting the sync process between keycloak accounts and ovh accounts")

            # Link the accounts first, then refresh the cached users info read by the rights endpoints
            link_rspython_users_and_obs_users()
            app.extra["users_info"] = build_users_data_map()

            logger.info("Sync process finished")

        except Exception as e:  # pylint: disable=broad-exception-caught
            # Any error raised during an iteration is logged and the loop goes on, waiting for the next
            # trigger.
            # NOTE(review): despite the log text, this does not handle asyncio.CancelledError, which
            # derives from BaseException since Python 3.8 — confirm whether the message should be updated.
            logger.exception(f"Handle cancellation: {e}")
            # let's continue
    logger.info("Exiting from the getting keycloack attributes thread !")

update_obs_user_rights(request, user) async

This endpoint is called by an RS operator with the rs_osam_update role. It updates the S3 Object Storage (OBS) rights of any user, calculated from their associated Keycloak account.

When called, this endpoint will:

  1. Read the user's roles from their Keycloak account.

  2. Calculate the associated OBS access policy rights: they describe the buckets, paths, and permission levels (such as read, write and download) that the user has access to.

  3. Apply the access policy to the user's OBS account.

The operation ensures that the user's OBS permissions match their Keycloak permissions.

Args

user (str) — The Keycloak username for which the access policy should be applied.

Returns

JSONResponse — A JSON response confirming that the access policy has been applied.

Raises

404 — If the user does not exist in Keycloak.
400 — If the policy could not be applied by the Object Storage provider.

Source code in docs/rs-server/services/osam/rs_server_osam/main.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
@router.post("/storage/account/{user}/update")
async def update_obs_user_rights(request: Request, user: str):
    """
    This endpoint is called by an RS operator with the *rs_osam_update* role. It updates the S3 Object Storage (OBS)
    rights of any user, calculated from their associated Keycloak account.

    When called, this endpoint will:

    1. Read the user's roles from their Keycloak account.

    2. Calculate the associated OBS access policy rights: they describe the buckets, paths, and permission levels
    (such as read, write and download) that the user has access to.

    3. Apply the access policy to the user's OBS account.

    The operation ensures that the user's OBS permissions match their Keycloak permissions.

    ### Args
    user (str) — The Keycloak username for which the access policy should be applied.

    ### Returns
    JSONResponse — A JSON response confirming that the access policy has been applied.

    ### Raises
    404 — If the user does not exist in Keycloak.<br>
    400 — If the policy could not be applied by the Object Storage provider.
    """
    # Operators only
    auth_validation(request)

    logger.debug("Endpoint for applying the user access policy")
    user_policy = __get_user_rights(user)
    if not user_policy:
        # No (or empty) policy: the user is unknown to the in-memory user store
        raise HTTPException(
            HTTP_404_NOT_FOUND,
            f"User '{user}' does not exist in keycloak. Try to call '/storage/accounts/update' first.",
        )

    # The policy is handed over as a JSON string; the helper reports success plus a message
    applied, message = apply_user_access_policy(user, json.dumps(user_policy))
    return JSONResponse(
        status_code=HTTP_200_OK if applied else HTTP_400_BAD_REQUEST,
        content=message,
    )