go-clickhouse
go-clickhouse copied to clipboard
CK server `Code:1000` error
Initially, we used github.com/kshvakov/clickhouse for a little while, but we found that it writes data much more slowly than the Java HTTP client (the only one listed in the ClickHouse documentation). (snmpdump shows a 10x smaller packet size compared to the Java HTTP client.)


So we replaced it with this driver, and the outcome is good (performance similar to the Java HTTP client). There is only one minor flaw: once we start the program, the ClickHouse server emits many `Code: 1000` errors, which never happened before, neither with github.com/kshvakov/clickhouse nor with the Java client.
Since database/sql provides a high-level abstraction, we did not touch the code that writes to ClickHouse when replacing the driver; we only modified some arguments used to set up the connection.
main code looks like this
// conn is the package-level database/sql handle that the worker goroutine
// passes to bulkWrite.
var conn *sql.DB
// bulkChan is the channel the worker goroutine ranges over.
// NOTE(review): `make(bulk, 0)` is abbreviated pseudo-code from the issue
// report; the fuller snippet later in the thread declares it as
// `make(chan Bulk, 0)`.
var bulkChan = make(bulk, 0)
// bulkWrite (simplified sketch from the issue report) begins a transaction on
// conn, prepares a statement on it, executes the statement once per row of
// bulk, and returns the result of tx.Commit().
//
// NOTE(review): this is abbreviated pseudo-code, not compilable Go:
// tx.Prepare() is shown without its query argument, `stmt1.exec` is spelled
// lower-case, and `bulk` is ranged over directly although it is a
// *ClickhouseBulk.
func (t *ckTable) bulkWrite(conn *sql.DB, bulk *ClickhouseBulk) error {
tx, err := conn.Begin()
if err != nil {
return err
}
// odd... this has to be added while using `github.com/kshvakov/clickhouse`, otherwise OOM
// with this driver. even comment this out, `Code:1000` remains
defer tx.Rollback()
stmt1, err := tx.Prepare()
if err != nil {
return err
}
// comment this out, `Code:1000` remains
defer stmt1.Close()
for _, row := range bulk {
// err is overwritten on every iteration; a failed row does not stop the loop.
err = stmt1.exec(row...)
// sampled log err
}
// The commit result is the only error reported to the caller after the loop.
return tx.Commit()
}
// worker goroutine
// Receives from bulkChan until the channel is closed, looks up the target
// table by blk.table and writes the bulk through the shared conn. The error
// returned by bulkWrite is discarded here.
// NOTE(review): the call passes `bulk`, not the loop variable `blk`;
// presumably a typo in the sketch. Confirm against the real code.
go func(){
for blk := range bulkChan {
table := tables[blk.table]
table.bulkWrite(conn, bulk)
}
}()
We do not handle connections explicitly anywhere in our code, and the worker goroutine never exits.

How do you create the connection? Can you please share the options? How often do you put data into bulkChan? Could you please share the whole code of your test, or provide a minimal prototype that reproduces the issue?
`err = stmt1.exec(row...)` - it seems this should be `stmt1.Exec(row...)`.
// Bulk is the interface of the values sent over bulkChan.
// NOTE(review): no implementation of these methods is shown in this snippet;
// the per-method notes below are inferred from the names and from the
// surrounding comments about bulk size thresholds. Confirm against the
// implementation.
type Bulk interface {
// Append takes one row keyed by column name (presumably adds it to the bulk).
Append(row map[string]interface{})
// Length presumably reports the number of rows currently held.
Length() int
// Full presumably reports whether the bulk-size threshold has been reached.
Full() bool
// LowWaterMarkFull presumably reports whether the low-water-mark threshold has been reached.
LowWaterMarkFull() bool
}
// bulkChan is shared globally. It is unbuffered, so a send blocks until a
// receiver takes the bulk.
var bulkChan = make(chan Bulk)
// ClickHouseBulk implements the `Bulk` interface and holds all the data of one
// bulk. Once a bulk is full in a collector it is pushed into `bulkChan`, which
// then triggers a `BulkWrite`.
//
// A bulk is considered full under either of two rules:
//  1. Length() > config.BULK_SIZE (default 50,000)
//  2. the last commit time is before now() - 10s (default) AND
//     Length() > config.LOW_WATER_MARK (default 5,000)
//
// A collector goroutine owns exactly one bulk at a time; bulks are neither
// reused nor shared.
type ClickHouseBulk struct {
	// Rows holds the buffered rows, each keyed by column name.
	Rows []map[string]interface{}
	// DB is the target database name.
	DB string
	// Table is the target table name.
	Table string
}

