Source code for sign_lens.sign_lens

import os

import pandas as pd
import networkx as nx
import numpy as np

from texttable import Texttable
from collections import Counter
import matplotlib.pyplot as plt

from .utils import SignedTriadFeaExtra, SignedBipartiteFeaExtra


class SignLensBase:
    """
    Abstract interface for the signed-network analysers in this module.

    Every method here raises ``NotImplementedError``, including
    ``__init__``; concrete subclasses are expected to define their own
    constructor and override each method.
    """

    def __init__(self) -> None:
        raise NotImplementedError("Subclasses should implement this")

    def calc_node_num(self) -> int:
        """
        calculate the number of nodes

        Returns
        -------
        int
            the node number
        """
        raise NotImplementedError("Subclasses should implement this")

    def calc_edge_num(self) -> int:
        """
        calculate the number of edges

        Returns
        -------
        int
            the edge number
        """
        raise NotImplementedError("Subclasses should implement this")

    def calc_sign_dist(self) -> tuple:
        """
        calculate sign distribution

        Returns
        -------
        tuple
            (positive edge number, negative edge number, pos_neg_ratio)
        """
        raise NotImplementedError("Subclasses should implement this")

    def report_signed_metrics(self, output_dir='output') -> str:
        """
        report the signed network analysis metrics

        The signature mirrors the one declared by the concrete subclasses
        in this file (``output_dir='output'``, annotated ``-> str``), so
        code written against the base class can pass ``output_dir``.

        Parameters
        ----------
        output_dir : str, optional
            Directory an implementation may write figures to, by default
            'output'

        Returns
        -------
        str
            The metrics table, as declared by the subclasses.
        """
        raise NotImplementedError("Subclasses should implement this")


