TDengine icon indicating copy to clipboard operation
TDengine copied to clipboard

tmq_get_table_name can't get table name

Open wx163 opened this issue 1 year ago • 11 comments

TDengine Version OS: Linux CentOS TDengine Version: 3.2.3.0.alpha

在订阅的参数中,已经增加 tmq_conf_set(conf, "msg.with.table.name", "true") ,但是在返回结果中通过 tmq_get_table_name 获取不到表名,返回结果为空,以下是c 的源码实现。

#include <assert.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <time.h> #include "taos.h"

/*
 * running:    loop flag tested by basic_consume_loop(); nothing in the visible
 *             program ever clears it, so the loop ends only when a poll
 *             returns no message.
 * topic_name: topic the consumer subscribes to and seeks on; it is the same
 *             name that create_topic() creates.
 */
static int running = 1; const char* topic_name = "topicname";

/*
 * Print the metadata and every row carried by one polled TMQ message.
 *
 * msg: result handle returned by tmq_consumer_poll(); ownership stays with
 *      the caller, which frees it afterwards.
 *
 * Returns the number of rows printed.
 */
static int32_t msg_process(TAOS_RES* msg) {
  char    buf[1024];
  int32_t rows = 0;

  const char* topicName = tmq_get_topic_name(msg);
  const char* dbName = tmq_get_db_name(msg);
  const char* tableName = tmq_get_table_name(msg);
  int32_t     vgroupId = tmq_get_vgroup_id(msg);

  /*
   * The name getters can come back empty (for this topic the table name is
   * reported as missing). Passing NULL to "%s" is undefined behaviour, so
   * substitute a visible placeholder instead.
   */
  printf("topic: %s\n", topicName ? topicName : "(null)");
  printf("db: %s table:%s \n", dbName ? dbName : "(null)", tableName ? tableName : "(null)");
  printf("vgroup id: %d\n", vgroupId);

  while (1) {
    TAOS_ROW row = taos_fetch_row(msg);
    if (row == NULL) break;

    /*
     * NOTE(review): the schema lookups are deliberately left inside the loop,
     * as in the original; presumably the field layout can change between the
     * data blocks of one message - confirm before hoisting them.
     */
    TAOS_FIELD* fields = taos_fetch_fields(msg);
    int32_t     numOfFields = taos_field_count(msg);
    int32_t     precision = taos_result_precision(msg);

    rows++;
    /* NOTE(review): taos_print_row() is not told the size of buf - confirm a row always fits. */
    taos_print_row(buf, row, fields, numOfFields);
    printf("precision: %d, row content: %s\n", precision, buf);
  }

  return rows;
}

static int32_t init_env() { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { return -1; }

TAOS_RES* pRes; // drop database if exists printf("create database\n"); pRes = taos_query(pConn, "drop topic topicname"); if (taos_errno(pRes) != 0) { printf("error in drop topicname, reason:%s\n", taos_errstr(pRes)); } taos_free_result(pRes);

pRes = taos_query(pConn, "drop database if exists tmqdb"); if (taos_errno(pRes) != 0) { printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); } taos_free_result(pRes);

// create database pRes = taos_query(pConn, "create database tmqdb precision 'ns' WAL_RETENTION_PERIOD 3600"); if (taos_errno(pRes) != 0) { printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes);

// create super table printf("create super table\n"); pRes = taos_query( pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); if (taos_errno(pRes) != 0) { printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes);

// create sub tables printf("create sub tables\n"); pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes);

pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes);

pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes);

pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')"); if (taos_errno(pRes) != 0) { printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes);

// insert data printf("insert data into sub tables\n"); pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes);

pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes);

pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes);

pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); goto END; } taos_free_result(pRes); taos_close(pConn); return 0;

END: taos_free_result(pRes); taos_close(pConn); return -1; }

int32_t create_topic() { printf("create topic\n"); TAOS_RES* pRes; TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { return -1; }

pRes = taos_query(pConn, "use tmqdb"); if (taos_errno(pRes) != 0) { printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes);

pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes);

taos_close(pConn); return 0; }

/*
 * Auto-commit callback: logs the commit result code together with the
 * consumer handle and the user parameter it was registered with.
 */
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
  /* "%p" requires a void* argument, so the consumer handle is cast explicitly. */
  printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, (void*)tmq, param);
}

/*
 * Build a TMQ consumer from a fixed set of configuration entries.
 *
 * NOTE(review): "msg.with.table.name" is set here, but per the TDengine
 * data-subscription documentation it reportedly has no effect for query
 * (column) topics such as the one created by create_topic() - confirm.
 *
 * Returns the new consumer, or NULL if the configuration object cannot be
 * created, any entry is rejected, or the consumer cannot be created. The
 * configuration object is destroyed on every path after it was created.
 */
tmq_t* build_consumer() {
  /* Applied in this order; the first rejected entry aborts the build. */
  static const char* const settings[][2] = {
      {"enable.auto.commit", "true"},
      {"msg.with.table.name", "true"},
      {"auto.commit.interval.ms", "1000"},
      {"group.id", "cgrpName"},
      {"client.id", "user defined name"},
      {"td.connect.user", "root"},
      {"td.connect.pass", "taosdata"},
      {"auto.offset.reset", "earliest"},
  };

  tmq_t*      tmq = NULL;
  tmq_conf_t* conf = tmq_conf_new();
  if (conf == NULL) {
    return NULL;
  }

  for (size_t i = 0; i < sizeof(settings) / sizeof(settings[0]); ++i) {
    if (tmq_conf_set(conf, settings[i][0], settings[i][1]) != TMQ_CONF_OK) {
      goto END;
    }
  }

  tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
  tmq = tmq_consumer_new(conf, NULL, 0);

END:
  tmq_conf_destroy(conf);
  return tmq;
}

