From a560a2df4f23664c746279999caf3640158b09a8 Mon Sep 17 00:00:00 2001
From: James Hou
Date: Mon, 8 Aug 2016 18:02:43 -0700
Subject: [PATCH] Handle invalid S3 hostname exceptions with older aws-java-sdk versions

We've seen a lot of messages lately regarding the "Invalid S3 URI: hostname
does not appear to be a valid S3 endpoint" exception, so we thought we should
contribute our two cents and the code changes that worked for us.

We've tried many of the approaches listed in that thread, including using the
`spark.executor.extraClassPath` and `spark.driver.extraClassPath` environment
variables to prepend to the classpath, and including the newer SDK in the
assembled jar or as a shaded jar. Unfortunately, many of these approaches
failed, mainly because the older aws-java-sdk jar is already present on the
machines themselves and usually takes precedence.

We ended up going with what Josh mentioned earlier: changing the S3 URL in the
spark-redshift code to add the endpoint to the host (`*.s3.amazonaws.com`).
This logic first tries to instantiate a new AmazonS3URI and, if that fails,
retries after adding the default Amazon S3 domain to the host.

Author: James Hou
Author: James Hou
Author: Josh Rosen

Closes #254 from jameshou/feature/add-s3-full-endpoint-v1.
---
 .travis.yml                                   |  8 +++--
 dev/run-tests-travis.sh                       |  2 ++
 project/SparkRedshiftBuild.scala              | 36 ++++++++++++-------
 project/plugins.sbt                           |  2 ++
 .../spark/redshift/RedshiftRelation.scala     |  4 +--
 .../com/databricks/spark/redshift/Utils.scala | 34 +++++++++++++++++-
 .../spark/redshift/UtilsSuite.scala           |  5 +++
 7 files changed, 73 insertions(+), 18 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index d9161aae..270bfecf 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -10,11 +10,15 @@ matrix:
     # Scala 2.10.5 tests:
     - jdk: openjdk7
       scala: 2.10.5
-      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0"
+      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.10.22"
     # Scala 2.11 tests:
     - jdk: openjdk7
       scala: 2.11.7
-      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0"
+      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.10.22"
+    # Test with an old version of the AWS Java SDK
+    - jdk: openjdk7
+      scala: 2.11.7
+      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.7.4"
 env:
   global:
     # AWS_REDSHIFT_JDBC_URL
diff --git a/dev/run-tests-travis.sh b/dev/run-tests-travis.sh
index 161f9e38..b0b398b7 100755
--- a/dev/run-tests-travis.sh
+++ b/dev/run-tests-travis.sh
@@ -7,6 +7,7 @@ sbt ++$TRAVIS_SCALA_VERSION "test:scalastyle"
 sbt ++$TRAVIS_SCALA_VERSION "it:scalastyle"
 
 sbt \
+  -Daws.testVersion=$AWS_JAVA_SDK_VERSION \
   -Dhadoop.testVersion=$HADOOP_VERSION \
   -Dspark.testVersion=$SPARK_VERSION \
   -DsparkAvro.testVersion=$SPARK_AVRO_VERSION \
@@ -15,6 +16,7 @@ sbt \
 
 if [ "$TRAVIS_SECURE_ENV_VARS" == "true" ]; then
   sbt \
+    -Daws.testVersion=$AWS_JAVA_SDK_VERSION \
     -Dhadoop.testVersion=$HADOOP_VERSION \
     -Dspark.testVersion=$SPARK_VERSION \
     -DsparkAvro.testVersion=$SPARK_AVRO_VERSION \
diff --git a/project/SparkRedshiftBuild.scala b/project/SparkRedshiftBuild.scala
index cee809a3..9766f229 100644
--- a/project/SparkRedshiftBuild.scala
+++ b/project/SparkRedshiftBuild.scala
@@ -14,6 +14,8 @@
  * limitations under the License.
 */
 
+import scala.math.Ordering.Implicits._
+import org.apache.maven.artifact.versioning.ComparableVersion
 import org.scalastyle.sbt.ScalastylePlugin.rawScalastyleSettings
 import sbt._
 import sbt.Keys._