// NewClickHouseBulk returns an empty bulk for the given database and table.
// Rows starts out as a non-nil, zero-length slice.
func NewClickHouseBulk(db, table string) *ClickHouseBulk {
	b := new(ClickHouseBulk)
	b.DB, b.Table = db, table
	b.Rows = []map[string]interface{}{}
	return b
}
// CkServer wraps the database/sql handle for one ClickHouse server.
type CkServer struct {
// conn is the handle returned by sql.Open in NewCkServer; BulkWrite passes it
// to the per-table writer.
conn *sql.DB
// Remaining fields are elided in the original snippet; BulkWrite also reads
// ck.tables and ck.logger.
...
}
// NewCkServer opens a database/sql handle with the "clickhouse" driver, wraps
// it in a CkServer, starts the background ping goroutine and returns the
// server. It returns the sql.Open error unchanged if opening fails.
// Parameters and part of the body are elided (`...`) in the original snippet.
func NewCkServer(...) (*CkServer, error){
// The connection URL takes one of these two forms:
// http://username:password@ip:port/database?enable_http_comression=1"
// http://username:password@ip:port/database"
// no more options to form a url
// NOTE(review): `enable_http_comression` looks like a misspelling of
// `enable_http_compression`; confirm against the DSN actually used.
db, err := sql.Open("clickhouse", url)
if err != nil {
return nil, err
}
...
// ~~no goroutine will be started.~~
// return &CkServer{
// conn: db,
// }, nil
// Correction by the author: the ping goroutine had been forgotten in the
// struck-through version above; one goroutine is started here.
ck := &CkServer{
conn: db,
}
// Nothing in this snippet stops the goroutine once it has been started.
go ck.ping()
return ck, nil
}
// ping is the health check that NewCkServer starts with `go ck.ping()`.
// Its body is elided in the original snippet; the author describes the
// behavior as follows.
func (ck *CkServer) ping() {
// ping this ckserver every one second with an increment backoff once failed.
// if failed 5 times or more, we will stop write to this ckserver.
}
// BulkWrite writes bulk to the table identified by bulk.DB and bulk.Table.
//
// Tables are cached in ck.tables under the key "<db>:<table>". On a cache miss
// the table is created with newCkTable, which fetches the table description
// synchronously (no goroutine), and is stored before the write. The author
// notes that a process writes to a single table, so the miss path is expected
// to run once at startup. It returns the newCkTable error if the table cannot
// be created, otherwise the result of the table's bulkWrite.
//
// NOTE(review): ck.tables is read and written here with no locking visible in
// this snippet, while the thread mentions many workers consuming the bulk
// channel. Confirm that callers serialize access.
func (ck *CkServer) BulkWrite(bulk *ClickHouseBulk) error {
	// ...
	key := bulk.DB + ":" + bulk.Table

	table, found := ck.tables[key]
	if !found {
		created, err := newCkTable(ck.conn, bulk.DB, bulk.Table, ck.logger)
		if err != nil {
			return err
		}
		ck.tables[key] = created
		table = created
	}
	return table.bulkWrite(ck.conn, bulk)
}
// newCkTable builds the writer state for one db/table pair.
//
// It derives a logger tagged with the "db" and "table" fields, wraps that in a
// sampled logger, and then loads the table description once through conn.
// If that load fails, the error is returned and no table is produced.
func newCkTable(conn *sql.DB, db, table string, logger zerolog.Logger) (*ckTable, error) {
	scoped := logger.With().Str("db", db).Str("table", table).Logger()
	t := &ckTable{
		db:            db,
		table:         table,
		logger:        scoped,
		sampledLogger: util.GetBrustSampledLogger(scoped),
	}
	//TODO: what if refresh failed the first time ?
	if err := t.refreshDescAndDDLOnce(conn); err != nil {
		return nil, err
	}
	return t, nil
}
// bulkWrite inserts every row of bulk into the table inside one transaction.
//
// For each row, values are taken in the order of desc.keys; a column that is
// absent from the row, or nil, is replaced by that column's Default. A failed
// Exec for one row is logged through the sampled logger and does not abort the
// bulk (best effort), so the commit result is the only per-bulk error reported.
//
// It returns an error when the table description is missing, when the
// transaction cannot be started, when the insert statement cannot be prepared,
// or when the commit fails.
func (t *ckTable) bulkWrite(conn *sql.DB, bulk *ClickHouseBulk) error {
	defer logx.RecoverPrintStack()
	t.logger.Debug().Msg("bulkWrite start")

	desc := t.getDesc()
	if desc == nil {
		t.logger.Debug().Msg("table desc is nil")
		return errors.New(fmt.Sprintf("table desc is nil, db: %s, table: %s", t.db, t.table))
	}

	tx, err := conn.Begin()
	if err != nil {
		return err
	}
	// Always release the transaction. Without this, a failed Prepare (or a
	// panic recovered above) leaves the transaction open and its connection
	// checked out of the pool. After Commit has run, database/sql turns this
	// Rollback into a no-op that returns sql.ErrTxDone, so the result is
	// deliberately ignored.
	// The reporter saw OOM with github.com/kshvakov/clickhouse when the
	// rollback was missing, and saw `Code:1000` with or without it.
	defer func() { _ = tx.Rollback() }()

	stmt, err := tx.Prepare(desc.insertDDL)
	if err != nil {
		return err
	}
	// The reporter saw `Code:1000` whether or not the statement is closed.
	defer stmt.Close()

	for _, msg := range bulk.Rows {
		row := make([]interface{}, 0, len(desc.keys))
		complete := true
		for _, key := range desc.keys {
			rowDesc, ok := desc.dMap[key]
			if !ok {
				//TODO: should not happend, replace it with orderedMap.
				t.sampledLogger.Error().Msg("desc key not in desc.dMap")
				complete = false
				break
			}
			if value, ok := msg[key]; ok && value != nil {
				row = append(row, value)
			} else {
				row = append(row, rowDesc.Default)
			}
		}
		if !complete {
			// A truncated row no longer matches the insert statement's
			// column list, so skip it instead of sending it.
			continue
		}
		if _, err := stmt.Exec(row...); err != nil {
			t.sampledLogger.Error().Err(err).Msg("stmt.Exec failed.")
		}
	}

	// commit time
	start := time.Now()
	err = tx.Commit()
	ometrics.BulkCommitTimer.UpdateSince(start)
	return err
}
// Worker loop; many of these are started to consume `bulkCh`.
// Each iteration blocks until a bulk arrives, writes it through ckSrv and logs
// any failure. The loop never exits.
for {
	bulk := <-bulkCh
	if err := ckSrv.BulkWrite(bulk); err != nil {
		ckSrv.logger.Error().Err(err).Msg("bulk write failed")
	}
}
// so the path is clear
// bulk := <- bulkCh
// ckServer.BulkWrite(bulk)
// ckTable.bulkWrite(bulk)
Even with `go ck.ping()` commented out, `Code:1000` remains.
// go ck.ping()

