from . import *
# http://stackoverflow.com/a/32107024/353278
# use dot notation on dicts for convenience
class dotdict(dict):
    """
    Dictionary subclass that supports attribute-style access to keys.

    Keys set through item or attribute assignment are mirrored into the
    instance ``__dict__`` so that ``d.key`` and ``d['key']`` stay in sync.
    Missing attributes resolve to ``None`` (via ``dict.get``) rather than
    raising ``AttributeError``.
    """
    def __init__(self, *args, **kwargs):
        super(dotdict, self).__init__(*args, **kwargs)
        # Re-assign through __setitem__ so each key is also mirrored into
        # __dict__ (the plain dict.__init__ above does not do that).
        for arg in args:
            if isinstance(arg, dict):
                for k, v in list(arg.items()):
                    self[k] = v
        if kwargs:
            for k, v in list(kwargs.items()):
                self[k] = v
    def __getattr__(self, attr):
        # Only invoked when normal attribute lookup fails; returns None for
        # missing keys instead of raising AttributeError.
        return self.get(attr)
    def __setattr__(self, key, value):
        self.__setitem__(key, value)
    def __setitem__(self, key, value):
        super(dotdict, self).__setitem__(key, value)
        self.__dict__.update({key: value})
    def __delattr__(self, item):
        self.__delitem__(item)
    def __delitem__(self, key):
        # BUG FIX: previously `super(Map, self)` -- `Map` is undefined in this
        # module, so any deletion raised NameError.
        super(dotdict, self).__delitem__(key)
        del self.__dict__[key]
# from http://locallyoptimal.com/blog/2013/01/20/elegant-n-gram-generation-in-python/
# generate list of ngrams
def find_ngrams(input_list, n):
    """
    Build all consecutive n-grams from a sequence.

    Parameters
    ----------
    input_list : list
        Sequence to slide the n-gram window over.
    n : int
        Window size of each n-gram.

    Returns
    -------
    list of tuple
        Every run of ``n`` consecutive elements, in order.
    """
    # zip over n progressively shifted views of the list; zip stops at the
    # shortest view, which yields exactly the valid windows
    shifted_views = (input_list[offset:] for offset in range(n))
    return list(zip(*shifted_views))
# modified from ExGUtils package by Daniel Gamermann <gamermann@gmail.com>
# helper function generate flast lists from nested lists
# modified from http://stackoverflow.com/a/952952/353278
# flattens list of list one level only, preserving non-list items
# flattens type list and type np.ndarray, nothing else (on purpose)
def flatten_list(l, numtimes=1):
    """
    Flatten a list of lists/arrays by one level, optionally repeated.

    Only ``list`` and ``np.ndarray`` elements are expanded (deliberately);
    all other elements are preserved, moved after the expanded items.

    Parameters
    ----------
    l : list
        List whose nested elements should be flattened.
    numtimes : int, optional
        How many flattening passes to apply, by default 1.

    Returns
    -------
    list
        The flattened list (expanded items first, then non-sequence items).
    """
    expanded = []
    leftovers = []
    for element in l:
        if isinstance(element, (list, np.ndarray)):
            expanded.extend(element)
        else:
            leftovers.append(element)
    result = expanded + leftovers
    if numtimes > 1:
        result = flatten_list(result, numtimes - 1)
    return result
# log trick given list of log-likelihoods **UNUSED
def logTrick(loglist):
    """
    Log-sum-exp of a list of log-likelihoods, computed stably.

    Parameters
    ----------
    loglist : list of float
        Log-likelihood values.

    Returns
    -------
    float
        ``log(sum(exp(v) for v in loglist))`` computed without overflow.
    """
    peak = max(loglist)
    # shift by the max so exponentials stay in range, then add it back
    shifted = [value - peak for value in loglist]
    total = sum(np.e ** value for value in shifted)
    return np.log(total) + peak
# helper function grabs highest n items from list items **UNUSED
# http://stackoverflow.com/questions/350519/getting-the-lesser-n-elements-of-a-list-in-python
def maxn(items, n):
    """
    Return the top n maximum elements from a list, sorted descending.

    Parameters
    ----------
    items : list
        Input list of numeric values.
    n : int
        Number of maximum values to retrieve.

    Returns
    -------
    list
        The n largest values in descending order (fewer if ``len(items) < n``).
    """
    import heapq
    # heapq.nlargest returns the same descending top-n as the previous manual
    # scan, but also handles n == 0 gracefully (the old code indexed maxs[-1]
    # on an empty list and raised IndexError).
    return heapq.nlargest(n, items)
