In some of my research, I found myself comparing graphs with different metrics under changing environments. One of these sets of graphs was simply of size 831
num_graphs = len(df_both["graph_theme"].unique())
f"{num_graphs} graphs makes {int(((num_graphs**2)-num_graphs)/2)} pairs"
‘863 graphs makes 371953 pairs’
for which I wanted to compute similarity matrices with the graph edit distance (GED). The graph edit distance is quite expensive to obtain, so I usually selected only graphs I really was interested in and then computed pairwise graph edit distances. Based on properties associated to the graphs, the arrangement of the similarity matrices could change. As I was computing more things on top of that and I needed to share my visualizations with matplotlib/seaborn/plotly both interactively and in good resolution for presentations and papers, I started a mini tool pynbcache that takes a hash of a function and its parameters and caches its result. That way, I could pre-compute results and for presentations of my jupyter notebooks I could work on a local cache or even share the cache through a MinIO cloud instance.
However, at some point I realized that it would be good to have all pairs of graph edit distances available (and needed that caching only for other more dynamic stuff). Earlier, I was already computing the GEDs in parallel, but it always bothered me that I could not really re-use the results or observe when the computations finished. So I wanted to share my solution on using the awesome joblib library in combination of a tqdm progressbar contextmanager with which the progress of e.g. 3720 chunks of graph pairs could be observed while it was computing the distances for about one and a half hours.
The graphs are associated with unique identifiers (as strings) but I found it to make a significant (huge!) difference in RAM usage when using indices. While the UUIDs are good for later identification (instead of indices), for the parallel GED computation I used a mapping idx -> UUID:
map_graphs = {idx: gt for idx, gt in enumerate(df_both["graph_theme"].unique())}
In a next step, I broke pairs of graphs into chunks of e.g. size $C=100$ such that not everything is just parallelized (takes some overhead).
break_into_chunks = lambda ll, n: [ll[i * n:(i + 1) * n] for i in range((len(ll) + n - 1) // n )]
size_chunksofpairs = 100
pairs_graph_ids = list(itertools.combinations(range(num_graphs), r=2))
len(pairs_graphs) # 371953
chunksof_pairs_graph_ids = break_into_chunks(pairs_graphs, size_chunksofpairs)
chunksof_pairs_graph_ids[0][:4]
[(0, 1), (0, 2), (0, 3), (0, 4)]
Now, my function to parallelize is simply the following (to compute the GED):
def obtain_chunk_geds(chunk_ids, map_graphs, path_save:str = None, ctnasapi=api):
geds = []
for ix1, ix2 in chunk_ids:
gid1 = map_graphs[ix1]
gid2 = map_graphs[ix2]
g1 = ctnasapi.get_graph(gid1) # retrieves a graph by UUID
g2 = ctnasapi.get_graph(gid2)
ged = nx.graph_edit_distance(g1, g2)
geds.append([gid1, gid2, ged])
if path_save is not None:
df = pd.DataFrame(geds, columns=["graph_theme_1", "graph_theme_2", "GED"])
df.to_csv(path_save, sep='\t', encoding='utf-8')
return geds
The function needs access to the UUID by the used index (through map_graphs) and optionally stores results in a provided file (in csv format).
obtain_chunk_geds(chunksof_pairs_graph_ids[0][:4], map_graphs)
[[‘5c280f8d-60af-49ab-94fb-a2aa99e80dab’, ‘bd558561-3092-4d01-b5ab-1ba02ff0fc6f’, 4.0], [‘5c280f8d-60af-49ab-94fb-a2aa99e80dab’, ‘f6e954f7-1b0c-4824-bef3-2aaaba5b9b40’, 6.0], [‘5c280f8d-60af-49ab-94fb-a2aa99e80dab’, ‘1f3ede6d-da24-4709-b581-9920a40b1af6’, 5.0], [‘5c280f8d-60af-49ab-94fb-a2aa99e80dab’, ‘026445b9-e2a9-4053-a78e-b12eb5d1d687’, 4.0]]
The parallelization call with joblib now amounts to:
result = Parallel(n_jobs=n_jobs)(
delayed(obtain_chunk_geds)(
chunk_pairs,
map_graphs,
os.path.join(path_chunks, f"ged-graph-themes-chunk{i}.csv")
) for i, chunk_pairs in enumerate(chunksof_pairs_graph_ids)
)
Through the chunking, we call the function only 3720 times in parallel. And for that, I wanted to see the progress bar. There’s an elegant solution by AlanSTACK on stackoverflow:
import contextlib
import joblib
from tqdm import tqdm
@contextlib.contextmanager
def tqdm_joblib(tqdm_object):
"""Context manager to patch joblib to report into tqdm progress bar given as argument"""
class TqdmBatchCompletionCallback(joblib.parallel.BatchCompletionCallBack):
def __call__(self, *args, **kwargs):
tqdm_object.update(n=self.batch_size)
return super().__call__(*args, **kwargs)
old_batch_callback = joblib.parallel.BatchCompletionCallBack
joblib.parallel.BatchCompletionCallBack = TqdmBatchCompletionCallback
try:
yield tqdm_object
finally:
joblib.parallel.BatchCompletionCallBack = old_batch_callback
tqdm_object.close()
which then did not require any (major) change to the invocation. Progress bar for free!
with tqdm_joblib(tqdm(desc="Calculating GEDs", total=len(chunksof_pairs_graph_ids))) as progress_bar:
result = Parallel(n_jobs=n_jobs)(
delayed(obtain_chunk_geds)(
chunk_pairs,
map_graphs,
os.path.join(path_chunks, f"ged-graph-themes-chunk{i}.csv")
) for i, chunk_pairs in enumerate(chunksof_pairs_graph_ids)
)