summaryrefslogtreecommitdiff
path: root/src/tests/t_otp.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/tests/t_otp.py')
-rwxr-xr-xsrc/tests/t_otp.py244
1 files changed, 244 insertions, 0 deletions
diff --git a/src/tests/t_otp.py b/src/tests/t_otp.py
new file mode 100755
index 0000000000000..f098374f9e616
--- /dev/null
+++ b/src/tests/t_otp.py
@@ -0,0 +1,244 @@
+#!/usr/bin/python
+#
+# Author: Nathaniel McCallum <npmccallum@redhat.com>
+#
+# Copyright (c) 2013 Red Hat, Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+
+
+#
+# This script tests OTP, both UDP and Unix Sockets, with a variety of
+# configuration. It requires pyrad to run, but exits gracefully if not found.
+# It also deliberately shuts down the test daemons between tests in order to
+# test how OTP handles the case of short daemon restarts.
+#
+
+from k5test import *
+from Queue import Empty
+import StringIO
+import struct
+
+try:
+ from pyrad import packet, dictionary
+except ImportError:
+ skip_rest('OTP tests', 'Python pyrad module not found')
+try:
+ from multiprocessing import Process, Queue
+except ImportError:
+ skip_rest('OTP tests', 'Python version 2.6 required')
+
+# We could use a dictionary file, but since we need so few attributes,
+# we'll just include them here.
+radius_attributes = '''
+ATTRIBUTE User-Name 1 string
+ATTRIBUTE User-Password 2 octets
+ATTRIBUTE NAS-Identifier 32 string
+'''
+
+class RadiusDaemon(Process):
+ MAX_PACKET_SIZE = 4096
+ DICTIONARY = dictionary.Dictionary(StringIO.StringIO(radius_attributes))
+
+ def listen(self, addr):
+ raise NotImplementedError()
+
+ def recvRequest(self, data):
+ raise NotImplementedError()
+
+ def run(self):
+ addr = self._args[0]
+ secr = self._args[1]
+ pswd = self._args[2]
+ outq = self._args[3]
+
+ if secr:
+ with open(secr) as file:
+ secr = file.read().strip()
+
+ data = self.listen(addr)
+ outq.put("started")
+ (buf, sock, addr) = self.recvRequest(data)
+ pkt = packet.AuthPacket(secret=secr,
+ dict=RadiusDaemon.DICTIONARY,
+ packet=buf)
+
+ usernm = []
+ passwd = []
+ for key in pkt.keys():
+ if key == 'User-Password':
+ passwd = map(pkt.PwDecrypt, pkt[key])
+ elif key == 'User-Name':
+ usernm = pkt[key]
+
+ reply = pkt.CreateReply()
+ replyq = {'user': usernm, 'pass': passwd}
+ if passwd == [pswd]:
+ reply.code = packet.AccessAccept
+ replyq['reply'] = True
+ else:
+ reply.code = packet.AccessReject
+ replyq['reply'] = False
+
+ outq.put(replyq)
+ if addr is None:
+ sock.send(reply.ReplyPacket())
+ else:
+ sock.sendto(reply.ReplyPacket(), addr)
+ sock.close()
+
+class UDPRadiusDaemon(RadiusDaemon):
+ def listen(self, addr):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ sock.bind((addr.split(':')[0], int(addr.split(':')[1])))
+ return sock
+
+ def recvRequest(self, sock):
+ (buf, addr) = sock.recvfrom(RadiusDaemon.MAX_PACKET_SIZE)
+ return (buf, sock, addr)
+
+class UnixRadiusDaemon(RadiusDaemon):
+ def listen(self, addr):
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ if os.path.exists(addr):
+ os.remove(addr)
+ sock.bind(addr)
+ sock.listen(1)
+ return (sock, addr)
+
+ def recvRequest(self, (sock, addr)):
+ conn = sock.accept()[0]
+ sock.close()
+ os.remove(addr)
+
+ buf = ""
+ remain = RadiusDaemon.MAX_PACKET_SIZE
+ while True:
+ buf += conn.recv(remain)
+ remain = RadiusDaemon.MAX_PACKET_SIZE - len(buf)
+ if (len(buf) >= 4):
+ remain = struct.unpack("!BBH", buf[0:4])[2] - len(buf)
+ if (remain <= 0):
+ return (buf, conn, None)
+
+def verify(daemon, queue, reply, usernm, passwd):
+ try:
+ data = queue.get(timeout=1)
+ except Empty:
+ sys.stderr.write("ERROR: Packet not received by daemon!\n")
+ daemon.terminate()
+ sys.exit(1)
+ assert data['reply'] is reply
+ assert data['user'] == [usernm]
+ assert data['pass'] == [passwd]
+ daemon.join()
+
+def otpconfig(toktype, username=None, indicators=None):
+ val = '[{"type": "%s"' % toktype
+ if username is not None:
+ val += ', "username": "%s"' % username
+ if indicators is not None:
+ qind = ['"%s"' % s for s in indicators]
+ jsonlist = '[' + ', '.join(qind) + ']'
+ val += ', "indicators":' + jsonlist
+ val += '}]'
+ return val
+
+prefix = "/tmp/%d" % os.getpid()
+secret_file = prefix + ".secret"
+socket_file = prefix + ".socket"
+with open(secret_file, "w") as file:
+ file.write("otptest")
+atexit.register(lambda: os.remove(secret_file))
+
+conf = {'plugins': {'kdcpreauth': {'enable_only': 'otp'}},
+ 'otp': {'udp': {'server': '127.0.0.1:$port9',
+ 'secret': secret_file,
+ 'strip_realm': 'true',
+ 'indicator': ['indotp1', 'indotp2']},
+ 'unix': {'server': socket_file,
+ 'strip_realm': 'false'}}}
+
+queue = Queue()
+
+realm = K5Realm(kdc_conf=conf)
+realm.run([kadminl, 'modprinc', '+requires_preauth', realm.user_princ])
+flags = ['-T', realm.ccache]
+server_addr = '127.0.0.1:' + str(realm.portbase + 9)
+
+## Test UDP fail / custom username
+daemon = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept', queue))
+daemon.start()
+queue.get()
+realm.run([kadminl, 'setstr', realm.user_princ, 'otp',
+ otpconfig('udp', 'custom')])
+realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1)
+verify(daemon, queue, False, 'custom', 'reject')
+
+## Test UDP success / standard username
+daemon = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept', queue))
+daemon.start()
+queue.get()
+realm.run([kadminl, 'setstr', realm.user_princ, 'otp', otpconfig('udp')])
+realm.kinit(realm.user_princ, 'accept', flags=flags)
+verify(daemon, queue, True, realm.user_princ.split('@')[0], 'accept')
+realm.extract_keytab(realm.krbtgt_princ, realm.keytab)
+out = realm.run(['./adata', realm.krbtgt_princ])
+if '+97: [indotp1, indotp2]' not in out:
+ fail('auth indicators not seen in OTP ticket')
+
+# Repeat with an indicators override in the string attribute.
+daemon = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept', queue))
+daemon.start()
+queue.get()
+oconf = otpconfig('udp', indicators=['indtok1', 'indtok2'])
+realm.run([kadminl, 'setstr', realm.user_princ, 'otp', oconf])
+realm.kinit(realm.user_princ, 'accept', flags=flags)
+verify(daemon, queue, True, realm.user_princ.split('@')[0], 'accept')
+realm.extract_keytab(realm.krbtgt_princ, realm.keytab)
+out = realm.run(['./adata', realm.krbtgt_princ])
+if '+97: [indtok1, indtok2]' not in out:
+ fail('auth indicators not seen in OTP ticket')
+
+# Detect upstream pyrad bug
+# https://github.com/wichert/pyrad/pull/18
+try:
+ auth = packet.Packet.CreateAuthenticator()
+ packet.Packet(authenticator=auth, secret="").ReplyPacket()
+except AssertionError:
+ skip_rest('OTP UNIX domain socket tests', 'pyrad assertion bug detected')
+
+## Test Unix fail / custom username
+daemon = UnixRadiusDaemon(args=(socket_file, '', 'accept', queue))
+daemon.start()
+queue.get()
+realm.run([kadminl, 'setstr', realm.user_princ, 'otp',
+ otpconfig('unix', 'custom')])
+realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1)
+verify(daemon, queue, False, 'custom', 'reject')
+
+## Test Unix success / standard username
+daemon = UnixRadiusDaemon(args=(socket_file, '', 'accept', queue))
+daemon.start()
+queue.get()
+realm.run([kadminl, 'setstr', realm.user_princ, 'otp', otpconfig('unix')])
+realm.kinit(realm.user_princ, 'accept', flags=flags)
+verify(daemon, queue, True, realm.user_princ, 'accept')
+
+success('OTP tests')