Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
systay committed Sep 13, 2024
1 parent 4e5ff21 commit 83d753f
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 45 deletions.
26 changes: 26 additions & 0 deletions go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,32 @@ func TestAggregateTypes(t *testing.T) {
})
}

// TestTraceAggregateTypes runs `vexplain trace` over a set of aggregation
// queries against aggr_test and logs the trace returned for each of them.
// It does not assert on the trace contents; it only requires that every
// statement executes and returns at least one row with one column.
func TestTraceAggregateTypes(t *testing.T) {
	mcmp, closer := start(t)
	defer closer()

	// test executes q under `vexplain trace` on the vtgate connection and logs
	// the first cell of the result.
	test := func(q string) {
		t.Helper()
		res := utils.Exec(t, mcmp.VtConn, "vexplain trace "+q)
		// Guard the index below: an empty result should fail the test with a
		// readable message rather than panic with an index-out-of-range.
		if len(res.Rows) == 0 || len(res.Rows[0]) == 0 {
			t.Fatalf("vexplain trace returned no rows for query: %s", q)
		}
		t.Logf("Query: %s", q)
		t.Logf("Result: %s", res.Rows[0][0].ToString())
	}

	mcmp.Exec("insert into aggr_test(id, val1, val2) values(1,'a',1), (2,'A',1), (3,'b',1), (4,'c',3), (5,'c',4)")
	mcmp.Exec("insert into aggr_test(id, val1, val2) values(6,'d',null), (7,'e',null), (8,'E',1)")

	// NOTE(review): the first query is listed twice in a row — confirm whether
	// the repeat is intentional or a copy-paste leftover.
	test("select val1, count(distinct val2), count(*) from aggr_test group by val1")
	test("select val1, count(distinct val2), count(*) from aggr_test group by val1")
	test("select val1, sum(distinct val2), sum(val2) from aggr_test group by val1")
	test("select val1, count(distinct val2) k, count(*) from aggr_test group by val1 order by k desc, val1")
	test("select val1, count(distinct val2) k, count(*) from aggr_test group by val1 order by k desc, val1 limit 4")

	test("select ascii(val1) as a, count(*) from aggr_test group by a")
	test("select ascii(val1) as a, count(*) from aggr_test group by a order by a")
	test("select ascii(val1) as a, count(*) from aggr_test group by a order by 2, a")

	test("select val1 as a, count(*) from aggr_test group by a")
	test("select val1 as a, count(*) from aggr_test group by a order by a")
	test("select val1 as a, count(*) from aggr_test group by a order by 2, a")
	test("select sum(val1) from aggr_test")
}

func TestGroupBy(t *testing.T) {
mcmp, closer := start(t)
defer closer()
Expand Down
256 changes: 255 additions & 1 deletion go/test/endtoend/vtgate/queries/tpch/tpch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ limitations under the License.
package union

import (
"fmt"
"golang.org/x/exp/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
)

func start(t *testing.T) (utils.MySQLCompare, func()) {
func start(t testing.TB) (utils.MySQLCompare, func()) {
mcmp, err := utils.NewMySQLCompare(t, vtParams, mysqlParams)
require.NoError(t, err)

Expand All @@ -40,6 +44,33 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {

deleteAll()

err = utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, "region", `R_COMMENT`)
require.NoError(t, err)

// Set the size parameter here. Increase for more data.
size := 100

// Seed the random number generator
rand.Seed(12345)

// Generate dynamic data
regions := generateRegions()
nations := generateNations(size, regions)
suppliers := generateSuppliers(size, nations)
parts := generateParts(size)
customers := generateCustomers(size, nations)
orders := generateOrders(size, customers)
lineitems := generateLineItems(orders, parts, suppliers)

// Execute inserts
mcmp.Exec(buildInsertQuery("region", regions))
mcmp.Exec(buildInsertQuery("nation", nations))
mcmp.Exec(buildInsertQuery("supplier", suppliers))
mcmp.Exec(buildInsertQuery("part", parts))
mcmp.Exec(buildInsertQuery("customer", customers))
mcmp.Exec(buildInsertQuery("orders", orders))
mcmp.Exec(buildInsertQuery("lineitem", lineitems))

return mcmp, func() {
deleteAll()
mcmp.Close()
Expand Down Expand Up @@ -234,3 +265,226 @@ from (select l.l_extendedprice * o.o_totalprice
})
}
}

// BenchmarkQuery measures the cost of running q under `vexplain trace`
// through vtgate.
func BenchmarkQuery(b *testing.B) {
	mcmp, closer := start(b)
	defer closer()

	// start generates and inserts the whole data set; exclude that setup from
	// the measurement so only the query executions are timed.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = utils.Exec(b, mcmp.VtConn, "vexplain trace "+q)
	}
	// Stop the clock before the deferred cleanup runs.
	b.StopTimer()
}

