Source code for optking.optimize

""" Provides some of the high level functions and classes to run optimizations. This is a good starting place for anyone
looking to add features to the code to familarize themselves with the overall workings of optking.
Functions may be useful to users seeking greater control over the inner workings of optking than provided by the
OptHelpers. For instance if manually creating a molecular system or manually controlling / switching algorithms
on the fly.

See also `OptimizationAlgorithm <stepalgorithms.OptimizationAlgorithm>` and
`OptimizationInterface <stepalgorithms.OptimizationInterface>`for core functionality common to all the optimization
algorithms like backtransformation and displacement.

"""

import logging
from typing import Union

import numpy as np
from optking.compute_wrappers import ComputeWrapper
from optking.molsys import Molsys

from . import IRCfollowing, addIntcos, hessian, history, intcosMisc
from . import optparams as op
from . import stepAlgorithms
from . import testB, linesearch
from .exceptions import AlgError, OptError
from .printTools import print_array_string, print_geom_grad, print_mat_string
from . import log_name

logger = logging.getLogger(f"{log_name}{__name__}")


[docs] def optimize(o_molsys, computer): """Driver for OptKing's optimization procedure. Suggested that users use EngineHelper, CustomHelper, or one of the external interfaces (psi4 and qcengine) to perform a normal (full) optimization Parameters ---------- o_molsys : cls optking molecular system computer : compute_wrappers.ComputeWrapper Returns ------- float, float or dict energy and nuclear repulsion energy or MolSSI qc_schema_output as dict """ logger = logging.getLogger(__name__) H = 0 # hessian in internals opt_history = history.History(op.Params) # Try to optimize one structure OR set of IRC points. OptError and all Exceptions caught below. try: converged = False if not o_molsys.intcos_present: make_internal_coords(o_molsys, op.Params) logger.debug("Molecular system after make_internal_coords:") logger.debug(str(o_molsys)) opt_object = OptimizationManager(o_molsys, opt_history, op.Params, computer) logger.info("\tStarting optimization algorithm.\n") while converged is not True: try: H, fq, energy = opt_object.start_step(H) dq = opt_object.take_step(fq, H, energy, return_str=False) converged = opt_object.converged(energy, fq, dq) opt_object.check_maxiter() # raise error otherwise continue except AlgError as AF: opt_object.alg_error_handler(AF) qc_output = opt_object.finish(error=None) return qc_output except OptError as error: logger.error(error) return opt_object.opt_error_handler(error) except Exception as error: logger.error(error) return opt_object.unknown_error_handler(error)
[docs] class OptimizationManager(stepAlgorithms.OptimizationInterface): """Recommended use of Optking's Optimization Algorithms is to create this class and then loop over take_step. OptimizationFactory will either return the appropriate OptimizationAlgorithm or return itself if management of multiple algorithms is required (linesearching). Currently only 1 linesearch method is implemented so changing linesearch_method to anything that does not evaluate to None will turn linesearch on. This class' primary purpose is to abstract the interface for an OptimizationAlgorithm, Linesearch, or IRC so no special handling is needed for an IRC optimization as opposed to a NR optimization. # TODO add dynamic_level management here """ _LINESEARCHES = {"ENERGY": linesearch.ThreePointEnergy} def __init__(self, molsys: Molsys, history_object: history.History, params: op.OptParams, computer: ComputeWrapper): super().__init__(molsys, history_object, params) self.direction: Union[np.ndarray, None] = None method = "IRC" if params.opt_type == "IRC" else params.step_type self.opt_method = optimization_factory(method, molsys, self.history, params) self.step_number = 0 self.computer = computer self.linesearch_method = None self.stashed_hessian = None if params.linesearch: self.linesearch_method = OptimizationManager._LINESEARCHES["ENERGY"](molsys, self.history, params) self.opt_method.trust_radius_on = False self.requires = self.update_requirements() self.current_requirements = self.update_requirements() self.params = params self.erase_hessian = False self.check_linesearch = True self.error = None
[docs] def to_dict(self): """Convert attributes to serializable form.""" d = { "direction": self.direction, "step_number": self.step_number, "stashed_hessian": self.stashed_hessian, "requires": self.requires, "current_requirements": self.current_requirements, "erase_hessian": self.erase_hessian, "check_linesearch": self.check_linesearch, "error": self.error, } if self.linesearch_method: d["linesearch_method"] = self.linesearch_method.to_dict() if self.params.opt_type == "IRC": d["irc_object"] = self.opt_method.to_dict() return d
[docs] @classmethod def from_dict(cls, d, molsys, history, params, computer): """Reload attributes from the provided dictionary. Create all necessary classes. To prevent duplication, OptHelper handles converting the molsys, history, params, and computer to/from dict """ manager = cls(molsys, history, params, computer) manager.direction = d["direction"] method = "IRC" if params.opt_type == "IRC" else params.step_type manager.step_number = d["step_number"] manager.stashed_hessian = d["stashed_hessian"] manager.requires = d["requires"] manager.current_requirements = d["current_requirements"] manager.erase_hessian = d["erase_hessian"] manager.check_linesearch = d["check_linesearch"] manager.error = d["error"] if params.opt_type == "IRC": manager.opt_method = IRCfollowing.IntrinsicReactionCoordinate.from_dict( d["irc_object"], molsys, history, params ) else: manager.opt_method = optimization_factory( method, molsys, history, params ) # Can just recreate with current history and params if d.get("linesearch_method"): manager.linesearch_method = OptimizationManager._LINESEARCHES["ENERGY"].from_dict( d["linesearch_method"], molsys, history, params ) return manager
[docs] def start_step(self, H: np.ndarray): """Initialize coordinates. Compute needed properties. Print molecular system and property information Returns ------- H: np.ndarray 2D. Hessian in appropriate coordinates f_q: np.ndarray 1D. forces in appropriate coordinates E: float energy """ if self.molsys.natom == 1: logger.info("There is only 1 atom. Nothing to optimize. Computing energy.") E = get_pes_info(np.ndarray(0), self.computer, self.molsys, self.history, self.params, "unneeded", ["energy"])[2] raise OptError("There is only 1 atom. Nothing to optimize. Computing energy.","OptError") # if optimization coordinates are absent, choose them. Could be erased after AlgError if not self.molsys.intcos_present: make_internal_coords(self.molsys, self.params) self.step_number += 1 header = f"{'----------------------------':^74}" header += f"\n{'Taking A Step: Step Number %d' % self.step_number:^90}" header += f"\n{'----------------------------':^90}" logger.info(header) requirements = self.opt_method.requires() protocol = self.get_hessian_protocol() H, g_q, g_x, E = get_pes_info(H, self.computer, self.molsys, self.history, self.params, protocol, requirements) logger.info("%s", print_geom_grad(self.molsys.geom, g_x)) self.molsys.q_show() if self.params.test_B: testB.test_b(self.molsys) if self.params.test_derivative_B: testB.test_derivative_b(self.molsys) logger.info(print_array_string(-g_q, title="Internal forces in au:")) return H, -g_q, E
[docs] def take_step(self, fq=None, H=None, energy=None, return_str=False, **kwargs): """Take whatever step (normal, linesearch, IRC, constrained IRC) is next. fq: Union[np.ndarray, None] forces H: Union[np.ndarray, None] hessian energy: Union[np.ndarray, None] return_str: bool if True return string with information about step (information is logged regardless) """ self.current_requirements = self.update_requirements() if not self.params.linesearch: achieved_dq, returned_str = self.opt_method.take_step(fq, H, energy, return_str=True) else: if self.direction is None: self.direction = self.opt_method.step(fq, H, energy) self.linesearch_method.start(self.direction) if self.check_linesearch: self.history.append(self.molsys.geom, energy, fq, self.molsys.gradient_to_cartesians(-1 * fq)) ls_energy = self.linesearch_method.expected_energy dq_norm, unit_dq, grad, hess = self.opt_method.step_metrics(self.direction, fq, H) self.history.append_record(ls_energy, self.direction, unit_dq, grad, hess) self.stashed_hessian = H self.check_linesearch = False achieved_dq, returned_str = self.linesearch_method.take_step(fq, H, energy, return_str=True) if self.linesearch_method.minimized: logger.info("Linesearch complete. Next step will compute a new direction") # cleanup self.linesearch_method.reset() self.direction = None self.check_linesearch = True self.requires = self.update_requirements() if return_str: return achieved_dq, returned_str return achieved_dq
[docs] def update_requirements(self): """Get the current requirements for the next step. Notes ----- If linesearching requirements can change. Always safe to provide a gradient regardless.""" if self.direction is None: return self.opt_method.requires() else: return self.linesearch_method.requires()
[docs] def converged(self, E, fq, dq, step_number=None, str_mode=None): """Test whether the optimization has finished. An optimization can only be declared converged If a gradient has been provided (linesearching cannot terminate an optimization)""" if step_number is None: step_number = self.step_number converged = False if not self.linesearch_method or self.check_linesearch: converged = self.opt_method.converged(dq, fq, step_number, str_mode=str_mode) if str_mode: return converged if converged is True: logger.info("\tConverged in %d steps!" % step_number) logger.info("\tFinal energy is %20.13f" % E) logger.info("\tFinal structure (Angstroms): \n" + self.molsys.show_geom()) return converged
[docs] def check_maxiter(self): """Check iterations < geom_maxiter. For IRC's check `total_steps_taken`.""" if self.params.opt_type == "IRC": iterations = self.opt_method.total_steps_taken else: iterations = self.step_number # Hard quit if too many total steps taken (inc. all IRC points and algorithms). if iterations >= self.params.geom_maxiter: logger.error( "\tTotal number of steps (%d) exceeds maximum allowed (%d).\n" % (iterations, self.params.geom_maxiter) ) raise OptError( "Maximum number of steps exceeded: {}.".format(self.params.geom_maxiter), "OptError", )
[docs] def get_hessian_protocol(self): """Determine action to take for how to compute a hessian. Handles alternate IRC behavior Returns ------- str: one of ('compute', 'update', 'guess', 'unneeded') """ if "hessian" not in self.opt_method.requires(): return "unneeded" if self.erase_hessian is True: self.erase_hessian = False return "compute" if self.params.full_hess_every > 0 else "guess" if self.params.cart_hess_read: return "compute" if self.step_number <= 1: if self.params.opt_type != "IRC": if self.params.full_hess_every > -1: # compute hessian at least once. protocol = "compute" else: protocol = "guess" else: # IRC protocol = "compute" else: if self.params.full_hess_every < 1: protocol = "update" elif (self.step_number - 1) % self.params.full_hess_every == 0: protocol = "compute" else: protocol = "update" return protocol
[docs] def clear(self): """Reset history (inculding all steps) and molecule""" self.history.steps = [] self.molsys.intcos = [] self.step_number = 0 self.history.steps_since_last_hessian = 0 self.history.consecutive_backsteps = 0
[docs] def alg_error_handler(self, error): """consumes an AlgError. Takes appropriate action""" logger.error(" Caught AlgError exception\n") eraseIntcos = False if error.linearBends: # New linear bends detected; Add them, and continue at current level. # from . import bend # import not currently being used according to IDE for l in error.linearBends: if l.bend_type == "LINEAR": # no need to repeat this code for "COMPLEMENT" iF = addIntcos.check_fragment(l.atoms, self.molsys) F = self.molsys.fragments[iF] intcosMisc.remove_old_now_linear_bend(l.atoms, F.intcos) F.add_intcos_from_connectivity() eraseHistory = True elif self.params.dynamic_level == self.params.dynamic_level_max: logger.critical("\n\t Current algorithm/dynamic_level is %d.\n" % self.params.dynamic_level) logger.critical("\n\t Alternative approaches are not available or turned on.\n") raise OptError("Maximum dynamic_level reached.") else: self.params.dynamic_level += 1 logger.warning("\n\t Increasing dynamic_level algorithm to %d.\n" % self.params.dynamic_level) logger.warning("\n\t Erasing old history, hessian, intcos.\n") eraseIntcos = True eraseHistory = True self.params.update_dynamic_level_params(self.params.dynamic_level) logger.info("Printing the parameters %s", self.params) if eraseIntcos: logger.warning(" Erasing coordinates.\n") for f in self.molsys.fragments: del f.intcos[:] self.molsys._dimer_intcos = [] if eraseHistory: logger.warning(" Erasing history.\n") self.clear() self.erase_hessian = True self.error = "AlgError"
[docs] def opt_error_handler(self, error): """OptError indicates an unrecoverable error. Print information and trigger cleanup.""" logger.critical("\tA critical optimization-specific error has occured.") logger.critical("\tResetting all optimization options for potential queued jobs.\n") logger.exception("Error caught:" + str(error)) # Dump histories if possible return self._exception_cleanup(error)
[docs] def unknown_error_handler(self, error): """Unknown errors are not recoverable error. Print information and trigger cleanup.""" logger.critical("\tA non-optimization-specific error has occurred.\n") logger.critical("\tResetting all optimization options for potential queued jobs.\n") logger.exception("Error Type: " + str(type(error))) logger.exception("Error caught:" + str(error)) return self._exception_cleanup(error)
def _exception_cleanup(self, error): logger.info("\tDumping history: Warning last point not converged.\n" + self.history.summary_string()) if self.params.opt_type == "IRC": logging.debug("\tDumping IRC points completed") self.opt_method.irc_history.progress_report() self.error = "OptError" return self.finish(error)
[docs] def finish(self, error=None): rxnpath = None if self.params.opt_type == "IRC": self.opt_method.irc_history.progress_report() rxnpath = self.opt_method.irc_history.rxnpath_dict() else: logger.info("\tOptimization Finished\n" + self.history.summary_string()) qc_output = prepare_opt_output(self.molsys, self.computer, rxnpath=rxnpath, error=error) self.clear() return qc_output
[docs] def optimization_factory(method, molsys, history_object, params=None): """create optimization algorithms. method may be redundant however params is allowed to be none so method is required explicitly Returns ------- OptimizationAlgorithm""" ALGORITHMS = { "RFO": stepAlgorithms.RestrictedStepRFO, "P_RFO": stepAlgorithms.PartitionedRFO, "NR": stepAlgorithms.QuasiNewtonRaphson, "SD": stepAlgorithms.SteepestDescent, "IRC": IRCfollowing.IntrinsicReactionCoordinate, "CONJUGATE": stepAlgorithms.ConjugateGradient, } return ALGORITHMS.get(method, stepAlgorithms.RestrictedStepRFO)(molsys, history_object, params)
[docs] def get_pes_info( H: np.ndarray, computer: ComputeWrapper, o_molsys: Molsys, opt_history: history.History, params: op.OptParams, hessian_protocol="update", requires=("energy", "gradient"), ): """Calculate, update, or guess hessian as appropriate. Calculate gradient and transform to the current coordinate system, pulls the gradient from hessian output if possible. Parameters ---------- H: np.ndarray current Hessian computer : compute_wrappers.ComputeWrapper o_molsys : molsys.Molsys opt_history: history.History params : op.OptParams requires : list ("energy", "gradient", "hessian") hessian_protocol : str one of ("unneeded", "compute", "guess", "update") Returns ------- np.ndarray hessian matrix (inteneral coordinates) np.ndarray: gradient vector (internal coordinates) np.ndarrray gradient vector (cartesian coordinates) float energy Notes ----- Some functionality from start_step has now been placed here. external forces are added into the forces. Redundancies and constraints are projected out of the forces and hessian. """ if "gradient" in requires: driver = "gradient" else: driver = "energy" if hessian_protocol == "update": # For update. Compute new gradient. Update with external forces if present. Update the hessian logger.info(f"Updating Hessian with {str(op.Params.hess_update)}") result = computer.compute(o_molsys.geom, driver=driver, return_full=False) g_x = np.asarray(result) if driver == "gradient" else None f_q = o_molsys.gradient_to_internals(g_x, -1.0) f_q, H = o_molsys.apply_external_forces(f_q, H) H = opt_history.hessian_update(H, f_q, o_molsys) else: if hessian_protocol == "compute" and not params.cart_hess_read: H, g_x = get_hess_grad(computer, o_molsys) # get gradient from hessian elif hessian_protocol in ["guess", "unneeded"]: # guess hessian compute gradient logger.info(f"Guessing Hessian with {str(params.intrafrag_hess)}") H = hessian.guess(o_molsys, guessType=params.intrafrag_hess) result = computer.compute(o_molsys.geom, driver=driver, return_full=False) g_x = np.asarray(result) if driver == "gradient" else None elif params.cart_hess_read: # read hessian from file. calculate gradient. Update params to not read from disk again logger.info("Reading hessian from file") result = computer.compute(o_molsys.geom, driver=driver, return_full=False) g_x = np.asarray(result) if driver == "gradient" else None Hx = hessian.from_file(params.hessian_file) H = o_molsys.hessian_to_internals(Hx) params.cart_hess_read = False params.hessian_file = None else: raise OptError("Encountered unknown value from get_hessian_protocol()") # Handle external forces (not included in hessian currently) f_q = o_molsys.gradient_to_internals(g_x, -1) f_q, H = o_molsys.apply_external_forces(f_q, H) f_q, H = o_molsys.project_redundancies_and_constraints(f_q, H) g_q = -f_q if H.size: logger.info(print_mat_string(H, title="Hessian matrix")) return H, g_q, g_x, computer.energies[-1]
[docs] def get_hess_grad(computer, o_molsys): """Compute hessian and fetch gradient from output if possible. Perform separate gradient calculation if needed Parameters ---------- computer: compute_wrappers.ComputeWrapper o_molsys: molsys.Molsys Returns ------- tuple(np.ndarray, np.ndarray) Notes ----- Hessian is in internals gradient is in cartesian """ ret = computer.compute(o_molsys.geom, driver="hessian", return_full=True, print_result=False) h_cart = np.asarray(ret["return_result"]).reshape(o_molsys.geom.size, o_molsys.geom.size) try: logger.debug("Looking for gradient in hessian output") g_cart = ret["extras"]["qcvars"]["CURRENT GRADIENT"] except KeyError: logger.error("Could not find the gradient in qcschema") grad = computer.compute(o_molsys.geom, driver="gradient", return_full=False) g_cart = np.asarray(grad) # Likely not at stationary point. Include forces # ADDENDUM currently neglects forces term for all points - including non-stationary H = o_molsys.hessian_to_internals(h_cart) return H, g_cart
[docs] def make_internal_coords(o_molsys: Molsys, params: op.OptParams): """ Add optimization coordinates to molecule system. May be called if coordinates have not been added yet, or have been removed due to an algorithm error (bend going linear, or energy increasing, etc.). Parameters ---------- o_molsys: Molsys current molecular system. params: op.OptParams Returns ------- o_molsys: Molsys The molecular system updated with internal coordinates. """ # if params is None: # params = op.Params logger.debug("\t Adding internal coordinates to molecular system") # Use covalent radii to determine bond connectivity. connectivity = addIntcos.connectivity_from_distances(o_molsys.geom, o_molsys.Z) logger.debug("Connectivity Matrix\n" + print_mat_string(connectivity)) if params.frag_mode == "SINGLE": # Make a single, supermolecule. o_molsys.consolidate_fragments() # collapse into one frag (if > 1) o_molsys.split_fragments_by_connectivity() # separate by connectivity # increase connectivity until all atoms are connected o_molsys.augment_connectivity_to_single_fragment(connectivity) o_molsys.consolidate_fragments() # collapse into one frag if params.opt_coordinates in ["INTERNAL", "REDUNDANT", "BOTH"]: o_molsys.fragments[0].add_intcos_from_connectivity(connectivity) if params.add_auxiliary_bonds: o_molsys.fragments[0].add_auxiliary_bonds(connectivity) if params.opt_coordinates in ["CARTESIAN", "BOTH"]: o_molsys.fragments[0].add_cartesian_intcos() elif params.frag_mode == "MULTI": # if provided multiple frags, then we use these. # if not, then split them (if not connected). if o_molsys.nfragments == 1: o_molsys.split_fragments_by_connectivity() if o_molsys.nfragments > 1: addIntcos.add_dimer_frag_intcos(o_molsys) # remove connectivity so that we don't add redundant coordinates # between fragments o_molsys.purge_interfragment_connectivity(connectivity) if params.opt_coordinates in ["INTERNAL", "REDUNDANT", "BOTH"]: for iF, F in enumerate(o_molsys.fragments): C = np.ndarray((F.natom, F.natom)) C[:] = connectivity[o_molsys.frag_atom_slice(iF), o_molsys.frag_atom_slice(iF)] F.add_intcos_from_connectivity(C) if params.add_auxiliary_bonds: F.add_auxiliary_bonds(connectivity) if params.opt_coordinates in ["CARTESIAN", "BOTH"]: for F in o_molsys.fragments: F.add_cartesian_intcos() addIntcos.add_constrained_intcos(o_molsys) # make sure these are in the set return
[docs] def prepare_opt_output(o_molsys, computer, rxnpath=False, error=None): logger.info("Preparing OptimizationResult") # Get molecule from most recent step. Add provenance and fill in non-required fills. # Turn back to dict computer.update_geometry(o_molsys.geom) final_molecule = computer.molecule qc_output = { "schema_name": "qcschema_optimization_output", "trajectory": computer.trajectory, "energies": computer.energies, "final_molecule": final_molecule, "extras": {}, "success": True, } if error: qc_output.update({"success": False, "error": {"error_type": error.err_type, "error_message": error.mesg}}) if rxnpath: print(rxnpath) qc_output["extras"]["irc_rxn_path"] = rxnpath qc_output["final_geometry"] = rxnpath[-2]["x"] qc_output["extras"]["final_irc_energy"] = rxnpath[-2]["energy"] return qc_output