Skip to content

rs_workflows/operation/quota_monitoring_flow.md

<< Back to index

OBS logs ingestion flow implementation

batch_insert(conn, rows)

Insert a batch of rows using execute_values for high performance.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
149
150
151
152
153
154
155
156
157
def batch_insert(conn, rows):
    """
    Insert a batch of rows in one round trip using execute_values.

    An empty (or falsy) ``rows`` is a no-op; otherwise the rows are bulk
    inserted through ``INSERT_SQL`` and the transaction is committed.
    """
    if rows:
        with conn.cursor() as cur:
            psycopg2.extras.execute_values(cur, INSERT_SQL, rows)
        conn.commit()

collect_obs_logs(platform='rspython-ops', max_files=DEFAULT_MAX_FILES, threshold_minute=DEFAULT_AGE_THRESHOLD, env=FlowEnvArgs(owner_id='operator-quota')) async

Collect and process OVH quota monitoring logs from S3 and insert them into a PostgreSQL database. This async function retrieves observability logs from an S3 bucket, parses them, and performs batch insertions into a Postgres quota database. Args: platform : platform name, will be the prefix of the bucket name max_files : maximum number of files read from the bucket threshold_minute : this threshold is set to avoid reading and deleting a log file while it is still being filled env (FlowEnvArgs, optional): Flow environment arguments containing owner_id and other configuration parameters. Returns: None Raises: psycopg2.Error: If database connection or insertion operations fail. KeyError: If required environment variables are not set. Exception: If S3 operations or log parsing encounters errors. Environment Variables Required: - POSTGRES_QUOTA_USER: PostgreSQL username for quota database - POSTGRES_QUOTA_PASSWORD: PostgreSQL password - POSTGRES_HOST: PostgreSQL host address - POSTGRES_PORT: PostgreSQL port number - S3_QUOTA_ENDPOINT: S3 endpoint URL to access the bucket with logs - S3_QUOTA_ACCESSKEY: S3 access key ID to access the bucket with logs - S3_QUOTA_SECRETKEY: S3 secret access key to access the bucket with logs - S3_QUOTA_REGION: S3 region name to access the bucket with logs

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
@flow(timeout_seconds=600, name="collect-obs-logs")
async def collect_obs_logs(
    platform: str = "rspython-ops",
    max_files: int = DEFAULT_MAX_FILES,
    threshold_minute: int = DEFAULT_AGE_THRESHOLD,
    env: FlowEnvArgs = FlowEnvArgs(owner_id="operator-quota"),
):
    """
    Collect and process OVH quota monitoring logs from S3 and insert them into a PostgreSQL database.

    This async function retrieves observability logs from an S3 bucket, parses them, performs
    batch insertions into the Postgres quota database, and deletes each log file from the
    bucket once it has been read.

    Args:
        platform: Platform name, used as the prefix of the bucket name.
        max_files: Maximum number of files read from the bucket.
        threshold_minute: Minimum age (in minutes) a log file must have before it is read
            and deleted, to avoid processing a file that is still being filled.
        env (FlowEnvArgs, optional): Flow environment arguments containing owner_id and other
            configuration parameters.

    Returns:
        None

    Raises:
        RuntimeError: If S3 access or database operations fail (chained from the underlying
            botocore / psycopg2 error).
        KeyError: If required environment variables are not set.

    Environment Variables Required:
        - POSTGRES_QUOTA_USER: PostgreSQL username for quota database
        - POSTGRES_QUOTA_PASSWORD: PostgreSQL password
        - POSTGRES_HOST: PostgreSQL host address
        - POSTGRES_PORT: PostgreSQL port number
        - S3_QUOTA_ENDPOINT: S3 endpoint URL to access the bucket with logs
        - S3_QUOTA_ACCESSKEY: S3 access key ID to access the bucket with logs
        - S3_QUOTA_SECRETKEY: S3 secret access key to access the bucket with logs
        - S3_QUOTA_REGION: S3 region name to access the bucket with logs
    """

    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "obs-quota-monitoring"):

        bucket_name = platform + LOG_BUCKET_SUFFIX
        logger.info("Retrieve credentials to access Postgres quota database")
        db_user = os.environ["POSTGRES_QUOTA_USER"]
        db_password = os.environ["POSTGRES_QUOTA_PASSWORD"]
        db_host = os.environ["POSTGRES_HOST"]
        db_port = os.environ["POSTGRES_PORT"]

        logger.info(f"Retrieve credentials to access bucket {bucket_name}.")
        s3 = boto3.client(
            "s3",
            endpoint_url=os.environ["S3_QUOTA_ENDPOINT"],
            aws_access_key_id=os.environ["S3_QUOTA_ACCESSKEY"],
            aws_secret_access_key=os.environ["S3_QUOTA_SECRETKEY"],
            config=Config(signature_version="s3v4"),
            region_name=os.environ["S3_QUOTA_REGION"],
        )
        conn = psycopg2.connect(dbname=DB_NAME, user=db_user, password=db_password, host=db_host, port=db_port)

        batch = []
        logger.info(
            f"⏳ Processing Object Storage logs with batch insert from bucket {bucket_name}.",
        )
        try:
            for key in list_recent_files(s3, platform, max_files, threshold_minute):
                logger.info(f"\n📄 Reading file: {key}")

                for line in read_object(s3, platform, key):
                    parsed = parse_log_line(line)
                    if parsed:
                        batch.append(parsed)

                    # Flush batch when full
                    if len(batch) >= BATCH_SIZE:
                        batch_insert(conn, batch)
                        batch = []

                # Delete the processed file; reuse bucket_name instead of
                # recomputing platform + LOG_BUCKET_SUFFIX.
                logger.info(f"\n📄 Deleting file: {key}")
                s3.delete_object(Bucket=bucket_name, Key=key)

            # Final flush of any remaining rows
            if batch:
                batch_insert(conn, batch)

            # Use the flow logger consistently instead of print().
            logger.info("✅ Done.")

        except botocore.exceptions.ClientError as err:
            logger.error("❌ S3 access failed.")
            raise RuntimeError(f"S3 access failed: {err} !") from err

        except IntegrityError as exc:
            conn.rollback()
            logger.error(f"❌ Database integrity error: {exc}")
            raise RuntimeError(f"Database integrity error: {exc} !") from exc

        except DatabaseError as exc:
            conn.rollback()
            logger.error(f"❌ Database error: {exc}")
            raise RuntimeError(f"Database error: {exc} !") from exc

        finally:
            # Always release the connection, even when an exception was raised
            # (the original only closed it on the success path).
            conn.close()

