Skip to content

rs_server_common/stac_cql2.md

<< Back to index

Module to parse CQL2 filter expressions, adapted from pgstac.sql, see https://github.com/stac-utils/pgstac/blob/main/src/pgstac/pgstac.sql

extract_interval(args)

extract literal interval and its argument position.

Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def extract_interval(args):
    """extract literal interval and its argument position."""
    for index, a in enumerate(args):
        if not (isinstance(a, dict) and "interval" in a):
            continue

        interval = a["interval"]

        if isinstance(interval, list) and interval and isinstance(interval[0], str):
            return index, a

    # Handle simple timestamp literal operators such as t_before
    for index, a in enumerate(args):
        if isinstance(a, str):
            return index, a

    raise ValueError(f"No literal interval in args: {args}")

extract_properties(args)

extract property

Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
117
118
119
120
121
122
123
124
125
126
127
128
129
def extract_properties(args):
    """extract property"""
    props = [a for a in args if isinstance(a, dict) and "property" in a]

    if not props:
        for a in args:
            if isinstance(a, dict) and "interval" in a and isinstance(a["interval"], list):
                props.extend(p for p in a["interval"] if isinstance(p, dict) and "property" in p)

    if not props:
        raise ValueError(f"No temporal property in args: {args}")

    return props

parse_dtrange(_indate, relative_base=None)

parse datetime range

Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def parse_dtrange(  # noqa: C901 # pylint: disable=too-many-branches
    _indate: str | dict | list,
    relative_base: datetime | None = None,
) -> tuple[datetime, datetime]:
    """parse datetime range"""
    if relative_base is None:
        relative_base = datetime.now(timezone.utc)

    if isinstance(_indate, str):
        try:
            _indate = json.loads(_indate)
        except json.JSONDecodeError:
            _indate = [_indate]

    if isinstance(_indate, dict):
        if "timestamp" in _indate:
            timestrs = [_indate["timestamp"]]
        elif "interval" in _indate:
            timestrs = _indate["interval"] if isinstance(_indate["interval"], list) else [_indate["interval"]]
        else:
            timestrs = re.split(r"/", _indate.get("0", ""))
    elif isinstance(_indate, list):
        timestrs = _indate
    else:
        raise ValueError(f"Invalid input format: {_indate}")

    if len(timestrs) == 1:
        if timestrs[0].upper().startswith("P"):
            delta = parse_interval(timestrs[0])
            return (relative_base - delta, relative_base)
        s = datetime.fromisoformat(timestrs[0])
        return (s, s)

    if len(timestrs) != 2:
        raise ValueError(f"Timestamp cannot have more than 2 values: {timestrs}")

    if timestrs[0] in ["..", ""]:
        s = datetime.min
        e = datetime.fromisoformat(timestrs[1])
    elif timestrs[1] in ["..", ""]:
        s = datetime.fromisoformat(timestrs[0])
        e = datetime.max
    elif timestrs[0].upper().startswith("P") and not timestrs[1].upper().startswith("P"):
        e = datetime.fromisoformat(timestrs[1])
        s = e - parse_interval(timestrs[0])
    elif timestrs[1].upper().startswith("P") and not timestrs[0].upper().startswith("P"):
        s = datetime.fromisoformat(timestrs[0])
        e = s + parse_interval(timestrs[1])
    else:
        s = datetime.fromisoformat(timestrs[0])
        e = datetime.fromisoformat(timestrs[1])

    return (s, e)

parse_interval(interval)

parse interval

Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
154
155
156
157
158
159
160
def parse_interval(interval: str) -> timedelta:
    """parse interval"""
    match = re.match(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", interval.upper())
    if match:
        days, hours, minutes, seconds = (int(v) if v else 0 for v in match.groups())
        return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
    raise ValueError(f"Invalid interval format: {interval}")

swap_temporal_sides(operation)

Swap left and right temporal operands in an operation template.

Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def swap_temporal_sides(operation: str) -> str:
    """Swap left and right temporal operands in an operation template."""
    operation = (
        operation.replace("ll", "__LEFT_LOW__")
        .replace("lh", "__LEFT_HIGH__")
        .replace("rl", "ll")
        .replace("rh", "lh")
        .replace("__LEFT_LOW__", "rl")
        .replace("__LEFT_HIGH__", "rh")
    )
    for right in ("rl", "rh"):
        for left in ("ll", "lh"):
            operation = (
                operation.replace(f"{right} <= {left}", f"{left} >= {right}")
                .replace(f"{right} >= {left}", f"{left} <= {right}")
                .replace(f"{right} < {left}", f"{left} > {right}")
                .replace(f"{right} > {left}", f"{left} < {right}")
                .replace(f"{right} = {left}", f"{left} = {right}")
            )
    return operation

temporal_op_query(op, args, temporal_mapping)

temporal operation query

Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def temporal_op_query(op: str, args: list[dict], temporal_mapping: dict[str, str]) -> str:
    """temporal operation query"""
    if op.lower() not in temporal_operations:
        raise ValueError(f"Invalid temporal operator: {op}")
    if not temporal_mapping:
        raise ValueError("Undefined temporal property mapping")

    props = extract_properties(args)
    interval_position, interval = extract_interval(args)
    rrange = parse_dtrange(interval)

    operation = temporal_operations[op.lower()]
    if interval_position == 0:
        operation = swap_temporal_sides(operation)

    outq = (
        operation.replace("ll", temporal_mapping[props[0]["property"]])
        .replace("lh", temporal_mapping[props[1 if len(props) > 1 else 0]["property"]])
        .replace("rl", strftime_millis(rrange[0]))
        .replace("rh", strftime_millis(rrange[1]))
        .replace("<=", "le")
        .replace(">=", "ge")
        .replace("=", "eq")
        .replace("<", "lt")
        .replace(">", "gt")
    )

    return f"({outq})"