#!venv/bin/env python3
"""
slipper_synthesizer/CobblersWorkshop.py
This module contains the SlipperSynthesizer class.
"""
import logging
import os
from typing import (List, Dict, Tuple)
import pandas as pd
from rdkit import DataStructs
from rdkit.Chem import rdFMCS
from rdkit.Chem.EnumerateStereoisomers import EnumerateStereoisomers, StereoEnumerationOptions
import syndirella.fairy as fairy
from syndirella.error import *
from syndirella.route.Library import Library
from syndirella.slipper.slipper_synthesizer.Labeler import Labeler
[docs]
class SlipperSynthesizer:
"""
This class is used to perform the whole process of finding products of the analogues of reactants.
Since the final elaborated products are 'slippers' in this analogy, the SlipperSynthesizer
is where these slippers are made.
This is supposed to be instantiated for each step in the route.
"""
def __init__(self,
             library: Library,
             output_dir: str,
             atom_ids_expansion: dict = None,
             additional_info: dict = None):
    """
    Set up the synthesizer for one step of a route.

    library: the Library of the current step. It is kept on the instance and its route_uuid,
        current_step, num_steps, atom_diff_min and atom_diff_max are copied onto the instance.
    output_dir: directory the product files are read from and written to.
    atom_ids_expansion: stored unchanged and handed to the Labeler in label_products().
    additional_info: stored unchanged.
    """
    # The library itself must be stored: the other methods of this class read self.library.
    self.library: Library = library
    self.route_uuid: str = library.route_uuid
    self.output_dir = output_dir
    # Filled by filter_analogues(): reactant prefix -> filtered analogue dataframe.
    self.analogues_dataframes_to_react: Dict[str, pd.DataFrame] = {}
    self.analogue_columns: List[str] = None
    self.products: pd.DataFrame = None
    self.reactant_combinations: pd.DataFrame = None
    # Only set by save_products() when the current step is the final one.
    self.final_products_pkl_path: str = None
    self.final_products_csv_path: str = None
    self.atom_ids_expansion: dict = atom_ids_expansion
    self.additional_info = additional_info
    self.current_step: int = library.current_step
    self.num_steps: int = library.num_steps
    self.logger = logging.getLogger(f"{__name__}")
    self.atom_diff_min: int = library.atom_diff_min
    self.atom_diff_max: int = library.atom_diff_max
    # variables for output
    self.num_unique_products: int = 0
    self.num_products_enumstereo: int = 0
[docs]
def get_products(self) -> pd.DataFrame:
    """
    Main entry point: return the products for this step.

    Products already saved on disk are loaded and returned. Otherwise the analogues are filtered and
    the products are built either from the single reactant or from every pairing of both reactants.
    """
    if self.check_product_pkl_exists():
        # Products were already written for this step, reuse them
        self.load_products()
    else:
        # Filter analogues and cut if too many
        self.filter_analogues()
        if len(self.analogues_dataframes_to_react) == 1:
            self.products = self.get_products_from_single_reactant()
        else:
            # Pair up all analogues, then apply the reaction to every pairing
            self.reactant_combinations = self.combine_analogues()
            self.products = self.find_products_from_reactants()
    return self.products
[docs]
def check_product_pkl_exists(self):
    """
    This function checks if a products pickle for this step already exists on disk. It only reports the
    result, load_products() does the actual loading.

    Products of intermediate steps are looked up in <output_dir>/extra, final products in <output_dir>.
    Returns True if a file was found, otherwise False.
    """
    pkl_name = (f"{self.library.id}_{self.library.route_uuid}_{self.library.reaction.reaction_name}_products_"
                f"{self.library.current_step}of{self.library.num_steps}.pkl")
    if self.library.num_steps != self.library.current_step:
        directory = os.path.join(self.output_dir, 'extra')
    else:
        directory = self.output_dir
    # save_products() writes '<name>.pkl.gz'; a plain '<name>.pkl' is still accepted as a fallback.
    # Keep this lookup order in sync with load_products().
    for candidate in (f"{pkl_name}.gz", pkl_name):
        pkl_path = os.path.join(directory, candidate)
        if os.path.exists(pkl_path):
            self.logger.info(f"Products already exist at {pkl_path}. "
                             f"Loading from file...")
            return True
    return False


