# coding=utf-8
# Copyright 2016-2018 Emre Ahmetick
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
__author__ = "Emre Ahmetick"
__copyright__ = "Copyright 2018, Emre Ahmetick"
__maintainer__ = "Emre Ahmetick"
__email__ = "ahmetick@fhi-berlin.mpg.de"
__date__ = "23/09/18"
import numpy as np
from ai4materials.utils.utils_config import SSH
import os
import sched
import time
import sys
import logging
from shutil import rmtree
import pandas as pd
from subprocess import Popen
import operator as opop
from copy import deepcopy
from functools import reduce
# Allowed primary (input) features, grouped by unit class: every name inside
# one inner list shares the same physical unit, so only those may be combined
# dimension-wise by the fortran code.  (A)/(B) appear to label the two atomic
# constituents and (AB) the compound — TODO confirm against the SIS manual.
F_unit = [
    ['IP(A)', 'IP(B)', 'EA(A)', 'EA(B)'],
    ['E_HOMO(A)', 'E_HOMO(B)', 'E_LUMO(A)', 'E_LUMO(B)'],
    ['r_s(A)', 'r_s(B)', 'r_p(A)', 'r_p(B)', 'r_d(A)', 'r_d(B)', 'r_sigma(AB)', 'r_pi(AB)'],
    ['Z(A)', 'Z(B)', 'Z_val(A)', 'Z_val(B)', 'period(A)', 'period(B)'],
    ['d(AB)', 'd(A)', 'd(B)'],
    ['E_b(AB)', 'E_b(A)', 'E_b(B)'],
    ['HL_gap(AB)', 'HL_gap(A)', 'HL_gap(B)'],
]
# Feature names whose values are real numbers.
reals = [
    'IP(A)', 'IP(B)', 'EA(A)', 'EA(B)',
    'E_HOMO(A)', 'E_HOMO(B)', 'E_LUMO(A)', 'E_LUMO(B)',
    'r_s(A)', 'r_s(B)', 'r_p(A)', 'r_p(B)', 'r_d(A)', 'r_d(B)',
    'd(AB)', 'd(A)', 'd(B)',
    'E_b(AB)', 'E_b(A)', 'E_b(B)',
    'HL_gap(AB)', 'HL_gap(A)', 'HL_gap(B)',
    'r_sigma(AB)', 'r_pi(AB)']
# Feature names whose values are integers.
ints = ['Z(A)', 'Z(B)', 'Z_val(A)', 'Z_val(B)', 'period(A)', 'period(B)']
# Canonical (human-readable) feature names ...
standard_format = [
    'IP(A)', 'IP(B)', 'EA(A)', 'EA(B)',
    'E_HOMO(A)', 'E_HOMO(B)', 'E_LUMO(A)', 'E_LUMO(B)',
    'r_s(A)', 'r_s(B)', 'r_p(A)', 'r_p(B)', 'r_d(A)', 'r_d(B)',
    'd(AB)', 'd(A)', 'd(B)',
    'Z(A)', 'Z(B)', 'Z_val(A)', 'Z_val(B)',
    'E_b(AB)', 'E_b(A)', 'E_b(B)',
    'HL_gap(AB)', 'HL_gap(A)', 'HL_gap(B)',
    'r_sigma(AB)', 'r_pi(AB)',
    'period(A)', 'period(B)']
# ... and the corresponding short identifiers, index-aligned with
# standard_format (same order, same length).
converted_format = [
    'ipA', 'ipB', 'eaA', 'eaB',
    'homoA', 'homoB', 'lumoA', 'lumoB',
    'rsA', 'rsB', 'rpA', 'rpB', 'rdA', 'rdB',
    'disAB', 'disA', 'disB',
    'zA', 'zB', 'valA', 'valB',
    'ebAB', 'ebA', 'ebB',
    'hlgapAB', 'hlgapA', 'hlgapB',
    'rsigmaAB', 'rpiAB',
    'periodA', 'periodB']
