Source code for seq_tables

"""
**We can use Pandas to analyze aligned sequences in a table. This can be useful for quickly generating AA or NT distribution by position
and accessing specific positions of an aligned sequence**
"""
import gc
import copy
import warnings
import pandas as pd
import math
import numpy as np
try:
    from plotly import graph_objs as go
    plotly_installed = True
except:
    plotly_installed = False
    warnings.warn("PLOTLY not installed so interactive plots are not available. This may result in unexpected funtionality")

try:
    from Bio import SeqIO
    bio_installed = True
except:
    bio_installed = False

from collections import defaultdict


degen_to_base = {
    'GT': 'K',
    'AC': 'M',
    'ACG': 'V',
    'CGT': 'B',
    'AG': 'R',
    'AGT': 'D',
    'A': 'A',
    'CG': 'S',
    'AT': 'W',
    'T': 'T',
    'C': 'C',
    'G': 'G',
    'ACGT': 'N',
    'ACT': 'H',
    'CT': 'Y'
}

dna_alphabet = list('ACTG') + sorted(list(set(sorted(degen_to_base.values())) - set('ACTG'))) + ['-.']
aa_alphabet = list('ACDEFGHIKLMNPQRSTVWYX*Z-.')

amino_acid_color_properties = defaultdict(lambda: {"color": "#f1f2f1", "name": "Unknown"}, {
        'R': {"color": "#1FAABD", "name": "Arginine", "short": "Ala", "charge": 1, "hydropathy": -4.5},
        'H': {"color": "#1FAABD", "name": "Histidine", "short": "His", "charge": 1, "hydropathy": -3.2},
        'K': {"color": "#1FAABD", "name": "Lysine", "short": "Lys", "charge": 1, "hydropathy": -3.9},

        'D': {"color": "#D75032", "name": "Aspartic acid", "short": "Asp", "charge": -1, "hydropathy": -3.5},
        'E': {"color": "#D75032", "name": "Glutamic acid", "short": "Glu", "charge": -1, "hydropathy": -3.5},

        'C': {"color": "#64AD59", "name": "Cysteine", "short": "Cys", "charge": 0, "hydropathy": 2.5},
        'S': {"color": "#64AD59", "name": "Serine", "short": "Ser", "charge": 0, "hydropathy": -0.8},
        'G': {"color": "#64AD59", "name": "Glycine", "short": "Cys", "charge": 0, "hydropathy": -0.4},
        'Y': {"color": "#64AD59", "name": "Tyrosine", "short": "Tyr", "charge": 0, "hydropathy": -0.8},
        'T': {"color": "#64AD59", "name": "Threonine", "short": "Thr", "charge": 0, "hydropathy": -0.7},


        'P': {"color": "#4B3E4D", "name": "Proline", "short": "Pro", "charge": 0, "hydropathy": -1.6},
        'F': {"color": "#4B3E4D", "name": "Phenylalanine", "short": "Phe", "charge": 0, "hydropathy": 2.8},
        'V': {"color": "#4B3E4D", "name": "Valine", "short": "Val", "charge": 0, "hydropathy": 4.2},
        'L': {"color": "#4B3E4D", "name": "Leucine", "short": "Leu", "charge": 0, "hydropathy": 3.8},
        'I': {"color": "#4B3E4D", "name": "Isoleucine", "short": "Ili", "charge": 0, "hydropathy": 4.5},
        'A': {"color": "#4B3E4D", "name": "Alanine", "short": "Ala", "charge": 0, "hydropathy": 1.8},

        'M': {"color": "#E57E25", "name": "Methionine", "short": "Met", "charge": 0, "hydropathy": 1.9},
        'W': {"color": "#E57E25", "name": "Tryptophan", "short": "Trp", "charge": 0, "hydropathy": -0.9},

        'N': {"color": "#92278F", "name": "Asparagine", "short": "Asn", "charge": 0, "hydropathy": -3.5},
        'Q': {"color": "#92278F", "name": "Glutamine", "short": "Pro", "charge": 0, "hydropathy": -3.5},

        'X': {"color": "#f1f2f1", "name": "Unknown", "short": "X", "charge": 0, "hydropathy": 0},
        '*': {"color": "#f1f2f1", "name": "Unknown", "short": "X", "charge": 0, "hydropathy": 0}
    }
)

dna_colors = defaultdict(lambda: {"color": "#f1f2f1", "name": "Unknown"}, {
    'A': {"color": "#1FAABD", "name": "Adenine", "short": "A"},
    'T': {"color": "#D75032", "name": "Thymine", "short": "T"},
    'G': {"color": "#4B3E4D", "name": "Guanine", "short": "G"},
    'C': {"color": "#64AD59", "name": "Cytosine", "short": "C"},
    'X': {"color": "#f1f2f1", "name": "Unknown", "short": "X"},
    'N': {"color": "#f1f2f1", "name": "Unknown", "short": "X"}
})


