Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

*: expose disk_file that was hidden under dsn_handle_t #172

Merged
merged 10 commits into from
Oct 16, 2018
57 changes: 0 additions & 57 deletions include/dsn/c/api_layer1.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,63 +276,6 @@ typedef struct
int size;
} dsn_file_buffer_t;

/*!
open file

\param file_name filename of the file.
\param flag flags such as O_RDONLY | O_BINARY used by ::open
\param pmode permission mode used by ::open

\return file handle
*/
extern DSN_API dsn_handle_t dsn_file_open(const char *file_name, int flag, int pmode);

/*! close the file handle */
extern DSN_API dsn::error_code dsn_file_close(dsn_handle_t file);

/*! flush the buffer of the given file */
extern DSN_API dsn::error_code dsn_file_flush(dsn_handle_t file);

/*!
read file asynchronously

\param file file handle
\param buffer read buffer
\param count byte size of the read buffer
\param offset offset in the file to start reading
\param cb callback aio task to be executed on completion
*/
extern DSN_API void
dsn_file_read(dsn_handle_t file, char *buffer, int count, uint64_t offset, dsn::aio_task *cb);

/*!
write file asynchronously

\param file file handle
\param buffer write buffer
\param count byte size of the to-be-written content
\param offset offset in the file to start write
\param cb callback aio task to be executed on completion
*/
extern DSN_API void dsn_file_write(
dsn_handle_t file, const char *buffer, int count, uint64_t offset, dsn::aio_task *cb);

/*!
write file asynchronously with vector buffers

\param file file handle
\param buffers write buffers
\param buffer_count number of write buffers
\param offset offset in the file to start write
\param cb callback aio task to be executed on completion
*/
extern DSN_API void dsn_file_write_vector(dsn_handle_t file,
const dsn_file_buffer_t *buffers,
int buffer_count,
uint64_t offset,
dsn::aio_task *cb);
/*@}*/

