#!/usr/bin/python
#
# radical.py - Radius udp packet ripper!  Decodes radius packets so we can see
#              what the hell is going on inside them!
#
# 19981223 Chris Miles
#
# $Id: radical.py,v 1.2 2001/11/07 04:25:56 chris Exp $
# $Source: /import/cvsroot/radical/radical.py,v $
#
# Notes: RFC 2138
#
# The code below deliberately sticks to constructs that are valid on both
# Python 2.6+ and Python 3 (single-argument print(), int(x, 16), "except .. as").

import errno
import os
import re
import string
import sys

VERSION = "1.15"
USAGE = "usage: radical [-h] [-d <dictionary>] [-u <username>] [<snoopfile>]"

# Radius attribute format types (same spelling as used in radius dictionaries)
FORMAT_NONE = ''
FORMAT_STRING = 'string'
FORMAT_ADDRESS = 'ipaddr'
FORMAT_INTEGER = 'integer'
FORMAT_TIME = 'date'

# Packet type codes (RFC 2138, section 3)
RADIUS_CODE = {
    1: "Access-Request",
    2: "Access-Accept",
    3: "Access-Reject",
    4: "Accounting-Request",
    5: "Accounting-Response",
    11: "Access-Challenge",
    12: "Status-Server (experimental)",
    13: "Status-Client (experimental)",
    255: "Reserved",
}

# Default attribute names (RFC 2138, section 5); replaceable with -d
RADIUS_ATTRIBUTE = {
    1: "User-Name",
    2: "User-Password",
    3: "CHAP-Password",
    4: "NAS-IP-Address",
    5: "NAS-Port",
    6: "Service-Type",
    7: "Framed-Protocol",
    8: "Framed-IP-Address",
    9: "Framed-IP-Netmask",
    10: "Framed-Routing",
    11: "Filter-Id",
    12: "Framed-MTU",
    13: "Framed-Compression",
    14: "Login-IP-Host",
    15: "Login-Service",
    16: "Login-TCP-Port",
    17: "(unassigned)",
    18: "Reply-Message",
    19: "Callback-Number",
    20: "Callback-Id",
    21: "(unassigned)",
    22: "Framed-Route",
    23: "Framed-IPX-Network",
    24: "State",
    25: "Class",
    26: "Vendor-Specific",
    27: "Session-Timeout",
    28: "Idle-Timeout",
    29: "Termination-Action",
    30: "Called-Station-Id",
    31: "Calling-Station-Id",
    32: "NAS-Identifier",
    33: "Proxy-State",
    34: "Login-LAT-Service",
    35: "Login-LAT-Node",
    36: "Login-LAT-Group",
    37: "Framed-AppleTalk-Link",
    38: "Framed-AppleTalk-Network",
    39: "Framed-AppleTalk-Zone",
    60: "CHAP-Challenge",
    61: "NAS-Port-Type",
    62: "Port-Limit",
    63: "Login-LAT-Port",
}
# Types 40-59 are set aside for accounting.
RADIUS_ATTRIBUTE.update(dict.fromkeys(range(40, 60), "(reserved for accounting)"))

# Default attribute value formats; replaceable with -d
RADIUS_ATTRIBUTE_FORMAT = {
    1: FORMAT_STRING,
    2: FORMAT_STRING,
    3: FORMAT_STRING,
    4: FORMAT_ADDRESS,
    5: FORMAT_INTEGER,
    6: FORMAT_INTEGER,
    7: FORMAT_INTEGER,
    8: FORMAT_ADDRESS,
    9: FORMAT_ADDRESS,
    10: FORMAT_INTEGER,
    11: FORMAT_STRING,
    12: FORMAT_INTEGER,
    13: FORMAT_INTEGER,
    14: FORMAT_ADDRESS,
    15: FORMAT_INTEGER,
    16: FORMAT_INTEGER,
    17: FORMAT_NONE,
    18: FORMAT_STRING,
    19: FORMAT_STRING,
    20: FORMAT_STRING,
    21: FORMAT_NONE,
    22: FORMAT_STRING,
    23: FORMAT_INTEGER,
    24: FORMAT_STRING,
    25: FORMAT_STRING,
    26: FORMAT_STRING,
    27: FORMAT_INTEGER,
    28: FORMAT_INTEGER,
    29: FORMAT_INTEGER,   # Termination-Action is a 4-octet value (RFC 2138 5.29)
    30: FORMAT_STRING,    # Called-Station-Id is a string (RFC 2138 5.30)
    31: FORMAT_STRING,
    32: FORMAT_STRING,
    33: FORMAT_STRING,
    34: FORMAT_STRING,
    35: FORMAT_STRING,
    36: FORMAT_STRING,
    37: FORMAT_INTEGER,
    38: FORMAT_INTEGER,
    39: FORMAT_STRING,
    60: FORMAT_STRING,
    61: FORMAT_INTEGER,
    62: FORMAT_INTEGER,
    63: FORMAT_STRING,
}
RADIUS_ATTRIBUTE_FORMAT.update(dict.fromkeys(range(40, 60), FORMAT_NONE))

