Python’s boto3 library provides an easy way to submit SQL queries to your databases on AWS Athena. In some cases, though, instead of just submitting the query and letting the results be written to S3, you want to work with the output right away. This post shows how to download the result once the query has finished and how to read it into a pandas DataFrame.
Functions
First, we need a function that submits an SQL query to Athena. We can use boto3's Athena client for this and wrap it to simplify usage. Note that the function only returns an ID for the query execution, not the results themselves:
def submit_query(query: str, database: str, s3_location_output: str, workgroup: str) -> str:
    """Submits a query to Athena and returns its execution id."""
    athena = boto3.client('athena')
    response = athena.start_query_execution(QueryString=query,
                                            QueryExecutionContext={'Database': database},
                                            WorkGroup=workgroup,
                                            ResultConfiguration={'OutputLocation': s3_location_output})
    query_execution_id = response["QueryExecutionId"]
    return query_execution_id
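To illustrate, a call might look like the following sketch. The database, workgroup, and bucket names are hypothetical placeholders ('primary' is Athena's default workgroup), not values from a real account:

execution_id = submit_query(query='SELECT * FROM my_table LIMIT 10',
                            database='my_database',
                            s3_location_output='s3://my-athena-results-bucket/queries/',
                            workgroup='primary')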
Second, we need a function that polls the status of the query execution until it has finished and returns information on where the results have been stored in S3. We pass a timeout threshold, and by default the status is checked every 5 seconds:
def check_and_wait_for_query(query_execution_id: str, timeout: int, check_freq_sec: int = 5) -> dict:
    """Checks the status of an Athena query, waits until it has finished and returns a result dict."""
    athena = boto3.client('athena')
    response = athena.get_query_execution(QueryExecutionId=query_execution_id)
    query_state = response['QueryExecution']['Status']['State']
    waited = 0
    while query_state in ['QUEUED', 'RUNNING'] and waited <= timeout:
        time.sleep(check_freq_sec)
        waited = waited + check_freq_sec
        response = athena.get_query_execution(QueryExecutionId=query_execution_id)
        query_state = response['QueryExecution']['Status']['State']
    if query_state in ['QUEUED', 'RUNNING']:
        result = {'state': 'TIMEOUT', 's3location': None, 'query_execution_id': query_execution_id}
    elif query_state in ['FAILED', 'CANCELLED']:
        result = {'state': query_state, 's3location': None, 'query_execution_id': query_execution_id}
    elif query_state == 'SUCCEEDED':
        result = {'state': query_state, 's3location': response['QueryExecution']['ResultConfiguration']['OutputLocation'], 'query_execution_id': query_execution_id}
    else:
        result = {'state': 'ERROR', 's3location': None, 'query_execution_id': query_execution_id}
    return result
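Continuing the hypothetical call from above, a minimal polling sketch could look like this:

result = check_and_wait_for_query(query_execution_id=execution_id, timeout=120)
if result['state'] == 'SUCCEEDED':
    print('Results stored at:', result['s3location'])
else:
    print('Query ended in state:', result['state'])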
Finally, we write two high-level functions that submit the query, wait for the result, and then download or read it. As these two functions return very different objects (a file path and a DataFrame), I have created them as two separate functions rather than passing download/read as a parameter to a single function. Both take the same Athena parameters as submit_query and forward them:
def submit_query_and_download(query: str, filepath: str, database: str, s3_location_output: str, workgroup: str, timeout: int = 60) -> str:
    """Submits a query to Athena, waits for it to finish and downloads the result to filepath."""
    s3 = boto3.client('s3')
    query_execution_id = submit_query(query, database=database,
                                      s3_location_output=s3_location_output,
                                      workgroup=workgroup)
    response = check_and_wait_for_query(query_execution_id=query_execution_id, timeout=timeout)
    if response['state'] == 'SUCCEEDED':
        # split the s3://bucket/key output location into bucket and key
        o = urlparse(response['s3location'], allow_fragments=False)
        s3.download_file(Bucket=o.netloc, Key=o.path[1:], Filename=filepath)
        result = filepath
    else:
        result = None
    return result
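A hedged usage sketch; the table, file path, and AWS resource names are again placeholders:

csv_path = submit_query_and_download(query='SELECT * FROM my_table LIMIT 10',
                                     filepath='/tmp/results.csv',
                                     database='my_database',
                                     s3_location_output='s3://my-athena-results-bucket/queries/',
                                     workgroup='primary')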
def submit_query_and_get_dataframe(query: str, database: str, s3_location_output: str, workgroup: str, timeout: int = 60) -> DataFrame:
    """Submits a query to Athena, waits for it to finish and reads the result into a DataFrame."""
    query_execution_id = submit_query(query, database=database,
                                      s3_location_output=s3_location_output,
                                      workgroup=workgroup)
    response = check_and_wait_for_query(query_execution_id=query_execution_id, timeout=timeout)
    if response['state'] == 'SUCCEEDED':
        # pandas reads the s3:// location directly; this requires the s3fs package
        df = pd.read_csv(response['s3location'])
    else:
        df = None
    return df
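And a sketch for the DataFrame variant, again with placeholder names; note that reading the s3:// path directly requires s3fs to be installed:

df = submit_query_and_get_dataframe(query='SELECT * FROM my_table LIMIT 10',
                                    database='my_database',
                                    s3_location_output='s3://my-athena-results-bucket/queries/',
                                    workgroup='primary')
if df is not None:
    print(df.head())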
Package Imports
import boto3
import time
import pandas as pd
from pandas import DataFrame
from urllib.parse import urlparse