Skip to content

rs_workflows/operation/osam_flows.md

<< Back to index

OSAM flow implementation

OSAMRequestError

Bases: Exception

Raised when OSAM returns an unexpected HTTP status.

Source code in docs/rs-client-libraries/rs_workflows/operation/osam_flows.py
32
33
class OSAMRequestError(Exception):
    """Signal that an OSAM endpoint answered with an HTTP status other than the expected one."""

OSAMUserNotFoundError

Bases: Exception

Raised when the OSAM user does not exist.

Source code in docs/rs-client-libraries/rs_workflows/operation/osam_flows.py
28
29
class OSAMUserNotFoundError(Exception):
    """Signal that the requested user is unknown to OSAM."""

create_rights_artifact(rights, username) async

Register the JSON with OBS rights.

Source code in docs/rs-client-libraries/rs_workflows/operation/osam_flows.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@task(name="create artifact with JSON OBS rights")
async def create_rights_artifact(rights: dict, username: str) -> None:
    """
    Publish a user's OBS rights as a markdown artifact.

    The rights are serialized as indented JSON and embedded in a fenced
    ``json`` code block under a heading that names the user.

    Args:
        rights (dict): JSON-serializable rights to display.
        username (str): account name shown in the report heading.
    """
    # ensure_ascii=False keeps non-ASCII characters readable in the report.
    pretty_json = json.dumps(rights, indent=2, ensure_ascii=False)
    # The code fence must be closed explicitly; an unterminated fence leaves
    # the markdown block open in the rendered artifact.
    markdown_report = f"""
## Object Storage rights for **{username}**

```json
{pretty_json}
```
"""
    await acreate_markdown_artifact(
        key="rights",
        markdown=markdown_report,
        description=f"Object Storage rights for user '{username}'",
    )

osam_synchronize_accounts(env=FlowEnvArgs(owner_id='operator-osam')) async

Synchronize keycloak and object storage accounts.

Parameters:

Name Type Description Default
env FlowEnvArgs

user account that calls the flow

FlowEnvArgs(owner_id='operator-osam')

Raises:

Type Description
OSAMRequestError

OSAM returned an unexpected HTTP status

Source code in docs/rs-client-libraries/rs_workflows/operation/osam_flows.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@flow(
    name="OSAM synchronize accounts",
    description="Synchronize keycloak and object storage accounts.",
    log_prints=True,
    validate_parameters=True,
)
async def osam_synchronize_accounts(env: FlowEnvArgs = FlowEnvArgs(owner_id="operator-osam")) -> None:
    """
    Synchronize keycloak and object storage accounts.

    Sends a POST request to ``<RSPY_WEBSITE>/storage/accounts/update`` using
    the API-key headers of the flow's client.

    Args:
        env (FlowEnvArgs): user account that calls the flow

    Raises:
        OSAMRequestError: the endpoint answered with an HTTP status other than 200
    """
    print("Synchronize keycloak and object storage accounts.")

    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "OSAM-synchronize-accounts"):
        # Base URL of the RS server, read from the environment.
        rs_server_href = os.getenv("RSPY_WEBSITE")
        request_url = f"{rs_server_href}/storage/accounts/update"
        # Only the URL is logged: the request headers carry the API key and
        # must not end up in the flow logs (log_prints=True forwards prints).
        print(f"Call request: {request_url}")
        # NOTE(review): requests.post is a blocking call inside an async flow.
        response = requests.post(request_url, **flow_env.rs_client.apikey_headers, timeout=30)
        if response.status_code != 200:
            raise OSAMRequestError(
                f"❌ Unexpected HTTP status {response.status_code} while synchronising accounts ({response.text}).",
            )
        print("✔️ The synchronization process is now running. Allow a few minutes before reviewing the changes.")

osam_update_user(user_name, env=FlowEnvArgs(owner_id='operator-osam')) async

Flow that updates a single OBS account.

Parameters:

Name Type Description Default
env FlowEnvArgs

account that calls the flow

FlowEnvArgs(owner_id='operator-osam')
user_name str

account to be updated

required
Source code in docs/rs-client-libraries/rs_workflows/operation/osam_flows.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@flow(name="OSAM update account", log_prints=True, validate_parameters=True)
async def osam_update_user(user_name: str, env: FlowEnvArgs = FlowEnvArgs(owner_id="operator-osam")):
    """
    Flow that updates a single OBS account.

    Posts to ``<RSPY_WEBSITE>/storage/account/<user_name>/update``, then reads
    ``<RSPY_WEBSITE>/storage/account/<user_name>/rights`` and publishes the
    returned JSON through ``create_rights_artifact``.

    Args:
        user_name (str): account to be updated
        env (FlowEnvArgs): account that calls the flow

    Raises:
        OSAMUserNotFoundError: the update endpoint answered HTTP 404
        OSAMRequestError: either endpoint answered with another non-200 status
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote

    task_run_ctx = TaskRunContext.get()
    if task_run_ctx is not None:
        task_run_ctx.task_run.name = f"📦Update Object Storage rights for user '{user_name}'"
    print("Start update OSAM user rights.")

    # Initialize flow environment and telemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "OSAM-update-user"):

        # Retrieve the RS server URL from the environment variable
        rs_server_href = os.getenv("RSPY_WEBSITE")
        # Percent-encode the user name so characters such as '/', '?', '#'
        # or '..' segments cannot alter the path of the called endpoint.
        account_url = f"{rs_server_href}/storage/account/{quote(user_name, safe='')}"

        request_url = f"{account_url}/update"
        print(f"Call request: {request_url}")
        # NOTE(review): requests calls are blocking inside this async flow.
        response = requests.post(request_url, **flow_env.rs_client.apikey_headers, timeout=30)

        # 404 is reported separately so callers can tell an unknown user
        # apart from any other failure.
        if response.status_code == 404:
            raise OSAMUserNotFoundError(f"❌ User '{user_name}' does not exist in OSAM (HTTP 404): {response.text} .")

        if response.status_code != 200:
            raise OSAMRequestError(
                f"❌ Unexpected HTTP status {response.status_code} while updating user '{user_name}': {response.text}.",
            )
        print(f"✔️ Rights for user '{user_name}' successfully applied.")

        print("Register the new rights...")
        # Make the request for user's access rights
        request_url = f"{account_url}/rights"
        print(f"Call request: {request_url}")
        response = requests.get(request_url, **flow_env.rs_client.apikey_headers, timeout=30)

        if response.status_code != 200:
            raise OSAMRequestError(
                f"❌ Failed to retrieve rights for '{user_name}' (HTTP {response.status_code} {response.text}).",
            )
        rights = response.json()
        await create_rights_artifact(rights, user_name)  # type: ignore