Skip to content

rs_workflows/aux_flow.md

<< Back to index

Auxiliary product search and staging flows.

aux_staging(env, cql2_filter, catalog_collection_identifier, timeout_seconds=-1, source=AuxiliarySource.AUXIP, selected_assets=None) async

Generic flow to retrieve a list of items matching the STAC CQL2 filter given, and to stage the ones that are not already in the catalog.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
cql2_filter dict

CQL2 filter to select which files to stage

required
catalog_collection_identifier str

Catalog collection identifier where auxiliary data are staged

required
timeout_seconds int

Timeout value for the AUX search task. Optional, if no value is given the process will run until it is completed

-1
source AuxiliarySource

STAC source where auxiliary products are searched. Default: auxip.

AUXIP
selected_assets list[str] | None

Optional asset keys to stage.

None

Returns:

Name Type Description
bool bool

Return status: False if staging failed, True otherwise

ItemCollection ItemCollection | None

List of catalog Items staged from AUX station

Source code in docs/rs-client-libraries/rs_workflows/aux_flow.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
@flow(name="stage-aux")
async def aux_staging(
    env: FlowEnvArgs,
    cql2_filter: dict,
    catalog_collection_identifier: str,
    timeout_seconds: int = -1,
    source: AuxiliarySource = AuxiliarySource.AUXIP,
    selected_assets: list[str] | None = None,
) -> tuple[bool, ItemCollection | None]:
    """
    Generic flow to retrieve a list of items matching the STAC CQL2 filter given, and to stage the ones
    that are not already in the catalog.

    Args:
        env (FlowEnvArgs): Prefect flow environment
        cql2_filter (dict): CQL2 filter to select which files to stage
        catalog_collection_identifier (str): Catalog collection identifier where auxiliary data are staged
        timeout_seconds (int): Timeout value for the AUX search task.
            Optional, a negative value (the default) disables the timeout so the search runs
            until it is completed
        source (AuxiliarySource): STAC source where auxiliary products are searched. Default: auxip.
        selected_assets (list[str] | None): Optional asset keys to stage. When empty or None,
            only the "product" asset is requested for the CDSE source, and no asset
            restriction is passed for the other sources.

    Returns:
        tuple[bool, ItemCollection | None]:
            - bool: Return status: False if at least one staging job failed, True otherwise
            - ItemCollection | None: None if the search returned nothing; the searched items
              themselves if the source is the catalog (no staging needed); otherwise the
              items read back from the catalog after staging
    """
    logger = get_run_logger()
    logger.setLevel(logging.DEBUG)

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "aux-staging"):

        # Search AUX products. A negative timeout means "no timeout" for the search task.
        aux_items: ItemCollection | None = (
            search_task.with_options(timeout_seconds=timeout_seconds if timeout_seconds >= 0 else None)
            .submit(
                flow_env.serialize(),
                aux_cql2=cql2_filter,
                error_if_empty=False,
                source=source,
            )
            .result()  # type: ignore
        )

        # Stop process if search task didn't return any item
        if not aux_items or len(aux_items) == 0:
            logger.info("Nothing to stage: AUX search with given filter returned empty result.")
            return True, None

        # Catalog results are already staged items, so pass them directly to the next processing step.
        if source == AuxiliarySource.CATALOG:
            logger.info("AUX items found in catalog; skipping staging.")
            return True, aux_items

        # Stage AUX items
        asset_names = selected_assets or ({"product"} if source == AuxiliarySource.CDSE else None)
        staged = staging_task.submit(
            flow_env.serialize(),
            aux_items,
            catalog_collection_identifier,
            asset_names=asset_names,
        )

        # Wait for last task to end.
        # NOTE: use .result() and not .wait() to unwrap and propagate exceptions, if any.
        staging_results = staged.result()

        # Check that all jobs monitored were successful. Otherwise, return status is "False"
        return_status = True
        for job_name, job_result in staging_results.items():
            if job_result.get("status") == "successful":
                continue
            # Read the fields defensively: this branch is reached precisely when "status"
            # may be absent, so direct indexing would raise KeyError and hide the real failure.
            job_id = job_result.get("jobID", "<unknown>")
            job_status = job_result.get("status", "<missing>")
            job_message = job_result.get("message", "<no message>")
            logger.info(
                f"❌ Staging job '{job_name}' with ID {job_id} FAILED.\n"
                f"Status: {job_status} - Reason: {job_message}",
            )
            logger.debug({job_name: job_result})
            return_status = False

        # Get staged items from catalog (to have the correct href)
        catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()
        catalog_items = ItemCollection(
            catalog_client.get_items(
                collection_id=catalog_collection_identifier,
                items_ids=[item.id for item in aux_items],
            ),
        )

        # Create artifact if all jobs succeeded
        if return_status:
            logger.info("✅ Staging successful, creating artifact with a list of staged items.")
            artifact_key_name: str = "auxiliary-stac-item"
            md = "# Auxiliary file \n\n```json\n" + json.dumps(catalog_items.to_dict(), indent=2) + "\n```"
            await acreate_markdown_artifact(
                markdown=md,
                key=artifact_key_name,
                description="Auxiliary files added to catalog.",
            )
            logger.info(f"📌 Artifact named '{artifact_key_name}' has been linked to this flow.")

        return return_status, catalog_items

