MyTT icon indicating copy to clipboard operation
MyTT copied to clipboard

贡献两个函数,filter,sumbars

Open qzhjiang opened this issue 4 years ago • 5 comments

def sumbars(X, A):
    # type: (np.ndarray, Optional[np.ndarray, float, int]) -> np.ndarray
    """
    通达信SumBars函数的Python实现
    SumBars函数将X向前累加,直到大于等于A, 返回这个区间的周期数。例如SUMBARS(VOL, CAPITAL),求完全换手的周期数。

    :param X: 数组。被累计的源数据。 源数组中不能有小于0的元素。
    :param A: 数组(一组)或者浮点数(一个)或者整数(一个),累加截止的界限数
    :return:  数组。各K线分别对应的周期数
    """
    X = np.flipud(X)  # 倒转
    length = len(X)

    if isinstance(A * 1.0, float):  # 是单值
        A = np.repeat(A, length)  # 转化为数组

    sumbars = np.zeros(length)  # 初始化sumbars为0
    Sigma = np.insert(np.cumsum(X), 0, 0.0)  # 在累加值前面插入一个0.0(元素变多1个,便于引用)

    for i in range(length):
        k = np.searchsorted(Sigma[i + 1:], A[i] + Sigma[i])
        if k < length - i:  # 找到
            sumbars[length - i - 1] = k + 1

    return sumbars.astype(int)


def filter(X, n):
    # type: (np.ndarray, int) -> np.ndarray
    """
    通达信filter函数,用于过滤连续出现的信号。X满足条件后,将其后n周期内的数据置为0. 例如filter(Close>Open,13)

    :param X: 信号原数组, bool类型
    :param n: 周期
    :return: 处理后的信号数组, bool类型
    """
    i = 0
    while i < len(X):
        if X[i]:
            X[i + 1: i + n + 1] = False  # 其后的n周期数据置为False
            i += n + 1  # 直接跳转到(n+1)个数据之后
        else:
            i += 1
    #
    return X

写的不怎么优雅,但准确性和速度是没有问题的。

qzhjiang avatar Nov 19 '21 00:11 qzhjiang

感谢,不过还不够简练,近期研究后更新

mpquant avatar Nov 19 '21 08:11 mpquant

上面的sumbars函数有错误,漏掉了一条语句。 A = np.flipud(A) # 倒转(第一版漏了) 在A为数组的情况下会出错。 现在更正过来了,请用这个正确的版本:

def sumbarsFast(X, A):
    # type: (np.ndarray, Optional[np.ndarray, float, int]) -> np.ndarray
    """
    通达信SumBars函数的Python实现
    SumBars函数将X向前累加,直到大于等于A, 返回这个区间的周期数。例如SUMBARS(VOL, CAPITAL),求完全换手的周期数。

    :param X: 数组。被累计的源数据。 源数组中不能有小于0的元素。
    :param A: 数组(一组)或者浮点数(一个)或者整数(一个),累加截止的界限数
    :return:  数组。各K线分别对应的周期数
    """
    if any(X<=0):
        raise ValueError('数组X的每个元素都必须大于0!')
    
    X = np.flipud(X)  # 倒转
    length = len(X)

    if isinstance(A * 1.0, float):  # 是单值
        A = np.repeat(A, length)  # 转化为数组
    A = np.flipud(A)  # 倒转(第一版漏了)
    
    sumbars = np.zeros(length)  # 初始化sumbars为0
    Sigma = np.insert(np.cumsum(X), 0, 0.0)  # 在累加值前面插入一个0.0(元素变多1个,便于引用)

    for i in range(length):
        k = np.searchsorted(Sigma[i + 1:], A[i] + Sigma[i])
        if k < length - i:  # 找到
            sumbars[length - i - 1] = k + 1

    return sumbars.astype(int)

qzhjiang avatar Nov 19 '21 10:11 qzhjiang

感谢分享,我看两个函数里还有FOR循环 不是完全的向量化实现,请问速度如何?

yglpyn8888 avatar Nov 20 '21 12:11 yglpyn8888

感谢分享,我看两个函数里还有FOR循环 不是完全的向量化实现,请问速度如何?

是的,我的水平无法消灭这个循环,但速度还是很快的了。20W条记录,大概是784ms。

qzhjiang avatar Nov 21 '21 07:11 qzhjiang

感谢分享,我看两个函数里还有FOR循环 不是完全的向量化实现,请问速度如何?

是的,我的水平无法消灭这个循环,但速度还是很快的了。20W条记录,大概是784ms。

OK, 这个速度是相当快的~ 比我想象中快多了

yglpyn8888 avatar Nov 22 '21 13:11 yglpyn8888