diff options
Diffstat (limited to 'pytest/sequencer.py')
-rw-r--r-- | pytest/sequencer.py | 653 |
1 files changed, 653 insertions, 0 deletions
diff --git a/pytest/sequencer.py b/pytest/sequencer.py new file mode 100644 index 000000000000..207d22986470 --- /dev/null +++ b/pytest/sequencer.py @@ -0,0 +1,653 @@ +#! /usr/bin/env python + +from __future__ import print_function + +#__all__ = ['EncDec', 'EncDecSimple', 'EncDecTyped', 'EncDecA', +# 'SequenceError', 'Sequencer'] + +import abc +import struct +import sys + +_ProtoStruct = { + '1': struct.Struct('<B'), + '2': struct.Struct('<H'), + '4': struct.Struct('<I'), + '8': struct.Struct('<Q'), + '_string_': None, # handled specially +} +for _i in (1, 2, 4, 8): + _ProtoStruct[_i] = _ProtoStruct[str(_i)] +del _i + +class EncDec(object): + __metaclass__ = abc.ABCMeta + """ + Base class for en/de-coders, which are put into sequencers. + + All have a name and arbitrary user-supplied auxiliary data + (default=None). + + All provide a pack() and unpack(). The pack() function + returns a "bytes" value. This is internally implemented as a + function apack() that returns a list of struct.pack() bytes, + and pack() just joins them up as needed. + + The pack/unpack functions take a dictionary of variable names + and values, and a second dictionary for conditionals, but at + this level conditionals don't apply: they are just being + passed through. Variable names do apply to array encoders + + EncDec also provide b2s() and s2b() static methods, which + convert strings to bytes and vice versa, as reversibly as + possible (using surrogateescape encoding). In Python2 this is + a no-op since the string type *is* the bytes type (<type + 'unicode'>) is the unicode-ized string type). + + EncDec also provides b2u() and u2b() to do conversion to/from + Unicode. + + These are partly for internal use (all strings get converted + to UTF-8 byte sequences when coding a _string_ type) and partly + for doctests, where we just want some py2k/py3k compat hacks. + """ + def __init__(self, name, aux): + self.name = name + self.aux = aux + + @staticmethod + def b2u(byte_sequence): + "transform bytes to unicode" + return byte_sequence.decode('utf-8', 'surrogateescape') + + @staticmethod + def u2b(unicode_sequence): + "transform unicode to bytes" + return unicode_sequence.encode('utf-8', 'surrogateescape') + + if sys.version_info[0] >= 3: + b2s = b2u + @staticmethod + def s2b(string): + "transform string to bytes (leaves raw byte sequence unchanged)" + if isinstance(string, bytes): + return string + return string.encode('utf-8', 'surrogateescape') + else: + @staticmethod + def b2s(byte_sequence): + "transform bytes to string - no-op in python2.7" + return byte_sequence + @staticmethod + def s2b(string): + "transform string or unicode to bytes" + if isinstance(string, unicode): + return string.encode('utf-8', 'surrogateescape') + return string + + def pack(self, vdict, cdict, val): + "encode value <val> into a byte-string" + return b''.join(self.apack(vdict, cdict, val)) + + @abc.abstractmethod + def apack(self, vdict, cdict, val): + "encode value <val> into [bytes1, b2, ..., bN]" + + @abc.abstractmethod + def unpack(self, vdict, cdict, bstring, offset, noerror=False): + "unpack bytes from <bstring> at <offset>" + + +class EncDecSimple(EncDec): + r""" + Encode/decode a simple (but named) field. The field is not an + array, which requires using EncDecA, nor a typed object + like a qid or stat instance -- those require a Sequence and + EncDecTyped. + + The format is one of '1'/1, '2'/2, '4'/4, '8'/8, or '_string_'. + + Note: using b2s here is purely a doctest/tetsmod python2/python3 + compat hack. The output of e.pack is <type 'bytes'>; b2s + converts it to a string, purely for display purposes. (It might + be better to map py2 output to bytes but they just print as a + string anyway.) In normal use, you should not call b2s here. + + >>> e = EncDecSimple('eggs', 2) + >>> e.b2s(e.pack({}, {}, 0)) + '\x00\x00' + >>> e.b2s(e.pack({}, {}, 256)) + '\x00\x01' + + Values that cannot be packed produce a SequenceError: + + >>> e.pack({}, {}, None) + Traceback (most recent call last): + ... + SequenceError: failed while packing 'eggs'=None + >>> e.pack({}, {}, -1) + Traceback (most recent call last): + ... + SequenceError: failed while packing 'eggs'=-1 + + Unpacking both returns a value, and tells how many bytes it + used out of the bytestring or byte-array argument. If there + are not enough bytes remaining at the starting offset, it + raises a SequenceError, unless noerror=True (then unset + values are None) + + >>> e.unpack({}, {}, b'\x00\x01', 0) + (256, 2) + >>> e.unpack({}, {}, b'', 0) + Traceback (most recent call last): + ... + SequenceError: out of data while unpacking 'eggs' + >>> e.unpack({}, {}, b'', 0, noerror=True) + (None, 2) + + Note that strings can be provided as regular strings, byte + strings (same as regular strings in py2k), or Unicode strings + (same as regular strings in py3k). Unicode strings will be + converted to UTF-8 before being packed. Since this leaves + 7-bit characters alone, these examples work in both py2k and + py3k. (Note: the UTF-8 encoding of u'\u1234' is + '\0xe1\0x88\0xb4' or 225, 136, 180. The b2i trick below is + another py2k vs py3k special case just for doctests: py2k + tries to display the utf-8 encoded data as a string.) + + >>> e = EncDecSimple('spam', '_string_') + >>> e.b2s(e.pack({}, {}, 'p3=unicode,p2=bytes')) + '\x13\x00p3=unicode,p2=bytes' + + >>> e.b2s(e.pack({}, {}, b'bytes')) + '\x05\x00bytes' + + >>> import sys + >>> ispy3k = sys.version_info[0] >= 3 + + >>> b2i = lambda x: x if ispy3k else ord(x) + >>> [b2i(x) for x in e.pack({}, {}, u'\u1234')] + [3, 0, 225, 136, 180] + + The byte length of the utf-8 data cannot exceed 65535 since + the encoding has the length as a 2-byte field (a la the + encoding for 'eggs' here). A too-long string produces + a SequenceError as well. + + >>> e.pack({}, {}, 16384 * 'spam') + Traceback (most recent call last): + ... + SequenceError: string too long (len=65536) while packing 'spam' + + Unpacking strings produces byte arrays. (Of course, + in py2k these are also known as <type 'str'>.) + + >>> unpacked = e.unpack({}, {}, b'\x04\x00data', 0) + >>> etype = bytes if ispy3k else str + >>> print(isinstance(unpacked[0], etype)) + True + >>> e.b2s(unpacked[0]) + 'data' + >>> unpacked[1] + 6 + + You may use e.b2s() to conver them to unicode strings in py3k, + or you may set e.autob2s. This still only really does + anything in py3k, since py2k strings *are* bytes, so it's + really just intended for doctest purposes (see EncDecA): + + >>> e.autob2s = True + >>> e.unpack({}, {}, b'\x07\x00stringy', 0) + ('stringy', 9) + """ + def __init__(self, name, fmt, aux=None): + super(EncDecSimple, self).__init__(name, aux) + self.fmt = fmt + self.struct = _ProtoStruct[fmt] + self.autob2s = False + + def __repr__(self): + if self.aux is None: + return '{0}({1!r}, {2!r})'.format(self.__class__.__name__, + self.name, self.fmt) + return '{0}({1!r}, {2!r}, {3!r})'.format(self.__class__.__name__, + self.name, self.fmt, self.aux) + + __str__ = __repr__ + + def apack(self, vdict, cdict, val): + "encode a value" + try: + if self.struct: + return [self.struct.pack(val)] + sval = self.s2b(val) + if len(sval) > 65535: + raise SequenceError('string too long (len={0:d}) ' + 'while packing {1!r}'.format(len(sval), self.name)) + return [EncDecSimple.string_len.pack(len(sval)), sval] + # Include AttributeError in case someone tries to, e.g., + # pack name=None and self.s2b() tries to use .encode on it. + except (struct.error, AttributeError): + raise SequenceError('failed ' + 'while packing {0!r}={1!r}'.format(self.name, val)) + + def _unpack1(self, via, bstring, offset, noerror): + "internal function to unpack single item" + try: + tup = via.unpack_from(bstring, offset) + except struct.error as err: + if 'unpack_from requires a buffer of at least' in str(err): + if noerror: + return None, offset + via.size + raise SequenceError('out of data ' + 'while unpacking {0!r}'.format(self.name)) + # not clear what to do here if noerror + raise SequenceError('failed ' + 'while unpacking {0!r}'.format(self.name)) + assert len(tup) == 1 + return tup[0], offset + via.size + + def unpack(self, vdict, cdict, bstring, offset, noerror=False): + "decode a value; return the value and the new offset" + if self.struct: + return self._unpack1(self.struct, bstring, offset, noerror) + slen, offset = self._unpack1(EncDecSimple.string_len, bstring, offset, + noerror) + if slen is None: + return None, offset + nexto = offset + slen + if len(bstring) < nexto: + if noerror: + val = None + else: + raise SequenceError('out of data ' + 'while unpacking {0!r}'.format(self.name)) + else: + val = bstring[offset:nexto] + if self.autob2s: + val = self.b2s(val) + return val, nexto + +# string length: 2 byte unsigned field +EncDecSimple.string_len = _ProtoStruct[2] + +class EncDecTyped(EncDec): + r""" + EncDec for typed objects (which are build from PFODs, which are + a sneaky class variant of OrderedDict similar to namedtuple). + + Calling the klass() function with no arguments must create an + instance with all-None members. + + We also require a Sequencer to pack and unpack the members of + the underlying pfod. + + >>> qid_s = Sequencer('qid') + >>> qid_s.append_encdec(None, EncDecSimple('type', 1)) + >>> qid_s.append_encdec(None, EncDecSimple('version', 4)) + >>> qid_s.append_encdec(None, EncDecSimple('path', 8)) + >>> len(qid_s) + 3 + + >>> from pfod import pfod + >>> qid = pfod('qid', ['type', 'version', 'path']) + >>> len(qid._fields) + 3 + >>> qid_inst = qid(1, 2, 3) + >>> qid_inst + qid(type=1, version=2, path=3) + + >>> e = EncDecTyped(qid, 'aqid', qid_s) + >>> e.b2s(e.pack({}, {}, qid_inst)) + '\x01\x02\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00' + >>> e.unpack({}, {}, + ... b'\x01\x02\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00', 0) + (qid(type=1, version=2, path=3), 13) + + If an EncDecTyped instance has a conditional sequencer, note + that unpacking will leave un-selected items set to None (see + the Sequencer example below): + + >>> breakfast = pfod('breakfast', 'eggs spam ham') + >>> breakfast() + breakfast(eggs=None, spam=None, ham=None) + >>> bfseq = Sequencer('breakfast') + >>> bfseq.append_encdec(None, EncDecSimple('eggs', 1)) + >>> bfseq.append_encdec('yuck', EncDecSimple('spam', 1)) + >>> bfseq.append_encdec(None, EncDecSimple('ham', 1)) + >>> e = EncDecTyped(breakfast, 'bfname', bfseq) + >>> e.unpack({}, {'yuck': False}, b'\x02\x01\x04', 0) + (breakfast(eggs=2, spam=None, ham=1), 2) + + This used just two of the three bytes: eggs=2, ham=1. + + >>> e.unpack({}, {'yuck': True}, b'\x02\x01\x04', 0) + (breakfast(eggs=2, spam=1, ham=4), 3) + + This used the third byte, so ham=4. + """ + def __init__(self, klass, name, sequence, aux=None): + assert len(sequence) == len(klass()._fields) # temporary + super(EncDecTyped, self).__init__(name, aux) + self.klass = klass + self.name = name + self.sequence = sequence + + def __repr__(self): + if self.aux is None: + return '{0}({1!r}, {2!r}, {3!r})'.format(self.__class__.__name__, + self.klass, self.name, self.sequence) + return '{0}({1!r}, {2!r}, {3!r}, {4!r})'.format(self.__class__.__name__, + self.klass, self.name, self.sequence, self.aux) + + __str__ = __repr__ + + def apack(self, vdict, cdict, val): + """ + Pack each of our instance variables. + + Note that some packing may be conditional. + """ + return self.sequence.apack(val, cdict) + + def unpack(self, vdict, cdict, bstring, offset, noerror=False): + """ + Unpack each instance variable, into a new object of + self.klass. Return the new instance and new offset. + + Note that some unpacking may be conditional. + """ + obj = self.klass() + offset = self.sequence.unpack_from(obj, cdict, bstring, offset, noerror) + return obj, offset + +class EncDecA(EncDec): + r""" + EncDec for arrays (repeated objects). + + We take the name of repeat count variable, and a sub-coder + (Sequencer instance). For instance, we can en/de-code + repeat='nwname' copies of name='wname', or nwname of + name='wqid', in a Twalk en/de-code. + + Note that we don't pack or unpack the repeat count itself -- + that must be done by higher level code. We just get its value + from vdict. + + >>> subcode = EncDecSimple('wname', '_string_') + >>> e = EncDecA('nwname', 'wname', subcode) + >>> e.b2s(e.pack({'nwname': 2}, {}, ['A', 'BC'])) + '\x01\x00A\x02\x00BC' + + >>> subcode.autob2s = True # so that A and BC decode to py3k str + >>> e.unpack({'nwname': 2}, {}, b'\x01\x00A\x02\x00BC', 0) + (['A', 'BC'], 7) + + When using noerror, the first sub-item that fails to decode + completely starts the None-s. Strings whose length fails to + decode are assumed to be zero bytes long as well, for the + purpose of showing the expected packet length: + + >>> e.unpack({'nwname': 2}, {}, b'\x01\x00A\x02\x00', 0, noerror=True) + (['A', None], 7) + >>> e.unpack({'nwname': 2}, {}, b'\x01\x00A\x02', 0, noerror=True) + (['A', None], 5) + >>> e.unpack({'nwname': 3}, {}, b'\x01\x00A\x02', 0, noerror=True) + (['A', None, None], 7) + + As a special case, supplying None for the sub-coder + makes the repeated item pack or unpack a simple byte + string. (Note that autob2s is not supported here.) + A too-short byte string is simply truncated! + + >>> e = EncDecA('count', 'data', None) + >>> e.b2s(e.pack({'count': 5}, {}, b'12345')) + '12345' + >>> x = list(e.unpack({'count': 3}, {}, b'123', 0)) + >>> x[0] = e.b2s(x[0]) + >>> x + ['123', 3] + >>> x = list(e.unpack({'count': 3}, {}, b'12', 0, noerror=True)) + >>> x[0] = e.b2s(x[0]) + >>> x + ['12', 3] + """ + def __init__(self, repeat, name, sub, aux=None): + super(EncDecA, self).__init__(name, aux) + self.repeat = repeat + self.name = name + self.sub = sub + + def __repr__(self): + if self.aux is None: + return '{0}({1!r}, {2!r}, {3!r})'.format(self.__class__.__name__, + self.repeat, self.name, self.sub) + return '{0}({1!r}, {2!r}, {3!r}, {4!r})'.format(self.__class__.__name__, + self.repeat, self.name, self.sub, self.aux) + + __str__ = __repr__ + + def apack(self, vdict, cdict, val): + "pack each val[i], for i in range(vdict[self.repeat])" + num = vdict[self.repeat] + assert num == len(val) + if self.sub is None: + assert isinstance(val, bytes) + return [val] + parts = [] + for i in val: + parts.extend(self.sub.apack(vdict, cdict, i)) + return parts + + def unpack(self, vdict, cdict, bstring, offset, noerror=False): + "unpack repeatedly, per self.repeat, into new array." + num = vdict[self.repeat] + if num is None and noerror: + num = 0 + else: + assert num >= 0 + if self.sub is None: + nexto = offset + num + if len(bstring) < nexto and not noerror: + raise SequenceError('out of data ' + 'while unpacking {0!r}'.format(self.name)) + return bstring[offset:nexto], nexto + array = [] + for i in range(num): + obj, offset = self.sub.unpack(vdict, cdict, bstring, offset, + noerror) + array.append(obj) + return array, offset + +class SequenceError(Exception): + "sequence error: item too big, or ran out of data" + pass + +class Sequencer(object): + r""" + A sequencer is an object that packs (marshals) or unpacks + (unmarshals) a series of objects, according to their EncDec + instances. + + The objects themselves (and their values) come from, or + go into, a dictionary: <vdict>, the first argument to + pack/unpack. + + Some fields may be conditional. The conditions are in a + separate dictionary (the second or <cdict> argument). + + Some objects may be dictionaries or PFODs, e.g., they may + be a Plan9 qid or stat structure. These have their own + sub-encoding. + + As with each encoder, we have both an apack() function + (returns a list of parts) and a plain pack(). Users should + mostly stick with plain pack(). + + >>> s = Sequencer('monty') + >>> s + Sequencer('monty') + >>> e = EncDecSimple('eggs', 2) + >>> s.append_encdec(None, e) + >>> s.append_encdec(None, EncDecSimple('spam', 1)) + >>> s[0] + (None, EncDecSimple('eggs', 2)) + >>> e.b2s(s.pack({'eggs': 513, 'spam': 65}, {})) + '\x01\x02A' + + When particular fields are conditional, they appear in + packed output, or are taken from the byte-string during + unpacking, only if their condition is true. + + As with struct, use unpack_from to start at an arbitrary + offset and/or omit verification that the entire byte-string + is consumed. + + >>> s = Sequencer('python') + >>> s.append_encdec(None, e) + >>> s.append_encdec('.u', EncDecSimple('spam', 1)) + >>> s[1] + ('.u', EncDecSimple('spam', 1)) + >>> e.b2s(s.pack({'eggs': 513, 'spam': 65}, {'.u': True})) + '\x01\x02A' + >>> e.b2s(s.pack({'eggs': 513, 'spam': 65}, {'.u': False})) + '\x01\x02' + + >>> d = {} + >>> s.unpack(d, {'.u': True}, b'\x01\x02A') + >>> print(d['eggs'], d['spam']) + 513 65 + >>> d = {} + >>> s.unpack(d, {'.u': False}, b'\x01\x02A', 0) + Traceback (most recent call last): + ... + SequenceError: 1 byte(s) unconsumed + >>> s.unpack_from(d, {'.u': False}, b'\x01\x02A', 0) + 2 + >>> print(d) + {'eggs': 513} + + The incoming dictionary-like object may be pre-initialized + if you like; only sequences that decode are filled-in: + + >>> d = {'eggs': None, 'spam': None} + >>> s.unpack_from(d, {'.u': False}, b'\x01\x02A', 0) + 2 + >>> print(d['eggs'], d['spam']) + 513 None + + Some objects may be arrays; if so their EncDec is actually + an EncDecA, the repeat count must be in the dictionary, and + the object itself must have a len() and be index-able: + + >>> s = Sequencer('arr') + >>> s.append_encdec(None, EncDecSimple('n', 1)) + >>> ae = EncDecSimple('array', 2) + >>> s.append_encdec(None, EncDecA('n', 'array', ae)) + >>> ae.b2s(s.pack({'n': 2, 'array': [257, 514]}, {})) + '\x02\x01\x01\x02\x02' + + Unpacking an array creates a list of the number of items. + The EncDec encoder that decodes the number of items needs to + occur first in the sequencer, so that the dictionary will have + acquired the repeat-count variable's value by the time we hit + the array's encdec: + + >>> d = {} + >>> s.unpack(d, {}, b'\x01\x04\x00') + >>> d['n'], d['array'] + (1, [4]) + """ + def __init__(self, name): + self.name = name + self._codes = [] + self.debug = False # or sys.stderr + + def __repr__(self): + return '{0}({1!r})'.format(self.__class__.__name__, self.name) + + __str__ = __repr__ + + def __len__(self): + return len(self._codes) + + def __iter__(self): + return iter(self._codes) + + def __getitem__(self, index): + return self._codes[index] + + def dprint(self, *args, **kwargs): + if not self.debug: + return + if isinstance(self.debug, bool): + dest = sys.stdout + else: + dest = self.debug + print(*args, file=dest, **kwargs) + + def append_encdec(self, cond, code): + "add EncDec en/de-coder, conditional on cond" + self._codes.append((cond, code)) + + def apack(self, vdict, cdict): + """ + Produce packed representation of each field. + """ + packed_data = [] + for cond, code in self._codes: + # Skip this item if it's conditional on a false thing. + if cond is not None and not cdict[cond]: + self.dprint('skip %r - %r is False' % (code, cond)) + continue + + # Pack the item. + self.dprint('pack %r - no cond or %r is True' % (code, cond)) + packed_data.extend(code.apack(vdict, cdict, vdict[code.name])) + + return packed_data + + def pack(self, vdict, cdict): + """ + Flatten packed data. + """ + return b''.join(self.apack(vdict, cdict)) + + def unpack_from(self, vdict, cdict, bstring, offset=0, noerror=False): + """ + Unpack from byte string. + + The values are unpacked into a dictionary vdict; + some of its entries may themselves be ordered + dictionaries created by typedefed codes. + + Raises SequenceError if the string is too short, + unless you set noerror, in which case we assume + you want see what you can get out of the data. + """ + for cond, code in self._codes: + # Skip this item if it's conditional on a false thing. + if cond is not None and not cdict[cond]: + self.dprint('skip %r - %r is False' % (code, cond)) + continue + + # Unpack the item. + self.dprint('unpack %r - no cond or %r is True' % (code, cond)) + obj, offset = code.unpack(vdict, cdict, bstring, offset, noerror) + vdict[code.name] = obj + + return offset + + def unpack(self, vdict, cdict, bstring, noerror=False): + """ + Like unpack_from but unless noerror=True, requires that + we completely use up the given byte string. + """ + offset = self.unpack_from(vdict, cdict, bstring, 0, noerror) + if not noerror and offset != len(bstring): + raise SequenceError('{0} byte(s) unconsumed'.format( + len(bstring) - offset)) + +if __name__ == '__main__': + import doctest + doctest.testmod() |