Planning joins to make use of indexes


Introduction

Dolt is Git for Data. It's a SQL database that you can clone, fork, branch, and merge. Dolt's SQL engine is go-mysql-server, and today we're going to discuss how it implements join planning to make a query plan involving multiple tables as efficient as possible.

What's join planning?

When a query involves more than one table, there are many different ways to access those tables to get a correct result. But some ways are much faster than others! Choosing an order to access tables in and a strategy to assemble result rows is known as join planning. This is easiest to explain with an example.

Let's create three tables to track the populations of cities and states, and the people who live in them. If you have Dolt installed (see instructions here), you can follow along.

% mkdir join-planning && cd join-planning
% dolt init
Successfully initialized dolt data repository.
% dolt sql
# Welcome to the DoltSQL shell.
# Statements must be terminated with ';'.
# "exit" or "quit" (or Ctrl-D) to exit.
join_planning> create table states (name varchar(100) primary key not null, population int unsigned);
join_planning> create table cities (name varchar(100) primary key not null, state varchar(100) not null, population int unsigned);
join_planning> create table people (name varchar(100) primary key not null, city varchar(100) not null);
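
If you'd like a few rows to query, some made-up values are enough to follow along (the populations here are illustrative, not real census figures):

join_planning> insert into states values ('Washington', 7700000);
join_planning> insert into cities values ('Seattle', 'Washington', 740000);
join_planning> insert into people values ('John Smith', 'Seattle');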

Let's say that we want a list of people named "John Smith" along with names and populations of the cities and states they live in. We would write a query like this:

select * from people p
    join cities c on p.city = c.name
    join states s on s.name = c.state
    where p.name = "John Smith";

There are lots of ways a query planner could execute this query. A really bad way would be to look at every combination of rows from all three tables and test each combination to see if it matches the join conditions and WHERE clause. This is correct and valid, but very expensive. If we say that the states, cities and people tables contain S, C and P rows respectively, this query plan (called a cross join) will result in S * C * P row accesses and comparisons. It's a bad idea.

There are simple tricks you can use to speed up query execution. Using pushdown optimization, the filter p.name = "John Smith" gets applied while scanning the people table, eliminating most of its rows from the join. Let's say that the number of "John Smiths" in the database is called J, and it's much smaller than P. Then pushdown reduces the cost of our access to S * C * J.

Until a few weeks ago, this was as good as Dolt could do on joins of three or more tables. For two tables, we would use an index if available. But for three, no luck. It made the product borderline unusable for a workload with this query pattern and a non-trivial data size.

This blog post is about how we optimized the join planner to generate more intelligent, efficient query plans for any number of tables. In today's version of Dolt, that same query will generate the following query plan:

join_planning> explain select * from people p
    join cities c on p.city = c.name
    join states s on s.name = c.state
    where p.name = "John Smith";
+-------------------------------------------------------------+
| plan                                                        |
+-------------------------------------------------------------+
| IndexedJoin(p.city = c.name)                                |
|  ├─ Filter(p.name = "John Smith")                           |
|  │   └─ Projected table access on [name city]               |
|  │       └─ TableAlias(p)                                   |
|  │           └─ Indexed table access on index [people.name] |
|  │               └─ Exchange(parallelism=16)                |
|  │                   └─ Table(people)                       |
|  └─ IndexedJoin(s.name = c.state)                           |
|      ├─ TableAlias(c)                                       |
|      │   └─ IndexedTableAccess(cities on [cities.name])     |
|      └─ TableAlias(s)                                       |
|          └─ IndexedTableAccess(states on [states.name])     |
+-------------------------------------------------------------+

The plan starts with an indexed access on the name column of people to find all the John Smiths. Then for each of those rows, it uses the primary key index on cities to look up the city. Then for each city, it uses the primary key index on states to look up the state. In all, this leads to a total query cost of J * 3.

Is that... a lot?

Using some real numbers to drive this home: let's use the US and say that there are 330,000,000 people rows, 20,000 cities rows, and 52 states rows (we didn't forget you, DC and Puerto Rico). A cross join query plan would access a number of rows equal to the product of these numbers, which is roughly 343 trillion accesses. It's a big number. Your query isn't going to complete.

There are about 48,000 people named John Smith in the US. So using pushdown optimization gets us down to about 50 billion row accesses. This is a lot better than before, but still terrible. The query isn't returning.

Using both pushdown to the people table and indexed accesses to cities and states, on the other hand, limits the query execution to only 48,000 accesses to the people table, then 1 access to each of the cities and states tables for each of these rows. That's 3 * 48,000, or 144,000 table accesses total.

Join plan                      Number of rows accessed
---------------------------    -----------------------
Cross join                     343 * 10^12
Cross join with pushdown       50 * 10^9
Pushdown and indexed access    144 * 10^3

Unlike in the pushdown blog, I won't bother to spell out the percentage savings. We're looking at 4 decimal orders of magnitude improvement for the first optimization, then another 5 for the second. It's the difference between Dolt being a usable query engine and a bad space heater.

How does it work?

To assemble an efficient query plan, you have to start by answering one really important question:

What order should we access the tables in?

This really makes all the difference. In the example above, a table access order of people > cities > states lets us use the primary key index on the latter two tables. If we instead chose the order states > cities > people, we couldn't use the information from earlier tables to reduce the number of lookups into later tables, leaving us with a cross join.

There are a lot of interesting details to get wrong, but you can get the table order right with some pretty simple heuristics:

  1. What index could I use to access this table? Are those columns part of a join condition?
  2. Are required columns available to use as a key? Did the other tables in the join condition precede this one?
  3. How many rows are in this table if I need to do a full table scan?
  4. Is this a LEFT or RIGHT join, and if so, is this table on the side of the join that requires it to come first?

We'll come back to the actual implementation of the table-ordering algorithm later. For now, let's assume it exists and tells us which order to access the tables in. How do we build a join plan with that access order?

Join plans are not isomorphic

In go-mysql-server, query plans are organized in a tree of Node objects. As of now, all nodes have at most two children, making the query plan a binary-ish tree. A Join node knows how to get a row from its left child, then iterate over its right child looking for matches on the join condition. When the right child iterator is out of rows, it gets the next row from its left child. Eventually it runs out of rows in the left child and returns io.EOF from its iterator.
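
To make this pattern concrete, here's a minimal sketch of a nested-loop join iterator. The Row and RowIter types are simplified stand-ins for the engine's sql.Row and sql.RowIter, not its actual implementation:

import "io"

// Row and RowIter are simplified stand-ins for the engine's row types.
type Row []interface{}

type RowIter interface {
    Next() (Row, error) // returns io.EOF when exhausted
}

// nestedLoopJoinIter implements the behavior described above: hold a row
// from the left child while scanning the right child for matches, then
// advance the left child when the right child runs out of rows.
type nestedLoopJoinIter struct {
    left     RowIter
    newRight func() RowIter      // re-opens the right child for each left row
    right    RowIter
    leftRow  Row
    cond     func(l, r Row) bool // the join condition, e.g. a = b
}

func (it *nestedLoopJoinIter) Next() (Row, error) {
    for {
        if it.leftRow == nil {
            l, err := it.left.Next()
            if err != nil {
                return nil, err // io.EOF once the left child is exhausted
            }
            it.leftRow, it.right = l, it.newRight()
        }
        r, err := it.right.Next()
        if err == io.EOF {
            it.leftRow = nil // right child exhausted; advance the left child
            continue
        }
        if err != nil {
            return nil, err
        }
        if it.cond(it.leftRow, r) {
            // found a match: return the left and right rows concatenated
            return append(append(Row{}, it.leftRow...), r...), nil
        }
    }
}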

Like everything else, this is easiest to visualize with some examples. For all of these, we'll use one-letter table names with single columns that match the table name. Here's a simple join between two tables A and B:

select * from A join B on a = b;

A naive query plan looks like this:

[diagram: two table join]

As we add additional tables to the join, each new join node becomes the root of the tree, with the original subtree as its left child and the new table as its right child.

select * from A join B on a = b join C on b = c;

[diagram: three table join]

select * from A join B on a = b join C on b = c join D on c = d;

[diagram: four table join]

Let's examine this last example more closely. What happens when we open an iterator on the root Node of the query? It opens an iterator on its left child, which in turn opens an iterator on its left child, and so on. Each node, after accessing a row from its left child, attempts to find a matching row from its right child. We end up with the same table access order as in the lexical query: A > B > C > D.

Let's trace through the execution of a single row in the result set.

  1. The join node a = b gets a row from A. Then it iterates through the rows of B looking for rows that match the join condition a = b. When it finds such a row, it returns it.
  2. The node b = c takes the row from its left child, which is a concatenation of rows from tables A and B. It then iterates over its right child, the rows of C, looking for rows that match the join condition b = c. When it finds such a row, it returns it.
  3. The node c = d takes the row from its left child, which is a concatenation of rows from A and B and C, in that order. It then attempts to match rows from its right child, D, just as above.

Importantly, there are sometimes many possible binary trees that can implement the above logic to yield a correct result for any given table access order. The tree construction algorithm above, where we keep shoving a sub-tree down to the left child of a new join node, is just what the parser gives us by default because it's left associative. But we can draw other trees that give the same results. For example, here's a balanced join tree:

[diagram: balanced four table join]

Like the original, this produces a table access order of A > B > C > D. If we wanted to access the tables in the opposite order, we could simply flip the left and right children of every node in the original tree like so:

[diagram: reversed four table join]

Again, there are sometimes many possible join trees for a given table ordering. But they all have one thing in common: their join conditions refer to tables that can be found in their left and right children. Otherwise, the node cannot evaluate its join condition. For example, let's say that we are querying three tables and want to access them in the order B > A > C. This is an invalid join plan with that table ordering:

[diagram: invalid three table join]

This plan is invalid because the node b = c doesn't have table B as a descendant, so it cannot evaluate its join condition. We can't get around this issue by just swapping the join conditions, either:

[diagram: invalid three table join with swapped join conditions]

The lower join condition still isn't satisfied, this time because it needs table B and doesn't have it as a descendant. In order to get the order B > A > C and still satisfy the join conditions, we need to produce this tree instead:

[diagram: three table join with table order B > A > C]

Join search

We can use the above insights about constructing a join tree to come up with a general algorithm for doing so. It's a constraint solving / search problem, and in the source code I call it join search.

  1. Choose a join condition from the set of available join conditions to begin with.
  2. Construct all possible left subtrees for this join condition recursively, using the remaining tables and join conditions.
  3. Check each potential left subtree for validity: the tables must be in order, and the join conditions must be satisfied. If a subtree is not valid, discard it and examine the next one.
  4. Construct all possible right subtrees for this join condition recursively, using the remaining tables and join conditions (making sure to remove from consideration all tables and join conditions used by the left subtree). Again discard subtrees that aren't valid.
  5. If you can't find a valid left or right subtree, give up and go back to step one, choosing a new join condition to begin with.

Like most things in this tech blog series, the idea is relatively simple but there are many small details to screw up. Because the Node objects are pretty cumbersome to work with, I created a simplified type to represent the join tree during search:

// A joinSearchNode is a simplified type representing a join tree node, which is either an internal node (a join) or a
// leaf node (a table). The top level node in a join tree is always an internal node. Every internal node has both a
// left and a right child.
type joinSearchNode struct {
    table    string            // empty if this is an internal node
    joinCond *joinCond         // nil if this is a leaf node
    parent   *joinSearchNode   // nil if this is the root node
    left     *joinSearchNode   // nil if this is a leaf node
    right    *joinSearchNode   // nil if this is a leaf node
    params   *joinSearchParams // search params that assembled this node
}

Besides the normal tree structure, this type also tracks a parent pointer (to make it easier to walk up the tree when validating table ordering rules) and search params to track which tables and join conditions are available for use.

Here's a portion of the searchJoins function that implements this algorithm. The full code is linked above.

func searchJoins(parent *joinSearchNode, params *joinSearchParams) []*joinSearchNode {
    // Our goal is to construct all possible child nodes for the parent given. Every permutation of a legal subtree should
    // go into this list.
    children := make([]*joinSearchNode, 0)

    // <snipped code appending tables to the list of children>

    for i, cond := range params.joinConds {
        if params.joinCondIndexUsed(i) {
            continue
        }

        paramsCopy := params.copy()
        paramsCopy.usedJoinCondsIndexes = append(paramsCopy.usedJoinCondsIndexes, i)

        candidate := &joinSearchNode{
            joinCond: cond,
            parent:   parent,
            params:   paramsCopy,
        }

        // For each of the left and right branch, find all possible children, add all valid subtrees to the list
        candidate = candidate.targetLeft()
        leftChildren := searchJoins(candidate, paramsCopy)

        // pay attention to variable shadowing in this block
        for _, left := range leftChildren {
            if !isValidJoinSubTree(left) {
                continue
            }
            candidate := candidate.withChild(left).targetRight()
            candidate.params = candidate.accumulateAllUsed()
            rightChildren := searchJoins(candidate, paramsCopy)
            for _, right := range rightChildren {
                if !isValidJoinSubTree(right) {
                    continue
                }
                candidate := candidate.withChild(right)
                if isValidJoinSubTree(candidate) {
                    children = append(children, candidate)
                }
            }
        }
    }

    return children
}
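
The isValidJoinSubTree helper isn't shown above. Conceptually it enforces the invariant from the last section: every join condition in a subtree may only reference tables that appear beneath it (the real helper also validates the table ordering). Here's a rough sketch of that check, assuming a hypothetical condTables helper that returns the table names a join condition references:

// isValidSubtree sketches the validity check: a join condition may only
// reference tables found in its own subtree. condTables is a hypothetical
// helper; the engine's real check also validates table ordering.
func isValidSubtree(n *joinSearchNode) bool {
    if n == nil {
        return false // incomplete subtree
    }
    if n.table != "" {
        return true // leaf nodes are always valid
    }
    below := tablesBelow(n)
    for _, t := range condTables(n.joinCond) {
        if !below[t] {
            return false // condition references a table outside this subtree
        }
    }
    return isValidSubtree(n.left) && isValidSubtree(n.right)
}

// tablesBelow collects the names of all tables in a subtree.
func tablesBelow(n *joinSearchNode) map[string]bool {
    set := map[string]bool{}
    if n == nil {
        return set
    }
    if n.table != "" {
        set[n.table] = true
    }
    for t := range tablesBelow(n.left) {
        set[t] = true
    }
    for t := range tablesBelow(n.right) {
        set[t] = true
    }
    return set
}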

Plumbing key information through

Now that we have a table order and a join plan that implements that order, we need a way to get key information to later tables in the plan. The whole reason we ordered tables was to make it possible to use indexed lookups on every table but the first. For each of these tables, we want to pass in the concatenation of the rows from all earlier tables in the join, so that we can use this information to construct a lookup key for an index.

Consider this four-table join plan. For each edge in the tree, we'll label the edge with the tables whose rows the child node needs access to in order to build lookup keys from tables earlier in the join.

[diagram: four table join with labeled edges]

Each join node will supply its left-hand child with the row it received (empty in the case of the root node). Then it will supply its right-hand child the concatenation of this row and the row from the left-hand child. This amounts to an in-order walk of the tree, which will visit the tables in the order we specified in the beginning. First, the root node traverses left with an empty row. The same thing happens in the first child node (a = b). When this child node traverses to the right, it supplies the row from its left-hand child, A. Then the root node traverses right, and supplies the row from its left child, which corresponds to AB. The same thing happens in the final node (c = d), where the final right-hand child gets a row formed by the concatenation of its parent row (AB) and its left-hand child (C).
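
Put another way: by the time a table's iterator opens, the concatenated rows of every table to its left are available, and evaluating the index's key expressions against that accumulated row yields the lookup key. Here's a minimal sketch of that idea, reusing the simplified Row type from the nested-loop sketch above and reducing key expressions to plain functions (in the engine they are sql.Expression values):

// lookupKey builds an index key by evaluating each key expression against
// the rows accumulated from tables earlier in the join.
func lookupKey(keyExprs []func(Row) interface{}, rowSoFar Row) []interface{} {
    key := make([]interface{}, len(keyExprs))
    for i, expr := range keyExprs {
        key[i] = expr(rowSoFar)
    }
    return key
}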

Unlike most of the rest of the engine's analysis, we can't do this transformation of the tree bottom up. It's fundamentally an in-order walk, so we have to do it top down with a custom function. Here's the interesting portion of it (with some small edits, like removing error handling boilerplate and simplifying the return type).

func replaceTableAccessWithIndexedAccess(
    node sql.Node,
    schema sql.Schema,
    scope *Scope,
    joinIndexes joinIndexesByTable,
    exprAliases ExprAliases,
    tableAliases TableAliases,
) sql.Node {

    switch node := node.(type) {
    case *plan.TableAlias, *plan.ResolvedTable:
        // If the available schema makes an index on this table possible, use it, replacing the table with indexed access
        indexes := joinIndexes[node.(sql.Nameable).Name()]
        indexToApply := indexes.getUsableIndex(schema)
        if indexToApply == nil {
            return node
        }

        newNode, _ := plan.TransformUp(node, func(node sql.Node) (sql.Node, error) {
            switch node := node.(type) {
            case *plan.ResolvedTable:
                if _, ok := node.Table.(sql.IndexAddressableTable); !ok {
                    return node, nil
                }

                keyExprs := createIndexLookupKeyExpression(indexToApply, exprAliases, tableAliases)
                keyExprs, _ = FixFieldIndexesOnExpressions(scope, schema, keyExprs...)

                return plan.NewIndexedTableAccess(node, indexToApply.index, keyExprs), nil
            default:
                return node, nil
            }
        })

        return newNode
    case *plan.IndexedJoin:
        // Recurse down the left side with the input schema
        left := replaceTableAccessWithIndexedAccess(node.Left(), schema, scope, joinIndexes, exprAliases, tableAliases)

        // then the right side, appending the schema from the left
        right := replaceTableAccessWithIndexedAccess(node.Right(), append(schema, left.Schema()...), scope, joinIndexes, exprAliases, tableAliases)

        // the condition's field indexes might need adjusting if the order of tables changed
        cond, _ := FixFieldIndexes(scope, append(schema, append(left.Schema(), right.Schema()...)...), node.Cond)
        return plan.NewIndexedJoin(left, right, node.JoinType(), cond)
    default:
        // Other node types: recurse into the child with the same schema
        // (the real code handles each unary node type; simplified here)
        child := replaceTableAccessWithIndexedAccess(node.Child, schema, scope, joinIndexes, exprAliases, tableAliases)
        newNode, _ := node.WithChildren(child)
        return newNode
    }
}

Back to the beginning: choosing a table order

Now we're back to where we started: how do we decide which order tables should be accessed in? This part is relatively easy now that we've put all the other pieces together. To determine an optimal ordering, all we need is a set of the join conditions with index information attached. Then the function is pretty simple to write:

// orderTables returns an access order for the tables provided, attempting to minimize total query cost
func orderTables(tables []NameableNode, tablesByName map[string]NameableNode, joinIndexes joinIndexesByTable) []string {
    tableNames := make([]string, len(tables))
    indexes := make([]int, len(tables))
    for i, table := range tables {
        tableNames[i] = strings.ToLower(table.Name())
        indexes[i] = i
    }

    // generate all permutations of table order
    accessOrders := permutations(indexes)
    lowestCost := int64(math.MaxInt64)
    lowestCostIdx := 0
    for i, accessOrder := range accessOrders {
        cost := estimateTableOrderCost(tableNames, tablesByName, accessOrder, joinIndexes, lowestCost)
        if cost < lowestCost {
            lowestCost = cost
            lowestCostIdx = i
        }
    }

    cheapestOrder := make([]string, len(tableNames))
    for i, j := range accessOrders[lowestCostIdx] {
        cheapestOrder[i] = tableNames[j]
    }

    return cheapestOrder
}
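
The permutations helper isn't shown in this excerpt. It generates every possible ordering of the table indexes, which is O(n!) in the number of tables joined; that's fine for the handful of tables in a typical query. A minimal recursive version might look like this, though the engine's actual helper may differ:

// permutations returns every ordering of the input slice (a sketch; the
// engine's actual helper may be implemented differently).
func permutations(indexes []int) [][]int {
    if len(indexes) <= 1 {
        return [][]int{append([]int{}, indexes...)}
    }
    var result [][]int
    for i := range indexes {
        // rest is the input with the i'th element removed
        rest := make([]int, 0, len(indexes)-1)
        rest = append(rest, indexes[:i]...)
        rest = append(rest, indexes[i+1:]...)
        for _, perm := range permutations(rest) {
            result = append(result, append([]int{indexes[i]}, perm...))
        }
    }
    return result
}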

But this is burying the lede a bit. Estimating the cost of a table ordering is the juicy part.

// Estimates the cost of the table ordering given. Lower numbers are better. Bails out and returns cost so far if cost
// exceeds lowest found so far. We could do this better if we had table and key statistics.
func estimateTableOrderCost(
	tables []string,
	tableNodes map[string]NameableNode,
	accessOrder []int,
	joinIndexes joinIndexesByTable,
	lowestCost int64,
) int64 {
	cost := int64(1)
	var availableSchemaForKeys sql.Schema
	for i, idx := range accessOrder {
		if cost >= lowestCost {
			return cost
		}

		table := tables[idx]
		availableSchemaForKeys = append(availableSchemaForKeys, tableNodes[table].Schema()...)
		indexes := joinIndexes[table]

		// If this table is part of a left or a right join, assert that tables are in the correct order. No table
		// referenced in the join condition can precede this one in that case.
		for _, idx := range indexes {
			if (idx.joinType == plan.JoinTypeLeft && idx.joinPosition == plan.JoinTypeLeft) ||
				(idx.joinType == plan.JoinTypeRight && idx.joinPosition == plan.JoinTypeRight) {
				for j := 0; j < i; j++ {
					otherTable := tables[accessOrder[j]]
					if colsIncludeTable(idx.comparandCols, otherTable) {
						return math.MaxInt64
					}
				}
			}
		}

		if i == 0 || indexes.getUsableIndex(availableSchemaForKeys) == nil {
			// TODO: estimate number of rows in table
			cost *= 1000
		} else {
			// TODO: estimate number of rows from index lookup based on cardinality
			cost += 1
		}
	}

	return cost
}

The engine doesn't have table statistics yet, so for the time being we treat any full table scan as a multiplicative factor of 1000 and any indexed lookup as a constant additive cost. For the three-table query above, this makes the order people > cities > states cost 1000 + 1 + 1 = 1002 (one scan, two indexed lookups), while states > cities > people, with no usable indexes, costs 1000 * 1000 * 1000. And we rule out join plans that access the primary tables in LEFT or RIGHT joins incorrectly by making them maximally expensive.

Future work

In our discussion so far, we concentrated on table ordering and the use of index lookups. This is an important aspect of join planning, but not the only one. We also need to consider join strategies other than simple nested loop joins, such as the hash joins that SQL Server makes such extensive use of. For certain table sizes, it's much faster to load a table's result set into memory and hash join it to the rest of the query than to do N indexed lookups into it. But this is work for another day.
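
To preview the idea anyway, here's a minimal sketch of a hash join over in-memory slices, reusing the simplified Row type from earlier. This illustrates the strategy, not the engine's actual interfaces:

// hashJoin builds a hash table over the smaller side once, then probes it
// with each row of the larger side, replacing N index lookups with N hash
// lookups. smallKey and largeKey extract the join key from each side's rows.
func hashJoin(small, large []Row, smallKey, largeKey func(Row) interface{}) []Row {
    // build phase: index the small side by its join key
    byKey := make(map[interface{}][]Row)
    for _, s := range small {
        k := smallKey(s)
        byKey[k] = append(byKey[k], s)
    }

    // probe phase: one constant-time lookup per large-side row
    var out []Row
    for _, l := range large {
        for _, s := range byKey[largeKey(l)] {
            out = append(out, append(append(Row{}, l...), s...))
        }
    }
    return out
}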

Conclusion

It's been almost a year since we announced support for indexed joins for two tables. We've come a long way since then, rewriting large parts of the engine and adding tons of features. This latest improvement, indexed joins for any number of tables, was the most difficult addition I've made to the engine yet. It took a lot of careful thought and analysis, tons of experimentation and false starts, and a crazy number of tests.

[photo: notes on joining]

But it's also the most satisfying addition I've made. It takes the query engine from unusably slow to actually functional for a lot of customers. It makes the product seem like a real database, not just a cool toy. And it actually makes use of the hard parts of my computer science education, unlike nearly all my professional software experience. That's a good feeling.

Dolt is a great tool to learn how to use SQL, now with non-terrible performance on queries with three or more tables. Install Dolt today to try it out. Or if you aren't ready to download the tool or just want to ask questions, come chat with us on Discord. We're always happy to hear from new customers!
