mirror of
https://github.com/openSUSE/osc.git
synced 2025-01-03 21:36:15 +01:00
Implement reading credentials from environmental variables
Options for apiurls can be set via OSC_HOST_<ALIAS>_<OPTION>=... This requires a configured alias in the config file. Setting the default apiurl via OSC_APIURL=... was working already. Also OSC_CONFIG= / --config= was already implemented to skip loading configuration entirely. Options for the default apiurl can be now set via: OSC_USERNAME=... OSC_CREDENTIALS_MGR_CLASS=... OSC_PASSWORD=... This, for example, makes running osc in containers with credentials stored in environmental variables possible: OSC_CONFIG= OSC_APIURL=<url> OSC_USERNAME=<user> OSC_PASSWORD=<password> osc ...
This commit is contained in:
parent
7d27b6d140
commit
82216c72b4
80
osc/conf.py
80
osc/conf.py
@ -1872,29 +1872,44 @@ def get_config(override_conffile=None,
|
|||||||
urls = [i for i in cp.sections() if i != "general"]
|
urls = [i for i in cp.sections() if i != "general"]
|
||||||
for url in urls:
|
for url in urls:
|
||||||
apiurl = sanitize_apiurl(url)
|
apiurl = sanitize_apiurl(url)
|
||||||
username = cp[url].get("user", None)
|
# the username will be overwritten later while reading actual config values
|
||||||
if username is None:
|
username = cp[url].get("user", "")
|
||||||
raise oscerr.ConfigMissingCredentialsError(f"No user found in section {url}", conffile, url)
|
|
||||||
|
|
||||||
host_options = HostOptions(apiurl=apiurl, username=username, _parent=config)
|
host_options = HostOptions(apiurl=apiurl, username=username, _parent=config)
|
||||||
|
|
||||||
known_ini_keys = set()
|
known_ini_keys = set()
|
||||||
for name, field in host_options.__fields__.items():
|
for name, field in host_options.__fields__.items():
|
||||||
|
# the following code relies on interating through fields in a given order: aliases, username, credentials_mgr_class, password
|
||||||
|
|
||||||
ini_key = field.extra.get("ini_key", name)
|
ini_key = field.extra.get("ini_key", name)
|
||||||
known_ini_keys.add(ini_key)
|
known_ini_keys.add(ini_key)
|
||||||
|
|
||||||
if name == "password":
|
# iterate through aliases and store the value of the the first env that matches OSC_HOST_{ALIAS}_{NAME}
|
||||||
# we need to handle the password first because it may be stored in a keyring instead of a config file
|
env_value = None
|
||||||
creds_mgr = _get_credentials_manager(url, cp)
|
for alias in host_options.aliases:
|
||||||
value = creds_mgr.get_password(url, host_options.username, defer=True, apiurl=host_options.apiurl)
|
alias = alias.replace("-", "_")
|
||||||
if value is None:
|
env_key = f"OSC_HOST_{alias.upper()}_{name.upper()}"
|
||||||
raise oscerr.ConfigMissingCredentialsError("No password found in section {url}", conffile, url)
|
env_value = os.environ.get(env_key, None)
|
||||||
value = Password(value)
|
if env_value is not None:
|
||||||
|
break
|
||||||
|
|
||||||
|
if env_value is not None:
|
||||||
|
value = env_value
|
||||||
elif ini_key in cp[url]:
|
elif ini_key in cp[url]:
|
||||||
value = cp[url][ini_key]
|
value = cp[url][ini_key]
|
||||||
else:
|
else:
|
||||||
continue
|
value = None
|
||||||
|
|
||||||
host_options.set_value_from_string(name, value)
|
if name == "credentials_mgr_class":
|
||||||
|
# HACK: inject credentials_mgr_class back in case we have specified it from env to have it available for reading password
|
||||||
|
if value:
|
||||||
|
cp[url][credentials.AbstractCredentialsManager.config_entry] = value
|
||||||
|
elif name == "password":
|
||||||
|
creds_mgr = _get_credentials_manager(url, cp)
|
||||||
|
if env_value is None:
|
||||||
|
value = creds_mgr.get_password(url, host_options.username, defer=True, apiurl=host_options.apiurl)
|
||||||
|
|
||||||
|
if value is not None:
|
||||||
|
host_options.set_value_from_string(name, value)
|
||||||
|
|
||||||
for key, value in cp[url].items():
|
for key, value in cp[url].items():
|
||||||
if key.startswith("_"):
|
if key.startswith("_"):
|
||||||
@ -1945,6 +1960,45 @@ def get_config(override_conffile=None,
|
|||||||
|
|
||||||
config.set_value_from_string(name, value)
|
config.set_value_from_string(name, value)
|
||||||
|
|
||||||
|
# BEGIN: override credentials for the default apiurl
|
||||||
|
|
||||||
|
# OSC_APIURL is handled already because it's a regular field
|
||||||
|
env_username = os.environ.get("OSC_USERNAME", "")
|
||||||
|
env_credentials_mgr_class = os.environ.get("OSC_CREDENTIALS_MGR_CLASS", None)
|
||||||
|
env_password = os.environ.get("OSC_PASSWORD", None)
|
||||||
|
|
||||||
|
if config.apiurl not in config.api_host_options:
|
||||||
|
host_options = HostOptions(apiurl=config.apiurl, username=env_username, _parent=config)
|
||||||
|
config.api_host_options[config.apiurl] = host_options
|
||||||
|
# HACK: inject section so we can add credentials_mgr_class later
|
||||||
|
cp.add_section(config.apiurl)
|
||||||
|
|
||||||
|
host_options = config.api_host_options[config.apiurl]
|
||||||
|
if env_username:
|
||||||
|
host_options.set_value_from_string("username", env_username)
|
||||||
|
|
||||||
|
if env_credentials_mgr_class:
|
||||||
|
host_options.set_value_from_string("credentials_mgr_class", env_credentials_mgr_class)
|
||||||
|
# HACK: inject credentials_mgr_class in case we have specified it from env to have it available for reading password
|
||||||
|
cp[config.apiurl]["credentials_mgr_class"] = env_credentials_mgr_class
|
||||||
|
|
||||||
|
if env_password:
|
||||||
|
password = Password(env_password)
|
||||||
|
host_options.password = password
|
||||||
|
elif env_credentials_mgr_class:
|
||||||
|
creds_mgr = _get_credentials_manager(config.apiurl, cp)
|
||||||
|
password = creds_mgr.get_password(config.apiurl, host_options.username, defer=True, apiurl=host_options.apiurl)
|
||||||
|
host_options.password = password
|
||||||
|
|
||||||
|
# END: override credentials for the default apiurl
|
||||||
|
|
||||||
|
for apiurl, host_options in config.api_host_options.items():
|
||||||
|
if not host_options.username:
|
||||||
|
raise oscerr.ConfigMissingCredentialsError(f"No user configured for apiurl {apiurl}", conffile, apiurl)
|
||||||
|
|
||||||
|
if not host_options.password:
|
||||||
|
raise oscerr.ConfigMissingCredentialsError(f"No password configured for apiurl {apiurl}", conffile, apiurl)
|
||||||
|
|
||||||
for key, value in cp["general"].items():
|
for key, value in cp["general"].items():
|
||||||
if key.startswith("_"):
|
if key.startswith("_"):
|
||||||
continue
|
continue
|
||||||
|
@ -5,6 +5,8 @@ import unittest
|
|||||||
|
|
||||||
import osc.conf
|
import osc.conf
|
||||||
|
|
||||||
|
from .common import patch
|
||||||
|
|
||||||
|
|
||||||
OSCRC = """
|
OSCRC = """
|
||||||
[general]
|
[general]
|
||||||
@ -85,7 +87,7 @@ credentials_mgr_class=osc.credentials.PlaintextConfigFileCredentialsManager
|
|||||||
user = Admin
|
user = Admin
|
||||||
pass = opensuse
|
pass = opensuse
|
||||||
passx = unused
|
passx = unused
|
||||||
aliases = osc
|
aliases = obs
|
||||||
http_headers =
|
http_headers =
|
||||||
Authorization: Basic QWRtaW46b3BlbnN1c2U=
|
Authorization: Basic QWRtaW46b3BlbnN1c2U=
|
||||||
X-Foo: Bar
|
X-Foo: Bar
|
||||||
@ -423,7 +425,7 @@ class TestExampleConfig(unittest.TestCase):
|
|||||||
self.assertEqual(host_options._extra_fields, {"plugin-option": "plugin-host-option", "new-option": "value"})
|
self.assertEqual(host_options._extra_fields, {"plugin-option": "plugin-host-option", "new-option": "value"})
|
||||||
|
|
||||||
def test_apiurl_aliases(self):
|
def test_apiurl_aliases(self):
|
||||||
expected = {"https://api.opensuse.org": "https://api.opensuse.org", "osc": "https://api.opensuse.org"}
|
expected = {"https://api.opensuse.org": "https://api.opensuse.org", "obs": "https://api.opensuse.org"}
|
||||||
self.assertEqual(self.config.apiurl_aliases, expected)
|
self.assertEqual(self.config.apiurl_aliases, expected)
|
||||||
self.assertEqual(self.config["apiurl_aliases"], expected)
|
self.assertEqual(self.config["apiurl_aliases"], expected)
|
||||||
|
|
||||||
@ -489,5 +491,157 @@ class TestConf(unittest.TestCase):
|
|||||||
self.assertNotEqual(id(conf1.api_host_options), id(conf2.api_host_options))
|
self.assertNotEqual(id(conf1.api_host_options), id(conf2.api_host_options))
|
||||||
|
|
||||||
|
|
||||||
|
class TestCredentialsFromEnv(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
osc.conf.config = None
|
||||||
|
self.oscrc = ""
|
||||||
|
|
||||||
|
@patch.dict(os.environ, {"OSC_APIURL": "https://example.com"}, clear=True)
|
||||||
|
def test_new_apiurl(self):
|
||||||
|
# missing user
|
||||||
|
self.assertRaises(
|
||||||
|
osc.oscerr.ConfigMissingCredentialsError,
|
||||||
|
osc.conf.get_config,
|
||||||
|
override_conffile=self.oscrc,
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"OSC_APIURL": "https://example.com", "OSC_USERNAME": "user"},
|
||||||
|
clear=True,
|
||||||
|
)
|
||||||
|
def test_new_apiurl_username(self):
|
||||||
|
# missing password
|
||||||
|
self.assertRaises(
|
||||||
|
osc.oscerr.ConfigMissingCredentialsError,
|
||||||
|
osc.conf.get_config,
|
||||||
|
override_conffile=self.oscrc,
|
||||||
|
)
|
||||||
|
|
||||||
|
@patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{
|
||||||
|
"OSC_APIURL": "https://example.com",
|
||||||
|
"OSC_USERNAME": "user",
|
||||||
|
"OSC_PASSWORD": "secret",
|
||||||
|
},
|
||||||
|
clear=True,
|
||||||
|
)
|
||||||
|
def test_new_apiurl_username_password(self):
|
||||||
|
# missing password
|
||||||
|
osc.conf.get_config(override_conffile=self.oscrc)
|
||||||
|
conf = osc.conf.config
|
||||||
|
host_options = conf["api_host_options"][conf["apiurl"]]
|
||||||
|
self.assertEqual(conf.apiurl, "https://example.com")
|
||||||
|
self.assertEqual(host_options.apiurl, "https://example.com")
|
||||||
|
self.assertEqual(host_options.username, "user")
|
||||||
|
self.assertEqual(host_options.password, "secret")
|
||||||
|
self.assertEqual(host_options.credentials_mgr_class, None)
|
||||||
|
|
||||||
|
@patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{
|
||||||
|
"OSC_APIURL": "https://example.com",
|
||||||
|
"OSC_USERNAME": "user",
|
||||||
|
"OSC_PASSWORD": "secret",
|
||||||
|
},
|
||||||
|
clear=True,
|
||||||
|
)
|
||||||
|
def test_new_apiurl_username_password(self):
|
||||||
|
# missing password
|
||||||
|
osc.conf.get_config(override_conffile=self.oscrc)
|
||||||
|
conf = osc.conf.config
|
||||||
|
host_options = conf["api_host_options"][conf["apiurl"]]
|
||||||
|
self.assertEqual(conf.apiurl, "https://example.com")
|
||||||
|
self.assertEqual(host_options.apiurl, "https://example.com")
|
||||||
|
self.assertEqual(host_options.username, "user")
|
||||||
|
self.assertEqual(host_options.password, "secret")
|
||||||
|
self.assertEqual(host_options.credentials_mgr_class, None)
|
||||||
|
|
||||||
|
@patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{
|
||||||
|
"OSC_APIURL": "https://example.com",
|
||||||
|
"OSC_USERNAME": "user",
|
||||||
|
"OSC_PASSWORD": "secret",
|
||||||
|
"OSC_CREDENTIALS_MGR_CLASS": "osc.credentials.PlaintextConfigFileCredentialsManager",
|
||||||
|
},
|
||||||
|
clear=True,
|
||||||
|
)
|
||||||
|
def test_new_apiurl_username_password_credmgr(self):
|
||||||
|
# missing password
|
||||||
|
osc.conf.get_config(override_conffile=self.oscrc)
|
||||||
|
conf = osc.conf.config
|
||||||
|
host_options = conf["api_host_options"][conf.apiurl]
|
||||||
|
self.assertEqual(conf.apiurl, "https://example.com")
|
||||||
|
self.assertEqual(host_options.apiurl, "https://example.com")
|
||||||
|
self.assertEqual(host_options.username, "user")
|
||||||
|
self.assertEqual(host_options.password, "secret")
|
||||||
|
self.assertEqual(host_options.credentials_mgr_class, "osc.credentials.PlaintextConfigFileCredentialsManager")
|
||||||
|
|
||||||
|
|
||||||
|
class TestHostOptionsFromEnv(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.tmpdir = tempfile.mkdtemp(prefix="osc_test_")
|
||||||
|
self.oscrc = os.path.join(self.tmpdir, "oscrc")
|
||||||
|
with open(self.oscrc, "w", encoding="utf-8") as f:
|
||||||
|
f.write(OSCRC)
|
||||||
|
osc.conf.get_config(override_conffile=self.oscrc)
|
||||||
|
self.config = osc.conf.config
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
shutil.rmtree(self.tmpdir)
|
||||||
|
|
||||||
|
@patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{
|
||||||
|
"OSC_HOST_OBS_USERNAME": "user",
|
||||||
|
"OSC_HOST_OBS_PASSWORD": "secret",
|
||||||
|
"OSC_HOST_OBS_CREDENTIALS_MGR_CLASS": "osc.credentials.PlaintextConfigFileCredentialsManager",
|
||||||
|
"OSC_HOST_OBS_REALNAME": "User",
|
||||||
|
"OSC_HOST_OBS_EMAIL": "user@example.com",
|
||||||
|
},
|
||||||
|
clear=True,
|
||||||
|
)
|
||||||
|
def test_host_options(self):
|
||||||
|
osc.conf.get_config(override_conffile=self.oscrc)
|
||||||
|
conf = osc.conf.config
|
||||||
|
host_options = conf["api_host_options"][conf["apiurl"]]
|
||||||
|
self.assertEqual(conf.apiurl, "https://api.opensuse.org")
|
||||||
|
self.assertEqual(host_options.apiurl, "https://api.opensuse.org")
|
||||||
|
self.assertEqual(host_options.username, "user")
|
||||||
|
self.assertEqual(host_options.password, "secret")
|
||||||
|
self.assertEqual(host_options.credentials_mgr_class, "osc.credentials.PlaintextConfigFileCredentialsManager")
|
||||||
|
self.assertEqual(host_options.realname, "User")
|
||||||
|
self.assertEqual(host_options.email, "user@example.com")
|
||||||
|
|
||||||
|
@patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{
|
||||||
|
"OSC_HOST_OBS_USERNAME": "user",
|
||||||
|
"OSC_HOST_OBS_PASSWORD": "secret",
|
||||||
|
"OSC_HOST_OBS_CREDENTIALS_MGR_CLASS": "osc.credentials.PlaintextConfigFileCredentialsManager",
|
||||||
|
"OSC_HOST_OBS_REALNAME": "User",
|
||||||
|
"OSC_HOST_OBS_EMAIL": "user@example.com",
|
||||||
|
"OSC_USERNAME": "USER",
|
||||||
|
"OSC_PASSWORD": "SECRET",
|
||||||
|
"OSC_CREDENTIALS_MGR_CLASS": "osc.credentials.TransientCredentialsManager",
|
||||||
|
},
|
||||||
|
clear=True,
|
||||||
|
)
|
||||||
|
def test_host_options_overrides(self):
|
||||||
|
# thest if OSC_{USERNAME,PASSWORD,CREDENTIALS_MGR_CLASS} prevail over OSC_HOST_* options
|
||||||
|
osc.conf.get_config(override_conffile=self.oscrc)
|
||||||
|
conf = osc.conf.config
|
||||||
|
host_options = conf["api_host_options"][conf["apiurl"]]
|
||||||
|
self.assertEqual(conf.apiurl, "https://api.opensuse.org")
|
||||||
|
self.assertEqual(host_options.apiurl, "https://api.opensuse.org")
|
||||||
|
self.assertEqual(host_options.username, "USER")
|
||||||
|
self.assertEqual(host_options.password, "SECRET")
|
||||||
|
self.assertEqual(host_options.credentials_mgr_class, "osc.credentials.TransientCredentialsManager")
|
||||||
|
self.assertEqual(host_options.realname, "User")
|
||||||
|
self.assertEqual(host_options.email, "user@example.com")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
Reference in New Issue
Block a user