From 4bff923c09f8126e84b48c678027106c46aa7c34 Mon Sep 17 00:00:00 2001 From: Matt Brittan Date: Wed, 15 Nov 2023 17:10:55 +1300 Subject: [PATCH] Handle corrupt files in store. A corrupt file in the store should be ignored (quarantined if possible) to avoid either preventing the connection completing or an infinite loop. Also added test and modified queue for consistency. closes #195 --- autopaho/auto.go | 10 ++--- autopaho/queue/file/queue.go | 34 +++++--------- autopaho/queue/file/queue_test.go | 5 +-- autopaho/queue/memory/queue.go | 6 +-- autopaho/queue/memory/queue_test.go | 10 ++--- autopaho/queue/queue.go | 8 ++-- autopaho/queue_test.go | 19 +++++++- internal/testserver/testserver.go | 9 ++-- paho/session/state/state.go | 70 +++++++++++++++++------------ paho/session/state/store.go | 10 ++++- paho/store/file/store.go | 49 +++++++++++++++----- paho/store/file/store_test.go | 21 +++++++++ paho/store/memory/store.go | 6 +++ 13 files changed, 169 insertions(+), 88 deletions(-) diff --git a/autopaho/auto.go b/autopaho/auto.go index aca176f..3843982 100644 --- a/autopaho/auto.go +++ b/autopaho/auto.go @@ -477,7 +477,7 @@ connectionLoop: // Connection is up, and we have at least one thing to send for { - entry, err := c.queue.Peek() // If this succeeds, we MUST call Remove, Error or Leave + entry, err := c.queue.Peek() // If this succeeds, we MUST call Remove, Quarantine or Leave if errors.Is(err, queue.ErrEmpty) { c.debug.Println("everything in queue transmitted") continue queueLoop @@ -493,8 +493,8 @@ connectionLoop: c.errors.Printf("error retrieving packet from queue: %s", err) // If the packet cannot be processed, then we need to remove it from the queue // (ideally into an error queue). - if err := entry.Error(); err != nil { - c.errors.Printf("error moving queue entry to error state: %s", err) + if err := entry.Quarantine(); err != nil { + c.errors.Printf("error moving queue entry to quarantine: %s", err) } continue } @@ -502,8 +502,8 @@ connectionLoop: pub, ok := p.Content.(*packets.Publish) if !ok { c.errors.Printf("packet from queue is not a Publish") - if qErr := entry.Error(); qErr != nil { - c.errors.Printf("error moving queue entry to error state: %s", err) + if qErr := entry.Quarantine(); qErr != nil { + c.errors.Printf("error moving queue entry to quarantine: %s", err) } continue } diff --git a/autopaho/queue/file/queue.go b/autopaho/queue/file/queue.go index 97d27d6..bee833e 100644 --- a/autopaho/queue/file/queue.go +++ b/autopaho/queue/file/queue.go @@ -18,6 +18,7 @@ import ( const ( folderPermissions = os.FileMode(0770) filePermissions = os.FileMode(0666) + corruptExtension = ".CORRUPT" // quarantined files will be given this extension ) var ( @@ -30,7 +31,6 @@ type Queue struct { path string prefix string extension string - errExtension string queueEmpty bool // true is the queue is currently empty waiting []chan<- struct{} // closed when something arrives in the queue waitingForEmpty []chan<- struct{} // closed when queue is empty @@ -71,10 +71,9 @@ func New(path string, prefix string, extension string) (*Queue, error) { } q := &Queue{ - path: path, - prefix: prefix, - extension: extension, - errExtension: ".corrupt", + path: path, + prefix: prefix, + extension: extension, } _, err := q.oldestEntry() @@ -88,16 +87,7 @@ func New(path string, prefix string, extension string) (*Queue, error) { } -// SetErrorExtension sets the extension to use when an error is flagged with the file -// The extension defaults to ".corrupt" (which should be fine in most situations) -// This must not be the same as the extension passed to New. -func (q *Queue) SetErrorExtension(ext string) { - q.mu.Lock() - defer q.mu.Unlock() - q.errExtension = ext -} - -// Wait returns a channel that is closed when there is something in the queue +// Wait returns a channel that will be closed when there is something in the queue func (q *Queue) Wait() chan struct{} { c := make(chan struct{}) q.mu.Lock() @@ -161,6 +151,7 @@ func (q *Queue) Peek() (queue.Entry, error) { // put writes out an item to disk func (q *Queue) put(p io.Reader) error { + // Use CreateTemp to generate a file with a unique name (it will be removed when packet has been transmitted) f, err := os.CreateTemp(q.path, q.prefix+"*"+q.extension) if err != nil { return err @@ -189,7 +180,7 @@ func (q *Queue) get() (entry, error) { if err != nil { return entry{}, err } - return entry{f: f, errExt: q.errExtension}, nil + return entry{f: f}, nil } // oldestEntry returns the filename of the oldest entry in the queue (if any - io.EOF means none) @@ -232,8 +223,7 @@ func (q *Queue) oldestEntry() (string, error) { // entry is used to return a queue entry from Peek type entry struct { - f *os.File - errExt string + f *os.File } // Reader provides access to the file contents @@ -258,12 +248,12 @@ func (e entry) Remove() error { return nil } -// Flag that this entry has an error (remove from queue, potentially retaining data with error flagged) -func (e entry) Error() error { +// Quarantine flag that this entry has an error (remove from queue, potentially retaining data with error flagged) +func (e entry) Quarantine() error { cErr := e.f.Close() // Want to attempt to move the file regardless of any errors here - // Attempt to add an extension so the file no longer be found by Peek - if err := os.Rename(e.f.Name(), e.f.Name()+e.errExt); err != nil { + // Attempt to add an extension so Peek no longer finds the file. + if err := os.Rename(e.f.Name(), e.f.Name()+corruptExtension); err != nil { // Attempt to remove the file (important that we don't end up in an infinite loop retrieving the same file!) if rErr := os.Remove(e.f.Name()); rErr != nil { return err // Error from rename is best thing to return diff --git a/autopaho/queue/file/queue_test.go b/autopaho/queue/file/queue_test.go index d9543d3..747fe8b 100644 --- a/autopaho/queue/file/queue_test.go +++ b/autopaho/queue/file/queue_test.go @@ -93,7 +93,6 @@ func TestLeaveAndError(t *testing.T) { if err != nil { t.Fatalf("failed to create queue: %s", err) } - q.SetErrorExtension(".corrupt") if _, err := q.Peek(); !errors.Is(err, queue.ErrEmpty) { t.Fatalf("expected ErrEmpty, got %s", err) @@ -114,7 +113,7 @@ func TestLeaveAndError(t *testing.T) { // Move entry to error state if entry, err := q.Peek(); err != nil { t.Fatalf("error peeking test entry: %s", err) - } else if err = entry.Error(); err != nil { + } else if err = entry.Quarantine(); err != nil { t.Fatalf("error erroring test entry: %s", err) } @@ -134,7 +133,7 @@ func TestLeaveAndError(t *testing.T) { if entry.IsDir() { continue } - if strings.HasSuffix(entry.Name(), ".corrupt") { + if strings.HasSuffix(entry.Name(), corruptExtension) { found = true break } diff --git a/autopaho/queue/memory/queue.go b/autopaho/queue/memory/queue.go index eddee54..ad9977c 100644 --- a/autopaho/queue/memory/queue.go +++ b/autopaho/queue/memory/queue.go @@ -101,9 +101,9 @@ func (q *Queue) Remove() error { return q.remove() } -// Error implements Entry.Error - Flag that this entry has an error (remove from queue, potentially retaining data with error flagged) -func (q *Queue) Error() error { - return q.remove() // No way for us to flag an error so we just remove the item from the queue +// Quarantine implements Entry.Quarantine - Flag that this entry has an error (remove from queue, potentially retaining data with error flagged) +func (q *Queue) Quarantine() error { + return q.remove() // No way for us to actually quarantine this, so we just remove the item from the queue } // remove removes the first item in the queue. diff --git a/autopaho/queue/memory/queue_test.go b/autopaho/queue/memory/queue_test.go index c630446..23758f5 100644 --- a/autopaho/queue/memory/queue_test.go +++ b/autopaho/queue/memory/queue_test.go @@ -81,8 +81,8 @@ func TestMemoryQueue(t *testing.T) { } } -// TestLeaveAndError checks that the Leave and Error functions do what is expected -func TestLeaveAndError(t *testing.T) { +// TestLeaveAndQuarantine checks that the Leave and Quarantine functions do what is expected +func TestLeaveAndQuarantine(t *testing.T) { q := New() if _, err := q.Peek(); !errors.Is(err, queue.ErrEmpty) { @@ -101,14 +101,14 @@ func TestLeaveAndError(t *testing.T) { t.Fatalf("error leaving test entry: %s", err) } - // Move entry to error state + // Quarantine entry if entry, err := q.Peek(); err != nil { t.Fatalf("error peeking test entry: %s", err) - } else if err = entry.Error(); err != nil { + } else if err = entry.Quarantine(); err != nil { t.Fatalf("error erroring test entry: %s", err) } - // As the file has been moved to error state is should not be part of the queue + // As the file has been moved to quarantine it should not be part of the queue if _, err := q.Peek(); !errors.Is(err, queue.ErrEmpty) { t.Errorf("expected ErrEmpty, got %s", err) } diff --git a/autopaho/queue/queue.go b/autopaho/queue/queue.go index dbc11cf..6ec559d 100644 --- a/autopaho/queue/queue.go +++ b/autopaho/queue/queue.go @@ -10,13 +10,13 @@ var ( ) // Entry - permits access to a queue entry -// Users must call one of Leave, Remove, or Error when done with the entry (and before calling Peek again) -// Note that `Reader()` must noth be called after calling Leave, Remove, or Error +// Users must call one of Leave, Remove, or Quarantine when done with the entry (and before calling Peek again) +// `Reader()` must not be called after calling Leave, Remove, or Quarantine (and any Reader previously requestes should be considered invalid) type Entry interface { Reader() (io.Reader, error) // Provides access to the file contents, subsequent calls may return the same reader Leave() error // Leave the entry in the queue (same entry will be returned on subsequent calls to Peek). Remove() error // Remove this entry from the queue. Returns queue.ErrEmpty if queue is empty after operation - Error() error // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged) + Quarantine() error // Flag that this entry has an error (remove from queue, potentially retaining data with error flagged) } // Queue provides the functionality needed to manage queued messages @@ -29,7 +29,7 @@ type Queue interface { Enqueue(p io.Reader) error // Peek retrieves the oldest item from the queue without removing it - // Users must call one of Close, Remove, or Error when done with the entry, and before calling Peek again. + // Users must call one of Close, Remove, or Quarantine when done with the entry, and before calling Peek again. // Warning: Peek is not safe for concurrent use (it may return the same Entry leading to unpredictable results) Peek() (Entry, error) } diff --git a/autopaho/queue_test.go b/autopaho/queue_test.go index 7031c99..d4b46eb 100644 --- a/autopaho/queue_test.go +++ b/autopaho/queue_test.go @@ -10,11 +10,13 @@ import ( "testing" "time" + memqueue "github.com/eclipse/paho.golang/autopaho/queue/memory" "github.com/eclipse/paho.golang/internal/testserver" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" paholog "github.com/eclipse/paho.golang/paho/log" "github.com/eclipse/paho.golang/paho/session/state" + memstore "github.com/eclipse/paho.golang/paho/store/memory" ) // @@ -63,18 +65,32 @@ func TestQueuedMessages(t *testing.T) { var allowConnection atomic.Bool + // Add a corrupt item to the queue (zero bytes) - this should be logged and ignored + q := memqueue.New() + if err := q.Enqueue(bytes.NewReader(nil)); err != nil { + t.Fatalf("failed to add corrupt zero byte item to queue") + } + // custom session because we don't want the client to close it when the connection is lost var tsDone chan struct{} // Set on AttemptConnection and closed when that test server connection is done - session := state.NewInMemory() + clientStore := memstore.New() + serverStore := memstore.New() + session := state.New(clientStore, serverStore) session.SetErrorLogger(paholog.NewTestLogger(t, "sessionError:")) session.SetDebugLogger(paholog.NewTestLogger(t, "sessionDebug:")) defer session.Close() + + // Add a corrupt item to the store (zero bytes) - this should be logged and ignored + clientStore.Put(1, packets.PUBLISH, bytes.NewReader(nil)) + serverStore.Put(1, packets.PUBREC, bytes.NewReader(nil)) + connectCount := 0 config := ClientConfig{ ServerUrls: []*url.URL{server}, KeepAlive: 60, ConnectRetryDelay: 500 * time.Millisecond, // Retry connection very quickly! ConnectTimeout: shortDelay, // Connection should come up very quickly + Queue: q, AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) { if !allowConnection.Load() { return nil, fmt.Errorf("some random error") @@ -91,6 +107,7 @@ func TestQueuedMessages(t *testing.T) { } }, Debug: logger, + Errors: logger, PahoDebug: logger, PahoErrors: logger, CleanStartOnInitialConnection: false, // Want session to stay up (this is the default) diff --git a/internal/testserver/testserver.go b/internal/testserver/testserver.go index a9d722b..cb3bebb 100644 --- a/internal/testserver/testserver.go +++ b/internal/testserver/testserver.go @@ -257,6 +257,11 @@ func (i *Instance) processIncoming(cp *packets.ControlPacket, out chan<- *packet p := cp.Content.(*packets.Connect) response := packets.NewControlPacket(packets.CONNACK) + i.sessionExpiryInterval = 0 + if p.Properties.SessionExpiryInterval != nil { + i.sessionExpiryInterval = *p.Properties.SessionExpiryInterval + } + // Session is only retained if there was one (i.sessionExpiryInterval) and connect does not clean it if i.sessionExpiryInterval == 0 || p.CleanStart == true { i.subscriptions = make(map[string]subscription) @@ -272,10 +277,6 @@ func (i *Instance) processIncoming(cp *packets.ControlPacket, out chan<- *packet i.serverMIDs.Allocate(mid) } } - i.sessionExpiryInterval = 0 - if p.Properties.SessionExpiryInterval != nil { - i.sessionExpiryInterval = *p.Properties.SessionExpiryInterval - } // We return whatever session expiry interval was requested response.Content.(*packets.Connack).Properties = &packets.Properties{ SessionExpiryInterval: &i.sessionExpiryInterval, diff --git a/paho/session/state/state.go b/paho/session/state/state.go index c5c5272..c17f21d 100644 --- a/paho/session/state/state.go +++ b/paho/session/state/state.go @@ -1,7 +1,6 @@ package state import ( - "bytes" "context" "errors" "fmt" @@ -202,44 +201,51 @@ func (s *State) ConAckReceived(conn io.Writer, cp *packets.Connect, ca *packets. s.debug.Printf("resending message ID %d", id) r, err := s.clientStore.Get(id) if err != nil { - s.errors.Printf("failed to load packet %d from store: %s", id, err) + s.errors.Printf("failed to load packet %d from client store: %s", id, err) continue } - // The messages being retransmitted form part of the "send quota"; we currently add this without blocking - // meaning that inflight messages may exceed Receive Maximum if this has decreased since the last connection. - // This means that we should really hold off resending PUBLISH messages in excess of Receive Maximum until - // acknowledgment of previous messages is received. However, this appears to be a fairly unusual situation - // (receive maximum rarely changes between connections) and implementing a fix will increase complexity. - // TODO: Honor Receive Maximum when retransmitting messages. - s.inflight.Retransmit() // This will never block (but is needed to block new messages) - - // DUP needs to be set when resending PUBLISH (but there is no real need to fully parse the message, so we don't) - fixedHeader := make([]byte, 2) - _, err = io.ReadFull(r, fixedHeader) - if err != nil { - _ = r.Close() - return fmt.Errorf("failed to retrieve packet %d from client store: %w", id, err) + // DUP needs to be set when resending PUBLISH + // Read/parse the full packet, so we can detect corruption (e.g. 0 byte file) + p, err := packets.ReadPacket(r) + if cErr := r.Close(); cErr != nil { + s.errors.Printf("failed to close stored client packet %d: %s", id, cErr) } - packetType := fixedHeader[0] >> 4 - switch packetType { + if err != nil { // If the packet cannot be read, we quarantine it; otherwise we may retry infinitely. + if err := s.clientStore.Quarantine(id); err != nil { + s.errors.Printf("failed to quarantine packet %d from client store: %s", id, err) + } + s.errors.Printf("failed to retrieve/parse packet %d from client store: %s", id, err) + continue + } + + switch p.Type { case packets.PUBLISH: - fixedHeader[0] |= 1 << 3 // Set the DUP flag + pub := p.Content.(*packets.Publish) + pub.Duplicate = true case packets.PUBREL: default: - return fmt.Errorf("unexpected packet type %d (for packet identifier %d) in client store", packetType, id) + if err := s.clientStore.Quarantine(id); err != nil { + s.errors.Printf("failed to quarantine packet %d from client store: %s", id, err) + } + s.errors.Printf("unexpected packet type %d (for packet identifier %d) in client store", p.Type, id) + continue } - _, err = io.Copy(conn, io.MultiReader(bytes.NewReader(fixedHeader), r)) - _ = r.Close() - if err != nil { + + // The messages being retransmitted form part of the "send quota"; however, as per the V5 spec, + // the limit does not apply to messages being resent (the quota can go under 0) + s.inflight.Retransmit() // This will never block (but is needed to block new messages) + + // Any failure from this point should result in loss of connection (so fatal) + if _, err := p.WriteTo(conn); err != nil { s.debug.Printf("retransmitting of identifier %d failed: %s", id, err) return fmt.Errorf("failed to retransmit message (%d): %w", id, err) } s.debug.Printf("retransmitted message with identifier %d", id) - // On initial connection, the packet needs to be added to our record of client generated packets. + // On initial connection, the packet needs to be added to our record of client-generated packets. if _, ok := s.clientPackets[id]; !ok { s.clientPackets[id] = clientGenerated{ - packetType: packetType, + packetType: p.Type, responseChan: make(chan packets.ControlPacket, 1), // Nothing will wait on this } } @@ -259,7 +265,7 @@ func (s *State) loadServerSession(ca *packets.Connack) error { for _, id := range ids { r, err := s.serverStore.Get(id) if err != nil { - s.errors.Printf("failed to load packet %d from store: %s", id, err) + s.errors.Printf("failed to load packet %d from server store: %s", id, err) continue } // We only need to know the packet type so there is no need to process the entire packet @@ -267,7 +273,11 @@ func (s *State) loadServerSession(ca *packets.Connack) error { _, err = r.Read(byte1) _ = r.Close() if err != nil { - return fmt.Errorf("failed to retrieve packet %d from server store: %w", id, err) + if err := s.serverStore.Quarantine(id); err != nil { + s.errors.Printf("failed to quarantine packet %d from server store (failed to read): %s", id, err) + } + s.errors.Printf("packet %d from server store could not be read: %s", id, err) + continue // don't want to fail so quarantine and continue is the best we can do } packetType := byte1[0] >> 4 switch packetType { @@ -276,7 +286,11 @@ func (s *State) loadServerSession(ca *packets.Connack) error { case packets.PUBREC: s.serverPackets[id] = packets.PUBREC default: - return fmt.Errorf("unexpected packet type %d (for packet identifier %d) in server store", packetType, id) + if err := s.serverStore.Quarantine(id); err != nil { + s.errors.Printf("failed to quarantine packet %d from server store: %s", id, err) + } + s.errors.Printf("packet %d from server store had unexpected type %d", id, packetType) + continue // don't want to fail so quarantine and continue is the best we can do } } return nil diff --git a/paho/session/state/store.go b/paho/session/state/store.go index 38001a2..b6617df 100644 --- a/paho/session/state/store.go +++ b/paho/session/state/store.go @@ -9,6 +9,12 @@ type storer interface { Put(packetID uint16, packetType byte, w io.WriterTo) error // Store the packet Get(packetID uint16) (io.ReadCloser, error) // Retrieve the packet with the specified in ID Delete(id uint16) error // Removes the message with the specified store ID - List() ([]uint16, error) // Returns packet IDs in the order they were Put - Reset() error // Clears the store (deleting all messages) + + // Quarantine sets the message with the specified store ID into an error state; this may mean deleting it or storing + // it somewhere separate. This is intended for use when a corrupt packet is detected (as this may result in data + // loss, it's beneficial to have access to corrupt packets for analysis). + Quarantine(id uint16) error + + List() ([]uint16, error) // Returns packet IDs in the order they were Put + Reset() error // Clears the store (deleting all messages) } diff --git a/paho/store/file/store.go b/paho/store/file/store.go index 808f081..ed82547 100644 --- a/paho/store/file/store.go +++ b/paho/store/file/store.go @@ -15,6 +15,7 @@ const ( folderPermissions = os.FileMode(0770) filePermissions = os.FileMode(0666) tmpExtension = ".tmp" + corruptExtension = ".CORRUPT" // quarantined files will be given this extension ) // New creates a file Store. Note that a file is written, read and deleted as part of this process to check that the @@ -74,19 +75,23 @@ type Store struct { func (s *Store) Put(packetID uint16, packetType byte, w io.WriterTo) error { s.Lock() defer s.Unlock() - tmpFn := s.tmpPathForId(packetID) - f, err := os.OpenFile(tmpFn, os.O_RDWR|os.O_CREATE|os.O_TRUNC, filePermissions) + f, err := os.CreateTemp(s.path, s.fileNamePrefix(packetID)+"-*"+s.extension+tmpExtension) if err != nil { return fmt.Errorf("failed to create temp file: %w", err) } + tmpFn := f.Name() if _, err = w.WriteTo(f); err != nil { + f.Close() + _ = os.Remove(tmpFn) return fmt.Errorf("failed to write packet to temp file: %w", err) } if err = f.Close(); err != nil { + _ = os.Remove(tmpFn) return fmt.Errorf("failed to close temp file: %w", err) } - if err = os.Rename(tmpFn, s.pathForId(packetID)); err != nil { + if err = os.Rename(tmpFn, s.filePathForId(packetID)); err != nil { + _ = os.Remove(tmpFn) return fmt.Errorf("failed to rename temp file: %w", err) } return nil @@ -97,7 +102,7 @@ func (s *Store) Put(packetID uint16, packetType byte, w io.WriterTo) error { func (s *Store) Get(packetID uint16) (io.ReadCloser, error) { s.Lock() defer s.Unlock() - f, err := os.OpenFile(s.pathForId(packetID), os.O_RDONLY, 0) + f, err := os.OpenFile(s.filePathForId(packetID), os.O_RDONLY, 0) if err != nil { return nil, fmt.Errorf("failed to open packet file: %w", err) } @@ -111,6 +116,28 @@ func (s *Store) Delete(id uint16) error { return s.delete(id) } +// Quarantine is called if a corrupt packet is detected. +// There is little we can do other than deleting the packet. +func (s *Store) Quarantine(id uint16) error { + s.Lock() + defer s.Unlock() + f, err := os.CreateTemp(s.path, s.fileNamePrefix(id)+"-*"+s.extension+corruptExtension) + if err != nil { + s.delete(id) // delete the file (otherwise it may be sent on every reconnection) + return fmt.Errorf("failed to create quarantine file: %w", err) + } + tmpFn := f.Name() + if err := f.Close(); err != nil { + s.delete(id) // delete the file (otherwise it may be sent on every reconnection) + return fmt.Errorf("failed to close newly created quarantine file: %w", err) + } + if err := os.Rename(s.filePathForId(id), tmpFn); err != nil { + s.delete(id) // delete the file (otherwise it may be sent on every reconnection) + return fmt.Errorf("failed to move packet into quarantine: %w", err) + } + return nil +} + type idAndModTime struct { id uint16 modTime time.Time @@ -188,18 +215,18 @@ func (s *Store) list() ([]uint16, error) { // delete removes the message with the specified store ID // caller must gain any required locks func (s *Store) delete(id uint16) error { - if err := os.Remove(s.pathForId(id)); err != nil { + if err := os.Remove(s.filePathForId(id)); err != nil { return fmt.Errorf("failed to remove packet file: %w", err) } return nil } -// pathForId returns the full path of the file used to store info for the passed in packet id -func (s *Store) pathForId(packetID uint16) string { - return filepath.Join(s.path, s.prefix+strconv.FormatInt(int64(packetID), 10)+s.extension) +// filePathForId returns the full path of the file used to store info for the passed in packet id +func (s *Store) filePathForId(packetID uint16) string { + return filepath.Join(s.path, s.fileNamePrefix(packetID)+s.extension) } -// tmpPathForId returns path to a temporary file to be used when writing out a packet -func (s *Store) tmpPathForId(packetID uint16) string { - return filepath.Join(s.path, s.prefix+strconv.FormatInt(int64(packetID), 10)+tmpExtension) +// fileNamePrefix returns the beginning of the filename for the specified packet ID +func (s *Store) fileNamePrefix(packetID uint16) string { + return s.prefix + strconv.FormatInt(int64(packetID), 10) } diff --git a/paho/store/file/store_test.go b/paho/store/file/store_test.go index f87f32f..4d147cf 100644 --- a/paho/store/file/store_test.go +++ b/paho/store/file/store_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "os" + "strings" "testing" "github.com/eclipse/paho.golang/packets" @@ -110,6 +111,26 @@ func TestFileStoreNaming(t *testing.T) { if entries[0].Name() != "BlahXX1.txt" { t.Fatalf("filename not as expected; got %s", entries[0].Name()) } + + if err := s.Quarantine(1); err != nil { + t.Fatalf("failed to Quarantine: %s", err) + } + entries, err = os.ReadDir(dir) + if err != nil { + t.Fatalf("failed to read dir: %s", err) + } + if len(entries) != 1 { + t.Fatalf("should be one file; got %#v", entries) + } + fn := entries[0].Name() + if !strings.HasPrefix(fn, "BlahXX1") { + t.Fatalf("quarantine filename prefix not as expected; got %s", entries[0].Name()) + } + + if !strings.HasSuffix(fn, corruptExtension) { + t.Fatalf("quarantine filename suffix not as expected; got %s", entries[0].Name()) + } + } // TestFileStoreBig creates a fully populated Store and checks things work diff --git a/paho/store/memory/store.go b/paho/store/memory/store.go index e7d745e..fc608e2 100644 --- a/paho/store/memory/store.go +++ b/paho/store/memory/store.go @@ -77,6 +77,12 @@ func (m *Store) Delete(id uint16) error { return nil } +// Quarantine is called if a corrupt packet is detected. +// There is little we can do other than deleting the packet. +func (m *Store) Quarantine(id uint16) error { + return m.Delete(id) +} + // List returns packet IDs in the order they were Put func (m *Store) List() ([]uint16, error) { m.Lock()