Dolt with Popular DataFrame Tools

PYTHONINTEGRATION
6 min read

Dolt is a version-controlled SQL database. For data science (DS) workflows, specifically, Dolt uses data versioning primitives to implement unique flavors of reproducibility.

DataFrames are a common interface for exploring CSV, Parquet and other file types. Combining the ergonomics of DS tools with the performance, scalability and reliability of databases can yield fantastic results, exhibited most-famously by Uber’s Michelangelo platform. At the same time, synchronizing legacy and bleeding-edge data tools is difficult. Today we will load Dolt data into three types of DataFrames to show how near at hand production-ready and reproducible machine learning is with the tools you already use everyday:

0: Installing Dolt

All examples require the dolt binary to be installed separately:

> sudo bash -c 'curl -L https://github.com/dolthub/dolt/releases/latest/download/install.sh | sudo bash'

1: Pandas DataFrames

Pandas integrates SQL natively and can generate its own dataframes given a valid SQL server connection.

A handful of dependencies are required for Pandas-to-MySQL database access:

> pip install pandas sqlalchemy pymysql

Exposing Dolt's SQL interface requires running a server in the background that Pandas can communicate with (refer to the CLI vs SQL interface article for a refresher):

> mkdir -p test_db
> cd test_db
> dolt init
> dolt sql-server -l trace
Starting server with Config HP="localhost:3306"|U="root"|P=""|T="28800000"|R="false"|L="trace"

Pandas can now load keyless tables into Dolt: localhost:3306:

In [1]: import pandas as pd
   ...: from sqlalchemy import create_engine
   ...: engine = create_engine("mysql+pymysql://root@localhost/test_db", echo=False)

In [2]: with engine.begin() as connection:
   ...:     df = pd.DataFrame({"name" : ["User 4", "User 5"]})
   ...:     df.to_sql("users", con=connection, index=False, if_exists="append")

In [3]: engine.execute("SELECT * FROM users").fetchall()
Out[3]: [(None, 'User 4'), (None, 'User 5')]

Normally we use dolt's command-line-interface to persist commits to version-control, but we can use the SQL sesion to do the same thing:

In [4]: engine.execute("SELECT DOLT_ADD('users')").fetchall()
Out[4]: [(0,)]

In [5]: engine.execute("SELECT DOLT_COMMIT('-m', 'Add users table')").fetchall()
Out[5]: [('03562lom2sobtv21t28o7u49kpa5hg7e',)]

In [6]: with engine.begin() as connection:
    ...:     brances = pd.read_sql("select * from dolt_branches;", con=connection)
    ...: branches
Out[6]:
     name                              hash latest_committer latest_committer_email      latest_commit_date latest_commit_message
0  master  03562lom2sobtv21t28o7u49kpa5hg7e  Bojack Horseman    bojack@horseman.com 2021-03-20 17:14:57.094       Add users table

2: Dask DataFrames

Dask is a progression of Pandas for special use-cases. Out-of-memory DataFrames facilitate batching and/or distributed computing over large datasets. Dask copies the Pandas DataFrame interface to make the transition experience smooth for Python users.

To get started with Dask, first install its dataframe extension:

> pip install dask[dataframe]

We will use the hospital price transparency dataset for this example, which at ~20 GB is too big for Pandas to load (and therefore takes 15-30 minutes to download):

> dolt clone hospital-price-transparency

Similar to example (1), we expose dolt as an sql-server that Dask understand how to access:

> cd hospital-price-transparency
> dolt sql-server -l trace
Starting server with Config HP="localhost:3306"|U="root"|P=""|T="28800000"|R="false"|L="trace"

Dask's interface should feel familar after example (1).

In [1]: import dask.dataframe as dd
In [2]: conn = "mysql+pymysql://root@localhost/hospital_price_transparency"
In [3]: df = dd.read_sql_table(
                 table="prices",
                 uri=conn,
                 index_col="code",
                 divisions=list('acegikmoqsuwz'))
In [4]: df.head(npartitions=5)
Out[4]:
       npi_number                                      payer    price code__1
