Loading install +38 −8 Original line number Diff line number Diff line Loading @@ -21,7 +21,8 @@ from __future__ import division, absolute_import from __future__ import print_function, unicode_literals from binascii import hexlify import sys import hashlib from paramiko import MissingHostKeyPolicy, SSHException Loading @@ -31,16 +32,31 @@ from fabric import Connection from fabric_tpa import host from fabric_tpa import VerboseProgram from binascii import hexlify as stdlib_hexlify if sys.version_info >= (3, 8): hexlify = stdlib_hexlify else: def hexlify(data, sep, bytes_per_sep=1): """ replacement for python 3.8's hexlify, which now nicely takes a separator data and sep are bytes, and it returns bytes """ s = stdlib_hexlify(data) return sep.join(s[i:i+bytes_per_sep] for i in range(0, len(s), bytes_per_sep)) class MatchingHostKeyPolicy(MissingHostKeyPolicy): fingerprints_md5_colons = [] def missing_host_key(self, client, hostname, key): hash = hexlify(hashlib.md5(key.asbytes()), ':', 2) hash = hexlify(hashlib.md5(key.asbytes()).digest(), b':', 2).decode('ascii') if hash not in self.fingerprints_md5_colons: raise SSHException( "Server {!r} not in trusted fingerprints: {}".format( hostname, self.fingerprints_md5_colons "Server {!r} key {} not in trusted fingerprints: {!r}".format( hostname, hash, self.fingerprints_md5_colons ) ) Loading @@ -51,21 +67,35 @@ class CustomFingerprintProgram(VerboseProgram): extra_args = [ Argument( names=('fingerprint',), kind=list, default=False, help="expected server fingerprint" ), ] return core_args + extra_args def parse_core(self, argv): super().parse_core(argv) MatchingHostKeyPolicy.fingerprints_md5_colons = self.args.fingerprint.value Connection.default_host_key_policy = MatchingHostKeyPolicy def main(): col = Collection() col.add_task(host.install_hetzner_robot, name='hetzner-robot') program = VerboseProgram(namespace=col) MatchingHostKeyPolicy.fingerprints_md5_colons = program.args.fingerprint.split(' ') Connection.default_host_key_policy = MatchingHostKeyPolicy col.add_task(host.fetch_ssh_host_pubkey) program = CustomFingerprintProgram(namespace=col) program.run() if __name__ == '__main__': try: main() except Exception as e: import traceback import pdb import sys traceback.print_exc() pdb.post_mortem() sys.exit(1) raise e Loading
install +38 −8 Original line number Diff line number Diff line Loading @@ -21,7 +21,8 @@ from __future__ import division, absolute_import from __future__ import print_function, unicode_literals from binascii import hexlify import sys import hashlib from paramiko import MissingHostKeyPolicy, SSHException Loading @@ -31,16 +32,31 @@ from fabric import Connection from fabric_tpa import host from fabric_tpa import VerboseProgram from binascii import hexlify as stdlib_hexlify if sys.version_info >= (3, 8): hexlify = stdlib_hexlify else: def hexlify(data, sep, bytes_per_sep=1): """ replacement for python 3.8's hexlify, which now nicely takes a separator data and sep are bytes, and it returns bytes """ s = stdlib_hexlify(data) return sep.join(s[i:i+bytes_per_sep] for i in range(0, len(s), bytes_per_sep)) class MatchingHostKeyPolicy(MissingHostKeyPolicy): fingerprints_md5_colons = [] def missing_host_key(self, client, hostname, key): hash = hexlify(hashlib.md5(key.asbytes()), ':', 2) hash = hexlify(hashlib.md5(key.asbytes()).digest(), b':', 2).decode('ascii') if hash not in self.fingerprints_md5_colons: raise SSHException( "Server {!r} not in trusted fingerprints: {}".format( hostname, self.fingerprints_md5_colons "Server {!r} key {} not in trusted fingerprints: {!r}".format( hostname, hash, self.fingerprints_md5_colons ) ) Loading @@ -51,21 +67,35 @@ class CustomFingerprintProgram(VerboseProgram): extra_args = [ Argument( names=('fingerprint',), kind=list, default=False, help="expected server fingerprint" ), ] return core_args + extra_args def parse_core(self, argv): super().parse_core(argv) MatchingHostKeyPolicy.fingerprints_md5_colons = self.args.fingerprint.value Connection.default_host_key_policy = MatchingHostKeyPolicy def main(): col = Collection() col.add_task(host.install_hetzner_robot, name='hetzner-robot') program = VerboseProgram(namespace=col) MatchingHostKeyPolicy.fingerprints_md5_colons = program.args.fingerprint.split(' ') Connection.default_host_key_policy = MatchingHostKeyPolicy col.add_task(host.fetch_ssh_host_pubkey) program = CustomFingerprintProgram(namespace=col) program.run() if __name__ == '__main__': try: main() except Exception as e: import traceback import pdb import sys traceback.print_exc() pdb.post_mortem() sys.exit(1) raise e