Source code for snafu.core

from . import *
from functools import reduce
from .pci import _pci_lowerbound

# TODO: when doing same phase twice in a row, don't re-try same failures
    # (pass dict of failures, don't try if numchanges==0)
# TODO: Implement GOTM/ECN from Goni et al. 2011


import numpy as np
import math
from typing import Tuple


# jump probability estimation

def estimateJumpProbability(
    group_network,
    data_model,
    fluency_list,
    return_ll: bool = False,
) -> Tuple[float, float]:
    """
    Grid-search jump in [0.01, 0.99] to maximize likelihood for ONE fluency list.

    Always returns a (best_jump, best_log_likelihood) tuple. If return_ll is
    False, best_log_likelihood will be np.nan and nothing is printed.

    Parameters
    ----------
    group_network : np.ndarray (N x N)
        Adjacency matrix for the known network.
    data_model : DataModel
        SNAFU data model. Only the `.jump` field is varied here.
    fluency_list : list[int]
        Single fluency list as item indices aligned to group_network.

    Returns
    -------
    tuple of (float, float)
        Best jump probability (0.01..0.99) and its log-likelihood
        (np.nan when return_ll is False).
    """
    A = np.asarray(group_network, dtype=float)
    if A.ndim != 2 or A.shape[0] != A.shape[1]:
        raise ValueError("group_network must be a square adjacency matrix.")
    if not fluency_list:
        raise ValueError("fluency_list must be non-empty.")

    original_jump = getattr(data_model, "jump", 0.0)
    best_jump = None
    best_ll = -np.inf
    try:
        for i in range(1, 100):
            candidate = i / 100.0
            data_model.jump = candidate
            ll, _ = probX([fluency_list], A, data_model)
            if ll > best_ll:
                best_ll = ll
                best_jump = candidate
    finally:
        data_model.jump = original_jump

    if best_jump is None or not math.isfinite(best_ll):
        raise RuntimeError("Failed to find a valid jump over 0.01..0.99.")

    if return_ll:
        print(f"Best jump: {best_jump:.2f}, Log-likelihood: {best_ll:.4f}")
        return best_jump, best_ll
    else:
        return best_jump, np.nan

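# Usage sketch (illustrative, not part of the library API). The DataModel keys
# below are assumed to map onto the attributes read in probX()/addJumps()
# (jumptype, start_node, priming, censor_fault); the network and list are toy data.
def _example_estimate_jump():
    toy_network = np.array([[0, 1, 1, 0],
                            [1, 0, 1, 0],
                            [1, 1, 0, 1],
                            [0, 0, 1, 0]])
    dm = DataModel({'jumptype': 'uniform', 'start_node': 'uniform',
                    'priming': 0.0, 'censor_fault': 0.0})
    best_jump, best_ll = estimateJumpProbability(toy_network, dm, [0, 2, 3, 1],
                                                 return_ll=True)
    return best_jump, best_ll
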
# alias for backwards compatibility
def priorToGraph(*args, **kwargs):
    """ References priorToNetwork """
    return priorToNetwork(*args, **kwargs)

# mix U-INVITE with random jumping model
def addJumps(probs, td, numnodes=None, statdist=None, Xs=None):
    """
    Adjusts transition probabilities by incorporating jump behavior.

    Depending on jump type ('uniform' or 'stationary'), modifies the transition
    probabilities to allow for random jumps in the network.

    Parameters
    ----------
    probs : list of list
        Original transition probabilities
    td : DataModel
        Data model containing the jump type and jump probability.
    numnodes : int, optional
        Number of nodes (required for uniform jump).
    statdist : array-like, optional
        Stationary distribution across nodes (required for stationary jump).
    Xs : list, optional
        List of token sequences (used with stationary jump).

    Returns
    -------
    list of list
        Jump-adjusted transition probabilities.
    """
    if (td.jumptype == "uniform") and (numnodes == None):
        raise ValueError("Must specify 'numnodes' when jumptype is uniform [addJumps]")
    if (td.jumptype == "stationary") and (np.any(statdist == None) or (Xs == None)):
        raise ValueError("Must specify 'statdist' and 'Xs' when jumptype is stationary [addJumps]")

    if td.jumptype == "uniform":
        jumpprob = float(td.jump)/numnodes          # uniform jumping

    for l in range(len(probs)):                     # loop through all lists (l)
        for inum, i in enumerate(probs[l][1:]):     # loop through all items (i) excluding first (don't jump to item 1)
            if td.jumptype == "stationary":
                jumpprob = statdist[Xs[l][inum+1]]  # stationary probability jumping
                if np.isnan(jumpprob):              # if node is disconnected, jumpprob is nan
                    jumpprob = 0.0
            probs[l][inum+1] = jumpprob + (1-td.jump)*i   # else normalize existing probability and add jumping probability

    return probs

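# The uniform-jump adjustment above is a simple mixture: each post-first
# transition probability p becomes td.jump/numnodes + (1 - td.jump) * p.
# A quick numeric check (illustrative values only):
def _example_uniform_jump_mix(numnodes=4, jump=0.2, p=0.5):
    return jump / numnodes + (1 - jump) * p   # 0.05 + 0.40 = 0.45
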
# mix U-INVITE with priming model
# code is confusing...
def adjustPriming(probs, td, Xs):
    """
    Modifies transition probabilities to account for priming across lists.

    If an item appears at the end of a list and is followed by the same
    transition in the next list, it is treated as a primed transition and
    boosted by a configurable weight.

    Parameters
    ----------
    probs : list of list
        Transition probability matrix for each list.
    td : DataModel
        Contains the priming parameter.
    Xs : list of list
        Sequences of node indices (one per participant/list).

    Returns
    -------
    list of list
        Transition probabilities adjusted for priming.
    """
    for xnum, x in enumerate(Xs[1:]):           # check all items starting with 2nd list
        for inum, i in enumerate(x[:-1]):       # except last item
            if i in Xs[xnum][:-1]:              # is item in previous list? if so, prime next item
                # follow prime with probability td.priming, follow RW with probability (1-td.priming)
                idx = Xs[xnum].index(i)         # index of item in previous list
                if Xs[xnum][idx+1] == Xs[xnum+1][inum+1]:
                    probs[xnum+1][inum+1] = (probs[xnum+1][inum+1] * (1-td.priming)) + td.priming
                else:
                    probs[xnum+1][inum+1] = (probs[xnum+1][inum+1] * (1-td.priming))
    return probs

# Chan, Butters, Paulsen, Salmon, Swenson, & Maloney (1993) jcogneuro (p. 256) + pathfinder (PF; Schvaneveldt)
# + Chan, Butters, Salmon, Johnson, Paulsen, & Swenson (1995) neuropsychology
# Returns only PF(q, r) = PF(n-1, inf) = minimum spanning tree (sparsest possible graph)
# other parameterizations of PF(q, r) not implemented
def pathfinder(Xs, numnodes=None, valid=False, td=None):
    """
    Constructs a Pathfinder network (minimal spanning tree) based on item co-occurrences.

    The function builds a weighted graph from co-occurrence distances and then
    reduces it to a minimum spanning tree using the Pathfinder MST algorithm.

    Parameters
    ----------
    Xs : list of list
        Sequence of observed node transitions.
    numnodes : int, optional
        Number of unique nodes.
    valid : bool, optional
        If True, ensures that the resulting graph can generate data using censored RW.
    td : DataModel, optional
        Used when `valid=True`.

    Returns
    -------
    ndarray
        Binary adjacency matrix of the Pathfinder network.
    """
    if numnodes == None:
        numnodes = len(set(flatten_list(Xs)))

    # From https://github.com/evanmiltenburg/dm-graphs
    def MST_pathfinder(G):
        """The MST-pathfinder algorithm (Quirin et al. 2008) reduces the graph
        to the unions of all minimal spanning trees."""
        NG = nx.Graph()
        NG.add_nodes_from(list(range(numnodes)))
        edges = sorted(((G[a][b]['weight'], a, b) for a, b in G.edges()), reverse=False)  # smaller distances are more similar
        clusters = {node: i for i, node in enumerate(G.nodes())}
        while not edges == []:
            w1, a, b = edges[0]
            l = []
            # Select edges to be considered this round:
            for w2, u, v in edges:
                if w1 == w2:
                    l.append((u, v, w2))
                else:
                    break
            # Remaining edges are those not being considered this round:
            edges = edges[len(l):]
            # Only select those edges for which the items are not in the same cluster
            l = [(a, b, c) for a, b, c in l if not clusters[a] == clusters[b]]
            # Add these edges to the graph:
            NG.add_weighted_edges_from(l)
            # Merge the clusters:
            for a, b, w in l:
                cluster_1 = clusters[a]
                cluster_2 = clusters[b]
                clusters = {node: cluster_1 if i == cluster_2 else i for node, i in clusters.items()}
        return NG

    if valid and not td:
        raise ValueError('Need to pass DataModel when generating \'valid\' pathfinder()')

    N = float(len(Xs))
    distance_mat = np.zeros((numnodes, numnodes))

    for item1 in range(numnodes):
        for item2 in range(item1+1, numnodes):
            Tij = 0
            dijk = 0
            for x in Xs:
                if (item1 in x) and (item2 in x):
                    Tij += 1
                    dijk = dijk + (abs(x.index(item1) - x.index(item2)) / float(len(x)))
            try:
                dij = dijk * (N / (Tij**2))
            except:
                dij = 0.0   # added constraint for divide-by-zero... this will ensure that no edge will exist between i and j
            distance_mat[item1, item2] = dij
            distance_mat[item2, item1] = dij

    #graph = scipy.sparse.csgraph.minimum_spanning_tree(distance_mat)
    graph = nx.to_numpy_array(MST_pathfinder(nx.Graph(distance_mat)))

    # binarize and make graph symmetric (undirected)... some redundancy but it's cheap
    #graph = np.where(graph.todense(), 1, 0)
    graph = np.array(np.where(graph, 1, 0))
    for rownum, row in enumerate(graph):
        for colnum, val in enumerate(row):
            if val == 1:
                graph[rownum, colnum] = 1
                graph[colnum, rownum] = 1

    if valid:
        graph = makeValid(Xs, graph, td)

    #return np.array(graph).astype(int)
    return graph

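# Usage sketch (illustrative toy data): build the union-of-MSTs Pathfinder
# network from three fluency lists coded as item indices.
def _example_pathfinder():
    toy_lists = [[0, 1, 2, 3], [1, 0, 3, 2], [2, 3, 0, 1]]
    pf_graph = pathfinder(toy_lists)     # 4 x 4 binary adjacency matrix
    return pf_graph
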
# objective graph cost
# returns the number of links that need to be added or removed to reach the true graph
def cost(graph, a, undirected=True):
    """
    Computes the link cost between two graphs.

    Measures the number of differing edges between the estimated and target graph.

    Parameters
    ----------
    graph : ndarray
        Estimated graph adjacency matrix.
    a : ndarray
        Target or comparison graph.
    undirected : bool, optional
        Whether the graphs are undirected. Default is True.

    Returns
    -------
    float
        Number of differing edges (adjusted for direction).
    """
    cost = sum(sum(np.array(abs(graph-a))))
    if undirected:
        return cost/2.0
    else:
        return cost

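# Usage sketch (illustrative): one undirected edge separates these toy graphs,
# so the link cost is 1.0.
def _example_cost():
    g_est = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]])
    g_true = np.array([[0, 1, 1], [1, 0, 1], [1, 1, 0]])
    return cost(g_est, g_true)           # 1.0
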
# graph=estimated graph, a=target/comparison graph
def costSDT(graph, a):
    """
    Computes signal detection theory metrics for graph comparison.

    Returns hit, miss, false alarm, and correct rejection counts.

    Parameters
    ----------
    graph : ndarray
        Estimated graph.
    a : ndarray
        Ground truth or comparison graph.

    Returns
    -------
    list
        [hits, misses, false alarms, correct rejections]
    """
    hit = 0; miss = 0; fa = 0; cr = 0
    check = (graph == a)
    for rnum, r in enumerate(a):
        for cnum, c in enumerate(r[:rnum]):
            if check[rnum, cnum] == True:
                if a[rnum, cnum] == 1:
                    hit += 1
                else:
                    cr += 1
            else:
                if a[rnum, cnum] == 1:
                    miss += 1
                else:
                    fa += 1
    return [hit, miss, fa, cr]

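# Usage sketch (illustrative): the same toy graphs scored with signal detection
# counts over the lower triangle -> [hits, misses, false alarms, correct rejections].
def _example_costSDT():
    g_est = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]])
    g_true = np.array([[0, 1, 1], [1, 0, 1], [1, 1, 0]])
    return costSDT(g_est, g_true)        # [2, 1, 0, 0]
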
def evalGraphPrior(a, prior, undirected=True):
    """
    Evaluates the log-likelihood of a graph under a prior.

    Calculates the total log probability of a graph based on a learned or
    assumed edge prior.

    Parameters
    ----------
    a : ndarray
        Graph adjacency matrix.
    prior : tuple
        A tuple of (priordict, item dictionary).
    undirected : bool, optional
        Whether the graph is undirected.

    Returns
    -------
    float
        Total log-likelihood of the graph given the prior.
    """
    probs = []
    priordict = prior[0]
    items = prior[1]
    nullprob = priordict['DEFAULTPRIOR']

    for inum, i in enumerate(a):
        for jnum, j in enumerate(i):
            if (inum > jnum) or ((undirected == False) and (inum != jnum)):
                if undirected:
                    pair = np.sort((items[inum], items[jnum]))
                else:
                    pair = (items[inum], items[jnum])
                try:
                    priorprob = priordict[pair[0]][pair[1]]
                    if j == 1:
                        prob = priorprob
                    elif j == 0:
                        prob = (1-priorprob)
                except:
                    prob = nullprob      # no information about edge
                probs.append(prob)

    probs = [np.log(prob) for prob in probs]     # multiplication probably results in underflow...
    probs = sum(probs)
    return probs

def firstEdge(Xs, numnodes=None):
    """
    Constructs a graph connecting only the first two nodes of each list.

    Parameters
    ----------
    Xs : list of list
        Node sequences.
    numnodes : int, optional
        Number of unique nodes.

    Returns
    -------
    ndarray
        Binary adjacency matrix.
    """
    if numnodes == None:
        numnodes = len(set(flatten_list(Xs)))
    a = np.zeros((numnodes, numnodes))
    for x in Xs:
        a[x[0], x[1]] = 1
        a[x[1], x[0]] = 1   # symmetry
    a = np.array(a.astype(int))
    return a

def fullyConnected(numnodes):
    """
    Generate a fully connected undirected graph.

    Parameters
    ----------
    numnodes : int
        Number of nodes in the graph.

    Returns
    -------
    ndarray
        Binary adjacency matrix with all non-diagonal elements set to 1.
    """
    a = np.ones((numnodes, numnodes))
    for i in range(numnodes):
        a[i, i] = 0.0
    return a.astype(int)

# only returns adjacency matrix, not nx graph
def naiveRandomWalk(Xs, numnodes=None, directed=False):
    """
    Build a graph by connecting items based on their sequential occurrence.

    Parameters
    ----------
    Xs : list of list
        Lists of node sequences.
    numnodes : int, optional
        Total number of unique nodes.
    directed : bool, optional
        Whether to treat transitions as directed edges. Default is False.

    Returns
    -------
    ndarray
        Binary adjacency matrix of the random walk graph.
    """
    if numnodes == None:
        numnodes = len(set(flatten_list(Xs)))
    a = np.zeros((numnodes, numnodes))
    for x in Xs:
        walk = edges_from_nodes(x)
        for i in set(walk):
            a[i[0], i[1]] = 1
            if directed == False:
                a[i[1], i[0]] = 1
    a = np.array(a.astype(int))
    return a

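# Usage sketch (illustrative): adjacent responses are linked, so two short toy
# lists yield undirected edges 0-1, 1-2, and 1-3.
def _example_naiveRandomWalk():
    return naiveRandomWalk([[0, 1, 2], [2, 1, 3]])
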
def genGraphPrior(graphs, items, fitinfo=Fitinfo({}), mincount=1, undirected=True, returncounts=False):
    """
    Generate edge prior probabilities from a collection of graphs.

    Uses a beta-binomial or zero-inflated beta-binomial model to calculate edge
    presence probability across subjects or samples.

    Parameters
    ----------
    graphs : list of ndarray
        List of adjacency matrices (one per subject).
    items : list of dict
        Mapping of item index to item label (one per graph).
    fitinfo : Fitinfo, optional
        Configuration object controlling prior method.
    mincount : int, optional
        Minimum number of observations required to include an edge.
    undirected : bool, optional
        Whether the graph is undirected.
    returncounts : bool, optional
        If True, returns raw counts instead of probabilities.

    Returns
    -------
    dict
        Nested dictionary of edge probabilities or counts.
    """
    a_start = fitinfo.prior_a
    b_start = fitinfo.prior_b
    method = fitinfo.prior_method
    p = fitinfo.zibb_p

    priordict = {}

    #def betabinomial(a,b):
    #    return (b / (a + b))

    def zeroinflatedbetabinomial(a, b, p):
        return (b / ((1-p)*a+b))

    # tabulate number of times edge does or doesn't appear in all of the graphs when node pair is present
    for graphnum, graph in enumerate(graphs):       # for each graph
        itemdict = items[graphnum]
        for inum, i in enumerate(graph):            # rows of graph
            for jnum, j in enumerate(i):            # columns of graph
                if (inum > jnum) or ((undirected == False) and (inum != jnum)):
                    item1 = itemdict[inum]
                    item2 = itemdict[jnum]
                    if undirected:
                        pair = np.sort((item1, item2))
                    else:
                        pair = (item1, item2)
                    if pair[0] not in list(priordict.keys()):
                        priordict[pair[0]] = {}
                    if pair[1] not in list(priordict[pair[0]].keys()):
                        priordict[pair[0]][pair[1]] = [a_start, b_start]
                    if j == 1:
                        priordict[pair[0]][pair[1]][1] += 1.0   # increment b counts
                    elif j == 0:
                        priordict[pair[0]][pair[1]][0] += 1.0   # increment a counts

    if not returncounts:
        for item1 in priordict:
            for item2 in priordict[item1]:
                a, b = priordict[item1][item2]   # a=number of participants without link, b=number of participants with link
                if (a+b) >= (mincount + a_start + b_start):
                    #if method == "zeroinflatedbetabinomial":
                    priordict[item1][item2] = zeroinflatedbetabinomial(a, b, p)   # zero-inflated beta-binomial
                    #elif method == "betabinomial":
                    #    priordict[item1][item2] = betabinomial(a,b)               # beta-binomial
                else:
                    priordict[item1][item2] = 0.0   # if number of observations is less than mincount, make edge prior 0.0
        if 'DEFAULTPRIOR' in list(priordict.keys()):
            raise ValueError('Sorry, you can\'t have a node called DEFAULTPRIOR. \
                    Sure, I should have coded this better, but I really didn\'t think this situation would ever occur.')
        else:
            #if method == "zeroinflatedbetabinomial":
            priordict['DEFAULTPRIOR'] = zeroinflatedbetabinomial(a_start, b_start, p)
            #elif method=="betabinomial":
            #    priordict['DEFAULTPRIOR'] = betabinomial(a_start, b_start)

    return priordict

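# Usage sketch (illustrative): a prior built from two toy subject graphs that
# share the same three labeled items. The result is a nested dict keyed by
# alphabetically sorted item pairs (e.g. prior['cat']['dog']) plus 'DEFAULTPRIOR'.
def _example_genGraphPrior():
    g1 = np.array([[0, 1, 0], [1, 0, 1], [0, 1, 0]])
    g2 = np.array([[0, 1, 1], [1, 0, 0], [1, 0, 0]])
    labels = [{0: 'cat', 1: 'dog', 2: 'fox'}, {0: 'cat', 1: 'dog', 2: 'fox'}]
    return genGraphPrior([g1, g2], labels)
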
# generate starting graph for U-INVITE
def genStartGraph(Xs, numnodes, td, fitinfo):
    """
    Generate an initial graph for U-INVITE fitting based on strategy.

    Strategy is determined by `fitinfo.startGraph`, which can include:
    - "cn_valid", "pf_valid", "rw", "fully_connected", etc.

    Parameters
    ----------
    Xs : list of list
        Input fluency data.
    numnodes : int
        Number of unique nodes.
    td : DataModel
        Model configuration.
    fitinfo : Fitinfo
        Model fitting configuration.

    Returns
    -------
    ndarray
        Starting adjacency matrix.
    """
    sg = fitinfo.startGraph   # cache for clarity
    if isinstance(sg, str) and sg == "cn_valid":
        graph = conceptualNetwork(Xs, numnodes, td=td, valid=True, fitinfo=fitinfo)
    elif isinstance(sg, str) and sg == "pf_valid":
        graph = pathfinder(Xs, numnodes, valid=True, td=td)
    elif isinstance(sg, str) and (sg == "rw" or sg == "nrw"):
        graph = naiveRandomWalk(Xs, numnodes)
    elif isinstance(sg, str) and sg == "fully_connected":
        graph = fullyConnected(numnodes)
    elif isinstance(sg, str) and sg == "empty_graph":
        graph = np.zeros((numnodes, numnodes)).astype(int)
    else:
        graph = np.copy(sg)   # assume it's a numpy array graph
    return graph

# deprecated alias for backwards compatibility
def communitynetwork(*args, **kwargs):
    """ References conceptualNetwork """
    return conceptualNetwork(*args, **kwargs)

# w = window size; two items appear within +/- w steps of each other (where w=1 means adjacent items)
# f = filter frequency; if two items don't fall within the same window more than f times, then no edge is inferred
# c = confidence interval; retain the edge if there is a <= c probability that two items occur within the same window n times by chance alone
# valid (t/f) ensures that graph can produce data using censored RW.
[docs] def conceptualNetwork(Xs, numnodes=None, fitinfo=Fitinfo({}), valid=False, td=None): """ Generate a semantic network using co-occurrence windows. Builds a network by identifying how often item pairs co-occur within a specified window, and filters edges based on frequency and significance. w = window size; two items appear within +/- w steps of each other (where w=1 means adjacent items) f = filter frequency; if two items don't fall within the same window more than f times, then no edge is inferred c = confidence interval; retain the edge if there is a <= c probability that two items occur within the same window n times by chance alone valid (t/f) ensures that graph can produce data using censored RW. Parameters ---------- Xs : list of list List of item sequences. numnodes : int, optional Total number of nodes. fitinfo : Fitinfo, optional Parameters controlling window size, frequency threshold, etc. valid : bool, optional If True, ensures resulting graph can produce valid walk data. td : DataModel, optional Required if `valid=True`. Returns ------- ndarray Binary adjacency matrix. """ if numnodes == None: numnodes = len(set(flatten_list(Xs))) w = fitinfo.cn_windowsize f = fitinfo.cn_threshold c = fitinfo.cn_alpha if f<1: # if <1 treat as proportion of total lists; if >1 treat as absolute # of lists f=int(round(len(Xs)*f)) if valid and not td: raise ValueError('Need to pass Data when generating \'valid\' conceptualNetwork()') #if c<1: # from statsmodels.stats.proportion import proportion_confint as pci if w < 1: print("Error in conceptualNetwork(): w must be >= 1") return graph=np.zeros((numnodes, numnodes)).astype(int) # empty graph # frequency of co-occurrences within window (w) for x in Xs: # for each list cooccur_within_list = [] # only count one cooccurence per list (for binomial test) for pos in range(len(x)): # for each item in list for i in range(1, w+1): # for each window size if pos+i<len(x): if (x[pos], x[pos+i]) not in cooccur_within_list: graph[x[pos],x[pos+i]] += 1 graph[x[pos+i],x[pos]] += 1 cooccur_within_list.append((x[pos], x[pos+i])) cooccur_within_list.append((x[pos+i], x[pos])) # exclude edges with co-occurrences less than frequency (f) and binarize # but first save co-occurence frequencies cooccur = np.copy(graph) for i in range(len(graph)): for j in range(len(graph)): if graph[i, j] < f: graph[i, j] = 0 else: graph[i, j] = 1 # check if co-occurrences are due to chance if c<1: setXs=[list(set(x)) for x in Xs] # unique nodes in each list flatX=flatten_list(setXs) # flattened xfreq=[flatX.count(i) for i in range(numnodes)] # number of lists each item appears in (at least once) listofedges=list(zip(*np.nonzero(graph))) # list of edges in graph to check numlists=float(len(Xs)) meanlistlength=np.mean([len(x) for x in Xs]) # Goni et al. (2011), eq. 10 p_adj = (2.0/(meanlistlength*(meanlistlength-1))) * ((w*meanlistlength) - ((w*(w+1))/2.0)) for i,j in listofedges: p_linked = (xfreq[i]/numlists) * (xfreq[j]/numlists) * p_adj #ci=pci(cooccur[i,j],numlists,alpha=c,method="beta")[0] # lower bound of Clopper-Pearson binomial CI ci = _pci_lowerbound(cooccur[i,j], numlists, c) # lower bound of Clopper-Pearson binomial CI if (p_linked >= ci): # if co-occurrence could be due to chance, remove edge graph[i,j]=0 graph[j,i]=0 if valid: graph = makeValid(Xs, graph, td) # make sure there are no self-transitions -- is this necessary? for i in range(len(graph)): graph[i,i]=0.0 return graph
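# Usage sketch (illustrative): the Fitinfo keys below are assumed to map onto
# the cn_* attributes read above; real data would use far more lists.
def _example_conceptualNetwork():
    cn_fit = Fitinfo({'cn_windowsize': 2,   # w: co-occurrence window
                      'cn_threshold': 2,    # f: minimum co-occurrence count
                      'cn_alpha': 0.05})    # c: Clopper-Pearson cutoff
    toy_lists = [[0, 1, 2, 3], [1, 0, 2, 3], [0, 2, 1, 3]]
    return conceptualNetwork(toy_lists, fitinfo=cn_fit)
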
# enrich graph by finding modules and making them completely interconnected
# using Generalized Topological Overlap Measure (GTOM)
# right now only does n=2 (neighbors and neighbors of neighbors)
# see Goni et al 2010
# TODO unfinished: so far, creates GTOM matrix but doesn't "enrich" network... how to determine # of clusters?
def gtom(graph):
    """
    Compute the Generalized Topological Overlap Measure (GTOM).

    Measures how similar nodes are in terms of shared neighbors.
    Currently supports GTOM with n=2.

    Parameters
    ----------
    graph : ndarray
        Adjacency matrix of a graph.

    Returns
    -------
    ndarray
        Symmetric matrix of pairwise GTOM scores.
    """
    # modified from uinvite(), copied for convenience (TODO consolidate by moving outside to its own function)
    # return list of neighbors of neighbors of i, that aren't themselves neighbors of i
    # i.e., an edge between i and any item in nn forms a triangle
    def neighborsofneighbors(i, nxg):
        nn = []                              # neighbors of neighbors (nn)
        n = list(nx.all_neighbors(nxg, i))
        for j in n:
            nn = nn + list(nx.all_neighbors(nxg, j))
        nn = list(set(nn))
        if i in nn:
            nn.remove(i)                     # remove self
        return nn

    nxgraph = nx.to_networkx_graph(graph)
    numnodes = nx.number_of_nodes(nxgraph)
    gtom_mat = np.zeros((numnodes, numnodes))

    nn_dict = {}
    for i in range(numnodes):
        nn_dict[i] = neighborsofneighbors(i, nxgraph)

    for i in range(numnodes):
        for j in range(i+1, numnodes):
            i_neighbors = nn_dict[i]
            j_neighbors = nn_dict[j]
            min_neighbors = min(len(i_neighbors), len(j_neighbors))
            len_overlap = len(set.intersection(set(i_neighbors), set(j_neighbors)))
            gtom_mat[i, j] = 1 - (float(len_overlap) / min_neighbors)
            gtom_mat[j, i] = gtom_mat[i, j]

    return gtom_mat

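# Usage sketch (illustrative): in this toy graph (edges 0-1, 0-2, 1-2, 2-3),
# nodes 2 and 3 share all of their two-step neighbors, so gtom()[2, 3] == 0.0
# (smaller values indicate greater topological overlap).
def _example_gtom():
    toy_graph = np.array([[0, 1, 1, 0],
                          [1, 0, 1, 0],
                          [1, 1, 0, 1],
                          [0, 0, 1, 0]])
    return gtom(toy_graph)
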
[docs] def hierarchicalUinvite(Xs, items, numnodes=None, td=DataModel({}), irts=False, fitinfo=Fitinfo({}), seed=None, debug=True): """ Fit subject-level U-INVITE models in a hierarchical manner. Iteratively fits subject graphs by conditioning on others' graphs and generates a prior from group estimates. Process continues until no more changes in graphs are detected. Parameters ---------- Xs : list of list List of subject fluency data (sequences). items : list of dict Mapping of item indices to labels. numnodes : int or list of int, optional Number of unique nodes. If list, must match number of subjects. td : DataModel, optional Configuration for U-INVITE model. irts : list or bool, optional IRTs per subject or False to ignore. fitinfo : Fitinfo, optional Model settings (starting graph, limits, etc.). seed : int, optional Random seed. debug : bool, optional If True, prints debug output. Returns ------- list of ndarray List of fitted subject-level graphs. dict Final group prior dictionary. """ if numnodes == None: numnodes = [len(set(flatten_list(x))) for x in Xs] nplocal=np.random.RandomState(seed) fitinfoSG = fitinfo.startGraph # fitinfo is mutable, need to revert at end of function... blah # create ids for all subjects subs=list(range(len(Xs))) graphs = [None] * len(subs) # cycle though participants exclude_subs=[] graphchanges=1 rnd=1 while graphchanges > 0: if debug: print("Round: ", rnd) graphchanges = 0 nplocal.shuffle(subs) for sub in [i for i in subs if i not in exclude_subs]: if debug: print("SS: ", sub) if graphs[sub] is None: fitinfo.startGraph = fitinfoSG # on first pass for subject, use default fitting method (e.g., NRW, goni, etc) else: fitinfo.startGraph = graphs[sub] # on subsequent passes, use ss graph from previous iteration # generate prior without participant's data, fit graph valid_graphs = [g for i, g in enumerate(graphs) if i != sub and g is not None] valid_items = [items[i] for i in range(len(items)) if i != sub and graphs[i] is not None] priordict = genGraphPrior(valid_graphs, valid_items, fitinfo=fitinfo) prior = (priordict, items[sub]) if isinstance(irts, list): uinvite_graph, bestval = uinvite(Xs[sub], td, numnodes[sub], fitinfo=fitinfo, prior=prior, irts=irts[sub]) else: uinvite_graph, bestval = uinvite(Xs[sub], td, numnodes[sub], fitinfo=fitinfo, prior=prior) if not np.array_equal(uinvite_graph, graphs[sub]): graphchanges += 1 graphs[sub] = uinvite_graph exclude_subs=[sub] # if a single change, fit everyone again (except the graph that was just fit) else: exclude_subs.append(sub) # if graph didn't change, don't fit them again until another change rnd += 1 ## generate group graph priordict = genGraphPrior(graphs, items, fitinfo=fitinfo) fitinfo.startGraph = fitinfoSG # reset fitinfo starting graph to default return graphs, priordict
def probXhierarchical(Xs, graphs, items, td, priordict=None, irts=Irts({})):
    """
    Compute the total log-likelihood of a set of subject graphs under a
    hierarchical U-INVITE model.

    For each subject, this function calculates the likelihood of their observed
    data given their individual graph and optionally a shared group prior.
    Returns the sum of log-likelihoods across all subjects.

    Parameters
    ----------
    Xs : list of list
        Observed sequences for each subject.
    graphs : list of ndarray
        Individual graph (adjacency matrix) for each subject.
    items : list of dict
        Mapping of node indices to item labels for each subject.
    td : DataModel
        Data model settings including jump, priming, and censoring behavior.
    priordict : dict, optional
        Group prior used to condition each subject's likelihood. If None, prior is not used.
    irts : Irts, optional
        Inter-response time data, one list per subject. Default is empty.

    Returns
    -------
    float
        Sum of log-likelihoods across all subjects.
    """
    lls = []
    for sub in range(len(Xs)):
        if priordict:
            prior = (priordict, items[sub])
        else:
            prior = None
        best_ll, probmat = probX(Xs[sub], graphs[sub], td, irts=irts, prior=prior)   # LL of graph
        lls.append(best_ll)
    ll = sum(lls)
    return ll

# construct graph from the item correlation matrix and the planar maximally filtered graph (PMFG)
# see Borodkin, Kenett, Faust, & Mashal (2016) and Kenett, Kenett, Ben-Jacob, & Faust (2011)
# does not work well for a small number of lists! many NaN correlations + when two correlations are equal, ordering is arbitrary
[docs] def correlationBasedNetwork(Xs, numnodes=None, minlists=0, valid=False, td=None): """ Generate a semantic network using item co-occurrence correlations. Constructs a correlation matrix from binary item-by-list occurrence and builds a Planar Maximally Filtered Graph (PMFG) by retaining only non-redundant edges while maintaining planarity. Parameters ---------- Xs : list of list List of observed item sequences. numnodes : int, optional Number of unique nodes. minlists : int, optional Minimum number of lists an item must appear in to be included. valid : bool, optional If True, ensures the graph is usable with censored random walk. td : DataModel, optional Required if `valid=True`. Returns ------- ndarray Adjacency matrix of the PMFG network. """ import networkx as nx if numnodes == None: numnodes = len(set(flatten_list(Xs))) # construct matrix of list x item where each cell indicates whether that item is in that list list_by_item = np.zeros((numnodes,len(Xs))) for node in range(numnodes): for x in range(len(Xs)): if node in Xs[x]: list_by_item[node,x]=1.0 # find pearsonr correlation for all item pairs item_by_item = {} for item1 in range(numnodes): for item2 in range(item1+1,numnodes): if (sum(list_by_item[item1]) <= minlists) or (sum(list_by_item[item2]) <= minlists): # if a word only appears in <= minlists lists, exclude from correlation list (kenett personal communication, to avoid spurious correlations) item_by_item[(item1, item2)] = np.nan else: #item_by_item[(item1, item2)] = scipy.stats.pearsonr(list_by_item[item1],list_by_item[item2])[0] item_by_item[(item1, item2)] = pearsonr(list_by_item[item1],list_by_item[item2]) # Fixed NaNs: corr_vals = sorted( item_by_item, key=lambda k: (np.isnan(item_by_item[k]), -item_by_item[k] if not np.isnan(item_by_item[k]) else 0, k) ) #corr_vals = sorted(item_by_item, key=item_by_item.get)[::-1] # keys in correlation dictionary sorted by value (high to low, including NaN first) g = nx.Graph() g.add_nodes_from(range(numnodes)) for pair in corr_vals: if not np.isnan(item_by_item[pair]): g.add_edge(*pair) is_planar, _ = nx.check_planarity(g) if not is_planar: g.remove_edge(*pair) a = nx.to_numpy_array(g).astype(int) if valid: a = makeValid(Xs, a, td) return a
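# Usage sketch (illustrative): as the comment above warns, the PMFG is unstable
# for small numbers of lists (many NaN correlations); the toy data below only
# illustrate the call signature.
def _example_correlationBasedNetwork():
    toy_lists = [[0, 1, 2], [1, 0, 3], [3, 2, 1, 0], [0, 2, 3]]
    return correlationBasedNetwork(toy_lists)
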
def makeValid(Xs, graph, td, seed=None):
    """
    Modify the graph to ensure all transitions in `Xs` are reachable.

    If a transition results in zero probability, adds necessary edges to make
    the sequence valid under the model.

    Parameters
    ----------
    Xs : list of list
        Observed sequences.
    graph : ndarray
        Initial adjacency matrix.
    td : DataModel
        Configuration object.
    seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    ndarray
        Modified, valid adjacency matrix.
    """
    # add direct edges when transition is impossible
    check = probX(Xs, graph, td)
    while check[0] == -np.inf:
        if isinstance(check[1], tuple):
            graph[check[1][0], check[1][1]] = 1
            graph[check[1][1], check[1][0]] = 1
        # i think these 2 lines are no longer necessary
        #elif check[1] == "prior":
        #    raise ValueError('Starting graph has prior probability of 0.0')
        elif isinstance(check[1], int):
            # when list contains one item and node is unreachable, connect to random node
            nplocal = np.random.RandomState(seed)
            randnode = nplocal.choice(range(len(graph)))
            graph[check[1], randnode] = 1
            graph[randnode, check[1]] = 1
        else:
            raise ValueError('Unexpected error from makeValid()')
        check = probX(Xs, graph, td)
    return graph

# converts priordict to graph if probability of edge is greater than cutoff value
def priorToNetwork(priordict, items, cutoff=0.5, undirected=True):
    """
    Convert a prior probability dictionary into a binary graph.

    Adds an edge between nodes if the prior probability exceeds a threshold.

    Parameters
    ----------
    priordict : dict
        Nested dictionary of prior edge probabilities.
    items : dict
        Mapping of node index to item label.
    cutoff : float, optional
        Threshold for edge inclusion. Default is 0.5.
    undirected : bool, optional
        Whether to mirror edges for symmetry. Default is True.

    Returns
    -------
    ndarray
        Binary adjacency matrix.
    """
    numnodes = len(items)
    a = np.zeros((numnodes, numnodes))
    for item1 in list(priordict.keys()):
        if item1 != 'DEFAULTPRIOR':
            for item2 in priordict[item1]:
                if priordict[item1][item2] > cutoff:
                    # syntax is a little convoluted in case dictionary keys are not in sequential order
                    item1_idx = list(items.keys())[list(items.values()).index(item1)]
                    item2_idx = list(items.keys())[list(items.values()).index(item2)]
                    a[item1_idx, item2_idx] = 1.0
                    if undirected:
                        a[item2_idx, item1_idx] = 1.0
    return a

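# Usage sketch (illustrative): threshold a toy prior into a binary graph over
# labeled items; only cat-dog and dog-fox clear the 0.5 cutoff here.
def _example_priorToNetwork():
    toy_prior = {'cat': {'dog': 0.9, 'fox': 0.2},
                 'dog': {'fox': 0.7},
                 'DEFAULTPRIOR': 0.1}
    item_labels = {0: 'cat', 1: 'dog', 2: 'fox'}
    return priorToNetwork(toy_prior, item_labels, cutoff=0.5)
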
# probability of observing Xs, including irts and prior
#@profile
#@nogc
[docs] def probX(Xs, a, td, irts=Irts({}), prior=None, origmat=None, changed=[]): """ Compute the likelihood of observed sequences under a transition model. Calculates the log-likelihood of each sequence under the given adjacency matrix using the U-INVITE framework. Supports censoring, priming, jumping, and IRT models. Parameters ---------- Xs : list of list Sequences of node transitions. a : ndarray Adjacency matrix representing the transition structure. td : DataModel Configuration object (jump, priming, censoring). irts : Irts, optional Inter-response time model. Default is empty. prior : tuple, optional (priordict, items) for computing graph priors. origmat : list of list, optional Previously computed probability matrix (for delta updates). changed : list, optional List of nodes that changed since `origmat`. Returns ------- float Log-likelihood of the data. list of list Transition probabilities for each sequence. """ try: numnodes=len(a) except TypeError: raise Exception(a) reg=(1+1e-10) # nuisance parameter to prevent errors; can also use pinv instead of inv, but that's much slower identmat=np.identity(numnodes) * reg # pre-compute for tiny speed-up (only for non-IRT) probs=[] # generate transition matrix (from: column, to: row) from link matrix t=a/sum(a.astype(float)) t=np.nan_to_num(t) # jumping/priming models can have nan in matrix, need to change to 0 if (td.jumptype=="stationary") or (td.start_node=="stationary"): statdist=stationary(t) # U-INVITE probability excluding jumps, prior, and priming adjustments -- those come later for xnum, x in enumerate(Xs): x2=np.array(x) t2=t[x2[:,None],x2] # re-arrange transition matrix to be in list order prob=[] if td.start_node=="stationary": prob.append(statdist[x[0]]) # probability of X_1 elif td.start_node=="uniform": prob.append(1.0/numnodes) # if impossible starting point, return immediately if (prob[-1]==0.0): try: return -np.inf, (x[0], x[1]) except: return -np.inf, x[0] if (len(changed) > 0) and isinstance(origmat,list): # if updating prob. matrix based on specific link changes update=0 # reset for each list # flag if list contains perseverations if len(x) == len(set(x)): list_has_perseverations = False else: list_has_perseverations = True for curpos in range(1,len(x)): if (len(changed) > 0) and isinstance(origmat,list): if update==0: # first check if probability needs to be updated if (Xs[xnum][curpos-1] in changed): # (only AFTER first changed node has been reached) update=1 else: # if not, take probability from old matrix prob.append(origmat[xnum][curpos]) continue if list_has_perseverations: # a bit slower because matrix is being copied x2=np.array([i for i,j in enumerate(x) if (j not in x[:i]) and (i < curpos)]) # column ids for transient states excluding perseverations Q=t2[x2[:,None],x2] # excludes perseverations. 
could be sped if only performed when Q contains perseverations # as opposed to being done for every transition if a perseveration is in the list else: Q=t2[:curpos,:curpos] # old way when data does not include perseverations # td.censor_fault is necessary to model perservations in the data if td.censor_fault > 0.0: Q=np.multiply(Q, 1.0-td.censor_fault) if len(irts.data) > 0: # use this method only when passing IRTs numcols=len(Q) flist=[] newQ=np.zeros(numcols) # init to Q^0, for when r=1 newQ[curpos-1]=1.0 # (using only one: row for efficiency) irt=irts.data[xnum][curpos-1] # precompute for small speedup if irts.irttype=="gamma": logbeta=np.log(irts.gamma_beta) logirt=np.log(irt) # normalize irt probabilities to avoid irt weighting if irts.irttype=="gamma": # r=alpha. probability of observing irt at r steps irtdist=[r*logbeta-math.lgamma(r)+(r-1)*logirt-irts.gamma_beta*irt for r in range(1,irts.rcutoff)] if irts.irttype=="exgauss": irtdist=[np.log(irts.exgauss_lambda/2.0)+(irts.exgauss_lambda/2.0)*(2.0*r+irts.exgauss_lambda*(irts.exgauss_sigma**2)-2*irt)+np.log(math.erfc((r+irts.exgauss_lambda*(irts.exgauss_sigma**2)-irt)/(np.sqrt(2)*irts.exgauss_sigma))) for r in range(1,irts.rcutoff)] for r in range(1,irts.rcutoff): innersum=0 for k in range(numcols): num1=newQ[k] # probability of being at node k in r-1 steps num2=t2[curpos,k] # probability transitioning from k to absorbing node innersum=innersum+(num1*num2) # compute irt probability given r steps log_dist = irtdist[r-1] / sum(irtdist) if innersum > 0: # sometimes it's not possible to get to the target node in r steps flist.append(log_dist + np.log(innersum)) newQ=np.inner(newQ,Q) # raise power by one f=sum([np.e**i for i in flist]) prob.append(f) # probability of x_(t-1) to X_t else: # if no IRTs, use standard U-INVITE I=identmat[:len(Q),:len(Q)] # novel items are emitted with probability 1 when encountered. perseverations are emitted with probability td.censor_fault when encountered. if list_has_perseverations: # if list has perseverations. could speed up by only doing this step when a perseveration has been encountered x1=np.array([curpos]) # absorbing node #x2=np.array([i for i,j in enumerate(x) if (j not in x[:i]) and (i < curpos)]) # column ids for transient states excluding perseverations x2=np.array([i for i,j in enumerate(x) if (j not in x[i+1:curpos]) and (i < curpos)]) # column ids for transient states excluding perseverations R=t2[x1[:,None],x2][0] # why is [0] necessary here but not in the else case? if Xs[xnum][curpos] in Xs[xnum][:curpos]: # if absorbing state has appeared in list before... R=np.multiply(R,td.censor_fault) else: # if not a perseveration R=t2[curpos,:curpos] # old way ### test (when censor_fault=0) to see if absorbing distribution sums to 1... 
something is broken #total = [] #x2=np.array([j for i,j in enumerate(x) if (i < curpos)]) # column ids for transient states excluding perseverations #N=np.linalg.solve(I-Q,I[-1]) #for i in range(len(t)): # R=t[np.array([i])[:,None],x2] # B=np.dot(R,N) # total.append(B[0]) # if B[0] > 1.0: # print("NONONO") #print("total ", total) #R=t2[curpos,:curpos] # old way to reset ### N=np.linalg.solve(I-Q,I[-1]) B=np.dot(R,N) if np.isnan(B): B=0.0 prob.append(B) # alternative/original using matrix inverse #R=t2[curpos:,:curpos] #N=inv(I-Q) #B=np.dot(R,N) #prob.append(B[0,curpos-1]) # if there's an impossible transition and no jumping/priming, return immediately if (prob[-1]==0.0) and (td.jump == 0.0) and (td.priming == 0.0): return -np.inf, (x[curpos-1], x[curpos]) probs.append(prob) uinvite_probs = copy.deepcopy(probs) # store only u-invite transition probabilities (the computationally hard stuff) to avoid recomputing # adjust for jumping probability if td.jump > 0.0: if td.jumptype=="uniform": probs=addJumps(probs, td, numnodes=numnodes) elif td.jumptype=="stationary": probs=addJumps(probs, td, statdist=statdist, Xs=Xs) if (td.priming > 0.0): probs=adjustPriming(probs, td, Xs) # check for impossible transitions after priming and jumping for xnum, x in enumerate(probs): for inum, i in enumerate(x): if (i==0.0) and (inum==0): return -np.inf, (Xs[xnum][inum], Xs[xnum][inum+1]) # link to next item when first item is unreachable elif (i==0.0) and (inum > 0): return -np.inf, (Xs[xnum][inum-1], Xs[xnum][inum]) # link to previous item otherwise try: ll=sum([sum([np.log(j) for j in probs[i]]) for i in range(len(probs))]) except: ll=-np.inf # include prior? if prior: priorlogprob = evalGraphPrior(a, prior) ll = ll + priorlogprob return ll, uinvite_probs
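# Usage sketch (illustrative): score one toy list against a toy graph. The
# DataModel keys below are assumed to map onto the attributes probX() reads
# (jump, jumptype, start_node, priming, censor_fault).
def _example_probX():
    dm = DataModel({'jump': 0.0, 'jumptype': 'uniform', 'start_node': 'uniform',
                    'priming': 0.0, 'censor_fault': 0.0})
    toy_graph = np.array([[0, 1, 1, 0],
                          [1, 0, 1, 0],
                          [1, 1, 0, 1],
                          [0, 0, 1, 0]])
    ll, transition_probs = probX([[0, 1, 2, 3]], toy_graph, dm)
    return ll   # -inf would indicate an impossible transition under the censored walk
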
#@profile
[docs] def uinvite(Xs, td=DataModel({}), numnodes=None, irts=Irts({}), fitinfo=Fitinfo({}), prior=None, debug=True, seed=None): """ Fit a graph to fluency data using the U-INVITE algorithm. Iteratively searches over graph space to find a structure that maximizes likelihood under a random walk model, using triangle-based heuristics and pruning. Optionally adjusts for IRTs and perseverations. Parameters ---------- Xs : list of list Observed sequences of node transitions. td : DataModel, optional Model configuration (jump, priming, censoring). numnodes : int, optional Number of unique nodes in the network. irts : Irts, optional IRT model information. fitinfo : Fitinfo, optional Algorithm parameters including edge strategy and convergence criteria. prior : tuple, optional Prior probability dictionary and item mappings. debug : bool, optional If True, prints debug output. seed : int, optional Random seed. Returns ------- ndarray Fitted graph adjacency matrix. float Log-likelihood of the fitted graph. """ nplocal=np.random.RandomState(seed) if numnodes == None: numnodes = len(set(flatten_list(Xs))) # return list of neighbors of neighbors of i, that aren't themselves neighbors of i # i.e., an edge between i and any item in nn forms a triangle #@profile def neighborsofneighbors(i, nxg): nn=[] # neighbors of neighbors (nn) n=list(nx.all_neighbors(nxg,i)) for j in n: nn=nn+list(nx.all_neighbors(nxg,j)) nn=list(set(nn)) for k in n: # remove neighbors if k in nn: nn.remove(k) if i in nn: nn.remove(i) # remove self return nn # toggle links back, should be faster than making graph copy #@profile def swapEdges(graph,links): for link in links: graph[link[0],link[1]] = 1 - graph[link[0],link[1]] if not fitinfo.directed: graph[link[1],link[0]] = 1 - graph[link[1],link[0]] return graph #@timer #@profile def pivot(graph, vmin=1, vmaj=0, best_ll=None, probmat=None, limit=np.inf, method=""): numchanges=0 # number of changes in single pivot() call if (best_ll == None) or (np.any(probmat == None)): best_ll, probmat = probX(Xs,graph,td,irts=irts,prior=prior) # LL of best graph found nxg=nx.to_networkx_graph(graph) # generate dict where v[i] is a list of nodes where (i, v[i]) is an existing edge in the graph if (method=="prune") or (method==0): if debug: print("Pruning", str(vmaj) + "." + str(vmin), "... ",) # (len(edges)/2)-len(firstedges), "possible:", sys.stdout.flush() listofedges=np.where(graph==1) v=dict() for i in range(numnodes): v[i]=[] for i in zip(*listofedges): if ((i[0], i[1]) not in firstedges) and ((i[1], i[0]) not in firstedges): # don't flip first edges (FE)! if td.jump == 0.0: # unless jumping is allowed, untested 10/6/17 JCZ v[i[0]].append(i[1]) # generate dict where v[i] is a list of nodes where (i, v[i]) would form a new triangle if (method=="triangles") or (method==1): if debug: print("Adding triangles", str(vmaj) + "." + str(vmin), "... ",) # (len(edges)/2), "possible:", sys.stdout.flush() nn=dict() for i in range(len(graph)): nn[i]=neighborsofneighbors(i, nxg) v=nn # generate dict where v[i] is a list of nodes where (i, v[i]) is NOT an existing an edge and does NOT form a triangle if (method=="nonneighbors") or (method==2): # list of a node's non-neighbors (non-edges) that don't form triangles if debug: print("Adding other edges", str(vmaj) + "." + str(vmin), "... 
",) sys.stdout.flush() nonneighbors=dict() for i in range(numnodes): nn=neighborsofneighbors(i, nxg) # non-neighbors that DON'T form triangles nonneighbors[i]=[j for j in range(numnodes) if j not in nx.all_neighbors(nxg,i) and j not in nn] nonneighbors[i].remove(i) # also remove self v=nonneighbors count=[0.0]*numnodes avg=[-np.inf]*numnodes finishednodes=0 loopcount=0 while (finishednodes < numnodes) and (loopcount < limit): loopcount += 1 # number of failures before giving up on this pahse maxval=max(avg) bestnodes=[i for i, j in enumerate(avg) if j == maxval] # most promising nodes based on avg logprob of edges with each node as vertex node1=nplocal.choice(bestnodes) if len(v[node1]) > 0: #node2=nplocal.choice(v[node1]) # old n2avg=[avg[i] for i in v[node1]] maxval=max(n2avg) bestnodes=[v[node1][i] for i, j in enumerate(n2avg) if j == maxval] node2=nplocal.choice(bestnodes) edge=(node1, node2) graph=swapEdges(graph,[edge]) graph_ll, newprobmat=probX(Xs,graph,td,irts=irts,prior=prior,origmat=probmat,changed=[node1,node2]) if best_ll >= graph_ll: graph=swapEdges(graph,[edge]) else: best_ll = graph_ll probmat = newprobmat numchanges += 1 loopcount = 0 # probX under all possible perseveration values JCZ 5/9/2018 if fitinfo.estimatePerseveration: old_censor = td.censor_fault best_param = old_censor for censor_param in [i/100.0 for i in range(101)]: td.censor_fault = censor_param graph_ll, newprobmat = probX(Xs,graph,td,irts=irts,prior=prior) # LL of starting graph if graph_ll > best_ll: best_ll = graph_ll probmat = newprobmat best_param = censor_param td.censor_fault = best_param if debug: print("censor_fault old:", old_censor, " censor_fault new: ", best_param) v[node1].remove(node2) # remove edge from possible choices if not fitinfo.directed: v[node2].remove(node1) # increment even if graph prob = -np.inf for implicit penalty count[node1] += 1 count[node2] += 1 if (graph_ll != -np.inf) and (fitinfo.followtype != "random"): if avg[node1] == -np.inf: avg[node1] = graph_ll else: # followtype == avg avg[node1] = avg[node1] * ((count[node1]-1)/count[node1]) + (1.0/count[node1]) * graph_ll if avg[node2] == -np.inf: avg[node2] = graph_ll else: # followtype == avg avg[node2] = avg[node2] * ((count[node2]-1)/count[node2]) + (1.0/count[node2]) * graph_ll else: # no edges on this node left to try! avg[node1]=-np.inf # so we don't try it again... 
finishednodes += 1 if debug: print(numchanges, "changes") return graph, best_ll, probmat, numchanges # return graph def phases(graph, best_ll, probmat): complete=[0,0,0] # marks which phases are complete vmaj=0 vmin=1 while sum(complete) < 3: phasenum=complete.index(0) if phasenum==0: limit=fitinfo.prune_limit if phasenum==1: limit=fitinfo.triangle_limit if phasenum==2: limit=fitinfo.other_limit if (phasenum==0) and (vmin==1): vmaj += 1 graph, best_ll, probmat, numchanges = pivot(graph, best_ll=best_ll, vmaj=vmaj, vmin=vmin, method=phasenum, limit=limit, probmat=probmat) if numchanges > 0: vmin += 1 else: if (vmin==1): complete[phasenum]=1 if (phasenum==0) and (vmin>1): complete=[1,0,0] if (phasenum==1) and (vmin>1): complete=[0,1,0] if (phasenum==2) and (vmin>1): complete=[0,0,1] vmin=1 return graph, best_ll # check if data has perseverations if not [len(set(i)) for i in Xs]==[len(i) for i in Xs]: if (td.censor_fault == 0.0) and (not fitinfo.estimatePerseveration): raise Exception('''Your data contains perseverations, but \ censor_fault = 0.0; Set to some value 0.0 < x <= 1.0 or set estimatePerseveration to True''') try: firstedges=[(x[0], x[1]) for x in Xs] except: firstedges=[] # find a good starting graph graph = genStartGraph(Xs, numnodes, td, fitinfo) # find best starting perseveration parameter if applicable JCZ 5/9/2018 if fitinfo.estimatePerseveration: best_ll = -np.inf best_param = 0.0 for censor_param in [i/100.0 for i in range(101)]: td.censor_fault = censor_param graph_ll, probmat = probX(Xs,graph,td,irts=irts,prior=prior) # LL of starting graph if graph_ll > best_ll: best_ll = graph_ll best_param = censor_param td.censor_fault = best_param best_ll, probmat = probX(Xs,graph,td,irts=irts,prior=prior) # LL of starting graph graph, best_ll = phases(graph, best_ll, probmat) return graph, best_ll
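# Usage sketch (illustrative): fit U-INVITE to toy data, starting from the
# naive random walk graph. The Fitinfo/DataModel keys are assumed to map onto
# the attributes used above; real fits use many more (and longer) lists.
def _example_uinvite():
    toy_lists = [[0, 1, 2, 3], [2, 1, 0, 3], [3, 2, 0, 1]]
    dm = DataModel({'jump': 0.0, 'jumptype': 'uniform', 'start_node': 'uniform',
                    'priming': 0.0, 'censor_fault': 0.0})
    fit = Fitinfo({'startGraph': 'nrw'})
    best_graph, best_ll = uinvite(toy_lists, td=dm, fitinfo=fit, debug=False)
    return best_graph, best_ll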