Module to parse CQL2 filter expressions, adapted from pgstac.sql,
see https://github.com/stac-utils/pgstac/blob/main/src/pgstac/pgstac.sql
parse_dtrange(_indate, relative_base=None)
parse datetime range
Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 | def parse_dtrange( # noqa: C901 # pylint: disable=too-many-branches
_indate: str | dict | list,
relative_base: datetime | None = None,
) -> tuple[datetime, datetime]:
"""parse datetime range"""
if relative_base is None:
relative_base = datetime.now(timezone.utc)
if isinstance(_indate, str):
try:
_indate = json.loads(_indate)
except json.JSONDecodeError:
_indate = [_indate]
if isinstance(_indate, dict):
if "timestamp" in _indate:
timestrs = [_indate["timestamp"]]
elif "interval" in _indate:
timestrs = _indate["interval"] if isinstance(_indate["interval"], list) else [_indate["interval"]]
else:
timestrs = re.split(r"/", _indate.get("0", ""))
elif isinstance(_indate, list):
timestrs = _indate
else:
raise ValueError(f"Invalid input format: {_indate}")
if len(timestrs) == 1:
if timestrs[0].upper().startswith("P"):
delta = parse_interval(timestrs[0])
return (relative_base - delta, relative_base)
s = datetime.fromisoformat(timestrs[0])
return (s, s)
if len(timestrs) != 2:
raise ValueError(f"Timestamp cannot have more than 2 values: {timestrs}")
if timestrs[0] in ["..", ""]:
s = datetime.min
e = datetime.fromisoformat(timestrs[1])
elif timestrs[1] in ["..", ""]:
s = datetime.fromisoformat(timestrs[0])
e = datetime.max
elif timestrs[0].upper().startswith("P") and not timestrs[1].upper().startswith("P"):
e = datetime.fromisoformat(timestrs[1])
s = e - parse_interval(timestrs[0])
elif timestrs[1].upper().startswith("P") and not timestrs[0].upper().startswith("P"):
s = datetime.fromisoformat(timestrs[0])
e = s + parse_interval(timestrs[1])
else:
s = datetime.fromisoformat(timestrs[0])
e = datetime.fromisoformat(timestrs[1])
return (s, e)
|
parse_interval(interval)
parse interval
Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
98
99
100
101
102
103
104 | def parse_interval(interval: str) -> timedelta:
"""parse interval"""
match = re.match(r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?", interval.upper())
if match:
days, hours, minutes, seconds = (int(v) if v else 0 for v in match.groups())
return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
raise ValueError(f"Invalid interval format: {interval}")
|
temporal_op_query(op, args, temporal_mapping, is_cadip=False)
temporal operation query
Source code in docs/rs-server/services/common/rs_server_common/stac_cql2.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 | def temporal_op_query(op: str, args: list[dict], temporal_mapping: dict[str, str], is_cadip: bool = False) -> str:
"""temporal operation query"""
if op.lower() not in temporal_operations:
raise ValueError(f"Invalid temporal operator: {op}")
if not temporal_mapping:
raise ValueError("Undefined temporal property mapping")
props: list[dict] = args[0]["interval"] if "interval" in args[0].keys() else [args[0]]
rrange = parse_dtrange(args[1])
outq = (
temporal_operations[op.lower()]
.replace("ll", temporal_mapping[props[0]["property"]])
.replace("lh", temporal_mapping[props[1 if len(props) > 1 else 0]["property"]])
.replace("rl", strftime_millis(rrange[0]))
.replace("rh", strftime_millis(rrange[1]))
# Note: lte and gte are currently not supported in Cadip stations, so we use lt and gt instead
# Whenever this gets fixed, remove the two ifs below and the "is_cadip" input
.replace("<=", "lte" if not is_cadip else "lt")
.replace(">=", "gte" if not is_cadip else "gt")
.replace("=", "eq")
.replace("<", "lt")
.replace(">", "gt")
)
return f"({outq})"
|