Skip to content

rs_workflows/utils/dask.md

<< Back to index

Helper task to interact with the Dask cluster.

is_dask_cluster_running(dask_cluster_label) async

Retrieve the Dask cluster status and return whether the cluster is running.

Source code in docs/rs-client-libraries/rs_workflows/utils/dask.py
(Lines 26–76 of the source file.)
@task(name="Check dask cluster status")
async def is_dask_cluster_running(dask_cluster_label: str) -> bool:
    """
    Check whether the Dask cluster with the given label is deployed and running.

    The Dask gateway is reached at the address in the ``DASK_GATEWAY_ADDRESS``
    environment variable and authenticated with the ``JUPYTERHUB_API_TOKEN``
    environment variable. When a matching cluster is found, its options are
    published as a markdown artifact (key ``dask-cluster-options``), and a link
    to its dashboard, built from ``DASK_GATEWAY_PUBLIC``, is logged.

    Args:
        dask_cluster_label: Value expected in the cluster's ``cluster_name`` option.

    Returns:
        True if a matching cluster exists and its status is 2 (RUNNING),
        False otherwise.

    Raises:
        KeyError: If one of the environment variables read above is not set.
    """
    logger = get_run_logger()

    # Connect to the dask gateway. The context manager closes the client as soon
    # as the cluster list is fetched, so the connection is not leaked.
    with Gateway(
        address=os.environ["DASK_GATEWAY_ADDRESS"],
        auth=JupyterHubAuth(api_token=os.environ["JUPYTERHUB_API_TOKEN"]),
    ) as gateway:
        clusters = gateway.list_clusters()

    cluster_names = [c.options.get("cluster_name", "<unknown>") for c in clusters]

    # Find the cluster matching the label. If several clusters share the label,
    # the last one listed is used (same behaviour as the original search loop).
    matches = [c for c in clusters if c.options.get("cluster_name") == dask_cluster_label]
    if not matches:
        logger.error(f"❌ '{dask_cluster_label}' is not part of deployed dask clusters {cluster_names}.")
        return False
    matched_cluster = matches[-1]

    logger.info(f"✔️ '{dask_cluster_label}' is part of deployed dask clusters {cluster_names}.")

    # Check status: only status 2 (RUNNING) counts as running; map the other
    # codes to readable names for the warning.
    status_map = {0: "UNKNOWN", 1: "PENDING", 2: "RUNNING", 3: "STOPPING", 4: "STOPPED", 5: "FAILED"}
    is_running = matched_cluster.status == 2
    if not is_running:
        logger.warning(f"⚠️ Cluster status = {matched_cluster.status} ({status_map.get(matched_cluster.status)})")

    # Save artifact with the cluster options (done even when not running).
    md = (
        f"# Dask cluster option for {dask_cluster_label}\n\n```json\n"
        + json.dumps(matched_cluster.options, indent=2)
        + "\n```"
    )
    await acreate_markdown_artifact(
        markdown=md,
        key="dask-cluster-options",
        description=f"Options associated to the running dask cluster {dask_cluster_label}.",
    )

    # Single quotes inside the f-string keep this valid before Python 3.12 (PEP 701).
    logger.info(
        "📈 You can monitor the execution from dask dashboard: "
        f"{os.environ['DASK_GATEWAY_PUBLIC']}/clusters/{matched_cluster.name}/status",
    )

    return is_running