Asynchronous Sorting in Go

When we began working on Dolt we made the decision to build on top of Noms. Noms stores data in a content-addressable DAG and has countless applications. It was a great starting point for us to build Dolt, and it let us hit the ground running. Once we had a working prototype of the product we began importing data.

Every dataset we imported taught us something new about the product. It helped us find issues with our code and dependencies, and it helped us understand the scale of data we could handle. At that time, an import with a few million rows could be painfully slow. When I dove into the profile, the majority of the time was being spent sorting. This is how I fixed that.

Background

Dolt stores tables in Noms as a map; the details were covered by Aaron Son in this blog article. The TLDR is that a B-Tree-like structure, called a Prolly-tree, backs the map that stores our table data. To edit that map efficiently, we need to apply edits in order.

The guys at Attic Labs, who wrote Noms, built a really cool way to get data into a map with their streaming map implementation. It takes a channel as an argument, which you stream keys and values to, and returns a channel from which you can read the resulting map. It's pretty fast, and there is something about the interface that feels like Rob Pike would love it. The gotchas are:

  1. You need to stream your data into the map in order.
  2. It only works for new maps. You cannot use it to edit a map that already has data in it.
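
Based on that description, using the streaming map looks roughly like the sketch below. The exact signature of types.NewStreamingMap has changed between Noms versions, so treat this as a sketch of the shape of the API rather than a copy-paste example; vrw and sortedKVPs stand in for a Noms ValueReadWriter and your already-sorted edits.

kvChan := make(chan types.Value)
mapChan := types.NewStreamingMap(vrw, kvChan) // vrw is a Noms ValueReadWriter

// keys and values are sent alternately on one channel, and keys must arrive in sorted order
for _, kvp := range sortedKVPs {
	kvChan <- kvp.Key
	kvChan <- kvp.Val
}
close(kvChan)

m := <-mapChan // the fully built types.Map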

If you can't satisfy those requirements, you'll need to use a MapEditor, which collects a list of changes to be applied to the map; only after all the edits are collected does it sort the changes and then apply them. Dolt may have been the first Noms use case that needed to read in and apply billions of map edits, so this was likely the first time this path was optimized.

The Approach

Before they can be applied, the edits are read from some source into a list. During this time Dolt is IO bound, and there are idle CPU cycles that could be used to decrease our sort time once all the data is available.

To take advantage of this up-front time, we need an algorithm where work done before all of the items are available reduces the sort time once they are. To accomplish this, we accumulate edits into batches; once a batch is full it is sent to a background worker routine which sorts it. Once all the edits have been added, we wait for our workers to finish sorting in the background, and then merge sort the batches of sorted edits. This has the additional benefit of being highly parallelizable: each batch sort can run in its own goroutine, and as we merge the batches, pairs of batches can be merged in separate goroutines as well.

[Sort visualization]

Implementation

Refactoring Before We Start

The first step in our implementation is to replace the slice that accumulates the edits with an interface. Implementations can then be swapped in and out, which makes it easy to compare the performance and results of the existing approach with the new one.

// EditAccumulator is an interface for a datastructure that can have edits added to it. Once all edits are
// added FinishedEditing can be called to get an EditProvider which provides the edits in sorted order
type EditAccumulator interface {
	// AddEdit adds an edit to the list of edits. Not guaranteed to be thread-safe, thus requires external synchronization.
	AddEdit(k Value, v Value)

	// FinishedEditing should be called when all edits have been added to get an EditProvider which provides the
	// edits in sorted order. Adding more edits after calling FinishedEditing is an error.
	FinishedEditing() (EditProvider, error)

	// Close ensures that the accumulator is closed. Repeat calls are allowed. Not guaranteed to be thread-safe, thus
	// requires external synchronization.
	Close()
}

// EditProvider is an interface which provides map edits as KVPs where each edit is a key and the new value
// associated with the key for inserts and updates. Deletes are modeled as a key with no value.
type EditProvider interface {
	// Next returns the next KVP representing the next edit to be applied.  Next will always return KVPs
	// in key sorted order
	Next() (*KVP, error)

	// NumEdits returns the number of KVPs representing the edits that will be provided when calling Next
	NumEdits() int64
}
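
To ground the comparison, here is roughly what the original, synchronous behavior looks like behind this interface: append everything to one slice and sort it in one shot when FinishedEditing is called. The type and helper names below (simpleEditAcc, sortEdits, newSliceEditProvider) are illustrative, not the actual Dolt code.

// simpleEditAcc is a sketch of the pre-existing behavior: a single slice that
// is only sorted once editing is finished.
type simpleEditAcc struct {
	edits KVPSlice
}

func (acc *simpleEditAcc) AddEdit(k Value, v Value) {
	acc.edits = append(acc.edits, KVP{Key: k, Val: v})
}

func (acc *simpleEditAcc) FinishedEditing() (EditProvider, error) {
	// sort everything at the end; sortEdits stably sorts the slice by key
	if err := sortEdits(KVPSort{Values: acc.edits}); err != nil {
		return nil, err
	}

	// newSliceEditProvider is a hypothetical EditProvider over a sorted slice
	return newSliceEditProvider(acc.edits), nil
}

func (acc *simpleEditAcc) Close() {}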

Background Sorting of Batches

Now that we know what we need to implement, let's take a look at our new AddEdit implementation. It's pretty straightforward: we add our edits to a slice, and once the slice is full, we write that slice to a channel and create a new slice to accumulate map edits into.

// AddEdit adds an edit to the current batch; when the batch is full it is handed off for background sorting
func (ase *AsyncSortedEdits) AddEdit(k Value, v Value) {
	ase.accumulating = append(ase.accumulating, types.KVP{Key: k, Val: v})

	if len(ase.accumulating) == ase.sliceSize {
		ase.asyncSortAcc()
	}
}

func (ase *AsyncSortedEdits) asyncSortAcc() {
	ase.sortChan <- ase.accumulating
	ase.accumulating = make([]types.KVP, 0, ase.sliceSize)
}
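
For context, here is a rough sketch of how the pieces that AddEdit relies on might fit together. The field names, the constructor, and how many background sorter goroutines get started are illustrative assumptions rather than the exact Dolt code.

type AsyncSortedEdits struct {
	sliceSize       int                  // number of edits per batch
	sortConcurrency int                  // max goroutines used when merging batches
	nbf             *types.NomsBinFormat // needed to compare and serialize values

	accumulating []types.KVP         // the current, unsorted batch
	sortChan     chan types.KVPSlice // full batches handed to the background sorters
	resultsChan  chan types.KVPSlice // sorted batches coming back from the sorters
	sortedColls  []*KVPCollection    // sorted batches, merged later by mergeSortBatches

	sortWG sync.WaitGroup // tracks the background sorter goroutines
}

func NewAsyncSortedEdits(nbf *types.NomsBinFormat, sliceSize, asyncConcurrency, sortConcurrency int) *AsyncSortedEdits {
	ase := &AsyncSortedEdits{
		sliceSize:       sliceSize,
		sortConcurrency: sortConcurrency,
		nbf:             nbf,
		accumulating:    make([]types.KVP, 0, sliceSize),
		sortChan:        make(chan types.KVPSlice, asyncConcurrency),
		resultsChan:     make(chan types.KVPSlice, asyncConcurrency),
	}

	// start the background batch sorters that read from sortChan
	ase.sortWG.Add(asyncConcurrency)
	for i := 0; i < asyncConcurrency; i++ {
		go func() {
			defer ase.sortWG.Done()
			_ = sorter(ase.sortChan, ase.resultsChan) // error handling elided here
		}()
	}

	return ase
}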

Our sortChan is read by a goroutine running the sorter function, which sorts each incoming batch and writes the result to resultsChan:

func sorter(in, out chan types.KVPSlice) error {
	for kvps := range in {
		// internally this uses sort.Stable
		err := sortEdits(types.KVPSort{Values: kvps})

		if err != nil {
			return err
		}

		out <- kvps
	}

	return nil
}
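
For reference, sortEdits has roughly the shape below. The real key comparison in Noms is more involved (it can fail, which is why sortEdits returns an error), so assume only that types.KVPSort adapts the underlying KVPSlice to sort.Interface; this is a simplified sketch, not the actual Dolt code.

// sortEdits stably sorts a batch of KVPs by key. Simplified: the real
// comparison can return an error, which this version never does.
func sortEdits(kvps types.KVPSort) error {
	sort.Stable(kvps)
	return nil
}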

We accumulate edits until FinishedEditing is called, at which point we need to return an EditProvider that can iterate over all the edits in sorted order. We close our background batch-sorting channel, sortChan, sort any edits remaining in the accumulating slice, and then wait for all background sorting to finish before calling mergeSortBatches.

// FinishedEditing should be called once all edits have been added. Once FinishedEditing is called adding more edits
// will have undefined behavior.
func (ase *AsyncSortedEdits) FinishedEditing() (types.EditProvider, error) {
	close(ase.sortChan)

	if len(ase.accumulating) > 0 {
		err := sortEdits(types.KVPSort{Values: ase.accumulating})

		if err != nil {
			return nil, err
		}

		coll := NewKVPCollection(ase.nbf, ase.accumulating)
		ase.sortedColls = append(ase.sortedColls, coll)
	}

	// wait for background sorting of batches to complete
	ase.wait()
	ase.mergeSortBatches()

	return ase.Iterator(), nil
}
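
The wait call is where the sorted batches get gathered up. Here is a rough sketch, reusing the sortWG and resultsChan names assumed earlier; the names and exact mechanics are illustrative, not the actual Dolt code.

// wait blocks until every background sorter has exited, draining resultsChan
// as sorted batches arrive so the sorters never block on a full channel.
func (ase *AsyncSortedEdits) wait() {
	go func() {
		ase.sortWG.Wait()      // all sorters have returned; sortChan was closed by FinishedEditing
		close(ase.resultsChan) // no more sorted batches will be produced
	}()

	for kvps := range ase.resultsChan {
		ase.sortedColls = append(ase.sortedColls, NewKVPCollection(ase.nbf, kvps))
	}
}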

Merging Sorted Batches

We have a list of sorted batches that we need to merge, and we need a way of grouping them into pairs. As you merge N batches, pairwise, down to a single collection of sorted edits, it is possible that your batches vary wildly in size. Say you have 9 batches of edits. Here are two ways they could be merged:

10   10 10   10 10   10 10   10 3        3    10 10   10 10   10 10   10 10
 \  /    \  /    \  /    \  /   |          \  /    \  /    \  /    \  /   |
  20      20      20      20    |           13      20      20      20    10
  20      20      20      20    |           10      13      20      20    20
    \    /          \    /      |            \      /        \      /     |
     \  /            \  /       |             \    /          \    /      |
      40              40        |               23              40        20
      40              40        |               20              23        40
        \             /         |                 \            /          |
         ------+------          |                  ------+-----           |
               80               3                        43               40
                 \             /                           \             /
                  ------+------                             ------+------
                        83                                        83

The first approach uses a naive merge order. The second sorts by size between levels, which results in more evenly sized batches being grouped as you merge down each level. I don't believe either strategy has a measurable performance advantage, but we opted for the more balanced approach.

// pairCollections pairs up the collections so that, as you perform successive merges, you end up with collections of edits that are similarly sized
func pairCollections(colls []*KVPCollection) [][2]*KVPCollection {
	numColls := len(colls)
	pairs := make([][2]*KVPCollection, 0, numColls/2+1)
	sort.Slice(colls, func(i, j int) bool {
		return colls[i].Size() < colls[j].Size()
	})

	if numColls%2 == 1 {
		pairs = append(pairs, [2]*KVPCollection{colls[numColls-1], nil})

		colls = colls[:numColls-1]
		numColls -= 1
	}

	for i, j := 0, numColls-1; i < numColls/2; i, j = i+1, j-1 {
		pairs = append(pairs, [2]*KVPCollection{colls[i], colls[j]})
	}

	return pairs
}
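
To see the strategy in action, here is a small stand-alone program that applies the same pairing logic to the batch sizes from the diagram above (eight batches of 10 and one of 3), using plain ints in place of *KVPCollection:

package main

import (
	"fmt"
	"sort"
)

// pairSizes mirrors pairCollections, but over batch sizes instead of collections.
// A 0 in a pair stands in for the nil "pass-through" partner.
func pairSizes(sizes []int) [][2]int {
	n := len(sizes)
	pairs := make([][2]int, 0, n/2+1)
	sort.Ints(sizes)

	if n%2 == 1 {
		pairs = append(pairs, [2]int{sizes[n-1], 0})
		sizes = sizes[:n-1]
		n--
	}

	for i, j := 0, n-1; i < n/2; i, j = i+1, j-1 {
		pairs = append(pairs, [2]int{sizes[i], sizes[j]})
	}

	return pairs
}

func main() {
	fmt.Println(pairSizes([]int{10, 10, 10, 10, 10, 10, 10, 10, 3}))
	// Output: [[10 0] [3 10] [10 10] [10 10] [10 10]]
	// The lone size-10 batch passes through, the size-3 batch merges with a
	// size-10 batch, and the rest pair evenly, matching the second diagram.
}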

Now that we have a strategy for pairing our batches, we can get back to merging them into a single sorted collection. We spawn worker goroutines to handle the merge operations, up to some configurable limit. Then we write our batch pairs to a channel to be merged, and close the channel once all the pairs have been written. Finally we wait for the results before doing it all over again for the next level of our merge tree.

// mergeSortBatches performs a concurrent merge sort. Once this completes, use the Iterator method to get a KVPIterator
// which can be used to iterate over all the KVPs in order.
func (ase *AsyncSortedEdits) mergeSortBatches() {
	for len(ase.sortedColls) > 1 {
		pairs := pairCollections(ase.sortedColls)

		numPairs := len(pairs)
		numGoRs := ase.sortConcurrency
		if numGoRs > numPairs {
			numGoRs = numPairs
		}
		
		wg := &sync.WaitGroup{}
		wg.Add(numGoRs)

		sortChan := make(chan [2]*KVPCollection, numPairs)
		resChan := make(chan *KVPCollection, numPairs)
		for i := 0; i < numGoRs; i++ {
			go func() {
				defer wg.Done()
				merger(sortChan, resChan)
			}()
		}

		for _, pair := range pairs {
			sortChan <- pair
		}

		close(sortChan)
		wg.Wait()
		close(resChan)

		ase.sortedColls = nil
		for coll := range resChan {
			ase.sortedColls = append(ase.sortedColls, coll)
		}
	}
}
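
The merger worker reads pairs off the channel and merges each pair into a single sorted collection. Here is a rough sketch, assuming a mergeCollections helper that merge-sorts two *KVPCollections; the actual merge logic lives in the full source linked below.

// merger merges pairs of sorted collections until the input channel is closed.
func merger(in chan [2]*KVPCollection, out chan *KVPCollection) {
	for pair := range in {
		if pair[1] == nil {
			// the odd collection that was paired with nil passes straight through
			out <- pair[0]
			continue
		}

		// mergeCollections is a stand-in for the real merge of two sorted collections
		out <- mergeCollections(pair[0], pair[1])
	}
}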

The code presented here omits some error handling for clarity and does not go into the details of the merge sort implementation, but the full source code can be found here.

Visualizing the Difference

[Video: sort visualization]

This video provides a visual representation of the old and new sorting strategies. At the start of the video you can see data being added, and after half a second we begin asynchronously sorting our first batch. The process of adding edits while sorting batches in the background continues for about 5 seconds, until all the data is available. At that point the old model begins sorting its data, and at 6 seconds all of our batches are sorted and we begin merging. The new model completes in about 8 seconds, while the old one finishes after 15.

Performance Impact

While the visualization helps us see what's going on, it may not represent real-world performance. On small sets of changes (small being less than one batch), our new methodology is identical to the old. For change sets where the number of edits is just slightly more than one batch, the additional parallelism has a small negative performance impact. Beyond that, the performance impact is very positive. Benchmarking 10 million edits gave the following results on average:

  • Async Batch Sorting with Merge
    • Time to add the items: 7.52
    • Time to sort once all items are added: 3.16
  • sort.Stable once all items are added:
    • Time to add the items: 8.97
    • Time to sort once all items are added: 32.96

Roughly a 10x improvement in sort time means that Dolt users are able to get data into a table more quickly. Another interesting improvement was in the amount of time needed to add the items to the collection of edits. The new method pre-allocates fixed-size blocks that it writes to a channel for sorting before creating another fixed-size block. The old method would simply append to a slice, and the more you append, the more time you spend growing that slice, which involves additional allocations, copying, and garbage collection.
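
As a stand-alone illustration (not the Dolt code), the program below shows the difference between growing one big slice with append and filling fixed-size batches: the single slice re-allocates and copies its contents every time it runs out of capacity, while each batch is allocated exactly once.

package main

import "fmt"

func main() {
	const n, batchSize = 1000000, 16384

	// one growing slice: count how many times append had to allocate a bigger backing array
	var grown []int
	reallocs := 0
	for i := 0; i < n; i++ {
		if len(grown) == cap(grown) {
			reallocs++
		}
		grown = append(grown, i)
	}

	// fixed-size batches: one allocation per batch, never any copying to grow
	batches := 1
	batch := make([]int, 0, batchSize)
	for i := 0; i < n; i++ {
		batch = append(batch, i)
		if len(batch) == batchSize {
			batches++
			batch = make([]int, 0, batchSize)
		}
	}

	fmt.Printf("single growing slice: %d allocations, each growth copying the existing elements\n", reallocs)
	fmt.Printf("fixed-size batches: %d allocations, no copies to grow\n", batches)
}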

Summary

As Dolt matures into an extremely useful product, its performance will become a focus of development. We are committed to supporting larger datasets and more use cases. Stay tuned for future blog articles detailing some of this work.
