# Source code for parsy

# End-user documentation is in ../../doc/ and so is for the most part not
# duplicated here in the form of doc strings. Code comments and docstrings
# are mainly for internal use.

import operator
import re
from collections import namedtuple
from functools import wraps

from .version import __version__  # noqa: F401

def noop(x):
    """Identity function, used as the default for ``transform`` parameters."""
    return x


def line_info_at(stream, index):
    """Return ``(line, column)`` for *index* in *stream*, both zero-based.

    Raises ValueError when *index* is past the end of *stream*.
    """
    if index > len(stream):
        raise ValueError("invalid index")
    line_number = stream.count("\n", 0, index)
    last_newline = stream.rfind("\n", 0, index)
    column = index - (last_newline + 1)
    return (line_number, column)


class ParseError(RuntimeError):
    """Raised when parsing fails; carries what was expected and where."""

    def __init__(self, expected, stream, index):
        self.expected = expected  # frozenset of expected-item descriptions
        self.stream = stream      # the input being parsed
        self.index = index        # position of the failure

    def line_info(self):
        """Format the failure position as "line:column", or the raw index."""
        try:
            return "{}:{}".format(*line_info_at(self.stream, self.index))
        except (TypeError, AttributeError):  # not a str
            return str(self.index)

    def __str__(self):
        expected_list = sorted(repr(e) for e in self.expected)
        if len(expected_list) == 1:
            return f"expected {expected_list[0]} at {self.line_info()}"
        return f"expected one of {', '.join(expected_list)} at {self.line_info()}"


class Result(namedtuple("Result", "status index value furthest expected")):
    """Outcome of running a parser at some index.

    status: True on success. index/value are set on success; furthest and
    expected record the deepest failure seen, for error reporting.
    """

    @staticmethod
    def success(index, value):
        return Result(True, index, value, -1, frozenset())

    @staticmethod
    def failure(index, expected):
        return Result(False, -1, None, index, frozenset([expected]))

    def aggregate(self, other):
        # Collect the furthest failure from self and other.
        if not other:
            return self
        if self.furthest > other.furthest:
            return self
        if self.furthest == other.furthest:
            # Same failure index: combine the expected messages.
            return Result(self.status, self.index, self.value, self.furthest, self.expected | other.expected)
        return Result(self.status, self.index, self.value, other.furthest, other.expected)
