#2012-03-02 磁针石 #承接软件自动化实施与培训 验证码破解 软件破解 脚本开发 测试和python培训等 #gtalk: ouyangchongwu#gmail.com qq 37391319 博客:testing.blog. #版权所有,转载刊登请来函联系 #自动化测试和python群组: http://groups.google.com/group/automation_testing_python #python qq group: 深圳自动化测试python群:113938272 #武冈深圳qq群:66250781 都梁深圳湖南户外群:49494279 #参考资料下面我们对client.py进行修改,去掉其中的函数部分,并为Session增加一些方法。以实现更加方便的访问: 初始化时,增加3个字典: self.tid2OidDict = pickle.load(open(r"/usr/local/mib/tid2OidDict.txt")) self.tidAliasDict = pickle.load(open(r"/usr/local/mib/tidAliasDict.txt")) self.tidTypeDict = pickle.load(open(r"/usr/local/mib/tidTypeDict.txt")) 增加列表转换方法: def convert_args_to_list(self, args): """ args is a list of tuple, for example: [ ('ntpClientEnabled', '0', 'false'), ('cmEthernetNetPortOamEnabled', '1.1.1.1', 'false'), ('ecpaControlDuration', '1.1.1.1', '25') ] """ var_list = VarList() index = '' snmpValue = snmpType = None for arg in args: tid = arg[0] snmpOid = self.get_oid_from_tid(tid) length = len(arg)
if length >1: index = arg[1]
#arg with tid index value or more if len(arg) >2: value = arg[2] if tid in self.tidAliasDict: snmpValue = self.convert_alias_to_value(tid,value) else: snmpValue = value snmpType = self.get_type_from_tid(tid)
if len(arg) >3: snmpType = arg[3]
var = Varbind(snmpOid,index,snmpValue,snmpType) var_list.append(var) return var_list 增加把别名转换为实际值的方法: def convert_alias_to_value(self, tid,value): if tid in self.tidAliasDict: snmpValue = self.tidAliasDict[tid][value] else: snmpValue = value return snmpValue 增加根据tid取得类型的方法: def get_type_from_tid(self, tid): return self.tidTypeDict[tid] 增加根据tid获取oid的方法: def get_oid_from_tid(self, tid): return self.tid2OidDict[tid] 其他函数也有少许修改。不一一列出:
这样,上次的创建和删除保护组就可以简化成:
import netsnmp
import ecpa
# Open the SNMP session (Version=2) to the device the example talks to.
session = netsnmp.Session(Version=2,
                          DestHost='172.23.192.44',
                          Community='private')

# Values for the two port columns of the protection group: the OID that
# 'cmEthernetNetPortIndex' resolves to, with the port index appended.
work_port_oid = session.get_oid_from_tid('cmEthernetNetPortIndex') + '.1.1.1.1'
prot_port_oid = session.get_oid_from_tid('cmEthernetNetPortIndex') + '.1.1.1.2'

# Create the protection group.  Each entry is (tid, index, value); all four
# entries are sent in a single set() call.
create_request = [
    ('cmFacProtGroupSwitchMode', '1.1.1.1', 'oneplusone'),
    ('cmFacProtGroupWorkPort', '1.1.1.1', work_port_oid),
    ('cmFacProtGroupProtPort', '1.1.1.1', prot_port_oid),
    ('cmFacProtGroupRowStatus', '1.1.1.1', 'createAndGo'),
]
resultList = session.set(create_request)
print(resultList)

# Delete the same protection group again by setting its row status.
destroy_request = [('cmFacProtGroupRowStatus', '1.1.1.1', 'destroy')]
resultList = session.set(destroy_request)
print(resultList)
我们可以对具体的业务进行测试,定义如下的ecpa.py:
""" This file describes ECPA operations in SNMP """
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Function: Ssh to remote server
# Author: Andrew Xu
# CreateDate: 2012/03/01
# $Id: $
# __version__ = $Revision: $
import netsnmp
class ecpaSnmp(object): def __init__(self, snmpObj): self.client = snmpObj
def set_ecpa_stream(self, index, **args): keyDict = { 'index': 'ecpaConfigStreamIndex', 'name': 'ecpaConfigStreamName', 'size': 'ecpaConfigStreamFrameSize', 'rate': 'ecpaConfigStreamRate', 'payloadType': 'ecpaConfigStreamPayloadType', 'destinationMAC': 'ecpaConfigStreamDestinationMAC', 'sourceMAC': 'ecpaConfigStreamSourceMAC', 'outerVlanEnabled': 'ecpaConfigStreamOuterVlanEnabled', 'outerVlanId': 'ecpaConfigStreamOuterVlanId', 'outerVlanPrio': 'ecpaConfigStreamOuterVlanPrio', 'outerVlanEtherType': 'ecpaConfigStreamOuterVlanEtherType', 'innerVlanEnabled': 'ecpaConfigStreamInnerVlanEnabled', 'innerVlanId': 'ecpaConfigStreamInnerVlanId', 'innerVlanPrio': 'ecpaConfigStreamInnerVlanPrio', 'innerVlanEtherType': 'ecpaConfigStreamInnerVlanEtherType', 'ipVersion': 'ecpaConfigStreamIpVersion', 'ipV4Address': 'ecpaConfigStreamIpV4Address', 'ipV6Address': 'ecpaConfigStreamIpV6Address', 'prioMapMode': 'ecpaConfigStreamPrioMapMode', 'prioVal': 'ecpaConfigStreamPrioVal', 'innerVlan2Enabled': 'ecpaConfigStreamInnerVlan2Enabled', 'innerVlan2Id': 'ecpaConfigStreamInnerVlan2Id', 'innerVlan2Prio': 'ecpaConfigStreamInnerVlan2Prio', 'innerVlan2EtherType': 'ecpaConfigStreamInnerVlan2EtherType', 'destIpV4Address': 'ecpaConfigStreamDestIpV4Address', 'destIpV6Address': 'ecpaConfigStreamDestIpV6Address', 'usePortSourceMAC': 'ecpaConfigStreamUsePortSourceMAC' }
setList = [] for arg in args: snmpName = keyDict[arg] snmpValue = args[arg]
print snmpName, index, snmpValue setList.append((snmpName, index, snmpValue)) return self.client.set(setList) 就可以通过如下方式调用刚才的库: #!/usr/bin/python # -*- coding: utf-8 -*- # Function: Ssh to remote server # Author: Andrew Xu # CreateDate: 2012/02/28 import netsnmp import ecpa
session = netsnmp.Session(Version=2,DestHost='172.23.192.44',Community='private') #print session.tidAliasDict ecpaExample = ecpa.ecpaSnmp(session) resultList = ecpaExample.set_ecpa_stream(1,rate='96000',name='test',size='98')
print resultList 现在还存在的问题有:walk只返回值,没有oid,这些问题留到下周解决。walk的示例如下: resultList = session.walk([ ('cmEthernetAccPortEntry'), ]) 返回的错误还没有捕捉。添加如下函数: def get_snmp_error(self): var_list = self.convert_args_to_list([('lastSetErrorInformation','0')]) res = client_intf.get(self, var_list) return res 并对返回进行控制: if res != 1: return self.get_snmp_error() else: return res 现在执行出错时就会有错误报出: # ./test.py ('Error: 257 - Entity already exists. (PROT GROUP-1-1-1-1)',)
来自: 昵称13184394 > 《学习资料》