diff --git a/.travis.yml b/.travis.yml
index d9161aae..270bfecf 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -10,11 +10,15 @@ matrix:
     # Scala 2.10.5 tests:
    - jdk: openjdk7
      scala: 2.10.5
-      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0"
+      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.10.22"
    # Scala 2.11 tests:
    - jdk: openjdk7
      scala: 2.11.7
-      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0"
+      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.10.22"
+    # Test with an old version of the AWS Java SDK
+    - jdk: openjdk7
+      scala: 2.11.7
+      env: HADOOP_VERSION="2.2.0" SPARK_VERSION="2.0.0" SPARK_AVRO_VERSION="3.0.0" AWS_JAVA_SDK_VERSION="1.7.4"
 env:
   global:
     # AWS_REDSHIFT_JDBC_URL
diff --git a/dev/run-tests-travis.sh b/dev/run-tests-travis.sh
index 161f9e38..b0b398b7 100755
--- a/dev/run-tests-travis.sh
+++ b/dev/run-tests-travis.sh
@@ -7,6 +7,7 @@ sbt ++$TRAVIS_SCALA_VERSION "test:scalastyle"
 sbt ++$TRAVIS_SCALA_VERSION "it:scalastyle"
 
 sbt \
+  -Daws.testVersion=$AWS_JAVA_SDK_VERSION \
   -Dhadoop.testVersion=$HADOOP_VERSION \
   -Dspark.testVersion=$SPARK_VERSION \
   -DsparkAvro.testVersion=$SPARK_AVRO_VERSION \
@@ -15,6 +16,7 @@ sbt \
 
 if [ "$TRAVIS_SECURE_ENV_VARS" == "true" ]; then
   sbt \
+    -Daws.testVersion=$AWS_JAVA_SDK_VERSION \
    -Dhadoop.testVersion=$HADOOP_VERSION \
    -Dspark.testVersion=$SPARK_VERSION \
    -DsparkAvro.testVersion=$SPARK_AVRO_VERSION \
diff --git a/project/SparkRedshiftBuild.scala b/project/SparkRedshiftBuild.scala
index cee809a3..9766f229 100644
--- a/project/SparkRedshiftBuild.scala
+++ b/project/SparkRedshiftBuild.scala
@@ -14,6 +14,8 @@
  * limitations under the License.
  */
 
+import scala.math.Ordering.Implicits._
+import org.apache.maven.artifact.versioning.ComparableVersion
 import org.scalastyle.sbt.ScalastylePlugin.rawScalastyleSettings
 import sbt._
 import sbt.Keys._
