pytdx
pytdx copied to clipboard
并行版本接口
串行的接口在下载历史分钟数据耗时太久,我在cn_zipline ingest分钟数据的时候需要3、4个小时才能完成。 太久的话,不知道是链接不稳定还是怎样的,有时候会报错。所以就想搞一个并行版本的接口出来。 像全市场行情的话,我的网络最快也需要1.5s左右才能拿完。
@rainx 你看下有没有必要搞这个,或者有没有其它的办法
最初版本的代码大概是这样的:
from six import PY2
if not PY2:
import queue
from datetime import datetime
class ConcurrentApi:
def __init__(self, *args, **kwargs):
self.thread_num = kwargs.pop('thread_num', 4)
self.ip = kwargs.pop('ip', '14.17.75.71')
self.executor = ThreadPoolExecutor(self.thread_num)
self.queue = queue.Queue(self.thread_num)
for i in range(self.thread_num):
api = TdxHq_API(args, kwargs)
api.connect(self.ip)
self.queue.put(api)
def __getattr__(self, item):
api = self.queue.get()
func = api.__getattribute__(item)
def wrapper(*args, **kwargs):
res = self.executor.submit(func,*args, **kwargs)
self.queue.put(api)
return res
return wrapper
# 获取股票列表,并行版
def concurrent_api(num=4):
capi = ConcurrentApi(thread_num=num)
now = datetime.now()
data = {capi.get_security_list(0, 100) for i in range(100)}
dd = [i.result() for i in data]
return (datetime.now() - now).total_seconds()
# 获取股票列表,原生版
def original_api():
api = TdxHq_API()
api.connect()
now = datetime.now()
dd = [api.get_security_list(0, 100) for i in range(100)]
return (datetime.now() - now).total_seconds()
#获取全市场行情,并行版
def concurrent_quotes(num=4):
capi = ConcurrentApi(thread_num=num, ip=best_ip)
now = datetime.now()
data = {capi.get_security_quotes(
code[80 * pos:80 * (pos + 1)]) for pos in range(int(len(code) / 80) + 1)}
dd = [i.result() for i in data]
return (datetime.now() - now).total_seconds()
#获取全市场行情,原生版
def original_quotes():
api = TdxHq_API()
api.connect(best_ip)
now = datetime.now()
data = [api.get_security_quotes(
code[80 * pos:80 * (pos + 1)]) for pos in range(int(len(code) / 80) + 1)]
return (datetime.now() - now).total_seconds()
感谢,非常精简和有效的代码, 我感觉可以放到pytdx.concurrent 模块下面,针对TdxHq_API, TdxExHq_API各做一个, 不过稍微有点担心使用的人多之后,对服务器的压力会有点大.. 最好让普通用户优先使用普通接口,然后多速度有需求的用户使用并发版本。
为了解决服务器数据源负载的问题,我在考虑后续是不是提供一个 relay server 功能,对于获取实时全行情这种特殊的场景,专门做一个relay server ,一个用户从relay sever 获取实时行情之后,可以分发给其它的机器。这样减轻服务器的负载。
@JaysonAlbert 我看了下你代码 我猜到你为啥要1.5秒了 你拿到的股票列表是全市场 包括基金 可转换债券这些的所有代码 那这样肯定要1.5秒 股票就3000多个 全部列表10000多条呢
@JaysonAlbert 另外 你这个代码写错了 少传了一个参数进去.....其实根本拿不到数据...
测试了一下
s = original_api()
print(s)
s = concurrent_api()
print(s)
Press ENTER or type command to continue
8.977931
41.095718
用的是best_ip = "14.17.75.71"
在我这里(北京电信)连接算是比较慢的,好像并发版的还耗时多一些
并发有时候 会出现一次性请求过多 被服务器拒掉 然后就timeout在那 于是就比较慢了 我估计你是这个原因 @rainx
我并发最快获取股票实时价格是270ms 但是如果是持续获取 还是按顺序是最稳定的 因为持续的并发 就会返回NONE 也就是timeout
顺序获取大致在700-800ms 我觉得是可以接受的范围内了 300ms并发 快了三倍 被拒一次 就变成5秒才能拿到 为了300ms得不偿失
@JaysonAlbert 我改了一下 可以测试用
import queue
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from pytdx.hq import TdxHq_API
import asyncio
import tushare as ts
code=ts.get_stock_basics().index.tolist()
best_ip='218.75.126.9'
def get_market(code):
code = str(code)
if code[0] in ['5', '6', '9'] or code[:3] in ["009", "126", "110", "201", "202", "203", "204"]:
return 1
return 0
def concurrent_quotes(num=2):
capi = ConcurrentApi(thread_num=num, ip=best_ip)
now = datetime.now()
data = {capi.get_security_quotes([(get_market(x), x) for x in code[90 * pos:90 * (pos + 1)]]) for pos in range(int(len(code) / 90) + 1)}
#print(data)
dd = [i.result() for i in data]
print((datetime.now() - now).total_seconds())
return dd
@rainx 因为这个只是个最初版本的代码,还没来得及优化,而且目前的话,貌似2个进程性能才有提升,进程数量变大的话,反而会变慢。以前是做c++开发的,profiler之类的用的比较多,python还没有优化性能的经验
@JaysonAlbert 这还是线程池 进程池是 concurrent的processpoolexecutor 你这个是 self.executor = ThreadPoolExecutor(self.thread_num) 还是线程池
@yutiansut 目前我那份能运行的版本在tdx的concurrent的分支里面。 这样子,我还没有仔细看过concurrent的用法。
@JaysonAlbert 恩 目前只能用concurrent来做 asyncio不行 需要await的对象 得改造了 要改 asyncio的transport和protocol 做datareceived的处理 然后自定义一个createconnection 把asyncio.transport添加进去
@JaysonAlbert 我建议 要不你设置一个50ms-100ms的timeout 总之测出来 并发容易timeout 不够稳定
我试一下python的profiler吧,看看问题出在哪里
@JaysonAlbert 恩恩 cProfiler蛮爽的
我自己的测试结果,2个线程的,我感觉用datatime.now()来计时是有问题的,应该不准。
差不多 我大概是300ms和700ms
这两天 拿全行情经常会断开,不会被封掉吧?
@1987zfp 别这么玩 我过段时间做个行情分发的东西 暂时别太猛的获取行情 封了就没啥好用的接口了
@rainx 我不知道怎么给一个你没有的分支(async) pull request
写了个asyncio版本的parser和client
https://github.com/JaysonAlbert/pytdx/tree/async/pytdx/async
测试在
https://github.com/JaysonAlbert/pytdx/blob/async/tests/test_async.py
每次请求80条k线,30次请求,异步版本快10倍左右,但是异步版本容易连接失败。
嗯,我明天研究一下吧,主要是parser部分共享挺有难度的,之前没有考虑异步的接口,所以设计上面不是特别好分离..
我个人在用的是一个多实例的方法。 clients = []
speed = pd.DataFrame([ping(x) for x in hostsip], columns=['t'])
ipidx = speed.sort_values('t').index.tolist()
cnt = min(6, len(ipidx)) #连接6个host
for i in range(0, len(ipidx)):
api = TdxHq_API(heartbeat=True, raise_exception=True, auto_retry=True)
try:
api.connect(hostsip[ipidx[i]], port)
clients.append(api)
同时链接多个服务器,这样子虽然不是使用最快的服务器,但是减轻了对单个服务器的压力,可以尽情使用。有兴趣的话,可以合并这个多线程思路,会更好用。我目前获取一个完整的snapshot大概3-5秒钟吧。应该说足够了。 我只是用了多实例,没有用多线程。所以速度上并没有什么提高,但是不容易被拒绝。
其实 可以不改造 api 而是 像我这样,在应用层面 使用多线程访问多实例,既并发,也避免了服务器拒绝。
可以薅不同的券商服务器,避免被拒绝。
@wuxin1030 嗯,多连接何以减轻单个服务器的请求,加上并发可以提高速度.. 对于对速度有要求的使用并发版本,如果没有特别的要求,使用这个多实例分别获取数据挺好的
现在有稳定快速获取全行情的方法了吗?不断开的,能封装成1个接口不?
@1987zfp 可以参考quantaxis 项目新版里的使用方式...
https://github.com/yutiansut/QUANTAXIS/blob/master/QUANTAXIS/QAFetch/QATdx_adv.py
用并发多线程测试过,没有做异步处理。每个线程完全向不同的服务器发送请求,发送前测试过每个服务器的时延小于固定值(即服务器选择有效的)。 测试结论是40线程并发效果最好,再增加线程并不会提高速度。而且轮询一次,平均会有1%左右的错误,需要对于错误部分重巡。 这个跟我的代码环境和网络环境应该也有关系,大家参考。
@solensolen 要持续测试一天