The same happens to me:
- clickhouse 19.8.5
- go 1.14
- go-clickhouse 1.3.0
I use a connection string like this:
http://readwrite:*******@my-standalone-ch.ru:8123/default?read_timeout=10s&write_timeout=20s
2020.04.27 17:42:00.153602 [ 274 ] {} <Trace> HTTPHandler-factory: HTTP Request for HTTPHandler-factory. Method: POST, Address: *redacted*:27906, User-Agent: Go-http-client/1.1, Length: 7944, Content Type: , Transfer Encoding: identity
2020.04.27 17:42:00.153720 [ 274 ] {} <Trace> HTTPHandler: Request URI: /?database=default&default_format=TabSeparatedWithNamesAndTypes
2020.04.27 17:42:00.153875 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> executeQuery: (from *redacted*:27906, user: readwrite) INSERT INTO *redacted*
2020.04.27 17:42:00.154050 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> executeQuery: Query pipeline:
NullAndDoCopy
InputStreamFromASTInsertQuery
2020.04.27 17:42:00.154513 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Information> executeQuery: Read 19 rows, 7.50 KiB in 0.001 sec., 31167 rows/sec., 12.02 MiB/sec.
2020.04.27 17:42:00.154556 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> MemoryTracker: Peak memory usage (total): 115.66 KiB.
2020.04.27 17:42:00.154615 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> MemoryTracker: Peak memory usage (for query): 2.12 MiB.
2020.04.27 17:42:00.154623 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Information> HTTPHandler: Done processing query
2020.04.27 17:42:00.154938 [ 274 ] {} <Error> ServerErrorHandler: Poco::Exception. Code: 1000, e.code() = 104, e.displayText() = Connection reset by peer (version 19.8.5)
Hi @chobostar, can you check whether the problem persists on the newer version 1.5.0?