"""Logic for dealing with sphinx style inventories (e.g. `objects.inv`).These contain mappings of reference names to ids, scoped by domain and object type.This is adapted from the Sphinx inventory.py module.We replicate it here, so that it can be used without Sphinx."""from__future__importannotationsimportargparseimportfunctoolsimportjsonimportreimportzlibfromcollections.abcimportIteratorfromdataclassesimportasdict,dataclassfromtypingimportIO,TYPE_CHECKING,TypedDictfromurllib.requestimporturlopenimportyamlifTYPE_CHECKING:# domain_type:object_type -> name -> (project, version, loc, text)# the `loc` includes the base url, also null `text` is denoted by "-"fromsphinx.util.typingimportInventoryasSphinxInventoryType
[文档]classInventoryItemType(TypedDict):"""A single inventory item."""loc:str"""The location of the item (relative if base_url not None)."""text:str|None"""Implicit text to show for the item."""
[文档]classInventoryType(TypedDict):"""Inventory data."""name:str"""The name of the project."""version:str"""The version of the project."""base_url:str|None"""The base URL of the `loc`."""objects:dict[str,dict[str,dict[str,InventoryItemType]]]"""Mapping of domain -> object type -> name -> item."""
[文档]deffrom_sphinx(inv:SphinxInventoryType)->InventoryType:"""Convert from a Sphinx compliant format."""project=""version=""objs:dict[str,dict[str,dict[str,InventoryItemType]]]={}fordomain_obj_name,dataininv.items():if":"notindomain_obj_name:continuedomain_name,obj_type=domain_obj_name.split(":",1)objs.setdefault(domain_name,{}).setdefault(obj_type,{})forrefname,refdataindata.items():project,version,uri,text=refdataobjs[domain_name][obj_type][refname]={"loc":uri,"text":Noneif(nottextortext=="-")elsetext,}return{"name":project,"version":version,"base_url":None,"objects":objs,}
[文档]defto_sphinx(inv:InventoryType)->SphinxInventoryType:"""Convert to a Sphinx compliant format."""objs:SphinxInventoryType={}fordomain_name,obj_typesininv["objects"].items():forobj_type,refsinobj_types.items():forrefname,refdatainrefs.items():objs.setdefault(f"{domain_name}:{obj_type}",{})[refname]=(inv["name"],inv["version"],refdata["loc"],refdata["text"]or"-",)returnobjs
[文档]defload(stream:IO,base_url:str|None=None)->InventoryType:"""Load inventory data from a stream."""reader=InventoryFileReader(stream)line=reader.readline().rstrip()ifline=="# Sphinx inventory version 1":return_load_v1(reader,base_url)elifline=="# Sphinx inventory version 2":return_load_v2(reader,base_url)else:raiseValueError(f"invalid inventory header: {line}")
def_load_v1(stream:InventoryFileReader,base_url:str|None)->InventoryType:"""Load inventory data (format v1) from a stream."""projname=stream.readline().rstrip()[11:]version=stream.readline().rstrip()[11:]invdata:InventoryType={"name":projname,"version":version,"base_url":base_url,"objects":{},}forlineinstream.readlines():name,objtype,location=line.rstrip().split(None,2)# version 1 did not add anchors to the locationdomain="py"ifobjtype=="mod":objtype="module"location+="#module-"+nameelse:location+="#"+nameinvdata["objects"].setdefault(domain,{}).setdefault(objtype,{})invdata["objects"][domain][objtype][name]={"loc":location,"text":None}returninvdatadef_load_v2(stream:InventoryFileReader,base_url:str|None)->InventoryType:"""Load inventory data (format v2) from a stream."""projname=stream.readline().rstrip()[11:]version=stream.readline().rstrip()[11:]invdata:InventoryType={"name":projname,"version":version,"base_url":base_url,"objects":{},}line=stream.readline()if"zlib"notinline:raiseValueError(f"invalid inventory header (not compressed): {line}")forlineinstream.read_compressed_lines():# be careful to handle names with embedded spaces correctlym=re.match(r"(?x)(.+?)\s+(\S+)\s+(-?\d+)\s+?(\S*)\s+(.*)",line.rstrip())ifnotm:continuename:strtype:strname,type,_,location,text=m.groups()if":"notintype:# wrong type value. type should be in the form of "{domain}:{objtype}"## Note: To avoid the regex DoS, this is implemented in python (refs: #8175)continueif(type=="py:module"andtypeininvdata["objects"]andnameininvdata["objects"][type]):# due to a bug in 1.1 and below,# two inventory entries are created# for Python modules, and the first# one is correctcontinueiflocation.endswith("$"):location=location[:-1]+namedomain,objtype=type.split(":",1)invdata["objects"].setdefault(domain,{}).setdefault(objtype,{})ifnottextortext=="-":text=Noneinvdata["objects"][domain][objtype][name]={"loc":location,"text":text}returninvdata_BUFSIZE=16*1024
[文档]classInventoryFileReader:"""A file reader for an inventory file. This reader supports mixture of texts and compressed texts. """def__init__(self,stream:IO)->None:self.stream=streamself.buffer=b""self.eof=False
@functools.lru_cache(maxsize=256)def_create_regex(pat:str)->re.Pattern[str]:r"""Create a regex from a pattern, that can include `*` wildcards, to match 0 or more characters. `\*` is translated as a literal `*`. """regex=""backslash_last=Falseforcharinpat:ifbackslash_lastandchar=="*":regex+=re.escape(char)backslash_last=Falsecontinueifbackslash_last:regex+=re.escape("\\")backslash_last=Falseifchar=="\\":backslash_last=Truecontinueifchar=="*":regex+=".*"continueregex+=re.escape(char)returnre.compile(regex)
[文档]defmatch_with_wildcard(name:str,pattern:str|None)->bool:r"""Match a whole name with a pattern, that can include `*` wildcards, to match 0 or more characters. To include a literal `*` in the pattern, use `\*`. """ifpatternisNone:returnTrueregex=_create_regex(pattern)returnregex.fullmatch(name)isnotNone
[文档]@dataclassclassInvMatch:"""A match from an inventory."""inv:strdomain:strotype:strname:strproject:strversion:strbase_url:str|Noneloc:strtext:str|None
[文档]deffilter_inventories(inventories:dict[str,InventoryType],*,invs:str|None=None,domains:str|None=None,otypes:str|None=None,targets:str|None=None,)->Iterator[InvMatch]:r"""Filter a set of inventories. Filters are strings that can include `*` wildcards, to match 0 or more characters. To include a literal `*` in the pattern, use `\*`. :param inventories: Mapping of inventory name to inventory data :param invs: the inventory key filter :param domains: the domain name filter :param otypes: the object type filter :param targets: the target name filter """forinv_name,inv_dataininventories.items():ifnotmatch_with_wildcard(inv_name,invs):continuefordomain_name,dom_dataininv_data["objects"].items():ifnotmatch_with_wildcard(domain_name,domains):continueforobj_type,obj_dataindom_data.items():ifnotmatch_with_wildcard(obj_type,otypes):continuefortarget,item_datainobj_data.items():ifmatch_with_wildcard(target,targets):yieldInvMatch(inv=inv_name,domain=domain_name,otype=obj_type,name=target,project=inv_data["name"],version=inv_data["version"],base_url=inv_data["base_url"],loc=item_data["loc"],text=item_data["text"],)
[文档]deffilter_sphinx_inventories(inventories:dict[str,SphinxInventoryType],*,invs:str|None=None,domains:str|None=None,otypes:str|None=None,targets:str|None=None,)->Iterator[InvMatch]:r"""Filter a set of sphinx style inventories. Filters are strings that can include `*` wildcards, to match 0 or more characters. To include a literal `*` in the pattern, use `\*`. :param inventories: Mapping of inventory name to inventory data :param invs: the inventory key filter :param domains: the domain name filter :param otypes: the object type filter :param targets: the target name filter """forinv_name,inv_dataininventories.items():ifnotmatch_with_wildcard(inv_name,invs):continuefordomain_obj_name,dataininv_data.items():if":"notindomain_obj_name:continuedomain_name,obj_type=domain_obj_name.split(":",1)ifnot(match_with_wildcard(domain_name,domains)andmatch_with_wildcard(obj_type,otypes)):continuefortargetindata:ifmatch_with_wildcard(target,targets):project,version,loc,text=data[target]yield(InvMatch(inv=inv_name,domain=domain_name,otype=obj_type,name=target,project=project,version=version,base_url=None,loc=loc,text=Noneif(nottextortext=="-")elsetext,))
[文档]deffilter_string(invs:str|None,domains:str|None,otype:str|None,target:str|None,*,delimiter:str=":",)->str:"""Create a string representation of the filter, from the given arguments."""str_items=[]foritemin(invs,domains,otype,target):ifitemisNone:str_items.append("*")elifdelimiterinitem:str_items.append(f'"{item}"')else:str_items.append(f"{item}")returndelimiter.join(str_items)
[文档]deffetch_inventory(uri:str,*,timeout:None|float=None,base_url:None|str=None)->InventoryType:"""Fetch an inventory from a URL or local path."""ifuri.startswith(("http://","https://")):withurlopen(uri,timeout=timeout)asstream:returnload(stream,base_url=base_url)withopen(uri,"rb")asstream:returnload(stream,base_url=base_url)
[文档]definventory_cli(inputs:None|list[str]=None):"""Command line interface for fetching and parsing an inventory."""parser=argparse.ArgumentParser(description="Parse an inventory file.")parser.add_argument("uri",metavar="[URL|PATH]",help="URI of the inventory file")parser.add_argument("-d","--domain",metavar="DOMAIN",default="*",help="Filter the inventory by domain (`*` = wildcard)",)parser.add_argument("-o","--object-type",metavar="TYPE",default="*",help="Filter the inventory by object type (`*` = wildcard)",)parser.add_argument("-n","--name",metavar="NAME",default="*",help="Filter the inventory by reference name (`*` = wildcard)",)parser.add_argument("-l","--loc",metavar="LOC",help="Filter the inventory by reference location (`*` = wildcard)",)parser.add_argument("-f","--format",choices=["yaml","json"],default="yaml",help="Output format",)parser.add_argument("--timeout",type=float,metavar="SECONDS",help="Timeout for fetching the inventory",)args=parser.parse_args(inputs)base_url=Noneifargs.uri.startswith("http://")orargs.uri.startswith("https://"):try:withurlopen(args.uri,timeout=args.timeout)asstream:invdata=load(stream)base_url=args.uri.rsplit("/",1)[0]exceptException:withurlopen(args.uri+"/objects.inv",timeout=args.timeout)asstream:invdata=load(stream)base_url=args.urielse:withopen(args.uri,"rb")asstream:invdata=load(stream)filtered:InventoryType={"name":invdata["name"],"version":invdata["version"],"base_url":base_url,"objects":{},}formatchinfilter_inventories({"":invdata},domains=args.domain,otypes=args.object_type,targets=args.name,):ifargs.locandnotmatch_with_wildcard(match.loc,args.loc):continuefiltered["objects"].setdefault(match.domain,{}).setdefault(match.otype,{})[match.name]={"loc":match.loc,"text":match.text,}ifargs.format=="json":print(json.dumps(filtered,indent=2,sort_keys=False))else:print(yaml.dump(filtered,sort_keys=False))