Skip to content

rs_server_common/authentication/token_auth.md

<< Back to index

Authentication token for the staging.

TokenAuth

Bases: AuthBase

Custom authentication class

Parameters:

Name Type Description Default
AuthBase ABC

Base auth class

required
Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class TokenAuth(AuthBase):
    """Custom authentication class

    Args:
        AuthBase (ABC): Base auth class
    """

    def __init__(self, token: str):
        """Init token auth

        Args:
            token (str): Token value
        """
        self.token = token

    def __call__(self, request: Request):  # type: ignore
        """Add the Authorization header to the request

        Args:
            request (Request): request to be modified

        Returns:
            Request: request with modified headers
        """
        request.headers["Authorization"] = f"Bearer {self.token}"  # type: ignore
        return request

    def __repr__(self) -> str:
        return "RSPY Token handler"

__call__(request)

Add the Authorization header to the request

Parameters:

Name Type Description Default
request Request

request to be modified

required

Returns:

Name Type Description
Request

request with modified headers

Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
84
85
86
87
88
89
90
91
92
93
94
def __call__(self, request: Request):  # type: ignore
    """Add the Authorization header to the request

    Args:
        request (Request): request to be modified

    Returns:
        Request: request with modified headers
    """
    request.headers["Authorization"] = f"Bearer {self.token}"  # type: ignore
    return request

__init__(token)

Init token auth

Parameters:

Name Type Description Default
token str

Token value

required
Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
76
77
78
79
80
81
82
def __init__(self, token: str):
    """Init token auth

    Args:
        token (str): Token value
    """
    self.token = token

TokenDataNotFound

Bases: HTTPException

Raised if there are missing data in the dictionary to handle information about the token

Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
64
65
class TokenDataNotFound(HTTPException):
    """Raised if there are missing data in the dictionary to handle information about the token"""

__request_token(external_auth_config, data_to_send)

Subfunction of get_station_token. Request either access or refresh token.

Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def __request_token(external_auth_config: StationExternalAuthenticationConfig, data_to_send: dict[str, str]):
    """
    Subfunction of get_station_token. Request either access or refresh token.
    """
    try:
        response = requests.post(
            external_auth_config.token_url,
            data=data_to_send,
            timeout=5,
            headers=prepare_headers(external_auth_config),
        )
    except requests.exceptions.RequestException as e:
        raise log_http_exception(
            status_code=HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Request to token endpoint failed: {str(e)}",
        ) from e

    # Check response status
    if response.status_code != HTTP_200_OK:
        raise log_http_exception(
            status_code=response.status_code,
            detail=f"Failed to get the token from the station {external_auth_config.station_id}. "
            f"Response from the station: {response.text or ''}",
        )

    return response.json()

get_station_token(external_auth_config, original_token_dict)

Retrieve and validate an authentication token for a specific station and service. Thee are two main use cases: - If the token shared variable is empty, it means that we don't have any token for now so we will retrieve one by requesting the authorisation server of the station - If the token shared variable is not empty, it means we already have a token. If it is still valid, we use it to request data to the resource server of the station. If it is not valid anymore, we use the refresh token to request a new token to the authorisation server

Parameters:

Name Type Description Default
external_auth_config StationExternalAuthenticationConfig

The configuration object loaded

required
token_var Variable

variable shared between all workers containing

required

Returns:

Name Type Description
str dict

The token as string.

Raises:

Type Description
HTTPException