def load_products(self):
    """
    This function loads the products pickle of this step into self.products.

    The gzipped file written by save_products() is preferred, a plain .pkl is used as a fallback.
    Columns whose name contains 'Unnamed' are dropped after loading.
    """
    pkl_name = (f"{self.library.id}_{self.library.route_uuid}_{self.library.reaction.reaction_name}_products_"
                f"{self.library.current_step}of{self.library.num_steps}.pkl")
    if self.library.num_steps != self.library.current_step:
        directory = os.path.join(self.output_dir, 'extra')
    else:
        directory = self.output_dir
    candidates = [os.path.join(directory, f"{pkl_name}.gz"), os.path.join(directory, pkl_name)]
    # If neither file exists fall through to the plain .pkl so read_pickle reports the missing file.
    pkl_path = next((path for path in candidates if os.path.exists(path)), candidates[-1])
    # NOTE: unpickling can execute arbitrary code; only files written by save_products() belong here.
    self.products = pd.read_pickle(pkl_path)
    # Check if any 'Unnamed' columns and remove them
    unnamed_columns = [col for col in self.products.columns if 'Unnamed' in str(col)]
    if unnamed_columns:
        self.products.drop(unnamed_columns, axis=1, inplace=True)
    self.logger.info(f"Loaded {len(self.products)} products.")
[docs]
def filter_analogues(self):
    """
    Prepare every analogue dataframe of the library for reaction.

    Each dataframe is filtered on SMARTS, ordered, and stored in self.analogues_dataframes_to_react
    under its reactant prefix. Once all of them are stored they are cut down by size.
    """
    for reactant_prefix, entry in self.library.analogues_dataframes.items():
        # each entry holds the analogue dataframe followed by its analogue column names
        frame: pd.DataFrame = entry[0]
        columns: Tuple[str, str] = entry[1]
        self.analogue_columns = list(columns)
        on_smarts = self.filter_analogues_on_smarts(frame, columns, reactant_prefix)
        self.analogues_dataframes_to_react[reactant_prefix] = self.order_analogues(on_smarts, reactant_prefix)
    # Filters analogue df by size, shortens if necessary
    self.filter_analogues_by_size()
[docs]
def order_analogues(self, df: pd.DataFrame, reactant_prefix: str) -> pd.DataFrame:
    """
    This function is used to order an analogue dataframe by the number of atoms difference to the original
    reactant and then by every column whose name contains 'num_matches', all ascending.

    A '<reactant_prefix>_num_atom_diff' column is added to df. The last character of reactant_prefix is
    read as the SMARTS index of the reactant. Returns the sorted dataframe with a fresh index.
    """
    self.logger.info(f"Ordering analogues of {reactant_prefix} before finding products...")
    # Add num_atom_diff to the original reactant matched to this SMARTS index
    smarts_index: int = int(reactant_prefix[-1])
    base_reactant = self.library.reaction.matched_smarts_index_to_reactant[smarts_index][0]
    diff_column = f'{reactant_prefix}_num_atom_diff'
    df[diff_column] = df[f"{reactant_prefix}_mol"].apply(
        lambda x: self.calc_num_atom_diff_absolute(base_reactant, x))
    # Collect the sort columns in order of preference, never listing a column twice
    matching_columns = []
    for substring in (diff_column, 'num_matches'):
        matching_columns.extend(col for col in df.columns
                                if substring in str(col) and col not in matching_columns)
    # A single ascending flag works for any number of matched columns (a fixed two-item list does not)
    return df.sort_values(by=matching_columns, ascending=True).reset_index(drop=True)
[docs]
def filter_analogues_on_smarts(self, df: pd.DataFrame, analogue_columns: Tuple[str, str], reactant_prefix: str) \
        -> pd.DataFrame:
    """
    This function is used to filter an analogue dataframe so that only rows that are true in the column of
    the original reactant are kept. Rows that are true in both analogue columns are flagged in 'flag'
    (the flag is written onto the passed dataframe).

    Raises NoReactants if no row is left, which includes being handed an empty dataframe.
    """
    self.logger.info('Filtering analogues of reactants on SMARTS...')
    num_original = len(df)
    if num_original:
        # add flag to rows with both 'r1' and 'r2' true
        df.loc[df[analogue_columns[0]] & df[
            analogue_columns[1]], 'flag'] = 'selectivity_issue_contains_reaction_atoms_of_both_reactants'
        # only keep rows with original analogue_prefix true
        orig_r_column = [col for col in analogue_columns if reactant_prefix in col][0]
        df = df[df[orig_r_column]].reset_index(drop=True)
    num_filtered = num_original - len(df)
    # nothing to take a percentage of when the input was already empty
    percent_diff = round((num_filtered / num_original) * 100, 2) if num_original else 0.0
    self.logger.info(f'Filtered {num_filtered} rows ({percent_diff}%) from {reactant_prefix} dataframe.')
    if len(df) == 0:
        self.logger.critical(f"All reactants were filtered for {reactant_prefix}. No products will be found.")
        raise NoReactants(message=f"All reactants were filtered for {reactant_prefix}. No products will be found.",
                          route_uuid=self.route_uuid,
                          mol=self.library.reaction.scaffold)
    return df
