diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index e6d29d04acbf3..54608d203133c 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -17,9 +17,9 @@ apacheds-kerberos-codec-2.0.0-M15.jar
api-asn1-api-1.0.0-M20.jar
api-util-1.0.0-M20.jar
arpack_combined_all-0.1.jar
-arrow-format-0.12.0.jar
-arrow-memory-0.12.0.jar
-arrow-vector-0.12.0.jar
+arrow-format-0.15.1.jar
+arrow-memory-0.15.1.jar
+arrow-vector-0.15.1.jar
audience-annotations-0.5.0.jar
automaton-1.11-8.jar
avro-1.8.2.jar
@@ -83,7 +83,6 @@ hadoop-yarn-server-web-proxy-2.7.4.jar
hk2-api-2.5.0.jar
hk2-locator-2.5.0.jar
hk2-utils-2.5.0.jar
-hppc-0.7.2.jar
htrace-core-3.1.0-incubating.jar
httpclient-4.5.6.jar
httpcore-4.4.10.jar
diff --git a/dev/deps/spark-deps-hadoop-3.2 b/dev/deps/spark-deps-hadoop-3.2
index 8f1e7fe125b9f..917fde61fad1a 100644
--- a/dev/deps/spark-deps-hadoop-3.2
+++ b/dev/deps/spark-deps-hadoop-3.2
@@ -12,9 +12,9 @@ antlr4-runtime-4.7.1.jar
aopalliance-1.0.jar
aopalliance-repackaged-2.5.0.jar
arpack_combined_all-0.1.jar
-arrow-format-0.12.0.jar
-arrow-memory-0.12.0.jar
-arrow-vector-0.12.0.jar
+arrow-format-0.15.1.jar
+arrow-memory-0.15.1.jar
+arrow-vector-0.15.1.jar
audience-annotations-0.5.0.jar
automaton-1.11-8.jar
avro-1.8.2.jar
@@ -96,7 +96,6 @@ hive-vector-code-gen-2.3.6.jar
hk2-api-2.5.0.jar
hk2-locator-2.5.0.jar
hk2-utils-2.5.0.jar
-hppc-0.7.2.jar
htrace-core4-4.1.0-incubating.jar
httpclient-4.5.6.jar
httpcore-4.4.10.jar
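
Both dependency manifests move the three Arrow jars from 0.12.0 to 0.15.1 and drop hppc-0.7.2.jar, presumably because the Arrow 0.15.1 artifacts no longer pull it in transitively. Since the bundled Java jars and the Python pyarrow wheel must agree on the IPC format, a quick sanity check like the following sketch (not part of the patch; the 0.15.1 pin mirrors the manifests above) can catch a stale pyarrow install:

```python
# Minimal sketch: verify the Python side is at least as new as the
# Arrow Java libraries bundled with Spark (0.15.1 per the manifests above).
from distutils.version import LooseVersion

import pyarrow

assert LooseVersion(pyarrow.__version__) >= LooseVersion("0.15.1"), (
    "pyarrow %s predates the Arrow 0.15 IPC format change" % pyarrow.__version__)
```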
diff --git a/pom.xml b/pom.xml
index 5110285547ab3..a6a82b3339d08 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,9 +200,9 @@
     <commons-crypto.version>1.0.0</commons-crypto.version>
-    <arrow.version>0.12.0</arrow.version>
+    <arrow.version>0.15.1</arrow.version>
     <test.java.home>${java.home}</test.java.home>
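
The `arrow.version` property is what the arrow-format, arrow-memory, and arrow-vector artifacts in the manifests above resolve against. As a hedged end-to-end check (config key as in Spark 2.4; newer builds also accept `spark.sql.execution.arrow.pyspark.enabled`), the Arrow-backed `toPandas()` path should round-trip once both sides are on 0.15.x:

```python
# Sketch of an end-to-end check, assuming a local PySpark install with
# pyarrow >= 0.15.1; not part of the patch.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

pdf = spark.range(100).selectExpr("id", "id * 2 AS doubled").toPandas()
assert len(pdf) == 100  # the JVM -> Python transfer used Arrow record batches
spark.stop()
```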
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 83afafdd8b138..4260c06f06060 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -160,9 +160,10 @@ def require_minimum_pyarrow_version():
""" Raise ImportError if minimum version of pyarrow is not installed
"""
# TODO(HyukjinKwon): Relocate and deduplicate the version specification.
- minimum_pyarrow_version = "0.12.1"
+ minimum_pyarrow_version = "0.15.1"
from distutils.version import LooseVersion
+ import os
try:
import pyarrow
have_arrow = True
@@ -174,6 +175,9 @@ def require_minimum_pyarrow_version():
     if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version):
         raise ImportError("PyArrow >= %s must be installed; however, "
                           "your version was %s." % (minimum_pyarrow_version, pyarrow.__version__))
+    if os.environ.get("ARROW_PRE_0_15_IPC_FORMAT", "0") == "1":
+        raise RuntimeError("Arrow legacy IPC format is not supported in PySpark, "
+                           "please unset ARROW_PRE_0_15_IPC_FORMAT")
 
 
 def require_test_compiled():
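
The new guard exists because Arrow 0.15.0 switched the default IPC stream format, and `ARROW_PRE_0_15_IPC_FORMAT=1` would make pyarrow emit the legacy framing that Spark's Java side no longer reads. A minimal sketch of the resulting behavior:

```python
# Sketch: with the legacy-format escape hatch set, the version check now
# fails fast instead of producing undecodable batches later.
import os

from pyspark.sql.utils import require_minimum_pyarrow_version

os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
try:
    require_minimum_pyarrow_version()
except RuntimeError as e:
    print(e)  # Arrow legacy IPC format is not supported in PySpark, ...
finally:
    del os.environ["ARROW_PRE_0_15_IPC_FORMAT"]
```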
diff --git a/python/setup.py b/python/setup.py
index 092bdd3f90117..138161ff13b41 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -105,7 +105,7 @@ def _supports_symlinks():
 # For Arrow, you should also check ./pom.xml and ensure there are no breaking changes in the
 # binary format protocol with the Java version, see ARROW_HOME/format/* for specifications.
 _minimum_pandas_version = "0.23.2"
-_minimum_pyarrow_version = "0.12.1"
+_minimum_pyarrow_version = "0.15.1"
 
 try:
     # We copy the shell script to be under pyspark/python/pyspark so that the launcher scripts
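
setup.py feeds this pin into the `sql` extra, so `pip install pyspark[sql]` resolves the matching pyarrow. Roughly (a sketch of the assumed shape, not the verbatim file):

```python
# Sketch of how the pin is consumed further down in setup.py.
_minimum_pandas_version = "0.23.2"
_minimum_pyarrow_version = "0.15.1"

extras_require = {
    "sql": [
        "pandas>=%s" % _minimum_pandas_version,
        "pyarrow>=%s" % _minimum_pyarrow_version,
    ],
}
```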
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 1a6f4acb63521..d1076d9d0156c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -26,7 +26,7 @@ import org.apache.arrow.flatbuf.MessageHeader
 import org.apache.arrow.memory.BufferAllocator
 import org.apache.arrow.vector._
 import org.apache.arrow.vector.ipc.{ArrowStreamWriter, ReadChannel, WriteChannel}
-import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, MessageSerializer}
+import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, IpcOption, MessageSerializer}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.api.java.JavaRDD
@@ -64,7 +64,7 @@ private[sql] class ArrowBatchStreamWriter(
    * End the Arrow stream, does not close output stream.
    */
   def end(): Unit = {
-    ArrowStreamWriter.writeEndOfStream(writeChannel)
+    ArrowStreamWriter.writeEndOfStream(writeChannel, new IpcOption)
   }
 }
 
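`writeEndOfStream` takes an `IpcOption` in Arrow 0.15 because the stream trailer changed: the new default format ends a stream with an 8-byte marker (a 0xFFFFFFFF continuation word followed by a zero length) where the legacy format wrote a lone 4-byte zero. A Python sketch of that trailer, assuming pyarrow >= 0.15.1:

```python
# Sketch: inspect the end-of-stream marker pyarrow writes in the new format.
import io

import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ["x"])
sink = io.BytesIO()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()  # close() appends the end-of-stream marker

# new format: 0xFFFFFFFF continuation word + int32 length 0 = 8 bytes
assert sink.getvalue()[-8:] == b"\xff\xff\xff\xff\x00\x00\x00\x00"
```
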
@@ -251,8 +251,8 @@ private[sql] object ArrowConverters {
           // Only care about RecordBatch messages, skip Schema and unsupported Dictionary messages
           if (msgMetadata.getMessage.headerType() == MessageHeader.RecordBatch) {
 
-            // Buffer backed output large enough to hold the complete serialized message
-            val bbout = new ByteBufferOutputStream(4 + msgMetadata.getMessageLength + bodyLength)
+            // Buffer backed output large enough to hold 8-byte length + complete serialized message
+            val bbout = new ByteBufferOutputStream(8 + msgMetadata.getMessageLength + bodyLength)
 
             // Write message metadata to ByteBuffer output stream
             MessageSerializer.writeMessageBuffer(
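
The extra four bytes come from the same format change: in the new encapsulated-message layout, every IPC message is prefixed by a 4-byte 0xFFFFFFFF continuation marker plus a 4-byte little-endian metadata length, where the legacy layout had only the length. A hedged sketch of that prefix (the helper name is illustrative, not a pyarrow API):

```python
# Sketch: split a new-format encapsulated Arrow IPC message into metadata
# and body, per the 8-byte prefix the buffer above is sized for.
import struct

def split_encapsulated_message(buf):
    continuation, metadata_len = struct.unpack_from("<ii", buf, 0)
    assert continuation == -1, "expected 0xFFFFFFFF continuation marker"
    metadata = buf[8:8 + metadata_len]
    body = buf[8 + metadata_len:]
    return metadata, body
```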