go-clickhouse icon indicating copy to clipboard operation
go-clickhouse copied to clipboard

CK server `Code:1000` error

Open oliveagle opened this issue 6 years ago • 7 comments

Initially, we used github.com/kshvakov/clickhouse for a little while, but we found it writes data much slower than the Java HTTP client (the only one listed in the ClickHouse documentation). (snmpdump shows a 10x smaller packet size compared to the Java HTTP client.)

image

image

So we replaced it with this driver and the outcome is good (similar performance to the Java HTTP client). There is only one minor flaw: once we start the program, the ClickHouse server pops out many Code: 1000 errors, which never happened before — not with github.com/kshvakov/clickhouse, and not with the Java client.

Since database/sql provides a high-level abstraction, we didn't touch the code that writes to ClickHouse while replacing the driver; we only modified some arguments to set up the connection.

The main code looks like this:

var conn *sql.DB

var bulkChan = make(bulk, 0)

func (t *ckTable) bulkWrite(conn *sql.DB, bulk *ClickhouseBulk) error {
         tx, err := conn.Begin()
         if err != nil {
         	return err
         }

         // odd... this has to be added while using `github.com/kshvakov/clickhouse`, otherwise OOM
         // with this driver.  even comment this out, `Code:1000` remains
         defer tx.Rollback()

         stmt1, err := tx.Prepare()
         if err != nil {
          	return err
         }

         // comment this out, `Code:1000` remains
         defer stmt1.Close()

		for _, row := range bulk {
             err = stmt1.exec(row...)
             // sampled log err
         }
         return tx.Commit()
}

// worker goroutine
go func(){
    for blk := range bulkChan {
              table := tables[blk.table]
              table.bulkWrite(conn, bulk)
    }
}()

We don't handle the connection anywhere else in our code... and the worker goroutine never exits.

image

oliveagle avatar Jan 10 '19 08:01 oliveagle

How do you create the connection? Can you please share the options? How often do you put data into bulkChan? Could you please share the whole code of your test, or provide a minimal prototype which reproduces the issue?

bgaifullin avatar Jan 10 '19 10:01 bgaifullin

err = stmt1.exec(row...) — it seems like this should be stmt1.Exec(row...)

bgaifullin avatar Jan 10 '19 10:01 bgaifullin


type Bulk interface {
	Append(row map[string]interface{})
	Length() int
	Full() bool
	LowWaterMarkFull() bool
}

// this channel is shared globally.
var bulkChan = make(chan Bulk, 0)



// this struct implement `Bulk` interface and hold all data
// once bulk is full in collector, it will be pushed into `bulkChan`, then a `BulkWrite` will be fired.
//  two rules: 
//       1. Length() > config.BULK_SIZE (default 50,000)
//       2.  last commit time  is before now() - 10s(default) && 
//                Length() > config.LOW_WATER_MARK(default 5,000)
// one collector goroutine will only have one bulk struct at the same time, no reuse. no share.
type ClickHouseBulk struct {
	Rows  []map[string]interface{}
	DB    string
	Table string
}
func NewClickHouseBulk(db, table string) *ClickHouseBulk {
	return &ClickHouseBulk{
		DB:    db,
		Table: table,
		Rows:  make([]map[string]interface{}, 0),
	}
}



type CkServer struct {
	conn          *sql.DB
       ...
}
func NewCkServer(...) (*CkServer, error){
    // http://username:password@ip:port/database?enable_http_comression=1"
    // http://username:password@ip:port/database"
    // no more options to form a url
    db, err := sql.Open("clickhouse", url)
    if err != nil {
        return nil, err
    }
    ...
    // ~~no goroutine will be started.~~
    // return &CkServer{
    //    conn: db,
    // }, nil
    // forget about ping goroutine.
   ck := &CkServer{
        conn: db,
   }
   go ck.ping()
   return ck, nil
}

func (ck *CkServer) ping() {
     // ping this ckserver every one second with an increment backoff once failed.
    // if failed 5 times or more,  we will stop write to this ckserver.
}

func (ck *CkServer) BulkWrite(bulk *ClickHouseBulk) error {
        // ... 
	key := bulk.DB + ":" + bulk.Table
	if table, ok := ck.tables[key]; ok == true {
		return table.bulkWrite(ck.conn, bulk)
	} else {
                // we only write to a single table with in one process. this branch 
               // will only be called once in the beginning. and will fetch table 
               // desc blockingly(no goroutine).
		table, err := newCkTable(ck.conn, bulk.DB, bulk.Table, ck.logger)
		if err != nil {
			return err
		}
		ck.tables[key] = table
		return table.bulkWrite(ck.conn, bulk)
	}
}

