Commit 434f24d
Merge pull request apache#11 from kai-chi/HOPSWORKS-1081
[HOPSWORKS-1081] Upgrade Spark to 2.4.3
tkakantousis authored Aug 6, 2019
2 parents 69d7cfb + ad8028b commit 434f24d
Showing 416 changed files with 7,724 additions and 2,241 deletions.
1 change: 0 additions & 1 deletion LICENSE-binary
@@ -305,7 +305,6 @@ com.google.code.gson:gson
 com.google.inject:guice
 com.google.inject.extensions:guice-servlet
 com.twitter:parquet-hadoop-bundle
-commons-beanutils:commons-beanutils-core
 commons-cli:commons-cli
 commons-dbcp:commons-dbcp
 commons-io:commons-io
10 changes: 5 additions & 5 deletions R/pkg/DESCRIPTION
@@ -1,8 +1,8 @@
 Package: SparkR
 Type: Package
-Version: 2.4.0
-Title: R Frontend for Apache Spark
-Description: Provides an R Frontend for Apache Spark.
+Version: 2.4.3.0
+Title: R Front End for 'Apache Spark'
+Description: Provides an R Front end for 'Apache Spark' <https://spark.apache.org>.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
                     email = "shivaram@cs.berkeley.edu"),
              person("Xiangrui", "Meng", role = "aut",
@@ -11,8 +11,8 @@ Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
                     email = "felixcheung@apache.org"),
              person(family = "The Apache Software Foundation", role = c("aut", "cph")))
 License: Apache License (== 2.0)
-URL: http://www.apache.org/ http://spark.apache.org/
-BugReports: http://spark.apache.org/contributing.html
+URL: https://www.apache.org/ https://spark.apache.org/
+BugReports: https://spark.apache.org/contributing.html
 SystemRequirements: Java (== 8)
 Depends:
     R (>= 3.0),
1 change: 1 addition & 0 deletions R/pkg/tests/fulltests/test_streaming.R
@@ -127,6 +127,7 @@ test_that("Specify a schema by using a DDL-formatted string when reading", {
   expect_false(awaitTermination(q, 5 * 1000))
   callJMethod(q@ssq, "processAllAvailable")
   expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
+  stopQuery(q)
 
   expect_error(read.stream(path = parquetPath, schema = "name stri"),
                "DataType stri is not supported.")
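As context for the added stopQuery(q) line: a minimal, hypothetical SparkR sketch (not part of this commit) of the query lifecycle the test exercises; the source path, schema, and query name are illustrative only.

library(SparkR)
sparkR.session()
# Stream parquet files from a directory, using a DDL-formatted schema string.
df <- read.stream(source = "parquet", path = "/tmp/people-parquet",
                  schema = "name STRING, age INT")
q <- write.stream(df, "memory", queryName = "people", outputMode = "append")
awaitTermination(q, 5 * 1000)  # wait up to 5 seconds; returns FALSE on timeout
stopQuery(q)                   # stop the query explicitly so it is not leaked
sparkR.session.stop()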
14 changes: 14 additions & 0 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -57,6 +57,20 @@ First, let's load and attach the package.
 library(SparkR)
 ```
 
+```{r, include=FALSE}
+# disable eval if java version not supported
+override_eval <- tryCatch(!is.numeric(SparkR:::checkJavaVersion()),
+                          error = function(e) { TRUE },
+                          warning = function(e) { TRUE })
+if (override_eval) {
+  opts_hooks$set(eval = function(options) {
+    options$eval = FALSE
+    options
+  })
+}
+```
+
 `SparkSession` is the entry point into SparkR which connects your R program to a Spark cluster. You can create a `SparkSession` using `sparkR.session` and pass in options such as the application name, any Spark packages depended on, etc.
 
 We use default settings in which it runs in local mode. It auto downloads Spark package in the background if no previous installation is found. For more details about setup, see [Spark Session](#SetupSparkSession).
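To make the vignette passage above concrete, a minimal sketch of starting a SparkR session with explicit options; the application name and memory value are illustrative, not from this diff.

library(SparkR)
# Start (or connect to) a local Spark session; values here are illustrative.
sparkR.session(appName = "sparkr-example",
               sparkConfig = list(spark.driver.memory = "1g"))
df <- as.DataFrame(faithful)  # convert a local R data.frame to a SparkDataFrame
head(df)
sparkR.session.stop()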
2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.4.0.1</version>
+    <version>2.4.3.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 
5 changes: 4 additions & 1 deletion bin/spark-shell
@@ -32,7 +32,10 @@ if [ -z "${SPARK_HOME}" ]; then
   source "$(dirname "$0")"/find-spark-home
 fi
 
-export _SPARK_CMD_USAGE="Usage: ./bin/spark-shell [options]"
+export _SPARK_CMD_USAGE="Usage: ./bin/spark-shell [options]
+
+Scala REPL options:
+  -I <file>                   preload <file>, enforcing line-by-line interpretation"
 
 # SPARK-4161: scala does not assume use of the java classpath,
 # so we need to add the "-Dscala.usejavacp=true" flag manually. We
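For example, with this change an invocation such as ./bin/spark-shell -I init.scala preloads the given Scala file line by line before the interactive prompt appears (init.scala is an illustrative name, not from this commit).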
8 changes: 7 additions & 1 deletion bin/spark-shell2.cmd
@@ -20,7 +20,13 @@ rem
 rem Figure out where the Spark framework is installed
 call "%~dp0find-spark-home.cmd"
 
-set _SPARK_CMD_USAGE=Usage: .\bin\spark-shell.cmd [options]
+set LF=^
+
+
+rem two empty lines are required
+set _SPARK_CMD_USAGE=Usage: .\bin\spark-shell.cmd [options]^%LF%%LF%^%LF%%LF%^
+Scala REPL options:^%LF%%LF%^
+  -I ^<file^>                  preload ^<file^>, enforcing line-by-line interpretation
 
 rem SPARK-4161: scala does not assume use of the java classpath,
 rem so we need to add the "-Dscala.usejavacp=true" flag manually. We
10 changes: 7 additions & 3 deletions build/mvn
@@ -153,7 +153,7 @@ if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`
   export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
   "${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
   "${ZINC_BIN}" -start -port ${ZINC_PORT} \
-    -server 127.0.0.1 -idle-timeout 30m \
+    -server 127.0.0.1 -idle-timeout 3h \
     -scala-compiler "${SCALA_COMPILER}" \
     -scala-library "${SCALA_LIBRARY}" &>/dev/null
 fi
@@ -163,8 +163,12 @@ export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}
 
 echo "Using \`mvn\` from path: $MVN_BIN" 1>&2
 
-# Last, call the `mvn` command as usual
+# call the `mvn` command as usual
+# SPARK-25854
 "${MVN_BIN}" -DzincPort=${ZINC_PORT} "$@"
+MVN_RETCODE=$?
 
-# Try to shut down zinc explicitly
+# Try to shut down zinc explicitly if the server is still running.
 "${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
+
+exit $MVN_RETCODE
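The practical effect, per SPARK-25854, is that a build such as ./build/mvn -DskipTests clean package now exits with Maven's own return code (captured in MVN_RETCODE) rather than the exit status of the trailing zinc shutdown.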
2 changes: 1 addition & 1 deletion common/kvstore/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.4.0.1</version>
+    <version>2.4.3.0</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
2 changes: 1 addition & 1 deletion common/network-common/pom.xml
@@ -22,7 +22,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.11</artifactId>
-    <version>2.4.0.1</version>
+    <version>2.4.3.0</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
@@ -159,15 +159,21 @@ public void close() throws IOException {
     // accurately report the errors when they happen.
     RuntimeException error = null;
     byte[] dummy = new byte[8];
-    try {
-      doCipherOp(encryptor, dummy, true);
-    } catch (Exception e) {
-      error = new RuntimeException(e);
+    if (encryptor != null) {
+      try {
+        doCipherOp(Cipher.ENCRYPT_MODE, dummy, true);
+      } catch (Exception e) {
+        error = new RuntimeException(e);
+      }
+      encryptor = null;
     }
-    try {
-      doCipherOp(decryptor, dummy, true);
-    } catch (Exception e) {
-      error = new RuntimeException(e);
+    if (decryptor != null) {
+      try {
+        doCipherOp(Cipher.DECRYPT_MODE, dummy, true);
+      } catch (Exception e) {
+        error = new RuntimeException(e);
+      }
+      decryptor = null;
     }
     random.close();
 
@@ -189,11 +195,11 @@ byte[] rawResponse(byte[] challenge) {
   }
 
   private byte[] decrypt(byte[] in) throws GeneralSecurityException {
-    return doCipherOp(decryptor, in, false);
+    return doCipherOp(Cipher.DECRYPT_MODE, in, false);
   }
 
   private byte[] encrypt(byte[] in) throws GeneralSecurityException {
-    return doCipherOp(encryptor, in, false);
+    return doCipherOp(Cipher.ENCRYPT_MODE, in, false);
   }
 
   private void initializeForAuth(String cipher, byte[] nonce, SecretKeySpec key)
@@ -205,11 +211,13 @@ private void initializeForAuth(String cipher, byte[] nonce, SecretKeySpec key)
     byte[] iv = new byte[conf.ivLength()];
     System.arraycopy(nonce, 0, iv, 0, Math.min(nonce.length, iv.length));
 
-    encryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
-    encryptor.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
+    CryptoCipher _encryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
+    _encryptor.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
+    this.encryptor = _encryptor;
 
-    decryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
-    decryptor.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
+    CryptoCipher _decryptor = CryptoCipherFactory.getCryptoCipher(cipher, cryptoConf);
+    _decryptor.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(iv));
+    this.decryptor = _decryptor;
   }
 
   /**
@@ -241,29 +249,52 @@ private SecretKeySpec generateKey(String kdf, int iterations, byte[] salt, int k
     return new SecretKeySpec(key.getEncoded(), conf.keyAlgorithm());
   }
 
-  private byte[] doCipherOp(CryptoCipher cipher, byte[] in, boolean isFinal)
+  private byte[] doCipherOp(int mode, byte[] in, boolean isFinal)
     throws GeneralSecurityException {
 
-    Preconditions.checkState(cipher != null);
+    CryptoCipher cipher;
+    switch (mode) {
+      case Cipher.ENCRYPT_MODE:
+        cipher = encryptor;
+        break;
+      case Cipher.DECRYPT_MODE:
+        cipher = decryptor;
+        break;
+      default:
+        throw new IllegalArgumentException(String.valueOf(mode));
+    }
 
-    int scale = 1;
-    while (true) {
-      int size = in.length * scale;
-      byte[] buffer = new byte[size];
-      try {
-        int outSize = isFinal ? cipher.doFinal(in, 0, in.length, buffer, 0)
-          : cipher.update(in, 0, in.length, buffer, 0);
-        if (outSize != buffer.length) {
-          byte[] output = new byte[outSize];
-          System.arraycopy(buffer, 0, output, 0, output.length);
-          return output;
-        } else {
-          return buffer;
+    Preconditions.checkState(cipher != null, "Cipher is invalid because of previous error.");
+
+    try {
+      int scale = 1;
+      while (true) {
+        int size = in.length * scale;
+        byte[] buffer = new byte[size];
+        try {
+          int outSize = isFinal ? cipher.doFinal(in, 0, in.length, buffer, 0)
+            : cipher.update(in, 0, in.length, buffer, 0);
+          if (outSize != buffer.length) {
+            byte[] output = new byte[outSize];
+            System.arraycopy(buffer, 0, output, 0, output.length);
+            return output;
+          } else {
+            return buffer;
+          }
+        } catch (ShortBufferException e) {
+          // Try again with a bigger buffer.
+          scale *= 2;
        }
-      } catch (ShortBufferException e) {
-        // Try again with a bigger buffer.
-        scale *= 2;
      }
+    } catch (InternalError ie) {
+      // SPARK-25535. The commons-crypto library will throw InternalError if something goes wrong,
+      // and leave bad state behind in the Java wrappers, so it's not safe to use them afterwards.
+      if (mode == Cipher.ENCRYPT_MODE) {
+        this.encryptor = null;
+      } else {
+        this.decryptor = null;
+      }
+      throw ie;
     }
   }
 
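The grow-the-buffer retry in doCipherOp above is easiest to see in isolation. Below is a minimal, self-contained Java sketch of the same pattern using the JDK's javax.crypto.Cipher instead of commons-crypto; the algorithm, key size, and message are illustrative, not from this commit.

import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.ShortBufferException;

// Hypothetical sketch of the ShortBufferException retry loop shown in the diff.
public class CipherBufferSketch {
  public static void main(String[] args) throws Exception {
    KeyGenerator kg = KeyGenerator.getInstance("AES");
    kg.init(128);
    SecretKey key = kg.generateKey();
    Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
    cipher.init(Cipher.ENCRYPT_MODE, key);

    byte[] in = "hello".getBytes("UTF-8");
    int scale = 1;
    while (true) {
      // Padding can make the output longer than the input, so the first
      // attempt with a buffer of in.length bytes may be too small.
      byte[] buffer = new byte[in.length * scale];
      try {
        int outSize = cipher.doFinal(in, 0, in.length, buffer, 0);
        System.out.println("ciphertext fits in " + outSize + " bytes");
        break;
      } catch (ShortBufferException e) {
        scale *= 2; // try again with a bigger buffer, as in doCipherOp
      }
    }
  }
}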