# Source code for parsy

# -*- coding: utf-8 -*- #

# End-user documentation is in ../../doc/ and so is for the most part not
# duplicated here in the form of doc strings. Code comments and docstrings
# are mainly for internal use.

import operator
import re
import sys
from collections import namedtuple
from functools import wraps

from .version import __version__  # noqa: F401

def noop(x):
    """Identity function: return the argument unchanged."""
    return x


def line_info_at(stream, index):
    """
    Return a ``(line, column)`` tuple for position ``index`` in ``stream``.

    Both numbers are zero-based: ``line`` is the number of newline characters
    before ``index`` and ``column`` is the offset from the start of that line.
    Raises ValueError if ``index`` is greater than ``len(stream)``.
    """
    if index > len(stream):
        raise ValueError("invalid index")
    newlines_before = stream.count("\n", 0, index)
    # rfind gives -1 when there is no earlier newline, so adding 1 yields 0,
    # the start of the first line.
    line_start = stream.rfind("\n", 0, index) + 1
    return (newlines_before, index - line_start)


class ParseError(RuntimeError):
    """
    Raised when parsing fails.

    Attributes:
        expected: iterable of descriptions of what was expected at the
            failure position.
        stream: the input that was being parsed.
        index: position in ``stream`` at which the failure occurred.
    """

    def __init__(self, expected, stream, index):
        # Pass the arguments on to the base class so that ``args`` is
        # populated; exception pickling/copying rebuilds the instance with
        # ``cls(*args)``, which fails if ``args`` is left empty.
        super().__init__(expected, stream, index)
        self.expected = expected
        self.stream = stream
        self.index = index

    def line_info(self):
        """
        Return the failure position as a string.

        Gives ``'line:col'`` as computed by ``line_info_at``; if that raises
        TypeError or AttributeError (the stream is not a str), the plain
        index is returned instead.
        """
        try:
            return '{}:{}'.format(*line_info_at(self.stream, self.index))
        except (TypeError, AttributeError):  # not a str
            return str(self.index)

    def __str__(self):
        # Sort the reprs so the message is stable regardless of set ordering.
        expected_list = sorted(repr(e) for e in self.expected)

        if len(expected_list) == 1:
            return 'expected {} at {}'.format(expected_list[0], self.line_info())
        else:
            return 'expected one of {} at {}'.format(', '.join(expected_list), self.line_info())


class Result(namedtuple('Result', 'status index value furthest expected')):
    """
    Outcome of running a parser.

    Fields: ``status`` (True on success), ``index`` and ``value`` (position
    after the match and the produced value; ``-1`` and ``None`` on failure),
    ``furthest`` (index of the furthest failure seen, ``-1`` if none) and
    ``expected`` (frozenset of descriptions expected at ``furthest``).
    """

    @staticmethod
    def success(index, value):
        """Build a successful result with no recorded failure."""
        return Result(True, index, value, -1, frozenset())

    @staticmethod
    def failure(index, expected):
        """Build a failed result expecting ``expected`` at ``index``."""
        return Result(False, -1, None, index, frozenset([expected]))

    def aggregate(self, other):
        """
        Return this result carrying the furthest failure of self and other.

        Status, index and value always come from ``self``; only the
        ``furthest``/``expected`` pair may be taken from, or merged with,
        ``other``. A falsy ``other`` (e.g. None) leaves ``self`` untouched.
        """
        if not other:
            return self
        if other.furthest < self.furthest:
            return self

        if other.furthest > self.furthest:
            furthest, expected = other.furthest, other.expected
        else:
            # Same failure index on both sides: combine the expected messages.
            furthest, expected = self.furthest, self.expected | other.expected
        return Result(self.status, self.index, self.value, furthest, expected)
