Go channels to solve interface impedance mismatch
We're hard at work on compatibility for
Doltgres, the world's first and only
version-controlled Postgres-compatible SQL database. This means getting all the incredibly complex
queries that various tools issue against the pg_catalog
tables to return correct results in a reasonable
amount of time. To this end, for the last week I've been busy implementing index support for our
virtual pg_catalog
tables. Without indexes, some queries involving joins on five or more of these
tables (surprisingly common in the Postgres library world) simply won't return, so it's table stakes
to have them.
I chose Google's btree package to store and query index information in memory. Performance is good, and the package is easy enough to use, but I ran into a problem during integration.
The btree
package scans ranges of elements with methods that look like this:
func (t *BTreeG[T]) AscendRange(greaterOrEqual, lessThan T, iterator ItemIteratorG[T])
iterator
is a callback function that gets invoked once for every element in the provided range: a single
call to AscendRange
calls iterator
N times, one call per element found.
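To make the callback's period concrete, here's a minimal stand-in for a range scan over a sorted slice. The names here are illustrative, not the btree package's actual implementation; only the callback's shape mirrors ItemIteratorG[T]:

```go
package main

import (
	"cmp"
	"fmt"
)

// itemIterator mirrors the shape of the btree package's ItemIteratorG[T]:
// the scan calls it once per element, stopping early if it returns false.
type itemIterator[T any] func(item T) bool

// ascendRange is a toy stand-in for (*btree.BTreeG[T]).AscendRange,
// scanning a sorted slice for elements in [gte, lt).
func ascendRange[T cmp.Ordered](sorted []T, gte, lt T, iter itemIterator[T]) {
	for _, v := range sorted {
		if v < gte {
			continue
		}
		if v >= lt {
			return // past the range; sorted input means we can stop
		}
		if !iter(v) {
			return // callback asked to stop early
		}
	}
}

func main() {
	// One call to ascendRange drives three callback invocations.
	ascendRange([]int{1, 2, 3, 4, 5}, 2, 5, func(v int) bool {
		fmt.Println(v)
		return true
	})
}
```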
Meanwhile, our SQL engine interfaces rely on a more
classical iterator pattern, one which returns a single element every time a Next()
method is
called. The interface looks like this:
// RowIter is an iterator that produces rows.
type RowIter interface {
// Next retrieves the next row. It will return io.EOF if it's the last row.
// After retrieving the last row, Close will be automatically called.
Next(ctx *Context) (Row, error)
Closer
}
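For contrast with the callback style, here's a sketch of how a pull-style iterator like this gets driven. The Context and Row types below are simplified stand-ins, and drain is a made-up helper, not go-mysql-server's actual engine code:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Simplified stand-ins for the engine's types.
type Context struct{}
type Row []any

type RowIter interface {
	Next(ctx *Context) (Row, error)
	Close(ctx *Context) error
}

// staticIter yields a fixed set of rows, then io.EOF.
type staticIter struct {
	rows []Row
	pos  int
}

func (it *staticIter) Next(_ *Context) (Row, error) {
	if it.pos >= len(it.rows) {
		return nil, io.EOF
	}
	r := it.rows[it.pos]
	it.pos++
	return r, nil
}

func (it *staticIter) Close(*Context) error { return nil }

// drain shows the pull-style contract: call Next until io.EOF.
func drain(ctx *Context, it RowIter) ([]Row, error) {
	defer it.Close(ctx)
	var out []Row
	for {
		row, err := it.Next(ctx)
		if errors.Is(err, io.EOF) {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, row)
	}
}

func main() {
	rows, _ := drain(&Context{}, &staticIter{rows: []Row{{1}, {2}}})
	fmt.Println(rows)
}
```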
The problem in front of me is a form of impedance mismatch between these two interfaces,
specifically relating to their periodicity. The btree
package's iteration methods have a natural period of every element in the range, whereas
go-mysql-server's interfaces have a natural period of a single element. How can I get them to
agree, so that I can scan a range during a call to Next()
but return only a single element, saving the rest for subsequent calls to Next()?
There are two basic techniques to achieve this that I'm aware of.
The first is to store the results of the AscendRange()
call in a slice, then return one element every time Next()
is called. This works fine, but it's wasteful: not only do you have to store the entire
result set up front for tables that can have very many elements, but simply allocating and
collecting those slices puts significant pressure on the garbage collector, dragging down
performance for the entire process.
The second technique is to fix the impedance mismatch with Go channels.
Go channels to the rescue
Like many of you, I read the Go tour when I was learning the language. When I got to the chapter on channels, I encountered this example, which seemed very silly to me.
package main
import (
"fmt"
)
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
fmt.Println(i)
}
}
Why bother with concurrency at all here? Sure, this is a toy example for a tutorial, but isn't it pretty wasteful to allocate a channel and a new goroutine when the loop could be run in place? Won't this give newcomers the wrong idea about what channels are for?
As it turns out, there is a perfectly good reason to pursue this pattern of iteration, and it's to solve the exact problem in front of me. A channel is an easy, efficient way to bridge two interfaces of differing periods.
Let's look at how this works.
First, we have a struct which we'll use to implement the sql.RowIter
interface. It will have some
accessor fields that give it access to a *btree.BTreeG
for its range scan, as well as some sql
engine bookkeeping.
// inMemIndexScanIter is a sql.RowIter that uses an in-memory btree index to satisfy index lookups
// on pg_catalog tables.
type inMemIndexScanIter[T any] struct {
lookup sql.IndexLookup
rangeConverter RangeConverter[T]
btreeAccess BTreeIndexAccess[T]
rowConverter rowConverter[T]
rangeIdx int
nextChan chan T
}
The magic happens in the nextItem()
method, where the btree data structure is queried. Note that
this method is parameterized so it can be reused for the in-memory storage of all the different
pg_catalog
tables we need it for. I've heavily annotated this logic with comments for readers
unfamiliar with our domain.
// nextItem returns the next item from the index lookup, or io.EOF if there are no more items.
// Needs to return a pointer to T so that we can return nil for EOF.
func (l *inMemIndexScanIter[T]) nextItem() (*T, error) {
if l.rangeIdx >= l.lookup.Ranges.Len() {
return nil, io.EOF
}
// If we have a channel established, a scan is in progress,
// so read from it if we can
if l.nextChan != nil {
next, ok := <-l.nextChan
// If we failed to read, then the channel was closed, so try again with the next range
if !ok {
l.nextChan = nil
l.rangeIdx++
return l.nextItem()
}
return &next, nil
}
// Start a new goroutine for the next index scan
l.nextChan = make(chan T)
rng := l.lookup.Ranges.ToRanges()[l.rangeIdx]
go func() {
// When iteration is done, close this channel to signal that this range
// is exhausted to the channel reader
defer func() {
close(l.nextChan)
}()
// Fetch the bounds for the range scan
gte, hasLowerBound, lt, hasUpperBound := l.rangeConverter.getIndexScanRange(rng, l.lookup.Index)
// Our iterator function just sends elements to the channel
// (and will block until the channel is read)
itr := func(item T) bool {
l.nextChan <- item
return true
}
// idx is a btree.BTreeG
idx := l.btreeAccess.getIndex(l.lookup.Index.(pgCatalogInMemIndex).name)
// Now just iterate the range requested with the iterator
// that sends on the channel
if hasLowerBound && hasUpperBound {
idx.AscendRange(gte, lt, itr)
} else if hasLowerBound {
idx.AscendGreaterOrEqual(gte, itr)
} else if hasUpperBound {
idx.AscendLessThan(lt, itr)
} else {
return
}
}()
// Now that we have a channel with elements waiting on it,
// try this call again to read from it
return l.nextItem()
}
Let's go over the channel part in more detail to see what it's doing. First let's look at the most important bit, the callback we pass to the B-tree scan.
// Our iterator function just sends elements to the channel
// (and will block until the channel is read)
itr := func(item T) bool {
l.nextChan <- item
return true
}
This is the function that will get called for every element in range in the B-tree we're scanning,
and all it does is send that element to the channel we created and stored in the iterator
struct. This is the part that bridges the time gap between the lifecycles of AscendRange()
and Next(). The index scan happens in the background, and queues up a single element to be read by
Next(). That happens here:
// If we have a channel established, a scan is in progress,
// so read from it if we can
if l.nextChan != nil {
next, ok := <-l.nextChan
// If we failed to read, then the channel was closed, so try again with the next range
if !ok {
l.nextChan = nil
l.rangeIdx++
return l.nextItem()
}
return &next, nil
}
Reading elements from the channel is obvious, but the behavior of next, ok
might not be: ok
will only be false when the channel has been closed, which signals to the reader that we should
advance to the next range and keep going.
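The closed-channel semantics are worth seeing in isolation. A receive from a closed channel always succeeds immediately: any buffered values are delivered first, and after that the zero value arrives with ok == false:

```go
package main

import "fmt"

func main() {
	c := make(chan int, 1)
	c <- 42
	close(c)

	v, ok := <-c // a buffered value is still delivered after close
	fmt.Println(v, ok) // 42 true

	v, ok = <-c // drained and closed: zero value, ok == false
	fmt.Println(v, ok) // 0 false
}
```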
It's also interesting to note that in a generic function like this one, it's only possible to
return a nil
generic value by declaring the function return type as *T
rather than T.
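A tiny illustration of that point (zeroOrNil is a made-up name for demonstration):

```go
package main

import "fmt"

// zeroOrNil returns nil when done; there is no way to write a nil of
// type T directly, so the return type must be *T.
func zeroOrNil[T any](done bool) *T {
	if done {
		return nil // legal only because the return type is *T
	}
	var v T // zero value of T
	return &v
}

func main() {
	fmt.Println(zeroOrNil[int](true))   // <nil>
	fmt.Println(*zeroOrNil[int](false)) // 0
}
```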
Conclusion
This was a surprising solution to me, one of the few times in years of Go development that I've used channels in this way, for a non-concurrent use case. The technique is generally useful, and because the performance of goroutines is quite good, it's viable even on the hot path of an application. One obvious extension would be to buffer the channel to get more parallel processing on the scan, but I haven't profiled that yet to see how much it matters in practice.
Questions about Go channels or Doltgres? Come by our Discord to talk to our engineering team and meet other Doltgres users.