Implementing window functions in go-mysql-server

Dolt is Git for Data, the first SQL database you can clone, fork, branch and merge. Its SQL engine is go-mysql-server. Our goal is to be a 100% compatible, drop-in replacement for MySQL, but we have a ways to go. Today we're excited to announce the newest piece of missing functionality: window functions!

What's a window function?

The simplest way to think of a window function is that it's like a group-by aggregate function, but it gets a value for every row in the result set, not just one value per grouping key. Like most things, this is easiest to explain with an example, which comes straight from the MySQL docs.

SELECT year, country, product, profit,
    SUM(profit) OVER() AS total_profit,
    SUM(profit) OVER(PARTITION BY country) AS country_profit
    FROM sales
    ORDER BY country, year, product, profit;
+------+---------+------------+--------+--------------+----------------+
| year | country | product    | profit | total_profit | country_profit |
+------+---------+------------+--------+--------------+----------------+
| 2000 | Finland | Computer   |   1500 |         7535 |           1610 |
| 2000 | Finland | Phone      |    100 |         7535 |           1610 |
| 2001 | Finland | Phone      |     10 |         7535 |           1610 |
| 2000 | India   | Calculator |     75 |         7535 |           1350 |
| 2000 | India   | Calculator |     75 |         7535 |           1350 |
| 2000 | India   | Computer   |   1200 |         7535 |           1350 |
| 2000 | USA     | Calculator |     75 |         7535 |           4575 |
| 2000 | USA     | Computer   |   1500 |         7535 |           4575 |
| 2001 | USA     | Calculator |     50 |         7535 |           4575 |
| 2001 | USA     | Computer   |   1200 |         7535 |           4575 |
| 2001 | USA     | Computer   |   1500 |         7535 |           4575 |
| 2001 | USA     | TV         |    100 |         7535 |           4575 |
| 2001 | USA     | TV         |    150 |         7535 |           4575 |
+------+---------+------------+--------+--------------+----------------+

So unlike a normal GROUP BY query that returns a single SUM for each grouping key, by using a window function you also retain access to the disaggregated row data. In this example, the clause OVER(PARTITION BY country) is roughly equivalent to GROUP BY country, except that we retain the individual result set rows. Most of the normal aggregate functions in MySQL can accept a window clause in this way.
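
To make the contrast with GROUP BY concrete, here is a minimal Go sketch of what SUM(profit) OVER(PARTITION BY country) computes. The Sale type and the countryProfit function are hypothetical names invented for this illustration, not part of go-mysql-server, and the data is only a subset of the rows above.

```go
package main

import "fmt"

// Sale is a hypothetical stand-in for one row of the sales table.
type Sale struct {
	Year    int
	Country string
	Product string
	Profit  int
}

// countryProfit mimics SUM(profit) OVER(PARTITION BY country): it returns one
// value per input row, in input order, rather than one value per country.
func countryProfit(sales []Sale) []int {
	// First pass: aggregate per partition key, as GROUP BY would.
	totals := make(map[string]int)
	for _, s := range sales {
		totals[s.Country] += s.Profit
	}
	// Second pass: hand every row the total for its own partition.
	out := make([]int, len(sales))
	for i, s := range sales {
		out[i] = totals[s.Country]
	}
	return out
}

func main() {
	sales := []Sale{
		{2000, "Finland", "Computer", 1500},
		{2000, "Finland", "Phone", 100},
		{2001, "Finland", "Phone", 10},
		{2000, "India", "Calculator", 75},
	}
	for i, total := range countryProfit(sales) {
		fmt.Println(sales[i].Country, sales[i].Product, sales[i].Profit, total)
	}
}
```

The first pass is all a GROUP BY needs. The second pass is what makes it a window function: each row keeps its own columns and gains the aggregate for its partition.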

But the real power of window functions is doing things that are awkward or impossible to express with a GROUP BY. Here's a query that computes a bunch of metrics for the rows in a table based on a particular ordering:

SELECT val,
    ROW_NUMBER()   OVER w AS 'row_number',
    CUME_DIST()    OVER w AS 'cume_dist',
    PERCENT_RANK() OVER w AS 'percent_rank'
    FROM numbers
    WINDOW w AS (ORDER BY val);
+------+------------+--------------------+--------------+
| val  | row_number | cume_dist          | percent_rank |
+------+------------+--------------------+--------------+
|    1 |          1 | 0.2222222222222222 |            0 |
|    1 |          2 | 0.2222222222222222 |            0 |
|    2 |          3 | 0.3333333333333333 |         0.25 |
|    3 |          4 | 0.6666666666666666 |        0.375 |
|    3 |          5 | 0.6666666666666666 |        0.375 |
|    3 |          6 | 0.6666666666666666 |        0.375 |
|    4 |          7 | 0.8888888888888888 |         0.75 |
|    4 |          8 | 0.8888888888888888 |         0.75 |
|    5 |          9 |                  1 |            1 |
+------+------------+--------------------+--------------+
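
For readers who want the arithmetic behind those three columns, here is a rough Go sketch. It assumes the input is already sorted ascending and forms a single partition, and rankStats is a made-up name for this post, not an engine function. CUME_DIST is the number of rows at or before the current row's last peer divided by the row count, and PERCENT_RANK is (rank - 1) / (rows - 1).

```go
package main

import "fmt"

// rankStats computes ROW_NUMBER, CUME_DIST and PERCENT_RANK for a slice that
// is already sorted ascending, treating the whole slice as one partition.
func rankStats(vals []int) (rowNum []int, cumeDist, pctRank []float64) {
	n := len(vals)
	rowNum = make([]int, n)
	cumeDist = make([]float64, n)
	pctRank = make([]float64, n)
	for i := 0; i < n; {
		// Find the end of the run of peers (rows equal to vals[i]).
		j := i
		for j < n && vals[j] == vals[i] {
			j++
		}
		for k := i; k < j; k++ {
			rowNum[k] = k + 1
			// CUME_DIST: rows at or before the last peer, over all rows.
			cumeDist[k] = float64(j) / float64(n)
			// PERCENT_RANK: (rank - 1) / (rows - 1), where rank is the
			// position of the first peer. A single-row partition gets 0.
			if n > 1 {
				pctRank[k] = float64(i) / float64(n-1)
			}
		}
		i = j
	}
	return rowNum, cumeDist, pctRank
}

func main() {
	vals := []int{1, 1, 2, 3, 3, 3, 4, 4, 5}
	rowNum, cumeDist, pctRank := rankStats(vals)
	for i, v := range vals {
		fmt.Println(v, rowNum[i], cumeDist[i], pctRank[i])
	}
}
```

Note that rows with equal values share CUME_DIST and PERCENT_RANK but still get distinct row numbers, which matches the table above.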

Implementing the ROW_NUMBER() function

For our support of window functions in Dolt, we decided to start small and iterate. We built this feature to support a customer who needs the functionality, and all they need for now is ROW_NUMBER(), so that's what we've built so far.

But of course, even implementing a single window function involves building out a significant amount of foundation and scaffolding.

To understand our approach, it's probably simplest to compare and contrast with the existing Aggregate functions already supported. Here are the two interface definitions:

// Aggregation implements an aggregation expression, where an
// aggregation buffer is created for each grouping (NewBuffer) and rows in the
// grouping are fed to the buffer (Update). Multiple buffers can be merged
// (Merge), making partial aggregations possible.
// Note that Eval must be called with the final aggregation buffer in order to
// get the final result.
type Aggregation interface {
	Expression
	// NewBuffer creates a new aggregation buffer and returns it as a Row.
	NewBuffer() Row
	// Update updates the given buffer with the given row.
	Update(ctx *Context, buffer, row Row) error
	// Merge merges a partial buffer into a global one.
	Merge(ctx *Context, buffer, partial Row) error
}

// WindowAggregation implements a window aggregation expression. A WindowAggregation is similar to an Aggregation,
// except that it returns a result row for every input row, as opposed to a single row for the entire result set. Every
// WindowAggregation is expected to track its input rows in the order received, and to return the value for the row
// index given on demand.
type WindowAggregation interface {
	Expression
	// WithWindow returns a version of this window aggregation with the window given
	WithWindow(window *Window) (WindowAggregation, error)
	// NewBuffer creates a new buffer and returns it as a Row. This buffer will be provided for all further operations.
	NewBuffer() Row
	// Add updates the aggregation with the input row given. Implementors must keep track of rows added in order so
	// that they can later be retrieved by EvalRow(int)
	Add(ctx *Context, buffer, row Row) error
	// Finish gives aggregations that need to do final computation once all rows have been added (like sorting their
	// inputs) a chance to do so before iteration begins
	Finish(ctx *Context, buffer Row) error
	// EvalRow returns the value of the expression for the row with the index given
	EvalRow(i int, buffer Row) (interface{}, error)
}

Both of these interfaces are fed one row at a time, and both are expected to create and update a buffer that stores the intermediate results of the computation. The big difference is that for Aggregation implementations, the order in which rows are added doesn't matter. For window functions, it is crucial: every input row maps to exactly one output row. This means that a window function must keep track of the order in which rows were added, and must be able to produce the result for a given row number in that original ordering, to get the right answers.
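
To show how a caller might drive that lifecycle, here is a simplified, self-contained sketch. Everything in it is an assumption made for illustration: Row stands in for sql.Row, windowAgg is a trimmed-down version of WindowAggregation without the Expression, Context and WithWindow parts, and totalOver and evalWindow are invented names. It is not the engine's actual executor.

```go
package main

import "fmt"

// Row is a simplified stand-in for sql.Row.
type Row []interface{}

// windowAgg is a trimmed-down, hypothetical version of WindowAggregation
// that drops the Expression, Context and WithWindow parts.
type windowAgg interface {
	NewBuffer() Row
	Add(buffer, row Row) error
	Finish(buffer Row) error
	EvalRow(i int, buffer Row) (interface{}, error)
}

// totalOver behaves like SUM(col) OVER (): every row sees the grand total.
type totalOver struct{ col int }

// NewBuffer returns a two-slot buffer: {running sum, rows seen}.
func (t totalOver) NewBuffer() Row { return Row{0, 0} }

func (t totalOver) Add(buffer, row Row) error {
	v, ok := row[t.col].(int)
	if !ok {
		return fmt.Errorf("column %d is not an int", t.col)
	}
	buffer[0] = buffer[0].(int) + v
	buffer[1] = buffer[1].(int) + 1
	return nil
}

// Finish has nothing to do here; an ordered function would sort at this point.
func (t totalOver) Finish(buffer Row) error { return nil }

func (t totalOver) EvalRow(i int, buffer Row) (interface{}, error) {
	if i < 0 || i >= buffer[1].(int) {
		return nil, fmt.Errorf("row index %d out of range", i)
	}
	return buffer[0], nil
}

// evalWindow drives the lifecycle: feed every row, finish, then ask for one
// value per input row, in input order.
func evalWindow(agg windowAgg, rows []Row) ([]interface{}, error) {
	buf := agg.NewBuffer()
	for _, r := range rows {
		if err := agg.Add(buf, r); err != nil {
			return nil, err
		}
	}
	if err := agg.Finish(buf); err != nil {
		return nil, err
	}
	out := make([]interface{}, len(rows))
	for i := range rows {
		v, err := agg.EvalRow(i, buf)
		if err != nil {
			return nil, err
		}
		out[i] = v
	}
	return out, nil
}

func main() {
	rows := []Row{{"a", 10}, {"b", 20}, {"c", 12}}
	vals, err := evalWindow(totalOver{col: 1}, rows)
	fmt.Println(vals, err)
}
```

The key point is the last loop: unlike a plain aggregation, the caller asks for a value once per input row, by index.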

For the ROW_NUMBER() function, this looks something like this:

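// Add implements sql.WindowAggregation
func (r *RowNumber) Add(ctx *sql.Context, buffer, row sql.Row) error {
    rows := buffer[0].([]sql.Row)
    // Append two extra columns to the row: a nil placeholder for the row number
    // (filled in by Finish) and the row's position in the original input order.
    buffer[0] = append(rows, append(row, nil, r.pos))
    r.pos++
    return nil
}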

So we are keeping a buffer of every row added, and appending two extra values to each one: an empty slot that will later hold the row number, and a counter recording the row's original position. Then after all rows have been added, we can do the real work of the function:

  1. Sort the rows according to the window specification, first by partition key, then by any ORDER BY clause
  2. Number the rows in this new ordering, and then
  3. Sort them a final time to return them to the original order

This is pretty simple, and looks like this:

// Finish implements sql.WindowAggregation
func (r *RowNumber) Finish(ctx *sql.Context, buffer sql.Row) error {
	rows := buffer[0].([]sql.Row)
	if len(rows) > 0 && r.window != nil && r.window.OrderBy != nil {
		sorter := &expression.Sorter{
			SortFields: append(partitionsToSortFields(r.window.PartitionBy), r.window.OrderBy...),
			Rows:       rows,
			Ctx:        ctx,
		}
		sort.Stable(sorter)
		if sorter.LastError != nil {
			return sorter.LastError
		}

		// Now that we have the rows in sorted order, number them
		rowNumIdx := len(rows[0]) - 2
		originalOrderIdx := len(rows[0]) - 1
		var last sql.Row
		var rowNum int
		for _, row := range rows {
			// every time we encounter a new partition, start the count over
			isNew, err := r.isNewPartition(ctx, last, row)
			if err != nil {
				return err
			}
			if isNew {
				rowNum = 1
			}

			row[rowNumIdx] = rowNum

			rowNum++
			last = row
		}

		// And finally sort again by the original order
		sort.SliceStable(rows, func(i, j int) bool {
			return rows[i][originalOrderIdx].(int) < rows[j][originalOrderIdx].(int)
		})
	}

	return nil
}

Then when we're asked to retrieve the value of the function for a particular numbered row, it's a simple lookup:

// EvalRow implements sql.WindowAggregation
func (r *RowNumber) EvalRow(i int, buffer sql.Row) (interface{}, error) {
    rows := buffer[0].([]sql.Row)
    return rows[i][len(rows[i])-2], nil
}
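
If you want to experiment with the sort, number, and sort-back approach outside the engine, here is a stripped-down sketch that runs on its own. It assumes a single string partition key and a single integer ordering key sorted descending, and the item type and numberRows function are hypothetical names for this illustration only.

```go
package main

import (
	"fmt"
	"sort"
)

// item is a hypothetical simplified row: a partition key, an ordering key,
// and two bookkeeping fields playing the role of the extra buffer columns.
type item struct {
	part   string
	key    int
	pos    int // original input position
	rowNum int // filled in during numbering
}

// numberRows mimics ROW_NUMBER() OVER (PARTITION BY part ORDER BY key DESC)
// and returns the row numbers in the original input order. It assumes parts
// and keys have the same length.
func numberRows(parts []string, keys []int) []int {
	items := make([]item, len(keys))
	for i := range keys {
		items[i] = item{part: parts[i], key: keys[i], pos: i}
	}
	// 1. Sort by partition, then by the window's ORDER BY (descending here).
	sort.SliceStable(items, func(a, b int) bool {
		if items[a].part != items[b].part {
			return items[a].part < items[b].part
		}
		return items[a].key > items[b].key
	})
	// 2. Number the rows, restarting at 1 whenever the partition changes.
	n := 0
	for i := range items {
		if i == 0 || items[i].part != items[i-1].part {
			n = 0
		}
		n++
		items[i].rowNum = n
	}
	// 3. Sort back into the original input order.
	sort.SliceStable(items, func(a, b int) bool { return items[a].pos < items[b].pos })
	out := make([]int, len(items))
	for i, it := range items {
		out[i] = it.rowNum
	}
	return out
}

func main() {
	// Three rows with keys 1, 2, 3; the first two share a partition.
	parts := []string{"low", "low", "high"}
	keys := []int{1, 2, 3}
	fmt.Println(numberRows(parts, keys)) // [2 1 1]
}
```

Storing the original position on each row is what makes the final step possible; without it, the ordering the caller expects would be lost after the first sort.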

Result

We implemented this functionality for a customer who needs it for particular queries they run. But we aren't at liberty to share those very interesting queries, so you'll have to content yourself with some test data.

> select * from mytable;
+---+------------+
| i | s          |
+---+------------+
| 1 | first row  |
| 2 | second row |
| 3 | third row  |
+---+------------+
> select i,
    row_number() over (order by i desc) + 3,
    row_number() over (order by length(s),i) as s_asc,
    row_number() over (order by length(s) desc,i desc) as s_desc
    from mytable order by 1;
+---+-----------------------------------------+-------+--------+
| i | row_number() over (order by i desc) + 3 | s_asc | s_desc |
+---+-----------------------------------------+-------+--------+
| 1 |                                       6 |     1 |      3 |
| 2 |                                       5 |     3 |      1 |
| 3 |                                       4 |     2 |      2 |
+---+-----------------------------------------+-------+--------+
> select i,
    row_number() over (partition by case when i > 2 then "over two" else "two or under" end order by i desc) as s_asc
    from mytable order by 1;
+---+-------+
| i | s_asc |
+---+-------+
| 1 |     2 |
| 2 |     1 |
| 3 |     1 |
+---+-------+

Future work

There's a lot left to do! Besides implementing all the other window functions, there are a bunch of constructs missing from the current implementation that are incredibly useful. For example, using the ROWS frame clause with PRECEDING / FOLLOWING bounds to do things like compute a running total or a running average:

SELECT
    time, subject, val,
    SUM(val) OVER (PARTITION BY subject ORDER BY time ROWS UNBOUNDED PRECEDING) AS running_total,
    AVG(val) OVER (PARTITION BY subject ORDER BY time ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS running_average
    FROM observations;
+----------+---------+------+---------------+-----------------+
| time     | subject | val  | running_total | running_average |
+----------+---------+------+---------------+-----------------+
| 07:00:00 | st113   |   10 |            10 |          9.5000 |
| 07:15:00 | st113   |    9 |            19 |         14.6667 |
| 07:30:00 | st113   |   25 |            44 |         18.0000 |
| 07:45:00 | st113   |   20 |            64 |         22.5000 |
| 07:00:00 | xh458   |    0 |             0 |          5.0000 |
| 07:15:00 | xh458   |   10 |            10 |          5.0000 |
| 07:30:00 | xh458   |    5 |            15 |         15.0000 |
| 07:45:00 | xh458   |   30 |            45 |         20.0000 |
| 08:00:00 | xh458   |   25 |            70 |         27.5000 |
+----------+---------+------+---------------+-----------------+

None of that works yet, but it will in the near future. Additionally, the current release of Dolt has some other limitations:

  • No named windows
  • Can't mix window functions with GROUP BY
  • Can't mix window functions and aggregate functions unless the aggregate functions also have an OVER() clause

We'll be addressing these shortcomings in the following weeks and months.
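
In the meantime, if frame clauses are new to you, here is a small Go sketch of what the two ROWS frames in the query above compute for a single partition that is already ordered by time. frameStats is a made-up helper for illustration; it is not how the engine will necessarily implement frames.

```go
package main

import "fmt"

// frameStats mimics, for one partition already ordered by time:
//
//	SUM(val) OVER (... ROWS UNBOUNDED PRECEDING)
//	AVG(val) OVER (... ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
func frameStats(vals []float64) (runningTotal, runningAvg []float64) {
	runningTotal = make([]float64, len(vals))
	runningAvg = make([]float64, len(vals))
	sum := 0.0
	for i, v := range vals {
		// Frame 1: everything from the start of the partition to this row.
		sum += v
		runningTotal[i] = sum

		// Frame 2: the previous row, this row, and the next row, clipped
		// at the partition boundaries.
		lo, hi := i-1, i+1
		if lo < 0 {
			lo = 0
		}
		if hi > len(vals)-1 {
			hi = len(vals) - 1
		}
		frameSum := 0.0
		for _, fv := range vals[lo : hi+1] {
			frameSum += fv
		}
		runningAvg[i] = frameSum / float64(hi-lo+1)
	}
	return runningTotal, runningAvg
}

func main() {
	// The val column for subject st113 in the example above.
	totals, avgs := frameStats([]float64{10, 9, 25, 20})
	fmt.Println(totals)
	fmt.Println(avgs)
}
```

The frame shrinks at the edges of the partition, which is why the first and last rows average over two values instead of three.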

Conclusion

This release gets us one big step closer to being a 100% compatible drop-in MySQL replacement. Window functions are an advanced query feature, but they're indispensable for certain use cases, and we want to support all the use cases MySQL does. Try it out yourself by downloading Dolt today! Or if you just want to come chat with us first, join our Discord server! We're always happy to hear from new prospective customers.