class Parser(object):
    """
    Wrapper around a parsing function.

    The wrapped function receives the input to parse and the index at which
    to begin. It should return either ``Result.success(next_index, value)``,
    where ``next_index`` is where parsing continues and ``value`` is the
    yielded value, or ``Result.failure(index, expected)``, where ``expected``
    is a string describing what was expected and ``index`` is where the
    failure happened.
    """

    def __init__(self, wrapped_fn):
        self.wrapped_fn = wrapped_fn

    def __call__(self, stream, index):
        return self.wrapped_fn(stream, index)

    def parse(self, stream):
        """Parse a string or list of tokens completely; return the value or raise a ParseError."""
        value, _remainder = (self << eof).parse_partial(stream)
        return value

    def parse_partial(self, stream):
        """
        Parse the longest possible prefix of the input.

        Returns a ``(value, remainder)`` tuple, or raises a ParseError.
        """
        outcome = self(stream, 0)
        if not outcome.status:
            raise ParseError(outcome.expected, stream, outcome.furthest)
        return (outcome.value, stream[outcome.index:])

    def bind(self, bind_fn):
        """Run this parser, then the parser that ``bind_fn`` builds from its value."""
        @Parser
        def bound_parser(stream, index):
            first = self(stream, index)
            if not first.status:
                return first
            follow_up = bind_fn(first.value)
            return follow_up(stream, first.index).aggregate(first)

        return bound_parser

    def map(self, map_fn):
        """Transform the parsed value with ``map_fn``."""
        def mapped(res):
            return success(map_fn(res))

        return self.bind(mapped)

    def combine(self, combine_fn):
        """Call ``combine_fn`` with the parsed value unpacked as positional arguments."""
        def combined(res):
            return success(combine_fn(*res))

        return self.bind(combined)

    def combine_dict(self, combine_fn):
        """
        Call ``combine_fn`` with the parsed value unpacked as keyword arguments.

        The value is passed through ``dict()``; entries whose key is None or a
        string starting with an underscore are left out.
        """
        def combined(res):
            kwargs = {}
            for key, val in dict(res).items():
                if key is None:
                    continue
                if isinstance(key, str) and key.startswith('_'):
                    continue
                kwargs[key] = val
            return success(combine_fn(**kwargs))

        return self.bind(combined)

    def concat(self):
        """Join the parsed sequence of strings into a single string."""
        return self.map(''.join)

    def then(self, other):
        """Run this parser and then ``other``, keeping only ``other``'s value."""
        return seq(self, other).combine(lambda _first, second: second)

    def skip(self, other):
        """Run this parser and then ``other``, keeping only this parser's value."""
        return seq(self, other).combine(lambda first, _second: first)

    def result(self, res):
        """Run this parser but produce ``res`` instead of its own value."""
        return self >> success(res)

    def many(self):
        """Repeat this parser as many times as it succeeds, yielding a list."""
        return self.times(0, float('inf'))

    def times(self, min, max=None):
        """
        Repeat this parser between ``min`` and ``max`` times, yielding a list.

        When ``max`` is omitted exactly ``min`` repetitions are required.
        """
        if max is None:
            max = min

        @Parser
        def times_parser(stream, index):
            collected = []
            outcome = None

            while len(collected) < max:
                outcome = self(stream, index).aggregate(outcome)
                if not outcome.status:
                    if len(collected) < min:
                        # Not enough repetitions: the failure is the result.
                        return outcome
                    break
                collected.append(outcome.value)
                index = outcome.index

            return Result.success(index, collected).aggregate(outcome)

        return times_parser

    def at_most(self, n):
        """Repeat this parser from zero up to ``n`` times."""
        return self.times(0, n)

    def at_least(self, n):
        """Repeat this parser ``n`` times and then as often as it still succeeds."""
        return self.times(n) + self.many()

    def optional(self):
        """Yield this parser's value if it matches, otherwise None."""
        def first_or_none(values):
            return values[0] if values else None

        return self.times(0, 1).map(first_or_none)

    def sep_by(self, sep, *, min=0, max=float('inf')):
        """Parse repetitions of this parser separated by ``sep``, yielding a list."""
        empty = success([])
        if max == 0:
            return empty
        non_empty = self.times(1) + (sep >> self).times(min - 1, max - 1)
        if min == 0:
            return non_empty | empty
        return non_empty

    def desc(self, description):
        """Replace this parser's failure message with ``description``."""
        @Parser
        def desc_parser(stream, index):
            outcome = self(stream, index)
            if outcome.status:
                return outcome
            return Result.failure(index, description)

        return desc_parser

    def mark(self):
        """Yield ``(start, value, end)`` where start and end come from ``line_info``."""
        @generate
        def marked():
            begin = yield line_info
            inner = yield self
            finish = yield line_info
            return (begin, inner, finish)

        return marked

    def tag(self, name):
        """Yield a ``(name, value)`` tuple instead of the bare value."""
        def tagged(value):
            return (name, value)

        return self.map(tagged)

    def should_fail(self, description):
        """
        Succeed, without advancing, only when this parser fails.

        The success value is the failed Result of this parser; if this parser
        matches, the failure expects ``description``.
        """
        @Parser
        def fail_parser(stream, index):
            attempt = self(stream, index)
            if not attempt.status:
                return Result.success(index, attempt)
            return Result.failure(index, description)

        return fail_parser

    def __add__(self, other):
        return seq(self, other).combine(operator.add)

    def __mul__(self, other):
        # A range gives (min, max) bounds; anything else is an exact count.
        if isinstance(other, range):
            bounds = (other.start, other.stop - 1)
        else:
            bounds = (other,)
        return self.times(*bounds)

    def __or__(self, other):
        return alt(self, other)

    # Haskell-style operators, for fun

    # >>
    def __rshift__(self, other):
        return self.then(other)

    # <<
    def __lshift__(self, other):
        return self.skip(other)
