Skip to content

rs_server_prip/api/prip_search.md

<< Back to index

Module for interacting with PRIP system through a FastAPI APIRouter.

This module provides functionality to retrieve a list of products from the PRIP stations. It includes an API endpoint, utility functions, and initialization for accessing EODataAccessGateway.

MockPgstacPrip

Bases: MockPgstac

PRIP implementation of MockPgstac

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class MockPgstacPrip(MockPgstac):
    """MockPgstac specialisation configured for the "prip" service."""

    def __init__(self, request: Request | None = None, readwrite: Literal["r", "w"] | None = None):
        # STAC temporal property name -> OData field name.
        # Note that several STAC names deliberately map to the same OData field.
        stac_to_odata_dates = {
            "eopf:origin_datetime": "OriginDate",
            "start_datetime": "ContentDate/Start",
            "datetime": "ContentDate/Start",
            "end_datetime": "ContentDate/End",
            "created": "Attributes[Name=processingDate]",
            "processing:datetime": "Attributes[Name=processingDate]",
            "published": "PublicationDate",
        }
        super().__init__(
            request=request,
            readwrite=readwrite,
            service="prip",
            all_collections=lambda: read_conf()["collections"],
            select_config=select_config,
            stac_to_odata=stac_to_odata,
            map_mission=map_auxip_prip_mission,
            temporal_mapping=stac_to_odata_dates,
        )
        # Sort key handed to process_product_search: "published", with a "-" prefix.
        self.sortby = "-published"

    @handle_exceptions
    def process_search(
        self,
        station: str,
        odata_params: dict,
        collection_provider: Callable[[dict], str | None],
        limit: int,
        page: int,
    ) -> stac_pydantic.ItemCollection:
        """
        Normalise the OData parameters in place, then search PRIP products for the station.

        Args:
            station (str): Station identifier, forwarded to process_product_search.
            odata_params (dict): OData search parameters. This dict is modified in place.
            collection_provider (Callable[[dict], str | None]): Forwarded to process_product_search.
            limit (int): Forwarded to process_product_search.
            page (int): Forwarded to process_product_search.

        Returns:
            stac_pydantic.ItemCollection: The result of process_product_search.
        """
        # "Name" is always written back: a list is reduced to its first element,
        # anything else (including a missing key, i.e. None) is stored unchanged.
        name_filter = odata_params.get("Name")
        if isinstance(name_filter, list):
            name_filter = name_filter[0]
        odata_params["Name"] = name_filter

        # "productType" is renamed to "attr_ptype"
        # (the original comment says the name shadows an eodag builtin).
        product_type = odata_params.pop("productType", None)
        if product_type:
            odata_params["attr_ptype"] = split_multiple_values(product_type)

        # Platform filters: a falsy value removes the key, a truthy one is split and re-inserted.
        for platform_key in ("platformSerialIdentifier", "platformShortName"):
            raw_value = odata_params.pop(platform_key, None)
            if raw_value:
                odata_params[platform_key] = split_multiple_values(raw_value)

        return process_product_search(station, odata_params, collection_provider, limit, self.sortby, page)

Search prip products for the given station and OData parameters.

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@handle_exceptions
def process_search(
    self,
    station: str,
    odata_params: dict,
    collection_provider: Callable[[dict], str | None],
    limit: int,
    page: int,
) -> stac_pydantic.ItemCollection:
    """
    Normalise the OData parameters in place, then search PRIP products for the station.

    Args:
        station (str): Station identifier, forwarded to process_product_search.
        odata_params (dict): OData search parameters. This dict is modified in place.
        collection_provider (Callable[[dict], str | None]): Forwarded to process_product_search.
        limit (int): Forwarded to process_product_search.
        page (int): Forwarded to process_product_search.

    Returns:
        stac_pydantic.ItemCollection: The result of process_product_search.
    """
    # "Name" is always written back: a list is reduced to its first element,
    # anything else (including a missing key, i.e. None) is stored unchanged.
    name_filter = odata_params.get("Name")
    if isinstance(name_filter, list):
        name_filter = name_filter[0]
    odata_params["Name"] = name_filter

    # "productType" is renamed to "attr_ptype"
    # (the original comment says the name shadows an eodag builtin).
    product_type = odata_params.pop("productType", None)
    if product_type:
        odata_params["attr_ptype"] = split_multiple_values(product_type)

    # Platform filters: a falsy value removes the key, a truthy one is split and re-inserted.
    for platform_key in ("platformSerialIdentifier", "platformShortName"):
        raw_value = odata_params.pop(platform_key, None)
        if raw_value:
            odata_params[platform_key] = split_multiple_values(raw_value)

    return process_product_search(station, odata_params, collection_provider, limit, self.sortby, page)