/*
 * Create a topic list containing only topic_name.
 *
 * Returns the list (the caller destroys it with tmq_list_destroy()), or NULL
 * if the list cannot be created or the topic cannot be appended.
 */
tmq_list_t* build_topic_list() {
  tmq_list_t* topicList = tmq_list_new();
  if (topicList == NULL) {
    /* Nothing to append to and nothing to destroy. */
    return NULL;
  }

  int32_t code = tmq_list_append(topicList, topic_name);
  if (code) {
    tmq_list_destroy(topicList);
    return NULL;
  }
  return topicList;
}

/*
 * Poll the consumer until a poll yields no message (or `running` is cleared),
 * handing each message to msg_process() and freeing it afterwards. A summary
 * of the message and row counts is written to stderr at the end.
 */
void basic_consume_loop(tmq_t* tmq) {
  const int32_t pollTimeout = 5000;
  int32_t       rowTotal = 0;
  int32_t       msgTotal = 0;

  for (;;) {
    if (!running) {
      break;
    }

    TAOS_RES* polled = tmq_consumer_poll(tmq, pollTimeout);
    if (polled == NULL) {
      /* An empty poll ends the loop. */
      break;
    }

    ++msgTotal;
    rowTotal += msg_process(polled);
    taos_free_result(polled);
  }

  fprintf(stderr, "%d msg consumed, include %d rows\n", msgTotal, rowTotal);
}

/*
 * Rewind every assignment of topic_name to its `begin` offset and run the
 * consume loop once more.
 *
 * Failures to fetch the assignment or to seek are reported on stderr and do
 * not stop the function; the consume loop always runs afterwards.
 */
void consume_repeatly(tmq_t* tmq) {
  int32_t               numOfAssignment = 0;
  tmq_topic_assignment* pAssign = NULL;

  int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
  if (code != 0) {
    /* Newline added so this message does not run into the next stderr output. */
    fprintf(stderr, "failed to get assignment, reason:%s\n", tmq_err2str(code));
  }

  /* seek to the earliest offset */
  for (int32_t i = 0; i < numOfAssignment; ++i) {
    tmq_topic_assignment* p = &pAssign[i];

    code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
    if (code != 0) {
      /*
       * The original narrowed the offset to int before printing, which would
       * truncate an offset wider than int; print it through long long
       * instead.
       */
      fprintf(stderr, "failed to seek to %lld, reason:%s\n", (long long)p->begin, tmq_err2str(code));
    }
  }

  tmq_free_assignment(pAssign);

  /* let's do it again */
  basic_consume_loop(tmq);
}

/*
 * Demo entry point: prepare the database and topic, build a consumer,
 * subscribe to topic_name, consume everything once, rewind and consume
 * again, then close the consumer.
 *
 * Returns 0 when the run completes, -1 if setup, topic creation, consumer
 * creation or topic-list creation fails.
 */
int main(int argc, char* argv[]) {
  (void)argc;
  (void)argv;

  int32_t code;

  if (init_env() < 0) {
    return -1;
  }

  if (create_topic() < 0) {
    return -1;
  }

  tmq_t* tmq = build_consumer();
  if (NULL == tmq) {
    fprintf(stderr, "build_consumer() fail!\n");
    return -1;
  }

  tmq_list_t* topic_list = build_topic_list();
  if (NULL == topic_list) {
    fprintf(stderr, "build_topic_list() fail!\n");
    /* Do not leak the consumer that was already created. */
    code = tmq_consumer_close(tmq);
    if (code) {
      fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
    }
    return -1;
  }

  /* A failed subscribe is only reported; the run continues as in the original. */
  if ((code = tmq_subscribe(tmq, topic_list))) {
    fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
  }

  tmq_list_destroy(topic_list);

  basic_consume_loop(tmq);

  consume_repeatly(tmq);

  code = tmq_consumer_close(tmq);
  if (code) {
    fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
  } else {
    fprintf(stderr, "Consumer closed\n");
  }

  return 0;
}

wx163 avatar Jan 01 '24 10:01 wx163

可以加微信具体看一下情况 :

a15652223354

yu285 avatar Feb 17 '24 08:02 yu285

版本:3.3.2.0 也出现了这个问题 用的是java的驱动

TongleiSun avatar Jul 18 '24 08:07 TongleiSun

我们在看

yu285 avatar Jul 24 '24 05:07 yu285

image

wangmm0220 avatar Jul 31 '24 01:07 wangmm0220

https://docs.taosdata.com/develop/tmq/#%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85%E7%9B%B8%E5%85%B3%E5%8F%82%E6%95%B0 msg.with.table.name 这个参数不适用于列订阅

wangmm0220 avatar Jul 31 '24 01:07 wangmm0220

image

TongleiSun avatar Jul 31 '24 02:07 TongleiSun

image

TongleiSun avatar Jul 31 '24 02:07 TongleiSun

你好 我用的数据库的订阅方式 获取不到tableName

TongleiSun avatar Jul 31 '24 02:07 TongleiSun

d558f4e1561a0e9710608ff135ea7f9

TongleiSun avatar Jul 31 '24 02:07 TongleiSun

https://docs.taosdata.com/develop/tmq/#%E6%95%B0%E6%8D%AE%E8%AE%A2%E9%98%85%E7%9B%B8%E5%85%B3%E5%8F%82%E6%95%B0 msg.with.table.name 这个参数不适用于列订阅

上面的这个是我当时打断点的一个图

TongleiSun avatar Jul 31 '24 02:07 TongleiSun

你好 我用的数据库的订阅方式 获取不到tableName

我看上面c 代码里用的是查询订阅

wangmm0220 avatar Jul 31 '24 08:07 wangmm0220