pysnmp icon indicating copy to clipboard operation
pysnmp copied to clipboard

SNMPv3 with security level set to authentication doesn't work

Open joyran opened this issue 7 years ago • 2 comments

Hi etingof

It works fine when my switch uses SNMP v1 or v2c, and it works fine when my switch uses SNMPv3 and the security level is not "Authentication and privacy". It doesn't work when the security level is "Authentication and privacy": I can't see any output at all.

# -*- coding: utf-8 -*-

from pysnmp.carrier.asynsock.dispatch import AsyncoreDispatcher
from pysnmp.carrier.asynsock.dgram import udp
from pysnmp.carrier.asynsock.dgram import udp6
from pysnmp.proto import api
from pyasn1.codec.ber import decoder
from pysnmp.entity.rfc3413 import ntfrcv
from pysnmp.entity import engine
from pysnmp.entity import config
from pysnmp.proto.api import v2c


class PySnmpTrapd:
    """SNMP trap receiver built on pysnmp.

    Two independent receive paths are provided:

    * v1/v2c: a raw ``AsyncoreDispatcher`` whose receive callback
      (``_cb_fun``) BER-decodes each message and prints the trap contents.
      Use ``config_listen_*`` followed by ``trapd_server_v1_or_v2c_start``.
    * v3: a full ``SnmpEngine`` with a USM user and a
      ``NotificationReceiver`` that calls ``v3_cb_fun``.  Use
      ``config_v3_*`` followed by ``trapd_v3_server_start``.

    All settings are plain attributes that the caller may overwrite after
    construction and before calling the ``config_*`` methods.
    """

    def __init__(self):
        """Set default listener/USM settings and create the pysnmp objects."""
        # Listener addresses; the caller is expected to set these before
        # configuring transports.
        self.server_bind_ipv4_address = ""
        self.server_bind_ipv6_address = ""
        self.server_bind_udp_port = 162
        # NOTE: attribute name is misspelled but public, so it is kept.
        # It is not read anywhere inside this class.
        self.comunity = "public"
        self.usm_auth_protocol_dist = {
            "md5": config.usmHMACMD5AuthProtocol,
            "sha": config.usmHMACSHAAuthProtocol,
        }
        self.usm_auth_protocol = self.usm_auth_protocol_dist['md5']
        self.usm_priv_protocol = config.usmDESPrivProtocol
        self.usm_user_name = "huayajun"
        self.usm_auth_passwd = "12345678"
        self.usm_priv_passwd = "12345678"
        # Engine ID the USM user is additionally registered under (see
        # config_v3_user).  NOTE(review): for SNMPv3 TRAPs this presumably
        # has to equal the sending device's snmpEngineID, otherwise
        # authenticated/encrypted traps will not be accepted -- verify
        # against the switch.  An earlier candidate, 80006a9203e0b94db1d01f,
        # used to be assigned here as a raw byte string and was immediately
        # overwritten by the value below, so it never took effect.
        self.engine_id = v2c.OctetString(hexValue='80006a9203feee3234aab6')
        self.snmp_engine = engine.SnmpEngine()
        self.transport_dispatcher = AsyncoreDispatcher()
        self.transport_dispatcher.registerRecvCbFun(self._cb_fun)

    def config_listen_ipv4_address(self):
        """Register a UDP/IPv4 server socket on the v1/v2c dispatcher."""
        listen_on = (self.server_bind_ipv4_address, self.server_bind_udp_port)
        self.transport_dispatcher.registerTransport(
            udp.domainName,
            udp.UdpSocketTransport().openServerMode(listen_on)
        )

    def config_listen_ipv6_address(self):
        """Register a UDP/IPv6 server socket on the v1/v2c dispatcher."""
        listen_on = (self.server_bind_ipv6_address, self.server_bind_udp_port)
        self.transport_dispatcher.registerTransport(
            udp6.domainName,
            udp6.Udp6SocketTransport().openServerMode(listen_on)
        )

    def trapd_server_v1_or_v2c_start(self):
        """Run the v1/v2c dispatcher loop.

        If the loop raises, the dispatcher is closed and the exception is
        re-raised to the caller.
        """
        self.transport_dispatcher.jobStarted(1)

        try:
            self.transport_dispatcher.runDispatcher()
        except Exception:
            self.transport_dispatcher.closeDispatcher()
            raise

    @staticmethod
    def _hex_to_text(hex_digits):
        """Return the text encoded by the hexadecimal string *hex_digits*.

        Equivalent to Python 2's ``hex_digits.decode('hex')`` but also
        usable on Python 3, where the decoded bytes are mapped to ``str``
        one byte per character (latin-1).
        """
        import binascii

        raw = binascii.unhexlify(hex_digits)
        if not isinstance(raw, str):
            # Python 3: unhexlify returned bytes; convert so the result can
            # be concatenated with a str label.
            raw = raw.decode('latin-1')
        return raw

    def _cb_fun(self, transport_dispatcher, transport_domain, transport_address, whole_msg):
        """Receive callback for the v1/v2c dispatcher: decode and print traps.

        Every SNMP message contained in *whole_msg* is decoded in turn; for
        trap PDUs the v1 header fields (v1 only) and all variable bindings
        are printed.

        :param transport_dispatcher: dispatcher that delivered the data
            (unused here).
        :param transport_domain: transport domain the data arrived on.
        :param transport_address: address of the sender.
        :param whole_msg: raw received octets, possibly holding several
            messages back to back.
        :return: the octets left undecoded (empty once everything has been
            consumed), or ``None`` when an unsupported SNMP version is met.
        """
        while whole_msg:
            msg_ver = int(api.decodeMessageVersion(whole_msg))
            if msg_ver not in api.protoModules:
                # Message text: "unsupported SNMP version".
                print(u"不支持的SNMP版本:%s" % msg_ver)
                return
            p_mod = api.protoModules[msg_ver]

            # decode() hands back the remainder, which drives the loop.
            req_msg, whole_msg = decoder.decode(
                whole_msg, asn1Spec=p_mod.Message(),
            )
            # Message text: "trap message received from <domain>:<address>".
            print(u"有告警消息来自%s:%s" % (transport_domain, transport_address))
            req_pdu = p_mod.apiMessage.getPDU(req_msg)
            if not req_pdu.isSameTypeWith(p_mod.TrapPDU()):
                continue

            if msg_ver == api.protoVersion1:
                trap_api = p_mod.apiTrapPDU
                print(u"Enterprise:%s" % trap_api.getEnterprise(req_pdu).prettyPrint())
                print(u"Agent Address:%s" % trap_api.getAgentAddr(req_pdu).prettyPrint())
                print(u"Generic Trap:%s" % trap_api.getGenericTrap(req_pdu).prettyPrint())
                print(u"Uptime:%s" % trap_api.getTimeStamp(req_pdu).prettyPrint())
                var_binds = trap_api.getVarBindList(req_pdu)
            else:
                var_binds = p_mod.apiPDU.getVarBindList(req_pdu)

            print("Var-binds:")
            for oid, val in var_binds:
                print(oid)
                print(val)
                print(oid.prettyPrint().strip())
                for line in val.prettyPrint().strip().split('\n'):
                    item = line.strip()
                    if item.startswith('string-value'):
                        # The pretty-printed value is "string-value=0x<hex>";
                        # show the decoded text instead of the hex dump.
                        hex_digits = item.replace('string-value=0x', '')
                        print('string-value=' + self._hex_to_text(hex_digits))
                    else:
                        print(item)
        # Returned only after the whole buffer has been walked, so several
        # messages in one buffer are all processed.
        return whole_msg

    def config_v3_ipv4_pram(self):
        """Attach a UDP/IPv4 server transport to the SNMPv3 engine."""
        print("config SNMPv3 ipv4 pram now....")
        listen_on = (self.server_bind_ipv4_address, self.server_bind_udp_port)
        config.addTransport(
            self.snmp_engine,
            udp.domainName,
            udp.UdpTransport().openServerMode(listen_on)
        )

    def config_v3_ipv6_pram(self):
        """Attach a UDP/IPv6 server transport to the SNMPv3 engine."""
        print("config SNMPv3 ipv6 pram now.....")
        listen_on = (self.server_bind_ipv6_address, self.server_bind_udp_port)
        config.addTransport(
            self.snmp_engine,
            udp6.domainName,
            udp6.Udp6Transport().openServerMode(listen_on)
        )

    def config_v3_user(self):
        """Register the USM user on the SNMPv3 engine.

        The user is added twice with the same credentials: once without an
        engine ID and once with ``self.engine_id`` as the trailing
        argument.  NOTE(review): the trailing positional argument is
        presumably pysnmp's ``securityEngineId`` -- confirm against the
        installed pysnmp version.
        """
        print("config SNMPv3 usm user pram now .....")
        credentials = (
            self.usm_user_name,
            self.usm_auth_protocol,
            self.usm_auth_passwd,
            self.usm_priv_protocol,
            self.usm_priv_passwd,
        )
        config.addV3User(self.snmp_engine, *credentials)
        config.addV3User(self.snmp_engine, *(credentials + (self.engine_id,)))
        print(self.engine_id)

    def v3_cb_fun(self, snmp_engine, state_reference, context_engine_id, context_name, var_binds, cb_ctx):
        """Notification callback for the SNMPv3 engine: print the trap.

        :param snmp_engine: engine that received the notification (unused;
            ``self.snmp_engine`` is consulted instead).
        :param state_reference: handle used to look up the sender's
            transport domain and address.
        :param context_engine_id: context engine ID of the notification.
        :param context_name: context name of the notification.
        :param var_binds: iterable of ``(name, value)`` pairs.
        :param cb_ctx: user context passed at registration (unused).
        """
        # The former open("./log.log", 'w+') was removed: the handle was
        # never written to or closed, and it truncated the file on every
        # notification.
        transport_domain, transport_address = self.snmp_engine.msgAndPduDsp.getTransportInfo(state_reference)
        print(transport_address)
        print(transport_domain)
        print(u'Notification from %s, SNMP Engine \"%s\", Context\"%s\"' % (
            transport_address, context_engine_id, context_name
        ))
        for name, val in var_binds:
            print('%s = %s' % (name.prettyPrint(), val.prettyPrint()))

    def trapd_v3_server_start(self):
        """Register the notification receiver and run the SNMPv3 engine loop.

        On any exception the dispatcher is closed and the exception is
        re-raised.
        """
        ntfrcv.NotificationReceiver(self.snmp_engine, self.v3_cb_fun)
        dispatcher = self.snmp_engine.transportDispatcher
        dispatcher.jobStarted(1)
        try:
            dispatcher.runDispatcher()
        except:  # deliberate catch-all: clean up, then propagate unchanged
            self.snmp_engine.transportDispatcher.closeDispatcher()
            raise




def test():
    """Start an SNMPv3 trap receiver on fixed IPv4/IPv6 addresses.

    Builds a ``PySnmpTrapd``, binds it to the hard-coded addresses below,
    configures the v3 transports and USM user, then enters the receive
    loop via ``trapd_v3_server_start``.
    """
    receiver = PySnmpTrapd()
    receiver.server_bind_ipv4_address = "182.16.218.2"
    receiver.server_bind_ipv6_address = "2111:5c:218:0:88c9:d215:2b88:7180"

    # SNMPv3 setup, in order: IPv4 transport, IPv6 transport, USM user.
    v3_setup_steps = (
        receiver.config_v3_ipv4_pram,
        receiver.config_v3_ipv6_pram,
        receiver.config_v3_user,
    )
    for setup_step in v3_setup_steps:
        setup_step()
    receiver.trapd_v3_server_start()

    # For plain v1/v2c reception use the raw dispatcher path instead:
    # config_listen_ipv4_address(), config_listen_ipv6_address(), then
    # trapd_server_v1_or_v2c_start().


if __name__ == "__main__":
    test()

joyran avatar Mar 05 '18 01:03 joyran

Does your switch use 80006a9203feee3234aab6 as its SNMP engine ID?

Let's enable pysnmp debugging and watch what is happening when an authenticated TRAP message comes in:

from pysnmp import debug

# Install a pysnmp debug logger; the 'all' flag presumably enables every
# debug category, so the handling of an incoming TRAP can be traced.
debug.setLogger(debug.Debug('all'))

etingof avatar Mar 12 '18 15:03 etingof

The legacy releases have several issues in v3 discovery and TRAP support. You will have to test against recent releases instead.

lextm avatar Aug 26 '24 08:08 lextm