Skip to content

Commit

Permalink
fix(logs): synchronise log file rotation and compression.
Browse files Browse the repository at this point in the history
  • Loading branch information
matejk committed Jan 29, 2024
1 parent d352594 commit 72496f9
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 36 deletions.
15 changes: 15 additions & 0 deletions Foundation/include/Poco/ArchiveStrategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "Poco/File.h"
#include "Poco/DateTimeFormatter.h"
#include "Poco/NumberFormatter.h"
#include "Poco/Mutex.h"
#include "Poco/Condition.h"
#include <atomic>


Expand Down Expand Up @@ -51,17 +53,30 @@ class Foundation_API ArchiveStrategy
/// and creates and returns a new log file.
/// The given LogFile object is deleted.

void close();

void compress(bool flag = true);
/// Enables or disables compression of archived files.

protected:
void moveFile(const std::string& oldName, const std::string& newName);
bool exists(const std::string& name);

Poco::FastMutex _rotateMutex;

// Log rotation must wait until all of the compression tasks complete
int _compressingCount;
Poco::Condition _compressingComplete;

private:

friend class ArchiveCompressor;

ArchiveStrategy(const ArchiveStrategy&);
ArchiveStrategy& operator = (const ArchiveStrategy&);

void compressFile(const std::string& path);

std::atomic<bool> _compress;
std::atomic<ArchiveCompressor*> _pCompressor;
};
Expand Down
139 changes: 108 additions & 31 deletions Foundation/src/ArchiveStrategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "Poco/Void.h"
#include "Poco/FileStream.h"

#include <string_view>

namespace Poco {

Expand All @@ -45,35 +46,18 @@ class ArchiveCompressor: public ActiveDispatcher
{
}

ActiveMethod<void, std::string, ArchiveCompressor, ActiveStarter<ActiveDispatcher>> compress;
struct ArchiveToCompress
{
ArchiveStrategy* as;
std::string path;
};

ActiveMethod<void, ArchiveToCompress, ArchiveCompressor, ActiveStarter<ActiveDispatcher>> compress;

protected:
void compressImpl(const std::string& path)
void compressImpl(const ArchiveToCompress& ac)
{
std::string gzPath(path);
gzPath.append(".gz");
FileInputStream istr(path);
FileOutputStream ostr(gzPath);
try
{
DeflatingOutputStream deflater(ostr, DeflatingStreamBuf::STREAM_GZIP);
StreamCopier::copyStream(istr, deflater);
if (!deflater.good() || !ostr.good()) throw WriteFileException(gzPath);
deflater.close();
ostr.close();
istr.close();
}
catch (Poco::Exception&)
{
// deflating failed - remove gz file and leave uncompressed log file
ostr.close();
Poco::File gzf(gzPath);
gzf.remove();
return;
}
File f(path);
f.remove();
return;
ac.as->compressFile(ac.path);
}
};

Expand All @@ -82,17 +66,41 @@ class ArchiveCompressor: public ActiveDispatcher
// ArchiveStrategy
//

// Prefix that is added to the file being compressed to be skipped by the
// purge strategy.
static const std::string compressFilePrefix ( ".~" );


ArchiveStrategy::ArchiveStrategy():
_compressingCount(0),
_compress(false),
_pCompressor(0)
_pCompressor(nullptr)
{
}


ArchiveStrategy::~ArchiveStrategy()
{
try
{
close();
}
catch(...)
{
poco_unexpected();
}
}


void ArchiveStrategy::close()
{
FastMutex::ScopedLock l(_rotateMutex);

while (_compressingCount > 0)
_compressingComplete.wait(_rotateMutex, 1000);

delete _pCompressor;
_pCompressor = nullptr;
}


