
Commit

Merge remote-tracking branch 'upstream/master' into hadoop-2.7-jdk11
# Conflicts:
#	pom.xml
wangyum committed Nov 24, 2019
2 parents 50a11ec + a60da23 commit 728952b
Showing 642 changed files with 6,193 additions and 3,095 deletions.
46 changes: 43 additions & 3 deletions .github/workflows/master.yml
@@ -16,10 +16,15 @@ jobs:
matrix:
java: [ '1.8', '11' ]
hadoop: [ 'hadoop-2.7', 'hadoop-3.2' ]
hive: [ 'hive-1.2', 'hive-2.3' ]
exclude:
- java: '11'
hadoop: 'hadoop-2.7'
name: Build Spark with JDK ${{ matrix.java }} and ${{ matrix.hadoop }}
- java: '11'
hive: 'hive-1.2'
- hadoop: 'hadoop-3.2'
hive: 'hive-1.2'
name: Build Spark - JDK${{ matrix.java }}/${{ matrix.hadoop }}/${{ matrix.hive }}

steps:
- uses: actions/checkout@master
@@ -36,6 +41,18 @@ jobs:
key: ${{ matrix.java }}-${{ matrix.hadoop }}-maven-org-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ matrix.java }}-${{ matrix.hadoop }}-maven-org-
- uses: actions/cache@v1
with:
path: ~/.m2/repository/net
key: ${{ matrix.java }}-${{ matrix.hadoop }}-maven-net-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ matrix.java }}-${{ matrix.hadoop }}-maven-net-
- uses: actions/cache@v1
with:
path: ~/.m2/repository/io
key: ${{ matrix.java }}-${{ matrix.hadoop }}-maven-io-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ matrix.java }}-${{ matrix.hadoop }}-maven-io-
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v1
with:
@@ -44,13 +61,13 @@ jobs:
run: |
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=1g -Dorg.slf4j.simpleLogger.defaultLogLevel=WARN"
export MAVEN_CLI_OPTS="--no-transfer-progress"
./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -P${{ matrix.hadoop }} -Phadoop-cloud -Djava.version=${{ matrix.java }} install
./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -P${{ matrix.hive }} -Phive-thriftserver -P${{ matrix.hadoop }} -Phadoop-cloud -Djava.version=${{ matrix.java }} install
rm -rf ~/.m2/repository/org/apache/spark
lint:
runs-on: ubuntu-latest
name: Linters
name: Linters (Java/Scala/Python), licenses, dependencies
steps:
- uses: actions/checkout@master
- uses: actions/setup-java@v1
@@ -72,3 +89,26 @@ jobs:
run: ./dev/check-license
- name: Dependencies
run: ./dev/test-dependencies.sh

lintr:
runs-on: ubuntu-latest
name: Linter (R)
steps:
- uses: actions/checkout@master
- uses: actions/setup-java@v1
with:
java-version: '11'
- name: install R
run: |
echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/' | sudo tee -a /etc/apt/sources.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0xE298A3A825C0D65DFD57CBB651716619E084DAB9" | sudo apt-key add
sudo apt-get update
sudo apt-get install -y r-base r-base-dev libcurl4-openssl-dev
- name: install R packages
run: |
sudo Rscript -e "install.packages(c('curl', 'xml2', 'httr', 'devtools', 'testthat', 'knitr', 'rmarkdown', 'roxygen2', 'e1071', 'survival'), repos='https://cloud.r-project.org/')"
sudo Rscript -e "devtools::install_github('jimhester/lintr@v2.0.0')"
- name: package and install SparkR
run: ./R/install-dev.sh
- name: lint-r
run: ./dev/lint-r
2 changes: 1 addition & 1 deletion R/pkg/.lintr
@@ -1,2 +1,2 @@
linters: with_defaults(line_length_linter(100), multiple_dots_linter = NULL, object_name_linter = NULL, camel_case_linter = NULL, open_curly_linter(allow_single_line = TRUE), closed_curly_linter(allow_single_line = TRUE))
linters: with_defaults(line_length_linter(100), multiple_dots_linter = NULL, object_name_linter = NULL, camel_case_linter = NULL, open_curly_linter(allow_single_line = TRUE), closed_curly_linter(allow_single_line = TRUE), object_usage_linter = NULL, cyclocomp_linter = NULL)
exclusions: list("inst/profile/general.R" = 1, "inst/profile/shell.R")
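
A hedged sketch of how this configuration is consumed: lintr reads the package-level .lintr shown above, so the newly disabled object_usage_linter and cyclocomp_linter apply to any lint run over R/pkg. The call below is only an illustration and assumes lintr v2.0.0 is installed, as pinned in the workflow above:

  # Illustration: lint the SparkR package from the repository root.
  # lintr picks up R/pkg/.lintr, so the linters configured there
  # (including the new object_usage_linter / cyclocomp_linter exclusions)
  # are applied automatically.
  lintr::lint_package("R/pkg")   # returns lint markers; empty output means clean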
8 changes: 4 additions & 4 deletions R/pkg/R/DataFrame.R
@@ -2252,7 +2252,7 @@ setMethod("mutate",

# The last column of the same name in the specific columns takes effect
deDupCols <- list()
for (i in 1:length(cols)) {
for (i in seq_len(length(cols))) {
deDupCols[[ns[[i]]]] <- alias(cols[[i]], ns[[i]])
}

@@ -2416,7 +2416,7 @@ setMethod("arrange",
# builds a list of columns of type Column
# example: [[1]] Column Species ASC
# [[2]] Column Petal_Length DESC
jcols <- lapply(seq_len(length(decreasing)), function(i){
jcols <- lapply(seq_len(length(decreasing)), function(i) {
if (decreasing[[i]]) {
desc(getColumn(x, by[[i]]))
} else {
@@ -2749,7 +2749,7 @@ genAliasesForIntersectedCols <- function(x, intersectedColNames, suffix) {
col <- getColumn(x, colName)
if (colName %in% intersectedColNames) {
newJoin <- paste(colName, suffix, sep = "")
if (newJoin %in% allColNames){
if (newJoin %in% allColNames) {
stop("The following column name: ", newJoin, " occurs more than once in the 'DataFrame'.",
"Please use different suffixes for the intersected columns.")
}
@@ -3475,7 +3475,7 @@ setMethod("str",
cat(paste0("'", class(object), "': ", length(names), " variables:\n"))

if (nrow(localDF) > 0) {
for (i in 1 : ncol(localDF)) {
for (i in seq_len(ncol(localDF))) {
# Get the first elements for each column

firstElements <- if (types[i] == "character") {
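
The 1:length(x) to seq_len(length(x)) changes in this file (and in the R files below) guard against zero-length inputs; a minimal standalone sketch of the difference:

  # Why seq_len() is preferred over 1:length() in R:
  cols <- list()                 # empty input, e.g. no columns to process

  1:length(cols)                 # 1:0 evaluates to c(1, 0) -> a loop over it runs twice
  seq_len(length(cols))          # integer(0)               -> a loop over it is skipped

  for (i in seq_len(length(cols))) {
    # never entered when cols is empty, which is the intended behaviour
  }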
8 changes: 4 additions & 4 deletions R/pkg/R/SQLContext.R
@@ -166,9 +166,9 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
for (rdf_slice in rdf_slices) {
batch <- arrow::record_batch(rdf_slice)
if (is.null(stream_writer)) {
stream <- arrow::FileOutputStream(fileName)
stream <- arrow::FileOutputStream$create(fileName)
schema <- batch$schema
stream_writer <- arrow::RecordBatchStreamWriter(stream, schema)
stream_writer <- arrow::RecordBatchStreamWriter$create(stream, schema)
}

stream_writer$write_batch(batch)
@@ -197,7 +197,7 @@ getSchema <- function(schema, firstRow = NULL, rdd = NULL) {
as.list(schema)
}
if (is.null(names)) {
names <- lapply(1:length(firstRow), function(x) {
names <- lapply(seq_len(length(firstRow)), function(x) {
paste0("_", as.character(x))
})
}
@@ -213,7 +213,7 @@ getSchema <- function(schema, firstRow = NULL, rdd = NULL) {
})

types <- lapply(firstRow, infer_type)
fields <- lapply(1:length(firstRow), function(i) {
fields <- lapply(seq_len(length(firstRow)), function(i) {
structField(names[[i]], types[[i]], TRUE)
})
schema <- do.call(structType, fields)
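
The Arrow calls above switch to the R6-style constructors used from arrow 0.15 onward (Class$create(...) instead of Class(...)). A rough standalone sketch of the same write path, assuming the arrow package is installed and rdf is a local data.frame; the helper name is made up for illustration:

  library(arrow)

  # Sketch of the stream-writing pattern from writeToFileInArrow above.
  write_rdf_as_stream <- function(rdf, fileName) {
    batch  <- record_batch(rdf)                                    # one data.frame slice -> one record batch
    stream <- FileOutputStream$create(fileName)                    # arrow >= 0.15 constructor style
    writer <- RecordBatchStreamWriter$create(stream, batch$schema)
    writer$write_batch(batch)
    writer$close()
    stream$close()
  }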
2 changes: 1 addition & 1 deletion R/pkg/R/context.R
@@ -416,7 +416,7 @@ spark.getSparkFiles <- function(fileName) {
#' @examples
#'\dontrun{
#' sparkR.session()
#' doubled <- spark.lapply(1:10, function(x){2 * x})
#' doubled <- spark.lapply(1:10, function(x) {2 * x})
#'}
#' @note spark.lapply since 2.0.0
spark.lapply <- function(list, func) {
2 changes: 1 addition & 1 deletion R/pkg/R/deserialize.R
@@ -242,7 +242,7 @@ readDeserializeInArrow <- function(inputCon) {
# for now.
dataLen <- readInt(inputCon)
arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
batches <- arrow::RecordBatchStreamReader(arrowData)$batches()
batches <- arrow::RecordBatchStreamReader$create(arrowData)$batches()

if (useAsTibble) {
as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
2 changes: 1 addition & 1 deletion R/pkg/R/group.R
@@ -162,7 +162,7 @@ methods <- c("avg", "max", "mean", "min", "sum")
#' @note pivot since 2.0.0
setMethod("pivot",
signature(x = "GroupedData", colname = "character"),
function(x, colname, values = list()){
function(x, colname, values = list()) {
stopifnot(length(colname) == 1)
if (length(values) == 0) {
result <- callJMethod(x@sgd, "pivot", colname)
14 changes: 9 additions & 5 deletions R/pkg/R/utils.R
@@ -131,7 +131,7 @@ hashCode <- function(key) {
} else {
asciiVals <- sapply(charToRaw(key), function(x) { strtoi(x, 16L) })
hashC <- 0
for (k in 1:length(asciiVals)) {
for (k in seq_len(length(asciiVals))) {
hashC <- mult31AndAdd(hashC, asciiVals[k])
}
as.integer(hashC)
@@ -543,10 +543,14 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
funcList <- mget(nodeChar, envir = checkedFuncs, inherits = F,
ifnotfound = list(list(NULL)))[[1]]
found <- sapply(funcList, function(func) {
ifelse(identical(func, obj), TRUE, FALSE)
ifelse(
identical(func, obj) &&
# Also check if the parent environment is identical to current parent
identical(parent.env(environment(func)), func.env),
TRUE, FALSE)
})
if (sum(found) > 0) {
# If function has been examined, ignore.
# If function has been examined ignore
break
}
# Function has not been examined, record it and recursively clean its closure.
@@ -724,7 +728,7 @@ assignNewEnv <- function(data) {
stopifnot(length(cols) > 0)

env <- new.env()
for (i in 1:length(cols)) {
for (i in seq_len(length(cols))) {
assign(x = cols[i], value = data[, cols[i], drop = F], envir = env)
}
env
@@ -750,7 +754,7 @@ launchScript <- function(script, combinedArgs, wait = FALSE, stdout = "", stderr
if (.Platform$OS.type == "windows") {
scriptWithArgs <- paste(script, combinedArgs, sep = " ")
# on Windows, intern = F seems to mean output to the console. (documentation on this is missing)
shell(scriptWithArgs, translate = TRUE, wait = wait, intern = wait) # nolint
shell(scriptWithArgs, translate = TRUE, wait = wait, intern = wait)
} else {
# http://stat.ethz.ch/R-manual/R-devel/library/base/html/system2.html
# stdout = F means discard output
2 changes: 1 addition & 1 deletion R/pkg/inst/worker/worker.R
@@ -194,7 +194,7 @@ if (isEmpty != 0) {
} else {
# gapply mode
outputs <- list()
for (i in 1:length(data)) {
for (i in seq_len(length(data))) {
# Timing reading input data for execution
inputElap <- elapsedSecs()
output <- compute(mode, partition, serializer, deserializer, keys[[i]],
4 changes: 2 additions & 2 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -172,7 +172,7 @@ test_that("structField type strings", {
typeList <- c(primitiveTypes, complexTypes)
typeStrings <- names(typeList)

for (i in seq_along(typeStrings)){
for (i in seq_along(typeStrings)) {
typeString <- typeStrings[i]
expected <- typeList[[i]]
testField <- structField("_col", typeString)
@@ -203,7 +203,7 @@ test_that("structField type strings", {
errorList <- c(primitiveErrors, complexErrors)
typeStrings <- names(errorList)

for (i in seq_along(typeStrings)){
for (i in seq_along(typeStrings)) {
typeString <- typeStrings[i]
expected <- paste0("Unsupported type for SparkDataframe: ", errorList[[i]])
expect_error(structField("_col", typeString), expected)
9 changes: 9 additions & 0 deletions R/pkg/tests/fulltests/test_utils.R
@@ -110,6 +110,15 @@ test_that("cleanClosure on R functions", {
actual <- get("y", envir = env, inherits = FALSE)
expect_equal(actual, y)

# Test for a combination of nested and sequential functions in a closure
f1 <- function(x) x + 1
f2 <- function(x) f1(x) + 2
userFunc <- function(x) { f1(x); f2(x) }
cUserFuncEnv <- environment(cleanClosure(userFunc))
expect_equal(length(cUserFuncEnv), 2)
innerCUserFuncEnv <- environment(cUserFuncEnv$f2)
expect_equal(length(innerCUserFuncEnv), 1)

# Test for function (and variable) definitions.
f <- function(x) {
g <- function(y) { y * 2 }
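
The new test above checks that cleanClosure keeps a nested helper (f2, which itself calls f1) usable after capture. A plain-R restatement of the closure shape being exercised, with no SparkR internals involved:

  # f2 depends on f1, and the user function calls both; a cleaned/serialized
  # closure of userFunc must therefore carry both f1 and f2, and the captured
  # f2 must still be able to resolve f1.
  f1 <- function(x) x + 1
  f2 <- function(x) f1(x) + 2
  userFunc <- function(x) { f1(x); f2(x) }

  userFunc(1)   # f2(1) = f1(1) + 2 = 4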
2 changes: 1 addition & 1 deletion R/run-tests.sh
@@ -23,7 +23,7 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE

SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
FAILED=$((PIPESTATUS[0]||$FAILED))

NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
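
The added -Dio.netty.tryReflectionSetAccessible=true options let Netty (and therefore Arrow) access direct buffers reflectively on JDK 9+. Outside this script, a hypothetical way to pass the same settings when starting a SparkR session yourself; the configuration keys match those used above, but the session call is only an illustration:

  library(SparkR)

  # Illustration: local SparkR session with the same Netty flag that
  # run-tests.sh now passes, so Arrow-based conversion works on JDK 11.
  sparkR.session(
    master = "local[2]",
    sparkConfig = list(
      spark.driver.extraJavaOptions   = "-Dio.netty.tryReflectionSetAccessible=true",
      spark.executor.extraJavaOptions = "-Dio.netty.tryReflectionSetAccessible=true"
    )
  )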
7 changes: 2 additions & 5 deletions appveyor.yml
@@ -42,10 +42,7 @@ install:
# Install maven and dependencies
- ps: .\dev\appveyor-install-dependencies.ps1
# Required package for R unit tests
- cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'e1071', 'survival'), repos='https://cloud.r-project.org/')"
# Use Arrow R 0.14.1 for now. 0.15.0 seems not working for now. See SPARK-29378.
- cmd: R -e "install.packages(c('assertthat', 'bit64', 'fs', 'purrr', 'R6', 'tidyselect'), repos='https://cloud.r-project.org/')"
- cmd: R -e "install.packages('https://cran.r-project.org/src/contrib/Archive/arrow/arrow_0.14.1.tar.gz', repos=NULL, type='source')"
- cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'e1071', 'survival', 'arrow'), repos='https://cloud.r-project.org/')"
# Here, we use the fixed version of testthat. For more details, please see SPARK-22817.
# As of devtools 2.1.0, it requires testthat higher than 2.1.1 as a dependency. SparkR tests require testthat 1.0.2.
# Therefore, we don't use devtools but install it directly from the archive, including its dependencies.
@@ -56,7 +53,7 @@
build_script:
# '-Djna.nosys=true' is required to avoid kernel32.dll load failure.
# See SPARK-28759.
- cmd: mvn -DskipTests -Psparkr -Phive -Djna.nosys=true package
- cmd: mvn -DskipTests -Psparkr -Phive -Phive-1.2 -Djna.nosys=true package

environment:
NOT_CRAN: true
@@ -90,7 +90,8 @@ CryptoOutputStream createOutputStream(WritableByteChannel ch) throws IOException
return new CryptoOutputStream(cipher, conf, ch, key, new IvParameterSpec(outIv));
}

private CryptoInputStream createInputStream(ReadableByteChannel ch) throws IOException {
@VisibleForTesting
CryptoInputStream createInputStream(ReadableByteChannel ch) throws IOException {
return new CryptoInputStream(cipher, conf, ch, key, new IvParameterSpec(inIv));
}

@@ -166,34 +167,45 @@ private static class DecryptionHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object data) throws Exception {
if (!isCipherValid) {
throw new IOException("Cipher is in invalid state.");
}
byteChannel.feedData((ByteBuf) data);

byte[] decryptedData = new byte[byteChannel.readableBytes()];
int offset = 0;
while (offset < decryptedData.length) {
// SPARK-25535: workaround for CRYPTO-141.
try {
offset += cis.read(decryptedData, offset, decryptedData.length - offset);
} catch (InternalError ie) {
isCipherValid = false;
throw ie;
ByteBuf buffer = (ByteBuf) data;

try {
if (!isCipherValid) {
throw new IOException("Cipher is in invalid state.");
}
byte[] decryptedData = new byte[buffer.readableBytes()];
byteChannel.feedData(buffer);

int offset = 0;
while (offset < decryptedData.length) {
// SPARK-25535: workaround for CRYPTO-141.
try {
offset += cis.read(decryptedData, offset, decryptedData.length - offset);
} catch (InternalError ie) {
isCipherValid = false;
throw ie;
}
}
}

ctx.fireChannelRead(Unpooled.wrappedBuffer(decryptedData, 0, decryptedData.length));
ctx.fireChannelRead(Unpooled.wrappedBuffer(decryptedData, 0, decryptedData.length));
} finally {
buffer.release();
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// We do the closing of the stream / channel in handlerRemoved(...) as
// this method will be called in all cases:
//
// - when the Channel becomes inactive
// - when the handler is removed from the ChannelPipeline
try {
if (isCipherValid) {
cis.close();
}
} finally {
super.channelInactive(ctx);
super.handlerRemoved(ctx);
}
}
}
(Diff truncated; the remaining changed files are not shown.)
