Other examples

This section has some further example parsers that you can study. There are also examples in the Tutorial and in Generating a parser.

SQL SELECT statement parser

This shows a very simplified parser for a SQL SELECT statement, using custom data structures, and the convenient keyword argument syntax for seq() (usable with Python 3.6 and later), followed by Parser.combine_dict().

# A very limited parser for SQL SELECT statements,
# for demo purposes. Supports:
# 1. A simple list of columns (or number/string literals)
# 2. A simple table name
# 3. An optional where condition,
#    which has the form of 'A op B' where A and B are columns, strings or numbers,
#    and op is a comparison operator
#
# We demonstrate the use of `map` to create AST nodes with a single arg,
# and `seq` for AST nodes with more than one arg.


import attr

from parsy import regex, seq, string, string_from

# -- AST nodes.


@attr.s
class Number:
    """AST node for an integer literal."""

    value = attr.ib()  # the literal converted to ``int`` (see ``number_literal``)


@attr.s
class String:
    """AST node for a single-quoted string literal."""

    value = attr.ib()  # the text between the quotes (see ``string_literal``)


@attr.s
class Field:
    """AST node for a column reference."""

    name = attr.ib()  # the identifier text as written in the query


@attr.s
class Table:
    """AST node for the table named in the FROM clause."""

    name = attr.ib()  # the identifier text as written in the query


@attr.s
class Comparison:
    """AST node for a WHERE condition of the form ``left operator right``."""

    left = attr.ib()  # a Field, String or Number node
    operator = attr.ib()  # the comparison operator as a plain string, e.g. "="
    right = attr.ib()  # a Field, String or Number node


@attr.s
class Select:
    """AST node for a whole SELECT statement."""

    columns = attr.ib()  # non-empty list of Field / String / Number nodes
    table = attr.ib()  # a Table node
    where = attr.ib()  # a Comparison node, or None when there is no WHERE clause


# -- Parsers

# Integer literals: the matched text is converted and wrapped in one step.
number_literal = regex(r'-?[0-9]+').map(lambda digits: Number(int(digits)))

# We don't support ' in strings or escaping for simplicity
string_literal = regex(r"'[^']*'").map(lambda quoted: quoted[1:-1]).map(String)

# Whitespace helpers
padding = regex(r'\s*')  # optional whitespace
space = regex(r'\s+')  # non-optional whitespace

# Columns and tables share one lexical form but produce different AST nodes.
identifier = regex('[a-zA-Z][a-zA-Z0-9_]*')
table = identifier.map(Table)
field = identifier.map(Field)

column_expr = field | string_literal | number_literal

operator = string_from('=', '<', '>', '<=', '>=')

# `.skip()` / `.then()` are the method spellings of `<<` / `>>`.
comparison = seq(
    left=column_expr.skip(padding),
    operator=operator,
    right=padding.then(column_expr),
).combine_dict(Comparison)

# SQL keywords
SELECT, FROM, WHERE = string('SELECT'), string('FROM'), string('WHERE')

# Here we demonstrate use of leading underscore to discard parts we don't want,
# which is more readable and convenient than `<<` and `>>` sometimes.
select = seq(
    _select=SELECT + space,
    columns=column_expr.sep_by(padding + string(',') + padding, min=1),
    _from=space + FROM + space,
    table=table,
    where=space.then(WHERE).then(space).then(comparison).optional(),
    _end=padding + string(';'),
).combine_dict(Select)


def test_select():
    """A query with mixed column kinds and a WHERE clause parses to the full AST."""
    query = "SELECT thing, stuff, 123, 'hello' FROM my_table WHERE id = 1;"
    expected_columns = [
        Field("thing"),
        Field("stuff"),
        Number(123),
        String("hello"),
    ]
    expected_where = Comparison(left=Field("id"), operator="=", right=Number(1))
    expected = Select(
        columns=expected_columns,
        table=Table("my_table"),
        where=expected_where,
    )
    assert select.parse(query) == expected


