Previously I wrote about a data-importing AI agent that could take local files and import them into a MySQL-compatible database. While the agent was able to do simple imports, it wasn’t robust. Because of the non-deterministic nature of LLMs, a simple job that succeeded one time might fail the next. Additionally, it took little effort to find cases that the agent would never get right.
In this blog, I’ll start by digging into the cases where our agent failed, and make iterative improvements to the agent until we have a much more robust data importing agent. I’ll talk through the various issues that I encountered along the way, and then go through the implementation of the latest agent’s plan execution strategy.
Initial Failures#
During the development of the initial agent, I had a few test cases that I used to test the agent’s capabilities. However, as soon as new import jobs were attempted, new failure modes were discovered. As an example, Tim downloaded a CSV file containing information on the first 43 US presidents and asked the agent to import it with the following prompt:
There's a dolt database on localhost port 3306. I want you to make a database of US Presidents. It can be a single table with their name, term, and party affiliation
This doesn’t seem any more complicated than the import jobs the agent had already done, but the agent failed multiple times. Each time the agent would properly extract the correct data from the prompt, ask some reasonable clarifying questions, propose tables to alter or create as appropriate, and then create what looked like a reasonable import plan. However, when it came time to execute the plan, the agent would fail.
I tried different test jobs and found that the agent would often fail in similar ways with other prompts and datasets. Planning and plan execution were just not reliable.
Looking at Successful Agents#
The agent I have had the greatest success with is Claude Code. It seems like what makes it so good is that it doesn’t trust that the work it does is correct. Instead, it has a strong feedback loop where it checks its work at every step. If something is wrong, it goes back and tries again. LLMs are not reliable, and this feedback loop is essential to getting good results.
First Attempt Implementing a Plan Execution Feedback Loop#
Initially, after each attempt to execute a plan step, I would make a follow-up LLM call to check if the step was successful. If not, the agent would try again. This provided some improvement for the cases where the agent did something obviously wrong and could fix its mistake. But take the example plan below:
1. Write a script to read presidents.csv and update it to only include valid presidents.
2. Run the script to update presidents.csv.
3. Write a script to import presidents.csv into the table 'presidents'
4. Run the import script.
5. Verify the data in the 'presidents' table.
If the script written in step 1 has a bug that causes it to erroneously delete a row or two, our agent will never be able to recover, because step 2 overwrote the source input file. We need to make sure that the steps of our plan never modify their inputs in place, and that each step produces new outputs. We will fix this with idempotent execution.
A second type of error could occur if the plan was something like:
1. Write a script to read presidents.csv and output transformed.csv with columns: id (int), name (string), term_start (date), term_end (date), party (string).
2. Run the script to generate transformed.csv.
3. Write a script to import transformed.csv into the table 'presidents'
4. Run the import script.
5. Verify the data in the 'presidents' table.
In this case, if the agent fails to write the script in step 1, our feedback loop will catch that and have the agent try again. However, if the agent writes a script that looks correct but has a subtle bug that causes it to produce incorrect output, the agent will not be able to catch that until a later step. If the mistake isn’t caught until step 5, and the feedback loop then tells the agent to repeat step 5, the agent will just keep failing. The agent needs to be able to go back to earlier steps in the plan when it detects a failure. This is a challenge, because an error detected in step 5 may be caused by a mistake in any of steps 1 through 5. We will address this with the goal-based planning approach described below.
Idempotent Execution#
We need to make sure that each step in our plan is idempotent. Running the step multiple times should produce the same result as running it once. This can be achieved by ensuring our inputs are the same each time we run a step.
Data Processing Steps#
Data processing steps take our input files, do some level of processing, and produce new output files. By outputting new files rather than modifying our input files in place, we can ensure that re-running a step after a failure won’t propagate errors.
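As a minimal sketch of what a non-destructive processing step could look like (this is not the agent's actual code; the function name `run_processing_step` and the validity rule are assumptions made for illustration), the step below refuses to write over its input and always produces a separate output file:

```python
import csv
from pathlib import Path


def run_processing_step(input_path: Path, output_path: Path) -> int:
    """Copy valid rows from input_path into a NEW file and return how many were kept."""
    # Guard against the failure mode described above: never edit the input in place.
    if output_path.resolve() == input_path.resolve():
        raise ValueError("output must not overwrite the input file")
    kept = 0
    with input_path.open(newline="") as src, output_path.open("w", newline="") as dst:
        reader = csv.DictReader(src)
        writer = csv.DictWriter(dst, fieldnames=reader.fieldnames or [], extrasaction="ignore")
        writer.writeheader()
        for row in reader:
            # Hypothetical validity rule: keep rows whose 'presidency' is an integer.
            if (row.get("presidency") or "").strip().isdigit():
                writer.writerow(row)
                kept += 1
    return kept
```

Because the input file is only ever read, calling the function a second time after a failed attempt starts from exactly the same data as the first call.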
Database Modification Steps (Goodbye MySQL Compatibility)#
Steps that modify the database also need to be idempotent. We want our database in the same state each time we attempt to accomplish one of the steps in our plan. One way to achieve this is to make your database modifications in a transaction and roll back if an issue is found. However, if we are inserting millions of rows over a long-running job, this is fragile: a lost connection could cause the entire transaction to be lost. Additionally, you would need to keep your transaction open, and your verification step would need to be part of the same transaction. Lastly, what if others are writing to your database while your import job is running? That would make it impossible to guarantee that a MySQL database is in the same state if a step needs to be re-run.
This is where I decided the agent being compatible with any MySQL database was no longer feasible. Instead, I made use of many core features that are unique to Dolt databases.
Branches#
As MySQL compatibility was no longer a requirement, the first step was to make each import job run on its own branch. This gives the agent an isolated environment to work in, and allows it to make changes without affecting other branches until the branch is merged. This allows our agent to run against a Dolt database server that others are using without worrying about interfering with their work. It also gives the option of leaving the branch unmerged so the user can verify the results before merging.
The first step in supporting this workflow is to alter prompt extraction to include the base branch to create the import branch from. Once we have that, we can create the branch for the import job to run in.
Commits#
Another change we will make is that we will commit any changes we make to the database before planning, and executing the steps of the plan. Then, we will add a commit after each step in the plan is successfully executed. If a step fails and needs to be re-run, we can simply reset the branch to the last successful commit. Additionally, if we ever get to a place where we need to abandon the plan, and create a brand new one, we can reset the branch to the initial commit, and start over. Lastly, once we have completed the import job, all the work is isolated in the import branch, and the user can query it and verify the results before merging it.
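The branch and commit workflow can be sketched with Dolt's SQL stored procedures. This is an illustration under assumptions, not the agent's actual wrapper: the class name `DoltCheckpoints` and its method names are hypothetical, and it assumes a DB-API cursor that uses `%s` placeholders (as PyMySQL and mysql-connector do) connected to a Dolt SQL server.

```python
class DoltCheckpoints:
    """Hypothetical helper that checkpoints an import job with Dolt commits."""

    def __init__(self, cursor):
        # Any DB-API cursor connected to a Dolt SQL server (assumption).
        self.cursor = cursor

    def start_import_branch(self, branch: str, base: str) -> None:
        # Create the import branch from the base branch, then switch to it.
        self.cursor.execute("CALL DOLT_BRANCH(%s, %s)", (branch, base))
        self.cursor.execute("CALL DOLT_CHECKOUT(%s)", (branch,))

    def commit(self, message: str) -> None:
        # Stage every changed table and commit. --allow-empty lets a step that
        # only produced files still record a checkpoint.
        self.cursor.execute(
            "CALL DOLT_COMMIT('-A', '--allow-empty', '-m', %s)", (message,)
        )

    def head_hash(self) -> str:
        # Hash of the most recent commit on the current branch.
        self.cursor.execute("SELECT DOLT_HASHOF('HEAD')")
        return self.cursor.fetchone()[0]

    def reset_hard(self, commit_hash: str) -> None:
        # Discard everything done since the given checkpoint.
        self.cursor.execute("CALL DOLT_RESET('--hard', %s)", (commit_hash,))
```

With something like this, retrying a step is a matter of remembering `head_hash()` before the step starts and calling `reset_hard()` with that hash if the step fails.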
Diffs#
Lastly, Dolt provides a powerful way to verify the results of our database modification steps. Dolt is a fully version-controlled database, which allows us to see exactly what has changed. We provide the agent with the function tools get_table_diffs, get_table_diff_counts, and get_database_diff_counts, each of which returns results based on the changes that have occurred since the last commit.
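The implementation of these tools is not shown in this post. As a rough sketch of how a per-table count could be computed, the function below queries Dolt's `DOLT_DIFF` table function between `HEAD` and the working set. The function name `table_diff_counts` is hypothetical, and passing the table name as a `%s` parameter assumes a driver that interpolates parameters client-side.

```python
def table_diff_counts(cursor, table: str) -> dict[str, int]:
    """Count rows added, modified, and removed in `table` since the last commit."""
    cursor.execute(
        "SELECT diff_type, COUNT(*) "
        "FROM DOLT_DIFF('HEAD', 'WORKING', %s) "
        "GROUP BY diff_type",
        (table,),
    )
    # Start from zero so diff types with no rows still appear in the result.
    counts = {"added": 0, "modified": 0, "removed": 0}
    for diff_type, row_count in cursor.fetchall():
        counts[diff_type] = int(row_count)
    return counts
```

A testing agent can compare the `added` count with the number of data rows in the file that was imported.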
Goal Based Planning#
Instead of creating a list of tasks to execute in order, we want a list of goals, and a way to verify that each goal is achieved. The way in which the goal is accomplished is not the concern of the planning stage, only defining what needs to be accomplished and how to test it. An example goal-based plan for our presidents import job could look like this:
1. goal: Validate and sanitize the data in /Users/brian/datasets/csv/presidents.csv to ensure that each row conforms to the expected structure (columns: presidency, president, wikipedia_entry, took_office, left_office, party, portrait, thumbnail, home_state) and all values in the 'presidency' column are unique integers and non-null.
test: Produce a new sanitized CSV in ./.d0/cde3a2eb-e81d-4bd6-837c-25e824a8c255 containing only valid rows. Test by checking the file has valid columns, unique non-null integers in 'presidency', and discard or log any malformed or duplicate rows.
2. goal: Transform the sanitized CSV to ensure all field types match target database schema requirements (e.g., trim strings, enforce varchar length limits, check date formats).
test: Produce a transformed CSV file in ./.d0/cde3a2eb-e81d-4bd6-837c-25e824a8c255. Inspect sample rows to ensure that string columns do not exceed the defined lengths and all fields can be inserted into the target schema without truncation or errors.
3. goal: Import all rows from the transformed CSV into the 'presidents' table in the sink database.
test: After import, use get_table_diffs and get_table_diff_counts on the 'presidents' table to ensure that the expected number of rows were added and that the values match those in the transformed CSV.
To achieve this, we define the structure for our goal-based plan as follows:
from typing import Optional

from pydantic import BaseModel


class Goal(BaseModel):
    something_to_accomplish: str
    way_to_test_accomplishment: str


class GoalBasedPlan(BaseModel):
    goals: list[Goal]
    summary: Optional[str] = None
With that defined, we define our goal-based planning agent like so:
plan_agent = Agent(
name="Task Planning Agent",
instructions = f"""
You are a task planning assistant. Given information on the sources of information, the schema of the destination database,
and a plan for where data should be moved from and to, your task is to create a series of goals that needs to be accomplished
in order to complete the data import job. Each goal should be clear and specific and should be testable. An example goal
may be to take an input file and output a sanitized file that is transformed to match the destination schema. This can be
tested by inspecting the output file. The way in which each goal is accomplished is not your concern, just that the goal
is clear and testable, and that it does not modify its input data, instead it should create new files with the results. Dolt
databases are version controlled like Git, so any changes made to the database can be rolled back if needed, and diffs
can be inspected. Changes to database tables should be tested by using the function tools:
get_table_diffs: to get the diffs for the tables that were expected to be changed by the goal
get_table_diff_counts: to get the number of rows added, modified, and deleted in a specified table
get_database_diff_counts: to get the number of rows added, modified, and deleted in all tables in the database
It is likely that the amount of data being processed may be very large. You should consider goals that filter out bad data,
or data that is not needed early in the process. Do not modify source files, instead goals should specify creating new files
with the results of any transformations. These files should be easily testable to verify that the goal was accomplished. Do
not validate the entire data source. Instead transform data to the desired schema, and filter out any rows that cannot be
transformed to valid data for import.
It is possible that you have tried to create a plan for this data import job before, and that plan failed to execute successfully.
If there are any previous failed plans, take into account the reasons why those plans failed when creating this new plan. If a previous
plan has been attempted, then any database modifications made have been rolled back and you should try to overcome the issues faced in previous attempts.
""",
output_type=GoalBasedPlan
)
Our planning agent is told to take the data sources, the destination database schema, and the overview of where data should be moved from and to, and create a goal-based plan for the data import job. It is told to test that each goal is satisfied by reading the output files or inspecting the database diffs after each goal is accomplished. It is also told goals should not modify input files, but create new ones. Lastly, we tell it to take into account any previous failed plans, and the reasons for their failures. This will allow us to retry by generating a new plan that overcomes the issues faced in previous attempts.
With that defined, we will also define a plan feedback agent that can take user feedback and update the plan accordingly:
plan_feedback_agent = Agent(
name = "Plan Feedback Agent",
instructions = """
You are a data import assistant that looks back at the previously proposed goal-based data import plan and incorporates feedback from
the user. You should return an updated plan with a list of steps to be taken in order and a brief summary of the overall plan.
""",
output_type = GoalBasedPlan,
strict=False
)
With those defined we can implement our plan creation and feedback loop as follows:
async def run_plan_agent(conn, sess, temp_dir, source_schemas: list[SourceSchema], db_schema: dict[str, str], destination_plan: str, failed_plans: list[PlanExecutionFailure]) -> GoalBasedPlan:
    prompt = f"The sources are {list_of_basemodels_to_json(source_schemas)}\n"
    prompt += f"\nThe database schema is {json.dumps(db_schema)}\n"
    prompt += f"\nThe plan for where data should be moved from and to is: {destination_plan}\n"
    # Note the trailing space: the two string literals below are concatenated.
    prompt += (f"\nYou can use the temp directory at {temp_dir} for any local files you need to create or download "
               f"(but there is no need to copy files that are already local here).\n")
    if len(failed_plans) > 0:
        prompt += f"\nThe following previous plans failed to execute successfully: {list_of_basemodels_to_json(failed_plans)}\n"

    try:
        print("Planning data import...")
        plan = await plan_agent.run(conn, prompt, sess)
    except Exception as e:
        print(f"Error creating plan: {e}")
        sys.exit(1)

    # Show the plan and loop until the user accepts it or aborts.
    while True:
        print("Data import plan created:")
        print("steps")
        for i, goal in enumerate(plan.goals):
            print(f" {i+1}. goal: {goal.something_to_accomplish} test: {goal.way_to_test_accomplishment}")
        answer = ""
        while answer not in ["1", "2", "3"]:
            print(f"""\nWould you like to:
1. Execute this plan to import the data as described.
2. Make changes to the import the plan.
3. Abort the data import job.""")
            answer = input("(1/2/3)> ")
            print()
        if answer == "1":
            return plan
        elif answer == "2":
            try:
                prompt = input("What changes would you like to make to the plan?> ")
                print()
                plan = await plan_feedback_agent.run(conn, prompt=prompt, session=sess)
            except Exception as e:
                print(f"Error updating plan: {e}")
                sys.exit(1)
        elif answer == "3":
            print("Aborting data import job.")
            sys.exit(0)
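The helper list_of_basemodels_to_json used above is not shown in this post. One plausible sketch, assuming the models are Pydantic v2 models (which expose `model_dump`), is below. The `indent` parameter is inferred from a later call in this post that passes `indent=2`; everything else is an assumption.

```python
import json
from typing import Any, Iterable, Optional


def list_of_basemodels_to_json(models: Iterable[Any], indent: Optional[int] = None) -> str:
    """Serialize a list of Pydantic models to a single JSON array string."""
    # mode="json" asks Pydantic v2 to convert values such as dates into
    # JSON-safe types. Pydantic v1 models would need .dict() instead.
    return json.dumps([m.model_dump(mode="json") for m in models], indent=indent)
```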
Executing the Goal Based Plan#
Now that we have a list of testable goals, we’ll process them in order. To achieve each goal we will have our agent write a script which takes the inputs defined in the goal, and produces the goal’s outputs.
class ScriptWritingOutput(BaseModel):
    script_file_path: str
    pip_dependencies: Optional[list[str]] = []
    explanation: Optional[str] = ""
    expected_files_created: Optional[list[str]] = []
    expected_db_tables_changed: Optional[list[str]] = []
    error: Optional[str] = ""

script_writing_agent = Agent(
name = "Goal Script Writing Agent",
instructions = """
A previous agent created a goal-based plan for importing data from one or more data sources to a mysql compatible database.
Your task is to take the provided goal from the plan, and write a python script to accomplish that goal in the provided temp
directory. Your script should process records one by one. Row processing should handle errors, and issues with one row
should not impact the next. At the end the script should output a summary of how many rows were processed, how many rows failed
to process, a list of the most common reasons rows failed to process, and any other relevant information about the
processing that was done. Make sure that when you complete that the script file is written to the filesystem in the provided
temp directory.
Once the script is written to the filesystem, the list of "pip_dependencies" returned will be installed and then it will be executed, and
then a testing agent will look at the results, along with the script's output to determine if the goal was accomplished
successfully. If the goal was not accomplished, the results will be rolled back and you will be run again with information
about what went wrong so you can write a new script to try to accomplish the goal again. Any debug output you put in the script
may help you write a better script the next time if the goal is not accomplished.
This may be the first run, or there may be information on previous runs in the context. If there is information from previous
failed attempts, use that information to write a better script that overcomes the issues faced in previous attempts and write
that script to a new file in the provided temp directory. Any modifications made to the database during previous attempts
will be rolled back before you are run again, so your script should not assume any previous changes to the database exist.
""",
tools = [file_write, read_file, file_exists],
output_type = ScriptWritingOutput
)
The instructions tell the agent to write a Python script to accomplish the goal. I found that agents would write scripts that would succeed or fail on an all-or-nothing basis. This could be due to a bug in the script, or a data issue in as few as one row causing the entire script to fail. To mitigate this, the instructions specify that the script should process records one by one, and handle errors on a per-row basis. This way, if one row has an issue, it won’t cause the entire script to fail. The script should output a summary of how many rows were processed, how many rows failed to process, and the most common reasons for failure. This information is useful for verifying the goal was accomplished, and for writing a better script if the goal was not accomplished.
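To make the per-row requirement concrete, here is a small sketch of the shape of script these instructions ask for. It is not output from the agent: the function names and the two validation rules are assumptions chosen for illustration.

```python
import csv
from collections import Counter


def transform_row(row: dict) -> dict:
    """Hypothetical transform: raise ValueError with a stable reason on bad data."""
    try:
        number = int(row["presidency"])
    except (KeyError, TypeError, ValueError):
        raise ValueError("presidency is not an integer")
    name = (row.get("president") or "").strip()
    if not name:
        raise ValueError("president is empty")
    return {"presidency": number, "president": name}


def process_rows(in_path: str, out_path: str) -> dict:
    processed = 0
    failures = Counter()
    with open(in_path, newline="") as src, open(out_path, "w", newline="") as dst:
        writer = csv.DictWriter(dst, fieldnames=["presidency", "president"])
        writer.writeheader()
        for row in csv.DictReader(src):
            try:
                writer.writerow(transform_row(row))
                processed += 1
            except Exception as exc:
                # One bad row is counted and skipped; the rest still get processed.
                failures[str(exc)] += 1
    summary = {
        "processed": processed,
        "failed": sum(failures.values()),
        "top_reasons": failures.most_common(5),
    }
    # The printed summary is what a testing step would read from the captured output.
    print(summary)
    return summary
```

Keeping the failure reasons free of row-specific values makes the "most common reasons" list meaningful, since identical problems collapse into one entry.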
The structured output returned will include the path to the script file, any pip dependencies needed to run the script, an explanation of what the script does, the expected files created, and the expected database tables changed. Additionally, if there was an error in writing the script, that will be included in the output as well.
We have two different types of goals to verify. One type is a file processing goal in which an input file is processed, and an output file is created. The other type is a database modification goal in which data is read from a file and database tables are modified. We’ll define two different agents to verify each type of goal.
Both agents will return the following structured output:
class GoalTestingOutput(BaseModel):
    goal_accomplished: bool
    analysis_of_failures: Optional[str] = ""
    error: Optional[str] = ""
And our two goal testing agents will be defined as follows:
goal_testing_agent_with_file_outputs = Agent(
name = "Goal Testing Agent - File Outputs",
instructions = """
You are a goal testing assistant. A python script was written and executed to accomplish a specific goal in a data import job.
Your task is to look at the output logged by the script, files written by the script and determine if the goal was
accomplished successfully. If the goal was not accomplished, the 'analysis_of_failures' field should contain details that
would help the script writing agent write a new script to try to accomplish the goal again. If you failed to test whether
the goal was accomplished, the 'error' field should contain details about what went wrong during testing.
""",
tools = [ read_file, read_lines, file_exists, file_size ],
output_type=GoalTestingOutput
)
goal_testing_agent_with_table_changes = Agent(
name = "Goal Testing Agent - Dolt Outputs",
instructions = """
You are a goal testing assistant. A python script was written and executed to accomplish a specific goal in a data import job.
Your task is to look at the output logged by the script, files written by the script, tables modified by the script,
and determine if the goal was accomplished successfully. If the goal was not accomplished, the 'analysis_of_failures' field
should contain details that would help the script writing agent write a new script to try to accomplish the goal again. If
you failed to test whether the goal was accomplished, the 'error' field should contain details about what went wrong during testing.
""",
tools = [ read_file, read_lines, file_exists, file_size, get_table_diffs, get_table_diff_counts, get_database_diff_counts ],
output_type=GoalTestingOutput
)
Both agents have similar instructions. The only real difference is that the database modification goal testing agent has access to the Dolt specific tools to inspect database changes.
We’ll also define structures for handling errors during plan execution:
class PastAttempt(BaseModel):
    script_path: str = ""
    captured_output_path: str = ""
    return_code: int = 1
    expected_files_created: Optional[list[str]] = []
    expected_db_tables_changed: Optional[list[str]] = []
    pip_dependencies: Optional[list[str]] = []
    goal_test_output: GoalTestingOutput


class PlanExecutionFailure(BaseModel):
    plan: GoalBasedPlan
    failed_goal: Goal
    failed_goal_attempts: list[PastAttempt]
Our PastAttempt structure will hold information about a single failed attempt to accomplish a goal. The PlanExecutionFailure structure is used when the maximum number of attempts to accomplish a goal has been reached, and we need to create a new plan. It will hold the failed plan, the goal that we failed to accomplish, and the list of past attempts to accomplish that goal.
With all that defined, we can implement our goal-based plan execution as follows:
async def accomplish_goal(conn, sess, goal, dolt):
    failed_attempts = []
    initial_hash = dolt.current_commit_hash
    retries_str = os.environ.get("D0_GOAL_RETRIES")
    retries = to_int_or_default(retries_str, 5)
    didnt_write_script_count = 0
    while True:
        # Start every attempt from the same database state.
        dolt.reset_hard(initial_hash)
        if len(failed_attempts) - didnt_write_script_count > 0:
            print("\tfailed to achieve goal: " + failed_attempts[-1].goal_test_output.analysis_of_failures)
            print(f"\t\nretrying {len(failed_attempts) - didnt_write_script_count}/{retries}")
        if len(failed_attempts) - didnt_write_script_count > retries:
            print(list_of_basemodels_to_json(failed_attempts, indent=2))
            return False, failed_attempts
Our implementation starts by initializing structures to manage failed attempts and retries as well as a loop which will run until the goal is accomplished or the maximum number of retries is reached.
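The to_int_or_default helper is not shown in this post. A minimal version consistent with how it is called above (an environment variable that may be unset or non-numeric, plus a fallback) could look like this:

```python
from typing import Optional


def to_int_or_default(value: Optional[str], default: int) -> int:
    """Parse value as an int, falling back to default when it is missing or invalid."""
    try:
        return int(value)
    except (TypeError, ValueError):
        # TypeError covers None (variable not set); ValueError covers text like "five".
        return default
```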
        try:
            prompt = f"""
                The goal to accomplish is: {goal.something_to_accomplish}
                The way the goal will be tested is: {goal.way_to_test_accomplishment}
                The past attempts to accomplish this goal from oldest to newest are: {list_of_basemodels_to_json(failed_attempts)}
                If you connect to the database use the connection string: {dolt.connection_string}
            """
            print("\twriting script")
            script_writing_output = await script_writing_agent.run(conn, prompt, sess)
        except Exception as e:
            # script_writing_output is not assigned (or is stale from a previous
            # iteration) when the agent call raises, so no script path is recorded.
            failed_attempts.append(PastAttempt(
                goal_test_output=GoalTestingOutput(
                    goal_accomplished=False,
                    analysis_of_failures=f"There was an error writing the script: {str(e)}"
                )
            ))
            continue

        if script_writing_output.error is not None and len(script_writing_output.error) > 0:
            failed_attempts.append(PastAttempt(
                script_path=script_writing_output.script_file_path,
                goal_test_output=GoalTestingOutput(
                    goal_accomplished=False,
                    analysis_of_failures=f"Script writing agent reported an error: {script_writing_output.error}"
                )
            ))
            continue
Now we’ll run our script writing agent to get the script to accomplish the goal. If there is an error in writing the script, we’ll add that to the list of failed attempts and continue to the next iteration of the loop.
        path = os.path.abspath(script_writing_output.script_file_path)
        if not os.path.exists(path):
            # The agent claimed to write a script but nothing is on disk.
            # This is tracked separately so it does not use up a retry.
            didnt_write_script_count += 1
            failed_attempts.append(PastAttempt(
                script_path=script_writing_output.script_file_path,
                goal_test_output=GoalTestingOutput(
                    goal_accomplished=False,
                    analysis_of_failures=f"Script file '{path}' does not exist after script writing step."
                )
            ))
            continue

        if script_writing_output.pip_dependencies is not None and len(script_writing_output.pip_dependencies) > 0:
            for dep in script_writing_output.pip_dependencies:
                try:
                    # print(f"Installing pip dependency '{dep}' for goal '{goal.something_to_accomplish}'")
                    subprocess.run([sys.executable, "-m", "pip", "install", dep], check=True, capture_output=True)
                except Exception:
                    # A failed install is ignored here; if the dependency is really
                    # needed, the script run below will fail and be retried.
                    continue

        print("\trunning script")
        runner = ScriptRunner(path)
        print("\twriting output to: " + os.path.abspath(runner.captured_output_path))
        runner.run()
At the start of this block, we checked if the script file was missing from disk. This was such a frequent occurrence that I added a specific check for it to increment a separate counter which does not count against our retry attempts. If the script file exists, we’ll install any pip dependencies needed to run the script, and then run the script.
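ScriptRunner itself is not shown in this post. Based only on how it is used above (a `run()` method, a `return_code`, and a `captured_output_path`) and on the output file name that appears in the run log later in this post (the script name followed by `.output.txt`), a minimal sketch could look like the following. The internals are assumptions.

```python
import subprocess
import sys
from typing import Optional


class ScriptRunner:
    """Run a Python script and capture stdout and stderr together in one file."""

    def __init__(self, script_path: str):
        self.script_path = script_path
        # Assumed naming scheme: <script>.output.txt next to the script.
        self.captured_output_path = script_path + ".output.txt"
        self.return_code: Optional[int] = None

    def run(self) -> int:
        with open(self.captured_output_path, "w") as out:
            completed = subprocess.run(
                [sys.executable, self.script_path],
                stdout=out,
                stderr=subprocess.STDOUT,  # interleave errors with normal output
            )
        self.return_code = completed.returncode
        return self.return_code
```

Writing the output to a file rather than returning it as a string lets the testing agent read only the parts it needs with its file tools, which matters when a script logs a lot.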
        try:
            prompt = f"""
                The script written is at: {script_writing_output.script_file_path}
                The return code from executing the script is: {runner.return_code}
                The stdout and stderr streams were written to: {runner.captured_output_path}
            """
            if script_writing_output.expected_db_tables_changed is not None and len(script_writing_output.expected_db_tables_changed) > 0:
                prompt += f"\nThe database tables expected to be changed by the script are: {json.dumps(script_writing_output.expected_db_tables_changed)}\n"
                print("\ttesting goal with dolt table changes")
                goal_test_output = await goal_testing_agent_with_table_changes.run(conn, prompt, sess)
            else:
                if script_writing_output.expected_files_created is not None and len(script_writing_output.expected_files_created) > 0:
                    prompt += f"\nThe files expected to be created by the script are: {json.dumps(script_writing_output.expected_files_created)}\n"
                print("\ttesting goal with file outputs")
                goal_test_output = await goal_testing_agent_with_file_outputs.run(conn, prompt, sess)
        except Exception as e:
            failed_attempts.append(PastAttempt(
                script_path=script_writing_output.script_file_path,
                captured_output_path=runner.captured_output_path,
                return_code=runner.return_code,
                expected_files_created=script_writing_output.expected_files_created,
                expected_db_tables_changed=script_writing_output.expected_db_tables_changed,
                pip_dependencies=script_writing_output.pip_dependencies,
                goal_test_output=GoalTestingOutput(
                    goal_accomplished=False,
                    analysis_of_failures=f"There was an error testing the goal: {str(e)}"
                )
            ))
            continue

        if goal_test_output is not None:
            if goal_test_output.error is not None and len(goal_test_output.error) > 0:
                print(f"Error testing goal for step '{goal.something_to_accomplish}': {goal_test_output.error}")
                raise Exception(f"Error testing goal '{goal.something_to_accomplish}': {goal_test_output.error}")
            if goal_test_output.goal_accomplished:
                return True, None
            else:
                failed_attempts.append(PastAttempt(
                    script_path=script_writing_output.script_file_path,
                    captured_output_path=runner.captured_output_path,
                    return_code=runner.return_code,
                    expected_files_created=script_writing_output.expected_files_created,
                    expected_db_tables_changed=script_writing_output.expected_db_tables_changed,
                    pip_dependencies=script_writing_output.pip_dependencies,
                    goal_test_output=goal_test_output
                ))
        else:
            failed_attempts.append(PastAttempt(
                script_path=script_writing_output.script_file_path,
                captured_output_path=runner.captured_output_path,
                return_code=runner.return_code,
                expected_files_created=script_writing_output.expected_files_created,
                expected_db_tables_changed=script_writing_output.expected_db_tables_changed,
                pip_dependencies=script_writing_output.pip_dependencies,
                goal_test_output=GoalTestingOutput(
                    goal_accomplished=False,
                    analysis_of_failures="No goal testing output was produced."
                )
            ))
After running the script, we’ll run the appropriate goal testing agent based on whether the goal is expected to modify database tables or just produce output files. If the goal testing agent reports that the goal was accomplished, we return success. If not, we add the attempt to the list of failed attempts and continue to the next iteration of the loop.
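The outer loop that walks the plan is not shown in this post. Putting together the pieces described earlier (commit after each successful goal, reset to the starting commit when a goal cannot be accomplished so a new plan can be made), it might be sketched as follows. The function name `execute_plan` and its callable parameters are hypothetical stand-ins, and `accomplish_goal` is assumed to already be bound to its connection, session, and Dolt arguments.

```python
import asyncio


async def execute_plan(goals, accomplish_goal, commit, reset_hard, start_hash):
    """Run goals in order; checkpoint each success, roll back everything on failure.

    Returns (True, None) on success, or (False, (failed_goal, attempts)) so the
    caller can build a PlanExecutionFailure and ask the planner for a new plan.
    """
    for i, goal in enumerate(goals):
        print(f"Executing step {i + 1} / {len(goals)}")
        ok, attempts = await accomplish_goal(goal)
        if not ok:
            # Abandon the whole plan: return the branch to its pre-plan state.
            reset_hard(start_hash)
            return False, (goal, attempts)
        # Checkpoint so the next goal's retries reset to this point.
        commit(f"completed goal {i + 1}")
    return True, None
```

The caller would append each failure to the failed_plans list that is passed to run_plan_agent, which is how the planner learns why earlier plans did not work.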
Use a Worse Model to Build a Better Agent#
In building this agent, I used the default OpenAI model, GPT-4.1. Using GPT-4.1 instead of GPT-5 forced me to work harder to find approaches that would make the agent more reliable. Now that we have a more robust import agent, I will also upgrade it to use GPT-5 to make the agent even better.
Trying it out#
Now that we have implemented goal-based planning and execution with feedback loops, and upgraded to use GPT-5, let’s try it out with our presidents import job:
$>PYTHONPATH=src python3 -m d0_file_import 'There is MySQL compatible Dolt database server running on 127.0.0.1:3306 which you can connect to using the user "root" with no password. Import /Users/brian/datasets/csv/presidents.csv into the database "presidents" on the main branch'
initializing session database host:127.0.0.1 port:3306 user:root ...
session table already exists, skipping initialization
The session ID for this run is: 88e834ae-d01d-49e9-ba9b-2ad435e86247
Extracting information from prompt...
This is my understanding of the job:
This job is a data collection task to import the local file '/Users/brian/datasets/csv/presidents.csv' into the Dolt database named 'presidents' running on 127.0.0.1:3306 using the 'root' user without a password, targeting the 'main' branch. The destination table is unspecified, so it is assumed that the relevant table will be created or auto-inferred.
Is this correct? (y/n)> y
Retrieving schema from sink database...
Investigating data sources...
Some information is needed to set up your import job from presidents.csv to your Dolt database.
What should the destination table be called in the Dolt database? (e.g., 'presidents')
> presidents
Do you want to create a new table in the Dolt database based on the columns found in the CSV, or should the import fail if a table already exists with a different schema? [create a new table/fail if exists and mismatched]
> create a new table
Should any columns be excluded or renamed during the import, or should all columns from the CSV be imported as-is? [import as-is/exclude or rename]
> import as-is
Comparing data sources to the database schema...
No existing tables will have data imported into them.
The following new tables need to be created:
Table: presidents
Schema: CREATE TABLE presidents (
presidency INT PRIMARY KEY,
president VARCHAR(255),
wikipedia_entry VARCHAR(255),
took_office VARCHAR(32),
left_office VARCHAR(32),
party VARCHAR(64),
portrait VARCHAR(255),
thumbnail VARCHAR(255),
home_state VARCHAR(64)
);
The plan for where the source data will go once the tables are created is:
A new table called 'presidents' will be created in the Dolt database with columns directly inferred from the CSV file: presidency (INT, used as the PRIMARY KEY), president (VARCHAR), wikipedia_entry (VARCHAR), took_office (VARCHAR), left_office (VARCHAR), party (VARCHAR), portrait (VARCHAR), thumbnail (VARCHAR), and home_state (VARCHAR). All columns from the CSV will be imported as-is. After the table is created, the rows from presidents.csv will be loaded into the new 'presidents' table.
Would you like to:
1. Continue with this plan to create 1 and import data into 0 existing tables.
2. Make changes to the import destination plan, such as changing table names or schema, adding or removing tables to create, or designate the different table to import into.
3. Exit and create the tables manually, or change the prompt to specify existing tables.
(1/2/3)> 1
Planning data import...
Data import plan created:
steps
1. goal: Extract and validate the header row from presidents.csv to verify that it matches the expected columns: presidency, president, wikipedia_entry, took_office, left_office, party, portrait, thumbnail, home_state.
test: Inspect a new file (e.g., ./.d0/88e834ae-d01d-49e9-ba9b-2ad435e86247/validated_headers.csv) containing just the header row and confirm that all expected columns are present and named correctly.
2. goal: Filter the presidents.csv file and output a new CSV (e.g., ./.d0/88e834ae-d01d-49e9-ba9b-2ad435e86247/presidents_sanitized.csv) containing only rows that conform to the required schema (i.e., presidency is an integer, all other columns have values that can be inserted as strings, and the row has exactly the correct number of columns).
test: Inspect the sanitized CSV file for schema compliance, spot-checking: all rows have 9 columns, presidency values can be parsed as integers, and no extra or missing fields.
3. goal: Import the sanitized CSV file into the 'presidents' table in the Dolt database.
test: Use get_table_diff_counts or get_database_diff_counts to verify the number of rows added to the 'presidents' table matches the number of rows in presidents_sanitized.csv.
Would you like to:
1. Execute this plan to import the data as described.
2. Make changes to the import the plan.
3. Abort the data import job.
(1/2/3)> 1
Executing step 1 / 3
goal: Extract and validate the header row from presidents.csv to verify that it matches the expected columns: presidency, president, wikipedia_entry, took_office, left_office, party, portrait, thumbnail, home_state.
test: Inspect a new file (e.g., ./.d0/88e834ae-d01d-49e9-ba9b-2ad435e86247/validated_headers.csv) containing just the header row and confirm that all expected columns are present and named correctly.
writing script
running script
writing output to: /Users/brian/dev/d0/.d0/88e834ae-d01d-49e9-ba9b-2ad435e86247/extract_and_validate_headers.py.output.txt
testing goal with file outputs
Step 1 succeeded
Executing step 2 / 3
goal: Filter the presidents.csv file and output a new CSV (e.g., ./.d0/88e834ae-d01d-49e9-ba9b-2ad435e86247/presidents_sanitized.csv) containing only rows that conform to the required schema (i.e., presidency is an integer, all other columns have values that can be inserted as strings, and the row has exactly the correct number of columns).
test: Inspect the sanitized CSV file for schema compliance, spot-checking: all rows have 9 columns, presidency values can be parsed as integers, and no extra or missing fields.
writing script
running script
writing output to: /Users/brian/dev/d0/.d0/88e834ae-d01d-49e9-ba9b-2ad435e86247/sanitize_presidents_csv.py.output.txt
testing goal with file outputs
Step 2 succeeded
Executing step 3 / 3
goal: Import the sanitized CSV file into the 'presidents' table in the Dolt database.
test: Use get_table_diff_counts or get_database_diff_counts to verify the number of rows added to the 'presidents' table matches the number of rows in presidents_sanitized.csv.
writing script
running script
writing output to: /Users/brian/dev/d0/.d0/88e834ae-d01d-49e9-ba9b-2ad435e86247/import_presidents_to_mysql.py.output.txt
testing goal with dolt table changes
Getting diff counts for table 'presidents'...
Table 'presidents': 43 new rows, 0 deleted rows, 0 modified rows.
Step 3 succeeded
Data import plan executed successfully. Merge d0-88e834ae-d01d-49e9-ba9b-2ad435e86247 into your desired branch to apply the changes.
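The log above does not show the scripts the agent wrote, so as an illustration only, here is a minimal sketch of what a step 2 style sanitizing script could look like. The names `EXPECTED_COLUMNS`, `sanitize_rows`, and `sanitize_csv` are hypothetical and are not taken from the agent's actual output. The sketch assumes the first column is `presidency` and that a row is valid when it has exactly nine fields and an integer in that first column.

```python
import csv

# Column names taken from the plan above; the ordering is an assumption.
EXPECTED_COLUMNS = [
    "presidency", "president", "wikipedia_entry", "took_office",
    "left_office", "party", "portrait", "thumbnail", "home_state",
]


def sanitize_rows(reader):
    """Yield the header, then only the rows that fit the expected schema."""
    header = next(reader)
    if header != EXPECTED_COLUMNS:
        raise ValueError(f"unexpected header: {header}")
    yield header
    for row in reader:
        # Drop rows with extra or missing fields.
        if len(row) != len(EXPECTED_COLUMNS):
            continue
        # Drop rows whose primary key cannot be parsed as an integer.
        try:
            int(row[0])
        except ValueError:
            continue
        yield row


def sanitize_csv(src_path, dst_path):
    """Write the sanitized rows to dst_path and return the data row count."""
    written = 0
    with open(src_path, newline="") as src, open(dst_path, "w", newline="") as dst:
        writer = csv.writer(dst)
        for row in sanitize_rows(csv.reader(src)):
            writer.writerow(row)
            written += 1
    return written - 1  # exclude the header row
```

Because the script only reads the source file and writes a new one, it can be rerun after a failure without side effects, which is what makes a step like this safe to retry.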
Conclusion#
In this post, we explored ways of improving the reliability of our agent-driven data import jobs. We implemented goal-based planning and idempotent goal execution, with the ability to roll back and retry failed goals. This approach lets us handle failures more gracefully and provides a clearer path to success for complex data import tasks. With these improvements, we can tackle more complex data import jobs with greater confidence in their success.
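To make the execute, test, roll back, and retry cycle concrete, here is a minimal sketch of that loop. It is not the agent's actual implementation: `Goal`, `run_goal`, `run_plan`, and the `max_attempts` parameter are hypothetical names chosen for illustration, and the three callables stand in for whatever the agent really does (for example, running a generated script, checking diff counts, and resetting a working branch).

```python
from dataclasses import dataclass
from typing import Callable, Iterable


@dataclass
class Goal:
    description: str
    execute: Callable[[], None]   # does the work for this goal
    test: Callable[[], bool]      # returns True if the goal was met
    rollback: Callable[[], None]  # restores the state from before the attempt


def run_goal(goal: Goal, max_attempts: int = 3) -> int:
    """Run one goal, rolling back and retrying on failure.

    Returns the attempt number that succeeded, or raises RuntimeError.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            goal.execute()
            if goal.test():
                return attempt
        except Exception:
            pass  # a crash counts as a failed attempt
        # Undo any partial work so the next attempt starts clean.
        goal.rollback()
    raise RuntimeError(f"goal failed after {max_attempts} attempts: {goal.description}")


def run_plan(goals: Iterable[Goal], max_attempts: int = 3) -> None:
    """Run goals in order, stopping at the first one that cannot be met."""
    for goal in goals:
        run_goal(goal, max_attempts)
```

The rollback before each retry is what keeps a goal idempotent in this sketch: every attempt begins from the same starting state, so a half-finished attempt cannot contaminate the next one.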
In making our agent more reliable, it has become clear that an agent needs to be able to see what it has done to be successful. With files, this is straightforward. But with databases, it’s much more challenging… unless you use Dolt. Dolt is the only SQL database that version controls both the schema and the data. Features like branches, diffs, commits, rollbacks, and merges make Dolt the only choice for relational databases when working with agents. If you’re interested in trying out our agent, or just want to talk about agents and Dolt, join us on our Discord.
