# Source code for sign_lens.utils
import os
import numpy as np
import scipy.sparse as sp
from collections import defaultdict
from tqdm import tqdm
class SignedTriadFeaExtra:
    """Signed-triad statistics for a signed graph stored as adjacency lists.

    The edge list file holds one edge per line as ``source<sep>target<sep>sign``
    with integer fields. A sign of ``1`` is a positive edge; every other value
    is treated as negative. With ``undirected=True`` each edge is also stored
    in the reverse direction.

    Attributes:
        pos_in_edgelists / pos_out_edgelists: ``defaultdict(list)`` mapping a
            node to the sources / targets of its positive edges.
        neg_in_edgelists / neg_out_edgelists: the same for negative edges.
    """

    # Masks over the 16-tuple returned by ``extract_triad_counts``. Inside each
    # group of four the entries are ordered (+,+), (+,-), (-,+), (-,-).
    _PAIR_POS_POS = (1, 0, 0, 0) * 4
    _PAIR_MIXED = (0, 1, 1, 0) * 4
    _PAIR_NEG_NEG = (0, 0, 0, 1) * 4
    _PAIR_NONE = (0,) * 16

    # Sign-pattern masks in the order (+++, ++-, +--, ---); the closing edge
    # contributes the third sign.
    _DIST_MASKS_POS_EDGE = (_PAIR_POS_POS, _PAIR_MIXED, _PAIR_NEG_NEG, _PAIR_NONE)
    _DIST_MASKS_NEG_EDGE = (_PAIR_NONE, _PAIR_POS_POS, _PAIR_MIXED, _PAIR_NEG_NEG)

    # Masks in the order (both satisfied, only balance, only status).
    _THEORY_MASKS_POS_EDGE = (
        (1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1),
        (0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0),
        (0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0),
    )
    _THEORY_MASKS_NEG_EDGE = (
        (0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0),
        (0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0),
        (0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1),
    )

    def __init__(self, edgelist_fpath, undirected=False, seperator='\t'):
        self.undirected = undirected
        self.seperator = seperator
        res = self.init_edgelists(edgelist_fpath)
        self.pos_in_edgelists, self.pos_out_edgelists, self.neg_in_edgelists, self.neg_out_edgelists = res

    def init_edgelists(self, edgelist_fpath):
        """Parse the edge list file into four adjacency-list dictionaries.

        Blank lines are skipped.

        Returns:
            ``(pos_in, pos_out, neg_in, neg_out)`` as ``defaultdict(list)``.
        """
        pos_out_edgelists = defaultdict(list)
        neg_out_edgelists = defaultdict(list)
        pos_in_edgelists = defaultdict(list)
        neg_in_edgelists = defaultdict(list)

        def add_edge(src, dst, sign):
            if sign == 1:
                pos_out_edgelists[src].append(dst)
                pos_in_edgelists[dst].append(src)
            else:
                neg_out_edgelists[src].append(dst)
                neg_in_edgelists[dst].append(src)

        with open(edgelist_fpath) as f:
            for line in f:
                if not line.strip():
                    # A trailing empty line would otherwise fail to unpack.
                    continue
                x, y, z = line.split(self.seperator)
                x = int(x)
                y = int(y)
                z = int(z)
                add_edge(x, y, z)
                if self.undirected:
                    # Undirected input: store the reverse direction as well.
                    add_edge(y, x, z)
        return pos_in_edgelists, pos_out_edgelists, neg_in_edgelists, neg_out_edgelists

    # The accessors below read through ``.get`` so that querying a node never
    # inserts an empty entry into the underlying defaultdicts.
    def get_pos_indegree(self, v):
        """Return the number of positive edges entering ``v``."""
        return len(self.pos_in_edgelists.get(v, ()))

    def get_pos_outdegree(self, v):
        """Return the number of positive edges leaving ``v``."""
        return len(self.pos_out_edgelists.get(v, ()))

    def get_neg_indegree(self, v):
        """Return the number of negative edges entering ``v``."""
        return len(self.neg_in_edgelists.get(v, ()))

    def get_neg_outdegree(self, v):
        """Return the number of negative edges leaving ``v``."""
        return len(self.neg_out_edgelists.get(v, ()))

    def _neighbors(self, v):
        """Return the set of nodes adjacent to ``v`` by any edge, either direction."""
        adjacent = set()
        for edgelists in (self.pos_in_edgelists, self.neg_in_edgelists,
                          self.pos_out_edgelists, self.neg_out_edgelists):
            adjacent.update(edgelists.get(v, ()))
        return adjacent

    def common_neighbors(self, u, v):
        """Return how many distinct nodes are adjacent to both ``u`` and ``v``."""
        return len(self._neighbors(u) & self._neighbors(v))

    def extract_triad_counts(self, u, v) -> tuple:
        """Count the third nodes ``w`` closing a triad with the pair ``(u, v)``.

        The 16 counts come in four groups of four:

        * ``d1``: ``u -> w`` and ``w -> v``
        * ``d2``: ``u -> w`` and ``v -> w``
        * ``d3``: ``w -> u`` and ``v -> w``
        * ``d4``: ``w -> u`` and ``w -> v``

        Inside each group the (u-side sign, v-side sign) order is
        ``(+,+), (+,-), (-,+), (-,-)``.
        """
        u_out = (set(self.pos_out_edgelists.get(u, ())), set(self.neg_out_edgelists.get(u, ())))
        u_in = (set(self.pos_in_edgelists.get(u, ())), set(self.neg_in_edgelists.get(u, ())))
        v_out = (set(self.pos_out_edgelists.get(v, ())), set(self.neg_out_edgelists.get(v, ())))
        v_in = (set(self.pos_in_edgelists.get(v, ())), set(self.neg_in_edgelists.get(v, ())))
        groups = ((u_out, v_in), (u_out, v_out), (u_in, v_out), (u_in, v_in))
        return tuple(
            len(u_side & v_side)
            for u_sides, v_sides in groups
            for u_side in u_sides
            for v_side in v_sides
        )

    def _triad_totals(self, out_edgelists):
        """Sum ``extract_triad_counts`` over every edge stored in ``out_edgelists``."""
        totals = np.zeros(16, dtype=np.int64)
        for x, targets in list(out_edgelists.items()):
            for y in targets:
                totals += np.asarray(self.extract_triad_counts(x, y), dtype=np.int64)
        return totals

    @staticmethod
    def _ratio(part, whole):
        """Return ``part / whole``, or NaN when ``whole`` is zero."""
        return part / whole if whole else float('nan')

    def calc_balance_triads_num(self):
        """Return ``(both + only-balance count, total triad count)``."""
        s0, s1, s2, s3 = self.calc_balance_and_status_triads_num()
        return s1 + s2, s0

    def calc_balance_triads_dist(self):
        """Return the share of triads with sign pattern ``+++, ++-, +--, ---``.

        Returns:
            A length-4 array of fractions. When no triad exists the array is
            all NaN.
        """
        pos_totals = self._triad_totals(self.pos_out_edgelists)
        neg_totals = self._triad_totals(self.neg_out_edgelists)
        res = np.array([
            np.dot(pos_mask, pos_totals) + np.dot(neg_mask, neg_totals)
            for pos_mask, neg_mask in zip(self._DIST_MASKS_POS_EDGE, self._DIST_MASKS_NEG_EDGE)
        ])
        total = res.sum()
        if total == 0:
            return np.full(4, np.nan)
        return res / total

    def calc_balance_and_status_triads_num(self):
        """Count triads by which of the 'balance' / 'status' masks they match.

        Every edge is used as the closing edge of its triads, so a triangle is
        counted once per edge it contains. The counts and their ratios are
        printed.

        Returns:
            ``(s0, s1, s2, s3)``: all triads, triads matching both masks,
            only the balance mask, only the status mask.
        """
        pos_totals = self._triad_totals(self.pos_out_edgelists)
        neg_totals = self._triad_totals(self.neg_out_edgelists)
        s0 = pos_totals.sum() + neg_totals.sum()
        s1, s2, s3 = (
            np.dot(pos_mask, pos_totals) + np.dot(neg_mask, neg_totals)
            for pos_mask, neg_mask in zip(self._THEORY_MASKS_POS_EDGE, self._THEORY_MASKS_NEG_EDGE)
        )
        print('all triangle', s0)
        print('both', s1, self._ratio(s1, s0))
        print('balance', s2, self._ratio(s2, s0))
        print('status', s3, self._ratio(s3, s0))
        return s0, s1, s2, s3
