
OBS log ingestion flow implementation

batch_insert(conn, rows)

Insert a batch of rows using execute_values for high performance.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
def batch_insert(conn, rows):
    """
    Insert a batch of rows using execute_values for high performance.
    """
    if not rows:
        return
    with conn.cursor() as cur:
        psycopg2.extras.execute_values(cur, INSERT_SQL, rows)
    conn.commit()
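
execute_values expands a single VALUES %s placeholder into one multi-row statement, which is what makes the batch insert cheap. Below is a minimal, self-contained sketch of that same mechanism against a hypothetical demo table; the real flow uses INSERT_SQL and the s3_access_log table, neither of which is reproduced on this page.

# Hedged sketch of the execute_values mechanism with a hypothetical demo table;
# the real batch_insert uses INSERT_SQL and the s3_access_log table instead.
import os

import psycopg2
import psycopg2.extras

DEMO_SQL = "INSERT INTO demo_access_log (bucket, operation, bytes_sent) VALUES %s"
rows = [
    ("demo-bucket", "REST.GET.OBJECT", 1024),
    ("demo-bucket", "REST.PUT.OBJECT", 2048),
]

conn = psycopg2.connect(
    dbname="demo",
    user=os.environ["POSTGRES_QUOTA_USER"],
    password=os.environ["POSTGRES_QUOTA_PASSWORD"],
    host=os.environ["POSTGRES_HOST"],
    port=os.environ["POSTGRES_PORT"],
)
try:
    with conn.cursor() as cur:
        # All rows are sent in a single statement, mirroring batch_insert above
        psycopg2.extras.execute_values(cur, DEMO_SQL, rows)
    conn.commit()
finally:
    conn.close()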

clean_obs_logs(retention_days=DEFAULT_MAX_DAYS, env=FlowEnvArgs(owner_id='operator-quota')) async

Purge the table s3_access_log.

Args:
    retention_days: number of days of OBS log data to retain.
    env (FlowEnvArgs, optional): Flow environment arguments containing owner_id and other configuration parameters.

Returns:
    None

Raises:
    psycopg2.Error: If the database connection or the deletion operation fails.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
@flow(name="clean-obs-logs")
async def clean_obs_logs(
    retention_days: int = DEFAULT_MAX_DAYS,
    env: FlowEnvArgs = FlowEnvArgs(owner_id="operator-quota"),
):
    """
    Purge the table s3_access_log.
    Args:
        retention_days : number of days of OBS log data to retain
        env (FlowEnvArgs, optional): Flow environment arguments containing owner_id and other
            configuration parameters.
    Returns:
        None
    Raises:
        psycopg2.Error: If the database connection or the deletion operation fails.
    """

    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "clean-quota-database"):
        logger.info("Retrieve credentials to access Postgres quota database")
        db_user = os.environ["POSTGRES_QUOTA_USER"]
        db_password = os.environ["POSTGRES_QUOTA_PASSWORD"]
        db_host = os.environ["POSTGRES_HOST"]
        db_port = os.environ["POSTGRES_PORT"]

        conn = psycopg2.connect(dbname=DB_NAME, user=db_user, password=db_password, host=db_host, port=db_port)

        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    DELETE FROM s3_access_log
                    WHERE time < NOW() - INTERVAL %s;
                    """,
                    (f"{retention_days} days",),  # ← tuple obligatoire
                )

        conn.close()
        logger.info("⏳ Old data have been removed.")

collect_obs_logs(platform='rspython-ops', max_files=DEFAULT_MAX_FILES, threshold_minute=DEFAULT_AGE_THRESHOLD, env=FlowEnvArgs(owner_id='operator-quota')) async

Collect and process OVH quota monitoring logs from S3 and insert them into a PostgreSQL database. This async function retrieves observability logs from an S3 bucket, parses them, and performs batch insertions into a Postgres quota database.

Args:
    platform: platform name, used as the prefix of the bucket name.
    max_files: maximum number of files read from the bucket.
    threshold_minute: minimum age, in minutes, a log file must reach before it is read and deleted, so that a file still being written is not processed.
    env (FlowEnvArgs, optional): Flow environment arguments containing owner_id and other configuration parameters.

Returns:
    None

Raises:
    psycopg2.Error: If database connection or insertion operations fail.
    KeyError: If required environment variables are not set.
    Exception: If S3 operations or log parsing encounters errors.

Environment Variables Required:
    - POSTGRES_QUOTA_USER: PostgreSQL username for the quota database
    - POSTGRES_QUOTA_PASSWORD: PostgreSQL password
    - POSTGRES_HOST: PostgreSQL host address
    - POSTGRES_PORT: PostgreSQL port number
    - S3_QUOTA_ENDPOINT: S3 endpoint URL of the bucket containing the logs
    - S3_QUOTA_ACCESSKEY: S3 access key ID for the bucket containing the logs
    - S3_QUOTA_SECRETKEY: S3 secret access key for the bucket containing the logs
    - S3_QUOTA_REGION: S3 region name of the bucket containing the logs

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
@flow(timeout_seconds=600, name="collect-obs-logs")
async def collect_obs_logs(
    platform: str = "rspython-ops",
    max_files: int = DEFAULT_MAX_FILES,
    threshold_minute: int = DEFAULT_AGE_THRESHOLD,
    env: FlowEnvArgs = FlowEnvArgs(owner_id="operator-quota"),
):
    """
    Collect and process OVH quota monitoring logs from S3 and insert them into a PostgreSQL database.
    This async function retrieves observability logs from an S3 bucket, parses them, and performs
    batch insertions into a Postgres quota database.
    Args:
        platform : platform name, used as the prefix of the bucket name
        max_files : maximum number of files read from the bucket
        threshold_minute : minimum age, in minutes, a log file must reach before it is read and
            deleted, so that a file still being written is not processed
        env (FlowEnvArgs, optional): Flow environment arguments containing owner_id and other
            configuration parameters.
    Returns:
        None
    Raises:
        psycopg2.Error: If database connection or insertion operations fail.
        KeyError: If required environment variables are not set.
        Exception: If S3 operations or log parsing encounters errors.
    Environment Variables Required:
        - POSTGRES_QUOTA_USER: PostgreSQL username for quota database
        - POSTGRES_QUOTA_PASSWORD: PostgreSQL password
        - POSTGRES_HOST: PostgreSQL host address
        - POSTGRES_PORT: PostgreSQL port number
        - S3_QUOTA_ENDPOINT: S3 endpoint URL to access the bucket with logs
        - S3_QUOTA_ACCESSKEY: S3 access key ID to access the bucket with logs
        - S3_QUOTA_SECRETKEY: S3 secret access key to access the bucket with logs
        - S3_QUOTA_REGION: S3 region name to access the bucket with logs
    """

    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "obs-quota-monitoring"):

        bucket_name = platform + LOG_BUCKET_SUFFIX
        logger.info("Retrieve credentials to access Postgres quota database")
        db_user = os.environ["POSTGRES_QUOTA_USER"]
        db_password = os.environ["POSTGRES_QUOTA_PASSWORD"]
        db_host = os.environ["POSTGRES_HOST"]
        db_port = os.environ["POSTGRES_PORT"]

        logger.info(f"Retrieve credentials to access bucket {bucket_name}.")
        s3 = boto3.client(
            "s3",
            endpoint_url=os.environ["S3_QUOTA_ENDPOINT"],
            aws_access_key_id=os.environ["S3_QUOTA_ACCESSKEY"],
            aws_secret_access_key=os.environ["S3_QUOTA_SECRETKEY"],
            config=Config(signature_version="s3v4"),
            region_name=os.environ["S3_QUOTA_REGION"],
        )
        conn = psycopg2.connect(dbname=DB_NAME, user=db_user, password=db_password, host=db_host, port=db_port)

        batch = []
        logger.info(
            f"⏳ Processing Object Storage logs with batch insert from bucket {bucket_name}.",
        )
        try:
            for key in list_recent_files(s3, platform, max_files, threshold_minute):
                logger.info(f"\n📄 Reading file: {key}")

                for line in read_object(s3, platform, key):
                    parsed = parse_log_line(line)
                    if parsed:
                        batch.append(parsed)

                    # Flush batch when full
                    if len(batch) >= BATCH_SIZE:
                        batch_insert(conn, batch)
                        batch = []

                # Delete the processed file so it is not ingested twice
                logger.info(f"\n📄 Deleting file: {key}")
                s3.delete_object(Bucket=bucket_name, Key=key)

            # Final flush
            if batch:
                batch_insert(conn, batch)

            logger.info("✅ Done.")

        except botocore.exceptions.ClientError as err:
            logger.error("❌ S3 access failed.")
            return Failed(message=str(err))

        except IntegrityError as exc:
            conn.rollback()
            logger.error(f"❌ Database integrity error: {exc}")
            return Failed(message=str(exc))

        except DatabaseError as exc:
            conn.rollback()
            logger.error(f"❌ Database error: {exc}")
            return Failed(message=str(exc))

        finally:
            # Close the database connection whether the run succeeded or failed
            conn.close()
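
Likewise, a hedged sketch of triggering a single collection run, assuming the same module import path as above and that the POSTGRES_* and S3_QUOTA_* environment variables listed in the docstring are exported; the parameter values below are purely illustrative:

# Hedged sketch: one ad hoc collection run with illustrative parameters.
import asyncio

from rs_workflows.operation.quota_monitoring_flow import collect_obs_logs

asyncio.run(
    collect_obs_logs(
        platform="rspython-ops",  # bucket read is platform + LOG_BUCKET_SUFFIX
        max_files=100,            # stop after at most 100 log files
        threshold_minute=10,      # skip files modified in the last 10 minutes
    )
)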

list_recent_files(s3, platform, max_files, threshold_minute)

List up to 'max_files' log files that are at least 'threshold_minute' minutes old.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
def list_recent_files(s3, platform: str, max_files: int, threshold_minute: int):
    """
    List up to 'max_files' files that are at least 'threshold_minute' minutes old.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(minutes=threshold_minute)
    count = 0

    paginator = s3.get_paginator("list_objects_v2")

    for page in paginator.paginate(Bucket=platform + LOG_BUCKET_SUFFIX, Prefix=LOG_PREFIX):
        for obj in page.get("Contents", []):
            key = obj["Key"]
            last_modified = obj["LastModified"]

            if last_modified > cutoff:
                continue

            yield key
            count += 1

            if count >= max_files:
                return
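
The generator yields a key only when the object's LastModified is at or before the cutoff, so files touched within the last threshold_minute minutes are skipped. A self-contained sketch of that cutoff arithmetic with made-up timestamps (no S3 involved):

# Hedged sketch of the age filter in list_recent_files, using made-up timestamps.
from datetime import datetime, timedelta, timezone

threshold_minute = 10
cutoff = datetime.now(timezone.utc) - timedelta(minutes=threshold_minute)

objects = [
    {"Key": "old.log", "LastModified": datetime.now(timezone.utc) - timedelta(hours=1)},
    {"Key": "fresh.log", "LastModified": datetime.now(timezone.utc)},
]

# Same condition as above: objects newer than the cutoff are skipped
eligible = [o["Key"] for o in objects if o["LastModified"] <= cutoff]
print(eligible)  # ['old.log']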

parse_log_line(line)

Parse a single OVH/S3 access log line into a list of fields.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
def parse_log_line(line: str):
    """
    Parse a single OVH/S3 access log line into a list of fields.
    """
    match = LOG_PATTERN.match(line)
    if not match:
        return None

    fields = list(match.groups())

    # Convert timestamp
    fields[2] = datetime.strptime(fields[2], "%d/%b/%Y:%H:%M:%S %z")

    # Convert numeric fields
    numeric_indices = [9, 11, 12, 13, 14]
    for idx in numeric_indices:
        fields[idx] = None if fields[idx] == "-" else int(fields[idx])

    # Replace "-" with None
    fields = [None if f == "-" else f for f in fields]

    return fields
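
LOG_PATTERN itself is not reproduced on this page, so the sketch below only illustrates the post-processing steps on a hypothetical list of matched groups laid out like an S3 access log record; the real field order comes from LOG_PATTERN.

# Hedged sketch: the group layout is hypothetical, only the conversions mirror parse_log_line.
from datetime import datetime

groups = [
    "owner-id", "demo-bucket", "01/May/2024:10:00:00 +0000", "198.51.100.7",
    "requester", "REQID123", "REST.GET.OBJECT", "access/file.log",
    "GET /demo-bucket/access/file.log HTTP/1.1", "200", "-", "1024",
    "1024", "15", "12",
]

fields = list(groups)
fields[2] = datetime.strptime(fields[2], "%d/%b/%Y:%H:%M:%S %z")   # timestamp column
for idx in (9, 11, 12, 13, 14):                                     # numeric columns
    fields[idx] = None if fields[idx] == "-" else int(fields[idx])
fields = [None if f == "-" else f for f in fields]                  # remaining "-" markers

print(fields[2], fields[9], fields[10])  # datetime, 200, None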

read_object(s3, platform, key)

Stream-read an S3 object line by line.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
def read_object(s3, platform, key):
    """
    Stream-read an S3 object line by line.
    """
    logger = get_run_logger()
    try:
        response = s3.get_object(Bucket=platform + LOG_BUCKET_SUFFIX, Key=key)
        for line in response["Body"].iter_lines():
            if line:
                yield line.decode("utf-8")
    except botocore.exceptions.ClientError as e:
        logger.error(f"Error reading {key}: {e}")