Source code for uw.parse.delimited

import sys
import re
from datetime import datetime
from datetime import date
from functools import partial
from itertools import count

def compose(*ff):
    """Return the right-to-left composition of the given functions.

    ``compose(f, g)(x)`` evaluates ``f(g(x))``.  Arguments that are ``None``
    are skipped, and composing no functions yields the identity function.
    """
    # Stored in application order: the last argument is applied first.
    pipeline = tuple(f for f in reversed(ff) if f is not None)

    def composed(x):
        value = x
        for step in pipeline:
            value = step(value)
        return value

    return composed
class Location:
    """A position in parsed input, rendered as text for error messages.

    A location consists of a parent (any object convertible with ``str``,
    or ``None`` when the origin is unknown), optionally refined by a row
    number and a column number.
    """

    def __init__(self, parent=None, row=None, col=None):
        self._parent = parent
        self._row = row
        self._col = col

    def __str__(self):
        # Build the description piecewise: origin first, then the optional
        # row and column suffixes.
        pieces = ["[unknown]" if self._parent is None else str(self._parent)]
        if self._row is not None:
            pieces.append(": Row %i" % self._row)
        if self._col is not None:
            pieces.append(", Column %i" % self._col)
        return "".join(pieces)

    def row(self, row):
        """Return a new location with the same parent at ``row``.

        The column of the current location is not carried over.
        """
        return Location(parent=self._parent, row=row)

    def col(self, col):
        """Return a new location with the same parent and row at ``col``."""
        return Location(parent=self._parent, row=self._row, col=col)
class ParseError(Exception):
    """Base class for parse failures; keeps the offending value.

    The string form of the exception is the ``repr`` of that value.
    """

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return "%r" % (self.value,)
class BadLineError(ParseError):
    """Parse error raised for a line that is not a single well-formed line."""
class SyntaxError(Exception):
    """Error in the parsed input, reported together with its location.

    NOTE: within this module the name shadows Python's built-in
    ``SyntaxError``; this class derives directly from ``Exception``.

    Attributes are ``msg`` (description of the problem) and ``loc`` (where
    it occurred; only its string form is used here).
    """

    def __init__(self, msg, loc):
        self.msg, self.loc = msg, loc

    def __str__(self):
        details = (self.msg, self.loc)
        return "SyntaxError: %s at %s" % details
class BadFieldError(SyntaxError):
    """Syntax error raised when a field's contents cannot be parsed."""

    def __init__(self, loc, field):
        # The rejected field text is embedded in the message.
        message = "Bad field contents (%s)" % field
        super().__init__(message, loc)
class BadFieldCountError(SyntaxError):
    """Syntax error raised when a line has the wrong number of fields."""

    def __init__(self, loc):
        message = "Bad field count"
        super().__init__(message, loc)
def removeTrailingNewline(line):
    r"""Return ``line`` without its single trailing ``\n``, if it has one.

    A line may contain at most one newline, and only as its last
    character.  Any other newline raises ``BadLineError`` carrying the
    original line.
    """
    body = line[:-1] if line.endswith('\n') else line
    if '\n' in body:
        raise BadLineError(line)
    return body
def separateString(sep, string):
    r"""Split ``string`` into a list of pieces around ``sep``.

    ``sep`` is either a plain string, used with ``str.split``, or a
    compiled regular expression, used with ``re.split``.  The empty string
    yields an empty list rather than a list holding one empty string.
    """
    if string == '':
        return []
    if isinstance(sep, str):
        return string.split(sep)
    return re.split(sep, string)
def readLines(infile):
    r"""Return a lazy iterator over the lines of ``infile``.

    Each item produced by iterating ``infile`` is passed through
    ``removeTrailingNewline`` as it is consumed.
    """
    return map(removeTrailingNewline, infile)
def isComment(line):
    """Tell whether ``line`` is blank or a ``#`` comment.

    Leading whitespace is ignored.  The line counts as a comment when
    nothing remains or when the first remaining character is ``#``.
    """
    stripped = line.lstrip()
    return not stripped or stripped.startswith('#')
def decomment(lines):
    """Return a lazy iterator over ``lines`` that skips comment lines.

    A line is dropped when ``isComment`` reports it as blank or a comment.
    """
    return filter(lambda text: not isComment(text), lines)
def readRegex(regex, n=None):
    """Build a field parser that extracts regex groups from a field.

    ``regex`` may be a pattern string (compiled once here) or an already
    compiled pattern.  The returned parser takes ``(loc, field)`` and
    matches the pattern at the start of ``field``:

    * with ``n`` left as ``None`` it returns group 1 as a single value;
    * otherwise it returns a list holding groups 1 through ``n``
      (an empty list when ``n`` is 0).

    A field that does not match raises ``BadFieldError(loc, field)``.
    """
    pattern = re.compile(regex) if isinstance(regex, str) else regex

    def result(loc, field):
        found = re.match(pattern, field)
        if not found:
            raise BadFieldError(loc, field)
        if n is None:
            return found.group(1)
        return [found.group(index) for index in range(1, n + 1)]

    return result
def checkString(string=""):
    """Build a field parser that accepts only the literal ``string``.

    The text is regex-escaped and anchored with ``^`` and ``$``, then
    handed to ``readRegex`` with ``n=0``, so a matching field parses to an
    empty list.
    """
    anchored = ''.join(('^', re.escape(string), '$'))
    return readRegex(anchored, 0)