@@ -28,6 +30,7 @@ object SparkRedshiftBuild extends Build {
   val testSparkVersion = settingKey[String]("Spark version to test against")
   val testSparkAvroVersion = settingKey[String]("spark-avro version to test against")
   val testHadoopVersion = settingKey[String]("Hadoop version to test against")
+  val testAWSJavaSDKVersion = settingKey[String]("AWS Java SDK version to test against")
 
   // Define a custom test configuration so that unit test helper classes can be re-used under
   // the integration tests configuration; see http://stackoverflow.com/a/20635808.
@@ -48,6 +51,7 @@ object SparkRedshiftBuild extends Build {
       testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),
       testSparkAvroVersion := sys.props.get("sparkAvro.testVersion").getOrElse("3.0.0"),
       testHadoopVersion := sys.props.get("hadoop.testVersion").getOrElse("2.2.0"),
+      testAWSJavaSDKVersion := sys.props.get("aws.testVersion").getOrElse("1.10.22"),
       spName := "databricks/spark-redshift",
       sparkComponents ++= Seq("sql", "hive"),
       spIgnoreProvided := true,
@@ -58,19 +62,6 @@ object SparkRedshiftBuild extends Build {
       libraryDependencies ++= Seq(
         "org.slf4j" % "slf4j-api" % "1.7.5",
         "com.eclipsesource.minimal-json" % "minimal-json" % "0.9.4",
-        // These Amazon SDK depdencies are marked as 'provided' in order to reduce the risk of
-        // dependency conflicts with other user libraries. In many environments, such as EMR and
-        // Databricks, the Amazon SDK will already be on the classpath. In other cases, the SDK is
-        // likely to be provided via a dependency on the S3NativeFileSystem. If this was not marked
-        // as provided, then we would have to worry about the SDK's own dependencies evicting
-        // earlier versions of those dependencies that are required by the end user's own code.
-        // There's a trade-off here and we've chosen to err on the side of minimizing dependency
-        // conflicts for a majority of users while adding a minor inconvienece (adding one extra
-        // depenendecy by hand) for a smaller set of users.
-        // We exclude jackson-databind to avoid a conflict with Spark's version (see #104).
-        "com.amazonaws" % "aws-java-sdk-core" % "1.10.22" % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
-        "com.amazonaws" % "aws-java-sdk-s3" % "1.10.22" % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
-        "com.amazonaws" % "aws-java-sdk-sts" % "1.10.22" % "test" exclude("com.fasterxml.jackson.core", "jackson-databind"),
         // We require spark-avro, but avro-mapred must be provided to match Hadoop version.
         // In most cases, avro-mapred will be provided as part of the Spark assembly JAR.
         "com.databricks" %% "spark-avro" % "3.0.0",
@@ -90,6 +81,25 @@ object SparkRedshiftBuild extends Build {
         "org.scalatest" %% "scalatest" % "2.2.1" % "test",
         "org.mockito" % "mockito-core" % "1.10.19" % "test"
       ),
+      libraryDependencies ++= (if (new ComparableVersion(testAWSJavaSDKVersion.value) < new ComparableVersion("1.8.10")) {
+        // These Amazon SDK dependencies are marked as 'provided' in order to reduce the risk of
+        // dependency conflicts with other user libraries. In many environments, such as EMR and
+        // Databricks, the Amazon SDK will already be on the classpath. In other cases, the SDK is
+        // likely to be provided via a dependency on the S3NativeFileSystem. If this was not marked
+        // as provided, then we would have to worry about the SDK's own dependencies evicting
+        // earlier versions of those dependencies that are required by the end user's own code.
+        // There's a trade-off here and we've chosen to err on the side of minimizing dependency
+        // conflicts for a majority of users while adding a minor inconvenience (adding one extra
+        // dependency by hand) for a smaller set of users.
+        // We exclude jackson-databind to avoid a conflict with Spark's version (see #104).
+        Seq("com.amazonaws" % "aws-java-sdk" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"))
+      } else {
+        Seq(
+          "com.amazonaws" % "aws-java-sdk-core" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
+          "com.amazonaws" % "aws-java-sdk-s3" % testAWSJavaSDKVersion.value % "provided" exclude("com.fasterxml.jackson.core", "jackson-databind"),
+          "com.amazonaws" % "aws-java-sdk-sts" % testAWSJavaSDKVersion.value % "test" exclude("com.fasterxml.jackson.core", "jackson-databind")
+        )
+      }),
       libraryDependencies ++= (if (testHadoopVersion.value.startsWith("1")) {
         Seq(
           "org.apache.hadoop" % "hadoop-client" % testHadoopVersion.value % "test" force(),
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 8f74621a..631f2884 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -18,3 +18,5 @@ addSbtPlugin("me.lessis" % "bintray-sbt" % "0.3.0")
 addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")
 
 addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
+
+libraryDependencies += "org.apache.maven" % "maven-artifact" % "3.3.9"
diff --git a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
index 0ec5a7a4..e4afeb79 100644
--- a/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
+++ b/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala
@@ -23,7 +23,7 @@ import java.net.URI
 import scala.collection.JavaConverters._
 
 import com.amazonaws.auth.AWSCredentials
-import com.amazonaws.services.s3.{AmazonS3Client, AmazonS3URI}
+import com.amazonaws.services.s3.AmazonS3Client
 import com.eclipsesource.json.Json
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
@@ -124,7 +124,7 @@ private[redshift] case class RedshiftRelation(
     val filesToRead: Seq[String] = {
       val cleanedTempDirUri =
         Utils.fixS3Url(Utils.removeCredentialsFromURI(URI.create(tempDir)).toString)
-      val s3URI = new AmazonS3URI(cleanedTempDirUri)
+      val s3URI = Utils.createS3URI(cleanedTempDirUri)
       val s3Client = s3ClientFactory(creds)
       val is = s3Client.getObject(s3URI.getBucket, s3URI.getKey + "manifest").getObjectContent
       val s3Files = try {
diff --git a/src/main/scala/com/databricks/spark/redshift/Utils.scala b/src/main/scala/com/databricks/spark/redshift/Utils.scala
index 0414d918..52770351 100644
--- a/src/main/scala/com/databricks/spark/redshift/Utils.scala
+++ b/src/main/scala/com/databricks/spark/redshift/Utils.scala
@@ -59,6 +59,38 @@ private[redshift] object Utils {
     url.replaceAll("s3[an]://", "s3://")
   }
 
+  /**
+   * Factory method to create new S3URI in order to handle various library incompatibilities with
+   * older AWS Java Libraries
+   */
+  def createS3URI(url: String): AmazonS3URI = {
+    try {
+      // try to instantiate AmazonS3URI with url
+      new AmazonS3URI(url)
+    } catch {
+      case e: IllegalArgumentException if e.getMessage.
+          startsWith("Invalid S3 URI: hostname does not appear to be a valid S3 endpoint") => {
+        new AmazonS3URI(addEndpointToUrl(url))
+      }
+    }
+  }
+
+  /**
+   * Since older AWS Java Libraries do not handle S3 urls that have just the bucket name
+   * as the host, add the endpoint to the host
+   */
+  def addEndpointToUrl(url: String, domain: String = "s3.amazonaws.com"): String = {
+    val uri = new URI(url)
+    val hostWithEndpoint = uri.getHost + "." + domain
+    new URI(uri.getScheme,
+      uri.getUserInfo,
+      hostWithEndpoint,
+      uri.getPort,
+      uri.getPath,
+      uri.getQuery,
+      uri.getFragment).toString
+  }
+
   /**
    * Returns a copy of the given URI with the user credentials removed.
    */
@@ -87,7 +119,7 @@ private[redshift] object Utils {
       tempDir: String,
       s3Client: AmazonS3Client): Unit = {
     try {
-      val s3URI = new AmazonS3URI(Utils.fixS3Url(tempDir))
+      val s3URI = createS3URI(Utils.fixS3Url(tempDir))
       val bucket = s3URI.getBucket
       assert(bucket != null, "Could not get bucket from S3 URI")
       val key = Option(s3URI.getKey).getOrElse("")
diff --git a/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala b/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala
index d83e8b2f..f9043a52 100644
--- a/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala
+++ b/src/test/scala/com/databricks/spark/redshift/UtilsSuite.scala
@@ -44,6 +44,11 @@ class UtilsSuite extends FunSuite with Matchers {
     Utils.fixS3Url("s3n://foo/bar/baz") shouldBe "s3://foo/bar/baz"
   }
 
+  test("addEndpointToUrl produces urls with endpoints added to host") {
+    Utils.addEndpointToUrl("s3a://foo/bar/12345") shouldBe "s3a://foo.s3.amazonaws.com/bar/12345"
+    Utils.addEndpointToUrl("s3n://foo/bar/baz") shouldBe "s3n://foo.s3.amazonaws.com/bar/baz"
+  }
+
  test("temp paths are random subdirectories of root") {
    val root = "s3n://temp/"
    val firstTempPath = Utils.makeTempPath(root)