Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][JDBC] JDBC driver support pull Multiple result sets #5173

Merged
merged 5 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/check-license.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
echo "rat_file=$rat_file"
if [[ -n "$rat_file" ]];then echo "check error!" && cat $rat_file && exit 123;else echo "check success!" ;fi
- name: Upload the report
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: license-check-report
path: "**/target/rat.txt"
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class InteractiveJobDesc {
private Map<String, Object> labelMap;
private Map<String, Object> sourceMap;

// 需要加到header中的一些参数
private Map<String, String> headers;

public String getSubmitUser() {
return submitUser;
}
Expand Down Expand Up @@ -101,4 +104,12 @@ public Map<String, Object> getLabelMap() {
public void setLabelMap(Map<String, Object> labelMap) {
this.labelMap = labelMap;
}

public Map<String, String> getHeaders() {
return headers;
}

public void setHeaders(Map<String, String> headers) {
this.headers = headers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public LinkisOperResultAdapter submit(InteractiveJobDesc jobDesc)
.setVariableMap(jobDesc.getParamVarsMap())
.setLabels(jobDesc.getLabelMap())
.setSource(jobDesc.getSourceMap())
.setHeaders(jobDesc.getHeaders())
.build();
logger.info("Request info to Linkis: \n{}", CliUtils.GSON.toJson(jobSubmitAction));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ object LinkisJobBuilder {
private var threadPool: ScheduledThreadPoolExecutor = Utils.defaultScheduler
private var serverUrl: String = _

private var authTokenValue: String = CommonVars[String](
var authTokenValue: String = CommonVars[String](
"wds.linkis.client.test.common.tokenValue",
"LINKIS_CLI_TEST"
).getValue // This is the default authToken, we usually suggest set different ones for users.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,37 @@

package org.apache.linkis.computation.client

import org.apache.linkis.computation.client.interactive.InteractiveJob
import org.apache.linkis.computation.client.once.OnceJob
import org.apache.linkis.bml.client.BmlClientFactory
import org.apache.linkis.computation.client.interactive.{InteractiveJob, InteractiveJobBuilder}
import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob}
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder}
import org.apache.linkis.httpclient.dws.config.DWSClientConfig
import org.apache.linkis.ujes.client.UJESClientImpl

import java.io.Closeable

class LinkisJobClient(clientConfig: DWSClientConfig) extends Closeable {

private val ujseClient = new UJESClientImpl(clientConfig)

private lazy val linkisManagerCLient = LinkisManagerClient(ujseClient)

override def close(): Unit = {
if (null != linkisManagerCLient) {
linkisManagerCLient.close()
}
}

def onceJobBuilder(): SimpleOnceJobBuilder =
SimpleOnceJob.builder(SimpleOnceJobBuilder.getBmlClient(clientConfig), linkisManagerCLient)

def interactiveJobBuilder(): InteractiveJobBuilder = {
val builder = InteractiveJob.builder()
builder.setUJESClient(ujseClient)
}

}

/**
* This class is only used to provide a unified entry for user to build a LinkisJob conveniently and
* simply. Please keep this class lightweight enough, do not set too many field to confuse user.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.linkis.computation.client.once.simple

import org.apache.linkis.bml.client.BmlClient
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobMetrics
import org.apache.linkis.computation.client.job.AbstractSubmittableLinkisJob
import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob, SubmittableOnceJob}
import org.apache.linkis.computation.client.once.action.CreateEngineConnAction
import org.apache.linkis.computation.client.once.result.CreateEngineConnResult
import org.apache.linkis.computation.client.operator.OnceJobOperator

import java.util.Locale
Expand Down Expand Up @@ -109,15 +109,13 @@ class SubmittableSimpleOnceJob(
with AbstractSubmittableLinkisJob {

private var ecmServiceInstance: ServiceInstance = _
private var createEngineConnResult: CreateEngineConnResult = _

def getECMServiceInstance: ServiceInstance = ecmServiceInstance
def getCreateEngineConnResult: CreateEngineConnResult = createEngineConnResult

override protected def doSubmit(): Unit = {
logger.info(s"Ready to create a engineConn: ${createEngineConnAction.getRequestPayload}.")
createEngineConnResult = linkisManagerClient.createEngineConn(createEngineConnAction)
lastNodeInfo = createEngineConnResult.getNodeInfo
val nodeInfo = linkisManagerClient.createEngineConn(createEngineConnAction)
lastNodeInfo = nodeInfo.getNodeInfo
serviceInstance = getServiceInstance(lastNodeInfo)
ticketId = getTicketId(lastNodeInfo)
ecmServiceInstance = getECMServiceInstance(lastNodeInfo)
Expand Down Expand Up @@ -160,6 +158,11 @@ object SimpleOnceJob {

def builder(): SimpleOnceJobBuilder = new SimpleOnceJobBuilder

def builder(
bmlClient: BmlClient,
linkisManagerClient: LinkisManagerClient
): SimpleOnceJobBuilder = new SimpleOnceJobBuilder(bmlClient, linkisManagerClient)

/**
* Build a submitted SimpleOnceJob by id and user.
* @param id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.linkis.computation.client.once.simple
import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.LinkisJobBuilder.clientConfig
import org.apache.linkis.computation.client.once.LinkisManagerClient
import org.apache.linkis.computation.client.once.action.CreateEngineConnAction
import org.apache.linkis.computation.client.once.simple.SimpleOnceJobBuilder._
Expand All @@ -28,6 +29,8 @@ import org.apache.linkis.governance.common.entity.job.OnceExecutorContent
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils.BmlResource
import org.apache.linkis.httpclient.dws.DWSHttpClient
import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder}
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.ujes.client.exception.UJESJobException
Expand All @@ -38,12 +41,19 @@ import java.util
import scala.collection.convert.WrapAsJava._
import scala.collection.convert.WrapAsScala._

class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[SubmittableSimpleOnceJob] {
class SimpleOnceJobBuilder private[simple] (
private val bmlClient: BmlClient,
private val linkisManagerClient: LinkisManagerClient
) extends LinkisJobBuilder[SubmittableSimpleOnceJob] {

private var createService: String = _
private var maxSubmitTime: Long = _
private var description: String = _

def this() = {
this(null, null)
}

def setCreateService(createService: String): this.type = {
this.createService = createService
this
Expand All @@ -69,10 +79,26 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab
val contentMap = OnceExecutorContentUtils.contentToMap(onceExecutorContent)
val bytes = DWSHttpClient.jacksonJson.writeValueAsBytes(contentMap)
val response =
getBmlClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes))
getThisBMLClient.uploadResource(executeUser, getFilePath, new ByteArrayInputStream(bytes))
OnceExecutorContentUtils.resourceToValue(BmlResource(response.resourceId, response.version))
}

protected def getThisBMLClient(): BmlClient = {
if (null == this.bmlClient) {
getBmlClient(LinkisJobBuilder.getDefaultClientConfig)
} else {
this.bmlClient
}
}

protected def getThisLinkisManagerClient(): LinkisManagerClient = {
if (null == this.linkisManagerClient) {
getLinkisManagerClient
} else {
this.linkisManagerClient
}
}

override def build(): SubmittableSimpleOnceJob = {
ensureNotNull(labels, "labels")
ensureNotNull(jobContent, "jobContent")
Expand All @@ -99,7 +125,7 @@ class SimpleOnceJobBuilder private[simple] () extends LinkisJobBuilder[Submittab
.setMaxSubmitTime(maxSubmitTime)
.setDescription(description)
.build()
new SubmittableSimpleOnceJob(getLinkisManagerClient, createEngineConnAction)
new SubmittableSimpleOnceJob(getThisLinkisManagerClient, createEngineConnAction)
}

implicit def toMap(map: util.Map[String, Any]): util.Map[String, String] = map.map {
Expand Down Expand Up @@ -128,10 +154,27 @@ object SimpleOnceJobBuilder {
private var bmlClient: BmlClient = _
private var linkisManagerClient: LinkisManagerClient = _

def getBmlClient: BmlClient = {
def getBmlClient(clientConfig: DWSClientConfig): BmlClient = {
if (bmlClient == null) synchronized {
if (bmlClient == null) {
bmlClient = BmlClientFactory.createBmlClient(LinkisJobBuilder.getDefaultClientConfig)
val newClientConfig = DWSClientConfigBuilder
.newBuilder()
.addServerUrl(clientConfig.getServerUrl)
.connectionTimeout(clientConfig.getConnectTimeout)
.discoveryEnabled(clientConfig.isDiscoveryEnabled)
.loadbalancerEnabled(clientConfig.isLoadbalancerEnabled)
.maxConnectionSize(clientConfig.getMaxConnection)
.retryEnabled(clientConfig.isRetryEnabled)
.setRetryHandler(clientConfig.getRetryHandler)
.readTimeout(
clientConfig.getReadTimeout
) // We think 90s is enough, if SocketTimeoutException is throw, just set a new clientConfig to modify it.
.setAuthenticationStrategy(new TokenAuthenticationStrategy())
.setAuthTokenKey(TokenAuthenticationStrategy.TOKEN_KEY)
.setAuthTokenValue(LinkisJobBuilder.authTokenValue)
.setDWSVersion(clientConfig.getDWSVersion)
.build()
bmlClient = BmlClientFactory.createBmlClient(newClientConfig)
Utils.addShutdownHook(() => bmlClient.close())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ abstract class UJESClient extends Closeable with Logging {
* @return
*/
def progress(jobExecuteResult: JobExecuteResult): JobProgressResult =
Utils.tryCatch(executeJobExecIdAction(jobExecuteResult, JobServiceType.JobProgress)) { t =>
Utils.tryCatch(
executeJobExecIdAction(jobExecuteResult, JobServiceType.JobProgress)
.asInstanceOf[JobProgressResult]
) { t =>
logger.warn("Failed to get progress, return empty progress.", t)
val result = new JobProgressResult
result.setProgress(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.apache.linkis.ujes.client.exception.UJESClientBuilderException

import java.util

import scala.collection.JavaConverters.mapAsScalaMapConverter

class JobSubmitAction private () extends POSTAction with UJESJobAction {
override def suffixURLs: Array[String] = Array("entrance", "submit")

Expand Down Expand Up @@ -52,6 +54,8 @@ object JobSubmitAction {

private var source: util.Map[String, AnyRef] = _

private var headers: util.Map[String, String] = _

def addExecuteCode(executeCode: String): Builder = {
if (null == executionContent) executionContent = new util.HashMap[String, AnyRef]()
executionContent.put("code", executeCode)
Expand Down Expand Up @@ -129,6 +133,11 @@ object JobSubmitAction {
this
}

def setHeaders(headers: util.Map[String, String]): Builder = {
this.headers = headers
this
}

def build(): JobSubmitAction = {
val submitAction = new JobSubmitAction
submitAction.setUser(user)
Expand All @@ -145,6 +154,11 @@ object JobSubmitAction {

if (this.labels == null) this.labels = new util.HashMap[String, AnyRef]()
submitAction.addRequestPayload(TaskConstant.LABELS, this.labels)

if (this.headers == null) this.headers = new util.HashMap[String, String]()
this.headers.asScala.foreach { case (k, v) =>
if (k != null && v != null) submitAction.addHeader(k, v)
}
submitAction
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ object ResultSetAction {
// default value is :org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL
private var nullValue: String = "LINKIS_NULL"

private var enableLimit: Boolean = false
private var enableLimit: Option[Boolean] = None
private var columnPage: Int = _
private var columnPageSize: Int = _

def setUser(user: String): Builder = {
this.user = user
Expand Down Expand Up @@ -71,7 +73,17 @@ object ResultSetAction {
}

def setEnableLimit(enableLimit: Boolean): Builder = {
this.enableLimit = enableLimit
this.enableLimit = Some(enableLimit)
this
}

def setColumnPage(columnPage: Int): Builder = {
this.columnPage = columnPage
this
}

def setColumnPageSize(columnPageSize: Int): Builder = {
this.columnPageSize = columnPageSize
this
}

Expand All @@ -83,8 +95,18 @@ object ResultSetAction {
if (page > 0) resultSetAction.setParameter("page", page)
if (pageSize > 0) resultSetAction.setParameter("pageSize", pageSize)
resultSetAction.setParameter("charset", charset)
resultSetAction.setParameter("enableLimit", enableLimit)
if (enableLimit.isDefined) resultSetAction.setParameter("enableLimit", true)
resultSetAction.setParameter("nullValue", nullValue)
if (columnPage > 0) {
resultSetAction.setParameter("columnPage", columnPage)
} else {
resultSetAction.setParameter("columnPage", null)
}
if (columnPageSize > 0) {
resultSetAction.setParameter("columnPageSize", columnPageSize)
} else {
resultSetAction.setParameter("columnPageSize", null)
}
resultSetAction.setUser(user)
resultSetAction
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.linkis.ujes.client.utils

import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
import org.apache.linkis.ujes.client.request.JobExecuteAction.{EngineType, RunType}
import org.apache.linkis.ujes.client.response.ResultSetResult

import java.util
import java.util.Locale
Expand All @@ -27,6 +28,8 @@ import com.google.gson.{Gson, JsonObject}

object UJESClientUtils {

val gson: Gson = new Gson()

def toEngineType(engineType: String): EngineType = engineType match {
case "spark" => EngineType.SPARK
case "hive" => EngineType.HIVE
Expand Down Expand Up @@ -71,13 +74,11 @@ object UJESClientUtils {
case "double" => value.toDouble
case "boolean" => value.toBoolean
case "byte" => value.toByte
case "timestamp" => value
case "date" => value
case "bigint" => value.toLong
case "decimal" => value.toDouble
case "array" => new Gson().fromJson(value, classOf[util.ArrayList[Object]])
case "map" => new Gson().fromJson(value, classOf[util.HashMap[Object, Object]])
case "struct" => new Gson().fromJson(value, classOf[JsonObject])
case "array" => gson.fromJson(value, classOf[util.ArrayList[Object]])
case "map" => gson.fromJson(value, classOf[util.HashMap[Object, Object]])
case "struct" => gson.fromJson(value, classOf[JsonObject])
case _ => value
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class UJESSQLDriver extends UJESSQLDriverMain implements Driver {
static String PASSWORD = "password";
static boolean TABLEAU_SERVER = false;
static String FIXED_SESSION = "fixedSession";
static String ENABLE_MULTI_RESULT = "enableMultiResult";

static String USE_SSL = "useSSL";
static String VERSION = "version";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,6 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope
val runType = EngineType.mapStringToEngineType(engine) match {
case EngineType.SPARK => RunType.SQL
case EngineType.HIVE => RunType.HIVE
case EngineType.REPL => RunType.REPL
case EngineType.DORIS => RunType.DORIS
case EngineType.TRINO => RunType.TRINO_SQL
case EngineType.PRESTO => RunType.PRESTO_SQL
case EngineType.NEBULA => RunType.NEBULA_SQL
Expand Down
Loading
Loading