
Compress insertion data column by column to reduce memory usage

Open qixiaogang opened this issue 3 years ago • 5 comments

Is your feature request related to a problem? Please describe. After upgrading clickhouse-go to v2, we found client memory usage almost doubled when inserting data with compression into tables of 150 columns. We don't see a memory increase when inserting compressed data into tables of 20 columns.

After investigating, we found that v2 encodes the whole block and then compresses the encoded result, while v1 encodes and compresses column by column, so v2 needs more memory to hold the encoded result of the whole block.

v1 compress https://github.com/ClickHouse/clickhouse-go/blob/v1/lib/data/block.go#L220-L235 v2 compress https://github.com/ClickHouse/clickhouse-go/blob/main/conn.go#L169-L185
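For illustration, here is a minimal Go sketch of the two strategies and why the whole-block approach roughly doubles peak memory. encodeColumn and compressLZ4 are hypothetical stand-ins, not clickhouse-go APIs:

package main

import "fmt"

// Identity-style stand-ins so the sketch compiles; the real paths are
// clickhouse-go's column encoders and ch-go's LZ4/ZSTD compressor.
func encodeColumn(c []byte) []byte { return c }
func compressLZ4(b []byte) []byte  { return append([]byte(nil), b...) }

// v2 style: encode every column into one buffer, then compress it in one
// shot. Peak memory is roughly the whole encoded block plus the compressed
// output for the whole block.
func sendWholeBlock(cols [][]byte) []byte {
	var buf []byte
	for _, c := range cols {
		buf = append(buf, encodeColumn(c)...)
	}
	return compressLZ4(buf)
}

// v1 style: encode and compress one column at a time. Peak memory is
// roughly one column's encoded data plus its compressed output, regardless
// of how many columns the table has.
func sendColumnByColumn(cols [][]byte) []byte {
	var out []byte
	for _, c := range cols {
		out = append(out, compressLZ4(encodeColumn(c))...)
	}
	return out
}

func main() {
	cols := [][]byte{[]byte("col-a"), []byte("col-b")}
	fmt.Println(len(sendWholeBlock(cols)), len(sendColumnByColumn(cols)))
}

With 150 columns, the v2-style buffer grows to the full block size before compression even starts, which matches the doubling reported above.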

Describe the solution you'd like v2 should implement compression column by column, just as v1 does.

Describe alternatives you've considered

Additional context

qixiaogang avatar Sep 20 '22 05:09 qixiaogang

Good spot, agree this needs to be done.

gingerwizard avatar Sep 23 '22 19:09 gingerwizard

should be doable by simply passing a compressor into the Encode method. We can then compress each column as we encode it.
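A rough sketch of that direction, with hypothetical Column and Compressor interfaces (the real clickhouse-go column.Interface and ch-go compressor look different):

package sketch

// Column and Compressor are hypothetical interfaces for illustration only.
type Column interface {
	EncodeTo(dst []byte) []byte
}

type Compressor interface {
	Compress(raw []byte) ([]byte, error)
}

// encodeAndCompress compresses each column right after encoding it, so the
// scratch buffer never holds more than one column's encoded bytes at a time.
func encodeAndCompress(cols []Column, c Compressor, wire []byte) ([]byte, error) {
	var scratch []byte
	for _, col := range cols {
		scratch = col.EncodeTo(scratch[:0]) // reuse the scratch buffer
		frame, err := c.Compress(scratch)
		if err != nil {
			return nil, err
		}
		wire = append(wire, frame...)
	}
	return wire, nil
}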

gingerwizard avatar Sep 23 '22 19:09 gingerwizard

In our case, each batch inserts 5K rows and each row has 150 columns; the total uncompressed size is about 7MB. Each client server maintains 130+ connections to backend ClickHouse servers.

  • v2's Encode reserves 7MB: https://github.com/ClickHouse/clickhouse-go/blob/9d4be497ed788fde5677bb39f1ad0a37e9fca7ba/conn.go#L99
  • Compress reserves another 7MB: https://github.com/ClickHouse/ch-go/blob/main/compress/writer.go#L22-L23
func (w *Writer) Compress(m Method, buf []byte) error {
	maxSize := lz4.CompressBlockBound(len(buf))
	w.Data = append(w.Data[:0], make([]byte, maxSize+headerSize)...)

The memory reserved for Encode and Compress will be (7MB + 7MB)*130 = 1820MB, which is not a small number.
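To make the arithmetic concrete, a quick back-of-the-envelope check (assuming github.com/pierrec/lz4/v4, which the ch-go compressor uses for the bound; the small per-frame header is omitted):

package main

import (
	"fmt"

	"github.com/pierrec/lz4/v4"
)

func main() {
	const blockSize = 7 << 20 // ~7MB of encoded data per batch
	const conns = 130         // connections per client server

	// The encode buffer holds the full block, and Compress reserves at
	// least the LZ4 worst-case bound for that same block.
	perConn := blockSize + lz4.CompressBlockBound(blockSize)
	fmt.Printf("per connection: ~%dMB, across %d connections: ~%dMB\n",
		perConn>>20, conns, (perConn*conns)>>20)
	// Output: per connection: ~14MB, across 130 connections: ~1823MB
}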

qixiaogang avatar Sep 26 '22 10:09 qixiaogang

After checking the v1 code, v1 has a 1MB limit on the raw buffer to be compressed, so it seems compressing column by column isn't strictly necessary.

Based on the same raw-buffer limit, a quick change to v2's sendData implementation reduced memory usage to about 1/3 of the original v2 in our case.

func (c *connect) sendData(block *proto.Block, name string) error {
	c.debugf("[send data] compression=%t", c.compression)
	c.buffer.PutByte(proto.ClientData)
	c.buffer.PutString(name)
	// Saving offset of compressible data.
	start := len(c.buffer.Buf)
	if c.revision > 0 {
		c.buffer.PutUVarInt(1)
		c.buffer.PutBool(false)
		c.buffer.PutUVarInt(2)
		c.buffer.PutInt32(-1)
		c.buffer.PutUVarInt(0)
	}
	var rows int
	if len(block.Columns) != 0 {
		rows = block.Columns[0].Rows()
		for _, col := range block.Columns[1:] {
			cRows := col.Rows()
			if rows != cRows {
				return &proto.BlockError{
					Op:  "Encode",
					Err: fmt.Errorf("mismatched len of columns - expected %d, received %d for col %s", rows, cRows, col.Name()),
				}
			}
		}
	}
	c.buffer.PutUVarInt(uint64(len(block.Columns)))
	c.buffer.PutUVarInt(uint64(rows))
	for _, col := range block.Columns {
		c.buffer.PutString(col.Name())
		c.buffer.PutString(string(col.Type()))
		if serialize, ok := col.(column.CustomSerialization); ok {
			if err := serialize.WriteStatePrefix(c.buffer); err != nil {
				return &proto.BlockError{
					Op:         "Encode",
					Err:        err,
					ColumnName: col.Name(),
				}
			}
		}
		col.Encode(c.buffer)
		// Once the encode buffer exceeds 1MB (v1's raw-buffer limit),
		// compress and flush what we have so far instead of letting the
		// buffer grow to the size of the whole block.
		if len(c.buffer.Buf) > 1048576 {
			if c.compression != CompressionNone {
				// Performing compression. Note: only blocks are compressed.
				data := c.buffer.Buf[start:]
				if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil {
					return errors.Wrap(err, "compress")
				}
				c.buffer.Buf = append(c.buffer.Buf[:start], c.compressor.Data...)
			}
			if err := c.flush(); err != nil {
				return err
			}
			// After a flush the buffer is empty, so the next compressed
			// frame starts at offset 0.
			start = 0
			c.buffer.Reset()
		}
	}

	if c.compression != CompressionNone {
		// Performing compression. Note: only blocks are compressed.
		if len(c.buffer.Buf) > 0 {
			data := c.buffer.Buf[start:]
			if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil {
				return errors.Wrap(err, "compress")
			}
			c.buffer.Buf = append(c.buffer.Buf[:start], c.compressor.Data...)
		}
	}

	if err := c.flush(); err != nil {
		return err
	}
	defer func() {
		c.buffer.Reset()
	}()
	return nil
}
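A note on why the mid-block flush should be safe, as far as I understand: the native protocol's compression is a framing layer over the byte stream, so flushing mid-block simply starts a new compressed frame and the server reassembles the stream on decompression. With the 1MB threshold, peak buffer memory per connection is bounded by roughly the threshold plus its compression bound, rather than scaling with the full block size.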

qixiaogang avatar Sep 26 '22 10:09 qixiaogang

@gingerwizard thank you.

qixiaogang avatar Sep 26 '22 10:09 qixiaogang

I want to make this change, but I'm cautious about the potential performance impact. I'll raise a PR and do some tests.

gingerwizard avatar Nov 02 '22 15:11 gingerwizard

Branch here with these changes: https://github.com/ClickHouse/clickhouse-go/tree/compress_by_column, but I'm not happy with the implementation.

gingerwizard avatar Nov 02 '22 17:11 gingerwizard