Skip to content

Commit

Permalink
[FLINK-6397] Unset context when integration test is finished.
Browse files Browse the repository at this point in the history
  • Loading branch information
biao.liub committed May 7, 2017
1 parent fde4f90 commit 829f24c
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.TestEnvironment;

import org.junit.After;
Expand All @@ -56,11 +55,6 @@
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -229,13 +223,14 @@ public static void closeCassandra() {

@Before
public void initializeExecutionEnvironment() {
TestStreamEnvironment.setAsContext(flinkCluster, 4);
new TestEnvironment(flinkCluster, 4, false).setAsContext();
}

@After
public void deleteSchema() throws Exception {
session.executeAsync(CLEAR_TABLE_QUERY);

TestEnvironment.unsetAsContext();
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ trait FlinkTestBase extends BeforeAndAfter {

after {
cluster.foreach(c => TestBaseUtils.stopCluster(c, TestBaseUtils.DEFAULT_TIMEOUT))

TestEnvironment.unsetAsContext()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
return result;
}

protected void setAsContext() {
public void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
Expand All @@ -60,4 +60,8 @@ public ExecutionEnvironment createExecutionEnvironment() {

initializeContextEnvironment(factory);
}

public static void unsetAsContext() {
resetContextEnvironment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ protected boolean skipCollectionExecution() {
@Test
public void testJobWithObjectReuse() throws Exception {
isCollectionExecution = false;

startCluster();

try {
// pre-submit
try {
Expand All @@ -109,7 +110,7 @@ public void testJobWithObjectReuse() throws Exception {
e.printStackTrace();
Assert.fail("Pre-submit work caused an error: " + e.getMessage());
}

// prepare the test environment
TestEnvironment env = new TestEnvironment(this.executor, this.parallelism, false);
env.getConfig().enableObjectReuse();
Expand Down Expand Up @@ -143,6 +144,7 @@ public void testJobWithObjectReuse() throws Exception {
}
} finally {
stopCluster();
TestEnvironment.unsetAsContext();
}
}

Expand Down Expand Up @@ -195,6 +197,7 @@ public void testJobWithoutObjectReuse() throws Exception {
}
} finally {
stopCluster();
TestEnvironment.unsetAsContext();
}
}

Expand Down Expand Up @@ -231,6 +234,8 @@ public void testJobCollectionExecution() throws Exception {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Error while calling the test program: " + e.getMessage());
} finally {
env.unsetContext();
}

Assert.assertNotNull("The test program never triggered an execution.", this.latestExecutionResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ public enum TestExecutionMode {

protected final TestExecutionMode mode;


public MultipleProgramsTestBase(TestExecutionMode mode) {
this.mode = mode;

switch(mode){
case CLUSTER:
new TestEnvironment(cluster, 4, false).setAsContext();
Expand All @@ -95,14 +94,15 @@ public MultipleProgramsTestBase(TestExecutionMode mode) {
new CollectionTestEnvironment().setAsContext();
break;
}

}

// ------------------------------------------------------------------------
// Cluster setup & teardown
// ------------------------------------------------------------------------

@BeforeClass
public static void setup() throws Exception {
public void setup() throws Exception {
cluster = TestBaseUtils.startCluster(
1,
DEFAULT_PARALLELISM,
Expand All @@ -112,8 +112,11 @@ public static void setup() throws Exception {
}

@AfterClass
public static void teardown() throws Exception {
public void teardown() throws Exception {
stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);

TestEnvironment.unsetAsContext();
CollectionTestEnvironment.unsetAsContext();
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -70,6 +71,11 @@ public void prepare() {
clusterEnv.setAsContext();
}

@After
public void cleanup() {
TestEnvironment.unsetAsContext();
}

// ------------------------------------------------------------------------

/**
Expand Down

0 comments on commit 829f24c

Please sign in to comment.