Skip to content

Commit

Permalink
Periodically flush buffered file output
Browse files Browse the repository at this point in the history
  • Loading branch information
stephen-webb committed Dec 19, 2024
1 parent cfbc89e commit bed196f
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 82 deletions.
22 changes: 22 additions & 0 deletions src/main/cpp/fileappender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <log4cxx/helpers/outputstreamwriter.h>
#include <log4cxx/helpers/bufferedwriter.h>
#include <log4cxx/helpers/bytebuffer.h>
#include "log4cxx/helpers/threadutility.h"
#include <log4cxx/private/writerappender_priv.h>
#include <log4cxx/private/fileappender_priv.h>
#include <mutex>
Expand Down Expand Up @@ -81,6 +82,8 @@ FileAppender::FileAppender(std::unique_ptr<FileAppenderPriv> priv)
FileAppender::~FileAppender()
{
finalize();
if (_priv->bufferedIO && 0 < _priv->bufferedSeconds)
ThreadUtility::instance()->removePeriodicTask(getName());
}

void FileAppender::setAppend(bool fileAppend1)
Expand Down Expand Up @@ -140,6 +143,11 @@ void FileAppender::setOption(const LogString& option,
std::lock_guard<std::recursive_mutex> lock(_priv->mutex);
_priv->bufferSize = OptionConverter::toFileSize(value, 8 * 1024);
}
else if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BUFFEREDSECONDS"), LOG4CXX_STR("bufferedseconds")))
{
std::lock_guard<std::recursive_mutex> lock(_priv->mutex);
_priv->bufferedSeconds = OptionConverter::toInt(value, 0);
}
else
{
WriterAppender::setOption(option, value);
Expand Down Expand Up @@ -184,6 +192,20 @@ void FileAppender::activateOptionsInternal(Pool& p)
if (errors == 0)
{
WriterAppender::activateOptions(p);

if (!_priv->bufferedIO)
;
else if (0 < _priv->bufferedSeconds)
{
ThreadUtility::instance()->addPeriodicTask(getName()
, std::bind(&WriterAppenderPriv::flush, _priv)
, std::chrono::seconds(_priv->bufferedSeconds)
);
}
else if (0 == _priv->bufferedSeconds)
{
ThreadUtility::instance()->removePeriodicTask(getName());
}
}
}

Expand Down
103 changes: 36 additions & 67 deletions src/main/cpp/filewatchdog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,6 @@
#include <functional>
#include <chrono>

#if LOG4CXX_EVENTS_AT_EXIT
#include <log4cxx/private/atexitregistry.h>
#endif

using namespace LOG4CXX_NS;
using namespace LOG4CXX_NS::helpers;

Expand All @@ -37,14 +33,9 @@ long FileWatchdog::DEFAULT_DELAY = 60000;
struct FileWatchdog::FileWatchdogPrivate{
FileWatchdogPrivate(const File& file1) :
file(file1), delay(DEFAULT_DELAY), lastModif(0),
warnedAlready(false), interrupted(0), thread()
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopWatcher();})
#endif
warnedAlready(false)
{ }

~FileWatchdogPrivate()
{ stopWatcher(); }

/**
The name of the file to observe for changes.
Expand All @@ -57,26 +48,14 @@ struct FileWatchdog::FileWatchdogPrivate{
long delay;
log4cxx_time_t lastModif;
bool warnedAlready;
volatile int interrupted;
#if LOG4CXX_ABI_VERSION <= 15
int interrupted{ 0 };
Pool pool;
std::thread thread;
std::condition_variable interrupt;
std::mutex interrupt_mutex;

#if LOG4CXX_EVENTS_AT_EXIT
helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif

void stopWatcher()
{
{
std::lock_guard<std::mutex> lock(interrupt_mutex);
interrupted = 0xFFFF;
}
interrupt.notify_all();
if (thread.joinable())
thread.join();
}
LogString taskName{LOG4CXX_STR("FileWatchdog")};
};

FileWatchdog::FileWatchdog(const File& file1)
Expand All @@ -86,20 +65,22 @@ FileWatchdog::FileWatchdog(const File& file1)

FileWatchdog::~FileWatchdog()
{
if (m_priv->thread.joinable())
stop();
stop();
}


bool FileWatchdog::is_active()
{
return m_priv->thread.joinable();
return ThreadUtility::instance()->hasPeriodicTask(m_priv->taskName);
}

void FileWatchdog::stop()
{
LogLog::debug(LOG4CXX_STR("Stopping file watchdog"));
m_priv->stopWatcher();
if (is_active())
{
LogLog::debug(LOG4CXX_STR("Stopping file watchdog"));
ThreadUtility::instance()->removePeriodicTask(m_priv->taskName);
}
}

const File& FileWatchdog::file()
Expand Down Expand Up @@ -141,50 +122,38 @@ void FileWatchdog::checkAndConfigure()
}
}

void FileWatchdog::run()
{
if (LogLog::isDebugEnabled())
{
LogString msg(LOG4CXX_STR("Checking ["));
msg += m_priv->file.getPath();
msg += LOG4CXX_STR("] at ");
StringHelper::toString((int)m_priv->delay, m_priv->pool, msg);
msg += LOG4CXX_STR(" ms interval");
LogLog::debug(msg);
}

while (!is_interrupted())
{
std::unique_lock<std::mutex> lock( m_priv->interrupt_mutex );
if (!m_priv->interrupt.wait_for( lock, std::chrono::milliseconds( m_priv->delay ),
std::bind(&FileWatchdog::is_interrupted, this) ))
checkAndConfigure();
}

if (LogLog::isDebugEnabled())
{
LogString msg2(LOG4CXX_STR("Stop checking ["));
msg2 += m_priv->file.getPath();
msg2 += LOG4CXX_STR("]");
LogLog::debug(msg2);
}
}

void FileWatchdog::start()
{
checkAndConfigure();
if (!m_priv->thread.joinable())
auto p = ThreadUtility::instance();
if (!p->hasPeriodicTask(m_priv->taskName))
{
m_priv->interrupted = 0;
m_priv->thread = ThreadUtility::instance()->createThread(LOG4CXX_STR("FileWatchdog"), &FileWatchdog::run, this);
if (LogLog::isDebugEnabled())
{
Pool p;
LogString msg(LOG4CXX_STR("Checking ["));
msg += m_priv->file.getPath();
msg += LOG4CXX_STR("] at ");
StringHelper::toString((int)m_priv->delay, p, msg);
msg += LOG4CXX_STR(" ms interval");
LogLog::debug(msg);
}
p->addPeriodicTask(m_priv->taskName
, std::bind(&FileWatchdog::checkAndConfigure, this)
, std::chrono::milliseconds(m_priv->delay)
);
}
}

bool FileWatchdog::is_interrupted()
{
return m_priv->interrupted == 0xFFFF;
}

void FileWatchdog::setDelay(long delay1){
m_priv->delay = delay1;
auto p = ThreadUtility::instance();
if (p->hasPeriodicTask(m_priv->taskName))
{
p->removePeriodicTask(m_priv->taskName);
p->addPeriodicTask(m_priv->taskName
, std::bind(&FileWatchdog::checkAndConfigure, this)
, std::chrono::milliseconds(m_priv->delay)
);
}
}
Loading

0 comments on commit bed196f

Please sign in to comment.