Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
yangwenzea committed Dec 6, 2023
2 parents 37c0970 + b689f88 commit 2d6ef74
Show file tree
Hide file tree
Showing 469 changed files with 23,273 additions and 2,468 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ and session management.

### Related issues/PRs

Related issues: #590
Related issues: close #590 close #591
Related pr:#591


Expand Down
2 changes: 1 addition & 1 deletion .github/actions/chart-testing-action
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@ package org.apache.linkis.common
class ServiceInstance {
private var applicationName: String = _
private var instance: String = _
private var registryTimestamp: Long = _
def setApplicationName(applicationName: String): Unit = this.applicationName = applicationName
def getApplicationName: String = applicationName
def setInstance(instance: String): Unit = this.instance = instance
def getInstance: String = instance

def setRegistryTimestamp(registryTimestamp: Long): Unit = this.registryTimestamp =
registryTimestamp

def getRegistryTimestamp: Long = registryTimestamp

override def equals(other: Any): Boolean = other match {
case that: ServiceInstance =>
applicationName == that.applicationName &&
Expand All @@ -42,7 +48,9 @@ class ServiceInstance {
.foldLeft(0)((a, b) => 31 * a + b)
}

override def toString: String = s"ServiceInstance($applicationName, $instance)"
override def toString: String =
s"ServiceInstance($applicationName, $instance, $registryTimestamp)"

}

object ServiceInstance {
Expand All @@ -54,6 +62,14 @@ object ServiceInstance {
serviceInstance
}

def apply(applicationName: String, instance: String, registryTimestamp: Long): ServiceInstance = {
val serviceInstance = new ServiceInstance
serviceInstance.setApplicationName(applicationName)
serviceInstance.setInstance(instance)
serviceInstance.setRegistryTimestamp(registryTimestamp)
serviceInstance
}

def unapply(serviceInstance: ServiceInstance): Option[(String, String)] =
if (serviceInstance != null) {
Some(serviceInstance.applicationName, serviceInstance.instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,19 +232,20 @@ private[conf] object BDPConfiguration extends Logging {

private[common] def formatValue[T](defaultValue: T, value: Option[String]): Option[T] = {
if (value.isEmpty || value.exists(StringUtils.isEmpty)) return Option(defaultValue)
val trimValue = value.map(_.trim)
val formattedValue = defaultValue match {
case _: String => value
case _: Byte => value.map(_.toByte)
case _: Short => value.map(_.toShort)
case _: Char => value.map(_.toCharArray.apply(0))
case _: Int => value.map(_.toInt)
case _: Long => value.map(_.toLong)
case _: Float => value.map(_.toFloat)
case _: Double => value.map(_.toDouble)
case _: Boolean => value.map(_.toBoolean)
case _: TimeType => value.map(new TimeType(_))
case _: ByteType => value.map(new ByteType(_))
case null => value
case _: String => trimValue
case _: Byte => trimValue.map(_.toByte)
case _: Short => trimValue.map(_.toShort)
case _: Char => trimValue.map(_.toCharArray.apply(0))
case _: Int => trimValue.map(_.toInt)
case _: Long => trimValue.map(_.toLong)
case _: Float => trimValue.map(_.toFloat)
case _: Double => trimValue.map(_.toDouble)
case _: Boolean => trimValue.map(_.toBoolean)
case _: TimeType => trimValue.map(new TimeType(_))
case _: ByteType => trimValue.map(new ByteType(_))
case null => trimValue
}
formattedValue.asInstanceOf[Option[T]]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object LogUtils {
}

def generateERROR(rawLog: String): String = {
getTimeFormat + " " + "ERROR" + " " + rawLog
getTimeFormat + " " + ERROR_STR + " " + rawLog
}

def generateWarn(rawLog: String): String = {
Expand All @@ -52,4 +52,6 @@ object LogUtils {
getTimeFormat + " " + "SYSTEM-WARN" + " " + rawLog
}

val ERROR_STR = "ERROR"

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import org.apache.linkis.common.conf.CommonVars

import org.apache.commons.lang3.StringUtils

import scala.collection.mutable

object CodeAndRunTypeUtils {
private val CONF_LOCK = new Object()

Expand Down Expand Up @@ -101,7 +103,14 @@ object CodeAndRunTypeUtils {
def getLanguageTypeAndCodeTypeRelationMap: Map[String, String] = {
val codeTypeAndRunTypeRelationMap = getCodeTypeAndLanguageTypeRelationMap
if (codeTypeAndRunTypeRelationMap.isEmpty) Map()
else codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
else {
// codeTypeAndRunTypeRelationMap.flatMap(x => x._2.map(y => (y, x._1)))
val map = mutable.Map[String, String]()
codeTypeAndRunTypeRelationMap.foreach(kv => {
kv._2.foreach(v => map.put(v, kv._1))
})
map.toMap
}
}

def getLanguageTypeByCodeType(codeType: String, defaultLanguageType: String = ""): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.junit.jupiter.api.Test;

Expand All @@ -40,7 +41,7 @@ void testGenerateException() {
+ "null");
assertEquals(errorException.getClass(), ExceptionManager.generateException(null).getClass());
assertEquals(errorException.toString(), ExceptionManager.generateException(null).toString());
Map<String, Object> map = new HashMap<>();
Map<String, Object> map = new TreeMap<>();
map.put("level", null);
map.put("errCode", 1);
map.put("desc", "test");
Expand Down
80 changes: 76 additions & 4 deletions linkis-commons/linkis-module/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down Expand Up @@ -277,4 +273,80 @@
</plugins>
</build>

<profiles>
<profile>
<id>eureka</id>
<activation>
<activeByDefault>true</activeByDefault>
<property>
<name>discovery</name>
<value>eureka</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
</profile>
<profile>
<id>nacos</id>
<activation>
<property>
<name>discovery</name>
<value>nacos</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-commons</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</exclusion>
<exclusion>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,7 @@ object ServerConfiguration extends Logging {
val LINKIS_SERVER_SESSION_PROXY_TICKETID_KEY =
CommonVars("wds.linkis.session.proxy.user.ticket.key", "linkis_user_session_proxy_ticket_id_v1")

val LINKIS_SERVER_ENTRANCE_HEADER_KEY =
CommonVars("linkis.server.entrance.header.key", "jobInstanceKey")

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object RedisClient {
SessionHAConfiguration.RedisHost,
SessionHAConfiguration.RedisPort,
redisTimeout,
SessionHAConfiguration.RedisSentinalServer
SessionHAConfiguration.RedisPassword
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public interface TaskConstant {
String TICKET_ID = "ticketId";
String ENGINE_CONN_TASK_ID = "engineConnTaskId";
String ENGINE_CONN_SUBMIT_TIME = "engineConnSubmitTime";
String FAILOVER_FLAG = "failoverFlag";
String DEBUG_ENBALE = "debug.enable";

String PARAMS_DATA_SOURCE = "dataSources";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.protocol.engine

case class JobInstance(
status: String,
instances: String,
jobReqId: String,
createTimestamp: Long,
instanceRegistryTimestamp: Long
)
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ object TaskUtils {
}
} else params.put(key, waitToAdd)

private def clearMap(params: util.Map[String, AnyRef], key: String): Unit =
if (params != null && params.containsKey(key)) {
params.get(key) match {
case map: util.Map[String, AnyRef] => map.clear()
case _ => params.put(key, new util.HashMap[String, AnyRef]())
}
}

private def getConfigurationMap(
params: util.Map[String, AnyRef],
key: String
Expand Down Expand Up @@ -84,13 +92,20 @@ object TaskUtils {
def addStartupMap(params: util.Map[String, AnyRef], startupMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, startupMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)

def clearStartupMap(params: util.Map[String, AnyRef]): Unit = {
val configurationMap = getMap(params, TaskConstant.PARAMS_CONFIGURATION)
if (!configurationMap.isEmpty) {
clearMap(configurationMap, TaskConstant.PARAMS_CONFIGURATION_STARTUP)
}
}

def addRuntimeMap(params: util.Map[String, AnyRef], runtimeMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, runtimeMap, TaskConstant.PARAMS_CONFIGURATION_RUNTIME)

def addSpecialMap(params: util.Map[String, AnyRef], specialMap: util.Map[String, AnyRef]): Unit =
addConfigurationMap(params, specialMap, TaskConstant.PARAMS_CONFIGURATION_SPECIAL)

// tdoo
// todo
def getLabelsMap(params: util.Map[String, AnyRef]): util.Map[String, AnyRef] =
getMap(params, TaskConstant.LABELS)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object ZuulEntranceUtils {

private val INSTANCE_SPLIT_TOKEN = "_"

private val EXEC_ID = "exec_id"
val EXEC_ID = "exec_id"

private val SPLIT_LEN = 3

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ abstract class AbstractGroup extends Group {

private var _status: GroupStatus = _
private var maxRunningJobs: Int = _
private var maxAllowRunningJobs: Int = 0
private var maxAskExecutorTimes: Long = 0L

def setMaxRunningJobs(maxRunningJobs: Int): Unit = this.maxRunningJobs = maxRunningJobs
def getMaxRunningJobs: Int = maxRunningJobs

def setMaxAllowRunningJobs(maxAllowRunningJobs: Int): Unit = this.maxAllowRunningJobs =
maxAllowRunningJobs

def getMaxAllowRunningJobs: Int =
if (maxAllowRunningJobs <= 0) maxRunningJobs else Math.min(maxAllowRunningJobs, maxRunningJobs)

def setMaxAskExecutorTimes(maxAskExecutorTimes: Long): Unit = this.maxAskExecutorTimes =
maxAskExecutorTimes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ object SchedulerEventState extends Enumeration {
SchedulerEventState.withName(jobState)
)

def isInitedByStr(jobState: String): Boolean = SchedulerEventState.withName(jobState) == Inited

def isRunningByStr(jobState: String): Boolean = isRunning(SchedulerEventState.withName(jobState))

}
Loading

0 comments on commit 2d6ef74

Please sign in to comment.