def test_optional_where():
    """Leaving out the WHERE clause yields ``where=None``."""
    parsed = select.parse("SELECT 1 FROM x;")
    expected = Select(columns=[Number(1)], table=Table("x"), where=None)
    assert parsed == expected

JSON parser

A full parser for JSON. (This will not be competitive in terms of performance with other implementations!)

from sys import stdin

from parsy import generate, regex, string

whitespace = regex(r'\s*')


def lexeme(p):
    """Return a parser that runs ``p`` and then discards trailing whitespace."""
    return p.skip(whitespace)


# Structural punctuation
lbrace, rbrace = lexeme(string('{')), lexeme(string('}'))
lbrack, rbrack = lexeme(string('[')), lexeme(string(']'))
colon, comma = lexeme(string(':')), lexeme(string(','))

# Keyword literals, each replaced by its Python value
true = lexeme(string('true')).result(True)
false = lexeme(string('false')).result(False)
null = lexeme(string('null')).result(None)

# Every number, integer or not, is returned as a float.
number = lexeme(
    regex(r'-?(0|[1-9][0-9]*)([.][0-9]+)?([eE][+-]?[0-9]+)?')
).map(float)

# A run of characters that needs no unescaping
string_part = regex(r'[^"\\]+')
# A backslash followed by one escape code, yielding the character it denotes
string_esc = string('\\').then(
    string('\\')
    | string('/')
    | string('"')
    | string('b').result('\b')
    | string('f').result('\f')
    | string('n').result('\n')
    | string('r').result('\r')
    | string('t').result('\t')
    | regex(r'u[0-9a-fA-F]{4}').map(lambda code: chr(int(code[1:], 16)))
)
# A complete double-quoted string, returned without its quotes
quoted = lexeme(
    string('"')
    .then((string_part | string_esc).many().concat())
    .skip(string('"'))
)


# `array` and `value` refer to each other, so `array` is written in `generate`
# form: `value` is only looked up when the parser actually runs.
@generate
def array():
    items = yield lbrack.then(value.sep_by(comma)).skip(rbrack)
    return items


@generate
def object_pair():
    # One `"key": value` member of an object, returned as a (key, value) tuple.
    name = yield quoted.skip(colon)
    member = yield value
    return (name, member)


# An object is a brace-delimited, comma-separated list of pairs, built into a dict.
json_object = lbrace.then(object_pair.sep_by(comma).map(dict)).skip(rbrace)
value = quoted | number | json_object | array | true | false | null
# A document is one value, optionally preceded by whitespace.
json = whitespace.then(value)

if __name__ == '__main__':
    document = stdin.read()
    print(repr(json.parse(document)))

.proto file parser

A parser for the .proto files for Protocol Buffers, version 3.

This example is useful in showing lots of simple custom data structures for holding the result of the parse. It uses the Parser.tag() method for labelling parts, followed by Parser.combine_dict().

# -*- coding: utf-8 -*-

# Parser for protocol buffer .proto files
import enum as stdlib_enum
from string import ascii_letters, digits, hexdigits, octdigits

import attr

from parsy import char_from, from_enum, generate, regex, seq, string

# This file follows the spec at
# https://developers.google.com/protocol-buffers/docs/reference/proto3-spec
# very closely.

# However, because we are parsing into useful objects, we do transformations
# along the way e.g. turning into integers, strings etc. and custom objects.
# Some of the lowest level items have been implemented using 'regex' and converting
# the descriptions to regular expressions. Higher level constructs have been
# implemented using other parsy primitives and combinators.

# Notes:

# 1. Whitespace is very badly defined in the 'spec', so we guess what is meant.
# 2. The spec doesn't allow for comments, and neither does this parser.
#    Other places mention that C++ style comments are allowed. To support that,
#    this parser would need to be changed into split lexing/parsing stages
#    (otherwise you hit issues with comments start markers within string literals).
# 3. Other notes inline.