# Field parser from checkString with its default argument (the empty string).
checkBlank = checkString ()
# Field parser from readRegex with the pattern '.?' and n=0 (no groups returned).
ignore = readRegex ('.?', 0)
def makeReadString(encoding):
    """Build a field parser that returns the field as text.

    The returned parser takes ``(loc, field)``.  A field that is already a
    ``str`` is returned unchanged.  Any other field (for example
    ``bytes``) is decoded with ``encoding`` through its ``decode`` method.

    A decoding failure propagates as the ``UnicodeDecodeError`` raised by
    ``decode``.
    """
    def result(loc, field):
        # In Python 3, ``str`` has no ``decode`` method, so decoding
        # unconditionally raised AttributeError for every text field.
        if isinstance(field, str):
            return field
        return field.decode(encoding)

    return result
# Field parser produced by makeReadString for the ISO-8859-1 encoding.
readString = makeReadString ('iso-8859-1')
# Field parser produced by makeReadString for the UTF-8 encoding.
readUnicode = makeReadString ('utf-8')
def readInteger(loc, field):
    """Parse ``field`` as an integer using ``int``.

    ``loc`` is accepted for the common field-parser signature but is not
    used.  Invalid input raises the ``ValueError`` produced by ``int``.
    """
    number = int(field)
    return number
def readFloat(loc, field):
    """Parse ``field`` as a floating-point number using ``float``.

    ``loc`` is accepted for the common field-parser signature but is not
    used.  Invalid input raises the ``ValueError`` produced by ``float``.
    """
    number = float(field)
    return number
def readBoolean(true='T', false='F'):
    """Build a field parser that maps fixed texts to ``True`` / ``False``.

    ``true`` and ``false`` are each either one accepted text or a ``set``
    of accepted texts.  The two collections must not overlap, otherwise
    ``ValueError`` is raised when the parser is built.

    The returned parser takes ``(loc, field)`` and raises
    ``BadFieldError(loc, field)`` for a field in neither collection.
    """
    truthy = true if isinstance(true, set) else {true}
    falsy = false if isinstance(false, set) else {false}
    if not truthy.isdisjoint(falsy):
        raise ValueError('Non-disjoint boolean text representation')

    def result(loc, field):
        if field in truthy:
            return True
        if field in falsy:
            return False
        raise BadFieldError(loc, field)

    return result
def readDateTime(format):
    """Build a field parser for date-time values in a ``strptime`` format.

    The returned parser takes ``(loc, field)``.  An empty field parses to
    ``None``; anything else is parsed with ``datetime.strptime`` and
    returned as a ``datetime``.
    """
    def result(loc, field):
        return None if field == '' else datetime.strptime(field, format)

    return result
def readDate(format):
    """Build a field parser for dates in the given ``strptime`` format.

    The returned parser takes ``(loc, field)``.  An empty field parses to
    ``None``; anything else is parsed with ``datetime.strptime`` and its
    date part is returned.
    """
    def result(loc, field):
        if field == '':
            return None
        moment = datetime.strptime(field, format)
        return moment.date()

    return result
# Date parser for the format "%Y-%m-%d" (e.g. 2021-12-31).
readISODate = readDate ("%Y-%m-%d")
# Date parser for the format "%Y%m%d" (e.g. 20211231).
readISOPackedDate = readDate ("%Y%m%d")
def readTime(format):
    """Build a field parser for times in the given ``strptime`` format.

    The returned parser takes ``(loc, field)``.  An empty field parses to
    ``None``; anything else is parsed with ``datetime.strptime`` and its
    time part is returned.
    """
    def result(loc, field):
        if field == '':
            return None
        moment = datetime.strptime(field, format)
        return moment.time()

    return result
# Time parser for the format "%H:%M" (e.g. 13:45).
readISOMinuteTime = readTime ("%H:%M")
# Time parser for the format "%H:%M:%S" (e.g. 13:45:10).
readISOFullTime = readTime ("%H:%M:%S")
def readMultiple(splitField, fieldParser):
    """Build a field parser for a field that holds several sub-values.

    The returned parser takes ``(loc, field)``.  It splits ``field`` with
    ``splitField`` and applies ``fieldParser`` to every piece, returning
    the results as a list.  Each piece is given its own location: a
    ``Location`` whose parent is the enclosing ``loc`` and whose row is
    the zero-based index of the piece.
    """
    def result(loc, field):
        nested = Location(loc)
        return [
            fieldParser(nested.row(index), piece)
            for index, piece in enumerate(splitField(field))
        ]

    return result
def parseLine(fields, loc, lineList):
    """Parse one already-split line into a dictionary of named values.

    ``fields`` is a sequence of ``(fieldNames, parser)`` pairs, one per
    column.  Each parser is called with the column's location
    (``loc.col(index)``, zero-based) and the raw text from ``lineList``.

    When ``fieldNames`` is a single string, the parsed value is stored
    under that name.  Otherwise ``fieldNames`` is taken as a sequence of
    names and paired item by item with the values the parser returned.

    Raises ``BadFieldCountError(loc)`` when ``lineList`` and ``fields``
    differ in length.
    """
    if len(lineList) != len(fields):
        raise BadFieldCountError(loc)

    record = {}
    for index, ((fieldNames, parser), raw) in enumerate(zip(fields, lineList)):
        parsed = parser(loc.col(index), raw)
        if isinstance(fieldNames, str):
            record[fieldNames] = parsed
        else:
            record.update(zip(fieldNames, parsed))
    return record
def parseLines(splitLine, fieldSpec, loc, lines, skippedLines=0):
    """Return a lazy iterator of dictionaries, one per parsed line.

    Each line from ``lines`` is split with ``splitLine`` and handed to
    ``parseLine`` together with ``fieldSpec`` and a row location derived
    from ``loc``.  Row numbers start at ``skippedLines`` and increase by
    one per line.
    """
    locations = map(loc.row, count(skippedLines))
    splitRows = map(splitLine, lines)
    return map(partial(parseLine, fieldSpec), locations, splitRows)