from . import cigar as cigar_tools
import pysam
import subprocess
import re
from copy import copy
[docs]class SamRead(pysam.AlignedSegment):
def __init__(self, reference_name=None, next_reference_name=None, alignment_score=None):
self._reference_name = reference_name
self._next_reference_name = next_reference_name
self.alignment_score = alignment_score
def __repr__(self):
return '{}({}:{}, {}, {})'.format(
self.__class__.__name__, self.reference_name, self.reference_start,
self.cigar, self.query_sequence
[docs] @classmethod
def copy(cls, pysamRead):
return cls.copyOnto(pysamRead)
[docs] @classmethod
def copyOnto(cls, pysamRead, copyRead=None):
cp = cls() if copyRead is None else copyRead
cp._reference_name = pysamRead.reference_name
cp.query_sequence = pysamRead.query_sequence
cp.reference_start = pysamRead.reference_start
cp.reference_id = pysamRead.reference_id
cp.cigar = pysamRead.cigar[:]
cp.query_name = pysamRead.query_name
cp.mapping_quality = pysamRead.mapping_quality
cp.flag = pysamRead.flag
if pysamRead.is_paired:
cp.next_reference_id = pysamRead.next_reference_id
cp.next_reference_start = pysamRead.next_reference_start
cp._next_reference_name = pysamRead.next_reference_name
cp.alignment_score = pysamRead.alignment_score
except AttributeError:
return cp
def __copy__(self):
return self.__class__.copy(self)
def reference_name(self):
return self._reference_name
def next_reference_name(self):
return self._next_reference_name
[docs]def breakpoint_pos(read, orient=ORIENT.NS):
assumes the breakpoint is the position following softclipping on the side with more
softclipping (unless and orientation has been specified)
read (:class:`~pysam.AlignedSegment`): the read object
orient (ORIENT): the orientation
int: the position of the breakpoint in the input read
typ, freq = read.cigar[0]
end_typ, end_freq = read.cigar[-1]
if typ != CIGAR.S and end_typ != CIGAR.S:
raise AttributeError('cannot compute breakpoint for a read without soft-clipping', read.cigar)
if orient == ORIENT.NS:
if (typ == CIGAR.S and end_typ == CIGAR.S and freq > end_freq) \
or typ == CIGAR.S and end_typ != CIGAR.S:
# soft clipped to the left
# soft clipped to the right
orient = ORIENT.LEFT
if orient == ORIENT.RIGHT:
if typ != CIGAR.S:
raise AttributeError('soft clipping doesn\'t support input orientation for a breakpoint', repr(orient), read.cigar, read.get_tags())
return read.reference_start
if end_typ != CIGAR.S:
raise AttributeError('soft clipping doesn\'t support input orientation for a breakpoint', orient, read.cigar, read.get_tags())
return read.reference_end - 1
[docs]def calculate_alignment_score(read):
calculates a score for comparing alignments
read (pysam.AlignedSegment): the input read
float: the score
score = 0
qlen = read.reference_end - read.reference_start
max_score = 2 * qlen - 1
for c, v in read.cigar:
if c == CIGAR.M:
raise ValueError('cannot calculate the alignment score if mismatch v match has not been specified')
elif c == CIGAR.EQ:
score += 2 * v - 1
return score / max_score
[docs]def nsb_align(
ref, seq,
given some reference string and a smaller sequence string computes the best non-space-breaking alignment
i.e. an alignment that does not allow for indels (straight-match). Positions in the aligned segments are
given relative to the length of the reference sequence (1-based)
ref (str): the reference sequence
seq (str): the sequence being aligned
weight_of_score (float): when scoring alignments this determines the amount
of weight to place on the cigar match. Should be a number between 0 and 1
min_overlap_percent (float): the minimum amount of overlap of the input sequence to the reference
should be a number between 0 and 1
min_match (float): the minimum number of matches compared to total
scoring_function (callable): any function that will take a read as input and return a float
used in comparing alignments to choose the best alignment
:class:`list` of :class:`~pysam.AlignedSegment`: list of aligned segments
using a higher min_match may improve performance as low quality alignments are rejected more quickly. However
this may also result in no match being returned when there is no high quality match to be found.
ref = str(ref)
if len(ref) < 1 or len(seq) < 1:
raise AttributeError('cannot overlap on an empty sequence')
if min_match < 0 or min_match > 1:
raise AttributeError('min_match must be between 0 and 1')
if min_overlap_percent <= 0 or min_overlap_percent > 1:
raise AttributeError('percent must be greater than 0 and up to 1', min_overlap_percent)
min_overlap = int(round(min_overlap_percent * len(seq), 0))
# store to improve speed and space (don't need to store all alignments)
best_score = (0, 0)
results = []
putative_start_positions = range(min_overlap - len(seq), len(ref) + len(seq) - min_overlap)
if min_consecutive_match > 1:
putative_start_positions = set()
kmers_checked = {}
for i in range(0, len(seq) - min_consecutive_match):
current_kmer = seq[i:i + min_consecutive_match]
if current_kmer in kmers_checked:
putative_start_positions.update([p - i for p in kmers_checked[current_kmer]])
rp = [m.start() for m in re.finditer(current_kmer, ref)]
kmers_checked[current_kmer] = rp
putative_start_positions.update([p - i for p in rp])
for ref_start in putative_start_positions:
score = 0
cigar = []
mismatches = 0
length = len(seq)
for i in range(0, len(seq)):
if length == 0:
r = ref_start + i
if r < 0 or r >= len(ref): # outside the length of the reference seq
cigar.append((CIGAR.S, 1))
length -= 1
if DNA_ALPHABET.match(ref[r], seq[i]):
cigar.append((CIGAR.EQ, 1))
cigar.append((CIGAR.X, 1))
mismatches += 1
if mismatches / length > 1 - min_match:
if length == 0 or mismatches / length > 1 - min_match:
cigar = cigar_tools.join(cigar)
# end mismatches we set as soft-clipped
if cigar[0][0] == CIGAR.X:
cigar[0] = (CIGAR.S, cigar[0][1])
if cigar[-1][0] == CIGAR.X:
cigar[-1] = (CIGAR.S, cigar[-1][1])
qstart = 0 if cigar[0][0] != CIGAR.S else cigar[0][1]
a = pysam.AlignedSegment()
a.query_sequence = str(seq)
a.reference_start = ref_start + qstart
a.cigar = cigar
qlen = a.reference_end - a.reference_start
score = (scoring_function(a), qlen) # this way for equal identity matches we take the longer alignment
if qlen < min_overlap:
if score >= best_score:
best_score = score
results.append((a, score))
filtered = [x for x, y in results if y == best_score]
return filtered
[docs]def sequenced_strand(read, strand_determining_read=2):
determines the strand that was sequenced
read (:class:`~pysam.AlignedSegment`): the read being used to determine the strand
strand_determining_read (int): which read in the read pair is the same as the sequenced strand
STRAND: the strand that was sequenced
ValueError: if strand_determining_read is not 1 or 2
if the input pair is unstranded the information will not be representative of the
strand sequenced since the assumed convention is not followed
if read.is_unmapped or not read.is_paired:
raise ValueError('cannot determine strand if the read is unmapped or unpaired')
strand = None
if strand_determining_read == 1:
if read.is_read1:
strand = STRAND.NEG if read.is_reverse else STRAND.POS
strand = STRAND.NEG if not read.is_reverse else STRAND.POS
elif strand_determining_read == 2:
if read.is_read2:
strand = STRAND.NEG if read.is_reverse else STRAND.POS
strand = STRAND.NEG if not read.is_reverse else STRAND.POS
raise ValueError('unexpected value. Expected 1 or 2, found:', strand_determining_read)
return strand
[docs]def read_pair_type(read):
# check if the read pair is in the expected orientation
assumptions based on illumina pairs: only 4 possible combinations
read (:class:`~pysam.AlignedSegment`): the input read
READ_PAIR_TYPE: the type of input read pair
NotImplementedError: for any read that does not fall into the four expected configurations (see below)
++++> <---- is LR same-strand
++++> ++++> is LL opposite
<---- <---- is RR opposite
<---- ++++> is RL same-strand
reverse = False
if read.reference_id > read.next_reference_id or \
(read.reference_id == read.next_reference_id and read.reference_start > read.next_reference_start):
reverse = True
if not read.is_reverse and read.mate_is_reverse: # LR
return READ_PAIR_TYPE.RL if reverse else READ_PAIR_TYPE.LR
elif not read.is_reverse and not read.mate_is_reverse: # LL opp
elif read.is_reverse and read.mate_is_reverse: # RR opp
elif read.is_reverse and not read.mate_is_reverse: # RL
return READ_PAIR_TYPE.LR if reverse else READ_PAIR_TYPE.RL
raise NotImplementedError('unexpected orientation for pair')
[docs]def orientation_supports_type(read, event_type):
checks if the orientation is compatible with the type of event
read (:class:`~pysam.AlignedSegment`): a read from the pair
event_type (SVTYPE): the type of event to check
- ``True`` - the read pair is in the correct orientation for this event type
- ``False`` - the read is not in the correct orientation
if event_type == SVTYPE.DEL or event_type == SVTYPE.INS:
if read_pair_type(read) != READ_PAIR_TYPE.LR:
return False
elif event_type == SVTYPE.TRANS:
if read_pair_type(read) != READ_PAIR_TYPE.LR and \
read_pair_type(read) != READ_PAIR_TYPE.RL:
return False
elif event_type == SVTYPE.ITRANS or event_type == SVTYPE.INV:
if read_pair_type(read) != READ_PAIR_TYPE.LL and \
read_pair_type(read) != READ_PAIR_TYPE.RR:
return False
elif event_type == SVTYPE.DUP:
if read_pair_type(read) != READ_PAIR_TYPE.RL:
return False
raise ValueError('unexpected event type', event_type)
return True
[docs]def convert_events_to_softclipping(read, orientation, max_event_size, min_anchor_size=None):
given an alignment, simplifies the alignment by grouping everything past the first anchor and including the
first event considered too large and unaligning them turning them into softclipping
if min_anchor_size is None:
min_anchor_size = max_event_size
if orientation == ORIENT.LEFT:
event_size = 0
adjusted_cigar = []
anchor = 0
for state, count in read.cigar:
if state == CIGAR.M:
raise NotImplementedError('match v mismatch must be specified')
elif anchor < min_anchor_size:
if state == CIGAR.EQ:
anchor += count
elif state in EVENT_STATES:
event_size += count
if event_size > max_event_size:
event_size = 0
adjusted_cigar.append((state, count))
if event_size > max_event_size:
while adjusted_cigar[-1][0] in EVENT_STATES:
del adjusted_cigar[-1]
aligned = sum([y for x, y in adjusted_cigar if x in QUERY_ALIGNED_STATES] + [0])
sc = len(read.query_sequence) - aligned
adjusted_cigar.append((CIGAR.S, sc))
read = copy(read)
read.cigar = adjusted_cigar
elif orientation == ORIENT.RIGHT:
# more complicated than left b/c need to also adjust the start position
event_size = 0
anchor = 0
adjusted_cigar = []
for state, count in read.cigar[::-1]: # first event from the right
if state == CIGAR.M:
raise NotImplementedError('match v mismatch must be specified')
elif anchor < min_anchor_size:
if state == CIGAR.EQ:
anchor += count
elif state in EVENT_STATES:
event_size += count
if event_size > max_event_size:
event_size = 0
adjusted_cigar.append((state, count))
if event_size > max_event_size:
while adjusted_cigar[-1][0] in EVENT_STATES:
del adjusted_cigar[-1]
originally_refaligned = sum([y for x, y in read.cigar if x in REFERENCE_ALIGNED_STATES] + [0])
refaligned = sum([y for x, y in adjusted_cigar if x in REFERENCE_ALIGNED_STATES] + [0])
aligned = sum([y for x, y in adjusted_cigar if x in QUERY_ALIGNED_STATES] + [0])
sc = len(read.query_sequence) - aligned
adjusted_cigar = [(CIGAR.S, sc)] + adjusted_cigar[::-1]
read = copy(read)
read.cigar = adjusted_cigar
read.reference_start += originally_refaligned - refaligned
raise ValueError('orientation must be specified', orientation)
return read