nbio-examples
nbio-examples copied to clipboard
Working Redis client with a connection pool: requesting comments
@lesismal I got it working. Could you please review it and give me your comments?
I moved the call to the client pool's Put into the OnClose handler, but I still need to work out how to do keep-alive. Could you please check this and comment? Thanks.
defer client.pool.Put(tlsConn) // if I comment this line out, it works
package main
import (
"sync"
"fmt"
"github.com/lesismal/nbio"
"github.com/lesismal/nbio/extension/tls"
"log"
"strings"
"sync/atomic"
"time"
)
// RedisClient issues Redis commands over TLS connections that are driven by
// an nbio Gopher. Results travel from the Gopher's callbacks to Do through the
// two channels below.
type RedisClient struct {
	// g is the nbio engine on which setupHandlers registers its callbacks.
	g *nbio.Gopher

	// addr is the server address passed to NewRedisClient. The visible code
	// stores it but does not read it.
	addr string

	// tlsConfig is handed to tls.WrapOpen when the handlers are installed.
	tlsConfig *tls.Config

	// responseCh carries payloads from the OnData handler to Do.
	responseCh chan []byte

	// errorCh carries the close error from the OnClose handler to Do.
	errorCh chan error

	// pool supplies the TLS connections that Do writes commands to.
	pool *ConnPool
}
// qps is a process-wide counter that Do increments atomically once per call.
// Nothing in the visible code resets it, so it holds the running total of
// issued commands rather than a per-second rate.
var qps int64 // Global counter for queries per second

// tlsConfig is the TLS configuration shared by the example pool and client.
// NOTE(review): InsecureSkipVerify is set; in crypto/tls this disables server
// certificate verification, and the nbio fork presumably behaves the same.
// Suitable for local testing only.
var tlsConfig = &tls.Config{
	InsecureSkipVerify: true,
}
// ConnPool is a simple channel-backed pool of TLS connections.
type ConnPool struct {
	// pool buffers idle connections; its capacity is maxConn.
	pool chan *tls.Conn

	// addr is the "host:port" address DialNew connects to.
	addr string

	// config is the TLS configuration DialNew dials with.
	config *tls.Config

	// maxConn is the capacity the pool channel was created with. It bounds
	// the number of idle connections kept, not the number dialed.
	maxConn int

	// mu is not used by any method visible in this file.
	mu sync.Mutex
}
// NewRedisClient builds a RedisClient around a fresh nbio Gopher, creates the
// single-slot response and error channels, and installs the Gopher handlers.
// The returned client still has to be started with Start.
func NewRedisClient(addr string, tlsConfig *tls.Config, pool *ConnPool) *RedisClient {
	rc := new(RedisClient)
	rc.g = nbio.NewGopher(nbio.Config{})
	rc.addr = addr
	rc.tlsConfig = tlsConfig
	rc.responseCh = make(chan []byte, 1)
	rc.errorCh = make(chan error, 1)
	rc.pool = pool

	rc.setupHandlers()
	return rc
}
// NewConnPool returns an empty pool that dials addr with config and keeps at
// most maxConn idle connections.
func NewConnPool(addr string, config *tls.Config, maxConn int) *ConnPool {
	idle := make(chan *tls.Conn, maxConn)
	return &ConnPool{pool: idle, addr: addr, config: config, maxConn: maxConn}
}
// Get hands out an idle connection when one is waiting in the pool and dials
// a new one otherwise. It never blocks waiting for a connection to be
// returned.
func (p *ConnPool) Get() (*tls.Conn, error) {
	select {
	case idle := <-p.pool:
		return idle, nil
	default:
	}

	// Nothing idle: fall back to a fresh connection.
	return p.DialNew()
}
// DialNew opens a new TLS connection to the pool's address over TCP using the
// pool's TLS configuration. On failure it returns a nil connection and the
// dial error.
func (p *ConnPool) DialNew() (*tls.Conn, error) {
	dialed, dialErr := tls.Dial("tcp", p.addr, p.config)
	if dialErr == nil {
		return dialed, nil
	}
	return nil, dialErr
}
// Put returns a connection to the pool. When the pool is already full the
// connection is closed instead. A nil connection is ignored so that Get can
// never hand out a nil entry.
func (p *ConnPool) Put(conn *tls.Conn) {
	if conn == nil {
		return
	}
	select {
	case p.pool <- conn:
		// Connection returned to the pool.
		log.Printf("pool returned")
	default:
		// Pool is full, close the connection.
		log.Printf("clossssss")
		if err := conn.Close(); err != nil {
			log.Printf("closing surplus connection: %v", err)
		}
	}
}
// CloseAll closes every connection currently idle in the pool and returns as
// soon as the pool is empty. Close errors are ignored (best effort).
func (p *ConnPool) CloseAll() {
	for {
		select {
		case idle := <-p.pool:
			idle.Close()
			continue
		default:
		}
		return
	}
}
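/*
Dead code: an earlier draft of the RedisClient constructor, mislabelled as
NewConnPool. It is kept for reference only and is not compiled.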
func NewConnPool(addr string, config *tls.Config, maxConn int) *ConnPool {
client := &RedisClient{
g: nbio.NewGopher(nbio.Config{}),
addr: addr,
tlsConfig: tlsConfig,
responseCh: make(chan []byte, 1), // Buffer of 1 for non-blocking operation
errorCh: make(chan error, 1),
}
client.setupHandlers()
return client
}
*/
// setupHandlers registers the open, close and data callbacks on the client's
// Gopher.
//
// The callbacks run on the Gopher's own goroutines, so they never block:
// results are offered to the single-slot channels and dropped (with a log
// line) when the slot is still occupied.
func (client *RedisClient) setupHandlers() {
	isClient := true
	client.g.OnOpen(tls.WrapOpen(client.tlsConfig, isClient, func(c *nbio.Conn, tlsConn *tls.Conn) {
		log.Printf("connection open")
		// Initialize connection setup if necessary.
	}))

	client.g.OnClose(tls.WrapClose(func(c *nbio.Conn, tlsConn *tls.Conn, err error) {
		log.Printf("connection closed")
		// A closed connection must not be returned to the pool: the next Get
		// would hand out a dead connection. It is simply dropped here.
		select {
		case client.errorCh <- err:
		default:
			log.Printf("close error dropped, error channel is full: %v", err)
		}
	}))

	client.g.OnData(tls.WrapData(func(c *nbio.Conn, tlsConn *tls.Conn, data []byte) {
		log.Printf("data = %s", data)
		// Copy before handing the bytes to another goroutine; nbio may reuse
		// the read buffer once this handler returns.
		payload := append([]byte(nil), data...)
		select {
		case client.responseCh <- payload:
		default:
			log.Printf("response dropped, response channel is full (%d bytes)", len(payload))
		}
	}))
}
// Start starts the underlying Gopher and reports its start-up error, if any.
func (client *RedisClient) Start() error {
	startErr := client.g.Start()
	return startErr
}
// Stop stops the Gopher first and only then closes the response and error
// channels, in that order. The client must not be used afterwards.
func (client *RedisClient) Stop() {
	engine := client.g
	engine.Stop()

	close(client.responseCh)
	close(client.errorCh)
}
// Do sends one Redis command and waits for the first response chunk or
// connection error.
//
// cmd and args are encoded as a RESP array of bulk strings. The connection is
// taken from the pool, converted to a non-blocking nbio connection and
// registered with the Gopher before the command is written.
//
// It returns the value decoded by parseResponse, or an error when no
// connection could be obtained or converted, the write failed, the server
// replied with an error, or the connection was closed before a reply arrived.
//
// NOTE(review): the connection is not handed back to the pool after a
// successful exchange, so with an empty pool every call dials a new
// connection. Do is also not safe for concurrent use, because all calls share
// one response channel and one error channel.
func (client *RedisClient) Do(cmd string, args ...string) (interface{}, error) {
	var req strings.Builder
	fmt.Fprintf(&req, "*%d\r\n$%d\r\n%s\r\n", len(args)+1, len(cmd), cmd)
	for _, arg := range args {
		fmt.Fprintf(&req, "$%d\r\n%s\r\n", len(arg), arg)
	}
	atomic.AddInt64(&qps, 1)

	// Use the connection pool to get a TLS connection.
	tlsConn, err := client.pool.Get()
	if err != nil {
		return nil, fmt.Errorf("failed to get connection: %w", err)
	}

	// Obtain the non-blocking nbio connection underneath the TLS connection.
	nbConn, err := nbio.NBConn(tlsConn.Conn())
	if err != nil {
		// The connection is unusable and not registered anywhere yet; close
		// it so it does not leak. The close error adds nothing useful here.
		_ = tlsConn.Close()
		return nil, fmt.Errorf("failed to convert connection: %w", err)
	}

	// Point the TLS and nbio connections at each other, then register the
	// nbio connection with the Gopher so the handlers fire for it.
	isNonblock := true
	nbConn.SetSession(tlsConn)
	tlsConn.ResetConn(nbConn, isNonblock)
	client.g.AddConn(nbConn)

	// Events left over from earlier connections must not be mistaken for the
	// result of this command.
	client.discardStaleEvents()

	if _, err = tlsConn.Write([]byte(req.String())); err != nil {
		// Do not leave a connection in an unknown state open.
		_ = tlsConn.Close()
		return nil, err
	}

	select {
	case response := <-client.responseCh:
		return parseResponse(response)
	case err := <-client.errorCh:
		if err == nil {
			// A nil close error (or a closed channel after Stop) would
			// otherwise look like a successful null reply.
			err = fmt.Errorf("connection closed before a response arrived")
		}
		return nil, err
	}
}

// discardStaleEvents empties the response and error channels without
// blocking. It stops as soon as both are empty or either one is closed.
func (client *RedisClient) discardStaleEvents() {
	for {
		select {
		case _, ok := <-client.responseCh:
			if !ok {
				return
			}
		case _, ok := <-client.errorCh:
			if !ok {
				return
			}
		default:
			return
		}
	}
}
// respError is a Redis error reply: the text of a "-..." line without the
// leading '-' and the trailing CRLF.
type respError string

// Error implements the error interface.
func (e respError) Error() string { return "error response: " + string(e) }

// parseResponse decodes the first RESP value found in data.
//
// Return values by reply type:
//   - simple string ("+") and integer (":"): the line as a string
//   - error ("-"): the error text as a string, plus a non-nil error whose
//     message is "error response: <text>"
//   - bulk string ("$"): the payload as a string; (nil, nil) for a null bulk
//   - array ("*"): a []interface{} of decoded elements; (nil, nil) when the
//     declared count is zero or negative. Error replies nested in an array
//     are stored as elements that implement error.
//
// Empty, truncated or otherwise malformed input yields an error instead of a
// panic. Bytes after the first complete value are ignored.
func parseResponse(data []byte) (interface{}, error) {
	if len(data) == 0 {
		return nil, fmt.Errorf("no response")
	}
	value, _, err := parseRESPValue(string(data))
	if err != nil {
		return nil, err
	}
	if replyErr, ok := value.(respError); ok {
		return string(replyErr), replyErr
	}
	return value, nil
}

// splitRESPLine splits s at the first CRLF. Without a CRLF the whole of s is
// the line and rest is empty.
func splitRESPLine(s string) (line, rest string) {
	if i := strings.Index(s, "\r\n"); i >= 0 {
		return s[:i], s[i+2:]
	}
	return s, ""
}

// parseRESPValue decodes one RESP value from the front of s and returns it
// together with the unconsumed remainder.
func parseRESPValue(s string) (interface{}, string, error) {
	if s == "" {
		return nil, "", fmt.Errorf("incomplete response")
	}
	header, rest := splitRESPLine(s[1:])
	switch s[0] {
	case '-':
		return respError(header), rest, nil
	case '+', ':':
		return header, rest, nil
	case '$':
		var size int
		if _, err := fmt.Sscanf(header, "%d", &size); err != nil {
			return nil, "", fmt.Errorf("invalid bulk string length %q: %w", header, err)
		}
		if size < 0 {
			return nil, rest, nil // Null bulk response
		}
		if len(rest) < size {
			return nil, "", fmt.Errorf("incomplete bulk string: want %d bytes, have %d", size, len(rest))
		}
		// Slice by the declared length so payloads containing CRLF survive.
		return rest[:size], strings.TrimPrefix(rest[size:], "\r\n"), nil
	case '*':
		var count int
		if _, err := fmt.Sscanf(header, "%d", &count); err != nil {
			return nil, "", err
		}
		if count <= 0 {
			return nil, rest, nil
		}
		// count comes from the wire, so grow on demand instead of
		// pre-allocating an attacker-chosen size.
		var items []interface{}
		for i := 0; i < count; i++ {
			item, next, err := parseRESPValue(rest)
			if err != nil {
				return nil, "", fmt.Errorf("array element %d: %w", i, err)
			}
			items = append(items, item)
			rest = next
		}
		return items, rest, nil
	default:
		return nil, "", fmt.Errorf("unknown response type: %c", s[0])
	}
}
// main runs the example and exits with a fatal log line on the first error.
// The work lives in runExample so that its deferred cleanup has finished
// before log.Fatalf terminates the process (log.Fatalf calls os.Exit, which
// skips deferred functions).
func main() {
	if err := runExample(); err != nil {
		log.Fatalf("Failed to %v", err)
	}
}

// runExample starts a client against a local TLS Redis endpoint, issues four
// SET commands followed by one GET, and prints each response. It returns the
// first error encountered; the client is stopped on every return path after a
// successful start.
func runExample() error {
	// TLS configuration for secure Redis connection, adjust as necessary.
	addr := "127.0.0.1:6380"
	pool := NewConnPool(addr, tlsConfig, 3) // Pool with max 3 connections
	defer pool.CloseAll()

	client := NewRedisClient(addr, tlsConfig, pool)
	if err := client.Start(); err != nil {
		return fmt.Errorf("start client: %w", err)
	}
	defer client.Stop()
	log.Printf("here1")

	// Use the client.
	writes := [][]string{
		{"key", "value"},
		{"key", "value"},
		{"key", "value"},
		{"a", "b"},
	}
	for _, kv := range writes {
		resp, err := client.Do("SET", kv...)
		if err != nil {
			return fmt.Errorf("execute command: %w", err)
		}
		fmt.Printf("SET Response: %v\n", resp)
	}
	log.Printf("here2")

	resp, err := client.Do("GET", "key")
	if err != nil {
		return fmt.Errorf("execute command: %w", err)
	}
	fmt.Printf("GET Response: %v\n", resp)
	log.Printf("here3")

	// Keep the main goroutine alive for a short time to ensure the response is processed
	time.Sleep(1 * time.Second)
	return nil
}