workflow icon indicating copy to clipboard operation
workflow copied to clipboard

Windows下基于iocp的的异步文件IO

Open holmes1412 opened this issue 3 years ago • 12 comments

Workflow支持异步文件IO任务,具体实现目前在Linux下是由操作系统支持的异步IO系统,在非Linux的系统下是用多线程实现的。而Windows下目前也这个需求,所以欢迎熟悉iocp开发的小伙伴可以积极参与共建~

在此把原异步文件IO的流程大概梳理如下,以供参考:

  1. 用户层接口,我们以create_pread_task()为例子:
class WFTaskFactory                                                             
{
    static WFFileIOTask *create_pread_task(const std::string& pathname,                               
                                           void *buf,                              
                                           size_t count,                           
                                           off_t offset,                           
                                           fio_callback_t callback);
    ...
  1. Workflow内部都是行为派生,所以用户拿到的都是WFFileIOTask *类型的task,而内部会根据pread行为创建一个__WFFilepreadTask
WFFileIOTask *WFTaskFactory::create_pread_task(const std::string& pathname,                            
                                               void *buf,                          
                                               size_t count,                       
                                               off_t offset,                       
                                               fio_callback_t callback)            
{                                                                                  
    return new __WFFilepreadTask(pathname, buf, count, offset,                             
                                 WFGlobal::get_io_service(),                         
                                 std::move(callback));                          
}
  1. __WFFilepreadTask需要实现prepare(),供内部IOService调用,具体是做与异步文件相关的起始操作:
class __WFFilepreadTask : public WFFilepreadTask                          
{ 
protected:                                                                         
    virtual int prepare()
    {
        // 这里调用了IOSession层的prep_preadv(),不同系统实现不一样;
    }
  1. 在Linux和Windows中,WFFileTask和IORequest的定义都是一样的。不同点在于上面提到的IOService和IOSession。 IOService是接管所有文件异步IO的服务,IOSession是一次IO请求的上下文,需要根据Windows下iocp的机制来具体实现。 虽然Linux使用了系统的libaio,但大家要做的事情是类似:
class IOService                                                                    
{                                                                                  
public:   
    int request(IOSession *session); // 用于用户提交一个文件io任务

private:                                                                           
    int event_fd;  // 用于结合libaio机制的eventfd,多个IO事件也只用一个,希望在windows下也尽量少占用系统资源
                                                                
private:                                                                        
    struct list_head session_list; // 用链表管理了此时发出的多个任务                                    
                                                               
private:                                                                        
    static void *aio_finish(void *context); // 用于结合libaio机制的回调函数,有事件通知会回到这里