code
c9132  1932148947   Anthem Commercial Out of Network IP RATE  3407.10   c9132
c9132  1932148947                      Aetna HMO/PPO OP RATE  3116.25   c9132
c9132  1932148947  Alignment Medicare Adv_ HMO / PPO OP RATE     2.09   c9132
c9132  1932148947   Anthem Commercial Out of Network OP RATE  3656.40   c9132
c9132  1932148947                    Anthem Medi-Cal OP RATE    10.83   c9132

Dask executed the query above with a where-clause partition, loading a subset of data to avoid memory bottlenecks. Dolt's trace-logging reveals a bit about how that process works: (logged from the dolt sql-server with -l trace mode):

TRACE: received query SELECT prices.code, prices.npi_number, prices.payer, prices.price, prices.code AS code__1
FROM prices
WHERE prices.code >= 'i' AND prices.code < 'k'

If we try to read the hospital data with Pandas and SQLAlchemy, dolt spins at its I/O limit until an OOM (unless your computer can comfortably load the whole dataset into memory). I killed the process after letting it run for a few minutes:

...
TRACE: returning result row [VARCHAR("52640") CHAR("1922076603") VARCHAR("i_GOLDEN_RULE_INS_CO__S__1350_130146") DECIMAL(3784.00)]
TRACE: returning result row [VARCHAR("52640") CHAR("1922076603") VARCHAR("i_UMR__S__1350_170130") DECIMAL(3784.00)]
TRACE: returning result row [VARCHAR("33019") CHAR("1790717650") VARCHAR("TUFTS HEALTH PUBLIC PLANS [1013]") DECIMAL(5310.00)]
TRACE: returning result row [VARCHAR("33019") CHAR("1790717650") VARCHAR("UNICARE GIC [1015]") DECIMAL(5310.00)]
ERROR: Error in the middle of a stream to client 1 (127.0.0.1:51376): conn 1: Write(packet) failed: write tcp 127.0.0.1:3306->127.0.0.1:51376: write: broken pipe (errno 1105) (sqlstate HY000)

3: Spark DataFrames

Spark is a distributed ETL platform written in Scala. Spark's compute model shares many of the same relational design principles as Dolt, and both execute a range of SQL commands. Unlike Dolt, Spark is not a storage engine and depends on S3 or HDFS. Spark also executes ETL jobs in a distributed master-executor model that works best with ten-to-thousand node clusters. With a little extra setup (compared to the first two examples) we will demonstrate how to load Spark DataFrame objects using Dolt and Pyspark.

An in-depth guide on how to acquire all of the dependencies for Spark is beyond the scope of this blog (this blog gets most of the way there), but the installation steps are loosely:

  • Install Java 8
  • Install Scala 2.11
  • Install Spark 3.1.1
  • Point SPARK_HOME and PATH to the new Spark libexec and libexec/bin, respectively
  • Download the "platform independent" JDBC connector
  • Unzip mysql-connector-java-8.0.23.jar and move to your spark jars folder. (there are several options for updating spark confs, this is just one.)
  • pip install pyspark

The "spark job" below accesses our dolt sql-server to read and print data:

# dolt_pyspark.py
from pyspark.sql import SQLContext
import pyspark

if __name__=="__main__":
    sc = pyspark.SparkContext('local[*]')
    sqlContext = SQLContext(sc)

    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost:3306/hospital_price_transparency",
        driver="com.mysql.cj.jdbc.Driver",
        dbtable="hospitals",
        user="root",
        password="").load()

    df.show()

We add two mysql driver jar flags to spark-submit before running:

> spark-submit \
    --driver-class-path=/usr/local/Cellar/apache-spark/3.1.1/libexec/jars/mysql-connector-java-8.0.23.jar \
    --jars=/usr/local/Cellar/apache-spark/3.1.1/libexec/jars/mysql-connector-java-8.0.23.jar \
    dolt_pyspark.py
