cla
Updated code and Notebook to work on Python 3
Hi, I have updated the code and created a Jupyter notebook so that both run on Python 3 (the original code was written for Python 2). Feel free to use them to update your repo.
#!/usr/bin/env python
# On 20121129
# Critical Line Algorithm
# by MLdP <[email protected]>
import numpy as np
# ---------------------------------------------------------------
# ---------------------------------------------------------------
class CLA:
def __init__(self, mean, covar, lB, uB):
# Initialize the class
self.mean = mean
self.covar = covar
self.lB = lB
self.uB = uB
self.w = [] # solution
self.l = [] # lambdas
self.g = [] # gammas
self.f = [] # free weights
# ---------------------------------------------------------------
def solve(self):
# Compute the turning points,free sets and weights
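        # Iterate over turning points: at each step either a free weight
        # hits one of its bounds (case a) or a bounded weight becomes
        # free (case b); the candidate with the larger lambda wins,
        # until no positive lambda remains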
f, w = self.initAlgo()
self.w.append(np.copy(w)) # store solution
self.l.append(-np.inf)
self.g.append(None)
self.f.append(f[:])
while True:
# 1) case a): Bound one free weight
l_in = -np.inf
if len(f) > 1:
covarF, covarFB, meanF, wB = self.getMatrices(f)
covarF_inv = np.linalg.inv(covarF)
                for j, i in enumerate(f):
                    l, bi = self.computeLambda(
                        covarF_inv, covarFB, meanF, wB, j, [self.lB[i], self.uB[i]]
                    )
                    if l > l_in:
                        l_in, i_in, bi_in = l, i, bi
# 2) case b): Free one bounded weight
l_out = -np.inf
if len(f) < self.mean.shape[0]:
b = self.getB(f)
for i in b:
covarF, covarFB, meanF, wB = self.getMatrices(f + [i])
covarF_inv = np.linalg.inv(covarF)
l, bi = self.computeLambda(
covarF_inv,
covarFB,
meanF,
wB,
meanF.shape[0] - 1,
self.w[-1][i],
)
if (self.l[-1] == -np.inf or l < self.l[-1]) and l > l_out:
l_out, i_out = l, i
# 3) decide lambda
if (l_in == -np.inf or l_in < 0) and (l_out == -np.inf or l_out < 0):
break
if l_in > l_out:
                # If it's the first element and still the -np.inf placeholder, replace it
if len(self.l) == 1 and self.l[0] == -np.inf:
self.l[0] = l_in
else:
self.l.append(l_in)
f.remove(i_in)
w[i_in] = bi_in # set value at the correct boundary
else:
                # If it's the first element and still the -np.inf placeholder, replace it
if len(self.l) == 1 and self.l[0] == -np.inf:
self.l[0] = l_out
else:
self.l.append(l_out)
f.append(i_out)
# 4) compute solution vector
covarF, covarFB, meanF, wB = self.getMatrices(f)
covarF_inv = np.linalg.inv(covarF)
wF, g = self.computeW(covarF_inv, covarFB, meanF, wB)
for i in range(len(f)):
w[f[i]] = wF[i]
self.w.append(np.copy(w)) # store solution
self.g.append(g)
self.f.append(f[:])
if len(f) == self.mean.shape[0]:
# 5) minimum variance solution
wF, g = self.computeW(covarF_inv, covarFB, np.zeros(meanF.shape), wB)
for i in range(len(f)):
w[f[i]] = wF[i]
self.w.append(np.copy(w)) # store solution
self.g.append(g)
self.f.append(f[:])
# Remove first element from weights (it's a duplicate)
self.w = self.w[1:]
        # If the last lambda (minimum variance solution, lambda = 0) is missing, append it
if len(self.l) < len(self.w):
self.l += [0.0]
# ---------------------------------------------------------------
def initAlgo(self):
# Initialize the algo
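        # Start with every weight at its lower bound, then raise the
        # highest-mean assets to their upper bounds until the portfolio
        # is fully invested; the last asset touched is the first free weight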
# 1) Form structured array
a = np.zeros((self.mean.shape[0]), dtype=[("id", int), ("mu", float)])
b = [self.mean[i][0] for i in range(self.mean.shape[0])] # dump array into list
a[:] = list(zip(range(self.mean.shape[0]), b)) # fill structured array
# 2) Sort structured array
b = np.sort(a, order="mu")
# 3) First free weight
i, w = b.shape[0], np.copy(self.lB)
while sum(w) < 1:
i -= 1
w[b[i][0]] = self.uB[b[i][0]]
w[b[i][0]] += 1 - sum(w)
return [b[i][0]], w
# ---------------------------------------------------------------
def computeBi(self, c, bi):
if c > 0:
bi = bi[1]
if c < 0:
bi = bi[0]
return bi
# ---------------------------------------------------------------
def computeW(self, covarF_inv, covarFB, meanF, wB):
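        # Solve for the free weights at the current lambda, given the
        # weights pinned at their bounds; gamma (g) is the multiplier
        # on the full-investment constraint sum(w) == 1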
# 1) compute gamma
onesF = np.ones(meanF.shape)
g1 = np.dot(np.dot(onesF.T, covarF_inv), meanF)
g2 = np.dot(np.dot(onesF.T, covarF_inv), onesF)
if wB is None:
g, w1 = float(-self.l[-1] * g1 / g2 + 1 / g2), 0
else:
onesB = np.ones(wB.shape)
g3 = np.dot(onesB.T, wB)
g4 = np.dot(covarF_inv, covarFB)
w1 = np.dot(g4, wB)
g4 = np.dot(onesF.T, w1)
g = float(-self.l[-1] * g1 / g2 + (1 - g3 + g4) / g2)
# 2) compute weights
w2 = np.dot(covarF_inv, onesF)
w3 = np.dot(covarF_inv, meanF)
return -w1 + g * w2 + self.l[-1] * w3, g
# ---------------------------------------------------------------
def computeLambda(self, covarF_inv, covarFB, meanF, wB, i, bi):
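        # Find the lambda at which free asset i reaches the candidate
        # bound bi, taking the weights held at their bounds into account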
# 1) C
onesF = np.ones(meanF.shape)
c1 = np.dot(np.dot(onesF.T, covarF_inv), onesF)
c2 = np.dot(covarF_inv, meanF)
c3 = np.dot(np.dot(onesF.T, covarF_inv), meanF)
c4 = np.dot(covarF_inv, onesF)
c = -c1 * c2[i] + c3 * c4[i]
        if c == 0:
            # Degenerate case: no valid lambda for this asset; returning
            # -np.inf keeps the comparisons in solve() well-defined
            return -np.inf, None
# 2) bi
        if isinstance(bi, list):
bi = self.computeBi(c, bi)
# 3) Lambda
if wB is None:
# All free assets
return float((c4[i] - c1 * bi) / c), bi
else:
onesB = np.ones(wB.shape)
l1 = np.dot(onesB.T, wB)
l2 = np.dot(covarF_inv, covarFB)
l3 = np.dot(l2, wB)
l2 = np.dot(onesF.T, l3)
return float(((1 - l1 + l2) * c4[i] - c1 * (bi + l3[i])) / c), bi
# ---------------------------------------------------------------
def getMatrices(self, f):
# Slice covarF,covarFB,covarB,meanF,meanB,wF,wB
covarF = self.reduceMatrix(self.covar, f, f)
meanF = self.reduceMatrix(self.mean, f, [0])
b = self.getB(f)
covarFB = self.reduceMatrix(self.covar, f, b)
wB = self.reduceMatrix(self.w[-1], b, [0])
return covarF, covarFB, meanF, wB
# ---------------------------------------------------------------
def getB(self, f):
return self.diffLists(np.arange(self.mean.shape[0]), f)
# ---------------------------------------------------------------
def diffLists(self, list1, list2):
return list(set(list1) - set(list2))
# ---------------------------------------------------------------
def reduceMatrix(self, matrix, listX, listY):
# Reduce a matrix to the provided list of rows and columns
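        # (equivalent to matrix[np.ix_(listX, listY)] for non-empty index lists)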
if len(listX) == 0 or len(listY) == 0:
return
matrix_ = matrix[:, listY[0] : listY[0] + 1]
for i in listY[1:]:
a = matrix[:, i : i + 1]
matrix_ = np.append(matrix_, a, 1)
matrix__ = matrix_[listX[0] : listX[0] + 1, :]
for i in listX[1:]:
a = matrix_[i : i + 1, :]
matrix__ = np.append(matrix__, a, 0)
return matrix__
# ---------------------------------------------------------------
def getMinVar(self):
# Get the minimum variance solution
var = []
for w in self.w:
a = np.dot(np.dot(w.T, self.covar), w)
var.append(a)
return min(var) ** 0.5, self.w[var.index(min(var))]
# ---------------------------------------------------------------
def getMaxSR(self):
# Get the max Sharpe ratio portfolio
# 1) Compute the local max SR portfolio between any two neighbor turning points
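        # Within each segment the weights are affine in the mixing
        # parameter, so the Sharpe ratio is assumed unimodal there and a
        # golden-section search per segment suffices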
w_sr, sr = [], []
for i in range(len(self.w) - 1):
w0 = np.copy(self.w[i])
w1 = np.copy(self.w[i + 1])
kargs = {"minimum": False, "args": (w0, w1)}
a, b = self.goldenSection(self.evalSR, 0, 1, **kargs)
w_sr.append(a * w0 + (1 - a) * w1)
sr.append(b)
return max(sr), w_sr[sr.index(max(sr))]
# ---------------------------------------------------------------
def evalSR(self, a, w0, w1):
# Evaluate SR of the portfolio within the convex combination
w = a * w0 + (1 - a) * w1
b = np.dot(w.ravel(), self.mean)[0]
c = np.sqrt(np.dot(np.dot(w.ravel(), self.covar), w.ravel()))
return b / c
# ---------------------------------------------------------------
def goldenSection(self, obj, a, b, **kargs):
        # Golden-section search; finds a maximum if kargs['minimum'] == False
        tol, sign, args = 1.0e-9, 1, None
if "minimum" in kargs and kargs["minimum"] == False:
sign = -1
if "args" in kargs:
args = kargs["args"]
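        # Iterations needed to shrink the bracket below tol: each step
        # keeps a fraction r = 0.618... of the interval, and -2.078087
        # is approximately 1 / ln(0.618033989)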
numIter = int(np.ceil(-2.078087 * np.log(tol / np.abs(b - a))))
r = 0.618033989
c = 1.0 - r
# Initialize
x1 = r * a + c * b
x2 = c * a + r * b
f1 = sign * obj(x1, *args)
f2 = sign * obj(x2, *args)
# Loop
for i in range(numIter):
if f1 > f2:
a = x1
x1 = x2
f1 = f2
x2 = c * a + r * b
f2 = sign * obj(x2, *args)
else:
b = x2
x2 = x1
f2 = f1
x1 = r * a + c * b
f1 = sign * obj(x1, *args)
if f1 < f2:
return x1, sign * f1
else:
return x2, sign * f2
# ---------------------------------------------------------------
def efFrontier(self, points):
# Get the efficient frontier
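        # Between consecutive turning points the optimal weights are
        # affine in lambda, so convex combinations of neighboring
        # turning-point portfolios trace the frontier exactly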
mu, sigma, weights = [], [], []
a = np.linspace(0, 1, points // len(self.w))[
:-1
] # remove the 1, to avoid duplications
b = np.arange(len(self.w) - 1)
for i in b:
w0, w1 = self.w[i], self.w[i + 1]
if i == b[-1]:
a = np.linspace(
0, 1, points // len(self.w)
) # include the 1 in the last iteration
for j in a:
w = w1 * j + (1 - j) * w0
weights.append(np.copy(w))
mu.append(np.dot(w.T, self.mean)[0, 0])
sigma.append(np.dot(np.dot(w.T, self.covar), w)[0, 0] ** 0.5)
return mu, sigma, weights
# ---------------------------------------------------------------
# ---------------------------------------------------------------
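For a quick standalone smoke test of the class, here is a minimal sketch (the two-asset inputs are made up, and it assumes the code above is saved as CLA.py):
import numpy as np
import CLA
mean = np.array([[0.10], [0.20]])  # hypothetical expected returns
covar = np.array([[0.04, 0.01], [0.01, 0.09]])  # hypothetical covariance
lB = np.zeros((2, 1))  # long-only lower bounds
uB = np.ones((2, 1))  # upper bounds of 100% per asset
cla = CLA.CLA(mean, covar, lB, uB)
cla.solve()
print(cla.getMinVar())  # (volatility, weights) of the minimum variance portfolio
print(cla.getMaxSR())  # (Sharpe ratio, weights) of the maximum Sharpe portfolio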
And the notebook:
CLA in Python 3
The original Python 2 code, together with the example dataset, is available here: https://github.com/mdengler/cla
import CLA
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import Markdown, display
# Requires Python >= 3.9: the built-in generic type hints used below (e.g. tuple[...]) are not supported on older versions
Helper functions
def compute_return_risk(
df: pd.DataFrame, mean: np.ndarray, covar: np.ndarray
) -> tuple[pd.Series, pd.Series]:
"""Compute Return and Risk (volatility) series for the CLW weights
Args:
df (pd.DataFrame): Dataframe with the weights
mean (np.ndarray): Array of mean returns
covar (np.ndarray): Array of covariances
Returns:
tuple[pd.Series, pd.Series]: series of returns and risk
"""
p_ret = []
risk = []
for i in df.index:
p_ret.append(np.dot(df.loc[i, :].values, mean)[0])
risk.append(
np.sqrt(np.dot(df.loc[i, :].values, np.dot(covar, df.loc[i, :].values)))
)
return pd.Series(p_ret, index=df.index, name="Return"), pd.Series(
risk, index=df.index, name="Risk"
)
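As a side note, the loop above can be avoided entirely; a vectorized sketch that is equivalent under the same shapes and imports:
def compute_return_risk_vec(
    df: pd.DataFrame, mean: np.ndarray, covar: np.ndarray
) -> tuple[pd.Series, pd.Series]:
    W = df.values  # (n_portfolios, n_assets)
    p_ret = W @ mean.ravel()  # one expected return per portfolio
    risk = np.sqrt(np.einsum("ij,jk,ik->i", W, covar, W))  # row-wise w' C w
    return (
        pd.Series(p_ret, index=df.index, name="Return"),
        pd.Series(risk, index=df.index, name="Risk"),
    )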
def create_weight_table(
    cla: CLA.CLA, mean: np.ndarray, covar: np.ndarray, var_names: list
) -> pd.DataFrame:
"""Create the table with Return, Risk, Lambda and Weights
Args:
cla (CLA): The CLA object with the solution
mean (np.ndarray): Array with the return means
covar (np.ndarray): Array of covariances
var_names (list): List of names for the columns
Returns:
pd.DataFrame: The weights table
"""
weights = pd.DataFrame(
np.hstack(cla.w).T, columns=var_names, index=np.arange(1, len(cla.w) + 1)
)
port_return, port_risk = compute_return_risk(weights, mean, covar)
port_lambda = pd.Series(cla.l, index=weights.index, name="Lambda")
return pd.concat([port_return, port_risk, port_lambda, weights], axis=1)
def display_results(ret_mean: np.ndarray, cla: CLA.CLA) -> None:
    """Plot the CLA results and display the maximum Sharpe portfolio
    and the minimum variance one
    Args:
        ret_mean (np.ndarray): Array of return means
cla (CLA): CLA object with the solution
"""
    plot_results(cla)
print()
    # Get the maximum Sharpe ratio portfolio
sr, w_sr = cla.getMaxSR()
display(
Markdown(
f"### Portfolio volatility: {np.sqrt(np.dot(np.dot(w_sr.ravel(),cla.covar),w_sr.ravel())):.2%}, Sharpe ratio: {sr:.2f}"
)
)
display(Markdown("### Weights (rounded to 4 decimal places):"))
display(
pd.Series(
np.round(w_sr.ravel(), 4),
name="sr_weights",
index=np.arange(1, len(w_sr) + 1),
)
)
print()
    # Get the minimum variance portfolio
mv, w_mv = cla.getMinVar()
mu_v = np.dot(w_mv.ravel(), ret_mean)
sr_v = np.round(mu_v / mv, 2).ravel()[0]
display(
Markdown(
f"### Portfolio minimum volatility: {mv.ravel()[0]:.2%}, Sharpe ratio: {sr_v:.2f}"
)
)
display(Markdown("### Weights (rounded to 4 decimal places):"))
display(
pd.Series(
np.round(w_mv.ravel(), 4),
name="mv_weights",
            index=np.arange(1, len(w_mv) + 1),
)
)
print()
return
def plot_results(cla: CLA.CLA) -> None:
    """Plot the efficient frontier and Sharpe ratio curve of a solved CLA object"""
    mu, sigma, weights = cla.efFrontier(100)
mu = np.array(mu)
sigma = np.array(sigma)
x_low = max(sigma.min() - 0.05, 0.0)
    fig, ax = plt.subplots(1, 2, figsize=(20, 7))  # one row, two columns
ax[0].plot(sigma, mu, color="blue")
ax[0].set_xlabel("Risk")
ax[0].set_ylabel("Expected Excess Return", rotation=90)
ax[0].set_xlim(x_low, 1.0)
ax[0].set_title("CLA-derived Efficient Frontier")
    ax[1].plot(sigma, np.round(mu / sigma, 2), color="blue")
ax[1].set_xlabel("Risk")
ax[1].set_ylabel("Sharpe ratio", rotation=90)
ax[1].set_xlim(x_low, 1.0)
ax[1].set_title("CLA-derived Sharpe Ratio function")
plt.suptitle("CLA results", fontsize=16)
plt.show()
return
Import dataset
df = pd.read_csv("CLA_Data.csv")
df
| | X1 | X2 | X3 | X4 | X5 | X6 | X7 | X8 | X9 | X10 |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.175000 | 1.190000 | 0.396000 | 1.120000 | 0.346000 | 0.679000 | 0.089000 | 0.730000 | 0.481000 | 1.080000 |
| 1 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.000000 |
| 2 | 1.000000 | 1.000000 | 1.000000 | 1.000000 | 1.000000 | 1.000000 | 1.000000 | 1.000000 | 1.000000 | 1.000000 |
| 3 | 0.407552 | 0.031758 | 0.051839 | 0.056639 | 0.033023 | 0.008278 | 0.021659 | 0.013324 | 0.034348 | 0.022499 |
| 4 | 0.031758 | 0.906305 | 0.031364 | 0.026873 | 0.019172 | 0.009344 | 0.024950 | 0.007610 | 0.028749 | 0.013369 |
| 5 | 0.051839 | 0.031364 | 0.194909 | 0.044085 | 0.030068 | 0.013227 | 0.035260 | 0.011549 | 0.042756 | 0.020573 |
| 6 | 0.056639 | 0.026873 | 0.044085 | 0.195285 | 0.027773 | 0.005267 | 0.013758 | 0.007809 | 0.029142 | 0.016404 |
| 7 | 0.033023 | 0.019172 | 0.030068 | 0.027773 | 0.340591 | 0.007771 | 0.020678 | 0.007364 | 0.025427 | 0.012841 |
| 8 | 0.008278 | 0.009344 | 0.013227 | 0.005267 | 0.007771 | 0.159839 | 0.021056 | 0.005187 | 0.017237 | 0.007238 |
| 9 | 0.021659 | 0.024950 | 0.035260 | 0.013758 | 0.020678 | 0.021056 | 0.680567 | 0.013779 | 0.046270 | 0.019261 |
| 10 | 0.013324 | 0.007610 | 0.011549 | 0.007809 | 0.007364 | 0.005187 | 0.013779 | 0.955269 | 0.010655 | 0.007610 |
| 11 | 0.034348 | 0.028749 | 0.042756 | 0.029142 | 0.025427 | 0.017237 | 0.046270 | 0.010655 | 0.316816 | 0.018543 |
| 12 | 0.022499 | 0.013369 | 0.020573 | 0.016404 | 0.012841 | 0.007238 | 0.019261 | 0.007610 | 0.018543 | 0.110793 |
Create variables
The CSV stacks all the inputs in a single table: row 0 holds the mean returns, rows 1 and 2 the lower and upper bounds, and rows 3 onwards the 10x10 covariance matrix.
var_names = df.columns.to_list()
var_names
['X1', 'X2', 'X3', 'X4', 'X5', 'X6', 'X7', 'X8', 'X9', 'X10']
mean = df.iloc[0, :].values.reshape(-1, 1)
lB = df.iloc[1, :].values.reshape(-1, 1)
uB = df.iloc[2, :].values.reshape(-1, 1)
covar = df.iloc[3:, :].values
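A quick sanity check on the resulting shapes (optional, assuming the 10-asset example dataset):
assert mean.shape == (10, 1) and covar.shape == (10, 10)
assert lB.shape == uB.shape == (10, 1)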
Compute CLA
# Instantiate the CLA object and compute the turning points
cla = CLA.CLA(mean, covar, lB, uB)
cla.solve()
Display results
weight_df = create_weight_table(cla, mean, covar, var_names)
weight_df.style.format("{:.3f}")
| | Return | Risk | Lambda | X1 | X2 | X3 | X4 | X5 | X6 | X7 | X8 | X9 | X10 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | 1.190 | 0.952 | 58.303 | 0.000 | 1.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 |
| 2 | 1.180 | 0.546 | 4.174 | 0.649 | 0.351 | 0.000 | -0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 |
| 3 | 1.160 | 0.417 | 1.946 | 0.434 | 0.231 | 0.000 | 0.335 | 0.000 | 0.000 | 0.000 | 0.000 | 0.000 | -0.000 |
| 4 | 1.111 | 0.267 | 0.165 | 0.127 | 0.072 | 0.000 | 0.281 | 0.000 | 0.000 | 0.000 | -0.000 | 0.000 | 0.520 |
| 5 | 1.108 | 0.265 | 0.147 | 0.123 | 0.070 | 0.000 | 0.279 | 0.000 | 0.000 | 0.000 | 0.006 | 0.000 | 0.521 |
| 6 | 1.022 | 0.230 | 0.056 | 0.087 | 0.050 | 0.000 | 0.224 | 0.000 | 0.174 | 0.000 | 0.030 | 0.000 | 0.435 |
| 7 | 1.015 | 0.228 | 0.052 | 0.085 | 0.049 | 0.000 | 0.220 | -0.000 | 0.180 | 0.000 | 0.031 | 0.006 | 0.429 |
| 8 | 0.973 | 0.220 | 0.037 | 0.074 | 0.044 | 0.000 | 0.199 | 0.026 | 0.198 | 0.000 | 0.033 | 0.028 | 0.398 |
| 9 | 0.950 | 0.216 | 0.031 | 0.068 | 0.041 | 0.015 | 0.188 | 0.034 | 0.202 | 0.000 | 0.034 | 0.034 | 0.383 |
| 10 | 0.803 | 0.205 | 0.000 | 0.037 | 0.027 | 0.095 | 0.126 | 0.077 | 0.219 | 0.030 | 0.036 | 0.061 | 0.292 |
display_results(mean, cla)
Portfolio volatility: 22.74%, Sharpe ratio: 4.45
Weights (rounded to 4 decimal places):
1 0.0840
2 0.0489
3 0.0000
4 0.2183
5 0.0017
6 0.1812
7 0.0000
8 0.0312
9 0.0079
10 0.4269
Name: sr_weights, dtype: float64
Portfolio minimum volatility: 20.52%, Sharpe ratio: 3.91
Weights (rounded to 4 decimal places):
1 0.0370
2 0.0269
3 0.0949
4 0.1258
5 0.0767
6 0.2194
7 0.0300
8 0.0360
9 0.0613
10 0.2920
Name: mv_weights, dtype: float64
Thank you very much for your repo.
Kind regards, Andrea Dalseno