Skip to content

rs_server_common/authentication/authentication_to_external.md

<< Back to index

Authentication to external stations module.

ServiceNotFound

Bases: Exception

Raised when no existing service matches the provided domain.

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication_to_external.py
37
38
class ServiceNotFound(Exception):
    """Error signalling that no configured service matches the requested domain."""

__read_configuration()

Read the rs-server configuration for authentication to external stations.

Returns:

Name Type Description
dict dict

A dictionary containing the configuration data

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication_to_external.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def __read_configuration() -> dict:  # pylint: disable=too-many-locals
    """
    Read the rs-server configuration for authentication to external stations.

    The configuration is built from every environment variable whose name follows
    ``RSPY__TOKEN__<service>__<station>__<section>[__<rest of the key>]``.
    The service, station, section and key parts are lowercased. In the optional
    trailing part, leading/trailing underscores are removed and every remaining
    ``__`` separator is collapsed into a single ``_``.

    A value written as ``[a, "b", 'c']`` is turned into a list of strings
    (surrounding whitespace and quotes removed); ``[]`` yields an empty list.
    Any other value is stored unchanged as a string.

    Returns:
        dict: ``{"external_data_sources": {<station>: {...}}}`` where each station
        entry is created with ``{"service": {"name": <service>}}`` and then filled
        with ``<section>`` values, or ``<section>`` sub-dictionaries when the
        variable name carries a trailing key.
    """
    config_data: dict[str, Any] = {}

    # The three mandatory components cannot contain an underscore: "[^_]+" stops at
    # the first "_" and must be followed by the "__" separator (or the end of name).
    # Compiled once because it is applied to every environment variable.
    pattern = re.compile(r"^RSPY__TOKEN__([^_]+)__([^_]+)__([^_]+)(__.*)?$")

    for var, value in os.environ.items():
        match = pattern.match(var)
        if not match:
            continue

        # The pattern always provides four groups; only the last one may be None
        # (no trailing key), in which case it becomes an empty string.
        service, station, section, rest_of_key = (part.lower() if part else "" for part in match.groups())
        rest_of_key = rest_of_key.strip("_").replace("__", "_")

        # Create the station entry with its mandatory "service" field on first sight
        station_data = config_data.setdefault(station, {"service": {"name": service}})

        processed_value: Any = value
        if value.startswith("[") and value.endswith("]"):
            inner = value.strip("[]")
            # An empty pair of brackets is an empty list, not a list holding ""
            processed_value = [item.strip(" \"'") for item in inner.split(",")] if inner.strip() else []

        if rest_of_key:
            station_data.setdefault(section, {})[rest_of_key] = processed_value
        else:
            station_data[section] = processed_value

    return {"external_data_sources": config_data}

load_external_auth_config(station_id=None, service=None, domain=None)

Load the external authentication configuration from a given station and service or from a domain.

Args:

- station_id (Optional[str]): The ID of the station for which the authorization token is set.
- service (Optional[str]): The service name ("auxip" or "cadip") used to retrieve the token.
- domain (Optional[str]): The domain related to the station for the token.

Raises:

Type Description
ValueError

If neither a station_id and service pair nor a domain is provided.

Exception

If token retrieval fails for any reason, a general exception will be logged.

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication_to_external.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def load_external_auth_config(
    station_id: str | None = None,
    service: str | None = None,
    domain: str | None = None,
) -> StationExternalAuthenticationConfig | S3ExternalAuthenticationConfig:
    """
    Resolve the external authentication configuration for a station.

    The lookup is done by station and service when both are given (the station id
    is lowercased first); otherwise it is done by domain.

    Args:
        station_id (Optional[str]): The ID of the station for which the authorization token is set.
        service (Optional[str]): The service name ("auxip" or "cadip") used to retrieve the token.
        domain (Optional[str]): The domain related to the station for the token.

    Returns:
        The configuration object produced by the selected lookup.

    Raises:
        ValueError: If neither a station_id/service pair nor a domain is provided.
        HTTPException: With status 404 if the selected lookup yields no configuration.
        Any exception raised by the selected lookup propagates unchanged.
    """
    if not (station_id and service):
        # No usable station/service pair: the domain is the only remaining key
        if not domain:
            raise ValueError("Either station_id and service or domain must be provided.")
        resolved_config = load_external_auth_config_by_domain(domain)
    else:
        resolved_config = load_external_auth_config_by_station_service(station_id.lower(), service)

    if resolved_config:
        return resolved_config

    raise HTTPException(
        status_code=HTTP_404_NOT_FOUND,
        detail="Failed to retrieve the configuration for the station token.",
    )

