diff --git a/osc/conf.py b/osc/conf.py index 97503aea..c25861b8 100644 --- a/osc/conf.py +++ b/osc/conf.py @@ -1824,29 +1824,33 @@ def get_config(override_conffile=None, else: conffile = identify_conf() - conffile = os.path.expanduser(conffile) - if not os.path.exists(conffile): - raise oscerr.NoConfigfile(conffile, account_not_configured_text % conffile) + if conffile in ["", "/dev/null"]: + cp = OscConfigParser.OscConfigParser() + cp.add_section("general") + else: + conffile = os.path.expanduser(conffile) + if not os.path.exists(conffile): + raise oscerr.NoConfigfile(conffile, account_not_configured_text % conffile) - # make sure oscrc is not world readable, it may contain a password - conffile_stat = os.stat(conffile) - if conffile_stat.st_mode != 0o600: - try: - os.chmod(conffile, 0o600) - except OSError as e: - if e.errno in (errno.EROFS, errno.EPERM): - print(f"Warning: Configuration file '{conffile}' may have insecure file permissions.") - else: - raise e + # make sure oscrc is not world readable, it may contain a password + conffile_stat = os.stat(conffile) + if conffile_stat.st_mode != 0o600: + try: + os.chmod(conffile, 0o600) + except OSError as e: + if e.errno in (errno.EROFS, errno.EPERM): + print(f"Warning: Configuration file '{conffile}' may have insecure file permissions.") + else: + raise e - cp = get_configParser(conffile) + cp = get_configParser(conffile) - if not cp.has_section('general'): - # FIXME: it might be sufficient to just assume defaults? - msg = config_incomplete_text % conffile - defaults = Options().dict() - msg += new_conf_template % defaults - raise oscerr.ConfigError(msg, conffile) + if not cp.has_section("general"): + # FIXME: it might be sufficient to just assume defaults? + msg = config_incomplete_text % conffile + defaults = Options().dict() + msg += new_conf_template % defaults + raise oscerr.ConfigError(msg, conffile) global config @@ -1868,29 +1872,44 @@ def get_config(override_conffile=None, urls = [i for i in cp.sections() if i != "general"] for url in urls: apiurl = sanitize_apiurl(url) - username = cp[url].get("user", None) - if username is None: - raise oscerr.ConfigMissingCredentialsError(f"No user found in section {url}", conffile, url) - + # the username will be overwritten later while reading actual config values + username = cp[url].get("user", "") host_options = HostOptions(apiurl=apiurl, username=username, _parent=config) + known_ini_keys = set() for name, field in host_options.__fields__.items(): + # the following code relies on interating through fields in a given order: aliases, username, credentials_mgr_class, password + ini_key = field.extra.get("ini_key", name) known_ini_keys.add(ini_key) - if name == "password": - # we need to handle the password first because it may be stored in a keyring instead of a config file - creds_mgr = _get_credentials_manager(url, cp) - value = creds_mgr.get_password(url, host_options.username, defer=True, apiurl=host_options.apiurl) - if value is None: - raise oscerr.ConfigMissingCredentialsError("No password found in section {url}", conffile, url) - value = Password(value) + # iterate through aliases and store the value of the the first env that matches OSC_HOST_{ALIAS}_{NAME} + env_value = None + for alias in host_options.aliases: + alias = alias.replace("-", "_") + env_key = f"OSC_HOST_{alias.upper()}_{name.upper()}" + env_value = os.environ.get(env_key, None) + if env_value is not None: + break + + if env_value is not None: + value = env_value elif ini_key in cp[url]: value = cp[url][ini_key] else: - continue + value = None - host_options.set_value_from_string(name, value) + if name == "credentials_mgr_class": + # HACK: inject credentials_mgr_class back in case we have specified it from env to have it available for reading password + if value: + cp[url][credentials.AbstractCredentialsManager.config_entry] = value + elif name == "password": + creds_mgr = _get_credentials_manager(url, cp) + if env_value is None: + value = creds_mgr.get_password(url, host_options.username, defer=True, apiurl=host_options.apiurl) + + if value is not None: + host_options.set_value_from_string(name, value) for key, value in cp[url].items(): if key.startswith("_"): @@ -1941,6 +1960,45 @@ def get_config(override_conffile=None, config.set_value_from_string(name, value) + # BEGIN: override credentials for the default apiurl + + # OSC_APIURL is handled already because it's a regular field + env_username = os.environ.get("OSC_USERNAME", "") + env_credentials_mgr_class = os.environ.get("OSC_CREDENTIALS_MGR_CLASS", None) + env_password = os.environ.get("OSC_PASSWORD", None) + + if config.apiurl not in config.api_host_options: + host_options = HostOptions(apiurl=config.apiurl, username=env_username, _parent=config) + config.api_host_options[config.apiurl] = host_options + # HACK: inject section so we can add credentials_mgr_class later + cp.add_section(config.apiurl) + + host_options = config.api_host_options[config.apiurl] + if env_username: + host_options.set_value_from_string("username", env_username) + + if env_credentials_mgr_class: + host_options.set_value_from_string("credentials_mgr_class", env_credentials_mgr_class) + # HACK: inject credentials_mgr_class in case we have specified it from env to have it available for reading password + cp[config.apiurl]["credentials_mgr_class"] = env_credentials_mgr_class + + if env_password: + password = Password(env_password) + host_options.password = password + elif env_credentials_mgr_class: + creds_mgr = _get_credentials_manager(config.apiurl, cp) + password = creds_mgr.get_password(config.apiurl, host_options.username, defer=True, apiurl=host_options.apiurl) + host_options.password = password + + # END: override credentials for the default apiurl + + for apiurl, host_options in config.api_host_options.items(): + if not host_options.username: + raise oscerr.ConfigMissingCredentialsError(f"No user configured for apiurl {apiurl}", conffile, apiurl) + + if not host_options.password: + raise oscerr.ConfigMissingCredentialsError(f"No password configured for apiurl {apiurl}", conffile, apiurl) + for key, value in cp["general"].items(): if key.startswith("_"): continue diff --git a/osc/credentials.py b/osc/credentials.py index 0674a8d9..70598827 100644 --- a/osc/credentials.py +++ b/osc/credentials.py @@ -74,7 +74,10 @@ class AbstractCredentialsManager: class PlaintextConfigFileCredentialsManager(AbstractCredentialsManager): def get_password(self, url, user, defer=True, apiurl=None): - return self._cp.get(url, 'pass', raw=True) + password = self._cp.get(url, "pass", fallback=None, raw=True) + if password is None: + return None + return conf.Password(password) def set_password(self, url, user, password): self._cp.set(url, 'pass', password) @@ -108,7 +111,8 @@ class ObfuscatedConfigFileCredentialsManager(PlaintextConfigFileCredentialsManag passwd = self._cp.get(url, 'passx', raw=True) else: passwd = super().get_password(url, user, apiurl=apiurl) - return self.decode_password(passwd) + password = self.decode_password(passwd) + return conf.Password(password) def set_password(self, url, user, password): compressed_pw = bz2.compress(password.encode('ascii')) diff --git a/tests/test_conf.py b/tests/test_conf.py index bb7cba38..3569632f 100644 --- a/tests/test_conf.py +++ b/tests/test_conf.py @@ -5,6 +5,8 @@ import unittest import osc.conf +from .common import patch + OSCRC = """ [general] @@ -85,7 +87,7 @@ credentials_mgr_class=osc.credentials.PlaintextConfigFileCredentialsManager user = Admin pass = opensuse passx = unused -aliases = osc +aliases = obs http_headers = Authorization: Basic QWRtaW46b3BlbnN1c2U= X-Foo: Bar @@ -423,7 +425,7 @@ class TestExampleConfig(unittest.TestCase): self.assertEqual(host_options._extra_fields, {"plugin-option": "plugin-host-option", "new-option": "value"}) def test_apiurl_aliases(self): - expected = {"https://api.opensuse.org": "https://api.opensuse.org", "osc": "https://api.opensuse.org"} + expected = {"https://api.opensuse.org": "https://api.opensuse.org", "obs": "https://api.opensuse.org"} self.assertEqual(self.config.apiurl_aliases, expected) self.assertEqual(self.config["apiurl_aliases"], expected) @@ -489,5 +491,157 @@ class TestConf(unittest.TestCase): self.assertNotEqual(id(conf1.api_host_options), id(conf2.api_host_options)) +class TestCredentialsFromEnv(unittest.TestCase): + def setUp(self): + osc.conf.config = None + self.oscrc = "" + + @patch.dict(os.environ, {"OSC_APIURL": "https://example.com"}, clear=True) + def test_new_apiurl(self): + # missing user + self.assertRaises( + osc.oscerr.ConfigMissingCredentialsError, + osc.conf.get_config, + override_conffile=self.oscrc, + ) + + @patch.dict( + os.environ, + {"OSC_APIURL": "https://example.com", "OSC_USERNAME": "user"}, + clear=True, + ) + def test_new_apiurl_username(self): + # missing password + self.assertRaises( + osc.oscerr.ConfigMissingCredentialsError, + osc.conf.get_config, + override_conffile=self.oscrc, + ) + + @patch.dict( + os.environ, + { + "OSC_APIURL": "https://example.com", + "OSC_USERNAME": "user", + "OSC_PASSWORD": "secret", + }, + clear=True, + ) + def test_new_apiurl_username_password(self): + # missing password + osc.conf.get_config(override_conffile=self.oscrc) + conf = osc.conf.config + host_options = conf["api_host_options"][conf["apiurl"]] + self.assertEqual(conf.apiurl, "https://example.com") + self.assertEqual(host_options.apiurl, "https://example.com") + self.assertEqual(host_options.username, "user") + self.assertEqual(host_options.password, "secret") + self.assertEqual(host_options.credentials_mgr_class, None) + + @patch.dict( + os.environ, + { + "OSC_APIURL": "https://example.com", + "OSC_USERNAME": "user", + "OSC_PASSWORD": "secret", + }, + clear=True, + ) + def test_new_apiurl_username_password(self): + # missing password + osc.conf.get_config(override_conffile=self.oscrc) + conf = osc.conf.config + host_options = conf["api_host_options"][conf["apiurl"]] + self.assertEqual(conf.apiurl, "https://example.com") + self.assertEqual(host_options.apiurl, "https://example.com") + self.assertEqual(host_options.username, "user") + self.assertEqual(host_options.password, "secret") + self.assertEqual(host_options.credentials_mgr_class, None) + + @patch.dict( + os.environ, + { + "OSC_APIURL": "https://example.com", + "OSC_USERNAME": "user", + "OSC_PASSWORD": "secret", + "OSC_CREDENTIALS_MGR_CLASS": "osc.credentials.PlaintextConfigFileCredentialsManager", + }, + clear=True, + ) + def test_new_apiurl_username_password_credmgr(self): + # missing password + osc.conf.get_config(override_conffile=self.oscrc) + conf = osc.conf.config + host_options = conf["api_host_options"][conf.apiurl] + self.assertEqual(conf.apiurl, "https://example.com") + self.assertEqual(host_options.apiurl, "https://example.com") + self.assertEqual(host_options.username, "user") + self.assertEqual(host_options.password, "secret") + self.assertEqual(host_options.credentials_mgr_class, "osc.credentials.PlaintextConfigFileCredentialsManager") + + +class TestHostOptionsFromEnv(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp(prefix="osc_test_") + self.oscrc = os.path.join(self.tmpdir, "oscrc") + with open(self.oscrc, "w", encoding="utf-8") as f: + f.write(OSCRC) + osc.conf.get_config(override_conffile=self.oscrc) + self.config = osc.conf.config + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + @patch.dict( + os.environ, + { + "OSC_HOST_OBS_USERNAME": "user", + "OSC_HOST_OBS_PASSWORD": "secret", + "OSC_HOST_OBS_CREDENTIALS_MGR_CLASS": "osc.credentials.PlaintextConfigFileCredentialsManager", + "OSC_HOST_OBS_REALNAME": "User", + "OSC_HOST_OBS_EMAIL": "user@example.com", + }, + clear=True, + ) + def test_host_options(self): + osc.conf.get_config(override_conffile=self.oscrc) + conf = osc.conf.config + host_options = conf["api_host_options"][conf["apiurl"]] + self.assertEqual(conf.apiurl, "https://api.opensuse.org") + self.assertEqual(host_options.apiurl, "https://api.opensuse.org") + self.assertEqual(host_options.username, "user") + self.assertEqual(host_options.password, "secret") + self.assertEqual(host_options.credentials_mgr_class, "osc.credentials.PlaintextConfigFileCredentialsManager") + self.assertEqual(host_options.realname, "User") + self.assertEqual(host_options.email, "user@example.com") + + @patch.dict( + os.environ, + { + "OSC_HOST_OBS_USERNAME": "user", + "OSC_HOST_OBS_PASSWORD": "secret", + "OSC_HOST_OBS_CREDENTIALS_MGR_CLASS": "osc.credentials.PlaintextConfigFileCredentialsManager", + "OSC_HOST_OBS_REALNAME": "User", + "OSC_HOST_OBS_EMAIL": "user@example.com", + "OSC_USERNAME": "USER", + "OSC_PASSWORD": "SECRET", + "OSC_CREDENTIALS_MGR_CLASS": "osc.credentials.TransientCredentialsManager", + }, + clear=True, + ) + def test_host_options_overrides(self): + # thest if OSC_{USERNAME,PASSWORD,CREDENTIALS_MGR_CLASS} prevail over OSC_HOST_* options + osc.conf.get_config(override_conffile=self.oscrc) + conf = osc.conf.config + host_options = conf["api_host_options"][conf["apiurl"]] + self.assertEqual(conf.apiurl, "https://api.opensuse.org") + self.assertEqual(host_options.apiurl, "https://api.opensuse.org") + self.assertEqual(host_options.username, "USER") + self.assertEqual(host_options.password, "SECRET") + self.assertEqual(host_options.credentials_mgr_class, "osc.credentials.TransientCredentialsManager") + self.assertEqual(host_options.realname, "User") + self.assertEqual(host_options.email, "user@example.com") + + if __name__ == "__main__": unittest.main()