class Parser:
    """
    A Parser is an object that wraps a function whose arguments are a
    string to be parsed and the index on which to begin parsing. The
    wrapped function should return either Result.success(next_index, value),
    where next_index is where to continue the parse and value is the
    yielded value, or Result.failure(index, expected), where expected is a
    string indicating what was expected, and index is the index of the
    failure.
    """

    def __init__(self, wrapped_fn):
        # wrapped_fn: callable (stream, index) -> Result
        self.wrapped_fn = wrapped_fn

    def __call__(self, stream, index):
        return self.wrapped_fn(stream, index)

    def parse(self, stream):
        """Parse a string or list of tokens and return the result or raise a ParseError."""
        (result, _) = (self << eof).parse_partial(stream)
        return result

    def parse_partial(self, stream):
        """
        Parse the longest possible prefix of a given string.

        Return a tuple of the result and the rest of the string,
        or raise a ParseError.
        """
        result = self(stream, 0)
        if result.status:
            return (result.value, stream[result.index :])
        raise ParseError(result.expected, stream, result.furthest)

    def bind(self, bind_fn):
        """Monadic bind: feed the parsed value to bind_fn, which returns the next parser."""

        @Parser
        def bound_parser(stream, index):
            result = self(stream, index)
            if not result.status:
                return result
            next_parser = bind_fn(result.value)
            return next_parser(stream, result.index).aggregate(result)

        return bound_parser

    def map(self, map_fn):
        """Transform a successful result with map_fn."""
        return self.bind(lambda res: success(map_fn(res)))

    def combine(self, combine_fn):
        """Unpack a sequence result into positional arguments of combine_fn."""
        return self.bind(lambda res: success(combine_fn(*res)))

    def combine_dict(self, combine_fn):
        """
        Unpack a dict (or sequence of pairs) result into keyword arguments of
        combine_fn, skipping None keys and string keys starting with "_".
        """
        return self.bind(
            lambda res: success(
                combine_fn(
                    **{
                        k: v
                        for k, v in dict(res).items()
                        if k is not None and not (isinstance(k, str) and k.startswith("_"))
                    }
                )
            )
        )

    def concat(self):
        """Join a list-of-strings result into a single string."""
        return self.map("".join)

    def then(self, other):
        """Run self then other, keeping other's result."""
        return seq(self, other).combine(lambda left, right: right)

    def skip(self, other):
        """Run self then other, keeping self's result."""
        return seq(self, other).combine(lambda left, right: left)

    def result(self, res):
        """Replace the value of a successful parse with res."""
        return self >> success(res)

    def many(self):
        """Match zero or more times, collecting values into a list."""
        return self.times(0, float("inf"))

    def times(self, min, max=None):
        """Match between min and max times (exactly min when max is None)."""
        if max is None:
            max = min

        @Parser
        def times_parser(stream, index):
            values = []
            times = 0
            result = None
            while times < max:
                result = self(stream, index).aggregate(result)
                if result.status:
                    values.append(result.value)
                    index = result.index
                    times += 1
                elif times >= min:
                    break
                else:
                    return result
            return Result.success(index, values).aggregate(result)

        return times_parser

    def at_most(self, n):
        """Match at most n times."""
        return self.times(0, n)

    def at_least(self, n):
        """Match at least n times."""
        return self.times(n) + self.many()

    def optional(self, default=None):
        """Match zero or one time, producing default when absent."""
        return self.times(0, 1).map(lambda v: v[0] if v else default)

    def until(self, other, min=0, max=float("inf"), consume_other=False):
        """
        Match self repeatedly until other succeeds, between min and max times.
        When consume_other is True, other's value is also consumed and collected.
        """

        @Parser
        def until_parser(stream, index):
            values = []
            times = 0
            while True:
                # try the terminating parser first
                res = other(stream, index)
                if res.status and times >= min:
                    if consume_other:
                        # consume other
                        values.append(res.value)
                        index = res.index
                    return Result.success(index, values)

                # exceeded max?
                if times >= max:
                    # return failure, it matched parser more than max times
                    return Result.failure(index, f"at most {max} items")

                # failed, try parser
                result = self(stream, index)
                if result.status:
                    # consume
                    values.append(result.value)
                    index = result.index
                    times += 1
                elif times >= min:
                    # return failure, parser is not followed by other
                    return Result.failure(index, "did not find other parser")
                else:
                    # return failure, it did not match parser at least min times
                    return Result.failure(index, f"at least {min} items; got {times} item(s)")

        return until_parser

    def sep_by(self, sep, *, min=0, max=float("inf")):
        """Match self separated by sep, between min and max occurrences."""
        zero_times = success([])
        if max == 0:
            return zero_times
        res = self.times(1) + (sep >> self).times(min - 1, max - 1)
        if min == 0:
            res |= zero_times
        return res

    def desc(self, description):
        """Replace any failure message with description."""

        @Parser
        def desc_parser(stream, index):
            result = self(stream, index)
            if result.status:
                return result
            return Result.failure(index, description)

        return desc_parser

    def mark(self):
        """Wrap the value as ((start_line, start_col), value, (end_line, end_col))."""

        @generate
        def marked():
            start = yield line_info
            body = yield self
            end = yield line_info
            return (start, body, end)

        return marked

    def tag(self, name):
        """Wrap the value as (name, value)."""
        return self.map(lambda v: (name, v))

    def should_fail(self, description):
        """Succeed (yielding the failed Result) only when self fails; consume nothing."""

        @Parser
        def fail_parser(stream, index):
            res = self(stream, index)
            if res.status:
                return Result.failure(index, description)
            return Result.success(index, res)

        return fail_parser

    def __add__(self, other):
        return seq(self, other).combine(operator.add)

    def __mul__(self, other):
        if isinstance(other, range):
            return self.times(other.start, other.stop - 1)
        return self.times(other)

    def __or__(self, other):
        return alt(self, other)

    # haskelley operators, for fun #

    # >>
    def __rshift__(self, other):
        return self.then(other)

    # <<
    def __lshift__(self, other):
        return self.skip(other)
def alt(*parsers):
    """Try each parser at the same position in order; the first success wins.

    Failures are aggregated so that errors report the furthest failure seen.
    """
    if not parsers:
        return fail("<empty alt>")

    @Parser
    def alt_parser(stream, index):
        result = None
        for parser in parsers:
            result = parser(stream, index).aggregate(result)
            if result.status:
                return result
        return result

    return alt_parser
def seq(*parsers, **kw_parsers):
    """
    Takes a list of list of parsers, runs them in order,
    and collects their individuals results in a list
    (positional form) or a dict keyed by argument name (keyword form).
    """
    if not parsers and not kw_parsers:
        return success([])
    if parsers and kw_parsers:
        raise ValueError("Use either positional arguments or keyword arguments with seq, not both")

    if parsers:

        @Parser
        def seq_parser(stream, index):
            result = None
            values = []
            for parser in parsers:
                result = parser(stream, index).aggregate(result)
                if not result.status:
                    return result
                index = result.index
                values.append(result.value)
            return Result.success(index, values).aggregate(result)

        return seq_parser

    @Parser
    def seq_kwarg_parser(stream, index):
        result = None
        values = {}
        for name, parser in kw_parsers.items():
            result = parser(stream, index).aggregate(result)
            if not result.status:
                return result
            index = result.index
            values[name] = result.value
        return Result.success(index, values).aggregate(result)

    return seq_kwarg_parser
# combinator syntax
def generate(fn):
    """Turn a generator function into a parser (combinator syntax).

    The generator yields parsers and receives their parsed values; its
    return value becomes the overall result. When called with a string,
    acts as a decorator factory that also applies .desc(description).
    """
    if isinstance(fn, str):
        return lambda f: generate(f).desc(fn)

    @Parser
    @wraps(fn)
    def generated(stream, index):
        # start up the generator
        iterator = fn()

        result = None
        value = None
        try:
            while True:
                next_parser = iterator.send(value)
                result = next_parser(stream, index).aggregate(result)
                if not result.status:
                    return result
                value = result.value
                index = result.index
        except StopIteration as stop:
            return_val = stop.value
            if isinstance(return_val, Parser):
                # Allow returning a parser to run as the final step.
                return return_val(stream, index).aggregate(result)
            return Result.success(index, return_val).aggregate(result)

    return generated
# Parser that yields the current index without consuming any input.
index = Parser(lambda _, index: Result.success(index, index))
# Parser that yields the current (line, column) without consuming any input.
line_info = Parser(lambda stream, index: Result.success(index, line_info_at(stream, index)))
def success(val):
    """Parser that always succeeds with val, consuming nothing."""
    return Parser(lambda _, index: Result.success(index, val))
def fail(expected):
    """Parser that always fails, reporting expected, consuming nothing."""
    return Parser(lambda _, index: Result.failure(index, expected))
def string(s, transform=noop):
    """Parser matching the exact string s.

    transform is applied to both s and the candidate slice before comparing,
    enabling e.g. case-insensitive matching; the untransformed s is yielded.
    """
    slen = len(s)
    transformed_s = transform(s)

    @Parser
    def string_parser(stream, index):
        if transform(stream[index : index + slen]) == transformed_s:
            return Result.success(index + slen, s)
        return Result.failure(index, s)

    return string_parser
def regex(exp, flags=0, group=0):
    """Parser matching the regex exp at the current position.

    exp may be a pattern string/bytes or a compiled pattern; group selects
    which group(s) of the match to yield (as for re.Match.group).
    """
    if isinstance(exp, (str, bytes)):
        exp = re.compile(exp, flags)
    if isinstance(group, (str, int)):
        group = (group,)

    @Parser
    def regex_parser(stream, index):
        match = exp.match(stream, index)
        if match:
            return Result.success(match.end(), match.group(*group))
        return Result.failure(index, exp.pattern)

    return regex_parser
def test_item(func, description):
    """Parser consuming one item when func(item) is truthy; otherwise fails with description."""

    @Parser
    def test_item_parser(stream, index):
        if index < len(stream):
            if isinstance(stream, bytes):
                # Subscripting bytes with `[index]` instead of
                # `[index:index + 1]` returns an int
                item = stream[index : index + 1]
            else:
                item = stream[index]
            if func(item):
                return Result.success(index + 1, item)
        return Result.failure(index, description)

    return test_item_parser
def test_char(func, description):
    """Alias of test_item for character streams.

    Implementation is identical to test_item.
    """
    return test_item(func, description)
def match_item(item, description=None):
    """Parser consuming one item equal to item; description defaults to str(item)."""
    if description is None:
        description = str(item)
    return test_item(lambda i: item == i, description)
def string_from(*strings, transform=noop):
    """Parser matching any one of the given strings (transform as in string()).

    Sorted longest first, so that overlapping options work correctly.
    """
    return alt(*(string(s, transform) for s in sorted(strings, key=len, reverse=True)))
def char_from(string):
    """Parser consuming one character contained in the given str or bytes."""
    if isinstance(string, bytes):
        return test_char(lambda c: c in string, b"[" + string + b"]")
    return test_char(lambda c: c in string, "[" + string + "]")
def peek(parser):
    """Run parser without consuming input: succeed with its value at the original index."""

    @Parser
    def peek_parser(stream, index):
        result = parser(stream, index)
        if result.status:
            return Result.success(index, result.value)
        return result

    return peek_parser
# Common ready-made parsers.
any_char = test_char(lambda c: True, "any character")
whitespace = regex(r"\s+")
letter = test_char(lambda c: c.isalpha(), "a letter")
digit = test_char(lambda c: c.isdigit(), "a digit")
decimal_digit = char_from("0123456789")
@Parser
def eof(stream, index):
    """Parser that matches only at end of stream, yielding None."""
    if index >= len(stream):
        return Result.success(index, None)
    return Result.failure(index, "EOF")
def from_enum(enum_cls, transform=noop):
    """Parser matching the string form of any member's value, yielding the member.

    Alternatives are ordered longest-value first so overlapping values match
    correctly; transform is passed through to string().
    """
    items = sorted(
        ((str(enum_item.value), enum_item) for enum_item in enum_cls), key=lambda t: len(t[0]), reverse=True
    )
    return alt(*(string(value, transform=transform).result(enum_item) for value, enum_item in items))
class forward_declaration(Parser):
    """
    An empty parser that can be used as a forward declaration,
    especially for parsers that need to be defined recursively.

    You must use `.become(parser)` before using.
    """

    def __init__(self):
        pass

    def _raise_error(self, *args, **kwargs):
        raise ValueError("You must use 'become' before attempting to call `parse` or `parse_partial`")

    # Guard against use before `become` is called.
    parse = _raise_error
    parse_partial = _raise_error

    def become(self, other):
        # Take over the target parser's state and class in place, so that
        # references captured before `become` keep working.
        self.__dict__ = other.__dict__
        self.__class__ = other.__class__