Skip to content

rs_workflows/utils/cadip.md

<< Back to index

Helper task to interact with the rs-cadip.

Search for CADIP sessions published within the last 10 hours (the time window is computed from the current UTC time).

Parameters:

Name Type Description Default
env FlowEnvArgs

Flow environment arguments (e.g., owner_id, credentials).

required
cadip_collection_identifier list[str]

CADIP collection identifier (e.g., "s1_sgs") to specify the station.

required
limit int

Maximum number of STAC items to retrieve.

10

Returns:

Name Type Description
ItemCollection ItemCollection

A pystac ItemCollection containing the sessions found.

Source code in docs/rs-client-libraries/rs_workflows/utils/cadip.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@task(name="Cadip session search")
async def cadip_session_search(
    env: FlowEnvArgs,
    cadip_collection_identifier: str | list[str],
    limit: int = 10,
) -> ItemCollection:
    """
    Search for CADIP sessions published within the last 10 hours.

    The search window is [now - 10 hours, now], computed in UTC when the task
    runs, and is matched against the "published" property of the sessions.
    Results are requested sorted by "published", most recent first.

    Parameters:
        env:
            Flow environment arguments (e.g., owner_id, credentials).
        cadip_collection_identifier:
            CADIP collection identifier (e.g., "s1_sgs") to specify the station.
            Either a single identifier or a list of identifiers is accepted;
            a list is forwarded as-is instead of being nested in another list.
        limit:
            Maximum number of STAC items to retrieve.

    Returns:
        ItemCollection:
            A pystac ItemCollection containing the sessions found.
    """
    logger = get_run_logger()

    # Normalise to a flat list of collection ids. The previous code always
    # wrapped the argument in a list, which produced a nested list whenever the
    # caller passed a list (as the type annotation suggested).
    if isinstance(cadip_collection_identifier, str):
        collections = [cadip_collection_identifier]
    else:
        collections = list(cadip_collection_identifier)

    # Initialize flow environment and telemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "cadip-search"):

        cadip_client: CadipClient = flow_env.rs_client.get_cadip_client()

        # Search window: the last 10 hours, ending now (UTC)
        end_datetime: datetime = datetime.now(timezone.utc)
        start_datetime: datetime = end_datetime - timedelta(hours=10)

        # Format timestamps in ISO 8601 with Z suffix (milliseconds forced to 000).
        # strftime always yields a non-empty string here, so no further
        # validation of the formatted values is needed.
        start_str = start_datetime.strftime("%Y-%m-%dT%H:%M:%S.000Z")
        end_str = end_datetime.strftime("%Y-%m-%dT%H:%M:%S.000Z")

        # Build CQL2 query for temporal intersection
        cadip_cql2_query = {
            "filter": {
                "op": "t_intersects",
                "args": [
                    {"property": "published"},
                    {"interval": [start_str, end_str]},
                ],
            },
            "limit": limit,
            "sortby": [{"field": "published", "direction": "desc"}],
        }

        # Log query for debugging
        logger.info(f"CQL2 query={json.dumps(cadip_cql2_query, indent=2)}")
        logger.info("Start request on CADIP station")

        # Execute search request
        return cadip_client.search(
            method="POST",
            collections=collections,
            stac_filter=cadip_cql2_query["filter"],
            max_items=cadip_cql2_query["limit"],
            sortby=cadip_cql2_query["sortby"],
        )

get_cadip_station(flow_env, session, cadip_collections) async

Retrieve a cadip station that owns the session.

Source code in docs/rs-client-libraries/rs_workflows/utils/cadip.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@task(name="Search the cadip station that owns the session")
async def get_cadip_station(flow_env: FlowEnv, session: str, cadip_collections: list[str]) -> str | None:
    """
    Retrieve a cadip station that owns the session.

    The session id is searched across the given collections. When exactly one
    item is returned, is not evicted and is published, the station name is
    taken from the last path segment of the item's "collection" link.

    Parameters:
        flow_env:
            Flow environment giving access to the cadip client.
        session:
            Identifier of the session to look for.
        cadip_collections:
            Collection identifiers of the candidate stations.

    Returns:
        The station name, or None when no usable station could be determined.
    """
    logger = get_run_logger()

    # Client used to query the stations
    cadip_client: CadipClient = flow_env.rs_client.get_cadip_client()

    # Human-readable list of the candidate stations, reused in the log messages
    stations = ", ".join(cadip_collections)
    logger.info(f"Search a cadip station between [{stations}] looking for the session'{session}'")

    # Ask for at most one item matching the session id
    matches: ItemCollection = cadip_client.search(
        method="GET",
        ids=[session],
        collections=cadip_collections,
        max_items=1,
        limit=1,
    )

    station: str | None = None
    if len(matches) == 1:
        item = matches[0]

        # An evicted session is reported as an error
        evicted, eviction_date = is_evicted(item)
        if evicted:
            logger.error(f"❌ The session '{session}' has been evicted (eviction date = {eviction_date}) ")
        elif not is_published(item):
            logger.error(f"❌ The session '{session}' has not been published yet.")
        else:
            # The station name is the last path segment of the first "collection" link
            collection_link = next((link for link in item.links if link.rel == "collection"), None)
            if collection_link is not None:
                station = collection_link.href.rstrip("/").rsplit("/", 1)[-1]
                logger.info(f"✔️ The session '{session}' is available at station {station}")

    # Every path that did not resolve a station ends with this message
    if station is None:
        logger.info(f"❌ The session '{session}' can not be found on stations [{stations}]")

    return station