Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to read multiple active event files per directory #1867

Merged
merged 11 commits into from
Jul 29, 2019

Conversation

nfelt
Copy link
Contributor

@nfelt nfelt commented Feb 19, 2019

This adds an experimental option --reload_multifile=true that addresses #1063 by polling for new data from multiple active event files within a given run directory, rather than only polling the most recent event file. (See #1063 for why that behavior has been problematic.)

To avoid runaway resource usage, we don't unconditionally poll all event files, since in most cases many of them are no longer receiving new data. This PR instead introduces a notion of an "inactive age" (settable via --reload_multifile_inactive_secs) beyond which an eventfile with not event wall time more recent than that will be considered "inactive" and no longer polled. This means, for example, that users who accumulate many runs over a period of weeks in the same logdir will only poll new data from runs that have data written within a recent window. (In this respect, the new code can significantly improve resource usage against logdirs in which many runs are entirely static.) The default value of this flag is set to 4000 (somewhat over an hour) to accommodate hourly update frequencies while still representing a fairly aggressive cutoff. In theory we could compute a smarter threshold by taking into account the length of time taken by previous reloads as well as the --reload_interval value, but this seems reasonable for now.

Enabling the behavior may still result in a large increase in memory usage and disk reads / network requests for logdirs that have directories with many recently written files, since those will now all be polled. Possible further efficiency improvements include:

  1. Keeping a global upper bound on the number of open loaders (and corresponding memory buffers) to avoid runaway memory usage, and re-initializing loaders as needed (which requires some care to take underlying buffering into account and avoid re-buffering the same data needlessly), e.g. after a stat() call shows the file has grown
  2. Using backoff to poll "staler" files less often, e.g. only every N reload cycles, rather than having only a fixed cutoff of "active" vs "inactive"
  3. Modifying the event file writing logic to A) always open a new filename after some fixed age N, which would allow us to aggressively default the inactive age to just above "N", and/or B) add a sentinel event like SessionLog.STOP when closing an event file, so that when the file is truly done we can stop polling immediately
  4. Attempt to integrate with inotify/FSEvents-style change notifications (but this is very platform-specific, doesn't work for remote filesystems, and would require a much larger rewrite)

@nfelt nfelt requested a review from wchargin February 19, 2019 21:40
@nfelt nfelt changed the title Add optional logic to read multiple active event files per directory Add option to read multiple active event files per directory Feb 19, 2019
Copy link
Contributor

@stephanwlee stephanwlee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The time-based predicate and overall design looks okay to me. Few questions:

  1. when can the timestamp from TimestampedEventFileLoader be None?
  2. it looks like once an event file's time fails to write within the inactive_secs, it will never have a chance to be visited again. Kinda sucks for the case when an epoch for that took inactive_secs + 1s. Is there a way to discern if a file is current opened by another process?

@wchargin
Copy link
Contributor

High-level design in the PR description sounds good to me. I have only
skimmed the code.

3. Modifying the event file writing logic to A) always open a new
filename after some fixed age N

I like this idea. Emitting SessionLog.STOP has the problem of course
that a job may be killed ungracefully by any means, but capping the
output ourselves doesn’t have that problem. We’d need to be at least a
little careful about clock skew, etc., but that should be doable.

Apart from efficiency improvements, a stability improvement for this
solution could be to keep track of how long each event file “typically”
spends between updates. That is, empirically at runtime estimate the
“maximum interval between summary flushes to a single active file”
term. We could also directly measure the time that a reload cycle takes,
and of course we know the reload interval value, so plugging this into
your formula and pad it a bit. If we additionally switch from cutoff to
backoff, then we could consider eliminating the config option entirely.

4. Attempt to integrate with inotify/FSEvents-style change
notifications (but this is very platform-specific, doesn't work for
remote filesystems, and would require a much larger rewrite)

True, though I hear that Go’s abstracted over the platform-specificity
well enough for people to like fsnotify.

Copy link
Contributor

@wchargin wchargin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/me submits an official “review” to stop GitHub from nagging me. :-)

@nfelt
Copy link
Contributor Author

nfelt commented Jul 24, 2019

I've finally updated the PR with a different approach to the flags (rather than a single one that forces the user to set a threshold, I now have a flag to enable and a different flag to customize the threshold). I also added tests :) And I moved the new code into a new file directory_loader.py instead of intermingling it with directory_watcher.py, since it seemed cleaner to me to keep the tests separate and at that point it felt like the implementations might as well be separate too (and it's not like they actually share code).

PTAL :)

def testInactiveSecsNegative(self):
flags = FakeFlags('logdir', reload_multifile=True,
reload_multifile_inactive_secs=-1)
filter = application._get_eventfile_active_filter(flags)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we call this filter_ to avoid shadowing the builtin? Makes
the highlighting a bit confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to filter_fn if that's okay.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly.

