Skip to content

rs_server_common/data_retrieval/eodag_provider.md

<< Back to index

EODAG Provider.

CustomEODataAccessGateway

Bases: EODataAccessGateway

EODataAccessGateway with a custom config directory management.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class CustomEODataAccessGateway(EODataAccessGateway):
    """EODataAccessGateway with a custom config directory management."""

    def __init__(self, *args, **kwargs):
        """Constructor"""

        self.lock = Lock()
        self.all_auth_providers = []  # all authenticated providers

        # Init environment
        self.eodag_cfg_dir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
        os.environ["EODAG_CFG_DIR"] = self.eodag_cfg_dir.name
        # disable product types discovery
        os.environ["EODAG_EXT_PRODUCT_TYPES_CFG_FILE"] = ""

        # Environment variable values, the last time we checked them. They will be read by eodag.
        self.old_environ = dict(os.environ)

        # Init eodag instance
        super().__init__(*args, **kwargs)

    def __del__(self):
        """Destructor"""
        try:
            shutil.rmtree(self.eodag_cfg_dir.name)  # remove the unique /tmp dir
        except FileNotFoundError:
            pass

    @classmethod
    @lru_cache
    def create(cls, *args, **kwargs) -> "CustomEODataAccessGateway":
        """Return a cached instance of the class."""
        return cls(*args, **kwargs)

    def authenticate_provider(self, provider: str, external_config: StationExternalAuthenticationConfig):
        """
        Set the authentication for an external provider (=station).

        Args:
            provider: the name of the eodag provider (=station name)
            external_config: external provider (=station) authentication from rs-server.yaml file
            or RSPY__TOKEN__xxx env vars
        """

        # In a lock, call this function only once by provider, to avoid changing the config
        # and using it (when calling a search) at the same time.
        with self.lock:
            if provider in self.all_auth_providers:
                return
            self.all_auth_providers.append(provider)

            provider_config = self.providers_config[provider]
            if env_bool("RSPY_USE_MODULE_FOR_STATION_TOKEN", default=False):
                provider_config.update(
                    {"auth": {"credentials": {"token": get_station_token(external_config, {})["access_token"]}}},
                )
            else:
                # mandatory keys
                provider_config.update(
                    {
                        "auth": {
                            "auth_uri": external_config.token_url,
                            "refresh_uri": external_config.token_url,
                            "req_data": {
                                "client_id": external_config.client_id,
                                "client_secret": external_config.client_secret,
                                "username": external_config.username,
                                "password": external_config.password,
                                "grant_type": external_config.grant_type,
                            },
                        },
                    },
                )

                # Used to set the authorization for token retrieval
                if external_config.authorization is not None:
                    provider_config.update({"auth": {"credentials": {"auth_for_token": external_config.authorization}}})

                # optional keys
                if external_config.scope:
                    provider_config.update({"auth": {"req_data": {"scope": external_config.scope}}})

__del__()

Destructor

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
69
70
71
72
73
74
def __del__(self):
    """Destructor"""
    try:
        shutil.rmtree(self.eodag_cfg_dir.name)  # remove the unique /tmp dir
    except FileNotFoundError:
        pass

__init__(*args, **kwargs)

Constructor

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __init__(self, *args, **kwargs):
    """Constructor"""

    self.lock = Lock()
    self.all_auth_providers = []  # all authenticated providers

    # Init environment
    self.eodag_cfg_dir = tempfile.TemporaryDirectory()  # pylint: disable=consider-using-with
    os.environ["EODAG_CFG_DIR"] = self.eodag_cfg_dir.name
    # disable product types discovery
    os.environ["EODAG_EXT_PRODUCT_TYPES_CFG_FILE"] = ""

    # Environment variable values, the last time we checked them. They will be read by eodag.
    self.old_environ = dict(os.environ)

    # Init eodag instance
    super().__init__(*args, **kwargs)

authenticate_provider(provider, external_config)

Set the authentication for an external provider (=station).

Parameters:

Name Type Description Default
provider str

the name of the eodag provider (=station name)

required
external_config StationExternalAuthenticationConfig

external provider (=station) authentication from rs-server.yaml file