# Our utilities
def optional_string(s):
    """Return a parser that matches the literal ``s`` zero or one times and yields the joined text."""
    return string(s).times(0, 1).concat()


convert_decimal = int


def convert_octal(s):
    """Convert a string of octal digits to an int."""
    return int(s, 8)


def convert_hex(s):
    """Convert a string of hexadecimal digits to an int."""
    return int(s, 16)


def exclude_none(l):
    """Return a new list containing the items of ``l`` that are not None."""
    return list(filter(lambda item: item is not None, l))


def lexeme(p):
    """
    Wrap a parser so that optional whitespace before and after it is
    consumed and discarded. A plain string is first turned into a
    parser for that literal.
    """
    parser = string(p) if isinstance(p, str) else p
    optional_ws = regex(r'\s*')
    return optional_ws >> parser << optional_ws


def is_present(p):
    """
    Build a flag parser from a parser or string: it yields True when
    `p` (with optional surrounding whitespace) matches, and False when
    it does not.
    """
    maybe = lexeme(p).optional()
    return maybe.map(lambda found: found is not None)


# Our data structures
@attr.s
class Import:
    """An ``import`` statement."""

    identifier = attr.ib()  # the imported name, taken from a string literal
    option = attr.ib()  # an ``ImportOption`` member, or None when no modifier is given


@attr.s
class Package:
    """A ``package`` statement."""

    # NOTE(review): the attribute name is misspelled ("identifer"). It is left
    # as-is because renaming it would change the constructor keyword.
    identifer = attr.ib()  # the value produced by the `package` parser


@attr.s
class Option:
    """
    A ``name = value`` option. Used for ``option`` statements as well as
    for the bracketed options on fields and enum values.
    """

    name = attr.ib()  # the option name as a string
    value = attr.ib()  # the parsed constant


@attr.s
class Field:
    """A normal message field."""

    repeated = attr.ib()  # True when the `repeated` keyword is present
    type = attr.ib()  # a ``Type`` member, or a type name string for message/enum types
    name = attr.ib()  # the field name
    number = attr.ib()  # the field number as an int
    options = attr.ib()  # list of ``Option``; empty when there is no "[...]" section


@attr.s
class OneOfField:
    """A single field inside a ``oneof`` block."""

    type = attr.ib()  # a ``Type`` member, or a type name string for message/enum types
    name = attr.ib()  # the field name
    number = attr.ib()  # the field number as an int
    options = attr.ib()  # list of ``Option``; empty when there is no "[...]" section


@attr.s
class OneOf:
    """A ``oneof`` block."""

    name = attr.ib()  # the oneof name
    fields = attr.ib()  # list of ``OneOfField`` (empty statements are dropped)


@attr.s
class Map:
    """A ``map<key_type, type>`` field."""

    key_type = attr.ib()  # a ``KeyType`` member
    type = attr.ib()  # value type: a ``Type`` member or a type name string
    name = attr.ib()  # the field name
    number = attr.ib()  # the field number as an int
    options = attr.ib()  # list of ``Option``; empty when there is no "[...]" section


@attr.s
class Reserved:
    """A ``reserved`` statement."""

    items = attr.ib()  # either a list of ``Range`` or a list of field-name strings


@attr.s
class Range:
    """One entry of a ``reserved`` number list: a single number or ``from_ to to``."""

    from_ = attr.ib()  # the first (or only) number, as an int
    to = attr.ib()  # an int, the string "max", or None when no `to` part is given


@attr.s
class EnumField:
    """One ``NAME = value`` entry of an enum."""

    name = attr.ib()  # the entry name
    value = attr.ib()  # the numeric value as an int
    options = attr.ib()  # list of ``Option``; empty when there is no "[...]" section


@attr.s
class Enum:
    """An ``enum`` definition."""

    name = attr.ib()  # the enum name
    body = attr.ib()  # list of ``Option`` and ``EnumField`` items (empty statements are dropped)


