Source code for adaptivemastermsm.launcher.launcher

"""
This file is part of the AdaptiveMasterMSM package.

"""
import sys, os
import subprocess
import shlex
import multiprocessing as mp

# AdaptiveMasterMSM
from adaptivemastermsm.system import system
from ..launcher import launcher_lib

[docs]class Launcher(object): """ Call GROMACS and process output to pass it to class analyzer """ def __init__(self, simsys): """ Read input from system, run MD, and process output for the class analyzer Parameters ---------- simsys : object An instance of the System class. """ Launcher.system = simsys
[docs] def gen_tpr(self, mdp=None, tpr='data/out.tpr', nsteps=250000):#, sim_time=None,chk=None): """ Function for generating tpr files Parameters ---------- mdp : str The parameter input file tpr : str The Gromacs run input file """ #launcher_lib.checkfile(tpr) #if chk is not None: # cmd = "gmx grompp -c %s -p %s -f %s -t %s -o %s -maxwarn 1"\ # %(gro, top, mdp, chk, tpr) #else: filemdp = mdp emstep = 0.002 if mdp in ["min","nvt","npt","prod"]: if mdp is "min": txt = launcher_lib.write_mdp_min() if mdp is "nvt": txt = launcher_lib.write_mdp_nvt() if mdp is "npt": txt = launcher_lib.write_mdp_npt() if mdp is "prod": txt = launcher_lib.write_mdp_prod(emstep=emstep, nsteps=nsteps) filemdp = "data/mdp/%s.mdp" % mdp launcher_lib.checkfile(filemdp) inp = open(filemdp, "w") inp.write(txt) inp.close() cmd = "gmx grompp -c %s -p %s -f %s -o %s -r %s"\ %(self.system.gro, self.system.top, filemdp, tpr, self.system.gro) print(cmd) os.system(cmd) self.tpr = tpr
#sim_time += emstep * float(nsteps)
[docs] def gmx_run(self, out='data/out'): """ Runs MD simulations using gmx mdrun Parameters ---------- tpr : str Gromacs run input file out : str Root filename for output """ cmd = "gmx mdrun -v -s %s -deffnm %s"%(self.tpr, out) print(cmd) p = subprocess.Popen(shlex.split(cmd), \ stdout=subprocess.PIPE, \ stderr=subprocess.PIPE) output, error = p.communicate() self.output = output self.error = error self.out = out # after the run we replace the gro in the system for the output gro file self.system.gro = "%s.gro"%out
[docs] def mp_run(self, tprs, outs): """ Driver for gmx_worker """ # define multiprocessing options nproc = mp.cpu_count() if len(tprs) > nproc: sys.exit(" Please reduce number of parallel runs") #self.nt = nproc/len(tprs) pool = mp.Pool(processes=nproc) # generate multiprocessing input mpinput = [] for i, tpr in enumerate(tprs): mpinput.append([tpr, outs[i]]) #mpinput = [[tpr, out] for tpr in tprs for out in outs] # run counting using multiprocessing pool.map(self.gmx_worker, mpinput) pool.close() pool.join()
[docs] def gmx_worker(self, x): """ mp worker for running Gromacs jobs -->Shall we use 'multi' flag from Gromacs? Parameters ---------- x : list List containing input for each mp worker. Includes: path to tpr file path to inp/out file """ #print(x) tpr = x[0] out = x[1] # run simulation #if self.nt > 1: #cmd = "gmx mdrun -nt %g -v -s %s -deffnm %s"%(self.nt, tpr, out) #else: cmd = "gmx mdrun -nt 1 -v -s %s -deffnm %s"%(tpr, out) print(cmd) p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = p.communicate()
#return (out, err)