-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathy.go
148 lines (140 loc) · 4.34 KB
/
y.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
* Copyright (c) 2000-2018, 达梦数据库有限公司.
* All rights reserved.
*/
package dm
import (
"bytes"
"math/rand"
"sync"
"time"
"github.com/gotomicro/dmgo/util"
)
/**
* dm_svc.conf中配置的服务名对应的一组实例, 以及相关属性和状态信息
*
* 需求:
* 1. 连接均匀分布在各个节点上
* 2. loginMode,loginStatus匹配
* 3. 连接异常节点比较耗时,在DB列表中包含异常节点时异常连接尽量靠后,减少对建连接速度的影响
*
*
* DB 连接顺序:
* 1. well distribution,每次连接都从列表的下一个节点开始
* 2. 用DB sort值按从大到小排序,sort为一个四位数XXXX,个位--serverStatus,十位--serverMode,共 有三种模式,最优先的 *100, 次优先的*10
*/
type epGroup struct {
name string
epList []*ep
props *Properties
epStartPos int32 // wellDistribute 起始位置
lock sync.Mutex
}
func newEPGroup(name string, serverList []*ep) *epGroup {
g := new(epGroup)
g.name = name
g.epList = serverList
if serverList == nil || len(serverList) == 0 {
g.epStartPos = -1
} else {
// 保证进程间均衡,起始位置采用随机值
g.epStartPos = rand.Int31n(int32(len(serverList))) - 1
}
return g
}
func (g *epGroup) connect(connector *Connector) (*Connection, error) {
var dbSelector = g.getEPSelector(connector)
var ex error = nil
// 如果配置了loginMode的主、备等优先策略,而未找到最高优先级的节点时持续循环switchtimes次,如果最终还是没有找到最高优先级则选择次优先级的
// 如果只有一个节点,一轮即可决定是否连接;多个节点时保证switchTimes轮尝试,最后一轮决定用哪个节点(由于节点已经按照模式优先级排序,最后一轮理论上就是连第一个节点)
var cycleCount int32
if len(g.epList) == 1 {
cycleCount = 1
} else {
cycleCount = connector.switchTimes + 1
}
for i := int32(0); i < cycleCount; i++ {
// 循环了一遍,如果没有符合要求的, 重新排序, 再尝试连接
conn, err := g.traverseServerList(connector, dbSelector, i == 0, i == cycleCount-1)
if err != nil {
ex = err
time.Sleep(time.Duration(connector.switchInterval) * time.Millisecond)
continue
}
return conn, nil
}
return nil, ex
}
func (g *epGroup) getEPSelector(connector *Connector) *epSelector {
if connector.epSelector == TYPE_HEAD_FIRST {
return newEPSelector(g.epList)
} else {
serverCount := int32(len(g.epList))
sortEPs := make([]*ep, serverCount)
g.lock.Lock()
defer g.lock.Unlock()
g.epStartPos = (g.epStartPos + 1) % serverCount
for i := int32(0); i < serverCount; i++ {
sortEPs[i] = g.epList[(i+g.epStartPos)%serverCount]
}
return newEPSelector(sortEPs)
}
}
/**
* 从指定编号开始,遍历一遍服务名中的ip列表,只连接指定类型(主机或备机)的ip
* @param servers
* @param checkTime
*
* @exception
* DBError.ECJDBC_INVALID_SERVER_MODE 有站点的模式不匹配
* DBError.ECJDBC_COMMUNITION_ERROR 所有站点都连不上
*/
func (g *epGroup) traverseServerList(connector *Connector, epSelector *epSelector, first bool, last bool) (*Connection, error) {
epList := epSelector.sortDBList(first)
errorMsg := bytes.NewBufferString("")
var ex error = nil // 第一个错误
for _, server := range epList {
conn, err := server.connect(connector)
if err != nil {
if ex == nil {
ex = err
}
errorMsg.WriteString("[")
errorMsg.WriteString(server.String())
errorMsg.WriteString("]")
errorMsg.WriteString(err.Error())
errorMsg.WriteString(util.StringUtil.LineSeparator())
continue
}
valid, err := epSelector.checkServerMode(conn, last)
if err != nil {
if ex == nil {
ex = err
}
errorMsg.WriteString("[")
errorMsg.WriteString(server.String())
errorMsg.WriteString("]")
errorMsg.WriteString(err.Error())
errorMsg.WriteString(util.StringUtil.LineSeparator())
continue
}
if !valid {
conn.close()
err = ECGO_INVALID_SERVER_MODE.throw()
if ex == nil {
ex = err
}
errorMsg.WriteString("[")
errorMsg.WriteString(server.String())
errorMsg.WriteString("]")
errorMsg.WriteString(err.Error())
errorMsg.WriteString(util.StringUtil.LineSeparator())
continue
}
return conn, nil
}
if ex != nil {
return nil, ex
}
return nil, ECGO_COMMUNITION_ERROR.addDetail(errorMsg.String()).throw()
}