import sys
try:
import wx
hasWx = True
#Check if wx is the newest 3.0+ version:
try:
from wx.lib.pubsub import pub
pub.subscribe
newWx = True
except AttributeError as e:
from wx.lib.pubsub import Publisher as pub
newWx = False
except Exception as e:
hasWx = False
newWx = False
import os
import time
import math
import random
import numpy
import scipy.stats
import datetime
import warnings
import base
import pytransit.transit_tools as transit_tools
import pytransit.tnseq_tools as tnseq_tools
import pytransit.norm_tools as norm_tools
import pytransit.stat_tools as stat_tools
############# GUI ELEMENTS ##################
short_name = "gumbel"
long_name = "Bayesian analysis of essentiality based on long gaps."
description = """Bayesian methods of analyzing longest runs of non-insertions in a row. Estimates the parameters using the MCMC sampling, and estimates posterior probabilities of essentiality.
Reference: DeJesus et al. (2013; Bioinformatics)"""
transposons = ["himar1"]
columns = ["Orf","Name","Desc","k","n","r","s","zbar", "Call"]
############# Analysis Method ##############
[docs]class GumbelAnalysis(base.TransitAnalysis):
def __init__(self):
base.TransitAnalysis.__init__(self, short_name, long_name, description, transposons, GumbelMethod, GumbelGUI, [GumbelFile])
################## FILE ###################
[docs]class GumbelFile(base.TransitFile):
def __init__(self):
base.TransitFile.__init__(self, "#Gumbel", columns)
################# GUI ##################
[docs]class GumbelGUI(base.AnalysisGUI):
[docs] def definePanel(self, wxobj):
self.wxobj = wxobj
gumbelPanel = wx.Panel( self.wxobj.optionsWindow, wx.ID_ANY, wx.DefaultPosition, wx.DefaultSize, wx.TAB_TRAVERSAL )
gumbelSection = wx.BoxSizer( wx.VERTICAL )
gumbelLabel = wx.StaticText( gumbelPanel, wx.ID_ANY, u"Gumbel Options", wx.DefaultPosition, wx.DefaultSize, 0 )
gumbelLabel.Wrap( -1 )
gumbelSection.Add( gumbelLabel, 0, wx.ALL|wx.ALIGN_CENTER_HORIZONTAL, 5 )
mainSizer1 = wx.BoxSizer( wx.VERTICAL )
# Samples
(gumbelSampleLabel, self.wxobj.gumbelSampleText, sampleSizer) = self.defineTextBox(gumbelPanel, u"Samples:", u"10000", "These are the number of samples to take when estimating the parameters. More samples give more accurate estimates of the parameters at the cost of computation time.")
mainSizer1.Add(sampleSizer, 1, wx.ALIGN_CENTER_HORIZONTAL|wx.EXPAND, 5 )
# Burn-In
(gumbelBurninLabel, self.wxobj.gumbelBurninText, burninSizer) = self.defineTextBox(gumbelPanel, u"Burn-In:", u"500", "These are the number of samples to take before beginning to estimate the parameters. Allows the MCMC sampler to 'converge' to the true parameter space. More samples give more accurate estimates of the parameters at the cost of computation time.")
mainSizer1.Add(burninSizer, 1, wx.ALIGN_CENTER_HORIZONTAL|wx.EXPAND, 5 )
# Trim
(gumbelTrimLabel, self.wxobj.gumbelTrimText, trimSizer) = self.defineTextBox(gumbelPanel, u"Trim:", u"1", "The MCMC sample will keep every i-th sample. A value of '1' will take all samples. Larger values will reduces autocorrelation at the cost of a substantial cost in computation time.")
mainSizer1.Add(trimSizer, 1, wx.ALIGN_CENTER_HORIZONTAL|wx.EXPAND, 5 )
# Min Read
gumbelReadChoiceChoices = [ u"1", u"2", u"3", u"4", u"5" ]
(gumbelReadLabel, self.wxobj.gumbelReadChoice, readSizer) = self.defineChoiceBox(gumbelPanel, u"Minimum Read:", gumbelReadChoiceChoices, "This is the minimum number of reads to consider a 'true' insertion. Value of 1 will consider all insertions. Larger values allow the method to ignore spurious insertions which might interrupt a run of non-insertions. Noisy datasets or those with many replicates can beneffit from increasing this.")
mainSizer1.Add(readSizer, 1, wx.ALIGN_CENTER_HORIZONTAL|wx.EXPAND, 5 )
# Replicates
gumbelRepChoiceChoices = [ u"Sum", u"Mean" ]
(gumbelRepLabel, self.wxobj.gumbelRepChoice, repSizer) = self.defineChoiceBox(gumbelPanel, u"Replicates:", gumbelRepChoiceChoices, "Determines how to handle replicates, and their read-counts. When using many replicates, summing read-counts may make spurious counts appear to be significantly large and interrupt a run of non-insertions.")
mainSizer1.Add(repSizer, 1, wx.ALIGN_CENTER_HORIZONTAL|wx.EXPAND, 5 )
gumbelSection.Add( mainSizer1, 1, wx.EXPAND, 5 )
gumbelButton = wx.Button( gumbelPanel, wx.ID_ANY, u"Run Gumbel", wx.DefaultPosition, wx.DefaultSize, 0 )
gumbelSection.Add( gumbelButton, 0, wx.ALL|wx.ALIGN_CENTER_HORIZONTAL, 5 )
gumbelPanel.SetSizer( gumbelSection )
gumbelPanel.Layout()
gumbelSection.Fit( gumbelPanel )
#Connect events
gumbelButton.Bind( wx.EVT_BUTTON, self.wxobj.RunMethod )
self.panel = gumbelPanel
########## METHOD #######################
ALPHA = 1
BETA = 1
[docs]class GumbelMethod(base.SingleConditionMethod):
"""
Gumbel
"""
def __init__(self,
ctrldata,
annotation_path,
output_file,
samples=10000,
burnin=500,
trim=1,
minread=1,
replicates="Sum",
normalization=None,
LOESS=False,
ignoreCodon=True,
NTerminus=0.0,
CTerminus=0.0, wxobj=None):
base.SingleConditionMethod.__init__(self, short_name, long_name, description, ctrldata, annotation_path, output_file, replicates=replicates, normalization=normalization, LOESS=LOESS, NTerminus=NTerminus, CTerminus=CTerminus, wxobj=wxobj)
self.samples = samples
self.burnin = burnin
self.trim = trim
self.minread = minread
self.cache_nn = {}
[docs] @classmethod
def fromGUI(self, wxobj):
""" """
#Get Annotation file
annotationPath = wxobj.annotation
if not transit_tools.validate_annotation(annotationPath):
return None
#Get selected files
ctrldata = wxobj.ctrlSelected()
if not transit_tools.validate_control_datasets(ctrldata):
return None
#Validate transposon types
if not transit_tools.validate_transposons_used(ctrldata, transposons):
return None
#Read the parameters from the wxPython widgets
try:
minread = int(wxobj.gumbelReadChoice.GetString(wxobj.gumbelReadChoice.GetCurrentSelection()))
except:
warnings.warn("Warning: problem reading minimum read parameter. Assuming a value of '1'")
minread = 1
try:
samples = int(wxobj.gumbelSampleText.GetValue())
except:
warnings.warn("Warning: problem reading samples parameter. Assuming a value of '10000'")
samples = 10000
try:
burnin = int(wxobj.gumbelBurninText.GetValue())
except:
warnings.warn("Warning: problem reading burnin parameter. Assuming a value of '500'")
burnin = 500
try:
trim = int(wxobj.gumbelTrimText.GetValue())
except:
warnings.warn("Warning: problem reading trim parameter. Assuming a value of '1'")
trim = 1
try:
replicates = wxobj.gumbelRepChoice.GetString(wxobj.gumbelRepChoice.GetCurrentSelection())
except:
warnings.warn("Warning: problem reading replicates parameter. Assuming a value of 'Mean'")
replicates = "Mean"
ignoreCodon = True
try:
NTerminus = float(wxobj.globalNTerminusText.GetValue())
except:
warnings.warn("Warning: problem reading NTerminus parameter. Assuming a value of '0.00'")
NTerminus = 0.0
try:
CTerminus = float(wxobj.globalCTerminusText.GetValue())
except:
warnings.warn("Warning: problem reading CTerminus parameter. Assuming a value of '0.00'")
CTerminus = 0.0
normalization = None
LOESS = False
#Get output path
name = transit_tools.basename(ctrldata[0])
defaultFileName = "gumbel_%s_s%d_b%d_t%d.dat" % (".".join(name.split(".")[:-1]), samples, burnin, trim)
defaultDir = os.getcwd()
output_path = wxobj.SaveFile(defaultDir, defaultFileName)
if not output_path: return None
output_file = open(output_path, "w")
return self(ctrldata,
annotationPath,
output_file,
samples,
burnin,
trim,
minread,
replicates,
normalization,
LOESS,
ignoreCodon,
NTerminus,
CTerminus, wxobj)
[docs] @classmethod
def fromargs(self,rawargs):
(args, kwargs) = transit_tools.cleanargs(rawargs)
ctrldata = args[0].split(",")
annotationPath = args[1]
outpath = args[2]
output_file = open(outpath, "w")
samples = int(kwargs.get("s", 10000))
burnin = int(kwargs.get("b", 500))
trim = int(kwargs.get("t", 1))
minread = int(kwargs.get("m", 1))
replicates = kwargs.get("r", "Sum")
normalization = None
LOESS = False
ignoreCodon = True
NTerminus = float(kwargs.get("iN", 0.0))
CTerminus = float(kwargs.get("iC", 0.0))
return self(ctrldata,
annotationPath,
output_file,
samples,
burnin,
trim,
minread,
replicates,
normalization,
LOESS,
ignoreCodon,
NTerminus,
CTerminus)
[docs] def Run(self):
self.status_message("Starting Gumbel Method")
#Set Default parameter values
w1 = 0.15
w0 = 1.0 - w1
ALPHA = 1
BETA = 1
ALPHA_w = 600
BETA_w = 3400
mu_c = 0
acctot = 0.0
phi_start = 0.3
sigma_c = 0.01
start_time = time.time()
self.progress_range(self.samples+self.burnin)
#Get orf data
self.transit_message("Reading Annotation")
#Validate data has empty sites
#(status, genome) = transit_tools.validate_wig_format(self.ctrldata, wxobj=self.wxobj)
#if status <2: tn_used = "himar1"
#else: tn_used = "tn5"
self.transit_message("Getting Data")
(data, position) = transit_tools.get_validated_data(self.ctrldata, wxobj=self.wxobj)
(K,N) = data.shape
if self.normalization and self.normalization != "nonorm":
self.transit_message("Normalizing using: %s" % self.normalization)
(data, factors) = norm_tools.normalize_data(data, self.normalization, self.ctrldata, self.annotation_path)
G = tnseq_tools.Genes(self.ctrldata, self.annotation_path, minread=self.minread, reps=self.replicates, ignoreCodon=self.ignoreCodon, nterm=self.NTerminus, cterm=self.CTerminus, data=data, position=position)
ii_good = numpy.array([self.good_orf(g) for g in G]) # Gets index of the genes that can be analyzed
K = G.local_insertions()[ii_good]
N = G.local_sites()[ii_good]
R = G.local_runs()[ii_good]
S = G.local_gap_span()[ii_good]
T = G.local_gene_span()[ii_good]
self.transit_message("Doing Regression")
mu_s, temp, sigma_s = stat_tools.regress(R, S) # Linear regression to estimate mu_s, sigma_s for span data
mu_r, temp, sigma_r = stat_tools.regress(S, R) # Linear regression to estimate mu_r, sigma_r for run data
N_GENES = len(G)
N_GOOD = sum(ii_good)
self.transit_message("Setting Initial Class")
Z_sample = numpy.zeros((N_GOOD, self.samples))
Z = [self.classify(g.n, g.r, 0.5) for g in G if self.good_orf(g)]
Z_sample[:,0] = Z
N_ESS = numpy.sum(Z_sample[:,0] == 1)
phi_sample = numpy.zeros(self.samples) #[]
phi_sample[0] = phi_start
phi_old = phi_start
phi_new = 0.00
SIG = numpy.array([self.sigmoid(g.s, g.t) * scipy.stats.norm.pdf(g.r, mu_r*g.s, sigma_r) for g in G if self.good_orf(g)])
i = 1; count = 0;
while i < self.samples:
try:
# PHI
acc = 1.0
phi_new = phi_old + random.gauss(mu_c, sigma_c)
i0 = Z_sample[:,i-1] == 0
if phi_new > 1 or phi_new <= 0 or (self.F_non(phi_new, N[i0], R[i0]) - self.F_non(phi_old, N[i0], R[i0])) < math.log(random.uniform(0,1)):
phi_new = phi_old
acc = 0.0
flag = 0
# Z
Z = self.sample_Z(phi_new, w1, N, R, S, T, mu_s, sigma_s, SIG)
# w1
N_ESS = sum(Z == 1)
w1 = scipy.stats.beta.rvs(N_ESS + ALPHA_w, N_GOOD - N_ESS + BETA_w)
count +=1
acctot+=acc
if (count > self.burnin) and (count % self.trim == 0):
phi_sample[i] = phi_new
Z_sample[:,i] = Z
i+=1
except ValueError as e:
self.transit_message("Error: %s" % e)
self.transit_message("This is likely to have been caused by poor data (e.g. too sparse).")
self.transit_message("If the density of the dataset is too low, the Gumbel method will not work.")
self.transit_message("Quitting.")
return
phi_old = phi_new
#Update progress
text = "Running Gumbel Method... %2.0f%%" % (100.0*(count+1)/(self.samples+self.burnin))
self.progress_update(text, count)
self.transit_message_inplace(text)
ZBAR = numpy.apply_along_axis(numpy.mean, 1, Z_sample)
(ess_t, non_t) = stat_tools.bayesian_ess_thresholds(ZBAR)
#Orf k n r s zbar
self.output.write("#Gumbel\n")
if self.wxobj:
members = sorted([attr for attr in dir(self) if not callable(getattr(self,attr)) and not attr.startswith("__")])
memberstr = ""
for m in members:
memberstr += "%s = %s, " % (m, getattr(self, m))
self.output.write("#GUI with: ctrldata=%s, annotation=%s, output=%s, samples=%s, minread=%s, trim=%s\n" % (",".join(self.ctrldata).encode('utf-8'), self.annotation_path.encode('utf-8'), self.output.name.encode('utf-8'), self.samples, self.minread, self.trim))
else:
self.output.write("#Console: python %s\n" % " ".join(sys.argv))
self.output.write("#Data: %s\n" % (",".join(self.ctrldata).encode('utf-8')))
self.output.write("#Annotation path: %s\n" % self.annotation_path.encode('utf-8'))
self.output.write("#FDR Corrected thresholds: %f, %f\n" % (ess_t, non_t))
self.output.write("#MH Acceptance-Rate:\t%2.2f%%\n" % (100.0*acctot/count))
self.output.write("#Total Iterations Performed:\t%d\n" % count)
self.output.write("#Sample Size:\t%d\n" % i)
self.output.write("#phi estimate:\t%f\n" % numpy.average(phi_sample))
self.output.write("#Time: %s\n" % (time.time() - start_time))
self.output.write("#%s\n" % "\t".join(columns))
i = 0
data = []
for g in G:
if not self.good_orf(g):
zbar = -1.0
else:
zbar = ZBAR[i]
i+=1
if zbar > ess_t:
call = "E"
elif non_t <= zbar <= ess_t:
call = "U"
elif 0 <= zbar < non_t:
call = "NE"
else:
call = "S"
data.append("%s\t%s\t%s\t%d\t%d\t%d\t%d\t%f\t%s\n" % (g.orf, g.name, g.desc, g.k, g.n, g.r, g.s, zbar, call))
data.sort()
for line in data:
self.output.write(line)
self.output.close()
self.transit_message("") # Printing empty line to flush stdout
self.transit_message("Adding File: %s" % (self.output.name))
self.add_file(filetype="Gumbel")
self.finish()
self.transit_message("Finished Gumbel Method")
[docs] @classmethod
def usage_string(self):
return """python %s gumbel <comma-separated .wig files> <annotation .prot_table or GFF3> <output file> [Optional Arguments]
Optional Arguments:
-s <integer> := Number of samples. Default: -s 10000
-b <integer> := Number of Burn-in samples. Default -b 500
-m <integer> := Smallest read-count to consider. Default: -m 1
-t <integer> := Trims all but every t-th value. Default: -t 1
-r <string> := How to handle replicates. Sum or Mean. Default: -r Sum
-iN <float> := Ignore TAs occuring at given fraction of the N terminus. Default: -iN 0.0
-iC <float> := Ignore TAs occuring at given fraction of the C terminus. Default: -iC 0.0
""" % (sys.argv[0])
[docs] def good_orf(self, gene):
return (gene.n >= 3 and gene.t >= 150)
[docs] def classify(self, n,r,p):
if n == 0: return 0
q = 1-p; B = 1/math.log(1/p); u = math.log(n*q,1/p);
pval = 1 - scipy.stats.gumbel_r.cdf(r,u,B)
if pval < 0.05: return(1)
else: return(0)
[docs] def F_non(self, p, N, R):
q = 1.0 - p
total = numpy.log(scipy.stats.beta.pdf(p,ALPHA,BETA))
mu = numpy.log(N*q) / numpy.log(1/p)
sigma = 1/math.log(1/p);
total+= numpy.sum(numpy.log(scipy.stats.gumbel_r.pdf(R, mu, sigma)))
return(total)
[docs] def sample_Z(self, p, w1, N, R, S, T, mu_s, sigma_s, SIG):
G = len(N)
q = 1.0-p
mu = numpy.log(N*q) / numpy.log(1.0/p)
sigma = 1.0/math.log(1.0/p);
h0 = ((scipy.stats.gumbel_r.pdf(R,mu,sigma)) * scipy.stats.norm.pdf(S, mu_s*R, sigma_s) * (1-w1))
h1 = SIG * w1
p_z1 = h1/(h0+h1)
return scipy.stats.binom.rvs(1, p_z1, size=G)
[docs] def sigmoid(self,d,n):
Kn = 0.1
MEAN_DOMAIN_SPAN = 300
if d == 0: return(0.00)
f = 1./(1.+math.exp(Kn*(MEAN_DOMAIN_SPAN-d)))
#if n in self.cache_nn: return f/self.cache_nn[n]
tot = 0
N = int(n+1)
for i in range(1,N): tot += 1.0/(1.0+math.exp(Kn*(MEAN_DOMAIN_SPAN-i)))
self.cache_nn[n] = tot
return f/tot
if __name__ == "__main__":
(args, kwargs) = transit_tools.cleanargs(sys.argv)
G = GumbelMethod.fromargs(sys.argv[1:])
G.console_message("Printing the member variables:")
G.print_members()
print ""
print "Running:"
G.Run()