Skip to content

Commit

Permalink
use insert as select for dynamic table data processing (matrixorigin#…
Browse files Browse the repository at this point in the history
…13594)

Use "insert into as select" for dynamic table problem.

之前是先select再insert, 现在一步完成, 可以节省处理时间

Approved by: @daviszhen, @sukki37
  • Loading branch information
gavinyue authored Dec 21, 2023
1 parent 4c7d5d0 commit 668605f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/frontend/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (mce *MysqlCmdExecutor) handleCreateDynamicTable(ctx context.Context, st *t
}
}

options[moconnector.OptConnectorSql] = "use " + mce.ses.GetDatabaseName() + "; " + tree.String(st.AsSource, dialect.MYSQL)
options[moconnector.OptConnectorSql] = tree.String(st.AsSource, dialect.MYSQL)
if err := createConnector(
ctx,
mce.ses.GetTenantInfo().TenantID,
Expand Down
29 changes: 11 additions & 18 deletions pkg/stream/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package moconnector

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -323,27 +324,19 @@ func (k *KafkaMoConnector) Close() error {

func (k *KafkaMoConnector) insertRow(msgs []*kafka.Message) {
opts := ie.SessionOverrideOptions{}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
res := k.queryResult(k.options["sql"], msgs)
if res.RowCount() == 0 || res.ColumnCount() == 0 {
ctx := context.Background()
sql := k.options["sql"]
dbName := k.options[mokafka.DatabaseKey]
tableName := k.options[mokafka.TableKey]
if sql == "" {
return
}
sql, err := k.converter.Convert(ctx, res)
if err != nil {
k.logger.Error("failed to get sql", zap.String("SQL", sql), zap.Error(err))
}
err = k.ie.Exec(ctx, sql, opts)
ctx = context.WithValue(ctx, defines.SourceScanResKey{}, msgs)

sql = fmt.Sprintf("USE %s; INSERT INTO %s.%s %s ",
dbName, dbName, tableName, sql)
err := k.ie.Exec(ctx, sql, opts)
if err != nil {
k.logger.Error("failed to insert row", zap.String("SQL", sql), zap.Error(err))
}
}

func (k *KafkaMoConnector) queryResult(sql string, msgs []*kafka.Message) ie.InternalExecResult {
opts := ie.SessionOverrideOptions{}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*100)
ctx = context.WithValue(ctx, defines.SourceScanResKey{}, msgs)
defer cancel()
res := k.ie.Query(ctx, sql, opts)
return res
}

0 comments on commit 668605f

Please sign in to comment.