Skip to content

Commit

Permalink
CR
Browse files Browse the repository at this point in the history
  • Loading branch information
budevg committed Sep 17, 2024
1 parent 8059129 commit 6f76645
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 150 deletions.
124 changes: 105 additions & 19 deletions cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,28 +445,27 @@ void TFileSystem::FSync(
TNodeId{fi ? ino : InvalidNodeId},
THandle{fi ? fi->fh : InvalidHandle});

std::function<void(const TFuture<NProto::TError>&)> callback =
[=, ptr = weak_from_this(), requestInfo = std::move(requestInfo)](
const auto& future) mutable
{
auto self = ptr.lock();
if (!self) {
return;
}
std::function<void(const TFuture<NProto::TError>&)>
callback = [=, ptr = weak_from_this(), requestInfo = std::move(requestInfo)]
(const auto& future) mutable {
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();
const auto& response = future.GetValue();

FinalizeProfileLogRequestInfo(
std::move(requestInfo),
Now(),
self->Config->GetFileSystemId(),
response,
self->ProfileLog);
FinalizeProfileLogRequestInfo(
std::move(requestInfo),
Now(),
self->Config->GetFileSystemId(),
response,
self->ProfileLog);

if (self->CheckError(*callContext, req, response)) {
self->ReplyError(*callContext, response, req, 0);
}
};
if (self->CheckError(*callContext, req, response)) {
self->ReplyError(*callContext, response, req, 0);
}
};

if (fi) {
callback = [ptr = weak_from_this(),
Expand Down Expand Up @@ -515,4 +514,91 @@ void TFileSystem::FSync(
}
}

void TFileSystem::FSyncDir(
TCallContextPtr callContext,
fuse_req_t req,
fuse_ino_t ino,
int datasync,
fuse_file_info* fi)
{
Y_ABORT_UNLESS(fi);

STORAGE_DEBUG("FSyncDir #" << ino << " @" << fi->fh);

if (!ValidateNodeId(*callContext, req, ino)) {
return;
}

if (!ValidateDirectoryHandle(*callContext, req, ino, fi->fh)) {
return;
}

const auto reqId = callContext->RequestId;

NProto::TProfileLogRequestInfo requestInfo;
InitProfileLogRequestInfo(
requestInfo,
EFileStoreFuseRequest::FsyncDir,
Now());
InitNodeInfo(
requestInfo,
datasync,
TNodeId{ino},
THandle{fi->fh});

auto callback = [=, ptr = weak_from_this(), requestInfo = std::move(requestInfo)]
(const auto& future) mutable {
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();

FinalizeProfileLogRequestInfo(
std::move(requestInfo),
Now(),
self->Config->GetFileSystemId(),
response,
self->ProfileLog);

if (self->CheckError(*callContext, req, response)) {
self->ReplyError(*callContext, response, req, 0);
}
};

auto waitCallback =
[ptr = weak_from_this(),
callContext,
ino,
datasync,
callback = std::move(callback)](const auto& future) mutable
{
auto self = ptr.lock();
if (!self) {
return;
}

if (HasError(future.GetValue())) {
callback(future);
return;
}

auto request = StartRequest<NProto::TFsyncDirRequest>(ino);
request->SetDataSync(datasync);
self->Session->FsyncDir(callContext, std::move(request))
.Apply([](const auto& future)
{ return future.GetValue().GetError(); })
.Subscribe(std::move(callback));
};

if (datasync) {
FSyncQueue.WaitForDataRequests(reqId).Subscribe(
std::move(waitCallback));
} else {
FSyncQueue.WaitForRequests(reqId).Subscribe(
std::move(waitCallback));
}
}

} // namespace NCloud::NFileStore::NFuse
105 changes: 0 additions & 105 deletions cloud/filestore/libs/vfs_fuse/fs_impl_list.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#include "fs_impl.h"

#include <cloud/filestore/libs/diagnostics/profile_log.h>
#include <cloud/filestore/libs/diagnostics/profile_log_events.h>
#include <cloud/filestore/libs/vfs/fsync_queue.h>