// q is the query executed under `vexplain trace` by BenchmarkQuery and
// TestVexplain. It joins orders to lineitem, keeps orders dated between
// 1995-01-01 and 1996-12-31, and reports the distinct order count and the
// discounted revenue per order priority and order year.
// NOTE(review): the join predicate is an inequality (o_orderkey > l_orderkey),
// not an equality — presumably deliberate to produce a large join; confirm.
const q = `SELECT
o.o_orderpriority,
EXTRACT(YEAR FROM o.o_orderdate) AS order_year,
COUNT(DISTINCT o.o_orderkey) AS order_count,
SUM(l.l_extendedprice * (1 - l.l_discount)) AS total_revenue
FROM
orders o
JOIN
lineitem l ON o.o_orderkey > l.l_orderkey
WHERE
o.o_orderdate BETWEEN '1995-01-01' AND '1996-12-31'
GROUP BY
o.o_orderpriority,
EXTRACT(YEAR FROM o.o_orderdate)
ORDER BY
o.o_orderpriority,
order_year`

// TestVexplain runs q under `vexplain trace` and logs the trace that vtgate
// returns. It does not assert on the trace contents; it only requires that
// the statement executes and returns at least one row with one column.
func TestVexplain(t *testing.T) {
	mcmp, closer := start(t)
	defer closer()

	// Presumably blocks until vtgate knows about region.R_COMMENT before the
	// query is sent — TODO confirm against utils.WaitForColumn.
	err := utils.WaitForColumn(t, clusterInstance.VtgateProcess, keyspaceName, "region", `R_COMMENT`)
	require.NoError(t, err)

	res := utils.Exec(t, mcmp.VtConn, "vexplain trace "+q)
	// Fail with a readable message instead of panicking on the index below
	// when the result comes back empty.
	require.NotEmpty(t, res.Rows, "vexplain trace returned no rows")
	require.NotEmpty(t, res.Rows[0], "vexplain trace returned a row without columns")

	t.Logf("Query: %s", q)
	t.Logf("Result: %s", res.Rows[0][0].ToString())
}

// generateRegions returns the fixed set of five region rows. Each row holds
// the region key (1-based int), the region name and a comment, in that order.
func generateRegions() [][]interface{} {
	// Name/comment pairs; the region key is the 1-based position in this list.
	entries := [...][2]string{
		{"AMERICA", "New World"},
		{"ASIA", "Eastern Asia"},
		{"EUROPE", "Old World"},
		{"AFRICA", "Dark Continent"},
		{"AUSTRALIA", "Down Under"},
	}

	rows := make([][]interface{}, 0, len(entries))
	for idx, entry := range entries {
		rows = append(rows, []interface{}{idx + 1, entry[0], entry[1]})
	}
	return rows
}

// generateNations builds nation rows by walking the regions size/5 times and
// emitting one nation per region on every pass. Each row holds the nation key
// (sequential, starting at 1), the nation name, the region key and a comment.
// The very first nation of region 1 is named "BRAZIL"; all others are named
// "Nation_<regionKey>_<pass>". The first element of every region row must be
// an int.
func generateNations(size int, regions [][]interface{}) [][]interface{} {
	var rows [][]interface{}
	passes := size / 5
	for pass := 0; pass < passes; pass++ {
		for r := range regions {
			regionID := regions[r][0].(int)

			label := "BRAZIL"
			if regionID != 1 || pass != 0 {
				label = fmt.Sprintf("Nation_%d_%d", regionID, pass)
			}

			// The key is the number of rows emitted so far plus one.
			rows = append(rows, []interface{}{len(rows) + 1, label, regionID, "Comment for " + label})
		}
	}
	return rows
}