    ...
};
  1. 此IOService需要通过CommScheduler::io_bind()把自己的eventfd和回调绑定到通信器中,同理也需要io_unbind()。其他内部接口需要根据iocp的机制按需添加。目的是做到当系统有异步事件的时候,会通过注册到通信器的机制来告诉框架,框架调起当时的那片上下文的handle(),即可回到task的逻辑中:
void Communicator::handle_aio_result(struct poller_result *res) 
{
    ...
    session->handle(state, error);

  1. 如果希望默认使用此异步文件服务,可以参考现在的__FileIOService, 从IOService派生,并且在全局单例中提供接口供调用,这样也可以保证不用异步文件IO的用户不会创建相应资源:
class __CommManager
{
    IOService *get_io_service()
    {
        if (!fio_flag_)
            fio_service_ = new __FileIOService(&scheduler_);
        ...
    }
}

以上是整个异步文件IO的基本流程,希望在windows下的实现同时遵循Workflow一如既往的对资源的极度节制以及对高并发的严谨。如有了解iocp的小伙伴愿意尝试欢迎随时交流。

holmes1412 avatar Nov 10 '21 14:11 holmes1412

这个help也太难了吧😓 完全实现kernel目录里IOService和IOSession两个类的接口语意就可以了,其它代码几乎不用改,除了把fd改成HANDLE。 现在有两个IOService的实现可以参考,分别是kernel目录下的IOService_linux.h和IOService_thread.h。其中IOService_thread.h用多线程模拟aio,主要用于macOS,Linux也可以用,但windows依然用不了。Windows下基于iocp可以实现。

Barenboim avatar Nov 10 '21 15:11 Barenboim

https://github.com/sogou/workflow/blob/master/docs/tutorial-09-http_file_server.md 文档中提到"我们正在研发一套文件管理,将来用户只需要传入文件名,对跨平台更友好" 为什么不将WFTaskFactory::create_pread_task中的第一个参数fd改为std::function来作为一个回调函数 这样create_pread_task中new WFFilepreadTask后调用 平台相关的文件接口打开文件,并将文件相关信息作为参数传给回调函数 用户可以在回调函数中判断如何修改自己的读取参数,还可以判断是否继续 根据返回值我们可以让 Task成为一个bad Task直接去结束自己 好处是用户不需要再自己去调用open CreateFile这类平台相关的函数来打开文件

lainswork avatar Nov 10 '21 15:11 lainswork

https://github.com/sogou/workflow/blob/master/docs/tutorial-09-http_file_server.md 文档中提到"我们正在研发一套文件管理,将来用户只需要传入文件名,对跨平台更友好" 为什么不将WFTaskFactory::create_pread_task中的第一个参数fd改为std::function来作为一个回调函数 这样create_pread_task中new WFFilepreadTask后调用 平台相关的文件接口打开文件,并将文件相关信息作为参数传给回调函数 用户可以在回调函数中判断如何修改自己的读取参数,还可以判断是否继续 根据返回值我们可以让 Task成为一个bad Task直接去结束自己 好处是用户不需要再自己去调用open CreateFile这类平台相关的函数来打开文件

文件名的接口已经写好了,忘了更新文档。 https://github.com/sogou/workflow/blob/b71d744d6d8ead2e7d44d39cdff5b4660b7cd695/src/factory/WFTaskFactory.h#L190 改std::function不现实。接口太难看了。文件名的话就可以跨平台了。

Barenboim avatar Nov 10 '21 15:11 Barenboim

嗯....传文件名字作为参数,那现在在读取前,仍然不知道文件的大小和相关信息,这代表create_pread_task前还是要先打开文件获取到文件大小再调用create_pread_task来读取。这难道不会导致create_pread_task使用文件名字作为参数没有意义嘛

lainswork avatar Nov 11 '21 01:11 lainswork

如果要读取整个文件,Linux下的做法是先用state函数,得到文件大小。 有用户问过这个问题:https://github.com/sogou/workflow/issues/632

Barenboim avatar Nov 11 '21 02:11 Barenboim

所以说我提出将一个回调作为参数,然后文件名也作为参数 这样我们在实现文件中通过平台相关api打开文件后,可以让用户处理接下来的操作,比如具体申请多大的内存 好处是我们将所有的与平台有关的内容都写到了cpp中,隐藏了真实的文件打开流程,方便用户编写跨平台代码。 这个需求其实是很现实的,需要考虑的是怎么优雅的实现,你可能更介意std::fun的不太好看,hahahah

lainswork avatar Nov 11 '21 03:11 lainswork

我们没有这种形式的接口。我们之前另一个想法是做一个WFFileSystem,用文件名创建任务的时候,由这个类来管理fd或HANDLE。你说的这些功能我更愿意用这个方式来解决。 不过现在考虑这些都太早了,毕竟还没有实现windows下的文件异步IO呢。IOSerivce的接口不是那么好实现,很多东西必须语义非常精确。

Barenboim avatar Nov 11 '21 05:11 Barenboim

能不能麻烦解释下IOService中的各个成员的作用 包括: int init(unsigned int maxevents);中的maxevents参数 virtual void handle_stop(int error) { } virtual void handle_unbound() = 0; private: void incref(); void decref(); private: int event_fd; int ref;

lainswork avatar Nov 12 '21 01:11 lainswork

能不能麻烦解释下IOService中的各个成员的作用 包括: int init(unsigned int maxevents);中的maxevents参数 virtual void handle_stop(int error) { } virtual void handle_unbound() = 0; private: void incref(); void decref(); private: int event_fd; int ref;

目前我已经理解了框架中的task如何提交到Service这种运作方式。 但是还不清楚框架源码里面的其他机制,比如单个IOSession被Service处理完成后task所在的series是如何确定它被执行完毕的 还有就是上面提到的IOService各个成员,目前对wf框架的理解还停留在“使用”阶段,我想理解其中的细节,以便可以实现wf的win平台异步文件部分。

lainswork avatar Nov 14 '21 14:11 lainswork

IOService和series是两个层次的东西。最好不用混着一起看。如果你想实现iocp下的文件aio,只需要实现iocp版IOService和IOSession的接口就可以了。但这个真的挺难的。 使用上IOSerive被io_bind到Communicator,就可以通过IOService::request接口提交IOSession。而IOService使用结束后,需要调用Communicator::io_unbind。unbind过程又是异步的,unbind完成之后,IOService::handle_unbound会被调用,对象归还给外部。 已经绑定在Communicator上的IOService如果停止工作,IOService::handle_stop会被调用。停止工作有两种原因,一种是发生错误(stop的错误码为errno),另一种是Communicator::deinit()被调用(stop错误码为0)。handle_stop被调用时,IOService并未解绑,依然需要调用Communicator::io_unbind来解绑IOSerivce。 这部分代码逻辑太复杂了,几乎没有办法文字描述清楚。你有兴趣可以看看代码。IOService_thread的实现简单一些,当IOSession被请求是,实时创建一个线程进行IO。 另外,IOService和用于网络服务的CommService接口上是几乎一致的。IOService的请求来自本地调用,CommService的请求来自网络连接。

Barenboim avatar Nov 14 '21 14:11 Barenboim

IOService和series是两个层次的东西。最好不用混着一起看。如果你想实现iocp下的文件aio,只需要实现iocp版IOService和IOSession的接口就可以了。但这个真的挺难的。 使用上IOSerive被io_bind到Communitor,就可以通过IOService::request接口提及IOSession。而IOService使用结束后,需要调用Communicator::io_unbind。unbind过程又是异步的,unbind完成之后,IOService::handle_unbound会被调用,对象归还给外部。 已经绑定在Communicator上的IOService如果停止工作,IOService::handle_stop会被调用。停止工作有两种原因,一种是发生错误(stop的错误码为errno),另一种是Communiator::deinit()被调用(stop错误码为0)。handle_stop被调用时,IOService并未解绑,依然需要调用Communicator::io_unbind来解绑IOSerivce。 这部分代码逻辑太复杂了,几乎没有办法文字描述清楚。你有兴趣可以看看代码。IOService_thread的实现简单一些,当IOSession被请求是,实时创建一个线程进行IO。 另外,IOService和用于网络服务的CommService接口上是几乎一致的。IOService的请求来自本地调用,CommSession来自网络连接。

收到,阅读源码过程中最不好理解的就是这个部分,现在有了你的描述就好说了

lainswork avatar Nov 14 '21 15:11 lainswork

class __WFFilepreadTask : public WFFilepreadTask
{
public:
    __WFFilepreadTask(const std::string& path, void *buf, size_t count,
                      off_t offset, IOService *service, fio_callback_t&& cb):
        WFFilepreadTask(-1, buf, count, offset, service, std::move(cb)),
        pathname(path)
    {
    }

protected:
    virtual int prepare()
    {

       HANDLE handle = CreateFile(
            this->pathname.c_str(),// 文件路径
            GENERIC_READ,// 打开文件以进行读取
            FILE_SHARE_READ,// 共享模式
            NULL,// 安全属性(可以为NULL)
            OPEN_EXISTING,// 打开现有文件
            FILE_ATTRIBUTE_NORMAL,// 文件属性
            NULL// 模板文件句柄(可以为NULL)
           );
       int fd = _open_osfhandle((intptr_t)handle, 0);
       this->args.fd =fd;

        if (this->args.fd < 0)
            return -1;

        return WFFilepreadTask::prepare();
    }

    virtual SubTask *done()
    {
        if (this->args.fd >= 0)
        {
              close(this->args.fd);
            this->args.fd = -1;
        }

        return WFFilepreadTask::done();
    }

protected:
    std::string pathname;
};

class __WFFilepwriteTask : public WFFilepwriteTask
{
public:
    __WFFilepwriteTask(const std::string& path, const void *buf, size_t count,
                       off_t offset, IOService *service, fio_callback_t&& cb):
        WFFilepwriteTask(-1, buf, count, offset, service, std::move(cb)),
        pathname(path)
    {
    }

protected:
    virtual int prepare()
    {
        HANDLE handle = CreateFile(
            this->pathname.c_str(),// 文件路径
            GENERIC_READ,// 打开文件以进行读取
            FILE_SHARE_READ,// 共享模式
            NULL,// 安全属性(可以为NULL)
            OPEN_EXISTING,// 打开现有文件
            FILE_ATTRIBUTE_NORMAL,// 文件属性
            NULL// 模板文件句柄(可以为NULL)
            );
        int fd = _open_osfhandle((intptr_t)handle, 0);
        this->args.fd =fd;
        if (this->args.fd < 0)
            return -1;

        return WFFilepwriteTask::prepare();
    }

    virtual SubTask *done()
    {
        if (this->args.fd >= 0)
        {
             close(this->args.fd);
            this->args.fd = -1;
        }

        return WFFilepwriteTask::done();
    }

protected:
    std::string pathname;
};

class __WFFilepreadvTask : public WFFilepreadvTask
{
public:
    __WFFilepreadvTask(const std::string& path, const struct iovec *iov,
                       int iovcnt, off_t offset, IOService *service,
                       fvio_callback_t&& cb) :
        WFFilepreadvTask(-1, iov, iovcnt, offset, service, std::move(cb)),
        pathname(path)
    {
    }

protected:
    virtual int prepare()
    {
        HANDLE handle = CreateFile(
            this->pathname.c_str(),// 文件路径
            GENERIC_READ,// 打开文件以进行读取
            FILE_SHARE_READ,// 共享模式
            NULL,// 安全属性(可以为NULL)
            OPEN_EXISTING,// 打开现有文件
            FILE_ATTRIBUTE_NORMAL,// 文件属性
            NULL// 模板文件句柄(可以为NULL)
            );
        int fd = _open_osfhandle((intptr_t)handle, 0);
        this->args.fd =fd;
        if (this->args.fd < 0)
            return -1;

        return WFFilepreadvTask::prepare();
    }

    virtual SubTask *done()
    {
        if (this->args.fd >= 0)
        {
           close(this->args.fd);
            this->args.fd = -1;
        }

        return WFFilepreadvTask::done();
    }

protected:
    std::string pathname;
};

class __WFFilepwritevTask : public WFFilepwritevTask
{
public:
    __WFFilepwritevTask(const std::string& path, const struct iovec *iov,
                        int iovcnt, off_t offset, IOService *service,
                        fvio_callback_t&& cb) :
        WFFilepwritevTask(-1, iov, iovcnt, offset, service, std::move(cb)),
        pathname(path)
    {
    }

protected:
    virtual int prepare()
    {
        HANDLE handle = CreateFile(
            this->pathname.c_str(),// 文件路径
            GENERIC_READ,// 打开文件以进行读取
            FILE_SHARE_READ,// 共享模式
            NULL,// 安全属性(可以为NULL)
            OPEN_EXISTING,// 打开现有文件
            FILE_ATTRIBUTE_NORMAL,// 文件属性
            NULL// 模板文件句柄(可以为NULL)
            );
        int fd = _open_osfhandle((intptr_t)handle, 0);
        this->args.fd =fd;
        if (this->args.fd < 0)
            return -1;

        return WFFilepwritevTask::prepare();
    }

protected:
    virtual SubTask *done()
    {
        if (this->args.fd >= 0)
        {
          close(this->args.fd);
            this->args.fd = -1;
        }

        return WFFilepwritevTask::done();
    }

protected:
    std::string pathname;
};

static int __writefile_io(IOCPData *iocp_data, int timeout)
{

	WriteContext *ctx = (WriteContext *)iocp_data->data.context;

	int ret = WriteFile(iocp_data->data.handle, ctx->entry, ctx->count, NULL,
						&iocp_data->overlap);
	if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
	{
		if (ret != 0 && timeout == 0)
			CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);

		return -1;
	}
	errno = WSAGetLastError();
	return 0; // 成功启动异步操作
}

static int __readfile_io(IOCPData *iocp_data, int timeout)
{

	ReadContext *ctx = (ReadContext *)iocp_data->data.context;
	int ret = ReadFile(iocp_data->data.handle, ctx->entry, ctx->msgsize, NULL,
					   &iocp_data->overlap);

	if (ret == 0 || WSAGetLastError() == WSA_IO_PENDING)
	{
		if (ret != 0 && timeout == 0)
			CancelIoEx(iocp_data->data.handle, &iocp_data->overlap);

		return -1;
	}
	errno = WSAGetLastError();
	return 0; // 成功启动异步操作
}

各位大神看看这个可以不

519984307 avatar Apr 07 '24 12:04 519984307