Skip to content

Commit

Permalink
Periodically flush buffered file output (#445)
Browse files Browse the repository at this point in the history
* Document the new behavior of the BufferedIO option

* Add unit test to ensure bufered output is asynchronously flushed

* MSYS2 kills threads before all onexit functions complete so LogManager::shutdown is required on MSYS2
  • Loading branch information
swebb2066 authored Dec 22, 2024
1 parent cfbc89e commit 976955b
Show file tree
Hide file tree
Showing 12 changed files with 514 additions and 118 deletions.
29 changes: 23 additions & 6 deletions src/main/cpp/aprinitializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,28 @@ using namespace LOG4CXX_NS;

bool APRInitializer::isDestructed = false;

using IdentifiedObject = std::pair<size_t, ObjectPtr>;

struct APRInitializer::APRInitializerPrivate{
APRInitializerPrivate() :
p(0),
startTime(0),
tlsKey(0){

}
~APRInitializerPrivate()
{
// Delete in reverse order
while (!objects.empty())
objects.pop_back();
}

apr_pool_t* p;
std::mutex mutex;
std::list<FileWatchdog*> watchdogs;
log4cxx_time_t startTime;
apr_threadkey_t* tlsKey;
std::map<size_t, ObjectPtr> objects;
std::vector<IdentifiedObject> objects;
};

namespace
Expand Down Expand Up @@ -171,7 +179,13 @@ void APRInitializer::unregisterCleanup(FileWatchdog* watchdog)
void APRInitializer::addObject(size_t key, const ObjectPtr& pObject)
{
std::lock_guard<std::mutex> lock(m_priv->mutex);
m_priv->objects[key] = pObject;
auto pItem = std::find_if(m_priv->objects.begin(), m_priv->objects.end()
, [key](const IdentifiedObject& item) { return item.first == key; }
);
if (m_priv->objects.end() != pItem)
pItem->second = pObject;
else
m_priv->objects.emplace_back(key, pObject);
}

const ObjectPtr& APRInitializer::findOrAddObject(size_t key, std::function<ObjectPtr()> creator)
Expand All @@ -182,8 +196,11 @@ const ObjectPtr& APRInitializer::findOrAddObject(size_t key, std::function<Objec
// Ensure the internal logger has a longer life than other Log4cxx static data
LogLog::debug(LOG4CXX_STR("Started"));
}
auto pItem = m_priv->objects.find(key);
if (m_priv->objects.end() == pItem)
pItem = m_priv->objects.emplace(key, creator()).first;
return pItem->second;
auto pItem = std::find_if(m_priv->objects.begin(), m_priv->objects.end()
, [key](const IdentifiedObject& item) { return item.first == key; }
);
if (m_priv->objects.end() != pItem)
return pItem->second;
m_priv->objects.emplace_back(key, creator());
return m_priv->objects.back().second;
}
39 changes: 37 additions & 2 deletions src/main/cpp/fileappender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <log4cxx/helpers/outputstreamwriter.h>
#include <log4cxx/helpers/bufferedwriter.h>
#include <log4cxx/helpers/bytebuffer.h>
#include "log4cxx/helpers/threadutility.h"
#include <log4cxx/private/writerappender_priv.h>
#include <log4cxx/private/fileappender_priv.h>
#include <mutex>
Expand Down Expand Up @@ -81,6 +82,8 @@ FileAppender::FileAppender(std::unique_ptr<FileAppenderPriv> priv)
FileAppender::~FileAppender()
{
finalize();
if (auto p = _priv->taskManager.lock())
p->value().removePeriodicTask(getName());
}