# find best ex-gaussian parameters
# port from R's retimes library, mexgauss function by Davide Massidda <davide.massidda@humandata.it>
# returns [mu, sigma, lambda]
def mexgauss(rts):
    """
    Estimate ex-Gaussian parameters from response times (method of moments).

    Ported from the `mexgauss` function in R's `retimes` package.

    Parameters
    ----------
    rts : array-like
        Response times.

    Returns
    -------
    list of float
        ``[mu, sigma, lambda]`` where mu/sigma describe the Gaussian
        component and lambda (= 1/tau) the exponential component.
    """
    num = len(rts)
    mean_rt = np.mean(rts)
    deviations = [rt - mean_rt for rt in rts]
    # second and third central moments (sample-corrected denominator)
    m2 = sum(d ** 2 for d in deviations) / (num - 1.0)
    m3 = sum(d ** 3 for d in deviations) / (num - 1.0)
    if m3 > 0:
        tau = (m3 / 2.0) ** (1 / 3.0)
    else:
        # non-positive skew: fall back to a fraction of the overall spread
        tau = 0.8 * np.std(rts)
    sigma = np.sqrt(abs(m2 - tau ** 2))
    mu = mean_rt - tau
    return [mu, sigma, 1.0 / tau]  # tau converted to lambda
# decorator; disables garbage collection before a function, enable gc after function completes
# provides some speed-up for functions that have lots of unnecessary garbage collection (e.g., lots of list appends)
def nogc(fun):
    """
    Decorator that disables garbage collection while the function runs.

    Can speed up functions that trigger frequent unnecessary collections
    (e.g., lots of list appends).

    Parameters
    ----------
    fun : callable
        The function to wrap.

    Returns
    -------
    callable
        Wrapped function; gc is re-enabled even if the function raises.
    """
    import gc
    from functools import wraps
    @wraps(fun)  # preserve name/docstring of the wrapped function
    def gcwrapper(*args, **kwargs):
        gc.disable()
        try:
            return fun(*args, **kwargs)
        finally:
            # BUG FIX: previously gc.enable() was skipped when fun raised,
            # leaving garbage collection disabled for the rest of the process.
            gc.enable()
    return gcwrapper
# take list of lists in number/node and translate back to items using dictionary (e.g., 1->dog, 2->cat)
def numToItemLabel(data, items):
    """
    Translate nested lists of item indices into their labels.

    Parameters
    ----------
    data : list of list of int
        Lists of item indices.
    items : dict
        Mapping from index to label.

    Returns
    -------
    list of list of str
        New nested lists with labels substituted for indices.
    """
    return [[items[index] for index in row] for row in data]
# modified from ExGUtils package by Daniel Gamermann <gamermann@gmail.com>
def rand_exg(irt, sigma, lambd):
    """
    Draw a single sample from an ex-Gaussian distribution.

    Parameters
    ----------
    irt : float
        Mean of the Gaussian component.
    sigma : float
        Standard deviation of the Gaussian component.
    lambd : float
        Rate (1/tau) of the exponential component.

    Returns
    -------
    float
        One ex-Gaussian sample (exponential draw plus Gaussian draw).
    """
    tau = 1.0 / lambd
    # exponential component via inverse-CDF; drawn before the Gaussian to
    # keep the global RNG consumption order stable
    exp_part = -tau * np.log(1. - np.random.random())
    gauss_part = np.random.normal(irt, sigma)
    return exp_part + gauss_part
