#!/usr/bin/env python3 # -*- coding: utf-8 -*- # vim: set noexpandtab tabstop=4 shiftwidth=4 autoindent smartindent: # Searches over the network to find out which RPM package distributes # a given file. # # Written by Lucas C. Villa Real # Released under the GNU GPL version 2 or above. import os import re import sys import argparse import subprocess from html.parser import HTMLParser class VersionCmp: def __init__(self, obj, *args): self.obj = obj def __lt__(self, other): return self.compare(self.obj, other.obj) < 0 def __gt__(self, other): return self.compare(self.obj, other.obj) > 0 def __eq__(self, other): return self.compare(self.obj, other.obj) == 0 def __le__(self, other): return self.compare(self.obj, other.obj) <= 0 def __ge__(self, other): return self.compare(self.obj, other.obj) >= 0 def __ne__(self, other): return self.compare(self.obj, other.obj) != 0 def test(self, candidate, reference): if candidate[0].isalpha() and reference[0].isalpha(): return self.compare(candidate, reference) # Handle majors such as "1:xxx.yyy" if ":" in candidate and ":" in reference: pass elif ":" in candidate: candidate = ":".join(candidate.split(":")[1:]) elif ":" in reference: reference = ":".join(reference.split(":")[1:]) c_parts = candidate.split(".") r_parts = reference.split(".") while True: if len(c_parts) <= 1 or len(r_parts) <= 1: # Return a comparison of the major numbers try: a = int("".join(c for c in c_parts[0] if c.isdigit())) b = int("".join(r for r in r_parts[0] if r.isdigit())) a_chars = "".join(c for c in c_parts[0] if not c.isdigit()) b_chars = "".join(r for r in r_parts[0] if not r.isdigit()) if a == b and len(a_chars) and len(b_chars): return self.compare(a_chars, b_chars) except ValueError: pass return self.compare(".".join(c_parts), ".".join(r_parts)) else: # compare 2 numbers try: a = int("".join(c for c in c_parts[0] if c.isdigit())) b = int("".join(r for r in r_parts[0] if r.isdigit())) ret = self.compare(a, b) if ret != 0: return ret except ValueError: return self.compare(".".join(c_parts), ".".join(r_parts)) c_parts = c_parts[1:] r_parts = r_parts[1:] def compare(self, a, b): if a == b: return 0 return -1 if a < b else 1 def cmp_to_key(): return VersionCmp class PackageInfo: def __init__(self): self.name = "" # package name self.filter = "" # version-based filter passed by the user (if any) self.versions = [] # list of versions found self.infopages = [] # URL where further package details are given self.urls = [] # candidate urls self.best = -1 # index of best choice class RPMFind_Parser(HTMLParser): ''' Parses the HTML data output by rpmfind.net ''' def __init__(self): self.tags = [] self.attrs = [] self.names = [] self.versions = [] self.releases = [] self.infopages = [] self.candidates = {} HTMLParser.__init__(self) def handle_starttag(self, tag, attrs): self.tags.append(tag) self.attrs.append(attrs) def handle_endtag(self, tag): self.tags.pop() self.attrs.pop() def handle_data(self, data): if len(self.tags) and self.tags[-1] == "a" and data.find(".rpm") >= 0 and data.find(".src.rpm") < 0: href = self.attrs[-1][0][1].replace("\\", "").replace("'", "") self.candidates[data] = href elif len(self.tags) and self.tags[-1] == "a" and data.find(".html") >= 0 and data.find(".src.html") < 0: # self.attrs[-1] = [("href", "\\'/linux/RPM/fedora/....html\\'")] href = self.attrs[-1][0][1].replace("\\", "").replace("'", "") self.infopages.append(href) elif len(self.tags) and self.tags[-1] == "td" and data.find("Name:") >= 0: pkgname = data.replace("Name:", "").strip() self.names.append(pkgname) elif len(self.tags) and self.tags[-1] == "td" and data.find("Version:") >= 0: pkgversion = data.replace("Version:", "").strip() self.versions.append(pkgversion) elif len(self.tags) and self.tags[-1] == "td" and data.find("Release:") >= 0: pkgrelease = data.replace("Release:", "").strip() self.releases.append(pkgrelease) def get_pkginfo(self, baseuri=""): info = PackageInfo() info.name = "" if len(self.names) == 0 else self.names[0] info.infopages = list(self.infopages) info.versions = list(["{0}.{1}".format(i[0],i[1]) for i in zip(self.versions, self.releases)]) info.urls = [baseuri+self.candidates[k] for k in self.candidates.keys()] return info class RPMFinder: def __init__(self): self.baseuri = "http://rpmfind.net" def find(self, path, arch, distroname, distrocode): ''' Searches rpmfind.net for a given file. Arch and distro can be provided to narrow the search scope. Returns the package info on success or None if no matches were found. ''' self.path = path self.arch = arch self.distroname = distroname.replace(" ", "+") self.distrocode = distrocode requested_archs = self.arch.split(",") for archnum,arch in enumerate(requested_archs): pkginfo = self.__search_rpmfind_net(arch) if pkginfo is None: continue indexes = self.__filter_by_name(pkginfo.urls, pkginfo.name, arch) if len(indexes) == 0 and archnum == len(requested_archs)-1: # User possibly requested more than one architecture (e.g., "noarch,x86_64") # and we had no exact package name matches. Since the RPM database holds aliases # for several packages we must give a second chance to the results returned # by our call to search_rpmfind_net(). indexes = range(0,len(pkginfo.urls)) if len(indexes) == 0: continue if any(op in self.path for op in [">", "<", "="]): op, version = self.__path_op_and_version() pkginfo.filter = "{0} {1}".format(op, version) for i in self.__filter_by_version(pkginfo, indexes, op, version, arch): if pkginfo.best < 0 or VersionCmp(None).test(pkginfo.versions[best], pkginfo.versions[i]) > 0: pkginfo.best = i else: # Not sure what's best to do other than returning the first match. pkginfo.best = 0 if pkginfo.best >= 0: return pkginfo return None def __filter_by_name(self, pkgnames, name, arch): # Compile a regex that catches package names derived from the basename given by self.path. # Example: perl-DBICx when perl-DBI is wanted. regex = re.compile(r"{0}\-[0-9]+.*{1}.{2}.rpm".format(name, self.distrocode, arch)) indexes = [] for i,pkgname in enumerate(pkgnames): if regex.match(os.path.basename(pkgname)): indexes.append(i) return indexes def __filter_by_version(self, pkginfo, indexes, op, version, arch): filtered_indexes = [] for i in indexes: match = pkginfo.urls[i] pkg_version = os.path.basename(match).replace(pkginfo.name, "") pkg_version = pkg_version.replace("{0}.{1}.rpm".format(self.distrocode, arch), "") pkg_version = pkg_version.strip("-").strip(".") vcmp = VersionCmp(None) if op == ">" and vcmp.test(pkg_version, version) > 0: filtered_indexes.append(i) elif op == ">=" and vcmp.test(pkg_version, version) >= 0: filtered_indexes.append(i) elif op == "=" and vcmp.test(pkg_version, version) == 0: filtered_indexes.append(i) elif op == "<" and vcmp.test(pkg_version, version) < 0: filtered_indexes.append(i) elif op == "<=" and vcmp.test(pkg_version, version) <= 0: filtered_indexes.append(i) return filtered_indexes def __path_op_and_version(self): if ">=" in self.path: return ">=", self.path.split(">=")[1].strip() elif ">" in self.path: return ">", self.path.split(">")[1].strip() elif "<=" in self.path: return "<=", self.path.split("<=")[1].strip() elif "<" in self.path: return "<", self.path.split("<")[1].strip() elif "=" in self.path: return "=", self.path.split("=")[1].strip() else: sys.stderr.write("could not extract op and version from {}\n".format(self.path)) return None, None def __search_rpmfind_net(self, arch): path = self.path.replace("/", "%2F") query = "/linux/rpm2html/search.php" query += "?query={0}&submit=Search+...&system={1}&arch={2}".format(path, self.distroname, arch) htmlparser = RPMFind_Parser() try: html = subprocess.check_output(["wget", "--quiet", "{0}{1}".format(self.baseuri, query), "-O", "-"]) htmlparser.feed(str(html)) except subprocess.CalledProcessError: # Fatal error sys.stderr.write("error retrieving url {0}{1}\n".format(self.baseuri, query)) return None for infopage in htmlparser.get_pkginfo().infopages: try: html = subprocess.check_output(["wget", "--quiet", "{0}{1}".format(self.baseuri, infopage), "-O", "-"]) htmlparser.feed(str(html)) except subprocess.CalledProcessError: # Non-fatal error sys.stderr.write("error retrieving url {0}{1}\n".format(self.baseuri, infopage)) return htmlparser.get_pkginfo(self.baseuri) def main(): argparser = argparse.ArgumentParser(argument_default="") argparser.add_argument("--path", type=str, help="File name to search for in the remote RPM databases") argparser.add_argument("--arch", type=str, help="Architecture (optional)") argparser.add_argument("--distroname", type=str, help="Distribution name (optional)") argparser.add_argument("--distrocode", type=str, help="Distribution code (optional)") args = argparser.parse_args() if len(args.path) == 0: argparser.print_help() sys.exit(1) pkginfo = RPMFinder().find(args.path, args.arch, args.distroname, args.distrocode) if pkginfo and pkginfo.best >= 0: if len(pkginfo.filter): print("{0} {1} # {2}".format(pkginfo.name, pkginfo.filter, pkginfo.urls[pkginfo.best])) else: print("{0} # {1}".format(pkginfo.name, pkginfo.urls[pkginfo.best])) if __name__ == "__main__": main()