void FileAppender::setAppend(bool fileAppend1)
Expand Down Expand Up @@ -140,6 +143,11 @@ void FileAppender::setOption(const LogString& option,
std::lock_guard<std::recursive_mutex> lock(_priv->mutex);
_priv->bufferSize = OptionConverter::toFileSize(value, 8 * 1024);
}
else if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BUFFEREDSECONDS"), LOG4CXX_STR("bufferedseconds")))
{
std::lock_guard<std::recursive_mutex> lock(_priv->mutex);
_priv->bufferedSeconds = OptionConverter::toInt(value, 0);
}
else
{
WriterAppender::setOption(option, value);
Expand Down Expand Up @@ -184,6 +192,23 @@ void FileAppender::activateOptionsInternal(Pool& p)
if (errors == 0)
{
WriterAppender::activateOptions(p);

if (!_priv->bufferedIO)
;
else if (0 < _priv->bufferedSeconds)
{
auto taskManager = ThreadUtility::instancePtr();
taskManager->value().addPeriodicTask(getName()
, std::bind(&WriterAppenderPriv::flush, _priv)
, std::chrono::seconds(_priv->bufferedSeconds)
);
_priv->taskManager = taskManager;
}
else if (0 == _priv->bufferedSeconds)
{
if (auto p = _priv->taskManager.lock())
p->value().removePeriodicTask(getName());
}
}
}

Expand Down Expand Up @@ -370,9 +395,19 @@ int FileAppender::getBufferSize() const
return _priv->bufferSize;
}

void FileAppender::setBufferSize(int bufferSize1)
int FileAppender::getBufferedSeconds() const
{
return _priv->bufferedSeconds;
}

void FileAppender::setBufferSize(int newValue)
{
_priv->bufferSize = newValue;
}

void FileAppender::setBufferedSeconds(int newValue)
{
_priv->bufferSize = bufferSize1;
_priv->bufferedSeconds = newValue;
}

bool FileAppender::getAppend() const
Expand Down
116 changes: 49 additions & 67 deletions src/main/cpp/filewatchdog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
#include <log4cxx/helpers/stringhelper.h>
#include <functional>
#include <chrono>

#if LOG4CXX_EVENTS_AT_EXIT
#include <log4cxx/private/atexitregistry.h>
#endif
#include <thread>
#include <condition_variable>

using namespace LOG4CXX_NS;
using namespace LOG4CXX_NS::helpers;
Expand All @@ -37,14 +35,10 @@ long FileWatchdog::DEFAULT_DELAY = 60000;
struct FileWatchdog::FileWatchdogPrivate{
FileWatchdogPrivate(const File& file1) :
file(file1), delay(DEFAULT_DELAY), lastModif(0),
warnedAlready(false), interrupted(0), thread()
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{stopWatcher();})
#endif
warnedAlready(false),
taskName{ LOG4CXX_STR("WatchDog_") + file1.getName() }
{ }

~FileWatchdogPrivate()
{ stopWatcher(); }

/**
The name of the file to observe for changes.
Expand All @@ -57,26 +51,15 @@ struct FileWatchdog::FileWatchdogPrivate{
long delay;
log4cxx_time_t lastModif;
bool warnedAlready;
volatile int interrupted;
#if LOG4CXX_ABI_VERSION <= 15
int interrupted{ 0 };
Pool pool;
std::thread thread;
std::condition_variable interrupt;
std::mutex interrupt_mutex;

#if LOG4CXX_EVENTS_AT_EXIT
helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif

void stopWatcher()
{
{
std::lock_guard<std::mutex> lock(interrupt_mutex);
interrupted = 0xFFFF;
}
interrupt.notify_all();
if (thread.joinable())
thread.join();
}
LogString taskName;
ThreadUtility::ManagerWeakPtr taskManager;
};

FileWatchdog::FileWatchdog(const File& file1)
Expand All @@ -86,20 +69,30 @@ FileWatchdog::FileWatchdog(const File& file1)

FileWatchdog::~FileWatchdog()
{
if (m_priv->thread.joinable())
stop();
stop();
}


bool FileWatchdog::is_active()
{
return m_priv->thread.joinable();
bool result = false;
if (auto p = m_priv->taskManager.lock())
result = p->value().hasPeriodicTask(m_priv->taskName);
return result;
}

void FileWatchdog::stop()
{
LogLog::debug(LOG4CXX_STR("Stopping file watchdog"));
m_priv->stopWatcher();
if (auto p = m_priv->taskManager.lock())
p->value().removePeriodicTask(m_priv->taskName);
}

/**
Stop all tasks that periodically checks for a file change.
*/
void FileWatchdog::stopAll()
{
ThreadUtility::instance()->removePeriodicTasksMatching(LOG4CXX_STR("WatchDog_"));
}