@attr.s
class Message:
    """A ``message`` definition."""

    name = attr.ib()  # the message name
    body = attr.ib()  # list of the items parsed from the message body


@attr.s
class Service:
    """A ``service`` definition."""

    name = attr.ib()  # the service name
    body = attr.ib()  # list of ``Option`` and ``Rpc`` items (empty statements are dropped)


@attr.s
class Rpc:
    """An ``rpc`` method inside a service."""

    name = attr.ib()  # the rpc name
    request_stream = attr.ib()  # True when the request is marked `stream`
    request_message_type = attr.ib()  # request type name as a string
    response_stream = attr.ib()  # True when the response is marked `stream`
    response_message_type = attr.ib()  # response type name as a string
    options = attr.ib()  # list of ``Option`` items from the rpc body


@attr.s
class Proto:
    """The result of parsing a whole .proto file."""

    syntax = attr.ib()  # the syntax version string ("proto3")
    statements = attr.ib()  # list of top-level statements (empty statements are dropped)


# Enums:
class ImportOption(stdlib_enum.Enum):
    """Modifiers that may follow the ``import`` keyword."""

    WEAK = "weak"
    PUBLIC = "public"


class Type(stdlib_enum.Enum):
    """Built-in field types; each value is the keyword as written in a .proto file."""

    DOUBLE = "double"
    FLOAT = "float"
    INT32 = "int32"
    INT64 = "int64"
    UINT32 = "uint32"
    UINT64 = "uint64"
    SINT32 = "sint32"
    SINT64 = "sint64"
    FIXED32 = "fixed32"
    FIXED64 = "fixed64"
    SFIXED32 = "sfixed32"
    SFIXED64 = "sfixed64"
    BOOL = "bool"
    STRING = "string"
    BYTES = "bytes"


class KeyType(stdlib_enum.Enum):
    """
    Types accepted as the key of a ``map<...>`` field. Same members as
    ``Type`` except for DOUBLE, FLOAT and BYTES.
    """

    INT32 = "int32"
    INT64 = "int64"
    UINT32 = "uint32"
    UINT64 = "uint64"
    SINT32 = "sint32"
    SINT64 = "sint64"
    FIXED32 = "fixed32"
    FIXED64 = "fixed64"
    SFIXED32 = "sfixed32"
    SFIXED64 = "sfixed64"
    BOOL = "bool"
    STRING = "string"


# Some extra constants to avoid typing.
# Each one matches the punctuation mark plus any surrounding whitespace.
SEMI, EQ = lexeme(";"), lexeme("=")
LPAREN, RPAREN = lexeme("("), lexeme(")")
LBRACE, RBRACE = lexeme("{"), lexeme("}")

# -- Beginning of following spec --
# Letters and digits
letter = char_from(ascii_letters)
decimalDigit = char_from(digits)
octalDigit = char_from(octdigits)
hexDigit = char_from(hexdigits)

# Identifiers

# Compared to spec, we add some '_' prefixed items which are not wrapped in `lexeme`,
# on the assumption that spaces in the middle of identifiers are not accepted.
_ident = (letter + (letter | decimalDigit | string("_")).many().concat()).desc('ident')
ident = lexeme(_ident)
fullIdent = lexeme(ident + (string(".") + ident).many().concat()).desc('fullIdent')
_messageName = _ident
# Wrap the bare '_' form exactly once; wrapping `ident` would apply `lexeme` twice.
messageName = lexeme(_messageName).desc('messageName')
# '_' prefixed names must stay whitespace-free, so this is `_ident`, not `ident`.
_enumName = _ident
enumName = lexeme(_enumName).desc('enumName')
fieldName = ident.desc('fieldName')
oneofName = ident.desc('oneofName')
mapName = ident.desc('mapName')
serviceName = ident.desc('serviceName')
rpcName = ident.desc('rpcName')
# Dotted type references are built only from the whitespace-free '_' parsers.
messageType = optional_string(".") + (_ident + string(".")).many().concat() + _messageName
enumType = optional_string(".") + (_ident + string(".")).many().concat() + _enumName

