pytdx icon indicating copy to clipboard operation
pytdx copied to clipboard

并行版本接口

Open JaysonAlbert opened this issue 7 years ago • 33 comments

串行的接口在下载历史分钟数据耗时太久,我在cn_zipline ingest分钟数据的时候需要3、4个小时才能完成。 太久的话,不知道是链接不稳定还是怎样的,有时候会报错。所以就想搞一个并行版本的接口出来。 像全市场行情的话,我的网络最快也需要1.5s左右才能拿完。

@rainx 你看下有没有必要搞这个,或者有没有其它的办法

最初版本的代码大概是这样的:

from six import PY2

if not PY2:
    import queue
    from datetime import datetime

    class ConcurrentApi:
        def __init__(self, *args, **kwargs):
            self.thread_num = kwargs.pop('thread_num', 4)
            self.ip = kwargs.pop('ip', '14.17.75.71')
            self.executor = ThreadPoolExecutor(self.thread_num)

            self.queue = queue.Queue(self.thread_num)
            for i in range(self.thread_num):
                api = TdxHq_API(args, kwargs)
                api.connect(self.ip)
                self.queue.put(api)

        def __getattr__(self, item):
            api = self.queue.get()
            func = api.__getattribute__(item)

            def wrapper(*args, **kwargs):
                res = self.executor.submit(func,*args, **kwargs)
                self.queue.put(api)
                return res
            return wrapper

    
    # 获取股票列表,并行版
    def concurrent_api(num=4):
        capi = ConcurrentApi(thread_num=num)
        now = datetime.now()
        data = {capi.get_security_list(0, 100) for i in range(100)}
        dd = [i.result() for i in data]
        return (datetime.now() - now).total_seconds()


    # 获取股票列表,原生版
    def original_api():
        api = TdxHq_API()
        api.connect()
        now = datetime.now()
        dd = [api.get_security_list(0, 100) for i in range(100)]
        return (datetime.now() - now).total_seconds()

    
    #获取全市场行情,并行版
    def concurrent_quotes(num=4):
        capi = ConcurrentApi(thread_num=num, ip=best_ip)
        now = datetime.now()
        data = {capi.get_security_quotes(
            code[80 * pos:80 * (pos + 1)]) for pos in range(int(len(code) / 80) + 1)}

        dd = [i.result() for i in data]
        return (datetime.now() - now).total_seconds()


    #获取全市场行情,原生版
    def original_quotes():
        api = TdxHq_API()
        api.connect(best_ip)
        now = datetime.now()
        data = [api.get_security_quotes(
            code[80 * pos:80 * (pos + 1)]) for pos in range(int(len(code) / 80) + 1)]
        return (datetime.now() - now).total_seconds()

JaysonAlbert avatar Nov 01 '17 16:11 JaysonAlbert

感谢,非常精简和有效的代码, 我感觉可以放到pytdx.concurrent 模块下面,针对TdxHq_API, TdxExHq_API各做一个, 不过稍微有点担心使用的人多之后,对服务器的压力会有点大.. 最好让普通用户优先使用普通接口,然后多速度有需求的用户使用并发版本。

为了解决服务器数据源负载的问题,我在考虑后续是不是提供一个 relay server 功能,对于获取实时全行情这种特殊的场景,专门做一个relay server ,一个用户从relay sever 获取实时行情之后,可以分发给其它的机器。这样减轻服务器的负载。

rainx avatar Nov 02 '17 01:11 rainx

@JaysonAlbert 我看了下你代码 我猜到你为啥要1.5秒了 你拿到的股票列表是全市场 包括基金 可转换债券这些的所有代码 那这样肯定要1.5秒 股票就3000多个 全部列表10000多条呢

yutiansut avatar Nov 02 '17 01:11 yutiansut

@JaysonAlbert 另外 你这个代码写错了 少传了一个参数进去.....其实根本拿不到数据...

