
Common

Authentication functions implementation.

auth_validation(station_type, access_type, *args, **kwargs)

Function called by auth_validator

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication.py, lines 173-201
def auth_validation(station_type, access_type, *args, **kwargs):  # pylint: disable=unused-argument
    """Function called by auth_validator"""

    # In local mode, there is no authentication to check
    if settings.LOCAL_MODE:
        return

    if not kwargs.get("staging_process", False):
        # Read the full cadip station passed in parameter: ins, mps, mti, nsg, sgs, or cadip
        # No validation needed for landing pages.
        if access_type != "landing_page":
            full_station = f'{"cadip" if station_type == "cadip" else "adgs"}_{kwargs["station"]}'
        else:
            full_station = station_type
        requested_role = f"rs_{full_station}_{access_type}".upper()
    else:
        requested_role = f"rs_processes_{access_type}_{station_type}".upper()

    logger.debug(f"Requested role: {requested_role}")
    try:
        auth_roles = [role.upper() for role in kwargs["request"].state.auth_roles]
    except KeyError:
        auth_roles = []
    logger.debug(f"Auth roles: {auth_roles}")
    if requested_role not in auth_roles:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Missing {requested_role} authorization role",
        )

auth_validator(station, access_type)

Decorator to validate API key access or oauth2 authentication (keycloak) for a specific station and access type.

This decorator checks if the authorization contains the necessary role to access the specified station with the specified access type.

Parameters:

    station (str, required): The name of the station, either "adgs" or "cadip".
    access_type (str, required): The type of access, such as "download" or "read".

Raises:

    HTTPException: If the authorization does not include the required role to access the specified station with the specified access type.

Returns:

    function (Callable): The decorator function.

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication.py, lines 143-170
def auth_validator(station, access_type):
    """Decorator to validate API key access or oauth2 authentication (keycloak) for a specific station and access type.

    This decorator checks if the authorization contains the necessary role to access
    the specified station with the specified access type.

    Args:
        station (str): The name of the station, either "adgs" or "cadip".
        access_type (str): The type of access, such as "download" or "read".

    Raises:
        HTTPException: If the authorization does not include the required role
            to access the specified station with the specified access type.

    Returns:
        function (Callable): The decorator function.
    """

    def decorator(func):
        @contextmanager
        def wrapping_logic(*args, **kwargs):
            auth_validation(station, access_type, *args, **kwargs)
            yield

        # Decorator for both sync and async functions
        return utils2.decorate_sync_async(wrapping_logic, func)

    return decorator
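
For illustration, a hedged sketch (not taken from the rs-server code base) of how a FastAPI route could be protected with this decorator; the endpoint path and parameters are assumptions, and the wrapped function exposes the request object and a station keyword so that auth_validation can read them:

from fastapi import APIRouter, Request

from rs_server_common.authentication.authentication import auth_validator

router = APIRouter()


@router.get("/adgs/aux/search")  # hypothetical endpoint
@auth_validator(station="adgs", access_type="read")
async def search_aux(request: Request, station: str = "adgs"):
    # If we get here, the caller holds the role derived from the station and
    # access type (checked against request.state.auth_roles).
    return {"station": station}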

authenticate(request, apikey_value='') async

FastAPI Security dependency for the cluster mode. Checks the validity of the API key passed as an HTTP header, or that the user is authenticated with OAuth2 (Keycloak).

Parameters:

    apikey_value (Security, default ''): API key passed in HTTP header.

Returns:

    AuthInfo: Tuple of (IAM roles, config, user login) information from the keycloak account associated with the API key, or from the user's oauth2 account.

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication.py, lines 64-140
async def authenticate(
    request: Request,
    apikey_value: Annotated[str, Security(APIKEY_AUTH_HEADER)] = "",
) -> AuthInfo:
    """
    FastAPI Security dependency for the cluster mode. Check the api key validity, passed as an HTTP header,
    or that the user is authenticated with oauth2 (keycloak).

    Args:
        apikey_value (Security): API key passed in HTTP header

    Returns:
        Tuple of (IAM roles, config, user login) information from the keycloak account, associated to the api key
        or the user oauth2 account.
    """

    # If the request comes from the stac browser
    if settings.request_from_stacbrowser(request):

        # With the stac browser, we don't use either api key or oauth2.
        # It passes an authorization token in a specific header.
        if token := request.headers.get("authorization"):
            issuer, key = await get_issuer_and_public_key()
            if token.startswith("Bearer "):
                token = token[7:]  # remove the "Bearer " header

            # Decode the token
            userinfo = jwt.decode(
                token,
                key=key,
                issuer=issuer,
                audience=os.environ["OIDC_CLIENT_ID"],
                algorithms=["RS256"],
            )

            # The result contains the auth roles we need, but still get them from keycloak
            # so we are sure to have the same behaviour than with the apikey and oauth2
            kc_info = oauth2.KCUTIL.get_user_info(userinfo.get("sub"))

            user_login = userinfo.get("preferred_username")
            if not kc_info.is_enabled:
                raise HTTPException(
                    # Don't use 401 or the stac browser will try to connect to this endpoint again and this will loop
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"User {user_login!r} is disabled from KeyCloak.",
                )

            # The configuration dict is only set with the API key
            auth_info = AuthInfo(user_login=user_login, iam_roles=kc_info.roles, apikey_config={})

        else:
            # Else, return an "unauthorized" error to force the browser to authenticate
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Authentication needed from the STAC browser",
            )

    # Not from the stac browser
    else:
        # Try to authenticate with the api key value
        auth_info = await apikey_security(apikey_value)

        # Else try to authenticate with oauth2
        if not auth_info:
            auth_info = await oauth2.get_user_info(request)

        if not auth_info:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not authenticated",
            )

    # Save information in the request state and return it
    request.state.user_login = auth_info.user_login
    request.state.auth_roles = auth_info.iam_roles
    request.state.auth_config = auth_info.apikey_config
    return authenticate_from_pytest(auth_info) if FROM_PYTEST else auth_info
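
As a hedged usage sketch (the endpoint name is hypothetical), the dependency is typically wired into a route so that the request state carries the resolved identity:

from fastapi import Depends, FastAPI, Request

from rs_server_common.authentication.authentication import authenticate

app = FastAPI()


@app.get("/protected", dependencies=[Depends(authenticate)])
async def protected(request: Request):
    # 'authenticate' has stored the resolved identity in the request state.
    return {"user": request.state.user_login, "roles": request.state.auth_roles}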

authenticate_from_pytest(auth_info)

'authenticate' function called from pytest.

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication.py, lines 41-43
def authenticate_from_pytest(auth_info: AuthInfo) -> AuthInfo:
    """'authenticate' function called from pytest."""
    return auth_info

get_issuer_and_public_key() async

Get issuer URL from OIDC environment, and public key from the issuer.

Source code in docs/rs-server/services/common/rs_server_common/authentication/authentication.py, lines 46-61
@cached(cache=TTLCache(maxsize=1, ttl=24 * 3600))  # cache the results for n seconds, they should not change often
async def get_issuer_and_public_key() -> tuple[str, str]:
    """Get issuer URL from OIDC environment, and public key from the issuer."""

    # Read environment variables
    oidc_endpoint = os.environ["OIDC_ENDPOINT"]
    oidc_realm = os.environ["OIDC_REALM"]
    oidc_metadata_url = f"{oidc_endpoint}/realms/{oidc_realm}/.well-known/openid-configuration"

    response = await settings.http_client().get(oidc_metadata_url)
    issuer = response.json()["issuer"]
    response = await settings.http_client().get(issuer)
    public_key = response.json()["public_key"]

    key = "-----BEGIN PUBLIC KEY-----\n" + public_key + "\n-----END PUBLIC KEY-----"
    return (issuer, key)

Database connection.

Taken from: https://praciano.com.br/fastapi-and-async-sqlalchemy-20-with-pytest-done-right.html

DatabaseSessionManager

Database session configuration.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 41-164
class DatabaseSessionManager:
    """Database session configuration."""

    lock = Lock()
    multiprocessing_lock = multiprocessing.Lock()

    def __init__(self):
        """Create a Database session configuration."""
        self._engine: Engine | None = None
        self._sessionmaker: sessionmaker | None = None

    @classmethod
    def url(cls):
        """Get database connection URL."""
        try:
            # pylint: disable=consider-using-f-string
            return os.getenv(
                "POSTGRES_URL",
                "postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}".format(
                    user=os.environ["POSTGRES_USER"],
                    password=os.environ["POSTGRES_PASSWORD"],
                    host=os.environ["POSTGRES_HOST"],
                    port=os.environ["POSTGRES_PORT"],
                    dbname=os.environ["POSTGRES_DB"],
                ),
            )
        except KeyError as key_error:
            raise KeyError(
                "The PostgreSQL environment variables are missing: "
                "POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB",
            ) from key_error

    def open_session(self, url: str = ""):
        """Open database session."""

        # If the 'self' object is used by several threads in the same process,
        # make sure to initialize the session only once.
        with DatabaseSessionManager.lock:
            if (self._engine is None) or (self._sessionmaker is None):
                self._engine = create_engine(url or self.url(), poolclass=NullPool, pool_pre_ping=True)
                self._sessionmaker = sessionmaker(autocommit=False, autoflush=False, bind=self._engine)

                try:
                    # Create all tables.
                    # Warning: this only works if the database table modules have been imported
                    # e.g. import rs_server_adgs.adgs_download_status
                    self.create_all()

                # It fails if the database is unreachable, but even in this case the engine and session are not None.
                # Set them to None so we will try to create all tables again on the next try.
                except Exception:
                    self.close()
                    raise

    def close(self):
        """Close database session."""
        if self._engine is not None:
            self._engine.dispose()
            self._engine = None
        self._sessionmaker = None

    @contextlib.contextmanager
    def connect(self) -> Iterator[Connection]:
        """Open new database connection instance."""

        if self._engine is None:
            raise RuntimeError("DatabaseSessionManager is not initialized")

        with self._engine.begin() as connection:
            try:
                yield connection

            # In case of any exception, rollback connection and re-raise into HTTP exception
            except Exception as exception:  # pylint: disable=broad-exception-caught
                connection.rollback()
                self.reraise_http_exception(exception)

    @contextlib.contextmanager
    def session(self) -> Iterator[Session]:
        """Open new database session instance."""

        if self._sessionmaker is None:
            raise RuntimeError("DatabaseSessionManager is not initialized")

        session = self._sessionmaker()
        try:
            yield session

        # In case of any exception, rollback session and re-raise into HTTP exception
        except Exception as exception:  # pylint: disable=broad-exception-caught
            session.rollback()
            self.reraise_http_exception(exception)

        # Close session when deleting instance.
        finally:
            session.close()

    @staticmethod
    def __filelock(func):
        """Avoid concurrent writing to the database using a file locK."""
        return filelock(func, "RSPY_WORKING_DIR")

    @__filelock
    def create_all(self):
        """Create all database tables."""
        with DatabaseSessionManager.multiprocessing_lock:  # Handle concurrent table creation by different processes
            Base.metadata.create_all(bind=self._engine)

    @__filelock
    def drop_all(self):
        """Drop all database tables."""
        with DatabaseSessionManager.multiprocessing_lock:  # Handle concurrent table creation by different processes
            Base.metadata.drop_all(bind=self._engine)

    @classmethod
    def reraise_http_exception(cls, exception: Exception):
        """Re-raise all exceptions into HTTP exceptions."""

        # Raised exceptions are not always printed in the console, so do it manually with the stacktrace.
        logger.error(traceback.format_exc())

        if isinstance(exception, StarletteHTTPException):
            raise exception
        raise HTTPException(status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail=repr(exception))
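
A minimal usage sketch, assuming the POSTGRES_* environment variables are set and the database table modules have been imported beforehand:

from sqlalchemy import text

from rs_server_common.db.database import DatabaseSessionManager

manager = DatabaseSessionManager()
manager.open_session()  # builds the engine from POSTGRES_URL or the POSTGRES_* variables and creates the tables

with manager.session() as session:
    # Any exception raised in this block rolls back the session and is re-raised as an HTTPException.
    session.execute(text("SELECT 1"))

manager.close()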

__filelock(func) staticmethod

Avoid concurrent writing to the database using a file lock.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 138-141
@staticmethod
def __filelock(func):
    """Avoid concurrent writing to the database using a file locK."""
    return filelock(func, "RSPY_WORKING_DIR")

__init__()

Create a Database session configuration.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 47-50
def __init__(self):
    """Create a Database session configuration."""
    self._engine: Engine | None = None
    self._sessionmaker: sessionmaker | None = None

close()

Close database session.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 95-100
def close(self):
    """Close database session."""
    if self._engine is not None:
        self._engine.dispose()
        self._engine = None
    self._sessionmaker = None

connect()

Open new database connection instance.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 102-116
@contextlib.contextmanager
def connect(self) -> Iterator[Connection]:
    """Open new database connection instance."""

    if self._engine is None:
        raise RuntimeError("DatabaseSessionManager is not initialized")

    with self._engine.begin() as connection:
        try:
            yield connection

        # In case of any exception, rollback connection and re-raise into HTTP exception
        except Exception as exception:  # pylint: disable=broad-exception-caught
            connection.rollback()
            self.reraise_http_exception(exception)

create_all()

Create all database tables.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 143-147
@__filelock
def create_all(self):
    """Create all database tables."""
    with DatabaseSessionManager.multiprocessing_lock:  # Handle concurrent table creation by different processes
        Base.metadata.create_all(bind=self._engine)

drop_all()

Drop all database tables.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 149-153
@__filelock
def drop_all(self):
    """Drop all database tables."""
    with DatabaseSessionManager.multiprocessing_lock:  # Handle concurrent table creation by different processes
        Base.metadata.drop_all(bind=self._engine)

open_session(url='')

Open database session.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 73-93
def open_session(self, url: str = ""):
    """Open database session."""

    # If the 'self' object is used by several threads in the same process,
    # make sure to initialize the session only once.
    with DatabaseSessionManager.lock:
        if (self._engine is None) or (self._sessionmaker is None):
            self._engine = create_engine(url or self.url(), poolclass=NullPool, pool_pre_ping=True)
            self._sessionmaker = sessionmaker(autocommit=False, autoflush=False, bind=self._engine)

            try:
                # Create all tables.
                # Warning: this only works if the database table modules have been imported
                # e.g. import rs_server_adgs.adgs_download_status
                self.create_all()

            # It fails if the database is unreachable, but even in this case the engine and session are not None.
            # Set them to None so we will try to create all tables again on the next try.
            except Exception:
                self.close()
                raise

reraise_http_exception(exception) classmethod

Re-raise all exceptions into HTTP exceptions.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 155-164
@classmethod
def reraise_http_exception(cls, exception: Exception):
    """Re-raise all exceptions into HTTP exceptions."""

    # Raised exceptions are not always printed in the console, so do it manually with the stacktrace.
    logger.error(traceback.format_exc())

    if isinstance(exception, StarletteHTTPException):
        raise exception
    raise HTTPException(status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail=repr(exception))

session()

Open new database session instance.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 118-136
@contextlib.contextmanager
def session(self) -> Iterator[Session]:
    """Open new database session instance."""

    if self._sessionmaker is None:
        raise RuntimeError("DatabaseSessionManager is not initialized")

    session = self._sessionmaker()
    try:
        yield session

    # In case of any exception, rollback session and re-raise into HTTP exception
    except Exception as exception:  # pylint: disable=broad-exception-caught
        session.rollback()
        self.reraise_http_exception(exception)

    # Close session when deleting instance.
    finally:
        session.close()

url() classmethod

Get database connection URL.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 52-71
@classmethod
def url(cls):
    """Get database connection URL."""
    try:
        # pylint: disable=consider-using-f-string
        return os.getenv(
            "POSTGRES_URL",
            "postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}".format(
                user=os.environ["POSTGRES_USER"],
                password=os.environ["POSTGRES_PASSWORD"],
                host=os.environ["POSTGRES_HOST"],
                port=os.environ["POSTGRES_PORT"],
                dbname=os.environ["POSTGRES_DB"],
            ),
        )
    except KeyError as key_error:
        raise KeyError(
            "The PostgreSQL environment variables are missing: "
            "POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB",
        ) from key_error

get_db()

Return a database session for FastAPI dependency injection.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py, lines 170-178
def get_db():
    """Return a database session for FastAPI dependency injection."""
    try:
        with sessionmanager.session() as session:
            yield session

    # Re-raise all exceptions into HTTP exceptions
    except Exception as exception:  # pylint: disable=broad-exception-caught
        DatabaseSessionManager.reraise_http_exception(exception)
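
A hedged sketch of how the dependency can be used in a route (the endpoint is hypothetical); the session comes from the module-level sessionmanager and is closed when the generator exits:

from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session

from rs_server_common.db.database import get_db

app = FastAPI()


@app.get("/db-check")  # hypothetical endpoint
def db_check(db: Session = Depends(get_db)):
    # Errors raised while using 'db' are converted into HTTP 500 responses.
    return {"db_session_active": db.is_active}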

Logging utility.

CustomFormatter

Bases: Formatter

Custom logging formatter with colored text. See: https://stackoverflow.com/a/56944256

Source code in docs/rs-server/services/common/rs_server_common/utils/logging.py, lines 82-110
class CustomFormatter(logging.Formatter):
    """
    Custom logging formatter with colored text.
    See: https://stackoverflow.com/a/56944256
    """

    _RED = "\x1b[31m"
    _BOLD_RED = "\x1b[31;1m"
    _GREEN = "\x1b[32m"
    _YELLOW = "\x1b[33m"
    _PURPLE = "\x1b[35m"
    _RESET = "\x1b[0m"

    _FORMAT = f"%(asctime)s.%(msecs)03d [{{color}}%(levelname)s{_RESET}] (%(name)s) %(message)s"
    _DATETIME = "%H:%M:%S"

    _FORMATS = {
        logging.NOTSET: _FORMAT.format(color=""),
        logging.DEBUG: _FORMAT.format(color=_PURPLE),
        logging.INFO: _FORMAT.format(color=_GREEN),
        logging.WARNING: _FORMAT.format(color=_YELLOW),
        logging.ERROR: _FORMAT.format(color=_BOLD_RED),
        logging.CRITICAL: _FORMAT.format(color=_RED),
    }

    def format(self, record):
        level_format = self._FORMATS.get(record.levelno)
        formatter = logging.Formatter(level_format, self._DATETIME)
        return formatter.format(record)

Logging

Logging utility.

Attributes:

    lock: For code synchronization.
    level: Minimal log level to use for all new logging instances.

Source code in docs/rs-server/services/common/rs_server_common/utils/logging.py, lines 27-79
class Logging:  # pylint: disable=too-few-public-methods
    """
    Logging utility.

    Attributes:
        lock: For code synchronization
        level: Minimal log level to use for all new logging instances.
    """

    lock = Lock()
    level = logging.DEBUG

    @classmethod
    def default(cls, name="rspy"):
        """
        Return a default Logger class instance.

        Args:
            name (str): Logger name. You can pass __name__ to use your current module name.
        """
        logger = logging.getLogger(name=name)

        with cls.lock:
            # Don't propagate to root logger
            logger.propagate = False

            # If we have already set the handlers for the logger with this name, do nothing more
            if logger.hasHandlers():
                return logger

            # Set the minimal log level to use for all new logging instances.
            logger.setLevel(cls.level)

            # Create console handler
            handler = logging.StreamHandler()
            handler.setFormatter(CustomFormatter())
            logger.addHandler(handler)

            # Export logs to Loki, see: https://pypi.org/project/python-logging-loki/
            # Note: on the cluster, this is not used. Promtail already forwards stdout to Loki.
            loki_endpoint = os.getenv("LOKI_ENDPOINT")
            if loki_endpoint and settings.SERVICE_NAME:
                handler = logging_loki.LokiQueueHandler(
                    Queue(-1),
                    url=loki_endpoint,
                    tags={"service": settings.SERVICE_NAME},
                    # auth=("username", "password"),
                    version="1",
                )
                handler.setFormatter(CustomFormatter())
                logger.addHandler(handler)

            return logger

default(name='rspy') classmethod

Return a default Logger class instance.

Parameters:

    name (str, default 'rspy'): Logger name. You can pass __name__ to use your current module name.
Source code in docs/rs-server/services/common/rs_server_common/utils/logging.py, lines 39-79
@classmethod
def default(cls, name="rspy"):
    """
    Return a default Logger class instance.

    Args:
        name (str): Logger name. You can pass __name__ to use your current module name.
    """
    logger = logging.getLogger(name=name)

    with cls.lock:
        # Don't propagate to root logger
        logger.propagate = False

        # If we have already set the handlers for the logger with this name, do nothing more
        if logger.hasHandlers():
            return logger

        # Set the minimal log level to use for all new logging instances.
        logger.setLevel(cls.level)

        # Create console handler
        handler = logging.StreamHandler()
        handler.setFormatter(CustomFormatter())
        logger.addHandler(handler)

        # Export logs to Loki, see: https://pypi.org/project/python-logging-loki/
        # Note: on the cluster, this is not used. Promtail already forwards stdout to Loki.
        loki_endpoint = os.getenv("LOKI_ENDPOINT")
        if loki_endpoint and settings.SERVICE_NAME:
            handler = logging_loki.LokiQueueHandler(
                Queue(-1),
                url=loki_endpoint,
                tags={"service": settings.SERVICE_NAME},
                # auth=("username", "password"),
                version="1",
            )
            handler.setFormatter(CustomFormatter())
            logger.addHandler(handler)

        return logger
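
Typical usage, as a short sketch:

import logging

from rs_server_common.utils.logging import Logging

logger = Logging.default(__name__)
logger.info("Colored message on stdout")

# Loggers created after this point will use the new minimal level.
Logging.level = logging.INFO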

OpenTelemetry utility

init_traces(app, service_name)

Init instrumentation of OpenTelemetry traces.

Parameters:

    app (fastapi.FastAPI, required): FastAPI application.
    service_name (str, required): Service name.
Source code in docs/rs-server/services/common/rs_server_common/utils/opentelemetry.py, lines 58-142
def init_traces(app: fastapi.FastAPI, service_name: str):
    """
    Init instrumentation of OpenTelemetry traces.

    Args:
        app (fastapi.FastAPI): FastAPI application
        service_name (str): service name
    """

    # See: https://github.com/softwarebloat/python-tracing-demo/tree/main

    # Don't call this line from pytest because it causes errors:
    # Transient error StatusCode.UNAVAILABLE encountered while exporting metrics to localhost:4317, retrying in ..s.
    if not FROM_PYTEST:
        tempo_endpoint = os.getenv("TEMPO_ENDPOINT")
        if not tempo_endpoint:
            return

        # TODO: to avoid errors in local mode:
        # Transient error StatusCode.UNAVAILABLE encountered while exporting metrics to localhost:4317, retrying in ..s.
        #
        # The below line does not work either but at least we have less error messages.
        # See: https://pforge-exchange2.astrium.eads.net/jira/browse/RSPY-221?focusedId=162092&
        # page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-162092
        #
        # Now we have a single line error, which is less worst:
        # Failed to export metrics to tempo:4317, error code: StatusCode.UNIMPLEMENTED
        os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = tempo_endpoint

    otel_resource = Resource(attributes={"service.name": service_name})
    otel_tracer = TracerProvider(resource=otel_resource)
    trace.set_tracer_provider(otel_tracer)

    if not FROM_PYTEST:
        otel_tracer.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=tempo_endpoint)))

    FastAPIInstrumentor.instrument_app(app, tracer_provider=otel_tracer)
    # logger.debug(f"OpenTelemetry instrumentation of 'fastapi.FastAPIInstrumentor'")

    # Instrument all the dependencies under opentelemetry.instrumentation.*
    # NOTE: we need 'poetry run opentelemetry-bootstrap -a install' to install these.

    package = opentelemetry.instrumentation
    prefix = package.__name__ + "."
    classes = set()

    # We need an empty PYTHONPATH if the env var is missing
    os.environ["PYTHONPATH"] = os.getenv("PYTHONPATH", "")

    # Recursively find all package modules
    for _, module_str, _ in pkgutil.walk_packages(path=package.__path__, prefix=prefix, onerror=None):

        # Don't instrument these modules, they have errors, maybe we should see why
        if module_str in [
            "opentelemetry.instrumentation.tortoiseorm",
            "opentelemetry.instrumentation.auto_instrumentation.sitecustomize",
        ]:
            continue

        # Import and find all module classes
        __import__(module_str)
        for _, _class in inspect.getmembers(sys.modules[module_str]):
            if (not inspect.isclass(_class)) or (_class in classes):
                continue

            # Save the class (classes are found several times when imported by other modules)
            classes.add(_class)

            # Don't instrument these classes, they have errors, maybe we should see why
            if _class in [AsyncioInstrumentor, AwsLambdaInstrumentor, BaseInstrumentor, HTTPXClientInstrumentor]:
                continue

            # If the "instrument" method exists, call it
            _instrument = getattr(_class, "instrument", None)
            if callable(_instrument):

                _class_instance = _class()
                if _class == RequestsInstrumentor and os.getenv("OTEL_PYTHON_REQUESTS_TRACE_HEADERS"):
                    _class_instance.instrument(
                        tracer_provider=otel_tracer,
                        request_hook=request_hook,
                        response_hook=response_hook,
                    )
                elif not _class_instance.is_instrumented_by_opentelemetry:
                    _class_instance.instrument(tracer_provider=otel_tracer)
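
A minimal sketch of wiring the instrumentation into an application at startup (the service name is a placeholder); outside pytest, the call simply returns if the TEMPO_ENDPOINT environment variable is not set:

import fastapi

from rs_server_common.utils.opentelemetry import init_traces

app = fastapi.FastAPI()
init_traces(app, service_name="rs-server-example")  # placeholder service name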

request_hook(span, request)

HTTP requests instrumentation

Source code in docs/rs-server/services/common/rs_server_common/utils/opentelemetry.py, lines 42-47
def request_hook(span, request):
    """
    HTTP requests intrumentation
    """
    if span:
        span.set_attribute("http.request.headers", str(request.headers))

response_hook(span, request, response)

HTTP responses instrumentation

Source code in docs/rs-server/services/common/rs_server_common/utils/opentelemetry.py, lines 50-55
def response_hook(span, request, response):  # pylint: disable=W0613
    """
    HTTP responses intrumentation
    """
    if span:
        span.set_attribute("http.response.headers", str(response.headers))

This module is used to share common functions between API endpoints

check_and_fix_timerange(item)

This function ensures the item does not define only one side of its time range (a start without an end, or an end without a start).

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py, lines 197-210
def check_and_fix_timerange(item: dict):
    """This function ensures the item does not have a single timerange value"""
    properties = item.get("properties", {})

    start_dt = properties.get("start_datetime")
    end_dt = properties.get("end_datetime")
    dt = properties.get("datetime")

    if start_dt and not end_dt:
        properties["end_datetime"] = max(start_dt, dt) if dt else start_dt
        logger.warning(f"Forced end_datetime property in {item}")
    elif end_dt and not start_dt:
        properties.pop("end_datetime", None)
        logger.warning(f"Removed end_datetime property from {item}")
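
A small illustration of the fix, assuming an item that only defines the start of its range:

from rs_server_common.utils.utils import check_and_fix_timerange

item = {"properties": {"datetime": "2024-01-01T00:00:00Z", "start_datetime": "2024-01-01T00:00:00Z"}}
check_and_fix_timerange(item)
# end_datetime is now forced to the latest of start_datetime and datetime.
assert item["properties"]["end_datetime"] == "2024-01-01T00:00:00Z"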

extract_eo_product(eo_product, mapper)

This function creates key:value pairs from an EOProduct's properties

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py, lines 213-218
def extract_eo_product(eo_product: EOProduct, mapper: dict) -> dict:
    """This function is creating key:value pairs from an EOProduct properties"""
    eo_product.properties.update(
        {item.get("Name", None): item.get("Value", None) for item in eo_product.properties.get("attrs", [])},
    )
    return {key: value for key, value in eo_product.properties.items() if key in mapper.values()}

is_valid_date_format(date)

Check if a string adheres to the expected date format "YYYY-MM-DDTHH:MM:SS[.sss]Z".

Parameters:

    date (str, required): The string to be validated for the specified date format.

Returns:

    bool: True if the input string adheres to the expected date format, otherwise False.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py, lines 32-51
def is_valid_date_format(date: str) -> bool:
    """Check if a string adheres to the expected date format "YYYY-MM-DDTHH:MM:SS[.sss]Z".

    Args:
        date (str): The string to be validated for the specified date format.

    Returns:
        bool: True if the input string adheres to the expected date format, otherwise False.

    """
    try:
        datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")  # test without milliseconds
        return True
    except ValueError:
        try:
            datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%fZ")  # test with milliseconds
            return True
        except ValueError:
            pass
    return False
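
For example:

from rs_server_common.utils.utils import is_valid_date_format

assert is_valid_date_format("2024-01-01T00:00:00Z")
assert is_valid_date_format("2024-01-01T00:00:00.123Z")
assert not is_valid_date_format("2024-01-01")            # date only
assert not is_valid_date_format("2024-01-01 00:00:00")   # missing 'T' and 'Z'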

odata_to_stac(feature_template, odata_dict, odata_stac_mapper)

Maps OData values to a given STAC template.

Parameters:

    feature_template (dict, required): The STAC feature template to be populated.
    odata_dict (dict, required): The dictionary containing OData values.
    odata_stac_mapper (dict, required): The mapping dictionary for converting OData keys to STAC properties.

Returns:

    dict: The populated STAC feature template.

Raises:

    ValueError: If the provided STAC feature template is invalid.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py, lines 167-194
def odata_to_stac(feature_template: dict, odata_dict: dict, odata_stac_mapper: dict) -> dict:
    """
    Maps OData values to a given STAC template.

    Args:
        feature_template (dict): The STAC feature template to be populated.
        odata_dict (dict): The dictionary containing OData values.
        odata_stac_mapper (dict): The mapping dictionary for converting OData keys to STAC properties.

    Returns:
        dict: The populated STAC feature template.

    Raises:
        ValueError: If the provided STAC feature template is invalid.
    """
    if not all(item in feature_template.keys() for item in ["properties", "id", "assets"]):
        raise ValueError("Invalid stac feature template")
    for stac_key, eodag_key in odata_stac_mapper.items():
        if eodag_key in odata_dict:
            if stac_key in feature_template["properties"]:
                feature_template["properties"][stac_key] = odata_dict[eodag_key]
            elif stac_key == "id":
                feature_template["id"] = odata_dict[eodag_key]
            elif stac_key in feature_template["assets"]["file"]:
                feature_template["assets"]["file"][stac_key] = odata_dict[eodag_key]
    # to pass pydantic validation, make sure we don't have a single timerange value
    check_and_fix_timerange(feature_template)
    return feature_template
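
A hedged sketch with a toy template and mapper (the key names below are illustrative, not the real CADIP/ADGS mappings):

from rs_server_common.utils.utils import odata_to_stac

feature_template = {"id": "", "properties": {"datetime": None}, "assets": {"file": {"href": ""}}}
odata_dict = {"Id": "S1A_example.raw", "PublicationDate": "2024-01-01T00:00:00Z"}
odata_stac_mapper = {"id": "Id", "datetime": "PublicationDate"}

feature = odata_to_stac(feature_template, odata_dict, odata_stac_mapper)
# feature["id"] == "S1A_example.raw"
# feature["properties"]["datetime"] == "2024-01-01T00:00:00Z"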

validate_inputs_format(date_time, raise_errors=True)

Validate the format and content of a time interval string.

This function checks whether the provided time interval string is in a valid format and whether the start and stop dates conform to the ISO 8601 standard. It supports a variety of interval formats, including open-ended intervals.

Parameters:

    date_time (str, required): The time interval string to validate. Supported formats include:
        - "2024-01-01T00:00:00Z/2024-01-02T23:59:59Z" (closed interval)
        - "../2024-01-02T23:59:59Z" (open start interval)
        - "2024-01-01T00:00:00Z/.." (open end interval)
        - "2024-01-01T00:00:00Z" (fixed date)
    raise_errors (bool, default True): If True, raises an exception for invalid input. If False, returns [None, None, None] for invalid input.

Returns:

    List[Union[datetime, None]]: A list containing three elements:
        - fixed_date (datetime or None): The single fixed date if applicable.
        - start_date (datetime or None): The start date of the interval.
        - stop_date (datetime or None): The stop date of the interval.
        Returns [None, None, None] if the input is invalid or empty.

Raises:

    HTTPException: If raise_errors is True and the input is invalid, an HTTP 400 or 422 error is raised.

Note:

    - The input interval should use the ISO 8601 format for dates and times.
    - If using an open-ended interval, one side of the interval can be omitted (e.g., "../2024-01-02T23:59:59Z").
Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py, lines 83-164
def validate_inputs_format(
    date_time: str,
    raise_errors: bool = True,
) -> Any:
    """
    Validate the format and content of a time interval string.

    This function checks whether the provided time interval string is in a valid format and
    whether the start and stop dates conform to the ISO 8601 standard. It supports a variety
    of interval formats, including open-ended intervals.

    Args:
        date_time (str): The time interval string to validate. Supported formats include:
            - "2024-01-01T00:00:00Z/2024-01-02T23:59:59Z" (closed interval)
            - "../2024-01-02T23:59:59Z" (open start interval)
            - "2024-01-01T00:00:00Z/.." (open end interval)
            - "2024-01-01T00:00:00Z" (fixed date)
        raise_errors (bool): If True, raises an exception for invalid input.
            If False, returns [None, None, None] for invalid input.

    Returns:
        List[Union[datetime, None]]: A list containing three elements:
            - fixed_date (datetime or None): The single fixed date if applicable.
            - start_date (datetime or None): The start date of the interval.
            - stop_date (datetime or None): The stop date of the interval.
            Returns [None, None, None] if the input is invalid or empty.

    Raises:
        HTTPException: If `raise_errors` is True and the input is invalid, an HTTP 400 or 422
            error is raised.

    Note:
        - The input interval should use the ISO 8601 format for dates and times.
        - If using an open-ended interval, one side of the interval can be omitted
          (e.g., "../2024-01-02T23:59:59Z").
    """
    fixed_date, start_date, stop_date = "", "", ""
    if not date_time:
        return None, None, None
    try:
        if "/" in date_time:
            # Open/Closed interval, ../2018-02-12T23:20:50Z or 2018-02-12T23:20:50Z/..
            start_date, stop_date = date_time.split("/")
        else:
            fixed_date = date_time
    except ValueError as exc:
        logger.error("Missing start or stop in endpoint call!")
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Missing start/stop") from exc

    for date in [fixed_date, start_date, stop_date]:
        if date.strip("'\".") and not is_valid_date_format(date):
            logger.info("Invalid start/stop in endpoint call!")
            if raise_errors:
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Missing start/stop")
            return None, None, None

    def to_dt(dates) -> list[Any]:
        """Converts a list of date strings to datetime objects or None if the conversion fails."""
        return [datetime.fromisoformat(date) if is_valid_date(date) else None for date in dates]

    def is_valid_date(date: str) -> bool:
        """Check if the string can be converted to a valid datetime."""
        try:
            datetime.fromisoformat(date)
            return True
        except ValueError:
            return False

    fixed_date_dt, start_date_dt, stop_date_dt = to_dt([fixed_date, start_date, stop_date])

    # if fixed_date_dt and "." not in fixed_date:
    #     # If miliseconds are not defined, don't set to .000Z create a timeinterval, to gather all products
    #     # from that milisecond
    #     start_date_dt = fixed_date_dt.replace(microsecond=0)  # type: ignore
    #     stop_date_dt = fixed_date_dt.replace(microsecond=999999)  # type: ignore
    #     fixed_date_dt = None
    #     return fixed_date_dt, start_date_dt, stop_date_dt
    # if stop_date_dt and "." not in stop_date:
    #     # If stop_date interval miliseconds value is not defined, set it to 999
    #     stop_date_dt = stop_date_dt.replace(microsecond=999999)  # type: ignore

    return fixed_date_dt, start_date_dt, stop_date_dt
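
A short usage sketch covering the supported shapes:

from rs_server_common.utils.utils import validate_inputs_format

# Closed interval: fixed date is None, start and stop dates are returned.
fixed, start, stop = validate_inputs_format("2024-01-01T00:00:00Z/2024-01-02T23:59:59Z")

# Open start interval: the missing side stays None.
fixed, start, stop = validate_inputs_format("../2024-01-02T23:59:59Z")

# Invalid input without raising: all three values are None.
fixed, start, stop = validate_inputs_format("not-a-date", raise_errors=False)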

validate_sort_input(sortby)

Transforms a STAC sort parameter into its OData equivalent, e.g. -datetime becomes startTimeFromAscendingNode DESC.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py, lines 221-226
def validate_sort_input(sortby: str):
    """Used to transform stac sort parameter to odata type.
    -datetime = startTimeFromAscendingNode DESC.
    """
    sortby = sortby.strip("'\"").lower()
    return [(sortby[1:], "DESC" if sortby[0] == "-" else "ASC")]
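
For example:

from rs_server_common.utils.utils import validate_sort_input

assert validate_sort_input("-datetime") == [("datetime", "DESC")]
assert validate_sort_input("+created") == [("created", "ASC")]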

validate_str_list(parameter)

Validates and parses a parameter that can be either a string or a comma-separated list of strings.

The function processes the input parameter to:

    - Strip whitespace from each item in a comma-separated list.
    - Return a single string if the list has only one item.
    - Return a list of strings if the input contains multiple valid items.

Examples:

    - Input: 'S1A'        Output: 'S1A' (str)
    - Input: 'S1A, S2B'   Output: ['S1A', 'S2B'] (list of str)
    - Input: 'S1A,'       Output: 'S1A' (str); a trailing comma with no valid value does not produce ['S1A', '']
    - Input: 'S1A, S2B, ' Output: ['S1A', 'S2B'] (list of str)
Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py, lines 54-80
def validate_str_list(parameter: str) -> list | str:
    """
    Validates and parses a parameter that can be either a string or a comma-separated list of strings.

    The function processes the input parameter to:
    - Strip whitespace from each item in a comma-separated list.
    - Return a single string if the list has only one item.
    - Return a list of strings if the input contains multiple valid items.

    Examples:
        - Input: 'S1A'
          Output: 'S1A' (str)

        - Input: 'S1A, S2B'
          Output: ['S1A', 'S2B'] (list of str)

          # Test case bgfx, when input contains ',' but not a validd value, output should not be ['S1A', '']
        - Input: 'S1A,'
          Output: 'S1A' (str)

        - Input: 'S1A, S2B, '
          Output: ['S1A', 'S2B'] (list of str)
    """
    if parameter and "," in parameter:
        items = [item.strip() for item in parameter.split(",") if item.strip()]
        return items if len(items) > 1 else items[0]
    return parameter
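
The examples above translate to:

from rs_server_common.utils.utils import validate_str_list

assert validate_str_list("S1A") == "S1A"
assert validate_str_list("S1A, S2B") == ["S1A", "S2B"]
assert validate_str_list("S1A,") == "S1A"
assert validate_str_list("S1A, S2B, ") == ["S1A", "S2B"]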

EODAG Provider.

CustomEODataAccessGateway

Bases: EODataAccessGateway

EODataAccessGateway with a custom config directory management.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py, lines 46-87
class CustomEODataAccessGateway(EODataAccessGateway):
    """EODataAccessGateway with a custom config directory management."""

    def __init__(self, *args, **kwargs):
        """Constructor"""

        self.lock = Lock()

        # Init environment
        self.eodag_cfg_dir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
        os.environ["EODAG_CFG_DIR"] = self.eodag_cfg_dir.name
        # disable product types discovery
        os.environ["EODAG_EXT_PRODUCT_TYPES_CFG_FILE"] = ""

        # Environment variable values, the last time we checked them. They will be read by eodag.
        self.old_environ = dict(os.environ)

        # Init eodag instance
        super().__init__(*args, **kwargs)

    def __del__(self):
        """Destructor"""
        try:
            shutil.rmtree(self.eodag_cfg_dir.name)  # remove the unique /tmp dir
        except FileNotFoundError:
            pass

    @classmethod
    @lru_cache
    def create(cls, *args, **kwargs):
        """Return a cached instance of the class."""
        return cls(*args, **kwargs)

    def override_config_from_env(self):
        """
        Update the eodag conf from the latest EODAG__<provider>__auth__... env vars
        that are set in authentication_to_external.py, if they have changed
        """
        with self.lock:  # safer to use a thread lock before calling eodag and modifying a global var
            if (new_environ := dict(os.environ)) != self.old_environ:
                self.old_environ = new_environ
                override_config_from_env(self.providers_config)

__del__()

Destructor

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py, lines 66-71
def __del__(self):
    """Destructor"""
    try:
        shutil.rmtree(self.eodag_cfg_dir.name)  # remove the unique /tmp dir
    except FileNotFoundError:
        pass

__init__(*args, **kwargs)

Constructor

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py, lines 49-64
def __init__(self, *args, **kwargs):
    """Constructor"""

    self.lock = Lock()

    # Init environment
    self.eodag_cfg_dir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
    os.environ["EODAG_CFG_DIR"] = self.eodag_cfg_dir.name
    # disable product types discovery
    os.environ["EODAG_EXT_PRODUCT_TYPES_CFG_FILE"] = ""

    # Environment variable values, the last time we checked them. They will be read by eodag.
    self.old_environ = dict(os.environ)

    # Init eodag instance
    super().__init__(*args, **kwargs)

create(*args, **kwargs) cached classmethod

Return a cached instance of the class.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py, lines 73-77
@classmethod
@lru_cache
def create(cls, *args, **kwargs):
    """Return a cached instance of the class."""
    return cls(*args, **kwargs)

override_config_from_env()

Update the eodag conf from the latest EODAG__<provider>__auth__... env vars that are set in authentication_to_external.py, if they have changed

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py, lines 79-87
def override_config_from_env(self):
    """
    Update the eodag conf from the latest EODAG__<provider>__auth__... env vars
    that are set in authentication_to_external.py, if they have changed
    """
    with self.lock:  # safer to use a thread lock before calling eodag and modifying a global var
        if (new_environ := dict(os.environ)) != self.old_environ:
            self.old_environ = new_environ
            override_config_from_env(self.providers_config)

EodagProvider

Bases: Provider

An EODAG provider.

It uses EODAG to provide data from external sources.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py, lines 90-273
class EodagProvider(Provider):
    """An EODAG provider.

    It uses EODAG to provide data from external sources.
    """

    def __init__(self, config_file: Path, provider: str):  # type: ignore
        """Create a EODAG provider.

        Args:
            config_file: the path to the eodag configuration file
            provider: the name of the eodag provider
        """
        self.provider: str = provider
        self.config_file = config_file.resolve().as_posix()
        try:
            with global_lock:  # use a thread lock before calling the lru_cache
                self.client = CustomEODataAccessGateway.create(self.config_file)
        except Exception as e:
            raise CreateProviderFailed(f"Can't initialize {self.provider} provider") from e
        self.client.set_preferred_provider(self.provider)

        # If the eodag object was already existing and retrieved from the lru_cache,
        # we need to update its configuration from the latest env vars, if they have changed
        self.client.override_config_from_env()

    def _specific_search(self, **kwargs) -> SearchResult | list:
        """
        Conducts a search for products using the specified OData arguments.

        This private method interfaces with the EODAG client's search functionality
        to retrieve products that match the given search parameters. It handles
        special cases such as `PublicationDate` and session ID lists while enforcing
        pagination constraints as per provider limitations.

        Args:
            **kwargs: Arbitrary keyword arguments specifying search parameters,
                including all queryables defined in the provider's configuration as OData arguments.

        Returns:
            Union[SearchResult, List]: A `SearchResult` object containing the matched products
            or an empty list if no matches are found.

        Raises:
            HTTPException: If a validation error occurs in the search query.
            SearchProductFailed: If the search request fails due to request errors,
                misconfiguration, or authentication issues.
            ValueError: If authentication with EODAG fails.

        Notes:
            - Ensures compliance with provider-specific constraints, such as pagination limits.
            - Logs encountered errors and provides detailed messages in case of failures.
        """

        mapped_search_args: dict[str, str | None] = {}
        if session_id := kwargs.pop("SessionId", None):
            # Map session_id to the appropriate eodag parameter
            session_id = session_id[0] if len(session_id) == 1 else session_id
            key = "SessionIds" if isinstance(session_id, list) else "SessionId"
            value = ", ".join(f"'{s}'" for s in session_id) if isinstance(session_id, list) else f"'{session_id}'"
            mapped_search_args[key] = value

        if kwargs.pop("sessions_search", False):
            # If request is for session search, handle platform - if any provided.
            platform = kwargs.pop("Satellite", None)

            if platform:
                key = "platforms" if isinstance(platform, list) else "platform"
                value = ", ".join(f"'{p}'" for p in platform) if isinstance(platform, list) else f"'{platform}'"
                mapped_search_args[key] = value

        if date_time := kwargs.pop("PublicationDate", False):
            # Since now both for files and sessions, time interval is optional, map it if provided.
            fixed, start, end = (str(date) if date else None for date in date_time)
            mapped_search_args.update(
                {
                    "PublicationDate": fixed,
                    "StartPublicationDate": start,
                    "StopPublicationDate": end,
                },
            )
        max_items_allowed = int(self.client.providers_config[self.provider].search.pagination["max_items_per_page"])
        if int(kwargs["items_per_page"]) > max_items_allowed:
            logger.warning(
                f"Requesting {kwargs['items_per_page']} exceeds maximum of {max_items_allowed} "
                "allowed for this provider!",
            )
            logger.warning(f"Number of items per page was set to {max_items_allowed - 1}.")
            kwargs["items_per_page"] = max_items_allowed - 1
        try:
            logger.info(f"Searching from {self.provider} with parameters {mapped_search_args}")
            # Start search -> user defined search params in mapped_search_args (id), pagination in kwargs (top, limit).
            # search_method = self.client.search if "session" not in self.provider else self.client.search_iter_page
            products = self.client.search(
                **mapped_search_args,  # type: ignore
                provider=self.provider,
                raise_errors=True,
                productType="S1_SAR_RAW" if "adgs" not in self.provider.lower() else "CAMS_GRF_AUX",
                **kwargs,
            )
            repr(products)  # trigger eodag validation.

        except ValidationError as exc:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=exc.message) from exc
        except (RequestError, MisconfiguredError) as e:
            # invalid token: EODAG returns an exception with "FORBIDDEN" in e.args when the token key is invalid.
            if e.args and "FORBIDDEN" in e.args[0]:
                raise SearchProductFailed(
                    f"Can't search provider {self.provider} " "because the used token is not valid",
                ) from e
            logger.debug(e)
            raise SearchProductFailed(e) from e
        except AuthenticationError as exc:
            raise ValueError("EoDAG could not authenticate") from exc

        if products.number_matched:
            logger.info(f"Returned {products.number_matched} session from {self.provider}")

        if products.errors:
            logger.error(f"Errors from {self.provider}: {products.errors}")
        return products

    def download(self, product_id: str, to_file: Path) -> None:
        """Download the expected product at the given local location.

        EODAG needs an EOProduct to download.
        We build an EOProduct from the id and download location
        to be able to call EODAG for download.


        Args:
            product_id: the id of the product to download
            to_file: the path where the product has to be download

        Returns:
            None

        """
        # Dirty fix for eodag: change extension
        org_file = to_file
        to_file = to_file.with_suffix(to_file.suffix + "_fix_eodag")

        # Use thread-lock because self.client.download is not thread-safe
        with self.client.lock:
            product = self.create_eodag_product(product_id, to_file.name)
            # download_plugin = self.client._plugins_manager.get_download_plugin(product)
            # authent_plugin = self.client._plugins_manager.get_auth_plugin(product.provider)
            # product.register_downloader(download_plugin, authent_plugin)
            self.client.download(product, output_dir=str(to_file.parent))

            # Dirty fix continued: rename the download directory
            if to_file.is_dir() and (not org_file.is_dir()):
                to_file.rename(org_file)

    def create_eodag_product(self, product_id: str, filename: str):
        """Initialize an EO product with minimal properties.

        The title is used by EODAG as the name of the downloaded file.
        The download link is used by EODAG as http request url for download.
        The geometry is mandatory in an EO Product so we add the whole earth as geometry.

        Args:
            product_id (str): the id of EO Product
            filename (str): the name of the downloaded file

        Returns:
            product (EOProduct): the initialized EO Product

        """
        try:
            with open(self.config_file, encoding="utf-8") as f:
                base_uri = yaml.safe_load(f)[self.provider.lower()]["download"]["base_uri"]
            return EOProduct(
                self.provider,
                {
                    "id": product_id,
                    "title": filename,
                    "geometry": "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))",
                    # TODO build from configuration (but how ?)
                    "downloadLink": f"{base_uri}({product_id})/$value",
                },
            )
        except Exception as e:
            raise CreateProviderFailed(f"Can't initialize {self.provider} download provider") from e

__init__(config_file, provider)

Create an EODAG provider.

Parameters:

    config_file (Path): the path to the eodag configuration file. Required.
    provider (str): the name of the eodag provider. Required.
Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
def __init__(self, config_file: Path, provider: str):  # type: ignore
    """Create a EODAG provider.

    Args:
        config_file: the path to the eodag configuration file
        provider: the name of the eodag provider
    """
    self.provider: str = provider
    self.config_file = config_file.resolve().as_posix()
    try:
        with global_lock:  # use a thread lock before calling the lru_cache
            self.client = CustomEODataAccessGateway.create(self.config_file)
    except Exception as e:
        raise CreateProviderFailed(f"Can't initialize {self.provider} provider") from e
    self.client.set_preferred_provider(self.provider)

    # If the eodag object was already existing and retrieved from the lru_cache,
    # we need to update its configuration from the latest env vars, if they have changed
    self.client.override_config_from_env()
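
As a usage illustration only: the snippet below builds a provider from a local eodag configuration file. The class name EodagProvider, the import paths, the configuration path and the station name "cadip" are assumptions inferred from this page, not verbatim rs-server code.

from pathlib import Path

# Assumed class name and import paths, inferred from the documented source locations.
from rs_server_common.data_retrieval.eodag_provider import EodagProvider
from rs_server_common.data_retrieval.provider import CreateProviderFailed

try:
    # Hypothetical configuration file and station name, for illustration only.
    provider = EodagProvider(Path("config/eodag.yml"), "cadip")
except CreateProviderFailed as exc:
    print(f"Could not initialize the provider: {exc}")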

create_eodag_product(product_id, filename)

Initialize an EO product with minimal properties.

The title is used by EODAG as the name of the downloaded file. The download link is used by EODAG as the HTTP request URL for the download. The geometry is mandatory in an EO Product, so we add the whole earth as geometry.

Parameters:

    product_id (str): the id of EO Product. Required.
    filename (str): the name of the downloaded file. Required.

Returns:

    product (EOProduct): the initialized EO Product.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
def create_eodag_product(self, product_id: str, filename: str):
    """Initialize an EO product with minimal properties.

    The title is used by EODAG as the name of the downloaded file.
    The download link is used by EODAG as http request url for download.
    The geometry is mandatory in an EO Product so we add the whole earth as geometry.

    Args:
        product_id (str): the id of EO Product
        filename (str): the name of the downloaded file

    Returns:
        product (EOProduct): the initialized EO Product

    """
    try:
        with open(self.config_file, encoding="utf-8") as f:
            base_uri = yaml.safe_load(f)[self.provider.lower()]["download"]["base_uri"]
        return EOProduct(
            self.provider,
            {
                "id": product_id,
                "title": filename,
                "geometry": "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))",
                # TODO build from configuration (but how ?)
                "downloadLink": f"{base_uri}({product_id})/$value",
            },
        )
    except Exception as e:
        raise CreateProviderFailed(f"Can't initialize {self.provider} download provider") from e

download(product_id, to_file)

Download the expected product at the given local location.

EODAG needs an EOProduct to download. We build an EOProduct from the id and download location to be able to call EODAG for download.

Parameters:

    product_id (str): the id of the product to download. Required.
    to_file (Path): the path where the product has to be downloaded. Required.

Returns:

    None

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
def download(self, product_id: str, to_file: Path) -> None:
    """Download the expected product at the given local location.

    EODAG needs an EOProduct to download.
    We build an EOProduct from the id and download location
    to be able to call EODAG for download.


    Args:
        product_id: the id of the product to download
        to_file: the path where the product has to be downloaded

    Returns:
        None

    """
    # Dirty fix for eodag: change extension
    org_file = to_file
    to_file = to_file.with_suffix(to_file.suffix + "_fix_eodag")

    # Use thread-lock because self.client.download is not thread-safe
    with self.client.lock:
        product = self.create_eodag_product(product_id, to_file.name)
        # download_plugin = self.client._plugins_manager.get_download_plugin(product)
        # authent_plugin = self.client._plugins_manager.get_auth_plugin(product.provider)
        # product.register_downloader(download_plugin, authent_plugin)
        self.client.download(product, output_dir=str(to_file.parent))

        # Dirty fix continued: rename the download directory
        if to_file.is_dir() and (not org_file.is_dir()):
            to_file.rename(org_file)
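
A minimal call sketch, reusing the provider object from the construction example above; the product id and target path are hypothetical:

from pathlib import Path

provider.download("some-product-id", Path("/tmp/downloads/some-file.raw"))
# Internally, EODAG writes to "/tmp/downloads/some-file.raw_fix_eodag"; if that
# download produced a directory, it is renamed back to the requested path.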

Provider mechanism.

CreateProviderFailed

Bases: Exception

Exception raised when an error occurred during the init of a provider.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
class CreateProviderFailed(Exception):
    """Exception raised when an error occurred during the init of a provider."""

DownloadProductFailed

Bases: Exception

Exception raised when an error occurred during the download.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
class DownloadProductFailed(Exception):
    """Exception raised when an error occurred during the download."""

Product dataclass

A product.

A product has an external identifier and a dictionary of metadata.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
@dataclass
class Product:
    """A product.

    A product has an external identifier and a dictionary of metadata.
    """

    id_: str
    metadata: dict[str, str]

Provider

Bases: ABC

A product provider.

A provider gives a common interface to search for files from an external data source and download them locally.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
class Provider(ABC):
    """A product provider.

    A provider gives a common interface to search for files from an external data source
    and download them locally.
    """

    def search(self, **kwargs) -> Any:
        """Search for products with the given time range.

        The search result is a dictionary of products found indexed by id.

        Returns:
            The files found indexed by file id. Specific to each provider.

        """
        time = kwargs.get("datetime", [None, None])
        between = TimeRange(start, stop) if (start := time[1]) and (stop := time[2]) else None
        if between:
            if between.duration() == timedelta(0):
                return []
            if between.duration() < timedelta(0):
                raise SearchProductFailed(f"Search timerange is inverted : ({between.start} -> {between.end})")
        return self._specific_search(**kwargs)

    @abstractmethod
    def _specific_search(self, **kwargs) -> Any:
        """Search for products with the given query parameters.

        Specific search for products after common verification.

        Returns:
            the files found indexed by file id.

        """

    @abstractmethod
    def download(self, product_id: str, to_file: Path) -> None:
        """Download the given product to the given local path.

        Args:
            product_id: id of the product to download
            to_file: path where the file should be downloaded

        Returns:
            None

        """

download(product_id, to_file) abstractmethod

Download the given product to the given local path.

Parameters:

    product_id (str): id of the product to download. Required.
    to_file (Path): path where the file should be downloaded. Required.

Returns:

    None

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
@abstractmethod
def download(self, product_id: str, to_file: Path) -> None:
    """Download the given product to the given local path.

    Args:
        product_id: id of the product to download
        to_file: path where the file should be downloaded

    Returns:
        None

    """

search(**kwargs)

Search for products with the given time range.

The search result is a dictionary of products found indexed by id.

Returns:

    Any: The files found indexed by file id. Specific to each provider.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
def search(self, **kwargs) -> Any:
    """Search for products with the given time range.

    The search result is a dictionary of products found indexed by id.

    Returns:
        The files found indexed by file id. Specific to each provider.

    """
    time = kwargs.get("datetime", [None, None])
    between = TimeRange(start, stop) if (start := time[1]) and (stop := time[2]) else None
    if between:
        if between.duration() == timedelta(0):
            return []
        if between.duration() < timedelta(0):
            raise SearchProductFailed(f"Search timerange is inverted : ({between.start} -> {between.end})")
    return self._specific_search(**kwargs)

SearchProductFailed

Bases: Exception

Exception raised when an error occurred during the search.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
class SearchProductFailed(Exception):
    """Exception raised when an error occurred during the search."""

TimeRange dataclass

A time range.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
@dataclass
class TimeRange:
    """A time range."""

    start: datetime
    end: datetime

    def duration(self) -> timedelta:
        """Duration of the timerange.

        Returns: duration of the timerange
        """
        return self.end - self.start

    def __bool__(self) -> bool:
        return self.start is not None and self.end is not None

duration()

Duration of the timerange.

Returns: duration of the timerange

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
def duration(self) -> timedelta:
    """Duration of the timerange.

    Returns: duration of the timerange
    """
    return self.end - self.start
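
A quick illustration of TimeRange behaviour; the import path is an assumption inferred from the documented source location.

from datetime import datetime, timedelta

# Assumed import path.
from rs_server_common.data_retrieval.provider import TimeRange

window = TimeRange(datetime(2024, 1, 1), datetime(2024, 1, 2))
assert window.duration() == timedelta(days=1)
assert bool(window)               # both bounds are set
assert not TimeRange(None, None)  # falsy when a bound is missing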

S3 storage handler utilities.

CustomSessionRedirect

Bases: Session

Custom session to handle HTTP 307 redirects and retain Authorization headers for allowed hosts.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
class CustomSessionRedirect(requests.Session):
    """
    Custom session to handle HTTP 307 redirects and retain Authorization headers for allowed hosts.
    """

    def __init__(self, trusted_domains: list[str] | None = None):
        """
        Initialize the CustomSessionRedirect instance.

        Args:
            trusted_domains (list[str] | None): List of allowed hosts for redirection in case of a change
                of protocol (HTTP <-> HTTPS). Defaults to None.
        """
        super().__init__()
        self.trusted_domains: list[str] = trusted_domains or []  # List of allowed hosts for redirection

    def should_strip_auth(self, old_url, new_url) -> bool:
        """
        Override the default behavior of stripping Authorization headers during redirection.

        Args:
            old_url (str): The URL of the original request.
            new_url (str): The URL to which the request is redirected.

        Returns:
            bool: Whether to strip Authorization headers (False to retain them).
        """
        old_parsed = urlparse(old_url)
        new_parsed = urlparse(new_url)

        # Check if the new host is in the allowed list
        # Also, include the original domain as an implicitly allowed domain
        if new_parsed.hostname == old_parsed.hostname or new_parsed.hostname in self.trusted_domains:
            # Allow protocol changes (HTTPS -> HTTP or vice versa) within the same or trusted hosts
            return False  # Do not strip auth

        return super().should_strip_auth(old_url, new_url)

__init__(trusted_domains=None)

Initialize the CustomSessionRedirect instance.

Parameters:

    trusted_domains (list[str] | None): List of allowed hosts for redirection in case of a change of protocol (HTTP <-> HTTPS). Default: None.
Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
def __init__(self, trusted_domains: list[str] | None = None):
    """
    Initialize the CustomSessionRedirect instance.

    Args:
        trusted_domains (list[str] | None): List of allowed hosts for redirection in case of a change
            of protocol (HTTP <-> HTTPS). Defaults to None.
    """
    super().__init__()
    self.trusted_domains: list[str] = trusted_domains or []  # List of allowed hosts for redirection

should_strip_auth(old_url, new_url)

Override the default behavior of stripping Authorization headers during redirection.

Parameters:

    old_url (str): The URL of the original request. Required.
    new_url (str): The URL to which the request is redirected. Required.

Returns:

    bool: Whether to strip Authorization headers (False to retain them).

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
def should_strip_auth(self, old_url, new_url) -> bool:
    """
    Override the default behavior of stripping Authorization headers during redirection.

    Args:
        old_url (str): The URL of the original request.
        new_url (str): The URL to which the request is redirected.

    Returns:
        bool: Whether to strip Authorization headers (False to retain them).
    """
    old_parsed = urlparse(old_url)
    new_parsed = urlparse(new_url)

    # Check if the new host is in the allowed list
    # Also, include the original domain as an implicitly allowed domain
    if new_parsed.hostname == old_parsed.hostname or new_parsed.hostname in self.trusted_domains:
        # Allow protocol changes (HTTPS -> HTTP or vice versa) within the same or trusted hosts
        return False  # Do not strip auth

    return super().should_strip_auth(old_url, new_url)
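
A hedged usage sketch: the import path, hosts and credentials below are purely illustrative.

# Assumed import path, inferred from the documented source location.
from rs_server_common.s3_storage_handler.s3_storage_handler import CustomSessionRedirect

session = CustomSessionRedirect(trusted_domains=["download.example.com"])
response = session.get(
    "https://data.example.com/files/product.raw",
    auth=("user", "password"),
    stream=True,
)
# A redirect to download.example.com (even with an HTTP <-> HTTPS protocol change)
# keeps the Authorization header; redirects to any other host fall back to the
# default requests behaviour and may strip it.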

GetKeysFromS3Config dataclass

S3 configuration for download

Attributes:

    s3_files (list): A list with the S3 object keys to be downloaded.
    bucket (str): The S3 bucket name.
    local_prefix (str): The local prefix where files will be downloaded.
    overwrite (bool, optional): Flag indicating whether to overwrite existing files. Default is False.
    max_retries (int, optional): The maximum number of download retries. Default is DWN_S3FILE_RETRIES.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
@dataclass
class GetKeysFromS3Config:
    """S3 configuration for download

    Attributes:
        s3_files (list): A list with the  S3 object keys to be downloaded.
        bucket (str): The S3 bucket name.
        local_prefix (str): The local prefix where files will be downloaded.
        overwrite (bool, optional): Flag indicating whether to overwrite existing files. Default is False.
        max_retries (int, optional): The maximum number of download retries. Default is DWN_S3FILE_RETRIES.

    """

    s3_files: list
    bucket: str
    local_prefix: str
    overwrite: bool = False
    max_retries: int = DWN_S3FILE_RETRIES

PutFilesToS3Config dataclass

Configuration for uploading files to S3.

Attributes:

    files (list): A list with the local file paths to be uploaded.
    bucket (str): The S3 bucket name.
    s3_path (str): The S3 path where files will be uploaded.
    max_retries (int, optional): The maximum number of upload retries. Default is UP_S3FILE_RETRIES.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
@dataclass
class PutFilesToS3Config:
    """Configuration for uploading files to S3.

    Attributes:
        files (List): A list with the local file paths to be uploaded.
        bucket (str): The S3 bucket name.
        s3_path (str): The S3 path where files will be uploaded.
        max_retries (int, optional): The maximum number of upload retries. Default is UP_S3FILE_RETRIES.

    """

    files: list
    bucket: str
    s3_path: str
    max_retries: int = UP_S3FILE_RETRIES
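
As an illustration, the two configuration dataclasses can be built as follows; the import path, bucket names, keys and local paths are hypothetical.

# Assumed import path, inferred from the documented source location.
from rs_server_common.s3_storage_handler.s3_storage_handler import (
    GetKeysFromS3Config,
    PutFilesToS3Config,
)

# Hypothetical bucket names, keys and paths, for illustration only.
download_config = GetKeysFromS3Config(
    s3_files=["products/S1A/file_1.raw", "products/S1A/file_2.raw"],
    bucket="rs-temp-bucket",
    local_prefix="/tmp/downloads",
    overwrite=True,
)

upload_config = PutFilesToS3Config(
    files=["/tmp/outputs/report.json"],
    bucket="rs-final-bucket",
    s3_path="reports/2024",
)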

S3StorageHandler

Interacts with an S3 storage

S3StorageHandler for interacting with an S3 storage service.

Attributes:

    access_key_id (str): The access key ID for S3 authentication.
    secret_access_key (str): The secret access key for S3 authentication.
    endpoint_url (str): The endpoint URL for the S3 service.
    region_name (str): The region name.
    s3_client (boto3.client): The s3 client to interact with the s3 storage.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
class S3StorageHandler:
    """Interacts with an S3 storage

    S3StorageHandler for interacting with an S3 storage service.

    Attributes:
        access_key_id (str): The access key ID for S3 authentication.
        secret_access_key (str): The secret access key for S3 authentication.
        endpoint_url (str): The endpoint URL for the S3 service.
        region_name (str): The region name.
        s3_client (boto3.client): The s3 client to interact with the s3 storage
    """

    def __init__(self, access_key_id, secret_access_key, endpoint_url, region_name):
        """Initialize the S3StorageHandler instance.

        Args:
            access_key_id (str): The access key ID for S3 authentication.
            secret_access_key (str): The secret access key for S3 authentication.
            endpoint_url (str): The endpoint URL for the S3 service.
            region_name (str): The region name.

        Raises:
            RuntimeError: If the connection to the S3 storage cannot be established.
        """
        self.logger = Logging.default(__name__)

        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.endpoint_url = endpoint_url
        self.region_name = region_name
        self.s3_client: boto3.client = None
        self.connect_s3()
        # Suppress botocore debug messages
        logging.getLogger("botocore").setLevel(logging.INFO)
        logging.getLogger("boto3").setLevel(logging.INFO)
        logging.getLogger("urllib3").setLevel(logging.INFO)
        self.logger.debug("S3StorageHandler created !")

    def __get_s3_client(self, access_key_id, secret_access_key, endpoint_url, region_name):
        """Retrieve or create an S3 client instance.

        Args:
            access_key_id (str): The access key ID for S3 authentication.
            secret_access_key (str): The secret access key for S3 authentication.
            endpoint_url (str): The endpoint URL for the S3 service.
            region_name (str): The region name.

        Returns:
            client (boto3): An S3 client instance.
        """

        client_config = botocore.config.Config(
            max_pool_connections=100,
            # timeout for connection
            connect_timeout=5,
            # attempts in trying connection
            # note:  the default behaviour of boto3 is retrying
            # connections multiple times and exponentially backing off in between
            retries={"total_max_attempts": S3_PROTOCOL_MAX_ATTEMPTS},
        )
        try:
            return boto3.client(
                "s3",
                aws_access_key_id=access_key_id,
                aws_secret_access_key=secret_access_key,
                endpoint_url=endpoint_url,
                region_name=region_name,
                config=client_config,
            )

        except Exception as e:
            self.logger.exception(f"Client error exception: {e}")
            raise RuntimeError("Client error exception ") from e

    def connect_s3(self):
        """Establish a connection to the S3 service.

        If the S3 client is not already instantiated, this method calls the private __get_s3_client
        method to create an S3 client instance using the provided credentials and configuration (see __init__).
        """
        if self.s3_client is None:
            self.s3_client = self.__get_s3_client(
                self.access_key_id,
                self.secret_access_key,
                self.endpoint_url,
                self.region_name,
            )

    def disconnect_s3(self):
        """Close the connection to the S3 service."""
        if self.s3_client is None:
            return
        self.s3_client.close()
        self.s3_client = None

    def delete_file_from_s3(self, bucket, key, max_retries=S3_MAX_RETRIES):
        """Delete a file from S3.
        The functionality implies a retry mechanism at the application level, which is different
        than the retry mechanism from the s3 protocol level, with "retries" parameter from the s3 Config


        Args:
            bucket (str): The S3 bucket name.
            key (str): The S3 object key.

        Raises:
            RuntimeError: If an error occurs during the bucket access check.
        """
        if bucket is None or key is None:
            raise RuntimeError("Input error for deleting the file")
        attempt = 0
        while attempt < max_retries:
            try:
                self.connect_s3()
                self.logger.debug("Deleting s3 key s3://%s/%s", bucket, key)
                if not self.check_s3_key_on_bucket(bucket, key):
                    self.logger.debug("S3 key to be deleted s3://%s/%s does not exist", bucket, key)
                    return
                self.s3_client.delete_object(Bucket=bucket, Key=key)
                self.logger.info("S3 key deleted: s3://%s/%s", bucket, key)
                return
            except (botocore.client.ClientError, botocore.exceptions.BotoCoreError) as e:
                attempt += 1
                if attempt < max_retries:
                    # keep retrying
                    self.disconnect_s3()
                    self.logger.error(
                        f"Failed to delete key s3://{bucket}/{key}: {e} \nRetrying in {S3_RETRY_TIMEOUT} seconds. ",
                    )
                    self.wait_timeout(S3_RETRY_TIMEOUT)
                    continue
                self.logger.exception(f"Failed to delete key s3://{bucket}/{key}. Reason: {e}")
                raise RuntimeError(f"Failed to delete key s3://{bucket}/{key}. Reason: {e}") from e
            except RuntimeError as e:
                self.logger.exception(f"Failed to check the key s3://{bucket}/{key}. Useless to retry. Reason: {e}")
                raise RuntimeError(
                    f"Failed to check the key s3://{bucket}/{key}. Useless to retry. Reason: {e}",
                ) from e
            except Exception as e:
                self.logger.exception(f"Failed to delete key s3://{bucket}/{key}. Reason: {e}")
                raise RuntimeError(f"Failed to delete key s3://{bucket}/{key}. Reason: {e}") from e

    # helper functions

    @staticmethod
    def get_secrets_from_file(secrets, secret_file):
        """Read secrets from a specified file.

        It reads the secrets from .s3cfg or aws credentials files.
        This function should not be used in production.

        Args:
            secrets (dict): Dictionary to store retrieved secrets.
            secret_file (str): Path to the file containing secrets.
        """
        dict_filled = 0
        with open(secret_file, encoding="utf-8") as aws_credentials_file:
            lines = aws_credentials_file.readlines()
            for line in lines:
                if not secrets["s3endpoint"] and "host_bucket" in line:
                    dict_filled += 1
                    secrets["s3endpoint"] = line.strip().split("=")[1].strip()
                elif not secrets["accesskey"] and "access_key" in line:
                    dict_filled += 1
                    secrets["accesskey"] = line.strip().split("=")[1].strip()
                elif not secrets["secretkey"] and "secret_" in line and "_key" in line:
                    dict_filled += 1
                    secrets["secretkey"] = line.strip().split("=")[1].strip()
                if dict_filled == 3:
                    break

    @staticmethod
    def get_basename(input_path):
        """Get the filename from a full path.

        Args:
            input_path (str): The full path.

        Returns:
            filename (str): The filename.
        """
        path, filename = ntpath.split(input_path)
        return filename or ntpath.basename(path)

    @staticmethod
    def s3_path_parser(s3_url):
        """
        Parses S3 URL to extract bucket, prefix, and file.

        Args:
            s3_url (str): The S3 URL.

        Returns:
            (bucket, prefix, s3_file) (tuple): Tuple containing bucket, prefix, and file.
        """
        s3_data = s3_url.replace("s3://", "").split("/")
        bucket = ""
        start_idx = 0
        if s3_url.startswith("s3://"):
            bucket = s3_data[0]

            start_idx = 1
        prefix = ""
        if start_idx < len(s3_data):
            prefix = "/".join(s3_data[start_idx:-1])
        s3_file = s3_data[-1]
        return bucket, prefix, s3_file
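
    # Illustrative examples (not part of the original source):
    #   s3_path_parser("s3://my-bucket/dir/sub/file.txt") -> ("my-bucket", "dir/sub", "file.txt")
    #   s3_path_parser("dir/file.txt")                    -> ("", "dir", "file.txt")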

    def files_to_be_downloaded(self, bucket, paths):
        """Create a list with the S3 keys to be downloaded.

        The list will have the s3 keys to be downloaded from the bucket.
        It contains pairs (local_prefix_where_the_file_will_be_downloaded, full_s3_key_path)
        If a s3 key doesn't exist, the pair will be (None, requested_s3_key_path)

        Args:
            bucket (str): The S3 bucket name.
            paths (list): List of S3 object keys.

        Returns:
            list_with_files (list): List of tuples (local_prefix, full_s3_key_path).
        """
        # declaration of the list
        list_with_files: list[Any] = []
        # for each key, identify it as a file or a folder
        # in the case of a folder, the files will be recursively gathered
        for key in paths:
            path = key.strip().lstrip("/")
            s3_files = self.list_s3_files_obj(bucket, path)
            if len(s3_files) == 0:
                self.logger.warning("No key %s found.", path)
                list_with_files.append((None, path))
                continue
            self.logger.debug("total: %s | s3_files = %s", len(s3_files), s3_files)
            basename_part = self.get_basename(path)

            # check if it's a file or a dir
            if len(s3_files) == 1 and path == s3_files[0]:
                # the current key is a file, append it to the list
                list_with_files.append(("", s3_files[0]))
                self.logger.debug("Append files: list_with_files = %s", list_with_files)
            else:
                # the current key is a folder, append all its files (recursively gathered) to the list
                for s3_file in s3_files:
                    split = s3_file.split("/")
                    split_idx = split.index(basename_part)
                    list_with_files.append((os.path.join(*split[split_idx:-1]), s3_file.strip("/")))

        return list_with_files

    def files_to_be_uploaded(self, paths):
        """Creates a list with the local files to be uploaded.

        The list contains pairs (s3_path, absolute_local_file_path)
        If the local file doesn't exist, the pair will be (None, requested_file_to_upload)

        Args:
            paths (list): List of local file paths.

        Returns:
            list_with_files (list): List of tuples (s3_path, absolute_local_file_path).
        """

        list_with_files = []
        for local in paths:
            path = local.strip()
            # check if it is a file
            self.logger.debug("path = %s", path)
            if os.path.isfile(path):
                self.logger.debug("Add %s", path)
                list_with_files.append(("", path))

            elif os.path.isdir(path):
                for root, dir_names, filenames in os.walk(path):
                    for file in filenames:
                        full_file_path = os.path.join(root, file.strip("/"))
                        self.logger.debug("full_file_path = %s | dir_names = %s", full_file_path, dir_names)
                        if not os.path.isfile(full_file_path):
                            continue
                        self.logger.debug(
                            "get_basename(path) = %s | root = %s | replace = %s",
                            self.get_basename(path),
                            root,
                            root.replace(path, ""),
                        )

                        keep_path = os.path.join(self.get_basename(path), root.replace(path, "").strip("/")).strip("/")
                        self.logger.debug("path = %s | keep_path = %s | root = %s", path, keep_path, root)

                        self.logger.debug("Add: %s | %s", keep_path, full_file_path)
                        list_with_files.append((keep_path, full_file_path))
            else:
                self.logger.warning("The path %s is not a directory nor a file, it will not be uploaded", path)

        return list_with_files

    def list_s3_files_obj(self, bucket, prefix):
        """Retrieve the content of an S3 directory.

        Args:
            bucket (str): The S3 bucket name.
            prefix (str): The S3 object key prefix.

        Returns:
            s3_files (list): List containing S3 object keys.
        """

        s3_files = []

        try:
            paginator: Any = self.s3_client.get_paginator("list_objects_v2")
            pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
            for page in pages:
                for item in page.get("Contents", ()):
                    if item is not None:
                        s3_files.append(item["Key"])
        except Exception as error:
            self.logger.exception(f"Exception when trying to list files from s3://{bucket}/{prefix}: {error}")
            raise RuntimeError(f"Listing files from s3://{bucket}/{prefix}") from error

        return s3_files

    def check_bucket_access(self, bucket):
        """Check the accessibility of an S3 bucket.

        Args:
            bucket (str): The S3 bucket name.

        Raises:
            RuntimeError: If an error occurs during the bucket access check.
        """

        try:
            self.connect_s3()
            self.s3_client.head_bucket(Bucket=bucket)
        except botocore.client.ClientError as error:
            # check that it was a 404 vs 403 errors
            # If it was a 404 error, then the bucket does not exist.
            error_code = error.response["Error"]["Code"]
            if error_code == S3_ERR_FORBIDDEN_ACCESS:
                self.logger.exception(f"{bucket} is a private bucket. Forbidden access!")
                raise RuntimeError(f"{bucket} is a private bucket. Forbidden access!") from error
            if error_code == S3_ERR_NOT_FOUND:
                self.logger.exception(f"{bucket} bucket does not exist!")
                raise RuntimeError(f"{bucket} bucket does not exist!") from error
            self.logger.exception(f"Exception when checking the access to {bucket} bucket: {error}")
            raise RuntimeError(f"Exception when checking the access to {bucket} bucket") from error
        except botocore.exceptions.EndpointConnectionError as error:
            self.logger.exception(f"Failed to connect to the endpoint when trying to access {bucket}: {error}")
            raise RuntimeError(f"Failed to connect to the endpoint when trying to access {bucket}!") from error
        except Exception as error:
            self.logger.exception(f"General exception when trying to access bucket {bucket}: {error}")
            raise RuntimeError(f"General exception when trying to access bucket {bucket}") from error

    def check_s3_key_on_bucket(self, bucket, s3_key):
        """Check if the s3 key available in the bucket.

        Args:
            bucket (str): The S3 bucket name.
            s3_key (str): The s3 key that should be checked

        Raises:
            RuntimeError: If an error occurs during the bucket access check or if
                the s3_key is not available.
        """
        try:
            self.connect_s3()
            self.logger.debug(f"Checking for the presence of the s3 key s3://{bucket}/{s3_key}")
            self.s3_client.head_object(Bucket=bucket, Key=s3_key)
        except botocore.client.ClientError as error:
            # check that it was a 404 vs 403 errors
            # If it was a 404 error, then the bucket does not exist.
            error_code = error.response["Error"]["Code"]
            if error_code == S3_ERR_FORBIDDEN_ACCESS:
                self.logger.exception(f"{bucket} is a private bucket. Forbidden access!")
                raise RuntimeError(f"{bucket} is a private bucket. Forbidden access!") from error
            if error_code == S3_ERR_NOT_FOUND:
                self.logger.exception(f"The key s3://{bucket}/{s3_key} does not exist!")
                return False
            self.logger.exception(f"Exception when checking the access to key s3://{bucket}/{s3_key}: {error}")
            raise RuntimeError(f"Exception when checking the access to {bucket} bucket") from error
        except (
            botocore.exceptions.EndpointConnectionError,
            botocore.exceptions.NoCredentialsError,
            botocore.exceptions.PartialCredentialsError,
        ) as error:
            self.logger.exception(f"Failed to connect to the endpoint when trying to access {bucket}: {error}")
            raise RuntimeError(f"Failed to connect to the endpoint when trying to access {bucket}!") from error
        except Exception as error:
            self.logger.exception(f"General exception when trying to access bucket {bucket}: {error}")
            raise RuntimeError(f"General exception when trying to access bucket {bucket}") from error

        return True

    def wait_timeout(self, timeout):
        """
        Wait for a specified timeout duration (minimum 200 ms).

        This function implements a simple timeout mechanism, where it sleeps for 0.2 seconds
        in each iteration until the cumulative sleep time reaches the specified timeout duration.

        Args:
            timeout (float): The total duration to wait in seconds.

        """
        time_cnt = 0.0
        while time_cnt < timeout:
            time.sleep(SLEEP_TIME)
            time_cnt += SLEEP_TIME

    def check_file_overwriting(self, local_file, overwrite):
        """Check if file exists and determine if it should be overwritten.

        Args:
            local_file (str): Path to the local file.
            overwrite (bool): Whether to overwrite the existing file.

        Returns:
            bool (bool): True if the file should be overwritten, False otherwise.

        Note:
        - If the file already exists and the overwrite flag is set to True, the function logs a message,
        deletes the existing file, and returns True.
        - If the file already exists and the overwrite flag is set to False, the function logs a warning
        message, and returns False. In this case, the existing file won't be deleted.
        - If the file doesn't exist, the function returns True.

        """
        if os.path.isfile(local_file):
            if overwrite:  # The file already exists, so delete it first
                self.logger.info(
                    "File %s already exists. Deleting it before downloading",
                    S3StorageHandler.get_basename(local_file),
                )
                os.remove(local_file)
            else:
                self.logger.warning(
                    "File %s already exists. Ignoring (use the overwrite flag if you want to overwrite this file)",
                    S3StorageHandler.get_basename(local_file),
                )
                return False

        return True

    def get_keys_from_s3(self, config: GetKeysFromS3Config) -> list:
        """Download S3 keys specified in the configuration.

        Args:
            config (GetKeysFromS3Config): Configuration for the S3 download.

        Returns:
            List[str]: A list with the S3 keys that couldn't be downloaded.

        Raises:
            Exception: Any unexpected exception raised during the download process.

        The function attempts to download files from S3 according to the provided configuration.
        It returns a list of S3 keys that couldn't be downloaded successfully.

        """

        # check the access to the bucket first, or even if it does exist
        self.check_bucket_access(config.bucket)

        # collection_files: list of files to be downloaded
        #                   the list contains pair objects with the following
        #                   syntax: (local_path_to_be_added_to_the_local_prefix, s3_key)
        #                   the local_path_to_be_added_to_the_local_prefix may be none if the file doesn't exist
        collection_files = self.files_to_be_downloaded(config.bucket, config.s3_files)

        self.logger.debug("collection_files = %s | bucket = %s", collection_files, config.bucket)
        failed_files = []

        for collection_file in collection_files:
            if collection_file[0] is None:
                failed_files.append(collection_file[1])
                continue

            local_path = os.path.join(config.local_prefix, collection_file[0].strip("/"))
            s3_file = collection_file[1]
            # for each file to download, create the local dir (if it does not exist)
            os.makedirs(local_path, exist_ok=True)
            # create the path for local file
            local_file = os.path.join(local_path, self.get_basename(s3_file).strip("/"))

            if not self.check_file_overwriting(local_file, config.overwrite):
                continue
            # download the files
            downloaded = False
            for keep_trying in range(config.max_retries):
                try:
                    self.connect_s3()
                    dwn_start = datetime.now()
                    self.s3_client.download_file(config.bucket, s3_file, local_file)
                    self.logger.debug(
                        "s3://%s/%s downloaded to %s in %s ms",
                        config.bucket,
                        s3_file,
                        local_file,
                        datetime.now() - dwn_start,
                    )
                    downloaded = True
                    break
                except (botocore.client.ClientError, botocore.exceptions.EndpointConnectionError) as error:
                    self.logger.exception(
                        "Error when downloading the file %s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        s3_file,
                        error,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when downloading the file %s. \
Failed to get the s3 client. Retrying in %s seconds for %s more times",
                        s3_file,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)

            if not downloaded:
                self.logger.error(
                    "Failed to download the file %s. The download was \
retried for %s times. Aborting",
                    s3_file,
                    config.max_retries,
                )
                failed_files.append(s3_file)

        return failed_files

    def put_files_to_s3(self, config: PutFilesToS3Config) -> list:
        """Upload files to S3 according to the provided configuration.

        Args:
            config (PutFilesToS3Config): Configuration for the S3 upload.

        Returns:
            List[str]: A list with the local file paths that couldn't be uploaded.

        Raises:
            Exception: Any unexpected exception raised during the upload process.

        The function attempts to upload files to S3 according to the provided configuration.
        It returns a list of local files that couldn't be uploaded successfully.

        """

        # check the access to the bucket first, or even if it does exist
        self.check_bucket_access(config.bucket)

        collection_files = self.files_to_be_uploaded(config.files)
        failed_files = []

        for collection_file in collection_files:
            if collection_file[0] is None:
                self.logger.error("The file %s can't be uploaded, its s3 prefix is None", collection_file[0])
                failed_files.append(collection_file[1])
                continue

            file_to_be_uploaded = collection_file[1]
            # create the s3 key
            key = os.path.join(config.s3_path, collection_file[0], os.path.basename(file_to_be_uploaded).strip("/"))
            uploaded = False
            for keep_trying in range(config.max_retries):
                try:
                    # get the s3 client
                    self.connect_s3()
                    self.logger.info(
                        "Upload file %s to s3://%s/%s",
                        file_to_be_uploaded,
                        config.bucket,
                        key.lstrip("/"),
                    )

                    self.s3_client.upload_file(file_to_be_uploaded, config.bucket, key)
                    uploaded = True
                    break
                except (
                    botocore.client.ClientError,
                    botocore.exceptions.EndpointConnectionError,
                    boto3.exceptions.S3UploadFailedError,
                ) as error:
                    self.logger.exception(
                        "Error when uploading the file %s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        file_to_be_uploaded,
                        error,
                        UP_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(UP_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when uploading the file %s. \
Failed to get the s3 client. Retrying in %s seconds for %s more times",
                        file_to_be_uploaded,
                        UP_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(UP_S3FILE_RETRY_TIMEOUT)

            if not uploaded:
                self.logger.error(
                    "Failed to upload the file %s. The upload was \
retried for %s times. Aborting",
                    file_to_be_uploaded,
                    config.max_retries,
                )
                failed_files.append(file_to_be_uploaded)

        return failed_files

    def transfer_from_s3_to_s3(self, config: TransferFromS3ToS3Config) -> list:
        """Copy S3 keys specified in the configuration.
        Args:
            config (TransferFromS3ToS3Config): Configuration object containing bucket source, bucket destination,
                      S3 files, maximum retries.

        Returns:
            list: A list of S3 keys that failed to be copied.

        Raises:
            Exception: Any unexpected exception raised during the upload process.
        """
        # check the access to both buckets first, or even if they do exist
        self.check_bucket_access(config.bucket_src)
        self.check_bucket_access(config.bucket_dst)

        # collection_files: list of files to be downloaded
        #                   the list contains pair objects with the following
        #                   syntax: (local_path_to_be_added_to_the_local_prefix, s3_key)

        collection_files = self.files_to_be_downloaded(config.bucket_src, config.s3_files)

        self.logger.debug("collection_files = %s | bucket = %s", collection_files, config.bucket_src)
        failed_files = []
        copy_src = {"Bucket": config.bucket_src, "Key": ""}

        for collection_file in collection_files:
            if collection_file[0] is None:
                failed_files.append(collection_file[1])
                continue

            copied = False
            for keep_trying in range(config.max_retries):
                self.logger.debug(
                    "keep_trying %s | range(config.max_retries) %s ",
                    keep_trying,
                    range(config.max_retries),
                )
                try:
                    self.connect_s3()
                    dwn_start = datetime.now()
                    copy_src["Key"] = collection_file[1]
                    self.logger.debug("copy_src = %s", copy_src)
                    self.s3_client.copy_object(CopySource=copy_src, Bucket=config.bucket_dst, Key=collection_file[1])
                    self.logger.debug(
                        "s3://%s/%s copied to s3://%s/%s in %s ms",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        collection_file[1],
                        datetime.now() - dwn_start,
                    )
                    if not config.copy_only:
                        self.delete_file_from_s3(config.bucket_src, collection_file[1])
                    copied = True
                    break
                except (botocore.client.ClientError, botocore.exceptions.EndpointConnectionError) as error:
                    self.logger.exception(
                        "Error when copying the file s3://%s/%s to s3://%s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        error,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when copying the file s3://%s/%s to s3://%s. \
Failed to get the s3 client. Retrying in %s seconds for %s more times",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)

            if not copied:
                self.logger.error(
                    "Failed to copy the file s3://%s/%s to s3://%s. The copy was \
retried for %s times. Aborting",
                    config.bucket_src,
                    collection_file[1],
                    config.bucket_dst,
                    config.max_retries,
                )
                failed_files.append(collection_file[1])

        return failed_files

    def s3_streaming_upload(  # pylint: disable=too-many-locals
        self,
        stream_url: str,
        trusted_domains: list[str],
        auth: Any,
        bucket: str,
        key: str,
        max_retries=S3_MAX_RETRIES,
    ):
        """
        Upload a file to an S3 bucket using HTTP byte-streaming with retries.

        This method retrieves data from `stream_url` in chunks and uploads it to the specified S3 bucket
        (`bucket`) under the specified key (`key`). It includes retry logic for network and S3 client errors,
        with exponential backoff between retries. The method handles errors during both the HTTP request and the
        S3 upload process, raising a `RuntimeError` if the retries are exhausted without success.

        Args:
            stream_url (str): The URL of the file to be streamed and uploaded.
            trusted_domains (list): List of allowed hosts for redirection in case of change of protocol (HTTP <> HTTPS).
            auth (Any): Authentication credentials for the HTTP request (if required).
            bucket (str): The name of the target S3 bucket.
            key (str): The S3 object key (file path) to store the streamed file.
            max_retries (int, optional): The maximum number of retry attempts if an error occurs
                (default is `S3_MAX_RETRIES`).

        Raises:
            RuntimeError: If there is a failure during the streaming upload process, either due to the HTTP request
                or the S3 upload, after exhausting all retries.

        Process:
            1. The function attempts to download the file from `stream_url` using streaming and upload it to S3.
            2. It redirects the url request by overriding the default should_strip_auth, see CustomSessionRedirect
            3. If an error occurs (e.g., connection error, S3 client error), it retries the operation with exponential
                backoff.
            4. The default chunk size for streaming is set to 64KB, and multipart upload configuration is used for
            large files.
            5. After `max_retries` attempts, if the upload is unsuccessful, a `RuntimeError` is raised.

        Retry Mechanism:
            - Retries occur for network-related errors (`RequestException`) or S3 client errors
                (`ClientError`, `BotoCoreError`).
            - The function waits before retrying, with the delay time increasing exponentially
                (based on the `backoff_factor`).
            - The backoff formula is `backoff_factor * (2 ** (attempt - 1))`, allowing progressively
                longer wait times between retries.

        Exception Handling:
            - HTTP errors such as timeouts or bad responses (4xx, 5xx) are handled using
                `requests.exceptions.RequestException`.
            - S3 client errors such as `ClientError` and `BotoCoreError` are captured, logged, and retried.
            - Any other unexpected errors are caught and re-raised as `RuntimeError`.
        """
        if bucket is None or key is None:
            raise RuntimeError(f"Input error for streaming the file from {stream_url} to s3://{bucket}/{key}")
        timeout: tuple[int, int] = (HTTP_CONNECTION_TIMEOUT, HTTP_READ_TIMEOUT)
        backoff_factor = S3_RETRY_TIMEOUT
        attempt = 0
        # Prepare the request
        session = CustomSessionRedirect(trusted_domains)
        self.logger.debug(f"trusted_domains = {trusted_domains}")
        request = requests.Request(
            method="GET",
            url=stream_url,
            auth=auth,
        )
        prepared_request = session.prepare_request(request)
        while attempt < max_retries:
            try:
                self.connect_s3()
                self.logger.info(f"Starting the streaming of {stream_url} to s3://{bucket}/{key}")
                with session.send(prepared_request, stream=True, timeout=timeout) as response:
                    # with requests.get(stream_url, stream=True, auth=auth, timeout=timeout) as response:
                    self.logger.debug(f"Request headers: {response.request.headers}")
                    response.raise_for_status()  # Raise an error for bad responses (4xx and 5xx)

                    # Default chunksize is set to 64Kb, can be manually increased
                    chunk_size = 64 * 1024  # 64kb
                    with response.raw as data_stream:
                        self.s3_client.upload_fileobj(
                            data_stream,
                            bucket,
                            key,
                            Config=boto3.s3.transfer.TransferConfig(multipart_threshold=chunk_size * 2),
                        )
                    self.logger.info(f"Successfully uploaded to s3://{bucket}/{key}")
                    return
            except (
                requests.exceptions.RequestException,
                botocore.client.ClientError,
                botocore.exceptions.BotoCoreError,
            ) as e:
                attempt += 1
                if attempt < max_retries:
                    # keep retrying
                    self.disconnect_s3()
                    delay = backoff_factor * (2 ** (attempt - 1))
                    self.logger.error(
                        f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}"
                        f" Retrying in {delay} seconds. ",
                    )
                    self.wait_timeout(S3_RETRY_TIMEOUT)
                    continue
                self.logger.exception(
                    f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}."
                    f"\nTried for {max_retries} times, giving up",
                )
                raise RuntimeError(f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}.") from e
            except Exception as e:
                self.logger.exception(
                    "General exception. " f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}",
                )
                raise RuntimeError(
                    "General exception. " f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}",
                ) from e

__get_s3_client(access_key_id, secret_access_key, endpoint_url, region_name)

Retrieve or create an S3 client instance.

Parameters:

Name Type Description Default
access_key_id str

The access key ID for S3 authentication.

required
secret_access_key str

The secret access key for S3 authentication.

required
endpoint_url str

The endpoint URL for the S3 service.

required
region_name str

The region name.

required

Returns:

Name Type Description
client boto3

An S3 client instance.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 186-220
def __get_s3_client(self, access_key_id, secret_access_key, endpoint_url, region_name):
    """Retrieve or create an S3 client instance.

    Args:
        access_key_id (str): The access key ID for S3 authentication.
        secret_access_key (str): The secret access key for S3 authentication.
        endpoint_url (str): The endpoint URL for the S3 service.
        region_name (str): The region name.

    Returns:
        client (boto3): An S3 client instance.
    """

    client_config = botocore.config.Config(
        max_pool_connections=100,
        # timeout for connection
        connect_timeout=5,
        # attempts in trying connection
        # note:  the default behaviour of boto3 is retrying
        # connections multiple times and exponentially backing off in between
        retries={"total_max_attempts": S3_PROTOCOL_MAX_ATTEMPTS},
    )
    try:
        return boto3.client(
            "s3",
            aws_access_key_id=access_key_id,
            aws_secret_access_key=secret_access_key,
            endpoint_url=endpoint_url,
            region_name=region_name,
            config=client_config,
        )

    except Exception as e:
        self.logger.exception(f"Client error exception: {e}")
        raise RuntimeError("Client error exception ") from e

__init__(access_key_id, secret_access_key, endpoint_url, region_name)

Initialize the S3StorageHandler instance.

Parameters:

Name Type Description Default
access_key_id str

The access key ID for S3 authentication.

required
secret_access_key str

The secret access key for S3 authentication.

required
endpoint_url str

The endpoint URL for the S3 service.

required
region_name str

The region name.

required

Raises:

Type Description
RuntimeError

If the connection to the S3 storage cannot be established.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 160-184
def __init__(self, access_key_id, secret_access_key, endpoint_url, region_name):
    """Initialize the S3StorageHandler instance.

    Args:
        access_key_id (str): The access key ID for S3 authentication.
        secret_access_key (str): The secret access key for S3 authentication.
        endpoint_url (str): The endpoint URL for the S3 service.
        region_name (str): The region name.

    Raises:
        RuntimeError: If the connection to the S3 storage cannot be established.
    """
    self.logger = Logging.default(__name__)

    self.access_key_id = access_key_id
    self.secret_access_key = secret_access_key
    self.endpoint_url = endpoint_url
    self.region_name = region_name
    self.s3_client: boto3.client = None
    self.connect_s3()
    # Suppress botocore debug messages
    logging.getLogger("botocore").setLevel(logging.INFO)
    logging.getLogger("boto3").setLevel(logging.INFO)
    logging.getLogger("urllib3").setLevel(logging.INFO)
    self.logger.debug("S3StorageHandler created !")
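
A minimal usage sketch, assuming the class can be imported as rs_server_common.s3_storage_handler.s3_storage_handler.S3StorageHandler; the credentials, endpoint and region below are placeholders:

from rs_server_common.s3_storage_handler.s3_storage_handler import S3StorageHandler

# The constructor connects to the S3 service right away and raises
# RuntimeError if the client cannot be created.
handler = S3StorageHandler(
    access_key_id="EXAMPLE_ACCESS_KEY",        # placeholder
    secret_access_key="EXAMPLE_SECRET_KEY",    # placeholder
    endpoint_url="https://s3.example.com",     # placeholder endpoint
    region_name="eu-west-1",                   # placeholder region
)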

check_bucket_access(bucket)

Check the accessibility of an S3 bucket.

Parameters:

Name Type Description Default
bucket str

The S3 bucket name.

required

Raises:

Type Description
RuntimeError

If an error occurs during the bucket access check.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 470-500
def check_bucket_access(self, bucket):
    """Check the accessibility of an S3 bucket.

    Args:
        bucket (str): The S3 bucket name.

    Raises:
        RuntimeError: If an error occurs during the bucket access check.
    """

    try:
        self.connect_s3()
        self.s3_client.head_bucket(Bucket=bucket)
    except botocore.client.ClientError as error:
        # check that it was a 404 vs 403 errors
        # If it was a 404 error, then the bucket does not exist.
        error_code = error.response["Error"]["Code"]
        if error_code == S3_ERR_FORBIDDEN_ACCESS:
            self.logger.exception(f"{bucket} is a private bucket. Forbidden access!")
            raise RuntimeError(f"{bucket} is a private bucket. Forbidden access!") from error
        if error_code == S3_ERR_NOT_FOUND:
            self.logger.exception(f"{bucket} bucket does not exist!")
            raise RuntimeError(f"{bucket} bucket does not exist!") from error
        self.logger.exception(f"Exception when checking the access to {bucket} bucket: {error}")
        raise RuntimeError(f"Exception when checking the access to {bucket} bucket") from error
    except botocore.exceptions.EndpointConnectionError as error:
        self.logger.exception(f"Failed to connect to the endpoint when trying to access {bucket}: {error}")
        raise RuntimeError(f"Failed to connect to the endpoint when trying to access {bucket}!") from error
    except Exception as error:
        self.logger.exception(f"General exception when trying to access bucket {bucket}: {error}")
        raise RuntimeError(f"General exception when trying to access bucket {bucket}") from error

check_file_overwriting(local_file, overwrite)

Check if file exists and determine if it should be overwritten.

Parameters:

Name Type Description Default
local_file str

Path to the local file.

required
overwrite bool

Whether to overwrite the existing file.

required

Returns:

Name Type Description
bool bool

True if the file should be overwritten, False otherwise.

Note:
- If the file already exists and the overwrite flag is set to True, the function logs a message, deletes the existing file, and returns True.
- If the file already exists and the overwrite flag is set to False, the function logs a warning message and returns False. In this case, the existing file won't be deleted.
- If the file doesn't exist, the function returns True.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 558-590
def check_file_overwriting(self, local_file, overwrite):
    """Check if file exists and determine if it should be overwritten.

    Args:
        local_file (str): Path to the local file.
        overwrite (bool): Whether to overwrite the existing file.

    Returns:
        bool (bool): True if the file should be overwritten, False otherwise.

    Note:
    - If the file already exists and the overwrite flag is set to True, the function logs a message,
    deletes the existing file, and returns True.
    - If the file already exists and the overwrite flag is set to False, the function logs a warning
    message, and returns False. In this case, the existing file won't be deleted.
    - If the file doesn't exist, the function returns True.

    """
    if os.path.isfile(local_file):
        if overwrite:  # The file already exists, so delete it first
            self.logger.info(
                "File %s already exists. Deleting it before downloading",
                S3StorageHandler.get_basename(local_file),
            )
            os.remove(local_file)
        else:
            self.logger.warning(
                "File %s already exists. Ignoring (use the overwrite flag if you want to overwrite this file)",
                S3StorageHandler.get_basename(local_file),
            )
            return False

    return True

check_s3_key_on_bucket(bucket, s3_key)

Check if the s3 key is available in the bucket.

Parameters:

Name Type Description Default
bucket str

The S3 bucket name.

required
s3_key str

The s3 key that should be checked

required

Raises:

Type Description
RuntimeError

If an error occurs during the bucket access check or if the s3_key is not available.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 502-540
def check_s3_key_on_bucket(self, bucket, s3_key):
    """Check if the s3 key available in the bucket.

    Args:
        bucket (str): The S3 bucket name.
        s3_key (str): The s3 key that should be checked

    Raises:
        RuntimeError: If an error occurs during the bucket access check or if
            the s3_key is not available.
    """
    try:
        self.connect_s3()
        self.logger.debug(f"Checking for the presence of the s3 key s3://{bucket}/{s3_key}")
        self.s3_client.head_object(Bucket=bucket, Key=s3_key)
    except botocore.client.ClientError as error:
        # check that it was a 404 vs 403 errors
        # If it was a 404 error, then the bucket does not exist.
        error_code = error.response["Error"]["Code"]
        if error_code == S3_ERR_FORBIDDEN_ACCESS:
            self.logger.exception(f"{bucket} is a private bucket. Forbidden access!")
            raise RuntimeError(f"{bucket} is a private bucket. Forbidden access!") from error
        if error_code == S3_ERR_NOT_FOUND:
            self.logger.exception(f"The key s3://{bucket}/{s3_key} does not exist!")
            return False
        self.logger.exception(f"Exception when checking the access to key s3://{bucket}/{s3_key}: {error}")
        raise RuntimeError(f"Exception when checking the access to {bucket} bucket") from error
    except (
        botocore.exceptions.EndpointConnectionError,
        botocore.exceptions.NoCredentialsError,
        botocore.exceptions.PartialCredentialsError,
    ) as error:
        self.logger.exception(f"Failed to connect to the endpoint when trying to access {bucket}: {error}")
        raise RuntimeError(f"Failed to connect to the endpoint when trying to access {bucket}!") from error
    except Exception as error:
        self.logger.exception(f"General exception when trying to access bucket {bucket}: {error}")
        raise RuntimeError(f"General exception when trying to access bucket {bucket}") from error

    return True
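
For illustration, a hedged sketch of how the boolean return value and the error cases can be handled, reusing the handler instance from the earlier sketch (bucket and key are placeholders):

try:
    if handler.check_s3_key_on_bucket("my-bucket", "products/granule.zip"):
        print("Key exists")
    else:
        print("Key not found (404)")
except RuntimeError as exc:
    # Forbidden access, endpoint/credential problems or any other error
    print(f"Could not check the key: {exc}")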

connect_s3()

Establish a connection to the S3 service.

If the S3 client is not already instantiated, this method calls the private __get_s3_client method to create an S3 client instance using the provided credentials and configuration (see __init__).

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 222-234
def connect_s3(self):
    """Establish a connection to the S3 service.

    If the S3 client is not already instantiated, this method calls the private __get_s3_client
    method to create an S3 client instance using the provided credentials and configuration (see __init__).
    """
    if self.s3_client is None:
        self.s3_client = self.__get_s3_client(
            self.access_key_id,
            self.secret_access_key,
            self.endpoint_url,
            self.region_name,
        )

delete_file_from_s3(bucket, key, max_retries=S3_MAX_RETRIES)

Delete a file from S3. This method implements a retry mechanism at the application level, which is distinct from the protocol-level retry mechanism configured through the "retries" parameter of the s3 client Config.

Parameters:

Name Type Description Default
bucket str

The S3 bucket name.

required
key str

The S3 object key.

required

Raises:

Type Description
RuntimeError

If an error occurs during the bucket access check.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 243-288
def delete_file_from_s3(self, bucket, key, max_retries=S3_MAX_RETRIES):
    """Delete a file from S3.
    The functionality implies a retry mechanism at the application level, which is different
    than the retry mechanism from the s3 protocol level, with "retries" parameter from the s3 Config


    Args:
        bucket (str): The S3 bucket name.
        key (str): The S3 object key.

    Raises:
        RuntimeError: If an error occurs during the bucket access check.
    """
    if bucket is None or key is None:
        raise RuntimeError("Input error for deleting the file")
    attempt = 0
    while attempt < max_retries:
        try:
            self.connect_s3()
            self.logger.debug("Deleting s3 key s3://%s/%s", bucket, key)
            if not self.check_s3_key_on_bucket(bucket, key):
                self.logger.debug("S3 key to be deleted s3://%s/%s does not exist", bucket, key)
                return
            self.s3_client.delete_object(Bucket=bucket, Key=key)
            self.logger.info("S3 key deleted: s3://%s/%s", bucket, key)
            return
        except (botocore.client.ClientError, botocore.exceptions.BotoCoreError) as e:
            attempt += 1
            if attempt < max_retries:
                # keep retrying
                self.disconnect_s3()
                self.logger.error(
                    f"Failed to delete key s3://{bucket}/{key}: {e} \nRetrying in {S3_RETRY_TIMEOUT} seconds. ",
                )
                self.wait_timeout(S3_RETRY_TIMEOUT)
                continue
            self.logger.exception(f"Failed to delete key s3://{bucket}/{key}. Reason: {e}")
            raise RuntimeError(f"Failed to delete key s3://{bucket}/{key}. Reason: {e}") from e
        except RuntimeError as e:
            self.logger.exception(f"Failed to check the key s3://{bucket}/{key}. Useless to retry. Reason: {e}")
            raise RuntimeError(
                f"Failed to check the key s3://{bucket}/{key}. Useless to retry. Reason: {e}",
            ) from e
        except Exception as e:
            self.logger.exception(f"Failed to delete key s3://{bucket}/{key}. Reason: {e}")
            raise RuntimeError(f"Failed to delete key s3://{bucket}/{key}. Reason: {e}") from e

disconnect_s3()

Close the connection to the S3 service.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 236-241
def disconnect_s3(self):
    """Close the connection to the S3 service."""
    if self.s3_client is None:
        return
    self.s3_client.close()
    self.s3_client = None

files_to_be_downloaded(bucket, paths)

Create a list with the S3 keys to be downloaded.

The list holds the s3 keys to be downloaded from the bucket. It contains pairs (local_prefix_where_the_file_will_be_downloaded, full_s3_key_path). If an s3 key doesn't exist, the pair will be (None, requested_s3_key_path).

Parameters:

Name Type Description Default
bucket str

The S3 bucket name.

required
paths list

List of S3 object keys.

required

Returns:

Name Type Description
list_with_files list

List of tuples (local_prefix, full_s3_key_path).

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 356-396
def files_to_be_downloaded(self, bucket, paths):
    """Create a list with the S3 keys to be downloaded.

    The list will have the s3 keys to be downloaded from the bucket.
    It contains pairs (local_prefix_where_the_file_will_be_downloaded, full_s3_key_path)
    If a s3 key doesn't exist, the pair will be (None, requested_s3_key_path)

    Args:
        bucket (str): The S3 bucket name.
        paths (list): List of S3 object keys.

    Returns:
        list_with_files (list): List of tuples (local_prefix, full_s3_key_path).
    """
    # declaration of the list
    list_with_files: list[Any] = []
    # for each key, identify it as a file or a folder
    # in the case of a folder, the files will be recursively gathered
    for key in paths:
        path = key.strip().lstrip("/")
        s3_files = self.list_s3_files_obj(bucket, path)
        if len(s3_files) == 0:
            self.logger.warning("No key %s found.", path)
            list_with_files.append((None, path))
            continue
        self.logger.debug("total: %s | s3_files = %s", len(s3_files), s3_files)
        basename_part = self.get_basename(path)

        # check if it's a file or a dir
        if len(s3_files) == 1 and path == s3_files[0]:
            # the current key is a file, append it to the list
            list_with_files.append(("", s3_files[0]))
            self.logger.debug("Append files: list_with_files = %s", list_with_files)
        else:
            # the current key is a folder, append all its files (recursively gathered) to the list
            for s3_file in s3_files:
                split = s3_file.split("/")
                split_idx = split.index(basename_part)
                list_with_files.append((os.path.join(*split[split_idx:-1]), s3_file.strip("/")))

    return list_with_files
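
A sketch of the returned pair structure, reusing the handler from the earlier example; the bucket layout below is hypothetical and the output is only indicative:

# Hypothetical bucket layout:
#   "products/granule.zip"   -> a single file
#   "aux/"                   -> a folder containing two files
pairs = handler.files_to_be_downloaded(
    "my-bucket",
    ["products/granule.zip", "aux", "missing/key"],
)
# pairs could look like:
# [("", "products/granule.zip"),   # plain file: empty local prefix
#  ("aux", "aux/file1.dat"),       # folder content keeps the folder name as prefix
#  ("aux", "aux/file2.dat"),
#  (None, "missing/key")]          # key not found: local prefix is None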

files_to_be_uploaded(paths)

Creates a list with the local files to be uploaded.

The list contains pairs (s3_path, absolute_local_file_path). If the local file doesn't exist, the pair will be (None, requested_file_to_upload).

Parameters:

Name Type Description Default
paths list

List of local file paths.

required

Returns:

Name Type Description
list_with_files list

List of tuples (s3_path, absolute_local_file_path).

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 398-442
def files_to_be_uploaded(self, paths):
    """Creates a list with the local files to be uploaded.

    The list contains pairs (s3_path, absolute_local_file_path)
    If the local file doesn't exist, the pair will be (None, requested_file_to_upload)

    Args:
        paths (list): List of local file paths.

    Returns:
        list_with_files (list): List of tuples (s3_path, absolute_local_file_path).
    """

    list_with_files = []
    for local in paths:
        path = local.strip()
        # check if it is a file
        self.logger.debug("path = %s", path)
        if os.path.isfile(path):
            self.logger.debug("Add %s", path)
            list_with_files.append(("", path))

        elif os.path.isdir(path):
            for root, dir_names, filenames in os.walk(path):
                for file in filenames:
                    full_file_path = os.path.join(root, file.strip("/"))
                    self.logger.debug("full_file_path = %s | dir_names = %s", full_file_path, dir_names)
                    if not os.path.isfile(full_file_path):
                        continue
                    self.logger.debug(
                        "get_basename(path) = %s | root = %s | replace = %s",
                        self.get_basename(path),
                        root,
                        root.replace(path, ""),
                    )

                    keep_path = os.path.join(self.get_basename(path), root.replace(path, "").strip("/")).strip("/")
                    self.logger.debug("path = %s | keep_path = %s | root = %s", path, keep_path, root)

                    self.logger.debug("Add: %s | %s", keep_path, full_file_path)
                    list_with_files.append((keep_path, full_file_path))
        else:
            self.logger.warning("The path %s is not a directory nor a file, it will not be uploaded", path)

    return list_with_files

get_basename(input_path) staticmethod

Get the filename from a full path.

Parameters:

Name Type Description Default
input_path str

The full path.

required

Returns:

Name Type Description
filename str

The filename.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 319-330
@staticmethod
def get_basename(input_path):
    """Get the filename from a full path.

    Args:
        input_path (str): The full path.

    Returns:
        filename (str): The filename.
    """
    path, filename = ntpath.split(input_path)
    return filename or ntpath.basename(path)
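
For example (a sketch; the paths are placeholders):

S3StorageHandler.get_basename("products/S1A/granule.zip")   # -> "granule.zip"
S3StorageHandler.get_basename("products/S1A/")              # -> "S1A" (trailing slash handled)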

get_keys_from_s3(config)

Download S3 keys specified in the configuration.

Parameters:

Name Type Description Default
config GetKeysFromS3Config

Configuration for the S3 download.

required

Returns:

Type Description
list

List[str]: A list with the S3 keys that couldn't be downloaded.

Raises:

Type Description
Exception

Any unexpected exception raised during the download process.

The function attempts to download files from S3 according to the provided configuration. It returns a list of S3 keys that couldn't be downloaded successfully.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 592-681
    def get_keys_from_s3(self, config: GetKeysFromS3Config) -> list:
        """Download S3 keys specified in the configuration.

        Args:
            config (GetKeysFromS3Config): Configuration for the S3 download.

        Returns:
            List[str]: A list with the S3 keys that couldn't be downloaded.

        Raises:
            Exception: Any unexpected exception raised during the download process.

        The function attempts to download files from S3 according to the provided configuration.
        It returns a list of S3 keys that couldn't be downloaded successfully.

        """

        # check the access to the bucket first, or even if it does exist
        self.check_bucket_access(config.bucket)

        # collection_files: list of files to be downloaded
        #                   the list contains pair objects with the following
        #                   syntax: (local_path_to_be_added_to_the_local_prefix, s3_key)
        #                   the local_path_to_be_added_to_the_local_prefix may be none if the file doesn't exist
        collection_files = self.files_to_be_downloaded(config.bucket, config.s3_files)

        self.logger.debug("collection_files = %s | bucket = %s", collection_files, config.bucket)
        failed_files = []

        for collection_file in collection_files:
            if collection_file[0] is None:
                failed_files.append(collection_file[1])
                continue

            local_path = os.path.join(config.local_prefix, collection_file[0].strip("/"))
            s3_file = collection_file[1]
            # for each file to download, create the local dir (if it does not exist)
            os.makedirs(local_path, exist_ok=True)
            # create the path for local file
            local_file = os.path.join(local_path, self.get_basename(s3_file).strip("/"))

            if not self.check_file_overwriting(local_file, config.overwrite):
                continue
            # download the files
            downloaded = False
            for keep_trying in range(config.max_retries):
                try:
                    self.connect_s3()
                    dwn_start = datetime.now()
                    self.s3_client.download_file(config.bucket, s3_file, local_file)
                    self.logger.debug(
                        "s3://%s/%s downloaded to %s in %s ms",
                        config.bucket,
                        s3_file,
                        local_file,
                        datetime.now() - dwn_start,
                    )
                    downloaded = True
                    break
                except (botocore.client.ClientError, botocore.exceptions.EndpointConnectionError) as error:
                    self.logger.exception(
                        "Error when downloading the file %s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        s3_file,
                        error,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when downloading the file %s. \
Failed to get the s3 client. Retrying in %s seconds for %s more times",
                        s3_file,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)

            if not downloaded:
                self.logger.error(
                    "Failed to download the file %s. The download was \
retried for %s times. Aborting",
                    s3_file,
                    config.max_retries,
                )
                failed_files.append(s3_file)

        return failed_files
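
A hedged download sketch, reusing the handler from the earlier example; GetKeysFromS3Config is assumed to accept the field names used by this method (s3_files, bucket, local_prefix, overwrite, max_retries) as keyword arguments, and the bucket, keys and local path are placeholders:

from rs_server_common.s3_storage_handler.s3_storage_handler import GetKeysFromS3Config

config = GetKeysFromS3Config(
    s3_files=["products/granule.zip", "aux/file1.dat"],  # placeholder keys
    bucket="my-bucket",                                  # placeholder bucket
    local_prefix="/tmp/downloads",                       # placeholder local directory
    overwrite=False,
)
failed = handler.get_keys_from_s3(config)
if failed:
    print(f"Could not download: {failed}")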

get_secrets_from_file(secrets, secret_file) staticmethod

Read secrets from a specified file.

It reads the secrets from .s3cfg or aws credentials files. This function should not be used in production.

Parameters:

Name Type Description Default
secrets dict

Dictionary to store retrieved secrets.

required
secret_file str

Path to the file containing secrets.

required
Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 292-317
@staticmethod
def get_secrets_from_file(secrets, secret_file):
    """Read secrets from a specified file.

    It reads the secrets from .s3cfg or aws credentials files
    This function should not be used in production

    Args:
        secrets (dict): Dictionary to store retrieved secrets.
        secret_file (str): Path to the file containing secrets.
    """
    dict_filled = 0
    with open(secret_file, encoding="utf-8") as aws_credentials_file:
        lines = aws_credentials_file.readlines()
        for line in lines:
            if not secrets["s3endpoint"] and "host_bucket" in line:
                dict_filled += 1
                secrets["s3endpoint"] = line.strip().split("=")[1].strip()
            elif not secrets["accesskey"] and "access_key" in line:
                dict_filled += 1
                secrets["accesskey"] = line.strip().split("=")[1].strip()
            elif not secrets["secretkey"] and "secret_" in line and "_key" in line:
                dict_filled += 1
                secrets["secretkey"] = line.strip().split("=")[1].strip()
            if dict_filled == 3:
                break
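
A sketch of the expected secrets dictionary (the file path is a placeholder; as noted above, this helper is not meant for production use):

secrets = {"s3endpoint": None, "accesskey": None, "secretkey": None}
# The dictionary is filled in place from the credentials file
S3StorageHandler.get_secrets_from_file(secrets, "/home/user/.s3cfg")  # placeholder path
print(secrets["s3endpoint"], secrets["accesskey"])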

list_s3_files_obj(bucket, prefix)

Retrieve the content of an S3 directory.

Parameters:

Name Type Description Default
bucket str

The S3 bucket name.

required
prefix str

The S3 object key prefix.

required

Returns:

Name Type Description
s3_files list

List containing S3 object keys.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 444-468
def list_s3_files_obj(self, bucket, prefix):
    """Retrieve the content of an S3 directory.

    Args:
        bucket (str): The S3 bucket name.
        prefix (str): The S3 object key prefix.

    Returns:
        s3_files (list): List containing S3 object keys.
    """

    s3_files = []

    try:
        paginator: Any = self.s3_client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            for item in page.get("Contents", ()):
                if item is not None:
                    s3_files.append(item["Key"])
    except Exception as error:
        self.logger.exception(f"Exception when trying to list files from s3://{bucket}/{prefix}: {error}")
        raise RuntimeError(f"Listing files from s3://{bucket}/{prefix}") from error

    return s3_files
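
A small sketch, reusing the handler from the earlier example (bucket and prefix are placeholders); the returned list contains the full object keys found under the prefix:

keys = handler.list_s3_files_obj("my-bucket", "products/S1A")
for key in keys:
    print(f"s3://my-bucket/{key}")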

put_files_to_s3(config)

Upload files to S3 according to the provided configuration.

Parameters:

Name Type Description Default
config PutFilesToS3Config

Configuration for the S3 upload.

required

Returns:

Type Description
list

List[str]: A list with the local file paths that couldn't be uploaded.

Raises:

Type Description
Exception

Any unexpected exception raised during the upload process.

The function attempts to upload files to S3 according to the provided configuration. It returns a list of local files that couldn't be uploaded successfully.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 683-764
    def put_files_to_s3(self, config: PutFilesToS3Config) -> list:
        """Upload files to S3 according to the provided configuration.

        Args:
            config (PutFilesToS3Config): Configuration for the S3 upload.

        Returns:
            List[str]: A list with the local file paths that couldn't be uploaded.

        Raises:
            Exception: Any unexpected exception raised during the upload process.

        The function attempts to upload files to S3 according to the provided configuration.
        It returns a list of local files that couldn't be uploaded successfully.

        """

        # check the access to the bucket first, or even if it does exist
        self.check_bucket_access(config.bucket)

        collection_files = self.files_to_be_uploaded(config.files)
        failed_files = []

        for collection_file in collection_files:
            if collection_file[0] is None:
                self.logger.error("The file %s can't be uploaded, its s3 prefix is None", collection_file[0])
                failed_files.append(collection_file[1])
                continue

            file_to_be_uploaded = collection_file[1]
            # create the s3 key
            key = os.path.join(config.s3_path, collection_file[0], os.path.basename(file_to_be_uploaded).strip("/"))
            uploaded = False
            for keep_trying in range(config.max_retries):
                try:
                    # get the s3 client
                    self.connect_s3()
                    self.logger.info(
                        "Upload file %s to s3://%s/%s",
                        file_to_be_uploaded,
                        config.bucket,
                        key.lstrip("/"),
                    )

                    self.s3_client.upload_file(file_to_be_uploaded, config.bucket, key)
                    uploaded = True
                    break
                except (
                    botocore.client.ClientError,
                    botocore.exceptions.EndpointConnectionError,
                    boto3.exceptions.S3UploadFailedError,
                ) as error:
                    self.logger.exception(
                        "Error when uploading the file %s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        file_to_be_uploaded,
                        error,
                        UP_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(UP_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when uploading the file %s. \
Failed to get the s3 client. Retrying in %s seconds for %s more times",
                        file_to_be_uploaded,
                        UP_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(UP_S3FILE_RETRY_TIMEOUT)

            if not uploaded:
                self.logger.error(
                    "Failed to upload the file %s. The upload was \
retried for %s times. Aborting",
                    file_to_be_uploaded,
                    config.max_retries,
                )
                failed_files.append(file_to_be_uploaded)

        return failed_files
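
A hedged upload sketch, reusing the handler from the earlier example; PutFilesToS3Config is assumed to accept the field names referenced in this method (files, bucket, s3_path, max_retries) as keyword arguments, and all paths are placeholders:

from rs_server_common.s3_storage_handler.s3_storage_handler import PutFilesToS3Config

config = PutFilesToS3Config(
    files=["/tmp/outbox/report.txt", "/tmp/outbox/results"],  # placeholder file and folder
    bucket="my-bucket",                                       # placeholder bucket
    s3_path="uploads/run-001",                                # placeholder key prefix
)
failed = handler.put_files_to_s3(config)
if failed:
    print(f"Could not upload: {failed}")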

s3_path_parser(s3_url) staticmethod

Parses S3 URL to extract bucket, prefix, and file.

Parameters:

Name Type Description Default
s3_url str

The S3 URL.

required

Returns:

Type Description
(bucket, prefix, s3_file) tuple

Tuple containing bucket, prefix, and file.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 332-354
@staticmethod
def s3_path_parser(s3_url):
    """
    Parses S3 URL to extract bucket, prefix, and file.

    Args:
        s3_url (str): The S3 URL.

    Returns:
        (bucket, prefix, s3_file) (tuple): Tuple containing bucket, prefix, and file.
    """
    s3_data = s3_url.replace("s3://", "").split("/")
    bucket = ""
    start_idx = 0
    if s3_url.startswith("s3://"):
        bucket = s3_data[0]

        start_idx = 1
    prefix = ""
    if start_idx < len(s3_data):
        prefix = "/".join(s3_data[start_idx:-1])
    s3_file = s3_data[-1]
    return bucket, prefix, s3_file
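
For example (a sketch; the URL is a placeholder):

bucket, prefix, s3_file = S3StorageHandler.s3_path_parser("s3://my-bucket/products/S1A/granule.zip")
# bucket  == "my-bucket"
# prefix  == "products/S1A"
# s3_file == "granule.zip"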

s3_streaming_upload(stream_url, trusted_domains, auth, bucket, key, max_retries=S3_MAX_RETRIES)

Upload a file to an S3 bucket using HTTP byte-streaming with retries.

This method retrieves data from stream_url in chunks and uploads it to the specified S3 bucket (bucket) under the specified key (key). It includes retry logic for network and S3 client errors, with exponential backoff between retries. The method handles errors during both the HTTP request and the S3 upload process, raising a RuntimeError if the retries are exhausted without success.

Parameters:

Name Type Description Default
stream_url str

The URL of the file to be streamed and uploaded.

required
trusted_domains list

List of allowed hosts for redirection in case of change of protocol (HTTP <> HTTPS).

required
auth Any

Authentication credentials for the HTTP request (if required).

required
bucket str

The name of the target S3 bucket.

required
key str

The S3 object key (file path) to store the streamed file.

required
max_retries int

The maximum number of retry attempts if an error occurs (default is S3_MAX_RETRIES).

S3_MAX_RETRIES

Raises:

Type Description
RuntimeError

If there is a failure during the streaming upload process, either due to the HTTP request or the S3 upload, after exhausting all retries.

Process
  1. The function attempts to download the file from stream_url using streaming and upload it to S3.
  2. Redirections of the URL request (e.g. between HTTP and HTTPS on trusted domains) are followed by overriding the default should_strip_auth behaviour, see CustomSessionRedirect.
  3. If an error occurs (e.g., connection error, S3 client error), it retries the operation with exponential backoff.
  4. The default chunk size for streaming is set to 64KB, and multipart upload configuration is used for large files.
  5. After max_retries attempts, if the upload is unsuccessful, a RuntimeError is raised.
Retry Mechanism
  • Retries occur for network-related errors (RequestException) or S3 client errors (ClientError, BotoCoreError).
  • The function waits before retrying, with the delay time increasing exponentially (based on the backoff_factor).
  • The backoff formula is backoff_factor * (2 ** (attempt - 1)), allowing progressively longer wait times between retries.
Exception Handling
  • HTTP errors such as timeouts or bad responses (4xx, 5xx) are handled using requests.exceptions.RequestException.
  • S3 client errors such as ClientError and BotoCoreError are captured, logged, and retried.
  • Any other unexpected errors are caught and re-raised as RuntimeError.
Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 860-974
def s3_streaming_upload(  # pylint: disable=too-many-locals
    self,
    stream_url: str,
    trusted_domains: list[str],
    auth: Any,
    bucket: str,
    key: str,
    max_retries=S3_MAX_RETRIES,
):
    """
    Upload a file to an S3 bucket using HTTP byte-streaming with retries.

    This method retrieves data from `stream_url` in chunks and uploads it to the specified S3 bucket
    (`bucket`) under the specified key (`key`). It includes retry logic for network and S3 client errors,
    with exponential backoff between retries. The method handles errors during both the HTTP request and the
    S3 upload process, raising a `RuntimeError` if the retries are exhausted without success.

    Args:
        stream_url (str): The URL of the file to be streamed and uploaded.
        trusted_domains (list): List of allowed hosts for redirection in case of change of protocol (HTTP <> HTTPS).
        auth (Any): Authentication credentials for the HTTP request (if required).
        bucket (str): The name of the target S3 bucket.
        key (str): The S3 object key (file path) to store the streamed file.
        max_retries (int, optional): The maximum number of retry attempts if an error occurs
            (default is `S3_MAX_RETRIES`).

    Raises:
        RuntimeError: If there is a failure during the streaming upload process, either due to the HTTP request
            or the S3 upload, after exhausting all retries.

    Process:
        1. The function attempts to download the file from `stream_url` using streaming and upload it to S3.
        2. It redirects the url request by overriding the default should_strip_auth, see CustomSessionRedirect
        3. If an error occurs (e.g., connection error, S3 client error), it retries the operation with exponential
            backoff.
        4. The default chunk size for streaming is set to 64KB, and multipart upload configuration is used for
        large files.
        5. After `max_retries` attempts, if the upload is unsuccessful, a `RuntimeError` is raised.

    Retry Mechanism:
        - Retries occur for network-related errors (`RequestException`) or S3 client errors
            (`ClientError`, `BotoCoreError`).
        - The function waits before retrying, with the delay time increasing exponentially
            (based on the `backoff_factor`).
        - The backoff formula is `backoff_factor * (2 ** (attempt - 1))`, allowing progressively
            longer wait times between retries.

    Exception Handling:
        - HTTP errors such as timeouts or bad responses (4xx, 5xx) are handled using
            `requests.exceptions.RequestException`.
        - S3 client errors such as `ClientError` and `BotoCoreError` are captured, logged, and retried.
        - Any other unexpected errors are caught and re-raised as `RuntimeError`.
    """
    if bucket is None or key is None:
        raise RuntimeError(f"Input error for streaming the file from {stream_url} to s3://{bucket}/{key}")
    timeout: tuple[int, int] = (HTTP_CONNECTION_TIMEOUT, HTTP_READ_TIMEOUT)
    backoff_factor = S3_RETRY_TIMEOUT
    attempt = 0
    # Prepare the request
    session = CustomSessionRedirect(trusted_domains)
    self.logger.debug(f"trusted_domains = {trusted_domains}")
    request = requests.Request(
        method="GET",
        url=stream_url,
        auth=auth,
    )
    prepared_request = session.prepare_request(request)
    while attempt < max_retries:
        try:
            self.connect_s3()
            self.logger.info(f"Starting the streaming of {stream_url} to s3://{bucket}/{key}")
            with session.send(prepared_request, stream=True, timeout=timeout) as response:
                # with requests.get(stream_url, stream=True, auth=auth, timeout=timeout) as response:
                self.logger.debug(f"Request headers: {response.request.headers}")
                response.raise_for_status()  # Raise an error for bad responses (4xx and 5xx)

                # Default chunksize is set to 64Kb, can be manually increased
                chunk_size = 64 * 1024  # 64kb
                with response.raw as data_stream:
                    self.s3_client.upload_fileobj(
                        data_stream,
                        bucket,
                        key,
                        Config=boto3.s3.transfer.TransferConfig(multipart_threshold=chunk_size * 2),
                    )
                self.logger.info(f"Successfully uploaded to s3://{bucket}/{key}")
                return
        except (
            requests.exceptions.RequestException,
            botocore.client.ClientError,
            botocore.exceptions.BotoCoreError,
        ) as e:
            attempt += 1
            if attempt < max_retries:
                # keep retrying
                self.disconnect_s3()
                delay = backoff_factor * (2 ** (attempt - 1))
                self.logger.error(
                    f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}"
                    f" Retrying in {delay} seconds. ",
                )
                self.wait_timeout(S3_RETRY_TIMEOUT)
                continue
            self.logger.exception(
                f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}."
                f"\nTried for {max_retries} times, giving up",
            )
            raise RuntimeError(f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}.") from e
        except Exception as e:
            self.logger.exception(
                "General exception. " f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}",
            )
            raise RuntimeError(
                "General exception. " f"Failed to stream the file from {stream_url} to s3://{bucket}/{key}: {e}",
            ) from e
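
A hedged sketch of a streaming call, reusing the handler from the earlier example; the URL, trusted domain and credentials are placeholders, and auth can be any requests-compatible auth object (or None for anonymous access):

from requests.auth import HTTPBasicAuth  # only needed if the source requires credentials

try:
    handler.s3_streaming_upload(
        stream_url="https://data.example.com/products/granule.zip",  # placeholder URL
        trusted_domains=["data.example.com"],                        # placeholder trusted host
        auth=HTTPBasicAuth("user", "password"),                      # placeholder credentials, or None
        bucket="my-bucket",                                          # placeholder bucket
        key="products/granule.zip",                                  # placeholder key
    )
except RuntimeError as exc:
    print(f"Streaming upload failed: {exc}")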

transfer_from_s3_to_s3(config)

Copy S3 keys specified in the configuration.

Parameters:

Name Type Description Default
config TransferFromS3ToS3Config

Configuration object containing the source bucket, the destination bucket, the S3 files and the maximum number of retries.

required

Returns:

Name Type Description
list list

A list of S3 keys that failed to be copied.

Raises:

Type Description
Exception

Any unexpected exception raised during the upload process.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 766-858
    def transfer_from_s3_to_s3(self, config: TransferFromS3ToS3Config) -> list:
        """Copy S3 keys specified in the configuration.
        Args:
            config (TransferFromS3ToS3Config): Configuration object containing bucket source, bucket destination,
                      S3 files, maximum retries.

        Returns:
            list: A list of S3 keys that failed to be copied.

        Raises:
            Exception: Any unexpected exception raised during the upload process.
        """
        # check the access to both buckets first, or even if they do exist
        self.check_bucket_access(config.bucket_src)
        self.check_bucket_access(config.bucket_dst)

        # collection_files: list of files to be downloaded
        #                   the list contains pair objects with the following
        #                   syntax: (local_path_to_be_added_to_the_local_prefix, s3_key)

        collection_files = self.files_to_be_downloaded(config.bucket_src, config.s3_files)

        self.logger.debug("collection_files = %s | bucket = %s", collection_files, config.bucket_src)
        failed_files = []
        copy_src = {"Bucket": config.bucket_src, "Key": ""}

        for collection_file in collection_files:
            if collection_file[0] is None:
                failed_files.append(collection_file[1])
                continue

            copied = False
            for keep_trying in range(config.max_retries):
                self.logger.debug(
                    "keep_trying %s | range(config.max_retries) %s ",
                    keep_trying,
                    range(config.max_retries),
                )
                try:
                    self.connect_s3()
                    dwn_start = datetime.now()
                    copy_src["Key"] = collection_file[1]
                    self.logger.debug("copy_src = %s", copy_src)
                    self.s3_client.copy_object(CopySource=copy_src, Bucket=config.bucket_dst, Key=collection_file[1])
                    self.logger.debug(
                        "s3://%s/%s copied to s3://%s/%s in %s ms",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        collection_file[1],
                        datetime.now() - dwn_start,
                    )
                    if not config.copy_only:
                        self.delete_file_from_s3(config.bucket_src, collection_file[1])
                    copied = True
                    break
                except (botocore.client.ClientError, botocore.exceptions.EndpointConnectionError) as error:
                    self.logger.exception(
                        "Error when copying the file s3://%s/%s to s3://%s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        error,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when copying the file s3://%s/%s to s3://%s. \
Failed to get the s3 client. Retrying in %s seconds for %s more times",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)

            if not copied:
                self.logger.error(
                    "Failed to copy the file s3://%s/%s to s3://%s. The copy was \
retried for %s times. Aborting",
                    config.bucket_src,
                    collection_file[1],
                    config.bucket_dst,
                    config.max_retries,
                )
                failed_files.append(collection_file[1])

        return failed_files
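
A sketch of a bucket-to-bucket transfer using the TransferFromS3ToS3Config dataclass documented below, reusing the handler from the earlier example (bucket names and keys are placeholders):

from rs_server_common.s3_storage_handler.s3_storage_handler import TransferFromS3ToS3Config

config = TransferFromS3ToS3Config(
    s3_files=["products/granule.zip"],   # placeholder keys
    bucket_src="staging-bucket",         # placeholder source bucket
    bucket_dst="catalog-bucket",         # placeholder destination bucket
    copy_only=True,                      # keep the source keys after copying
)
failed = handler.transfer_from_s3_to_s3(config)
if failed:
    print(f"Could not copy: {failed}")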

wait_timeout(timeout)

Wait for a specified timeout duration (minimum 200 ms).

This function implements a simple timeout mechanism, where it sleeps for 0.2 seconds in each iteration until the cumulative sleep time reaches the specified timeout duration.

Parameters:

Name Type Description Default
timeout float

The total duration to wait in seconds.

required
Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 542-556
def wait_timeout(self, timeout):
    """
    Wait for a specified timeout duration (minimum 200 ms).

    This function implements a simple timeout mechanism, where it sleeps for 0.2 seconds
    in each iteration until the cumulative sleep time reaches the specified timeout duration.

    Args:
        timeout (float): The total duration to wait in seconds.

    """
    time_cnt = 0.0
    while time_cnt < timeout:
        time.sleep(SLEEP_TIME)
        time_cnt += SLEEP_TIME

TransferFromS3ToS3Config dataclass

S3 configuration for copying a list with keys between buckets

Attributes:

Name Type Description
s3_files list

A list with the S3 object keys to be copied.

bucket_src str

The source S3 bucket name.

bucket_dst str

The destination S3 bucket name.

copy_only bool

If True, the source keys are kept after being copied; if False (default), they are deleted from the source bucket after a successful copy.

max_retries int

The maximum number of download retries. Default is DWN_S3FILE_RETRIES.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 90-106
@dataclass
class TransferFromS3ToS3Config:
    """S3 configuration for copying a list with keys between buckets

    Attributes:
        s3_files (list): A list with the S3 object keys to be copied.
        bucket_src (str): The source S3 bucket name.
        bucket_dst (str): The destination S3 bucket name.
        max_retries (int, optional): The maximum number of download retries. Default is DWN_S3FILE_RETRIES.

    """

    s3_files: list
    bucket_src: str
    bucket_dst: str
    copy_only: bool = False
    max_retries: int = DWN_S3FILE_RETRIES