
Commit

Merge branch 'master' into cast
cloud-fan committed May 17, 2019
2 parents e558ad4 + e39e97b commit 14e3de3
Showing 903 changed files with 81,887 additions and 4,976 deletions.
4 changes: 2 additions & 2 deletions LICENSE
@@ -222,7 +222,7 @@ Python Software Foundation License
----------------------------------

pyspark/heapq3.py

python/docs/_static/copybutton.js

BSD 3-Clause
------------
@@ -258,4 +258,4 @@ data/mllib/images/kittens/29.5.a_b_EGDP022204.jpg
data/mllib/images/kittens/54893.jpg
data/mllib/images/kittens/DP153539.jpg
data/mllib/images/kittens/DP802813.jpg
data/mllib/images/multi-channel/chr30.4.184.jpg
data/mllib/images/multi-channel/chr30.4.184.jpg
2 changes: 0 additions & 2 deletions LICENSE-binary
@@ -302,7 +302,6 @@ com.google.code.gson:gson
com.google.inject:guice
com.google.inject.extensions:guice-servlet
com.twitter:parquet-hadoop-bundle
commons-beanutils:commons-beanutils-core
commons-cli:commons-cli
commons-dbcp:commons-dbcp
commons-io:commons-io
@@ -490,7 +489,6 @@ Eclipse Distribution License (EDL) 1.0
org.glassfish.jaxb:jaxb-runtime
jakarta.xml.bind:jakarta.xml.bind-api
com.sun.istack:istack-commons-runtime
jakarta.activation:jakarta.activation-api


Mozilla Public License (MPL) 1.1
12 changes: 10 additions & 2 deletions R/pkg/R/functions.R
@@ -3589,6 +3589,8 @@ setMethod("element_at",

#' @details
#' \code{explode}: Creates a new row for each element in the given array or map column.
#' Uses the default column name \code{col} for elements in the array and
#' \code{key} and \code{value} for elements in the map unless specified otherwise.
#'
#' @rdname column_collection_functions
#' @aliases explode explode,Column-method
@@ -3649,7 +3651,9 @@ setMethod("sort_array",

#' @details
#' \code{posexplode}: Creates a new row for each element with position in the given array
#' or map column.
#' or map column. Uses the default column name \code{pos} for position, and \code{col}
#' for elements in the array and \code{key} and \code{value} for elements in the map
#' unless specified otherwise.
#'
#' @rdname column_collection_functions
#' @aliases posexplode posexplode,Column-method
@@ -3790,7 +3794,8 @@ setMethod("repeat_string",
#' \code{explode_outer}: Creates a new row for each element in the given array or map column.
#' Unlike \code{explode}, if the array/map is \code{null} or empty
#' then \code{null} is produced.
#'
#' Uses the default column name \code{col} for elements in the array and
#' \code{key} and \code{value} for elements in the map unless specified otherwise.
#'
#' @rdname column_collection_functions
#' @aliases explode_outer explode_outer,Column-method
@@ -3815,6 +3820,9 @@ setMethod("explode_outer",
#' \code{posexplode_outer}: Creates a new row for each element with position in the given
#' array or map column. Unlike \code{posexplode}, if the array/map is \code{null} or empty
#' then the row (\code{null}, \code{null}) is produced.
#' Uses the default column name \code{pos} for position, and \code{col}
#' for elements in the array and \code{key} and \code{value} for elements in the map
#' unless specified otherwise.
#'
#' @rdname column_collection_functions
#' @aliases posexplode_outer posexplode_outer,Column-method
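
The default output column names documented above for SparkR mirror the Scala API; a minimal, hypothetical sketch (the DataFrame contents and local session are illustrative, not part of this change):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, posexplode}

val spark = SparkSession.builder().master("local[*]").appName("explode-defaults").getOrCreate()
import spark.implicits._

// A single row holding an array column.
val df = Seq((1, Seq("a", "b"))).toDF("id", "letters")

// Array elements come back in a column named "col" unless aliased.
df.select(explode($"letters")).printSchema()    // col: string

// posexplode adds the element position in a column named "pos".
df.select(posexplode($"letters")).printSchema() // pos: integer, col: string

spark.stop()
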
20 changes: 10 additions & 10 deletions README.md
@@ -1,18 +1,18 @@
# Apache Spark

[![Jenkins Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7)
[![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic&logo=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark)
[![PySpark Coverage](https://img.shields.io/badge/dynamic/xml.svg?label=pyspark%20coverage&url=https%3A%2F%2Fspark-test.github.io%2Fpyspark-coverage-site&query=%2Fhtml%2Fbody%2Fdiv%5B1%5D%2Fdiv%2Fh1%2Fspan&colorB=brightgreen&style=plastic)](https://spark-test.github.io/pyspark-coverage-site)

Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a unified analytics engine for large-scale data processing. It provides
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
MLlib for machine learning, GraphX for graph processing,
and Spark Streaming for stream processing.
and Structured Streaming for stream processing.

<http://spark.apache.org/>

[![Jenkins Build](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/badge/icon)](https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7)
[![AppVeyor Build](https://img.shields.io/appveyor/ci/ApacheSoftwareFoundation/spark/master.svg?style=plastic&logo=appveyor)](https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark)
[![PySpark Coverage](https://img.shields.io/badge/dynamic/xml.svg?label=pyspark%20coverage&url=https%3A%2F%2Fspark-test.github.io%2Fpyspark-coverage-site&query=%2Fhtml%2Fbody%2Fdiv%5B1%5D%2Fdiv%2Fh1%2Fspan&colorB=brightgreen&style=plastic)](https://spark-test.github.io/pyspark-coverage-site)


## Online Documentation

@@ -41,19 +41,19 @@ The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Try the following command, which should return 1000:
Try the following command, which should return 1,000,000,000:

scala> sc.parallelize(1 to 1000).count()
scala> spark.range(1000 * 1000 * 1000).count()

## Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

./bin/pyspark

And run the following command, which should also return 1000:
And run the following command, which should also return 1,000,000,000:

>>> sc.parallelize(range(1000)).count()
>>> spark.range(1000 * 1000 * 1000).count()

## Example Programs

2 changes: 1 addition & 1 deletion bin/docker-image-tool.sh
@@ -282,7 +282,7 @@ do
if ! minikube status 1>/dev/null; then
error "Cannot contact minikube. Make sure it's running."
fi
eval $(minikube docker-env)
eval $(minikube docker-env --shell bash)
;;
u) SPARK_UID=${OPTARG};;
esac
@@ -46,9 +46,9 @@
*/
public class RetryingBlockFetcherSuite {

ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));
private final ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
private final ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
private final ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));

@Test
public void testNoFailures() throws IOException, InterruptedException {
@@ -291,7 +291,7 @@ private static void performInteractions(List<? extends Map<String, Object>> inte
}

assertNotNull(stub);
stub.when(fetchStarter).createAndStart(any(), anyObject());
stub.when(fetchStarter).createAndStart(any(), any());
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
}
46 changes: 45 additions & 1 deletion common/network-yarn/pom.xml
@@ -35,7 +35,7 @@
<!-- Make sure all Hadoop dependencies are provided to avoid repackaging. -->
<hadoop.deps.scope>provided</hadoop.deps.scope>
<shuffle.jar>${project.build.directory}/scala-${scala.binary.version}/spark-${project.version}-yarn-shuffle.jar</shuffle.jar>
<shade>org/spark_project/</shade>
<shade>org/sparkproject/</shade>
</properties>

<dependencies>
@@ -128,6 +128,50 @@
</execution>
</executions>
</plugin>
<!-- shade the native netty libs as well -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>regex-property</id>
<goals>
<goal>regex-property</goal>
</goals>
<configuration>
<name>spark.shade.native.packageName</name>
<value>${spark.shade.packageName}</value>
<regex>\.</regex>
<replacement>_</replacement>
<failIfNoMatch>true</failIfNoMatch>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>unpack</id>
<phase>package</phase>
<configuration>
<target>
<echo message="Shade netty native libraries to ${spark.shade.native.packageName}" />
<unzip src="${shuffle.jar}" dest="${project.build.directory}/exploded/" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_epoll_x86_64.so"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_epoll_x86_64.so" />
<move file="${project.build.directory}/exploded/META-INF/native/libnetty_transport_native_kqueue_x86_64.jnilib"
tofile="${project.build.directory}/exploded/META-INF/native/lib${spark.shade.native.packageName}_netty_transport_native_kqueue_x86_64.jnilib" />
<jar destfile="${shuffle.jar}" basedir="${project.build.directory}/exploded" />
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- probes to validate that those dependencies which must be shaded are -->
<plugin>
@@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;

import java.io.Serializable;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -66,6 +67,10 @@ private static long toLong(String s) {
}
}

/**
* Convert a string to CalendarInterval. Return null if the input string is not a valid interval.
* This method is case-sensitive and all characters in the input string should be in lower case.
*/
public static CalendarInterval fromString(String s) {
if (s == null) {
return null;
@@ -87,6 +92,26 @@ public static CalendarInterval fromString(String s) {
}
}

/**
* Convert a string to CalendarInterval. Unlike fromString, this method is case-insensitive and
* will throw IllegalArgumentException when the input string is not a valid interval.
*
 * @throws IllegalArgumentException if the string is not a valid interval.
*/
public static CalendarInterval fromCaseInsensitiveString(String s) {
if (s == null || s.trim().isEmpty()) {
throw new IllegalArgumentException("Interval cannot be null or blank.");
}
String sInLowerCase = s.trim().toLowerCase(Locale.ROOT);
String interval =
sInLowerCase.startsWith("interval ") ? sInLowerCase : "interval " + sInLowerCase;
CalendarInterval cal = fromString(interval);
if (cal == null) {
throw new IllegalArgumentException("Invalid interval: " + s);
}
return cal;
}

public static long toLongWithRange(String fieldName,
String s, long minValue, long maxValue) throws IllegalArgumentException {
long result = 0;
@@ -319,6 +344,8 @@ public String toString() {
appendUnit(sb, rest / MICROS_PER_MILLI, "millisecond");
rest %= MICROS_PER_MILLI;
appendUnit(sb, rest, "microsecond");
} else if (months == 0) {
sb.append(" 0 microseconds");
}

return sb.toString();
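
As a usage note, the contrast between the two parsers is visible from Scala as well; a small sketch, assuming the class is on the classpath:

import org.apache.spark.unsafe.types.CalendarInterval

// fromString is case-sensitive and expects the "interval" prefix, so this yields null.
assert(CalendarInterval.fromString("5 MINUTES") == null)

// fromCaseInsensitiveString lower-cases the input, prepends "interval " when missing,
// and throws IllegalArgumentException on invalid input instead of returning null.
val fiveMinutes = CalendarInterval.fromCaseInsensitiveString("5 MINUTES")
assert(fiveMinutes == new CalendarInterval(0, 5L * 60 * 1000 * 1000))
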
@@ -41,6 +41,9 @@ public void equalsTest() {
public void toStringTest() {
CalendarInterval i;

i = new CalendarInterval(0, 0);
assertEquals("interval 0 microseconds", i.toString());

i = new CalendarInterval(34, 0);
assertEquals("interval 2 years 10 months", i.toString());

@@ -101,6 +104,31 @@ public void fromStringTest() {
assertNull(fromString(input));
}

@Test
public void fromCaseInsensitiveStringTest() {
for (String input : new String[]{"5 MINUTES", "5 minutes", "5 Minutes"}) {
assertEquals(fromCaseInsensitiveString(input), new CalendarInterval(0, 5L * 60 * 1_000_000));
}

for (String input : new String[]{null, "", " "}) {
try {
fromCaseInsensitiveString(input);
fail("Expected to throw an exception for the invalid input");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("cannot be null or blank"));
}
}

for (String input : new String[]{"interval", "interval1 day", "foo", "foo 1 day"}) {
try {
fromCaseInsensitiveString(input);
fail("Expected to throw an exception for the invalid input");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Invalid interval"));
}
}
}

@Test
public void fromYearMonthStringTest() {
String input;
4 changes: 2 additions & 2 deletions conf/log4j.properties.template
@@ -28,8 +28,8 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}:
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -347,7 +347,7 @@
<dependency>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
<version>4.13</version>
<version>4.23</version>
<exclusions>
<exclusion>
<groupId>net.razorvine</groupId>
@@ -28,8 +28,8 @@ log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}:
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

@@ -20,7 +20,6 @@ package org.apache.spark
import java.util.{Properties, Timer, TimerTask}

import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.executor.TaskMetrics
@@ -122,7 +121,7 @@ class BarrierTaskContext private[spark] (
barrierEpoch),
// Set a fixed timeout for RPC here, so users shall get a SparkException thrown by
// BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
timeout = new RpcTimeout(365.days, "barrierTimeout"))
barrierEpoch += 1
logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
"global sync successfully, waited for " +
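
The replacement timeout relies on Scala's duration DSL rather than postfix operator syntax, which is what lets the scala.language.postfixOps import above be dropped; a minimal sketch of the equivalence:

import scala.concurrent.duration._

// Method-call syntax needs no extra language import and carries the unit explicitly.
val barrierTimeout: FiniteDuration = 365.days
assert(barrierTimeout.toSeconds == 31536000L) // 3600 * 24 * 365

// The old spelling, `31536000 seconds`, is postfix operator notation and compiles
// only with `import scala.language.postfixOps` in scope.
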