Source code for src.strand.bond_graph

import copy
import queue
from collections import defaultdict, deque

from src.strand import bond
from src.util import util


class BondGraph:
    """
    Data structure of the bond graph, undirected graph with loops.

    Strands are addressed by their position in ``V``; ``adj[v]`` is the list
    of bond objects leaving strand ``v``.  A bond object is expected to expose
    ``node1`` / ``node2`` (the two strands) and the parallel lists ``dom`` /
    ``dom2`` (bound domain positions on ``node1`` / ``node2``).
    """

    V = []
    '''vertices'''
    E = []
    '''edges'''
    adj = []
    '''adjacency list'''
    hidden = []
    '''hidden domains of strands'''
    loop = []
    '''loops in the graph'''
    species = []
    '''species of the graph'''
    speciesnum = 0
    '''species number'''
    color = []
    '''inherit from StrandGraph'''

    def __init__(self, V, color, E):
        """
        create an empty bond graph (no bonds, no loops, no hidden domains)

        :param V: vertices; one adjacency list is created per entry
        :param color: colors of the vertices
        :param E: edges
        """
        self.V = V
        self.color = color
        self.E = E
        self.adj = [[] for _ in self.V]
        self.hidden = []
        self.loop = []
        self.species = []

    def create_bond(self, v1, v2, d1, d2):
        """
        create bond

        Only the ``v1 -> v2`` direction is recorded; see :meth:`add_edges`
        for the symmetric version.

        :param v1: the strand to be examined
        :param v2: the target strand to check
        :param d1: the domain position on the examining strand of the bond
        :param d2: the domain position on the target strand of the bond
        :return:
        """
        for existing in self.adj[v1]:
            if v2 == existing.node2:
                # there already are bonds between the two strands: extend them
                existing.appenddom(d1, d2)
                return
        # new bond
        self.adj[v1].append(bond.Bond(v1, v2, [d1], [d2]))

    def add_edges(self, v1, v2, d1, d2):
        """
        add edges to the bond graph

        The bond is stored once per direction.

        :param v1: strand 1
        :param v2: strand 2
        :param d1: domain 1, corresponds to strand 1
        :param d2: domain 2, corresponds to strand 2
        """
        self.create_bond(v1, v2, d1, d2)
        self.create_bond(v2, v1, d2, d1)

    def check_in_loop(self, startnode, endnode):
        """
        check if the two vertices are in the same loop

        :param startnode: start vertex
        :param endnode: end vertex
        :return: True if in the same loop, False otherwise
        """
        return any(startnode in cycle and endnode in cycle for cycle in self.loop)

    def check_junction(self, startnode, endnode, toehold):
        """
        check if the bond between startnode and endnode is a part of junction

        Only the first stored loop containing both strands is examined.  Each
        strand of that loop is expected to have bonds to both of its loop
        neighbours; otherwise indexing the collected bonds raises IndexError.

        :param startnode: first strand of the bond
        :param endnode: second strand of the bond
        :param toehold: mapping from ``frozenset({(strand, domain), (strand, domain)})``
            to a truthy value when that bond is a toehold
        :return: True if the loop passes the junction test, False otherwise
            (including when no stored loop contains both strands)
        """
        toeholdcnt = 0
        for cycle in self.loop:
            if not (startnode in cycle and endnode in cycle):
                continue
            size = len(cycle)
            for pos in range(size):
                pre = cycle[pos - 1]            # wraps to the last strand for pos == 0
                suc = cycle[(pos + 1) % size]   # wraps to the first strand at the end
                n = []
                toeholddom = {}
                for b in self.adj[cycle[pos]]:
                    if b.node2 == pre or b.node2 == suc:
                        for k in range(len(b.dom)):
                            key = frozenset({(b.node1, b.dom[k]), (b.node2, b.dom2[k])})
                            toeholddom[b.dom[k]] = bool(toehold[key])
                        n.append(b.dom)
                # NOTE(review): check_continuity presumably returns a pair of
                # domains on this strand, or None - confirm in src.util
                cont = util.check_continuity(n[0], n[1])
                if cont is None:
                    return False
                if toeholddom[cont[0]]:
                    toeholdcnt += 1
                if toeholddom[cont[1]]:
                    toeholdcnt += 1
            # reject when more than two of the continuity endpoints are toeholds
            return toeholdcnt <= 2
        return False

    def get_bond(self, startstrand, endstrand):
        """
        get the bond between the two vertices

        :param startstrand: start strand
        :param endstrand: end strand
        :return: the bond from startstrand to endstrand, or None if there is none
        """
        for b in self.adj[startstrand]:
            if b.node2 == endstrand:
                return b
        return None

    def search_path(self, startstrand, stack, endstrand, edge):
        """
        search a path from the start node to the end node in the spanning tree

        Depth-first search restricted to bonds contained in ``edge``.  When the
        end strand is reached, ``stack`` itself (not a copy) is appended to
        ``self.loop``.

        :param startstrand: strand to continue the search from
        :param stack: strands on the current path; modified in place
        :param endstrand: strand to reach
        :param edge: bonds of the spanning tree
        :return: True if a path was found and stored, False otherwise
        """
        stack.append(startstrand)
        if endstrand == startstrand:
            self.loop.append(stack)
            return True
        for b in self.adj[startstrand]:
            if b in edge and b.node2 not in stack:
                if self.search_path(b.node2, stack, endstrand, edge):
                    return True
        # dead end: backtrack
        stack.pop()
        return False

    def spanning(self, mark, edge, startstrand, times, depth):
        """
        building a spanning tree of the bond graph

        :param mark: list of boolean numbers of vertices to show if they have been visited
        :param edge: edges of a spanning tree of the bond graph
        :param startstrand: starting strand to find spanning tree
        :param times: list of integers of vertices indicating the times of this function is called
        :param depth: current cycle number of calling this function
        """
        for b in self.adj[startstrand]:
            # truthiness test, consistent with call_spanning
            if mark[b.node2]:
                continue
            edge.append(b)
            # the bond is stored once per direction: keep the reverse one too
            for back in self.adj[b.node2]:
                if back.node2 == b.node1:
                    edge.append(back)
            mark[b.node2] = True
            times[b.node2] = depth
            self.spanning(mark, edge, b.node2, times, depth)

    def call_spanning(self, mark, edge, times, depth):
        """
        find a spanning forest of the bond graph and store the loops existing in it

        Also sets ``self.species`` (component id per vertex) and
        ``self.speciesnum`` (final value of ``depth``).

        :param mark: list of boolean numbers of vertices to show if they have been visited
        :param edge: edges of a spanning tree of the bond graph
        :param times: list of integers of vertices indicating the times of spanning() is called
        :param depth: current cycle number of calling spanning()
        """
        vertex_count = len(self.V)
        for root in range(vertex_count):
            if mark[root]:
                continue
            mark[root] = True
            times[root] = depth
            self.spanning(mark, edge, root, times, depth)
            for j in range(vertex_count):
                if not (mark[j] and times[j] == depth):
                    continue
                for b in self.adj[j]:
                    if not mark[b.node2] or b in edge:
                        continue
                    # a bond outside the spanning tree closes a loop;
                    # no need to store the loop if it is already found
                    if self.check_in_loop(j, b.node2):
                        continue
                    self.search_path(j, [], b.node2, edge)
            depth += 1
        self.species = times
        self.speciesnum = depth

    def find_loops(self):
        """
        find loops in the bond graph

        Results are accumulated in ``self.loop``; ``self.species`` and
        ``self.speciesnum`` are set as a side effect.
        """
        mark = [False for _ in self.V]
        times = [0 for _ in self.V]
        self.call_spanning(mark, [], times, 0)

    def store_hidden(self):
        """
        store all the hidden domains in hidden[]

        Entries are ``(strand, domain position)`` tuples appended to
        ``self.hidden``.
        """
        # the case of hairpin loop and the case multiple bonds exist between two strands
        for bonds in self.adj:
            for b in bonds:
                if b.node2 == b.node1 or len(b.dom) > 1:
                    for pos in range(min(b.dom) + 1, max(b.dom)):
                        self.hidden.append((b.node1, pos))

        # domains enclosed between the two loop bonds of each strand in a loop
        seen = set(self.hidden)  # O(1) duplicate check for the loop phase
        for cycle in self.loop:
            size = len(cycle)
            for k, strand in enumerate(cycle):
                to_next = self.get_bond(strand, cycle[(k + 1) % size])
                to_prev = self.get_bond(strand, cycle[k - 1])
                # NOTE(review): max() is taken for both bonds - confirm this
                # is intended when a bond spans several domains
                lower, upper = sorted((max(to_next.dom), max(to_prev.dom)))
                for pos in range(lower + 1, upper):
                    node = (to_next.node1, pos)
                    if node not in seen:
                        seen.add(node)
                        self.hidden.append(node)

    def check_bonded(self, node):
        """
        check if the node is bonded to another node

        :param node: a domain on a strand, as ``(strand, domain position)``
        :return: the ``(strand, domain position)`` it binds to if True, otherwise None
        """
        v, n = node
        for b in self.adj[v]:
            for own, other in zip(b.dom, b.dom2):
                if n == own:
                    return b.node2, other
        return None

    def potential_junction(self, node1, node2):
        """
        Check if node1 and node2 can be a part of junction

        On the same strand the two domains must be direct neighbours.  On
        different strands a breadth-first walk starts at node1 and hops over
        bonds whose domain is adjacent to the current one, remembering whether
        the hop went to the next (1) or previous (0) domain.

        :param node1: ``(strand, domain position)``
        :param node2: ``(strand, domain position)``
        :return: True if the nodes can be a part of junction, False otherwise
        """
        if node1[0] == node2[0]:
            return node1[1] + 1 == node2[1] or node1[1] - 1 == node2[1]

        visited = [False for _ in self.V]
        pending = deque([(node1[0], node1[1], 0)])
        while pending:
            curstrand, curdomain, direction = pending.popleft()
            visited[curstrand] = True
            if curstrand == node2[0]:
                if (node2[1] + 1 == curdomain and direction == 0) or \
                        (node2[1] - 1 == curdomain and direction == 1):
                    return True
            for b in self.adj[curstrand]:
                if visited[b.node2]:
                    continue
                for own, other in zip(b.dom, b.dom2):
                    if own == curdomain + 1:
                        pending.append((b.node2, other, 1))
                    elif own == curdomain - 1:
                        pending.append((b.node2, other, 0))
        return False

    def check_following_migration(self, edges, p=0):
        """
        UNDER TESTING.

        Partition ``edges`` into groups.  An entry joins the group currently
        being built when it conflicts with no member (same ``[1]`` component
        or overlapping ``[0]`` components) and, for some member, both of its
        strand-sorted endpoints in component ``p`` pass
        :meth:`potential_junction` against that member's endpoints.

        :param edges: sequence of entries; ``entry[p]`` is an iterable of
            ``(strand, domain position)`` tuples.  The input is not modified.
        :param p: index of the component of each entry holding the endpoints
        :return: list of groups, each a list of indices into ``edges``
        """
        # work on list copies so the caller's entries stay untouched
        items = []
        for entry in edges:
            entry = list(entry)
            entry[p] = list(entry[p])
            items.append(entry)
        ends = [sorted(item[p], key=lambda tup: tup[0]) for item in items]

        visited = [False for _ in items]
        miggroup = []
        for i in range(len(items)):
            if visited[i]:
                continue
            visited[i] = True
            group = [i]
            miggroup.append(group)
            for j in range(len(items)):
                if j == i or visited[j]:
                    continue
                conflict = any(
                    items[m][1] == items[j][1] or set(items[m][0]) & set(items[j][0])
                    for m in group
                )
                if conflict:
                    continue
                for m in group:
                    if self.potential_junction(ends[m][0], ends[j][0]) and \
                            self.potential_junction(ends[m][1], ends[j][1]):
                        visited[j] = True
                        group.append(j)
                        break
        return miggroup

    def check_strand_is_bonded(self, v):
        """
        check if the strand is bonded

        :param v: strand
        :return: domains on strand that is bonded (empty list if none)
        """
        bonddomains = []
        for b in self.adj[v]:
            bonddomains += b.dom
        return bonddomains

    def get_species(self):
        """
        get all species in one bondgraph

        Relies on ``self.species`` / ``self.speciesnum`` as set by
        :meth:`call_spanning` (e.g. through :meth:`find_loops`).

        :return: all species, one list of vertices per species
        """
        speciesnodes = [[] for _ in range(self.speciesnum)]
        for j, vertex in enumerate(self.V):
            speciesnodes[self.species[j]].append(vertex)
        return speciesnodes

    def get_connection(self, startnode, endstrand):
        """
        get connection nodes from the startnode to the endstrand

        :param startnode: starting node, as ``(strand, domain position)``
        :param endstrand: end strand
        :return: path from the end strand back to startnode as a flat list of
            ``(node, predecessor)`` pairs, an empty list if startnode already
            lies on endstrand, or None if endstrand cannot be reached
        """
        visited = [False for _ in self.V]
        prev = {}
        pending = deque([startnode])
        reached = None
        while pending:
            curstrand, curdomain = pending.popleft()
            visited[curstrand] = True
            if curstrand == endstrand:
                reached = (curstrand, curdomain)
                break
            for b in self.adj[curstrand]:
                if not visited[b.node2]:
                    entry = (b.node2, max(b.dom2))
                    pending.append(entry)
                    prev[entry] = (curstrand, curdomain)
        if reached is None:
            return None

        connect = []
        key = reached
        while key in prev:
            connect.append(key)
            connect.append(prev[key])
            key = prev[key]
        return connect

    def get_direction(self, startstrand, endstrand):
        """
        Note that we don't consider loops here.

        Breadth-first search from startstrand that assigns every newly reached
        strand the flipped direction of the strand it was reached from.  Each
        strand is enqueued at most once, so the search also terminates when
        endstrand is not reachable.

        :param startstrand: strand the search starts from
        :param endstrand: strand the search stops at
        :return: True if the directions of the two strands differ and more
            than two strands were dequeued, False otherwise (always False for
            graphs with at most two vertices)
        """
        if len(self.V) <= 2:
            return False
        visited = [False for _ in self.V]
        # NOTE(review): util.flip presumably toggles between 0 and 1 - confirm in src.util
        direction = [0 for _ in self.V]
        visited[startstrand] = True
        pending = deque([startstrand])
        cnt = 0
        while pending:
            cnt += 1
            curstrand = pending.popleft()
            if curstrand == endstrand:
                break
            for b in self.adj[curstrand]:
                if not visited[b.node2]:
                    visited[b.node2] = True
                    direction[b.node2] = util.flip(direction[b.node1])
                    pending.append(b.node2)
        return direction[startstrand] != direction[endstrand] and cnt > 2

    def merge_bonds_ignoring_nodes(self, v, nodes):
        """
        merge bonds on v ignoring bonds connected to nodes

        :param v: a strand, or a tuple of strands
        :param nodes: a set of strands
        :return: for a tuple, a new list with the bonds of all its strands
            that do not lead into ``nodes``; for a single strand, the strand's
            own adjacency list, unfiltered and not copied
        """
        # NOTE(review): the single-strand case does not apply ``nodes`` - confirm intended
        if not isinstance(v, tuple):
            return self.adj[v]
        return [b for strand in v for b in self.adj[strand] if b.node2 not in nodes]
