Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 13516 #153

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ const (

// MO_PUBS publication meta table
MO_PUBS = "mo_pubs"

// MO_SNAPSHOTS
MO_SNAPSHOTS = "mo_snapshots"
)

const (
Expand Down
14 changes: 14 additions & 0 deletions pkg/cnservice/upgrader/new_add_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,26 @@ var (
primary key(stage_id)
);`, catalog.MO_CATALOG, catalog.MO_STAGES),
}

MoSnapshotsTable = &table.Table{
Account: table.AccountAll,
Database: catalog.MO_CATALOG,
Table: catalog.MO_SNAPSHOTS,
CreateTableSql: fmt.Sprintf(`CREATE TABLE %s.%s (
snapshot_id uuid unique key,
sname varchar(64) primary key,
ts timestamp,
level enum('cluster','account','database','table'),
objname varchar(5000),
);`, catalog.MO_CATALOG, catalog.MO_SNAPSHOTS),
}
)

var needUpgradeNewTable = []*table.Table{
MoTablePartitionsTable,
SysDaemonTaskTable,
MoStagesTable,
MoSnapshotsTable,
}

var PARTITIONSView = &table.Table{
Expand Down
221 changes: 216 additions & 5 deletions pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"golang.org/x/sync/errgroup"

"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/queryservice"

"github.com/matrixorigin/matrixone/pkg/catalog"
Expand Down Expand Up @@ -830,6 +831,7 @@ var (
"mo_variables": 0,
"mo_transactions": 0,
"mo_cache": 0,
"mo_snapshots": 0,
}
configInitVariables = map[string]int8{
"save_query_result": 0,
Expand Down Expand Up @@ -867,6 +869,7 @@ var (
"mo_variables": 0,
"mo_transactions": 0,
"mo_cache": 0,
"mo_snapshots": 0,
}
createDbInformationSchemaSql = "create database information_schema;"
createAutoTableSql = fmt.Sprintf(`create table if not exists %s (
Expand Down Expand Up @@ -1004,6 +1007,14 @@ var (
system_variables bool,
primary key(configuration_id)
);`,

`create table mo_snapshots(
snapshot_id uuid unique key,
sname varchar(64) primary key,
ts timestamp,
level enum('cluster','account','database','table'),
objname varchar(5000),
`,
`create table mo_pubs(
pub_name varchar(64) primary key,
database_name varchar(5000),
Expand Down Expand Up @@ -1069,6 +1080,7 @@ var (
`drop view if exists mo_catalog.mo_variables;`,
`drop view if exists mo_catalog.mo_transactions;`,
`drop view if exists mo_catalog.mo_cache;`,
`drop table if exists mo_catalog.mo_snapshots;`,
}
dropMoMysqlCompatibilityModeSql = `drop table if exists mo_catalog.mo_mysql_compatibility_mode;`
dropMoPubsSql = `drop table if exists mo_catalog.mo_pubs;`
Expand Down Expand Up @@ -1097,6 +1109,13 @@ var (
created_time,
comment) values ('%s','%s', '%s', '%s','%s', '%s');`

insertIntoMoSnapshots = `insert into mo_catalog.mo_snapshots(
snapshot_id,
sname,
ts,
level,
objname) values ('%s','%s', '%s', '%s','%s');`

initMoUserDefinedFunctionFormat = `insert into mo_catalog.mo_user_defined_function(
name,
owner,
Expand Down Expand Up @@ -1526,6 +1545,8 @@ const (

dropStageFormat = `delete from mo_catalog.mo_stages where stage_name = '%s' order by stage_id;`

dropSnapshotFormat = `delete from mo_catalog.mo_snapshots where sname = '%s' order by snapshot_id;`

updateStageUrlFotmat = `update mo_catalog.mo_stages set url = '%s' where stage_name = '%s' order by stage_id;`

updateStageCredentialsFotmat = `update mo_catalog.mo_stages set stage_credentials = '%s' where stage_name = '%s' order by stage_id;`
Expand All @@ -1534,6 +1555,8 @@ const (

updateStageCommentFotmat = `update mo_catalog.mo_stages set comment = '%s' where stage_name = '%s' order by stage_id;`

checkSnapshotFormat = `select snapshot_id from mo_catalog.mo_snapshots where sname = "%s" order by snapshot_id;`

getDbIdAndTypFormat = `select dat_id,dat_type from mo_catalog.mo_database where datname = '%s' and account_id = %d;`
insertIntoMoPubsFormat = `insert into mo_catalog.mo_pubs(pub_name,database_name,database_id,all_table,table_list,account_list,created_time,owner,creator,comment) values ('%s','%s',%d,%t,'%s','%s',now(),%d,%d,'%s');`
getPubInfoFormat = `select account_list,comment from mo_catalog.mo_pubs where pub_name = '%s';`
Expand Down Expand Up @@ -1604,6 +1627,14 @@ func getSqlForCheckStage(ctx context.Context, stage string) (string, error) {
return fmt.Sprintf(checkStageFormat, stage), nil
}

func getSqlForCheckSnapshot(ctx context.Context, snapshot string) (string, error) {
err := inputNameIsInvalid(ctx, snapshot)
if err != nil {
return "", err
}
return fmt.Sprintf(checkSnapshotFormat, snapshot), nil
}

func getSqlForCheckStageStatus(ctx context.Context, status string) string {
return fmt.Sprintf(checkStageStatusFormat, status)
}
Expand All @@ -1619,14 +1650,30 @@ func getSqlForCheckStageStatusWithStageName(ctx context.Context, stage string) (
return fmt.Sprintf(checkStageStatusWithStageNameFormat, stage), nil
}

func getSqlForInsertIntoMoStages(ctx context.Context, stageName, url, credentials, status, createdTime, comment string) string {
return fmt.Sprintf(insertIntoMoStages, stageName, url, credentials, status, createdTime, comment)
func getSqlForInsertIntoMoStages(ctx context.Context, stageName, url, credentials, status, createdTime, comment string) (string, error) {
err := inputNameIsInvalid(ctx, stageName)
if err != nil {
return "", err
}
return fmt.Sprintf(insertIntoMoStages, stageName, url, credentials, status, createdTime, comment), nil
}

func getSqlForCreateSnapshot(ctx context.Context, snapshotId, snapshotName, ts, level, objName string) (string, error) {
err := inputNameIsInvalid(ctx, snapshotName)
if err != nil {
return "", err
}
return fmt.Sprintf(insertIntoMoSnapshots, snapshotId, snapshotName, ts, level, objName), nil
}

func getSqlForDropStage(stageName string) string {
return fmt.Sprintf(dropStageFormat, stageName)
}

func getSqlForDropSnapshot(snapshotName string) string {
return fmt.Sprintf(dropSnapshotFormat, snapshotName)
}

func getsqlForUpdateStageUrl(stageName, url string) string {
return fmt.Sprintf(updateStageUrlFotmat, url, stageName)
}
Expand Down Expand Up @@ -3570,7 +3617,10 @@ func doCreateStage(ctx context.Context, ses *Session, cs *tree.CreateStage) (err
comment = cs.Comment.Comment
}

sql = getSqlForInsertIntoMoStages(ctx, string(cs.Name), cs.Url, credentials, StageStatus, types.CurrentTimestamp().String2(time.UTC, 0), comment)
sql, err = getSqlForInsertIntoMoStages(ctx, string(cs.Name), cs.Url, credentials, StageStatus, types.CurrentTimestamp().String2(time.UTC, 0), comment)
if err != nil {
return err
}

err = bh.Exec(ctx, sql)
if err != nil {
Expand Down Expand Up @@ -3766,7 +3816,6 @@ func doAlterStage(ctx context.Context, ses *Session, as *tree.AlterStage) (err e

func doDropStage(ctx context.Context, ses *Session, ds *tree.DropStage) (err error) {
var sql string
//var err error
var stageExist bool
bh := ses.GetBackgroundExec(ctx)
defer bh.Close()
Expand Down Expand Up @@ -5854,7 +5903,7 @@ func determinePrivilegeSetOfStatement(stmt tree.Statement) *privilege {
*tree.ShowTableValues, *tree.ShowNodeList, *tree.ShowRolesStmt,
*tree.ShowLocks, *tree.ShowFunctionOrProcedureStatus, *tree.ShowPublications, *tree.ShowSubscriptions,
*tree.ShowBackendServers, *tree.ShowStages, *tree.ShowConnectors, *tree.DropConnector,
*tree.PauseDaemonTask, *tree.CancelDaemonTask, *tree.ResumeDaemonTask:
*tree.PauseDaemonTask, *tree.CancelDaemonTask, *tree.ResumeDaemonTask, *tree.ShowSnapShots:
objType = objectTypeNone
kind = privilegeKindNone
canExecInRestricted = true
Expand Down Expand Up @@ -5917,6 +5966,9 @@ func determinePrivilegeSetOfStatement(stmt tree.Statement) *privilege {
case *tree.CreateStage, *tree.AlterStage, *tree.DropStage:
objType = objectTypeNone
kind = privilegeKindNone
case *tree.CreateSnapShot, *tree.DropSnapShot:
objType = objectTypeNone
kind = privilegeKindNone
case *tree.BackupStart:
objType = objectTypeNone
kind = privilegeKindNone
Expand Down Expand Up @@ -9504,3 +9556,162 @@ func postAlterSessionStatus(
err = queryservice.RequestMultipleCn(ctx, nodes, qc, genRequest, handleValidResponse, handleInvalidResponse)
return errors.Join(err, retErr)
}

func doCreateSnapshot(ctx context.Context, ses *Session, stmt *tree.CreateSnapShot) error {
var err error
var snapshotLevel tree.SnapshotLevel
var snapshotForAccount string
var snapshotName string
var snapshotExist bool
var snapshotId string
var snapshotTs string
var sql string

// check create stage priv
err = doCheckRole(ctx, ses)
if err != nil {
return err
}

bh := ses.GetBackgroundExec(ctx)
defer bh.Close()
err = bh.Exec(ctx, "begin;")
defer func() {
err = finishTxn(ctx, bh, err)
}()
if err != nil {
return err
}

// check create snapshot priv

// 1.only admin can create tenant level snapshot for himself
err = doCheckRole(ctx, ses)
if err != nil {
return err
}
// 2.only sys can create cluster level snapshot
tenantInfo := ses.GetTenantInfo()
currentAccount := tenantInfo.GetTenant()
snapshotLevel = stmt.Obeject.SLevel.Level
if snapshotLevel == tree.SNAPSHOTLEVELCLUSTER && currentAccount != sysAccountName {
return moerr.NewInternalError(ctx, "only sys tenant can create cluster level snapshot")
}
// 3.only sys can create tenant level snapshot for other tenant
if snapshotLevel == tree.SNAPSHOTLEVELACCOUNT {
snapshotForAccount = string(stmt.Obeject.ObjName)
if currentAccount != sysAccountName && currentAccount != snapshotForAccount {
return moerr.NewInternalError(ctx, "only sys tenant can create tenant level snapshot for other tenant")
}
}

// check snapshot exists or not
snapshotName = string(stmt.Name)
snapshotExist, err = checkSnapShotExistOrNot(ctx, bh, snapshotName)
if err != nil {
return err
}
if snapshotExist {
if !stmt.IfNotExists {
return moerr.NewInternalError(ctx, "snapshot %s already exists", snapshotName)
} else {
return nil
}
} else {
// insert record to the system table

// 1. get snapshot id
newUUid, err := uuid.NewV7()
if err != nil {
return err
}
snapshotId = newUUid.String()

// 2. get snapshot ts
ts := ses.proc.TxnOperator.SnapshotTS()
snapshotTs = ts.String()

sql, err = getSqlForCreateSnapshot(ctx, snapshotId, snapshotName, snapshotTs, snapshotLevel.String(), string(stmt.Obeject.ObjName))
if err != nil {
return err
}

err = bh.Exec(ctx, sql)
if err != nil {
return err
}
}

// insert record to the system table

return err
}

func doDropSnapshot(ctx context.Context, ses *Session, stmt *tree.DropSnapShot) (err error) {
var sql string
var stageExist bool
bh := ses.GetBackgroundExec(ctx)
defer bh.Close()

// check create stage priv
// only admin can drop snapshot for himself
err = doCheckRole(ctx, ses)
if err != nil {
return err
}

err = bh.Exec(ctx, "begin;")
defer func() {
err = finishTxn(ctx, bh, err)
}()
if err != nil {
return err
}

// check stage
stageExist, err = checkSnapShotExistOrNot(ctx, bh, string(stmt.Name))
if err != nil {
return err
}

if !stageExist {
if !stmt.IfExists {
return moerr.NewInternalError(ctx, "snapshot %s does not exist", string(stmt.Name))
} else {
// do nothing
return err
}
} else {
sql = getSqlForDropSnapshot(string(stmt.Name))
err = bh.Exec(ctx, sql)
if err != nil {
return err
}
}
return err
}

func checkSnapShotExistOrNot(ctx context.Context, bh BackgroundExec, snapshotName string) (bool, error) {
var sql string
var erArray []ExecResult
var err error
sql, err = getSqlForCheckSnapshot(ctx, snapshotName)
if err != nil {
return false, err
}
bh.ClearExecResultSet()
err = bh.Exec(ctx, sql)
if err != nil {
return false, err
}

erArray, err = getResultSet(ctx, bh)
if err != nil {
return false, err
}

if execResultArrayHasData(erArray) {
return true, nil
}
return false, nil
}
Loading
Loading