yutiansut avatar Nov 02 '17 01:11 yutiansut

测试了一下

       s = original_api()
       print(s)
       s = concurrent_api()
       print(s)

Press ENTER or type command to continue
8.977931
41.095718

用的是best_ip = "14.17.75.71" 在我这里(北京电信)连接算是比较慢的,好像并发版的还耗时多一些

rainx avatar Nov 02 '17 01:11 rainx

并发有时候 会出现一次性请求过多 被服务器拒掉 然后就timeout在那 于是就比较慢了 我估计你是这个原因 @rainx

yutiansut avatar Nov 02 '17 01:11 yutiansut

我并发最快获取股票实时价格是270ms 但是如果是持续获取 还是按顺序是最稳定的 因为持续的并发 就会返回NONE 也就是timeout

顺序获取大致在700-800ms 我觉得是可以接受的范围内了 300ms并发 快了三倍 被拒一次 就变成5秒才能拿到 为了300ms得不偿失

yutiansut avatar Nov 02 '17 01:11 yutiansut

@yutiansut 这部分代码是我从我自己的项目tdx里面截取出来的,所以不是完整代码,应该没办法直接运行的。

另外,我1.5s确实用的是只拿行情,4000多只,并不是拿了全部列表的

JaysonAlbert avatar Nov 02 '17 03:11 JaysonAlbert

@JaysonAlbert 我改了一下 可以测试用


import queue
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from pytdx.hq import TdxHq_API
import asyncio
import tushare as ts
code=ts.get_stock_basics().index.tolist()
best_ip='218.75.126.9'

def get_market(code):
    code = str(code)
    if code[0] in ['5', '6', '9'] or code[:3] in ["009", "126", "110", "201", "202", "203", "204"]:
        return 1
    return 0


def concurrent_quotes(num=2):
    capi = ConcurrentApi(thread_num=num, ip=best_ip)
    now = datetime.now()
    data = {capi.get_security_quotes([(get_market(x), x) for x in code[90 * pos:90 * (pos + 1)]]) for pos in range(int(len(code) / 90) + 1)}
    #print(data)
    dd = [i.result() for i in data]
    print((datetime.now() - now).total_seconds())
    return dd

yutiansut avatar Nov 02 '17 03:11 yutiansut

@rainx 因为这个只是个最初版本的代码,还没来得及优化,而且目前的话,貌似2个进程性能才有提升,进程数量变大的话,反而会变慢。以前是做c++开发的,profiler之类的用的比较多,python还没有优化性能的经验

JaysonAlbert avatar Nov 02 '17 03:11 JaysonAlbert

@JaysonAlbert 这还是线程池 进程池是 concurrent的processpoolexecutor 你这个是 self.executor = ThreadPoolExecutor(self.thread_num) 还是线程池

yutiansut avatar Nov 02 '17 03:11 yutiansut

@yutiansut 目前我那份能运行的版本在tdx的concurrent的分支里面。 这样子,我还没有仔细看过concurrent的用法。

JaysonAlbert avatar Nov 02 '17 03:11 JaysonAlbert

@JaysonAlbert 恩 目前只能用concurrent来做 asyncio不行 需要await的对象 得改造了 要改 asyncio的transport和protocol 做datareceived的处理 然后自定义一个createconnection 把asyncio.transport添加进去

yutiansut avatar Nov 02 '17 03:11 yutiansut

@JaysonAlbert 我建议 要不你设置一个50ms-100ms的timeout 总之测出来 并发容易timeout 不够稳定

yutiansut avatar Nov 02 '17 03:11 yutiansut

我试一下python的profiler吧,看看问题出在哪里

JaysonAlbert avatar Nov 02 '17 03:11 JaysonAlbert

@JaysonAlbert 恩恩 cProfiler蛮爽的

yutiansut avatar Nov 02 '17 03:11 yutiansut

