[SPARK-33036][SQL] Refactor RewriteCorrelatedScalarSubquery code to replace exprIds in a bottom-up manner

### What changes were proposed in this pull request?

This PR refactors the code in `RewriteCorrelatedScalarSubquery` so that `ExprId`s are replaced in a bottom-up manner instead of a top-down one.

This PR follows up on a discussion with cloud-fan in apache/spark#29585 (comment).
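
To illustrate the idea, here is a minimal, self-contained sketch (hypothetical names, not the actual Catalyst code): instead of rewriting the subquery and then re-walking the whole plan top-down to patch stale `ExprId` references, each rewrite step returns the substitutions it introduced, and the parent applies them while the tree is rebuilt from the leaves up.

```scala
// Hypothetical sketch of a bottom-up exprId rewrite (simplified stand-ins for
// Catalyst's LogicalPlan / Attribute / ExprId; not Spark's actual classes).
object BottomUpRewriteSketch {
  final case class Attr(name: String, id: Long)

  sealed trait Plan { def output: Seq[Attr] }
  final case class Scan(output: Seq[Attr]) extends Plan
  final case class Project(output: Seq[Attr], child: Plan) extends Plan

  // Rewrite bottom-up, threading the child's (old exprId -> new attribute)
  // substitutions into the parent instead of re-walking the tree afterwards.
  def rewrite(plan: Plan, freshId: () => Long): (Plan, Map[Long, Attr]) = plan match {
    case Scan(out) =>
      // Pretend the subquery rewrite forces fresh exprIds on this leaf.
      val replaced = out.map(a => a -> a.copy(id = freshId())).toMap
      (Scan(out.map(replaced)), replaced.map { case (o, n) => o.id -> n })
    case Project(out, child) =>
      val (newChild, subst) = rewrite(child, freshId)
      // Apply the substitutions coming from below while rebuilding this node.
      (Project(out.map(a => subst.getOrElse(a.id, a)), newChild), subst)
  }

  def main(args: Array[String]): Unit = {
    var next = 100L
    val plan = Project(Seq(Attr("a", 1)), Scan(Seq(Attr("a", 1))))
    val (rewritten, _) = rewrite(plan, () => { next += 1; next })
    println(rewritten) // Project(List(Attr(a,101)),Scan(List(Attr(a,101))))
  }
}
```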

### Why are the changes needed?

To improve the code structure; this is a pure refactoring with no behavior change.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29913 from maropu/RefactorRewriteCorrelatedScalarSubquery.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
a0x8o committed Oct 7, 2020
1 parent 30eae83 commit dd79b9a
Showing 27 changed files with 375 additions and 97 deletions.
@@ -98,7 +98,13 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
   val connectionTimeout = timeout(2.minutes)
 
   private var docker: DockerClient = _
-  protected var externalPort: Int = _
+  // Configure networking (necessary for boot2docker / Docker Machine)
+  protected lazy val externalPort: Int = {
+    val sock = new ServerSocket(0)
+    val port = sock.getLocalPort
+    sock.close()
+    port
+  }
   private var containerId: String = _
   protected var jdbcUrl: String = _
 
@@ -122,13 +128,6 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
         log.warn(s"Docker image ${db.imageName} not found; pulling image from registry")
         docker.pull(db.imageName)
     }
-    // Configure networking (necessary for boot2docker / Docker Machine)
-    externalPort = {
-      val sock = new ServerSocket(0)
-      val port = sock.getLocalPort
-      sock.close()
-      port
-    }
     val hostConfigBuilder = HostConfig.builder()
       .privileged(db.privileged)
       .networkMode("bridge")
@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.jdbc.v2

import java.sql.Connection

import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.tags.DockerTest

/**
 * The following would be the steps to test this
 * 1. Build Oracle database in Docker, please refer below link about how to.
 *    https://github.com/oracle/docker-images/blob/master/OracleDatabase/SingleInstance/README.md
 * 2. export ORACLE_DOCKER_IMAGE_NAME=$ORACLE_DOCKER_IMAGE_NAME
 *    Pull oracle $ORACLE_DOCKER_IMAGE_NAME image - docker pull $ORACLE_DOCKER_IMAGE_NAME
 * 3. Start docker - sudo service docker start
 * 4. Run spark test - ./build/sbt -Pdocker-integration-tests
 *    "test-only org.apache.spark.sql.jdbc.v2.OracleIntegrationSuite"
 *
 * An actual sequence of commands to run the test is as follows
 *
 *   $ git clone https://github.com/oracle/docker-images.git
 *   // Head SHA: 3e352a22618070595f823977a0fd1a3a8071a83c
 *   $ cd docker-images/OracleDatabase/SingleInstance/dockerfiles
 *   $ ./buildDockerImage.sh -v 18.4.0 -x
 *   $ export ORACLE_DOCKER_IMAGE_NAME=oracle/database:18.4.0-xe
 *   $ cd $SPARK_HOME
 *   $ ./build/sbt -Pdocker-integration-tests
 *     "test-only org.apache.spark.sql.jdbc.v2.OracleIntegrationSuite"
 *
 * It has been validated with 18.4.0 Express Edition.
 */
@DockerTest
class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSparkSession {
  override val db = new DatabaseOnDocker {
    override val imageName = sys.env("ORACLE_DOCKER_IMAGE_NAME")
    override val env = Map(
      "ORACLE_PWD" -> "oracle"
    )
    override val usesIpc = false
    override val jdbcPort: Int = 1521
    override def getJdbcUrl(ip: String, port: Int): String =
      s"jdbc:oracle:thin:system/oracle@//$ip:$port/xe"
  }

  override def sparkConf: SparkConf = super.sparkConf
    .set("spark.sql.catalog.oracle", classOf[JDBCTableCatalog].getName)
    .set("spark.sql.catalog.oracle.url", db.getJdbcUrl(dockerIp, externalPort))

  override val connectionTimeout = timeout(7.minutes)
  override def dataPreparation(conn: Connection): Unit = {}

  test("SPARK-33034: ALTER TABLE ... add new columns") {
    withTable("oracle.alt_table") {
      sql("CREATE TABLE oracle.alt_table (ID STRING) USING _")
      sql("ALTER TABLE oracle.alt_table ADD COLUMNS (C1 STRING, C2 STRING)")
      var t = spark.table("oracle.alt_table")
      var expectedSchema = new StructType()
        .add("ID", StringType)
        .add("C1", StringType)
        .add("C2", StringType)
      assert(t.schema === expectedSchema)
      sql("ALTER TABLE oracle.alt_table ADD COLUMNS (C3 STRING)")
      t = spark.table("oracle.alt_table")
      expectedSchema = expectedSchema.add("C3", StringType)
      assert(t.schema === expectedSchema)
      // Add already existing column
      val msg = intercept[AnalysisException] {
        sql(s"ALTER TABLE oracle.alt_table ADD COLUMNS (C3 DOUBLE)")
      }.getMessage
      assert(msg.contains("Cannot add column, because C3 already exists"))
    }
    // Add a column to not existing table
    val msg = intercept[AnalysisException] {
      sql(s"ALTER TABLE oracle.not_existing_table ADD COLUMNS (C4 STRING)")
    }.getMessage
    assert(msg.contains("Table not found"))
  }

  test("SPARK-33034: ALTER TABLE ... update column type") {
    withTable("oracle.alt_table") {
      sql("CREATE TABLE oracle.alt_table (ID INTEGER) USING _")
      sql("ALTER TABLE oracle.alt_table ALTER COLUMN id TYPE STRING")
      val t = spark.table("oracle.alt_table")
      val expectedSchema = new StructType().add("ID", StringType)
      assert(t.schema === expectedSchema)
      // Update column type from STRING to INTEGER
      val msg1 = intercept[AnalysisException] {
        sql("ALTER TABLE oracle.alt_table ALTER COLUMN id TYPE INTEGER")
      }.getMessage
      assert(msg1.contains("Cannot update alt_table field ID: string cannot be cast to int"))
      // Update not existing column
      val msg2 = intercept[AnalysisException] {
        sql("ALTER TABLE oracle.alt_table ALTER COLUMN bad_column TYPE DOUBLE")
      }.getMessage
      assert(msg2.contains("Cannot update missing field bad_column"))
      // Update column to wrong type
      val msg3 = intercept[ParseException] {
        sql("ALTER TABLE oracle.alt_table ALTER COLUMN id TYPE bad_type")
      }.getMessage
      assert(msg3.contains("DataType bad_type is not supported"))
    }
    // Update column type in not existing table
    val msg = intercept[AnalysisException] {
      sql(s"ALTER TABLE oracle.not_existing_table ALTER COLUMN id TYPE DOUBLE")
    }.getMessage
    assert(msg.contains("Table not found"))
  }

  test("SPARK-33034: ALTER TABLE ... update column nullability") {
    withTable("oracle.alt_table") {
      sql("CREATE TABLE oracle.alt_table (ID STRING NOT NULL) USING _")
      sql("ALTER TABLE oracle.alt_table ALTER COLUMN ID DROP NOT NULL")
      val t = spark.table("oracle.alt_table")
      val expectedSchema = new StructType().add("ID", StringType, nullable = true)
      assert(t.schema === expectedSchema)
      // Update nullability of not existing column
      val msg = intercept[AnalysisException] {
        sql("ALTER TABLE oracle.alt_table ALTER COLUMN bad_column DROP NOT NULL")
      }.getMessage
      assert(msg.contains("Cannot update missing field bad_column"))
    }
    // Update column nullability in not existing table
    val msg = intercept[AnalysisException] {
      sql(s"ALTER TABLE oracle.not_existing_table ALTER COLUMN ID DROP NOT NULL")
    }.getMessage
    assert(msg.contains("Table not found"))
  }
}
2 changes: 1 addition & 1 deletion python/docs/source/development/debugging.rst
@@ -35,7 +35,7 @@ with JVM. Profiling and debugging JVM is described at `Useful Developer Tools <h
 
 Note that,
 
-- If you are running locally, you can directly debug the driver side via using your IDE without the remote debug feature. Setting PySpark with IDEs is documented `here <setting.html#setting-up-pyspark>`__.
+- If you are running locally, you can directly debug the driver side via using your IDE without the remote debug feature. Setting PySpark with IDEs is documented `here <setting_ide.rst#pycharm>`__.
 - *There are many other ways of debugging PySpark applications*. For example, you can remotely debug by using the open source `Remote Debugger <https://www.pydev.org/manual_adv_remote_debugger.html>`_ instead of using PyCharm Professional documented here.
 
 
2 changes: 1 addition & 1 deletion python/docs/source/development/setting_ide.rst
@@ -50,7 +50,7 @@ Let's go to the path ``python/pyspark/tests`` in PyCharm and try to run the any
 You might can see the ``KeyError: 'SPARK_HOME'`` because the environment variable has not been set yet.
 
 Go **Run -> Edit Configurations**, and set the environment variables as below.
-Please make sure to specify your own path for ``SPARK_HOME`` rather than ``/.../spark``. After completing the variable, click **Okay** to apply the changes.
+Please make sure to specify your own path for ``SPARK_HOME`` rather than ``/.../spark``. After completing the variable, click **OK** to apply the changes.
 
 .. image:: ../../../../docs/img/pycharm-with-pyspark2.png
     :alt: Setting up SPARK_HOME
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.rst
@@ -64,6 +64,7 @@ Spark Context APIs
     SparkContext.defaultParallelism
     SparkContext.dump_profiles
     SparkContext.emptyRDD
+    SparkContext.getCheckpointDir
     SparkContext.getConf
     SparkContext.getLocalProperty
     SparkContext.getOrCreate
5 changes: 4 additions & 1 deletion python/mypy.ini
@@ -32,5 +32,8 @@ ignore_missing_imports = True
 [mypy-pandas.*]
 ignore_missing_imports = True
 
-[mypy-pyarrow]
+[mypy-pyarrow.*]
 ignore_missing_imports = True
+
+[mypy-psutil.*]
+ignore_missing_imports = True
4 changes: 3 additions & 1 deletion python/pyspark/accumulators.pyi
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Callable, Generic, Tuple, Type, TypeVar
+from typing import Callable, Dict, Generic, Tuple, Type, TypeVar
 
 import socketserver.BaseRequestHandler # type: ignore
 
@@ -27,6 +27,8 @@ U = TypeVar("U", bound=SupportsIAdd)
 
 import socketserver as SocketServer
 
+_accumulatorRegistry: Dict[int, Accumulator]
+
 class Accumulator(Generic[T]):
     aid: int
     accum_param: AccumulatorParam[T]
4 changes: 3 additions & 1 deletion python/pyspark/broadcast.pyi
@@ -17,10 +17,12 @@
 # under the License.
 
 import threading
-from typing import Any, Generic, Optional, TypeVar
+from typing import Any, Dict, Generic, Optional, TypeVar
 
 T = TypeVar("T")
 
+_broadcastRegistry: Dict[int, Broadcast]
+
 class Broadcast(Generic[T]):
     def __init__(
         self,
2 changes: 1 addition & 1 deletion python/pyspark/serializers.py
@@ -342,7 +342,7 @@ def dumps(self, obj):
 
 # Hack namedtuple, make it picklable
 
-__cls = {}
+__cls = {} # type: ignore
 
 
 def _restore(name, fields, value):
4 changes: 2 additions & 2 deletions python/pyspark/shell.py
@@ -32,10 +32,10 @@
 if os.environ.get("SPARK_EXECUTOR_URI"):
     SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
 
-SparkContext._ensure_initialized()
+SparkContext._ensure_initialized() # type: ignore
 
 try:
-    spark = SparkSession._create_shell_session()
+    spark = SparkSession._create_shell_session() # type: ignore
 except Exception:
     import sys
     import traceback
2 changes: 1 addition & 1 deletion python/pyspark/util.py
@@ -23,7 +23,7 @@
 
 from py4j.clientserver import ClientServer
 
-__all__ = []
+__all__ = [] # type: ignore
 
 
 def print_exec(stream):
@@ -142,10 +142,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       }
       RenameTable(catalog.asTableCatalog, oldName.asIdentifier, newNameParts.asIdentifier)
 
-    case DescribeColumnStatement(
-        NonSessionCatalogAndTable(catalog, tbl), colNameParts, isExtended) =>
-      throw new AnalysisException("Describing columns is not supported for v2 tables.")
-
     case c @ CreateTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
       assertNoNullTypeInSchema(c.tableSchema)