From 7348f7dee57f5b0db0a22f366e699dc52dd41ac1 Mon Sep 17 00:00:00 2001 From: robertwhiffin <82869776+robertwhiffin@users.noreply.github.com> Date: Wed, 2 Oct 2024 15:26:23 +0100 Subject: [PATCH] Updated sql migration assistant UI (#260) Revamps the UI for the migration assistant, adds workflow automation --- sql_migration_assistant/__init__.py | 1 + sql_migration_assistant/app/llm.py | 38 +- sql_migration_assistant/gradio_app.py | 728 +++++++++++++----- sql_migration_assistant/gradio_app_backup.py | 298 +++++++ .../infra/app_serving_cluster_infra.py | 3 +- sql_migration_assistant/infra/chat_infra.py | 64 +- sql_migration_assistant/infra/jobs_infra.py | 116 +++ .../infra/secrets_infra.py | 8 +- .../infra/unity_catalog_infra.py | 78 +- .../infra/vector_search_infra.py | 18 +- sql_migration_assistant/jobs/__init__.py | 0 .../jobs/bronze_to_silver.py | 209 +++++ sql_migration_assistant/jobs/call_agents.py | 108 +++ .../jobs/silver_to_gold.py | 176 +++++ sql_migration_assistant/requirements.txt | 1 - .../run_app_from_databricks_notebook.py | 6 +- sql_migration_assistant/utils/initialsetup.py | 67 +- .../utils/run_review_app.py | 6 +- .../utils/upload_files_to_workspace.py | 13 +- 19 files changed, 1639 insertions(+), 299 deletions(-) create mode 100644 sql_migration_assistant/gradio_app_backup.py create mode 100644 sql_migration_assistant/infra/jobs_infra.py create mode 100644 sql_migration_assistant/jobs/__init__.py create mode 100644 sql_migration_assistant/jobs/bronze_to_silver.py create mode 100644 sql_migration_assistant/jobs/call_agents.py create mode 100644 sql_migration_assistant/jobs/silver_to_gold.py diff --git a/sql_migration_assistant/__init__.py b/sql_migration_assistant/__init__.py index dcfe9e9a..c0d899ca 100644 --- a/sql_migration_assistant/__init__.py +++ b/sql_migration_assistant/__init__.py @@ -9,6 +9,7 @@ def hello(): w = WorkspaceClient(product="sql_migration_assistant", product_version="0.0.1") p = Prompts() setter_upper = SetUpMigrationAssistant() + setter_upper.check_cloud(w) final_config = setter_upper.setup_migration_assistant(w, p) current_path = Path(__file__).parent.resolve() diff --git a/sql_migration_assistant/app/llm.py b/sql_migration_assistant/app/llm.py index 1d8c7fc9..ce409967 100644 --- a/sql_migration_assistant/app/llm.py +++ b/sql_migration_assistant/app/llm.py @@ -1,24 +1,15 @@ -import logging +import gradio as gr from databricks.sdk import WorkspaceClient from databricks.sdk.service.serving import ChatMessage, ChatMessageRole -w = WorkspaceClient() -foundation_llm_name = "databricks-meta-llama-3-1-405b-instruct" -max_token = 4096 -messages = [ - ChatMessage(role=ChatMessageRole.SYSTEM, content="You are an unhelpful assistant"), - ChatMessage(role=ChatMessageRole.USER, content="What is RAG?"), -] - class LLMCalls: - def __init__(self, foundation_llm_name, max_tokens): + def __init__(self, foundation_llm_name): self.w = WorkspaceClient() self.foundation_llm_name = foundation_llm_name - self.max_tokens = int(max_tokens) - def call_llm(self, messages): + def call_llm(self, messages, max_tokens, temperature): """ Function to call the LLM model and return the response. :param messages: list of messages like @@ -29,8 +20,17 @@ def call_llm(self, messages): ] :return: the response from the model """ + + max_tokens = int(max_tokens) + temperature = float(temperature) + # check to make sure temperature is between 0.0 and 1.0 + if temperature < 0.0 or temperature > 1.0: + raise gr.Error("Temperature must be between 0.0 and 1.0") response = self.w.serving_endpoints.query( - name=foundation_llm_name, max_tokens=max_token, messages=messages + name=self.foundation_llm_name, + max_tokens=max_tokens, + messages=messages, + temperature=temperature, ) message = response.choices[0].message.content return message @@ -53,14 +53,16 @@ def convert_chat_to_llm_input(self, system_prompt, chat): # this is called to actually send a request and receive response from the llm endpoint. - def llm_translate(self, system_prompt, input_code): + def llm_translate(self, system_prompt, input_code, max_tokens, temperature): messages = [ ChatMessage(role=ChatMessageRole.SYSTEM, content=system_prompt), ChatMessage(role=ChatMessageRole.USER, content=input_code), ] # call the LLM end point. - llm_answer = self.call_llm(messages=messages) + llm_answer = self.call_llm( + messages=messages, max_tokens=max_tokens, temperature=temperature + ) # Extract the code from in between the triple backticks (```), since LLM often prints the code like this. # Also removes the 'sql' prefix always added by the LLM. translation = llm_answer # .split("Final answer:\n")[1].replace(">>", "").replace("<<", "") @@ -73,12 +75,14 @@ def llm_chat(self, system_prompt, query, chat_history): llm_answer = self.call_llm(messages=messages) return llm_answer - def llm_intent(self, system_prompt, input_code): + def llm_intent(self, system_prompt, input_code, max_tokens, temperature): messages = [ ChatMessage(role=ChatMessageRole.SYSTEM, content=system_prompt), ChatMessage(role=ChatMessageRole.USER, content=input_code), ] # call the LLM end point. - llm_answer = self.call_llm(messages=messages) + llm_answer = self.call_llm( + messages=messages, max_tokens=max_tokens, temperature=temperature + ) return llm_answer diff --git a/sql_migration_assistant/gradio_app.py b/sql_migration_assistant/gradio_app.py index 990f570a..b1b15d31 100644 --- a/sql_migration_assistant/gradio_app.py +++ b/sql_migration_assistant/gradio_app.py @@ -1,6 +1,10 @@ +import json import os +import datetime from databricks.labs.lsql.core import StatementExecutionExt from databricks.sdk import WorkspaceClient +from databricks.sdk.service.workspace import ImportFormat, Language +import base64 import gradio as gr from app.llm import LLMCalls @@ -23,14 +27,16 @@ CODE_INTENT_TABLE_NAME = os.environ.get("CODE_INTENT_TABLE_NAME") CATALOG = os.environ.get("CATALOG") SCHEMA = os.environ.get("SCHEMA") - +VOLUME_NAME_INPUT_PATH = os.environ.get("VOLUME_NAME_INPUT_PATH") +VOLUME_NAME = os.environ.get("VOLUME_NAME") +DATABRICKS_HOST = os.environ.get("DATABRICKS_HOST") +TRANSFORMATION_JOB_ID = os.environ.get("TRANSFORMATION_JOB_ID") +WORKSPACE_LOCATION = os.environ.get("WORKSPACE_LOCATION") w = WorkspaceClient(product="sql_migration_assistant", product_version="0.0.1") -see = StatementExecutionExt(w, SQL_WAREHOUSE_ID) -translation_llm = LLMCalls( - foundation_llm_name=FOUNDATION_MODEL_NAME, max_tokens=MAX_TOKENS -) -intent_llm = LLMCalls(foundation_llm_name=FOUNDATION_MODEL_NAME, max_tokens=MAX_TOKENS) +see = StatementExecutionExt(w, warehouse_id=SQL_WAREHOUSE_ID) +translation_llm = LLMCalls(foundation_llm_name=FOUNDATION_MODEL_NAME) +intent_llm = LLMCalls(foundation_llm_name=FOUNDATION_MODEL_NAME) similar_code_helper = SimilarCode( workspace_client=w, see=see, @@ -51,237 +57,561 @@ gr.Markdown( """logo -## A migration assistant for explaining the intent of SQL code and conversion to Spark SQL +# Databricks Legion Migration Accelerator + +Legion is an AI powered tool that aims to accelerate the migration of code to Databricks for low cost and effort. It +does this by using AI to translate, explain, and make discoverable your code. + +This interface is the Legion Control Panel. Here you are able to configure the AI agents for translation and explanation +to fit your needs, incorporating your expertise and knowledge of the codebase by adjusting the AI agents' instructions. + +Legion can work in a batch or interactive fashion. -#### This demo relies on the tables and columns referenced in the SQL query being present in Unity Catalogue and having their table comments and column comments populated. For the purpose of the demo, this was generated using the Databricks AI Generated Comments tool. +*Interactive operation* +Fine tune the AI agents on a single file and output the result as a Databricks notebook. +Use this UI to adjust the system prompts and instructions for the AI agents to generate the best translation and intent. + +*Batch operation* +Process a Volume of files to generate Databricks notebooks. Use this UI to fine tune your agent prompts against selected + files before executing a Workflow to transform all files in the Volume, outputting Databricks notebooks with the AI + generated intent and translation. + + +Please select your mode of operation to get started. """ ) + operation = gr.Radio( + label="Select operation mode", + choices=["Interactive mode", "Batch mode"], + value="Interactive mode", + type="value", + interactive=True, + ) + ################################################################################ + #### STORAGE SETTINGS TAB + ################################################################################ + + with gr.Tab(label="Input code", visible=True) as interactive_input_code_tab: + + gr.Markdown( + f"""## Paste in some code to test your agents on. + """ + ) + interactive_code_button = gr.Button("Ingest code") + interactive_code = gr.Code( + label="Paste your code in here", language="sql-msSQL" + ) + interactive_code_button.click(fn=lambda: gr.Info("Code ingested!")) + + with gr.Tab(label="Select code", visible=False) as batch_input_code_tab: + + gr.Markdown( + f"""## Select a file to test your agents on. + + Legion can batch process a Volume of files to generate Databricks notebooks. The files to translate must be + added to the *Input Code* folder in the UC Volume [here]({DATABRICKS_HOST}/explore/data/volumes/{CATALOG}/{SCHEMA}/{VOLUME_NAME}). + + Here you can select a file to fine tune your agent prompts against. + """ + ) + volume_path = gr.Textbox(value=VOLUME_NAME_INPUT_PATH, visible=False) + + load_files = gr.Button("Load Files from Volume") + select_code_file = gr.Radio(label="Select Code File") + selected_file = gr.Code(label="Selected Code File", language="sql-msSQL") + + def list_files(path_to_volume): + file_infos = w.dbutils.fs.ls(path_to_volume) + file_names = [x.name for x in file_infos] + file_name_radio = gr.Radio(label="Select Code File", choices=file_names) + return file_name_radio + + load_files.click(list_files, volume_path, select_code_file) + + def read_code_file(volume_path, file_name): + file_name = os.path.join(volume_path, file_name) + file = w.files.download(file_name) + code = file.contents.read().decode("utf-8") + return code ################################################################################ - #### TRANSLATION ADVANCED OPTIONS PANE + #### EXPLANATION TAB ################################################################################ - with gr.Accordion(label="Translation Advanced Settings", open=False): - with gr.Row(): - transation_system_prompt = gr.Textbox( - label="Instructions for the LLM translation tool.", - value=""" - You are an expert in multiple SQL dialects. You only reply with SQL code and with no other text. - Your purpose is to translate the given SQL query to Databricks Spark SQL. - You must follow these rules: - - You must keep all original catalog, schema, table, and field names. - - Convert all dates to dd-MMM-yyyy format using the date_format() function. - - Subqueries must end with a semicolon. - - Ensure queries do not have # or @ symbols. - - ONLY if the original query uses temporary tables (e.g. "INTO #temptable"), re-write these as either CREATE OR REPLACE TEMPORARY VIEW or CTEs. . - - Square brackets must be replaced with backticks. - - Custom field names should be surrounded by backticks. - - Ensure queries do not have # or @ symbols. - - Only if the original query contains DECLARE and SET statements, re-write them according to the following format: - DECLARE VARIABLE variable TYPE DEFAULT value; For example: DECLARE VARIABLE number INT DEFAULT 9; - SET VAR variable = value; For example: SET VAR number = 9; - - Write an initial draft of the translated query. Then double check the output for common mistakes, including: - - Using NOT IN with NULL values - - Using UNION when UNION ALL should have been used - - Using BETWEEN for exclusive ranges - - Data type mismatch in predicates - - Properly quoting identifiers - - Using the correct number of arguments for functions - - Casting to the correct data type - - Using the proper columns for joins - - Return the final translated query only. Include comments. Include only SQL. - """.strip(), - lines=40, + with gr.Tab(label="Code Explanation"): + gr.Markdown( + """ + ## An AI tool to generate the intent of your code. + + In this panel you need to iterate on the system prompt to refine the intent the AI generates for your code. + This intent will be stored in Unity Catalog, and can be used for finding similar code, for documentation, + and to help with writing new code in Databricks to achieve the same goal. + """ + ) + with gr.Accordion(label="Advanced Intent Settings", open=True): + gr.Markdown( + """ ### Advanced settings for the generating the intent of the input code. + + The *Temperature* paramater controls the randomness of the AI's response. Higher values will result in + more creative responses, while lower values will result in more predictable responses. + """ ) + with gr.Row(): + intent_temperature = gr.Number( + label="Temperature. Float between 0.0 and 1.0", value=0.0 + ) + intent_max_tokens = gr.Number( + label="Max tokens. Check your LLM docs for limit.", value=3500 + ) + with gr.Row(): + intent_system_prompt = gr.Textbox( + label="System prompt of the LLM to generate the intent.", + value="""Your job is to explain intent of the provided SQL code. + """.strip(), + ) + with gr.Accordion(label="Intent Pane", open=True): + gr.Markdown( + """ ## AI generated intent of what your code aims to do. + """ + ) + explain_button = gr.Button("Explain") + with gr.Row(): + with gr.Column(): + gr.Markdown(""" ## Input Code.""") + + # input box for SQL code with nice formatting + intent_input_code = gr.Code( + label="Input SQL", + language="sql-msSQL", + ) + # a button labelled translate + + with gr.Column(): + # divider subheader + gr.Markdown(""" ## Code intent""") + # output box of the T-SQL translated to Spark SQL + explained = gr.Textbox(label="AI generated intent of your code.") + + def llm_intent_wrapper(system_prompt, input_code, max_tokens, temperature): + # call the LLM to translate the code + intent = intent_llm.llm_intent( + system_prompt, input_code, max_tokens, temperature + ) + return intent + + # reset hidden chat history and prompt + # do translation + explain_button.click( + fn=llm_intent_wrapper, + inputs=[ + intent_system_prompt, + intent_input_code, + intent_max_tokens, + intent_temperature, + ], + outputs=explained, + ) ################################################################################ - #### TRANSLATION PANE + #### TRANSLATION TAB ################################################################################ - # subheader - - with gr.Accordion(label="Translation Pane", open=True): + with gr.Tab(label="Translation"): gr.Markdown( - """ ### Input your T-SQL code here for automatic translation to Spark-SQL and use AI to generate a statement of intent for the code's purpose.""" + """ + ## An AI tool to translate your code. + + In this panel you need to iterate on the system prompt to refine the translation the AI generates for your code. + + """ ) - # hidden chat interface - to enable chatbot functionality - translation_chat = gr.Chatbot(visible=False) - with gr.Row(): - with gr.Column(): - gr.Markdown( - """ ### Input your T-SQL code here for translation to Spark-SQL.""" - ) + with gr.Accordion(label="Translation Advanced Settings", open=True): + gr.Markdown( + """ ### Advanced settings for the translating the input code. - # input box for SQL code with nice formatting - input_code = gr.Code( - label="Input SQL", - language="sql-msSQL", - value="""SELECT - c.[country_name], - AVG([dep_count]) AS average_dependents -FROM - ( - SELECT - e.[employee_id] - ,e.[department_id] - ,COUNT(d.[dependent_id]) AS dep_count - FROM - [robert_whiffin].[code_assistant].[employees] e - LEFT JOIN [robert_whiffin].[code_assistant].[dependents] d ON e.[employee_id] = d.[employee_id] - GROUP BY - e.[employee_id] - ,e.[department_id] - ) AS subquery - JOIN [robert_whiffin].[code_assistant].[departments] dep ON subquery.[department_id] = dep.[department_id] - JOIN [robert_whiffin].[code_assistant].[locations] l ON dep.[location_id] = l.[location_id] - JOIN [robert_whiffin].[code_assistant].[countries] c ON l.[country_id] = c.[country_id] -GROUP BY - c.[country_name] -ORDER BY - c.[country_name]""", + The *Temperature* paramater controls the randomness of the AI's response. Higher values will result in + more creative responses, while lower values will result in more predictable responses. + """ + ) + with gr.Row(): + translation_temperature = gr.Number( + label="Temperature. Float between 0.0 and 1.0", value=0.0 + ) + translation_max_tokens = gr.Number( + label="Max tokens. Check your LLM docs for limit.", value=3500 + ) + with gr.Row(): + translation_system_prompt = gr.Textbox( + label="Instructions for the LLM translation tool.", + value=""" + You are an expert in multiple SQL dialects. You only reply with SQL code and with no other text. + Your purpose is to translate the given SQL query to Databricks Spark SQL. + You must follow these rules: + - You must keep all original catalog, schema, table, and field names. + - Convert all dates to dd-MMM-yyyy format using the date_format() function. + - Subqueries must end with a semicolon. + - Ensure queries do not have # or @ symbols. + - ONLY if the original query uses temporary tables (e.g. "INTO #temptable"), re-write these as either CREATE OR REPLACE TEMPORARY VIEW or CTEs. . + - Square brackets must be replaced with backticks. + - Custom field names should be surrounded by backticks. + - Ensure queries do not have # or @ symbols. + - Only if the original query contains DECLARE and SET statements, re-write them according to the following format: + DECLARE VARIABLE variable TYPE DEFAULT value; For example: DECLARE VARIABLE number INT DEFAULT 9; + SET VAR variable = value; For example: SET VAR number = 9; + + Write an initial draft of the translated query. Then double check the output for common mistakes, including: + - Using NOT IN with NULL values + - Using UNION when UNION ALL should have been used + - Using BETWEEN for exclusive ranges + - Data type mismatch in predicates + - Properly quoting identifiers + - Using the correct number of arguments for functions + - Casting to the correct data type + - Using the proper columns for joins + + Return the final translated query only. Include comments. Include only SQL. + """.strip(), + lines=20, ) - # a button labelled translate - translate_button = gr.Button("Translate") - with gr.Column(): - # divider subheader - gr.Markdown(""" ### Your Code Translated to Spark-SQL""") - # output box of the T-SQL translated to Spark SQL - translated = gr.Code( - label="Your code translated to Spark SQL", language="sql-sparkSQL" + with gr.Accordion(label="Translation Pane", open=True): + gr.Markdown(""" ### Input your code here for translation to Spark-SQL.""") + # a button labelled translate + translate_button = gr.Button("Translate") + with gr.Row(): + with gr.Column(): + gr.Markdown(""" ## Input code.""") + + # input box for SQL code with nice formatting + translation_input_code = gr.Code( + label="Input SQL", + language="sql-msSQL", + ) + + with gr.Column(): + # divider subheader + gr.Markdown(""" ## Translated Code""") + # output box of the T-SQL translated to Spark SQL + translated = gr.Code( + label="Your code translated to Spark SQL", + language="sql-sparkSQL", + ) + + # helper function to take the output from llm_translate and return outputs for chatbox and textbox + # chatbox input is a list of lists, each list is a message from the user and the response from the LLM + # textbox input is a string + def llm_translate_wrapper( + system_prompt, input_code, max_tokens, temperature + ): + # call the LLM to translate the code + translated_code = translation_llm.llm_translate( + system_prompt, input_code, max_tokens, temperature ) - translation_prompt = gr.Textbox(label="Adjustments for translation") - - def translate_respond(system_prompt, message, chat_history): - bot_message = translation_llm.llm_chat(system_prompt, message, chat_history) - chat_history.append([message, bot_message]) - return chat_history, chat_history[-1][1] - - # helper function to take the output from llm_translate and return outputs for chatbox and textbox - # chatbox input is a list of lists, each list is a message from the user and the response from the LLM - # textbox input is a string - def llm_translate_wrapper(system_prompt, input_code): - # call the LLM to translate the code - translated_code = translation_llm.llm_translate(system_prompt, input_code) - # wrap the translated code in a list of lists for the chatbot - chat_history = [[input_code, translated_code]] - return chat_history, translated_code - - # reset hidden chat history and prompt - translate_button.click( - fn=lambda: ([["", ""]], ""), - inputs=None, - outputs=[translation_chat, translation_prompt], - ) - # do translation - translate_button.click( - fn=llm_translate_wrapper, - inputs=[transation_system_prompt, input_code], - outputs=[translation_chat, translated], - ) - # refine translation - translation_prompt.submit( - fn=translate_respond, - inputs=[transation_system_prompt, translation_prompt, translation_chat], - outputs=[translation_chat, translated], - ) + return translated_code + + # reset hidden chat history and prompt + # do translation + translate_button.click( + fn=llm_translate_wrapper, + inputs=[ + translation_system_prompt, + translation_input_code, + translation_max_tokens, + translation_temperature, + ], + outputs=translated, + ) ################################################################################ - #### AI GENERATED INTENT PANE + #### SIMILAR CODE TAB ################################################################################ - # divider subheader - with gr.Accordion(label="Advanced Intent Settings", open=False): + with gr.Tab(label="Find Similar Code"): gr.Markdown( - """ ### Advanced settings for the generating the intent of the input code.""" + """ + # ** Work in Progress ** + ## An AI tool to find similar code. + """ ) - with gr.Row(): - intent_system_prompt = gr.Textbox( - label="System prompt of the LLM to generate the intent. Editing will reset the intent.", - value="""Your job is to explain intent of the provided SQL code. - """.strip(), + with gr.Accordion(label="Similar Code Pane", open=True): + gr.Markdown( + """ ## Similar code + + This code is thought to be similar to what you are doing, based on comparing the intent of your code with the intent of this code. + """ ) - with gr.Accordion(label="Intent Pane", open=True): - gr.Markdown( - """ ## AI generated intent of what your code aims to do. - - Intent is determined by an LLM which uses the code and table & column metadata. + # a button + find_similar_code = gr.Button("Find similar code") + # a row with an code and text box to show the similar code + with gr.Row(): + similar_code_input = gr.Code( + label="Input Code.", language="sql-sparkSQL" + ) + similar_code_output = gr.Code( + label="Similar code to yours.", language="sql-sparkSQL" + ) + similar_intent = gr.Textbox(label="The similar codes intent.") - ***If the intent is incorrect, please edit***. Once you are happy that the description is correct, please click the button below to save the intent. - - """ - ) - # a box to give the LLM generated intent of the code. This is editable as well. - explain_button = gr.Button("Explain code intent using AI.") - explained = gr.Textbox(label="AI generated intent of your code.", visible=False) - - chatbot = gr.Chatbot(label="AI Chatbot for Intent Extraction", height="70%") - - msg = gr.Textbox(label="Instruction") - clear = gr.ClearButton([msg, chatbot]) - - def intent_respond(system_prompt, message, chat_history): - bot_message = intent_llm.llm_chat(system_prompt, message, chat_history) - chat_history.append([message, bot_message]) - return chat_history, "", bot_message - - def llm_chat_wrapper(system_prompt, input_code): - # call the LLM to translate the code - intent = intent_llm.llm_intent(system_prompt, input_code) - # wrap the translated code in a list of lists for the chatbot - chat_history = [[input_code, intent]] - return chat_history, "", intent - - explain_button.click( - fn=llm_chat_wrapper, - inputs=[intent_system_prompt, input_code], - outputs=[chatbot, msg, explained], - ) - msg.submit( - fn=intent_respond, - inputs=[intent_system_prompt, msg, chatbot], - outputs=[chatbot, msg, explained], + # a button + submit = gr.Button("Save code and intent") + + # assign actions to buttons when clicked. + find_similar_code.click( + fn=similar_code_helper.get_similar_code, + inputs=similar_code_input, + outputs=[similar_code_output, similar_intent], ) - clear.click(lambda: None, None, chatbot, queue=False) + + def save_intent_wrapper(input_code, explained): + gr.Info("Saving intent") + similar_code_helper.save_intent(input_code, explained) + gr.Info("Intent saved") + + submit.click(save_intent_wrapper, inputs=[translation_input_code, explained]) ################################################################################ - #### SIMILAR CODE PANE + #### EXECUTE JOB TAB ################################################################################ - # divider subheader + with gr.Tab(label="Execute Job", visible=False) as batch_output_tab: + gr.Markdown( + """ ## Execute Job + + This tab is for executing the job to covert the code files in the Unity Catalog Volume to Databricks + Notebooks. Once you are happy with your system prompts and and the explanation and translation outputs, + click the execute button below. + + This will kick off a Workflow which will ingest the code files, write them to a Delta Table, apply the AI + agents, and output a Databricks Notebook per input code file. This notebook will have the intent at the top + of the notebook in a markdown cell, and the translated code in the cell below. These notebooks are found in + the workspace at *{WORKSPACE_LOCATION}/outputNotebooks* and in the *Output Code* folder in the UC Volume + + The intent will also be stored in a Unity Catalog table and vector search index for finding similar code. + """ + ) + execute = gr.Button( + value="EXECUTE CODE TRANSFORMATION", + size="lg", + ) + run_status = gr.Markdown(label="Job Status Page", visible=False) + + def exectute_workflow( + intent_prompt, + intent_temperature, + intent_max_tokens, + translation_prompt, + translation_temperature, + translation_max_tokens, + ): + gr.Info("Beginning code transformation workflow") + agent_config_payload = [ + [ + { + "translation_agent": { + "system_prompt": translation_prompt, + "endpoint": FOUNDATION_MODEL_NAME, + "max_tokens": translation_max_tokens, + "temperature": translation_temperature, + } + } + ], + [ + { + "explanation_agent": { + "system_prompt": intent_prompt, + "endpoint": FOUNDATION_MODEL_NAME, + "max_tokens": intent_max_tokens, + "temperature": intent_temperature, + } + } + ], + ] + + app_config_payload = { + "VOLUME_NAME_OUTPUT_PATH": os.environ.get("VOLUME_NAME_OUTPUT_PATH"), + "VOLUME_NAME_INPUT_PATH": os.environ.get("VOLUME_NAME_INPUT_PATH"), + "VOLUME_NAME_CHECKPOINT_PATH": os.environ.get( + "VOLUME_NAME_CHECKPOINT_PATH" + ), + "CATALOG": os.environ.get("CATALOG"), + "SCHEMA": os.environ.get("SCHEMA"), + "DATABRICKS_HOST": DATABRICKS_HOST, + "DATABRICKS_TOKEN_SECRET_SCOPE": os.environ.get( + "DATABRICKS_TOKEN_SECRET_SCOPE" + ), + "DATABRICKS_TOKEN_SECRET_KEY": os.environ.get( + "DATABRICKS_TOKEN_SECRET_KEY" + ), + "CODE_INTENT_TABLE_NAME": os.environ.get("CODE_INTENT_TABLE_NAME"), + "WORKSPACE_LOCATION": WORKSPACE_LOCATION, + } + + app_configs = json.dumps(app_config_payload) + agent_configs = json.dumps(agent_config_payload) + + response = w.jobs.run_now( + job_id=int(TRANSFORMATION_JOB_ID), + job_parameters={ + "agent_configs": agent_configs, + "app_configs": app_configs, + }, + ) + run_id = response.run_id + + job_url = f"{DATABRICKS_HOST}/jobs/{TRANSFORMATION_JOB_ID}" + textbox_message = ( + f"Job run initiated. Click [here]({job_url}) to view the job status. " + f"You just executed the run with run_id: {run_id}\n" + f"Output notebooks will be written to the Workspace for immediate use at *{WORKSPACE_LOCATION}/outputNotebooks*" + f", and also in the *Output Code* folder in the UC Volume [here]({DATABRICKS_HOST}/explore/data/volumes/{CATALOG}/{SCHEMA}/{VOLUME_NAME})" + ) + return textbox_message + + def make_status_box_visible(): + return gr.Markdown(label="Job Run Status Page", visible=True) + + execute.click(fn=make_status_box_visible, outputs=run_status) + execute.click( + exectute_workflow, + inputs=[ + intent_system_prompt, + intent_temperature, + intent_max_tokens, + translation_system_prompt, + translation_temperature, + translation_max_tokens, + ], + outputs=run_status, + ) - with gr.Accordion(label="Similar Code Pane", open=True): + with gr.Tab(label="Write file to Workspace") as interactive_output_tab: gr.Markdown( - """ ## Similar code - - This code is thought to be similar to what you are doing, based on comparing the intent of your code with the intent of this code. - """ + f""" ## Write to Workspace + + Write out your explained and translated file to a notebook in the workspace. + You must provide a filename for the notebook. The notebook will be written to the workspace, saved to the + Output Code location in the Unity Catalog Volume [here]({DATABRICKS_HOST}/explore/data/volumes/{CATALOG}/{SCHEMA}/{VOLUME_NAME}) + , and the intent will be saved to the intent table. + """ ) - # a button - find_similar_code = gr.Button("Find similar code") - # a row with an code and text box to show the similar code + template = """ +-- Databricks notebook source +-- MAGIC %md +-- MAGIC # This notebook was AI generated. AI can make mistakes. This is provided as a tool to accelerate your migration. +-- MAGIC +-- MAGIC ### AI Generated Intent +-- MAGIC +-- MAGIC INTENT_GOES_HERE + +-- COMMAND ---------- + +TRANSLATED_CODE_GOES_HERE + """ with gr.Row(): - similar_code = gr.Code( - label="Similar code to yours.", language="sql-sparkSQL" + produce_preview_button = gr.Button("Produce Preview") + with gr.Column(): + file_name = gr.Textbox(label="Filename for the notebook") + write_to_workspace_button = gr.Button("Write to Workspace") + adhoc_write_output = gr.Markdown(label="Notebook output location") + + def produce_preview(explanation, translated_code): + preview_code = template.replace("INTENT_GOES_HERE", explanation).replace( + "TRANSLATED_CODE_GOES_HERE", translated_code + ) + return preview_code + + def write_adhoc_to_workspace(file_name, preview): + + if len(file_name) == 0: + raise gr.Error("Please provide a filename") + + notebook_path_root = f"{WORKSPACE_LOCATION}/outputNotebooks/{str(datetime.datetime.now()).replace(':', '_')}" + notebook_path = f"{notebook_path_root}/{file_name}" + content = preview + w.workspace.mkdirs(notebook_path_root) + w.workspace.import_( + content=base64.b64encode(content.encode("utf-8")).decode("utf-8"), + path=notebook_path, + format=ImportFormat.SOURCE, + language=Language.SQL, + overwrite=True, ) - similar_intent = gr.Textbox(label="The similar codes intent.") + _ = w.workspace.get_status(notebook_path) + id = _.object_id + url = f"{w.config.host}/#notebook/{id}" + output_message = f"Notebook {file_name} written to Databricks [here]({url})" + return output_message + + preview = gr.Code(label="Preview", language="python") + produce_preview_button.click( + produce_preview, inputs=[explained, translated], outputs=preview + ) - # a button - submit = gr.Button("Save code and intent") + # write file to notebook + write_to_workspace_button.click( + fn=write_adhoc_to_workspace, + inputs=[file_name, preview], + outputs=adhoc_write_output, + ) - # assign actions to buttons when clicked. - find_similar_code.click( - fn=similar_code_helper.get_similar_code, - inputs=chatbot, - outputs=[similar_code, similar_intent], - ) + # this handles the code loading for batch mode + # read the selected code file and put it into the other panes + for output in [ + selected_file, + translation_input_code, + intent_input_code, + similar_code_input, + ]: + select_code_file.select( + fn=read_code_file, inputs=[volume_path, select_code_file], outputs=output + ) - def save_intent_wrapper(input_code, explained): - gr.Info("Saving intent") - similar_code_helper.save_intent(input_code, explained) - gr.Info("Intent saved") + # this handles the code loading for interative mode + for output in [ + translation_input_code, + intent_input_code, + similar_code_input, + ]: + interactive_code_button.click( + fn=lambda x: gr.update(value=x), inputs=interactive_code, outputs=output + ) - submit.click(save_intent_wrapper, inputs=[input_code, explained]) + # change the input tabs based on the operation mode + operation.change( + lambda x: ( + gr.update(visible=False) + if x == "Interactive mode" + else gr.update(visible=True) + ), + operation, + batch_input_code_tab, + ) + operation.change( + lambda x: ( + gr.update(visible=True) + if x == "Interactive mode" + else gr.update(visible=False) + ), + operation, + interactive_input_code_tab, + ) + # change the output tabs based on the operation mode + operation.change( + lambda x: ( + gr.update(visible=False) + if x == "Interactive mode" + else gr.update(visible=True) + ), + operation, + batch_output_tab, + ) + operation.change( + lambda x: ( + gr.update(visible=True) + if x == "Interactive mode" + else gr.update(visible=False) + ), + operation, + interactive_output_tab, + ) # for local dev try: diff --git a/sql_migration_assistant/gradio_app_backup.py b/sql_migration_assistant/gradio_app_backup.py new file mode 100644 index 00000000..10407adb --- /dev/null +++ b/sql_migration_assistant/gradio_app_backup.py @@ -0,0 +1,298 @@ +import os +from databricks.labs.lsql.core import StatementExecutionExt +from databricks.sdk import WorkspaceClient +import gradio as gr + +from app.llm import LLMCalls +from app.similar_code import SimilarCode +import logging # For printing translation attempts in console (debugging) + +# Setting up logger +logging.basicConfig +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + + +# # personal access token necessary for authenticating API requests. Stored using a secret + +FOUNDATION_MODEL_NAME = os.environ.get("SERVED_FOUNDATION_MODEL_NAME") +MAX_TOKENS = os.environ.get("MAX_TOKENS") +SQL_WAREHOUSE_ID = os.environ.get("DATABRICKS_WAREHOUSE_ID") +VECTOR_SEARCH_ENDPOINT_NAME = os.environ.get("VECTOR_SEARCH_ENDPOINT_NAME") +VS_INDEX_NAME = os.environ.get("VS_INDEX_NAME") +CODE_INTENT_TABLE_NAME = os.environ.get("CODE_INTENT_TABLE_NAME") +CATALOG = os.environ.get("CATALOG") +SCHEMA = os.environ.get("SCHEMA") + +w = WorkspaceClient(product="sql_migration_assistant", product_version="0.0.1") + +see = StatementExecutionExt(w, warehouse_id=SQL_WAREHOUSE_ID) +translation_llm = LLMCalls( + foundation_llm_name=FOUNDATION_MODEL_NAME, max_tokens=MAX_TOKENS +) +intent_llm = LLMCalls(foundation_llm_name=FOUNDATION_MODEL_NAME, max_tokens=MAX_TOKENS) +similar_code_helper = SimilarCode( + workspace_client=w, + see=see, + catalog=CATALOG, + schema=SCHEMA, + code_intent_table_name=CODE_INTENT_TABLE_NAME, + VS_index_name=VS_INDEX_NAME, + VS_endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME, +) + +################################################################################ +################################################################################ + +# this is the app UI. it uses gradio blocks https://www.gradio.app/docs/gradio/blocks +# each gr.{} call adds a new element to UI, top to bottom. +with gr.Blocks(theme=gr.themes.Soft()) as demo: + # title with Databricks image + gr.Markdown( + """logo + +## A migration assistant for explaining the intent of SQL code and conversion to Spark SQL + +#### This demo relies on the tables and columns referenced in the SQL query being present in Unity Catalogue and having their table comments and column comments populated. For the purpose of the demo, this was generated using the Databricks AI Generated Comments tool. + +""" + ) + + ################################################################################ + #### TRANSLATION ADVANCED OPTIONS PANE + ################################################################################ + with gr.Accordion(label="Translation Advanced Settings", open=False): + with gr.Row(): + transation_system_prompt = gr.Textbox( + label="Instructions for the LLM translation tool.", + value=""" + You are an expert in multiple SQL dialects. You only reply with SQL code and with no other text. + Your purpose is to translate the given SQL query to Databricks Spark SQL. + You must follow these rules: + - You must keep all original catalog, schema, table, and field names. + - Convert all dates to dd-MMM-yyyy format using the date_format() function. + - Subqueries must end with a semicolon. + - Ensure queries do not have # or @ symbols. + - ONLY if the original query uses temporary tables (e.g. "INTO #temptable"), re-write these as either CREATE OR REPLACE TEMPORARY VIEW or CTEs. . + - Square brackets must be replaced with backticks. + - Custom field names should be surrounded by backticks. + - Ensure queries do not have # or @ symbols. + - Only if the original query contains DECLARE and SET statements, re-write them according to the following format: + DECLARE VARIABLE variable TYPE DEFAULT value; For example: DECLARE VARIABLE number INT DEFAULT 9; + SET VAR variable = value; For example: SET VAR number = 9; + + Write an initial draft of the translated query. Then double check the output for common mistakes, including: + - Using NOT IN with NULL values + - Using UNION when UNION ALL should have been used + - Using BETWEEN for exclusive ranges + - Data type mismatch in predicates + - Properly quoting identifiers + - Using the correct number of arguments for functions + - Casting to the correct data type + - Using the proper columns for joins + + Return the final translated query only. Include comments. Include only SQL. + """.strip(), + lines=40, + ) + + ################################################################################ + #### TRANSLATION PANE + ################################################################################ + # subheader + + with gr.Accordion(label="Translation Pane", open=True): + gr.Markdown( + """ ### Input your T-SQL code here for automatic translation to Spark-SQL and use AI to generate a statement of intent for the code's purpose.""" + ) + # hidden chat interface - to enable chatbot functionality + translation_chat = gr.Chatbot(visible=False) + with gr.Row(): + with gr.Column(): + gr.Markdown( + """ ### Input your T-SQL code here for translation to Spark-SQL.""" + ) + + # input box for SQL code with nice formatting + input_code = gr.Code( + label="Input SQL", + language="sql-msSQL", + value="""SELECT + c.[country_name], + AVG([dep_count]) AS average_dependents +FROM + ( + SELECT + e.[employee_id] + ,e.[department_id] + ,COUNT(d.[dependent_id]) AS dep_count + FROM + [robert_whiffin].[code_assistant].[employees] e + LEFT JOIN [robert_whiffin].[code_assistant].[dependents] d ON e.[employee_id] = d.[employee_id] + GROUP BY + e.[employee_id] + ,e.[department_id] + ) AS subquery + JOIN [robert_whiffin].[code_assistant].[departments] dep ON subquery.[department_id] = dep.[department_id] + JOIN [robert_whiffin].[code_assistant].[locations] l ON dep.[location_id] = l.[location_id] + JOIN [robert_whiffin].[code_assistant].[countries] c ON l.[country_id] = c.[country_id] +GROUP BY + c.[country_name] +ORDER BY + c.[country_name]""", + ) + # a button labelled translate + translate_button = gr.Button("Translate") + + with gr.Column(): + # divider subheader + gr.Markdown(""" ### Your Code Translated to Spark-SQL""") + # output box of the T-SQL translated to Spark SQL + translated = gr.Code( + label="Your code translated to Spark SQL", language="sql-sparkSQL" + ) + translation_prompt = gr.Textbox(label="Adjustments for translation") + + def translate_respond(system_prompt, message, chat_history): + bot_message = translation_llm.llm_chat(system_prompt, message, chat_history) + chat_history.append([message, bot_message]) + return chat_history, chat_history[-1][1] + + # helper function to take the output from llm_translate and return outputs for chatbox and textbox + # chatbox input is a list of lists, each list is a message from the user and the response from the LLM + # textbox input is a string + def llm_translate_wrapper(system_prompt, input_code): + # call the LLM to translate the code + translated_code = translation_llm.llm_translate(system_prompt, input_code) + # wrap the translated code in a list of lists for the chatbot + chat_history = [[input_code, translated_code]] + return chat_history, translated_code + + # reset hidden chat history and prompt + translate_button.click( + fn=lambda: ([["", ""]], ""), + inputs=None, + outputs=[translation_chat, translation_prompt], + ) + # do translation + translate_button.click( + fn=llm_translate_wrapper, + inputs=[transation_system_prompt, input_code], + outputs=[translation_chat, translated], + ) + # refine translation + translation_prompt.submit( + fn=translate_respond, + inputs=[transation_system_prompt, translation_prompt, translation_chat], + outputs=[translation_chat, translated], + ) + + ################################################################################ + #### AI GENERATED INTENT PANE + ################################################################################ + # divider subheader + with gr.Accordion(label="Advanced Intent Settings", open=False): + gr.Markdown( + """ ### Advanced settings for the generating the intent of the input code.""" + ) + with gr.Row(): + intent_system_prompt = gr.Textbox( + label="System prompt of the LLM to generate the intent. Editing will reset the intent.", + value="""Your job is to explain intent of the provided SQL code. + """.strip(), + ) + with gr.Accordion(label="Intent Pane", open=True): + gr.Markdown( + """ ## AI generated intent of what your code aims to do. + + Intent is determined by an LLM which uses the code and table & column metadata. + + ***If the intent is incorrect, please edit***. Once you are happy that the description is correct, please click the button below to save the intent. + + """ + ) + # a box to give the LLM generated intent of the code. This is editable as well. + explain_button = gr.Button("Explain code intent using AI.") + explained = gr.Textbox(label="AI generated intent of your code.", visible=False) + + chatbot = gr.Chatbot(label="AI Chatbot for Intent Extraction", height="70%") + + msg = gr.Textbox(label="Instruction") + clear = gr.ClearButton([msg, chatbot]) + + def intent_respond(system_prompt, message, chat_history): + bot_message = intent_llm.llm_chat(system_prompt, message, chat_history) + chat_history.append([message, bot_message]) + return chat_history, "", bot_message + + def llm_chat_wrapper(system_prompt, input_code): + # call the LLM to translate the code + intent = intent_llm.llm_intent(system_prompt, input_code) + # wrap the translated code in a list of lists for the chatbot + chat_history = [[input_code, intent]] + return chat_history, "", intent + + explain_button.click( + fn=llm_chat_wrapper, + inputs=[intent_system_prompt, input_code], + outputs=[chatbot, msg, explained], + ) + msg.submit( + fn=intent_respond, + inputs=[intent_system_prompt, msg, chatbot], + outputs=[chatbot, msg, explained], + ) + clear.click(lambda: None, None, chatbot, queue=False) + + ################################################################################ + #### SIMILAR CODE PANE + ################################################################################ + # divider subheader + + with gr.Accordion(label="Similar Code Pane", open=True): + gr.Markdown( + """ ## Similar code + + This code is thought to be similar to what you are doing, based on comparing the intent of your code with the intent of this code. + """ + ) + # a button + find_similar_code = gr.Button("Find similar code") + # a row with an code and text box to show the similar code + with gr.Row(): + similar_code = gr.Code( + label="Similar code to yours.", language="sql-sparkSQL" + ) + similar_intent = gr.Textbox(label="The similar codes intent.") + + # a button + submit = gr.Button("Save code and intent") + + # assign actions to buttons when clicked. + find_similar_code.click( + fn=similar_code_helper.get_similar_code, + inputs=chatbot, + outputs=[similar_code, similar_intent], + ) + + def save_intent_wrapper(input_code, explained): + gr.Info("Saving intent") + similar_code_helper.save_intent(input_code, explained) + gr.Info("Intent saved") + + submit.click(save_intent_wrapper, inputs=[input_code, explained]) + + +# for local dev +try: + if os.environ["LOCALE"] == "local_dev": + demo.queue().launch() +except KeyError: + pass + +# this is necessary to get the app to run on databricks +if __name__ == "__main__": + demo.queue().launch( + server_name=os.getenv("GRADIO_SERVER_NAME"), + server_port=int(os.getenv("GRADIO_SERVER_PORT")), + ) diff --git a/sql_migration_assistant/infra/app_serving_cluster_infra.py b/sql_migration_assistant/infra/app_serving_cluster_infra.py index e6992825..60d18ea5 100644 --- a/sql_migration_assistant/infra/app_serving_cluster_infra.py +++ b/sql_migration_assistant/infra/app_serving_cluster_infra.py @@ -14,7 +14,6 @@ def __init__(self, config, workspace_client: WorkspaceClient, p: Prompts): self.node_types = { "azure": "Standard_DS3_v2", "aws": "m5d.xlarge", - "gcp": "n1-standard-4", } self.cloud = self._get_cloud() self.cluster_name = "sql_migration_assistant_review_app_cluster" @@ -60,7 +59,7 @@ def _create_cluster(self): spark_version=self.spark_version, autotermination_minutes=120, cluster_name=self.cluster_name, - data_security_mode=DataSecurityMode.NONE, + data_security_mode=DataSecurityMode.SINGLE_USER, spark_conf={ "spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]", diff --git a/sql_migration_assistant/infra/chat_infra.py b/sql_migration_assistant/infra/chat_infra.py index c6e04d2e..0b8f29f1 100644 --- a/sql_migration_assistant/infra/chat_infra.py +++ b/sql_migration_assistant/infra/chat_infra.py @@ -47,7 +47,7 @@ def setup_foundation_model_infra(self): if self._pay_per_token_exists(): question = ( "Would you like to use an existing pay per token endpoint? This is recommended for quick testing. " - "The alternative is to create a Provisioned Throughput endpoint, which enables monitoring of " + "The alternative is to create or use a Provisioned Throughput endpoint, which enables monitoring of " "the requests and responses made to the LLM via inference tables. (y/n)" ) choice = self.prompts.question( @@ -59,21 +59,49 @@ def setup_foundation_model_infra(self): self.foundation_llm_name = choice self.config["SERVED_FOUNDATION_MODEL_NAME"] = self.foundation_llm_name return - # create a provisioned throughput endpoint - question = "Choose a foundation model from the system.ai schema to deploy:" - system_models = self._list_models_from_system_ai() - choice = self.prompts.choice(question, system_models) - self.foundation_llm_name = choice - logging.info( - f"Deploying provisioned throughput endpoint {self.provisioned_throughput_endpoint_name} serving" - f" {self.foundation_llm_name}. This may take a few minutes." - ) - self._create_provisioned_throughput_endpoint(self.foundation_llm_name) - # update config with user choice - self.config["SERVED_FOUNDATION_MODEL_NAME"] = self.foundation_llm_name - self.config["PROVISIONED_THROUGHPUT_ENDPOINT_NAME"] = ( - self.provisioned_throughput_endpoint_name - ) + else: + question = "Would you like to use an existing endpoint? (y/n)" + choice = self.prompts.question( + question, validate=lambda x: x.lower() in ["y", "n"] + ) + if choice.lower() == "y": + # get endpoints, filter out pay per token models, present to user + endpoints = self.w.serving_endpoints.list() + endpoint_names = [ep.name for ep in endpoints] + endpoint_names = filter( + lambda x: x not in self.pay_per_token_models, endpoint_names + ) + question = "Please choose an endpoint to use:" + choice = self.prompts.choice(question, endpoint_names) + self.foundation_llm_name = choice + self.config["SERVED_FOUNDATION_MODEL_NAME"] = ( + self.foundation_llm_name + ) + self.provisioned_throughput_endpoint_name = ( + "migration_assistant_endpoint" + ) + else: + # create a provisioned throughput endpoint + question = ( + "Choose a foundation model from the system.ai schema to deploy:" + ) + system_models = self._list_models_from_system_ai() + choice = self.prompts.choice(question, system_models) + self.foundation_llm_name = choice + logging.info( + f"Deploying provisioned throughput endpoint {self.provisioned_throughput_endpoint_name} serving" + f" {self.foundation_llm_name}. This may take a few minutes." + ) + self._create_provisioned_throughput_endpoint( + self.foundation_llm_name + ) + # update config with user choice + self.config["SERVED_FOUNDATION_MODEL_NAME"] = ( + self.foundation_llm_name + ) + self.config["PROVISIONED_THROUGHPUT_ENDPOINT_NAME"] = ( + self.provisioned_throughput_endpoint_name + ) def _pay_per_token_exists(self): """ @@ -81,7 +109,9 @@ def _pay_per_token_exists(self): """ endpoints = self.w.serving_endpoints.list() endpoint_names = set([ep.name for ep in endpoints]) - pay_per_token_exists = max([x in endpoint_names for x in self.pay_per_token_models]) + pay_per_token_exists = max( + [x in endpoint_names for x in self.pay_per_token_models] + ) return pay_per_token_exists def _create_provisioned_throughput_endpoint(self, model_name): diff --git a/sql_migration_assistant/infra/jobs_infra.py b/sql_migration_assistant/infra/jobs_infra.py new file mode 100644 index 00000000..6ec43968 --- /dev/null +++ b/sql_migration_assistant/infra/jobs_infra.py @@ -0,0 +1,116 @@ +from databricks.sdk import WorkspaceClient +from databricks.sdk.errors.platform import BadRequest +from databricks.labs.blueprint.tui import Prompts +from databricks.labs.lsql.core import StatementExecutionExt +from databricks.sdk.service.compute import DataSecurityMode +from databricks.sdk.service.jobs import ( + Task, + NotebookTask, + TaskDependency, + ForEachTask, + JobCluster, + JobParameterDefinition, +) +from databricks.sdk.service import jobs, compute +import os + +""" +Approach + +User first sets all configuration options +validate options +validate user permissions +then create infra +upload app file to databricks + +""" + + +class JobsInfra: + def __init__( + self, + config, + workspace_client: WorkspaceClient, + ): + self.w = workspace_client + self.config = config + + self.spark_version = "15.4.x-scala2.12" + self.node_types = { + "azure": "Standard_DS3_v2", + "aws": "m5d.xlarge", + } + self.cloud = self._get_cloud() + self.job_clusters = [ + JobCluster( + job_cluster_key="sql_migration_job_cluster", + new_cluster=compute.ClusterSpec( + spark_version=self.spark_version, + data_security_mode=DataSecurityMode.SINGLE_USER, + # spark_conf = { + # "spark.databricks.cluster.profile": "singleNode", + # "spark.master": "local[*]", + # }, + num_workers=1, + node_type_id=self.node_types[self.cloud], + ), + ) + ] + + self.job_name = "sql_migration_code_transformation" + self.notebook_root_path = f"/Workspace/Users/{self.w.current_user.me().user_name}/.sql_migration_assistant/jobs/" + self.job_parameters = [ + JobParameterDefinition("agent_configs", ""), + JobParameterDefinition("app_configs", ""), + ] + self.job_tasks = [ + Task( + task_key="ingest_to_holding", + notebook_task=NotebookTask( + notebook_path=self.notebook_root_path + "bronze_to_silver" + ), + disable_auto_optimization=True, + ), + Task( + task_key="call_agents", + for_each_task=ForEachTask( + inputs="{{tasks.ingest_to_holding.values.new_record_ids}}", + task=Task( + task_key="call_agent", + notebook_task=NotebookTask( + notebook_path=self.notebook_root_path + "call_agents", + base_parameters={"record_id": "{{input}}"}, + ), + job_cluster_key="sql_migration_job_cluster", + ), + concurrency=8, + ), + depends_on=[TaskDependency(task_key="ingest_to_holding")], + ), + Task( + task_key="silver_to_gold", + notebook_task=NotebookTask( + notebook_path=self.notebook_root_path + "silver_to_gold" + ), + depends_on=[TaskDependency(task_key="call_agents")], + disable_auto_optimization=True, + ), + ] + + def create_transformation_job(self): + job_id = self.w.jobs.create( + name=self.job_name, + tasks=self.job_tasks, + job_clusters=self.job_clusters, + parameters=self.job_parameters, + ) + self.config["TRANSFORMATION_JOB_ID"] = job_id.job_id + + def _get_cloud(self): + host = self.w.config.host + if "https://adb" in host: + return "azure" + elif ".gcp.databricks" in host: + return "gcp" + else: + return "aws" diff --git a/sql_migration_assistant/infra/secrets_infra.py b/sql_migration_assistant/infra/secrets_infra.py index defef76d..bb49b0af 100644 --- a/sql_migration_assistant/infra/secrets_infra.py +++ b/sql_migration_assistant/infra/secrets_infra.py @@ -17,8 +17,12 @@ def create_secret_PAT(self): print("Creating a Databricks PAT for the SQL Migration Assistant") scopes = self.w.secrets.list_scopes() if scopes == []: - logging.info("No secret scopes found. Please create a secret scope before proceeding.") - print("No secret scopes found. Please create a secret scope before proceeding.") + logging.info( + "No secret scopes found. Please create a secret scope before proceeding." + ) + print( + "No secret scopes found. Please create a secret scope before proceeding." + ) question = "Enter secret scope name:" scope_name = self.prompts.question(question) self.w.secrets.create_scope(scope_name) diff --git a/sql_migration_assistant/infra/unity_catalog_infra.py b/sql_migration_assistant/infra/unity_catalog_infra.py index 8b6d219b..520e4f69 100644 --- a/sql_migration_assistant/infra/unity_catalog_infra.py +++ b/sql_migration_assistant/infra/unity_catalog_infra.py @@ -1,9 +1,12 @@ +import logging + from databricks.sdk import WorkspaceClient from databricks.sdk.errors.platform import BadRequest from databricks.labs.blueprint.tui import Prompts from databricks.labs.lsql.core import StatementExecutionExt -import logging -import time +from databricks.sdk.service.catalog import VolumeType +from databricks.sdk.errors import PermissionDenied +import os """ Approach @@ -30,20 +33,22 @@ def __init__( self.prompts = p self.see = see - # get defaults from config file - self.default_UC_catalog = "sql_migration_assistant" - self.default_UC_schema = "sql_migration_assistant" - # these are updated as the user makes a choice about which UC catalog and schema to use. # the chosen values are then written back into the config file. self.migration_assistant_UC_catalog = None - self.migration_assistant_UC_schema = None + self.migration_assistant_UC_schema = "sql_migration_assistant" # user cannot change these values self.code_intent_table_name = "sql_migration_assistant_code_intent_table" + self.volume_name = "sql_migration_assistant_volume" + self.volume_dirs = { + "checkpoint": "code_ingestion_checkpoints", + "input": "input_code", + "output": "output_code", + } self.warehouseID = self.config.get("DATABRICKS_WAREHOUSE_ID") - # add code intent table name to config + # add values to config self.config["CODE_INTENT_TABLE_NAME"] = self.code_intent_table_name def choose_UC_catalog(self): @@ -63,37 +68,19 @@ def choose_UC_catalog(self): # update config with user choice self.config["CATALOG"] = self.migration_assistant_UC_catalog - def choose_schema_name(self): + def create_schema(self): - use_default_schema_name = self.prompts.confirm( - f"Would you like to use the default schema name: {self.default_UC_schema}? (yes/no)" - ) - if use_default_schema_name: - self.migration_assistant_UC_schema = self.default_UC_schema - else: - # Ask the user to enter a schema name, and validate it. - name_invalid = True - while name_invalid: - # Name cannot include period, space, or forward-slash - schema_name = self.prompts.question("Enter the schema name: ") - if ( - "." not in schema_name - and " " not in schema_name - and "/" not in schema_name - ): - self.migration_assistant_UC_schema = schema_name - name_invalid = False - else: - print("Schema name cannot include period, space, or forward-slash.") # update config with user choice self.config["SCHEMA"] = self.migration_assistant_UC_schema try: self._create_UC_schema() + self._create_UC_volume(self.migration_assistant_UC_schema) except BadRequest as e: if "already exists" in str(e): print( f"Schema already exists. Using existing schema {self.migration_assistant_UC_schema}." ) + self._create_UC_volume(self.migration_assistant_UC_schema) def _create_UC_catalog(self): """Create a new Unity Catalog.""" @@ -110,18 +97,41 @@ def _create_UC_schema(self): comment="Schema for storing assets related to the SQL migration assistant.", ) - def create_code_intent_table(self): + def _create_UC_volume(self, schema): + try: + self.w.volumes.create( + name=self.volume_name, + catalog_name=self.migration_assistant_UC_catalog, + schema_name=schema, + comment="Volume for storing assets related to the SQL migration assistant.", + volume_type=VolumeType.MANAGED, + ) + for key in self.volume_dirs.keys(): + dir_ = self.volume_dirs[key] + volume_path = f"/Volumes/{self.migration_assistant_UC_catalog}/{schema}/{self.volume_name}/{dir_}" + self.w.dbutils.fs.mkdirs(volume_path) + self.config[f"VOLUME_NAME_{key.upper()}_PATH"] = volume_path + self.config["VOLUME_NAME"] = self.volume_name + except PermissionDenied: + print( + "You do not have permission to create a volume. A volume will not be created. You will need to create a " + "volume to run the batch code transformation process." + ) + logging.error( + "You do not have permission to create a volume. A volume will not be created. You will need to create a " + "volume to run the batch code transformation process." + ) + + def create_tables(self): """Create a new table to store code intent data.""" table_name = self.code_intent_table_name _ = self.see.execute( - statement= - f"CREATE TABLE IF NOT EXISTS " + statement=f"CREATE TABLE IF NOT EXISTS " f"`{table_name}`" f" (id BIGINT, code STRING, intent STRING) " f"TBLPROPERTIES (delta.enableChangeDataFeed = true)", catalog=self.migration_assistant_UC_catalog, - schema=self.migration_assistant_UC_schema + schema=self.migration_assistant_UC_schema, ) - diff --git a/sql_migration_assistant/infra/vector_search_infra.py b/sql_migration_assistant/infra/vector_search_infra.py index b293a6a8..4ef8e7a0 100644 --- a/sql_migration_assistant/infra/vector_search_infra.py +++ b/sql_migration_assistant/infra/vector_search_infra.py @@ -15,6 +15,7 @@ from sql_migration_assistant.utils.uc_model_version import get_latest_model_version import time + class VectorSearchInfra: def __init__(self, config, workspace_client: WorkspaceClient, p: Prompts): self.w = workspace_client @@ -77,7 +78,9 @@ def choose_VS_endpoint(self): ) self._create_VS_endpoint() else: - choice = choice.split(" ")[0] #need to remove the (num indices) part of the string + choice = choice.split(" ")[ + 0 + ] # need to remove the (num indices) part of the string self.migration_assistant_VS_endpoint = choice # update config with user choice self.config["VECTOR_SEARCH_ENDPOINT_NAME"] = ( @@ -173,9 +176,16 @@ def create_VS_index(self): f"Index {self.migration_assistant_VS_index} already exists. Using existing index." ) except NotFound as e: - if f"Vector search endpoint {self.migration_assistant_VS_endpoint} not found" in str(e): - logging.info(f"Waiting for Vector Search endpoint to provision. Retrying in 30 seconds.") - print(f"Waiting for Vector Search endpoint to provision. Retrying in 30 seconds.") + if ( + f"Vector search endpoint {self.migration_assistant_VS_endpoint} not found" + in str(e) + ): + logging.info( + f"Waiting for Vector Search endpoint to provision. Retrying in 30 seconds." + ) + print( + f"Waiting for Vector Search endpoint to provision. Retrying in 30 seconds." + ) time.sleep(30) self.create_VS_index() else: diff --git a/sql_migration_assistant/jobs/__init__.py b/sql_migration_assistant/jobs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sql_migration_assistant/jobs/bronze_to_silver.py b/sql_migration_assistant/jobs/bronze_to_silver.py new file mode 100644 index 00000000..d8d97191 --- /dev/null +++ b/sql_migration_assistant/jobs/bronze_to_silver.py @@ -0,0 +1,209 @@ +# Databricks notebook source +# DBTITLE 1,get params +import json +from pyspark.sql.types import ( + ArrayType, + StructType, + StructField, + StringType, + MapType, + IntegerType, + TimestampType, +) +import pyspark.sql.functions as f +from pyspark.sql.functions import udf, pandas_udf + +agent_configs = json.loads(dbutils.widgets.get("agent_configs")) +app_configs = json.loads(dbutils.widgets.get("app_configs")) + + +# COMMAND ---------- + +checkpoint_dir = app_configs["VOLUME_NAME_CHECKPOINT_PATH"] +volume_path = app_configs["VOLUME_NAME_INPUT_PATH"] + + +# COMMAND ---------- + +bronze_raw_code = f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.bronze_raw_code' +spark.sql( + f""" + CREATE TABLE IF NOT EXISTS {bronze_raw_code} ( + path STRING, + modificationTime TIMESTAMP, + length INT, + content STRING, + --content BINARY, + loadDatetime TIMESTAMP + ) + """ +) + +bronze_prompt_config = ( + f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.bronze_prompt_config' +) +spark.sql( + f""" + CREATE TABLE IF NOT EXISTS {bronze_prompt_config} ( + promptID INT, + agentConfigs MAP >, + loadDatetime TIMESTAMP + ) + """ +) + +bronze_holding_table = ( + f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.bronze_holding_table' +) +spark.sql( + f""" + CREATE TABLE IF NOT EXISTS {bronze_holding_table} ( + id LONG, + path STRING, + modificationTime TIMESTAMP, + length INT, + content STRING, + loadDatetime TIMESTAMP, + promptID INT, + agentConfigs MAP > + ) + """ +) + + +silver_llm_responses = ( + f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.silver_llm_responses' +) +spark.sql( + f""" + CREATE TABLE IF NOT EXISTS {silver_llm_responses} ( + path STRING, + promptID INT, + loadDatetime TIMESTAMP, + content STRING, + agentName STRING, + agentResponse STRING + ) + """ +) + + +gold_table = ( + f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.gold_transformed_notebooks' +) +spark.sql( + f""" + CREATE TABLE IF NOT EXISTS {gold_table} ( + promptID INT, + content STRING, + loadDatetime TIMESTAMP, + notebookAsString STRING, + outputVolumePath STRING, + outputNotebookPath STRING + ) + """ +) + + +# COMMAND ---------- + +# DBTITLE 1,convert agent_configs input string to a dataframe +import pyspark.sql.functions as f + +schema = StructType( + [ + StructField( + "agentConfigs", + MapType(StringType(), MapType(StringType(), StringType())), + True, + ) + ] +) +agent_configs_df = ( + spark.createDataFrame(agent_configs, schema) + .withColumn("loadDatetime", f.current_timestamp()) + .withColumn("promptID", f.hash(f.col("loadDatetime").cast("STRING"))) + .select(f.col("promptID"), f.col("agentConfigs"), f.col("loadDatetime")) +) +# display(agent_configs_df) + +agent_configs_df.createOrReplaceTempView("temp_configs") +spark.sql(f"INSERT INTO {bronze_prompt_config} TABLE temp_configs") +spark.sql(f"select * from {bronze_prompt_config}").display() + +# COMMAND ---------- + +# DBTITLE 1,load code files + +raw_stream = ( + spark.readStream.format("cloudFiles") + .option("cloudFiles.format", "text") + .option("wholetext", True) + .load(volume_path) + .withColumn("loadDatetime", f.current_timestamp()) + .withColumn( + "modificationTime", f.current_timestamp() + ) # I'm not sure how to populate this + .withColumn("path", f.col("_metadata.file_name")) + .withColumnRenamed("value", "content") + .withColumn("length", f.length(f.col("content"))) + # .withColumn("content", f.to_binary(f.col("content"))) + .select( + f.col("path"), + f.col("modificationTime"), + f.col("length"), + f.col("content"), + f.col("loadDatetime"), + ) +) + +( + raw_stream.writeStream.format("delta") + .outputMode("append") + .option("checkpointLocation", checkpoint_dir) + .trigger(availableNow=True) + .table(bronze_raw_code) + .processAllAvailable() +) + +display(spark.sql(f"select * from {bronze_raw_code}")) + +# COMMAND ---------- + +# DBTITLE 1,get the prompts which are greater than the loaddate in the silver table + +llm_inputs = spark.sql( + f""" + select monotonically_increasing_id() as id, + brc.path, + modificationTime, + length, + content, + loadDatetime, + promptID, + agentConfigs + from {bronze_raw_code} brc + cross join ( + select bpc.promptID, agentConfigs + from {bronze_prompt_config} bpc + left anti join ( + select distinct promptID from {silver_llm_responses} + ) st on bpc.promptID = st.promptID + ) + """ +) +llm_inputs.write.mode("overwrite").saveAsTable(bronze_holding_table) +display(spark.read.table(bronze_holding_table)) + +# COMMAND ---------- + +# get the id's that will be passed to the for each loop for llm'ing +ids = llm_inputs.select("id").collect() +ids = [x.id for x in ids] +dbutils.jobs.taskValues.set(key="new_record_ids", value=ids) + +# set the promptID as a value. This will be used fo the gold table to pull in +# the latest results from the silver table +promptID = llm_inputs.select("promptID").distinct().collect() +promptID = [x.promptID for x in promptID][0] +dbutils.jobs.taskValues.set(key="promptID", value=promptID) diff --git a/sql_migration_assistant/jobs/call_agents.py b/sql_migration_assistant/jobs/call_agents.py new file mode 100644 index 00000000..598b16b7 --- /dev/null +++ b/sql_migration_assistant/jobs/call_agents.py @@ -0,0 +1,108 @@ +# Databricks notebook source +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.serving import ChatMessage, ChatMessageRole +import json +import os +from pyspark.sql.types import ( + ArrayType, + StructType, + StructField, + StringType, + MapType, + IntegerType, + TimestampType, +) +import pyspark.sql.functions as f +from pyspark.sql.functions import udf, pandas_udf + +# COMMAND ---------- + + +agent_configs = json.loads(dbutils.widgets.get("agent_configs")) +app_configs = json.loads(dbutils.widgets.get("app_configs")) +record_id = dbutils.widgets.get("record_id") +bronze_holding_table = ( + f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.bronze_holding_table' +) +silver_llm_responses = ( + f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.silver_llm_responses' +) +code_intent_table = f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.{app_configs["CODE_INTENT_TABLE_NAME"]}' + +secret_scope = app_configs["DATABRICKS_TOKEN_SECRET_SCOPE"] +secret_key = app_configs["DATABRICKS_TOKEN_SECRET_KEY"] +host = app_configs["DATABRICKS_HOST"] + +# COMMAND ---------- + +print(record_id) + +# COMMAND ---------- + +# need this for when workspace client is created during a job +key = dbutils.secrets.get(scope=secret_scope, key=secret_key) + + +@pandas_udf(MapType(StringType(), StringType())) +def call_llm(input_code_series, agent_configs_series): + def process_row(input_code, agent_configs): + output = {} + for agent in agent_configs.keys(): + agent_app_configs = agent_configs[agent] + system_prompt = agent_app_configs["system_prompt"] + endpoint = agent_app_configs["endpoint"] + max_tokens = agent_app_configs["max_tokens"] + temperature = agent_app_configs["temperature"] + w = WorkspaceClient(host=host, token=key) + messages = [ + ChatMessage(role=ChatMessageRole.SYSTEM, content=system_prompt), + ChatMessage(role=ChatMessageRole.USER, content=input_code), + ] + max_tokens = int(max_tokens) + temperature = float(temperature) + response = w.serving_endpoints.query( + name=endpoint, + max_tokens=max_tokens, + messages=messages, + temperature=temperature, + ) + message = response.choices[0].message.content + output[agent] = message + return output + + return input_code_series.combine(agent_configs_series, process_row) + + +# COMMAND ---------- + +response = ( + spark.read.table(bronze_holding_table) + .where(f.col("id") == f.lit(record_id)) + .withColumn("llm_responses", call_llm(f.col("content"), f.col("agentConfigs"))) + .withColumn("agentName", f.map_keys(f.col("agentConfigs")).getItem(0)) + .withColumn("agentResponse", f.map_values(f.col("llm_responses")).getItem(0)) + .select("path", "promptID", "loadDatetime", "content", "agentName", "agentResponse") + .cache() +) + +(response.write.mode("append").saveAsTable(silver_llm_responses)) + +# COMMAND ---------- + +temp_table_name = f"response{record_id}" +response.createOrReplaceTempView(temp_table_name) +spark.sql( + f""" +MERGE INTO {code_intent_table} AS target +USING ( + SELECT hash(content) AS id, content AS code, agentResponse AS intent + FROM {temp_table_name} + WHERE agentName = "explanation_agent" +) AS source +ON target.id = source.id +WHEN MATCHED THEN + UPDATE SET target.code = source.code, target.intent = source.intent +WHEN NOT MATCHED THEN + INSERT (id, code, intent) VALUES (source.id, source.code, source.intent) +""" +) diff --git a/sql_migration_assistant/jobs/silver_to_gold.py b/sql_migration_assistant/jobs/silver_to_gold.py new file mode 100644 index 00000000..7228aa06 --- /dev/null +++ b/sql_migration_assistant/jobs/silver_to_gold.py @@ -0,0 +1,176 @@ +# Databricks notebook source +import base64 +from databricks.sdk import WorkspaceClient +from databricks.sdk.service.workspace import ImportFormat, Language +from pyspark.sql import functions as f +from pyspark.sql.types import * +import json + +# COMMAND ---------- + +# DBTITLE 1,get params into notebook +agent_configs = json.loads(dbutils.widgets.get("agent_configs")) +app_configs = json.loads(dbutils.widgets.get("app_configs")) + +secret_scope = app_configs["DATABRICKS_TOKEN_SECRET_SCOPE"] +secret_key = app_configs["DATABRICKS_TOKEN_SECRET_KEY"] +host = app_configs["DATABRICKS_HOST"] + +workspace_location = app_configs["WORKSPACE_LOCATION"] +workspace_location = "/Workspace" + workspace_location + +key = dbutils.secrets.get(scope=secret_scope, key=secret_key) + +# COMMAND ---------- + +# DBTITLE 1,extract relevant variables from params + +silver_llm_responses = ( + f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.silver_llm_responses' +) +gold_table = ( + f'{app_configs["CATALOG"]}.{app_configs["SCHEMA"]}.gold_transformed_notebooks' +) +prompt_id = dbutils.jobs.taskValues.get(taskKey="ingest_to_holding", key="promptID") +output_volume_path = app_configs["VOLUME_NAME_OUTPUT_PATH"] + +# COMMAND ---------- + + +# DBTITLE 1,function to write out a notebook as a string +@udf(StringType()) +def write_notebook_code(llm_responses): + for response in llm_responses: + if "explanation_agent" == response[0]: + explanation = response[1] + elif "translation_agent" == response[0]: + translated_code = response[1] + + template = """ +-- Databricks notebook source +-- MAGIC %md +-- MAGIC # This notebook was AI generated. AI can make mistakes. This is provided as a tool to accelerate your migration. +-- MAGIC +-- MAGIC ### AI Generated Intent +-- MAGIC +-- MAGIC INTENT_GOES_HERE + +-- COMMAND ---------- + +TRANSLATED_CODE_GOES_HERE + """ + + output = template.replace("INTENT_GOES_HERE", explanation).replace( + "TRANSLATED_CODE_GOES_HERE", translated_code + ) + return output + + +# COMMAND ---------- + +# DBTITLE 1,write the notebooks into a new column +gold_df = ( + spark.read.table(silver_llm_responses) + .filter(f.col("promptID") == f.lit(prompt_id)) + .withColumn("zipped", f.array(f.col("agentName"), f.col("agentResponse"))) + .groupBy(f.col("content"), f.col("loadDatetime"), f.col("promptID"), f.col("path")) + .agg( + f.collect_list(f.col("zipped")).alias("zipped"), + ) + .withColumn("notebookAsString", write_notebook_code(f.col("zipped"))) + .withColumn("path", f.split(f.col("path"), f.lit("\."))[0]) + .withColumn( + "loadDatetimeStr", f.replace(f.col("loadDatetime"), f.lit(":"), f.lit("_")) + ) + .withColumn( + "outputVolumePath", + f.concat_ws( + "/", f.lit(output_volume_path), f.col("loadDatetimeStr"), f.col("path") + ), + ) + .withColumn( + "outputNotebookPath", + f.concat_ws( + "/", + f.lit(workspace_location), + f.lit("outputNotebooks"), + f.col("loadDatetimeStr"), + f.col("path"), + ), + ) + .select( + "promptID", + "content", + "loadDatetime", + "notebookAsString", + "outputVolumePath", + "outputNotebookPath", + ) +) + +gold_df.display() + + +# COMMAND ---------- + +temp_table_name = "gold_temp" +gold_df.createOrReplaceTempView(temp_table_name) +spark.sql( + f""" + INSERT INTO {gold_table} TABLE {temp_table_name} + """ +) +display( + spark.sql( + f""" + select * from {gold_table} + """ + ) +) + +# COMMAND ---------- + + +pandas_gold = gold_df.toPandas() + +temp_table_name = "gold_temp" +gold_df.createOrReplaceTempView(temp_table_name) +spark.sql( + f""" + INSERT INTO {gold_table} TABLE {temp_table_name} + """ +) +display( + spark.sql( + f""" + select * from {gold_table} + """ + ) +) +w = WorkspaceClient(host=host, token=key) + + +def write_files(row): + volume_path = row["outputVolumePath"] + content = row["notebookAsString"] + # write to a volume + dbutils.fs.put(volume_path, content) + + # write to workspace + + notebook_path = row["outputNotebookPath"] + notebook_path_root = "/".join(notebook_path.split("/")[:-1]) + w.workspace.mkdirs(notebook_path_root) + w.workspace.import_( + content=base64.b64encode(content.encode("utf-8")).decode("utf-8"), + path=notebook_path, + format=ImportFormat.SOURCE, + language=Language.SQL, + overwrite=True, + ) + + +pandas_gold = gold_df.toPandas() +pandas_gold.apply(write_files, axis=1) + +# COMMAND ---------- diff --git a/sql_migration_assistant/requirements.txt b/sql_migration_assistant/requirements.txt index 1b67eb1a..c4dcd90e 100644 --- a/sql_migration_assistant/requirements.txt +++ b/sql_migration_assistant/requirements.txt @@ -1,5 +1,4 @@ databricks-sdk==0.30.0 pyyaml -mlflow[databricks] databricks-labs-blueprint==0.8.2 databricks-labs-lsql==0.9.0 \ No newline at end of file diff --git a/sql_migration_assistant/run_app_from_databricks_notebook.py b/sql_migration_assistant/run_app_from_databricks_notebook.py index 2a75ead4..5552726b 100644 --- a/sql_migration_assistant/run_app_from_databricks_notebook.py +++ b/sql_migration_assistant/run_app_from_databricks_notebook.py @@ -7,13 +7,13 @@ # MAGIC If you want to share the app with users outside of Databricks, for example so non technical SMEs can contribute to LLM prompt development, the notebook needs to run on a no isolation shared cluster. # COMMAND ---------- -pip install databricks-sdk -U +pip install databricks-sdk -U -q # COMMAND ---------- -pip install gradio==4.27.0 pyyaml aiohttp==3.10.5 databricks-labs-blueprint==0.8.2 databricks-labs-lsql==0.9.0 +pip install gradio==4.27.0 pyyaml aiohttp==3.10.5 databricks-labs-blueprint==0.8.2 databricks-labs-lsql==0.9.0 -q # COMMAND ---------- -pip install fastapi==0.112.2 pydantic==2.8.2 dbtunnel==0.14.6 +pip install fastapi==0.112.2 pydantic==2.8.2 dbtunnel==0.14.6 -q # COMMAND ---------- dbutils.library.restartPython() diff --git a/sql_migration_assistant/utils/initialsetup.py b/sql_migration_assistant/utils/initialsetup.py index 9e1ef355..629a4202 100644 --- a/sql_migration_assistant/utils/initialsetup.py +++ b/sql_migration_assistant/utils/initialsetup.py @@ -1,10 +1,12 @@ from databricks.labs.lsql.core import StatementExecutionExt -from databricks.sdk.errors import PermissionDenied, ResourceAlreadyExists, BadRequest +from databricks.sdk.errors import ResourceAlreadyExists, BadRequest +from databricks.sdk.errors.platform import PermissionDenied from sql_migration_assistant.infra.sql_warehouse_infra import SqlWarehouseInfra from sql_migration_assistant.infra.unity_catalog_infra import UnityCatalogInfra from sql_migration_assistant.infra.vector_search_infra import VectorSearchInfra from sql_migration_assistant.infra.chat_infra import ChatInfra from sql_migration_assistant.infra.secrets_infra import SecretsInfra +from sql_migration_assistant.infra.jobs_infra import JobsInfra from sql_migration_assistant.infra.app_serving_cluster_infra import ( AppServingClusterInfra, ) @@ -14,6 +16,7 @@ from sql_migration_assistant.utils.upload_files_to_workspace import FileUploader from sql_migration_assistant.utils.run_review_app import RunReviewApp + class SetUpMigrationAssistant: # this is a decorator to handle errors and do a retry where user is asked to choose an existing resource @@ -64,9 +67,9 @@ def setup_uc_infra(self, config, w, p, see): logging.info("Choose or create catalog") uc_infra.choose_UC_catalog() logging.info("Choose or create schema") - uc_infra.choose_schema_name() + uc_infra.create_schema() logging.info("Create code intent table") - uc_infra.create_code_intent_table() + uc_infra.create_tables() return uc_infra.config @_handle_errors @@ -80,6 +83,13 @@ def setup_vs_infra(self, config, w, p): vs_infra.create_VS_index() return vs_infra.config + # no need to handle errors, no user input + def setup_job(self, config, w): + job_infra = JobsInfra(config, w) + logging.info("Create transformation job") + job_infra.create_transformation_job() + return job_infra.config + @_handle_errors def setup_chat_infra(self, config, w, p): chat_infra = ChatInfra(config, w, p) @@ -94,6 +104,11 @@ def setup_secrets_infra(self, config, w, p): secrets_infra.create_secret_PAT() return secrets_infra.config + def update_config(self, w, config): + uploader = FileUploader(w) + config = uploader.update_config(config) + return config + def setup_migration_assistant(self, w, p): logging.info("Setting up infrastructure") print("\nSetting up infrastructure") @@ -131,6 +146,16 @@ def setup_migration_assistant(self, w, p): print("\nSetting up secrets") config = self.setup_secrets_infra(config, w, p) + ############################################################ + logging.info("Setting up job") + print("\nSetting up job") + config = self.setup_job(config, w) + + ############################################################ + logging.info("Infrastructure setup complete") + print("\nInfrastructure setup complete") + + config = self.update_config(w, config) return config def upload_files(self, w, path): @@ -142,22 +167,27 @@ def upload_files(self, w, path): uploader = FileUploader(w) files_to_upload = [ "utils/runindatabricks.py", - "gradio_app.py", - "run_app_from_databricks_notebook.py", "utils/configloader.py", "utils/run_review_app.py", + "jobs/bronze_to_silver.py", + "jobs/call_agents.py", + "jobs/silver_to_gold.py", + "app/llm.py", + "app/similar_code.py", + "gradio_app.py", + "run_app_from_databricks_notebook.py", "config.yml", ] - files_to_upload = [os.path.join(path, x) for x in files_to_upload] - files_to_upload.extend( - [ - os.path.join(path, "app", x) - for x in os.listdir(os.path.join(path, "app")) - if x[-3:] == ".py" - ] - ) + + def inner(f): + full_file_path = os.path.join(path, f) + logging.info( + f"Uploading {full_file_path} to {uploader.installer.install_folder()}/{f}" + ) + uploader.upload(full_file_path, f) + for f in files_to_upload: - uploader.upload(f) + inner(f) def launch_review_app(self, w, config): logging.info( @@ -168,3 +198,12 @@ def launch_review_app(self, w, config): ) app_runner = RunReviewApp(w, config) app_runner.launch_review_app() + + def check_cloud(self, w): + host = w.config.host + if "https://adb" in host: + pass + elif ".gcp.databricks" in host: + raise Exception("GCP is not supported") + else: + pass diff --git a/sql_migration_assistant/utils/run_review_app.py b/sql_migration_assistant/utils/run_review_app.py index c1653ee3..ae6b8c97 100644 --- a/sql_migration_assistant/utils/run_review_app.py +++ b/sql_migration_assistant/utils/run_review_app.py @@ -44,10 +44,10 @@ def _path_updates(self): self.executor.run( code=f""" import sys -sys.path.insert(0, '/Workspace/Users/{self.w.current_user.me().user_name}/.sql_migration_assistant/sql_migration_assistant/utils') -sys.path.insert(0, '/Workspace/Users/{self.w.current_user.me().user_name}/.sql_migration_assistant/sql_migration_assistant/app') +sys.path.insert(0, '/Workspace/Users/{self.w.current_user.me().user_name}/.sql_migration_assistant/utils') +sys.path.insert(0, '/Workspace/Users/{self.w.current_user.me().user_name}/.sql_migration_assistant/app') import os -path = '/Workspace/Users/{self.w.current_user.me().user_name}/.sql_migration_assistant/sql_migration_assistant' +path = '/Workspace/Users/{self.w.current_user.me().user_name}/.sql_migration_assistant' os.chdir(path) """ ) diff --git a/sql_migration_assistant/utils/upload_files_to_workspace.py b/sql_migration_assistant/utils/upload_files_to_workspace.py index dd2660a5..94e6c392 100644 --- a/sql_migration_assistant/utils/upload_files_to_workspace.py +++ b/sql_migration_assistant/utils/upload_files_to_workspace.py @@ -1,6 +1,6 @@ """ This code is called after the user has run through the configutation steps in initialsetup.py. -This uploads the config, runindatabricks.py, and gradio_app.py files to the Databricks workspace. +This uploads the config, runindatabricks.py, and gradio_app_backup.py files to the Databricks workspace. """ from databricks.labs.blueprint.installation import Installation @@ -12,17 +12,24 @@ class FileUploader: def __init__(self, workspace_client: WorkspaceClient): self.w = workspace_client - self.installer = Installation(self.w, "sql_migration_assistant") + self.installer = Installation(ws=self.w, product="sql_migration_assistant") def upload( self, + file_path, file_name, ): - with open(file_name, "rb") as file: + with open(file_path, "rb") as file: contents = file.read() self.installer.upload(file_name, contents) + def update_config(self, config): + # add in the Workspace location to the config + config["WORKSPACE_LOCATION"] = self.installer.install_folder() + return config + def save_config(self, config): + # not used, need to get working X = make_dataclass("X", fields=config.keys()) config_class = X(**config)