最近新换了工作,新公司是一家做通信领域的公司(baicells)。虽然仍旧是从事测试开发相关的工作, 但通信领域毕竟是新手,所以最近一直在恶补相关基础知识,耽误了文章的更新。
本文主要分享几个系统下统计网络速率的几个小方法,供参考。
1.网络设备
该方法比较简单直接,不借助其他工具,直接读取一定时间间隔的网络收发数据量,然后进行速率计算。 优点是简单粗暴,通用性比较强。 缺点只能按照网络设备进行统计,不能按照接口进行划分。
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | #!/usr/bin/env pythrn
# *-* coding=utf-8 *-*
import time
import sys
import os
from optparse import OptionParser
def recive_args():
parser = OptionParser()
parser.add_option("-m", "--mode", dest="mode",
help="mode")
parser.add_option("-t", "--time",dest="time",
help="time")
(options, args) = parser.parse_args()
test_args = {}
test_args["mode"] = options.mode
test_args["time"] = options.time
return test_args
class NetRate:
def __init__(self, time, mode):
self.time=int(time)
self.mode=mode
def rx(self):
totalrx = []
try:
ifstat = open('/proc/net/dev').readlines()
for i,interface in enumerate(ifstat):
stat = interface.split()
if i >= 3:
totalrx.append(stat)
rxdata = 0
for rxtmp in totalrx:
rxdata += float(rxtmp[1])
return rxdata
except:
return 0
def tx(self):
totaltx = []
try:
ifstat = open('/proc/net/dev').readlines()
for i,interface in enumerate(ifstat):
print ifstat
stat = interface.split()
if i >= 3:
totaltx.append(stat)
txdata = 0
for txtmp in totaltx:
txdata += float(txtmp[9])
return txdata
except:
return 0
def run(self):
if self.mode == "RX":
beforerx = self.rx()
time.sleep(self.time)
afterrx = self.rx()
return (afterrx - beforerx) * 8 / self.time
else:
beforetx = self.tx()
time.sleep(self.time)
aftertx = self.tx()
return (aftertx - beforetx) * 8 / self.time
args = recive_args()
test = NetRate(args["time"], args["mode"])
rate = test.run()
|
代码比较简单,不再分析哈,主要时间不多。
2. TCPdump
下面这个方法,是调用tcpdump工具,进行速率统计,tcpdump是比较常用的网络分析工具。 优点:可以按照端口进行统计 缺点:速率太高时,由于tcpdump打印过快,碍于python执行效率问题(更大可能是博主水平问题哈),速率统计可能不太准确
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | #!/usr/bin/env python
#*-*coding=utf-8*-*
import re,sys
import time
import os
import subprocess
import datetime
import signal
import multiprocessing
import threading
import Queue
from optparse import OptionParser
def recive_args():
parser = OptionParser()
parser.add_option("-i", "--ip", dest="targetip",
help="ip")
parser.add_option("-t", "--time",dest="time",
help="time")
(options, args) = parser.parse_args()
test_args={}
test_args["ip"]= options.targetip
test_args["time"]=options.time
return test_args
class Getrate:
def get_rate_ip(self,IP):
GET_LENGTH = re.compile(r'length (\d+):')
total = 0
test=[]
tcpdump = subprocess.Popen('tcpdump -e -i p1p1 -nn host %s' % IP, shell=True, stdout=subprocess.PIPE)
for line in iter(tcpdump.stdout.readline, ''):
rt = GET_LENGTH.findall(line)
if len(rt):
total += int(rt[0])
while not self.result.empty():
test.append(self.result.get())
if "TIMEOUT" in test:
break
self.result.put((total*8))
return True
def getrate(self,IP, times):
initlog = subprocess.Popen("echo '' > /opt/tcp.log", shell=True)
self.result =Queue.Queue()
LOGFILE = open('/opt/tcp.log', 'a')
t1 = threading.Thread(target=self.get_rate_ip, name="thread1", args=(IP,))
t1.start()
time.sleep(int(times))
self.result.put("TIMEOUT")
t1.join()
result = self.result.get()
killdump = subprocess.Popen("killall -9 tcpdump", shell=True)
LOGFILE.write("%s:%s" %(IP, result/int(times)))
return result/int(times)
if __name__ == '__main__':
sysargs=recive_args()
test = Getrate()
result = test.getrate("192.168.100.103", sysargs["time"])
|
其他方法
比如借助sar,系统自带的log等
部分代码如下:
sar:
def _ftp_getrate_sar(self,expectspeed,mode,testtime,ratetime,clientip,
netdev,hostname,username, password,
port):
port = int(port)
testtime=int(testtime)
ratetime = int(ratetime)
client=paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname,port,username, password)
times = int(ratetime/3)
if mode == "RX":
cmd = "sar -n DEV 3 %s | grep Average | grep %s | awk -F ' ' '{print $5}' | tr -d '\n'" % (times,netdev)
elif mode == "TX":
cmd = "sar -n DEV 3 %s | grep Average | grep %s | awk -F ' ' '{print $6}' | tr -d '\n'" % (times,netdev)
else:
cmd = "sar -n DEV 3 %s | grep Average | grep %s | awk -F ' ' '{print $6 + $5}' | tr -d '\n'" % (times,netdev)
starttime = time.time()
time.sleep(ratetime)
while time.time() - starttime < (testtime-ratetime):
stdin,stdout,stderr=client.exec_command(cmd)
testrate = float(stdout.read())
rate = testrate * 8 * 1024
print "test rate is %s" %rate
if rate < expectspeed:
self.Status.put("LOW")
self.Status_rate.put("LOW")
print "test rate is below than expectspeed"
return False
return True
后续会继续总结分享一些工作中用到的测试方法,敬请期待。