diff --git a/osc/conf.py b/osc/conf.py index f77d183d..31c5c9ec 100644 --- a/osc/conf.py +++ b/osc/conf.py @@ -303,10 +303,20 @@ class HostOptions(OscOptions): """ A pointer to public SSH key that corresponds with a private SSH used for authentication: + - keep empty for auto detection - path to the public SSH key - public SSH key filename (must be placed in ~/.ssh) + - fingerprint of a SSH key (2nd column of ``ssh-add -l``) NOTE: The private key may not be available on disk because it could be in a GPG keyring, on YubiKey or forwarded through SSH agent. + + TIP: To give osc a hint which ssh key from the agent to use during auto detection, + append ``obs=`` to the **private** key's comment. + This will also work nicely during SSH agent forwarding, because the comments get forwarded too. + + - To edit the key, run: ``ssh-keygen -c -f ~/.ssh/`` + - To query the key, run: ``ssh-keygen -y -f ~/.ssh/`` + - Example comment: `` obs=api.example.com obs=api-test.example.com`` """ ), ) # type: ignore[assignment] diff --git a/osc/connection.py b/osc/connection.py index b4f3496f..7a8b37de 100644 --- a/osc/connection.py +++ b/osc/connection.py @@ -566,7 +566,21 @@ class SignatureAuthHandler(AuthHandlerBase): def __init__(self, apiurl, user, sshkey, basic_auth_password=None): super().__init__(apiurl) self.user = user - self.sshkey = sshkey + self.sshkey = None + self.sshkey_fingerprint = None + + if sshkey and re.match("^[A-Z0-9]+:.*", sshkey): + # if it starts with a prefix such as 'SHA256:' then it's a fingerprint + self.sshkey_fingerprint = sshkey + else: + self.sshkey = sshkey + + if self.sshkey: + # if only a file name is provided, prepend ~/.ssh + if "/" not in self.sshkey: + self.sshkey = os.path.join("~", ".ssh", self.sshkey) + self.sshkey = os.path.expanduser(self.sshkey) + output.print_msg(f"Using ssh key file configured in oscrc: {self.sshkey}", print_to="debug") self.ssh_keygen_path = shutil.which("ssh-keygen") self.ssh_add_path = shutil.which("ssh-add") @@ -584,53 +598,79 @@ class SignatureAuthHandler(AuthHandlerBase): if not self.ssh_add_path: return [] cmd = [self.ssh_add_path, '-L'] - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") stdout, _ = proc.communicate() if proc.returncode == 0 and stdout.strip(): return stdout.strip().splitlines() else: return [] - @staticmethod - def get_fingerprint(line): - parts = line.strip().split(b" ") - if len(parts) < 2: - raise ValueError(f"Unable to retrieve ssh key fingerprint from line: {line}") - return parts[1] + def list_ssh_agent_fingerprints(self): + if not self.ssh_add_path: + return [] + cmd = [self.ssh_add_path, '-l'] + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") + stdout, _ = proc.communicate() + if proc.returncode == 0 and stdout.strip(): + lines = stdout.strip().splitlines() + return [i.split(" ")[1] for i in lines] + else: + return [] def guess_keyfile(self): # `ssh-keygen -Y sign` requires a file with a key which is not available during ssh agent forwarding # that's why we need to list ssh-agent's keys and store the first one into a temp file keys_in_agent = self.list_ssh_agent_keys() if keys_in_agent: - self.temp_pubkey = tempfile.NamedTemporaryFile() + selected_key = None + + # use ssh key from ssh agent by the specified fingerprint + if self.sshkey_fingerprint: + fingerprints_in_agent = self.list_ssh_agent_fingerprints() + try: + indx = fingerprints_in_agent.index(self.sshkey_fingerprint) + selected_key = keys_in_agent[indx] + output.print_msg(f"Using ssh key from ssh agent that matches fingerprint '{self.sshkey_fingerprint}': {selected_key}", print_to="debug") + except ValueError: + pass + + # use ssh key from ssh agent by key's comment obs= matching the hostname of apiurl + if selected_key is None: + apiurl_hostname = urllib.parse.urlparse(self.apiurl).hostname + for key in keys_in_agent: + comments = key.strip().split(" ")[2:] + pattern = f"obs={apiurl_hostname}" + if pattern in comments: + selected_key = key + output.print_msg(f"Using ssh key from ssh agent that has comment '{pattern}' which matches apiurl '{self.apiurl}': {selected_key}", print_to="debug") + break + + # use the first ssh key from ssh agent + if selected_key is None: + selected_key = keys_in_agent[0] + output.print_msg(f"Using the first ssh key from ssh agent (see `ssh-add -L`): {selected_key}", print_to="debug") + + self.temp_pubkey = tempfile.NamedTemporaryFile(mode="w+") self.temp_pubkey.write(keys_in_agent[0]) self.temp_pubkey.flush() return self.temp_pubkey.name sshdir = os.path.expanduser('~/.ssh') keyfiles = ('id_ed25519', 'id_ed25519_sk', 'id_rsa', 'id_ecdsa', 'id_ecdsa_sk', 'id_dsa') + output.print_msg(f"Searching ssh keys in '{sshdir}' in the following order: {', '.join(keyfiles)}", print_to="debug") for keyfile in keyfiles: keyfile_path = os.path.join(sshdir, keyfile) if os.path.isfile(keyfile_path): + output.print_msg(f"Using ssh key from file: {keyfile_path}", print_to="debug") return keyfile_path return None def ssh_sign(self, data, namespace, keyfile=None): - try: - data = bytes(data, 'utf-8') - except: - pass if not keyfile: keyfile = self.guess_keyfile() - else: - if '/' not in keyfile: - keyfile = f"~/.ssh/{keyfile}" - keyfile = os.path.expanduser(keyfile) - cmd = [self.ssh_keygen_path, '-Y', 'sign', '-f', keyfile, '-n', namespace, '-q'] - proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE) - stdout, _ = proc.communicate(data) + proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding="utf-8") + signature, _ = proc.communicate(data) if self.temp_pubkey: self.temp_pubkey.close() @@ -639,7 +679,6 @@ class SignatureAuthHandler(AuthHandlerBase): if proc.returncode: raise oscerr.OscIOError(None, 'ssh-keygen signature creation failed: %d' % proc.returncode) - signature = decode_it(stdout) match = re.match(r"\A-----BEGIN SSH SIGNATURE-----\n(.*)\n-----END SSH SIGNATURE-----", signature, re.S) if not match: raise oscerr.OscIOError(None, 'could not extract ssh signature')