image image 我自己的测试结果,2个线程的,我感觉用datatime.now()来计时是有问题的,应该不准。

JaysonAlbert avatar Nov 02 '17 03:11 JaysonAlbert

差不多 我大概是300ms和700ms

yutiansut avatar Nov 02 '17 06:11 yutiansut

这两天 拿全行情经常会断开,不会被封掉吧?

gamergamer8888 avatar Nov 03 '17 07:11 gamergamer8888

@1987zfp 别这么玩 我过段时间做个行情分发的东西 暂时别太猛的获取行情 封了就没啥好用的接口了

yutiansut avatar Nov 03 '17 10:11 yutiansut

@rainx 我不知道怎么给一个你没有的分支(async) pull request

写了个asyncio版本的parser和client

https://github.com/JaysonAlbert/pytdx/tree/async/pytdx/async

测试在

https://github.com/JaysonAlbert/pytdx/blob/async/tests/test_async.py

每次请求80条k线,30次请求,异步版本快10倍左右,但是异步版本容易连接失败。

image

JaysonAlbert avatar Nov 07 '17 09:11 JaysonAlbert

嗯,我明天研究一下吧,主要是parser部分共享挺有难度的,之前没有考虑异步的接口,所以设计上面不是特别好分离..

rainx avatar Nov 07 '17 09:11 rainx

我个人在用的是一个多实例的方法。 clients = []

speed = pd.DataFrame([ping(x) for x in hostsip], columns=['t'])
ipidx = speed.sort_values('t').index.tolist()

cnt = min(6, len(ipidx)) #连接6个host
for i in range(0, len(ipidx)):
    api = TdxHq_API(heartbeat=True, raise_exception=True, auto_retry=True)
    try:
        api.connect(hostsip[ipidx[i]], port)
        clients.append(api)

同时链接多个服务器,这样子虽然不是使用最快的服务器,但是减轻了对单个服务器的压力,可以尽情使用。有兴趣的话,可以合并这个多线程思路,会更好用。我目前获取一个完整的snapshot大概3-5秒钟吧。应该说足够了。 我只是用了多实例,没有用多线程。所以速度上并没有什么提高,但是不容易被拒绝。

wuxin1030 avatar Nov 11 '17 03:11 wuxin1030

其实 可以不改造 api 而是 像我这样,在应用层面 使用多线程访问多实例,既并发,也避免了服务器拒绝。

wuxin1030 avatar Nov 11 '17 03:11 wuxin1030

可以薅不同的券商服务器,避免被拒绝。

wuxin1030 avatar Nov 11 '17 03:11 wuxin1030

@wuxin1030 嗯,多连接何以减轻单个服务器的请求,加上并发可以提高速度.. 对于对速度有要求的使用并发版本,如果没有特别的要求,使用这个多实例分别获取数据挺好的

rainx avatar Nov 13 '17 01:11 rainx

现在有稳定快速获取全行情的方法了吗?不断开的,能封装成1个接口不?

gamergamer8888 avatar Nov 15 '17 13:11 gamergamer8888

@1987zfp 可以参考quantaxis 项目新版里的使用方式...

rainx avatar Nov 16 '17 00:11 rainx

https://github.com/yutiansut/QUANTAXIS/blob/master/QUANTAXIS/QAFetch/QATdx_adv.py

yutiansut avatar Nov 27 '17 05:11 yutiansut

用并发多线程测试过,没有做异步处理。每个线程完全向不同的服务器发送请求,发送前测试过每个服务器的时延小于固定值(即服务器选择有效的)。 测试结论是40线程并发效果最好,再增加线程并不会提高速度。而且轮询一次,平均会有1%左右的错误,需要对于错误部分重巡。 这个跟我的代码环境和网络环境应该也有关系,大家参考。

solenbanson avatar Dec 13 '17 03:12 solenbanson

@solensolen 要持续测试一天

yutiansut avatar Dec 13 '17 03:12 yutiansut