nbio-examples icon indicating copy to clipboard operation
nbio-examples copied to clipboard

working redis client with pool requesting comment

Open kolinfluence opened this issue 10 months ago • 0 comments

@lesismal i got it working, can u pls give your comments?

i put client pool Put under onclose BUT i will check on how to do keep alive can you pls check and comment? thx

        defer client.pool.Put(tlsConn) //if i comment this line it will work
package main

import (
        "sync"
        "fmt"
        "github.com/lesismal/nbio"
        "github.com/lesismal/nbio/extension/tls"
        "log"
        "strings"
        "sync/atomic"
        "time"
)

type RedisClient struct {
        g          *nbio.Gopher
        addr       string
        tlsConfig  *tls.Config
        responseCh chan []byte
        errorCh    chan error
        pool       *ConnPool
}


var (
        qps int64 // Global counter for queries per second
        tlsConfig = &tls.Config{
                InsecureSkipVerify: true,
        }
)


// A simple structure for a connection pool.
type ConnPool struct {
        pool    chan *tls.Conn
        addr    string
        config  *tls.Config
        maxConn int
        mu      sync.Mutex
}


func NewRedisClient(addr string, tlsConfig *tls.Config, pool *ConnPool) *RedisClient {
        client := &RedisClient{
                g:          nbio.NewGopher(nbio.Config{}),
                addr:       addr,
                tlsConfig:  tlsConfig,
                responseCh: make(chan []byte, 1),
                errorCh:    make(chan error, 1),
                pool:       pool,
        }
        client.setupHandlers()
        return client
}

// NewConnPool creates a new connection pool.
func NewConnPool(addr string, config *tls.Config, maxConn int) *ConnPool {
        return &ConnPool{
                pool:    make(chan *tls.Conn, maxConn),
                addr:    addr,
                config:  config,
                maxConn: maxConn,
        }
}

// Get acquires a connection from the pool.
func (p *ConnPool) Get() (*tls.Conn, error) {
        select {
        case conn := <-p.pool:
                return conn, nil
        default:
                // Pool is empty, create a new connection
                return p.DialNew()
        }
}

func (p *ConnPool) DialNew() (*tls.Conn, error) {
        conn, err := tls.Dial("tcp", p.addr, p.config)
        if err != nil {
                return nil, err
        }
        return conn, nil
}


// Put returns a connection to the pool.
func (p *ConnPool) Put(conn *tls.Conn) {
        select {
        case p.pool <- conn:
                // Connection returned to the pool
log.Printf("pool returned")
        default:
log.Printf("clossssss")
                // Pool is full, close the connection
                conn.Close()
        }
}

// CloseAll drains the pool and closes all connections.
func (p *ConnPool) CloseAll() {
        for {
                select {
                case conn := <-p.pool:
                        conn.Close()
                default:
                        return
                }
        }
}

/*
func NewConnPool(addr string, config *tls.Config, maxConn int) *ConnPool {
        client := &RedisClient{
                g:         nbio.NewGopher(nbio.Config{}),
                addr:      addr,
                tlsConfig: tlsConfig,
                responseCh: make(chan []byte, 1), // Buffer of 1 for non-blocking operation
                errorCh:    make(chan error, 1),
        }
        client.setupHandlers()
        return client
}
*/

func (client *RedisClient) setupHandlers() {
        isClient := true

        client.g.OnOpen(tls.WrapOpen(client.tlsConfig, isClient, func(c *nbio.Conn, tlsConn *tls.Conn) {

                log.Printf("connection open")
                // Initialize connection setup if necessary
        }))
        client.g.OnClose(tls.WrapClose(func(c *nbio.Conn, tlsConn *tls.Conn, err error) {
                log.Printf("connection closed")
                client.errorCh <- err
                client.pool.Put(tlsConn)

        }))
        client.g.OnData(tls.WrapData(func(c *nbio.Conn, tlsConn *tls.Conn, data []byte) {
                log.Printf("data = %s", data)
                // Handle incoming data, Redis protocol
                client.responseCh <- data
        }))
}

