Common

Authentication functions implementation.

Note: calls https://gitlab.si.c-s.fr/space_applications/eoservices/apikey-manager

__apikey_security_cached(apikey_value) async

Cached version of apikey_security. Cache an infinite (sys.maxsize) number of results for 120 seconds.

This function serves as a cached version of apikey_security. It retrieves user access control information from the User Authentication and Authorization Control (UAC) manager and caches the result for performance optimization.

Parameters:

Name Type Description Default
apikey_value str

The API key value.

required

Returns:

Name Type Description
tuple tuple[list[str], dict, str]

A tuple containing user IAM roles, configuration data, and user login information.

Raises:

Type Description
HTTPException

If there is an error connecting to the UAC manager or if the UAC manager returns an error.

Source code in docs/rs-server/services/common/rs_server_common/authentication.py
@cached(cache=ttl_cache)
async def __apikey_security_cached(apikey_value) -> tuple[list[str], dict, str]:
    """
    Cached version of apikey_security. Cache an infinite (sys.maxsize) number of results for 120 seconds.

    This function serves as a cached version of apikey_security. It retrieves user access control information
    from the User Authentication and Authorization Control (UAC) manager and caches the result for performance
    optimization.

    Args:
        apikey_value (str): The API key value.

    Returns:
        tuple: A tuple containing user IAM roles, configuration data, and user login information.

    Raises:
        HTTPException: If there is an error connecting to the UAC manager or if the UAC manager returns an error.
    """

    # The uac manager check url is passed as an environment variable
    try:
        check_url = env["RSPY_UAC_CHECK_URL"]
    except KeyError:
        raise HTTPException(HTTP_400_BAD_REQUEST, "UAC manager URL is undefined")  # pylint: disable=raise-missing-from

    # Request the uac, pass user-defined api key in http header
    try:
        response = await settings.http_client().get(check_url, headers={APIKEY_HEADER: apikey_value or ""})
    except httpx.HTTPError as error:
        message = "Error connecting to the UAC manager"
        logger.error(f"{message}\n{traceback.format_exc()}")
        raise HTTPException(HTTP_500_INTERNAL_SERVER_ERROR, message) from error

    # Read the api key info
    if response.is_success:
        contents = response.json()
        # Note: for now, config is an empty dict
        return contents["iam_roles"], contents["config"], contents["user_login"]

    # Try to read the response detail or error
    try:
        json = response.json()
        if "detail" in json:
            detail = json["detail"]
        else:
            detail = json["error"]

    # If this fails, get the full response content
    except Exception:  # pylint: disable=broad-exception-caught
        detail = response.read().decode("utf-8")

    # Forward error
    raise HTTPException(response.status_code, f"UAC manager: {detail}")
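
The ttl_cache object referenced by the @cached decorator is not shown on this page. Below is a minimal sketch of how such a cache is typically declared, assuming the cachetools library (with an async-aware decorator such as asyncache.cached, since the wrapped function is a coroutine) and the size/TTL values quoted in the docstring:

# Hypothetical sketch only: the real declaration lives elsewhere in the module.
import sys
from cachetools import TTLCache

# Infinite (sys.maxsize) number of entries, each kept for 120 seconds.
ttl_cache: TTLCache = TTLCache(maxsize=sys.maxsize, ttl=120)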

apikey_security(request, apikey_header) async

FastAPI Security dependency for the cluster mode. Check the api key validity, passed as an HTTP header.

Parameters:

Name Type Description Default
apikey_header Security

API key passed in HTTP header

required

Returns:

Type Description
tuple[list[str], dict, str]

Tuple of (IAM roles, config, user login) information from the keycloak account associated with the API key.

Source code in docs/rs-server/services/common/rs_server_common/authentication.py
async def apikey_security(
    request: Request,
    apikey_header: Annotated[str, Security(APIKEY_AUTH_HEADER)],
    # apikey_query: Annotated[str, Security(APIKEY_AUTH_QUERY)],
) -> tuple[list[str], dict, str]:
    """
    FastAPI Security dependency for the cluster mode. Check the api key validity, passed as an HTTP header.

    Args:
        apikey_header (Security): API key passed in HTTP header

    Returns:
        Tuple of (IAM roles, config, user login) information from the keycloak account associated with the API key.
    """

    # Use the api key passed by either http headers or query parameter (disabled for now)
    apikey_value = apikey_header  # or apikey_query
    if not apikey_value:
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail="Not authenticated",
        )

    # Call the cached function (fastapi Depends doesn't work with @cached)
    auth_roles, auth_config, user_login = await __apikey_security_cached(str(apikey_value))
    request.state.auth_roles = auth_roles
    request.state.auth_config = auth_config
    request.state.user_login = user_login
    logger.debug(f"API key information: {auth_roles, auth_config, user_login}")
    return auth_roles, auth_config, user_login
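
As a usage sketch, the dependency can be declared on an endpoint so that FastAPI injects the decoded tuple; the router, path and handler names below are hypothetical:

from fastapi import APIRouter, Depends
from rs_server_common.authentication import apikey_security

router = APIRouter()

@router.get("/example")  # hypothetical endpoint, for illustration only
async def example_endpoint(auth: tuple = Depends(apikey_security)):
    iam_roles, config, user_login = auth
    return {"user": user_login, "roles": iam_roles}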

apikey_validator(station, access_type)

Decorator to validate API key access for a specific station and access type.

This decorator checks if the authorization key contains the necessary role to access the specified station with the specified access type.

Parameters:

Name Type Description Default
station str

The name of the station, either "adgs" or "cadip".

required
access_type str

The type of access, such as "download" or "read".

required

Raises:

Type Description
HTTPException

If the authorization key does not include the required role to access the specified station with the specified access type.

Returns:

Name Type Description
function Callable

The decorator function.

Source code in docs/rs-server/services/common/rs_server_common/authentication.py
def apikey_validator(station, access_type):
    """Decorator to validate API key access for a specific station and access type.

    This decorator checks if the authorization key contains the necessary role to access
    the specified station with the specified access type.

    Args:
        station (str): The name of the station, either "adgs" or "cadip".
        access_type (str): The type of access, such as "download" or "read".

    Raises:
        HTTPException: If the authorization key does not include the required role
            to access the specified station with the specified access type.

    Returns:
        function (Callable): The decorator function.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if settings.CLUSTER_MODE:
                # Read the full cadip station passed in parameter e.g. INS, MPS, ...
                if station == "cadip":
                    cadip_station = kwargs["station"]  # ins, mps, mti, nsg, sgs, or cadip
                    try:
                        full_station = STATIONS_AUTH_LUT[cadip_station.lower()]
                    except KeyError as exception:
                        raise HTTPException(
                            status_code=status.HTTP_400_BAD_REQUEST,
                            detail=f"Unknown CADIP station: {cadip_station!r}",
                        ) from exception
                else:  # for adgs
                    full_station = station

                requested_role = f"rs_{full_station}_{access_type}".upper()
                try:
                    auth_roles = [role.upper() for role in kwargs["request"].state.auth_roles]
                except KeyError:
                    auth_roles = []

                if requested_role not in auth_roles:
                    raise HTTPException(
                        status_code=status.HTTP_401_UNAUTHORIZED,
                        detail=f"Authorization key does not include the right role to {access_type} "
                        f"from the {full_station!r} station",
                    )

            return func(*args, **kwargs)

        return wrapper

    return decorator
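
Usage sketch: because the wrapper reads kwargs["request"] and, for CADIP, kwargs["station"], the decorated endpoint must expose both as keyword parameters. The route below is hypothetical:

from fastapi import APIRouter, Request
from rs_server_common.authentication import apikey_validator

router = APIRouter()

@router.get("/cadip/{station}/cadu")  # hypothetical route, for illustration only
@apikey_validator("cadip", "download")
def download_cadu(request: Request, station: str, name: str):
    # In cluster mode, reaching this body requires the RS_<STATION>_DOWNLOAD
    # role to be present in request.state.auth_roles.
    ...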

Database connection.

Taken from: https://praciano.com.br/fastapi-and-async-sqlalchemy-20-with-pytest-done-right.html

DatabaseSessionManager

Database session configuration.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
class DatabaseSessionManager:
    """Database session configuration."""

    lock = Lock()
    multiprocessing_lock = multiprocessing.Lock()

    def __init__(self):
        """Create a Database session configuration."""
        self._engine: Engine | None = None
        self._sessionmaker: sessionmaker | None = None

    @classmethod
    def url(cls):
        """Get database connection URL."""
        try:
            # pylint: disable=consider-using-f-string
            return os.getenv(
                "POSTGRES_URL",
                "postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}".format(
                    user=os.environ["POSTGRES_USER"],
                    password=os.environ["POSTGRES_PASSWORD"],
                    host=os.environ["POSTGRES_HOST"],
                    port=os.environ["POSTGRES_PORT"],
                    dbname=os.environ["POSTGRES_DB"],
                ),
            )
        except KeyError as key_error:
            raise KeyError(
                "The PostgreSQL environment variables are missing: "
                "POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB",
            ) from key_error

    def open_session(self, url: str = ""):
        """Open database session."""

        # If the 'self' object is used by several threads in the same process,
        # make sure to initialize the session only once.
        with DatabaseSessionManager.lock:
            if (self._engine is None) or (self._sessionmaker is None):
                self._engine = create_engine(url or self.url(), poolclass=NullPool, pool_pre_ping=True)
                self._sessionmaker = sessionmaker(autocommit=False, autoflush=False, bind=self._engine)

                try:
                    # Create all tables.
                    # Warning: this only works if the database table modules have been imported
                    # e.g. import rs_server_adgs.adgs_download_status
                    self.create_all()

                # It fails if the database is unreachable, but even in this case the engine and session are not None.
                # Set them to None so we will try to create all tables again on the next try.
                except Exception:
                    self.close()
                    raise

    def close(self):
        """Close database session."""
        if self._engine is not None:
            self._engine.dispose()
            self._engine = None
        self._sessionmaker = None

    @contextlib.contextmanager
    def connect(self) -> Iterator[Connection]:
        """Open new database connection instance."""

        if self._engine is None:
            raise RuntimeError("DatabaseSessionManager is not initialized")

        with self._engine.begin() as connection:
            try:
                yield connection

            # In case of any exception, rollback connection and re-raise into HTTP exception
            except Exception as exception:  # pylint: disable=broad-exception-caught
                connection.rollback()
                self.reraise_http_exception(exception)

    @contextlib.contextmanager
    def session(self) -> Iterator[Session]:
        """Open new database session instance."""

        if self._sessionmaker is None:
            raise RuntimeError("DatabaseSessionManager is not initialized")

        session = self._sessionmaker()
        try:
            yield session

        # In case of any exception, rollback session and re-raise into HTTP exception
        except Exception as exception:  # pylint: disable=broad-exception-caught
            session.rollback()
            self.reraise_http_exception(exception)

        # Close session when deleting instance.
        finally:
            session.close()

    @staticmethod
    def __filelock(func):
        """Avoid concurrent writing to the database using a file locK."""

        @wraps(func)
        def with_filelock(*args, **kwargs):
            """Wrap the the call to 'func' inside the lock."""

            # Let's do this only if the RSPY_WORKING_DIR environment variable is defined.
            # Write a .lock file inside this directory.
            try:
                with FileLock(Path(os.environ["RSPY_WORKING_DIR"]) / f"{__name__}.lock"):
                    return func(*args, **kwargs)

            # Else just call the function without a lock
            except KeyError:
                return func(*args, **kwargs)

        return with_filelock

    @__filelock
    def create_all(self):
        """Create all database tables."""
        with DatabaseSessionManager.multiprocessing_lock:  # Handle concurrent table creation by different processes
            Base.metadata.create_all(bind=self._engine)

    @__filelock
    def drop_all(self):
        """Drop all database tables."""
        with DatabaseSessionManager.multiprocessing_lock:  # Handle concurrent table removal by different processes
            Base.metadata.drop_all(bind=self._engine)

    @classmethod
    def reraise_http_exception(cls, exception: Exception):
        """Re-raise all exceptions into HTTP exceptions."""

        # Raised exceptions are not always printed in the console, so do it manually with the stacktrace.
        logger.error(traceback.format_exc())

        if isinstance(exception, StarletteHTTPException):
            raise exception
        raise HTTPException(status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail=repr(exception))
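
A minimal usage sketch, assuming the POSTGRES_* environment variables are set and that the table model modules (e.g. the download status tables) have already been imported so that create_all() can find them:

from rs_server_common.db.database import DatabaseSessionManager

manager = DatabaseSessionManager()
manager.open_session()  # builds the engine from url() and creates the tables

with manager.session() as session:
    # run SQLAlchemy queries with the session here
    pass

manager.close()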

__filelock(func) staticmethod

Avoid concurrent writing to the database using a file lock.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
@staticmethod
def __filelock(func):
    """Avoid concurrent writing to the database using a file locK."""

    @wraps(func)
    def with_filelock(*args, **kwargs):
        """Wrap the the call to 'func' inside the lock."""

        # Let's do this only if the RSPY_WORKING_DIR environment variable is defined.
        # Write a .lock file inside this directory.
        try:
            with FileLock(Path(os.environ["RSPY_WORKING_DIR"]) / f"{__name__}.lock"):
                return func(*args, **kwargs)

        # Else just call the function without a lock
        except KeyError:
            return func(*args, **kwargs)

    return with_filelock

__init__()

Create a Database session configuration.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
def __init__(self):
    """Create a Database session configuration."""
    self._engine: Engine | None = None
    self._sessionmaker: sessionmaker | None = None

close()

Close database session.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
def close(self):
    """Close database session."""
    if self._engine is not None:
        self._engine.dispose()
        self._engine = None
    self._sessionmaker = None

connect()

Open new database connection instance.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
@contextlib.contextmanager
def connect(self) -> Iterator[Connection]:
    """Open new database connection instance."""

    if self._engine is None:
        raise RuntimeError("DatabaseSessionManager is not initialized")

    with self._engine.begin() as connection:
        try:
            yield connection

        # In case of any exception, rollback connection and re-raise into HTTP exception
        except Exception as exception:  # pylint: disable=broad-exception-caught
            connection.rollback()
            self.reraise_http_exception(exception)

create_all()

Create all database tables.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
@__filelock
def create_all(self):
    """Create all database tables."""
    with DatabaseSessionManager.multiprocessing_lock:  # Handle concurrent table creation by different processes
        Base.metadata.create_all(bind=self._engine)

drop_all()

Drop all database tables.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
@__filelock
def drop_all(self):
    """Drop all database tables."""
    with DatabaseSessionManager.multiprocessing_lock:  # Handle concurrent table removal by different processes
        Base.metadata.drop_all(bind=self._engine)

open_session(url='')

Open database session.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
def open_session(self, url: str = ""):
    """Open database session."""

    # If the 'self' object is used by several threads in the same process,
    # make sure to initialize the session only once.
    with DatabaseSessionManager.lock:
        if (self._engine is None) or (self._sessionmaker is None):
            self._engine = create_engine(url or self.url(), poolclass=NullPool, pool_pre_ping=True)
            self._sessionmaker = sessionmaker(autocommit=False, autoflush=False, bind=self._engine)

            try:
                # Create all tables.
                # Warning: this only works if the database table modules have been imported
                # e.g. import rs_server_adgs.adgs_download_status
                self.create_all()

            # It fails if the database is unreachable, but even in this case the engine and session are not None.
            # Set them to None so we will try to create all tables again on the next try.
            except Exception:
                self.close()
                raise

reraise_http_exception(exception) classmethod

Re-raise all exceptions into HTTP exceptions.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
@classmethod
def reraise_http_exception(cls, exception: Exception):
    """Re-raise all exceptions into HTTP exceptions."""

    # Raised exceptions are not always printed in the console, so do it manually with the stacktrace.
    logger.error(traceback.format_exc())

    if isinstance(exception, StarletteHTTPException):
        raise exception
    raise HTTPException(status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail=repr(exception))

session()

Open new database session instance.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
@contextlib.contextmanager
def session(self) -> Iterator[Session]:
    """Open new database session instance."""

    if self._sessionmaker is None:
        raise RuntimeError("DatabaseSessionManager is not initialized")

    session = self._sessionmaker()
    try:
        yield session

    # In case of any exception, rollback session and re-raise into HTTP exception
    except Exception as exception:  # pylint: disable=broad-exception-caught
        session.rollback()
        self.reraise_http_exception(exception)

    # Close session when deleting instance.
    finally:
        session.close()

url() classmethod

Get database connection URL.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
@classmethod
def url(cls):
    """Get database connection URL."""
    try:
        # pylint: disable=consider-using-f-string
        return os.getenv(
            "POSTGRES_URL",
            "postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}".format(
                user=os.environ["POSTGRES_USER"],
                password=os.environ["POSTGRES_PASSWORD"],
                host=os.environ["POSTGRES_HOST"],
                port=os.environ["POSTGRES_PORT"],
                dbname=os.environ["POSTGRES_DB"],
            ),
        )
    except KeyError as key_error:
        raise KeyError(
            "The PostgreSQL environment variables are missing: "
            "POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB",
        ) from key_error

get_db()

Return a database session for FastAPI dependency injection.

Source code in docs/rs-server/services/common/rs_server_common/db/database.py
def get_db():
    """Return a database session for FastAPI dependency injection."""
    try:
        with sessionmanager.session() as session:
            yield session

    # Re-raise all exceptions into HTTP exceptions
    except Exception as exception:  # pylint: disable=broad-exception-caught
        DatabaseSessionManager.reraise_http_exception(exception)
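
Usage sketch with a hypothetical endpoint: FastAPI injects a fresh session per request, and any error is rolled back and re-raised as an HTTP exception by the session manager:

from fastapi import APIRouter, Depends
from sqlalchemy import text
from sqlalchemy.orm import Session
from rs_server_common.db.database import get_db

router = APIRouter()

@router.get("/db-ping")  # hypothetical endpoint, for illustration only
def db_ping(db: Session = Depends(get_db)):
    return {"alive": db.execute(text("SELECT 1")).scalar() == 1}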

Logging utility.

CustomFormatter

Bases: Formatter

Custom logging formatter with colored text. See: https://stackoverflow.com/a/56944256

Source code in docs/rs-server/services/common/rs_server_common/utils/logging.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class CustomFormatter(logging.Formatter):
    """
    Custom logging formatter with colored text.
    See: https://stackoverflow.com/a/56944256
    """

    _RED = "\x1b[31m"
    _BOLD_RED = "\x1b[31;1m"
    _GREEN = "\x1b[32m"
    _YELLOW = "\x1b[33m"
    _PURPLE = "\x1b[35m"
    _RESET = "\x1b[0m"

    _FORMAT = f"%(asctime)s.%(msecs)03d [{{color}}%(levelname)s{_RESET}] (%(name)s) %(message)s"
    _DATETIME = "%H:%M:%S"

    _FORMATS = {
        logging.NOTSET: _FORMAT.format(color=""),
        logging.DEBUG: _FORMAT.format(color=_PURPLE),
        logging.INFO: _FORMAT.format(color=_GREEN),
        logging.WARNING: _FORMAT.format(color=_YELLOW),
        logging.ERROR: _FORMAT.format(color=_BOLD_RED),
        logging.CRITICAL: _FORMAT.format(color=_RED),
    }

    def format(self, record):
        level_format = self._FORMATS.get(record.levelno)
        formatter = logging.Formatter(level_format, self._DATETIME)
        return formatter.format(record)

Logging

Logging utility.

Attributes:

Name Type Description
lock

For code synchronization

level

Minimal log level to use for all new logging instances.

Source code in docs/rs-server/services/common/rs_server_common/utils/logging.py
class Logging:  # pylint: disable=too-few-public-methods
    """
    Logging utility.

    Attributes:
        lock: For code synchronization
        level: Minimal log level to use for all new logging instances.
    """

    lock = Lock()
    level = logging.DEBUG

    @classmethod
    def default(cls, name="rspy"):
        """
        Return a default Logger class instance.

        Args:
            name (str): Logger name. You can pass __name__ to use your current module name.
        """
        logger = logging.getLogger(name=name)

        with cls.lock:
            # Don't propagate to root logger
            logger.propagate = False

            # If we have already set the handlers for the logger with this name, do nothing more
            if logger.hasHandlers():
                return logger

            # Set the minimal log level to use for all new logging instances.
            logger.setLevel(cls.level)

            # Create console handler
            handler = logging.StreamHandler()
            handler.setFormatter(CustomFormatter())
            logger.addHandler(handler)

            # Export logs to Loki, see: https://pypi.org/project/python-logging-loki/
            # Note: on the cluster, this is not used. Promtail already forwards stdout to Loki.
            loki_endpoint = os.getenv("LOKI_ENDPOINT")
            if loki_endpoint and settings.SERVICE_NAME:
                handler = logging_loki.LokiQueueHandler(
                    Queue(-1),
                    url=loki_endpoint,
                    tags={"service": settings.SERVICE_NAME},
                    # auth=("username", "password"),
                    version="1",
                )
                handler.setFormatter(CustomFormatter())
                logger.addHandler(handler)

            return logger

default(name='rspy') classmethod

Return a default Logger class instance.

Parameters:

Name Type Description Default
name str

Logger name. You can pass __name__ to use your current module name.

'rspy'
Source code in docs/rs-server/services/common/rs_server_common/utils/logging.py
@classmethod
def default(cls, name="rspy"):
    """
    Return a default Logger class instance.

    Args:
        name (str): Logger name. You can pass __name__ to use your current module name.
    """
    logger = logging.getLogger(name=name)

    with cls.lock:
        # Don't propagate to root logger
        logger.propagate = False

        # If we have already set the handlers for the logger with this name, do nothing more
        if logger.hasHandlers():
            return logger

        # Set the minimal log level to use for all new logging instances.
        logger.setLevel(cls.level)

        # Create console handler
        handler = logging.StreamHandler()
        handler.setFormatter(CustomFormatter())
        logger.addHandler(handler)

        # Export logs to Loki, see: https://pypi.org/project/python-logging-loki/
        # Note: on the cluster, this is not used. Promtail already forwards stdout to Loki.
        loki_endpoint = os.getenv("LOKI_ENDPOINT")
        if loki_endpoint and settings.SERVICE_NAME:
            handler = logging_loki.LokiQueueHandler(
                Queue(-1),
                url=loki_endpoint,
                tags={"service": settings.SERVICE_NAME},
                # auth=("username", "password"),
                version="1",
            )
            handler.setFormatter(CustomFormatter())
            logger.addHandler(handler)

        return logger
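
Typical usage sketch: create one module-level logger per file and log as usual; the Loki handler is only added when the LOKI_ENDPOINT environment variable and a service name are set:

from rs_server_common.utils.logging import Logging

logger = Logging.default(__name__)
logger.info("Service started")   # colored console output
logger.debug("Verbose details")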

OpenTelemetry utility

init_traces(app, service_name)

Init instrumentation of OpenTelemetry traces.

Parameters:

Name Type Description Default
app FastAPI

FastAPI application

required
service_name str

service name

required
Source code in docs/rs-server/services/common/rs_server_common/utils/opentelemetry.py
def init_traces(app: fastapi.FastAPI, service_name: str):
    """
    Init instrumentation of OpenTelemetry traces.

    Args:
        app (fastapi.FastAPI): FastAPI application
        service_name (str): service name
    """

    # See: https://github.com/softwarebloat/python-tracing-demo/tree/main

    # Don't call this line from pytest because it causes errors:
    # Transient error StatusCode.UNAVAILABLE encountered while exporting metrics to localhost:4317, retrying in ..s.
    if not FROM_PYTEST:
        tempo_endpoint = os.getenv("TEMPO_ENDPOINT")
        if not tempo_endpoint:
            return

        # TODO: to avoid errors in local mode:
        # Transient error StatusCode.UNAVAILABLE encountered while exporting metrics to localhost:4317, retrying in ..s.
        #
        # The below line does not work either but at least we have less error messages.
        # See: https://pforge-exchange2.astrium.eads.net/jira/browse/RSPY-221?focusedId=162092&
        # page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-162092
        #
        # Now we have a single-line error, which is less bad:
        # Failed to export metrics to tempo:4317, error code: StatusCode.UNIMPLEMENTED
        os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = tempo_endpoint

    otel_resource = Resource(attributes={"service.name": service_name})
    otel_tracer = TracerProvider(resource=otel_resource)
    trace.set_tracer_provider(otel_tracer)

    if not FROM_PYTEST:
        otel_tracer.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=tempo_endpoint)))

    FastAPIInstrumentor.instrument_app(app, tracer_provider=otel_tracer)
    # logger.debug(f"OpenTelemetry instrumentation of 'fastapi.FastAPIInstrumentor'")

    # Instrument all the dependencies under opentelemetry.instrumentation.*
    # NOTE: we need 'poetry run opentelemetry-bootstrap -a install' to install these.

    package = opentelemetry.instrumentation
    prefix = package.__name__ + "."
    classes = set()

    # We need an empty PYTHONPATH if the env var is missing
    os.environ["PYTHONPATH"] = os.getenv("PYTHONPATH", "")

    # Recursively find all package modules
    for _, module_str, _ in pkgutil.walk_packages(path=package.__path__, prefix=prefix, onerror=None):

        # Don't instrument these modules, they have errors, maybe we should see why
        if module_str in ["opentelemetry.instrumentation.tortoiseorm"]:
            continue

        # Import and find all module classes
        __import__(module_str)
        for _, _class in inspect.getmembers(sys.modules[module_str]):
            if (not inspect.isclass(_class)) or (_class in classes):
                continue

            # Save the class (classes are found several times when imported by other modules)
            classes.add(_class)

            # Don't instrument these classes, they have errors, maybe we should see why
            if _class in [AsyncioInstrumentor, AwsLambdaInstrumentor, BaseInstrumentor]:
                continue

            # If the "instrument" method exists, call it
            _instrument = getattr(_class, "instrument", None)
            if callable(_instrument):

                _class_instance = _class()
                if not _class_instance.is_instrumented_by_opentelemetry:
                    _class_instance.instrument(tracer_provider=otel_tracer)
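
Usage sketch: instrument an application at startup. The service name below is hypothetical, and outside pytest the function returns without instrumenting anything when the TEMPO_ENDPOINT environment variable is not set:

import fastapi
from rs_server_common.utils.opentelemetry import init_traces

app = fastapi.FastAPI()
init_traces(app, "rs-server-example")  # "rs-server-example" is a placeholder name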

This module is used to share common functions between API endpoints.

EoDAGDownloadHandler dataclass

Dataclass to store arguments needed for eodag download.

Attributes:

Name Type Description
db_handler DownloadStatus

An instance used to access the database.

thread_started Event

Event to signal the start of the download thread.

station str

Station identifier (needed only for CADIP).

product_id str

Identifier of the product to be downloaded.

name str

Filename of the file to be downloaded.

local str | None

Local path where the product will be stored

obs str | None

Path to the S3 storage where the file will be uploaded

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
@dataclass
class EoDAGDownloadHandler:
    """Dataclass to store arguments needed for eodag download.

    Attributes:
        db_handler (DownloadStatus): An instance used to access the database.
        thread_started (threading.Event): Event to signal the start of the download thread.
        station (str): Station identifier (needed only for CADIP).
        product_id (str): Identifier of the product to be downloaded.
        name (str): Filename of the file to be downloaded.
        local (str | None): Local path where the product will be stored
        obs (str | None): Path to the S3 storage where the file will be uploaded
    """

    db_handler: DownloadStatus
    thread_started: threading.Event
    station: str  # needed only for CADIP
    product_id: str
    name: str
    local: str | None
    obs: str | None

create_stac_collection(products, feature_template, stac_mapper)

Creates a STAC feature collection based on a given template for a list of EOProducts.

Parameters:

Name Type Description Default
products List[EOProduct]

A list of EOProducts to create STAC features for.

required
feature_template dict

The template for generating STAC features.

required
stac_mapper dict

The mapping dictionary for converting EOProduct data to STAC properties.

required

Returns:

Name Type Description
dict dict

The STAC feature collection containing features for each EOProduct.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
def create_stac_collection(products: List[EOProduct], feature_template: dict, stac_mapper: dict) -> dict:
    """
    Creates a STAC feature collection based on a given template for a list of EOProducts.

    Args:
        products (List[EOProduct]): A list of EOProducts to create STAC features for.
        feature_template (dict): The template for generating STAC features.
        stac_mapper (dict): The mapping dictionary for converting EOProduct data to STAC properties.

    Returns:
        dict: The STAC feature collection containing features for each EOProduct.
    """
    stac_template: Dict[Any, Any] = {
        "type": "FeatureCollection",
        "numberMatched": 0,
        "numberReturned": 0,
        "features": [],
    }
    for product in products:
        product_data = extract_eo_product(product, stac_mapper)
        feature_tmp = odata_to_stac(copy.deepcopy(feature_template), product_data, stac_mapper)
        stac_template["numberMatched"] += 1
        stac_template["numberReturned"] += 1
        stac_template["features"].append(feature_tmp)
    return stac_template

eodag_download(argument, db, init_provider, **kwargs)

Initiates the eodag download process.

Parameters:

Name Type Description Default
argument EoDAGDownloadHandler

An instance of EoDAGDownloadHandler containing the arguments used in the downloading process.

required
db

The database connection object.

init_provider Callable[[str], Provider]

A function to initialize the provider for downloading.

**kwargs

Additional keyword arguments.

Note

The local and obs parameters are optional:
  • local (str | None): Local path where the product will be stored. If this parameter is not given, the local path where the file is stored will be set to a temporary one.
  • obs (str | None): Path to S3 storage where the file will be uploaded, after a successful download from the CADIP server. If this parameter is not given, the file will not be uploaded to the S3 storage.

Raises:

Type Description
RuntimeError

If there is an issue connecting to the S3 storage during the download.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
def eodag_download(argument: EoDAGDownloadHandler, db, init_provider: Callable[[str], Provider], **kwargs):
    """Initiates the eodag download process.

    Args:
        argument (EoDAGDownloadHandler): An instance of EoDAGDownloadHandler containing the arguments used in the
    downloading process.
        db: The database connection object.
        init_provider (Callable[[str], Provider]): A function to initialize the provider for downloading.
        **kwargs: Additional keyword arguments.

    Note:
        The local and obs parameters are optional:
        - local (str | None): Local path where the product will be stored. If this
            parameter is not given, the local path where the file is stored will be set to a temporary one.
        - obs (str | None): Path to S3 storage where the file will be uploaded, after a successful download from CADIP
            server. If this parameter is not given, the file will not be uploaded to the S3 storage.

    Raises:
        RuntimeError: If there is an issue connecting to the S3 storage during the download.
    """

    # Open a database sessions in this thread, because the session from the root thread may have closed.
    # Get the product download status

    db_product = argument.db_handler.get(db, name=argument.name)
    # init eodag object
    try:
        logger.debug(
            "%s : %s : %s: Thread started !",
            os.getpid(),
            threading.get_ident(),
            datetime.now(),
        )

        setup_logging(3, no_progress_bar=True)
        # tempfile to be used here

        # Update the status to IN_PROGRESS in the database
        db_product.in_progress(db)
        local = kwargs["default_path"] if not argument.local else argument.local
        # notify the main thread that the download will be started
        # To be discussed: init_provider may fail, but at the same time it takes too long
        # when properly initialized, and the timeout for the download endpoint response is exceeded
        argument.thread_started.set()
        provider = init_provider(argument.station)
        init = datetime.now()
        filename = Path(local) / argument.name
        provider.download(argument.product_id, filename)
        logger.info(
            "%s : %s : File: %s downloaded in %s",
            os.getpid(),
            threading.get_ident(),
            argument.name,
            datetime.now() - init,
        )
    except Exception as exception:  # pylint: disable=broad-exception-caught
        # Pylint disabled since error is logged here.
        logger.error(
            "%s : %s : %s: Exception caught: %s",
            os.getpid(),
            threading.get_ident(),
            datetime.now(),
            exception,
        )

        # Try n times to update the status to FAILED in the database
        update_db(db, db_product, EDownloadStatus.FAILED, repr(exception))
        return

    if argument.obs:
        try:
            # NOTE: The environment variables have to be set from outside
            # otherwise the connection with the s3 endpoint fails
            # TODO: the secrets should be set through env vars
            # pylint: disable=pointless-string-statement
            """
            secrets = {
                "s3endpoint": None,
                "accesskey": None,
                "secretkey": None,
            }
            S3StorageHandler.get_secrets_from_file(secrets, "/home/" + os.environ["USER"] + "/.s3cfg")
            os.environ["S3_ACCESSKEY"] = secrets["accesskey"]
            os.environ["S3_SECRETKEY"] = secrets["secretkey"]
            os.environ["S3_ENDPOINT"] = secrets["s3endpoint"]
            os.environ["S3_REGION"] = "sbg"
            """
            s3_handler = S3StorageHandler(
                os.environ["S3_ACCESSKEY"],
                os.environ["S3_SECRETKEY"],
                os.environ["S3_ENDPOINT"],
                os.environ["S3_REGION"],  # "sbg",
            )
            obs_array = argument.obs.split("/")  # s3://bucket/path/to
            s3_config = PutFilesToS3Config(
                [str(filename)],
                obs_array[2],
                "/".join(obs_array[3:]),
            )
            s3_handler.put_files_to_s3(s3_config)
        except (RuntimeError, KeyError) as e:
            logger.exception(f"Could not connect to the s3 storage: {e}")
            # Try n times to update the status to FAILED in the database
            update_db(
                db,
                db_product,
                EDownloadStatus.FAILED,
                "Could not connect to the s3 storage",
            )
            return
        except Exception as e:  # pylint: disable=broad-except
            logger.exception(f"General exception: {e}")
            return
        finally:
            os.remove(filename)

    # Try n times to update the status to DONE in the database
    update_db(db, db_product, EDownloadStatus.DONE)
    logger.debug("Download finished succesfully for %s", db_product.name)

extract_eo_product(eo_product, mapper)

This function creates key:value pairs from an EOProduct's properties.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
def extract_eo_product(eo_product: EOProduct, mapper: dict) -> dict:
    """This function is creating key:value pairs from an EOProduct properties"""
    return {key: value for key, value in eo_product.properties.items() if key in mapper.values()}

is_valid_date_format(date)

Check if a string adheres to the expected date format "YYYY-MM-DDTHH:MM:SS.sssZ".

Parameters:

Name Type Description Default
date str

The string to be validated for the specified date format.

required

Returns:

Name Type Description
bool bool

True if the input string adheres to the expected date format, otherwise False.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
def is_valid_date_format(date: str) -> bool:
    """Check if a string adheres to the expected date format "YYYY-MM-DDTHH:MM:SS.sssZ".

    Args:
        date (str): The string to be validated for the specified date format.

    Returns:
        bool: True if the input string adheres to the expected date format, otherwise False.

    """
    try:
        datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
        return True
    except ValueError:
        return False
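
Quick illustration of the accepted format (the strptime pattern above expects whole seconds and a trailing "Z"):

from rs_server_common.utils.utils import is_valid_date_format

is_valid_date_format("2024-01-01T00:00:00Z")  # True
is_valid_date_format("2024-01-01 00:00:00")   # False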

odata_to_stac(feature_template, odata_dict, odata_stac_mapper)

Maps OData values to a given STAC template.

Parameters:

Name Type Description Default
feature_template dict

The STAC feature template to be populated.

required
odata_dict dict

The dictionary containing OData values.

required
odata_stac_mapper dict

The mapping dictionary for converting OData keys to STAC properties.

required

Returns:

Name Type Description
dict dict

The populated STAC feature template.

Raises:

Type Description
ValueError

If the provided STAC feature template is invalid.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
def odata_to_stac(feature_template: dict, odata_dict: dict, odata_stac_mapper: dict) -> dict:
    """
    Maps OData values to a given STAC template.

    Args:
        feature_template (dict): The STAC feature template to be populated.
        odata_dict (dict): The dictionary containing OData values.
        odata_stac_mapper (dict): The mapping dictionary for converting OData keys to STAC properties.

    Returns:
        dict: The populated STAC feature template.

    Raises:
        ValueError: If the provided STAC feature template is invalid.
    """
    if not all(item in feature_template.keys() for item in ["properties", "id", "assets"]):
        raise ValueError("Invalid stac feature template")
    for stac_key, eodag_key in odata_stac_mapper.items():
        if eodag_key in odata_dict:
            if stac_key in feature_template["properties"]:
                feature_template["properties"][stac_key] = odata_dict[eodag_key]
            elif stac_key == "id":
                feature_template["id"] = odata_dict[eodag_key]
            elif stac_key == "file:size":
                feature_template["assets"]["file"][stac_key] = odata_dict[eodag_key]
    return feature_template
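
A small runnable sketch with made-up values illustrates the three mapping branches (a regular property, the "id" key and the "file:size" asset field):

from rs_server_common.utils.utils import odata_to_stac

feature_template = {"id": "", "properties": {"datetime": None}, "assets": {"file": {}}}
odata_dict = {
    "Id": "S1A_EXAMPLE_0001",
    "PublicationDate": "2024-01-01T00:00:00Z",
    "ContentLength": 1024,
}
stac_mapper = {"id": "Id", "datetime": "PublicationDate", "file:size": "ContentLength"}

feature = odata_to_stac(feature_template, odata_dict, stac_mapper)
# feature["id"] == "S1A_EXAMPLE_0001"
# feature["properties"]["datetime"] == "2024-01-01T00:00:00Z"
# feature["assets"]["file"]["file:size"] == 1024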

sort_feature_collection(feature_collection, sortby)

Sorts a STAC feature collection based on a given criteria.

Parameters:

Name Type Description Default
feature_collection dict

The STAC feature collection to be sorted.

required
sortby str

The sorting criteria. Use "+fieldName" for ascending order or "-fieldName" for descending order. Use "+doNotSort" to skip sorting.

required

Returns:

Name Type Description
dict dict

The sorted STAC feature collection.

Note

If sortby is not in the format of "+fieldName" or "-fieldName", the function defaults to ascending order by the "datetime" field.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
def sort_feature_collection(feature_collection: dict, sortby: str) -> dict:
    """
    Sorts a STAC feature collection based on a given criteria.

    Args:
        feature_collection (dict): The STAC feature collection to be sorted.
        sortby (str): The sorting criteria. Use "+fieldName" for ascending order
            or "-fieldName" for descending order. Use "+doNotSort" to skip sorting.

    Returns:
        dict: The sorted STAC feature collection.

    Note:
        If sortby is not in the format of "+fieldName" or "-fieldName",
        the function defaults to ascending order by the "datetime" field.
    """
    # Force default sorting even if the input is invalid, don't block the return collection because of sorting.
    if sortby != "+doNotSort":
        order = sortby[0]
        if order not in ["+", "-"]:
            order = "+"

        if len(feature_collection["features"]) and "properties" in feature_collection["features"][0]:
            field = sortby[1:]
            by = "datetime" if field not in feature_collection["features"][0]["properties"].keys() else field
            feature_collection["features"] = sorted(
                feature_collection["features"],
                key=lambda feature: feature["properties"][by],
                reverse=order == "-",
            )
    return feature_collection
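
Minimal illustration with made-up features, sorted newest first:

from rs_server_common.utils.utils import sort_feature_collection

collection = {
    "features": [
        {"properties": {"datetime": "2024-01-02T00:00:00Z"}},
        {"properties": {"datetime": "2024-01-01T00:00:00Z"}},
    ],
}
newest_first = sort_feature_collection(collection, "-datetime")
# newest_first["features"][0]["properties"]["datetime"] == "2024-01-02T00:00:00Z"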

update_db(db, db_product, estatus, status_fail_message=None)

Update the download status of a product in the database.

This function attempts to update the download status of a product in the database. It retries the update operation for a maximum of three times, waiting 1 second between attempts.

Parameters:

Name Type Description Default
db Session

The database session.

required
db_product DownloadStatus

The product whose status needs to be updated.

required
estatus EDownloadStatus

The new download status.

required
status_fail_message Optional[str]

An optional message associated with the failure status.

None

Raises:

Type Description
OperationalError(exc)

If the database update operation fails after multiple attempts.

Example

update_db(db_session, product_instance, EDownloadStatus.DONE)

Note
  • This function is designed to update the download status in the database.
  • It retries the update operation for a maximum of three times.
  • If the update fails, an exception is raised, indicating an issue with the database.
Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
def update_db(
    db: sqlalchemy.orm.Session,
    db_product: DownloadStatus,
    estatus: EDownloadStatus,
    status_fail_message=None,
):
    """Update the download status of a product in the database.

    This function attempts to update the download status of a product in the database.
    It retries the update operation for a maximum of three times, waiting 1 second between attempts.

    Args:
        db (sqlalchemy.orm.Session): The database session.
        db_product (DownloadStatus): The product whose status needs to be updated.
        estatus (EDownloadStatus): The new download status.
        status_fail_message (Optional[str]): An optional message associated with the failure status.

    Raises:
        OperationalError (sqlalchemy.exc): If the database update operation fails after multiple attempts.

    Example:
        >>> update_db(db_session, product_instance, EDownloadStatus.DONE)

    Note:
        - This function is designed to update the download status in the database.
        - It retries the update operation for a maximum of three times.
        - If the update fails, an exception is raised, indicating an issue with the database.

    """
    # Try n times to update the status.
    # Don't do it for NOT_STARTED and IN_PROGRESS (call directly db_product.not_started
    # or db_product.in_progress) because it will anyway be overwritten later by DONE or FAILED.

    # Init last exception to empty value.
    last_exception: Exception = Exception()

    for _ in range(3):
        try:
            if estatus == EDownloadStatus.FAILED:
                db_product.failed(db, status_fail_message)
            elif estatus == EDownloadStatus.DONE:
                db_product.done(db)

            # The database update worked, exit function
            return

        # The database update failed, wait n seconds and retry
        except sqlalchemy.exc.OperationalError as exception:
            logger.error(f"Error updating status in database:\n{exception}")
            last_exception = exception
            time.sleep(1)

    # If all attempts failed, raise the last Exception
    raise last_exception

validate_inputs_format(interval)

Validate the format of the input time interval.

This function checks whether the input interval has a valid format (start_date/stop_date) and whether the start and stop dates are in a valid ISO 8601 format.

Parameters:

Name Type Description Default
interval str

The time interval to be validated, with the following format: "2024-01-01T00:00:00Z/2024-01-02T23:59:59Z"

required

Returns:

Type Description
Tuple[Union[None, datetime], Union[None, datetime]]

A tuple containing:
  • start_date (datetime): The start date of the interval.
  • stop_date (datetime): The stop date of the interval.
Or [None, None] if the provided interval is empty.

Note
  • The input interval should be in the format "start_date/stop_date" (e.g., "2022-01-01T00:00:00Z/2022-01-02T00:00:00Z").
  • This function checks for missing start/stop and validates the ISO 8601 format of start and stop dates.
  • If there is an error, err_code and err_text provide information about the issue.
Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
def validate_inputs_format(interval: str) -> Tuple[Union[None, datetime], Union[None, datetime]]:
    """
    Validate the format of the input time interval.

    This function checks whether the input interval has a valid format (start_date/stop_date) and
    whether the start and stop dates are in a valid ISO 8601 format.

    Args:
        interval (str): The time interval to be validated, with the following format:
            "2024-01-01T00:00:00Z/2024-01-02T23:59:59Z"

    Returns:
        Tuple[Union[None, datetime], Union[None, datetime]]:
            A tuple containing:
            - start_date (datetime): The start date of the interval.
            - stop_date (datetime): The stop date of the interval.
        Or [None, None] if the provided interval is empty.

    Note:
        - The input interval should be in the format "start_date/stop_date"
        (e.g., "2022-01-01T00:00:00Z/2022-01-02T00:00:00Z").
        - This function checks for missing start/stop and validates the ISO 8601 format of start and stop dates.
        - If there is an error, err_code and err_text provide information about the issue.
    """
    if not interval:
        return None, None
    try:
        start_date, stop_date = interval.split("/")
    except ValueError as exc:
        logger.error("Missing start or stop in endpoint call!")
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Missing start/stop") from exc
    if (not is_valid_date_format(start_date)) or (not is_valid_date_format(stop_date)):
        logger.error("Invalid start/stop in endpoint call!")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Missing start/stop")

    return datetime.fromisoformat(start_date), datetime.fromisoformat(stop_date)
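
Usage sketch: a well-formed interval yields the two datetime objects, an empty string yields [None, None], and anything else raises an HTTPException:

from rs_server_common.utils.utils import validate_inputs_format

start, stop = validate_inputs_format("2024-01-01T00:00:00Z/2024-01-02T23:59:59Z")
empty_start, empty_stop = validate_inputs_format("")  # both are None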

write_search_products_to_db(db_handler_class, products)

Processes a list of products by adding them to the database if not already present.

This function iterates over a list of products. For each product, it checks whether the product is already registered in the database. If the product is not in the database, it is added with its relevant details. The function collects a list of product IDs and names for further processing.

Parameters:

Name Type Description Default
db_handler_class DownloadStatus

The database handler class used for database operations.

required
products List[Product]

A list of product objects to be processed.

required

Returns:

Name Type Description
products List[Tuple[str, str]]

A list of tuples, each containing the 'id' and 'Name' properties of a product.

Raises:

Type Description
OperationalError

If there's an issue connecting to the database.

Notes: The function assumes that 'products' is a list of objects with a 'properties' attribute, which is a dictionary containing keys 'id', 'Name', and 'startTimeFromAscendingNode'.

'get_db' is a context manager that provides a database session.

'EDownloadStatus' is an enumeration representing download status.

Source code in docs/rs-server/services/common/rs_server_common/utils/utils.py
def write_search_products_to_db(db_handler_class: DownloadStatus, products: EOProduct) -> None:
    """
    Processes a list of products by adding them to the database if not already present.

    This function iterates over a list of products. For each product, it checks whether the product
    is already registered in the database. If the product is not in the database, it is added with
    its relevant details. The function collects a list of product IDs and names for further processing.

    Args:
        db_handler_class (DownloadStatus): The database handler class used for database operations.
        products (List[Product]): A list of product objects to be processed.

    Returns:
        products (List[Tuple[str, str]]): A list of tuples, each containing the 'id' and 'Name' properties of a product.

    Raises:
        sqlalchemy.exc.OperationalError: If there's an issue connecting to the database.

    Notes:
    The function assumes that 'products' is a list of objects with a 'properties' attribute,
    which is a dictionary containing keys 'id', 'Name', and 'startTimeFromAscendingNode'.

    'get_db' is a context manager that provides a database session.

    'EDownloadStatus' is an enumeration representing download status.
    """
    with contextmanager(get_db)() as db:
        try:
            for product in products:
                if db_handler_class.get_if_exists(db, product.properties["Name"]) is not None:
                    logger.info(
                        "Product %s is already registered in database, skipping",
                        product.properties["Name"],
                    )
                    continue

                db_handler_class.create(
                    db,
                    product_id=product.properties["id"],
                    name=product.properties["Name"],
                    available_at_station=datetime.fromisoformat(product.properties["startTimeFromAscendingNode"]),
                    status=EDownloadStatus.NOT_STARTED,
                )

        except sqlalchemy.exc.OperationalError:
            logger.error("Failed to connect with DB during listing procedure")
            raise
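
Example (illustrative): a minimal sketch of how this helper might be called after an EODAG search. The CadipDownloadStatus import and the provider object are placeholders chosen for illustration, not definitions from this page.

from rs_server_cadip.cadip_download_status import CadipDownloadStatus  # assumed DownloadStatus subclass
from rs_server_common.utils.utils import write_search_products_to_db

# 'provider' is assumed to be an EodagProvider (see below); its search results
# carry 'id', 'Name' and 'startTimeFromAscendingNode' properties.
products = provider.search(time_range)
write_search_products_to_db(CadipDownloadStatus, products)  # returns None; new rows are created as NOT_STARTED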

EODAG Provider.

EodagProvider

Bases: Provider

An EODAG provider.

It uses EODAG to provide data from external sources.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
class EodagProvider(Provider):
    """An EODAG provider.

    It uses EODAG to provide data from external sources.
    """

    lock = Lock()  # static Lock instance

    def __init__(self, config_file: Path, provider: str):
        """Create a EODAG provider.

        Args:
            config_file: the path to the eodag configuration file
            provider: the name of the eodag provider
        """
        self.eodag_cfg_dir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
        self.provider: str = provider
        self.config_file = config_file
        self.client: EODataAccessGateway = self.init_eodag_client(config_file)
        self.client.set_preferred_provider(self.provider)

    def __del__(self):
        """Destructor"""
        try:
            shutil.rmtree(self.eodag_cfg_dir.name)  # remove the unique /tmp dir
        except FileNotFoundError:
            pass

    def init_eodag_client(self, config_file: Path) -> EODataAccessGateway:
        """Initialize the eodag client.

        The EODAG client is initialized for the given provider.

        Args:
            config_file: the path to the eodag configuration file

        Returns:
             the initialized eodag client
        """
        try:
            # Use thread-lock
            with EodagProvider.lock:
                os.environ["EODAG_CFG_DIR"] = self.eodag_cfg_dir.name
                return EODataAccessGateway(config_file.as_posix())
        except Exception as e:
            raise CreateProviderFailed(f"Can't initialize {self.provider} provider") from e

    def _specific_search(self, between: TimeRange, **kwargs) -> Union[SearchResult, List]:
        """
        Conducts a search for products within a specified time range.

        This private method interfaces with the client's search functionality,
        retrieving products that fall within the given time range. The 'between'
        parameter is expected to be a TimeRange object, encompassing start and end
        timestamps. The method returns a dictionary of products keyed by their
        respective identifiers.

        Args:
            between (TimeRange): An object representing the start and end timestamps
                                for the search range.

        Returns:
            SearchResult: A dictionary where keys are product identifiers and
                            values are EOProduct instances.

        Note:
            The time format of the 'between' parameter should be verified or formatted
            appropriately before invoking this method. The method also assumes that the
            client's search function is correctly set up to handle the provided time
            range format.

        Raises:
            Exception: If the search encounters an error or fails, an exception is raised.
        """
        mapped_search_args = {}
        sessions_search = kwargs.pop("sessions_search", False)

        session_id = kwargs.pop("id", None)
        if session_id:
            # If request contains session id, map it to eodag parameter accordingly (SessionID for single, Ids for list)
            if isinstance(session_id, list):
                mapped_search_args["SessionIds"] = ", ".join(session_id)
            elif isinstance(session_id, str):
                mapped_search_args["SessionID"] = session_id

        if sessions_search:
            # If request is for session search, handle platform - if any provided.
            platform = kwargs.pop("platform", None)

            # Very annoying, for files odata is **SessionID**, for sessions is **SessionId**
            if "SessionID" in mapped_search_args:
                mapped_search_args["SessionId"] = mapped_search_args.pop("SessionID")
            if platform:
                if isinstance(platform, list):
                    mapped_search_args["platforms"] = ", ".join(platform)
                elif isinstance(platform, str):
                    mapped_search_args["platform"] = platform

        if between:
            # Since now both for files and sessions, time interval is optional, map it if provided.
            mapped_search_args.update(
                {
                    "startTimeFromAscendingNode": str(between.start),
                    "completionTimeFromAscendingNode": str(between.end),
                },
            )

        try:
            # Start search -> user defined search params in mapped_search_args (id), pagination in kwargs (top, limit).
            products, _ = self.client.search(
                **mapped_search_args,  # type: ignore
                provider=self.provider,
                raise_errors=True,
                **kwargs,
            )
        except RequestError:
            # Empty list if something goes wrong in eodag
            return []

        return products

    def download(self, product_id: str, to_file: Path) -> None:
        """Download the expected product at the given local location.

        EODAG needs an EOProduct to download.
        We build an EOProduct from the id and download location
        to be able to call EODAG for download.


        Args:
            product_id: the id of the product to download
            to_file: the path where the product has to be downloaded

        Returns:
            None

        """
        product = self.create_eodag_product(product_id, to_file.name)
        # download_plugin = self.client._plugins_manager.get_download_plugin(product)
        # authent_plugin = self.client._plugins_manager.get_auth_plugin(product.provider)
        # product.register_downloader(download_plugin, authent_plugin)
        self.client.download(product, outputs_prefix=to_file.parent)

    def create_eodag_product(self, product_id: str, filename: str):
        """Initialize an EO product with minimal properties.

        The title is used by EODAG as the name of the downloaded file.
        The download link is used by EODAG as http request url for download.
        The geometry is mandatory in an EO Product so we add the whole Earth as geometry.

        Args:
            product_id (str): the id of EO Product
            filename (str): the name of the downloaded file

        Returns:
            product (EOProduct): the initialized EO Product

        """
        try:
            with open(self.config_file, "r", encoding="utf-8") as f:
                base_uri = yaml.safe_load(f)[self.provider.lower()]["download"]["base_uri"]
            return EOProduct(
                self.provider,
                {
                    "id": product_id,
                    "title": filename,
                    "geometry": "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))",
                    # TODO build from configuration (but how ?)
                    "downloadLink": f"{base_uri}({product_id})/$value",
                },
            )
        except Exception as e:
            raise CreateProviderFailed(f"Can't initialize {self.provider} download provider") from e
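
Example (illustrative): a minimal sketch of creating an EodagProvider, searching a time range and downloading the results. The configuration file path and provider name are placeholders; the 'id' and 'Name' properties mirror the usage shown elsewhere on this page.

from datetime import datetime
from pathlib import Path

provider = EodagProvider(Path("config/cadip_ws_config.yaml"), "cadip")  # placeholder config and provider name
results = provider.search(TimeRange(datetime(2023, 1, 1), datetime(2023, 1, 2)))
for product in results:
    # download each product under /tmp, using its 'Name' property as the file name
    provider.download(product.properties["id"], Path("/tmp") / product.properties["Name"])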

__del__()

Destructor

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
def __del__(self):
    """Destructor"""
    try:
        shutil.rmtree(self.eodag_cfg_dir.name)  # remove the unique /tmp dir
    except FileNotFoundError:
        pass

__init__(config_file, provider)

Create an EODAG provider.

Parameters:

Name Type Description Default
config_file Path

the path to the eodag configuration file

required
provider str

the name of the eodag provider

required
Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
def __init__(self, config_file: Path, provider: str):
    """Create a EODAG provider.

    Args:
        config_file: the path to the eodag configuration file
        provider: the name of the eodag provider
    """
    self.eodag_cfg_dir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
    self.provider: str = provider
    self.config_file = config_file
    self.client: EODataAccessGateway = self.init_eodag_client(config_file)
    self.client.set_preferred_provider(self.provider)

create_eodag_product(product_id, filename)

Initialize an EO product with minimal properties.

The title is used by EODAG as the name of the downloaded file. The download link is used by EODAG as the HTTP request URL for the download. The geometry is mandatory in an EO Product, so we add the whole Earth as geometry.

Parameters:

Name Type Description Default
product_id str

the id of EO Product

required
filename str

the name of the downloaded file

required

Returns:

Name Type Description
product EOProduct

the initialized EO Product

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
def create_eodag_product(self, product_id: str, filename: str):
    """Initialize an EO product with minimal properties.

    The title is used by EODAG as the name of the downloaded file.
    The download link is used by EODAG as http request url for download.
    The geometry is mandatory in an EO Product so we add the whole Earth as geometry.

    Args:
        product_id (str): the id of EO Product
        filename (str): the name of the downloaded file

    Returns:
        product (EOProduct): the initialized EO Product

    """
    try:
        with open(self.config_file, "r", encoding="utf-8") as f:
            base_uri = yaml.safe_load(f)[self.provider.lower()]["download"]["base_uri"]
        return EOProduct(
            self.provider,
            {
                "id": product_id,
                "title": filename,
                "geometry": "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))",
                # TODO build from configuration (but how ?)
                "downloadLink": f"{base_uri}({product_id})/$value",
            },
        )
    except Exception as e:
        raise CreateProviderFailed(f"Can't initialize {self.provider} download provider") from e

download(product_id, to_file)

Download the expected product at the given local location.

EODAG needs an EOProduct to download. We build an EOProduct from the id and download location to be able to call EODAG for download.

Parameters:

Name Type Description Default
product_id str

the id of the product to download

required
to_file Path

the path where the product has to be downloaded

required

Returns:

Type Description
None

None

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
def download(self, product_id: str, to_file: Path) -> None:
    """Download the expected product at the given local location.

    EODAG needs an EOProduct to download.
    We build an EOProduct from the id and download location
    to be able to call EODAG for download.


    Args:
        product_id: the id of the product to download
        to_file: the path where the product has to be downloaded

    Returns:
        None

    """
    product = self.create_eodag_product(product_id, to_file.name)
    # download_plugin = self.client._plugins_manager.get_download_plugin(product)
    # authent_plugin = self.client._plugins_manager.get_auth_plugin(product.provider)
    # product.register_downloader(download_plugin, authent_plugin)
    self.client.download(product, outputs_prefix=to_file.parent)

init_eodag_client(config_file)

Initialize the eodag client.

The EODAG client is initialized for the given provider.

Parameters:

Name Type Description Default
config_file Path

the path to the eodag configuration file

required

Returns:

Type Description
EODataAccessGateway

the initialized eodag client

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
def init_eodag_client(self, config_file: Path) -> EODataAccessGateway:
    """Initialize the eodag client.

    The EODAG client is initialized for the given provider.

    Args:
        config_file: the path to the eodag configuration file

    Returns:
         the initialized eodag client
    """
    try:
        # Use thread-lock
        with EodagProvider.lock:
            os.environ["EODAG_CFG_DIR"] = self.eodag_cfg_dir.name
            return EODataAccessGateway(config_file.as_posix())
    except Exception as e:
        raise CreateProviderFailed(f"Can't initialize {self.provider} provider") from e

Provider mechanism.

CreateProviderFailed

Bases: Exception

Exception raised when an error occurred during the init of a provider.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
class CreateProviderFailed(Exception):
    """Exception raised when an error occurred during the init of a provider."""

DownloadProductFailed

Bases: Exception

Exception raised when an error occurred during the download.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
class DownloadProductFailed(Exception):
    """Exception raised when an error occurred during the download."""

Product dataclass

A product.

A product has an external identifier and a dictionary of metadata.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
@dataclass
class Product:
    """A product.

    A product has an external identifier and a dictionary of metadata.
    """

    id_: str
    metadata: dict[str, str]

Provider

Bases: ABC

A product provider.

A provider gives a common interface to search for files from an external data source and download them locally.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
class Provider(ABC):
    """A product provider.

    A provider gives a common interface to search for files from an external data source
    and download them locally.
    """

    def search(self, between: TimeRange, **kwargs) -> Any:
        """Search for products with the given time range.

        The search result is a dictionary of products found indexed by id.

        Args:
            between: the search period

        Returns:
            The files found indexed by file id. Specific to each provider.

        """
        if between:
            if between.duration() == timedelta(0):
                return []
            if between.duration() < timedelta(0):
                raise SearchProductFailed(f"Search timerange is inverted : ({between.start} -> {between.end})")
        return self._specific_search(between, **kwargs)

    @abstractmethod
    def _specific_search(self, between: TimeRange) -> Any:
        """Search for products with the given time range.

        Specific search for products after common verification.

        Args:
            between: the search period

        Returns:
            the files found indexed by file id.

        """

    @abstractmethod
    def download(self, product_id: str, to_file: Path) -> None:
        """Download the given product to the given local path.

        Args:
            product_id: id of the product to download
            to_file: path where the file should be downloaded

        Returns:
            None

        """

download(product_id, to_file) abstractmethod

Download the given product to the given local path.

Parameters:

Name Type Description Default
product_id str

id of the product to download

required
to_file Path

path where the file should be downloaded

required

Returns:

Type Description
None

None

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
@abstractmethod
def download(self, product_id: str, to_file: Path) -> None:
    """Download the given product to the given local path.

    Args:
        product_id: id of the product to download
        to_file: path where the file should be downloaded

    Returns:
        None

    """

search(between, **kwargs)

Search for products with the given time range.

The search result is a dictionary of products found indexed by id.

Parameters:

Name Type Description Default
between TimeRange

the search period

required

Returns:

Type Description
Any

The files found indexed by file id. Specific to each provider.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
def search(self, between: TimeRange, **kwargs) -> Any:
    """Search for products with the given time range.

    The search result is a dictionary of products found indexed by id.

    Args:
        between: the search period

    Returns:
        The files found indexed by file id. Specific to each provider.

    """
    if between:
        if between.duration() == timedelta(0):
            return []
        if between.duration() < timedelta(0):
            raise SearchProductFailed(f"Search timerange is inverted : ({between.start} -> {between.end})")
    return self._specific_search(between, **kwargs)

SearchProductFailed

Bases: Exception

Exception raised when an error occurred during the search.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
class SearchProductFailed(Exception):
    """Exception raised when an error occurred during the search."""

TimeRange dataclass

A time range.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
@dataclass
class TimeRange:
    """A time range."""

    start: datetime
    end: datetime

    def duration(self) -> timedelta:
        """Duration of the timerange.

        Returns: duration of the timerange
        """
        return self.end - self.start

    def __bool__(self) -> bool:
        return self.start is not None and self.end is not None

duration()

Duration of the timerange.

Returns: duration of the timerange

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/provider.py
def duration(self) -> timedelta:
    """Duration of the timerange.

    Returns: duration of the timerange
    """
    return self.end - self.start
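
Example (illustrative): TimeRange is falsy when either bound is None, which is how Provider.search decides whether to apply the empty/inverted range checks.

from datetime import datetime, timedelta

full = TimeRange(datetime(2023, 1, 1), datetime(2023, 1, 2))
assert full.duration() == timedelta(days=1)
assert bool(full) is True

open_ended = TimeRange(datetime(2023, 1, 1), None)  # an open bound, represented here by None
assert bool(open_ended) is False  # Provider.search skips the duration checks and forwards it as-is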

TODO Docstring to be added.

GetKeysFromS3Config dataclass

S3 configuration for download

Attributes:

Name Type Description
s3_files list

A list with the S3 object keys to be downloaded.

bucket str

The S3 bucket name.

local_prefix str

The local prefix where files will be downloaded.

overwrite bool

Flag indicating whether to overwrite existing files. Default is False.

max_retries int

The maximum number of download retries. Default is DWN_S3FILE_RETRIES.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
@dataclass
class GetKeysFromS3Config:
    """S3 configuration for download

    Attributes:
        s3_files (list): A list with the  S3 object keys to be downloaded.
        bucket (str): The S3 bucket name.
        local_prefix (str): The local prefix where files will be downloaded.
        overwrite (bool, optional): Flag indicating whether to overwrite existing files. Default is False.
        max_retries (int, optional): The maximum number of download retries. Default is DWN_S3FILE_RETRIES.

    """

    s3_files: list
    bucket: str
    local_prefix: str
    overwrite: bool = False
    max_retries: int = DWN_S3FILE_RETRIES

PutFilesToS3Config dataclass

Configuration for uploading files to S3.

Attributes:

Name Type Description
files List

A list with the local file paths to be uploaded.

bucket str

The S3 bucket name.

s3_path str

The S3 path where files will be uploaded.

max_retries int

The maximum number of upload retries. Default is UP_S3FILE_RETRIES.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
@dataclass
class PutFilesToS3Config:
    """Configuration for uploading files to S3.

    Attributes:
        files (List): A list with the local file paths to be uploaded.
        bucket (str): The S3 bucket name.
        s3_path (str): The S3 path where files will be uploaded.
        max_retries (int, optional): The maximum number of upload retries. Default is UP_S3FILE_RETRIES.

    """

    files: list
    bucket: str
    s3_path: str
    max_retries: int = UP_S3FILE_RETRIES

S3StorageHandler

Interacts with an S3 storage

S3StorageHandler for interacting with an S3 storage service.

Attributes:

Name Type Description
access_key_id str

The access key ID for S3 authentication.

secret_access_key str

The secret access key for S3 authentication.

endpoint_url str

The endpoint URL for the S3 service.

region_name str

The region name.

s3_client client

The s3 client to interact with the s3 storage

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
class S3StorageHandler:
    """Interacts with an S3 storage

    S3StorageHandler for interacting with an S3 storage service.

    Attributes:
        access_key_id (str): The access key ID for S3 authentication.
        secret_access_key (str): The secret access key for S3 authentication.
        endpoint_url (str): The endpoint URL for the S3 service.
        region_name (str): The region name.
        s3_client (boto3.client): The s3 client to interact with the s3 storage
    """

    def __init__(self, access_key_id, secret_access_key, endpoint_url, region_name):
        """Initialize the S3StorageHandler instance.

        Args:
            access_key_id (str): The access key ID for S3 authentication.
            secret_access_key (str): The secret access key for S3 authentication.
            endpoint_url (str): The endpoint URL for the S3 service.
            region_name (str): The region name.

        Raises:
            RuntimeError: If the connection to the S3 storage cannot be established.
        """
        self.logger = Logging.default(__name__)

        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.endpoint_url = endpoint_url
        self.region_name = region_name
        self.s3_client: boto3.client = None
        self.connect_s3()
        self.logger.debug("S3StorageHandler created !")

    def __get_s3_client(self, access_key_id, secret_access_key, endpoint_url, region_name):
        """Retrieve or create an S3 client instance.

        Args:
            access_key_id (str): The access key ID for S3 authentication.
            secret_access_key (str): The secret access key for S3 authentication.
            endpoint_url (str): The endpoint URL for the S3 service.
            region_name (str): The region name.

        Returns:
            client (boto3): An S3 client instance.
        """

        client_config = botocore.config.Config(
            max_pool_connections=100,
            # timeout for connection
            connect_timeout=5,
            # attempts in trying connection
            # note:  the default behaviour of boto3 is retrying
            # connections multiple times and exponentially backing off in between
            retries={"total_max_attempts": 5},
        )
        try:
            return boto3.client(
                "s3",
                aws_access_key_id=access_key_id,
                aws_secret_access_key=secret_access_key,
                endpoint_url=endpoint_url,
                region_name=region_name,
                config=client_config,
            )

        except Exception as e:
            self.logger.exception(f"Client error exception: {e}")
            raise RuntimeError("Client error exception ") from e

    def connect_s3(self):
        """Establish a connection to the S3 service.

        If the S3 client is not already instantiated, this method calls the private __get_s3_client
        method to create an S3 client instance using the provided credentials and configuration (see __init__).
        """
        if self.s3_client is None:
            self.s3_client = self.__get_s3_client(
                self.access_key_id,
                self.secret_access_key,
                self.endpoint_url,
                self.region_name,
            )

    def disconnect_s3(self):
        """Close the connection to the S3 service."""
        if self.s3_client is None:
            return
        self.s3_client.close()
        self.s3_client = None

    def delete_file_from_s3(self, bucket, s3_obj):
        """Delete a file from S3.

        Args:
            bucket (str): The S3 bucket name.
            s3_obj (str): The S3 object key.

        Raises:
            RuntimeError: If an error occurs during the bucket access check.
        """
        if self.s3_client is None or bucket is None or s3_obj is None:
            raise RuntimeError("Input error for deleting the file")
        try:
            self.logger.info("Delete key s3://%s/%s", bucket, s3_obj)
            self.s3_client.delete_object(Bucket=bucket, Key=s3_obj)
        except botocore.client.ClientError as e:
            self.logger.exception(f"Failed to delete key s3://{bucket}/{s3_obj}: {e}")
            raise RuntimeError(f"Failed to delete key s3://{bucket}/{s3_obj}") from e
        except Exception as e:
            self.logger.exception(f"Failed to delete key s3://{bucket}/{s3_obj}: {e}")
            raise RuntimeError(f"Failed to delete key s3://{bucket}/{s3_obj}") from e

    # helper functions

    @staticmethod
    def get_secrets_from_file(secrets, secret_file):
        """Read secrets from a specified file.

        It reads the secrets from .s3cfg or aws credentials files
        This function should not be used in production

        Args:
            secrets (dict): Dictionary to store retrieved secrets.
            secret_file (str): Path to the file containing secrets.
        """
        dict_filled = 0
        with open(secret_file, "r", encoding="utf-8") as aws_credentials_file:
            lines = aws_credentials_file.readlines()
            for line in lines:
                if not secrets["s3endpoint"] and "host_bucket" in line:
                    dict_filled += 1
                    secrets["s3endpoint"] = line.strip().split("=")[1].strip()
                elif not secrets["accesskey"] and "access_key" in line:
                    dict_filled += 1
                    secrets["accesskey"] = line.strip().split("=")[1].strip()
                elif not secrets["secretkey"] and "secret_" in line and "_key" in line:
                    dict_filled += 1
                    secrets["secretkey"] = line.strip().split("=")[1].strip()
                if dict_filled == 3:
                    break

    @staticmethod
    def get_basename(input_path):
        """Get the filename from a full path.

        Args:
            input_path (str): The full path.

        Returns:
            filename (str): The filename.
        """
        path, filename = ntpath.split(input_path)
        return filename or ntpath.basename(path)

    @staticmethod
    def s3_path_parser(s3_url):
        """
        Parses S3 URL to extract bucket, prefix, and file.

        Args:
            s3_url (str): The S3 URL.

        Returns:
            (bucket, prefix, s3_file) (tuple): Tuple containing bucket, prefix, and file.
        """
        s3_data = s3_url.replace("s3://", "").split("/")
        bucket = ""
        start_idx = 0
        if s3_url.startswith("s3://"):
            bucket = s3_data[0]

            start_idx = 1
        prefix = ""
        if start_idx < len(s3_data):
            prefix = "/".join(s3_data[start_idx:-1])
        s3_file = s3_data[-1]
        return bucket, prefix, s3_file

    def files_to_be_downloaded(self, bucket, paths):
        """Create a list with the S3 keys to be downloaded.

        The list will have the s3 keys to be downloaded from the bucket.
        It contains pairs (local_prefix_where_the_file_will_be_downloaded, full_s3_key_path)
        If a s3 key doesn't exist, the pair will be (None, requested_s3_key_path)

        Args:
            bucket (str): The S3 bucket name.
            paths (list): List of S3 object keys.

        Returns:
            list_with_files (list): List of tuples (local_prefix, full_s3_key_path).
        """
        # declaration of the list
        list_with_files: List[Any] = []
        # for each key, identify it as a file or a folder
        # in the case of a folder, the files will be recursively gathered
        for key in paths:
            path = key.strip().lstrip("/")
            s3_files = self.list_s3_files_obj(bucket, path)
            if len(s3_files) == 0:
                self.logger.warning("No key %s found.", path)
                list_with_files.append((None, path))
                continue
            self.logger.debug("total: %s | s3_files = %s", len(s3_files), s3_files)
            basename_part = self.get_basename(path)

            # check if it's a file or a dir
            if len(s3_files) == 1 and path == s3_files[0]:
                # the current key is a file, append it to the list
                list_with_files.append(("", s3_files[0]))
                self.logger.debug("Append files: list_with_files = %s", list_with_files)
            else:
                # the current key is a folder, append all its files (recursively gathered) to the list
                for s3_file in s3_files:
                    split = s3_file.split("/")
                    split_idx = split.index(basename_part)
                    list_with_files.append((os.path.join(*split[split_idx:-1]), s3_file.strip("/")))

        return list_with_files

    def files_to_be_uploaded(self, paths):
        """Creates a list with the local files to be uploaded.

        The list contains pairs (s3_path, absolute_local_file_path)
        If the local file doesn't exist, the pair will be (None, requested_file_to_upload)

        Args:
            paths (list): List of local file paths.

        Returns:
            list_with_files (list): List of tuples (s3_path, absolute_local_file_path).
        """

        list_with_files = []
        for local in paths:
            path = local.strip()
            # check if it is a file
            self.logger.debug("path = %s", path)
            if os.path.isfile(path):
                self.logger.debug("Add %s", path)
                list_with_files.append(("", path))

            elif os.path.isdir(path):
                for root, dir_names, filenames in os.walk(path):
                    for file in filenames:
                        full_file_path = os.path.join(root, file.strip("/"))
                        self.logger.debug("full_file_path = %s | dir_names = %s", full_file_path, dir_names)
                        if not os.path.isfile(full_file_path):
                            continue
                        self.logger.debug(
                            "get_basename(path) = %s | root = %s | replace = %s",
                            self.get_basename(path),
                            root,
                            root.replace(path, ""),
                        )

                        keep_path = os.path.join(self.get_basename(path), root.replace(path, "").strip("/")).strip("/")
                        self.logger.debug("path = %s | keep_path = %s | root = %s", path, keep_path, root)

                        self.logger.debug("Add: %s | %s", keep_path, full_file_path)
                        list_with_files.append((keep_path, full_file_path))
            else:
                self.logger.warning("The path %s is not a directory nor a file, it will not be uploaded", path)

        return list_with_files

    def list_s3_files_obj(self, bucket, prefix):
        """Retrieve the content of an S3 directory.

        Args:
            bucket (str): The S3 bucket name.
            prefix (str): The S3 object key prefix.

        Returns:
            s3_files (list): List containing S3 object keys.
        """

        s3_files = []

        try:
            paginator: Any = self.s3_client.get_paginator("list_objects_v2")
            pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
            for page in pages:
                for item in page.get("Contents", ()):
                    if item is not None:
                        s3_files.append(item["Key"])
        except Exception as error:
            self.logger.exception(f"Exception when trying to list files from s3://{bucket}/{prefix}: {error}")
            raise RuntimeError(f"Listing files from s3://{bucket}/{prefix}") from error

        return s3_files

    def check_bucket_access(self, bucket):
        """Check the accessibility of an S3 bucket.

        Args:
            bucket (str): The S3 bucket name.

        Raises:
            RuntimeError: If an error occurs during the bucket access check.
        """

        try:
            self.connect_s3()
            self.s3_client.head_bucket(Bucket=bucket)
        except botocore.client.ClientError as error:
            # check that it was a 404 vs 403 errors
            # If it was a 404 error, then the bucket does not exist.
            error_code = int(error.response["Error"]["Code"])
            if error_code == S3_ERR_FORBIDDEN_ACCESS:
                self.logger.exception((f"{bucket} is a private bucket. Forbidden access!"))
                raise RuntimeError(f"{bucket} is a private bucket. Forbidden access!") from error
            if error_code == S3_ERR_NOT_FOUND:
                self.logger.exception((f"{bucket} bucket does not exist!"))
                raise RuntimeError(f"{bucket} bucket does not exist!") from error
            self.logger.exception(f"Exception when checking the access to {bucket} bucket: {error}")
            raise RuntimeError(f"Exception when checking the access to {bucket} bucket") from error
        except botocore.exceptions.EndpointConnectionError as error:
            self.logger.exception(f"Could not connect to the endpoint when trying to access {bucket}: {error}")
            raise RuntimeError(f"Could not connect to the endpoint when trying to access {bucket}!") from error
        except Exception as error:
            self.logger.exception(f"General exception when trying to access bucket {bucket}: {error}")
            raise RuntimeError(f"General exception when trying to access bucket {bucket}") from error

    def wait_timeout(self, timeout):
        """
        Wait for a specified timeout duration (minimum 200 ms).

        This function implements a simple timeout mechanism, where it sleeps for 0.2 seconds
        in each iteration until the cumulative sleep time reaches the specified timeout duration.

        Args:
            timeout (float): The total duration to wait in seconds.

        """
        time_cnt = 0.0
        while time_cnt < timeout:
            time.sleep(SLEEP_TIME)
            time_cnt += SLEEP_TIME

    def check_file_overwriting(self, local_file, overwrite):
        """Check if file exists and determine if it should be overwritten.

        Args:
            local_file (str): Path to the local file.
            overwrite (bool): Whether to overwrite the existing file.

        Returns:
            bool (bool): True if the file should be overwritten, False otherwise.

        Note:
        - If the file already exists and the overwrite flag is set to True, the function logs a message,
        deletes the existing file, and returns True.
        - If the file already exists and the overwrite flag is set to False, the function logs a warning
        message, and returns False. In this case, the existing file won't be deleted.
        - If the file doesn't exist, the function returns True.

        """
        if os.path.isfile(local_file):
            if overwrite:  # The file already exists, so delete it first
                self.logger.info(
                    "File %s already exists. Deleting it before downloading",
                    S3StorageHandler.get_basename(local_file),
                )
                os.remove(local_file)
            else:
                self.logger.warning(
                    "File %s already exists. Ignoring (use the overwrite flag if you want to overwrite this file)",
                    S3StorageHandler.get_basename(local_file),
                )
                return False

        return True

    def get_keys_from_s3(self, config: GetKeysFromS3Config) -> list:
        """Download S3 keys specified in the configuration.

        Args:
            config (GetKeysFromS3Config): Configuration for the S3 download.

        Returns:
            List[str]: A list with the S3 keys that couldn't be downloaded.

        Raises:
            Exception: Any unexpected exception raised during the download process.

        The function attempts to download files from S3 according to the provided configuration.
        It returns a list of S3 keys that couldn't be downloaded successfully.

        """

        # check the access to the bucket first, or even if it does exist
        self.check_bucket_access(config.bucket)

        # collection_files: list of files to be downloaded
        #                   the list contains pair objects with the following
        #                   syntax: (local_path_to_be_added_to_the_local_prefix, s3_key)
        #                   the local_path_to_be_added_to_the_local_prefix may be none if the file doesn't exist
        collection_files = self.files_to_be_downloaded(config.bucket, config.s3_files)

        self.logger.debug("collection_files = %s | bucket = %s", collection_files, config.bucket)
        failed_files = []

        for collection_file in collection_files:
            if collection_file[0] is None:
                failed_files.append(collection_file[1])
                continue

            local_path = os.path.join(config.local_prefix, collection_file[0].strip("/"))
            s3_file = collection_file[1]
            # for each file to download, create the local dir (if it does not exist)
            os.makedirs(local_path, exist_ok=True)
            # create the path for local file
            local_file = os.path.join(local_path, self.get_basename(s3_file).strip("/"))

            if not self.check_file_overwriting(local_file, config.overwrite):
                continue
            # download the files
            downloaded = False
            for keep_trying in range(config.max_retries):
                try:
                    self.connect_s3()
                    dwn_start = datetime.now()
                    self.s3_client.download_file(config.bucket, s3_file, local_file)
                    self.logger.debug(
                        "s3://%s/%s downloaded to %s in %s ms",
                        config.bucket,
                        s3_file,
                        local_file,
                        datetime.now() - dwn_start,
                    )
                    downloaded = True
                    break
                except (botocore.client.ClientError, botocore.exceptions.EndpointConnectionError) as error:
                    self.logger.exception(
                        "Error when downloading the file %s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        s3_file,
                        error,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when downloading the file %s. \
Couldn't get the s3 client. Retrying in %s seconds for %s more times",
                        s3_file,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)

            if not downloaded:
                self.logger.error(
                    "Could not download the file %s. The download was \
retried for %s times. Aborting",
                    s3_file,
                    config.max_retries,
                )
                failed_files.append(s3_file)

        return failed_files

    def put_files_to_s3(self, config: PutFilesToS3Config) -> list:
        """Upload files to S3 according to the provided configuration.

        Args:
            config (PutFilesToS3Config): Configuration for the S3 upload.

        Returns:
            List[str]: A list with the local file paths that couldn't be uploaded.

        Raises:
            Exception: Any unexpected exception raised during the upload process.

        The function attempts to upload files to S3 according to the provided configuration.
        It returns a list of local files that couldn't be uploaded successfully.

        """

        # check the access to the bucket first, or even if it does exist
        self.check_bucket_access(config.bucket)

        collection_files = self.files_to_be_uploaded(config.files)
        failed_files = []

        for collection_file in collection_files:
            if collection_file[0] is None:
                self.logger.error("The file %s can't be uploaded, its s3 prefix is None", collection_file[0])
                failed_files.append(collection_file[1])
                continue

            file_to_be_uploaded = collection_file[1]
            # create the s3 key
            s3_obj = os.path.join(config.s3_path, collection_file[0], os.path.basename(file_to_be_uploaded).strip("/"))
            uploaded = False
            for keep_trying in range(config.max_retries):
                try:
                    # get the s3 client
                    self.connect_s3()
                    self.logger.info(
                        "Upload file %s to s3://%s/%s",
                        file_to_be_uploaded,
                        config.bucket,
                        s3_obj.lstrip("/"),
                    )

                    self.s3_client.upload_file(file_to_be_uploaded, config.bucket, s3_obj)
                    uploaded = True
                    break
                except (
                    botocore.client.ClientError,
                    botocore.exceptions.EndpointConnectionError,
                    boto3.exceptions.S3UploadFailedError,
                ) as error:
                    self.logger.exception(
                        "Error when uploading the file %s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        file_to_be_uploaded,
                        error,
                        UP_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(UP_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when uploading the file %s. \
Couldn't get the s3 client. Retrying in %s seconds for %s more times",
                        file_to_be_uploaded,
                        UP_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(UP_S3FILE_RETRY_TIMEOUT)

            if not uploaded:
                self.logger.error(
                    "Could not upload the file %s. The upload was \
retried for %s times. Aborting",
                    file_to_be_uploaded,
                    config.max_retries,
                )
                failed_files.append(file_to_be_uploaded)

        return failed_files

    def transfer_from_s3_to_s3(self, config: TransferFromS3ToS3Config) -> list:
        """Copy S3 keys specified in the configuration.
        Args:
            config (TransferFromS3ToS3Config): Configuration object containing bucket source, bucket destination,
                      S3 files, maximum retries.

        Returns:
            list: A list of S3 keys that failed to be copied.

        Raises:
            Exception: Any unexpected exception raised during the upload process.
        """
        # check the access to both buckets first, or even if they do exist
        self.check_bucket_access(config.bucket_src)
        self.check_bucket_access(config.bucket_dst)

        # collection_files: list of files to be downloaded
        #                   the list contains pair objects with the following
        #                   syntax: (local_path_to_be_added_to_the_local_prefix, s3_key)

        collection_files = self.files_to_be_downloaded(config.bucket_src, config.s3_files)

        self.logger.debug("collection_files = %s | bucket = %s", collection_files, config.bucket_src)
        failed_files = []
        copy_src = {"Bucket": config.bucket_src, "Key": ""}

        for collection_file in collection_files:
            if collection_file[0] is None:
                failed_files.append(collection_file[1])
                continue

            copied = False
            for keep_trying in range(config.max_retries):
                self.logger.debug(
                    "keep_trying %s | range(config.max_retries) %s ",
                    keep_trying,
                    range(config.max_retries),
                )
                try:
                    self.connect_s3()
                    dwn_start = datetime.now()
                    copy_src["Key"] = collection_file[1]
                    self.logger.debug("copy_src = %s", copy_src)
                    self.s3_client.copy_object(CopySource=copy_src, Bucket=config.bucket_dst, Key=collection_file[1])
                    self.logger.debug(
                        "s3://%s/%s copied to s3://%s/%s in %s ms",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        collection_file[1],
                        datetime.now() - dwn_start,
                    )
                    if not config.copy_only:
                        self.delete_file_from_s3(config.bucket_src, collection_file[1])
                        self.logger.debug("Key deleted s3://%s/%s", config.bucket_src, collection_file[1])
                    copied = True
                    break
                except (botocore.client.ClientError, botocore.exceptions.EndpointConnectionError) as error:
                    self.logger.exception(
                        "Error when copying the file s3://%s/%s to s3://%s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        error,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when copying the file s3://%s/%s to s3://%s. \
Couldn't get the s3 client. Retrying in %s seconds for %s more times",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)

            if not copied:
                self.logger.error(
                    "Could not copy the file s3://%s/%s to s3://%s. The copy was \
retried for %s times. Aborting",
                    config.bucket_src,
                    collection_file[1],
                    config.bucket_dst,
                    config.max_retries,
                )
                failed_files.append(collection_file[1])

        return failed_files
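
Example (illustrative): a minimal sketch that wires the handler to the configuration dataclasses above. The credential environment variable names and bucket names are placeholders, not values defined on this page.

import os

handler = S3StorageHandler(
    os.environ["S3_ACCESSKEY"],   # placeholder variable names
    os.environ["S3_SECRETKEY"],
    os.environ["S3_ENDPOINT"],
    os.environ["S3_REGION"],
)

failed_downloads = handler.get_keys_from_s3(
    GetKeysFromS3Config(
        s3_files=["CADIP/session_1/file_1.raw"],  # placeholder keys
        bucket="rs-cluster-temp",                 # placeholder bucket
        local_prefix="/tmp/downloads",
        overwrite=True,
    ),
)

failed_uploads = handler.put_files_to_s3(
    PutFilesToS3Config(
        files=["/tmp/downloads/file_1.raw"],
        bucket="rs-cluster-catalog",              # placeholder bucket
        s3_path="stations/cadip",
    ),
)

# both calls return the keys/paths that could not be transferred (empty list on success)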

__get_s3_client(access_key_id, secret_access_key, endpoint_url, region_name)

Retrieve or create an S3 client instance.

Parameters:

Name Type Description Default
access_key_id str

The access key ID for S3 authentication.

required
secret_access_key str

The secret access key for S3 authentication.

required
endpoint_url str

The endpoint URL for the S3 service.

required
region_name str

The region name.

required

Returns:

Name Type Description
client boto3

An S3 client instance.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
def __get_s3_client(self, access_key_id, secret_access_key, endpoint_url, region_name):
    """Retrieve or create an S3 client instance.

    Args:
        access_key_id (str): The access key ID for S3 authentication.
        secret_access_key (str): The secret access key for S3 authentication.
        endpoint_url (str): The endpoint URL for the S3 service.
        region_name (str): The region name.

    Returns:
        client (boto3): An S3 client instance.
    """

    client_config = botocore.config.Config(
        max_pool_connections=100,
        # timeout for connection
        connect_timeout=5,
        # attempts in trying connection
        # note:  the default behaviour of boto3 is retrying
        # connections multiple times and exponentially backing off in between
        retries={"total_max_attempts": 5},
    )
    try:
        return boto3.client(
            "s3",
            aws_access_key_id=access_key_id,
            aws_secret_access_key=secret_access_key,
            endpoint_url=endpoint_url,
            region_name=region_name,
            config=client_config,
        )

    except Exception as e:
        self.logger.exception(f"Client error exception: {e}")
        raise RuntimeError("Client error exception ") from e

__init__(access_key_id, secret_access_key, endpoint_url, region_name)

Initialize the S3StorageHandler instance.

Parameters:

Name Type Description Default
access_key_id str

The access key ID for S3 authentication.

required
secret_access_key str

The secret access key for S3 authentication.

required
endpoint_url str

The endpoint URL for the S3 service.

required
region_name str

The region name.

required

Raises:

Type Description
RuntimeError

If the connection to the S3 storage cannot be established.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
def __init__(self, access_key_id, secret_access_key, endpoint_url, region_name):
    """Initialize the S3StorageHandler instance.

    Args:
        access_key_id (str): The access key ID for S3 authentication.
        secret_access_key (str): The secret access key for S3 authentication.
        endpoint_url (str): The endpoint URL for the S3 service.
        region_name (str): The region name.

    Raises:
        RuntimeError: If the connection to the S3 storage cannot be established.
    """
    self.logger = Logging.default(__name__)

    self.access_key_id = access_key_id
    self.secret_access_key = secret_access_key
    self.endpoint_url = endpoint_url
    self.region_name = region_name
    self.s3_client: boto3.client = None
    self.connect_s3()
    self.logger.debug("S3StorageHandler created !")

check_bucket_access(bucket)

Check the accessibility of an S3 bucket.

Parameters:

Name Type Description Default
bucket str

The S3 bucket name.

required

Raises:

Type Description
RuntimeError

If an error occurs during the bucket access check.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py
def check_bucket_access(self, bucket):
    """Check the accessibility of an S3 bucket.

    Args:
        bucket (str): The S3 bucket name.

    Raises:
        RuntimeError: If an error occurs during the bucket access check.
    """

    try:
        self.connect_s3()
        self.s3_client.head_bucket(Bucket=bucket)
    except botocore.client.ClientError as error:
        # check that it was a 404 vs 403 errors
        # If it was a 404 error, then the bucket does not exist.
        error_code = int(error.response["Error"]["Code"])
        if error_code == S3_ERR_FORBIDDEN_ACCESS:
            self.logger.exception((f"{bucket} is a private bucket. Forbidden access!"))
            raise RuntimeError(f"{bucket} is a private bucket. Forbidden access!") from error
        if error_code == S3_ERR_NOT_FOUND:
            self.logger.exception((f"{bucket} bucket does not exist!"))
            raise RuntimeError(f"{bucket} bucket does not exist!") from error
        self.logger.exception(f"Exception when checking the access to {bucket} bucket: {error}")
        raise RuntimeError(f"Exception when checking the access to {bucket} bucket") from error
    except botocore.exceptions.EndpointConnectionError as error:
        self.logger.exception(f"Could not connect to the endpoint when trying to access {bucket}: {error}")
        raise RuntimeError(f"Could not connect to the endpoint when trying to access {bucket}!") from error
    except Exception as error:
        self.logger.exception(f"General exception when trying to access bucket {bucket}: {error}")
        raise RuntimeError(f"General exception when trying to access bucket {bucket}") from error

check_file_overwriting(local_file, overwrite)

Check if file exists and determine if it should be overwritten.

Parameters:

Name Type Description Default
local_file str

Path to the local file.

required
overwrite bool

Whether to overwrite the existing file.

required

Returns:

Name Type Description
bool bool

True if the file should be overwritten, False otherwise.

Note:
- If the file already exists and the overwrite flag is set to True, the function logs a message, deletes the existing file, and returns True.
- If the file already exists and the overwrite flag is set to False, the function logs a warning message and returns False. In this case, the existing file won't be deleted.
- If the file doesn't exist, the function returns True.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 438-470
def check_file_overwriting(self, local_file, overwrite):
    """Check if file exists and determine if it should be overwritten.

    Args:
        local_file (str): Path to the local file.
        overwrite (bool): Whether to overwrite the existing file.

    Returns:
        bool (bool): True if the file should be overwritten, False otherwise.

    Note:
    - If the file already exists and the overwrite flag is set to True, the function logs a message,
    deletes the existing file, and returns True.
    - If the file already exists and the overwrite flag is set to False, the function logs a warning
    message, and returns False. In this case, the existing file won't be deleted.
    - If the file doesn't exist, the function returns True.

    """
    if os.path.isfile(local_file):
        if overwrite:  # The file already exists, so delete it first
            self.logger.info(
                "File %s already exists. Deleting it before downloading",
                S3StorageHandler.get_basename(local_file),
            )
            os.remove(local_file)
        else:
            self.logger.warning(
                "File %s already exists. Ignoring (use the overwrite flag if you want to overwrite this file)",
                S3StorageHandler.get_basename(local_file),
            )
            return False

    return True

connect_s3()

Establish a connection to the S3 service.

If the S3 client is not already instantiated, this method calls the private __get_s3_client method to create an S3 client instance using the provided credentials and configuration (see __init__).

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 167-179
def connect_s3(self):
    """Establish a connection to the S3 service.

    If the S3 client is not already instantiated, this method calls the private __get_s3_client
    method to create an S3 client instance using the provided credentials and configuration (see __init__).
    """
    if self.s3_client is None:
        self.s3_client = self.__get_s3_client(
            self.access_key_id,
            self.secret_access_key,
            self.endpoint_url,
            self.region_name,
        )

delete_file_from_s3(bucket, s3_obj)

Delete a file from S3.

Parameters:

Name Type Description Default
bucket str

The S3 bucket name.

required
s3_obj str

The S3 object key.

required

Raises:

Type Description
RuntimeError

If an error occurs while deleting the S3 object.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 188-208
def delete_file_from_s3(self, bucket, s3_obj):
    """Delete a file from S3.

    Args:
        bucket (str): The S3 bucket name.
        s3_obj (str): The S3 object key.

    Raises:
        RuntimeError: If an error occurs while deleting the S3 object.
    """
    if self.s3_client is None or bucket is None or s3_obj is None:
        raise RuntimeError("Input error for deleting the file")
    try:
        self.logger.info("Delete key s3://%s/%s", bucket, s3_obj)
        self.s3_client.delete_object(Bucket=bucket, Key=s3_obj)
    except botocore.client.ClientError as e:
        self.logger.exception(f"Failed to delete key s3://{bucket}/{s3_obj}: {e}")
        raise RuntimeError(f"Failed to delete key s3://{bucket}/{s3_obj}") from e
    except Exception as e:
        self.logger.exception(f"Failed to delete key s3://{bucket}/{s3_obj}: {e}")
        raise RuntimeError(f"Failed to delete key s3://{bucket}/{s3_obj}") from e

disconnect_s3()

Close the connection to the S3 service.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 181-186
def disconnect_s3(self):
    """Close the connection to the S3 service."""
    if self.s3_client is None:
        return
    self.s3_client.close()
    self.s3_client = None

files_to_be_downloaded(bucket, paths)

Create a list with the S3 keys to be downloaded.

The list holds the S3 keys to be downloaded from the bucket as pairs (local_prefix_where_the_file_will_be_downloaded, full_s3_key_path). If an S3 key doesn't exist, the pair will be (None, requested_s3_key_path).

Parameters:

Name Type Description Default
bucket str

The S3 bucket name.

required
paths list

List of S3 object keys.

required

Returns:

Name Type Description
list_with_files list

List of tuples (local_prefix, full_s3_key_path).

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 276-316
def files_to_be_downloaded(self, bucket, paths):
    """Create a list with the S3 keys to be downloaded.

    The list will have the s3 keys to be downloaded from the bucket.
    It contains pairs (local_prefix_where_the_file_will_be_downloaded, full_s3_key_path).
    If an S3 key doesn't exist, the pair will be (None, requested_s3_key_path).

    Args:
        bucket (str): The S3 bucket name.
        paths (list): List of S3 object keys.

    Returns:
        list_with_files (list): List of tuples (local_prefix, full_s3_key_path).
    """
    # declaration of the list
    list_with_files: List[Any] = []
    # for each key, identify it as a file or a folder
    # in the case of a folder, the files will be recursively gathered
    for key in paths:
        path = key.strip().lstrip("/")
        s3_files = self.list_s3_files_obj(bucket, path)
        if len(s3_files) == 0:
            self.logger.warning("No key %s found.", path)
            list_with_files.append((None, path))
            continue
        self.logger.debug("total: %s | s3_files = %s", len(s3_files), s3_files)
        basename_part = self.get_basename(path)

        # check if it's a file or a dir
        if len(s3_files) == 1 and path == s3_files[0]:
            # the current key is a file, append it to the list
            list_with_files.append(("", s3_files[0]))
            self.logger.debug("Append files: list_with_files = %s", list_with_files)
        else:
            # the current key is a folder, append all its files (recursively gathered) to the list
            for s3_file in s3_files:
                split = s3_file.split("/")
                split_idx = split.index(basename_part)
                list_with_files.append((os.path.join(*split[split_idx:-1]), s3_file.strip("/")))

    return list_with_files
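
To make the pair semantics concrete, a hedged sketch with placeholder keys (the actual results depend on the bucket contents):

pairs = handler.files_to_be_downloaded("my-bucket", ["data/file1.txt", "data/subdir", "missing/key"])
# Possible result:
#   ("", "data/file1.txt")                 -> the key is a single existing file
#   ("subdir", "data/subdir/nested.txt")   -> the key is a "folder"; its files keep a relative prefix
#   (None, "missing/key")                  -> the key was not found
for local_prefix, s3_key in pairs:
    print(local_prefix, s3_key)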

files_to_be_uploaded(paths)

Creates a list with the local files to be uploaded.

The list contains pairs (s3_path, absolute_local_file_path). If the local file doesn't exist, the pair will be (None, requested_file_to_upload).

Parameters:

Name Type Description Default
paths list

List of local file paths.

required

Returns:

Name Type Description
list_with_files list

List of tuples (s3_path, absolute_local_file_path).

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 318-362
def files_to_be_uploaded(self, paths):
    """Creates a list with the local files to be uploaded.

    The list contains pairs (s3_path, absolute_local_file_path).
    If the local file doesn't exist, the pair will be (None, requested_file_to_upload).

    Args:
        paths (list): List of local file paths.

    Returns:
        list_with_files (list): List of tuples (s3_path, absolute_local_file_path).
    """

    list_with_files = []
    for local in paths:
        path = local.strip()
        # check if it is a file
        self.logger.debug("path = %s", path)
        if os.path.isfile(path):
            self.logger.debug("Add %s", path)
            list_with_files.append(("", path))

        elif os.path.isdir(path):
            for root, dir_names, filenames in os.walk(path):
                for file in filenames:
                    full_file_path = os.path.join(root, file.strip("/"))
                    self.logger.debug("full_file_path = %s | dir_names = %s", full_file_path, dir_names)
                    if not os.path.isfile(full_file_path):
                        continue
                    self.logger.debug(
                        "get_basename(path) = %s | root = %s | replace = %s",
                        self.get_basename(path),
                        root,
                        root.replace(path, ""),
                    )

                    keep_path = os.path.join(self.get_basename(path), root.replace(path, "").strip("/")).strip("/")
                    self.logger.debug("path = %s | keep_path = %s | root = %s", path, keep_path, root)

                    self.logger.debug("Add: %s | %s", keep_path, full_file_path)
                    list_with_files.append((keep_path, full_file_path))
        else:
            self.logger.warning("The path %s is not a directory nor a file, it will not be uploaded", path)

    return list_with_files
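
A hedged sketch with placeholder paths, showing how local files and directories map to (s3_prefix, local_path) pairs:

pairs = handler.files_to_be_uploaded(["/tmp/report.txt", "/tmp/products"])
# Possible result, depending on what exists locally:
#   ("", "/tmp/report.txt")                       -> a plain file, no extra prefix
#   ("products", "/tmp/products/a.nc")            -> a file directly inside the given directory
#   ("products/sub", "/tmp/products/sub/b.nc")    -> files in subdirectories keep the relative prefix
for s3_prefix, local_file in pairs:
    print(s3_prefix, local_file)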

get_basename(input_path) staticmethod

Get the filename from a full path.

Parameters:

Name Type Description Default
input_path str

The full path.

required

Returns:

Name Type Description
filename str

The filename.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 239-250
@staticmethod
def get_basename(input_path):
    """Get the filename from a full path.

    Args:
        input_path (str): The full path.

    Returns:
        filename (str): The filename.
    """
    path, filename = ntpath.split(input_path)
    return filename or ntpath.basename(path)
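
Two illustrative calls (the expected values follow from the ntpath.split logic above):

S3StorageHandler.get_basename("/tmp/data/product.zip")   # -> "product.zip"
S3StorageHandler.get_basename("/tmp/data/")              # -> "data" (trailing separator is handled)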

get_keys_from_s3(config)

Download S3 keys specified in the configuration.

Parameters:

Name Type Description Default
config GetKeysFromS3Config

Configuration for the S3 download.

required

Returns:

Type Description
list

List[str]: A list with the S3 keys that couldn't be downloaded.

Raises:

Type Description
Exception

Any unexpected exception raised during the download process.

The function attempts to download files from S3 according to the provided configuration. It returns a list of S3 keys that couldn't be downloaded successfully.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 472-561
    def get_keys_from_s3(self, config: GetKeysFromS3Config) -> list:
        """Download S3 keys specified in the configuration.

        Args:
            config (GetKeysFromS3Config): Configuration for the S3 download.

        Returns:
            List[str]: A list with the S3 keys that couldn't be downloaded.

        Raises:
            Exception: Any unexpected exception raised during the download process.

        The function attempts to download files from S3 according to the provided configuration.
        It returns a list of S3 keys that couldn't be downloaded successfully.

        """

        # check the access to the bucket first, or even if it does exist
        self.check_bucket_access(config.bucket)

        # collection_files: list of files to be downloaded
        #                   the list contains pair objects with the following
        #                   syntax: (local_path_to_be_added_to_the_local_prefix, s3_key)
        #                   the local_path_to_be_added_to_the_local_prefix may be none if the file doesn't exist
        collection_files = self.files_to_be_downloaded(config.bucket, config.s3_files)

        self.logger.debug("collection_files = %s | bucket = %s", collection_files, config.bucket)
        failed_files = []

        for collection_file in collection_files:
            if collection_file[0] is None:
                failed_files.append(collection_file[1])
                continue

            local_path = os.path.join(config.local_prefix, collection_file[0].strip("/"))
            s3_file = collection_file[1]
            # for each file to download, create the local dir (if it does not exist)
            os.makedirs(local_path, exist_ok=True)
            # create the path for local file
            local_file = os.path.join(local_path, self.get_basename(s3_file).strip("/"))

            if not self.check_file_overwriting(local_file, config.overwrite):
                continue
            # download the files
            downloaded = False
            for keep_trying in range(config.max_retries):
                try:
                    self.connect_s3()
                    dwn_start = datetime.now()
                    self.s3_client.download_file(config.bucket, s3_file, local_file)
                    self.logger.debug(
                        "s3://%s/%s downloaded to %s in %s ms",
                        config.bucket,
                        s3_file,
                        local_file,
                        datetime.now() - dwn_start,
                    )
                    downloaded = True
                    break
                except (botocore.client.ClientError, botocore.exceptions.EndpointConnectionError) as error:
                    self.logger.exception(
                        "Error when downloading the file %s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        s3_file,
                        error,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when downloading the file %s. \
Couldn't get the s3 client. Retrying in %s seconds for %s more times",
                        s3_file,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)

            if not downloaded:
                self.logger.error(
                    "Could not download the file %s. The download was \
retried for %s times. Aborting",
                    s3_file,
                    config.max_retries,
                )
                failed_files.append(s3_file)

        return failed_files
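
A hedged download sketch; the import path is inferred from the file location above, the GetKeysFromS3Config field names are taken from how the method reads the configuration, and all values are placeholders:

from rs_server_common.s3_storage_handler.s3_storage_handler import GetKeysFromS3Config

config = GetKeysFromS3Config(
    s3_files=["products/S1A_example.zip"],   # keys or "folder" prefixes to download
    bucket="my-bucket",
    local_prefix="/tmp/downloads",
    overwrite=False,
    max_retries=3,
)
failed = handler.get_keys_from_s3(config)
if failed:
    print("Could not download:", failed)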

get_secrets_from_file(secrets, secret_file) staticmethod

Read secrets from a specified file.

It reads the secrets from a .s3cfg or AWS credentials file. This function should not be used in production.

Parameters:

Name Type Description Default
secrets dict

Dictionary to store retrieved secrets.

required
secret_file str

Path to the file containing secrets.

required
Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 212-237
@staticmethod
def get_secrets_from_file(secrets, secret_file):
    """Read secrets from a specified file.

    It reads the secrets from a .s3cfg or AWS credentials file.
    This function should not be used in production.

    Args:
        secrets (dict): Dictionary to store retrieved secrets.
        secret_file (str): Path to the file containing secrets.
    """
    dict_filled = 0
    with open(secret_file, "r", encoding="utf-8") as aws_credentials_file:
        lines = aws_credentials_file.readlines()
        for line in lines:
            if not secrets["s3endpoint"] and "host_bucket" in line:
                dict_filled += 1
                secrets["s3endpoint"] = line.strip().split("=")[1].strip()
            elif not secrets["accesskey"] and "access_key" in line:
                dict_filled += 1
                secrets["accesskey"] = line.strip().split("=")[1].strip()
            elif not secrets["secretkey"] and "secret_" in line and "_key" in line:
                dict_filled += 1
                secrets["secretkey"] = line.strip().split("=")[1].strip()
            if dict_filled == 3:
                break
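
A hedged sketch; the dictionary keys match the ones the function looks up, and the file path is a placeholder:

secrets = {"s3endpoint": None, "accesskey": None, "secretkey": None}
S3StorageHandler.get_secrets_from_file(secrets, "/home/user/.s3cfg")
# secrets is filled in place from the host_bucket, access_key and secret_*_key entries of the file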

list_s3_files_obj(bucket, prefix)

Retrieve the content of an S3 directory.

Parameters:

Name Type Description Default
bucket str

The S3 bucket name.

required
prefix str

The S3 object key prefix.

required

Returns:

Name Type Description
s3_files list

List containing S3 object keys.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 364-388
def list_s3_files_obj(self, bucket, prefix):
    """Retrieve the content of an S3 directory.

    Args:
        bucket (str): The S3 bucket name.
        prefix (str): The S3 object key prefix.

    Returns:
        s3_files (list): List containing S3 object keys.
    """

    s3_files = []

    try:
        paginator: Any = self.s3_client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
        for page in pages:
            for item in page.get("Contents", ()):
                if item is not None:
                    s3_files.append(item["Key"])
    except Exception as error:
        self.logger.exception(f"Exception when trying to list files from s3://{bucket}/{prefix}: {error}")
        raise RuntimeError(f"Listing files from s3://{bucket}/{prefix}") from error

    return s3_files
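
A short usage sketch (bucket and prefix are placeholders; handler is an S3StorageHandler instance):

keys = handler.list_s3_files_obj("my-bucket", "products/2024/")
for key in keys:
    print(key)   # full S3 object keys under the prefix; the list may be empty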

put_files_to_s3(config)

Upload files to S3 according to the provided configuration.

Parameters:

Name Type Description Default
config PutFilesToS3Config

Configuration for the S3 upload.

required

Returns:

Type Description
list

List[str]: A list with the local file paths that couldn't be uploaded.

Raises:

Type Description
Exception

Any unexpected exception raised during the upload process.

The function attempts to upload files to S3 according to the provided configuration. It returns a list of local files that couldn't be uploaded successfully.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 563-644
    def put_files_to_s3(self, config: PutFilesToS3Config) -> list:
        """Upload files to S3 according to the provided configuration.

        Args:
            config (PutFilesToS3Config): Configuration for the S3 upload.

        Returns:
            List[str]: A list with the local file paths that couldn't be uploaded.

        Raises:
            Exception: Any unexpected exception raised during the upload process.

        The function attempts to upload files to S3 according to the provided configuration.
        It returns a list of local files that couldn't be uploaded successfully.

        """

        # check the access to the bucket first, or even if it does exist
        self.check_bucket_access(config.bucket)

        collection_files = self.files_to_be_uploaded(config.files)
        failed_files = []

        for collection_file in collection_files:
            if collection_file[0] is None:
                self.logger.error("The file %s can't be uploaded, its s3 prefix is None", collection_file[0])
                failed_files.append(collection_file[1])
                continue

            file_to_be_uploaded = collection_file[1]
            # create the s3 key
            s3_obj = os.path.join(config.s3_path, collection_file[0], os.path.basename(file_to_be_uploaded).strip("/"))
            uploaded = False
            for keep_trying in range(config.max_retries):
                try:
                    # get the s3 client
                    self.connect_s3()
                    self.logger.info(
                        "Upload file %s to s3://%s/%s",
                        file_to_be_uploaded,
                        config.bucket,
                        s3_obj.lstrip("/"),
                    )

                    self.s3_client.upload_file(file_to_be_uploaded, config.bucket, s3_obj)
                    uploaded = True
                    break
                except (
                    botocore.client.ClientError,
                    botocore.exceptions.EndpointConnectionError,
                    boto3.exceptions.S3UploadFailedError,
                ) as error:
                    self.logger.exception(
                        "Error when uploading the file %s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        file_to_be_uploaded,
                        error,
                        UP_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(UP_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when uploading the file %s. \
Couldn't get the s3 client. Retrying in %s seconds for %s more times",
                        file_to_be_uploaded,
                        UP_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(UP_S3FILE_RETRY_TIMEOUT)

            if not uploaded:
                self.logger.error(
                    "Could not upload the file %s. The upload was \
retried for %s times. Aborting",
                    file_to_be_uploaded,
                    config.max_retries,
                )
                failed_files.append(file_to_be_uploaded)

        return failed_files
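
A hedged upload sketch; the import path is inferred from the file location above, the PutFilesToS3Config field names are taken from how the method reads the configuration, and all values are placeholders:

from rs_server_common.s3_storage_handler.s3_storage_handler import PutFilesToS3Config

config = PutFilesToS3Config(
    files=["/tmp/report.txt", "/tmp/products"],   # local files and/or directories
    bucket="my-bucket",
    s3_path="uploads/run-001",
    max_retries=3,
)
failed = handler.put_files_to_s3(config)
if failed:
    print("Could not upload:", failed)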

s3_path_parser(s3_url) staticmethod

Parses S3 URL to extract bucket, prefix, and file.

Parameters:

Name Type Description Default
s3_url str

The S3 URL.

required

Returns:

Name Type Description
(bucket, prefix, s3_file) tuple

Tuple containing bucket, prefix, and file.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 252-274
@staticmethod
def s3_path_parser(s3_url):
    """
    Parses S3 URL to extract bucket, prefix, and file.

    Args:
        s3_url (str): The S3 URL.

    Returns:
        (bucket, prefix, s3_file) (tuple): Tuple containing bucket, prefix, and file.
    """
    s3_data = s3_url.replace("s3://", "").split("/")
    bucket = ""
    start_idx = 0
    if s3_url.startswith("s3://"):
        bucket = s3_data[0]

        start_idx = 1
    prefix = ""
    if start_idx < len(s3_data):
        prefix = "/".join(s3_data[start_idx:-1])
    s3_file = s3_data[-1]
    return bucket, prefix, s3_file
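
Two illustrative calls (the expected tuples follow from the parsing logic above):

S3StorageHandler.s3_path_parser("s3://my-bucket/path/to/file.zip")
# -> ("my-bucket", "path/to", "file.zip")
S3StorageHandler.s3_path_parser("path/to/file.zip")
# -> ("", "path/to", "file.zip")   (no "s3://" scheme, so the bucket is empty)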

transfer_from_s3_to_s3(config)

Copy S3 keys specified in the configuration.

Parameters:

Name Type Description Default
config TransferFromS3ToS3Config

Configuration object containing the source bucket, destination bucket, S3 files, and maximum retries.

required

Returns:

Name Type Description
list list

A list of S3 keys that failed to be copied.

Raises:

Type Description
Exception

Any unexpected exception raised during the upload process.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 646-739
    def transfer_from_s3_to_s3(self, config: TransferFromS3ToS3Config) -> list:
        """Copy S3 keys specified in the configuration.
        Args:
            config (TransferFromS3ToS3Config): Configuration object containing bucket source, bucket destination,
                      S3 files, maximum retries.

        Returns:
            list: A list of S3 keys that failed to be copied.

        Raises:
            Exception: Any unexpected exception raised during the upload process.
        """
        # check the access to both buckets first, or even if they do exist
        self.check_bucket_access(config.bucket_src)
        self.check_bucket_access(config.bucket_dst)

        # collection_files: list of files to be downloaded
        #                   the list contains pair objects with the following
        #                   syntax: (local_path_to_be_added_to_the_local_prefix, s3_key)

        collection_files = self.files_to_be_downloaded(config.bucket_src, config.s3_files)

        self.logger.debug("collection_files = %s | bucket = %s", collection_files, config.bucket_src)
        failed_files = []
        copy_src = {"Bucket": config.bucket_src, "Key": ""}

        for collection_file in collection_files:
            if collection_file[0] is None:
                failed_files.append(collection_file[1])
                continue

            copied = False
            for keep_trying in range(config.max_retries):
                self.logger.debug(
                    "keep_trying %s | range(config.max_retries) %s ",
                    keep_trying,
                    range(config.max_retries),
                )
                try:
                    self.connect_s3()
                    dwn_start = datetime.now()
                    copy_src["Key"] = collection_file[1]
                    self.logger.debug("copy_src = %s", copy_src)
                    self.s3_client.copy_object(CopySource=copy_src, Bucket=config.bucket_dst, Key=collection_file[1])
                    self.logger.debug(
                        "s3://%s/%s copied to s3://%s/%s in %s ms",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        collection_file[1],
                        datetime.now() - dwn_start,
                    )
                    if not config.copy_only:
                        self.delete_file_from_s3(config.bucket_src, collection_file[1])
                        self.logger.debug("Key deleted s3://%s/%s", config.bucket_src, collection_file[1])
                    copied = True
                    break
                except (botocore.client.ClientError, botocore.exceptions.EndpointConnectionError) as error:
                    self.logger.exception(
                        "Error when copying the file s3://%s/%s to s3://%s. \
Exception: %s. Retrying in %s seconds for %s more times",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        error,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.disconnect_s3()
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)
                except RuntimeError:
                    self.logger.exception(
                        "Error when copying the file s3://%s/%s to s3://%s. \
Couldn't get the s3 client. Retrying in %s seconds for %s more times",
                        config.bucket_src,
                        collection_file[1],
                        config.bucket_dst,
                        DWN_S3FILE_RETRY_TIMEOUT,
                        config.max_retries - keep_trying,
                    )
                    self.wait_timeout(DWN_S3FILE_RETRY_TIMEOUT)

            if not copied:
                self.logger.error(
                    "Could not copy the file s3://%s/%s to s3://%s. The copy was \
retried for %s times. Aborting",
                    config.bucket_src,
                    collection_file[1],
                    config.bucket_dst,
                    config.max_retries,
                )
                failed_files.append(collection_file[1])

        return failed_files
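
A hedged copy sketch using the TransferFromS3ToS3Config dataclass documented below; the import path is inferred from the file location above, and bucket names and keys are placeholders:

from rs_server_common.s3_storage_handler.s3_storage_handler import TransferFromS3ToS3Config

config = TransferFromS3ToS3Config(
    s3_files=["products/S1A_example.zip"],
    bucket_src="staging-bucket",
    bucket_dst="archive-bucket",
    copy_only=True,    # keep the source keys; with False the source keys are deleted after the copy
    max_retries=3,
)
failed = handler.transfer_from_s3_to_s3(config)
if failed:
    print("Could not copy:", failed)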

wait_timeout(timeout)

Wait for a specified timeout duration (minimum 200 ms).

This function implements a simple timeout mechanism, where it sleeps for 0.2 seconds in each iteration until the cumulative sleep time reaches the specified timeout duration.

Parameters:

Name Type Description Default
timeout float

The total duration to wait in seconds.

required
Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 422-436
def wait_timeout(self, timeout):
    """
    Wait for a specified timeout duration (minimum 200 ms).

    This function implements a simple timeout mechanism, where it sleeps for 0.2 seconds
    in each iteration until the cumulative sleep time reaches the specified timeout duration.

    Args:
        timeout (float): The total duration to wait in seconds.

    """
    time_cnt = 0.0
    while time_cnt < timeout:
        time.sleep(SLEEP_TIME)
        time_cnt += SLEEP_TIME

TransferFromS3ToS3Config dataclass

S3 configuration for copying a list of keys between buckets.

Attributes:

Name Type Description
s3_files list

A list with the S3 object keys to be copied.

bucket_src str

The source S3 bucket name.

bucket_dst str

The destination S3 bucket name.

max_retries int

The maximum number of download retries. Default is DWN_S3FILE_RETRIES.

copy_only bool

If True, the source keys are kept after the copy (not deleted). Default is False.

Source code in docs/rs-server/services/common/rs_server_common/s3_storage_handler/s3_storage_handler.py, lines 77-93
@dataclass
class TransferFromS3ToS3Config:
    """S3 configuration for copying a list with keys between buckets

    Attributes:
        s3_files (list): A list with the S3 object keys to be copied.
        bucket_src (str): The source S3 bucket name.
        bucket_dst (str): The destination S3 bucket name.
        copy_only (bool, optional): If True, the source keys are kept after the copy (not deleted). Default is False.
        max_retries (int, optional): The maximum number of download retries. Default is DWN_S3FILE_RETRIES.

    """

    s3_files: list
    bucket_src: str
    bucket_dst: str
    copy_only: bool = False
    max_retries: int = DWN_S3FILE_RETRIES