Expand All @@ -105,7 +113,7 @@ void ArchiveStrategy::compress(bool flag)
void ArchiveStrategy::moveFile(const std::string& oldPath, const std::string& newPath)
{
bool compressed = false;
Path p(oldPath);
const Path p(oldPath);
File f(oldPath);
if (!f.exists())
{
Expand All @@ -115,15 +123,23 @@ void ArchiveStrategy::moveFile(const std::string& oldPath, const std::string& ne
std::string mvPath(newPath);
if (_compress || compressed)
mvPath.append(".gz");

if (!_compress || compressed)
{
f.renameTo(mvPath);
}
else
{
f.renameTo(newPath);
if (!_pCompressor) _pCompressor = new ArchiveCompressor;
_pCompressor.load()->compress(newPath);
_compressingCount++;
Path logdir { newPath };
logdir.makeParent();
const auto logfile { Path(newPath).getFileName() };
const auto compressPath = logdir.append(compressFilePrefix + logfile).toString();
f.renameTo(compressPath);
if (!_pCompressor)
_pCompressor = new ArchiveCompressor;

_pCompressor.load()->compress( {this, compressPath} );
}
}

Expand All @@ -146,6 +162,62 @@ bool ArchiveStrategy::exists(const std::string& name)
}


void ArchiveStrategy::compressFile(const std::string& path)
{
FastMutex::ScopedLock l(_rotateMutex);

Path logdir { path };
logdir.makeParent();

auto removeFilePrefix = [&logdir](const std::string& path, const std::string& prefix) -> std::string
{
auto fname { Path(path).getFileName() };
const std::string_view fprefix(fname.data(), prefix.size());
if (fprefix == prefix)
return Path(logdir, fname.substr(prefix.size())).toString();

return path;
};

File f(path);
std::string gzPath(path);
gzPath.append(".gz");
FileInputStream istr(path);
FileOutputStream ostr(gzPath);
try
{
DeflatingOutputStream deflater(ostr, DeflatingStreamBuf::STREAM_GZIP);
StreamCopier::copyStream(istr, deflater);
if (!deflater.good() || !ostr.good())
throw WriteFileException(gzPath);

deflater.close();
ostr.close();
istr.close();

// Remove temporary prefix and set modification time to
// the time of the uncompressed file for purge strategy to work correctly
File zf(gzPath);
zf.renameTo(removeFilePrefix(gzPath, compressFilePrefix));
zf.setLastModified(f.getLastModified());
}
catch (const Poco::Exception&)
{
// deflating failed - remove gz file and leave uncompressed log file
ostr.close();
Poco::File gzf(gzPath);
gzf.remove();

f.renameTo(removeFilePrefix(path, compressFilePrefix));
}
f.remove();

_compressingCount--;
if (_compressingCount < 1)
_compressingComplete.broadcast();
}


//
// ArchiveByNumberStrategy
//
Expand All @@ -169,6 +241,11 @@ LogFile* ArchiveByNumberStrategy::open(LogFile* pFile)

LogFile* ArchiveByNumberStrategy::archive(LogFile* pFile)
{
FastMutex::ScopedLock l(_rotateMutex);

while (_compressingCount > 0)
_compressingComplete.wait(_rotateMutex, 1000);

std::string basePath = pFile->path();
delete pFile;
int n = -1;
Expand Down
13 changes: 8 additions & 5 deletions Foundation/src/FileChannel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ FileChannel::FileChannel():
_compress(false),
_flush(true),
_rotateOnOpen(false),
_pFile(0),
_pFile(nullptr),
_pRotateStrategy(new NullRotateStrategy()),
_pArchiveStrategy(new ArchiveByNumberStrategy),
_pPurgeStrategy(new NullPurgeStrategy())
Expand All @@ -58,7 +58,7 @@ FileChannel::FileChannel(const std::string& path):
_compress(false),
_flush(true),
_rotateOnOpen(false),
_pFile(0),
_pFile(nullptr),
_pRotateStrategy(new NullRotateStrategy()),
_pArchiveStrategy(new ArchiveByNumberStrategy),
_pPurgeStrategy(new NullPurgeStrategy())
Expand Down Expand Up @@ -111,8 +111,11 @@ void FileChannel::close()
{
FastMutex::ScopedLock lock(_mutex);

if (_pFile != nullptr)
_pArchiveStrategy->close();

delete _pFile;
_pFile = 0;
_pFile = nullptr;
}


Expand Down Expand Up @@ -298,7 +301,7 @@ void FileChannel::setRotation(const std::string& rotation)

ArchiveStrategy* FileChannel::createArchiveStrategy(const std::string& archive, const std::string& times) const
{
ArchiveStrategy* pStrategy = 0;
ArchiveStrategy* pStrategy = nullptr;
if (archive == "number")
{
pStrategy = new ArchiveByNumberStrategy;
Expand Down Expand Up @@ -328,7 +331,7 @@ void FileChannel::setArchiveStrategy(ArchiveStrategy* strategy)

void FileChannel::setArchive(const std::string& archive)
{
ArchiveStrategy* pStrategy = 0;
ArchiveStrategy* pStrategy = nullptr;
if (archive == "number")
{
pStrategy = new ArchiveByNumberStrategy;
Expand Down
9 changes: 9 additions & 0 deletions Foundation/src/PurgeStrategy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "Poco/Path.h"
#include "Poco/DirectoryIterator.h"
#include "Poco/Timestamp.h"
#include <algorithm>


namespace Poco {
Expand Down Expand Up @@ -126,6 +127,14 @@ void PurgeByCountStrategy::purge(const std::string& path)
{
std::vector<File> files;
list(path, files);

// Order files in ascending name order. Files with largest
// sequence number will be deleted in case that multiple files
// have the same modification time.
std::sort (files.begin(), files.end(),
[](const Poco::File& a, const Poco::File& b) { return a.path() < b.path(); }
);

while (files.size() > _count)
{
std::vector<File>::iterator it = files.begin();
Expand Down
Loading

0 comments on commit 72496f9

Please sign in to comment.