func (client *RedisClient) Start() error {
        return client.g.Start()
}

func (client *RedisClient) Stop() {
        client.g.Stop()
        close(client.responseCh)
        close(client.errorCh)
}

func (client *RedisClient) Do(cmd string, args ...string) (interface{}, error) {
        fullCmd := fmt.Sprintf("*%d\r\n$%d\r\n%s\r\n", len(args)+1, len(cmd), cmd)
        for _, arg := range args {
                fullCmd += fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)
        }

        atomic.AddInt64(&qps, 1)

        // Use the connection pool to get a TLS connection
        tlsConn, err := client.pool.Get()
        if err != nil {
                return nil, fmt.Errorf("failed to get connection: %v", err)
        }

        //defer client.pool.Put(tlsConn)

        // Convert *tls.Conn to *tls.Conn for use with NBIO
        nbConn, err := nbio.NBConn(tlsConn.Conn())
        if err != nil {
                return nil, fmt.Errorf("failed to convert connection: %v", err)
        }

        // step 3: set tls.Conn and nbio.Conn to each other, and add nbio.Conn to the gopher
        isNonblock := true
        nbConn.SetSession(tlsConn)
        tlsConn.ResetConn(nbConn, isNonblock)
        client.g.AddConn(nbConn)

        // step 4: write data here or in the OnOpen handler or anywhere
        _, err = tlsConn.Write([]byte(fullCmd))
        if err != nil {
                return nil, err
        }

        /*
        //c, err := client.g.Dial(client.addr)
        c, err := tls.Dial("tcp", client.addr, client.tlsConfig)
        if err != nil {
                return nil, err
        }
        defer c.Close()
        */

        select {
        case response := <-client.responseCh:
                return parseResponse(response)
        case err := <-client.errorCh:
                return nil, err
        }
}

func parseResponse(data []byte) (interface{}, error) {
        if len(data) == 0 {
                return nil, fmt.Errorf("no response")
        }

        switch data[0] {
        case '-':
                return string(data[1:]), fmt.Errorf("error response: %s", data[1:])
        case '+', ':':
                return string(data[1:]), nil
        case '$':
                parts := strings.SplitN(string(data[1:]), "\r\n", 2)
                if parts[0] == "-1" {
                        return nil, nil // Null bulk response
                }
                return parts[1], nil
        case '*':
                lines := strings.Split(string(data[1:]), "\r\n")
                var count int
                _, err := fmt.Sscanf(lines[0], "%d", &count)
                if err != nil || count <= 0 {
                        return nil, err
                }
                results := make([]interface{}, count)
                for i := 0; i < count; i++ {
                        results[i] = lines[i+1] // Simplification, might need refinement
                }
                return results, nil
        default:
                return nil, fmt.Errorf("unknown response type: %c", data[0])
        }
}

func main() {
        // TLS configuration for secure Redis connection, adjust as necessary.
        addr := "127.0.0.1:6380"
        pool := NewConnPool(addr, tlsConfig, 3) // Pool with max 3 connections
        client := NewRedisClient(addr, tlsConfig, pool)

        err := client.Start()
        if err != nil {
                log.Fatalf("Failed to start client: %v", err)
        }
        defer client.Stop()

        log.Printf("here1")

        // Use the client
        resp, err := client.Do("SET", "key", "value")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)
        resp, err = client.Do("SET", "key", "value")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)
        resp, err = client.Do("SET", "key", "value")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)
        resp, err = client.Do("SET", "a", "b")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("SET Response: %v\n", resp)

        log.Printf("here2")
        resp2, err := client.Do("GET", "key")
        if err != nil {
                log.Fatalf("Failed to execute command: %v", err)
        }
        fmt.Printf("GET Response: %v\n", resp2)

        log.Printf("here3")
        // Keep the main goroutine alive for a short time to ensure the response is processed
        time.Sleep(1 * time.Second)
}

kolinfluence avatar Mar 25 '24 07:03 kolinfluence