efinance icon indicating copy to clipboard operation
efinance copied to clipboard

如何在盘中设定定时任务更新行情数据,制作1分钟K线数据?

Open Micro-sheep opened this issue 3 years ago • 10 comments
trafficstars

示例代码如下

import numpy as np
from datetime import datetime
from threading import Thread
import time
from typing import Deque
import efinance as ef
from collections import deque
import pandas as pd
from dataclasses import dataclass


@dataclass()
class QuoteSnapshotOneMinute:
    update_time: datetime
    df: pd.DataFrame


quote_queue: Deque[QuoteSnapshotOneMinute] = deque(maxlen=10000)
pre_start_time = '09:15:00'
pre_end_time = '11:30:05'
after_start_time = '13:00:00'
after_end_time = '15:00:05'


def make_latest_kline_minute(quote_queue: Deque[QuoteSnapshotOneMinute]) -> pd.DataFrame:
    if len(quote_queue) < 2:
        return None
    o = quote_queue[-2].df
    c = quote_queue[-1].df
    df = pd.DataFrame(columns=['股票代码', '股票名称', '时间',
                      '今开', '昨收', '开盘', '收盘', '最高', '最低'], index=o.index)
    df['时间'] = quote_queue[-1].update_time.strftime('%Y-%m-%d %H:%M')
    df['今开'] = o['今开']
    df['昨收'] = o['昨日收盘']
    df['开盘'] = o['最新价']
    df['收盘'] = c['最新价']
    df['最高'] = np.select(
        [o['分钟最高'] > c['分钟最高']],
        [o['分钟最高']], c['分钟最高'])
    df['最低'] = np.select(
        [o['分钟最低'] < c['分钟最低']],
        [o['分钟最低']], c['分钟最低'])
    df['成交额'] = c['成交额'] - o['成交额']
    df['成交量'] = c['成交量'] - o['成交量']
    df['累计成交额'] = c['成交额']
    df['累计成交量'] = c['成交量']
    df['股票代码'] = o.index
    df['股票名称'] = o['股票名称']
    return df


def update_snapshot(interval: float = 1):
    """
    按一定时间间隔更新行情快照

    Parameters
    ----------
    interval : float, optional
        更新间隔,默认 1 秒

    Notes
    -----
    需要保证更新间隔秒数小于 3 秒
    """
    # 记录最新一分钟内最高价
    high_minute: pd.Series = None
    # 记录最新一分钟内最低价
    low_minute: pd.Series = None
    # 记录当前 1 分钟K线是否已经完成
    finished = False
    while 1:
        now = datetime.now()
        # 大盘数据更新时间
        update_time = datetime.strptime(ef.stock.get_latest_quote('上证指数')[
                                        '更新时间'].iloc[0], '%Y-%m-%d %H:%M:%S')
        # 非交易日
        if update_time.date() != now.date():
            break
        t = now.strftime('%H:%M:%S')
        # 非交易时间段
        if t >= after_end_time or t <= pre_start_time:
            print('不在交易时间段')
            break

        # 交易时间段
        if pre_start_time <= t <= pre_end_time or after_start_time <= t <= after_end_time:
            df = ef.stock.get_realtime_quotes()
            df = df.set_index('股票代码', drop=False).sort_index()
            df['成交额'] = pd.to_numeric(df['成交额'], errors='coerce')
            df['成交量'] = pd.to_numeric(df['成交量'], errors='coerce')
            df['最新价'] = pd.to_numeric(df['最新价'], errors='coerce')
            sn = QuoteSnapshotOneMinute(update_time, df)

            # 分钟开始
            if len(quote_queue) == 0 or quote_queue[-1].update_time.strftime('%H:%M') != sn.update_time.strftime('%H:%M'):
                if update_time.second < 5:
                    sn.df['分钟最高'] = sn.df['最新价']
                    sn.df['分钟最低'] = sn.df['最新价']

                    high_minute = sn.df['分钟最高']
                    low_minute = sn.df['分钟最低']

                    quote_queue.append(sn)
                    finished = False

                    print(f'{update_time} 记录 1 分钟开始行情快照')
            # 分钟末尾
            elif not finished and quote_queue[-1].update_time.strftime('%H:%M') == sn.update_time.strftime('%H:%M') and sn.update_time.second > 56:
                sn.df['分钟最高'] = high_minute
                sn.df['分钟最低'] = low_minute

                quote_queue.append(sn)
                finished = True
                kline = make_latest_kline_minute(quote_queue)

                # 下一分钟准备开始了 所以把分钟内最高最低价清空
                high_minute = None
                low_minute = None

                print(f'{update_time} 记录 1 分钟结束行情快照')
                print(kline)

            # 非分钟末尾 更新最低、最高价
            else:
                high_minute = np.select(
                    [sn.df['最新价'] > high_minute],
                    [sn.df['最新价']], high_minute)
                low_minute = np.select(
                    [sn.df['最新价'] < low_minute],
                    [sn.df['最新价']], low_minute)

        else:
            print(f'{now} 非盘中')
        time.sleep(interval)


print('启动新线程记录行情')
t = Thread(target=update_snapshot, daemon=True)
t.start()
t.join()

Micro-sheep avatar Jun 20 '22 09:06 Micro-sheep

大佬牛13

vensentzhou avatar Jun 23 '22 00:06 vensentzhou

您好,我做个测试了,发现了一些问题:

  1. 这个代码是把 1分钟开头的第一个快照保存下来,然后把1分钟最后一个快照保存下来,这样用两个快照来画出1分钟k线,如果1分钟k线的最高点和最低点是在1分钟内,不在1分钟的开头和结尾,这个1分钟k线是不是就不对了?

  2. 发现用:get_latest_quote 会丢包:

     		当前时间=2022-06-23 10:26:00.504031
     		行情时间=2022-06-23 10:25:54
     		睡眠一下,时间=2 秒
    
     		----》这个地丢失了 第 57 秒的数据
    
     		当前时间=2022-06-23 10:26:04.502259
     		行情时间=2022-06-23 10:26:00	  
    

这样的话,根据代码的逻辑,这个1分钟的k线就画不出来了。 如果您看到,请您看一下。 谢谢您的帮助,鞠躬致谢。

hrbcxq avatar Jun 24 '22 02:06 hrbcxq

另外,补充一下: 在 df['最高'] = np.select( [o['最高'] > c['最高']], [o['最高']], c['最高']) 代码,会发生以下错误:

ValueError: Can only compare identically-labeled Series objects 请您看一下。 谢谢您。

hrbcxq avatar Jun 24 '22 02:06 hrbcxq

您好,我做个测试了,发现了一些问题:

  1. 这个代码是把 1分钟开头的第一个快照保存下来,然后把1分钟最后一个快照保存下来,这样用两个快照来画出1分钟k线,如果1分钟k线的最高点和最低点是在1分钟内,不在1分钟的开头和结尾,这个1分钟k线是不是就不对了?
  2. 发现用:get_latest_quote 会丢包:
     		当前时间=2022-06-23 10:26:00.504031
     		行情时间=2022-06-23 10:25:54
     		睡眠一下,时间=2 秒
    
     		----》这个地丢失了 第 57 秒的数据
    
     		当前时间=2022-06-23 10:26:04.502259
     		行情时间=2022-06-23 10:26:00	  
    

这样的话,根据代码的逻辑,这个1分钟的k线就画不出来了。 如果您看到,请您看一下。 谢谢您的帮助,鞠躬致谢。

确实有问题,我把当日最高和最底当作分钟内的最高和最低了,如果要取分钟内最高最低的话,得按行情更新频率来更新最高和最低价格了

Micro-sheep avatar Jun 24 '22 02:06 Micro-sheep

另外,补充一下: 在 df['最高'] = np.select( [o['最高'] > c['最高']], [o['最高']], c['最高']) 代码,会发生以下错误:

ValueError: Can only compare identically-labeled Series objects 请您看一下。 谢谢您。

把代码更新一下,我之前我忘记加 .sort_index 了,现在已经加了,复制最新代码即可

Micro-sheep avatar Jun 24 '22 02:06 Micro-sheep

另外,补充一下: 在 df['最高'] = np.select( [o['最高'] > c['最高']], [o['最高']], c['最高']) 代码,会发生以下错误: ValueError: Can only compare identically-labeled Series objects 请您看一下。 谢谢您。

把代码更新一下,我之前我忘记加 .sort_index 了,现在已经加了,复制最新代码即可

好的,谢谢您了。你真好!

hrbcxq avatar Jun 24 '22 04:06 hrbcxq

你好,发现了新问题,在 ef.stock.get_latest_quote 中没有返回,我跟踪了一下,发现是:search_quote 中, 卡在 json_response = session.get(url, params=params).json() 这个地方了,请老大看一下,谢谢您。

hrbcxq avatar Jun 24 '22 06:06 hrbcxq

你好,发现了新问题,在 ef.stock.get_latest_quote 中没有返回,我跟踪了一下,发现是:search_quote 中, 卡在 json_response = session.get(url, params=params).json() 这个地方了,请老大看一下,谢谢您。

卡在这一般就是网络问题了,如果是在国内,最好不要挂网络代理来使用。

Micro-sheep avatar Jun 24 '22 07:06 Micro-sheep

微信图片_20220629111246 加个条件可以减少类型报错,特别是对没有成交的标的

vensentzhou avatar Jun 29 '22 03:06 vensentzhou

微信图片_20220629111246 加个条件可以减少类型报错,特别是对没有成交的标的

谢谢您。

hrbcxq avatar Jul 04 '22 03:07 hrbcxq