Source code for uw.sql.wrap
from threading import Lock
[docs]class Connection (object):
def __init__ (self, connection, pool):
self.__connection = connection
self.__pool = pool
self.__closed = False
self.__lock = Lock ()
@property
def pool (self):
return self.__pool
@property
def closed (self):
return self.__closed
[docs] def close (self, ignore_duplicate_close=False):
try:
self.__lock.acquire ()
if self.closed:
if ignore_duplicate_close:
return
else:
raise Exception ("Connection already closed")
self.__closed = True
self.rollback ()
self.__connection.close ()
finally:
self.__lock.release ()
[docs] def cursor (self):
return self.pool.cursor_class (self, self.__connection.cursor ())
def __del__ (self):
self.close (ignore_duplicate_close=True)
def __getattr__ (self, name):
return getattr (self.__connection, name)
[docs]class Cursor (object):
def __init__ (self, connection, cursor):
self.__connection = connection
self.__cursor = cursor
@property
def connection (self):
return self.__connection
def __getattr__ (self, name):
return getattr (self.__cursor, name)
from .dbapi import fetch_none
from .dbapi import fetch_optional_value
from .dbapi import fetch_required_value
from .dbapi import fetch_values
from .dbapi import fetch_optional_tuple
from .dbapi import fetch_required_tuple
from .dbapi import fetch_tuples
from .dbapi import execute_none
from .dbapi import execute_optional_value
from .dbapi import execute_required_value
from .dbapi import execute_values
from .dbapi import execute_optional_tuple
from .dbapi import execute_required_tuple
from .dbapi import execute_tuples
from .dbapi import callproc_none
from .dbapi import callproc_optional_value
from .dbapi import callproc_required_value
from .dbapi import callproc_values
from .dbapi import callproc_optional_tuple
from .dbapi import callproc_required_tuple
from .dbapi import callproc_tuples
from queue import Queue
[docs]class ConnectionPool (object):
def __init__ (self, open_connection, cursor_class):
self.__open_connection = open_connection
self.__cursor_class = cursor_class
@property
def cursor_class (self):
return self.__cursor_class
[docs] def put (self, connection):
"""Release the connection back to the pool.
Temporarily, just forget about the connection (i.e., close it).
"""
connection.close ()
[docs] def connect (self):
"""Obtain an available connection from the pool.
Temporarily, just open a new connection every time.
"""
return Connection (self.__open_connection (), self)
import psycopg2
import psycopg2.extensions
psycopg2.extensions.register_type (psycopg2.extensions.UNICODE)
[docs]def open_psycopg2_db_service_connection (service=None):
if service is None:
dsn = ""
else:
dsn = "service=%s" % service
def connect ():
conn = psycopg2.connect (dsn=dsn)
conn.set_client_encoding ('UNICODE')
return conn
return connect
[docs]def open_psycopg2_db_service_cursor (service=None, cursor_class=Cursor):
'''Open a connection and obtain a cursor on it.
This is intended for programs which just need a single cursor, such as
command-line utilities or otherwise short-lived processes.
'''
return ConnectionPool (open_psycopg2_db_service_connection
(service=service), cursor_class).connect ().cursor ()