
rs_dpr_service/processors/generic_processor.md

S1L0 and S3L0 Processors

GenericProcessor

Bases: BaseProcessor

Common signature of a processor in DPR-service.

NOTE: a new instance of this class is called for every endpoint call.

Source code in docs/rs-dpr-service/rs_dpr_service/processors/generic_processor.py
class GenericProcessor(BaseProcessor):
    """
    Common signature of a processor in DPR-service.

    NOTE: a new instance of this class is called for every endpoint call.
    """

    def __init__(
        self,
        db_process_manager: PostgreSQLManager,
        cluster_info: ClusterInfo,
        local_mode_address: str = "",
        tasktable_module: str = "",
        tasktable_class: str = "",
    ):  # pylint: disable=super-init-not-called
        self.use_mockup = False
        self.tasktable_module = tasktable_module
        self.tasktable_class = tasktable_class
        self.job_logger = JobLogger(db_process_manager)
        self.cluster_handler = DaskClusterHandler(cluster_info, local_mode_address)

    def replace_placeholders(self, obj):
        """
        Recursively replaces placeholders in the form ${PLACEHOLDER} within a nested structure (dict, list, str)
        using corresponding environment variable values.

        If an environment variable is not found, the placeholder is left unchanged and a warning is logged.

        Args:
            obj (Any): The input object, typically a dict or list, containing strings with placeholders.

        Returns:
            Any: The same structure with all placeholders replaced where possible.
        """
        pattern = re.compile(r"\$\{(\w+)\}")

        if isinstance(obj, dict):
            return {k: self.replace_placeholders(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self.replace_placeholders(item) for item in obj]
        if isinstance(obj, str):

            def replacer(match):
                key = match.group(1)
                value = os.environ.get(key)
                if value is None:
                    logger.warning("Environment variable '%s' not found; leaving placeholder unchanged.", key)
                    return match.group(0)
                return value

            return pattern.sub(replacer, obj)
        return obj

    async def get_tasktable(self):
        """Return the EOPF tasktable for a given module and class names"""
        dask_client = self.cluster_handler.setup_dask_connection()

        try:
            # Extract span infos to send to Dask
            span_context = trace.get_current_span().get_span_context()

            dpr_processor = call_dask.ProcessorCaller(
                caller_env=dict(os.environ),
                span_context=span_context,
                cluster_address=self.cluster_handler.cluster_address,
                cluster_info=self.cluster_handler.cluster_info,
                data={},  # not used for the tasktables
                use_mockup=self.use_mockup,
            )

            # Run processor in the dask client
            task_table_task = dask_client.submit(
                dpr_processor.get_tasktable,
                module_name=self.tasktable_module,
                class_name=self.tasktable_class,
                pure=False,  # disable cache
            )
            res = task_table_task.result()

            # Return a default hardcoded value for the mockup
            if (not res) and self.use_mockup:
                with open(Path(__file__).parent.parent / "config" / "tasktable.json", encoding="utf-8") as tf:
                    return json.loads(tf.read())
            return res
        except Exception as e:  # pylint: disable=broad-exception-caught
            logger.exception(f"Submitting task to dask cluster failed. Reason: {traceback.format_exc()}")
            raise e
        finally:
            # cleanup by disconnecting the dask client
            dask_client.close()

    # Override from BaseProcessor, execute is async in RSPYProcessor
    async def execute(  # pylint: disable=invalid-overridden-method
        self,
        data: dict,
        outputs=None,
    ) -> tuple[str, dict]:
        """
        Asynchronously execute the dpr process in the dask cluster
        """

        # self.logger.debug(f"Executing staging processor for {data}")

        self.job_logger.log_job_execution(JobStatus.running, 0, "Processor execution started")
        # Start execution
        loop = asyncio.get_event_loop()
        if loop.is_running():
            # If the loop is running, schedule the async function
            asyncio.create_task(self.start_processor(data))
        else:
            # If the loop is not running, run it until complete
            loop.run_until_complete(self.start_processor(data))

        return self.job_logger.get_execute_result()

    async def start_processor(
        self,
        data: dict,
    ) -> tuple[str, dict]:
        """
        Method used to trigger the dask distributed streaming process.
        It creates the dask client object, gets the external dpr_payload sources access token,
        prepares the tasks for execution and manages eventual runtime exceptions.

        Args:
            data (dict): Input payload for the DPR processor (may contain an "experimental_config" section).

        Returns:
            tuple: tuple of MIME type and process response (dictionary containing the job ID and a
                status message).
                Example: ("application/json", {"running": <job_id>})
        """
        logger.debug("Starting main loop")

        try:
            experimental_config = ExperimentalConfig(**data.get("experimental_config", {}))

            # For testing: run the eopf-cpm scheduler on local.
            # It will then init a dask LocalCluster instance itself.
            if LOCAL_MODE and experimental_config.local_cluster.enabled:
                dask_client = None

            # Nominal case: run the eopf-cpm scheduler on a dedicated cluster pod, not locally.
            else:
                dask_client = self.cluster_handler.setup_dask_connection()
        except KeyError as ke:
            return self.job_logger.log_job_execution(
                JobStatus.failed,
                0,
                f"Failed to start the dpr-service process: No env var {ke} found",
                log_exception=True,
            )
        except RuntimeError:
            return self.job_logger.log_job_execution(
                JobStatus.failed,
                0,
                f"Failed to start the dpr-service process: {traceback.format_exc()}",
                log_exception=True,
            )

        self.job_logger.log_job_execution(JobStatus.running, 0, "Sending task to the dask cluster")

        # Manage dask tasks in a separate thread
        # starting a thread for managing the dask callbacks
        logger.debug("Starting tasks monitoring thread")
        try:
            await asyncio.to_thread(
                self.manage_dask_tasks,
                dask_client,
                data,
            )
        except Exception:  # pylint: disable=broad-exception-caught
            self.job_logger.log_job_execution(
                JobStatus.failed,
                0,
                f"Error from tasks monitoring thread: {traceback.format_exc()}",
                log_exception=True,
            )

        # cleanup by disconnecting the dask client
        if dask_client:
            dask_client.close()

        return self.job_logger.get_execute_result()

    def manage_dask_tasks(self, dask_client: Client | None, data: dict):
        """
        Manages Dask tasks where the dpr processor is started.
        """
        logger.info("Tasks monitoring started")
        res = None

        self.job_logger.log_job_execution(
            JobStatus.running,
            50,
            "In progress",
        )
        try:
            # For the mockup, replace placeholders by env vars.
            # For the real processor, it is done automatically by eopf.
            if self.use_mockup:
                data = self.replace_placeholders(data)

            # Extract span infos to send to Dask
            span_context = trace.get_current_span().get_span_context()

            dpr_processor = call_dask.ProcessorCaller(
                caller_env=dict(os.environ),
                span_context=span_context,
                cluster_address=self.cluster_handler.cluster_address,
                cluster_info=self.cluster_handler.cluster_info,
                data=data,
                use_mockup=self.use_mockup,
            )

            # Nominal usecase: run processor in the dask client
            if dask_client:
                dpr_task = dask_client.submit(
                    dpr_processor.run_processor,
                    pure=False,  # disable cache
                )

            # Specific case for local debugging
            else:
                dpr_task = None
                res = dpr_processor.run_processor()

        except Exception:  # pylint: disable=broad-exception-caught
            if not dask_client:
                logger.exception(traceback.format_exc())
                raise
            self.job_logger.log_job_execution(
                JobStatus.failed,
                None,
                f"Submitting task to dask cluster failed. Reason: {traceback.format_exc()}",
                log_exception=True,
            )
            return

        try:
            if dpr_task:
                res = dpr_task.result()  # This will raise the exception from the task if it failed
                logger.info("%s Task streaming completed", dpr_task.key)

        except Exception:  # pylint: disable=broad-exception-caught
            # Update status for the job
            self.job_logger.log_job_execution(
                JobStatus.failed,
                None,
                f"The dpr processing task failed: {traceback.format_exc()}",
                log_exception=True,
            )
            return

        # Update status and insert the result of the dask task in the jobs table
        self.job_logger.log_job_execution(JobStatus.successful, 100, str(res))
        # write the results in a s3 bucket file

        # Update the subscribers for token refreshment
        logger.info("Tasks monitoring finished")

execute(data, outputs=None) async

Asynchronously execute the dpr process in the dask cluster

Source code in docs/rs-dpr-service/rs_dpr_service/processors/generic_processor.py
async def execute(  # pylint: disable=invalid-overridden-method
    self,
    data: dict,
    outputs=None,
) -> tuple[str, dict]:
    """
    Asynchronously execute the dpr process in the dask cluster
    """

    # self.logger.debug(f"Executing staging processor for {data}")

    self.job_logger.log_job_execution(JobStatus.running, 0, "Processor execution started")
    # Start execution
    loop = asyncio.get_event_loop()
    if loop.is_running():
        # If the loop is running, schedule the async function
        asyncio.create_task(self.start_processor(data))
    else:
        # If the loop is not running, run it until complete
        loop.run_until_complete(self.start_processor(data))

    return self.job_logger.get_execute_result()
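
The branch on `loop.is_running()` is what makes `execute` fire-and-forget in the nominal endpoint case: the coroutine is scheduled with `asyncio.create_task` and the method returns the job logger's current result immediately, before the processor finishes. Below is a minimal, self-contained sketch of that scheduling pattern, with a dummy coroutine standing in for `start_processor` (standard library only, no rs_dpr_service imports):

```python
import asyncio


async def start_processor(data: dict) -> None:
    """Dummy stand-in for start_processor(): pretend to do the real work."""
    await asyncio.sleep(0.2)
    print("background job done for", data)


async def execute(data: dict) -> tuple[str, dict]:
    """Same scheduling pattern as above: fire-and-forget inside a running loop."""
    loop = asyncio.get_event_loop()
    if loop.is_running():
        # Nominal endpoint case: schedule the work and return immediately.
        asyncio.create_task(start_processor(data))
    else:
        # No running loop (e.g. direct synchronous call): block until done.
        loop.run_until_complete(start_processor(data))
    return "application/json", {"running": "<job_id>"}  # placeholder job id


async def main() -> None:
    mime, status = await execute({"experimental_config": {}})
    print(mime, status)       # printed before "background job done ..."
    await asyncio.sleep(0.5)  # keep the loop alive so the scheduled task can finish


asyncio.run(main())
```

In the real service the `{"running": <job_id>}` payload comes from the job logger; here it is a hardcoded placeholder.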

get_tasktable() async

Return the EOPF tasktable for a given module and class names

Source code in docs/rs-dpr-service/rs_dpr_service/processors/generic_processor.py
async def get_tasktable(self):
    """Return the EOPF tasktable for a given module and class names"""
    dask_client = self.cluster_handler.setup_dask_connection()

    try:
        # Extract span infos to send to Dask
        span_context = trace.get_current_span().get_span_context()

        dpr_processor = call_dask.ProcessorCaller(
            caller_env=dict(os.environ),
            span_context=span_context,
            cluster_address=self.cluster_handler.cluster_address,
            cluster_info=self.cluster_handler.cluster_info,
            data={},  # not used for the tasktables
            use_mockup=self.use_mockup,
        )

        # Run processor in the dask client
        task_table_task = dask_client.submit(
            dpr_processor.get_tasktable,
            module_name=self.tasktable_module,
            class_name=self.tasktable_class,
            pure=False,  # disable cache
        )
        res = task_table_task.result()

        # Return a default hardcoded value for the mockup
        if (not res) and self.use_mockup:
            with open(Path(__file__).parent.parent / "config" / "tasktable.json", encoding="utf-8") as tf:
                return json.loads(tf.read())
        return res
    except Exception as e:  # pylint: disable=broad-exception-caught
        logger.exception(f"Submitting task to dask cluster failed. Reason: {traceback.format_exc()}")
        raise e
    finally:
        # cleanup by disconnecting the dask client
        dask_client.close()
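
The tasktable is obtained by submitting a callable to the dask scheduler with `pure=False`, so dask never serves a cached result, and then blocking on `result()`. A self-contained sketch of that submit/result round trip against an in-process `LocalCluster` (requires `dask.distributed`; the submitted function is a dummy stand-in for `ProcessorCaller.get_tasktable`, and the module/class names are illustrative placeholders):

```python
from dask.distributed import Client, LocalCluster


def get_tasktable(module_name: str, class_name: str) -> dict:
    """Dummy stand-in for ProcessorCaller.get_tasktable()."""
    return {"module": module_name, "class": class_name, "tasks": []}


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=1, threads_per_worker=1)
    client = Client(cluster)
    try:
        future = client.submit(
            get_tasktable,
            module_name="some.eopf.module",  # illustrative placeholder
            class_name="SomeTaskTable",      # illustrative placeholder
            pure=False,                      # disable dask's result caching, as above
        )
        print(future.result())               # blocks until the worker returns
    finally:
        client.close()                       # mirror the finally/close() above
        cluster.close()
```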

manage_dask_tasks(dask_client, data)

Manages Dask tasks where the dpr processor is started.

Source code in docs/rs-dpr-service/rs_dpr_service/processors/generic_processor.py
def manage_dask_tasks(self, dask_client: Client | None, data: dict):
    """
    Manages Dask tasks where the dpr processor is started.
    """
    logger.info("Tasks monitoring started")
    res = None

    self.job_logger.log_job_execution(
        JobStatus.running,
        50,
        "In progress",
    )
    try:
        # For the mockup, replace placeholders by env vars.
        # For the real processor, it is done automatically by eopf.
        if self.use_mockup:
            data = self.replace_placeholders(data)

        # Extract span infos to send to Dask
        span_context = trace.get_current_span().get_span_context()

        dpr_processor = call_dask.ProcessorCaller(
            caller_env=dict(os.environ),
            span_context=span_context,
            cluster_address=self.cluster_handler.cluster_address,
            cluster_info=self.cluster_handler.cluster_info,
            data=data,
            use_mockup=self.use_mockup,
        )

        # Nominal usecase: run processor in the dask client
        if dask_client:
            dpr_task = dask_client.submit(
                dpr_processor.run_processor,
                pure=False,  # disable cache
            )

        # Specific case for local debugging
        else:
            dpr_task = None
            res = dpr_processor.run_processor()

    except Exception:  # pylint: disable=broad-exception-caught
        if not dask_client:
            logger.exception(traceback.format_exc())
            raise
        self.job_logger.log_job_execution(
            JobStatus.failed,
            None,
            f"Submitting task to dask cluster failed. Reason: {traceback.format_exc()}",
            log_exception=True,
        )
        return

    try:
        if dpr_task:
            res = dpr_task.result()  # This will raise the exception from the task if it failed
            logger.info("%s Task streaming completed", dpr_task.key)

    except Exception:  # pylint: disable=broad-exception-caught
        # Update status for the job
        self.job_logger.log_job_execution(
            JobStatus.failed,
            None,
            f"The dpr processing task failed: {traceback.format_exc()}",
            log_exception=True,
        )
        return

    # Update status and insert the result of the dask task in the jobs table
    self.job_logger.log_job_execution(JobStatus.successful, 100, str(res))
    # write the results in a s3 bucket file

    # Update the subscribers for token refreshment
    logger.info("Tasks monitoring finished")

replace_placeholders(obj)

Recursively replaces placeholders in the form ${PLACEHOLDER} within a nested structure (dict, list, str) using corresponding environment variable values.

If an environment variable is not found, the placeholder is left unchanged and a warning is logged.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `obj` | `Any` | The input object, typically a dict or list, containing strings with placeholders. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Any` | The same structure with all placeholders replaced where possible. |

Source code in docs/rs-dpr-service/rs_dpr_service/processors/generic_processor.py
def replace_placeholders(self, obj):
    """
    Recursively replaces placeholders in the form ${PLACEHOLDER} within a nested structure (dict, list, str)
    using corresponding environment variable values.

    If an environment variable is not found, the placeholder is left unchanged and a warning is logged.

    Args:
        obj (Any): The input object, typically a dict or list, containing strings with placeholders.

    Returns:
        Any: The same structure with all placeholders replaced where possible.
    """
    pattern = re.compile(r"\$\{(\w+)\}")

    if isinstance(obj, dict):
        return {k: self.replace_placeholders(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [self.replace_placeholders(item) for item in obj]
    if isinstance(obj, str):

        def replacer(match):
            key = match.group(1)
            value = os.environ.get(key)
            if value is None:
                logger.warning("Environment variable '%s' not found; leaving placeholder unchanged.", key)
                return match.group(0)
            return value

        return pattern.sub(replacer, obj)
    return obj
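
The substitution itself is a plain `re.sub` over `${NAME}` tokens, with `os.environ` as the lookup table and unknown names left untouched. A standalone demonstration of the same behaviour (the environment variable names are illustrative):

```python
import os
import re

# Same pattern as above: ${NAME}, where NAME is a run of word characters.
pattern = re.compile(r"\$\{(\w+)\}")


def substitute(text: str) -> str:
    """Replace ${NAME} with os.environ['NAME'], leaving unknown names untouched."""
    return pattern.sub(lambda m: os.environ.get(m.group(1), m.group(0)), text)


os.environ["S3_ENDPOINT"] = "https://s3.example.org"   # illustrative env var

payload = {"output": {"url": "${S3_ENDPOINT}/bucket", "token": "${UNSET_VAR}"}}
print(substitute(payload["output"]["url"]))    # https://s3.example.org/bucket
print(substitute(payload["output"]["token"]))  # ${UNSET_VAR} (left unchanged)
```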

start_processor(data) async

Method used to trigger the dask distributed streaming process. It creates the dask client object, gets the external dpr_payload sources access token, prepares the tasks for execution and manages eventual runtime exceptions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `dict` | Input payload for the DPR processor (may contain an "experimental_config" section). | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `tuple` | `tuple[str, dict]` | Tuple of MIME type and process response (a dictionary containing the job ID and a status message). Example: `("application/json", {"running": <job_id>})` |

Source code in docs/rs-dpr-service/rs_dpr_service/processors/generic_processor.py
async def start_processor(
    self,
    data: dict,
) -> tuple[str, dict]:
    """
    Method used to trigger the dask distributed streaming process.
    It creates the dask client object, gets the external dpr_payload sources access token,
    prepares the tasks for execution and manages eventual runtime exceptions.

    Args:
        data (dict): Input payload for the DPR processor (may contain an "experimental_config" section).

    Returns:
        tuple: tuple of MIME type and process response (dictionary containing the job ID and a
            status message).
            Example: ("application/json", {"running": <job_id>})
    """
    logger.debug("Starting main loop")

    try:
        experimental_config = ExperimentalConfig(**data.get("experimental_config", {}))

        # For testing: run the eopf-cpm scheduler on local.
        # It will then init a dask LocalCluster instance itself.
        if LOCAL_MODE and experimental_config.local_cluster.enabled:
            dask_client = None

        # Nominal case: run the eopf-cpm scheduler on a dedicated cluster pod, not locally.
        else:
            dask_client = self.cluster_handler.setup_dask_connection()
    except KeyError as ke:
        return self.job_logger.log_job_execution(
            JobStatus.failed,
            0,
            f"Failed to start the dpr-service process: No env var {ke} found",
            log_exception=True,
        )
    except RuntimeError:
        return self.job_logger.log_job_execution(
            JobStatus.failed,
            0,
            f"Failed to start the dpr-service process: {traceback.format_exc()}",
            log_exception=True,
        )

    self.job_logger.log_job_execution(JobStatus.running, 0, "Sending task to the dask cluster")

    # Manage dask tasks in a separate thread
    # starting a thread for managing the dask callbacks
    logger.debug("Starting tasks monitoring thread")
    try:
        await asyncio.to_thread(
            self.manage_dask_tasks,
            dask_client,
            data,
        )
    except Exception:  # pylint: disable=broad-exception-caught
        self.job_logger.log_job_execution(
            JobStatus.failed,
            0,
            f"Error from tasks monitoring thread: {traceback.format_exc()}",
            log_exception=True,
        )

    # cleanup by disconnecting the dask client
    if dask_client:
        dask_client.close()

    return self.job_logger.get_execute_result()
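
`asyncio.to_thread` is what keeps the event loop responsive here: `manage_dask_tasks` blocks while waiting on the dask future, so it is pushed into a worker thread and awaited, leaving the loop free to serve other endpoint calls. A minimal, self-contained illustration of that offloading, with a dummy blocking function standing in for the task monitor:

```python
import asyncio
import time


def manage_tasks(data: dict) -> str:
    """Dummy stand-in for manage_dask_tasks(): blocks while 'monitoring' the job."""
    time.sleep(1)
    return f"done: {data}"


async def heartbeat() -> None:
    """Shows that the event loop keeps running while the blocking call is offloaded."""
    for _ in range(4):
        print("event loop still responsive")
        await asyncio.sleep(0.3)


async def main() -> None:
    monitor = asyncio.to_thread(manage_tasks, {"experimental_config": {}})
    result, _ = await asyncio.gather(monitor, heartbeat())
    print(result)


asyncio.run(main())
```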