const File& FileWatchdog::file()
Expand Down Expand Up @@ -141,50 +134,39 @@ void FileWatchdog::checkAndConfigure()
}
}

void FileWatchdog::run()
{
if (LogLog::isDebugEnabled())
{
LogString msg(LOG4CXX_STR("Checking ["));
msg += m_priv->file.getPath();
msg += LOG4CXX_STR("] at ");
StringHelper::toString((int)m_priv->delay, m_priv->pool, msg);
msg += LOG4CXX_STR(" ms interval");
LogLog::debug(msg);
}

while (!is_interrupted())
{
std::unique_lock<std::mutex> lock( m_priv->interrupt_mutex );
if (!m_priv->interrupt.wait_for( lock, std::chrono::milliseconds( m_priv->delay ),
std::bind(&FileWatchdog::is_interrupted, this) ))
checkAndConfigure();
}

if (LogLog::isDebugEnabled())
{
LogString msg2(LOG4CXX_STR("Stop checking ["));
msg2 += m_priv->file.getPath();
msg2 += LOG4CXX_STR("]");
LogLog::debug(msg2);
}
}

void FileWatchdog::start()
{
auto taskManager = ThreadUtility::instancePtr();
checkAndConfigure();
if (!m_priv->thread.joinable())
if (!taskManager->value().hasPeriodicTask(m_priv->taskName))
{
m_priv->interrupted = 0;
m_priv->thread = ThreadUtility::instance()->createThread(LOG4CXX_STR("FileWatchdog"), &FileWatchdog::run, this);
if (LogLog::isDebugEnabled())
{
Pool p;
LogString msg(LOG4CXX_STR("Checking ["));
msg += m_priv->file.getPath();
msg += LOG4CXX_STR("] at ");
StringHelper::toString((int)m_priv->delay, p, msg);
msg += LOG4CXX_STR(" ms interval");
LogLog::debug(msg);
}
taskManager->value().addPeriodicTask(m_priv->taskName
, std::bind(&FileWatchdog::checkAndConfigure, this)
, std::chrono::milliseconds(m_priv->delay)
);
m_priv->taskManager = taskManager;
}
}

bool FileWatchdog::is_interrupted()
{
return m_priv->interrupted == 0xFFFF;
}

void FileWatchdog::setDelay(long delay1){
m_priv->delay = delay1;
auto p = m_priv->taskManager.lock();
if (p && p->value().hasPeriodicTask(m_priv->taskName))
{
p->value().removePeriodicTask(m_priv->taskName);
p->value().addPeriodicTask(m_priv->taskName
, std::bind(&FileWatchdog::checkAndConfigure, this)
, std::chrono::milliseconds(m_priv->delay)
);
}
}
2 changes: 2 additions & 0 deletions src/main/cpp/logmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <log4cxx/helpers/exception.h>
#include <log4cxx/helpers/optionconverter.h>
#include <log4cxx/helpers/loglog.h>
#include <log4cxx/helpers/threadutility.h>

#include <log4cxx/spi/loggingevent.h>
#include <log4cxx/file.h>
Expand Down Expand Up @@ -203,6 +204,7 @@ LoggerList LogManager::getCurrentLoggers()
void LogManager::shutdown()
{
APRInitializer::unregisterAll();
ThreadUtility::instance()->removeAllPeriodicTasks();
getLoggerRepository()->shutdown();
}

Expand Down
Loading

0 comments on commit 976955b

Please sign in to comment.