Commit f4be34f7 authored by Nick Mathewson's avatar Nick Mathewson 🦀
Browse files

Make the python test scripts work on python3

The python scripts invoked by 'make check' didn't work on python3
before.  That was a problem on systems where 'python' is python3.

Fixes bug 11608; bugfix on 0.2.5.2-alpha.
parent 78b431d3
Loading
Loading
Loading
Loading

changes/bug11608

0 → 100644
+5 −0
Original line number Diff line number Diff line
  o Minor bugfixes (testing):
    - The Python parts of the test scripts now work on Python 3 as well
      as Python 2, so systems where '/usr/bin/python' is Python 3 will
      no longer have the tests break. Fixes bug 11608; bugfix on
      0.2.5.2-alpha.
+2 −2
Original line number Diff line number Diff line
@@ -35,8 +35,8 @@ LINES = sys.stdin.readlines()

for I in range(len(LINES)):
    if matches(LINES[I:], FUNCNAMES):
        print "OK"
        print("OK")
        break
else:
    print "BAD"
    print("BAD")
+28 −21
Original line number Diff line number Diff line
@@ -39,13 +39,14 @@ except ImportError:
import hashlib
import hmac
import subprocess
import sys

# **********************************************************************
# Helpers and constants

def HMAC(key,msg):
    "Return the HMAC-SHA256 of 'msg' using the key 'key'."
    H = hmac.new(key, "", hashlib.sha256)
    H = hmac.new(key, b"", hashlib.sha256)
    H.update(msg)
    return H.digest()

@@ -67,10 +68,10 @@ G_LENGTH = 32
H_LENGTH = 32

PROTOID = b"ntor-curve25519-sha256-1"
M_EXPAND = PROTOID + ":key_expand"
T_MAC    = PROTOID + ":mac"
T_KEY    = PROTOID + ":key_extract"
T_VERIFY = PROTOID + ":verify"
M_EXPAND = PROTOID + b":key_expand"
T_MAC    = PROTOID + b":mac"
T_KEY    = PROTOID + b":key_extract"
T_VERIFY = PROTOID + b":verify"

def H_mac(msg): return H(msg, tweak=T_MAC)
def H_verify(msg): return H(msg, tweak=T_VERIFY)
@@ -91,6 +92,13 @@ class PrivateKey(curve25519mod.Private):

# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

if sys.version < '3':
   def int2byte(i):
      return chr(i)
else:
   def int2byte(i):
      return bytes([i])

def  kdf_rfc5869(key, salt, info, n):

    prk = HMAC(key=salt, msg=key)
@@ -99,7 +107,7 @@ def kdf_rfc5869(key, salt, info, n):
    last = b""
    i = 1
    while len(out) < n:
        m = last + info + chr(i)
        m = last + info + int2byte(i)
        last = h = HMAC(key=prk, msg=m)
        out += h
        i = i + 1
@@ -208,7 +216,7 @@ def server(seckey_b, my_node_id, message, keyBytes=72):
                  pubkey_Y.serialize() +
                  pubkey_X.serialize() +
                  PROTOID +
                  "Server")
                  b"Server")

    msg = pubkey_Y.serialize() + H_mac(auth_input)

@@ -270,7 +278,7 @@ def client_part2(seckey_x, msg, node_id, pubkey_B, keyBytes=72):
                  pubkey_B.serialize() +
                  pubkey_Y.serialize() +
                  pubkey_X.serialize() + PROTOID +
                  "Server")
                  b"Server")

    my_auth = H_mac(auth_input)

@@ -284,7 +292,7 @@ def client_part2(seckey_x, msg, node_id, pubkey_B, keyBytes=72):

# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

def demo(node_id="iToldYouAboutStairs.", server_key=PrivateKey()):
def demo(node_id=b"iToldYouAboutStairs.", server_key=PrivateKey()):
    """
       Try to handshake with ourself.
    """
@@ -294,7 +302,7 @@ def demo(node_id="iToldYouAboutStairs.", server_key=PrivateKey()):
    assert len(skeys) == 72
    assert len(ckeys) == 72
    assert skeys == ckeys
    print "OK"
    print("OK")