class SubBondGraph(BondGraph):
    """
    Subgraph of bondgraph

    Keeps the vertex numbering of the parent graph (``Vp``) but only the
    bonds whose two strands both belong to the subgraph (``V``).
    """

    Vsub = []
    '''vertices'''
    adj = []
    '''adjacency list'''
    colormap = defaultdict(set)
    '''map colors to vertices'''
    colorset = set()
    '''set of colors'''

    def __init__(self, V, color, adj, Vp):
        """
        build the subgraph induced by V

        The parent's adjacency lists are copied per vertex before filtering,
        so the parent graph is left untouched.  A subgraph with exactly one
        vertex gets empty adjacency lists and only its own color registered.

        :param V: vertices of the subgraph (hashable, used to index ``adj``)
        :param color: colors of the vertices, indexed by vertex
        :param adj: adjacency list of the parent graph
        :param Vp: vertices of the parent graph
        """
        self.Vsub = V
        self.color = color
        self.colorset = set()
        self.colormap = defaultdict(set)
        BondGraph.__init__(self, Vp, color, [])

        if len(self.Vsub) == 1:
            only = self.Vsub[0]
            self.colormap[self.color[only]].add(only)
            self.colorset.add(self.color[only])
            return

        # copy the inner lists too: a shallow copy of the outer list would
        # let the filtering below remove bonds from the parent graph
        self.adj = [list(bonds) for bonds in adj]
        members = set(V)
        processed = set()
        for v in Vp:
            if v not in members:
                self.adj[v] = []
                continue
            # build a filtered list instead of popping while indexing, which
            # skips elements and runs past the end of the shrinking list
            kept = [b for b in self.adj[v] if b.node2 in members]
            for b in kept:
                # bonds towards an already processed strand were colored from that side
                if b.node2 not in processed:
                    self.map_colors({(v, b.dom[0]), (b.node2, b.dom2[0])})
            self.adj[v] = kept
            processed.add(v)

    def map_colors(self, edge):
        """
        register the colors of the strands of an edge

        :param edge: iterable of ``(strand, domain position)`` nodes; only the
            strand of each node is used
        """
        for strand, *_ in edge:
            strand_color = self.color[strand]
            self.colormap[strand_color].add(strand)
            self.colorset.add(strand_color)