CADIP

Module for interacting with the CADU system through a FastAPI APIRouter.

This module provides functionality to retrieve a list of products from the CADU system for a specified station. It includes an API endpoint, utility functions, and initialization for accessing EODataAccessGateway.

search_products(request, datetime='', station=FPath(description='CADIP station identifier (MTI, SGS, MPU, INU, etc)'), session_id='', limit=1000, sortby='-created')

Endpoint to retrieve a list of products from the CADU system for a specified station.

This function validates the input 'datetime' format, performs a search for products using the CADIP provider, writes the search results to the database, and generates a STAC Feature Collection from the products.

Parameters:

Name Type Description Default
request Request

The request object (unused).

required
datetime str

Time interval in ISO 8601 format.

''
station str

CADIP station identifier (e.g., MTI, SGS, MPU, INU).

Path(description='CADIP station identifier (MTI, SGS, MPU, INU, etc)')
session_id str

Session to which the files belong.

''
limit int

Maximum number of products to return. Defaults to 1000.

1000
sortby str

Sort by +/-fieldName (ascending/descending). Defaults to "-created".

'-created'

Returns:

Type Description
list[dict] | dict

A list of STAC Feature Collections or an error message. If no products are found in the specified time range, returns an empty list.

Raises:

Type Description
HTTPException(exceptions)

If the pagination limit is less than 1.

HTTPException(exceptions)

If there is a bad station identifier (CreateProviderFailed).

HTTPException(exceptions)

If there is a database connection error (sqlalchemy.exc.OperationalError).

HTTPException(exceptions)

If there is a connection error to the station.

HTTPException(exceptions)

If there is a general failure during the process.
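As an illustration of how a client might call this endpoint, the sketch below builds the request URL from the documented query parameters and mirrors the two input checks described above (at least one of datetime or session_id, and a limit of at least 1). The helper name build_search_url and the base URL are hypothetical and not part of rs-server; authentication (the endpoint is protected by apikey_validator) is not covered here.

```python
from urllib.parse import quote, urlencode


def build_search_url(
    base_url: str,
    station: str,
    *,
    datetime: str = "",
    session_id: str = "",
    limit: int = 1000,
    sortby: str = "-created",
) -> str:
    """Build the URL for GET /cadip/{station}/cadu/search (hypothetical client helper)."""
    # Mirror the server-side checks so obvious mistakes fail before any request is sent.
    if not (datetime or session_id):
        raise ValueError("Provide 'datetime' or 'session_id'")
    if limit < 1:
        raise ValueError("'limit' must be at least 1")

    params = {"limit": limit, "sortby": sortby}
    if datetime:
        params["datetime"] = datetime
    if session_id:
        params["session_id"] = session_id

    # urlencode percent-encodes ':' and '/' inside the ISO 8601 interval.
    path = f"/cadip/{quote(station, safe='')}/cadu/search"
    return f"{base_url.rstrip('/')}{path}?{urlencode(params)}"


url = build_search_url(
    "http://localhost:8000",  # hypothetical base URL
    "MTI",
    datetime="2024-01-01T00:00:00Z/2024-01-02T23:59:59Z",
    limit=10,
)
print(url)
```

The resulting string can be passed to any HTTP client.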

Source code in docs/rs-server/services/cadip/rs_server_cadip/api/cadip_search.py
@router.get("/cadip/{station}/cadu/search")
@apikey_validator(station="cadip", access_type="read")
def search_products(  # pylint: disable=too-many-locals, too-many-arguments
    request: Request,  # pylint: disable=unused-argument
    datetime: Annotated[str, Query(description='Time interval e.g "2024-01-01T00:00:00Z/2024-01-02T23:59:59Z"')] = "",
    station: str = FPath(description="CADIP station identifier (MTI, SGS, MPU, INU, etc)"),
    session_id: Annotated[str, Query(description="Session to which the files belong")] = "",
    limit: Annotated[int, Query(description="Maximum number of products to return")] = 1000,
    sortby: Annotated[str, Query(description="Sort by +/-fieldName (ascending/descending)")] = "-created",
) -> list[dict] | dict:
    """Endpoint to retrieve a list of products from the CADU system for a specified station.

    This function validates the input 'datetime' format, performs a search for products using the CADIP provider,
    writes the search results to the database, and generates a STAC Feature Collection from the products.

    Args:
        request (Request): The request object (unused).
        datetime (str): Time interval in ISO 8601 format.
        station (str): CADIP station identifier (e.g., MTI, SGS, MPU, INU).
        session_id (str): Session to which the files belong.
        limit (int, optional): Maximum number of products to return. Defaults to 1000.
        sortby (str, optional): Sort by +/-fieldName (ascending/descending). Defaults to "-created".

    Returns:
        list[dict] | dict: A list of STAC Feature Collections or an error message.
                           If no products are found in the specified time range, returns an empty list.

    Raises:
        HTTPException (fastapi.exceptions): If the pagination limit is less than 1.
        HTTPException (fastapi.exceptions): If there is a bad station identifier (CreateProviderFailed).
        HTTPException (fastapi.exceptions): If there is a database connection error (sqlalchemy.exc.OperationalError).
        HTTPException (fastapi.exceptions): If there is a connection error to the station.
        HTTPException (fastapi.exceptions): If there is a general failure during the process.
    """
    if not (datetime or session_id):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Missing search parameters")
    start_date, stop_date = validate_inputs_format(datetime)
    session: Union[List[str], str] = [sid.strip() for sid in session_id.split(",")] if "," in session_id else session_id
    if limit < 1:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Limit cannot be less than 1")
    # Init dataretriever / get products / return
    try:
        products = init_cadip_provider(station).search(
            TimeRange(start_date, stop_date),
            id=session,
            items_per_page=limit,
        )
        write_search_products_to_db(CadipDownloadStatus, products)
        feature_template_path = CADIP_CONFIG / "ODataToSTAC_template.json"
        stac_mapper_path = CADIP_CONFIG / "cadip_stac_mapper.json"
        with (
            open(feature_template_path, encoding="utf-8") as template,
            open(stac_mapper_path, encoding="utf-8") as stac_map,
        ):
            feature_template = json.loads(template.read())
            stac_mapper = json.loads(stac_map.read())
            cadip_item_collection = create_stac_collection(products, feature_template, stac_mapper)
        logger.info("Successfully listed and processed products from CADIP station")
        return sort_feature_collection(cadip_item_collection, sortby)

    # pylint: disable=duplicate-code
    except CreateProviderFailed as exception:
        logger.error(f"Failed to create EODAG provider!\n{traceback.format_exc()}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Bad station identifier: {exception}",
        ) from exception

    # pylint: disable=duplicate-code
    except sqlalchemy.exc.OperationalError as exception:
        logger.error("Failed to connect to database!")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Database connection error: {exception}",
        ) from exception

    except requests.exceptions.ConnectionError as exception:
        logger.error("Failed to connect to station!")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Station {station} connection error: {exception}",
        ) from exception

    except Exception as exception:  # pylint: disable=broad-exception-caught
        logger.error("General failure!")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"General failure: {exception}",
        ) from exception
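
The sortby convention used by this endpoint (+/-fieldName, ascending or descending) can be sketched as follows. This is not the actual sort_feature_collection implementation, which is not shown on this page; the function name sort_features and the assumption that the sort field lives under each feature's "properties" key are illustrative only.

```python
def sort_features(features: list[dict], sortby: str) -> list[dict]:
    """Sort STAC-like features following the +/-fieldName convention (illustrative)."""
    descending = sortby.startswith("-")  # a leading '-' means descending order
    field = sortby.lstrip("+-")  # drop the direction prefix to get the field name
    # Assumption: the sort field is stored under each feature's "properties".
    return sorted(features, key=lambda f: f["properties"][field], reverse=descending)


features = [
    {"id": "a", "properties": {"created": "2024-01-01T00:00:00Z"}},
    {"id": "b", "properties": {"created": "2024-01-03T00:00:00Z"}},
    {"id": "c", "properties": {"created": "2024-01-02T00:00:00Z"}},
]
newest_first = [f["id"] for f in sort_features(features, "-created")]
oldest_first = [f["id"] for f in sort_features(features, "+created")]
print(newest_first)  # ['b', 'c', 'a']
print(oldest_first)  # ['a', 'c', 'b']
```

Timestamps in the same ISO 8601 layout compare correctly as plain strings, which is why no date parsing is needed in this sketch.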

search_session(request, station=FPath(description='CADIP station identifier (MTI, SGS, MPU, INU, etc)'), id=None, platform=None, start_date=None, stop_date=None)

Endpoint to retrieve a list of sessions from any CADIP station.

A valid session search request must contain at least a value for either id, platform, or a time interval (start_date and stop_date correctly defined).

Parameters:

Name Type Description Default
request Request

The request object (unused).

required
station str

CADIP station identifier (e.g., MTI, SGS, MPU, INU).

Path(description='CADIP station identifier (MTI, SGS, MPU, INU, etc)')
id str

Session identifier(s), comma-separated. Defaults to None.

None
platform str

Satellite identifier(s), comma-separated. Defaults to None.

None
start_date str

Start time in ISO 8601 format. Defaults to None.

None
stop_date str

Stop time in ISO 8601 format. Defaults to None.

None

Returns:

Name Type Description
dict dict

A STAC Feature Collection of the sessions.

Raises:

Type Description
HTTPException(exceptions)

If search parameters are missing.

HTTPException(exceptions)

If there is a JSON mapping error.

HTTPException(exceptions)

If there is a value error during mapping.
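
The rule that a session search needs at least an id, a platform, or a complete time interval can be sketched in isolation. The helper below is a simplified, hypothetical stand-in: it does not validate the ISO 8601 format (the endpoint delegates that to validate_inputs_format), it raises ValueError instead of HTTPException, and it strips whitespace around every comma-separated value.

```python
from typing import Optional, Union


def normalize_session_query(
    id: Optional[str] = None,  # name mirrors the endpoint's query parameter
    platform: Optional[str] = None,
    start_date: Optional[str] = None,
    stop_date: Optional[str] = None,
) -> dict:
    """Normalize session search parameters and reject empty queries (illustrative)."""
    # A comma-separated id becomes a list; a single id stays a plain string.
    session_id: Union[list[str], str, None] = (
        [sid.strip() for sid in id.split(",")] if (id and "," in id) else id
    )
    satellite = [sat.strip() for sat in platform.split(",")] if platform else None
    # The interval only counts when both bounds are present.
    interval = (start_date, stop_date) if (start_date and stop_date) else (None, None)

    if not (session_id or satellite or all(interval)):
        raise ValueError("Missing search parameters")
    return {"id": session_id, "platform": satellite, "interval": interval}


print(normalize_session_query(platform="S1A, S1B"))
print(normalize_session_query(id="S1A_20200105072204051312"))
```

Passing only start_date (without stop_date) and nothing else is treated as an empty query and rejected.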

Source code in docs/rs-server/services/cadip/rs_server_cadip/api/cadip_search.py
@router.get("/cadip/{station}/session")
@apikey_validator(station="cadip", access_type="read")
def search_session(
    request: Request,  # pylint: disable=unused-argument
    station: str = FPath(description="CADIP station identifier (MTI, SGS, MPU, INU, etc)"),
    id: Annotated[
        Union[str, None],
        Query(
            description='Session identifier eg: "S1A_20200105072204051312" or '
            '"S1A_20200105072204051312, S1A_20220715090550123456"',
        ),
    ] = None,
    platform: Annotated[Union[str, None], Query(description='Satellite identifier eg: "S1A" or "S1A, S1B"')] = None,
    start_date: Annotated[Union[str, None], Query(description='Start time e.g. "2024-01-01T00:00:00Z"')] = None,
    stop_date: Annotated[Union[str, None], Query(description='Stop time e.g. "2024-01-01T00:00:00Z"')] = None,
):  # pylint: disable=too-many-arguments, too-many-locals
    """Endpoint to retrieve a list of sessions from any CADIP station.

    A valid session search request must contain at least a value for either *id*, *platform*, or a time interval
    (*start_date* and *stop_date* correctly defined).

    Args:
        request (Request): The request object (unused).
        station (str): CADIP station identifier (e.g., MTI, SGS, MPU, INU).
        id (str, optional): Session identifier(s), comma-separated. Defaults to None.
        platform (str, optional): Satellite identifier(s), comma-separated. Defaults to None.
        start_date (str, optional): Start time in ISO 8601 format. Defaults to None.
        stop_date (str, optional): Stop time in ISO 8601 format. Defaults to None.

    Returns:
        dict (dict): A STAC Feature Collection of the sessions.

    Raises:
        HTTPException (fastapi.exceptions): If search parameters are missing.
        HTTPException (fastapi.exceptions): If there is a JSON mapping error.
        HTTPException (fastapi.exceptions): If there is a value error during mapping.
    """
    session_id: Union[List[str], str, None] = [sid.strip() for sid in id.split(",")] if (id and "," in id) else id
    satellite: Union[List[str], None] = [sat.strip() for sat in platform.split(",")] if platform else None
    time_interval = validate_inputs_format(f"{start_date}/{stop_date}") if start_date and stop_date else (None, None)

    if not (session_id or satellite or (time_interval[0] and time_interval[1])):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Missing search parameters")

    try:
        products = init_cadip_provider(f"{station}_session").search(
            TimeRange(*time_interval),
            id=session_id,  # pylint: disable=redefined-builtin
            platform=satellite,
            sessions_search=True,
        )
        products = validate_products(products)
        sessions_products = from_session_expand_to_dag_serializer(products)
        write_search_products_to_db(CadipDownloadStatus, sessions_products)
        feature_template_path = CADIP_CONFIG / "cadip_session_ODataToSTAC_template.json"
        stac_mapper_path = CADIP_CONFIG / "cadip_sessions_stac_mapper.json"
        expanded_session_mapper_path = CADIP_CONFIG / "cadip_stac_mapper.json"
        with (
            open(feature_template_path, encoding="utf-8") as template,
            open(stac_mapper_path, encoding="utf-8") as stac_map,
            open(expanded_session_mapper_path, encoding="utf-8") as expanded_session_mapper,
        ):
            feature_template = json.loads(template.read())
            stac_mapper = json.loads(stac_map.read())
            expanded_session_mapper = json.loads(expanded_session_mapper.read())
            cadip_sessions_collection = create_stac_collection(products, feature_template, stac_mapper)
            cadip_sessions_collection = from_session_expand_to_assets_serializer(
                cadip_sessions_collection,
                sessions_products,
                expanded_session_mapper,
                request,
            )
            return cadip_sessions_collection
    # except [OSError, FileNotFoundError] as exception:
    #     return HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Error: {exception}")
    except json.JSONDecodeError as exception:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"JSON Map Error: {exception}",
        ) from exception
    except ValueError as exception:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Unable to map OData to STAC.",
        ) from exception

Module used to download CADU files from CADIP stations.

CadipDownloadResponse

Bases: BaseModel

Endpoint response

Source code in docs/rs-server/services/cadip/rs_server_cadip/api/cadip_download.py
class CadipDownloadResponse(BaseModel):
    """Endpoint response"""

    started: bool

download_products(request, name, station=FPath(description='CADIP station identifier (MTI, SGS, MPU, INU, etc)'), local=None, obs=None, db=Depends(get_db))

Initiate an asynchronous download process for a CADU product using EODAG.

This endpoint triggers the download of a CADU product identified by the given name of the file. It starts the download process in a separate thread using the start_eodag_download function and updates the product's status in the database.

Parameters:

Name Type Description Default
request Request

The request object (unused).

required
name str

CADU product name.

required
station str

CADIP station identifier (e.g., MTI, SGS, MPU, INU).

Path(description='CADIP station identifier (MTI, SGS, MPU, INU, etc)')
local str

Local download directory. Defaults to None.

None
obs str

Object storage path (e.g., "s3://bucket-name/sub/dir"). Defaults to None.

None
db Session

The database connection object.

Depends(get_db)

Returns:

Name Type Description
JSONResponse responses

A JSON response indicating whether the download process has started.

Raises:

Type Description
HTTPException

If the product is not found in the database.

HTTPException

If the download thread fails to start.
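
The start-up handshake described above, where the endpoint launches a background thread and waits a bounded time for it to signal that it is running, can be sketched with the standard library alone. The names worker, launch, and START_TIMEOUT are hypothetical; the real endpoint uses start_eodag_download and DWN_THREAD_START_TIMEOUT, whose value is not shown on this page.

```python
import threading
import time

START_TIMEOUT = 1.0  # seconds; hypothetical stand-in for DWN_THREAD_START_TIMEOUT


def worker(started: threading.Event, results: list) -> None:
    """Background job: signal start-up first, then do the slow work."""
    started.set()  # tell the caller that the thread is running
    time.sleep(0.05)  # placeholder for the actual download
    results.append("downloaded")


def launch(target=worker, timeout: float = START_TIMEOUT):
    """Start the thread and report whether it signalled start-up in time."""
    started = threading.Event()
    results: list = []
    thread = threading.Thread(target=target, args=(started, results), daemon=True)
    thread.start()
    # Event.wait returns False when the timeout expires before set() is called.
    if not started.wait(timeout=timeout):
        return {"started": False}, thread, results
    return {"started": True}, thread, results


response, thread, results = launch()
thread.join()  # only for the demo; the endpoint returns without joining
print(response, results)
```

The caller learns only that the thread started, not whether the work succeeded, which is why the outcome has to be read from the database status afterwards.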

Source code in docs/rs-server/services/cadip/rs_server_cadip/api/cadip_download.py
@router.get("/cadip/{station}/cadu", response_model=CadipDownloadResponse)
@apikey_validator(station="cadip", access_type="download")
def download_products(
    request: Request,  # pylint: disable=unused-argument
    name: Annotated[str, Query(description="CADU product name")],
    station: str = FPath(description="CADIP station identifier (MTI, SGS, MPU, INU, etc)"),
    local: Annotated[str | None, Query(description="Local download directory")] = None,
    obs: Annotated[str | None, Query(description='Object storage path e.g. "s3://bucket-name/sub/dir"')] = None,
    db: Session = Depends(get_db),
):  # pylint: disable=too-many-arguments
    """Initiate an asynchronous download process for a CADU product using EODAG.

    This endpoint triggers the download of a CADU product identified by the given
    name of the file. It starts the download process in a separate thread
    using the start_eodag_download function and updates the product's status in the database.

    Args:
        request (Request): The request object (unused).
        name (str): CADU product name.
        station (str): CADIP station identifier (e.g., MTI, SGS, MPU, INU).
        local (str, optional): Local download directory. Defaults to None.
        obs (str, optional): Object storage path (e.g., "s3://bucket-name/sub/dir"). Defaults to None.
        db (Session): The database connection object.

    Returns:
        JSONResponse (starlette.responses): A JSON response indicating whether the download process has started.

    Raises:
        HTTPException: If the product is not found in the database.
        HTTPException: If the download thread fails to start.
    """

    # Get the product download status from database
    try:
        db_product = CadipDownloadStatus.get(db, name=name)
    except Exception as exception:  # pylint: disable=broad-exception-caught
        logger.error(exception)
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"started": "false"},
        )

    # Reset status to not_started
    db_product.not_started(db)

    # start a thread to run the action in background
    logger.debug(
        "%s : %s : %s: MAIN THREAD: Starting thread, local = %s",
        os.getpid(),
        threading.get_ident(),
        datetime.now(),
        locals(),
    )

    thread_started = Event()
    # fmt: off
    # Skip this function call formatting to avoid the following error: pylint R0801: Similar lines in 2 files
    eodag_args = EoDAGDownloadHandler(
        CadipDownloadStatus, thread_started, station.lower(), str(db_product.product_id),
        name, local, obs,
    )
    # fmt: on
    # Big note / TODO here
    # Is there a mechanism to catch / capture return value from a function running inside a thread?
    # If start_eodag_download throws an error, there is no simple solution to return it with FastAPI
    thread = threading.Thread(
        target=start_eodag_download,
        args=(eodag_args,),
    )
    thread.start()

    # check the start of the thread
    if not thread_started.wait(timeout=DWN_THREAD_START_TIMEOUT):
        logger.error("Download thread did not start !")
        # Try n times to update the status to FAILED in the database
        update_db(db, db_product, EDownloadStatus.FAILED, "Download thread did not start !")
        return JSONResponse(status_code=status.HTTP_408_REQUEST_TIMEOUT, content={"started": "false"})

    return JSONResponse(status_code=status.HTTP_200_OK, content={"started": "true"})

start_eodag_download(argument)

Start the eodag download process.

This function initiates the eodag download process using the provided arguments. It sets up the temporary directory where the files are to be downloaded and gets the database handler.

Parameters:

Name Type Description Default
argument EoDAGDownloadHandler

An instance of EoDAGDownloadHandler containing the arguments used in the downloading process.

required

Source code in docs/rs-server/services/cadip/rs_server_cadip/api/cadip_download.py
def start_eodag_download(argument: EoDAGDownloadHandler):
    """Start the eodag download process.

    This function initiates the eodag download process using the provided arguments. It sets up
    the temporary directory where the files are to be downloaded and gets the database handler.

    Args:
        argument (EoDAGDownloadHandler): An instance of EoDAGDownloadHandler containing
            the arguments used in the downloading process.
    """

    # Open a database session in this thread, because the session from the root thread may have closed.
    try:
        with tempfile.TemporaryDirectory() as default_temp_path, contextmanager(get_db)() as db:
            eodag_download(
                argument,
                db,
                init_cadip_provider,
                default_path=default_temp_path,
            )
    except Exception as e:  # pylint: disable=broad-except
        logger.error(f"Exception caught: {e}")

HTTP endpoints to get the downloading status from CADIP stations.

get_download_status(request, name, db=Depends(get_db), station=FPath(description='CADIP station identifier (MTI, SGS, MPU, INU, etc)'))

Get the download status of a CADU product by its name.

This endpoint retrieves the download status of a CADU product from the database using the provided product name.

Parameters:

Name Type Description Default
request Request

The request object (unused).

required
name str

CADU product name.

required
db Session

The database connection object.

Depends(get_db)
station str

CADIP station identifier (e.g., MTI, SGS, MPU, INU).

Path(description='CADIP station identifier (MTI, SGS, MPU, INU, etc)')

Returns:

Name Type Description
ReadDownloadStatus DownloadStatus

The download status of the product.
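
Since the download runs in the background, a client would typically poll this endpoint until the product reaches a final state. The sketch below shows one way to do that under stated assumptions: the "status" field name and the "DONE" and "FAILED" values are hypothetical (the ReadDownloadStatus schema is not shown on this page), and fetch_status is an injected callable standing in for the actual HTTP request.

```python
import time
from typing import Callable


def wait_for_download(
    fetch_status: Callable[[], dict],
    terminal: frozenset = frozenset({"DONE", "FAILED"}),  # hypothetical status values
    interval: float = 1.0,
    max_attempts: int = 60,
) -> dict:
    """Poll until the reported status is terminal or the attempts run out."""
    for _ in range(max_attempts):
        body = fetch_status()
        # Assumption: the response body exposes the state under a "status" key.
        if body.get("status") in terminal:
            return body
        time.sleep(interval)
    raise TimeoutError("download did not reach a terminal status")


# Demo with canned responses instead of real HTTP calls.
replies = iter([{"status": "IN_PROGRESS"}, {"status": "DONE"}])
final = wait_for_download(lambda: next(replies), interval=0)
print(final)
```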

Source code in docs/rs-server/services/cadip/rs_server_cadip/api/cadip_status.py
@router.get("/cadip/{station}/cadu/status", response_model=ReadDownloadStatus)
@apikey_validator(station="cadip", access_type="download")
def get_download_status(
    request: Request,  # pylint: disable=unused-argument
    name: Annotated[str, Query(description="CADU product name")],
    db: Session = Depends(get_db),
    station: str = FPath(  # pylint: disable=unused-argument
        description="CADIP station identifier (MTI, SGS, MPU, INU, etc)",
    ),
):
    """
    Get the download status of a CADU product by its name.

    This endpoint retrieves the download status of a CADU product from the database
    using the provided product name.

    Args:
        request (Request): The request object (unused).
        name (str): CADU product name.
        db (Session): The database connection object.
        station (str): CADIP station identifier (e.g., MTI, SGS, MPU, INU).

    Returns:
        ReadDownloadStatus (DownloadStatus): The download status of the product.
    """

    return CadipDownloadStatus.get(name=name, db=db)