auth_validation(request, collection_id, access_type)

Check if the user KeyCloak roles contain the right for this specific PRIP collection and access type.

Parameters:

Name Type Description Default
collection_id str

Used to find the PRIP station from the RSPY_PRIP_SEARCH_CONFIG config yaml file.

required
access_type str

The type of access, such as "download" or "read".

required
Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def auth_validation(request: Request, collection_id: str, access_type: str):
    """
    Verify that the user's KeyCloak roles grant `access_type` on the given PRIP collection.

    Args:
        request (Request): Incoming request, forwarded to the authentication module.
        collection_id (str): Used to find the PRIP station from the RSPY_PRIP_SEARCH_CONFIG config yaml file.
        access_type (str): The type of access, such as "download" or "read".

    Raises:
        HTTPException: 404 when no configuration is found for `collection_id`.
    """
    # Look up the configuration entry matching the requested collection id.
    collection_config = select_config(collection_id)
    if collection_config:
        # The actual rights check is delegated to the authentication module, per station.
        authentication.auth_validation(
            "prip",
            access_type,
            request=request,
            station=collection_config["station"],
        )
        return

    raise HTTPException(status.HTTP_404_NOT_FOUND, f"Unknown PRIP collection: {collection_id!r}")

get_allowed_prip_collections(request) async

Return the PRIP collections to which the user has access.

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
169
170
171
172
173
174
175
@router.get("/prip/collections")
@handle_exceptions
async def get_allowed_prip_collections(request: Request):
    """Return the PRIP collections to which the user has access."""
    logger.info(f"Starting {request.url.path}")
    # Only the "landing_page" right is checked here; no station is passed.
    authentication.auth_validation("prip", "landing_page", request=request)
    pgstac_client = request.app.state.pgstac_client
    allowed_collections = await pgstac_client.all_collections(request=request)
    return allowed_collections

get_conformance(request) async

Return the STAC/OGC conformance classes implemented by this server.

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
162
163
164
165
166
@router.get("/prip/conformance")
async def get_conformance(request: Request):
    """List the STAC/OGC conformance classes implemented by this server."""
    # Same access right as the landing page.
    authentication.auth_validation("prip", "landing_page", request=request)
    pgstac_client = request.app.state.pgstac_client
    conformance_classes = await pgstac_client.conformance()
    return conformance_classes

get_prip_collection(request, collection_id) async

Return a specific PRIP collection.

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
178
179
180
181
182
183
184
185
186
187
@router.get("/prip/collections/{collection_id}")
@handle_exceptions
async def get_prip_collection(
    request: Request,
    collection_id: str,
) -> list[dict] | dict | stac_pydantic.Collection:
    """Return the PRIP collection identified by `collection_id`, after a "read" access check."""
    logger.info(f"Starting {request.url.path}")
    auth_validation(request, collection_id, "read")
    pgstac_client = request.app.state.pgstac_client
    collection = await pgstac_client.get_collection(collection_id, request)
    return collection

get_prip_collection_items(request, collection_id, bbox=None, datetime=None, filter_=None, filter_lang='cql2-text', sortby=None, limit=None, page=None) async

Retrieve a list of items from a specified PRIP collection.

This endpoint returns a collection of items associated with the given PRIP collection ID. It utilizes the collection ID to validate access and fetches the items based on defined query parameters.

Parameters:

Name Type Description Default
collection_id str

PRIP collection ID. Must be a valid collection identifier (e.g., 'ins_s1').

required
bbox BBoxType

