Skip to content

Commit

Permalink
Handle corrupt files in store
Browse files Browse the repository at this point in the history
A corrupt file in the store should be ignored (quarantined if possible) to avoid either
preventing the connection completing or an infinite loop.

Also added test and modified queue for consistency.

closes #195
  • Loading branch information
MattBrittan authored Nov 15, 2023
2 parents a8b46ef + 2bc9396 commit 5f4de1d
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 89 deletions.
10 changes: 5 additions & 5 deletions autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ connectionLoop:

// Connection is up, and we have at least one thing to send
for {
entry, err := c.queue.Peek() // If this succeeds, we MUST call Remove, Error or Leave
entry, err := c.queue.Peek() // If this succeeds, we MUST call Remove, Quarantine or Leave
if errors.Is(err, queue.ErrEmpty) {
c.debug.Println("everything in queue transmitted")
continue queueLoop
Expand All @@ -493,17 +493,17 @@ connectionLoop:
c.errors.Printf("error retrieving packet from queue: %s", err)
// If the packet cannot be processed, then we need to remove it from the queue
// (ideally into an error queue).
if err := entry.Error(); err != nil {
c.errors.Printf("error moving queue entry to error state: %s", err)
if err := entry.Quarantine(); err != nil {
c.errors.Printf("error moving queue entry to quarantine: %s", err)
}
continue
}

pub, ok := p.Content.(*packets.Publish)
if !ok {
c.errors.Printf("packet from queue is not a Publish")
if qErr := entry.Error(); qErr != nil {
c.errors.Printf("error moving queue entry to error state: %s", err)
if qErr := entry.Quarantine(); qErr != nil {
	// Report qErr (the quarantine failure itself); the outer err does not describe this failure.
	c.errors.Printf("error moving queue entry to quarantine: %s", qErr)
}
continue
}
Expand Down
34 changes: 12 additions & 22 deletions autopaho/queue/file/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
// File-system related constants used by the file-backed queue.
const (
	folderPermissions = os.FileMode(0770) // rwxrwx--- (presumably applied when creating the queue folder - usage not visible here)
	filePermissions   = os.FileMode(0666) // rw-rw-rw- (presumably applied to queue files - usage not visible here)
	corruptExtension  = ".CORRUPT"        // quarantined files will be given this extension
)

var (
Expand All @@ -30,7 +31,6 @@ type Queue struct {
path string
prefix string
extension string
errExtension string
queueEmpty bool // true if the queue is currently empty
waiting []chan<- struct{} // closed when something arrives in the queue
waitingForEmpty []chan<- struct{} // closed when queue is empty
Expand Down Expand Up @@ -71,10 +71,9 @@ func New(path string, prefix string, extension string) (*Queue, error) {
}

q := &Queue{
path: path,
prefix: prefix,
extension: extension,
errExtension: ".corrupt",
path: path,
prefix: prefix,
extension: extension,
}

_, err := q.oldestEntry()
Expand All @@ -88,16 +87,7 @@ func New(path string, prefix string, extension string) (*Queue, error) {

}

// SetErrorExtension sets the extension to use when an error is flagged with the file
// The extension defaults to ".corrupt" (which should be fine in most situations)
// This must not be the same as the extension passed to New.
func (q *Queue) SetErrorExtension(ext string) {
q.mu.Lock()
defer q.mu.Unlock()
q.errExtension = ext
}

// Wait returns a channel that is closed when there is something in the queue
// Wait returns a channel that will be closed when there is something in the queue
func (q *Queue) Wait() chan struct{} {
c := make(chan struct{})
q.mu.Lock()
Expand Down Expand Up @@ -161,6 +151,7 @@ func (q *Queue) Peek() (queue.Entry, error) {

// put writes out an item to disk
func (q *Queue) put(p io.Reader) error {
// Use CreateTemp to generate a file with a unique name (it will be removed when packet has been transmitted)
f, err := os.CreateTemp(q.path, q.prefix+"*"+q.extension)
if err != nil {
return err
Expand Down Expand Up @@ -189,7 +180,7 @@ func (q *Queue) get() (entry, error) {
if err != nil {
return entry{}, err
}
return entry{f: f, errExt: q.errExtension}, nil
return entry{f: f}, nil
}

// oldestEntry returns the filename of the oldest entry in the queue (if any - io.EOF means none)
Expand Down Expand Up @@ -232,8 +223,7 @@ func (q *Queue) oldestEntry() (string, error) {

// entry is used to return a queue entry from Peek
type entry struct {
f *os.File
errExt string
f *os.File
}

// Reader provides access to the file contents
Expand All @@ -258,12 +248,12 @@ func (e entry) Remove() error {
return nil
}

// Flag that this entry has an error (remove from queue, potentially retaining data with error flagged)
func (e entry) Error() error {
// Quarantine flags that this entry has an error (remove from queue, potentially retaining data with error flagged)
func (e entry) Quarantine() error {
cErr := e.f.Close() // Want to attempt to move the file regardless of any errors here

// Attempt to add an extension so the file no longer be found by Peek
if err := os.Rename(e.f.Name(), e.f.Name()+e.errExt); err != nil {
// Attempt to add an extension so Peek no longer finds the file.
if err := os.Rename(e.f.Name(), e.f.Name()+corruptExtension); err != nil {
// Attempt to remove the file (important that we don't end up in an infinite loop retrieving the same file!)
if rErr := os.Remove(e.f.Name()); rErr != nil {
return err // Error from rename is best thing to return
Expand Down
5 changes: 2 additions & 3 deletions autopaho/queue/file/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func TestLeaveAndError(t *testing.T) {
if err != nil {
t.Fatalf("failed to create queue: %s", err)
}
q.SetErrorExtension(".corrupt")

if _, err := q.Peek(); !errors.Is(err, queue.ErrEmpty) {
t.Fatalf("expected ErrEmpty, got %s", err)
Expand All @@ -114,7 +113,7 @@ func TestLeaveAndError(t *testing.T) {
// Quarantine entry (removes it from the queue)
if entry, err := q.Peek(); err != nil {
t.Fatalf("error peeking test entry: %s", err)
} else if err = entry.Error(); err != nil {
} else if err = entry.Quarantine(); err != nil {
t.Fatalf("error erroring test entry: %s", err)
}

Expand All @@ -134,7 +133,7 @@ func TestLeaveAndError(t *testing.T) {
if entry.IsDir() {
continue
}
if strings.HasSuffix(entry.Name(), ".corrupt") {
if strings.HasSuffix(entry.Name(), corruptExtension) {
found = true
break
}
Expand Down
6 changes: 3 additions & 3 deletions autopaho/queue/memory/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func (q *Queue) Remove() error {
return q.remove()
}

// Error implements Entry.Error - Flag that this entry has an error (remove from queue, potentially retaining data with error flagged)
func (q *Queue) Error() error {
return q.remove() // No way for us to flag an error so we just remove the item from the queue
// Quarantine implements Entry.Quarantine. It flags that the current entry has an error and
// takes it out of the queue. The in-memory queue has nowhere to retain a quarantined
// entry, so this simply delegates to remove and returns whatever remove returns.
func (q *Queue) Quarantine() error {
	return q.remove() // No way for us to actually quarantine this, so we just remove the item from the queue
}

// remove removes the first item in the queue.
Expand Down
10 changes: 5 additions & 5 deletions autopaho/queue/memory/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func TestMemoryQueue(t *testing.T) {
}
}

// TestLeaveAndError checks that the Leave and Error functions do what is expected
func TestLeaveAndError(t *testing.T) {
// TestLeaveAndQuarantine checks that the Leave and Quarantine functions do what is expected
func TestLeaveAndQuarantine(t *testing.T) {
q := New()

if _, err := q.Peek(); !errors.Is(err, queue.ErrEmpty) {
Expand All @@ -101,14 +101,14 @@ func TestLeaveAndError(t *testing.T) {
t.Fatalf("error leaving test entry: %s", err)
}

// Move entry to error state
// Quarantine entry
if entry, err := q.Peek(); err != nil {
t.Fatalf("error peeking test entry: %s", err)
} else if err = entry.Error(); err != nil {
} else if err = entry.Quarantine(); err != nil {
t.Fatalf("error erroring test entry: %s", err)
}

// As the file has been moved to error state is should not be part of the queue
// As the file has been moved to quarantine it should not be part of the queue
if _, err := q.Peek(); !errors.Is(err, queue.ErrEmpty) {
t.Errorf("expected ErrEmpty, got %s", err)
}
Expand Down
8 changes: 4 additions & 4 deletions autopaho/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ var (
)

// Entry - permits access to a queue entry
// Users must call one of Leave, Remove, or Error when done with the entry (and before calling Peek again)
// Note that `Reader()` must noth be called after calling Leave, Remove, or Error
// Users must call one of Leave, Remove, or Quarantine when done with the entry (and before calling Peek again)
// `Reader()` must not be called after calling Leave, Remove, or Quarantine (and any Reader previously requested should be considered invalid)
type Entry interface {
Reader() (io.Reader, error) // Provides access to the file contents, subsequent calls may return the same reader
Leave() error // Leave the entry in the queue (same entry will be returned on subsequent calls to Peek).
Remove() error // Remove this entry from the queue. Returns queue.ErrEmpty if queue is empty after operation
Error() error // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged)
Quarantine() error // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged)
}

// Queue provides the functionality needed to manage queued messages
Expand All @@ -29,7 +29,7 @@ type Queue interface {
Enqueue(p io.Reader) error

// Peek retrieves the oldest item from the queue without removing it
// Users must call one of Close, Remove, or Error when done with the entry, and before calling Peek again.
// Users must call one of Leave, Remove, or Quarantine when done with the entry, and before calling Peek again.
// Warning: Peek is not safe for concurrent use (it may return the same Entry leading to unpredictable results)
Peek() (Entry, error)
}
20 changes: 18 additions & 2 deletions autopaho/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
"testing"
"time"

"github.com/eclipse/paho.golang/autopaho/queue/memory"
memqueue "github.com/eclipse/paho.golang/autopaho/queue/memory"
"github.com/eclipse/paho.golang/internal/testserver"
"github.com/eclipse/paho.golang/packets"
"github.com/eclipse/paho.golang/paho"
paholog "github.com/eclipse/paho.golang/paho/log"
"github.com/eclipse/paho.golang/paho/session/state"
memstore "github.com/eclipse/paho.golang/paho/store/memory"
)

//
Expand Down Expand Up @@ -66,18 +67,32 @@ func TestQueuedMessages(t *testing.T) {

var allowConnection atomic.Bool

// Add a corrupt item to the queue (zero bytes) - this should be logged and ignored
q := memqueue.New()
if err := q.Enqueue(bytes.NewReader(nil)); err != nil {
t.Fatalf("failed to add corrupt zero byte item to queue")
}

// custom session because we don't want the client to close it when the connection is lost
var tsDone chan struct{} // Set on AttemptConnection and closed when that test server connection is done
session := state.NewInMemory()
clientStore := memstore.New()
serverStore := memstore.New()
session := state.New(clientStore, serverStore)
session.SetErrorLogger(paholog.NewTestLogger(t, "sessionError:"))
session.SetDebugLogger(paholog.NewTestLogger(t, "sessionDebug:"))
defer session.Close()

// Add a corrupt item to the store (zero bytes) - this should be logged and ignored
clientStore.Put(1, packets.PUBLISH, bytes.NewReader(nil))
serverStore.Put(1, packets.PUBREC, bytes.NewReader(nil))

connectCount := 0
config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: 500 * time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
Queue: q,
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
if !allowConnection.Load() {
return nil, fmt.Errorf("some random error")
Expand All @@ -94,6 +109,7 @@ func TestQueuedMessages(t *testing.T) {
}
},
Debug: logger,
Errors: logger,
PahoDebug: logger,
PahoErrors: logger,
CleanStartOnInitialConnection: false, // Want session to stay up (this is the default)
Expand Down
9 changes: 5 additions & 4 deletions internal/testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ func (i *Instance) processIncoming(cp *packets.ControlPacket, out chan<- *packet
p := cp.Content.(*packets.Connect)
response := packets.NewControlPacket(packets.CONNACK)

i.sessionExpiryInterval = 0
if p.Properties.SessionExpiryInterval != nil {
i.sessionExpiryInterval = *p.Properties.SessionExpiryInterval
}

// Session is only retained if there was one (i.sessionExpiryInterval) and connect does not clean it
if i.sessionExpiryInterval == 0 || p.CleanStart == true {
i.subscriptions = make(map[string]subscription)
Expand All @@ -272,10 +277,6 @@ func (i *Instance) processIncoming(cp *packets.ControlPacket, out chan<- *packet
i.serverMIDs.Allocate(mid)
}
}
i.sessionExpiryInterval = 0
if p.Properties.SessionExpiryInterval != nil {
i.sessionExpiryInterval = *p.Properties.SessionExpiryInterval
}
// We return whatever session expiry interval was requested
response.Content.(*packets.Connack).Properties = &packets.Properties{
SessionExpiryInterval: &i.sessionExpiryInterval,
Expand Down
Loading

0 comments on commit 5f4de1d

Please sign in to comment.