# Start-of-packet summary line; groups: source host, destination host,
# D= port, S= port, LEN= value.
_PACKET_START_RE = re.compile(r"(\S+) -> (\S+) UDP D=(\d+) S=(\d+) LEN=(\d+)")

# One hex dump line: "<offset>: " followed by up to 8 space-separated words
# of two bytes each.  Only the very first byte is mandatory.
_HEX_BYTE = r"([0-9a-f][0-9a-f])"
_HEX_LINE_RE = re.compile(
    r"\s*\d+: " + _HEX_BYTE + _HEX_BYTE + "? "
    + (_HEX_BYTE + "?" + _HEX_BYTE + "? ") * 7
    + ".*"
)

# "ATTRIBUTE <name> <number> <format>" line of a radius dictionary file
_DICT_ATTRIBUTE_RE = re.compile(
    r"\s*ATTRIBUTE\s+([a-zA-Z0-9_-]+)\s+(\d+)\s+([a-z]+).*"
)

# Sentinel for "attribute type has no entry in the format table"
_NO_FORMAT = object()


class attribute:
    """A single decoded Radius attribute.

    radius_attribute and radius_attribute_format are the name and format
    tables (keyed by attribute type number) used when rendering; type,
    length and value are the decoded fields of the attribute.
    """

    def __init__(self, radius_attribute, radius_attribute_format, type, length, value):
        self.radius_attribute = radius_attribute
        self.radius_attribute_format = radius_attribute_format
        self.type = type
        self.length = length
        self.value = value

    def __str__(self):
        """Render as 'type=N (Name) length=L value=V'; unknown names show '--'."""
        try:
            name = self.radius_attribute[self.type]
        except KeyError:
            name = "--"
        text = "type=%d (%s) length=%d value=" % (self.type, name, self.length)

        try:
            fmt = self.radius_attribute_format[self.type]
        except KeyError:
            fmt = _NO_FORMAT

        if fmt == FORMAT_STRING:
            return text + "'%s'" % (cleanstr(self.value),)
        if fmt == FORMAT_INTEGER:
            try:
                return text + "%d" % (self.value,)
            except TypeError:
                # value is not numeric; fall through and show it verbatim
                pass
        return text + "%s" % (self.value,)


def _decode_value(fmt, raw):
    """Convert a list of two-digit hex strings into a value of format fmt.

    Strings become a str of the corresponding characters, integers are
    decoded big-endian, addresses become dotted-quad text.  Any other
    format (including _NO_FORMAT) returns raw unchanged.
    """
    if fmt == FORMAT_STRING:
        return "".join(chr(hex2int(b)) for b in raw)
    if fmt == FORMAT_INTEGER:
        number = 0
        for b in raw:
            number = number * 256 + hex2int(b)
        return number
    if fmt == FORMAT_ADDRESS:
        if len(raw) < 4:
            return "<invalid address>"
        return "%d.%d.%d.%d" % tuple(hex2int(b) for b in raw[:4])
    return raw