# ======================================================================
def timing():
@@ -304,7 +312,7 @@ def timing():
    import timeit
    t = timeit.Timer(stmt="ntor_ref.demo(N,SK)",
       setup="import ntor_ref,curve25519;N='ABCD'*5;SK=ntor_ref.PrivateKey()")
    print t.timeit(number=1000)
    print(t.timeit(number=1000))

# ======================================================================

@@ -315,7 +323,7 @@ def kdf_vectors():
    import binascii
    def kdf_vec(inp):
        k = kdf(inp, T_KEY, M_EXPAND, 100)
        print repr(inp), "\n\""+ binascii.b2a_hex(k)+ "\""
        print(repr(inp), "\n\""+ binascii.b2a_hex(k)+ "\"")
    kdf_vec("")
    kdf_vec("Tor")
    kdf_vec("AN ALARMING ITEM TO FIND ON YOUR CREDIT-RATING STATEMENT")
@@ -328,13 +336,13 @@ def test_tor():
       Call the test-ntor-cl command-line program to make sure we can
       interoperate with Tor's ntor program
    """
    enhex=binascii.b2a_hex
    enhex=lambda s: binascii.b2a_hex(s)
    dehex=lambda s: binascii.a2b_hex(s.strip())

    PROG = "./src/test/test-ntor-cl"
    PROG = b"./src/test/test-ntor-cl"
    def tor_client1(node_id, pubkey_B):
        " returns (msg, state) "
        p = subprocess.Popen([PROG, "client1", enhex(node_id),
        p = subprocess.Popen([PROG, b"client1", enhex(node_id),
                              enhex(pubkey_B.serialize())],
                             stdout=subprocess.PIPE)
        return map(dehex, p.stdout.readlines())
@@ -352,7 +360,7 @@ def test_tor():
        return map(dehex, p.stdout.readlines())


    node_id = "thisisatornodeid$#%^"
    node_id = b"thisisatornodeid$#%^"
    seckey_b = PrivateKey()
    pubkey_B = seckey_b.get_public()

@@ -377,14 +385,13 @@ def test_tor():
    assert c_keys == s_keys
    assert len(c_keys) == 90

    print "OK"
    print("OK")

# ======================================================================

if __name__ == '__main__':
    import sys
    if len(sys.argv) < 2:
        print __doc__
        print(__doc__)
    elif sys.argv[1] == 'gen_kdf_vectors':
        kdf_vectors()
    elif sys.argv[1] == 'timing':
@@ -395,4 +402,4 @@ if __name__ == '__main__':
        test_tor()

    else:
        print __doc__
        print(__doc__)
+25 −6
Original line number Diff line number Diff line
@@ -8,12 +8,14 @@

__all__ = ['smult_curve25519_base', 'smult_curve25519']

import sys

P = 2 ** 255 - 19
A = 486662

def expmod(b, e, m):
  if e == 0: return 1
  t = expmod(b, e / 2, m) ** 2 % m
  t = expmod(b, e // 2, m) ** 2 % m
  if e & 1: t = (t * b) % m
  return t

@@ -23,12 +25,14 @@ def inv(x):
# Addition and doubling formulas taken from Appendix D of "Curve25519:
# new Diffie-Hellman speed records".

def add((xn,zn), (xm,zm), (xd,zd)):
def add(n,m,d):
  (xn,zn), (xm,zm), (xd,zd) = n, m, d
  x = 4 * (xm * xn - zm * zn) ** 2 * zd
  z = 4 * (xm * zn - zm * xn) ** 2 * xd
  return (x % P, z % P)

def double((xn,zn)):
def double(n):
  (xn,zn) = n
  x = (xn ** 2 - zn ** 2) ** 2
  z = 4 * xn * zn * (xn ** 2 + A * xn * zn + zn ** 2)
  return (x % P, z % P)
@@ -40,19 +44,34 @@ def curve25519(n, base):
  # (m+1)th multiple of base.
  def f(m):
    if m == 1: return (one, two)
    (pm, pm1) = f(m / 2)
    (pm, pm1) = f(m // 2)
    if (m & 1):
      return (add(pm, pm1, one), double(pm1))
    return (double(pm), add(pm, pm1, one))
  ((x,z), _) = f(n)
  return (x * inv(z)) % P

if sys.version < '3':
    def b2i(c):
        return ord(c)
    def i2b(i):
        return chr(i)
    def ba2bs(ba):
        return "".join(ba)
else:
    def b2i(c):
        return c
    def i2b(i):
        return i
    def ba2bs(ba):
        return bytes(ba)

def unpack(s):
  if len(s) != 32: raise ValueError('Invalid Curve25519 argument')
  return sum(ord(s[i]) << (8 * i) for i in range(32))
  return sum(b2i(s[i]) << (8 * i) for i in range(32))

def pack(n):
  return ''.join([chr((n >> (8 * i)) & 255) for i in range(32)])
  return ba2bs([i2b((n >> (8 * i)) & 255) for i in range(32)])

def clamp(n):
  n &= ~7
+71 −52
Original line number Diff line number Diff line
@@ -27,6 +27,21 @@ class UnexpectedSuccess(Exception):
class UnexpectedFailure(Exception):
    pass

if sys.version < '3':
    def b2s(b):
       return b
    def s2b(s):
       return s
    def NamedTemporaryFile():
       return tempfile.NamedTemporaryFile(delete=False)
else:
    def b2s(b):
       return str(b, 'ascii')
    def s2b(s):
       return s.encode('ascii')
    def NamedTemporaryFile():
       return tempfile.NamedTemporaryFile(mode="w",delete=False,encoding="ascii")

def contents(fn):
    f = open(fn)
    try:
@@ -42,10 +57,10 @@ def run_tor(args, failure=False):
        raise UnexpectedFailure()
    elif not result and failure:
        raise UnexpectedSuccess()
    return output
    return b2s(output)

def spaceify_fp(fp):
    for i in xrange(0, len(fp), 4):
    for i in range(0, len(fp), 4):
        yield fp[i:i+4]

def lines(s):
@@ -62,7 +77,7 @@ def strip_log_junk(line):

def randstring(entropy_bytes):
    s = os.urandom(entropy_bytes)
    return binascii.b2a_hex(s)
    return b2s(binascii.b2a_hex(s))

def findLineContaining(lines, s):
    for ln in lines:
@@ -74,59 +89,61 @@ class CmdlineTests(unittest.TestCase):

    def test_version(self):
        out = run_tor(["--version"])
        self.failUnless(out.startswith("Tor version "))
        self.assertEquals(len(lines(out)), 1)
        self.assertTrue(out.startswith("Tor version "))
        self.assertEqual(len(lines(out)), 1)

    def test_quiet(self):
        out = run_tor(["--quiet", "--quumblebluffin", "1"], failure=True)
        self.assertEquals(out, "")
        self.assertEqual(out, "")

    def test_help(self):
        out = run_tor(["--help"], failure=False)
        out2 = run_tor(["-h"], failure=False)
        self.assert_(out.startswith("Copyright (c) 2001"))
        self.assert_(out.endswith(
        self.assertTrue(out.startswith("Copyright (c) 2001"))
        self.assertTrue(out.endswith(
            "tor -f <torrc> [args]\n"
            "See man page for options, or https://www.torproject.org/ for documentation.\n"))
        self.assert_(out == out2)
        self.assertTrue(out == out2)

    def test_hush(self):
        torrc = tempfile.NamedTemporaryFile(delete=False)
        torrc = NamedTemporaryFile()
        torrc.close()
        try:
            out = run_tor(["--hush", "-f", torrc.name,
                           "--quumblebluffin", "1"], failure=True)
        finally:
            os.unlink(torrc.name)
        self.assertEquals(len(lines(out)), 2)
        self.assertEqual(len(lines(out)), 2)
        ln = [ strip_log_junk(l) for l in lines(out) ]
        self.assertEquals(ln[0], "Failed to parse/validate config: Unknown option 'quumblebluffin'.  Failing.")
        self.assertEquals(ln[1], "Reading config failed--see warnings above.")
        self.assertEqual(ln[0], "Failed to parse/validate config: Unknown option 'quumblebluffin'.  Failing.")
        self.assertEqual(ln[1], "Reading config failed--see warnings above.")

    def test_missing_argument(self):
        out = run_tor(["--hush", "--hash-password"], failure=True)
        self.assertEquals(len(lines(out)), 2)
        self.assertEqual(len(lines(out)), 2)
        ln = [ strip_log_junk(l) for l in lines(out) ]
        self.assertEquals(ln[0], "Command-line option '--hash-password' with no value. Failing.")
        self.assertEqual(ln[0], "Command-line option '--hash-password' with no value. Failing.")

    def test_hash_password(self):
        out = run_tor(["--hash-password", "woodwose"])
        result = lines(out)[-1]
        self.assertEquals(result[:3], "16:")
        self.assertEquals(len(result), 61)
        self.assertEqual(result[:3], "16:")
        self.assertEqual(len(result), 61)
        r = binascii.a2b_hex(result[3:])
        self.assertEquals(len(r), 29)
        self.assertEqual(len(r), 29)

        salt, how, hashed = r[:8], r[8], r[9:]
        self.assertEquals(len(hashed), 20)
        self.assertEqual(len(hashed), 20)
        if type(how) == type("A"):
          how = ord(how)

        count = (16 + (ord(how) & 15)) << ((ord(how) >> 4) + 6)
        stuff = salt + "woodwose"
        count = (16 + (how & 15)) << ((how >> 4) + 6)
        stuff = salt + s2b("woodwose")
        repetitions = count // len(stuff) + 1
        inp = stuff * repetitions
        inp = inp[:count]

        self.assertEquals(hashlib.sha1(inp).digest(), hashed)
        self.assertEqual(hashlib.sha1(inp).digest(), hashed)

    def test_digests(self):
        main_c = os.path.join(TOP_SRCDIR, "src", "or", "main.c")
@@ -136,12 +153,14 @@ class CmdlineTests(unittest.TestCase):
        out = run_tor(["--digests"])
        main_line = [ l for l in lines(out) if l.endswith("/main.c") ]
        digest, name = main_line[0].split()
        actual = hashlib.sha1(open(main_c).read()).hexdigest()
        self.assertEquals(digest, actual)
        f = open(main_c, 'rb')
        actual = hashlib.sha1(f.read()).hexdigest()
        f.close()
        self.assertEqual(digest, actual)

    def test_dump_options(self):
        default_torrc = tempfile.NamedTemporaryFile(delete=False)
        torrc = tempfile.NamedTemporaryFile(delete=False)
        default_torrc = NamedTemporaryFile()
        torrc = NamedTemporaryFile()
        torrc.write("SocksPort 9999")
        torrc.close()
        default_torrc.write("SafeLogging 0")
@@ -161,27 +180,27 @@ class CmdlineTests(unittest.TestCase):
            os.unlink(torrc.name)
            os.unlink(default_torrc.name)

        self.assertEquals(len(lines(out_sh)), 2)
        self.assert_(lines(out_sh)[0].startswith("DataDirectory "))
        self.assertEquals(lines(out_sh)[1:],
        self.assertEqual(len(lines(out_sh)), 2)
        self.assertTrue(lines(out_sh)[0].startswith("DataDirectory "))
        self.assertEqual(lines(out_sh)[1:],
            [ "SocksPort 9999" ])

        self.assertEquals(len(lines(out_nb)), 2)
        self.assertEquals(lines(out_nb),
        self.assertEqual(len(lines(out_nb)), 2)
        self.assertEqual(lines(out_nb),
            [ "SafeLogging 0",
              "SocksPort 9999" ])

        out_fl = lines(out_fl)
        self.assert_(len(out_fl) > 100)
        self.assert_("SocksPort 9999" in out_fl)
        self.assert_("SafeLogging 0" in out_fl)
        self.assert_("ClientOnly 0" in out_fl)
        self.assertTrue(len(out_fl) > 100)
        self.assertTrue("SocksPort 9999" in out_fl)
        self.assertTrue("SafeLogging 0" in out_fl)
        self.assertTrue("ClientOnly 0" in out_fl)

        self.assert_(out_verif.endswith("Configuration was valid\n"))
        self.assertTrue(out_verif.endswith("Configuration was valid\n"))

    def test_list_fingerprint(self):
        tmpdir = tempfile.mkdtemp(prefix='ttca_')
        torrc = tempfile.NamedTemporaryFile(delete=False)
        torrc = NamedTemporaryFile()
        torrc.write("ORPort 9999\n")
        torrc.write("DataDirectory %s\n"%tmpdir)
        torrc.write("Nickname tippi")
@@ -200,21 +219,21 @@ class CmdlineTests(unittest.TestCase):
        fp = fp.strip()
        nn_fp = fp.split()[0]
        space_fp = " ".join(spaceify_fp(fp.split()[1]))
        self.assertEquals(lastlog,
        self.assertEqual(lastlog,
              "Your Tor server's identity key fingerprint is '%s'"%fp)
        self.assertEquals(lastline, "tippi %s"%space_fp)
        self.assertEquals(nn_fp, "tippi")
        self.assertEqual(lastline, "tippi %s"%space_fp)
        self.assertEqual(nn_fp, "tippi")

    def test_list_options(self):
        out = lines(run_tor(["--list-torrc-options"]))
        self.assert_(len(out)>100)
        self.assert_(out[0] <= 'AccountingMax')
        self.assert_("UseBridges" in out)
        self.assert_("SocksPort" in out)
        self.assertTrue(len(out)>100)
        self.assertTrue(out[0] <= 'AccountingMax')
        self.assertTrue("UseBridges" in out)
        self.assertTrue("SocksPort" in out)

    def test_cmdline_args(self):
        default_torrc = tempfile.NamedTemporaryFile(delete=False)
        torrc = tempfile.NamedTemporaryFile(delete=False)
        default_torrc = NamedTemporaryFile()
        torrc = NamedTemporaryFile()
        torrc.write("SocksPort 9999\n")
        torrc.write("SocksPort 9998\n")
        torrc.write("ORPort 9000\n")
@@ -242,14 +261,14 @@ class CmdlineTests(unittest.TestCase):
        out_1 = [ l for l in lines(out_1) if not l.startswith("DataDir") ]
        out_2 = [ l for l in lines(out_2) if not l.startswith("DataDir") ]

        self.assertEquals(out_1,
        self.assertEqual(out_1,
                          ["ControlPort 9500",
                           "Nickname eleventeen",
                           "ORPort 9000",
                           "ORPort 9001",
                           "SocksPort 9999",
                           "SocksPort 9998"])
        self.assertEquals(out_2,
        self.assertEqual(out_2,
                          ["ExtORPort 9005",
                           "Nickname eleventeen",
                           "ORPort 9000",
@@ -261,13 +280,13 @@ class CmdlineTests(unittest.TestCase):
        fname = "nonexistent_file_"+randstring(8)
        out = run_tor(["-f", fname, "--verify-config"], failure=True)
        ln = [ strip_log_junk(l) for l in lines(out) ]
        self.assert_("Unable to open configuration file" in ln[-2])
        self.assert_("Reading config failed" in ln[-1])
        self.assertTrue("Unable to open configuration file" in ln[-2])
        self.assertTrue("Reading config failed" in ln[-1])

        out = run_tor(["-f", fname, "--verify-config", "--ignore-missing-torrc"])
        ln = [ strip_log_junk(l) for l in lines(out) ]
        self.assert_(findLineContaining(ln, ", using reasonable defaults"))
        self.assert_("Configuration was valid" in ln[-1])
        self.assertTrue(findLineContaining(ln, ", using reasonable defaults"))
        self.assertTrue("Configuration was valid" in ln[-1])

if __name__ == '__main__':
    unittest.main()