protons
protons copied to clipboard
Abl benchmarks

Breakdown of an Abl NCMC benchmark.
from __future__ import print_function
from protons import app
from simtk import unit, openmm as mm
from protons.app import ConstantPHCalibration, ForceFieldProtonDrive, NCMCProtonDrive
from protons.app import MetadataReporter, TitrationReporter, NCMCReporter, SAMSReporter
import shutil
import signal
import json
from protons.app.logger import log, logging
import numpy as np
from openmmtools.integrators import LangevinIntegrator, ExternalPerturbationLangevinIntegrator
log.setLevel(logging.DEBUG)
import os
import netCDF4
import cProfile
number_R_steps = 1
class ExternalGBAOABIntegrator(ExternalPerturbationLangevinIntegrator):
"""
Implementation of the gBAOAB integrator which tracks external protocol work.
Parameters
----------
number_R: int, default: 1
The number of sequential R steps. For instance V R R O R R V has number_R = 2
temperature : simtk.unit.Quantity compatible with kelvin, default: 298*unit.kelvin
The temperature.
collision_rate : simtk.unit.Quantity compatible with 1/picoseconds, default: 1.0/unit.picoseconds
The collision rate.
timestep : simtk.unit.Quantity compatible with femtoseconds, default: 1.0*unit.femtoseconds
The integration timestep.
"""
def __init__(self, number_R_steps=1, temperature=298.0 * unit.kelvin,
collision_rate=1.0 / unit.picoseconds,
timestep=1.0 * unit.femtoseconds,
constraint_tolerance=1e-7
):
Rstep = " R" * number_R_steps
super(ExternalGBAOABIntegrator, self).__init__(splitting="V{0} O{0} V".format(Rstep),
temperature=temperature,
collision_rate=collision_rate,
timestep=timestep,
constraint_tolerance=constraint_tolerance,
measure_shadow_work=False,
measure_heat=False,
)
# Define what to do on timeout
def timeout_handler(signum, frame):
log.warn("Script is running out of time. Attempting to exit cleanly.")
raise Exception("Running out of time, shutting down!")
def serialize_state(context, outputfile):
"""
Serialize the simulation state to xml.
"""
xmls = mm.XmlSerializer
statexml = xmls.serialize(context.getState(getPositions=True, getVelocities=True))
with open(outputfile,'w') as statefile:
statefile.write(statexml)
def serialize_drive(drive, outputfile):
"""
Serialize the drive residues to xml.
"""
drivexml = drive.serialize_titration_groups()
with open(outputfile, 'wb') as drivefile:
drivefile.write(drivexml)
def serialize_sams_status(calibration, outputfile):
"""
Serialize the state of the SAMS calibration as json
"""
samsProperties = calibration.export_samsProperties()
samsjson = json.dumps(samsProperties, sort_keys=True, indent=4, separators=(',', ': '))
with open(outputfile, 'w') as samsfile:
samsfile.write(samsjson)
# Register the timeout handling
signal.signal(signal.SIGALRM, timeout_handler)
# naming the input files
input_pdb_file = "2HYY-H.pdb"
ligand_xml = "imatinib.xml"
input_state_xml = "resume-Abl-Imatinib-full-1-state.xml"
input_drive_xml = "resume-Abl-Imatinib-full-1-drive.xml"
input_sams_json = "resume-Abl-Imatinib-full-1-calibration.json"
# Naming the output files
basename = "Abl-Imatinib-full-2"
name_netcdf = '%s.nc' % basename
dcd_output_name = '%s.dcd' %basename
weights_txt_name = '%s-weights.txt' % basename
output_context_xml = "resume-%s-state.xml" % basename
output_drive_xml = "resume-%s-drive.xml" % basename
output_calibration_json = "resume-%s-calibration.json" % basename
# Simulation settings
ewaldErrorTolerance = 1.e-5
barostatInterval = 25
switching_distance = 0.85 * unit.nanometers
nonbondedCutoff = 1.0 * unit.nanometers
pressure = 1.0 * unit.atmosphere
temperature = 300.0 * unit.kelvin
# Integrator specific options
timestep = 2.0 * unit.femtosecond
constraint_tolerance = 1.e-7
collision_rate = 1.0 / unit.picosecond
# default of 20 ps of MD before starting the main loop
num_thermalization_steps = 100
steps_between_updates = 100
# Driver update settings
# Number of attempts per protein update
nattempts_protein = 1
ncmc_steps_per_trial = 10000 # 20 ps / 20 fs
prop_steps_per_trial = 1
total_iterations = 1000
modulo_ligand_update = 1 # Update ligand every n iterations
modulo_protein_update = 1 # Update protein every n iterations
# Minimization
pre_run_minimization_tolerance = 1e-5 * unit.kilojoule / unit.mole
minimization_max_iterations = 0
# SAMS settings
beta_burnin = 0.5
flatness_criterion = 0.2
# Loads standard residues so that the topology of the PDBx file will be interpreted correctly
pdb_object = app.PDBFile(input_pdb_file)
forcefield = app.ForceField('amber10-constph.xml', 'gaff.xml', ligand_xml, 'tip3p.xml', 'ions_tip3p.xml')
# System Configuration
nonbondedMethod = app.PME
constraints = app.HBonds
rigidWater = True
# Script specific settings
script_timeout = 428400 # 119 hours
# Simulation Options
platform = mm.Platform.getPlatformByName('CUDA')
properties = {'CudaPrecision': 'mixed', 'DeterministicForces': 'true', 'CudaDeviceIndex': os.environ['CUDA_VISIBLE_DEVICES']}
# Prepare the Simulation
topology = pdb_object.topology
positions = pdb_object.positions
system = forcefield.createSystem(topology, nonbondedMethod=nonbondedMethod, constraints=constraints,
rigidWater=rigidWater, ewaldErrorTolerance=ewaldErrorTolerance, nonbondedCutoff=nonbondedCutoff)
for force in system.getForces():
if isinstance(force, mm.NonbondedForce):
force.setUseSwitchingFunction(True)
force.setSwitchingDistance(switching_distance)
system.addForce(
mm.MonteCarloBarostat(
pressure,
temperature,
barostatInterval))
integrator = ExternalGBAOABIntegrator(number_R_steps=number_R_steps, temperature=temperature, collision_rate=collision_rate, timestep=timestep, constraint_tolerance=constraint_tolerance)
ncmc_propagation_integrator = ExternalGBAOABIntegrator(number_R_steps=number_R_steps, temperature=temperature, collision_rate=collision_rate, timestep=timestep, constraint_tolerance=constraint_tolerance)
compound_integrator = mm.CompoundIntegrator()
compound_integrator.addIntegrator(integrator)
compound_integrator.addIntegrator(ncmc_propagation_integrator)
compound_integrator.setCurrentIntegrator(0)
xmlserializer = mm.XmlSerializer
state = input_state_xml
driver = NCMCProtonDrive(temperature, topology, system, pressure=pressure, perturbations_per_trial=ncmc_steps_per_trial, propagations_per_step=prop_steps_per_trial)
with open(input_drive_xml, 'r') as serialized_drive:
driver.add_residues_from_serialized_xml(serialized_drive.read())
# Assumes ligand is always the last titration group
ligand_titration_group_index = len(driver.titrationGroups) - 1
pools = {'protein_titratable_residues' : list(range(ligand_titration_group_index)),'ligand' : [ligand_titration_group_index]}
driver.define_pools(pools)
# Create SAMS sampler
input_sams_json = "resume-Abl-Imatinib-full-1-calibration.json"
with open(input_sams_json, 'r') as sams_json_file:
samsProperties = json.loads(sams_json_file.read())
simulation = app.ConstantPHCalibration(topology, system, compound_integrator, driver, group_index=ligand_titration_group_index, platform=platform, platformProperties=properties, state=state, samsProperties=samsProperties)
dcdreporter = app.DCDReporter(dcd_output_name, int(steps_between_updates/30))
ncfile = netCDF4.Dataset(name_netcdf, "w")
metdatarep = MetadataReporter(ncfile, shared=True)
ncmcrep = NCMCReporter(ncfile,1,shared=True)
titrep = TitrationReporter(ncfile,1,shared=True)
samsrep = SAMSReporter(ncfile,1,shared=True)
simulation.reporters.append(dcdreporter)
simulation.update_reporters.append(metdatarep)
simulation.update_reporters.append(ncmcrep)
simulation.update_reporters.append(titrep)
simulation.calibration_reporters.append(samsrep)
# Run SAMS for a specified number of iterations
# Raises an exception if the simulation runs out of time, so that the script can be killed cleanly from within python
signal.alarm(script_timeout)
def run_loops(total_iterations, steps_between_updates, nattempts_protein):
log.setLevel(logging.INFO)
for i in range(total_iterations):
log.info("Iteration %i", i)
simulation.step(steps_between_updates)
simulation.update(nattempts_protein, pool='protein_titratable_residues')
simulation.update(1, pool='ligand')
simulation.adapt()
try:
cProfile.run('run_loops(2, steps_between_updates, nattempts_protein)', filename="Abl_2long.pstat")
# Reset timer
signal.alarm(0)
finally:
# export the context
serialize_state(simulation.context,output_context_xml)
# export the driver
serialize_drive(simulation.drive, output_drive_xml)
# export the calibration status
serialize_sams_status(simulation, output_calibration_json)
ncfile.close()
# End of script