workflow icon indicating copy to clipboard operation
workflow copied to clipboard

基于wf现有架构,实现full-duplex 通信---channel

Open gnblao opened this issue 3 years ago • 29 comments

目的是为了更好的让wf支持,full-duplex 通信协议 websocket,quic,http2。。。。

想法: https://github.com/sogou/workflow/issues/833

channel逻辑
https://github.com/gnblao/workflow/tree/channel

现有在channe的基础上实现两种协议

  • Websocket
  • Stream [Tcp/Tcp+ssl]原始数据流
  • Quic[todo]

具体参考

https://github.com/gnblao/workflow/blob/channel/docs/about-channel.md

gnblao avatar Apr 25 '22 10:04 gnblao

这么牛逼……

Barenboim avatar Apr 25 '22 10:04 Barenboim

好家伙!我直呼好家伙!

holmes1412 avatar Apr 25 '22 10:04 holmes1412

这么牛逼……

还是大佬牛逼,打造的wf生态,很好玩~~~ 感觉nginx之后,workflow的设计实现已经上了另一新阶段咯~向大佬学习~~

gnblao avatar Apr 25 '22 10:04 gnblao

好家伙!我直呼好家伙! 我看workflow代码的时候,好家伙!我也直呼好家伙!~~

gnblao avatar Apr 25 '22 10:04 gnblao

你好用户,你的代码真是惊艳到我们了,非常感谢参与我们生态开发! 我看了一下修改的Communicator代码,有几个问题需要确认一下:

  • 你如何实现WebSocketFrame的保序问题?我们认为任何全双工通信,用户对帧的处理应该是严格有序的。所以我们之前websocket分枝才搞出CommChannel这个类,这个类里,所有的handle_in都是在一个线程上执行。但你现在的代码,好像还是走了msgqueue。这样两个handle之间好像是可以并行和乱序的。似乎有问题。
  • chanel_send_one似乎不能解决并发的问题。因为channel随时可能被关闭。如果你参考我们的CommChannel,这里我们只要established了,用户需要调用shutdown来主动关闭连接(哪怕对端已经关闭了)。在shutdown之前,都可以向channel send数据,当然send可能直接失败。这个地方我们的代码实现非常复杂。你现在的代码明显简单不少,我不太确定是不是严密。
  • 你现在每个frame的append返回1之后,都直接回到session::handle,然后重新启动session。最近,我们刚刚上线了一个模块,可能可以简化你的工作,不用让session回到handle,而是每条消息都可以被回调,并且保序(在poller线程里)。这个模块 是PackageWrapper。原理是我们的消息是可以做协议叠加的,比如SSLWrapper就是一种协议叠加器。而对于WebSocketet一个channel上的读,可以认为是一堆frame的叠加。每个frame的append返回1之后,wrapper调用next操作得到下一个frame,传输可以断续进行,无需回到session的handle。这个协议叠加器,也可以完美解决所谓粘包问题。你大概看一下这个,应该能很快理解。

Barenboim avatar Apr 25 '22 11:04 Barenboim

你好用户,你的代码真是惊艳到我们了,非常感谢参与我们生态开发! 我看了一下修改的Communicator代码,有几个问题需要确认一下:

  • 你如何实现WebSocketFrame的保序问题?我们认为任何全双工通信,用户对帧的处理应该是严格有序的。所以我们之前websocket分枝才搞出CommChannel这个类,这个类里,所有的handle_in都是在一个线程上执行。但你现在的代码,好像还是走了msgqueue。这样两个handle之间好像是可以并行和乱序的。似乎有问题。
  • chanel_send_one似乎不能解决并发的问题。因为channel随时可能被关闭。如果你参考我们的CommChannel,这里我们只要established了,用户需要调用shutdown来主动关闭连接(哪怕对端已经关闭了)。在shutdown之前,都可以向channel send数据,当然send可能直接失败。这个地方我们的代码实现非常复杂。你现在的代码明显简单不少,我不太确定是不是严密。
  • 你现在每个frame的append返回1之后,都直接回到session::handle,然后重新启动session。最近,我们刚刚上线了一个模块,可能可以简化你的工作,不用让session回到handle,而是每条消息都可以被回调,并且保序(在poller线程里)。这个模块 是PackageWrapper。原理是我们的消息是可以做协议叠加的,比如SSLWrapper就是一种协议叠加器。而对于WebSocketet一个channel上的读,可以认为是一堆frame的叠加。每个frame的append返回1之后,wrapper调用next操作得到下一个frame,传输可以断续进行,无需回到session的handle。这个协议叠加器,也可以完美解决所谓粘包问题。你大概看一下这个,应该能很快理解。