#include <util/generic/buffer.h>
#include <util/generic/map.h>
#include <util/random/random.h>
Expand All @@ -14,7 +10,6 @@
namespace NCloud::NFileStore::NFuse {

using namespace NCloud::NFileStore::NVFS;
using namespace NThreading;

namespace {

Expand All @@ -41,18 +36,6 @@ struct TDirectoryContent
}
};

void InitNodeInfo(
NProto::TProfileLogRequestInfo& profileLogRequest,
bool dataOnly,
TNodeId nodeId,
THandle handle)
{
auto* nodeInfo = profileLogRequest.MutableNodeInfo();
nodeInfo->SetMode(dataOnly);
nodeInfo->SetNodeId(ToUnderlying(nodeId));
nodeInfo->SetHandle(ToUnderlying(handle));
}

} // namespace

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -446,92 +429,4 @@ bool TFileSystem::ValidateDirectoryHandle(
return true;
}

void TFileSystem::FSyncDir(
TCallContextPtr callContext,
fuse_req_t req,
fuse_ino_t ino,
int datasync,
fuse_file_info* fi)
{
STORAGE_DEBUG("FSyncDir #" << ino << " @" << (fi ? fi->fh : -1llu));

if (!ValidateNodeId(*callContext, req, ino)) {
return;
}

const auto reqId = callContext->RequestId;

NProto::TProfileLogRequestInfo requestInfo;
InitProfileLogRequestInfo(
requestInfo,
EFileStoreFuseRequest::FsyncDir,
Now());
InitNodeInfo(
requestInfo,
datasync,
TNodeId{fi ? ino : InvalidNodeId},
THandle{fi ? fi->fh : InvalidHandle});

std::function<void(const TFuture<NProto::TError>&)> callback =
[=, ptr = weak_from_this(), requestInfo = std::move(requestInfo)](
const auto& future) mutable
{
auto self = ptr.lock();
if (!self) {
return;
}

const auto& response = future.GetValue();

FinalizeProfileLogRequestInfo(
std::move(requestInfo),
Now(),
self->Config->GetFileSystemId(),
response,
self->ProfileLog);

if (self->CheckError(*callContext, req, response)) {
self->ReplyError(*callContext, response, req, 0);
}
};

if (fi) {
if (!ValidateDirectoryHandle(*callContext, req, ino, fi->fh)) {
return;
}

callback = [ptr = weak_from_this(),
callContext,
ino,
datasync,
callback = std::move(callback)](const auto& future) mutable
{
auto self = ptr.lock();
if (!self) {
return;
}

if (HasError(future.GetValue())) {
callback(future);
return;
}

auto request = StartRequest<NProto::TFsyncDirRequest>(ino);
request->SetDataSync(datasync);
self->Session->FsyncDir(callContext, std::move(request))
.Apply([](const auto& future)
{ return future.GetValue().GetError(); })
.Subscribe(std::move(callback));
};
}

if (datasync) {
FSyncQueue.WaitForDataRequests(reqId).Subscribe(
std::move(callback));
} else {
FSyncQueue.WaitForRequests(reqId).Subscribe(
std::move(callback));
}
}

} // namespace NCloud::NFileStore::NFuse
68 changes: 42 additions & 26 deletions cloud/filestore/libs/vfs_fuse/fs_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,18 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
return MakeFuture(result);
};

int fsyncCalled[2] = {0, 0};
int fsyncCalledWithDataSync = 0;
int fsyncCalledWithoutDataSync = 0;
bootstrap.Service->FsyncHandler = [&] (auto callContext, auto request) {
Y_UNUSED(request);
UNIT_ASSERT_VALUES_EQUAL(FileSystemId, callContext->FileSystemId);

fsyncCalled[request->GetDataSync() ? 1 : 0]++;
NProto::TFsyncResponse result;
return MakeFuture(result);
if (request->GetDataSync()) {
fsyncCalledWithDataSync++;
} else {
fsyncCalledWithoutDataSync++;
}

return MakeFuture(NProto::TFsyncResponse());
};