class SignedTriadFeaExtraByMatrace:
    """Signed-triad statistics computed with sparse adjacency matrices.

    The edge list file holds one edge per line as ``source<sep>target<sep>sign``
    with integer fields. Node ids are used directly as row/column indices, so
    they must be non-negative. A sign of ``1`` is positive; anything else is
    negative.

    NOTE(review): ``undirected`` is stored but not used when the matrices are
    built, unlike ``SignedTriadFeaExtra`` which duplicates each edge.

    Attributes:
        pos_mat: ``scipy.sparse.coo_matrix`` with a 1 per positive edge.
        neg_mat: ``scipy.sparse.coo_matrix`` with a 1 per negative edge.
    """

    # Masks over the 16 wedge counts in the order
    # (both satisfied, only balance, only status).
    _MASKS_POS_EDGE = (
        (1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1),
        (0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0),
        (0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0),
    )
    _MASKS_NEG_EDGE = (
        (0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0),
        (0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0),
        (0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1),
    )

    def __init__(self, edgelist_fpath, undirected=False, seperator='\t'):
        self.undirected = undirected
        self.seperator = seperator
        self.init_matrice(edgelist_fpath)

    def init_matrice(self, edgelist_fpath):
        """Read the edge list and build ``self.pos_mat`` and ``self.neg_mat``.

        Both matrices are square with side ``max node id + 1``, so ids need
        not be contiguous. Blank lines are skipped.
        """
        pos_edgelist = []
        neg_edgelist = []
        max_id = -1
        with open(edgelist_fpath) as f:
            for line in f:
                fields = line.strip()
                if not fields:
                    continue
                x, y, z = fields.split(self.seperator)
                x = int(x)
                y = int(y)
                z = int(z)
                max_id = max(max_id, x, y)
                if z == 1:
                    pos_edgelist.append((x, y))
                else:
                    neg_edgelist.append((x, y))
        node_num = max_id + 1
        self.pos_mat = self._build_adjacency(pos_edgelist, node_num)
        self.neg_mat = self._build_adjacency(neg_edgelist, node_num)

    @staticmethod
    def _build_adjacency(edgelist, node_num):
        """Return a ``node_num`` x ``node_num`` COO matrix with a 1 per edge."""
        # reshape keeps the array two-dimensional when the edge list is empty,
        # so a graph lacking one of the two signs still loads.
        edge_array = np.array(edgelist, dtype=np.int64).reshape(-1, 2)
        data = np.ones(len(edge_array), dtype=np.int64)
        return sp.coo_matrix(
            (data, (edge_array[:, 0], edge_array[:, 1])),
            shape=(node_num, node_num),
        )

    def _wedge_totals(self, closing):
        """Return the 16 wedge counts summed over the edges in ``closing``.

        For each ordered factor pair ``(a, b)``, entry ``[u, v]`` of ``a.dot(b)``
        counts the intermediate nodes linking ``u`` and ``v``. The diagonal is
        zeroed and the result is masked by ``closing`` before summing.
        """
        A_plus = self.pos_mat
        A_minus = self.neg_mat
        factor_pairs = (
            (A_plus, A_plus), (A_plus, A_minus), (A_minus, A_plus), (A_minus, A_minus),
            (A_plus, A_plus.T), (A_plus, A_minus.T), (A_minus, A_plus.T), (A_minus, A_minus.T),
            (A_plus.T, A_plus.T), (A_plus.T, A_minus.T), (A_minus.T, A_plus.T), (A_minus.T, A_minus.T),
            (A_plus.T, A_plus), (A_plus.T, A_minus), (A_minus.T, A_plus), (A_minus.T, A_minus),
        )
        totals = []
        for a, b in factor_pairs:
            # Use the sparse product; numpy.dot is not sparse-aware.
            res = a.dot(b)
            res.setdiag(0)
            totals.append(res.multiply(closing).sum())
        return np.array(totals)

    def calc_balance_and_status_triads_num(self):
        """Count triads by which of the 'balance' / 'status' masks they match.

        Every edge is used as the closing edge of its triads, so a triangle is
        counted once per edge it contains. The counts and their ratios are
        printed.

        Returns:
            ``(s0, s1, s2, s3)``: all triads, triads matching both masks,
            only the balance mask, only the status mask.
        """
        pos_totals = self._wedge_totals(self.pos_mat)
        neg_totals = self._wedge_totals(self.neg_mat)
        s0 = pos_totals.sum() + neg_totals.sum()
        s1, s2, s3 = (
            np.dot(pos_mask, pos_totals) + np.dot(neg_mask, neg_totals)
            for pos_mask, neg_mask in zip(self._MASKS_POS_EDGE, self._MASKS_NEG_EDGE)
        )

        def ratio(part):
            # NaN instead of a divide-by-zero warning when there are no triads.
            return part / s0 if s0 else float('nan')

        print('all triangle', s0)
        print('both', s1, ratio(s1))
        print('balance', s2, ratio(s2))
        print('status', s3, ratio(s3))
        return s0, s1, s2, s3