class radpkt:
    """One captured Radius packet.

    The constructor stores the values taken from the snoop summary line;
    pktparse() must be called before code/identifier/length/authenticator/
    attributes (and therefore str()) are available.
    """

    def __init__(self, radius_attribute, radius_attribute_format,
                 src_host, dest_host, src_port, dest_port, len):
        self.radius_attribute = radius_attribute
        self.radius_attribute_format = radius_attribute_format
        self.src_host = src_host
        self.dest_host = dest_host
        self.src_port = src_port
        self.dest_port = dest_port
        self.len = len

    def __str__(self):
        """Return a multi-line, human readable dump of the parsed packet."""
        rows = [
            " Source Host: %-30s Port: %s" % (self.src_host, self.src_port),
            " Destination Host: %-30s Port: %s" % (self.dest_host, self.dest_port),
            " Length : %s" % (self.len,),
            # unknown codes are shown by number only
            " Code : %d %s" % (self.code, RADIUS_CODE.get(self.code, "")),
            " Identifier : %d" % (self.identifier,),
            " Length : %d" % (self.length,),
            " Authenticator : %s" % (self.authenticator,),
            " Attributes: ",
        ]
        rows.extend(" %s" % (a,) for a in self.attributes)
        return "\n".join(rows) + "\n"

    def pktparse(self, packate):
        """Decode packate data into Radius fields & attributes.

        packate is a list of two-digit lowercase hex strings, one per byte
        of the captured frame.  The Radius header is read from index 42
        onwards (NOTE(review): presumably 14 bytes ethernet + 20 bytes IP +
        8 bytes UDP precede it - confirm for captures with IP options).

        Raises IndexError if packate holds fewer than 46 entries.
        """
        # Every entry is hexadecimal text, so the code byte needs hex2int too
        # (int("0b") fails and int("10") would yield 10 instead of 16).
        self.code = hex2int(packate[42])
        self.identifier = hex2int(packate[43])
        self.length = hex2int(packate[44]) * 256 + hex2int(packate[45])
        self.authenticator = packate[46:62]

        self.attributes = []    # list of attribute objects
        end = len(packate)
        pos = 62
        # an attribute needs at least its type and length bytes
        while pos + 1 < end:
            attr_type = hex2int(packate[pos])
            length = hex2int(packate[pos + 1])
            if length < 2:
                # The length covers the type and length bytes themselves, so
                # anything smaller is malformed and would stall or misalign
                # the walk; stop decoding here.
                break
            try:
                fmt = self.radius_attribute_format[attr_type]
            except KeyError:
                fmt = _NO_FORMAT
            value = _decode_value(fmt, packate[pos + 2:pos + length])
            self.attributes.append(
                attribute(self.radius_attribute, self.radius_attribute_format,
                          attr_type, length, value))
            pos = pos + length

    def getAttribute(self, num):
        """Return attribute value for attribute type=<num> or None if
        not valid.  The first matching attribute wins.
        """
        for a in self.attributes:
            if a.type == num:
                return a.value
        return None

    def getIdentifier(self):
        """Return radius packet Identifier."""
        return self.identifier

    def getCode(self):
        """Return radius packet Code."""
        return self.code


class radsnoop:
    """Reads 'snoop -x' text from a file object and yields radpkt objects."""

    fileptr = None

    def __init__(self, radius_attribute, radius_attribute_format, fileptr):
        self.radius_attribute = radius_attribute
        self.radius_attribute_format = radius_attribute_format
        self.fileptr = fileptr

    def rip(self):
        """Return the next packet in the input as a parsed radpkt.

        Returns None once end of file is reached without finding another
        packet summary line.
        """
        # find start of next packet
        header = None
        line = self.fileptr.readline()
        while line:
            header = _PACKET_START_RE.search(line)
            if header is not None:
                break
            line = self.fileptr.readline()
        if header is None:
            return None     # reached EOF

        # the summary line lists D= before S=, hence groups 4 and 3 swapped
        newrad = radpkt(self.radius_attribute, self.radius_attribute_format,
                        header.group(1), header.group(2),
                        header.group(4), header.group(3), header.group(5))

        # skip blank line
        self.fileptr.readline()

        # collect the hex dump lines of this packet until a blank line
        raw_pkt = []
        line = self.fileptr.readline()
        while len(line) > 2:
            dump = _HEX_LINE_RE.search(line)
            if dump is None:
                break
            raw_pkt.extend(b for b in dump.groups() if b is not None)
            line = self.fileptr.readline()

        newrad.pktparse(raw_pkt)
        return newrad


def hex2int(hexval):
    """Return the integer value of the hexadecimal string hexval."""
    return int(hexval, 16)


# Read in a radius dictionary (to replace RADIUS_ATTRIBUTE and
# RADIUS_ATTRIBUTE_FORMAT)
def read_dictionary(filename):
    """Parse the ATTRIBUTE lines of the radius dictionary file filename.

    Returns a tuple (radius_attribute, radius_attribute_format) of dicts
    keyed by attribute number.  Lines starting with '#' or a newline, and
    lines that are not ATTRIBUTE definitions, are ignored.
    """
    radius_attribute = {}
    radius_attribute_format = {}
    with open(filename, 'r') as df:
        for line in df:
            if line.startswith(('\n', '#')):
                continue
            found = _DICT_ATTRIBUTE_RE.match(line)
            if found is not None:
                number = int(found.group(2))
                radius_attribute[number] = found.group(1)
                radius_attribute_format[number] = found.group(3)
    return (radius_attribute, radius_attribute_format)


