Need Support for Stored Procedures, Updates, Deletes #21
Comments
@furlong46 You can use the `query` option for all operations except stored procedures. You would first have to stage the data into another table before issuing updates etc.: `articleQuery = """SELECT ...`
On further playing around I see the issue; I had just assumed this wrapper behaved similarly to the Scala version. To do a non-query you can do something like this:
jdbcUsername = "RelationalLayerLogin"
url = "jdbc:sqlserver://{SERVER_ADDR};databaseName={DATABASE_NAME};".format(SERVER_ADDR = server, DATABASE_NAME = database)
# for executing non-queries
drop = "DROP TABLE IF EXISTS Stage.SalesByDateStoreArticle"
stmt = con.createStatement()
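For reference, a minimal sketch of how a fragment like this can be completed in a Databricks Python notebook, assuming SQL authentication; the connection is opened through the JVM's java.sql.DriverManager via the py4j gateway of the running SparkContext, and the server, database, and secret names are placeholders rather than values from the original comment:

# Sketch: run a non-query (DDL/DML/EXEC) from PySpark via the JVM's java.sql.DriverManager
server = "<yourServer>.database.windows.net"
database = "<yourDatabaseName>"
jdbcUsername = "<yourSqlLogin>"
jdbcPassword = dbutils.secrets.get(scope = "<yourSecretScope>", key = "<yourSqlPasswordKey>")

url = "jdbc:sqlserver://{SERVER_ADDR};databaseName={DATABASE_NAME};".format(SERVER_ADDR = server, DATABASE_NAME = database)

# java.sql.DriverManager is reachable through the py4j gateway behind the Spark session
driver_manager = spark.sparkContext._gateway.jvm.java.sql.DriverManager
con = driver_manager.getConnection(url, jdbcUsername, jdbcPassword)

# Non-queries go through a plain JDBC Statement; only the SQL text changes for UPDATE/DELETE/EXEC
stmt = con.createStatement()
stmt.execute("DROP TABLE IF EXISTS Stage.SalesByDateStoreArticle")
stmt.close()
con.close()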
With AAD authentication this can look like:
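(A rough sketch, not the commenter's original code: the same DriverManager approach, but with an Azure AD access token supplied through a java.util.Properties object. The access_token value is assumed to have been acquired beforehand, for example with a service principal as in the Scala example further down; the encrypt and hostNameInCertificate properties mirror that example.)

# Sketch: AAD-authenticated JDBC connection from PySpark; access_token is an already-acquired
# Azure AD token for the https://database.windows.net/ resource
jvm = spark.sparkContext._gateway.jvm

url = "jdbc:sqlserver://{SERVER_ADDR};databaseName={DATABASE_NAME};".format(SERVER_ADDR = "<yourServer>.database.windows.net", DATABASE_NAME = "<yourDatabaseName>")

props = jvm.java.util.Properties()
props.setProperty("accessToken", access_token)
props.setProperty("encrypt", "true")
props.setProperty("hostNameInCertificate", "*.database.windows.net")

con = jvm.java.sql.DriverManager.getConnection(url, props)
stmt = con.createStatement()
stmt.execute("DROP TABLE IF EXISTS Stage.SalesByDateStoreArticle")
stmt.close()
con.close()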
Hi all, while the older Azure SQL DB Spark connector did include this functionality, this new connector is based on the Spark DataSource APIs, so it is out of scope. This functionality is provided by libraries like pyodbc.
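For illustration, a minimal pyodbc sketch along those lines (the ODBC driver name, server, database, login, and procedure names are placeholders; pyodbc and a SQL Server ODBC driver have to be installed on the cluster):

# Sketch: executing a stored procedure (or any UPDATE/DELETE) with pyodbc instead of the connector
import pyodbc

conn_str = (
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=<yourServer>.database.windows.net;"
    "DATABASE=<yourDatabaseName>;"
    "UID=<yourSqlLogin>;"
    "PWD=<yourSqlPassword>"
)

conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.execute("EXEC <nameOfStoredProcedure> ?", "<parameterIfNeeded>")
conn.commit()  # commit is required for statements that modify data
cursor.close()
conn.close()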
Hello, can anyone please give me a Scala version of B4PJS's code for the DROP TABLE example? I can't get his version to work in Scala.
Hi, we faced a similar issue today: what we wanted to do was run a stored procedure against an Azure SQL DB from a notebook. Here is a fully working example of a Databricks Scala notebook, accessing an Azure SQL DB with Azure AD and running a stored procedure. If you want to use it, don't forget to change the values of <...> appropriately and install the required libraries!
// Import libraries
import com.microsoft.aad.adal4j.{AuthenticationContext, ClientCredential}
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.Statement
import java.util.concurrent.Executors
import java.util.Properties
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val secretScope = "<yourSecretScope>"
val clientIdKey = "<servicePrincipalClientId>"
val clientSecretKey = "<servicePrincipalClientSecret>"
val tenantId = "<yourTenantId>"
val authority = s"https://login.windows.net/${tenantId}"
val resourceAppIdUri = "https://database.windows.net/"
val sqlServerName = "<yourSqlServersName>"
val sqlDatabaseName = "<yourSqlDatabaseName>"
val url = s"jdbc:sqlserver://${sqlServerName}.database.windows.net:1433;databaseName=${sqlDatabaseName}"
// --- Get AccessToken for ServicePrincipal ---
val service = Executors.newFixedThreadPool(1)
val authenticationContext = new AuthenticationContext(authority, true, service)
val principalClientId = dbutils.secrets.get(scope=secretScope, key=clientIdKey)
val principalSecret = dbutils.secrets.get(scope=secretScope, key=clientSecretKey)
val clientCredential = new ClientCredential(principalClientId, principalSecret)
val authResult = authenticationContext.acquireToken(resourceAppIdUri, clientCredential, null)
val accessToken = authResult.get().getAccessToken
// --- Prepare ResultSet to DataFrame conversion ---
// Define columns & prepare schema
val columns = Seq ("<myColumn1>", "<myColumn2>", "<myColumn3>", "<myColumn4>")
val schema = StructType(List(
StructField("<myColumn1>", StringType, nullable = true),
StructField("<myColumn2>", StringType, nullable = true),
StructField("<myColumn3>", StringType, nullable = true),
StructField("<myColumn4>", StringType, nullable = true)
))
// Define how each record in ResultSet will be converted to a Row at each iteration
def parseResultSet(rs: ResultSet): Row = {
val resultSetRecord = columns.map(c => rs.getString(c))
Row(resultSetRecord:_*)
}
// Define a function to convert the ResultSet to an Iterator[Row] (It will use the function of the previous step)
def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] =
new Iterator[Row] {
def hasNext: Boolean = rs.next()
def next(): Row = f(rs)
}
// Define a function that creates an RDD from Iterator[Row].toSeq, using the previously defined functions
// The schema created above is used to build the DataFrame
def parallelizeResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
spark.createDataFrame(rdd, schema)
}
// --- Execute StoredProcedure ---
// Configure connection properties
val connectionProperties = new Properties()
connectionProperties.setProperty("Driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
connectionProperties.put("accessToken", accessToken)
connectionProperties.put("encrypt", "true")
connectionProperties.put("hostNameInCertificate", "*.database.windows.net")
// Execute the Stored Procedure with "EXEC <nameOfStoredProcedure> <parameterIfNeeded>"
val connection = DriverManager.getConnection(url, connectionProperties)
val resultSet = connection.createStatement.executeQuery("EXEC <nameOfStoredProcedure> <parameterIfNeeded>")
val df = parallelizeResultSet(resultSet, spark)
display(df)
Keep in mind: the new SQL Spark Connector should already be installed; if not, install it first. Hope this is helpful. 🤓
May I know how to pass "accessToken" as a connection property to the function in Python? Thanks.
The previous version of this solution (linked below) supported DDL and DML operations. We need the ability to execute Updates, Deletes, and Stored Procedures from within a Spark notebook.
https://github.com/Azure/azure-sqldb-spark