分享

测试网络代码分享

 华科小丁 2023-03-27 发布于江苏

程序的主要逻辑如下:

读取一个excel文件中的ip列表,然后使用多线程调用ping统计每个ip的网络参数,最后把结果输出到excel文件中。

代码如下所示:

```c

#! /usr/bin/env python

# -*- coding: UTF-8 -*-

# File: pingtest_test.py

# Date: 2008-09-28

# Author: Michael Field

# Modified By:intheworld

# Date: 2017-4-17

import sys

import os

import getopt

import commands

import subprocess

import re

import time

import threading

import xlrd

import xlwt

TEST = [

  '220.181.57.217',

  '166.111.8.28',

  '202.114.0.242',

  '202.117.0.20',

  '202.112.26.34',

  '202.203.128.33',

  '202.115.64.33',

  '202.201.48.2',

  '202.114.0.242',

  '202.116.160.33',

  '202.202.128.33',

]

RESULT={}

def usage():

 print "USEAGE:"

 print "\t%s -n TEST|excel name [-t times of ping] [-c concurrent number(thread nums)]" %sys.argv[0]

 print "\t TEST为简单测试的IP列表"

 print "\t-t times 测试次数;默认为1000;"

 print "\t-c concurrent number 并行线程数目:默认为10"

 print "\t-h|-?, 帮助信息"

 print "\t 输出为当前目录文件ping_result.txt 和 ping_result.xls"

 print "for example:"

 print "\t./ping_test.py -n TEST -t 1 -c 10"

def div_list(ls,n):

 if not isinstance(ls,list) or not isinstance(n,int):

  return []

 ls_len = len(ls)

 print 'ls length = %s' %ls_len

 if n<=0 or 0==ls_len:

  return []

 if n > ls_len:

  return []

 elif n == ls_len:

  return [[i] for i in ls]

 else:

  j = ls_len/n

  k = ls_len%n

  ### j,j,j,...(前面有n-1个j),j+k

  #步长j,次数n-1

  ls_return = []

  for i in xrange(0,(n-1)*j,j):

   ls_return.append(ls[i:i+j])

  #算上末尾的j+k

  ls_return.append(ls[(n-1)*j:])

  return ls_return

def pin(IP):

 try:

  xpin=subprocess.check_output("ping -n 1 -w 100 %s" %IP, shell=True)

 except Exception:

  xpin = 'empty'

 ms = '=[0-9]+ms'.decode("utf8")

 print "%s" %ms

 print "%s" %xpin

 mstime=re.search(ms,xpin)

 if not mstime:

  MS='timeout'

  return MS

 else:

  MS=mstime.group().split('=')[1]

  return MS.strip('ms')

def count(total_count,I):

 global RESULT

 nowsecond = int(time.time())

 nums = 0

 oknums = 0

 timeout = 0

 lostpacket = 0.0

 total_ms = 0.0

 avgms = 0.0

 maxms = -1

 while nums < total_count:

  nums += 1

  MS = pin(I)

  print 'pin output = %s' %MS

  if MS == 'timeout':

   timeout += 1

   lostpacket = timeout*100.0 / nums

  else:

   oknums += 1

   total_ms = total_ms + float(MS)

   if oknums == 0:

    oknums = 1

    maxms = float(MS)

    avgms = total_ms / oknums

   else:

    avgms = total_ms / oknums

    maxms = max(maxms, float(MS))

  RESULT[I] = (I, avgms, maxms, lostpacket)

def thread_func(t, ipList):

 if not isinstance(ipList,list):

  return

 else:

  for ip in ipList:

   count(t, ip)

def readIpsInFile(excelName):

 data = xlrd.open_workbook(excelName)

 table = data.sheets()[0]

 nrows = table.nrows

 print 'nrows %s' %nrows

 ips = []

 for i in range(nrows):

  ips.append(table.cell_value(i, 0))

  print table.cell_value(i, 0)

 return ips

if __name__ == '__main__':

 file = 'ping_result.txt'

 times = 10

 network = ''

 thread_num = 10

 args = sys.argv[1:]

 try:

  (opts, getopts) = getopt.getopt(args, 'n:t:c:h?')

 except:

  print "\nInvalid command line option detected."

  usage()

  sys.exit(1)

 for opt, arg in opts:

  if opt in ('-n'):

   network = arg

  if opt in ('-h', '-?'):

   usage()

   sys.exit(0)

  if opt in ('-t'):

   times = int(arg)

  if opt in ('-c'):

   thread_num = int(arg)

 f = open(file, 'w')

 workbook = xlwt.Workbook()

 sheet1 = workbook.add_sheet("sheet1", cell_overwrite_ok=True)

 if not isinstance(times,int):

  usage()

  sys.exit(0)

 if network not in ['TEST'] and not os.path.exists(os.path.join(os.path.dirname(__file__), network)):

  print "The network is wrong or excel file does not exist. please check it."

  usage()

  sys.exit(0)

 else:

  if network == 'TEST':

   ips = TEST

  else:

   ips = readIpsInFile(network)

  print 'Starting...'

  threads = []

  nest_list = div_list(ips, thread_num)

  loops = range(len(nest_list))

  print 'Total %s Threads is working...' %len(nest_list)

  for ipList in nest_list:

   t = threading.Thread(target=thread_func,args=(times,ipList))

   threads.append(t)

  for i in loops:

   threads[i].start()

  for i in loops:

   threads[i].join()

  it = 0

  for line in RESULT:

   value = RESULT[line]

   sheet1.write(it, 0, line)

   sheet1.write(it, 1, str('%.2f'%value[1]))

   sheet1.write(it, 2, str('%.2f'%value[2]))

   sheet1.write(it, 3, str('%.2f'%value[3]))

   it+=1

   f.write(line + '\t'+ str('%.2f'%value[1]) + '\t'+ str('%.2f'%value[2]) + '\t'+ str('%.2f'%value[3]) + '\n')

  f.close()

  workbook.save('ping_result.xls')

  print 'Work Done. please check result %s and ping_result.xls.'%file

```

这段代码参照了别人的实现,虽然不是特别复杂,这里还是简单解释一下。

    excel读写使用了xlrd和xlwt,基本都是使用了一些简单的api。

    使用了threading实现多线程并发,和POSIX标准接口非常相似。thread_func是线程的处理函数,它的输入包含了一个ip的List,所以在函数内部通过循环处理各个ip。

    此外,Python的commands在Windows下并不兼容,所以使用了subprocess模块。

到目前为止,我对Python里面字符集的理解还不到位,所以正则表达式匹配的代码并不够强壮,不过目前勉强可以工作。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多