diff --git a/DistROOT.py b/DistROOT.py index 1cda76e..caadc29 100644 --- a/DistROOT.py +++ b/DistROOT.py @@ -10,23 +10,23 @@ ###################################################################### def InitSpark(): - import os + return SparkContext.getOrCreate() - conf = SparkConf() - conf.setAppName("ROOT") - #conf.set('spark.executor.extraLibraryPath', os.environ['LD_LIBRARY_PATH']) - #conf.set('spark.executorEnv.KRB5CCNAME', os.environ['KRB5CCNAME']) - #conf.set('spark.yarn.dist.files', os.environ['KRB5CCNAME'] + '#krbcache') - #conf.set('spark.executor.memory', '1g') - #conf.set('spark.driver.memory', '2g') - #conf.set('spark.driver.maxResultSize', '1g') - #conf.set('spark.driver.port', '9000') - #conf.set('spark.fileserver.port', '9001') - #conf.set('spark.blockManager.port', '9002') +###################################################################### +# Function to initialize user credentials on the Spark executors. # +###################################################################### + +def InitUserCredentials(sc): + def configMap(_): + import os + return os.system("ln -sf " + SparkFiles.get('krbcache') + " " + os.environ["KRB5CCNAME"]) - sc = SparkContext(conf = conf) - return sc + results = sc.parallelize(range(sc.defaultParallelism)).map(configMap).collect() + for r in results: + if r != 0: + print "Error initializing user credentials on Spark cluster" + exit(1) ###################################################################### @@ -81,7 +81,10 @@ def __init__(self, filelist, treename, npartitions): # Initialize Spark context sc = InitSpark() - + + # Initialize user credentials for EOS access + InitUserCredentials(sc) + # Parallelize the ranges with Spark self.ranges = sc.parallelize(ranges, npartitions) @@ -112,9 +115,6 @@ def ProcessAndMerge(self, fMap, fReduce): """.format(reduceHash = GetFunctionHash(GetFunctionCode(reducerName)))) def mapWrapper(rg): - #import os - #os.system("ln -sf " + SparkFiles.get('krbcache') + " " + os.environ["KRB5CCNAME"].split(":")[1]) - import ROOT ROOT.TH1.AddDirectory(False) reader = ROOT.TTreeReader(rg.chain)