workflow
workflow copied to clipboard
Windows下基于iocp的的异步文件IO
Workflow支持异步文件IO任务,具体实现目前在Linux下是由操作系统支持的异步IO系统,在非Linux的系统下是用多线程实现的。而Windows下目前也这个需求,所以欢迎熟悉iocp开发的小伙伴可以积极参与共建~
在此把原异步文件IO的流程大概梳理如下,以供参考:
- 用户层接口,我们以
create_pread_task()
为例子:
class WFTaskFactory
{
static WFFileIOTask *create_pread_task(const std::string& pathname,
void *buf,
size_t count,
off_t offset,
fio_callback_t callback);
...
- Workflow内部都是行为派生,所以用户拿到的都是
WFFileIOTask *
类型的task,而内部会根据pread行为创建一个__WFFilepreadTask
:
WFFileIOTask *WFTaskFactory::create_pread_task(const std::string& pathname,
void *buf,
size_t count,
off_t offset,
fio_callback_t callback)
{
return new __WFFilepreadTask(pathname, buf, count, offset,
WFGlobal::get_io_service(),
std::move(callback));
}
-
__WFFilepreadTask
需要实现prepare()
,供内部IOService调用,具体是做与异步文件相关的起始操作:
class __WFFilepreadTask : public WFFilepreadTask
{
protected:
virtual int prepare()
{
// 这里调用了IOSession层的prep_preadv(),不同系统实现不一样;
}
- 在Linux和Windows中,WFFileTask和IORequest的定义都是一样的。不同点在于上面提到的IOService和IOSession。 IOService是接管所有文件异步IO的服务,IOSession是一次IO请求的上下文,需要根据Windows下iocp的机制来具体实现。 虽然Linux使用了系统的libaio,但大家要做的事情是类似:
class IOService
{
public:
int request(IOSession *session); // 用于用户提交一个文件io任务
private:
int event_fd; // 用于结合libaio机制的eventfd,多个IO事件也只用一个,希望在windows下也尽量少占用系统资源
private:
struct list_head session_list; // 用链表管理了此时发出的多个任务
private:
static void *aio_finish(void *context); // 用于结合libaio机制的回调函数,有事件通知会回到这里
...
};
- 此IOService需要通过
CommScheduler::io_bind()
把自己的eventfd和回调绑定到通信器中,同理也需要io_unbind()
。其他内部接口需要根据iocp的机制按需添加。目的是做到当系统有异步事件的时候,会通过注册到通信器的机制来告诉框架,框架调起当时的那片上下文的handle(),即可回到task的逻辑中:
void Communicator::handle_aio_result(struct poller_result *res)
{
...
session->handle(state, error);
- 如果希望默认使用此异步文件服务,可以参考现在的
__FileIOService
, 从IOService
派生,并且在全局单例中提供接口供调用,这样也可以保证不用异步文件IO的用户不会创建相应资源:
class __CommManager
{
IOService *get_io_service()
{
if (!fio_flag_)
fio_service_ = new __FileIOService(&scheduler_);
...
}
}
以上是整个异步文件IO的基本流程,希望在windows下的实现同时遵循Workflow一如既往的对资源的极度节制以及对高并发的严谨。如有了解iocp的小伙伴愿意尝试欢迎随时交流。
这个help也太难了吧😓 完全实现kernel目录里IOService和IOSession两个类的接口语意就可以了,其它代码几乎不用改,除了把fd改成HANDLE。 现在有两个IOService的实现可以参考,分别是kernel目录下的IOService_linux.h和IOService_thread.h。其中IOService_thread.h用多线程模拟aio,主要用于macOS,Linux也可以用,但windows依然用不了。Windows下基于iocp可以实现。
https://github.com/sogou/workflow/blob/master/docs/tutorial-09-http_file_server.md 文档中提到"我们正在研发一套文件管理,将来用户只需要传入文件名,对跨平台更友好" 为什么不将WFTaskFactory::create_pread_task中的第一个参数fd改为std::function来作为一个回调函数 这样create_pread_task中new WFFilepreadTask后调用 平台相关的文件接口打开文件,并将文件相关信息作为参数传给回调函数 用户可以在回调函数中判断如何修改自己的读取参数,还可以判断是否继续 根据返回值我们可以让 Task成为一个bad Task直接去结束自己 好处是用户不需要再自己去调用open CreateFile这类平台相关的函数来打开文件
https://github.com/sogou/workflow/blob/master/docs/tutorial-09-http_file_server.md 文档中提到"我们正在研发一套文件管理,将来用户只需要传入文件名,对跨平台更友好" 为什么不将WFTaskFactory::create_pread_task中的第一个参数fd改为std::function来作为一个回调函数 这样create_pread_task中new WFFilepreadTask后调用 平台相关的文件接口打开文件,并将文件相关信息作为参数传给回调函数 用户可以在回调函数中判断如何修改自己的读取参数,还可以判断是否继续 根据返回值我们可以让 Task成为一个bad Task直接去结束自己 好处是用户不需要再自己去调用open CreateFile这类平台相关的函数来打开文件
文件名的接口已经写好了,忘了更新文档。 https://github.com/sogou/workflow/blob/b71d744d6d8ead2e7d44d39cdff5b4660b7cd695/src/factory/WFTaskFactory.h#L190 改std::function不现实。接口太难看了。文件名的话就可以跨平台了。
嗯....传文件名字作为参数,那现在在读取前,仍然不知道文件的大小和相关信息,这代表create_pread_task前还是要先打开文件获取到文件大小再调用create_pread_task来读取。这难道不会导致create_pread_task使用文件名字作为参数没有意义嘛
如果要读取整个文件,Linux下的做法是先用state函数,得到文件大小。 有用户问过这个问题:https://github.com/sogou/workflow/issues/632
所以说我提出将一个回调作为参数,然后文件名也作为参数 这样我们在实现文件中通过平台相关api打开文件后,可以让用户处理接下来的操作,比如具体申请多大的内存 好处是我们将所有的与平台有关的内容都写到了cpp中,隐藏了真实的文件打开流程,方便用户编写跨平台代码。 这个需求其实是很现实的,需要考虑的是怎么优雅的实现,你可能更介意std::fun的不太好看,hahahah
我们没有这种形式的接口。我们之前另一个想法是做一个WFFileSystem,用文件名创建任务的时候,由这个类来管理fd或HANDLE。你说的这些功能我更愿意用这个方式来解决。 不过现在考虑这些都太早了,毕竟还没有实现windows下的文件异步IO呢。IOSerivce的接口不是那么好实现,很多东西必须语义非常精确。
能不能麻烦解释下IOService中的各个成员的作用 包括: int init(unsigned int maxevents);中的maxevents参数 virtual void handle_stop(int error) { } virtual void handle_unbound() = 0; private: void incref(); void decref(); private: int event_fd; int ref;
能不能麻烦解释下IOService中的各个成员的作用 包括: int init(unsigned int maxevents);中的maxevents参数 virtual void handle_stop(int error) { } virtual void handle_unbound() = 0; private: void incref(); void decref(); private: int event_fd; int ref;
目前我已经理解了框架中的task如何提交到Service这种运作方式。 但是还不清楚框架源码里面的其他机制,比如单个IOSession被Service处理完成后task所在的series是如何确定它被执行完毕的 还有就是上面提到的IOService各个成员,目前对wf框架的理解还停留在“使用”阶段,我想理解其中的细节,以便可以实现wf的win平台异步文件部分。
IOService和series是两个层次的东西。最好不用混着一起看。如果你想实现iocp下的文件aio,只需要实现iocp版IOService和IOSession的接口就可以了。但这个真的挺难的。 使用上IOSerive被io_bind到Communicator,就可以通过IOService::request接口提交IOSession。而IOService使用结束后,需要调用Communicator::io_unbind。unbind过程又是异步的,unbind完成之后,IOService::handle_unbound会被调用,对象归还给外部。 已经绑定在Communicator上的IOService如果停止工作,IOService::handle_stop会被调用。停止工作有两种原因,一种是发生错误(stop的错误码为errno),另一种是Communicator::deinit()被调用(stop错误码为0)。handle_stop被调用时,IOService并未解绑,依然需要调用Communicator::io_unbind来解绑IOSerivce。 这部分代码逻辑太复杂了,几乎没有办法文字描述清楚。你有兴趣可以看看代码。IOService_thread的实现简单一些,当IOSession被请求是,实时创建一个线程进行IO。 另外,IOService和用于网络服务的CommService接口上是几乎一致的。IOService的请求来自本地调用,CommService的请求来自网络连接。
IOService和series是两个层次的东西。最好不用混着一起看。如果你想实现iocp下的文件aio,只需要实现iocp版IOService和IOSession的接口就可以了。但这个真的挺难的。 使用上IOSerive被io_bind到Communitor,就可以通过IOService::request接口提及IOSession。而IOService使用结束后,需要调用Communicator::io_unbind。unbind过程又是异步的,unbind完成之后,IOService::handle_unbound会被调用,对象归还给外部。 已经绑定在Communicator上的IOService如果停止工作,IOService::handle_stop会被调用。停止工作有两种原因,一种是发生错误(stop的错误码为errno),另一种是Communiator::deinit()被调用(stop错误码为0)。handle_stop被调用时,IOService并未解绑,依然需要调用Communicator::io_unbind来解绑IOSerivce。 这部分代码逻辑太复杂了,几乎没有办法文字描述清楚。你有兴趣可以看看代码。IOService_thread的实现简单一些,当IOSession被请求是,实时创建一个线程进行IO。 另外,IOService和用于网络服务的CommService接口上是几乎一致的。IOService的请求来自本地调用,CommSession来自网络连接。
收到,阅读源码过程中最不好理解的就是这个部分,现在有了你的描述就好说了
class __WFFilepreadTask : public WFFilepreadTask
{
public:
__WFFilepreadTask(const std::string& path, void *buf, size_t count,
off_t offset, IOService *service, fio_callback_t&& cb):
WFFilepreadTask(-1, buf, count, offset, service, std::move(cb)),
pathname(path)
{
}
protected:
virtual int prepare()
{
HANDLE handle = CreateFile(
this->pathname.c_str(),// 文件路径
GENERIC_READ,// 打开文件以进行读取
FILE_SHARE_READ,// 共享模式
NULL,// 安全属性(可以为NULL)
OPEN_EXISTING,// 打开现有文件
FILE_ATTRIBUTE_NORMAL,// 文件属性
NULL// 模板文件句柄(可以为NULL)
);
int fd = _open_osfhandle((intptr_t)handle, 0);
this->args.fd =fd;
if (this->args.fd < 0)
return -1;
return WFFilepreadTask::prepare();
}
virtual SubTask *done()
{
if (this->args.fd >= 0)
{
close(this->args.fd);
this->args.fd = -1;
}
return WFFilepreadTask::done();
}
protected:
std::string pathname;
};
class __WFFilepwriteTask : public WFFilepwriteTask
{
public:
__WFFilepwriteTask(const std::string& path, const void *buf, size_t count,
off_t offset, IOService *service, fio_callback_t&& cb):
WFFilepwriteTask(-1, buf, count, offset, service, std::move(cb)),
pathname(path)
{
}
protected:
virtual int prepare()
{
HANDLE handle = CreateFile(
this->pathname.c_str(),// 文件路径
GENERIC_READ,// 打开文件以进行读取
FILE_SHARE_READ,// 共享模式
NULL,// 安全属性(可以为NULL)
OPEN_EXISTING,// 打开现有文件
FILE_ATTRIBUTE_NORMAL,// 文件属性
NULL// 模板文件句柄(可以为NULL)
);
int fd = _open_osfhandle((intptr_t)handle, 0);
this->args.fd =fd;
if (this->args.fd < 0)
return -1;
return WFFilepwriteTask::prepare();
}
virtual SubTask *done()
{
if (this->args.fd >= 0)
{
close(this->args.fd);
this->args.fd = -1;
}
return WFFilepwriteTask::done();
}
protected:
std::string pathname;
};
class __WFFilepreadvTask : public WFFilepreadvTask
{
public:
__WFFilepreadvTask(const std::string& path, const struct iovec *iov,
int iovcnt, off_t offset, IOService *service,
fvio_callback_t&& cb) :
WFFilepreadvTask(-1, iov, iovcnt, offset, service, std::move(cb)),
pathname(path)
{
}
protected:
virtual int prepare()
{
HANDLE handle = CreateFile(
this->pathname.c_str(),// 文件路径
GENERIC_READ,// 打开文件以进行读取
FILE_SHARE_READ,// 共享模式
NULL,// 安全属性(可以为NULL)
OPEN_EXISTING,// 打开现有文件
FILE_ATTRIBUTE_NORMAL,// 文件属性
NULL// 模板文件句柄(可以为NULL)
);
int fd = _open_osfhandle((intptr_t)handle, 0);
this->args.fd =fd;
if (this->args.fd < 0)
return -1;
return WFFilepreadvTask::prepare();
}
virtual SubTask *done()
{
if (this->args.fd >= 0)
{
close(this->args.fd);
this->args.fd = -1;
}
return WFFilepreadvTask::done();
}
protected:
std::string pathname;
};
class __WFFilepwritevTask : public WFFilepwritevTask
{
public:
__WFFilepwritevTask(const std::string& path, const struct iovec *iov,
int iovcnt, off_t offset, IOService *service,
fvio_callback_t&& cb) :
WFFilepwritevTask(-1, iov, iovcnt, offset, service, std::move(cb)),
pathname(path)
{
}
protected:
virtual int prepare()
{
HANDLE handle = CreateFile(
this->pathname.c_str(),// 文件路径
GENERIC_READ,// 打开文件以进行读取
FILE_SHARE_READ,// 共享模式
NULL,// 安全属性(可以为NULL)
OPEN_EXISTING,// 打开现有文件
FILE_ATTRIBUTE_NORMAL,// 文件属性
NULL// 模板文件句柄(可以为NULL)
);
int fd = _open_osfhandle((intptr_t)handle, 0);
this->args.fd =fd;
if (this->args.fd < 0)
return -1;
return WFFilepwritevTask::prepare();
}
protected:
virtual SubTask *done()
{
if (this->args.fd >= 0)
{
close(this->args.fd);
this->args.fd = -1;
}
return WFFilepwritevTask::done();
}
protected:
std::string pathname;
};
static int __writefile_io(IOCPData *iocp_data, int timeout)
{
WriteContext *ctx = (WriteContext *)iocp_data->data.context;
int ret = WriteFile(iocp_data->data.handle, ctx->entry, ctx->count, NULL,
&iocp_data->overlap);
if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
{
if (ret != 0 && timeout == 0)
CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
return -1;
}
errno = WSAGetLastError();
return 0; // 成功启动异步操作
}
static int __readfile_io(IOCPData *iocp_data, int timeout)
{
ReadContext *ctx = (ReadContext *)iocp_data->data.context;
int ret = ReadFile(iocp_data->data.handle, ctx->entry, ctx->msgsize, NULL,
&iocp_data->overlap);
if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
{
if (ret != 0 && timeout == 0)
CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);
return -1;
}
errno = WSAGetLastError();
return 0; // 成功启动异步操作
}
各位大神看看这个可以不