[docs]
def filter_analogues_by_size(self):
    """
    Trim the analogue dataframes so the number of reactant combinations stays manageable.

    First every dataframe is restricted to rows whose num_atom_diff lies between atom_diff_min and
    atom_diff_max. A single dataframe is then capped at 10,000 rows. With two dataframes, if the product
    of their lengths exceeds 10,000, whichever is longer than 100 rows (the square root of 10,000) is cut
    to 100 rows.
    """
    frames = self.analogues_dataframes_to_react
    # before anything cut analogues based on min and max num_atom_diff values
    for prefix, frame in frames.items():
        self.logger.info(f'{prefix}: Filtering reactants for by number of atoms difference to original reactant.'
                         f' Keeping only those with {self.atom_diff_min} <= num_atom_diff <= {self.atom_diff_max}.')
        too_small = frame[f'{prefix}_num_atom_diff'] < self.atom_diff_min
        too_large = frame[f'{prefix}_num_atom_diff'] > self.atom_diff_max
        within_range = frame[~too_small & ~too_large]
        frames[prefix] = within_range
        percent = round(((len(within_range) / len(frame)) * 100), 2)
        self.logger.info(f'Kept {len(within_range)} ({percent}%) valid products out of {len(frame)} '
                         f'reactants.')

    if len(frames) < 2:
        only_prefix = list(frames.keys())[0]
        if len(frames[only_prefix]) > 10000:
            self.logger.info(f"Too many analogues for {only_prefix}.")
            frames[only_prefix] = frames[only_prefix].head(10000)
        return

    max_allowed_size = 10000
    sizes: List[int] = [len(frame) for frame in frames.values()]
    if sizes[0] * sizes[1] <= max_allowed_size:
        return  # No need to filter

    per_reactant_cap = int(max_allowed_size ** 0.5)  # Taking the square root will give an approximation
    first_too_long = sizes[0] > per_reactant_cap
    second_too_long = sizes[1] > per_reactant_cap
    if first_too_long != second_too_long:
        # Exactly one dataframe is over the cap: cut only that one
        position = 0 if first_too_long else 1
        analogue_prefix = list(frames.keys())[position]
        self.logger.info(f"Too many analogues for {analogue_prefix}.")
        frames[analogue_prefix] = self.cut_analogues(frames[analogue_prefix], per_reactant_cap, analogue_prefix)
    else:
        # Cut both dataframes to the cap
        self.logger.info(f"Too many analogues for both reactants.")
        for prefix in frames.keys():
            frames[prefix] = self.cut_analogues(frames[prefix], per_reactant_cap, prefix)
[docs]
def cut_analogues(self, df: pd.DataFrame, max_length_each: int, analogue_prefix: int) -> pd.DataFrame:
    """
    Shorten an analogue dataframe to its first max_length_each rows.

    analogue_prefix is only used in the log message; filter_analogues_by_size passes the reactant key here.
    """
    num_cut = len(df) - max_length_each
    self.logger.info(f"Cutting {num_cut} analogues from "
                     f"{analogue_prefix} dataframe.")
    shortened = df.head(max_length_each)
    return shortened
[docs]
def cluster_analogues(self, df: pd.DataFrame, max_length_each: int, analogue_prefix: int) -> pd.DataFrame:
    """
    Reduce an analogue dataframe to max_length_each rows by clustering and then sampling.

    The dataframe is passed through self.cluster_analogues_on_fingerprint and the result through
    self.sample_analogues together with max_length_each.
    """
    num_over = len(df) - max_length_each
    self.logger.info(f"K-means clustering and sampling {num_over} analogues from "
                     f"r{analogue_prefix} dataframe.")
    clustered = self.cluster_analogues_on_fingerprint(df)
    return self.sample_analogues(clustered, max_length_each)