def alt(*parsers):
    """
    Try each parser in order at the same position.

    Returns the first successful result. If none succeeds, the result is the
    last failure aggregated with the earlier ones. With no parsers at all the
    returned parser always fails.
    """
    if not parsers:
        return fail('<empty alt>')

    @Parser
    def alt_parser(stream, index):
        outcome = None
        for candidate in parsers:
            outcome = candidate(stream, index).aggregate(outcome)
            if outcome.status:
                break
        return outcome

    return alt_parser
if sys.version_info >= (3, 6):
    # Keyword arguments only remember their order on Python 3.6 and later,
    # so the keyword form of seq is only offered there.

    def seq(*parsers, **kw_parsers):
        """
        Run the given parsers in order and collect their individual results.

        Positional parsers produce a list of values; keyword parsers produce
        a dict mapping each keyword to its parser's value. The two forms
        cannot be mixed.
        """
        if parsers and kw_parsers:
            raise ValueError("Use either positional arguments or keyword arguments with seq, not both")
        if not parsers and not kw_parsers:
            return success([])

        if not parsers:
            @Parser
            def seq_kwarg_parser(stream, index):
                outcome = None
                collected = {}
                for key, step in kw_parsers.items():
                    outcome = step(stream, index).aggregate(outcome)
                    if not outcome.status:
                        return outcome
                    collected[key] = outcome.value
                    index = outcome.index
                return Result.success(index, collected).aggregate(outcome)

            return seq_kwarg_parser

        @Parser
        def seq_parser(stream, index):
            outcome = None
            collected = []
            for step in parsers:
                outcome = step(stream, index).aggregate(outcome)
                if not outcome.status:
                    return outcome
                collected.append(outcome.value)
                index = outcome.index
            return Result.success(index, collected).aggregate(outcome)

        return seq_parser

else:

    def seq(*parsers):
        """
        Run the given parsers in order and collect their individual results
        in a list.
        """
        if not parsers:
            return success([])

        @Parser
        def seq_parser(stream, index):
            outcome = None
            collected = []
            for step in parsers:
                outcome = step(stream, index).aggregate(outcome)
                if not outcome.status:
                    return outcome
                collected.append(outcome.value)
                index = outcome.index
            return Result.success(index, collected).aggregate(outcome)

        return seq_parser
# combinator syntax
def generate(fn):
    """
    Build a parser from a generator function.

    The generator yields parsers and is sent each parsed value in return;
    its return value becomes the result (if it returns a Parser, that parser
    is run at the current position). When called with a string instead of a
    function, returns a decorator that also applies ``.desc`` with that
    string.
    """
    if isinstance(fn, str):
        description = fn

        def decorator(f):
            return generate(f).desc(description)

        return decorator

    @Parser
    @wraps(fn)
    def generated(stream, index):
        # Start up the generator; sending None first is equivalent to next().
        gen = fn()
        outcome = None
        to_send = None
        try:
            while True:
                step = gen.send(to_send)
                outcome = step(stream, index).aggregate(outcome)
                if not outcome.status:
                    return outcome
                to_send = outcome.value
                index = outcome.index
        except StopIteration as stop:
            final = stop.value
            if not isinstance(final, Parser):
                return Result.success(index, final).aggregate(outcome)
            return final(stream, index).aggregate(outcome)

    return generated
@Parser
def index(_stream, position):
    """Consume nothing and yield the current index."""
    return Result.success(position, position)


@Parser
def line_info(stream, position):
    """Consume nothing and yield ``line_info_at`` for the current index."""
    return Result.success(position, line_info_at(stream, position))
