
rs_common/prefect_utils.md


Utility Python module for the tutorials, to be shared with the prefect or dask workers.

WARNING: AFTER EACH MODIFICATION, RESTART THE JUPYTER NOTEBOOK KERNEL!

format_env_user(block_name, any_owner_id)

Format the Prefect secret block name for the current user

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
def format_env_user(block_name: str, any_owner_id: str):
    """Format the Prefect secret block for the current user"""
    name = block_name.format(any_owner_id).lower()
    return re.sub("[^a-zA-Z0-9]", "-", name)  # replace special characters by dash
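
A minimal usage sketch, assuming a made-up block name template and owner id:

format_env_user("env-user-{}", "John.Doe@example.org")
# -> "env-user-john-doe-example-org"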

get_ip_address()

Return IP address, see: https://stackoverflow.com/a/166520

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
def get_ip_address() -> str:
    """Return IP address, see: https://stackoverflow.com/a/166520"""
    return socket.gethostbyname(socket.gethostname())

get_s3_bucket(s3_path)

Return a prefect S3 bucket object and S3 "object name" (= S3 path without s3://bucket-name) from the given S3 path. We use the prefect higher-level functions instead of those from boto3.

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
def get_s3_bucket(s3_path: str) -> tuple[S3Bucket, str]:
    """
    Return a prefect S3 bucket object and S3 "object name" (= S3 path without s3://bucket-name)
    from the given S3 path.
    We use the prefect higher-level functions instead of those from boto3.
    """

    # Remove the s3:// prefix and split by /
    split = s3_path.removeprefix("s3").removeprefix("S3").strip(":/").split("/")

    # Filter empty elements (if we had double //)
    split = list(filter(None, split))

    if not split:
        raise ValueError(f"Invalid S3 path: {s3_path!r}")

    bucket_name = split[0]
    object_name = "/".join(split[1:])

    # Try to return an existing bucket object
    try:
        return S3_BUCKETS[bucket_name], object_name

    # Else create a new one
    except KeyError:
        aws_credentials = AwsCredentials(
            aws_access_key_id=os.environ["S3_ACCESSKEY"],
            aws_secret_access_key=os.environ["S3_SECRETKEY"],
            region_name=os.environ["S3_REGION"],
            aws_client_parameters={"endpoint_url": os.environ["S3_ENDPOINT"]},
        )
        s3_bucket = S3Bucket(
            bucket_name=bucket_name,
            credentials=aws_credentials,
            bucket_folder="",  # no prefixed folder
        )
        S3_BUCKETS[bucket_name] = s3_bucket
        return s3_bucket, object_name
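
A usage sketch with a made-up path; the S3_* env vars are assumed to be set:

s3_bucket, object_name = get_s3_bucket("s3://my-bucket/some/folder/file.txt")
# s3_bucket.bucket_name == "my-bucket", object_name == "some/folder/file.txt"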

get_share_bucket(sub_folder='') async

Get the prefect share bucket folder.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `sub_folder` | `str` | Subfolder to use in the bucket. If empty, use the default folder and secret block. Else use a specific secret block. WARNING: this only works in a single-threaded environment, otherwise these specific secret blocks may overwrite each other. | `''` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[S3Bucket, str]` | The prefect block for the share bucket and the block name. |

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
@sync_compatible
async def get_share_bucket(sub_folder: str = "") -> tuple[S3Bucket, str]:
    """
    Get the prefect share bucket folder.

    Args:
        sub_folder: subfolder to use in the bucket. If empty, use the default folder and secret block.
        Else use a specific secret block. WARNING: this only works in a single-threaded environment
        otherwise these specific secret blocks may overwrite each other.

    Returns:
        The prefect block for the share bucket and the block name.
    """

    # Use the default secret block for the default folder
    if not sub_folder:
        block_name = BLOCK_NAME_SHARE_BUCKET_GLOBAL

        # Try to read and return the prefect block, if it already exists.
        try:
            return await S3Bucket.load(block_name), block_name

        # If it doesn't exist yet, create and return it
        except ValueError:
            pass

    # Use a specific block for the current user. Always overwrite it, because the subfolder may have changed.
    else:
        block_name = format_env_user(BLOCK_NAME_SHARE_BUCKET_USER, owner_id)  # type: ignore

    # Read the prefect blocks that contain the S3 authentication
    bucket_name = os.environ["PREFECT_BUCKET_NAME"]
    bucket_folder = os.environ["PREFECT_BUCKET_FOLDER"]

    if sub_folder:
        bucket_folder = os.path.join(bucket_folder, sub_folder)

    # Get a s3 bucket object from its name
    generic_bucket, _ = get_s3_bucket(bucket_name)

    # Create a new object with the same credentials and a prefixed folder
    share_bucket = S3Bucket(
        bucket_name=bucket_name,
        bucket_folder=bucket_folder,
        credentials=generic_bucket.credentials,
    )

    # Save it as a prefect block and return it
    await share_bucket.save(block_name, overwrite=True)
    return share_bucket, block_name
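
Thanks to @sync_compatible, this can be awaited from async code or called directly from sync code. A sketch with a made-up subfolder:

share_bucket, block_name = await get_share_bucket()               # default folder and block
user_bucket, user_block = await get_share_bucket("my-subfolder")  # per-user block, re-saved on each call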

hack_for_jupyter(func, *args, **kwargs)

From Jupyter we need this hack to deploy prefect flows

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
def hack_for_jupyter(func: Callable, *args, **kwargs) -> asyncio.Task:
    """From Jupyter we need this hack to deploy prefect flows"""
    coroutine = run_in_threadpool(func, *args, **kwargs)
    return asyncio.create_task(coroutine)
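
A sketch from a notebook cell; the flow object and the deployment arguments are assumptions, not part of this module:

task = hack_for_jupyter(my_flow.deploy, name="my-deployment", work_pool_name="my-pool")
await task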

init_global_env(new_owner_id=None)

Init the global variables above from the env vars. Needs to be called from the client, then called again from the prefect workers.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `new_owner_id` | `str \| None` | New owner/user id, if known. | `None` |
Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
def init_global_env(new_owner_id: str | None = None):
    """
    Init the global variables above from the env vars.
    Needs to be called from the client, then called again from the prefect workers.

    Args:
        new_owner_id: new owner/user id, if known.
    """
    global local_mode, cluster_mode, owner_id

    local_mode = env_bool("RSPY_LOCAL_MODE", default=False)
    cluster_mode = not local_mode

    # Override the owner id or read it from the env vars
    if new_owner_id:
        owner_id = new_owner_id
    else:
        owner_id = os.getenv("JUPYTERHUB_USER") if cluster_mode else os.getenv("RSPY_HOST_USER")
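
A sketch: in cluster mode the owner id comes from JUPYTERHUB_USER, in local mode from RSPY_HOST_USER; the explicit id below is made up:

init_global_env()           # owner id read from the env vars
init_global_env("my-user")  # or overridden explicitly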

init_prefect_blocks() async

Init prefect blocks from the client environment (= from jupyter)

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
@sync_compatible
async def init_prefect_blocks():
    """Init prefect blocks from the client environment (= from jupyter)"""

    #
    # Env vars for all users

    # In local mode, create the block that contains the environment variables for all users.
    # In cluster mode, this block has already been created by an admin.
    # All fields are mandatory.
    if local_mode:
        await Secret(
            value={  # type: ignore[arg-type]
                "RSPY_LOCAL_MODE": "1",
                "PREFECT_BUCKET_NAME": os.environ["PREFECT_BUCKET_NAME"],
                "PREFECT_BUCKET_FOLDER": os.getenv("PREFECT_BUCKET_FOLDER", "prefect-share"),
                "S3_ACCESSKEY": os.environ["S3_ACCESSKEY"],
                "S3_SECRETKEY": os.environ["S3_SECRETKEY"],
                "S3_REGION": os.environ["S3_REGION"],
                "S3_ENDPOINT": os.environ["S3_ENDPOINT"],
                "LOCAL_DASK_USERNAME": os.environ["LOCAL_DASK_USERNAME"],
                "LOCAL_DASK_PASSWORD": os.environ["LOCAL_DASK_PASSWORD"],
                "POSTGRES_USER": os.environ["POSTGRES_USER"],
                "POSTGRES_PASSWORD": os.environ["POSTGRES_PASSWORD"],
                "POSTGRES_PORT": os.environ["POSTGRES_PORT"],
                "POSTGRES_PI_DB": os.environ["POSTGRES_PI_DB"],
                "POSTGRES_HOST": os.environ["POSTGRES_HOST"],
            },
        ).save(BLOCK_NAME_ENV_GLOBAL, overwrite=True)

    #
    # Env vars for current user/owner_id

    # In cluster mode, read the API key
    if cluster_mode:
        await read_apikey()

    # Get all env var names that start with DASK_GATEWAY_
    regex = re.compile("^DASK_GATEWAY_.*")
    dask_gateway_vars = [v for v in os.environ if regex.match(v)]

    # Read env vars that are available from the client env in both local and cluster mode.
    # They are optional.
    env_vars = {}
    for key in (
        "AWS_REQUEST_CHECKSUM_CALCULATION",
        "AWS_RESPONSE_CHECKSUM_VALIDATION",
        "OTEL_PYTHON_REQUESTS_TRACE_HEADERS",
        "OTEL_PYTHON_REQUESTS_TRACE_BODY",
        "RSPY_APIKEY",
        "RSPY_OAUTH2_COOKIE",
        "RSPY_UAC_CHECK_URL",
        "RSPY_WEBSITE",
        "TEMPO_ENDPOINT",
        *dask_gateway_vars,
    ):
        if value := os.getenv(key):
            env_vars[key] = value

    # Save env vars in a secret block for the current user
    await Secret(value=env_vars).save(format_env_user(BLOCK_NAME_ENV_USER, owner_id), overwrite=True)  # type: ignore

    # Now read back the blocks so we are sure our env vars are up-to-date
    await read_prefect_blocks(owner_id)
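
A typical client-side bootstrap sketch from Jupyter, based on the docstrings above:

init_global_env()        # read local/cluster mode and the owner id from the env vars
init_prefect_blocks()    # @sync_compatible: no explicit await needed here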

read_apikey(optional=False, save_to_env=True) async

Read the API key, either from the environment variable or from an interactive input form.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `optional` | `bool` | If False and the env var is missing, ask for it via an interactive input form. | `False` |
| `save_to_env` | `bool` | If True, save the API key to the ~/.env file. | `True` |
Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
@sync_compatible
async def read_apikey(optional: bool = False, save_to_env: bool = True) -> None:
    """
    Read the API key, either from the environment variable or from an interactive input form.

    Args:
        optional (bool): If False and the env var is missing, ask for it via an interactive input form.
        save_to_env (bool): If True, saves the API key to the ~/.env file.
    """
    # No API key in local mode
    if local_mode:
        return

    # If the API is saved as an env var in the ~/.env file, then it has already
    # been read automatically by rs-infra-core/.github/jupyter/resources/00-read-env.py
    apikey = os.getenv("RSPY_APIKEY")
    if apikey:
        logger.debug(f"Use API key (probably from '~/.env'): '{apikey[:4]}***'")
    elif optional:
        logger.debug("Don't use any API key")
    else:
        # Else read it from user input
        apikey = getpass.getpass("Enter your API key:")

        # Save the env var
        os.environ["RSPY_APIKEY"] = apikey

        # Append it to the ~/.env file, if requested.
        # Don't overwrite the full ~/.env file because it can contain other user info.
        if save_to_env:
            with open(os.path.expanduser("~/.env"), "a", encoding="utf-8") as env_file:
                env_file.write(f"\nRSPY_APIKEY={apikey}\n")
                logger.debug("API key saved to '~/.env'")

read_prefect_blocks(any_owner_id=None) async

Read prefect blocks from the prefect flow and tasks into env vars and global vars.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `any_owner_id` | `str \| None` | Read prefect blocks for a specific user. | `None` |
Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
@sync_compatible
async def read_prefect_blocks(any_owner_id: str | None = None):
    """
    Read prefect blocks from the prefect flow and tasks into env vars and global vars.

    Args:
        any_owner_id: Read prefect blocks for a specific user.
    """

    # Read the env vars for all users
    os.environ.update((await Secret.load(BLOCK_NAME_ENV_GLOBAL)).get())

    # Read the env vars for the given user
    if any_owner_id:
        os.environ.update((await Secret.load(format_env_user(BLOCK_NAME_ENV_USER, any_owner_id))).get())

    # Init the env of the current module from the env vars we have just read
    init_global_env(any_owner_id)
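
A sketch from inside a prefect flow or task, with a made-up user id:

await read_prefect_blocks("my-user")
# os.environ now contains the global and the per-user variables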

s3_delete(s3_prefix, log=False)

Remove all files from S3 bucket with the given prefix, using low-level client.

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
def s3_delete(s3_prefix: str, log: bool = False):
    """Remove all files from S3 bucket with the given prefix, using low-level client."""
    s3_bucket, prefix = get_s3_bucket(s3_prefix)
    objects = s3_bucket._get_bucket_resource().objects  # pylint: disable=protected-access

    # objects.filter(Prefix=prefix) makes a recursive search and returns only files, not folders.
    # If we want to delete a file, call the filter and only keep the exact matches.
    try_file = [obj.key for obj in objects.filter(Prefix=prefix)]
    if prefix in try_file:
        objects_to_delete = [{"Key": prefix}]

    # If we want to delete a folder, make sure the prefix ends with /, call the filter and keep everything
    else:
        objects_to_delete = [{"Key": obj.key} for obj in objects.filter(Prefix=prefix.rstrip("/") + "/")]

    if not objects_to_delete:
        return

    s3_client = s3_bucket._get_s3_client()  # pylint: disable=protected-access
    endpoint_url = s3_bucket.credentials.aws_client_parameters.endpoint_url

    if log:
        keys = [f"s3://{s3_bucket.bucket_name}/{o.get('Key')}" for o in objects_to_delete]
        s3_bucket.logger.info(
            f"Delete from {endpoint_url!r}: {json.dumps(keys, indent=2)}",  # nosec hardcoded_sql_expressions
        )

    # Split the list of objects to delete
    chunk_size = 1000
    for i in range(0, len(objects_to_delete), chunk_size):
        s3_client.delete_objects(
            Bucket=s3_bucket.bucket_name,
            Delete={"Objects": objects_to_delete[i : i + chunk_size], "Quiet": True},  # noqa: E203
        )
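
A sketch with made-up paths; the same call deletes a single file (exact match) or a whole folder (recursive):

s3_delete("s3://my-bucket/folder/file.txt", log=True)  # one file
s3_delete("s3://my-bucket/folder")                     # everything under folder/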

s3_download_dir(s3_path, local_path=None) async

See: S3Bucket.get_directory

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
@sync_compatible
async def s3_download_dir(
    s3_path: str,
    local_path: str | None = None,
) -> None:
    """See: S3Bucket.get_directory"""
    s3_bucket, from_path = get_s3_bucket(s3_path)
    await s3_bucket.get_directory(from_path, local_path)
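
Usage sketch with made-up paths:

await s3_download_dir("s3://my-bucket/folder", local_path="/tmp/folder")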

s3_download_file(s3_path, to_path, **download_kwargs) async

See: S3Bucket.download_object_to_path

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
@sync_compatible
async def s3_download_file(
    s3_path: str,
    to_path: str | Path | None,
    **download_kwargs: dict[str, Any],
) -> Path:
    """See: S3Bucket.download_object_to_path"""
    s3_bucket, from_path = get_s3_bucket(s3_path)
    return await s3_bucket.download_object_to_path(from_path, to_path, **download_kwargs)
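
Usage sketch with made-up paths:

await s3_download_file("s3://my-bucket/folder/file.txt", "/tmp/file.txt")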

s3_upload_dir(from_folder, s3_path, **upload_kwargs) async

See: S3Bucket.upload_from_folder

Uploads files within a folder (excluding the folder itself) to the object storage service folder.

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
@sync_compatible
async def s3_upload_dir(
    from_folder: str | Path,
    s3_path: str,
    **upload_kwargs: dict[str, Any],
) -> str | None:
    """
    See: S3Bucket.upload_from_folder

    Uploads files *within* a folder (excluding the folder itself) to the object storage service folder.
    """
    s3_bucket, to_path = get_s3_bucket(s3_path)
    return await s3_bucket.upload_from_folder(from_folder, to_path, **upload_kwargs)
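
Usage sketch with made-up paths:

await s3_upload_dir("/tmp/folder", "s3://my-bucket/folder")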

s3_upload_empty_file(s3_path, **upload_kwargs) async

Upload an empty temp file to the S3 bucket.

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
@sync_compatible
async def s3_upload_empty_file(
    s3_path: str,
    **upload_kwargs: dict[str, Any],
) -> str:
    """Upload an empty temp file to the S3 bucket."""

    # Create a tmp file
    with tempfile.NamedTemporaryFile() as tmp:

        # Add content to the file, otherwise boto3 behaves strangely after uploading an empty file
        tmp.write(b"empty")
        tmp.flush()

        # Upload the file
        return await s3_upload_file(tmp.name, s3_path, **upload_kwargs)
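
Usage sketch with a made-up path, e.g. to create a folder marker:

await s3_upload_empty_file("s3://my-bucket/folder/.keep")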

s3_upload_file(from_path, s3_path, **upload_kwargs) async

See: S3Bucket.upload_from_path

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
@sync_compatible
async def s3_upload_file(
    from_path: str | Path,
    s3_path: str,
    **upload_kwargs: dict[str, Any],
) -> str:
    """See: S3Bucket.upload_from_path"""
    s3_bucket, to_path = get_s3_bucket(s3_path)
    return await s3_bucket.upload_from_path(from_path, to_path, **upload_kwargs)
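
Usage sketch with made-up paths:

await s3_upload_file("/tmp/file.txt", "s3://my-bucket/folder/file.txt")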

wait_for_deployment(name, wait=1, max_retry=30) async

Wait for a prefect deployment to finish.

Source code in docs/rs-client-libraries/rs_common/prefect_utils.py
async def wait_for_deployment(name: str, wait: int | float = 1, max_retry: int = 30):
    """Wait for prefect deployment to be finished."""
    # Taken from prefect/cli/deployment.py::inspect
    retry = 0
    async with get_client() as client:
        while True:
            try:
                await client.read_deployment_by_name(name)
                logger.info(f"Finished deploying prefect flow: {name!r}")
                return
            except ObjectNotFound:
                retry += 1
                if retry >= max_retry:
                    raise
                logger.info(f"Wait for deployment of prefect flow: {name!r} ...")
                await asyncio.sleep(wait)
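
A sketch: prefect deployment names have the form "flow-name/deployment-name"; the one below is made up:

await wait_for_deployment("my-flow/my-deployment", wait=1, max_retry=30)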