How To Get Survey Response Data From Qualtrics With Python
- By: Mydatahack
- Category: Data Engineering, Data Ingestion
- Tags: Python API Data Ingestion
In the previous post, we had a look at Python code examples of basic data engineering with AWS infrastructure. Using the Qualtrics API, I would like to present a coding example of API data ingestion into S3 and Redshift. This code can be scheduled hourly, daily or weekly on a server or in AWS Data Pipeline.
Qualtrics is an online survey tool that lets you send surveys via email or SMS, receive responses and generate reports. The aim of this ingestion is to get the survey response data into Redshift.
API Reference
The Qualtrics API is a simple REST-based API. Once you generate an API token, you are pretty much ready to go. They have comprehensive API documentation. In a nutshell, you can use the requests module to make a POST request with the token in the header, then download the data as a csv file. Further API references are here.
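For illustration, a minimal request to kick off a response export (the same endpoint the full code below uses) might look like the following sketch. The data centre hostname, survey id and token are placeholders, not real values:
```python
import requests

# Placeholder values - substitute your own data centre, survey id and API token
base_url = "https://yourdatacenterid.qualtrics.com/API/v3"
survey_id = "SV_xxxxxxxxxxxxxxx"
headers = {"content-type": "application/json", "x-api-token": "YOUR_API_TOKEN"}

# Kick off a csv response export for one survey
payload = '{"format":"csv","surveyId":"' + survey_id + '"}'
response = requests.post(base_url + "/responseexports/", data=payload, headers=headers)

# The export id is used later to poll for and download the zipped csv file
export_id = response.json()["result"]["id"]
print(export_id)
```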
I found out the hard way that the Qualtrics API can be unreliable. My code initially failed at random because I did not have a loop that keeps retrying the request until it succeeds. In the code example, I set the maximum number of attempts to 200 (see the bulk_exports method). It usually succeeds within 20 attempts.
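A stripped-down sketch of that retry loop, assuming the placeholder base_url, headers and export id from the snippet above, looks like this:
```python
import time
import requests

# Placeholders - reuse the same base_url / headers / export id as in the snippet above
base_url = "https://yourdatacenterid.qualtrics.com/API/v3"
headers = {"content-type": "application/json", "x-api-token": "YOUR_API_TOKEN"}
export_id = "ES_xxxxxxxxxxxxxxx"

download_url = base_url + "/responseexports/" + export_id + "/file"

for attempt in range(200):  # same cap as the bulk_exports method below
    download = requests.get(download_url, headers=headers, stream=True)
    if download.status_code == 200:
        # Export is ready - stream the zipped csv to disk and stop retrying
        with open("RequestFile.zip", "wb") as f:
            for chunk in download.iter_content(chunk_size=1024):
                f.write(chunk)
        break
    # Export not ready yet (or the API played up) - wait and try again
    time.sleep(10)
```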
Key Points
In this example, we are using truncate & load because the data comes in a single csv file containing all the responses; we cannot obtain the data incrementally. This is fine, though, because we can leverage the power of the Redshift copy command from S3, which is extremely fast. Truncate & load should be fine unless you have a massive volume or other business requirements. If you want to do an incremental load, you can pick records from the exported file according to the last-updated time and insert only those, which can be done relatively easily.
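If you do go down the incremental route, a rough sketch with pandas might look like this. The timestamp column name and the way you persist the last run time are assumptions for illustration, not part of the code below:
```python
import pandas as pd

# Hypothetical example: keep only responses updated since the last run.
# 'EndDate' is assumed to be the timestamp column in the exported csv;
# check your own export for the actual column name.
last_run_time = pd.Timestamp("2018-01-01 00:00:00")

df = pd.read_csv("./Exported/my_survey.csv", skiprows=[0, 1], low_memory=False)
df["EndDate"] = pd.to_datetime(df["EndDate"], errors="coerce")
incremental = df[df["EndDate"] > last_run_time]

# These rows could then be appended or upserted instead of truncate & load
incremental.to_csv("./incremental_my_survey.csv", sep="|", index=False)
```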
The program exports the response data in csv format into a local directory, pushes it to a specified S3 bucket, and executes the copy command after truncating the target table. It can ingest responses from multiple surveys. The survey project names are passed as a single argument, concatenated with ','.
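As a purely hypothetical example, an invocation with all 13 positional arguments might look like this (the script name and every value below are placeholders):
```
python qualtrics_ingest.py "Customer Survey,Staff Survey" 2018-03-01 my-s3-bucket \
    mydb my-cluster.xxxxxxxx.us-east-1.redshift.amazonaws.com 5439 my_user my_password \
    AKIAXXXXXXXXXXXXXXXX xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx qualtrics \
    https://yourdatacenterid.qualtrics.com/API/v3 YOUR_API_TOKEN
```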
The get_project_id method returns the survey ids for the given survey project names, which are in turn used to get the survey-specific response data.
The format_colnames method takes care of formatting the column names, as the exported file has multiple header rows for the column names. I am using the pandas package for the data manipulation.
Note that you also need an AWS Access Key Id and Secret Access Key for the copy command.
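To make that renaming concrete, here is a toy example of what the transformation does. The raw header value is a made-up illustration in the same shape the code expects, not an actual Qualtrics export:
```python
# Hypothetical raw column header in the shape the renaming logic expects
raw_name = "{'ImportId': 'QID1_TEXT'}"

# Same chain of replacements as format_colnames in the full code below
new_name = raw_name.replace('{', '').replace('}', '').split(':')[1] \
    .replace('\'', '').replace('-', '_').replace(' ', '')
print(new_name)  # QID1_TEXT
```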
OK, here comes the code.
Enjoy!
Code
```python
import requests
import zipfile
import json
import urllib2
import sys
import os
import pandas as pd
import shutil
import psycopg2
import boto3
import time

'''
The character formatting below is for Python 2.7.
Get rid of it for Python 3 and use encoding='utf-8' when you open files.
The rest should work for both versions.
'''
reload(sys)
sys.setdefaultencoding('utf8')

# List of arguments taken from the command line
project_names = sys.argv[1]
lst = []
for i in project_names.split(','):
    lst.append(i)
target_date = sys.argv[2]
bucket_name = sys.argv[3]
dbname = sys.argv[4]
host = sys.argv[5]
port = sys.argv[6]
user = sys.argv[7]
password = sys.argv[8]
aws_access_key_id = sys.argv[9]
aws_secret_access_key = sys.argv[10]
target_schema = sys.argv[11]
base_url = sys.argv[12]
secret_token = sys.argv[13]


# (1) Getting a list of IDs
def get_project_id(names):
    '''The function takes a list of project names and returns a dictionary of IDs'''
    report_dic = {}
    url = base_url + '/surveys/'
    header = {'X-API-TOKEN': secret_token}
    # (1) Generating the request object
    req = urllib2.Request(url, None, header)
    # (2) Make request
    response = urllib2.urlopen(req)
    data = json.load(response)
    # (3) Find the ID for each project
    for name in names:
        # This is necessary because project names sometimes contain 2 spaces instead of 1
        target_name = name.replace(" ", "").lower()
        # It is better to create a table name list separately
        table_key = name.replace(" ", "_").replace("-", "_").lower()
        # print target_name
        for i in data['result']['elements']:
            if i['name'].replace(" ", "").lower() == target_name:
                report_dic[table_key] = i['id']
    return report_dic


# (2) Get Metadata
def get_survey_metadata(report_dic, target_dir):
    '''Takes survey IDs and creates json files in a specified directory'''
    for k, v in report_dic.items():
        url = base_url + '/surveys/' + v
        header = {'X-API-TOKEN': secret_token}
        req = urllib2.Request(url, None, header)
        response = urllib2.urlopen(req)
        data = json.load(response)
        pretty = json.dumps(data, sort_keys=False, indent=4)
        file = open('./' + target_dir + '/' + k + '_meta.json', 'w')
        file.write(pretty)
        print('Metadata File for %s Generated!' % (k))


# (3) Exporting reports
fileFormat = "csv"
baseUrl = base_url + "/responseexports/"
headers = {"content-type": "application/json", "x-api-token": secret_token}


def bulk_exports(report_dic):
    '''This function takes a dictionary of ids and creates data exports'''
    if os.path.exists('./Exported'):
        shutil.rmtree('./Exported')
    for key, val in report_dic.items():
        # Step 1: Creating Data Export
        print(key, val)
        downloadRequestUrl = baseUrl
        downloadRequestPayload = '{"format":"' + fileFormat + '","surveyId":"' + val + '"}'
        downloadRequestResponse = requests.request("POST", downloadRequestUrl, \
            data=downloadRequestPayload, headers=headers)
        progressId = downloadRequestResponse.json()["result"]["id"]
        # Step 3: Downloading file
        requestDownloadUrl = baseUrl + progressId + '/file'
        requestDownload = requests.request("GET", requestDownloadUrl, headers=headers, stream=True)
        for i in range(0, 200):
            print(str(requestDownload))
            if str(requestDownload) == '<Response [200]>':
                # Step 4: Unzipping file
                with open("RequestFile.zip", "wb") as f:
                    for chunk in requestDownload.iter_content(chunk_size=1024):
                        f.write(chunk)
                    f.close()
                zipfile.ZipFile("RequestFile.zip").extractall('Exported')
                print('Completed Export for {}'.format(key))
                os.remove("./RequestFile.zip")
                break
            else:
                time.sleep(10)
                requestDownload = requests.request("GET", requestDownloadUrl, headers=headers, stream=True)

    for filename in os.listdir("Exported"):
        print(filename)
        os.rename('./Exported/' + filename,
                  './Exported/' + filename.replace(" ", "_").replace("-", "_").lower())
        # os.rename('./'+filename, './'+filename.replace(" ", "_").replace("-", "_").lower())


# (4) Create the folder before moving to S3
def create_dir(target_date):
    direc = "./" + target_date
    if not os.path.exists(direc):
        os.makedirs(direc)
        print('New directory %s has been created' % (target_date))
    else:
        shutil.rmtree(direc)
        os.makedirs(direc)
        print('New directory %s has been created' % (target_date))


# (5) Reformat csv file and put into the right local folder created in (4)
def format_colnames(output_dir):
    '''This function takes the file, renames its columns with the right format,
    and generates a csv file with the right column names'''
    for filename in os.listdir("./Exported"):
        # (1) Read csv file
        df = pd.read_csv("./Exported/" + filename, skiprows=[0, 1], low_memory=False)
        columns = df.columns
        new_cols = []
        # (2) Reformat the column names
        for name in columns:
            new_name = name.replace('{', '').replace('}', '').split(':')[1].replace('\'', '').\
                replace('-', '_').replace(' ', '')
            new_cols.append(new_name)
        # print new_cols
        df.columns = new_cols
        # (3) Create CSV file into the output directory
        df.to_csv('./' + output_dir + '/' + filename, doublequote=True, sep='|', index=False)
        print('Reformatted and moved %s' % (filename))


# (6) Uploading to S3
def upload_files(local_path, s3_path, bucket_name):
    '''Search all the files from the specified directory and push them to S3'''
    s3 = boto3.resource('s3')
    for (root, dirs, files) in os.walk(local_path):
        for filename in files:
            print("File: {}".format(filename))
            s3_filename = s3_path + filename
            print('Uploading to %s...' % (s3_filename))
            s3.meta.client.upload_file(local_path + filename, bucket_name, s3_filename)
            print('Done')


# (7) Truncate & Load to Redshift
def truncate_load_tables(report_dict):
    con = psycopg2.connect(dbname=dbname, host=host, port=port, user=user, password=password)
    print("Connection to Redshift Successful!")
    cur = con.cursor()
    for k, v in report_dict.items():
        target_table = target_schema + '.' + k
        file_name = 's3://' + bucket_name + '/Qualtrics/data_export/' + k + '.csv'
        sql = """
        Truncate %s; Commit;
        copy %s from '%s'
        dateformat 'auto'
        credentials 'aws_access_key_id=%s;aws_secret_access_key=%s'
        CSV QUOTE '"' DELIMITER '|'
        ACCEPTINVCHARS EMPTYASNULL COMPUPDATE OFF IGNOREHEADER 1; Commit;
        """ % (target_table, target_table, file_name, aws_access_key_id, aws_secret_access_key)
        print(sql)
        cur.execute(sql)
        print("Copy Command executed successfully for %s" % (target_table))
    con.close()


# Execution #
# (1) Get the dictionary with report name and report id to export
reports = get_project_id(lst)
# (2) Create a directory for S3 transfer
create_dir(target_date)
# (3) Do Bulk Export
bulk_exports(reports)
# (4) Get metadata and prep for S3 transfer
get_survey_metadata(reports, target_date)
# (5) Transfer
format_colnames(target_date)
# (6) Move to S3
upload_files('./' + target_date + '/', 'Qualtrics/data_export/', bucket_name)
# (7) Load Table
truncate_load_tables(reports)
```