def success(val):
    """Return a parser that consumes nothing and always yields ``val``."""
    def always_succeed(_stream, index):
        return Result.success(index, val)

    return Parser(always_succeed)
def fail(expected):
    """Return a parser that always fails, reporting ``expected``."""
    def always_fail(_stream, index):
        return Result.failure(index, expected)

    return Parser(always_fail)
def string(s, transform=noop):
    """
    Return a parser matching the literal ``s``.

    ``transform`` is applied to both ``s`` and the candidate slice of the
    input before they are compared; on success the value is ``s`` itself.
    """
    slen = len(s)
    target = transform(s)

    @Parser
    def string_parser(stream, index):
        end = index + slen
        if transform(stream[index:end]) == target:
            return Result.success(end, s)
        return Result.failure(index, s)

    return string_parser
def regex(exp, flags=0):
    """
    Return a parser matching a regular expression at the current position.

    Args:
        exp: a pattern given as ``str`` or ``bytes`` (compiled here with
            ``flags``), or an already compiled pattern object (used as is).
        flags: ``re`` flags, applied only when ``exp`` is compiled here.

    The parser yields the whole matched text (group 0) and advances to the
    end of the match; on failure it reports the pattern as what was expected.
    """
    # bytes patterns need compiling as well, otherwise ``exp.match`` below
    # fails with AttributeError when parsing bytes input.
    if isinstance(exp, (str, bytes)):
        exp = re.compile(exp, flags)

    @Parser
    def regex_parser(stream, index):
        match = exp.match(stream, index)
        if match:
            return Result.success(match.end(), match.group(0))
        else:
            return Result.failure(index, exp.pattern)

    return regex_parser
def test_item(func, description):
    """
    Return a parser that consumes one item for which ``func`` is truthy.

    Fails with ``description`` at the end of input or when ``func`` rejects
    the item.
    """
    @Parser
    def test_item_parser(stream, index):
        if index >= len(stream):
            return Result.failure(index, description)
        candidate = stream[index]
        if not func(candidate):
            return Result.failure(index, description)
        return Result.success(index + 1, candidate)

    return test_item_parser
def test_char(func, description):
    """Character-oriented alias of ``test_item``; the implementation is shared."""
    return test_item(func=func, description=description)
def match_item(item, description=None):
    """
    Return a parser that consumes one item equal to ``item``.

    ``description`` defaults to ``str(item)``.
    """
    def is_match(candidate):
        return item == candidate

    label = str(item) if description is None else description
    return test_item(is_match, label)
def string_from(*strings, transform=noop):
    """Return a parser matching any one of ``strings``, compared via ``transform``."""
    # Longest first, so that overlapping options work correctly.
    by_length = sorted(strings, key=len, reverse=True)
    options = [string(candidate, transform) for candidate in by_length]
    return alt(*options)
def char_from(string):
    """Return a parser matching a single character contained in ``string``."""
    def is_member(c):
        return c in string

    label = "[" + string + "]"
    return test_char(is_member, label)
def peek(parser):
    """
    Return a parser that runs ``parser`` without consuming input.

    On success the value is kept but the index is left where it started;
    a failure is passed through unchanged.
    """
    @Parser
    def peek_parser(stream, index):
        lookahead = parser(stream, index)
        if not lookahead.status:
            return lookahead
        return Result.success(index, lookahead.value)

    return peek_parser
# Ready-made single-purpose parsers.
any_char = test_char(lambda _c: True, "any character")
whitespace = regex(r'\s+')
letter = test_char(lambda ch: ch.isalpha(), 'a letter')
digit = test_char(lambda ch: ch.isdigit(), 'a digit')
decimal_digit = char_from("0123456789")


@Parser
def eof(stream, index):
    """Succeed with None only when the index is at or past the end of input."""
    if index < len(stream):
        return Result.failure(index, 'EOF')
    return Result.success(index, None)
def from_enum(enum_cls, transform=noop):
    """
    Return a parser matching the string form of any value of ``enum_cls``.

    The parsed value is the corresponding enum member. Longer value strings
    are tried first.
    """
    pairs = [(str(member.value), member) for member in enum_cls]
    pairs.sort(key=lambda pair: len(pair[0]), reverse=True)
    options = []
    for text, member in pairs:
        options.append(string(text, transform=transform).result(member))
    return alt(*options)