From 595aa7563efc94f806ef519d25463a3207f2746d Mon Sep 17 00:00:00 2001 From: Victor Zhestkov Date: Mon, 10 Mar 2025 10:13:39 +0100 Subject: [PATCH] Add DEB822 apt source format support (#692) Co-authored-by: Marek Czernek --- salt/modules/aptpkg.py | 123 ++++++++++++++++++---- tests/pytests/unit/modules/test_aptpkg.py | 122 +++++++++++++++++++++ 2 files changed, 225 insertions(+), 20 deletions(-) diff --git a/salt/modules/aptpkg.py b/salt/modules/aptpkg.py index cd40aea54f..48d2ccb904 100644 --- a/salt/modules/aptpkg.py +++ b/salt/modules/aptpkg.py @@ -59,6 +59,16 @@ try: except ImportError: HAS_APT = False +HAS_DEB822 = False + +if HAS_APT: + try: + from aptsources.sourceslist import Deb822SourceEntry, _deb822 + + HAS_DEB822 = True + except ImportError: + pass + try: import apt_pkg @@ -1907,8 +1917,11 @@ def list_repos(**kwargs): salt '*' pkg.list_repos disabled=True """ repos = {} - sources = SourcesList() - for source in sources.list: + if HAS_DEB822: + sources = SourcesList(deb822=True) + else: + sources = SourcesList() + for source in sources: if _skip_source(source): continue if not HAS_APT: @@ -1916,19 +1929,40 @@ def list_repos(**kwargs): else: signedby = _get_opts(line=source.line)["signedby"].get("value", "") repo = {} + if HAS_DEB822: + try: + signedby = source.section.tags.get("Signed-By", signedby) + except AttributeError: + pass repo["file"] = source.file - repo["comps"] = getattr(source, "comps", []) + repo_comps = getattr(source, "comps", []) + repo_dists = source.dist.split(" ") + repo["comps"] = repo_comps repo["disabled"] = source.disabled repo["enabled"] = not repo[ "disabled" ] # This is for compatibility with the other modules - repo["dist"] = source.dist + repo["dist"] = repo_dists.pop(0) repo["type"] = source.type repo["uri"] = source.uri - repo["line"] = source.line.strip() + if "Types: " in source.line and "\n" in source.line: + repo["line"] = ( + f"{source.type} {source.uri} {repo['dist']} {' '.join(repo_comps)}" + ) + else: + 
repo["line"] = source.line.strip() repo["architectures"] = getattr(source, "architectures", []) repo["signedby"] = signedby repos.setdefault(source.uri, []).append(repo) + if len(repo_dists): + for dist in repo_dists: + repo_copy = repo.copy() + repo_copy["dist"] = dist + if "Types: " in source.line and "\n" in source.line: + repo_copy["line"] = ( + f"{source.type} {source.uri} {repo_copy['dist']} {' '.join(repo_comps)}" + ) + repos[source.uri].append(repo_copy) return repos @@ -1937,12 +1971,17 @@ def get_repo(repo, **kwargs): Display a repo from the sources.list / sources.list.d The repo passed in needs to be a complete repo entry. + When the system uses a repository in the deb822 format, + get_repo uses a partial match of distributions. + + In that case, include any distribution of the deb822 + repository in the repo name to match that repo. CLI Examples: .. code-block:: bash - salt '*' pkg.get_repo "myrepo definition" + salt '*' pkg.get_repo "deb URL noble main" """ ppa_auth = kwargs.get("ppa_auth", None) # we have to be clever about this since the repo definition formats @@ -2021,11 +2060,17 @@ def del_repo(repo, **kwargs): The repo passed in must be a fully formed repository definition string. + When the system uses a repository in the deb822 format, + del_repo uses a partial match of distributions. + + In that case, include any distribution of the deb822 + repository in the repo name to match that repo. + CLI Examples: .. 
code-block:: bash - salt '*' pkg.del_repo "myrepo definition" + salt '*' pkg.del_repo "deb URL noble main" """ is_ppa = False if repo.startswith("ppa:") and __grains__["os"] in ("Ubuntu", "Mint", "neon"): @@ -2047,7 +2092,10 @@ def del_repo(repo, **kwargs): else: repo = softwareproperties.ppa.expand_ppa_line(repo, dist)[0] - sources = SourcesList() + if HAS_DEB822: + sources = SourcesList(deb822=True) + else: + sources = SourcesList() repos = [s for s in sources.list if not s.invalid] if repos: deleted_from = dict() @@ -2070,12 +2118,14 @@ def del_repo(repo, **kwargs): source.type == repo_type and source.architectures == repo_architectures and source.uri == repo_uri - and source.dist == repo_dist + and repo_dist in source.dist ): s_comps = set(source.comps) r_comps = set(repo_comps) - if s_comps.intersection(r_comps): + if s_comps.intersection(r_comps) or ( + s_comps == set() and r_comps == set() + ): deleted_from[source.file] = 0 source.comps = list(s_comps.difference(r_comps)) if not source.comps: @@ -2551,6 +2601,12 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): ``ppa:<project>/repo`` format is acceptable. ``ppa:<project>`` format can only be used to create a new repository. + When the system uses a repository in the deb822 format, mod_repo uses a partial + match of distributions. + + In that case, include any distribution of the deb822 repository in the + repo definition to match that repo. + The following options are available to modify a repo definition: architectures @@ -2605,8 +2661,8 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): .. 
code-block:: bash - salt '*' pkg.mod_repo 'myrepo definition' uri=http://new/uri - salt '*' pkg.mod_repo 'myrepo definition' comps=main,universe + salt '*' pkg.mod_repo 'deb URL noble main' uri=http://new/uri + salt '*' pkg.mod_repo 'deb URL noble main' comps=main,universe """ if "refresh_db" in kwargs: refresh = kwargs["refresh_db"] @@ -2726,7 +2782,10 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): 'cannot parse "ppa:" style repo definitions: {}'.format(repo) ) - sources = SourcesList() + if HAS_DEB822: + sources = SourcesList(deb822=True) + else: + sources = SourcesList() if kwargs.get("consolidate", False): # attempt to de-dup and consolidate all sources # down to entries in sources.list @@ -2743,11 +2802,14 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): repos = [] for source in sources: - if HAS_APT: + if HAS_APT and not HAS_DEB822: _, invalid, _, _ = _invalid(source.line) if not invalid: repos.append(source) else: + if HAS_DEB822 and source.types == [""]: + # most probably invalid or comment line + continue repos.append(source) mod_source = None @@ -2906,10 +2968,11 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): # and the resulting source line. The idea here is to ensure # we are not returning bogus data because the source line # has already been modified on a previous run. 
+ apt_source_dists = apt_source.dist.split(" ") repo_matches = ( apt_source.type == repo_type and apt_source.uri.rstrip("/") == repo_uri.rstrip("/") - and apt_source.dist == repo_dist + and repo_dist in apt_source_dists ) kw_matches = apt_source.dist == kw_dist and apt_source.type == kw_type @@ -2928,7 +2991,18 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): kwargs["comments"] = salt.utils.pkg.deb.combine_comments(kwargs["comments"]) if not mod_source: - mod_source = SourceEntry(repo) + if HAS_DEB822: + apt_source_file = kwargs.get("file") + section = _deb822.Section("") + section["Types"] = repo_type + section["URIs"] = repo_uri + section["Suites"] = repo_dist + section["Components"] = " ".join(repo_comps) + if kwargs.get("trusted") == True or kwargs.get("Trusted") == True: + section["Trusted"] = "yes" + mod_source = Deb822SourceEntry(section, apt_source_file) + else: + mod_source = SourceEntry(repo) if "comments" in kwargs: mod_source.comment = kwargs["comments"] sources.list.append(mod_source) @@ -2950,7 +3024,8 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): if mod_source.uri != repo_uri: mod_source.uri = repo_uri - mod_source.line = mod_source.str() + if not HAS_DEB822: + mod_source.line = mod_source.str() sources.save() # on changes, explicitly refresh @@ -2962,15 +3037,20 @@ def mod_repo(repo, saltenv="base", aptkey=True, **kwargs): else: signedby = _get_opts(repo)["signedby"].get("value", "") + repo_source_line = mod_source.line + if "Types: " in repo_source_line and "\n" in repo_source_line: + repo_source_line = f"{mod_source.type} {mod_source.uri} {repo_dist} {' '.join(mod_source.comps)}" + return { repo: { "architectures": getattr(mod_source, "architectures", []), + "dist": mod_source.dist, "comps": mod_source.comps, "disabled": mod_source.disabled, "file": mod_source.file, "type": mod_source.type, "uri": mod_source.uri, - "line": mod_source.line, + "line": repo_source_line, "signedby": signedby, } } @@ -3055,7 +3135,10 @@ def 
_expand_repo_def(os_name, os_codename=None, **kwargs): if kwarg in kwargs: setattr(source_entry, kwarg, kwargs[kwarg]) - source_list = SourcesList() + if HAS_DEB822: + source_list = SourcesList(deb822=True) + else: + source_list = SourcesList() kwargs = {} if not HAS_APT: signedby = source_entry.signedby @@ -3083,7 +3166,7 @@ def _expand_repo_def(os_name, os_codename=None, **kwargs): sanitized["dist"] = _source_entry.dist sanitized["type"] = _source_entry.type sanitized["uri"] = _source_entry.uri - sanitized["line"] = _source_entry.line.strip() + sanitized["line"] = getattr(_source_entry, "line", "").strip() sanitized["architectures"] = getattr(_source_entry, "architectures", []) sanitized["signedby"] = signedby if HAS_APT and signedby: diff --git a/tests/pytests/unit/modules/test_aptpkg.py b/tests/pytests/unit/modules/test_aptpkg.py index 6f0b905ef7..4975a78c38 100644 --- a/tests/pytests/unit/modules/test_aptpkg.py +++ b/tests/pytests/unit/modules/test_aptpkg.py @@ -42,6 +42,25 @@ try: except ImportError: HAS_APTSOURCES = False +HAS_DEB822 = False + +if HAS_APT: + try: + from aptsources.sourceslist import Deb822SourceEntry, _deb822 # pylint: disable=unused-import + + HAS_DEB822 = True + except ImportError: + pass + +HAS_APT_PKG = False + +try: + import apt_pkg + + HAS_APT_PKG = True +except ImportError: + pass + log = logging.getLogger(__name__) @@ -216,6 +235,8 @@ class MockSourceEntry: self.comps = [] self.architectures = [] self.signedby = "" + if HAS_DEB822: + self.types = [] def mysplit(self, line): return line.split() @@ -237,6 +258,107 @@ def configure_loader_modules(): return {aptpkg: {"__grains__": {}}} +@pytest.fixture +def deb822_repo_content(): + return """ +Types: deb +URIs: http://cz.archive.ubuntu.com/ubuntu/ +Suites: noble noble-updates noble-backports +Components: main +Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gpg +""" + + +@pytest.fixture +def deb822_repo_file(tmp_path: pathlib.Path, deb822_repo_content: str): + """ + Create a 
Debian-style repository in the deb822 format and return + the path of the repository file. + """ + repo = tmp_path / "sources.list.d" / "test.sources" + repo.parent.mkdir(parents=True, exist_ok=True) + repo.write_text(deb822_repo_content.strip(), encoding="UTF-8") + return repo + + +@pytest.fixture +def mock_apt_config(deb822_repo_file: pathlib.Path): + """ + Mocking common to deb822 testing so that apt_pkg uses the + tmp_path/sources.list.d as the Dir::Etc::sourceparts location + """ + with patch.dict( + aptpkg.__salt__, + {"config.option": MagicMock()}, + ), patch.object(apt_pkg, "config") as mock_config: + mock_config.find_file.return_value = "/etc/apt/sources.list" + mock_config.find_dir.return_value = os.path.dirname(str(deb822_repo_file)) + yield mock_config + + +@pytest.mark.skipif(not HAS_DEB822, reason="Requires deb822 support") +@pytest.mark.skipif(not HAS_APT_PKG, reason="Requires debian/ubuntu apt_pkg system library") +def test_mod_repo_deb822_modify(deb822_repo_file: pathlib.Path, mock_apt_config): + """ + Test that aptpkg can modify an existing repository in the deb822 format. + In this test, we match the repository by name and disable it. + """ + uri = "http://cz.archive.ubuntu.com/ubuntu/" + repo = f"deb {uri} noble main" + + aptpkg.mod_repo(repo, enabled=False, file=str(deb822_repo_file), refresh_db=False) + + repo_file = deb822_repo_file.read_text(encoding="UTF-8") + assert "Enabled: no" in repo_file + assert f"URIs: {uri}" in repo_file + + +@pytest.mark.skipif(not HAS_DEB822, reason="Requires deb822 support") +@pytest.mark.skipif(not HAS_APT_PKG, reason="Requires debian/ubuntu apt_pkg system library") +def test_mod_repo_deb822_add(deb822_repo_file: pathlib.Path, mock_apt_config): + """ + Test that aptpkg can add a repository in the deb822 format. 
+ """ + uri = "http://security.ubuntu.com/ubuntu/" + repo = f"deb {uri} noble-security main" + + aptpkg.mod_repo(repo, file=str(deb822_repo_file), refresh_db=False) + + repo_file = deb822_repo_file.read_text(encoding="UTF-8") + assert f"URIs: {uri}" in repo_file + assert "URIs: http://cz.archive.ubuntu.com/ubuntu/" in repo_file + + +@pytest.mark.skipif(not HAS_DEB822, reason="Requires deb822 support") +@pytest.mark.skipif(not HAS_APT_PKG, reason="Requires debian/ubuntu apt_pkg system library") +def test_del_repo_deb822(deb822_repo_file: pathlib.Path, mock_apt_config): + """ + Test that aptpkg can delete a repository in the deb822 format. + """ + uri = "http://cz.archive.ubuntu.com/ubuntu/" + repo = f"deb {uri} noble main" + + with patch.object(aptpkg, "refresh_db"): + aptpkg.del_repo(repo, file=str(deb822_repo_file)) + + assert not os.path.isfile(str(deb822_repo_file)) + + +@pytest.mark.skipif(not HAS_DEB822, reason="Requires deb822 support") +@pytest.mark.skipif(not HAS_APT_PKG, reason="Requires debian/ubuntu apt_pkg system library") +def test_get_repo_deb822(deb822_repo_file: pathlib.Path, mock_apt_config): + """ + Test that aptpkg can match a repository in the deb822 format. + """ + uri = "http://cz.archive.ubuntu.com/ubuntu/" + repo = f"deb {uri} noble main" + + result = aptpkg.get_repo(repo) + + assert bool(result) + assert result["uri"] == uri + + def test_version(lowpkg_info_var): """ Test - Returns a string representing the package version or an empty string if -- 2.48.1