// generateSuppliers returns size supplier rows. Each row holds the supplier
// key (1..size), name, address, the key of a randomly chosen nation, a phone
// number, an account balance and a comment. nations must be non-empty when
// size is positive, because a nation is drawn at random for every supplier.
func generateSuppliers(size int, nations [][]interface{}) [][]interface{} {
	var rows [][]interface{}
	for id := 1; id <= size; id++ {
		// Random draws happen in a fixed order: nation, phone prefix, balance.
		nationKey := nations[rand.Intn(len(nations))][0]
		phone := fmt.Sprintf("%d-123-4567", rand.Intn(100))
		balance := float64(rand.Intn(10000)) + rand.Float64()

		rows = append(rows, []interface{}{
			id,
			fmt.Sprintf("Supplier_%d", id),
			fmt.Sprintf("Address_%d", id),
			nationKey,
			phone,
			balance,
			fmt.Sprintf("Comment for Supplier_%d", id),
		})
	}
	return rows
}

// generateParts returns size part rows. Each row holds the part key
// (1..size), name, manufacturer, brand, type, size (1..50), container, retail
// price and a comment; everything except the key, name and comment is drawn
// at random.
func generateParts(size int) [][]interface{} {
	partTypes := [...]string{"ECONOMY ANODIZED STEEL", "LARGE BRUSHED BRASS", "STANDARD POLISHED COPPER", "SMALL PLATED STEEL", "MEDIUM BURNISHED TIN"}
	containerSizes := [...]string{"SM", "LG", "MED", "JUMBO", "WRAP"}

	var rows [][]interface{}
	for id := 1; id <= size; id++ {
		// Random draws happen in column order.
		manufacturer := fmt.Sprintf("Manufacturer_%d", rand.Intn(5)+1)
		brand := fmt.Sprintf("Brand_%d", rand.Intn(5)+1)
		partType := partTypes[rand.Intn(len(partTypes))]
		partSize := rand.Intn(50) + 1
		container := containerSizes[rand.Intn(5)] + " BOX"
		price := float64(rand.Intn(1000)) + rand.Float64()

		rows = append(rows, []interface{}{
			id,
			fmt.Sprintf("Part_%d", id),
			manufacturer,
			brand,
			partType,
			partSize,
			container,
			price,
			fmt.Sprintf("Comment for Part_%d", id),
		})
	}
	return rows
}

// generateCustomers returns size customer rows. Each row holds the customer
// key (1..size), name, address, the key of a randomly chosen nation, a phone
// number, an account balance, a market segment and a comment. nations must be
// non-empty when size is positive, because a nation is drawn at random for
// every customer.
func generateCustomers(size int, nations [][]interface{}) [][]interface{} {
	segments := [...]string{"AUTOMOBILE", "BUILDING", "FURNITURE", "MACHINERY", "HOUSEHOLD"}

	var rows [][]interface{}
	for id := 1; id <= size; id++ {
		// Random draws happen in a fixed order: nation, phone prefix,
		// balance, market segment.
		nationKey := nations[rand.Intn(len(nations))][0]
		phone := fmt.Sprintf("%d-987-6543", rand.Intn(100))
		balance := float64(rand.Intn(10000)) + rand.Float64()
		segment := segments[rand.Intn(5)]

		rows = append(rows, []interface{}{
			id,
			fmt.Sprintf("Customer_%d", id),
			fmt.Sprintf("Address_%d", id),
			nationKey,
			phone,
			balance,
			segment,
			fmt.Sprintf("Comment for Customer_%d", id),
		})
	}
	return rows
}