consolidate_obs_logs(retention_hours=2, env=FlowEnvArgs(owner_id='operator-quota')) async

Consolidate old OBS access logs into s3_log_consolidate, then purge them from s3_access_log.

Parameters:

Name Type Description Default
retention_hours int

Number of hours to retain in the raw s3_access_log table.

2
env FlowEnvArgs

Flow environment arguments containing owner_id and other configuration parameters.

FlowEnvArgs(owner_id='operator-quota')

Raises:

Type Description
Error

If database connection or SQL operations fail.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
@flow(name="consolidate-obs-logs")
async def consolidate_obs_logs(
    retention_hours: int = 2,
    env: FlowEnvArgs = FlowEnvArgs(owner_id="operator-quota"),
):
    """
    Consolidate old OBS access logs into s3_log_consolidate, then purge them
    from s3_access_log.

    Successful (HTTP 200) operations older than the retention window are
    aggregated per (bucket, hour, requester, operation) into s3_log_consolidate,
    excluding a few read-only housekeeping operations, and the corresponding raw
    rows are deleted from s3_access_log in the same transaction.

    Args:
        retention_hours: Number of hours to retain in the raw s3_access_log table.
        env (FlowEnvArgs, optional): Flow environment arguments containing owner_id
            and other configuration parameters.

    Raises:
        psycopg2.Error: If database connection or SQL operations fail.
        KeyError: If required POSTGRES_* environment variables are not set.
    """

    logger = get_run_logger()

    # Init flow environment and opentelemetry span
    flow_env = FlowEnv(env)
    with flow_env.start_span(__name__, "clean-quota-database"):
        logger.info("Retrieve credentials to access Postgres quota database")
        db_user = os.environ["POSTGRES_QUOTA_USER"]
        db_password = os.environ["POSTGRES_QUOTA_PASSWORD"]
        db_host = os.environ["POSTGRES_HOST"]
        db_port = os.environ["POSTGRES_PORT"]

        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=db_user,
            password=db_password,
            host=db_host,
            port=db_port,
        )

        try:
            # "with conn" commits on success / rolls back on error, but does NOT
            # close the connection — the try/finally guarantees the close even
            # when the SQL raises (the original leaked the connection then).
            with conn:
                with conn.cursor() as cur:
                    logger.info("⏳ Consolidating and cleaning OBS logs older than %s hours", retention_hours)

                    # The interval is passed as a bound parameter (no SQL injection);
                    # the CTE inserts the aggregates and deletes the raw rows atomically.
                    cur.execute(
                        """
                        WITH params AS (
                            SELECT (now() - INTERVAL %s) AS max_time
                        ),
                        ins AS (
                            INSERT INTO s3_log_consolidate (
                                bucket,
                                time,
                                requester,
                                operation,
                                bytes_sent,
                                object_size,
                                total_time_ms,
                                turnaround_time_ms
                            )
                            SELECT
                                bucket,
                                date_trunc('hour', time) AS date_truncated,
                                requester,
                                operation,
                                SUM(bytes_sent)  AS bytes_sent,
                                SUM(object_size) AS object_size,
                                SUM(total_time_ms) AS total_time_ms,
                                SUM(turnaround_time_ms) AS turnaround_time_ms
                            FROM s3_access_log, params
                            WHERE time < params.max_time
                              AND http_status = 200
                              AND operation NOT IN (
                                    'REST.GET.ACL',
                                    'REST.HEAD.BUCKET',
                                    'REST.GET.TAGGING',
                                    'REST.GET.BUCKETVERSIONS'
                              )
                            GROUP BY 1, 2, 3, 4
                            RETURNING 1
                        )
                        DELETE FROM s3_access_log
                        WHERE time < (SELECT max_time FROM params);
                        """,
                        (f"{retention_hours} hours",),
                    )
        finally:
            conn.close()

        logger.info("✔ Consolidation and cleanup completed.")

