Real World Golang Concurrency Examples

Here at DoltHub, we're big fans of Golang. We built DoltDB, the first fully versioned SQL relational database, entirely on a Golang stack, and we use Go for all the services that power DoltHub, too. This post is a continuation of our series on Golang.

In today's Golang blog post, we're looking at a real-world example of using Go's concurrency features to solve some problems I encountered recently. I find that concrete examples are often the most helpful way to explain concepts and show how and when to apply them. Hopefully this article will help you better understand how and why you might apply Go's concurrency features in your own code.

A Tale of Two Blocked Goroutines

Lately I've been working on a new Dolt feature called "MySQL replication" that allows Dolt databases to replicate directly from MySQL databases using the existing MySQL binlog protocol. The implementation for this feature spins up a goroutine called the "applier". The applier goroutine connects to the MySQL source server, pulls replication events off the connection, and applies the changes: a pretty simple structure. This was all working well until I started testing more edge cases around the STOP REPLICA command, which stops the replication process. I found several cases where the STOP REPLICA command wasn't being processed, and it made me realize I needed to change the code's structure a bit and use a few more of Golang's concurrency features to get the goroutines working together cleanly and correctly.

Before we dive into the fix, let's explain the exact problem better. The main thing the STOP REPLICA command needs to do is send a signal on an (unbuffered) channel for the replication applier goroutine to receive.

// StopReplica implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) StopReplica(ctx *sql.Context) error {
   ...
   d.applier.stopReplicationChan <- struct{}{}
   ...
}

When the applier goroutine sees that message on the stopReplicationChan channel, it simply exits its processing loop:

    // Process binlog events
    for {
        select {
        case <-a.stopReplicationChan:
            ctx.GetLogger().Trace("received signal to stop replication routine")
            return nil
        default:
            event, err := conn.ReadBinlogEvent()
            if err != nil {
                // Handle errors, re-establish connection, or pause for next event
                continue
            }
            a.processBinlogEvent(ctx, engine, event)
        }
    }

Pretty simple, right? So where's the problem? In the basic, happy case, this works just fine. (It's always those pesky edge cases that get us!) Take a closer look and note that the call to conn.ReadBinlogEvent can block waiting for a binlog event message. If no more events are being sent, the applier is stuck inside that call and never gets back to the select statement, so it can't receive the message that the controller sent over the stopReplicationChan channel. And because the channel is unbuffered, the controller's send blocks until the applier receives it. Eventually we'll get a network timeout, of course, and ReadBinlogEvent should return an error, but we can't rely on that. Both goroutines are now blocked, and we can't service the STOP REPLICA command fast enough.

More Channels, More Better!

Not surprisingly, there are multiple ways we could address this problem. The *mysql.Conn instance has a net.Conn inside of it that we can access, and the net.Conn interface allows Close() to be called concurrently, with a guarantee that any blocked reads or writes on that connection will return with an error. We could also use SetReadDeadline(time.Now()) to force blocked reads to return, without closing the connection. Both of these would be reasonable ways to handle this problem, but we're going to take a different approach: move the blocking call to a separate goroutine and add some new channels so that the new goroutine can communicate with the applier.

This felt like a natural fit for a producer/consumer relationship between the goroutines, so I pulled out a new type, called binlogEventProducer, that is responsible for reading binlog events from the connection (the blocking call we looked at above) and sending them back to the applier over a channel, as well as sending back any errors over a separate channel. Note that the event producer owns the two communication channels, so it is responsible for creating and closing them, as we'll see later. Here's the full implementation on GitHub if you want to take a closer look.

Here's the main processing loop for the binlogEventProducer:

// Go starts this binlogEventProducer in a new goroutine. Right before this routine exits, it will close the
// two communication channels it owns.
func (p *binlogEventProducer) Go(_ *sql.Context) {
	go func() {
		for p.IsRunning() {
			// ReadBinlogEvent blocks until a binlog event can be read and returned, so this has to be done on a
			// separate goroutine, otherwise the applier would be blocked and wouldn't be able to handle the STOP
			// REPLICA signal fast enough.
			event, err := p.conn.ReadBinlogEvent()

			// If this binlogEventProducer has been stopped while we were blocked waiting to read the next
			// binlog event, abort processing it and just return instead.
			if !p.IsRunning() {
				break
			}

			if err != nil {
				p.errorChan <- err
			} else {
				p.eventChan <- event
			}
		}
		// Close the channels after we are done using them
		close(p.errorChan)
		close(p.eventChan)
	}()
}

Using this new type cleaned up the processing loop in our applier goroutine nicely, too:

    select {
    case event := <-eventProducer.EventChan():
        err := a.processBinlogEvent(ctx, engine, event)
        if err != nil {
            // handle an event processing error
        }

    case err := <-eventProducer.ErrorChan():
        // handle a network IO error 

    case <-a.stopReplicationChan:
        ctx.GetLogger().Trace("received stop replication signal")
        eventProducer.Stop()
        return nil
    }

Now we can process the STOP REPLICA command as soon as it is received, even if the separate binlogEventProducer goroutine is blocked waiting for a binlog event to be read from the connection. Because that blocking call is now isolated on its own goroutine and we're using channels to communicate between the two goroutines, we don't have to worry about the STOP REPLICA command hanging in this case anymore.
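To see the whole pattern run end to end, here is a self-contained sketch that swaps the MySQL connection for a plain channel. The producer type, the read function, the applier function, and the demo driver are all simplified stand-ins invented for this example, not Dolt's actual code.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// producer is a simplified stand-in for binlogEventProducer. The read field
// plays the role of the blocking conn.ReadBinlogEvent call.
type producer struct {
	read      func() (string, error)
	eventChan chan string
	errorChan chan error
	running   atomic.Bool
}

func newProducer(read func() (string, error)) *producer {
	p := &producer{read: read, eventChan: make(chan string), errorChan: make(chan error)}
	p.running.Store(true)
	return p
}

// Go runs the blocking read loop on its own goroutine and closes the
// channels the producer owns when the loop exits.
func (p *producer) Go() {
	go func() {
		defer close(p.errorChan)
		defer close(p.eventChan)
		for p.running.Load() {
			event, err := p.read()
			if !p.running.Load() {
				return
			}
			if err != nil {
				p.errorChan <- err
			} else {
				p.eventChan <- event
			}
		}
	}()
}

// applier consumes events until it sees an error or a stop signal.
func applier(p *producer, stop <-chan struct{}, handle func(string)) error {
	for {
		select {
		case event := <-p.eventChan:
			handle(event)
		case err := <-p.errorChan:
			p.running.Store(false)
			return err
		case <-stop:
			p.running.Store(false)
			return nil
		}
	}
}

// demo feeds two events through the pipeline, then stops the applier while
// the producer is parked in its blocking read.
func demo() ([]string, error) {
	src := make(chan string)
	p := newProducer(func() (string, error) { return <-src, nil })
	p.Go()

	stop := make(chan struct{})
	seen := make(chan string)
	done := make(chan error)
	go func() { done <- applier(p, stop, func(e string) { seen <- e }) }()

	var got []string
	for _, e := range []string{"e1", "e2"} {
		src <- e
		got = append(got, <-seen)
	}

	// No more events are coming, so the producer is blocked in read().
	// The applier still receives the stop signal right away.
	stop <- struct{}{}
	return got, <-done
}

func main() {
	got, err := demo()
	fmt.Println(got, err) // prints "[e1 e2] <nil>"
}
```

In this sketch the producer goroutine stays parked in read() after the applier returns, because nothing ever unblocks the source channel. With a real connection, something still has to make the blocked read return, for example closing the connection.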

Time.After(Duration) 🎶

Another edge case I ran into while testing the STOP REPLICA command was when a connection to the source MySQL database can't be established. In this case, connection retry logic kicks in, which includes a backoff period where we wait before trying to establish a connection again. By default, MySQL waits 60s between connection retry attempts (and will retry for up to 60 days!!). My first, naive version of the connection retry logic simply used time.Sleep(), but that means the applier goroutine is blocked again, this time for a full minute and can't process the incoming message on the channel to stop the replication process.

Here's what that first, naive version looked like:

    conn, err = mysql.Connect(ctx, &connParams)
    if err != nil && connectionAttempts >= maxConnectionAttempts {
        return nil, err
    } else if err != nil {
        time.Sleep(time.Duration(connectRetryDelay) * time.Second)
    }

Go provides a really elegant (and channel-based 🙌🏻) solution for this with timers and the time.After(Duration) function, which, under the covers, simply uses NewTimer(d) to create a timer that sends the current time on a channel after the given duration. This means our goroutine no longer has to sit in an uninterruptible sleep: we can use select to wait on the timer channel and the stop channel at the same time, and react to whichever one fires first. As a still fairly new Go developer, this was my first time using this API, and I really appreciated how simple and clean it was to use.

Here's how the connection establishment code looks after moving to use the time.After(Duration) API.

    // If there was an error connecting (and we haven't used up all our retry attempts), listen for a
    // STOP REPLICA signal or for the retry delay timer to fire. We need to use select here so that we don't
    // block on our retry backoff and ignore the STOP REPLICA signal for a long time.
    select {
    case <-a.stopReplicationChan:
        ctx.GetLogger().Debugf("Received stop replication signal while trying to connect")
        return nil, ErrReplicationStopped
    case <-time.After(time.Duration(connectRetryDelay) * time.Second):
        // Nothing to do here if our timer completes; just fall through
    }

The result is that even if the connection establishment code is in a long retry backoff period, our goroutine stays responsive! It waits in select for whichever comes first, the stop replication signal or the retry timer, instead of sleeping through the whole delay. A quick test for this connection retry scenario, and now we've got this edge case covered, too! 🙌🏻

sync/atomic

There's one last Go concurrency feature I needed to use in this work. As part of the coordination between the applier goroutine and the event producer goroutine, the applier needs to be able to stop the event producer. I could have used a channel to send a stop signal, but this case was simple enough that I used a boolean value to track whether the event producer is supposed to still be running. Reading and writing a value in memory from multiple goroutines without synchronization is not safe, of course, but Go provides some really handy synchronization primitives in the sync/atomic package. In particular, Go 1.19 added several new atomic types that make it really easy to ensure that updates to booleans, 32- and 64-bit ints (signed and unsigned), and pointers all happen atomically and are safe to read and write from concurrent code.

Without atomic access, concurrent reads and writes of this value are a data race: Go's memory model makes no guarantee about when, or whether, the producer goroutine sees the applier's write, and the race detector can flag it. We could have used other synchronization tools, like mutexes, to guard access to that member, but using the atomic.Bool type is so simple, and those types embed all the synchronization logic we need for this use case.
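For contrast, here is a rough sketch of the channel-based stop signal I mentioned as an alternative. It is not what this post's code does; produce, firstN, and the done channel are hypothetical names. Closing done broadcasts the stop request, and because the send sits inside a select, a producer that is waiting for a receiver notices the request too.

```go
package main

import "fmt"

// produce forwards values from next to out until done is closed.
func produce(next func() int, out chan<- int, done <-chan struct{}) {
	defer close(out)
	for {
		select {
		case out <- next():
			// Value delivered; loop around for the next one.
		case <-done:
			return
		}
	}
}

// firstN receives n values from a producer, then stops it and waits for the
// producer goroutine to exit.
func firstN(n int) []int {
	counter := 0
	next := func() int { counter++; return counter } // only called by the producer

	out := make(chan int)
	done := make(chan struct{})
	exited := make(chan struct{})
	go func() {
		produce(next, out, done)
		close(exited)
	}()

	var got []int
	for i := 0; i < n; i++ {
		got = append(got, <-out)
	}

	close(done) // ask the producer to stop
	<-exited    // the producer saw the closed channel and returned
	return got
}

func main() {
	fmt.Println(firstN(3)) // prints "[1 2 3]"
}
```

This costs one more channel to manage, which is why a single boolean felt like the simpler fit for my case.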

In the code snippet below, you can see where we define our atomic.Bool member, and then how we use the Load() and Store() functions to read and write the value atomically. It really couldn't be much easier than this!

type binlogEventProducer struct {
	conn      *mysql.Conn
	errorChan chan error
	eventChan chan mysql.BinlogEvent
	running   atomic.Bool
}

// IsRunning returns true if this instance is processing binlog events and has not been stopped.
func (p *binlogEventProducer) IsRunning() bool {
	return p.running.Load()
}

// Stop requests for this binlogEventProducer to stop processing events as soon as possible.
func (p *binlogEventProducer) Stop() {
	p.running.Store(false)
}

Now the applier goroutine can use the binlogEventProducer.Stop() API to safely update the running boolean member inside the binlogEventProducer instance, and the binlogEventProducer uses the IsRunning() API to safely check if it's supposed to still be running or not.
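If you want to try atomic.Bool in isolation, here is a small self-contained sketch. The worker type and the runAndStop driver are made up for this example. Note that the zero value of atomic.Bool is false, so the flag has to be set to true before the loop starts.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// worker is a hypothetical type that loops until another goroutine stops it.
type worker struct {
	running atomic.Bool
	ticks   atomic.Int64
}

// IsRunning reports whether the worker has not been stopped yet.
func (w *worker) IsRunning() bool { return w.running.Load() }

// Stop asks the worker to exit its loop.
func (w *worker) Stop() { w.running.Store(false) }

// run counts loop iterations until Stop is called.
func (w *worker) run() {
	for w.IsRunning() {
		w.ticks.Add(1)
	}
}

// runAndStop starts a worker, waits until it has done some work, stops it
// from the calling goroutine, and waits for it to exit.
func runAndStop() (stillRunning bool, didWork bool) {
	w := &worker{}
	w.running.Store(true) // the zero value is false, so set it explicitly

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		w.run()
	}()

	// Yield until the worker has completed at least one iteration.
	for w.ticks.Load() == 0 {
		runtime.Gosched()
	}

	w.Stop()
	wg.Wait()
	return w.IsRunning(), w.ticks.Load() > 0
}

func main() {
	fmt.Println(runAndStop()) // prints "false true"
}
```

Running this with go run -race should report no data races, since every access to the shared fields goes through the atomic types.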

Conclusion

These were a few simple examples of using Go's concurrency features. Hopefully they show how you can easily and safely deal with real world concurrency issues using Go's very capable toolset. As a developer coming to Go fairly recently, I've been pleasantly surprised at how easy Go has been to learn and how simple and clean most of its abstractions and APIs are. Go's concurrency features are perhaps the best example of that! They are powerful, but are also easy to understand and to use safely.

If you liked this article, subscribe to the DoltHub weekly newsletter to stay in sync on what we're up to and get notified about Golang blog posts. You can also swing by the DoltHub Discord and let us know what you think! We love discussing Go, databases, and data versioning! 🤓 Come by and join us!
