class ConversionProcessor(GenericProcessor):
    """Run a legacy product (SAFE format) conversion to the new Zarr format as a Dask job via subprocess."""

    def __init__(self, db_process_manager: PostgreSQLManager, cluster_info: ClusterInfo):
        """Initialize the conversion processor."""
super().__init__(
db_process_manager=db_process_manager,
cluster_info=cluster_info,
local_mode_address="DASK_GATEWAY_L0_ADDRESS",
)

    def _check_s3_config(self):
        """Validate the S3 credentials from the environment and return the corresponding fsspec filesystem."""
try:
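            # Collect the mandatory S3 connection settings from the environment.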
s3_config = {
"key": os.environ["S3_ACCESSKEY"],
"secret": os.environ["S3_SECRETKEY"],
"client_kwargs": {
"endpoint_url": os.environ["S3_ENDPOINT"],
"region_name": os.environ["S3_REGION"],
},
}
        except (KeyError, TypeError) as e:
            raise ValueError(f"Missing SAFE S3 config parameter: {e}") from e
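        # Instantiate the filesystem and run a cheap listing so that bad credentials fail during the pre-checks.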
try:
fs = fsspec.filesystem("s3", **s3_config)
fs.ls("/") # Minimal check to force auth
return fs
        except Exception as e:
            raise ConnectionError(f"Failed to connect to the SAFE S3 endpoint: {e}") from e

    def _check_input_output_uris(self, s3_fs, data: dict):
        """Check that the input legacy product exists and that the output bucket exists."""
safe_uri = data.get("input_safe_path", "")
out_dir = data.get("output_zarr_dir_path", "").rstrip("/")
if not safe_uri.startswith("s3://"):
raise ValueError(f"Invalid input_safe_path format (must start with 's3://'): {safe_uri}")
if not out_dir.startswith("s3://"):
raise ValueError(f"Invalid output_zarr_dir_path format (must start with 's3://'): {out_dir}")
path = safe_uri.replace("s3://", "")
if not s3_fs.exists(path):
raise FileNotFoundError(f"Input SAFE path does not exist: {safe_uri}")
bucket = out_dir.replace("s3://", "").split("/", 1)[0]
if not s3_fs.exists(bucket):
raise FileNotFoundError(f"Output S3 bucket does not exist: {out_dir}")

    def _check_write_permission(self, fs, out_dir: str):
        """Check write permission on the output bucket."""
bucket = out_dir.replace("s3://", "").split("/", 1)[0]
test_key = f"{bucket}/.perm_test_{uuid.uuid4().hex}"
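        # Probe the bucket by creating and then deleting a zero-byte object under a unique key.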
try:
with fs.open(test_key, "wb"):
pass
fs.rm(test_key, recursive=False)
except Exception as e:
if "AccessDenied" in str(e) or "UnauthorizedOperation" in str(e):
raise PermissionError(f"No write permission on bucket: {out_dir}") from e
raise RuntimeError(f"Error checking write permissions: {e}") from e

    async def execute(
        self,
        data: dict,
        outputs=None,
    ) -> tuple[str, dict]:
        """
        Asynchronously execute the conversion process.

        The S3 configuration, the input/output URIs and the write permission are
        validated first; the actual run is then delegated to the parent class.
        """
try:
s3_fs = self._check_s3_config()
self._check_input_output_uris(s3_fs, data)
self._check_write_permission(s3_fs, data["output_zarr_dir_path"])
except Exception as e: # pylint: disable=broad-exception-caught
msg = str(e)
logger.error(f"Conversion failed: {msg}")
self.job_logger.log_job_execution(JobStatus.failed, None, msg)
return self.job_logger.get_execute_result()
        # Pre-checks passed, delegate the actual conversion run to the generic processor.
return await super().execute(data, outputs)

    def manage_dask_tasks(self, dask_client: Client | None, data: dict):
        """
        Schedule the SAFE to Zarr conversion on the Dask cluster using a nested subprocess task.
        """
if not dask_client:
raise RuntimeError("Dask client is undefined")
# Log start
self.job_logger.log_job_execution(JobStatus.running, 5, "Preparing conversion")
try:
# extract input parameter values
safe_uri = data.get("input_safe_path")
out_dir = data.get("output_zarr_dir_path", "").rstrip("/")
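            # Derive the Zarr store name from the SAFE product name, e.g. "PRODUCT.SAFE" -> "PRODUCT.zarr".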
basename = str(safe_uri).rsplit("/", 1)[-1].split(".", 1)[0]
zarr_uri = f"{out_dir}/{basename}.zarr"
            # Build the task configuration and submit the conversion to the cluster.
cfg = {
"safe_uri": safe_uri,
"zarr_uri": zarr_uri,
"safe_s3_config": data.get("safe_s3_config", {}),
"zarr_s3_config": data.get("zarr_s3_config", {}),
}
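            # convert_safe_to_zarr runs on a Dask worker and wraps the actual conversion subprocess.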
future = dask_client.submit(convert_safe_to_zarr, cfg)
self.job_logger.log_job_execution(JobStatus.running, 50, "Conversion job submitted to cluster")
            # Block until the conversion task completes and report its result.
res = future.result()
self.job_logger.log_job_execution(JobStatus.successful, 100, res)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Conversion failed: {e}")
self.job_logger.log_job_execution(JobStatus.failed, None, f"Conversion failed: {e}")
finally:
dask_client.close()