Source code for isogroup.base.targeted_experiment

from __future__ import annotations
import pandas as pd
from isogroup.base.experiment import Experiment
from isogroup.base.cluster import Cluster
from isogroup.base.database import Database
import logging
import time

logger = logging.getLogger(f"IsoGroup")

[docs] class TargetedExperiment(Experiment): """ Represents a targeted mass spectrometry experiment. Used to group and annotate detected features from an experimental dataset using a reference database with isotopic tracer information. """ def __init__(self, dataset:pd.DataFrame, tracer:str, ppm_tol:float, rt_tol:float, database:pd.DataFrame): """ :param dataset: DataFrame containing experimental data with columns for m/z, retention time (RT), feature ID and sample intensities. :param tracer: Tracer code used in the experiment (e.g. "13C"). :param ppm_tol: m/z tolerance (in ppm). :param rt_tol: Retention time tolerance (in seconds). :param database: DataFrame containing theoretical features with columns retention time (RT), metabolite names, and formulas. """ super().__init__(dataset = dataset, tracer=tracer, ppm_tol=ppm_tol, rt_tol=rt_tol, database=database) self.database = Database(dataset=database, tracer=self._tracer, tracer_element=self.tracer_element) self.all_features_df = None self.all_clusters_df = None # self.ppm_tol = ppm_tol # self.rt_tol = rt_tol # self.tracer = tracer # self.cluster = cluster # self._tracer_element, self._tracer_idx = Misc._parse_strtracer(tracer)
[docs] def run_targeted_pipeline(self): """ Run the full targeted annotation pipeline for the experiment. This includes: - Initializing Feature objects from the dataset. - Matching experimental features to the database within specified tolerances. - Clustering features by metabolite names. """ start_time = time.time() self.initialize_experimental_features() self.annotate_features() self.clusterize() self.create_features_df() self.create_clusters_df() total_time = time.time() - start_time logger.info(f"Targeted grouping completed in {total_time:.2f} seconds.")
[docs] def annotate_features(self): """ Annotate experimental features by matching them with the database features within specified m/z and retention time tolerances. """ logger.info("Find matches between experimental features and database features...") nb_features_annotated = 0 for features_id in self.features.values(): for feature in features_id.values(): for db_feature in self.database.theoretical_features: # Calculate the exact mz and rt errors mz_error = (db_feature.mz - feature.mz) rt_error = (db_feature.rt - feature.rt) # Covert mz_error to ppm mz_error = (mz_error / feature.mz) * 1e6 # Check if the experimental feature is within tolerance if abs(mz_error) <= self.ppm_tol and abs(rt_error) <= self.rt_tol: feature.chemical.append(db_feature.chemical[0]) # feature.isotopologue.append(db_feature.isotopologue[0]) feature.cluster_isotopologue[db_feature.chemical[0].label] = db_feature.cluster_isotopologue[db_feature.chemical[0].label] feature.metabolite.append(db_feature.chemical[0].label) feature.formula.append(db_feature.chemical[0].formula) feature.mz_error.append(mz_error) feature.rt_error.append(rt_error) nb_features_annotated += 1 logger.debug(f"Feature {feature.feature_id} in sample {feature.sample} annotated with {db_feature.chemical[0].label} (isotopologue: {db_feature.cluster_isotopologue[db_feature.chemical[0].label]})") logger.debug(f" - mz error (ppm): {mz_error}, rt error (sec): {rt_error}") logger.info(f" => {nb_features_annotated} experimental features matched with database features.\n")
[docs] def clusterize(self): """ Group features by metabolite names within each sample and assign a unique cluster ID to each group. Populates `self.clusters` as a dictionary of the form: {sample_name: {cluster_id: Cluster object}} """ # cluster_names = [] # # Group features by metabolite # for sample in self.features.values(): # for feature in sample.values(): # cluster_names += feature.metabolite # # cluster_names = set(cluster_names) # Create unique clusters # # # self.clusters = {} logger.info("Grouping features by metabolite names...") cluster_names = [] for _, features in self.features.items(): for feature in features.values(): cluster_names += [metabolite_name for metabolite_name in feature.metabolite if metabolite_name not in cluster_names] for sample in self.features.keys(): self.clusters[sample] = {} for index, clusters in enumerate(cluster_names): features = self.get_features_from_name(clusters, sample) # Sort features by isotopologues # features.sort(key=lambda f: f.isotopologue) features.sort(key=lambda f: f.cluster_isotopologue[clusters]) # Assign the cluster_id to the features in the cluster for feature in features: if not hasattr(feature, "in_cluster") or feature.in_cluster is None: feature.in_cluster = [] feature.in_cluster.append(f"C{index}") self.clusters[sample][clusters] = Cluster(features=features, cluster_id=f"C{index}", name=clusters) logger.debug(f"Cluster C{index} ({clusters}) identified with {len(features)} features in sample {sample}.") logger.debug(f" {[features.feature_id for features in features]} ") logger.info(f" => {len(cluster_names)} clusters identified.\n")
[docs] def get_features_from_name(self, name:str, sample_name:str): """ Retrieve all features in a given sample that are annotated with a specific metabolite name. :param name: Name of the metabolite for which to retrieve features :param sample_name: Name of the sample from which to retrieve features :return: List of Feature objects that match the metabolite name in the specified sample """ features = [] for feature in self.features[sample_name].values(): if name in feature.metabolite: features.append(feature) return features
[docs] def get_clusters_from_name(self, name, sample_name:str): """ Get a cluster from the experiment by its name, in a given sample if provided :param name: Name of the cluster to retrieve :param sample_name: Name of the sample to retrieve the cluster from :return: Cluster object if found, None otherwise """ for cluster in self.clusters[sample_name].values(): if cluster.name == name: return cluster return None
[docs] def create_clusters_df(self): #sample_name = None): """ Create and store a dataframe containing all clusters. """ # all_samples = list(self.features.keys()) # if sample_name is not None: # if sample_name not in all_samples: # raise ValueError(f"Sample {sample_name} not found in annotated clusters. Available samples: {', '.join(all_samples)}") cluster_data = [] for clusters in self.clusters.values(): # if sample_name is None or sample_name == sample: # Filter the DataFrame by sample name if provided # print(clusters.name) # for cname, cluster in clusters.items(): for cluster in clusters.values(): # print(cluster.name) for feature in cluster.features: idx = [index for index,metabolite in enumerate(feature.metabolite) if metabolite == cluster.name][0] # Get the cluster_id of the features in another cluster # other_clusters = [c.cluster_id for cluster_name, c in clusters.items() if feature in c.features and c.cluster_id != cluster.cluster_id] # print(other_clusters) cluster_data.append({ "cluster_id": cluster.cluster_id, "metabolite": cluster.name, "feature_id": feature.feature_id, "mz": feature.mz, "rt": feature.rt, "feature_potential_metabolite": feature.metabolite, # "isotopologue": feature.isotopologue[idx], "isotopologue": feature.cluster_isotopologue[cluster.name], "mz_error": feature.mz_error[idx], "rt_error": feature.rt_error[idx], "sample": feature.sample, "intensity": feature.intensity, "status": cluster.status, "missing_isotopologue": cluster.missing_isotopologues, "duplicated_isotopologue": cluster.duplicated_isotopologues, # "in_cluster": feature.in_cluster, "in_another_cluster": [c.cluster_id for c in clusters.values() if feature in c.features and c.cluster_id != cluster.cluster_id] }) # Create a DataFrame to summarize the annotated clusters self.all_clusters_df= pd.DataFrame(cluster_data)
[docs] def create_features_df(self): #sample_name = None): """ Create and store a dataframe containing all features. """ feature_data = [] for all_features in self.features.values(): for feature in all_features.values(): feature_data.append({ "feature_id": feature.feature_id, "mz": feature.mz, "rt": feature.rt, "metabolite": feature.metabolite, # "isotopologue": feature.isotopologue, "isotopologue": [feature.cluster_isotopologue[met] for met in feature.metabolite], "mz_error": feature.mz_error, "rt_error": feature.rt_error, "sample": feature.sample, "intensity": feature.intensity }) # Create a DataFrame to summarize the annotated data self.all_features_df = pd.DataFrame(feature_data)
# # Export the Dataframe of only one sample if a sample name is provided # if sample_name: # df = df[df["sample"] == sample_name] # Filter the DataFrame by sample name # if __name__ == "__main__": # from isogroup.base.io import IoHandler # from isogroup.base.database import Database # from pathlib import Path # io= IoHandler() # # data= io.read_dataset(Path(r"..\..\data\dataset_test_XCMS.txt")) # # database = io.read_database(Path(r"..\..\data\database.csv")) # experiment = TargetedExperiment(data, tracer="13C", ppm_tol=5, rt_tol=15, database=database) # experiment.run_targeted_pipeline() # experiment.clusters_df() ############################################################################### # @property # def rt_tol(self): # """ # Returns the retention time tolerance used for feature annotation. # :return: float # """ # return self._rt_tol # @property # def tracer(self): # """ # Returns the tracer used for the experiment. # :return: str | None # """ # return self._tracer # @property # def tracer_element(self): # """ # Returns the tracer element used in the experiment. # :return: str | None # """ # return self._tracer_element # @property # def mz_tol(self): # """ # Returns the m/z tolerance used for feature annotation. # :return: float | None # """ # return self._mz_tol # def initialize_experimental_features(self): # """ # Initialize Feature objects from the dataset and organize them by sample. # Each feature is created with its retention time, m/z, tracer, intensity, and sample name. # Populates `self.samples` as a dictionary of the form: # {sample_name: {feature_id: Feature object}} # """ # for idx, _ in self.dataset.iterrows(): # mz = idx[0] # rt = idx[1] # id = idx[2] # # Extract the intensity for each sample in the dataset # for sample in self.dataset.columns: # if sample not in ["mz", "rt", "id"]: # intensity = self.dataset.loc[idx, sample] # # Initialize the experimental features for each sample # feature = Feature( # rt=rt, mz=mz, tracer=self.tracer, # feature_id=id, # intensity=intensity, # sample=sample # ) # # Add the feature in the list corresponding to the sample # if sample not in self.samples: # self.samples[sample] = {} # self.samples[sample][id] = feature # def annotate_experiment(self, mz_tol, rt_tol): # """ # Run the full annotation process for the experiment. # This includes: # - Initializing Feature objects from the dataset. # - Matching experimental features to the database within specified tolerances. # :param mz_tol: m/z tolerance in ppm # :param rt_tol: retention time tolerance in seconds # """ # # Initialize the experimental features from the dataset # self.initialize_experimental_features() # # Annotate the experimental features # self.annotate_features(mz_tol, rt_tol) # def export_features(self, filename = None, sample_name = None): # """ # Summarize annotated features into a DataFrame and optionally export it to a tsv file. # :param filename: Name of the file to export the summary to # :param sample_name: Name of the sample to filter the DataFrame by, if provided # :return: pd.DataFrame with the summary of the annotated features # """ # # Create a DataFrame to summarize the experimental features # feature_data = [] # for sample in self.samples.values(): # for feature in sample.values(): # feature_data.append({ # "feature_id": feature.feature_id, # "mz": feature.mz, # "rt": feature.rt, # "metabolite": feature.metabolite, # "isotopologue": feature.isotopologue, # "mz_error": feature.mz_error, # "rt_error": feature.rt_error, # "sample": feature.sample, # "intensity": feature.intensity # }) # # Create a DataFrame to summarize the annotated data # df = pd.DataFrame(feature_data) # # Export the DataFrame to a tsv file if a filename is provided # if filename: # df.to_csv(filename, sep="\t", index=False) # # Export the Dataframe of only one sample if a sample name is provided # if sample_name: # df = df[df["sample"] == sample_name] # Filter the DataFrame by sample name # df.to_csv(filename, sep="\t", index=False) # return df # def export_clusters(self, filename = None, sample_name = None): # """ # Summarize annotated clusters into a DataFrame and optionally export it to a tsv file. # :param filename: Name of the file to export the summary to # :param sample_name: Name of the sample to filter the DataFrame by, if provided # :return: pd.DataFrame with the summary of the annotated clusters # """ # # Check if the sample name is in the DataFrame # all_samples = list(self.samples.keys()) # if sample_name is not None: # if sample_name not in all_samples: # raise ValueError(f"Sample {sample_name} not found in annotated clusters. Available samples: {', '.join(all_samples)}") # cluster_data = [] # for sample, clusters in self.clusters.items(): # if sample_name is None or sample_name == sample: # Filter the DataFrame by sample name if provided # for cname, cluster in clusters.items(): # for feature in cluster.features: # idx = [i for i,j in enumerate(feature.metabolite) if j == cname][0] # # Get the cluster_id of the features in another cluster # other_clusters = [c.cluster_id for cluster_name, c in clusters.items() if feature in c.features and c.cluster_id != cluster.cluster_id] # cluster_data.append({ # "cluster_id": cluster.cluster_id, # "metabolite": cluster.name, # "feature_id": feature.feature_id, # "mz": feature.mz, # "rt": feature.rt, # "feature_potential_metabolite": feature.metabolite, # "isotopologue": feature.isotopologue[idx], # "mz_error": feature.mz_error[idx], # "rt_error": feature.rt_error[idx], # "sample": feature.sample, # "intensity": feature.intensity, # "status": cluster.status, # "missing_isotopologue": cluster.missing_isotopologues, # "duplicated_isotopologue": cluster.duplicated_isotopologues, # # "in_cluster": feature.in_cluster, # "in_another_cluster": other_clusters # }) # # Create a DataFrame to summarize the annotated clusters # df = pd.DataFrame(cluster_data) # # Export the DataFrame to a tsv file if a filename is provided # if filename: # df.to_csv(filename, sep="\t", index=False) # return df # def clusters_summary(self, filename = None): # """ # Export a tsv file with a summary of the clusters # :param filename: Name of the file to export the summary to # :return: pd.DataFrame with the summary of the clusters # """ # # List to store the cluster summary data # cluster_summary = [] # cluster_id_unique = set() # To store unique cluster_id # for sample, clusters in self.clusters.items(): # for cluster in clusters.values(): # # Check if the cluster_id is unique # if cluster.cluster_id not in cluster_id_unique: # cluster_id_unique.add(cluster.cluster_id) # summary = cluster.cluster_summary # # Retrieve the samples in which the cluster is present # samples_in_cluster = {sample for sample, clusters in self.clusters.items() if cluster.cluster_id in [c.cluster_summary["cluster_id"] for c in clusters.values()]} # summary["samples"] = len(samples_in_cluster) # cluster_summary.append(summary) # # Create a DataFrame with the collected information # df = pd.DataFrame(cluster_summary) # # Export the DataFrame to a tsv file if a filename is provided # if filename: # df.to_csv(filename, sep="\t", index=False) # return df