关于乱序的问题,考虑到已经在wfchannel->channel_fanout_msg_in里面进行重组了,重组的逻辑类似tcp数据重组,我在每条msg进来的时候加了个seq。以此为排序依据。排序在处理

chanel_send_one 逻辑里面,is_channel = 2表明异步或者有其他现场正在ing写,这次不写下次在写。这里又个逻辑写的时候“必须”一个个msg写操作。

最后一个问题加了CONN_STATE_ESTABLISHED,主要的逻辑这操作session 的in和out上

gnblao avatar Apr 25 '22 11:04 gnblao

你好用户,你的代码真是惊艳到我们了,非常感谢参与我们生态开发! 我看了一下修改的Communicator代码,有几个问题需要确认一下:

  • 你如何实现WebSocketFrame的保序问题?我们认为任何全双工通信,用户对帧的处理应该是严格有序的。所以我们之前websocket分枝才搞出CommChannel这个类,这个类里,所有的handle_in都是在一个线程上执行。但你现在的代码,好像还是走了msgqueue。这样两个handle之间好像是可以并行和乱序的。似乎有问题。
  • chanel_send_one似乎不能解决并发的问题。因为channel随时可能被关闭。如果你参考我们的CommChannel,这里我们只要established了,用户需要调用shutdown来主动关闭连接(哪怕对端已经关闭了)。在shutdown之前,都可以向channel send数据,当然send可能直接失败。这个地方我们的代码实现非常复杂。你现在的代码明显简单不少,我不太确定是不是严密。
  • 你现在每个frame的append返回1之后,都直接回到session::handle,然后重新启动session。最近,我们刚刚上线了一个模块,可能可以简化你的工作,不用让session回到handle,而是每条消息都可以被回调,并且保序(在poller线程里)。这个模块 是PackageWrapper。原理是我们的消息是可以做协议叠加的,比如SSLWrapper就是一种协议叠加器。而对于WebSocketet一个channel上的读,可以认为是一堆frame的叠加。每个frame的append返回1之后,wrapper调用next操作得到下一个frame,传输可以断续进行,无需回到session的handle。这个协议叠加器,也可以完美解决所谓粘包问题。你大概看一下这个,应该能很快理解。

关于乱序的问题,考虑到已经在wfchannel->channel_fanout_msg_in里面进行重组了,重组的逻辑类似tcp数据重组,我在每条msg进来的时候加了个seq。以此为排序依据。排序在处理

chanel_send_one 逻辑里面,is_channel = 2表明异步或者有其他现场正在ing写,这次不写下次在写。这里又个逻辑写的时候“必须”一个个msg写操作。

最后一个问题加了CONN_STATE_ESTABLISHED,主要的逻辑这操作session 的in和out上

其实新现实的WFChannel,只是留下Channel的名字,和之前CommChannel 的抽象已经没有任何逻辑联系了

gnblao avatar Apr 25 '22 11:04 gnblao

嗯嗯,我也看了代码,和我们的CommChannel没有关系。CommChannel那个模块我也并不是特别喜欢,所以我现在试图通过协议叠加的方式来实现流式的传输。目前我们PackageWrapper的第一个应该可能用在MySQL binlog上。

Barenboim avatar Apr 25 '22 12:04 Barenboim