#def renumber(Xs,numsubs,numper):
# start=0
# end=numper
# ssnumnodes=[]
# itemsb=[]
# datab=[]
# for sub in range(len(subs)):
# subXs = Xs[start:end]
# itemset = set(snafu.flatten_list(subXs))
# ssnumnodes.append(len(itemset))
#
# ss_items = {}
# convertX = {}
# for itemnum, item in enumerate(itemset):
# ss_items[itemnum] = items[item]
# convertX[item] = itemnum
#
# itemsb.append(ss_items)
#
# subXs = [[convertX[i] for i in x] for x in subXs]
# datab.append(subXs)
# start += 3
# end += 3
# decorator; prints elapsed time for function call
def timer(fun):
    """
    Decorator that prints the elapsed wall-clock time of each call.

    Parameters
    ----------
    fun : callable
        The function to time.

    Returns
    -------
    callable
        Wrapped function; elapsed time is printed even if the call raises.
    """
    from datetime import datetime
    from functools import wraps
    @wraps(fun)  # preserve name/docstring of the wrapped function
    def timerwrapper(*args, **kwargs):
        starttime = datetime.now()
        try:
            return fun(*args, **kwargs)
        finally:
            # print in finally so timing is reported even on exceptions
            # (previously nothing was printed when fun raised)
            print(datetime.now() - starttime)
    return timerwrapper
def reverseDict(items):
    """
    Swap keys and values of a dictionary.

    Parameters
    ----------
    items : dict
        Dictionary to invert.

    Returns
    -------
    dict
        New dictionary mapping each original value to its key.
    """
    return {label: key for key, label in items.items()}
# remove perseverations -- keep only first occurrence in place
# https://www.peterbe.com/plog/uniqifiers-benchmark
def no_persev(x):
    """
    Remove perseverations: keep only the first occurrence of each item,
    preserving order.

    (Docstring fixed: the previous one was copied from pearsonr's
    scipy-related note and did not describe this function.)

    Parameters
    ----------
    x : iterable
        Sequence of hashable items, possibly with repeats.

    Returns
    -------
    list
        Items of ``x`` with duplicates dropped, first occurrences kept in order.
    """
    # dict preserves insertion order (Python 3.7+), so fromkeys keeps
    # exactly the first occurrence of each item
    return list(dict.fromkeys(x))
# this function is copied from scipy to avoid shipping that whole library with snafu
# unlike scipy version, this one doesn't return p-value (requires C code from scipy)
def pearsonr(x, y):
    """
    Pearson correlation coefficient between two equal-length 1-D arrays.

    Simplified from scipy to avoid the dependency; does not return a p-value.

    Parameters
    ----------
    x : array-like
        First input array.
    y : array-like
        Second input array.

    Returns
    -------
    float
        Pearson correlation coefficient.
    """
    x = np.asarray(x)
    y = np.asarray(y)
    # center both arrays
    xc = x - x.mean()
    yc = y - y.mean()
    numerator = np.add.reduce(xc * yc)
    denominator = np.sqrt(np.sum(xc * xc, 0) * np.sum(yc * yc, 0))
    return numerator / denominator
# takes an individual's data in group space and translates it into local space
def groupToIndividual(Xs, group_dict):
    """
    Translate group-space node numbers into a compact local numbering.

    Parameters
    ----------
    Xs : list of list of int
        Participant responses expressed in group-space node numbers.
    group_dict : dict
        Mapping from group node number to label.

    Returns
    -------
    tuple
        - list of list of int: responses re-indexed in local space.
        - dict: mapping from local index to label.
    """
    unique_nodes = set(flatten_list(Xs))
    local_labels = {}
    to_local = {}
    # assign each unique node a fresh local index and remember its label
    for local_idx, node in enumerate(unique_nodes):
        local_labels[local_idx] = group_dict[node]
        to_local[node] = local_idx
    translated = [[to_local[node] for node in response] for response in Xs]
    return translated, local_labels
# take Xs and convert them from numbers (nodes) to labels
def numToLabel(Xs, items):
    """
    Replace node indices with labels, mutating the nested lists in place.

    Parameters
    ----------
    Xs : list of list of int
        Lists of node indices; modified in place.
    items : dict
        Mapping from node index to label.

    Returns
    -------
    list of list of str
        The same ``Xs`` object, now holding labels.
    """
    for sequence in Xs:
        # slice-assign so the inner list objects are mutated, not replaced
        sequence[:] = [items[node] for node in sequence]
    return Xs
