Convert a legacy product (safe format) to Zarr format using EOPF.
References:
- EOPF documentation: https://cpm.pages.eopf.copernicus.eu/eopf-cpm/main/index.html
This script runs inside an EOPF Dask cluster worker.
main()
Convert from legacy product (safe format) into Zarr format using EOPF in a subprocess.
Source code in docs/rs-dpr-service/rs_dpr_service/safe_to_zarr.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def main():
    """Convert from legacy product (safe format) into Zarr format using EOPF in a subprocess.

    The configuration is read from ``sys.argv[1]`` as a JSON string. It must
    decode to an object containing the keys ``safe_uri`` (input product) and
    ``zarr_uri`` (output location).

    S3 access settings are read from the environment variables
    ``S3_ACCESSKEY``, ``S3_SECRETKEY``, ``S3_ENDPOINT`` and ``S3_REGION``.

    On success, a JSON summary (message, EOPF version, both URIs) is printed
    to stdout. On any failure (missing argument, invalid JSON, missing config
    key, missing environment variable, or a conversion error), a one-line
    message is printed to stderr and the process exits with status 1.
    """
    if len(sys.argv) < 2:
        print("Usage: python safe_to_zarr.py '<json_string>'", file=sys.stderr)
        sys.exit(1)

    try:
        cfg = json.loads(sys.argv[1])
    except json.JSONDecodeError as e:
        print(f"Failed to decode config JSON: {e}", file=sys.stderr)
        sys.exit(1)

    # Validate every input up front so that a bad invocation produces a clear
    # one-line error instead of a KeyError/TypeError traceback.
    if not isinstance(cfg, dict):
        print(
            f"Config JSON must be an object, got {type(cfg).__name__}",
            file=sys.stderr,
        )
        sys.exit(1)

    missing_keys = [key for key in ("safe_uri", "zarr_uri") if key not in cfg]
    if missing_keys:
        print(
            f"Missing required config key(s): {', '.join(missing_keys)}",
            file=sys.stderr,
        )
        sys.exit(1)

    required_env = ("S3_ACCESSKEY", "S3_SECRETKEY", "S3_ENDPOINT", "S3_REGION")
    missing_env = [name for name in required_env if name not in os.environ]
    if missing_env:
        print(
            f"Missing required environment variable(s): {', '.join(missing_env)}",
            file=sys.stderr,
        )
        sys.exit(1)

    # Turn off the converter's multithreading option
    # (original author's note: "do not use dask cluster").
    EOConfiguration()["store__convert__use_multithreading"] = False

    # Converting a legacy product stored in a s3 bucket (safe format) into new Zarr format
    safe_uri = cfg["safe_uri"]
    zarr_uri = cfg["zarr_uri"]
    s3_cfg = {
        "key": os.environ["S3_ACCESSKEY"],
        "secret": os.environ["S3_SECRETKEY"],
        "client_kwargs": {
            "endpoint_url": os.environ["S3_ENDPOINT"],
            "region_name": os.environ["S3_REGION"],
        },
    }

    try:
        safe = AnyPath(safe_uri, **s3_cfg)
        zarr = AnyPath(zarr_uri, **s3_cfg)
        convert(safe, zarr)
        # The JSON summary on stdout is the script's success output.
        print(
            json.dumps(
                {
                    "message": "Conversion finished",
                    "eopf_version": eopf.__version__,
                    "safe_uri": safe_uri,
                    "zarr_uri": zarr_uri,
                },
            ),
        )
    except Exception as e:  # pylint: disable=broad-exception-caught
        # Top-level boundary of the subprocess: report any conversion failure
        # on stderr and signal it through the exit status.
        print(f"Conversion failed safe_to_zarr: {e}", file=sys.stderr)
        sys.exit(1)
|