flagx.io
This module implements data loading, processing, and export functionality.
FlowDataManager
- class flagx.io.FlowDataManager(data_file_names: List[str], data_file_type: Literal['fcs', 'csv', 'lmd'] | None = None, data_file_path: str | None = None, save_path: str | None = None, verbosity: int = 1)
Bases: object
Class for loading, preprocessing, organizing, and exporting flow cytometry datasets stored as FCS, LMD, or CSV files.
The class wraps a complete data-management pipeline for cytometry workflows:
Load raw files into AnnData objects: load_data_files_to_anndata()
Inspect sample sizes: check_sample_sizes(), plot_sample_size_df()
Check class balance: check_class_balance(), plot_class_balance_df()
Relabel datasets: relabel_data()
Align channel names across samples: align_channel_names(), check_og_channel_names_df()
Normalize/transform channels: sample_wise_preprocessing()
Perform train/val/test splitting: perform_data_split()
Downsample samples (optional stratification): sample_wise_downsampling()
Create PyTorch/NumPy dataloaders: get_data_loader()
Export datasets to disk: save_to_numpy_files()
- invalid_files_
Filenames skipped due to incompatible type.
- Type:
list or None
- anndata_list_
List of loaded AnnData objects.
- Type:
list or None
- sample_sizes_
Summary of sample sizes.
- Type:
pd.DataFrame or None
- og_channel_names_
Original channel names per file before alignment.
- Type:
pd.DataFrame or None
- compensation_log_
Error log for compensation.
- Type:
pd.DataFrame or None
- train_data_
Train split as a list of AnnData objects.
- Type:
list or None
- val_data_
Validation split as a list of AnnData objects.
- Type:
list or None
- test_data_
Test split as a list of AnnData objects.
- Type:
list or None
- Parameters:
data_file_names (List[str]) – List of input filenames to load.
data_file_type (Literal['fcs', 'csv', 'lmd'] or None) – Type of input files. If None, inferred from extension of the first file.
data_file_path (str or None) – Directory containing the raw files. Defaults to CWD.
save_path (str or None) – Output directory for any exported files. Defaults to CWD.
verbosity (int) – Logging level. 0: silent, 1: warnings, 2+: info/debug.
- load_data_files_to_anndata(reindex: bool = True, fcs_version_lmd: Literal['3.1', '3.0', '2.0'] = '3.0', read_csv_kwargs: Dict[str, Any] | None = None) None
Load all provided data files into AnnData objects.
CSV files are read using pandas. FCS and LMD files are parsed using the flowio Python package (https://github.com/whitews/FlowIO). If a spillover matrix is present in the TEXT section of the FCS file, it is extracted and stored in adata.uns['meta']['spill']. For LMD files, the loader looks for embedded FCS files and loads the one compliant with fcs_version_lmd. The data filename is always stored in adata.uns['filename']. Invalid files are skipped and recorded in invalid_files_.
- Parameters:
reindex (bool) – Whether to reindex the AnnData.var and the spillover matrix with PnS. Defaults to True.
fcs_version_lmd (str) – FCS version to load. Should be one of '3.1', '3.0', '2.0'. Defaults to '3.0'.
read_csv_kwargs (dict or None) – Parameters passed on to pandas.read_csv(). Defaults to None.
- Raises:
ValueError – If file type cannot be inferred for the first file.
UserWarning – When skipping incompatible file types.
- Returns:
Loaded data samples are stored as AnnData objects in anndata_list_.
- Return type:
None
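The file-type handling described above (inference from the first file's extension, a ValueError when inference fails, and a UserWarning for skipped files) can be sketched roughly as follows. This is a dependency-free illustration, not the library's code: the helper names infer_file_type and partition_files, the SUPPORTED_TYPES constant, and the case-insensitive extension matching are all assumptions made for the example.

```python
import os
import warnings

SUPPORTED_TYPES = ("fcs", "csv", "lmd")  # hypothetical constant for this sketch


def infer_file_type(data_file_names):
    """Infer the file type from the extension of the first filename."""
    ext = os.path.splitext(data_file_names[0])[1].lstrip(".").lower()
    if ext not in SUPPORTED_TYPES:
        raise ValueError(f"Cannot infer file type from {data_file_names[0]!r}")
    return ext


def partition_files(data_file_names, data_file_type=None):
    """Split filenames into loadable files and skipped (invalid) files."""
    file_type = data_file_type or infer_file_type(data_file_names)
    valid, invalid = [], []
    for name in data_file_names:
        ext = os.path.splitext(name)[1].lstrip(".").lower()
        if ext == file_type:
            valid.append(name)
        else:
            # Incompatible files are skipped with a warning and remembered.
            warnings.warn(f"Skipping {name!r}: expected a .{file_type} file", UserWarning)
            invalid.append(name)
    return valid, invalid


valid, invalid = partition_files(["a.fcs", "b.fcs", "notes.txt"])
```

In this sketch, invalid plays the role that invalid_files_ plays on the manager.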
- check_sample_sizes(filename_sample_sizes_df: str | None = None)
Compute and optionally save a summary table with the number of events per dataset.
- Parameters:
filename_sample_sizes_df (str or None) – If provided, the summary dataframe is saved to this filename inside save_path.
- Returns:
Results stored in sample_sizes_.
- Return type:
None
- static plot_sample_size_df(sample_size_df: pandas.DataFrame, ax: matplotlib.pyplot.Axes | None = None) matplotlib.pyplot.Axes
Plot a bar chart of sample sizes from a summary dataframe.
- Parameters:
sample_size_df (pd.DataFrame) – Output of check_sample_sizes_worker.
ax (matplotlib.axes.Axes or None) – Existing axes to plot into, or None to create new axes.
- Returns:
Axes containing the bar plot.
- Return type:
matplotlib.axes.Axes
- align_channel_names(reference_channel_names: int | dict = 0, filename_log_df: str | None = None) None
Standardize channel names across all samples using either a reference sample or a user-provided mapping.
Channel names from each sample are aligned so that all AnnData objects share identical var_names. A log DataFrame is stored to retain the original channel names for inspection.
- Assumptions:
(1) All samples contain the same number of channels.
(2) Channels appear in the same order across samples.
If assumption (1) is violated, a ValueError is raised. Violations of assumption (2) cannot be detected automatically and may result in incorrectly assigned channel names. It is therefore recommended to verify both assumptions prior to calling this method.
- Parameters:
reference_channel_names (int or dict) – int: index of the reference AnnData in anndata_list_. dict: mapping {old_name: new_name}. None: use the first sample as reference. Defaults to 0.
filename_log_df (str or None) – Filename to save the channel-name log dataframe. If None, no file is saved.
- Returns:
Log dataframe stored in og_channel_names_.
- Return type:
None
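To make the two assumptions concrete, the following dependency-free sketch aligns channel names position by position on plain lists of names. It is only an illustration of the idea described above: align_to_reference is a hypothetical helper, and keeping unmapped names unchanged in the dict case is an assumption, not documented behavior.

```python
def align_to_reference(channel_names_per_sample, reference=0):
    """Rename channels position by position; return (aligned, original_log)."""
    n_channels = {len(names) for names in channel_names_per_sample}
    if len(n_channels) != 1:
        # Assumption (1) is violated: samples differ in channel count.
        raise ValueError("All samples must contain the same number of channels")

    if isinstance(reference, dict):
        # Explicit {old_name: new_name} mapping; unmapped names are kept (assumption).
        aligned = [[reference.get(n, n) for n in names]
                   for names in channel_names_per_sample]
    else:
        # Integer index: copy the reference sample's names onto every sample.
        ref_names = channel_names_per_sample[reference]
        aligned = [list(ref_names) for _ in channel_names_per_sample]

    # Keep the original names so they can be inspected later.
    original_log = [list(names) for names in channel_names_per_sample]
    return aligned, original_log


samples = [["FSC-A", "SSC-A", "CD3"], ["FSC-A", "SSC-A", "CD3 "]]
aligned, log = align_to_reference(samples, reference=0)
renamed, _ = align_to_reference(samples, reference={"CD3 ": "CD3"})
```

Because names are assigned purely by position, a sample whose channels are ordered differently would be silently mislabeled, which is why assumption (2) needs to be checked by hand.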
- check_og_channel_names_df() None
Validate consistency of original channel names before alignment.
Checks whether each channel index had identical names across all samples. If inconsistencies are found, a warning is emitted.
- Returns:
None
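The per-index consistency check can be pictured with a short sketch on plain lists of original channel names. The function name inconsistent_channel_positions and the warning text are hypothetical; the real method works on the logged dataframe.

```python
import warnings


def inconsistent_channel_positions(original_names_per_sample):
    """Return the channel indices whose original names differ between samples."""
    mismatched = []
    # zip(*...) groups the names found at the same channel index in every sample.
    for idx, names_at_idx in enumerate(zip(*original_names_per_sample)):
        if len(set(names_at_idx)) > 1:
            mismatched.append(idx)
    if mismatched:
        warnings.warn(f"Channel names differ across samples at indices {mismatched}")
    return mismatched


mismatched = inconsistent_channel_positions([
    ["FSC-A", "SSC-A", "CD3"],
    ["FSC-A", "SSC-A", "CD4"],
])
consistent = inconsistent_channel_positions([["FSC-A", "CD3"], ["FSC-A", "CD3"]])
```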
- sample_wise_compensation(filename_compensation_log: str | None = None) None
Apply fluorescence compensation to each sample in anndata_list_.
Each AnnData object is compensated independently. An 'uncompensated' layer is added containing a copy of the original expression matrix before compensation.
- Returns:
The compensated data are written in place into anndata_list_ and a dataframe with logs is saved to compensation_logs.csv.
- Return type:
None
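The documentation does not spell out the arithmetic, so the following is a sketch of the conventional approach rather than a description of this method's internals: observed events are modeled as true signal multiplied by the spillover matrix, and compensation multiplies each event by the inverse of that matrix. The two-channel restriction and the helper names invert_2x2 and compensate are assumptions chosen to keep the example dependency-free.

```python
def invert_2x2(m):
    """Invert a 2x2 matrix given as nested lists."""
    (a, b), (c, d) = m
    det = a * d - b * c
    if det == 0:
        raise ValueError("Spillover matrix is singular")
    return [[d / det, -b / det], [-c / det, a / det]]


def compensate(events, spill):
    """Multiply each event (row vector) by the inverse spillover matrix."""
    inv = invert_2x2(spill)
    return [
        [sum(event[k] * inv[k][j] for k in range(2)) for j in range(2)]
        for event in events
    ]


# Row i of spill: fraction of fluorochrome i's signal seen in each detector.
spill = [[1.0, 0.1], [0.2, 1.0]]
observed = [[110.0, 60.0]]  # true signal [100, 50] after spillover
compensated = compensate(observed, spill)
```

Here the observed event [110, 60] is mapped back to approximately [100, 50], the signal before spillover.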
- sample_wise_preprocessing(flavour: Literal['logicle', 'arcsinh', 'biexp', 'log10_w_cutoff', 'log10_w_custom_cutoffs', 'custom'] = 'arcsinh', save_raw_to_layer: str | None = None, **kwargs) None
Apply a per-sample preprocessing transformation to all AnnData objects.
This method supports common cytometry transformations such as arcsinh, logicle, and biexponential scaling (for detailed documentation, see https://pytometry.netlify.app/api, 11/27/2025). Log10-based transformations require user-specified cutoffs. Fully custom preprocessing functions may also be supplied.
- Parameters:
flavour (Literal['logicle', 'arcsinh', 'biexp', 'log10_w_cutoff', 'log10_w_custom_cutoffs', 'custom']) – The transformation type to apply. Options:
'logicle', 'arcsinh', 'biexp': Apply the corresponding cytometry scaling function. Parameters (e.g. the arcsinh cofactor) can be passed via kwargs.
'log10_w_cutoff': Requires a cutoff (float) passed via kwargs.
'log10_w_custom_cutoffs': Requires cutoffs (dict mapping channel names to values) passed via kwargs.
'custom': Expects a user-defined preprocessing callable passed as preprocessing_method via kwargs. The callable must modify the AnnData object in place.
save_raw_to_layer (str or None) – If provided, the raw (untransformed) data matrix of each AnnData object will be saved under adata.layers[save_raw_to_layer] before transformation.
**kwargs – Additional arguments forwarded to the selected transformation function or to the custom preprocessing callable.
- Returns:
The transformation is performed in place on each AnnData object.
- Return type:
None
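As a small illustration of what the arcsinh option computes, the sketch below applies the usual cofactor-scaled inverse hyperbolic sine to a list of raw intensities. The function name arcsinh_transform and the cofactor value of 5.0 are assumptions for the example only; the default used by the underlying transformation function may differ.

```python
import math


def arcsinh_transform(values, cofactor=5.0):
    """Cofactor-scaled arcsinh: roughly linear near zero, logarithmic for large values."""
    return [math.asinh(v / cofactor) for v in values]


raw = [-10.0, 0.0, 5.0, 5000.0]
transformed = arcsinh_transform(raw, cofactor=5.0)
```

Unlike a plain logarithm, the transform is defined for zero and negative intensities, which commonly occur after compensation.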
- perform_data_split(data_split: Tuple[float, float] | Tuple[float, float, float] | pandas.DataFrame = (0.75, 0.25), filename_data_split: str | None = None, **kwargs) None
Split the dataset into train/test or train/validation/test sets.
- Splitting can be done in two ways:
By providing fractions (e.g., (0.7, 0.2, 0.1))
By passing a saved dataframe specifying each sample’s assignment
- Parameters:
data_split (tuple or pd.DataFrame) – Fractions for train/(val)/test or a dataframe with columns 'filename' and 'mode'.
filename_data_split (str or None) – If provided, the split assignment is saved to save_path in CSV format.
**kwargs – Additional parameters passed to scikit-learn’s train_test_split such as 'random_state', 'shuffle', or 'stratify'.
- Returns:
Results stored in train_data_, val_data_, and test_data_.
- Return type:
None
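The fraction-based mode can be pictured as assigning each sample file to one split and recording the result with 'filename' and 'mode' fields. The sketch below does this with the standard library only; the actual method delegates to scikit-learn's train_test_split, so split_filenames, its seed argument, and the rounding rule are assumptions made for illustration.

```python
import random


def split_filenames(filenames, fractions=(0.75, 0.25), seed=0):
    """Assign each filename to 'train', optionally 'val', and 'test'."""
    modes = ("train", "test") if len(fractions) == 2 else ("train", "val", "test")
    shuffled = list(filenames)
    random.Random(seed).shuffle(shuffled)

    rows, start = [], 0
    for i, (mode, frac) in enumerate(zip(modes, fractions)):
        # The last split takes the remainder so every file is assigned exactly once.
        is_last = i == len(modes) - 1
        end = len(shuffled) if is_last else start + round(frac * len(shuffled))
        rows += [{"filename": f, "mode": mode} for f in shuffled[start:end]]
        start = end
    return rows


files = [f"sample_{i}.fcs" for i in range(8)]
assignment = split_filenames(files, (0.75, 0.25))
three_way = split_filenames(files, (0.5, 0.25, 0.25))
```

A table of this shape, saved to disk, is the kind of input the dataframe mode expects when a split needs to be reproduced later.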
- sample_wise_downsampling(data_set: Literal['train', 'val', 'test', 'all'], target_num_events: int | float, stratified: bool = False, label_key: int | str | None = None, label_layer_key: str | None = None) List[numpy.ndarray]
Downsample each sample in the specified dataset.
- Downsampling may be:
Uniform random (no stratification)
Stratified by class labels (requires label_key)
- Parameters:
data_set (Literal['train', 'val', 'test', 'all']) – Which subset to downsample.
target_num_events (int or float) – If ≥1: absolute number of events to retain. If <1: fraction of events to retain.
stratified (bool) – Whether to preserve class proportions via stratified sampling.
label_key (int, str, or None) – Key to labels for stratification (X column index, var name, or obs key).
label_layer_key (str or None) – Layer name if labels are stored in a layer instead of .X.
- Returns:
List of boolean arrays used for downsampling.
- Return type:
list[np.ndarray]
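The interpretation of target_num_events and the boolean-mask return value can be sketched as follows for a single sample. This is a standard-library illustration under stated assumptions: resolve_target and downsampling_mask are hypothetical helpers, and capping the count at the sample size as well as the per-class rounding are choices made for the example.

```python
import random
from collections import defaultdict


def resolve_target(n_events, target_num_events):
    """Turn target_num_events into an absolute count (>= 1: count, < 1: fraction)."""
    if target_num_events >= 1:
        return min(int(target_num_events), n_events)
    return int(round(target_num_events * n_events))


def downsampling_mask(n_events, target_num_events, labels=None, seed=0):
    """Return a boolean mask selecting the retained events of one sample."""
    rng = random.Random(seed)
    mask = [False] * n_events
    if labels is None:
        # Uniform random downsampling.
        keep = rng.sample(range(n_events), resolve_target(n_events, target_num_events))
    else:
        # Stratified: sample within each class so proportions are roughly preserved.
        by_class = defaultdict(list)
        for idx, label in enumerate(labels):
            by_class[label].append(idx)
        fraction = resolve_target(n_events, target_num_events) / n_events
        keep = []
        for indices in by_class.values():
            keep += rng.sample(indices, int(round(fraction * len(indices))))
    for idx in keep:
        mask[idx] = True
    return mask


labels = [0] * 80 + [1] * 20
uniform_mask = downsampling_mask(100, 0.5)
stratified_mask = downsampling_mask(100, 50, labels=labels)
```

In the stratified case, the 80/20 class ratio of the example is preserved among the 50 retained events.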
- check_class_balance(data_set: Literal['train', 'val', 'test', 'all'], label_key: int | str, label_layer_key: str | None = None, filename_class_balance_df: str | None = None) pandas.DataFrame | None
Compute class frequency and counts for a dataset subset.
- Parameters:
data_set (Literal['train', 'val', 'test', 'all']) – Subset to analyze.
label_key (int or str) – Location of labels (X column index, var name, or obs key).
label_layer_key (str or None) – Layer key if labels are stored in a layer instead of .X.
filename_class_balance_df (str or None) – Optional output file for saving the class-balance dataframe to save_path in CSV format.
- Returns:
Dataframe with columns 'count' and 'fraction' and labels in index. None if the specified subset does not exist.
- Return type:
pd.DataFrame or None
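The returned table holds one row per label with its 'count' and 'fraction'. A minimal sketch of that computation on a flat list of labels is shown below; class_balance here is a hypothetical stand-in, and pooling the labels of all samples in the subset before counting is an assumption.

```python
from collections import Counter


def class_balance(labels):
    """Return {label: {'count': n, 'fraction': n / total}} sorted by label."""
    counts = Counter(labels)
    total = sum(counts.values())
    return {
        label: {"count": n, "fraction": n / total}
        for label, n in sorted(counts.items())
    }


balance = class_balance([0, 0, 0, 1])
```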
- static plot_class_balance_df(class_balance_df: pandas.DataFrame, ax: matplotlib.pyplot.Axes | None = None) matplotlib.pyplot.Axes
Plot absolute and relative class frequencies as a bar chart.
- Parameters:
class_balance_df (pd.DataFrame) – Output from check_class_balance containing 'count' and 'fraction' and labels in index.
ax (Axes or None) – Matplotlib axis to plot into. Creates a new figure if None.
- Returns:
The axis containing the plot.
- Return type:
matplotlib.axes.Axes
- get_data_loader(data_set: Literal['train', 'val', 'test', 'all'], channels: List[int] | List[str] | None = None, layer_key: str | None = None, label_key: int | str | None = None, label_layer_key: str | None = None, batch_size: int = -1, shuffle: bool = True, return_data_loader: Literal['np_array', 'torch_tensor'] = 'np_array', on_disk: bool = False, filename_np: str | None = None, **kwargs) torch.utils.data.DataLoader | None
Construct a dataloader for the selected dataset split.
This method concatenates samples, extracts requested channels, appends labels (optional), and returns a PyTorch dataloader that returns either PyTorch Tensors or Numpy arrays.
- Parameters:
data_set (Literal['train', 'val', 'test', 'all']) – Subset from which to load data.
channels (list[int] or list[str] or None) – Which channels (features) to extract. Defaults to all channels.
layer_key (str or None) – Layer key if data should come from a layer instead of .X.
label_key (int, str, or None) – Location of labels: X column index, var name, or obs key. If None, no labels are added.
label_layer_key (str or None) – Layer key if labels are stored in a layer instead of .X.
batch_size (int) – Batch size. -1 loads all data at once.
shuffle (bool) – Whether to shuffle samples each epoch.
return_data_loader (Literal['np_array', 'torch_tensor']) – Output format of the dataloader.
on_disk (bool) – If True, data is first saved to disk as a .npy file and loaded lazily in memory-mapped mode.
filename_np (str or None) – Filename for on-disk storage when on_disk=True.
**kwargs – Additional arguments forwarded to FlowDataLoaders and in turn to the PyTorch DataLoader.
- Returns:
The prepared dataloader, or None if the chosen subset is unavailable.
- Return type:
DataLoader or None
- save_to_numpy_files(data_set: Literal['train', 'val', 'test', 'all'], sample_wise: bool = False, save_path: str | None = None, filename_suffix: str | None = None, channels: List[int] | List[str] | None = None, layer_key: str | None = None, label_key: int | str | None = None, label_layer_key: str | None = None, shuffle: bool = True, precision: Literal['16bit', '32bit', '64bit'] = '32bit')
Export data to .npy files in either combined or per-sample format.
The exported data matrices may optionally include labels and may be stored with user-selected numeric precision. Files are placed in save_path.
- Parameters:
data_set (Literal['train', 'val', 'test', 'all']) – Which subset to export.
sample_wise (bool) – If False: save all data as a single matrix. If True: save one file per sample.
save_path (str or None) – Output directory. Defaults to the manager’s save_path.
filename_suffix (str or None) – Optional suffix appended to output filenames.
channels (list[int] or list[str] or None) – Channels to export; defaults to all.
layer_key (str or None) – Which layer to export; defaults to .X.
label_key (int, str, or None) – If provided, labels are appended or saved separately.
label_layer_key (str or None) – Layer containing labels, if not .X.
shuffle (bool) – Whether to shuffle events before saving.
precision (Literal['16bit', '32bit', '64bit']) – Numeric precision for output arrays.
- Returns:
None
- relabel_data(data_set: Literal['train', 'val', 'test', 'all'], old_to_new_label_mapping: Dict[Any, Any], label_key: int | str, label_layer_key: str | None = None, new_label_key: str = 'new_labels') None
Apply a mapping from old to new labels for all samples in a dataset.
The new labels are always written to .obs[new_label_key] to avoid interference with existing preprocessing, layers, or var-based labels.
- Parameters:
data_set (Literal['train', 'val', 'test', 'all']) – Which data subset to relabel.
old_to_new_label_mapping (dict) – Dictionary mapping old labels to new labels.
label_key (int or str) – Location of original labels (X column index, var name, or obs key).
label_layer_key (str or None) – Layer key if labels are stored in a layer instead of .X.
new_label_key (str) – Name of the new label field added to .obs.
- Returns:
None
export_to_fcs
- flagx.io.export_to_fcs(data_list: List[scanpy.AnnData], layer_key: str | None = None, sample_wise: bool = False, add_columns: List[List[numpy.ndarray]] | None = None, add_columns_names: List[str] | None = None, scale_columns: List[str] | None = None, val_range: Tuple[float, float] = (0.0, 1048576), keep_unscaled: bool = False, save_path: str | None = None, save_filenames: str | List[str] | None = None)
Export one or multiple AnnData objects to FCS files.
This function converts the .X matrix or a specified layer of each AnnData object into FCS-compatible numeric event tables, optionally adds user-provided columns, scales selected columns to a defined numeric range, and writes the result to FCS 3.1 files using the flowio Python package.
Export can be performed sample-wise (one FCS per AnnData) or as a single concatenated FCS containing all samples.
- Parameters:
data_list (List[AnnData]) – List of AnnData objects to export.
layer_key (str or None) – Name of the AnnData layer to use instead of adata.X. If None, the .X matrix is used.
sample_wise (bool) – If True, each AnnData object is exported to its own FCS file. If False, all samples are concatenated into a single FCS file.
add_columns (List[List[np.ndarray]] or None) – Optional list of lists, where add_columns[i] contains one additional column per sample. Each inner list must have the same length as data_list. Columns are appended to the exported DataFrame(s). Example: add_columns = [[col_for_sample0, col_for_sample1, ...], [...], ...]
add_columns_names (List[str] or None) – Names corresponding to each entry in add_columns. Must have the same length as add_columns. Example: add_columns_names = ['UMAP1', 'UMAP2', ...]
scale_columns (List[str] or None) – Column names to rescale into val_range. Scaling is either sample-specific (sample_wise=True) or global across all samples (sample_wise=False). This option exists to improve the display of added columns in classical flow cytometry analysis tools.
val_range (Tuple[float, float]) – Minimum and maximum allowed values in the FCS file. Scaled columns are mapped to this numeric range (with a 5% margin removed at both ends to avoid boundary clipping). Defaults to (0.0, 2**20).
keep_unscaled (bool) – If True, a copy of each scaled column is stored with suffix '_unscaled' before transformation.
save_path (str or None) – Directory in which the FCS files will be written.
save_filenames (str or List[str] or None) – Output filename(s). If sample_wise=True, a list of filenames; otherwise, a single filename. If None, default names are generated.
- Raises:
ValueError – If add_columns and add_columns_names do not match in length, or if add_columns is supplied without column names.
- Outputs:
One or multiple .fcs files written to save_path.
If sample_wise=False, also writes filenames_and_sample_id.csv containing the sample-name-to-ID mapping.
Notes
When sample_wise=False, a 'sample_id' column is automatically added unless already present.
Column scaling is performed as linear min-max scaling.
FCS metadata fields PnR are set according to val_range.
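The linear min-max scaling with a 5% margin can be sketched as below. This is one plausible reading of the description, not the function's actual code: scale_to_range is a hypothetical helper, the margin is assumed to be 5% of the width of val_range at each end, and the handling of constant columns is invented for the example.

```python
def scale_to_range(values, val_range=(0.0, 2 ** 20), margin=0.05):
    """Linearly map values into val_range, leaving a margin at both ends."""
    lo, hi = val_range
    span = hi - lo
    target_lo = lo + margin * span
    target_hi = hi - margin * span
    v_min, v_max = min(values), max(values)
    if v_max == v_min:
        # Constant column: place every value mid-range (assumption).
        return [(target_lo + target_hi) / 2 for _ in values]
    scale = (target_hi - target_lo) / (v_max - v_min)
    return [target_lo + (v - v_min) * scale for v in values]


umap1 = [-3.0, 0.0, 5.0]
scaled = scale_to_range(umap1, val_range=(0.0, 100.0))
```

For sample-specific scaling, the minimum and maximum would be taken per sample; for global scaling, they would be taken over the values of all samples together.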
FlowDataset
- class flagx.io.FlowDataset(*args: Any, **kwargs: Any)
Bases: Dataset
A lightweight dataset wrapper for flow cytometry matrices stored in memory or on disk.
This dataset supports two usage modes:
In-memory: a NumPy array is passed directly.
On-disk: a .npy file path is passed and optionally memory-mapped for efficient loading of large datasets.
If includes_labels=True, the dataset assumes that the last column of the matrix contains integer class labels.
- data
The underlying data matrix.
- Type:
np.ndarray or np.memmap
- on_disk
Whether the dataset is backed by a memory-mapped .npy file or in memory.
- Type:
bool
- includes_labels
Whether the last column contains labels.
- Type:
bool
- label_idx
Column index of the last column, where the labels are stored (only when includes_labels=True).
- Type:
int
- file_path
Path to .npy file when loading from disk.
- Type:
str
- Parameters:
data (str or np.ndarray) – Either a path to a .npy file or a NumPy array containing the data. If a path is given, the file is loaded in the mode determined by on_disk.
on_disk (bool) – If True, the .npy file is memory-mapped instead of fully loaded into RAM. Has no effect when data is already an array.
includes_labels (bool) – If True, the last column of the data matrix is interpreted as label values.
- Raises:
ValueError – If data is neither a file path nor a NumPy array.
ValueError – If includes_labels=True but the provided matrix does not have at least two columns.
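The last-column label convention can be illustrated with a plain-Python stand-in that works on nested lists instead of NumPy arrays. LabeledRows is hypothetical, and returning a (features, label) pair from indexing is an assumption about how such a dataset is typically consumed, not a statement about FlowDataset's return value.

```python
class LabeledRows:
    """Hypothetical stand-in for a dataset whose last column holds labels."""

    def __init__(self, data, includes_labels=False):
        if includes_labels and any(len(row) < 2 for row in data):
            raise ValueError("Need at least two columns when includes_labels=True")
        self.data = data
        self.includes_labels = includes_labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data[idx]
        if self.includes_labels:
            # Split the row into feature columns and the trailing integer label.
            return row[:-1], int(row[-1])
        return row


dataset = LabeledRows([[0.1, 0.2, 1.0], [0.3, 0.4, 0.0]], includes_labels=True)
features, label = dataset[0]
```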
FlowDataLoaders
- class flagx.io.FlowDataLoaders(dataset: torch.utils.data.Dataset, **kwargs)
Bases: object
Wrapper providing two PyTorch dataloaders for a given Dataset:
pytorch_dataloader: standard PyTorch dataloader using default collation.
pytorch_np_dataloader: PyTorch dataloader that returns NumPy arrays instead of tensors via a custom np_collate function.
This class exists to conveniently switch between tensor-based and NumPy-based dataloading pipelines without modifying the underlying dataset or training code.
- dataset
The dataset from which batches are generated.
- Type:
Dataset
- pytorch_dataloader
Standard PyTorch dataloader using default tensor collation.
- Type:
DataLoader
- pytorch_np_dataloader
Dataloader returning NumPy arrays through a custom np_collate function.
- Type:
DataLoader
- Parameters:
dataset (Dataset) – Any PyTorch-compatible dataset instance producing samples or sample-label pairs.
**kwargs – Additional arguments forwarded directly to torch.utils.data.DataLoader, such as batch_size, shuffle, or num_workers.