# -*- coding: utf-8 -*-
"""This module defines functions and classes for parsing, manipulating, and
analyzing multiple sequence alignments."""
__author__ = 'Anindita Dutta, Ahmet Bakan'
from os.path import isfile, splitext, split, getsize
from numpy import array, fromstring, empty
from .sequence import splitSeqLabel, Sequence
from prody import LOGGER
from prody.utilities import openFile
__all__ = ['MSAFile', 'splitSeqLabel', 'parseMSA', 'writeMSA']
FASTA = 'FASTA'
SELEX = 'SELEX'
STOCKHOLM = 'Stockholm'
MSAFORMATS = {
FASTA.lower(): FASTA,
SELEX.lower(): SELEX,
STOCKHOLM.lower(): STOCKHOLM,
}
MSAEXTMAP = {
FASTA: '.fasta',
SELEX: '.slx',
STOCKHOLM: '.sth',
FASTA.lower(): '.fasta',
SELEX.lower(): '.slx',
STOCKHOLM.lower(): '.sth',
'.sth': STOCKHOLM,
'.slx': SELEX,
'.fasta': FASTA
}
WSJOIN = ' '.join
ESJOIN = ''.join
NUMLINES = 1000
LEN_FASTA_LINE = 60
LEN_SELEX_LABEL = 31
try:
range = xrange
except NameError:
pass
[docs]class MSAFile(object):
"""Handle MSA files in FASTA, SELEX and Stockholm formats."""
def __init__(self, msa, mode='r', format=None, aligned=True, **kwargs):
"""*msa* may be a filename or a stream. Multiple sequence alignments
can be read from or written in FASTA (:file:`.fasta`), Stockholm
(:file:`.sth`), or SELEX (:file:`.slx`) *format*. For spesified
extensions, *format* argument is not needed. If *aligned* is
**True**, unaligned sequences in the file or stream will cause an
:exc:`IOError` exception. *filter*, a function that returns a
boolean, can be used for filtering sequences, see :meth:`setFilter`
for details. *slice* can be used to slice sequences, and is applied
after filtering, see :meth:`setSlice` for details."""
if mode[0] not in 'rwa':
raise ValueError("mode string must be one of 'r', 'w', or 'a', "
"not {0}".format(repr(mode)))
if 'b' in mode:
mode = mode.replace('b', '')
if 't' not in mode:
mode += 't'
self._format = None
if format is not None:
try:
self._format = format = MSAFORMATS[format.lower()]
except AttributeError:
raise TypeError('format argument must be a string')
except KeyError:
raise ValueError('format argument is not recognized')
self._filename = filename = None
if mode.startswith('r'):
try:
torf = isfile(msa)
except:
pass
else:
if torf:
self._filename = filename = msa
else:
try:
msa.lower, msa.strip
except AttributeError:
pass
else:
self._filename = filename = msa
if filename is not None:
self._filename = filename
title, ext = splitext(split(filename)[1])
if ext.lower() == '.gz':
title, ext = splitext(split(title)[1])
if format is None:
try:
self._format = format = MSAEXTMAP[ext.lower()]
except KeyError:
raise TypeError('format is not specified and could not be '
'determined from file extension')
self._title = title
self._stream = openFile(msa, mode)
else:
if self._format is None:
raise ValueError('format must be specified when msa is a '
'stream')
self._stream = msa
self._title = 'stream'
try:
closed = self._stream.closed
except AttributeError:
closed = self._stream.myfileobj.closed
if closed:
raise ValueError('msa stream must not be closed')
self._lenseq = None
self._closed = False
self._readline = None
self._aligned = bool(aligned)
if mode.startswith('r'):
self._readline = self._stream.readline
try:
self._readlines = self._stream.readlines
except AttributeError:
pass
self.setFilter(kwargs.get('filter', None),
kwargs.get('filter_full', False))
self.setSlice(kwargs.get('slice', None))
self._iter = self._itermap[format](self)
else:
try:
self._write = write = self._stream.write
except AttributeError:
raise TypeError('msa must be a filename or a stream with '
'write method')
if mode.startswith('w') and format == STOCKHOLM:
write('# STOCKHOLM 1.0\n')
if format.startswith('S'):
self._selex_line = '{0:' + str(LEN_SELEX_LABEL) + 's} {1}\n'
self._mode = mode
def __del__(self):
self.close()
def __iter__(self):
if not self._mode.startswith('r'):
raise IOError('File not open for reading')
filter = self._filter
slicer = self._slicer
if filter is None:
for seq, label in self._iter:
yield Sequence(slicer(seq), label)
else:
if self._filter_full:
for seq, label in self._iter:
if filter(label, seq):
yield Sequence(slicer(seq), label)
else:
for seq, label in self._iter:
if filter(splitSeqLabel(label)[0], seq):
yield Sequence(slicer(seq), label)
def __str__(self):
return 'MSAFile ' + self._title
def __repr__(self):
if self._closed:
return '<MSAFile: {0} ({1}; closed)>'.format(self._title,
self._format)
else:
return ('<MSAFile: {0} ({1}; mode {2})>'
).format(self._title, self._format, repr(self._mode))
def __enter__(self):
return self
def __exit__(self, type, value, tb):
self.close()
def _readlines(self, size=None):
"""Read multiple lines, in case stream does not have this method."""
if self._closed:
raise ValueError('I/O operation on closed file')
import sys
size = size or sys.maxint
lines = []
append = lines.append
readline = self._stream.readline
for i in range(size):
line = readline()
if line:
append(line)
else:
break
return lines
[docs] def close(self):
"""Close the file. This method will not affect a stream."""
if self._filename is None:
self._closed = True
return
if not self._mode.startswith('r') and self._format == STOCKHOLM:
try:
self._write('//\n')
except ValueError:
LOGGER.info('Failed to write terminal slash characters to '
'closed file.')
try:
self._stream.close()
except Exception:
pass
self._closed = True
def _isClosed(self):
return self._closed
closed = property(_isClosed, None, doc="True for closed file.")
def _getFormat(self):
"""Return format of the MSA file."""
return self._format
format = property(_getFormat, None, doc="Format of the MSA file.")
[docs] def reset(self):
"""Return to the beginning of the file."""
self._readline()
self._iterator = self._iter()
[docs] def isAligned(self):
"""Return **True** if MSA is aligned."""
return self._aligned
def _iterBio(self):
"""Yield sequences from a Biopython MSA object."""
aligned = self._aligned
lenseq = self._lenseq
numseq = 0
for record in self._bio:
label, seq = record.id, str(record.seq)
if not lenseq:
self._lenseq = lenseq = len(seq)
if aligned and lenseq != len(seq):
raise IOError('sequence for {0} does not have '
'expected length {1}'
.format(label, lenseq))
numseq += 1
yield seq, label
def _iterFasta(self):
"""Yield sequences from a file or stream in FASTA format."""
aligned = self._aligned
lenseq = self._lenseq
temp = []
label = ''
lines = []
while not label.startswith('>'):
if not lines:
lines = self._readlines(NUMLINES)
label = lines.pop(0)
label = label[1:].strip()
while lines:
for line in lines:
if line.startswith('>'):
seq = ESJOIN(temp)
if not lenseq:
self._lenseq = lenseq = len(seq)
if aligned and lenseq != len(seq):
raise IOError('sequence for {0} does not have '
'expected length {1}'
.format(label, lenseq))
yield seq, label
temp = []
label = line[1:].strip()
else:
temp.append(line.strip())
lines = self._readlines(NUMLINES)
yield ESJOIN(temp), label
def _iterSelex(self):
"""Yield sequences from an MSA file in Stockholm/SELEX format."""
aligned = self._aligned
lenseq = self._lenseq
readlines = self._readlines
lines = readlines(NUMLINES)
while lines:
for line in lines:
ch = line[0]
if ch == '#' or ch == '/':
continue
items = line.split()
if len(items) == 2:
label = items[0]
seq = items[1]
else:
label = WSJOIN(items[:-1])
seq = items[-1]
if not lenseq:
self._lenseq = lenseq = len(seq)
if aligned and lenseq != len(seq):
raise IOError('sequence for {0} does not have '
'expected length {1}'
.format(label, lenseq))
yield seq, label
lines = readlines(NUMLINES)
_itermap = {
FASTA: _iterFasta,
SELEX: _iterSelex,
STOCKHOLM: _iterSelex,
}
[docs] def getTitle(self):
"""Return title of the instance."""
return self._title
[docs] def setTitle(self, title):
"""Set title of the instance."""
self._title = str(title)
[docs] def getFilename(self):
"""Return filename, or **None** if instance is handling a stream."""
return self._filename
[docs] def getFilter(self):
"""Return function used for filtering sequences."""
return self._filter
[docs] def setFilter(self, filter, filter_full=False):
"""Set function used for filtering sequences. *filter* will be applied
to split sequence label, by default. If *filter_full* is **True**,
filter will be applied to the full label. """
self._filter_full = bool(filter_full)
if filter is None:
self._filter = None
return
if not callable(filter):
raise TypeError('filter must be callable')
try:
result = filter('TEST_TITLE', 'SEQUENCE-WITH-GAPS')
except Exception as err:
raise TypeError('filter function must not raise exceptions, '
'e.g. ' + str(err))
else:
try:
result = result or not result
except Exception as err:
raise ValueError('filter function must return a boolean, '
'e.g. ' + str(err))
self._filter = filter
[docs] def getSlice(self):
"""Return object used to slice sequences."""
return self._slice
[docs] def setSlice(self, slice):
"""Set object used to *slice* sequences, which may be a :func:`slice`
or a :func:`list` of numbers."""
if slice is None:
self._slice = None
self._slicer = lambda seq: seq
else:
seq = 'SEQUENCE' * 1000
try:
seq[slice]
except Exception:
arr = fromstring(seq, '|S1')
try:
arr[slice]
except Exception:
raise TypeError('invalid slice: ' + repr(slice))
else:
self._slice = slice
self._slicer = lambda seq, slc=slice: fromstring(seq,
'|S1')[slc].tostring()
else:
self._slice = slice
self._slicer = lambda seq, slc=slice: seq[slc]
[docs] def write(self, seq):
"""Write *seq*, an :class:`.Sequence` instance, into the MSA file."""
if self._closed:
raise ValueError('I/O operation on closed file')
write = self._write
try:
label, sequence = seq.getLabel(True), str(seq)
except AttributeError:
raise TypeError('seq must be a Sequence instance')
if self._lenseq is None:
lenseq = self._lenseq = len(sequence)
else:
lenseq = self._lenseq
if self._aligned and lenseq != self._lenseq:
raise ValueError('writing an aligned MSA file, '
'len(sequence) must be ' + str(lenseq))
if self._format == FASTA:
write('>')
write(label)
write('\n')
beg = 0
end = LEN_FASTA_LINE
lenseq = len(sequence)
while beg < lenseq:
write(sequence[beg:end])
write('\n')
beg += LEN_FASTA_LINE
end += LEN_FASTA_LINE
else:
write(self._selex_line.format(label, sequence))
[docs]def parseMSA(filename, **kwargs):
"""Return an :class:`.MSA` instance that stores multiple sequence alignment
and sequence labels parsed from Stockholm, SELEX, or FASTA format
*filename* file, which may be a compressed file. Uncompressed MSA files
are parsed using C code at a fraction of the time it would take to parse
compressed files in Python."""
from .msa import MSA
try:
fileok = isfile(filename)
except TypeError:
raise TypeError('filename must be a string')
else:
if not fileok:
raise IOError('[Errno 2] No such file or directory: ' +
repr(filename))
# if MSA is a compressed file or filter/slice is passed, use
# Python parsers
LOGGER.timeit('_parsemsa')
title, ext = splitext(filename)
title = split(title)[1]
aligned = kwargs.get('aligned', True)
if (ext.lower() == '.gz' or 'filter' in kwargs or 'slice' in kwargs or
not aligned):
if ext.lower() == '.gz':
title = splitext(title)[0]
msa = MSAFile(filename, split=False, **kwargs)
seqlist = []
sappend = seqlist.append
labels = []
lappend = labels.append
mapping = {}
maxlen = 0
for i, seq in enumerate(msa):
label = seq.getLabel(True)
lappend(label)
if aligned:
sappend(seq._array)
else:
if len(seq) > maxlen:
maxlen = len(seq)
sappend(seq)
key = splitSeqLabel(label)[0]
if key in mapping:
try:
mapping[key].append(i)
except AttributeError:
mapping[key] = [mapping[key], i]
else:
mapping[key] = i
if not seqlist:
LOGGER.warn('No sequences were parsed from {0}.'.format(filename))
return
if aligned:
msaarr = array(seqlist, '|S1')
else:
msaarr = array(seqlist, '|S' + str(maxlen))
else:
filesize = getsize(filename)
format = MSAEXTMAP.get(splitext(filename)[1])
if format == FASTA:
from .msaio import parseFasta as parser
elif format == SELEX or format == STOCKHOLM:
from .msaio import parseSelex as parser
else:
raise IOError('MSA file format is not recognized from the '
'extension')
msaarr = empty(filesize, '|S1')
msaarr, labels, mapping, lcount = parser(filename, msaarr)
if lcount != len(msaarr):
LOGGER.warn('Failed to parse {0} sequence labels.'
.format(len(msaarr) - lcount))
msa = MSA(msa=msaarr, title=title, labels=labels, mapping=mapping,
aligned=aligned)
if aligned:
LOGGER.report('{0} sequence(s) with {1} residues were parsed in '
'%.2fs.'.format(*msaarr.shape), '_parsemsa')
else:
LOGGER.report('{0} sequence(s) were parsed in %.2fs.'
.format(*msaarr.shape), '_parsemsa')
return msa
[docs]def writeMSA(filename, msa, **kwargs):
"""Return *filename* containing *msa*, a :class:`.MSA` or :class:`.MSAFile`
instance, in the specified *format*, which can be *SELEX*, *Stockholm*, or
*FASTA*. If *compressed* is **True** or *filename* ends with :file:`.gz`,
a compressed file will be written. :class:`.MSA` instances will be written
using C function into uncompressed files."""
fntemp, ext = splitext(filename)
ext = ext.lower()
compressed = kwargs.get('compressed', ext == '.gz')
if compressed and ext != '.gz':
filename += '.gz'
format = kwargs.get('format', None)
if format:
try:
format = MSAFORMATS[format.lower()]
except KeyError:
raise ValueError('format {0} is not recognized'
.format(repr(format)))
else:
if ext == '.gz':
ext = splitext(fntemp)[1].lower()
try:
format = MSAEXTMAP[ext]
except KeyError:
raise ValueError('format is not specified, and file extension '
'{0} is not recognized'.format(repr(ext)))
fast = False
try:
seqarr, _, _ = msa._getArray(), msa.numResidues(), msa.numSequences()
except AttributeError:
try:
msa.getFormat(), msa.getFilename(), msa.getFilter()
except AttributeError:
raise ValueError('msa must be an MSA or MSAFile instance, not '
.format(type(msa).__name__))
else:
seqiter = msa
else:
seqiter = msa
fast = True
if not fast or compressed:
with MSAFile(filename, 'w', format=format) as out:
write = out.write
[write(seq) for seq in seqiter]
else:
from prody.utilities import backupFile
backupFile(filename)
if format == FASTA:
from .msaio import writeFasta
writeFasta(filename, msa._labels, seqarr,
kwargs.get('line_length', LEN_FASTA_LINE))
else:
from .msaio import writeSelex
writeSelex(filename, msa._labels, seqarr,
stockholm=format != SELEX,
label_length=kwargs.get('label_length',
LEN_SELEX_LABEL))
return filename