import io import os from typing import Any from typing import Dict from typing import List from typing import Optional from osc import oscerr from osc.util import yaml as osc_yaml from osc.util.models import BaseModel from osc.util.models import Field class Login(BaseModel): # Any time the fields change, make corresponding changes in the ENV variables in the following places: # * Config.get_login() in this file - update ENV variables passed to Login() # * GitObsMainCommand.init_arguments in ../commandline_git.py - update help name: str = Field() # type: ignore[assignment] url: str = Field() # type: ignore[assignment] user: str = Field() # type: ignore[assignment] token: str = Field() # type: ignore[assignment] ssh_key: Optional[str] = Field() # type: ignore[assignment] git_uses_http: Optional[bool] = Field() # type: ignore[assignment] default: Optional[bool] = Field() # type: ignore[assignment] class AlreadyExists(oscerr.OscBaseError): def __init__(self, name): super().__init__() self.name = name def __str__(self): return f"Gitea config entry with name '{self.name}' already exists" class DoesNotExist(oscerr.OscBaseError): def __init__(self, **kwargs): super().__init__() self.kwargs = kwargs def __str__(self): if self.kwargs == {"name": None}: return ( "Could not find a default Gitea config entry\n" " * either set the default entry by running `git-obs login update --set-as-default`\n" " * or run the command with `-G/--gitea-login ` option" ) kwargs_str = ", ".join([f"{key}={value}" for key, value in self.kwargs.items()]) return ( f"Could not find a matching Gitea config entry: {kwargs_str}\n" " * see `git-obs login list` for valid entries" ) def __init__(self, **kwargs): # ignore extra fields for key in list(kwargs): if key not in self.__fields__: kwargs.pop(key, None) super().__init__(**kwargs) def to_human_readable_string(self, *, show_token: bool = False): from osc.output import KeyValueTable table = KeyValueTable(min_key_length=20) table.add("Name", self.name, color="bold") if self.default: table.add("Default", "true", color="bold") table.add("URL", self.url) table.add("User", self.user) if self.ssh_key: table.add("Private SSH key path", self.ssh_key) if self.git_uses_http: table.add("Git uses http(s)", "yes" if self.git_uses_http else "no") if show_token: # tokens are stored in the plain text, there's not reason to protect them too much # let's only hide them from the output by default table.add("Token", self.token) return f"{table}" class Config: """ Manage the tea config.yml file. No data is cached in the objects, all changes are in sync with the file on disk. """ def __init__(self, path: Optional[str] = None): if not path: path = "~/.config/tea/config.yml" self.path = os.path.abspath(os.path.expanduser(path)) self.logins: List[Login] = [] def _read(self) -> Dict[str, Any]: try: with open(self.path, "r") as f: return osc_yaml.yaml_load(f) except FileNotFoundError: return {} def _write(self, data): os.makedirs(os.path.dirname(self.path), mode=0o700, exist_ok=True) with open(self.path, "w") as f: osc_yaml.yaml_dump(data, f) def list_logins(self) -> List[Login]: data = self._read() result = [] for i in data.get("logins", []): login_name = i["name"] # override values from GIT_OBS_LOGIN_{login_name}_{field_name.upper()} for field_name in Login.__fields__.keys(): if field_name in ("name", "url"): continue value = os.environ.get(f"GIT_OBS_LOGIN_{login_name}_{field_name.upper()}", None) if value: i[field_name] = value login = Login(**i) result.append(login) return result def get_login(self, name: Optional[str] = None) -> Login: """ Return ``Login`` object for the given ``name``. If ``name`` equals to ``None``, return the default ``Login``. """ if name is None: # if login name was not explicitly specified, check if there are credentials specified in env env_gitea_url = os.environ.get("GIT_OBS_GITEA_URL", None) env_gitea_user = os.environ.get("GIT_OBS_GITEA_USER", None) env_gitea_token = os.environ.get("GIT_OBS_GITEA_TOKEN", None) env_ssh_key = os.environ.get("GIT_OBS_GITEA_SSH_KEY", None) if env_gitea_url and env_gitea_user and env_gitea_token: return Login( name="", url=env_gitea_url, user=env_gitea_user, token=env_gitea_token, ssh_key=env_ssh_key, ) for login in self.list_logins(): if name is None and login.default: return login if login.name == name: return login raise Login.DoesNotExist(name=name) def get_login_by_url_user(self, url: str, user: str) -> Login: """ Return ``Login`` object for the given ``url`` and ``user``. """ from .git import Git # exact match for login in self.list_logins(): if (login.url, login.user) == (url, user): return login def url_to_hostname(value): netloc = Git.urlparse(value).netloc # remove user from hostname if "@" in netloc: netloc = netloc.split("@")[-1] # remove port from hostname if ":" in netloc: netloc = netloc.split(":")[0] return netloc # match only hostname (netloc without 'user@' and ':port') + user for login in self.list_logins(): if (url_to_hostname(login.url), login.user) == (url_to_hostname(url), user): return login raise Login.DoesNotExist(url=url, user=user) def add_login(self, login: Login): data = self._read() data.setdefault("logins", []) for entry in data["logins"]: if entry.get("name", None) == login.name: raise Login.AlreadyExists(login.name) else: if login.default: entry.pop("default", None) data["logins"].append(login.dict()) self._write(data) def remove_login(self, name: str) -> Login: # throw an exception if the login name doesn't exist login = self.get_login(name) data = self._read() for num, entry in enumerate(list(data["logins"])): if entry.get("name", None) == login.name: data["logins"].pop(num) self._write(data) return login def update_login( self, name: str, new_name: Optional[str] = None, new_url: Optional[str] = None, new_user: Optional[str] = None, new_token: Optional[str] = None, new_ssh_key: Optional[str] = None, new_git_uses_http: Optional[bool] = None, set_as_default: Optional[bool] = None, ) -> Login: login = self.get_login(name) if new_name is not None: login.name = new_name if new_url is not None: login.url = new_url if new_user is not None: login.user = new_user if new_token is not None: login.token = new_token if new_ssh_key is not None: login.ssh_key = new_ssh_key if new_git_uses_http is None: # keep the original value pass elif new_git_uses_http: login.git_uses_http = True else: # remove from the config instead of setting to False login.git_uses_http = None if set_as_default: login.default = True if not login.has_changed(): return login data = self._read() for entry in data["logins"]: if entry.get("name", None) == name: entry.update(login.dict()) # remove keys with no value for key, value in entry.copy().items(): if value is None: del entry[key] else: if set_as_default: entry.pop("default", None) self._write(data) return login