Commit c5f07170 authored by Ahmed Markhoos

preliminary HepNetSearch class

parent 499e3f18
Merge request !74: HepNetSearch Implementation
Pipeline #12739 passed
@@ -990,3 +990,349 @@ class HepNet:
f"{path_base}_wght_{fold_i}.h5 "
f"{path_base}_vars_{fold_i}.json "
f"> {path_base}_{fold_i}.json", file=script_file)
class HepNetSearch:
"""
A hyperparameter tuner/search for HepNet, based on keras-tuner.
    The class supports multi-processing and distributed hyperparameter tuning on demand.
"""
def __init__(self, keras_model, tuner_name, cross_validator, normalizer, input_list,
output_list, tolerance=None, ETH_IP=None):
"""
HepNetSearch arguments tightly follow those of HepNet, except for the following:
tuner_name: Name of the keras-tuner class {RandomSearch, BayesianOptimization, Hyperband}
        tolerance: A positive number defining the tolerance level on the standard deviation of the
                   tracked validation metric across folds.
                   Models with Nfold_std(val_metric) > tolerance will be discarded from the search_book.
        ETH_IP: The Ethernet IP address to pass when distributed training is planned in the form of a
                chief-worker model.
"""
self.model = keras_model
self.cv = cross_validator
self.norm_cls = normalizer
self.input_list = input_list
self.output_list = output_list
self.norms = []
self.tuner_settings = None
if tuner_name not in ['Hyperband', 'RandomSearch', 'BayesianOptimization']:
raise ValueError("%s is an invalid tuner. It must be either Hyperband, etc.." % repr(tuner))
else:
self.tuner_name = tuner_name
if tolerance is None:
self.tolerance = np.inf
        elif tolerance >= 0:
            self.tolerance = tolerance
        else:
            raise ValueError("Tolerance must either be None or a non-negative number.")
        if ETH_IP is None:
            # Default to the IPv4 address of the 'eno1' Ethernet interface
            self.ETH_IP = ni.ifaddresses('eno1')[ni.AF_INET][0]['addr']
else:
self.ETH_IP = ETH_IP
def set_tuner(self, **kwds):
"""
Keras tuner kwds
"""
self.tuner_settings = kwds
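    # Illustrative only: a sketch of a settings dictionary for set_tuner(), where net_search is a
    # hypothetical HepNetSearch instance. The keyword names mirror the keras-tuner constructor
    # arguments (objective, max_trials, executions_per_trial, directory, project_name); the
    # concrete values are placeholders, not recommendations.
    #
    #   net_search.set_tuner(objective='val_loss',
    #                        max_trials=20,
    #                        executions_per_trial=1,
    #                        directory='tuner_dir',
    #                        project_name='hepnet_search')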
    def search(self, df, weight=None, Nfmp=False, distribute=False, tuner_id=None, **kwds):
"""
Perform the hyperparameter search.
weight: Training weights.
Nfmp: Execute Nfold fits in parallel as multiple processes (multi-processing).
distribute: Allow distributed training.
tuner_id: Chief-worker model ID. This is either 'chief' or 'tunerX', where X is a unique tuner number.
**kwds: Passed directly to fit()
"""
#Meant for internal use only
self.tuner_id = tuner_id
        if self.tuner_settings is None:
            raise ValueError("The tuner is not set yet. Call set_tuner() with the keyword "
                             "arguments for the keras-tuner constructor first.")
if weight is None:
weight = Variable("unity", lambda d: np.ones(len(d)))
elif isinstance(weight, str):
weight = Variable(weight, weight)
        # Perform the tuning/search on each fold and collect the resulting tuners and oracles
self.norms = []
oracles = []
tuners = []
multi_tuner = tuner_mp(self)
        if Nfmp:
            # Run the per-fold searches as parallel processes
            procs = []
            for fold_i in range(self.cv.k):
                p = mp.Process(target=multi_tuner.search_body,
                               args=(fold_i, df, weight, None, None, distribute, kwds, tuner_id))
                procs.append(p)
                p.start()
            for p in procs:
                p.join()
else:
for fold_i in range(self.cv.k):
multi_tuner.search_body(fold_i,df,weight,tuners,oracles,distribute,kwds,tuner_id)
        # On search completion, the chief (or a non-distributed run) registers the combined N-fold searches
        if tuner_id == "chief" or tuner_id is None:
for fold_i in range(self.cv.k):
multi_tuner.search_body(fold_i,df,weight,tuners,oracles,distribute,kwds,tuner_id,noTraining=True)
            # Fetch the hyperparameter names from the first fold's oracle
            hps_space = oracles[0].get_best_trials()[0].get_state()['hyperparameters']['space']
            hps_str = [hp['config']['name'] for hp in hps_space]
            self.hps_str = hps_str
#Evaluate validation mean and std score across folds
search_book = []
fold_0_Ntrials=len(list(oracles[0].trials.values()))
for j in range(fold_0_Ntrials):
fold_0_trial_j = oracles[0].get_best_trials(num_trials=fold_0_Ntrials)[j] #fold_0 oracle, trial_j
fold_0_hps_j = fold_0_trial_j.get_state()['hyperparameters']['values'] #dictionary of fold_0 oracle trial_j ...
#... ALL hyperparameter values
fold_0_hps_j = {p:fold_0_hps_j[p] for p in hps_str} #only model hyperparameters
scores={'fold_0_score': fold_0_trial_j.get_state()['score']} #Register fold_0 trial_j score
#search the fold_i oracle (oracle_i) for the matching trial_m using the hyperparameter values then append the score
for i in range(1,self.cv.k):
fold_i_Ntrials = len(list(oracles[i].trials.values()))
fold_i_trials = oracles[i].get_best_trials(num_trials=fold_i_Ntrials)
for m in range(len(fold_i_trials)):
fold_i_trial_m = fold_i_trials[m]
fold_i_hps_m = fold_i_trial_m.get_state()['hyperparameters']['values']
if fold_0_hps_j.items() <= fold_i_hps_m.items():
scores['fold_'+str(i)+'_score'] = fold_i_trial_m.get_state()['score']
break
search_book.append({**fold_0_hps_j,**scores})
#Convert search_book to a dataframe then evaluate mean and std
search_book = pd.DataFrame(search_book)
scores_book = search_book.drop(hps_str,axis=1)
hps_book = search_book.drop(scores_book.columns,axis=1)
            if len(scores_book.columns) > 1:
search_book = search_book.assign(mean=np.mean(scores_book,axis=1))
search_book = search_book.assign(std=np.std(scores_book,axis=1))
else:
search_book = search_book.assign(mean=np.mean(scores_book,axis=1))
search_book = search_book.assign(std=[0]*len(scores_book))
self.tuners= tuners
self.oracles = oracles
self.search_book = search_book
self.hps_book = hps_book
self.scores_book = scores_book
#Sort the search_book
self.sorted_search_book = self.tolerance_filter(np.inf,return_book=True)
#Make a filtered version of the search_book
self.filtered_search_book = self.tolerance_filter(self.tolerance,return_book=True,overwrite_best_index=True)
def tolerance_filter(self, tolerance, return_book=False, overwrite_best_index=False):
"""
Sort and filter the search_book based on tolerance
"""
#The following if statement is to support custom objective methods
if isinstance(self.tuner_settings['objective'],str):
objective_name = self.tuner_settings['objective']
elif isinstance(self.tuner_settings['objective'], keras_tuner.Objective):
objective_name = self.tuner_settings['objective'].name
else:
            raise TypeError("The objective must be a string or a keras_tuner.Objective instance.")
score_direction=self.oracles[0].get_best_trials()[0].get_state()\
['metrics']['metrics'][objective_name]['direction']
if score_direction=='min':
ascending=True
elif score_direction=='max':
ascending=False
else:
ascending=False
print('Warning: Objective direction is neither max nor min! Defaulted to descending order for optimal trial mean.')
        ### TODO: The following is a workaround for dropped trials. It is a performance issue:
        ### 1) it affects run time, and 2) it might reduce the chance of reaching the optimal hyperparameter values.
#Remove tuner dropped trials
        dropped_trial_indices = self.search_book[self.search_book.isnull().any(axis=1)].index.tolist()
filtered_search_book = self.search_book.drop(dropped_trial_indices)
        # Sort by the cross-fold mean score according to the objective direction
        filtered_search_book = filtered_search_book.sort_values(by=['mean'], axis=0, ascending=ascending)
search_book_pre_tolerance = filtered_search_book
#Filter by tolerance level
filtered_search_book = filtered_search_book[filtered_search_book['std']<=tolerance]
if len(filtered_search_book)!=0:
best_index=filtered_search_book.iloc[0].name
else:
best_index=search_book_pre_tolerance.iloc[0].name
print("Warning: No trial satisfied the tolerance level provided. Defaulted to tolerance=None."
" To refilter with a new tolerance, call object.tolerance_filter(new_tolerance,True,True), it will return the new filtered_search_book and update the best_index.")
if overwrite_best_index:
self.best_index = best_index
if return_book:
if len(filtered_search_book)!=0:
return filtered_search_book
else:
                return search_book_pre_tolerance
def trial_summary(self, trial_index=None, detailed=False):
"""
Summary for trial_index model. By default, the function returns the best trial summary.
Warnings are expected in this method.
"""
        if trial_index is None:
trial_index = self.best_index
trial=dict(self.search_book.loc[trial_index])
print("Index: %s" %trial_index)
print("Best mean score: %s \nBest std: %s" %(trial['mean'],trial['std']))
print("Hyperparameters:")
for key in self.hps_str:
print("\t%s: %s" %(key,trial[key]))
print('\nTrial model summary:')
model=self.get_model(trial_index=trial_index)
model.summary()
if detailed:
print('\n')
print( json.dumps(model.get_config(), indent=1) )
def save_books(self, path='', name='search_book'):
"""
Save the filtered and unfiltered search books as csv
"""
        self.search_book.to_csv(os.path.join(path, name + '.csv'))
        self.sorted_search_book.to_csv(os.path.join(path, 'sorted_' + name + '.csv'))
        self.filtered_search_book.to_csv(os.path.join(path, 'filtered_' + name + '.csv'))
def get_model(self,trial_index=None):
"""
Get the model of interest by trial_index (oracle_0 or search_book indices)
"""
        if trial_index is None:
trial_index=self.best_index
oracle0_Ntrials=len(self.oracles[0].get_state()['tried_so_far'])
hps=self.tuners[0].get_best_hyperparameters(num_trials=oracle0_Ntrials)[trial_index]
model=self.tuners[0].hypermodel.build(hps)
return model
def save_untrained_model(self, path=None, trial_index=None, name='model', **kwds):
"""
Save the untrained model of interest by trial index (oracle_0 or search_book indices)
By default, the best trial is saved. kwds are passed to tf.keras.models.save_model.
"""
        if trial_index is None:
            trial_index = self.best_index
        if path is None:
            path = name
        else:
            path = path + '/' + name
model=self.get_model(trial_index=trial_index)
model.save(path,**kwds)
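# Illustrative end-to-end usage sketch. The names build_model, cv, normalizer, input_list,
# output_list and df are hypothetical placeholders for a keras-tuner hypermodel builder, a
# cross validator, a normalizer class, the input/output variable lists and a pandas DataFrame;
# the 'weight' column name is likewise a placeholder.
#
#   net_search = HepNetSearch(build_model, 'BayesianOptimization', cv, normalizer,
#                             input_list, output_list, tolerance=0.1)
#   net_search.set_tuner(objective='val_loss', max_trials=30,
#                        directory='tuner_dir', project_name='hepnet_search')
#   net_search.search(df, weight='weight')   # k-fold hyperparameter search
#   net_search.trial_summary()               # summary of the best tolerated trial
#   net_search.save_books('results')         # write the search books as CSV
#   model = net_search.get_model()           # untrained model with the best hyperparameters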
class tuner_mp(object):
    """
    Helper that copies a HepNetSearch object's state so that a single fold's search can run in its own process.
    """
    def __init__(self, class_object):
        self.__dict__ = class_object.__dict__.copy()
def search_body(self,fold_i,df,weight,tuners,oracles,distribute,kwds,tuner_id,noTraining=False):
#Set chief/worker environment communication ports
if distribute and not noTraining:
            if tuner_id is None:
                raise ValueError("tuner_id was not passed.")
os.environ["KERASTUNER_TUNER_ID"]= tuner_id
os.environ["KERASTUNER_ORACLE_IP"]= self.ETH_IP
os.environ["KERASTUNER_ORACLE_PORT"] = str(47808+fold_i)
importlib.reload(keras_tuner)
print("Fold:%s\t ID:%s IP:%s PORT:%s"%(fold_i,os.getenv("KERASTUNER_TUNER_ID"),os.getenv("KERASTUNER_ORACLE_IP"),os.getenv("KERASTUNER_ORACLE_PORT")))
if distribute and noTraining:
            try:
                del os.environ["KERASTUNER_TUNER_ID"]
                del os.environ["KERASTUNER_ORACLE_IP"]
                del os.environ["KERASTUNER_ORACLE_PORT"]
            except KeyError:
                # The variables were never set (e.g. non-distributed run)
                pass
print("Fold:%s\t ID:%s IP:%s PORT:%s"%(fold_i,os.getenv("KERASTUNER_TUNER_ID"),os.getenv("KERASTUNER_ORACLE_IP"),os.getenv("KERASTUNER_ORACLE_PORT")))
#Constrain memory growth on physical GPUs
physical_devices = tensorflow.config.list_physical_devices('GPU')
try:
tensorflow.config.experimental.set_memory_growth(physical_devices[0], True)
        except Exception:
# In case of CPU or virtual devices
pass
# select training set
selected = self.cv.select_training(df, fold_i)
training_df = df[selected]
# select validation set
selected = self.cv.select_validation(df, fold_i)
validation_df = df[selected]
# seed normalizers
norm = self.norm_cls(training_df, self.input_list)
self.norms.append(norm)
training_df = norm(training_df)
validation_df = norm(validation_df)
# search in fold
tuner = getattr(keras_tuner,self.tuner_name)
tuner_settings_i = self.tuner_settings.copy()
if 'logger' in tuner_settings_i:
tuner_settings_i['logger'].fold = fold_i
tuner_settings_i.update({'project_name':self.tuner_settings['project_name']+'_'+str(fold_i)})
tuner = tuner(self.model,**tuner_settings_i)
tuner.search(training_df[self.input_list],
training_df[self.output_list],
validation_data=(
validation_df[self.input_list],
validation_df[self.output_list],
np.array(weight(validation_df)),
),
sample_weight=np.array(weight(training_df)),
**kwds)
        # Append the tuner and oracle so the completed search is available to the caller
        if tuners is not None and oracles is not None:
tuners.append(tuner)
oracles.append(tuner.oracle)
del tuner
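# Distributed chief-worker sketch (illustrative). search_body exports KERASTUNER_TUNER_ID,
# KERASTUNER_ORACLE_IP and KERASTUNER_ORACLE_PORT (47808 + fold index) before reloading
# keras_tuner, which is how keras-tuner's distributed mode locates the oracle. With net_search
# a hypothetical HepNetSearch instance configured identically on every machine:
#
#   # on the chief node (also registers the combined search books at the end)
#   net_search.search(df, weight='weight', distribute=True, tuner_id='chief')
#
#   # on each worker node, with a unique tuner number per worker ('tuner0', 'tuner1', ...)
#   net_search.search(df, weight='weight', distribute=True, tuner_id='tuner0')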