Skip to content

rs_dpr_service/utils/job_logger.md

<< Back to index

Advanced logger for Dask jobs.

JobLogger

Advanced logger to track Dask jobs execution.

Source code in docs/rs-dpr-service/rs_dpr_service/utils/job_logger.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class JobLogger:
    """
    Advanced logger to track Dask jobs execution.

    Each instance owns one job: it generates the job ID, keeps the current status,
    progress and message in memory, and mirrors every change through the
    ``db_process_manager`` passed at construction time.
    """

    def __init__(self, db_process_manager: PostgreSQLManager):
        """
        Create the logger and immediately register a new job.

        Args:
            db_process_manager (PostgreSQLManager): manager used to add and update
                the job entry.
        """
        self.job_id: str = str(uuid.uuid4())  # Generate a unique job ID
        self.message: str = "Processing Unit was created"
        self.progress: float = 0.0
        self.status = JobStatus.accepted
        self.db_process_manager = db_process_manager
        # Register the job right away so it exists before any update is logged
        self.create_job_execution()

    def get_execute_result(self) -> tuple[str, dict]:
        """
        Returns current status of the job logger.

        Returns:
            tuple: tuple of MIME type and process response (dictionary mapping the current
                status value to the job ID).
                Example: ("application/json", {"running": <job_id>})
        """
        return "application/json", {self.status.value: self.job_id}

    def create_job_execution(self):
        """
        Creates a new job execution entry and tracks its status.

        This method builds a job entry from the current job's ID, status, progress and
        message, and hands it to `self.db_process_manager.add_job`.

        The following information is stored:
            - `identifier`: The unique identifier for the job (`self.job_id`).
            - `processID`: Always the literal `"dpr-service"`.
            - `status`: The current status of the job, stored as `self.status.value`.
            - `progress`: The progress of the job execution.
            - `message`: Additional details about the job's execution.
        """
        job_metadata = {
            "identifier": self.job_id,
            "processID": "dpr-service",
            "status": self.status.value,
            "progress": self.progress,
            "message": self.message,
        }
        self.db_process_manager.add_job(job_metadata)

    def log_job_execution(
        self,
        status: JobStatus | None = None,
        progress: int | None = None,
        message: str | None = None,
        log_exception: bool = False,
    ) -> tuple[str, dict]:
        """
        Method used to log progress into db.

        Arguments left to ``None`` keep the current value. An explicit ``progress=0``
        is applied (it is a valid percentage, not "no value").

        Args:
            status (JobStatus): new job status
            progress (int): new job progress (percentage)
            message (str): new job current information message; an empty string keeps
                the previous message
            log_exception (bool): if True, also log the message with `logger.exception`
                (meant to be used while handling an exception)

        Returns:
            tuple: tuple of MIME type and process response (dictionary mapping the current
                status value to the job ID).
                Example: ("application/json", {"running": <job_id>})
        """

        # Update the runtime state first; the same values are then written to the db.
        # `is not None` (rather than truthiness) so that progress=0 is not discarded.
        if status is not None:
            self.status = status
        if progress is not None:
            self.progress = progress
        if message:
            self.message = message

        if log_exception:
            logger.exception(self.message)

        update_data = {
            "status": self.status.value,
            "progress": self.progress,
            "message": self.message,
            "updated": datetime.now(),  # Update updated each time a change is made
        }
        # Only a status explicitly passed as failed in this call is logged as an error
        if status == JobStatus.failed:
            logger.error(f"Updating failed job {self.job_id}: {update_data}")
        else:
            logger.info(f"Updating job {self.job_id}: {update_data}")

        self.db_process_manager.update_job(self.job_id, update_data)

        return self.get_execute_result()

create_job_execution()

Creates a new job execution entry and tracks its status.

This method creates a job entry in the tracker with the current job's ID, status, progress, and message. The job information is stored in a persistent tracker to allow monitoring and updating of the job's execution state.

