import datetime
from uw.stringtools import multiReplace
[docs]def listFields (db, table):
"""List the fields of the specified table in the database.
The catalog must match the catalog to which the connection db is
connected.
"""
ps = db.prepare ("SELECT column_name FROM information_schema.columns AS c WHERE ROW (c.table_catalog, c.table_schema, c.table_name) = ROW ($1, $2, $3) ORDER BY c.ordinal_position;")
ps.execute (*table)
return [row [0] for row in ps]
[docs]def listPrimaryKey (db, table):
"""List the fields of the primary key of the given table in the database.
The catalog must match the catalog to which the connection db is
connected.
"""
ps = db.prepare ("SELECT ccu.column_name FROM information_schema.table_constraints AS tc NATURAL JOIN information_schema.key_column_usage AS ccu WHERE tc.constraint_type='PRIMARY KEY' AND ROW (tc.table_catalog, tc.table_schema, tc.table_name) = ROW ($1, $2, $3) ORDER BY ccu.ordinal_position;")
ps.execute (*table)
return [row [0] for row in ps]
[docs]def listKeyFields (db, constraint):
"""List, in order, the fields involved in the specified constraint.
"""
ps = db.prepare ("SELECT column_name FROM information_schema.key_column_usage WHERE ROW (constraint_catalog, constraint_schema, constraint_name) = ROW ($1, $2, $3) ORDER BY ordinal_position;")
ps.execute (*constraint)
return [row [0] for row in ps]
[docs]def getForeignKeys (db, table):
"""Get information about the foreign keys of the given table.
The result consists of a list of the foreign keys. Each key is
represented by a tuple containing the following items:
- foreign key constraint name
- foreign key is deferrable
- foreign key is initially deferrable
- target unique constraint
- match option
- update rule
- delete rule
- target table
- target constraint type (primary key or unique)
- target constraint is deferrable
- target constraint is initially deferrable
"""
ps = db.prepare ("SELECT tc.constraint_catalog, tc.constraint_schema, tc.constraint_name, tc.is_deferrable, tc.initially_deferred, rc.unique_constraint_catalog, rc.unique_constraint_schema, rc.unique_constraint_name, rc.match_option, rc.update_rule, rc.delete_rule, tc2.* FROM information_schema.table_constraints AS tc LEFT JOIN information_schema.referential_constraints AS rc USING (constraint_catalog, constraint_schema, constraint_name) LEFT JOIN information_schema.table_constraints AS tc2 ON ROW (rc.unique_constraint_catalog, rc.unique_constraint_schema, rc.unique_constraint_name) = ROW (tc2.constraint_catalog, tc2.constraint_schema, tc2.constraint_name) WHERE tc.constraint_type='FOREIGN KEY' AND (tc.table_catalog, tc.table_schema, tc.table_name) = ROW ($1, $2, $3);")
ps.execute (*table)
return [tuple ([row[0:3], row[3], row[4], row[5:8], row[8], row[9], row[10], row[14:17], row[17], row[18], row[19]]) for row in ps]
[docs]class SQLWriter (object):
def __init__ (self, file, delim='\t', null=r'\N'):
self._file = file
self._delim = delim
self._null = null
changes = {'\b': r'\b', '\f': r'\f', '\n': r'\n',
'\r': r'\r', '\t': r'\t', '\v': r'\v', '\\': r'\\'}
if not delim in changes:
changes[delim] = '\\' + delim
self.replace = multiReplace (changes)
[docs] def escape (self, value):
if value is None:
return self._null
elif isinstance (value, bool):
return 'TRUE' if value else 'FALSE'
elif isinstance (value,
datetime.date) or isinstance (value, datetime.time):
return value.isoformat ()
else:
return self.replace (unicode (value).encode ('utf-8'))
[docs] def write (self, values):
self._file.write (self._delim.join
([self.escape (value) for value in values]) + '\n')
@staticmethod
[docs] def openUploadFiles (dir, tables, **kwargs):
"""Open files corresponding to a list of table names for SQL upload.
Each entry in tables is interpreted as a table name, and a file of that
name is opened in directory dir. The result is a dictionary from
table/file name to a sqltools.SQLWriter for that table.
"""
result = {}
for table in tables:
result[table] = SQLWriter (open ("%s/%s" % (dir, table), 'w'),
**kwargs)
return result