"""Structure-first PDB2PQR integration for Neurosnap."""
from __future__ import annotations
import io
import logging
from contextlib import contextmanager
from typing import Any, Dict, Optional, Sequence, Tuple
import numpy as np
from neurosnap.constants.chemistry import ATOMIC_MASSES
from neurosnap.io.pdb import save_pdb
from neurosnap.log import logger
from neurosnap.structure import Structure
from ._pdb2pqr_vendor import biomolecule as vendor_biomolecule
from ._pdb2pqr_vendor import debump as vendor_debump
from ._pdb2pqr_vendor import forcefield as vendor_forcefield
from ._pdb2pqr_vendor import hydrogens as vendor_hydrogens
from ._pdb2pqr_vendor import io as vendor_io
from ._pdb2pqr_vendor import pdb as vendor_pdb
from ._pdb2pqr_vendor.config import FORCE_FIELDS, REPAIR_LIMIT
from ._pdb2pqr_vendor.utilities import noninteger_charge
__all__ = ["PDB2PQR_FORCE_FIELDS", "assign_pqr"]
PDB2PQR_FORCE_FIELDS = tuple(forcefield.upper() for forcefield in FORCE_FIELDS)
_VENDORED_LOGGER_NAMES = (
"neurosnap.algos._pdb2pqr_vendor.aa",
"neurosnap.algos._pdb2pqr_vendor.biomolecule",
"neurosnap.algos._pdb2pqr_vendor.cells",
"neurosnap.algos._pdb2pqr_vendor.debump",
"neurosnap.algos._pdb2pqr_vendor.definitions",
"neurosnap.algos._pdb2pqr_vendor.forcefield",
"neurosnap.algos._pdb2pqr_vendor.hydrogens",
"neurosnap.algos._pdb2pqr_vendor.hydrogens.optimize",
"neurosnap.algos._pdb2pqr_vendor.hydrogens.structures",
"neurosnap.algos._pdb2pqr_vendor.pdb",
"neurosnap.algos._pdb2pqr_vendor.residue",
"neurosnap.algos._pdb2pqr_vendor.utilities",
)
[docs]
def assign_pqr(
structure: Structure,
*,
forcefield: str = "PARSE",
ffout: Optional[str] = None,
neutraln: bool = False,
neutralc: bool = False,
assign_only: bool = False,
debump: bool = True,
optimize: bool = True,
) -> Structure:
"""Assign PDB2PQR radii and partial charges to a local Structure.
The returned structure is rebuilt from the PDB2PQR-updated atom table and
includes two float annotations:
- ``partial_charge``
- ``radius``
Parameters:
structure: Input single-model structure.
forcefield: PDB2PQR forcefield name.
ffout: Optional alternate output naming scheme.
neutraln: Make the N-terminus neutral. Only supported with PARSE.
neutralc: Make the C-terminus neutral. Only supported with PARSE.
assign_only: Assign parameters without repair, debumping, or hydrogen optimization.
debump: Run the PDB2PQR debumping routines.
optimize: Optimize hydrogens when ``assign_only`` is false.
Returns:
A new :class:`Structure` carrying PDB2PQR geometry updates and annotations.
Example:
Basic structure-first PQR assignment and writing::
from neurosnap.io.pdb import parse_pdb
from neurosnap.algos.pdb2pqr import assign_pqr
from neurosnap.io.pqr import save_pqr
structure = parse_pdb(
"tests/files/1nkp_mycmax_with_hydrogens.pdb",
return_type="ensemble",
).first()
pqr_structure = assign_pqr(
structure,
forcefield="AMBER",
assign_only=True,
)
print(pqr_structure.atom_annotations.dtype.names)
print(pqr_structure.atom_annotations["partial_charge"][:5])
print(pqr_structure.atom_annotations["radius"][:5])
save_pqr(pqr_structure, "test_output.pqr", include_header=True)
Quick smoke test on a standard PDB file::
from neurosnap.io.pdb import parse_pdb
from neurosnap.algos.pdb2pqr import assign_pqr
s = parse_pdb("tests/files/1BTL.pdb", return_type="ensemble").first()
out = assign_pqr(s, forcefield="AMBER", assign_only=True)
print(
"partial_charge" in out.atom_annotations.dtype.names,
"radius" in out.atom_annotations.dtype.names,
)
Inspect any automatic remediations or charge warnings::
from neurosnap.io.pdb import parse_pdb
from neurosnap.algos.pdb2pqr import assign_pqr
from neurosnap.io.pqr import save_pqr
structure = parse_pdb(
"tests/files/1nkp_mycmax_with_hydrogens.pdb",
return_type="ensemble",
).first()
pqr_structure = assign_pqr(structure, forcefield="AMBER", assign_only=True)
print(pqr_structure.metadata.get("pdb2pqr_remediations"))
print(pqr_structure.metadata.get("pdb2pqr_charge_warning"))
print("partial_charge" in pqr_structure.atom_annotations.dtype.names)
print("radius" in pqr_structure.atom_annotations.dtype.names)
save_pqr(pqr_structure, "test_output.pqr", include_header=True)
"""
if not isinstance(structure, Structure):
raise TypeError(f"assign_pqr() expects a Structure, found {type(structure).__name__}.")
forcefield_name = _normalize_forcefield_name(forcefield)
ffout_name = None if ffout is None else _normalize_forcefield_name(ffout)
if neutraln and forcefield_name != "parse":
raise ValueError("neutraln only works with the PARSE forcefield.")
if neutralc and forcefield_name != "parse":
raise ValueError("neutralc only works with the PARSE forcefield.")
effective_debump = bool(debump) and not assign_only
effective_optimize = bool(optimize) and not assign_only
matched_atoms, header, metadata_updates = _run_vendor_pdb2pqr(
structure,
forcefield_name=forcefield_name,
ffout_name=ffout_name,
neutraln=neutraln,
neutralc=neutralc,
assign_only=assign_only,
debump_requested=bool(debump),
optimize_requested=bool(optimize),
effective_debump=effective_debump,
effective_optimize=effective_optimize,
)
updated_structure = _build_structure_from_vendor_atoms(matched_atoms, source_metadata=structure.metadata)
updated_structure.metadata["pdb2pqr_header"] = header
updated_structure.metadata.update(metadata_updates)
return updated_structure
def _normalize_forcefield_name(forcefield: str) -> str:
if not isinstance(forcefield, str) or not forcefield:
raise ValueError("forcefield must be a non-empty string.")
normalized = forcefield.strip().lower()
if normalized not in FORCE_FIELDS:
raise ValueError(f'forcefield must be one of {", ".join(PDB2PQR_FORCE_FIELDS)}.')
return normalized
def _run_vendor_pdb2pqr(
structure: Structure,
*,
forcefield_name: str,
ffout_name: Optional[str],
neutraln: bool,
neutralc: bool,
assign_only: bool,
debump_requested: bool,
optimize_requested: bool,
effective_debump: bool,
effective_optimize: bool,
) -> Tuple[Sequence[Any], str, Dict[str, Any]]:
# These metadata entries expose automatic behavior that would otherwise be
# hard for callers to see, especially when assign_only falls back to a full
# rebuild or when input hydrogens are stripped before parameter assignment.
metadata_updates: Dict[str, Any] = {"pdb2pqr_remediations": []}
working_structure = structure
if not assign_only and _structure_has_hydrogens(working_structure):
working_structure = _strip_hydrogens(working_structure)
metadata_updates["pdb2pqr_remediations"].append("stripped_input_hydrogens")
logger.info("PDB2PQR: stripped input hydrogens before rebuilding protonation and parameters.")
try:
with _vendored_logging_context():
matched_atoms, header, run_metadata = _run_vendor_pdb2pqr_once(
working_structure,
forcefield_name=forcefield_name,
ffout_name=ffout_name,
neutraln=neutraln,
neutralc=neutralc,
assign_only=assign_only,
effective_debump=effective_debump,
effective_optimize=effective_optimize,
)
except TypeError as exc:
if not assign_only or not _is_assign_only_histidine_error(exc):
raise
fallback_structure = _strip_hydrogens(structure)
metadata_updates["pdb2pqr_remediations"].extend(
["assign_only_histidine_fallback", "stripped_input_hydrogens"]
)
logger.info("PDB2PQR: input protonation was ambiguous for assign-only mode; retrying with a full hydrogen rebuild.")
with _vendored_logging_context():
matched_atoms, header, run_metadata = _run_vendor_pdb2pqr_once(
fallback_structure,
forcefield_name=forcefield_name,
ffout_name=ffout_name,
neutraln=neutraln,
neutralc=neutralc,
assign_only=False,
effective_debump=debump_requested,
effective_optimize=optimize_requested,
)
metadata_updates.update(run_metadata)
if not metadata_updates["pdb2pqr_remediations"]:
metadata_updates.pop("pdb2pqr_remediations")
return matched_atoms, header, metadata_updates
def _run_vendor_pdb2pqr_once(
structure: Structure,
*,
forcefield_name: str,
ffout_name: Optional[str],
neutraln: bool,
neutralc: bool,
assign_only: bool,
effective_debump: bool,
effective_optimize: bool,
) -> Tuple[Sequence[Any], str, Dict[str, Any]]:
definition = vendor_io.get_definitions()
pdblist = _structure_to_vendor_pdblist(structure)
biomolecule = vendor_biomolecule.Biomolecule(pdblist, definition)
logger.info("PDB2PQR: assigning %s parameters for %d residues and %d atoms.", forcefield_name.upper(), len(biomolecule.residues), len(biomolecule.atoms))
biomolecule.set_termini(neutraln=neutraln, neutralc=neutralc)
biomolecule.update_bonds()
if assign_only:
biomolecule.set_hip()
else:
if _is_repairable(biomolecule):
biomolecule.repair_heavy()
biomolecule.update_ss_bridges()
debumper = vendor_debump.Debump(biomolecule)
if effective_debump:
debumper.debump_biomolecule()
biomolecule.add_hydrogens()
if effective_debump:
debumper.debump_biomolecule()
hydrogen_handler = vendor_hydrogens.create_handler()
hydrogen_routines = vendor_hydrogens.HydrogenRoutines(debumper, hydrogen_handler)
if effective_optimize:
hydrogen_routines.set_optimizeable_hydrogens()
biomolecule.hold_residues(None)
hydrogen_routines.initialize_full_optimization()
else:
hydrogen_routines.initialize_wat_optimization()
hydrogen_routines.optimize_hydrogens()
hydrogen_routines.cleanup()
biomolecule.set_states()
active_forcefield = vendor_forcefield.Forcefield(forcefield_name, definition, None, None)
matched_atoms, missing_atoms = biomolecule.apply_force_field(active_forcefield)
total_charge = 0.0
residue_charge_warnings = []
for residue in biomolecule.residues:
charge = residue.charge
charge_error = noninteger_charge(charge)
if charge_error:
residue_charge_warnings.append((str(residue), charge_error))
total_charge += charge
if residue_charge_warnings:
sample_count = min(3, len(residue_charge_warnings))
sample_text = "; ".join(f"{residue}: {message}" for residue, message in residue_charge_warnings[:sample_count])
if len(residue_charge_warnings) > sample_count:
sample_text += f"; +{len(residue_charge_warnings) - sample_count} more"
logger.warning("PDB2PQR found %d residues with non-integral charges: %s", len(residue_charge_warnings), sample_text)
total_charge_error = noninteger_charge(total_charge)
if total_charge_error:
logger.warning("PDB2PQR returned a non-integral total charge: %s", total_charge_error)
if ffout_name is not None:
output_forcefield = active_forcefield if ffout_name == forcefield_name else vendor_forcefield.Forcefield(ffout_name, definition, None, None)
biomolecule.apply_name_scheme(output_forcefield)
reslist, net_charge = biomolecule.charge
header = vendor_io.print_pqr_header(
biomolecule.pdblist,
missing_atoms,
reslist,
net_charge,
forcefield_name,
None,
None,
ffout_name,
include_old_header=False,
)
metadata_updates = {
"pdb2pqr_forcefield": forcefield_name.upper(),
"pdb2pqr_missing_atoms": [
{
"serial": int(atom.serial),
"atom_name": str(atom.name),
"res_name": str(atom.residue.name),
"res_id": int(atom.residue.res_seq),
"chain_id": str(atom.chain_id),
}
for atom in missing_atoms
],
}
if total_charge_error:
# Preserve the warning text in metadata so callers can surface it in logs,
# UIs, or downstream provenance without parsing logger output.
metadata_updates["pdb2pqr_charge_warning"] = total_charge_error
if ffout_name is not None:
metadata_updates["pdb2pqr_ffout"] = ffout_name.upper()
return matched_atoms, header, metadata_updates
def _structure_to_vendor_pdblist(structure: Structure):
buffer = io.StringIO()
save_pdb(structure, buffer)
buffer.seek(0)
pdblist, errlist = vendor_pdb.read_pdb(buffer)
if errlist:
logger.warning("Vendored PDB2PQR parser reported non-standard record types: %s", ", ".join(sorted(set(errlist))))
return pdblist
@contextmanager
def _vendored_logging_context():
previous_levels = []
try:
for logger_name in _VENDORED_LOGGER_NAMES:
vendored_logger = logging.getLogger(logger_name)
previous_levels.append((vendored_logger, vendored_logger.level))
vendored_logger.setLevel(logging.ERROR)
yield
finally:
for vendored_logger, previous_level in previous_levels:
vendored_logger.setLevel(previous_level)
def _structure_has_hydrogens(structure: Structure) -> bool:
if len(structure) == 0:
return False
elements = np.char.upper(np.char.strip(structure.atom_annotations["element"].astype("U2")))
return bool(np.any(elements == "H"))
def _strip_hydrogens(structure: Structure) -> Structure:
if not _structure_has_hydrogens(structure):
return structure
elements = np.char.upper(np.char.strip(structure.atom_annotations["element"].astype("U2")))
keep_mask = elements != "H"
kept_indices = np.flatnonzero(keep_mask)
index_map = {int(old_index): new_index for new_index, old_index in enumerate(kept_indices)}
stripped = Structure(remove_annotations=False)
stripped.metadata = dict(structure.metadata)
stripped.atoms = structure.atoms[keep_mask].copy()
stripped.atom_annotations = structure.atom_annotations[keep_mask].copy()
bond_rows = []
for bond in structure.bonds:
atom_i = int(bond["atom_i"])
atom_j = int(bond["atom_j"])
if atom_i not in index_map or atom_j not in index_map:
continue
bond_rows.append((index_map[atom_i], index_map[atom_j], int(bond["bond_type"])))
if bond_rows:
stripped.bonds = np.array(bond_rows, dtype=structure._dtype_bond)
else:
stripped.bonds = np.zeros(0, dtype=structure._dtype_bond)
return stripped
def _is_assign_only_histidine_error(exc: Exception) -> bool:
message = str(exc)
return "Missing both HD1 and HE2 atoms" in message and "assign-only" in message
def _is_repairable(biomolecule) -> bool:
num_heavy = biomolecule.num_heavy
num_missing = biomolecule.num_missing_heavy
if num_heavy == 0:
raise ValueError("No biomolecule heavy atoms were found by the vendored PDB2PQR engine.")
if num_missing == 0:
return False
missing_fraction = float(num_missing) / float(num_heavy)
if missing_fraction > REPAIR_LIMIT:
raise ValueError(
f"This structure is missing too many heavy atoms for repair ({num_missing} of {num_heavy}, fraction={missing_fraction:g}; limit={REPAIR_LIMIT:g})."
)
return True
def _build_structure_from_vendor_atoms(vendor_atoms: Sequence[Any], *, source_metadata: Dict[str, Any]) -> Structure:
structure = Structure(remove_annotations=False)
structure.metadata = dict(source_metadata)
structure.metadata["source_format"] = "pqr"
atom_count = len(vendor_atoms)
structure.atoms = np.zeros(atom_count, dtype=structure._dtype_atoms)
structure.atom_annotations = np.zeros(atom_count, dtype=structure._dtype_atom_annotations)
partial_charges = np.zeros(atom_count, dtype=np.float32)
radii = np.zeros(atom_count, dtype=np.float32)
bond_pairs = set()
atom_to_index = {id(atom): atom_index for atom_index, atom in enumerate(vendor_atoms)}
for atom_index, atom in enumerate(vendor_atoms):
structure.atoms["x"][atom_index] = float(atom.x)
structure.atoms["y"][atom_index] = float(atom.y)
structure.atoms["z"][atom_index] = float(atom.z)
structure.atom_annotations["chain_id"][atom_index] = atom.chain_id or ""
structure.atom_annotations["res_id"][atom_index] = int(atom.res_seq)
structure.atom_annotations["ins_code"][atom_index] = atom.ins_code or ""
structure.atom_annotations["res_name"][atom_index] = atom.res_name or ""
structure.atom_annotations["hetero"][atom_index] = bool(atom.type == "HETATM")
structure.atom_annotations["atom_name"][atom_index] = atom.name or ""
structure.atom_annotations["element"][atom_index] = _infer_element(atom.element, atom.name)
structure.atom_annotations["atom_id"][atom_index] = int(atom.serial or (atom_index + 1))
structure.atom_annotations["b_factor"][atom_index] = float(atom.temp_factor) if atom.temp_factor is not None else 0.0
structure.atom_annotations["occupancy"][atom_index] = float(atom.occupancy) if atom.occupancy is not None else 1.0
structure.atom_annotations["charge"][atom_index] = _normalize_integer_charge(getattr(atom, "charge", None))
structure.atom_annotations["sym_id"][atom_index] = ""
partial_charges[atom_index] = float(atom.ffcharge) if atom.ffcharge is not None else 0.0
radii[atom_index] = float(atom.radius) if atom.radius is not None else 0.0
for bonded_atom in getattr(atom, "bonds", []):
bonded_index = atom_to_index.get(id(bonded_atom))
if bonded_index is None or bonded_index == atom_index:
continue
bond_pairs.add((min(atom_index, bonded_index), max(atom_index, bonded_index)))
if bond_pairs:
structure.bonds = np.array([(atom_i, atom_j, 1) for atom_i, atom_j in sorted(bond_pairs)], dtype=structure._dtype_bond)
else:
structure.bonds = np.zeros(0, dtype=structure._dtype_bond)
structure.add_annotation("partial_charge", np.float32, values=partial_charges)
structure.add_annotation("radius", np.float32, values=radii)
structure._remove_empty_annotations()
return structure
def _infer_element(raw_element: Any, atom_name: Any) -> str:
element = "" if raw_element is None else str(raw_element).strip().upper()
if element in ATOMIC_MASSES:
return element
atom_name_text = "" if atom_name is None else str(atom_name).strip()
if not atom_name_text:
return ""
candidate = atom_name_text[:2].strip().title()
if candidate in ATOMIC_MASSES:
return candidate.upper()
candidate = atom_name_text[:1].strip().title()
if candidate in ATOMIC_MASSES:
return candidate.upper()
return ""
def _normalize_integer_charge(value: Any) -> int:
if value is None:
return 0
if isinstance(value, (int, np.integer)):
return int(value)
text = str(value).strip()
if not text:
return 0
try:
return int(float(text))
except ValueError:
sign = -1 if text.endswith("-") else 1
digits = "".join(char for char in text if char.isdigit())
if digits:
return sign * int(digits)
return 0