load_external_auth_config_by_domain(domain)

Load the external authentication configuration based on the domain from a YAML file.

Parameters:

Name Type Description Default
domain str

The domain to search for in the authentication configuration.

required

Returns:

Type Description
StationExternalAuthenticationConfig | S3ExternalAuthenticationConfig | None

Optional[StationExternalAuthenticationConfig | S3ExternalAuthenticationConfig]: An object representing the external authentication configuration, with a variation if it is for a regular external station or an external S3 bucket, or None if the station or service is not found or if an error occurs.

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication_to_external.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def load_external_auth_config_by_domain(
    domain: str,
) -> StationExternalAuthenticationConfig | S3ExternalAuthenticationConfig | None:
    """
    Load the external authentication configuration of the first station matching a domain.

    Args:
        domain (str): The domain to search for in the authentication configuration.

    Returns:
        Optional[StationExternalAuthenticationConfig | S3ExternalAuthenticationConfig]: The value returned by
        ``create_external_auth_config`` for the first station whose ``domain`` entry equals ``domain`` or,
        when that entry is a list or tuple, contains it.

    Raises:
        ServiceNotFound: If no station in the configuration matches the given domain.
    """

    # Iterate through the external data sources in the configuration
    for station_id, station_dict in CONFIGURATION.get("external_data_sources", {}).items():
        configured_domain = station_dict.get("domain")
        # NOTE(review): __read_configuration stores bracketed values ("[a, b]") as lists,
        # so a station may presumably declare several domains - accept membership as well
        # as plain equality. Confirm that CONFIGURATION is built by that reader.
        if configured_domain == domain or (
            isinstance(configured_domain, (list, tuple)) and domain in configured_domain
        ):
            return create_external_auth_config(station_id, station_dict, station_dict.get("service", {}))

    # Raise an exception if there are no matching services for the given domain
    raise ServiceNotFound(f"No matching service found for domain: {domain}")

load_external_auth_config_by_station_service(station_id, service=None)

Load the external authentication configuration for a given station and service from a YAML file.

Parameters:

Name Type Description Default
station_id str

The ID of the station for which the authentication config is being loaded.

required
service str

The name of the service ("auxip" or "cadip") to load the authentication configuration for.

None

Returns:

Type Description
StationExternalAuthenticationConfig | S3ExternalAuthenticationConfig | None

Optional[StationExternalAuthenticationConfig | S3ExternalAuthenticationConfig]: An object representing the external authentication configuration, with a variation if it is for a regular external station or an external S3 bucket, or None if the station or service is not found or if an error occurs.

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication_to_external.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def load_external_auth_config_by_station_service(
    station_id: str,
    service: str | None = None,
) -> StationExternalAuthenticationConfig | S3ExternalAuthenticationConfig | None:
    """
    Load the external authentication configuration for a given station and service.

    Args:
        station_id (str): The ID of the station for which the authentication config is being loaded.
            Every "_session" occurrence is removed before the configuration lookup; the
            unmodified ID is what gets passed to ``create_external_auth_config``.
        service (str): The name of the service ("auxip" or "cadip") to load the authentication configuration for.
            When empty or None, the service name is not checked.

    Returns:
        Optional[StationExternalAuthenticationConfig | S3ExternalAuthenticationConfig]: The value returned by
        ``create_external_auth_config``, or None if the station is not configured or if its
        service name differs from the requested one.

    """

    # check if the station_id has 'session', this is a particular case for cadip
    raw_station_id = station_id.replace("_session", "")

    # Retrieve the station details from the configuration
    station_dict = CONFIGURATION.get("external_data_sources", {}).get(raw_station_id, {})
    if not station_dict:
        # Unknown station: honour the documented "None if not found" contract instead of
        # building a configuration object out of empty dictionaries.
        logger.warning(f"No configuration found for station_id: {raw_station_id}")
        return None

    service_dict = station_dict.get("service", {})

    # Validate that the service name matches (only when a service was requested)
    if service and service_dict.get("name") != service:
        logger.warning(f"No matching service found for station_id: {raw_station_id} and service: {service}")
        return None

    # Create and return the ExternalAuthenticationConfig object
    return create_external_auth_config(station_id, station_dict, service_dict)