Skip to content

rs_common/utils.md

<< Back to index

This module is used to share common functions between apis

AuthInfo dataclass

User authentication information in Keycloak.

Source code in docs/rs-client-libraries/rs_common/utils.py
29
30
31
32
33
34
35
36
37
38
39
40
@dataclass
class AuthInfo:
    """User authentication information in Keycloak.

    Plain data holder shared between APIs; all fields are required
    (no defaults are defined).
    """

    # User login (preferred username)
    user_login: str

    # IAM roles
    iam_roles: list[str]

    # Oauth2 attributes and/or custom `config` associated to the API key
    attributes: dict[str, Any]

create_valcover_filter(start_datetime, end_datetime, product_type)

Creates a ValCover filter from the input values to be used in flows

Parameters:

Name Type Description Default
start_datetime datetime | str

Start datetime for the time interval used to filter the files

required
end_datetime datetime | str

End datetime for the time interval used to filter the files

required
product_type str

Auxiliary file type wanted

required

Returns:

Name Type Description
dict dict

ValCover filter

Source code in docs/rs-client-libraries/rs_common/utils.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def create_valcover_filter(
    start_datetime: datetime | str,
    end_datetime: datetime | str,
    product_type: str,
) -> dict:
    """Creates a ValCover filter from the input values to be used in flows

    Args:
        start_datetime: Start datetime for the time interval used to filter the files
        end_datetime: End datetime for the time interval used to filter the files
        product_type: Auxiliary file type wanted

    Returns:
        dict: ValCover filter
    """
    # Convert datetime inputs to str
    if isinstance(start_datetime, datetime):
        start_datetime = strftime_millis(start_datetime)
    if isinstance(end_datetime, datetime):
        end_datetime = strftime_millis(end_datetime)

    # CQL2 filter: we use a filter combining a ValCover filter and a product type filter
    return {
        "op": "and",
        "args": [
            {"op": "=", "args": [{"property": "product:type"}, product_type]},
            {
                "op": "t_contains",
                "args": [
                    {"interval": [{"property": "start_datetime"}, {"property": "end_datetime"}]},
                    {"interval": [start_datetime, end_datetime]},
                ],
            },
        ],
    }

env_bool(var, default)

Return True if an environment variable is set to y, yes, t, true, on or 1 (case insensitive). Return False if set to n, no, f, false, off or 0 (case insensitive). Return the default value if not set or set to a different value.

Source code in docs/rs-client-libraries/rs_common/utils.py
67
68
69
70
71
72
73
74
75
76
77
78
def env_bool(var: str, default: bool) -> bool:
    """
    Read a boolean value from an environment variable.

    Return True if the variable is set to y, yes, t, true, on or 1
    (case insensitive). Return False if set to n, no, f, false, off or 0
    (case insensitive). Return the default value if not set or set to
    any other value.
    """
    # str(default) yields "True"/"False", which lowercases into the accepted
    # values, so an unset variable naturally resolves to the default.
    val = os.getenv(var, str(default)).lower()
    if val in ("y", "yes", "t", "true", "on", "1"):
        return True
    if val in ("n", "no", "f", "false", "off", "0"):
        return False
    return default

extract_tar(file_path, extract_to)

Extract a TAR-compatible archive into the target directory.

Source code in docs/rs-client-libraries/rs_common/utils.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def extract_tar(file_path: Path, extract_to: Path) -> int:
    """Extract a TAR-compatible archive into the target directory.

    Symlinks, hardlinks, device nodes and members whose path would escape
    the target directory are skipped with a warning.

    Returns:
        int: number of safe entries processed.
    """
    logger = get_run_logger()
    logger.info(f"Extracting TAR archive: {file_path} -> {extract_to}")

    safe_entries = 0
    with tarfile.open(file_path, "r:*") as tar:
        for member in tar.getmembers():
            # Reject link/device members, then any path escaping the target dir
            is_special = member.issym() or member.islnk() or member.isdev()
            if is_special or not _is_safe_extract_path(extract_to, member.name):
                logger.warning(f"Skipping unsafe TAR member: {member.name}")
                continue
            target = (extract_to / member.name).resolve()
            if member.isdir():
                target.mkdir(parents=True, exist_ok=True)
            else:
                stream = tar.extractfile(member)
                if stream is None:
                    # e.g. member with no associated data; nothing to write
                    continue
                target.parent.mkdir(parents=True, exist_ok=True)
                with stream, target.open("wb") as out:
                    shutil.copyfileobj(stream, out)
            safe_entries += 1
        logger.info(f"TAR archive contains {safe_entries} safe entries")
        return safe_entries

extract_zip(zip_path, extract_to)

Extract a ZIP archive into the target directory.

Source code in docs/rs-client-libraries/rs_common/utils.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def extract_zip(zip_path: Path, extract_to: Path) -> int:
    """Extract a ZIP archive into the target directory.

    Members whose path would escape the target directory are skipped
    with a warning.

    Args:
        zip_path: Path of the ZIP archive to extract.
        extract_to: Directory receiving the extracted content.

    Returns:
        int: number of safe entries processed (consistent with extract_tar).
    """
    logger = get_run_logger()
    logger.info(f"Extracting ZIP: {zip_path} -> {extract_to}")

    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        count = 0
        for name in zip_ref.namelist():
            if not _is_safe_extract_path(extract_to, name):
                logger.warning(f"Skipping unsafe ZIP member: {name}")
                continue
            # Normalize Windows-style separators before resolving the destination
            destination = (extract_to / name.replace("\\", "/")).resolve()
            if name.endswith("/"):
                destination.mkdir(parents=True, exist_ok=True)
            else:
                destination.parent.mkdir(parents=True, exist_ok=True)
                with zip_ref.open(name) as src, destination.open("wb") as dst:
                    shutil.copyfileobj(src, dst)
            count += 1
        logger.info(f"ZIP contains {count} safe entries")
        # Return the count so callers can treat ZIP and TAR extraction alike
        return count

