import sys
import re
from datetime import datetime
from datetime import date
from functools import partial
from itertools import count
def compose(*ff):
    r"""Compose one-argument functions, applying the last one first.

    ``compose(f, g)(x)`` evaluates ``f(g(x))``.  Entries that are None are
    skipped; with no functions at all the result returns its argument
    unchanged.
    """
    pipeline = [fn for fn in reversed(ff) if fn is not None]

    def composed(value):
        for fn in pipeline:
            value = fn(value)
        return value

    return composed
class Location:
    r"""A position in the input, used when reporting errors.

    A location has an optional parent (rendered with str), an optional
    row and an optional column.
    """

    def __init__(self, parent=None, row=None, col=None):
        self._parent = parent
        self._row = row
        self._col = col

    def __str__(self):
        # Start from the parent's text, then append whichever of row and
        # column are known.
        parts = ["[unknown]" if self._parent is None else str(self._parent)]
        if self._row is not None:
            parts.append(": Row %i" % self._row)
        if self._col is not None:
            parts.append(", Column %i" % self._col)
        return "".join(parts)

    def row(self, row):
        r"""Return a new location with the same parent and the given row.

        The column is not carried over.
        """
        return Location(parent=self._parent, row=row)

    def col(self, col):
        r"""Return a new location with the same parent and row and the given column."""
        return Location(parent=self._parent, row=self._row, col=col)