If the external authentication configuration cannot be retrieved, if the token request fails, or if the token format is invalid.

Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
def get_station_token(external_auth_config: StationExternalAuthenticationConfig, original_token_dict: dict) -> dict:
    """
    Retrieve and validate an authentication token for a specific station and service.
    Thee are two main use cases:
        - If the token shared variable is empty, it means that we don't have any token for now
          so we will retrieve one by requesting the authorisation server of the station
        - If the token shared variable is not empty, it means we already have a token. If it
          is still valid, we use it to request data to the resource server of the station.
          If it is not valid anymore, we use the refresh token to request a new token to
          the authorisation server

    Args:
        external_auth_config (StationExternalAuthenticationConfig): The configuration object loaded
        from the rs-server.yaml file.
        token_var (dask.distributed.Variable): variable shared between all workers containing
        information of the current token used to request data on the current station

    Returns:
        str: The token as string.

    Raises:
        HTTPException: If the external authentication configuration cannot be retrieved,
                       if the token request fails, or if the token format is invalid.
    """
    if not external_auth_config:
        raise log_http_exception(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Failed to retrieve the configuration for the station token.",
        )
    token_dict = copy.deepcopy(original_token_dict)
    # If no tokens are yet registered, we ask the authorisation server to generate one by providing
    # an "authorisation grant" to the authorisation server
    validate_token_dict(token_dict, external_auth_config)
    current_date = datetime.datetime.now()

    nb_secs_before_token_exp = int(os.getenv("RSPY_TIME_BEFORE_ACCESS_TOKEN_EXPIRE", "60"))
    nb_secs_before_refresh_token_exp = int(os.getenv("RSPY_TIME_BEFORE_REFRESH_TOKEN_EXPIRE", "60"))

    # If we have no token yet, then we need one
    if not token_dict:
        get_token = True
        logger.info(
            f"""No existing token found -> fetching a new access token """
            f"""from station url: {external_auth_config.token_url}""",
        )

    # Else, check if the access token is expired
    else:
        access_age = (current_date - token_dict["access_token_creation_date"]).total_seconds()
        access_age += nb_secs_before_token_exp  # take a margin

        # We don't need a new token if the access token is young enough
        if access_age <= token_dict["expires_in"]:
            get_token = False

        # If the access token is too old, we also check the refresh token
        else:
            # If it's missing, then we need a new token
            if "refresh_expires_in" not in token_dict:
                get_token = True

            # Else we need a new token if the access and refresh token are both too old
            else:
                refresh_age = (current_date - token_dict["refresh_token_creation_date"]).total_seconds()
                refresh_age += nb_secs_before_refresh_token_exp  # take a margin
                get_token = refresh_age > token_dict["refresh_expires_in"]

        if get_token:
            logger.info(
                f"""Current access and refresh token expired -> fetching access token """
                f"""from station url: {external_auth_config.token_url}""",
            )

    # If necessary, get a new token using the authorisation grant
    if get_token:
        # Get the new token and add its creation date
        data_to_send = prepare_data(external_auth_config, call_refresh=False)
        token_dict.update(__request_token(external_auth_config, data_to_send))
        token_dict["access_token_creation_date"] = token_dict["refresh_token_creation_date"] = datetime.datetime.now()

        logger.info(f"Access token retrieved from the station url: {external_auth_config.token_url} ")
        # Validate the token variable and then update the shared token
        validate_token_dict(token_dict, external_auth_config)

    else:
        # Check that the token variable contains the mandatory elements
        validate_token_dict(token_dict, external_auth_config)

        # If the current token expires in less than one minute, create a new request to send
        # to the authorisation server with the refresh token given in the payload of the request
        current_date = datetime.datetime.now()
        diff_in_sec = (current_date - token_dict["access_token_creation_date"]).total_seconds()

        if diff_in_sec > token_dict["expires_in"] - nb_secs_before_token_exp:
            logger.info("Current access_token is about to expire. Launching request to refresh the token...")

            data_to_send = prepare_data(external_auth_config, call_refresh=True)
            data_to_send.update({"refresh_token": token_dict["refresh_token"]})

            # Refresh the token and add the creation date of the newly created token
            token_dict.update(__request_token(external_auth_config, data_to_send))
            token_dict["access_token_creation_date"] = datetime.datetime.now()

            # Validate the new token dictionary and update the shared token variable with this dictionary
            validate_token_dict(token_dict, external_auth_config)
            logger.info("Access token has been successfully refreshed !")

    return token_dict

log_http_exception(*args, **kwargs)

Log error and return an HTTP exception to be raised by the caller

Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
44
45
46
def log_http_exception(*args, **kwargs) -> type[HTTPException]:
    """Log error and return an HTTP exception to be raised by the caller"""
    return utils2.log_http_exception(logger, *args, **kwargs)

prepare_data(external_auth_config, call_refresh)

Prepare data for token requests based on authentication configuration.

Parameters:

Name Type Description Default
external_auth_config StationExternalAuthenticationConfig

