#!venv/bin/env python3
"""
slipper_fitter/CobblersWorkshop.py
This module contains the SlipperFitter class.
"""
from typing import (List, Dict, Tuple, Union, Optional)
from fragmenstein import Laboratory, Wictor, Igor
import pandas as pd
from fragmenstein.laboratory.validator import place_input_validator
import os, logging
import time
from syndirella.slipper import intra_geometry, flatness
import syndirella.fairy as fairy
import datetime
from syndirella.error import *
import json
import syndirella.slipper._placement_data as placement_data
[docs]
class SlipperFitter:
"""
This class is instantiated to place all the products in the template using hits_path
"""
def __init__(self,
template_path: str,
hits_path: str,
hits_names: List[str],
output_dir: str,
id: str | None = None,
route_uuid: str | None = None,
scaffold_placements: Dict[Chem.Mol, str | None] = None):
[docs]
self.template_path: str = template_path
[docs]
self.hits_path: str = hits_path
[docs]
self.hits_names: List[str] = hits_names
[docs]
self.output_dir: str = output_dir
[docs]
self.id: str | None = id
[docs]
self.route_uuid: str | None = route_uuid
[docs]
self.scaffold_placements: Dict[Chem.Mol, str | None] = scaffold_placements
[docs]
self.atom_diff_min: int = 0
[docs]
self.atom_diff_max: int = 10
[docs]
self.final_products_csv_path: str | None = None
[docs]
self.final_products_pkl_path: str | None = None
[docs]
self.batch_num: int | None = None
[docs]
self.final_products: pd.DataFrame | None = None
[docs]
self.placements: pd.DataFrame | None = None
[docs]
self.merged_placements: pd.DataFrame | None = None
[docs]
self.output_path = None
# Placements variables set
[docs]
self.timeout: int = 240
[docs]
self.logger = logging.getLogger(f"{__name__}")
# variables for output
[docs]
self.num_placed: int | None = None
[docs]
self.num_successful: int | None = None
[docs]
def fit_products(self):
"""
Main entry to the SlipperFitter class. This function is used to fit the products to the final library.
"""
self.logger.info('Fitting products, warning: it is expected that the scaffold compound has already passed '
'minimisation and intramolecular checks.')
input_df: pd.DataFrame = self.prep_products()
placements = self.place_products(input_df)
self.placements = self.check_intra_geometry(placements)
self.format_placements()
self.merged_placements = self.merge_placements()
return self.merged_placements
[docs]
def check_scaffold(self,
scaffold: Chem.Mol,
scaffold_name: str,
scaffold_place_num: int) -> str | None:
"""
Checks if the scaffold can be minimised (no specific stereoisomer) and passes intermolecular checks.
If it cannot be minimised after 3 attempts, returns False.
"""
input_df: pd.DataFrame = self._prep_scaffold_input_df(scaffold=scaffold, scaffold_name=scaffold_name)
id = fairy.generate_inchi_ID(Chem.MolToSmiles(scaffold, isomericSmiles=False))
output_path: str = os.path.join(self.output_dir, f'{id}-scaffold-check')
lab: Laboratory = self.setup_Fragmenstein(output_path)
for attempt in range(1, scaffold_place_num + 1):
scaffold_placed: Chem.Mol = self._place_scaffold(lab, input_df) # None if not minimised
if scaffold_placed is not None:
paths = [os.path.join(output_path, 'output', scaffold_name, f'{scaffold_name}.minimised.mol'),
os.path.join(output_path, scaffold_name, f'{scaffold_name}.minimised.mol')] # could be two output paths...
path_exists = [os.path.exists(path) for path in paths]
if not any(path_exists):
self.logger.info(f'Scaffold could not be minimised. Attempt {attempt} of {scaffold_place_num}.')
continue
geometries: Dict = intra_geometry.check_geometry(scaffold_placed,
threshold_clash=0.4) # increasing threshold for internal clash
flat_results: Dict = flatness.check_flatness(scaffold_placed)
if self._check_intra_geom_flatness_results(geometries=geometries, flat_results=flat_results):
self.logger.info(f'Scaffold minimised and passed intramolecular checks.')
return paths[0] if path_exists[0] else paths[1]
else:
self.logger.info(f'Scaffold could not pass intramolecular checks. Attempt {attempt} of {scaffold_place_num}.')
else:
self.logger.info(f'Scaffold could not be minimised. Attempt {attempt} of {scaffold_place_num}.')
return None
[docs]
def _check_intra_geom_flatness_results(self,
geometries: Dict,
flat_results: Dict) -> bool:
"""
This function checks the intramolecular geometry and flatness results and returns True if all are True.
"""
if not geometries['results']['bond_lengths_within_bounds']:
#self.logger.info('Mol could not pass bond length checks.')
return False
if not geometries['results']['bond_angles_within_bounds']:
#self.logger.info('Mol could not pass bond angle checks.')
return False
if not geometries['results']['no_internal_clash']:
#self.logger.info('Mol could not pass internal clash checks.')
return False
if not flat_results['results']['flatness_passes']:
#self.logger.info('Mol could not pass flatness checks.')
return False
return True
[docs]
def _get_scaffold(self, input_df: pd.DataFrame) -> Chem.Mol:
"""
Get scaffold compound as mol object from the input_df.
"""
# get flat scaffold smiles
scaffold_smiles: str = input_df.loc['scaffold' in input_df['name'], 'smiles'].values[0]
scaffold_mol: Chem.Mol = Chem.MolFromSmiles(scaffold_smiles)
return scaffold_mol
[docs]
def prep_products(self) -> pd.DataFrame:
"""
This function is used to prepare the inputs for Fragmenstein.
"""
# input_df is a copy of final_products but removing duplicates of names and other metadata from synthesizer step
input_df: pd.DataFrame = self._prepare_input_df()
input_df = self.add_hits(input_df)
# add typing to columns
input_df = input_df.astype({'name': str, 'smiles': str})
# Check if there are any duplicates in the input_df
if input_df.duplicated(subset='name').any():
self.logger.critical('There are duplicates of names in the scaffold dataframe to place.')
raise PlacementError(message='There are duplicates of names in the scaffold dataframe to place.',
inchi=self.id,
route_uuid=self.route_uuid)
return input_df
[docs]
def _print_diff(self,
orig_df: pd.DataFrame,
input_df: pd.DataFrame,
verb: str = None):
"""
This function is used to print the difference between the original number of analogues and the number of
valid analogues.
"""
if len(input_df) > len(orig_df):
self.logger.error("Problem with finding unique analogues. There are more than were in the original list of "
"analogues.")
percent = round(((len(input_df) / len(orig_df)) * 100), 2)
self.logger.info(f'{verb} {len(input_df)} ({percent}%) enumerated analogue stereoisomers out of {len(orig_df)} '
f'enumerated analogue stereoisomers.')
[docs]
def add_hits(self, input_df: pd.DataFrame) -> pd.DataFrame:
"""
This function adds:
hits_path as mol objects to input_df['hits']
"""
# load hits_path either from mol or sdf
if os.path.splitext(self.hits_path)[1] == '.mol':
self.logger.info('Loading hits from a mol file.')
hits: List[Chem.Mol] = [Chem.MolFromMolFile(self.hits_path.strip())] # single hit
else:
with Chem.SDMolSupplier(self.hits_path.strip()) as sd:
hits: List[Chem.Mol] = list(sd) # many hits
# only get hits that exactly match the hit_name in the hits_names
hits = [hit for hit in hits for hit_name in self.hits_names if hit.GetProp('_Name') == hit_name]
input_df['hits'] = input_df.apply(lambda row: hits, axis=1)
return input_df
[docs]
def _place_scaffold(self,
lab: Laboratory,
input_df: pd.DataFrame) -> Chem.Mol:
"""
Places the scaffold compound, returns the mol object of the scaffold compound if successful else None.
"""
placements: pd.DataFrame = lab.place(place_input_validator(input_df), n_cores=self.n_cores,
timeout=self.timeout)
try:
scaffold: Chem.Mol = placements.at[0, 'minimized_mol']
except KeyError:
self.logger.critical('Scaffold could not be minimised.')
raise ScaffoldPlacementError(message='Scaffold could not be minimised.',
inchi=self.id,
route_uuid=self.route_uuid)
return scaffold
[docs]
def place_products(self, input_df: pd.DataFrame) -> pd.DataFrame:
"""
This function places products using Fragmenstein.
"""
start_time = time.time() # Start timing
# set up Wictor
self.output_path: str = os.path.join(self.output_dir, 'output')
lab: Laboratory = self.setup_Fragmenstein(self.output_path)
placements: pd.DataFrame = lab.place(place_input_validator(input_df),
n_cores=self.n_cores,
timeout=self.timeout)
end_time = time.time() # End timing
elapsed_time = end_time - start_time # Calculate elapsed time
self.logger.info(f"Placing {len(input_df)} run time: {datetime.timedelta(seconds=elapsed_time)}")
self.num_placed = len(input_df)
return placements
[docs]
def check_intra_geometry(self,
placements: pd.DataFrame) -> pd.DataFrame:
"""
Checks the intramolecular geometry and flatness of double bonds and aromatic rings
of each mol object using PoseBuster's implemented checks.
Checks:
Bond lengths
The bond lengths in the input molecule are within 0.75 of the lower and 1.25 of the upper bounds determined by distance geometry
Bond angles
The angles in the input molecule are within 0.75 of the lower and 1.25 of the upper bounds determined by distance geometry
Planar aromatic rings
All atoms in aromatic rings with 5 or 6 members are within 0.25 Å of the closest shared plane
Planar double bonds
The two carbons of aliphatic carbon–carbon double bonds and their four neighbours are within 0.25 Å of the closest shared plane
Internal steric clash
The interatomic distance between pairs of non-covalently bound atoms is above 0.7 of the lower bound determined by distance geometry
"""
# get list of mol objects
mols: List[Chem.Mol] = placements['minimized_mol'].tolist()
if len(mols) != len(placements):
self.logger.warning("There are missing mol objects in the placements.")
intra_geometry_results: List[bool] = []
flatness_results: List[bool] = []
for mol in mols:
if mol is None:
intra_geometry_results.append(None)
flatness_results.append(None)
else:
geometries: Dict = intra_geometry.check_geometry(mol, threshold_clash=0.4) # increasing threshold for internal clash
flat_results: Dict = flatness.check_flatness(mol)
intra_geometry_results.append(self._check_intra_geom_flatness_results(geometries=geometries, flat_results=flat_results))
# get list of pass output from intra_geometry for each mol
placements['intra_geometry_pass'] = intra_geometry_results
return placements
[docs]
def setup_Fragmenstein(self,
output_path: str) -> Laboratory:
"""
This function sets up Fragmenstein to run.
"""
# Using Wictor to place just by RDKit minimisation
# make output directory
os.makedirs(output_path, exist_ok=True)
Wictor.work_path = output_path
os.chdir(output_path) # this does the trick
Wictor.monster_throw_on_discard = True # stop this merger if a fragment cannot be used.
Wictor.monster_joining_cutoff = 5 # Å
Wictor.quick_reanimation = False # for the impatient
Wictor.error_to_catch = Exception # stop the whole laboratory otherwise
Wictor.enable_stdout(logging.CRITICAL)
# Wictor.enable_logfile(os.path.join(self.output_path, f'fragmenstein.log'), logging.ERROR)
Laboratory.Victor = Wictor
Igor.init_pyrosetta()
with open(self.template_path) as fh:
pdbblock: str = fh.read()
lab = Laboratory(pdbblock=pdbblock,
covalent_resi=None,
run_plip=False)
return lab
[docs]
def fix_intxns(self):
intxn_names: List[tuple] = [c for c in self.placements.columns if isinstance(c, tuple)]
for intxn_name in intxn_names:
self.placements[intxn_name] = self.placements[intxn_name].fillna(0).astype(int)
[docs]
def get_scaffold_check_values(self, scaffold_path: str) -> Tuple[float, float, float, float]:
"""
This function gets the ∆∆G, comRMSD, ∆G_bound and ∆G_unbound values from the placement done by scaffold check.
"""
json_file_path = str.replace(scaffold_path, '.mol', '.json')
if not os.path.exists(json_file_path):
self.logger.error(f'Could not find json file for scaffold placement in {scaffold_path}.')
raise PlacementError(message=f'Could not find json file for scaffold placement in {scaffold_path}.',
inchi=self.id,
route_uuid=self.route_uuid)
with open(json_file_path, 'r') as file:
data = json.load(file)
comRMSD = data['mRMSD']
ddG = placement_data.get_delta_delta_G(data)
bound, unbound = placement_data.get_bound_unbound(data)
return ddG, comRMSD, bound, unbound
[docs]
def add_paths_to_placements(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Adds the absolutes paths to the .mol files.
"""
if self.output_path is None:
self.logger.error(f'Output path not set for placements of {self.id}.')
raise PlacementError(message=f'Output path not set for placements of {self.id}.',
inchi=self.id,
route_uuid=self.route_uuid)
df['paths_to_mols'] = df.apply(lambda x: [os.path.join(self.output_path, x['name'], f'{x["name"]}.minimised.mol'),
os.path.join(self.output_path, 'output', x['name'], f'{x["name"]}.minimised.mol')], axis=1)
# Add only the first path that exists, or None if neither path exists
df['path_to_mol'] = df['paths_to_mols'].apply(
lambda paths: next((path for path in paths if os.path.exists(path)), None))
df.drop(columns=['paths_to_mols'], inplace=True)
return df
[docs]
def merge_placements(self) -> pd.DataFrame:
"""
This function merges the fragmenstein output with products csv.
"""
# Define the columns to drop
columns_to_drop = ['smiles', 'binary_hits', 'mode', 'disregarded', 'unmin_binary', 'min_binary',
'hit_binaries', 'unminimized_mol', 'minimized_mol', 'hit_mols', 'hit_names']
# Filter out the columns that do not exist in merged_placements
columns_to_drop = [col for col in columns_to_drop if col in self.placements.columns]
# Drop the columns if they exist
self.placements = self.placements.drop(columns=columns_to_drop)
# merge on name
merged_placements = pd.merge(self.final_products, self.placements, on='name', how='left')
return merged_placements
[docs]
def save_placements(self, id: str, route_uuid: str):
"""
This function saves the placements as a .pkl.gz and .csv file.
"""
self.placements.to_pickle(os.path.join(self.output_dir,
f'{id}_{route_uuid}_fragmenstein_placements.pkl.gz'))
self.placements.to_csv(os.path.join(self.output_dir,
f'{id}_{route_uuid}_fragmenstein_placements.csv'))
# get final name same as final products csv
merged_placements_csv_path = self.final_products_csv_path.split('.')[0] + '_placements.csv'
merged_placements_pkl_path = self.final_products_pkl_path.split('.')[0] + '_placements.pkl.gz'
self.merged_placements.to_csv(merged_placements_csv_path)
self.merged_placements.to_pickle(merged_placements_pkl_path)