Skip to content

rs_server_common/stac_cql2.md

<< Back to index

Module to parse CQL2 filter expressions, adapted from pgstac.sql, see https://github.com/stac-utils/pgstac/blob/main/src/pgstac/pgstac.sql

parse_dtrange(_indate, relative_base=None)

Parse a temporal value (a timestamp, an interval, or a duration) into a `(start, end)` datetime tuple.

Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def parse_dtrange(  # noqa: C901 # pylint: disable=too-many-branches
    _indate: str | dict | list,
    relative_base: datetime | None = None,
) -> tuple[datetime, datetime]:
    """parse datetime range"""
    if relative_base is None:
        relative_base = datetime.now(timezone.utc)

    if isinstance(_indate, str):
        try:
            _indate = json.loads(_indate)
        except json.JSONDecodeError:
            _indate = [_indate]

    if isinstance(_indate, dict):
        if "timestamp" in _indate:
            timestrs = [_indate["timestamp"]]
        elif "interval" in _indate:
            timestrs = _indate["interval"] if isinstance(_indate["interval"], list) else [_indate["interval"]]
        else:
            timestrs = re.split(r"/", _indate.get("0", ""))
    elif isinstance(_indate, list):
        timestrs = _indate
    else:
        raise ValueError(f"Invalid input format: {_indate}")

    if len(timestrs) == 1:
        if timestrs[0].upper().startswith("P"):
            delta = parse_interval(timestrs[0])
            return (relative_base - delta, relative_base)
        s = datetime.fromisoformat(timestrs[0])
        return (s, s)

    if len(timestrs) != 2:
        raise ValueError(f"Timestamp cannot have more than 2 values: {timestrs}")

    if timestrs[0] in ["..", ""]:
        s = datetime.min
        e = datetime.fromisoformat(timestrs[1])
    elif timestrs[1] in ["..", ""]:
        s = datetime.fromisoformat(timestrs[0])
        e = datetime.max
    elif timestrs[0].upper().startswith("P") and not timestrs[1].upper().startswith("P"):
        e = datetime.fromisoformat(timestrs[1])
        s = e - parse_interval(timestrs[0])
    elif timestrs[1].upper().startswith("P") and not timestrs[0].upper().startswith("P"):
        s = datetime.fromisoformat(timestrs[0])
        e = s + parse_interval(timestrs[1])
    else:
        s = datetime.fromisoformat(timestrs[0])
        e = datetime.fromisoformat(timestrs[1])

    return (s, e)

parse_interval(interval)

Parse an ISO 8601 duration string (days, hours, minutes, seconds) into a `timedelta`.

Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
 98
 99
100
101
102
103
104
def parse_interval(interval: str) -> timedelta:
    """Parse an ISO 8601 duration such as ``P1DT2H30M`` into a ``timedelta``.

    Only the day (``D``), hour (``H``), minute (``M``) and second (``S``)
    designators are supported, with integer values; matching is
    case-insensitive.

    Args:
        interval: The duration text, e.g. ``"P2D"`` or ``"PT90S"``.

    Returns:
        The equivalent ``timedelta``.

    Raises:
        ValueError: If the text is not entirely a supported duration, or
            carries no component at all (e.g. ``"P"``, ``"P1W"``, ``"P1Y"``).
    """
    # fullmatch: every sub-group is optional, so a prefix match would accept
    # any text starting with "P" and silently return a zero duration.
    match = re.fullmatch(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", interval.upper())
    if match is None or not any(match.groups()):
        raise ValueError(f"Invalid interval format: {interval}")
    days, hours, minutes, seconds = (int(v) if v else 0 for v in match.groups())
    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)

temporal_op_query(op, args, temporal_mapping, is_cadip=False)

Build the query string for a temporal operation.

Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def temporal_op_query(op: str, args: list[dict], temporal_mapping: dict[str, str], is_cadip: bool = False) -> str:
    """Build the query string for a temporal operator.

    The template registered for ``op`` in ``temporal_operations`` is filled in
    by substituting its placeholders: ``ll`` / ``lh`` (lower / upper property
    of the left operand), ``rl`` / ``rh`` (lower / upper bound of the right
    operand), and the comparison symbols, which become ``eq``, ``lt``, ``gt``,
    ``lte`` and ``gte``.

    Args:
        op: Temporal operator name, matched case-insensitively.
        args: Two operands. The first is a property reference (a dict with a
            ``"property"`` key) or a dict with an ``"interval"`` list of such
            references; the second is passed to ``parse_dtrange``.
        temporal_mapping: Maps each property name to the name used in the
            generated query.
        is_cadip: When true, ``<=`` / ``>=`` are rendered as ``lt`` / ``gt``.

    Returns:
        The filled-in expression wrapped in parentheses.

    Raises:
        ValueError: If the operator is unknown or ``temporal_mapping`` is empty.
        KeyError: If a referenced property is missing from ``temporal_mapping``.
    """
    operator = op.lower()
    if operator not in temporal_operations:
        raise ValueError(f"Invalid temporal operator: {op}")
    if not temporal_mapping:
        raise ValueError("Undefined temporal property mapping")

    props: list[dict] = args[0]["interval"] if "interval" in args[0] else [args[0]]
    rrange = parse_dtrange(args[1])
    low_prop = props[0]
    # A single property acts as both bounds of the left operand.
    high_prop = props[1] if len(props) > 1 else props[0]

    # Insertion order matters: "<=" and ">=" must be tried before "=", "<", ">".
    substitutions = {
        "ll": temporal_mapping[low_prop["property"]],
        "lh": temporal_mapping[high_prop["property"]],
        "rl": strftime_millis(rrange[0]),
        "rh": strftime_millis(rrange[1]),
        # Note: lte and gte are currently not supported in Cadip stations, so we use lt and gt instead
        # Whenever this gets fixed, remove the two conditionals below and the "is_cadip" input
        "<=": "lt" if is_cadip else "lte",
        ">=": "gt" if is_cadip else "gte",
        "=": "eq",
        "<": "lt",
        ">": "gt",
    }
    # Substitute in a single pass so that inserted text (e.g. a mapped property
    # name containing "rl" or "rh") is never rewritten by a later replacement.
    token_pattern = "|".join(re.escape(token) for token in substitutions)
    outq = re.sub(token_pattern, lambda found: substitutions[found.group(0)], temporal_operations[operator])

    return f"({outq})"