aux_staging_task(*args, **kwargs) async

See: aux_staging

Source code in docs/rs-client-libraries/rs_workflows/aux_flow.py
244
245
246
247
@task(name="stage-aux")
async def aux_staging_task(*args, **kwargs) -> tuple[bool, ItemCollection | None]:
    """
    Task variant of the AUX staging flow. See: aux_staging

    All positional and keyword arguments are forwarded unchanged to ``aux_staging.fn``
    and its (status, items) result is returned as-is.
    """
    staging_outcome = await aux_staging.fn(*args, **kwargs)
    return staging_outcome

aux_unzip_decompress_task(*args, **kwargs) async

See: aux_unzip_decompress

Source code in docs/rs-client-libraries/rs_workflows/aux_flow.py
250
251
252
253
@task(name="Aux unzip and decompress")
async def aux_unzip_decompress_task(*args, **kwargs) -> Item:
    """
    Task wrapper for unzipping/decompressing an auxiliary asset. See: asset_unzip_decompress

    All positional and keyword arguments are forwarded unchanged to
    ``asset_unzip_decompress.fn`` and the resulting item is returned as-is.
    """
    processed_item = await asset_unzip_decompress.fn(*args, **kwargs)
    return processed_item

on_demand_aux_staging(env, start_datetime, end_datetime, product_type, catalog_collection_identifier) async

Flow to retrieve AUX files using a ValCover filter over the time interval defined by start_datetime and end_datetime, select only the files of the given product_type, stage the files and add STAC items into the catalog. Information on the ValCover filter: https://pforge-exchange2.astrium.eads.net/confluence/display/COPRS/4.+External+data+selection+policies

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
start_datetime datetime | str

Start datetime for the time interval used to filter the files (select a date or directly enter a timestamp, e.g. "2025-08-07T11:51:12.509000Z")

required
end_datetime datetime | str

End datetime for the time interval used to filter the files (select a date or directly enter a timestamp, e.g. "2025-08-10T14:00:00.509000Z")

required
product_type str

Auxiliary file type wanted

required
catalog_collection_identifier str

Catalog collection identifier where auxiliary data are staged

required

Returns:

Name Type Description
bool bool

Return status: False if staging failed, True otherwise

ItemCollection ItemCollection | None

List of Items retrieved from the AUX search and staged to the catalog

Source code in docs/rs-client-libraries/rs_workflows/aux_flow.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
@flow(name="On-demand AUX staging")
async def on_demand_aux_staging(
    env: FlowEnvArgs,
    start_datetime: datetime.datetime | str,
    end_datetime: datetime.datetime | str,
    product_type: str,
    catalog_collection_identifier: str,
) -> tuple[bool, ItemCollection | None]:
    """
    Flow to retrieve AUX files using a ValCover filter over the time interval defined by
    start_datetime and end_datetime, restricted to the given product_type, then stage the
    files and add STAC items into the catalog.
    Information on the ValCover filter:
    https://pforge-exchange2.astrium.eads.net/confluence/display/COPRS/4.+External+data+selection+policies

    Args:
        env: Prefect flow environment
        start_datetime: Start datetime for the time interval used to filter the files
            (select a date or directly enter a timestamp, e.g. "2025-08-07T11:51:12.509000Z")
        end_datetime: End datetime for the time interval used to filter the files
            (select a date or directly enter a timestamp, e.g. "2025-08-10T14:00:00.509000Z")
        product_type: Auxiliary file type wanted
        catalog_collection_identifier: Catalog collection identifier where auxiliary data are staged

    Returns:
        bool: Return status: False if staging failed, True otherwise
        ItemCollection: List of Items retrieved from the AUX search and staged to the catalog
    """

    # Build the CQL2 expression (ValCover + product type), then wrap it under the "filter" key
    # expected by the generic staging flow.
    valcover_expression = create_valcover_filter(start_datetime, end_datetime, product_type)
    staging_request = {"filter": valcover_expression}

    # Delegate to the generic staging flow, keeping its default timeout, source and assets.
    staging_outcome = await aux_staging.fn(
        env=env,
        cql2_filter=staging_request,
        catalog_collection_identifier=catalog_collection_identifier,
    )
    return staging_outcome

