Skip to content

Commit

Permalink
hello world (#2)
Browse files Browse the repository at this point in the history
* topsql: make topsql enable only be controlled by pub/sub sink (pingcap#31209)

* ddl: support batch create table  (pingcap#28763)

* executor: fix data race in IndexMergeReaderExec (pingcap#31230)

close pingcap#31229

* server: filter the EOF error for normal closed at handshake  (pingcap#31081)

close pingcap#31063

* expression: change date add function return type (pingcap#28133)

close pingcap#27573

* support create interval partition

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* support create interval partition (support int/timestamp partition key)

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* parser: support alter table partitions move engine statement

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* support ddl operation

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* support interval partition manager

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* support interval partition manager handle job framwork

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* support auto create interval partition when insert meet no partition suitable error

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix bug

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix cancel job and load old job then continue to do

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* make partition readonly work(not allow to insert/update/delete)

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* add begin,end time in tables

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* tiny fix for auto create interval partition in concurrent case

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* init

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* init

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* todo: remove flag

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* fix dumpling

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* remove data in aws s3 when drop/truncate table/partition

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* make hello world work

Signed-off-by: crazycs520 <crazycs520@gmail.com>

* remove debug info

Signed-off-by: crazycs520 <crazycs520@gmail.com>

Co-authored-by: xhe <xw897002528@gmail.com>
Co-authored-by: guo-shaoge <shaoge1994@163.com>
Co-authored-by: knull-cn <hu__haifeng@163.com>
Co-authored-by: Meng Xin <tregoldmeng@gmail.com>
  • Loading branch information
5 people authored Jan 5, 2022
1 parent b0571f8 commit 5ab74c9
Show file tree
Hide file tree
Showing 63 changed files with 9,877 additions and 7,171 deletions.
4 changes: 2 additions & 2 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo)
if len(schema.Charset) == 0 {
schema.Charset = mysql.DefaultCharset
}
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore, true)
return d.CreateSchemaWithInfo(gs.se, schema, ddl.OnExistIgnore)
}

// CreateTable implements glue.Session.
Expand All @@ -143,7 +143,7 @@ func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, tabl
newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...)
table.Partition = &newPartition
}
return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore, true)
return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore)
}