list_recent_files(s3, platform, max_files, threshold_minute)

List up to 'max_files' files older than 'threshold_minute'.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def list_recent_files(s3, platform: str, max_files: int, threshold_minute: int):
    """
    Yield up to ``max_files`` object keys older than ``threshold_minute`` minutes.

    Objects are listed from the platform log bucket under LOG_PREFIX; any object
    modified more recently than the age cutoff is skipped so that files still
    being written are not processed.

    Args:
        s3: boto3 S3 client.
        platform: Platform name; prefix of the bucket name.
        max_files: Maximum number of keys to yield. Non-positive yields nothing.
        threshold_minute: Minimum age in minutes for a file to be yielded.

    Yields:
        str: S3 object keys, in listing order.
    """
    # Bug fix: the original checked the count AFTER yielding, so max_files=0
    # (or negative) still yielded one key. Guard up front instead.
    if max_files <= 0:
        return

    cutoff = datetime.now(timezone.utc) - timedelta(minutes=threshold_minute)
    count = 0

    paginator = s3.get_paginator("list_objects_v2")

    for page in paginator.paginate(Bucket=platform + LOG_BUCKET_SUFFIX, Prefix=LOG_PREFIX):
        for obj in page.get("Contents", []):
            # Skip files newer than the cutoff: they may still be being filled.
            if obj["LastModified"] > cutoff:
                continue

            yield obj["Key"]
            count += 1

            if count >= max_files:
                return

parse_log_line(line)

Parse a single OVH/S3 access log line into a list of fields.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def parse_log_line(line: str):
    """
    Parse a single OVH/S3 access log line into a list of fields.

    Returns None when the line does not match LOG_PATTERN. The timestamp field
    (index 2) is converted to a timezone-aware datetime, the known numeric
    fields are converted to int, and any remaining "-" placeholder becomes None.
    """
    match = LOG_PATTERN.match(line)
    if match is None:
        return None

    fields = list(match.groups())

    # Timestamp field, e.g. "12/Jan/2024:10:11:12 +0000"
    fields[2] = datetime.strptime(fields[2], "%d/%b/%Y:%H:%M:%S %z")

    # Integer fields; "-" means the value is not available
    for idx in (9, 11, 12, 13, 14):
        fields[idx] = None if fields[idx] == "-" else int(fields[idx])

    # Map every other "-" placeholder to None as well
    return [None if value == "-" else value for value in fields]

read_object(s3, platform, key)

Stream-read an S3 object line by line.

Source code in docs/rs-client-libraries/rs_workflows/operation/quota_monitoring_flow.py
70
71
72
73
74
75
76
77
78
79
80
81
def read_object(s3, platform, key):
    """
    Stream-read an S3 object, yielding its non-empty lines decoded as UTF-8.

    Best-effort: on an S3 client error the problem is logged and the generator
    simply stops; no exception is propagated to the caller.
    """
    logger = get_run_logger()
    try:
        body = s3.get_object(Bucket=platform + LOG_BUCKET_SUFFIX, Key=key)["Body"]
        for raw_line in body.iter_lines():
            if raw_line:
                yield raw_line.decode("utf-8")
    except botocore.exceptions.ClientError as e:
        logger.error(f"Error reading {key}: {e}")