class SignLens(SignLensBase):
    """
    SignLens is a class for analyzing signed networks.

    The edge list is loaded into ``edge_df`` (columns ``source_node``,
    ``target_node``, ``sign``), node identifiers are relabelled to
    consecutive integers (``node_dict``), and three directed graphs are
    built: ``G`` (all edges), ``pos_G`` (``sign > 0``) and ``neg_G``
    (``sign < 0``).
    """

    def __init__(self, edgelist_fpath, seperator='\t', header=None):
        r"""
        __init__ sign_lens class for signed graph modeling

        It is used for analyzing signed directed networks

        Parameters
        ----------
        edgelist_fpath : str
            It is the file path for analyzing. The file must have exactly
            three columns: source node, target node, sign.
        seperator : str, optional
            The file seperator, by default '\t'
        header : int or None, optional
            Forwarded to ``pandas.read_csv`` as ``header``, by default None
        """
        self.edgelist_fpath = edgelist_fpath
        self.edge_df = pd.read_csv(self.edgelist_fpath,
                                   sep=seperator,
                                   header=header)
        self.edge_df.columns = ['source_node', 'target_node', 'sign']

        # Relabel the raw node identifiers to 0..n-1.
        node_list = set(self.edge_df.target_node.tolist() +
                        self.edge_df.source_node.tolist())
        self.node_dict = {i: ind for ind, i in enumerate(node_list)}
        self.edge_df['source_node'] = self.edge_df['source_node'].map(
            self.node_dict)
        self.edge_df['target_node'] = self.edge_df['target_node'].map(
            self.node_dict)

        endpoints = ['source_node', 'target_node']
        self.G = nx.DiGraph(self.edge_df[endpoints].values.tolist())
        pos_edge = self.edge_df[self.edge_df['sign'] > 0]
        self.pos_G = nx.DiGraph(pos_edge[endpoints].values.tolist())
        neg_edge = self.edge_df[self.edge_df['sign'] < 0]
        self.neg_G = nx.DiGraph(neg_edge[endpoints].values.tolist())

    @staticmethod
    def _scatter_counts(ax, values, **style):
        """Scatter-plot how often each value occurs (value on x, count on y)."""
        cc = Counter(values)
        ax.scatter(list(cc.keys()), list(cc.values()), s=60, **style)

    @staticmethod
    def _save_square(ax, fpath):
        """Give the axes a square data aspect, save the figure, then close it."""
        ax.set_aspect(1. / ax.get_data_ratio())
        ax.figure.savefig(fpath)
        # Close the figure so repeated reports do not accumulate open figures.
        plt.close(ax.figure)

    def report_signed_metrics(self, output_dir='output') -> str:
        r"""
        Report signed metrics for a signed network.

        The main signed network metrics include *sign distribution*,
        *balanced triangle distrubition*, *signed in-degree distribution*,
        *signed out-degree distribution*, *in-degree distribution*,
        *out-degree distribution*, *hop plot* and *singular value
        distribution* according to this paper: `"BalanSiNG: Fast and
        Scalable Generation of Realistic Signed Networks"
        <https://openproceedings.org/2020/conf/edbt/paper_102.pdf>`_.

        The table is printed to stdout and also returned.

        Parameters
        ----------
        output_dir : str, optional
            Directory the PDF figures are written to (created if missing),
            by default 'output'

        Returns
        -------
        str
            The table for signed metrics.
        """
        args = {}
        pos_num, neg_num, pos_r = self.calc_sign_dist()
        args['The number of nodes'] = self.calc_node_num()
        args['The number of edges (+, -, total)'] = (pos_num, neg_num,
                                                     pos_num + neg_num)
        args['sign distribution (+)'] = pos_r
        triads_dist, b_ratio, u_ratio = self.calc_signed_triads_dist()
        args['balanced triangle distribution'] = b_ratio
        args['unbalanced triangle distribution'] = u_ratio
        args['signed triangle (+++, ++-, +--, ---)'] = tuple(
            round(i, 4) for i in triads_dist)

        os.makedirs(output_dir, exist_ok=True)

        # export plot for degree distributions
        degree_data = [
            ('In-degree', self.calc_signed_in_degree()),
            # The out-degree plots must use the out-degree data.
            ('Out-degree', self.calc_signed_out_degree()),
        ]
        for fname, (all_deg, pos_deg, neg_deg) in degree_data:
            fpath = os.path.join(output_dir, fname + '.pdf')
            fig, ax = plt.subplots()
            self._scatter_counts(ax, all_deg.values(),
                                 alpha=0.9, edgecolors="k")
            ax.set_xscale("log")
            ax.set_yscale("log")
            ax.set_xlabel(fname)
            ax.set_ylabel('Count')
            self._save_square(ax, fpath)
            args[f'{fname} output'] = fpath

            fpath = os.path.join(output_dir, fname + '-sign.pdf')
            fig, ax = plt.subplots()
            self._scatter_counts(ax, pos_deg.values(),
                                 alpha=0.7, color='g', label='Positive')
            self._scatter_counts(ax, neg_deg.values(),
                                 alpha=0.7, color="r", label='Negative')
            ax.set_xscale("log")
            ax.set_yscale("log")
            ax.set_xlabel(fname)
            ax.set_ylabel('Count')
            ax.legend()
            self._save_square(ax, fpath)
            args[f'{fname} sign output'] = fpath

        # plot hopcnt
        fname = 'Hop'
        fpath = os.path.join(output_dir, fname + '.pdf')
        fig, ax = plt.subplots()
        self._scatter_counts(ax, self.calc_hop_dist().values(),
                             alpha=0.7, color='b')
        ax.set_yscale("log")
        ax.set_xlabel(fname)
        ax.set_ylabel('Count')
        self._save_square(ax, fpath)
        args[f'{fname} output'] = fpath

        # plot Singular Value (only the first 100 values are drawn)
        sv = self.calc_singular_value_dist()
        ind = range(1, len(sv) + 1)
        fig, ax = plt.subplots()
        ax.scatter(ind[:100], sv[:100])
        ax.margins(x=0)
        ax.set_ylim([1, 100])
        ax.set_xscale("log")
        ax.set_yscale("log")
        ax.set_xlabel('Top-k')
        ax.set_ylabel('Singular Values')
        fpath = os.path.join(output_dir, 'Top-K.pdf')
        self._save_square(ax, fpath)
        args['Singular value distribution'] = fpath

        t = Texttable()
        t.add_rows([["Metrics", "Value"]] +
                   [[k.replace("_", " ").capitalize(), v]
                    for k, v in args.items()])
        table = t.draw()
        print(table)
        return table

    def calc_node_num(self) -> int:
        r"""
        calculate the number of nodes

        Returns
        -------
        int
            the number of distinct nodes appearing as a source or a target
        """
        node_list = self.edge_df.target_node.tolist(
        ) + self.edge_df.source_node.tolist()
        return len(set(node_list))

    def calc_edge_num(self) -> int:
        r"""
        calculate the number of edges

        Returns
        -------
        int
            the edge number (rows of the edge list)
        """
        return len(self.edge_df)

    def calc_sign_dist(self) -> tuple:
        r"""
        calculate sign distribution

        Edges whose sign is exactly 0 are counted in neither group.

        Returns
        -------
        tuple
            (positive edge number, negative edge number, pos_neg_ratio)
            where the ratio is ``pos / (pos + neg)``, or ``0.0`` when there
            are no signed edges at all.
        """
        pos_num = int((self.edge_df['sign'] > 0).sum())
        neg_num = int((self.edge_df['sign'] < 0).sum())
        total = pos_num + neg_num
        # Avoid ZeroDivisionError on an empty (or all-zero-sign) edge list.
        ratio = pos_num / total if total else 0.0
        return (pos_num, neg_num, ratio)

    def calc_signed_in_degree(self) -> tuple:
        r"""
        calculate signed in degree

        Returns
        -------
        tuple
            (G_in_degree, pos_G_in_degree, neg_G_in_degree), each a dict
            mapping node -> in-degree in ``G``, ``pos_G`` and ``neg_G``
        """
        G_in_degree = dict(self.G.in_degree())
        pos_G_in_degree = dict(self.pos_G.in_degree())
        neg_G_in_degree = dict(self.neg_G.in_degree())
        return (G_in_degree, pos_G_in_degree, neg_G_in_degree)

    def calc_signed_out_degree(self) -> tuple:
        r"""
        calculate signed out degree

        Returns
        -------
        tuple
            (G_out_degree, pos_G_out_degree, neg_G_out_degree), each a dict
            mapping node -> out-degree in ``G``, ``pos_G`` and ``neg_G``
        """
        G_out_degree = dict(self.G.out_degree())
        pos_G_out_degree = dict(self.pos_G.out_degree())
        neg_G_out_degree = dict(self.neg_G.out_degree())
        return (G_out_degree, pos_G_out_degree, neg_G_out_degree)

    def calc_hop_dist(self) -> dict:
        r"""
        calculate the distrubiton of hops

        Returns
        -------
        dict
            ``{(source, target): hops}`` for every ordered pair for which
            ``nx.shortest_path_length`` reports a path in ``G`` (this
            includes ``(v, v)`` pairs with 0 hops). Unreachable pairs are
            absent.
        """
        hop_dist = {}
        for source, lengths in nx.shortest_path_length(self.G):
            for target, hops in lengths.items():
                hop_dist[(source, target)] = hops
        return hop_dist

    def calc_singular_value_dist(self) -> np.ndarray:
        r"""
        calculated singular value distribution

        Returns
        -------
        numpy.ndarray
            singular values of the adjacency matrix of the undirected,
            unsigned version of ``G``
        """
        uG = self.G.to_undirected()
        # to_numpy_matrix was removed in NetworkX 3.0; to_numpy_array
        # yields the same adjacency values as a plain ndarray.
        A = nx.to_numpy_array(uG)
        # Only the singular values are needed, so skip computing U and Vh.
        return np.linalg.svd(A, compute_uv=False)

    def calc_balanced_triangle_dist(self) -> tuple:
        r"""
        calculate balanced triangle distributions

        Returns
        -------
        tuple
            (balanced triads, unbalanced triads), computed as
            ``(s1 + s2) / s0`` and its complement from the first three
            values returned by
            ``SignedTriadFeaExtra.calc_balance_and_status_triads_num``.
        """
        # NOTE(review): the extractor re-reads the file by path only; the
        # seperator/header given to __init__ are not forwarded - confirm
        # that this matches SignedTriadFeaExtra's defaults.
        model = SignedTriadFeaExtra(self.edgelist_fpath, undirected=False)
        s0, s1, s2, _ = model.calc_balance_and_status_triads_num()
        ratio = (s1 + s2) / s0
        return ratio, 1 - ratio

    def calc_signed_triads_dist(self) -> tuple:
        r"""
        calculate signed triads distributions

        Returns
        -------
        tuple
            ((+++, ++-, +--, ---), balanced triads, unbalanced triads)
        """
        # NOTE(review): the extractor re-reads the file by path only; the
        # seperator/header given to __init__ are not forwarded - confirm
        # that this matches SignedTriadFeaExtra's defaults.
        model = SignedTriadFeaExtra(self.edgelist_fpath, undirected=False)
        res = model.calc_balance_triads_dist()
        # res is ordered +++ ++- +-- ---; +++ and +-- are the balanced ones
        b_triad = res[0] + res[2]
        u_triad = res[1] + res[3]
        return res, b_triad, u_triad