// Close implements glue.Session.
Expand Down
16 changes: 8 additions & 8 deletions cmd/explaintest/r/tpch.result
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ Sort 2.94 root tpch.lineitem.l_returnflag, tpch.lineitem.l_linestatus
└─HashAgg 2.94 root group by:tpch.lineitem.l_linestatus, tpch.lineitem.l_returnflag, funcs:sum(Column#26)->Column#18, funcs:sum(Column#27)->Column#19, funcs:sum(Column#28)->Column#20, funcs:sum(Column#29)->Column#21, funcs:avg(Column#30, Column#31)->Column#22, funcs:avg(Column#32, Column#33)->Column#23, funcs:avg(Column#34, Column#35)->Column#24, funcs:count(Column#36)->Column#25, funcs:firstrow(tpch.lineitem.l_returnflag)->tpch.lineitem.l_returnflag, funcs:firstrow(tpch.lineitem.l_linestatus)->tpch.lineitem.l_linestatus
└─TableReader 2.94 root data:HashAgg
└─HashAgg 2.94 cop[tikv] group by:tpch.lineitem.l_linestatus, tpch.lineitem.l_returnflag, funcs:sum(tpch.lineitem.l_quantity)->Column#26, funcs:sum(tpch.lineitem.l_extendedprice)->Column#27, funcs:sum(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount)))->Column#28, funcs:sum(mul(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount)), plus(1, tpch.lineitem.l_tax)))->Column#29, funcs:count(tpch.lineitem.l_quantity)->Column#30, funcs:sum(tpch.lineitem.l_quantity)->Column#31, funcs:count(tpch.lineitem.l_extendedprice)->Column#32, funcs:sum(tpch.lineitem.l_extendedprice)->Column#33, funcs:count(tpch.lineitem.l_discount)->Column#34, funcs:sum(tpch.lineitem.l_discount)->Column#35, funcs:count(1)->Column#36
└─Selection 293795345.00 cop[tikv] le(tpch.lineitem.l_shipdate, 1998-08-15)
└─Selection 293795345.00 cop[tikv] le(tpch.lineitem.l_shipdate, 1998-08-15 00:00:00.000000)
└─TableFullScan 300005811.00 cop[tikv] table:lineitem keep order:false
/*
Q2 Minimum Cost Supplier Query
Expand Down Expand Up @@ -300,7 +300,7 @@ Sort 1.00 root tpch.orders.o_orderpriority
└─HashAgg 1.00 root group by:tpch.orders.o_orderpriority, funcs:count(1)->Column#27, funcs:firstrow(tpch.orders.o_orderpriority)->tpch.orders.o_orderpriority
└─IndexHashJoin 2340750.00 root semi join, inner:IndexLookUp, outer key:tpch.orders.o_orderkey, inner key:tpch.lineitem.l_orderkey, equal cond:eq(tpch.orders.o_orderkey, tpch.lineitem.l_orderkey)
├─TableReader(Build) 2925937.50 root data:Selection
│ └─Selection 2925937.50 cop[tikv] ge(tpch.orders.o_orderdate, 1995-01-01 00:00:00.000000), lt(tpch.orders.o_orderdate, 1995-04-01)
│ └─Selection 2925937.50 cop[tikv] ge(tpch.orders.o_orderdate, 1995-01-01 00:00:00.000000), lt(tpch.orders.o_orderdate, 1995-04-01 00:00:00.000000)
│ └─TableFullScan 75000000.00 cop[tikv] table:orders keep order:false
└─IndexLookUp(Probe) 4.05 root
├─IndexRangeScan(Build) 5.06 cop[tikv] table:lineitem, index:PRIMARY(L_ORDERKEY, L_LINENUMBER) range: decided by [eq(tpch.lineitem.l_orderkey, tpch.orders.o_orderkey)], keep order:false
Expand Down Expand Up @@ -353,7 +353,7 @@ Sort 5.00 root Column#49:desc
│ └─TableFullScan 7500000.00 cop[tikv] table:customer keep order:false
└─HashJoin(Probe) 11822812.50 root inner join, equal:[eq(tpch.lineitem.l_orderkey, tpch.orders.o_orderkey)]
├─TableReader(Build) 11822812.50 root data:Selection
│ └─Selection 11822812.50 cop[tikv] ge(tpch.orders.o_orderdate, 1994-01-01 00:00:00.000000), lt(tpch.orders.o_orderdate, 1995-01-01)
│ └─Selection 11822812.50 cop[tikv] ge(tpch.orders.o_orderdate, 1994-01-01 00:00:00.000000), lt(tpch.orders.o_orderdate, 1995-01-01 00:00:00.000000)
│ └─TableFullScan 75000000.00 cop[tikv] table:orders keep order:false
└─HashJoin(Probe) 61163763.01 root inner join, equal:[eq(tpch.supplier.s_suppkey, tpch.lineitem.l_suppkey)]
├─HashJoin(Build) 100000.00 root inner join, equal:[eq(tpch.nation.n_nationkey, tpch.supplier.s_nationkey)]
Expand Down Expand Up @@ -392,7 +392,7 @@ id estRows task access object operator info
StreamAgg 1.00 root funcs:sum(Column#20)->Column#18
└─TableReader 1.00 root data:StreamAgg
└─StreamAgg 1.00 cop[tikv] funcs:sum(mul(tpch.lineitem.l_extendedprice, tpch.lineitem.l_discount))->Column#20
└─Selection 3713857.91 cop[tikv] ge(tpch.lineitem.l_discount, 0.05), ge(tpch.lineitem.l_shipdate, 1994-01-01 00:00:00.000000), le(tpch.lineitem.l_discount, 0.07), lt(tpch.lineitem.l_quantity, 24), lt(tpch.lineitem.l_shipdate, 1995-01-01)
└─Selection 3713857.91 cop[tikv] ge(tpch.lineitem.l_discount, 0.05), ge(tpch.lineitem.l_shipdate, 1994-01-01 00:00:00.000000), le(tpch.lineitem.l_discount, 0.07), lt(tpch.lineitem.l_quantity, 24), lt(tpch.lineitem.l_shipdate, 1995-01-01 00:00:00.000000)
└─TableFullScan 300005811.00 cop[tikv] table:lineitem keep order:false
/*
Q7 Volume Shipping Query
Expand Down Expand Up @@ -669,7 +669,7 @@ Projection 20.00 root tpch.customer.c_custkey, tpch.customer.c_name, Column#39,
└─IndexHashJoin 12222016.17 root inner join, inner:IndexLookUp, outer key:tpch.orders.o_orderkey, inner key:tpch.lineitem.l_orderkey, equal cond:eq(tpch.orders.o_orderkey, tpch.lineitem.l_orderkey)
├─HashJoin(Build) 3017307.69 root inner join, equal:[eq(tpch.customer.c_custkey, tpch.orders.o_custkey)]
│ ├─TableReader(Build) 3017307.69 root data:Selection
│ │ └─Selection 3017307.69 cop[tikv] ge(tpch.orders.o_orderdate, 1993-08-01 00:00:00.000000), lt(tpch.orders.o_orderdate, 1993-11-01)
│ │ └─Selection 3017307.69 cop[tikv] ge(tpch.orders.o_orderdate, 1993-08-01 00:00:00.000000), lt(tpch.orders.o_orderdate, 1993-11-01 00:00:00.000000)
│ │ └─TableFullScan 75000000.00 cop[tikv] table:orders keep order:false
│ └─HashJoin(Probe) 7500000.00 root inner join, equal:[eq(tpch.nation.n_nationkey, tpch.customer.c_nationkey)]
│ ├─TableReader(Build) 25.00 root data:TableFullScan
Expand Down Expand Up @@ -777,7 +777,7 @@ Sort 1.00 root tpch.lineitem.l_shipmode
└─Projection 10023369.01 root tpch.orders.o_orderpriority, tpch.lineitem.l_shipmode
└─IndexJoin 10023369.01 root inner join, inner:TableReader, outer key:tpch.lineitem.l_orderkey, inner key:tpch.orders.o_orderkey, equal cond:eq(tpch.lineitem.l_orderkey, tpch.orders.o_orderkey)
├─TableReader(Build) 10023369.01 root data:Selection
│ └─Selection 10023369.01 cop[tikv] ge(tpch.lineitem.l_receiptdate, 1997-01-01 00:00:00.000000), in(tpch.lineitem.l_shipmode, "RAIL", "FOB"), lt(tpch.lineitem.l_commitdate, tpch.lineitem.l_receiptdate), lt(tpch.lineitem.l_receiptdate, 1998-01-01), lt(tpch.lineitem.l_shipdate, tpch.lineitem.l_commitdate)
│ └─Selection 10023369.01 cop[tikv] ge(tpch.lineitem.l_receiptdate, 1997-01-01 00:00:00.000000), in(tpch.lineitem.l_shipmode, "RAIL", "FOB"), lt(tpch.lineitem.l_commitdate, tpch.lineitem.l_receiptdate), lt(tpch.lineitem.l_receiptdate, 1998-01-01 00:00:00.000000), lt(tpch.lineitem.l_shipdate, tpch.lineitem.l_commitdate)
│ └─TableFullScan 300005811.00 cop[tikv] table:lineitem keep order:false
└─TableReader(Probe) 1.00 root data:TableRangeScan
└─TableRangeScan 1.00 cop[tikv] table:orders range: decided by [tpch.lineitem.l_orderkey], keep order:false
Expand Down Expand Up @@ -848,7 +848,7 @@ Projection 1.00 root div(mul(100.00, Column#27), Column#28)->Column#29
└─Projection 4121984.49 root case(like(tpch.part.p_type, PROMO%, 92), mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount)), 0)->Column#31, mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount))->Column#32
└─IndexJoin 4121984.49 root inner join, inner:TableReader, outer key:tpch.lineitem.l_partkey, inner key:tpch.part.p_partkey, equal cond:eq(tpch.lineitem.l_partkey, tpch.part.p_partkey)
├─TableReader(Build) 4121984.49 root data:Selection
│ └─Selection 4121984.49 cop[tikv] ge(tpch.lineitem.l_shipdate, 1996-12-01 00:00:00.000000), lt(tpch.lineitem.l_shipdate, 1997-01-01)
│ └─Selection 4121984.49 cop[tikv] ge(tpch.lineitem.l_shipdate, 1996-12-01 00:00:00.000000), lt(tpch.lineitem.l_shipdate, 1997-01-01 00:00:00.000000)
│ └─TableFullScan 300005811.00 cop[tikv] table:lineitem keep order:false
└─TableReader(Probe) 1.00 root data:TableRangeScan
└─TableRangeScan 1.00 cop[tikv] table:part range: decided by [tpch.lineitem.l_partkey], keep order:false
Expand Down Expand Up @@ -1170,7 +1170,7 @@ Sort 20000.00 root tpch.supplier.s_name
│ ├─IndexRangeScan(Build) 4.02 cop[tikv] table:partsupp, index:PRIMARY(PS_PARTKEY, PS_SUPPKEY) range: decided by [eq(tpch.partsupp.ps_partkey, tpch.part.p_partkey)], keep order:false
│ └─TableRowIDScan(Probe) 4.02 cop[tikv] table:partsupp keep order:false
└─TableReader(Probe) 44189356.65 root data:Selection
└─Selection 44189356.65 cop[tikv] ge(tpch.lineitem.l_shipdate, 1993-01-01 00:00:00.000000), lt(tpch.lineitem.l_shipdate, 1994-01-01)
└─Selection 44189356.65 cop[tikv] ge(tpch.lineitem.l_shipdate, 1993-01-01 00:00:00.000000), lt(tpch.lineitem.l_shipdate, 1994-01-01 00:00:00.000000)
└─TableFullScan 300005811.00 cop[tikv] table:lineitem keep order:false
/*
Q21 Suppliers Who Kept Orders Waiting Query
Expand Down
38 changes: 38 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7641,3 +7641,41 @@ func (s *testDBSuite8) TestCreateTextAdjustLen(c *C) {
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustExec("drop table if exists t")
}

func (s *testDBSuite2) TestCreateTables(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tables_1")
tk.MustExec("drop table if exists tables_2")
tk.MustExec("drop table if exists tables_3")

d := s.dom.DDL()
infos := []*model.TableInfo{}
infos = append(infos, &model.TableInfo{
Name: model.NewCIStr("tables_1"),
})
infos = append(infos, &model.TableInfo{
Name: model.NewCIStr("tables_2"),
})
infos = append(infos, &model.TableInfo{
Name: model.NewCIStr("tables_3"),
})

// correct name
err := d.BatchCreateTableWithInfo(tk.Se, model.NewCIStr("test"), infos, ddl.OnExistError)
c.Check(err, IsNil)

tk.MustQuery("show tables like '%tables_%'").Check(testkit.Rows("tables_1", "tables_2", "tables_3"))
job := tk.MustQuery("admin show ddl jobs").Rows()[0]
c.Assert(job[1], Equals, "test")
c.Assert(job[2], Equals, "tables_1,tables_2,tables_3")
c.Assert(job[3], Equals, "create tables")
c.Assert(job[4], Equals, "public")
// FIXME: we must change column type to give multiple id
// c.Assert(job[6], Matches, "[^,]+,[^,]+,[^,]+")

// duplicated name
infos[1].Name = model.NewCIStr("tables_1")
err = d.BatchCreateTableWithInfo(tk.Se, model.NewCIStr("test"), infos, ddl.OnExistError)
c.Check(terror.ErrorEqual(err, infoschema.ErrTableExists), IsTrue)
}
24 changes: 11 additions & 13 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,35 +120,33 @@ type DDL interface {
CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
AlterTablePartitionsMoveEngine(ctx sessionctx.Context, stmt *ast.AlterTableMoveStmt) error
AlterTablePartitionMeta(ctx sessionctx.Context, dbInfo *model.DBInfo, tbInfo *model.TableInfo, info *AlterTablePartitionInfo) (err error)

// CreateSchemaWithInfo creates a database (schema) given its database info.
//
// If `tryRetainID` is true, this method will try to keep the database ID specified in
// the `info` rather than generating new ones. This is just a hint though, if the ID collides
// with an existing database a new ID will always be used.
//
// WARNING: the DDL owns the `info` after calling this function, and will modify its fields
// in-place. If you want to keep using `info`, please call Clone() first.
CreateSchemaWithInfo(
ctx sessionctx.Context,
info *model.DBInfo,
onExist OnExist,
tryRetainID bool) error
onExist OnExist) error

// CreateTableWithInfo creates a table, view or sequence given its table info.
//
// If `tryRetainID` is true, this method will try to keep the table ID specified in the `info`
// rather than generating new ones. This is just a hint though, if the ID collides with an
// existing table a new ID will always be used.
//
// WARNING: the DDL owns the `info` after calling this function, and will modify its fields
// in-place. If you want to keep using `info`, please call Clone() first.
CreateTableWithInfo(
ctx sessionctx.Context,
schema model.CIStr,
info *model.TableInfo,
onExist OnExist,
tryRetainID bool) error
onExist OnExist) error

// BatchCreateTableWithInfo is like CreateTableWithInfo, but can handle multiple tables.
BatchCreateTableWithInfo(ctx sessionctx.Context,
schema model.CIStr,
info []*model.TableInfo,
onExist OnExist) error

// Start campaigns the owner and starts workers.
// ctxPool is used for the worker's delRangeManager and creates sessions.
Expand Down Expand Up @@ -253,10 +251,10 @@ func asyncNotifyEvent(d *ddlCtx, e *util.Event) {
case d.ddlEventCh <- e:
return
default:
logutil.BgLogger().Warn("[ddl] fail to notify DDL event", zap.String("event", e.String()))
time.Sleep(time.Microsecond * 10)
}
}
logutil.BgLogger().Warn("[ddl] fail to notify DDL event", zap.String("event", e.String()))
}
}

Expand Down
Loading

0 comments on commit 5ab74c9

Please sign in to comment.