Skip to content

rs_workflows/auxip_flow.md

<< Back to index

Auxip flow implementation

auxip_staging(env, cql2_filter, catalog_collection_identifier, timeout_seconds=-1) async

Generic flow to retrieve a list of items matching the STAC CQL2 filter given, and to stage the ones that are not already in the catalog.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
stac_query dict

CQL2 filter to select which files to stage

required
catalog_collection_identifier str

Catalog collection identifier where CADIP sessions and AUX data are staged

required
timeout_seconds int

Timeout value for the Auxip search task. Optional, if no value is given the process will run until it is completed

-1

Returns:

Name Type Description
bool bool

Return status: False if staging failed, True otherwise

ItemCollection ItemCollection | None

List of catalog Items staged from Auxip station

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
@flow(name="Auxip staging")
async def auxip_staging(
    env: FlowEnvArgs,
    cql2_filter: dict,
    catalog_collection_identifier: str,
    timeout_seconds: int = -1,
) -> tuple[bool, ItemCollection | None]:
    """
    Generic flow to retrieve a list of items matching the STAC CQL2 filter given, and to stage the ones
    that are not already in the catalog.

    Args:
        env (FlowEnvArgs): Prefect flow environment
        stac_query (dict): CQL2 filter to select which files to stage
        catalog_collection_identifier (str): Catalog collection identifier where CADIP sessions and AUX data are staged
        timeout_seconds (int): Timeout value for the Auxip search task.
            Optional, if no value is given the process will run until it is completed

    Returns:
        bool: Return status: False if staging failed, True otherwise
        ItemCollection: List of catalog Items staged from Auxip station
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "auxip-staging"):

        # Search Auxip products
        auxip_items: ItemCollection | None = (
            search_task.with_options(timeout_seconds=timeout_seconds if timeout_seconds >= 0 else None)
            .submit(
                flow_env.serialize(),
                auxip_cql2=cql2_filter,
                error_if_empty=False,
            )
            .result()  # type: ignore
        )

        # Stop process if search task didn't return any item
        if not auxip_items or len(auxip_items) == 0:
            logger.info("Nothing to stage: Auxip search with given filter returned empty result.")
            return True, None

        # Stage Auxip items
        staged = staging_task.submit(
            flow_env.serialize(),
            auxip_items,
            catalog_collection_identifier,
        )

        # Wait for last task to end.
        # NOTE: use .result() and not .wait() to unwrap and propagate exceptions, if any.
        staging_results = staged.result()

        # Check that all jobs monitored were successful. Otherwise, return status is "False"
        return_status = True
        for job_name in staging_results:
            job_result = staging_results[job_name]
            if "status" not in job_result or job_result["status"] != "successful":
                logger.info(
                    f"Staging job '{job_name}' with ID {job_result['jobID']} FAILED.\n"
                    f"Status: {job_result['status']} - Reason: {job_result['message']}",
                )
                logger.debug({job_name: job_result})
                return_status = False

        # Get staged items from catalog (to have the correct href)
        catalog_client: CatalogClient = flow_env.rs_client.get_catalog_client()
        catalog_items = ItemCollection(
            catalog_client.get_items(
                collection_id=catalog_collection_identifier,
                items_ids=[item.id for item in auxip_items],
            ),
        )

        # Create artifact if all jobs succeeded
        if return_status:
            logger.info("Staging successful, creating artifact with a list of staged items.")
            await acreate_markdown_artifact(
                markdown=f"{json.dumps(catalog_items.to_dict(), indent=2)}",
                key="auxiliary-files",
                description="Auxiliary files added to catalog.",
            )

        return return_status, catalog_items

auxip_staging_task(*args, **kwargs) async

See: auxip_staging

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
209
210
211
212
@task(name="Auxip staging")
async def auxip_staging_task(*args, **kwargs) -> tuple[bool, ItemCollection | None]:
    """See: auxip_staging"""
    return await auxip_staging.fn(*args, **kwargs)

on_demand_auxip_staging(env, start_datetime, end_datetime, product_type, catalog_collection_identifier) async

Flow to retrieve Auxip files using a ValCover filter with the given time interval defined by start_datetime and end_datetime, select only the type of files wanted if eopf_type is given, stage the files and add STAC items into the catalog. Informations on ValCover filter: https://pforge-exchange2.astrium.eads.net/confluence/display/COPRS/4.+External+data+selection+policies

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment

required
start_datetime datetime | str

Start datetime for the time interval used to filter the files (select a date or directly enter a timestamp, e.g. "2025-08-07T11:51:12.509000Z")

required
end_datetime datetime | str

End datetime for the time interval used to filter the files (select a date or directly enter a timestamp, e.g. "2025-08-10T14:00:00.509000Z")

required
product_type str

Auxiliary file type wanted

required
catalog_collection_identifier str

Catalog collection identifier where CADIP sessions and AUX data are staged

required

Returns:

Name Type Description
bool bool

Return status: False if staging failed, True otherwise

ItemCollection ItemCollection | None

List of Items retrieved from the Auxip search and staged to the catalog

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@flow(name="On-demand Auxip staging")
async def on_demand_auxip_staging(
    env: FlowEnvArgs,
    start_datetime: datetime.datetime | str,
    end_datetime: datetime.datetime | str,
    product_type: str,
    catalog_collection_identifier: str,
) -> tuple[bool, ItemCollection | None]:
    """
    Flow to retrieve Auxip files using a ValCover filter with the given time interval defined by
    start_datetime and end_datetime, select only the type of files wanted if eopf_type is given, stage
    the files and add STAC items into the catalog.
    Informations on ValCover filter:
    https://pforge-exchange2.astrium.eads.net/confluence/display/COPRS/4.+External+data+selection+policies

    Args:
        env: Prefect flow environment
        start_datetime: Start datetime for the time interval used to filter the files
            (select a date or directly enter a timestamp, e.g. "2025-08-07T11:51:12.509000Z")
        end_datetime: End datetime for the time interval used to filter the files
            (select a date or directly enter a timestamp, e.g. "2025-08-10T14:00:00.509000Z")
        product_type: Auxiliary file type wanted
        catalog_collection_identifier: Catalog collection identifier where CADIP sessions and AUX data are staged

    Returns:
        bool: Return status: False if staging failed, True otherwise
        ItemCollection: List of Items retrieved from the Auxip search and staged to the catalog
    """

    # CQL2 filter: we use a filter combining a ValCover filter and a product type filter
    cql2_filter = create_valcover_filter(start_datetime, end_datetime, product_type)

    return await auxip_staging.fn(
        env=env,
        cql2_filter={"filter": cql2_filter},
        catalog_collection_identifier=catalog_collection_identifier,
    )

search(env, auxip_cql2, error_if_empty=False) async

Search Auxip products.

Parameters:

Name Type Description Default
env FlowEnvArgs

Prefect flow environment (at least the owner_id is required)

required
auxip_cql2 dict

Auxip CQL2 filter read from the processor tasktable.

required
error_if_empty bool

Raise a ValueError if the results are empty.

False
Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@flow(name="Auxip search")
async def search(
    env: FlowEnvArgs,
    auxip_cql2: dict,
    error_if_empty: bool = False,
) -> ItemCollection | None:
    """
    Search Auxip products.

    Args:
        env: Prefect flow environment (at least the owner_id is required)
        auxip_cql2: Auxip CQL2 filter read from the processor tasktable.
        error_if_empty: Raise a ValueError if the results are empty.
    """
    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "auxip-search"):

        logger.info("Start Auxip search")
        auxip_client: AuxipClient = flow_env.rs_client.get_auxip_client()
        found = auxip_client.search(
            method="POST",
            stac_filter=auxip_cql2.get("filter"),
            max_items=auxip_cql2.get("limit"),
            sortby=auxip_cql2.get("sortby"),
        )
        if (not found) and error_if_empty:
            raise ValueError(
                f"No Auxip product found for CQL2 filter: {json.dumps(auxip_cql2, indent=2)}",
            )
        logger.info(f"Auxip search found {len(found)} results: {found}")
        return found

search_task(*args, **kwargs) async

See: search

Source code in docs/rs-client-libraries/rs_workflows/auxip_flow.py
203
204
205
206
@task(name="Auxip search")
async def search_task(*args, **kwargs) -> ItemCollection | None:
    """See: search"""
    return await search.fn(*args, **kwargs)