Bounding box filter as four or six numbers [west, south, east, north] or [west, south, minz, east, north, maxz]. Defaults to None. If provided, items intersecting the bbox are returned.

None
datetime DateTimeType

Temporal filter. Either a single RFC 3339 timestamp (e.g., "2024-01-01T00:00:00Z") or a closed or open-ended interval (e.g., "2024-01-01T00:00:00Z/2024-01-31T23:59:59Z", "../2024-01-01T00:00:00Z"). Defaults to None.

None
filter_ FilterType

CQL2 filter expression (text or JSON depending on filter_lang). Defaults to None.

None
filter_lang FilterLangType

CQL2 language for filter_. One of "cql2-text" or "cql2-json". Defaults to "cql2-text".

'cql2-text'
sortby SortByType

Sort specification. Single field or list of fields with optional direction, e.g., {"field": "published", "direction": "desc"}. Defaults to None (implementation default ordering).

None
limit LimitType

Page size (maximum number of items to return). Defaults to None (service default / configured max).

None
page PageType

1-based page index. Defaults to None (first page).

None

Returns: list[dict]: A FeatureCollection of items belonging to the specified collection, or an error message if the collection is not found.

Raises:

Type Description
HTTPException

If the authentication fails, or if there are issues with the collection ID provided.

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
@router.get("/prip/collections/{collection_id}/items", response_class=GeoJSONResponse)
async def get_prip_collection_items(
    request: Request,
    collection_id: CollectionType,
    # stac search parameters
    bbox: BBoxType = None,
    datetime: DateTimeType = None,
    filter_: FilterType = None,
    filter_lang: FilterLangType = "cql2-text",
    sortby: SortByType = None,
    limit: LimitType = None,
    page: PageType = None,
) -> list[dict] | dict:
    """
    List the items of a PRIP collection.

    The collection ID is first used to check the caller's "read" access, then
    the items are fetched according to the search parameters below.

    Args:
        collection_id (str): PRIP collection ID. Must be a valid collection identifier
                             (e.g., 'ins_s1').
        bbox (BBoxType, optional): Bounding box filter as four or six numbers
            [west, south, east, north] or [west, south, minz, east, north, maxz].
            Defaults to None. If provided, items intersecting the bbox are returned.

        datetime (DateTimeType, optional): Temporal filter. Either a single RFC 3339
            timestamp (e.g., "2024-01-01T00:00:00Z") or a closed or open-ended interval
            (e.g., "2024-01-01T00:00:00Z/2024-01-31T23:59:59Z", "../2024-01-01T00:00:00Z").
            Defaults to None.

        filter_ (FilterType, optional): CQL2 filter expression (text or JSON depending
            on `filter_lang`). Defaults to None.

        filter_lang (FilterLangType, optional): CQL2 language for `filter_`. One of
            "cql2-text" or "cql2-json". Defaults to "cql2-text".

        sortby (SortByType, optional): Sort specification. Single field or list of fields
            with optional direction, e.g., {"field": "published", "direction": "desc"}.
            Defaults to None (implementation default ordering).

        limit (LimitType, optional): Page size (maximum number of items to return).
            Defaults to None (service default / configured max).

        page (PageType, optional): 1-based page index.
            Defaults to None (first page).
    Returns:
        list[dict]: A FeatureCollection of items belonging to the specified collection, or an
                    error message if the collection is not found.

    Raises:
        HTTPException: If the authentication fails, or if there are issues with the
                       collection ID provided.
    """
    logger.info(f"Starting {request.url.path}")
    auth_validation(request, collection_id, "read")

    # Resolve the search method before evaluating the filters so that the
    # bbox check runs at the same point as in a single inline call.
    search_items = request.app.state.pgstac_client.item_collection
    stac_filters = {
        "bbox": check_bbox_input(bbox),
        "datetime": datetime,
        "filter_expr": filter_,
        "filter_lang": filter_lang,
        # The client receives a one-element list, or None when no sort was requested.
        "sortby": [sortby] if sortby else None,
        "limit": limit,
        "page": page,
    }
    return await search_items(collection_id, request, **stac_filters)

