From 44d52ac138f32944c5f71d816fe24af69437511c Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 7 Oct 2015 02:14:18 -0700 Subject: [PATCH] Fully lock HH node queue creation I believe this change address the issues with hinted-handoff not fully replicating all data to nodes that come back online after an outage.. A detailed explanation follows. During testing of of hinted-handoff (HH) under various scenarios, HH stats showed that the HH Processor was occasionally encountering errors while unmarshalling hinted data. This error was not handled completely correctly, and in clusters with more than 3 nodes, this could cause the HH service to stall until the node was restarted. This was the high-level reason why HH data was not being replicated. Furthermore by watching, at the byte-level, the hinted-handoff data it could be seen that HH segment block lengths were getting randomly set to 0, but the block data itself was fine (Block data contains hinted writes). This was the root cause of the unmarshalling errors outlined above. This, in turn, was tracked down to the HH system opening each segment file multiple times concurrently, which was not file-level thread-safe, so these mutiple open calls were corrupting the file. Finally, the reason a segment file was being opened multiple times in parallel was because WriteShard on the HH Processor was checking for node queues in an unsafe manner. Since WriteShard can be called concurrently this was adding queues for the same node more than once, and each queue-addition results in opening segment files. This change fixes the locking in WriteShard such the check for an existing HH queue for a given node is performed in a synchronized manner. --- services/hh/processor.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/services/hh/processor.go b/services/hh/processor.go index f0c200eb23d..1b3b5b323ad 100644 --- a/services/hh/processor.go +++ b/services/hh/processor.go @@ -103,10 +103,9 @@ func (p *Processor) loadQueues() error { return nil } +// addQueue adds a hinted-handoff queue for the given node. This function is not thread-safe +// and the caller must ensure this function is not called concurrently. func (p *Processor) addQueue(nodeID uint64) (*queue, error) { - p.mu.Lock() - defer p.mu.Unlock() - path := filepath.Join(p.dir, strconv.FormatUint(nodeID, 10)) if err := os.MkdirAll(path, 0700); err != nil { return nil, err @@ -128,11 +127,27 @@ func (p *Processor) addQueue(nodeID uint64) (*queue, error) { return queue, nil } +// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate +// hinted-handoff queues, and be called concurrently, it takes a lock during queue access. func (p *Processor) WriteShard(shardID, ownerID uint64, points []models.Point) error { + p.mu.RLock() queue, ok := p.queues[ownerID] + p.mu.RUnlock() if !ok { - var err error - if queue, err = p.addQueue(ownerID); err != nil { + if err := func() error { + // Check again under write-lock. + p.mu.Lock() + defer p.mu.Unlock() + + queue, ok = p.queues[ownerID] + if !ok { + var err error + if queue, err = p.addQueue(ownerID); err != nil { + return err + } + } + return nil + }(); err != nil { return err } }