Updated on 2021/07/23
Deep Graph Library (DGL) is a Python package built for easy implementation of graph neural network model family, on top of existing DL frameworks (currently supporting PyTorch, MXNet and TensorFlow). It offers a versatile control of message passing, speed optimization via auto-batching and highly tuned sparse matrix kernels, and multi-GPU/CPU training to scale to graphs of hundreds of millions of nodes and edges. —— DGL team @ dgl.ai
In recent years, a growing body of deep learning research has focused on graph-structured data, and DGL has become one of the most widely used GNN frameworks for this work. Researchers leverage DGL to tackle classic graph-related problems in domains such as social networks, chemical molecules, and recommender systems. The DGL ecosystem also includes many high-level packages developed for domain applications, such as DGL-KE, DGL-LifeSci, and DGL-RecSys. At Qihoo 360, several ML/DL teams want to experiment with the DGL and DGL-KE frameworks, so we kicked off a project on distributed DGL training.
Native DGL distributed training is based on data parallelism, and we (the 360 AI Infra team) have applied this fully distributed approach to Kubernetes following the operator/controller philosophy. This proposal aims to define how the DGL Operator should behave and to add it to Kubeflow.
- Partition: A phase in the DGL workflow that partitions a large graph into sub-graphs. DGL has two partitioning approaches: a single-thread one (the DGL built-in API) and a multi-process one (ParMETIS, not part of DGL). In the K8s & Operator context, the DGL Operator applies these to support 3 partition modes:
  - Single-Pod, single-thread partitioning, referred to as SSP or `DGL-API` below
  - Single-Pod, multi-process partitioning (the native ParMETIS version), referred to as SMP or `ParMETIS` below
  - Multi-Pod, single-thread partitioning (may need some adaptations in ParMETIS), referred to as MSP or `DistParMETIS` below
- Working Pod definitions:
  - Launcher: A very lightweight Pod that traces the whole DGL workflow at the Pod level.
  - Partitioner: A Pod that performs the graph partitioning.
  - Worker: One of the Pods that run the DGL training.
- `dglrun`: A DGL workflow script that controls graph partitioning, distributed training, and some utilities at the container level.
- `watcher-loop`: A variant of `kubectl-delivery` that can track not only the readiness of all workloads but also their completion. Moreover, the `kubectl` part is removed.
- `kubectl-download`: A tiny image that only does the `kubectl` part of `kubectl-delivery`, i.e., downloading the `kubectl` binary.
- `ipconfig`: A text file listing each worker Pod's name, IP, port (used by the DGL server in the training phase), and available slots (GPUs).
- `kubexec.sh`: A helper shell script that invokes `kubectl exec` for remote execution (a sketch follows this list).
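For illustration, a minimal sketch of what `kubexec.sh` could look like, assuming it simply forwards a Pod name and a command to `kubectl exec` (the actual script may differ):

```shell
#!/bin/sh
# Hypothetical sketch of kubexec.sh: run a command in the target Pod's
# main container via `kubectl exec`, using the kubectl binary that
# kubectl-download placed in the shared emptyDir volume.
POD="$1"; shift
exec /opt/kube/kubectl exec "$POD" -- /bin/sh -c "$*"
```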
A Kubeflow user should be able to run training using DGL as easily as they can using TensorFlow or PyTorch. This proposal is centered around a Kubernetes operator for DGL. A user should be able to run a single-worker job with DGL (1 launcher, 1 partitioner, and 1 worker), distributed training jobs with DGL in the `DGL-API` partitioning mode (1 launcher, 1 partitioner, and multiple workers), distributed training jobs with DGL in the `ParMETIS` partitioning mode (1 launcher, 1 partitioner, and multiple workers), and distributed training jobs with DGL in the `DistParMETIS` partitioning mode (1 launcher and multiple workers).
This proposal defines the following:
- A DGL operator
- A way to deploy the DGL Operator
- A single-worker DGL example
- A distributed DGL example with the `DGL-API` partitioning mode
- A distributed DGL example with the `ParMETIS` partitioning mode
- A distributed DGL example with the `DistParMETIS` partitioning mode
Currently, for the scope of this proposal, we won't be addressing the method for serving the model.
- The user's container image is required to include DGL, related libraries, and our workflow scripts (`dglrun` and utils).
- SSH is not needed (or used).
An example of the CRD YAML is as follows:
apiVersion: qihoo.net/v1alpha1
kind: DGLJob
metadata:
name: dgl-graphsage
namespace: dgl-operator
spec:
cleanPodPolicy: Running
partitionMode: DGL-API
dglReplicaSpecs:
Launcher:
replicas: 1
template:
spec:
containers:
- image: dgloperator/graphsage:v0.1.0
name: dgl-graphsage
command:
- dglrun
args:
- --graph-name
- graphsage
# partition arguments
- --partition-entry-point
- code/load_and_partition_graph.py
- --num-partitions
- "2"
# training arguments
- --train-entry-point
- code/train_dist.py
- --num-epochs
- "1"
- --batch-size
- "1000"
Worker:
replicas: 2
template:
spec:
containers:
- image: dgloperator/graphsage:v0.1.0
name: dgl-graphsage
- The `cleanPodPolicy` can be optionally configured as `Running`/`None`/`All`, indicating whether to delete the Pods when the job is terminated.
- The contents of `Launcher` and `Worker` follow the `PodTemplateSpec`. Users are free to add more native key-values according to the spec.
- The `partitionMode` can be configured as `DGL-API`/`ParMETIS`/`DistParMETIS`, indicating how to partition the graph. `DGL-API` is the default because of its better precision. `DGL-API` uses the DGL API `dgl.distributed.partition_graph`; `ParMETIS` uses DGL's recommended work-around solution, ParMETIS; `DistParMETIS` uses a multi-Pod version of `ParMETIS`, which is still in research.
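Once the operator is deployed, submitting and inspecting such a job would look like the following (the file name is illustrative; the Pod name and namespace come from the manifest above):

```shell
# Submit the DGLJob defined above
kubectl apply -f dgl-graphsage.yaml

# Watch the Pods the operator creates for this job
kubectl get pods -n dgl-operator -w

# Follow the launcher's logs to track partitioning and training progress
kubectl logs -f dgl-graphsage-launcher -n dgl-operator
```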
The resulting Launcher Pod closely resembles its counterpart in the MPI Operator: `kubectl-download` downloads the `kubectl` binary, and `watcher-loop` makes sure all the working Pods are ready or finished (a sketch of such a loop follows the manifest below).
kind: Pod
apiVersion: v1
metadata:
name: dgl-graphsage-launcher
spec:
volumes:
- name: kube-volume
emptyDir: {}
- name: dataset-volume
emptyDir: {}
- name: config-volume
configMap:
name: dgl-graphsage-config
items:
- key: kubexec.sh
path: kubexec.sh
mode: 365
- key: hostfile
path: hostfile
mode: 292
- key: partfile
path: partfile
mode: 292
initContainers:
- name: kubectl-download
image: 'dgloperator/kubectl-download:v0.1.0'
volumeMounts:
- name: kube-volume
mountPath: /opt/kube
imagePullPolicy: Always
- name: watcher-loop-partitioner
image: 'dgloperator/watcher-loop:v0.1.0'
env:
- name: WATCHERFILE
value: /etc/dgl/partfile
- name: WATCHERMODE
value: finished
volumeMounts:
- name: config-volume
mountPath: /etc/dgl
- name: dataset-volume
mountPath: /dgl_workspace/dataset
imagePullPolicy: Always
- name: watcher-loop-worker
image: 'dgloperator/watcher-loop:v0.1.0'
env:
- name: WATCHERFILE
value: /etc/dgl/hostfile
- name: WATCHERMODE
value: ready
volumeMounts:
- name: config-volume
mountPath: /etc/dgl
imagePullPolicy: Always
containers:
- name: dgl-graphsage
image: 'dgloperator/graphsage:v0.1.0'
command:
- dglrun
args:
- '--graph-name'
- graphsage
- '--partition-entry-point'
- code/load_and_partition_graph.py
- '--num-partitions'
- '2'
- '--balance-train'
- '--balance-edges'
- '--train-entry-point'
- code/train_dist.py
- '--num-epochs'
- '1'
- '--batch-size'
- '1000'
- '--num-trainers'
- '1'
- '--num-samplers'
- '4'
- '--num-servers'
- '1'
volumeMounts:
- name: kube-volume
mountPath: /opt/kube
- name: config-volume
mountPath: /etc/dgl
- name: dataset-volume
mountPath: /dgl_workspace/dataset
imagePullPolicy: Always
restartPolicy: Never
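The internals of `watcher-loop` are not shown here. A minimal sketch of its polling loop, assuming the operator keeps the watched file (`WATCHERFILE`) in the mounted `ConfigMap` up to date so the init container only has to wait for the expected state, might look like this (the file contents and completion criteria are assumptions, not the actual implementation):

```shell
#!/bin/sh
# Hypothetical sketch of the watcher-loop init container.
# WATCHERFILE points at a file maintained by the operator (e.g. hostfile
# or partfile); WATCHERMODE is "ready" or "finished".
while true; do
  if [ -s "$WATCHERFILE" ]; then
    # Assume the operator writes one line per Pod once that Pod reaches
    # the desired state, so a fully populated file means we can proceed.
    echo "all workloads are $WATCHERMODE"
    break
  fi
  sleep 5
done
```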
The resulting Partitioner Pod is auto-generated by the DGL Operator; users cannot modify it.
kind: Pod
apiVersion: v1
metadata:
name: dgl-graphsage-partitioner
spec:
volumes:
- name: config-volume
configMap:
name: dgl-graphsage-config
items:
- key: kubexec.sh
path: kubexec.sh
mode: 365
- key: hostfile
path: hostfile
mode: 292
- key: partfile
path: partfile
mode: 292
- key: leadfile
path: leadfile
mode: 292
- name: kube-volume
emptyDir: {}
initContainers:
- name: kubectl-download
image: 'dgloperator/kubectl-download:v0.1.0'
volumeMounts:
- name: kube-volume
mountPath: /opt/kube
imagePullPolicy: Always
containers:
- name: dgl-graphsage
image: 'dgloperator/graphsage:v0.1.0'
env:
- name: DGL_OPERATOR_PHASE_ENV
value: Partitioner
volumeMounts:
- name: config-volume
mountPath: /etc/dgl
- name: kube-volume
mountPath: /opt/kube
imagePullPolicy: Always
restartPolicy: Never
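The `DGL_OPERATOR_PHASE_ENV` variable in the manifest above tells `dglrun` which role this container plays. A rough sketch of how such a branch could look (everything except `DGL_OPERATOR_PHASE_ENV` is a hypothetical name, not the actual implementation):

```shell
# Hypothetical phase dispatch inside dglrun
case "${DGL_OPERATOR_PHASE_ENV:-Launcher}" in
  Partitioner)
    # Run the user's partitioning entry point, then hand the
    # sub-graphs back to the launcher (see the workflow below).
    python3 "$PARTITION_ENTRY_POINT" --num-partitions "$NUM_PARTITIONS"
    ;;
  Launcher)
    # Dispatch sub-graphs to the workers and kick off training.
    echo "running launcher phase"
    ;;
esac
```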
kind: Pod
apiVersion: v1
metadata:
name: dgl-graphsage-worker-0
spec:
volumes:
- name: shm-volume
emptyDir:
medium: Memory
        sizeLimit: 10G # sizeLimit is automatically set to half of the container's memory limit
- name: config-volume
configMap:
name: dgl-graphsage-config
items:
- key: kubexec.sh
path: kubexec.sh
mode: 365
- key: hostfile
path: hostfile
mode: 292
- key: partfile
path: partfile
mode: 292
- key: leadfile
path: leadfile
mode: 292
containers:
- name: dgl-graphsage
image: 'dgloperator/graphsage:v0.1.0'
command:
- sleep
args:
- 365d
ports:
- name: dglserver
containerPort: 30050
protocol: TCP
volumeMounts:
- name: shm-volume
mountPath: /dev/shm
- name: config-volume
mountPath: /etc/dgl
imagePullPolicy: Always
restartPolicy: Never
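Because the Worker Pods just sleep, the launcher drives them remotely through `kubexec.sh`. For illustration, verifying remote execution against a worker could look like this (the Pod name comes from the manifest above):

```shell
# From the launcher: run a quick command inside a sleeping worker Pod
/etc/dgl/kubexec.sh dgl-graphsage-worker-0 hostname
```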
This part is mainly about the operator-side workflow at the Pod level.

For the `DGL-API` or `ParMETIS` partitioning modes, the workflow is:
- Create a `ConfigMap` that contains `kubexec.sh` and `ipconfig`.
- Create the RBAC resources (`Role`, `ServiceAccount`, `RoleBinding`) to allow remote execution (`pods/exec`); roughly equivalent `kubectl` commands are sketched after this list.
- Create `kubectl-download` (an `initContainer` of the Launcher and Partitioner Pods). It downloads the `kubectl` binary and moves it to an `emptyDir` volume.
- Create the Partitioner Pod to do the `DGL-API` or `ParMETIS` partitioning.
- Create `watcher-loop-partitioner` (an `initContainer` of the Launcher Pod). It waits until the Partitioner Pod has finished, then continues.
- Create Worker Pods up to the desired replica count. Their command is set to sleep forever.
- Create `watcher-loop-worker` (an `initContainer` of the Launcher Pod). It waits until all Worker Pods are ready.
- Kick off the main container of the Launcher Pod, running the `dglrun` command and arguments.
- When the `DGLJob` finishes, all Worker Pods are cleaned up.
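For reference, the RBAC resources the operator creates are roughly equivalent to the following `kubectl` commands (the resource names are illustrative):

```shell
# Allow the launcher to inspect Pods and exec into them remotely
kubectl create serviceaccount dgl-launcher -n dgl-operator
kubectl create role dgl-launcher --verb=get,list,create \
  --resource=pods,pods/exec -n dgl-operator
kubectl create rolebinding dgl-launcher --role=dgl-launcher \
  --serviceaccount=dgl-operator:dgl-launcher -n dgl-operator
```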
For the `DistParMETIS` partitioning mode, the workflow is:
- Create a `ConfigMap` that contains `kubexec.sh` and `ipconfig`.
- Create the RBAC resources (`Role`, `ServiceAccount`, `RoleBinding`) to allow remote execution (`pods/exec`).
- Create `kubectl-download` (an `initContainer` of the Launcher Pod). It downloads the `kubectl` binary and moves it to an `emptyDir` volume.
- Create Worker Pods up to the desired replica count. Their command is set to sleep forever.
- Create `watcher-loop-worker` (an `initContainer` of the Launcher Pod). It waits until all Worker Pods are ready.
- Kick off the main container of the Launcher Pod, running the `dglrun` command and arguments. `DistParMETIS` partitioning will be executed within the `dglrun` scope.
- When the `DGLJob` finishes, all Worker Pods are cleaned up.
This part is mainly about the training-side workflow (`dglrun`), which is executed in the main container of the Launcher/Partitioner Pod.

Default communication backend: gloo.

How does DGL implement distributed training?
- Data parallelism: each partitioned sub-graph acts as a shard of the divided data.
- The allreduce way: dense parameters are updated using PyTorch's `DDP`.
How do we leverage DGL to run `DGL-API` or `ParMETIS` partitioning and distributed training on Kubernetes?

Prerequisite: the raw dataset must be included in the Partitioner Pod image, e.g., an OGB dataset.
- In the main container of the Partitioner Pod, `dglrun` runs the user-defined partitioning code or the `ParMETIS` command, and saves the partitioned sub-graphs. `dglrun` then invokes a `kubectl cp` command to copy all sub-graphs from the Partitioner Pod to the Launcher Pod.
- The Partitioner Pod then completes.
- In the main container of the Launcher Pod, `dglrun` invokes a `kubectl cp` command to copy each sub-graph from the Launcher Pod to its specific Worker Pod.
- After the graph dispatching finishes, `dglrun` invokes distributed training on every Worker Pod (see the sketch after this list). Guide here.
- Finally, the Launcher Pod completes and the Worker Pods are deleted.
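Put together, the dispatching and training kickoff described above could look roughly like the following, run from the Launcher Pod (Pod names, paths, and the loop bound are illustrative; the actual `dglrun` logic may differ):

```shell
# Copy each sub-graph to its worker, then start training on all workers.
for i in 0 1; do
  /opt/kube/kubectl cp -n dgl-operator \
    /dgl_workspace/dataset/part$i \
    dgl-graphsage-worker-$i:/dgl_workspace/dataset/part$i
  /etc/dgl/kubexec.sh dgl-graphsage-worker-$i \
    "python3 code/train_dist.py --num-epochs 1 --batch-size 1000" &
done
wait  # block until training on every worker has finished
```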
How do we leverage DGL to run `DistParMETIS` partitioning and training on Kubernetes?

Prerequisite: the dataset must be included in the Worker Pod image, e.g., an OGB dataset.

- In the main container of the Launcher Pod, `dglrun` invokes a partition command to do `DistParMETIS` partitioning on every Worker Pod.
- After the `DistParMETIS` partitioning finishes, `dglrun` invokes distributed training on every Worker Pod. Guide here.
- Finally, the Launcher Pod completes and the Worker Pods are deleted.
Currently, for distributed partitioning, DGL uses `DistParMETIS`, which is powered by MPI, to split big graphs up and store them in parallel on the Worker Pods. For distributed training, DGL uses PyTorch's `DDP` for dense parameter updates, and self-developed methods in other scenarios.
So in this manner, a fully informed user, who understands the details of both DGL and the operator philosophy, can hack together DGL workloads using the MPI Operator. But this hack is not a good idea, because:
- Users need to revise the `ipconfig` processing in the DGL library to adapt to the MPI Operator's `hostfile`.
- Users may need to add intrusive code to the partition and train scripts.
However, to reuse the MPI Operator as much as possible, the MPI Operator would need to:
- Improve the customization of the generated hostfile format. The DGL Operator's `ipconfig` format is `[ip] [dgl server port] [worker num] [host/pod name] [gpu num]` (see the example after this list).
- Improve the customization of the path of `config_volume`.
- Improve the customization of the `initContainers` to support `DGL-API` partitioning, without involving intrusive code or YAML.
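For illustration, an `ipconfig` in this format might look like the following (all values are made up):

```
10.244.1.10 30050 1 dgl-graphsage-worker-0 1
10.244.2.11 30050 1 dgl-graphsage-worker-1 1
```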
In my opinion, these customizations would not only involve a lot of work, but would also dilute the focus of the MPI Operator. We should weigh this trade-off.
We would appreciate any feedback on this topic. In general, the DGL Operator will follow the community and provide a native DGL distributed training experience: users can move their workloads from physical machines (or VMs) to Kubernetes without changing any code.