Skip to content

rs_server_cadip/cadip_utils.md

<< Back to index

Module for interacting with CADU system through a FastAPI APIRouter.

This module provides functionality to retrieve a list of products from the CADU system for a specified station. It includes an API endpoint, utility functions, and initialization for accessing EODataAccessGateway.

cadip_map_mission(platform, constellation)

Map STAC platform and constellation into OData Satellite.

Input: platform = sentinel-1a → Output: A. Input: constellation = sentinel-1 → Output: A, B, C.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def cadip_map_mission(platform: str, constellation: str) -> str | None:
    """
    Map STAC platform and constellation into OData Satellite.

    Input: platform = sentinel-1a       Output: A
    Input: constellation = sentinel-1   Output: A, B, C

    Args:
        platform: STAC platform name; a falsy value means "no platform given".
        constellation: STAC constellation name; a falsy value means "no constellation given".

    Returns:
        The code of the platform when it has one, otherwise the comma-separated codes of all the
        satellites of the constellation (an empty string when no satellite matches), or None when
        neither argument is given.

    Raises:
        HTTPException: 422 when the platform is not found, when the mapping data lacks an expected key,
            or when the platform does not belong to the given constellation.
    """
    data: dict = map_stac_platform()
    satellite: str | None = None
    satellites: str | None = None
    try:
        if platform:
            # next() raises StopIteration when no entry declares this platform; handled below.
            config = next(sat[platform] for sat in data["satellites"] if platform in sat)
            satellite = config.get("code", None)
        if constellation:
            constellation_codes = [
                satellite_info["code"]
                for entry in data["satellites"]
                for satellite_info in entry.values()
                if satellite_info.get("constellation") == constellation
            ]
            satellites = ",".join(constellation_codes)
            # Test membership in the list of codes, not in the joined string: a substring test would
            # wrongly accept a code that is only a part of another code.
            if satellite and satellite not in constellation_codes:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
                    detail="Invalid combination of platform-constellation",
                )
    except (KeyError, IndexError, StopIteration) as exc:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
            detail="Cannot map platform/constellation",
        ) from exc
    # The single platform code takes precedence over the constellation-wide list.
    return satellite or satellites

cadip_odata_to_stac_template() cached

Used each time to read the ODataToSTAC_template json template.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
54
55
56
57
58
59
@lru_cache
def cadip_odata_to_stac_template():
    """Load the ODataToSTAC_template json template; the parsed result is cached after the first call."""
    template_path = CADIP_CONFIG / "ODataToSTAC_template.json"
    with open(template_path, encoding="utf-8") as template_file:
        # WARNING: the returned object is the cached one, a caller who wants to modify it must deepcopy it first
        return json.load(template_file)

cadip_reverse_map_mission(platform)

Function used to re-map platform and constellation based on satellite value.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
212
213
214
215
216
217
218
219
220
def cadip_reverse_map_mission(platform: str | None) -> tuple[str | None, str | None]:
    """Find the (platform name, constellation) pair whose satellite code equals the given value.

    Returns (None, None) when the value is empty or when no satellite declares this code.
    """
    if not platform:
        return None, None
    # Lazily walk every declared satellite and keep the first one with the requested code.
    candidates = (
        (name, details.get("constellation"))
        for entry in map_stac_platform()["satellites"]
        for name, details in entry.items()
        if details.get("code") == platform
    )
    return next(candidates, (None, None))

cadip_session_odata_to_stac_template() cached

Used each time to read the cadip_session_ODataToSTAC_template json template.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
62
63
64
65
66
67
@lru_cache
def cadip_session_odata_to_stac_template():
    """Load the cadip_session_ODataToSTAC_template json template; the parsed result is cached after the first call."""
    template_path = CADIP_CONFIG / "cadip_session_ODataToSTAC_template.json"
    with open(template_path, encoding="utf-8") as template_file:
        # WARNING: the returned object is the cached one, a caller who wants to modify it must deepcopy it first
        return json.load(template_file)

cadip_session_stac_mapper() cached

Used each time to read the cadip_sessions_stac_mapper json config.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
78
79
80
81
82
83
@lru_cache
def cadip_session_stac_mapper():
    """Load the cadip_sessions_stac_mapper json config; the parsed result is cached after the first call."""
    mapper_path = CADIP_CONFIG / "cadip_sessions_stac_mapper.json"
    with open(mapper_path, encoding="utf-8") as mapper_file:
        # WARNING: the returned object is the cached one, a caller who wants to modify it must deepcopy it first
        return json.load(mapper_file)

cadip_stac_mapper() cached

Used each time to read the cadip_stac_mapper json config.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
70
71
72
73
74
75
@lru_cache
def cadip_stac_mapper():
    """Load the cadip_stac_mapper json config; the parsed result is cached after the first call."""
    mapper_path = CADIP_CONFIG / "cadip_stac_mapper.json"
    with open(mapper_path, encoding="utf-8") as mapper_file:
        # WARNING: the returned object is the cached one, a caller who wants to modify it must deepcopy it first
        return json.load(mapper_file)