# Combine 'flag' columns
[docs]
def combine_flags(self, row) -> Tuple[str] | None:
    """
    Merge the 'flag_x' and 'flag_y' values of a row into one tuple (hashable), skipping missing values.
    Returns None when neither flag is set.
    """
    present = tuple(row[column] for column in ('flag_x', 'flag_y') if pd.notna(row[column]))
    return present or None
[docs]
def combine_analogues(self):
    """
    This function is used to combine the analogues of reactants into 1 dataframe that the products are found from.

    With fewer than two analogue dataframes the only dataframe is returned as is. Otherwise every row of
    'r1' is paired with every row of 'r2'. The analogue columns are dropped (in place) from both dataframes
    before merging. The resulting 'flag' column holds a tuple of flags or None for each pairing.
    """
    if len(self.analogues_dataframes_to_react) < 2:
        return list(self.analogues_dataframes_to_react.values())[0]
    # Get all the analogues dataframes
    r1 = self.analogues_dataframes_to_react['r1']
    r2 = self.analogues_dataframes_to_react['r2']
    combinations = pd.MultiIndex.from_product([r1.index, r2.index], names=['r1', 'r2']).to_frame(index=False)
    # before merging drop analogue_columns
    r1.drop(self.analogue_columns, axis=1, inplace=True)
    r2.drop(self.analogue_columns, axis=1, inplace=True)
    # merge indicies with original dataframes
    combinations = combinations.merge(r1, left_on='r1', right_index=True)
    combinations = combinations.merge(r2, left_on='r2', right_index=True)
    # drop extra columns
    combinations.drop(['r1', 'r2'], axis=1, inplace=True)
    combinations.reset_index(drop=True, inplace=True)

    def as_flag_tuple(value):
        # same shape combine_flags produces: a tuple of flags, or None when there is no flag
        if isinstance(value, (list, tuple)):
            return tuple(value) or None
        return (value,) if pd.notna(value) else None

    # add flag
    if 'flag_x' in combinations.columns and 'flag_y' in combinations.columns:
        # both reactants carried a 'flag' column, so the merge suffixed them
        combinations['flag'] = combinations.apply(self.combine_flags, axis=1)
    elif 'flag' in combinations.columns:
        # only one reactant carried a 'flag' column: the merge adds no suffix, so there is no
        # 'flag_x'/'flag_y' to read from and the column is normalised where it is
        combinations['flag'] = combinations['flag'].apply(as_flag_tuple)
    # Drop intermediate 'flag_x' and 'flag_y' columns if they exist
    combinations.drop(['flag_x', 'flag_y'], axis=1, inplace=True, errors='ignore')
    # make sure there are no repeats
    combinations.drop_duplicates(inplace=True)
    return combinations
[docs]
def find_products_from_reactants(self) -> pd.DataFrame:
    """
    Build the products of all reactant combinations.

    The reaction is applied to every row of self.reactant_combinations, the 'combined' results are split
    into 'smiles' and 'num_atom_diff', and the products are filtered, given metadata and passed to the
    stereoisomer enumeration. A ValueError while splitting is re-raised as ProductFormationError.
    """
    reacted: pd.DataFrame = self.reactant_combinations.apply(self.apply_reaction, axis=1)
    try:
        # one row per product, a combination can give several
        reacted = reacted.explode('combined').reset_index(drop=True)
        split_columns = pd.DataFrame(reacted['combined'].tolist(), columns=['smiles', 'num_atom_diff'],
                                     index=reacted.index)
        reacted[['smiles', 'num_atom_diff']] = split_columns
        reacted.drop('combined', axis=1, inplace=True)
    except ValueError as e:
        reason = e.args[0]
        self.logger.critical(reason)
        raise ProductFormationError(message=reason,
                                    mol=self.library.reaction.scaffold,
                                    route_uuid=self.route_uuid)
    filtered = self.filter_products(reacted)
    with_metadata = self.add_metadata(filtered)
    all_products = self.enumerate_stereoisomers(with_metadata)
    if self.num_steps == self.current_step:
        self.logger.info(f"Found {len(set(list(all_products['name'])))} unique products.")
    return all_products