你好用户,你的代码真是惊艳到我们了,非常感谢参与我们生态开发! 我看了一下修改的Communicator代码,有几个问题需要确认一下:

  • 你如何实现WebSocketFrame的保序问题?我们认为任何全双工通信,用户对帧的处理应该是严格有序的。所以我们之前websocket分枝才搞出CommChannel这个类,这个类里,所有的handle_in都是在一个线程上执行。但你现在的代码,好像还是走了msgqueue。这样两个handle之间好像是可以并行和乱序的。似乎有问题。
  • chanel_send_one似乎不能解决并发的问题。因为channel随时可能被关闭。如果你参考我们的CommChannel,这里我们只要established了,用户需要调用shutdown来主动关闭连接(哪怕对端已经关闭了)。在shutdown之前,都可以向channel send数据,当然send可能直接失败。这个地方我们的代码实现非常复杂。你现在的代码明显简单不少,我不太确定是不是严密。
  • 你现在每个frame的append返回1之后,都直接回到session::handle,然后重新启动session。最近,我们刚刚上线了一个模块,可能可以简化你的工作,不用让session回到handle,而是每条消息都可以被回调,并且保序(在poller线程里)。这个模块 是PackageWrapper。原理是我们的消息是可以做协议叠加的,比如SSLWrapper就是一种协议叠加器。而对于WebSocketet一个channel上的读,可以认为是一堆frame的叠加。每个frame的append返回1之后,wrapper调用next操作得到下一个frame,传输可以断续进行,无需回到session的handle。这个协议叠加器,也可以完美解决所谓粘包问题。你大概看一下这个,应该能很快理解。

关于乱序的问题,考虑到已经在wfchannel->channel_fanout_msg_in里面进行重组了,重组的逻辑类似tcp数据重组,我在每条msg进来的时候加了个seq。以此为排序依据。排序在处理 chanel_send_one 逻辑里面,is_channel = 2表明异步或者有其他现场正在ing写,这次不写下次在写。这里又个逻辑写的时候“必须”一个个msg写操作。 最后一个问题加了CONN_STATE_ESTABLISHED,主要的逻辑这操作session 的in和out上

其实新现实的WFChannel,只是留下Channel的名字,和之前CommChannel 的抽象已经没有任何逻辑联系了

考虑到channel_fanout_msg_in/channel_fanout_msg_out中有数据竞态问题,后续想用condwaittask进行优化了,组成任务流形式的排序。可以完全去掉mutex咯~,还请大佬,多拍拍砖

gnblao avatar Apr 25 '22 12:04 gnblao

嗯嗯,wait这块,你也可以看看我们的WFConditional和WFResourcePool。WFResourcePool是对WFConditional的一种组织形式,但WFConditaionl也可以独立使用,通过WFTaskFactory产生,唤醒方调用它的signal操作就可以。 https://github.com/sogou/workflow/blob/master/docs/about-conditional.md

Barenboim avatar Apr 25 '22 12:04 Barenboim

嗯嗯,wait这块,你也可以看看我们的WFConditional和WFResourcePool。WFResourcePool是对WFConditional的一种组织形式,但WFConditaionl也可以独立使用,通过WFTaskFactory产生,唤醒方调用它的signal操作就可以。 https://github.com/sogou/workflow/blob/master/docs/about-conditional.md

嗯嗯,其实WFCondition , WFCondtask的逻辑我搬的websocket分支里面,当时改完下了,发现WFConditional和WFResourcePool已经有,功能还有点重复了=-=

gnblao avatar Apr 25 '22 12:04 gnblao

你好用户,你的代码真是惊艳到我们了,非常感谢参与我们生态开发! 我看了一下修改的Communicator代码,有几个问题需要确认一下:

  • 你如何实现WebSocketFrame的保序问题?我们认为任何全双工通信,用户对帧的处理应该是严格有序的。所以我们之前websocket分枝才搞出CommChannel这个类,这个类里,所有的handle_in都是在一个线程上执行。但你现在的代码,好像还是走了msgqueue。这样两个handle之间好像是可以并行和乱序的。似乎有问题。
  • chanel_send_one似乎不能解决并发的问题。因为channel随时可能被关闭。如果你参考我们的CommChannel,这里我们只要established了,用户需要调用shutdown来主动关闭连接(哪怕对端已经关闭了)。在shutdown之前,都可以向channel send数据,当然send可能直接失败。这个地方我们的代码实现非常复杂。你现在的代码明显简单不少,我不太确定是不是严密。
  • 你现在每个frame的append返回1之后,都直接回到session::handle,然后重新启动session。最近,我们刚刚上线了一个模块,可能可以简化你的工作,不用让session回到handle,而是每条消息都可以被回调,并且保序(在poller线程里)。这个模块 是PackageWrapper。原理是我们的消息是可以做协议叠加的,比如SSLWrapper就是一种协议叠加器。而对于WebSocketet一个channel上的读,可以认为是一堆frame的叠加。每个frame的append返回1之后,wrapper调用next操作得到下一个frame,传输可以断续进行,无需回到session的handle。这个协议叠加器,也可以完美解决所谓粘包问题。你大概看一下这个,应该能很快理解。

