Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Issue #151: Bind not restarting when user uses CloseBind #152

Merged
merged 9 commits into from
Nov 28, 2024
38 changes: 9 additions & 29 deletions receivable.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,6 @@ func (t *receivable) start() {
}()
}

// check error and do closing if needed
func (t *receivable) check(err error) (closing bool) {
if err == nil {
return
}

if t.settings.OnReceivingError != nil {
t.settings.OnReceivingError(err)
}

closing = true
return
}

func (t *receivable) loop() {
var err error
for {
Expand All @@ -96,9 +82,13 @@ func (t *receivable) loop() {
if err = t.conn.SetReadTimeout(t.settings.ReadTimeout); err == nil {
p, err = pdu.Parse(t.conn)
}
closeOnError := t.check(err)
if closeOnError {
t.closing(InvalidStreaming)
if err != nil {
if atomic.LoadInt32(&t.aliveState) == Alive {
if t.settings.OnReceivingError != nil {
t.settings.OnReceivingError(err)
}
t.closing(InvalidStreaming)
}
return
}

Expand Down Expand Up @@ -150,13 +140,9 @@ func (t *receivable) handleWindowPdu(p pdu.PDU) (closing bool) {
if t.settings.EnableAutoRespond {
t.settings.response(pp.GetResponse())
} else if t.settings.OnReceivedPduRequest != nil {
r, closeBind := t.settings.OnReceivedPduRequest(p)
r, _ := t.settings.OnReceivedPduRequest(p)
t.settings.response(r)
if closeBind {
time.Sleep(50 * time.Millisecond)
closing = true
t.closing(UnbindClosing)
}

}
case *pdu.Unbind:
if t.settings.EnableAutoRespond {
Expand All @@ -165,15 +151,12 @@ func (t *receivable) handleWindowPdu(p pdu.PDU) (closing bool) {
// wait to send response before closing
time.Sleep(50 * time.Millisecond)
closing = true
t.closing(UnbindClosing)
} else if t.settings.OnReceivedPduRequest != nil {
r, closeBind := t.settings.OnReceivedPduRequest(p)
t.settings.response(r)
if closeBind {
time.Sleep(50 * time.Millisecond)
closing = true
t.closing(UnbindClosing)

}
}
default:
Expand All @@ -183,7 +166,6 @@ func (t *receivable) handleWindowPdu(p pdu.PDU) (closing bool) {
if closeBind {
time.Sleep(50 * time.Millisecond)
closing = true
t.closing(UnbindClosing)
}
}
}
Expand All @@ -198,7 +180,6 @@ func (t *receivable) handleAllPdu(p pdu.PDU) (closing bool) {
if closeBind {
time.Sleep(50 * time.Millisecond)
closing = true
t.closing(UnbindClosing)
}
}
return
Expand All @@ -216,7 +197,6 @@ func (t *receivable) handleOrClose(p pdu.PDU) (closing bool) {
time.Sleep(50 * time.Millisecond)

closing = true
t.closing(UnbindClosing)

default:
var responded bool
Expand Down
41 changes: 24 additions & 17 deletions transceivable.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,20 +115,7 @@ func (t *transceivable) SystemID() string {

// Close transceiver and stop underlying daemons.
func (t *transceivable) Close() (err error) {
if atomic.CompareAndSwapInt32(&t.aliveState, Alive, Closed) {
// closing input and output
_ = t.out.close(StoppingProcessOnly)
_ = t.in.close(StoppingProcessOnly)

// close underlying conn
err = t.conn.Close()

// notify transceiver closed
if t.settings.OnClosed != nil {
t.settings.OnClosed(ExplicitClosing)
}
}
return
return t.closing(ExplicitClosing)
}
linxGnu marked this conversation as resolved.
Show resolved Hide resolved

// Submit a PDU.
Expand Down Expand Up @@ -159,9 +146,8 @@ func (t *transceivable) windowCleanup() {
if time.Since(request.TimeSent) > t.settings.PduExpireTimeOut {
_ = t.requestStore.Delete(ctx, request.GetSequenceNumber())
if t.settings.OnExpiredPduRequest != nil {
bindClose := t.settings.OnExpiredPduRequest(request.PDU)
if bindClose {
_ = t.Close()
if t.settings.OnExpiredPduRequest(request.PDU) {
_ = t.closing(ConnectionIssue)
}
}
}
Expand All @@ -170,3 +156,24 @@ func (t *transceivable) windowCleanup() {
}
}
}

func (t *transceivable) closing(state State) (err error) {
if atomic.CompareAndSwapInt32(&t.aliveState, Alive, Closed) {
t.cancel()

// closing input and output
_ = t.out.close(StoppingProcessOnly)
_ = t.in.close(StoppingProcessOnly)

// close underlying conn
err = t.conn.Close()

// notify transceiver closed
if t.settings.OnClosed != nil {
t.settings.OnClosed(state)
}

t.wg.Wait()
}
return
}