def strseries_to_bytearray(series, fillvalue):
    max_len = series.apply(len).max()
    series = series.apply(lambda x: x.ljust(max_len, fillvalue))
    seq_as_int = np.array(list(series), dtype='S').view('S1').reshape((series.size, -1)).view('uint8')
    return (series, seq_as_int)


[docs]def draw_seqlogo_barplots(seq_dist, alphabet=None, label_cutoff=0.09, use_properties=True, additional_text = {}, show_consensus=True, scale_by_distance=False, annotation_font_size=14, yaxistitle='', bargap=None, plotwidth=None, num_y_ticks = 3): """ Uses plotly to generate a sequence logo as a series of bar graphs. This method of sequence logo is taken from the following source: Repository: https://github.com/ISA-tools/SequenceLogoVis Paper: E. Maguire, P. Rocca-Serra, S.-A. Sansone, and M. Chen, Redesigning the sequence logo with glyph-based approaches to aid interpretation, In Proceedings of EuroVis 2014, Short Paper (2014) Args: seq_dist (Dataframe): Rows should be unique letters, Columns should be a specific position within the sequence, Values should be the values that represent the frequency OR bits of a specific letter at a specific position alphabet (string): AA or NT label_cutoff (int, default=0.09): Defines a cutoff for when to stop adding 'text' labels to the plot (i.e. dont add labels for low freq letters) use_properties (bool, default=True): If True and if the alphabet is AA then it will color AA by their properties additional_text (list of tuples): For each tuple, element 0 is string/label, element 1 is string of letters at each position. i.e. additional_text = [('text', 'AACCA'), ('MORE', 'ATTTT')]. This is meant to add extra layers of information for the sequence. For example may you would like to include the WT sequence at the bottom show_consensus (bool): If True will show the consensus sequence at the bottom of the graph scale_by_distance (bool): If True, then each x-coordinate of the bar graph will be equal to the column position in the dataframe. For example if you are showing a motif in residues 10, 50, and 90 then the xcoordinates of the bars wil be 10, 50, 90 rather than 1,2,3 annotation_font_size (int): size of text font yaxistitle (string): how to label the y axis bargap (float, default=None): bargap parameter for plots. If None, then lets plotly handle it plotwidth (float, default=None): defines the total width of the plot and individual bars num_y_ticks (int): How many ytick labels should appear in plot Returns: fig (plotly.graph_objs.Figure): plot object for plotting in plotly Examples: >>> from seq_tables import draw_seqlogo_barplots >>> import pandas as pd >>> from plotly.offline import iplot, init_notebook_mode >>> init_notebook_mode() >>> distribution = pd.DataFrame({1: [0.9, 0.1, 0 ,0], 2: [0.5, 0.2, 0.1, 0.2], 3: [0, 0, 0, 1], 4: [0.25, 0.25, 0.25, 0.25]}, index=['A', 'C', 'T', 'G']) >>> plotdata = draw_seqlogo_barplots(distribution) >>> iplot(plotdata) """ seq_dist = seq_dist.copy() if plotly_installed is False: warnings.warn('Cannot generate seq logo plots. Please install plotly') return None data = [] if alphabet is None: letters = list(seq_dist.index) alphabet = 'nt' if len(set(letters) - set(['A', 'C', 'T', 'G', 'N', 'S'])) > 0 else 'aa' annotation = [] if alphabet.lower() == 'nt': colors = {c1: dna_colors[c1]['color'] for c1 in list(dna_colors.keys()) + list(seq_dist.index)} elif alphabet.lower() == 'aa': if use_properties is True: colors = {c1: amino_acid_color_properties[c1]['color'] for c1 in list(amino_acid_color_properties.keys()) + list(seq_dist.index) } labels = seq_dist.columns if scale_by_distance is False: seq_dist = seq_dist.rename(columns = {r: i + 1 for i, r in enumerate(seq_dist.columns)}) max_dist = seq_dist.shape[1] + 1 else: start = min(seq_dist.columns) seq_dist = seq_dist.rename(columns = {r: r - start + 1 for i, r in enumerate(seq_dist.columns)}) max_dist = max(seq_dist.columns) + 1 if plotwidth is None: plotwidth=max(400, ((350/6.0) * seq_dist.shape[1])) cnt = 0 for i in seq_dist.columns: top = 0 l = False if cnt > 0 else True for name, val in seq_dist.loc[:, i].sort_values().iteritems(): top += val data.append(go.Bar(y = [val], x=[i], name=name, marker =dict( color= colors[name], line = dict( color='white', width = 1.50) ), legendgroup = name, showlegend=l ), ) if val > label_cutoff: annotation.append( dict(x=i, y=top, align='center', xanchor='center', yanchor='top', text=name, font=dict(color='white', size=annotation_font_size), showarrow=False) ) cnt += 1 consensus_seq = seq_dist.idxmax() consensus_seq = dict(consensus_seq) starting_y = -0.1 if show_consensus else 0.0 for pos, xc in enumerate(seq_dist.columns): if show_consensus: annotation.append( dict(x=xc, y= -0.1, align='center', xanchor='center', yanchor='top', text=consensus_seq[xc], font=dict(color=colors[consensus_seq[xc]], size=annotation_font_size), showarrow=False) ) for numk, rows in enumerate(additional_text): key = rows[0] textval = rows[1] if pos < len(textval): annotation.append( dict(x=xc, y= -0.1 * (numk + 1) + starting_y, align='center', xanchor='center', yanchor='top', text=textval[pos], font=dict(color=colors[textval[pos]], size=annotation_font_size), showarrow=False) ) if show_consensus: annotation.append( dict(x=-0.1, y= -0.1, align='center', xanchor='right', yanchor='top', text='Consensus', font=dict(color='black', size=14), showarrow=False) ) for numk, rows in enumerate(additional_text): annotation.append( dict(x=-0.1, y= -0.1 * (numk + 1) + starting_y, align='center', xanchor='right', yanchor='top', text=rows[0], font=dict(color='black', size=12), showarrow=False) ) num_y_ticks = max(3, num_y_ticks) miny = 0 # math.floor(seq_dist.min().min()) maxy = math.ceil(seq_dist.max().max()) tick_steps = ((maxy * 1.0) - 0) / (num_y_ticks - 1) tmp = miny tick_vals = [] while tmp <= maxy: tick_vals.append(round(tmp, 2)) tmp += tick_steps layout = go.Layout( barmode='stack', annotations=annotation, yaxis=dict(showgrid=False, rangemode='nonnegative', tickvals=tick_vals, zeroline=False, showline=True, title=yaxistitle, ), xaxis=dict(showgrid=False, rangemode='nonnegative', side='top', showline=False, zeroline=False, ticktext=labels, tickvals = seq_dist.columns), # xaxis2=dict(overlaying='x', side='bottom', tickvals = seq_dist.columns, ticktext = consensus_seq), legend=dict(traceorder='reversed'), width=plotwidth, # margin={'l': 90} ) if bargap is not None: layout['bargap'] = bargap fig = go.Figure(data=data, layout=layout) return fig
[docs]def get_quality_dist(qual_df, bins=None, percentiles=[0.1, 0.25, 0.5, 0.75, 0.9], exclude_null_quality=True, sample=None): """ Returns the distribution of quality across the given sequence, similar to FASTQC quality seq report. Args: bins(list of ints or tuples, default=None): bins defines how to group together the columns/sequence positions when aggregating the statistics. .. note:: bins=None If bins is none, then by default, bins are set to the same ranges defined by fastqc report percentiles (list of floats, default=[0.1, 0.25, 0.5, 0.75, 0.9]): value passed into pandas describe() function. exclude_null_quality (boolean, default=True): do not include quality scores of 0 in the distribution sample (int, default=None): If defined, then we will only calculate the distribution on a random subsampled population of sequences Returns: data (DataFrame): contains the distribution information at every bin (min value, max value, desired precentages and quartiles) graphs (plotly object): contains plotly graph objects for generating plots of the data afterwards Examples: Show the median of the quality at the first ten positions in the sequence >>> table = SeqTable(['AAAAAAAAAA', 'AAAAAAAAAC', 'CCCCCCCCCC'], qualitydata=['6AA9-C9--6C', '6AA!1C9BA6C', '6AA!!C9!-6C']) >>> box_data, graphs = table.get_quality_dist(bins=range(10), percentiles=[0.5]) Now repeat the example from above, except group together all values from the first 5 bases and the next 5 bases i.e. All qualities between positions 0-4 will be grouped together before performing median, and all qualities between 5-9 will be grouped together). Also, return the bottom 10 and upper 90 percentiles in the statsitics >>> box_data, graphs = table.get_quality_dist(bins=[(0,4), (5,9)], percentiles=[0.1, 0.5, 0.9]) We can also plot the results as a series of boxplots using plotly >>> from plotly.offline import init_notebook_mode, iplot, plot, iplot_mpl # assuming ipython.. >>> init_notebook_mode() >>> plotly.iplot(graphs) # using outside of ipython >>> plotly.plot(graphs) """ from collections import OrderedDict if bins is None: # use default bins as defined by fastqc report bins = [ 1, 2, 3, 4, 5, 6, 7, 8, 9, (10, 14), (15, 19), (20, 24), (25, 29), (30, 34), (35, 39), (40, 44), (45, 49), (50, 54), (55, 59), (60, 64), (65, 69), (70, 74), (80, 84), (85, 89), (90, 94), (95, 99), (100, 104), (105, 109), (110, 114), (115, 119), (120, 124), (125, 129), (130, 134), (135, 139), (140, 144), (145, 149), (150, 154), (155, 159), (160, 164), (165, 169), (170, 174), (175, 179), (180, 184), (185, 189), (190, 194), (195, 199), (200, 204), (205, 209), (210, 214), (215, 219), (220, 224), (225, 229), (230, 234), (235, 239), (240, 244), (245, 249), (250, 254), (255, 259), (260, 264), (265, 269), (270, 274), (275, 279), (280, 284), (285, 289), (290, 294), (295, 299), 300 ] bins = [x if isinstance(x, int) else (x[0], x[1]) for x in bins] else: # just in case its a generator (i.e. range function) bins = [x for x in bins] binnames = OrderedDict() for b in bins: if isinstance(b, int): binnames[str(b)] = (b, b) elif len(b) == 2: binnames[str(b[0]) + '-' + str(b[1])] = (b[0], b[1]) def get_binned_cols(column): # use this function to group together columns for n, v in binnames.items(): if column >= v[0] and column <= v[1]: return n temp = qual_df.replace(0, np.nan) if exclude_null_quality else self.qual_table if sample: temp = temp.sample(sample) def agg_fxn(group, per): # use this function to aggregate quality scores and create distributions/boxplots col = group.columns name = str(col[0]) if len(col) == 1 else str(col[0]) + '-' + str(col[-1]) per = [round(p, 2) for p in per] to_add_manually = set([0.10, 0.25, 0.50, 0.75, 0.90]) - set(per) program_added_values = {f: str(int(f * 100)) + '%' for f in to_add_manually} per = per + list(to_add_manually) g = group.stack().describe(percentiles=per) # Now create a small fake distribution for making box plots. This is preferred over just storing all millions of datapoint l = 100 # man this is ugly, gotta clean this up some hwow storevals = [g.loc['min'], g.loc['10%'], g.loc['25%'], g.loc['50%'], g.loc['75%'], g.loc['90%'], g.loc['max']] if g.loc['50%'] < 20: color = 'red' elif g.loc['50%'] < 30: color = 'blue' else: color = 'green' subsets = [int(x) for x in np.arange(0, 1, 0.05) * l] sample_data = np.zeros(l) sample_data[0:subsets[1]] = storevals[1] sample_data[subsets[1]:subsets[3]] = storevals[1] sample_data[subsets[3]:subsets[7]] = storevals[2] sample_data[subsets[7]:subsets[13]] = storevals[3] sample_data[subsets[13]:subsets[17]] = storevals[4] sample_data[subsets[17]:subsets[19]] = storevals[5] sample_data[subsets[19]:] = storevals[5] median = g.loc['50%'] # now only store the values the user wanted g = g.drop(program_added_values.values()) if plotly_installed: plotdata = go.Box( y=sample_data, pointpos=0, name=name, boxpoints=False, fillcolor=color, showlegend=False, line={ 'color': 'black', 'width': 0.7 }, marker=dict( color='rgb(107, 174, 214)', size=3 ) ) else: plotdata = None return (g, plotdata, median) # group results using the aggregation function defined above grouped_data = temp.groupby(get_binned_cols, axis=1).apply(lambda groups: agg_fxn(groups, percentiles)) labels = [b for b in binnames.keys() if b in grouped_data.keys()] data = pd.DataFrame(grouped_data.apply(lambda x: x[0])).transpose()[labels] if plotly_installed is True: graphs = list(grouped_data.apply(lambda x: x[1]).transpose()[labels]) # median_vals = list(grouped_data.apply(lambda x: x[2]).transpose()[labels]) scatter_min = go.Scatter(x=data.columns, y=data.loc['min'], mode='markers', name='min', showlegend=False) # scatter_median = go.Scatter(x=data.columns, y=median_vals, mode='line', name='median', line=dict(shape='spline') scatter_mean = go.Scatter(x=data.columns, y=data.loc['mean'], mode='line', name='mean', line=dict(shape='spline')) graphs.extend([scatter_min, scatter_mean]) else: graphs = None return data, graphs
[docs]class seqtable(): """ Class for viewing aligned sequences within a list or dataframe. This will take a list of sequences and create views such that we can access each position within specific positions. It will also associate quality scores for each base if provided. Args: seqdata (Series, or list of strings): List containing a set of sequences aligned to one another qualitydata (Series or list of quality scores, default=None): If defined, then user is passing in quality data along with the sequences) start (int): Explicitly define where the aligned sequences start with respect to some refernce frame (i.e. start > 2 means sequences start at position 2 not 1) index (list of values defining the index, default=None): .. note::Index=None If None, then the index will result in default integer indexing by pandas. seqtype (string of 'AA' or 'NT', default='NT'): Defines the format of the data being passed into the dataframe phred_adjust (integer, default=33): If quality data is passed, then this will be used to adjust the quality score (i.e. Sanger vs older NGS quality scorning) Attributes: seq_df (Dataframe): Each row in the dataframe is a sequence. It will always contain a 'seqs' column representing the sequences past in. Optionally it will also contain a 'quals' column representing quality scores seq_table (Dataframe): Dataframe representing sequences as characters in a table. Each row in the dataframe is a sequence. Each column represents the position of a base/residue within the sequence. The 4th position of sequence 2 is found as seq_table.ix[1, 4] qual_table (Dataframe, optional): Dataframe representing the quality score for each character in seq_table Examples: >>> sq = seq_tables.seqtable(['AAA', 'ACT', 'ACA']) >>> sq.hamming_distance('AAA') >>> sq = read_fastq('fastqfile.fq') """ def __init__(self, seqdata=None, qualitydata=None, start=1, index=None, seqtype='NT', phred_adjust=33, null_qual='!', **kwargs): self.null_qual = null_qual self.start = start if seqtype not in ['AA', 'NT']: raise Exception('You defined seqtype as, {0}. We only allow seqtype to be "AA" or "NT"'.format(seqtype)) self.seqtype = seqtype self.phred_adjust = phred_adjust self.fillna_val = 'N' if seqtype == 'NT' else 'X' self.loc = seqtable_indexer(self, 'loc') self.iloc = seqtable_indexer(self, 'iloc') self.ix = seqtable_indexer(self, 'ix') if seqdata is not None: self.index = index self._seq_to_table(seqdata) self.qual_table = None if (isinstance(qualitydata, pd.Series) and qualitydata.empty is False) or qualitydata is not None: self.qual_to_table(qualitydata, phred_adjust, return_table=False) def __len__(self): return self.seq_list.shape[0] def slice_object(self, method, params): if method == 'loc': seq_table = self.seq_table.loc[params] qual_table = self.qual_table.loc[params] if self.qual_table is not None else None seq_df = self.seq_df.loc[params] elif method == 'iloc': seq_table = self.seq_table.iloc[params] qual_table = self.qual_table.iloc[params] if self.qual_table is not None else None seq_df = self.seq_df.iloc[params] elif method == 'ix': seq_table = self.seq_table.ix[params] qual_table = self.qual_table.ix[params] if self.qual_table is not None else None seq_df = self.seq_df.ix[params] if isinstance(seq_table, pd.Series): seq_table = pd.DataFrame(seq_table) if isinstance(qual_table, pd.Series): qual_table = pd.DataFrame(qual_table) if isinstance(seq_table, pd.Series): seq_table = pd.DataFrame(seq_table) try: return self.copy_using_template(seq_table, seq_df, qual_table) except: if isinstance(qual_table, pd.DataFrame): return self.copy_using_template(seq_table.transpose(), seq_df.transpose(), qual_table.transpose()) else: return self.copy_using_template(seq_table.transpose(), seq_df.transpose(), None) def __getitem__(self, key): seq_table = self.seq_table.__getitem__(key) return self.copy_using_template(seq_table) def copy_using_template(self, template, template_seqdf=None, template_qual=None): new_member = seqtable(seqtype=self.seqtype, phred_adjust=self.phred_adjust, null_qual=self.null_qual) if template_qual is None and self.qual_table is not None: qual_table = self.qual_table.loc[template.index, template.columns] else: qual_table = None if template_seqdf is None: self.slice_sequences(template.columns) seqs = self.slice_sequences(template.columns).loc[template.index] else: seqs = template_seqdf new_member.seq_df = seqs new_member.seq_table = template new_member.qual_table = qual_table new_member.index = template.index return new_member def view_bases(self, as_dataframe=False): return self.seq_table.values.view('S1') def shape(self): return self.seq_table.shape def __repr__(self): return self.seq_df.__repr__() def __str__(self): return self.seq_table.__str__() def copy(self): return copy.deepcopy(self)
[docs] def subsample(self, numseqs): """ Return a random sample of sequences as a new object Args: numseqs (int): How many sequences to sample Returns: SeqTable Object """ random_sequences = self.seq_df.sample(numseqs) if 'quals' in random_sequences: random_qualities = random_sequences['quals'] else: random_qualities = None return seqtable(random_sequences['seqs'], random_qualities, self.start, index=random_sequences.index, seqtype=self.seqtype, phred_adjust=self.phred_adjust)
[docs] def update_seqdf(self): """ Make seq_df attribute in sync with seq_table and qual_table Sometimes it might be useful to make changes to the seq_table attribute. For example, may you have your own custom code where you change the values of seq_table to be '.' or something random. Well you want to make sure that seq_df updates accordingly because the full length strings are the most useful in the end """ self.seq_df = self.slice_sequences(self.seq_table.columns)
[docs] def qual_to_table(self, qualphred, phred_adjust=33, return_table=False): """ Given a set of quality score strings, updates the return a new dataframe such that each column represents the quality at each position as a number Args: qualphred: (Series or list of quality scores, default=None): If defined, then user is passing in quality data along with the sequences) phred_adjust (integer, default=33): If quality data is passed, then this will be used to adjust the quality score (i.e. Sanger vs older NGS quality scorning) return_table (boolean, default=False): If True, then the attribute self.qual_table is returned Returns: self.qual_table (Dataframe): each row corresponds to a specific sequence and each column corresponds to """ if isinstance(qualphred, list): qual_list = pd.Series(qualphred) else: qual_list = qualphred (qual_list, self.qual_table) = strseries_to_bytearray(qual_list, self.null_qual) self.seq_df['quals'] = list(qual_list) self.qual_table -= self.phred_adjust self.qual_table = pd.DataFrame(self.qual_table, index=self.index, columns=range(self.start, self.qual_table.shape[1] + self.start)) if self.qual_table.shape != self.seq_table.shape: raise Exception("The provided quality list does not match the format of the sequence list. Shape of sequences {0}, shape of quality {1}".format(str(self.seq_table.shape), str(self.qual_table.shape))) return self.qual_table
def _seq_to_table(self, seqlist): """ Given a set of sequences, generates a dataframe such that each column represents a base or residue at each position of the aligned sequences .. important::Private function This function is not for public use """ if isinstance(seqlist, list): seq_list = pd.Series(seqlist) else: seq_list = seqlist (seq_list, self.seq_table) = strseries_to_bytearray(seq_list, self.fillna_val) self.seq_df = pd.DataFrame(list(seq_list), index=self.index, columns=['seqs']) self.seq_table = pd.DataFrame(self.seq_table, index=self.index, columns=range(self.start, self.seq_table.shape[1] + self.start))
[docs] def table_to_seq(self, new_name): """ Return the sequence list """ return self.seq_list
[docs] def compare_to_reference(self, reference_seq, positions=None, ref_start=0, flip=False, set_diff=False, ignore_characters=[], return_num_bases=False): """ Calculate which positions within a reference are not equal in all sequences in dataframe Args: reference_seq (string): A string that you want to align sequences to positions (list, default=None): specific positions in both the reference_seq and sequences you want to compare ref_start (int, default=0): where does the reference sequence start with respect to the aligned sequences flip (bool): If True, then find bases that ARE MISMATCHES(NOT equal) to the reference set_diff (bool): If True, then we want to analyze positions that ARE NOT listed in positions parameters ignore_characters (char or list of chars): When performing distance/finding mismatches, always treat these characters as matches return_num_bases (bool): If true, returns a second argument defining the number of relevant bases present in each row ..important:: Change in output results Setting return_num_bases to true will change how results are returned (two elements rather than one are returned) Returns: Dataframe of boolean variables showing whether base is equal to reference at each position """ reference_seq = reference_seq.upper() compare_column_header = list(self.seq_table.columns) if ref_start < 0: # simple: the reference sequence is too long, so just trim it reference_seq = reference_seq[(-1 * ref_start):] elif ref_start > 0: reference_seq = self.fillna_val * ref_start + reference_seq # more complicated because we need to return results to user in the way they expected. What to do if the poisitions they requested are not # found in reference sequence provided if positions is None: positions = compare_column_header ignore_postions = compare_column_header[ref_start] before_filter = positions positions = [p for p in positions if p >= ref_start] if len(positions) < len(before_filter): warnings.warn("Warning: Because the reference starts at a position after the start of sequences we cannot anlayze the following positions: {0}".format(','.join([_ for _ in before_filter[:ref_start]]))) compare_column_header = compare_column_header[ref_start:] # adjust reference length if len(reference_seq) > self.seq_table.shape[1]: reference_seq = reference_seq[:self.seq_table.shape[1]] elif len(reference_seq) < self.seq_table.shape[1]: reference_seq = reference_seq + self.fillna_val * (self.seq_table.shape[1] - len(reference_seq)) if set_diff is True: # change positions of interest to be the SET DIFFERENCE of positions parameter if positions is None: raise Exception('You cannot analyze the set-difference of all positions. Returns a non-informative answer (no columns to compare)') positions = sorted(list(set(compare_column_header) - set(positions))) else: # determine which columns we should look at if positions is None: ref_cols = [i for i in range(len(compare_column_header))] positions = compare_column_header else: positions = sorted(list(set(positions) & set(compare_column_header))) ref_cols = [i for i, c in enumerate(compare_column_header) if c in positions] # convert reference to numbers # reference_array = np.array(bytearray(reference_seq))[ref_cols] reference_array = np.array([reference_seq], dtype='S').view(np.uint8)[ref_cols] # actually compare distances in each letter (find positions which are equal) diffs = self.seq_table[positions].values == reference_array # if flip is False else self.seq_table[positions].values != reference_array if ignore_characters: if not isinstance(ignore_characters, list): ignore_characters = [ignore_characters] ignore_characters = [ord(let) for let in ignore_characters] # now we have to ignore characters that are equal to specific values ignore_pos = (self.seq_table[positions].values == ignore_characters[0]) | (reference_array == ignore_characters[0]) for chr_p in range(1, len(ignore_characters)): ignore_pos = ignore_pos | (self.seq_table[positions].values == ignore_characters[chr_p]) | (reference_array == ignore_characters[chr_p]) # now adjust boolean results to ignore any positions == ignore_characters diffs = (diffs | ignore_pos) # if flip is False else (diffs | ignore_pos) if flip: diffs = ~diffs if return_num_bases: if ignore_characters: num_bases = len(positions) - ignore_pos.sum(axis=1) else: num_bases = len(positions) return pd.DataFrame(diffs, index=self.seq_table.index, dtype=bool, columns=positions), num_bases else: return pd.DataFrame(diffs, index=self.seq_table.index, dtype=bool, columns=positions)
[docs] def hamming_distance(self, reference_seq, positions=None, ref_start=0, set_diff=False, ignore_characters=[], normalized=False): """ Determine hamming distance of all sequences in dataframe to a reference sequence. Args: reference_seq (string): A string that you want to align sequences to positions (list, default=None): specific positions in both the reference_seq and sequences you want to compare ref_start (int, default=0): where does the reference sequence start with respect to the aligned sequences set_diff (bool): If True, then we want to analyze positions that ARE NOT listed in positions parameters normalized (bool): If True, then divides hamming distance by the number of relevant bases """ if normalized is True: diffs, bases = self.compare_to_reference(reference_seq, positions, ref_start, flip=True, set_diff=set_diff, ignore_characters=ignore_characters, return_num_bases=True) return pd.Series(diffs.values.sum(axis=1).astype(float) / bases, index=diffs.index) else: diffs = self.compare_to_reference(reference_seq, positions, ref_start, flip=True, set_diff=set_diff, ignore_characters=ignore_characters) return pd.Series(diffs.values.sum(axis=1), index=diffs.index) # columns=c1, index=ind1)
[docs] def quality_filter(self, q, p, inplace=False, ignore_null_qual=True): """ Filter out sequences based on their average qualities at each base/position Args: q (int): quality score cutoff p (int/float/percent 0-100): the percent of bases that must have a quality >= the cutoff q inplace (boolean): If False, returns a copy of the object filtered by quality score ignore_null_qual (boolean): Ignore bases that are not represented. (i.e. those with quality of 0) """ if self.qual_table is None: raise Exception("You have not passed in any quality data for these sequences") meself = self if inplace is True else copy.deepcopy(self) total_bases = (meself.qual_table.values > (ord(self.null_qual) - self.phred_adjust)).sum(axis=1) if ignore_null_qual else meself.qual_table.shape[1] percent_above = (100 * ((meself.qual_table.values >= q).sum(axis=1))) / total_bases meself.qual_table = meself.qual_table[percent_above >= p] meself.seq_table = meself.seq_table.loc[meself.qual_table.index] meself.seq_df = meself.seq_df.loc[meself.qual_table.index] # bases = meself.seq_table.shape[1] self.shape = self.seq_table.shape if inplace is False: return meself
[docs] def convert_low_bases_to_null(self, q, replace_with='N', inplace=False): """ This will convert all letters whose corresponding quality is below a cutoff to the value replace_with Args: q (int): quality score cutoff, convert all bases whose quality is < than q inplace (boolean): If False, returns a copy of the object filtered by quality score replace_with (char): a character to replace low bases with """ if self.qual_table is None: raise Exception("You have not passed in any quality data for these sequences") meself = self if inplace is True else self.copy() replace_with = ord(replace_with) if replace_with is not None else ord('N') if self.seqtype == 'NT' else ord('X') meself.seq_table.values[meself.qual_table.values < q] = replace_with chars = self.seq_table.shape[1] meself.seq_df['seqs'] = list(meself.seq_table.values.copy().view('S' + str(chars)).ravel()) if inplace is False: return meself
def slice_sequences(self, positions, name='seqs', return_quality=False, empty_chars=None): if empty_chars is None: empty_chars = self.fillna_val positions = [p for p in positions] num_chars = len(positions) # confirm that all positions are present in the column missing_pos = set(positions) - set(self.seq_table.columns) if len(missing_pos) > 0: new_positions = [p for p in positions if p in self.seq_table.columns] prepend = ''.join([empty_chars for p in positions if p < self.seq_table.columns[0]]) append = ''.join([empty_chars for p in positions if p > self.seq_table.columns[-1]]) positions = new_positions num_chars = len(positions) warnings.warn("The sequences do not cover all positions requested. {0}'s will be appended and prepended to sequences as necessary".format(empty_chars)) else: prepend = '' append = '' if positions == []: if return_quality: qual_empty = '!' * (len(prepend) + len(append)) return pd.DataFrame({'seqs': prepend + append, 'quals': qual_empty}, columns=['seqs', 'quals'], index=self.index) else: return pd.DataFrame(prepend + append, columns=['seqs'], index=self.index) substring = pd.DataFrame({name: self.seq_table.loc[:, positions].values.copy().view('S{0}'.format(num_chars)).ravel()}, index=self.index) if prepend or append: substring['seqs'] = substring['seqs'].apply(lambda x: prepend + x + append) if self.qual_table is not None and return_quality: subquality = self.qual_table.loc[:, positions].values subquality = (subquality + self.phred_adjust).copy().view('S{0}'.format(num_chars)).ravel() substring['quals'] = subquality if prepend or append: prepend = '!' * len(prepend) append = '!' * len(append) substring['quals'] = substring['quals'].apply(lambda x: prepend + x + append) return substring
[docs] def get_seq_dist(self, positions=None): """ Returns the distribution of bases or amino acids at each position. """ compare = self.seq_table.loc[:, positions] if positions else self.seq_table dist = compare.apply(pd.value_counts).fillna(0) dist.rename({c: chr(c) for c in list(dist.index)}, inplace=True) return dist
[docs] def get_consensus(self, positions=None, modecutoff=0.5): """ Returns the sequence consensus of the bases at the defined positions Args: positions: Slice which positions in the table should be conidered modecutoff: Only report the consensus base of letters which appear more than the provided modecutoff (in other words, the mode must be greater than this frequency) """ compare = self.seq_table.loc[:, positions] if positions else self.seq_table cutoff = float(compare.shape[0]) * modecutoff chars = compare.shape[1] dist = np.int8(compare.apply(lambda x: x.mode()).values[0]) dist[(compare.values == dist).sum(axis=0) <= cutoff] = ord('N') seq = dist.view('S' + str(chars))[0] return seq
def seq_logo(self): pass
[docs] def get_quality_dist(self, bins=None, percentiles=[0.1, 0.25, 0.5, 0.75, 0.9], exclude_null_quality=True, sample=None): """ Returns the distribution of quality across the given sequence, similar to FASTQC quality seq report. Args: bins(list of ints or tuples, default=None): bins defines how to group together the columns/sequence positions when aggregating the statistics. .. note:: bins=None If bins is none, then by default, bins are set to the same ranges defined by fastqc report percentiles (list of floats, default=[0.1, 0.25, 0.5, 0.75, 0.9]): value passed into pandas describe() function. exclude_null_quality (boolean, default=True): do not include quality scores of 0 in the distribution sample (int, default=None): If defined, then we will only calculate the distribution on a random subsampled population of sequences Returns: data (DataFrame): contains the distribution information at every bin (min value, max value, desired precentages and quartiles) graphs (plotly object): contains plotly graph objects for generating plots of the data afterwards Examples: Show the median of the quality at the first ten positions in the sequence >>> table = SeqTable(['AAAAAAAAAA', 'AAAAAAAAAC', 'CCCCCCCCCC'], qualitydata=['6AA9-C9--6C', '6AA!1C9BA6C', '6AA!!C9!-6C']) >>> box_data, graphs = table.get_quality_dist(bins=range(10), percentiles=[0.5]) Now repeat the example from above, except group together all values from the first 5 bases and the next 5 bases i.e. All qualities between positions 0-4 will be grouped together before performing median, and all qualities between 5-9 will be grouped together). Also, return the bottom 10 and upper 90 percentiles in the statsitics >>> box_data, graphs = table.get_quality_dist(bins=[(0,4), (5,9)], percentiles=[0.1, 0.5, 0.9]) We can also plot the results as a series of boxplots using plotly >>> from plotly.offline import init_notebook_mode, iplot, plot, iplot_mpl # assuming ipython.. >>> init_notebook_mode() >>> plotly.iplot(graphs) # using outside of ipython >>> plotly.plot(graphs) """ return get_quality_dist(self.qual_table, bins, percentiles, exclude_null_quality, sample)
[docs]def read_fastq(input_file, limit=None, chunk_size=10000, use_header_as_index=True, use_pandas=True): """ Load a fastq file as class SeqTable """ def group_fastq(index): return index % 4 if use_pandas: grouped = pd.read_csv(input_file, sep='\n', header=None).groupby(group_fastq) header = list(grouped.get_group(0)[0].apply(lambda x: x[1:])) seqs = list(grouped.get_group(1)[0]) quals = list(grouped.get_group(3)[0]) else: if bio_installed is False: raise Exception("You do not have BIOPYTHON installed and therefore must use pandas to read the fastq (set use_pandas parameter to True). If you would like to use biopython then please install") seqs = [] quals = [] header = [] for seq in SeqIO.parse(input_file, 'fastq'): r = seq.format('fastq').split('\n') header.append(r[0]) seqs.append(r[1]) quals.append(r[3]) return seqtable(seqs, quals, index=header, seqtype='NT')
[docs]def read_sam(input_file, limit=None, chunk_size=100000, cleave_softclip=False, use_header_as_index=True): """ Load a SAM file into class SeqTable """ skiplines = 0 with open(input_file) as r: for i in r: if i[0] == '@': skiplines += 1 else: break cols_to_use = [9, 10] if use_header_as_index: cols_to_use.append(0) index_col = 0 else: index_col = None if cleave_softclip: cols_to_use.append(5) df = pd.read_csv(input_file, sep='\t', header=None, index_col=index_col, usecols=cols_to_use, skiprows=skiplines) index = df.index return seqtable(df[9], df[10], index, 'NT')
class seqtable_indexer(): def __init__(self, obj, method): self.obj = obj self.method = method def __getitem__(self, key): return self.obj.slice_object(self.method, key)