关于乱序的问题,考虑到已经在wfchannel->channel_fanout_msg_in里面进行重组了,重组的逻辑类似tcp数据重组,我在每条msg进来的时候加了个seq。以此为排序依据。排序在处理

chanel_send_one 逻辑里面,is_channel = 2表明异步或者有其他现场正在ing写,这次不写下次在写。这里又个逻辑写的时候“必须”一个个msg写操作。

最后一个问题加了CONN_STATE_ESTABLISHED,主要的逻辑这操作session 的in和out上

其实这边的第三个问题是在Communicator::create_message 中利用CONN_STATE_ESTABLISHED 和session->message_in() 中的new_channel_msg_session 来解决所谓粘包问题。这个地方我有这样的思考:channel粘包数据,也是channel的next一个数据,wf架构只管message_in就可以,这个地方我使用new_channel_msg_session 来解决粘包,这样从架构上感觉更直观一些~

gnblao avatar Apr 25 '22 13:04 gnblao

你好用户,你的代码真是惊艳到我们了,非常感谢参与我们生态开发! 我看了一下修改的Communicator代码,有几个问题需要确认一下:

  • 你如何实现WebSocketFrame的保序问题?我们认为任何全双工通信,用户对帧的处理应该是严格有序的。所以我们之前websocket分枝才搞出CommChannel这个类,这个类里,所有的handle_in都是在一个线程上执行。但你现在的代码,好像还是走了msgqueue。这样两个handle之间好像是可以并行和乱序的。似乎有问题。
  • chanel_send_one似乎不能解决并发的问题。因为channel随时可能被关闭。如果你参考我们的CommChannel,这里我们只要established了,用户需要调用shutdown来主动关闭连接(哪怕对端已经关闭了)。在shutdown之前,都可以向channel send数据,当然send可能直接失败。这个地方我们的代码实现非常复杂。你现在的代码明显简单不少,我不太确定是不是严密。
  • 你现在每个frame的append返回1之后,都直接回到session::handle,然后重新启动session。最近,我们刚刚上线了一个模块,可能可以简化你的工作,不用让session回到handle,而是每条消息都可以被回调,并且保序(在poller线程里)。这个模块 是PackageWrapper。原理是我们的消息是可以做协议叠加的,比如SSLWrapper就是一种协议叠加器。而对于WebSocketet一个channel上的读,可以认为是一堆frame的叠加。每个frame的append返回1之后,wrapper调用next操作得到下一个frame,传输可以断续进行,无需回到session的handle。这个协议叠加器,也可以完美解决所谓粘包问题。你大概看一下这个,应该能很快理解。

关于乱序的问题,考虑到已经在wfchannel->channel_fanout_msg_in里面进行重组了,重组的逻辑类似tcp数据重组,我在每条msg进来的时候加了个seq。以此为排序依据。排序在处理 chanel_send_one 逻辑里面,is_channel = 2表明异步或者有其他现场正在ing写,这次不写下次在写。这里又个逻辑写的时候“必须”一个个msg写操作。 最后一个问题加了CONN_STATE_ESTABLISHED,主要的逻辑这操作session 的in和out上

其实这边的第三个问题是在Communicator::create_message 中利用CONN_STATE_ESTABLISHED 和session->message_in() 中的new_channel_msg_session 来解决所谓粘包问题。这个地方我有这样的思考:channel粘包数据,也是channel的next一个数据,wf架构只管message_in就可以,这个地方我使用new_channel_msg_session 来解决粘包,这样从架构上感觉更直观一些~

我们的PackageWrapper,就是一个message_in。它的append实现是:

int PackageWrapper::append(const void *buf, size_t *size)
{
	int ret = this->ProtocolWrapper::append(buf, size);

	if (ret > 0)
	{
		this->msg = this->next(this->msg);
		if (this->msg)
		{
			this->renew();
			ret = 0;
		}
	}

	return ret;
}

Wrapper构造的时候传进来一个msg(比如一个Websocket frame),这一行:

int ret = this->ProtocolWrapper::append(buf, size);