/*!
@defgroup env Environment

Expand Down
2 changes: 2 additions & 0 deletions include/dsn/tool-api/aio_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class aio_provider
DSN_API service_node *node() const;

// return DSN_INVALID_FILE_HANDLE if failed
// TODO(wutao1): return uint64_t instead (because we only support linux now)
virtual dsn_handle_t open(const char *file_name, int flag, int pmode) = 0;

virtual error_code close(dsn_handle_t fh) = 0;
virtual error_code flush(dsn_handle_t fh) = 0;
virtual void aio(aio_task *aio) = 0;
Expand Down
65 changes: 5 additions & 60 deletions include/dsn/tool-api/async_calls.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

#include <dsn/service_api_c.h>
#include <dsn/utility/function_traits.h>
#include <dsn/tool-api/task.h>
#include <dsn/tool-api/file_io.h>
#include <dsn/tool-api/task_tracker.h>
#include <dsn/cpp/serialization.h>

Expand Down Expand Up @@ -101,7 +101,7 @@ inline task_ptr enqueue_timer(task_code evt,
tsk->enqueue();
return tsk;
}
}
} // namespace tasking

namespace rpc {

Expand All @@ -128,7 +128,7 @@ create_rpc_response_task(dsn::message_ex *req,
req,
tracker,
[cb_fwd = std::move(callback)](
error_code err, dsn::message_ex * req, dsn::message_ex * resp) mutable {
error_code err, dsn::message_ex *req, dsn::message_ex *resp) mutable {
typename is_typed_rpc_callback<TCallback>::response_t response = {};
if (err == ERR_OK) {
unmarshall(resp, response);
Expand Down Expand Up @@ -223,60 +223,5 @@ call_wait(rpc_address server,
thread_hash,
partition_hash));
}
}

namespace file {

inline aio_task_ptr
create_aio_task(task_code code, task_tracker *tracker, aio_handler &&callback, int hash = 0)
{
aio_task_ptr t(new aio_task(code, std::move(callback), hash));
t->set_tracker((task_tracker *)tracker);
t->spec().on_task_create.execute(task::get_current_task(), t);
return t;
}

inline aio_task_ptr read(dsn_handle_t fh,
char *buffer,
int count,
uint64_t offset,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0)
{
auto tsk = create_aio_task(callback_code, tracker, std::move(callback), hash);
dsn_file_read(fh, buffer, count, offset, tsk);
return tsk;
}

inline aio_task_ptr write(dsn_handle_t fh,
const char *buffer,
int count,
uint64_t offset,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0)
{
auto tsk = create_aio_task(callback_code, tracker, std::move(callback), hash);
dsn_file_write(fh, buffer, count, offset, tsk);
return tsk;
}

inline aio_task_ptr write_vector(dsn_handle_t fh,
const dsn_file_buffer_t *buffers,
int buffer_count,
uint64_t offset,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0)
{
auto tsk = create_aio_task(callback_code, tracker, std::move(callback), hash);
dsn_file_write_vector(fh, buffers, buffer_count, offset, tsk.get());
return tsk;
}
}

} // end namespace
} // namespace rpc
} // namespace dsn
91 changes: 91 additions & 0 deletions include/dsn/tool-api/file_io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#pragma once

#include <dsn/tool-api/task.h>

namespace dsn {

// forward declaration
class disk_file;

namespace file {

/// open file
///
/// \param file_name filename of the file.
/// \param flag flags such as O_RDONLY | O_BINARY used by ::open
/// \param pmode permission mode used by ::open
///
/// \return file handle
///
extern disk_file *open(const char *file_name, int flag, int pmode);

/// close the file handle
extern error_code close(disk_file *file);

/// flush the buffer of the given file
extern error_code flush(disk_file *file);

inline aio_task_ptr
create_aio_task(task_code code, task_tracker *tracker, aio_handler &&callback, int hash = 0)
{
aio_task_ptr t(new aio_task(code, std::move(callback), hash));
t->set_tracker((task_tracker *)tracker);
t->spec().on_task_create.execute(task::get_current_task(), t);
return t;
}

extern aio_task_ptr read(disk_file *file,
char *buffer,
int count,
uint64_t offset,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0);

extern aio_task_ptr write(disk_file *file,
const char *buffer,
int count,
uint64_t offset,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0);

extern aio_task_ptr write_vector(disk_file *file,
const dsn_file_buffer_t *buffers,
int buffer_count,
uint64_t offset,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0);

} // namespace file
} // namespace dsn
6 changes: 3 additions & 3 deletions src/core/core/disk_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void disk_engine::start(aio_provider *provider)
_is_running = true;
}

dsn_handle_t disk_engine::open(const char *file_name, int flag, int pmode)
disk_file *disk_engine::open(const char *file_name, int flag, int pmode)
{
dsn_handle_t nh = _provider->open(file_name, flag, pmode);
if (nh != DSN_INVALID_FILE_HANDLE) {
Expand All @@ -161,7 +161,7 @@ dsn_handle_t disk_engine::open(const char *file_name, int flag, int pmode)
}
}

error_code disk_engine::close(dsn_handle_t fh)
error_code disk_engine::close(disk_file *fh)
{
if (nullptr != fh) {
auto df = (disk_file *)fh;
Expand All @@ -173,7 +173,7 @@ error_code disk_engine::close(dsn_handle_t fh)
}
}

error_code disk_engine::flush(dsn_handle_t fh)
error_code disk_engine::flush(disk_file *fh)
{
if (nullptr != fh) {
auto df = (disk_file *)fh;
Expand Down
7 changes: 4 additions & 3 deletions src/core/core/disk_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class disk_file
aio_task *on_read_completed(aio_task *wk, error_code err, size_t size);
aio_task *on_write_completed(aio_task *wk, void *ctx, error_code err, size_t size);

// TODO(wutao1): make it uint64_t
dsn_handle_t native_handle() const { return _handle; }

private:
Expand All @@ -76,9 +77,9 @@ class disk_engine
void start(aio_provider *provider);

// asynchonous file read/write
dsn_handle_t open(const char *file_name, int flag, int pmode);
error_code close(dsn_handle_t fh);
error_code flush(dsn_handle_t fh);
disk_file *open(const char *file_name, int flag, int pmode);
error_code close(disk_file *fh);
error_code flush(disk_file *fh);
void read(aio_task *aio);
void write(aio_task *aio);

Expand Down
Loading