from_session_expand_to_assets_serializer(feature_collection, input_session, mapper)

Associate all expanded files with session from feature_collection and create a stac_pydantic.Asset for each file.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
def from_session_expand_to_assets_serializer(
    feature_collection: ItemCollection,
    input_session: list[eodag.EOProduct],
    mapper: dict,
) -> ItemCollection:
    """
    Associate all expanded files with session from feature_collection and create a stac_pydantic.Asset for each file.

    Args:
        feature_collection: collection whose features are the sessions to complete; updated in place.
        input_session: expanded file products; each one is read through its "SessionId" and "href" properties.
        mapper: key mapping forwarded as is to map_dag_file_to_asset.

    Returns:
        The same feature_collection object, each session holding one asset per file with the same session id.
    """
    sessions = feature_collection.features
    if not sessions:
        # Nothing to complete: the files are not even inspected.
        return feature_collection

    # Group the files by session id in a single pass, instead of re-scanning the whole list for every session.
    files_by_session: dict = {}
    for product in input_session:
        files_by_session.setdefault(product.properties["SessionId"], []).append(product)

    for session in sessions:
        # pop() consumes the group, so the files are attached only to the first session carrying this id.
        for product in files_by_session.pop(session.id, ()):
            # Create Asset
            asset: Asset = map_dag_file_to_asset(mapper, product, product.properties["href"])
            # Add Asset to Item, keyed by its title (empty key when the asset has no title).
            session.assets.update({asset.title or "": asset})

    return feature_collection

from_session_expand_to_dag_serializer(input_sessions)

Convert a list of sessions containing expanded files metadata into a list of files for serialization into the DB.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
128
129
130
131
132
133
134
135
136
137
138
def from_session_expand_to_dag_serializer(input_sessions: list[eodag.EOProduct]) -> list[eodag.EOProduct]:
    """
    Build, for each input entry, a new EOProduct whose properties are the entry properties passed
    through update_product, so that the expanded files can be serialized into the DB.
    """
    expanded_files: list[eodag.EOProduct] = []
    for session in input_sessions:
        file_properties = update_product(session.properties)
        expanded_files.append(
            eodag.EOProduct(
                provider="internal_session_product_file_from_cadip",
                properties=file_properties,
            ),
        )
    return expanded_files

link_assets_to_session(session_features, asset_items)

Update input session items with associated assets based on session id property.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def link_assets_to_session(session_features: list[Item], asset_items: list[dict]):
    """Attach to each session item the assets that share its session id, then refresh its time range.

    For every feature, the asset items whose "SessionId" equals the feature id are turned into "cadu"
    assets (keys translated with cadip_stac_mapper) and stored under their title. The feature
    end_datetime is then derived from the latest "PublicationDate" among these asset items.
    """
    # Validity check to be later added.
    for feature in session_features:
        session_files = [entry for entry in asset_items if feature.id == entry["SessionId"]]

        for entry in session_files:
            # Keep only the mapped fields that are actually present in this asset item.
            fields = {}
            for stac_key, odata_key in cadip_stac_mapper().items():
                if odata_key in entry:
                    fields[stac_key] = entry[odata_key]
            title = fields.pop("id")
            asset: Asset = Asset(title=title, roles=["cadu"], **fields)
            if not asset.title:
                logger.error(f"Ignored CADU asset without title: {asset}")
            else:
                feature.assets[asset.title] = asset

        try:
            props: ItemProperties = feature.properties
            start_date = props.start_datetime
            # Publication dates are parsed without their "Z" suffix, then flagged as UTC.
            publication_dates = [
                datetime.fromisoformat(item["PublicationDate"].replace("Z", "")).replace(tzinfo=timezone.utc)
                for item in session_files
            ]
            end_date = max(publication_dates, default=None)
            # https://github.com/radiantearth/stac-spec/blob/master/commons/common-metadata.md#date-and-time-range
            # start_datetime and end_datetime must be provided together, so that a user can search STAC
            # records by the provided times: either both fields are set, or none of them.
            if start_date and end_date:
                props.end_datetime = strftime_millis(end_date)  # type: ignore
            elif start_date or end_date:
                logger.warning(f"{feature.id} has only one time range property: {start_date}/{end_date}")
                props.start_datetime = None
                props.end_datetime = None
        except ValueError as e:
            # Best effort: the feature keeps its assets and the next one is processed.
            logger.warning(f"Cannot update start/end datetime for {feature.id}: {e}")

map_dag_file_to_asset(mapper, product, href)