@@ -28,6 +30,7 @@ object SparkRedshiftBuild extends Build {
   val testSparkVersion = settingKey[String]("Spark version to test against")
   val testSparkAvroVersion = settingKey[String]("spark-avro version to test against")
   val testHadoopVersion = settingKey[String]("Hadoop version to test against")
+  val testAWSJavaSDKVersion = settingKey[String]("AWS Java SDK version to test against")
 
   // Define a custom test configuration so that unit test helper classes can be re-used under
   // the integration tests configuration; see http://stackoverflow.com/a/20635808.
@@ -48,6 +51,7 @@ object SparkRedshiftBuild extends Build {
     testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
     testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("3.0.0"),
     testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"),
+    testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.10.22"),
     spName := "databricks/spark-redshift",
     sparkComponents ++= Seq("sql", "hive"),
     spIgnoreProvided := true,
@@ -58,19 +62,6 @@ object SparkRedshiftBuild extends Build {
     libraryDependencies ++= Seq(
       "org.slf4j" % "slf4j-api" % "1.7.5",
       "com.eclipsesource.minimal-json" % "minimal-json" % "0.9.4",
-      // These Amazon SDK depdencies are marked as 'provided' in order to reduce the risk of
-      // dependency conflicts with other user libraries. In many environments, such as EMR and
-      // Databricks, the Amazon SDK will already be on the classpath. In other cases, the SDK is
-      // likely to be provided via a dependency on the S3NativeFileSystem. If this was not marked
-      // as provided, then we would have to worry about the SDK's own dependencies evicting
-      // earlier versions of those dependencies that are required by the end user's own code.
-      // There's a trade-off here and we've chosen to err on the side of minimizing dependency
-      // conflicts for a majority of users while adding a minor inconvienece (adding one extra
-      // depenendecy by hand) for a smaller set of users.
-      // We exclude jackson-databind to avoid a conflict with Spark's version (see #104).
-      "com.amazonaws" % "aws-java-sdk-core" % "1.10.22" % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
-      "com.amazonaws" % "aws-java-sdk-s3" % "1.10.22" % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
-      "com.amazonaws" % "aws-java-sdk-sts" % "1.10.22" % "test" exclude("com.fasterxml.jackson.core", "jackson-databind"),
       // We require spark-avro, but avro-mapred must be provided to match Hadoop version.
       // In most cases, avro-mapred will be provided as part of the Spark assembly JAR.
       "com.databricks" %% "spark-avro" % "3.0.0",
@@ -90,6 +81,25 @@
       "org.scalatest" %% "scalatest" % "2.2.1" % "test",
       "org.mockito" % "mockito-core" % "1.10.19" % "test"
     ),
+    libraryDependencies ++= (if (new ComparableVersion(testAWSJavaSDKVersion.value) < new ComparableVersion("1.8.10")) {
+      // These Amazon SDK dependencies are marked as 'provided' in order to reduce the risk of
+      // dependency conflicts with other user libraries. In many environments, such as EMR and
+      // Databricks, the Amazon SDK will already be on the classpath. In other cases, the SDK is
+      // likely to be provided via a dependency on the S3NativeFileSystem. If this was not marked
+      // as provided, then we would have to worry about the SDK's own dependencies evicting
+      // earlier versions of those dependencies that are required by the end user's own code.
+      // There's a trade-off here and we've chosen to err on the side of minimizing dependency
+      // conflicts for a majority of users while adding a minor inconvenience (adding one extra
+      // dependency by hand) for a smaller set of users.
+      // We exclude jackson-databind to avoid a conflict with Spark's version (see #104).
+      Seq("com.amazonaws" % "aws-java-sdk" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"))
+    } else {
+      Seq(
+        "com.amazonaws" % "aws-java-sdk-core" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
+        "com.amazonaws" % "aws-java-sdk-s3" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
+        "com.amazonaws" % "aws-java-sdk-sts" % testAWSJavaSDKVersion.value % "test" exclude("com.fasterxml.jackson.core", "jackson-databind")
+      )
+    }),
     libraryDependencies ++= (if (testHadoopVersion.value.startsWith("1")) {
       Seq(
         "org.apache.hadoop" % "hadoop-client" % testHadoopVersion.value % "test" force(),
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 8f74621a..631f2884 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -18,3 +18,5 @@ addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")
 addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")
 
 addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
+
+libraryDependencies += "org.apache.maven" % "maven-artifact" % "3.3.9"
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
index 0ec5a7a4..e4afeb79 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
@@ -23,7 +23,7 @@ import java.net.URI
 import scala.collection.JavaConverters._
 
 import com.amazonaws.auth.AWSCredentials
-import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3URI}
+import com.amazonaws.services.s3.AmazonS3Client
 import com.eclipsesource.json.Json
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
@@ -124,7 +124,7 @@ private[redshift] case class RedshiftRelation(
     val filesToRead: Seq[String] = {
       val cleanedTempDirUri =
         Utils.fixS3Url(Utils.removeCredentialsFromURI(URI.create(tempDir)).toString)
-      val s3URI = new AmazonS3URI(cleanedTempDirUri)
+      val s3URI = Utils.createS3URI(cleanedTempDirUri)
       val s3Client = s3ClientFactory(creds)
       val is = s3Client.getObject(s3URI.getBucket, s3URI.getKey + "manifest").getObjectContent
       val s3Files = try {
diff --git a/src/main/scala/com/databricks/spark/redshift/Utils.scala b/src/main/scala/com/databricks/spark/redshift/Utils.scala
index 0414d918..52770351 100644
--- a/src/main/scala/com/databricks/spark/redshift/Utils.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Utils.scala
@@ -59,6 +59,38 @@ private[redshift] object Utils {
     url.replaceAll("s3[an]://", "s3://")
   }
 
+  /**
+   * Factory method to create new S3URI in order to handle various library incompatibilities with
+   * older AWS Java Libraries
+   */
+  def createS3URI(url: String): AmazonS3URI = {
+    try {
+      // try to instantiate AmazonS3URI with url
+      new AmazonS3URI(url)
+    } catch {
+      case e: IllegalArgumentException if e.getMessage.
+          startsWith("Invalid S3 URI: hostname does not appear to be a valid S3 endpoint") => {
+        new AmazonS3URI(addEndpointToUrl(url))
+      }
+    }
+  }
+
+  /**
+   * Since older AWS Java Libraries do not handle S3 urls that have just the bucket name
+   * as the host, add the endpoint to the host
+   */
+  def addEndpointToUrl(url: String, domain: String = "s3.amazonaws.com"): String = {
+    val uri = new URI(url)
+    val hostWithEndpoint = uri.getHost + "." + domain
+    new URI(uri.getScheme,
+      uri.getUserInfo,
+      hostWithEndpoint,
+      uri.getPort,
+      uri.getPath,
+      uri.getQuery,
+      uri.getFragment).toString
+  }
+
   /**
    * Returns a copy of the given URI with the user credentials removed.
    */
@@ -87,7 +119,7 @@
       tempDir: String,
       s3Client: AmazonS3Client): Unit = {
     try {
-      val s3URI = new AmazonS3URI(Utils.fixS3Url(tempDir))
+      val s3URI = createS3URI(Utils.fixS3Url(tempDir))
       val bucket = s3URI.getBucket
       assert(bucket != null, "Could not get bucket from S3 URI")
       val key = Option(s3URI.getKey).getOrElse("")
diff --git a/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala b/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala
index d83e8b2f..f9043a52 100644
--- a/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala
@@ -44,6 +44,11 @@ class UtilsSuite extends FunSuite with Matchers {
     Utils.fixS3Url("s3n://foo/bar/baz") shouldBe "s3://foo/bar/baz"
   }
 
+  test("addEndpointToUrl produces urls with endpoints added to host") {
+    Utils.addEndpointToUrl("s3a://foo/bar/12345") shouldBe "s3a://foo.s3.amazonaws.com/bar/12345"
+    Utils.addEndpointToUrl("s3n://foo/bar/baz") shouldBe "s3n://foo.s3.amazonaws.com/bar/baz"
+  }
+
   test("temp paths are random subdirectories of root") {
     val root = "s3n://temp/"
     val firstTempPath = Utils.makeTempPath(root)