class SignedBipartiteFeaExtra:
    """Signed butterfly (4-cycle) statistics for a signed bipartite graph.

    The edge list file holds one edge per line as ``a<sep>b<sep>sign`` with
    integer fields, where ``a`` and ``b`` come from the two node sets. A sign
    greater than zero is positive; anything else is negative. ``header`` is
    accepted but not used.

    Attributes:
        pos_a_b / neg_a_b: ``defaultdict(set)`` from an ``a`` node to its
            positive / negative ``b`` neighbours.
        pos_b_a / neg_b_a: the reverse mappings.
        edges: ``numpy`` array of the ``(a, b, sign)`` rows.
    """

    _BUTTERFLY_KEYS = ('++++', '----', '++--', '+-+-', '+--+', '+---', '+++-')
    _NO_NEIGHBORS = frozenset()

    def __init__(self, edgelist_fpath, seperator='\t', header=None) -> None:
        self.edgelist_fpath = edgelist_fpath
        self.seperator = seperator
        self.init_edgelists()

    def init_edgelists(self):
        """Read ``self.edgelist_fpath`` into the four neighbour maps and ``self.edges``.

        Blank lines are skipped.
        """
        self.pos_a_b = defaultdict(set)
        self.pos_b_a = defaultdict(set)
        self.neg_a_b = defaultdict(set)
        self.neg_b_a = defaultdict(set)
        edges = []
        with open(self.edgelist_fpath) as f:
            for line in f:
                fields = line.strip()
                if not fields:
                    continue
                a, b, s = map(int, fields.split(self.seperator))
                edges.append((a, b, s))
                if s > 0:
                    self.pos_a_b[a].add(b)
                    self.pos_b_a[b].add(a)
                else:
                    self.neg_a_b[a].add(b)
                    # Reverse map is keyed by the b node, mirroring pos_b_a.
                    self.neg_b_a[b].add(a)
        self.edges = np.array(edges)

    def count_values(self, a_b_1, a_b_2, a_b_3, a_b_4, a1, a2):
        """Count ordered pairs ``(b, b')`` with ``b != b'`` for the pair ``(a1, a2)``.

        ``b`` must be a neighbour of ``a1`` in ``a_b_1`` and of ``a2`` in
        ``a_b_3``; ``b'`` must be a neighbour of ``a1`` in ``a_b_2`` and of
        ``a2`` in ``a_b_4``. Lookups use ``.get`` so that a missing node does
        not add an empty entry to the maps.
        """
        empty = self._NO_NEIGHBORS
        aa = a_b_1.get(a1, empty).intersection(a_b_3.get(a2, empty))
        bb = a_b_2.get(a1, empty).intersection(a_b_4.get(a2, empty))
        # Remove the pairs where the same node was chosen for b and b'.
        return len(aa) * len(bb) - len(aa.intersection(bb))

    def calc_signed_bipartite_butterfly_dist(self):
        """Return the share of each signed butterfly pattern.

        The four characters of a key give the signs of ``(a1, b)``,
        ``(a1, b')``, ``(a2, b)`` and ``(a2, b')``. Counts are taken over
        ordered pairs ``(a1, a2)`` and ``(b, b')``.

        Returns:
            ``(res_sign, fractions)``, the pattern labels and the matching
            fractions. Every fraction is NaN when no butterfly exists.
        """
        a_set = {int(edge[0]) for edge in self.edges}
        by_sign = {'+': self.pos_a_b, '-': self.neg_a_b}
        patterns = [
            (key, tuple(by_sign[sign] for sign in key))
            for key in self._BUTTERFLY_KEYS
        ]
        mapper = dict.fromkeys(self._BUTTERFLY_KEYS, 0)
        for a1 in tqdm(a_set):
            for a2 in a_set:
                if a1 == a2:
                    continue
                for key, maps in patterns:
                    mapper[key] += self.count_values(*maps, a1, a2)
        sum_s = sum(mapper.values())
        res_sign = [
            '++++',
            '+--+',
            '++--',
            '+-+-',
            '----',
            '+++-',
            '+---'
        ]
        if sum_s == 0:
            return res_sign, [float('nan')] * len(res_sign)
        return res_sign, [mapper[i] / sum_s for i in res_sign]
