Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Secret service fix #30

Merged
merged 5 commits into from
Feb 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.airavata.mft.admin.models;

import java.util.List;

public class TransferCommand {

private String transferId;
Expand All @@ -34,6 +32,7 @@ public class TransferCommand {
private String destinationToken;
private String destResourceBackend;
private String destCredentialBackend;
private String mftAuthorizationToken;

public String getTransferId() {
return transferId;
Expand Down Expand Up @@ -151,4 +150,12 @@ public TransferCommand setDestCredentialBackend(String destCredentialBackend) {
this.destCredentialBackend = destCredentialBackend;
return this;
}

public String getMftAuthorizationToken() {
return mftAuthorizationToken;
}

public void setMftAuthorizationToken(String mftAuthorizationToken) {
this.mftAuthorizationToken = mftAuthorizationToken;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class TransferRequest {
private String destinationToken;
private String destResourceBackend;
private String destCredentialBackend;
private String mftAuthorizationToken;
private boolean affinityTransfer;
private Map<String, Integer> targetAgents;

Expand Down Expand Up @@ -161,4 +162,12 @@ public TransferRequest setTargetAgents(Map<String, Integer> targetAgents) {
this.targetAgents = targetAgents;
return this;
}

public String getMftAuthorizationToken() {
return mftAuthorizationToken;
}

public void setMftAuthorizationToken(String mftAuthorizationToken) {
this.mftAuthorizationToken = mftAuthorizationToken;
}
}
44 changes: 25 additions & 19 deletions agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.airavata.mft.agent.http.HttpServer;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.agent.rpc.RPCParser;
import org.apache.airavata.mft.core.AuthZToken;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
import org.apache.airavata.mft.core.api.Connector;
Expand Down Expand Up @@ -64,6 +65,10 @@ public class MFTAgent implements CommandLineRunner {
@org.springframework.beans.factory.annotation.Value("${agent.id}")
private String agentId;


@org.springframework.beans.factory.annotation.Value("${agent.secret}")
private String agentSecret;

@org.springframework.beans.factory.annotation.Value("${agent.host}")
private String agentHost;

Expand Down Expand Up @@ -116,8 +121,8 @@ public class MFTAgent implements CommandLineRunner {
private HttpTransferRequestsStore transferRequestsStore;

public void init() {
transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId );
rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId );
transferMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_TRANSFER_REQUEST_MESSAGE_PATH + agentId);
rpcMessageCache = KVCache.newCache(mftConsulClient.getKvClient(), MFTConsulClient.AGENTS_RPC_REQUEST_MESSAGE_PATH + agentId);
}

private void acceptRPCRequests() {
Expand Down Expand Up @@ -159,13 +164,14 @@ private void acceptTransferRequests() {
.setPublisher(agentId)
.setDescription("Starting the transfer"));

AuthZToken authZToken = new AuthZToken(request.getMftAuthorizationToken(), agentId, agentSecret);
Optional<Connector> inConnectorOpt = ConnectorResolver.resolveConnector(request.getSourceType(), "IN");
Connector inConnector = inConnectorOpt.orElseThrow(() -> new Exception("Could not find an in connector for given input"));
inConnector.init(request.getSourceStorageId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
inConnector.init(authZToken,request.getSourceStorageId(), request.getSourceToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);

Optional<Connector> outConnectorOpt = ConnectorResolver.resolveConnector(request.getDestinationType(), "OUT");
Connector outConnector = outConnectorOpt.orElseThrow(() -> new Exception("Could not find an out connector for given input"));
outConnector.init(request.getDestinationStorageId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
outConnector.init(authZToken, request.getDestinationStorageId(), request.getDestinationToken(), resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);

Optional<MetadataCollector> srcMetadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(request.getSourceType());
MetadataCollector srcMetadataCollector = srcMetadataCollectorOp.orElseThrow(() -> new Exception("Could not find a metadata collector for source"));
Expand All @@ -183,22 +189,22 @@ private void acceptTransferRequests() {
.setDescription("Started the transfer"));


String transferId = mediator.transfer(request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
(id, st) -> {
try {
mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
} catch (MFTConsulClientException e) {
logger.error("Failed while updating transfer state", e);
String transferId = mediator.transfer(authZToken,request, inConnector, outConnector, srcMetadataCollector, dstMetadataCollector,
(id, st) -> {
try {
mftConsulClient.submitTransferStateToProcess(id, agentId, st.setPublisher(agentId));
} catch (MFTConsulClientException e) {
logger.error("Failed while updating transfer state", e);
}
},
(id, transferSuccess) -> {
try {
// Delete scheduled key as the transfer completed / failed if it was placed in current session
mftConsulClient.getKvClient().deleteKey(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId + "/" + session + "/" + id);
} catch (Exception e) {
logger.error("Failed while deleting scheduled path for transfer {}", id);
}
}
},
(id, transferSuccess) -> {
try {
// Delete scheduled key as the transfer completed / failed if it was placed in current session
mftConsulClient.getKvClient().deleteKey(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentId + "/" + session + "/" + id);
} catch (Exception e) {
logger.error("Failed while deleting scheduled path for transfer {}", id);
}
}
);

logger.info("Started the transfer " + transferId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public void destroy() {
executor.shutdown();
}

public String transfer(TransferCommand command, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
public String transfer(AuthZToken authZToken, TransferCommand command, Connector inConnector, Connector outConnector, MetadataCollector srcMetadataCollector,
MetadataCollector destMetadataCollector, BiConsumer<String, TransferState> onStatusCallback,
BiConsumer<String, Boolean> exitingCallback) throws Exception {

FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(
FileResourceMetadata srcMetadata = srcMetadataCollector.getFileResourceMetadata(authZToken,
command.getSourceStorageId(),
command.getSourcePath(),
command.getSourceToken());
Expand Down Expand Up @@ -129,15 +129,17 @@ public void run() {
command.getDestinationPath(),
command.getDestinationToken());


if (!transferred) {
logger.error("Transfer completed but resource is not available in destination");
throw new Exception("Transfer completed but resource is not available in destination");
}

FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(
FileResourceMetadata destMetadata = destMetadataCollector.getFileResourceMetadata(authZToken,
command.getDestinationStorageId(),
command.getDestinationPath(),
command.getDestinationToken());
command.getDestinationToken());


boolean doIntegrityVerify = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,11 @@
package org.apache.airavata.mft.agent.http;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedStream;
import io.netty.util.CharsetUtil;
import org.apache.airavata.mft.core.ConnectorContext;
import org.apache.airavata.mft.core.DoubleStreamingBuffer;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.TransferTask;
import org.apache.airavata.mft.core.*;
import org.apache.airavata.mft.core.api.Connector;
import org.apache.airavata.mft.core.api.MetadataCollector;
import org.slf4j.Logger;
Expand All @@ -41,9 +33,9 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static io.netty.handler.codec.http.HttpMethod.*;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.*;
import static io.netty.handler.codec.http.HttpVersion.*;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

Expand Down Expand Up @@ -84,7 +76,9 @@ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) thr

ConnectorParams params = httpTransferRequest.getConnectorParams();

connector.init(params.getStorageId(), params.getCredentialToken(), params.getResourceServiceHost(),
AuthZToken authZToken = new AuthZToken();

connector.init(authZToken, params.getStorageId(), params.getCredentialToken(), params.getResourceServiceHost(),
params.getResourceServicePort(), params.getSecretServiceHost(), params.getSecretServicePort());

metadataCollector.init(params.getResourceServiceHost(), params.getResourceServicePort(),
Expand All @@ -101,7 +95,7 @@ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) thr
return;
}

FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(params.getStorageId(),
FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(authZToken, params.getStorageId(),
httpTransferRequest.getTargetResourcePath(),
params.getCredentialToken());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.airavata.mft.agent.rpc;
package org.apache.airavata.mft.agent.rpc;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.airavata.mft.admin.models.rpc.SyncRPCRequest;
Expand All @@ -24,6 +24,7 @@
import org.apache.airavata.mft.agent.http.HttpTransferRequest;
import org.apache.airavata.mft.agent.http.HttpTransferRequestsStore;
import org.apache.airavata.mft.core.ConnectorResolver;
import org.apache.airavata.mft.core.AuthZToken;
import org.apache.airavata.mft.core.DirectoryResourceMetadata;
import org.apache.airavata.mft.core.FileResourceMetadata;
import org.apache.airavata.mft.core.MetadataCollectorResolver;
Expand Down Expand Up @@ -73,12 +74,15 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
String resourceType = request.getParameters().get("resourceType");
String resourceToken = request.getParameters().get("resourceToken");
String mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
String agentId = request.getAgentId();
String agentSecret = request.getParameters().get("agentSecret");

Optional<MetadataCollector> metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
if (metadataCollectorOp.isPresent()) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(resourceId, resourceToken);
FileResourceMetadata fileResourceMetadata = metadataCollector
.getFileResourceMetadata(new AuthZToken(mftAuthorizationToken, agentId, agentSecret), resourceId, resourceToken);
return mapper.writeValueAsString(fileResourceMetadata);
}
break;
Expand All @@ -89,12 +93,15 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
resourceToken = request.getParameters().get("resourceToken");
String childPath = request.getParameters().get("childPath");
mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
agentId = request.getAgentId();
agentSecret = request.getParameters().get("agentSecret");

metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
if (metadataCollectorOp.isPresent()) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
FileResourceMetadata fileResourceMetadata = metadataCollector.getFileResourceMetadata(resourceId, childPath, resourceToken);
FileResourceMetadata fileResourceMetadata = metadataCollector
.getFileResourceMetadata(new AuthZToken(mftAuthorizationToken, agentId, agentSecret), resourceId, childPath, resourceToken);
return mapper.writeValueAsString(fileResourceMetadata);
}
break;
Expand All @@ -104,12 +111,15 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
resourceType = request.getParameters().get("resourceType");
resourceToken = request.getParameters().get("resourceToken");
mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
agentId = request.getAgentId();
agentSecret = request.getParameters().get("agentSecret");

metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
if (metadataCollectorOp.isPresent()) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
DirectoryResourceMetadata dirResourceMetadata = metadataCollector.getDirectoryResourceMetadata(resourceId, resourceToken);
DirectoryResourceMetadata dirResourceMetadata = metadataCollector
.getDirectoryResourceMetadata(new AuthZToken(mftAuthorizationToken, agentId, agentSecret), resourceId, resourceToken);
return mapper.writeValueAsString(dirResourceMetadata);
}
break;
Expand All @@ -120,12 +130,15 @@ public String resolveRPCRequest(SyncRPCRequest request) throws Exception {
resourceToken = request.getParameters().get("resourceToken");
childPath = request.getParameters().get("childPath");
mftAuthorizationToken = request.getParameters().get("mftAuthorizationToken");
agentId = request.getAgentId();
agentSecret = request.getParameters().get("agentSecret");

metadataCollectorOp = MetadataCollectorResolver.resolveMetadataCollector(resourceType);
if (metadataCollectorOp.isPresent()) {
MetadataCollector metadataCollector = metadataCollectorOp.get();
metadataCollector.init(resourceServiceHost, resourceServicePort, secretServiceHost, secretServicePort);
DirectoryResourceMetadata dirResourceMetadata = metadataCollector.getDirectoryResourceMetadata(resourceId, childPath, resourceToken);
DirectoryResourceMetadata dirResourceMetadata = metadataCollector
.getDirectoryResourceMetadata(new AuthZToken(mftAuthorizationToken, agentId, agentSecret), resourceId, childPath, resourceToken);
return mapper.writeValueAsString(dirResourceMetadata);
}
break;
Expand Down Expand Up @@ -169,7 +182,7 @@ public SyncRPCResponse processRPCRequest(SyncRPCRequest request) {
response.setResponseStatus(SyncRPCResponse.ResponseStatus.SUCCESS);
} catch (Exception e) {
logger.error("Errored while processing the rpc request for message {} and method {}",
request.getMessageId(), request.getMethod(), e);
request.getMessageId(), request.getMethod(), e);
response.setErrorAsStr(e.getMessage());
response.setResponseStatus(SyncRPCResponse.ResponseStatus.FAIL);
}
Expand Down
3 changes: 2 additions & 1 deletion agent/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
#

spring.main.web-application-type=NONE
agent.id=agent0
agent.id=mft-agent-c
agent.secret=CHANGE_ME
agent.host=localhost
agent.user=dimuthu
agent.http.port=3333
Expand Down
Loading