...
+------------+--------------------+--------------------+--------------------+---------------+-----+----------+------------+
|  npi_number|                name|                 url|      street_address|           city|state|  zip_code|publish_date|
+------------+--------------------+--------------------+--------------------+---------------+-----+----------+------------+
|1003139775.0|        HCA Virginia|https://hcavirgin...|901 E. Cary St Su...|       Richmond|   VA|      null|  2021-01-01|
|  1003260480|Brookwood Baptist...|https://www.brook...|2010 Brookwood Me...|     Birmingham|   AL|     35209|        null|
|  1003281452|  Henderson Hospital|https://uhsfilecd...|1050 West Galleri...|      Henderson|   NV|     89011|  2021-01-01|
|  1003362997|CHI Health St. El...|https://www.chihe...|     555 S. 70Th St.|        Lincoln|   NE|     68510|  2021-01-01|
|  1003389206|Merrill pioneer h...|https://www.avera...|1100 S 10th Ave, ...|    Rock Rapids|   IA|51246-2020|        null|
|  1003811290|   Providence Health|http://www.yourpr...|   2435 Forest Drive|       Columbia|   SC|     29204|        null|
|  1003833013|ASCENSION SETON H...|https://healthcar...|     3201 S WATER ST|         BURNET|   TX|78611-4510|        null|
|1003853185.0|   Dominion Hospital|https://dominionh...|901 E. Cary St Su...|       Richmond|   VA|      null|  2021-01-01|
|  1003858408|Frisbie Memorial ...|https://frisbieho...|   11 Whitehall Road|      Rochester|   NH|     03867|  2021-01-01|
|1003858408.0|Frisbie Memorial ...|https://frisbieho...|   11 Whitehall Road|      Rochester|   NH|    3867.0|  2021-01-01|
|1003862053.0|Doctors Hospital ...|https://doctorsof...|   5731 Bee Ridge Rd|       Sarasota|   FL|   34233.0|  2021-01-01|
|  1003865825|WILLIAM BEAUMONT ...|https://www.beaum...|   3601 W 13 MILE RD|      ROYAL OAK|   MI|48073-6712|  2021-01-16|
|  1003873225|The Specialty Hos...|https://www.rushh...|       1314 19th Ave|       Meridian|   MS|39301-4116|        null|
|  1003887266|Avera Missouri Ri...|https://www.avera...|  606 E Garfield Ave|     Gettysburg|   SD|57442-1325|        null|
|  1003905092|       Kula Hospital|https://res.cloud...|  628 Seventh Street|          Lanai|   HI|     96763|        null|
|1003908443.0|Mercy Miami Hospital|https://mercymiam...|    3663 S Miami Ave|          Miami|   FL|   33133.0|  2021-01-01|
|  1013017839|ALEXIAN BROTHERS ...|https://healthcar...|1650 MOON LAKE BO...|HOFFMAN ESTATES|   IL|     60169|        null|
|  1013062769| Zion Medical Center|https://healthy.k...|       4647 Zion Ave|      San Diego|   CA|     92120|  2021-01-01|
|  1013085083|Baylor Scott & Wh...|https://www.bswhe...|     546 N Kegley Rd|         Temple|   TX|     76502|        null|
|  1013100692|  Abrazo West Campus|https://www.abraz...|13677 West McDowe...|       Goodyear|   AZ|     85395|        null|
+------------+--------------------+--------------------+--------------------+---------------+-----+----------+------------+
only showing top 20 rows

Conclusion

Dolt combines a familiar SQL interface with Git-like version control. That combination gives data scientists and engineers powerful tools for defining reproducible data pipelines. Implementing a widely adopted SQL protocol allows Dolt to drop into existing tools used by the data science community.

Every week Dolt adds new support for tools used by engineers and scientists. If you work with a tool or product that you think may benefit from versioning, merging, diffing, logging, or some other form of lineage and reproducibility reach out to us on our Discord. We would love to hear about your experiences with data applications.

SHARE

JOIN THE DATA EVOLUTION

Get started with Dolt

Or join our mailing list to get product updates.