# Source code for isogroup.base.untargeted_experiment

from __future__ import annotations
from isogroup.base.experiment import Experiment
import isogroup.enhancer.unlabeled_enhancer as unlabeled_enhancer 
import isogroup.enhancer.labeled_enhancer as labeled_enhancer
import bisect
from collections import defaultdict
from isogroup.base.cluster import Cluster
from isogroup.base.misc import Misc
import logging
import time
import pandas as pd

# Module-level logger shared by the IsoGroup package (fixed name, so no f-string is needed).
logger = logging.getLogger("IsoGroup")


class UntargetedExperiment(Experiment):
    """
    Represents an untargeted mass spectrometry experiment.

    An untargeted experiment involves grouping features into potential isotopologue
    clusters based on retention time proximity and m/z differences.
    """

    # Enhancement modes understood by :meth:`run_untargeted_pipeline`.
    _ENHANCING_MODES = ("unlabeled", "fully_labeled")

    def __init__(self,
                 dataset: pd.DataFrame,
                 tracer: str,
                 ppm_tol: float,
                 rt_tol: float,
                 max_atoms: int | None = None,
                 keep: str | None = None):
        """
        :param dataset: DataFrame containing experimental data with columns for m/z,
                        retention time (RT), feature ID and sample intensities.
        :param tracer: Tracer code used in the experiment (e.g. "13C").
        :param ppm_tol: m/z tolerance in ppm.
        :param rt_tol: Retention time tolerance in seconds.
        :param max_atoms: Maximum number of tracer atoms to consider for isotopologues.
                          If None, IsoGroup automatically estimates the maximum number of
                          isotopologues based on the feature m/z and tracer element.
        :param keep: Strategy to keep clusters during deduplication. Options are
                     "longest", "closest_mz" or "both". If None (default), or any other
                     value such as "all", all clusters are kept.
        """
        super().__init__(dataset=dataset, tracer=tracer, ppm_tol=ppm_tol, rt_tol=rt_tol, max_atoms=max_atoms)
        self.mode = "untargeted"
        self.mzshift_tracer = float(Misc.calculate_mzshift(self.tracer))
        self.keep = keep
        self.unclustered_features = {}  # {sample_name: [Feature objects]}
        # Record left by the last deduplication strategy that ran:
        # a list of messages ("longest") or {cluster_id: {iso_index: [feature_id, ...]}} ("closest_mz").
        self.subsets_removed = None
        self.all_features_df = None
        self.all_clusters_df = None

    @staticmethod
    def _first_sample_count(per_sample: dict) -> int:
        """Return the number of entries stored for the first sample, or 0 if there is no sample."""
        return len(next(iter(per_sample.values()), ()))

    def run_untargeted_pipeline(self, enhancing_mode=None, sample_name=None):
        """
        Complete pipeline to build and deduplicate clusters from the dataset with logging and timing.

        :param enhancing_mode: Mode used to enhance the dataset. Accepted values are
                               "unlabeled" or "fully_labeled". If None, no enhancement is
                               applied. Any other value is ignored with a warning.
                               Defaults to None.
        :param sample_name: Name of the sample file to use for enhancement.
                            Required if enhancing_mode is specified.
        :raises ValueError: If a supported enhancing_mode is given without sample_name.
        """
        start_time = time.time()

        # Fail fast on an unusable enhancement request instead of after the whole clustering ran.
        if enhancing_mode is not None:
            if enhancing_mode not in self._ENHANCING_MODES:
                logger.warning(
                    f"Unknown enhancing mode '{enhancing_mode}' "
                    f"(expected one of {self._ENHANCING_MODES}); no enhancement will be applied."
                )
            elif sample_name is None:
                logger.error("A sample name is required when an enhancing mode is specified.")
                raise ValueError("A sample name is required when an enhancing mode is specified.")

        # --- Initialization of features ---
        self.initialize_experimental_features()

        # --- Construction of clusters ---
        logger.info("Building clusters...")
        self.build_clusters(self.rt_tol, self.ppm_tol, self.max_atoms)
        logger.info(f" => {self._first_sample_count(self.clusters)} clusters formed per sample.\n")

        # --- Deduplication and cleaning of clusters ---
        self.deduplicate_clusters(self.keep)
        self.create_features_df()
        self.create_clusters_df()

        # --- Optional enhancement of the cluster table ---
        if enhancing_mode == "unlabeled":
            self.unlabeled_enhancer(self.all_clusters_df, sample_name)
        elif enhancing_mode == "fully_labeled":
            self.fully_labeled_enhancer(self.all_clusters_df, sample_name)

        logger.info(f"{self._first_sample_count(self.clusters)} isotopic clusters identified per sample.")
        logger.info(f"{self._first_sample_count(self.unclustered_features)} unassigned features per sample.")

        total_time = time.time() - start_time
        logger.info(f"Untargeted grouping completed in {total_time:.2f} seconds.")

    def build_clusters(self, rt_tol: float, ppm_tol: float, max_atoms: int | None = None):
        """
        Group features into potential isotopologue clusters based on retention time
        proximity and m/z differences. The result is stored in ``self.clusters`` as
        ``{sample_name: {cluster_id: Cluster}}``.

        :param rt_tol: Retention time window for clustering.
        :param ppm_tol: m/z tolerance in parts per million for clustering.
        :param max_atoms: Maximum number of tracer atoms to consider for isotopologues.
                          If None, IsoGroup automatically estimates the maximum number of
                          isotopologues based on the feature m/z and tracer element.
        :raises ValueError: If features have not been initialized.
        """
        if not self.features:
            logger.error("Features must be initialized before building clusters.")
            raise ValueError("Features must be initialized before building clusters.")

        for sample_name, features in self.features.items():
            # Sorting by RT lets the RT window be located with a binary search.
            all_features = sorted(features.values(), key=lambda f: f.rt)
            rts = [f.rt for f in all_features]
            clusters = {}
            cluster_id_local = 0

            # For each feature, find potential isotopologues within the RT window
            for base_feature in all_features:
                # --- Find candidates within the RT window ---
                left_bound = bisect.bisect_left(rts, base_feature.rt - rt_tol)
                right_bound = bisect.bisect_right(rts, base_feature.rt + rt_tol)
                candidates = all_features[left_bound:right_bound]

                # The isotopologue limit only depends on the base feature: compute it once.
                if max_atoms is None:
                    max_iso = Misc.get_max_isotopologues_for_mz(base_feature.mz, self.tracer_element)
                else:
                    max_iso = max_atoms

                # --- Identification of candidates for isotopologues ---
                potential_group = {base_feature}
                for candidate in candidates:
                    if candidate == base_feature:
                        continue
                    iso_index = Misc.calculate_isotopologue_index(
                        candidate.mz, base_feature.mz, self.mzshift_tracer
                    )
                    if abs(iso_index) > max_iso:
                        continue
                    expected_mz = base_feature.mz + iso_index * self.mzshift_tracer
                    delta_ppm = abs(expected_mz - candidate.mz) / expected_mz * 1e6
                    if delta_ppm <= ppm_tol:
                        potential_group.add(candidate)

                # --- If a group of isotopologues is found, create a cluster ---
                if len(potential_group) > 1:
                    cluster_id = f"C{cluster_id_local}"
                    group_sorted = sorted(potential_group, key=lambda f: f.mz)
                    clusters[cluster_id] = Cluster(cluster_id=cluster_id, features=group_sorted)
                    cluster_id_local += 1

            self.clusters[sample_name] = clusters

            for cluster_id, cluster in clusters.items():
                logger.debug(f" Cluster {cluster_id} formed with {len(cluster.features)} feature(s):")
                for feature in cluster.features:
                    logger.debug(f" => Feature {feature.feature_id} : m/z={feature.mz}, rt={feature.rt}")

    def _keep_longest_cluster(self, cluster: dict):
        """
        Retain only the longest clusters: every cluster whose feature set is a strict
        subset of a larger kept cluster is deleted from the given dictionary (in place).
        A message per removed cluster is stored in ``self.subsets_removed`` (list).

        :param cluster: cluster dictionary ({cluster_id: Cluster}) to process.
        """
        self.subsets_removed = []
        signatures = {cid: {f.feature_id for f in c.features} for cid, c in cluster.items()}
        # Compare from largest to smallest so a cluster is only ever tested against larger kept ones.
        by_size = sorted(signatures.items(), key=lambda item: len(item[1]), reverse=True)

        kept = []
        for cid, signature in by_size:
            superset = next((other for other in kept if signature < other), None)
            if superset is None:
                kept.append(signature)
                continue
            self.subsets_removed.append(f"{signature} removed (subset of {superset})")
            del cluster[cid]

    def _keep_closest_mz_candidate(self, cluster: dict):
        """
        Keep only the feature closest to the expected m/z for each isotopologue in every
        cluster. The discarded candidates are recorded in ``self.subsets_removed`` as
        ``{cluster_id: {isotopologue_index: [feature_id, ...]}}``.

        :param cluster: cluster dictionary ({cluster_id: Cluster}) to process.
        """
        self.subsets_removed = {}
        for current in cluster.values():
            base_mz = current.lowest_mz

            # Group the features of the cluster by theoretical isotopologue index.
            iso_to_candidates = defaultdict(list)
            for feature in current.features:
                iso_index = Misc.calculate_isotopologue_index(feature.mz, base_mz, self.mzshift_tracer)
                iso_to_candidates[iso_index].append(feature)

            kept_features = []
            for index, candidates in iso_to_candidates.items():
                expected_mz = base_mz + index * self.mzshift_tracer
                best_candidate = min(candidates, key=lambda f: abs(f.mz - expected_mz))
                kept_features.append(best_candidate)

                # Record every candidate that loses against the best one (identity test:
                # min() returns one of the candidates themselves).
                dropped = [f.feature_id for f in candidates if f is not best_candidate]
                if dropped:
                    self.subsets_removed.setdefault(current.cluster_id, {})[index] = dropped

            current.features = kept_features

    @staticmethod
    def _log_removed_subsets(removed: list):
        """Log the subset clusters removed by the "longest" strategy."""
        if not removed:
            return
        logger.info(f" => {len(removed)} subsets removed per sample.\n")
        logger.debug(" Removed subsets:")
        logger.debug(removed)

    @staticmethod
    def _log_removed_candidates(removed: dict):
        """Log the candidate features removed by the "closest_mz" strategy."""
        if not removed:
            return
        feature_count = 0
        for cluster_id, per_isotopologue in removed.items():
            for iso_index, features in per_isotopologue.items():
                feature_count += len(features)
                logger.debug(
                    f" => In cluster {cluster_id}, removed candidates for isotopologue {iso_index}: {features}"
                )
        logger.info(f" => {feature_count} candidate(s) removed in {len(removed)} cluster(s).\n")

    def deduplicate_clusters(self, keep: str | None = None):
        """
        Clean up and deduplicate clusters by :
            - Merging clusters with identical feature compositions.
            - Removing clusters that are subsets of larger clusters (if keep is "longest").
            - Keeping only the best candidate feature for each isotopologue (if keep is "closest_mz").
            - Updating each feature's cluster memberships, isotopologue numbers, and also_in lists.

        ``self.clusters`` is replaced by the cleaned, renumbered clusters and
        ``self.unclustered_features`` is refreshed.

        :param keep: Strategy for deduplication. Options are "longest" to drop clusters that
                     are subsets of a larger one, "closest_mz" to retain only the feature
                     closest to the expected m/z for each isotopologue within a cluster, or
                     "both" to apply both strategies. With None (default) or any other
                     value, all clusters are kept.
        """
        # --- Merge clusters with identical feature compositions ---
        final_clusters = {}
        merged = 0
        logger.info("Merging clusters...")
        for sample, clusters in self.clusters.items():
            merged = 0
            unique = {}
            seen_signatures = set()
            for cluster in clusters.values():
                signature = frozenset(f.feature_id for f in cluster.features)
                if signature in seen_signatures:
                    merged += 1
                    continue
                seen_signatures.add(signature)
                unique[cluster.cluster_id] = cluster
            final_clusters[sample] = unique
        logger.info(f" => {merged} clusters deleted (merged) per sample.\n")

        # --- Apply the requested keep strategy ---
        if keep:
            logger.info(f"Deduplicating clusters based on specified strategy (keep '{keep}')...")
            removed_subsets = None
            removed_candidates = None
            for sample_clusters in final_clusters.values():
                if keep in ("longest", "both"):
                    self._keep_longest_cluster(sample_clusters)
                    # Saved now: with "both", the next strategy overwrites self.subsets_removed.
                    removed_subsets = self.subsets_removed
                if keep in ("closest_mz", "both"):
                    self._keep_closest_mz_candidate(sample_clusters)
                    removed_candidates = self.subsets_removed
            self._log_removed_subsets(removed_subsets)
            self._log_removed_candidates(removed_candidates)

        # --- Assign final cluster_id, isotopologues label, in_cluster and also_in to features ---
        new = {}
        for sample, sample_clusters in final_clusters.items():
            renumbered = {}
            features_to_clusters = defaultdict(set)
            for new_index, cluster in enumerate(sample_clusters.values()):
                logger.debug(f" Cluster_id: {cluster.cluster_id}")
                cluster.cluster_id = f"C{new_index}"
                logger.debug(f" New index assigned: {cluster.cluster_id}")
                renumbered[cluster.cluster_id] = cluster
                for f in cluster.features:
                    features_to_clusters[f.feature_id].add(cluster.cluster_id)

            for cluster in sample_clusters.values():
                cluster.features.sort(key=lambda f: f.mz)
                min_mz = cluster.lowest_mz
                for f in cluster.features:
                    iso_index = Misc.calculate_isotopologue_index(f.mz, min_mz, self.mzshift_tracer)
                    iso_label = "Mx" if iso_index == 0 else f"Mx+{iso_index}"
                    f.cluster_isotopologue[cluster.cluster_id] = iso_label
                    # Sort by numeric cluster index so the membership order is reproducible
                    # (iterating a set of strings is not stable between runs).
                    f.in_cluster = sorted(
                        features_to_clusters[f.feature_id], key=lambda cid: int(cid[1:])
                    )
                    f.also_in[cluster.cluster_id] = [c for c in f.in_cluster if c != cluster.cluster_id]

            new[sample] = renumbered

        self.clusters = new

        # Keep unclustered features for reference
        for sample, features in self.features.items():
            self.unclustered_features[sample] = [f for f in features.values() if not f.in_cluster]

    def create_features_df(self):
        """
        Create and store (in ``self.all_features_df``) a dataframe containing all features,
        one row per feature and per sample.
        """
        all_features = [
            {
                "FeatureID": f.feature_id,
                "RT": f.rt,
                "m/z": f.mz,
                "sample": f.sample,
                "Intensity": f.intensity,
                "InClusters": f.in_cluster if f.in_cluster else ["None"],
                "Isotopologues": (
                    [f.cluster_isotopologue.get(cid, "N/A") for cid in f.in_cluster]
                    if f.in_cluster else ["N/A"]
                ),
            }
            for features in self.features.values()
            for f in features.values()
        ]
        self.all_features_df = pd.DataFrame(all_features)

    def create_clusters_df(self):
        """
        Create and store (in ``self.all_clusters_df``) a dataframe containing all clusters,
        one row per feature of each cluster, ordered by m/z within a cluster.
        """
        all_clusters = [
            {
                "ClusterID": cluster.cluster_id,
                "FeatureID": f.feature_id,
                "RT": f.rt,
                "m/z": f.mz,
                "sample": f.sample,
                "Intensity": f.intensity,
                "Isotopologue": f.cluster_isotopologue[cluster.cluster_id],
                "AlsoIn": str(f.also_in[cluster.cluster_id]),
            }
            for clusters in self.clusters.values()
            for cluster in clusters.values()
            for f in sorted(cluster.features, key=lambda f: f.mz)
        ]
        self.all_clusters_df = pd.DataFrame(all_clusters)

    def unlabeled_enhancer(self, clusters_df, sample_name):
        """
        Refine the untargeted pipeline annotations using unlabeled data.
        The enhanced table replaces ``self.all_clusters_df``.

        :param clusters_df: DataFrame containing all clusters generated by the IsoGroup's untargeted mode.
        :param sample_name: Name of the unlabeled sample used for the enhancer.
        """
        df_feature_found = unlabeled_enhancer.annotate_feature_found(clusters_df, sample_name)
        self.all_clusters_df = unlabeled_enhancer.calculate_m1_m0_ratio(df_feature_found, sample_name)

    def fully_labeled_enhancer(self, clusters_df, sample_name):
        """
        Refine the untargeted pipeline annotations using fully labeled data.
        The enhanced table replaces ``self.all_clusters_df``.

        :param clusters_df: DataFrame containing all clusters generated by the IsoGroup's untargeted mode.
        :param sample_name: Name of the fully labeled sample used for the enhancer.
        """
        self.all_clusters_df = labeled_enhancer.annotate_feature_found(clusters_df, sample_name)