get_prip_collection_specific_item(request, collection_id, item_id) async

Retrieve a specific item from a specified PRIP collection.

This endpoint fetches details of a specific item within the given PRIP collection by its unique item ID. It utilizes the provided collection ID and item ID to validate access and return item information.

Args:

  • collection_id (str): PRIP collection ID. Must be a valid collection identifier (e.g., 'ins_s1').
  • item_id (str): PRIP item ID. Must be a valid item identifier (e.g., 'S1A_OPER_MPL_ORBPRE_20210214T021411_20210221T021411_0001.EOF').

Returns:

  • dict: A JSON object containing details of the specified item, or an error message if the item is not found.

Raises:

  • HTTPException: If the authentication fails, or if the specified item is not found in the collection.

Example: A successful response will return:

{
    "id": "S1A_OPER_MPL_ORBPRE_20210214T021411_20210221T021411_0001.EOF",
    "type": "Feature",
    "properties": {
        ...  # Detailed properties of the item
    },
    "geometry": {
        ...  # Geometry details of the item
    },
    "links": [
        ...  # Links associated with the item
    ]
}
Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
@router.get(path="/prip/collections/{collection_id}/items/{item_id}", response_class=GeoJSONResponse)
@handle_exceptions
async def get_prip_collection_specific_item(
    request: Request,
    collection_id: Annotated[str, FPath(title="PRIP collection ID.", description="E.G. ins_s1")],
    item_id: Annotated[
        str,
        FPath(
            title="PRIP Id",
            description="E.G. S1A_OPER_MPL_ORBPRE_20210214T021411_20210221T021411_0001.EOF",
        ),
    ],
) -> list[dict] | dict:
    """
    Retrieve a specific item from a specified PRIP collection.

    This endpoint fetches details of a specific item within the given PRIP collection
    by its unique item ID. It utilizes the provided collection ID and item ID to
    validate access and return item information.

    Args:
    - collection_id (str): PRIP collection ID. Must be a valid collection identifier
            (e.g., 'ins_s1').
    - item_id (str): PRIP item ID. Must be a valid item identifier
            (e.g., 'S1A_OPER_MPL_ORBPRE_20210214T021411_20210221T021411_0001.EOF').

    Returns:
    - dict: A JSON object containing details of the specified item, or an error
            message if the item is not found.

    Raises:
    - HTTPException: If the authentication fails, or if the specified item is
                    not found in the collection.

    Example:
    A successful response will return: \n
        {
            "id": "S1A_OPER_MPL_ORBPRE_20210214T021411_20210221T021411_0001.EOF",
            "type": "Feature",
            "properties": {
                ...  # Detailed properties of the item
            },
            "geometry": {
                ...  # Geometry details of the item
            },
            "links": [
                ...  # Links associated with the item
            ]
        }

    """
    logger.info(f"Starting {request.url.path}")
    auth_validation(request, collection_id, "read")

    # The lookup by item ID is delegated to the pgstac client.
    try:
        item = await request.app.state.pgstac_client.get_item(item_id, collection_id, request)
    except HTTPException:  # validation error, just forward it
        raise
    except Exception as exc:  # stac_fastapi.types.errors.NotFoundError
        # NOTE(review): every non-HTTP error is reported as 404, not only "not found".
        # Consider narrowing this to the NotFoundError named above.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"PRIP item {item_id!r} not found.",
        ) from exc
    return item

get_root_catalog(request) async

Return the PRIP landing page (root catalog).

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
154
155
156
157
158
159
@router.get("/prip")
async def get_root_catalog(request: Request):
    """Return the PRIP landing page (root catalog)."""
    logger.info(f"Starting {request.url.path}")
    authentication.auth_validation("prip", "landing_page", request=request)
    pgstac_client = request.app.state.pgstac_client
    landing_page = await pgstac_client.landing_page(request=request)
    return landing_page

home_endpoint() async

Redirect to the landing page.

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
148
149
150
151
@router.get("/", include_in_schema=False)
async def home_endpoint():
    """Redirect the service root to the PRIP landing page."""
    landing_page_url = "/prip"
    return RedirectResponse(landing_page_url)

