Ergonomics and efficiency of workflows on
HPC clusters
Ergonomie a efektivita workflow na HPC klastrech
Ing. Jakub Beránek
PhD Thesis
Supervisor: Ing. Jan Martinovič, Ph.D.
Ostrava, 2024
Abstrakt a přínos práce
Tato práce se zabývá spouštěním grafů úloh na vysoce výkonných systémech (superpočítačích), se
zaměřením na efektivní využití výpočetních zdrojů a poskytnutí ergonomických rozhraní pro návrh
a spouštění grafů úloh. Programování na základě úloh je oblíbeným způsobem pro definici vědeckých
soustav výpočtů, které jsou určeny pro spouštění na distribuovaných systémech. Nicméně spouštění
těchto úloh na superpočítačích přináší unikátní výzvy, například problémy s výkonem způsobené
značným rozsahem úloh nebo problematickou interakci úloh se systémy pro správu alokací na
superpočítačích, jako jsou například PBS (Portable Batch System) nebo Slurm. Tato práce zkoumá,
jaké jsou hlavní problémy ovlivňující spouštění úloh v této oblasti a navrhuje různé přístupy, které
by měly pomoci tyto problémy částečně či zcela vyřešit, a to jak v oblasti výkonu, tak i ergonomie
vývoje.
Tato práce poskytuje tři hlavní přínosy. Prvním z nich je prostředí pro simulaci spouštění grafů
úloh, které umožňuje jednoduché experimentování a měření různých plánovacích algoritmů. Toto
prostředí bylo použito pro provedení rozsáhlé studie kvality různých plánovačů úloh. Dále práce
analyzuje výkonnostní charakteristiku moderního nástroje pro spouštění úloh Dask, a poskytuje
alternativní implementaci Dask serveru, která výrazně zvyšuje jeho efektivitu v případech, které
vyžadují vysoký výkon. Hlavním přínosem práce je metoda pro plánování úloh a správu zdrojů,
která umožňuje jednoduché spouštění grafů úloh na heterogenních superpočítačích a která zároveň
maximalizuje využití dostupných výpočetních zdrojů. Práce také poskytuje referenční implementaci
této metody v rámci nástroje HyperQueue, který je dostupný jako software s
otevřeným zdrojovým kódem pod licencí MIT (Massachusetts Institute of Technology) na adrese
https://github.com/it4innovations/hyperqueue.
Klíčová slova
distribuované výpočty, výpočetní grafy, heterogenní zdroje, vysoce výkonné počítání
Abstract and Contributions
This thesis deals with the execution of task graphs on High-performance Computing (HPC) clusters
(supercomputers), with a focus on efficient usage of hardware resources and ergonomic interfaces
for task graph submission. Task-based programming is a popular approach for defining scientific
workflows that can be computed on distributed clusters. However, executing task graphs on su-
percomputers introduces unique challenges, such as performance issues caused by the large scale
of HPC workflows or cumbersome interactions with HPC allocation managers like PBS (Portable
Batch System) or Slurm. This work examines the main challenges in this area and how they affect
task graph execution, and it proposes various approaches for alleviating these challenges, both in
terms of efficiency and developer ergonomics.
This thesis provides three main contributions. Firstly, it provides a task graph simulation en-
vironment that enables prototyping and benchmarking of various task scheduling algorithms, and
performs a comprehensive study of the performance of various task schedulers using this envi-
ronment. Secondly, it analyzes the bottlenecks and overall performance of a state-of-the-art task
runtime Dask and provides an implementation of an alternative Dask server which significantly
improves its performance in HPC use-cases. And primarily, it introduces a unified meta-scheduling
and resource management design for effortless execution of task graphs on heterogeneous HPC
clusters that facilitates efficient usage of hardware resources. It also provides a reference im-
plementation of this design within an HPC-tailored task runtime called HyperQueue, which is
available as open-source software under the MIT (Massachusetts Institute of Technology) license
at https://github.com/it4innovations/hyperqueue.
Keywords
distributed computing, task graphs, heterogeneous resources, high-performance computing
Acknowledgment
I would like to thank my supervisor, Jan Martinovič, for his advice. I would also like to thank Ada
Böhm, Vojtěch Cima and Martin Šurkovský, who have co-authored several publications with me
and supported me during my PhD studies. I am especially grateful to Ada Böhm for her mentorship
and constant readiness to provide both research and technical guidance. I would like to express
my gratitude to all the wonderful people that I met during my internship at the SPCL lab at ETH
Zurich. Furthermore, my thanks also go to Vanessa DeRhen for proofreading this thesis.
The development of Estee was supported by several projects. It has received funding from the
European High-Performance Computing Joint Undertaking (JU) under grant agreement No 955648.
This work was supported by the Ministry of Education, Youth and Sports of the Czech Republic
through the e-INFRA CZ (ID: 90140) and ACROSS (ID: MC2104) projects.
The development of HyperQueue was supported by several projects. It has received funding
from the European High-Performance Computing Joint Undertaking (JU) under grant agreement
No 956137. This work was supported by the Ministry of Education, Youth and Sports of the Czech
Republic through the e-INFRA CZ (ID: 90254) and LIGATE (ID: MC2102) projects.
Last but not least, I thank my wife Jana for her support and endless patience.
Contents
1 Introduction 12
2 Parallel and distributed computing 16
2.1 Parallel programming models . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 16
2.2 Task-based programming models . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 20
3 Task-based programming 27
3.1 Task graphs . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 27
3.2 Task execution . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 31
3.3 Task scheduling . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 34
4 State of the Art 37
4.1 Allocation manager . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 39
4.2 Cluster heterogeneity . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 46
4.3 Performance and scalability . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 47
4.4 Fault tolerance . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 50
4.5 Multi-node tasks . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 51
4.6 Deployment . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 52
4.7 Programming model . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 52
5 Task scheduling analysis 55
5.1 Task graph simulator . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 56
5.2 Task scheduler evaluation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 64
6 Task runtime optimization 75
6.1 Dask task runtime . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 76
6.2 Dask runtime overhead analysis . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 80
6.3 RSDS task runtime . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 86
6.4 Performance comparison of Dask and RSDS . . . . . . . . . . . . . . . . . . . . . . 90
7 Task graph meta-scheduling 99
7.1 Meta-scheduling design . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 100
7.2 Heterogeneous resource management . . . . . . . . . . . . . . . . . . . . . . . . . . . 105
7.3 HyperQueue . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 111
7.4 Use-cases . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 129
7.5 Evaluation . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 133
7.6 Comparison with other task runtimes . . . . . . . . . . . . . . . . . . . . . . . . . . . 155
8 Conclusion 160
8.1 Impact . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 163
List of own publication activities 164
Bibliography 167
A Benchmark configurations 178
List of Abbreviations
List of Figures
3.1 Simple task graph with six tasks and six data objects . . . . . . . . . . . . . . . . . . 29
3.2 Task graph executed with two different schedules . . . . . . . . . . . . . . . . . . . . 35
5.1 Estee architecture . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 58
5.2 Performance of the random scheduler . . . . . . . . . . . . . . . . . . . . . . . . . . . 66
5.3 Comparison of worker selection strategy . . . . . . . . . . . . . . . . . . . . . . . . . 68
5.4 Comparison of max-min and simple network models (irw dataset) . . . . . . . . . . 69
5.5 Comparison of MSD; cluster 32x4 . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 70
5.6 Comparison of information modes (irw dataset) . . . . . . . . . . . . . . . . . . . . . 71
5.7 Scheduler performance relative to blevel in Dask and Estee . . . . . . . . . . . . . 73
6.1 Speedup of Dask/random scheduler; Dask/ws is baseline. . . . . . . . . . . . . . . . 82
6.2 Overhead per task in Dask with an increasing number of tasks. . . . . . . . . . . . . 84
6.3 Overhead per task in Dask with an increasing number of workers. . . . . . . . . . . 84
6.4 Strong scaling of Dask with different task durations (1000 ms, 100 ms and 10 ms). . 85
6.5 Effect of GIL on the performance of the pandas_groupby benchmark . . . . . . . . . 85
6.6 Architecture of RSDS . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 87
6.7 Dask message encoding . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 88
6.8 Speedup of RSDS/ws scheduler; baseline is Dask/ws. . . . . . . . . . . . . . . . . . 91
6.9 Speedup of RSDS/random scheduler; baseline is Dask/ws. . . . . . . . . . . . . . . 92
6.10 Speedup of RSDS/random scheduler; baseline is RSDS/ws. . . . . . . . . . . . . . . 93
6.11 Strong scaling of RSDS vs Dask on selected task graphs . . . . . . . . . . . . . . . 94
6.12 Overhead per task for RSDS and Dask with an increasing number of tasks. . . . . . 95
6.13 Overhead per task for RSDS and Dask with an increasing number of workers. . . . 96
6.14 Overhead per task for various cluster sizes and benchmarks . . . . . . . . . . . . . . 96
6.15 Speedup of RSDS/ws over Dask/ws with the zero worker implementation . . . . . 97
7.1 Architecture of HyperQueue. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 112
7.2 Architecture of the HyperQueue server. . . . . . . . . . . . . . . . . . . . . . . . . 114
7.3 State diagram of HyperQueue tasks . . . . . . . . . . . . . . . . . . . . . . . . . . 119
7.4 Total overhead of HyperQueue (ratio vs theoretically ideal makespan) . . . . . . . 134
7.5 Total overhead of HyperQueue (ratio vs manual process execution) . . . . . . . . . 135
7.6 Tasks processed per second with zero worker mode . . . . . . . . . . . . . . . . . . . 136
7.7 Strong scalability of HyperQueue with a fixed target makespan (300 s) . . . . . . . 138
7.8 Strong scalability of HyperQueue with a fixed task duration (1 s) . . . . . . . . . . 139
7.9 Scalability of HyperQueue vs Dask with a fixed target makespan (30 s) . . . . . . 140
7.10 Scalability of HyperQueue vs Dask with an empty task . . . . . . . . . . . . . . . 141
7.11 Effect of different group allocation strategies . . . . . . . . . . . . . . . . . . . . . . . 142
7.12 GPU hardware utilization improvements with fractional resource requirements . . . 143
7.13 Load-balancing effect of resource variants . . . . . . . . . . . . . . . . . . . . . . . . 145
7.14 CPU utilization of HyperQueue worker nodes . . . . . . . . . . . . . . . . . . . . . 146
7.15 Worker hardware utilization with the LiGen virtual screening workflow . . . . . . . . 148
7.16 Scalability of the LiGen virtual screening workflow . . . . . . . . . . . . . . . . . . . 148
7.17 Scaling of allocations using the automatic allocator . . . . . . . . . . . . . . . . . . . 149
7.18 HyperQueue server CPU time consumption with an increasing number of tasks . . 151
7.19 HyperQueue server CPU time consumption with an increasing number of workers . 152
7.20 Overhead of encryption in HyperQueue communication . . . . . . . . . . . . . . . 153
7.21 Effect of output streaming on the makespan of tasks . . . . . . . . . . . . . . . . . . 154
A.1 Task graph shapes in the elementary Estee benchmark data set . . . . . . . . . . . 178
List of Tables
6.1 Geometric mean of speedup over the Dask/ws baseline . . . . . . . . . . . . . . . . 92
6.2 Geometric mean of speedup for random schedulers . . . . . . . . . . . . . . . . . . . 93
7.1 Size of task output on disk with and without I/O streaming . . . . . . . . . . . . . . 155
7.2 Comparison of meta-scheduling task runtimes . . . . . . . . . . . . . . . . . . . . . . 156
A.1 Estee scheduler benchmark task graph properties . . . . . . . . . . . . . . . . . . . 179
A.2 Properties of Dask benchmark task graphs . . . . . . . . . . . . . . . . . . . . . . . 180
List of Source Code Listings
2.1 MPI program implemented in C . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 18
2.2 C program using a simple OpenMP annotation . . . . . . . . . . . . . . . . . . . . . 18
2.3 MapReduce word count implemented in Python . . . . . . . . . . . . . . . . . . . . . 23
2.4 Dask DataFrame query and its corresponding task graph . . . . . . . . . . . . . . . 23
2.5 Task-parallel Fibonacci calculation using Cilk . . . . . . . . . . . . . . . . . . . . . . 25
5.1 Simple task graph simulation example using Estee . . . . . . . . . . . . . . . . . . . 59
6.1 Example of a Python program that leverages the Dask Array API . . . . . . . . . . 76
6.2 Example of a Python program that leverages the Dask DataFrame API . . . . . . . 77
7.1 Examples of HyperQueue CLI commands . . . . . . . . . . . . . . . . . . . . . . . 116
7.2 Creating task arrays using the HyperQueue CLI . . . . . . . . . . . . . . . . . . . 118
7.3 Configuring worker resources using the HyperQueue CLI . . . . . . . . . . . . . . . 122
7.4 Configuring task resource requirements using the HyperQueue CLI . . . . . . . . . 124
7.5 Handling task failure using the HyperQueue CLI . . . . . . . . . . . . . . . . . . . 125
7.6 Configuring automatic allocation using the HyperQueue CLI . . . . . . . . . . . . 127
7.7 Hyperparameter search using HyperQueue . . . . . . . . . . . . . . . . . . . . . . . 132
List of Definitions
3.1 Task graph . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 28
3.2 Computational environment . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 32
3.2 Task graph execution . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 32
3.2 Dependency constraint . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 33
3.2 Worker and task resource constraint . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 33
3.3 Makespan . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 34
7.2 Computational environment with non-fungible resources . . . . . . . . . . . . . . . . . . 105
7.2 Task graph execution with non-fungible resources . . . . . . . . . . . . . . . . . . . . . . 106
7.2 Worker resource constraint with non-fungible resources . . . . . . . . . . . . . . . . . . 107
7.2 Task resource constraint with non-fungible resources . . . . . . . . . . . . . . . . . . . . 107
7.2 Task graph with fractional resource requirements . . . . . . . . . . . . . . . . . . . . . . 108
7.2 Task graph with resource variants . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 110
7.2 Task graph execution with resource variants . . . . . . . . . . . . . . . . . . . . . . . . . 110
7.2 Worker and task resource constraint with resource variants . . . . . . . . . . . . . . . . 111
Chapter 1
Introduction
HPC (High-performance Computing) infrastructures are crucial for the advancement of scientific
research, as they offer unparalleled computational power that can be leveraged to perform the most
complex scientific experiments. This massive performance is required (among other use-cases)
in various scientific domains, such as weather forecasting [5], computational fluid dynamics [6],
bioinformatics [7] or deep learning [8].
Over the last several decades, the performance of HPC clusters (supercomputers) has been
steadily increasing, effectively doubling every few years, in line with Moore’s Law and Dennard
scaling [9]. However, it also became more difficult for HPC users to tap into that performance
increase. Thirty years ago, it was possible to get essentially double the performance for free, just
by using a new (super)computer every two years, without having to modify existing programs.
This phenomenon had started to diminish by the end of the last century, as chip designers became
limited by the memory wall [10] and especially the power wall [11].
To keep up with the expectations of exponential performance increases, CPUs (Central Pro-
cessing Units) had to become more complex. Processor manufacturers started implementing vari-
ous buffers and caches, multiple cores, simultaneous multithreading, out-of-order execution and a
plethora of other techniques that would allow the CPU to run faster, without requiring massive
increases of power draw or memory bandwidth. The existence of multiple cores and sockets and
the need for ever-increasing memory sizes has also made the memory system more complex, with
NUMA (Non-uniform Memory Access) memories becoming commonplace in HPC. To achieve even
more performance, HPC clusters started massively adopting various accelerators, like the Intel Xeon
Phi [12] manycore coprocessor or general-purpose NVIDIA or AMD GPUs (Graphics Processing
Units), which eventually became the backbone of the majority of current supercomputers [13].
Some clusters have also adopted more unconventional accelerators, like reconfigurable hardware,
such as FPGAs (Field-programmable Gate Arrays), or AI (Artificial Intelligence) accelerators, such
as TPUs (Tensor Processing Units). This trend gave rise to heterogeneous clusters that offer various
types of hardware, each designed for specific workloads.
These hardware improvements have managed to keep up with Moore’s Law, but no longer
without requiring changes to the software. The increasing complexity and heterogeneity of HPC
hardware has caused the “HPC software stack” and the corresponding programming models to
become more complex, making it far from trivial to leverage the available performance offered by
supercomputers. Individual computers of HPC clusters (called computational nodes) can consist
of hundreds of CPU cores each, yet it is challenging to write programs that can scale to such high
core counts. The memory subsystem of each node contains multiple levels of complex cache
hierarchies, and the RAM (Random-access Memory) has such a large capacity that it has to be split
into multiple physical locations with varying access latencies (NUMA), which requires the use of
specialized programming techniques to achieve optimal performance. And the ever-present accelerators, for example GPUs,
might require their users to adopt completely different programming models and frameworks.
Historically, optimized HPC software was primarily written using system or scientifically fo-
cused programming languages (e.g. C, C++ or Fortran) and specialized libraries for parallelizing
and distributing computation, such as OpenMP (Open Multi-processing) [14], CUDA [15] or MPI
(Message Passing Interface) [16]. While these rather low-level technologies are able to provide the
best possible performance, it can be quite challenging and slow to develop (and maintain) applica-
tions that use them. It is unreasonable to expect that most domain scientists that develop software
for HPC clusters (who are often not primarily software developers) will be able to use all these
technologies efficiently without making the development process slow and cumbersome. This task
should be left to specialized performance engineers, enabling the scientists to focus on the problem
domain [17].
With the advent of more powerful hardware, HPC systems are able to solve new problems that
are increasingly demanding, both in terms of the required computational power and in terms of
data management, network communication patterns and general software design and
architecture. Areas such as weather prediction, machine-learning model training or big data analysis
require executing thousands or even millions of simulations and experiments. These experiments
can be very complex, consisting of multiple dependent steps, such as data ingestion, preprocessing,
computation, postprocessing, visualization, etc. It is imperative for scientists to have a quick
way of prototyping these applications, because their requirements change rapidly, and it would be
infeasible to develop them using only very low-level technologies.
The growing complexity of HPC hardware, software and use-cases has given rise to the popular-
ity of task-based programming models and paradigms. Task-oriented programming models allow
users to focus on their problem domain and quickly prototype, while still being able to describe
complicated computations with a large number of individual steps and to efficiently utilize the
available computational resources. With a task-based approach, a complex computation is de-
scribed using a set of atomic computational blocks (tasks) that are composed together in a task
graph which captures dependencies between the individual tasks. Task graphs abstract away most
of the complexity of network communication and parallelization, and they are general enough to
describe a large set of programs in a practical and simple way. At the same time, they remain
amenable to compiler-driven optimization and automatic parallelization, which helps to bring the
performance of programs described by a task graph close to manually parallelized and distributed
programs, at a fraction of the development cost for the application developer. They are also rel-
atively portable by default, as the task graph programming model typically does not make many
assumptions about the target platform; therefore, the same task graph can be executed on various
systems and clusters, if the tasks and a task graph execution tool can run on that cluster.
Combined with the fact that task-based tools often allow users to implement their workflows in
very high-level languages, such as Python or various DSLs (Domain-specific Languages), these
properties make them an ideal tool for rapid scientific prototyping. However, this does not mean that low-level high-
performance kernels are not being used anymore; a common approach is to describe the high-level
communication structure of the workflow using a task graph where the individual tasks execute the
low-level kernels, rather than implementing a monolithic application that performs all the network
communication explicitly.
Task graphs are already commonly being used and deployed on various distributed systems [18,
19, 20], yet there are certain challenges that limit their usage ergonomics and performance efficiency
when deployed specifically on HPC systems. These challenges stem from various factors, such as
the interaction of task graphs with HPC allocation managers, the heterogeneity and complexity
of HPC cluster hardware, or simply from the potentially enormous computational scale. When
task graph authors encounter these problems, they might have to step out of the comfort zone of
this easy-to-use programming model, and implement parts of their applications using other, more
complicated approaches, to either meet their performance goals or to even make it possible to
execute their application on HPC clusters at all. Removing or alleviating some of those challenges
could lower the barrier of entry, make task graph execution better suited for various HPC use-cases
and turn it into an actual first-class citizen in the world of supercomputing.
To achieve the goal of making it easier and more efficient to execute task graphs on heterogeneous
supercomputers, this thesis sets out the following objectives:
1. Identify and analyze existing challenges and bottlenecks of task graph execution on HPC
clusters, particularly in the areas of efficient hardware utilization and usage ergonomics, and
examine how existing tools are able to deal with them.
2. Introduce a set of guidelines and approaches for overcoming these challenges that would facil-
itate effortless execution of task graphs on modern heterogeneous clusters. These guidelines
should serve as a template for implementing HPC-optimized task graph execution tools.
3. Implement a task graph execution tool using these guidelines and evaluate it on HPC use-
cases.
The thesis is structured as follows. Chapter 2 describes various approaches for designing paral-
lelized programs on distributed clusters, to provide context on how task-based programming relates
to them. It also delimits the specific subset of task-based programming relevant to this thesis.
Chapter 3 then defines key terms related to tasks and task graphs in detail, to provide a shared
vocabulary that will be used throughout this thesis. It is followed by Chapter 4, which discusses
various ergonomic challenges and performance bottlenecks faced by state-of-the-art distributed task
runtimes when executing task graphs on HPC systems.
The following three chapters then discuss designs for overcoming these challenges. Chapters 5
and 6 focus solely on the efficiency aspects. Chapter 5 evaluates the quality of various task schedul-
ing algorithms, which are important for achieving good performance when executing task work-
flows, and introduces Estee, a task graph execution simulator that can be used to prototype new
scheduling algorithms. Chapter 6 analyzes the runtime performance of Dask, a state-of-the-art
task runtime, and introduces an alternative implementation of its server called RSDS, which is
able to outperform Dask in various HPC use-cases. Chapter 7 then focuses on improving both
the ergonomic and performance aspects of task execution using a meta-scheduling and resource
management approach designed to facilitate task graph execution on heterogeneous clusters. This
approach has been implemented in the HyperQueue task runtime, which is also described and
evaluated in this chapter in detail. Finally, Chapter 8 summarizes the thesis and outlines future
work.
Chapter 2
Parallel and distributed computing
There are many ways to design and implement applications for a distributed cluster, a set of com-
puters, typically labeled as (computational) nodes, that have their own independent processors and
memory and are connected together with a computer network so that they can cooperate on solving
difficult problems. In the context of HPC, such distributed clusters are called supercomputers, and
one of their distinguishing features is that all the computers of the cluster reside within one physical
location, and they are connected with a very high-speed and low-latency network. Even though
there are other kinds of distributed clusters, such as data centers or cloud-based systems, this thesis
focuses exclusively on HPC systems and supercomputers; therefore, the term (distributed) cluster
will denote an HPC cluster in the rest of the text.
Distributed applications are implemented using various programming models that allow express-
ing communication patterns which enable individual cluster nodes to cooperate and exchange data,
and thus efficiently utilize available computational resources. Communication between nodes is cru-
cial, as that is what allows distributed clusters to offer unparalleled performance by distributing the
computational load among multiple computers and thus achieving speedup through parallelization.
This thesis is focused on a particular subset of task-based programming models and their in-
teraction with HPC clusters. The goal of this chapter is to specify this subset. First, it provides a
broad overview of the most popular approaches for implementing distributed and parallel applica-
tions (with particular focus on HPC use-cases), and then it gradually concretizes which niches of
this diverse area are most relevant to the topic of this thesis.
2.1 Parallel programming models
This section describes the most important representatives of programming models that are used
in the world of supercomputing. It divides the programming models into two broad categories:
models that express parallelization and network communication explicitly, and models that do so
implicitly.
Explicit parallelization
One way to design distributed applications is to leverage programming models that express the
parallelization of computation and the exchange of data between nodes explicitly. This has been
the predominant way of creating HPC software for many years, and it is still very popular today [21,
22, 23]. Below are a few examples of these explicit approaches.
Message passing has historically been the most popular method for implementing HPC software.
In this model, a distributed computation is performed by a set of processes with separate memory
address spaces that are typically executed across multiple nodes. The processes cooperate together
to solve complex problems by exchanging network messages (hence the term message passing). Mes-
sage passing applications are commonly implemented using the SPMD (Single Program, Multiple
Data) [24] programming model, where the implementation logic of all the processes participating
in the computation is embedded within a single program.
The most popular representative of this programming model is the MPI [25] framework, which
is used by a large number of existing HPC applications [22]. It defines a set of communication
primitives, operators and data types, which can be used to perform computations, exchange data
and synchronize progress between either two (point-to-point communication) or multiple (collective
communication) processes running on remote nodes. Listing 2.1 shows a simple MPI program
designed to be executed in two (potentially distributed) processes. The first process sends a number
to the second process, which waits until that number is received, and then prints it. Notice how
network communication and process synchronization is expressed explicitly, by calling the MPI_Send
and MPI_Recv functions. We can also see the SPMD paradigm in practice, because the code for
both processes is interleaved within the same program.
PGAS (Partitioned Global Address Space) [26] is a relatively similar programming model,
which also often employs the SPMD paradigm. Where it differs from message passing is in the
way it expresses communication between processes. Message passing processes share their memory
by communicating with other processes, while PGAS provides an abstraction of a shared memory
address space and allows processes to communicate through it (paraphrasing the famous “Do not
communicate by sharing memory; instead, share memory by communicating” quote that originates
from the Go programming language community). PGAS provides an illusion of a
global memory address space that is available to processes that participate in the communication,
which makes it slightly less explicit in terms of expressing the communication patterns within the
program, because it translates certain memory operations into network messages on behalf of the
programmer.
PGAS programs also often employ one-sided communication techniques, such as RDMA (Remote
Direct Memory Access), which allows a process to directly read or write a region of memory from
the address space of a different process (potentially located at a remote node).
Shared-memory multiprocessing is an approach that focuses on the parallelization within a
single computational node, by leveraging multithreading to achieve speedup. In the area of HPC,
it is common to use the OpenMP [14] framework to implement multi-threaded applications. Apart
#include <mpi.h>
#include <stdio.h>
int main() {
    MPI_Init(NULL, NULL);

    // Find out the ID of this process
    int process_id;
    MPI_Comm_rank(MPI_COMM_WORLD, &process_id);

    if (process_id == 0) {
        // Send one integer to process 1
        int value = 42;
        MPI_Send(&value, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
    } else if (process_id == 1) {
        // Receive one integer from process 0
        int value = 0;
        MPI_Recv(&value, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        printf("Process 1 received number %d from process 0\n", value);
    }

    MPI_Finalize();
    return 0;
}
Listing 2.1: MPI program implemented in C
void compute_parallel(int* items, int count) {
    // This loop is executed in parallel
    #pragma omp parallel for
    for (int i = 0; i < count; i++) {
        items[i] = compute(i);
    }
}
Listing 2.2: C program using a simple OpenMP annotation
from providing interfaces for parallelizing code, synchronizing threads through barriers or various
locks and performing atomic operations, it is also able to offload computation to various acceler-
ators (like a GPU) attached to a node. OpenMP can be used together with the two previously
mentioned programming models (it is often combined especially with MPI [27]), in order to achieve
parallelization both intra-node (via multithreading) and inter-node (via network communication).
OpenMP does not only offer an API (Application Programming Interface), but it can also be
integrated directly within a compiler, e.g. in GCC (GNU Compiler Collection) for programs written
in the C or C++ programming languages. This enables it to provide source code annotations (called
pragmas), which allow the programmer to parallelize a region of code with very little effort. An
example of this can be seen in Listing 2.2, where a loop is parallelized simply by adding a single
annotation to the source code.
These explicit programming models share a lot of desirable properties. They give their users a
lot of control over the exchange of data between individual cores and remote nodes, which allows
creating very performant programs. Having the option to explicitly describe how will the individual
cores and nodes cooperate also enables expressing arbitrarily complex parallelization patterns and
data distribution strategies. However, in order to fully exploit the performance potential of explicit
parallelization, the programmer must have advanced knowledge of the CPU or GPU hardware
micro-architecture [28] and the memory model of the used programming language [29].
Even though explicitly parallelized programs can be very efficient, implementing correct ap-
plications using them is notoriously difficult. Multi-threaded and distributed programs are highly
concurrent, which makes it easy to introduce various programming errors, such as deadlocks, race
conditions or data races. Especially for distributed programs, debugging such issues can be incred-
ibly challenging. Furthermore, programs that leverage explicitly parallel programming models are
typically implemented in languages such as C or C++, which are infamous for making it difficult to
write correct, memory-safe programs without memory errors and undefined behavior [30]. Memory
safety issues are even more problematic in heavily concurrent programs, which further increases
the difficulty and decreases the speed of developing correct distributed programs.
Apart from correctness, using explicit communication interfaces can also lead to over-dependence
on a specific structure of available computational resources. For example, the MPI paradigm typi-
cally assumes that a fixed number of processes participate in the computation and might struggle
with situations where some of these processes crash, which infamously makes it challenging to
implement fully fault-tolerant MPI programs [31].
Implicit parallelization
Since it can take a lot of effort to implement a correct and efficient distributed program using ex-
plicitly parallel models, it would be unreasonable to expect that all users who want to leverage HPC
resources to execute their experiments will “roll up their sleeves” and spend months implementing
an explicitly parallel C++ program that uses MPI and OpenMP. In fact, with scientific experiments
becoming more and more complex each year, in most cases it would not even be feasible to develop
custom (explicitly parallel) code for them from scratch. Instead, high-performance parallelized
primitives implemented by specialized performance engineers [17] are moving into libraries and
frameworks, such as GROMACS [32, 33] or TensorFlow [34, 35], that still leverage technologies like
MPI or OpenMP internally, but do not necessarily expose them to their end users.
This allows users of HPC systems and scientists to focus on their problem domain, as their
responsibility shifts from implementing communication and parallelization techniques by hand to
describing high-level computational workflows using implicitly parallel programming models that
are able to automatically derive the communication and parallelization structure from the program
description. With an implicitly parallel model, the emphasis moves from how to perform a dis-
tributed or parallelized computation (which is the essence of the explicit models) to what should be
computed and how the individual computational steps are related to each other, which is usually
the main aspect that users actually want to focus on.
The primary benefit of implicitly parallel approaches is that they make it easy to define a
computation that can be automatically parallelized, without forcing the user to think about how
exactly the parallelization and network communication will be performed. Execution frameworks
are then able to ingest programs implemented using these models and automatically execute them
on a parallel machine or even a distributed cluster in a parallel fashion, thus making it much easier
for the user to leverage available hardware resources.
Implicit models are typically easier to use than the explicit models, and they facilitate rapid
prototyping of parallelized programs. On the other hand, the main disadvantage of these models is
the lack of control over how exactly parallelization is performed. Therefore, programs implemented
using them might be unable to achieve the same performance as explicitly parallelized programs.
There are various models that support implicit parallelization, for example stencil computa-
tions [36] or automatically parallelized functional languages [37]. But by far the most popular are
the many tools and models based on tasks. Since task-based programming is a core topic of this
thesis, the rest of the thesis will focus exclusively on programming models that leverage tasks.
In particular, the following section will categorize these models based on several properties and
describe representative tools that implement this paradigm.
2.2 Task-based programming models
In recent years, it has become very popular to define scientific computations running on distributed
and HPC clusters using task-based programming models [18, 38, 19]. These models allow their
users to describe the high-level structure of their computations using computational workflows
(also called pipelines or task graphs; these three terms are used interchangeably in this thesis). A
computational workflow is a DAG (Directed Acyclic
Graph) of tasks, atomic and independent computational blocks with separate inputs and outputs
that can depend on one another, which can be executed in a self-contained way. Such workflows
can naturally express diverse scientific experiments, which typically need to execute and combine
many independent steps with dependencies between themselves (for example preprocessing data,
executing simulations, performing data postprocessing and analysis, etc.). They are also very
flexible and easy to use, which makes them especially popular.
Since task-based programming models are implicitly parallel, their users do not have to im-
peratively specify how their computation should be parallelized, or when and how data should be
exchanged between remote nodes. They merely describe the individual parts of their program that
can theoretically be executed in parallel (the tasks) and then pass the created task graph to a
dedicated execution tool (that we will label as a task runtime) that executes the tasks, typically
on a distributed cluster. Because the program is represented with a graph, the task runtime can
effectively analyze its properties (or even optimize the structure of the graph) in an automated way,
and extract the available parallelism from it without requiring the user to explicitly define how the
program should be parallelized.
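To make this concrete, the following minimal sketch shows how such a task graph could be described with Dask's delayed API (one of the task runtimes discussed later in this thesis); the functions preprocess, simulate and combine are hypothetical placeholders for real workflow steps.

from dask import delayed

@delayed
def preprocess(path):
    # Hypothetical data preparation step.
    return f"data({path})"

@delayed
def simulate(data, seed):
    # Hypothetical simulation step; independent for each seed.
    return f"result({data}, seed={seed})"

@delayed
def combine(results):
    # Hypothetical postprocessing step that merges all results.
    return list(results)

# Building the graph only records the tasks and their dependencies.
data = preprocess("dataset.csv")
results = [simulate(data, seed) for seed in range(4)]
final = combine(results)

# Only now is the graph handed to the task runtime, which can run
# the four independent simulate tasks in parallel.
print(final.compute())

Note that the user never states how the graph should be parallelized; the runtime extracts the parallelism from the dependency structure alone.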
It is important to note that from the perspective of a task runtime, each task is opaque. The tool
knows how to execute it, but it typically does not have any further knowledge of the inner structure
of the task. Therefore, the only parallelization opportunities that can be extracted by the tool have
to be expressed by the structure of the task graph. A task graph containing a single task is thus
not very useful on its own. The individual tasks can of course also be internally parallel; however,
this parallelization is not provided automatically by the task runtime. Tasks often achieve internal
parallelism using shared memory multiprocessing, for example using the OpenMP framework.
Since task-based programming models are quite popular, there are hundreds of different tools
that leverage them. It can be challenging to understand how these tools relate to one another, be-
cause umbrella terms like task, task graph, DAG, workflow or pipeline can represent vastly different
concepts in different contexts. For example, the term task is used for many unrelated concepts in
computer science, from an execution context in the Linux kernel, through a block of code that can
be executed on a separate thread by OpenMP, to a program that is a part of a complex distributed
computational workflow. It is thus possible to encounter two programming models or tools that
both claim to use “task-based programming”, even though they might have very little in common.
The rest of this section will thus categorize existing state-of-the-art tools that use task-based pro-
gramming models based on several properties, to put them into a broader context; it will also
gradually specify which niches of this diverse area are most relevant for the topic of this thesis.
2.2.1 Batch vs stream processing
One of the most distinguishing properties that divides distributed task processing tools into two
broad categories is the approach used to trigger the execution of the workflow. Stream processing
tools are designed to execute continuously, and react to external events that can arrive asyn-
chronously and at irregular intervals, while typically focusing on low latency. A streaming compu-
tational workflow is executed every time a specific event arrives. The workflow can then analyze
the event, and generate some output, which is then e.g. persisted in a database or sent to another
stream processing system. As an example, a web application can stream its logs, which are being
generated dynamically as users visit the website, to a stream processing tool that analyzes the
logs and extracts information out of them in real time. Popular representatives of stream process-
ing are for example Apache Flink [39] or Apache Kafka [40]. Streaming-based tools can also be
implemented using the dataflow programming model [41, 42].
In contrast, batch processing tools are designed to perform a specific computation over a set of
input data (a batch) that is fully available before the computation starts, while focusing primarily
on maximal throughput. Such workflows are usually triggered manually by a user once all the
data is prepared, and the workflow stops executing once it has processed all of its inputs. In
certain cases, a part of the computation can be defined dynamically while the workflow is already
executing. For example, iterative workflows perform a certain computation repeatedly in a loop
until some condition is met. Popular representatives of batch processing task runtimes are for
example Dask [43], SnakeMake [44] or SciLuigi [45].
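As a minimal illustration of such an iterative workflow, the following sketch repeatedly submits a batch of tasks until a convergence condition is met, using Python's standard concurrent.futures module as a stand-in for a distributed task runtime; the refine function and the error metric are hypothetical.

from concurrent.futures import ProcessPoolExecutor

def refine(value):
    # Hypothetical computational step executed as a single task.
    return value / 2

def error(values):
    # Hypothetical convergence metric.
    return max(values)

def iterative_workflow(inputs, tolerance=1e-3):
    values = inputs
    with ProcessPoolExecutor() as executor:
        # Each iteration defines and executes one batch of tasks; the next
        # batch is only created once the previous results are known,
        # which creates a loop in the flow of data.
        while error(values) > tolerance:
            values = list(executor.map(refine, values))
    return values

if __name__ == "__main__":
    print(iterative_workflow([1.0, 2.0, 4.0]))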
Stream processing is common in the area of cloud computing, and is especially useful for
analyzing data that is being generated in real time. However, it is not very common in the world of
supercomputing, because HPC hardware resources are typically transient, and are not designed to
be available for a single user at all times, which limits their usefulness for handling real-time events
that occur at unpredictable times. On the other hand, batch processing workflows are a better fit
for HPC clusters, since their execution time is bounded by the size of their input, which is often
known in advance, and thus they can be more easily mapped to transient, time-limited allocations
of HPC hardware resources. This thesis focuses exclusively on batch processing.
2.2.2 Programming model generality
Even though most workflow processing tools are designed to execute a DAG of tasks, not all of them
support arbitrary task graphs. Some tools use programming models designed for specialized use-
cases, which allows them to offer very high-level and easy-to-use DSLs and APIs that are designed
to perform a specific set of things well, but that do not allow expressing fully general task graphs.
An example of such a constrained approach is the Bulk synchronous parallel [46] model, which
models a distributed computation with a series of steps that are executed in order. Within each
step, a specific computation is performed in parallel, potentially on multiple remote nodes. At the
end of each step, the nodes can exchange data among themselves, and they are synchronized with
a global barrier, to ensure that there are no cyclic communication patterns in the computation.
Even though this model does not use fully arbitrary computational graphs, it is still possible to
express many algorithms with it [47].
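The following toy sketch illustrates the bulk synchronous parallel pattern with Python threads on a single machine: each worker computes a local partial sum in one step, all workers synchronize at a global barrier, and in the next step every worker combines the exchanged partial results. A real BSP system would run the workers on separate nodes and exchange the data over the network.

import threading

N_WORKERS = 4
barrier = threading.Barrier(N_WORKERS)
partials = [0] * N_WORKERS  # "mailboxes" used to exchange data between steps
totals = [0] * N_WORKERS

def worker(rank, local_values):
    # Step 1: compute a local partial result in parallel.
    partials[rank] = sum(local_values)
    # Global barrier: nobody proceeds until all partial results exist.
    barrier.wait()
    # Step 2: combine the exchanged partial results.
    totals[rank] = sum(partials)

threads = [threading.Thread(target=worker, args=(r, range(r * 10, (r + 1) * 10)))
           for r in range(N_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(totals)  # every worker ends up with the same global sum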
A well-known instance of this model that has gained a lot of popularity in the area of distributed
computing is MapReduce [48]. Its goal is to allow parallel processing of large amounts of data
on a distributed cluster in a fault-tolerant way, while providing a simple interface for the user. It
does that by structuring the computation into three high-level operations, which correspond to
individual bulk synchronous parallel steps:
1. A map operation (provided by the user) is performed on the input data. This operation
performs data transformation and filtering, associates some form of a key to each data item
and produces a key-value tuple.
2. A shuffle operation (implemented by a MapReduce framework) redistributes the tuples among
a set of remote nodes, based on the key of each tuple, so that tuples with the same key will
end up at the same node.
3. A reduce operation (provided by the user) is performed on each node. The reduction typically
performs some aggregation operation (such as sum) on batches of tuples, where each batch
contains tuples with the same key.
This approach is an instance of the split-apply-combine [49] paradigm, which describes the
following intuitive strategy for parallel computations on a dataset: split the data into multiple
chunks, apply some transformation to each chunk in parallel (this corresponds to the map operation)
and then combine the results (this corresponds to the reduce operation).
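Before looking at a real framework, the following plain Python sketch makes the three phases explicit for a word count; a MapReduce framework would run the map and reduce phases in parallel on many nodes and would perform the shuffle over the network.

from collections import defaultdict

def map_phase(lines):
    # Map: transform each input item into (key, value) tuples.
    for line in lines:
        for word in line.split():
            yield (word, 1)

def shuffle_phase(pairs):
    # Shuffle: group tuples by key, so that tuples with the same key
    # end up together (on the same node, in a distributed setting).
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    # Reduce: aggregate the values of each key.
    return {key: sum(values) for key, values in groups.items()}

lines = ["to be or not to be", "to thine own self be true"]
print(reduce_phase(shuffle_phase(map_phase(lines))))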
Listing 2.3 shows a simple Python program that computes the frequency of individual words
in a body of text (a computation commonly known as word count), using a popular implementation
of MapReduce called Apache Spark [50]. Even
though the programmer only needs to provide a few simple map and reduce operations, this short
program can be executed on a distributed cluster and potentially handle large amounts of data.
Note that multiple map and reduce operations can be combined in a single program, which can
also be observed in this example. While this program is implemented in the Python programming
def word_count(context):
    file = context.textFile("shakespeare.txt")
    counts = file.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda x, y: x + y)
    output = counts.collect()
    print(output)
Listing 2.3: MapReduce word count implemented in Python
language, the most important parts of the computation are expressed using a few very specific
implicitly parallel operations (map and reduce); therefore, it is possible to consider this to be a sort
of DSL for expressing distributed computations.
The primary benefit of this approach is that it is easy to define a computation that can be
automatically parallelized and distributed. Furthermore, the “layered” shape of task graphs pro-
duced by MapReduce programs facilitates implementation of fault tolerance, because there are
clearly delimited checkpoints where the intermediate results of the computation can be persisted
to disk (at the end of each map or reduce operation), so that they can be later restored in case of
failure. This is only possible because the tools that execute MapReduce programs have additional
knowledge and invariants about the shape of the task graph, due to its constrained nature.
However, this is also the main limitation of this model. If a computation cannot be naturally
expressed using the map and reduce operations, it can be challenging, or even impossible, to im-
plement it with the MapReduce paradigm. In particular, MapReduce assumes forward data-flow,
where data is being sent through the computational pipeline in one direction. This is problematic
especially for iterative computations that have to be executed repeatedly until some condition is
met, which typically use the output of a previous iteration as an input for the next iteration, thus
creating a loop in the flow of data.
import dask.dataframe as pd
df = pd.read_csv("dataset.csv")
# This query is translated to the
# task graph shown below
df.groupby("Country").GDP.mean()

[Task graph of the query: a getitem task per dataframe partition feeds a mean_chunk task, and the partial results are aggregated by a mean-tree reduction into the final mean.]
Listing 2.4: Dask DataFrame query and its corresponding task graph
Specialized types of task graphs are also commonly used to parallelize dataframe processing,
which is a popular approach for performing exploratory data analytics and OLAP (Online Analyt-
ical Processing) queries over two-dimensional tabular datasets (dataframes). Tools such as Dask
DataFrame [43], Modin [51], Vaex [52] or CuDF [53] offer Python or SQL (Structured Query Lan-
guage) interfaces for expressing queries over dataframes, and then translate these queries into task
graphs that are executed on parallel machines, distributed clusters or GPU accelerators. Users of
these tools do not even necessarily have to know that they are interacting with task graphs, since
their usage is mostly an internal implementation detail. Listing 2.4 shows a short Python program
that leverages the Dask DataFrame API for performing a query on a dataframe loaded from a
CSV (Comma-separated Values) file. The query is internally translated by Dask to the task graph
shown in Listing 2.4, which can then be executed e.g. on a distributed cluster.
DataFrame processing is often used in scientific workflows. However, it typically represents only
a certain part of the workflow (e.g. a single task or a set of tasks), and it is typically not used to
define the structure of the whole workflow.
While MapReduce, DataFrame processing or other similar models are useful for computations
that have a predictable structure, they are not always general enough to express complex HPC
scientific workflows. We will thus further only consider tools and programming models
that allow executing arbitrary task graphs.
2.2.3 Task granularity
An important aspect of tasks is their granularity, which determines how much work the task
performs and into how many subtasks it could be potentially divided to achieve more parallelization
opportunities. In an extreme case, a coarse-grained (low granularity) task could represent the
whole computation that we want to perform, while a fine-grained (high granularity) task could
represent merely the execution of a single CPU instruction. With an increasing granularity, each
task performs less work, and the whole workflow will thus have to be divided into a larger number
of tasks. This makes it easier to parallelize the workflow; however, it also increases the overall
overhead introduced by the task runtime, which has to manage and schedule more tasks. The same
properties also apply inversely for a decreasing granularity of tasks.
Since tasks represent arbitrary computations, it is not always straightforward to determine how
granular they are. It is usually much simpler to use a proxy metric for task granularity based on
the duration it takes to execute the task. Therefore, if a task takes a short time to execute (e.g.
milliseconds or less), we will consider that it is highly granular, while if a task takes a long time
to execute (e.g. hours or more), we will consider that it has low granularity. However, it should
be noted that what constitutes high or low granularity depends on a specific use-case and the task
runtime used to execute it.
Task granularity is important primarily because it determines how efficiently a task graph can
be parallelized and if it even makes sense to distribute it to multiple nodes at all. For example, if
certain tasks would take mere nanoseconds to execute, there would be no point in dynamically load
balancing them across multiple remote nodes, because the overhead of doing so would dwarf the
execution time of the tasks themselves, due to the latency induced by the network communication.
In other words, it would be faster to just execute all such tasks on the local node rather than sending
cilk int fibonacci(int n) {
    if (n < 2) {
        return n;
    } else {
        // Spawn a new task for each recursive call
        int x = spawn fibonacci(n - 1);
        int y = spawn fibonacci(n - 2);
        // Use a barrier to wait until the tasks finish
        sync;
        return x + y;
    }
}
Listing 2.5: Task-parallel Fibonacci calculation using Cilk
Source code snippet adapted from [55].
them across the network. It could still be worth it to distribute a large number of such extremely
granular tasks to multiple nodes, but the distribution would need to happen in an amortized way,
for example only once at the beginning of the computation, to avoid excessive communication
overhead.
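The following sketch illustrates this amortization: instead of submitting every tiny item as a separate task, the items are grouped into larger chunks and each chunk becomes one coarse-grained task, so the scheduling and communication overhead is paid once per chunk rather than once per item. It uses Python's concurrent.futures as a stand-in for a task runtime; tiny_computation is a hypothetical piece of work that is much cheaper than the per-task overhead.

from concurrent.futures import ProcessPoolExecutor

def tiny_computation(item):
    # Hypothetical work item that takes far less time than the cost
    # of scheduling and transferring it as an individual task.
    return item * item

def process_chunk(chunk):
    # One coarse-grained task that processes many items at once.
    return [tiny_computation(item) for item in chunk]

def run_chunked(items, chunk_size=10_000):
    chunks = [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
    with ProcessPoolExecutor() as executor:
        # One task per chunk instead of one task per item.
        results = list(executor.map(process_chunk, chunks))
    return [value for chunk in results for value in chunk]

if __name__ == "__main__":
    print(sum(run_chunked(list(range(100_000)))))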
Some task-based programming models focus primarily on high granularity tasks, for example
StarPU [54], Cilk [55], HPX [56], PaRSEC [57], Legion [58], TBB [59] or OpenMP [14] (even
though OpenMP was previously presented as an example of an explicitly parallel model, it also offers
a task system that provides facilities for implicit parallelization). In these
models, which are sometimes called task-parallel [60], tasks usually represent either functions or
blocks of code that contain only a couple of instructions, and are typically relatively quick to execute
(they can run e.g. for milliseconds or seconds). While some of these tools also support distributed
computing, their main use-case is to provide intra-node parallelization on multicore systems with
shared memory, and often also to offload computation to attached accelerators, such as GPUs or
FPGAs.
They can be used e.g. to implement high-performance numerically intensive computational
kernels, such as matrix multiplication or QR factorization [61], or to parallelize recursive algorithms
or algorithms with irregular data access patterns. Listing 2.5 shows a program implemented in the
Cilk programming language, which calculates the n-th Fibonacci number using a task-parallel
approach.
Task-parallel models definitely have their place in HPC; however, they are not primarily de-
signed to execute high-level scientific workflows, as they deal with a different set of challenges and
constraints. Therefore, they are not the primary focus of this thesis and will not be considered in
the rest of the text.
There is also a set of task-based tools that reside at the other end of the granularity spectrum,
for example Apache Airflow [62], Dagster [63], Prefect [64] or Argo [65]. These are commonly
labeled as Workflow Management Systems. Although it cannot be said that there is a single
distinguishing feature that would separate these tools from other task runtimes, what they have
in common is that they were not primarily designed for HPC use-cases. They typically work with
very coarse-grained workflows with a relatively small number of tasks, they put strong emphasis on
workflow reproducibility, data provenance, execution monitoring (typically through a web interface)
and scheduled execution (i.e. “execute this workflow every hour”). They are also typically applied
in cloud environments, more so than on supercomputers. This thesis will thus not consider
these tools in great detail.
Summary
This chapter has introduced various approaches for creating distributed applications designed for
HPC clusters. It categorized them into two broad categories, explicitly parallel and implicitly
parallel models, and demonstrated several examples of both approaches.
It has also specified the subset of distributed computing that is most relevant for this thesis. In
particular, this thesis focuses on batch processing task programming models that define computa-
tions using fully general DAGs and that are designed for distributed execution on an HPC cluster,
with tasks whose granularity (duration) typically spans from (tens of) milliseconds to hours.
These programming models are commonly used for defining scientific workflows using
tools that will be described in Chapter 4, which will also discuss various challenges faced by these
tools on HPC systems. But before that, the following chapter will first define a vocabulary of task-related
terms so that we can refer to them in the rest of the thesis.
Chapter 3
Task-based programming
In order to discuss task-based programming, it is useful to provide a vocabulary of terms related to
it and a set of definitions that describe the basic properties of task graphs and also the rules of their
execution. These definitions necessarily have to be relatively general, so that they can be applied
to a wide range of systems; even though the previous chapter has specified a relatively precise
subset of task-based programming models that will be examined and analyzed in this thesis, there
is still a large number of tools and systems that belong to this area of interest, each with their own
distinct concepts and semantic rules. Therefore, it would be infeasible to provide a single unifying
and complete task theory that would encompass details of all these tools and programming models.
This chapter defines a set of concepts related to tasks, with a particular focus on properties
that are important for task graph execution in HPC environments, which will be further described
in Chapter 4. The definitions described in this chapter are specifically adapted to the needs and
terminology of this thesis, as is customary in other research works. They form the lowest common
denominator that can be applied to the specific task execution tools and programming models
discussed throughout this thesis. The presented terms are similar to definitions that can be found
in existing works [66, 67, 68], although they differ in several aspects. In particular, we do not
assume that task execution times or output data sizes are known in advance, and we define a
very general concept of resource management, which can be used to model complex resources of
heterogeneous clusters. Some of the definitions related to resource management will be further
extended in Chapter 7.
3.1 Task graphs
A computational workflow in the task-based programming model is represented with a DAG that we
will label as a task graph. From a high-level perspective, it describes which individual computational
steps should be performed, what constraints govern where and in which order they should be
computed, and how data should be transferred between the individual steps of the workflow.
There are many variations of task graphs that differ in the computational properties they are
able to describe. In the most basic form, task graph vertices represent computations that should
be performed and the graph arcs (edges) represent dependencies between the computations, which
enforce an execution order. However, numerous other concepts can also be encoded in a task
graph. For example, in addition to dependencies, the arcs could represent abstract communication
channels, through which the outputs of one computation are transferred to become the inputs of
another computation that depends on it. As another example, there could be a special type of arc
which specifies that the outputs of a computation will be streamed to a follow-up computation,
instead of being transferred in bulk only after the previous computation has finished.
As was already noted, the exact semantics of vertices and arcs of task graphs depend heavily
on the specifics of tools that implement them, and it is thus infeasible to provide a single definition
that would fit all variants used “in the wild”. To provide a baseline definition, we will formally
define a task graph that can model dependencies between tasks, the notion of transferring data
outputs between tasks, and also the resources needed for the execution of individual tasks.
[Definition 1] A task graph is a tuple G = (T, O, A, RK_t, Res_t), where:
• T is a set of tasks.
• O is a set of data objects.
• A ⊆ ((T × O) ∪ (O × T)) is a set of arcs.
  – (T ∪ O, A) has to form a finite directed acyclic graph.
    The absence of cycles is important; otherwise, the task graph could not be executed.
  – For every data object, there has to be exactly one task that produces it:
    ∀o ∈ O : |A ∩ (T × {o})| = 1
• RK_t is a set of resource kinds. Each resource kind describes a type of resource (e.g. a CPU
  core or a GPU device) that might be required to execute a specific task.
• Res_t : T × RK_t → ℕ0 is a function that defines the resource requirement of a task for a given
  resource kind.
  A resource requirement specifies what amount of the given resource kind is required to be
  available at a computational provider so that it can execute the given task. Res_t(t, r) = 0
  specifies that task t does not require resource kind r for its execution.
Note that the definition above allows the existence of tasks that have no resource requirements.
While there might be use-cases where this is desirable, in typical situations we might want to ensure
that each task requires at least some resource (e.g. at least a single CPU core), to avoid situations
where all such tasks could be scheduled to a single worker, which could lead to oversubscription.
We could express that constraint using the following property: ∀t ∈ T ∃r ∈ RK_t : Res_t(t, r) ≠ 0.
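
To make the definition above more tangible, the following sketch shows one possible in-memory
representation of such a task graph in Python, together with checks for the producer-uniqueness
constraint and the optional non-empty resource requirement constraint (acyclicity checking is
omitted for brevity). All class, field and resource names are purely illustrative and are not tied to
any specific task runtime.

from dataclasses import dataclass, field
from typing import Dict, Set, Tuple

@dataclass
class TaskGraph:
    tasks: Set[str]                      # T
    objects: Set[str]                    # O
    arcs: Set[Tuple[str, str]]           # A, a subset of (T x O) union (O x T)
    resource_kinds: Set[str]             # RK_t
    # Res_t: (task, resource kind) -> required amount; missing entries mean 0
    requirements: Dict[Tuple[str, str], int] = field(default_factory=dict)

    def producers(self, obj: str) -> Set[str]:
        """Tasks that have an arc leading into the given data object."""
        return {t for (t, o) in self.arcs if o == obj and t in self.tasks}

    def check_unique_producers(self) -> bool:
        """Every data object must have exactly one producer."""
        return all(len(self.producers(o)) == 1 for o in self.objects)

    def check_nonempty_requirements(self) -> bool:
        """Optional constraint: each task requires at least some resource."""
        return all(
            any(self.requirements.get((t, r), 0) > 0 for r in self.resource_kinds)
            for t in self.tasks
        )

# A tiny example graph: t1 -> o1 -> t2
g = TaskGraph(
    tasks={"t1", "t2"},
    objects={"o1"},
    arcs={("t1", "o1"), ("o1", "t2")},
    resource_kinds={"cpus"},
    requirements={("t1", "cpus"): 1, ("t2", "cpus"): 2},
)
assert g.check_unique_producers() and g.check_nonempty_requirements()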
Below, we will define several terms that will be useful for describing task graphs and their properties.
For the following definitions, assume that we work with a task graph G = (T, O, A, RK_t, Res_t):
• If there is an arc from a task to a data object ((t, o) ∈ A ∩ (T × O)), then we call t the producer
  of o and o the output of t.
• If there is an arc from a data object to a task ((o, t) ∈ A ∩ (O × T)), then we call t the consumer
  of o and o the input of t.
• Let us introduce a binary relation DR_G:
  DR_G = {(t1, t2) ∈ (T × T) | t1 ≠ t2 ∧ ∃o ∈ O : (t1, o) ∈ A ∧ (o, t2) ∈ A}
  When (t1, t2) ∈ DR_G, we say that t2 directly depends on t1. We can also state that t2 consumes
  the output produced by t1.
• Let us introduce a binary relation D_G, which forms the transitive closure of DR_G. More
  explicitly, tasks (t, t′) belong to D_G if there exists a sequence (t1, t2, . . . , tn) such that t = t1,
  t′ = tn and ∀i ∈ {1, 2, . . . , n − 1} : (ti, ti+1) ∈ DR_G.
  When (t1, t2) ∈ D_G, we say that t2 depends on t1 and that t1 is a dependency of t2.
• We call tasks without any inputs source tasks:
  S_G = {t ∈ T | ∀t_d ∈ T : (t_d, t) ∉ D_G}
  It is a simple observation that unless the task graph is empty (T = ∅), there is always at least
  one source task in the graph, because the graph is acyclic and finite.
• We call tasks that are not depended upon by any other task leaf tasks:
  L_G = {t ∈ T | ∀t_d ∈ T : (t, t_d) ∉ D_G}.
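
The relations and task sets defined above can be derived directly from the arcs of a task graph.
The following self-contained Python sketch, operating on a small illustrative graph, computes the
direct dependency relation DR_G, its transitive closure D_G, and the sets of source and leaf tasks.

from itertools import product

# An illustrative graph: t1 produces o1 and o2, t2 and t3 consume them,
# and t4 consumes the output o3 of t2.
tasks = {"t1", "t2", "t3", "t4"}
objects = {"o1", "o2", "o3"}
arcs = {("t1", "o1"), ("t1", "o2"), ("o1", "t2"), ("o2", "t3"), ("t2", "o3"), ("o3", "t4")}

# DR_G: b directly depends on a if some data object connects them.
direct = {
    (a, b)
    for a, b in product(tasks, tasks)
    if a != b and any((a, o) in arcs and (o, b) in arcs for o in objects)
}

# D_G: transitive closure of DR_G, computed by naive fixed-point iteration.
depends = set(direct)
changed = True
while changed:
    changed = False
    for (a, b), (c, d) in product(list(depends), list(depends)):
        if b == c and (a, d) not in depends:
            depends.add((a, d))
            changed = True

source_tasks = {t for t in tasks if all((other, t) not in depends for other in tasks)}
leaf_tasks = {t for t in tasks if all((t, other) not in depends for other in tasks)}

print(source_tasks)  # {'t1'}
print(leaf_tasks)    # {'t3', 't4'} (set ordering may differ)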
An example of a simple task graph is shown in Figure 3.1. Tasks are represented as circles, data
objects as (rounded) rectangles and arcs as arrows. Task t1 generates two data objects, which are
then used as inputs for four additional tasks. The outputs of these four tasks are then aggregated
by a final task t6. This could correspond e.g. to a workflow where t1 generates some initial data,
tasks t2–t5 perform some computation on that data and t6 then performs a final postprocessing step
and stores the results to disk.
Figure 3.1: Simple task graph with six tasks and six data objects
Note that the presented definition of a task graph does not describe its semantics, i.e. how the
graph will be created and executed or what the interactions between tasks and data objects will be.
This depends on the specific tool that will execute the task graph. A baseline formal definition of
task graph execution properties will be provided in Section 3.2.
A task is a serializable description of a computation that can be executed repeatedly. The
serializability property is crucial, as it allows us to treat computation as data. That is a powerful
concept, because it allows tasks to be sent between different nodes in a cluster, stored to disk and
also to be transparently recomputed an arbitrary number of times. Enabling the recomputation
of tasks is useful for achieving fault tolerance, as tasks might need to be recomputed later if some
failure occurs during their execution.
In practice, a single task will typically represent either the invocation of a function (an executable
block of code) or the execution of a complete program. Multiple tasks in a task graph can refer to
the same function or program; each such task can have different inputs. In fact, this is a common
use-case, as task graphs are often used to parametrize a small set of functions or programs with
many different input parameters.
Even though we have defined the inputs and outputs of tasks as sets, in practice they are
usually stored using either ordered sequences or a mapping that associates a name with each input
or output, because it is important to maintain a specific ordering of both inputs and outputs.
For functions, the inputs are passed as arguments and the output is derived from the function's return value
(which can potentially form a sequence of values). Therefore, we have to be able to associate each
task input with a specific argument index. The same holds for tasks that execute programs. In this
case, inputs can be mapped to command-line arguments and the content of the standard input
stream, and the output can be e.g. the content of the standard output stream generated by the
executed program or a set of files that it writes to the filesystem.
Each task can define its resource requirements, a set of constraints that need to be satisfied by
a computational provider so that it can execute that task. As an example, a task that performs
training of a machine-learning model might require a GPU to be present on the computational
node where it will be executed. Other resources might include e.g. a specific number of CPU cores
or a minimum amount of RAM necessary to execute a given task.
A data object represents a dynamically computed result of a task; its value is not known at
the time of the task graph creation. Typically, it is a serialized blob of data that is eventually
transferred from the node where its producer was computed to the node where its consumer should
be executed. If a task programming model does not encode direct data transfers between tasks,
then data objects simply serve as “empty” markers of dependencies and they do not hold any actual
data. In that case, we could even remove them from the task graph completely and represent task
dependencies directly with arcs between tasks.
It is important to note that not all data used by tasks has to be encoded as a data object in
the task graph. As an example, tasks that represent function invocations are usually created by
the execution of some program (e.g. a Python script). A task graph defined in this way is usually
constructed with a specific set of input data for its source tasks. This data can be embedded
directly within the definition of the function itself; in that case it is not represented as an explicit
data object. In other words, a task might represent a serializable description of a computation
along with its input data. That is why in the presented formal definition, source tasks do not have
any explicit inputs; it is expected that input data is embedded directly within them.
Additionally, tasks can also read and modify the state of the environment in which they are
being executed, in a way that is observable by other tasks. For example, a function can read or
modify the value of a global variable, while a program can read an environment variable or create
a file on a disk, without it being specified as a task output. Such actions, which are usually called
side effects, are also typically not encoded within the task graph. Tasks should ideally contain
as few side effects as possible, because they can make task execution non-deterministic and cause
them to produce different outputs when executed multiple times, which is typically undesirable.
3.2 Task execution
Task graphs merely describe some computation; therefore, they have to be executed in order to
actually produce some outputs and results. This is the responsibility of a task runtime, a tool
that analyzes task graphs and executes them in some computational environment, e.g. a personal
computer or a distributed cluster. Such an environment contains a set of computational providers
that are able to execute tasks. We will label these providers as workers. A worker can execute a task
by invoking its computation description (typically by calling a function or executing a program),
while passing it the previously computed inputs of the task.
There are many existing task runtimes with varying architectures, features and trade-offs, which
affect factors like performance, fault tolerance or expressivity of the supported variant of the task-
based programming model. Several task runtimes will be discussed throughout this thesis. In the
rest of this section, we will consider a case typical for HPC environments: a distributed task runtime
with a central manager that communicates over a network with a set of workers running on remote
nodes. We will also define a set of baseline properties and rules that
should be applicable to most task runtimes using this architecture, without necessarily going into
details of execution semantics that could differ across runtimes.
In general, a task runtime oversees all aspects of task graph execution. Its two main respon-
sibilities can be divided into managing communication with workers and handling the scheduling
and execution of tasks.
Worker management involves handling the lifetime of workers, facilitating data transfers between
them or providing resiliency in case of worker failures. A single worker is typically a program
running on a computational node, which is connected to the runtime through a network connection.
It receives commands from the task runtime, executes tasks and sends information about task
execution results back to the runtime. Each worker typically manages some hardware resources
that are available for tasks during their execution. Hardware resources can be assigned to workers
in various ways. There can be a single worker for the whole computational node or there can be
multiple workers per node, each managing a subset of the available resources (e.g. a single worker
per CPU core).
The second main aspect that has to be handled by the runtime is the management of tasks. It has
to keep track of which tasks have already been computed, which tasks are currently being executed
on some worker(s) or which tasks are ready to be executed next. Two important responsibilities in
this area are fault tolerance and scheduling.
We will define fault tolerance as the ability to gracefully handle various kinds of failures that can
happen during task graph execution, such as task or worker failures. When the execution of a task
fails with some error condition (e.g. because a worker executing the task crashes), a fault-tolerant
task runtime will be able to transparently restart it by launching a new execution of that task. We
will use the term task instance (or simply instance) for a specific execution of a task. Task runtimes
might impose limits on retrying failed tasks, e.g. by attempting to execute up to a fixed number of
task instances for each task before giving up, in order to avoid endless failure loops.
The fact that it is even possible to execute a task multiple times is one of the main advantages of
the task-based programming model, where tasks declaratively describe a self-contained computation
that can be re-executed arbitrarily many times. This crucial property of tasks makes fault-tolerant
execution of task graphs easier to achieve than in other programming models, where individual
computations are not self-contained and serializable.
Below, we provide several definitions related to task graph execution that should be applicable
to most existing task runtimes. First, we will formally define a computational environment (e.g. a
cluster), an environment in which a task graph can be executed.
[Definition 2] A computational environment is a tuple C = (W, RK_w, Res_w), where:
• W is a set of workers (computational providers).
• RK_w is a set of resource kinds. Each resource kind describes some type of resource (e.g. a
  CPU core or a GPU device) that can be provided by a worker.
  Note that in all following definitions, we will assume that the set of resource kinds of a
  computational environment is equal to the set of resource kinds of a task graph computed in
  that environment.
• Res_w : W × RK_w → ℕ0 is a function which defines how many resources are provided by a
  worker for a specific resource kind.
Now we can describe the execution of a task graph. However, formally defining the behavior
of such a dynamic process is much more challenging than defining the previous (relatively static)
concepts, such as the task graph and the computational environment. Each task runtime has its
own set of execution semantics that affect the details of how are tasks assigned to workers, how they
are executed, how is data being transferred over the network, etc. Providing a formal definition of
this process that would be general and could be applied to multiple task runtimes would thus be
infeasible. On the other hand, it would be useful to have an option to examine if a concrete task
graph execution satisfied several constraints related to the dependencies and resource requirements
of tasks that most users would intuitively expect to hold.
Therefore, instead of defining the behavior of an execution itself, we assume that we have
available a set of information about an already performed execution, and we will then examine
if that execution has satisfied various properties. Note that in definitions related to task graph
execution, the set of non-negative real numbers (ℝ≥0) will be used to represent points in time. For
conciseness, the term execution will also be used to denote a task graph execution in the rest of the
text.
[Definition 3] A task graph execution is a tuple E = (G, C, X), where:
• G = (T, O, A, RK, Res_t) forms a task graph.
• C = (W, RK, Res_w) forms a computational environment.
• X : T × ℝ≥0 → W ∪ {⊥} is a function that returns the worker that was currently executing
  a specific task at a given point in time in E, or ⊥ if that task was not being executed at the
  given time.
• Each task had to be executed at least once: ∀t ∈ T ∃tp ∈ ℝ≥0 : X(t, tp) ≠ ⊥
• Each task had to eventually finish its computation:
  ∀t ∈ T ∃tp ∈ ℝ≥0 ∀tp′ ∈ ℝ≥0 : tp′ > tp ⇒ X(t, tp′) = ⊥
Note that the definition above assumes that each task in E was being executed on at most a
single worker at the same time. It could be generalized for tasks that can be executed on multiple
workers at once; however, that would have to be performed in the context of a specific task runtime,
as the semantics of multi-node execution can vary significantly between runtimes.
It is also important to note that based on the definition of the X function provided above,
each task in E could have been started multiple times (in multiple instances), even on different
workers. This captures a basic form of fault tolerance. However, we assume that each task must
eventually successfully finish its execution. Additional semantics of fault-tolerant task execution are
not defined, because task runtimes handle task re-execution in different ways; it would be infeasible
to provide a general definition of task retry semantics.
Next, we will define three helper functions in the context of task graph execution
E = ((T, O, A, RK, Res_t), (W, RK, Res_w), X), which will be used in later definitions.
• Let WX_E : W × ℝ≥0 → P(T) be a function that returns the set of tasks that were currently
  being executed on a given worker at a given point in time in E:
  WX_E(w, tp) = {t ∈ T | X(t, tp) = w}
• Let S_E : T → ℝ≥0 be a function that returns the earliest point in time at which (any instance
  of) a given task started its computation in E:
  S_E(t) = min {tp ∈ ℝ≥0 | X(t, tp) ≠ ⊥}
• Let F_E : T → ℝ≥0 be a function that returns the latest point in time at which (any instance
  of) a given task finished its computation in E:
  F_E(t) = max {tp ∈ ℝ≥0 | X(t, tp) ≠ ⊥}
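
Since X is a function over continuous time, a concrete execution can be conveniently recorded as
a set of task instances, each annotated with a worker, a start time and a finish time. The following
Python sketch uses such an interval-based trace (the data and names are illustrative only) and
derives the helper functions WX_E, S_E and F_E from it.

from typing import Dict, List, Set, Tuple

# Each task maps to a list of instances: (worker, start time, finish time)
Trace = Dict[str, List[Tuple[str, float, float]]]

trace: Trace = {
    "t1": [("w1", 0.0, 2.0)],
    "t2": [("w2", 2.0, 3.0), ("w1", 3.5, 5.0)],  # first instance failed, task was retried
}

def executing_on(trace: Trace, worker: str, tp: float) -> Set[str]:
    """WX_E(w, tp): tasks running on the given worker at time tp."""
    return {
        task
        for task, instances in trace.items()
        for (w, start, end) in instances
        if w == worker and start <= tp < end
    }

def started_at(trace: Trace, task: str) -> float:
    """S_E(t): earliest start time of any instance of the task."""
    return min(start for (_, start, _) in trace[task])

def finished_at(trace: Trace, task: str) -> float:
    """F_E(t): latest finish time of any instance of the task."""
    return max(end for (_, _, end) in trace[task])

print(executing_on(trace, "w1", 1.0))   # {'t1'}
print(started_at(trace, "t2"))          # 2.0
print(finished_at(trace, "t2"))         # 5.0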
Unless a task runtime has some special semantics, each execution should uphold the following
three basic constraints, which ensure reasonable behavior w.r.t. dependencies, task resource
requirements and worker resources:
[Definition 4] A dependency constraint in the context of task graph execution
E = (G, C, X), where G = (T, O, A, RK, Res_t) is a task graph, is defined as follows:
∀t1 ∈ T, ∀t2 ∈ T : (t1, t2) ∈ D_G ⇒ F_E(t1) ≤ S_E(t2)
Informally, this property states that if a task t2 depends on a task t1, then it cannot begin
executing until t1 has finished executing. This is a common interpretation of the dependence relation
between tasks that is enforced in most task runtimes. We assume that executions performed by all
task runtimes that will be further discussed in this thesis will always uphold this constraint.
[Definition 5] A worker resource constraint and a task resource constraint in the context of
task graph execution E = ((T, O, A, RK, Res_t), (W, RK, Res_w), X) are defined as follows:
∀tp ∈ ℝ≥0, ∀w ∈ W, ∀r ∈ RK :  Σ_{t ∈ WX_E(w, tp)} Res_t(t, r) ≤ Res_w(w, r)
This property both ensures that all resource requirements of all tasks are satisfied at any point
in time when these tasks are being executed and also that resources of workers are not being
oversubscribed.
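
Both constraints can be checked mechanically for a recorded execution. A minimal Python sketch,
assuming precomputed S_E and F_E values and a sampling of WX_E at a few points in time (all
values below are illustrative), might look as follows:

# Dependency constraint: if t2 depends on t1, then F_E(t1) <= S_E(t2)
depends = {("t1", "t2")}                                 # D_G
start = {"t1": 0.0, "t2": 2.0}                           # S_E
finish = {"t1": 2.0, "t2": 5.0}                          # F_E
dependency_ok = all(finish[a] <= start[b] for (a, b) in depends)

# Resource constraints: at every inspected time point, the requirements of the
# tasks running on a worker must not exceed the resources provided by that worker.
running = {                                              # WX_E sampled at a few time points
    (0.5, "w1"): {"t1"},
    (2.5, "w1"): {"t2"},
}
task_req = {("t1", "cpus"): 4, ("t2", "cpus"): 2}        # Res_t
worker_res = {("w1", "cpus"): 8}                         # Res_w
resources_ok = all(
    sum(task_req.get((t, kind), 0) for t in tasks) <= amount
    for (tp, worker), tasks in running.items()
    for (w, kind), amount in worker_res.items()
    if w == worker
)

print(dependency_ok, resources_ok)  # True True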
3.3 Task scheduling
One of the most important responsibilities of a task runtime is task scheduling. It is the act of
deciding in which order and on which specific worker(s) each task should execute, in a way that
optimizes some key metric. We will use the term scheduler for a component of the task runtime
that is responsible for assigning tasks to workers by creating some form of a schedule.
In general terms, a schedule is a mapping that assigns tasks to specific workers that should
execute them and also assigns an order in which the tasks should be executed. However, as with
task graph execution, the semantics of scheduling and the structure of schedules depend on the
specific implementation of a task runtime. Schedules can be static, in which case they are produced
just once before the task graph begins executing, or dynamic, where the scheduler generates the
assignments on-the-fly, based on the current utilization of workers and the observed durations of
tasks that have already been executed. Some schedulers also retroactively modify already produced
schedules in reaction to dynamic situations that occur during task graph execution (e.g. if a new
worker connects to the cluster or if some worker is underloaded), while others might not use any
explicit schedules at all. Furthermore, the semantics of scheduling are tightly coupled to the
semantics of task graph execution in each specific task runtime, such as fault tolerance, resource
management, and other aspects. We will thus not provide a formal definition of schedules, as it
would necessarily have to choose a specific schedule structure that might not be applicable to all
task runtimes described in this thesis.
What we can examine (and define) is some measure of the quality of a specific task graph
execution performed by a task runtime, which is typically affected by the behavior of its scheduler.
There are various metrics that a scheduler can optimize for, such as the latency to execute specific
critical tasks, but the most commonly used metric is makespan:
[Definition 6] The makespan M_E of execution E = ((T, O, A, RK, Res_t), C, X) is defined as
follows:
M_E = max_{t ∈ T}(F_E(t)) − min_{t ∈ T}(S_E(t))
Informally, makespan is the duration from the time when the earliest task starts to be
executed to the completion of all tasks.
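
Given the S_E and F_E values of all tasks, computing the makespan of an execution is
straightforward; a tiny illustrative Python sketch:

start = {"t1": 0.0, "t2": 2.0, "t3": 2.0, "t4": 6.0}   # S_E(t) for each task
finish = {"t1": 2.0, "t2": 6.0, "t3": 4.0, "t4": 9.0}  # F_E(t) for each task

makespan = max(finish.values()) - min(start.values())
print(makespan)  # 9.0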
Task scheduling is so crucial because it has a profound effect on the efficiency of the whole
workflow execution. This can be observed in Figure 3.2, which shows two executions of a simple
task graph that demonstrate how a trivial change in the used schedule can severely affect the
resulting makespan. The figure contains a task graph with four tasks and two data objects. The
size of the circles is proportional to the execution duration of the tasks and the size of the rounded
rectangles is proportional to the size of the data objects.

Figure 3.2: Task graph executed with two different schedules

Let us assume that we want to execute this task graph in a computational environment with
three workers (w1, w2, w3). Two different executions using different schedules are shown in the
figure. Schedule S1 assigns tasks t1 and t2 to worker w1, task t3 to worker w2 and task t4 to worker
w3, while schedule S2 assigns tasks t1 and t4 to worker w1, task t3 to worker w2 and task t2 to worker
w3. The two timelines show the execution of tasks (blue rectangles) and the network transfers of
data objects between workers (green rectangles) for each individual worker. It is clear that with
S2, the task graph will be computed faster than with S1, even though the only difference between
the two schedules is that the tasks t2 and t4 were swapped between workers w1 and w3. Note that
the timeline assumes that a worker can overlap the computation of a task with the transfer of a data
object to another worker over the network, which is commonly supported by existing task runtimes.
Optimal scheduling of tasks to workers is an NP-hard [69] problem even for the most basic
scenarios, when the exact execution duration of each task is known, and even if we do not consider
the duration of transferring data between workers over a network. Task runtimes thus resort to
various heuristics tailored to their users’ needs. Some classic task scheduling heuristics and their
comparisons can be found in [1, 70, 71, 67, 68]. Chapter 5 provides a comprehensive survey of
various task scheduling algorithms.
Scheduling heuristics have to take many factors into consideration when deciding on which
worker a task should be executed:
Resource requirements The scheduler should respect all resource requirements specified by
tasks. The runtime thus has to observe the dynamically changing available resources of each worker
and schedule tasks accordingly, to uphold their requirements. This can be challenging especially in
the presence of complex resource requirements.
Data transfer cost If the runtime operates within a distributed cluster, one of the most important
scheduling aspects that it needs to consider is the transfer cost of data between workers over the
network. All benefits gained by computing a task on another worker to achieve more parallelization
might be lost if it takes too much time to send the data (task outputs) to that worker.
The scheduler thus has to carefully balance the communication-to-computation ratio, based on
the available network bandwidth, sizes of outputs produced by tasks and the current utilization of
workers.
Scheduling overhead The overhead of generating the schedule itself also cannot be underes-
timated. As was already stated, computing an optimal solution quickly is infeasible, but even
heuristic approaches can have wildly different performance characteristics. Producing a lower-quality
schedule sooner, rather than a higher-quality schedule later, can sometimes be beneficial.
Summary
This chapter has provided a general definition of the most important terms related to task-based
programming models that will be used throughout the rest of this thesis. It has introduced the notion of
task graphs, tasks, data objects, resource requirements, workers, task runtimes and task scheduling.
The following chapter will focus on describing what challenges are faced by users and task
runtimes when they execute task graphs on HPC clusters.
Chapter 4
State of the Art
In order to design approaches for seamless execution of task graphs on supercomputers, it is first
necessary to examine the current limitations and issues faced by users that want to use task graphs
in an HPC setting. This chapter describes several challenges in this area, which belong to the two
broad categories that form the main focus of this thesis, namely efficiency (how to achieve high
hardware utilization and make task graph execution scalable) and ergonomics (how to make it easy
for users to define and execute task graphs). Most of these challenges stem from the sheer scale of
HPC workloads and clusters and from the complexity and idiosyncrasies of supercomputers, which
have historically been designed for different kinds of programming models.
Execution ergonomics (in the context of task-based programming models) is a broad area that
encompasses several aspects, such as providing an easy way to define the structure of the task graph,
allowing its execution in an effortless manner (both on a local computer and a distributed cluster),
handling fault tolerance without the user’s intervention, allowing users to express complex resource
requirements and many others. In particular, we will focus on identifying ergonomic challenges
that form a barrier for executing task graphs on HPC clusters and that limit efficient usage of
heterogeneous hardware resources.
We will also discuss how existing task runtimes are able to deal with these challenges and what
approaches they use. This is important, because in order to achieve a seamless task graph
execution experience on supercomputers, one should use a task runtime that takes these HPC
peculiarities into account; using off-the-shelf tools that were not designed for HPC use-cases might be
challenging.
There is a large body of tools designed for executing batch-oriented task graphs on diverse com-
puting platforms, ranging from consumer-grade laptops, through cloud deployments, to distributed
and HPC clusters. They are known under various terms, such as task executors, job managers,
distributed job schedulers, dataflow engines or orchestrators. We will use the term task runtime for
all such task execution tools in this thesis, as has already been discussed in the previous chapters.
These runtimes are quite popular, and they are being used for computing all kinds of scientific
workflows on HPC clusters [72, 73, 74, 18].
Examples of such task runtimes include e.g. Dask [43], Parsl [75], Ray [76], PyCOMPSs [77],
HyperLoom [78], GNU parallel [79], SnakeMake [44], Merlin [80], AutoSubmit [81] or
FireWorks [82]. Each task runtime defines its own instance of a task-based programming model,
and has a different set of trade-offs in areas such as performance and scalability, fault tolerance,
ease-of-use, ease-of-deployment and others.
Various representatives of task runtimes will be described in this chapter in the context of the
individual task graph execution challenges. Chapter 6 will then provide a detailed analysis of Dask,
a state-of-the-art task runtime, and Section 7.6 will compare several existing task runtimes with
HyperQueue, an HPC-optimized task runtime that will be the main topic of Chapter 7.
Below you can find a list of the most important identified challenges; they will be described in
detail in the rest of this chapter.
Allocation manager Any computation that is to be executed on an HPC cluster shared by many
users typically has to go through a queue managed by an allocation manager, which provides access
to the cluster through transient hardware allocations. This complicates the execution of task graphs
due to the need to assign tasks to individual allocations in order to maximize hardware utilization
and overcome various limits. Ideally, task runtimes should offer a way to automate this process.
Cluster heterogeneity Modern HPC clusters are very heterogeneous and might contain various
kinds of accelerator devices. It is important for a task runtime to offer comprehensive support for arbitrary
resource requirements in order to take advantage of complex HPC hardware and achieve high
hardware utilization.
Performance and scalability The large scale of HPC computations and the vast amount of
available hardware resources introduce unique performance challenges. Task runtimes should have
minimal overhead in order to efficiently execute even very large task graphs.
Fault tolerance The scale of HPC task graphs and clusters and the transient nature of allocations
make failures an ordinary occurrence, rather than a rare edge case. This necessitates special
considerations for the design of task runtimes.
Multi-node tasks Many HPC applications are designed to run on multiple nodes in parallel. This
is an uncommon requirement for most task-based programming models; it requires special support
to make this use-case a first-class concept.
Deployment Supercomputers typically provide a severely locked-down user environment, where
it can be quite challenging to build and deploy software that requires non-trivial build or runtime
dependencies. Task runtimes should thus be trivial to deploy in order to facilitate their usage on
HPC clusters.
Programming model HPC task graphs can be very diverse, and range from structurally simple
task graphs that contain a large number of tasks to very heterogeneous workflows with complex
dependencies. Task runtimes should ideally provide a programming model that is able to support
diverse use-cases in an ergonomic way.
4.1 Allocation manager
Users of HPC clusters are not typically allowed to directly perform arbitrary computations on
the computational nodes (machines designed to perform expensive computations) of the cluster.
Instead, they connect to machines that are usually called login nodes, from which they have to
enqueue their desired computation into a queue handled by a submission system that manages the
hardware resources of the cluster. We will use the term allocation manager for these submission
systems and the term allocation
1
for a computational request submitted by a user into these
managers.
Allocation managers are required for providing fair access to the resources of the cluster, because
HPC clusters are typically used by many people at the same time. Without a centralized man-
agement, hardware resources could be inadvertently shared among multiple users at once, which
could have undesirable performance and security implications, and could lead to oversubscription.
Furthermore, usage of these clusters is usually metered. Users can typically only use a certain
amount of resources assigned to their computational project, and when their resources run out,
they have to ask (or pay) for more resources. Allocation managers thus also implement user and
project accounting, so that there is a clear historical record of how many resources were consumed
by individual users of the cluster.
The majority of HPC clusters [83] use one of the two most popular allocation manager imple-
mentations, either Slurm [84] or PBS (Portable Batch System) [85], or one of its many derivatives
such as TORQUE [86] or OpenPBS [87]. For simplicity, Slurm will be
used as a default representative of allocation managers in the rest of this thesis (unless otherwise
noted), since it shares essentially all the important described constraints with PBS.
The following process describes how computations are typically executed on HPC clusters that
use an allocation manager:
1. The user enqueues a computational request (allocation) into the manager from a login node.
The request typically has to specify at least how many nodes should be allocated and the
maximum duration of the computation (usually labeled as wall-time), after which the compu-
tation will be forcibly stopped by the manager. It can also contain additional configuration,
such as what kinds of nodes should be allocated or what the priority of the request is (a
minimal submission sketch is shown below, after this list).
2. The allocation manager puts the request into a queue and schedules it to be executed at some
time in the future. Since users submit their allocations into the manager continuously, each
allocation has different properties and priorities, and it is not possible to precisely predict
for how long an allocation will run, the schedule can be very dynamic and unpredictable;
users might thus have to wait seconds, minutes, hours or even days before their allocation
starts to execute.
3. Once the allocation gets to the front of the queue and there are enough resources available, the
manager provisions the requested amount of hardware resources (typically a number of whole
computational nodes) and either executes a script (a batch allocation) or provides the user
with an interactive terminal session on one of the allocated nodes (an interactive allocation).
Allocations are often configured in a way that provides their authors with exclusive access
to the allocated hardware resources, in which case no other user will be able to use these
resources (nodes) until the allocation finishes.
4. Once the executed script finishes (or the wall-time duration is reached), the allocation ends,
and its hardware resources are released so that they can be used by another allocation.
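
As an illustration of step 1, a batch allocation can for example be submitted programmatically
by wrapping the sbatch command of Slurm; in the following Python sketch, the node count,
wall-time and batch script path are placeholder values that would have to be adapted to a
concrete cluster.

import subprocess

# Submit a batch allocation requesting 2 nodes for at most 30 minutes.
# The batch script (here a placeholder path) is executed once the
# allocation manager provisions the requested resources.
result = subprocess.run(
    [
        "sbatch",
        "--nodes=2",
        "--time=00:30:00",       # wall-time limit; the allocation is stopped after it elapses
        "my-computation.sh",     # placeholder batch script
    ],
    capture_output=True,
    text=True,
    check=True,
)
# sbatch prints e.g. "Submitted batch job 12345"; the allocation then waits in the queue
print(result.stdout.strip())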
Although it might not be obvious from the above description at first, the presence of an alloca-
tion manager presents perhaps the largest obstacle for ergonomic execution of task graphs on HPC
clusters. Instead of executing their task graphs directly on the cluster, users first have to think
about how to split their task graph into separate allocations and manage their submission and
execution. While this is true of any computation executed on HPC clusters in general, in the case
of task graphs it is especially difficult due to their complex structure, and also because the concept
of allocations was historically created with different programming models in mind. Mapping task
workflows to HPC allocations can thus be non-trivial [88, 89].
To execute a task graph using an allocation manager, it is desirable to find a way to map tasks
to allocations in a way that efficiently utilizes HPC resources and is as transparent as possible
for the users. There are several approaches that can be used for performing this task-to-allocation
assignment. In order to understand the various trade-offs and constraints being involved (which are
mostly caused by various performance bottlenecks and limitations of existing allocation managers),
we will first describe three straightforward methods for mapping tasks to allocations, which can
be used even without any special support from a task runtime. After that, we will examine more
automated approaches taken by existing task runtimes and other research works.
Single allocation per task graph
At first, it might seem that executing a task graph within a single allocation is a simple solution,
since all the user has to do is submit an allocation that will eventually execute the complete task
graph (using some task runtime), which is similar to the approach that would be used for executing the
task graph e.g. on a personal computer.
And it is indeed simple when it is possible at all. The problem is that allocation managers
tend to place fairly strict limits on the maximum possible execution duration of an allocation (the
wall-time) and also on the number of resources (nodes) that can be requested at once, to ensure
a fairer assignment of resources to users of a cluster. Therefore, when a task graph has too many
tasks, takes too long to execute, or the user wants to leverage more computational resources than
can fit in a single allocation, this approach will not work.
In fact, if the task graph computation is short enough that it can fit within a single allocation,
it might not always even make sense to use an HPC cluster to compute it. A more realistic scenario
is that even if an individual task graph can be executed quickly, users might want to execute
many such task graphs (for example to execute many experiments with different parametrizations).
This situation can be seen as a special case of a large task graph that consists of many disjoint
components (smaller task subgraphs). In this case, it will again typically not be possible to execute
all such task graphs inside a single allocation.
Even when a task graph can be reasonably executed within a single allocation, this approach
might lead to hardware underutilization and resource waste [90]. Consider a typical data analysis
or a machine learning workflow that works in several stages. First, it loads a dataset from disk and
preprocesses it, then it trains a machine-learning model, and finally it performs some postprocessing
step, which e.g. analyzes the resulting model. The preprocessing and postprocessing steps are
usually not very resource intensive and run on CPUs only, while the training step typically consumes
a lot of resources and is usually executed on a GPU accelerator. To execute such a workflow in a
single allocation, we will need to ask for a set of resources that contain such an accelerator. The
issue is that all resources of an allocation are reserved for its whole duration; therefore, we will have
to pay the price for the expensive accelerated nodes (and prevent others from using them) even
during the execution of workflow steps in which these nodes will be underutilized or completely
idle. This is caused by the fact that all allocated resources are tied to the lifetime of the whole
allocation, and using this approach we have to ask for a set of resources that form a union of the
resource requirements of all tasks of the workflow.
One additional disadvantage of this approach is queuing latency. Large allocations that require
a long wall-time duration or request many computational nodes typically spend a much longer time
in the allocation manager queue, because it is more difficult for the manager to make sure that all
the requested resources are available at the same time. As a concrete example, ten allocations that
require a one hour wall-time can potentially be allocated much quicker than a single allocation that
requests a ten hour wall-time, because these allocations do not need to run at the same time, and
are thus easier for the manager to schedule. Similar behavior can be observed w.r.t. the number of
requested nodes.
This approach can be used with essentially all existing task runtimes, since it does not require
any special support from the runtime. The user simply submits a single allocation, and when it
starts, the whole task graph is computed with a task runtime all in one go. This method is thus
relatively simple for the user, although as has been mentioned, it might not be feasible for many
use-cases, and it can lead to resource waste and a long latency before receiving the first results of
finished tasks.
Separate allocation for each task
The previous approach mapped all tasks to a single allocation. An opposite extreme would be
to map each task to a separate allocation. This approach might seem intuitive, because from a
certain point of view, HPC allocation managers can also be viewed as task runtimes, if we consider
allocations to be tasks; therefore, it might be tempting to treat them as such. Both PBS and
Slurm even support a crude notion of dependencies between allocations, which could allow users to
express computational graphs. Indeed, in an ideal world, there would be no difference between an
allocation manager and a task runtime, and users could simply construct an arbitrarily granular
task graph and execute it simply by submitting it directly into the allocation manager.
However, in practice, this approach is mostly infeasible, at least with the currently popular
allocation managers (PBS and Slurm), because they introduce a non-trivial amount of overhead
per each allocation [91]. In a best case scenario, Slurm is able to launch a few hundred allocations
per second [92]. However, more realistically, users on a crowded cluster might experience at least a
few hundred milliseconds overhead per each allocation, if not more, which is order(s) of magnitude
more than the overhead of an efficient task runtime [2].
Even though there is still probably room for reducing the overhead of contemporary HPC
allocation managers, it is important to note that some of the performance limitations are inherent.
Allocation managers have to (among other things) provide accurate accounting, handle robust and
secure provisioning and cleanup of hardware resources provided to allocations, manage user and
process isolation on computational nodes and ensure user fairness. These responsibilities are out
of scope for most task runtimes, and thus it is not surprising that they can usually achieve much
higher performance. While in theory, it could be possible to design an allocation manager that can
also act as a (performant) task runtime at the same time (some efforts have been made on this
front e.g. by Flux [93], which will be described later below), it is probably infeasible to modify the
two most prominent allocation managers that are used almost ubiquitously in the current HPC
ecosystem to support this use-case.
Due to this overhead, allocation managers usually limit the number of allocations that can be
enqueued by a single user at any given time (e.g. to a few hundred or a thousand), to ensure that
the manager is not overloaded and that it can continue to serve requests from other users of the
cluster. Therefore, unless the task graph is relatively small, it will most likely be infeasible to create
a separate allocation for each task.
It should be noted that both Slurm and PBS allow partially overcoming this limitation through
allocation arrays (also labeled as job arrays). This concept allows users to submit a large number of
computations with the same shape and properties in a single allocation. However, it still has several
disadvantages. One is a lack of flexibility; it does not easily allow submitting heterogeneous tasks
with different resource requirements, and it also offers only very crude support for dependencies
between the submitted tasks. Another disadvantage is fault tolerance; if some of the tasks of the
array fail, users have to manually identify and resubmit them in another allocation, which is far
from effortless. Furthermore, clusters impose limits for allocation arrays as well, and thus even the
array approach might not be enough to encompass all tasks of massive task graphs.
Apart from the maximum allocation count, there are other limitations that can be imposed
by allocation managers. Some tasks of scientific workflows can be granular, and require only a few
resources, e.g. a single CPU core. To avoid wasting resources, an allocation mapped to such a task
should thus ask only for the exact set of resources needed by it. However, allocation managers are
sometimes configured in a way that only offers node-level granularity of hardware resources, and
thus does not allow users to request less than a whole computational node [94]. In these cases, it
would be wasteful if we had to request a whole computational node e.g. for a simple task that runs
for just a couple of seconds and requires only a couple of cores.
There are two primary reasons why allocation managers limit the granularity of allocations.
First, reducing the granularity of allocations lessens the overall overhead. If users are able to ask
e.g. for individual cores of each node, the manager would have to manage many more allocations
than when users ask for complete nodes, which could quickly become unmanageable. The second
reason is security. If the manager provides the ability to reserve only a fraction of a node, allocations
from multiple users can execute on the same node at the same time. This reduces isolation between
users, which might potentially have security implications. It can also affect performance, since
concurrently running allocations might compete for some shared resource on the same node (e.g.
the operating system process scheduler). Since each node runs a single instance of an operating
system, it forms a natural isolation domain, and it is thus also frequently used as a unit of granularity
for allocations.
Another reason why using an allocation manager directly as a task runtime might be impractical
is that it makes debugging and prototyping task graphs more difficult. It is useful to have the ability
to examine the execution of a workflow locally, e.g. on the user’s personal computer, before running
it on a cluster. However, if the task graph were implemented directly in an allocation manager,
users would have to deploy tools like PBS or Slurm locally in order to debug their workflows, which
could be challenging.
To summarize, using an allocation manager directly as a task runtime is currently mostly
impractical, primarily because of the overhead associated with each allocation and the resulting
difference in task and allocation granularity, which can lead to resource waste. Therefore, users
who want to execute a task graph on an HPC system usually use a separate task runtime rather
than defining task graphs using the allocation manager directly.
Task graph partitioning
Both of the mentioned extreme approaches have severe disadvantages. A compromise between them
is to partition the task graph into smaller subgraphs and submit each subgraph as a separate allo-
cation. This approach allows mapping a large number of tasks into a smaller number of allocations,
and thus amortize the allocation overhead. Most users will probably sooner or later converge to a
similar approach, once their task graph becomes sufficiently large and complex, and they will have
to reconcile the coarse-grained nature of allocations with the fine-grained nature of tasks.
This process is far from straightforward if it is performed manually, i.e. if the user has to manu-
ally split the task graph before submitting the allocations, and then start an independent instance
of a task runtime inside each allocation. Not just because an optimal graph partitioning is itself a
notoriously difficult NP-hard [95] problem in general, but also because it requires implementation
efforts outside the boundaries of the used task-based programming model. In other words, users
might need to reimplement some parts of a task runtime due to the fact that a part of the task
graph execution happens inside allocations and a part happens outside allocations.
For example, the intermediate outputs of computed tasks of a partitioned subgraph might
have to be persisted to a filesystem before the corresponding allocation ends, and the results from
multiple allocations then have to be merged together to produce all the necessary data outputs of
the complete task graph. Another issue is that if some tasks fail and their corresponding allocation
ends, the task runtime does not get a chance to recompute them. Users will thus need to identify
such failed tasks and then selectively resubmit them into new allocations.
Meta-scheduling
While the task graph partitioning approach is a general solution to the problem of running task
graphs using allocation managers, it is fairly cumbersome when it has to be performed by users
manually. There are various task runtimes that can help to (either partially or fully) automate
this process. We will use the term meta-schedulers for these task runtimes in this thesis. These
tools typically operate on a level above the allocation manager, instead of only running within
the context of a single allocation. They are thus able to manage task graphs that span multiple
allocations.
Existing task runtimes use various approaches for providing some form of meta-scheduling.
Tools such as GNU parallel [79] can automatically map each task to a separate allocation. While
this can be useful for running simple task graphs on supercomputers, it does not help overcome
allocation manager submission limits. HyperShell [96] improves upon this by enabling users to
specify a task bundle size parameter, which batches the configured number of tasks together in the
same allocation. While this method can help amortize the cost of allocations for large task graphs,
it is mostly only useful for task graphs without dependencies, for which it is possible to express
the partitioning using a single number. It can be used by HyperShell since it does not support
dependencies between tasks.
SnakeMake [44] allows its users to explicitly assign each task to a group. Tasks with the
same group are then computed in the same allocation. While this approach is more flexible than
using a single batching parameter, it requires users to perform the partitioning manually, which
is quite challenging to do well in order to achieve good node utilization. Without using groups,
SnakeMake executes each task in a separate allocation, which suffers from the already mentioned
disadvantages, and is even actively discouraged on some HPC clusters [97].
An improved partitioning process has been described in [98]. It partitions task graphs based
on their structure, by splitting them into levels that contain subgraphs of tasks that are somehow
related. A similar approach is used by task runtimes such as Pegasus [18] or AutoSubmit [81].
For example, a “horizontal level” partitioning groups tasks that do not have dependencies between
each other and that can run in parallel, while a “vertical level” partitioning groups tasks that
form a dependency chain. While this approach makes partitioning easier for users, they still need
to decide which partitioning mode should be used for specific parts of the graph. Furthermore,
a significant disadvantage is that the partitioning is performed statically, before the task graph
is executed. This can lead to suboptimal hardware utilization, because it is not possible to load
balance tasks across different allocations. It can also lead to resource waste, because allocations
created to execute a group of tasks will typically be configured so that they ask for a sum of all the
resource requirements of tasks within that group.
E-HPC [90] employs a similar approach, where it splits individual stages of workflows into
separate allocations that contain the ideal amount of resources for that stage. To tackle the issue
of allocations that reserve too many hardware resources, it allows these stages to dynamically
modify their resource requirements through the use of checkpointing. When a stage dynamically
asks for a different set of resources, it is checkpointed, moved to a different allocation and then
restarted. While this approach does alleviate some problems with resource waste, it is relatively
inflexible; workflows have to be manually separated into individual stages with coarse-grained
resource requirements, and these stages are still submitted as individual allocations rather than
being load balanced across different allocations.
To achieve a better utilization of computational nodes, some task runtimes allow users to deploy
a generic computational provider (a worker) in an allocation, which is then able to execute any
task (based on its resource requirements), instead of directly submitting tasks into allocations. This
enables the task runtime to dynamically assign tasks to workers even across different allocations,
which helps improve the achieved hardware utilization. It also makes the partitioning process much
easier, because users simply have to spawn a set of workers and submit tasks, and the task runtime
then takes care of the rest.
Task runtimes such as Merlin [80] or Parsl [75] support this dynamic meta-scheduling ap-
proach, although they require users to preconfigure each worker for a specific subset of resource
requirements. For example, a worker might be configured for executing tasks that require a specific
number of cores, or tasks that need to be executed on multiple nodes, etc. This can be limiting for
very heterogeneous task graphs or clusters; the task scheduler cannot make full use of all available
resources because it has to adhere to worker configurations that were predetermined by the user.
Other task runtimes, such as Balsam [99] or FireWorks [82], support a more flexible ap-
proach, where each worker can execute any task whose resource requirement can be fulfilled by the
resources assigned to that worker. This enables fully dynamic load balancing across allocations and
all available hardware resources.
An additional important feature that simplifies submission of workers is some form of an auto-
matic allocation system. As an example, Balsam or Dask [43] with the Dask JobQueue [100]
plugin are able to automatically submit allocations on behalf of the user in response to computa-
tional load. These allocations then start a generic worker which can immediately start executing
tasks. This removes an additional manual step that would otherwise have to be performed by users.
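As an illustration, the following sketch (not taken from any of the cited works) shows how such automatic allocation can look with Dask and the Dask-JobQueue plugin; the partition name, per-worker resources and scaling bounds are assumptions chosen for the example.

# A minimal sketch of automatic allocation using Dask with the Dask-JobQueue
# plugin; the partition name, per-worker resources and scaling bounds below
# are illustrative assumptions.
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

# Each Slurm allocation submitted by the plugin starts one Dask worker with
# the resources requested below.
cluster = SLURMCluster(
    queue="compute",        # assumed Slurm partition name
    cores=32,               # cores per worker (per allocation)
    memory="64GB",          # memory per worker
    walltime="01:00:00",    # duration of each allocation
)

# Allocations are submitted (and cancelled) automatically based on the load.
cluster.adapt(minimum=0, maximum=10)

client = Client(cluster)
# Tasks submitted through the client start executing as soon as Slurm starts
# the corresponding allocations and their workers connect.
futures = client.map(lambda x: x * 2, range(1000))
print(sum(client.gather(futures)))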
Alternatives
Apart from meta-scheduling, there are also alternative approaches to resolving limits of allocation
managers. Some works examine what it would take to make existing allocation managers more
workflow friendly. In [89], a modification to the Slurm allocation manager is proposed, which makes
it workflow-aware and adds support for fine-grained resource management to it. This approach
splits tasks of a workflow submitted within a single allocation into a larger number of allocations,
and assigns them corresponding priorities and resources based on their requirements. While this
improves certain scheduling aspects of Slurm, it still creates a separate allocation for each task,
which does not remove the overhead associated with allocations. This method also assumes prior
knowledge of the workflow structure; therefore, it is not possible to easily dynamically add tasks
or computational resources while the workflow is being executed.
Another approach is to create a new kind of an allocation manager designed with tasks and work-
flows in mind. An example could be the Flux [93] HPC scheduler, which serves as a re-imagination
of a modern allocation manager. It treats the whole cluster as a unified pool of resources and al-
lows submitting computations that make use of these resources with high granularity, down to the
level of individual cores. Flux also provides dynamic management of non-computational resources
such as storage and power, and even takes them into account during scheduling, for example by
avoiding I/O (Input/Output) intensive computations when not enough I/O bandwidth is currently
available. Furthermore, it enables defining generic resources, unlike traditional allocation managers
that can only manage a small set of known resources, such as CPU cores or memory. Providing
this kind of flexibility and granularity puts a lot of pressure on the system. Flux manages it by
using a distributed and hierarchical scheduling design, where each allocation can act as a separate
Flux instance that can then recursively subdivide its resources into additional nested allocations,
and even dynamically ask for reducing or increasing its set of resources.
While modifying allocation managers to remove some of their workflow handling issues or cre-
ating new allocation managers are viable approaches, it should be noted that Slurm and PBS are
currently dominating the most powerful supercomputers [83], and that replacing (or even heavily
modifying) the allocation manager of an HPC cluster is a very difficult process, which requires a
lot of implementation work, configuration tuning, documentation rewriting, and also retraining of
the cluster administrators and, more importantly, the cluster's users. Furthermore, using the allocation manager
directly as a task runtime has a disadvantage that was already mentioned; it might not be so easy
to run the workflows on personal computers or CI (Continuous Integration) servers, which limits
the ability to prototype and test them.
A different approach that is designed to make it easier to use HPC clusters is taken by tools
and frameworks such as cumulus [101], Open OnDemand [102], HEAppE [103] or Lexis [104].
These aim to simplify usage of clusters, and thus also remove the need for users to interact with
allocations manually, by providing high-level web interfaces for managing computations. They
focus on user authorization and authentication, data management, provisioning of resources across
different clusters and also partially on (coarse-grained) workflow execution. This approach can
make it easier for users who do not have experience with the Linux operating system or with
the terminal to leverage HPC resources. However, it does not resolve the limitations of
allocation managers by itself; it simply moves the problem to a different place and forces the
high-level computational framework to deal with it. These frameworks can do that by leveraging task runtimes
that are able to perform automatic meta-scheduling. For example, HEAppE provides support
for integration with the HyperQueue [105] task runtime, which will be described in Chapter 7.
4.2 Cluster heterogeneity
Even though task graphs are designed to be portable and ideally should not depend on any specific
computational environment, for certain types of tasks, we need to be able to describe at least some
generic computational environment constraints. For example, when a task executes a program that
leverages the CUDA programming framework [15], which is designed to be executed on an NVIDIA
graphics accelerator, it has to be executed on a node that has such a GPU available, otherwise it
will not work properly.
It should thus be possible for a task to define resource requirements, which specify resources that
have to be provided by an environment that will execute such a task. For example, a requirement
could be the number of cores (some tasks can use only a single core, some can be multi-threaded),
the amount of available main memory, a minimum duration required to execute the task or (either
optional or required) presence of an accelerator like a GPU or an FPGA. In order to remain portable
and independent of a specific computational environment, these requirements should be abstract
and describe general, rather than specific, kinds of resources.
A challenge specific to the resource requirements of HPC tasks is the diversity of hardware
present in modern HPC clusters, which have become increasingly heterogeneous
in recent years. This trend can be clearly seen in the TOP500 list of the most powerful
supercomputers [106]. Individual cluster nodes contain varying amounts and types of cores and
sockets, main memory, NUMA nodes or accelerators like GPUs or FPGAs. Since HPC software is
often designed to leverage these modern HPC hardware features, this complexity is also propagated
to tasks and their resource requirements.
Existing task runtimes have various levels of support for resource requirements. Most task
runtimes allow configuring at least a known set of resources per task, most often the number of
CPU cores, the amount of RAM, or the number of GPU accelerators. This is enough for simple
use-cases, but not for heterogeneous clusters with e.g. FPGA devices or multiple kinds of GPUs
from different vendors. Some tools, such as Dask or SnakeMake, allow users to define their own
resource kinds, which can be used to describe completely arbitrary kinds of resources. However,
they do not support more complex use-cases, such as specifying that a task requires only a fraction
of a resource, that some resources can have relations between them (for example cores residing in
the same NUMA node) or that a task could support multiple sets of resource requirements.
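To make this concrete, the following sketch shows one possible way to attach abstract resource requirements to a task; the Task and ResourceRequest names and fields are purely illustrative and do not correspond to the API of any runtime discussed in this chapter.

# A hypothetical sketch of abstract resource requirements attached to a task.
# The class names and fields are illustrative, not the API of any specific
# task runtime discussed in this chapter.
from dataclasses import dataclass, field


@dataclass
class ResourceRequest:
    cpus: float = 1.0                     # fractional resources, e.g. 0.5 of a core
    memory_gb: float = 0.0
    generic: dict = field(default_factory=dict)  # user-defined kinds, e.g. {"fpga": 1}


@dataclass
class Task:
    command: list
    # A task may offer several alternative requirement sets; the runtime picks
    # whichever one it can satisfy (e.g. run on a GPU if present, else on CPUs).
    resource_variants: list


train = Task(
    command=["python", "train.py"],
    resource_variants=[
        ResourceRequest(cpus=4, memory_gb=16, generic={"gpus": 1}),
        ResourceRequest(cpus=32, memory_gb=16),
    ],
)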
A resource requirement that is fairly specific to HPC systems is the usage of multiple nodes
per single task. This requirement is necessary for programs that are designed to be executed in a
distributed fashion, such as programs using MPI, which are quite common in the HPC world. The
use-case of tasks using multiple nodes is discussed in more detail later in Section 4.5.
4.3 Performance and scalability
The massive scale of HPC hardware (node count, core count, network interconnect bandwidth)
opens up opportunities for executing large-scale task graphs, but that in turn presents unique
challenges for task runtimes. Below are several examples of bottlenecks that might not
matter at a small computational scale, but that can become problematic in the context of HPC-scale
task graphs.
Scheduling Task scheduling is one of the most important responsibilities of a task runtime, and
with large task graphs, it can become a serious performance bottleneck. Existing task runtimes, such
as Dask, can have problems with keeping up with the scale of HPC task graphs. The scheduling
performance of various task scheduling algorithms will be examined in detail in Chapter 5.
Runtime overhead Using a task runtime to execute a task graph will necessarily introduce some
amount of overhead, caused by scheduling, network communication and other actions performed
by the task runtime. Task runtimes should be mindful of the overhead they add, because it can
severely affect the duration needed to execute a task graph. For example, even with an overhead
of just 1 ms per task, executing a task graph with a million tasks would result in total accumulated
overhead of more than fifteen minutes.
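As a back-of-the-envelope check of this claim (assuming the per-task overhead accumulates serially):

10^6 tasks × 1 ms per task = 1,000 s ≈ 16.7 minutes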
Many popular task runtimes, e.g. Dask, Parsl, Balsam, AutoSubmit or SnakeMake are
implemented in the Python programming language. While Python provides a lot of benefits in
terms of an ergonomic interface for defining task graphs, using it for the actual implementation of
scheduling, network communication and other performance-critical parts of the task runtime can
lead to severe bottlenecks, as Python programs are infamously difficult to optimize well.
This aspect will be examined in detail in Chapter 6, where we study the performance and overhead
of Dask, a state-of-the-art task runtime implemented in Python.
Architecture A typical architecture of a task runtime consists of a centralized component that
handles task scheduling, and a set of workers that connect to it over the network and receive task
assignments. Even with a central server, the task runtime can achieve good performance, as we will
show in Chapters 6 and 7. However, the performance of the server cannot be increased endlessly,
and at some point, a centralized architecture will become a bottleneck. Even if the workers exchange
data outputs directly between themselves, the central component might become overloaded simply
by coordinating a vast number of workers.
In that case, a decentralized architecture could be leveraged to avoid the reliance on a central
component. Such a decentralized architecture can be found e.g. in Ray [76]. However, to realize
the gains of a decentralized architecture, task submission itself has to be decentralized in some way,
which might not be a natural fit for common task graph workflows. If all tasks are generated from
a single location, the bottleneck will most likely remain even in an otherwise fully decentralized
system.
Communication overhead Scaling the number of tasks and workers will necessarily put a lot of
pressure on the communication network, both in terms of bandwidth (sending large task outputs
between nodes) and latency (sending small management messages between the scheduler and the
workers). Using HPC technologies, such as MPI or a lower-level interface like RDMA, could provide
a non-trivial performance boost in this regard. Some existing runtimes, such as Dask, can make
use of such technologies [107].
As we have demonstrated in [108, 109], in-network computing can be also used to optimize various
networking applications by offloading some computations to an accelerated NIC (Network Interface
Controller). This approach could also be leveraged in task runtimes, for example to reduce the
latency of management messages between the scheduler and workers or to increase the bandwidth
of large data exchanges among workers, by moving these operations directly onto the network card.
This line of research is not pursued in this thesis, although it could serve as a future research
direction.
Task graph materialization Large computations might require building massive task graphs
that contain millions of tasks. The task graphs are typically defined and built outside of compu-
tational nodes, e.g. on login nodes of computing clusters or on client devices (e.g. laptops), whose
performance can be limited. It can take a lot of time to build, serialize and transfer such graphs
over the network to the task runtime that runs on powerful computational nodes. This can create
a bottleneck even before any task is executed; it has been identified as an issue in existing task
runtimes [110].
In such a case, it can be beneficial to provide an API for defining task graphs in a symbolic way
that could represent a potentially large group of similar tasks with a compressed representation
to reduce the amount of consumed memory. For example, if we want to express a thousand tasks
that all share the same configuration, and differ e.g. only in an input file that they work with, we
could represent this as a group of a thousand similar tasks, rather than storing a thousand individual
task instances in memory. This could be seen as an example of the classical Flyweight design
pattern [111].
Such symbolic graphs could then be sent to the runtime in a compressed form and re-materialized
only at the last possible moment. In an extreme form, the runtime could operate on such graphs
in a fully symbolic way, without ever materializing them. Dask is an example of a task runtime
that supports such symbolic task graph representations.
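A minimal sketch of such a compressed representation is shown below; the TaskTemplate and TaskGroup names are illustrative and do not correspond to a real runtime API.

# A minimal Flyweight-style sketch of a symbolic task group: one shared
# template plus per-task parameters, instead of a thousand task objects.
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskTemplate:
    command: tuple          # shared configuration, stored only once
    cpus: int
    memory_gb: int


@dataclass
class TaskGroup:
    template: TaskTemplate
    inputs: list            # the only per-task difference (e.g. input files)

    def materialize(self):
        # Expand to concrete (template, input) pairs only when needed,
        # e.g. right before the tasks are handed to workers.
        for path in self.inputs:
            yield (self.template, path)


group = TaskGroup(
    template=TaskTemplate(command=("python", "process.py"), cpus=1, memory_gb=4),
    inputs=[f"input-{i}.csv" for i in range(1000)],
)
print(sum(1 for _ in group.materialize()))  # 1000 tasks, one shared template object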
Data transfers After a task is computed, it can produce various outputs, such as standard error
or output streams, files created on a filesystem or data objects that are then passed as inputs to
dependent tasks. Managing these data streams can be a bottleneck if the number of tasks is large.
Some task runtimes, such as SnakeMake, store all task outputs on the filesystem, since it is
relatively simple to implement and it provides support for basic data resiliency out-of-the-box. A
lot of existing software (that might be executed by a task) also makes liberal use of the filesystem,
which can make it challenging to avoid filesystem access altogether. However, HPC nodes might
not contain any local disks. Instead, they tend to use shared filesystems accessed over a network.
While this can be seen as an advantage, since with a shared filesystem it is much easier to share task
outputs among different workers, it can also be a severe bottleneck. Shared networked filesystems
can suffer from high latency and accessing them can consume precious network bandwidth that is
also used e.g. for managing computation (sending commands to workers) or for direct worker-to-
worker data exchange. Furthermore, data produced in HPC computations can be large, and thus
storing it to a disk can be a bottleneck even without considering networked filesystems.
These bottlenecks can be alleviated by transferring data directly over the network (preferably
without accessing the filesystem in the fast path). Direct network transfer of task outputs between
workers is implemented e.g. by Dask. Some runtimes (such as HyperLoom [78]) also leverage
RAM disks, which provide support for tasks that need to interact with a filesystem while avoiding
the performance bottlenecks associated with disk accesses. Some task runtimes, such as Hyper-
Shell or Pegasus, allow streaming standard output and error streams over the network to a
centralized location to reduce filesystem pressure. For all of these approaches, it is also possi-
ble to use HPC-specific technologies, such as InfiniBand [112] or MPI, to improve data transfer
performance by fully exploiting the incredibly fast interconnects available in modern HPC clusters.
4.4 Fault tolerance
Fault tolerance is relevant in all distributed computational environments, but HPC systems have
specific requirements in this regard. As was already mentioned, computational resources on HPC
clusters are provided through allocation managers. Computational nodes allocated by these man-
agers are provided only for a limited duration, which means that for long-running computations,
some nodes will disconnect and new nodes might appear dynamically during the lifecycle of the
executed workflow. Furthermore, since the allocations go through a queue, it can take some time
before new computational resources are available. Therefore, the computation can remain in a
paused state, where no tasks are being executed, for potentially long periods of time.
It is important for task runtimes to be prepared for these situations; they should handle node
disconnections gracefully even if a task is being executed on a node that suddenly disconnects, and
they should be able to restart previously interrupted tasks on newly arrived workers. In general, in
HPC scenarios, worker instability and frequent disconnects should be considered the normal mode
of operation rather than just a rare edge case.
Tasks are usually considered to be atomic from the perspective of the runtime, i.e. they either
execute completely (and successfully), or they fail, in which case they might be restarted from
scratch. Task granularity thus plays an important role here, since when a task is large, then a lot
of work might have to be redone if it fails and is re-executed. Some runtimes try to alleviate this
cost by leveraging task checkpointing [113], which is able to persist the computational state of a
task during its execution, and then restore it in case of a failure, thus avoiding the need to start
from the beginning.
Fault tolerance can be challenging in the presence of data transfers between dependent tasks.
When a task requires inputs from its dependency, the runtime might have to store them (either in
memory or in a serialized format on disk) even after the task has started executing, because there
is always a possibility that it will have to restart the task in case of a failure, and thus it needs to
hold on to its inputs. In some cases, it can actually be a better trade-off to avoid storing the inputs
and instead re-execute the dependencies of the task (even if they have been executed successfully
before) to regenerate the inputs. This can help reduce memory footprint, albeit at the cost of
additional computation time, and it also might not work well if the execution of the dependencies is
not deterministic.
Because of its importance, fault-tolerant task execution is generally well-supported in most
existing task runtimes. The differences are mostly in the level of automation. For example, Fire-
Works or Merlin require users to manually restart failed tasks, while runtimes such as Dask or
Balsam can restart them automatically without user intervention.
An additional aspect of fault tolerance is persistence of submitted tasks. It can be useful to
have the ability to resume task graph computation if the whole task runtime infrastructure (e.g.
its primary server) crashes, to avoid needless recomputations. Some task runtimes, such as Dask,
do not support such fault tolerance at all, others support it optionally, such as Ray, while some
task runtimes are persistent by default, such as Balsam.
4.5 Multi-node tasks
Many existing HPC applications are designed to be executed on multiple (potentially hundreds or
even thousands) nodes in parallel, using e.g. MPI or other communication frameworks. Multi-node
execution could be seen as a special kind of resource requirement, which states that a task should
be executed on multiple workers at once. Support for multi-node tasks is challenging, because it
affects many design areas of a task runtime:
Scheduling When a task requires multiple nodes for execution and not enough nodes are available
at a given moment, the scheduler has to decide on a strategy that will allow the multi-node task
to execute. If it were constantly trying to backfill available workers with single-node tasks, then
multi-node tasks could be starved.
The scheduler might thus have to resort to keeping some nodes idle for a while to enable the
multi-node task to start as soon as possible. Another approach could be to interrupt the currently
executing tasks and checkpoint their state to make space for a multi-node task, and then resume
their execution once the multi-node task finishes.
In a way, this decision-making already has to be performed on the level of individual cores even for
single-node tasks, but adding multiple nodes per task makes the problem much more difficult.
Data transfers It is relatively straightforward to express data transfers between single-node tasks
in a task graph, where a task produces a set of complete data objects, which can then be used as
inputs for dependent tasks. With multi-node tasks, the data distribution patterns become more
complex, because when a task that is executed on multiple nodes produces a data object, the
object itself might be distributed across multiple workers, which makes it more challenging to use
in follow-up tasks. The task graph semantics might have to be extended by expressing various
data distribution strategies, for example a reduction of data objects from multiple nodes to a single
node, to support this use-case. This use-case is supported e.g. by PyCOMPSs [77].
When several multi-node tasks depend on one another, the task runtime should be able to exchange
data between them in an efficient manner. This might require some cooperation with the used
communication framework (e.g. MPI) to avoid needless repeated serialization and deserialization
of data between nodes.
Fault tolerance When a node executing a single-node task crashes or disconnects from the run-
time, its task can be rescheduled to a different worker. In the case of multi-node tasks, failure
handling is generally more complex. For example, when a task is being executed on four nodes
and one of them fails, the runtime has to make sure that the other nodes will be notified of this
situation, so that they can react accordingly (either by finishing the task with a smaller number of
nodes or by failing immediately).
Some task runtimes only consider single-node tasks in their programming model, which forces
users to use various workarounds, such as emulating a multi-node task with a single-node task that
uses additional resources that are not managed by the task runtime itself. In other tools, such
as AutoSubmit, it is only possible to express node counts on the level of individually submitted
allocations. While this is useful for running coarse-grained MPI applications, it does not allow
combining these multi-node applications with other, fine-grained single-node tasks in the same task
graph. One notable exception is PyCOMPSs [77], which allows decorating Python functions that
represent tasks with an annotation that specifies how many nodes that task requires, and to
combine these tasks with other single-node tasks.
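The following sketch illustrates, with a hypothetical task decorator, how multi-node and single-node tasks could be combined in one graph; it only loosely mirrors the annotation-based approach of runtimes such as PyCOMPSs and is not their actual API.

# A hypothetical sketch of mixing a multi-node task with single-node tasks in
# one graph. The decorator is illustrative only; it merely records resource
# metadata that a runtime could use for scheduling.
def task(nodes=1, cpus=1):
    """Attach resource metadata to a task function (illustrative only)."""
    def wrap(fn):
        fn.nodes = nodes
        fn.cpus = cpus
        return fn
    return wrap


@task(cpus=1)
def preprocess(path):
    return f"cleaned-{path}"


@task(nodes=4)            # an MPI-style simulation spanning four nodes
def simulate(data):
    return f"result-of-{data}"


@task(cpus=1)
def postprocess(result):
    return f"report-of-{result}"


# A runtime supporting multi-node tasks could schedule this chain so that
# `simulate` occupies four workers at once, while the other tasks use one core.
print(postprocess(simulate(preprocess("input.dat"))))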
4.6 Deployment
Even though it might sound trivial at first, an important aspect that can affect the ergonomics
of executing task graphs (or any kind of computation, in general) on an HPC cluster is ease-
of-deployment. Supercomputing clusters are notable for providing only a severely locked-down
environment for their users, which does not grant elevated privileges and requires users to either
compile and assemble the build and runtime dependencies of their tools from scratch or choose from
a limited set of precompiled dependencies available on the cluster. However, these precompiled
dependencies can be outdated or incompatible with one another.
Deploying any kind of software (including task runtimes) that has non-trivial runtime depen-
dencies or non-trivial installation steps (if it is not available in a precompiled form for the target
cluster) can serve as a barrier for using it on an HPC cluster. Many existing task runtimes are not
trivial to deploy. For example, several task runtimes, such as Dask, SnakeMake or PyCOMPSs,
are implemented in Python, a language that typically needs an interpreter to be executed, and also
a set of packages in specific versions that need to be available on the filesystem of the target cluster.
Installing the correct version of a Python interpreter and Python package dependencies can already
be slightly challenging in some cases, but there can also be other issues, such as the dependencies
of the task runtime conflicting with the dependencies of the software executed by the task graph
itself. For example, since Dask is a Python package, it depends on a certain version of a Python
interpreter and a certain set of Python packages. It supports tasks that can directly execute Python
code in the process of a Dask worker. However, if the executed code requires a different version
of a Python interpreter or Python packages than Dask itself, this can lead to runtime errors or
subtle behavior changes (it is not straightforward to use multiple versions of the same Python
package dependency in a single Python virtual environment, because conflicting versions simply
override each other).
A much bigger challenge arises if a task runtime requires external runtime dependencies. For
example, Merlin requires a message broker backend, such as RabbitMQ or Redis, FireWorks
needs a running instance of the MongoDB database, and Balsam uses the PostgreSQL database for
storing task metadata. Compiling, configuring and deploying these complex runtime dependencies
on HPC clusters can be quite challenging.
4.7 Programming model
One important aspect that affects the simplicity of defining and executing task graphs is the
programming model and interfaces used to describe the execution logic, resource requirements,
dependencies and other properties of tasks and task graphs. Below is a list of areas that affect the
ergonomic aspects of task graph definition.
Task definition interfaces The method used to define tasks affects how easy it is to work with
the task runtime, and how complex use-cases can be expressed with it. Existing interfaces usually
belong to one of three broad categories. Tools such as GNU parallel or HyperShell offer a CLI
(Command-line Interface), which enables defining tasks that should be computed directly in the
terminal. This provides a quick way to submit structurally simple task graphs, although it is difficult
to describe complex dependencies between tasks using this approach. A more general method of
defining task graphs is to use a declarative workflow file, often using the YAML (Yet Another
Markup Language) format. This approach is used e.g. by Pegasus, AutoSubmit or Merlin.
Since they are declarative, workflow files make it easier to analyze the task graph description, for
example for providing data provenance. However, they can also be incredibly verbose, and are
not suited for task graphs with a large number of tasks (unless the workflow file is automatically
generated or it supports some form of compressed representation for groups of tasks). The most
general (and very popular) method for describing task graphs is to leverage a general purpose
programming language, very often Python. Many task runtimes provide a Python API for defining
tasks, e.g. Dask, Ray, Parsl, Balsam or Pegasus. This approach allows users to create task
graphs however they like, although it can be unnecessarily verbose for simple use-cases.
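As an example of this approach, the following snippet builds a small task graph with Dask's Python API (dask.delayed); the functions are placeholders for real work, and the file names are illustrative.

# An example of the "general purpose programming language" approach, here
# using Dask's Python API (dask.delayed); the functions below only stand in
# for real work.
import dask


@dask.delayed
def load(path):
    return [1, 2, 3]


@dask.delayed
def transform(data):
    return [x * 2 for x in data]


@dask.delayed
def combine(chunks):
    return sum(sum(chunk) for chunk in chunks)


# Ordinary Python code builds an arbitrarily shaped task graph.
chunks = [transform(load(f"part-{i}.csv")) for i in range(10)]
result = combine(chunks)
print(result.compute())   # the graph is only executed here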
Data transfers Direct transfers of data between tasks have already been mentioned as a potential
performance bottleneck, yet they also affect the ease-of-use of a given programming model. Some
use-cases are naturally expressed by tasks passing their outputs directly to depending tasks. Sup-
port for this feature can make such use-cases much simpler, by not forcing users to pass intermediate
task outputs through the filesystem.
Iterative computation Another important aspect of the programming model is its support for
iterative computation. There are various HPC use-cases that are inherently iterative, which means
that they perform some computation repeatedly, until some condition (which is often determined
dynamically) is satisfied. For example, the training of a machine-learning model is typically per-
formed in iterations (called epochs) that continue executing while the prediction error of the model
keeps decreasing. Another example could be a molecular dynamics simulation that is repeated until
a desired level of accuracy has been reached.
One approach to model such iterative computations would be to run the whole iterative process
inside a single task. While this is simple to implement, it might not be very practical, since such
iterative processes can take a long time to execute, and performing them in a single task would
mean that we could not leverage desirable properties offered by the task graph abstraction, for
example fault tolerance. Since the computation within a single task is typically opaque to the task
runtime, if the task fails, it would then need to be restarted from scratch.
A better approach might be to model each iteration as a separate task. In such case, the task
runtime is able to restart the computation from the last iteration if a failure occurs. However,
this approach can be problematic if the number of iterations is not known in advance, since some
task runtimes expect that the structure of the task graph will be immutable once the graph has
been submitted for execution. Dask and Ray are examples of task runtimes that do not have this
assumption; they allow adding tasks to task graphs dynamically during their execution.
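A sketch of this per-iteration-task approach using Dask's distributed client is shown below; the run_epoch function and the convergence threshold are illustrative assumptions.

# A sketch of modeling each iteration as a separate task, using Dask's
# distributed client to add tasks dynamically; `run_epoch` and the stopping
# condition are illustrative.
from dask.distributed import Client


def run_epoch(state):
    # One iteration of the computation (e.g. one training epoch).
    return {"epoch": state["epoch"] + 1, "error": state["error"] * 0.8}


client = Client()                      # connect to (or start) a cluster
state = {"epoch": 0, "error": 1.0}

# The number of iterations is not known in advance; new tasks are submitted
# while the graph is being executed, based on the results of previous tasks.
while state["error"] > 0.01:
    future = client.submit(run_epoch, state)
    state = future.result()

print(f"converged after {state['epoch']} epochs")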
Another option to handle iterative tasks is provided by L-DAG [114], which suggests an approach
for transforming iterative workflows into workflows without loops.
Summary
This chapter has identified a set of challenges that can limit the usage ergonomics and efficiency
of task graphs on supercomputers, and described how existing task runtimes deal with them.
Even though more HPC peculiarities can always be found, it should already be clear from all the
mentioned challenges that HPC use-cases that leverage task graphs can be very complex and may
require specialized approaches in order to reach maximum efficiency while remaining easy to use
and deploy. The most important challenge that was described is the interaction of tasks with
allocations, which has a fundamental effect on the achievable hardware utilization of the target
cluster, and on the overall simplicity of defining and executing task graphs.
The rest of the thesis will discuss approaches for alleviating these challenges. Chapters 5 and 6
focus on the performance aspects of task graph execution, namely on task scheduling and the
overhead introduced by task runtimes. Chapter 7 then also deals with the ergonomic challenges,
and proposes a meta-scheduling and resource management design that is built from the ground
up with the mentioned challenges in mind, to enable truly first-class task graph execution on
heterogeneous HPC clusters.
Chapter 5
Task scheduling analysis
Task scheduling is one of the most important responsibilities of a task runtime, because the quality
of scheduling has an effect on the makespan of the task graph execution and also on the achieved
hardware utilization of worker nodes. It is crucial for the scheduler to be able to distribute the tasks
among all available worker nodes to achieve as much parallelization as possible, without the induced
network communication and task management overhead becoming a bottleneck. Unfortunately,
optimal task scheduling is a very difficult problem, which is NP-hard even in the simplest cases [69],
and there is thus no single scheduling algorithm that could quickly provide an optimal schedule for
an arbitrary task graph.
There are many factors that affect the execution properties of task graphs and that pose some
form of a challenge to task schedulers. The computational environment (e.g. a distributed cluster)
can have varying amounts of nodes with heterogeneous hardware resources, and complex network
topologies that can have a non-trivial effect on the latency and bandwidth of messages sent between
the workers and the scheduler, and thus in turn also on the overall performance of the task graph
execution. Task graphs can also have an arbitrarily complex structure, with large amounts of
different kinds of tasks with diverse execution characteristics and resource requirements.
Furthermore, task graph execution might not be deterministic, and the scheduler has to work
with incomplete information and react to events that dynamically occur during task execution and
that cannot be fully predicted before the task graph execution starts. The communication network
can be congested because of unrelated computations running concurrently on the cluster; tasks
can also be slowed down by congested hardware resources that can be highly non-trivial to model,
such as NUMA effects, and they can also sometimes fail unpredictably and have to be re-executed.
Even the duration of each task, perhaps the most crucial piece of information the scheduler relies
on, is not usually known beforehand; at best, the scheduler has access to either an
estimate from the task graph author or a running average based on historical executions of similar
tasks, both of which can be inaccurate.
In theory, all these factors should be taken into account by task scheduling algorithms. In prac-
tice, it is infeasible to have a completely accurate model of the entire cluster, operating system, task
implementations, networking topology, etc. Therefore, task schedulers omit some of these factors to
provide a reasonable runtime performance. They rely on various heuristics with different trade-offs
that make them better suited for specific types of task graphs and computational environments.
These heuristics can suffer from non-obvious edge cases that produce poor quality schedules or
from low runtime efficiency, which can in turn erase any speedup gained from producing a higher
quality schedule.
In HPC use-cases, the performance and quality of task scheduling is even more important,
since the scale and heterogeneity of task graphs provides an additional challenge for the scheduler.
HPC clusters also tend to contain advanced network topologies with low latency and high band-
width [115, 116], which offer the scheduler more leeway to create sophisticated schedules leveraging
large amounts of network communication, which would otherwise be infeasible on clusters with
slower networks.
To better understand the behavior and performance of various scheduling algorithms, and to find
out which scheduling approach is best suited for executing task graphs on distributed clusters, we
have performed an extensive analysis of several task scheduling algorithms in Analysis of workflow
schedulers in simulated distributed environments [1]. The two main contributions of this work are
as follows:
1. We have created an extensible, open-source simulator of task graph execution, which allows
users to easily implement their own scheduling algorithms and compare them, while taking
into account various factors that affect task scheduling.
2. We have benchmarked several task schedulers from existing literature under various condi-
tions, including factors affecting scheduling that have not been explored so far to our knowl-
edge, like the minimum delay between invoking the scheduler or the amount of knowledge
about task durations available to the scheduler, and evaluated the suitability of the individual
algorithms for various types of task graphs. All parts of the benchmark suite, including the
task graphs, source codes of the scheduling algorithms, the simulation environment and also
benchmark scripts are provided in an open and reproducible form.
Various descriptions of schedulers, task graphs and other parts of the simulator and the bench-
mark configuration used in this chapter were adapted from our publication [1].
I have collaborated on this work with Ada Böhm and Vojtěch Cima; we have all contributed to
it equally. Source code contribution statistics for Estee can be found on GitHub at
https://github.com/it4innovations/estee/graphs/contributors.
5.1 Task graph simulator
To analyze scheduling algorithms, some form of an environment for executing tasks has to be used.
One possibility would be to use an actual distributed cluster, and implement multiple schedulers
into an existing task runtime. However, this approach can be expensive, both computationally
(executing a large number of task graphs with various configurations would consume a lot of cluster
computational time) and implementation-wise (adapting existing runtimes to different scheduling
algorithms is challenging). Therefore, task graph scheduling surveys tend to use some form of
a simulated environment, which simulates selected properties of a distributed cluster, and allows
comparing the performance of multiple scheduling algorithms (or other factors of a task runtime)
with a reduced accuracy, but at a fraction of the cost.
Many task scheduler surveys have been published over the years [70, 71, 67, 117, 68], yet it
is difficult to reproduce and extend these results without having access to the exact source code
used to implement the schedulers and the simulation environment used in these surveys. As we
will show later in this chapter, the performance of scheduling algorithms can be affected by
seemingly trivial implementation details, and having access only to a high-level textual description
or pseudocode of a scheduling algorithm does not guarantee that it will be possible to reproduce
it independently with the same performance characteristics. This makes it challenging to compare
results between different simulation environments.
Apart from the environments used in existing surveys, there are also more general task simu-
lation environments. DAGSim [118] offers a framework for comparing scheduling algorithms, and
compares the performance of a few algorithms, but does not provide its implementation, which
makes it difficult to reproduce or extend its results. SimDAG [119] is a task graph simulator fo-
cused on HPC use-cases built on top of the SimGrid [120] framework. It allows relatively simple
implementation of new task scheduling algorithms; however, it does not support any task resource
requirements (e.g. the number of used CPU cores), which are crucial for simulating heterogeneous
task graphs.
In addition to simply comparing the performance of different schedulers, our goal was also to
test two factors that we hypothesized might affect scheduling; we have not seen
these explored in detail in existing works. Namely, we wanted to examine the effects of the Minimal
Scheduling Delay (the delay between two invocations of the scheduler) and the information mode (the
amount of knowledge of task durations that is available to the scheduler). These factors will be
described in detail in the following section. The existing simulation environments that we have
evaluated did not have support for these factors, and it would be non-trivial to add support for
them.
To summarize, our goal was to provide a simulation environment that would be open-source,
facilitate reproducibility, support basic task resource requirements, and enable us to examine the
two mentioned factors that affect scheduling. To fulfill these goals, we have implemented a task
graph simulation framework called Estee. It is an MIT-licensed open-source tool [121] written
in Python that provides an experimentation testbed for task runtime and scheduler developers
and researchers. It is flexible; it can be used to define a cluster of workers, connect them using
a configurable network model, implement a custom scheduling algorithm and test its performance
on arbitrary task graphs, with support for specifying required CPU core counts for individual
tasks. Additionally, it comes “batteries included”; it contains baseline implementations of several
task schedulers from existing literature and also a task graph generator that can be used to generate
randomized graphs with properties similar to real-world task graphs.
5.1.1 Architecture
Figure 5.1 shows the architecture of Estee. The core of the tool is the Simulator component,
which uses discrete event simulation [122] to simulate the execution of a task graph. It manages
tasks, queries the Scheduler component for task-to-worker assignments (schedules) and then assigns
the tasks to their corresponding workers. It also ensures that all task graph executions satisfy the
dependency constraint (Definition 4) and both task and worker constraints (Definition 5).
Figure 5.1: Estee architecture. The Simulator is connected to the Scheduler, a set of Workers, the
Network model and the Task graph; the Scheduler, Worker and Network model are user-overloadable
components, and the connections between them carry task placements, events and simulated data
transfers.
The Worker component simulates task execution and uses the provided network model to
simulate exchanges of data (task outputs) between individual workers of the simulated cluster.
Estee provides abstract interfaces for the task scheduler, the worker and the network model
(which simulates network communication and contention). Users can thus easily provide their own
implementations of these interfaces, and in turn override both the behavior of the scheduler and of
the cluster and its network topology.
One of our goals for Estee was to make it very easy to write new scheduling algorithms
and to make it approachable for other researchers who might want to experiment with
task schedulers. That was also one of the motivations for deciding to create Estee in Python,
which facilitates experimentation. Listing 5.1 demonstrates the simplicity of defining a task graph
simulation using Estee. The output of the
simulation is both the makespan and also a detailed trace that can be used to visualize the individual
task-to-worker assignments and task execution time spans.
Estee supports general task graphs represented by a DAG. Each task has an associated du-
ration, and can contain multiple outputs (data objects), each with an associated size. It can also
specify how many cores it requires, to model the common requirement of executing multi-threaded
functions and programs on modern HPC machines. Tasks do not depend directly on other tasks;
dependencies are formed between a task and a specific data output of another task. The used task
graph model thus corresponds to the task graph definition introduced in Chapter 3.
5.1.2 Communication model
Some previous scheduler surveys assume that the time to transfer a data object from one worker
to another depends merely on the size of the data object, and not on other factors, such as current
from estee.common import TaskGraph
from estee.schedulers import BlevelGtScheduler
from estee.simulator import Simulator, Worker, MaxMinFlowNetModel
# Create a task graph containing 3 tasks
# Each task runs for 1s and requires 1 CPU core
#
#      t0
#      |
#      o  (50MB output)
#     / \
#   t1    t2
tg = TaskGraph()
t0 = tg.new_task(duration=1, cpus=1, output_size=50)
t1 = tg.new_task(duration=1, cpus=1)
t1.add_input(t0)
t2 = tg.new_task(duration=1, cpus=1)
t2.add_input(t0)
# Create a task scheduler
scheduler = BlevelGtScheduler()
# Define cluster with 2 workers (1 CPU core each)
workers = [Worker(cpus=1) for _ in range(2)]
# Define MaxMinFlow network model (100MB/s bandwidth)
netmodel = MaxMinFlowNetModel(bandwidth=100)
# Run simulation, return the estimated makespan in seconds
simulator = Simulator(tg, workers, scheduler, netmodel, trace=True)
makespan = simulator.run()
print(f"Task graph execution makespan = {makespan}s")
Listing 5.1: Simple task graph simulation example using Estee
network utilization or interference [123, 124, 68, 125]. This is an unrealistic assumption, as the
latency and bandwidth of actual computer networks are affected (among other things) by other com-
munication happening concurrently on the same network. Moreover, a real worker implementation
would download more than a single data object simultaneously, which further affects the transfer
durations, because the worker’s bandwidth will be shared by multiple network transfers. We will
use the term communication model and network model interchangeably in this chapter.
We provide a more realistic network model that simulates full-duplex communication between
workers, where the total (data object) upload and download bandwidth of each worker is lim-
ited. The sharing of bandwidth between worker connections is modeled by the max-min fairness
model [126]. Max-min fairness provides a bandwidth allocation for each worker such that the
allocation of any participant cannot be increased without decreasing the allocation of some other
participant that has an equal or smaller allocation. When a data object transfer starts or finishes, the data flow between
workers is recomputed immediately; thus we neglect the fact that it may take some time for the
bandwidth to fully saturate.
This model is not as accurate as e.g. packet-level simulation implemented in some other simula-
tors [120], but it is a notable improvement over the naive model and it provides reasonable runtime
performance. To provide a baseline that corresponds to the naive model described above, which
has been used in several previous works, Estee also implements a simple network model. The
network model used in a given simulation can be configured with an arbitrary bandwidth.
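To illustrate the progressive-filling idea behind max-min fairness, the following standalone sketch computes a max-min fair allocation of a single link's capacity among a set of transfer demands; Estee's actual model additionally tracks separate upload and download limits per worker, so this is only an approximation of its behavior.

# A simplified sketch of max-min fair bandwidth sharing on a single link of
# fixed capacity (progressive filling); illustrative only.
def max_min_fair(capacity, demands):
    allocation = [0.0] * len(demands)
    remaining = capacity
    unsatisfied = set(range(len(demands)))
    while unsatisfied and remaining > 1e-9:
        # Offer every unsatisfied transfer an equal share of what is left.
        share = remaining / len(unsatisfied)
        for i in list(unsatisfied):
            give = min(share, demands[i] - allocation[i])
            allocation[i] += give
            remaining -= give
            if demands[i] - allocation[i] <= 1e-9:
                unsatisfied.discard(i)
    return allocation


# Three concurrent transfers sharing a 100 MB/s link:
print(max_min_fair(100, [20, 80, 80]))  # -> approximately [20.0, 40.0, 40.0]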
5.1.3 Scheduler parameters
Estee implements support for two parameters that can affect scheduler performance, and which
we have not seen examined in detail in existing literature:
Minimal Scheduling Delay Non-trivial schedulers create task assignments continuously during
task graph execution, based on the current worker load and task completion times. That means
that they are not invoked only once, but rather the task runtime invokes them repeatedly to ask
them to produce assignments for tasks that are (or soon will be) ready to be executed at any given
point in time.
It then becomes important for a task runtime to decide when exactly it should invoke the scheduler.
It could try to make a scheduling decision every time a task is finished; however, in practice there
is often an upper bound on the number of scheduler invocations per second. It might be introduced
artificially, to reduce the scheduling overhead, or it might be caused by a software or hardware
limitation (e.g. messages containing task updates cannot be received more often). Furthermore,
creating a new schedule after each task status change might not be optimal. The runtime can
also accumulate changes for a short time period, and then provide the scheduler with a batch of
status updates. While this increases the latency of task assignments, it can give the scheduler more
context to work with, when it decides how it should assign tasks to workers.
To test our hypothesis that the scheduler invocation rate can affect its performance, we introduce
a parameter called MSD (Minimal Scheduling Delay), which forces a minimal delay between two
scheduler invocations, i.e. the scheduler cannot be invoked again before at least MSD time units
have elapsed since its previous invocation.
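A sketch of how a runtime could enforce such a delay is shown below; the runtime and scheduler objects are illustrative placeholders, not part of Estee's API.

# A sketch of a scheduling loop that enforces a Minimal Scheduling Delay:
# task-state updates are accumulated and the scheduler is invoked at most
# once per MSD time units. The runtime/scheduler objects are illustrative.
import time


def scheduling_loop(runtime, scheduler, msd):
    last_invocation = float("-inf")
    pending_updates = []
    while not runtime.finished():
        # Collect events (finished tasks, new workers, ...) as they arrive.
        pending_updates.extend(runtime.poll_events())
        now = time.monotonic()
        if pending_updates and now - last_invocation >= msd:
            # The scheduler sees a whole batch of updates at once, which gives
            # it more context at the cost of slightly delayed assignments.
            assignments = scheduler.schedule(pending_updates)
            runtime.apply(assignments)
            pending_updates.clear()
            last_invocation = now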
Information mode Many existing task scheduler descriptions assume that the duration of each
task (and the size of each data object) is known in advance. However, this assumption is very seldom
upheld when executing real-world task graphs. Tasks are usually specified using arbitrary function
or binary invocations, and it is difficult to estimate their duration up front. Task runtimes thus have
to work with completely missing information about task durations, depend on potentially imprecise
user estimates, or calculate their own estimates based on historical task execution data. Task
benchmarks usually use simulated task durations, which are provided to the scheduler. However,
this might not realistically represent the scheduler’s behavior for actual task graphs, for which we
usually do not know task durations precisely before they are executed.
We use a parameter called Information mode (imode), which controls the amount of knowledge the
scheduler has of the duration of tasks. It can be set to one of the following values:
exact The scheduler has access to the exact duration of each task and the exact size of each
data object in the whole task graph.
user The scheduler has access to user-defined estimations for each task in the task graph.
These estimations are sampled from a random distribution that corresponds to a specific kind
of task within the workflow. For example, in a task graph that performs three kinds of tasks
(e.g. preprocessing, computation and postprocessing), each kind of task would have its own
distribution. We have categorized the tasks of task graphs that we have used for scheduler
benchmarks described in Section 5.2 manually, to simulate a user that has some knowledge of
the task graph that they are trying to compute and is able to provide some estimate of task
durations and data object sizes.
mean The scheduler only has access to the mean duration of all tasks and the mean size of all
data objects in the executed task graph. This simulates a situation where a similar task graph
is executed repeatedly, and thus there is at least some aggregated information about the task
properties available from an earlier run.
Another possible mode to consider would be to not provide the scheduler with any task durations
or data object sizes in advance. This behavior would in fact correspond closely to a real-world
execution of a task graph, where we usually do not know these task properties a priori. However,
it is challenging to use this approach when benchmarking schedulers. Scheduler implementations
are typically described with the assumption that task durations are known, and the scheduling
algorithms are often fundamentally based on calculations that make use of them.
If we took away this information, some schedulers would not be able to function, as their behavior
is strongly influenced by an estimate of the duration of each task. Therefore, we propose using
the mean mode instead of not providing the scheduler with any information. We assume that
even if the scheduler knows nothing in advance, it could always gradually record the durations
and sizes of finished tasks, and these values would eventually converge to the global mean. In
practice, this would take some time, while in our environment the schedulers know about the mean
in advance. Nevertheless, as was already mentioned, we can often get a reasonable estimate of the
mean durations based on previous executions of similar workflows.
5.1.4 Schedulers
There are many task scheduling approaches, and an enormous number of various task scheduling
algorithms. We have implemented a set of task schedulers that are representatives of several
common scheduling approaches, inspired by a list of schedulers from a survey performed by Wang
and Sinnen [68]. We have included several representatives of the simplest and perhaps most common
scheduling approach, called list-scheduling, where the scheduler sorts tasks based on some priority
criteria, and then repeatedly chooses the task with the highest priority and assigns it to a worker,
which is selected by some heuristic. In addition to list-scheduling, we have also implemented more
complex approaches, such as schedulers that use work-stealing or genetic algorithms.
Below is a list of schedulers that we have implemented and benchmarked (the labels of the individual
schedulers correspond to the labels used in the charts presented in Section 5.2):
blevel HLFET (Highest Level First with Estimated Times) [70] is a foundational list-based schedul-
ing algorithm that prioritizes tasks based on their b-level. The b-level of a task is the length of the
longest path from the task to any leaf task (in our case, the length of a path is computed using the
durations of tasks, without taking data object sizes into account); a sketch of this computation is
shown after the list of schedulers below. The tasks are scheduled in decreasing order of their b-level.
tlevel Smallest Co-levels First with Estimated Times [127] is similar to HLFET, with the exception
that the priority value of each task (called the t-level here) is the length of the longest path from
any source task to the given task. This value corresponds to the earliest
time that the task can start. The tasks are scheduled in an increasing order based on their t-level.
dls Dynamic Level Scheduling [128] calculates a dynamic level for each task-worker pair. It is
equal to the static b-level reduced by the earliest time that the task can start on a given worker
(considering necessary data transfers). In each scheduling step, the task-worker pair that maximizes
this value is selected.
mcp The Modified Critical Path [129] scheduler calculates the ALAP (as-late-as-possible) time for
each task. This corresponds to the latest time the task can start without increasing the total sched-
ule makespan. The tasks are then ordered in ascending order based on this value, and scheduled
to the worker that allows their earliest execution.
etf The ETF (Earliest Time First) scheduler [130] selects the task-worker pair that can start at
the earliest time at each scheduling step. Ties are broken by a higher b-level precomputed at the
start of task graph execution.
genetic This scheduler uses a genetic algorithm to schedule tasks to workers, using the mutation
and crossover operators described in [131]. Only valid schedules are considered; if no valid schedule
can be found within a reasonable number of iterations, a random schedule is generated instead.
ws This is an implementation of a simple work-stealing algorithm. The default policy is that each
task that is ready to be executed (all its dependencies are already computed) is always assigned
to a worker where it can be started with a minimal transfer cost. The scheduler then continuously
monitors the load of workers. When a worker starts to starve (it does not have enough tasks to
compute), a portion of tasks assigned to other workers is rescheduled to the starving worker.
In addition to these schedulers, we have also implemented several naive schedulers, which serve
as a baseline for scheduler comparisons.
single This scheduler simply assigns all tasks to a single worker (it selects the worker with the
most cores). The resulting schedule never induces any data transfers between workers, and does
not take advantage of any parallelism between workers.
random This scheduler simply assigns each task to a random worker using a PRNG (Pseudorandom
Number Generator).
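The following sketch shows how the b-levels used by the blevel scheduler can be computed from task durations alone, as described above; the example graph is illustrative.

# A sketch of computing b-levels as used by the blevel (HLFET) scheduler:
# the b-level of a task is the longest path (sum of task durations) from the
# task to any leaf, here ignoring data object sizes.
import functools


def b_levels(durations, children):
    """durations: task -> duration; children: task -> list of dependent tasks."""

    @functools.lru_cache(maxsize=None)
    def b_level(task):
        below = max((b_level(child) for child in children.get(task, ())), default=0)
        return durations[task] + below

    return {task: b_level(task) for task in durations}


# Example: t0 -> t1 -> t3, t0 -> t2 (durations in seconds)
durations = {"t0": 1, "t1": 2, "t2": 5, "t3": 1}
children = {"t0": ["t1", "t2"], "t1": ["t3"]}
print(b_levels(durations, children))
# {'t0': 6, 't1': 3, 't2': 5, 't3': 1} -> tasks would be scheduled as t0, t2, t1, t3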
We have strived to implement the mentioned list-based schedulers (blevel, tlevel, dls, mcp, etf ) as
closely as possible to their original description. These list-based algorithms mostly focus on selecting
the next task to schedule, but an important question (that comes up during their implementation)
is to which worker the selected task should be scheduled. The algorithm descriptions often mention
assigning the task to a worker that allows the earliest start time of the task. While that is surely a
reasonable heuristic, it is not clear how exactly such a worker should be found, because the exact
earliest start time often cannot be determined precisely in advance, since its calculation might
encompass network transfers whose duration is uncertain. This seemingly simple implementation
detail is crucial for implementing the scheduler, and it should thus be included in the description
of all scheduling algorithms that make use of such a heuristic.
The Estee implementations of these schedulers use a simple estimate of the earliest start time,
which takes into account the currently executing and already scheduled tasks of each worker, plus
an estimated network transfer cost computed from the uncongested network bandwidth (i.e. the
simple network model is used for the scheduler's estimate of transfer costs).
In order to test our hypothesis that the worker selection approach is important and affects the
scheduler’s behavior, we have also created extended versions of the blevel, tlevel and mcp schedulers.
These modified versions use a worker selection heuristic called "greedy transfer". We have not
applied this heuristic to other list-based schedulers, because it would fundamentally change their
behavior.
The greedy transfer heuristic assigns the selected task to a worker that has a sufficient number
of free cores on which the task may be executed, and that requires the minimal data transfer (sum
over all sizes of data objects that have to be transferred to that worker). It also adds support for
clusters where some machines have a different number of cores than others. When a task t that
needs c cores cannot be scheduled because of an insufficient number of free cores, the list-scheduling
continues by taking another task from the list instead of waiting for more cores to become free. Such
a task may only be assigned to workers that have fewer than c cores in total. This allows more tasks
to be scheduled without modifying the priority of tasks, because t could not have been scheduled on those workers anyway.
Note that when all workers have the same number of cores, the behavior is identical to ordinary
list-scheduling.
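Below is a minimal sketch of the greedy transfer worker selection step. It is not Estee's actual implementation; the task and worker attributes (cores, free_cores, inputs, size) and the location mapping, which records which workers already hold each data object, are illustrative assumptions.

def select_worker_greedy_transfer(task, workers, location):
    # Consider only workers that currently have enough free cores.
    candidates = [w for w in workers if w.free_cores >= task.cores]
    if not candidates:
        # The task cannot be scheduled right now; list scheduling continues
        # with the next task, restricted to workers with fewer than
        # task.cores cores in total.
        return None

    def transfer_cost(worker):
        # Sum of sizes of input data objects that are not yet on the worker.
        return sum(obj.size for obj in task.inputs if worker not in location[obj])

    # Pick the worker that minimizes the amount of transferred data.
    return min(candidates, key=transfer_cost)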
5.1.5 Task graphs
To facilitate task scheduler experiments, Estee contains a task graph generator that is able to
generate parametrized instances of various categories of task graphs. Graphs from each category
can be generated using several parameters that affect their resulting size and shape. To increase
the variability of the graphs, properties like task durations or data object sizes are sampled from
a normal distribution. Below is a description of the three categories of task graphs that can be
generated:
elementary This category contains trivial graph shapes, such as tasks with no dependencies or
simple “fork-join” graphs. These graphs can test the behavior of scheduler heuristics on basic task
graph building blocks that frequently form parts of larger workflows. Examples of these graphs can
be found in Appendix A (Figure A.1).
irw This generator creates graphs that are inspired by real-world task graphs, such as machine-
learning cross-validations or map-reduce workflows.
pegasus This category is derived from graphs created by the Synthetic Workflow Generators [132].
The generated graphs correspond to the montage, cybershake, epigenomics, ligo and sipht Pegasus
workflows. The graphs have been extended with additional properties required for testing infor-
mation modes (notably expected task durations and data object sizes for the user information
mode).
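To illustrate the kind of graphs produced by the generator, the following is a minimal sketch (not the actual Estee generator API) of an elementary fork-join graph whose task durations are sampled from a normal distribution; the dictionary-based graph representation is an illustrative assumption.

import random

def fork_join(n_tasks, mean_duration=1.0, sigma=0.25):
    # One source task, n_tasks independent middle tasks and a final join task.
    def duration():
        # Sample a duration from a normal distribution, clamped to a small
        # positive value to avoid non-positive durations.
        return max(0.01, random.gauss(mean_duration, sigma))

    graph = {"source": {"duration": duration(), "deps": []}}
    for i in range(n_tasks):
        graph[f"task-{i}"] = {"duration": duration(), "deps": ["source"]}
    graph["join"] = {
        "duration": duration(),
        "deps": [f"task-{i}" for i in range(n_tasks)],
    }
    return graph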
5.2 Task scheduler evaluation
We have carried out an extensive analysis of the performance of several task scheduling algorithms
on various task graphs using the Estee simulator. The aim of the analysis was to explore the
behavior of various schedulers in a complex simulation environment. In addition to comparing the
schedulers among each other, we also wanted to test how their performance differs between various
communication models and scheduler parameters.
Note that since the used simulation environment is focused on simulating different task graph
schedules and network transfers, and it does not model the actual execution of the scheduler nor
the task runtime in a cycle-accurate way, the term scheduler performance refers to the simulated
makespan of task graphs executed using schedules provided by the given scheduler, as defined
by Definition 6. In other words, our experiments estimate how quickly a given task graph would
be fully computed on a cluster, using a given network model, while being scheduled by a specific
scheduling algorithm.
5.2.1 Benchmark configuration
Below, you can find descriptions of the cluster, scheduler and task graph configurations that we
have used for our benchmarks.
Task graphs The Estee graph generators were used to generate a collection of task graphs that
were used in the benchmarks. The properties of all used graphs are summarized in Appendix A
(Table A.1). The generated task graph dataset is available as a reproducible artifact [133].
Schedulers We have benchmarked all schedulers described in Section 5.1.4. Schedulers that use
the greedy transfer heuristic are labeled in the benchmark results with a -gt suffix.
Scheduler parameters To evaluate the effect of minimal scheduling delay, we have used a baseline
value of zero, where the scheduler is invoked immediately after any task status update, and then a
delay of 0.1, 0.4, 1.6 and 6.4 seconds. In the cases where MSD is non-zero, we have also added a 50
millisecond delay before sending the scheduler decision to workers, to simulate the time taken by
the scheduler to produce the schedule. For experiments that do not focus on MSD, we always use
an MSD of 0.1 seconds and the 50 millisecond computation delay. To evaluate information modes,
we have used the exact, user and mean imodes. For experiments that do not focus on imodes, we
always use the exact mode.
Network models The simple (labeled simple) and max-min (labeled max-min) network models
were used, with bandwidth speeds ranging from 32 MiB/s to 8 GiB/s. For experiments that do not
focus on the network model (e.g. when imodes are being compared), we always use the max-min
network model.
Clusters We have used the following five cluster (worker) configurations (where w × c means that
the cluster has w workers and each worker has c cores): 8×4, 16×4, 32×4, 16×8, 32×16.
5.2.2 Evaluation
This section discusses selected results of the described benchmarks. Complete benchmark re-
sults and overview charts can be found in [1]. The benchmarking environment, input task graph
datasets, all benchmark configurations, results and charts are also freely available as reproducible
artifacts [134] for further examination.
The benchmarks were executed on the clusters of the IT4Innovations supercomputing center [135].
The actual characteristics of the cluster hardware are not important, because all benchmarks were
executed using the Estee simulator, so the results do not depend on the hardware that was used.
Each benchmark that was non-deterministic in any way (e.g. because it used
a pseudorandom number generator) was executed twenty times. Unless otherwise specified, the
individual experiments were performed with the default benchmark configuration that uses the
max-min network model, the exact information mode and a Minimal Scheduling Delay of 0.1 s.
Note that the vertical axis of some charts presented in this section does not start at zero, as
the goal was to focus on the relative difference between different scheduling algorithms rather than
on the absolute makespan durations.
Random scheduler
Given that task scheduling is an NP-hard problem, one might expect a random scheduling
approach to produce unsatisfactory results. Therefore, we wanted to examine how a completely
random scheduler holds up against more sophisticated approaches. Figure 5.2 compares the simulated
makespan durations of the random scheduler vs. two other competitive schedulers (blevel-gt
and the work-stealing ws scheduler) on several task graphs.
[Figure 5.2: Performance of the random scheduler. Columns: the bigmerge, crossv, gridcat and splitters task graphs; rows: the 8x4, 16x8 and 32x16 clusters; horizontal axis: bandwidth [MiB/s]; vertical axis: makespan [s]; compared schedulers: blevel-gt, random, ws.]
While there are indeed cases where random scheduling falls short (for example on the cross-
validation crossv task graph, or in situations with many workers and a slow network), in most
cases its performance is similar to other schedulers, and in a few situations it even surpasses them.
Its performance improves with increasing worker count and network speed. This makes intuitive
sense, because if there are enough workers and the network is fast enough to overcome the cost
of exchanging many data objects between them, the specific assignment of tasks between workers
becomes less important. As long as the scheduler is able to keep the workers busy (which can be
ensured even by a random schedule for some task graphs), then the resulting performance might
be reasonable.
We have been able to validate these results in [2], where we have shown that as the worker count
becomes larger, scheduling decisions can in some cases become less important, and other factors
(like the overhead of the task runtime) might start to dominate the overall task graph execution
cost. This will be described in more detail in Chapter 6.
Worker selection strategy
As was explained in Section 5.1.4, the descriptions of several schedulers that we have implemented
in Estee do not specify the concrete strategy for selecting a worker that can start executing a given
task as soon as possible. Yet, as we can see in Figure 5.3, this implementation detail is crucial. This
chart shows the performance of two scheduling algorithms (blevel and mcp), each in two variants,
with the simple selection strategy and with the greedy transfer strategy (the used worker selection
strategy was the only difference between the simple and the -gt suffixed variants).
It can be seen that there is a large difference between these two strategies. In fact, the results
suggest that in these specific scenarios, the worker selection strategy had a larger effect on the
overall performance than the used scheduling (task selection) algorithm, as the variants using
greedy transfer were highly correlated. This suggests that the used worker selection strategy is an
important detail of list-scheduling algorithms that should not be omitted from their descriptions.
Network models
Figure 5.4 demonstrates how the used network model affects simulated task graph makespans for a
selected set of task graphs and schedulers, using task graphs from the irw dataset on the 32x4 cluster
with 32 workers. The Y axis is normalized with respect to the average makespan of simulations
performed with the simple network model.
It is clear that especially for slower network bandwidths, the naive simple model often underes-
timates the resulting makespan. This is caused by the fact that it does not take network contention
into account at all, which causes the overall network transfer duration estimation to be overly
optimistic. As network bandwidth goes up, the difference is reduced, since there is less overall
contention and the transfers are faster in general.
[Figure 5.3: Comparison of worker selection strategy. Columns: the cybershake, fastcrossv and merge_triplets task graphs; rows: the 8x4, 16x8 and 32x16 clusters; horizontal axis: bandwidth [MiB/s]; vertical axis: makespan [s]; compared schedulers: blevel, blevel-gt, mcp, mcp-gt.]
[Figure 5.4: Comparison of max-min and simple network models (irw dataset). Columns: the crossv, crossvx, fastcrossv, gridcat, mapreduce and nestedcrossv task graphs; rows: the blevel-gt, dls and ws schedulers; cluster 32x4; horizontal axis: bandwidth [MiB/s]; vertical axis: makespan normalized to the average of the simple model.]
The makespans of simulations with these two network models are sometimes up to an order
of magnitude apart. This is quite significant, because the difference between the performance of
schedulers (with a fixed network model) is otherwise usually within a factor of two, which was
demonstrated both in [68] and by results of our other experiments. The gap between the two
network models depends heavily on the used task graph.
Minimal Scheduling Delay
In Figure 5.5, we can observe the effect of MSD on graphs from the irw dataset, with the 32x4
cluster configuration. The Y axis is normalized with respect to the configuration where MSD is zero.
The results show that the effect of MSD is relatively limited. There does not seem to be any clear
correlation or pattern that would suggest that a smaller MSD consistently improves the performance
of a scheduler. Interestingly, a higher MSD value even caused several makespan improvements,
especially on the gridcat task graph.
This is another example of the non-trivial effect of scheduler heuristics. Increasing the MSD
leads to a batching effect, where the scheduler is allowed to make decisions less often, but it has
knowledge of more task events (that have arrived during the delay) during each decision. Whether
this helps its performance, or hurts it, depends on the specific scheduler implementation and the
task graph that it executes.
[Figure 5.5: Comparison of MSD; cluster 32x4. Columns: the fastcrossv, gridcat and nestedcrossv task graphs; rows: the blevel-gt, dls and ws schedulers; horizontal axis: bandwidth [MiB/s]; vertical axis: makespan normalized to MSD = 0; compared MSD values: 0.0, 0.1, 0.4, 1.6 and 6.4 s.]
[Figure 5.6: Comparison of information modes (irw dataset). Columns: the crossv, crossvx, fastcrossv, gridcat, mapreduce and nestedcrossv task graphs; rows: the blevel-gt, dls and mcp schedulers; cluster 32x4; horizontal axis: bandwidth [MiB/s]; vertical axis: makespan normalized to the exact imode; compared imodes: exact, mean, user.]
Information modes
Figure 5.6 compares makespans of several scheduler and task graph combinations from the irw
dataset on a 32x4 cluster, with different information modes being used. The results are normalized
to the mean makespan of the default exact information mode. In general, the effect of information
modes is more significant than the effect of the Minimal Scheduling Delay.
An intuitive expectation would be that with more precise task duration information, the sched-
uler will be able to produce a shorter makespan, and this is indeed what happens in several cases,
e.g. on the mapreduce and nestedcrossv task graphs with the blevel-gt scheduler, where the makespan
is up to 25% longer when task durations are not exactly known.
However, there are also opposite cases, for example the dls and mcp schedulers produce better
results on several task graphs when they take only the mean task duration into account. This further
shows the effect of scheduler heuristics, which can produce worse results even when presented with
more accurate data input (and vice versa).
One factor that makes it more difficult for the scheduler to accurately estimate the network
transfer durations and thus make optimal use of the knowledge of task durations is that with the
max-min network model, the scheduler knows only a lower bound on the communication costs,
even if it knows the exact data size in advance. While it has access to the maximum bandwidth of
the network, it does not know the current (and most importantly, future) network utilization; thus
it has only a crude estimation of the real transfer duration.
5.2.3 Validation
It is challenging to validate the performance of different task schedulers in actual (non-simulated)
task runtimes. Schedulers tend to be deeply integrated into their task runtime in order to be
as performant as possible. That makes it difficult, or even infeasible, to replace the scheduling
algorithm without also modifying large parts of the task runtime. Furthermore, some scheduling
approaches might not even be compatible with the architecture of the runtime as a whole. For
example, work-stealing schedulers perform a lot of communication between the server and the
workers (potentially even between the workers themselves), and if the runtime does not implement
the necessary infrastructure for facilitating these communication patterns, then implementing a
work-stealing scheduler into such a runtime might amount to rewriting it from scratch.
In order to at least partially validate our simulation results, we have decided to use a modified
version of the Dask [43] task runtime as a validation framework. Apart from validating results
from the Estee simulations, we have also used this modified version of Dask to perform other
experiments and benchmarks that are described in Chapter 6, which also depicts the architecture
of Dask and our performed modifications in detail.
Dask is written in Python, which makes it relatively easy to modify and patch. It uses a
work-stealing scheduler by default, and even though it is relatively deeply integrated within the
Dask runtime, we were able to implement three simple alternative scheduling algorithms into it
(the modified version of Dask with these schedulers can be found at
https://github.com/Kobzol/distributed/tree/simple-frame-sched), which correspond as closely as
possible to the random, blevel and tlevel schedulers from Estee. The default Dask work-stealing
scheduler was compared against our work-stealing ws scheduler from Estee.
Apart from implementing new schedulers into Dask, there were several issues that we had to
solve to make sure that the comparison between the simulated and the real environment is as fair
and accurate as possible.
The absolute makespans of task graphs simulated by Estee and task graphs executed by Dask
cannot be compared directly, because there are many aspects of the operating system, network,
implementation of Dask itself and system noise that Estee can not fully simulate. Therefore,
since the primary goal of our task scheduler experiments was to compare the relative performance of
individual schedulers, we have decided to compare the relative makespans normalized to a reference
scheduler (blevel), to test whether the makespan ratios between the schedulers are similar in simulation and
in real execution.
In the scheduler benchmarks, we have used many task graphs generated by the Estee task
graph generator. However, it would not be possible to perfectly replicate task durations of these
generated graphs in Dask. Therefore, we have approached this problem from the other direction.
We have executed several task graphs in Dask, and recorded their execution traces, so that we
would have a completely accurate representation of all the executed task durations and data object
sizes, which we could then accurately replicate in the simulated Estee environment. The recorded
task graphs will be described in Section 6.2 and they can also be found in [2]. We have executed
these workflows with a 24x2 cluster (24 cores on two nodes), which corresponds to two nodes of
the Salomon [136] cluster, on which the Dask workflows were executed. Each actual execution and
simulation was performed three times.
[Figure 5.7: Scheduler performance relative to blevel in Dask and Estee. Panels: the bag-10000-10, merge-10000 and numpy-20000-10 benchmarks; horizontal axis: scheduler (ws, random, tlevel); vertical axis: performance relative to blevel; compared environments: Dask, Estee.]
Figure 5.7 shows the results of the validation comparison for three selected task graphs (extended
validation results can be found in [1]). The
performance of each scheduler was normalized to the makespan of the blevel scheduler within the
same environment (either Estee or Dask). Note that the relative ratios were centered around
zero by subtracting 1 from them, to focus on the relative differences. For example, if a task graph
execution had a makespan of 100 s with the blevel scheduler, but 110 s with the ws scheduler, the
ratio of the ws scheduler would be 0.1. If the simulation was perfect, the two columns for each
scheduler would have the same height.
The first chart shows a situation where changing the scheduler resulted in large changes in
makespans, and Estee was able to simulate these changes relatively accurately, and reflect the
results measured in Dask. The second chart demonstrates a situation where all schedulers pro-
duce similar makespans; therefore, in this case the scheduling algorithm does not seem to be that
important. Estee was again able to estimate that the differences between schedulers will be small.
In the third chart, we see that Estee was systematically overestimating the makespans of all three
schedulers (with respect to the reference scheduler). The most important difference was in the ws
scheduler, where the simulated result claims that it is slower than blevel, while in reality it was
slightly faster. The work-stealing implementation in Dask is complex, and in this case it was able
to outperform blevel in a way that Estee was not able to simulate.
To summarize the average error of the simulated results, we took the relative makespans of
the individual schedulers w.r.t. the reference blevel scheduler, and calculated the difference between
the executed and simulated relative makespans. The geometric mean of these differences across all
measured benchmarks was 0.0347, which suggests that the differences between execution and
simulation were relatively small; the simulated makespans were usually within just a few percent
of the actual makespan duration.
Summary
We have implemented a set of well-known scheduling heuristics, prepared a benchmark dataset
containing task graphs of various types and scales, and designed a simulation environment for task
scheduler experimentation. We have conducted a series of reproducible benchmarks using that
environment, in which we have analyzed the effect of network models, minimal scheduling delays,
information modes and worker selection strategy on the behavior of the implemented schedulers,
and also compared their relative performance.
Our attempts to implement existing scheduling algorithms in Estee and reproduce the results
of previous scheduler benchmarks have shown that various implementation details which are often
missing from the algorithm’s description (like the precise worker selection strategy) can have a large
effect on the final performance of the scheduler. Furthermore, we have been able to confirm our
hypothesis that the naive network model used in several existing works can result in very inaccurate
simulation results.
We have demonstrated that even a completely random scheduler can be competitive with other
scheduling approaches for certain task graphs and cluster configurations. This supports the conclu-
sion made in [68], where the authors have also observed that relatively simple scheduling algorithms
can be competitive, and more complex algorithms are useful mostly for special situations and edge
cases, where the simple heuristics might fail to produce reasonable results.
The Minimal Scheduling Delay had a smaller effect in our simulations than we had expected.
This hints that it might be possible to reduce the scheduling overhead (by invoking the scheduler
less often) without sacrificing schedule quality, but this will be highly dependent on the specific
task runtime implementation.
The effect of different information modes turned out to be significant, although it is unclear
whether schedulers can leverage the additional information about exact task durations when facing
unpredictable network conditions.
All our benchmarks, source code, experiment results and charts are available in an open
and reproducible form [133, 134], which should make it straightforward to reproduce our experiments.
The implemented Estee simulator enables simple prototyping of task schedulers and allows
simulating the execution of various task graphs. We hope that it has the potential to further simplify
the development and evaluation of novel task schedulers.
Through our experiments presented in this chapter, we have learned more about the effect of
scheduling algorithms on the performance of task graphs. We have identified a performant baseline
scheduling approach, work-stealing combined with the b-level heuristic, which we have further
leveraged in work described in the following two chapters.
Although task scheduling is an important factor, it is not the only aspect that contributes to
the overall overhead and performance of task runtimes. The following chapter will examine the
runtime overhead and scalability of a state-of-the-art task runtime in more detail.
Chapter 6
Task runtime optimization
This chapter delves further into the performance aspects of task graph execution on HPC clusters.
After performing the task scheduling experiments using Estee, our next goal was to find out how
other parts of task runtimes (besides the scheduler) affect task graph makespans, in order
to provide guidelines for the efficient design of task runtimes. We also wanted to validate some of
the rather surprising results of our experiments, for example the competitiveness of the random
scheduler, in a non-simulated setting. In order to do that, we moved from a simulator to a task
runtime that executes task graphs on an actual distributed cluster.
As was already discussed in Chapter 4, there is a large number of existing task runtimes used
on HPC systems that could serve as a test vehicle for our experiments. After evaluating several
of them, we have decided to examine the Dask [43] task runtime, for the following reasons:
- It is very popular within the scientific community [137], and thus any insights into how it
  could be improved could benefit a wide range of users.
- It is implemented in Python, which makes it relatively easy to modify.
- It is quite versatile, as it allows executing arbitrary task graphs with dependencies and
  performing data object exchanges between workers.
- It uses a fairly standard distributed architecture with a centralized server that creates task
  schedules and assigns tasks to a set of distributed workers. This maps well to the cluster
  architecture used by Estee and also to HPC clusters in general.
In terms of task scheduling, Dask uses a work-stealing scheduler, which has been tuned exten-
sively over several years to support various use-cases and to provide better scheduling performance.
Yet it is unclear whether additional effort should be directed into improving the scheduler or if
there are other bottlenecks which should be prioritized.
To answer that question, we have analyzed the runtime performance and the bottlenecks of
Dask in Runtime vs Scheduler: Analyzing Dask's Overheads [2]. (Note that this line of research
follows after the task scheduler analysis described previously in Chapter 5, even though it was
published at an earlier date.) This work provides the following contributions:
1. We have created a set of benchmarks containing diverse task graphs implemented in Dask.
This benchmark set was then used to analyze Dask’s performance in various HPC-inspired
scenarios. We have evaluated the per-task-overhead and scalability of Dask and compared
how different task scheduling algorithms affect its performance.
2. We demonstrate that even a naive (completely random) scheduling algorithm can be in some
situations competitive with the sophisticated hand-tuned work-stealing scheduler used in
Dask.
3. We provide RSDS, an alternative Dask server that is backwards-compatible with existing
Dask programs and provides significant speedups over the baseline Dask server in various
scenarios, despite using a simpler task scheduler implementation.
Various descriptions of task graph benchmarks, Dask and RSDS used in this chapter were
adapted from our publication [2].
I have collaborated on this work with Ada Böhm; we have both contributed to it equally. I have
designed and performed all the experiments described in this chapter. Source code contribution
statistics for RSDS can be found on GitHub (https://github.com/it4innovations/rsds/graphs/contributors).
6.1 Dask task runtime
Dask is a distributed task runtime implemented in Python that can parallelize and distribute
Python programs. It offers various programming interfaces (APIs) that mimic the interfaces of
popular Python packages. For example, Dask DataFrame copies the pandas [138] interface for
table processing and database operations, Dask Arrays copies the numpy [139] interface for tensor
computations and Dask ML copies the scikit-learn [140] interface for machine learning. Thanks
to this interface compatibility, existing Python programs leveraging these libraries can often be
parallelized with Dask by changing only a few lines of code.
This is demonstrated in Listings 6.1 and 6.2, which show two small Python programs that lever-
age the Dask Arrays and Dask DataFrame API, respectively. Notably, the only difference between
these programs (which leverage Dask and thus can be parallelized) and a standard sequential
version is the change of imports from numpy and pandas to Dask Python modules.
# import numpy as np
import dask.array as np
x = np.random.random((10000, 10000))
y = (x * 2 + x.T).mean(axis=1)
Listing 6.1: Example of a Python program that leverages the Dask Array API
# import pandas as pd
import dask.dataframe as pd
df = pd.read_csv("data.csv")
df2 = df[df.y > 0]
df3 = df2.groupby("name").x.std()
Listing 6.2: Example of a Python program that leverages the Dask DataFrame API
Programming model
Dask automatically transforms Python code leveraging these APIs into a task graph, which is then
executed in parallel, possibly on multiple nodes of a distributed cluster. This enables almost trans-
parent parallelization of sequential Python code. However, apart from these high-level interfaces,
it is also possible to build a task graph manually, using the Futures interface, to define complex
computational workflows.
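As a small illustration of the Futures interface, the sketch below builds a tiny task graph manually; the scheduler address is a placeholder and the functions are arbitrary examples.

from dask.distributed import Client

def increment(x):
    return x + 1

def multiply(x, y):
    return x * y

client = Client("tcp://scheduler-address:8786")  # connect to a Dask cluster

# Each submit call creates one task; passing a future as an argument of
# another submit call creates a dependency between the two tasks.
a = client.submit(increment, 10)
b = client.submit(multiply, a, 2)
print(b.result())  # waits until the task graph is computed and prints 22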
The core computational abstraction of Dask is a task graph, which corresponds closely to the
definition of task graphs that was defined in Chapter 3. Each task represents a single invocation
of a Python function. The return value of the function forms its output (a data object), and the
arguments of the function invocation define the inputs of the task.
One important aspect of the mapping between Python code and task graphs in Dask is the
concept of partitions. It is a configuration parameter that essentially controls the granularity of
tasks created by Dask out of Python code that uses its APIs. For example, a single line of Python
code that performs a query over a pandas-like table (also called DataFrame) will eventually be
converted to a set of tasks; each such task performs the query on a subset of the table’s rows.
The selected number of these tasks (or partitions) is crucial, since it determines how effectively
the operation will be parallelized. Too few (large) tasks can cause computational resources to be
underutilized, while too many (small) tasks can overwhelm the Dask runtime.
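For illustration, the partitioning of a Dask DataFrame can be influenced both when it is created and later; the file name and the concrete sizes below are only placeholders.

import dask.dataframe as dd

# The blocksize argument controls how the input file is split into partitions;
# each partition is then processed by (roughly) one task per operation.
df = dd.read_csv("data.csv", blocksize="64MB")
print(df.npartitions)

# Repartitioning changes the task granularity of subsequent operations.
df = df.repartition(npartitions=48)
result = df[df.y > 0].groupby("name").x.std().compute()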
Architecture
Dask supports multiple computational backends that can execute the task graphs generated from
Python code. The default backend is able to execute the task graph in a parallelized fashion
on a local computer, but there is also a distributed backend called dask/distributed
(https://distributed.dask.org), or simply distributed, which is able to execute task graphs on
multiple nodes. Since this backend is the most relevant for task graph execution on distributed and
HPC clusters, our experiments focus solely on it, and any further reference to Dask in this text will
assume that it uses the distributed backend.
In terms of architecture, Dask is composed of three main components: the client, the server
and the worker. A single server and an arbitrary number of workers deployed together (e.g. on a
local machine or a distributed system) form a Dask cluster.
The client is a user-facing library that offers various APIs used to define computations in Python
that can be converted into a task graph. Once the user defines the computation, the client can
connect to the Dask cluster (more specifically, to the server), submit the task graph, wait for it
to be computed and then gather the results. The client can build the whole task graph eagerly on
the user’s machine and then send it to the server for processing; however, this can consume a lot of
memory and network bandwidth if the task graph is large. For certain types of task graphs, clients
are able to send a much smaller, compressed abstract representation of the task graph that is only
expanded on the server lazily, which can help reduce memory and network transfer bottlenecks.
The server is the central component of the cluster, which communicates with the workers and
clients through a network, usually a TCP/IP (Transmission Control Protocol/Internet Protocol)
connection, handles client requests, coordinates task execution on workers and manages the whole
Dask cluster. Its main duty is to process task graphs submitted by clients by assigning individual
tasks to workers, in order to parallelize the execution of the submitted task graph and in turn
efficiently utilize the whole cluster. It uses a sophisticated work-stealing scheduler with many
heuristics, which have been tuned over many years. Some of them are described in the Dask
manual (https://distributed.dask.org/en/latest/work-stealing.html).
The scheduler works in the following way: when a task becomes ready, i.e. all its dependencies
are completed, it is immediately assigned to a worker according to a heuristic that tries to minimize
the estimated start time of the task. This estimate is based primarily on any potential data transfers
that would be incurred by moving the data objects of tasks between workers, and also the current
occupancy of workers. When an imbalance occurs, the scheduler tries to steal tasks from overloaded
workers and distribute them to underloaded workers. The scheduler also assigns priorities to tasks,
which are used by workers to decide which tasks should be computed first.
The worker is a process which executes tasks (consisting of serialized Python functions and
their arguments) that are assigned and sent to it by the server. Workers also communicate directly
among themselves to exchange task outputs (data objects) that they require to execute a task.
Tasks assigned to a worker are stored in a local queue; tasks are selected from it based on their
priorities. Each worker can be configured to use multiple threads, some of which handle network
(de)serialization and I/O while the rest are assigned for executing the tasks themselves. However,
there is an important caveat that can limit the parallel execution of tasks within a worker, which
is described below.
Bottlenecks
The primary bottlenecks that limit the efficiency of Dask are related to the programming language
used for its implementation. All described components (server, worker and client) are implemented
in Python, which has non-trivial consequences for performance, because the Python language
is not well-suited for implementing highly performant software. The most commonly used Python
interpreter, CPython (https://github.com/python/cpython), does not generally allow programmers
to make optimal use of hardware in an easy way, due to its indirect memory layout and automatic
memory management that pervasively uses reference counting.
Crucially, there is a specific quirk of this interpreter that can severely reduce the performance of
task graph execution in Dask; specifically, it affects the performance of its workers. The CPython
interpreter infamously uses a shared, per-process lock called GIL (Global Interpreter Lock), which
synchronizes access to the internal data structures of the interpreter. This means that when a
Python program is interpreted using CPython, only a single thread that executes Python code can
make progress at the same time. It is still possible to achieve concurrency with multiple Python
threads (e.g. by using blocking I/O, which releases the GIL while the thread is blocked), but not
parallelism, under this design. There are some caveats to this, notably code written using native
Python extensions (most commonly written in C, C++ or Rust) can run truly in parallel with other
Python threads, but only if it opts into releasing the global lock; this prevents the native code from
interacting with the Python interpreter while it is running.
The GIL issue does not impact only Dask, of course. It is a pervasive issue across the whole
Python ecosystem. There have been multiple attempts over the years to remove the GIL from
CPython, but they have not been successful yet. The most recent attempt has progressed the
furthest, and it has been accepted as a PEP (Python Enhancement Proposal) 703 [141], so it is
possible that the GIL will eventually be removed from CPython. However, even if this proposal is
adopted, it will probably take years before Python packages and programs will fully adapt to the
change.
The presence of the GIL poses a challenge for Dask workers. Unless a task executed by a
worker releases the GIL (which essentially means that the task either has to be implemented in
native code or it needs to block on an I/O operation), it will block all other tasks from executing
at the same time. Therefore, a single Dask worker can only execute at most one non-native and
non-blocking task at once, which can potentially severely limit its throughput and the efficiency of
task graph execution. It is worth noting that many commonly used data science and data analysis
tasks executed with Dask will likely be implemented in native code (such as numpy, pandas, or
their Dask equivalents). However, for tasks written in pure Python, the worker will essentially
behave in a single-threaded fashion. This has been observed for example in [142], where several
workflows did not benefit at all from multi-threaded Dask workers.
To alleviate this limitation, Dask workers can be executed in multiple instances on the same
node, each running inside a separate process. In this configuration, each worker has its own copy
of the CPython interpreter, and thus also its own copy of the GIL; therefore, tasks running inside
one worker do not affect (and most importantly block) tasks running on other workers. However,
this also means that certain overhead (a TCP/IP connection to the server, a worker entry in the
scheduler, management of worker tasks) is multiplied by the number of spawned workers on each
node. In certain cases, it is thus necessary to carefully balance the trade-off between too few workers
(which can hamper task execution parallelism) and too many workers (which can reduce the overall
efficiency of the Dask runtime).
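For illustration, this process/thread trade-off can be expressed directly when a Dask cluster is created. The sketch below uses LocalCluster for brevity; on an HPC system the workers would typically be started through a job script or an allocation manager instead, and the concrete numbers are only examples.

from dask.distributed import Client, LocalCluster

# Many single-threaded workers avoid GIL contention but multiply the
# per-worker overhead on the server; a single worker with many threads
# minimizes that overhead but serializes pure-Python tasks. A compromise:
cluster = LocalCluster(n_workers=4, threads_per_worker=6)
client = Client(cluster)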
6.2 Dask runtime overhead analysis
We have designed a series of experiments to evaluate the inner overhead of Dask and to find out
which factors affect its runtime performance the most.
6.2.1 Benchmarks
To evaluate the runtime, we have prepared a diverse set of benchmarks that span from simple
map-reduce aggregations to text processing workloads and table queries. The properties of the
task graphs used in our experiments along with the Dask API that was used to create them are
summarized in Appendix A (Table A.2).
Most of the task graphs are heavily inspired by programs from the Dask Examples repository
(https://examples.dask.org). The definitions of the benchmarks are available in a GitHub repository
(https://github.com/it4innovations/rsds/blob/master/scripts/usecases.py). A short summary of the
individual benchmarks is provided below.
merge-n creates n independent trivial tasks that are merged at the end (all of their outputs are
used as inputs of a final merge task); a minimal sketch of such a graph is shown after this list. This
benchmark is designed to stress the scheduler and the server, because the individual tasks are very
short (essentially, they perform almost no work).
merge_slow-n-t is similar to merge-n, but with longer, t second tasks.
tree-n performs a tree reduction of 2^n numbers using a binary tree with height n − 1.
xarray-n calculates aggregations (mean, sum) on a three-dimensional grid of air temperatures [143];
n specifies size of grid partitions.
bag-n-p works with a dataset of n records in p partitions. It performs a Cartesian product,
filtering and aggregations.
numpy-n-p transposes and aggregates a two-dimensional distributed numpy array using the Arrays
interface. The array has size (n, n) and it is split into partitions of size (n/p, n/p).
groupby-d-f-p works with a table with d days of records, each record is f time units apart, records
are partitioned by p time units. It performs a groupby operation with an aggregation.
join-d-f-p uses the same table, but performs a self-join.
vectorizer-n-p uses Wordbatch (https://github.com/anttttti/Wordbatch), a text processing library,
to compute hashed features of n reviews from a TripAdvisor dataset [144] split into p partitions.
wordbag-n-p uses the same dataset, but computes a full text processing pipeline with text nor-
malization, spelling correction, word counting and feature extraction.
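The following is a minimal sketch of what a merge-n style task graph could look like when expressed with the Dask delayed API. It is only an illustration, not the exact benchmark implementation (which is available in the repository linked above).

from dask import delayed

@delayed
def trivial_task(i):
    # Performs almost no work, to stress the scheduler and the server.
    return i

@delayed
def merge(results):
    return sum(results)

def make_merge_graph(n):
    # n independent tasks whose outputs are consumed by a single merge task.
    tasks = [trivial_task(i) for i in range(n)]
    return merge(tasks)

result = make_merge_graph(10_000).compute()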
6.2.2 Benchmark configuration
Our experiments were performed on the Salomon supercomputer [136]. Each Salomon node has
two sockets containing Intel Xeon E5-2680v3 with 12 cores clocked at 2.5 GHz (24 cores in total),
128 GiB of RAM clocked at 2133 MHz and no local disk. The interconnections between nodes use
an InfiniBand FDR56 network with 7D enhanced hypercube topology.
Unless otherwise specified, by default we spawn 24 Dask workers per node, each using a single
thread for task computations. We chose this setting because of the CPython GIL issue described
earlier. Since our benchmarks are mostly compute-bound and not I/O-bound, a single worker
cannot effectively use more than a single thread. Not even the popular numpy and pandas libraries
used in our benchmarks are multi-threaded by default, which is also why Dask provides direct API
support for their parallelization. Using the same number of workers as the available cores ensures
that no more than a single task per core is executed at any given moment, to avoid oversubscription
of the cores.
To validate our choice of this default configuration, we have also benchmarked a configuration using
a single worker with 24 threads on each node. We have found that in none of our benchmarks did
it provide any benefit in comparison to a single worker with only one thread.
For each of our experiments, we state the number of worker nodes used; these nodes run only the
workers. We always use one additional node that runs both the client and the server. For our
scaling experiments, we use 1 to 63 worker nodes (24 to 1512 Dask workers), for the rest of our
experiments we use either 1 or 7 worker nodes (24 or 168 Dask workers). We have chosen these
two cluster sizes to represent a small and a medium sized Dask cluster. The number of workers is
fixed; it does not change during the computation.
We have executed each benchmark configuration five times (except for the scaling benchmarks,
which were executed only twice to lower our computational budget) and averaged the results. We
have measured the makespan as a duration between the initial task graph submission to the server
and the processing of the final task graph output by the client. The whole cluster was reinitialized
between each benchmark execution.
The following abbreviations are used in figures with benchmark results: ws marks the work-
stealing scheduler and random represents the random scheduler.
6.2.3 Evaluation
We have designed three specific experiments that focus on Dask’s scheduler, its inner overhead,
and the effect of GIL on its performance.
Random scheduler
Our original goal of this experiment was to test how different schedulers affect the performance
characteristics of Dask. However, it turned out that plugging a different scheduling algorithm into
it is complicated, because it uses a work-stealing scheduler implementation that is firmly ingrained
into its codebase across multiple places. There was not a single place where the scheduler could
be swapped for a different implementation without affecting other parts of the runtime, unlike in
Estee. Integrating a different complex scheduler into Dask would thus require making further
changes to it, which could introduce bias stemming from arbitrary implementation decisions.
We have thus decided to implement perhaps one of the simplest schedulers possible, which does
not require complex state and which could be implemented relatively easily within Dask: a
fully random scheduler. This scheduler simply uses a PRNG engine to assign tasks to workers
at random. Our Estee experiments have shown that a completely random scheduler could still
achieve competitive performance in some cases, which was quite a surprising result to us. However,
since these results were only tested in a simulated setting, it was interesting to examine how it
would fare on an actual distributed cluster, while scheduling real-world Dask task graphs.
[Figure 6.1: Speedup of the Dask/random scheduler; Dask/ws is the baseline.]
We can observe the results of benchmarks that compare Dask using its baseline work-stealing
scheduler vs a completely random scheduler in Figure 6.1. Based on these results, it is clear that
the random scheduler fares relatively well on both small and medium sized clusters. At worst, it
produces a makespan twice as long, but overall it is close to the performance of the work-stealing
scheduler, and in some cases it even outperforms it with a 1.4× speedup.
If we aggregate the results with a geometric mean, the random scheduler achieves 88% of the
performance of the work-stealing scheduler with a single node and 95% with seven nodes. The
performance of the random scheduler thus gets closer to the performance of work-stealing when
more workers are used.
The fact that the random scheduler’s performance improves with a larger number of nodes is
not surprising. With more nodes, it is easier to fully saturate the potential parallelism contained
in each task graph. Furthermore, a random scheduler produces less network traffic and has a much
smaller computational cost on the server; as we will see in the next experiment, the work-stealing
scheduler’s computational cost increases notably when more workers are present.
Furthermore, for certain task graphs, a complex scheduling algorithm might not be needed, and
a random schedule is sufficient to achieve reasonable results. However, all of that combined is still
a rather weak explanation for why a random scheduler is so competitive with the work-stealing
scheduler in Dask. In the following experiments, we will show that Dask has considerable runtime
overhead, which might introduce a bottleneck that is more impactful than the effect of the used
schedule. In Section 6.3, we will see that with a more efficient runtime and less server overhead,
random schedules will become less competitive.
Overhead per task
We have seen that a random scheduler can be surprisingly competitive with the work-stealing
scheduling implementation in Dask. In this experiment, we further examine this effect by esti-
mating the inner overhead of Dask per each executed task. In order to isolate the effects of the
specific executed tasks, as well as network communication and worker overhead, we have created a
special implementation of the Dask worker that we label zero worker.
Zero worker is a minimal implementation of the Dask worker process, written in Rust. Its
purpose is to simulate a worker with infinite computational speed, infinitely fast worker-to-worker
transfers and zero additional overhead. It actually does not perform any real computation; when a
task is assigned to a zero worker, it immediately returns a message that the task was finished. It
also remembers the set of data objects that would be placed on the worker in a normal computation.
When a task requires a data object which is not in this set, the worker immediately sends a
message to the server claiming that the data object was placed on it; this simulates an infinitely
fast download of data between workers.
No actual worker-to-worker communication is performed; zero workers respond to every data
object fetch request with a small mocked constant data object. Such requests come from the server
when a client asks for a data object, usually at the end of the computation. Since there is no
worker-to-worker communication when the zero worker is used, fetch requests never come from
other workers.
This idealized worker implementation helps us understand the fundamental overhead of the
Dask runtime, independent of the specific tasks that are being executed. Note that even though
all tasks in this mode are basically the same, the shape and size of the benchmarked task graph
are still important, since they affect the performance of the scheduler and also the bookkeeping
overhead of the runtime.
Using this special mode, we have evaluated the average runtime overhead per each task, which
is calculated as the total makespan divided by the number of tasks in the executed task graph.
Figure 6.2 shows the average overhead per task for the merge benchmark; we can see how the average
overhead per each task (vertical axis) changes with an increasing number of tasks (horizontal axis).
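For illustration (with made-up numbers): if executing a task graph with 25 000 tasks on zero workers takes 10 s in total, the average runtime overhead would be 10 s / 25 000 = 0.4 ms per task.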
[Figure 6.2: Overhead per task in Dask with an increasing number of tasks (the merge benchmark). Horizontal axis: task count; vertical axis: average time per task [ms]; compared configurations: dask-ws-24w, dask-ws-168w, dask-random-24w, dask-random-168w.]
[Figure 6.3: Overhead per task in Dask with an increasing number of workers (the merge-25000 benchmark). Horizontal axis: worker count; vertical axis: average time per task [ms]; compared schedulers: dask-ws, dask-random.]
We can observe that the overhead of the random scheduler is smaller than the overhead of the
work-stealing scheduler, as expected. We can also see that the overhead per task increases with
an increasing number of tasks for both schedulers. There is also a distinct increase of overhead
between the configuration with 24 workers (one node) and with 168 workers (seven nodes) for the
work-stealing scheduler.
This effect can be examined in more detail in Figure 6.3, which shows how the overhead increases
when more workers are available. It is clear that the overhead of the work-stealing scheduler
increases when more workers are added to the cluster, while the overhead of the random scheduler
stays almost constant, independent of the number of workers.
The Dask manual (https://distributed.dask.org/en/latest) states that "Each task suffers about 1ms
of overhead. A small computation and a network roundtrip can complete in less than 10ms". Our
experiment shows that the overhead is less than 1 ms for most of our benchmarks.
The overhead per task is an important property of the task runtime, since it determines the
minimal duration of a task that is still viable for parallel execution with that runtime. If the
duration of a task is similar to or smaller than the overhead of the runtime itself, executing such a
task with the runtime will probably not yield any speedup. This is demonstrated in Figure 6.4, which
shows the strong scaling of Dask on the merge_slow benchmark. The three charts demonstrate
how it scales with tasks that take 1000 ms, 100 ms and 10 ms, respectively. With tasks that take
one second, Dask is able to scale relatively well up to 1512 workers. However, when tasks take one
tenth of that time, it only scales up to approximately 360 workers, and when tasks take only 10 ms,
Dask scales to approximately 168 workers (7 nodes); adding more workers makes the makespan longer.
[Figure 6.4: Strong scaling of Dask with different task durations (1000 ms, 100 ms and 10 ms). Panels: the merge_slow-20K-1, merge_slow-20K-0.1 and merge_slow-20K-0.01 benchmarks; horizontal axis: worker count; vertical axis: time [s].]
[Figure 6.5: Effect of GIL on the performance of the pandas_groupby benchmark. Compared worker configurations on a single node: 1 process with 24 threads, 24 processes with 1 thread each, and 4 processes with 6 threads each.]
The results of this experiment indicate that the general runtime overhead of Dask mainly grows
with an increasing number of tasks, no matter which scheduler is used. On the other hand, overhead
of the work-stealing scheduler grows primarily with the number of workers. This is consistent with
the results of the zero worker experiment presented earlier. They also show that the minimum
duration of tasks executed by Dask should be taken into account in order to avoid introducing too
much runtime overhead.
The fact that Dask might struggle with a larger number of workers would not be an issue on
its own, as every task runtime will have its scaling limit. However, due to the already mentioned
effect of GIL, Dask users might be forced to use more workers than would be otherwise necessary
to achieve reasonable performance. We will examine this in the following experiment.
The effect of GIL
As noted earlier, the GIL can have a non-trivial effect on the performance of Python programs.
This can also be seen in Dask, where the configuration of workers might need to be tuned in order
to achieve optimal performance.
Figure 6.5 demonstrates the effect of GIL on the pandas_groupby benchmark. We have executed
the same benchmark using three Dask worker configurations on a single node. In the default
configuration, with a single worker that uses 24 threads (one for each core), the pipeline finishes in
approximately 6.5 seconds. If we instead create a single worker per each core (24 processes, each
with a single thread), the performance improves significantly, by approximately 35%. This makes
it clear that the GIL is a bottleneck in this case, and more Dask workers are needed to improve
performance by enabling parallel task execution.
However, it is not as simple as always using a single Dask worker per core. As we have
discovered with our earlier benchmarks, more workers introduce non-negligible overhead for the
Dask server. This can be seen from the result of the third configuration, which uses 4 Dask workers
(processes), each leveraging 6 threads. This configuration actually achieves the best performance
out of the three tested configurations. It is thus clear that for some Dask workflows, users might
need to carefully tune the configuration of workers to achieve optimal performance.
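As an illustration, the three single-node configurations compared above could be expressed with dask.distributed.LocalCluster as sketched below; the run_pipeline placeholder stands in for the actual pandas_groupby workload, which is not reproduced here.

# Sketch of the three worker configurations compared in Figure 6.5.
from dask.distributed import Client, LocalCluster

CONFIGURATIONS = [
    {"n_workers": 1, "threads_per_worker": 24},  # 1 process,    24 threads
    {"n_workers": 24, "threads_per_worker": 1},  # 24 processes,  1 thread each
    {"n_workers": 4, "threads_per_worker": 6},   # 4 processes,   6 threads each
]

def run_pipeline(client):
    # Placeholder for the actual pandas_groupby workload.
    pass

if __name__ == "__main__":
    for config in CONFIGURATIONS:
        with LocalCluster(**config) as cluster, Client(cluster) as client:
            run_pipeline(client)

When workers are started manually (for example inside an HPC allocation), the same trade-off is controlled by the number of worker processes launched per node and the number of threads given to each of them.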
Summary
Our experiments have shown that Dask does not scale well to a large number of workers and that
it might require manual tuning of the worker configuration to achieve optimal performance. This led
us to the idea of improving the implementation of the Dask server in order to reduce its overhead
in HPC scenarios.
6.3 RSDS task runtime
In order to examine how much the performance of Dask could be improved if we were able to
reduce its runtime overhead, we have developed RSDS (Rust Dask server), an open-source drop-in
replacement for the Dask server [145]. We had the following goals for its design and implementation:
• Keep backwards compatibility with existing Dask programs, so that it can be used to speed up existing workflows. This also enables us to compare the performance of RSDS and Dask on the benchmarks described in the previous section.
• Design an efficient runtime that could scale to HPC use-cases, to establish a baseline for how much faster Dask could become if its overhead was reduced.
• Use a modular architecture that enables easily replacing the implementation of the scheduler, to allow easier experimentation with scheduling algorithms on non-simulated distributed clusters.
Architecture
RSDS is implemented in the Rust programming language, which has a minimal runtime and does
not dictate automatic memory management. This reduces the ubiquitous overhead of reference
counting and data indirection present in Python. It also has direct support for asynchronous I/O
and provides strong memory safety guarantees. Therefore, it is well-suited for writing distributed
applications.
[Diagram: a Dask client and Dask workers running on computing nodes connect to the RSDS server, which is split into a reactor and a scheduler.]
Figure 6.6: Architecture of RSDS (Dask components are dashed)
The architecture of RSDS is shown in Figure 6.6. It reuses the client and worker components
from Dask, but replaces its server (and thus also the scheduler). The main architectural difference
between the two implementations is that RSDS separates the server into two parts: reactor and
scheduler. The reactor manages worker and client connections, maintains bookkeeping information
and translates scheduler assignments into Dask network messages which are then sent to the
workers.
The scheduler is a component that receives a task graph and outputs assignments of tasks to
workers. It does not know about worker network connections, Dask network messages or any
other bookkeeping that is not relevant to scheduling. Since it is isolated, it can be easily swapped,
and thus it is trivial to experiment with different schedulers in RSDS. Another benefit is that
we can easily run the scheduler in a separate thread to enable concurrent scheduling and runtime
management. This is possible because the scheduler does not share any data structures with the
reactor. A disadvantage of this scheme is that both the reactor and the scheduler need to maintain
their own copies of submitted task graphs, which increases their memory usage. However, we have
not found this to be an issue in practice.
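The following sketch illustrates this separation. It is written in Python purely for brevity and is not the actual RSDS code (which is implemented in Rust); the message format and the trivial round-robin assignment policy are hypothetical and serve only to show how the scheduler can run in its own thread and communicate with the reactor through message queues instead of shared data structures.

# Illustrative sketch of the reactor/scheduler split using message passing.
import threading
import queue

def scheduler_loop(to_scheduler, to_reactor):
    tasks = {}       # the scheduler's private copy of the task graph
    workers = []     # known workers
    while True:
        msg = to_scheduler.get()
        if msg["op"] == "stop":
            break
        if msg["op"] == "new-worker":
            workers.append(msg["worker"])
        elif msg["op"] == "new-tasks":
            for task in msg["tasks"]:
                tasks[task] = "waiting"
            # Hypothetical policy: assign the new tasks round-robin to workers.
            assignments = {
                task: workers[i % len(workers)]
                for i, task in enumerate(msg["tasks"])
            }
            to_reactor.put({"op": "assignments", "assignments": assignments})

# The reactor owns the network connections; here it is reduced to two queues.
to_scheduler, to_reactor = queue.Queue(), queue.Queue()
threading.Thread(target=scheduler_loop, args=(to_scheduler, to_reactor), daemon=True).start()

to_scheduler.put({"op": "new-worker", "worker": "worker-1"})
to_scheduler.put({"op": "new-tasks", "tasks": ["t1", "t2"]})
print(to_reactor.get())  # {'op': 'assignments', 'assignments': {'t1': 'worker-1', 't2': 'worker-1'}}
to_scheduler.put({"op": "stop"})

Because the scheduler only consumes and produces messages, it can be swapped for a different implementation without touching the reactor, which mirrors the modularity goal stated above.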
Like Dask, the RSDS scheduler ensures that task dependencies and resource requirements are always satisfied and that worker resources are not oversubscribed, as defined by Definition 4 and Definition 5.
Compatibility with Dask
Dask components communicate with each other using a custom protocol that contains
dozens of different message types. Messages are represented as dictionaries, where a single field
specifies the type of the message and the remaining fields contain parameters specific to the given
message type.
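As a rough sketch of what such a message might look like (the fields below are illustrative and do not reproduce the exact Dask protocol), the following example builds a dictionary with an "op" field and serializes it with the MessagePack format discussed next.

# Sketch of a dictionary-based protocol message; field names are illustrative.
import msgpack

message = {
    "op": "compute-task",     # the message type
    "key": "task-1",          # remaining fields are type-specific parameters
    "who_has": ["worker-1"],
}

encoded = msgpack.packb(message)    # serialize the message to bytes
decoded = msgpack.unpackb(encoded)  # deserialize it back into a dictionary
assert decoded["op"] == "compute-task"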
Dask splits messages into multiple parts called frames and serializes each frame with the MessagePack (https://msgpack.org) binary serialization format. The messages are split mostly for performance reasons. If a client sends large binary data to a worker (or vice-versa), the server can deserialize only the first frame containing the message metadata, and it then does not need to further process the binary data; it merely forwards it to the correct destination (worker or client).

Original message
{
    "op": "compute-task",
    "function": <binary-data>
}

Dask encoding
frame 0: [{ "op": "compute-task" }]
frame 1: {[0, "function"]: {"frame": 2}}
frame 2: <binary-data>

RSDS encoding
frame 0: [{
    "op": "compute-task",
    "function": {"frame": 1}
}]
frame 1: <binary-data>

Figure 6.7: Dask message encoding

The protocol is designed