class SignBipartiteLens(SignLensBase):
    """
    SignBipartiteLens analyzes signed bipartite networks.

    The edge list is loaded into ``edge_df`` with the columns ``node_u``,
    ``node_v`` and ``sign``.
    """

    def __init__(self, edgelist_fpath, seperator='\t', header=None):
        r"""
        Load a signed bipartite edge list.

        Parameters
        ----------
        edgelist_fpath : str
            Path of the edge list file; it must have exactly three columns.
        seperator : str, optional
            The file seperator, by default '\t'
        header : int or None, optional
            Forwarded to ``pandas.read_csv`` as ``header``, by default None
        """
        self.edgelist_fpath = edgelist_fpath
        self.seperator = seperator
        # Kept so the butterfly extractor parses the file the same way.
        self.header = header
        self.edge_df = pd.read_csv(self.edgelist_fpath,
                                   sep=seperator,
                                   header=header)
        self.edge_df.columns = ['node_u', 'node_v', 'sign']

    def calc_signed_bipartite_butterfly_dist(self):
        r"""
        calculate the signed butterfly distribution

        Returns
        -------
        tuple
            ``(signs, res)`` exactly as returned by
            ``SignedBipartiteFeaExtra.calc_signed_butterfly_dist``.
        """
        model = SignedBipartiteFeaExtra(self.edgelist_fpath,
                                        seperator=self.seperator,
                                        header=self.header)
        signs, res = model.calc_signed_butterfly_dist()
        return signs, res

    def calc_node_num(self) -> tuple:
        r"""
        calculate the number of nodes

        Returns
        -------
        tuple
            (number of distinct ``node_u`` values,
            number of distinct ``node_v`` values)
        """
        u_nodes = set(self.edge_df.node_u.tolist())
        v_nodes = set(self.edge_df.node_v.tolist())
        return len(u_nodes), len(v_nodes)

    def calc_edge_num(self) -> int:
        r"""
        calculate the number of edges

        Returns
        -------
        int
            the edge number (rows of the edge list)
        """
        return len(self.edge_df)

    def calc_sign_dist(self) -> tuple:
        r"""
        calculate sign distribution

        Edges whose sign is exactly 0 are counted in neither group.

        Returns
        -------
        tuple
            (positive edge number, negative edge number, pos_neg_ratio)
            where the ratio is ``pos / (pos + neg)``, or ``0.0`` when there
            are no signed edges at all.
        """
        pos_num = int((self.edge_df['sign'] > 0).sum())
        neg_num = int((self.edge_df['sign'] < 0).sum())
        total = pos_num + neg_num
        # Avoid ZeroDivisionError on an empty (or all-zero-sign) edge list.
        ratio = pos_num / total if total else 0.0
        return (pos_num, neg_num, ratio)

    def report_signed_metrics(self, output_dir='output') -> str:
        r"""
        Report signed metrics for a signed bipartite network.

        The table is printed to stdout and also returned.

        Parameters
        ----------
        output_dir : str, optional
            Accepted for interface compatibility; this report writes no
            files, by default 'output'

        Returns
        -------
        str
            The table for signed metrics.
        """
        args = {}
        args['The number of nodes'] = self.calc_node_num()
        pos_num, neg_num, pos_r = self.calc_sign_dist()
        args['The number of edges (+, -, total)'] = (pos_num, neg_num,
                                                     pos_num + neg_num)
        args['sign distribution (+)'] = pos_r
        # This used to call a method name that does not exist on the class.
        signs, res = self.calc_signed_bipartite_butterfly_dist()
        # NOTE(review): assumes the extractor lists the unbalanced butterfly
        # types last (final two entries) - confirm against
        # SignedBipartiteFeaExtra.
        args['balanced butterfly distribution'] = sum(res[:-2])
        args['unbalanced butterfly distribution'] = sum(res[-2:])
        sign_str = ",".join(signs)
        args[f'signed butterfly ({sign_str})'] = [round(i, 3) for i in res]

        t = Texttable()
        t.add_rows([["Metrics", "Value"]] +
                   [[k.replace("_", " ").capitalize(), v]
                    for k, v in args.items()])
        table = t.draw()
        print(table)
        return table