# Translation tables between the two naming schemes, in both directions.
standard_2_converted = dict(zip(standard_format, converted_format))
converted_2_standard = dict(zip(converted_format, standard_format))
""" Set logger for outputs as errors, warnings, infos. """
#
# try:
# hdlr = logging.FileHandler(configs["output_file"], mode='a')
# except:
# hdlr = logging.FileHandler(configs["output_file"], mode='w')
#
# level = logging.getLevelName(configs["log_level_general"])
#
# logger = logging.getLogger(__name__)
# logger.setLevel(level)
# logging.basicConfig(level=level)
# FORMAT = "%(levelname)s: %(message)s"
# formatter = logging.Formatter(fmt=FORMAT)
# handler = logging.StreamHandler()
# handler.setFormatter(formatter)
# hdlr.setFormatter(formatter)
# logger.addHandler(handler)
# logger.addHandler(hdlr)
# logger.setLevel(level)
# logger.propagate = False
#
# __metainfopath__ = configs["meta_info_file"]
# START PARAMETERS REFERENCE
# In the following lists of tuples the order of the items might be important. Thus no dict is used.
# If value is tuple, then only one of items are possible as value when passing the dict control to the SIS class.
# START PARAMETERS REFERENCE
# In the following lists of tuples the order of the items might be important. Thus no dict is used.
# If value is tuple, then only one of its items is possible as value when passing the dict control to the SIS class.
Tuple_list = [
    # FCDI
    ('mpiname', str),  # code will be run by: mpiname codename. set mpiname='' for serial run.
    ('desc_dim', int),  # starting iteration (can be n if iteration up to n-1 already calculated before)
    ('ptype', ('quanti', 'quali')),  # property type: 'quanti'(quantitative),'quali'(qualitative)
    ('ntask', int),  # number of tasks (properties)
    ('nsample', list),  # number of samples for each task (and group for classification, e.g. (4,3,5),(7,9) )
    ('width', float),  # for classification, the boundary tolerance
    # FC
    ('nsf', int),  # number of scalar features (i.e.: the atomic parameters)
    ('task_arr', int),  # number of tasks arranged in columns
    ('rung', int),  # rung of feature spaces (rounds of combination)
    ('opset', list),  # operators(currently: (+)(-)(*)(/)(exp)(log)(^-1)(^2)(^3)(sqrt)(|-|) )
    ('ndimtype', int),  # number of dimension types (for dimension analysis)
    ('dimclass', list),  # specify features in each class denoted by ( )
    ('allele', bool),  # Should all elements appear in each of the selected features?
    ('nele', int),  # number of element (<=6): useful only when symm=.true. and/or allele=.true.
    ('maxfval_lb', float),  # features having the max. abs. data value <maxfval_lb will not be selected
    ('maxfval_ub', float),  # features having the max. abs. data value >maxfval_ub will not be selected
    ('subs_sis', int),  # total number of features selected by sure independent screening
    # DI
    ('method', ('L1L0', 'L0')),  # 'L1L0' or 'L0'
    ('size_fs', int),  # number of total features in each taskxxx.dat (same for all)
    ('nfL0', int),  # number of features for L0(ntotf->nfL0 if nfL0>ntotf)
    ('metric', ('LS_RMSE', 'CV_RMSE', 'CV_MAE')),  # metric for the evaluation: LS_RMSE,CV_RMSE,CV_MAE
    ('n_eval', int),  # number of top models (based on fitting) to be evaluated by the metric
    ('CV_fold', int),  # k-fold CV (>=2)
    ('CV_repeat', int),  # repeated k-fold CV
    ('n_out', int),  # number of top models to be output, off when =0
]
# Generate lists and dicts for easier coding later.
Param_key_list = [i for i, j in Tuple_list]  # parameter names, reference order
Param_dic = dict(Tuple_list)  # parameter name -> expected type (or tuple of allowed values)
# Important: control reference. Specifies how the structure of the input control dict to the SIS class should look.
# If a key is a tuple, then the value has to be a tuple, too: one and only one of the tuple's keys has to be set.
control_ref = {
    'local_paths': {'local_path': str, 'SIS_input_folder_name': str},
    ('local_run', 'remote_run'): (
        {'SIS_code_path': str, 'mpi_command': str},
        {'SIS_code_path': str, 'username': str, 'hostname': str, 'port': int, 'remote_path': str,
         'eos': bool, 'mpi_command': str, 'nodes': int, ('key_file', 'password'): (str, str)}
    ),
    'parameters': {'rung': int, 'subs_sis': int, 'desc_dim': int, 'opset': list, 'ptype': ('quanti', 'quali')},
    'advanced_parameters': Param_dic
}
# All keys which do not need to be set in the input control dict tree. If they are not set, default values are used.
# NOTE: the name keeps its historic spelling ("mandatory") because it is referenced elsewhere in the file.
not_mandotary = ['advanced_parameters', 'eos', 'nodes', 'port', 'FC', 'DI', 'FCDI'] + Param_key_list
# Operations currently understood by the SIS fortran code.
available_OPs = ['+', '-', '*', '/', 'exp', 'exp-', '^-1', '^2', '^3', 'sqrt', 'log', '|-|', 'SCD', '^6']
# Unary operations.  BUG FIX: '^2' was listed twice; the duplicate is removed
# (membership tests are unaffected, but the list is now a proper set of ops).
un_OP = ['exp', '^2', 'exp-', '^-1', '^3', 'sqrt', 'log', 'SCD', '^6']
# Binary, order-dependent operations.
bin_OP = ['-', '/']
# Binary, commutative operations.
bin_OP_bino = ['+', '|-|', '*']
# END PARAMETERS REFERENCE
class SIS(object):
    """ Python interface with the fortran SIS+(Sure Independence Screening)+L0/L1L0 code.

    The SIS+L0/L1L0 method is a greedy algorithm.  It enhances OMP by considering
    not only the closest feature vector to the residual in each step, but collecting
    the closest 'n_SIS' feature vectors.  The final model is then built after a given
    number of iterations by determining the (approximately) best linear combination
    of the collected features using the L0 (L1-L0) algorithm.
    To execute the code, besides the SIS code parameters also folder paths are needed,
    as well as account information for a remote machine if the code is run remotely.

    Parameters
    ----------
    P : array, [n_sample]; list, [n_sample]
        P refers to the target (label).  If ptype = 'quali' a list of ints is required.
    D : array, [n_sample, n_features]
        D refers to the feature matrix.  The SIS code calculates algebraic combinations
        of the features and then applies the SIS+L0/L1L0 algorithm.
    feature_list : list of strings
        List of feature names.  Needs to be in the same order as the feature vectors
        (columns) in D.  Features must consist of strings which are in F_unit (see above).
    feature_unit_classes : None or {list of integers or the string 'no_unit'}
        Integers correspond to the unit class of the features from feature_list.
        'no_unit' is reserved for dimensionless features.
    output_log_file : string
        File path for the logger output.
    rm_existing_files : bool
        If SIS_input_path on the local machine (or remote_input_path on the remote
        machine) exists, it is removed.  Otherwise it is renamed to SIS_input_path_$number.
    control : dict of dicts (of dicts)
        Dict tree: {
            'local_paths': {'local_path': str, 'SIS_input_folder_name': str},
            ('local_run', 'remote_run'): (
                {'SIS_code_path': str, 'mpi_command': str},
                {'SIS_code_path': str, 'username': str, 'hostname': str, 'remote_path': str,
                 'eos': bool, 'mpi_command': str, 'nodes': int, ('key_file', 'password'): (str, str)}
            ),
            'parameters': {'n_comb': int, 'n_sis': int, 'max_dim': int, 'OP_list': list},
            'advanced_parameters': {'FC': FC_dic, 'DI': DI_dic, 'FCDI': FCDI_dic}
        }
        Here the tuples (., .) mean that one and only one of both keys has to be set.
        For the forms of FC_dic, DI_dic, FCDI_dic check the PARAMETERS REFERENCE above.

    Attributes
    ----------
    start : -
        Starts the code.
    get_results : list [max_dim] of dicts {'D', 'coefficients', 'P_pred'}
        get_results[model_dim-1]['D'] : pandas data frame [n_sample, model_dim+1]
            Descriptor matrix with the columns being algebraic combinations of the
            input feature matrix.  Column names are thus strings of the algebraic
            combinations of strings of the input feature_list.  The last column is
            full of ones, corresponding to the intercept.
        get_results[model_dim-1]['coefficients'] : array [model_dim+1]
            Optimizing coefficients.
        get_results[model_dim-1]['P_pred'] : array [n_sample]
            Fit: np.dot(np.array(D), coefficients)

    Notes
    -----
    For remote_run the library nomad_sim.ssh_code is needed.  If the remote machine is
    eos, in dict control['remote_run'] the (key: value) pair 'eos': True has to be set.
    Then, for example, 'nodes': 1 and 'mpi_command': 'mpi_run -np 32' can be set in addition.
    Paths (say name: path) are all set in the initialization part with self.path and
    used in other functions via self.path.  In general the other variables are directly
    passed as arguments to the functions.  There are a few exceptions such as self.ssh.

    Examples
    --------
    >>> import numpy as np
    >>> from nomad_sim.SIS import SIS
    >>> # Specify where on the local machine input files for the SIS fortran code shall be created
    >>> Local_paths = {
    ...     'local_path': '/home/beaker/',
    ...     'SIS_input_folder_name': 'SIS_input',
    ... }
    >>> # Information for the ssh connection. Instead of 'password' also 'key_file'
    >>> # (path to an rsa key file) is possible.
    >>> Remote_run = {
    ...     'mpi_command': '',
    ...     'remote_path': '/home/username/',
    ...     'SIS_code_path': '/home/username/SIS_code/',
    ...     'hostname': 'hostname',
    ...     'username': 'username',
    ...     'password': 'XXX'
    ... }
    >>> # Parameters for the SIS fortran code. If at each iteration a different 'OP_list'
    >>> # shall be used, set a list of max_dim lists, e.g. [['+', '-', '*'], ['/', '*']] if
    >>> # n_comb = 2.
    >>> Parameters = {
    ...     'n_comb': 2,
    ...     'OP_list': ['+', '|-|', '-', '*', '/', 'exp', '^2'],
    ...     'max_dim': 2,
    ...     'n_sis': 10
    ... }
    >>> # Final control dict for the SIS class. Instead of remote_run also local_run can
    >>> # be set (with different keys). Also advanced_parameters can be set, but should be
    >>> # done only if the parameters of the SIS fortran code are understood.
    >>> SIS_control = {'local_paths': Local_paths, 'remote_run': Remote_run, 'parameters': Parameters}
    >>> # Target (label) vector P, feature_list, feature matrix D. The values are made up.
    >>> P = np.array([1, 2, 3, -2, -9])
    >>> feature_list = ['r_p(A)', 'r_p(B)', 'Z(A)']
    >>> D = np.array([[7, -11, 3], [-1, -2, 4], [2, 20, 3], [8, 1, 8], [-3, 4, 1]])
    >>> # Use the code
    >>> sis = SIS(P, D, feature_list, control=SIS_control, output_log_file='/home/ahmetcik/codes/beaker/output.log')
    >>> sis.start()
    >>> results = sis.get_results()
    >>> print(results[1]['coefficients'])
    [-3.1514 -5.9171  3.9697]
    >>> print(results[1]['D'])
       ((rp(B)/Z(A))/(rp(A)+rp(B)))  ((Z(A)/rp(B))/(rp(B)*Z(A)))  intercept
    0                      0.916670                     0.008264        1.0
    1                      0.166670                     0.250000        1.0
    2                      0.303030                     0.002500        1.0
    3                      0.013889                     1.000000        1.0
    4                      4.000000                     0.062500        1.0
    """
    # START INIT
def __init__(self, P, D, feature_list, feature_unit_classes=None, target_unit='eV', control=None,
output_log_file='/home/beaker/.beaker/v1/web/tmp/output.log', rm_existing_files=False, if_print=True, check_only_control=False):
control = deepcopy(control)
self.rm_existing_files = rm_existing_files
self.target_unit = target_unit
# set_logger(output_log_file)
self.logger = logger
self.if_print = if_print
# Check inputs
self.check_arrays(P, D, feature_list, feature_unit_classes, control['parameters']['ptype'])
self.check_control(control, control_ref, "control")
self.check_quali_dim(control)
self.check_OP_list(control)
if check_only_control:
return
# Distribute the control keys to the corresponding init functions.
self.set_main_settings(P, D, feature_list, feature_unit_classes, **control['local_paths'])
if 'remote_run' in control:
self.set_ssh_connection(**control['remote_run'])
else:
self.set_local_run(**control['local_run'])
if 'advanced_parameters' in control:
advanced_parameters = control['advanced_parameters']
else:
advanced_parameters = None
self.set_SIS_parameters(advanced_parameters=advanced_parameters, **control['parameters'])
self.predicted_feature_space_size = None
self.l0_steps = None
self.checking_expense = True
self.if_print = False
self.if_close_ssh = False
self.estimate_calculation_expense(feature_list)
self.checking_expense = False
self.if_print = if_print
if control['parameters']['ptype'] == 'quanti':
self.if_close_ssh = True
[docs] def set_main_settings(self, P, D, feature_list, feature_unit_classes,
local_path='/home/beaker/', SIS_input_folder_name='input_folder'):
""" Set local environment and P, D and feature_list."""
self.local_path = local_path
self.SIS_input_folder_name = SIS_input_folder_name
self.SIS_input_path = os.path.join(self.local_path, SIS_input_folder_name)
if feature_unit_classes is None:
feature_unit_classes = [0 for _ in feature_list]
# Bring feature_list and D in the feature_order of F_unit becauese self.check_feature_untis needs it.
ordered_indices = np.argsort(feature_unit_classes)
self.feature_unit_classes = [feature_unit_classes[i] for i in ordered_indices]
self.feature_list = [feature_list[i] for i in ordered_indices]
self.D = D[:, ordered_indices]
self.P = P
self.ssh_connection = False
self.local_run = False
[docs] def set_local_run(self, SIS_code_path='~/codes/SIS_code/', mpi_command=''):
""" Set and check local enviroment if local_run is used."""
self.local_run = True
self.SIS_code_path = SIS_code_path
self.SIS_code_FCDI = os.path.join(self.SIS_code_path, 'FCDI')
self.mpi_command = mpi_command
# Check if SIS_code_path exists and if the SIS codes FC, DI and FCDI exist in it.
if os.path.isdir(self.SIS_code_path):
for program in ['FCDI', 'FC', 'DI']:
program_path = os.path.join(self.SIS_code_path, program)
if not os.path.exists(program_path):
raise OSError("No executable: %s" % program_path)
else:
raise OSError("No such directory: %s" % self.SIS_code_path)
    def set_ssh_connection(self, hostname=None, username=None, port=22, key_file=None, password=None,
                           remote_path=None, SIS_code_path=None, eos=False, nodes=1, mpi_command=''):
        """ Set the ssh connection. Set and check the remote environment for remote_run.

        Parameters
        ----------
        hostname, username, port : ssh account data of the remote machine.
        key_file, password : one of the two is used for authentication.
        remote_path : string
            Working directory on the remote machine; the SIS input folder is created inside it.
        SIS_code_path : string
            Directory on the remote machine that holds the FCDI, FC and DI executables.
        eos : bool
            If True the remote machine is eos (32 CPUs per node are assumed below).
        nodes : int
            Number of nodes for the job submission script (only used when eos=True).
        mpi_command : string
            MPI launcher prefix for the remote run.

        Raises OSError if the expected directories/executables are missing remotely;
        exits via sys.exit(1) if the ssh connection cannot be established.
        """
        self.ssh_connection = True
        # whether to close the ssh connection at the end of do_transfer
        self.if_close_ssh = True
        self.remote_path = remote_path
        self.SIS_code_path = SIS_code_path
        self.SIS_code_FCDI = os.path.join(self.SIS_code_path, 'FCDI')
        # requires self.SIS_input_folder_name, i.e. set_main_settings must run first
        self.remote_input_path = os.path.join(self.remote_path, self.SIS_input_folder_name)
        self.username = username
        self.mpi_command = mpi_command
        self.eos = eos
        # NOTE(review): `check_` looks like a truncated method name (no method of
        # this name is visible in this part of the file) — confirm against the
        # full source.  It presumably normalizes/copies the key file.
        key_file = self.check_(key_file)
        # set ssh connection
        try:
            self.ssh = SSH(hostname=hostname, username=self.username, port=port, key_file=key_file, password=password)
            # The key file is removed in both branches; this raises if key_file
            # is None — presumably check_() always returns a path. TODO confirm.
            os.remove(key_file)
        except Exception as e:
            os.remove(key_file)
            self.logger.error('ssh connection failed. The error message:\n%s' % e)
            sys.exit(1)
        # set number of CPUs for the job submission script.
        if eos:
            self.CPUs = nodes * 32
        else:
            # Further remote machines... Now only eos
            self.CPUs = None
        # check paths on the remote machine:
        # SIS_code_path must exist and contain the FCDI, FC and DI executables.
        if self.ssh.isdir(self.SIS_code_path):
            for program in ['FCDI', 'FC', 'DI']:
                program_path = os.path.join(self.SIS_code_path, program)
                if not self.ssh.exists(program_path):
                    raise OSError("No such executable on remote machine: %s" % program_path)
        else:
            raise OSError("No such directory on remote machine: %s" % self.SIS_code_path)
        if not self.ssh.isdir(self.remote_path):
            raise OSError("No such directory on remote machine: %s" % self.remote_path)
[docs] def set_SIS_parameters(self, desc_dim=2, subs_sis=100, rung=1, opset=[
'+', '-', '/', '^2', 'exp'], ptype='quanti', advanced_parameters=None):
""" Set the SIS fortran code parameters
If advanced parameters is passed, they will be used, otherwise default values will be used.
Also max_dim, n_sis, n_comb, and OP_list can be overwritten by advanced_parameters if specified.
"""
# Get units. It is a list of strings, e.g. ['(1:4)','(5:8)',...], specifiying which columns/features of D
# belong to a unit class. Index starts with 1. The columns/features were ordered in self.set_main_settings
# such that columns/features of same unit are next to each other.
units_list = self.check_feature_units(self.feature_unit_classes)
ndimtype = len(units_list)
nsf = len(self.feature_list)
# self.set_par will use it
self.advanced_parameters = advanced_parameters
# Get shape of P
if ptype == 'quanti':
row_lengths = len(self.P)
else:
index = np.unique(self.P, return_index=True)[1]
class_names = [self.P[i] for i in np.sort(index)]
row_lengths = tuple([len([None for p in self.P if p == current_class]) for current_class in class_names])
# initilize SIS parameters: self.parameters
self.parameters = dict.fromkeys(Param_key_list)
# set parameters
# FCDI
# code will be run by: mpiname codename. set mpiname='' for serial run.
self.parameters['mpiname'] = self.mpi_command
self.parameters['desc_dim'] = desc_dim # ending iteration
self.parameters['ptype'] = ptype # property type: 'quanti'(quantitative),'quali'(qualitative)
self.parameters['ntask'] = 1 # number of tasks (properties)
# number of samples for each task (and group for classification, e.g. (4,3,5),(7,9) )
self.parameters['nsample'] = row_lengths
self.parameters['width'] = 0.01 # for classification, the boundary tolerance
# FC
self.parameters['nsf'] = nsf # number of scalar features (i.e.: the atomic parameters)
self.parameters['task_arr'] = '1c' # number of tasks arranged in columns
self.parameters['rung'] = rung # rung of feature spaces (rounds of combination)
self.parameters['opset'] = opset # oprators(currently: (+)(-)(*)(/)(exp)(log)(^-1)(^2)(^3)(sqrt)(|-|) )
self.parameters['ndimtype'] = ndimtype # number of dimension types (for dimension analysis)
self.parameters['dimclass'] = units_list # specify features in each class denoted by ( )
self.parameters['allele'] = False # Should all elements appear in each of the selected features?
self.parameters['nele'] = 0 # number of element (<=6): useful only when symm=.true. and/or allele=.true.
# features having the max. abs. data value <maxfval_lb will not be selected
self.parameters['maxfval_lb'] = 1e-8
# features having the max. abs. data value >maxfval_ub will not be selected
self.parameters['maxfval_ub'] = 1e5
self.parameters['subs_sis'] = subs_sis # total number of features selected by sure independent screen
# DI
self.parameters['method'] = 'L0' # 'L1L0' or 'L0'
self.parameters['size_fs'] = '' # number of total features in each taskxxx.dat (same for all)
self.parameters['nfL0'] = '' # number of features for L0(ntotf->nfL0 if nfL0>ntotf)
self.parameters['metric'] = 'LS_RMSE' # metric for the evaluation: LS_RMSE,CV_RMSE,CV_MAE
# number of top models (based on fitting) to be evaluated by the metric
self.parameters['n_eval'] = 1000
self.parameters['CV_fold'] = 10 # k-fold CV (>=2)
self.parameters['CV_repeat'] = 1 # repeated k-fold CV
self.parameters['n_out'] = 100 # number of top models to be output, off when =0
# overwrite parameter values if specified in advanced_parameters
if not advanced_parameters is None:
for key, value in advanced_parameters.iteritems():
self.parameters[key] = value
# END INIT
    def start(self):
        """ Start the calculation after init: write the input files and run FCDI.

        An already existing input folder is either removed (if
        self.rm_existing_files) or renamed to <SIS_input_path>_old_<n>
        with the first free n, before a fresh folder is created.
        """
        if os.path.isdir(self.SIS_input_path):
            self.logger.warning('Directory %s already exists.' % self.SIS_input_path)
            if self.rm_existing_files:
                rmtree(self.SIS_input_path)
                self.logger.warning('It is removed.')
            else:
                # Move the old folder aside under the first unused suffix.
                for i in range(1000):
                    old_name = "%s_old_%s" % (self.SIS_input_path, i)
                    if not os.path.isdir(old_name):
                        os.rename(self.SIS_input_path, old_name)
                        break
                self.logger.warning('It is renamed to %s.' % old_name)
        # create the input folder on the local machine
        os.mkdir(self.SIS_input_path)
        # write input files into the input folder
        self.write_P_D(self.P, self.D, self.feature_list)
        self.write_parameters()
        # decide if the calculation runs on the local or the remote machine
        if self.ssh_connection:
            self.do_transfer(ssh=self.ssh, eos=self.eos, username=self.username, CPUs=self.CPUs)
        else:
            # calculate on local machine. (At the moment not clear if python blocks parallel computing)
            os.chdir(self.SIS_input_path)
            Popen(self.SIS_code_FCDI).wait()
[docs] def set_logger(self, output_log_file):
""" Set logger for outputs as errors, warnings, infos. """
self.logger = logging.getLogger(__name__)
hdlr = logging.FileHandler(output_log_file)
self.logger.setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO)
FORMAT = "%(levelname)s: %(message)s"
formatter = logging.Formatter(fmt=FORMAT)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
hdlr.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.addHandler(hdlr)
self.logger.setLevel(logging.INFO)
self.logger.propagate = False
# START ckecking functions before calculations
[docs] def check_arrays(self, P_in, D, feature_list, feature_unit_classes, ptype):
""" Check arrays/list P, D and feature_list"""
P, D, feature_list = np.array(P_in), np.array(D), np.array(feature_list)
P_shape, D_shape, f_shape = P.shape, D.shape, feature_list.shape
if not len(D_shape) == 2:
self.logger.error(
'Dimension of feature matrix is %s. A two-dimensional list or array is needed.' %
len(D_shape))
sys.exit(1)
if not len(f_shape) == 1:
self.logger.error(
'Dimension of feature list is %s. A one-dimensional list or array is needed.' %
len(f_shape))
sys.exit(1)
if not P_shape[0] == D_shape[0]:
self.logger.error(
"Length (%s) of target property has to match to number of rows (%s) of feature matrix." %
(P_shape[0], D_shape[0]))
sys.exit(1)
if ptype == 'quanti':
if not all(isinstance(el, (float, int)) for el in P):
self.logger.error("For ptype = 'quanti', a 1-dimensional array of floats/ints is required is required.")
sys.exit(1)
if ptype == 'quali':
if not all(isinstance(el, int) for el in P_in):
self.logger.error("For ptype = 'quali', a 1-dimensional array of ints is required is required.")
sys.exit(1)
index = np.unique(P, return_index=True)[1]
class_names = P[np.sort(index)]
n_class = len(class_names)
current_i = 0
for p in P:
if not p == class_names[current_i]:
current_i += 1
if n_class == current_i:
self.logger.error("For ptype = 'quali', the target property has to be ordered by classes:")
self.logger.error("first all members of the first class, next all members of the next class ...")
sys.exit(1)
if not D_shape[1] == f_shape[0]:
self.logger.error(
'Length (%s) of feature_list has to match to number of columns (%s) of feature matrix.' %
(f_shape[0], D_shape[1]))
sys.exit(1)
if f_shape[0] < 2:
self.logger.error('Length of feature_list is %s. Choose at least two features.' % f_shape[0])
sys.exit(1)
if not isinstance(feature_unit_classes, (np.ndarray, list, type(None))):
raise TypeError("'feature_unit_classes' must be numpy array, list or None.")
if isinstance(feature_unit_classes, (np.ndarray, list)) and f_shape[0] != len(feature_unit_classes):
self.logger.error('Length of feature_unit_classes does not match length of feature_list.')
sys.exit(1)
feature_unit_classes_integers = [f for f in feature_unit_classes if isinstance(f, int)]
feature_unit_classes_strings = [f for f in feature_unit_classes if isinstance(f, str)]
if isinstance(feature_unit_classes, (np.ndarray, list)) and (not all(isinstance(f_c, int)
for f_c in feature_unit_classes_integers) or not all(f_c == 'no_unit' for f_c in feature_unit_classes_strings)):
raise TypeError("'feature_unit_classes' must consist of integers or the string 'no_unit', where each integer stands for the unit of a feature, i.e. 1:eV, 2:Angstrom. 'no_unit' is reserved for dimensionless unit.")
[docs] def check_control(self, par_in, par_ref, par_in_path):
""" Recursive Function to check input control dict tree.
If for example check_control(control,control_ref,'control')
function goes through dcit tree control and compares with control_ref
if correct keys (mandotory, not_mandotory, typos of key string) are set
and if values are of correct type or of optional list.
Furthermore it gives Errors with hints what is wrong, and what is needed.
Parameters
----------
par_in : any key
if par_in is dict, then recursion.
par_ref: any key
Is compared to par_in, if of same time.
If par_in and par_key are dict, alse keys are compared.
par_in_path: string
Gives the dict tree path where, when error occurs, e.g.
control[key_1][key_2]... For using function from outside
start with name of input dict, e.g. 'control'
"""
# check if value_in has correct type = value_ref_type
self.check_type(par_in, par_ref, par_in_path)
if isinstance(par_in, dict):
# check if correct keys are used
self.check_keys(par_in, par_ref, par_in_path)
for key_in, value_in in par_in.iteritems():
# get reference value like: dictionary[key_1][key_2] or here: par_ref[key_in]
# Needed because control_ref has special form.
value_ref = self.get_value_from_dic(par_ref, [key_in])
# recursion
self.check_control(value_in, value_ref, par_in_path + "['%s']" % key_in)
[docs] def get_type(self, value):
if isinstance(value, type):
return value
else:
return type(value)
[docs] def check_type(self, par_in, par_ref, par_in_path, if_also_none=False):
""" Check type of par_in and par_ref.
If par_ref is tuple, par_in must be item of par_ref:
else: they must have same type.
"""
# if par_ref is tuple, then only a few values are allowed. Thus just checked if
# par_in is in par_ref instead of checking type.
if isinstance(par_ref, tuple):
if not par_in in par_ref:
self.logger.error('%s must be in %s.' % (par_in_path, par_ref))
sys.exit(1)
# check if type(par_in) = type(par_ref)
else:
# get type of par_ref. type(par_ref) is not enough, since in control_ref
# strings,integers,dictionaries... AND types as <int>, <dict>, <str> are given.
ref_type = self.get_type(par_ref)
if not isinstance(par_in, ref_type):
if if_also_none and par_in is None:
pass
else:
self.logger.error('%s must be %s.' % (par_in_path, ref_type))
sys.exit(1)
[docs] def get_value_from_dic(self, dictionary, key_tree_path):
""" Returns value of the dict tree
Parameters
----------
dictionary: dict or 'dict tree' as control_ref
dict_tree is when key is tuple of keys and value is tuple of
corresponding values.
key_tree_path: list of keys
Must be in the correct order beginning from the top of the tree/dict.
# Examples
# --------
# >>> print get_value_from_dic[control_ref, ['local_run','SIS_code_path']]
# <type 'str'>
"""
value_ref = dictionary
for key in key_tree_path:
value_ref_keys = value_ref.keys()
if key in value_ref_keys:
value_ref = value_ref[key]
else:
tuples = [tup for tup in value_ref_keys if isinstance(tup, tuple)]
try:
select_tuple = [tup for tup in tuples if key in tup][0]
except BaseException:
raise KeyError
index = [i for i, key_tuple in enumerate(select_tuple) if key == key_tuple][0]
value_ref = value_ref[select_tuple][index]
return value_ref
def check_keys(self, par_in, par_ref, par_in_path):
    """ Compare the dicts par_in and par_ref.

    Collects which keys are missing (only if keys are not in not_mandotary) and
    which keys are not expected (if for example there is a typo). If there are
    missing or not expected keys, an error message is logged and the program
    exits.

    Parameters
    ----------
    par_in : dict
        User-supplied parameter dict to validate.
    par_ref : dict
        Reference dict; a tuple key denotes mutually exclusive alternatives.
    par_in_path : string
        Dictionary path string for error message, e.g 'control[key_1][key_2]'.
    """
    keys_in, keys_ref = par_in.keys(), par_ref.keys()
    # check if wrong keys are in keys_in
    wrong_keys = [key for key in keys_in if not key in self.flatten(keys_ref)]
    # check missing keys and if exactly one of optional keys is selected
    missing_keys = []
    for key in keys_ref:
        if isinstance(key, tuple):
            # tuple keys mean "select exactly one of these alternatives"
            optional_in = [k for k in keys_in if k in key]
            leng = len(optional_in)
            if leng > 1:
                self.logger.error("The following keys are set in %s: %s." % (par_in_path, optional_in))
                self.logger.error("Please select only one of %s" % list(key))
                sys.exit(1)
            # NOTE(review): not_mandotary is assumed to be a module-level list
            # defined elsewhere in this file (name kept as spelled there).
            if leng == 0 and not key in not_mandotary:
                missing_keys.append("--one of: (%s)" % (", ".join(["'%s'" % k for k in key])))
                #missing_keys.append(('--one of:',)+key)
        elif not key in keys_in and not key in not_mandotary:
            missing_keys.append(key)
    # error message if needed
    len_wrong, len_missing = len(wrong_keys), len(missing_keys)
    if len_wrong > 0 or len_missing > 0:
        if len_wrong > 0:
            self.logger.error("The following keys are not expected in %s: %s" % (par_in_path, wrong_keys))
        if len_missing > 0:
            self.logger.error("The following keys are missing in %s: %s" % (par_in_path, missing_keys))
        sys.exit(1)
def check_OP_list(self, control):
    """ Validate and normalize control['parameters']['opset'].

    The opset must be either a flat list of operation strings or a list of
    n_comb lists/tuples of operation strings. Every operation string is
    checked against available_OPs.

    Parameters
    ----------
    control : dict

    Returns
    -------
    control : dict
        With control['parameters']['opset'] expanded to n_comb lists.
    """
    parameters = control['parameters']
    operations = parameters['opset']
    n_comb = parameters['rung']
    if all(isinstance(entry, str) for entry in operations):
        # flat list of strings: validate, then reuse it for every iteration
        self.check_OP_strings(operations)
        parameters['opset'] = [operations] * n_comb
        return control
    if all(isinstance(entry, (list, tuple)) for entry in operations):
        # nested form: must contain exactly n_comb sub-lists/tuples
        if len(operations) != n_comb:
            self.return_OP_error()
        try:
            self.check_OP_strings(self.flatten(operations))
            parameters['opset'] = operations
            return control
        except BaseException:
            self.return_OP_error()
    else:
        # mixed or otherwise malformed opset
        self.return_OP_error()
def check_OP_strings(self, OPs):
    """ Exit with an error unless every item of OPs is in available_OPs."""
    invalid = [operation for operation in OPs if operation not in available_OPs]
    if invalid:
        self.logger.error("Available operations: %s" % available_OPs)
        sys.exit(1)
def return_OP_error(self):
    """ Log an explanation of the expected 'OP_list' form and abort."""
    explanation = (
        "'OP_list' must consist of 'n_comb' tuples/lists of strings of operations.",
        "The other option is that it contains only strings of operations.",
        "Then for each iteration the same operations will be used.",
    )
    for message in explanation:
        self.logger.error(message)
    sys.exit(1)
def check_quali_dim(self, control):
    """ Ensure that ptype 'quali' is only combined with desc_dim = 2."""
    parameters = control['parameters']
    is_quali = parameters['ptype'] == 'quali'
    if is_quali and parameters['desc_dim'] != 2:
        self.logger.error("At the moment, for ptype = quali only desc_dim = 2 allowed ")
        sys.exit(1)
