Source code for parsy

from __future__ import annotations

import enum
import operator
import re
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, FrozenSet

__version__ = "2.1"

noop = lambda x: x


def line_info_at(stream, index):
    if index > len(stream):
        raise ValueError("invalid index")
    line = stream.count("\n", 0, index)
    last_nl = stream.rfind("\n", 0, index)
    col = index - (last_nl + 1)
    return (line, col)


class ParseError(RuntimeError):
    def __init__(self, expected, stream, index):
        self.expected = expected
        self.stream = stream
        self.index = index

    def line_info(self):
        try:
            return "{}:{}".format(*line_info_at(self.stream, self.index))
        except (TypeError, AttributeError):  # not a str
            return str(self.index)

    def __str__(self):
        expected_list = sorted(repr(e) for e in self.expected)

        if len(expected_list) == 1:
            return f"expected {expected_list[0]} at {self.line_info()}"
        else:
            return f"expected one of {', '.join(expected_list)} at {self.line_info()}"


@dataclass
class Result:
    status: bool
    index: int
    value: Any
    furthest: int
    expected: FrozenSet[str]

    @staticmethod
    def success(index, value):
        return Result(True, index, value, -1, frozenset())

    @staticmethod
    def failure(index, expected):
        return Result(False, -1, None, index, frozenset([expected]))

    # collect the furthest failure from self and other
    def aggregate(self, other):
        if not other:
            return self

        if self.furthest > other.furthest:
            return self
        elif self.furthest == other.furthest:
            # if we both have the same failure index, we combine the expected messages.
            return Result(self.status, self.index, self.value, self.furthest, self.expected | other.expected)
        else:
            return Result(self.status, self.index, self.value, other.furthest, other.expected)


class Parser:
    """
    A Parser is an object that wraps a function whose arguments are
    a string to be parsed and the index on which to begin parsing.
    The function should return either Result.success(next_index, value),
    where the next index is where to continue the parse and the value is
    the yielded value, or Result.failure(index, expected), where expected
    is a string indicating what was expected, and the index is the index
    of the failure.
    """

    def __init__(self, wrapped_fn: Callable[[str | bytes | list, int], Result]):
        """
        Creates a new Parser from a function that takes a stream
        and returns a Result.
        """
        self.wrapped_fn = wrapped_fn

    def __call__(self, stream: str | bytes | list, index: int):
        return self.wrapped_fn(stream, index)

    def parse(self, stream: str | bytes | list) -> Any:
        """Parses a string or list of tokens and returns the result, or raises a ParseError."""
        (result, _) = (self << eof).parse_partial(stream)
        return result

    def parse_partial(self, stream: str | bytes | list) -> tuple[Any, str | bytes | list]:
        """
        Parses the longest possible prefix of a given string.
        Returns a tuple of the result and the unparsed remainder,
        or raises ParseError.
        """
        result = self(stream, 0)

        if result.status:
            return (result.value, stream[result.index :])
        else:
            raise ParseError(result.expected, stream, result.furthest)
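
    # Usage sketch (illustrative, not part of the original module): ``parse``
    # requires the whole stream to be consumed, while ``parse_partial`` returns
    # the unparsed remainder instead.
    #
    #     string("Hi").parse("Hi")                  ->  'Hi'
    #     string("Hi").parse_partial("Hi there")    ->  ('Hi', ' there')
    #     string("Hi").parse("Hi there")            ->  raises ParseError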

    def bind(self, bind_fn):
        @Parser
        def bound_parser(stream, index):
            result = self(stream, index)

            if result.status:
                next_parser = bind_fn(result.value)
                return next_parser(stream, result.index).aggregate(result)
            else:
                return result

        return bound_parser

    def map(self, map_function: Callable) -> Parser:
        """
        Returns a parser that transforms the produced value of the initial parser
        with ``map_function``.
        """
        return self.bind(lambda res: success(map_function(res)))

    def combine(self, combine_fn: Callable) -> Parser:
        """
        Returns a parser that transforms the produced values of the initial parser
        with ``combine_fn``, passing the arguments using ``*args`` syntax.

        The initial parser should return a list/sequence of parse results.
        """
        return self.bind(lambda res: success(combine_fn(*res)))

    def combine_dict(self, combine_fn: Callable) -> Parser:
        """
        Returns a parser that transforms the value produced by the initial parser
        using the supplied function/callable, passing the arguments using the
        ``**kwargs`` syntax.

        The value produced by the initial parser must be a mapping/dictionary from
        names to values, or a list of two-tuples, or something else that can be
        passed to the ``dict`` constructor.

        If ``None`` is present as a key in the dictionary it will be removed
        before passing to ``combine_fn``, as will all keys starting with ``_``.
        """
        return self.bind(
            lambda res: success(
                combine_fn(
                    **{
                        k: v
                        for k, v in dict(res).items()
                        if k is not None and not (isinstance(k, str) and k.startswith("_"))
                    }
                )
            )
        )

    def concat(self) -> Parser:
        """
        Returns a parser that concatenates together (as a string) the previously
        produced values.
        """
        return self.map("".join)

    def then(self, other: Parser) -> Parser:
        """
        Returns a parser which, if the initial parser succeeds, will continue
        parsing with ``other``. This will produce the value produced by ``other``.
        """
        return seq(self, other).combine(lambda left, right: right)

    def skip(self, other: Parser) -> Parser:
        """
        Returns a parser which, if the initial parser succeeds, will continue
        parsing with ``other``. It will produce the value produced by the
        initial parser.
        """
        return seq(self, other).combine(lambda left, right: left)

    def result(self, value: Any) -> Parser:
        """
        Returns a parser that, if the initial parser succeeds, always produces
        the passed in ``value``.
        """
        return self >> success(value)

    def many(self) -> Parser:
        """
        Returns a parser that expects the initial parser 0 or more times, and
        produces a list of the results.
        """
        return self.times(0, float("inf"))

    def times(self, min: int, max: int = None) -> Parser:
        """
        Returns a parser that expects the initial parser at least ``min`` times,
        and at most ``max`` times, and produces a list of the results. If only one
        argument is given, the parser is expected exactly that number of times.
        """
        if max is None:
            max = min

        @Parser
        def times_parser(stream, index):
            values = []
            times = 0
            result = None

            while times < max:
                result = self(stream, index).aggregate(result)
                if result.status:
                    values.append(result.value)
                    index = result.index
                    times += 1
                elif times >= min:
                    break
                else:
                    return result

            return Result.success(index, values).aggregate(result)

        return times_parser
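
    # Usage sketch (illustrative, not part of the original module):
    #
    #     string("a").times(2, 3).parse("aaa")   ->  ['a', 'a', 'a']
    #     string("a").many().parse("")           ->  []
    #     (string("a") * 2).parse("aa")          ->  ['a', 'a']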

    def at_most(self, n: int) -> Parser:
        """
        Returns a parser that expects the initial parser at most ``n`` times, and
        produces a list of the results.
        """
        return self.times(0, n)

    def at_least(self, n: int) -> Parser:
        """
        Returns a parser that expects the initial parser at least ``n`` times, and
        produces a list of the results.
        """
        return self.times(n) + self.many()

    def optional(self, default: Any = None) -> Parser:
        """
        Returns a parser that expects the initial parser zero or once, and maps
        the result to a given default value in the case of no match. If no default
        value is given, ``None`` is used.
        """
        return self.times(0, 1).map(lambda v: v[0] if v else default)

    def until(self, other: Parser, min: int = 0, max: int = float("inf"), consume_other: bool = False) -> Parser:
        """
        Returns a parser that expects the initial parser followed by ``other``.
        The initial parser is expected at least ``min`` times and at most ``max`` times.
        By default, it does not consume ``other`` and it produces a list of the
        results excluding ``other``. If ``consume_other`` is ``True`` then
        ``other`` is consumed and its result is included in the list of results.
        """

        @Parser
        def until_parser(stream, index):
            values = []
            times = 0
            while True:

                # try parser first
                res = other(stream, index)
                if res.status and times >= min:
                    if consume_other:
                        # consume other
                        values.append(res.value)
                        index = res.index
                    return Result.success(index, values)

                # exceeded max?
                if times >= max:
                    # return failure, it matched parser more than max times
                    return Result.failure(index, f"at most {max} items")

                # failed, try parser
                result = self(stream, index)
                if result.status:
                    # consume
                    values.append(result.value)
                    index = result.index
                    times += 1
                elif times >= min:
                    # return failure, parser is not followed by other
                    return Result.failure(index, "did not find other parser")
                else:
                    # return failure, it did not match parser at least min times
                    return Result.failure(index, f"at least {min} items; got {times} item(s)")

        return until_parser
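
    # Usage sketch (illustrative, not part of the original module):
    #
    #     string("a").until(string("b")).parse_partial("aaab")
    #         ->  (['a', 'a', 'a'], 'b')
    #     string("a").until(string("b"), consume_other=True).parse("aaab")
    #         ->  ['a', 'a', 'a', 'b']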

    def sep_by(self, sep: Parser, *, min: int = 0, max: int = float("inf")) -> Parser:
        """
        Returns a new parser that repeats the initial parser and collects the
        results in a list. Between each item, the ``sep`` parser is run (and its
        return value is discarded). By default it repeats with no limit, but
        minimum and maximum values can be supplied.
        """
        zero_times = success([])
        if max == 0:
            return zero_times
        res = self.times(1) + (sep >> self).times(min - 1, max - 1)
        if min == 0:
            res |= zero_times
        return res
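
    # Usage sketch (illustrative, not part of the original module):
    #
    #     regex(r"[0-9]+").map(int).sep_by(string(",")).parse("1,2,3")
    #         ->  [1, 2, 3]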

    def desc(self, description: str) -> Parser:
        """
        Returns a new parser with a description added, which is used in the error
        message if parsing fails.
        """

        @Parser
        def desc_parser(stream, index):
            result = self(stream, index)
            if result.status:
                return result
            else:
                return Result.failure(index, description)

        return desc_parser

    def mark(self) -> Parser:
        """
        Returns a parser that wraps the initial parser's result in a value
        containing column and line information of the match, as well as the
        original value. The new value is a 3-tuple:

            ((start_row, start_column), original_value, (end_row, end_column))
        """

        @generate
        def marked():
            start = yield line_info
            body = yield self
            end = yield line_info
            return (start, body, end)

        return marked
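
    # Usage sketch (illustrative, not part of the original module); rows and
    # columns are zero-indexed:
    #
    #     regex(r"[a-z]+").mark().parse("hello")
    #         ->  ((0, 0), 'hello', (0, 5))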

    def tag(self, name: str) -> Parser:
        """
        Returns a parser that wraps the produced value of the initial parser in a
        2-tuple containing ``(name, value)``. This provides a very simple way to
        label parsed components.
        """
        return self.map(lambda v: (name, v))

    def should_fail(self, description: str) -> Parser:
        """
        Returns a parser that fails when the initial parser succeeds, and succeeds
        when the initial parser fails (consuming no input). A description must be
        passed which is used in parse failure messages.

        This is essentially a negative lookahead.
        """

        @Parser
        def fail_parser(stream, index):
            res = self(stream, index)
            if res.status:
                return Result.failure(index, description)
            return Result.success(index, res)

        return fail_parser

    def __add__(self, other: Parser) -> Parser:
        return seq(self, other).combine(operator.add)

    def __mul__(self, other: Parser) -> Parser:
        if isinstance(other, range):
            return self.times(other.start, other.stop - 1)
        return self.times(other)

    def __or__(self, other: Parser) -> Parser:
        return alt(self, other)

    # haskelley operators, for fun #

    # >>
    def __rshift__(self, other: Parser) -> Parser:
        return self.then(other)

    # <<
    def __lshift__(self, other: Parser) -> Parser:
        return self.skip(other)


def alt(*parsers: Parser) -> Parser:
    """
    Creates a parser from the passed in argument list of alternative parsers,
    which are tried in order, moving to the next one if the current one fails.
    """
    if not parsers:
        return fail("<empty alt>")

    @Parser
    def alt_parser(stream, index):
        result = None
        for parser in parsers:
            result = parser(stream, index).aggregate(result)
            if result.status:
                return result

        return result

    return alt_parser
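

# Usage sketch (illustrative, not part of the original module); the ``|``
# operator is equivalent to ``alt``:
#
#     alt(string("foo"), string("bar")).parse("bar")    ->  'bar'
#     (string("foo") | string("bar")).parse("bar")      ->  'bar'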


def seq(*parsers: Parser, **kw_parsers: Parser) -> Parser:
    """
    Takes a list of parsers, runs them in order, and collects their individual
    results in a list, or in a dictionary if you pass them as keyword arguments.
    """
    if not parsers and not kw_parsers:
        return success([])

    if parsers and kw_parsers:
        raise ValueError("Use either positional arguments or keyword arguments with seq, not both")

    if parsers:

        @Parser
        def seq_parser(stream, index):
            result = None
            values = []
            for parser in parsers:
                result = parser(stream, index).aggregate(result)
                if not result.status:
                    return result
                index = result.index
                values.append(result.value)

            return Result.success(index, values).aggregate(result)

        return seq_parser
    else:

        @Parser
        def seq_kwarg_parser(stream, index):
            result = None
            values = {}
            for name, parser in kw_parsers.items():
                result = parser(stream, index).aggregate(result)
                if not result.status:
                    return result
                index = result.index
                values[name] = result.value

            return Result.success(index, values).aggregate(result)

        return seq_kwarg_parser
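

# Usage sketch (illustrative, not part of the original module):
#
#     seq(regex(r"[a-z]+") << string("="), regex(r"[0-9]+")).parse("x=1")
#         ->  ['x', '1']
#     seq(key=regex(r"[a-z]+") << string("="), value=regex(r"[0-9]+").map(int)).parse("x=1")
#         ->  {'key': 'x', 'value': 1}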


def generate(fn) -> Parser:
    """
    Creates a parser from a generator function
    """
    if isinstance(fn, str):
        return lambda f: generate(f).desc(fn)

    @Parser
    @wraps(fn)
    def generated(stream, index):
        # start up the generator
        iterator = fn()

        result = None
        value = None
        try:
            while True:
                next_parser = iterator.send(value)
                result = next_parser(stream, index).aggregate(result)
                if not result.status:
                    return result
                value = result.value
                index = result.index
        except StopIteration as stop:
            returnVal = stop.value
            if isinstance(returnVal, Parser):
                return returnVal(stream, index).aggregate(result)

            return Result.success(index, returnVal).aggregate(result)

    return generated
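

# Usage sketch (illustrative, not part of the original module): a generator
# function yields parsers and receives their parsed values back.
#
#     @generate
#     def key_value():
#         key = yield regex(r"[a-z]+")
#         yield string("=")
#         value = yield regex(r"[0-9]+").map(int)
#         return (key, value)
#
#     key_value.parse("x=1")   ->  ('x', 1)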


index = Parser(lambda _, index: Result.success(index, index))
line_info = Parser(lambda stream, index: Result.success(index, line_info_at(stream, index)))


def success(value: Any) -> Parser:
    """
    Returns a parser that does not consume any of the stream, but produces
    ``value``.
    """
    return Parser(lambda _, index: Result.success(index, value))


def fail(expected: str) -> Parser:
    """
    Returns a parser that always fails with the provided error message.
    """
    return Parser(lambda _, index: Result.failure(index, expected))


def string(expected_string: str, transform: Callable[[str], str] = noop) -> Parser:
    """
    Returns a parser that expects the ``expected_string`` and produces that
    string value.

    Optionally, a transform function can be passed, which will be used on both
    the expected string and tested string.
    """
    slen = len(expected_string)
    transformed_s = transform(expected_string)

    @Parser
    def string_parser(stream, index):
        if transform(stream[index : index + slen]) == transformed_s:
            return Result.success(index + slen, expected_string)
        else:
            return Result.failure(index, expected_string)

    return string_parser
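

# Usage sketch (illustrative, not part of the original module): the transform
# function makes case-insensitive matching possible.
#
#     string("Hello").parse("Hello")                                   ->  'Hello'
#     string("hello", transform=lambda s: s.lower()).parse("HELLO")    ->  'hello'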


def regex(exp: str, flags=0, group: int | str | tuple = 0) -> Parser:
    """
    Returns a parser that expects the given ``exp``, and produces the matched
    string. ``exp`` can be a compiled regular expression, or a string which
    will be compiled with the given ``flags``.

    Optionally, accepts ``group``, which is passed to
    `re.Match.group <https://docs.python.org/3/library/re.html#re.Match.group>`_
    to return the text from a capturing group in the regex instead of the
    entire match.
    """
    if isinstance(exp, (str, bytes)):
        exp = re.compile(exp, flags)
    if isinstance(group, (str, int)):
        group = (group,)

    @Parser
    def regex_parser(stream, index):
        match = exp.match(stream, index)
        if match:
            return Result.success(match.end(), match.group(*group))
        else:
            return Result.failure(index, exp.pattern)

    return regex_parser
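

# Usage sketch (illustrative, not part of the original module):
#
#     regex(r"[0-9]+").map(int).parse("42")        ->  42
#     regex(r"ab(c+)", group=1).parse("abccc")     ->  'ccc'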


def test_item(func: Callable[..., bool], description: str) -> Parser:
    """
    Returns a parser that tests a single item from the list of items being
    consumed, using the callable ``func``. If ``func`` returns ``True``, the
    parse succeeds, otherwise the parse fails with the description
    ``description``.
    """

    @Parser
    def test_item_parser(stream, index):
        if index < len(stream):
            if isinstance(stream, bytes):
                # Subscripting bytes with `[index]` instead of
                # `[index:index + 1]` returns an int
                item = stream[index : index + 1]
            else:
                item = stream[index]
            if func(item):
                return Result.success(index + 1, item)
        return Result.failure(index, description)

    return test_item_parser


def test_char(func: Callable[..., bool], description: str) -> Parser:
    """
    Returns a parser that tests a single character with the callable ``func``.
    If ``func`` returns ``True``, the parse succeeds, otherwise the parse fails
    with the description ``description``.
    """
    # Implementation is identical to test_item
    return test_item(func, description)


def match_item(item: Any, description: str = None) -> Parser:
    """
    Returns a parser that tests the next item (or character) from the stream
    (or string) for equality against the provided item. Optionally a string
    description can be passed.
    """
    if description is None:
        description = str(item)
    return test_item(lambda i: item == i, description)


def string_from(*strings: str, transform: Callable[[str], str] = noop):
    """
    Accepts a sequence of strings as positional arguments, and returns a parser
    that matches and returns one string from the list. The list is first sorted
    in descending length order, so that overlapping strings are handled
    correctly by checking the longest one first.
    """
    # Sort longest first, so that overlapping options work correctly
    return alt(*(string(s, transform) for s in sorted(strings, key=len, reverse=True)))
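

# Usage sketch (illustrative, not part of the original module): "golang" is
# tried before "go" because of the longest-first sort.
#
#     string_from("go", "golang").parse("golang")   ->  'golang'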


def char_from(string: str | bytes):
    """
    Accepts a string and returns a parser that matches and returns one character
    from the string.
    """
    if isinstance(string, bytes):
        return test_char(lambda c: c in string, b"[" + string + b"]")
    else:
        return test_char(lambda c: c in string, "[" + string + "]")


def peek(parser: Parser) -> Parser:
    """
    Returns a lookahead parser that parses the input stream without consuming
    chars.
    """

    @Parser
    def peek_parser(stream, index):
        result = parser(stream, index)
        if result.status:
            return Result.success(index, result.value)
        else:
            return result

    return peek_parser


any_char = test_char(lambda c: True, "any character")

whitespace = regex(r"\s+")

letter = test_char(lambda c: c.isalpha(), "a letter")

digit = test_char(lambda c: c.isdigit(), "a digit")

decimal_digit = char_from("0123456789")


@Parser
def eof(stream, index):
    """
    A parser that only succeeds if the end of the stream has been reached.
    """
    if index >= len(stream):
        return Result.success(index, None)
    else:
        return Result.failure(index, "EOF")


def from_enum(enum_cls: type[enum.Enum], transform=noop) -> Parser:
    """
    Given a class that is an `enum.Enum <https://docs.python.org/3/library/enum.html>`_
    class, returns a parser that will parse the values (or the string
    representations of the values) and return the corresponding enum item.
    """
    items = sorted(
        ((str(enum_item.value), enum_item) for enum_item in enum_cls), key=lambda t: len(t[0]), reverse=True
    )
    return alt(*(string(value, transform=transform).result(enum_item) for value, enum_item in items))
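

# Usage sketch (illustrative, not part of the original module):
#
#     class Pet(enum.Enum):
#         CAT = "cat"
#         DOG = "dog"
#
#     from_enum(Pet).parse("cat")   ->  Pet.CAT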


class forward_declaration(Parser):
    """
    An empty parser that can be used as a forward declaration, especially for
    parsers that need to be defined recursively.

    You must use `.become(parser)` before using.
    """

    def __init__(self):
        pass

    def _raise_error(self, *args, **kwargs):
        raise ValueError("You must use 'become' before attempting to call `parse` or `parse_partial`")

    parse = _raise_error
    parse_partial = _raise_error

    def become(self, other: Parser):
        """
        Take on the behavior of the given parser.
        """
        self.__dict__ = other.__dict__
        self.__class__ = other.__class__
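

# Usage sketch (illustrative, not part of the original module): a recursive
# parser for nested, space-separated lists of integers.
#
#     expr = forward_declaration()
#     simple = regex(r"[0-9]+").map(int)
#     group = string("(") >> expr.sep_by(string(" ")) << string(")")
#     expr.become(simple | group)
#
#     expr.parse("(0 1 (2 3))")   ->  [0, 1, [2, 3]]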