Handle invalid S3 hostname exceptions with older aws-java-sdk versions
We've seen a lot of messages lately regarding the "Invalid S3 URI: hostname does not appear to be a valid S3 endpoint" exception, so we thought we should contribute our two cents and the code changes that worked for us. We tried many of the approaches listed in that thread, including using the `spark.executor.extraClassPath` and `spark.driver.extraClassPath` settings to prepend a newer SDK to the classpath, and bundling it in the assembled jar or as a shaded jar. Unfortunately, many of these approaches failed, mainly because the older aws-java-sdk jar already present on the machines themselves usually takes precedence. We ended up going with what Josh mentioned earlier: changing the S3 URL in the spark-redshift code to add the endpoint to the host (`*.s3.amazonaws.com`).

This logic first tries to instantiate a new AmazonS3URI and, if that fails, retries after adding the default Amazon S3 domain to the host.
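
For illustration, here is a minimal sketch (not part of the commit) of how that fallback behaves, exercising the `createS3URI` and `addEndpointToUrl` helpers added to `Utils.scala` below. The bucket name and key are made up, and the snippet assumes it lives inside the `com.databricks.spark.redshift` package since `Utils` is package-private:

```scala
package com.databricks.spark.redshift

// Sketch only: the bucket and key below are hypothetical.
object S3UriFallbackExample {
  def main(args: Array[String]): Unit = {
    // With an older aws-java-sdk (e.g. 1.7.4), new AmazonS3URI("s3://my-bucket/...")
    // throws IllegalArgumentException("Invalid S3 URI: hostname does not appear to
    // be a valid S3 endpoint"); createS3URI catches that and retries with the
    // default S3 domain appended to the host.
    val s3Uri = Utils.createS3URI("s3://my-bucket/temp/unload/")
    println(s3Uri.getBucket) // my-bucket
    println(s3Uri.getKey)    // temp/unload/

    // The URL rewrite itself:
    println(Utils.addEndpointToUrl("s3://my-bucket/temp/unload/"))
    // s3://my-bucket.s3.amazonaws.com/temp/unload/
  }
}
```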

Author: James Hou <jameshou@data101.udemy.com>
Author: James Hou <james.hou@gmail.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #254 from jameshou/feature/add-s3-full-endpoint-v1.
James Hou authored and JoshRosen committed Aug 9, 2016
1 parent 95d92cd commit a560a2d
Showing 7 changed files with 73 additions and 18 deletions.
8 changes: 6 additions & 2 deletions .travis.yml
@@ -10,11 +10,15 @@ matrix:
# Scala 2.10.5 tests:
- jdk: openjdk7
scala: 2.10.5
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0"
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.10.22"
# Scala 2.11 tests:
- jdk: openjdk7
scala: 2.11.7
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0"
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.10.22"
# Test with an old version of the AWS Java SDK
- jdk: openjdk7
scala: 2.11.7
env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.7.4"
env:
global:
# AWS_REDSHIFT_JDBC_URL
2 changes: 2 additions & 0 deletions dev/run-tests-travis.sh
@@ -7,6 +7,7 @@ sbt ++$TRAVIS_SCALA_VERSION "test:scalastyle"
sbt ++$TRAVIS_SCALA_VERSION "it:scalastyle"

sbt \
-Daws.testVersion=$AWS_JAVA_SDK_VERSION \
-Dhadoop.testVersion=$HADOOP_VERSION \
-Dspark.testVersion=$SPARK_VERSION \
-DsparkAvro.testVersion=$SPARK_AVRO_VERSION \
@@ -15,6 +16,7 @@ sbt \

if [ "$TRAVIS_SECURE_ENV_VARS" == "true" ]; then
sbt \
-Daws.testVersion=$AWS_JAVA_SDK_VERSION \
-Dhadoop.testVersion=$HADOOP_VERSION \
-Dspark.testVersion=$SPARK_VERSION \
-DsparkAvro.testVersion=$SPARK_AVRO_VERSION \
36 changes: 23 additions & 13 deletions project/SparkRedshiftBuild.scala
@@ -14,6 +14,8 @@
* limitations under the License.
*/

import scala.math.Ordering.Implicits._
import org.apache.maven.artifact.versioning.ComparableVersion
import org.scalastyle.sbt.ScalastylePlugin.rawScalastyleSettings
import sbt._
import sbt.Keys._
@@ -28,6 +30,7 @@ object SparkRedshiftBuild extends Build {
val testSparkVersion = settingKey[String]("Spark version to test against")
val testSparkAvroVersion = settingKey[String]("spark-avro version to test against")
val testHadoopVersion = settingKey[String]("Hadoop version to test against")
val testAWSJavaSDKVersion = settingKey[String]("AWS Java SDK version to test against")

// Define a custom test configuration so that unit test helper classes can be re-used under
// the integration tests configuration; see http://stackoverflow.com/a/20635808.
@@ -48,6 +51,7 @@
testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("3.0.0"),
testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"),
testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.10.22"),
spName := "databricks/spark-redshift",
sparkComponents ++= Seq("sql", "hive"),
spIgnoreProvided := true,
@@ -58,19 +62,6 @@
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % "1.7.5",
"com.eclipsesource.minimal-json" % "minimal-json" % "0.9.4",
// These Amazon SDK depdencies are marked as 'provided' in order to reduce the risk of
// dependency conflicts with other user libraries. In many environments, such as EMR and
// Databricks, the Amazon SDK will already be on the classpath. In other cases, the SDK is
// likely to be provided via a dependency on the S3NativeFileSystem. If this was not marked
// as provided, then we would have to worry about the SDK's own dependencies evicting
// earlier versions of those dependencies that are required by the end user's own code.
// There's a trade-off here and we've chosen to err on the side of minimizing dependency
// conflicts for a majority of users while adding a minor inconvienece (adding one extra
// depenendecy by hand) for a smaller set of users.
// We exclude jackson-databind to avoid a conflict with Spark's version (see #104).
"com.amazonaws" % "aws-java-sdk-core" % "1.10.22" % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-s3" % "1.10.22" % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-sts" % "1.10.22" % "test" exclude("com.fasterxml.jackson.core", "jackson-databind"),
// We require spark-avro, but avro-mapred must be provided to match Hadoop version.
// In most cases, avro-mapred will be provided as part of the Spark assembly JAR.
"com.databricks" %% "spark-avro" % "3.0.0",
@@ -90,6 +81,25 @@
"org.scalatest" %% "scalatest" % "2.2.1" % "test",
"org.mockito" % "mockito-core" % "1.10.19" % "test"
),
libraryDependencies ++= (if (new ComparableVersion(testAWSJavaSDKVersion.value) < new ComparableVersion("1.8.10")) {
// These Amazon SDK depdencies are marked as 'provided' in order to reduce the risk of
// dependency conflicts with other user libraries. In many environments, such as EMR and
// Databricks, the Amazon SDK will already be on the classpath. In other cases, the SDK is
// likely to be provided via a dependency on the S3NativeFileSystem. If this was not marked
// as provided, then we would have to worry about the SDK's own dependencies evicting
// earlier versions of those dependencies that are required by the end user's own code.
// There's a trade-off here and we've chosen to err on the side of minimizing dependency
// conflicts for a majority of users while adding a minor inconvienece (adding one extra
// depenendecy by hand) for a smaller set of users.
// We exclude jackson-databind to avoid a conflict with Spark's version (see #104).
Seq("com.amazonaws" % "aws-java-sdk" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"))
} else {
Seq(
"com.amazonaws" % "aws-java-sdk-core" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-s3" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
"com.amazonaws" % "aws-java-sdk-sts" % testAWSJavaSDKVersion.value % "test" exclude("com.fasterxml.jackson.core", "jackson-databind")
)
}),
libraryDependencies ++= (if (testHadoopVersion.value.startsWith("1")) {
Seq(
"org.apache.hadoop" % "hadoop-client" % testHadoopVersion.value % "test" force(),
2 changes: 2 additions & 0 deletions project/plugins.sbt
@@ -18,3 +18,5 @@ addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")

addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")

libraryDependencies += "org.apache.maven" % "maven-artifact" % "3.3.9"
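
For context, the `maven-artifact` dependency added here is what provides `ComparableVersion`, which the build above uses to decide between the monolithic `aws-java-sdk` artifact (for SDK versions older than 1.8.10) and the split per-service artifacts. A rough, illustrative sketch of that comparison (the version values are examples):

```scala
// Illustrative sketch: ComparableVersion orders version strings semantically,
// which plain lexicographic comparison does not (as a string, "1.10.22" sorts
// before "1.8.10" even though it is the newer release).
import org.apache.maven.artifact.versioning.ComparableVersion
import scala.math.Ordering.Implicits._

object VersionGateExample extends App {
  val testAWSJavaSDKVersion = "1.7.4" // e.g. passed via -Daws.testVersion
  val useMonolithicSdk =
    new ComparableVersion(testAWSJavaSDKVersion) < new ComparableVersion("1.8.10")
  println(useMonolithicSdk) // true -> depend on "aws-java-sdk" instead of aws-java-sdk-core/-s3/-sts
}
```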
4 changes: 2 additions & 2 deletions src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
@@ -23,7 +23,7 @@ import java.net.URI
import scala.collection.JavaConverters._

import com.amazonaws.auth.AWSCredentials
import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3URI}
import com.amazonaws.services.s3.AmazonS3Client
import com.eclipsesource.json.Json
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources._
@@ -124,7 +124,7 @@ private[redshift] case class RedshiftRelation(
val filesToRead: Seq[String] = {
val cleanedTempDirUri =
Utils.fixS3Url(Utils.removeCredentialsFromURI(URI.create(tempDir)).toString)
val s3URI = new AmazonS3URI(cleanedTempDirUri)
val s3URI = Utils.createS3URI(cleanedTempDirUri)
val s3Client = s3ClientFactory(creds)
val is = s3Client.getObject(s3URI.getBucket, s3URI.getKey + "manifest").getObjectContent
val s3Files = try {
34 changes: 33 additions & 1 deletion src/main/scala/com/databricks/spark/redshift/Utils.scala
@@ -59,6 +59,38 @@ private[redshift] object Utils {
url.replaceAll("s3[an]://", "s3://")
}

/**
* Factory method to create new S3URI in order to handle various library incompatibilities with
* older AWS Java Libraries
*/
def createS3URI(url: String): AmazonS3URI = {
try {
// try to instantiate AmazonS3URI with url
new AmazonS3URI(url)
} catch {
case e: IllegalArgumentException if e.getMessage.
startsWith("Invalid S3 URI: hostname does not appear to be a valid S3 endpoint") => {
new AmazonS3URI(addEndpointToUrl(url))
}
}
}

/**
* Since older AWS Java Libraries do not handle S3 urls that have just the bucket name
* as the host, add the endpoint to the host
*/
def addEndpointToUrl(url: String, domain: String = "s3.amazonaws.com"): String = {
val uri = new URI(url)
val hostWithEndpoint = uri.getHost + "." + domain
new URI(uri.getScheme,
uri.getUserInfo,
hostWithEndpoint,
uri.getPort,
uri.getPath,
uri.getQuery,
uri.getFragment).toString
}

/**
* Returns a copy of the given URI with the user credentials removed.
*/
@@ -87,7 +119,7 @@
tempDir: String,
s3Client: AmazonS3Client): Unit = {
try {
val s3URI = new AmazonS3URI(Utils.fixS3Url(tempDir))
val s3URI = createS3URI(Utils.fixS3Url(tempDir))
val bucket = s3URI.getBucket
assert(bucket != null, "Could not get bucket from S3 URI")
val key = Option(s3URI.getKey).getOrElse("")
5 changes: 5 additions & 0 deletions src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala
@@ -44,6 +44,11 @@ class UtilsSuite extends FunSuite with Matchers {
Utils.fixS3Url("s3n://foo/bar/baz") shouldBe "s3://foo/bar/baz"
}

test("addEndpointToUrl produces urls with endpoints added to host") {
Utils.addEndpointToUrl("s3a://foo/bar/12345") shouldBe "s3a://foo.s3.amazonaws.com/bar/12345"
Utils.addEndpointToUrl("s3n://foo/bar/baz") shouldBe "s3n://foo.s3.amazonaws.com/bar/baz"
}

test("temp paths are random subdirectories of root") {
val root = "s3n://temp/"
val firstTempPath = Utils.makeTempPath(root)
