ekuiper
ekuiper copied to clipboard
The little problem of Mysql.go insert file
There is a little problem of Mysql.go of sink file:
data --> item // 该函数为数据处理简化函数。
func (m *mysqlSink) Collect(ctx api.StreamContext, item interface{}) error { logger := ctx.GetLogger() v, _, err := ctx.TransformOutput(item) if err != nil { logger.Error(err) return err }
//TODO 生产环境中需要处理item unmarshall后的各种类型。
// 默认的类型为 []map[string]interface{}
// 如果sink的`dataTemplate`属性有设置,则可能为各种其他的类型
logger.Debugf("mysql sink receive %s", item)
//TODO 此处列名写死。生产环境中一般可从item中的键值对获取列名
sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.conf.Table, v)
logger.Debugf(sql)
insert, err := m.db.Query(sql)
if err != nil {
return err
}
defer insert.Close()
return nil
}
Environment:
- eKuiper version (e.g.
1.3.0
): - Hardware configuration (e.g.
lscpu
): - OS (e.g.
cat /etc/os-release
): - Others: