pysnmp
pysnmp copied to clipboard
SNMPv3 doesn't work when the security level is authentication
Hi etingof
It works fine when my switch uses SNMP v1 or v2c, and it also works fine when my switch uses SNMPv3 and the security level is not "authentication and privacy". It doesn't work when the security level is "authentication and privacy": I can't see any output at all.
# -*- coding: utf-8 -*-
from pysnmp.carrier.asynsock.dispatch import AsyncoreDispatcher
from pysnmp.carrier.asynsock.dgram import udp
from pysnmp.carrier.asynsock.dgram import udp6
from pysnmp.proto import api
from pyasn1.codec.ber import decoder
from pysnmp.entity.rfc3413 import ntfrcv
from pysnmp.entity import engine
from pysnmp.entity import config
from pysnmp.proto.api import v2c
class PySnmpTrapd:
    """SNMP trap receiver built on pysnmp.

    Two independent receive paths are provided:

    * v1/v2c: UDP transports registered on a bare ``AsyncoreDispatcher``;
      every datagram is decoded by hand in ``_cb_fun``.
    * v3: UDP transports and a USM user registered on an ``SnmpEngine``;
      notifications are delivered to ``v3_cb_fun`` through
      ``ntfrcv.NotificationReceiver``.

    Configure the public attributes set in ``__init__`` (bind addresses,
    port, USM credentials, engine ID) before calling the ``config_*`` and
    ``*_start`` methods.
    """

    def __init__(self):
        """Set default listen/USM parameters and create engine and dispatcher."""
        # Empty bind addresses are placeholders; callers assign real ones.
        self.server_bind_ipv4_address = ""
        self.server_bind_ipv6_address = ""
        self.server_bind_udp_port = 162
        self.comunity = "public"
        # Lookup table from a short name to the pysnmp auth protocol constant.
        self.usm_auth_protocol_dist = {
            "md5": config.usmHMACMD5AuthProtocol,
            "sha": config.usmHMACSHAAuthProtocol,
        }
        self.usm_auth_protocol = self.usm_auth_protocol_dist['md5']
        self.usm_priv_protocol = config.usmDESPrivProtocol
        self.usm_user_name = "huayajun"
        self.usm_auth_passwd = "12345678"
        self.usm_priv_passwd = "12345678"
        # Engine ID handed to addV3User() in config_v3_user().
        # NOTE(review): presumably this must equal the engine ID of the
        # device sending the traps -- confirm against the switch config.
        self.engine_id = v2c.OctetString(hexValue='80006a9203feee3234aab6')
        self.snmp_engine = engine.SnmpEngine()
        self.transport_dispatcher = AsyncoreDispatcher()
        self.transport_dispatcher.registerRecvCbFun(self._cb_fun)

    def config_listen_ipv4_address(self):
        """Register a UDP/IPv4 server transport on the v1/v2c dispatcher."""
        listen_on = (self.server_bind_ipv4_address, self.server_bind_udp_port)
        self.transport_dispatcher.registerTransport(
            udp.domainName,
            udp.UdpSocketTransport().openServerMode(listen_on)
        )

    def config_listen_ipv6_address(self):
        """Register a UDP/IPv6 server transport on the v1/v2c dispatcher."""
        listen_on = (self.server_bind_ipv6_address, self.server_bind_udp_port)
        self.transport_dispatcher.registerTransport(
            udp6.domainName,
            udp6.Udp6SocketTransport().openServerMode(listen_on)
        )

    def trapd_server_v1_or_v2c_start(self):
        """Run the v1/v2c dispatcher loop until it stops or fails.

        The dispatcher is closed before any exception is re-raised.
        ``BaseException`` is caught so that Ctrl-C (``KeyboardInterrupt``)
        also closes the dispatcher, matching ``trapd_v3_server_start``.
        """
        self.transport_dispatcher.jobStarted(1)
        try:
            self.transport_dispatcher.runDispatcher()
        except BaseException:
            self.transport_dispatcher.closeDispatcher()
            raise

    @staticmethod
    def _render_value_line(item):
        """Return the text to print for one stripped pretty-printed value line.

        A line of the form ``string-value=0x<hex>`` is rewritten with the
        hex digits decoded to text. Any other line, including a
        ``string-value=`` line without the ``0x`` prefix or with malformed
        hex, is returned unchanged instead of raising.
        """
        # Local import keeps the helper usable without touching file imports.
        import binascii

        hex_prefix = 'string-value=0x'
        if not item.startswith(hex_prefix):
            return item
        try:
            raw = binascii.unhexlify(item[len(hex_prefix):])
        except (TypeError, ValueError):
            # Python 2 raises TypeError, Python 3 binascii.Error (ValueError).
            return item
        if not isinstance(raw, str):
            # Python 3 returns bytes; make it printable text.
            raw = raw.decode('utf-8', 'replace')
        return 'string-value=' + raw

    def _cb_fun(self, transport_dispatcher, transport_domain, transport_address, whole_msg):
        """Decode and print every SNMP v1/v2c message found in a datagram.

        Registered as the dispatcher's receive callback in ``__init__``.

        :param transport_dispatcher: dispatcher that delivered the data (unused).
        :param transport_domain: transport domain the datagram arrived on.
        :param transport_address: sender address of the datagram.
        :param whole_msg: raw bytes, possibly holding several messages.
        :return: the bytes left undecoded, or ``None`` when a message with
            an unsupported SNMP version is encountered.
        """
        while whole_msg:
            msg_ver = int(api.decodeMessageVersion(whole_msg))
            if msg_ver not in api.protoModules:
                print(u"不支持的SNMP版本:%s" % msg_ver)
                return
            p_mod = api.protoModules[msg_ver]
            # decode() returns the parsed message plus the unconsumed tail,
            # which drives the next loop iteration.
            req_msg, whole_msg = decoder.decode(
                whole_msg, asn1Spec=p_mod.Message(),
            )
            print(u"有告警消息来自%s:%s" % (transport_domain, transport_address))
            req_pdu = p_mod.apiMessage.getPDU(req_msg)
            if not req_pdu.isSameTypeWith(p_mod.TrapPDU()):
                # Only trap PDUs are reported; anything else is skipped.
                continue
            if msg_ver == api.protoVersion1:
                trap_api = p_mod.apiTrapPDU
                print(u"Enterprise:%s" % trap_api.getEnterprise(req_pdu).prettyPrint())
                print(u"Agent Address:%s" % trap_api.getAgentAddr(req_pdu).prettyPrint())
                print(u"Generic Trap:%s" % trap_api.getGenericTrap(req_pdu).prettyPrint())
                print(u"Uptime:%s" % trap_api.getTimeStamp(req_pdu).prettyPrint())
                var_binds = trap_api.getVarBindList(req_pdu)
            else:
                var_binds = p_mod.apiPDU.getVarBindList(req_pdu)
            print("Var-binds:")
            for oid, val in var_binds:
                print(oid)
                print(val)
                print(oid.prettyPrint().strip())
                for line in val.prettyPrint().strip().split('\n'):
                    print(self._render_value_line(line.strip()))
        return whole_msg

    def config_v3_ipv4_pram(self):
        """Attach a UDP/IPv4 server transport to the SNMPv3 engine."""
        print("config SNMPv3 ipv4 pram now....")
        listen_on = (self.server_bind_ipv4_address, self.server_bind_udp_port)
        config.addTransport(
            self.snmp_engine,
            udp.domainName,
            udp.UdpTransport().openServerMode(listen_on)
        )

    def config_v3_ipv6_pram(self):
        """Attach a UDP/IPv6 server transport to the SNMPv3 engine."""
        print("config SNMPv3 ipv6 pram now.....")
        listen_on = (self.server_bind_ipv6_address, self.server_bind_udp_port)
        config.addTransport(
            self.snmp_engine,
            udp6.domainName,
            udp6.Udp6Transport().openServerMode(listen_on)
        )

    def config_v3_user(self):
        """Register the USM user on the SNMPv3 engine, then print the engine ID.

        The user is added twice with identical credentials: once without an
        engine ID and once with ``self.engine_id`` as the trailing argument.
        """
        print("config SNMPv3 usm user pram now .....")
        credentials = (
            self.usm_user_name,
            self.usm_auth_protocol,
            self.usm_auth_passwd,
            self.usm_priv_protocol,
            self.usm_priv_passwd,
        )
        config.addV3User(self.snmp_engine, *credentials)
        # NOTE(review): the meaning of the 7th positional argument of
        # addV3User differs between pysnmp releases -- confirm for the
        # installed version.
        config.addV3User(self.snmp_engine, *(credentials + (self.engine_id,)))
        print(self.engine_id)

    def v3_cb_fun(self, snmp_engine, state_reference, context_engine_id, context_name, var_binds, cb_ctx):
        """Print the origin and variable bindings of one SNMPv3 notification.

        Passed to ``ntfrcv.NotificationReceiver`` as its callback. The
        sender is looked up on ``self.snmp_engine`` via ``state_reference``.
        No log file is opened here: the previous implementation truncated
        ``./log.log`` on every call without ever writing to or closing it.
        """
        transport_domain, transport_address = self.snmp_engine.msgAndPduDsp.getTransportInfo(state_reference)
        print(transport_address)
        print(transport_domain)
        print(u'Notification from %s, SNMP Engine \"%s\", Context\"%s\"' % (
            transport_address, context_engine_id, context_name
        ))
        for name, val in var_binds:
            print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))

    def trapd_v3_server_start(self):
        """Install the notification receiver and run the SNMPv3 engine loop.

        The engine's dispatcher is closed before any exception, including
        ``KeyboardInterrupt``, is re-raised.
        """
        ntfrcv.NotificationReceiver(self.snmp_engine, self.v3_cb_fun)
        dispatcher = self.snmp_engine.transportDispatcher
        dispatcher.jobStarted(1)
        try:
            dispatcher.runDispatcher()
        except BaseException:
            self.snmp_engine.transportDispatcher.closeDispatcher()
            raise
