Source code for prody.sequence.msa

# -*- coding: utf-8 -*-
"""This module defines MSA analysis functions."""

from numpy import all, zeros, dtype, array, char, cumsum

from .sequence import Sequence, splitSeqLabel

from prody import LOGGER

__all__ = ['MSA', 'refineMSA', 'mergeMSA']

try:
    range = xrange
except NameError:
    pass


[docs]class MSA(object): """Store and manipulate multiple sequence alignments.""" def __init__(self, msa, title='Unknown', labels=None, **kwargs): """*msa* must be a 2D Numpy character array. *labels* is a list of sequence labels (or titles). *mapping* should map label or part of label to sequence index in *msa* array. If *mapping* is not given, one will be build from *labels*.""" try: ndim, dtype_, shape = msa.ndim, msa.dtype, msa.shape except AttributeError: raise TypeError('msa is not a Numpy array') self._aligned = aligned = kwargs.get('aligned', True) if aligned: if ndim != 2: raise ValueError('msa.dim must be 2') if dtype_ != dtype('|S1'): raise ValueError('msa must be a character array') numseq = shape[0] if labels and len(labels) != numseq: raise ValueError('len(labels) must be equal to number of ' 'sequences') self._labels = labels mapping = kwargs.get('mapping') if mapping is None: if labels is not None: # map labels to sequence index self._mapping = mapping = {} for index, label in enumerate(labels): label = splitSeqLabel(label)[0] try: value = mapping[label] except KeyError: mapping[label] = index else: try: value.append(index) except AttributeError: mapping[label] = [value, index] elif mapping: try: mapping['isdict'] except KeyError: pass except Exception: raise TypeError('mapping must be a dictionary') self._mapping = mapping if labels is None: self._labels = [None] * numseq self._msa = msa self._title = str(title) or 'Unknown' self._split = bool(kwargs.get('split', True)) def __str__(self): return 'MSA ' + self._title def __repr__(self): if self._aligned: return ('<MSA: {0} ({1} sequences, {2} residues)>' ).format(self._title, self.numSequences(), self.numResidues()) else: return ('<MSA: {0} ({1} sequences, not aligned)>' ).format(self._title, self.numSequences()) def __len__(self): return len(self._msa) def __getitem__(self, index): if isinstance(index, int): return Sequence(self, index) if isinstance(index, str): try: rows = self._mapping[index] except KeyError: raise KeyError('label {0} is not mapped to a sequence' .format(index)) else: msa = self._msa[rows] if msa.ndim == 1: return Sequence(self, rows) else: if msa.base is not None: msa = msa.copy() labels = self._labels return MSA(msa, title='{0}[{1}]'.format(self._title, index), labels=[labels[i] for i in rows]) elif isinstance(index, tuple): if len(index) == 1: return self[index[0]] elif len(index) == 2: rows, cols = index else: raise IndexError('invalid index: ' + str(index)) else: rows, cols = index, None # handle list of labels if isinstance(rows, list): rows = self.getIndex(rows) or rows elif isinstance(rows, int): return Sequence(self._msa[rows, cols].tostring(), self._labels[rows]) elif isinstance(rows, str): try: rows = self._mapping[rows] except KeyError: raise KeyError('label {0} is not mapped to a sequence' .format(index)) else: if isinstance(rows, int): return Sequence(self._msa[rows, cols].tostring(), self._labels[rows]) if cols is None: msa = self._msa[rows] else: if isinstance(cols, (slice, int)): msa = self._msa[rows, cols] else: try: msa = self._msa[rows].take(cols, 1) except TypeError: raise IndexError('invalid index: ' + str(index)) try: lbls = self._labels[rows] except TypeError: labels = self._labels lbls = [labels[i] for i in rows] else: if not isinstance(lbls, list): lbls = [lbls] if msa.ndim == 0: msa = msa.reshape((1, 1)) elif msa.ndim == 1: msa = msa.reshape((1, len(msa))) if msa.base is not None: msa = msa.copy() return MSA(msa=msa, title=self._title + '\'', labels=lbls, aligned=self._aligned) def __iter__(self): for i in range(len(self._msa)): yield Sequence(self, i) def __contains__(self, key): try: return key in self._mapping except Exception: pass return False def __eq__(self, other): try: other = other._getArray() except AttributeError: return False try: return all(other == self._msa) except Exception: pass return False def _getSplit(self): return self._split def _setSplit(self, split): self._split = bool(split) split = property(_getSplit, _setSplit, doc='Return split label when iterating or indexing.')
[docs] def isAligned(self): """Return **True** if MSA is aligned.""" return self._aligned
[docs] def numSequences(self): """Return number of sequences.""" return self._msa.shape[0]
[docs] def numResidues(self): """Return number of residues (or columns in the MSA), if MSA is aligned.""" if self._aligned: return self._msa.shape[1]
[docs] def numIndexed(self): """Return number of sequences that are indexed using the identifier part or all of their labels. The return value should be equal to number of sequences.""" count = len(self._mapping) if len(self._msa) == count: return count else: count = len(self._mapping) for val in self._mapping.values(): try: count += len(val) - 1 except TypeError: pass return count
[docs] def getTitle(self): """Return title of the instance.""" return self._title
[docs] def setTitle(self, title): """Set title of the instance.""" self._title = str(title)
[docs] def getLabel(self, index, full=False): """Return label of the sequence at given *index*. Residue numbers will be removed from the sequence label, unless *full* is **True**.""" index = self._mapping.get(index, index) if full: return self._labels[index] else: return splitSeqLabel(self._labels[index])[0]
[docs] def getResnums(self, index): """Return starting and ending residue numbers (:term:`resnum`) for the sequence at given *index*.""" index = self._mapping.get(index, index) return splitSeqLabel(self._labels[index])[1:]
[docs] def getArray(self): """Return a copy of the MSA character array.""" return self._msa.copy()
def _getArray(self): """Return MSA character array.""" return self._msa
[docs] def getIndex(self, label): """Return index of the sequence that *label* maps onto. If *label* maps onto multiple sequences or *label* is a list of labels, a list of indices is returned. If an index for a label is not found, return **None**.""" try: index = self._mapping[label] except KeyError: return None except TypeError: mapping = self._mapping indices = [] append, extend = indices.append, indices.extend for key in label: try: index = mapping[key] except KeyError: return None try: extend(index) except TypeError: append(index) return indices else: try: return list(index) except TypeError: return index
[docs] def iterLabels(self, full=False): """Yield sequence labels. By default the part of the label used for indexing sequences is yielded.""" if full: for label in self._labels: yield label else: for label in self._labels: yield splitSeqLabel(label)[0]
[docs] def countLabel(self, label): """Return the number of sequences that *label* maps onto.""" try: return len(self._mapping[label]) except KeyError: return 0 except TypeError: return 1
[docs]def refineMSA(msa, label=None, rowocc=None, seqid=None, colocc=None, **kwargs): """Refine *msa* by removing sequences (rows) and residues (columns) that contain gaps. :arg msa: multiple sequence alignment :type msa: :class:`.MSA` :arg label: remove columns that are gaps in the sequence matching label, ``msa.getIndex(label)`` must return a sequence index, a PDB identifier is also acceptable :type label: str :arg rowocc: row occupancy, sequences with less occupancy will be removed after *label* refinement is applied :type rowocc: float :arg seqid: keep unique sequences at specified sequence identity level, unique sequences are identified using :func:`.uniqueSequences` :type seqid: float :arg colocc: column occupancy, residue positions with less occupancy will be removed after other refinements are applied :type colocc: float :arg keep: keep columns corresponding to residues not resolved in the PDB structure, default is **False**, applies when *label* is a PDB identifier :arg type: bool For Pfam MSA data, *label* is UniProt entry name for the protein. You may also use PDB structure and chain identifiers, e.g. ``'1p38'`` or ``'1p38A'``, for *label* argument and UniProt entry names will be parsed using :func:`.parsePDBHeader` function (see also :class:`.Polymer` and :class:`.DBRef`). The order of refinements are applied in the order of arguments. If *label* and *unique* is specified is specified, sequence matching *label* will be kept in the refined :class:`.MSA` although it may be similar to some other sequence.""" # if msa is a char array, it will be refined but label won't work try: ndim, dtype_ = msa.ndim, msa.dtype except AttributeError: try: arr = msa._getArray() except AttributeError: raise TypeError('msa must be a character array or an MSA instance') ndim, dtype_ = arr.ndim, arr.dtype else: arr, msa = msa, None if dtype('|S1') != dtype_: raise ValueError('msa must be a character array or an MSA instance') if ndim != 2: raise ValueError('msa must be a 2D array or an MSA instance') title = [] cols = None index = None if label is not None: before = arr.shape[1] LOGGER.timeit('_refine') try: upper, lower = label.upper(), label.lower() except AttributeError: raise TypeError('label must be a string') if msa is None: raise TypeError('msa must be an MSA instance, ' 'label cannot be used') index = msa.getIndex(label) if index is None: index = msa.getIndex(upper) if index is None: index = msa.getIndex(lower) chain = None if index is None and (len(label) == 4 or len(label) == 5): from prody import parsePDB try: structure, header = parsePDB(label[:4], header=True) except Exception as err: raise IOError('failed to parse header for {0} ({1})' .format(label[:4], str(err))) chid = label[4:].upper() for poly in header['polymers']: if chid and poly.chid != chid: continue for dbref in poly.dbrefs: if index is None: index = msa.getIndex(dbref.idcode) if index is not None: LOGGER.info('{0} idcode {1} for {2}{3} ' 'is found in chain {3}.'.format( dbref.database, dbref.idcode, label[:4], poly.chid, str(msa))) break if index is None: index = msa.getIndex(dbref.accession) if index is not None: LOGGER.info('{0} accession {1} for {2}{3} ' 'is found in chain {3}.'.format( dbref.database, dbref.accession, label[:4], poly.chid, str(msa))) break if index is not None: chain = structure[poly.chid] if index is None: raise ValueError('label is not in msa, or msa is not indexed') try: len(index) except TypeError: pass else: raise ValueError('label {0} maps onto multiple sequences, ' 'so cannot be used for refinement'.format(label)) title.append('label=' + label) cols = char.isalpha(arr[index]).nonzero()[0] arr = arr.take(cols, 1) LOGGER.report('Label refinement reduced number of columns from {0} to ' '{1} in %.2fs.'.format(before, arr.shape[1]), '_refine') if chain is not None and not kwargs.get('keep', False): before = arr.shape[1] LOGGER.timeit('_refine') from prody.proteins.compare import importBioPairwise2 from prody.proteins.compare import MATCH_SCORE, MISMATCH_SCORE from prody.proteins.compare import GAP_PENALTY, GAP_EXT_PENALTY pw2 = importBioPairwise2() chseq = chain.getSequence() algn = pw2.align.localms(arr[index].tostring().upper(), chseq, MATCH_SCORE, MISMATCH_SCORE, GAP_PENALTY, GAP_EXT_PENALTY, one_alignment_only=1) torf = [] for s, c in zip(*algn[0][:2]): if s == '-': continue elif c != '-': torf.append(True) else: torf.append(False) torf = array(torf) tsum = torf.sum() assert tsum <= before, 'problem in mapping sequence to structure' if tsum < before: arr = arr.take(torf.nonzero()[0], 1) LOGGER.report('Structure refinement reduced number of ' 'columns from {0} to {1} in %.2fs.' .format(before, arr.shape[1]), '_refine') else: LOGGER.debug('All residues in the sequence are contained in ' 'PDB structure {0}.'.format(label)) from .analysis import calcMSAOccupancy, uniqueSequences rows = None if rowocc is not None: before = arr.shape[0] LOGGER.timeit('_refine') try: rowocc = float(rowocc) except Exception as err: raise TypeError('rowocc must be a float ({0})'.format(str(err))) assert 0. <= rowocc <= 1., 'rowocc must be between 0 and 1' rows = calcMSAOccupancy(arr, 'row') >= rowocc if index is not None: index = rows[:index].sum() rows = (rows).nonzero()[0] arr = arr[rows] title.append('rowocc>=' + str(rowocc)) LOGGER.report('Row occupancy refinement reduced number of rows from ' '{0} to {1} in %.2fs.'.format(before, arr.shape[0]), '_refine') if seqid is not None: before = arr.shape[0] LOGGER.timeit('_refine') unique = uniqueSequences(arr, seqid) if index is not None: unique[index] = True unique = unique.nonzero()[0] arr = arr[unique] title.append('seqid>=' + str(seqid)) if rows is not None: rows = rows[unique] else: rows = unique LOGGER.report('Sequence identity refinement reduced number of rows ' 'from {0} to {1} in %.2fs.'.format(before, arr.shape[0]), '_refine') if colocc is not None: before = arr.shape[1] LOGGER.timeit('_refine') try: colocc = float(colocc) except Exception as err: raise TypeError('colocc must be a float ({0})'.format(str(err))) assert 0. <= colocc <= 1., 'colocc must be between 0 and 1' cols = (calcMSAOccupancy(arr, 'col') >= colocc).nonzero()[0] arr = arr.take(cols, 1) title.append('colocc>=' + str(colocc)) LOGGER.report('Column occupancy refinement reduced number of columns ' 'from {0} to {1} in %.2fs.'.format(before, arr.shape[1]), '_refine') if not title: raise ValueError('label, rowocc, colocc all cannot be None') # depending on slicing of rows, arr may not have it's own memory if arr.base is not None: arr = arr.copy() if msa is None: return arr else: if rows is None: from copy import copy labels = copy(msa._labels) mapping = copy(msa._mapping) else: labels = msa._labels labels = [labels[i] for i in rows] mapping = None return MSA(arr, title=msa.getTitle() + ' refined ({0})' .format(', '.join(title)), labels=labels, mapping=mapping)
[docs]def mergeMSA(*msa, **kwargs): """Return an :class:`.MSA` obtained from merging parts of the sequences of proteins present in multiple *msa* instances. Sequences are matched based on protein identifiers found in the sequence labels. Order of sequences in the merged MSA will follow the order of sequences in the first *msa* instance. Note that protein identifiers that map to multiple sequences will be excluded.""" if len(msa) <= 1: raise ValueError('more than one msa instances are needed') try: arrs = [m._getArray() for m in msa] sets = [] for m in msa: aset = set([]) count = m.countLabel for label in m.iterLabels(): if count(label) == 1: aset.add(label) sets.append(aset) except AttributeError: raise TypeError('all msa arguments must be MSA instances') sets = iter(sets) common = next(sets) for aset in sets: common = common.intersection(aset) if not common: return None lens = [m.numResidues() for m in msa] rngs = [0] rngs.extend(cumsum(lens)) rngs = [(start, end) for start, end in zip(rngs[:-1], rngs[1:])] idx_arr_rng = list(zip([m.getIndex for m in msa], arrs, rngs)) merger = zeros((len(common), sum(lens)), '|S1') index = 0 labels = [] mapping = {} for label in msa[0].iterLabels(): if label not in common: continue for idx, arr, (start, end) in idx_arr_rng: merger[index, start:end] = arr[idx(label)] labels.append(label) mapping[label] = index index += 1 merger = MSA(merger, labels=labels, mapping=mapping, title=' + '.join([m.getTitle() for m in msa])) return merger
Read the Docs v: v1.5
Versions
latest
v1.5
Downloads
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.