forked from nextflow-io/nextflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2 from nextcode-health/NEX-14
NEX-14: Initial implementation
- Loading branch information
Showing
6 changed files
with
411 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,179 @@ | ||
package nextflow.cloud.gce; | ||
|
||
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; | ||
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; | ||
import com.google.api.client.http.HttpTransport; | ||
import com.google.api.client.json.JsonFactory; | ||
import com.google.api.client.json.jackson2.JacksonFactory; | ||
import com.google.api.services.compute.Compute; | ||
import com.google.api.services.compute.model.*; | ||
|
||
import java.io.IOException; | ||
import java.math.BigInteger; | ||
import java.security.GeneralSecurityException; | ||
import java.util.*; | ||
|
||
/** | ||
* Helper class for Google Compute Engine | ||
* | ||
* @author Vilmundur Pálmason <vilmundur@wuxinextcode.com> | ||
*/ | ||
public class GceApiHelper { | ||
private static final String PROJECT_PREFIX = "https://www.googleapis.com/compute/v1/projects/"; | ||
private final String project; | ||
private final String zone; | ||
private Compute compute; | ||
Random random = new Random(); | ||
|
||
public GceApiHelper(String project, String zone) throws IOException, GeneralSecurityException { | ||
this.project = project; | ||
this.zone = zone; | ||
this.compute = createComputeService(); | ||
} | ||
|
||
public Compute compute() { | ||
return compute; | ||
} | ||
|
||
public Compute createComputeService() throws IOException, GeneralSecurityException { | ||
HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport(); | ||
JsonFactory jsonFactory = JacksonFactory.getDefaultInstance(); | ||
|
||
GoogleCredential credential = GoogleCredential.getApplicationDefault(); | ||
|
||
if (credential.createScopedRequired()) { | ||
credential = | ||
credential.createScoped(Arrays.asList("https://www.googleapis.com/auth/cloud-platform")); | ||
} | ||
|
||
return new Compute.Builder(httpTransport, jsonFactory, credential) | ||
.setApplicationName("NextCode-Experiments/0.1") | ||
.build(); | ||
} | ||
|
||
public String projectZonePrefix() { | ||
return PROJECT_PREFIX + project + "/zones/" + zone +"/"; | ||
} | ||
|
||
/** | ||
* Full name of machine type | ||
* @param shortName Short name such as "n1-standard-1" | ||
* @return Fully qualifie machine type | ||
*/ | ||
public String instanceType(String shortName) { | ||
return projectZonePrefix() + "machineTypes/"+shortName; | ||
} | ||
|
||
/** | ||
* Full name of image. | ||
* @param imagePath including image project (e.g. "debian-cloud/global/images/debian-7-wheezy-v20150710" ) | ||
* @return Fully qualified image name | ||
*/ | ||
public String imageName(String imagePath) { | ||
return PROJECT_PREFIX + imagePath; | ||
} | ||
|
||
public AttachedDisk setBootDisk(Instance instance, String imagePath) { | ||
AttachedDisk disk = new AttachedDisk(); | ||
disk.setBoot(true); | ||
disk.setAutoDelete(true); | ||
disk.setType("PERSISTENT"); | ||
AttachedDiskInitializeParams params = new AttachedDiskInitializeParams(); | ||
// Assign the Persistent Disk the same name as the VM Instance. | ||
if (instance.getName() != null) { | ||
params.setDiskName(instance.getName()); | ||
} | ||
// Specify the source operating system machine image to be used by the VM Instance. | ||
params.setSourceImage(imageName(imagePath)); | ||
// Specify the disk type as Standard Persistent Disk | ||
params.setDiskType(projectZonePrefix()+ "diskTypes/pd-standard"); | ||
disk.setInitializeParams(params); | ||
instance.setDisks(Collections.singletonList(disk)); | ||
return disk; | ||
} | ||
|
||
public NetworkInterface setNetworkInterface(Instance inst) { | ||
NetworkInterface ifc = new NetworkInterface(); | ||
ifc.setNetwork(PROJECT_PREFIX + project + "/global/networks/default"); | ||
List<AccessConfig> configs = new ArrayList<>(); | ||
AccessConfig config = new AccessConfig(); | ||
config.setType("ONE_TO_ONE_NAT"); | ||
config.setName("External NAT"); | ||
configs.add(config); | ||
ifc.setAccessConfigs(configs); | ||
inst.setNetworkInterfaces(Collections.singletonList(ifc)); | ||
return ifc; | ||
} | ||
|
||
public String randomName(String baseName) { | ||
return baseName + randomName(); | ||
} | ||
|
||
public String randomName() { | ||
byte[] bytes = new byte[5]; | ||
random.nextBytes(bytes); | ||
return new BigInteger(bytes).abs().toString(16); | ||
} | ||
|
||
public Metadata createMetadata(String ... tagVal) { | ||
Metadata metadata = new Metadata(); | ||
|
||
List<Metadata.Items> items = new ArrayList<>(); | ||
for (int i=0;i<tagVal.length -1; i+=2) { | ||
Metadata.Items it = new Metadata.Items(); | ||
it.set(tagVal[i],tagVal[i+1]); | ||
items.add(it); | ||
} | ||
metadata.setItems(items); | ||
System.out.println(metadata); | ||
return metadata; | ||
} | ||
|
||
public Operation.Error blockUntilComplete(Iterable<Operation> ops, long timeoutMs) throws InterruptedException, IOException { | ||
long start = System.currentTimeMillis(); | ||
for (Operation op: ops) { | ||
Operation.Error result = blockUntilComplete(op,timeoutMs - (System.currentTimeMillis() - start)); | ||
if (result != null) return null; | ||
} | ||
return null; | ||
} | ||
|
||
public Operation.Error blockUntilComplete(Operation operation, long timeoutMs) throws InterruptedException, IOException { | ||
long start = System.currentTimeMillis(); | ||
final long pollInterval = 5 * 1000; | ||
String zone = operation.getZone(); // null for global/regional operations | ||
if (zone != null) { | ||
String[] bits = zone.split("/"); | ||
zone = bits[bits.length - 1]; | ||
} | ||
String status = operation.getStatus(); | ||
String opId = operation.getName(); | ||
while (operation != null && !status.equals("DONE")) { | ||
Thread.sleep(pollInterval); | ||
long elapsed = System.currentTimeMillis() - start; | ||
if (elapsed >= timeoutMs) { | ||
throw new InterruptedException("Timed out waiting for operation to complete"); | ||
} | ||
System.out.println("waiting..."); | ||
if (zone != null) { | ||
Compute.ZoneOperations.Get get = compute.zoneOperations().get(project, zone, opId); | ||
operation = get.execute(); | ||
} else { | ||
Compute.GlobalOperations.Get get = compute.globalOperations().get(project, opId); | ||
operation = get.execute(); | ||
} | ||
if (operation != null) { | ||
status = operation.getStatus(); | ||
} | ||
} | ||
return operation == null ? null : operation.getError(); | ||
} | ||
|
||
public Image lookupImage(String imagePath) throws IOException { | ||
return compute.images().get(project,imageName(imagePath)).execute(); | ||
} | ||
|
||
public MachineType lookupMachineType(String machineType) throws IOException { | ||
return compute.machineTypes().get(project,zone,machineType).execute(); | ||
} | ||
} |
183 changes: 183 additions & 0 deletions
183
src/main/groovy/nextflow/cloud/gce/GoogleCloudDriver.groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
package nextflow.cloud.gce | ||
|
||
import com.google.api.services.compute.Compute | ||
import com.google.api.services.compute.model.Instance | ||
import com.google.api.services.compute.model.Operation | ||
import groovy.transform.CompileDynamic | ||
import groovy.transform.CompileStatic | ||
import groovy.transform.stc.ClosureParams | ||
import groovy.transform.stc.SimpleType | ||
import groovy.util.logging.Slf4j | ||
import nextflow.Global | ||
import nextflow.cloud.CloudDriver | ||
import nextflow.cloud.LaunchConfig | ||
import nextflow.cloud.types.CloudInstanceStatus | ||
import nextflow.cloud.types.CloudInstanceType | ||
import nextflow.exception.AbortOperationException | ||
import nextflow.util.ServiceName | ||
|
||
import java.security.GeneralSecurityException | ||
|
||
@Slf4j | ||
@CompileStatic | ||
@ServiceName('gce') | ||
/** | ||
* Cloud driver implementation for Google Compute Engine | ||
* | ||
* @author Vilmundur Pálmason <vilmundur@wuxinextcode.com> | ||
*/ | ||
class GoogleCloudDriver implements CloudDriver { | ||
|
||
/** | ||
* The GCE zone eg. {@code us-central1-f}. If it's not specified the current region is retrieved from | ||
* the GCE instance metadata | ||
*/ | ||
private String zone | ||
private String project | ||
private String group = "inst" | ||
|
||
private GceApiHelper helper | ||
|
||
/** | ||
* Initialise the Google cloud driver with default (empty) parameters | ||
*/ | ||
GoogleCloudDriver() { | ||
this(Collections.emptyMap()) | ||
} | ||
|
||
/** | ||
* Initialise the Google cloud driver with the specified parameters | ||
* | ||
* @param config | ||
* A map holding the driver parameters: | ||
* - zone: the GCE zone | ||
* - project: GCE project id | ||
*/ | ||
@CompileDynamic | ||
GoogleCloudDriver(Map config) { | ||
log.debug("Config: {}",config) | ||
log.debug("Global config: {}",Global.getConfig()) | ||
this.zone = config.zone ?: Global.getConfig().gce.zone | ||
this.project = config.project ?: Global.getConfig().gce.project | ||
if (!(this.zone && this.project)) { | ||
throw new AbortOperationException("Need GCE project and region") | ||
} | ||
log.debug("Starting GoogleCloudDriver in project {} and zone {}",this.project,this.zone) | ||
this.helper = new GceApiHelper(project,zone) | ||
} | ||
|
||
/** | ||
* Gets {@link Compute} instance given the current | ||
* configuration parameter | ||
* | ||
* @return | ||
* An {@link Compute} instance | ||
*/ | ||
synchronized Compute getClient() { | ||
return helper.compute() | ||
} | ||
|
||
@Override | ||
void validate(LaunchConfig config) { | ||
if( !config.imageId ) | ||
throw new IllegalArgumentException("Missing mandatory cloud `imageId` setting") | ||
|
||
if( !config.instanceType ) | ||
throw new IllegalStateException("Missing mandatory cloud `instanceType` setting") | ||
|
||
if( !helper.lookupMachineType(config.instanceType) ) | ||
throw new IllegalArgumentException("Unknown GCE machine type: ${config.instanceType}") | ||
|
||
} | ||
|
||
@Override | ||
List<String> launchInstances(int instanceCount, LaunchConfig config) { | ||
def result = new ArrayList<>() | ||
def ops = new ArrayList<Operation>() | ||
instanceCount.times { | ||
Instance inst = new Instance(); | ||
inst.setName(helper.randomName(config.getClusterName() + "-")); | ||
inst.setMachineType(helper.instanceType(config.getInstanceType())); | ||
helper.setBootDisk(inst,config.getImageId()); | ||
helper.setNetworkInterface(inst); | ||
Compute.Instances.Insert insert = helper.compute().instances().insert(project, zone, inst); | ||
|
||
result << inst.getName() | ||
ops << insert.execute() | ||
System.out.println("Launching: "+result) | ||
} | ||
helper.blockUntilComplete(ops,5*60*1000); | ||
System.out.println("Launched: "+result) | ||
return result | ||
} | ||
|
||
@Override | ||
void waitInstanceStatus(Collection<String> instanceIds, CloudInstanceStatus status) { | ||
unsupported("waitInstanceStatus") | ||
} | ||
|
||
@Override | ||
void tagInstances(Collection<String> instanceIds, Map<String, String> tags) { | ||
unsupported("tagInstances") | ||
} | ||
|
||
@Override | ||
void eachSpotPrice(List<String> instanceTypes, | ||
@ClosureParams(value=SimpleType, options = ['nextflow.cloud.types.CloudSpotPrice']) Closure callback) { | ||
unsupported("eachSpotPrice") | ||
|
||
} | ||
|
||
@Override | ||
void eachInstanceWithTags(Map tags, | ||
@ClosureParams(value=SimpleType, options = ['nextflow.cloud.types.CloudInstance']) Closure callback) { | ||
unsupported("eachInstanceWithTags") | ||
|
||
} | ||
|
||
@Override | ||
void eachInstanceWithIds(List<String> instanceIds, | ||
@ClosureParams(value=SimpleType, options = ['nextflow.cloud.types.CloudInstance']) Closure callback) { | ||
unsupported("eachInstanceWithIds") | ||
} | ||
|
||
@Override | ||
void eachInstance( | ||
@ClosureParams(value=SimpleType, options = ['nextflow.cloud.types.CloudInstance']) Closure callback) { | ||
unsupported("eachInstance") | ||
} | ||
|
||
@Override | ||
List<String> listPrivateIPs(String clusterName) { | ||
unsupported("listPrivateIPs") | ||
return null | ||
} | ||
|
||
@Override | ||
void terminateInstances(Collection<String> instanceIds) { | ||
unsupported("terminateInstances") | ||
} | ||
|
||
@Override | ||
String getLocalInstanceId() { | ||
unsupported("getLocalInstanceId") | ||
} | ||
|
||
@Override | ||
String getLocalTerminationNotice() { | ||
unsupported("getLocalTerminationNotice") | ||
} | ||
|
||
@Override | ||
CloudInstanceType describeInstanceType(String instanceType) { | ||
unsupported("describeInstanceType") | ||
return null | ||
} | ||
|
||
/** | ||
* @TODO: This method will be removed once all methods are implemented | ||
*/ | ||
private void unsupported(String msg) { | ||
log.warn("UNSUPPORTED: "+msg) | ||
} | ||
} |
Oops, something went wrong.