bootstrap.Start();
Expand All @@ -286,14 +290,17 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
nodeId, handleId, 0, CreateBuffer(4096, 'a'));
UNIT_ASSERT_NO_EXCEPTION(write.GetValue(WaitTimeout));

for (auto datasync: {true, false}) {
auto fsync = bootstrap.Fuse->SendRequest<TFsyncRequest>(
nodeId, handleId, datasync);
UNIT_ASSERT_NO_EXCEPTION(fsync.GetValue(WaitTimeout));
}

UNIT_ASSERT_VALUES_EQUAL(1, fsyncCalled[0]);
UNIT_ASSERT_VALUES_EQUAL(1, fsyncCalled[1]);
auto fsync = bootstrap.Fuse->SendRequest<TFsyncRequest>(
nodeId, handleId, false /* no data sync */);
UNIT_ASSERT_NO_EXCEPTION(fsync.GetValue(WaitTimeout));
UNIT_ASSERT(
fsyncCalledWithoutDataSync == 1 && fsyncCalledWithDataSync == 0);

fsync = bootstrap.Fuse->SendRequest<TFsyncRequest>(
nodeId, handleId, true /* data sync */);
UNIT_ASSERT_NO_EXCEPTION(fsync.GetValue(WaitTimeout));
UNIT_ASSERT(
fsyncCalledWithoutDataSync == 1 && fsyncCalledWithDataSync == 1);
}

Y_UNIT_TEST(ShouldPassSessionId)
Expand Down Expand Up @@ -519,14 +526,18 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
return MakeFuture(result);
};

int fsyncDirCalled[2] = {0, 0};
int fsyncDirCalledWithDataSync = 0;
int fsyncDirCalledWithoutDataSync = 0;
bootstrap.Service->FsyncDirHandler = [&] (auto callContext, auto request) {
Y_UNUSED(request);
UNIT_ASSERT_VALUES_EQUAL(FileSystemId, callContext->FileSystemId);

fsyncDirCalled[request->GetDataSync() ? 1 : 0]++;
NProto::TFsyncDirResponse result;
return MakeFuture(result);
if (request->GetDataSync()) {
fsyncDirCalledWithDataSync++;
} else {
fsyncDirCalledWithoutDataSync++;
}

return MakeFuture(NProto::TFsyncDirResponse());
};

bootstrap.Start();
Expand All @@ -548,14 +559,19 @@ Y_UNIT_TEST_SUITE(TFileSystemTest)
size = read.GetValue();
UNIT_ASSERT_VALUES_EQUAL(size, 0);

for (auto datasync: {true, false}) {
auto fsyncdir = bootstrap.Fuse->SendRequest<TFsyncDirRequest>(
nodeId, handleId, datasync);
UNIT_ASSERT_NO_EXCEPTION(fsyncdir.GetValue(WaitTimeout));
}

UNIT_ASSERT_VALUES_EQUAL(1, fsyncDirCalled[0]);
UNIT_ASSERT_VALUES_EQUAL(1, fsyncDirCalled[1]);
auto fsyncdir = bootstrap.Fuse->SendRequest<TFsyncDirRequest>(
nodeId, handleId, false /* no data sync */);
UNIT_ASSERT_NO_EXCEPTION(fsyncdir.GetValue(WaitTimeout));
UNIT_ASSERT(
fsyncDirCalledWithoutDataSync == 1 &&
fsyncDirCalledWithDataSync == 0);

fsyncdir = bootstrap.Fuse->SendRequest<TFsyncDirRequest>(
nodeId, handleId, true /* data sync */);
UNIT_ASSERT_NO_EXCEPTION(fsyncdir.GetValue(WaitTimeout));
UNIT_ASSERT(
fsyncDirCalledWithoutDataSync == 1 &&
fsyncDirCalledWithDataSync == 1);

auto close = bootstrap.Fuse->SendRequest<TReleaseDirRequest>(nodeId, handleId);
UNIT_ASSERT_NO_EXCEPTION(close.GetValue(WaitTimeout));
Expand Down

0 comments on commit 6f76645

Please sign in to comment.