Skip to content

rs_workflows/auxip_flow.md

<< Back to index

Auxip flow implementation.

auxip_staging(env, cql2_filter, catalog_collection_identifier, timeout_seconds=-1) async

Generic flow to retrieve a list of items matching the STAC CQL2 filter given, and to stage the ones that are not already in the catalog.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
cql2_filter dict

CQL2 filter to select which files to stage

required
catalog_collection_identifier str

Catalog collection identifier where CADIP sessions and AUX data are staged

required
timeout_seconds int

Timeout value for the Auxip search task. Optional, if no value is given the process will run until it is completed

-1

Returns:

Name Type Description
bool bool

Return status: False if staging failed, True otherwise

ItemCollection ItemCollection | None

List of catalog Items staged from Auxip station

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
@flow(name="Auxip staging")
async def auxip_staging(
    env: FlowEnvArgs,
    cql2_filter: dict,
    catalog_collection_identifier: str,
    timeout_seconds: int = -1,
) -> tuple[bool, ItemCollection | None]:
    """
    Generic flow to retrieve a list of items matching the STAC CQL2 filter given, and to stage the ones
    that are not already in the catalog.

    Args:
        env (FlowEnvArgs): Prefect flow environment
        cql2_filter (dict): CQL2 filter to select which files to stage
        catalog_collection_identifier (str): Catalog collection identifier where CADIP sessions and AUX data are staged
        timeout_seconds (int): Timeout value for the Auxip search task.
            Optional, if no value is given the process will run until it is completed

    Returns:
        bool: Return status: False if staging failed, True otherwise
        ItemCollection: List of catalog Items staged from Auxip station
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "auxip-staging"):

        # Search Auxip products.
        # A negative timeout_seconds means "no timeout": Prefect expects None in that case.
        auxip_items: ItemCollection | None = (
            search_task.with_options(timeout_seconds=timeout_seconds if timeout_seconds >= 0 else None)
            .submit(
                flow_env.serialize(),
                auxip_cql2=cql2_filter,
                error_if_empty=False,
            )
            .result()  # type: ignore
        )

        # Stop process if search task didn't return any item
        if not auxip_items or len(auxip_items) == 0:
            logger.info("Nothing to stage: Auxip search with given filter returned empty result.")
            return True, None

        # Stage Auxip items
        staged = staging_task.submit(
            flow_env.serialize(),
            auxip_items,
            catalog_collection_identifier,
        )

        # Wait for last task to end.
        # NOTE: use .result() and not .wait() to unwrap and propagate exceptions, if any.
        staging_results = staged.result()

        # Check that all jobs monitored were successful. Otherwise, return status is "False".
        # Use .get() with fallbacks when logging: a failed job report may be missing some
        # of these keys, and direct indexing would raise a KeyError while reporting the
        # failure (the original code crashed on job results without a "status" key).
        return_status = True
        for job_name, job_result in staging_results.items():
            if job_result.get("status") != "successful":
                logger.info(
                    f"Staging job '{job_name}' with ID {job_result.get('jobID', '<unknown>')} FAILED.\n"
                    f"Status: {job_result.get('status', '<missing>')} - "
                    f"Reason: {job_result.get('message', '<none>')}",
                )
                logger.debug({job_name: job_result})
                return_status = False

        # Get staged items from catalog (to have the correct href)
        catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()
        catalog_items = ItemCollection(
            catalog_client.get_items(
                collection_id=catalog_collection_identifier,
                items_ids=[item.id for item in auxip_items],
            ),
        )

        # Create artifact if all jobs succeeded
        if return_status:
            logger.info("Staging successful, creating artifact with a list of staged items.")
            await acreate_markdown_artifact(
                markdown=f"{json.dumps(catalog_items.to_dict(), indent=2)}",
                key="auxiliary-files",
                description="Auxiliary files added to catalog.",
            )

        return return_status, catalog_items

auxip_staging_task(*args, **kwargs) async

See: auxip_staging

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
339
340
341
342
@task(name="Auxip staging")
async def auxip_staging_task(*args, **kwargs) -> tuple[bool, ItemCollection | None]:
    """Prefect task wrapper around the flow. See: auxip_staging"""
    result = await auxip_staging.fn(*args, **kwargs)
    return result

auxip_unzip_decompress(auxip_item) async

Prefect flow used to unzip and decompress ADFS.

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
@flow(name="Auxip unzip and decompress")
async def auxip_unzip_decompress(auxip_item: Item) -> Item:
    """Prefect flow used to unzip and decompress ADFS."""
    logger = get_run_logger()
    normalized_assets = {}

    for name, asset in auxip_item.assets.items():
        if not name.endswith(archive_suffixes):
            # Not a supported archive type: keep the asset as-is.
            normalized_assets[name] = asset
            continue
        # Normalisation (unzip / decompress) replaces the href with the new s3 path,
        # so the asset name is also renamed without its archive suffix.
        asset.href = await process_asset(asset.href, name)
        normalized_assets[strip_archive_suffix(name)] = asset

    logger.info(f"Updated the following asset {normalized_assets} for item {auxip_item.id}")
    auxip_item.assets = normalized_assets
    return auxip_item

auxip_unzip_decompress_task(*args, **kwargs) async

See: auxip_unzip_decompress

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
345
346
347
348
@task(name="Auxip unzip and decompress")
async def auxip_unzip_decompress_task(*args, **kwargs) -> Item:
    """Prefect task wrapper around the flow. See: auxip_unzip_decompress"""
    item = await auxip_unzip_decompress.fn(*args, **kwargs)
    return item

on_demand_auxip_staging(env, start_datetime, end_datetime, product_type, catalog_collection_identifier) async

Flow to retrieve Auxip files using a ValCover filter with the given time interval defined by start_datetime and end_datetime, select only the files matching the given product_type, stage the files and add STAC items into the catalog. Information on ValCover filter: https://pforge-exchange2.astrium.eads.net/confluence/display/COPRS/4.+External+data+selection+policies

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
start_datetime datetime | str

Start datetime for the time interval used to filter the files (select a date or directly enter a timestamp, e.g. "2025-08-07T11:51:12.509000Z")

required
end_datetime datetime | str

End datetime for the time interval used to filter the files (select a date or directly enter a timestamp, e.g. "2025-08-10T14:00:00.509000Z")

required
product_type str

Auxiliary file type wanted

required
catalog_collection_identifier str

Catalog collection identifier where CADIP sessions and AUX data are staged

required

Returns:

Name Type Description
bool bool

Return status: False if staging failed, True otherwise

ItemCollection ItemCollection | None

List of Items retrieved from the Auxip search and staged to the catalog

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@flow(name="On-demand Auxip staging")
async def on_demand_auxip_staging(
    env: FlowEnvArgs,
    start_datetime: datetime.datetime | str,
    end_datetime: datetime.datetime | str,
    product_type: str,
    catalog_collection_identifier: str,
) -> tuple[bool, ItemCollection | None]:
    """
    Flow to retrieve Auxip files using a ValCover filter with the given time interval defined by
    start_datetime and end_datetime, select only the files matching the given product_type, stage
    the files and add STAC items into the catalog.
    Information on ValCover filter:
    https://pforge-exchange2.astrium.eads.net/confluence/display/COPRS/4.+External+data+selection+policies

    Args:
        env: Prefect flow environment
        start_datetime: Start datetime for the time interval used to filter the files
            (select a date or directly enter a timestamp, e.g. "2025-08-07T11:51:12.509000Z")
        end_datetime: End datetime for the time interval used to filter the files
            (select a date or directly enter a timestamp, e.g. "2025-08-10T14:00:00.509000Z")
        product_type: Auxiliary file type wanted
        catalog_collection_identifier: Catalog collection identifier where CADIP sessions and AUX data are staged

    Returns:
        bool: Return status: False if staging failed, True otherwise
        ItemCollection: List of Items retrieved from the Auxip search and staged to the catalog
    """

    # CQL2 filter: we use a filter combining a ValCover filter and a product type filter
    cql2_filter = create_valcover_filter(start_datetime, end_datetime, product_type)

    # Delegate to the generic staging flow by calling its underlying function (".fn") directly.
    return await auxip_staging.fn(
        env=env,
        cql2_filter={"filter": cql2_filter},
        catalog_collection_identifier=catalog_collection_identifier,
    )

process_asset(asset_href, asset_name) async

Process an archived AUXIP asset stored in S3 and replace it with its extracted content.

If the asset href points to a .zip, .tar, .tgz, or .tar.gz object in S3, the archive is downloaded to a temporary local directory and extracted. If the extracted content contains nested .tar, .tgz, or .tar.gz archives, those archives are also extracted in place.

The extracted payload is then uploaded back to the same S3 parent prefix using a folder-like target derived from the original ZIP name. In this context, "normalization" means replacing the original archive object with the extracted directory content under its corresponding S3 prefix.

Example: - input href: s3://bucket/path/some_adfs.zip - extracted content: file.xml and content.tar.gz - final S3 result: s3://bucket/path/some_adfs/ containing file.xml and the extracted content from content.tar.gz

The function returns the new S3 prefix pointing to the extracted content.

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
async def process_asset(asset_href: str, asset_name: str) -> str:
    """
    Process an archived AUXIP asset stored in S3 and replace it with its extracted content.

    If the asset href points to a `.zip`, `.tar`, `.tgz`, or `.tar.gz` object in S3,
    the archive is downloaded to a temporary local directory and extracted. If the
    extracted content contains nested `.tar`, `.tgz`, or `.tar.gz` archives, those
    archives are also extracted in place.

    The extracted payload is then uploaded back to the same S3 parent prefix using a
    folder-like target derived from the original ZIP name. In this context,
    "normalization" means replacing the original archive object with the extracted
    directory content under its corresponding S3 prefix.

    Example:
    - input href: `s3://bucket/path/some_adfs.zip`
    - extracted content: `file.xml` and `content.tar.gz`
    - final S3 result: `s3://bucket/path/some_adfs/` containing `file.xml` and the
    extracted content from `content.tar.gz`

    The function returns the new S3 prefix pointing to the extracted content.

    Raises:
        ValueError: if the asset name does not end with a supported archive suffix.
    """
    logger = get_run_logger()
    logger.info(f"Processing asset: {asset_href}")
    is_zip_asset = asset_name.endswith(".zip")
    is_tar_asset = asset_name.endswith((".tar", ".tgz", ".tar.gz"))

    if not (is_zip_asset or is_tar_asset):
        msg = f"Unsupported archive type for asset: {asset_href}"
        raise ValueError(msg)

    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_dir = Path(tmp_dir)  # type: ignore

        archive_local = tmp_dir / ("archive.zip" if is_zip_asset else Path(asset_href).name)  # type: ignore
        extract_dir = tmp_dir / "extracted"  # type: ignore
        extract_dir.mkdir()

        # 1. Download
        logger.info(f"Downloading {asset_href} -> {archive_local}")
        await s3_download_file(asset_href, archive_local)

        # 2. Extract the main archive first.
        if is_zip_asset:
            extract_zip(archive_local, extract_dir)
        else:
            extract_tar(archive_local, extract_dir)

        # 3. Some AUXIP deliveries contain nested TAR/TGZ/TAR.GZ payloads.
        nested_archives = recursive_extract(extract_dir)
        logger.info(f"Nested extraction complete, processed {nested_archives} archive(s)")

        # 4. Remove the original archive only now that extraction succeeded.
        #    (Deleting it before extraction would permanently lose the S3 object
        #    if the downloaded archive turned out to be corrupted.)
        logger.info(f"Deleting original archive from S3: {asset_href}")
        s3_delete(asset_href)

        # 5. Pick the most appropriate directory root for the upload step.
        upload_dir = normalize_extract_dir(extract_dir)
        logger.info(f"Selected upload root: {upload_dir}")

        # 6. Upload the extracted payload back to the original S3 prefix.
        prefix = get_upload_prefix(asset_href, asset_name)
        logger.info(f"Uploading to prefix: {prefix}")

        await upload_folder_flat(upload_dir, prefix)

    # Return the folder-like href that now contains the extracted content.
    return prefix

search(env, auxip_cql2, error_if_empty=False) async

Search Auxip products.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
auxip_cql2 dict

Auxip CQL2 filter read from the processor tasktable.

required
error_if_empty bool

Raise a ValueError if the results are empty.

False
Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
@flow(name="Auxip search")
async def search(
    env: FlowEnvArgs,
    auxip_cql2: dict,
    error_if_empty: bool = False,
) -> ItemCollection | None:
    """
    Search Auxip products.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        auxip_cql2: Auxip CQL2 filter read from the processor tasktable.
        error_if_empty: Raise a ValueError if the results are empty.

    Returns:
        The Auxip search results, possibly None or empty when nothing matched
        the filter and error_if_empty is False.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "auxip-search"):

        logger.info("Start Auxip search")
        auxip_client: AuxipClient = flow_env.rs_client.get_auxip_client()
        found = auxip_client.search(
            method="POST",
            stac_filter=auxip_cql2.get("filter"),
            max_items=auxip_cql2.get("limit"),
            sortby=auxip_cql2.get("sortby"),
        )
        # Empty or missing result: either fail or return early. The early return
        # avoids calling len()/to_dict() on a None result, which would raise a
        # TypeError/AttributeError in the original code when error_if_empty=False.
        if not found:
            if error_if_empty:
                raise ValueError(
                    f"No Auxip product found for CQL2 filter: {json.dumps(auxip_cql2, indent=2)}",
                )
            logger.info("Auxip search found no result.")
            return found
        logger.info(f"Auxip search found {len(found)} result(s): {found.to_dict()}")
        return found