def check_(self, k):
    """ Check the access key k, limit the allowed CPU number accordingly and
    adjust self.mpi_command to the permitted CPU count.

    Parameters
    ----------
    k : string
        Path-like access key.

    Returns
    -------
    k : string
        The unchanged input key, or for known keys the path of the locally
        written 'key.mpi' file.
    """
    # maximum number of CPUs granted per known tutorial key
    self.key_to_maxcpu_dic = {"/home/keys/Q8E8RS2hj441kaFaLFHSY678g2rgF20f": 1,  # hands-on-CS
                              "/home/keys/Kucn93hf1F0F38aypq5fD63n7XhDyOP0": 24,  # sis-tutorial metal-nonmetal
                              "/home/keys/4Sofj9D3I1kc03E39k1fIPO9w9A03N5Z": 5,  # sis-tutorial binaries
                              "/home/keys/Zn98Li73k39h5Bd0a12eq344ba3maye3": 5}  # sis-tutorial topological insulators
    self.kkey = k
    self.n_cpu = 1
    if k in self.key_to_maxcpu_dic:
        max_cpu = self.key_to_maxcpu_dic[k]
        k = os.path.join(self.local_path, "key.mpi")
        # NOTE(review): 'base64' is not imported at the top of this file and
        # 'for_me' is not defined anywhere in view -- this branch looks like it
        # would raise NameError; confirm against the full file.
        key = base64.b64decode(for_me)
        with open(k, 'w') as f:
            f.write(key)
    else:
        max_cpu = 1
    if not(not self.mpi_command or self.mpi_command.isspace()):
        try:
            # the last integer token of the mpi command is taken as the CPU count
            idx_n_cpu, self.n_cpu = [(i, int(s)) for i, s in enumerate(self.mpi_command.split()) if s.isdigit()][-1]
            if self.n_cpu > max_cpu:
                # cap the CPU count and rewrite the mpi command accordingly
                self.n_cpu = max_cpu
                if self.if_print:
                    self.logger.warning("For your pupose, the maximum allowed CPU number is %s." % max_cpu)
                self.mpi_command = self.mpi_command.split()
                self.mpi_command[idx_n_cpu] = str(self.n_cpu)
                self.mpi_command = " ".join(self.mpi_command)
            if self.if_print:
                self.logger.info("The calculations are running on %s CPUs." % self.n_cpu)
        except BaseException:
            # unparsable mpi command: fall back to serial execution
            self.n_cpu = 1
            self.mpi_command = ''
            self.logger.warning("MPI command not known. The calculations are restricted to run on only one CPU.")
    return k
# feature space estimation
def ncr(self, n, r):
    """ Binomial coefficient C(n, r).

    Parameters
    ----------
    n, r : int

    Returns
    -------
    int
        n! / (r! * (n - r)!)
    """
    # exploit the symmetry C(n, r) == C(n, n - r) to keep the products short
    r = min(r, n - r)
    if r == 0:
        return 1
    # 'xrange' exists only in Python 2 and raised NameError under Python 3;
    # 'range' works on both (the file targets 2/3 via __future__ imports)
    numer = reduce(opop.mul, range(n, n - r, -1))
    denom = reduce(opop.mul, range(1, r + 1))
    return numer // denom
def check_l0_steps(self, max_dim, n_sis, upper_limit=10000):
    """ Abort if the number of l0 steps exceeds upper_limit * n_CPUs.

    Parameters
    ----------
    max_dim : int
        Maximum descriptor dimension.
    n_sis : int
        Number of features collected per SIS iteration.
    upper_limit : int
        Allowed number of feature combinations per CPU.
    """
    # total number of feature combinations the l0 regularization must visit,
    # summed over all descriptor dimensions
    l0_steps_list = [self.ncr(n_sis * dim, dim) for dim in range(1, max_dim + 1)]
    l0_steps = sum(l0_steps_list)
    self.l0_steps = l0_steps
    if l0_steps > upper_limit * self.n_cpu:
        # NOTE(review): this uses a module-level 'logger' (not self.logger);
        # assumed to be defined elsewhere in this file -- confirm.
        logger.error(
            "With the given settings in the l0-regularizaton %s combinations of features have to be considered." %
            l0_steps)
        logger.error(
            "In this version the upper limit for ptype = '%s' is %s*n_CPUs. Choose a smaller" %
            (self.parameters['ptype'], upper_limit))
        logger.error("'Optimal descriptor maximum dimension' or 'Number of collected features per SIS iteration'")
        sys.exit(1)
def get_next_size(self, n_features, ops):
    """ Estimate the feature space size after applying ops once to a space of
    n_features features (newly generated features plus the originals)."""
    generated = 0
    for operation in ops:
        if operation in un_OP:
            # operations in un_OP contribute one new feature per existing feature
            generated += n_features
        elif operation in bin_OP:
            # operations in bin_OP contribute one feature per ordered pair
            generated += n_features ** 2
        else:
            # all remaining operations contribute one feature per unordered pair
            generated += self.ncr(n_features, 2)
    return generated + n_features
def estimate_feature_space(self, n_comb, n_features, ops, rate=1., n_comb_start=0):
    """ Iteratively apply get_next_size to estimate the feature space size
    after n_comb construction iterations, keeping the fraction rate[i] of
    the features produced in iteration i."""
    if isinstance(rate, (float, int)):
        # a scalar rate applies to every iteration
        rate = [rate] * n_comb
    size = n_features
    for iteration in range(n_comb_start, n_comb):
        size = int(self.get_next_size(size, ops) * rate[iteration])
    return int(size)
def check_feature_space_size(self, feature_list, n_target=5, upper_bound=300000000):
    """ Estimate the rung-3 feature space size and abort if it is too large.

    Runs a cheap auxiliary SIS calculation (rung=2, desc_dim=1, subs_sis=1)
    to obtain self.featurespace, extrapolates it to rung 3 and exits if the
    estimate exceeds upper_bound * n_CPUs.

    Parameters
    ----------
    feature_list : list of strings
        Primary features.
    n_target : int
        Number of random target samples for the auxiliary run.
    upper_bound : int
        Allowed estimate * max_dim per CPU.
    """
    # remember the user parameters; they are restored after the cheap run
    n_comb = deepcopy(self.parameters['rung'])
    max_dim = deepcopy(self.parameters['desc_dim'])
    n_sis = deepcopy(self.parameters['subs_sis'])
    self.parameters['rung'] = 2
    self.parameters['desc_dim'] = 1
    self.parameters['subs_sis'] = 1
    OP_list = self.parameters['opset']
    # NOTE(review): P and D are assigned but never used in this method --
    # presumably self.start() reads targets/features from instance state; confirm.
    P = np.random.random((n_target))
    D = np.random.random((n_target, len(feature_list)))
    # make sis calculation to obtain self.featurespace(rung=2) for feature_space estimation
    self.start()
    self.get_results()
    feature_space_size_ncomb2 = self.featurespace
    # set parameters back
    self.parameters['rung'] = n_comb
    self.parameters['desc_dim'] = max_dim
    self.parameters['subs_sis'] = n_sis
    estimate = self.estimate_feature_space(3, feature_space_size_ncomb2, OP_list, rate=0.12, n_comb_start=2)
    self.predicted_feature_space_size = estimate
    if estimate * max_dim > upper_bound * self.n_cpu:
        digit_len = len(str(estimate)) - 1
        # NOTE(review): module-level 'logger' (not self.logger) -- assumed to
        # be defined elsewhere in this file.
        logger.error(
            "Estimated order of magnitude of feature space size: 10^%s - 10^%s" %
            (digit_len, digit_len + 1))
        logger.error("In this version the upper bound for n_features is given by:")
        logger.error("%s > n_features*max_dim/n_CPUs" % (upper_bound))
        logger.error("Hint: select less primary features, less operations or a smaller max_dim.")
        logger.error("The registered user will be allowed soon to use larger feature spaces.")
        sys.exit(1)
def estimate_calculation_expense(self, feature_list):
    """ Check the expense of the SIS+l0 calculations and abort if too costly.

    Applies access-key specific limits on the number of l0 steps and on the
    feature space size.

    Parameters
    ----------
    feature_list : list of strings
        Primary features.
    """
    n_target = 12
    # NOTE(review): P and D are assigned but never used in this method; confirm
    # whether they are leftovers or consumed indirectly.
    P = np.random.random((n_target))
    D = np.random.random((n_target, len(feature_list)))
    max_dim = self.parameters['desc_dim']
    n_sis = self.parameters['subs_sis']
    n_comb = self.parameters['rung']
    # check l0 steps
    if self.parameters['ptype'] == 'quanti':
        self.check_l0_steps(max_dim, n_sis, upper_limit=1100000)
    else:
        u_l = 180000
        # NOTE(review): 'self.kkey in "<path>"' is a substring test (also True
        # for e.g. an empty kkey), not an equality test -- confirm whether '=='
        # was intended.
        if self.kkey in "/home/keys/Zn98Li73k39h5Bd0a12eq344ba3maye3":  # topological insulator
            # true division (__future__): u_l becomes a float here
            u_l /= 5
        elif self.kkey in "/home/keys/Kucn93hf1F0F38aypq5fD63n7XhDyOP0":  # metal-nonmetal
            u_l = 1150000
        self.check_l0_steps(max_dim, n_sis, upper_limit=u_l)
    # check feature space
    if n_comb == 3:
        if self.kkey in "/home/keys/Zn98Li73k39h5Bd0a12eq344ba3maye3":  # topological insulator
            logger.error(
                "A 'number of iterations for the construction for the feature space' > 2 is not allowed for this tutorial.")
            sys.exit()
        u_l = 4460000
        if self.kkey in "/home/keys/Kucn93hf1F0F38aypq5fD63n7XhDyOP0":
            u_l = 4460000 * 2
        self.check_feature_space_size(feature_list, n_target=n_target, upper_bound=u_l)
    elif n_comb > 3:
        logger.error("A 'number of iterations for the construction for the feature space' >3 is not allowed.")
        sys.exit(1)
