API Clients Documentation

This documentation provides an overview of the various API clients available in the rs_client package. Each client is designed to interact with specific services and provide a convenient way to access their functionalities.

RsClient

Client for interacting with the RS-Server services:

- rs-server-staging
- rs-server-cadip
- rs-server-auxip
- rs-server-catalog

This class provides methods to authenticate and interact with RS-Server, manage STAC collections, and handle API requests.

Attributes:

rs_server_href (str | None): RS-Server URL. Pass None for local mode.

rs_server_api_key (str | None): API key for RS-Server authentication.

rs_server_oauth2_cookie (str | None): OAuth2 session cookie read from the RSPY_OAUTH2_COOKIE environment variable.

owner_id (str): The owner of the STAC catalog collections (no special characters allowed). If not set, it is read from the RSPY_HOST_USER environment variable. If still not set:
- In local mode, it takes the system username.
- In cluster mode, it is deduced from the API key or OAuth2 login, i.e. your Keycloak username.
- In hybrid mode, an exception is raised.
If owner_id differs from your Keycloak username, make sure that your Keycloak account has read/write rights on this catalog owner. owner_id is also used in the RS-Client logging.

logger (Logger): Logger instance for logging messages.

local_mode (bool): Indicates whether the client is running in local mode.

apikey_headers (dict): API key headers for HTTP requests.

http_session (Session): HTTP session for handling requests.
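The owner_id rules above can be sketched as a small standalone function. This is only an illustration under stated assumptions: `resolve_owner_id`, its `mode` string and its `login` argument are hypothetical names, and `login` stands in for the Keycloak username that the real client retrieves from RS-Server.

```python
import getpass
import re


def resolve_owner_id(owner_id, env, mode, login=None):
    """Resolve and sanitize the catalog owner ID (hypothetical helper).

    `mode` is "local", "cluster" or "hybrid"; `login` stands in for the
    Keycloak username that the real client fetches from RS-Server.
    """
    # An explicit value wins, then the RSPY_HOST_USER environment variable.
    candidate = owner_id or env.get("RSPY_HOST_USER")
    if not candidate:
        if mode == "local":
            candidate = getpass.getuser()
        elif mode == "hybrid":
            raise RuntimeError("In hybrid mode, the owner_id must be set explicitly")
        else:  # cluster mode
            candidate = login or ""
    # Keep only ASCII letters and digits, as the class source does.
    cleaned = re.sub(r"[^a-zA-Z0-9]+", "", candidate)
    if not cleaned:
        raise RuntimeError("The owner ID is empty or only contains special characters")
    return cleaned
```

Passing the environment as a plain mapping (for example `os.environ`) keeps the sketch easy to exercise without touching the real process environment.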

Source code in docs/rs-client-libraries/rs_client/rs_client.py
class RsClient:  # pylint: disable=too-many-instance-attributes
    """
    Client for interacting with the RS-Server services:
    - rs-server-staging
    - rs-server-cadip
    - rs-server-auxip
    - rs-server-catalog

    This class provides methods to authenticate and interact with RS-Server,
    manage STAC collections, and handle API requests.

    Attributes:
        rs_server_href (str | None): RS-Server URL. Pass None for local mode.
        rs_server_api_key (str | None): API key for RS-Server authentication.
        rs_server_oauth2_cookie (str | None): OAuth2 session cookie read from
            the `RSPY_OAUTH2_COOKIE` environment variable.
        owner_id (str): The owner of the STAC catalog collections (no special characters allowed).
            If not set, we try to read it from the RSPY_HOST_USER environment variable. If still not set:
            - In local mode, it takes the system username.
            - In cluster mode, it is deduced from the API key or OAuth2 login = your keycloak username.
            - In hybrid mode, we raise an Exception.
            If owner_id is different than your keycloak username, then make sure that your keycloak account has
            the rights to read/write on this catalog owner.
            owner_id is also used in the RS-Client logging.
        logger (logging.Logger): Logger instance for logging messages.
        local_mode (bool): Indicates whether the client is running in local mode.
        apikey_headers (dict): API key headers for HTTP requests.
        http_session (requests.Session): HTTP session for handling requests.
    """

    def __init__(  # pylint: disable=too-many-branches, too-many-arguments, too-many-positional-arguments
        self,
        rs_server_href: str | None,
        rs_server_api_key: str | None = None,
        owner_id: str | None = None,
        logger: logging.Logger | None = None,
    ):
        """
        Initializes an RsClient instance.

        Args:
            rs_server_href (str | None): The URL of the RS-Server. Pass None for local mode.
            rs_server_api_key (str | None, optional): API key for authentication (default: None).
            owner_id (str | None, optional): ID of the catalog owner (default: None).
            logger (logging.Logger | None, optional): Logger instance (default: None).

        Raises:
            RuntimeError: If neither an API key nor an OAuth2 cookie is provided for RS-Server authentication.
            RuntimeError: If the computed owner ID is empty or contains only special characters.
        """
        self.rs_server_href: str | None = rs_server_href
        self.rs_server_api_key: str | None = rs_server_api_key
        self.rs_server_oauth2_cookie: str | None = os.getenv("RSPY_OAUTH2_COOKIE")
        self.owner_id: str | None = owner_id or os.getenv("RSPY_HOST_USER")
        self.logger: logging.Logger = logger or Logging.default(__name__)

        # Remove trailing / character(s) from the URL
        if self.rs_server_href:
            self.rs_server_href = self.rs_server_href.strip().rstrip("/").strip()

        # We are in local mode if the URL is undefined.
        # Env vars are used instead to determine the different services URL.
        self.local_mode = not bool(self.rs_server_href)

        # We are in hybrid mode if not on local mode and the API Key Manager check URL is undefined.
        # NOTE: maybe later we could define this hybrid mode in a different way.
        self.hybrid_mode = (not self.local_mode) and (not RSPY_UAC_CHECK_URL)

        if (not self.local_mode) and (not self.rs_server_api_key) and (not self.rs_server_oauth2_cookie):
            raise RuntimeError("API key or OAuth2 cookie is mandatory for RS-Server authentication")

        # For HTTP request headers
        self.apikey_headers: dict = (
            {"headers": {APIKEY_HEADER: self.rs_server_api_key}} if self.rs_server_api_key else {}
        )

        # HTTP requests session with cookies
        self.http_session = requests.Session()
        if self.rs_server_oauth2_cookie:
            self.http_session.cookies.set("session", self.rs_server_oauth2_cookie)

        # Determine automatically the owner id
        if not self.owner_id:
            # In local mode, we use the local system username
            if self.local_mode:
                self.owner_id = getpass.getuser()

            # In hybrid mode, the API Key Manager check URL is not accessible and there is no OAuth2
            # so the owner id must be set explicitly by the user.
            elif self.hybrid_mode:
                raise RuntimeError(
                    "In hybrid mode, the owner_id must be set explicitly by parameter or environment variable",
                )

            # In cluster mode, we retrieve the OAuth2 or API key login
            else:
                self.owner_id = self.apikey_user_login if self.rs_server_api_key else self.oauth2_user_login

        # Remove special characters
        self.owner_id = re.sub(r"[^a-zA-Z0-9]+", "", self.owner_id)

        if not self.owner_id:
            raise RuntimeError("The owner ID is empty or only contains special characters")

        self.logger.debug(f"Owner ID: {self.owner_id!r}")

    def log_and_raise(self, message: str, original: Exception):
        """
        Logs an error message and raises a RuntimeError.

        This method logs the provided error message using the class logger
        and raises a `RuntimeError`, preserving the original exception as the cause.

        Args:
            message (str): The error message to log.
            original (Exception): The original exception that caused the error.

        Raises:
            RuntimeError: The logged error message, with the original exception as the cause.
        """
        self.logger.exception(message)
        raise RuntimeError(message) from original

    def oauth2_security(self) -> AuthInfo:
        """
        Returns:
            Authentication information from the user keycloak account, associated to the authentication cookie.
        """

        # In local mode, we have no authentication, so return empty results
        if self.local_mode:
            return AuthInfo(user_login="", iam_roles=[], apikey_config={})

        # Call the endpoint to retrieve the user information
        response = self.http_session.get(f"{self.rs_server_href}/auth/me")
        if not response.ok:
            raise RuntimeError(f"OAuth2 status code {response.status_code}: {utils.read_response_error(response)}")

        # Decode the JSON response
        contents = response.json()
        return AuthInfo(
            user_login=contents["user_login"],
            iam_roles=contents["iam_roles"],
            apikey_config={},  # no API key config here
        )

    # The following variable is needed for the tests to pass
    apikey_security_cache: TTLCache = TTLCache(maxsize=sys.maxsize, ttl=120)

    @cached(cache=apikey_security_cache)
    def apikey_security(self) -> AuthInfo:
        """
        Check the api key validity. Cache an infinite (sys.maxsize) number of results for 120 seconds.

        Returns:
            Authentication information from the keycloak account, associated to the api key.
        """

        # In local mode, we have no API key, so return empty results
        if self.local_mode:
            return AuthInfo(user_login="", iam_roles=[], apikey_config={})

        # Request the API key manager, pass user-defined api key in http header
        self.logger.debug("Call the API key manager")
        response = self.http_session.get(RSPY_UAC_CHECK_URL, **self.apikey_headers, timeout=TIMEOUT)
        if not response.ok:
            raise RuntimeError(
                f"API key manager status code {response.status_code}: {utils.read_response_error(response)}",
            )

        # Read the api key info.
        # Note: for now, config is an empty dict.
        contents = response.json()
        return AuthInfo(
            user_login=contents["user_login"],
            iam_roles=contents["iam_roles"],
            apikey_config=contents["config"],
        )

    @property
    def oauth2_user_login(self) -> str:
        """Return the user login from the keycloak account, associated to the authentication cookie."""
        return self.oauth2_security().user_login

    @property
    def apikey_user_login(self) -> str:
        """Return the user login from the keycloak account, associated to the api key."""
        return self.apikey_security().user_login

    @property
    def oauth2_iam_roles(self) -> list[str]:
        """
        Return the IAM (Identity and Access Management) roles from the keycloak account,
        associated to the authentication cookie
        """
        return self.oauth2_security().iam_roles

    @property
    def apikey_iam_roles(self) -> list[str]:
        """
        Return the IAM (Identity and Access Management) roles from the keycloak account,
        associated to the api key.
        """
        return self.apikey_security().iam_roles

    @property
    def apikey_config(self) -> dict:
        """Return the config from the keycloak account, associated to the api key."""
        return self.apikey_security().apikey_config

    @property
    def href_service(self):
        """Implemented by child classes"""

    #############################
    # Get child class instances #
    #############################

    def get_auxip_client(self, station: EAuxipStation, **kwargs) -> "AuxipClient":  # type: ignore # noqa: F821
        """
        Return an instance of the child class AuxipClient, with the same attributes as this "self" instance.

        Args:
            station (EAuxipStation): Auxip station
        """
        from rs_client.auxip_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
            AuxipClient,
        )

        return AuxipClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, station, self.logger, **kwargs)

    def get_cadip_client(self, station: ECadipStation, **kwargs) -> "CadipClient":  # type: ignore # noqa: F821
        """
        Return an instance of the child class CadipClient, with the same attributes as this "self" instance.

        Args:
            station (ECadipStation): Cadip station
        """
        from rs_client.cadip_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
            CadipClient,
        )

        return CadipClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, station, self.logger, **kwargs)

    def get_catalog_client(self, **kwargs) -> "CatalogClient":  # type: ignore # noqa: F821
        """
        Return an instance of the child class CatalogClient, with the same attributes as this "self" instance.
        """
        from rs_client.catalog_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
            CatalogClient,
        )

        return CatalogClient(
            self.rs_server_href,
            self.rs_server_api_key,
            self.owner_id,
            self.logger,
            **kwargs,
        )

    def get_staging_client(self) -> "StagingClient":  # type: ignore # noqa: F821
        """
        Return an instance of the child class StagingClient, with the same attributes as this "self" instance.
        """
        from rs_client.staging_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
            StagingClient,
        )

        return StagingClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, self.logger)

apikey_config property

Return the config from the keycloak account, associated to the api key.

apikey_iam_roles property

Return the IAM (Identity and Access Management) roles from the keycloak account, associated to the api key.

apikey_user_login property

Return the user login from the keycloak account, associated to the api key.

href_service property

Implemented by child classes

oauth2_iam_roles property

Return the IAM (Identity and Access Management) roles from the keycloak account, associated to the authentication cookie.

oauth2_user_login property

Return the user login from the keycloak account, associated to the authentication cookie.

__init__(rs_server_href, rs_server_api_key=None, owner_id=None, logger=None)

Initializes an RsClient instance.

Parameters:

rs_server_href (str | None): The URL of the RS-Server. Pass None for local mode. Required.
rs_server_api_key (str | None): API key for authentication. Default: None.
owner_id (str | None): ID of the catalog owner. Default: None.
logger (Logger | None): Logger instance. Default: None.

Raises:

RuntimeError: Outside local mode, if neither an API key nor an OAuth2 cookie is provided for RS-Server authentication.
RuntimeError: In hybrid mode, if owner_id is set neither by parameter nor by environment variable.
RuntimeError: If the computed owner ID is empty or contains only special characters.
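The constructor derives its operating mode from two URLs before it checks credentials. The sketch below restates that logic in isolation; `normalize_href`, `detect_mode` and `check_credentials` are hypothetical helper names, and `uac_check_url` stands in for the module-level `RSPY_UAC_CHECK_URL` value used in the source.

```python
def normalize_href(href):
    """Strip surrounding whitespace and trailing slashes; empty becomes None."""
    if not href:
        return None
    return href.strip().rstrip("/").strip() or None


def detect_mode(rs_server_href, uac_check_url):
    """Return "local", "hybrid" or "cluster" from the two URLs."""
    if not normalize_href(rs_server_href):
        return "local"  # no RS-Server URL at all
    if not uac_check_url:
        return "hybrid"  # RS-Server URL set, but no API Key Manager check URL
    return "cluster"


def check_credentials(mode, api_key, oauth2_cookie):
    """Outside local mode, at least one credential is required."""
    if mode != "local" and not api_key and not oauth2_cookie:
        raise RuntimeError("API key or OAuth2 cookie is mandatory for RS-Server authentication")
```

Returning a single mode string is a simplification for readability; the class itself stores two booleans, `local_mode` and `hybrid_mode`.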

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def __init__(  # pylint: disable=too-many-branches, too-many-arguments, too-many-positional-arguments
    self,
    rs_server_href: str | None,
    rs_server_api_key: str | None = None,
    owner_id: str | None = None,
    logger: logging.Logger | None = None,
):
    """
    Initializes an RsClient instance.

    Args:
        rs_server_href (str | None): The URL of the RS-Server. Pass None for local mode.
        rs_server_api_key (str | None, optional): API key for authentication (default: None).
        owner_id (str | None, optional): ID of the catalog owner (default: None).
        logger (logging.Logger | None, optional): Logger instance (default: None).

    Raises:
        RuntimeError: If neither an API key nor an OAuth2 cookie is provided for RS-Server authentication.
        RuntimeError: If the computed owner ID is empty or contains only special characters.
    """
    self.rs_server_href: str | None = rs_server_href
    self.rs_server_api_key: str | None = rs_server_api_key
    self.rs_server_oauth2_cookie: str | None = os.getenv("RSPY_OAUTH2_COOKIE")
    self.owner_id: str | None = owner_id or os.getenv("RSPY_HOST_USER")
    self.logger: logging.Logger = logger or Logging.default(__name__)

    # Remove trailing / character(s) from the URL
    if self.rs_server_href:
        self.rs_server_href = self.rs_server_href.strip().rstrip("/").strip()

    # We are in local mode if the URL is undefined.
    # Env vars are used instead to determine the different services URL.
    self.local_mode = not bool(self.rs_server_href)

    # We are in hybrid mode if not on local mode and the API Key Manager check URL is undefined.
    # NOTE: maybe later we could define this hybrid mode in a different way.
    self.hybrid_mode = (not self.local_mode) and (not RSPY_UAC_CHECK_URL)

    if (not self.local_mode) and (not self.rs_server_api_key) and (not self.rs_server_oauth2_cookie):
        raise RuntimeError("API key or OAuth2 cookie is mandatory for RS-Server authentication")

    # For HTTP request headers
    self.apikey_headers: dict = (
        {"headers": {APIKEY_HEADER: self.rs_server_api_key}} if self.rs_server_api_key else {}
    )

    # HTTP requests session with cookies
    self.http_session = requests.Session()
    if self.rs_server_oauth2_cookie:
        self.http_session.cookies.set("session", self.rs_server_oauth2_cookie)

    # Determine automatically the owner id
    if not self.owner_id:
        # In local mode, we use the local system username
        if self.local_mode:
            self.owner_id = getpass.getuser()

        # In hybrid mode, the API Key Manager check URL is not accessible and there is no OAuth2
        # so the owner id must be set explicitly by the user.
        elif self.hybrid_mode:
            raise RuntimeError(
                "In hybrid mode, the owner_id must be set explicitly by parameter or environment variable",
            )

        # In cluster mode, we retrieve the OAuth2 or API key login
        else:
            self.owner_id = self.apikey_user_login if self.rs_server_api_key else self.oauth2_user_login

    # Remove special characters
    self.owner_id = re.sub(r"[^a-zA-Z0-9]+", "", self.owner_id)

    if not self.owner_id:
        raise RuntimeError("The owner ID is empty or only contains special characters")

    self.logger.debug(f"Owner ID: {self.owner_id!r}")

apikey_security()

Check the api key validity. Cache an infinite (sys.maxsize) number of results for 120 seconds.

Returns:

AuthInfo: Authentication information from the Keycloak account associated with the API key.
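To illustrate what caching a result for 120 seconds means in practice, here is a hand-rolled, standard-library stand-in for a time-to-live cache. It is not the cache class used by the source (which relies on `TTLCache` and `cached`); `TinyTTLCache` and its injectable `clock` are assumptions made so the behaviour can be shown without waiting.

```python
import time


class TinyTTLCache:
    """Keep each value for `ttl` seconds, then recompute it on next access."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._entries = {}  # key -> (expiry time, value)

    def get_or_compute(self, key, compute):
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]  # still fresh: reuse the cached value
        value = compute()  # missing or expired: call the slow function again
        self._entries[key] = (now + self.ttl, value)
        return value
```

With a 120 second lifetime, repeated property reads such as the user login or the IAM roles would hit the API key manager at most once every two minutes per key.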

Source code in docs/rs-client-libraries/rs_client/rs_client.py
@cached(cache=apikey_security_cache)
def apikey_security(self) -> AuthInfo:
    """
    Check the api key validity. Cache an infinite (sys.maxsize) number of results for 120 seconds.

    Returns:
        Authentication information from the keycloak account, associated to the api key.
    """

    # In local mode, we have no API key, so return empty results
    if self.local_mode:
        return AuthInfo(user_login="", iam_roles=[], apikey_config={})

    # Request the API key manager, pass user-defined api key in http header
    self.logger.debug("Call the API key manager")
    response = self.http_session.get(RSPY_UAC_CHECK_URL, **self.apikey_headers, timeout=TIMEOUT)
    if not response.ok:
        raise RuntimeError(
            f"API key manager status code {response.status_code}: {utils.read_response_error(response)}",
        )

    # Read the api key info.
    # Note: for now, config is an empty dict.
    contents = response.json()
    return AuthInfo(
        user_login=contents["user_login"],
        iam_roles=contents["iam_roles"],
        apikey_config=contents["config"],
    )

get_auxip_client(station, **kwargs)

Return an instance of the child class AuxipClient, with the same attributes as this "self" instance.

Parameters:

station (EAuxipStation): Auxip station. Required.

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def get_auxip_client(self, station: EAuxipStation, **kwargs) -> "AuxipClient":  # type: ignore # noqa: F821
    """
    Return an instance of the child class AuxipClient, with the same attributes as this "self" instance.

    Args:
        station (EAuxipStation): Auxip station
    """
    from rs_client.auxip_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
        AuxipClient,
    )

    return AuxipClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, station, self.logger, **kwargs)

get_cadip_client(station, **kwargs)

Return an instance of the child class CadipClient, with the same attributes as this "self" instance.

Parameters:

station (ECadipStation): Cadip station. Required.

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def get_cadip_client(self, station: ECadipStation, **kwargs) -> "CadipClient":  # type: ignore # noqa: F821
    """
    Return an instance of the child class CadipClient, with the same attributes as this "self" instance.

    Args:
        station (ECadipStation): Cadip station
    """
    from rs_client.cadip_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
        CadipClient,
    )

    return CadipClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, station, self.logger, **kwargs)

get_catalog_client(**kwargs)

Return an instance of the child class CatalogClient, with the same attributes as this "self" instance.

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def get_catalog_client(self, **kwargs) -> "CatalogClient":  # type: ignore # noqa: F821
    """
    Return an instance of the child class CatalogClient, with the same attributes as this "self" instance.
    """
    from rs_client.catalog_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
        CatalogClient,
    )

    return CatalogClient(
        self.rs_server_href,
        self.rs_server_api_key,
        self.owner_id,
        self.logger,
        **kwargs,
    )

get_staging_client()

Return an instance of the child class StagingClient, with the same attributes as this "self" instance.

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def get_staging_client(self) -> "StagingClient":  # type: ignore # noqa: F821
    """
    Return an instance of the child class StagingClient, with the same attributes as this "self" instance.
    """
    from rs_client.staging_client import (  # pylint: disable=import-outside-toplevel,cyclic-import
        StagingClient,
    )

    return StagingClient(self.rs_server_href, self.rs_server_api_key, self.owner_id, self.logger)

log_and_raise(message, original)

Logs an error message and raises a RuntimeError.

This method logs the provided error message using the class logger and raises a RuntimeError, preserving the original exception as the cause.

Parameters:

message (str): The error message to log. Required.
original (Exception): The original exception that caused the error. Required.

Raises:

RuntimeError: The logged error message, with the original exception as the cause.
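The log-then-chain pattern can be tried outside the class with a module-level function. In this sketch the logger name and the `parse_port` caller are made up for illustration; only the two statements inside `log_and_raise` mirror the method body shown in the source.

```python
import logging

logger = logging.getLogger("rs_client_sketch")  # hypothetical logger name


def log_and_raise(message, original):
    """Log the message, then raise RuntimeError chained to `original`."""
    logger.exception(message)  # logs at ERROR level with the active traceback
    raise RuntimeError(message) from original


def parse_port(text):
    """Hypothetical caller: turn a low-level error into a RuntimeError."""
    try:
        return int(text)
    except ValueError as exc:
        log_and_raise(f"Invalid port: {text!r}", exc)
```

Because of `raise ... from original`, the caught RuntimeError exposes the first error through its `__cause__` attribute, so callers can still inspect the root failure.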

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def log_and_raise(self, message: str, original: Exception):
    """
    Logs an error message and raises a RuntimeError.

    This method logs the provided error message using the class logger
    and raises a `RuntimeError`, preserving the original exception as the cause.

    Args:
        message (str): The error message to log.
        original (Exception): The original exception that caused the error.

    Raises:
        RuntimeError: The logged error message, with the original exception as the cause.
    """
    self.logger.exception(message)
    raise RuntimeError(message) from original

oauth2_security()

Returns:

AuthInfo: Authentication information from the user's Keycloak account, associated with the authentication cookie.
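As a rough illustration of the data this method returns, the sketch below decodes a JSON body into a stand-in `AuthInfo`. The dataclass here is an assumption that only copies the three field names visible in the source, `auth_info_from_oauth2` is a hypothetical helper, and the sample payload in the usage line is invented.

```python
import json
from dataclasses import dataclass, field


@dataclass
class AuthInfo:
    """Stand-in for the real AuthInfo, with the three fields used in the source."""

    user_login: str
    iam_roles: list = field(default_factory=list)
    apikey_config: dict = field(default_factory=dict)


def auth_info_from_oauth2(body, local_mode=False):
    """Build an AuthInfo from the JSON text returned by the /auth/me endpoint."""
    if local_mode:
        # No authentication in local mode: return empty results.
        return AuthInfo(user_login="", iam_roles=[], apikey_config={})
    contents = json.loads(body)
    return AuthInfo(
        user_login=contents["user_login"],
        iam_roles=contents["iam_roles"],
        apikey_config={},  # the OAuth2 endpoint carries no API key config
    )
```

For example, `auth_info_from_oauth2('{"user_login": "alice", "iam_roles": []}')` yields an object whose `user_login` is `"alice"`.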

Source code in docs/rs-client-libraries/rs_client/rs_client.py
def oauth2_security(self) -> AuthInfo:
    """
    Returns:
        Authentication information from the user keycloak account, associated to the authentication cookie.
    """

    # In local mode, we have no authentication, so return empty results
    if self.local_mode:
        return AuthInfo(user_login="", iam_roles=[], apikey_config={})

    # Call the endpoint to retrieve the user information
    response = self.http_session.get(f"{self.rs_server_href}/auth/me")
    if not response.ok:
        raise RuntimeError(f"OAuth2 status code {response.status_code}: {utils.read_response_error(response)}")

    # Decode the JSON response
    contents = response.json()
    return AuthInfo(
        user_login=contents["user_login"],
        iam_roles=contents["iam_roles"],
        apikey_config={},  # no API key config here
    )

RsClient is the general-purpose client for interacting with RS-Server. Use its get_auxip_client, get_cadip_client, get_catalog_client and get_staging_client methods to obtain a service-specific client; see below.
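The way a general client hands its settings over to a specific client can be pictured with two toy classes. Everything in this sketch is hypothetical (`BaseClient`, `StationClient`, `get_station_client`); it only mirrors the shape of the get_*_client methods, which pass the URL, API key and owner ID to the child constructor.

```python
class BaseClient:
    """Hypothetical parent holding the shared connection settings."""

    def __init__(self, href, api_key=None, owner_id=None):
        self.href = href
        self.api_key = api_key
        self.owner_id = owner_id

    def get_station_client(self, station, **kwargs):
        # The child class is resolved at call time. In the real package it
        # lives in another module, hence the import inside each method.
        return StationClient(self.href, self.api_key, self.owner_id, station, **kwargs)


class StationClient(BaseClient):
    """Hypothetical child that adds a station on top of the shared settings."""

    def __init__(self, href, api_key, owner_id, station, **kwargs):
        super().__init__(href, api_key, owner_id)
        self.station = station
        self.extra = kwargs  # extra options forwarded by the parent
```

The benefit of this design is that authentication is configured once on the parent and every specific client starts from the same URL, key and owner.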

StagingClient

Bases: RsClient

Class to handle the staging process in rs-client-libraries.

This class provides Python methods to call the different endpoints of the rs-server-staging service.

Remark: this class does not inherit from the owslib.ogcapi.processes.Processes class, because the latter does not wrap all the endpoints defined in rs-server-staging (it only provides the /processes and /processes/{processId}/execution endpoints) and does not allow passing the apikey_header parameter, which is sent as an extra argument.
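Since the API key travels as an extra keyword argument on each HTTP call, it can help to see how such a call is assembled. The sketch below only builds the keyword arguments and sends nothing; the header name `x-api-key`, the 30 second timeout and the helper names are assumptions, not values confirmed by the source.

```python
APIKEY_HEADER = "x-api-key"  # assumed header name, not confirmed by the source
TIMEOUT = 30  # assumed timeout in seconds


def apikey_headers(api_key):
    """Mirror the client's attribute: a kwargs dict, empty without a key."""
    return {"headers": {APIKEY_HEADER: api_key}} if api_key else {}


def get_kwargs(href_service, path, api_key=None):
    """Assemble the keyword arguments for a session.get(**kwargs) call."""
    return {
        "url": f"{href_service}/{path.lstrip('/')}",
        "timeout": TIMEOUT,
        **apikey_headers(api_key),
    }
```

When no API key is set, the `headers` entry is simply absent, which leaves authentication to the session cookie.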

Source code in docs/rs-client-libraries/rs_client/staging_client.py
class StagingClient(RsClient):
    """
    Class to handle the staging process in rs-client-libraries

    This class provides python methods to call the different endpoints of the rs-server-staging service.

    Remark: this class does not inherit from the owslib.ogcapi.processes.Processes class because the latter
    doesn't provide wrapping for all endpoints defined in rs-server-staging (it only provides the /processes
    and /processes/{processId}/execution endpoints, and it doesn't allow managing the apikey_header parameter,
    which is passed as an extra argument).
    """

    @property
    def href_service(self) -> str:
        """
        Return the RS-Server staging URL hostname.
        This URL can be overwritten using the RSPY_HOST_STAGING env variable (used e.g. for local mode).
        Otherwise it should just be the RS-Server URL.
        """
        return get_href_service(self.rs_server_href, "RSPY_HOST_STAGING")

    def validate_and_unmarshal_request(self, request: PreparedRequest) -> Any:
        """Validate an endpoint request according to the ogc specifications

        Args:
            request (PreparedRequest): endpoint request

        Returns:
            RequestUnmarshalResult.body: request body validated by the openapi_core
            unmarshal_request method
        """
        if not DO_VALIDATE:
            return request.body

        if not os.path.isfile(PATH_TO_YAML_OPENAPI):
            raise FileNotFoundError(f"The following file path was not found: {PATH_TO_YAML_OPENAPI}")

        openapi = OpenAPI.from_file_path(PATH_TO_YAML_OPENAPI)
        openapi_request = RequestsOpenAPIRequest(request)

        # validate_request(request, spec=Spec.from_file_path(PATH_TO_YAML_OPENAPI))
        result = openapi.unmarshal_request(openapi_request)

        if result.errors:
            raise StagingValidationException(
                f"Error validating the request of the endpoint "
                f"{openapi_request.path}: {str(result.errors[0])}",  # type: ignore
            )
        if not result.body:
            raise StagingValidationException(
                f"Error validating the request of the endpoint "
                f"{openapi_request.path}: 'body' field of RequestUnmarshalResult "
                f"object is empty",
            )
        return result.body

    def validate_and_unmarshal_response(self, response: Response) -> Any:
        """
        Validate an endpoint response according to the ogc specifications
        (described as yaml schemas)

        Args:
            response (Response): endpoint response
        Returns:
            ResponseUnmarshalResult.data: data validated by the openapi_core
            unmarshal_response method
        """
        if not DO_VALIDATE:
            if not response.content:
                raise StagingValidationException("Response content is empty !")
            return json.loads(response.content)

        if not os.path.isfile(PATH_TO_YAML_OPENAPI):
            raise FileNotFoundError(f"The following file path was not found: {PATH_TO_YAML_OPENAPI}")

        openapi = OpenAPI.from_file_path(PATH_TO_YAML_OPENAPI)
        openapi_request = RequestsOpenAPIRequest(response.request)
        openapi_response = RequestsOpenAPIResponse(response)

        # Alternative method to validate the response
        # validate_response(response=response, spec= Spec.from_file_path(PATH_TO_YAML_OPENAPI), request=request)
        result = openapi.unmarshal_response(openapi_request, openapi_response)  # type: ignore
        if result.errors:
            raise StagingValidationException(  # type: ignore
                f"Error validating the response of the endpoint "
                f"{openapi_request.path}: {str(result.errors[0])}",  # type: ignore
            )
        if not result.data:
            raise StagingValidationException(
                f"Error validating the response of the endpoint "
                f"{openapi_request.path}: 'data' field of ResponseUnmarshalResult "
                f"object is empty",
            )
        return result.data

    ############################
    # Call RS-Server endpoints #
    ############################

    def get_processes(self) -> dict:
        """Call the /processes endpoint and return the list of available processes.

        Returns:
            dict: dictionary containing the content of the response
        """
        response = self.http_session.get(
            url=f"{self.href_service}/processes",
            timeout=TIMEOUT,
            **self.apikey_headers,
        )
        return self.validate_and_unmarshal_response(response)

    def get_process(self, process_id: str) -> dict:
        """
        Wrapper to get a specific process.

        Args:
            process_id (str): name of the resource
        """
        response = self.http_session.get(
            url=f"{self.href_service}/processes/{process_id}",
            timeout=TIMEOUT,
            **self.apikey_headers,
        )
        return self.validate_and_unmarshal_response(response)

    def run_staging(  # pylint: disable=too-many-locals
        self,
        stac_input: dict[Any, Any] | str,
        out_coll_name: str,
    ) -> dict:
        """Start the staging process from rs-client by calling the endpoint /processes/staging/execution.

        Args:
            stac_input (dict | str): staging input, in one of the following formats:
                - A Python dictionary corresponding to a Feature or a FeatureCollection (for example
                  the output of a search for Cadip or Auxip sessions)
                - A JSON string corresponding to a Feature or a FeatureCollection
                - A string corresponding to a path to a JSON file containing a Feature or a FeatureCollection
            out_coll_name (str): identifier of the output collection, written to the
                "inputs.collection.id" field of the staging body

        Returns:
            dict: content of the staging endpoint response, as returned by
            validate_and_unmarshal_response
        """
        stac_input_dict = {}
        # If stac_input is a file, load this file to a dictionary
        if isinstance(stac_input, str):
            # If the input is a valid path to a json_file, load this file
            if os.path.exists(os.path.dirname(stac_input)) and stac_input.endswith(".json"):
                # Read the JSON file
                with open(stac_input, encoding="utf-8") as opened:
                    stac_file_to_dict = json.loads(opened.read())
                    stac_input_dict = stac_file_to_dict
            # If the input string is not a path, try to convert the content of the string to a json dictionary
            else:
                stac_input_dict = json.loads(stac_input)
        else:
            stac_input_dict = stac_input

        if "type" not in stac_input_dict:
            raise KeyError("Key 'type' is missing from the staging input data")

        # Validate input data using Pydantic
        if stac_input_dict["type"] == "Feature":
            stac_item = Item(**stac_input_dict)
            stac_item_collection = ItemCollection(
                **{
                    "type": "FeatureCollection",
                    "context": {"limit": 1000, "returned": 2},
                    "features": [stac_item],
                },  # type: ignore
            )
        else:
            stac_item_collection = ItemCollection(**stac_input_dict)

        # Load staging body base structure and fill it with the staging content
        if not os.path.isfile(PATH_TO_STAGING_BODY):
            raise FileNotFoundError(f"The following file path was not found: {PATH_TO_STAGING_BODY}")

        # TODO: this staging_body content will be validated with the self.validate_and_unmarshal_request(request)
        # once rs-server-staging will be updated
        with open(PATH_TO_STAGING_BODY, encoding="utf-8") as f:
            staging_body = json.load(f)
        staging_body["inputs"]["collection"]["id"] = out_coll_name
        staging_body["inputs"]["items"].update(stac_item_collection.model_dump(mode="json"))
        # TODO: replace the staging_body "outputs" field with the following when rs-server-staging is updated
        # TODO: "outputs": {"featureCollectionOutput": {"transmissionMode": "value"}},

        # Check that the request containing the staging body is valid
        request = requests.Request(  # pylint: disable=W0612 # noqa: F841
            method="POST",  # HTTP method, can be 'POST', 'GET', etc.
            url=f"{self.href_service}/processes/{RESOURCE}/execution",  # Endpoint URL
            json=staging_body,  # Request body as JSON
        ).prepare()
        # TODO: uncomment when rs-server-staging is updated
        # TODO: self.validate_and_unmarshal_request(request)

        response = self.http_session.post(
            url=f"{self.href_service}/processes/staging/execution",
            json=staging_body,
            **self.apikey_headers,
            timeout=TIMEOUT,
        )
        return self.validate_and_unmarshal_response(response)

    def get_jobs(self) -> dict:
        """Method to get running jobs"""
        response = self.http_session.get(
            url=f"{self.href_service}/jobs",
            **self.apikey_headers,
            timeout=TIMEOUT,
        )
        return self.validate_and_unmarshal_response(response)

    def get_job_info(self, job_id: str) -> dict:  # pylint: disable=too-many-locals
        """Method to get a specific job response"""
        response = self.http_session.get(
            url=f"{self.href_service}/jobs/{job_id}",
            **self.apikey_headers,
            timeout=TIMEOUT,
        )
        return self.validate_and_unmarshal_response(response)

    def delete_job(self, job_id: str) -> dict:  # pylint: disable=too-many-locals
        """Method to delete a specific job"""
        response = self.http_session.delete(
            url=f"{self.href_service}/jobs/{job_id}",
            **self.apikey_headers,
            timeout=TIMEOUT,
        )
        return self.validate_and_unmarshal_response(response)

    def get_job_results(self, job_id: str) -> dict:
        """Wrapper to get the result of a specific job

        Args:
            job_id (str): identifier of the job
        """
        response = self.http_session.get(
            url=f"{self.href_service}/jobs/{job_id}/results",
            timeout=TIMEOUT,
            **self.apikey_headers,
        )
        return self.validate_and_unmarshal_response(response)

href_service property

Return the RS-Server staging URL hostname. This URL can be overwritten using the RSPY_HOST_STAGING env variable (used e.g. for local mode). Otherwise it should just be the RS-Server URL.
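
As a rough illustration of the override described above, the lookup could be sketched as follows. The helper name `resolve_staging_href` and the error raised when neither value is available are assumptions made for this example; the actual property implementation is not shown on this page.

```python
from __future__ import annotations

import os


def resolve_staging_href(rs_server_href: str | None, env: dict[str, str] | None = None) -> str:
    """Hypothetical helper: choose the staging URL as the description above suggests."""
    env = dict(os.environ) if env is None else env
    # The RSPY_HOST_STAGING environment variable takes precedence (e.g. in local mode)
    override = env.get("RSPY_HOST_STAGING")
    if override:
        return override
    # Assumption: without an override and without an RS-Server URL, fail loudly
    if not rs_server_href:
        raise RuntimeError("RSPY_HOST_STAGING is not set and no RS-Server URL was given")
    return rs_server_href
```

Passing `env` explicitly keeps the sketch easy to test; when it is omitted, the process environment is used.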

delete_job(job_id)

Method to delete a specific job

Source code in docs/rs-client-libraries/rs_client/staging_client.py
def delete_job(self, job_id: str) -> dict:  # pylint: disable=too-many-locals
    """Method to delete a specific job"""
    response = self.http_session.delete(
        url=f"{self.href_service}/jobs/{job_id}",
        **self.apikey_headers,
        timeout=TIMEOUT,
    )
    return self.validate_and_unmarshal_response(response)

get_job_info(job_id)

Method to get a specific job response

Source code in docs/rs-client-libraries/rs_client/staging_client.py
def get_job_info(self, job_id: str) -> dict:  # pylint: disable=too-many-locals
    """Method to get a specific job response"""
    response = self.http_session.get(
        url=f"{self.href_service}/jobs/{job_id}",
        **self.apikey_headers,
        timeout=TIMEOUT,
    )
    return self.validate_and_unmarshal_response(response)
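
A typical use of `get_job_info` is to poll a job until it finishes. The sketch below is only an illustration: the helper `wait_for_job` is hypothetical, and it assumes that the returned dictionary carries a `status` field using the OGC API - Processes status values (`accepted`, `running`, `successful`, `failed`, `dismissed`).

```python
import time
from typing import Any, Callable

# Terminal statuses in the OGC API - Processes status enumeration
TERMINAL_STATUSES = {"successful", "failed", "dismissed"}


def wait_for_job(
    get_job_info: Callable[[str], dict[str, Any]],
    job_id: str,
    poll_interval: float = 2.0,
    max_polls: int = 30,
) -> dict[str, Any]:
    """Hypothetical helper: poll a job until it reaches a terminal status."""
    for _ in range(max_polls):
        info = get_job_info(job_id)
        # Assumption: the job description exposes its state under the "status" key
        if info.get("status") in TERMINAL_STATUSES:
            return info
        time.sleep(poll_interval)
    raise TimeoutError(f"Job {job_id} did not finish after {max_polls} polls")
```

With a client instance, this would be called as `wait_for_job(client.get_job_info, job_id)`.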

get_job_results(job_id)

Wrapper to get the result of a specific job

Parameters:

Name Type Description Default
job_id str

Identifier of the job

required
Source code in docs/rs-client-libraries/rs_client/staging_client.py
def get_job_results(self, job_id: str) -> dict:
    """Wrapper to get the result of a specific job

    Args:
        job_id (str): identifier of the job
    """
    response = self.http_session.get(
        url=f"{self.href_service}/jobs/{job_id}/results",
        timeout=TIMEOUT,
        **self.apikey_headers,
    )
    return self.validate_and_unmarshal_response(response)

get_jobs()

Method to get running jobs

Source code in docs/rs-client-libraries/rs_client/staging_client.py
def get_jobs(self) -> dict:
    """Method to get running jobs"""
    response = self.http_session.get(
        url=f"{self.href_service}/jobs",
        **self.apikey_headers,
        timeout=TIMEOUT,
    )
    return self.validate_and_unmarshal_response(response)

get_process(process_id)

Wrapper to get a specific process.

Parameters:

Name Type Description Default
process_id str

name of the resource

required

Source code in docs/rs-client-libraries/rs_client/staging_client.py
def get_process(self, process_id: str) -> dict:
    """
    Wrapper to get a specific process.

    Args:
        process_id (str): name of the resource
    """
    response = self.http_session.get(
        url=f"{self.href_service}/processes/{process_id}",
        timeout=TIMEOUT,
        **self.apikey_headers,
    )
    return self.validate_and_unmarshal_response(response)

get_processes()

Call the /processes endpoint and return the list of available processes.

Returns:

Name Type Description
dict dict

dictionary containing the content of the response

Source code in docs/rs-client-libraries/rs_client/staging_client.py
def get_processes(self) -> dict:
    """Call the /processes endpoint and return the list of available processes.

    Returns:
        dict: dictionary containing the content of the response
    """
    response = self.http_session.get(
        url=f"{self.href_service}/processes",
        timeout=TIMEOUT,
        **self.apikey_headers,
    )
    return self.validate_and_unmarshal_response(response)

run_staging(stac_input, out_coll_name)

Start the staging process from rs-client by calling the endpoint /processes/staging/execution.

Parameters:

Name Type Description Default
stac_input dict | str

Staging input, in one of the following formats:
- A Python dictionary corresponding to a Feature or a FeatureCollection (for example the output of a search for Cadip or Auxip sessions)
- A JSON string corresponding to a Feature or a FeatureCollection
- A string corresponding to a path to a JSON file containing a Feature or a FeatureCollection

required
out_coll_name str

Identifier of the output collection, written to the "inputs.collection.id" field of the staging body.

required

Returns:

Type Description
dict

Content of the staging endpoint response, as returned by validate_and_unmarshal_response.

Source code in docs/rs-client-libraries/rs_client/staging_client.py
def run_staging(  # pylint: disable=too-many-locals
    self,
    stac_input: dict[Any, Any] | str,
    out_coll_name: str,
) -> dict:
    """Start the staging process from rs-client by calling the endpoint /processes/staging/execution.

    Args:
        stac_input (dict | str): staging input, in one of the following formats:
            - A Python dictionary corresponding to a Feature or a FeatureCollection (for example
              the output of a search for Cadip or Auxip sessions)
            - A JSON string corresponding to a Feature or a FeatureCollection
            - A string corresponding to a path to a JSON file containing a Feature or a FeatureCollection
        out_coll_name (str): identifier of the output collection, written to the
            "inputs.collection.id" field of the staging body

    Returns:
        dict: content of the staging endpoint response, as returned by
        validate_and_unmarshal_response
    """
    stac_input_dict = {}
    # If stac_input is a file, load this file to a dictionary
    if isinstance(stac_input, str):
        # If the input is a valid path to a json_file, load this file
        if os.path.exists(os.path.dirname(stac_input)) and stac_input.endswith(".json"):
            # Read the JSON file
            with open(stac_input, encoding="utf-8") as opened:
                stac_file_to_dict = json.loads(opened.read())
                stac_input_dict = stac_file_to_dict
        # If the input string is not a path, try to convert the content of the string to a json dictionary
        else:
            stac_input_dict = json.loads(stac_input)
    else:
        stac_input_dict = stac_input

    if "type" not in stac_input_dict:
        raise KeyError("Key 'type' is missing from the staging input data")

    # Validate input data using Pydantic
    if stac_input_dict["type"] == "Feature":
        stac_item = Item(**stac_input_dict)
        stac_item_collection = ItemCollection(
            **{
                "type": "FeatureCollection",
                "context": {"limit": 1000, "returned": 2},
                "features": [stac_item],
            },  # type: ignore
        )
    else:
        stac_item_collection = ItemCollection(**stac_input_dict)

    # Load staging body base structure and fill it with the staging content
    if not os.path.isfile(PATH_TO_STAGING_BODY):
        raise FileNotFoundError(f"The following file path was not found: {PATH_TO_STAGING_BODY}")

    # TODO: this staging_body content will be validated with the self.validate_and_unmarshal_request(request)
    # once rs-server-staging will be updated
    with open(PATH_TO_STAGING_BODY, encoding="utf-8") as f:
        staging_body = json.load(f)
    staging_body["inputs"]["collection"]["id"] = out_coll_name
    staging_body["inputs"]["items"].update(stac_item_collection.model_dump(mode="json"))
    # TODO: replace the staging_body "outputs" field with the following when rs-server-staging is updated
    # TODO: "outputs": {"featureCollectionOutput": {"transmissionMode": "value"}},

    # Check that the request containing the staging body is valid
    request = requests.Request(  # pylint: disable=W0612 # noqa: F841
        method="POST",  # HTTP method, can be 'POST', 'GET', etc.
        url=f"{self.href_service}/processes/{RESOURCE}/execution",  # Endpoint URL
        json=staging_body,  # Request body as JSON
    ).prepare()
    # TODO: uncomment when rs-server-staging is updated
    # TODO: self.validate_and_unmarshal_request(request)

    response = self.http_session.post(
        url=f"{self.href_service}/processes/staging/execution",
        json=staging_body,
        **self.apikey_headers,
        timeout=TIMEOUT,
    )
    return self.validate_and_unmarshal_response(response)
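
The input handling at the top of `run_staging` can be illustrated in isolation. The sketch below is a simplified stand-in, not the method itself: the helper name `load_stac_input` is hypothetical, it uses plain dictionaries instead of the Pydantic `Item` and `ItemCollection` models, it omits the `context` field, and it checks the file with `os.path.isfile` rather than testing the parent directory.

```python
from __future__ import annotations

import json
import os
from typing import Any


def load_stac_input(stac_input: dict[str, Any] | str) -> dict[str, Any]:
    """Hypothetical helper: turn any accepted input form into a FeatureCollection dict."""
    if isinstance(stac_input, str):
        if stac_input.endswith(".json") and os.path.isfile(stac_input):
            # The string is a path to a JSON file
            with open(stac_input, encoding="utf-8") as opened:
                data = json.load(opened)
        else:
            # Otherwise the string is expected to contain JSON directly
            data = json.loads(stac_input)
    else:
        data = stac_input

    if "type" not in data:
        raise KeyError("Key 'type' is missing from the staging input data")

    # A single Feature is wrapped so that the caller always gets a FeatureCollection
    if data["type"] == "Feature":
        return {"type": "FeatureCollection", "features": [data]}
    return data
```

All three input forms end up as the same structure, which is what lets the rest of the method fill the staging body in a single way.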

validate_and_unmarshal_request(request)

Validate an endpoint request according to the ogc specifications

Parameters:

Name Type Description Default
request Request

endpoint request

required

Returns:

Type Description
Any

RequestUnmarshalResult.body: request body validated by the openapi_core unmarshal_request method

Source code in docs/rs-client-libraries/rs_client/staging_client.py
def validate_and_unmarshal_request(self, request: PreparedRequest) -> Any:
    """Validate an endpoint request according to the ogc specifications

    Args:
        request (PreparedRequest): endpoint request

    Returns:
        RequestUnmarshalResult.body: request body validated by the openapi_core
        unmarshal_request method
    """
    if not DO_VALIDATE:
        return request.body

    if not os.path.isfile(PATH_TO_YAML_OPENAPI):
        raise FileNotFoundError(f"The following file path was not found: {PATH_TO_YAML_OPENAPI}")

    openapi = OpenAPI.from_file_path(PATH_TO_YAML_OPENAPI)
    openapi_request = RequestsOpenAPIRequest(request)

    # validate_request(request, spec=Spec.from_file_path(PATH_TO_YAML_OPENAPI))
    result = openapi.unmarshal_request(openapi_request)

    if result.errors:
        raise StagingValidationException(
            f"Error validating the request of the endpoint "
            f"{openapi_request.path}: {str(result.errors[0])}",  # type: ignore
        )
    if not result.body:
        raise StagingValidationException(
            f"Error validating the request of the endpoint "
            f"{openapi_request.path}: 'body' field of RequestUnmarshalResult "
            f"object is empty",
        )
    return result.body

validate_and_unmarshal_response(response)

Validate an endpoint response according to the ogc specifications (described as yaml schemas)

Parameters:

Name Type Description Default
response Response

endpoint response

required

Returns:

Type Description
Any

ResponseUnmarshalResult.data: data validated by the openapi_core unmarshal_response method

Source code in docs/rs-client-libraries/rs_client/staging_client.py
def validate_and_unmarshal_response(self, response: Response) -> Any:
    """
    Validate an endpoint response according to the ogc specifications
    (described as yaml schemas)

    Args:
        response (Response): endpoint response

    Returns:
        ResponseUnmarshalResult.data: data validated by the openapi_core
        unmarshal_response method
    """
    if not DO_VALIDATE:
        if not response.content:
            raise StagingValidationException("Response content is empty !")
        return json.loads(response.content)

    if not os.path.isfile(PATH_TO_YAML_OPENAPI):
        raise FileNotFoundError(f"The following file path was not found: {PATH_TO_YAML_OPENAPI}")

    openapi = OpenAPI.from_file_path(PATH_TO_YAML_OPENAPI)
    openapi_request = RequestsOpenAPIRequest(response.request)
    openapi_response = RequestsOpenAPIResponse(response)

    # Alternative method to validate the response
    # validate_response(response=response, spec= Spec.from_file_path(PATH_TO_YAML_OPENAPI), request=request)
    result = openapi.unmarshal_response(openapi_request, openapi_response)  # type: ignore
    if result.errors:
        raise StagingValidationException(  # type: ignore
            f"Error validating the response of the endpoint "
            f"{openapi_request.path}: {str(result.errors[0])}",  # type: ignore
        )
    if not result.data:
        raise StagingValidationException(
            f"Error validating the response of the endpoint "
            f"{openapi_request.path}: 'data' field of ResponseUnmarshalResult "
            f"object is empty",
        )
    return result.data
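
When validation is switched off (`DO_VALIDATE` is false), the method above only decodes the response body. A minimal stand-alone sketch of that branch follows; the function name `parse_unvalidated` is hypothetical and `StagingValidationException` is redefined locally as a stand-in so that the example runs on its own.

```python
import json
from typing import Any


class StagingValidationException(Exception):
    """Local stand-in for the exception class used in the listing above."""


def parse_unvalidated(content: bytes) -> Any:
    """Hypothetical helper mirroring the non-validating branch."""
    # An empty body is reported with the dedicated exception
    if not content:
        raise StagingValidationException("Response content is empty !")
    # Anything else is decoded as JSON; a non-JSON body raises json.JSONDecodeError
    return json.loads(content)
```

A caller therefore has two failure modes to handle in this mode: the staging exception for an empty body, and `json.JSONDecodeError` for a body that is not JSON.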

This client allows you to interact with the RS-Server Staging service, making it easy to stage files from the external CADIP and AUXIP stations. It inherits from the RsClient class.

StacBase

Bases: RsClient

Base class for interacting with a STAC (SpatioTemporal Asset Catalog) API using pystac-client.

This class provides methods to retrieve STAC collections, items, queryables, and perform searches.

Source code in docs/rs-client-libraries/rs_client/stac_base.py
class StacBase(RsClient):
    """
    Base class for interacting with a STAC (SpatioTemporal Asset Catalog) API using pystac-client.

    This class provides methods to retrieve STAC collections, items, queryables, and perform searches.
    """

    @handle_api_error
    def __init__(  # pylint: disable=too-many-branches, too-many-arguments, too-many-positional-arguments
        self,
        rs_server_href: str | None,
        rs_server_api_key: str | None = None,
        owner_id: str | None = None,
        logger: logging.Logger | None = None,
        stac_href: str | None = None,  # Flag to enable pystac_client for specific subclasses
        headers: dict[str, str] | None = None,
        parameters: dict[str, Any] | None = None,
        ignore_conformance: bool | None = None,
        modifier: Callable[[Collection | Item | ItemCollection | dict[Any, Any]], None] | None = None,
        request_modifier: Callable[[Request], Request | None] | None = None,
        stac_io: StacApiIO | None = None,
        timeout: Timeout | None = TIMEOUT,
    ):
        """
        Initialize the StacBase instance.

        Args:
            rs_server_href (str | None): URL of the RS server.
            rs_server_api_key (str | None, optional): API key for authentication.
            owner_id (str | None, optional): Owner identifier.
            logger (logging.Logger | None, optional): Logger instance.
            stac_href (str | None): STAC API URL.
            headers (Optional[Dict[str, str]], optional): HTTP headers.
            parameters (Optional[Dict[str, Any]], optional): Additional query parameters.
            ignore_conformance (Optional[bool], optional): Whether to ignore conformance.
            modifier (Callable, optional): Function to modify collection, item, or item collection.
            request_modifier (Optional[Callable[[Request], Union[Request, None]]], optional):
                Function to modify requests.
            stac_io (Optional[StacApiIO], optional): Custom STAC API I/O handler.
            timeout (Optional[Timeout], optional): Request timeout.
        """
        # call RsClient init
        super().__init__(rs_server_href, rs_server_api_key, owner_id, logger)

        # Initialize pystac_client.Client only if required (for CadipClient, AuxipClient, StacClient)
        if not stac_href:
            raise RuntimeError("No stac href provided")
        # pystac_client may raise an APIError exception; this is handled by the handle_api_error decorator
        self.stac_href = stac_href
        if rs_server_api_key:
            if headers is None:
                headers = {}
            headers[APIKEY_HEADER] = rs_server_api_key
        if stac_io is None:
            stac_io = StacApiIO(  # This is what is done in pystac_client/client.py::from_file
                headers=headers,
                parameters=parameters,
                request_modifier=request_modifier,
                timeout=timeout,
            )
        # Save the OAuth2 authentication cookie in the pystac client cookies
        if self.rs_server_oauth2_cookie:
            stac_io.session.cookies.set("session", self.rs_server_oauth2_cookie)
        self.ps_client = Client.open(
            stac_href,
            headers=headers,
            parameters=parameters,
            ignore_conformance=ignore_conformance,
            modifier=modifier,
            request_modifier=request_modifier,
            stac_io=stac_io,
            timeout=timeout,
        )

    ################################
    # Specific STAC implementation #
    ################################
    @handle_api_error
    def get_landing(self) -> dict:
        """
        Retrieve the STAC API landing page.

        Returns:
            dict: The landing page response.

        Raises:
            RuntimeError: If an API error occurs.
        """

        return self.ps_client.to_dict()

    @handle_api_error
    def get_collections(self) -> Iterator[Collection]:
        """
        Retrieve available STAC collections the user has permission to access.

        Returns:
            Iterator[Collection]: An iterator over available collections.

        Raises:
            RuntimeError: If an API error occurs.
        """

        # Get all the available collections
        return self.ps_client.get_collections()

    @lru_cache
    @handle_api_error
    def get_collection(self, collection_id: str) -> Collection | CollectionClient:
        """
        Retrieve a specific STAC collection by ID.

        Args:
            collection_id (str): The ID of the collection.

        Returns:
            Union[Collection, CollectionClient]: The requested collection.

        Raises:
            RuntimeError: If an API error occurs.
        """
        return self.ps_client.get_collection(collection_id)

    @handle_api_error
    def get_items(self, collection_id: str, items_ids: str | list[str] | None = None) -> Iterator["Item"]:
        """
        Retrieve all items or specific items from a collection.

        Args:
            collection_id (str): The ID of the collection.
            items_ids (str | list[str] | None, optional): Specific item ID(s) to retrieve.

        Returns:
            Iterator[Item]: An iterator over retrieved items.

        Raises:
            RuntimeError: If an API error occurs.
        """

        # Retrieve the collection
        collection = self.ps_client.get_collection(collection_id)
        # Retrieve a list of items
        if items_ids:
            # Wrap a single ID so that it is not unpacked character by character
            if isinstance(items_ids, str):
                items_ids = [items_ids]
            self.logger.info(f"Retrieving specific items from collection '{collection_id}'.")
            return collection.get_items(*items_ids)
        # Retrieve all items
        self.logger.info(f"Retrieving all items from collection '{collection_id}'.")
        return collection.get_items()

    @handle_api_error
    def get_item(self, collection_id: str, item_id: str) -> Item | None:
        """
        Retrieve a specific item from a collection.

        Args:
            collection_id (str): The collection ID.
            item_id (str): The item ID.

        Returns:
            Item | None: The retrieved item or None if not found.

        Raises:
            RuntimeError: If an API error occurs.
        """

        # Retrieve the collection
        collection = self.ps_client.get_collection(collection_id)
        item = collection.get_item(item_id)
        if not item:
            self.logger.error(f"Item with ID '{item_id}' not found in collection '{collection_id}'.")
        return item

    @handle_api_error
    def get_collection_queryables(self, collection_id) -> dict[str, Any]:
        """
        Retrieve queryable fields for a specific collection.

        Args:
            collection_id (str): The collection ID.

        Returns:
            Dict[str, Any]: Dictionary of queryable fields.

        Raises:
            RuntimeError: If an API error occurs.
        """

        return self.ps_client.get_merged_queryables([collection_id])

    def get_queryables(self) -> dict[str, Any]:
        """
        Retrieve queryable fields for all collections in the STAC API. These are the terms available
        when writing filter expressions for the /search endpoint across all collections.
        NOTE: the pystac-client library doesn't have a function for this action, so the endpoint
        is called directly.

        Returns:
            Dict[str, Any]: Dictionary of queryable fields.

        Raises:
            RuntimeError: If an exception occurs from the request level.
        """
        try:
            href_queryables = self.stac_href + "queryables"
            response = self.http_session.get(
                href_queryables,
                **self.apikey_headers,
                timeout=TIMEOUT,
            )
        except (requests.exceptions.RequestException, requests.exceptions.Timeout) as e:
            self.logger.exception(f"Could not get the response from the endpoint {href_queryables}: {e}")
            raise RuntimeError(
                f"Could not get the response from the endpoint {href_queryables}",
            ) from e
        if not response.ok:
            raise RuntimeError(f"Could not get queryables from {href_queryables}")
        try:
            json_data = response.json()
            return cast(dict[str, Any], json_data)  # Explicitly cast to Dict[str, Any]
        except ValueError as e:
            raise RuntimeError(f"Invalid JSON response from {href_queryables}") from e

    @handle_api_error
    def search(  # pylint: disable=too-many-arguments, too-many-positional-arguments
        self,
        **kwargs,
    ) -> ItemCollection | None:
        """
        Perform a STAC search using query parameters.

        Returns:
            ItemCollection | None: Retrieved item collection or None if not found.

        Raises:
            RuntimeError: If an API error occurs.
        """
        kwargs.pop("owner_id", None)
        kwargs["datetime"] = kwargs.pop("timestamp", None)
        kwargs["filter"] = kwargs.pop("stac_filter", None)

        try:
            items_search = self.ps_client.search(**kwargs)

            return items_search.item_collection()
        except NotImplementedError:
            self.logger.exception(
                "The API does not conform to the STAC API Item Search spec "
                "or does not have a link with a 'rel' type of 'search'",
            )
        return None
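
The keyword remapping done at the start of `search` is easy to overlook, so here is a small sketch of it on its own. The function name `remap_search_kwargs` is hypothetical; the three statements in its body are the ones from the listing above.

```python
from typing import Any


def remap_search_kwargs(**kwargs: Any) -> dict[str, Any]:
    """Hypothetical helper: apply the same renaming as StacBase.search."""
    # owner_id is not a pystac-client search argument, so it is dropped
    kwargs.pop("owner_id", None)
    # rs-client names are mapped to the pystac-client ones
    kwargs["datetime"] = kwargs.pop("timestamp", None)
    kwargs["filter"] = kwargs.pop("stac_filter", None)
    return kwargs
```

One consequence worth noting: because `datetime` and `filter` are always assigned, a value passed directly under either of those names is overwritten, so callers should use `timestamp` and `stac_filter`.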

__init__(rs_server_href, rs_server_api_key=None, owner_id=None, logger=None, stac_href=None, headers=None, parameters=None, ignore_conformance=None, modifier=None, request_modifier=None, stac_io=None, timeout=TIMEOUT)

Initialize the StacBase instance.

Parameters:

Name Type Description Default
rs_server_href str | None

URL of the RS server.

required
rs_server_api_key str | None

API key for authentication.

None
owner_id str | None

Owner identifier.

None
logger Logger | None

Logger instance.

None
stac_href str | None

STAC API URL.

None
headers Optional[Dict[str, str]]

HTTP headers.

None
parameters Optional[Dict[str, Any]]

Additional query parameters.

None
ignore_conformance Optional[bool]

Whether to ignore conformance.

None
modifier Callable

Function to modify collection, item, or item collection.

None
request_modifier Optional[Callable[[Request], Union[Request, None]]]

Function to modify requests.

None
stac_io Optional[StacApiIO]

Custom STAC API I/O handler.

None
timeout Optional[Timeout]

Request timeout.

TIMEOUT
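
To show how `rs_server_api_key` and `headers` interact, the following sketch reproduces the merge in a stand-alone function. The name `build_stac_headers` is hypothetical, and the value given to `APIKEY_HEADER` is a placeholder chosen for this example, since the real constant is defined elsewhere in rs_client. Unlike the constructor, the sketch copies the dictionary instead of modifying the caller's object.

```python
from __future__ import annotations

# Assumption: placeholder value, the real constant is defined in rs_client
APIKEY_HEADER = "x-api-key"


def build_stac_headers(api_key: str | None, headers: dict[str, str] | None = None) -> dict[str, str] | None:
    """Hypothetical helper: add the API key to the HTTP headers when one is given."""
    if not api_key:
        # Without an API key the headers are passed through unchanged
        return headers
    merged = dict(headers or {})
    merged[APIKEY_HEADER] = api_key
    return merged
```

The resulting dictionary is what would then be handed to both `StacApiIO` and `Client.open`.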
Source code in docs/rs-client-libraries/rs_client/stac_base.py
@handle_api_error
def __init__(  # pylint: disable=too-many-branches, too-many-arguments, too-many-positional-arguments
    self,
    rs_server_href: str | None,
    rs_server_api_key: str | None = None,
    owner_id: str | None = None,
    logger: logging.Logger | None = None,
    stac_href: str | None = None,  # Flag to enable pystac_client for specific subclasses
    headers: dict[str, str] | None = None,
    parameters: dict[str, Any] | None = None,
    ignore_conformance: bool | None = None,
    modifier: Callable[[Collection | Item | ItemCollection | dict[Any, Any]], None] | None = None,
    request_modifier: Callable[[Request], Request | None] | None = None,
    stac_io: StacApiIO | None = None,
    timeout: Timeout | None = TIMEOUT,
):
    """
    Initialize the StacBase instance.

    Args:
        rs_server_href (str | None): URL of the RS server.
        rs_server_api_key (str | None, optional): API key for authentication.
        owner_id (str | None, optional): Owner identifier.
        logger (logging.Logger | None, optional): Logger instance.
        stac_href (str | None): STAC API URL.
        headers (Optional[Dict[str, str]], optional): HTTP headers.
        parameters (Optional[Dict[str, Any]], optional): Additional query parameters.
        ignore_conformance (Optional[bool], optional): Whether to ignore conformance.
        modifier (Callable, optional): Function to modify collection, item, or item collection.
        request_modifier (Optional[Callable[[Request], Union[Request, None]]], optional):
                                                            Function to modify requests.
        stac_io (Optional[StacApiIO], optional): Custom STAC API I/O handler.
        timeout (Optional[Timeout], optional): Request timeout.
    """
    # call RsClient init
    super().__init__(rs_server_href, rs_server_api_key, owner_id, logger)

    # Initialize pystac_client.Client only if required (for CadipClient, AuxipClient, StacClient)
    if not stac_href:
        raise RuntimeError("No stac href provided")
    # pystac_client may raise an APIError exception; this is handled by the handle_api_error decorator
    self.stac_href = stac_href
    if rs_server_api_key:
        if headers is None:
            headers = {}
        headers[APIKEY_HEADER] = rs_server_api_key
    if stac_io is None:
        stac_io = StacApiIO(  # This is what is done in pystac_client/client.py::from_file
            headers=headers,
            parameters=parameters,
            request_modifier=request_modifier,
            timeout=timeout,
        )
    # Save the OAuth2 authentication cookie in the pystac client cookies
    if self.rs_server_oauth2_cookie:
        stac_io.session.cookies.set("session", self.rs_server_oauth2_cookie)
    self.ps_client = Client.open(
        stac_href,
        headers=headers,
        parameters=parameters,
        ignore_conformance=ignore_conformance,
        modifier=modifier,
        request_modifier=request_modifier,
        stac_io=stac_io,
        timeout=timeout,
    )
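
The header handling in the constructor can be sketched in isolation. This is a minimal illustration under stated assumptions, not the library's code: the header name `x-api-key` is a placeholder for `APIKEY_HEADER` (its real value is not shown on this page), and `build_headers` is a hypothetical helper. Unlike the constructor, the sketch copies the caller's dictionary instead of modifying it in place.

```python
from typing import Optional

# Placeholder for the APIKEY_HEADER constant; the real value is an assumption.
APIKEY_HEADER = "x-api-key"


def build_headers(headers: Optional[dict], api_key: Optional[str]) -> Optional[dict]:
    """Return the headers to send, adding the API key when one is given."""
    if not api_key:
        # No key: the headers are passed through unchanged (possibly None).
        return headers
    merged = dict(headers or {})  # copy so the caller's dict is not modified
    merged[APIKEY_HEADER] = api_key
    return merged
```

Copying avoids surprising a caller who reuses the same `headers` dictionary for several clients.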

get_collection(collection_id) cached

Retrieve a specific STAC collection by ID.

Parameters:

Name Type Description Default
collection_id str

The ID of the collection.

required

Returns:

Type Description
Collection | CollectionClient

Union[Collection, CollectionClient]: The requested collection.

Raises:

Type Description
RuntimeError

If an API error occurs.

Source code in docs/rs-client-libraries/rs_client/stac_base.py
@lru_cache
@handle_api_error
def get_collection(self, collection_id: str) -> Collection | CollectionClient:
    """
    Retrieve a specific STAC collection by ID.

    Args:
        collection_id (str): The ID of the collection.

    Returns:
        Union[Collection, CollectionClient]: The requested collection.

    Raises:
        RuntimeError: If an API error occurs.
    """
    return self.ps_client.get_collection(collection_id)
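
Because `get_collection` is wrapped in `functools.lru_cache`, repeated calls with the same arguments return the cached object instead of querying the API again. The sketch below shows that behaviour with a stand-in class; `FakeStac` and its `calls` counter are hypothetical and exist only for illustration. Note that `lru_cache` on a method also uses `self` as part of the cache key, so the instance stays referenced while its entries are cached.

```python
from functools import lru_cache


class FakeStac:
    """Stand-in for a client whose lookups are expensive."""

    def __init__(self) -> None:
        self.calls = 0  # number of simulated remote lookups

    @lru_cache
    def get_collection(self, collection_id: str) -> dict:
        self.calls += 1
        return {"id": collection_id}


client = FakeStac()
first = client.get_collection("s1_aux")
second = client.get_collection("s1_aux")  # served from the cache
other = client.get_collection("s2_aux")   # new argument, new lookup

# Cached values stay as they were until the cache is cleared explicitly.
FakeStac.get_collection.cache_clear()
third = client.get_collection("s1_aux")   # looked up again after the clear
```

This is why a collection that changes on the server may need a `cache_clear()` call before it is fetched again.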

get_collection_queryables(collection_id)

Retrieve queryable fields for a specific collection.

Parameters:

Name Type Description Default
collection_id str

The collection ID.

required

Returns:

Type Description
dict[str, Any]

Dict[str, Any]: Dictionary of queryable fields.

Raises:

Type Description
RuntimeError

If an API error occurs.

Source code in docs/rs-client-libraries/rs_client/stac_base.py
@handle_api_error
def get_collection_queryables(self, collection_id: str) -> dict[str, Any]:
    """
    Retrieve queryable fields for a specific collection.

    Args:
        collection_id (str): The collection ID.

    Returns:
        Dict[str, Any]: Dictionary of queryable fields.

    Raises:
        RuntimeError: If an API error occurs.
    """

    return self.ps_client.get_merged_queryables([collection_id])

get_collections()

Retrieve available STAC collections the user has permission to access.

Returns:

Type Description
Iterator[Collection]

Iterator[Collection]: An iterator over available collections.

Raises:

Type Description
RuntimeError

If an API error occurs.

Source code in docs/rs-client-libraries/rs_client/stac_base.py
@handle_api_error
def get_collections(self) -> Iterator[Collection]:
    """
    Retrieve available STAC collections the user has permission to access.

    Returns:
        Iterator[Collection]: An iterator over available collections.

    Raises:
        RuntimeError: If an API error occurs.
    """

    # Get all the available collections
    return self.ps_client.get_collections()

get_item(collection_id, item_id)

Retrieve a specific item from a collection.

Parameters:

Name Type Description Default
collection_id str

The collection ID.

required
item_id str

The item ID.

required

Returns:

Type Description
Item | None

Item | None: The retrieved item or None if not found.

Raises:

Type Description
RuntimeError

If an API error occurs.

Source code in docs/rs-client-libraries/rs_client/stac_base.py
@handle_api_error
def get_item(self, collection_id: str, item_id: str) -> Item | None:
    """
    Retrieve a specific item from a collection.

    Args:
        collection_id (str): The collection ID.
        item_id (str): The item ID.

    Returns:
        Item | None: The retrieved item or None if not found.

    Raises:
        RuntimeError: If an API error occurs.
    """

    # Retrieve the collection
    collection = self.ps_client.get_collection(collection_id)
    item = collection.get_item(item_id)
    if not item:
        self.logger.error(f"Item with ID '{item_id}' not found in collection '{collection_id}'.")
    return item

get_items(collection_id, items_ids=None)

Retrieve all items or specific items from a collection.

Parameters:

Name Type Description Default
collection_id str

The ID of the collection.

required
items_ids Union[str, None]

Specific item ID(s) to retrieve.

None

Returns:

Type Description
Iterator[Item]

Iterator[Item]: An iterator over retrieved items.

Raises:

Type Description
RuntimeError

If an API error occurs.

Source code in docs/rs-client-libraries/rs_client/stac_base.py
@handle_api_error
def get_items(self, collection_id: str, items_ids: str | None = None) -> Iterator["Item"]:
    """
    Retrieve all items or specific items from a collection.

    Args:
        collection_id (str): The ID of the collection.
        items_ids (Union[str, None], optional): Specific item ID(s) to retrieve.

    Returns:
        Iterator[Item]: An iterator over retrieved items.

    Raises:
        RuntimeError: If an API error occurs.
    """

    # Retrieve the collection
    collection = self.ps_client.get_collection(collection_id)
    # Retrieve a list of items
    if items_ids:
        self.logger.info(f"Retrieving specific items from collection '{collection_id}'.")
        return collection.get_items(*items_ids)
    # Retrieve all items
    self.logger.info(f"Retrieving all items from collection '{collection_id}'.")
    return collection.get_items()
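
One detail of the code above is worth illustrating: `items_ids` is annotated as a string, yet it is unpacked with `*items_ids`. Unpacking a plain string yields its individual characters, so a single ID would presumably need to be passed inside a list or tuple. The sketch below uses hypothetical helpers (`normalize_ids` and `collect` are not part of rs_client) to show the difference and one possible way to normalise the argument.

```python
from collections.abc import Iterable
from typing import Union


def normalize_ids(items_ids: Union[str, Iterable, None]) -> list:
    """Return item IDs as a list, treating a bare string as one ID."""
    if items_ids is None:
        return []
    if isinstance(items_ids, str):
        return [items_ids]
    return list(items_ids)


def collect(*ids: str) -> list:
    """Mimic a function that receives IDs as positional arguments."""
    return list(ids)


split_by_char = collect(*"ab1")                 # the string is split per character
as_single_id = collect(*normalize_ids("ab1"))   # one ID, kept whole
```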

get_landing()

Retrieve the STAC API landing page.

Returns:

Name Type Description
dict dict

The landing page response.

Raises:

Type Description
RuntimeError

If an API error occurs.

Source code in docs/rs-client-libraries/rs_client/stac_base.py
@handle_api_error
def get_landing(self) -> dict:
    """
    Retrieve the STAC API landing page.

    Returns:
        dict: The landing page response.

    Raises:
        RuntimeError: If an API error occurs.
    """

    return self.ps_client.to_dict()

get_queryables()

Retrieve queryable fields for all collections in the STAC API. These are the terms available when writing filter expressions for the /search endpoint across all collections. NOTE: the pystac-client library doesn't have a function for this action, so the endpoint is called directly.

Returns:

Type Description
dict[str, Any]

Dict[str, Any]: Dictionary of queryable fields.

Raises:

Type Description
RuntimeError

If an exception occurs from the request level.

Source code in docs/rs-client-libraries/rs_client/stac_base.py
def get_queryables(self) -> dict[str, Any]:
    """
    Retrieve queryable fields for all collections in the STAC API. These are the available terms for
    usage when writing filter expressions in /search endpoint for all the collections
    NOTE: the pystac-client library doesn't have a function for this action, so the direct call of
    the endpoint is needed

    Returns:
        Dict[str, Any]: Dictionary of queryable fields.

    Raises:
        RuntimeError: If an exception occurs from the request level.
    """
    try:
        href_queryables = self.stac_href + "queryables"
        response = self.http_session.get(
            href_queryables,
            **self.apikey_headers,
            timeout=TIMEOUT,
        )
    except (requests.exceptions.RequestException, requests.exceptions.Timeout) as e:
        self.logger.exception(f"Could not get the response from the endpoint {href_queryables}: {e}")
        raise RuntimeError(
            f"Could not get the response from the endpoint {href_queryables}",
        ) from e
    if not response.ok:
        raise RuntimeError(f"Could not get queryables from {href_queryables}")
    try:
        json_data = response.json()
        return cast(dict[str, Any], json_data)  # Explicitly cast to Dict[str, Any]
    except ValueError as e:
        raise RuntimeError(f"Invalid JSON response from {href_queryables}") from e
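
The URL is built by plain concatenation (`self.stac_href + "queryables"`), which works because the subclasses documented on this page pass a `stac_href` ending in `/` (for example `.../auxip/`). A hedged sketch of a join that tolerates either form follows; `join_endpoint` and the example host are hypothetical and not part of rs_client.

```python
def join_endpoint(base_href: str, endpoint: str) -> str:
    """Join a base URL and an endpoint name with exactly one slash."""
    return f"{base_href.rstrip('/')}/{endpoint.lstrip('/')}"


with_slash = join_endpoint("http://localhost:8000/auxip/", "queryables")
without_slash = join_endpoint("http://localhost:8000/auxip", "queryables")
```

Both calls produce the same address, so the result no longer depends on how the base URL was written.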

search(**kwargs)

Perform a STAC search using query parameters.

Returns:

Type Description
ItemCollection | None

ItemCollection | None: Retrieved item collection or None if not found.

Raises:

Type Description
RuntimeError

If an API error occurs.

Source code in docs/rs-client-libraries/rs_client/stac_base.py
@handle_api_error
def search(  # pylint: disable=too-many-arguments, too-many-positional-arguments
    self,
    **kwargs,
) -> ItemCollection | None:
    """
    Perform a STAC search using query parameters.

    Returns:
        ItemCollection | None: Retrieved item collection or None if not found.

    Raises:
        RuntimeError: If an API error occurs.
    """
    kwargs.pop("owner_id", None)
    kwargs["datetime"] = kwargs.pop("timestamp", None)
    kwargs["filter"] = kwargs.pop("stac_filter", None)

    try:
        items_search = self.ps_client.search(**kwargs)

        return items_search.item_collection()
    except NotImplementedError:
        self.logger.exception(
            "The API does not conform to the STAC API Item Search spec "
            "or does not have a link with a 'rel' type of 'search'",
        )
    return None
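
The keyword handling at the top of `search` can be tried on its own. The function below reproduces the three `pop` calls on a plain dictionary; `remap_search_kwargs` is a hypothetical name used only for this sketch. One consequence, visible in the second call, is that a `datetime` or `filter` value passed directly is overwritten, because the value always comes from `timestamp` or `stac_filter` (or `None`).

```python
from typing import Any


def remap_search_kwargs(**kwargs: Any) -> dict:
    """Translate client-level keyword names to the names used for the search call."""
    kwargs.pop("owner_id", None)                         # dropped before searching
    kwargs["datetime"] = kwargs.pop("timestamp", None)   # timestamp -> datetime
    kwargs["filter"] = kwargs.pop("stac_filter", None)   # stac_filter -> filter
    return kwargs


remapped = remap_search_kwargs(
    owner_id="alice",
    collections=["s1_aux"],
    timestamp="2024-01-01T00:00:00Z/2024-01-02T00:00:00Z",
)
overwritten = remap_search_kwargs(datetime="2024-01-01T00:00:00Z")
```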

The StacBase class is the foundation for interacting with a STAC (SpatioTemporal Asset Catalog) API. It provides an interface for retrieving collections, items, and queryables, and for performing searches. It inherits from the RsClient class.

AuxipClient

Bases: StacBase

AuxipClient class implementation.

Attributes: see RsClient

Source code in docs/rs-client-libraries/rs_client/auxip_client.py
class AuxipClient(StacBase):
    """
    AuxipClient class implementation.

    Attributes: see :py:class:`RsClient`
    """

    def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
        self,
        rs_server_href: str | None,
        rs_server_api_key: str | None,
        owner_id: str | None,
        station: EAuxipStation | str,
        logger: logging.Logger | None = None,
        **kwargs: dict[str, Any],
    ):
        """
        Initializes an AuxipClient instance.

        Args:
            rs_server_href (str | None): The URL of the RS-Server. Pass None for local mode.
            rs_server_api_key (str | None): API key for authentication.
            owner_id (str | None): ID of the catalog owner.
            station (EAuxipStation | str): The AUXIP station identifier.
            logger (logging.Logger | None, optional): Logger instance (default: None).
            **kwargs: Arbitrary keyword arguments that may include:
                - `headers` (Optional[Dict[str, str]])
                - `parameters` (Optional[Dict[str, Any]])
                - `ignore_conformance` (Optional[bool])
                - `modifier` (Callable[[Collection | Item | ItemCollection | dict[Any, Any]], None] | None)
                - `request_modifier` (Optional[Callable[[Request], Union[Request, None]]])
                - `stac_io` (Optional[StacApiIO])
                - `timeout` (Optional[Timeout])

        Raises:
            RuntimeError: If the provided station is not a valid AUXIP station.
        """
        super().__init__(
            rs_server_href,
            rs_server_api_key,
            owner_id,
            logger,
            get_href_service(rs_server_href, "RSPY_HOST_ADGS") + "/auxip/",
            **kwargs,
        )
        try:
            self.station: EAuxipStation = EAuxipStation[station] if isinstance(station, str) else station
        except KeyError as e:
            self.log_and_raise(f"There is no such AUXIP station: {station}", e)

    @property
    def href_service(self) -> str:
        """
        Return the RS-Server ADGS URL hostname.
        This URL can be overwritten using the RSPY_HOST_ADGS env variable (used e.g. for local mode).
        Otherwise it should just be the RS-Server URL.
        """
        return get_href_service(self.rs_server_href, "RSPY_HOST_ADGS") + "/auxip"

    @property
    def station_name(self) -> str:
        """Return the station name."""
        return self.station.value

href_service property

Return the RS-Server ADGS URL hostname. This URL can be overwritten using the RSPY_HOST_ADGS env variable (used e.g. for local mode). Otherwise it should just be the RS-Server URL.

station_name property

Return the station name.

__init__(rs_server_href, rs_server_api_key, owner_id, station, logger=None, **kwargs)

Initializes an AuxipClient instance.

Parameters:

Name Type Description Default
rs_server_href str | None

The URL of the RS-Server. Pass None for local mode.

required
rs_server_api_key str | None

API key for authentication.

required
owner_id str | None

ID of the catalog owner.

required
station EAuxipStation | str

The AUXIP station identifier.

required
logger Logger | None

Logger instance (default: None).

None
**kwargs dict[str, Any]

Arbitrary keyword arguments that may include: - headers (Optional[Dict[str, str]]) - parameters (Optional[Dict[str, Any]]) - ignore_conformance (Optional[bool]) - modifier (Callable[[Collection | Item | ItemCollection | dict[Any, Any]], None] | None) - request_modifier (Optional[Callable[[Request], Union[Request, None]]]) - stac_io (Optional[StacApiIO]) - timeout (Optional[Timeout])

{}

Raises:

Type Description
RuntimeError

If the provided station is not a valid AUXIP station.

Source code in docs/rs-client-libraries/rs_client/auxip_client.py
def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
    self,
    rs_server_href: str | None,
    rs_server_api_key: str | None,
    owner_id: str | None,
    station: EAuxipStation | str,
    logger: logging.Logger | None = None,
    **kwargs: dict[str, Any],
):
    """
    Initializes an AuxipClient instance.

    Args:
        rs_server_href (str | None): The URL of the RS-Server. Pass None for local mode.
        rs_server_api_key (str | None): API key for authentication.
        owner_id (str | None): ID of the catalog owner.
        station (EAuxipStation | str): The AUXIP station identifier.
        logger (logging.Logger | None, optional): Logger instance (default: None).
        **kwargs: Arbitrary keyword arguments that may include:
            - `headers` (Optional[Dict[str, str]])
            - `parameters` (Optional[Dict[str, Any]])
            - `ignore_conformance` (Optional[bool])
            - `modifier` (Callable[[Collection | Item | ItemCollection | dict[Any, Any]], None] | None)
            - `request_modifier` (Optional[Callable[[Request], Union[Request, None]]])
            - `stac_io` (Optional[StacApiIO])
            - `timeout` (Optional[Timeout])

    Raises:
        RuntimeError: If the provided station is not a valid AUXIP station.
    """
    super().__init__(
        rs_server_href,
        rs_server_api_key,
        owner_id,
        logger,
        get_href_service(rs_server_href, "RSPY_HOST_ADGS") + "/auxip/",
        **kwargs,
    )
    try:
        self.station: EAuxipStation = EAuxipStation[station] if isinstance(station, str) else station
    except KeyError as e:
        self.log_and_raise(f"There is no such AUXIP station: {station}", e)

The AuxipClient is tailored for accessing the AUXIP service. It provides functionality for querying auxiliary data and metadata from an external AUXIP station. It inherits from the StacBase class.
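
The station argument is resolved by enum member name when a string is given, and an unknown name is reported as a RuntimeError. A minimal sketch of that lookup follows, assuming a made-up enum: `EDemoStation`, its members, and `resolve_station` are placeholders, not the real `EAuxipStation` values or rs_client functions.

```python
from enum import Enum
from typing import Union


class EDemoStation(Enum):
    """Placeholder enum; the member names and values are invented."""

    ADGS = "adgs"
    ADGS2 = "adgs2"


def resolve_station(station: Union[EDemoStation, str]) -> EDemoStation:
    """Accept an enum member or a member name and return the member."""
    if isinstance(station, EDemoStation):
        return station
    try:
        return EDemoStation[station]  # lookup by member NAME, not by value
    except KeyError as exc:
        raise RuntimeError(f"There is no such station: {station}") from exc
```

With this kind of lookup, `"ADGS"` resolves to a member while the lowercase value `"adgs"` does not, since subscripting an enum matches names only.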

CadipClient

Bases: StacBase

CadipClient class implementation.

Attributes: see RsClient, plus:

Name Type Description
station ECadipStation

Cadip station

Source code in docs/rs-client-libraries/rs_client/cadip_client.py
class CadipClient(StacBase):
    """
    CadipClient class implementation.

    Attributes: see :py:class:`RsClient`
        station (ECadipStation): Cadip station
    """

    def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
        self,
        rs_server_href: str | None,
        rs_server_api_key: str | None,
        owner_id: str | None,
        station: ECadipStation | str,
        logger: logging.Logger | None = None,
        **kwargs: dict[str, Any],
    ):
        """
        Initializes a CadipClient instance.

        Args:
            rs_server_href (str | None): The URL of the RS-Server. Pass None for local mode.
            rs_server_api_key (str | None): API key for authentication.
            owner_id (str | None): ID of the catalog owner.
            station (ECadipStation | str): The CADIP station identifier.
            logger (logging.Logger | None, optional): Logger instance (default: None).
            **kwargs: Arbitrary keyword arguments that may include:
                - `headers` (Optional[Dict[str, str]])
                - `parameters` (Optional[Dict[str, Any]])
                - `ignore_conformance` (Optional[bool])
                - `modifier` (Callable[[Collection | Item | ItemCollection | dict[Any, Any]], None] | None)
                - `request_modifier` (Optional[Callable[[Request], Union[Request, None]]])
                - `stac_io` (Optional[StacApiIO])
                - `timeout` (Optional[Timeout])

        Raises:
            RuntimeError: If the provided station is not a valid CADIP station.
        """
        super().__init__(
            rs_server_href,
            rs_server_api_key,
            owner_id,
            logger,
            get_href_service(rs_server_href, "RSPY_HOST_CADIP") + "/cadip/",
            **kwargs,
        )
        try:
            self.station: ECadipStation = ECadipStation[station] if isinstance(station, str) else station
        except KeyError as e:
            self.log_and_raise(f"There is no such CADIP station: {station}", e)

    @property
    def href_service(self) -> str:
        """
        Return the RS-Server CADIP URL hostname.
        This URL can be overwritten using the RSPY_HOST_CADIP env variable (used e.g. for local mode).
        Otherwise it should just be the RS-Server URL.
        """
        return get_href_service(self.rs_server_href, "RSPY_HOST_CADIP") + "/cadip"

    @property
    def station_name(self) -> str:
        """Return the station name."""
        return self.station.value

href_service property

Return the RS-Server CADIP URL hostname. This URL can be overwritten using the RSPY_HOST_CADIP env variable (used e.g. for local mode). Otherwise it should just be the RS-Server URL.

station_name property

Return the station name.

__init__(rs_server_href, rs_server_api_key, owner_id, station, logger=None, **kwargs)

Initializes a CadipClient instance.

Parameters:

Name Type Description Default
rs_server_href str | None

The URL of the RS-Server. Pass None for local mode.

required
rs_server_api_key str | None

API key for authentication.

required
owner_id str | None

ID of the catalog owner.

required
station ECadipStation | str

The CADIP station identifier.

required
logger Logger | None

Logger instance (default: None).

None
**kwargs dict[str, Any]

Arbitrary keyword arguments that may include: - headers (Optional[Dict[str, str]]) - parameters (Optional[Dict[str, Any]]) - ignore_conformance (Optional[bool]) - modifier (Callable[[Collection | Item | ItemCollection | dict[Any, Any]], None] | None) - request_modifier (Optional[Callable[[Request], Union[Request, None]]]) - stac_io (Optional[StacApiIO]) - timeout (Optional[Timeout])

{}

Raises:

Type Description
RuntimeError

If the provided station is not a valid CADIP station.

Source code in docs/rs-client-libraries/rs_client/cadip_client.py
def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
    self,
    rs_server_href: str | None,
    rs_server_api_key: str | None,
    owner_id: str | None,
    station: ECadipStation | str,
    logger: logging.Logger | None = None,
    **kwargs: dict[str, Any],
):
    """
    Initializes a CadipClient instance.

    Args:
        rs_server_href (str | None): The URL of the RS-Server. Pass None for local mode.
        rs_server_api_key (str | None): API key for authentication.
        owner_id (str | None): ID of the catalog owner.
        station (ECadipStation | str): The CADIP station identifier.
        logger (logging.Logger | None, optional): Logger instance (default: None).
        **kwargs: Arbitrary keyword arguments that may include:
            - `headers` (Optional[Dict[str, str]])
            - `parameters` (Optional[Dict[str, Any]])
            - `ignore_conformance` (Optional[bool])
            - `modifier` (Callable[[Collection | Item | ItemCollection | dict[Any, Any]], None] | None)
            - `request_modifier` (Optional[Callable[[Request], Union[Request, None]]])
            - `stac_io` (Optional[StacApiIO])
            - `timeout` (Optional[Timeout])

    Raises:
        RuntimeError: If the provided station is not a valid CADIP station.
    """
    super().__init__(
        rs_server_href,
        rs_server_api_key,
        owner_id,
        logger,
        get_href_service(rs_server_href, "RSPY_HOST_CADIP") + "/cadip/",
        **kwargs,
    )
    try:
        self.station: ECadipStation = ECadipStation[station] if isinstance(station, str) else station
    except KeyError as e:
        self.log_and_raise(f"There is no such CADIP station: {station}", e)

CadipClient is designed to interface with the CADIP service. It provides functionality for querying data and metadata from an external CADIP station. It inherits from the StacBase class.

CatalogClient

Bases: StacBase

CatalogClient inherits from StacBase (and, through it, from rs_client.RsClient) and wraps a pystac_client.Client. The goal of this class is to let a user work with RS-Server services more easily than by calling REST endpoints directly.

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
class CatalogClient(StacBase):  # type: ignore # pylint: disable=too-many-ancestors
    """CatalogClient inherits from StacBase (and, through it, from rs_client.RsClient) and wraps a
    pystac_client.Client. The goal of this class is to let a user work with RS-Server services more
    easily than by calling REST endpoints directly.
    """

    ##################
    # Initialisation #
    ##################

    def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
        self,
        rs_server_href: str | None,
        rs_server_api_key: str | None,
        owner_id: str | None,
        logger: logging.Logger | None = None,
        **kwargs,
    ):
        """CatalogClient class constructor."""
        super().__init__(
            rs_server_href,
            rs_server_api_key,
            owner_id,
            logger,
            get_href_service(rs_server_href, "RSPY_HOST_CATALOG") + "/catalog/",
            **kwargs,
        )

    ##############
    # Properties #
    ##############
    @property
    def href_service(self) -> str:
        """
        Return the RS-Server Catalog URL hostname.
        This URL can be overwritten using the RSPY_HOST_CATALOG env variable (used e.g. for local mode).
        Otherwise it should just be the RS-Server URL.
        """

        return get_href_service(self.rs_server_href, "RSPY_HOST_CATALOG")

    def full_collection_id(self, owner_id: str | None, collection_id: str, concat_char: str | None = None) -> str:
        """
        Generates a full collection identifier by concatenating the owner ID and collection ID.

        This function constructs a full collection ID by combining the provided `owner_id` (or a
        default owner ID from `self.owner_id`) with `collection_id` using a specified separator.

        Parameters:
            owner_id (str | None): The owner identifier. If `None`, it defaults to `self.owner_id`.
            collection_id (str): The collection identifier that must always be provided.
            concat_char (str | None, optional): The character used to concatenate `owner_id`
                                                and `collection_id`. Defaults to ":".

        Returns:
            str: A string representing the full collection ID, formatted as:
                `"owner_id:collection_id"` by default or using the specified `concat_char`, that may
                be `_`.

        Raises:
            - **AttributeError**: If `self.owner_id` is not set and `owner_id` is `None`,
                causing an attempt to concatenate a `NoneType` with a string.

        Notes:
            - This function is useful in scenarios where collections are stored with unique
                identifiers that require owner prefixes for proper scoping.
        """

        if not concat_char:
            concat_char = ":"
        return f"{owner_id or self.owner_id}{concat_char}{collection_id}"

    ################################
    # Specific STAC implementation #
    ################################

    # STAC read operations. These can be done with pystac_client (by calling super StacBase functions)

    def get_collection(  # type: ignore
        self,
        collection_id: str,
        owner_id: str | None = None,
    ) -> Collection | CollectionClient:
        """Get the requested collection"""
        return super().get_collection(self.full_collection_id(owner_id, collection_id, ":"))

    def get_items(
        self,
        collection_id: str,
        items_ids: str | None = None,
        owner_id: str | None = None,
    ) -> Iterator[Item]:
        """Get all items from a specific collection."""
        return super().get_items(self.full_collection_id(owner_id, collection_id, ":"), items_ids)

    def get_item(self, collection_id: str, item_id: str, owner_id: str | None = None):
        """Get an item from a specific collection."""
        return super().get_item(self.full_collection_id(owner_id, collection_id, ":"), item_id)

    def search(  # type: ignore # pylint: disable=too-many-arguments, arguments-differ
        self,
        **kwargs,
    ) -> ItemSearch | None:
        """Search items inside a specific collection."""

        kwargs["collections"] = [
            self.full_collection_id(kwargs.get("owner_id"), collection, "_") for collection in kwargs["collections"]
        ]  # type: ignore
        return super().search(**kwargs)  # type: ignore

    # end of STAC read operations

    # STAC write operations. These can't be done with pystac_client
    # - add_collection
    # - remove_collection
    # - add_item
    # - remove_item

    def add_collection(
        self,
        collection: Collection,
        add_public_license: bool = True,
        owner_id: str | None = None,
        timeout: int = TIMEOUT,
    ) -> Response:
        """Update the collection links, then post the collection into the catalog.

        Args:
            collection (Collection): STAC collection
            add_public_license (bool): If True, add a public domain license field and link.
            owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
            timeout (int): The timeout duration for the HTTP request.

        Returns:
            Response: The response of the request.
        """

        full_owner_id = owner_id or self.owner_id

        # Use owner_id:collection_id instead of just the collection ID, before adding the links,
        # so the links contain the full owner_id:collection_id
        short_collection_id = collection.id
        full_collection_id = self.full_collection_id(owner_id, short_collection_id)
        collection.id = full_collection_id

        # Default description
        if not collection.description:
            collection.description = f"This is the collection {short_collection_id} from user {full_owner_id}."

        # Add the owner_id as an extra field
        collection.extra_fields["owner"] = full_owner_id

        # Add public domain license
        if add_public_license:
            collection.license = "public-domain"
            collection.add_link(
                Link(
                    rel=RelType.LICENSE,
                    target="https://creativecommons.org/licenses/publicdomain/",
                    title="public-domain",
                ),
            )

        # Update the links
        self.ps_client.add_child(collection)

        # Restore the short collection_id at the root of the collection
        collection.id = short_collection_id

        # Check that the collection is compliant with STAC
        collection.validate_all()

        # Post the collection to the catalog
        return self.http_session.post(
            f"{self.href_service}/catalog/collections",
            json=collection.to_dict(),
            **self.apikey_headers,
            timeout=timeout,
        )

    def remove_collection(
        self,
        collection_id: str,
        owner_id: str | None = None,
        timeout: int = TIMEOUT,
    ) -> Response:
        """Remove/delete a collection from the catalog.

        Args:
            collection_id (str): The collection id.
            owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
            timeout (int): The timeout duration for the HTTP request.

        Returns:
            JSONResponse: The response of the request.
        """
        # owner_id:collection_id
        full_collection_id = self.full_collection_id(owner_id, collection_id)

        # Remove the collection from the "child" links of the local catalog instance
        collection_link = f"{self.ps_client.self_href.rstrip('/')}/collections/{full_collection_id}"
        self.ps_client.links = [
            link
            for link in self.ps_client.links
            if not ((link.rel == pystac.RelType.CHILD) and (link.href == collection_link))
        ]

        # We need to clear the cache for this and parent "get_collection" methods
        # because their returned value must be updated.
        self.ps_client.get_collection.cache_clear()

        # Remove the collection from the server catalog
        return self.http_session.delete(
            f"{self.href_service}/catalog/collections/{full_collection_id}",
            **self.apikey_headers,
            timeout=timeout,
        )

    def add_item(  # type: ignore # pylint: disable=arguments-renamed
        self,
        collection_id: str,
        item: Item,
        owner_id: str | None = None,
        timeout: int = TIMEOUT,
    ) -> Response:
        """Update the item links, then post the item into the catalog.

        Args:
            collection_id (str): The collection id.
            item (Item): STAC item to update and post
            owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
            timeout (int): The timeout duration for the HTTP request.

        Returns:
            JSONResponse: The response of the request.
        """
        # owner_id:collection_id
        full_collection_id = self.full_collection_id(owner_id, collection_id)

        # Check that the item is compliant with STAC
        item.validate()

        # Get the collection from the catalog
        collection = self.get_collection(collection_id, owner_id)

        # Update the item contents
        collection.add_item(item)  # type: ignore

        # Post the item to the catalog
        return self.http_session.post(
            f"{self.href_service}/catalog/collections/{full_collection_id}/items",
            json=item.to_dict(),
            **self.apikey_headers,
            timeout=timeout,
        )

    def remove_item(  # type: ignore # pylint: disable=arguments-differ
        self,
        collection_id: str,
        item_id: str,
        owner_id: str | None = None,
        timeout: int = TIMEOUT,
    ) -> Response:
        """Remove/delete an item from a collection.

        Args:
            collection_id (str): The collection id.
            item_id (str): The item id.
            owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
            timeout (int): The timeout duration for the HTTP request.

        Returns:
            JSONResponse: The response of the request.
        """
        # owner_id:collection_id
        full_collection_id = self.full_collection_id(owner_id, collection_id)

        # Remove the item from the server catalog
        return self.http_session.delete(
            f"{self.href_service}/catalog/collections/{full_collection_id}/items/{item_id}",
            **self.apikey_headers,
            timeout=timeout,
        )

href_service property

Return the RS-Server Catalog URL. This URL can be overridden with the RSPY_HOST_CATALOG environment variable (used, for example, in local mode). Otherwise it is the RS-Server URL.
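
The override rule described above can be sketched as follows. This is only an illustration of the precedence (environment variable first, RS-Server URL second), not the library's implementation: `resolve_catalog_href` is a hypothetical helper, and the error raised when neither value is available is an assumption.

```python
from __future__ import annotations

import os


def resolve_catalog_href(rs_server_href: str | None, env_var: str = "RSPY_HOST_CATALOG") -> str:
    """Hypothetical helper: pick the catalog URL from the environment or the RS-Server URL."""
    # The environment variable takes precedence when it is set and non-empty.
    override = os.environ.get(env_var)
    if override:
        return override.rstrip("/")
    # Otherwise fall back to the RS-Server URL.
    if rs_server_href is None:
        # Assumption: with no override and no server URL there is nothing to return.
        raise ValueError(f"Set {env_var} or pass an RS-Server URL")
    return rs_server_href.rstrip("/")
```

In local mode, where the RS-Server URL is None, this sketch relies entirely on the environment variable being set.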

__init__(rs_server_href, rs_server_api_key, owner_id, logger=None, **kwargs)

CatalogClient class constructor.

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def __init__(  # pylint: disable=too-many-arguments, too-many-positional-arguments
    self,
    rs_server_href: str | None,
    rs_server_api_key: str | None,
    owner_id: str | None,
    logger: logging.Logger | None = None,
    **kwargs,
):
    """CatalogClient class constructor."""
    super().__init__(
        rs_server_href,
        rs_server_api_key,
        owner_id,
        logger,
        get_href_service(rs_server_href, "RSPY_HOST_CATALOG") + "/catalog/",
        **kwargs,
    )

add_collection(collection, add_public_license=True, owner_id=None, timeout=TIMEOUT)

Update the collection links, then post the collection into the catalog.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| collection | Collection | STAC collection | required |
| add_public_license | bool | If True, add a public domain license field and link. | True |
| owner_id | str | Collection owner ID. If missing, we use self.owner_id. | None |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |

Returns:

| Name | Type | Description |
|---|---|---|
| JSONResponse | Response | The response of the request. |

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def add_collection(
    self,
    collection: Collection,
    add_public_license: bool = True,
    owner_id: str | None = None,
    timeout: int = TIMEOUT,
) -> Response:
    """Update the collection links, then post the collection into the catalog.

    Args:
        collection (Collection): STAC collection
        add_public_license (bool): If True, add a public domain license field and link.
        owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
        timeout (int): The timeout duration for the HTTP request.

    Returns:
        JSONResponse (Response): The response of the request.
    """

    full_owner_id = owner_id or self.owner_id

    # Use owner_id:collection_id instead of just the collection ID, before adding the links,
    # so the links contain the full owner_id:collection_id
    short_collection_id = collection.id
    full_collection_id = self.full_collection_id(owner_id, short_collection_id)
    collection.id = full_collection_id

    # Default description
    if not collection.description:
        collection.description = f"This is the collection {short_collection_id} from user {full_owner_id}."

    # Add the owner_id as an extra field
    collection.extra_fields["owner"] = full_owner_id

    # Add public domain license
    if add_public_license:
        collection.license = "public-domain"
        collection.add_link(
            Link(
                rel=RelType.LICENSE,
                target="https://creativecommons.org/licenses/publicdomain/",
                title="public-domain",
            ),
        )

    # Update the links
    self.ps_client.add_child(collection)

    # Restore the short collection_id at the root of the collection
    collection.id = short_collection_id

    # Check that the collection is compliant with STAC
    collection.validate_all()

    # Post the collection to the catalog
    return self.http_session.post(
        f"{self.href_service}/catalog/collections",
        json=collection.to_dict(),
        **self.apikey_headers,
        timeout=timeout,
    )
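
The method above swaps in the full `owner_id:collection_id` before the links are created and restores the short ID afterwards. Below is a minimal sketch of that swap-and-restore pattern written as a context manager, so the short ID comes back even if the wrapped step raises. `temporary_id` is a hypothetical helper that is not part of rs_client, and `SimpleNamespace` stands in for a pystac Collection.

```python
from contextlib import contextmanager
from types import SimpleNamespace


@contextmanager
def temporary_id(obj, temp_id):
    """Hypothetical helper: set obj.id to temp_id inside the block, then restore it."""
    original = obj.id
    obj.id = temp_id
    try:
        yield obj
    finally:
        # Runs on normal exit and on exceptions alike.
        obj.id = original


# Stand-in for a pystac Collection (assumption: only the `id` attribute matters here).
collection = SimpleNamespace(id="s1_l1")
with temporary_id(collection, "alice:s1_l1"):
    # This is where the links would be built with the full ID.
    id_inside_block = collection.id
id_after_block = collection.id
```

The original code restores the ID with a plain assignment; the `try`/`finally` variant only differs when the link update fails partway through.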

add_item(collection_id, item, owner_id=None, timeout=TIMEOUT)

Update the item links, then post the item into the catalog.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| collection_id | str | The collection id. | required |
| item | Item | STAC item to update and post | required |
| owner_id | str | Collection owner ID. If missing, we use self.owner_id. | None |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |

Returns:

| Name | Type | Description |
|---|---|---|
| JSONResponse | Response | The response of the request. |

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def add_item(  # type: ignore # pylint: disable=arguments-renamed
    self,
    collection_id: str,
    item: Item,
    owner_id: str | None = None,
    timeout: int = TIMEOUT,
) -> Response:
    """Update the item links, then post the item into the catalog.

    Args:
        collection_id (str): The collection id.
        item (Item): STAC item to update and post
        owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
        timeout (int): The timeout duration for the HTTP request.

    Returns:
        JSONResponse: The response of the request.
    """
    # owner_id:collection_id
    full_collection_id = self.full_collection_id(owner_id, collection_id)

    # Check that the item is compliant with STAC
    item.validate()

    # Get the collection from the catalog
    collection = self.get_collection(collection_id, owner_id)

    # Update the item contents
    collection.add_item(item)  # type: ignore

    # Post the item to the catalog
    return self.http_session.post(
        f"{self.href_service}/catalog/collections/{full_collection_id}/items",
        json=item.to_dict(),
        **self.apikey_headers,
        timeout=timeout,
    )

full_collection_id(owner_id, collection_id, concat_char=None)

Generates a full collection identifier by concatenating the owner ID and collection ID.

This function constructs a full collection ID by combining the provided owner_id (or a default owner ID from self.owner_id) with collection_id using a specified separator.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| owner_id | str \| None | The owner identifier. If None, it defaults to self.owner_id. | required |
| collection_id | str | The collection identifier. It must always be provided. | required |
| concat_char | str \| None | The character used to concatenate owner_id and collection_id. If None or empty, ":" is used. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| str | str | The full collection ID, formatted as "owner_id:collection_id" by default, or joined with the specified concat_char (for example "_"). |

Raises:

| Type | Description |
|---|---|
| AttributeError | If owner_id is None and self.owner_id is not defined. If self.owner_id is defined but None, no exception is raised and the resulting ID starts with "None". |

Notes
  • This function is useful when collections are stored under unique identifiers that need an owner prefix for proper scoping.

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def full_collection_id(self, owner_id: str | None, collection_id: str, concat_char: str | None = None) -> str:
    """
    Generates a full collection identifier by concatenating the owner ID and collection ID.

    This function constructs a full collection ID by combining the provided `owner_id` (or a
    default owner ID from `self.owner_id`) with `collection_id` using a specified separator.

    Parameters:
        owner_id (str | None): The owner identifier. If `None`, it defaults to `self.owner_id`.
        collection_id (str): The collection identifier that must always be provided.
        concat_char (str | None, optional): The character used to concatenate `owner_id`
                                            and `collection_id`. Defaults to ":".

    Returns:
        str: The full collection ID, formatted as `"owner_id:collection_id"` by default,
            or joined with the specified `concat_char` (for example `_`).

    Raises:
        AttributeError: If `owner_id` is `None` and `self.owner_id` is not defined.
            If `self.owner_id` is defined but `None`, no exception is raised and the
            resulting ID starts with `"None"`.

    Notes:
        - This function is useful in scenarios where collections are stored with unique
            identifiers that require owner prefixes for proper scoping.
    """

    if not concat_char:
        concat_char = ":"
    return f"{owner_id or self.owner_id}{concat_char}{collection_id}"
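
To make the fallback rules concrete, here is a standalone sketch of the same logic outside the class. `build_full_collection_id` and its `default_owner` argument are hypothetical names that replace `self.owner_id` for illustration.

```python
from __future__ import annotations


def build_full_collection_id(
    owner_id: str | None,
    collection_id: str,
    default_owner: str | None,
    concat_char: str | None = None,
) -> str:
    """Standalone sketch: default_owner plays the role of self.owner_id."""
    # An empty or missing separator falls back to ":".
    if not concat_char:
        concat_char = ":"
    # An empty or missing owner falls back to the default owner.
    return f"{owner_id or default_owner}{concat_char}{collection_id}"


print(build_full_collection_id(None, "s1_l1", "alice"))        # alice:s1_l1
print(build_full_collection_id("bob", "s1_l1", "alice"))       # bob:s1_l1
print(build_full_collection_id(None, "s1_l1", "alice", "_"))   # alice_s1_l1
print(build_full_collection_id(None, "s1_l1", None))           # None:s1_l1
```

The last call shows that a missing owner does not raise: the f-string formats `None` as text, so callers may want to check the owner before building the ID.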

get_collection(collection_id, owner_id=None)

Get the requested collection.

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def get_collection(  # type: ignore
    self,
    collection_id: str,
    owner_id: str | None = None,
) -> Collection | CollectionClient:
    """Get the requested collection."""
    return super().get_collection(self.full_collection_id(owner_id, collection_id, ":"))

get_item(collection_id, item_id, owner_id=None)

Get an item from a specific collection.

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def get_item(self, collection_id: str, item_id: str, owner_id: str | None = None):
    """Get an item from a specific collection."""
    return super().get_item(self.full_collection_id(owner_id, collection_id, ":"), item_id)

get_items(collection_id, items_ids=None, owner_id=None)

Get all items from a specific collection.

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def get_items(
    self,
    collection_id: str,
    items_ids: str | None = None,
    owner_id: str | None = None,
) -> Iterator[Item]:
    """Get all items from a specific collection."""
    return super().get_items(self.full_collection_id(owner_id, collection_id, ":"), items_ids)

remove_collection(collection_id, owner_id=None, timeout=TIMEOUT)

Remove/delete a collection from the catalog.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| collection_id | str | The collection id. | required |
| owner_id | str | Collection owner ID. If missing, we use self.owner_id. | None |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |

Returns:

| Name | Type | Description |
|---|---|---|
| JSONResponse | Response | The response of the request. |

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def remove_collection(
    self,
    collection_id: str,
    owner_id: str | None = None,
    timeout: int = TIMEOUT,
) -> Response:
    """Remove/delete a collection from the catalog.

    Args:
        collection_id (str): The collection id.
        owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
        timeout (int): The timeout duration for the HTTP request.

    Returns:
        JSONResponse: The response of the request.
    """
    # owner_id:collection_id
    full_collection_id = self.full_collection_id(owner_id, collection_id)

    # Remove the collection from the "child" links of the local catalog instance
    collection_link = f"{self.ps_client.self_href.rstrip('/')}/collections/{full_collection_id}"
    self.ps_client.links = [
        link
        for link in self.ps_client.links
        if not ((link.rel == pystac.RelType.CHILD) and (link.href == collection_link))
    ]

    # We need to clear the cache for this and parent "get_collection" methods
    # because their returned value must be updated.
    self.ps_client.get_collection.cache_clear()

    # Remove the collection from the server catalog
    return self.http_session.delete(
        f"{self.href_service}/catalog/collections/{full_collection_id}",
        **self.apikey_headers,
        timeout=timeout,
    )
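
The local clean-up step above, which drops the matching "child" link before the server call, can be sketched in isolation. `FakeLink` and `drop_child_link` are hypothetical stand-ins for pystac's Link and for the list comprehension in the method, and the string "child" stands in for `pystac.RelType.CHILD`.

```python
from dataclasses import dataclass


@dataclass
class FakeLink:
    """Hypothetical stand-in for a pystac Link: only rel and href are modelled."""
    rel: str
    href: str


def drop_child_link(links, self_href, full_collection_id):
    """Return the links without the child link that points at the given collection."""
    target = f"{self_href.rstrip('/')}/collections/{full_collection_id}"
    return [link for link in links if not (link.rel == "child" and link.href == target)]


links = [
    FakeLink("self", "http://localhost:8003/catalog/"),
    FakeLink("child", "http://localhost:8003/catalog/collections/alice:s1_l1"),
    FakeLink("child", "http://localhost:8003/catalog/collections/alice:s2_l1"),
]
remaining = drop_child_link(links, "http://localhost:8003/catalog/", "alice:s1_l1")
print([link.href for link in remaining])
```

Only a link that matches on both `rel` and `href` is removed, so other child collections and non-child links with the same URL are left untouched.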

remove_item(collection_id, item_id, owner_id=None, timeout=TIMEOUT)

Remove/delete an item from a collection.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| collection_id | str | The collection id. | required |
| item_id | str | The item id. | required |
| owner_id | str | Collection owner ID. If missing, we use self.owner_id. | None |
| timeout | int | The timeout duration for the HTTP request. | TIMEOUT |

Returns:

| Name | Type | Description |
|---|---|---|
| JSONResponse | Response | The response of the request. |

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def remove_item(  # type: ignore # pylint: disable=arguments-differ
    self,
    collection_id: str,
    item_id: str,
    owner_id: str | None = None,
    timeout: int = TIMEOUT,
) -> Response:
    """Remove/delete an item from a collection.

    Args:
        collection_id (str): The collection id.
        item_id (str): The item id.
        owner_id (str, optional): Collection owner ID. If missing, we use self.owner_id.
        timeout (int): The timeout duration for the HTTP request.

    Returns:
        JSONResponse: The response of the request.
    """
    # owner_id:collection_id
    full_collection_id = self.full_collection_id(owner_id, collection_id)

    # Remove the item from the server catalog
    return self.http_session.delete(
        f"{self.href_service}/catalog/collections/{full_collection_id}/items/{item_id}",
        **self.apikey_headers,
        timeout=timeout,
    )
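
For reference, the endpoint paths used by add_item and remove_item follow one pattern. The sketch below mirrors the f-strings shown in the source; `item_endpoint` is a hypothetical helper, and the host and IDs are made-up example values.

```python
from __future__ import annotations


def item_endpoint(href_service: str, full_collection_id: str, item_id: str | None = None) -> str:
    """Hypothetical helper: build the items URL, or a single item URL when item_id is given."""
    base = f"{href_service}/catalog/collections/{full_collection_id}/items"
    # POST targets the items list; DELETE targets one item under it.
    return base if item_id is None else f"{base}/{item_id}"


print(item_endpoint("http://localhost:8003", "alice:s1_l1"))
print(item_endpoint("http://localhost:8003", "alice:s1_l1", "item_001"))
```

Like the original f-strings, this sketch does not strip or add slashes, so a trailing slash in `href_service` would produce a double slash in the URL.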

search(**kwargs)

Search items inside a specific collection.

Source code in docs/rs-client-libraries/rs_client/catalog_client.py
def search(  # type: ignore # pylint: disable=too-many-arguments, arguments-differ
    self,
    **kwargs,
) -> ItemSearch | None:
    """Search items inside a specific collection."""

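    # kwargs.get() avoids a KeyError when owner_id is omitted; full_collection_id()
    # then falls back to self.owner_id.
    kwargs["collections"] = [
        self.full_collection_id(kwargs.get("owner_id"), collection, "_") for collection in kwargs["collections"]
    ]  # type: ignore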
    return super().search(**kwargs)  # type: ignore
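
Unlike the other methods, search joins the owner and collection with "_" rather than ":". A minimal sketch of that rewrite on a plain dictionary follows; `prefix_collections` and `default_owner` are hypothetical names, and falling back to the default owner when `owner_id` is absent is an assumption of this sketch.

```python
def prefix_collections(search_kwargs, default_owner, concat_char="_"):
    """Hypothetical helper: return a copy of the kwargs with owner-prefixed collection IDs."""
    owner = search_kwargs.get("owner_id") or default_owner
    rewritten = dict(search_kwargs)  # shallow copy, the caller's dict is not modified
    rewritten["collections"] = [
        f"{owner}{concat_char}{collection}" for collection in search_kwargs.get("collections", [])
    ]
    return rewritten


params = {"collections": ["s1_l1", "s2_l1"], "owner_id": "alice", "limit": 10}
print(prefix_collections(params, default_owner="bob")["collections"])
# ['alice_s1_l1', 'alice_s2_l1']
```

All other search arguments, such as `limit` here, pass through unchanged.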

This client lets you interact with the STAC service to search, retrieve, and manage spatio-temporal asset catalog data. It inherits from the StacBase class.

For detailed usage instructions and examples for each client, please refer to the respective sections.