-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathNWTOPT.py
executable file
·178 lines (157 loc) · 6.87 KB
/
NWTOPT.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""
Master script used to run all necessary processes
[usage]: python3 NWTOPT.py --ip {mongodb ip}
--port {mongodb port}
--key {mongodb job key}
--random {set True if random hyperparameter generation is desired}
--trials {number of trials to run}
--workers {number of HTCondor workers}
--poll_interval {poll interval of HTCondor workers}
--enable_condor {True to enable condor job submission}
--timeout {desired timeout}
* NOTE * not fully completed and not to be used
"""
# Disabling pylint snake_case warnings, import error warnings, and
# redefining out of scope warnings, too many local variables
# too many branches, too many statements
#
# pylint: disable = E0401, C0103, W0621, R0914, R0912, R0915, R0915
import time
import sys
import os
import shutil
import socket
import argparse
import logging
import asyncio
from collections import defaultdict
from objects.OPTSubprocess import OPTSubprocess
from objects.Master import Master
from objects.DB import DB
from objects.Condor import Condor
from objects.DB_Poller import DB_Poller
class NWTOPT():
    """
    NWTOPT Master Script

    Stores the command-line settings, a shared logger and an asyncio event
    loop, and keeps the DB, Master, Condor and DB_Poller process objects in
    ``self.processes`` so that ``start_loop`` can schedule them.

    [usage]: python3 NWTOPT.py
    --ip {mongodb ip}
    --port {mongodb port}
    --key {mongodb job key}
    --random {set True if random hyperparameter generation is desired}
    --trials {number of trials to run}
    --workers {number of HTCondor workers}
    --poll_interval {poll interval of HTCondor workers}
    --enable_condor {True to enable condor job submission}
    --timeout {desired timeout}
    """

    def __init__(self, args):
        """
        Initialization Method

        Copies the settings from ``args`` (an object exposing ``ip``,
        ``port``, ``key``, ``workers``, ``random``, ``trials``,
        ``poll_interval``, ``enable_condor`` and ``timeout``), sets up the
        logger and a new event loop, and copies ``config/HParams.py`` into
        ``NWT_SUBMIT/NWTOPT_FILES`` under the current working directory.
        """
        self.ip = args.ip
        self.port = args.port
        self.key = args.key
        self.workers = args.workers
        self.random = args.random
        self.trials = args.trials
        self.poll_interval = args.poll_interval
        self.enable_condor = args.enable_condor
        self.timeout = args.timeout
        self.cwd = os.getcwd()
        # A missing key is filled by calling OPTSubprocess() with no
        # arguments, so the init_* methods should run before start_loop.
        # NOTE(review): whether OPTSubprocess() accepts zero arguments is
        # not visible from this file - confirm.
        self.processes = defaultdict(OPTSubprocess)
        self.logger = self._init_logger()
        self.event_loop = asyncio.new_event_loop()
        hparams_source = os.path.join(self.cwd, 'config', 'HParams.py')
        hparams_target = os.path.join(
            self.cwd, 'NWT_SUBMIT', 'NWTOPT_FILES', 'HParams.py')
        shutil.copyfile(hparams_source, hparams_target)
        self.log(f'Working out of {self.cwd}', 0)

    def _init_logger(self):
        """
        Initializes logger

        Configures the 'NWTOPT' logger at DEBUG level with exactly two
        handlers: a file handler writing ./NWTOPT.log (truncated on open)
        and a stream handler on stdout. Handlers left over from an earlier
        call are detached and closed first.

        Returns the configured logger.
        """
        logger = logging.getLogger('NWTOPT')
        logger.setLevel(logging.DEBUG)
        # Walk a snapshot of the list: removeHandler() mutates
        # logger.handlers, and removing while iterating the live list skips
        # every other handler, leaving stale ones attached.
        for stale_handler in list(logger.handlers):
            logger.removeHandler(stale_handler)
            # Release the previous log file handle instead of leaking it.
            stale_handler.close()
        formatter = logging.Formatter('%(asctime)s:[%(levelname)7s]:%(threadName)12s - %(message)s')
        # creates both .log file and output stream
        file_handler = logging.FileHandler('./NWTOPT.log', mode='w', encoding='utf-8')
        console_handler = logging.StreamHandler(stream=sys.stdout)
        for handler in (file_handler, console_handler):
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger

    def log(self, msg, level):
        """
        Log
        [usage]: takes in a log message and log severity

        ``level`` 0 logs at info, 1 at warning and 2 at error. Any other
        value logs 'Invalid log level' at error; ``msg`` itself is not
        emitted in that case.
        """
        if level not in (0, 1, 2):
            self.log('Invalid log level', 2)
            return
        if level == 0:
            self.logger.info(msg)
        elif level == 1:
            self.logger.warning(msg)
        else:
            self.logger.error(msg)

    def start_loop(self):
        """
        Start asyncio infinite loop

        Schedules the init routine of the DB, DB_Poller, Condor (only when
        ``enable_condor`` is truthy) and Master process objects as tasks on
        the event loop, then runs the loop with ``run_forever``.
        """
        self.event_loop.create_task(self.processes['DB'].init_db())
        self.event_loop.create_task(self.processes['DB_Poller'].init_poller())
        if self.enable_condor:
            self.event_loop.create_task(self.processes['Condor'].init_condor())
        self.event_loop.create_task(self.processes['Master'].init_master())
        self.event_loop.run_forever()

    def init_master(self):
        """
        Initialize master process and store in NWTOPT object subprocesses
        """
        # NOTE(review): the leading integer (3 here; 1, 2 and 4 for the
        # other processes) is presumably a process identifier - confirm
        # against the Master constructor.
        master_process = Master(3, self.logger, self.cwd, self.ip, self.port, self.key, self.random, self.trials)
        self.processes['Master'] = master_process

    def init_db(self):
        """
        Initialize database process and store in NWTOPT object subprocesses
        """
        db_process = DB(1, self.logger, self.cwd, self.ip, self.port)
        self.processes['DB'] = db_process

    def init_condor(self):
        """
        Initialize condor process and store in NWTOPT object subprocesses
        """
        condor_process = Condor(2, self.logger, self.cwd, self.ip, self.port, self.poll_interval, self.workers, self.timeout)
        self.processes['Condor'] = condor_process

    def init_db_poller(self):
        """
        Initialize database polling process and store in NWTOPT object subprocesses
        """
        # NOTE(review): the trailing 60 looks like a polling period in
        # seconds - confirm against the DB_Poller constructor.
        dbPoll_process = DB_Poller(4, self.logger, self.cwd, self.ip, self.port, self.key, 60)
        self.processes['DB_Poller'] = dbPoll_process
def str_to_bool(value):
    """
    Convert a command-line token into a real boolean.

    argparse's ``type=bool`` calls ``bool()`` on the raw string, so every
    non-empty value - including 'False' - becomes True. This converter
    accepts true/false, t/f, yes/no, y/n, on/off and 1/0 in any case, and
    treats an empty string as False (as ``bool('')`` did).

    Raises argparse.ArgumentTypeError for any other value.
    """
    if isinstance(value, bool):
        return value
    token = str(value).strip().lower()
    if token in ('true', 't', 'yes', 'y', 'on', '1'):
        return True
    if token in ('false', 'f', 'no', 'n', 'off', '0', ''):
        return False
    raise argparse.ArgumentTypeError(f'expected a boolean value, got {value!r}')


def build_parser():
    """
    Build the NWTOPT command-line parser.

    Returns an argparse.ArgumentParser. ``--ip`` defaults to None here; the
    host address is resolved in ``main`` only when no ip was supplied, so
    building the parser performs no network lookup.
    """
    # REMEMBER TO RE-REQUIRE CERTAIN ARGS, SIMPLY FALSE FOR TESTING
    parser = argparse.ArgumentParser(description='NWTOPT - Hyperparameter Optimization for MODFLOW-NWT')
    parser.add_argument('--ip', type=str, required=False, default=None, help='ip address of DB')
    parser.add_argument('--port', type=int, required=False, default=27017, help='port of DB')
    parser.add_argument('--key', type=str, required=True, default='', help='key of job you want to pull')
    parser.add_argument('--workers', type=int, required=False, default=1, help='the number of Condor workers to deploy')
    # nargs='?' with const=True lets the bare flag (e.g. "--random") mean
    # True while "--random True" / "--random False" keep working.
    parser.add_argument('--random', type=str_to_bool, nargs='?', const=True, required=False, default=False, help='set to True to switch from TPE to Random Search')
    parser.add_argument('--trials', type=int, required=False, default=1, help='the number of optimization trials')
    parser.add_argument('--poll_interval', type=int, required=False, default=240, help='the frequency that a Condor worker pings the DB in seconds')
    parser.add_argument('--enable_condor', type=str_to_bool, nargs='?', const=True, required=False, default=False, help='set to True to send out jobs through Condor')
    # NOTE(review): the help text says an empty value means "no time limit",
    # but the default is 22 - confirm which is intended.
    parser.add_argument('--timeout', type=float, required=False, default=22, help='model run time limit - leave empty for no time limit')
    return parser


def main(argv=None):
    """
    Parse arguments, validate them, create the NWTOPT handler and run it.

    ``argv`` is the argument list to parse; None means sys.argv[1:].
    Invalid values end the program through ``parser.error`` (usage message,
    exit status 2) rather than ``assert``, which is stripped under -O.
    """
    parser = build_parser()
    # init vars
    args = parser.parse_args(argv)
    if args.ip is None:
        # Same default as before, but only looked up when --ip is omitted.
        args.ip = socket.gethostbyname(socket.gethostname())
    if args.trials <= 0:
        parser.error('You cannot run NWTOPT with less than 1 trial')
    if args.poll_interval <= 0:
        parser.error('You cannot run NWTOPT with a poll interval less than 1 second')
    if args.enable_condor and args.workers <= 0:
        parser.error('Please specify your desired number of workers')
    OPTHandler = NWTOPT(args)
    # initialize subprocess and start loop
    OPTHandler.init_db()
    OPTHandler.init_master()
    if args.enable_condor:
        OPTHandler.init_condor()
    # The poller is always created: start_loop schedules it unconditionally.
    OPTHandler.init_db_poller()
    OPTHandler.start_loop()
    # killProcesses()


if __name__ == '__main__':
    main()