@@ -106,11 +106,13 @@ def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider):
:type plugin_loaders: list[base_plugin.TBLoader]
:rtype: TensorBoardWSGI
"""
eventfile_active_filter = _get_eventfile_active_filter(flags)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Global nit: We’ve generally called them “event files” (two words)
throughout the codebase:

$ git grep -c 'events\?file'
tensorboard/plugins/profile/profile_demo.py:1
tensorboard/plugins/profile/profile_plugin_test.py:1
$ git grep 'events\?[ _]file' | wc -l
347

Shall we stick with that instead of adding “eventfile” as an alternate
term?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, changed to two words.

"""Returns a predicate for whether an eventfile load timestamp is active.

Returns:
A predicate function accepting a single UNIX timestamp float argument.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

…or None if multi-file loading is not enabled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

except tf.errors.OpError:
if not tf.io.gfile.exists(self._directory):
raise directory_watcher.DirectoryDeletedError(
'Directory %s has been permanently deleted' % self._directory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want else: raise here? Or are there OpErrors that we want to
ignore? (Which ones?)

If this is intended to be else: pass, can we briefly comment why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent was to avoid a behavior change here relative to the existing logic in DirectoryWatcher. I'm a little reluctant to change to else: raise without some understanding of what errors out in the wild are now going to kill the backend reload thread, but I've changed it to at least log at INFO so we could potentially collect that data - WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

self._loaders[path] = loader
logger.info('Loading data from path %s', path)
for timestamp, value in loader.Load():
if max_timestamp is None or (timestamp is not None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn’t loader.Load() always supposed to yield (float, value) pairs?
In what case would timestamp be None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, removed the guard. Somehow I was thinking if omitted in the proto it could be None, but it's 0.0.

# d: empty file (should be considered active in absence of timestamps).
self._WriteToFile('a', ['A1', 'A2'], [1, 2])
self._WriteToFile('b', ['B1'], [1])
self._WriteToFile('c', ['C2', 'C1', 'C0'], [2, 1, 0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something that we see in practice? My understanding was that the
timestamps are added by the C++ writer itself (and users have no control
over them). I suppose third-party writers can do whatever they want, and
I appreciate the test case anyway; mostly just curious.

(I’m deliberately ignoring non-monotonic clocks around DST transitions…)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regrettably, users can indeed control wall_time, because the TF 1.x FileWriter API supports writing arbitrary Events:

python -c 'import glob; import tensorflow as tf; w = tf.summary.FileWriter("/tmp/foo"); w.add_event(tf.Event(wall_time=42.0)); w.flush(); print(list(tf.train.summary_iterator(glob.glob("/tmp/foo/event*")[0])))'
[wall_time: 1564101194.0
file_version: "brain.Event:2"
, wall_time: 42.0
]

FWIW, I deliberately kept that ability out of the TF 2.0 API: tensorflow/tensorflow@1ea3483

I'm optimistic that not many people actually make use of this ability (and obviously if users are doing anything exotic with wall_time then the resource limiting approach here may backfire), but I thought it was at least worth confirming that we do the right thing in a case where e.g. the user takes an eventfile and re-sorts the events, then writes it back out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point. Agreed, then.

all_paths = io_wrapper.ListDirectoryAbsolute(self._directory)
paths = sorted(p for p in all_paths if self._path_filter(p))
for path in paths:
for value in self._LoadPath(path):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bit of a race condition here: if listdir returns an event file that is
subsequently deleted before we invoke _LoadPath on it, the exception
will break us out of the whole Load loop.

The race window may be arbitrarily large, because it spans a coroutine
boundary.

  def testFileDeletedBeforeLoaderOpened(self):
    self._WriteToFile('a', 'a')
    self._WriteToFile('b', 'b')
    load_generator = self._loader.Load()
    self.assertEqual('a', next(load_generator))
    os.unlink(os.path.join(self._directory, 'b'))
    self.assertEqual(list(load_generator), [])  # fails: IOError(ENOENT)

Is this the desired behavior? (There might be a case for that.) I could
also see silently skipping the file, or maybe opening all loaders
eagerly—for loader in [self._LoadPath(path) for path in paths]:—though
maybe we don’t like the performance characteristics of that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Note that this race across the coroutine boundary existed in DirectoryWatcher too; I think the exception was being swallowed by the except clause below. I went with skipping the file, since opening all loaders eagerly is worse for performance (right now, we close inactive ones when we're done, so when reading an old logdir we only ever have one loader open at a time, which is a nice property to retain) and there's still the non-coroutine race in that case anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM; skipping made the most sense to me, too. Thanks!

# Create two separate event files, using filename suffix to ensure a
# deterministic sort order, and then simulate a write to file A, then
# to file B, then another write to file A (with reloads after each).
with test_util.FileWriter(run_path, filename_suffix='.a') as writer_a:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is failing in the TF 2.0 test environment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by updating the test file writer to avoid the guard against use under eager mode.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great!

Copy link
Contributor Author

@nfelt nfelt left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review - PTAL

"""Returns a predicate for whether an eventfile load timestamp is active.

Returns:
A predicate function accepting a single UNIX timestamp float argument.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -106,11 +106,13 @@ def standard_tensorboard_wsgi(flags, plugin_loaders, assets_zip_provider):
:type plugin_loaders: list[base_plugin.TBLoader]
:rtype: TensorBoardWSGI
"""
eventfile_active_filter = _get_eventfile_active_filter(flags)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, changed to two words.

def testInactiveSecsNegative(self):
flags = FakeFlags('logdir', reload_multifile=True,
reload_multifile_inactive_secs=-1)
filter = application._get_eventfile_active_filter(flags)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to filter_fn if that's okay.

# d: empty file (should be considered active in absence of timestamps).
self._WriteToFile('a', ['A1', 'A2'], [1, 2])
self._WriteToFile('b', ['B1'], [1])
self._WriteToFile('c', ['C2', 'C1', 'C0'], [2, 1, 0])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regrettably, users can indeed control wall_time, because the TF 1.x FileWriter API supports writing arbitrary Events:

python -c 'import glob; import tensorflow as tf; w = tf.summary.FileWriter("/tmp/foo"); w.add_event(tf.Event(wall_time=42.0)); w.flush(); print(list(tf.train.summary_iterator(glob.glob("/tmp/foo/event*")[0])))'
[wall_time: 1564101194.0
file_version: "brain.Event:2"
, wall_time: 42.0
]

FWIW, I deliberately kept that ability out of the TF 2.0 API: tensorflow/tensorflow@1ea3483

I'm optimistic that not many people actually make use of this ability (and obviously if users are doing anything exotic with wall_time then the resource limiting approach here may backfire), but I thought it was at least worth confirming that we do the right thing in a case where e.g. the user takes an eventfile and re-sorts the events, then writes it back out.

self._loaders[path] = loader
logger.info('Loading data from path %s', path)
for timestamp, value in loader.Load():
if max_timestamp is None or (timestamp is not None
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, removed the guard. Somehow I was thinking if omitted in the proto it could be None, but it's 0.0.

except tf.errors.OpError:
if not tf.io.gfile.exists(self._directory):
raise directory_watcher.DirectoryDeletedError(
'Directory %s has been permanently deleted' % self._directory)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent was to avoid a behavior change here relative to the existing logic in DirectoryWatcher. I'm a little reluctant to change to else: raise without some understanding of what errors out in the wild are now going to kill the backend reload thread, but I've changed it to at least log at INFO so we could potentially collect that data - WDYT?

# Create two separate event files, using filename suffix to ensure a
# deterministic sort order, and then simulate a write to file A, then
# to file B, then another write to file A (with reloads after each).
with test_util.FileWriter(run_path, filename_suffix='.a') as writer_a:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by updating the test file writer to avoid the guard against use under eager mode.

all_paths = io_wrapper.ListDirectoryAbsolute(self._directory)
paths = sorted(p for p in all_paths if self._path_filter(p))
for path in paths:
for value in self._LoadPath(path):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Note that this race across the coroutine boundary existed in DirectoryWatcher too; I think the exception was being swallowed by the except clause below. I went with skipping the file, since opening all loaders eagerly is worse for performance (right now, we close inactive ones when we're done, so when reading an old logdir we only ever have one loader open at a time, which is a nice property to retain) and there's still the non-coroutine race in that case anyway.

Copy link
Contributor

@wchargin wchargin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two high-level questions:

  • If I understand correctly, the new DirectoryLoader may mark a file
    as inactive even if it’s the only file in the directory. Consider,
    for instance, a long-running job that writes summaries every two
    hours; previous versions of TensorBoard would continue updating
    forever, while under the new semantics we’ll only see it update
    once.

    Is this correct? It seems at least a little unfortunate. Did we
    consider not marking a file inactive when it’s the only active file?

    Test case, in case the above description is unclear
      def testFilter_onlyOneLoader(self):
        """A sole loader can be marked as inactive."""
        loader_registry = []
        loader_factory = functools.partial(
            _TimestampedByteLoader, registry=loader_registry)
        threshold = 0
        active_filter = lambda timestamp: timestamp >= threshold
        self._loader = directory_loader.DirectoryLoader(
            self._directory, loader_factory, active_filter=active_filter)
        def assertLoadersForPaths(paths):
          paths = [os.path.join(self._directory, path) for path in paths]
          self.assertEqual(loader_registry, paths)
        #
        self._WriteToFile('a', ['A1', 'A2'], [1, 2])
        threshold = 2
        # First load pass should leave file C marked inactive.
        self.assertLoaderYields(['A1', 'A2'])
        assertLoadersForPaths(['a'])
        threshold = 3
        # Second load pass should mark file A as inactive (due to newly
        # increased threshold) and thus skip reading data from it.
        self.assertLoaderYields([])
        assertLoadersForPaths([])
        # Even when we get more data and it's the only file. :-(
        self._WriteToFile('a', ['A3', 'A4'], [3, 4])
        self.assertLoaderYields([])
        assertLoadersForPaths([])
  • The two names DirectoryWatcher and DirectoryLoader tripped me up
    a bit in the course of this review. They both watch, and they both
    load; the primary difference is in their multi-file support, yes?
    Can we find clearer names? :-)

def testInactiveSecsNegative(self):
flags = FakeFlags('logdir', reload_multifile=True,
reload_multifile_inactive_secs=-1)
filter = application._get_eventfile_active_filter(flags)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly.

except tf.errors.OpError:
if not tf.io.gfile.exists(self._directory):
raise directory_watcher.DirectoryDeletedError(
'Directory %s has been permanently deleted' % self._directory)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

# d: empty file (should be considered active in absence of timestamps).
self._WriteToFile('a', ['A1', 'A2'], [1, 2])
self._WriteToFile('b', ['B1'], [1])
self._WriteToFile('c', ['C2', 'C1', 'C0'], [2, 1, 0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point. Agreed, then.

next(generator) # Ignore the file_version event.
event = next(generator)
self.assertEqual('a', event.summary.value[0].tag)
os.remove(glob.glob(os.path.join(self._directory, '*.b'))[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably fine here, but I really wish glob.glob had a directory
kwarg so that we could be sure that no globs are expanded in the first
argument to os.path.join. :-( No action required.

# Create two separate event files, using filename suffix to ensure a
# deterministic sort order, and then simulate a write to file A, then
# to file B, then another write to file A (with reloads after each).
with test_util.FileWriter(run_path, filename_suffix='.a') as writer_a:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great!

all_paths = io_wrapper.ListDirectoryAbsolute(self._directory)
paths = sorted(p for p in all_paths if self._path_filter(p))
for path in paths:
for value in self._LoadPath(path):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM; skipping made the most sense to me, too. Thanks!

@nfelt
Copy link
Contributor Author

nfelt commented Jul 27, 2019

Yes, the new loading algorithm will stop polling for new data if no files are active (it will still poll for new runs, to be clear). Honestly, I see this as a feature - it means loading up an old experiment will only read each file once and then only monitor for new files. To continue polling the last file in the directory even if it's inactive would prevent us from getting that nice property, unless we set a separate inactive threshold on that, but I honestly think that would just be more confusing for the user.

I'm open to changing the default age threshold to be longer (maybe even 24 hours) but I guess I was thinking of starting off more aggressive and then walking it back if it's an issue, to be more ambitious about curtailing resource usage, since I think it's overall easier to lengthen the threshold than shorten it (the result being higher resource usage but not unexpectedly absent data).

Re: Watcher and Loader, I originally named it MultiFileDirectoryWatcher but it's a mouthful and I wasn't crazy about that being the long-term name (assuming we migrate to this implementation only at some point). Also, since the EventAccumulator logic actually can use these in the place of an EventFileLoader, they essentially implement a "Loader" contract, so the "Loader" suffix seemed better (and people in the past complained that DirectoryWatcher suggested equivalence with an internal Watcher concept that doesn't have the same semantics). In an admittedly fuzzy sense, the old logic is more like just "watching" the latest file rather than truly loading the whole directory, whereas the new logic really does load the entire directory.

That said, I am sympathetic to the similarity in the names being somewhat confusing. Do you have a suggestion in mind?

Copy link
Contributor

@wchargin wchargin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I see this as a feature […]

Yep, this is pretty reasonable. As long as we message the change
clearly, it makes sense to me.

I'm open to changing the default age threshold to be longer

I’ve got no problem with the proposed default (the “two hours” in my
example was just ”whatever the default is + ε”). Agreed that it’s easier
to revise upward.

Do you have a suggestion in mind?

Nope! :shipit:

@nfelt
Copy link
Contributor Author

nfelt commented Jul 29, 2019

PTAL, I've updated the FAQ blurbs to be more clear that the option changes the polling criteria, rather than just expanding them (which the previous blurbs implied).

@nfelt nfelt merged commit d8d6703 into tensorflow:master Jul 29, 2019
@nfelt nfelt deleted the multifile branch July 29, 2019 21:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants