TDengine icon indicating copy to clipboard operation
TDengine copied to clipboard

python api原生连接参数绑定写入数据的奇怪bug

Open myyhuster opened this issue 6 months ago • 1 comments

问题: 通过python api建立原生连接,然后用参数绑定往TD表中批量写入数据(使用超级表模版自动建表)。有一个标签(dep_date)的值总是写入错误,无论设置什么值,写入后都是同一个值。 表结构:

--show create database air_line;
CREATE DATABASE `air_line` BUFFER 16 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 1d WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 STT_TRIGGER 2 KEEP 30d,30d,30d PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0 TABLE_PREFIX 0 TABLE_SUFFIX 0 TSDB_PAGESIZE 4 WAL_RETENTION_PERIOD 3600 WAL_RETENTION_SIZE 0 KEEP_TIME_OFFSET 0 ENCRYPT_ALGORITHM 'none' S3_CHUNKPAGES 131072 S3_KEEPLOCAL 525600m S3_COMPACT 1 COMPACT_INTERVAL 0d COMPACT_TIME_RANGE 0d,0d COMPACT_TIME_OFFSET 0h

--show create stable airline_price;
CREATE STABLE `airline_price` (`ts` TIMESTAMP ENCODE 'delta-i' COMPRESS 'lz4' LEVEL 'medium', `dep_datetime` TIMESTAMP ENCODE 'delta-i' COMPRESS 'lz4' LEVEL 'medium', `arr_datetime` TIMESTAMP ENCODE 'delta-i' COMPRESS 'lz4' LEVEL 'medium', `base_cabin` VARCHAR(10) ENCODE 'disabled' COMPRESS 'zstd' LEVEL 'medium', `cabin` VARCHAR(10) ENCODE 'disabled' COMPRESS 'zstd' LEVEL 'medium', `discount` FLOAT ENCODE 'delta-d' COMPRESS 'lz4' LEVEL 'medium', `price` FLOAT ENCODE 'delta-d' COMPRESS 'lz4' LEVEL 'medium', `seat_left` INT ENCODE 'simple8b' COMPRESS 'lz4' LEVEL 'medium', `record_id` VARCHAR(64) ENCODE 'disabled' COMPRESS 'zstd' LEVEL 'medium') TAGS (`is_stop` BOOL, `stop_code` VARCHAR(10), `is_share` BOOL, `carrier_fly` VARCHAR(10), `ac_code` VARCHAR(5), `dep_code` VARCHAR(5), `arr_code` VARCHAR(5), `fly_no` VARCHAR(10), `dep_date` VARCHAR(20))

代码:

# td原生连接
conn = taos.connect(host="xxx.xxx.xxx.xx,
                    port=6030,
                    user="xxxxx",
                    password="xxxxx",
                    database='xxxxx')

# 构造数据点,主要关注dep_date和dep_datetime
def build_td_point(ac_code, dep_code, arr_code, fly_no, dep_datetime, arr_datetime, 
                     cabin, discount, price, seat_left, record_id, is_stop=0, is_share=0, stop_code='', carrier_fly='') -> dict:
    tags = {
        'is_stop': is_stop,
        'is_share': is_share,
        'ac_code': ac_code,
        'dep_code': dep_code,
        'arr_code': arr_code,
        'fly_no': fly_no,
        # dep_date 是取的dep_datetime日期部分
        'dep_date': dep_datetime[:10]
    }
    fields = {
        'dep_datetime': dep_datetime,
        'arr_datetime': arr_datetime,
        'base_cabin': 'Y',
        'cabin': cabin,
        'discount': discount,
        'price': price,
        'seat_left': seat_left,
        'record_id': record_id
    }

    if is_stop == 1:
        tags.update({
            'stop_code': stop_code
        })
    if is_share == 1:
        tags.update({
            'carrier_fly': carrier_fly
        })
    return {
            'table_name': f'{dep_code}-{arr_code}-{fly_no}-{dep_datetime[:10]}', 
            'tags': tags,
            'fields': fields
            }


# 通过上述build_td_point函数构造了N个数据点
# 写入数据
def write_td_points(points):
    stmt2 = None
    try:
        # 3 prepare
        sql = '''INSERT INTO ? USING xxxx (is_stop, stop_code, is_share, carrier_fly, ac_code, dep_code, arr_code, fly_no, dep_date) 
        TAGS(?,?,?,?,?,?,?,?,?) 
        VALUES (?,?,?,?,?,?,?,?,?)'''
        stmt2 = conn.statement2(sql)
    
        tbnames = []
        tags    = []
        datas   = []
        for point in points:
            raw_fields = point['fields']
            raw_tags = point['tags']
            # tbnames
            tbnames.append(point['table_name'])
            # tags
            tags.append([
                raw_tags.get('is_stop', 0),
                raw_tags.get('stop_code', ''),
                raw_tags.get('is_share', 0),
                raw_tags.get('carrier_fly', ''),
                raw_tags.get('ac_code', ''),
                raw_tags.get('dep_code', ''),
                raw_tags.get('arr_code', ''),
                raw_tags.get('fly_no', ''),
                raw_tags.get('dep_date', '')
            ])
            # datas
            timestamps = [int(datetime.now().timestamp() * 1000)]
            dep_datetime = [raw_fields['dep_datetime']]
            arr_datetime = [raw_fields['arr_datetime']]
            base_cabin = ['Y']
            cabin = [raw_fields['cabin']]
            discount = [raw_fields['discount']]
            price = [raw_fields['price']]
            seat_left = [raw_fields['seat_left']]
            record_id = [raw_fields['record_id']]
            
            datas.append([timestamps, dep_datetime, arr_datetime, base_cabin, cabin, discount, price, seat_left, record_id])
    
        # 4 bind param
        stmt2.bind_param(tbnames, tags, datas)
    
        # 5 execute
        stmt2.execute()
    
        # show 
        logger.info(f"Successfully inserted to td, points {len(points)}")
    
    except Exception as err:
        logger.exception(f"Failed to insert to table air_line.airline_price using stmt2") 
    finally:
        if stmt2:
            stmt2.close()

现象: Image 如图,我写入了一些数据,但是query出来dep_date(图中红框2)是错的,它应该是跟dep_datetime或arr_datetime(图中红框1)的日期是一样的。如图可以看到dep_date的值统一为'2025-05-22', 这是我第一次往TD数据库中写数据的日期。

环境: TDengine服务器:TDengine-server-3.3.6.6-Linux-x64.tar.gz TDengine客户端:TDengine-client-3.3.6.6-Linux-x64.tar.gz Python版本: 3.10.14 taos-ws-py:0.5.1 taospy:2.8.1

这个问题困扰我很久了,写入的数据不可用。辛苦帮忙看看,需要提供什么context随时联系我,电话/微信也可15907117680。

myyhuster avatar May 30 '25 17:05 myyhuster

收到 我们看下

yu285 avatar Jun 03 '25 05:06 yu285