// generateOrders returns size*10 order rows. Each row holds the order key
// (1..size*10), the key of a randomly chosen customer, an order status, a
// total price, an order date formatted as YYYY-MM-DD, an order priority, a
// clerk, a ship priority and a comment. The order date is drawn from the
// half-open interval [1995-01-01, 1996-12-31). customers must be non-empty
// when size is positive, because a customer is drawn at random for every
// order.
func generateOrders(size int, customers [][]interface{}) [][]interface{} {
	windowStart := time.Date(1995, 1, 1, 0, 0, 0, 0, time.UTC)
	windowEnd := time.Date(1996, 12, 31, 0, 0, 0, 0, time.UTC)
	window := int64(windowEnd.Sub(windowStart))
	statuses := [...]string{"O", "F", "P"}

	var rows [][]interface{}
	for id := 1; id <= size*10; id++ {
		// Random draws happen in a fixed order: customer, date offset,
		// status, total price, priority, clerk, ship priority.
		customerKey := customers[rand.Intn(len(customers))][0]
		placedOn := windowStart.Add(time.Duration(rand.Int63n(window)))
		status := statuses[rand.Intn(3)]
		totalPrice := float64(rand.Intn(100000)) + rand.Float64()
		priority := fmt.Sprintf("%d-URGENT", rand.Intn(5)+1)
		clerk := fmt.Sprintf("Clerk#%05d", rand.Intn(1000))
		shipPriority := rand.Intn(5)

		rows = append(rows, []interface{}{
			id,
			customerKey,
			status,
			totalPrice,
			placedOn.Format("2006-01-02"),
			priority,
			clerk,
			shipPriority,
			fmt.Sprintf("Comment for Order_%d", id),
		})
	}
	return rows
}

// generateLineItems returns between one and seven line item rows for every
// order, each referencing a randomly chosen part and supplier.
//
// Every order row must carry its key at index 0 and its order date, formatted
// as YYYY-MM-DD, as a string at index 4. parts and suppliers must be non-empty
// when orders is non-empty. The function panics if an order date cannot be
// parsed, since that means the generated order data is malformed.
//
// Each returned row holds, in order: order key, part key, supplier key, line
// number (1-based within the order), quantity, extended price, discount, tax,
// return flag, line status, ship date, commit date, receipt date, ship
// instructions, ship mode and a comment.
func generateLineItems(orders [][]interface{}, parts [][]interface{}, suppliers [][]interface{}) [][]interface{} {
	const day = 24 * time.Hour

	var lineItems [][]interface{}
	for _, order := range orders {
		// The order date is the same for every line of the order, so parse it
		// once, and surface malformed input instead of silently falling back
		// to the zero time.
		orderDate, err := time.Parse("2006-01-02", order[4].(string))
		if err != nil {
			panic(fmt.Sprintf("generateLineItems: order %v has a malformed order date: %v", order[0], err))
		}

		// Draw the number of lines once per order. Re-drawing the bound on
		// every loop iteration skews the count towards small values and
		// consumes extra random numbers.
		itemCount := rand.Intn(7) + 1
		for j := 0; j < itemCount; j++ {
			part := parts[rand.Intn(len(parts))]
			supplier := suppliers[rand.Intn(len(suppliers))]

			// Ship and commit dates fall within 30 days of the order date; the
			// receipt date falls within 30 days of the ship date.
			shipDate := orderDate.Add(time.Duration(rand.Intn(30)) * day)
			commitDate := orderDate.Add(time.Duration(rand.Intn(30)) * day)
			receiptDate := shipDate.Add(time.Duration(rand.Intn(30)) * day)

			lineItems = append(lineItems, []interface{}{
				order[0],
				part[0],
				supplier[0],
				j + 1,
				rand.Intn(50) + 1,
				float64(rand.Intn(100000)) + rand.Float64(),
				rand.Float64(),
				rand.Float64(),
				[]string{"N", "R", "A"}[rand.Intn(3)],
				[]string{"O", "F"}[rand.Intn(2)],
				shipDate.Format("2006-01-02"),
				commitDate.Format("2006-01-02"),
				receiptDate.Format("2006-01-02"),
				[]string{"DELIVER IN PERSON", "COLLECT COD", "NONE", "TAKE BACK RETURN"}[rand.Intn(4)],
				[]string{"TRUCK", "MAIL", "RAIL", "AIR", "SHIP"}[rand.Intn(5)],
				fmt.Sprintf("Comment for Lineitem_%d_%d", order[0], j+1),
			})
		}
	}
	return lineItems
}

