From 298fd8734ee880959bb810b5701b93e89726424f Mon Sep 17 00:00:00 2001
From: stvoutsin
Date: Thu, 30 Apr 2020 20:32:27 +0300
Subject: [PATCH 1/6] Initial list of py packages

---
 notes/stv/20200429-py-libs.txt | 89 ++++++++++++++++++++++++++++++++++
 1 file changed, 89 insertions(+)
 create mode 100644 notes/stv/20200429-py-libs.txt

diff --git a/notes/stv/20200429-py-libs.txt b/notes/stv/20200429-py-libs.txt
new file mode 100644
index 00000000..2a83c823
--- /dev/null
+++ b/notes/stv/20200429-py-libs.txt
@@ -0,0 +1,89 @@
+#
+#
+#
+# Copyright (c) 2020, ROE (http://www.roe.ac.uk/)
+#
+# This information is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This information is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+#
+#
+
+
+# Python libraries and Packages required for our Spark Cluster
+# ------------------------------------------------------------
+
+Name:
+    Scikit-learn
+Version:
+    0.22.2.post1
+Install:
+    sudo yum install python-scikit-learn
+Link:
+    https://scikit-learn.org/stable/
+
+
+
+Name:
+    PyDevel
+Version:
+    Latest
+Install:
+    sudo yum install -y python3-devel
+Link:
+    https://pkgs.org/download/python3-devel
+
+
+Name:
+    Pandas
+Version:
+    1.0.3 (needs Python 3.6+; pip under Python 2.7 resolves to the 0.24.x series)
+Link:
+    https://pandas.pydata.org/
+Install:
+    sudo /usr/bin/python2.7 -m pip install pandas
+
+
+Name:
+    PyArrow
+Version:
+    0.16.* (pinned below 0.17.0, which dropped Python 2.7 support)
+Install:
+    sudo /usr/bin/python2.7 -m pip install pyarrow==0.16.*
+Link:
+    https://pypi.org/project/pyarrow/
+
+
+Name:
+    Hdbscan
+Version:
+    0.8.26
+Install:
+    sudo /usr/bin/python2.7 -m pip install hdbscan
+Link:
+    https://hdbscan.readthedocs.io/en/latest/
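+
+
+# Quick version check
+# -------------------
+
+A sketch for verifying what actually got installed; assumes setuptools is
+available to the interpreter being checked (e.g. /usr/bin/python2.7):
+
+    import pkg_resources
+    for pkg in ("scikit-learn", "pandas", "pyarrow", "hdbscan"):
+        try:
+            print("%s %s" % (pkg, pkg_resources.get_distribution(pkg).version))
+        except pkg_resources.DistributionNotFound:
+            print("%s NOT INSTALLED" % pkg)
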
From 3fca12fe49d732b1bd674757b78fe9664704c4a4 Mon Sep 17 00:00:00 2001
From: stvoutsin
Date: Thu, 30 Apr 2020 20:36:13 +0300
Subject: [PATCH 2/6] Added a requirements.txt (pip freeze)

---
 .../gdaf-openstack/config/requirements.txt | 59 +++++++++++++++++++
 1 file changed, 59 insertions(+)
 create mode 100644 experiments/stv/gdaf-openstack/config/requirements.txt

diff --git a/experiments/stv/gdaf-openstack/config/requirements.txt b/experiments/stv/gdaf-openstack/config/requirements.txt
new file mode 100644
index 00000000..fc0fa385
--- /dev/null
+++ b/experiments/stv/gdaf-openstack/config/requirements.txt
@@ -0,0 +1,59 @@
+asn1crypto==0.24.0
+attrs==18.2.0
+Babel==2.6.0
+boto==2.45.0
+cffi==1.11.5
+chardet==3.0.4
+cloud-init==17.1
+conda==4.6.14
+configobj==5.0.6
+cryptography==2.6.1
+Cython==0.29.16
+cytoolz==0.9.0.1
+dbus-python==1.2.8
+distro==1.4.0
+frozendict==1.2
+gpg==1.12.0
+hdbscan==0.8.26
+heat-cfntools==1.4.2
+idna==2.7
+Jinja2==2.10
+joblib==0.14.1
+jsonpatch==1.21
+jsonpointer==1.10
+jsonschema==3.0.1
+MarkupSafe==1.1.1
+numpy==1.18.2
+oauthlib==2.1.0
+pandas==1.0.3
+pbr==5.1.2
+ply==3.11
+prettytable==0.7.2
+psutil==5.4.3
+py-cpuinfo==4.0.0
+pyarrow==0.17.0
+pyasn1==0.4.4
+pycosat==0.6.3
+pycparser==2.14
+pycrypto==2.6.1
+PyJWT==1.7.1
+pyrsistent==0.14.11
+pyserial==3.4
+PySocks==1.6.8
+python-dateutil==2.8.0
+pytz==2018.5
+PyYAML==5.1
+requests==2.21.0
+rpm==4.14.2.1
+rsa==3.4.2
+ruamel.yaml==0.16.5
+ruamel.yaml.clib==0.1.2
+scikit-learn==0.22.2.post1
+scipy==1.4.1
+sepolicy==1.1
+setools==4.1.1
+six==1.12.0
+toolz==0.9.0
+tqdm==4.37.0
+urllib3==1.24.1
+

From a7677427f53ac97a7b05e4833b60f1c52136fc67 Mon Sep 17 00:00:00 2001
From: stvoutsin
Date: Thu, 30 Apr 2020 20:51:07 +0300
Subject: [PATCH 3/6] Initial Notes file, with some benchmark tests

---
 notes/stv/20200430-benchmarks.txt | 156 ++++++++++++++++++++++++++++++
 1 file changed, 156 insertions(+)
 create mode 100644 notes/stv/20200430-benchmarks.txt

diff --git a/notes/stv/20200430-benchmarks.txt b/notes/stv/20200430-benchmarks.txt
new file mode 100644
index 00000000..c9bc59c1
--- /dev/null
+++ b/notes/stv/20200430-benchmarks.txt
@@ -0,0 +1,156 @@
+#
+#
+#
+# Copyright (c) 2020, ROE (http://www.roe.ac.uk/)
+#
+# This information is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This information is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+#
+#
+#
+
+
+# ------------------------------------------------#
+# Benchmarking a Spark Cluster                    #
+# ------------------------------------------------#
+
+TODO:
+
+    a) Turn these into either pyUnit tests or something equivalent
+    b) Create configuration template for the setup
+
+
+Setup:
+
+7 Nodes
+    22GB RAM / node
+    6 VCores / node
+
+
+Spark Configuration:
+
+    num-executors 6
+    spark.driver.memory 13g
+    spark.yarn.am.memory 13g
+    spark.executor.memory 13g
+    spark.driver.maxResultSize 4096m
+
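+The Spark settings above are the values given to the Zeppelin Spark
+interpreter; as a sketch, the equivalent entries in spark-defaults.conf
+(property names taken from the standard Spark configuration) would be:
+
+    spark.executor.instances      6
+    spark.driver.memory           13g
+    spark.yarn.am.memory          13g
+    spark.executor.memory         13g
+    spark.driver.maxResultSize    4096m
+
+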
+Hadoop Configuration:
+
+    mapred-site.xml
+        yarn.app.mapreduce.am.resource.mb 15000
+        mapreduce.map.memory.mb 7000
+        mapreduce.reduce.memory.mb 7000
+
+    yarn-site.xml
+        yarn.nodemanager.resource.memory-mb 15000
+        yarn.scheduler.maximum-allocation-mb 15000
+        yarn.scheduler.minimum-allocation-mb 2000
+
+
+
+Test 1
+------
+%spark.pyspark
+
+# define the data frame source on the given column selection/predicates:
+df = sqlContext.read.parquet(
+    "/hadoop/gaia/parquet/gdr2/gaia_source/*.parquet"
+    ).select(
+        ["designation","source_id","ra","ra_error","dec","dec_error","parallax","parallax_error","parallax_over_error","pmra","pmra_error","pmdec","pmdec_error","l","b"]
+    ).where(
+        "abs(b) < 30.0 AND parallax > 1.0 and parallax_over_error > 10.0 AND phot_g_mean_flux_over_error > 36.19 AND astrometric_sigma5d_max < 0.3 AND visibility_periods_used > 8 AND (astrometric_excess_noise < 1 OR (astrometric_excess_noise > 1 AND astrometric_excess_noise_sig < 2))"
+    )
+
+# sanity check
+df.show()
+print ("Data frame rows: ", df.count())
+
+
+Time: 3 min 23 sec
+
+
+
+Test 2 (Depends on Test 1)
+---------------------------
+%spark.pyspark
+
+import pandas as pd
+import numpy as np
+spark.conf.set("spark.sql.execution.arrow.enabled", "true")
+
+
+pandas_df = df.select("*").limit(1000).toPandas()
+print(type(pandas_df))
+pandas_df.head()
+
+
+Time: 6 min 32 sec
+
+
+
+Test 3
+--------
+
+%spark.pyspark
+
+import random
+NUM_SAMPLES = 1000000000
+
+def inside(p):
+    x, y = random.random(), random.random()
+    return x*x + y*y < 1
+
+count = sc.parallelize(xrange(0, NUM_SAMPLES)) \
+            .filter(inside).count()
+print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)
+
+
+Time: 3 min 12 sec
+
+
+
+Test 4
+--------
+
+%spark.pyspark
+
+from random import choice
+from string import digits, ascii_lowercase
+
+chars = digits + ascii_lowercase
+seq = ["".join([choice(chars) for i in range(3)]) for j in range(50000000)]
+data = sc.parallelize(seq)
+counts = data.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).top(5)
+dict(counts)
+
+
+Time: 1 min 56 sec
+
+
+
+
+

From 7dfc648dbf85707db552092e21a209d95ad7bf4e Mon Sep 17 00:00:00 2001
From: stvoutsin
Date: Thu, 30 Apr 2020 22:22:14 +0300
Subject: [PATCH 4/6] Added some unit tests for PySpark

---
 .../tests/src/spark-tests/HOWTO.md            |  35 ++++++++
 .../tests/src/spark-tests/test_pi.py          |  43 +++++++++
 .../src/spark-tests/test_random_word_count.py |  39 ++++++++
 .../tests/src/spark-tests/test_word_count.py  |  28 ++++++
 .../tests/src/spark-tests/testbase.py         |  41 ++++++++
 .../tests/src/spark-tests/testbase.pyc        | Bin 0 -> 1631 bytes
 6 files changed, 186 insertions(+)
 create mode 100644 experiments/stv/gdaf-openstack/tests/src/spark-tests/HOWTO.md
 create mode 100644 experiments/stv/gdaf-openstack/tests/src/spark-tests/test_pi.py
 create mode 100644 experiments/stv/gdaf-openstack/tests/src/spark-tests/test_random_word_count.py
 create mode 100644 experiments/stv/gdaf-openstack/tests/src/spark-tests/test_word_count.py
 create mode 100644 experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.py
 create mode 100644 experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.pyc

diff --git a/experiments/stv/gdaf-openstack/tests/src/spark-tests/HOWTO.md b/experiments/stv/gdaf-openstack/tests/src/spark-tests/HOWTO.md
new file mode 100644
index 00000000..1d65341f
--- /dev/null
+++ b/experiments/stv/gdaf-openstack/tests/src/spark-tests/HOWTO.md
@@ -0,0 +1,35 @@
+
+### Running a PySpark test
+
+From the test directory, and assuming we are running on a node with PySpark configured:
+
+`spark-submit test_pi.py`
+
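+### Running the whole test suite
+
+The test modules can also be run with plain Python, since testbase.py puts
+$SPARK_HOME/python on sys.path and starts a local[2] SparkContext. A sketch,
+assuming `python` here is the same interpreter PySpark is configured to use:
+
+`python -m unittest discover -s . -p "test_*.py"`
+
+### Writing a new test
+
+A minimal sketch of a new test module; the class name and assertion below are
+illustrative only:
+
+```python
+import testbase
+import unittest
+
+class TestParallelize(testbase.PySparkTestBase):
+
+    def test_sum(self):
+        # Sum 1..100 on the SparkContext shared by the base class.
+        total = self.sc.parallelize(range(1, 101)).sum()
+        self.assertEqual(total, 5050)
+
+if __name__ == '__main__':
+    unittest.main()
+```
+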
diff --git a/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_pi.py b/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_pi.py
new file mode 100644
index 00000000..6296f8a0
--- /dev/null
+++ b/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_pi.py
@@ -0,0 +1,43 @@
+import testbase
+import unittest
+from datetime import datetime
+import random
+
+NUM_SAMPLES = 1000000000
+
+def inside(p):
+    """
+    Helper method for piCalculation
+    """
+    x, y = random.random(), random.random()
+    return x*x + y*y < 1
+
+
+def piCalculation(sc, sample):
+    """
+    Calculate the value of pi
+    """
+    count = sc.parallelize(xrange(0, sample)) \
+                .filter(inside).count()
+    return 4.0 * count / sample
+
+class TestPiCalculation(testbase.PySparkTestBase):
+
+    def test_pi_calculation(self):
+        """
+        Test the pi calculation method
+        Assert that the time taken doesn't exceed x seconds
+
+        """
+        #TODO: make these configurable
+        _CHANGE_ME_max_seconds = 5
+        _CHANGE_ME_sample = 100000
+
+        tick = datetime.now()
+        piCalculation(self.sc, _CHANGE_ME_sample)
+        tock = datetime.now()
+        diff = tock - tick
+        self.assertTrue(diff.seconds <= _CHANGE_ME_max_seconds)
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_random_word_count.py b/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_random_word_count.py
new file mode 100644
index 00000000..03dd6fdd
--- /dev/null
+++ b/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_random_word_count.py
@@ -0,0 +1,39 @@
+import testbase
+import unittest
+from random import choice
+from string import digits, ascii_lowercase
+from datetime import datetime
+
+
+def randomWordCount(sc, word_size, sample):
+    """
+    Generate [sample] random strings of [word_size] characters
+    and count the occurrences of each
+    """
+
+    chars = digits + ascii_lowercase
+    seq = ["".join([choice(chars) for i in range(word_size)]) for j in range(sample)]
+    data = sc.parallelize(seq)
+    counts = data.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).top(5)
+    return dict(counts)
+
+class TestRandomWordCount(testbase.PySparkTestBase):
+
+    def test_random_word_count(self):
+        """
+        Test the random word count method
+        Assert that the time taken doesn't exceed x seconds
+
+        """
+        #TODO: make this configurable
+        _CHANGE_ME_max_seconds = 5
+        _CHANGE_ME_sample = 10000
+
+        tick = datetime.now()
+        randomWordCount(self.sc, 3, _CHANGE_ME_sample)
+        tock = datetime.now()
+        diff = tock - tick
+        self.assertTrue(diff.seconds <= _CHANGE_ME_max_seconds)
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_word_count.py b/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_word_count.py
new file mode 100644
index 00000000..b8ea4733
--- /dev/null
+++ b/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_word_count.py
@@ -0,0 +1,28 @@
+import testbase
+import unittest
+
+def wordCount(rdd):
+    """
+    Counts the words in an RDD
+    """
+    wcntRdd = rdd.flatMap(lambda line: line.split()).\
+        map(lambda word: (word, 1)).\
+        reduceByKey(lambda fa, fb: fa + fb)
+    return wcntRdd
+
+
+class TestWordCount(testbase.PySparkTestBase):
+    def test_word_count(self):
+        """
+        Test the word count method
+        Assert that the word counts are correct
+        """
+        rdd = self.sc.parallelize(["a b c d", "a c d e", "a d e f"])
+        res = wordCount(rdd)
+        res = res.collectAsMap()
+        expected = {"a":3, "b":1, "c":2, "d":3, "e":2, "f":1}
+        self.assertEqual(res, expected)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.py b/experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.py
new file mode 100644
index 00000000..73a3d7d7
--- /dev/null
+++ b/experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.py
@@ -0,0 +1,41 @@
+import os
+import sys
+import unittest
+
+
+# Make the PySpark libraries shipped with Spark importable, and run
+# the PySpark workers with the same interpreter as the tests.
+SPARK_HOME = os.environ["SPARK_HOME"]
+sys.path.insert(1, os.path.join(SPARK_HOME, 'python'))
+sys.path.insert(1, os.path.join(SPARK_HOME, 'python', 'pyspark'))
+sys.path.insert(1, os.path.join(SPARK_HOME, 'python', 'build'))
+sys.path.insert(1, os.path.join(SPARK_HOME, 'python', 'lib/py4j-0.8.2.1-src.zip'))
+pyspark_python = sys.executable
+os.environ['PYSPARK_PYTHON'] = pyspark_python
+
+from pyspark.conf import SparkConf
+from pyspark.context import SparkContext
+
+
+sc_values = {}
+
+class PySparkTestBase(unittest.TestCase):
+    """
+    Reusable PySpark test case class;
+    shares a SparkContext across the tests in a class
+
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        conf = SparkConf().setMaster("local[2]") \
+                          .setAppName(cls.__name__) \
+                          .set("spark.authenticate.secret", "test")
+        cls.sc = SparkContext(conf=conf)
+        sc_values[cls.__name__] = cls.sc
+
+    @classmethod
+    def tearDownClass(cls):
+        sc_values.clear()
+        cls.sc.stop()
+
diff --git a/experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.pyc b/experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..a13d422f7fa76fe15039de351bfe42d41969f7be
GIT binary patch
literal 1631
[ base85-encoded binary data omitted ]

literal 0
HcmV?d00001

From 49844b30ab86e700f7ad545a215f045d187c7a2f Mon Sep 17 00:00:00 2001
From: stvoutsin
Date: Thu, 30 Apr 2020 22:25:46 +0300
Subject: [PATCH 5/6] Remove unused variable NUM_SAMPLES

---
 experiments/stv/gdaf-openstack/tests/src/spark-tests/test_pi.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_pi.py b/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_pi.py
index 6296f8a0..2cc245c1 100644
--- a/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_pi.py
+++ b/experiments/stv/gdaf-openstack/tests/src/spark-tests/test_pi.py
@@ -3,8 +3,6 @@
 from datetime import datetime
 import random
 
-NUM_SAMPLES = 1000000000
-
 def inside(p):
     """
     Helper method for piCalculation

From ff19bd4d3068348b1d91b50d79d3483e62391b64 Mon Sep 17 00:00:00 2001
From: stvoutsin
Date: Fri, 1 May 2020 16:19:40 +0300
Subject: [PATCH 6/6] Remove .pyc file

---
 .../tests/src/spark-tests/testbase.pyc | Bin 1631 -> 0 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.pyc

diff --git a/experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.pyc b/experiments/stv/gdaf-openstack/tests/src/spark-tests/testbase.pyc
deleted file mode 100644
index a13d422f7fa76fe15039de351bfe42d41969f7be..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 1631
[ base85-encoded binary data omitted ]