Skip to content

Commit

Permalink
merge 1.7.1 to 1.8.0 (apache#592)
Browse files Browse the repository at this point in the history
* Dev 1.7.1 add python module load hook (apache#584)

* ai generate code init

* 人工修改代码

* 人工修改提交

* 【1.7.1】python物料管理 (apache#583)

* 代码由AI自动生成接口和AI插件生成

* 代码由AI自动生成接口和AI插件生成

* 代码由人工修改

---------

Co-authored-by: “v_kkhuang” <“420895376@qq.com”>

* 人工修改 code format and add rpc func

* fix bug

* Dev 1.7.0 python udf manager (apache#586)

* 代码由AI自动生成接口和AI插件生成

* 代码由AI自动生成接口和AI插件生成

* 代码由人工修改

* bug  fix

* bug  fix

---------

Co-authored-by: “v_kkhuang” <“420895376@qq.com”>

* fix bug

* code format

* 使用ai生成单元测试案例

* 人工修改单元测试

* 人工修改单元测试

* Dev 1.7.1 webank test (apache#587)

* 使用ai生成单元测试案例

* 人工修改单元测试

* 人工修改单元测试

* add instance info

* update file permission (apache#590)

Co-authored-by: “v_kkhuang” <“420895376@qq.com”>

* chore: 1.7.1 (apache#591)

* chore: 1.7.1

* upd: config file

---------

Co-authored-by: v-kkhuang <62878639+v-kkhuang@users.noreply.github.com>
Co-authored-by: “v_kkhuang” <“420895376@qq.com”>
Co-authored-by: Yonghao Mei <73584269+mayinrain@users.noreply.github.com>
  • Loading branch information
4 people authored Aug 30, 2024
1 parent 722af32 commit d221ce5
Show file tree
Hide file tree
Showing 89 changed files with 4,853 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ object Configuration extends Logging {

val VARIABLE_OPERATION: Boolean = CommonVars("wds.linkis.variable.operation", false).getValue

val IS_VIEW_FS_ENV = CommonVars("wds.linkis.env.is.viewfs", true)

val ERROR_MSG_TIP =
CommonVars(
"linkis.jobhistory.error.msg.tip",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.common.conf.Configuration.IS_VIEW_FS_ENV
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import org.apache.linkis.engineconn.computation.executor.execute.{
ComputationExecutor,
EngineExecutionContext
}
import org.apache.linkis.engineconn.core.engineconn.EngineConnManager
import org.apache.linkis.engineconn.core.executor.ExecutorManager
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.linkis.rpc.Sender
import org.apache.linkis.udf.UDFClientConfiguration
import org.apache.linkis.udf.api.rpc.{RequestPythonModuleProtocol, ResponsePythonModuleProtocol}
import org.apache.linkis.udf.entity.PythonModuleInfoVO

import org.apache.commons.lang3.StringUtils

import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
* The PythonModuleLoad class is designed to load Python modules into the execution environment
* dynamically. This class is not an extension of UDFLoad, but shares a similar philosophy of
* handling dynamic module loading based on user preferences and system configurations.
*/
abstract class PythonModuleLoad extends Logging {

/** Abstract properties to be defined by the subclass */
protected val engineType: String
protected val runType: RunType

protected def getEngineType(): String = engineType

protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String

private def queryPythonModuleRpc(
userName: String,
engineType: String
): java.util.List[PythonModuleInfoVO] = {
val infoList = Sender
.getSender(UDFClientConfiguration.UDF_SERVICE_NAME.getValue)
.ask(RequestPythonModuleProtocol(userName, engineType))
.asInstanceOf[ResponsePythonModuleProtocol]
.getModulesInfo()
infoList
}

protected def getLoadPythonModuleCode: Array[String] = {
val engineCreationContext =
EngineConnManager.getEngineConnManager.getEngineConn.getEngineCreationContext
val user = engineCreationContext.getUser

var infoList: util.List[PythonModuleInfoVO] =
Utils.tryAndWarn(queryPythonModuleRpc(user, getEngineType()))
if (infoList == null) {
logger.info("rpc get info is empty.")
infoList = new util.ArrayList[PythonModuleInfoVO]()
}

// 替换Viewfs
if (IS_VIEW_FS_ENV.getValue) {
infoList.asScala.foreach { info =>
val path = info.getPath
logger.info(s"python path: ${path}")
if (path.startsWith("hdfs") || path.startsWith("viewfs")) {
info.setPath(path.replace("hdfs://", "viewfs://"))
} else {
info.setPath("viewfs://" + path)
}
}
} else {

infoList.asScala.foreach { info =>
val path = info.getPath
logger.info(s"hdfs python path: ${path}")
if (!path.startsWith("hdfs")) {
info.setPath("hdfs://" + path)
}
}
}

logger.info(s"${user} load python modules: ")
infoList.asScala.foreach(l => logger.info(s"module name:${l.getName}, path:${l.getPath}\n"))

// 创建加载code
val codes: mutable.Buffer[String] = infoList.asScala
.filter { info => StringUtils.isNotEmpty(info.getPath) }
.map(constructCode)
// 打印codes
val str: String = codes.mkString("\n")
logger.info(s"python codes: $str")
codes.toArray
}

private def executeFunctionCode(codes: Array[String], executor: ComputationExecutor): Unit = {
if (null == codes || null == executor) {
return
}
codes.foreach { code =>
logger.info("Submit function registration to engine, code: " + code)
Utils.tryCatch(executor.executeLine(new EngineExecutionContext(executor), code)) {
t: Throwable =>
logger.error("Failed to load python module", t)
null
}
}
}

/**
* Generate and execute the code necessary for loading Python modules.
*
* @param executor
* An object capable of executing code in the current engine context.
*/
protected def loadPythonModules(labels: Array[Label[_]]): Unit = {

val codes = getLoadPythonModuleCode
logger.info(s"codes length: ${codes.length}")
if (null != codes && codes.nonEmpty) {
val executor = ExecutorManager.getInstance.getExecutorByLabels(labels)
if (executor != null) {
val className = executor.getClass.getName
logger.info(s"executor class: ${className}")
} else {
logger.error(s"Failed to load python, executor is null")
}

executor match {
case computationExecutor: ComputationExecutor =>
executeFunctionCode(codes, computationExecutor)
case _ =>
}
}
logger.info(s"Successful to load python, engineType : ${engineType}")
}

}

// Note: The actual implementation of methods like `executeFunctionCode` and `construct
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.engineconn.common.creation.EngineCreationContext
import org.apache.linkis.engineconn.common.engineconn.EngineConn
import org.apache.linkis.engineconn.common.hook.EngineConnHook
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel

abstract class PythonModuleLoadEngineConnHook
extends PythonModuleLoad
with EngineConnHook
with Logging {

override def afterExecutionExecute(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {
Utils.tryAndWarnMsg {
val codeLanguageLabel = new CodeLanguageLabel
codeLanguageLabel.setCodeType(runType.toString)
logger.info(s"engineType: ${engineType}")
val labels = Array[Label[_]](codeLanguageLabel)
loadPythonModules(labels)
}(s"Failed to load Python Modules: ${engineType}")

}

override def afterEngineServerStartFailed(
engineCreationContext: EngineCreationContext,
throwable: Throwable
): Unit = {
logger.error(s"Failed to start Engine Server: ${throwable.getMessage}", throwable)
}

override def beforeCreateEngineConn(engineCreationContext: EngineCreationContext): Unit = {
logger.info("Preparing to load Python Module...")
}

override def beforeExecutionExecute(
engineCreationContext: EngineCreationContext,
engineConn: EngineConn
): Unit = {
logger.info(s"Before executing command on load Python Module.")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.manager.label.entity.engine.RunType
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import org.apache.linkis.udf.entity.PythonModuleInfoVO

/**
* 定义一个用于Spark引擎的Python模块加载与执行挂钩的类
*/
class PythonSparkEngineHook extends PythonModuleLoadEngineConnHook {

// 设置engineType属性为"spark",表示此挂钩适用于Spark数据处理引擎
override val engineType: String = "spark"

// 设置runType属性为RunType.PYSPARK,表示此挂钩将执行PySpark类型的代码
override protected val runType: RunType = RunType.PYSPARK

// 重写constructCode方法,用于根据Python模块信息构造加载模块的代码
override protected def constructCode(pythonModuleInfo: PythonModuleInfoVO): String = {
// 使用pythonModuleInfo的path属性,构造SparkContext.addPyFile的命令字符串
// 这个命令在PySpark环境中将模块文件添加到所有worker上,以便在代码中可以使用
val path: String = pythonModuleInfo.getPath
val loadCode = s"sc.addPyFile('${path}')"
logger.info(s"pythonLoadCode: ${loadCode}")
loadCode
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.engineconn.computation.executor.hook

import org.apache.linkis.engineconn.common.creation.{DefaultEngineCreationContext, EngineCreationContext}
import org.apache.linkis.engineconn.common.engineconn.DefaultEngineConn
import org.apache.linkis.manager.label.entity.engine.CodeLanguageLabel
import org.junit.jupiter.api.Test
import org.mockito.Mockito.{mock, verify, when}


// 单元测试案例
class PythonModuleLoadEngineConnHookTest {

@Test
def testAfterExecutionExecute(): Unit = {
// 创建模拟对象
val mockEngineCreationContext = new DefaultEngineCreationContext
val mockEngineConn = mock[DefaultEngineConn]
val hook = new PythonSparkEngineHook

// 设置模拟行为
var labels = new CodeLanguageLabel
labels.setCodeType("spark")

// 执行测试方法
hook.afterExecutionExecute(mockEngineCreationContext, mockEngineConn)

}

@Test
def testAfterEngineServerStartFailed(): Unit = {
// 创建模拟对象
val mockEngineCreationContext = mock[EngineCreationContext]
val mockThrowable = mock[Throwable]
val hook = new PythonSparkEngineHook

// 设置模拟行为
var labels = new CodeLanguageLabel
labels.setCodeType("spark")

// 执行测试方法
hook.afterEngineServerStartFailed(mockEngineCreationContext, mockThrowable)

}

@Test
def testBeforeCreateEngineConn(): Unit = {
// 创建模拟对象

// 验证调用

}

@Test
def testBeforeExecutionExecute(): Unit = {
// 创建模拟对象
val mockEngineCreationContext = mock[EngineCreationContext]
val mockEngineConn = mock[DefaultEngineConn]
val hook = new PythonSparkEngineHook

// 执行测试方法
hook.beforeExecutionExecute(mockEngineCreationContext, mockEngineConn)


}
}
Loading

0 comments on commit d221ce5

Please sign in to comment.