Partition rebalance has serious issues

Open baconalot opened this issue 9 years ago • 4 comments

Given the following test program:

package main

import (
    "fmt"
    "log"
    "os"
    "time"

    "github.com/wvanbergen/kafka/consumergroup"
    "gopkg.in/Shopify/sarama.v1"
)

func main() {
    sarama.Logger = log.New(os.Stdout, "[Sarama]", log.LstdFlags)
    config := consumergroup.NewConfig()
    config.Offsets.Initial = sarama.OffsetOldest
    config.Offsets.ProcessingTimeout = 10 * time.Second

    consumer, consumerErr := consumergroup.JoinConsumerGroup("FOO_TEST_CAN_GO", []string{"testproducercango"}, []string{"127.0.0.1"}, config)
    if consumerErr != nil {
        log.Fatalln(consumerErr)
    }
    defer consumer.Close()

    go func() {
        for err := range consumer.Errors() {
            log.Println(err)
        }
    }()

    eventCount := 0
    messProcessed := make(map[string]int)

StreamLoop:
    for {
        select {
        case <-time.After(time.Second * 10):
            break StreamLoop
        case mess := <-consumer.Messages():
            fmt.Printf("Got event from stream. Topic: %v, Partition: %v, Offset: %v, Mess: %v \n", mess.Topic, mess.Partition, mess.Offset, string(mess.Value))
            eventCount += 1
            // Simulate processing time
            time.Sleep(2 * time.Second)
            consumer.CommitUpto(mess)
            messProcessed[string(mess.Value)] += 1
        }
    }

    log.Printf("Processed %d events.", eventCount)
    log.Printf("%+v", messProcessed)
}

The topic testproducercango holds 20 messages, named bladiebla1...bladiebla20, spread over 2 partitions. I get the following output when I start this program 4 times in parallel. Pid 0:

[Sarama]2015/04/07 17:37:39 Initializing new client
[Sarama]2015/04/07 17:37:39 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:39 Successfully initialized new client
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Consumer instance registered (me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840).
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Currently registered consumers: 1
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Partition consumer starting at the oldest available offset.
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 0, Mess: bladiebla2 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 1, Mess: bladiebla3 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4 
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Last processed offset: 1. Waiting up to 10s for another 5 messages to process...
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Last processed offset: -1. Waiting up to 10s for another 13 messages to process...
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: -1
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] FAILED closing the offset manager: Not all offsets were committed before shutdown was completed!
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] Deregistered consumer instance me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840.
[Sarama]2015/04/07 17:37:55 Closing Client
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]

goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x2, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x0, 0x2, 0xc208381e00)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208041590, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
    /home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f

goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
    /usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f

goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f9731560bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2232 +0x1

Pid 1:

[Sarama]2015/04/07 17:37:45 Initializing new client
[Sarama]2015/04/07 17:37:45 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:45 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:45 Successfully initialized new client
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Consumer instance registered (me-user:7c3f576d-063f-4399-b346-509180e0075d).
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 2.
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 3, Mess: bladiebla8 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 4, Mess: bladiebla12 
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 5, Mess: bladiebla14 
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Last processed offset: 4. Waiting up to 10s for another 2 messages to process...
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 6, Mess: bladiebla19 
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 0 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 3.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6 
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13 
[Sarama]2015/04/07 17:38:13 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: 3
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 1
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 4.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18 
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
2015/04/07 17:38:50 Processed 24 events.
2015/04/07 17:38:50 map[bladiebla13:2 bladiebla17:2 bladiebla6:1 bladiebla7:2 bladiebla16:2 bladiebla12:1 bladiebla8:1 bladiebla19:1 bladiebla11:2 bladiebla15:2 bladiebla4:1 bladiebla9:2 bladiebla10:2 bladiebla18:2 bladiebla14:1]
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] Deregistered consumer instance me-user:7c3f576d-063f-4399-b346-509180e0075d.
[Sarama]2015/04/07 17:38:51 Closing Client
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092

Pid 2:

[Sarama]2015/04/07 17:37:52 Initializing new client
[Sarama]2015/04/07 17:37:52 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:52 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:52 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:52 Successfully initialized new client
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Consumer instance registered (me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7).
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/14644cac47d7] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:37:56 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
2015/04/07 17:38:02 Processed 0 events.
2015/04/07 17:38:02 map[]
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] Deregistered consumer instance me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7.
[Sarama]2015/04/07 17:38:02 Closing Client
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092

Pid 3:

[Sarama]2015/04/07 17:37:55 Initializing new client
[Sarama]2015/04/07 17:37:55 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:55 Successfully initialized new client
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Consumer instance registered (me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0).
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 0, Mess: bladiebla0 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 1, Mess: bladiebla1 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 2, Mess: bladiebla5 
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6 
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] Deregistered consumer instance me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0.
[Sarama]2015/04/07 17:38:03 Closing Client
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]

goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x3, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x1, 0x3, 0xc208381700)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208040fa0, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
    /home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f

goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
    /usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f

goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f7bc8842bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2232 +0x1

Issues in bold. For now I see two:

  • markAsProcessed acts on an already closed partition
  • Messages are processed multiple times!
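
Until the rebalance itself is fixed, the second issue can be softened on the consumer side. The following is a minimal sketch, not part of consumergroup or sarama: the offsetDeduper type and its shouldProcess method are hypothetical names, and the approach assumes that offsets within one partition arrive in increasing order except for replays. It only remembers offsets inside a single process, so it would catch the replay of offsets 4 to 12 seen by Pid 1, but not a message that two different processes each receive once.

```go
package main

import "fmt"

// partitionKey identifies one partition of one topic.
type partitionKey struct {
	topic     string
	partition int32
}

// offsetDeduper remembers the highest offset handled per partition.
// Hypothetical helper for illustration; not part of consumergroup or sarama.
type offsetDeduper struct {
	last map[partitionKey]int64
}

func newOffsetDeduper() *offsetDeduper {
	return &offsetDeduper{last: make(map[partitionKey]int64)}
}

// shouldProcess reports whether the offset is new for its partition and,
// if so, records it as handled.
func (d *offsetDeduper) shouldProcess(topic string, partition int32, offset int64) bool {
	key := partitionKey{topic, partition}
	if prev, seen := d.last[key]; seen && offset <= prev {
		return false
	}
	d.last[key] = offset
	return true
}

func main() {
	d := newOffsetDeduper()
	// Offsets delivered to Pid 1 for partition 1 in the log above:
	// 3..12 once, then 4..12 again after the final rebalance.
	delivered := []int64{3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 4, 5, 6, 7, 8, 9, 10, 11, 12}
	processed := 0
	for _, off := range delivered {
		if d.shouldProcess("testproducercango", 1, off) {
			processed++
		}
	}
	fmt.Printf("delivered %d, processed %d\n", len(delivered), processed)
}
```

In the test program the check would sit at the top of the Messages() case, before the simulated processing. Whether a skipped replay should still be committed is a separate decision that this sketch leaves open.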

baconalot avatar Apr 07 '15 16:04 baconalot

Can you try #72 by any chance? The way the rebalance works has been improved significantly.

wvanbergen avatar Aug 18 '15 12:08 wvanbergen

hi wvanbergen

I tried to run #72 but found it only works with Kafka 0.8.2, because of the Coordinator method in sarama's Client interface. Is it necessary?

// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
// value if it's available. You can call RefreshCoordinator to update the cached value.
// This function only works on Kafka 0.8.2 and higher.
Coordinator(consumerGroup string) (*Broker, error)

teou avatar Aug 26 '15 02:08 teou

Yes; the new implementation uses Kafka to commit processed offsets, which requires 0.8.2. I may add Zookeeper support but for now that is not really on my list.

If you can't upgrade to Kafka 0.8.2, make sure you're using the latest master version of consumergroup, because a bunch of bugs were recently fixed in its implementation which may have been the cause of this panic.
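
For context on the panic: in both traces in the report, markAsProcessed is called with 0x0 as its first argument, which suggests the method ran on a nil *partitionOffsetTracker after the partition had been released. The sketch below reproduces that failure mode in isolation and shows one possible guard. It is not the consumergroup code; the tracker and manager types, the markSafely method and the errUnclaimed error are hypothetical names, and storing trackers in a map keyed by partition is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// tracker is a stand-in for a per-partition offset tracker.
// All names here are hypothetical; this is not the consumergroup code.
type tracker struct {
	lastProcessed int64
}

// markAsProcessed dereferences its receiver, so calling it on a nil
// *tracker panics with a nil pointer dereference.
func (t *tracker) markAsProcessed(offset int64) {
	t.lastProcessed = offset
}

// manager holds one tracker per claimed partition (assumed layout).
type manager struct {
	trackers map[int32]*tracker
}

var errUnclaimed = errors.New("partition is no longer claimed")

// markSafely looks the tracker up first and returns an error instead of
// calling a method on a nil pointer.
func (m *manager) markSafely(partition int32, offset int64) error {
	t, ok := m.trackers[partition]
	if !ok || t == nil {
		return errUnclaimed
	}
	t.markAsProcessed(offset)
	return nil
}

func main() {
	m := &manager{trackers: map[int32]*tracker{0: {}}}
	fmt.Println(m.markSafely(0, 2)) // partition still claimed

	// Simulate a rebalance that releases the partition.
	delete(m.trackers, 0)
	fmt.Println(m.markSafely(0, 3)) // tracker is gone, error instead of panic
}
```

Returning an error lets the caller decide whether a late commit for a released partition is worth logging or can be dropped silently.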

wvanbergen avatar Aug 26 '15 09:08 wvanbergen

We cannot upgrade to 0.8.2, but thank you for your work all the same.

teou avatar Aug 26 '15 09:08 teou