add multiprocessing support #13

Open

wants to merge 1 commit into master
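With this change, `Stacking` accepts an `n_jobs` argument. When it is set, the per-fold fitting of the base classifiers inside `fit` is fanned out over a `concurrent.futures.ProcessPoolExecutor`; when it is `None` (the default), the original sequential path is used unchanged. A minimal usage sketch (the dataset and base estimators below are illustrative, not part of this PR):

```python
from combo.models.classifier_stacking import Stacking

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier

# ProcessPoolExecutor spawns worker processes, so keep the entry point
# guarded on platforms that use the "spawn" start method
if __name__ == "__main__":
    X, y = make_classification(n_samples=500, random_state=42)

    clf = Stacking(base_estimators=[DecisionTreeClassifier(),
                                    LogisticRegression()],
                   n_folds=3,
                   n_jobs=4)  # fit base classifiers in up to 4 processes
    clf.fit(X, y)
    y_pred = clf.predict(X)
```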
78 changes: 57 additions & 21 deletions combo/models/classifier_stacking.py
@@ -20,6 +20,8 @@
 from ..utils.utility import list_diff
 from .base import BaseAggregator
 
+from concurrent.futures import ProcessPoolExecutor, Future, wait
+
 
 def split_datasets(X, y, n_folds=3, shuffle_data=False, random_state=None):
     """Utility function to split the data for stacking. The data is split
@@ -124,7 +126,7 @@ class Stacking(BaseAggregator):
     def __init__(self, base_estimators, meta_clf=None, n_folds=2,
                  keep_original=True,
                  use_proba=False, shuffle_data=False, random_state=None,
-                 threshold=None, pre_fitted=None):
+                 threshold=None, pre_fitted=None, n_jobs=None):
 
         super(Stacking, self).__init__(
             base_estimators=base_estimators, pre_fitted=pre_fitted)
@@ -147,6 +149,7 @@ def __init__(self, base_estimators, meta_clf=None, n_folds=2,
         self.shuffle_data = shuffle_data
 
         self.random_state = random_state
+        self.n_jobs = n_jobs
 
         if threshold is not None:
             warnings.warn(
@@ -183,26 +186,56 @@ def fit(self, X, y):
             X, y, n_folds=self.n_folds, shuffle_data=self.shuffle_data,
             random_state=self.random_state)
 
-        # iterate over all base classifiers
-        for i, clf in enumerate(self.base_estimators):
-            # iterate over all folds
-            for j in range(self.n_folds):
-                # build train and test index
-                full_idx = list(range(n_samples))
-                test_idx = index_lists[j]
-                train_idx = list_diff(full_idx, test_idx)
-                X_train, y_train = X_new[train_idx, :], y_new[train_idx]
-                X_test, y_test = X_new[test_idx, :], y_new[test_idx]
-
-                # train the classifier
-                clf.fit(X_train, y_train)
-
-                # generate the new features on the pseudo test set
-                if self.use_proba:
-                    new_features[test_idx, i] = clf.predict_proba(
-                        X_test)[:, 1]
-                else:
-                    new_features[test_idx, i] = clf.predict(X_test)
+        if self.n_jobs is not None:
+            # fit each (classifier, fold) pair in its own worker process
+            with ProcessPoolExecutor(max_workers=self.n_jobs) as pool:
+                futures_map = {}
+                for i, clf in enumerate(self.base_estimators):
+                    # iterate over all folds
+                    for j in range(self.n_folds):
+                        # build train and test index
+                        full_idx = list(range(n_samples))
+                        test_idx = index_lists[j]
+                        train_idx = list_diff(full_idx, test_idx)
+                        X_train, y_train = X_new[train_idx, :], y_new[train_idx]
+                        X_test = X_new[test_idx, :]
+
+                        # clf.fit runs in a worker process; the fitted
+                        # estimator is pickled back as the future's result
+                        fut: Future = pool.submit(clf.fit, X_train, y_train)
+                        futures_map[fut] = (i, test_idx, X_test)
+
+                wait(futures_map)
+
+                # collect results in the parent process; keying each future
+                # by its own (i, test_idx, X_test) avoids the late-binding
+                # bug of reading loop variables from a done-callback
+                for fut, (i, test_idx, X_test) in futures_map.items():
+                    fitted_clf = fut.result()
+                    # generate the new features on the pseudo test set
+                    if self.use_proba:
+                        new_features[test_idx, i] = fitted_clf.predict_proba(
+                            X_test)[:, 1]
+                    else:
+                        new_features[test_idx, i] = fitted_clf.predict(X_test)
+        else:
+            # iterate over all base classifiers
+            for i, clf in enumerate(self.base_estimators):
+                # iterate over all folds
+                for j in range(self.n_folds):
+                    # build train and test index
+                    full_idx = list(range(n_samples))
+                    test_idx = index_lists[j]
+                    train_idx = list_diff(full_idx, test_idx)
+                    X_train, y_train = X_new[train_idx, :], y_new[train_idx]
+                    X_test, y_test = X_new[test_idx, :], y_new[test_idx]
+
+                    # train the classifier
+                    clf.fit(X_train, y_train)
+
+                    # generate the new features on the pseudo test set
+                    if self.use_proba:
+                        new_features[test_idx, i] = clf.predict_proba(
+                            X_test)[:, 1]
+                    else:
+                        new_features[test_idx, i] = clf.predict(X_test)

         # build the new dataset for training
         if self.keep_original:
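A note on the parallel branch in `fit`: work is fanned out with `pool.submit(clf.fit, ...)` and the results are collected in the parent process after `wait`, keyed by each future's own `(i, test_idx, X_test)` tuple. Collecting from an `add_done_callback` defined inside the loop would not work here, because the closure captures `i`, `test_idx`, and `X_test` by reference and would run with the values of the final iteration; the `with` block also guarantees the worker processes are shut down even if a fit raises. A self-contained sketch of the same pattern (illustrative names, assuming scikit-learn-style estimators whose `fit` returns `self`):

```python
import numpy as np
from concurrent.futures import ProcessPoolExecutor, wait

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression


def fit_clone(clf, X, y):
    # runs in a worker process; the fitted estimator is pickled back
    return clf.fit(X, y)


if __name__ == "__main__":
    X, y = make_classification(n_samples=300, random_state=0)
    folds = np.array_split(np.arange(len(X)), 3)
    preds = np.zeros(len(X))

    with ProcessPoolExecutor(max_workers=2) as pool:
        # key each future by the data its result needs, instead of
        # reading loop variables from a done-callback closure
        jobs = {}
        for test_idx in folds:
            train_idx = np.setdiff1d(np.arange(len(X)), test_idx)
            fut = pool.submit(fit_clone,
                              LogisticRegression(max_iter=1000),
                              X[train_idx], y[train_idx])
            jobs[fut] = test_idx

        wait(jobs)

        # collect in the parent process: each result lands in the fold
        # slots that belong to its own future
        for fut, test_idx in jobs.items():
            preds[test_idx] = fut.result().predict(X[test_idx])
```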