func newCkTable(conn *sql.DB, db, table string, logger zerolog.Logger) (*ckTable, error) {
	lg := logger.With().Str("db", db).Str("table", table).Logger()

	sampled := util.GetBrustSampledLogger(lg)

	cktable := &ckTable{db: db, table: table, logger: lg, sampledLogger: sampled}

	//TODO:  what if refresh failed the first time ?
	err := cktable.refreshDescAndDDLOnce(conn)
	if err != nil {
		return nil, err
	}
	return cktable, nil
}

func (t *ckTable) bulkWrite(conn *sql.DB, bulk *ClickHouseBulk) error {
	defer logx.RecoverPrintStack()

	t.logger.Debug().Msg("bulkWrite start")

	desc := t.getDesc()
	if desc == nil {
		t.logger.Debug().Msg("table desc is nil")
		return errors.New(fmt.Sprintf("table desc is nil, db: %s, table: %s", t.db, t.table))
	}

	tx, err := conn.Begin()
	if err != nil {
		return err
	}

         // odd... this has to be added while using `github.com/kshvakov/clickhouse`, otherwise OOM
         // with this driver.  even comment this out, `Code:1000` remains
	//defer tx.Rollback() 

	stmt, err := tx.Prepare(desc.insertDDL)
	if err != nil {
		return err
	}

        // comment this out, `Code:1000` remains
	defer stmt.Close()

	for _, msg := range bulk.Rows {
		var row []interface{}
                // .... process row here.
                for _, key := range desc.keys {
                        row_desc, ok := desc.dMap[key]
			if ok == false {
				//TODO: should not happend, replace it with orderedMap.
				t.sampledLogger.Error().Msg("desc key not in desc.dMap")
				break
			}
                        if value, ok := msg[key]; ok == true && value != nil  {
                                 row = append(row, value)
                        } else {
                                 row = append(row, row_desc.Default)
                        }
                }

		if _, err := stmt.Exec(row...); err != nil {
			t.sampledLogger.Error().Err(err).Msg("stmt.Exec failed.")
		}
	}

	// commit time
	start := time.Now()
	err = tx.Commit()
	ometrics.BulkCommitTimer.UpdateSince(start)
	return err
}


// start many worker to consume `bulkCh`
for {
      select {
          case bulk := <-bulkCh:
                 err := ckSrv.BulkWrite(bulk)
                 if err != nil {
                        ckSrv.logger.Error().Err(err).Msg("bulk write failed")
                 }
      }
}

// so the path is clear
//  bulk  := <- bulkCh
//  ckServer.BulkWrite(bulk)
//  ckTable.bulkWrite(bulk)

oliveagle avatar Jan 11 '19 06:01 oliveagle

Commented out go ck.ping(); Code:1000 remains.

// go ck.ping()

oliveagle avatar Jan 11 '19 06:01 oliveagle

image

oliveagle avatar Jan 11 '19 07:01 oliveagle

The same happens for me:

  • clickhouse 19.8.5
  • go 1.14
  • go-clickhouse 1.3.0

I use conn string like that:

http://readwrite:*******@my-standalone-ch.ru:8123/default?read_timeout=10s&write_timeout=20s
2020.04.27 17:42:00.153602 [ 274 ] {} <Trace> HTTPHandler-factory: HTTP Request for HTTPHandler-factory. Method: POST, Address: *redacted*:27906, User-Agent: Go-http-client/1.1, Length: 7944, Content Type: , Transfer Encoding: identity
2020.04.27 17:42:00.153720 [ 274 ] {} <Trace> HTTPHandler: Request URI: /?database=default&default_format=TabSeparatedWithNamesAndTypes
2020.04.27 17:42:00.153875 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> executeQuery: (from *redacted*:27906, user: readwrite) INSERT INTO *redacted*
2020.04.27 17:42:00.154050 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> executeQuery: Query pipeline:
NullAndDoCopy
 InputStreamFromASTInsertQuery

2020.04.27 17:42:00.154513 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Information> executeQuery: Read 19 rows, 7.50 KiB in 0.001 sec., 31167 rows/sec., 12.02 MiB/sec.
2020.04.27 17:42:00.154556 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> MemoryTracker: Peak memory usage (total): 115.66 KiB.
2020.04.27 17:42:00.154615 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Debug> MemoryTracker: Peak memory usage (for query): 2.12 MiB.
2020.04.27 17:42:00.154623 [ 274 ] {91a585fc-a759-4f61-8351-6ef5883d68ac} <Information> HTTPHandler: Done processing query
2020.04.27 17:42:00.154938 [ 274 ] {} <Error> ServerErrorHandler: Poco::Exception. Code: 1000, e.code() = 104, e.displayText() = Connection reset by peer (version 19.8.5)

chobostar avatar Apr 27 '20 17:04 chobostar

Hi @chobostar can you check if the problem stays on a newer version 1.5.0

DoubleDi avatar Apr 14 '21 20:04 DoubleDi