Source code for snafu.generate_lists

from . import *


# return simulated data on graph g
# also returns a placeholder for the number of steps between first hits (for IRTs; this computation is currently disabled)
def gen_lists(g, td, seed=None):
    """
    Generate simulated lists by running censored random walks on a graph.

    This function simulates category fluency-style data by performing repeated
    random walks on a given graph `g`, using parameters defined in `td`. It
    returns the visited item lists, unused step info (placeholder), and a flag
    indicating whether the output used all nodes in the graph.

    Parameters
    ----------
    g : networkx.Graph
        The graph on which the random walks are performed.
    td : DataModel
        Object containing simulation parameters such as list length, jump
        probability, priming, and censoring behavior.
    seed : int, optional
        Random seed for reproducibility. Default is None.

    Returns
    -------
    list of list
        Simulated fluency data (visited node sequences) for each run.
    list
        Placeholder for step counts between items (currently unused).
    int
        A binary flag indicating if not all nodes were visited (1 = graph altered).
    """
    Xs = []
    steps = []
    priming_vector = []

    for xnum in range(td.numx):
        if seed is None:
            seedy = None
        else:
            seedy = seed + xnum
        rwalk = random_walk(g, td, priming_vector=priming_vector, seed=seedy)
        x = censored(rwalk, td)
        # fh = list(zip(*firstHits(rwalk)))[1]
        # step = [fh[i] - fh[i-1] for i in range(1, len(fh))]
        Xs.append(x)
        if td.priming > 0.0:
            priming_vector = x[:]
        # steps.append(step)

    td.priming_vector = []      # reset mutable priming vector between participants; JCZ added 9/29, untested

    alter_graph_size = 0
    if td.trim != 1.0:
        numnodes = nx.number_of_nodes(g)
        for i in range(numnodes):
            if i not in set(flatten_list(Xs)):
                alter_graph_size = 1

    return Xs, steps, alter_graph_size
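A minimal usage sketch (not part of the module source), calling gen_lists as defined above with a SimpleNamespace standing in for the package's parameter object. Only the attributes read in this module are known from the source; censored() is defined elsewhere in the package, so the censor_fault and emission_fault fields below are assumptions about what it may read, and the real package builds these parameters through its own data model.

# Usage sketch -- stand-in parameter object; field names beyond those read
# above are assumptions, not the package's documented interface.
import networkx as nx
from types import SimpleNamespace

toy_graph = nx.connected_watts_strogatz_graph(n=20, k=4, p=0.1, seed=1)

params = SimpleNamespace(
    numx=3,                      # number of simulated lists
    trim=0.5,                    # <= 1, so treated as a proportion of nodes per list
    jump=0.0,                    # probability of jumping on any step
    jumptype="uniform",          # "uniform" or "stationary"
    jumponcensored=None,         # jump after this many censored steps (None = never)
    start_node=("specific", 0),  # start every walk at node 0
    maxsteps=None,               # no cap on walk length
    priming=0.0,                 # no priming of later lists by earlier ones
    priming_vector=[],
    censor_fault=0.0,            # assumption: censored() may read these fields,
    emission_fault=0.0,          #   which are not visible in this module
)

Xs, steps, altered = gen_lists(toy_graph, params, seed=42)
print(len(Xs), altered)          # 3 simulated lists; altered == 1 if some nodes were never visited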
# given a graph, take a random walk until the desired number of nodes is covered (or maxsteps is reached); returns a list of (from, to) tuples
def random_walk(g, td, priming_vector=[], seed=None):
    """
    Perform a censored random walk on a graph, optionally with priming and jumping.

    This function simulates a walk on graph `g`, incorporating jump
    probabilities, censoring behavior, and optional priming effects to simulate
    memory recall or fluency search strategies.

    Parameters
    ----------
    g : networkx.Graph
        Graph on which the random walk is performed.
    td : DataModel
        Object defining walk behavior: jump probability, jump type, max steps,
        starting node policy, censoring, and priming.
    priming_vector : list, optional
        List of previously visited nodes to bias the walk. Default is empty.
    seed : int, optional
        Random seed for reproducibility. Default is None.

    Returns
    -------
    list of tuple
        A list of (from_node, to_node) edges representing the random walk path.
    """
    import scipy.stats

    nplocal = np.random.RandomState(seed)

    def jump():
        if td.jumptype == "stationary":
            second = statdist.rvs(random_state=seed)            # jump based on statdist
        elif td.jumptype == "uniform":
            second = nplocal.choice(nx.nodes(g))                # jump uniformly
        return second

    if (td.start_node == "stationary") or (td.jumptype == "stationary"):
        a = nx.to_numpy_array(g)
        t = a / sum(a).astype(float)
        statdist = stationary(t)
        statdist = scipy.stats.rv_discrete(values=(list(range(len(t))), statdist))

    if td.start_node == "stationary":
        start = statdist.rvs(random_state=seed)                 # choose starting point from stationary distribution
    elif td.start_node == "uniform":
        start = nplocal.choice(nx.nodes(g))                     # choose starting point uniformly
    elif td.start_node[0] == "specific":
        start = td.start_node[1]

    walk = []
    unused_nodes = set(nx.nodes(g))
    unused_nodes.remove(start)
    first = start

    numnodes = nx.number_of_nodes(g)
    if td.trim <= 1:
        numtrim = int(round(numnodes * td.trim))                # if <= 1, parameter is a proportion of the list
    else:
        numtrim = td.trim                                       # else, parameter is the length of the list
    num_unused = numnodes - numtrim

    censoredcount = 0       # keep track of censored nodes and jump after td.jumponcensored censored nodes
    numsteps = 0

    # walk covers td.trim nodes -- the list could be longer if it has perseverations
    while (len(unused_nodes) > num_unused) and ((td.maxsteps is None) or (numsteps < td.maxsteps)):
        # jump after n censored nodes or with random probability (depending on parameters)
        if (censoredcount == td.jumponcensored) or (nplocal.random_sample() < td.jump):
            second = jump()
        else:                                                   # no jumping!
            second = nplocal.choice([x for x in nx.all_neighbors(g, first)])    # follow random edge (actual random walk!)
            if (td.priming > 0.0) and (len(priming_vector) > 0):
                if (first in priming_vector[:-1]) & (nplocal.random_sample() < td.priming):
                    idx = priming_vector.index(first)
                    second = priming_vector[idx + 1]            # overwrite RW... kinda janky
        walk.append((first, second))
        numsteps += 1
        if second in unused_nodes:
            unused_nodes.remove(second)
            censoredcount = 0
        else:
            censoredcount += 1
        first = second

    return walk
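A minimal sketch of a single walk (not part of the module source), assuming a SimpleNamespace stand-in that sets only the attributes read by random_walk above. A specific start node and zero jump probability are used so no helpers outside this module are touched, and a simple first-visit filter stands in for the package's censored() helper, which may do more (e.g. apply censoring faults).

# Usage sketch -- stand-in parameter object with only the attributes read above.
import networkx as nx
from types import SimpleNamespace

demo_graph = nx.barabasi_albert_graph(25, 2, seed=3)

walk_params = SimpleNamespace(
    jumptype="uniform",          # only consulted if a jump actually occurs
    start_node=("specific", 0),  # start the walk at node 0
    trim=10,                     # > 1, so treated as a target list length
    maxsteps=None,               # no cap on the number of steps
    jumponcensored=None,         # never jump because of censoring alone
    jump=0.0,                    # no random jumps
    priming=0.0,                 # no priming
)

walk = random_walk(demo_graph, walk_params, seed=7)

# Rough stand-in for censored(): keep only the first visit to each node, in walk order.
seen, fluency_list = set(), []
for a, b in walk:
    for node in (a, b):
        if node not in seen:
            seen.add(node)
            fluency_list.append(node)

print(fluency_list)              # 10 unique items, matching walk_params.trim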