get_href_service(rs_server_href, env_var)

Get specific href link.

Source code in docs/rs-client-libraries/rs_common/utils.py
58
59
60
61
62
63
64
def get_href_service(rs_server_href, env_var) -> str:
    """Get specific href link."""
    # An environment variable, when set and non-empty, takes precedence
    # over the given RS-Server URL.
    override = os.getenv(env_var, None)
    if override:
        return override.rstrip("/")
    if rs_server_href:
        return rs_server_href.rstrip("/")
    raise RuntimeError("RS-Server URL is undefined")

get_upload_prefix(asset_href, asset_name)

Return the S3 prefix where extracted content should be uploaded.

Source code in docs/rs-client-libraries/rs_common/utils.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
def get_upload_prefix(asset_href: str, asset_name: str) -> str:
    """Return the S3 prefix where extracted content should be uploaded."""
    # Drop the asset file name to get its parent "folder" prefix
    parent_prefix = asset_href.rsplit("/", 1)[0]

    # ZIP assets upload directly under the parent prefix
    if asset_name.endswith(".zip"):
        return f"{parent_prefix}/"

    # Otherwise, if the last segment of the parent prefix carries an archive
    # suffix, strip it so the content lands in a folder named after the archive.
    tail = parent_prefix.rsplit("/", 1)[-1]
    stripped = strip_archive_suffix(tail)
    if stripped == tail:
        # No archive suffix to strip: keep the parent prefix unchanged
        return f"{parent_prefix}/"

    head = parent_prefix.rsplit("/", 1)[0]
    return f"{head}/{stripped}/"

normalize_extract_dir(extract_dir)

Return the directory that should be used as the upload root.

If the extraction produced a single top-level directory, descend into it to avoid creating an unnecessary extra folder level in S3.

Source code in docs/rs-client-libraries/rs_common/utils.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
def normalize_extract_dir(extract_dir: Path) -> Path:
    """
    Return the directory that should be used as the upload root.

    If the extraction produced a single top-level directory, descend into it to
    avoid creating an unnecessary extra folder level in S3.
    """
    logger = get_run_logger()
    children = list(extract_dir.iterdir())

    # Only descend when there is exactly one entry and it is a directory
    if len(children) == 1 and children[0].is_dir():
        logger.info(f"Normalizing folder structure: entering {children[0]}")
        return children[0]

    if len(children) != 1:
        logger.info("No normalization needed, multiple root items found")
    else:
        logger.info("No normalization needed, single root item is not a directory")
    return extract_dir

read_response_error(response)

Read and return an HTTP response error detail.

Source code in docs/rs-client-libraries/rs_common/utils.py
43
44
45
46
47
48
49
50
51
52
53
54
55
def read_response_error(response) -> str:
    """Read and return an HTTP response error detail.

    Args:
        response: HTTP response object exposing ``.json()`` and ``.content``.

    Returns:
        str: the "detail", "description" or "error" field of the JSON body,
        the body itself if it is a plain string, or the raw decoded content
        as a fallback.
    """
    # Try to read the response detail or error
    try:
        # `payload` (not `json`) avoids shadowing the json module name
        payload = response.json()
        if isinstance(payload, str):
            return payload
        return payload.get("detail") or payload.get("description") or payload["error"]

    # If this fails (non-JSON body, missing keys, ...), get the full response content
    except Exception:  # pylint: disable=broad-exception-caught
        return response.content.decode("utf-8", errors="ignore")

recursive_extract(folder)

Extract nested TAR-compatible archives found anywhere under folder.

Source code in docs/rs-client-libraries/rs_common/utils.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def recursive_extract(folder: Path) -> int:
    """Extract nested TAR-compatible archives found anywhere under ``folder``."""
    logger = get_run_logger()
    total = 0
    progress = True

    # Keep re-scanning until a full pass extracts nothing new: an extracted
    # archive may itself contain further archives.
    while progress:
        progress = False
        for root, _, files in os.walk(folder):
            for name in files:
                if not name.endswith((".tar", ".tgz", ".tar.gz")):
                    continue
                if _extract_nested_archive(Path(root) / name):
                    progress = True
                    total += 1

    logger.info(f"Processed {total} nested archive(s)")
    return total

strftime_millis(date)

Format datetime with milliseconds precision

Source code in docs/rs-client-libraries/rs_common/utils.py
81
82
83
def strftime_millis(date: datetime) -> str:
    """Format a datetime with millisecond precision, e.g. ``2020-01-02T03:04:05.123Z``.

    NOTE: the trailing "Z" is always appended and naive datetimes are not
    converted — callers are expected to pass UTC values.
    """
    # %f gives microseconds; drop the last 3 digits to truncate to milliseconds
    return date.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

strip_archive_suffix(name)

Return the asset name without its supported archive suffix.

Source code in docs/rs-client-libraries/rs_common/utils.py
194
195
196
197
198
199
def strip_archive_suffix(name: str) -> str:
    """Return the asset name without its supported archive suffix."""
    # ".tar.gz" is tested before ".tar" so the full compound suffix is removed
    for ext in (".tar.gz", ".tgz", ".zip", ".tar"):
        if name.endswith(ext):
            return name[: -len(ext)]
    return name