Merge remote-tracking branch 'origin/master' into json-encoding-line-sep
MaxGekk committed Apr 27, 2018
2 parents a7be182 + 3fd297a commit e0cebf4
Showing 216 changed files with 8,348 additions and 1,730 deletions.
4 changes: 4 additions & 0 deletions R/pkg/NAMESPACE
@@ -201,6 +201,9 @@ exportMethods("%<=>%",
"approxCountDistinct",
"approxQuantile",
"array_contains",
"array_max",
"array_min",
"array_position",
"asc",
"ascii",
"asin",
@@ -245,6 +248,7 @@ exportMethods("%<=>%",
"decode",
"dense_rank",
"desc",
"element_at",
"encode",
"endsWith",
"exp",
69 changes: 67 additions & 2 deletions R/pkg/R/functions.R
@@ -189,6 +189,11 @@ NULL
#' the map or array of maps.
#' \item \code{from_json}: it is the column containing the JSON string.
#' }
#' @param value A value to compute on.
#' \itemize{
#' \item \code{array_contains}: a value to be checked if contained in the column.
#' \item \code{array_position}: a value to locate in the given array.
#' }
#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
#' additional named properties to control how it is converted, accepts the same
#' options as the JSON data source.
@@ -201,14 +206,17 @@ NULL
#' df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
#' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
#' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1)))
#' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1)))
#' head(select(tmp, array_position(tmp$v1, 21)))
#' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
#' head(tmp2)
#' head(select(tmp, posexplode(tmp$v1)))
#' head(select(tmp, sort_array(tmp$v1)))
#' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))
#' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl))
#' head(select(tmp3, map_keys(tmp3$v3)))
#' head(select(tmp3, map_values(tmp3$v3)))}
#' head(select(tmp3, map_values(tmp3$v3)))
#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))}
NULL

#' Window functions for Column operations
@@ -2975,7 +2983,6 @@ setMethod("row_number",
#' \code{array_contains}: Returns null if the array is null, true if the array contains
#' the value, and false otherwise.
#'
#' @param value a value to be checked if contained in the column
#' @rdname column_collection_functions
#' @aliases array_contains array_contains,Column-method
#' @note array_contains since 1.6.0
@@ -2986,6 +2993,48 @@ setMethod("array_contains",
column(jc)
})

#' @details
#' \code{array_max}: Returns the maximum value of the array.
#'
#' @rdname column_collection_functions
#' @aliases array_max array_max,Column-method
#' @note array_max since 2.4.0
setMethod("array_max",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "array_max", x@jc)
column(jc)
})

#' @details
#' \code{array_min}: Returns the minimum value of the array.
#'
#' @rdname column_collection_functions
#' @aliases array_min array_min,Column-method
#' @note array_min since 2.4.0
setMethod("array_min",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "array_min", x@jc)
column(jc)
})

#' @details
#' \code{array_position}: Locates the position of the first occurrence of the given value
#' in the given array. Returns NA if either of the arguments is NA.
#' Note: the position is 1-based, not zero-based. Returns 0 if the given
#' value could not be found in the array.
#'
#' @rdname column_collection_functions
#' @aliases array_position array_position,Column-method
#' @note array_position since 2.4.0
setMethod("array_position",
signature(x = "Column", value = "ANY"),
function(x, value) {
jc <- callJStatic("org.apache.spark.sql.functions", "array_position", x@jc, value)
column(jc)
})
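These wrappers delegate to the corresponding functions in org.apache.spark.sql.functions via callJStatic. A minimal Scala sketch of the same calls on the JVM side, assuming a local Spark 2.4+ build; the session, app name and column names here are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array_max, array_min, array_position}

val spark = SparkSession.builder().master("local[*]").appName("array-functions-sketch").getOrCreate()
import spark.implicits._

val df = Seq(Seq(1, 2, 3), Seq(6, 5, 4)).toDF("v")

// array_max / array_min reduce each array to its largest / smallest element.
df.select(array_max($"v"), array_min($"v")).show()   // (3, 1) and (6, 4)

// array_position uses a 1-based index and returns 0 when the value is not present.
df.select(array_position($"v", 1)).show()            // 1 for the first row, 0 for the second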

#' @details
#' \code{map_keys}: Returns an unordered array containing the keys of the map.
#'
@@ -3012,6 +3061,22 @@ setMethod("map_values",
column(jc)
})

#' @details
#' \code{element_at}: Returns the element of the array at the index given in \code{extraction}
#' if \code{x} is an array, or the value for the key given in \code{extraction} if \code{x} is a map.
#' Note: the index is 1-based, not zero-based.
#'
#' @param extraction index to check for in array or key to check for in map
#' @rdname column_collection_functions
#' @aliases element_at element_at,Column-method
#' @note element_at since 2.4.0
setMethod("element_at",
signature(x = "Column", extraction = "ANY"),
function(x, extraction) {
jc <- callJStatic("org.apache.spark.sql.functions", "element_at", x@jc, extraction)
column(jc)
})
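As with the wrappers above, this method maps onto org.apache.spark.sql.functions.element_at. A hedged Scala sketch of both the array and the map form, assuming Spark 2.4+; the data and names are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{element_at, typedLit}

val spark = SparkSession.builder().master("local[*]").appName("element-at-sketch").getOrCreate()
import spark.implicits._

val df = Seq(Seq(1, 2, 3), Seq(6, 5, 4)).toDF("arr")
  .withColumn("m", typedLit(Map("x" -> 1, "y" -> 2)))

// On an array column the second argument is a 1-based index.
df.select(element_at($"arr", 1)).show()   // 1 and 6

// On a map column the second argument is a key; missing keys yield null.
df.select(element_at($"m", "y")).show()   // 2 for both rows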

#' @details
#' \code{explode}: Creates a new row for each element in the given array or map column.
#'
16 changes: 16 additions & 0 deletions R/pkg/R/generics.R
@@ -757,6 +757,18 @@ setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCoun
#' @name NULL
setGeneric("array_contains", function(x, value) { standardGeneric("array_contains") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_max", function(x) { standardGeneric("array_max") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_min", function(x) { standardGeneric("array_min") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("array_position", function(x, value) { standardGeneric("array_position") })

#' @rdname column_string_functions
#' @name NULL
setGeneric("ascii", function(x) { standardGeneric("ascii") })
@@ -886,6 +898,10 @@ setGeneric("decode", function(x, charset) { standardGeneric("decode") })
#' @name NULL
setGeneric("dense_rank", function(x = "missing") { standardGeneric("dense_rank") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("element_at", function(x, extraction) { standardGeneric("element_at") })

#' @rdname column_string_functions
#' @name NULL
setGeneric("encode", function(x, charset) { standardGeneric("encode") })
20 changes: 18 additions & 2 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1479,24 +1479,40 @@ test_that("column functions", {
df5 <- createDataFrame(list(list(a = "010101")))
expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15")

# Test array_contains() and sort_array()
# Test array_contains(), array_max(), array_min(), array_position(), element_at()
# and sort_array()
df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L))))
result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]]
expect_equal(result, c(TRUE, FALSE))

result <- collect(select(df, array_max(df[[1]])))[[1]]
expect_equal(result, c(3, 6))

result <- collect(select(df, array_min(df[[1]])))[[1]]
expect_equal(result, c(1, 4))

result <- collect(select(df, array_position(df[[1]], 1L)))[[1]]
expect_equal(result, c(1, 0))

result <- collect(select(df, element_at(df[[1]], 1L)))[[1]]
expect_equal(result, c(1, 6))

result <- collect(select(df, sort_array(df[[1]], FALSE)))[[1]]
expect_equal(result, list(list(3L, 2L, 1L), list(6L, 5L, 4L)))
result <- collect(select(df, sort_array(df[[1]])))[[1]]
expect_equal(result, list(list(1L, 2L, 3L), list(4L, 5L, 6L)))

# Test map_keys() and map_values()
# Test map_keys(), map_values() and element_at()
df <- createDataFrame(list(list(map = as.environment(list(x = 1, y = 2)))))
result <- collect(select(df, map_keys(df$map)))[[1]]
expect_equal(result, list(list("x", "y")))

result <- collect(select(df, map_values(df$map)))[[1]]
expect_equal(result, list(list(1, 2)))

result <- collect(select(df, element_at(df$map, "y")))[[1]]
expect_equal(result, 2)

# Test that stats::lag is working
expect_equal(length(lag(ldeaths, 12)), 72)

8 changes: 8 additions & 0 deletions assembly/pom.xml
@@ -254,6 +254,14 @@
<artifactId>spark-hadoop-cloud_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<!--
Redeclare this dependency to force it into the distribution.
-->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<scope>${hadoop.deps.scope}</scope>
</dependency>
</dependencies>
</profile>
</profiles>
@@ -32,6 +32,7 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -98,6 +99,7 @@ private void init(String hostToBind, int portToBind) {
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
.childOption(ChannelOption.ALLOCATOR, allocator);

this.metrics = new NettyMemoryMetrics(
@@ -33,7 +33,11 @@ public static long nextPowerOf2(long num) {
}

public static int roundNumberOfBytesToNearestWord(int numBytes) {
int remainder = numBytes & 0x07; // This is equivalent to `numBytes % 8`
return (int)roundNumberOfBytesToNearestWord((long)numBytes);
}

public static long roundNumberOfBytesToNearestWord(long numBytes) {
long remainder = numBytes & 0x07; // This is equivalent to `numBytes % 8`
if (remainder == 0) {
return numBytes;
} else {
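The new long overload keeps the existing masking trick: numBytes & 0x07 is the remainder modulo 8, and a non-zero remainder is padded up to the next 8-byte word. A standalone Scala sketch of that arithmetic, with made-up names, purely to illustrate the rounding (not Spark's actual class):

// Standalone sketch of the word-rounding arithmetic; only the logic mirrors the diff.
def roundToNearestWord(numBytes: Long): Long = {
  val remainder = numBytes & 0x07L   // equivalent to numBytes % 8 for non-negative input
  if (remainder == 0) numBytes
  else numBytes + (8 - remainder)    // pad up to the next 8-byte boundary
}

// 13 bytes has remainder 5, so it is padded to 16; 16 is already word-aligned.
assert(roundToNearestWord(13L) == 16L)
assert(roundToNearestWord(16L) == 16L)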
@@ -120,6 +120,8 @@ private void check(MemoryBlock memory, Object obj, long offset, int length) {
} catch (Exception expected) {
Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
}

memory.setPageNumber(MemoryBlock.NO_PAGE_NUMBER);
}

@Test
@@ -165,11 +167,13 @@ public void testOffHeapArrayMemoryBlock() {
int length = 56;

check(memory, obj, offset, length);
memoryAllocator.free(memory);

long address = Platform.allocateMemory(112);
memory = new OffHeapMemoryBlock(address, length);
obj = memory.getBaseObject();
offset = memory.getBaseOffset();
check(memory, obj, offset, length);
Platform.freeMemory(address);
}
}
6 changes: 6 additions & 0 deletions core/pom.xml
@@ -95,6 +95,12 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<!-- With curator 2.12 SBT/Ivy doesn't get ZK on the build classpath.
Explicitly declaring it as a dependency fixes this. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>

<!-- Jetty dependencies promoted to compile here so they are shaded
and inlined into spark-core jar -->
@@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
import org.apache.spark.internal.config._
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster
@@ -69,6 +69,10 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
* spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
* spark.dynamicAllocation.initialExecutors - Number of executors to start with
*
* spark.dynamicAllocation.executorAllocationRatio -
* This is used to reduce the parallelism of dynamic allocation, which can otherwise
* waste resources when tasks are small
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) -
* If there are backlogged tasks for this duration, add new executors
*
@@ -116,9 +120,12 @@ private[spark] class ExecutorAllocationManager(
// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
private val tasksPerExecutor =
private val tasksPerExecutorForFullParallelism =
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

private val executorAllocationRatio =
conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)

validateSettings()

// Number of executors to add in the next round
@@ -209,8 +216,13 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutor == 0) {
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException("spark.executor.cores must not be < spark.task.cpus.")
}

if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
throw new SparkException(
"spark.dynamicAllocation.executorAllocationRatio must be > 0 and <= 1.0")
}
}

@@ -273,7 +285,9 @@
*/
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
(numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
tasksPerExecutorForFullParallelism)
.toInt
}

private def totalRunningTasks(): Int = synchronized {
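The new sizing rule multiplies the task backlog by executorAllocationRatio before dividing by the number of tasks each executor can run at full parallelism, then takes the ceiling. A small standalone sketch of the arithmetic (illustrative only, not Spark code):

// target = ceil(tasks * ratio / tasksPerExecutorForFullParallelism)
def maxExecutorsNeeded(tasks: Int, ratio: Double, tasksPerExecutor: Int): Int =
  math.ceil(tasks * ratio / tasksPerExecutor).toInt

// With the default ratio of 1.0 this matches the old ceiling division:
assert(maxExecutorsNeeded(100, 1.0, 4) == 25)
// With ratio = 0.5, half as many executors are requested for the same backlog:
assert(maxExecutorsNeeded(100, 0.5, 4) == 13)   // ceil(12.5)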
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -89,6 +89,7 @@ private[spark] class SecurityManager(
setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));

private var secretKey: String = _
logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
"; ui acls " + (if (aclsOn) "enabled" else "disabled") +
"; users with view permissions: " + viewAcls.toString() +
@@ -321,6 +322,12 @@ private[spark] class SecurityManager(
val creds = UserGroupInformation.getCurrentUser().getCredentials()
Option(creds.getSecretKey(SECRET_LOOKUP_KEY))
.map { bytes => new String(bytes, UTF_8) }
// The secret key may not be found in the current UGI's credentials.
// This happens when the UGI is refreshed on the driver side by UGI's loginFromKeytab,
// which does not copy the secret key from the original UGI to the new one (this occurs
// in the Thrift server's Hive logic). As a workaround, the secret key is also stored in
// a local variable so that it stays visible across contexts.
.orElse(Option(secretKey))
.orElse(Option(sparkConf.getenv(ENV_AUTH_SECRET)))
.orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
.getOrElse {
@@ -364,8 +371,8 @@ private[spark] class SecurityManager(
rnd.nextBytes(secretBytes)

val creds = new Credentials()
val secretStr = HashCodes.fromBytes(secretBytes).toString()
creds.addSecretKey(SECRET_LOOKUP_KEY, secretStr.getBytes(UTF_8))
secretKey = HashCodes.fromBytes(secretBytes).toString()
creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
UserGroupInformation.getCurrentUser().addCredentials(creds)
}

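The added .orElse(Option(secretKey)) gives the secret lookup a cached fallback between the UGI credentials and the environment/config options. A hedged standalone sketch of that resolution order; all names are illustrative, and the comments point at the corresponding calls in the diff:

def resolveAuthSecret(
    fromUgiCredentials: Option[String], // creds.getSecretKey(SECRET_LOOKUP_KEY)
    cachedSecretKey: Option[String],    // the new `secretKey` field
    fromEnv: Option[String],            // sparkConf.getenv(ENV_AUTH_SECRET)
    fromConf: Option[String]            // sparkConf.getOption(SPARK_AUTH_SECRET_CONF)
  ): String = {
  fromUgiCredentials
    .orElse(cachedSecretKey)            // survives a UGI refresh (e.g. loginFromKeytab)
    .orElse(fromEnv)
    .orElse(fromConf)
    .getOrElse(sys.error("no auth secret configured"))
}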
@@ -94,6 +94,7 @@ private[spark] abstract class RestSubmissionServer(
new HttpConnectionFactory())
connector.setHost(host)
connector.setPort(startPort)
connector.setReuseAddress(!Utils.isWindows)
server.addConnector(connector)

val mainHandler = new ServletContextHandler
Expand Down