# END checking functions
def do_transfer(self, ssh=None, eos=None, username=None, CPUs=None):
    """ Run the calculation on a remote machine.

    First checks if the folder self.remote_input_path already exists on the
    remote machine; if yes it is deleted or renamed. Then the file system
    self.SIS_input_path with the SIS fortran code input is copied into
    self.remote_path, the calculation is run remotely and the file system
    with the results is copied back.
    If eos, a submission script is written and submitted, and qstat is
    polled until the calculation has finished.

    Parameters
    ----------
    ssh : object
        Must be from code nomad_sim.ssh_code.
        NOTE(review): both the 'ssh' argument and self.ssh are used below --
        presumably they are the same object; confirm against the caller.
    eos : bool
        If remote machine is eos. To write submission script and submit ...
    username : string
        Needed to check qstat on eos.
    CPUs : int
        To reserve the right number of CPUs in the eos submission script.
    """
    # check if remote_input_path exists and if yes rename it to remote_input_path_old_#
    if self.ssh.isdir(self.remote_input_path):
        self.logger.warning('Directory %s on remote machine already exists.' % self.remote_input_path)
        if self.rm_existing_files:
            ssh.rm(self.remote_input_path)
            self.logger.warning('It is removed.')
        else:
            # find the first free _old_# suffix and rename the existing folder
            for i in range(1000):
                old_name = "%s_old_%s" % (self.remote_input_path, i)
                if not self.ssh.isdir(old_name):
                    self.ssh.rename(self.remote_input_path, old_name)
                    break
            self.logger.warning('It is renamed to %s.' % old_name)
    if eos:
        self.write_submission_script(CPUs)
    # copy self.SIS_input_path INto self.remote_path
    ssh.put_all(self.SIS_input_path, self.remote_path)
    rmtree(self.SIS_input_path)
    if eos:
        seconds = 1
        # submit job called go.sge
        ssh.command("cd %s; qsub go.sge" % self.remote_input_path)
        self.SCHEDule = sched.scheduler(time.time, time.sleep)
        # check each seconds if is job is finished
        self.SCHEDule.enter(seconds, 1, self.ask_periodically, (self.SCHEDule, seconds, 0, username))
        self.SCHEDule.run()
    else:
        # execute SIS_code on remote machine
        # exporting path is needed, since code FCDI calls the codes FC and DI by just 'FC' and 'DI'.
        ssh.command('export PATH=$PATH:%s; cd %s; %s' %
                    (self.SIS_code_path, self.remote_input_path, self.SIS_code_FCDI))
    # copy back file system with results
    ssh.get_all(self.remote_input_path, self.local_path)
    ssh.rm(self.remote_input_path)
    # close ssh connection
    if self.if_close_ssh:
        ssh.close()
def check_status(self, filename, username):
    """ Check if the calculation on eos is finished.

    Parameters
    ----------
    filename : str
        qstat output will be written into this file. The file is then read.
    username : str
        Searched for in filename. If it does not appear (together with the
        job name) the calculation is finished.

    Returns
    -------
    status : bool
        True if the calculation is still running.
    """
    # write qstat into filename
    self.ssh.command("qstat -u %s > %s" % (username, filename))
    status = False
    # read filename
    lines = self.ssh.open_file(filename).readlines()
    for line in lines:
        split = line.split()
        if len(split) > 3:
            # if job name SIS_tutori (qstat shows only 10 chars) and username appears
            if split[2] == 'SIS_tutori' and split[3] == username:
                status = True
    return status
def ask_periodically(self, sc, seconds, counter, username):
    """ Recursive function that runs periodically (each seconds) the
    function self.check_status until the job has finished or the call
    limit is reached.

    Parameters
    ----------
    sc : sched.scheduler
        Scheduler instance passed through to the next invocation.
    seconds : int
        Re-check interval in seconds.
    counter : int
        Number of checks performed so far.
    username : str
        Passed through to self.check_status.
    """
    counter += 1
    filename = os.path.join(self.remote_input_path, 'status.dat')
    # give up after 1000 checks
    if counter > 1000:
        return 1
    # job no longer listed by qstat: finished
    if not self.check_status(filename, username):
        return 0
    # still running: schedule the next check
    self.SCHEDule.enter(seconds, 1, self.ask_periodically, (sc, seconds, counter, username))
def write_submission_script(self, CPUs):
    """ Write the eos job submission script 'go.sge' into self.SIS_input_path.

    Parameters
    ----------
    CPUs : int
        Number of CPUs to reserve (nodes * 32 on eos).
    """
    strings = [
        "#$ -S /bin/bash",
        "#$ -j n",
        "#$ -N SIS_tutorial",  # jobname
        "#$ -cwd",
        "#$ -m n",
        "#$ -pe impi_hydra %s" % CPUs,  # CPUs= nodes*32!
        "#$ -l h_rt=00:01:00",  # time reservation for job
        # the bare module-level name 'SIS_code_FCDI' used here previously was
        # a NameError; the command lives on the instance (see do_transfer)
        "%s" % self.SIS_code_FCDI
    ]
    # write submission file "go.sge"; 'with' guarantees the handle is closed
    with open(os.path.join(self.SIS_input_path, 'go.sge'), 'w') as submission_file:
        for s in strings:
            submission_file.write("%s\n" % s)
def check_feature_units(self, feature_unit_classes):
    """ Build fortran-style unit range strings for the feature columns.

    Parameters
    ----------
    feature_unit_classes : list
        Unit class labels, one per feature column; the list must be sorted
        so that equal labels are contiguous.

    Returns
    -------
    unit_strings : list of strings
        In the form ['(1:3)', '(4:8)', ...] with 1-based column indices;
        columns labelled 'no_unit' are skipped.
    """
    # order the distinct classes by their first occurrence
    first_occurrence = np.unique(feature_unit_classes, return_index=True)[1]
    ordered_classes = [feature_unit_classes[i] for i in np.sort(first_occurrence)]
    unit_strings = []
    start_col = 0
    for unit_class in ordered_classes:
        block_len = sum(1 for label in feature_unit_classes if label == unit_class)
        if unit_class != 'no_unit':
            unit_strings.append("(%s:%s)" % (start_col + 1, start_col + block_len))
        start_col += block_len
    return unit_strings
def convert_feature_strings(self, feature_list):
    """ Convert feature strings.

    Prefixes each feature with 'sr_' (reals) or 'si_' (integers) after
    mapping it through standard_2_converted, and returns the new list.
    Exits with an error for a feature in neither reals nor ints.
    """
    converted = []
    for feature in feature_list:
        if feature in reals:
            type_char = 'r'
        elif feature in ints:
            type_char = 'i'
        else:
            self.logger.error("Developer error: %s not found in the list reals or ints." % feature)
            sys.exit(1)
        converted.append('s%s_%s' % (type_char, standard_2_converted[feature]))
    return converted
def write_parameters(self):
    """ Write the parameters into the SIS fortran code input file 'FCDI.in',
    converting each value into the special fortran input format first."""
    filename = 'FCDI.in'
    # loop in correct order as in Param_key_list could be essential. So better no iteritems()
    # 'with' closes the handle even on error (the old open() leaked it)
    with open(os.path.join(self.SIS_input_path, filename), 'w') as input_file:
        for key in Param_key_list:
            value = self.parameters[key]
            value = self.convert_2_fortran(key, value)
            input_file.write("%s=%s\n" % (key, value))
def convert_2_fortran(self, parameter, parameter_value):
    """ Convert a parameter value to the SIS fortran input style.

    e.g. True -> '.true.', a string 's' -> "'s'", a 1-tuple (x,) -> '(x)';
    'opset' and 'dimclass' get their special formats. Values with no
    matching rule are returned unchanged.
    """
    if parameter == 'opset':
        return self.get_OPs(parameter_value)
    if parameter == 'dimclass':
        return "".join(parameter_value)
    # bool must be tested before the generic fall-through: True/False need
    # the fortran literals, not their int representation
    if isinstance(parameter_value, bool):
        return '.true.' if parameter_value else '.false.'
    if isinstance(parameter_value, str):
        return "'%s'" % parameter_value
    if isinstance(parameter_value, tuple) and len(parameter_value) == 1:
        return "(%s)" % parameter_value[0]
    return parameter_value
def get_OPs(self, OP_list):
    """ Convert OP_list to the quoted-string format of the SIS fortran input.

    e.g. [['+', '-'], ['exp']] -> "'(+)(-)','(exp)'" (one quoted group per
    construction iteration, joined by commas).
    """
    quoted_groups = []
    for operations in OP_list:
        # e.g. ['+', '-', '/', '^2', 'exp'] -> '(+)(-)(/)(^2)(exp)'
        joined = "".join('(%s)' % operation for operation in operations)
        quoted_groups.append("'%s'" % joined)
    return ",".join(quoted_groups)
def flatten(self, list_in):
    """ Collapse list_in into a one dimensional list.

    Parameters
    ----------
    list_in : list/tuple of lists/tuples of ...
        Arbitrarily nested lists/tuples.
    """
    flat = []
    for element in list_in:
        if isinstance(element, (list, tuple)):
            # recurse into nested containers
            flat += self.flatten(element)
        else:
            flat.append(element)
    return flat