search(env, aux_cql2, error_if_empty=False, source=AuxiliarySource.AUXIP) async

Search AUX products.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
aux_cql2 dict

AUX CQL2 filter read from the processor tasktable.

required
error_if_empty bool

Raise a ValueError if the results are empty.

False
source AuxiliarySource

STAC source where auxiliary products are searched. Default: auxip.

AUXIP
Source code in docs/rs-client-libraries/rs_workflows/aux_flow.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@flow(name="search-aux")
async def search(
    env: FlowEnvArgs,
    aux_cql2: dict,
    error_if_empty: bool = False,
    source: AuxiliarySource = AuxiliarySource.AUXIP,
) -> ItemCollection | None:
    """
    Search AUX products by delegating to the generic ``stac.search``.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        aux_cql2: AUX CQL2 filter read from the processor tasktable.
        error_if_empty: Raise a ValueError if the results are empty.
        source: STAC source where auxiliary products are searched. Default: auxip.

    Returns:
        Whatever ``stac.search`` returns for this query (annotated ItemCollection | None).
    """

    def _client_for_source(flow_env):
        # Resolved lazily by stac.search once it has a flow environment available.
        return select_search_client_and_kwargs(flow_env, source)

    start_message = f"Start AUX search from {source.value}: {aux_cql2}"
    found_items = await stac.search(
        env=env,
        cql2=aux_cql2,
        span_name="aux-search",
        stac_client_selector=_client_for_source,
        error_if_empty=error_if_empty,
        start_log_message=start_message,
    )
    return found_items

search_task(*args, **kwargs) async

See: search

Source code in docs/rs-client-libraries/rs_workflows/aux_flow.py
238
239
240
241
@task(name="search-aux")
async def search_task(*args, **kwargs) -> ItemCollection | None:
    """
    Task variant of the AUX search flow. See: search

    All positional and keyword arguments are forwarded unchanged to ``search.fn``
    and its result is returned as-is.
    """
    search_outcome = await search.fn(*args, **kwargs)
    return search_outcome

select_search_client_and_kwargs(flow_env, source)

Select the STAC client and source-specific search kwargs.

Source code in docs/rs-client-libraries/rs_workflows/aux_flow.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def select_search_client_and_kwargs(
    flow_env: FlowEnv,
    source: AuxiliarySource,
) -> tuple[StacBase, dict[str, Any]]:
    """
    Select the STAC client and source-specific search kwargs.

    Args:
        flow_env: Flow environment whose ``rs_client`` provides the per-source STAC clients.
        source: Auxiliary product source to search.

    Returns:
        A ``(client, kwargs)`` pair. The kwargs are empty for every source except the
        catalog, which gets ``{"owner_id": flow_env.owner_id}``.

    Raises:
        ValueError: If the source is LTA (not supported yet) or is not one of the handled sources.
    """
    # Only the catalog source overrides this with extra search arguments.
    extra_kwargs: dict[str, Any] = {}

    # The client is fetched only inside the matching branch, so the error paths
    # never touch flow_env.rs_client.
    if source == AuxiliarySource.AUXIP:
        stac_client = flow_env.rs_client.get_auxip_client()
    elif source == AuxiliarySource.CATALOG:
        stac_client = flow_env.rs_client.get_catalog_client()
        extra_kwargs = {"owner_id": flow_env.owner_id}
    elif source == AuxiliarySource.PRIP:
        stac_client = flow_env.rs_client.get_prip_client()
    elif source == AuxiliarySource.CADIP:
        stac_client = flow_env.rs_client.get_cadip_client()
    elif source == AuxiliarySource.CDSE:
        stac_client = flow_env.rs_client.get_cdse_client()
    elif source == AuxiliarySource.EARTHDATAHUB:
        stac_client = flow_env.rs_client.get_earthdatahub_client()
    elif source == AuxiliarySource.LTA:
        raise ValueError(
            "LTA auxiliary product search is not supported yet: rs-server does not expose an LTA STAC service "
            "endpoint and rs-client-libraries does not provide an LtaClient.",
        )
    else:
        raise ValueError(f"Unsupported auxiliary product source: {source.value!r}")

    return stac_client, extra_kwargs