内容导读 解决了一个困扰已久的问题:如何从资金源头出发,可视化追踪资金流向并找到资金归集客户。技术贴时间,代码很业余,专业人士请忽略!
问题提出 这是一个多次被问到,也是审计工作中多次遇到的问题:
知道资金的源头,想知道资金最终流向了哪里?资金流动过程中是否有归集?
比如,在如下示例数据中,需要追踪客户 N0、 N1 转出资金的最终去向,有哪些客户是资金归集客户?
这个问题的难点在于,资金流动过程中,经过几手过账是不确定的,资金也可能存在拆分归集后的划转,多个客户之间的交易时间也是交叉不同步的 。
因此,这个问题就转为三个子问题:
资金流转有时间维度,流转过程中后手的日期要晚于前手 业务思路 资金交易涉及众多对手,随着时间的推移,就构成了资金交易网络。比如,上述示例数据用网络可视化展现后如下:
从图中可见,上述三个子问题就转化为如何可视化展现:
剔除无关交易。B->C 有两笔资金划转,但其中一笔交易日期为 20210122 的交易晚于上一手 N0->B 的日期20210201。C2->E 也类似。 技术方案 既然是交易网络,那么它就是复杂网络的子集,可以用复杂网络的相关算法进行分析。
关于复杂网络的相关知识,参见《数字化审计实务指南》(人民邮电出版社)P97页 ,或本公众号的文章:《利用社交网络分析(SNA)挖出“围标”线索 》、《前沿科技在数字化审计中的应用案例 》。
从复杂网络技术的角度,上述问题就转为:
找到以 N0、N1 为 source 到网络中其他点连通的路径,且经过的点最多 。这就是复杂网络中的最短路径算法(shortest_paths),网络中的一个点到另一个点的路径不止一条,每条路径的长度可能不同,把路径长度最短的那条叫做最短路径。找到网络中中介中心性( Betweeness Centrality )最高的点D、C 。中介中心性(Betweeness Centrality)可以简单理解为,其他任意两个点要建立连接关系,都需要这个点为中介。在信息传播过程中,网络中这样点就是大V,在资金网络中,这样的客户就是资金中介。Python环境下的 networkx 库是专门用于复杂网络分析的库,提供了大量的算法和函数。
在 networkx 库中求解最短路径(shortest_paths)的函数有 all_shortest_paths 、 all_simple_edge_paths 。求解中介中心性(Betweeness Centrality)的函数有 betweenness_centrality 、 load_centrality 、 edge_betweenness_centrality 。
详细介绍,参见 networkx 库官方文档的算法库说明:https:///documentation/stable/reference/algorithms/index.html
数据分析环境 本文分析所使用的环境具体如下:
软件或环境 说明 Win10 64位 系统环境 Python 3.8.5 数据分析语言平台 networkx 2.5 复杂网络分析库 pandas 1.1.3 数据读取处理库 pyvis 0.1.9 关系网络可视化库
networkx 自带的draw函数也可以可视化网络,但生成的是静态图,没有交互性。本例使用PyVis 库进行交互式展现。
PyVis 是一个可交互的图可视化库,可以直接读取和交互式展现networkx生成的网络。官方文档:https://pyvis./en/latest/index.html
在生成交互式网络时,如果熟悉HTML+CSS,可以直接修改PyVis 安装目录下的模板文件 python3\Lib\site-packages\pyvis\templates\template.html ,美化可视化效果。
实现代码 代码可以按住屏幕,左右滑动查看
1.将交易数据转为有向交易网络 # -*- coding:utf-8 -*- import networkx as nximport pandas as pdfrom pyvis.network import Network# 读取流水数据 df_trade = pd.read_excel('交易流水示例.xlsx' ,converters={'转账日期' :str})# 交易是有方向的 按照方向提取数据生成图 转入转出分别生成有向图digraph G1 = nx.from_pandas_edgelist(df_trade.loc[df_trade['转出转入标志' ]=='转出' ], source='客户名称' ,target='对方客户名称' , edge_attr=['转账日期' ,'转账金额' ], create_using=nx.MultiDiGraph()) G2 = nx.from_pandas_edgelist(df_trade.loc[df_trade['转出转入标志' ]=='转入' ], source='对方客户名称' ,target='客户名称' , edge_attr=['转账日期' ,'转账金额' ], create_using=nx.MultiDiGraph())# 合并两个子图 G = nx.compose(G1,G2)
2.计算中介中心性 # 计算点的 中介中心性(Betweeness Centrality) betweenness = nx.load_centrality(G)# 取中介中心性最大的5个点 print(sorted(betweenness.items(),key = lambda x:x[1 ],reverse=True )[0 :5 ])# 转为dict存储 后续用于可视化设置点大小 keynodesdict = dict(sorted(betweenness.items(),key = lambda x:x[1 ],reverse=True )[0 :5 ])
从输出结果看,D、N2、C等中介度值最大,是资金归集点,这和人工对图的观察是一致的。
[('D', 0.1388888888888889), ('N2', 0.1), ('C', 0.08888888888888889), ('B', 0.05555555555555556), ('N5', 0.044444444444444446)]
3.找出以 N0/N1 为源头的最短路径 # 定义用于存储最短路径中边的list # 后续用于标注路径中这些边的颜色 specEdges = []# 计算点和其他点的路径长度 pathlen = dict(nx.shortest_path_length(G))# 找出以'N0','N1'为源头的路径中最长的路径 for node in ['N0' ,'N1' ]: for k,v in pathlen[node].items(): # 找出路径中最长的路径 if (v==max(pathlen[node].values())): # 找到最远的目标点,然后将源头和目标点之间的最短路径都提取出来 # 没有考虑日期因素 for p in nx.all_simple_edge_paths(G,source=node,target=k): for i in range(0 ,len(p)-1 ): e1 = p[i] e2 = p[i+1 ] e1_date = (G.get_edge_data(e1[0 ],e1[1 ])[e1[2 ]]['转账日期' ]) e2_date = (G.get_edge_data(e2[0 ],e2[1 ])[e2[2 ]]['转账日期' ]) # 考虑日期因素 # 如果后一手的转账日期小于前一手 则路径就到当前为止 if e1_date>e2_date: p = p[0 :i+1 ] break # 显示有效路径 并存入最短路径list print(p) specEdges = specEdges + p# 不同的路径中 边有重复 去重 specEdges = list(set(specEdges))
从上面的交易网络观察可见,以'N0','N1'为源头的世界尽头是'E'、'F' 。shortest_path_length 计算出的结果如下图,与人工观察是一致的。
4.用PyVis可视化 # 初始化pyvis网络图 net = Network(height='800px' ,width='800px' ,directed=True ,heading='资金网络' )# 由于两点之间有多条边 必须设置参数 opts = ''' var options = { 'physics': { 'minVelocity': 0.5, 'solver': 'forceAtlas2Based' } } ''' net.set_options(opts)# 边的标签设置为转账日期 for edge in G.edges(data=True ): edge[2 ]['label' ] = edge[2 ]['转账日期' ]# 边的光标悬停提示设置为转账金额 for edge in G.edges(data=True ): edge[2 ]['title' ] = str(edge[2 ]['转账金额' ])+'元' # 根据点中介度大小设置点的尺寸 for k,v in keynodesdict.items(): G.nodes[k]['size' ]=10 *(v/min(keynodesdict.values()))# 根据点中介度大小设置最大三个点为红色 for k,v in list(keynodesdict.items())[0 :3 ]: G.nodes[k]['color' ]='red' # 将找出来的边标注为红色 for edge in specEdges: G.edges[edge]['color' ]='red' net.from_nx(G)#输出并生成demo2.html net.show('资金可视化追踪.html' )
生成的是一个HTML文件,浏览器自动打开后,可见资金可视化追踪图。如下图: