Dask Gateway
Prerequisite
Generate a token from https://processing.{{ platform_domain_name }}/jupyter/hub/token and make sure it is also set in the rs-server-staging configuration.
Connect to the Gateway
# Set up the environment and connect to the dask-gateway
import os
os.environ["JUPYTERHUB_API_TOKEN"] = "<TOKEN_GENERATED_FROM_PREVIOUS_STEP>"
from dask_gateway import Gateway
gateway = Gateway(
    address="http://traefik-dask-gateway.dask-gateway.svc.cluster.local",
    auth="jupyterhub"
)
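If you prefer not to hard-code the token in the notebook, a minimal sketch that prompts for it interactively instead (getpass is part of the Python standard library; the prompt text is illustrative):
# Prompt for the JupyterHub token instead of hard-coding it
import getpass
import os
os.environ["JUPYTERHUB_API_TOKEN"] = getpass.getpass("JupyterHub API token: ")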
Get the list of clusters
gateway.list_clusters()
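list_clusters() returns one report per running cluster. A short sketch to inspect them (assuming the reports expose name and status attributes, as in the dask-gateway client API):
# Print the name and status of each running cluster
for report in gateway.list_clusters():
    print(report.name, report.status)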
Start a cluster
Create a single Dask cluster without options (Option 1)
cluster = gateway.new_cluster()
print(cluster.name)
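Once the cluster is created, you can attach a Dask client to it and run a quick sanity check. A minimal sketch (the submitted function is just an illustration):
# Attach a client to the new cluster and run a trivial task to check it works
cluster.scale(1)                     # request one worker
client = cluster.get_client()        # returns a dask.distributed Client
print(client.submit(lambda x: x + 1, 41).result())  # expected output: 42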
Create a basic Dask cluster with options (Option 2)
# List of options available
options = gateway.cluster_options()
for key in options.keys():
    print(f"{key}: {options[key]}")
# NOTE: use one of the following images:
# - ghcr.io/rs-python/rs-infra-core-dask-staging
# - ghcr.io/rs-python/rs-infra-core-dask-l0
# - ghcr.io/rs-python/rs-infra-core-dask-s1ard
# - ...
cluster = gateway.new_cluster(
    worker_cores=1,
    worker_memory=2.0,
    namespace='dask-gateway',
    image='ghcr.io/rs-python/dask/l0:latest'
)
print(cluster.name)
gateway.scale_cluster(cluster.name, 3)
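After scaling, the workers may take a moment to be scheduled. A minimal sketch that blocks until they have joined (assuming three workers were requested, as above):
# Attach a client and wait until the requested workers are up
client = cluster.get_client()
client.wait_for_workers(n_workers=3)
print(client.scheduler_info()["workers"].keys())  # one entry per worker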
Create a Dask cluster for staging with options (Option 3)
For staging, we must configure the cluster_name field and the node affinity for the Dask scheduler and workers. We can also tune the CPU/RAM usage and limits:
# List of options available
options = gateway.cluster_options()
for key in options.keys():
    print(f"{key}: {options[key]}")
cluster = gateway.new_cluster(
    namespace='dask-gateway',
    image='ghcr.io/rs-python/dask/staging:latest',
    cluster_name='dask-staging',
    scheduler_extra_pod_labels={'cluster_name': 'dask-staging'},
    worker_cores=1,
    worker_memory=2.0,  # In GB
    cluster_max_workers=3,
    cluster_max_cores=3,
    cluster_max_memory=9663676416,  # In bytes
    worker_extra_pod_config={
        "affinity": {
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    "key": "node-role.kubernetes.io/access_csc",
                                    "operator": "Exists"
                                }
                            ]
                        }
                    ]
                }
            }
        },
        "tolerations": [
            {
                "key": "role",
                "operator": "Equal",
                "value": "access_csc",
                "effect": "NoSchedule"
            }
        ]
    },
    scheduler_extra_pod_config={
        "affinity": {
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    "key": "node-role.kubernetes.io/access_csc",
                                    "operator": "Exists"
                                }
                            ]
                        }
                    ]
                }
            }
        },
        "tolerations": [
            {
                "key": "role",
                "operator": "Equal",
                "value": "access_csc",
                "effect": "NoSchedule"
            }
        ]
    }
)
print(cluster.name)
gateway.scale_cluster(cluster.name, 2)
The calculations for cluster_max_workers, cluster_max_cores and cluster_max_memory are:
cluster_max_workers: $scale\_value + 1$
cluster_max_cores: $(scale\_value + 1) * worker\_cores$
cluster_max_memory: $(scale\_value + 1) * worker\_memory * 2^{30}$
Where $scale\_value$ is the maximum desired number of workers, 2 in the previous example.
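To avoid computing these limits by hand, here is a small helper that applies the formulas above (a sketch; function and variable names are illustrative):
# Compute the cluster limits from the scale value and the per-worker resources
def cluster_limits(scale_value, worker_cores, worker_memory_gb):
    max_workers = scale_value + 1
    return {
        "cluster_max_workers": max_workers,
        "cluster_max_cores": max_workers * worker_cores,
        "cluster_max_memory": int(max_workers * worker_memory_gb * 2**30),  # in bytes
    }
print(cluster_limits(scale_value=2, worker_cores=1, worker_memory_gb=2.0))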
Create a Dask cluster for DPR (Option 4)
For DPR processing, we must configure the cluster_name field and the node affinity for the Dask scheduler and workers. We must also tune the CPU/RAM usage and limits:
# List of options available
options = gateway.cluster_options()
for key in options.keys():
    print(f"{key}: {options[key]}")
# NOTE: use one of the following images and cluster names:
# - ghcr.io/rs-python/rs-infra-core-dask-l0     cluster_name: dask-l0
# - ghcr.io/rs-python/rs-infra-core-dask-s1ard  cluster_name: dask-s1ard
# - ...
cluster = gateway.new_cluster(
    namespace='dask-gateway',
    image='ghcr.io/rs-python/rs-infra-core-dask-l0:latest',
    cluster_name='dask-l0',
    scheduler_extra_pod_labels={'cluster_name': 'dask-l0'},
    worker_cores=1,
    worker_memory=2.0,  # In GB
    cluster_max_workers=3,
    cluster_max_cores=3,
    cluster_max_memory=9663676416,  # In bytes
    scheduler_memory_limit=60,  # In GB
    worker_extra_pod_config={
        "affinity": {
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    "key": "node-role.kubernetes.io/dask_worker_on_demand",
                                    "operator": "Exists"
                                }
                            ]
                        }
                    ]
                }
            }
        },
        "tolerations": [
            {
                "key": "role",
                "operator": "Equal",
                "value": "dask_worker_on_demand",
                "effect": "NoSchedule"
            }
        ]
    },
    scheduler_extra_pod_config={
        "affinity": {
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    "key": "node-role.kubernetes.io/dask_scheduler",
                                    "operator": "Exists"
                                }
                            ]
                        }
                    ]
                }
            }
        },
        "tolerations": [
            {
                "key": "role",
                "operator": "Equal",
                "value": "dask_scheduler",
                "effect": "NoSchedule"
            }
        ]
    }
)
print(cluster.name)
gateway.scale_cluster(cluster.name, 2)
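The cluster keeps running after the notebook session ends, so you can reattach to it later by name. A minimal sketch (the cluster name placeholder is hypothetical; use the value printed above):
# Reattach to a running cluster by name and print its dashboard URL
cluster = gateway.connect("<CLUSTER_NAME_PRINTED_ABOVE>")  # replace with the real cluster name
print(cluster.dashboard_link)   # Dask dashboard URL for monitoring
client = cluster.get_client()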
The calculations for cluster_max_workers, cluster_max_cores and cluster_max_memory are:
cluster_max_workers: $scale\_value + 1$
cluster_max_cores: $(scale\_value + 1) * worker\_cores$
cluster_max_memory: $(scale\_value + 1) * worker\_memory * 2^{30}$
Where $scale\_value$ is the maximum desired number of workers, 2 in the previous example.
Shut down all the Dask clusters
clusters = gateway.list_clusters()
# Shutting down all clusters
for cluster_info in clusters:
    try:
        cluster = gateway.connect(cluster_info.name)
        cluster.shutdown()
        print(f"Cluster {cluster_info.name} successfully stopped.")
    except Exception as e:
        print(f"Error stopping cluster {cluster_info.name}: {e}")