API Clients Documentation

This documentation provides an overview of the various API clients available in the rs_client package. Each client is designed to interact with specific services and provide a convenient way to access their functionalities.

RsClient

RsClient class implementation.

Attributes:

rs_server_href (str): RS-Server URL. In local mode, pass None.
rs_server_api_key (str): API key for RS-Server authentication. If not set, it is read from the RSPY_APIKEY environment variable.
owner_id (str): ID of the owner of the STAC catalog collections (no special characters allowed). By default, this is the user login from the Keycloak account associated with the API key or, in local mode, the local system username. If you pass another owner ID, your API key must give you read/write rights on that owner's catalog. The owner ID is also used in the RS-Client logging.
logger (Logger): Logging instance.
local_mode (bool): True in local mode, False in hybrid/cluster mode.
apikey_headers (dict): API key wrapped in a dict, ready to be unpacked as the headers keyword argument of an HTTP request. Empty if no API key is set.

Source code in docs/rs-client-libraries/rs_client/rs_client.py
class RsClient:
    """
    RsClient class implementation.

    Attributes:
        rs_server_href (str): RS-Server URL. In local mode, pass None.
        rs_server_api_key (str): API key for RS-Server authentication.
                                 If not set, we try to read it from the RSPY_APIKEY environment variable.
        owner_id (str): ID of the owner of the STAC catalog collections (no special characters allowed).
                        By default, this is the user login from the keycloak account, associated to the API key.
                        Or, in local mode, this is the local system username.
                        Else, your API Key must give you the rights to read/write on this catalog owner.
                        This owner ID is also used in the RS-Client logging.
        logger (logging.Logger): Logging instance.
        local_mode (bool): Local mode or hybrid/cluster mode.
        apikey_headers (dict): API key in a dict, ready-to-use in HTTP request headers.
    """

    def __init__(
        self,
        rs_server_href: str | None,
        rs_server_api_key: str | None = None,
        owner_id: str | None = None,
        logger: logging.Logger | None = None,
    ):
        """RsClient class constructor."""
        self.rs_server_href: str | None = rs_server_href
        self.rs_server_api_key: str | None = rs_server_api_key
        self.owner_id: str = owner_id or ""
        self.logger: logging.Logger = logger or Logging.default(__name__)

        # Remove trailing / character(s) from the URL
        if self.rs_server_href:
            self.rs_server_href = self.rs_server_href.strip().rstrip("/").strip()

        # We are in local mode if the URL is undefined.
        # Env vars are used instead to determine the different services URL.
        self.local_mode = not bool(self.rs_server_href)

        # If the API key is not set, we try to read it from the RSPY_APIKEY environment variable.
        if not self.rs_server_api_key:
            self.rs_server_api_key = os.getenv("RSPY_APIKEY")  # None if the env var is not set

        if (not self.local_mode) and (not self.rs_server_api_key):
            raise RuntimeError("API key is mandatory for RS-Server authentication")

        # For HTTP request headers
        self.apikey_headers: dict = (
            {"headers": {APIKEY_HEADER: self.rs_server_api_key}} if self.rs_server_api_key else {}
        )

        # Determine automatically the owner id
        if not self.owner_id:
            # In local mode, we use the local system username
            if self.local_mode:
                self.owner_id = getpass.getuser()

            # In hybrid/cluster mode, we retrieve the API key login
            else:
                self.owner_id = self.apikey_user_login

        # Remove special characters
        self.owner_id = re.sub(r"[^a-zA-Z0-9]+", "", self.owner_id)

        if not self.owner_id:
            raise RuntimeError("The owner ID is empty or only contains special characters")

        self.logger.debug(f"Owner ID: {self.owner_id!r}")

    # The following variable is needed for the tests to pass
    apikey_security_cache: TTLCache = TTLCache(maxsize=sys.maxsize, ttl=120)

    @cached(cache=apikey_security_cache)
    def apikey_security(self) -> tuple[list[str], dict, str]:
        """
        Check the api key validity. Cache an infinite (sys.maxsize) number of results for 120 seconds.

        Returns:
            Tuple of (IAM roles, config, user login) information from the keycloak account, associated to the api key.
        """

        # In local mode, we have no API key, so return empty results
        if self.local_mode:
            return [], {}, ""

        # self.logger.warning(
        #     f"TODO: use {self.rs_server_href}/apikeymanager/check/api_key instead, see: "
        #     "https://pforge-exchange2.astrium.eads.net/jira/browse/RSPY-257",
        # )
        # Does not work in hybrid mode for now because this URL is not exposed.
        check_url = os.environ["RSPY_UAC_CHECK_URL"]

        # Request the API key manager, pass user-defined api key in http header
        # check_url = f"{self.rs_server_href}/apikeymanager/check/api_key"
        self.logger.debug("Call the API key manager")
        response = requests.get(check_url, **self.apikey_headers, timeout=TIMEOUT)

        # Read the api key info
        if response.ok:
            contents = response.json()
            # Note: for now, config is an empty dict
            return contents["iam_roles"], contents["config"], contents["user_login"]

        # Try to read the response detail or error
        try:
            json = response.json()
            if "detail" in json:
                detail = json["detail"]
            else:
                detail = json["error"]

        # If this fails, get the full response content
        except Exception:  # pylint: disable=broad-exception-caught
            detail = response.content

        raise RuntimeError(f"API key manager status code {response.status_code}: {detail}")

    @property
    def apikey_iam_roles(self) -> list[str]:
        """
        Return the IAM (Identity and Access Management) roles from the keycloak account,
        associated to the api key.
        """
        return self.apikey_security()[0]

    @property
    def apikey_config(self) -> dict:
        """Return the config from the keycloak account, associated to the api key."""
        return self.apikey_security()[1]

    @property
    def apikey_user_login(self) -> str:
        """Return the user login from the keycloak account, associated to the api key."""
        return self.apikey_security()[2]

    #############################
    # Get child class instances #
    #############################

    def get_auxip_client(self) -> "AuxipClient":  # type: ignore # noqa: F821
        """
        Return an instance of the child class AuxipClient, with the same attributes as this "self" instance.
        """
        from rs_client.auxip_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
            AuxipClient,
        )

        return AuxipClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, self.logger)

    def get_cadip_client(
        self,
        station: ECadipStation,
    ) -> "CadipClient":  # type: ignore # noqa: F821
        """
        Return an instance of the child class CadipClient, with the same attributes as this "self" instance.

        Args:
            station (ECadipStation): Cadip station
        """
        from rs_client.cadip_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
            CadipClient,
        )

        return CadipClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, station, self.logger)

    def get_stac_client(self, *args, **kwargs) -> "StacClient":  # type: ignore # noqa: F821
        """
        Return an instance of the child class StacClient, with the same attributes as this "self" instance.
        """
        from rs_client.stac_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
            StacClient,
        )

        return StacClient.open(self.rs_server_href, self.rs_server_api_key, self.owner_id, self.logger, *args, **kwargs)

    ############################
    # Call RS-Server endpoints #
    ############################

    def staging_status(self, filename, timeout: int = TIMEOUT) -> EDownloadStatus:
        """Check the status of a file download from the specified rs-server endpoint.

        This function sends a GET request to the rs-server endpoint with the filename as a query parameter
        to retrieve the status of the file download. If the response is successful and contains a 'status'
        key in the JSON response, the function returns the corresponding download status enum value. If the
        response is not successful or does not contain the 'status' key, the function returns a FAILED status.

        Args:
            filename (str): The name of the file for which to check the status.
            timeout (int): The timeout duration for the HTTP request.

        Returns:
            EDownloadStatus: The download status enum value based on the response from the endpoint.
        """

        # TODO: check the status for a certain timeout if http returns NOK ?
        try:
            response = requests.get(
                self.href_status,  # pylint: disable=no-member # ("self" is AuxipClient or CadipClient)
                params={"name": filename},
                timeout=timeout,
                **self.apikey_headers,
            )

            eval_response = response.json()
            if (
                response.ok
                and "name" in eval_response.keys()
                and filename == eval_response["name"]
                and "status" in eval_response.keys()
            ):
                return EDownloadStatus(eval_response["status"])

        except (requests.exceptions.RequestException, requests.exceptions.Timeout) as e:
            self.logger.exception(f"Status endpoint exception: {e}")

        return EDownloadStatus.FAILED

    def staging(self, filename: str, s3_path: str = "", tmp_download_path: str = "", timeout: int = TIMEOUT):
        """Stage a file for download.

        This method stages a file for download by sending a request to the staging endpoint
        with optional parameters for S3 path and temporary download path.

        Args:
            filename (str): The name of the file to be staged for download.
            timeout (int): The timeout duration for the HTTP request.
            s3_path (str, optional): The S3 path where the file will be stored after download.
                Defaults to an empty string.
            tmp_download_path (str, optional): The temporary download path for the file.
                Defaults to an empty string.

        Raises:
            RuntimeError: If an error occurs while staging the file.

        """

        # dictionary to be used for payload request
        payload = {}
        # some protections for the optional args
        if s3_path:
            payload["obs"] = s3_path
        if tmp_download_path:
            payload["local"] = tmp_download_path

        # update the filename to be ingested
        payload["name"] = filename
        try:
            # logger.debug(f"Calling  {endpoint} with payload {payload}")
            response = requests.get(
                self.href_staging,  # pylint: disable=no-member # ("self" is AuxipClient or CadipClient)
                params=payload,
                timeout=timeout,
                **self.apikey_headers,
            )
            self.logger.debug(f"Download start endpoint returned in {response.elapsed.total_seconds()}")
            if not response.ok:
                self.logger.error(f"The download endpoint returned error for file {filename}\n")
                raise RuntimeError(f"The download endpoint returned error for file {filename}")
        except (
            requests.exceptions.RequestException,
            requests.exceptions.Timeout,
            requests.exceptions.ReadTimeout,
        ) as e:
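            # The exception goes inside the f-string: passing it as an extra positional
            # argument without a %s placeholder makes the logging module fail to format the record.
            self.logger.exception(f"Staging file exception for {filename}: {e}")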
            raise RuntimeError(f"Staging file exception for {filename}") from e

    def search_stations(  # pylint: disable=too-many-arguments
        self,
        start_date: datetime,
        stop_date: datetime,
        limit: Union[int, None] = None,
        sortby: Union[str, None] = None,
        timeout: int = TIMEOUT,
    ) -> list:
        """Retrieve a list of files from the specified endpoint.

        This function queries the specified endpoint to retrieve a list of files available in the
        station (CADIP, ADGS, LTA ...) that were collected by the satellite within the provided time range,
        starting from 'start_date' up to 'stop_date' (inclusive).

        Args:
            start_date (datetime): The start date of the time range.
            stop_date (datetime): The stop date of the time range.
            timeout (int): The timeout duration for the HTTP request.
            limit (int, optional): The maximum number of results to return. Defaults to None.
            sortby (str, optional): The attribute to sort the results by. Defaults to None.

        Returns:
            files (list): The list of files available at the station within the specified time range.

        Raises:
            RuntimeError: if the endpoint can't be reached

        Notes:
            - This function queries the specified endpoint with a time range to retrieve information about
            available files.
            - It constructs a payload with the start and stop dates in ISO 8601 format and sends a GET
            request to the endpoint.
            - The response is expected to be a STAC Compatible formatted JSONResponse, containing information about
             available files.
            - The function converts a STAC FeatureCollection to a Python list.
        """

        payload = {
            "datetime": start_date.strftime(DATETIME_FORMAT)
            + "/"  # 2014-01-01T12:00:00Z/2023-12-30T12:00:00Z",
            + stop_date.strftime(DATETIME_FORMAT),
        }
        if limit:
            payload["limit"] = str(limit)
        if sortby:
            payload["sortby"] = str(sortby)
        try:
            response = requests.get(
                self.href_search,  # pylint: disable=no-member # ("self" is AuxipClient or CadipClient)
                params=payload,
                timeout=timeout,
                **self.apikey_headers,
            )
        except (requests.exceptions.RequestException, requests.exceptions.Timeout) as e:
            self.logger.exception(f"Could not get the response from the station search endpoint: {e}")
            raise RuntimeError("Could not get the response from the station search endpoint") from e

        files = []
        try:
            if response.ok:
                for file_info in response.json()["features"]:
                    files.append(file_info)
            else:
                self.logger.error(f"Error: {response.status_code} : {response.json()}")
        except KeyError as e:
            raise RuntimeError("Wrong format of search endpoint answer") from e

        return files

    ##############################################
    # Methods to be implemented by child classes #
    ##############################################

    @property
    def href_search(self) -> str:
        """Implemented by AuxipClient and CadipClient."""
        raise NotImplementedError

    @property
    def href_staging(self) -> str:
        """Implemented by AuxipClient and CadipClient."""
        raise NotImplementedError

    @property
    def href_status(self) -> str:
        """Implemented by AuxipClient and CadipClient."""
        raise NotImplementedError

apikey_config: dict property

Return the config from the keycloak account, associated to the api key.

apikey_iam_roles: list[str] property

Return the IAM (Identity and Access Management) roles from the keycloak account, associated to the api key.

apikey_user_login: str property

Return the user login from the keycloak account, associated to the api key.

href_search: str property

Implemented by AuxipClient and CadipClient.

href_staging: str property

Implemented by AuxipClient and CadipClient.

href_status: str property

Implemented by AuxipClient and CadipClient.

__init__(rs_server_href, rs_server_api_key=None, owner_id=None, logger=None)

RsClient class constructor.
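
The normalization rules applied by the constructor can be sketched outside the class as a small pure function. This is a minimal illustration under stated assumptions, not the package's code: `resolve_settings` is a hypothetical helper, the `x-api-key` header name is a placeholder (the real name comes from the `APIKEY_HEADER` constant, whose value is not shown on this page), and the Keycloak lookup used in hybrid/cluster mode is replaced by a plain argument.

```python
import getpass
import os
import re

# Assumption: the real header name is the APIKEY_HEADER constant, not shown on this page.
ASSUMED_APIKEY_HEADER = "x-api-key"


def resolve_settings(href, api_key=None, owner_id=None, login_from_api_key=""):
    """Mirror the normalization steps of RsClient.__init__ (illustrative only)."""
    # Strip surrounding whitespace and trailing slashes from the URL.
    if href:
        href = href.strip().rstrip("/").strip()

    # No URL means local mode.
    local_mode = not bool(href)

    # Fall back to the RSPY_APIKEY environment variable.
    api_key = api_key or os.getenv("RSPY_APIKEY")
    if not local_mode and not api_key:
        raise RuntimeError("API key is mandatory for RS-Server authentication")

    # Keyword arguments that can be unpacked into an HTTP request call.
    headers = {"headers": {ASSUMED_APIKEY_HEADER: api_key}} if api_key else {}

    # Default owner: system user in local mode, API key login otherwise.
    if not owner_id:
        owner_id = getpass.getuser() if local_mode else login_from_api_key

    # Keep ASCII letters and digits only.
    owner_id = re.sub(r"[^a-zA-Z0-9]+", "", owner_id)
    if not owner_id:
        raise RuntimeError("The owner ID is empty or only contains special characters")

    return {"href": href, "local_mode": local_mode, "headers": headers, "owner_id": owner_id}


settings = resolve_settings(" https://example.org/// ", api_key="k", owner_id="john.doe-42")
```

With these inputs the URL loses its padding and trailing slashes, and the owner ID keeps only its letters and digits. An owner ID made solely of special characters would raise a RuntimeError, as in the constructor.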

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def __init__(
    self,
    rs_server_href: str | None,
    rs_server_api_key: str | None = None,
    owner_id: str | None = None,
    logger: logging.Logger | None = None,
):
    """RsClient class constructor."""
    self.rs_server_href: str | None = rs_server_href
    self.rs_server_api_key: str | None = rs_server_api_key
    self.owner_id: str = owner_id or ""
    self.logger: logging.Logger = logger or Logging.default(__name__)

    # Remove trailing / character(s) from the URL
    if self.rs_server_href:
        self.rs_server_href = self.rs_server_href.strip().rstrip("/").strip()

    # We are in local mode if the URL is undefined.
    # Env vars are used instead to determine the different services URL.
    self.local_mode = not bool(self.rs_server_href)

    # If the API key is not set, we try to read it from the RSPY_APIKEY environment variable.
    if not self.rs_server_api_key:
        self.rs_server_api_key = os.getenv("RSPY_APIKEY")  # None if the env var is not set

    if (not self.local_mode) and (not self.rs_server_api_key):
        raise RuntimeError("API key is mandatory for RS-Server authentication")

    # For HTTP request headers
    self.apikey_headers: dict = (
        {"headers": {APIKEY_HEADER: self.rs_server_api_key}} if self.rs_server_api_key else {}
    )

    # Determine automatically the owner id
    if not self.owner_id:
        # In local mode, we use the local system username
        if self.local_mode:
            self.owner_id = getpass.getuser()

        # In hybrid/cluster mode, we retrieve the API key login
        else:
            self.owner_id = self.apikey_user_login

    # Remove special characters
    self.owner_id = re.sub(r"[^a-zA-Z0-9]+", "", self.owner_id)

    if not self.owner_id:
        raise RuntimeError("The owner ID is empty or only contains special characters")

    self.logger.debug(f"Owner ID: {self.owner_id!r}")

apikey_security()

Check the API key validity. Results are cached for 120 seconds, with an effectively unlimited number of cached entries (maxsize=sys.maxsize).

Returns:

tuple[list[str], dict, str]: Tuple of (IAM roles, config, user login) information from the Keycloak account associated with the API key.
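
The effect of the 120-second cache can be illustrated with a small stand-in. The source relies on a `TTLCache` passed to a `@cached` decorator; the sketch below swaps that for a hand-rolled, standard-library-only decorator so that it runs anywhere. `ttl_cached`, `FakeClock` and `check_key` are hypothetical names introduced for the example, and `check_key` merely simulates the HTTP request to the API key manager.

```python
import time


def ttl_cached(ttl, clock=time.monotonic):
    """Cache results per argument tuple for `ttl` seconds (stdlib stand-in for a TTL cache)."""
    def decorator(func):
        store = {}  # maps args -> (expiry time, value)

        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]  # still fresh: skip the expensive call
            value = func(*args)  # an exception here propagates and nothing is stored
            store[args] = (now + ttl, value)
            return value

        return wrapper
    return decorator


class FakeClock:
    """Manually advanced clock so that expiry can be shown without sleeping."""
    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
calls = []


@ttl_cached(ttl=120, clock=clock)
def check_key(api_key):
    # Hypothetical stand-in for the request to the API key manager.
    calls.append(api_key)
    return (["role"], {}, "user")


check_key("k")      # performs the simulated request
check_key("k")      # served from the cache
clock.now = 121.0   # move past the 120-second lifetime
check_key("k")      # entry expired: performs the simulated request again
```

In this sketch, three lookups within and just after the cache lifetime trigger only two simulated requests, which is the behaviour the three `apikey_*` properties benefit from when they are read in quick succession.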

Source code in docs/rs-client-libraries/rs_client/rs_client.py
@cached(cache=apikey_security_cache)
def apikey_security(self) -> tuple[list[str], dict, str]:
    """
    Check the api key validity. Cache an infinite (sys.maxsize) number of results for 120 seconds.

    Returns:
        Tuple of (IAM roles, config, user login) information from the keycloak account, associated to the api key.
    """

    # In local mode, we have no API key, so return empty results
    if self.local_mode:
        return [], {}, ""

    # self.logger.warning(
    #     f"TODO: use {self.rs_server_href}/apikeymanager/check/api_key instead, see: "
    #     "https://pforge-exchange2.astrium.eads.net/jira/browse/RSPY-257",
    # )
    # Does not work in hybrid mode for now because this URL is not exposed.
    check_url = os.environ["RSPY_UAC_CHECK_URL"]

    # Request the API key manager, pass user-defined api key in http header
    # check_url = f"{self.rs_server_href}/apikeymanager/check/api_key"
    self.logger.debug("Call the API key manager")
    response = requests.get(check_url, **self.apikey_headers, timeout=TIMEOUT)

    # Read the api key info
    if response.ok:
        contents = response.json()
        # Note: for now, config is an empty dict
        return contents["iam_roles"], contents["config"], contents["user_login"]

    # Try to read the response detail or error
    try:
        json = response.json()
        if "detail" in json:
            detail = json["detail"]
        else:
            detail = json["error"]

    # If this fails, get the full response content
    except Exception:  # pylint: disable=broad-exception-caught
        detail = response.content

    raise RuntimeError(f"API key manager status code {response.status_code}: {detail}")

get_auxip_client()

Return an instance of the child class AuxipClient, with the same attributes as this "self" instance.

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def get_auxip_client(self) -> "AuxipClient":  # type: ignore # noqa: F821
    """
    Return an instance of the child class AuxipClient, with the same attributes as this "self" instance.
    """
    from rs_client.auxip_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
        AuxipClient,
    )

    return AuxipClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, self.logger)

get_cadip_client(station)

Return an instance of the child class CadipClient, with the same attributes as this "self" instance.

Parameters:

station (ECadipStation, required): Cadip station.

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def get_cadip_client(
    self,
    station: ECadipStation,
) -> "CadipClient":  # type: ignore # noqa: F821
    """
    Return an instance of the child class CadipClient, with the same attributes as this "self" instance.

    Args:
        station (ECadipStation): Cadip station
    """
    from rs_client.cadip_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
        CadipClient,
    )

    return CadipClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, station, self.logger)

get_stac_client(*args, **kwargs)

Return an instance of the child class StacClient, with the same attributes as this "self" instance.

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def get_stac_client(self, *args, **kwargs) -> "StacClient":  # type: ignore # noqa: F821
    """
    Return an instance of the child class StacClient, with the same attributes as this "self" instance.
    """
    from rs_client.stac_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
        StacClient,
    )

    return StacClient.open(self.rs_server_href, self.rs_server_api_key, self.owner_id, self.logger, *args, **kwargs)

search_stations(start_date, stop_date, limit=None, sortby=None, timeout=TIMEOUT)

Retrieve a list of files from the specified endpoint.

This function queries the specified endpoint to retrieve a list of files available in the station (CADIP, ADGS, LTA ...) that were collected by the satellite within the provided time range, starting from 'start_date' up to 'stop_date' (inclusive).

Parameters:

start_date (datetime, required): The start date of the time range.
stop_date (datetime, required): The stop date of the time range.
timeout (int, default TIMEOUT): The timeout duration for the HTTP request.
limit (int, default None): The maximum number of results to return.
sortby (str, default None): The attribute to sort the results by.

Returns:

files (list): The list of files available at the station within the specified time range.

Raises:

RuntimeError: If the endpoint can't be reached, or if a successful response has no 'features' key.

Notes
  • This function queries the specified endpoint with a time range to retrieve information about available files.
  • It constructs a payload with the start and stop dates in ISO 8601 format and sends a GET request to the endpoint.
  • The response is expected to be a STAC Compatible formatted JSONResponse, containing information about available files.
  • The function converts a STAC FeatureCollection to a Python list.
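
The payload construction and the FeatureCollection conversion described in these notes can be sketched as two small helpers. This is an illustration under assumptions rather than the package's code: `build_search_params` and `features_to_list` are hypothetical names, and the format string is a guess, since `DATETIME_FORMAT` is not shown on this page (the value below matches the `2014-01-01T12:00:00Z` example in the source comment).

```python
from datetime import datetime

# Assumption: DATETIME_FORMAT is not shown on this page; this value matches the
# "2014-01-01T12:00:00Z" example in the source comment.
ASSUMED_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def build_search_params(start_date, stop_date, limit=None, sortby=None):
    """Build the query parameters the way search_stations does."""
    params = {
        "datetime": start_date.strftime(ASSUMED_DATETIME_FORMAT)
        + "/"
        + stop_date.strftime(ASSUMED_DATETIME_FORMAT),
    }
    # Falsy values (None, 0, "") are left out of the query string.
    if limit:
        params["limit"] = str(limit)
    if sortby:
        params["sortby"] = str(sortby)
    return params


def features_to_list(body):
    """Return the items of a STAC FeatureCollection as a plain list."""
    try:
        return list(body["features"])
    except KeyError as exc:
        raise RuntimeError("Wrong format of search endpoint answer") from exc


params = build_search_params(datetime(2014, 1, 1, 12), datetime(2023, 12, 30, 12), limit=10)
items = features_to_list({"type": "FeatureCollection", "features": [{"id": "a"}]})
```

Note that a `limit` of 0 is treated like no limit at all, because the check is a truthiness test rather than a comparison with None.
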
Source code in docs/rs-client-libraries/rs_client/rs_client.py
def search_stations(  # pylint: disable=too-many-arguments
    self,
    start_date: datetime,
    stop_date: datetime,
    limit: Union[int, None] = None,
    sortby: Union[str, None] = None,
    timeout: int = TIMEOUT,
) -> list:
    """Retrieve a list of files from the specified endpoint.

    This function queries the specified endpoint to retrieve a list of files available in the
    station (CADIP, ADGS, LTA ...) that were collected by the satellite within the provided time range,
    starting from 'start_date' up to 'stop_date' (inclusive).

    Args:
        start_date (datetime): The start date of the time range.
        stop_date (datetime): The stop date of the time range.
        timeout (int): The timeout duration for the HTTP request.
        limit (int, optional): The maximum number of results to return. Defaults to None.
        sortby (str, optional): The attribute to sort the results by. Defaults to None.

    Returns:
        files (list): The list of files available at the station within the specified time range.

    Raises:
        RuntimeError: if the endpoint can't be reached

    Notes:
        - This function queries the specified endpoint with a time range to retrieve information about
        available files.
        - It constructs a payload with the start and stop dates in ISO 8601 format and sends a GET
        request to the endpoint.
        - The response is expected to be a STAC Compatible formatted JSONResponse, containing information about
         available files.
        - The function converts a STAC FeatureCollection to a Python list.
    """

    payload = {
        "datetime": start_date.strftime(DATETIME_FORMAT)
        + "/"  # 2014-01-01T12:00:00Z/2023-12-30T12:00:00Z",
        + stop_date.strftime(DATETIME_FORMAT),
    }
    if limit:
        payload["limit"] = str(limit)
    if sortby:
        payload["sortby"] = str(sortby)
    try:
        response = requests.get(
            self.href_search,  # pylint: disable=no-member # ("self" is AuxipClient or CadipClient)
            params=payload,
            timeout=timeout,
            **self.apikey_headers,
        )
    except (requests.exceptions.RequestException, requests.exceptions.Timeout) as e:
        self.logger.exception(f"Could not get the response from the station search endpoint: {e}")
        raise RuntimeError("Could not get the response from the station search endpoint") from e

    files = []
    try:
        if response.ok:
            for file_info in response.json()["features"]:
                files.append(file_info)
        else:
            self.logger.error(f"Error: {response.status_code} : {response.json()}")
    except KeyError as e:
        raise RuntimeError("Wrong format of search endpoint answer") from e

    return files

staging(filename, s3_path='', tmp_download_path='', timeout=TIMEOUT)

Stage a file for download.

This method stages a file for download by sending a request to the staging endpoint with optional parameters for S3 path and temporary download path.

Parameters:

filename (str, required): The name of the file to be staged for download.
timeout (int, default TIMEOUT): The timeout duration for the HTTP request.
s3_path (str, default ''): The S3 path where the file will be stored after download.
tmp_download_path (str, default ''): The temporary download path for the file.

Raises:

RuntimeError: If an error occurs while staging the file.
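
How the optional arguments map to query parameters can be shown with a short helper. This is a minimal sketch: `build_staging_params` is a hypothetical name and the example paths are made up, while the parameter keys (`obs`, `local`, `name`) are the ones used in the source.

```python
def build_staging_params(filename, s3_path="", tmp_download_path=""):
    """Build the query parameters the way staging does."""
    params = {}
    # Optional arguments are only sent when they are non-empty.
    if s3_path:
        params["obs"] = s3_path
    if tmp_download_path:
        params["local"] = tmp_download_path
    # The file name is always sent.
    params["name"] = filename
    return params


# Made-up values, for illustration only.
minimal = build_staging_params("file.raw")
full = build_staging_params("file.raw", s3_path="s3://bucket/prefix", tmp_download_path="/tmp/staging")
```

Leaving both optional arguments empty sends only the `name` parameter.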

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def staging(self, filename: str, s3_path: str = "", tmp_download_path: str = "", timeout: int = TIMEOUT):
    """Stage a file for download.

    This method stages a file for download by sending a request to the staging endpoint
    with optional parameters for S3 path and temporary download path.

    Args:
        filename (str): The name of the file to be staged for download.
        timeout (int): The timeout duration for the HTTP request.
        s3_path (str, optional): The S3 path where the file will be stored after download.
            Defaults to an empty string.
        tmp_download_path (str, optional): The temporary download path for the file.
            Defaults to an empty string.

    Raises:
        RuntimeError: If an error occurs while staging the file.

    """

    # dictionary to be used for payload request
    payload = {}
    # some protections for the optional args
    if s3_path:
        payload["obs"] = s3_path
    if tmp_download_path:
        payload["local"] = tmp_download_path

    # update the filename to be ingested
    payload["name"] = filename
    try:
        # logger.debug(f"Calling  {endpoint} with payload {payload}")
        response = requests.get(
            self.href_staging,  # pylint: disable=no-member # ("self" is AuxipClient or CadipClient)
            params=payload,
            timeout=timeout,
            **self.apikey_headers,
        )
        self.logger.debug(f"Download start endpoint returned in {response.elapsed.total_seconds()}")
        if not response.ok:
            self.logger.error(f"The download endpoint returned error for file {filename}\n")
            raise RuntimeError(f"The download endpoint returned error for file {filename}")
    except (
        requests.exceptions.RequestException,
        requests.exceptions.Timeout,
        requests.exceptions.ReadTimeout,
    ) as e:
        self.logger.exception(f"Staging file exception for {filename}: {e}")
        raise RuntimeError(f"Staging file exception for {filename}") from e
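The payload construction used by staging can be looked at in isolation. The following is a minimal sketch, not the library's code: `build_staging_payload` is a hypothetical helper name, and the file name and S3 path are made-up values. The keys `name`, `obs`, and `local` are the ones used in the source above.

```python
def build_staging_payload(filename: str, s3_path: str = "", tmp_download_path: str = "") -> dict:
    """Hypothetical helper mirroring how staging() assembles its query parameters."""
    payload = {"name": filename}
    # Optional values are only sent when they are non-empty
    if s3_path:
        payload["obs"] = s3_path
    if tmp_download_path:
        payload["local"] = tmp_download_path
    return payload


# Illustrative values only
print(build_staging_payload("example_file.raw", s3_path="s3://example-bucket/aux"))
```

Leaving out empty optional values keeps the query string minimal, so the server only sees the parameters the caller actually set.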

staging_status(filename, timeout=TIMEOUT)

Check the status of a file download from the specified rs-server endpoint.

This function sends a GET request to the rs-server status endpoint with the filename as a query parameter. If the response is successful, its JSON body echoes the requested filename in the 'name' key, and it contains a 'status' key, the function returns the corresponding download status enum value. In every other case, including a request exception (which is logged), the function returns a FAILED status.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filename | str | The name of the file for which to check the status. | required |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| EDownloadStatus | EDownloadStatus | The download status enum value based on the response from the endpoint. |

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def staging_status(self, filename, timeout: int = TIMEOUT) -> EDownloadStatus:
    """Check the status of a file download from the specified rs-server endpoint.

    This function sends a GET request to the rs-server endpoint with the filename as a query parameter
    to retrieve the status of the file download. If the response is successful and contains a 'status'
    key in the JSON response, the function returns the corresponding download status enum value. If the
    response is not successful or does not contain the 'status' key, the function returns a FAILED status.

    Args:
        filename (str): The name of the file for which to check the status.
        timeout (int): The timeout duration for the HTTP request.

    Returns:
        EDownloadStatus: The download status enum value based on the response from the endpoint.
    """

    # TODO: check the status for a certain timeout if http returns NOK ?
    try:
        response = requests.get(
            self.href_status,  # pylint: disable=no-member # ("self" is AuxipClient or CadipClient)
            params={"name": filename},
            timeout=timeout,
            **self.apikey_headers,
        )

        eval_response = response.json()
        if (
            response.ok
            and "name" in eval_response.keys()
            and filename == eval_response["name"]
            and "status" in eval_response.keys()
        ):
            return EDownloadStatus(eval_response["status"])

    except (requests.exceptions.RequestException, requests.exceptions.Timeout) as e:
        self.logger.exception(f"Status endpoint exception: {e}")

    return EDownloadStatus.FAILED
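The acceptance rule applied to the status response can be sketched without any HTTP call. This is an illustration under stated assumptions: `DownloadStatus` is a hypothetical stand-in for EDownloadStatus (its real members are not shown in this section), and `parse_status` is a hypothetical helper name.

```python
from enum import Enum


class DownloadStatus(Enum):
    """Hypothetical stand-in for EDownloadStatus; member values are illustrative."""

    IN_PROGRESS = "IN_PROGRESS"
    DONE = "DONE"
    FAILED = "FAILED"


def parse_status(ok: bool, body: dict, filename: str) -> DownloadStatus:
    """Accept the response only on HTTP success, a matching 'name', and a 'status' key."""
    if ok and body.get("name") == filename and "status" in body:
        return DownloadStatus(body["status"])
    # Anything else is reported as a failure
    return DownloadStatus.FAILED


print(parse_status(True, {"name": "example_file.raw", "status": "DONE"}, "example_file.raw"))
```

A caller that polls for completion would typically repeat the status call until the result is no longer an in-progress value, treating FAILED as terminal.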

RsClient is the base class for the RS-Server clients. It stores the connection settings (server URL, API key, owner ID, logger) and provides the staging and staging_status calls that AuxipClient and CadipClient inherit.

AuxipClient

Bases: RsClient

AuxipClient class implementation.

Attributes: see RsClient

Source code in docs/rs-client-libraries/rs_client/auxip_client.py
class AuxipClient(RsClient):
    """
    AuxipClient class implementation.

    Attributes: see :py:class:`RsClient`
    """

    @property
    def href_adgs(self) -> str:
        """
        Return the RS-Server ADGS URL hostname.
        This URL can be overridden using the RSPY_HOST_ADGS env variable (used e.g. for local mode).
        Otherwise, it is just the RS-Server URL.
        """
        if from_env := os.getenv("RSPY_HOST_ADGS", None):
            return from_env.rstrip("/")
        if not self.rs_server_href:
            raise RuntimeError("RS-Server URL is undefined")
        return self.rs_server_href.rstrip("/")

    @property
    def href_search(self) -> str:
        """Return the RS-Server hostname and path where the ADGS search endpoint is deployed."""
        return f"{self.href_adgs}/adgs/aux/search"

    @property
    def href_staging(self) -> str:
        """Return the RS-Server hostname and path where the ADGS staging endpoint is deployed."""
        return f"{self.href_adgs}/adgs/aux"

    @property
    def href_status(self) -> str:
        """Return the RS-Server hostname and path where the ADGS status endpoint is deployed."""
        return f"{self.href_adgs}/adgs/aux/status"

    @property
    def station_name(self) -> str:
        """Return "AUXIP"."""
        return AUXIP_STATION

href_adgs: str property

Return the RS-Server ADGS URL hostname. This URL can be overridden using the RSPY_HOST_ADGS environment variable (used e.g. for local mode). Otherwise, it is just the RS-Server URL.
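The resolution order described here (environment variable first, then the RS-Server URL) can be sketched as a standalone function. `resolve_host` is a hypothetical helper name and both URLs are made-up examples; only the RSPY_HOST_ADGS variable and the /adgs/aux/search path come from the source.

```python
import os
from typing import Optional


def resolve_host(env_var: str, rs_server_href: Optional[str]) -> str:
    """Hypothetical helper: prefer the environment override, else the RS-Server URL."""
    if from_env := os.getenv(env_var):
        return from_env.rstrip("/")
    if not rs_server_href:
        raise RuntimeError("RS-Server URL is undefined")
    return rs_server_href.rstrip("/")


# Without the override, the RS-Server URL is used (trailing slash removed)
os.environ.pop("RSPY_HOST_ADGS", None)
base = resolve_host("RSPY_HOST_ADGS", "https://rs-server.example/")
search_url = f"{base}/adgs/aux/search"

# With the override set (e.g. in local mode), it takes precedence
os.environ["RSPY_HOST_ADGS"] = "http://localhost:8001/"
local_base = resolve_host("RSPY_HOST_ADGS", None)

# Clean up the override set for this demonstration
os.environ.pop("RSPY_HOST_ADGS", None)
```

Because the override is checked first, local mode can pass None as the RS-Server URL as long as the environment variable is set.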

href_search: str property

Return the RS-Server hostname and path where the ADGS search endpoint is deployed.

href_staging: str property

Return the RS-Server hostname and path where the ADGS staging endpoint is deployed.

href_status: str property

Return the RS-Server hostname and path where the ADGS status endpoint is deployed.

station_name: str property

Return "AUXIP".

AuxipClient targets the AUXIP (ADGS) endpoints of RS-Server. It resolves the search, staging, and status URLs under /adgs/aux and reports the station name "AUXIP".

CadipClient

Bases: RsClient

CadipClient class implementation.

Attributes: see RsClient, plus:

| Name | Type | Description |
| --- | --- | --- |
| station | ECadipStation | CADIP station |

Source code in docs/rs-client-libraries/rs_client/cadip_client.py
class CadipClient(RsClient):
    """
    CadipClient class implementation.

    Attributes: see :py:class:`RsClient`
        station (ECadipStation): Cadip station
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        rs_server_href: str | None,
        rs_server_api_key: str | None,
        owner_id: str,
        station: ECadipStation,
        logger: logging.Logger | None = None,
    ):
        """CadipClient class constructor."""
        super().__init__(rs_server_href, rs_server_api_key, owner_id, logger)
        try:
            self.station: ECadipStation = ECadipStation[station]
        except KeyError as e:
            self.logger.exception(f"There is no such CADIP station: {station}")
            raise RuntimeError(f"There is no such CADIP station: {station}") from e

    @property
    def href_cadip(self) -> str:
        """
        Return the RS-Server CADIP URL hostname.
        This URL can be overridden using the RSPY_HOST_CADIP env variable (used e.g. for local mode).
        Otherwise, it is just the RS-Server URL.
        """
        if from_env := os.getenv("RSPY_HOST_CADIP", None):
            return from_env.rstrip("/")
        if not self.rs_server_href:
            raise RuntimeError("RS-Server URL is undefined")
        return self.rs_server_href.rstrip("/")

    @property
    def href_search(self) -> str:
        """Return the RS-Server hostname and path where the CADIP search endpoint is deployed."""
        return f"{self.href_cadip}/cadip/{self.station.value}/cadu/search"

    @property
    def href_session(self) -> str:
        """Return the RS-Server hostname and path where the CADIP search session endpoint is deployed."""
        return f"{self.href_cadip}/cadip/{self.station.value}/session"

    @property
    def href_staging(self) -> str:
        """Return the RS-Server hostname and path where the CADIP staging endpoint is deployed."""
        return f"{self.href_cadip}/cadip/{self.station.value}/cadu"

    @property
    def href_status(self) -> str:
        """Return the RS-Server hostname and path where the CADIP status endpoint is deployed."""
        return f"{self.href_cadip}/cadip/{self.station.value}/cadu/status"

    @property
    def station_name(self) -> str:
        """Return the station name."""
        return self.station.value  # TO BE DISCUSSED: maybe just return "CADIP"

    ############################
    # Call RS-Server endpoints #
    ############################

    def search_sessions(  # pylint: disable=too-many-arguments
        self,
        session_ids: list[str] | None = None,
        start_date: datetime | None = None,
        stop_date: datetime | None = None,
        platforms: list[EPlatform] | None = None,
        timeout: int = TIMEOUT,
    ) -> list[dict]:  # TODO return pystac.ItemCollection instead
        """Endpoint to retrieve list of sessions from any CADIP station.

        Args:
            timeout (int): The timeout duration for the HTTP request.
            session_ids (list[str]): Session identifiers
                (eg: ["S1A_20170501121534062343"] or ["S1A_20170501121534062343, S1A_20240328185208053186"])
            start_date (datetime): Start date of the time interval
            stop_date (datetime): Stop date of the time interval
            platforms (list[EPlatform]): Platform list
        """

        payload = {}
        if session_ids:
            payload["id"] = ",".join(session_ids)
        if platforms:
            payload["platform"] = ",".join([platform.value for platform in platforms])
        if start_date:
            payload["start_date"] = start_date.strftime(DATETIME_FORMAT)
        if stop_date:
            payload["stop_date"] = stop_date.strftime(DATETIME_FORMAT)
        try:
            response = requests.get(
                self.href_session,
                params=payload,
                timeout=timeout,
                **self.apikey_headers,
            )
        except (requests.exceptions.RequestException, requests.exceptions.Timeout) as e:
            self.logger.exception(f"Could not get the response from the session search endpoint: {e}")
            raise RuntimeError("Could not get the response from the session search endpoint") from e

        sessions = []
        try:
            if response.ok:
                for session_info in response.json()["features"]:
                    sessions.append(session_info)
            else:
                self.logger.error(f"Error: {response.status_code} : {response.json()}")
        except KeyError as e:
            raise RuntimeError("Wrong format of session search endpoint answer") from e

        return sessions

href_cadip: str property

Return the RS-Server CADIP URL hostname. This URL can be overridden using the RSPY_HOST_CADIP environment variable (used e.g. for local mode). Otherwise, it is just the RS-Server URL.

href_search: str property

Return the RS-Server hostname and path where the CADIP search endpoint is deployed.

href_session: str property

Return the RS-Server hostname and path where the CADIP search session endpoint is deployed.

href_staging: str property

Return the RS-Server hostname and path where the CADIP staging endpoint is deployed.

href_status: str property

Return the RS-Server hostname and path where the CADIP status endpoint is deployed.

station_name: str property

Return the station name.

__init__(rs_server_href, rs_server_api_key, owner_id, station, logger=None)

CadipClient class constructor.

Source code in docs/rs-client-libraries/rs_client/cadip_client.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    rs_server_href: str | None,
    rs_server_api_key: str | None,
    owner_id: str,
    station: ECadipStation,
    logger: logging.Logger | None = None,
):
    """CadipClient class constructor."""
    super().__init__(rs_server_href, rs_server_api_key, owner_id, logger)
    try:
        self.station: ECadipStation = ECadipStation[station]
    except KeyError as e:
        self.logger.exception(f"There is no such CADIP station: {station}")
        raise RuntimeError(f"There is no such CADIP station: {station}") from e
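The constructor resolves the station by enum member name and turns a failed lookup into a RuntimeError. A minimal sketch of that pattern follows; `Station` is a hypothetical stand-in for ECadipStation (its real members are not listed in this section) and `lookup_station` is a hypothetical helper name.

```python
from enum import Enum


class Station(Enum):
    """Hypothetical stand-in for ECadipStation; names and values are illustrative."""

    CADIP = "cadip"
    EXAMPLE = "example"


def lookup_station(name: str) -> Station:
    """Resolve a station by member name, converting KeyError into RuntimeError."""
    try:
        # Square-bracket access on an Enum class looks the member up by name
        return Station[name]
    except KeyError as e:
        raise RuntimeError(f"There is no such CADIP station: {name}") from e


print(lookup_station("CADIP"))
```

Chaining with `from e` keeps the original KeyError in the traceback, which helps when the station name comes from configuration.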

search_sessions(session_ids=None, start_date=None, stop_date=None, platforms=None, timeout=TIMEOUT)

Retrieve the list of sessions from the CADIP station this client was created for.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |
| session_ids | list[str] | Session identifiers (e.g. ["S1A_20170501121534062343"] or ["S1A_20170501121534062343, S1A_20240328185208053186"]) | None |
| start_date | datetime | Start date of the time interval | None |
| stop_date | datetime | Stop date of the time interval | None |
| platforms | list[EPlatform] | Platform list | None |

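How these parameters become query parameters can be sketched as follows. This is an illustration, not the library's code: `build_session_query` is a hypothetical helper, the value of `DATETIME_FORMAT` is an assumption (the real constant is not shown in this section), and platforms are passed as plain strings here instead of EPlatform members.

```python
from datetime import datetime

# Assumption: the real DATETIME_FORMAT constant may differ
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def build_session_query(session_ids=None, start_date=None, stop_date=None, platforms=None) -> dict:
    """Hypothetical helper mirroring the payload built by search_sessions()."""
    payload = {}
    if session_ids:
        # Several identifiers are sent as one comma-separated value
        payload["id"] = ",".join(session_ids)
    if platforms:
        payload["platform"] = ",".join(platforms)
    if start_date:
        payload["start_date"] = start_date.strftime(DATETIME_FORMAT)
    if stop_date:
        payload["stop_date"] = stop_date.strftime(DATETIME_FORMAT)
    return payload


query = build_session_query(
    session_ids=["S1A_20170501121534062343", "S1A_20240328185208053186"],
    start_date=datetime(2024, 3, 28, 18, 52, 8),
    platforms=["S1A"],
)
print(query)
```

Only the filters that were actually provided end up in the query, so calling the helper with no arguments yields an empty payload.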
Source code in docs/rs-client-libraries/rs_client/cadip_client.py
def search_sessions(  # pylint: disable=too-many-arguments
    self,
    session_ids: list[str] | None = None,
    start_date: datetime | None = None,
    stop_date: datetime | None = None,
    platforms: list[EPlatform] | None = None,
    timeout: int = TIMEOUT,
) -> list[dict]:  # TODO return pystac.ItemCollection instead
    """Endpoint to retrieve list of sessions from any CADIP station.

    Args:
        timeout (int): The timeout duration for the HTTP request.
        session_ids (list[str]): Session identifiers
            (eg: ["S1A_20170501121534062343"] or ["S1A_20170501121534062343, S1A_20240328185208053186"])
        start_date (datetime): Start date of the time interval
        stop_date (datetime): Stop date of the time interval
        platforms (list[EPlatform]): Platform list
    """

    payload = {}
    if session_ids:
        payload["id"] = ",".join(session_ids)
    if platforms:
        payload["platform"] = ",".join([platform.value for platform in platforms])
    if start_date:
        payload["start_date"] = start_date.strftime(DATETIME_FORMAT)
    if stop_date:
        payload["stop_date"] = stop_date.strftime(DATETIME_FORMAT)
    try:
        response = requests.get(
            self.href_session,
            params=payload,
            timeout=timeout,
            **self.apikey_headers,
        )
    except (requests.exceptions.RequestException, requests.exceptions.Timeout) as e:
        self.logger.exception(f"Could not get the response from the session search endpoint: {e}")
        raise RuntimeError("Could not get the response from the session search endpoint") from e

    sessions = []
    try:
        if response.ok:
            for session_info in response.json()["features"]:
                sessions.append(session_info)
        else:
            self.logger.error(f"Error: {response.status_code} : {response.json()}")
    except KeyError as e:
        raise RuntimeError("Wrong format of session search endpoint answer") from e

    return sessions

CadipClient targets the CADIP endpoints of RS-Server for a single station. It resolves the per-station search, session, staging, and status URLs and adds search_sessions to list the sessions available at that station.
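The per-station URL layout used by the href_* properties can be summarised in one place. The sketch below assumes a hypothetical helper name (`cadip_endpoints`) and made-up base URL and station value; the path segments are taken from the property definitions in the source above.

```python
def cadip_endpoints(base_url: str, station: str) -> dict:
    """Hypothetical helper listing the CADIP endpoints for one station."""
    root = f"{base_url.rstrip('/')}/cadip/{station}"
    return {
        "search": f"{root}/cadu/search",
        "session": f"{root}/session",
        "staging": f"{root}/cadu",
        "status": f"{root}/cadu/status",
    }


# Illustrative base URL and station value
for name, url in cadip_endpoints("https://rs-server.example/", "example_station").items():
    print(f"{name}: {url}")
```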

StacClient

Bases: RsClient, Client

StacClient inherits from both rs_client.RsClient and pystac_client.Client. The goal of this class is to allow a user to use RS-Server services more easily than calling REST endpoints directly.

Source code in docs/rs-client-libraries/rs_client/stac_client.py
class StacClient(RsClient, Client):  # type: ignore # pylint: disable=too-many-ancestors
    """StacClient inherits from both rs_client.RsClient and pystac_client.Client. The goal of this class is to
    allow a user to use RS-Server services more easily than calling REST endpoints directly.
    """

    ##################
    # Initialisation #
    ##################

    def __init__(  # pylint: disable=too-many-arguments,super-init-not-called # super-init is called in .open(...)
        self,
        id: str,  # pylint: disable=redefined-builtin
        description: str,
        title: Optional[str] = None,
        stac_extensions: Optional[List[str]] = None,
        extra_fields: Optional[Dict[str, Any]] = None,
        href: Optional[str] = None,
        catalog_type: CatalogType = CatalogType.ABSOLUTE_PUBLISHED,
        strategy: Optional[HrefLayoutStrategy] = None,
        *,
        modifier: Optional[Callable[[Modifiable], None]] = None,
        **kwargs: Dict[str, Any],
    ):
        """
        Constructor. Called only by pystac.
        As a user: don't use this directly; call the open(...) class method or RsClient.get_stac_client(...) instead.
        """

        # Call manually the parent pystac Client constructor.
        # The RsClient constructor will be called manually later.
        Client.__init__(
            self,
            id=id,
            description=description,
            title=title,
            stac_extensions=stac_extensions,
            extra_fields=extra_fields,
            href=href,
            catalog_type=catalog_type,
            strategy=strategy,
            modifier=modifier,
            **kwargs,
        )

    @classmethod
    def open(  # type: ignore  # pylint: disable=arguments-renamed, too-many-arguments
        cls,
        # RsClient parameters
        rs_server_href: str | None,
        rs_server_api_key: str | None,
        owner_id: str | None,
        logger: logging.Logger | None = None,
        # pystac Client parameters
        headers: Optional[Dict[str, str]] = None,
        parameters: Optional[Dict[str, Any]] = None,
        ignore_conformance: Optional[bool] = None,
        modifier: Optional[Callable[[Modifiable], None]] = None,
        request_modifier: Optional[Callable[[Request], Union[Request, None]]] = None,
        stac_io: Optional[StacApiIO] = None,
        timeout: Optional[Timeout] = TIMEOUT,
    ) -> StacClient:
        """Create a new StacClient instance."""

        if rs_server_api_key:
            if headers is None:
                headers = {}
            headers[APIKEY_HEADER] = rs_server_api_key

        client: StacClient = super().open(  # type: ignore
            cls.__href_catalog(rs_server_href) + "/catalog/",
            headers,
            parameters,
            ignore_conformance,
            modifier,
            request_modifier,
            stac_io,
            timeout,
        )

        # Manual call to the parent RsClient constructor
        RsClient.__init__(
            client,
            rs_server_href=rs_server_href,
            rs_server_api_key=rs_server_api_key,
            owner_id=owner_id,
            logger=logger,
        )

        return client

    ##############
    # Properties #
    ##############

    @property
    def href_catalog(self) -> str:
        """
        Return the RS-Server catalog URL hostname.
        This URL can be overridden using the RSPY_HOST_CATALOG env variable (used e.g. for local mode).
        Otherwise, it is just the RS-Server URL.
        """
        return self.__href_catalog(self.rs_server_href)

    @staticmethod
    def __href_catalog(rs_server_href) -> str:
        if from_env := os.getenv("RSPY_HOST_CATALOG", None):
            return from_env.rstrip("/")
        if not rs_server_href:
            raise RuntimeError("RS-Server URL is undefined")
        return rs_server_href.rstrip("/")

    def full_collection_id(self, owner_id: str | None, collection_id: str):
        """
        Return the full collection name as: <owner_id>:<collection_id>

        Args:
            owner_id (str): Collection owner ID. If missing, we use self.owner_id.
            collection_id (str): Collection name
        """
        return f"{owner_id or self.owner_id}:{collection_id}"

    ################################
    # Specific STAC implementation #
    ################################

    @lru_cache()
    def get_collection(self, collection_id: str, owner_id: str | None = None) -> Union[Collection, CollectionClient]:
        """Get the requested collection as <owner_id>:<collection_id>"""
        full_collection_id = self.full_collection_id(owner_id, collection_id)
        return Client.get_collection(self, full_collection_id)

    def add_collection(
        self,
        collection: Collection,
        add_public_license: bool = True,
        owner_id: str | None = None,
        timeout: int = TIMEOUT,
    ) -> Response:
        """Update the collection links, then post the collection into the catalog.

        Args:
            collection (Collection): STAC collection
            add_public_license (bool): If True, add a public domain license field and link.
            owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
            timeout (int): The timeout duration for the HTTP request.

        Returns:
            JSONResponse (Response): The response of the request.
        """

        full_owner_id = owner_id or self.owner_id

        # Use owner_id:collection_id instead of just the collection ID, before adding the links,
        # so the links contain the full owner_id:collection_id
        short_collection_id = collection.id
        full_collection_id = self.full_collection_id(owner_id, short_collection_id)
        collection.id = full_collection_id

        # Default description
        if not collection.description:
            collection.description = f"This is the collection {short_collection_id} from user {full_owner_id}."

        # Add the owner_id as an extra field
        collection.extra_fields["owner"] = full_owner_id

        # Add public domain license
        if add_public_license:
            collection.license = "public-domain"
            collection.add_link(
                Link(
                    rel=RelType.LICENSE,
                    target="https://creativecommons.org/licenses/publicdomain/",
                    title="public-domain",
                ),
            )

        # Update the links
        self.add_child(collection)

        # Restore the short collection_id at the root of the collection
        collection.id = short_collection_id

        # Check that the collection is compliant to STAC
        collection.validate_all()

        # Post the collection to the catalog
        return requests.post(
            f"{self.href_catalog}/catalog/collections",
            json=collection.to_dict(),
            **self.apikey_headers,
            timeout=timeout,
        )

    def remove_collection(
        self,
        collection_id: str,
        owner_id: str | None = None,
        timeout: int = TIMEOUT,
    ) -> Response:
        """Remove/delete a collection from the catalog.

        Args:
            collection_id (str): The collection id.
            owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
            timeout (int): The timeout duration for the HTTP request.

        Returns:
            JSONResponse: The response of the request.
        """
        # owner_id:collection_id
        full_collection_id = self.full_collection_id(owner_id, collection_id)

        # Remove the collection from the "child" links of the local catalog instance
        collection_link = f"{self.self_href.rstrip('/')}/collections/{full_collection_id}"
        self.links = [
            link for link in self.links if not ((link.rel == pystac.RelType.CHILD) and (link.href == collection_link))
        ]

        # We need to clear the cache for this and parent "get_collection" methods
        # because their returned value must be updated.
        self.get_collection.cache_clear()
        Client.get_collection.cache_clear()

        # Remove the collection from the server catalog
        return requests.delete(
            f"{self.href_catalog}/catalog/collections/{full_collection_id}",
            **self.apikey_headers,
            timeout=timeout,
        )

    def add_item(  # type: ignore # pylint: disable=arguments-renamed
        self,
        collection_id: str,
        item: Item,
        owner_id: str | None = None,
        timeout: int = TIMEOUT,
    ) -> Response:
        """Update the item links, then post the item into the catalog.

        Args:
            collection_id (str): The collection id.
            item (Item): STAC item to update and post
            owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
            timeout (int): The timeout duration for the HTTP request.

        Returns:
            JSONResponse: The response of the request.
        """
        # owner_id:collection_id
        full_collection_id = self.full_collection_id(owner_id, collection_id)

        # Get the collection from the catalog
        collection = self.get_collection(collection_id, owner_id)

        # Update the item  contents
        collection.add_item(item)

        # Post the item to the catalog
        return requests.post(
            f"{self.href_catalog}/catalog/collections/{full_collection_id}/items",
            json=item.to_dict(),
            **self.apikey_headers,
            timeout=timeout,
        )

    def remove_item(  # type: ignore # pylint: disable=arguments-differ
        self,
        collection_id: str,
        item_id: str,
        owner_id: str | None = None,
        timeout: int = TIMEOUT,
    ) -> Response:
        """Remove/delete an item from a collection.

        Args:
            collection_id (str): The collection id.
            item_id (str): The item id.
            owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
            timeout (int): The timeout duration for the HTTP request.

        Returns:
            JSONResponse: The response of the request.
        """
        # owner_id:collection_id
        full_collection_id = self.full_collection_id(owner_id, collection_id)

        # Remove the item from the server catalog
        return requests.delete(
            f"{self.href_catalog}/catalog/collections/{full_collection_id}/items/{item_id}",
            **self.apikey_headers,
            timeout=timeout,
        )

href_catalog: str property

Return the RS-Server catalog URL hostname. This URL can be overridden using the RSPY_HOST_CATALOG environment variable (used e.g. for local mode). Otherwise, it is just the RS-Server URL.

__init__(id, description, title=None, stac_extensions=None, extra_fields=None, href=None, catalog_type=CatalogType.ABSOLUTE_PUBLISHED, strategy=None, *, modifier=None, **kwargs)

Constructor. Called only by pystac. As a user, don't use this directly; call the open(...) class method or RsClient.get_stac_client(...) instead.

Source code in docs/rs-client-libraries/rs_client/stac_client.py
def __init__(  # pylint: disable=too-many-arguments,super-init-not-called # super-init is called in .open(...)
    self,
    id: str,  # pylint: disable=redefined-builtin
    description: str,
    title: Optional[str] = None,
    stac_extensions: Optional[List[str]] = None,
    extra_fields: Optional[Dict[str, Any]] = None,
    href: Optional[str] = None,
    catalog_type: CatalogType = CatalogType.ABSOLUTE_PUBLISHED,
    strategy: Optional[HrefLayoutStrategy] = None,
    *,
    modifier: Optional[Callable[[Modifiable], None]] = None,
    **kwargs: Dict[str, Any],
):
    """
    Constructor. Called only by pystac.
    As a user: don't use this directly; call the open(...) class method or RsClient.get_stac_client(...) instead.
    """

    # Call manually the parent pystac Client constructor.
    # The RsClient constructor will be called manually later.
    Client.__init__(
        self,
        id=id,
        description=description,
        title=title,
        stac_extensions=stac_extensions,
        extra_fields=extra_fields,
        href=href,
        catalog_type=catalog_type,
        strategy=strategy,
        modifier=modifier,
        **kwargs,
    )
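The two-step initialisation (the constructor only initialises one parent, and the open class method initialises the other afterwards) can be illustrated with plain classes. Everything in this sketch is hypothetical: `Base` stands in for the pystac client parent, `Settings` for RsClient, and `Combined` for StacClient. It shows the pattern only and does not reproduce the library's behaviour.

```python
class Base:
    """Stand-in for the parent whose instances are created by its own open() factory."""

    def __init__(self, href: str):
        self.href = href

    @classmethod
    def open(cls, href: str):
        return cls(href)


class Settings:
    """Stand-in for the parent holding connection settings."""

    def __init__(self, api_key, owner_id):
        self.api_key = api_key
        self.owner_id = owner_id


class Combined(Settings, Base):
    def __init__(self, href: str):
        # Only the Base constructor runs here; Settings is initialised in open()
        Base.__init__(self, href)

    @classmethod
    def open(cls, href: str, api_key, owner_id):
        # The parent factory builds the instance through Combined.__init__ ...
        client = super().open(href.rstrip("/") + "/catalog/")
        # ... then the second parent is initialised manually on that instance
        Settings.__init__(client, api_key, owner_id)
        return client


client = Combined.open("https://rs-server.example/", "example-key", "example-owner")
print(client.href, client.owner_id)
```

This is why constructing the class directly would leave the connection settings unset: only the factory method runs both initialisation steps.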

add_collection(collection, add_public_license=True, owner_id=None, timeout=TIMEOUT)

Update the collection links, then post the collection into the catalog.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| collection | Collection | STAC collection | required |
| add_public_license | bool | If True, add a public domain license field and link. | True |
| owner_id | str | Collection owner ID. If missing, we use self.owner_id. | None |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| JSONResponse | Response | The response of the request. |

Source code in docs/rs-client-libraries/rs_client/stac_client.py
def add_collection(
    self,
    collection: Collection,
    add_public_license: bool = True,
    owner_id: str | None = None,
    timeout: int = TIMEOUT,
) -> Response:
    """Update the collection links, then post the collection into the catalog.

    Args:
        collection (Collection): STAC collection
        add_public_license (bool): If True, add a public domain license field and link.
        owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
        timeout (int): The timeout duration for the HTTP request.

    Returns:
        JSONResponse (Response): The response of the request.
    """

    full_owner_id = owner_id or self.owner_id

    # Use owner_id:collection_id instead of just the collection ID, before adding the links,
    # so the links contain the full owner_id:collection_id
    short_collection_id = collection.id
    full_collection_id = self.full_collection_id(owner_id, short_collection_id)
    collection.id = full_collection_id

    # Default description
    if not collection.description:
        collection.description = f"This is the collection {short_collection_id} from user {full_owner_id}."

    # Add the owner_id as an extra field
    collection.extra_fields["owner"] = full_owner_id

    # Add public domain license
    if add_public_license:
        collection.license = "public-domain"
        collection.add_link(
            Link(
                rel=RelType.LICENSE,
                target="https://creativecommons.org/licenses/publicdomain/",
                title="public-domain",
            ),
        )

    # Update the links
    self.add_child(collection)

    # Restore the short collection_id at the root of the collection
    collection.id = short_collection_id

    # Check that the collection is compliant to STAC
    collection.validate_all()

    # Post the collection to the catalog
    return requests.post(
        f"{self.href_catalog}/catalog/collections",
        json=collection.to_dict(),
        **self.apikey_headers,
        timeout=timeout,
    )
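
The listing above swaps collection.id to the full ID, builds the links, and then restores the short ID. As a minimal sketch of the same swap-and-restore idea (not the library's code), a context manager can guarantee the restore even if link building raises. FakeCollection and temporary_id are hypothetical names used only for this illustration.

```python
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass
class FakeCollection:
    """Hypothetical stand-in for a STAC collection; only `id` matters here."""

    id: str


@contextmanager
def temporary_id(collection, temporary: str):
    """Set `collection.id` to `temporary`, then restore the original on exit."""
    original = collection.id
    collection.id = temporary
    try:
        yield collection
    finally:
        # Runs even if the body raises, so the short ID is always restored.
        collection.id = original


collection = FakeCollection(id="s1_l1")
with temporary_id(collection, "alice:s1_l1"):
    # This is where the links would be built with the full ID.
    id_inside = collection.id
id_after = collection.id
```

Whether the extra safety is worth it depends on how callers reuse the collection object after a failed call.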

add_item(collection_id, item, owner_id=None, timeout=TIMEOUT)

Update the item links, then post the item into the catalog.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| collection_id | str | The collection id. | required |
| item | Item | STAC item to update and post | required |
| owner_id | str | Collection owner ID. If missing, we use self.owner_id. | None |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| JSONResponse | Response | The response of the request. |

Source code in docs/rs-client-libraries/rs_client/stac_client.py
def add_item(  # type: ignore # pylint: disable=arguments-renamed
    self,
    collection_id: str,
    item: Item,
    owner_id: str | None = None,
    timeout: int = TIMEOUT,
) -> Response:
    """Update the item links, then post the item into the catalog.

    Args:
        collection_id (str): The collection id.
        item (Item): STAC item to update and post
        owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
        timeout (int): The timeout duration for the HTTP request.

    Returns:
        JSONResponse: The response of the request.
    """
    # owner_id:collection_id
    full_collection_id = self.full_collection_id(owner_id, collection_id)

    # Get the collection from the catalog
    collection = self.get_collection(collection_id, owner_id)

    # Update the item contents
    collection.add_item(item)

    # Post the item to the catalog
    return requests.post(
        f"{self.href_catalog}/catalog/collections/{full_collection_id}/items",
        json=item.to_dict(),
        **self.apikey_headers,
        timeout=timeout,
    )
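
To make the shape of the HTTP call concrete, here is a rough standard-library sketch of the POST that add_item sends. The real method uses requests.post; urllib is swapped in here only so the example runs without third-party packages, and the request is built but never sent. The function name, the host URL, and the "x-api-key" header name are assumptions for illustration.

```python
import json
from urllib.request import Request


def build_add_item_request(href_catalog, full_collection_id, item_dict, api_key=None):
    """Build (but do not send) a POST request for a new catalog item."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["x-api-key"] = api_key  # assumed header name
    return Request(
        f"{href_catalog}/catalog/collections/{full_collection_id}/items",
        data=json.dumps(item_dict).encode("utf-8"),
        headers=headers,
        method="POST",
    )


request = build_add_item_request(
    "http://localhost:8003",  # hypothetical catalog address
    "alice:s1_l1",
    {"type": "Feature", "id": "item_1"},
    api_key="secret",
)
```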

full_collection_id(owner_id, collection_id)

Return the full collection name as: <owner_id>:<collection_id>

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| owner_id | str | Collection owner ID. If missing, we use self.owner_id. | required |
| collection_id | str | Collection name | required |

Source code in docs/rs-client-libraries/rs_client/stac_client.py
def full_collection_id(self, owner_id: str | None, collection_id: str):
    """
    Return the full collection name as: <owner_id>:<collection_id>

    Args:
        owner_id (str): Collection owner ID. If missing, we use self.owner_id.
        collection_id (str): Collection name
    """
    return f"{owner_id or self.owner_id}:{collection_id}"
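
The naming rule can be tried in isolation. The sketch below mirrors the one-line method as a free function, with default_owner_id standing in for self.owner_id; that parameter and the sample IDs are assumptions made for the example.

```python
def full_collection_id(owner_id, collection_id, default_owner_id):
    """Return '<owner_id>:<collection_id>', falling back to the default owner."""
    # `or` treats both None and an empty string as "missing".
    return f"{owner_id or default_owner_id}:{collection_id}"


explicit = full_collection_id("bob", "s1_l1", default_owner_id="alice")
fallback = full_collection_id(None, "s1_l1", default_owner_id="alice")
```

Because the fallback uses `or`, an empty owner_id string behaves the same as None.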

get_collection(collection_id, owner_id=None) cached

Get the requested collection as <owner_id>:<collection_id>

Source code in docs/rs-client-libraries/rs_client/stac_client.py
@lru_cache()
def get_collection(self, collection_id: str, owner_id: str | None = None) -> Union[Collection, CollectionClient]:
    """Get the requested collection as <owner_id>:<collection_id>"""
    full_collection_id = self.full_collection_id(owner_id, collection_id)
    return Client.get_collection(self, full_collection_id)
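
Because get_collection is wrapped in lru_cache, repeated calls with the same arguments return the cached object without contacting the catalog again. The following sketch shows that behaviour with a hypothetical CatalogStub class that counts fetches instead of calling a server.

```python
from functools import lru_cache


class CatalogStub:
    """Hypothetical stand-in that counts how often a collection is fetched."""

    def __init__(self):
        self.fetch_count = 0

    @lru_cache()
    def get_collection(self, collection_id):
        self.fetch_count += 1
        return {"id": collection_id, "fetch": self.fetch_count}


catalog = CatalogStub()
first = catalog.get_collection("alice:s1_l1")
second = catalog.get_collection("alice:s1_l1")  # served from the cache
fetches_before_clear = catalog.fetch_count

# Clearing the cache forces the next call to fetch again.
catalog.get_collection.cache_clear()
third = catalog.get_collection("alice:s1_l1")
```

This is why a cached result can go stale after the catalog changes, and why clearing the cache is needed to see the updated state.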

open(rs_server_href, rs_server_api_key, owner_id, logger=None, headers=None, parameters=None, ignore_conformance=None, modifier=None, request_modifier=None, stac_io=None, timeout=TIMEOUT) classmethod

Create a new StacClient instance.

Source code in docs/rs-client-libraries/rs_client/stac_client.py
@classmethod
def open(  # type: ignore  # pylint: disable=arguments-renamed, too-many-arguments
    cls,
    # RsClient parameters
    rs_server_href: str | None,
    rs_server_api_key: str | None,
    owner_id: str | None,
    logger: logging.Logger | None = None,
    # pystac Client parameters
    headers: Optional[Dict[str, str]] = None,
    parameters: Optional[Dict[str, Any]] = None,
    ignore_conformance: Optional[bool] = None,
    modifier: Optional[Callable[[Modifiable], None]] = None,
    request_modifier: Optional[Callable[[Request], Union[Request, None]]] = None,
    stac_io: Optional[StacApiIO] = None,
    timeout: Optional[Timeout] = TIMEOUT,
) -> StacClient:
    """Create a new StacClient instance."""

    if rs_server_api_key:
        if headers is None:
            headers = {}
        headers[APIKEY_HEADER] = rs_server_api_key

    client: StacClient = super().open(  # type: ignore
        cls.__href_catalog(rs_server_href) + "/catalog/",
        headers,
        parameters,
        ignore_conformance,
        modifier,
        request_modifier,
        stac_io,
        timeout,
    )

    # Manual call to the parent RsClient constructor
    RsClient.__init__(
        client,
        rs_server_href=rs_server_href,
        rs_server_api_key=rs_server_api_key,
        owner_id=owner_id,
        logger=logger,
    )

    return client
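
The first step of open adds the API key to the request headers when a key is given. A minimal sketch of that step follows, written as a hypothetical helper named with_api_key. Unlike the listing above, it copies the caller's dict rather than modifying it in place, and the header name "x-api-key" is an assumption, since the value of APIKEY_HEADER is not shown on this page.

```python
APIKEY_HEADER = "x-api-key"  # assumed value; not shown in this documentation


def with_api_key(headers, api_key):
    """Return a new headers dict, adding the API key only if one is provided."""
    merged = dict(headers or {})
    if api_key:
        merged[APIKEY_HEADER] = api_key
    return merged


caller_headers = {"Accept": "application/json"}
merged = with_api_key(caller_headers, "secret")
no_key = with_api_key(None, None)
```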

remove_collection(collection_id, owner_id=None, timeout=TIMEOUT)

Remove/delete a collection from the catalog.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| collection_id | str | The collection id. | required |
| owner_id | str | Collection owner ID. If missing, we use self.owner_id. | None |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| JSONResponse | Response | The response of the request. |

Source code in docs/rs-client-libraries/rs_client/stac_client.py
def remove_collection(
    self,
    collection_id: str,
    owner_id: str | None = None,
    timeout: int = TIMEOUT,
) -> Response:
    """Remove/delete a collection from the catalog.

    Args:
        collection_id (str): The collection id.
        owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
        timeout (int): The timeout duration for the HTTP request.

    Returns:
        JSONResponse: The response of the request.
    """
    # owner_id:collection_id
    full_collection_id = self.full_collection_id(owner_id, collection_id)

    # Remove the collection from the "child" links of the local catalog instance
    collection_link = f"{self.self_href.rstrip('/')}/collections/{full_collection_id}"
    self.links = [
        link for link in self.links if not ((link.rel == pystac.RelType.CHILD) and (link.href == collection_link))
    ]

    # We need to clear the cache for this and parent "get_collection" methods
    # because their returned value must be updated.
    self.get_collection.cache_clear()
    Client.get_collection.cache_clear()

    # Remove the collection from the server catalog
    return requests.delete(
        f"{self.href_catalog}/catalog/collections/{full_collection_id}",
        **self.apikey_headers,
        timeout=timeout,
    )
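
Before sending the DELETE request, remove_collection drops the matching "child" link from the local catalog instance. The filtering step can be sketched on its own as below. Link is a hypothetical dataclass standing in for the pystac link class, the literal "child" stands in for pystac.RelType.CHILD, and the URLs are made up.

```python
from dataclasses import dataclass


@dataclass
class Link:
    """Hypothetical stand-in for a STAC link."""

    rel: str
    href: str


def drop_child_link(links, self_href, full_collection_id):
    """Return the links without the child link that points at the collection."""
    target = f"{self_href.rstrip('/')}/collections/{full_collection_id}"
    return [
        link for link in links
        if not (link.rel == "child" and link.href == target)
    ]


base = "http://localhost:8003/catalog/"
links = [
    Link("self", base),
    Link("child", "http://localhost:8003/catalog/collections/alice:s1_l1"),
    Link("child", "http://localhost:8003/catalog/collections/alice:s2_l1"),
]
remaining = drop_child_link(links, base, "alice:s1_l1")
```

Only a link that matches both the relation type and the exact href is removed, so links to other collections are left alone.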

remove_item(collection_id, item_id, owner_id=None, timeout=TIMEOUT)

Remove/delete an item from a collection.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| collection_id | str | The collection id. | required |
| item_id | str | The item id. | required |
| owner_id | str | Collection owner ID. If missing, we use self.owner_id. | None |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| JSONResponse | Response | The response of the request. |

Source code in docs/rs-client-libraries/rs_client/stac_client.py
def remove_item(  # type: ignore # pylint: disable=arguments-differ
    self,
    collection_id: str,
    item_id: str,
    owner_id: str | None = None,
    timeout: int = TIMEOUT,
) -> Response:
    """Remove/delete an item from a collection.

    Args:
        collection_id (str): The collection id.
        item_id (str): The item id.
        owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
        timeout (int): The timeout duration for the HTTP request.

    Returns:
        JSONResponse: The response of the request.
    """
    # owner_id:collection_id
    full_collection_id = self.full_collection_id(owner_id, collection_id)

    # Remove the item from the server catalog
    return requests.delete(
        f"{self.href_catalog}/catalog/collections/{full_collection_id}/items/{item_id}",
        **self.apikey_headers,
        timeout=timeout,
    )
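
Taken together, the four write methods in the listings on this page target the endpoints summarised in the sketch below. The helper catalog_endpoints and the sample host are hypothetical; the HTTP methods and paths are copied from the listings.

```python
def catalog_endpoints(href_catalog, full_collection_id, item_id):
    """Map each StacClient write method to its (HTTP method, URL) pair."""
    base = f"{href_catalog}/catalog/collections"
    return {
        "add_collection": ("POST", base),
        "add_item": ("POST", f"{base}/{full_collection_id}/items"),
        "remove_collection": ("DELETE", f"{base}/{full_collection_id}"),
        "remove_item": ("DELETE", f"{base}/{full_collection_id}/items/{item_id}"),
    }


endpoints = catalog_endpoints("http://localhost:8003", "alice:s1_l1", "item_1")
```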

This client lets you interact with the STAC service to search, retrieve, and manage SpatioTemporal Asset Catalog (STAC) data.

For detailed usage instructions and examples for each client, please refer to the respective sections.