// buildInsertQuery renders a single multi-row INSERT statement for tableName
// from data, using the column list returned by getColumns and one
// parenthesised tuple per row as produced by formatRow. Tuples are separated
// by a comma and a newline. It returns an empty string when data is empty.
func buildInsertQuery(tableName string, data [][]interface{}) string {
	if len(data) == 0 {
		return ""
	}

	var stmt strings.Builder
	fmt.Fprintf(&stmt, "INSERT INTO %s (%s) VALUES ", tableName, strings.Join(getColumns(tableName), ", "))
	for idx, row := range data {
		if idx > 0 {
			stmt.WriteString(",\n")
		}
		stmt.WriteString(formatRow(row))
	}
	return stmt.String()
}

// getColumns returns the column names used when inserting into tableName, in
// the order the generated rows list their values. An unknown table name
// yields an empty, non-nil slice.
func getColumns(tableName string) []string {
	columnsByTable := map[string][]string{
		"region":   {"R_REGIONKEY", "R_NAME", "R_COMMENT"},
		"nation":   {"N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT"},
		"supplier": {"S_SUPPKEY", "S_NAME", "S_ADDRESS", "S_NATIONKEY", "S_PHONE", "S_ACCTBAL", "S_COMMENT"},
		"part":     {"P_PARTKEY", "P_NAME", "P_MFGR", "P_BRAND", "P_TYPE", "P_SIZE", "P_CONTAINER", "P_RETAILPRICE", "P_COMMENT"},
		"customer": {"C_CUSTKEY", "C_NAME", "C_ADDRESS", "C_NATIONKEY", "C_PHONE", "C_ACCTBAL", "C_MKTSEGMENT", "C_COMMENT"},
		"orders":   {"O_ORDERKEY", "O_CUSTKEY", "O_ORDERSTATUS", "O_TOTALPRICE", "O_ORDERDATE", "O_ORDERPRIORITY", "O_CLERK", "O_SHIPPRIORITY", "O_COMMENT"},
		"lineitem": {"L_ORDERKEY", "L_PARTKEY", "L_SUPPKEY", "L_LINENUMBER", "L_QUANTITY", "L_EXTENDEDPRICE", "L_DISCOUNT", "L_TAX", "L_RETURNFLAG", "L_LINESTATUS", "L_SHIPDATE", "L_COMMITDATE", "L_RECEIPTDATE", "L_SHIPINSTRUCT", "L_SHIPMODE", "L_COMMENT"},
	}

	if columns, known := columnsByTable[tableName]; known {
		return columns
	}
	return []string{}
}

// formatRow renders one row as a parenthesised, comma-separated SQL tuple.
//
// nil values become NULL, strings are wrapped in single quotes with embedded
// single quotes doubled, float64 values are printed with two decimals, and
// every other value is printed with its default formatting.
//
// NOTE(review): backslashes inside strings are not escaped; this is fine for
// the generated data in this file but not for arbitrary input — confirm before
// reusing with other data.
func formatRow(row []interface{}) string {
	values := make([]string, len(row))
	for i, v := range row {
		switch v := v.(type) {
		case nil:
			// The default formatting of nil is "<nil>", which is not valid SQL.
			values[i] = "NULL"
		case string:
			values[i] = "'" + strings.ReplaceAll(v, "'", "''") + "'"
		case float64:
			values[i] = fmt.Sprintf("%.2f", v)
		default:
			values[i] = fmt.Sprintf("%v", v)
		}
	}
	return "(" + strings.Join(values, ", ") + ")"
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (p *Plan) Stats() (execCount uint64, execTime time.Duration, shardQueries,
func (p *Plan) MarshalJSON() ([]byte, error) {
var instructions *PrimitiveDescription
if p.Instructions != nil {
description := PrimitiveToPlanDescription(p.Instructions)
description := PrimitiveToPlanDescription(p.Instructions, nil)
instructions = &description
}

Expand Down
Loading

0 comments on commit 83d753f

Please sign in to comment.