def write_P_D(self, P, D, feature_list):
    """ Write 'train.dat' as SIS fortran code input with P, D and feature strings.

    Parameters
    ----------
    P : array-like
        Target values; assumed [n_sample] or [n_sample, n_target] for multi
        task -- TODO confirm the multi-task shape against the caller.
    D : array [n_sample, n_features]
        Feature matrix.
    feature_list : list of strings
        Feature names written into the header line.
    """
    #converted_features = self.convert_feature_strings(feature_list)
    converted_features = feature_list
    P = np.array(P)
    P_shape = P.shape
    if self.parameters['ptype'] == 'quanti':
        if len(P_shape) > 1 and not P_shape[1] == 1:
            # multi task: one header column per target
            first_line = ['#'] + ['target_%s' % (t + 1) for t in range(P_shape[1])]
        else:
            first_line = ['#', 'target']
            # prepend a dummy 'xxx' label column so P becomes [n_sample, 2],
            # matching the '#'/'target' header pair
            P = np.transpose(np.vstack((['xxx' for i in range(len(P))], P)))
    else:
        # qualitative run: targets written as a single column
        entries_of_P = len(P)
        P = P.reshape([entries_of_P, 1])
        first_line = ['#']
    first_line.extend(converted_features)
    Out = np.hstack((P, D))
    Out = np.vstack((first_line, Out))
    np.savetxt(os.path.join(self.SIS_input_path, "train.dat"), Out, fmt='%s', delimiter=" ")
def get_des(self, x):
    """ Clean a descriptor string read from the output DI.out.

    Drops everything up to (and one character past) the first ':' plus the
    trailing character, removes the 'sr_'/'si_' type markers and strips one
    pair of enclosing parentheses. Returns the cleaned string.
    """
    colon_at = [pos for pos, ch in enumerate(x) if ch == ':'][0]
    chars = list(x[colon_at + 2:-1])
    drop = []
    for pos, ch in enumerate(chars):
        # an 's' followed by 'r' or 'i' followed by '_' marks a type prefix
        if ch == 's' and chars[pos + 1] in ['r', 'i'] and chars[pos + 2] == '_':
            drop.extend(range(pos, pos + 3))
    chars = [ch for pos, ch in enumerate(chars) if pos not in drop]
    if chars[0] == '(' and chars[-1] == ')':
        chars = chars[1:-1]
    return "".join(chars)
def check_FC(self, file_path):
    """ Check FC.out: whether the calculation finished and the feature space sizes.

    Parameters
    ----------
    file_path : string
        Path to the FC.out file.

    Returns
    -------
    calc_finished : bool
        True if 'Have a nice day !' appears, i.e. the calculation finished.
    feature_space_list : list of strings
        The sizes reported after 'Total Featurespace:' (possibly empty),
        before the redundant check.
    n_collected : integer or None
        The number of features collected in the current iteration (should
        be n_sis), or None if not reported.
    """
    n_collected = None
    calc_finished = False
    feature_space_list = []
    # 'with' closes the handle even on error (the old open().readlines()
    # leaked it); iterating the file avoids materializing all lines
    with open(file_path, 'r') as fc_file:
        for line in fc_file:
            if line.rfind('Total Featurespace:') > -1:
                feature_space_list.append(line.split()[2])
            if line.rfind('Have a nice day !') > -1:
                calc_finished = True
            if line.rfind('Final feature space size:') > -1:
                n_collected = int(line.split()[4])
    return calc_finished, feature_space_list, n_collected
def check_DI(self, file_path):
    """ Return True if DI.out reports a finished calculation.

    Parameters
    ----------
    file_path : string
        Path to the DI.out file; finished runs contain 'Have a nice day !'.
    """
    # 'with' closes the handle (the old open().readlines() leaked it);
    # returning on the first match avoids scanning the rest of the file
    with open(file_path, 'r') as di_file:
        for line in di_file:
            if line.rfind('Have a nice day !') > -1:
                return True
    return False
def check_files(self, iter_folder_name, dimension):
    """ Find out which output file is missing and, if possible, why.

    Called after something went wrong, to locate where the problem occurred.

    Parameters
    ----------
    iter_folder_name : string
        Name of the iteration folder, e.g. 'iter01'.
    dimension : integer
        Descriptor dimension of that iteration.

    Returns
    -------
    string
        A human readable error description.
    """
    iter_path = os.path.join(self.SIS_input_path, iter_folder_name)
    DI_path = os.path.join(iter_path, 'DI.out')
    FC_path = os.path.join(iter_path, 'FC.out')
    if_iter = os.path.isdir(iter_path)
    if_FC = os.path.isfile(FC_path)
    if_DI = os.path.isfile(DI_path)
    n_sis = self.parameters['subs_sis']
    sub_space_size = dimension * n_sis
    if not if_iter:
        return '%s not found' % iter_folder_name
    if not if_FC:
        return 'FC.out not found'
    calc_finished, feature_space, n_collected = self.check_FC(FC_path)
    if not calc_finished:
        return 'FC.out not finished'
    if not feature_space:
        # check_FC returns a (possibly empty) list; the previous test
        # 'feature_space is None' could never trigger
        return "'Total Featurespace' not found"
    # guard against a missing 'Final feature space size' line: comparing
    # None < int raises TypeError on Python 3
    if n_collected is not None and n_collected < n_sis:
        return 'No %sD descriptor!\nThe number of collected feateres in iteration %s is %s. Probably the total feature space size is not large enough. Collect less features per iteration.\nTotal feature space size before redundant check: %s\n Target total number of collected features: %s\nAfter eliminating redundant features the total feature space becomes smaller.' % (
            dimension, dimension, n_collected, feature_space, sub_space_size)
    if not if_DI:
        return 'DI.out not found'
    if not self.check_DI(DI_path):
        return 'DI.out not finished'
    return 'Unknown error'
def read_results(self, iter_folder_name, dimension, task, tsizer):
    """ Read results from DI.out.

    Parameters
    ----------
    iter_folder_name : string
        Name of the iter_folder with the outputs of the corresponding
        iteration of SIS+l1/l1l0, e.g. 'iter01', 'iter02'.
    dimension : integer
        DI.out provides for example in iteration three 1-3 dimensional
        descriptors. Here choose which dimension should be returned.
    task : integer < 100
        For multi task, must be worked on.
    tsizer : integer
        Number of samples, e.g. number of rows of D or P.

    Returns
    -------
    RMSE : float
        Root mean squared error of the model.
    Des : list of strings
        List of the descriptors.
    coef : array [model_dim+1]
        Coefficients including the intercept.
    D : array [n_sample, model_dim+1]
        Matrix with columns being the selected features (descriptors) for
        the model. The last column is full of ones corresponding to the
        intercept.
    """
    iter_path = os.path.join(self.SIS_input_path, iter_folder_name)
    DI_path = os.path.join(iter_path, 'DI.out')
    # zero-pad the task number to three digits: 1 -> '001', 10 -> '010'
    if task > 9:
        s_task = '0%s' % task
    else:
        s_task = '00%s' % task
    desc_path = os.path.join(iter_path, 'desc_dat', 'desc%s_%s.dat' % (dimension, s_task))
    count_dim = 0
    lines = open(DI_path, 'r').readlines()
    for line in lines:
        if line.rfind('@@@descriptor') > -1:
            # each '@@@descriptor' marker starts the block of one model dimension
            count_dim += 1
            if count_dim == dimension:
                des = line.split()[1:]
                Des = [self.get_des(x) for x in des]  # convert strings
        if count_dim == dimension:
            # only parse model data inside the block of the requested dimension
            if line.rfind('coefficients_') > -1:
                coef = np.array([float(i) for i in line.split()[1:]])
            if line.rfind('Intercept_') > -1:
                inter = float(line.split()[1])
                coef = np.append(coef, inter)
            if line.rfind('LSrmse') > -1:
                RMSE = float(line.split()[1])
    # read the descriptor matrix: first line is a header, the first three
    # columns of each data line are skipped
    D = np.empty([tsizer, dimension])
    lines = open(desc_path, 'r').readlines()
    for i, line in enumerate(lines):
        if i > 0:
            for j, val in enumerate(line.split()[3:]):
                D[i - 1, j] = val
    # append the column of ones corresponding to the intercept
    D = np.column_stack((D, np.ones(tsizer)))
    return RMSE, Des, coef, D
def get_indices_of_top_descriptors(self):
    """ Read the 'top*2D' file in self.iter_path and collect the 0-based
    feature index pairs of all top-ranked 2D descriptors that share the
    best score (same overlap count and overlap area as the first entry).

    Returns
    -------
    Overlaps : list of int
        Number of overlaps for each returned model.
    Ind : list of [int, int]
        Feature index pairs (0-based) of the returned models.
    """
    try:
        filename = [f for f in os.listdir(self.iter_path,) if f[-2:] == '2D' and f[:3] == 'top'][0]
    except BaseException:
        self.logger.error("Calculation Aborted.")
        self.logger.error("The Number of collected features in the SIS step might have exceeded")
        self.logger.error("the number of features in the created feature space.")
        self.logger.error("Hint: Try a smaller 'Number of collected features per SIS iteration'")
        self.logger.error("Hint: or increase the feature space size.")
        sys.exit()
    #filename = "top%04d_02D" % n_out
    filename = os.path.join(self.iter_path, filename)
    top_dat = open(filename, 'r').readlines()
    Ind = []
    Overlaps = []
    old_n_overlap, old_overlap_area = None, None
    for l, line in enumerate(top_dat):
        if l > 0:  # skip the header line
            n_overlap, overlap_area = int(line.split()[1]), float(line.split()[2])
            # keep collecting while the score equals the first (best) entry;
            # stop at the first line with a different score
            if old_n_overlap in [n_overlap, None] and old_overlap_area in [overlap_area, None]:
                # the last two columns hold 1-based feature indices
                indices = [int(idx) - 1 for idx in line.split()[-2:]]
                Ind.append(indices)
                Overlaps.append(n_overlap)
                old_n_overlap, old_overlap_area = n_overlap, overlap_area
            else:
                break
    return Overlaps, Ind
