mirror of
https://github.com/openSUSE/osc.git
synced 2026-02-17 17:40:43 +01:00
feat(gitea): Add support for SSH agent authentication
This change introduces support for authenticating against the Gitea API using an SSH agent. This provides a more secure and convenient alternative to storing private keys or API tokens directly in the configuration file. Key changes include: - A new HttpSigner class that can sign HTTP requests using either a local private key or a key from an SSH agent. - The osc git-obs login add and login update commands now accept --ssh-agent and --ssh-key-agent-pub arguments to configure SSH agent-based authentication. - The paramiko library is now used for SSH agent communication. It is an optional dependency and only required when using the SSH agent feature. - The Gitea API connection logic is updated to use the HttpSigner when SSH authentication is configured. This improves user experience by allowing them to leverage their existing SSH agent setup for Gitea authentication, avoiding the need to handle sensitive credentials manually.
This commit is contained in:
@@ -36,6 +36,15 @@ Create a Gitea token
|
||||
- Once you hit the "Generate Token" button, the page reloads and the token appears in the blue rectangle on top.
|
||||
This is the only chance to copy it because it will never show up again.
|
||||
|
||||
Add your SSH key
|
||||
------------------
|
||||
|
||||
- Visit your Gitea user's "SSH / GPG Keys" settings page (Profile picture -> Settings -> "SSH / GPG Keys"):
|
||||
`https://src.opensuse.org/user/settings/keys <https://src.opensuse.org/user/settings/keys>`_
|
||||
- Add Key name and as Content the public SSH key content (e.g., from `~/.ssh/id_rsa.pub`)
|
||||
|
||||
.. note::
|
||||
For HTTP Signature authentication, ``git-obs`` only supports ``RSA`` and ``ed25519`` keys.
|
||||
|
||||
Add a login entry to the git-obs configuration file
|
||||
---------------------------------------------------
|
||||
@@ -47,6 +56,20 @@ Add a login entry to the git-obs configuration file
|
||||
- If the ``--token`` option in the command above is omitted,
|
||||
the command will prompt you to enter the token securely.
|
||||
|
||||
- Alternatively, you can configure a login to authenticate Gitea API requests
|
||||
by signing them with an SSH key. This method, known as HTTP Signature, is an
|
||||
alternative to using a Gitea token. Before using this method, ensure you have
|
||||
added your public SSH key to your Gitea account settings.
|
||||
|
||||
- Using an SSH key file::
|
||||
|
||||
git-obs login add opensuse --url https://src.opensuse.org --user USER --ssh-key /path/to/your/private_key --ssh-key-agent-pub 'SHA256:XXXXXXX'
|
||||
|
||||
- Using an SSH agent (for example with a hardware token)::
|
||||
|
||||
git-obs login add opensuse --url https://src.opensuse.org --user USER --ssh-agent --ssh-key-agent-pub 'SHA256:XXXXXXX'
|
||||
|
||||
The ``--ssh-key-agent-pub`` argument specifies the public key's signature.
|
||||
|
||||
Using the login entries
|
||||
-----------------------
|
||||
@@ -235,4 +258,4 @@ Known issues
|
||||
- Reviews by groups/teams are not handled well.
|
||||
If you approve, the team disappears and gets replaced with your login.
|
||||
Then is not possible to search for such the team reviews and for example monitor
|
||||
re-review requests during a team member's absence.
|
||||
re-review requests during a team member's absence.
|
||||
@@ -1,4 +1,5 @@
|
||||
import getpass
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
@@ -21,6 +22,8 @@ class LoginAddCommand(osc.commandline_git.GitObsCommand):
|
||||
self.parser.add_argument("--user", help="Gitea username", required=True)
|
||||
self.parser.add_argument("--token", help="Gitea access token; omit or set to '-' to invoke a secure interactive prompt")
|
||||
self.parser.add_argument("--ssh-key", metavar="PATH", help="Path to a private SSH key").completer = complete_ssh_key_path
|
||||
self.parser.add_argument("--ssh-agent", action="store_true", help="Use ssh-agent for authentication")
|
||||
self.parser.add_argument("--ssh-key-agent-pub", help="Public SSH key signature for ssh-agent authentication")
|
||||
self.parser.add_argument("--git-uses-http", action="store_true", help="Git uses http(s) instead of SSH", default=None)
|
||||
self.parser.add_argument("--quiet", action="store_true", help="Mute unnecessary output when using this login entry")
|
||||
self.parser.add_argument("--set-as-default", help="Set the new login entry as default", action="store_true", default=None)
|
||||
@@ -34,11 +37,32 @@ class LoginAddCommand(osc.commandline_git.GitObsCommand):
|
||||
|
||||
# TODO: try to authenticate to verify that the new entry works
|
||||
|
||||
while not args.token or args.token == "-":
|
||||
args.token = getpass.getpass(prompt=f"Enter Gitea token for user '{args.user}': ")
|
||||
if not (args.ssh_key or args.ssh_agent) or not args.ssh_key_agent_pub:
|
||||
self.parser.error("For SSH authentication, either --ssh-key or --ssh-agent must be specified together with --ssh-key-agent-pub")
|
||||
elif (args.ssh_key and args.ssh_agent) or (args.ssh_key and args.token) or (args.ssh_agent and args.token):
|
||||
self.parser.error("SSH authentication cannot be used together with token authentication, and --ssh-key and --ssh-agent cannot be used together")
|
||||
|
||||
ssh_login = (args.ssh_key or args.ssh_agent) and args.ssh_key_agent_pub
|
||||
|
||||
if args.ssh_key:
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
if not os.path.isfile(args.ssh_key):
|
||||
self.parser.error(f"SSH key file '{args.ssh_key}' does not exist")
|
||||
if not os.access(args.ssh_key, os.R_OK):
|
||||
self.parser.error(f"SSH key file '{args.ssh_key}' is not readable")
|
||||
with open(args.ssh_key, "rb") as key_file:
|
||||
try:
|
||||
serialization.load_ssh_private_key(key_file.read(), password=None)
|
||||
except Exception:
|
||||
self.parser.error(f"SSH key file '{args.ssh_key}' is not a valid SSH private key")
|
||||
|
||||
if not ssh_login:
|
||||
while not args.token or args.token == "-":
|
||||
args.token = getpass.getpass(prompt=f"Enter Gitea token for user '{args.user}': ")
|
||||
|
||||
if args.token and not re.match(r"^[0-9a-f]{40}$", args.token):
|
||||
self.parser.error("Invalid token format, 40 hexadecimal characters expected")
|
||||
if args.token and not re.match(r"^[0-9a-f]{40}$", args.token):
|
||||
self.parser.error("Invalid token format, 40 hexadecimal characters expected")
|
||||
|
||||
login_obj = gitea_api.Login(
|
||||
name=args.name,
|
||||
@@ -46,6 +70,8 @@ class LoginAddCommand(osc.commandline_git.GitObsCommand):
|
||||
user=args.user,
|
||||
token=args.token,
|
||||
ssh_key=args.ssh_key,
|
||||
ssh_agent=args.ssh_agent,
|
||||
ssh_key_agent_pub=args.ssh_key_agent_pub,
|
||||
git_uses_http=args.git_uses_http,
|
||||
default=args.set_as_default,
|
||||
)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import getpass
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
|
||||
@@ -22,10 +23,75 @@ class LoginUpdateCommand(osc.commandline_git.GitObsCommand):
|
||||
self.parser.add_argument("--new-user", metavar="USER", help="Gitea username")
|
||||
self.parser.add_argument("--new-token", metavar="TOKEN", help="Gitea access token; set to '-' to invoke a secure interactive prompt")
|
||||
self.parser.add_argument("--new-ssh-key", metavar="PATH", help="Path to a private SSH key").completer = complete_ssh_key_path
|
||||
self.parser.add_argument("--new-ssh-agent", help="Use ssh-agent for authentication", choices=["0", "1", "yes", "no"], default=None)
|
||||
self.parser.add_argument("--new-ssh-key-agent-pub", help="Public SSH key signature for ssh-agent authentication")
|
||||
self.parser.add_argument("--new-git-uses-http", help="Git uses http(s) instead of SSH", choices=["0", "1", "yes", "no"], default=None)
|
||||
self.parser.add_argument("--new-quiet", help="Mute unnecessary output when using this login entry", choices=["0", "1", "yes", "no"], default=None)
|
||||
self.parser.add_argument("--set-as-default", action="store_true", help="Set the login entry as default")
|
||||
|
||||
def _get_ssh_settings(self, args, original_login_obj):
|
||||
# Parse new_ssh_agent
|
||||
new_ssh_agent = None
|
||||
if args.new_ssh_agent in ("0", "no"):
|
||||
new_ssh_agent = False
|
||||
elif args.new_ssh_agent in ("1", "yes"):
|
||||
new_ssh_agent = True
|
||||
|
||||
if args.new_ssh_key and new_ssh_agent:
|
||||
self.parser.error("Cannot specify both --new-ssh-key and --new-ssh-agent")
|
||||
|
||||
# Determine final state
|
||||
final_ssh_key = original_login_obj.ssh_key
|
||||
final_ssh_agent = original_login_obj.ssh_agent
|
||||
final_ssh_key_agent_pub = original_login_obj.ssh_key_agent_pub
|
||||
|
||||
if args.new_ssh_key:
|
||||
final_ssh_key = args.new_ssh_key
|
||||
# If setting a key, disable agent
|
||||
final_ssh_agent = False
|
||||
|
||||
if new_ssh_agent is not None:
|
||||
final_ssh_agent = new_ssh_agent
|
||||
# If enabling agent, clear key
|
||||
if new_ssh_agent:
|
||||
final_ssh_key = None
|
||||
|
||||
if args.new_ssh_key_agent_pub:
|
||||
final_ssh_key_agent_pub = args.new_ssh_key_agent_pub
|
||||
|
||||
# Validate SSH configuration changes
|
||||
if args.new_ssh_key or args.new_ssh_agent:
|
||||
# If SSH is being enabled or modified, require the pub key
|
||||
if (final_ssh_key or final_ssh_agent) and not args.new_ssh_key_agent_pub:
|
||||
self.parser.error("For SSH authentication, either --new-ssh-key or --new-ssh-agent must be specified together with --new-ssh-key-agent-pub")
|
||||
|
||||
if original_login_obj.ssh_agent and final_ssh_agent is False and not final_ssh_key:
|
||||
self.parser.error("Cannot switch from ssh-agent authentication to SSH key authentication without specifying a new SSH key")
|
||||
|
||||
if original_login_obj.ssh_key and final_ssh_agent is True:
|
||||
print("Warning: switching from SSH key authentication to ssh-agent authentication, the specified SSH key will be deleted", file=sys.stderr)
|
||||
|
||||
if original_login_obj.ssh_agent and args.new_ssh_key:
|
||||
print("Warning: switching from ssh-agent authentication to SSH key authentication, the ssh-agent setting will be disabled", file=sys.stderr)
|
||||
|
||||
if (final_ssh_key or final_ssh_agent) and args.new_token:
|
||||
self.parser.error("Token authentication cannot be used together with SSH authentication")
|
||||
|
||||
if args.new_ssh_key:
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
if not os.path.isfile(args.new_ssh_key):
|
||||
self.parser.error(f"SSH key file '{args.new_ssh_key}' does not exist")
|
||||
if not os.access(args.new_ssh_key, os.R_OK):
|
||||
self.parser.error(f"SSH key file '{args.new_ssh_key}' is not readable")
|
||||
with open(args.new_ssh_key, "rb") as key_file:
|
||||
try:
|
||||
serialization.load_ssh_private_key(key_file.read(), password=None)
|
||||
except Exception:
|
||||
self.parser.error(f"SSH key file '{args.new_ssh_key}' is not a valid SSH private key")
|
||||
|
||||
return final_ssh_key, final_ssh_agent, final_ssh_key_agent_pub
|
||||
|
||||
def run(self, args):
|
||||
print(f"Updating a Gitea credentials entry with name '{args.name}' ...", file=sys.stderr)
|
||||
print(f" * Config path: {self.gitea_conf.path}", file=sys.stderr)
|
||||
@@ -34,8 +100,8 @@ class LoginUpdateCommand(osc.commandline_git.GitObsCommand):
|
||||
# TODO: try to authenticate to verify that the updated entry works
|
||||
|
||||
original_login_obj = self.gitea_conf.get_login(args.name)
|
||||
print("Original entry:")
|
||||
print(original_login_obj.to_human_readable_string())
|
||||
|
||||
final_ssh_key, final_ssh_agent, final_ssh_key_agent_pub = self._get_ssh_settings(args, original_login_obj)
|
||||
|
||||
if args.new_token == "-":
|
||||
print(file=sys.stderr)
|
||||
@@ -51,7 +117,7 @@ class LoginUpdateCommand(osc.commandline_git.GitObsCommand):
|
||||
new_git_uses_http = True
|
||||
else:
|
||||
new_git_uses_http = None
|
||||
|
||||
|
||||
if args.new_quiet in ("0", "no"):
|
||||
new_quiet = False
|
||||
elif args.new_quiet in ("1", "yes"):
|
||||
@@ -65,11 +131,17 @@ class LoginUpdateCommand(osc.commandline_git.GitObsCommand):
|
||||
new_url=args.new_url,
|
||||
new_user=args.new_user,
|
||||
new_token=args.new_token,
|
||||
new_ssh_key=args.new_ssh_key,
|
||||
new_ssh_key=final_ssh_key,
|
||||
new_ssh_agent=final_ssh_agent,
|
||||
new_ssh_key_agent_pub=final_ssh_key_agent_pub,
|
||||
new_git_uses_http=new_git_uses_http,
|
||||
new_quiet=new_quiet,
|
||||
set_as_default=args.set_as_default,
|
||||
)
|
||||
|
||||
print("")
|
||||
print("Original entry:")
|
||||
print(original_login_obj.to_human_readable_string())
|
||||
print("")
|
||||
print("Updated entry:")
|
||||
print(updated_login_obj.to_human_readable_string())
|
||||
|
||||
@@ -18,8 +18,10 @@ class Login(BaseModel):
|
||||
name: str = Field() # type: ignore[assignment]
|
||||
url: str = Field() # type: ignore[assignment]
|
||||
user: str = Field() # type: ignore[assignment]
|
||||
token: str = Field() # type: ignore[assignment]
|
||||
token: Optional[str] = Field() # type: ignore[assignment]
|
||||
ssh_key: Optional[str] = Field() # type: ignore[assignment]
|
||||
ssh_agent: Optional[bool] = Field() # type: ignore[assignment]
|
||||
ssh_key_agent_pub: Optional[str] = Field() # type: ignore[assignment]
|
||||
git_uses_http: Optional[bool] = Field() # type: ignore[assignment]
|
||||
quiet: Optional[bool] = Field() # type: ignore[assignment]
|
||||
default: Optional[bool] = Field() # type: ignore[assignment]
|
||||
@@ -77,6 +79,10 @@ class Login(BaseModel):
|
||||
table.add("User", self.user)
|
||||
if self.ssh_key:
|
||||
table.add("Private SSH key path", self.ssh_key)
|
||||
if self.ssh_agent:
|
||||
table.add("Use ssh-agent", "yes" if self.ssh_agent else "no")
|
||||
if self.ssh_key_agent_pub:
|
||||
table.add("Public SSH key for ssh-agent", self.ssh_key_agent_pub)
|
||||
if self.git_uses_http:
|
||||
table.add("Git uses http(s)", "yes" if self.git_uses_http else "no")
|
||||
if self.quiet:
|
||||
@@ -241,6 +247,8 @@ class Config:
|
||||
new_user: Optional[str] = None,
|
||||
new_token: Optional[str] = None,
|
||||
new_ssh_key: Optional[str] = None,
|
||||
new_ssh_agent: Optional[bool] = None,
|
||||
new_ssh_key_agent_pub: Optional[str] = None,
|
||||
new_git_uses_http: Optional[bool] = None,
|
||||
new_quiet: Optional[bool] = None,
|
||||
set_as_default: Optional[bool] = None,
|
||||
@@ -255,8 +263,18 @@ class Config:
|
||||
login.user = new_user
|
||||
if new_token is not None:
|
||||
login.token = new_token
|
||||
if new_ssh_key is not None:
|
||||
# ssh_key can be set to None to switch to SSH agent authentication
|
||||
if new_ssh_key is None and login.ssh_key:
|
||||
login.ssh_key = None
|
||||
elif new_ssh_key is not None:
|
||||
login.ssh_key = new_ssh_key
|
||||
# ssh_agent can be set to None to switch to SSH key authentication
|
||||
if new_ssh_agent is None and login.ssh_agent:
|
||||
login.ssh_agent = None
|
||||
elif new_ssh_agent is not None:
|
||||
login.ssh_agent = new_ssh_agent
|
||||
if new_ssh_key_agent_pub is not None:
|
||||
login.ssh_key_agent_pub = new_ssh_key_agent_pub
|
||||
|
||||
if new_git_uses_http is None:
|
||||
# keep the original value
|
||||
|
||||
@@ -13,7 +13,7 @@ import urllib3.exceptions
|
||||
import urllib3.response
|
||||
|
||||
from .conf import Login
|
||||
|
||||
from .http_signer import HttpSigner
|
||||
|
||||
RE_HTTP_HEADER_LINK = re.compile('<(?P<url>.*?)>; rel="(?P<rel>.*?)",?')
|
||||
|
||||
@@ -93,7 +93,7 @@ class Connection:
|
||||
"""
|
||||
Return relative url prefixed with "/api/v1/" followed with concatenated ``*path``.
|
||||
"""
|
||||
url_path = ["", "api", "v1"] + [urllib.parse.quote(i, safe="/:") for i in path]
|
||||
url_path = ["", "api", "v1"] + [urllib.parse.quote(i.lstrip("/"), safe="/:") for i in path]
|
||||
url_path_str = "/".join(url_path)
|
||||
|
||||
if query is None:
|
||||
@@ -130,7 +130,12 @@ class Connection:
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
if self.login.token:
|
||||
if self.login.ssh_key or self.login.ssh_agent:
|
||||
try:
|
||||
headers.update(HttpSigner(self.login).get_signed_header(method, url))
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to sign the request using SSH credentials: {e}")
|
||||
elif self.login.token:
|
||||
headers["Authorization"] = f"token {self.login.token}"
|
||||
|
||||
if json_data and isinstance(json_data, dict):
|
||||
|
||||
210
osc/gitea_api/http_signer.py
Normal file
210
osc/gitea_api/http_signer.py
Normal file
@@ -0,0 +1,210 @@
|
||||
import datetime
|
||||
import os
|
||||
import base64
|
||||
import hashlib
|
||||
import sys
|
||||
|
||||
class HttpSigner:
|
||||
def __init__(self, login_obj):
|
||||
self.login_obj = login_obj
|
||||
|
||||
def _get_signer_from_file(self):
|
||||
"""
|
||||
Returns a signing function and algorithm name using a local private key file.
|
||||
"""
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, ed25519, padding
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
|
||||
key_path = self.login_obj.ssh_key
|
||||
|
||||
if not os.path.exists(key_path):
|
||||
raise FileNotFoundError(f"SSH private key not found at: {key_path}")
|
||||
|
||||
with open(key_path, "rb") as key_file:
|
||||
private_key = serialization.load_ssh_private_key(key_file.read(), password=None)
|
||||
|
||||
algorithm_name = ""
|
||||
if isinstance(private_key, ed25519.Ed25519PrivateKey):
|
||||
algorithm_name = "ed25519"
|
||||
|
||||
def sign_func(data_bytes):
|
||||
return private_key.sign(data_bytes)
|
||||
|
||||
elif isinstance(private_key, rsa.RSAPrivateKey):
|
||||
algorithm_name = "rsa-sha512"
|
||||
|
||||
def sign_func(data_bytes):
|
||||
return private_key.sign(
|
||||
data_bytes,
|
||||
padding.PKCS1v15(),
|
||||
hashes.SHA512()
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported key type: {type(private_key)}")
|
||||
|
||||
return sign_func, algorithm_name
|
||||
|
||||
def _get_signer_from_agent(self):
|
||||
"""
|
||||
Returns a signing function and algorithm name using the SSH Agent.
|
||||
Requires 'paramiko'.
|
||||
"""
|
||||
try:
|
||||
import paramiko
|
||||
except ImportError:
|
||||
raise ImportError("The 'paramiko' library is required for SSH Agent support. Please run: pip install paramiko")
|
||||
|
||||
agent = paramiko.agent.Agent()
|
||||
agent_keys = agent.get_keys()
|
||||
|
||||
if not agent_keys:
|
||||
raise RuntimeError("No keys found in SSH Agent. Is it running and have you added keys (ssh-add)?")
|
||||
|
||||
target_fingerprint = self.login_obj.ssh_key_agent_pub
|
||||
found_key = None
|
||||
|
||||
# Remove "SHA256:" prefix if present for calculation/comparison
|
||||
target_fp_clean = target_fingerprint
|
||||
if target_fp_clean.startswith("SHA256:"):
|
||||
target_fp_clean = target_fp_clean[7:]
|
||||
|
||||
for key in agent_keys:
|
||||
# Calculate SHA256 fingerprint of the key to match against configuration
|
||||
# Paramiko keys have .asbytes() which gives the public key blob
|
||||
key_blob = key.asbytes()
|
||||
fp_bytes = hashlib.sha256(key_blob).digest()
|
||||
# OpenSSH uses standard base64 without padding usually, but python's b64encode adds padding.
|
||||
# We need to strip standard padding '='
|
||||
fp_str = base64.b64encode(fp_bytes).decode('ascii').rstrip('=')
|
||||
|
||||
if fp_str == target_fp_clean:
|
||||
found_key = key
|
||||
break
|
||||
|
||||
if not found_key:
|
||||
print("Available keys in agent:")
|
||||
for key in agent_keys:
|
||||
key_blob = key.asbytes()
|
||||
fp_bytes = hashlib.sha256(key_blob).digest()
|
||||
fp_str = base64.b64encode(fp_bytes).decode('ascii').rstrip('=')
|
||||
print(f" - {key.get_name()} SHA256:{fp_str}")
|
||||
raise ValueError(f"Key with fingerprint {target_fingerprint} not found in SSH Agent.")
|
||||
|
||||
#print(f"Using SSH Agent key: {found_key.get_name()}")
|
||||
|
||||
# Determine algorithm name
|
||||
# Paramiko's get_name() usually returns 'ssh-rsa', 'ssh-ed25519', etc.
|
||||
algo_map = {
|
||||
"ssh-ed25519": "ed25519",
|
||||
"ssh-rsa": "rsa-sha512", # We assume we want sha512 for RSA
|
||||
"rsa-sha2-512": "rsa-sha512",
|
||||
"rsa-sha2-256": "rsa-sha256"
|
||||
}
|
||||
|
||||
# Default to the key type, mapped or raw
|
||||
algorithm_name = algo_map.get(found_key.get_name(), found_key.get_name())
|
||||
|
||||
def sign_func(data_bytes):
|
||||
# agent.sign_data returns a Message object or bytes containing the SSH signature blob
|
||||
# The SSH signature blob is: [4 byte len][algo name][4 byte len][signature]
|
||||
# We need to send the signature part.
|
||||
|
||||
# Note: For RSA, we might need to specify flags to get SHA512.
|
||||
# Paramiko's sign_ssh_data doesn't easily expose flags in the high level AgentKey API
|
||||
# in older versions, but let's try just signing.
|
||||
# Modern agents usually negotiate or we might just get what we get.
|
||||
|
||||
# If it is RSA, we really want rsa-sha2-512.
|
||||
# But paramiko AgentKey.sign_ssh_data just calls the agent.
|
||||
|
||||
# result is a paramiko.message.Message or bytes
|
||||
# It typically contains the entire blob: [string algo][string sig]
|
||||
# Use key.sign_ssh_data(data)
|
||||
|
||||
sig_result = found_key.sign_ssh_data(data_bytes)
|
||||
|
||||
# sig_result is usually a paramiko.Message object.
|
||||
if hasattr(sig_result, 'rewind'):
|
||||
sig_result.rewind()
|
||||
# The structure is: string(algo), string(signature_blob)
|
||||
sig_algo = sig_result.get_string()
|
||||
sig_blob = sig_result.get_string()
|
||||
return sig_blob
|
||||
else:
|
||||
# Maybe it returned bytes? Parse it manually
|
||||
# SSH string format: 4 bytes len, string data
|
||||
# We skip the algo name
|
||||
import struct
|
||||
if not isinstance(sig_result, bytes):
|
||||
raise TypeError(f"Unexpected return type from sign_ssh_data: {type(sig_result)}")
|
||||
|
||||
offset = 0
|
||||
algo_len = struct.unpack('>I', sig_result[offset:offset+4])[0]
|
||||
offset += 4 + algo_len
|
||||
|
||||
sig_len = struct.unpack('>I', sig_result[offset:offset+4])[0]
|
||||
offset += 4
|
||||
sig_blob = sig_result[offset:offset+sig_len]
|
||||
return sig_blob
|
||||
|
||||
return sign_func, algorithm_name
|
||||
|
||||
def get_signed_header(self, method, path):
|
||||
"""
|
||||
Sign the request data using the configured authentication method (SSH key or agent).
|
||||
Returns a tuple of (signature, algorithm_name).
|
||||
"""
|
||||
if self.login_obj.ssh_key:
|
||||
sign_func, algorithm_name = self._get_signer_from_file()
|
||||
elif self.login_obj.ssh_agent:
|
||||
sign_func, algorithm_name = self._get_signer_from_agent()
|
||||
else:
|
||||
raise ValueError("No SSH authentication method configured for this login entry.")
|
||||
|
||||
# Timestamps
|
||||
now = datetime.datetime.now(datetime.timezone.utc)
|
||||
created = int(now.timestamp())
|
||||
expires = int((now + datetime.timedelta(seconds=10)).timestamp())
|
||||
|
||||
# (request-target): lowercase_method path
|
||||
if path.startswith("http"):
|
||||
from urllib.parse import urlparse
|
||||
|
||||
parsed = urlparse(path)
|
||||
path = parsed.path
|
||||
if parsed.query:
|
||||
path += f"?{parsed.query}"
|
||||
|
||||
request_target_value = f"{method.lower()} {path}"
|
||||
|
||||
#print(f"Signing request target: {request_target_value}")
|
||||
|
||||
signing_string = (
|
||||
f"(request-target): {request_target_value}\n"
|
||||
f"(created): {created}\n"
|
||||
f"(expires): {expires}"
|
||||
)
|
||||
|
||||
try:
|
||||
signature_bytes = sign_func(signing_string.encode('utf-8'))
|
||||
signature_b64 = base64.b64encode(signature_bytes).decode('utf-8')
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to sign data: {e}")
|
||||
|
||||
headers_list = "(request-target) (created) (expires)"
|
||||
|
||||
signature_header = (
|
||||
f'keyId="{self.login_obj.ssh_key_agent_pub}",'
|
||||
f'algorithm="{algorithm_name}",'
|
||||
f'headers="{headers_list}",'
|
||||
f'signature="{signature_b64}",'
|
||||
f'created={created},'
|
||||
f'expires={expires}'
|
||||
)
|
||||
|
||||
headers = {
|
||||
'Signature': signature_header
|
||||
}
|
||||
|
||||
return headers
|
||||
Reference in New Issue
Block a user