调用实际msg的append操作。当这个append返回>0时,一个msg操作接收完成,虚函数PackageWrapper::next()被调用。如果next返回NULL,则整个package接收完成。如果next返回下一个msg,则PackageWrapper::append()返回>0,继续收消息。 不一定适合你的用法。因为这个方法是让channel一直收消息,而不是收到一个msg就到handle。外部通过派生next函数,实现用户通知。

Barenboim avatar Apr 25 '22 14:04 Barenboim

你好用户,你的代码真是惊艳到我们了,非常感谢参与我们生态开发! 我看了一下修改的Communicator代码,有几个问题需要确认一下:

  • 你如何实现WebSocketFrame的保序问题?我们认为任何全双工通信,用户对帧的处理应该是严格有序的。所以我们之前websocket分枝才搞出CommChannel这个类,这个类里,所有的handle_in都是在一个线程上执行。但你现在的代码,好像还是走了msgqueue。这样两个handle之间好像是可以并行和乱序的。似乎有问题。
  • chanel_send_one似乎不能解决并发的问题。因为channel随时可能被关闭。如果你参考我们的CommChannel,这里我们只要established了,用户需要调用shutdown来主动关闭连接(哪怕对端已经关闭了)。在shutdown之前,都可以向channel send数据,当然send可能直接失败。这个地方我们的代码实现非常复杂。你现在的代码明显简单不少,我不太确定是不是严密。
  • 你现在每个frame的append返回1之后,都直接回到session::handle,然后重新启动session。最近,我们刚刚上线了一个模块,可能可以简化你的工作,不用让session回到handle,而是每条消息都可以被回调,并且保序(在poller线程里)。这个模块 是PackageWrapper。原理是我们的消息是可以做协议叠加的,比如SSLWrapper就是一种协议叠加器。而对于WebSocketet一个channel上的读,可以认为是一堆frame的叠加。每个frame的append返回1之后,wrapper调用next操作得到下一个frame,传输可以断续进行,无需回到session的handle。这个协议叠加器,也可以完美解决所谓粘包问题。你大概看一下这个,应该能很快理解。

关于乱序的问题,考虑到已经在wfchannel->channel_fanout_msg_in里面进行重组了,重组的逻辑类似tcp数据重组,我在每条msg进来的时候加了个seq。以此为排序依据。排序在处理 chanel_send_one 逻辑里面,is_channel = 2表明异步或者有其他现场正在ing写,这次不写下次在写。这里又个逻辑写的时候“必须”一个个msg写操作。 最后一个问题加了CONN_STATE_ESTABLISHED,主要的逻辑这操作session 的in和out上

其实这边的第三个问题是在Communicator::create_message 中利用CONN_STATE_ESTABLISHED 和session->message_in() 中的new_channel_msg_session 来解决所谓粘包问题。这个地方我有这样的思考:channel粘包数据,也是channel的next一个数据,wf架构只管message_in就可以,这个地方我使用new_channel_msg_session 来解决粘包,这样从架构上感觉更直观一些~

我们的PackageWrapper,就是一个message_in。它的append实现是:

int PackageWrapper::append(const void *buf, size_t *size)
{
	int ret = this->ProtocolWrapper::append(buf, size);

	if (ret > 0)
	{
		this->msg = this->next(this->msg);
		if (this->msg)
		{
			this->renew();
			ret = 0;
		}
	}

	return ret;
}

Wrapper构造的时候传进来一个msg(比如一个Websocket frame),这一行:

int ret = this->ProtocolWrapper::append(buf, size);

调用实际msg的append操作。当这个append返回>0时,一个msg操作接收完成,虚函数PackageWrapper::next()被调用。如果next返回NULL,则整个package接收完成。如果next返回下一个msg,则PackageWrapper::append()返回>0,继续收消息。 不一定适合你的用法。因为这个方法是让channel一直收消息,而不是收到一个msg就到handle。外部通过派生next函数,实现用户通知。

嗯,这个场景确实不太适合。wfChannel的设计场景,就是既要让channel在poller中append,同时也要让msg在工作线程中handle。

gnblao avatar Apr 25 '22 15:04 gnblao