class ParseError(Exception):
    r"""Error raised when input cannot be parsed.

    The offending value is kept in the ``value`` attribute and its repr is
    used as the error text.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return "%r" % (self.value,)
class BadLineError(ParseError):
    r"""A line of input that contains a \n other than a single trailing one."""
class SyntaxError(Exception):
    r"""Malformed input found at a known location.

    Within this module the name shadows the built-in SyntaxError.  The
    message and location are kept in the ``msg`` and ``loc`` attributes.
    """

    def __init__(self, msg, loc):
        self.msg, self.loc = msg, loc

    def __str__(self):
        description = (self.msg, self.loc)
        return "SyntaxError: %s at %s" % description
class BadFieldError(SyntaxError):
    r"""A field whose contents were rejected by its parser."""

    def __init__(self, loc, field):
        message = "Bad field contents (%s)" % field
        super(BadFieldError, self).__init__(message, loc)
class BadFieldCountError(SyntaxError):
    r"""A line that does not have the expected number of fields."""

    def __init__(self, loc):
        message = "Bad field count"
        super(BadFieldCountError, self).__init__(message, loc)
def removeTrailingNewline(line):
    r"""Strip a single trailing \n from a line.

    A line without a trailing \n is returned unchanged.  Any other \n in
    the line raises BadLineError.  Only \n is handled; a \r before it is
    left in place.
    """
    body = line[:-1] if line.endswith('\n') else line
    if '\n' in body:
        # More than one line's worth of text was passed in.
        raise BadLineError(line)
    return body
def separateString(sep, string):
    r"""Split a string into a list of pieces at each separator.

    A str separator is used literally; anything else is handed to
    re.split as the pattern (for example a compiled regular expression).
    The empty string yields an empty list rather than [''].
    """
    if string == '':
        return []
    if isinstance(sep, str):
        return string.split(sep)
    return re.split(sep, string)
def readLines(infile):
    r"""Return a generator over the lines of an input stream.

    Each line is passed through removeTrailingNewline as it is consumed.
    """
    stripped = (removeTrailingNewline(rawLine) for rawLine in infile)
    return stripped
def readRegex(regex, n=None):
    r"""Build a field parser that matches the field against a regular expression.

    regex is a pattern string (compiled once here) or an already compiled
    pattern.  It is applied with re.match, so it matches from the start of
    the field.

    The returned parser takes (loc, field).  When n is None it returns
    group 1 of the match; otherwise it returns the list of groups 1..n
    (an empty list for n == 0).  A field that does not match raises
    BadFieldError (loc, field).
    """
    pattern = re.compile(regex) if isinstance(regex, str) else regex

    def parser(loc, field):
        found = re.match(pattern, field)
        if not found:
            raise BadFieldError(loc, field)
        if n is None:
            return found.group(1)
        return [found.group(index) for index in range(1, n + 1)]

    return parser
def checkString(string=""):
    r"""Build a field parser that checks a field against a literal string.

    The string is escaped and anchored with ^ and $; the parser is built
    by readRegex with n == 0, so a matching field parses to an empty list.
    """
    anchored = '^' + re.escape(string) + '$'
    return readRegex(anchored, 0)
# Parser that checks a field against the empty string (pattern '^$');
# a matching field parses to [].
checkBlank = checkString ()
# Parser for fields whose contents are not needed: '.?' applied with
# re.match can match zero characters, and with n == 0 the result is [].
ignore = readRegex ('.?', 0)
def makeReadString(encoding):
    r"""Build a field parser that returns the field as text.

    The returned parser takes (loc, field).  A field that is already a
    str is returned unchanged: on Python 3 str has no decode method, so
    decoding it unconditionally raised AttributeError.  Any other field
    (typically bytes) is decoded with the given encoding, and a decoding
    failure propagates as UnicodeDecodeError.
    """
    def result(loc, field):
        if isinstance(field, str):
            # Already text; nothing to decode.
            return field
        return field.decode(encoding)
    return result
# Ready-made string parsers for ISO-8859-1 and UTF-8 encoded fields.
readString = makeReadString ('iso-8859-1')
readUnicode = makeReadString ('utf-8')
def readInteger(loc, field):
    r"""Parse a field as an integer using int().

    loc is part of the common parser signature and is not used here; a
    ValueError from int() propagates unchanged.
    """
    value = int(field)
    return value
def readFloat(loc, field):
    r"""Parse a field as a floating point number using float().

    loc is part of the common parser signature and is not used here; a
    ValueError from float() propagates unchanged.
    """
    value = float(field)
    return value
def readBoolean(true='T', false='F'):
    r"""Build a field parser that maps text representations to booleans.

    true and false are each either a single accepted text or a set /
    frozenset of accepted texts.  A frozenset used to be wrapped as a
    single element, so no field could ever match it; it is now treated
    like a set.

    Raises ValueError here if the two representations overlap.  The
    returned parser takes (loc, field) and returns True or False, or
    raises BadFieldError (loc, field) for any other text.
    """
    def asSet(texts):
        # A lone value becomes a one-element set; sets are used as given.
        if isinstance(texts, (set, frozenset)):
            return texts
        return {texts}

    true = asSet(true)
    false = asSet(false)
    if true & false:
        raise ValueError('Non-disjoint boolean text representation')

    def result(loc, field):
        if field in true:
            return True
        if field in false:
            return False
        raise BadFieldError(loc, field)

    return result
def readDateTime(format):
    r"""Build a field parser that reads a datetime with a strptime format.

    The returned parser takes (loc, field).  An empty field parses to
    None; anything else is passed to datetime.strptime.
    """
    def result(loc, field):
        return None if field == '' else datetime.strptime(field, format)
    return result
def readDate(format):
    r"""Build a field parser that reads a date with a strptime format.

    The returned parser takes (loc, field).  An empty field parses to
    None; anything else is parsed with datetime.strptime and reduced to
    its date part.
    """
    def result(loc, field):
        if field == '':
            return None
        moment = datetime.strptime(field, format)
        return moment.date()
    return result
# Date parsers for the formats YYYY-MM-DD and YYYYMMDD.
readISODate = readDate ("%Y-%m-%d")
readISOPackedDate = readDate ("%Y%m%d")
def readTime(format):
    r"""Build a field parser that reads a time of day with a strptime format.

    The returned parser takes (loc, field).  An empty field parses to
    None; anything else is parsed with datetime.strptime and reduced to
    its time part.
    """
    def result(loc, field):
        if field == '':
            return None
        moment = datetime.strptime(field, format)
        return moment.time()
    return result
# Time parsers for the formats HH:MM and HH:MM:SS.
readISOMinuteTime = readTime ("%H:%M")
readISOFullTime = readTime ("%H:%M:%S")
def readMultiple(splitField, fieldParser):
    r"""Build a parser for a field that itself holds several sub-fields.

    The returned parser takes (loc, field).  splitField (field) produces
    the sub-fields; each one is parsed with fieldParser and the results
    are returned as a list.  Each sub-field's location is a row (numbered
    from 0) of a new Location whose parent is the field's own location.
    """
    def result(loc, field):
        parent = Location(loc)
        return [fieldParser(parent.row(index), piece)
                for index, piece in enumerate(splitField(field))]
    return result
def parseLine(fields, loc, lineList):
    r"""Parse the fields of one split line into a dictionary.

    fields is a sequence of (fieldNames, parser) pairs, one per column.
    Each parser is called with the column's location (loc.col (index),
    numbered from 0) and the column text.  If fieldNames is a str the
    parsed value is stored under that name; otherwise fieldNames and the
    parsed values are paired up with zip and all pairs are stored.

    Raises BadFieldCountError (loc) when the number of columns differs
    from the number of field specifications.
    """
    if len(fields) != len(lineList):
        raise BadFieldCountError(loc)
    record = {}
    for index, ((names, parser), text) in enumerate(zip(fields, lineList)):
        parsed = parser(loc.col(index), text)
        if isinstance(names, str):
            record[names] = parsed
        else:
            record.update(zip(names, parsed))
    return record
def parseLines(splitLine, fieldSpec, loc, lines, skippedLines=0):
    r"""Lazily parse a sequence of lines into dictionaries.

    Each line is split with splitLine and parsed by parseLine against
    fieldSpec.  Row locations are taken from loc.row, numbered from
    skippedLines.  The result is a map iterator; nothing is parsed until
    it is consumed.
    """
    parseOne = partial(parseLine, fieldSpec)
    rowLocations = map(loc.row, count(skippedLines))
    splitRows = map(splitLine, lines)
    return map(parseOne, rowLocations, splitRows)