# flat list from tuple walk
def nodes_from_edges(walk):
    """
    Recover the node sequence from a walk given as (source, target) edges.

    Takes the source of every edge, then appends the target of the final
    edge to complete the path.

    Parameters
    ----------
    walk : list of tuple
        Edges of the walk, in order.

    Returns
    -------
    list
        Nodes visited, in order.
    """
    nodes = [edge[0] for edge in walk]
    nodes.append(walk[-1][1])
    return nodes
# tuple walk from flat list
def edges_from_nodes(path):
    """
    Turn an ordered node sequence into consecutive (source, target) edges.

    Parameters
    ----------
    path : list
        Nodes in visiting order.

    Returns
    -------
    list of tuple
        One edge per consecutive pair of nodes.
    """
    # pair each node with its successor; zip stops one short of the end
    return list(zip(path, path[1:]))
def stationary(t, method="unweighted"):
    """
    Stationary distribution of a transition matrix.

    Parameters
    ----------
    t : ndarray
        Transition matrix.
    method : str, optional
        "unweighted" (default): proportion of non-zero entries per column
        (valid only for unweighted matrices). Any other value: dominant
        eigenvector method (flagged as buggy in the original).

    Returns
    -------
    ndarray
        The stationary distribution vector.
    """
    if method == "unweighted":  # only works for unweighted matrices!
        adjacency = t > 0
        # column counts of non-zero entries, normalized by the total count
        return adjacency.sum(axis=0) / float(adjacency.sum())
    else:  # buggy
        vec = np.linalg.eig(t)[1][:, 0]
        return np.real(vec / sum(vec))
# Unique nodes in random walk preserving order
# (aka fake participant data)
# http://www.peterbe.com/plog/uniqifiers-benchmark
def censored(walk, td=None, seed=None):
    """
    Apply censoring rules to a random walk to simulate participant data.

    Filters repeated items from a walk according to emission and censoring
    fault probabilities. When ``td`` is None (or lacks the fault attributes),
    repeats are always dropped and first occurrences always emitted.

    Parameters
    ----------
    walk : list of tuple
        List of edges representing the walk.
    td : object, optional
        Object with attributes `emission_fault` and `censor_fault`
        (probabilities). May be None.
    seed : int, optional
        Seed for the local random number generator, for reproducibility.

    Returns
    -------
    list
        List of nodes after applying censoring.
    """
    def addItem(item):
        seen[item] = 1
        result.append(item)
    nplocal = np.random.RandomState(seed)
    seen = {}
    result = []
    for item in nodes_from_edges(walk):
        if item in seen:
            # repeated item: re-emit with probability censor_fault.
            # NOTE: rand() is drawn before the attribute access, so the RNG
            # stream advances even when td is None (matches prior behavior).
            try:
                if nplocal.rand() <= td.censor_fault:
                    addItem(item)
            except AttributeError:
                # BUG FIX: was a bare `except:` that swallowed every error;
                # only td-is-None / missing-attribute should fall through here
                continue
        else:
            # first occurrence: drop with probability emission_fault
            try:
                if nplocal.rand() <= td.emission_fault:
                    continue
                else:
                    addItem(item)
            except AttributeError:
                # no td provided: always emit first occurrences
                addItem(item)
    return result
# first hitting times for each node
# TODO: Doesn't work with faulty censoring!!!
def firstHits(walk):
    """
    Compute first hitting times for each node in a censored walk.

    For each unique node in a censored walk, finds the index of its first
    occurrence in the original walk's edge list.

    Parameters
    ----------
    walk : list of int
        List of nodes visited in a walk.

    Returns
    -------
    list of tuple
        List of (node, index) pairs representing the first time each node
        is visited.
    """
    # NOTE(review): the input contract looks inconsistent -- censored()
    # forwards its argument to nodes_from_edges(), which expects a list of
    # (source, target) edge tuples, while this docstring says `walk` is a
    # list of nodes. Likewise, path.index(i) searches for a node `i` inside
    # a list of edge tuples built by edges_from_nodes(walk), which would
    # raise ValueError unless nodes and edges coincide. Confirm the intended
    # input format against callers before relying on this function (it is
    # also TODO-flagged above as broken under faulty censoring).
    firsthit=[]
    path=edges_from_nodes(walk)
    for i in censored(walk):
        firsthit.append(path.index(i))
    return list(zip(censored(walk),firsthit))