# Integer literals
decimalLit = regex("[1-9][0-9]*").desc('decimalLit').map(convert_decimal)
octalLit = regex("0[0-7]*").desc('octalLit').map(convert_octal)
# The character class is "[xX]": writing "[x|X]" would also accept a literal "|".
hexLit = regex("0[xX][0-9a-fA-F]+").desc('hexLit').map(convert_hex)
# hexLit must be tried before octalLit. Otherwise octalLit succeeds on the
# leading "0" of "0x1F" and the hexadecimal alternative is never reached.
intLit = decimalLit | hexLit | octalLit


# Floating-point literals
decimals = r'[0-9]+'
# Character classes list their members directly: "[e|E]" and "[+|-]" would
# also match a literal "|", producing text that `float` cannot convert.
exponent = r'[eE][+-]?' + decimals
# Three accepted shapes: "1." / "1.5" / "1.5e3", "1e3", and ".5" / ".5e3".
floatLit = regex(r'({decimals}\.({decimals})?({exponent})?)|{decimals}{exponent}|\.{decimals}({exponent})?'
                 .format(decimals=decimals, exponent=exponent)).desc('floatLit').map(float)


# Boolean
# Each keyword is replaced by the matching Python bool.
_trueLit = string("true").result(True)
_falseLit = string("false").result(False)
boolLit = (_trueLit | _falseLit).desc('boolLit')


# String literals
# Each escape parser yields the single character that the escape denotes.
# "[xX]" rather than "[x|X]": the latter would also accept "\|41" as a hex escape.
# One or two hex digits are accepted, as in the current language specification.
hexEscape = regex(r"\\[xX]") >> regex("[0-9a-fA-F]{1,2}").map(convert_hex).map(chr)
# One to three octal digits are accepted, so "\101" is read as one character
# instead of a two-digit escape followed by a stray "1".
octEscape = regex(r"\\") >> regex('[0-7]{1,3}').map(convert_octal).map(chr)
charEscape = regex(r"\\") >> (
    string("a").result("\a")
    | string("b").result("\b")
    | string("f").result("\f")
    | string("n").result("\n")
    | string("r").result("\r")
    | string("t").result("\t")
    | string("v").result("\v")
    | string("\\").result("\\")
    | string("'").result("'")
    | string('"').result('"')
)
escapes = hexEscape | octEscape | charEscape
# Correction to spec regarding " and ' inside quoted strings:
# the quote character that opened the literal cannot appear unescaped inside it.
strLit = (string("'") >> (escapes | regex(r"[^\0\n\'\\]")).many().concat() << string("'")
          | string('"') >> (escapes | regex(r"[^\0\n\"\\]")).many().concat() << string('"')).desc('strLit')
quote = string("'") | string('"')

# EmptyStatement
# A lone ";" that yields None, so callers can filter it out with `exclude_none`.
emptyStatement = string(";").result(None)

# Signed numbers:
# (Extra compared to spec, to cope with need to produce signed numeric values)
# Combines the parsed sign `s` ("-", "+" or "") with the parsed magnitude `num`
# and returns the signed number itself, not just a +1/-1 factor.
signedNumberChange = lambda s, num: -num if s == "-" else num
sign = regex("[-+]?")
signedIntLit = seq(sign, intLit).combine(signedNumberChange)
signedFloatLit = seq(sign, floatLit).combine(signedNumberChange)


# Constant
# Order matters:
# - signedFloatLit comes before signedIntLit, otherwise the integer parser
#   succeeds on the "1" of "1.5" and the float alternative is never tried.
#   floatLit needs a "." or an exponent, so plain integers still fall through.
# - fullIdent goes last to disambiguate from boolLit.
constant = signedFloatLit | signedIntLit | strLit | boolLit | fullIdent