# sort by numerical value
def sortnum(a, b):
    """cmp-style comparison of a and b by their int() value (-1, 0 or 1)."""
    x = int(a)
    y = int(b)
    return (x > y) - (x < y)


def cleanstr(dirty):
    """Returns a 'clean' ascii string by converting any non-printable
    characters in dirty to '<0x00>' form.
    """
    return "".join(
        c if 32 <= ord(c) <= 126 else '<0x%02x>' % (ord(c),)
        for c in dirty
    )


def _print_help():
    """Print version, usage and option notes to stdout."""
    print("radical v%s" % (VERSION,))
    print(" - Radius packet decoder - Chris Miles 19981223")
    print("")
    print(USAGE)
    print(" * <snoopfile> can be obtained with a command like:")
    print(" '# snoop -x host <hostname> udp port 1645 > snoop.out'")
    print(" * If no file is specified, input is read from STDIN.")
    print(" * -d <dictionary> loads attribute names/formats from a radius dictionary.")
    print(" * -u <username> forces output to be filtered to username (and subsequent reply")
    print(" packets) only.")
    print("")


def _passes_user_filter(pkt, user_filter, pending_ids):
    """Decide whether pkt is shown while filtering on user_filter.

    A packet is shown when its User-Name (attribute 1) equals user_filter
    or when its Identifier is in pending_ids, i.e. it answers a request
    that was shown earlier.  pending_ids is updated in place: requests
    (codes 1 and 4) add their Identifier, any other code removes it.
    """
    pkt_id = pkt.getIdentifier()
    if pkt.getAttribute(1) != user_filter and pkt_id not in pending_ids:
        return False
    if pkt.getCode() not in (1, 4):
        # this is a response so remove id from list if needed
        if pkt_id in pending_ids:
            pending_ids.remove(pkt_id)
    elif pkt_id not in pending_ids:
        # save packet Identifier
        pending_ids.append(pkt_id)
    return True


def main(argv=None):
    """Main starts here.

    argv has the same layout as sys.argv (program name first) and defaults
    to sys.argv.  Exits with status -1 after -h or on a usage error and
    with status 1 on an I/O error other than a broken pipe.
    """
    args = list(sys.argv[1:] if argv is None else argv[1:])

    user_filter = None      # default: no user filter

    # default radius attribute tables - can be overridden
    radius_attribute = RADIUS_ATTRIBUTE
    radius_attribute_format = RADIUS_ATTRIBUTE_FORMAT

    # parse arguments for optional switches
    while args and args[0].startswith('-'):
        switch = args.pop(0)
        if switch == '-h':
            _print_help()
            sys.exit(-1)    # abort anyway
        if switch not in ('-d', '-u') or not args:
            # unknown switch, or a switch without its value
            print(USAGE)
            sys.exit(-1)
        operand = args.pop(0)
        if switch == '-d':
            (radius_attribute, radius_attribute_format) = read_dictionary(operand)
        else:
            user_filter = operand

    # open file containing snoop output - should be produced with:
    #   eleanba# snoop -x host eleanba udp port 1645
    # if no filename specified, use stdin
    if args:
        inf = open(args[0], 'r')
    else:
        inf = sys.stdin

    # create instance of radsnoop object
    rad = radsnoop(radius_attribute, radius_attribute_format, inf)

    idlist = []     # track packet ids if user filtering on
    pktnum = 0
    try:
        pkt = rad.rip()
        while pkt is not None:
            if user_filter is None or _passes_user_filter(pkt, user_filter, idlist):
                # show contents of packet...
                pktnum = pktnum + 1
                print("Radius packet %d" % pktnum)
                print(pkt)
            pkt = rad.rip()
    except IOError as exc:
        # pipe gone - just exit quietly; anything else is reported
        if exc.errno != errno.EPIPE:
            sys.stderr.write("radical: exception %s: %s\n" % (exc.errno, exc.strerror))
            sys.exit(1)
    except KeyboardInterrupt:
        # exit gracefully for ^C
        pass
    finally:
        rad.fileptr.close()


if __name__ == "__main__":
    main()

### End radical.py