Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Also acquire the MapReduce history service DelegationToken #197

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,13 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoopVersion}</version>
<scope>provided</scope>
</dependency>

<!--Test Dependencies-->
<dependency>
<groupId>junit</groupId>
Expand Down
69 changes: 66 additions & 3 deletions java/src/main/java/com/anaconda/skein/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -22,8 +24,15 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Master;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
Expand All @@ -48,6 +57,9 @@
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
Expand All @@ -62,6 +74,7 @@
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -364,15 +377,17 @@ public ApplicationId submitApplication(final Model.ApplicationSpec spec)
} else {
return UserGroupInformation.createProxyUser(spec.getUser(), ugi).doAs(
new PrivilegedExceptionAction<ApplicationId>() {
public ApplicationId run() throws IOException, YarnException {
public ApplicationId run() throws IOException, YarnException,
InterruptedException {
return submitApplicationInner(getYarnClient(), getFs(), spec);
}
});
}
}

private ApplicationId submitApplicationInner(YarnClient yarnClient,
FileSystem fs, Model.ApplicationSpec spec) throws IOException, YarnException {
FileSystem fs, Model.ApplicationSpec spec) throws IOException, YarnException,
InterruptedException {
// First validate the spec request
spec.validate();

Expand Down Expand Up @@ -452,8 +467,53 @@ private ApplicationId submitApplicationInner(YarnClient yarnClient,
return appId;
}

/**
 * Builds an RPC client proxy to the MapReduce job history server, if one
 * is configured.
 *
 * @return a {@link MRClientProtocol} proxy for the history server, or
 *         {@code null} when {@code mapreduce.jobhistory.address}
 *         ({@link JHAdminConfig#MR_HISTORY_ADDRESS}) is not set.
 * @throws IOException if the current user cannot be determined.
 */
private MRClientProtocol instantiateHistoryProxy()
    throws IOException {
  final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
  if (StringUtils.isEmpty(serviceAddr)) {
    // No history server configured; callers treat null as "nothing to do".
    return null;
  }
  LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
  final YarnRPC rpc = YarnRPC.create(conf);
  UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
  // Create the proxy as the current user so the connection is made with
  // that user's credentials.
  MRClientProtocol proxy =
      currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
        @Override
        public MRClientProtocol run() {
          return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
              NetUtils.createSocketAddr(serviceAddr), conf);
        }
      });
  // Fixed: this was previously logged *before* the proxy was created,
  // which was misleading when proxy creation failed.
  LOG.debug("Connected to HistoryServer at: " + serviceAddr);
  return proxy;
}

/**
 * Requests a delegation token from the MapReduce history server.
 *
 * The renewer is set to the master principal resolved from the current
 * configuration, and the returned YARN token is converted into a
 * Hadoop-security {@link Token} bound to the history server's address.
 *
 * @param hsProxy client proxy to the history server.
 * @return the history server delegation token.
 * @throws IOException if the RPC or principal lookup fails.
 * @throws InterruptedException if the request is interrupted.
 */
private Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
    throws IOException, InterruptedException {
  RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
  GetDelegationTokenRequest tokenRequest =
      factory.newRecordInstance(GetDelegationTokenRequest.class);
  tokenRequest.setRenewer(Master.getMasterPrincipal(conf));
  org.apache.hadoop.yarn.api.records.Token yarnToken =
      hsProxy.getDelegationToken(tokenRequest).getDelegationToken();
  return ConverterUtils.convertFromYarn(yarnToken, hsProxy.getConnectAddress());
}


/**
 * Adds a MapReduce history-server delegation token to {@code credentials}.
 *
 * Only acts when Hadoop security is enabled and a history server address
 * is configured; does nothing if an equivalent token is already present
 * in the credentials.
 *
 * @param credentials credential set to add the token to (mutated in place).
 * @throws IOException if contacting the history server fails.
 * @throws InterruptedException if the token request is interrupted.
 */
private void addMapReduceHistoryToken(Credentials credentials)
    throws IOException, InterruptedException {
  // RPC proxy to the history server; null when no address is configured.
  MRClientProtocol hsProxy = instantiateHistoryProxy();
  if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) {
    Text hsService = SecurityUtil.buildTokenService(hsProxy.getConnectAddress());
    if (credentials.getToken(hsService) == null) {
      // Removed a stray commented-out brace ("// }") left over from editing.
      credentials.addToken(hsService, getDelegationTokenFromHS(hsProxy));
    }
  }
}

private ByteBuffer collectTokens(YarnClient yarnClient, FileSystem fs,
Model.ApplicationSpec spec) throws IOException, YarnException {
Model.ApplicationSpec spec) throws IOException, YarnException,
InterruptedException {
// Collect security tokens as needed
LOG.debug("Collecting filesystem delegation tokens");
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
Expand Down Expand Up @@ -483,6 +543,9 @@ private ByteBuffer collectTokens(YarnClient yarnClient, FileSystem fs,
);
credentials.addToken(rmDelegationTokenService, rmToken);
}
if (spec.getAcquireMapReduceDelegationToken()) {
addMapReduceHistoryToken(credentials);
}

DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
Expand Down
11 changes: 10 additions & 1 deletion java/src/main/java/com/anaconda/skein/Model.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,14 @@ public static class ApplicationSpec {
private Acls acls;
private Master master;
private Map<String, Service> services;
private boolean acquireMapReduceDelegationToken;

public ApplicationSpec() {}

public ApplicationSpec(String name, String queue, String user,
String nodeLabel, int maxAttempts, Set<String> tags,
List<Path> fileSystems, Acls acls, Master master,
Map<String, Service> services) {
Map<String, Service> services, boolean acquireMapReduceDelegationToken) {
this.name = name;
this.queue = queue;
this.user = user;
Expand All @@ -320,6 +321,7 @@ public ApplicationSpec(String name, String queue, String user,
this.acls = acls;
this.master = master;
this.services = services;
this.acquireMapReduceDelegationToken = acquireMapReduceDelegationToken;
}

public String toString() {
Expand Down Expand Up @@ -365,6 +367,13 @@ public void setFileSystems(List<Path> fileSystems) {
public void setServices(Map<String, Service> services) { this.services = services; }
public Map<String, Service> getServices() { return services; }

/** Sets whether a MapReduce history-server delegation token should be acquired. */
public void setAcquireMapReduceDelegationToken(boolean acquireMapReduceDelegationToken) {
this.acquireMapReduceDelegationToken = acquireMapReduceDelegationToken;
}
/** Returns whether a MapReduce history-server delegation token should be acquired. */
public boolean getAcquireMapReduceDelegationToken() {
return acquireMapReduceDelegationToken;
}

public void validate() throws IllegalArgumentException {
throwIfNull(name, "name");
throwIfNull(queue, "queue");
Expand Down
5 changes: 3 additions & 2 deletions java/src/main/java/com/anaconda/skein/MsgUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,8 @@ public static Msg.ApplicationSpec writeApplicationSpec(Model.ApplicationSpec spe
.addAllTags(spec.getTags())
.setAcls(writeAcls(spec.getAcls()))
.setMaster(writeMaster(spec.getMaster()))
.addAllFileSystems(Lists.transform(spec.getFileSystems(), Functions.toStringFunction()));
.addAllFileSystems(Lists.transform(spec.getFileSystems(), Functions.toStringFunction()))
.setAcquireMapReduceDelegationToken(spec.getAcquireMapReduceDelegationToken());

for (Map.Entry<String, Model.Service> entry : spec.getServices().entrySet()) {
builder.putServices(entry.getKey(), writeService(entry.getValue()));
Expand Down Expand Up @@ -618,7 +619,7 @@ public static Model.ApplicationSpec readApplicationSpec(Msg.ApplicationSpec spec
fileSystems,
readAcls(spec.getAcls()),
readMaster(spec.getMaster()),
services);
services, spec.getAcquireMapReduceDelegationToken());
}

public static Msg.Container writeContainer(Model.Container container) {
Expand Down
1 change: 1 addition & 0 deletions java/src/main/proto/skein.proto
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ message ApplicationSpec {
Acls acls = 8;
Master master = 9;
map<string, Service> services = 10;
bool acquire_map_reduce_delegation_token = 11;
}


Expand Down
16 changes: 13 additions & 3 deletions skein/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,14 +1188,20 @@ class ApplicationSpec(Specification):
The maximum number of submission attempts before marking the
application as failed. Note that this only considers failures of the
application master during startup. Default is 1.
acquire_map_reduce_delegation_token: bool, optional
Whether to acquire a delegation token for connecting to the MapReduce
history server. Required when a MapReduce job is launched from skein.
Default is False.
"""
__slots__ = ('services', 'master', 'name', 'queue', 'user', 'node_label',
'tags', 'file_systems', 'acls', 'max_attempts')
'tags', 'file_systems', 'acls', 'max_attempts',
'acquire_map_reduce_delegation_token')
_protobuf_cls = _proto.ApplicationSpec

def __init__(self, services=None, master=None, name='skein',
queue='default', user='', node_label='', tags=None,
file_systems=None, acls=None, max_attempts=1):
file_systems=None, acls=None, max_attempts=1,
acquire_map_reduce_delegation_token=False):
self.services = {} if services is None else services
self.master = Master() if master is None else master
self.name = name
Expand All @@ -1206,6 +1212,7 @@ def __init__(self, services=None, master=None, name='skein',
self.file_systems = [] if file_systems is None else file_systems
self.acls = ACLs() if acls is None else acls
self.max_attempts = max_attempts
self.acquire_map_reduce_delegation_token = acquire_map_reduce_delegation_token
self._validate()

def __repr__(self):
Expand All @@ -1225,6 +1232,7 @@ def _validate(self):
self._check_is_type('master', Master)
self.master._validate()
self._check_is_dict_of('services', str, Service)
self._check_is_type('acquire_map_reduce_delegation_token', bool)

if not self.services and not self.master.script:
raise context.ValueError("There must be at least one service")
Expand Down Expand Up @@ -1282,6 +1290,7 @@ def from_dict(cls, obj, **kwargs):
def from_protobuf(cls, obj):
services = {k: Service.from_protobuf(v)
for k, v in obj.services.items()}
ask_map_reduce_history_token = obj.acquire_map_reduce_delegation_token
return cls(name=obj.name,
queue=obj.queue,
user=obj.user,
Expand All @@ -1291,7 +1300,8 @@ def from_protobuf(cls, obj):
max_attempts=min(1, obj.max_attempts),
acls=ACLs.from_protobuf(obj.acls),
master=Master.from_protobuf(obj.master),
services=services)
services=services,
acquire_map_reduce_delegation_token=ask_map_reduce_history_token)

@classmethod
def from_file(cls, path, format='infer'):
Expand Down