Helper task to interact with the Dask cluster.
is_dask_cluster_running(dask_cluster_label)
async
Retrieve dask cluster status.
Source code in docs/rs-client-libraries/rs_workflows/utils/dask.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@task(name="Check dask cluster status")
async def is_dask_cluster_running(dask_cluster_label: str) -> bool:
    """
    Retrieve dask cluster status.

    Connects to the Dask gateway (configured via the DASK_GATEWAY_ADDRESS and
    JUPYTERHUB_API_TOKEN environment variables), looks for a deployed cluster
    whose "cluster_name" option equals ``dask_cluster_label``, and checks
    whether that cluster is RUNNING. When a matching cluster is found, its
    options are saved as a Prefect markdown artifact and a dashboard link is
    logged.

    Args:
        dask_cluster_label: Value of the "cluster_name" cluster option that
            identifies the cluster to check.

    Returns:
        True if a matching cluster exists and its status is RUNNING,
        False otherwise (not found, or found in another state).

    Raises:
        KeyError: if a required environment variable is missing.
    """
    result = False
    logger = get_run_logger()

    # Connect to the dask gateway
    gateway = Gateway(
        address=os.environ["DASK_GATEWAY_ADDRESS"],
        auth=JupyterHubAuth(api_token=os.environ["JUPYTERHUB_API_TOKEN"]),
    )
    try:
        # Find the cluster matching the label
        clusters = gateway.list_clusters()
        matching_cluster = None
        for cluster in clusters:
            if cluster.options.get("cluster_name") == dask_cluster_label:
                matching_cluster = cluster
                break  # fix: stop at the first match instead of scanning the rest
        cluster_names = [c.options.get("cluster_name", "<unknown>") for c in clusters]

        # Check status
        if matching_cluster is None:
            logger.error(f"❌ '{dask_cluster_label}' is not part of deployed dask clusters {cluster_names}.")
        else:
            logger.info(f"✔️ '{dask_cluster_label}' is part of deployed dask clusters {cluster_names}.")
            # Numeric status codes reported by the gateway; 2 means RUNNING.
            status_map = {0: "UNKNOWN", 1: "PENDING", 2: "RUNNING", 3: "STOPPING", 4: "STOPPED", 5: "FAILED"}
            if matching_cluster.status == 2:
                result = True
            else:
                logger.warning(f"⚠️ Cluster status = {matching_cluster.status} ({status_map.get(matching_cluster.status)})")

            # Save the cluster options as a Prefect artifact for traceability.
            md = (
                f"# Dask cluster option for {dask_cluster_label}\n\n```json\n"
                + json.dumps(matching_cluster.options, indent=2)
                + "\n```"
            )
            await acreate_markdown_artifact(
                markdown=md,
                key="dask-cluster-options",
                description=f"Options associated to the running dask cluster {dask_cluster_label}.",
            )
            logger.info(
                "📈 You can monitor the execution from dask dashboard: "
                # fix: use single quotes for the env-var key — nested double
                # quotes inside a double-quoted f-string are a SyntaxError
                # before Python 3.12 (PEP 701).
                f"{os.environ['DASK_GATEWAY_PUBLIC']}/clusters/{matching_cluster.name}/status",
            )
    finally:
        # fix: always release the gateway connection, even on error.
        gateway.close()
    return result
|