From 67a9e4d8173c95698ca94227495a44fb490e3bc6 Mon Sep 17 00:00:00 2001
From: Erik Welch
Date: Mon, 4 Apr 2022 14:31:27 -0500
Subject: [PATCH] Add initial attempt at pagerank with hacky tests (#1)

* Add initial attempt at pagerank with hacky tests
* Install networkx too
* Fix failing test; maybe faster/better?
* Even better
* Split pagerank into two functions: grblas-native and networkx-facing.
  Also, optimize if adjacency matrix is iso-valued.
  I would bring this implementation to a benchmarking shootout!
* Change how we convert NetworkX dicts to vectors. Also, sparsify vectors.
  May be a decent idea? Not sure. It probably doesn't matter most of the time,
  but I guess there's a chance it can make the matrix-vector multiply faster
  for some inputs. We don't drop 0s from the input matrix, because that would
  be expensive.
* Clean up
* Don't be cute; don't use masks, because they're bad for benchmarks.
  Also, add basic benchmark script.
* Update to use latest grblas; also, add verify option to bench script.
* Show grid of absolute differences between benchmark results
---
 .github/workflows/test.yml                  |   2 +-
 README.md                                   |  13 +-
 graphblas_algorithms/__init__.py            |   1 +
 graphblas_algorithms/conftest.py            |   1 +
 graphblas_algorithms/link_analysis.py       | 155 ++++++++++++
 graphblas_algorithms/tests/__init__.py      |   0
 graphblas_algorithms/tests/test_pagerank.py |  21 ++
 scripts/bench_pagerank.py                   | 250 ++++++++++++++++++++
 setup.py                                    |   4 +-
 9 files changed, 440 insertions(+), 7 deletions(-)
 create mode 100644 graphblas_algorithms/conftest.py
 create mode 100644 graphblas_algorithms/link_analysis.py
 create mode 100644 graphblas_algorithms/tests/__init__.py
 create mode 100644 graphblas_algorithms/tests/test_pagerank.py
 create mode 100644 scripts/bench_pagerank.py

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index ad80d4c..03675fc 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -30,7 +30,7 @@ jobs:
         activate-environment: testing
     - name: Install dependencies
       run: |
-        conda install -c conda-forge grblas pytest coverage black flake8 coveralls
+        conda install -c conda-forge grblas networkx scipy pytest coverage black flake8 coveralls
         pip install -e .
     - name: Style checks
       run: |
diff --git a/README.md b/README.md
index 4cbfe0e..86ea6e5 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,18 @@
 # **GraphBLAS Algorithms**
 
-[![conda-forge](https://img.shields.io/conda/vn/conda-forge/graphblas-algorithms.svg)](https://anaconda.org/conda-forge/graphblas-algorithms)
 [![pypi](https://img.shields.io/pypi/v/graphblas-algorithms.svg)](https://pypi.python.org/pypi/graphblas-algorithms/)
 [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/metagraph-dev/graphblas-algorithms/blob/main/LICENSE)
 [![Tests](https://github.com/metagraph-dev/graphblas-algorithms/workflows/Tests/badge.svg?branch=main)](https://github.com/metagraph-dev/graphblas-algorithms/actions)
-[![Docs](https://readthedocs.org/projects/graphblas-algorithms/badge/?version=latest)](https://graphblas-algorithms.readthedocs.io/en/latest/)
 [![Coverage](https://coveralls.io/repos/metagraph-dev/graphblas-algorithms/badge.svg?branch=main)](https://coveralls.io/r/metagraph-dev/graphblas-algorithms)
 [![Code style](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
+
+
 
-GraphBLAS algorithms written in Python with [`grblas`](https://github.com/metagraph-dev/grblas).
+GraphBLAS algorithms written in Python with [`grblas`](https://github.com/metagraph-dev/grblas). We are trying to target the NetworkX API algorithms where possible.
 
-This is a work in progress. Stay tuned!
+### Installation
+```
+pip install graphblas-algorithms
+```
+
+This is a work in progress. Stay tuned (or come help 😃)!
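For orientation before the code diffs: a minimal usage sketch of the API this patch adds, assuming the package is installed as above. The toy graph is made up; `pagerank` is the NetworkX-facing function added in `graphblas_algorithms/link_analysis.py` and re-exported from the package root, mirroring `networkx.pagerank`.

```python
# Minimal usage sketch (hypothetical graph); mirrors networkx.pagerank's API.
import networkx as nx

import graphblas_algorithms as ga

G = nx.DiGraph([(0, 1), (1, 2), (2, 0), (2, 3)])
ranks = ga.pagerank(G, alpha=0.85, tol=1e-6)  # dict: node -> PageRank score
print(sorted(ranks, key=ranks.get, reverse=True))  # nodes ordered by rank
```

Under the hood this converts `G` to a `grblas` Matrix once and runs the power iteration entirely in GraphBLAS.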
diff --git a/graphblas_algorithms/__init__.py b/graphblas_algorithms/__init__.py
index 4d52a61..3a60de1 100644
--- a/graphblas_algorithms/__init__.py
+++ b/graphblas_algorithms/__init__.py
@@ -1,3 +1,4 @@
 from . import _version
+from .link_analysis import pagerank  # noqa
 
 __version__ = _version.get_versions()["version"]
diff --git a/graphblas_algorithms/conftest.py b/graphblas_algorithms/conftest.py
new file mode 100644
index 0000000..c598c08
--- /dev/null
+++ b/graphblas_algorithms/conftest.py
@@ -0,0 +1 @@
+from networkx.conftest import *  # noqa
diff --git a/graphblas_algorithms/link_analysis.py b/graphblas_algorithms/link_analysis.py
new file mode 100644
index 0000000..cb786a9
--- /dev/null
+++ b/graphblas_algorithms/link_analysis.py
@@ -0,0 +1,155 @@
+from collections import OrderedDict
+from warnings import warn
+
+import grblas as gb
+import networkx as nx
+from grblas import Vector, binary, unary
+from grblas.semiring import plus_first, plus_times
+
+
+def pagerank_core(
+    A,
+    alpha=0.85,
+    personalization=None,
+    max_iter=100,
+    tol=1e-06,
+    nstart=None,
+    dangling=None,
+    row_degrees=None,
+    name="pagerank",
+):
+    N = A.nrows
+    if A.nvals == 0:
+        return Vector.new(float, N, name=name)
+
+    # Initial vector
+    x = Vector.new(float, N, name="x")
+    if nstart is None:
+        x[:] = 1.0 / N
+    else:
+        denom = nstart.reduce(allow_empty=False).value
+        if denom == 0:
+            raise ZeroDivisionError()
+        x << nstart / denom
+
+    # Personalization vector or scalar
+    if personalization is None:
+        p = 1.0 / N
+    else:
+        denom = personalization.reduce(allow_empty=False).value
+        if denom == 0:
+            raise ZeroDivisionError()
+        p = (personalization / denom).new(name="p")
+
+    # Inverse of row_degrees
+    # Fold alpha constant into S
+    if row_degrees is None:
+        S = A.reduce_rowwise().new(float, name="S")
+        S << alpha / S
+    else:
+        S = (alpha / row_degrees).new(name="S")
+
+    if A.ss.is_iso:
+        # Fold iso-value of A into S
+        # This lets us use the plus_first semiring, which is faster
+        iso_value = A.ss.iso_value
+        if iso_value != 1:
+            S *= iso_value
+        semiring = plus_first[float]
+    else:
+        semiring = plus_times[float]
+
+    is_dangling = S.nvals < N
+    if is_dangling:
+        dangling_mask = Vector.new(float, N, name="dangling_mask")
+        dangling_mask(mask=~S.S) << 1.0
+        # Fold alpha constant into dangling_weights (or dangling_mask)
+        if dangling is not None:
+            dangling_weights = (alpha / dangling.reduce(allow_empty=False).value * dangling).new(
+                name="dangling_weights"
+            )
+        elif personalization is None:
+            # Fast case (and common case); is iso-valued
+            dangling_mask(mask=dangling_mask.S) << alpha * p
+        else:
+            dangling_weights = (alpha * p).new(name="dangling_weights")
+
+    # Fold constant into p
+    p *= 1 - alpha
+
+    # Power iteration: make up to max_iter iterations
+    xprev = Vector.new(float, N, name="x_prev")
+    w = Vector.new(float, N, name="w")
+    for _ in range(max_iter):
+        xprev, x = x, xprev
+
+        # x << alpha * ((xprev * S) @ A + "dangling_weights") + (1 - alpha) * p
+        x << p
+        if is_dangling:
+            if dangling is None and personalization is None:
+                # Fast case: add a scalar; x is still iso-valued (b/c p is also scalar)
+                x += xprev @ dangling_mask
+            else:
+                # Add a vector
+                x += plus_first(xprev @ dangling_mask) * dangling_weights
+        w << xprev * S
+        x += semiring(w @ A)  # plus_first if A.ss.is_iso else plus_times
+
+        # Check convergence, l1 norm: err = sum(abs(xprev - x))
+        xprev << binary.minus(xprev | x, require_monoid=False)
+        xprev << unary.abs(xprev)
+        err = xprev.reduce().value
+        if err < N * tol:
+            x.name = name
+            return x
+    raise nx.PowerIterationFailedConvergence(max_iter)
+
+
+def pagerank(
+    G,
+    alpha=0.85,
+    personalization=None,
+    max_iter=100,
+    tol=1e-06,
+    nstart=None,
+    weight="weight",
+    dangling=None,
+):
+    warn("", DeprecationWarning, stacklevel=2)
+    N = len(G)
+    if N == 0:
+        return {}
+    node_ids = OrderedDict((k, i) for i, k in enumerate(G))
+    A = gb.io.from_networkx(G, nodelist=node_ids, weight=weight, dtype=float)
+
+    x = p = dangling_weights = None
+    # Initial vector (we'll normalize later)
+    if nstart is not None:
+        indices, values = zip(*((node_ids[key], val) for key, val in nstart.items()))
+        x = Vector.from_values(indices, values, size=N, dtype=float, name="nstart")
+    # Personalization vector (we'll normalize later)
+    if personalization is not None:
+        indices, values = zip(*((node_ids[key], val) for key, val in personalization.items()))
+        p = Vector.from_values(indices, values, size=N, dtype=float, name="personalization")
+    # Dangling nodes (we'll normalize later)
+    row_degrees = A.reduce_rowwise().new(name="row_degrees")
+    if dangling is not None:
+        if row_degrees.nvals < N:  # is_dangling
+            indices, values = zip(*((node_ids[key], val) for key, val in dangling.items()))
+            dangling_weights = Vector.from_values(
+                indices, values, size=N, dtype=float, name="dangling"
+            )
+    result = pagerank_core(
+        A,
+        alpha=alpha,
+        personalization=p,
+        max_iter=max_iter,
+        tol=tol,
+        nstart=x,
+        dangling=dangling_weights,
+        row_degrees=row_degrees,
+    )
+    if result.nvals != N:
+        # Not likely, but fill with 0 just in case
+        result(mask=~result.S) << 0
+    return dict(zip(node_ids, result.to_values()[1]))
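The iso-value optimization above leans on an identity worth spelling out: when every stored value of `A` equals some constant `v`, then `plus_times(w @ A)` equals `plus_first((v * w) @ A)`, because `first` never reads `A`'s values. A small self-contained check of that identity (the matrix, vector, and names here are mine, not part of the patch):

```python
# Identity behind pagerank_core's iso-valued fast path: for an iso-valued
# matrix (every stored value == v), fold v into the vector and use
# plus_first, which ignores A's values, in place of plus_times.
from grblas import Matrix, Vector
from grblas.semiring import plus_first, plus_times

v = 2.0
A = Matrix.from_values([0, 0, 1, 2], [1, 2, 0, 0], v)  # iso-valued: all 2.0
w = Vector.from_values([0, 1, 2], [0.1, 0.2, 0.7])

w_folded = (v * w).new()  # fold the iso value into the vector
exact = plus_times[float](w @ A).new()
fast = plus_first[float](w_folded @ A).new()
assert exact.isclose(fast)
```

In `pagerank_core` the fold is applied once to `S` before the loop, so every iteration's matrix-vector multiply can skip reading `A`'s values.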
diff --git a/graphblas_algorithms/tests/__init__.py b/graphblas_algorithms/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/graphblas_algorithms/tests/test_pagerank.py b/graphblas_algorithms/tests/test_pagerank.py
new file mode 100644
index 0000000..65dd8db
--- /dev/null
+++ b/graphblas_algorithms/tests/test_pagerank.py
@@ -0,0 +1,21 @@
+import inspect
+
+import networkx as nx
+
+from graphblas_algorithms import pagerank
+
+nx_pagerank = nx.pagerank
+nx_pagerank_scipy = nx.pagerank_scipy
+
+nx.pagerank = pagerank
+nx.pagerank_scipy = pagerank
+nx.algorithms.link_analysis.pagerank_alg.pagerank_scipy = pagerank
+
+
+def test_signatures():
+    nx_sig = inspect.signature(nx_pagerank)
+    sig = inspect.signature(pagerank)
+    assert nx_sig == sig
+
+
+from networkx.algorithms.link_analysis.tests.test_pagerank import *  # isort:skip
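The "hacky tests" above monkeypatch NetworkX's pagerank entry points and then star-import NetworkX's own test module, so the upstream test suite runs against this implementation. The benchmark script added next takes the other route and drives the grblas-native half directly on an adjacency `Matrix`, skipping NetworkX conversion. In miniature (the edge list is made up; node 3 is deliberately dangling):

```python
# Calling the grblas-native entry point directly on an adjacency Matrix,
# as scripts/bench_pagerank.py does (edge list here is made up).
from grblas import Matrix

from graphblas_algorithms.link_analysis import pagerank_core

rows = [0, 0, 1, 2, 2]
cols = [1, 2, 2, 0, 3]
A = Matrix.from_values(rows, cols, 1, nrows=4, ncols=4)  # iso-valued: all 1s
x = pagerank_core(A, alpha=0.85, tol=1e-6)  # grblas Vector named "pagerank"
print(x.to_values())  # (indices, values)
```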
diff --git a/scripts/bench_pagerank.py b/scripts/bench_pagerank.py
new file mode 100644
index 0000000..fb054f9
--- /dev/null
+++ b/scripts/bench_pagerank.py
@@ -0,0 +1,250 @@
+import click
+import networkx as nx
+
+
+def best_units(num):
+    """Returns scale factor and prefix such that 1 <= num*scale < 1000"""
+    if num < 1e-12:
+        return 1e15, "f"
+    if num < 1e-9:
+        return 1e12, "p"
+    if num < 1e-6:
+        return 1e9, "n"
+    if num < 1e-3:
+        return 1e6, "u"
+    if num < 1:
+        return 1e3, "m"
+    if num < 1e3:
+        return 1.0, ""
+    if num < 1e6:
+        return 1e-3, "k"
+    if num < 1e9:
+        return 1e-6, "M"
+    if num < 1e12:
+        return 1e-9, "G"
+    return 1e-12, "T"
+
+
+def stime(time):
+    scale, units = best_units(time)
+    return f"{time * scale:4.3g} {units}s"
+
+
+# Copied and modified from networkx
+def pagerank_scipy(
+    A,
+    alpha=0.85,
+    personalization=None,
+    max_iter=100,
+    tol=1.0e-6,
+    nstart=None,
+    weight="weight",
+    dangling=None,
+):
+    import numpy as np
+    import scipy as sp
+    import scipy.sparse  # call as sp.sparse
+
+    N = A.shape[0]
+    if A.nnz == 0:
+        return {}
+
+    # nodelist = list(G)
+    S = A.sum(axis=1)
+    S[S != 0] = 1.0 / S[S != 0]
+    # TODO: csr_array
+    Q = sp.sparse.csr_array(sp.sparse.spdiags(S.T, 0, *A.shape))
+    A = Q @ A
+
+    # initial vector
+    if nstart is None:
+        x = np.repeat(1.0 / N, N)
+    else:
+        raise NotImplementedError()
+    # Personalization vector
+    if personalization is None:
+        p = np.repeat(1.0 / N, N)
+    else:
+        raise NotImplementedError()
+    # Dangling nodes
+    if dangling is None:
+        dangling_weights = p
+    else:
+        raise NotImplementedError()
+    is_dangling = np.where(S == 0)[0]
+
+    # power iteration: make up to max_iter iterations
+    for _ in range(max_iter):
+        xlast = x
+        x = alpha * (x @ A + sum(x[is_dangling]) * dangling_weights) + (1 - alpha) * p
+        # check convergence, l1 norm
+        err = np.absolute(x - xlast).sum()
+        if err < N * tol:
+            return x
+            # return dict(zip(nodelist, map(float, x)))
+    raise nx.PowerIterationFailedConvergence(max_iter)
+
+
+@click.command()
+@click.argument("filename")
+@click.option(
+    "-b",
+    "--backend",
+    default="graphblas",
+    type=click.Choice(["graphblas", "networkx", "scipy", "gb", "nx", "sp", "gbnx"]),
+)
+@click.option(
+    "-t",
+    "--time",
+    default=3,
+    type=click.FloatRange(min=0, min_open=True),
+)
+@click.option(
+    "-n",
+    default=None,
+    type=click.IntRange(min=1),
+)
+@click.option(
+    "--verify",
+    is_flag=True,
+)
+@click.option(
+    "--alpha",
+    default=0.85,
+    type=click.FloatRange(min=0, max=1),
+)
+@click.option(
+    "--tol",
+    default=1e-06,
+    type=click.FloatRange(min=0, min_open=True),
+)
+def main(filename, backend, time, n, verify, alpha, tol, _get_result=False):
+    import statistics
+    import timeit
+    import warnings
+
+    import numpy as np
+
+    warnings.simplefilter("ignore")
+    if verify:
+        gb_result = main.callback(filename, "gb", None, None, False, alpha, tol, _get_result=True)
+        sp_result = main.callback(filename, "sp", None, None, False, alpha, tol, _get_result=True)
+        rtol = tol / gb_result.size
+        atol = 1e-16
+        np.testing.assert_allclose(gb_result, sp_result, rtol=rtol, atol=atol)
+        print(" |- graphblas and scipy.sparse match")
+        nx_result = main.callback(filename, "nx", None, None, False, alpha, tol, _get_result=True)
+        np.testing.assert_allclose(gb_result, nx_result, rtol=rtol, atol=atol)
+        print(" |- graphblas and networkx match")
+        np.testing.assert_allclose(sp_result, nx_result, rtol=rtol, atol=atol)
+        print(" |- scipy.sparse and networkx match")
+        gbnx_result = main.callback(
+            filename, "gbnx", None, None, False, alpha, tol, _get_result=True
+        )
+        np.testing.assert_allclose(gbnx_result, gb_result, rtol=rtol, atol=atol)
+        np.testing.assert_allclose(gbnx_result, sp_result, rtol=rtol, atol=atol)
+        np.testing.assert_allclose(gbnx_result, nx_result, rtol=rtol, atol=atol)
+        print("All good!")
+        # Show a grid of total absolute differences between results
+        results = {
+            "gb": gb_result,
+            "sp": sp_result,
+            "nx": nx_result,
+            "gbnx": gbnx_result,
+        }
+        print("     ", end="")
+        for k1 in results:
+            print("%9s" % k1, end="")
+        print()
+        for k1, v1 in results.items():
+            print("%5s" % k1, end="")
+            for k2, v2 in results.items():
+                print("%9.2g" % np.abs(v1 - v2).sum(), end="")
+            print()
+        return
+
+    backend = {
+        "gb": "graphblas",
+        "nx": "networkx",
+        "sp": "scipy",
+    }.get(backend, backend)
+    print(f"Filename: {filename} ; backend: {backend}")
+
+    if backend == "graphblas":
+        import pandas as pd
+        from grblas import Matrix
+
+        from graphblas_algorithms.link_analysis import pagerank_core as pagerank
+
+        start = timeit.default_timer()
+        df = pd.read_csv(filename, delimiter="\t", names=["row", "col"])
+        G = Matrix.from_values(df["row"].values, df["col"].values, 1)
+        stop = timeit.default_timer()
+        num_nodes = G.nrows
+        num_edges = G.nvals
+        if _get_result:
+            result = pagerank(G, alpha=alpha, tol=tol)
+            result(~result.S) << 0  # Densify just in case
+            return result.to_values()[1]
+
+    elif backend == "scipy":
+        import pandas as pd
+        import scipy.sparse
+
+        start = timeit.default_timer()
+        df = pd.read_csv(filename, delimiter="\t", names=["row", "col"])
+        G = scipy.sparse.csr_array((np.repeat(1.0, len(df)), (df["row"].values, df["col"].values)))
+        pagerank = pagerank_scipy
+        stop = timeit.default_timer()
+        num_nodes = G.shape[0]
+        num_edges = G.nnz
+        if _get_result:
+            return pagerank(G, alpha=alpha, tol=tol)
+    else:
+        if backend == "networkx":
+            from networkx import pagerank
+        else:
+            from graphblas_algorithms.link_analysis import pagerank
+
+        start = timeit.default_timer()
+        G = nx.read_edgelist(filename, delimiter="\t", nodetype=int, create_using=nx.DiGraph)
+        N = max(G)
+        for i in range(N):
+            if i not in G:
+                G.add_node(i)
+        stop = timeit.default_timer()
+        num_nodes = len(G.nodes)
+        num_edges = len(G.edges)
+
+        if _get_result:
+            result = pagerank(G, alpha=alpha, tol=tol)
+            return np.array([result.get(key, 0) for key in range(N + 1)])
+
+    print("Num nodes:", num_nodes)
+    print("Num edges:", num_edges)
+    print("Load time:", stime(stop - start))
+    timer = timeit.Timer(
+        "pagerank(G, alpha=alpha, tol=tol)",
+        globals=dict(pagerank=pagerank, G=G, alpha=alpha, tol=tol),
+    )
+    first_time = timer.timeit(1)
+    if time == 0:
+        n = 1
+    elif n is None:
+        n = 2 ** max(0, int(np.ceil(np.log2(time / first_time))))
+    print("Number of runs:", n)
+    print("first: ", stime(first_time))
+    if n > 1:
+        results = timer.repeat(n - 1, 1)
+        results.append(first_time)
+        print("median:", stime(statistics.median(results)))
+        print("mean:  ", stime(statistics.mean(results)))
+        # print("hmean: ", stime(statistics.harmonic_mean(results)))
+        # print("gmean: ", stime(statistics.geometric_mean(results)))
+        print("stdev: ", stime(statistics.stdev(results)))
+        print("min:   ", stime(min(results)))
+        print("max:   ", stime(max(results)))
+
+
+if __name__ == "__main__":
+    main()
diff --git a/setup.py b/setup.py
index 5719843..32d632a 100644
--- a/setup.py
+++ b/setup.py
@@ -17,8 +17,8 @@
     url="https://github.com/metagraph-dev/graphblas-algorithms",
     packages=find_packages(),
    python_requires=">=3.8",
-    install_requires=["grblas"],
-    tests_require=["pytest"],
+    install_requires=["grblas >=2022.4.0", "networkx"],
+    tests_require=["pytest", "scipy"],
     include_package_data=True,
     license="Apache License 2.0",
     keywords=[
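To close, a rough standalone analogue of the bench script's `--verify` mode, for sanity-checking the implementations against each other without an edge-list file. The graph and tolerances below are my own choices, deliberately looser than the script's `rtol = tol / N` and `atol = 1e-16`:

```python
# Rough standalone analogue of `bench_pagerank.py --verify`: compare this
# patch's pagerank against NetworkX's on one random directed graph.
import networkx as nx
import numpy as np

from graphblas_algorithms import pagerank

G = nx.gnp_random_graph(200, 0.05, seed=42, directed=True)
ours = pagerank(G)
theirs = nx.pagerank(G)
nodes = sorted(G)
np.testing.assert_allclose(
    [ours[k] for k in nodes], [theirs[k] for k in nodes], rtol=1e-5, atol=1e-12
)
print("graphblas and networkx match")
```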