"""Parser for directive options.This is a highly restricted parser for YAML,which only allows a subset of YAML to be used for directive options:- Only block mappings are allowed at the top level- Mapping keys are parsed as strings (plain or quoted)- Mapping values are parsed as strings (plain, quoted, literal `|`, folded `>`)- `#` Comments are allowed and blank linesAdapted from:https://github.com/yaml/pyyaml/commit/957ae4d495cf8fcb5475c6c2f1bce801096b68a5For a good description of multi-line YAML strings, see:https://stackoverflow.com/a/21699210/5033292"""from__future__importannotationsfromcollections.abcimportIterablefromdataclassesimportdataclass,replacefromtypingimportClassVar,Final,Literal,cast
@dataclass
class Position:
    """Position of a character in a stream."""

    index: int
    line: int
    column: int

class StreamBuffer:
    """A buffer for a stream of characters."""

    def __init__(self, stream: str):
        self._buffer = stream + _CHARS_END
        self._index = 0
        self._line = 0
        self._column = 0

    @property
    def index(self) -> int:
        return self._index

    @property
    def line(self) -> int:
        return self._line

    @property
    def column(self) -> int:
        return self._column
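
    # NOTE: the cursor methods below (`peek`, `prefix`, `forward`,
    # `get_position`) are used throughout this module but were missing here;
    # this is a minimal reconstruction inferred from their call sites.

    def peek(self, index: int = 0) -> str:
        """Return the character `index` characters past the pointer."""
        return self._buffer[self._index + index]

    def prefix(self, length: int = 1) -> str:
        """Return the next `length` characters from the pointer."""
        return self._buffer[self._index : self._index + length]

    def forward(self, length: int = 1) -> None:
        """Move the pointer forward `length` characters, tracking line/column."""
        while length:
            ch = self._buffer[self._index]
            self._index += 1
            # a line break resets the column (CRLF is counted once, when the
            # LF is consumed); the byte order mark does not advance it
            if ch in "\n\x85\u2028\u2029" or (
                ch == "\r" and self._buffer[self._index] != "\n"
            ):
                self._line += 1
                self._column = 0
            elif ch != "\ufeff":
                self._column += 1
            length -= 1

    def get_position(self) -> Position:
        """Return the position of the current character."""
        return Position(self._index, self._line, self._column)
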
@dataclass
class Token:
    """A parsed token from a directive option stream."""

    id: ClassVar[str] = "<unknown>"
    start: Position
    end: Position

@dataclass
class KeyToken(Token):
    id: ClassVar[str] = "<key>"
    value: str
    style: Literal[None, "'", '"'] = None
    """The original style of the string."""

@dataclass
class ValueToken(Token):
    id: ClassVar[str] = "<value>"
    value: str
    style: Literal[None, "'", '"', "|", ">"] = None
    """The original style of the string."""
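
# `ColonToken` and `State` are referenced below (`_tokenize` yields a
# `ColonToken` between each key and value; `State.has_comments` is set by the
# scanners) but their definitions were missing here; reconstructed minimally
# from that usage.


@dataclass
class ColonToken(Token):
    id: ClassVar[str] = "<colon>"


@dataclass
class State:
    """Mutable state for a single tokenization run."""

    has_comments: bool = False
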
class TokenizeError(Exception):
    def __init__(
        self,
        problem: str,
        problem_mark: Position,
        context: str | None = None,
        context_mark: Position | None = None,
    ):
        """A YAML error with optional context.

        :param problem: The problem encountered
        :param problem_mark: The position of the problem
        :param context: The context of the error, e.g. the parent being scanned
        :param context_mark: The position of the context
        """
        self.context = context
        self.context_mark = context_mark
        self.problem = problem
        self.problem_mark = problem_mark

    def clone(self, line_offset: int, column_offset: int) -> TokenizeError:
        """Clone the error with the given line and column offsets."""
        return TokenizeError(
            self.problem,
            replace(
                self.problem_mark,
                line=self.problem_mark.line + line_offset,
                column=self.problem_mark.column + column_offset,
            ),
            self.context,
            None
            if self.context_mark is None
            else replace(
                self.context_mark,
                line=self.context_mark.line + line_offset,
                column=self.context_mark.column + column_offset,
            ),
        )

    def __str__(self) -> str:
        lines = []
        if self.context is not None:
            lines.append(self.context)
        if self.context_mark is not None and (
            self.context_mark.line != self.problem_mark.line
            or self.context_mark.column != self.problem_mark.column
        ):
            lines.append(
                f"at line {self.context_mark.line}, column {self.context_mark.column}"
            )
        if self.problem is not None:
            lines.append(self.problem)
        if self.problem_mark is not None:
            lines.append(
                f"at line {self.problem_mark.line}, column {self.problem_mark.column}"
            )
        return "\n".join(lines)
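
# For illustration, `str(exc)` on an error with context renders as
# (positions here are made up):
#
#     while scanning a block scalar
#     at line 3, column 0
#     expected a comment or a line break, but found 'x'
#     at line 3, column 5
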
def options_to_items(
    text: str, line_offset: int = 0, column_offset: int = 0
) -> tuple[list[tuple[str, str]], State]:
    """Parse a directive option block into (key, value) tuples.

    :param text: The directive option text.
    :param line_offset: The line offset to apply to the error positions.
    :param column_offset: The column offset to apply to the error positions.

    :raises: `TokenizeError`
    """
    output = []
    state = State()
    for key_token, value_token in _to_tokens(text, state, line_offset, column_offset):
        output.append(
            (key_token.value, value_token.value if value_token is not None else "")
        )
    return output, state
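
# A minimal usage sketch of the public entry point above (expected output
# follows from the scanners below; `state.has_comments` flags `#` comments):
#
#     items, state = options_to_items("name: value\nclass: |\n  a\n  b\n")
#     assert items == [("name", "value"), ("class", "a\nb\n")]
#     assert state.has_comments is False
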
def _to_tokens(
    text: str, state: State, line_offset: int = 0, column_offset: int = 0
) -> Iterable[tuple[KeyToken, ValueToken | None]]:
    """Parse a directive option, and yield key/value token pairs.

    :param text: The directive option text.
    :param line_offset: The line offset to apply to the error positions.
    :param column_offset: The column offset to apply to the error positions.

    :raises: `TokenizeError`
    """
    key_token: KeyToken | None = None
    try:
        for token in _tokenize(text, state):
            if isinstance(token, KeyToken):
                if key_token is not None:
                    # previous key had no value
                    yield key_token, None
                key_token = token
            elif isinstance(token, ValueToken):
                if key_token is None:
                    raise TokenizeError("expected key before value", token.start)
                yield key_token, token
                key_token = None
        if key_token is not None:
            yield key_token, None
    except TokenizeError as exc:
        if line_offset or column_offset:
            raise exc.clone(line_offset, column_offset) from exc
        raise


def _tokenize(text: str, state: State) -> Iterable[Token]:
    """Yield tokens from a directive option stream."""
    stream = StreamBuffer(text)

    while True:
        _scan_to_next_token(stream, state)

        if stream.peek() == _CHARS_END:
            break

        if stream.column != 0:
            raise TokenizeError(
                "expected key to start at column 0", stream.get_position()
            )

        # find key
        ch = stream.peek()
        if ch in ("'", '"'):
            yield _scan_flow_scalar(stream, cast(Literal['"', "'"], ch), is_key=True)
        else:
            yield _scan_plain_scalar(stream, state, is_key=True)

        _scan_to_next_token(stream, state)

        # check the next char is a colon (any following spaces are skipped)
        if stream.peek() != ":":
            raise TokenizeError("expected ':' after key", stream.get_position())

        start_mark = stream.get_position()
        stream.forward()
        end_mark = stream.get_position()
        yield ColonToken(start_mark, end_mark)

        _scan_to_next_token(stream, state)

        # now find value
        ch = stream.peek()
        if stream.column == 0:
            pass
        elif ch in ("|", ">"):
            yield _scan_block_scalar(stream, cast(Literal["|", ">"], ch), state)
        elif ch in ("'", '"'):
            yield _scan_flow_scalar(stream, cast(Literal['"', "'"], ch), is_key=False)
        else:
            yield _scan_plain_scalar(stream, state, is_key=False)


def _scan_to_next_token(stream: StreamBuffer, state: State) -> None:
    """Skip spaces, line breaks and comments.

    The byte order mark is also stripped,
    if it's the first character in the stream.
"""ifstream.index==0andstream.peek()=="\ufeff":stream.forward()found=Falsewhilenotfound:whilestream.peek()==" ":stream.forward()ifstream.peek()=="#":state.has_comments=Truewhilestream.peek()notin_CHARS_END_NEWLINE:stream.forward()ifnot_scan_line_break(stream):found=Truedef_scan_plain_scalar(stream:StreamBuffer,state:State,is_key:bool=False)->KeyToken|ValueToken:chunks=[]start_mark=stream.get_position()end_mark=start_markindent=0ifis_keyelse1spaces:list[str]=[]whileTrue:length=0ifstream.peek()=="#":state.has_comments=TruebreakwhileTrue:ch=stream.peek(length)ifchin_CHARS_END_SPACE_TAB_NEWLINEor(is_keyandch==":"andstream.peek(length+1)in_CHARS_END_SPACE_TAB_NEWLINE):breaklength+=1iflength==0:breakchunks.extend(spaces)chunks.append(stream.prefix(length))stream.forward(length)end_mark=stream.get_position()spaces=_scan_plain_spaces(stream,allow_newline=(notis_key))ifnotspacesorstream.peek()=="#"or(stream.column<indent):ifstream.peek()=="#":state.has_comments=Truebreakreturn(KeyToken(start_mark,end_mark,"".join(chunks))ifis_keyelseValueToken(start_mark,end_mark,"".join(chunks)))def_scan_plain_spaces(stream:StreamBuffer,allow_newline:bool=True)->list[str]:chunks=[]length=0whilestream.peek(length)==" ":length+=1whitespaces=stream.prefix(length)stream.forward(length)ch=stream.peek()ifallow_newlineandchin_CHARS_NEWLINE:line_break=_scan_line_break(stream)breaks=[]whilestream.peek()in_CHARS_SPACE_NEWLINE:ifstream.peek()==" ":stream.forward()else:breaks.append(_scan_line_break(stream))ifline_break!="\n":chunks.append(line_break)elifnotbreaks:chunks.append(" ")chunks.extend(breaks)elifwhitespaces:chunks.append(whitespaces)returnchunksdef_scan_line_break(stream:StreamBuffer)->str:# Transforms:# '\r\n' : '\n'# '\r' : '\n'# '\n' : '\n'# '\x85' : '\n'# '\u2028' : '\u2028'# '\u2029 : '\u2029'# default : ''ch=stream.peek()ifchin"\r\n\x85":ifstream.prefix(2)=="\r\n":stream.forward(2)else:stream.forward()return"\n"elifchin"\u2028\u2029":stream.forward()returnchreturn""def_scan_flow_scalar(stream:StreamBuffer,style:Literal["'",'"'],is_key:bool=False)->KeyToken|ValueToken:double=style=='"'chunks=[]start_mark=stream.get_position()quote=stream.peek()stream.forward()chunks.extend(_scan_flow_scalar_non_spaces(stream,double,start_mark))whilestream.peek()!=quote:chunks.extend(_scan_flow_scalar_spaces(stream,start_mark))chunks.extend(_scan_flow_scalar_non_spaces(stream,double,start_mark))stream.forward()end_mark=stream.get_position()return(KeyToken(start_mark,end_mark,"".join(chunks),style)ifis_keyelseValueToken(start_mark,end_mark,"".join(chunks),style))def_scan_flow_scalar_non_spaces(stream:StreamBuffer,double:bool,start_mark:Position)->list[str]:chunks=[]whileTrue:length=0whilestream.peek(length)notin"'\"\\"+_CHARS_END_SPACE_TAB_NEWLINE:length+=1iflength:chunks.append(stream.prefix(length))stream.forward(length)ch=stream.peek()ifnotdoubleandch=="'"andstream.peek(1)=="'":chunks.append("'")stream.forward(2)elif(doubleandch=="'")or(notdoubleandchin'"\\'):chunks.append(ch)stream.forward()elifdoubleandch=="\\":stream.forward()ch=stream.peek()ifchin_ESCAPE_REPLACEMENTS:chunks.append(_ESCAPE_REPLACEMENTS[ch])stream.forward()elifchin_ESCAPE_CODES:length=_ESCAPE_CODES[ch]stream.forward()forkinrange(length):ifstream.peek(k)notin"0123456789ABCDEFabcdef":raiseTokenizeError("expected escape sequence of %d hexadecimal numbers, but found %r"%(length,stream.peek(k)),stream.get_position(),"while scanning a double-quoted 
scalar",start_mark,)code=int(stream.prefix(length),16)chunks.append(chr(code))stream.forward(length)elifchin_CHARS_NEWLINE:_scan_line_break(stream)chunks.extend(_scan_flow_scalar_breaks(stream))else:raiseTokenizeError(f"found unknown escape character {ch!r}",stream.get_position(),"while scanning a double-quoted scalar",start_mark,)else:returnchunksdef_scan_flow_scalar_spaces(stream:StreamBuffer,start_mark:Position)->list[str]:chunks=[]length=0whilestream.peek(length)in" \t":length+=1whitespaces=stream.prefix(length)stream.forward(length)ch=stream.peek()ifch==_CHARS_END:raiseTokenizeError("found unexpected end of stream",stream.get_position(),"while scanning a quoted scalar",start_mark,)elifchin_CHARS_NEWLINE:line_break=_scan_line_break(stream)breaks=_scan_flow_scalar_breaks(stream)ifline_break!="\n":chunks.append(line_break)elifnotbreaks:chunks.append(" ")chunks.extend(breaks)else:chunks.append(whitespaces)returnchunksdef_scan_flow_scalar_breaks(stream:StreamBuffer)->list[str]:chunks=[]whileTrue:whilestream.peek()in" \t":stream.forward()ifstream.peek()in_CHARS_NEWLINE:chunks.append(_scan_line_break(stream))else:returnchunksdef_scan_block_scalar(stream:StreamBuffer,style:Literal["|",">"],state:State)->ValueToken:indent=0folded=style==">"chunks=[]start_mark=stream.get_position()# Scan the header.stream.forward()chomping,increment=_scan_block_scalar_indicators(stream,start_mark)_scan_block_scalar_ignored_line(stream,start_mark,state)# Determine the indentation level and go to the first non-empty line.min_indent=indent+1ifmin_indent<1:min_indent=1ifincrementisNone:breaks,max_indent,end_mark=_scan_block_scalar_indentation(stream)indent=max(min_indent,max_indent)else:indent=min_indent+increment-1breaks,end_mark=_scan_block_scalar_breaks(stream,indent)line_break=""# Scan the inner part of the block scalar.whilestream.column==indentandstream.peek()!=_CHARS_END:chunks.extend(breaks)leading_non_space=stream.peek()notin" \t"length=0whilestream.peek(length)notin_CHARS_END_NEWLINE:length+=1chunks.append(stream.prefix(length))stream.forward(length)line_break=_scan_line_break(stream)breaks,end_mark=_scan_block_scalar_breaks(stream,indent)ifstream.column==indentandstream.peek()!=_CHARS_END:if(foldedandline_break=="\n"andleading_non_spaceandstream.peek()notin" \t"):ifnotbreaks:chunks.append(" ")else:chunks.append(line_break)else:break# Chomp the tail.ifchompingisnotFalse:chunks.append(line_break)ifchompingisTrue:chunks.extend(breaks)# We are done.returnValueToken(start_mark,end_mark,"".join(chunks),style)def_scan_block_scalar_indicators(stream:StreamBuffer,start_mark:Position)->tuple[bool|None,int|None]:chomping=Noneincrement=Nonech=stream.peek()ifchin"+-":chomping=ch=="+"stream.forward()ch=stream.peek()ifchin"0123456789":increment=int(ch)ifincrement==0:raiseTokenizeError("expected indentation indicator in the range 1-9, but found 0",stream.get_position(),"while scanning a block scalar",start_mark,)stream.forward()elifchin"0123456789":increment=int(ch)ifincrement==0:raiseTokenizeError("expected indentation indicator in the range 1-9, but found 0",stream.get_position(),"while scanning a block scalar",start_mark,)stream.forward()ch=stream.peek()ifchin"+-":chomping=ch=="+"stream.forward()ch=stream.peek()ifchnotin_CHARS_END_SPACE_NEWLINE:raiseTokenizeError(f"expected chomping or indentation indicators, but found {ch!r}",stream.get_position(),"while scanning a block 
scalar",start_mark,)returnchomping,incrementdef_scan_block_scalar_ignored_line(stream:StreamBuffer,start_mark:Position,state:State)->None:whilestream.peek()==" ":stream.forward()ifstream.peek()=="#":state.has_comments=Truewhilestream.peek()notin_CHARS_END_NEWLINE:stream.forward()ch=stream.peek()ifchnotin_CHARS_END_NEWLINE:raiseTokenizeError(f"expected a comment or a line break, but found {ch!r}",stream.get_position(),"while scanning a block scalar",start_mark,)_scan_line_break(stream)def_scan_block_scalar_indentation(stream:StreamBuffer,)->tuple[list[str],int,Position]:chunks=[]max_indent=0end_mark=stream.get_position()whilestream.peek()in_CHARS_SPACE_NEWLINE:ifstream.peek()!=" ":chunks.append(_scan_line_break(stream))end_mark=stream.get_position()else:stream.forward()ifstream.column>max_indent:max_indent=stream.columnreturnchunks,max_indent,end_markdef_scan_block_scalar_breaks(stream:StreamBuffer,indent:int)->tuple[list[str],Position]:chunks=[]end_mark=stream.get_position()whilestream.column<indentandstream.peek()==" ":stream.forward()whilestream.peek()in_CHARS_NEWLINE:chunks.append(_scan_line_break(stream))end_mark=stream.get_position()whilestream.column<indentandstream.peek()==" ":stream.forward()returnchunks,end_mark_CHARS_END:Final[str]="\0"_CHARS_NEWLINE:Final[str]="\r\n\x85\u2028\u2029"_CHARS_END_NEWLINE:Final[str]="\0\r\n\x85\u2028\u2029"_CHARS_SPACE_NEWLINE:Final[str]=" \r\n\x85\u2028\u2029"_CHARS_END_SPACE_NEWLINE:Final[str]="\0\r\n\x85\u2028\u2029"_CHARS_END_SPACE_TAB_NEWLINE:Final[str]="\0\t\r\n\x85\u2028\u2029"_ESCAPE_REPLACEMENTS:Final[dict[str,str]]={"0":"\0","a":"\x07","b":"\x08","t":"\x09","\t":"\x09","n":"\x0a","v":"\x0b","f":"\x0c","r":"\x0d","e":"\x1b"," ":"\x20",'"':'"',"\\":"\\","/":"/","N":"\x85","_":"\xa0","L":"\u2028","P":"\u2029",}_ESCAPE_CODES:Final[dict[str,int]]={"x":2,"u":4,"U":8,}