python-in-the-morning
python-in-the-morning copied to clipboard
Optimise code and multi-thread/pooling for increased speed
I have a list (csv file) with a bunch of combinations of issue_date,stock_ticker
. The list is 40.000 rows long. The current function spits 1 return per 1 second or so. This would take me more than 10hrs + before it to finish.
import quandl
import datetime
import numpy as np
quandl.ApiConfig.api_key = 'api-key'
def get_data(issue_date, stock_ticker):
# Prepare var
stock_ticker = "EOD/" + stock_ticker
# Volatility
date_1 = datetime.datetime.strptime(issue_date, "%d/%m/%Y")
pricing_date = date_1 + datetime.timedelta(days=-40) # -40 days of issue date
volatility_date = date_1 + datetime.timedelta(days=-240) # -240 days of issue date (-40,-240 range)
# Check if code exists : if not -> return empty array
try:
stock = quandl.get(stock_ticker, start_date=volatility_date, end_date=pricing_date) # get pricing data
except quandl.errors.quandl_error.NotFoundError:
return []
daily_close = stock['Adj_Close'].pct_change() # returns using adj.close
stock_vola = np.std(daily_close) * np.sqrt(252) # annualized volatility
# Average price
stock_pricing_date = date_1 + datetime.timedelta(days=-2) # -2 days of issue date
stock_pricing_date2 = date_1 + datetime.timedelta(days=-12) # -12 days of issue date
stock_price = quandl.get(stock_ticker, start_date=stock_pricing_date2, end_date=stock_pricing_date)
stock_price_average = np.mean(stock_price['Adj_Close']) # get average price
# Amihuds Liquidity measure
liquidity_pricing_date = date_1 + datetime.timedelta(days=-20)
liquidity_pricing_date2 = date_1 + datetime.timedelta(days=-120)
stock_data = quandl.get(stock_ticker, start_date=liquidity_pricing_date2, end_date=liquidity_pricing_date)
p = np.array(stock_data['Adj_Close'])
returns = np.array(stock_data['Adj_Close'].pct_change())
dollar_volume = np.array(stock_data['Adj_Volume'] * p)
illiq = (np.divide(returns, dollar_volume))
print(np.nanmean(illiq))
illiquidity_measure = np.nanmean(illiq, dtype=float) * (10 ** 6) # multiply by 10^6 for expositional purposes
return [stock_vola, stock_price_average, illiquidity_measure]
import function
import csv
import tkinter as tk
from tkinter import filedialog
# Open File Dialog
root = tk.Tk()
root.withdraw()
file_path = filedialog.askopenfilename()
# Load Spreadsheet data
f = open(file_path)
csv_f = csv.reader(f)
next(csv_f)
result_data = []
# Iterate
for row in csv_f:
try:
return_data = function.get_data(row[1], row[0])
if len(return_data) != 0:
# print(return_data)
result_data_loc = [row[1], row[0]]
result_data_loc.extend(return_data)
result_data.append(result_data_loc)
except AttributeError:
print(row[0])
print('\n\n')
print(row[1])
continue
if result_data is not None:
with open('resuls.csv', mode='w', newline='') as result_file:
csv_writer = csv.writer(result_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
for result in result_data:
# print(result)
csv_writer.writerow(result)
else:
print("No results found!")
Hello! Thanks for stopping by the stream. I got to say your code is prime for refactoring for speed! it's exciting to me how tangible it seems. However, I think a test suite/way of mocking the API is required before starting to optimize.
I could try and help with that on stream, or you could do it, and then we optimize on stream. If our schedules don't line up NBD, I have faith you can figure it out, or join my discord, and myself or others even smarter than me can help. Or I can recommend other streamers too!