Index: aiohttp-3.8.5/CHANGES/8317.bugfix.rst =================================================================== --- /dev/null +++ aiohttp-3.8.5/CHANGES/8317.bugfix.rst @@ -0,0 +1 @@ +Escaped filenames in static view -- by :user:`bdraco`. Index: aiohttp-3.8.5/aiohttp/web_urldispatcher.py =================================================================== --- aiohttp-3.8.5.orig/aiohttp/web_urldispatcher.py +++ aiohttp-3.8.5/aiohttp/web_urldispatcher.py @@ -1,7 +1,9 @@ import abc import asyncio import base64 +import functools import hashlib +import html import inspect import keyword import os @@ -87,6 +89,8 @@ PATH_SEP: Final[str] = re.escape("/") _ExpectHandler = Callable[[Request], Awaitable[None]] _Resolve = Tuple[Optional["UrlMappingMatchInfo"], Set[str]] +html_escape = functools.partial(html.escape, quote=True) + class _InfoDict(TypedDict, total=False): path: str @@ -696,7 +700,7 @@ class StaticResource(PrefixResource): assert filepath.is_dir() relative_path_to_dir = filepath.relative_to(self._directory).as_posix() - index_of = f"Index of /{relative_path_to_dir}" + index_of = f"Index of /{html_escape(relative_path_to_dir)}" h1 = f"

{index_of}

" index_list = [] @@ -704,7 +708,7 @@ class StaticResource(PrefixResource): for _file in sorted(dir_index): # show file url as relative to static path rel_path = _file.relative_to(self._directory).as_posix() - file_url = self._prefix + "/" + rel_path + quoted_file_url = _quote_path(f"{self._prefix}/{rel_path}") # if file is a directory, add '/' to the end of the name if _file.is_dir(): @@ -713,9 +717,7 @@ class StaticResource(PrefixResource): file_name = _file.name index_list.append( - '
  • {name}
  • '.format( - url=file_url, name=file_name - ) + f'
  • {html_escape(file_name)}
  • ' ) ul = "".format("\n".join(index_list)) body = f"\n{h1}\n{ul}\n" Index: aiohttp-3.8.5/tests/test_web_urldispatcher.py =================================================================== --- aiohttp-3.8.5.orig/tests/test_web_urldispatcher.py +++ aiohttp-3.8.5/tests/test_web_urldispatcher.py @@ -35,35 +35,42 @@ def tmp_dir_path(request): @pytest.mark.parametrize( - "show_index,status,prefix,data", + "show_index,status,prefix,request_path,data", [ - pytest.param(False, 403, "/", None, id="index_forbidden"), + pytest.param(False, 403, "/", "/", None, id="index_forbidden"), pytest.param( True, 200, "/", - b"\n\nIndex of /.\n" - b"\n\n

    Index of /.

    \n\n\n", - id="index_root", + "/", + b"\n\nIndex of /.\n\n\n

    Index of" + b' /.

    \n\n\n", ), pytest.param( True, 200, "/static", - b"\n\nIndex of /.\n" - b"\n\n

    Index of /.

    \n\n\n", + "/static", + b"\n\nIndex of /.\n\n\n

    Index of" + b' /.

    \n\n\n', id="index_static", ), + pytest.param( + True, + 200, + "/static", + "/static/my_dir", + b"\n\nIndex of /my_dir\n\n\n

    " + b'Index of /my_dir

    \n\n\n", + id="index_subdir", + ), ], ) async def test_access_root_of_static_handler( - tmp_dir_path, aiohttp_client, show_index, status, prefix, data + tmp_dir_path, aiohttp_client, show_index, status, prefix, request_path, data ) -> None: # Tests the operation of static file server. # Try to access the root of static file server, and make @@ -88,7 +95,7 @@ async def test_access_root_of_static_han client = await aiohttp_client(app) # Request the root of the static directory. - r = await client.get(prefix) + r = await client.get(request_path) assert r.status == status if data: @@ -97,6 +104,92 @@ async def test_access_root_of_static_han assert read_ == data +@pytest.mark.skipif( + not sys.platform.startswith("linux"), + reason="Invalid filenames on some filesystems (like Windows)", +) +@pytest.mark.parametrize( + "show_index,status,prefix,request_path,data", + [ + pytest.param(False, 403, "/", "/", None, id="index_forbidden"), + pytest.param( + True, + 200, + "/", + "/", + b"\n\nIndex of /.\n\n\n

    Index of" + b' /.

    \n\n\n", + ), + pytest.param( + True, + 200, + "/static", + "/static", + b"\n\nIndex of /.\n\n\n

    Index of" + b' /.

    \n\n\n", + id="index_static", + ), + pytest.param( + True, + 200, + "/static", + "/static/.dir", + b"\n\nIndex of /<img src=0 onerror=alert(1)>.dir</t" + b"itle>\n</head>\n<body>\n<h1>Index of /<img src=0 onerror=alert(1)>.di" + b'r</h1>\n<ul>\n<li><a href="/static/%3Cimg%20src=0%20onerror=alert(1)%3E.di' + b'r/my_file_in_dir">my_file_in_dir</a></li>\n</ul>\n</body>\n</html>', + id="index_subdir", + ), + ], +) +async def test_access_root_of_static_handler_xss( + tmp_path, + aiohttp_client, + show_index, + status, + prefix, + request_path, + data, +) -> None: + # Tests the operation of static file server. + # Try to access the root of static file server, and make + # sure that correct HTTP statuses are returned depending if we directory + # index should be shown or not. + # Ensure that html in file names is escaped. + # Ensure that links are url quoted. + my_file = tmp_path / "<img src=0 onerror=alert(1)>.txt" + my_dir = tmp_path / "<img src=0 onerror=alert(1)>.dir" + my_dir.mkdir() + my_file_in_dir = my_dir / "my_file_in_dir" + + with my_file.open("w") as fw: + fw.write("hello") + + with my_file_in_dir.open("w") as fw: + fw.write("world") + + app = web.Application() + + # Register global static route: + app.router.add_static(prefix, str(tmp_path), show_index=show_index) + client = await aiohttp_client(app) + + # Request the root of the static directory. + async with await client.get(request_path) as r: + assert r.status == status + + if data: + assert r.headers["Content-Type"] == "text/html; charset=utf-8" + read_ = await r.read() + assert read_ == data + + async def test_follow_symlink(tmp_dir_path, aiohttp_client) -> None: # Tests the access to a symlink, in static folder data = "hello world"