Files
VASP_calc/electrochemistry/echem/neb/neb.py
2026-01-08 19:47:32 +03:00

588 lines
25 KiB
Python

from __future__ import annotations
from ase.mep.neb import NEB
from ase.optimize.sciopt import OptimizerConvergenceError
from ase.io.trajectory import Trajectory, TrajectoryWriter
from ase.io import read
from echem.neb.calculators import JDFTx
from echem.neb.autoneb import AutoNEB
from echem.io_data.jdftx import Ionpos, Lattice, Input
from echem.core.useful_funcs import shell
from pathlib import Path
import numpy as np
import logging
import os
from typing import Literal, Callable
logging.basicConfig(filename='logfile_NEB.log', filemode='a', level=logging.INFO,
format="%(asctime)s %(levelname)8s %(name)14s %(message)s",
datefmt='%d/%m/%Y %H:%M:%S')
class NEBOptimizer:
def __init__(self,
neb: NEB,
trajectory_filepath: str | Path | None = None,
append_trajectory: bool = True):
self.neb = neb
self.logger = logging.getLogger(self.__class__.__name__ + ':')
self.E_image_first = None
self.E_image_last = None
if trajectory_filepath is not None:
if append_trajectory:
self.trj_writer = TrajectoryWriter(trajectory_filepath, mode='a')
else:
self.trj_writer = TrajectoryWriter(trajectory_filepath, mode='w')
else:
self.trj_writer = None
def converged(self, fmax):
return self.neb.get_residual() <= fmax
def update_positions(self, X):
positions = X.reshape((self.neb.nimages - 2) * self.neb.natoms, 3)
self.neb.set_positions(positions)
def get_forces(self):
return self.neb.get_forces().reshape(-1)
def get_energies(self, first: bool = False, last: bool = False):
if not first and not last:
return [image.calc.E for image in self.neb.images[1:-1]]
elif first and not last:
return [image.calc.E for image in self.neb.images[:-1]]
elif not first and last:
return [image.calc.E for image in self.neb.images[1:]]
elif first and last:
return [image.calc.E for image in self.neb.images]
def dump_trajectory(self):
if self.trj_writer is not None:
for image in self.neb.images:
self.trj_writer.write(image)
def dump_positions_vasp(self, prefix='last_img'):
length = len(str(self.neb.nimages + 1))
for i, image in enumerate(self.neb.images):
image.write(f'{prefix}_{str(i).zfill(length)}.vasp', format='vasp')
def set_step_in_calculators(self, step, first: bool = False, last: bool = False):
if not first and not last:
for image in self.neb.images[1:-1]:
image.calc.global_step = step
elif first and not last:
for image in self.neb.images[:-1]:
image.calc.global_step = step
elif not first and last:
for image in self.neb.images[1:]:
image.calc.global_step = step
elif first and last:
for image in self.neb.images:
image.calc.global_step = step
def run_static(self,
fmax: float = 0.1,
max_steps: int = 100,
alpha: float = 0.02,
dE_max: float = None,
construct_calc_fn: Callable = None):
self.logger.info('Static method of optimization was chosen')
max_new_images_at_step = 1
min_steps_after_insertion = 3
steps_after_insertion = 0
if dE_max is not None:
self.logger.info(f'AutoNEB with max {dE_max} eV difference between images was set')
self.logger.info(f'Initial number of images is {self.neb.nimages}, '
f'including initial and final images')
if max_steps < 1:
raise ValueError('max_steps must be greater or equal than one')
if dE_max is not None:
self.set_step_in_calculators(0, first=True, last=True)
self.E_image_first = self.neb.images[0].get_potential_energy()
self.E_image_last = self.neb.images[-1].get_potential_energy()
length_step = len(str(max_steps))
#X = self.neb.get_positions().reshape(-1)
for step in range(max_steps):
self.dump_trajectory()
self.dump_positions_vasp(prefix=f'Step-{step}-1-')
if dE_max is not None:
self.set_step_in_calculators(step, first=True, last=True)
else:
self.set_step_in_calculators(step)
F = self.get_forces()
self.logger.info(f'Step: {step:{length_step}}. Energies = '
f'{[np.round(en, 4) for en in self.get_energies()]}')
R = self.neb.get_residual()
if R <= fmax:
self.logger.info(f'Step: {step:{length_step}}. Optimization terminates successfully. Residual R = {R:.4f}')
return True
else:
self.logger.info(f'Step: {step:{length_step}}. Residual R = {R:.4f}')
X = self.neb.get_positions().reshape(-1)
X += alpha * F
self.update_positions(X)
self.dump_positions_vasp(prefix=f'Step:-{step}-2-')
if dE_max is not None:
energies = self.get_energies(first=True, last=True)
self.logger.debug(f'Energies raw: {energies}')
self.logger.info(f'Step: {step:{length_step}}. Energies = '
f'{[np.round(en, 4) for en in energies]}')
diff = np.abs(np.diff(energies))
self.logger.debug(f'diff: {diff}')
idxs = np.where(diff > dE_max)[0]
self.logger.debug(f'Idxs where diff > dE_max: {idxs}')
if len(idxs) > max_new_images_at_step:
idxs = np.flip(np.argsort(diff))[:max_new_images_at_step]
self.logger.debug(f'Images will be added for idxs {idxs} since more than {max_new_images_at_step} '
f'diffs were large than {dE_max} eV')
if (len(idxs) > 0) and (steps_after_insertion > min_steps_after_insertion):
steps_after_insertion = -1
for idx in reversed(idxs):
self.logger.debug(f'Start working with idx: {idx}')
length_prev = len(str(len(self.neb.images) - 1))
length_new = len(str(len(self.neb.images)))
self.logger.debug(f'{length_prev=} {length_new=}')
tmp_images = [self.neb.images[idx].copy(),
self.neb.images[idx].copy(),
self.neb.images[idx + 1].copy()]
tmp_neb = NEB(tmp_images)
tmp_neb.interpolate()
images_new = self.neb.images.copy()
images_new.insert(idx + 1, tmp_neb.images[1])
energies = self.get_energies(first=True, last=True)
energies.insert(idx + 1, None)
self.neb = NEB(images_new,
k=self.neb.k[0],
climb=self.neb.climb)
self.dump_positions_vasp(prefix=f'Step: {step} 3 ')
zfill_length = len(str(len(self.neb.images)))
for k, image in enumerate(self.neb.images):
self.logger.debug(f'Trying to attach the calc to {k} image '
f'with the length: {len(str(len(self.neb.images)))}')
image.calc = construct_calc_fn(str(k).zfill(zfill_length))
image.calc.E = energies[k]
if length_prev != length_new:
self.logger.debug('Trying to rename due to the change in length')
for i in range(0, self.neb.nimages):
shell(f'mv {str(i).zfill(length_prev)} {str(i).zfill(length_new)}')
self.logger.debug(f'Trying to execute the following command: '
f'mv {str(i).zfill(length_prev)} {str(i + 1).zfill(length_new)}')
self.logger.debug('Trying to rename due to the insertion')
for i in range(len(self.neb.images) - 2, idx, -1):
self.logger.debug(f'{i=}')
self.logger.debug(f'Trying to execute the following command: '
f'mv {str(i).zfill(length_new)} {str(i + 1).zfill(length_new)}')
shell(f'mv {str(i).zfill(length_new)} {str(i + 1).zfill(length_new)}')
self.logger.debug(f'Trying to create the new folder: {str(idx + 1).zfill(length_new)}')
folder = Path(str(idx + 1).zfill(length_new))
folder.mkdir()
self.dump_positions_vasp(prefix=f'Step: {step} 4 ')
steps_after_insertion += 1
self.logger.debug(f'Step after insertion: {steps_after_insertion}')
self.logger.warning(f'convergence was not achieved after max iterations = {max_steps}, '
f'residual R = {R:.4f} > {fmax}')
return False
def run_ode(self,
fmax: float = 0.1,
max_steps: int = 100,
C1: float = 1e-2,
C2: float = 2.0,
extrapolation_scheme: Literal[1, 2, 3] = 3,
h: float | None = None,
h_min: float = 1e-10,
R_max: float = 1e3,
rtol: float = 0.1):
"""
fmax : float
convergence tolerance for residual force
max_steps : int
maximum number of steps
C1 : float
sufficient contraction parameter
C2 : float
residual growth control (Inf means there is no control)
extrapolation_scheme : int
extrapolation style (3 seems the most robust)
h : float
initial step size, if None an estimate is used based on ODE12
h_min : float
minimal allowed step size
R_max: float
terminate if residual exceeds this value
rtol : float
relative tolerance
"""
if max_steps < 2:
raise ValueError('max_steps must be greater or equal than two')
length = len(str(max_steps))
self.set_step_in_calculators(0)
F = self.get_forces()
self.logger.info(f'Step: {0:{length}}. Energies = {[np.round(en, 4) for en in self.get_energies()]}')
R = self.neb.get_residual() # pick the biggest force
if R >= R_max:
self.logger.info(f'Step: {0:{length}}. Residual {R:.4f} >= R_max {R_max}')
raise OptimizerConvergenceError(f'Step: 0. Residual {R:.4f} >= R_max {R_max}')
else:
self.logger.info(f'Step: {0:{length}}. Residual R = {R:.4f}')
if h is None:
h = 0.5 * rtol ** 0.5 / R # Chose a step size based on that force
h = max(h, h_min) # Make sure the step size is not too big
self.logger.info(f'Step: {0:{length}}. Step size h = {h}')
X = self.neb.get_positions().reshape(-1)
for step in range(1, max_steps):
X_new = X + h * F # Pick a new position
self.update_positions(X_new)
self.set_step_in_calculators(step)
F_new = self.get_forces() # Calculate the new forces at this position
self.logger.info(f'Step: {step:{length}}. Energies = {[np.round(en, 4) for en in self.get_energies()]}')
R_new = self.neb.get_residual()
self.logger.info(f'Step: {step:{length}}. At new coordinates R = {R:.4f} -> R_new = {R_new:.4f}')
e = 0.5 * h * (F_new - F) # Estimate the area under the forces curve
err = np.linalg.norm(e, np.inf) # Error estimate
# Accept step if residual decreases sufficiently and/or error acceptable
condition_1 = R_new <= R * (1 - C1 * h)
condition_2 = R_new <= R * C2
condition_3 = err <= rtol
accept = condition_1 or (condition_2 and condition_3)
self.logger.info(f'Step: {step:{length}}. {"R_new <= R * (1 - C1 * h)":26} \t is {condition_1}')
self.logger.info(f'Step: {step:{length}}. {"R_new <= R * C2":26} is {condition_2}')
self.logger.info(f'Step: {step:{length}}. {"err <= rtol":26} is {condition_3}')
# Pick an extrapolation scheme for the system & find new increment
y = F - F_new
if extrapolation_scheme == 1: # F(xn + h Fp)
h_ls = h * (F @ y) / (y @ y)
elif extrapolation_scheme == 2: # F(Xn + h Fp)
h_ls = h * (F @ F_new) / (F @ y + 1e-10)
elif extrapolation_scheme == 3: # min | F(Xn + h Fp) |
h_ls = h * (F @ y) / (y @ y + 1e-10)
else:
raise ValueError(f'Invalid extrapolation_scheme: {extrapolation_scheme}. Must be 1, 2 or 3')
if np.isnan(h_ls) or h_ls < h_min: # Rejects if increment is too small
h_ls = np.inf
h_err = h * 0.5 * np.sqrt(rtol / err)
if accept:
self.logger.info(f'Step: {step:{length}}. The displacement is accepted')
X = X_new
R = R_new
F = F_new
self.dump_trajectory()
self.dump_positions_vasp()
# We check the residuals again
if self.converged(fmax):
self.logger.info(f"Step: {step:{length}}. Optimization terminates successfully")
return True
if R > R_max:
self.logger.info(f"Step: {step:{length}}. Optimization fails, R = {R:.4f} > R_max = {R_max}")
return False
# Compute a new step size.
# Based on the extrapolation and some other heuristics
h = max(0.25 * h, min(4 * h, h_err, h_ls)) # Log steep-size analytic results
self.logger.info(f'Step: {step:{length}}. New step size h = {h}')
else:
self.logger.info(f'Step: {step:{length}}. The displacement is rejected')
h = max(0.1 * h, min(0.25 * h, h_err, h_ls))
self.logger.info(f'Step: {step:{length}}. New step size h = {h}')
if abs(h) < h_min: # abort if step size is too small
self.logger.info(f'Step: {step:{length}}. Stop optimization since step size h = {h} < h_min = {h_min}')
return True
self.logger.warning(f'Step: {step:{length}}. Convergence was not achieved after max iterations = {max_steps}')
return True
class NEB_JDFTx:
def __init__(self,
path_jdftx_executable: str | Path,
nimages: int = 5,
input_filepath: str | Path = 'input.in',
output_name: str = 'output.out',
input_format: Literal['jdftx', 'vasp'] = 'jdftx',
cNEB: bool = True,
spring_constant: float = 5.0,
interpolation_method: Literal['linear', 'idpp'] = 'idpp',
restart: Literal[False, 'from_traj', 'from_vasp'] = False,
dE_max: float = None):
if isinstance(path_jdftx_executable, str):
self.path_jdftx_executable = Path(path_jdftx_executable)
else:
self.path_jdftx_executable = path_jdftx_executable
if isinstance(input_filepath, str):
input_filepath = Path(input_filepath)
self.jdftx_input = Input.from_file(input_filepath)
self.nimages = nimages
self.path_rundir = Path.cwd()
self.output_name = output_name
self.input_format = input_format.lower()
self.cNEB = cNEB
self.restart = restart
self.spring_constant = spring_constant
self.interpolation_method = interpolation_method.lower()
self.dE_max = dE_max
self.optimizer = None
self.logger = logging.getLogger(self.__class__.__name__ + ':')
def prepare(self):
length = len(str(self.nimages + 1))
if self.restart is False:
if self.input_format == 'jdftx':
init_ionpos = Ionpos.from_file('init.ionpos')
init_lattice = Lattice.from_file('init.lattice')
final_ionpos = Ionpos.from_file('final.ionpos')
final_lattice = Lattice.from_file('final.lattice')
init_poscar = init_ionpos.convert('vasp', init_lattice)
init_poscar.to_file('init.vasp')
final_poscar = final_ionpos.convert('vasp', final_lattice)
final_poscar.to_file('final.vasp')
initial = read('init.vasp', format='vasp')
final = read('final.vasp', format='vasp')
images = [initial]
images += [initial.copy() for _ in range(self.nimages)]
images += [final]
neb = NEB(images,
k=self.spring_constant,
climb=self.cNEB)
neb.interpolate(method=self.interpolation_method)
for i, image in enumerate(images):
image.write(f'start_img_{str(i).zfill(length)}.vasp', format='vasp')
else:
images = []
if self.restart == 'from_traj':
trj = Trajectory('NEB_trajectory.traj')
n_iter = int(len(trj) / (self.nimages + 2))
for i in range(self.nimages + 2):
trj[(n_iter - 1) * (self.nimages + 2) + i].write(f'start_img_{str(i).zfill(length)}.vasp',
format='vasp')
trj.close()
if self.restart == 'from_traj' or self.restart == 'from_vasp':
for i in range(self.nimages + 2):
img = read(f'start_img_{str(i).zfill(length)}.vasp', format='vasp')
images.append(img)
else:
raise ValueError(f'restart must be False or \'from_traj\', '
f'or \'from_vasp\' but you set {self.restart=}')
neb = NEB(images,
k=self.spring_constant,
climb=self.cNEB)
for i in range(self.nimages):
folder = Path(str(i+1).zfill(length))
folder.mkdir(exist_ok=True)
if self.dE_max is not None:
self.logger.debug(f'Trying to create the folder {str(0).zfill(length)}')
folder = Path(str(0).zfill(length))
folder.mkdir(exist_ok=True)
self.logger.debug(f'Trying to create the folder {str(self.nimages + 1).zfill(length)}')
folder = Path(str(self.nimages + 1).zfill(length))
folder.mkdir(exist_ok=True)
for i, image in enumerate(images[1:-1]):
image.calc = JDFTx(self.path_jdftx_executable,
path_rundir=self.path_rundir / str(i+1).zfill(length),
commands=self.jdftx_input.commands)
if self.dE_max is not None:
images[0].calc = JDFTx(self.path_jdftx_executable,
path_rundir=self.path_rundir / str(0).zfill(length),
commands=self.jdftx_input.commands)
images[-1].calc = JDFTx(self.path_jdftx_executable,
path_rundir=self.path_rundir / str(self.nimages + 1).zfill(length),
commands=self.jdftx_input.commands)
self.optimizer = NEBOptimizer(neb=neb,
trajectory_filepath='NEB_trajectory.traj')
def run(self,
fmax: float = 0.1,
method: Literal['ode', 'static'] = 'ode',
max_steps: int = 100,
**kwargs):
self.prepare()
if self.dE_max is not None:
def calc_fn(folder_name) -> JDFTx:
return JDFTx(self.path_jdftx_executable,
path_rundir=self.path_rundir / folder_name,
commands=self.jdftx_input.commands)
else:
calc_fn = None
if method == 'ode':
self.optimizer.run_ode(fmax, max_steps)
elif method == 'static':
self.optimizer.run_static(fmax, max_steps, dE_max=self.dE_max, construct_calc_fn=calc_fn)
else:
raise ValueError(f'Method must be ode or static but you set {method=}')
class AutoNEB_JDFTx:
"""
Class for running AutoNEB with JDFTx calculator
Parameters:
prefix: string or Path
path to folder with initial files. Basically could be os.getcwd()
In this folder required:
1) init.vasp file with initial configuration
2) final.vasp file with final configuration
3) in file with JDFTx calculation parameters
path_jdftx_executable: string or Path
path to jdftx executable
n_start: int
Starting number of images between starting and final for NEB
n_max: int
Maximum number of images, including starting and final
climb: boolean
Whether it is necessary to use cNEB or not
fmax: float or list of floats
The maximum force along the NEB path
maxsteps: int
The maximum number of steps in each NEB relaxation.
If a list is given the first number of steps is used in the build-up
and final scan phase;
the second number of steps is used in the CI step after all images
have been inserted.
k: float
The spring constant along the NEB path
method: str (see neb.py)
Choice betweeen three method:
'aseneb', standard ase NEB implementation
'improvedtangent', published NEB implementation
'eb', full spring force implementation (default)
optimizer: str or object
Set optimizer for NEB: FIRE, BFGS or NEB
space_energy_ratio: float
The preference for new images to be added in a big energy gab
with a preference around the peak or in the biggest geometric gab.
A space_energy_ratio set to 1 will only considder geometric gabs
while one set to 0 will result in only images for energy
resolution.
interpolation_method: string
method for interpolation
smooth_curve: boolean
"""
def __init__(self,
prefix,
path_jdftx_executable,
n_start=3,
n_simul=3,
n_max=10,
climb=True,
fmax=0.05,
maxsteps=100,
k=0.1,
restart=False,
method='eb',
optimizer='FIRE',
space_energy_ratio=0.5,
interpolation_method='idpp',
smooth_curve=False):
self.restart = restart
self.path_jdftx_executable = path_jdftx_executable
self.prefix = Path(prefix)
self.n_start = n_start
self.n_max = n_max
self.commands = Input.from_file(Path(prefix) / 'in').commands
self.interpolation_method = interpolation_method
self.autoneb = AutoNEB(self.attach_calculators,
prefix=prefix,
n_simul=n_simul,
n_max=n_max,
climb=climb,
fmax=fmax,
maxsteps=maxsteps,
k=k,
method=method,
space_energy_ratio=space_energy_ratio,
world=None, parallel=False, smooth_curve=smooth_curve,
interpolate_method=interpolation_method, optimizer=optimizer)
def prepare(self):
if not self.restart:
initial = read(self.prefix / 'init.vasp', format='vasp')
final = read(self.prefix / 'final.vasp', format='vasp')
images = [initial]
if self.n_start != 0:
images += [initial.copy() for _ in range(self.n_start)]
images += [final]
if self.n_start != 0:
neb = NEB(images)
neb.interpolate(method=self.interpolation_method)
for i, image in enumerate(images):
image.write(self.prefix / f'{i:03d}.traj', format='traj')
image.write(self.prefix / f'{i:03d}.vasp', format='vasp')
else:
index_exists = [i for i in range(self.n_max) if
os.path.isfile(self.prefix / f'{i:03d}.traj')]
for i in index_exists:
image = Trajectory(self.prefix / f'{i:03d}.traj')
image[-1].write(self.prefix / f'{i:03d}.vasp', format='vasp')
img = read(self.prefix / f'{i:03d}.vasp', format='vasp')
img.write(self.prefix / f'{i:03d}.traj', format='traj')
def attach_calculators(self, images, indexes, iteration):
for image, index in zip(images, indexes):
path_rundir = self.autoneb.iter_folder / f'iter_{iteration}' / str(index)
path_rundir.mkdir(exist_ok=True)
image.calc = JDFTx(self.path_jdftx_executable,
path_rundir=path_rundir,
commands=self.commands)
def run(self):
self.prepare()
self.autoneb.run()