Configuration object containing authentication details.

required

Returns:

Type Description
dict[str, str]

Dict[str, str]: Dictionary containing the prepared data for the request.

Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def prepare_data(external_auth_config: StationExternalAuthenticationConfig, call_refresh: bool) -> dict[str, str]:
    """Prepare data for token requests based on authentication configuration.

    Args:
        external_auth_config (StationExternalAuthenticationConfig): Configuration object containing
            authentication details.

    Returns:
        Dict[str, str]: Dictionary containing the prepared data for the request.
    """
    data_to_send = {"client_id": external_auth_config.client_id, "client_secret": external_auth_config.client_secret}
    if call_refresh:
        data_to_send["grant_type"] = "refresh_token"
    else:
        data_to_send.update(
            {
                "grant_type": external_auth_config.grant_type,
                "username": external_auth_config.username,
                "password": external_auth_config.password,
            },
        )
        if external_auth_config.scope:
            data_to_send["scope"] = external_auth_config.scope

    return data_to_send

prepare_headers(external_auth_config)

Prepare HTTP headers for token requests.

Parameters:

Name Type Description Default
external_auth_config StationExternalAuthenticationConfig

Configuration object containing authentication details.

required

Returns:

Type Description
dict[str, str]

Dict[str, str]: Dictionary containing the prepared headers.

Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def prepare_headers(external_auth_config: StationExternalAuthenticationConfig) -> dict[str, str]:
    """Prepare HTTP headers for token requests.

    Args:
        external_auth_config (StationExternalAuthenticationConfig): Configuration object containing
            authentication details.

    Returns:
        Dict[str, str]: Dictionary containing the prepared headers.
    """
    headers = {"Content-Type": HEADER_CONTENT_TYPE}
    # Add Authorization header if it exists
    if external_auth_config.authorization:
        headers["Authorization"] = external_auth_config.authorization
    return headers

validate_token_dict(token_dict, config)

Check if the token variable contains the mandatory attributes

Parameters:

Name Type Description Default
token_dict Any
required
config StationExternalAuthenticationConfig

external_auth_config (StationExternalAuthenticationConfig): The configuration object loaded

required
token_dict Dict

dictionary containing information about the current token

required
Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def validate_token_dict(token_dict: Any, config: StationExternalAuthenticationConfig):
    """
    Check if the token variable contains the mandatory attributes

    Args:
        token_dict (Any):
        config (StationExternalAuthenticationConfig):
          external_auth_config (StationExternalAuthenticationConfig): The configuration object loaded
        from the rs-server.yaml file.
        token_dict (Dict): dictionary containing information about the current token
        information of the current token used to request data on the current station
    """
    if not token_dict:
        return

    for attr in MANDATORY_TOKEN_ATTRS:
        if attr not in token_dict:
            raise log_http_exception(
                HTTP_500_INTERNAL_SERVER_ERROR,
                f"Mandatory attribute {attr} is not defined in the token variable "
                f"of the station {config.station_id}!",
                None,
                TokenDataNotFound,
            )
        if not token_dict[attr]:
            raise log_http_exception(
                HTTP_500_INTERNAL_SERVER_ERROR,
                f"Token variable attribute {attr} of the station {config.station_id} is None !",
                None,
                TokenDataNotFound,
            )
    for attr in "access_token", "refresh_token":
        validate_token_format(attr)

validate_token_format(token)

Validate the format of a given token.

Parameters:

Name Type Description Default
token str

The token string to be validated.

required

Raises:

Type Description
HTTPException

If the token format does not match the expected pattern.

Source code in docs/rs-server/services/common/rs_server_common/authentication/token_auth.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def validate_token_format(token: str) -> None:
    """Validate the format of a given token.

    Args:
        token (str): The token string to be validated.

    Raises:
        HTTPException: If the token format does not match the expected pattern.
    """
    # Check if the token matches the expected format using a regular expression
    if not re.match(r"^[A-Za-z0-9\-_\.]+$", token):
        # Raise an HTTP exception if the token format is invalid
        raise log_http_exception(
            status_code=HTTP_400_BAD_REQUEST,
            detail="Invalid token format received from the station.",
        )