ADGS

Module for interacting with the ADGS system through a FastAPI APIRouter.

This module provides functionality to retrieve a list of products from the ADGS stations. It includes an API endpoint, utility functions, and initialization for accessing EODataAccessGateway.

search_products(request, datetime, limit=1000, sortby='-created')

Endpoint to handle the search for products in the AUX station within a specified time interval.

This function validates the input 'datetime' format, performs a search for products using the ADGS provider, writes the search results to the database, and generates a STAC Feature Collection from the products.
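
The 'datetime' validation step can be pictured with a minimal sketch. The helper below, parse_interval, is hypothetical and is not the project's validate_inputs_format; it only assumes the "start/stop" ISO 8601 form shown in the endpoint's query description, and it assumes that timestamps without an offset are meant as UTC.

```python
from datetime import datetime, timezone


def parse_interval(interval: str) -> tuple[datetime, datetime]:
    """Split a "start/stop" ISO 8601 interval into two aware datetimes.

    Hypothetical helper for illustration; raises ValueError on bad input.
    """
    parts = interval.split("/")
    if len(parts) != 2:
        raise ValueError(f"Expected 'start/stop', got: {interval!r}")

    def parse_one(value: str) -> datetime:
        value = value.strip()
        # Older Python versions reject a trailing "Z" in fromisoformat,
        # so rewrite it as an explicit UTC offset first.
        if value.endswith("Z"):
            value = value[:-1] + "+00:00"
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is None:
            # Assumption: naive timestamps are treated as UTC
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    start, stop = parse_one(parts[0]), parse_one(parts[1])
    if start > stop:
        raise ValueError("Interval start is after interval stop")
    return start, stop


start, stop = parse_interval("2024-01-01T00:00:00Z/2024-01-02T23:59:59Z")
print(start.isoformat(), stop.isoformat())
```

Rejecting malformed input before any provider call keeps a bad query from reaching the station.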

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | Request | The request object (unused). | required |
| datetime | str | Time interval in ISO 8601 format, e.g. "2024-01-01T00:00:00Z/2024-01-02T23:59:59Z". | required |
| limit | int | Maximum number of products to return. Must be at least 1. | 1000 |
| sortby | str | Sort by +/-fieldName (ascending/descending). | '-created' |

Returns:

Type Description
list[dict] | dict

A list of STAC Feature Collections or an error message. If no products are found in the specified time range, returns an empty list.

Raises:

| Type | Description |
| --- | --- |
| HTTPException (fastapi.exceptions) | If the pagination limit is less than 1 (HTTP 422). |
| HTTPException (fastapi.exceptions) | If there is a bad station identifier, CreateProviderFailed (HTTP 400). |
| HTTPException (fastapi.exceptions) | If there is a database connection error, sqlalchemy.exc.OperationalError (HTTP 503). |
| HTTPException (fastapi.exceptions) | If there is a connection error to the station (HTTP 503). |
| HTTPException (fastapi.exceptions) | If there is a general failure during the process (HTTP 503). |
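
The sortby convention ("+field" for ascending, "-field" for descending) can be illustrated with a small sketch. sort_features is a hypothetical stand-in, not the project's sort_feature_collection, and it assumes each feature keeps its sortable fields under "properties", as STAC items do.

```python
def sort_features(feature_collection: dict, sortby: str) -> dict:
    """Return a copy of the collection with its features sorted by one property."""
    descending = sortby.startswith("-")
    field = sortby.lstrip("+-")  # a bare field name is treated as ascending
    features = sorted(
        feature_collection.get("features", []),
        key=lambda feature: feature["properties"][field],
        reverse=descending,
    )
    return {**feature_collection, "features": features}


# Made-up sample data for the demonstration
collection = {
    "type": "FeatureCollection",
    "features": [
        {"id": "a", "properties": {"created": "2024-01-01T00:00:00Z"}},
        {"id": "b", "properties": {"created": "2024-01-03T00:00:00Z"}},
        {"id": "c", "properties": {"created": "2024-01-02T00:00:00Z"}},
    ],
}
newest_first = sort_features(collection, "-created")
print([feature["id"] for feature in newest_first["features"]])  # ['b', 'c', 'a']
```

Timestamps in one uniform ISO 8601 format sort correctly as plain strings, which is why no date parsing is needed in this sketch.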

Source code in docs/rs-server/services/adgs/rs_server_adgs/api/adgs_search.py
@router.get("/adgs/aux/search")
@apikey_validator(station="adgs", access_type="read")
def search_products(  # pylint: disable=too-many-locals
    request: Request,  # pylint: disable=unused-argument
    datetime: Annotated[str, Query(description='Time interval e.g. "2024-01-01T00:00:00Z/2024-01-02T23:59:59Z"')],
    limit: Annotated[int, Query(description="Maximum number of products to return")] = 1000,
    sortby: Annotated[str, Query(description="Sort by +/-fieldName (ascending/descending)")] = "-created",
) -> list[dict] | dict:
    """Endpoint to handle the search for products in the AUX station within a specified time interval.

    This function validates the input 'datetime' format, performs a search for products using the ADGS provider,
    writes the search results to the database, and generates a STAC Feature Collection from the products.

    Args:
        request (Request): The request object (unused).
        datetime (str): Time interval in ISO 8601 format.
        limit (int, optional): Maximum number of products to return. Defaults to 1000.
        sortby (str, optional): Sort by +/-fieldName (ascending/descending). Defaults to "-created".

    Returns:
        list[dict] | dict: A list of STAC Feature Collections or an error message.
                           If no products are found in the specified time range, returns an empty list.

    Raises:
        HTTPException (fastapi.exceptions): If the pagination limit is less than 1.
        HTTPException (fastapi.exceptions): If there is a bad station identifier (CreateProviderFailed).
        HTTPException (fastapi.exceptions): If there is a database connection error (sqlalchemy.exc.OperationalError).
        HTTPException (fastapi.exceptions): If there is a connection error to the station.
        HTTPException (fastapi.exceptions): If there is a general failure during the process.
    """

    start_date, stop_date = validate_inputs_format(datetime)
    if limit < 1:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Pagination limit cannot be less than 1",
        )

    try:
        time_range = TimeRange(start_date, stop_date)
        products = init_adgs_provider("adgs").search(time_range, items_per_page=limit)
        write_search_products_to_db(AdgsDownloadStatus, products)
        feature_template_path = ADGS_CONFIG / "ODataToSTAC_template.json"
        stac_mapper_path = ADGS_CONFIG / "adgs_stac_mapper.json"
        with (
            open(feature_template_path, encoding="utf-8") as template,
            open(stac_mapper_path, encoding="utf-8") as stac_map,
        ):
            feature_template = json.loads(template.read())
            stac_mapper = json.loads(stac_map.read())
            adgs_item_collection = create_stac_collection(products, feature_template, stac_mapper)
        logger.info("Successfully listed and processed products from AUX station")
        return sort_feature_collection(adgs_item_collection, sortby)

    # pylint: disable=duplicate-code
    except CreateProviderFailed as exception:
        logger.error(f"Failed to create EODAG provider!\n{traceback.format_exc()}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Bad station identifier: {exception}",
        ) from exception

    # pylint: disable=duplicate-code
    except sqlalchemy.exc.OperationalError as exception:
        logger.error("Failed to connect to database!")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Database connection error: {exception}",
        ) from exception

    except requests.exceptions.ConnectionError as exception:
        logger.error("Failed to connect to station!")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Station ADGS connection error: {exception}",
        ) from exception

    except Exception as exception:  # pylint: disable=broad-exception-caught
        logger.error("General failure!")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"General failure: {exception}",
        ) from exception

Module used to download AUX files from ADGS station.

AdgsDownloadResponse

Bases: BaseModel

Endpoint response

Source code in docs/rs-server/services/adgs/rs_server_adgs/api/adgs_download.py
class AdgsDownloadResponse(BaseModel):
    """Endpoint response"""

    started: bool

download_products(request, name, local=None, obs=None, db=Depends(get_db))

Initiate an asynchronous download process for an ADGS product using EODAG.

This endpoint triggers the download of an ADGS product identified by the given name of the file. It starts the download process in a separate thread using the start_eodag_download function and updates the product's status in the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | Request | The request object (unused). | required |
| name | str | AUX product name. | required |
| local | str | Local download directory. | None |
| obs | str | Object storage path (e.g., "s3://bucket-name/sub/dir"). | None |
| db | Session | The database connection object. | Depends(get_db) |

Returns:

| Type | Description |
| --- | --- |
| JSONResponse (starlette.responses) | A JSON response indicating whether the download process has started. |
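
The start-up handshake between the endpoint and its background thread can be sketched with the standard library alone. The worker and the timeout value below are hypothetical; the real endpoint hands the event to EoDAGDownloadHandler and waits for DWN_THREAD_START_TIMEOUT.

```python
import threading


def worker(started: threading.Event, results: list) -> None:
    """Hypothetical background job: signal start-up, then do the work."""
    started.set()  # tell the caller that the thread is running
    results.append("downloaded")  # placeholder for the real download


def launch(timeout: float = 2.0) -> tuple[bool, list]:
    """Start the worker and report whether it signalled within the timeout."""
    started = threading.Event()
    results: list = []
    thread = threading.Thread(target=worker, args=(started, results))
    thread.start()
    # Event.wait returns True if the flag was set, False if the timeout expired
    thread_started = started.wait(timeout=timeout)
    thread.join()  # only for this demo; the endpoint returns without joining
    return thread_started, results


print(launch())  # (True, ['downloaded'])
```

Waiting on the event rather than on the thread itself lets the caller answer as soon as the work has begun, without blocking for the whole download.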

Source code in docs/rs-server/services/adgs/rs_server_adgs/api/adgs_download.py
@router.get("/adgs/aux", response_model=AdgsDownloadResponse)
@apikey_validator(station="adgs", access_type="download")
def download_products(
    request: Request,  # pylint: disable=unused-argument
    name: Annotated[str, Query(description="AUX product name")],
    local: Annotated[str | None, Query(description="Local download directory")] = None,
    obs: Annotated[str | None, Query(description='Object storage path e.g. "s3://bucket-name/sub/dir"')] = None,
    db: Session = Depends(get_db),
):
    """Initiate an asynchronous download process for an ADGS product using EODAG.

    This endpoint triggers the download of an ADGS product identified by the given
    name of the file. It starts the download process in a separate thread
    using the start_eodag_download function and updates the product's status in the database.

    Args:
        request (Request): The request object (unused).
        name (str): AUX product name.
        local (str, optional): Local download directory.
        obs (str, optional): Object storage path (e.g., "s3://bucket-name/sub/dir").
        db (Session): The database connection object.

    Returns:
        JSONResponse (starlette.responses): A JSON response indicating whether the download process has started.

    """

    try:
        db_product = AdgsDownloadStatus.get(db, name=name)
    except Exception as exception:  # pylint: disable=broad-exception-caught
        logger.error(exception)
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"started": False},
        )

    # Reset status to not_started
    db_product.not_started(db)

    # start a thread to run the action in background
    thread_started = threading.Event()
    # fmt: off
    eodag_args = EoDAGDownloadHandler(
        AdgsDownloadStatus, thread_started, "adgs", str(db_product.product_id),
        name, local, obs,
    )
    # fmt: on
    thread = threading.Thread(
        target=start_eodag_download,
        args=(eodag_args,),
    )
    thread.start()

    # check the start of the thread
    if not thread_started.wait(timeout=DWN_THREAD_START_TIMEOUT):
        logger.error("Download thread did not start !")
        # Try n times to update the status to FAILED in the database
        update_db(db, db_product, EDownloadStatus.FAILED, "Download thread did not start !")
        return JSONResponse(status_code=status.HTTP_408_REQUEST_TIMEOUT, content={"started": False})

    return JSONResponse(status_code=status.HTTP_200_OK, content={"started": True})

start_eodag_download(argument)

Start the eodag download process.

This function initiates the eodag download process using the provided arguments. It sets up the temporary directory where the files are to be downloaded and opens a database session for the download thread.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| argument | EoDAGDownloadHandler | An instance of EoDAGDownloadHandler containing the arguments used in the downloading process. | required |
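
The source shown for this function wraps get_db with contextlib.contextmanager, so that a generator-style dependency can be used in a plain with statement outside a request. A minimal sketch of that pattern follows; get_fake_db and its dictionary "session" are hypothetical stand-ins for the real dependency.

```python
import os
import tempfile
from contextlib import contextmanager

events = []


def get_fake_db():
    """Generator dependency: yield a resource, then clean it up."""
    session = {"open": True}  # hypothetical stand-in for a database session
    events.append("opened")
    try:
        yield session
    finally:
        session["open"] = False
        events.append("closed")


def run_job() -> bool:
    # Both resources are released when the block exits, even on error
    with tempfile.TemporaryDirectory() as tmp_dir, contextmanager(get_fake_db)() as db:
        events.append("working")
        return db["open"] and os.path.isdir(tmp_dir)


print(run_job(), events)  # True ['opened', 'working', 'closed']
```

Opening the session inside the thread avoids relying on the request's session, which may already be closed by the time the download runs.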

Source code in docs/rs-server/services/adgs/rs_server_adgs/api/adgs_download.py
def start_eodag_download(argument: EoDAGDownloadHandler):
    """Start the eodag download process.

    This function initiates the eodag download process using the provided arguments. It sets up
    the temporary directory where the files are to be downloaded and gets the database handler.

    Args:
        argument (EoDAGDownloadHandler): An instance of EoDAGDownloadHandler containing the arguments
            used in the downloading process.

    """
    # Open a database session in this thread, because the session from the root thread may have closed.
    try:
        with tempfile.TemporaryDirectory() as default_temp_path, contextmanager(get_db)() as db:
            eodag_download(
                argument,
                db,
                init_adgs_provider,
                default_path=default_temp_path,
            )
    except Exception as e:  # pylint: disable=broad-except
        logger.error(f"Exception caught: {e}")

HTTP endpoints to get the downloading status from ADGS stations.

get_download_status(request, name, db=Depends(get_db))

Get a product download status from its name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | Request | The request object (unused). | required |
| name | str | The name of the AUX product. | required |
| db | Session | The database connection object. | Depends(get_db) |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| ReadDownloadStatus | DownloadStatus | The download status of the specified AUX product. |

Raises:

| Type | Description |
| --- | --- |
| HTTPException | If the product is not found in the database. |
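
A client would typically call this endpoint repeatedly until the download settles. The sketch below rests on several assumptions: fetch_status is an injected callable standing in for an HTTP GET on /adgs/aux/status, and the status strings ("not_started", "in_progress", "done", "failed") are hypothetical. Only a FAILED state is confirmed by the download source shown earlier.

```python
import time
from typing import Callable


def wait_for_download(
    fetch_status: Callable[[], str],
    terminal: frozenset = frozenset({"done", "failed"}),  # hypothetical values
    interval: float = 0.0,
    max_attempts: int = 10,
) -> str:
    """Poll until a terminal status appears or the attempts run out."""
    status = "unknown"
    for _ in range(max_attempts):
        status = fetch_status()
        if status in terminal:
            break
        time.sleep(interval)  # pause between polls
    return status


# Simulated server replies, in place of real HTTP calls
replies = iter(["not_started", "in_progress", "done"])
print(wait_for_download(lambda: next(replies)))  # done
```

Capping the number of attempts keeps a stuck download from blocking the client indefinitely.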

Source code in docs/rs-server/services/adgs/rs_server_adgs/api/adgs_status.py
@router.get("/adgs/aux/status", response_model=ReadDownloadStatus)
@apikey_validator(station="adgs", access_type="download")
def get_download_status(
    request: Request,  # pylint: disable=unused-argument
    name: Annotated[str, Query(description="AUX product name")],
    db: Session = Depends(get_db),
):
    """
    Get a product download status from its name.

    Args:
        request (Request): The request object (unused).
        name (str): The name of the AUX product.
        db (Session): The database connection object.

    Returns:
        ReadDownloadStatus (DownloadStatus): The download status of the specified AUX product.

    Raises:
        HTTPException: If the product is not found in the database.
    """

    return AdgsDownloadStatus.get(name=name, db=db)