This function is used to map extended files from odata to stac format.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
120
121
122
123
124
125
def map_dag_file_to_asset(mapper: dict, product: eodag.EOProduct, href: str) -> Asset:
    """Build a "cadu" STAC asset from an expanded file product.

    Each mapper entry (asset field -> product property name) is read from the product properties, the
    first parenthesized part of href is replaced by the product id, and the asset title is the product "Name".
    """
    properties = product.properties
    fields = {}
    for asset_key, property_name in mapper.items():
        fields[asset_key] = properties[property_name]
    file_href = re.sub(r"\([^\)]*\)", f'({properties["id"]})', href)
    # The id is only used inside the href, it is not an asset field.
    del fields["id"]
    return Asset(href=file_href, roles=["cadu"], title=properties["Name"], **fields)

prepare_collection(collection)

Used to create a more complex mapping of platform/constellation from OData to STAC.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
263
264
265
266
267
268
269
def prepare_collection(collection: ItemCollection) -> ItemCollection:
    """Replace, on every feature, the platform value by the (platform, constellation) pair it reverse-maps to."""
    for item in collection.features:
        platform, constellation = cadip_reverse_map_mission(item.properties.platform)
        item.properties.platform = platform
        item.properties.constellation = constellation
    return collection

read_conf() cached

Used each time to read RSPY_CADIP_SEARCH_CONFIG config yaml.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
45
46
47
48
49
50
51
@lru_cache
def read_conf():
    """Load the CADIP search configuration yaml; the parsed result is cached after the first call.

    The file path comes from the RSPY_CADIP_SEARCH_CONFIG environment variable, with search_yaml as fallback.
    """
    default_path = str(search_yaml.absolute())
    config_path = os.getenv("RSPY_CADIP_SEARCH_CONFIG", default_path)
    with open(config_path, encoding="utf-8") as config_file:
        # WARNING: the returned object is the cached one, a caller who wants to modify it must deepcopy it first
        return yaml.safe_load(config_file)

rename_keys(product)

Rename keys in the product dictionary to match the eodag-specific property key names (id, startTimeFromAscendingNode).

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
103
104
105
106
107
108
109
def rename_keys(product: dict) -> dict:
    """Adapt the product dictionary, in place, to the key names expected by eodag.

    "Id" is moved to "id" and "PublicationDate" is copied to "startTimeFromAscendingNode".
    The very same dictionary object is returned.
    """
    absent = object()  # private marker, so that None stays a legitimate value

    identifier = product.pop("Id", absent)
    if identifier is not absent:
        product["id"] = identifier

    publication_date = product.get("PublicationDate", absent)
    if publication_date is not absent:
        product["startTimeFromAscendingNode"] = publication_date

    return product

select_config(configuration_id)

Used to select a specific configuration from yaml file, returns None if not found.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
86
87
88
89
90
91
def select_config(configuration_id: str) -> dict | None:
    """Return the first entry of the yaml "collections" list having the given id, or None if there is none."""
    for collection in read_conf()["collections"]:
        if collection["id"] == configuration_id:
            return collection
    return None

stac_to_odata(stac_params)

Convert a parameter dictionary from STAC keys to OData keys. Return the new dictionary.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
 94
 95
 96
 97
 98
 99
100
def stac_to_odata(stac_params: dict) -> dict:
    """Build a new dictionary where the STAC keys are replaced by their OData names.

    Parameters whose value is None are dropped; keys unknown to cadip_session_stac_mapper are kept as is.
    """
    odata_params = {}
    for stac_key, value in stac_params.items():
        if value is None:
            continue
        odata_key = cadip_session_stac_mapper().get(stac_key, stac_key)
        odata_params[odata_key] = value
    return odata_params

update_product(product)

Update product with renamed keys and default geometry.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
112
113
114
115
116
117
def update_product(product: dict) -> dict:
    """Rename the product keys, merge DEFAULT_GEOM into it and put the product id inside its href.

    The first parenthesized part of "href" is replaced by "(<id>)".
    """
    renamed = rename_keys(product)
    renamed.update(DEFAULT_GEOM)
    replacement = f'({renamed["id"]})'
    renamed["href"] = re.sub(r"\([^\)]*\)", replacement, renamed["href"])
    return renamed

validate_products(products)

Function used to remove all misconfigured outputs.

Source code in docs/rs-server/services/cadip/rs_server_cadip/cadip_utils.py
163
164
165
166
167
168
169
170
171
172
173
def validate_products(products: eodag.EOProduct):
    """Return the list of products whose string conversion does not raise a MisconfiguredError.

    The rejected products are reported with a warning log and left out of the result.
    """
    accepted = []
    for candidate in products:
        try:
            # The conversion result is not used, only whether it raises matters.
            str(candidate)
        except eodag.utils.exceptions.MisconfiguredError as error:
            logger.warning(error)
        else:
            accepted.append(candidate)
    return accepted