-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathridge_regression.py
More file actions
198 lines (166 loc) · 7.54 KB
/
Copy pathridge_regression.py
File metadata and controls
198 lines (166 loc) · 7.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import logging
import numpy as np
class RidgeRegression:
"""
Linear regression using least squares loss with L2 regularization.
"""
def __init__(self, x_train, y_train, lambd=0.1, min_grad=0.001, max_iter=1000, backtracking_max_iter=500):
"""
Initializes and instance of RidgeRegression
:param x_train: training features of shape (n, d)
:param y_train: training labels of shape (n, 1)
:param lambd: regularization parameter, default value 0.1
:param min_grad: minimum slope to stop gradient descent, default - 0.001
:param max_iter: maximum number if iteration - default 1000. If minimum gradient is not
is not reached in max iteration, the algorithm will stop optimizing further
:param backtracking_max_iter: maximum number of iterations to optimize learning rate using
backtracking line search
"""
self.logger = logging.getLogger(__name__)
self.n_train, self.d = x_train.shape
self.x_train = np.asarray(x_train)
self.y_train = np.asarray(y_train)
self.lambd = lambd
self.min_grad = min_grad
self.max_iter = max_iter
self.backtracking_max_iter = backtracking_max_iter
# arrays to contain costs, coefficients, and gradients after each iteration
self.cost_history = np.zeros(max_iter)
self.grad_magnitude_history = np.zeros(max_iter)
self.w_history = np.zeros((self.d, max_iter))
# initialize to an arbitrary learning rate
self.init_learning_rate = 0.001
self.w = None
self.n_iter = 0
def __compute_initial_learning_rate__(self):
"""
Computes the initial learning rate by calculating the Lipschitz. Backtracking
line search optimized this learning rate
:return: initial learning rate
"""
eigen_values = np.linalg.eigvalsh(np.cov(self.x_train.T))
lipschitz = eigen_values[-1] + self.lambd
initial_learning_rate = 1 / lipschitz
return initial_learning_rate
def backtracking(self, w, grad, grad_magnitude, cost, alpha=0.5, gamma=0.8):
"""
Computes optimal learning rate for each iteration.
alpha and gamma are constants and the default values
had proven to work well in most scenarios.
Parameters grad, grad_magnitude and cost are passed instead of being
calculated in the method because they are already calculated in during
gradient descent
:param w: weight vector after last iteration
:param grad: grad at the given beta
:param grad_magnitude: grad magnitude at the given beta
:param cost: cost at the given beta
:param alpha: a constant - default 0.5
:param gamma: a constant - default 0.8
:return: optimal learning rate
"""
learning_rate = self.init_learning_rate
condition = False
i = 0 # Iteration counter
while condition is False and i < self.backtracking_max_iter:
lhs = self.compute_cost(w - learning_rate * grad)
rhs = cost - alpha * learning_rate * grad_magnitude ** 2
if lhs <= rhs:
condition = True
elif i == self.backtracking_max_iter - 1:
self.logger.warning('maximum number of iterations for backtracking reached')
break
else:
learning_rate *= gamma
i += 1
return learning_rate
def compute_cost(self, w):
"""
Computes the cost (value of objective function) for the given coefficient
:param w: weight vector
:return: cost
"""
residuals = self.y_train - self.x_train.dot(w)
least_squares = np.square(np.linalg.norm(residuals, ord=2))
regularization = self.lambd * np.square(np.linalg.norm(w, ord=2))
return (1 / self.n_train) * least_squares + regularization
def compute_grad(self, w):
"""
Computes the gradient for the given coefficient
:param w: weight vector
:return: grad
"""
residuals = self.y_train - self.x_train.dot(w)
least_square_grad = (-2 / self.n_train) * self.x_train.T.dot(residuals)
reg_grad = 2 * self.lambd * w
return least_square_grad + reg_grad
def fast_gradient_descent(self, init_w=None):
"""
Runs accelerated gradient descent algorithm to minimize the cost. The stopping criteria is
minimum gradient or maximum number of iterations whichever comes earlier
:param init_w: by default the weights will be initialized to 0. However, if coefficients are known
from a similar problem they can be used to make convergence much faster.
"""
if init_w is None:
beta = np.zeros((self.d, 1))
theta = np.zeros((self.d, 1))
else:
beta = init_w
theta = init_w
condition = False
itr = 0
while condition is False:
grad_theta = self.compute_grad(beta)
cost = self.compute_cost(beta)
grad_magnitude = np.linalg.norm(grad_theta, ord=2)
learning_rate = self.backtracking(beta, grad_theta, grad_magnitude, cost)
self.w_history[:, itr] = beta.flatten()
self.cost_history[itr] = cost
self.grad_magnitude_history[itr] = grad_magnitude
if grad_magnitude < self.min_grad:
condition = True
self.n_iter = itr + 1
elif itr == self.max_iter - 1:
self.logger.warning('max iteration for fast gradient descent reached before condition became true')
condition = True
self.n_iter = itr + 1
else:
itr += 1
beta = theta - learning_rate * grad_theta
theta = beta + itr / (itr + 3) * (beta - self.w_history[:, (itr - 1)].reshape(self.d, 1))
self.w_history = self.w_history[:, 0:itr]
self.cost_history = self.cost_history[0:itr]
self.grad_magnitude_history = self.grad_magnitude_history[0:itr]
self.w = beta
def train(self, init_w=None):
"""
Trains the model and optimizes the coefficients
:param init_w: by default the coefficients will be initialized to 0. However, if coefficients are known
from a similar problem they can be used to make convergence much faster.
"""
self.init_learning_rate = self.__compute_initial_learning_rate__()
self.fast_gradient_descent(init_w)
def predict(self, x, w=None):
"""
Make predictions
:param x: the features dataset of shape (n, d)
:param w: the coefficients. default - None. If not passed, it will use the coefficients
optimized during training
:return: the predictions in shape (n, 1)
"""
if w is None:
w = self.w
return x.dot(w)
def r_squared(self, x, y, w=None):
"""
Calculates the R Squared value (coefficient of determination). The maximum value is 1 and
higher values indicate better accuracy of the model.
:param x: features dataset
:param y: labels dataset
:param w: coefficients
:return: r squared
"""
predictions = self.predict(x, w)
total_sq_error = np.sum(np.square(y - predictions))
total_variation_y = np.sum(np.square(y - np.mean(y)))
r_squared = 1 - total_sq_error / total_variation_y
return r_squared