你好用户,你的代码真是惊艳到我们了,非常感谢参与我们生态开发! 我看了一下修改的Communicator代码,有几个问题需要确认一下:

  • 你如何实现WebSocketFrame的保序问题?我们认为任何全双工通信,用户对帧的处理应该是严格有序的。所以我们之前websocket分枝才搞出CommChannel这个类,这个类里,所有的handle_in都是在一个线程上执行。但你现在的代码,好像还是走了msgqueue。这样两个handle之间好像是可以并行和乱序的。似乎有问题。
  • chanel_send_one似乎不能解决并发的问题。因为channel随时可能被关闭。如果你参考我们的CommChannel,这里我们只要established了,用户需要调用shutdown来主动关闭连接(哪怕对端已经关闭了)。在shutdown之前,都可以向channel send数据,当然send可能直接失败。这个地方我们的代码实现非常复杂。你现在的代码明显简单不少,我不太确定是不是严密。
  • 你现在每个frame的append返回1之后,都直接回到session::handle,然后重新启动session。最近,我们刚刚上线了一个模块,可能可以简化你的工作,不用让session回到handle,而是每条消息都可以被回调,并且保序(在poller线程里)。这个模块 是PackageWrapper。原理是我们的消息是可以做协议叠加的,比如SSLWrapper就是一种协议叠加器。而对于WebSocketet一个channel上的读,可以认为是一堆frame的叠加。每个frame的append返回1之后,wrapper调用next操作得到下一个frame,传输可以断续进行,无需回到session的handle。这个协议叠加器,也可以完美解决所谓粘包问题。你大概看一下这个,应该能很快理解。

想了一下,关于shutdown的问题,之前逻辑是依据read = 0/timeout/ send error 来被动执行的。缺乏主动shutdown的场景,刚刚加了一下直接利用poller_del来shutdown。这样的话,代码改动比较少,也能复用wf架构close fd的逻辑。大佬,有空闲的话,拍拍🧱~~

gnblao avatar Apr 26 '22 04:04 gnblao

CommChannel的shutdown主要是为了解决竞争问题,因为send的一瞬间,你不能确定连接会不会被关闭并释放。所以,只要连接建立成功,需要在上面增加一个ref,在shutdown时减去。这样无论如何不会出现send时连接对象不存在的问题。

Barenboim avatar Apr 26 '22 04:04 Barenboim

CommChannel的shutdown主要是为了解决竞争问题,因为send的一瞬间,你不能确定连接会不会被关闭并释放。所以,只要连接建立成功,需要在上面增加一个ref,在shutdown时减去。这样无论如何不会出现send时连接对象不存在的问题。

嗯嗯,我在wfchannel中也引入的类似的做法。复用connEntry中ref,在channel也引入了一个ref(作为msg级别的)

gnblao avatar Apr 27 '22 03:04 gnblao

py测试工具

留个简单记录

py_cli:

#!/usr/bin/env python
#coding=utf-8

import asyncio
import websockets
import sys
# 向服务器端发送认证后的消息
async def send_msg(websocket):
    while True:
        _text = input("please enter your context: ")
        if _text == "exit":
            print(f'you have enter "exit", goodbye')
            await websocket.close(reason="user exit")
            return False
        await websocket.send(_text)
        recv_text = await websocket.recv()
        print(f"{recv_text}")

# wss客户端主逻辑
ssl_context = ssl._create_unverified_context() 
async def main_logic_wss(p):
         async with websockets.connect('wss://127.0.0.1:5679', ssl=ssl_context) as websocket:
              await send_msg(websocket)

#ws
async def main_logic(p):
         async with websockets.connect('ws://127.0.0.1:5679') as websocket:
              await send_msg(websocket)
asyncio.get_event_loop().run_until_complete(main_logic(sys.argv[1]))

py_srv:

#!/usr/bin/env python3
# coding=utf-8

import asyncio
import websockets

# 接收客户端消息并处理,这里只是简单把客户端发来的返回回去
async def recv_msg(websocket):
    while True:
        recv_text = await websocket.recv()
        response_text = f"your submit context: {recv_text}"
        print(response_text)
        await websocket.send(response_text)

# 服务器端主逻辑
# websocket和path是该函数被回调时自动传过来的,不需要自己传
async def main_logic(websocket, path):
    #await check_permit(websocket)
    await recv_msg(websocket)
# 把ip换成自己本地的ip
start_server = websockets.serve(main_logic, '127.0.0.1', 5678)

