Other examples¶
This section has some further example parsers that you can study. There are also examples in the Tutorial and in Generating a parser.
JSON parser¶
from sys import stdin
from parsy import generate, regex, string
whitespace = regex(r'\s*')
lexeme = lambda p: p << whitespace
lbrace = lexeme(string('{'))
rbrace = lexeme(string('}'))
lbrack = lexeme(string('['))
rbrack = lexeme(string(']'))
colon = lexeme(string(':'))
comma = lexeme(string(','))
true = lexeme(string('true')).result(True)
false = lexeme(string('false')).result(False)
null = lexeme(string('null')).result(None)
number = lexeme(
regex(r'-?(0|[1-9][0-9]*)([.][0-9]+)?([eE][+-]?[0-9]+)?')
).map(float)
string_part = regex(r'[^"\\]+')
string_esc = string('\\') >> (
string('\\')
| string('/')
| string('"')
| string('b').result('\b')
| string('f').result('\f')
| string('n').result('\n')
| string('r').result('\r')
| string('t').result('\t')
| regex(r'u[0-9a-fA-F]{4}').map(lambda s: chr(int(s[1:], 16)))
)
quoted = lexeme(string('"') >> (string_part | string_esc).many().concat() << string('"'))
# Circular dependency between array and value means we use `generate` form here
@generate
def array():
yield lbrack
elements = yield value.sep_by(comma)
yield rbrack
return elements
@generate
def object_pair():
key = yield quoted
yield colon
val = yield value
return (key, val)
json_object = lbrace >> object_pair.sep_by(comma).map(dict) << rbrace
value = quoted | number | json_object | array | true | false | null
json = whitespace >> value
if __name__ == '__main__':
print(repr(json.parse(stdin.read())))
.proto file parser¶
A parser for the .proto
files for Protocol Buffers, version 3.
This example is useful in showing lots of simple custom data structures for holding the result of the parse.
# -*- coding: utf-8 -*-
# Parser for protocol buffer .proto files
import enum as stdlib_enum
from string import ascii_letters, digits, hexdigits, octdigits
import attr
from parsy import char_from, from_enum, generate, regex, seq, string
# This file follows the spec at
# https://developers.google.com/protocol-buffers/docs/reference/proto3-spec
# very closely.
# However, because we are parsing into useful objects, we do transformations
# along the way e.g. turning into integers, strings etc. and custom objects.
# Some of the lowest level items has been implemented using 'regex' and converting
# the descriptions to regular expressions. Higher level constructs have been
# implemented using other parsy primitives and combinators.
# Notes:
# 1. Whitespace is very badly defined in the 'spec', so we guess what is meant.
# 2. The spec doesn't allow for comments, and neither does this parser.
# Other places mention that C++ style comments are allowed. To support that,
# this parser would need to be changed into split lexing/parsing stages
# (otherwise you hit issues with comments start markers within string literals).
# 3. Other notes inline.
# Our utilities
optional_string = lambda s: string(s).times(0, 1).concat()
convert_decimal = int
convert_octal = lambda s: int(s, 8)
convert_hex = lambda s: int(s, 16)
exclude_none = lambda l: [i for i in l if i is not None]
def lexeme(p):
"""
From a parser (or string), make a parser that consumes
whitespace on either side.
"""
if isinstance(p, str):
p = string(p)
return regex(r'\s*') >> p << regex(r'\s*')
def is_present(p):
"""
Given a parser or string, make a parser that returns
True if the parser matches, False otherwise
"""
return lexeme(p).optional().map(lambda v: False if v is None else True)
# Our data structures
@attr.s
class Import:
identifier = attr.ib()
option = attr.ib()
@attr.s
class Package:
identifer = attr.ib()
@attr.s
class Option:
name = attr.ib()
value = attr.ib()
@attr.s
class Field:
repeated = attr.ib()
type = attr.ib()
name = attr.ib()
number = attr.ib()
options = attr.ib()
@attr.s
class OneOfField:
type = attr.ib()
name = attr.ib()
number = attr.ib()
options = attr.ib()
@attr.s
class OneOf:
name = attr.ib()
fields = attr.ib()
@attr.s
class Map:
key_type = attr.ib()
type = attr.ib()
name = attr.ib()
number = attr.ib()
options = attr.ib()
@attr.s
class Reserved:
items = attr.ib()
@attr.s
class Range:
from_ = attr.ib()
to = attr.ib()
@attr.s
class EnumField:
name = attr.ib()
value = attr.ib()
options = attr.ib()
@attr.s
class Enum:
name = attr.ib()
body = attr.ib()
@attr.s
class Message:
name = attr.ib()
body = attr.ib()
@attr.s
class Service:
name = attr.ib()
body = attr.ib()
@attr.s
class Rpc:
name = attr.ib()
request_stream = attr.ib()
request_message_type = attr.ib()
response_stream = attr.ib()
response_message_type = attr.ib()
options = attr.ib()
@attr.s
class Proto:
syntax = attr.ib()
statements = attr.ib()
# Enums:
class ImportOption(stdlib_enum.Enum):
WEAK = "weak"
PUBLIC = "public"
class Type(stdlib_enum.Enum):
DOUBLE = "double"
FLOAT = "float"
INT32 = "int32"
INT64 = "int64"
UINT32 = "uint32"
UINT64 = "uint64"
SINT32 = "sint32"
SINT64 = "sint64"
FIXED32 = "fixed32"
FIXED64 = "fixed64"
SFIXED32 = "sfixed32"
SFIXED64 = "sfixed64"
BOOL = "bool"
STRING = "string"
BYTES = "bytes"
class KeyType(stdlib_enum.Enum):
INT32 = "int32"
INT64 = "int64"
UINT32 = "uint32"
UINT64 = "uint64"
SINT32 = "sint32"
SINT64 = "sint64"
FIXED32 = "fixed32"
FIXED64 = "fixed64"
SFIXED32 = "sfixed32"
SFIXED64 = "sfixed64"
BOOL = "bool"
STRING = "string"
# Some extra constants to avoid typing
SEMI = lexeme(";")
EQ = lexeme("=")
LPAREN = lexeme("(")
RPAREN = lexeme(")")
LBRACE = lexeme("{")
RBRACE = lexeme("}")
# -- Beginning of following spec --
# Letters and digits
letter = char_from(ascii_letters)
decimalDigit = char_from(digits)
octalDigit = char_from(octdigits)
hexDigit = char_from(hexdigits)
# Identifiers
# Compared to spec, we add some '_' prefixed items which are not wrapped in `lexeme`,
# on the assumption that spaces in the middle of identifiers are not accepted.
_ident = (letter + (letter | decimalDigit | string("_")).many().concat()).desc('ident')
ident = lexeme(_ident)
fullIdent = lexeme(ident + (string(".") + ident).many().concat()).desc('fullIdent')
_messageName = _ident
messageName = lexeme(ident).desc('messageName')
_enumName = ident
enumName = lexeme(_enumName).desc('enumName')
fieldName = ident.desc('fieldName')
oneofName = ident.desc('oneofName')
mapName = ident.desc('mapName')
serviceName = ident.desc('serviceName')
rpcName = ident.desc('rpcName')
messageType = optional_string(".") + (_ident + string(".")).many().concat() + _messageName
enumType = optional_string(".") + (_ident + string(".")).many().concat() + _enumName
# Integer literals
decimalLit = regex("[1-9][0-9]*").desc('decimalLit').map(convert_decimal)
octalLit = regex("0[0-7]*").desc('octalLit').map(convert_octal)
hexLit = regex("0[x|X][0-9a-fA-F]+").desc('octalLit').map(convert_hex)
intLit = decimalLit | octalLit | hexLit
# Floating-point literals
decimals = r'[0-9]+'
exponent = r'[e|E][+|-]?' + decimals
floatLit = regex(r'({decimals}\.({decimals})?({exponent})?)|{decimals}{exponent}|\.{decimals}({exponent})?'
.format(decimals=decimals, exponent=exponent)).desc('floatLit').map(float)
# Boolean
boolLit = (string("true").result(True) | string("false").result(False)).desc('boolLit')
# String literals
hexEscape = regex(r"\\[x|X]") >> regex("[0-9a-fA-F]{2}").map(convert_hex).map(chr)
octEscape = regex(r"\\") >> regex('[0-7]{2}').map(convert_octal).map(chr)
charEscape = regex(r"\\") >> (
string("a").result("\a") |
string("b").result("\b") |
string("f").result("\f") |
string("n").result("\n") |
string("r").result("\r") |
string("t").result("\t") |
string("v").result("\v") |
string("\\").result("\\") |
string("'").result("'") |
string('"').result('"')
)
escapes = hexEscape | octEscape | charEscape
# Correction to spec regarding " and ' inside quoted strings
strLit = (string("'") >> (escapes | regex(r"[^\0\n\'\\]")).many().concat() << string("'") |
string('"') >> (escapes | regex(r"[^\0\n\"\\]")).many().concat() << string('"')).desc('strLit')
quote = string("'") | string('"')
# EmptyStatement
emptyStatement = string(";").result(None)
# Signed numbers:
# (Extra compared to spec, to cope with need to produce signed numeric values)
signedNumberChange = lambda s, num: (-1) if s == "-" else (+1)
sign = regex("[-+]?")
signedIntLit = seq(sign, intLit).combine(signedNumberChange)
signedFloatLit = seq(sign, floatLit).combine(signedNumberChange)
# Constant
# put fullIdent at end to disabmiguate from boolLit
constant = signedIntLit | signedFloatLit | strLit | boolLit | fullIdent
# Syntax
syntax = lexeme("syntax") >> EQ >> quote >> string("proto3") << quote + SEMI
# Import Statement
import_option = from_enum(ImportOption)
import_ = seq(lexeme("import") >> import_option.optional().tag('option'),
lexeme(strLit).tag('identifier') << SEMI).combine_dict(Import)
# Package
package = seq(lexeme("package") >> fullIdent << SEMI).map(Package)
# Option
optionName = (ident | (LPAREN >> fullIdent << RPAREN)) + (string(".") + ident).many().concat()
option = seq(lexeme("option") >> optionName.tag('name'),
EQ >> constant.tag('value') << SEMI,
).combine_dict(Option)
# Normal field
type_ = lexeme(from_enum(Type) | messageType | enumType)
fieldNumber = lexeme(intLit)
fieldOption = seq(optionName.tag('name'),
EQ >> constant.tag('value')).combine_dict(Option)
fieldOptions = fieldOption.sep_by(lexeme(","), min=1)
fieldOptionList = (lexeme("[") >> fieldOptions << lexeme("]")).optional().map(
lambda o: [] if o is None else o)
field = seq(is_present("repeated").tag('repeated'),
type_.tag('type'),
fieldName.tag('name') << EQ,
fieldNumber.tag('number'),
fieldOptionList.tag('options') << SEMI,
).combine_dict(Field)
# Oneof and oneof field
oneofField = seq(type_.tag('type'),
fieldName.tag('name') << EQ,
fieldNumber.tag('number'),
fieldOptionList.tag('options') << SEMI,
).combine_dict(OneOfField)
oneof = seq(lexeme("oneof") >> oneofName.tag('name'),
LBRACE >>
(oneofField | emptyStatement).many().map(exclude_none).tag('fields')
<< RBRACE
).combine_dict(OneOf)
# Map field
keyType = lexeme(from_enum(KeyType))
mapField = seq(lexeme("map") >> lexeme("<") >> keyType.tag('key_type'),
lexeme(",") >> type_.tag('type'),
lexeme(">") >> mapName.tag('name'),
EQ >> fieldNumber.tag('number'),
fieldOptionList.tag('options') << SEMI
).combine_dict(Map)
# Reserved
range_ = seq(lexeme(intLit).tag('from_'),
(lexeme("to") >> (intLit | lexeme("max"))).optional().tag('to')
).combine_dict(Range)
ranges = range_.sep_by(lexeme(","), min=1)
# The spec for 'reserved' indicates 'fieldName' here, which is never a quoted string.
# But the example has a quoted string. We have changed it to 'strLit'
fieldNames = strLit.sep_by(lexeme(","), min=1)
reserved = seq(lexeme("reserved") >> (ranges | fieldNames) << SEMI
).combine(Reserved)
# Enum definition
enumValueOption = seq(optionName.tag('name') << EQ,
constant.tag('value')
).combine_dict(Option)
enumField = seq(ident.tag('name') << EQ,
lexeme(intLit).tag('value'),
(lexeme("[") >> enumValueOption.sep_by(lexeme(","), min=1) << lexeme("]")).optional()
.map(lambda o: [] if o is None else o).tag('options')
<< SEMI
).combine_dict(EnumField)
enumBody = (LBRACE >>
(option | enumField | emptyStatement).many().map(exclude_none)
<< RBRACE)
enum = seq(lexeme("enum") >> enumName.tag('name'),
enumBody.tag('body')
).combine_dict(Enum)
# Message definition
@generate
def message():
yield lexeme("message")
name = yield messageName
body = yield messageBody
return Message(name=name, body=body)
messageBody = (LBRACE >>
(field | enum | message | option | oneof | mapField |
reserved | emptyStatement).many()
<< RBRACE)
# Service definition
rpc = seq(lexeme("rpc") >> rpcName.tag('name'),
LPAREN >>
(is_present("stream").tag("request_stream")),
messageType.tag("request_message_type") << RPAREN,
lexeme("returns") >> LPAREN >>
(is_present("stream").tag("response_stream")),
messageType.tag("response_message_type")
<< RPAREN,
((LBRACE >>
(option | emptyStatement).many()
<< RBRACE)
| SEMI.result([])
).optional().map(exclude_none).tag('options')
).combine_dict(Rpc)
service = seq(lexeme("service") >> serviceName.tag('name'),
LBRACE >>
(option | rpc | emptyStatement).many().map(exclude_none).tag('body')
<< RBRACE
).combine_dict(Service)
# Proto file
topLevelDef = message | enum | service
proto = seq(syntax.tag('syntax'),
(import_ | package | option | topLevelDef | emptyStatement
).many().map(exclude_none).tag('statements')
).combine_dict(Proto)
EXAMPLE = """
syntax = "proto3";
import public "other.proto";
option java_package = "com.example.foo";
enum EnumAllowingAlias {
option allow_alias = true;
UNKNOWN = 0;
STARTED = 1;
RUNNING = 2 [(custom_option) = "hello world"];
}
message outer {
option (my_option).a = true;
message inner {
int64 ival = 1;
}
repeated inner inner_message = 2;
EnumAllowingAlias enum_field =3;
map<int32, string> my_map = 4;
}
"""
# Smoke test - should find 4 top level statements in the example:
assert len(proto.parse(EXAMPLE).statements) == 4