required
Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def authenticate_provider(self, provider: str, external_config: StationExternalAuthenticationConfig):
    """
    Set the authentication for an external provider (=station).

    Args:
        provider: the name of the eodag provider (=station name)
        external_config: external provider (=station) authentication from rs-server.yaml file
        or RSPY__TOKEN__xxx env vars
    """

    # In a lock, call this function only once by provider, to avoid changing the config
    # and using it (when calling a search) at the same time.
    with self.lock:
        if provider in self.all_auth_providers:
            return
        self.all_auth_providers.append(provider)

        provider_config = self.providers_config[provider]
        if env_bool("RSPY_USE_MODULE_FOR_STATION_TOKEN", default=False):
            provider_config.update(
                {"auth": {"credentials": {"token": get_station_token(external_config, {})["access_token"]}}},
            )
        else:
            # mandatory keys
            provider_config.update(
                {
                    "auth": {
                        "auth_uri": external_config.token_url,
                        "refresh_uri": external_config.token_url,
                        "req_data": {
                            "client_id": external_config.client_id,
                            "client_secret": external_config.client_secret,
                            "username": external_config.username,
                            "password": external_config.password,
                            "grant_type": external_config.grant_type,
                        },
                    },
                },
            )

            # Used to set the authorization for token retrieval
            if external_config.authorization is not None:
                provider_config.update({"auth": {"credentials": {"auth_for_token": external_config.authorization}}})

            # optional keys
            if external_config.scope:
                provider_config.update({"auth": {"req_data": {"scope": external_config.scope}}})

create(*args, **kwargs) cached classmethod

Return a cached instance of the class.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
76
77
78
79
80
@classmethod
@lru_cache
def create(cls, *args, **kwargs) -> "CustomEODataAccessGateway":
    """Return a cached instance of the class."""
    return cls(*args, **kwargs)

EodagProvider

Bases: Provider

An EODAG provider.

