Promise Femi

gRPC Client Connection Pooling

25 May 2023 · 11 min read

Recently I was working on a daemon for consuming MQTT messages sent from thousands of IoT devices. One of the requirements was that the daemon needed to be able to handle about 15k messages per second (I know, right? 😮).

Without going into too much detail (that would be for another blog) we decided to cut the round trip of our MySQL database completely and introduce a gRPC server that would act as a buffer between our daemon and the database. By doing that our daemon would simply drop messages off to the gRPC server and the server would queue and write to the database on its own time.

Building the gRPC server was the easy part; the hard part was managing the needs of the daemon. A number of goroutines would need to send messages to the gRPC server, but sharing a single client connection would significantly affect performance, since new requests would have to wait for the connection to finish whatever task it was currently handling. Just imagine 14K+ messages waiting for one connection to become free. At this point it was clear that using one connection was a no-go, and creating a new connection for every request was out of the picture (creating new connections is very expensive here), so we decided to use a connection pool.

A lot of the ideas here were inspired by This Blog, which I strongly recommend.

If you don’t know what a connection pool is I strongly advise that you read the blog I linked above.

Jumping In

If you don’t want to go through everything, I have already created a Go package that can be used right away: https://github.com/promisefemi/grpc-client-pool

Before anything else, let's talk about how to create new gRPC client connections:

    conn, err := grpc.Dial(":9000", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
    if err != nil {
        // Handle error
    }
    // create a new client and use connection

or with context

    ctx, cancelCtx := context.WithTimeout(context.Background(), 3*time.Second)
    // release the timeout's resources once dialing has finished
    defer cancelCtx()
    conn, err := grpc.DialContext(ctx, ":9000", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
    if err != nil {
        // Handle error
    }
    // create a new client and use connection

Pool Structure

At its core, a connection pool contains three important properties: maxOpenConnection, maxIdleConnection and idleConnections.

maxOpenConnection - how many connections we are allowed to open before we have to start queuing connection requests.

maxIdleConnection - how many idle connections we are allowed to keep before we start closing connections.

idleConnections - a list of connections that can be reused.

There are also two important methods, Get and put, which are responsible for getting and caching connections. Altogether these properties and methods are responsible for managing our connection pool.

    type ClientPool struct {
        mu                  sync.Mutex          // mutex to protect the idleConnections and numOfOpenConnection properties
        address             string              // gRPC server address
        configOptions       []grpc.DialOption   // gRPC dial configurations
        maxOpenConnection   int                 // maximum number of open connections allowed
        maxIdleConnection   int                 // maximum number of idle connections allowed
        idleConnections     map[string]*ClientCon // list of idle connections
        numOfOpenConnection int                 // number of currently open connections
    }

ClientCon is a custom type, so let's create that as well

    type ClientCon struct {
        id   string
        pool *ClientPool
        Conn *grpc.ClientConn
    }

Now let us proceed with our put and get methods

Put method

The responsibility of the put method is very simple: it takes a connection and either returns it to the list of idle connections (the pool) or closes it if there is no room.

    func (cp *ClientPool) put(conn *ClientCon) {
        // Lock the pool and defer the unlock
        cp.mu.Lock()
        defer cp.mu.Unlock()

        // Get counted this connection as open when it handed it out (even when it
        // came from the idle list), so give that slot back whether we keep it or close it
        cp.numOfOpenConnection--

        // check to see if there is still room in idleConnections
        if len(cp.idleConnections) < cp.maxIdleConnection {
            cp.idleConnections[conn.id] = conn
            return
        }

        // the idle list is full, so close the connection
        _ = conn.Conn.Close()
    }

and that’s about it, the put method is done.
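To watch the idle cap at work without a running gRPC server, here is a minimal sketch that uses only the standard library. The names fakeConn and miniPool are hypothetical stand-ins for grpc.ClientConn and ClientPool, and the sketch assumes the idle list should never hold more than maxIdle entries; it only illustrates the keep-or-close decision, not the full pool.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeConn is a hypothetical stand-in for *grpc.ClientConn.
type fakeConn struct {
	id     string
	closed bool
}

// Close marks the fake connection as closed.
func (c *fakeConn) Close() error {
	c.closed = true
	return nil
}

// miniPool keeps only the fields needed to show the idle cap.
type miniPool struct {
	mu      sync.Mutex
	maxIdle int
	idle    map[string]*fakeConn
}

// put keeps the connection if the idle list has room, otherwise closes it.
// It reports whether the connection was kept.
func (p *miniPool) put(c *fakeConn) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.idle) < p.maxIdle {
		p.idle[c.id] = c
		return true
	}
	_ = c.Close()
	return false
}

func main() {
	p := &miniPool{maxIdle: 2, idle: map[string]*fakeConn{}}
	for _, id := range []string{"a", "b", "c"} {
		c := &fakeConn{id: id}
		kept := p.put(c)
		fmt.Printf("conn %s kept=%v closed=%v\n", id, kept, c.closed)
	}
}
```

With maxIdle set to 2, the first two connections are kept and the third is closed.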

Get method

The get method is a little bit different. It has to handle three different scenarios:

  1. Return a connection from the list of idle connections if there is any available

  2. Queue connection request if we have surpassed the number of connections we can create

  3. Create a new connection

Let's see how this translates into code.

    func (cp *ClientPool) Get() (*ClientCon, error) {
        // Lock the pool
        cp.mu.Lock()

        // check if we have any connection available
        if len(cp.idleConnections) > 0 {
            // return the first connection available, and remove it from the list of idle connections
            for _, val := range cp.idleConnections {
                delete(cp.idleConnections, val.id)
                // increment the number of open connections
                cp.numOfOpenConnection++
                cp.mu.Unlock()
                return val, nil
            }
        }
    ...

For the first step, we check whether there are any idle connections. If there are, we remove the first one from the idle list, increment the count of open connections, and return it.
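Taking "the first connection" out of a map relies on a small Go idiom: range over the map, delete the entry, and return on the first iteration. Go does not specify map iteration order, so "first" really means "any". The sketch below shows the idiom on its own; takeAny is a hypothetical helper, and plain strings stand in for connections.

```go
package main

import "fmt"

// takeAny removes and returns an arbitrary entry from the map.
// ok is false when the map is empty.
func takeAny(idle map[string]string) (id string, conn string, ok bool) {
	for k, v := range idle {
		// deleting the current key while ranging over a map is safe in Go
		delete(idle, k)
		return k, v, true
	}
	return "", "", false
}

func main() {
	idle := map[string]string{"1": "conn-1", "2": "conn-2"}
	id, conn, ok := takeAny(idle)
	// which entry comes back is not specified, only that exactly one is removed
	fmt.Println(id, conn, ok, len(idle))
}
```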

If there are no available connections we move to the next step.

Before we start creating new connections, it's important to know whether we have reached the number of open connections we are allowed to create. If we have, we need to queue the connection request and resolve it at a later time.

Go channels are coming to the rescue here. But before we do that we need to introduce a new type that’s going to act as the channel type.

The queueChan struct contains two properties: connectionChan, a channel used to receive a ClientCon once the request is fulfilled, and errorChan, a channel that receives an error if the connection request could not be fulfilled.

    // A queue struct (the struct is not a channel, I just named it that way)
    type queueChan struct {
        connectionChan chan *ClientCon
        errorChan      chan error
    }
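The pattern behind queueChan is a request that carries its own reply channels: the caller creates the channels, hands the request to a worker, and blocks until exactly one of them fires. Here is a minimal standalone sketch of that pattern. The request type, worker, ask and errNoCapacity are all hypothetical names, and strings stand in for connections.

```go
package main

import (
	"errors"
	"fmt"
)

// request mirrors the shape of queueChan: one channel per outcome.
type request struct {
	result chan string
	err    chan error
}

// errNoCapacity is a made-up error for this sketch.
var errNoCapacity = errors.New("no capacity")

// worker answers each request on exactly one of its two channels.
func worker(queue <-chan *request, available []string) {
	for rq := range queue {
		if len(available) == 0 {
			rq.err <- errNoCapacity
			continue
		}
		rq.result <- available[0]
		available = available[1:]
	}
}

// ask enqueues a request and blocks until the worker replies.
func ask(queue chan<- *request) (string, error) {
	rq := &request{result: make(chan string), err: make(chan error)}
	queue <- rq
	select {
	case v := <-rq.result:
		return v, nil
	case err := <-rq.err:
		return "", err
	}
}

func main() {
	queue := make(chan *request, 8)
	go worker(queue, []string{"conn-1"})

	v, err := ask(queue)
	fmt.Println(v, err)

	_, err = ask(queue)
	fmt.Println(err)
}
```

The first call receives the only available value; the second receives the error, because the worker has nothing left to hand out.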

and let's add an update to the ClientPool to contain the channel that would act as the queue.

    type ClientPool struct {
        ...
        connectionQueue chan *queueChan // channel for queuing connection requests when there are no idle connections left
    }

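Back in the get method, we need to create a new queue request for a connection, pass that request to the pool's connection queue, then use a select to block until the request is resolved. The pool mutex has to be released before blocking; otherwise neither put nor the goroutine that serves the queue could ever acquire it, and the request would never be resolved.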

    ...
        // Check if we are allowed to open new connections
        if cp.maxOpenConnection > 0 && cp.numOfOpenConnection >= cp.maxOpenConnection {
            // release the lock before waiting, so that put and the queue handler can still use the pool
            cp.mu.Unlock()

            // create a new queue request
            queueRequest := &queueChan{
                connectionChan: make(chan *ClientCon),
                errorChan:      make(chan error),
            }
            // pass request into connection queue
            cp.connectionQueue <- queueRequest

            // block with a select until the request is fulfilled or an error occurs instead
            select {
            case conn := <-queueRequest.connectionChan:
                // take the lock again only to update the counter
                cp.mu.Lock()
                cp.numOfOpenConnection++
                cp.mu.Unlock()
                return conn, nil
            case err := <-queueRequest.errorChan:
                return nil, err
            }
        }
    ...

With that we have successfully queued the connection request, and when an idle connection becomes available the request will be resolved. We will come back to how requests in the connection queue get resolved.
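One detail deserves care whenever a goroutine blocks on a channel: if it waits while holding a mutex, every other goroutine that needs the mutex stalls too, including the one that is supposed to send the reply. The sketch below shows the safe shape under simplified assumptions: unlock, wait, then lock again only to update shared state. The state type and both methods are hypothetical and stand in for the pool and its queue handler.

```go
package main

import (
	"fmt"
	"sync"
)

// state is a hypothetical stand-in for the pool's shared bookkeeping.
type state struct {
	mu    sync.Mutex
	inUse int
}

// waitForSlot inspects shared state under the lock, releases the lock
// before blocking on the channel, and re-locks only to update the counter.
func (s *state) waitForSlot(ready <-chan string) string {
	s.mu.Lock()
	// ... decide here that we have to wait ...
	s.mu.Unlock() // release before blocking

	v := <-ready

	s.mu.Lock()
	s.inUse++
	s.mu.Unlock()
	return v
}

// produce needs the same mutex before it can send the reply.
// If waitForSlot still held the lock, this would block forever.
func (s *state) produce(ready chan<- string) {
	s.mu.Lock()
	// ... pick a connection while holding the lock ...
	s.mu.Unlock()
	ready <- "conn-1"
}

func main() {
	s := &state{}
	ready := make(chan string)
	go s.produce(ready)

	v := s.waitForSlot(ready)
	fmt.Println(v, s.inUse) // conn-1 1
}
```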

The last part of the get method is very simple. At this point there is no idle connection and we are allowed to create a new connection (instead of queuing the request).

        conn, err := cp.openConnection()
        if err != nil {
            // do not leave the pool locked when dialing fails
            cp.mu.Unlock()
            return nil, err
        }
        cp.numOfOpenConnection++
        cp.mu.Unlock()
        return conn, nil
    }

    func (cp *ClientPool) openConnection() (*ClientCon, error) {
        // Dial a new grpc client connection or use DialContext if that works for you
        newConn, err := grpc.Dial(cp.address, cp.configOptions...)
        if err != nil {
            return nil, err
        }
        // create a new ClientCon
        return &ClientCon{
            // the id is the key in idleConnections; nanosecond resolution makes
            // two connections sharing an id far less likely than with Unix seconds
            id:   fmt.Sprintf("%v", time.Now().UnixNano()),
            pool: cp,
            Conn: newConn,
        }, nil
    }
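Because the id is the key into idleConnections, two connections that end up with the same timestamp-derived id would overwrite each other in the map. One possible alternative, sketched here as an assumption rather than something taken from the package, is a process-wide atomic counter. nextID and uniqueIDs are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"
)

// idCounter is shared by every caller of nextID.
var idCounter uint64

// nextID returns an id that is unique within this process,
// no matter how many goroutines call it at the same time.
func nextID() string {
	return strconv.FormatUint(atomic.AddUint64(&idCounter, 1), 10)
}

// uniqueIDs requests n ids from concurrent goroutines and
// reports how many distinct ids came back.
func uniqueIDs(n int) int {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		seen = make(map[string]bool)
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			id := nextID()
			mu.Lock()
			seen[id] = true
			mu.Unlock()
		}()
	}
	wg.Wait()
	return len(seen)
}

func main() {
	fmt.Println(uniqueIDs(1000)) // 1000
}
```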

and all together

    // A queue struct (the struct is not a channel, I just named it that way)
    type queueChan struct {
        connectionChan chan *ClientCon
        errorChan      chan error
    }

    func (cp *ClientPool) Get() (*ClientCon, error) {
        // Lock the pool
        cp.mu.Lock()

        // check if we have any connection available
        if len(cp.idleConnections) > 0 {
            // return the first connection available, and remove it from the list of idle connections
            for _, val := range cp.idleConnections {
                delete(cp.idleConnections, val.id)
                // increment the number of open connections
                cp.numOfOpenConnection++
                cp.mu.Unlock()
                return val, nil
            }
        }

        // Check if we are allowed to open new connections
        if cp.maxOpenConnection > 0 && cp.numOfOpenConnection >= cp.maxOpenConnection {
            // release the lock before waiting, so that put and the queue handler can still use the pool
            cp.mu.Unlock()

            // create a new queue request
            queueRequest := &queueChan{
                connectionChan: make(chan *ClientCon),
                errorChan:      make(chan error),
            }
            // pass request into connection queue
            cp.connectionQueue <- queueRequest

            // block with a select until the request is fulfilled or an error occurs instead
            select {
            case conn := <-queueRequest.connectionChan:
                // take the lock again only to update the counter
                cp.mu.Lock()
                cp.numOfOpenConnection++
                cp.mu.Unlock()
                return conn, nil
            case err := <-queueRequest.errorChan:
                return nil, err
            }
        }

        conn, err := cp.openConnection()
        if err != nil {
            // do not leave the pool locked when dialing fails
            cp.mu.Unlock()
            return nil, err
        }
        cp.numOfOpenConnection++
        cp.mu.Unlock()
        return conn, nil
    }

    func (cp *ClientPool) openConnection() (*ClientCon, error) {
        // Dial a new grpc client connection or use DialContext if that works for you
        newConn, err := grpc.Dial(cp.address, cp.configOptions...)
        if err != nil {
            return nil, err
        }
        // create a new ClientCon
        return &ClientCon{
            // the id is the key in idleConnections; nanosecond resolution makes
            // two connections sharing an id far less likely than with Unix seconds
            id:   fmt.Sprintf("%v", time.Now().UnixNano()),
            pool: cp,
            Conn: newConn,
        }, nil
    }

And with all that we are almost done. Remember when we queued connection requests? Well, it's time to resolve those requests.

To do that we need to add a method that runs on a separate goroutine, constantly waiting for queued connection requests and the chance to resolve them. Let's call this method handleConnectionQueue.

Handling Connection Queue

There are three basic things we need to care about when handling connection queue requests:

  1. Watching for new requests in the queue

  2. Checking for a newly idle connection

  3. Creating a connection if there are no idle connections and if we are allowed to do so.

As you may have noticed, there are a lot of similarities with the Get method. One thing to take note of is that we do not return an error immediately when an attempt to get a connection fails. Instead, we keep retrying for a given time frame before deciding that the request has failed.

Let us begin

    func (cp *ClientPool) handleConnectionQueue() {
        for rq := range cp.connectionQueue {
            var (
                hasTimedOut  = false
                hasCompleted = false
                timeout      = time.After(time.Duration(3) * time.Second)
            )

            // continually try to get/create a connection until timeout or connection completed
            for {
                if hasCompleted || hasTimedOut {
                    break
                }
                // continually check for timeout or try to get/create a connection
                select {
                case <-timeout:
                    hasTimedOut = true
                    rq.errorChan <- ErrConnectionWaitTimeout
                default:
    ...

And within the default: we would be doing two things, trying to get an idle connection or trying to create a new connection

    ...
                    // first check if an idle connection is available
                    cp.mu.Lock()
                    numberOfIdleConnections := len(cp.idleConnections)
                    if numberOfIdleConnections > 0 {
                        for _, val := range cp.idleConnections {
                            delete(cp.idleConnections, val.id)
                            cp.mu.Unlock()
                            rq.connectionChan <- val
                            hasCompleted = true
                            break
                        }
                    } else if cp.maxOpenConnection > 0 && cp.maxOpenConnection > cp.numOfOpenConnection {
                        // the pool has not exceeded the number of allowed open connections
                        // reserve a slot by increasing numOfOpenConnection, hoping a new connection will be created
                        // unlock the mutex to free up the pool for other callers, since creating a new connection could take a while
                        cp.numOfOpenConnection++
                        cp.mu.Unlock()
                        conn, err := cp.openConnection()
                        // ignoring the error because the only error we care about is the timeout, we continue to retry
                        // give the reserved slot back; Get counts the connection itself once it receives it
                        cp.mu.Lock()
                        cp.numOfOpenConnection--
                        cp.mu.Unlock()
                        if err == nil {
                            rq.connectionChan <- conn
                            hasCompleted = true
                        }
                    } else {
                        // unlock pool and restart
                        cp.mu.Unlock()
                    }
                }
            }
        }
    }

All together

    func (cp *ClientPool) handleConnectionQueue() {
        for rq := range cp.connectionQueue {
            var (
                hasTimedOut  = false
                hasCompleted = false
                timeout      = time.After(time.Duration(3) * time.Second)
            )

            // continually try to get/create a connection until timeout or connection completed
            for {
                if hasCompleted || hasTimedOut {
                    break
                }
                // continually check for timeout or try to get/create a connection
                select {
                case <-timeout:
                    hasTimedOut = true
                    rq.errorChan <- ErrConnectionWaitTimeout
                default:
                    // first check if an idle connection is available
                    cp.mu.Lock()
                    numberOfIdleConnections := len(cp.idleConnections)
                    if numberOfIdleConnections > 0 {
                        for _, val := range cp.idleConnections {
                            delete(cp.idleConnections, val.id)
                            cp.mu.Unlock()
                            rq.connectionChan <- val
                            hasCompleted = true
                            break
                        }
                    } else if cp.maxOpenConnection > 0 && cp.maxOpenConnection > cp.numOfOpenConnection {
                        // the pool has not exceeded the number of allowed open connections
                        // reserve a slot by increasing numOfOpenConnection, hoping a new connection will be created
                        // unlock the mutex to free up the pool for other callers, since creating a new connection could take a while
                        cp.numOfOpenConnection++
                        cp.mu.Unlock()
                        conn, err := cp.openConnection()
                        // ignoring the error because the only error we care about is the timeout, we continue to retry
                        // give the reserved slot back; Get counts the connection itself once it receives it
                        cp.mu.Lock()
                        cp.numOfOpenConnection--
                        cp.mu.Unlock()
                        if err == nil {
                            rq.connectionChan <- conn
                            hasCompleted = true
                        }
                    } else {
                        // unlock pool and restart
                        cp.mu.Unlock()
                    }
                }
            }
        }
    }
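The handler's core is a retry loop: a select with a timeout case and a default case that makes one more attempt. Stripped of the pool details, it can be sketched as below. The article never shows how ErrConnectionWaitTimeout is declared, so the definition here is an assumption, retryUntil is a hypothetical helper, and the one-millisecond pause between attempts is my addition to keep the loop from spinning at full speed.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrConnectionWaitTimeout is referenced by the handler; this
// definition is an assumption, not copied from the package.
var ErrConnectionWaitTimeout = errors.New("timed out waiting for a connection")

// retryUntil calls try until it succeeds or the wait elapses.
func retryUntil(wait time.Duration, try func() (string, bool)) (string, error) {
	timeout := time.After(wait)
	for {
		select {
		case <-timeout:
			return "", ErrConnectionWaitTimeout
		default:
			if v, ok := try(); ok {
				return v, nil
			}
			// brief pause between attempts (an addition, not in the handler)
			time.Sleep(time.Millisecond)
		}
	}
}

func main() {
	attempts := 0
	v, err := retryUntil(time.Second, func() (string, bool) {
		attempts++
		return "conn", attempts == 3 // succeed on the third attempt
	})
	fmt.Println(v, err, attempts) // conn <nil> 3

	_, err = retryUntil(20*time.Millisecond, func() (string, bool) {
		return "", false // never succeeds
	})
	fmt.Println(errors.Is(err, ErrConnectionWaitTimeout)) // true
}
```

Callers can then tell a timeout apart from other failures with errors.Is.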

That is most of what we need to build a functional pool.

We still need to write a method to release a connection when you're done using it. For ease of use we can bind that method to the ClientCon. All the method does is call the ClientPool put method, and as I stated above, all the put method does is return a connection to the list of idle connections or close it completely.

    func (c *ClientCon) Release() {
        c.pool.put(c)
    }
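Giving each connection a back-pointer to its pool is what makes a one-line Release possible, and it pairs well with defer so that a connection goes back to the pool even when a request fails. The sketch below illustrates this with hypothetical pool and handle types. It runs on a single goroutine and has no mutex, so it shows only the ownership pattern, not a thread-safe pool.

```go
package main

import "fmt"

// pool and handle are hypothetical stand-ins for ClientPool and ClientCon.
type pool struct {
	idle []*handle
}

type handle struct {
	name string
	pool *pool // back-pointer so the handle can return itself
}

// get reuses an idle handle if there is one, otherwise creates a new one.
func (p *pool) get() *handle {
	if n := len(p.idle); n > 0 {
		h := p.idle[n-1]
		p.idle = p.idle[:n-1]
		return h
	}
	return &handle{name: "new", pool: p}
}

// Release hands the handle back to the pool it came from.
func (h *handle) Release() {
	h.pool.idle = append(h.pool.idle, h)
}

// doWork releases through defer, so the handle is returned
// on the error path as well as on the success path.
func doWork(p *pool, fail bool) error {
	h := p.get()
	defer h.Release()
	if fail {
		return fmt.Errorf("request on %s failed", h.name)
	}
	return nil
}

func main() {
	p := &pool{}
	_ = doWork(p, true)
	_ = doWork(p, false)
	fmt.Println(len(p.idle)) // 1
}
```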

And for the last part, we want to write a function that helps us create a new pool, configure it and start the handleConnectionQueue goroutine

    type PoolConfig struct {
        MaxOpenConnection     int               // number of open connections
        MaxIdleConnection     int               // number of idle connections
        ConnectionQueueLength int               // buffer capacity of the queue channel
        Address               string            // gRPC server address
        ConfigOptions         []grpc.DialOption // gRPC connection options
    }

    func NewClientPool(config *PoolConfig) *ClientPool {
        clientPool := &ClientPool{
            mu:                  sync.Mutex{},
            address:             config.Address,
            configOptions:       config.ConfigOptions,
            maxOpenConnection:   config.MaxOpenConnection,
            maxIdleConnection:   config.MaxIdleConnection, // use the idle limit from the config, not MaxOpenConnection
            numOfOpenConnection: 0,
            connectionQueue:     make(chan *queueChan, config.ConnectionQueueLength),
            idleConnections:     make(map[string]*ClientCon, 0),
        }

        // start goroutine
        go clientPool.handleConnectionQueue()

        return clientPool
    }

For real now, that's it. We have successfully created a gRPC client connection pool. Creating and using a new pool looks like this:

    ...
    // Create a new pool config
    poolConfig := &pool.PoolConfig{
        MaxOpenConnection:     10,
        MaxIdleConnection:     10,
        ConnectionQueueLength: 10000,
        Address:               ":9000",
        ConfigOptions: []grpc.DialOption{
            grpc.WithTransportCredentials(insecure.NewCredentials()),
            grpc.WithBlock(),
        },
    }

    // create a new client pool
    connPool := pool.NewClientPool(poolConfig)

    // get a new ClientCon
    client, err := connPool.Get()
    if err != nil {
        log.Fatalf("%s", err)
    }

    // Setup a new grpc client
    // send a request using the conn
    userMessage := &proto.UserMessage{
        FirstName: "Promise",
        LastName:  "Femi",
        Email:     "",
    }
    uc := proto.NewUserClient(client.Conn)
    response, err := uc.Set(context.Background(), userMessage)
    if err != nil {
        fmt.Printf("error unable to set user -- %s -- %d\n", err, connPool.GetNumberOfOpenConnections())
    } else {
        fmt.Printf("%+v -- %d \n", response, connPool.GetNumberOfOpenConnections())
    }

    // remember to release connections.
    client.Release()
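The usage example calls connPool.GetNumberOfOpenConnections(), which this article does not show. A plausible shape for it, offered here as an assumption rather than the package's actual code, is a getter that reads the counter under the same mutex that guards every write. counterPool and add are hypothetical stand-ins for the pool and its bookkeeping.

```go
package main

import (
	"fmt"
	"sync"
)

// counterPool holds only the fields the getter needs.
type counterPool struct {
	mu                  sync.Mutex
	numOfOpenConnection int
}

// GetNumberOfOpenConnections reads the counter under the pool mutex,
// so it never observes a half-finished update from another goroutine.
func (cp *counterPool) GetNumberOfOpenConnections() int {
	cp.mu.Lock()
	defer cp.mu.Unlock()
	return cp.numOfOpenConnection
}

// add adjusts the counter the way Get (+1) and Release (-1) would.
func (cp *counterPool) add(delta int) {
	cp.mu.Lock()
	cp.numOfOpenConnection += delta
	cp.mu.Unlock()
}

func main() {
	cp := &counterPool{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			cp.add(1)  // a Get
			cp.add(-1) // a Release
		}()
	}
	wg.Wait()
	fmt.Println(cp.GetNumberOfOpenConnections()) // 0
}
```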

We are at the end of a great journey 🙇🏾. Thank you for reading my article, and you can reach out to me if you have any feedback ✌🏾.

I have already created a package that implements all the features outlined in this article @ https://github.com/promise/grpc-client-pool