[docs]
def get_products_from_single_reactant(self) -> pd.DataFrame:
    """
    Build the products of a reaction with a single reactant (like deprotections).

    The reaction is applied to every row of the only analogue dataframe, the 'combined' results are split
    into 'smiles' and 'num_atom_diff', and the products are filtered, given metadata and passed to the
    stereoisomer enumeration. A ValueError while splitting is re-raised as ProductFormationError.
    """
    only_reactant = list(self.analogues_dataframes_to_react.values())[0]
    reacted: pd.DataFrame = only_reactant.apply(self.apply_reaction_single, axis=1)
    try:
        # one row per product, then split the 'combined' column
        reacted = reacted.explode('combined').reset_index(drop=True)
        split_columns = pd.DataFrame(reacted['combined'].tolist(), columns=['smiles', 'num_atom_diff'],
                                     index=reacted.index)
        reacted[['smiles', 'num_atom_diff']] = split_columns
        reacted.drop('combined', axis=1, inplace=True)
    except ValueError as e:
        reason = e.args[0]
        self.logger.critical(reason)
        raise ProductFormationError(message=reason,
                                    mol=self.library.reaction.scaffold,
                                    route_uuid=self.route_uuid)
    filtered = self.filter_products(reacted)
    with_metadata = self.add_metadata(filtered)
    all_products = self.enumerate_stereoisomers(with_metadata)
    self.logger.info(f"Found {len(set(list(all_products['name'])))} unique products.")
    return all_products
[docs]
def apply_reaction_single(self, row) -> pd.Series:
    """
    For mono-molecular reactions:
    This function applies the reaction to one row of the reactant dataframe. Can return multiple products.

    The first molecule of every product set is kept if it can be sanitized. row['combined'] is always set
    to a list of (smiles, num_atom_diff) tuples and is [(None, None)] when there is no usable product, so
    the row can be dropped later on. num_atom_diff is only calculated on the final step. row['flag']
    becomes the list of flags or None; 'one_of_multiple_products' is added when more than one product
    is kept.
    """
    # calculate difference only if final step
    calc_difference: bool = self.library.current_step == self.library.num_steps
    reaction = self.library.reaction.reaction_pattern
    scaffold = self.library.reaction.scaffold
    r1 = row['r1_mol']
    products = reaction.RunReactants((r1,))
    # copy so the flags stored in the input are never mutated; tuples are accepted like in apply_reaction
    flags = list(row['flag']) if isinstance(row['flag'], (list, tuple)) else []
    if len(products) == 0:
        self.logger.info("No products found.")
    combined = []
    for product_set in products:
        product = product_set[0]
        if not self.can_be_sanitized(product):  # only keep products that can be sanitized
            continue
        num_atom_diff = self.calc_num_atom_diff_absolute(scaffold, product) if calc_difference else None
        combined.append((Chem.MolToSmiles(product), num_atom_diff))
    if len(combined) > 1 and 'one_of_multiple_products' not in flags:
        flags.append('one_of_multiple_products')
    # never leave 'combined' unset or empty, that breaks the split into smiles/num_atom_diff later
    row['combined'] = combined if combined else [(None, None)]
    row['flag'] = flags if flags else None
    return row
[docs]
def apply_reaction(self, row) -> pd.Series:
    """
    For bimolecular reactions:
    This function applies the reaction to one row of the reactant combinations dataframe. Checks to return
    only products that are sanitized.

    The first molecule of every product set is kept if it can be sanitized. row['combined'] is always set
    to a list of (smiles, num_atom_diff) tuples and is [(None, None)] when there is no usable product, so
    the row can be dropped later on. num_atom_diff is only calculated on the final step. row['flag']
    becomes the list of flags or None; 'one_of_multiple_products' is added when more than one product
    is kept (selectivity issue).
    """
    # calculate difference only if final step
    calc_difference: bool = self.library.current_step == self.library.num_steps
    reaction = self.library.reaction.reaction_pattern
    scaffold = self.library.reaction.scaffold
    r1 = row['r1_mol']
    r2 = row['r2_mol']
    flags = list(row['flag']) if isinstance(row['flag'], tuple) else []  # turn into list to append to
    products = reaction.RunReactants((r1, r2))
    combined = []
    for product_set in products:
        product = product_set[0]
        if not self.can_be_sanitized(product):  # only keep products that can be sanitized
            continue
        num_atom_diff = self.calc_num_atom_diff_absolute(scaffold, product) if calc_difference else None
        combined.append((Chem.MolToSmiles(product), num_atom_diff))
    if len(combined) > 1 and 'one_of_multiple_products' not in flags:
        flags.append('one_of_multiple_products')
    # never leave 'combined' unset or empty, that breaks the split into smiles/num_atom_diff later
    row['combined'] = combined if combined else [(None, None)]
    # Set flag column to list of flags or None if empty
    row['flag'] = flags if flags else None
    return row
