
Unable to catch Exception in save() of spark sql #276

Open
gitgraghu opened this issue Oct 9, 2019 · 3 comments

Comments

@gitgraghu

I am using the Hive Warehouse Connector to write a DataFrame to a Hive table. The code for the save looks like this:

inputDS.write()
.mode(SaveMode.Append)
.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
.option("table","tablename")
.save();

However, I am unable to catch an exception whenever executeUpdate fails to insert records into the table. I would like to catch the exception and stop the Spark job as soon as a runtime exception happens. I can see in the connector code that the exception is logged but not rethrown:

log.error(s"executeUpdate failed for query: ${query}", e)

Is there any way I can stop the Spark execution when the save() method ends in an error?
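For illustration, the behavior being described can be reproduced with a minimal, self-contained sketch. The class and method names below are hypothetical stand-ins, not the actual connector classes; the point is the pattern: the failure is caught and logged inside the library, so a try/catch around the caller never fires.

```java
// Minimal sketch of the swallow-and-log pattern described above.
// SwallowedExceptionDemo and its methods are hypothetical, not the
// real Hive Warehouse Connector classes.
public class SwallowedExceptionDemo {

    // Mirrors the connector behavior: the failure is logged, not rethrown.
    static void executeUpdate(String query) {
        try {
            throw new RuntimeException("insert failed");
        } catch (RuntimeException e) {
            // Stand-in for: log.error(s"executeUpdate failed for query: ...", e)
            System.err.println("executeUpdate failed for query: " + query);
        }
    }

    // Mirrors save(): delegates to executeUpdate and returns void.
    static void save() {
        executeUpdate("INSERT INTO tablename ...");
    }

    public static void main(String[] args) {
        boolean caught = false;
        try {
            save();            // the failure happens inside this call
        } catch (RuntimeException e) {
            caught = true;     // never reached: the exception was swallowed
        }
        System.out.println("caught=" + caught);  // prints caught=false
    }
}
```

Running this prints `caught=false`: the caller's catch block is dead code, which is exactly the situation reported here.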

@rahulmod

It looks like the executeUpdate function does not throw the exception, so the client code cannot catch it. After logging the error, the function should rethrow the exception at line 228.
You can do the following in Scala to check for success:

val res = inputDS.write()
.mode(SaveMode.Append)
.format(HiveWarehouseSession.HIVE_WAREHOUSE_CONNECTOR)
.option("table","tablename")
.save()

if(res) print("success") else ("failure")

@gitgraghu

Hi @rahulmod,

Thanks for the reply!

The save() function does not return a boolean; it returns void in Java and Unit in Scala, so the code you gave won't work.

This is a problem in the connector code. It tries to catch and rethrow the exception around executeUpdate, as in the line below, but executeUpdate never throws an exception, so we cannot catch anything from save().
There is no way to handle an error from save() in Spark with the Hortonworks warehouse connector plugin.

DefaultJDBCWrapper.executeUpdate(conn, database, loadInto(this.path.toString(), database, table));
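The fix being asked for amounts to a one-line change: rethrow after logging. A self-contained sketch (again with hypothetical names, not the connector's actual classes) showing how propagating the exception makes it catchable from the caller of save():

```java
// Sketch of the requested fix: log, then rethrow, so callers of save()
// can catch the failure. Names are hypothetical stand-ins.
public class RethrowDemo {

    static void executeUpdate(String query) {
        try {
            throw new RuntimeException("insert failed");
        } catch (RuntimeException e) {
            System.err.println("executeUpdate failed for query: " + query);
            throw e;  // the one-line change: propagate after logging
        }
    }

    static void save() {
        executeUpdate("INSERT INTO tablename ...");
    }

    public static void main(String[] args) {
        try {
            save();
        } catch (RuntimeException e) {
            // now reachable: the Spark job can be stopped here
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

With the rethrow in place, a try/catch around save() works as the issue author expects, and the job can fail fast.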

@rahulmod

You have to use the hive_warehouse_connector to connect to Hive and run "insert into table ..." through an executeUpdate command. First store the data in a staging table, then insert it into the final table. If you use executeUpdate directly, you can check the result and throw an exception yourself, as I mentioned in my previous comment.
https://www.nitendratech.com/bigdata/spark/access-hive-in-hdp3-using-apache-spark/
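The staged-write workaround above can be sketched as follows. This is a self-contained simulation: the executeUpdate here is a hypothetical stand-in that returns a success flag (it is not the HWC API, and the table names are made up). It only shows the shape of "stage, then insert, then check and fail fast":

```java
// Simulation of the stage-then-insert workaround. executeUpdate is a
// hypothetical stand-in returning a success flag, not the HWC API.
public class StagedWriteDemo {

    // Simulate: the staging insert succeeds, the final insert fails.
    static boolean executeUpdate(String sql) {
        return !sql.contains("final_table");
    }

    public static void main(String[] args) {
        // Step 1: write the data to a staging table first.
        if (!executeUpdate("INSERT INTO staging_table SELECT ...")) {
            throw new RuntimeException("staging write failed; stopping the job");
        }
        // Step 2: move staged rows into the final table, failing fast on error.
        if (!executeUpdate("INSERT INTO final_table SELECT * FROM staging_table")) {
            throw new RuntimeException("final insert failed; stopping the job");
        }
        System.out.println("both inserts succeeded");
    }
}
```

Because each step's result is checked explicitly, the caller controls when the job stops, instead of depending on save() to propagate the failure.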
