
Adapt DistROOT.py to work in SWAN
etejedor committed Sep 22, 2017
1 parent 2b9cd3c commit c1f9162
Showing 1 changed file with 18 additions and 18 deletions.
DistROOT.py
@@ -10,23 +10,23 @@
 ######################################################################
 
 def InitSpark():
-    import os
-
-    conf = SparkConf()
-    conf.setAppName("ROOT")
-    #conf.set('spark.executor.extraLibraryPath', os.environ['LD_LIBRARY_PATH'])
-    #conf.set('spark.executorEnv.KRB5CCNAME', os.environ['KRB5CCNAME'])
-    #conf.set('spark.yarn.dist.files', os.environ['KRB5CCNAME'] + '#krbcache')
-    #conf.set('spark.executor.memory', '1g')
-    #conf.set('spark.driver.memory', '2g')
-    #conf.set('spark.driver.maxResultSize', '1g')
-
-    #conf.set('spark.driver.port', '9000')
-    #conf.set('spark.fileserver.port', '9001')
-    #conf.set('spark.blockManager.port', '9002')
-    sc = SparkContext(conf = conf)
-    return sc
+    return SparkContext.getOrCreate()
+
+
+######################################################################
+# Function to initialize user credentials on the Spark executors.   #
+######################################################################
+
+def InitUserCredentials(sc):
+    def configMap(_):
+        import os
+        return os.system("ln -sf " + SparkFiles.get('krbcache') + " " + os.environ["KRB5CCNAME"])
+
+    results = sc.parallelize(range(sc.defaultParallelism)).map(configMap).collect()
+    for r in results:
+        if r != 0:
+            print "Error initializing user credentials on Spark cluster"
+            exit(1)
 
 
 ######################################################################
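Note on the new InitUserCredentials: it assumes the driver's Kerberos credential cache has already been shipped to the executors under the name 'krbcache', so that SparkFiles.get('krbcache') resolves on each worker. On YARN that is what the removed spark.yarn.dist.files setting with its '#krbcache' alias arranged, and it is presumably handled by the SWAN Spark configuration now, which is why the commit drops the manual SparkConf setup. Below is a minimal sketch of how a driver could stage the cache by hand; the staging path /tmp/krbcache and the KRB5CCNAME parsing are illustrative assumptions, not part of this commit:

import os
import shutil
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# KRB5CCNAME usually looks like 'FILE:/tmp/krb5cc_<uid>'; drop the scheme
# prefix to obtain the cache file path (illustrative parsing).
ccache = os.environ["KRB5CCNAME"].split(":")[-1]

# SparkContext.addFile ships a file under its basename, so first copy the
# cache to a file literally named 'krbcache' (assumed staging path).
staged = "/tmp/krbcache"
shutil.copyfile(ccache, staged)
sc.addFile(staged)

Once the file is shipped, InitUserCredentials(sc) runs configMap over sc.defaultParallelism dummy elements, and each call symlinks the distributed cache to the path named by KRB5CCNAME on its executor, exiting early if any symlink could not be created.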
@@ -81,7 +81,10 @@ def __init__(self, filelist, treename, npartitions):
 
         # Initialize Spark context
         sc = InitSpark()
 
+        # Initialize user credentials for EOS access
+        InitUserCredentials(sc)
+
         # Parallelize the ranges with Spark
         self.ranges = sc.parallelize(ranges, npartitions)
 
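For context, a usage sketch of the constructor touched above; the class name DistTree is taken from how DistROOT is commonly imported in examples, and the EOS file, tree name, and partition count are illustrative assumptions:

from DistROOT import DistTree

# Credentials are now initialized inside __init__, so an EOS path like this
# (illustrative) becomes readable from the executors without extra setup.
dTree = DistTree(filelist = ["root://eosuser.cern.ch//eos/user/e/example/data.root"],
                 treename = "myTree",
                 npartitions = 8)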
@@ -112,9 +115,6 @@ def ProcessAndMerge(self, fMap, fReduce):
 """.format(reduceHash = GetFunctionHash(GetFunctionCode(reducerName))))
 
         def mapWrapper(rg):
-            #import os
-            #os.system("ln -sf " + SparkFiles.get('krbcache') + " " + os.environ["KRB5CCNAME"].split(":")[1])
-
             import ROOT
             ROOT.TH1.AddDirectory(False)
             reader = ROOT.TTreeReader(rg.chain)
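To complete the picture, a hedged sketch of the fMap and fReduce callables that ProcessAndMerge expects. mapWrapper builds a ROOT.TTreeReader over each range's chain and presumably hands it to fMap, so the mapper reads branches via TTreeReaderValue while the reducer merges two partial results; the branch name 'px', the histogram binning, and the dTree variable (reused from the previous sketch) are illustrative:

import ROOT

def fillHist(reader):
    # fMap: receives a TTreeReader positioned on one range of the chain;
    # 'px' is an assumed branch name of type double.
    px = ROOT.TTreeReaderValue('double')(reader, 'px')
    h = ROOT.TH1F('px', 'px distribution', 100, -5., 5.)
    while reader.Next():
        h.Fill(px.__deref__())
    return h

def mergeHist(h1, h2):
    # fReduce: merges two partial histograms into one.
    h1.Add(h2)
    return h1

hFinal = dTree.ProcessAndMerge(fillHist, mergeHist)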
