Athena Queries via Python

2020-09-05 / AWS Athena / Python / pandas / 3 minutes

Python’s boto3 library provides an easy way to submit SQL queries to your databases on AWS Athena. In some cases, though, instead of just submitting the query and letting the results be written to S3, you want to work with the output immediately. This post shows how to download the result once the query has been processed and how to import it as a pandas DataFrame.

Functions

First, we need a function that submits an SQL query to Athena. We can use boto3's Athena client for it and wrap it to simplify usage. Note that the function only returns an id for the query execution, not the results of the query execution:

def submit_query(query:str, database:str, s3_location_output:str, workgroup:str) -> str:
    """submits a query to Athena and returns its execution id"""
    athena = boto3.client('athena')
    response = athena.start_query_execution(QueryString=query,
                                            QueryExecutionContext={'Database': database},
                                            WorkGroup=workgroup,
                                            ResultConfiguration={'OutputLocation': s3_location_output})
    query_execution_id = response["QueryExecutionId"]
    return query_execution_id

Second, we need a function that checks the status of the query execution until it is finished and returns information on where the results have been stored in S3. We provide a timeout threshold, and by default the status is checked every 5 seconds:

def check_and_wait_for_query(query_execution_id:str, timeout:int, check_freq_sec=5) -> dict:
    """checks status of Athena, waits until finished and returns result dict"""
    athena   = boto3.client('athena')
    response = athena.get_query_execution(QueryExecutionId=query_execution_id)
    query_state = response['QueryExecution']['Status']['State']
    waited = 0
    while query_state in ['QUEUED', 'RUNNING'] and waited <= timeout:
        time.sleep(check_freq_sec)
        waited = waited + check_freq_sec
        response = athena.get_query_execution(QueryExecutionId=query_execution_id)
        query_state = response['QueryExecution']['Status']['State']
    if query_state in ['QUEUED', 'RUNNING']:
        result = {'state': 'TIMEOUT', 's3location': None, 'query_execution_id': query_execution_id}
    elif query_state in ['FAILED', 'CANCELLED']:
        result = {'state': query_state, 's3location': None, 'query_execution_id': query_execution_id}
    elif query_state == 'SUCCEEDED':
        result = {'state': query_state, 's3location': response['QueryExecution']['ResultConfiguration']['OutputLocation'], 'query_execution_id': query_execution_id}
    else:
        result = {'state':'ERROR', 's3location': None, 'query_execution_id': query_execution_id}
    return result
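The polling loop above can be exercised without touching AWS by stubbing out the status lookup. A minimal sketch (the `poll` helper and the simulated state sequence are illustrative, not part of the functions above):

```python
import time

def poll(get_state, timeout: int, check_freq_sec: int = 1) -> str:
    """Poll get_state() until it leaves QUEUED/RUNNING or the timeout elapses."""
    state = get_state()
    waited = 0
    while state in ['QUEUED', 'RUNNING'] and waited <= timeout:
        time.sleep(check_freq_sec)
        waited += check_freq_sec
        state = get_state()
    # still in flight after the deadline -> report a timeout
    return 'TIMEOUT' if state in ['QUEUED', 'RUNNING'] else state

# simulate a query that succeeds on the third status check
states = iter(['RUNNING', 'RUNNING', 'SUCCEEDED'])
print(poll(lambda: next(states), timeout=10))  # SUCCEEDED
```

This mirrors the structure of `check_and_wait_for_query`: the state is re-fetched after each sleep, and a query still queued or running once the deadline passes is reported as a timeout rather than an error.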

Finally, we write two high-level functions that submit the query, wait for the result and download/read it. As these two functions return two very different objects (a file path and a DataFrame), I created them as two separate functions rather than passing download/read as a parameter to a single function.

def submit_query_and_download(query:str, filepath:str, database:str, s3_location_output:str, workgroup:str, timeout:int=60) -> str:
    """submits a query to Athena, waits for and downloads the result"""
    s3 = boto3.client('s3')
    query_execution_id = submit_query(query, database=database, s3_location_output=s3_location_output, workgroup=workgroup)
    response = check_and_wait_for_query(query_execution_id=query_execution_id, timeout=timeout)
    if response['state'] == 'SUCCEEDED':
        # split the s3://bucket/key result URL into bucket and key for download
        o = urlparse(response['s3location'], allow_fragments=False)
        s3.download_file(Bucket=o.netloc, Key=o.path[1:], Filename=filepath)
        result = filepath
    else:
        result = None
    return result
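The download step relies on `urlparse` to split the `s3://` output location into the bucket and key that boto3's S3 client expects. A quick standalone illustration (the location string is a made-up example):

```python
from urllib.parse import urlparse

# Athena reports the result location as an s3:// URL; boto3's S3 client
# needs the bucket name and object key separately.
location = 's3://my-bucket/athena-results/abc123.csv'  # hypothetical output location
o = urlparse(location, allow_fragments=False)
bucket = o.netloc   # bucket name
key = o.path[1:]    # object key, with the leading slash stripped
print(bucket, key)
```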

def submit_query_and_get_dataframe(query:str, database:str, s3_location_output:str, workgroup:str, timeout:int=60) -> DataFrame:
    """submits a query to Athena, waits for the result and reads it into a DataFrame"""
    query_execution_id = submit_query(query, database=database, s3_location_output=s3_location_output, workgroup=workgroup)
    response = check_and_wait_for_query(query_execution_id=query_execution_id, timeout=timeout)
    if response['state'] == 'SUCCEEDED':
        # reading directly from the s3:// URL requires the s3fs package
        df = pd.read_csv(response['s3location'])
    else:
        df = None
    return df
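Athena writes query results as CSV with a header row, which pandas parses directly. A small stand-in example with made-up data, using an in-memory buffer instead of the S3 location:

```python
import io
import pandas as pd

# hypothetical Athena-style result: quoted CSV with a header row
csv_text = '"id","name"\n"1","alice"\n"2","bob"\n'
df = pd.read_csv(io.StringIO(csv_text))
print(df.shape)  # (2, 2)
```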

Package Imports

import boto3
import time
import pandas as pd 
from pandas import DataFrame
from urllib.parse import urlparse