#wss
#ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER, ssl_version=ssl.PROTOCOL_TLSv1_2)
#ssl_context.load_cert_chain(
#   pathlib.Path(__file__).with_name('server.crt'), pathlib.Path(__file__).with_name('server.key'))

#start_server = websockets.serve(main_logic, '127.0.0.1', 5678, ssl=ssl_context)


asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

gnblao avatar Apr 30 '22 13:04 gnblao

对于chanel send one操作,如果数据很大,需要异步写。你现在的逻辑正确吗?我原来的做法是dup一个fd。

Barenboim avatar May 01 '22 10:05 Barenboim

对于chanel send one操作,如果数据很大,需要异步写。你现在的逻辑正确吗?我原来的做法是dup一个fd。

嗯,现在想法还是保持poller一致,fd在异步同一时间只能读,或只能写。不过还是保持写优先 在channel中有个写的list,在异步写回调中检查list是否为空,不为空就继续写。 chanel send one中在触发sync的情况下要连续发送list中数据,直到触发async或list为空就返回。

上面的设想,会在极端情况下会造成read延时,不过现阶段还不用考虑,后续的话在考虑是用dup,还是改poller同时支持读写来优化吧

gnblao avatar May 02 '22 02:05 gnblao

好家伙!我直呼好家伙!

我是说楼主的头像。。。

lesismal avatar May 15 '22 14:05 lesismal

好家伙!我直呼好家伙!

我是说楼主的头像。。。

🤣🤣🤣🤦

gnblao avatar May 15 '22 15:05 gnblao

@gnblao 麻烦打开一下你项目的issue啊。

Barenboim avatar Mar 13 '23 15:03 Barenboim

@gnblao 麻烦打开一下你项目的issue啊。 哈哈,之前没注意,原以为是默认开着的~~

gnblao avatar Mar 14 '23 02:03 gnblao

大佬,关于channel功能,基本开发完了。我有想法推到你们仓库,不然在我那边基本没人用了哈~抽空帮忙review下,看看是否合适~~

gnblao avatar May 04 '23 11:05 gnblao

大佬,关于channel功能,基本开发完了。我有想法推到你们仓库,不然在我那边基本没人用了哈~抽空帮忙review下,看看是否合适~~

我可以review,也可以想办法帮你宣传。但是这个代码合进来不太现实啊,我们主分支尽量只保持必要的功能(我自己都拒绝了自己的一个channel修改)。作为一个增强功能的fork,也是很符合github的玩法的。

Barenboim avatar May 04 '23 11:05 Barenboim

大佬,关于channel功能,基本开发完了。我有想法推到你们仓库,不然在我那边基本没人用了哈~抽空帮忙review下,看看是否合适~~

我可以review,也可以想办法帮你宣传。但是这个代码合进来不太现实啊,我们主分支尽量只保持必要的功能(我自己都拒绝了自己的一个channel修改)。作为一个增强功能的fork,也是很符合github的玩法的。

了解,主分支尽量简洁~~

gnblao avatar May 04 '23 11:05 gnblao

大佬,关于channel功能,基本开发完了。我有想法推到你们仓库,不然在我那边基本没人用了哈~抽空帮忙review下,看看是否合适~~

我可以review,也可以想办法帮你宣传。但是这个代码合进来不太现实啊,我们主分支尽量只保持必要的功能(我自己都拒绝了自己的一个channel修改)。作为一个增强功能的fork,也是很符合github的玩法的。

了解,主分支尽量简洁~~

实在是很不好意思啊,到时候我在主页上加个显眼的链接到你那边吧,有这个需求的用户肯定会使用的。

Barenboim avatar May 04 '23 11:05 Barenboim

大佬,关于channel功能,基本开发完了。我有想法推到你们仓库,不然在我那边基本没人用了哈~抽空帮忙review下,看看是否合适~~

我可以review,也可以想办法帮你宣传。但是这个代码合进来不太现实啊,我们主分支尽量只保持必要的功能(我自己都拒绝了自己的一个channel修改)。作为一个增强功能的fork,也是很符合github的玩法的。

了解,主分支尽量简洁~~

实在是很不好意思啊,到时候我在主页上加个显眼的链接到你那边吧,有这个需求的用户肯定会使用的。

大佬创新设计一个架构不容易,我从中也白嫖了不少好东西,继续大佬学习,哈哈哈~

gnblao avatar May 04 '23 12:05 gnblao