The following information is stored
  • job_id: The unique identifier for the job.
  • status: The current status of the job, converted to a JSON-serializable format.
  • progress: The progress of the job execution.
  • message: Additional details about the job's execution.
Notes
  • The job entry is stored by calling add_job on self.db_process_manager.
  • The status is stored as its plain value (self.status.value).
Source code in docs/rs-dpr-service/rs_dpr_service/utils/job_logger.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def create_job_execution(self):
    """
    Register this job with the process manager.

    A single entry describing the job is passed to
    `self.db_process_manager.add_job`. The entry contains:
        - `identifier`: the job ID (`self.job_id`).
        - `processID`: the literal `"dpr-service"`.
        - `status`: the current status, as `self.status.value`.
        - `progress`: the current progress (`self.progress`).
        - `message`: the current information message (`self.message`).
    """
    self.db_process_manager.add_job(
        {
            "identifier": self.job_id,
            "processID": "dpr-service",
            "status": self.status.value,
            "progress": self.progress,
            "message": self.message,
        },
    )

get_execute_result()

Returns current status of the job logger.

Returns:

Name Type Description
tuple tuple[str, dict]

tuple of MIME type and process response (dictionary containing the job ID and a status message). Example: ("application/json", {"running": <job_id>})

Source code in docs/rs-dpr-service/rs_dpr_service/utils/job_logger.py
43
44
45
46
47
48
49
50
51
52
def get_execute_result(self) -> tuple[str, dict]:
    """
    Report the job's current state as a process response.

    Returns:
        tuple: the MIME type followed by a one-entry dictionary that maps the
            current status value to the job ID.
            Example: ("application/json", {"running": <job_id>})
    """
    mime_type = "application/json"
    response = {self.status.value: self.job_id}
    return mime_type, response

log_job_execution(status=None, progress=None, message=None, log_exception=False)

Method used to log progress into db.

Parameters:

Name Type Description Default
status JobStatus

new job status

None
progress int

new job progress (percentage)

None
message str

new job current information message

None
log_exception bool

if True, also log the message with logger.exception

False

Returns:

Name Type Description
tuple tuple[str, dict]

tuple of MIME type and process response (dictionary containing the job ID and a status message). Example: ("application/json", {"running": <job_id>})

Source code in docs/rs-dpr-service/rs_dpr_service/utils/job_logger.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def log_job_execution(
    self,
    status: JobStatus | None = None,
    progress: int | None = None,
    message: str | None = None,
    log_exception: bool = False,
) -> tuple[str, dict]:
    """
    Method used to log progress into db.

    Arguments left to ``None`` keep the current value. An explicit ``progress=0``
    is applied (it is a valid percentage, not "no value").

    Args:
        status (JobStatus): new job status
        progress (int): new job progress (percentage)
        message (str): new job current information message; an empty string keeps
            the previous message
        log_exception (bool): if True, also log the message with `logger.exception`
            (meant to be used while handling an exception)

    Returns:
        tuple: tuple of MIME type and process response (dictionary mapping the current
            status value to the job ID).
            Example: ("application/json", {"running": <job_id>})
    """

    # Update the runtime state first; the same values are then written to the db.
    # `is not None` (rather than truthiness) so that progress=0 is not discarded.
    if status is not None:
        self.status = status
    if progress is not None:
        self.progress = progress
    if message:
        self.message = message

    if log_exception:
        logger.exception(self.message)

    update_data = {
        "status": self.status.value,
        "progress": self.progress,
        "message": self.message,
        "updated": datetime.now(),  # Update updated each time a change is made
    }
    # Only a status explicitly passed as failed in this call is logged as an error
    if status == JobStatus.failed:
        logger.error(f"Updating failed job {self.job_id}: {update_data}")
    else:
        logger.info(f"Updating job {self.job_id}: {update_data}")

    self.db_process_manager.update_job(self.job_id, update_data)

    return self.get_execute_result()