-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstate.go
321 lines (267 loc) · 8.37 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
package vv104
import (
"container/ring"
"context"
"sync"
"time"
)
// State bundles everything one connection endpoint (client or server, as
// selected by Config.Mode) needs at run time: configuration, the current
// state-machine state, the channels between the goroutines, and the
// bookkeeping for sequence numbers and timeouts.
type State struct {
	// Config is the active configuration. This file reads Mode, K, W, T3 and
	// InteractiveMode from it.
	Config Config

	// ConnState is the current state of connectionStateMachine.
	ConnState ConnState

	// Chans holds the channels used to hand commands and APDUs between
	// goroutines. Received and ToSend are recreated by Start for every
	// connection attempt.
	Chans AllChans

	// Objects is created by NewObjects in NewState and is used to resolve
	// object names when received APDUs are logged.
	Objects *Objects

	// Running is set to true when Start begins and back to false just before
	// Start returns, i.e. while connecting, connected or waiting to reconnect.
	Running bool

	// TcpConnected is not written in this file section.
	// NOTE(review): presumably maintained by the connection code to reflect
	// whether a TCP connection is currently established - confirm.
	TcpConnected bool

	// Ctx is the context of the current connection attempt. Start creates a
	// fresh one per attempt and waits for it to be cancelled.
	Ctx context.Context

	// Wg tracks the goroutines of the current connection attempt. Start waits
	// on it before reconnecting or returning.
	Wg sync.WaitGroup

	// Cancel cancels Ctx. NewState installs a no-op so it is always callable.
	Cancel context.CancelFunc

	// dt_act_sent notifies the state machine that a startdt_act or stopdt_act
	// was sent. The state machine resets it to 0 once it has acted on it.
	dt_act_sent UFormat

	// manualDisconnect ends the reconnect loop in Start when true. Start
	// resets it to false before returning.
	manualDisconnect bool

	// sendAck is created by newAck(Config.K) in Start.
	// NOTE(review): presumably tracks sent I-frames awaiting acknowledgement
	// by the peer - confirm against the callers of queueApdu/ackApdu.
	sendAck ack

	// recvAck is created by newAck(Config.K) in Start.
	// NOTE(review): presumably tracks received I-frames we still have to
	// acknowledge - confirm against the callers of queueApdu/ackApdu.
	recvAck ack

	// tickers holds the protocol timeout tickers.
	tickers tickers
}
// tickers groups the time.Ticker instances used for the protocol timeouts.
// None of them is created in this file section.
// NOTE(review): the names suggest the T1/T2/T3 timeouts of IEC 60870-5-104 -
// confirm where they are created.
type tickers struct {
	// t1ticker is not used in this file section.
	t1ticker *time.Ticker

	// t2tickerReceivedItems is not used in this file section.
	// NOTE(review): presumably the acknowledge timeout for received frames.
	t2tickerReceivedItems *time.Ticker

	// t2tickerSentItems is not used in this file section.
	// NOTE(review): presumably the acknowledge timeout for sent frames.
	t2tickerSentItems *time.Ticker

	// t3ticker is reset to Config.T3 seconds by connectionStateMachine every
	// time a real (non-internal) APDU is received.
	t3ticker *time.Ticker
}
// ConnState enumerates the states of the data-transfer state machine that
// connectionStateMachine runs. Its values are the STOPPED ... START_CONN
// constants.
type ConnState int
// AllChans collects the channels that connect the goroutines of a State.
type AllChans struct {
	// CommandsFromStdin carries textual commands such as "startdt_act". Start
	// creates it with a buffer of 30 when Config.InteractiveMode is set, and a
	// client-mode state machine sends its initial "startdt_act" through it.
	CommandsFromStdin chan string

	// Received delivers APDUs to connectionStateMachine. This includes
	// internal notifications that use special type ids and are never
	// transmitted. Start creates it with capacity Config.W.
	Received chan Apdu

	// ToSend queues APDUs for transmission. Start creates it with capacity
	// Config.K.
	ToSend chan Apdu
}
// ack keeps the acknowledgement bookkeeping for one direction of I-format
// traffic: the running sequence number, how many frames are still
// unacknowledged, and a ring with one entry per queued frame.
type ack struct {
	// seqNumber is the sequence number expected for the next queued frame.
	// queueApdu compares the frame's Ssn against it and then advances it with
	// incrementSeqNumber.
	seqNumber SeqNumber

	// openFrames counts queued frames that have not been acknowledged yet. It
	// is incremented by queueApdu and lowered by ackApdu.
	openFrames int

	// ring stores a seqNumberAndTimetag per queued frame. newAck sizes and
	// zero-fills it; queueApdu advances it by one and writes the newest frame
	// at the new position.
	ring *ring.Ring
}
// seqNumberAndTimetag is the value stored in every element of ack.ring.
type seqNumberAndTimetag struct {
	// seqNumber is the send sequence number (Ssn) of the queued frame.
	seqNumber SeqNumber

	// timetag is the time the frame was queued (time.Now() in queueApdu). It
	// is the zero time for slots that have never been used.
	timetag time.Time
}
// Values of ConnState. The numeric values follow declaration order (iota), so
// STOPPED is the zero value.
const (
	// STOPPED is the state connectionStateMachine starts in. A server leaves
	// it on a received StartDT act, a client once it has sent StartDT act.
	STOPPED ConnState = iota
	// STARTED is entered by a server after it confirmed StartDT and by a
	// client after it received StartDT con.
	STARTED
	// PENDING_UNCONFIRMED_STOPPED is only referenced in TODO comments of
	// connectionStateMachine and is not entered yet.
	PENDING_UNCONFIRMED_STOPPED
	// PENDING_STARTED means a client has sent StartDT act and is waiting for
	// StartDT con.
	PENDING_STARTED
	// PENDING_STOPPED means a client has sent StopDT act and is waiting for
	// StopDT con.
	PENDING_STOPPED
	// STOP_CONN is not used in this file section.
	STOP_CONN
	// START_CONN is not used in this file section.
	START_CONN
)
// NewState returns a State in its initial, disconnected condition.
//
// Only two fields need a non-zero value: Objects, which is created by
// NewObjects, and Cancel, which is a no-op function so that it can be called
// safely before Start has created a real context. Everything else is left at
// its zero value: empty Config, ConnState STOPPED, nil channels and context,
// empty ack trackers and tickers, and all flags false.
func NewState() State {
	return State{
		Objects: NewObjects(),
		Cancel:  func() {},
	}
}
// Start runs the connection life cycle and blocks until a manual disconnect
// was requested (state.manualDisconnect).
//
// For every connection attempt it creates fresh Received/ToSend channels, a
// new cancellable context and new acknowledgement trackers. It then launches
// the command evaluation and connection goroutines and waits until the context
// is cancelled and all goroutines registered in state.Wg have finished. Unless
// the disconnect was requested manually, it then reconnects after a short
// pause.
//
// Running is true for the whole duration of the call. manualDisconnect is
// reset to false before Start returns, so Start can be called again.
func (state *State) Start() {
	const (
		commandBufferSize = 30
		restartDelay      = 1500 * time.Millisecond
	)

	initLogger(state.Config)
	printConfig(state.Config)
	state.Running = true

	for !state.manualDisconnect {
		state.Chans.Received = make(chan Apdu, state.Config.W)
		state.Chans.ToSend = make(chan Apdu, state.Config.K)
		state.Ctx, state.Cancel = context.WithCancel(context.Background())
		state.sendAck = newAck(state.Config.K)
		state.recvAck = newAck(state.Config.K)

		if state.Config.InteractiveMode {
			state.Chans.CommandsFromStdin = make(chan string, commandBufferSize)
			go state.readCommandsFromStdIn()
		} else if state.Chans.CommandsFromStdin == nil {
			// The command channel is needed even without interactive mode: a
			// client-mode state machine pushes its initial "startdt_act"
			// through it. A send on a nil channel blocks forever, which would
			// also keep Wg.Wait() below from ever returning. A channel that
			// was already installed (e.g. by an embedding application) is
			// left untouched.
			state.Chans.CommandsFromStdin = make(chan string, commandBufferSize)
		}

		// always start evaluateInteractiveCommands, we need it to control
		// automatic sending, even if InteractiveMode is off
		go state.evaluateInteractiveCommands()
		go state.startConnection()

		// Wait for the attempt to be torn down, then for all its goroutines.
		<-state.Ctx.Done()
		state.Wg.Wait()

		if !state.manualDisconnect {
			logInfo.Println("Restart!")
		}
		// NOTE(review): the pause is also taken after a manual disconnect,
		// which delays the return of Start; kept to preserve existing timing.
		time.Sleep(restartDelay)
	}

	// disconnect was done purposely, exit
	state.manualDisconnect = false
	state.Running = false
	logDebug.Println("Start() returned")
}
// connectionStateMachine runs the StartDT/StopDT state machine of one
// connection until state.Ctx is cancelled.
//
// It consumes state.Chans.Received. Besides real APDUs this channel also
// carries internal notifications (I-format frames whose type id is at least
// INTERNAL_STATE_MACHINE_NOTIFIER) that are never transmitted and only wake the
// state machine up, e.g. after dt_act_sent was set. Real APDUs are logged and
// restart the t3 ticker. Test frames are always answered with TestFR con,
// independent of the current state.
//
// Transitions:
//
//	STOPPED         -> STARTED          server received StartDT act (replies StartDT con)
//	STOPPED         -> PENDING_STARTED  client has sent StartDT act
//	PENDING_STARTED -> STARTED          StartDT con received
//	STARTED         -> STOPPED          server received StopDT act (replies StopDT con)
//	STARTED         -> PENDING_STOPPED  client has sent StopDT act
//	PENDING_STOPPED -> STOPPED          StopDT con received
//
// Every channel send also watches state.Ctx, so the function always returns
// (and releases the WaitGroup) once the connection is torn down, even if a
// channel is full or nil.
func (state *State) connectionStateMachine() {
	isServer := state.Config.Mode == "server"
	isClient := state.Config.Mode == "client"

	// NOTE(review): Add is called from inside this function. If it is started
	// with `go`, a concurrent Wg.Wait() may run before the Add; calling Add in
	// the spawning code would be safer.
	state.Wg.Add(1)
	defer state.Wg.Done()

	// send queues an APDU for transmission. It reports false when the
	// connection context was cancelled first, in which case the caller must
	// return.
	send := func(apdu Apdu) bool {
		select {
		case state.Chans.ToSend <- apdu:
			return true
		case <-state.Ctx.Done():
			logDebug.Println("serverStateMachine received ctx.Done, returns")
			return false
		}
	}

	state.ConnState = STOPPED
	logInfo.Println("Entering state STOPPED")

	if isClient {
		// we need to trigger startdt_act here, it will trigger a notification
		// for the blocking received channel, to jump over it
		select {
		case state.Chans.CommandsFromStdin <- "startdt_act":
		case <-state.Ctx.Done():
			logDebug.Println("serverStateMachine received ctx.Done, returns")
			return
		}
	}

	for {
		select {
		// block until apdu is received. some apdus are used as internal
		// notifications with special type ids (are not sent)
		case apduReceived := <-state.Chans.Received:
			if (apduReceived.Apci.FrameFormat != IFormatFrame) || apduReceived.Asdu.TypeId < INTERNAL_STATE_MACHINE_NOTIFIER {
				// real apdu received, not an internal notification
				logInfo.Println("<<RX:", state.Objects.objNameOrIoa(apduReceived.Asdu), apduReceived)
				state.tickers.t3ticker.Reset(time.Duration(state.Config.T3) * time.Second)
			}

			if apduReceived.Apci.UFormat == TestFRAct {
				// always reply to testframes
				reply := NewApdu()
				reply.Apci.FrameFormat = UFormatFrame
				reply.Apci.UFormat = TestFRCon
				if !send(reply) {
					return
				}
				continue
			}

			// state machine
			switch state.ConnState {
			case STOPPED:
				if isServer && apduReceived.Apci.UFormat == StartDTAct {
					// startdt act received
					reply := NewApdu()
					reply.Apci.FrameFormat = UFormatFrame
					reply.Apci.UFormat = StartDTCon
					if !send(reply) {
						return
					}
					state.ConnState = STARTED
					logInfo.Println("Entering state STARTED")
				}
				if isClient && (state.dt_act_sent == StartDTAct) {
					state.dt_act_sent = 0
					state.ConnState = PENDING_STARTED
					logInfo.Println("Entering state PENDING_STARTED")
				}

			case PENDING_STARTED:
				if apduReceived.Apci.UFormat == StartDTCon {
					logInfo.Println("Entering state STARTED")
					state.ConnState = STARTED
				}

			case STARTED:
				if isServer && apduReceived.Apci.UFormat == StopDTAct {
					// stopdt act received
					// todo if unconfirmed frames
					// state.ConnState = PENDING_UNCONFIRMED_STOPPED
					//
					// Build the confirmation from a fresh APDU like every
					// other reply, instead of reusing whatever was sent last.
					reply := NewApdu()
					reply.Apci.FrameFormat = UFormatFrame
					reply.Apci.UFormat = StopDTCon
					if !send(reply) {
						return
					}
					state.ConnState = STOPPED
					logInfo.Println("Entering state STOPPED")
				}
				if isClient && state.dt_act_sent == StopDTAct {
					// we have sent stopdt act as a client
					state.dt_act_sent = 0
					// todo if unconfirmed frames
					// state.ConnState = PENDING_UNCONFIRMED_STOPPED
					state.ConnState = PENDING_STOPPED
					logInfo.Println("Entering state PENDING_STOPPED")
				}

			case PENDING_STOPPED:
				if apduReceived.Apci.UFormat == StopDTCon {
					// we have sent stopdt act as a client OR received stopdt
					// con (whichever comes first)
					// known issue: stopdt_con could be received twice without
					// stopdt_act having been sent
					logInfo.Println("Entering state STOPPED")
					state.ConnState = STOPPED
				}
			}

		case <-state.Ctx.Done():
			logDebug.Println("serverStateMachine received ctx.Done, returns")
			return
		}
	}
}
// incrementSeqNumber returns the sequence number that follows seqNumber.
// Sequence numbers are 15 bits wide, so the successor of 32767 is 0.
func incrementSeqNumber(seqNumber SeqNumber) SeqNumber {
	const modulus = 32768 // 2^15 possible sequence numbers

	next := int(seqNumber) + 1
	return SeqNumber(next % modulus)
}
// newAck returns an acknowledgement tracker whose ring can hold length
// unacknowledged frames. The sequence number starts at 0, no frames are open,
// and every ring slot is pre-filled with a zero seqNumberAndTimetag so that
// later type assertions on ring values cannot fail.
//
// A length below 1 is treated as 1. ring.New returns nil for such lengths, and
// a nil ring would panic on the first queueApdu.
func newAck(length int) ack {
	if length < 1 {
		length = 1
	}

	r := ring.New(length)
	// Walk the ring exactly once. The loop is bounded by length rather than
	// r.Len(), because Len itself traverses the whole ring on every call.
	// After length steps r is back at its starting element.
	for i := 0; i < length; i++ {
		r.Value = seqNumberAndTimetag{
			seqNumber: 0,
			timetag:   time.Time{},
		}
		r = r.Next()
	}

	return ack{
		seqNumber:  0,
		openFrames: 0,
		ring:       r,
	}
}
// queueApdu records an I-format APDU in the ring, because I-format frames have
// to be acknowledged.
//
// The frame's send sequence number is compared with the tracker's expected
// number; a mismatch is only logged, the frame is recorded regardless. The
// ring is advanced by one slot, the frame's sequence number and the current
// time are stored there, the expected sequence number is incremented and the
// count of open frames grows by one.
func (a *ack) queueApdu(apdu Apdu) {
	ssn := apdu.Apci.Ssn
	if ssn != a.seqNumber {
		logError.Println("Error received SSn does not match internal state, received ssn: ", ssn, "state: ", a.seqNumber)
	}

	slot := a.ring.Next()
	slot.Value = seqNumberAndTimetag{seqNumber: ssn, timetag: time.Now()}
	a.ring = slot

	a.seqNumber = incrementSeqNumber(a.seqNumber)
	a.openFrames++
}
// ackApdu is called when we send an i- or s-format and thereby acknowledge
// received frames, or when we receive an i- or s-format which acknowledges
// sent frames.
//
// seqNumber is the acknowledging sequence number: it confirms every frame up
// to and including seqNumber-1 (modulo 32768). The ring is searched backwards
// from the newest frame. The further back the confirmed frame lies, the more
// newer frames remain unacknowledged, and openFrames is set to that count.
//
//   - If nothing remains open, t2ticker is stopped.
//   - Otherwise t2ticker is reset to the time the oldest still-open frame has
//     left of its t2 budget. If that frame is already overdue the ticker is
//     set to fire almost immediately, because Ticker.Reset panics on a
//     non-positive duration.
//
// If the confirmed frame is not among the open frames, nothing is changed.
// The ring position itself is never moved here, so it keeps pointing at the
// newest queued frame and queueApdu cannot overwrite frames that are still
// open.
func (a *ack) ackApdu(seqNumber SeqNumber, t2ticker *time.Ticker, t2 time.Duration) {
	// Sequence numbers wrap at 2^15, so the predecessor of 0 is 32767.
	lastAcked := SeqNumber((int(seqNumber) + 32767) % 32768)

	// Search with a local cursor instead of moving a.ring.
	cursor := a.ring
	for stillUnacked := 0; stillUnacked < a.openFrames; stillUnacked++ {
		if cursor.Value.(seqNumberAndTimetag).seqNumber != lastAcked {
			cursor = cursor.Prev()
			continue
		}

		// all frames up to lastAcked are acknowledged; stillUnacked newer
		// frames may have been queued in the meantime and stay open
		a.openFrames = stillUnacked
		if stillUnacked == 0 {
			// we ack'ed all items, stop ticker
			// TODO: Ticker should be mutexed, because we might stop it here,
			// although it was just started by a received I frame!
			t2ticker.Stop()
			return
		}

		// The frame right after the acknowledged one is the oldest open one.
		oldestOpen := cursor.Next().Value.(seqNumberAndTimetag)
		remaining := t2 - time.Since(oldestOpen.timetag)
		if remaining <= 0 {
			remaining = time.Millisecond
		}
		t2ticker.Reset(remaining)
		return
	}
}
// checkForAck reports whether the number of open (unacknowledged) frames has
// reached maxOpenFrames. If so it returns true together with the tracker's
// current sequence number, which is the number to acknowledge with. Otherwise
// it returns false and 0.
func (a *ack) checkForAck(maxOpenFrames int) (bool, SeqNumber) {
	if a.openFrames < maxOpenFrames {
		return false, 0
	}
	return true, a.seqNumber
}