It uses EODAG to provide data from external sources.

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
class EodagProvider(Provider):
    """An EODAG provider.

    It uses EODAG to provide data from external sources.
    """

    def __init__(self, external_config: StationExternalAuthenticationConfig, eodag_config_path: Path, provider: str):
        """Create a EODAG provider.

        Args:
            external_config: external provider (=station) authentication from rs-server.yaml file
            or RSPY__TOKEN__xxx env vars. Override values from the eodag config file (below).
            eodag_config_path: path to the eodag configuration file (adgs_ws_config.yaml or cadip_ws_config.yaml)
            provider: the name of the eodag provider (=station name)
        """
        self.provider: str = provider
        self.eodag_config_path = eodag_config_path.resolve().as_posix()
        try:
            with global_lock:  # use a thread lock before calling the lru_cache
                self.client = CustomEODataAccessGateway.create(self.eodag_config_path)
        except Exception as e:
            raise CreateProviderFailed(f"Can't initialize {self.provider} provider") from e
        self.client.set_preferred_provider(self.provider)
        self.client.authenticate_provider(self.provider, external_config)

    def _handle_multiple_values(self, mapped_search_args: dict, values: list | str, singular_key: str, plural_key: str):
        value = values[0] if isinstance(values, list) and len(values) == 1 else values
        key = plural_key if isinstance(value, list) else singular_key
        mapped_search_args[key] = ",".join(f"'{p}'" for p in value) if isinstance(value, list) else f"'{value}'"

    def _specific_search(self, **kwargs) -> SearchResult | list:  # pylint: disable=too-many-branches,too-many-locals
        """
        Conducts a search for products using the specified OData arguments.

        This private method interfaces with the EODAG client's search functionality
        to retrieve products that match the given search parameters. It handles
        special cases such as `PublicationDate` and session ID lists while enforcing
        pagination constraints as per provider limitations.

        Args:
            **kwargs: Arbitrary keyword arguments specifying search parameters,
                including all queryables defined in the provider's configuration as OData arguments.

        Returns:
            Union[SearchResult, List]: A `SearchResult` object containing the matched products
            or an empty list if no matches are found.

        Raises:
            HTTPException: If a validation error occurs in the search query.
            SearchProductFailed: If the search request fails due to request errors,
                misconfiguration, or authentication issues.
            ValueError: If authentication with EODAG fails.

        Notes:
            - Ensures compliance with provider-specific constraints, such as pagination limits.
            - Logs encountered errors and provides detailed messages in case of failures.
        """

        mapped_search_args: dict[str, str | None] = {}
        if session_id := kwargs.pop("SessionId", None):
            # Map session_id to the appropriate eodag parameter
            self._handle_multiple_values(mapped_search_args, session_id, "SessionId", "SessionIds")

        if kwargs.pop("sessions_search", False):
            # If request is for session search, handle platform - if any provided.
            if platform := kwargs.pop("Satellite", None):
                self._handle_multiple_values(mapped_search_args, platform, "platform", "platforms")

        for multivalued_key in ("attr_ptype", "platformSerialIdentifier", "platformShortName"):
            # Handle AUXIP/PRIP parameters that can have one or several values
            if values := kwargs.pop(multivalued_key, None):
                self._handle_multiple_values(mapped_search_args, values, multivalued_key, multivalued_key + "s")

        if date_time := kwargs.pop("PublicationDate", False):
            # Since now both for files and sessions, time interval is optional, map it if provided.
            fixed, start, end = (str(date) if date else None for date in date_time)
            mapped_search_args.update(
                {
                    "PublicationDate": fixed,
                    "StartPublicationDate": start,
                    "StopPublicationDate": end,
                },
            )

        for op in temporal_operations:
            if query := kwargs.pop(op, None):
                mapped_search_args[op] = query

        max_items_allowed = int(self.client.providers_config[self.provider].search.pagination["max_items_per_page"])
        if int(kwargs["items_per_page"]) > max_items_allowed:
            logger.warning(
                f"Requesting {kwargs['items_per_page']} exceeds maximum of {max_items_allowed} "
                "allowed for this provider!",
            )
            logger.warning(f"Number of items per page was set to {max_items_allowed - 1}.")
            kwargs["items_per_page"] = max_items_allowed - 1
        try:
            logger.info(f"Searching from {self.provider} with parameters {mapped_search_args} and kwargs {kwargs}")
            # Start search -> user defined search params in mapped_search_args (id), pagination in kwargs (top, limit).
            # search_method = self.client.search if "session" not in self.provider else self.client.search_iter_page
            try:
                prov_cfg = self.client.providers_config[self.provider]
                products_cfg = getattr(prov_cfg, "products", {})
                dataset_key = next(iter(products_cfg.keys()))
            except Exception:  # pylint: disable=broad-exception-caught
                dataset_key = "S1_SAR_RAW"  # last-resort fallback
            products = self.client.search(
                **mapped_search_args,  # type: ignore
                provider=self.provider,
                raise_errors=True,
                productType=dataset_key,
                **kwargs,
            )
            repr(products)  # trigger eodag validation.

        except ValidationError as exc:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=exc.message) from exc
        except (RequestError, MisconfiguredError) as e:
            # invalid token: EODAG returns an exception with "FORBIDDEN" in e.args when the token key is invalid.
            if e.args and "FORBIDDEN" in e.args[0]:
                raise SearchProductFailed(
                    f"Can't search provider {self.provider} " "because the used token is not valid",
                ) from e
            logger.debug(e)
            raise SearchProductFailed(e) from e
        except AuthenticationError as exc:
            raise ValueError("EoDAG could not authenticate") from exc

        if products.number_matched:
            logger.info(f"Returned {products.number_matched} session from {self.provider}")

        if products.errors:
            logger.error(f"Errors from {self.provider}: {products.errors}")
        return products

    def download(self, product_id: str, to_file: Path) -> None:
        """Download the expected product at the given local location.

        EODAG needs an EOProduct to download.
        We build an EOProduct from the id and download location
        to be able to call EODAG for download.


        Args:
            product_id: the id of the product to download
            to_file: the path where the product has to be download

        Returns:
            None

        """
        # Dirty fix for eodag: change extension
        org_file = to_file
        to_file = to_file.with_suffix(to_file.suffix + "_fix_eodag")

        # Use thread-lock because self.client.download is not thread-safe
        with self.client.lock:
            product = self.create_eodag_product(product_id, to_file.name)
            # download_plugin = self.client._plugins_manager.get_download_plugin(product)
            # authent_plugin = self.client._plugins_manager.get_auth_plugin(product.provider)
            # product.register_downloader(download_plugin, authent_plugin)
            self.client.download(product, output_dir=str(to_file.parent))

            # Dirty fix continued: rename the download directory
            if to_file.is_dir() and (not org_file.is_dir()):
                to_file.rename(org_file)

    def create_eodag_product(self, product_id: str, filename: str):
        """Initialize an EO product with minimal properties.

        The title is used by EODAG as the name of the downloaded file.
        The download link is used by EODAG as http request url for download.
        The geometry is mandatory in an EO Product so we add the all earth as geometry.

        Args:
            product_id (str): the id of EO Product
            filename (str): the name of the downloaded file

        Returns:
            product (EOProduct): the initialized EO Product

        """
        try:
            with open(self.eodag_config_path, encoding="utf-8") as f:
                base_uri = yaml.safe_load(f)[self.provider.lower()]["download"]["base_uri"]
            return EOProduct(
                self.provider,
                {
                    "id": product_id,
                    "title": filename,
                    "geometry": "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))",
                    # TODO build from configuration (but how ?)
                    "downloadLink": f"{base_uri}({product_id})/$value",
                },
            )
        except Exception as e:
            raise CreateProviderFailed(f"Can't initialize {self.provider} download provider") from e

