Source code for labtools.adtools.seqlib

import gzip
from random import sample
import subprocess
from sklearn.utils.random import sample_without_replacement
import csv
import linecache

# fasta reader
def read_fasta(filename):
    """Generator for reading entries in a fasta file.

    Yields 2 lines of a fasta file at a time (name, seq).

    Parameters
    ----------
    filename : str
        Path to fasta or fasta.gz file.

    Yields
    ----------
    (name, seq) : (str, str)
        Name of sequence, biological sequence.

    Examples
    ----------
    >>> for line in read_fasta("example.fasta"):
    ...     name = line[0]
    ...     seq = line[1]
    ...     print(name, seq)
    Geraldine ACGTGCTGAGGCTGCGCTAGCAT
    Gustavo CTGATGCTAGATGCTGATA
    """
    name = None
    seqs = []
    if filename.endswith('.gz'):
        fp = gzip.open(filename, 'rt')
    else:
        fp = open(filename)
    for line in fp:  # iterate lazily instead of loading the whole file with readlines()
        line = line.rstrip()
        if line.startswith('>'):
            # a new header ends the previous record, so emit that record first
            if len(seqs) > 0:
                yield (name, ''.join(seqs))
            name = line[1:]
            seqs = []
        else:
            seqs.append(line)
    if name is not None:  # emit the final record; also guards against empty files
        yield (name, ''.join(seqs))
    fp.close()
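# Usage sketch (illustrative addition, not part of the original module):
# because read_fasta is a generator, an entire fasta can be collected into
# a dict in one pass. The file name is an assumption for demonstration.
def _example_fasta_to_dict(filename="example.fasta"):
    # map each sequence name to its full sequence string
    return {name: seq for name, seq in read_fasta(filename)}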
# fastq reader
# this method opens the entire fastq into memory,
# which is great... as long as the fastq file isn't > your RAM.
# yes, this has happened, so please see the read_fastq_big method below for that case
def read_fastq(filename, subset=None):
    """Generator for reading entries in a fastq file.

    Reads 4 lines of a fastq file at a time (name, seq, +, qual)
    and yields (name, seq, qual).

    Parameters
    ----------
    filename : str
        Path to fastq or fastq.gz file.
    subset : int, optional
        Number of reads to randomly sample from the fastq file.

    Yields
    ----------
    (name, seq, qual) : (str, str, str)
        Tuple of str containing name, seq and quality for entry.

    Examples
    ----------
    >>> for line in read_fastq("example.fastq"):
    ...     name = line[0]
    ...     seq = line[1]
    ...     qual = line[2]
    ...     print(name, seq)
    Geraldine ACGTGCTGAGGCTGCGCTAGCAT
    Gustavo CTGATGCTAGATGCTGATA
    """
    # TODO: add a warning for large fastq files and suggest using read_fastq_big
    if filename.endswith('.gz'):
        fp = gzip.open(filename, 'rt')
    else:
        fp = open(filename)
    lines = fp.readlines()  # loads the entire file into memory
    all_reads = range(0, len(lines), 4)  # index of the first line of each record
    if subset:
        # sample `subset` read indices uniformly without replacement
        subset_indices = sample_without_replacement(len(all_reads), subset)
        subset_reads = [all_reads[i] for i in subset_indices]
    else:
        subset_reads = all_reads
    for num in subset_reads:
        name = lines[num]
        seq = lines[num + 1]
        qual = lines[num + 3]  # lines[num + 2] is the '+' separator line
        yield (name.rstrip(), seq.rstrip(), qual.rstrip())
    fp.close()
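# Usage sketch (illustrative addition, not part of the original module):
# drawing a random subset of reads with the in-memory reader. The file
# name and subset size are assumptions for demonstration.
def _example_subset_seqs(filename="example.fastq", n=100):
    # keep only the sequence from each sampled (name, seq, qual) tuple
    return [seq for name, seq, qual in read_fastq(filename, subset=n)]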
def read_fastq_big(filename, subset=None, progress=True, **kwargs):
    """Generator for a fastq file without opening it into memory.

    Reads 4 lines of a fastq file at a time (name, seq, +, qual) and
    yields (name, seq, qual). Useful in situations where the fastq file
    is large and opening it into RAM would crash the computer. Supports
    subsetting with sklearn's sample_without_replacement().

    Parameters
    ----------
    filename : str
        Path to fastq or fastq.gz file.
    subset : int, optional
        Number of reads to randomly subsample from file.
    progress : bool, optional
        Whether to print progress messages.

    Yields
    ----------
    (name, seq, qual) : (str, str, str)
        Tuple of str containing name, seq and quality for entry.

    Examples
    ----------
    >>> for line in read_fastq_big("example.fastq"):
    ...     name = line[0]
    ...     seq = line[1]
    ...     qual = line[2]
    ...     print(name, seq)
    Geraldine ACGTGCTGAGGCTGCGCTAGCAT
    Gustavo CTGATGCTAGATGCTGATA
    """
    if progress:
        print(f"Opening file {filename.split('/')[-1]} ...", flush=True)
    if filename.endswith('.gz') and subset is not None:
        raise NotImplementedError("No implementation for subsetting from a gzip file, please unzip.")
    elif filename.endswith('.gz'):
        opener = gzip.open(filename, 'rt')
    else:
        opener = open(filename)
    if subset is not None:
        if progress:
            print("Counting reads...", end='\r', flush=True)
        numreads = get_numreads(filename)
        # every 4 lines is a read in fastq, so these are 0-based line offsets
        all_reads = range(0, numreads * 4, 4)
        # a list of read indices to subsample
        subset_indices = sample_without_replacement(len(all_reads), subset)
        # get the actual line offset from the index (aka the index*4)
        subset_reads = [all_reads[i] for i in subset_indices]
        if progress:
            print("Reading lines...", end='\r', flush=True)
        yielded_reads = 0
        for line in subset_reads:
            # linecache line numbers are 1-based, hence the +1/+2/+4 offsets
            name = linecache.getline(filename, line + 1)
            seq = linecache.getline(filename, line + 2)
            qual = linecache.getline(filename, line + 4)
            yield (name.rstrip(), seq.rstrip(), qual.rstrip())
            yielded_reads += 1
        if progress:
            print(f"Total parsed reads = {numreads} with {yielded_reads} yielded", flush=True)
    else:
        with opener as file:
            linenum = 0
            readnum = 0
            if progress:
                print(f"Reading lines... {readnum} reads completed", end='\r', flush=True)
            char_list = ['|', '/', '-', '\\']  # spinner characters for the progress display
            c = 0
            for line in file:
                linenum += 1
                if linenum == 5:
                    linenum = 1  # start of the next 4-line record
                if linenum == 1:
                    name = line
                elif linenum == 2:
                    seq = line
                elif linenum == 3:
                    opt = line  # the '+' separator line; not yielded
                elif linenum == 4:
                    qual = line
                    yield (name.rstrip(), seq.rstrip(), qual.rstrip())
                    readnum += 1
                    if readnum % 100000 == 0:
                        # advance the spinner every 100,000 reads
                        char = char_list[c]
                        c += 1
                        if c == 4:
                            c = 0
                        if progress:
                            print(f"Reading lines {char} {readnum} reads completed", end='\r', flush=True)
            if progress:
                print(f"Total parsed reads = {readnum:,}", flush=True)
    opener.close()
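# Sketch (an assumption, not part of the original module): the gzip + subset
# combination above raises NotImplementedError because linecache needs random
# access to an uncompressed file. One possible workaround is reservoir
# sampling, which subsamples uniformly in a single streaming pass and so
# works on gzip input too, holding only the sampled reads in memory.
from random import randint

def _example_reservoir_subset(filename, subset):
    reservoir = []
    for i, read in enumerate(read_fastq_big(filename, progress=False)):
        if i < subset:
            reservoir.append(read)  # fill the reservoir with the first reads
        else:
            # replace a previous pick with probability subset/(i+1)
            j = randint(0, i)
            if j < subset:
                reservoir[j] = read
    return reservoir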
def get_numreads(filename):
    """Returns number of reads in a fastq or fastq.gz file.

    Parameters
    ----------
    filename : str
        Path to fastq or fastq.gz file.

    Returns
    ----------
    numreads : int
        Number of reads in the fastq file.

    Examples
    ----------
    >>> get_numreads("example.fastq")
    124
    """
    # count lines and divide by 4 rather than grepping for '@': '@' is a
    # valid quality character (Phred+33 score 31) and can appear in, or even
    # start, a quality line, which would inflate a grep-based count
    if filename.endswith(".gz"):
        bashCommand = f"gunzip -c {filename} | wc -l"
    else:
        bashCommand = f"wc -l < {filename}"
    process = subprocess.run(bashCommand, shell=True, capture_output=True, text=True)
    numreads = int(process.stdout) // 4  # 4 lines per fastq record
    return numreads
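# Sketch (an assumption, not part of the original module): a pure-Python
# equivalent of get_numreads that avoids the shell dependency and handles
# .gz files through the same gzip logic used elsewhere in this module.
def _example_numreads_pure(filename):
    opener = gzip.open(filename, 'rt') if filename.endswith('.gz') else open(filename)
    with opener as fp:
        # sum over the iterator so only one line is held in memory at a time
        return sum(1 for _ in fp) // 4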
def get_numreads_old(filename):
    """Returns number of reads in a fastq file by reading it into memory.

    Parameters
    ----------
    filename : str
        Path to fastq or fastq.gz file.

    Returns
    ----------
    numreads : int
        Number of reads in the fastq file.

    Examples
    ----------
    >>> get_numreads_old("example.fastq")
    124
    """
    if filename.endswith(".gz"):
        fp = gzip.open(filename, 'rt')
    else:
        fp = open(filename)
    lines = fp.readlines()  # loads the entire file into memory
    fp.close()
    numreads = len(range(0, len(lines), 4))  # one read per 4 lines
    return numreads
def write_bc_dict(bc_dict, name):
    """Writes bc_dict to a csv.

    Parameters
    ----------
    bc_dict : dict
        Dictionary output from counter.create_map().
    name : str
        Filename for output csv. Ex: "Library1_dictionary"
    """
    # newline='' keeps the csv module from writing extra blank lines on Windows
    with open(name, 'w', newline='') as f:
        w = csv.DictWriter(f, bc_dict.keys())
        w.writeheader()  # keys on the first line
        w.writerow(bc_dict)  # values on the second line
def read_bc_dict(filename):
    """Reads bc_dict from a csv.

    Parameters
    ----------
    filename : str
        Path to csv containing a single dictionary.

    Returns
    ----------
    bc_dict : dict
        Dictionary.
    """
    bc_dict = {}  # fall back to an empty dict if the csv has no data row
    with open(filename, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for d in reader:
            bc_dict = d
    return bc_dict
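# Usage sketch (illustrative addition, not part of the original module): a
# write/read round trip for the barcode dictionary helpers. The dictionary
# contents and output filename are assumptions; note that csv.DictReader
# returns every value as str, so non-string values need converting back.
def _example_bc_dict_roundtrip():
    bc_dict = {"ACGT": "promoter1", "TTGA": "promoter2"}  # hypothetical map
    write_bc_dict(bc_dict, "Library1_dictionary.csv")
    return read_bc_dict("Library1_dictionary.csv")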