forked from andreashsieh/stacked_generalization
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstacked_generalizer.py
More file actions
232 lines (178 loc) · 7.93 KB
/
stacked_generalizer.py
File metadata and controls
232 lines (178 loc) · 7.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import numpy as np
from sklearn.model_selection import KFold
from sklearn.metrics import (classification_report, accuracy_score,
confusion_matrix)
from copy import copy
def get_predictions(model, X):
    """Return a model's outputs for X as a 2-D array.

    Prefers ``predict_proba`` when the model offers it, otherwise falls
    back to ``predict``; 1-D outputs are promoted to a single column so
    downstream stacking code can always hstack along axis 1.
    """
    proba_fn = getattr(model, 'predict_proba', None)
    output = proba_fn(X) if proba_fn is not None else model.predict(X)
    # promote 1-d outputs to a column vector
    if output.ndim == 1:
        output = output.reshape(-1, 1)
    return output
class StackedGeneralizer(object):
    """Base class for stacked generalization classifier models
    """
    def __init__(self, base_models=None, blending_model=None, n_folds=5,
                 verbose=True):
        """
        Stacked Generalizer Classifier

        Trains a series of base models using K-fold cross-validation, then
        combines the predictions of each model into a set of features that are
        used to train a high-level classifier model.

        Parameters
        -----------
        base_models: list of classifier models
            Each model must have a .fit and .predict_proba/.predict method
            a la sklearn
        blending_model: object
            A classifier model used to aggregate the outputs of the trained
            base models. Must have a .fit and .predict_proba/.predict method
        n_folds: int
            The number of K-folds to use in cross-validated model training
        verbose: boolean
            Print progress messages while fitting when True

        Example
        -------
        from sklearn.datasets import load_digits
        from stacked_generalizer import StackedGeneralizer
        from sklearn.ensemble import (RandomForestClassifier,
                                      ExtraTreesClassifier)
        from sklearn.linear_model import LogisticRegression
        import numpy as np

        VERBOSE = True
        N_FOLDS = 5

        # load data and shuffle observations
        data = load_digits()
        X = data.data
        y = data.target
        shuffle_idx = np.random.permutation(y.size)
        X = X[shuffle_idx]
        y = y[shuffle_idx]

        # hold out 20 percent of data for testing accuracy
        n_train = round(X.shape[0]*.8)

        # define base models
        base_models = [RandomForestClassifier(n_estimators=100, n_jobs=-1,
                                              criterion='gini'),
                       RandomForestClassifier(n_estimators=100, n_jobs=-1,
                                              criterion='entropy'),
                       ExtraTreesClassifier(n_estimators=100, n_jobs=-1,
                                            criterion='gini')]

        # define blending model
        blending_model = LogisticRegression()

        # initialize multi-stage model
        sg = StackedGeneralizer(base_models, blending_model,
                                n_folds=N_FOLDS, verbose=VERBOSE)

        # fit model
        sg.fit(X[:n_train], y[:n_train])

        # test accuracy
        pred = sg.predict(X[n_train:])
        pred_classes = [np.argmax(p) for p in pred]
        _ = sg.evaluate(y[n_train:], pred_classes)

                     precision    recall  f1-score   support

                  0       0.97      1.00      0.99        33
                  1       0.97      1.00      0.99        38
                  2       1.00      1.00      1.00        42
                  3       1.00      0.98      0.99        41
                  4       0.97      0.94      0.95        32
                  5       0.95      0.98      0.96        41
                  6       1.00      0.95      0.97        37
                  7       0.94      0.97      0.96        34
                  8       0.94      0.94      0.94        34
                  9       0.96      0.96      0.96        27

        avg / total       0.97      0.97      0.97       359
        """
        self.base_models = base_models
        self.blending_model = blending_model
        self.n_folds = n_folds
        self.verbose = verbose
        # populated by fit_base_models / fit_blending_model
        self.base_models_cv = None
        self.blending_model_cv = None

    def fit_base_models(self, X, y):
        """Fit every base model on each of the K training folds of (X, y).

        Populates self.base_models_cv as {model_name: [fitted copy per fold]}.
        """
        if self.verbose:
            print('Fitting Base Models...')
        # BUGFIX: previously called KFold(y.shape[0], self.n_folds, ...) —
        # the removed sklearn.cross_validation signature — and iterated the
        # KFold object directly. With sklearn.model_selection.KFold this
        # raises a TypeError; the new API takes n_splits and requires
        # kf.split(X) to iterate.
        kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=4242)
        self.base_models_cv = {}
        for i, model in enumerate(self.base_models):
            model_name = f"model {i+1:02d}: {model!r}"
            if self.verbose:
                print(f'Fitting {model_name}')
            # run CV for each model; the fixed random_state makes the folds
            # identical across models (and across fit_blending_model)
            self.base_models_cv[model_name] = []
            for j, (train_idx, _test_idx) in enumerate(kf.split(X)):
                if self.verbose:
                    print(f'Fold {j + 1}')
                X_train = X[train_idx]
                y_train = y[train_idx]
                model.fit(X_train, y_train)
                # add a copy of the fitted state to the list of CV'd models
                self.base_models_cv[model_name].append(copy(model))

    def transform_base_models(self, X):
        """Build blending features for X by model-averaging each base model's
        CV members, then concatenating the averages column-wise.

        Returns
        -------
        ndarray of shape (n_samples, sum of per-model output widths)
        """
        predictions = []
        # sorted() keeps feature-column order deterministic between calls
        for key in sorted(self.base_models_cv.keys()):
            cv_predictions = None
            n_models = len(self.base_models_cv[key])
            for i, model in enumerate(self.base_models_cv[key]):
                model_predictions = get_predictions(model, X)
                if cv_predictions is None:
                    # allocate once the output width is known
                    cv_predictions = np.zeros((n_models, X.shape[0],
                                               model_predictions.shape[1]))
                cv_predictions[i, :, :] = model_predictions
            # perform model averaging and add to features
            predictions.append(cv_predictions.mean(0))
        # concat all features
        predictions = np.hstack(predictions)
        return predictions

    def fit_transform_base_models(self, X, y):
        """Fit the base models on (X, y) and return their blending features."""
        self.fit_base_models(X, y)
        return self.transform_base_models(X)

    def fit_blending_model(self, X_blend, y):
        """Fit the blending model on each training fold of the base-model
        features, storing one fitted copy per fold in self.blending_model_cv.
        """
        if self.verbose:
            print(f'Fitting Blending Model:\n{self.blending_model!r}')
        # BUGFIX: same deprecated-KFold-signature problem as fit_base_models;
        # also removed debug prints and a no-op np.reshape whose result was
        # discarded.
        kf = KFold(n_splits=self.n_folds, shuffle=True, random_state=4242)
        # run CV
        self.blending_model_cv = []
        for j, (train_idx, _test_idx) in enumerate(kf.split(X_blend)):
            if self.verbose:
                print(f'Fold {j + 1}')
            X_train = X_blend[train_idx]
            y_train = y[train_idx]
            # fit a fresh copy so each fold gets an independent model
            model = copy(self.blending_model)
            model.fit(X_train, y_train)
            # add trained model to list of CV'd models
            self.blending_model_cv.append(model)

    def transform_blending_model(self, X_blend):
        """Average the CV blending models' predictions on X_blend."""
        cv_predictions = None
        n_models = len(self.blending_model_cv)
        for i, model in enumerate(self.blending_model_cv):
            model_predictions = get_predictions(model, X_blend)
            if cv_predictions is None:
                # BUGFIX: the accumulator was reset to None inside the loop,
                # so every slice except the last model's stayed zero and the
                # mean was wrong by roughly a factor of n_models.
                cv_predictions = np.zeros((n_models, X_blend.shape[0],
                                           model_predictions.shape[1]))
            cv_predictions[i, :, :] = model_predictions
        # perform model averaging to get predictions
        return cv_predictions.mean(0)

    def predict(self, X):
        """Return blended class-probability predictions for X."""
        # base-model features, then model-averaged blending predictions
        X_blend = self.transform_base_models(X)
        predictions = self.transform_blending_model(X_blend)
        return predictions

    def fit(self, X, y):
        """Fit base models and the blending model on (X, y)."""
        X_blend = self.fit_transform_base_models(X, y)
        self.fit_blending_model(X_blend, y)

    def evaluate(self, y, y_pred):
        """Print a classification report and confusion matrix; return the
        accuracy score.
        """
        print(classification_report(y, y_pred))
        print('Confusion Matrix:')
        print(confusion_matrix(y, y_pred))
        return accuracy_score(y, y_pred)