I recently just figured out how to get this working... and it's awesome! :D
If I'm developing an analysis in the Jupyter notebook, and I have one semi-long-running function (e.g. takes dozens of seconds) that I need to run over tens to hundreds of thousands of similar inputs, it'll take ages for this to complete in serial. For a sense of scale, a function that takes ~20 seconds per call run serially over 10,000 similar inputs would take 200,000 seconds, which is 2 days of run-time (not including any other overhead). That's not feasible for interactive exploration of data. If I could somehow parallelize just the function over 500 compute nodes, we could take the time down to 7 minutes.
GridEngine-based compute clusters are one of many options for parallelizing work. During grad school at MIT, and at work at Novartis, the primary compute cluster environment that I've encountered has been GridEngine-based. However, because they are designed for batch jobs, as a computational scientist, we have to jump out of whatever development environment we're currently using, and move to custom scripts.
In order to do parallelism with traditional GridEngine systems, I would have to jump out of the notebook and start writing job submission scripts, which disrupts my workflow. I would be disrupting my thought process, and lose the interactivity that I might need to prototype my work faster.
dask-jobqueue enables computational scientists like myself to take advantage of existing GridEngine setups to do interactive, parallel work. As long as I have a Jupyter notebook server running on a GridEngine-connected compute node, I can submit functions to the GridEngine cluster and collect back those results to do further processing, in a fraction of the time that it would take, thus enabling me to do my science faster than if I did everything single core/single node.
In this blog post, I'd like to share an annotated, minimal setup for using Dask on a GridEngine cluster. (Because we use Dask, more complicated pipelines are possible as well - I would encourage you to read the Dask docs for more complex examples.) I will assume that you are working in a Jupyter notebook environment, and that the notebook you are working out of is hosted on a GridEngine-connected compute node, from which you are able to
qsub tasks. Don't worry, you won't be qsub-ing anything though!
To start, we need a cell that houses the following code block:
from dask_jobqueue import SGECluster from dask import delayed, compute cluster = SGECluster(queue='default.q', walltime="1500000", processes=1, memory='1GB', cores=1, env_extra=['source /path/to/custom/script.sh', 'export ENV_VARIABLE="SOMETHING"'] )
Here, we are instantiating an
SGECluster object under the variable name
cluster stores is essentially a configuration for a block of worker nodes that you will be requesting. Under the hood, what
dask-jobqueue is doing is submitting jobs to the GridEngine scheduler, which will block off a specified amount of compute resources (e.g. number of cores, amount of RAM, whether you want GPUs or not, etc.) for a pre-specified amount of time, on which Dask then starts a worker process to communicate with the head process coordinating tasks amongst workers.
As such, you do need to know two pieces of information:
queue: The queue that jobs are to be submitted to. Usually, it is named something like
default.q, but you will need to obtain this through GridEngine. If you have the ability to view all jobs that are running, you can call
qstatat the command line to see what queues are being used. Otherwise, you might have to ping your system administrator for this information.
walltime: You will also need to pre-estimate the wall clock time, in seconds, that you want the worker node to be alive for. It should be significantly longer than the expected time you think you will need, so that your function call doesn't timeout unexpectedly. I have defaulted to 1.5 million seconds, which is about 18 days of continual runtime. In practice, I usually kill those worker processes after just a few hours.
Besides that, you also need to specify the resources that you need per worker process. In my example above, I'm asking for each worker process to use only 1GB of RAM, 1 core, and to use only 1 process per worker (i.e. no multiprocessing, I think).
Finally, I can also specify extra environment setups that I will need. Because each worker process is a new process that has no knowledge of the parent process' environment, you might have to source some bash script, or activate a Python environment, or export some environment variable. This can be done under the
env_extra keyword, which accepts a list of strings.
I put "nodes" in quotation marks, because they are effectively logical nodes, rather than actual compute nodes. (Technically, I think a compute node minimally means one physical hardware unit with CPUs and RAM).
In order to request for worker nodes to run your jobs, you need the next line of code:
With this line, under the hood,
dask-jobqueue will start submitting 500 jobs, each requesting 1GB of RAM and 1 core, populating my compute environment according to the instructions I provided under
At the end of this, I effectively have a 500-node cluster on the larger GridEngine cluster (let's call this a "virtual cluster"), each with 1GB of RAM and 1 core available to it, on which I can submit functions to run.
In order to submit jobs to my virtual cluster, I have to instantiate a client that is connected to the cluster, and is responsible for sending functions there.
from dask.distributed import Client client = Client(cluster)
With this setup complete (I have it stored as a TextExpander snippets), we can now start submitting functions to the virtual cluster!
To simulate this, let's define a square-rooting function that takes 2-3 seconds to run each time it is called, and returns the square of its inputs. This simulates a function call that is computationally semi-expensive to run a few times, but because call on this hundreds of thousands of time, the total running time to run it serially would be too much.
from time import sleep from math import sqrt from random import random def slow_sqrt(x): """ Simulates the run time needed for a semi-expensive function call. """ assert x > 0 # define sleeping time in seconds, between 2-3 seconds. t = 2 + random() sleep(t) return sqrt(x)
In a naive, serial setting, we would call on the function in a for-loop:
results =  for i in range(10000): results.append(slow_sqrt(i))
This would take us anywhere between 20,000 to 30,000 seconds (approximately 8 hours, basically).
In order to execute this in parallel instead, we could do one of the following three ways:
sq_roots = client.map(slow_sqrt, range(10000)) sq_roots = client.gather(sq_roots)
sq_roots =  for i in range(10000): sq_roots.append(client.submit(slow_sqrt, i, restart=20)) # submit the function as first argument, then rest of arguments sq_roots = client.gather(sq_roots)
sq_roots =  for i in range(10000): sq_roots.append(delayed(slow_sqrt)(i)) sq_roots = compute(*sq_roots)
I have some comments on each of the three methods, each of which I have used.
First off, each of them do require us to change the code that we would have written in serial. This little bit of overhead is the only tradeoff we really need to make in order to gain parallelism.
In terms of readability, all of them are quite readable, though in my case, I tend to favour the for-loop with
client.submit. Here is why.
For readability, the for-loop explicitly indicates that we are looping over something. It's probably more easy for novices to approach my code that way.
client.submit returns a Futures object (same goes for
client.map). A "Futures" object might be confusing at first, so let me start by demystifying that. A Futures object promises that the result that is computed from
slow_sqrt will exist, and actually contains a ton of diagnostic information, including the
type of the object (which can be useful for diagnosing whether my function actually ran correctly). In addition to that, I can call on
Futures.result() to inspect the actual result (in this case,
sq_roots.result()). This is good for debugging the function call, in case there are issues when scaling up. (At work, I was pinging a database in parallel, and sometimes the ping would fail; debugging led me to include some failsafe code, including retries and sleeps with random lengths to stagger out database calls.)
Finally, the Futures interface is non-blocking on my Jupyter notebook session. Once I've submitted the jobs, I can continue with other development work in my notebook in later cells, and check back when the Dask dashboard indicates that the jobs are done.
That said, I like the
delayed interface as well. Once I was done debugging and confident that my own data pipeline at work wouldn't encounter the failure modes I was seeing, I switched over to the
delayed interface and scaled up my analysis. I was willing to trade in the interactivity using the
Futures interface for the automation provided by the
delayed interface. (I also first used Dask on a single node through the delayed interface as well).
Of course, there's something also to be said for the simplicity of two lines of code for parallelism (with the
The final line in each of the code blocks allows us to "gather" the results back into my coordinator node's memory, thus completing the function call and giving us the result we needed.
That concludes it! The two key ideas illustrated in this blog post were:
submitfunctions using the
client.submit(func, *args, **kwargs). This requires minimal changes from serial code.
Here's some tips for doing parallel processing, which I've learned over the years.
Firstly, never prematurely parallelize. It's as bad as prematurely optimizing code. If your code is running slowly, check first to make sure that there aren't algorithmic complexity issues, or bandwidths being clogged up (e.g. I/O bound). As the Dask docs state, it is easier to achieve those gains first before doing parallelization.
Secondly, when developing parallel workflows, make sure to test the pipeline on subsets of input data first, and slowly scale up. It is during this period that you can also profile memory usage to check to see if you need to request for more RAM per worker.
Thirdly, for GridEngine clusters, it is usually easier to request for many small worker nodes that consume few cores and small amounts of RAM. If your job is trivially parallelizable, this may be a good thing.
Fourthly, it's useful to have realistic expectations on the kinds of speed-ups you can expect to gain. At work, through some ad-hoc profiling, I quickly came to the realization that concurrent database pings were the most likely bottleneck in my code's speed, and that nothing apart from increasing the number of concurrent database pings allowed would make my parallel code go faster.
Finally, on a shared cluster, be respectful of others' usage. Don't request for unreasonable amounts of compute time. And when you're confirmed done with your analysis work, remember to shut down the virtual cluster! :)
Did you enjoy this blog post? Let's discuss more!
import networkx as nx import numpy as np import scipy.sparse as sp import matplotlib.pyplot as plt import seaborn as sns from tqdm import tqdm from typing import List from numba import jit sns.set_context('talk') sns.set_style('white') %load_ext autoreload %autoreload 2 %matplotlib inline %config InlineBackend.figure_format = 'retina'
At work, I recently encountered a neat problem. I'd like to share it with you all.
One of my projects involves graphs; specifically, it involves taking individual graphs and turning them into one big graph. If you've taken my Network Analysis Made Simple workshops before, you'll have learned that graphs can be represented as a matrix, such as the one below:
G = nx.erdos_renyi_graph(n=10, p=0.2) A = nx.to_numpy_array(G) sns.heatmap(A)
Because the matrix is so sparse, we can actually store it as a sparse matrix:
A_sparse = nx.adjacency_matrix(G).tocoo() [A_sparse.row, A_sparse.col, A_sparse.data]
[array([0, 0, 1, 2, 3, 3, 3, 4, 4, 5, 5, 5, 5, 6, 7, 7, 8, 8], dtype=int32), array([5, 7, 4, 7, 5, 6, 8, 1, 5, 0, 3, 4, 8, 3, 0, 2, 3, 5], dtype=int32), array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], dtype=int64)]
The most straightforward way of storing sparse matrices is in the COO (COOrdinate) format, which is also known as the "triplet" format, or the "ijv" format. ("i" is row, "j" is col, "v" is value)
If we want to have two or more graphs stored together in a single matrix, which was what my projects required, then one way of representing them is as follows:
G2 = nx.erdos_renyi_graph(n=15, p=0.2) A2 = nx.to_numpy_array(G2) sns.heatmap(sp.block_diag([A, A2]).todense())
Now, notice how there's 25 nodes in total (0 to 24), and that they form what we call a "block diagonal" format. In its "dense" form, we have to represent $25^2$ values inside the matrix. That's fine for small amounts of data, but if we have tens of thousands of graphs, that'll be impossible to deal with!
You'll notice I used a function from
block_diag function, which will create a block diagonal sparse matrix from an iterable of input matrices.
block_diag is the function that I want to talk about in this post.
I had noticed that when dealing with tens of thousands of graphs,
block_diag was not performing up to scratch. Specifically, the time it needed would scale quadratically with the number of matrices provided.
Let's take a look at some simulated data to illustrate this.
%%time Gs =  As =  for i in range(3000): n = np.random.randint(10, 30) p = 0.2 G = nx.erdos_renyi_graph(n=n, p=p) Gs.append(G) A = nx.to_numpy_array(G) As.append(A)
Let's now define a function to profile the code.
from time import time from random import sample def profile(func): n_graphs = [100, 200, 400, 600, 800, 1000, 1200, 1400, 1600, 1800, 2000] data =  for n in tqdm(n_graphs): for i in range(3): # 3 replicates per n start = time() func(sample(As, n)) end = time() data.append((n, end - start)) return data data_sp = profile(sp.block_diag) plt.scatter(*zip(*data_sp)) plt.xlabel('number of graphs') plt.ylabel('time (s)')
It is quite clear that the increase in time is super-linear, showing $O(n^2)$ scaling. (Out of impatience, I did not go beyond 50,000 graphs in this post, but at work, I did profile performance up to that many graphs. For reference, it took about 5 minutes to finish creating the scipy sparse matrix for 50K graphs.
I decided to take a stab at creating an optimized version of
block_diag. Having profiled my code and discovering that sparse block diagonal matrix creation was a bottleneck, I implemented my own sparse block diagonal matrix creation routine using pure Python.
def _block_diag(As: List[np.array]): """ Return the (row, col, data) triplet for a block diagonal matrix. Intended to be put into a coo_matrix. Can be from scipy.sparse, but also can be cupy.sparse, or Torch sparse etc. Example usage: >>> row, col, data = _block_diag(As) >>> coo_matrix((data, (row, col))) :param As: A list of numpy arrays to create a block diagonal matrix. :returns: (row, col, data), each as lists. """ row =  col =  data =  start_idx = 0 for A in As: nrows, ncols = A.shape for r in range(nrows): for c in range(ncols): if A[r, c] != 0: row.append(r + start_idx) col.append(c + start_idx) data.append(A[r, c]) start_idx = start_idx + nrows return row, col, data
Running it through the same profiling routine:
data_custom = profile(_block_diag) plt.scatter(*zip(*data_custom), label='custom') plt.scatter(*zip(*data_sp), label='scipy.sparse') plt.legend() plt.xlabel('number of graphs') plt.ylabel('time (s)')
I also happened to have listened in on a talk by Siu Kwan Lam during lunch, on
numba, the JIT optimizer that he has been developing for the past 5 years now. Seeing as how the code I had written in
_block_diag was all numeric code, which is exactly what
numba was designed for, I decided to try optimizing it with JIT.
from numba import jit data_jit = profile(jit(_block_diag)) plt.scatter(*zip(*data_custom), label='custom') plt.scatter(*zip(*data_sp), label='scipy.sparse') plt.scatter(*zip(*data_jit), label='jit') plt.legend() plt.xlabel('number of graphs') plt.ylabel('time (s)')
Notice the speed-up that JIT-ing the code provided! (Granted, that first run was a "warm-up" run; once JIT-compiled, everything is really fast!)
My custom implementation only returns the (row, col, data) triplet. This is an intentional design choice - having profiled the code with and without calling a COO matrix creation routine, I found the JIT-optimized performance to be significantly better without creating the COO matrix routine. As I still have to create a sparse matrix, I ended up with the following design:
def block_diag(As): row, col, data = jit(_block_diag)(As) return sp.coo_matrix((data, (row, col))) data_wrap = profile(block_diag) plt.scatter(*zip(*data_custom), label='custom') plt.scatter(*zip(*data_sp), label='scipy.sparse') plt.scatter(*zip(*data_jit), label='jit') plt.scatter(*zip(*data_wrap), label='wrapped') plt.legend() plt.xlabel('number of graphs') plt.ylabel('time (s)')
You'll notice that the array creation step induces a consistent overhead on top of the sparse matrix triplet creation routine, but stays flat and trends the "jit" dots quite consistently. It intersects the "custom" dots at about $10^3$ graphs. Given the problem that I've been tackling, which involves $10^4$ to $10^6$ graphs at a time, it is an absolutely worthwhile improvement to JIT-compile the
This was simultaneously a fun and useful exercise in optimizing my code!
A few things I would take away from this:
numbacan really speed up array-oriented Python computation. It lives up to the claims on its documentation.
I hope you learned something new, and I hope you also enjoyed reading this post as much as I enjoyed writing it!
Did you enjoy this blog post? Let's discuss more!
Joint probability, conditional probability, and marginal probability... These are three central terms when learning about probability, and they show up in Bayesian statistics as well. However... I never really could remember what they were, especially since we were usually taught them using formulas, rather than pictures.
Well, for those who learn a bit better using pictures... if you know what a probability distribution is, then hopefully these help with remembering what these terms mean. (Clicking on the image will bring you to the original, hosted on GitHub.)
Did you enjoy this blog post? Let's discuss more!
Yesterday evening, I had an empty block of time during which I finally did a worked example of finding whether two nodes are "d-separated" in a causal graph. It was pretty instructive to implement the algorithm. It also reminded me yet again: there's this weird thing about me where I need programming to learn math!
Anyways, if you're interested in seeing the implementation, it's available at GitHub.
Did you enjoy this blog post? Let's discuss more!
A new version of nxviz is released!
In this update, I have added a declarative interface for visualizing geographically-constrained graphs. Here, nodes in a graph have their placement constrained by longitude and latitude.
An example of how to use it is below:
In the GeoPlot constructor API, the keyword arguments
node_lon specify which node metadata are to be used to place nodes on the x- and y- axes.
By no means do I intend for GeoPlot to replace more sophisticated analysis methods; like
seaborn, the interface is declarative; for me, the intent is to provide a very quick-and-dirty way for an end user to visualize graphs with spatially constrained nodes.
Did you enjoy this blog post? Let's discuss more!