def test():
    """Start an SNMPv3 trap receiver bound to fixed IPv4 and IPv6 addresses.

    For the v1/v2c path, call config_listen_ipv4_address(),
    config_listen_ipv6_address() and trapd_server_v1_or_v2c_start() instead
    of the v3 steps below.
    """
    receiver = PySnmpTrapd()
    receiver.server_bind_ipv4_address = "182.16.218.2"
    receiver.server_bind_ipv6_address = "2111:5c:218:0:88c9:d215:2b88:7180"
    # Transports first, then the USM user, then the blocking receive loop.
    v3_steps = (
        receiver.config_v3_ipv4_pram,
        receiver.config_v3_ipv6_pram,
        receiver.config_v3_user,
        receiver.trapd_v3_server_start,
    )
    for step in v3_steps:
        step()
# Run the SNMPv3 trap receiver demo only when this file is executed as a script.
if __name__ == "__main__":
    test()
Does your switch use 80006a9203feee3234aab6 as its SNMP engine ID?
Let's enable pysnmp debugging and watch what happens when an authenticated TRAP message comes in:
# Install a pysnmp debug logger created with the 'all' flag
# (presumably enables every debug category -- confirm in the pysnmp docs).
from pysnmp import debug
debug.setLogger(debug.Debug('all'))
The legacy releases have several issues in v3 discovery and TRAP support. You will have to test against recent releases instead.