# Syntax
# Matches `syntax = "proto3";` (either quote style) and yields "proto3".
syntax = lexeme("syntax") >> EQ >> quote >> string("proto3") << quote << SEMI

# Import Statement
import_option = from_enum(ImportOption)

# `import [weak|public] "name";`: the tagged parts become Import's keyword arguments.
import_ = seq(
    lexeme("import").then(import_option.optional().tag('option')),
    lexeme(strLit).tag('identifier').skip(SEMI),
).combine_dict(Import)

# Package
# `package foo.bar;`: the identifier string is passed straight to Package.
# (Wrapping the single parser in `seq(...)` before `.map(Package)` would hand
# Package a one-element list instead of the identifier.)
package = (lexeme("package") >> fullIdent << SEMI).map(Package)

# Option
# A name is a plain identifier or a parenthesised full identifier, optionally
# followed by further ".ident" parts.
optionName = (ident | LPAREN.then(fullIdent).skip(RPAREN)) + (string(".") + ident).many().concat()
# `option name = constant;`
option = seq(
    lexeme("option").then(optionName.tag('name')),
    EQ.then(constant.tag('value')).skip(SEMI),
).combine_dict(Option)

# Normal field
type_ = lexeme(from_enum(Type) | messageType | enumType)
fieldNumber = lexeme(intLit)

fieldOption = seq(
    optionName.tag('name'),
    EQ.then(constant.tag('value')),
).combine_dict(Option)
fieldOptions = fieldOption.sep_by(lexeme(","), min=1)
# A missing "[...]" section is normalised to an empty list.
fieldOptionList = (
    lexeme("[").then(fieldOptions).skip(lexeme("]"))
    .optional()
    .map(lambda opts: opts if opts is not None else [])
)

# `[repeated] type name = number [options];`
field = seq(
    is_present("repeated").tag('repeated'),
    type_.tag('type'),
    fieldName.tag('name').skip(EQ),
    fieldNumber.tag('number'),
    fieldOptionList.tag('options').skip(SEMI),
).combine_dict(Field)


# Oneof and oneof field
# Same shape as a normal field, minus the `repeated` flag.
oneofField = seq(
    type_.tag('type'),
    fieldName.tag('name').skip(EQ),
    fieldNumber.tag('number'),
    fieldOptionList.tag('options').skip(SEMI),
).combine_dict(OneOfField)
# `oneof name { ... }`: empty statements inside the braces are dropped.
oneof = seq(
    lexeme("oneof").then(oneofName.tag('name')),
    LBRACE.then(
        (oneofField | emptyStatement).many().map(exclude_none).tag('fields')
    ).skip(RBRACE),
).combine_dict(OneOf)

# Map field
keyType = lexeme(from_enum(KeyType))
# `map<key_type, type> name = number [options];`
mapField = seq(
    lexeme("map").then(lexeme("<")).then(keyType.tag('key_type')),
    lexeme(",").then(type_.tag('type')),
    lexeme(">").then(mapName.tag('name')),
    EQ.then(fieldNumber.tag('number')),
    fieldOptionList.tag('options').skip(SEMI),
).combine_dict(Map)

# Reserved
# A single number, or `N to M` / `N to max`.
range_ = seq(
    lexeme(intLit).tag('from_'),
    lexeme("to").then(intLit | lexeme("max")).optional().tag('to'),
).combine_dict(Range)
ranges = range_.sep_by(lexeme(","), min=1)
# The spec for 'reserved' indicates 'fieldName' here, which is never a quoted string.
# But the example has a quoted string. We have changed it to 'strLit'
fieldNames = strLit.sep_by(lexeme(","), min=1)
# The list of ranges or names becomes Reserved's single argument.
reserved = lexeme("reserved").then(ranges | fieldNames).skip(SEMI).map(Reserved)

