Compress insertion data column by column to reduce memory usage
Is your feature request related to a problem? Please describe. After upgrading clickhouse-go to v2, we found that client memory usage almost doubled when inserting data with compression into tables with 150 columns. We don't see a memory increase when inserting data with compression into tables with 20 columns.
After investigating, we found that v2 encodes the whole block and then compresses the encoded result, while v1 encodes and compresses column by column, so v2 needs extra memory to hold the encoded result of the whole block.
v1 compress: https://github.com/ClickHouse/clickhouse-go/blob/v1/lib/data/block.go#L220-L235
v2 compress: https://github.com/ClickHouse/clickhouse-go/blob/main/conn.go#L169-L185
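Roughly, the two flows look like this (a simplified sketch with hypothetical helper functions, not the actual library code):

package sketch

// encodeColumn, compressBuf and send are illustrative stand-ins for the real
// encode/compress/socket calls in clickhouse-go and ch-go.
func encodeColumn(dst []byte, col []byte) []byte { return append(dst, col...) }
func compressBuf(raw []byte) []byte              { return raw }
func send(frame []byte) error                    { return nil }

// v2 today: encode every column into one buffer, then compress it in one go.
// Peak memory is roughly the whole uncompressed block plus its compressed copy.
func sendBlockV2(cols [][]byte) error {
	var buf []byte
	for _, col := range cols {
		buf = encodeColumn(buf, col)
	}
	return send(compressBuf(buf))
}

// v1 behaviour: encode and compress column by column (bounded chunks),
// so peak memory is roughly one chunk plus its compressed copy.
func sendBlockV1(cols [][]byte) error {
	for _, col := range cols {
		chunk := encodeColumn(nil, col)
		if err := send(compressBuf(chunk)); err != nil {
			return err
		}
	}
	return nil
}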
Describe the solution you'd like: v2 should implement compression column by column, just as v1 does.
Good spot, agree this needs to be done.
Should be doable by simply passing a compressor into the Encode method. We can then compress each column as we encode it.
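A minimal sketch of that idea, assuming hypothetical Column/Compressor shapes rather than the real proto and ch-go types: Encode receives the compressor, so each column can be compressed and written out as soon as it is encoded.

package sketch

// Hypothetical interfaces for illustration only; not the actual
// clickhouse-go / ch-go signatures.
type Column interface {
	Name() string
	Encode(dst []byte) []byte // appends the column's encoded bytes to dst
}

type Compressor interface {
	Compress(raw []byte) ([]byte, error) // returns a compressed frame
}

// EncodeBlock compresses column by column: the scratch buffer only ever has
// to hold one encoded column, not the whole block.
func EncodeBlock(cols []Column, comp Compressor, write func([]byte) error) error {
	scratch := make([]byte, 0, 1<<20)
	for _, col := range cols {
		scratch = col.Encode(scratch[:0])
		frame, err := comp.Compress(scratch)
		if err != nil {
			return err
		}
		if err := write(frame); err != nil {
			return err
		}
	}
	return nil
}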
In our case, each batch inserts 5K rows, each row has 150 columns, and the total uncompressed size is about 7 MB. Each client server maintains 130+ connections to the backend ClickHouse servers.
- v2's Encode reserves ~7 MB: https://github.com/ClickHouse/clickhouse-go/blob/9d4be497ed788fde5677bb39f1ad0a37e9fca7ba/conn.go#L99
- Compress reserves another ~7 MB: https://github.com/ClickHouse/ch-go/blob/main/compress/writer.go#L22-L23
func (w *Writer) Compress(m Method, buf []byte) error {
	maxSize := lz4.CompressBlockBound(len(buf))
	w.Data = append(w.Data[:0], make([]byte, maxSize+headerSize)...)
The memory reserved for Encode and Compress will be (7 MB + 7 MB) * 130 = 1820 MB, which is not a small number.
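As a rough sanity check of that figure, here is a standalone snippet; it assumes only lz4.CompressBlockBound from github.com/pierrec/lz4/v4, which ch-go uses to size its compression buffer, and the 7 MB / 130 figures from our workload above:

package main

import (
	"fmt"

	"github.com/pierrec/lz4/v4"
)

func main() {
	const blockSize = 7 << 20 // ~7 MB of uncompressed data per batch
	const conns = 130         // connections per client server

	encodeBuf := blockSize                           // v2's Encode buffers the whole block
	compressBuf := lz4.CompressBlockBound(blockSize) // worst-case output buffer sized by Compress

	perConn := encodeBuf + compressBuf
	fmt.Printf("per connection ~%d MB, across %d connections ~%d MB\n",
		perConn>>20, conns, (conns*perConn)>>20)
}

On these inputs it prints roughly 14 MB per connection and ~1.8 GB across 130 connections, matching the estimate above.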
After checking the v1 code, we see that v1 caps the raw buffer to be compressed at 1 MB, so it seems we don't strictly need to compress column by column.
Based on that raw-buffer limit, a quick change to v2's sendData implementation reduced memory usage to about 1/3 of the original v2 in our case.
func (c *connect) sendData(block *proto.Block, name string) error {
	c.debugf("[send data] compression=%t", c.compression)
	c.buffer.PutByte(proto.ClientData)
	c.buffer.PutString(name)
	// Saving offset of compressible data.
	start := len(c.buffer.Buf)
	if c.revision > 0 {
		c.buffer.PutUVarInt(1)
		c.buffer.PutBool(false)
		c.buffer.PutUVarInt(2)
		c.buffer.PutInt32(-1)
		c.buffer.PutUVarInt(0)
	}
	var rows int
	if len(block.Columns) != 0 {
		rows = block.Columns[0].Rows()
		for _, col := range block.Columns[1:] {
			cRows := col.Rows()
			if rows != cRows {
				return &proto.BlockError{
					Op:  "Encode",
					Err: fmt.Errorf("mismatched len of columns - expected %d, received %d for col %s", rows, cRows, col.Name()),
				}
			}
		}
	}
	c.buffer.PutUVarInt(uint64(len(block.Columns)))
	c.buffer.PutUVarInt(uint64(rows))
	for _, col := range block.Columns {
		c.buffer.PutString(col.Name())
		c.buffer.PutString(string(col.Type()))
		if serialize, ok := col.(column.CustomSerialization); ok {
			if err := serialize.WriteStatePrefix(c.buffer); err != nil {
				return &proto.BlockError{
					Op:         "Encode",
					Err:        err,
					ColumnName: col.Name(),
				}
			}
		}
		col.Encode(c.buffer)
		// Once the encoded data exceeds ~1 MB (v1's raw-buffer limit),
		// compress and flush what we have instead of buffering the whole block.
		if len(c.buffer.Buf) > 1048576 {
			if c.compression != CompressionNone {
				// Performing compression. Note: only blocks are compressed.
				data := c.buffer.Buf[start:]
				if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil {
					return errors.Wrap(err, "compress")
				}
				c.buffer.Buf = append(c.buffer.Buf[:start], c.compressor.Data...)
			}
			if err := c.flush(); err != nil {
				return err
			}
			start = 0
			c.buffer.Reset()
		}
	}
	// Compress and flush whatever remains after the last column.
	if c.compression != CompressionNone {
		// Performing compression. Note: only blocks are compressed.
		if len(c.buffer.Buf) > 0 {
			data := c.buffer.Buf[start:]
			if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil {
				return errors.Wrap(err, "compress")
			}
			c.buffer.Buf = append(c.buffer.Buf[:start], c.compressor.Data...)
		}
	}
	if err := c.flush(); err != nil {
		return err
	}
	defer func() {
		c.buffer.Reset()
	}()
	return nil
}
@gingerwizard thank you.
I want to make this change, but I'm cautious about the potential performance impact. I'll raise a PR and do some tests.
Branch here with these changes: https://github.com/ClickHouse/clickhouse-go/tree/compress_by_column but I'm not happy with the implementation.