def manipulate_descriptor_string(self, d):
    """ Strip one pair of enclosing parentheses from a descriptor string."""
    has_outer_parens = d[0] == '(' and d[-1] == ')'
    return d[1:-1] if has_outer_parens else d
def get_strings_of_top_descriptors(self, top_indices):
    """ Return the descriptor strings for each pair of indices in top_indices.

    Reads 'task.fname' in self.iter_path; the first whitespace-separated
    token of every line is a descriptor string. Enclosing brackets are
    removed from each returned string (important for printing).
    """
    filename = os.path.join(self.iter_path, "task.fname")
    with open(filename, 'r') as fname_file:
        descriptors = [line.split()[0] for line in fname_file]
    result = []
    for indices in top_indices:
        result.append([self.manipulate_descriptor_string(descriptors[i]) for i in indices])
    return result
def get_arrays_of_top_descriptors(self, top_indices):
    """ Read 'task001.dat' in self.iter_path and return, for each model in
    top_indices, the [n_sample, 2] array of its two descriptor columns."""
    n_models = len(top_indices)
    flat_columns = np.array(top_indices).flatten()
    filename = os.path.join(self.iter_path, 'task001.dat')
    with open(filename, 'r') as dat_file:
        rows = [[float(line.split()[i]) for i in flat_columns] for line in dat_file]
    stacked = np.array(rows)
    # consecutive column pairs of 'stacked' belong to consecutive models
    return [stacked[:, [2 * m, 2 * m + 1]] for m in range(n_models)]
def read_results_quali(self):
    """ Read the results for the 2D descriptor of a qualitative run.

    Returns
    -------
    results : list of lists
        One sublist per model (if multiple models share the same
        score/cost, all of them are returned), each containing
        [descriptor_strings, D, n_overlap] where D (D.shape = (n_sample, 2))
        is the array with the descriptor vectors.
    """
    self.iter_path = os.path.join(self.SIS_input_path, "iter02")
    overlaps, top_indices = self.get_indices_of_top_descriptors()
    strings = self.get_strings_of_top_descriptors(top_indices)
    arrays = self.get_arrays_of_top_descriptors(top_indices)
    return [[s, a, o] for s, a, o in zip(strings, arrays, overlaps)]
def string_descriptor(self, RMSE, features, coefficients, target_unit):
    """ Build the terminal output string with the model and its RMSE."""
    dimension = len(features)
    parts = ['%sD descriptor:\nRoot Mean Squared Error (RMSE): %s %s\nModel: \n' % (dimension, RMSE, target_unit)]
    for idx in range(dimension + 1):
        coefficient = coefficients[idx]
        # a coefficient of exactly zero gets a '-' sign, as before
        sign = '+' if coefficient > 0 else '-'
        magnitude = abs(coefficient)
        if idx < dimension:
            parts.append('%s %.5f %s\n' % (sign, magnitude, features[idx]))
        else:
            # the final coefficient is the intercept: no feature name
            parts.append('%s %.5f\n' % (sign, magnitude))
    return "".join(parts)
def get_results(self, ith_descriptor=0):
    """ Attribute to get results from the file system.

    Parameters
    ----------
    ith_descriptor : int
        Return the ith best descriptor (qualitative runs only).

    Returns
    -------
    out : list [max_dim] of dicts {'D', 'coefficients', 'P_pred'}
        out[model_dim-1]['D'] : pandas data frame [n_sample, model_dim+1]
            Descriptor matrix with the columns being algebraic combinations
            of the input feature matrix. Column names are thus strings of
            the algebraic combinations of strings of the input feature_list.
            Last column is full of ones corresponding to the intercept.
        out[model_dim-1]['coefficients'] : array [model_dim+1]
            Optimizing coefficients.
        out[model_dim-1]['P_pred'] : array [m_sample]
            Fit : np.dot(np.array(D), coefficients)
        For ptype = 'quali' a single [strings, D, overlap] entry from
        read_results_quali is returned instead.
    """
    max_dim = self.parameters['desc_dim']
    Results_list = []
    tsizer = len(self.flatten(self.P))
    if self.parameters['ptype'] == 'quanti':
        for dimension in range(1, max_dim + 1):
            # iteration folder names are zero-padded to two digits
            if dimension < 10:
                iter_folder_name = 'iter0%s' % (dimension)
            else:
                iter_folder_name = 'iter%s' % (dimension)
            try:
                results = self.read_results(iter_folder_name, dimension, 1, tsizer)
                Results_list.append(results)
                if dimension == 1:
                    iter_path = os.path.join(self.SIS_input_path, iter_folder_name)
                    FC_path = os.path.join(iter_path, 'FC.out')
                    # feature space size
                    feature_space_list = self.check_FC(FC_path)[1]
                    try:
                        self.featurespace = int(feature_space_list[-1])
                        featurespace = int(self.featurespace * 0.5)
                    except BaseException:
                        # last entry missing/invalid: extrapolate the rung-2 size
                        if self.parameters['rung'] == 3:
                            featurespace = int(feature_space_list[-2])
                            self.featurespace = self.estimate_feature_space(
                                3, featurespace, self.parameters['opset'], rate=0.12, n_comb_start=2)
                            featurespace = int(self.featurespace)
                        else:
                            self.logger.error("Developper error: feature space estimation and rung conflict!")
                            # NOTE(review): 'self.exit' is not defined anywhere in
                            # view; this was probably meant to be sys.exit(1).
                            self.exit(1)
                    if self.if_print:
                        digit_len = len(str(featurespace)) - 1
                        self.logger.info(
                            "Estimated order of magnitude of feature space size: 10^%s - 10^%s" %
                            (digit_len, digit_len + 1))
            except Exception as e:
                message = self.check_files(iter_folder_name, dimension)
                if dimension > 2:
                    # higher dimensions are optional: warn and keep what we have
                    self.logger.warning(message)
                    break
                else:
                    self.logger.error(message)
                    self.logger.error("## See below the Error message:")
                    self.logger.error(e)
                    sys.exit(1)
        out = []
        # print results, make pandas DataFrames and calulate predicted/fitted values
        for RMSE, features_selected, coefficients, D_model in Results_list:
            if self.if_print:
                string = self.string_descriptor(RMSE, features_selected, coefficients, self.target_unit)
                self.logger.info(string)
            # predicted/fitted values of the model
            fit = np.dot(D_model, coefficients)
            # D_model and selected features as pandas DataFrames
            features_selected.append('intercept')
            D_df = pd.DataFrame(D_model, columns=features_selected)
            out.append({'D': D_df, 'coefficients': coefficients, 'P_pred': fit})
        rmtree(self.SIS_input_path)
        return out
    else:  # 'quali'. Only for specific case of 2D
        dimension = 2
        iter_folder_name = 'iter0%s' % (dimension)
        try:
            iter_path = os.path.join(self.SIS_input_path, iter_folder_name)
            FC_path = os.path.join(self.SIS_input_path, 'iter01', 'FC.out')
            # feature space size
            feature_space_list = self.check_FC(FC_path)[1]
            try:
                self.featurespace = int(feature_space_list[-1])
                if self.parameters['rung'] == 3:
                    featurespace = int(self.featurespace * 0.5)
                else:
                    featurespace = int(self.featurespace)
            except BaseException:
                if self.parameters['rung'] == 3:
                    featurespace = int(feature_space_list[-2])
                    self.featurespace = self.estimate_feature_space(
                        3, featurespace, self.parameters['opset'], rate=0.12, n_comb_start=2)
                    featurespace = int(self.featurespace)
                else:
                    self.logger.error("Developper error: feature space estimation and rung conflict!")
                    # NOTE(review): probably sys.exit(1) was intended here as well.
                    self.exit(1)
            digit_len = len(str(featurespace)) - 1
            first_digit = str(round(featurespace, -digit_len))[0]
            feature_space_message = "Size of feature space: %s*10^%s" % (first_digit, digit_len)
            # get results
            results_list = None
            if not self.checking_expense:
                results_list_v1 = self.read_results_quali()
                rmtree(self.SIS_input_path)
                n_results = len(results_list_v1)
                # get real overlap with width=0: rerun with only the two
                # selected descriptors as primary features
                self.parameters['rung'] = 0
                self.parameters['subs_sis'] = 1
                self.parameters['width'] = 0.0
                self.parameters['ndimtype'] = 2
                # NOTE(review): the key 'dimclass=' (with a trailing '=') looks
                # like a typo for 'dimclass' -- confirm against write_parameters.
                self.parameters['dimclass='] = ['(1:1)', '(2:2)']
                self.parameters['nsf'] = 2
                self.parameters['mpiname'] = ''
                #self.if_print = False
                try:
                    Des, D_selected, overlap = results_list_v1[ith_descriptor]
                except BaseException:
                    # requested descriptor not available: fall back to the last one
                    Des, D_selected, overlap = results_list_v1[-1]
                self.D = D_selected
                self.feature_list = Des
                self.feature_unit_classes = [1, 2]
                self.if_close_ssh = True
                self.start()
                final_result = self.read_results_quali()[0]
                rmtree(self.SIS_input_path)
                try:
                    rmtree(self.SIS_input_path)
                except BaseException:
                    pass
                if self.if_print:
                    self.logger.info("SISSO CALCULATION FINISHED")
                    self.logger.info(feature_space_message)
                return final_result
        except Exception as e:
            self.logger.error(e)
            sys.exit(1)