Dask Gateway
Prerequisite
Generate a token at https://processing.{{ platform_domain_name }}/jupyter/hub/token and make sure the same token is also set in the rs-server-staging configuration.
Connect to the Gateway
# Set up the environment and connect to the dask-gateway.
import os

from dask_gateway import Gateway

# The token has to be in the environment before the Gateway is created:
# per the dask-gateway docs, the "jupyterhub" auth method falls back to
# JUPYTERHUB_API_TOKEN when no token is passed explicitly.
os.environ["JUPYTERHUB_API_TOKEN"] = "<TOKEN_GENERATED_FROM_PREVIOUS_STEP>"

gateway = Gateway(
    address="http://traefik-dask-gateway.dask-gateway.svc.cluster.local",
    auth="jupyterhub",
)
Get the list of clusters
# Ask the gateway for the clusters it currently reports; left as a bare
# expression so the notebook displays the returned value.
gateway.list_clusters()
Start cluster
Create a single cluster without options (Option 1)
# Request a cluster without passing any option to the gateway.
cluster = gateway.new_cluster()

# Show the cluster name so it can be identified in later cells.
print(cluster.name)
Create a basic cluster with options (Option 2)
# Print each option the gateway exposes next to the value it reports.
options = gateway.cluster_options()
for key in options:
    print(f"{key}: {options[key]}")

# Request a cluster running the EOPF image with explicit worker sizing.
cluster = gateway.new_cluster(
    worker_cores=1,
    worker_memory=2.0,
    namespace="dask-gateway",
    image="ghcr.io/rs-python/rs-infra-core-dask-eopf:latest",
)
print(cluster.name)

# Ask the gateway to scale the new cluster to 3 workers.
gateway.scale_cluster(cluster.name, 3)
Create a staging cluster with options (Option 3)
# Print each option the gateway exposes next to the value it reports.
options = gateway.cluster_options()
for key in options:
    print(f"{key}: {options[key]}")

# Request a cluster running the staging image; the cluster name is passed
# both as an option and as a label on the scheduler pod.
cluster = gateway.new_cluster(
    worker_cores=1,
    worker_memory=2.0,
    namespace="dask-gateway",
    image="ghcr.io/rs-python/rs-infra-core-dask-staging:latest",
    cluster_name="dask-staging",
    scheduler_extra_pod_labels={"cluster_name": "dask-staging"},
)
print(cluster.name)

# Ask the gateway to scale the new cluster to 3 workers.
gateway.scale_cluster(cluster.name, 3)
Shutdown all the dask clusters
# Shut down every cluster the gateway lists. Each cluster is handled in its
# own try block so that a failure on one is reported and the loop carries on
# with the remaining clusters.
clusters = gateway.list_clusters()

for cluster_info in clusters:
    try:
        # Connect to the running cluster by name, then stop it.
        cluster = gateway.connect(cluster_info.name)
        cluster.shutdown()
        print(f"Cluster {cluster_info.name} successfully stopped.")
    except Exception as e:
        # Best-effort cleanup: report the error instead of aborting the loop.
        print(f"Error stopping cluster {cluster_info.name}: {e}")