[SPARK-24571][SQL] Support Char literals #4

Closed
wants to merge 21 commits

Commits (changes shown from all 21 commits):
3bf7691  [SPARK-24531][TESTS] Replace 2.3.0 version with 2.3.1  (mgaido91, Jun 13, 2018)
97097f5  A test for converting Char to String  (MaxGekk, Jun 13, 2018)
87640c7  Support Char in StringConverter  (MaxGekk, Jun 13, 2018)
0fb4669  Evaluate Char literal as String literal  (MaxGekk, Jun 13, 2018)
99dfbfe  Added a test for filtering rows by using Char literal  (MaxGekk, Jun 14, 2018)
56fd592  Cover the case of java.lang.Character  (MaxGekk, Jun 14, 2018)
657f7be  Improving of the test  (MaxGekk, Jun 14, 2018)
534065e  [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInMemorySorterSuite  (jiangxb1987, Jun 14, 2018)
fdadc4b  [SPARK-24495][SQL] EnsureRequirement returns wrong plan when reorderi…  (mgaido91, Jun 14, 2018)
d3eed8f  [SPARK-24563][PYTHON] Catch TypeError when testing existence of HiveC…  (icexelloss, Jun 14, 2018)
b8f27ae  [SPARK-24543][SQL] Support any type as DDL string for from_json's schema  (MaxGekk, Jun 14, 2018)
18cb0c0  [SPARK-24319][SPARK SUBMIT] Fix spark-submit execution where no main …  (gaborgsomogyi, Jun 14, 2018)
270a9a3  [SPARK-24248][K8S] Use level triggering and state reconciliation in s…  (mccheah, Jun 14, 2018)
22daeba  [SPARK-24478][SQL] Move projection and filter push down to physical c…  (rdblue, Jun 15, 2018)
6567fc4  [PYTHON] Fix typo in serializer exception  (rberenguel, Jun 15, 2018)
495d8cf  [SPARK-24490][WEBUI] Use WebUI.addStaticHandler in web UIs  (jaceklaskowski, Jun 15, 2018)
b5ccf0d  [SPARK-24396][SS][PYSPARK] Add Structured Streaming ForeachWriter for…  (tdas, Jun 15, 2018)
90da7dc  [SPARK-24452][SQL][CORE] Avoid possible overflow in int add or multiple  (kiszk, Jun 15, 2018)
e4fee39  [SPARK-24525][SS] Provide an option to limit number of rows in a Memo…  (mukulmurthy, Jun 15, 2018)
0f09ab2  Merge remote-tracking branch 'origin/master' into char-to-string  (MaxGekk, Jun 16, 2018)
4210146  Adding ticket number to test's titles  (MaxGekk, Jun 16, 2018)
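
Per the commit list, the core of this PR teaches Spark's literal handling (Literal and CatalystTypeConverters.StringConverter) to evaluate a Scala Char, as well as a java.lang.Character, as a one-character string literal. A minimal sketch of the behavior the new tests appear to cover; the DataFrame and column names are illustrative, not taken from the PR:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("char-literal-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq("a", "b", "c").toDF("ch")

// Before this change, lit('a') failed with an "Unsupported literal type"
// style error because neither Literal nor StringConverter had a case for
// Char; with it, the Char evaluates as the string literal "a" and the
// filter keeps the single matching row.
df.filter($"ch" === lit('a')).show()

The "Cover the case of java.lang.Character" commit extends the same conversion to boxed Character values arriving from Java callers.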
1 change: 1 addition & 0 deletions LICENSE

@@ -237,6 +237,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.

      (BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
      (BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.2.7 - https://github.com/jpmml/jpmml-model)
+     (BSD 3 Clause) jmock (org.jmock:jmock-junit4:2.8.4 - http://jmock.org/)
      (BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
      (BSD License) ANTLR 4.5.2-1 (org.antlr:antlr4:4.5.2-1 - http://wwww.antlr.org/)
      (BSD licence) ANTLR ST4 4.0.4 (org.antlr:ST4:4.0.4 - http://www.stringtemplate.org)
@@ -703,7 +703,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff

       // must be stored in the same memory page.
       // (8 byte key length) (key) (value) (8 byte pointer to next value)
       int uaoSize = UnsafeAlignedOffset.getUaoSize();
-      final long recordLength = (2 * uaoSize) + klen + vlen + 8;
+      final long recordLength = (2L * uaoSize) + klen + vlen + 8;
       if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
         if (!acquireNewPage(recordLength + uaoSize)) {
           return false;
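
The 2L widening above is the essence of the SPARK-24452 commit in the list: when every operand is an Int, the arithmetic is evaluated in 32 bits and wraps before the result is stored into the long, so recordLength could go negative for large keys and values. The same pattern repeats in the DriverRunner, AsyncRDDActions, and BlockManager hunks below (* 1000L, * 4L). A minimal Scala sketch of the failure mode, with illustrative values:

object IntOverflowSketch {
  def main(args: Array[String]): Unit = {
    val klen: Int = 1 << 30         // ~1 GiB key length
    val vlen: Int = (1 << 30) + 512 // ~1 GiB value length

    // Both operands are Int, so the sum wraps to a negative number
    // before it is widened to Long.
    val broken: Long = klen + vlen
    // Making one operand Long promotes the whole expression to
    // 64-bit arithmetic, which is exactly what 2L does above.
    val fixed: Long = klen.toLong + vlen

    println(s"broken = $broken") // -2147483136
    println(s"fixed  = $fixed")  // 2147484160
  }
}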
@@ -124,7 +124,7 @@ class HistoryServer(

   attachHandler(ApiRootResource.getServletHandler(this))

-  attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+  addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)

   val contextHandler = new ServletContextHandler
   contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
@@ -43,7 +43,7 @@ class MasterWebUI(

   val masterPage = new MasterPage(this)
   attachPage(new ApplicationPage(this))
   attachPage(masterPage)
-  attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
+  addStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)
   attachHandler(createRedirectHandler(
     "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
   attachHandler(createRedirectHandler(
@@ -225,7 +225,7 @@ private[deploy] class DriverRunner(

       // check if attempting another run
       keepTrying = supervise && exitCode != 0 && !killed
       if (keepTrying) {
-        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
+        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
           waitSeconds = 1
         }
         logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
@@ -47,7 +47,7 @@ class WorkerWebUI(

   val logPage = new LogPage(this)
   attachPage(logPage)
   attachPage(new WorkerPage(this))
-  attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
+  addStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE)
   attachHandler(createServletHandler("/log",
     (request: HttpServletRequest) => logPage.renderLog(request),
     worker.securityMgr,
@@ -95,7 +95,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi

           // the left side of max is >=1 whenever partsScanned >= 2
           numPartsToTry = Math.max(1,
             (1.5 * num * partsScanned / results.size).toInt - partsScanned)
-          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
+          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4L)
         }
       }
@@ -291,7 +291,7 @@ private[spark] class BlockManager(

         case e: Exception if i < MAX_ATTEMPTS =>
           logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
             + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
-          Thread.sleep(SLEEP_TIME_SECS * 1000)
+          Thread.sleep(SLEEP_TIME_SECS * 1000L)
         case NonFatal(e) =>
           throw new SparkException("Unable to register with external shuffle server due to : " +
             e.getMessage, e)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -65,7 +65,7 @@ private[spark] class SparkUI private (

     attachTab(new StorageTab(this, store))
     attachTab(new EnvironmentTab(this, store))
     attachTab(new ExecutorsTab(this))
-    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
+    addStaticHandler(SparkUI.STATIC_RESOURCE_DIR)
     attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
     attachHandler(ApiRootResource.getServletHandler(this))
52 changes: 27 additions & 25 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -60,23 +60,25 @@ private[spark] abstract class WebUI(

   def getHandlers: Seq[ServletContextHandler] = handlers
   def getSecurityManager: SecurityManager = securityManager

-  /** Attach a tab to this UI, along with all of its attached pages. */
-  def attachTab(tab: WebUITab) {
+  /** Attaches a tab to this UI, along with all of its attached pages. */
+  def attachTab(tab: WebUITab): Unit = {
     tab.pages.foreach(attachPage)
     tabs += tab
   }

-  def detachTab(tab: WebUITab) {
+  /** Detaches a tab from this UI, along with all of its attached pages. */
+  def detachTab(tab: WebUITab): Unit = {
     tab.pages.foreach(detachPage)
     tabs -= tab
   }

-  def detachPage(page: WebUIPage) {
+  /** Detaches a page from this UI, along with all of its attached handlers. */
+  def detachPage(page: WebUIPage): Unit = {
     pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
   }

-  /** Attach a page to this UI. */
-  def attachPage(page: WebUIPage) {
+  /** Attaches a page to this UI. */
+  def attachPage(page: WebUIPage): Unit = {
     val pagePath = "/" + page.prefix
     val renderHandler = createServletHandler(pagePath,
       (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)

@@ -88,41 +90,41 @@

     handlers += renderHandler
   }

-  /** Attach a handler to this UI. */
-  def attachHandler(handler: ServletContextHandler) {
+  /** Attaches a handler to this UI. */
+  def attachHandler(handler: ServletContextHandler): Unit = {
     handlers += handler
     serverInfo.foreach(_.addHandler(handler))
   }

-  /** Detach a handler from this UI. */
-  def detachHandler(handler: ServletContextHandler) {
+  /** Detaches a handler from this UI. */
+  def detachHandler(handler: ServletContextHandler): Unit = {
     handlers -= handler
     serverInfo.foreach(_.removeHandler(handler))
   }

   /**
-   * Add a handler for static content.
+   * Detaches the content handler at `path` URI.
    *
-   * @param resourceBase Root of where to find resources to serve.
-   * @param path Path in UI where to mount the resources.
+   * @param path Path in UI to unmount.
    */
-  def addStaticHandler(resourceBase: String, path: String): Unit = {
-    attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
+  def detachHandler(path: String): Unit = {
+    handlers.find(_.getContextPath() == path).foreach(detachHandler)
   }

   /**
-   * Remove a static content handler.
+   * Adds a handler for static content.
    *
-   * @param path Path in UI to unmount.
+   * @param resourceBase Root of where to find resources to serve.
+   * @param path Path in UI where to mount the resources.
    */
-  def removeStaticHandler(path: String): Unit = {
-    handlers.find(_.getContextPath() == path).foreach(detachHandler)
+  def addStaticHandler(resourceBase: String, path: String = "/static"): Unit = {
+    attachHandler(JettyUtils.createStaticHandler(resourceBase, path))
   }

-  /** Initialize all components of the server. */
+  /** A hook to initialize components of the UI */
   def initialize(): Unit

-  /** Bind to the HTTP server behind this web interface. */
+  /** Binds to the HTTP server behind this web interface. */
   def bind(): Unit = {
     assert(serverInfo.isEmpty, s"Attempted to bind $className more than once!")
     try {

@@ -136,17 +138,17 @@

     }
   }

-  /** Return the url of web interface. Only valid after bind(). */
+  /** @return The url of web interface. Only valid after [[bind]]. */
   def webUrl: String = s"http://$publicHostName:$boundPort"

-  /** Return the actual port to which this server is bound. Only valid after bind(). */
+  /** @return The actual port to which this server is bound. Only valid after [[bind]]. */
   def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)

-  /** Stop the server behind this web interface. Only valid after bind(). */
+  /** Stops the server behind this web interface. Only valid after [[bind]]. */
   def stop(): Unit = {
     assert(serverInfo.isDefined,
       s"Attempted to stop $className before binding to a server!")
-    serverInfo.get.stop()
+    serverInfo.foreach(_.stop())
   }
 }
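
Taken together, the WebUI changes (SPARK-24490) give addStaticHandler a "/static" default and fold removeStaticHandler into a detachHandler(path) overload, which is what lets the HistoryServer, MasterWebUI, WorkerWebUI, and SparkUI call sites above shrink to one argument. A sketch of how a UI subclass would use the new API; MyServiceUI and the resource path are illustrative names, and the constructor arguments assume the WebUI signature implied by the class shown above:

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.ui.WebUI

private[spark] class MyServiceUI(
    securityManager: SecurityManager,
    conf: SparkConf,
    port: Int)
  extends WebUI(securityManager, securityManager.getSSLOptions("ui"), port, conf) {

  def initialize(): Unit = {
    // Old idiom: attachHandler(createStaticHandler(resourceDir, "/static"))
    // New idiom: the default path argument covers the common case.
    addStaticHandler("org/apache/spark/myservice/ui/static")
  }

  initialize()
}

// Unmounting later goes through the new overload:
//   ui.detachHandler("/static")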
31 changes: 28 additions & 3 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -19,13 +19,12 @@ package org.apache.spark.util

 import java.util.concurrent._

-import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
 import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
 import scala.util.control.NonFatal

+import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+
 import org.apache.spark.SparkException

 private[spark] object ThreadUtils {

@@ -103,6 +102,22 @@

     executor
   }

+  /**
+   * Wrapper over ScheduledThreadPoolExecutor.
+   */
+  def newDaemonThreadPoolScheduledExecutor(threadNamePrefix: String, numThreads: Int)
+    : ScheduledExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder()
+      .setDaemon(true)
+      .setNameFormat(s"$threadNamePrefix-%d")
+      .build()
+    val executor = new ScheduledThreadPoolExecutor(numThreads, threadFactory)
+    // By default, a cancelled task is not automatically removed from the work queue until its delay
+    // elapses. We have to enable it manually.
+    executor.setRemoveOnCancelPolicy(true)
+    executor
+  }
+
   /**
    * Run a piece of code in a new thread and return the result. Exception in the new thread is
    * thrown in the caller thread with an adjusted stack trace that removes references to this

@@ -229,4 +244,14 @@

     }
   }
   // scalastyle:on awaitready
+
+  def shutdown(
+      executor: ExecutorService,
+      gracePeriod: Duration = FiniteDuration(30, TimeUnit.SECONDS)): Unit = {
+    executor.shutdown()
+    executor.awaitTermination(gracePeriod.toMillis, TimeUnit.MILLISECONDS)
+    if (!executor.isShutdown) {
+      executor.shutdownNow()
+    }
+  }
 }
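
A short sketch of how the two helpers added above compose: schedule a periodic task on a daemon pool, cancel it, and shut the pool down within the default 30-second grace period. The thread-name prefix and task body are illustrative:

import java.util.concurrent.TimeUnit
import org.apache.spark.util.ThreadUtils

val scheduler = ThreadUtils.newDaemonThreadPoolScheduledExecutor("sketch-scheduler", 1)

val task = scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = println("heartbeat")
}, 0L, 10L, TimeUnit.SECONDS)

// Because the factory sets setRemoveOnCancelPolicy(true), the cancelled
// task leaves the work queue immediately instead of lingering until its
// next scheduled delay.
task.cancel(false)

// shutdown() waits up to the grace period for running tasks, then falls
// back to shutdownNow().
ThreadUtils.shutdown(scheduler)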
@@ -129,7 +129,6 @@ public int compare(

     final UnsafeSorterIterator iter = sorter.getSortedIterator();
     int iterLength = 0;
     long prevPrefix = -1;
-    Arrays.sort(dataToSort);
     while (iter.hasNext()) {
       iter.loadNext();
       final String str =
36 changes: 25 additions & 11 deletions launcher/src/main/java/org/apache/spark/launcher/Main.java
@@ -17,6 +17,7 @@

 package org.apache.spark.launcher;

+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;

@@ -54,10 +55,12 @@ public static void main(String[] argsArray) throws Exception {

     String className = args.remove(0);

     boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
-    AbstractCommandBuilder builder;
+    Map<String, String> env = new HashMap<>();
+    List<String> cmd;
     if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
       try {
-        builder = new SparkSubmitCommandBuilder(args);
+        AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
+        cmd = buildCommand(builder, env, printLaunchCommand);
       } catch (IllegalArgumentException e) {
         printLaunchCommand = false;
         System.err.println("Error: " + e.getMessage());

@@ -76,17 +79,12 @@

           help.add(parser.className);
         }
         help.add(parser.USAGE_ERROR);
-        builder = new SparkSubmitCommandBuilder(help);
+        AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(help);
+        cmd = buildCommand(builder, env, printLaunchCommand);
       }
     } else {
-      builder = new SparkClassCommandBuilder(className, args);
-    }
-
-    Map<String, String> env = new HashMap<>();
-    List<String> cmd = builder.buildCommand(env);
-    if (printLaunchCommand) {
-      System.err.println("Spark Command: " + join(" ", cmd));
-      System.err.println("========================================");
+      AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
+      cmd = buildCommand(builder, env, printLaunchCommand);
     }

     if (isWindows()) {

@@ -101,6 +99,22 @@

     }
   }

+  /**
+   * Prepare spark commands with the appropriate command builder.
+   * If printLaunchCommand is set then the commands will be printed to stderr.
+   */
+  private static List<String> buildCommand(
+      AbstractCommandBuilder builder,
+      Map<String, String> env,
+      boolean printLaunchCommand) throws IOException, IllegalArgumentException {
+    List<String> cmd = builder.buildCommand(env);
+    if (printLaunchCommand) {
+      System.err.println("Spark Command: " + join(" ", cmd));
+      System.err.println("========================================");
+    }
+    return cmd;
+  }
+
   /**
    * Prepare a command line for execution from a Windows batch script.
    *
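
The refactor above removes the duplicated "build the command, then print it when SPARK_PRINT_LAUNCH_COMMAND is set" logic from all three branches of main(). A Scala paraphrase of the extracted-helper pattern; the trait and object names are illustrative, not the launcher's real API:

import scala.collection.mutable

trait CommandBuilderSketch {
  def buildCommand(env: mutable.Map[String, String]): Seq[String]
}

object LauncherSketch {
  // Each branch builds a different command, but the print-on-debug step
  // is identical, so it lives in one place.
  def buildCommand(
      builder: CommandBuilderSketch,
      env: mutable.Map[String, String],
      printLaunchCommand: Boolean): Seq[String] = {
    val cmd = builder.buildCommand(env)
    if (printLaunchCommand) {
      // Printed to stderr so the launch command emitted on stdout stays
      // machine-readable.
      System.err.println("Spark Command: " + cmd.mkString(" "))
      System.err.println("=" * 40)
    }
    cmd
  }
}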