mirror of
https://github.com/openSUSE/osc.git
synced 2024-12-25 17:36:13 +01:00
Correctly handle passwords with utf-8 characters
This commit is contained in:
parent
ac21d2b5ac
commit
d9676debb9
@ -32,17 +32,22 @@ class Osc:
|
||||
shutil.rmtree(self.temp)
|
||||
self.temp = tempfile.mkdtemp(prefix="osc_behave_")
|
||||
self.oscrc = os.path.join(self.temp, "oscrc")
|
||||
self.write_oscrc()
|
||||
|
||||
def write_oscrc(self, username=None, password=None):
|
||||
with open(self.oscrc, "w") as f:
|
||||
f.write("[general]\n")
|
||||
f.write("\n")
|
||||
f.write(f"[https://localhost:{self.context.podman.container.port}]\n")
|
||||
f.write("user=Admin\n")
|
||||
f.write("pass=opensuse\n")
|
||||
f.write(f"user={username or 'Admin'}\n")
|
||||
f.write(f"pass={password or 'opensuse'}\n")
|
||||
f.write("credentials_mgr_class=osc.credentials.PlaintextConfigFileCredentialsManager\n")
|
||||
f.write("sslcertck=0\n")
|
||||
f.write("http_headers =\n")
|
||||
# avoid the initial 401 response by setting auth to Admin:opensuse directly
|
||||
f.write(" authorization: Basic QWRtaW46b3BlbnN1c2U=\n")
|
||||
if not any((username, password)):
|
||||
f.write("http_headers =\n")
|
||||
# avoid the initial 401 response by setting auth to Admin:opensuse directly
|
||||
# write the header only when the default user/pass are used
|
||||
f.write(" authorization: Basic QWRtaW46b3BlbnN1c2U=\n")
|
||||
|
||||
def get_cmd(self):
|
||||
osc_cmd = self.context.config.userdata.get("osc", "osc")
|
||||
@ -62,6 +67,11 @@ def step_impl(context, args):
|
||||
context.cmd_stderr = re.sub(r"^.*InsecureRequestWarning.*\n warnings.warn\(\n", "", context.cmd_stderr)
|
||||
|
||||
|
||||
@behave.step("I configure osc user \"{username}\" with password \"{password}\"")
|
||||
def step_impl(context, username, password):
|
||||
context.osc.write_oscrc(username=username, password=password)
|
||||
|
||||
|
||||
@behave.step('I wait for osc results for "{project}" "{package}"')
|
||||
def step_impl(context, project, package):
|
||||
args = f"results {project} {package} --csv --format='%(code)s,%(dirty)s'"
|
||||
|
14
behave/features/user.feature
Normal file
14
behave/features/user.feature
Normal file
@ -0,0 +1,14 @@
|
||||
Feature: Manage user accounts
|
||||
|
||||
|
||||
# common steps for all scenarios
|
||||
Background:
|
||||
Given I set working directory to "{context.osc.temp}"
|
||||
And I execute osc with args "api -X POST '/person?cmd=register' --file '{context.fixtures}/user/unicode.xml'"
|
||||
|
||||
|
||||
@destructive
|
||||
Scenario: Run `osc ls` under the newly created user that has a password with unicode characters
|
||||
Given I configure osc user "unicode" with password "Password with unicode characters 🚀🚀🚀"
|
||||
When I execute osc with args "ls test:factory"
|
||||
Then the exit code is 0
|
7
behave/fixtures/user/unicode.xml
Normal file
7
behave/fixtures/user/unicode.xml
Normal file
@ -0,0 +1,7 @@
|
||||
<unregisteredperson>
|
||||
<login>unicode</login>
|
||||
<email>unicode@example.com</email>
|
||||
<realname>An account with unicode password</realname>
|
||||
<state>confirmed_user</state>
|
||||
<password>Password with unicode characters 🚀🚀🚀</password>
|
||||
</unregisteredperson>
|
@ -104,10 +104,14 @@ def get_proxy_manager(env):
|
||||
proxy_url = f"{proxy_purl.scheme}://{proxy_purl.host}"
|
||||
|
||||
proxy_headers = urllib3.make_headers(
|
||||
proxy_basic_auth=proxy_purl.auth,
|
||||
user_agent=f"osc/{__version__}",
|
||||
)
|
||||
|
||||
proxy_basic_auth = urllib.parse.unquote(proxy_purl.auth)
|
||||
proxy_basic_auth = proxy_basic_auth.encode("utf-8")
|
||||
proxy_basic_auth = base64.b64encode(proxy_basic_auth).decode()
|
||||
proxy_headers["Proxy-Authorization"] = f"Basic {proxy_basic_auth:s}"
|
||||
|
||||
manager = urllib3.ProxyManager(proxy_url, proxy_headers=proxy_headers)
|
||||
return manager
|
||||
|
||||
@ -549,7 +553,12 @@ class BasicAuthHandler(AuthHandlerBase):
|
||||
return False
|
||||
if not self.user or not self.password:
|
||||
return False
|
||||
request_headers.update(urllib3.make_headers(basic_auth=f"{self.user}:{self.password}"))
|
||||
|
||||
basic_auth = f"{self.user:s}:{self.password:s}"
|
||||
basic_auth = basic_auth.encode("utf-8")
|
||||
basic_auth = base64.b64encode(basic_auth).decode()
|
||||
request_headers["Authorization"] = f"Basic {basic_auth:s}"
|
||||
|
||||
return True
|
||||
|
||||
def process_response(self, url, request_headers, response):
|
||||
|
@ -36,6 +36,11 @@ class _LazyPassword:
|
||||
self._password = password
|
||||
return self._password
|
||||
|
||||
def __format__(self, format_spec):
|
||||
if format_spec.endswith("s"):
|
||||
return f"{self.__str__():{format_spec}}"
|
||||
return super().__format__(format_spec)
|
||||
|
||||
def __len__(self):
|
||||
return len(str(self))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user