[docs]
def can_be_sanitized(self, mol: Chem.Mol) -> bool:
    """
    Return True if Chem.SanitizeMol succeeds on mol, otherwise False.

    Anything that is not a Chem.Mol (or a subclass of it) is logged as an error and reported as not
    sanitizable. SanitizeMol works on the molecule that is passed in.
    """
    if not isinstance(mol, Chem.Mol):
        self.logger.error(f"Expected a Chem.Mol object, got {type(mol)}.")  # Make sure it's a Chem.Mol object
        return False
    try:
        Chem.SanitizeMol(mol)
    except Exception:
        # any sanitization failure means the product is unusable; interpreter exits are not swallowed
        return False
    return True
[docs]
def calc_num_atom_diff_mcs(self, base: Chem.Mol, product: Chem.Mol) -> int:
    """
    Number of atoms of product that are not part of the maximum common substructure (MCS) of base and
    product, i.e. the atom count of product minus the atom count of the MCS.
    """
    mcs_result = rdFMCS.FindMCS([base, product])
    common_core = Chem.MolFromSmarts(mcs_result.smartsString)
    core_size = common_core.GetNumAtoms()
    product_size = product.GetNumAtoms()
    return product_size - core_size
[docs]
def calc_num_atom_diff_absolute(self, base: Chem.Mol, product: Chem.Mol) -> int:
    """
    Difference in number of atoms between product and base (product minus base). Negative when the
    product has fewer atoms than the base.
    """
    product_size = product.GetNumAtoms()
    base_size = base.GetNumAtoms()
    return product_size - base_size
[docs]
def filter_products(self, products: pd.DataFrame) -> pd.DataFrame:
    """
    This function is used to filter the products dataframe to remove any rows without a SMILES. Also
    removes duplicates and, on the final step only, products whose num_atom_diff is outside
    atom_diff_min..atom_diff_max. The result is sorted by num_atom_diff and gets a fresh index.

    The passed dataframe is not modified. Raises ProductFormationError if no product is left, which
    includes the case that no reaction gave a product at all.
    """
    # work on a copy so neither the caller's frame nor a slice of it is written to
    products = products.dropna(subset=['smiles'], axis=0, how='any').copy()
    # Convert 'flag' column to tuple to be hashable
    products['flag'] = products['flag'].apply(lambda x: tuple(x) if isinstance(x, list) else x)
    filt_products = products.drop_duplicates(ignore_index=True)
    if self.library.current_step == self.library.num_steps:  # only filter if final step
        # drop products outside the allowed number of atoms difference
        self.logger.info(f'Cutting products with number of atoms difference greater than {self.atom_diff_max} and '
                         f'below {self.atom_diff_min} to scaffold.')
        filt_products = filt_products[(filt_products['num_atom_diff'] >= self.atom_diff_min) &
                                      (filt_products['num_atom_diff'] <= self.atom_diff_max)]
    if len(products) > 0:
        # with nothing to compare against there is no percentage to report
        self._print_diff(orig_df=products, input_df=filt_products, verb='Kept')
    if len(filt_products) == 0:
        self.logger.critical("No products found.")
        raise ProductFormationError(message=f"All products filtered for step {self.library.current_step}.",
                                    route_uuid=self.route_uuid,
                                    mol=self.library.reaction.scaffold)
    # reorder by num_atom_diff if calculated
    if 'num_atom_diff' in filt_products.columns:
        filt_products = filt_products.sort_values(by=['num_atom_diff'])
    return filt_products.reset_index(drop=True)
[docs]
def _print_diff(self,
                orig_df: pd.DataFrame,
                input_df: pd.DataFrame,
                verb: str = None):
    """
    This function is used to log how many rows of orig_df are still present in input_df, as a count and
    as a percentage. verb is the first word of the log message.

    An error is logged if input_df has more rows than orig_df, since filtering can never add rows.
    """
    if len(input_df) > len(orig_df):
        self.logger.error("Problem with finding unique analogues. There are more than were in the original list of "
                          "analogues.")
    # an empty original has no meaningful percentage, report 0.0 instead of dividing by zero
    percent = round(((len(input_df) / len(orig_df)) * 100), 2) if len(orig_df) else 0.0
    self.logger.info(f'{verb} {len(input_df)} ({percent}%) valid products out of {len(orig_df)} '
                     f'products.')
