
Not able to save to redshift using spark-redshift package v 0.6 #158

Closed
wmdoble opened this issue Jan 17, 2016 · 22 comments

@wmdoble

wmdoble commented Jan 17, 2016

I'm running your tutorial for the spark-redshift package on Amazon EMR (emr-4.2.0, Spark 1.5.2, spark-redshift 0.6.0), but I can't seem to get any tables written to Redshift. Both the S3 buckets and the Redshift cluster are in us-east-1. I can see the temp files created in S3, despite this error: An error occurred while trying to read the S3 bucket lifecycle configuration com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403...)

Is this a known issue? Are there any best practices or workarounds?

@JoshRosen
Contributor

Can you please share more of the log? Are there any additional log messages which follow the one that you posted here?

The "The AWS Access Key Id you provided does not exist in our records" message makes this sound like some sort of credentials problem, so it would be helpful if you could let me know more about how you've configured access to S3. Specifically:

  • What's the format of your tempdir URI? Does it contain inline credentials / access keys? (The two usual forms are sketched below.)
  • Since you're on EMR, are you using IAM instance profiles to handle authentication to S3?
  • Did you set any credentials via Hadoop configuration?
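For reference, the two usual ways these credentials get configured look roughly like this (a sketch only; bucket name and key values are placeholders):

// Form 1: credentials embedded inline in the tempdir URI
val tempDir = "s3n://YOUR_ACCESS_KEY:YOUR_SECRET_KEY@my-bucket/tmp/"

// Form 2: credentials set in the Hadoop configuration
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")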

@JoshRosen
Contributor

Also, since you're on EMR, have you seen the "IAM instance profiles" section of https://github.com/databricks/spark-redshift#aws-credentials? If you're using instance profiles and not S3 keys, then that could explain why Spark is able to authenticate to S3 when writing the Avro file. If this is the case and you can't use keys, you'll want to use the security token service to obtain temporary credentials to pass to Redshift.

There are some examples of this at https://stackoverflow.com/questions/33797693/how-to-properly-provide-credentials-for-spark-redshift-in-emr-instances, but I haven't tried them out / vetted them for accuracy. It would be great if someone submitted a patch to automatically obtain the tokens; I don't have time to work on that feature myself, but would be glad to review pull requests for it.

@wmdoble
Author

wmdoble commented Jan 18, 2016

Thanks for your quick response, Josh.

Yes, I'm using this example (https://stackoverflow.com/questions/33797693/how-to-properly-provide-credentials-for-spark-redshift-in-emr-instances) to set up credentials as follows:

val provider = new InstanceProfileCredentialsProvider()
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]
val token = credentials.getSessionToken
val awsAccessKey = credentials.getAWSAccessKeyId
val awsSecretKey = credentials.getAWSSecretKey

Setting Hadoop credentials as follows:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKey)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretKey)

//s3 path
val tempS3Dir = "s3n://my-bucket-us-east/"

The following is not working, but temp files are generated in the S3 bucket:

val eventsDF = sqlContext.read.format("com.databricks.spark.redshift").option("url", jdbcURL).option("tempdir", tempS3Dir).option("dbtable", "event").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
Stack trace:
scala> val eventsDF = sqlContext.read.format("com.databricks.spark.redshift").option("url", jdbcURL).option("tempdir", tempS3Dir).option("dbtable", "event").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()
16/01/17 23:19:18 INFO EmrFileSystem: Consistency disabled, using com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem as filesystem implementation
16/01/17 23:19:18 INFO latency: StatusCode=[403], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: E93ECE5F9860C7DC), S3 Extended Request ID: qZVp88X6Q0Kcz/Mtvp4ANpuc2I8eu9UhHadtYSTylX57louS/+KanxHpaEy576ESu2SSm6EmLSw=], ServiceName=[Amazon S3], AWSErrorCode=[403 Forbidden], AWSRequestID=[E93ECE5F9860C7DC], ServiceEndpoint=[https://my-bucket-us-east.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=0, ClientExecuteTime=[389.455], HttpRequestTime=[351.487], HttpClientReceiveResponseTime=[6.637], RequestSigningTime=[28.867], HttpClientSendRequestTime=[0.349],
eventsDF: org.apache.spark.sql.DataFrame = [eventid: int, venueid: int, catid: int, dateid: int, eventname: string, starttime: timestamp]

Here's the partial stack trace for:
scala> eventsDF.show()

16/01/17 23:19:40 WARN Utils$: An error occurred while trying to read the S3 bucket lifecycle configuration
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: 16CD2614C4B93FDA), S3 Extended Request ID: ZkT5JSxlS5qcwVrCwAZsE961a6bzB9zfgaeryTIv5KSG0zumoGHH5GFGn1/qNkfRNL5zS2Zj1nk=
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1219)
        at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:803)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:505)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:317)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3595)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3548)
        at com.amazonaws.services.s3.AmazonS3Client.getBucketLifecycleConfiguration(AmazonS3Client.java:1896)
        at com.amazonaws.services.s3.AmazonS3Client.getBucketLifecycleConfiguration(AmazonS3Client.java:1883)
        at com.databricks.spark.redshift.Utils$.checkThatBucketHasObjectLifecycleConfiguration(Utils.scala:94)
        at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:90)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:287)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:286)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:318)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:49)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:374)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
        at org.apache.spark.sql.SQLContext$QueryExecution$$anonfun$toString$6.apply(SQLContext.scala:955)
        at org.apache.spark.sql.SQLContext$QueryExecution$$anonfun$toString$6.apply(SQLContext.scala:955)
        at org.apache.spark.sql.SQLContext$QueryExecution.stringOrError(SQLContext.scala:936)
        at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:955)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:401)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:362)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:370)
        at $line41.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
        at $line41.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
        at $line41.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
        at $line41.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
        at $line41.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
        at $line41.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
        at $line41.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
        at $line41.$read$$iwC$$iwC$$iwC.<init>(<console>:67)
        at $line41.$read$$iwC$$iwC.<init>(<console>:69)
        at $line41.$read$$iwC.<init>(<console>:71)
        at $line41.$read.<init>(<console>:73)
        at $line41.$read$.<init>(<console>:77)
        at $line41.$read$.<clinit>(<console>)
        at $line41.$eval$.<init>(<console>:7)
        at $line41.$eval$.<clinit>(<console>)
        at $line41.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/01/17 23:19:40 INFO RedshiftRelation: UNLOAD ('SELECT "eventid", "venueid", "catid", "dateid", "eventname", "starttime" FROM "PUBLIC"."event" ') TO 's3://my-bucket-us-east/349481db-a8d8-48f0-adea-c3fd5599ba1b/' WITH CREDENTIALS 'aws_access_key_id=<actual credentials>;aws_secret_access_key=<actual secret key>;token=<actual token>

Here's the save routine:

scala> sqlContext.sql("SELECT * FROM myevent WHERE eventid<=1000").withColumnRenamed("eventid", "id").write.format("com.databricks.spark.redshift").option("url", jdbcURL).option("tempdir", tempS3Dir).option("dbtable", "redshiftevent").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).mode(SaveMode.Overwrite).save()
16/01/17 23:21:33 ERROR Utils$: The S3 bucket wd-spark-us-east does not exist
16/01/17 23:21:33 WARN Utils$: An error occurred while trying to read the S3 bucket lifecycle configuration
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: 208479900C54DF6B), S3 Extended Request ID: My4p2YoPsjwlnXkEJUEz/kEOKq7oReX6CZmtvs8Uwa3656pl8FHVl1ejnrBXMVR26giFKWKWkgU=
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1219)
        at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:803)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:505)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:317)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3595)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3548)
        at com.amazonaws.services.s3.AmazonS3Client.getBucketLifecycleConfiguration(AmazonS3Client.java:1896)
        at com.amazonaws.services.s3.AmazonS3Client.getBucketLifecycleConfiguration(AmazonS3Client.java:1883)
        at com.databricks.spark.redshift.Utils$.checkThatBucketHasObjectLifecycleConfiguration(Utils.scala:94)
        at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:90)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:287)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:286)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:326)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:49)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
        at org.apache.spark.sql.DataFrame.rdd$lzycompute(DataFrame.scala:1486)
        at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:1483)
        at org.apache.spark.sql.DataFrame.mapPartitions(DataFrame.scala:1351)
        at com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:261)
        at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:367)
        at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:106)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
        at $line53.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
        at $line53.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:53)
        at $line53.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:55)
        at $line53.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:57)
        at $line53.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
        at $line53.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:61)
        at $line53.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63)
        at $line53.$read$$iwC$$iwC$$iwC.<init>(<console>:65)
        at $line53.$read$$iwC$$iwC.<init>(<console>:67)
        at $line53.$read$$iwC.<init>(<console>:69)
        at $line53.$read.<init>(<console>:71)
        at $line53.$read$.<init>(<console>:75)
        at $line53.$read$.<clinit>(<console>)
        at $line53.$eval$.<init>(<console>:7)
        at $line53.$eval$.<clinit>(<console>)
        at $line53.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/01/17 23:21:33 INFO RedshiftRelation: UNLOAD ('SELECT "eventname", ....

Any help would really be appreciated.

@JoshRosen
Contributor

In the example code that you posted above, it looks like you're taking STS session credentials and are configuring sc.hadoopConf to use them, which I believe is a misuse of STS session credentials. As I understand it, access keys that are created via the STS APIs don't function quite like regular keys: you also need to specify the session token along with your requests. According to https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_use-resources.html#RequestWithSTS:

If you're making direct HTTPS API requests to AWS, you can sign those requests with the temporary security credentials that you get from AWS Security Token Service (AWS STS). To do this, you do the following:

  • Use the access key ID and secret access key that are provided with the temporary security credentials the same way you would use long-term credentials to sign a request. [...]
  • Use the session token that is provided with the temporary security credentials. [...]

Similarly, according to https://docs.aws.amazon.com/redshift/latest/dg/r_copy-temporary-security-credentials.html:

To use temporary security credentials with a COPY command, include the token= argument in the credentials string. You must supply the access key ID and secret access key that were provided with the token.

Since s3n has no way to supply the session token, you won't be able to use those STS keys there.

What I think is happening here is that the invalid credentials in sc.hadoopConf are taking precedence over the IAM instance profile configuration, causing the error that you saw. Therefore, I recommend the following steps to address this problem:

  • Don't set AWS keys in the sc.hadoopConf; this should be unnecessary when running on EMR and using IAM instance profiles.
  • Use the s3:// subprotocol, as is recommended in the EMR docs.

Regarding that bucket lifecycle exception, I think that's a harmless but annoying bug in spark-redshift: the STS credentials specified in the configuration are supposed to take precedence over all other forms of credential specification, but it looks like that single call site didn't handle this properly. I'll fix this shortly, but note that this was not the cause of your failed reads; the Hadoop configuration is the root cause AFAIK.
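A minimal sketch of that recommended setup, assuming an EMR cluster whose IAM instance profile grants access to the bucket (the bucket name, table name, jdbcURL, and df are placeholders, and this snippet is untested):

import com.amazonaws.auth.{AWSSessionCredentials, InstanceProfileCredentialsProvider}
import org.apache.spark.sql.SaveMode

// Short-lived credentials from the EMR instance profile; these include a session token.
val creds = new InstanceProfileCredentialsProvider()
  .getCredentials.asInstanceOf[AWSSessionCredentials]

// Plain s3:// tempdir: no keys embedded in the URI and none set in sc.hadoopConfiguration.
val tempS3Dir = "s3://my-bucket-us-east/temp/"

// df is whatever DataFrame you want to save (placeholder).
df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("dbtable", "redshiftevent")
  .option("tempdir", tempS3Dir)
  .option("temporary_aws_access_key_id", creds.getAWSAccessKeyId)
  .option("temporary_aws_secret_access_key", creds.getAWSSecretKey)
  .option("temporary_aws_session_token", creds.getSessionToken)
  .mode(SaveMode.Overwrite)
  .save()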

@JoshRosen
Contributor

By the way, I would appreciate any suggestions on how I can make the documentation more clear. If my suggestion fixes things for you and you have time, feel free to submit a pull request to add clarifications to the README.

@JoshRosen
Contributor

I've opened #159 to fix the spurious bucket lifecycle check warning.

@wmdoble
Author

wmdoble commented Jan 18, 2016

Based on your recommendations, I went ahead and made the following updates:

  • val tempS3Dir = "s3://"+ awsAccessKey + ":" + awsSecretKey + "@my-bucket-us-east/"

  • removed the sc.hadoopConf settings for AWS keys

However, things are still not working; I'm getting the same stack trace. I'm beginning to wonder if the problem is due to specifying the S3 bucket without the embedded credentials, as shown in the logs:

    INFO RedshiftRelation: UNLOAD ('SELECT "eventid", "venueid", "catid", "dateid", "eventname", "starttime" FROM "PUBLIC"."event" ') TO 's3://my-bucket-us-east/8d0b7afe-f6be-49b5-a582-a2abab06ea6c/' WITH CREDENTIALS 'aws_access_key_id=ACCESS-KEY;aws_secret_access_key=SECRET-KEY;token=TOKEN=' ESCAPE

I'm wondering if this line of code in RedshiftRelation.scala needs to retain the credentials in the S3 url: https://github.com/databricks/spark-redshift/blob/master/src/main/scala/com/databricks/spark/redshift/RedshiftRelation.scala#L155.

@wmdoble
Author

wmdoble commented Jan 18, 2016

This is the stack trace:
INFO latency: StatusCode=[403], Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: 1E5972EDD9AAA06F), S3 Extended Request ID: +Oh+vgmiKPAnrgO98GaKK1YunG1Nb9pY8k1HGBCpHaFSCGWsqDrcyWbZlwTVchzz], ServiceName=[Amazon S3], AWSErrorCode=[InvalidAccessKeyId], AWSRequestID=[1E5972EDD9AAA06F], ServiceEndpoint=[https://my-bucket-us-east.s3.amazonaws.com], Exception=1, HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1, ClientExecuteTime=[7.285], HttpRequestTime=[5.976], HttpClientReceiveResponseTime=[3.499], RequestSigningTime=[0.298], HttpClientSendRequestTime=[0.186],
com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId; Request ID: 1E5972EDD9AAA06F), S3 Extended Request ID: +Oh+vgmiKPAnrgO98GaKK1YunG1Nb9pY8k1HGBCpHaFSCGWsqDrcyWbZlwTVchzz
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1219)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:803)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:505)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:317)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3595)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3548)
at com.amazonaws.services.s3.AmazonS3Client.listObjects(AmazonS3Client.java:647)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:277)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:260)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:253)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy48.list(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:756)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.globStatus(EmrFileSystem.java:373)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:292)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:350)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:115)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:193)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:401)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:362)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:370)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:71)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:73)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:75)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
at $iwC$$iwC$$iwC.<init>(<console>:85)
at $iwC$$iwC.<init>(<console>:87)
at $iwC.<init>(<console>:89)
at <init>(<console>:91)
at .<init>(<console>:95)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

@JoshRosen
Contributor

I'm wondering if this line of code in RedshiftRelation.scala needs to retain the credentials in the S3 url

You can't embed the credentials into the URI when specifying it as part of the UNLOAD or COPY commands, since that style of credential-embedding is a Hadoop concept which Redshift does not understand. See the documentation on https://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html.

@JoshRosen
Contributor

Based on your recommendations, I went ahead and made the following updates:

  • val tempS3Dir = "s3://"+ awsAccessKey + ":" + awsSecretKey + "@my-bucket-us-east/"
    [...]

To clarify, when using IAM instance profiles you should not specify the credentials in the Hadoop configuration or the URI; you should only specify them via the temporary_aws_* configurations in spark-redshift.

@wmdoble
Author

wmdoble commented Jan 18, 2016

Do you have an example showing how to provide the S3 credentials via the temporary_aws_* options?

@JoshRosen
Contributor

Sure; here's an example:

.option("temporary_aws_access_key_id", STS_ACCESS_KEY_ID)

@wmdoble
Author

wmdoble commented Jan 18, 2016

Okay thanks.
I'm using InstanceProfileCredentialsProvider and sending credentials as follows:

val eventsDF = sqlContext.read.format("com.databricks.spark.redshift").option("url", jdbcURL).option("tempdir", tempS3Dir).option("dbtable", "event").option("temporary_aws_access_key_id", awsAccessKey).option("temporary_aws_secret_access_key", awsSecretKey).option("temporary_aws_session_token", token).load()

However, I'm getting the following stack trace:
scala> eventsDF.show()
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
at com.databricks.spark.redshift.S3Credentials.initialize(S3Credentials.java:67)
at com.databricks.spark.redshift.AWSCredentialsUtils$.load(AWSCredentialsUtils.scala:49)
at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:89)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$3.apply(DataSourceStrategy.scala:53)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:287)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$$anonfun$pruneFilterProject$1.apply(DataSourceStrategy.scala:286)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:318)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:49)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:374)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:401)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:362)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:370)

Do you have an example that uses InstanceProfile?

@JoshRosen
Contributor

Aha, that is a bug. This will be fixed as soon as I merge #159. I guess we haven't had other users try this out (I don't have end-to-end integration tests for this STS/IAM path yet; pull requests are welcome for this, since I don't have time to work on this myself).

@JoshRosen added the bug label Jan 18, 2016
@JoshRosen added this to the 0.6.1 milestone Jan 18, 2016
@JoshRosen
Contributor

I'm going to merge #159. If you have the means to test out a custom build of spark-redshift, it would be great if you could give it a try and confirm whether this fixes things for you. If not, you can wait for 0.6.1 to be released. Thanks for reporting this problem.

@wmdoble
Author

wmdoble commented Jan 20, 2016

That's great news! Let me know how to access the custom build and test it later today.

@JoshRosen
Contributor

@wmdoble, the easiest way to test this out is probably to clone this repository and use either sbt package or sbt assembly to generate a JAR or assembly JAR.

@wmdoble
Author

wmdoble commented Jan 21, 2016

Confirming your code fix works. Thank you!
It seems there's a minor error reported to the effect that the bucket does not exist, but it seems harmless since I'm getting output from the tables.

scala> eventsDF.show()
16/01/21 04:13:38 ERROR Utils$: The S3 bucket my-bucket-us-east does not exist
16/01/21 04:13:38 INFO RedshiftRelation: UNLOAD ('SELECT "eventid", "venueid", "catid", "dateid", "eventname", "starttime" FROM "PUBLIC"."event" ') TO 's3://my-bucket-us-east/c67480b3-869e-4545-b214-20ca6e685f75/' WITH CREDENTIALS 'aws_access_key_id=access-key;aws_secret_access_key=secret-key;token=token' ESCAPE

Output:
16/01/21 04:13:44 INFO DAGScheduler: Job 0 finished: show at :50, took 3.663555 s
+-------+-------+-----+------+------------------+--------------------+
|eventid|venueid|catid|dateid|         eventname|           starttime|
+-------+-------+-----+------+------------------+--------------------+
|   1217|    238|    6|  1827|        Mamma Mia!|2008-01-01 20:00:...|
|   1433|    248|    6|  1827|            Grease|2008-01-01 19:00:...|

I appreciate your help.

@AndriyLytvynskyy

AndriyLytvynskyy commented Apr 28, 2016

I'm still having issues with this.
I assume the problem is that neither "s3" nor "s3native" in Hadoop 2.7.2 (which I'm using) supports session credentials; only 's3a' does. So that fix can't work for 's3' and 's3native'.

I rebuilt the master branch and am seeing the following behavior:

  1. If the following is not explicitly specified in code:
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKey)
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretKey)

I do get:
Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).

  2. With the following, I get a bit further (a permission issue):

    sqlContext.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKey)
    sqlContext.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretKey)

    val jdbcURL = s"""jdbc:redshift://$rsURL/$rsDbName?user=$rsUser&password=$rsPassword"""
    val df: DataFrame = sqlContext.read
      .format("com.databricks.spark.redshift")
      .option("url", jdbcURL)
      .option("dbtable", "db_table1")
      .option("tempdir", "s3n://spark-bucket/temp")
      .option("temporary_aws_access_key_id", awsAccessKey)
      .option("temporary_aws_secret_access_key", awsSecretKey)
      .option("temporary_aws_session_token", token)
      .load()

Exception at the end:

Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: s3n://spark-bucket/temp/3e91c500-cbc7-4200-986c-597b723bba24/0000_part_00
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)

I'll try with 's3a'.

@AndriyLytvynskyy

With s3a there are problems too, but those are related to Hadoop: hadoop-aws 2.7.2 calls a nonexistent method:

Exception in thread "main" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold(I)V
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:285)

@austinchau

austinchau commented Aug 30, 2016

Same problem as @AndriyLytvynskyy for me.

py4j.protocol.Py4JJavaError: An error occurred while calling o30.load.
: java.lang.NoSuchMethodError: org.apache.http.conn.ssl.SSLConnectionSocketFactory.<init>(Ljavax/net/ssl/SSLContext;Ljavax/net/ssl/HostnameVerifier;)V
at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.<init>(SdkTLSSocketFactory.java:56)

I wonder if this is specific to the Python version of this library. Here's my snippet, running with these versions of the packages:

spark-submit --jars ./RedshiftJDBC41-1.1.13.1013.jar --packages org.apache.hadoop:hadoop-aws:2.6.4,com.amazonaws:aws-java-sdk:1.11.29,com.databricks:spark-redshift_2.10:1.1.0 --driver-class-path ./RedshiftJDBC41-1.1.13.1013.jar main.py

  df = sqlContext.read.format("com.databricks.spark.redshift").options(
    tempdir = s3_tmp_dir, url = jdbc_url, query = sql, temporary_aws_access_key_id=aws_key, temporary_aws_secret_access_key=aws_secret, temporary_aws_session_token=temporary_aws_session_token).load()
  rdd = df.rdd  
  print rdd.collect()

@JoshRosen
Contributor

Hey @austinchau, let's continue this discussion over at #260, which I think is a duplicate of your specific issue.