# if __name__ == "__main__": # from isogroup.base.io import IoHandler # import pandas as pd # from pathlib import Path # io = IoHandler() # # data= io.read_dataset(Path(r"..\..\data\dataset_test_XCMS.txt")) # untargeted = UntargetedExperiment(dataset=data, tracer="13C", ppm_tol=5, rt_tol=15) # untargeted.run_untargeted_pipeline() # # untargeted.initialize_experimental_features() # # untargeted.build_clusters(RTwindow=15, ppm_tolerance=5) # # untargeted.deduplicate_clusters() # # # print(untargeted.clusters) # for sample, clusters in untargeted.clusters.items(): # for cluster in clusters.values(): # print(cluster.isotopologues) # # print(cluster.__len__()) # for f in cluster.features: # print(f"Sample {sample} Cluster {cluster.cluster_id} : {f.feature_id}{f.in_cluster, f.also_in[cluster.cluster_id]}") # for key, value in untargeted.clusters.items(): # for key2,value2 in value.items(): # for f in value2.features: # print(f"Sample {key} Cluster {key2} : {f.feature_id}{f.mz, f.rt, f.in_cluster, f.also_in}") # # print(untargeted.clusters.keys()) # # # print(untargeted.clusters) # untargeted.deduplicate_clusters() # print(untargeted.clusters.keys()) ################################################## # def deduplicate_clusters(self, keep_best_candidate: bool = False, keep_richest: bool = True): # """ # Clean up and deduplicate clusters by : # - Merging clusters with identical feature compositions. # - Removing clusters that are subsets of larger clusters (if keep_richest is True). # - Keeping only the best candidate feature for each isotopologue (if keep_best_candidate is True). # - Updating each feature's cluster memberships, isotopologue numbers, and also_in lists. # Parameters: # keep_best_candidate (bool): If True, retain only the feature with the highest intensity for each isotopologue within a cluster. # keep_richest (bool): If True, retain only the largest cluster when multiple clusters share features. 
# """ # merged = 0 # subset_removed = 0 # final_clusters = {} # for sample, clusters in self.clusters.items(): # # --- Merge identical clusters --- # final_clusters[sample] = {} # seen_signatures = {} # next_cluster_id = 0 # for cluster in clusters.values(): # signature = frozenset(f.feature_id for f in cluster.features) # if signature not in seen_signatures: # cluster.cluster_id = f"C{next_cluster_id}" # seen_signatures[signature] = cluster.cluster_id # final_clusters[sample][cluster.cluster_id] = cluster # next_cluster_id += 1 # else: # merged += 1 # # --- Remove subset clusters if keep_richest is True --- # if keep_richest: # signatures = {cid: set(f.feature_id for f in c.features) for cid, c in final_clusters[sample].items()} # sorted_clusters = sorted(signatures.items(), key=lambda x: len(x[1]), reverse=True) # to_remove = set() # kept = [] # # Compare from largest to smallest cluster to identify subsets # # If a smaller cluster is a subset of any kept larger cluster, mark it for removal # for cid, sig1 in sorted_clusters: # if any(sig1 < sig2 for _, sig2 in kept): # to_remove.add(cid) # print(to_remove) # else: # kept.append((cid, sig1)) # subset_removed += len(to_remove) # final_clusters[sample] = {cid: c for cid, c in final_clusters[sample].items() if cid not in to_remove} # # --- Keep only the best candidate for each isotopologue (based on the the closest m/z to expected) if keep_best_candidate is True --- # if keep_best_candidate: # for cluster in final_clusters[sample].values(): # iso_to_candidate = defaultdict(list) # base_mz = cluster.lowest_mz # for f in cluster.features: # iso_index = round((f.mz - base_mz) / self.mzshift_tracer) # iso_to_candidate[iso_index].append(f) # cluster.features = [min(candidates, key=lambda f: abs(f.mz - (base_mz + iso * self.mzshift_tracer))) for iso, candidates in iso_to_candidate.items()] # # --- Assign final cluster_id, isotopologues label, in_cluster and also_in to features --- # features_to_clusters = defaultdict(set) 
# for cluster in final_clusters[sample].values(): # for f in cluster.features: # features_to_clusters[f.feature_id].add(cluster.cluster_id) # for cluster in final_clusters[sample].values(): # cluster.features.sort(key=lambda f: f.mz) # min_mz=cluster.lowest_mz # for f in cluster.features: # iso_index = round((f.mz - min_mz) / self.mzshift_tracer) # iso_label = "Mx" if iso_index == 0 else f"Mx+{iso_index}" # f.cluster_isotopologue[cluster.cluster_id] = iso_label # f.in_cluster = list(features_to_clusters[f.feature_id]) # f.also_in = [c for c in f.in_cluster if c != cluster.cluster_id] # self.clusters = final_clusters # # Keep unclustered features for reference # self.unclustered_features = {} # for sample, features in self.features.items(): # self.unclustered_features[sample] = [f for f in features.values() if not f.in_cluster] # final = len(next(iter(self.clusters.values()))) if self.clusters else 0 # unclustered = sum(1 for f in next(iter(self.features.values())).values() if not f.in_cluster) if self.features else 0 # return merged, subset_removed, final, unclustered # def clusters_to_dataframe(self) -> pd.DataFrame: # """ # Convert the clusters into a pandas DataFrame for easier analysis and export. # :return: pd.DataFrame # """ # records = [] # for sample_name, clusters in self.clusters.items(): # for cluster in clusters.values(): # sorted_features = sorted(cluster.features, key=lambda f: f.mz) # for idx, f in enumerate(sorted_features): # iso_label = f.cluster_isotopologue.get(cluster.cluster_id, "Mx") # records.append({ # "ClusterID": cluster.cluster_id, # "FeatureID": f.feature_id, # "RT": f.rt, # "m/z": f.mz, # "sample": f.sample, # "Intensity": f.intensity, # "Isotopologue": iso_label, # "InClusters": f.in_cluster, # "AlsoIn": f.also_in # }) # return pd.DataFrame.from_records(records) # def export_clusters_to_tsv(self, filepath: str): # """ # Export the clusters to a CSV file. 
# :param filepath: str # """ # df = self.clusters_to_dataframe() # df.to_csv(filepath, sep="\t", index=False) # def export_features(self, filename: str): # """ # Export all features to a TSV file. # :param filename: str # """ # records = [] # for sample_name, features in self.features.items(): # for f in features.values(): # # If not in any cluster, mark accordingly # cluster_ids = f.in_cluster if f.in_cluster else ["None"] # iso_labels = [f.cluster_isotopologue.get(cid, "N/A") for cid in cluster_ids] # records.append({ # "FeatureID": f.feature_id, # "RT": f.rt, # "m/z": f.mz, # "sample": f.sample, # "Intensity": f.intensity, # "InClusters": cluster_ids, # "Isotopologues": iso_labels # }) # df = pd.DataFrame.from_records(records) # df.to_csv(filename, sep="\t", index=False)