class SignedBipartiteFeaExtraByMatrace:
    """Signed butterfly (4-cycle) statistics using sparse biadjacency matrices.

    The edge list file holds one edge per line as ``a<sep>b<sep>sign`` with
    integer fields. ``a`` ids index rows and ``b`` ids index columns, so both
    must be non-negative. A sign of ``1`` is positive; anything else is
    negative. ``header`` is accepted but not used.

    Attributes:
        pos_mat: ``scipy.sparse.coo_matrix`` with a 1 per positive edge.
        neg_mat: ``scipy.sparse.coo_matrix`` with a 1 per negative edge.
    """

    _BUTTERFLY_KEYS = ('++++', '----', '++--', '+-+-', '+--+', '+---', '+++-')

    def __init__(self, edgelist_fpath, seperator='\t', header=None) -> None:
        self.edgelist_fpath = edgelist_fpath
        self.seperator = seperator
        self.init_matrice(edgelist_fpath)

    def init_matrice(self, edgelist_fpath):
        """Read the edge list and build ``self.pos_mat`` and ``self.neg_mat``.

        Both matrices have shape ``(max a id + 1, max b id + 1)``, so ids need
        not be contiguous. Blank lines are skipped.
        """
        pos_edgelist = []
        neg_edgelist = []
        max_row = -1
        max_col = -1
        with open(edgelist_fpath) as f:
            for line in f:
                fields = line.strip()
                if not fields:
                    continue
                x, y, z = fields.split(self.seperator)
                x = int(x)
                y = int(y)
                z = int(z)
                max_row = max(max_row, x)
                max_col = max(max_col, y)
                if z == 1:
                    pos_edgelist.append((x, y))
                else:
                    neg_edgelist.append((x, y))
        shape = (max_row + 1, max_col + 1)
        self.pos_mat = self._build_biadjacency(pos_edgelist, shape)
        self.neg_mat = self._build_biadjacency(neg_edgelist, shape)

    @staticmethod
    def _build_biadjacency(edgelist, shape):
        """Return a COO matrix of the given shape with a 1 per edge."""
        # reshape keeps the array two-dimensional when the edge list is empty,
        # so a graph lacking one of the two signs still loads.
        edge_array = np.array(edgelist, dtype=np.int64).reshape(-1, 2)
        data = np.ones(len(edge_array), dtype=np.int64)
        return sp.coo_matrix((data, (edge_array[:, 0], edge_array[:, 1])), shape=shape)

    @staticmethod
    def _count_butterflies(m1, m2, m3, m4):
        """Count closed walks ``a1 - b - a2 - b' - a1`` with ``a1 != a2`` and ``b != b'``.

        ``m1``..``m4`` are the biadjacency matrices for the edges ``(a1, b)``,
        ``(a2, b)``, ``(a2, b')`` and ``(a1, b')`` respectively.
        """
        pairs = m1.dot(m2.T)
        pairs.setdiag(0)  # require a1 != a2
        walks = pairs.dot(m3).dot(m4.T).diagonal().sum()
        # The walk count also includes b == b', which is not a 4-cycle.
        # Those walks need all four factors set on the same column.
        same_b = m1.multiply(m4).dot(m2.multiply(m3).T)
        same_b.setdiag(0)
        return walks - same_b.sum()

    def calc_signed_bipartite_butterfly_dist(self):
        """Return the share of each signed butterfly pattern.

        The four characters of a key give the signs of ``(a1, b)``,
        ``(a2, b)``, ``(a2, b')`` and ``(a1, b')``, i.e. in order around the
        cycle. Counts are taken over ordered pairs ``(a1, a2)`` and
        ``(b, b')``.

        NOTE(review): ``SignedBipartiteFeaExtra`` orders the characters as
        ``(a1, b), (a1, b'), (a2, b), (a2, b')``, so the three two-negative
        labels refer to different patterns in the two classes. Confirm which
        convention is intended.

        Returns:
            ``(res_sign, fractions)``, the pattern labels and the matching
            fractions. Every fraction is NaN when no butterfly exists.
        """
        by_sign = {'+': self.pos_mat.tocsr(), '-': self.neg_mat.tocsr()}
        mapper = {}
        for key in self._BUTTERFLY_KEYS:
            m1, m2, m3, m4 = (by_sign[sign] for sign in key)
            mapper[key] = self._count_butterflies(m1, m2, m3, m4)
        sum_s = sum(mapper.values())
        res_sign = [
            '++++',
            '+--+',
            '++--',
            '+-+-',
            '----',
            '+++-',
            '+---'
        ]
        if sum_s == 0:
            return res_sign, [float('nan')] * len(res_sign)
        return res_sign, [mapper[i] / sum_s for i in res_sign]
class SignedPathFeaExtraByMatrace:
    """Unfinished scaffold for path features; nothing is read or computed yet.

    The constructor records the two file paths and the separator, then calls
    ``init_matrice``. ``header`` is accepted but not used.
    """

    def __init__(self, edgelist_fpath, nodefea_fpath, seperator='\t', header=None) -> None:
        self.seperator = seperator
        self.nodefea_fpath = nodefea_fpath
        self.edgelist_fpath = edgelist_fpath
        self.init_matrice(edgelist_fpath)

    def init_matrice(self, edgelist_fpath):
        """Create empty edge and node containers; the file is not opened."""
        pos_edgelist, neg_edgelist = [], []
        node_set1, node_set2 = {}, {}

    def compute_path(self):
        """Do nothing; returns ``None``."""
        return None
if __name__ == "__main__":
    # Smoke run: print the triad counts for the sample edge list kept under
    # ../tests/test_datas relative to this file.
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    sample_parts = (BASE_DIR, '..', 'tests', 'test_datas', 'simple_case.edgelist')
    test_fpath = os.path.abspath(os.path.join(*sample_parts))
    model = SignedTriadFeaExtra(edgelist_fpath=test_fpath)
    model.calc_balance_and_status_triads_num()