diff options
Diffstat (limited to 'third_party/Python/module/pexpect-2.4/pxssh.py')
| -rw-r--r-- | third_party/Python/module/pexpect-2.4/pxssh.py | 311 | 
1 files changed, 311 insertions, 0 deletions
diff --git a/third_party/Python/module/pexpect-2.4/pxssh.py b/third_party/Python/module/pexpect-2.4/pxssh.py new file mode 100644 index 000000000000..04ba25cbff5b --- /dev/null +++ b/third_party/Python/module/pexpect-2.4/pxssh.py @@ -0,0 +1,311 @@ +"""This class extends pexpect.spawn to specialize setting up SSH connections. +This adds methods for login, logout, and expecting the shell prompt. + +$Id: pxssh.py 513 2008-02-09 18:26:13Z noah $ +""" + +from pexpect import * +import pexpect +import time + +__all__ = ['ExceptionPxssh', 'pxssh'] + +# Exception classes used by this module. +class ExceptionPxssh(ExceptionPexpect): +    """Raised for pxssh exceptions. +    """ + +class pxssh (spawn): + +    """This class extends pexpect.spawn to specialize setting up SSH +    connections. This adds methods for login, logout, and expecting the shell +    prompt. It does various tricky things to handle many situations in the SSH +    login process. For example, if the session is your first login, then pxssh +    automatically accepts the remote certificate; or if you have public key +    authentication setup then pxssh won't wait for the password prompt. + +    pxssh uses the shell prompt to synchronize output from the remote host. In +    order to make this more robust it sets the shell prompt to something more +    unique than just $ or #. This should work on most Borne/Bash or Csh style +    shells. + +    Example that runs a few commands on a remote server and prints the result:: +         +        import pxssh +        import getpass +        try:                                                             +            s = pxssh.pxssh() +            hostname = raw_input('hostname: ') +            username = raw_input('username: ') +            password = getpass.getpass('password: ') +            s.login (hostname, username, password) +            s.sendline ('uptime')  # run a command +            s.prompt()             # match the prompt +            print s.before         # print everything before the prompt. +            s.sendline ('ls -l') +            s.prompt() +            print s.before +            s.sendline ('df') +            s.prompt() +            print s.before +            s.logout() +        except pxssh.ExceptionPxssh, e: +            print "pxssh failed on login." +            print str(e) + +    Note that if you have ssh-agent running while doing development with pxssh +    then this can lead to a lot of confusion. Many X display managers (xdm, +    gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI +    dialog box popup asking for a password during development. You should turn +    off any key agents during testing. The 'force_password' attribute will turn +    off public key authentication. This will only work if the remote SSH server +    is configured to allow password logins. Example of using 'force_password' +    attribute:: + +            s = pxssh.pxssh() +            s.force_password = True +            hostname = raw_input('hostname: ') +            username = raw_input('username: ') +            password = getpass.getpass('password: ') +            s.login (hostname, username, password) +    """ + +    def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None): +        spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env) + +        self.name = '<pxssh>' +         +        #SUBTLE HACK ALERT! Note that the command to set the prompt uses a +        #slightly different string than the regular expression to match it. This +        #is because when you set the prompt the command will echo back, but we +        #don't want to match the echoed command. So if we make the set command +        #slightly different than the regex we eliminate the problem. To make the +        #set command different we add a backslash in front of $. The $ doesn't +        #need to be escaped, but it doesn't hurt and serves to make the set +        #prompt command different than the regex. + +        # used to match the command-line prompt +        self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] " +        self.PROMPT = self.UNIQUE_PROMPT + +        # used to set shell command-line prompt to UNIQUE_PROMPT. +        self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '" +        self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '" +        self.SSH_OPTS = "-o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" +        # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from +        # displaying a GUI password dialog. I have not figured out how to +        # disable only SSH_ASKPASS without also disabling X11 forwarding. +        # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying! +        #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" +        self.force_password = False +        self.auto_prompt_reset = True  + +    def levenshtein_distance(self, a,b): + +        """This calculates the Levenshtein distance between a and b. +        """ + +        n, m = len(a), len(b) +        if n > m: +            a,b = b,a +            n,m = m,n +        current = range(n+1) +        for i in range(1,m+1): +            previous, current = current, [i]+[0]*n +            for j in range(1,n+1): +                add, delete = previous[j]+1, current[j-1]+1 +                change = previous[j-1] +                if a[j-1] != b[i-1]: +                    change = change + 1 +                current[j] = min(add, delete, change) +        return current[n] + +    def sync_original_prompt (self): + +        """This attempts to find the prompt. Basically, press enter and record +        the response; press enter again and record the response; if the two +        responses are similar then assume we are at the original prompt. This +        is a slow function. It can take over 10 seconds. """ + +        # All of these timing pace values are magic. +        # I came up with these based on what seemed reliable for +        # connecting to a heavily loaded machine I have. +        # If latency is worse than these values then this will fail. + +        try: +            self.read_nonblocking(size=10000,timeout=1) # GAS: Clear out the cache before getting the prompt +        except TIMEOUT: +            pass +        time.sleep(0.1) +        self.sendline() +        time.sleep(0.5) +        x = self.read_nonblocking(size=1000,timeout=1) +        time.sleep(0.1) +        self.sendline() +        time.sleep(0.5) +        a = self.read_nonblocking(size=1000,timeout=1) +        time.sleep(0.1) +        self.sendline() +        time.sleep(0.5) +        b = self.read_nonblocking(size=1000,timeout=1) +        ld = self.levenshtein_distance(a,b) +        len_a = len(a) +        if len_a == 0: +            return False +        if float(ld)/len_a < 0.4: +            return True +        return False + +    ### TODO: This is getting messy and I'm pretty sure this isn't perfect. +    ### TODO: I need to draw a flow chart for this. +    def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True): + +        """This logs the user into the given server. It uses the +        'original_prompt' to try to find the prompt right after login. When it +        finds the prompt it immediately tries to reset the prompt to something +        more easily matched. The default 'original_prompt' is very optimistic +        and is easily fooled. It's more reliable to try to match the original +        prompt as exactly as possible to prevent false matches by server +        strings such as the "Message Of The Day". On many systems you can +        disable the MOTD on the remote server by creating a zero-length file +        called "~/.hushlogin" on the remote server. If a prompt cannot be found +        then this will not necessarily cause the login to fail. In the case of +        a timeout when looking for the prompt we assume that the original +        prompt was so weird that we could not match it, so we use a few tricks +        to guess when we have reached the prompt. Then we hope for the best and +        blindly try to reset the prompt to something more unique. If that fails +        then login() raises an ExceptionPxssh exception. +         +        In some situations it is not possible or desirable to reset the +        original prompt. In this case, set 'auto_prompt_reset' to False to +        inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh +        uses a unique prompt in the prompt() method. If the original prompt is +        not reset then this will disable the prompt() method unless you +        manually set the PROMPT attribute. """ + +        ssh_options = '-q' +        if self.force_password: +            ssh_options = ssh_options + ' ' + self.SSH_OPTS +        if port is not None: +            ssh_options = ssh_options + ' -p %s'%(str(port)) +        cmd = "ssh %s -l %s %s" % (ssh_options, username, server) + +        # This does not distinguish between a remote server 'password' prompt +        # and a local ssh 'passphrase' prompt (for unlocking a private key). +        spawn._spawn(self, cmd) +        i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout) + +        # First phase +        if i==0:  +            # New certificate -- always accept it. +            # This is what you get if SSH does not have the remote host's +            # public key stored in the 'known_hosts' cache. +            self.sendline("yes") +            i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) +        if i==2: # password or passphrase +            self.sendline(password) +            i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) +        if i==4: +            self.sendline(terminal_type) +            i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) + +        # Second phase +        if i==0: +            # This is weird. This should not happen twice in a row. +            self.close() +            raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.') +        elif i==1: # can occur if you have a public key pair set to authenticate.  +            ### TODO: May NOT be OK if expect() got tricked and matched a false prompt. +            pass +        elif i==2: # password prompt again +            # For incorrect passwords, some ssh servers will +            # ask for the password again, others return 'denied' right away. +            # If we get the password prompt again then this means +            # we didn't get the password right the first time.  +            self.close() +            raise ExceptionPxssh ('password refused') +        elif i==3: # permission denied -- password was bad. +            self.close() +            raise ExceptionPxssh ('permission denied') +        elif i==4: # terminal type again? WTF? +            self.close() +            raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.') +        elif i==5: # Timeout +            #This is tricky... I presume that we are at the command-line prompt. +            #It may be that the shell prompt was so weird that we couldn't match +            #it. Or it may be that we couldn't log in for some other reason. I +            #can't be sure, but it's safe to guess that we did login because if +            #I presume wrong and we are not logged in then this should be caught +            #later when I try to set the shell prompt. +            pass +        elif i==6: # Connection closed by remote host +            self.close() +            raise ExceptionPxssh ('connection closed') +        else: # Unexpected  +            self.close() +            raise ExceptionPxssh ('unexpected login response') +        if not self.sync_original_prompt(): +            self.close() +            raise ExceptionPxssh ('could not synchronize with original prompt') +        # We appear to be in. +        # set shell prompt to something unique. +        if auto_prompt_reset: +            if not self.set_unique_prompt(): +                self.close() +                raise ExceptionPxssh ('could not set shell prompt\n'+self.before) +        return True + +    def logout (self): + +        """This sends exit to the remote shell. If there are stopped jobs then +        this automatically sends exit twice. """ + +        self.sendline("exit") +        index = self.expect([EOF, "(?i)there are stopped jobs"]) +        if index==1: +            self.sendline("exit") +            self.expect(EOF) +        self.close() + +    def prompt (self, timeout=20): + +        """This matches the shell prompt. This is little more than a short-cut +        to the expect() method. This returns True if the shell prompt was +        matched. This returns False if there was a timeout. Note that if you +        called login() with auto_prompt_reset set to False then you should have +        manually set the PROMPT attribute to a regex pattern for matching the +        prompt. """ + +        i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout) +        if i==1: +            return False +        return True +         +    def set_unique_prompt (self): + +        """This sets the remote prompt to something more unique than # or $. +        This makes it easier for the prompt() method to match the shell prompt +        unambiguously. This method is called automatically by the login() +        method, but you may want to call it manually if you somehow reset the +        shell prompt. For example, if you 'su' to a different user then you +        will need to manually reset the prompt. This sends shell commands to +        the remote host to set the prompt, so this assumes the remote host is +        ready to receive commands. + +        Alternatively, you may use your own prompt pattern. Just set the PROMPT +        attribute to a regular expression that matches it. In this case you +        should call login() with auto_prompt_reset=False; then set the PROMPT +        attribute. After that the prompt() method will try to match your prompt +        pattern.""" + +        self.sendline ("unset PROMPT_COMMAND") +        self.sendline (self.PROMPT_SET_SH) # sh-style +        i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) +        if i == 0: # csh-style +            self.sendline (self.PROMPT_SET_CSH) +            i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) +            if i == 0: +                return False +        return True + +# vi:ts=4:sw=4:expandtab:ft=python:  | 