# Enum definition
enumValueOption = seq(
    optionName.tag('name').skip(EQ),
    constant.tag('value'),
).combine_dict(Option)
# `NAME = value [options];`: a missing "[...]" section becomes an empty list.
enumField = seq(
    ident.tag('name').skip(EQ),
    lexeme(intLit).tag('value'),
    lexeme("[")
    .then(enumValueOption.sep_by(lexeme(","), min=1))
    .skip(lexeme("]"))
    .optional()
    .map(lambda opts: opts if opts is not None else [])
    .tag('options')
    .skip(SEMI),
).combine_dict(EnumField)
# Everything between the braces, with empty statements dropped.
enumBody = LBRACE.then(
    (option | enumField | emptyStatement).many().map(exclude_none)
).skip(RBRACE)
enum = seq(
    lexeme("enum").then(enumName.tag('name')),
    enumBody.tag('body'),
).combine_dict(Enum)


# Message definition
# Written in `generate` form because `messageBody` is defined afterwards (and
# itself refers back to `message`); it is only looked up when the parser runs.
@generate
def message():
    msg_name = yield lexeme("message").then(messageName)
    contents = yield messageBody
    return Message(name=msg_name, body=contents)


# Everything between the braces of a message. `emptyStatement` yields None, so
# those entries are dropped here, as the enum, oneof and service bodies do.
messageBody = (LBRACE
               >> (field | enum | message | option | oneof | mapField
                   | reserved | emptyStatement).many().map(exclude_none)
               << RBRACE)


# Service definition
# `rpc Name ([stream] Req) returns ([stream] Resp)` followed by either a
# `{ ... }` block of options or a bare ";". One of the two is required, as in
# the spec. Making the tail optional would feed None into `exclude_none`,
# which raises TypeError instead of reporting a parse error.
rpc = seq(lexeme("rpc") >> rpcName.tag('name'),
          LPAREN
          >> (is_present("stream").tag("request_stream")),
          messageType.tag("request_message_type") << RPAREN,
          lexeme("returns") >> LPAREN
          >> (is_present("stream").tag("response_stream")),
          messageType.tag("response_message_type")
          << RPAREN,
          ((LBRACE
            >> (option | emptyStatement).many().map(exclude_none)
            << RBRACE)
           | SEMI.result([])
           ).tag('options')
          ).combine_dict(Rpc)

# `service Name { ... }`: the body holds options and rpcs; empty statements are dropped.
service = seq(
    lexeme("service").then(serviceName.tag('name')),
    LBRACE.then(
        (option | rpc | emptyStatement).many().map(exclude_none).tag('body')
    ).skip(RBRACE),
).combine_dict(Service)


# Proto file
topLevelDef = message | enum | service
# The syntax declaration comes first, then any number of top-level statements
# (empty statements are dropped).
proto = seq(
    syntax.tag('syntax'),
    (import_ | package | option | topLevelDef | emptyStatement)
    .many()
    .map(exclude_none)
    .tag('statements'),
).combine_dict(Proto)


# Sample input exercising an import, a file-level option, an enum with options
# and a message containing a nested message, a repeated field and a map field.
EXAMPLE = """
syntax = "proto3";
import public "other.proto";
option java_package = "com.example.foo";
enum EnumAllowingAlias {
  option allow_alias = true;
  UNKNOWN = 0;
  STARTED = 1;
  RUNNING = 2 [(custom_option) = "hello world"];
}
message outer {
  option (my_option).a = true;
  message inner {
    int64 ival = 1;
  }
  repeated inner inner_message = 2;
  EnumAllowingAlias enum_field =3;
  map<int32, string> my_map = 4;
}
"""
# Smoke test - should find 4 top level statements in the example:
# the import, the option, the enum and the message (the syntax line is stored
# separately in `Proto.syntax`). Runs at import time.
assert len(proto.parse(EXAMPLE).statements) == 4