From 6d433558a6c3dd5d09346360f7c6c41d4d468f37 Mon Sep 17 00:00:00 2001 From: Wu Tao Date: Tue, 16 Oct 2018 18:27:11 +0800 Subject: [PATCH] *: expose disk_file that was hidden under dsn_handle_t (#172) --- include/dsn/c/api_layer1.h | 57 --------- include/dsn/tool-api/aio_provider.h | 2 + include/dsn/tool-api/async_calls.h | 65 +--------- include/dsn/tool-api/file_io.h | 91 ++++++++++++++ src/core/core/disk_engine.cpp | 6 +- src/core/core/disk_engine.h | 7 +- src/core/core/file_io.cpp | 113 ++++++++++++++++++ src/core/core/service_api_c.cpp | 69 ----------- src/core/tests/aio.cpp | 26 ++-- src/core/tests/service_api_c.cpp | 55 +++++---- .../common/native_aio_provider.linux.cpp | 2 +- .../tools/common/native_aio_provider.linux.h | 1 + src/dist/nfs/nfs_client_impl.cpp | 2 +- src/dist/nfs/nfs_client_impl.h | 6 +- src/dist/nfs/nfs_server_impl.cpp | 13 +- src/dist/nfs/nfs_server_impl.h | 12 +- src/dist/replication/lib/mutation_log.cpp | 18 +-- src/dist/replication/lib/mutation_log.h | 4 +- .../meta_server/meta_state_service_simple.cpp | 4 +- .../meta_server/meta_state_service_simple.h | 2 +- 20 files changed, 299 insertions(+), 256 deletions(-) create mode 100644 include/dsn/tool-api/file_io.h create mode 100644 src/core/core/file_io.cpp diff --git a/include/dsn/c/api_layer1.h b/include/dsn/c/api_layer1.h index 3a102d1975..cdb6ea0d2a 100644 --- a/include/dsn/c/api_layer1.h +++ b/include/dsn/c/api_layer1.h @@ -276,63 +276,6 @@ typedef struct int size; } dsn_file_buffer_t; -/*! - open file - - \param file_name filename of the file. - \param flag flags such as O_RDONLY | O_BINARY used by ::open - \param pmode permission mode used by ::open - - \return file handle - */ -extern DSN_API dsn_handle_t dsn_file_open(const char *file_name, int flag, int pmode); - -/*! close the file handle */ -extern DSN_API dsn::error_code dsn_file_close(dsn_handle_t file); - -/*! flush the buffer of the given file */ -extern DSN_API dsn::error_code dsn_file_flush(dsn_handle_t file); - -/*! - read file asynchronously - - \param file file handle - \param buffer read buffer - \param count byte size of the read buffer - \param offset offset in the file to start reading - \param cb callback aio task to be executed on completion - */ -extern DSN_API void -dsn_file_read(dsn_handle_t file, char *buffer, int count, uint64_t offset, dsn::aio_task *cb); - -/*! - write file asynchronously - - \param file file handle - \param buffer write buffer - \param count byte size of the to-be-written content - \param offset offset in the file to start write - \param cb callback aio task to be executed on completion - */ -extern DSN_API void dsn_file_write( - dsn_handle_t file, const char *buffer, int count, uint64_t offset, dsn::aio_task *cb); - -/*! - write file asynchronously with vector buffers - - \param file file handle - \param buffers write buffers - \param buffer_count number of write buffers - \param offset offset in the file to start write - \param cb callback aio task to be executed on completion - */ -extern DSN_API void dsn_file_write_vector(dsn_handle_t file, - const dsn_file_buffer_t *buffers, - int buffer_count, - uint64_t offset, - dsn::aio_task *cb); -/*@}*/ - /*! @defgroup env Environment diff --git a/include/dsn/tool-api/aio_provider.h b/include/dsn/tool-api/aio_provider.h index 7863b01421..ec57ed8f1f 100644 --- a/include/dsn/tool-api/aio_provider.h +++ b/include/dsn/tool-api/aio_provider.h @@ -75,7 +75,9 @@ class aio_provider DSN_API service_node *node() const; // return DSN_INVALID_FILE_HANDLE if failed + // TODO(wutao1): return uint64_t instead (because we only support linux now) virtual dsn_handle_t open(const char *file_name, int flag, int pmode) = 0; + virtual error_code close(dsn_handle_t fh) = 0; virtual error_code flush(dsn_handle_t fh) = 0; virtual void aio(aio_task *aio) = 0; diff --git a/include/dsn/tool-api/async_calls.h b/include/dsn/tool-api/async_calls.h index 990fd1182c..afb7567870 100644 --- a/include/dsn/tool-api/async_calls.h +++ b/include/dsn/tool-api/async_calls.h @@ -28,7 +28,7 @@ #include #include -#include +#include #include #include @@ -101,7 +101,7 @@ inline task_ptr enqueue_timer(task_code evt, tsk->enqueue(); return tsk; } -} +} // namespace tasking namespace rpc { @@ -128,7 +128,7 @@ create_rpc_response_task(dsn::message_ex *req, req, tracker, [cb_fwd = std::move(callback)]( - error_code err, dsn::message_ex * req, dsn::message_ex * resp) mutable { + error_code err, dsn::message_ex *req, dsn::message_ex *resp) mutable { typename is_typed_rpc_callback::response_t response = {}; if (err == ERR_OK) { unmarshall(resp, response); @@ -223,60 +223,5 @@ call_wait(rpc_address server, thread_hash, partition_hash)); } -} - -namespace file { - -inline aio_task_ptr -create_aio_task(task_code code, task_tracker *tracker, aio_handler &&callback, int hash = 0) -{ - aio_task_ptr t(new aio_task(code, std::move(callback), hash)); - t->set_tracker((task_tracker *)tracker); - t->spec().on_task_create.execute(task::get_current_task(), t); - return t; -} - -inline aio_task_ptr read(dsn_handle_t fh, - char *buffer, - int count, - uint64_t offset, - task_code callback_code, - task_tracker *tracker, - aio_handler &&callback, - int hash = 0) -{ - auto tsk = create_aio_task(callback_code, tracker, std::move(callback), hash); - dsn_file_read(fh, buffer, count, offset, tsk); - return tsk; -} - -inline aio_task_ptr write(dsn_handle_t fh, - const char *buffer, - int count, - uint64_t offset, - task_code callback_code, - task_tracker *tracker, - aio_handler &&callback, - int hash = 0) -{ - auto tsk = create_aio_task(callback_code, tracker, std::move(callback), hash); - dsn_file_write(fh, buffer, count, offset, tsk); - return tsk; -} - -inline aio_task_ptr write_vector(dsn_handle_t fh, - const dsn_file_buffer_t *buffers, - int buffer_count, - uint64_t offset, - task_code callback_code, - task_tracker *tracker, - aio_handler &&callback, - int hash = 0) -{ - auto tsk = create_aio_task(callback_code, tracker, std::move(callback), hash); - dsn_file_write_vector(fh, buffers, buffer_count, offset, tsk.get()); - return tsk; -} -} - -} // end namespace +} // namespace rpc +} // namespace dsn diff --git a/include/dsn/tool-api/file_io.h b/include/dsn/tool-api/file_io.h new file mode 100644 index 0000000000..9e41adc10c --- /dev/null +++ b/include/dsn/tool-api/file_io.h @@ -0,0 +1,91 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#pragma once + +#include + +namespace dsn { + +// forward declaration +class disk_file; + +namespace file { + +/// open file +/// +/// \param file_name filename of the file. +/// \param flag flags such as O_RDONLY | O_BINARY used by ::open +/// \param pmode permission mode used by ::open +/// +/// \return file handle +/// +extern disk_file *open(const char *file_name, int flag, int pmode); + +/// close the file handle +extern error_code close(disk_file *file); + +/// flush the buffer of the given file +extern error_code flush(disk_file *file); + +inline aio_task_ptr +create_aio_task(task_code code, task_tracker *tracker, aio_handler &&callback, int hash = 0) +{ + aio_task_ptr t(new aio_task(code, std::move(callback), hash)); + t->set_tracker((task_tracker *)tracker); + t->spec().on_task_create.execute(task::get_current_task(), t); + return t; +} + +extern aio_task_ptr read(disk_file *file, + char *buffer, + int count, + uint64_t offset, + task_code callback_code, + task_tracker *tracker, + aio_handler &&callback, + int hash = 0); + +extern aio_task_ptr write(disk_file *file, + const char *buffer, + int count, + uint64_t offset, + task_code callback_code, + task_tracker *tracker, + aio_handler &&callback, + int hash = 0); + +extern aio_task_ptr write_vector(disk_file *file, + const dsn_file_buffer_t *buffers, + int buffer_count, + uint64_t offset, + task_code callback_code, + task_tracker *tracker, + aio_handler &&callback, + int hash = 0); + +} // namespace file +} // namespace dsn diff --git a/src/core/core/disk_engine.cpp b/src/core/core/disk_engine.cpp index 43e922e415..120cbe0d29 100644 --- a/src/core/core/disk_engine.cpp +++ b/src/core/core/disk_engine.cpp @@ -151,7 +151,7 @@ void disk_engine::start(aio_provider *provider) _is_running = true; } -dsn_handle_t disk_engine::open(const char *file_name, int flag, int pmode) +disk_file *disk_engine::open(const char *file_name, int flag, int pmode) { dsn_handle_t nh = _provider->open(file_name, flag, pmode); if (nh != DSN_INVALID_FILE_HANDLE) { @@ -161,7 +161,7 @@ dsn_handle_t disk_engine::open(const char *file_name, int flag, int pmode) } } -error_code disk_engine::close(dsn_handle_t fh) +error_code disk_engine::close(disk_file *fh) { if (nullptr != fh) { auto df = (disk_file *)fh; @@ -173,7 +173,7 @@ error_code disk_engine::close(dsn_handle_t fh) } } -error_code disk_engine::flush(dsn_handle_t fh) +error_code disk_engine::flush(disk_file *fh) { if (nullptr != fh) { auto df = (disk_file *)fh; diff --git a/src/core/core/disk_engine.h b/src/core/core/disk_engine.h index b21874233f..16af16071b 100644 --- a/src/core/core/disk_engine.h +++ b/src/core/core/disk_engine.h @@ -59,6 +59,7 @@ class disk_file aio_task *on_read_completed(aio_task *wk, error_code err, size_t size); aio_task *on_write_completed(aio_task *wk, void *ctx, error_code err, size_t size); + // TODO(wutao1): make it uint64_t dsn_handle_t native_handle() const { return _handle; } private: @@ -76,9 +77,9 @@ class disk_engine void start(aio_provider *provider); // asynchonous file read/write - dsn_handle_t open(const char *file_name, int flag, int pmode); - error_code close(dsn_handle_t fh); - error_code flush(dsn_handle_t fh); + disk_file *open(const char *file_name, int flag, int pmode); + error_code close(disk_file *fh); + error_code flush(disk_file *fh); void read(aio_task *aio); void write(aio_task *aio); diff --git a/src/core/core/file_io.cpp b/src/core/core/file_io.cpp new file mode 100644 index 0000000000..deb83e6b22 --- /dev/null +++ b/src/core/core/file_io.cpp @@ -0,0 +1,113 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2015 Microsoft Corporation + * + * -=- Robust Distributed System Nucleus (rDSN) -=- + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#include + +#include "disk_engine.h" + +namespace dsn { +namespace file { + +/*extern*/ disk_file *open(const char *file_name, int flag, int pmode) +{ + return task::get_current_disk()->open(file_name, flag, pmode); +} + +/*extern*/ error_code close(disk_file *file) { return task::get_current_disk()->close(file); } + +/*extern*/ error_code flush(disk_file *file) { return task::get_current_disk()->flush(file); } + +/*extern*/ aio_task_ptr read(disk_file *file, + char *buffer, + int count, + uint64_t offset, + task_code callback_code, + task_tracker *tracker, + aio_handler &&callback, + int hash /*= 0*/) +{ + auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash); + cb->aio()->buffer = buffer; + cb->aio()->buffer_size = count; + cb->aio()->engine = nullptr; + cb->aio()->file = file; + cb->aio()->file_offset = offset; + cb->aio()->type = AIO_Read; + + task::get_current_disk()->read(cb); + return cb; +} + +/*extern*/ aio_task_ptr write(disk_file *file, + const char *buffer, + int count, + uint64_t offset, + task_code callback_code, + task_tracker *tracker, + aio_handler &&callback, + int hash /*= 0*/) +{ + auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash); + cb->aio()->buffer = (char *)buffer; + cb->aio()->buffer_size = count; + cb->aio()->engine = nullptr; + cb->aio()->file = file; + cb->aio()->file_offset = offset; + cb->aio()->type = AIO_Write; + + task::get_current_disk()->write(cb); + return cb; +} + +/*extern*/ aio_task_ptr write_vector(disk_file *file, + const dsn_file_buffer_t *buffers, + int buffer_count, + uint64_t offset, + task_code callback_code, + task_tracker *tracker, + aio_handler &&callback, + int hash /*= 0*/) +{ + auto cb = create_aio_task(callback_code, tracker, std::move(callback), hash); + cb->aio()->buffer = nullptr; + cb->aio()->buffer_size = 0; + cb->aio()->engine = nullptr; + cb->aio()->file = file; + cb->aio()->file_offset = offset; + cb->aio()->type = AIO_Write; + for (int i = 0; i < buffer_count; i++) { + if (buffers[i].size > 0) { + cb->_unmerged_write_buffers.push_back(buffers[i]); + cb->aio()->buffer_size += buffers[i].size; + } + } + + task::get_current_disk()->write(cb); + return cb; +} + +} // namespace file +} // namespace dsn diff --git a/src/core/core/service_api_c.cpp b/src/core/core/service_api_c.cpp index 7525c5d9ab..25c9b63748 100644 --- a/src/core/core/service_api_c.cpp +++ b/src/core/core/service_api_c.cpp @@ -301,75 +301,6 @@ DSN_API void dsn_rpc_forward(dsn::message_ex *request, dsn::rpc_address addr) ::dsn::rpc_address(addr)); } -//------------------------------------------------------------------------------ -// -// file operations -// -//------------------------------------------------------------------------------ - -DSN_API dsn_handle_t dsn_file_open(const char *file_name, int flag, int pmode) -{ - return ::dsn::task::get_current_disk()->open(file_name, flag, pmode); -} - -DSN_API dsn::error_code dsn_file_close(dsn_handle_t file) -{ - return ::dsn::task::get_current_disk()->close(file); -} - -DSN_API dsn::error_code dsn_file_flush(dsn_handle_t file) -{ - return ::dsn::task::get_current_disk()->flush(file); -} - -DSN_API void -dsn_file_read(dsn_handle_t file, char *buffer, int count, uint64_t offset, dsn::aio_task *cb) -{ - cb->aio()->buffer = buffer; - cb->aio()->buffer_size = count; - cb->aio()->engine = nullptr; - cb->aio()->file = file; - cb->aio()->file_offset = offset; - cb->aio()->type = ::dsn::AIO_Read; - - ::dsn::task::get_current_disk()->read(cb); -} - -DSN_API void -dsn_file_write(dsn_handle_t file, const char *buffer, int count, uint64_t offset, dsn::aio_task *cb) -{ - cb->aio()->buffer = (char *)buffer; - cb->aio()->buffer_size = count; - cb->aio()->engine = nullptr; - cb->aio()->file = file; - cb->aio()->file_offset = offset; - cb->aio()->type = ::dsn::AIO_Write; - - ::dsn::task::get_current_disk()->write(cb); -} - -DSN_API void dsn_file_write_vector(dsn_handle_t file, - const dsn_file_buffer_t *buffers, - int buffer_count, - uint64_t offset, - dsn::aio_task *cb) -{ - cb->aio()->buffer = nullptr; - cb->aio()->buffer_size = 0; - cb->aio()->engine = nullptr; - cb->aio()->file = file; - cb->aio()->file_offset = offset; - cb->aio()->type = ::dsn::AIO_Write; - for (int i = 0; i < buffer_count; i++) { - if (buffers[i].size > 0) { - cb->_unmerged_write_buffers.push_back(buffers[i]); - cb->aio()->buffer_size += buffers[i].size; - } - } - - ::dsn::task::get_current_disk()->write(cb); -} - //------------------------------------------------------------------------------ // // env diff --git a/src/core/tests/aio.cpp b/src/core/tests/aio.cpp index c3a2aff35e..0d28d464ac 100644 --- a/src/core/tests/aio.cpp +++ b/src/core/tests/aio.cpp @@ -55,7 +55,7 @@ TEST(core, aio) int len = (int)strlen(buffer); // write - auto fp = dsn_file_open("tmp", O_RDWR | O_CREAT | O_BINARY, 0666); + auto fp = file::open("tmp", O_RDWR | O_CREAT | O_BINARY, 0666); std::list tasks; uint64_t offset = 0; @@ -101,12 +101,12 @@ TEST(core, aio) t->wait(); EXPECT_TRUE(t->get_transferred_size() == 10 * len); } - auto err = dsn_file_close(fp); + auto err = file::close(fp); EXPECT_TRUE(err == ERR_OK); // read char *buffer2 = (char *)alloca((size_t)len); - fp = dsn_file_open("tmp", O_RDONLY | O_BINARY, 0); + fp = file::open("tmp", O_RDONLY | O_BINARY, 0); // concurrent read offset = 0; @@ -135,7 +135,7 @@ TEST(core, aio) EXPECT_TRUE(memcmp(buffer, buffer2, len) == 0); } - err = dsn_file_close(fp); + err = file::close(fp); EXPECT_TRUE(err == ERR_OK); utils::filesystem::remove_path("tmp"); @@ -147,14 +147,14 @@ TEST(core, aio_share) if (task::get_current_disk() == nullptr) return; - auto fp = dsn_file_open("tmp", O_WRONLY | O_CREAT | O_BINARY, 0666); + auto fp = file::open("tmp", O_WRONLY | O_CREAT | O_BINARY, 0666); EXPECT_TRUE(fp != nullptr); - auto fp2 = dsn_file_open("tmp", O_RDONLY | O_BINARY, 0); + auto fp2 = file::open("tmp", O_RDONLY | O_BINARY, 0); EXPECT_TRUE(fp2 != nullptr); - dsn_file_close(fp); - dsn_file_close(fp2); + file::close(fp); + file::close(fp2); utils::filesystem::remove_path("tmp"); } @@ -165,7 +165,7 @@ TEST(core, operation_failed) if (task::get_current_disk() == nullptr) return; - auto fp = dsn_file_open("tmp_test_file", O_WRONLY, 0600); + auto fp = file::open("tmp_test_file", O_WRONLY, 0600); EXPECT_TRUE(fp == nullptr); ::dsn::error_code *err = new ::dsn::error_code; @@ -175,7 +175,7 @@ TEST(core, operation_failed) *count = n; }; - fp = dsn_file_open("tmp_test_file", O_WRONLY | O_CREAT | O_BINARY, 0666); + fp = file::open("tmp_test_file", O_WRONLY | O_CREAT | O_BINARY, 0666); EXPECT_TRUE(fp != nullptr); char buffer[512]; const char *str = "hello file"; @@ -187,7 +187,7 @@ TEST(core, operation_failed) t->wait(); EXPECT_TRUE(*err == ERR_FILE_OPERATION_FAILED); - auto fp2 = dsn_file_open("tmp_test_file", O_RDONLY | O_BINARY, 0); + auto fp2 = file::open("tmp_test_file", O_RDONLY | O_BINARY, 0); EXPECT_TRUE(fp2 != nullptr); t = ::dsn::file::read(fp2, buffer, 512, 0, LPC_AIO_TEST, nullptr, io_callback, 0); @@ -201,8 +201,8 @@ TEST(core, operation_failed) t = ::dsn::file::read(fp2, buffer, 512, 100, LPC_AIO_TEST, nullptr, io_callback, 0); t->wait(); ddebug("error code: %s", err->to_string()); - dsn_file_close(fp); - dsn_file_close(fp2); + file::close(fp); + file::close(fp2); EXPECT_TRUE(utils::filesystem::remove_path("tmp_test_file")); } diff --git a/src/core/tests/service_api_c.cpp b/src/core/tests/service_api_c.cpp index 8ea1a7202f..927e47ff0d 100644 --- a/src/core/tests/service_api_c.cpp +++ b/src/core/tests/service_api_c.cpp @@ -35,7 +35,7 @@ #include #include -#include +#include #include #include #include @@ -219,23 +219,32 @@ TEST(core, dsn_file) ASSERT_TRUE(utils::filesystem::file_size("command.txt", fin_size)); ASSERT_LT(0, fin_size); - dsn_handle_t fin = dsn_file_open("command.txt", O_RDONLY, 0); + dsn::disk_file *fin = file::open("command.txt", O_RDONLY, 0); ASSERT_NE(nullptr, fin); - dsn_handle_t fout = dsn_file_open("command.copy.txt", O_RDWR | O_CREAT | O_TRUNC, 0666); + dsn::disk_file *fout = file::open("command.copy.txt", O_RDWR | O_CREAT | O_TRUNC, 0666); ASSERT_NE(nullptr, fout); char buffer[1024]; uint64_t offset = 0; while (true) { aio_result rin; - dsn::aio_task_ptr tin = new dsn::aio_task(LPC_AIO_TEST_READ, - [&rin](dsn::error_code err, size_t sz) { - rin.err = err; - rin.sz = sz; - }, - 0); + aio_task_ptr tin = file::read(fin, + buffer, + 1024, + offset, + LPC_AIO_TEST_READ, + nullptr, + [&rin](dsn::error_code err, size_t sz) { + rin.err = err; + rin.sz = sz; + }, + 0); ASSERT_NE(nullptr, tin); - ASSERT_EQ(1, tin->get_count()); - dsn_file_read(fin, buffer, 1024, offset, tin); + + if (dsn::tools::get_current_tool()->name() != "simulator") { + // 1 for tin, 1 for disk_engine + ASSERT_EQ(2, tin->get_count()); + } + tin->wait(); ASSERT_EQ(rin.err, tin->error()); if (rin.err != ERR_OK) { @@ -250,14 +259,18 @@ TEST(core, dsn_file) } aio_result rout; - dsn::aio_task_ptr tout = new dsn::aio_task(LPC_AIO_TEST_WRITE, - [&rout](dsn::error_code err, size_t sz) { - rout.err = err; - rout.sz = sz; - }, - 0); + aio_task_ptr tout = file::write(fout, + buffer, + rin.sz, + offset, + LPC_AIO_TEST_WRITE, + nullptr, + [&rout](dsn::error_code err, size_t sz) { + rout.err = err; + rout.sz = sz; + }, + 0); ASSERT_NE(nullptr, tout); - dsn_file_write(fout, buffer, rin.sz, offset, tout); tout->wait(); ASSERT_EQ(ERR_OK, rout.err); ASSERT_EQ(ERR_OK, tout->error()); @@ -268,14 +281,14 @@ TEST(core, dsn_file) ASSERT_EQ(1, tout->get_count()); } - ASSERT_EQ(ERR_OK, dsn_file_flush(fout)); + ASSERT_EQ(ERR_OK, file::flush(fout)); offset += rin.sz; } ASSERT_EQ((uint64_t)fin_size, offset); - ASSERT_EQ(ERR_OK, dsn_file_close(fout)); - ASSERT_EQ(ERR_OK, dsn_file_close(fin)); + ASSERT_EQ(ERR_OK, file::close(fout)); + ASSERT_EQ(ERR_OK, file::close(fin)); ASSERT_TRUE(utils::filesystem::file_size("command.copy.txt", fout_size)); ASSERT_EQ(fin_size, fout_size); diff --git a/src/core/tools/common/native_aio_provider.linux.cpp b/src/core/tools/common/native_aio_provider.linux.cpp index 6f13ab718d..cabf98618a 100644 --- a/src/core/tools/common/native_aio_provider.linux.cpp +++ b/src/core/tools/common/native_aio_provider.linux.cpp @@ -60,7 +60,7 @@ native_linux_aio_provider::~native_linux_aio_provider() void native_linux_aio_provider::start() { - new std::thread([this]() { + _worker = std::thread([this]() { task::set_tls_dsn_context(node(), nullptr); get_event(); }); diff --git a/src/core/tools/common/native_aio_provider.linux.h b/src/core/tools/common/native_aio_provider.linux.h index f31aa7b08e..4189ff1db5 100644 --- a/src/core/tools/common/native_aio_provider.linux.h +++ b/src/core/tools/common/native_aio_provider.linux.h @@ -81,6 +81,7 @@ class native_linux_aio_provider : public aio_provider private: io_context_t _ctx; + std::thread _worker; }; } } diff --git a/src/dist/nfs/nfs_client_impl.cpp b/src/dist/nfs/nfs_client_impl.cpp index cdb6e2b70f..1a35847ee0 100644 --- a/src/dist/nfs/nfs_client_impl.cpp +++ b/src/dist/nfs/nfs_client_impl.cpp @@ -395,7 +395,7 @@ void nfs_client_impl::continue_write() zauto_lock l(fc->user_req->user_req_lock); if (!fc->file_holder->file_handle) { fc->file_holder->file_handle = - dsn_file_open(file_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + file::open(file_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); } } diff --git a/src/dist/nfs/nfs_client_impl.h b/src/dist/nfs/nfs_client_impl.h index d93eb8672a..ee8d53b3b1 100644 --- a/src/dist/nfs/nfs_client_impl.h +++ b/src/dist/nfs/nfs_client_impl.h @@ -121,14 +121,14 @@ class nfs_client_impl : public ::dsn::service::nfs_client struct file_wrapper : public ::dsn::ref_counter { - dsn_handle_t file_handle; + disk_file *file_handle; file_wrapper() { file_handle = nullptr; } ~file_wrapper() { if (file_handle != nullptr) { - auto err = dsn_file_close(file_handle); - dassert(err == ERR_OK, "dsn_file_close failed, err = %s", err.to_string()); + auto err = file::close(file_handle); + dassert(err == ERR_OK, "file::close failed, err = %s", err.to_string()); } } }; diff --git a/src/dist/nfs/nfs_server_impl.cpp b/src/dist/nfs/nfs_server_impl.cpp index 2f1817999c..7015c9b704 100644 --- a/src/dist/nfs/nfs_server_impl.cpp +++ b/src/dist/nfs/nfs_server_impl.cpp @@ -69,7 +69,7 @@ void nfs_service_impl::on_copy(const ::dsn::service::copy_request &request, std::string file_path = dsn::utils::filesystem::path_combine(request.source_dir, request.file_name); - dsn_handle_t hfile; + disk_file *hfile; { zauto_lock l(_handles_map_lock); @@ -77,15 +77,14 @@ void nfs_service_impl::on_copy(const ::dsn::service::copy_request &request, if (it == _handles_map.end()) // not found { - hfile = dsn_file_open(file_path.c_str(), O_RDONLY | O_BINARY, 0); + hfile = file::open(file_path.c_str(), O_RDONLY | O_BINARY, 0); if (hfile) { - file_handle_info_on_server *fh = new file_handle_info_on_server; + auto fh = std::make_shared(); fh->file_handle = hfile; fh->file_access_count = 1; fh->last_access_time = dsn_now_ms(); - _handles_map.insert( - std::pair(file_path, fh)); + _handles_map.insert(std::make_pair(file_path, std::move(fh))); } } else // found { @@ -236,10 +235,6 @@ void nfs_service_impl::close_file() // release out-of-date file handle dsn_now_ms() - fptr->last_access_time > (uint64_t)_opts.file_close_expire_time_ms) { dinfo("nfs: close file handle %s", it->first.c_str()); it = _handles_map.erase(it); - - ::dsn::error_code err = dsn_file_close(fptr->file_handle); - dassert(err == ERR_OK, "dsn_file_close failed, err = %s", err.to_string()); - delete fptr; } else it++; } diff --git a/src/dist/nfs/nfs_server_impl.h b/src/dist/nfs/nfs_server_impl.h index 884e9f4071..cd62f002d3 100644 --- a/src/dist/nfs/nfs_server_impl.h +++ b/src/dist/nfs/nfs_server_impl.h @@ -35,6 +35,8 @@ #pragma once #include #include + +#include "core/core/disk_engine.h" #include "nfs_server.h" #include "nfs_client_impl.h" @@ -86,7 +88,7 @@ class nfs_service_impl : public ::dsn::service::nfs_service, struct file_handle_info_on_server { - dsn_handle_t file_handle; + disk_file *file_handle; int32_t file_access_count; // concurrent r/w count uint64_t last_access_time; // last touch time @@ -94,6 +96,12 @@ class nfs_service_impl : public ::dsn::service::nfs_service, : file_handle(nullptr), file_access_count(0), last_access_time(0) { } + + ~file_handle_info_on_server() + { + error_code err = file::close(file_handle); + dassert(err == ERR_OK, "file::close failed, err = %s", err.to_string()); + } }; void internal_read_callback(error_code err, size_t sz, callback_para &cp); @@ -104,7 +112,7 @@ class nfs_service_impl : public ::dsn::service::nfs_service, nfs_opts &_opts; zlock _handles_map_lock; - std::unordered_map + std::unordered_map> _handles_map; // cache file handles ::dsn::task_ptr _file_close_timer; diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index 3c6e228da2..9c08f1b559 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -1745,7 +1745,7 @@ int mutation_log::garbage_collection(const replica_log_info_map &gc_condition, class log_file::file_streamer { public: - explicit file_streamer(dsn_handle_t fd, size_t file_offset) + explicit file_streamer(disk_file *fd, size_t file_offset) : _file_dispatched_bytes(file_offset), _file_handle(fd) { _current_buffer = _buffers + 0; @@ -1891,7 +1891,7 @@ class log_file::file_streamer // number of bytes we have issued read operations size_t _file_dispatched_bytes; - dsn_handle_t _file_handle; + disk_file *_file_handle; }; //------------------- log_file -------------------------- @@ -1940,7 +1940,7 @@ log_file::~log_file() { close(); } return nullptr; } - dsn_handle_t hfile = dsn_file_open(path, O_RDONLY | O_BINARY, 0); + disk_file *hfile = file::open(path, O_RDONLY | O_BINARY, 0); if (!hfile) { err = ERR_FILE_OPERATION_FAILED; dwarn("open log file %s failed", path); @@ -1996,7 +1996,7 @@ log_file::~log_file() { close(); } return nullptr; } - dsn_handle_t hfile = dsn_file_open(path, O_RDWR | O_CREAT | O_BINARY, 0666); + disk_file *hfile = file::open(path, O_RDWR | O_CREAT | O_BINARY, 0666); if (!hfile) { dwarn("create log %s failed", path); return nullptr; @@ -2006,7 +2006,7 @@ log_file::~log_file() { close(); } } log_file::log_file( - const char *path, dsn_handle_t handle, int index, int64_t start_offset, bool is_read) + const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read) { _start_offset = start_offset; _end_offset = start_offset; @@ -2033,8 +2033,8 @@ void log_file::close() // TODO: We need better abstraction to avoid those manual stuffs.. _stream.reset(nullptr); if (_handle) { - error_code err = dsn_file_close(_handle); - dassert(err == ERR_OK, "dsn_file_close failed, err = %s", err.to_string()); + error_code err = file::close(_handle); + dassert(err == ERR_OK, "file::close failed, err = %s", err.to_string()); _handle = nullptr; } @@ -2045,8 +2045,8 @@ void log_file::flush() const dassert(!_is_read, "log file must be of write mode"); if (_handle) { - error_code err = dsn_file_flush(_handle); - dassert(err == ERR_OK, "dsn_file_flush failed, err = %s", err.to_string()); + error_code err = file::flush(_handle); + dassert(err == ERR_OK, "file::flush failed, err = %s", err.to_string()); } } diff --git a/src/dist/replication/lib/mutation_log.h b/src/dist/replication/lib/mutation_log.h index bcc5db530f..e9651f7217 100644 --- a/src/dist/replication/lib/mutation_log.h +++ b/src/dist/replication/lib/mutation_log.h @@ -608,7 +608,7 @@ class log_file : public ref_counter private: // make private, user should create log_file through open_read() or open_write() - log_file(const char *path, dsn_handle_t handle, int index, int64_t start_offset, bool is_read); + log_file(const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read); private: uint32_t _crc32; @@ -617,7 +617,7 @@ class log_file : public ref_counter _end_offset; // end offset in the global space: end_offset = start_offset + file_size class file_streamer; std::unique_ptr _stream; - dsn_handle_t _handle; // file handle + disk_file *_handle; // file handle bool _is_read; // if opened for read or write std::string _path; // file path int _index; // file index diff --git a/src/dist/replication/meta_server/meta_state_service_simple.cpp b/src/dist/replication/meta_server/meta_state_service_simple.cpp index 729a7e80b8..c0930b90a8 100644 --- a/src/dist/replication/meta_server/meta_state_service_simple.cpp +++ b/src/dist/replication/meta_server/meta_state_service_simple.cpp @@ -288,7 +288,7 @@ error_code meta_state_service_simple::initialize(const std::vector } } - _log = dsn_file_open(log_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); + _log = file::open(log_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666); if (!_log) { derror("open file failed: %s", log_path.c_str()); return ERR_FILE_OPERATION_FAILED; @@ -504,7 +504,7 @@ task_ptr meta_state_service_simple::get_children(const std::string &node, meta_state_service_simple::~meta_state_service_simple() { _tracker.cancel_outstanding_tasks(); - dsn_file_close(_log); + file::close(_log); } } } diff --git a/src/dist/replication/meta_server/meta_state_service_simple.h b/src/dist/replication/meta_server/meta_state_service_simple.h index 1e920dc347..77e4c66f83 100644 --- a/src/dist/replication/meta_server/meta_state_service_simple.h +++ b/src/dist/replication/meta_server/meta_state_service_simple.h @@ -263,7 +263,7 @@ class meta_state_service_simple : public meta_state_service quick_map _quick_map; // zlock _log_lock; - dsn_handle_t _log; + disk_file *_log; uint64_t _offset; dsn::task_tracker _tracker;