From ae5eb5b63ad2ffcd6bea28a015471c838833f767 Mon Sep 17 00:00:00 2001 From: wuyongji Date: Mon, 29 Mar 2021 08:04:32 +0800 Subject: [PATCH] add multiprocessing support --- combo/models/classifier_stacking.py | 78 +++++++++++++++++++++-------- 1 file changed, 57 insertions(+), 21 deletions(-) diff --git a/combo/models/classifier_stacking.py b/combo/models/classifier_stacking.py index 01c681a..b365567 100644 --- a/combo/models/classifier_stacking.py +++ b/combo/models/classifier_stacking.py @@ -20,6 +20,8 @@ from ..utils.utility import list_diff from .base import BaseAggregator +from concurrent.futures import ProcessPoolExecutor, Future, wait + def split_datasets(X, y, n_folds=3, shuffle_data=False, random_state=None): """Utility function to split the data for stacking. The data is split @@ -124,7 +126,7 @@ class Stacking(BaseAggregator): def __init__(self, base_estimators, meta_clf=None, n_folds=2, keep_original=True, use_proba=False, shuffle_data=False, random_state=None, - threshold=None, pre_fitted=None): + threshold=None, pre_fitted=None, n_jobs=None): super(Stacking, self).__init__( base_estimators=base_estimators, pre_fitted=pre_fitted) @@ -147,6 +149,11 @@ def __init__(self, base_estimators, meta_clf=None, n_folds=2, self.shuffle_data = shuffle_data self.random_state = random_state + self.n_jobs = n_jobs + # if n_jobs is not None: + # self._pool = ProcessPoolExecutor(max_workers=n_jobs) + # else: + # self._pool = None if threshold is not None: warnings.warn( @@ -183,26 +190,55 @@ def fit(self, X, y): X, y, n_folds=self.n_folds, shuffle_data=self.shuffle_data, random_state=self.random_state) - # iterate over all base classifiers - for i, clf in enumerate(self.base_estimators): - # iterate over all folds - for j in range(self.n_folds): - # build train and test index - full_idx = list(range(n_samples)) - test_idx = index_lists[j] - train_idx = list_diff(full_idx, test_idx) - X_train, y_train = X_new[train_idx, :], y_new[train_idx] - X_test, y_test = X_new[test_idx, :], y_new[test_idx] - - # train the classifier - clf.fit(X_train, y_train) - - # generate the new features on the pseudo test set - if self.use_proba: - new_features[test_idx, i] = clf.predict_proba( - X_test)[:, 1] - else: - new_features[test_idx, i] = clf.predict(X_test) + if self.n_jobs is not None: + futures_map = dict() + pool = ProcessPoolExecutor(self.n_jobs) + for i, clf in enumerate(self.base_estimators): + # iterate over all folds + for j in range(self.n_folds): + # build train and test index + full_idx = list(range(n_samples)) + test_idx = index_lists[j] + train_idx = list_diff(full_idx, test_idx) + X_train, y_train = X_new[train_idx, :], y_new[train_idx] + X_test, y_test = X_new[test_idx, :], y_new[test_idx] + + fut: Future = pool.submit(clf.fit, X_train, y_train) + futures_map[(i, j)] = fut + + def callback(fut): + if self.use_proba: + new_features[test_idx, i] = \ + fut.result().predict_proba(X_test)[:, 1] + else: + new_features[test_idx, i] = \ + fut.result().predict(X_test) + print(fut.result()) + + fut.add_done_callback(callback) + + wait(futures_map.values()) + else: + # iterate over all base classifiers + for i, clf in enumerate(self.base_estimators): + # iterate over all folds + for j in range(self.n_folds): + # build train and test index + full_idx = list(range(n_samples)) + test_idx = index_lists[j] + train_idx = list_diff(full_idx, test_idx) + X_train, y_train = X_new[train_idx, :], y_new[train_idx] + X_test, y_test = X_new[test_idx, :], y_new[test_idx] + + # train the classifier + clf.fit(X_train, y_train) + + # generate the new features on the pseudo test set + if self.use_proba: + new_features[test_idx, i] = clf.predict_proba( + X_test)[:, 1] + else: + new_features[test_idx, i] = clf.predict(X_test) # build the new dataset for training if self.keep_original: