rs_dpr_service/processors/conversion_processor.md

Conversion Processor for converting a legacy product (SAFE format) into the new Zarr format

ConversionProcessor

Bases: GenericProcessor

Runs a legacy product (SAFE format) conversion into the new Zarr format as a Dask job via subprocess.
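
The S3 credentials used for validation are read from the environment by the _check_s3_config helper below. A minimal sketch of the variables it expects (all values are placeholders, not real credentials or endpoints):

import os

os.environ.update(
    {
        "S3_ACCESSKEY": "<access-key>",
        "S3_SECRETKEY": "<secret-key>",
        "S3_ENDPOINT": "https://s3.example.com",  # hypothetical endpoint URL
        "S3_REGION": "eu-west-1",                 # hypothetical region
    }
)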

Source code in docs/rs-dpr-service/rs_dpr_service/processors/conversion_processor.py
class ConversionProcessor(GenericProcessor):
    """Runs an legacy product (safe format) conversion into new zarr format as a Dask job via subprocess."""

    def __init__(self, db_process_manager: PostgreSQLManager, cluster_info: ClusterInfo):
        """
        Initialize Conversion Processor
        """
        super().__init__(
            db_process_manager=db_process_manager,
            cluster_info=cluster_info,
            local_mode_address="DASK_GATEWAY_L0_ADDRESS",
        )

    def _check_s3_config(self):
        """Validate the S3 bucket credentials."""
        try:
            s3_config = {
                "key": os.environ["S3_ACCESSKEY"],
                "secret": os.environ["S3_SECRETKEY"],
                "client_kwargs": {
                    "endpoint_url": os.environ["S3_ENDPOINT"],
                    "region_name": os.environ["S3_REGION"],
                },
            }
        except (KeyError, TypeError) as e:
            raise ValueError(f"Missing safe S3 config parameter: {e}") from e

        try:
            fs = fsspec.filesystem("s3", **s3_config)
            fs.ls("/")  # Minimal check to force auth
            return fs
        except Exception as e:
            raise ConnectionError(f"Failed to connect to safe S3: {e}") from e

    def _check_input_output_uris(self, s3_fs, data: dict):
        """Check that input legacy product exists and output bucket path exists."""

        safe_uri = data.get("input_safe_path", "")
        out_dir = data.get("output_zarr_dir_path", "").rstrip("/")
        if not safe_uri.startswith("s3://"):
            raise ValueError(f"Invalid input_safe_path format (must start with 's3://'): {safe_uri}")
        if not out_dir.startswith("s3://"):
            raise ValueError(f"Invalid output_zarr_dir_path format (must start with 's3://'): {out_dir}")

        path = safe_uri.replace("s3://", "")
        if not s3_fs.exists(path):
            raise FileNotFoundError(f"Input SAFE path does not exist: {safe_uri}")

        bucket = out_dir.replace("s3://", "").split("/", 1)[0]
        if not s3_fs.exists(bucket):
            raise FileNotFoundError(f"Output S3 bucket does not exist: {out_dir}")

    def _check_write_permission(self, fs, out_dir: str):
        """Check write permission on the output bucket."""
        bucket = out_dir.replace("s3://", "").split("/", 1)[0]
        test_key = f"{bucket}/.perm_test_{uuid.uuid4().hex}"
        try:
            with fs.open(test_key, "wb"):
                pass
            fs.rm(test_key, recursive=False)
        except Exception as e:
            if "AccessDenied" in str(e) or "UnauthorizedOperation" in str(e):
                raise PermissionError(f"No write permission on bucket: {out_dir}") from e
            raise RuntimeError(f"Error checking write permissions: {e}") from e

    async def execute(
        self,
        data: dict,
        outputs=None,
    ) -> tuple[str, dict]:
        """
        Asynchronously execute the conversion process.
        """
        try:
            s3_fs = self._check_s3_config()
            self._check_input_output_uris(s3_fs, data)
            self._check_write_permission(s3_fs, data["output_zarr_dir_path"])
        except Exception as e:  # pylint: disable=broad-exception-caught
            msg = str(e)
            logger.error(f"Conversion failed: {msg}")
            self.job_logger.log_job_execution(JobStatus.failed, None, msg)
            return self.job_logger.get_execute_result()

        # Start execution
        return await super().execute(data, outputs)

    def manage_dask_tasks(self, dask_client: Client | None, data: dict):
        """
        Schedule SAFE to Zarr conversion on the Dask cluster using a nested subprocess task.
        """
        if not dask_client:
            raise RuntimeError("Dask client is undefined")

        # Log start
        self.job_logger.log_job_execution(JobStatus.running, 5, "Preparing conversion")
        try:
            # extract input parameter values
            safe_uri = data.get("input_safe_path")
            out_dir = data.get("output_zarr_dir_path", "").rstrip("/")
            basename = str(safe_uri).rsplit("/", 1)[-1].split(".", 1)[0]
            zarr_uri = f"{out_dir}/{basename}.zarr"

            # submit the task
            cfg = {
                "safe_uri": safe_uri,
                "zarr_uri": zarr_uri,
                "safe_s3_config": data.get("safe_s3_config", {}),
                "zarr_s3_config": data.get("zarr_s3_config", {}),
            }
            future = dask_client.submit(convert_safe_to_zarr, cfg)
            self.job_logger.log_job_execution(JobStatus.running, 50, "Conversion job submitted to cluster")

            # wait for result
            res = future.result()
            self.job_logger.log_job_execution(JobStatus.successful, 100, res)
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.error(f"Conversion failed: {e}")
            self.job_logger.log_job_execution(JobStatus.failed, None, f"Conversion failed: {e}")
        finally:
            dask_client.close()
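
A minimal usage sketch, assuming db and cluster are an already-configured PostgreSQLManager and ClusterInfo (their construction is not shown in this module), the S3 environment variables above are set, and the bucket names are hypothetical:

import asyncio

async def main():
    # db and cluster: pre-built PostgreSQLManager / ClusterInfo instances
    processor = ConversionProcessor(db_process_manager=db, cluster_info=cluster)
    status, result = await processor.execute(
        {
            "input_safe_path": "s3://input-bucket/products/S1A_EXAMPLE.SAFE",
            "output_zarr_dir_path": "s3://output-bucket/converted/",
        }
    )
    print(status, result)

asyncio.run(main())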

__init__(db_process_manager, cluster_info)

Initialize Conversion Processor

Source code in docs/rs-dpr-service/rs_dpr_service/processors/conversion_processor.py
def __init__(self, db_process_manager: PostgreSQLManager, cluster_info: ClusterInfo):
    """
    Initialize Conversion Processor
    """
    super().__init__(
        db_process_manager=db_process_manager,
        cluster_info=cluster_info,
        local_mode_address="DASK_GATEWAY_L0_ADDRESS",
    )

execute(data, outputs=None) async

Asynchronously execute the conversion process.
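
Note that validation failures (bad URIs, missing credentials, no write permission) are caught rather than raised: the job is marked failed and the job logger's result tuple is returned, so a caller can distinguish the outcome from the return value alone. A sketch, extending the hypothetical async driver above:

# An invalid URI never reaches the Dask cluster; execute() returns the
# failed-job result instead of raising.
status, result = await processor.execute(
    {
        "input_safe_path": "not-an-s3-uri",            # fails the s3:// check
        "output_zarr_dir_path": "s3://output-bucket",  # hypothetical bucket
    }
)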

Source code in docs/rs-dpr-service/rs_dpr_service/processors/conversion_processor.py
async def execute(
    self,
    data: dict,
    outputs=None,
) -> tuple[str, dict]:
    """
    Asynchronously execute the conversion process.
    """
    try:
        s3_fs = self._check_s3_config()
        self._check_input_output_uris(s3_fs, data)
        self._check_write_permission(s3_fs, data["output_zarr_dir_path"])
    except Exception as e:  # pylint: disable=broad-exception-caught
        msg = str(e)
        logger.error(f"Conversion failed: {msg}")
        self.job_logger.log_job_execution(JobStatus.failed, None, msg)
        return self.job_logger.get_execute_result()

    # Start execution
    return await super().execute(data, outputs)

manage_dask_tasks(dask_client, data)

Schedule SAFE to Zarr conversion on the Dask cluster using a nested subprocess task.
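
The output Zarr URI is derived from the input SAFE URI by taking the last path segment and stripping its extension. A worked example of that basename logic (hypothetical product name):

safe_uri = "s3://input-bucket/products/S1A_EXAMPLE.SAFE"
out_dir = "s3://output-bucket/converted"
basename = safe_uri.rsplit("/", 1)[-1].split(".", 1)[0]  # -> "S1A_EXAMPLE"
zarr_uri = f"{out_dir}/{basename}.zarr"                  # -> "s3://output-bucket/converted/S1A_EXAMPLE.zarr"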

Source code in docs/rs-dpr-service/rs_dpr_service/processors/conversion_processor.py
def manage_dask_tasks(self, dask_client: Client | None, data: dict):
    """
    Schedule SAFE to Zarr conversion on the Dask cluster using a nested subprocess task.
    """
    if not dask_client:
        raise RuntimeError("Dask client is undefined")

    # Log start
    self.job_logger.log_job_execution(JobStatus.running, 5, "Preparing conversion")
    try:
        # extract input parameter values
        safe_uri = data.get("input_safe_path")
        out_dir = data.get("output_zarr_dir_path", "").rstrip("/")
        basename = str(safe_uri).rsplit("/", 1)[-1].split(".", 1)[0]
        zarr_uri = f"{out_dir}/{basename}.zarr"

        # submit the task
        cfg = {
            "safe_uri": safe_uri,
            "zarr_uri": zarr_uri,
            "safe_s3_config": data.get("safe_s3_config", {}),
            "zarr_s3_config": data.get("zarr_s3_config", {}),
        }
        future = dask_client.submit(convert_safe_to_zarr, cfg)
        self.job_logger.log_job_execution(JobStatus.running, 50, "Conversion job submitted to cluster")

        # wait for result
        res = future.result()
        self.job_logger.log_job_execution(JobStatus.successful, 100, res)
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.error(f"Conversion failed: {e}")
        self.job_logger.log_job_execution(JobStatus.failed, None, f"Conversion failed: {e}")
    finally:
        dask_client.close()
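
The worker-side convert_safe_to_zarr function is imported from elsewhere and not shown on this page; from the cfg dict built above, its contract can be sketched as follows (the body and the exact return value are assumptions):

def convert_safe_to_zarr(cfg: dict) -> str:
    """Hypothetical sketch of the task contract inferred from the cfg dict above."""
    safe_uri = cfg["safe_uri"]        # s3:// URI of the input SAFE product
    zarr_uri = cfg["zarr_uri"]        # s3:// URI where the Zarr store is written
    safe_s3 = cfg["safe_s3_config"]   # optional per-bucket S3 credentials
    zarr_s3 = cfg["zarr_s3_config"]
    # ... run the actual conversion in a nested subprocess here ...
    return f"Converted {safe_uri} to {zarr_uri}"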