-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
nsq_to_file: check for rotate size specifically in rev-incr loop #1123
Conversation
instead of calling needsRotation() because it is theoretically possible that the time-format could roll-over in the middle of the updateFile() rev-incr loop, so needsRotation() would always return true, and the loop would never terminate
do we need the rotate interval check copied too? |
The rotate interval should not be hit, because This part of the loop is really about corner cases. What if a later REV of the same datestamp file already exists, and we are not gzipping so we can open an existing file with append mode, but then the existing file already meets the rotate size? I guess this is possible if So back to the rotate-time check: I don't think we can figure out the create time of an existing file on disk in a good way. Posix offers last-metadata-change-time and last-contents-change-time. Maybe if rotating based on time, the best we can do is just always pick a new file instead of an existing file? |
Would some of this be easier if we just always used exclusive mode? What is the argument for preserving the behavior of multiple processes appending to the same file? Is that even safe to do? |
Multiple processes at the same time is not the intended case ... it's for when a service restarts, it can pick up where it left off, no need for REVs (if no gzip and no size or time limit, just "date-file-format") |
I think I'm confused from your comment above:
It doesn't seem like we want it to be possible for two processes to open the same file. The only way to do that would be to always use Regarding the addition of rotate time checking in this loop, your explanation makes sense. I'm still generally unhappy with the complexity of the code in |
Close() should use current filename and rev if it needs to resolve an exclusiveRename() destination conflict
The "nsq_to_file_jepsen" test starts a bunch of What we do want to handle is if the service is restarted. On restart, continuing to append to the same file is probably OK - if the previous run exited cleanly it definitely is, and even if it didn't there's still a good chance that a whole message write finished last, but there's a chance that half the last written message (un-acked) is effectively prepended to the first new message (will be acked), messing it up. We may also want to try to handle two runs accidentally overlapping (on the same server). If in this case one exits because it can't move the file, because the other one did, that seems fine. There's the more worrying chance of interleaved messages not being atomic when both processes append to the same file, but it's still possible (and somewhat likely) for small message writes to be all be cleanly appended, effectively atomically. But I don't want to worry too much about this case ... you could use gzip or (now) rotate-interval to avoid it, if your setup could provoke it. I pushed up two more small tweaks, take a look. Maybe they're enough for now :) |
successfully tested with modified pseudo-jepsen: --- nsq_to_file_jepsen.go.orig 2019-01-08 19:33:04.000000000 -0500
+++ nsq_to_file_jepsen.go 2019-01-08 19:28:43.000000000 -0500
@@ -45,6 +45,7 @@
"os/exec"
"os/signal"
"path/filepath"
+ "strings"
"syscall"
"time"
@@ -97,7 +98,7 @@
"-work-dir", filepath.Join(outputRoot, "work"),
"-filename-format", "<TOPIC>/<TOPIC>.<DATETIME>+0000.<HOST><REV>.log",
"-datetime-format", "%Y-%m-%d_%H-%M",
- "-gzip",
+ "-rotate-interval", "30s",
)
cmd.Stdout = logSink
cmd.Stderr = logSink
@@ -284,13 +285,18 @@
return fmt.Errorf("error opening output file %s: %s", path, err)
}
defer f.Close()
- gzf, err := gzip.NewReader(f)
- if err != nil {
- return fmt.Errorf("error opening gzip reader: %s: %s", path, err)
+ var rf io.Reader = f
+
+ if strings.HasSuffix(path, ".gz") {
+ gzf, err := gzip.NewReader(f)
+ if err != nil {
+ return fmt.Errorf("error opening gzip reader: %s: %s", path, err)
+ }
+ defer gzf.Close()
+ rf = gzf
}
- defer gzf.Close()
- scanner := bufio.NewScanner(gzf)
+ scanner := bufio.NewScanner(rf)
for scanner.Scan() {
line := scanner.Bytes()
msg := &Msg{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, LGTM
I ran --- a/apps/nsq_to_file/file_logger.go
+++ b/apps/nsq_to_file/file_logger.go
@@ -112,16 +112,11 @@ func (f *FileLogger) router() {
f.updateFile()
sync = true
}
- _, err := f.Write(m.Body)
+ _, err := f.Write(append(m.Body, byte('\n')))
if err != nil {
f.logf(lg.FATAL, "writing message to disk: %s", err)
os.Exit(1)
}
- _, err = f.Write([]byte("\n"))
- if err != nil {
- f.logf(lg.FATAL, "writing newline to disk: %s", err)
- os.Exit(1)
- }
output[pos] = m
pos++
if pos == cap(output) { without that tweak, the newlines were mixed up:
|
Interesting, those separate writes stood out to me when I was refactoring, but I suspect that the Ideally we'd use something like vectorio ( Want to give it a try? |
I think the special The message size affects whether an extra copy or an extra syscall is slower ... It'll be at least a few days before I revisit this, so if someone else is interested, go ahead :) |
heh, of course |
instead of calling
needsRotation()
because it is theoretically possible that the time-format couldroll-over in the middle of the
updateFile()
rev-incr loop, soneedsRotation()
would always returntrue
, and the loop would never terminate.cc @mreiferson @jehiah
I thought of this while playing with @mccutchen's
nsq_to_file_jepsen.go
and tweaking it to work with non-gzip mode as well. That doesn't work because all the concurrentnsq_to_file
instances using the same filename format at the same time can open the same file as each other in append mode (if not using gzip), so only one successfully moves the file and the others fail due to source file missing, and exit with failure. (This seems like "working as designed" to me.)While figuring that out, I figured that calling
needsRotation()
insideupdateFile()
is an iffy proposition -f.filename
andf.openTime
could causeneedsRotation()
to return true, but that won't updatef.filename
orf.openTime
. It seemed like duplicating the filesize check was the simplest thing to do here. (And note thatupdateFile()
is always called immediately afterneedsRotation()
.)