[docs]
def calculate_fingerprints(self, products):
    """
    Add a 'mol' column (molecule parsed from 'smiles') and an 'fp' column (morgan fingerprint of 'mol')
    to products. Rows with a missing SMILES or molecule get None. Returns the same dataframe.
    """

    def to_mol(smiles):
        return Chem.MolFromSmiles(smiles) if pd.notnull(smiles) else None

    def to_fingerprint(mol):
        return fairy.get_morgan_fingerprint(mol) if mol is not None else None

    products['mol'] = products['smiles'].apply(to_mol)
    products['fp'] = products['mol'].apply(to_fingerprint)
    return products
[docs]
def find_similarity_groups(self, products: pd.DataFrame) -> (pd.DataFrame, int):
    """
    Group products whose fingerprints have a similarity of exactly 1 and write the group into 'group_id'.

    Every pair of fingerprints in the 'fp' column is compared, so this is quadratic in the number of
    products. Products matching the fingerprint of the reaction scaffold share one group, whose id is
    returned as second value (-1 if no product matches). Products without any match get their own group.
    Rows without a fingerprint are never compared.
    """
    if 'group_id' not in products.columns:
        products['group_id'] = -1  # -1 marks rows that have no group yet
    fingerprints = products['fp'].tolist()
    total = len(fingerprints)
    # Fingerprint of the library's reaction scaffold
    scaffold_fp = fairy.get_morgan_fingerprint(self.library.reaction.scaffold)
    assigned = {}  # row position -> group id
    used_ids = set(products['group_id'])
    next_id = max(used_ids) + 1 if used_ids else 0
    base_group_id = -1  # stays -1 until a product matches the scaffold

    def claim_group_id():
        # hand out the next id that is not taken yet
        nonlocal next_id
        while next_id in used_ids:
            next_id += 1
        claimed = next_id
        used_ids.add(claimed)
        next_id += 1
        return claimed

    for first, first_fp in enumerate(fingerprints):
        if first_fp is None:
            continue
        # Products identical to the scaffold all go into the base group
        if DataStructs.FingerprintSimilarity(first_fp, scaffold_fp) == 1:
            if base_group_id == -1:
                base_group_id = claim_group_id()
            assigned[first] = base_group_id
        for second in range(first + 1, total):
            second_fp = fingerprints[second]
            if second_fp is None:
                continue
            if DataStructs.FingerprintSimilarity(first_fp, second_fp) != 1:
                continue
            first_known = first in assigned
            second_known = second in assigned
            if not first_known and not second_known:
                assigned[first] = assigned[second] = claim_group_id()
            elif first_known and not second_known:
                assigned[second] = assigned[first]
            elif second_known and not first_known:
                assigned[first] = assigned[second]
    # Write the groups found above
    for position, group in assigned.items():
        products.at[position, 'group_id'] = group
    # Every row still without a group gets a new unique group id
    for position in products[products['group_id'] == -1].index:
        products.at[position, 'group_id'] = claim_group_id()
    return products, base_group_id
[docs]
def assign_names_based_on_groups(self, products: pd.DataFrame, library_id: str, base_group_id: int) -> pd.DataFrame:
    """
    Give all products of a group the same name.

    The group base_group_id is named '<library_id>-<route_uuid>-scaffold'. Every other group takes the
    name of its first member, or '<library_id>-<route_uuid>-<group>' if that member has no name yet.
    Rows with group -1 are left untouched.
    """
    scaffold_name = f"{library_id}-{self.route_uuid}-scaffold"
    for group in products['group_id'].unique():
        if group == -1:  # molecules without a group keep their name
            continue
        in_group = products['group_id'] == group
        if group == base_group_id:
            products.loc[in_group, 'name'] = scaffold_name
            continue
        members = products[in_group]
        if members.empty:
            continue
        name_of_first = members.iloc[0]['name']
        if pd.notnull(name_of_first):
            group_name = name_of_first
        else:
            group_name = f"{library_id}-{self.route_uuid}-{int(group)}"
        products.loc[products['group_id'] == group, 'name'] = group_name
    return products
