Skip to content

rs_dpr_service/safe_to_zarr.md

<< Back to index

Convert a legacy product (safe format) product to Zarr format using EOPF.

References: - EOPF documentation: https://cpm.pages.eopf.copernicus.eu/eopf-cpm/main/index.html

Will run inside EOPF Dask cluster worker

main()

Convert from legacy product (safe format) into Zarr format using EOPF in a subprocess.

Source code in docs/rs-dpr-service/rs_dpr_service/safe_to_zarr.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def main():
    """Convert from legacy product (safe format) into Zarr format using EOPF in a subprocess."""
    if len(sys.argv) < 2:
        print("Usage: python safe_to_zarr.py '<json_string>'", file=sys.stderr)
        sys.exit(1)

    try:
        cfg = json.loads(sys.argv[1])
    except json.JSONDecodeError as e:
        print(f"Failed to decode config JSON: {e}", file=sys.stderr)
        sys.exit(1)

    # do not use dask cluster
    EOConfiguration()["store__convert__use_multithreading"] = False

    # Converting a legacy product stored in a s3 bucket (safe format) into new Zarr format
    safe_uri = cfg["safe_uri"]
    zarr_uri = cfg["zarr_uri"]
    s3_cfg = {
        "key": os.environ["S3_ACCESSKEY"],
        "secret": os.environ["S3_SECRETKEY"],
        "client_kwargs": {
            "endpoint_url": os.environ["S3_ENDPOINT"],
            "region_name": os.environ["S3_REGION"],
        },
    }
    try:
        safe = AnyPath(safe_uri, **s3_cfg)
        zarr = AnyPath(zarr_uri, **s3_cfg)
        convert(safe, zarr)

        print(
            json.dumps(
                {
                    "message": "Conversion finished",
                    "eopf_version": eopf.__version__,
                    "safe_uri": safe_uri,
                    "zarr_uri": zarr_uri,
                },
            ),
        )
    except Exception as e:  # pylint: disable=broad-exception-caught
        print(f"Conversion failed safe_to_zarr: {e}", file=sys.stderr)
        sys.exit(1)