Performs a search for products using the PRIP provider and generates a STAC Feature Collection from the products.

Parameters:

Name Type Description Default
station str

prip station identifier.

required
queryables dict

Query parameters for filtering results.

required
collection_provider Callable[[dict], str | None]

Function that determines STAC collection for a given OData entity

required
limit int

Maximum number of products to return.

required
sortby str

Sorting field with +/- prefix for ascending/descending order.

required
page int

Page number for pagination. Defaults to 1.

1
**kwargs

Additional search parameters.

{}

Returns:

Type Description
ItemCollection

stac_pydantic.ItemCollection: A STAC-compliant Feature Collection containing the search results.

Raises:

Type Description
HTTPException

If the pagination limit is less than 1.

HTTPException

If an invalid station identifier is provided (CreateProviderFailed).

HTTPException

If there is a connection error with the station (requests.exceptions.ConnectionError).

HTTPException

If there is a general failure during the process.

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
def process_product_search(  # pylint: disable=too-many-locals
    station: str,
    queryables: dict,
    collection_provider: Callable[[dict], str | None],
    limit: int,
    sortby: str,
    page: int = 1,
    **kwargs,
) -> stac_pydantic.ItemCollection:
    """
    Search products on a PRIP station and convert them into a STAC item collection.

    Args:
        station (str): PRIP station identifier, used to initialise the provider.
        queryables (dict): Query parameters for filtering results. They go through
                           `validate` first, which may update the dict in place.
        collection_provider (Callable[[dict], str | None]): Function that determines STAC collection
                                                            for a given OData entity
        limit (int): Maximum number of products to return (sent as `items_per_page`).
        sortby (str): Sorting field with +/- prefix for ascending/descending order.
        page (int, optional): Page number for pagination. Defaults to 1.
        **kwargs: Additional search parameters, forwarded to the provider search.

    Returns:
        stac_pydantic.ItemCollection: A STAC-compliant Feature Collection containing the search results.

    Raises:
        HTTPException: 400 if the provider cannot be created (`CreateProviderFailed`).
        HTTPException: 503 if there is a connection error with the station
                       (`requests.exceptions.ConnectionError`).
        HTTPException: 503 for any other failure; an HTTPException raised during the
                       process is re-raised unchanged.
    """
    try:
        provider = prip_retriever.init_prip_provider(station)
        run_search = provider.search
        checked_filters = validate(queryables)
        products = run_search(
            **checked_filters,
            items_per_page=limit,
            sort_by=validate_sort_input(sortby),
            page=page,
            **kwargs,
        )

        stac_items = create_stac_collection(
            products,
            prip_odata_to_stac_template(),
            prip_stac_mapper(),
            collection_provider,
        )

        # Attach PRIP assets/download links, contentType, rels, etc.
        with_assets = serialize_prip_asset(stac_items, products)
        return prepare_collection(with_assets)

    except CreateProviderFailed as err:
        logger.error(f"Failed to create EODAG provider!\n{traceback.format_exc()}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Bad station identifier: {err}",
        ) from err
    except requests.exceptions.ConnectionError as err:
        logger.error("Failed to connect to station!")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Station PRIP connection error: {err}",
        ) from err
    except Exception as err:  # pylint: disable=broad-exception-caught
        # Every failure is logged, including HTTPExceptions that are then forwarded as-is.
        logger.error(f"General failure! {err}")
        if not isinstance(err, HTTPException):
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail=f"General failure: {err}",
            ) from err
        raise

validate(queryables)

Function used to verify / update PRIP-specific queryables before being sent to eodag.

Source code in docs/rs-server/services/prip/rs_server_prip/api/prip_search.py
74
75
76
77
78
79
def validate(queryables: dict):
    """
    Check / normalise the PRIP-specific queryables before they are handed to eodag.

    Only "PublicationDate" is touched: when present, its value is replaced by the
    result of `validate_inputs_format`. The dict is updated in place and returned.
    """
    publication_key = "PublicationDate"
    if publication_key not in queryables:
        return queryables

    queryables[publication_key] = validate_inputs_format(queryables[publication_key])
    return queryables