[docs]
def enumerate_stereoisomers(self, products: pd.DataFrame) -> pd.DataFrame:
    """
    This function is used to enumerate the stereoisomers of the products.

    On internal steps products is returned unchanged. On the final step every product is replaced by one
    row per stereoisomer: 'smiles' holds the isomer, 'stereoisomer' a letter label (A, B, ..., Z, AA,
    AB, ...) and 'name' gets '-<label>' appended. num_unique_products and num_products_enumstereo are set
    to the number of unique names before and after the enumeration.

    Raises ProductFormationError if the stereoisomers of a product cannot be found.
    """
    # First check if internal step, if yes, don't enumerate stereoisomers
    if self.library.num_steps != self.library.current_step:
        return products
    self.logger.info("Enumerating stereoisomers since this is the final step...")
    self.num_unique_products = len(set(list(products['name'])))  # unique products before stereoisomer enumeration

    def stereo_label(position: int) -> str:
        # A..Z for the first 26 isomers, then AA, AB, ... so the label never leaves the alphabet
        label = ''
        position += 1
        while position > 0:
            position, remainder = divmod(position - 1, 26)
            label = chr(65 + remainder) + label
        return label

    new_rows = []
    for _, row in products.iterrows():
        try:
            stereoisomers = self.find_stereoisomers(row['smiles'])
        except Exception as e:
            self.logger.critical(f"Could not enumerate stereoisomers for {row['smiles']}.")
            raise ProductFormationError(message=f"Could not enumerate stereoisomers for {row['smiles']}.",
                                        mol=self.library.reaction.scaffold,
                                        route_uuid=self.route_uuid) from e
        for i, iso in enumerate(stereoisomers):
            label = stereo_label(i)
            new_row = row.copy()
            new_row['smiles'] = iso
            new_row['name'] = f"{row['name']}-{label}"  # Appending A, B, C, etc., to the name
            new_row['stereoisomer'] = label
            new_rows.append(new_row)
    new_df = pd.DataFrame(new_rows)
    # remove NaNs
    new_df = new_df.dropna(subset=['smiles'])
    new_df.reset_index(drop=True, inplace=True)
    self.num_products_enumstereo = len(
        set(list(new_df['name'])))  # number of products after stereoisomer enumeration
    return new_df
[docs]
def find_stereoisomers(self, smiles: str) -> List[str]:
    """
    This function returns the isomeric SMILES strings of all unique stereoisomers of the given SMILES.

    If the enumeration fails with a RuntimeError a warning is logged and the original SMILES is returned
    as the only entry.
    """
    mol = Chem.MolFromSmiles(smiles)
    # Generate all stereoisomers
    try:
        opts = StereoEnumerationOptions(unique=True)
        isomers = list(EnumerateStereoisomers(mol, options=opts))
        isomer_list = [Chem.MolToSmiles(isomer, isomericSmiles=True) for isomer in isomers]
    except RuntimeError:
        self.logger.warning(f"Could not enumerate stereoisomers for {smiles}. Keeping original SMILES.")
        isomer_list = [smiles]
    return isomer_list
[docs]
def save_products(self):
    """
    Write self.products to disk.

    Products of an intermediate step are pickled (.pkl.gz) into <output_dir>/extra. Final products are
    pickled (.pkl.gz) and also written as .csv into <output_dir>, and both paths are remembered in
    final_products_pkl_path and final_products_csv_path.
    """
    stem = (f"{self.library.id}_{self.library.route_uuid}_{self.library.reaction.reaction_name}_products_"
            f"{self.library.current_step}of{self.library.num_steps}")
    pkl_name = f"{stem}.pkl.gz"
    csv_name = f"{stem}.csv"
    is_final_step = self.library.num_steps == self.library.current_step
    if not is_final_step:
        extra_dir = f"{self.output_dir}/extra/"
        self.logger.info(
            "Since these products are not the final products they will be saved in the /extra folder. \n")
        self.logger.info(f"Saving products to {self.output_dir}/extra/{pkl_name} \n")
        os.makedirs(extra_dir, exist_ok=True)
        self.products.to_pickle(f"{self.output_dir}/extra/{pkl_name}")
        return
    self.final_products_pkl_path: str = f"{self.output_dir}/{pkl_name}"
    self.final_products_csv_path: str = f"{self.output_dir}/{csv_name}"
    self.logger.info(f"Saving final products to {self.final_products_pkl_path} \n")
    os.makedirs(f"{self.output_dir}/", exist_ok=True)
    self.products.to_pickle(self.final_products_pkl_path)
    self.logger.info(f"Saving final products to {self.final_products_csv_path} \n")
    self.products.to_csv(self.final_products_csv_path, index=False)
[docs]
def label_products(self):
    """
    Label the products: a Labeler is created from the products, the atom ids expansion and the library,
    and self.products is replaced by what its label_products() returns.
    """
    self.products = Labeler(self.products, self.atom_ids_expansion, self.library).label_products()