__init__(external_config, eodag_config_path, provider)

Create a EODAG provider.

Parameters:

Name Type Description Default
external_config StationExternalAuthenticationConfig

external provider (=station) authentication from rs-server.yaml file

required
eodag_config_path Path

path to the eodag configuration file (adgs_ws_config.yaml or cadip_ws_config.yaml)

required
provider str

the name of the eodag provider (=station name)

required
Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def __init__(self, external_config: StationExternalAuthenticationConfig, eodag_config_path: Path, provider: str):
    """Create a EODAG provider.

    Args:
        external_config: external provider (=station) authentication from rs-server.yaml file
        or RSPY__TOKEN__xxx env vars. Override values from the eodag config file (below).
        eodag_config_path: path to the eodag configuration file (adgs_ws_config.yaml or cadip_ws_config.yaml)
        provider: the name of the eodag provider (=station name)
    """
    self.provider: str = provider
    self.eodag_config_path = eodag_config_path.resolve().as_posix()
    try:
        with global_lock:  # use a thread lock before calling the lru_cache
            self.client = CustomEODataAccessGateway.create(self.eodag_config_path)
    except Exception as e:
        raise CreateProviderFailed(f"Can't initialize {self.provider} provider") from e
    self.client.set_preferred_provider(self.provider)
    self.client.authenticate_provider(self.provider, external_config)

create_eodag_product(product_id, filename)

Initialize an EO product with minimal properties.

The title is used by EODAG as the name of the downloaded file. The download link is used by EODAG as http request url for download. The geometry is mandatory in an EO Product so we add the all earth as geometry.

Parameters:

Name Type Description Default
product_id str

the id of EO Product

required
filename str

the name of the downloaded file

required

Returns:

Name Type Description
product EOProduct

the initialized EO Product

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def create_eodag_product(self, product_id: str, filename: str):
    """Initialize an EO product with minimal properties.

    The title is used by EODAG as the name of the downloaded file.
    The download link is used by EODAG as http request url for download.
    The geometry is mandatory in an EO Product so we add the all earth as geometry.

    Args:
        product_id (str): the id of EO Product
        filename (str): the name of the downloaded file

    Returns:
        product (EOProduct): the initialized EO Product

    """
    try:
        with open(self.eodag_config_path, encoding="utf-8") as f:
            base_uri = yaml.safe_load(f)[self.provider.lower()]["download"]["base_uri"]
        return EOProduct(
            self.provider,
            {
                "id": product_id,
                "title": filename,
                "geometry": "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))",
                # TODO build from configuration (but how ?)
                "downloadLink": f"{base_uri}({product_id})/$value",
            },
        )
    except Exception as e:
        raise CreateProviderFailed(f"Can't initialize {self.provider} download provider") from e

download(product_id, to_file)

Download the expected product at the given local location.

EODAG needs an EOProduct to download. We build an EOProduct from the id and download location to be able to call EODAG for download.

Parameters:

Name Type Description Default
product_id str

the id of the product to download

required
to_file Path

the path where the product has to be download

required

Returns:

Type Description
None

None

Source code in docs/rs-server/services/common/rs_server_common/data_retrieval/eodag_provider.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def download(self, product_id: str, to_file: Path) -> None:
    """Download the expected product at the given local location.

    EODAG needs an EOProduct to download.
    We build an EOProduct from the id and download location
    to be able to call EODAG for download.


    Args:
        product_id: the id of the product to download
        to_file: the path where the product has to be download

    Returns:
        None

    """
    # Dirty fix for eodag: change extension
    org_file = to_file
    to_file = to_file.with_suffix(to_file.suffix + "_fix_eodag")

    # Use thread-lock because self.client.download is not thread-safe
    with self.client.lock:
        product = self.create_eodag_product(product_id, to_file.name)
        # download_plugin = self.client._plugins_manager.get_download_plugin(product)
        # authent_plugin = self.client._plugins_manager.get_auth_plugin(product.provider)
        # product.register_downloader(download_plugin, authent_plugin)
        self.client.download(product, output_dir=str(to_file.parent))

        # Dirty fix continued: rename the download directory
        if to_file.is_dir() and (not org_file.is_dir()):
            to_file.rename(org_file)