Parallel Processing with Dask on GridEngine Clusters

written by Eric J. Ma on 2018-10-11

parallel dask gridengine data science optimization

I recently figured out how to get this working... and it's awesome! :D

Motivation

If I'm developing an analysis in a Jupyter notebook, and I have one semi-long-running function (e.g. one that takes tens of seconds per call) that I need to run over tens to hundreds of thousands of similar inputs, it'll take ages to complete in serial. For a sense of scale, a function that takes ~20 seconds per call, run serially over 10,000 similar inputs, would take 200,000 seconds, which is more than 2 days of run-time (not including any other overhead). That's not feasible for interactive exploration of data. If I could somehow parallelize just that function over 500 compute nodes, I could take the time down to under 7 minutes.

GridEngine-based compute clusters are one of many options for parallelizing work. During grad school at MIT, and now at work at Novartis, the primary compute cluster environment I've encountered has been GridEngine-based. However, because these clusters are designed for batch jobs, computational scientists like myself have to jump out of whatever development environment we're currently using and move over to custom job submission scripts.

To run work in parallel on a traditional GridEngine system, I would have to leave the notebook and start writing those job submission scripts, which disrupts my workflow: it breaks my train of thought, and I lose the interactivity I need to prototype my work quickly.

Enter Dask

dask, alongside dask-jobqueue, enables computational scientists like myself to take advantage of existing GridEngine setups to do interactive, parallel work. As long as I have a Jupyter notebook server running on a GridEngine-connected compute node, I can submit functions to the GridEngine cluster and collect the results back for further processing, in a fraction of the time it would take serially, thus enabling me to do my science faster than if I did everything on a single core or single node.

In this blog post, I'd like to share an annotated, minimal setup for using Dask on a GridEngine cluster. (Because we use Dask, more complicated pipelines are possible as well - I would encourage you to read the Dask docs for more complex examples.) I will assume that you are working in a Jupyter notebook environment, and that the notebook you are working out of is hosted on a GridEngine-connected compute node, from which you are able to qsub tasks. Don't worry, you won't be qsub-ing anything though!

Setup

To start, we need a cell that houses the following code block:

from dask_jobqueue import SGECluster
from dask import delayed, compute

cluster = SGECluster(queue='default.q',
                     walltime="1500000",
                     processes=1,
                     memory='1GB',
                     cores=1,
                     env_extra=['source /path/to/custom/script.sh',
                                'export ENV_VARIABLE="SOMETHING"']
                     )

Here, we are instantiating an SGECluster object under the variable name cluster. What cluster stores is essentially a configuration for the block of worker nodes that you will be requesting. Under the hood, dask-jobqueue submits jobs to the GridEngine scheduler; each job blocks off a specified amount of compute resources (e.g. number of cores, amount of RAM, whether you want GPUs, etc.) for a pre-specified amount of time, and on those resources Dask starts a worker process that communicates with the head process coordinating tasks amongst the workers.

As such, you do need to know two pieces of information:

  1. queue: The queue that jobs are to be submitted to. Usually, it is named something like default.q, but you will need to obtain this through GridEngine. If you have the ability to view all jobs that are running, you can call qstat at the command line to see what queues are being used. Otherwise, you might have to ping your system administrator for this information.
  2. walltime: You will also need to pre-estimate the wall clock time, in seconds, that you want each worker node to be alive for. It should be significantly longer than the time you expect to need, so that your function calls don't time out unexpectedly. I have defaulted to 1.5 million seconds, which is about 17 days of continual runtime. In practice, I usually kill those worker processes after just a few hours.

Besides that, you also need to specify the resources that each worker needs. In my example above, I'm asking for each worker to use only 1GB of RAM, 1 core, and 1 process (i.e. no multiprocessing within a worker, I think).

Finally, I can also specify extra environment setups that I will need. Because each worker process is a new process that has no knowledge of the parent process' environment, you might have to source some bash script, or activate a Python environment, or export some environment variable. This can be done under the env_extra keyword, which accepts a list of strings.

Requesting worker compute "nodes"

I put "nodes" in quotation marks, because they are effectively logical nodes, rather than actual compute nodes. (Technically, I think a compute node minimally means one physical hardware unit with CPUs and RAM).

In order to request worker nodes to run your jobs, you need the next line of code:

cluster.scale(500)

With this line, under the hood, dask-jobqueue will start submitting 500 jobs, each requesting 1GB of RAM and 1 core, populating my compute environment according to the instructions I provided under env_extra.

At the end of this, I effectively have a 500-node cluster on the larger GridEngine cluster (let's call this a "virtual cluster"), each with 1GB of RAM and 1 core available to it, on which I can submit functions to run.

Start a client process

In order to submit jobs to my virtual cluster, I have to instantiate a client that is connected to the cluster, and is responsible for sending functions there.

from dask.distributed import Client

client = Client(cluster)
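(A convenience that isn't strictly part of the setup, but that I find handy: displaying the client in a notebook cell shows a summary of the cluster's state, and you can ask the scheduler how many workers have actually connected so far.)

client                                    # in a notebook, displays a summary of the cluster's state
len(client.scheduler_info()['workers'])   # number of workers that have connected so far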

Compute!

With this setup complete (I have it stored as a TextExpander snippet), we can now start submitting functions to the virtual cluster!

To simulate this, let's define a square-rooting function that takes 2-3 seconds to run each time it is called, and returns the square root of its input. This stands in for a function call that is only semi-expensive to run a few times, but because we call it tens of thousands of times, the total running time in serial would be far too long.

from time import sleep
from math import sqrt
from random import random
def slow_sqrt(x):
    """
    Simulates the run time needed for a semi-expensive function call.
    """
    assert x >= 0
    # define sleeping time in seconds, between 2-3 seconds.
    t = 2 + random()
    sleep(t)

    return sqrt(x)

Serial Execution

In a naive, serial setting, we would call on the function in a for-loop:

results = []
for i in range(10000):
    results.append(slow_sqrt(i))

This would take us anywhere between 20,000 and 30,000 seconds (roughly 6 to 8 hours).

Parallel Execution

In order to execute this in parallel instead, we could do one of the following three ways:

map

sq_roots = client.map(slow_sqrt, range(10000))
sq_roots = client.gather(sq_roots)

for-loop

sq_roots = []
for i in range(10000):
    sq_roots.append(client.submit(slow_sqrt, i, retries=20))  # submit the function as the first argument, followed by its arguments
sq_roots = client.gather(sq_roots)

delayed

sq_roots = []
for i in range(10000):
    sq_roots.append(delayed(slow_sqrt)(i))
sq_roots = compute(*sq_roots)

I have some comments on each of the three methods, each of which I have used.

First off, each of them does require us to change the code we would have written in serial. This little bit of overhead is the only tradeoff we really need to make in order to gain parallelism.

All three are quite readable, though in my case, I tend to favour the for-loop with client.submit. Here is why.

For readability, the for-loop explicitly indicates that we are looping over something. It's probably easier for novices to approach my code that way.

For debuggability, client.submit returns a Future object (client.map returns a list of them). A Future might be confusing at first, so let me demystify it. A Future is a promise that the result computed from slow_sqrt will eventually exist, and it carries a ton of diagnostic information, including the type of the result (which can be useful for checking whether my function actually ran correctly). In addition, I can call Future.result() to inspect the actual result (in this case, sq_roots[0].result()). This is good for debugging the function call, in case there are issues when scaling up. (At work, I was pinging a database in parallel, and sometimes the ping would fail; debugging led me to include some failsafe code, including retries and sleeps of random lengths to stagger the database calls.)
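To make that concrete, here's a minimal sketch of poking at a single Future in the notebook (the function and value are just the toy example from above):

future = client.submit(slow_sqrt, 4)
future            # the repr shows the Future's status, and the result type once it has finished
future.result()   # blocks until the computation is done, then returns 2.0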

Finally, the Futures interface is non-blocking on my Jupyter notebook session. Once I've submitted the jobs, I can continue with other development work in my notebook in later cells, and check back when the Dask dashboard indicates that the jobs are done.

That said, I like the delayed interface as well. Once I was done debugging, and was confident that my data pipeline at work wouldn't hit the failure modes I had been seeing, I switched over to the delayed interface and scaled up my analysis. I was willing to trade the interactivity of the Futures interface for the automation provided by the delayed interface. (I also first used Dask on a single node through the delayed interface.)

Of course, there's also something to be said for the simplicity of two lines of code for parallelism (as in the client.map example).

The final line in each of the code blocks "gathers" the results back into the coordinator node's memory, thus completing the function calls and giving us the results we need.

Conclusions

That concludes it! The two key ideas illustrated in this blog post were:

  1. To set up a virtual cluster on a GridEngine system, we essentially harness the existing job submission system to generate workers that listen for tasks.
  2. A useful programming pattern is to submit functions through the client object with client.submit(func, *args, **kwargs). This requires minimal changes from serial code.

Practical Tips

Here are some tips for doing parallel processing that I've learned over the years.

Firstly, never parallelize prematurely. It's as bad as optimizing code prematurely. If your code is running slowly, first check that there aren't algorithmic complexity issues or clogged bandwidth (e.g. being I/O bound). As the Dask docs state, it is easier to achieve those gains first, before reaching for parallelization.

Secondly, when developing parallel workflows, make sure to test the pipeline on subsets of the input data first, and slowly scale up. This is also the stage at which you can profile memory usage, to check whether you need to request more RAM per worker.
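As a minimal sketch of what that might look like with the toy slow_sqrt example from above:

# Run the pipeline over a small slice of the inputs first.
futures = client.map(slow_sqrt, range(100))
subset_results = client.gather(futures)
# If the results (and the memory usage shown on the dashboard) look sane,
# repeat with a larger slice before committing to the full input.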

Thirdly, on GridEngine clusters, it is usually easier to get many small worker nodes (few cores, small amounts of RAM each) than a few large ones. If your job is trivially parallelizable, this works in your favour.

Fourthly, it's useful to have realistic expectations about the kinds of speed-ups you can gain. At work, through some ad-hoc profiling, I quickly realized that the number of concurrent database pings allowed was the most likely bottleneck in my code's speed, and that nothing short of raising that limit would make my parallel code go faster.

Finally, on a shared cluster, be respectful of other users. Don't request unreasonable amounts of compute time, and once you're sure you're done with your analysis work, remember to shut down the virtual cluster! :)
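Tearing the virtual cluster down is just a couple of calls (a minimal sketch; scaling to zero releases the GridEngine jobs, and closing the objects cleans up the rest):

cluster.scale(0)   # release the worker jobs back to the GridEngine queue
client.close()
cluster.close()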



Optimizing Block Sparse Matrix Creation with Python

written by Eric J. Ma on 2018-09-04

graph optimization numba python data science sparse matrix

import networkx as nx
import numpy as np
import scipy.sparse as sp
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from typing import List
from numba import jit

sns.set_context('talk')
sns.set_style('white')

%load_ext autoreload
%autoreload 2
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

Introduction

At work, I recently encountered a neat problem. I'd like to share it with you all.

One of my projects involves graphs; specifically, it involves taking individual graphs and turning them into one big graph. If you've taken my Network Analysis Made Simple workshops before, you'll have learned that graphs can be represented as a matrix, such as the one below:

G = nx.erdos_renyi_graph(n=10, p=0.2)
A = nx.to_numpy_array(G)

sns.heatmap(A)

[figure: heatmap of the adjacency matrix of G]

Because the matrix is so sparse, we can actually store it as a sparse matrix:

A_sparse = nx.adjacency_matrix(G).tocoo()
[A_sparse.row, A_sparse.col, A_sparse.data]
[array([0, 0, 1, 2, 3, 3, 3, 4, 4, 5, 5, 5, 5, 6, 7, 7, 8, 8], dtype=int32),
 array([5, 7, 4, 7, 5, 6, 8, 1, 5, 0, 3, 4, 8, 3, 0, 2, 3, 5], dtype=int32),
 array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], dtype=int64)]

The most straightforward way of storing sparse matrices is in the COO (COOrdinate) format, which is also known as the "triplet" format, or the "ijv" format. ("i" is row, "j" is col, "v" is value)
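As a tiny, self-contained illustration of the triplet format (toy values, not taken from the graph above), the three lists below describe a 3x3 matrix with ones at positions (0, 1), (1, 2), and (2, 0):

from scipy.sparse import coo_matrix

row = [0, 1, 2]
col = [1, 2, 0]
data = [1, 1, 1]
coo_matrix((data, (row, col)), shape=(3, 3)).todense()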

If we want to have two or more graphs stored together in a single matrix, which was what my projects required, then one way of representing them is as follows:

G2 = nx.erdos_renyi_graph(n=15, p=0.2)
A2 = nx.to_numpy_array(G2)
sns.heatmap(sp.block_diag([A, A2]).todense())

[figure: heatmap of the block diagonal matrix formed from the two graphs]

Now, notice how there are 25 nodes in total (0 to 24), and that the two adjacency matrices form what we call a "block diagonal" layout. In its "dense" form, we have to store $25^2$ values in the matrix. That's fine for small amounts of data, but if we have tens of thousands of graphs, that'll be impossible to deal with!
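To make the storage savings concrete, here's a quick check (using the two adjacency matrices above) of how many values the dense form holds versus how many entries the sparse form actually stores:

B = sp.block_diag([A, A2])   # sparse block diagonal of the two graphs above
B.todense().size             # 625 values in the dense representation
B.nnz                        # only the non-zero entries are stored in sparse form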

You'll notice I used a function from scipy.sparse, the block_diag function, which will create a block diagonal sparse matrix from an iterable of input matrices.

block_diag is the function that I want to talk about in this post.

Profiling block_diag performance

I had noticed that when dealing with tens of thousands of graphs, block_diag was not up to scratch. Specifically, the time it needed scaled quadratically with the number of matrices provided.

Let's take a look at some simulated data to illustrate this.

%%time
Gs = []
As = []
for i in range(3000):
    n = np.random.randint(10, 30)
    p = 0.2
    G = nx.erdos_renyi_graph(n=n, p=p)
    Gs.append(G)
    A = nx.to_numpy_array(G)
    As.append(A)

Let's now define a function to profile the code.

from time import time
from random import sample

def profile(func):
    n_graphs = [100, 200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000]
    data = []
    for n in tqdm(n_graphs):
        for i in range(3):  # 3 replicates per n
            start = time()
            func(sample(As, n))
            end = time()
            data.append((n, end - start))
    return data

data_sp = profile(sp.block_diag)
plt.scatter(*zip(*data_sp))
plt.xlabel('number of graphs')
plt.ylabel('time (s)')

[figure: scatter plot of sp.block_diag run time (s) against number of graphs]

It is quite clear that the increase in time is super-linear, showing $O(n^2)$ scaling. (Out of impatience, I did not go up to 50,000 graphs in this post, but at work I did profile performance on that many graphs. For reference, it took about 5 minutes to finish creating the scipy sparse matrix for 50K graphs.)

Optimizing block_diag performance

Having profiled my code and discovered that sparse block diagonal matrix creation was the bottleneck, I decided to take a stab at an optimized version of block_diag, implementing my own block diagonal matrix creation routine in pure Python.

def _block_diag(As: List[np.ndarray]):
    """
    Return the (row, col, data) triplet for a block diagonal matrix.

    Intended to be put into a coo_matrix. Can be from scipy.sparse, but
    also can be cupy.sparse, or Torch sparse etc.

    Example usage:

    >>> row, col, data = _block_diag(As)
    >>> coo_matrix((data, (row, col)))

    :param As: A list of numpy arrays to create a block diagonal matrix.
    :returns: (row, col, data), each as lists.
    """
    row = []
    col = []
    data = []
    start_idx = 0
    for A in As:
        nrows, ncols = A.shape
        for r in range(nrows):
            for c in range(ncols):
                if A[r, c] != 0:
                    row.append(r + start_idx)
                    col.append(c + start_idx)
                    data.append(A[r, c])
        start_idx = start_idx + nrows
    return row, col, data

Running it through the same profiling routine:

data_custom = profile(_block_diag)
plt.scatter(*zip(*data_custom), label='custom')
plt.scatter(*zip(*data_sp), label='scipy.sparse')
plt.legend()
plt.xlabel('number of graphs')
plt.ylabel('time (s)')

[figure: run time comparison of the custom routine against scipy.sparse]

I also happened to listen in on a lunchtime talk by Siu Kwan Lam on numba, the JIT compiler he has been developing for the past 5 years. Seeing as the code I had written in _block_diag was all numeric code, which is exactly what numba was designed for, I decided to try optimizing it with JIT compilation.

from numba import jit

data_jit = profile(jit(_block_diag))

plt.scatter(*zip(*data_custom), label='custom')
plt.scatter(*zip(*data_sp), label='scipy.sparse')
plt.scatter(*zip(*data_jit), label='jit')
plt.legend()
plt.xlabel('number of graphs')
plt.ylabel('time (s)')

[figure: run time comparison including the JIT-compiled routine]

Notice the speed-up that JIT-ing the code provided! (Granted, that first run was a "warm-up" run; once JIT-compiled, everything is really fast!)

My custom implementation only returns the (row, col, data) triplet. This is an intentional design choice: having profiled the code with and without the COO matrix creation inside the routine, I found the JIT-optimized performance to be significantly better when the COO matrix creation was left out. Since I still have to create a sparse matrix eventually, I ended up with the following design:

def block_diag(As):
    row, col, data = jit(_block_diag)(As)
    return sp.coo_matrix((data, (row, col)))

data_wrap = profile(block_diag)
plt.scatter(*zip(*data_custom), label='custom')
plt.scatter(*zip(*data_sp), label='scipy.sparse')
plt.scatter(*zip(*data_jit), label='jit')
plt.scatter(*zip(*data_wrap), label='wrapped')
plt.legend()
plt.xlabel('number of graphs')
plt.ylabel('time (s)')

[figure: run time comparison including the wrapped routine that builds the COO matrix]

You'll notice that the COO matrix creation step adds a consistent overhead on top of the triplet creation routine, but the "wrapped" timings stay flat and track the "jit" dots quite consistently. They intersect the "custom" dots at about $10^3$ graphs. Given the problem that I've been tackling, which involves $10^4$ to $10^6$ graphs at a time, it is an absolutely worthwhile improvement to JIT-compile the _block_diag function.

Conclusion

This was simultaneously a fun and useful exercise in optimizing my code!

A few things I would take away from this:

  1. Profile before optimizing: the bottleneck turned out to be sparse block diagonal matrix creation.
  2. numba's jit can deliver a dramatic speed-up on plain numeric Python loops, with essentially no code changes.
  3. Keeping the (row, col, data) triplet creation separate from the COO matrix construction kept the JIT-compiled portion fast.

I hope you learned something new, and I hope you also enjoyed reading this post as much as I enjoyed writing it!



Joint, conditional, and marginal probability distributions

written by Eric J. Ma on 2018-08-07

statistics probability bayesian data science

Joint probability, conditional probability, and marginal probability... These are three central terms when learning about probability, and they show up in Bayesian statistics as well. However... I could never really remember what they were, especially since we were usually taught them using formulas rather than pictures.

Well, for those who learn a bit better using pictures... if you know what a probability distribution is, then hopefully these help with remembering what these terms mean. (Clicking on the image will bring you to the original, hosted on GitHub.)



d-separation in causal inference

written by Eric J. Ma on 2018-08-06

causal inference bayesian data science

Yesterday evening, I had an empty block of time during which I finally did a worked example of finding whether two nodes are "d-separated" in a causal graph. It was pretty instructive to implement the algorithm. It also reminded me yet again: there's this weird thing about me where I need programming to learn math!

Anyways, if you're interested in seeing the implementation, it's available at GitHub.



nxviz 0.5 released!

written by Eric J. Ma on 2018-08-01

nxviz visualization data science software open source

A new version of nxviz is released!

In this update, I have added a declarative interface for visualizing geographically-constrained graphs. Here, nodes in a graph have their placement constrained by longitude and latitude.

An example of how to use it is below:

[figure: example GeoPlot of a geographically-constrained graph]

In the GeoPlot constructor API, the keyword arguments node_lat and node_lon specify which node metadata are to be used to place nodes on the x- and y-axes.
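To give a rough sense of the API (a sketch, not taken from the release notes: the top-level import path, the draw() call, and the node attribute names here are my assumptions), usage might look something like this:

import matplotlib.pyplot as plt
import networkx as nx
from nxviz import GeoPlot  # import path assumed; it may live under nxviz.plots

# Hypothetical graph whose nodes carry 'longitude' and 'latitude' metadata.
G = nx.Graph()
G.add_node("Boston", longitude=-71.06, latitude=42.36)
G.add_node("New York", longitude=-74.01, latitude=40.71)
G.add_node("Philadelphia", longitude=-75.17, latitude=39.95)
G.add_edges_from([("Boston", "New York"), ("New York", "Philadelphia")])

g = GeoPlot(G, node_lat="latitude", node_lon="longitude")
g.draw()
plt.show()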

By no means do I intend for GeoPlot to replace more sophisticated analysis methods; like seaborn, the interface is declarative, and for me, the intent is to provide a very quick-and-dirty way for an end user to visualize graphs with spatially constrained nodes.

Please enjoy!