search_task(*args, **kwargs) async

See: search

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
333
334
335
336
@task(name="Auxip search")
async def search_task(*args, **kwargs) -> ItemCollection | None:
    """Prefect task wrapper around the flow. See: search"""
    found = await search.fn(*args, **kwargs)
    return found

upload_folder_flat(local_folder, prefix) async

Upload all files under local_folder to the same S3 prefix.

The upload is intentionally flattened: only the filename is kept in the destination key, regardless of the original nested local path.

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
async def upload_folder_flat(local_folder: Path, prefix: str):
    """
    Upload every file found (recursively) under ``local_folder`` to the same S3 prefix.

    The destination key is intentionally built from the prefix plus the bare
    filename only: the nested local directory layout is flattened away.
    NOTE(review): two files with the same name in different subdirectories would
    map to the same destination key — confirm this cannot happen for AUXIP payloads.
    """
    logger = get_run_logger()
    files_to_upload = [entry for entry in local_folder.rglob("*") if entry.is_file()]

    logger.info(
        f"Preparing flat upload of {len(files_to_upload)} file(s) from {local_folder} to {prefix}",
    )

    for local_path in files_to_upload:
        destination = prefix + local_path.name
        logger.info(f"Uploading {local_path} -> {destination}")
        await s3_upload_file(local_path, destination)

    logger.info(f"Finished uploading {len(files_to_upload)} file(s) to {prefix}")