# Source code for uw.local.teaching.webui.accommodation

"""Web UI implementation for accommodation processing (AAS, Relief).

This implements the UI for uploading of accommodation information from AAS
(AccessAbility Services) and the Registrar's "Relief" program.  The same
system is used for CEL (Online courses), but the Web UI is not used to enter
the information into the system.
"""

import csv
import re

from operator import attrgetter

from collections import namedtuple

from datetime import datetime, timedelta

from itertools import groupby

from ll.xist.ns import html

from uw.web.html.form import render_checkbox, render_select
from uw.web.html.format import make_table_format, format_date, format_datetime, format_return
from uw.web.html.join import english_join, html_join

from uw.web.wsgi import status
from uw.web.wsgi.delegate import int_from_arc, delegate_action, delegate_value, delegate_get_post, always
from uw.web.wsgi.form import use_form_param
from uw.web.wsgi.function import return_html

from uw.local.util.format import format_person

from .ui import format_duration

from ...util.identity import use_remote_identity

@return_html
def accommodation_exam_index_handler (cursor, term, admin, roles):
    """Expected AAS assessment deliveries GET URL handler.

    Lists the sittings of this admin unit in the given term whose start
    time lies between the current date and five business days later and
    which have at least one allocated candidate.  Each sitting shows its
    start time, candidate count and spare copy count once, followed by one
    line per assessment joined to that sitting.

    :param cursor: DB connection cursor.
    :param term: the relevant term.
    :param admin: the relevant admin unit.
    :param roles: not used by this handler.
    :return: (page title, page content).
    """
    # The label is spelled "Accommodation", as in the other handlers of
    # this module.
    result = [format_return ('Main Menu', None, None, 'Accommodation')]

    sittings = cursor.execute_tuples ("select * from (select *, exam_sitting_count_allocated_candidates (sitting_id) as count_allocated_candidates, exam_sitting_count_spares(sitting_id) AS spare_count from exam_sitting where (sitting_term_id, sitting_admin_id) = (%(sitting_term_id)s, %(sitting_admin_id)s) and start_time between current_date and uw_business_day_offset (current_date, 5)) as es natural join exam_exam_sitting natural join exam_exam natural join teaching_admin where count_allocated_candidates > 0 order by start_time", sitting_term_id=term.code (), sitting_admin_id=admin.admin_id)
    table = [html.tr (html.th ('Start Time'), html.th ('# of Candidates'), html.th ('# of Spare Copies'), html.th ('Assessment'), html.th ('Masters Approved'), html.th ('Sent for Printing'))]

    def exam_cells (exam):
        # The three per-assessment cells shared by every line of a sitting.
        return [
            html.td (exam.admin_description),
            html.td (format_datetime (exam.master_approved)),
            html.td (format_datetime (exam.master_accepted)),
        ]

    if not sittings:
        result.append (html.p ('Currently, there are no expected AAS deliveries in the next 5 business days.'))

    # NOTE(review): the query orders by start_time only while the grouping
    # key also includes both counts; sittings sharing a start time are
    # presumably returned contiguously -- confirm, otherwise one sitting
    # could be split into several groups.
    sitting_key = attrgetter ('start_time', 'count_allocated_candidates', 'spare_count')
    for (start_time, count_allocated_candidates, spare_count), exams in groupby (sittings, sitting_key):
        exams = list (exams)
        rowspan = len (exams)

        # The first line carries the sitting-wide cells, spanning all of
        # the sitting's assessment lines.
        table.append (html.tr (
            html.td (format_datetime (start_time), rowspan=rowspan),
            html.td (count_allocated_candidates, rowspan=rowspan),
            html.td (spare_count, rowspan=rowspan),
            exam_cells (exams[0]),
        ))
        for exam in exams[1:]:
            table.append (html.tr (exam_cells (exam)))

    # The (possibly header-only) table is always part of the page.
    result.append (html.table (table))

    return 'Expected AAS Assessment Deliveries', result

@return_html
def assign_index_get_handler (cursor, term, admin, roles, form):
    """Accommodation index GET URL handler.

    With "update" in the form, presents a form for choosing the default
    room used for accommodation by this admin unit.

    Otherwise shows the current default room (if one is set), with a link
    to the update form, followed by the request admin codes recorded for
    this admin unit (if any).

    :param cursor: DB connection cursor.
    :param term: not used by this handler.
    :param admin: the relevant admin unit.
    :param roles: not used by this handler.
    :param form: the submitted form fields.
    :return: (page title, page content).
    """
    result = [format_return ('Main Menu', None, None, None)]

    default_room = cursor.execute_optional_tuple ("select * from accom_teaching_admin join room_room on (room_id = default_room_id) where accom_admin_id = %(admin_id)s", admin_id=admin.admin_id)

    if 'update' in form:
        # Preselect the current default room, if there is one.
        current_room_id = None if default_room is None else default_room.room_id
        room_selector = render_select ("room", cursor.rooms_select (), current_room_id, True)
        submit_button = html.input (type="submit", name="!update-default", value="Update!")

        result.append (html.form (
            html.p ('Change default room to: ', room_selector, ' ', submit_button),
            method="post", action=""
        ))
        return 'Update Default Room', result

    admin_codes = cursor.execute_tuples ("select * from accom_request_admin_code join teaching_admin on (sitting_admin_id = admin_id) where accom_admin_id=%(admin_id)s order by request_admin_code", admin_id=admin.admin_id)

    if default_room:
        room_sentence = 'The default room that will be used for accommodation is %s %s. ' % (default_room.building_code, default_room.room_code)
        result.append (html.h2 ('Default Room'))
        result.append (html.p (room_sentence, html.a ('Change…', href="?update")))

    if admin_codes:
        format_admin_codes = make_table_format (
            ('Code', attrgetter ('request_admin_code')),
            ('Admin', attrgetter ('admin_description')),
        )
        result.append (html.h2 ('Admin Codes'))
        result.append (format_admin_codes (admin_codes))

    return 'Upload Accommodation Requests', result

@return_html
def assign_index_post_handler (cursor, term, admin, roles, form):
    """Accommodation index POST URL handler.

    When the "!update-default" button was used, stores the submitted room
    as the default accommodation room of this admin unit; an empty "room"
    value clears the default.  Always finishes by redirecting back to the
    index page.

    :param cursor: DB connection cursor.
    :param term: not used by this handler.
    :param admin: the relevant admin unit.
    :param roles: not used by this handler.
    :param form: the submitted form fields.
    :raises status.HTTPFound: always, redirecting to "./".
    """
    if "!update-default" in form:
        submitted_room = form.required_field_value ("room")
        # An empty selection means "no default room".
        new_room_id = int (submitted_room) if submitted_room else None
        cursor.execute_none ("update accom_teaching_admin set default_room_id = %(room_id)s where accom_admin_id = %(admin_id)s", room_id=new_room_id, admin_id=admin.admin_id)

    raise status.HTTPFound ("./")

@return_html
def request_index_get_handler (cursor, term, admin, roles):
    """Accommodation request list GET URL handler.

    Lists the requests already submitted for this term and admin unit,
    newest first, each linking to its own page, followed by a form for
    uploading a new request as a CSV file.

    :param cursor: DB connection cursor.
    :param term: the relevant term.
    :param admin: the relevant admin unit.
    :param roles: not used by this handler.
    :return: (page title, page content).
    """
    def timestamp_link (r):
        # The creation timestamp doubles as the link to the request page.
        return html.a (format_datetime (r.request_created), href="%d/" % r.request_id)

    format_requests = make_table_format (
        ('Request', attrgetter ('request_id')),
        ('Timestamp', timestamp_link),
    )
    submissions = cursor.execute_tuples ("select term_id, accom_admin_id, request_id, request_created from accom_request_submission_plus where (term_id, accom_admin_id) = (%(term_id)s, %(accom_admin_id)s) order by request_id desc", term_id=term.code (), accom_admin_id=admin.admin_id)

    upload_form = html.form (
        html.p (
            'Upload new request: ',
            html.input (type="file", name="csv", accept="text/csv"),
            ' ',
            html.input (type="submit", name="!create", value="Create Request!"),
        ),
        method="post", enctype="multipart/form-data", action=""
    )

    result = [
        format_return ('Main Menu', None, None, 'Accommodation'),
        format_requests (submissions),
        upload_form,
    ]
    return 'Accommodation Requests', result

# One successfully parsed row of an uploaded request.
request_row = namedtuple (
    'request_row',
    'row_number uw_id class_id '
    'building_code room_code seat_col seat_row '
    'start_time override_duration '
    'instance_code',
)

# One row of an uploaded request that could not be accepted, with the reason.
error_request_row = namedtuple ('error_request_row', 'row_number error_message')

def regex_match (regex, error_message):
    """Build a field parser based on the specified regular expression.

    :param str regex: A regular expression to match field contents.
    :param str error_message: The error message to report if no match.
    :return: A parser function.

    The returned parser takes (cursor, result, errors, value).  If the
    value matches, the named groups of the match are merged into result
    (groups which did not take part in the match are stored as None);
    otherwise error_message is appended to errors.  The cursor is unused.
    """
    regex = re.compile (regex)

    def parser (cursor, result, errors, value):
        match = regex.match (value)
        if match is None:
            errors.append (error_message)
        else:
            result.update (match.groupdict ())

    return parser
def parse_class_id (cursor, result, errors, class_id, subject_code, catalog, section_code):
    """Field parser validating a class section reference.

    :param cursor: DB connection cursor.
    :param dict result: The parsed row; receives 'class_id' on success.
    :param list errors: The row's error messages; receives one on failure.
    :param str class_id: The class number as uploaded.
    :param str subject_code: The subject code as uploaded.
    :param str catalog: The catalog number as uploaded.
    :param str section_code: The section number as uploaded.

    The class number must be an integer and the section number must be an
    integer, which is zero-padded to three digits.  The combination of
    all four values must exist in off_course_section_plus.
    """
    try:
        class_id = int (class_id)
    except ValueError:
        errors.append ("Invalid class number")
        return

    try:
        section_code = '%03d' % int (section_code)
    except ValueError:
        errors.append ("Invalid section number")
        return

    if not cursor.execute_required_value ("select exists (select from off_course_section_plus where (class_id, subject_code, catalog, section_code) = (%(class_id)s, %(subject_code)s, %(catalog)s, %(section_code)s))", class_id=class_id, subject_code=subject_code, catalog=catalog, section_code=section_code):
        errors.append ("Nonexistent class section")
        return

    result['class_id'] = class_id
def parse_start_time (cursor, result, errors, exam_date, start_time):
    """Field parser combining the assessment date and time of day.

    :param cursor: DB connection cursor (unused).
    :param dict result: The parsed row; receives 'start_time' on success.
    :param list errors: The row's error messages; receives one on failure.
    :param str exam_date: The date, in month/day/year form (%m/%d/%Y).
    :param str start_time: The 12-hour time of day (%I:%M %p).
    """
    try:
        exam_date = datetime.strptime (exam_date, '%m/%d/%Y')
    except ValueError:
        errors.append ("Invalid assessment date")
        return

    try:
        start_time = datetime.strptime (start_time, '%I:%M %p').time ()
    except ValueError:
        errors.append ("Invalid assessment time")
        return

    result['start_time'] = datetime.combine (exam_date, start_time)
def parse_duration (cursor, result, errors, value):
    """Field parser for a duration given as a whole number of minutes.

    :param cursor: DB connection cursor (unused).
    :param dict result: The parsed row; receives 'override_duration'.
    :param list errors: The row's error messages; receives one on failure.
    :param str value: The number of minutes as uploaded.
    """
    try:
        result['override_duration'] = timedelta (minutes=int (value))
    except (ValueError, OverflowError):
        # OverflowError: an integer too large for timedelta is just as
        # much an invalid upload as a non-numeric value.
        errors.append ("Invalid duration")
def parse_extra (ext):
    """Convert the "Extra" column value to the extra_candidate field.

    :param ext: The column value as uploaded.
    :return: A dictionary with the single key 'extra_candidate', holding
        the value itself, or False if the value is empty.

    Note that this does not follow the (cursor, result, errors, ...)
    calling convention of the required-field parsers.
    """
    return {'extra_candidate': ext or False}
def parse_remove (rm):
    """Convert the "Remove" column value to the remove field.

    :param rm: The column value as uploaded.
    :return: A dictionary with the single key 'remove', holding the value
        itself, or False if the value is empty.

    Note that this does not follow the (cursor, result, errors, ...)
    calling convention of the required-field parsers.
    """
    return {'remove': rm or False}
# Regular expressions for single-column fields.  Their named groups become
# the keys of the parsed row.
uw_id_re = '^(?P<uw_id>[0-9]{8})$'
location_re = '^([A-Z]+ - (?P<building_code>[A-Z]+[1-9]?) (?P<room_code>[0-9]+[A-Z]?)(-(?P<seat_col>[0-9]+))?)?$'
barcode_re = '^(?P<instance_code>E[0-9]+)$'

# Required columns of an AAS upload: each entry pairs the column headings
# whose values are handed to a parser with that parser, which is called as
# parser (cursor, result, errors, *values).
aas_fields = (
    (['SchoolID'], regex_match (uw_id_re, 'Invalid UW ID')),
    (['CRN', 'Subject', 'Course', 'Section'], parse_class_id),
    (['Exam Date', 'Start Time'], parse_start_time),
    (['LocationName'], regex_match (location_re, 'Invalid room/seat location')),
    (['Total Length'], parse_duration),
    (['Barcode'], regex_match (barcode_re, 'Invalid barcode')),
)

# Optional columns, each paired with a converter taking the column value.
opt_fields = (
    ('Extra', parse_extra),
    ('Remove', parse_remove),
)

# Mapping the column names to database/field value names
opt_fields_map = {
    'Extra': 'extra_candidate',
    'Remove': 'remove',
}
def split_raw_csv (request):
    """Split raw CSV text into rows of cells.

    :param str request: The raw CSV text.
    :return: A csv.reader iterating over the rows; each row is a list of
        cell strings.

    The text is broken into lines at each newline character before being
    handed to the CSV reader.
    """
    return csv.reader (request.split ('\n'))
def process_raw_request_row (cursor, row_number, row_content):
    """Parse one row of an uploaded accommodation request.

    :param cursor: DB connection cursor.
    :param int row_number: The number of the row within the upload.
    :param dict row_content: The raw row, as a map from column titles to
        field values.
    :return: (result, errors)

    Every parser of aas_fields is applied to the values of its columns.
    The result is a dictionary of parsed field values which always
    includes 'row_number'; errors is the list of messages reported by the
    parsers.
    """
    result = {'row_number': row_number}
    errors = []

    for fields, parser in aas_fields:
        parser (cursor, result, errors, *(row_content[f] for f in fields))

    # NOTE(review): a matching location always stores the 'seat_col' key,
    # with None when no seat was given, so seat_row becomes 1 in that case
    # too -- confirm that this is intended.
    if 'seat_col' in result:
        result['seat_row'] = 1

    return result, errors
def process_raw_request (cursor, request):
    """Parse uploaded accommodation request information.

    :param cursor: DB connection cursor.
    :param str request: The raw form field contents.
    :return: (result, errors)

    The input is expected to be CSV.  The result is a list of parsed rows,
    each a request_row.  The errors is a list of error_request_row, one
    for each row in which errors were encountered.

    The first row is the header row.  It is checked to ensure that all
    required fields are present.  If not then processing is aborted: the
    result is an empty list and the only error, reported against row 1,
    names the missing field headings.

    Finally all remaining rows of the input are parsed, using the parsers
    from the field list for the respective fields and ignoring other
    fields.  Blank lines are ignored.  Rows with fewer cells than the
    header row are padded with empty cells, so that the field parsers
    report the missing values.
    """
    # Break the request into rows and columns
    csv_reader = split_raw_csv (request)

    # Obtain field headers from first row, without surrounding whitespace
    # or a byte order mark left at the start of the upload
    header_row = list (next (csv_reader, []))
    if header_row:
        header_row[0] = header_row[0].lstrip ('\ufeff')
    header_row = [col.strip () for col in header_row]

    # We only parse the AAS upload; but may in future parse different uploads
    field_list = aas_fields

    # Compute map from field names to column numbers
    fields_found = dict ((col, j) for j, col in enumerate (header_row))

    # Make sure all required fields are present
    missing = [field
        for fields, _ in field_list
        for field in fields
        if field not in fields_found]
    if missing:
        return [], [error_request_row (1, 'Required field headings %s missing' % english_join (*('“%s”' % e for e in missing)))]

    # NOTE(review): the optional columns listed in opt_fields are not
    # parsed.  The previous detection of them tested a leftover loop
    # variable instead of the header row and so never found any; it has
    # been dropped until the row parser handles those columns.

    # Initialize result and error lists
    result = []
    errors = []

    # Extract relevant fields from all non-blank rows.
    # Number the rows starting with 1 for compatibility with spreadsheets
    for i, row in enumerate (csv_reader, start=2):
        row = [col.strip () for col in row]
        if all (col == '' for col in row):
            continue

        # Pad short rows so that every heading has a (possibly empty) value
        row += [''] * (len (header_row) - len (row))
        row_content = dict (zip (header_row, row))

        row_result, row_errors = process_raw_request_row (cursor, i, row_content)
        if row_errors:
            errors.append (error_request_row (i, '\n'.join (row_errors)))
        else:
            result.append (request_row (**row_result))

    return result, errors
@use_remote_identity
@use_form_param
@return_html
def request_index_post_handler (cursor, remote_identity, term, admin, roles, form):
    """Accommodation request list POST URL handler.

    Handles submission of an accommodation request.  The raw request is
    recorded in the database and committed so that it will be recorded for
    analysis in the event it is useful in debugging a problem that occurs
    later in processing.

    Next the request is parsed.  The parsed rows are recorded in
    accom_request_parsed.  Rows reported as duplicate or conflicting
    locations by the database are added to the parse errors, which are
    then recorded in accom_request_error.  Finally the upload is
    transformed and the browser is redirected to the page of the new
    request, where the recorded errors are displayed.

    :param cursor: DB connection cursor.
    :param remote_identity: the submitting user; person_id is recorded.
    :param term: the relevant term.
    :param admin: the relevant admin unit.
    :param roles: not used by this handler.
    :param form: the submitted form; the upload is the "csv" field.
    :raises status.HTTPFound: on completion, redirecting to the request.
    """
    # NOTE(review): assumes the "csv" field is always present; if
    # optional_field_value can return None this fails -- confirm.
    raw_content = form.optional_field_value ("csv").decode ()

    # Ensure we at least have the raw contents recorded in case of an error
    request_id = cursor.callproc_required_value ("accom_create_raw_request", term.code(), admin.admin_id, remote_identity.person_id, raw_content)
    cursor.connection.commit ()

    # Parse raw content and record individual requests
    parsed_content, parse_errors = process_raw_request (cursor, raw_content)

    # Defensive only: as far as can be seen here, process_raw_request
    # reports missing headings with an empty list rather than None.
    if parsed_content is None:
        result = [
            html.p ('Unable to parse uploaded file; the following required field headings are missing:'),
            html.ul (html.li (e) for e in parse_errors),
        ]
        return 'Error: Unable to parse upload', result

    def create_request_sql (content, table_name):
        # Insert each row of content into table_name, tagged with the
        # identification of this request.  All rows share the fields of
        # the first row.
        if not content:
            return

        fields = tuple (content[0]._fields)
        columns = ("term_id", "accom_admin_id", "request_id") + fields
        # Every value, including the request identification, is passed as
        # a query parameter rather than formatted into the SQL text.
        placeholders = ("%(term_id)s", "%(admin_id)s", "%(request_id)s") + tuple ("%%(%s)s" % field for field in fields)
        parsed_sql = "insert into %s (%s) values (%s)" % (table_name, ", ".join (columns), ", ".join (placeholders))

        values = {
            'term_id': term.code (),
            'admin_id': admin.admin_id,
            'request_id': request_id,
        }
        for row in content:
            values.update (row._asdict ())
            cursor.execute_none (parsed_sql, **values)

    create_request_sql (parsed_content, "accom_request_parsed")

    # Each result starts with the affected row number, followed by the
    # row numbers it duplicates.
    for rows in cursor.callproc_values ("accom_delete_duplicate_locations", term.code (), admin.admin_id):
        parse_errors.append (error_request_row (rows[0], 'Duplicate location with row ' + ''.join (map (str, english_join (*rows[1:])))))

    for row in cursor.callproc_values ("accom_delete_conflict_locations", term.code (), admin.admin_id):
        parse_errors.append (error_request_row (row, 'Conflict with other assessment with seating finalized'))

    create_request_sql (parse_errors, "accom_request_error")

    cursor.callproc_none ("accom_transform_upload", term.code (), admin.admin_id)

    raise status.HTTPFound ("%d/" % request_id)
def format_room (building_code, room_code):
    """Utility routine for formatting a room code for display.

    Parameters:
    building_code -- the short code for the building;
    room_code -- the short code for the room.

    Returns the building_code and room_code joined by a non-breaking
    space, or None in the event building_code is None.
    """
    if building_code is None:
        return None

    # The separator is spelled as an escape so that the non-breaking space
    # promised above cannot be mistaken for, or silently turned into, an
    # ordinary space.
    return '%s\u00a0%s' % (building_code, room_code)
@return_html
def request_get_handler (cursor, term, admin, roles, request):
    """Accommodation request view GET URL handler.

    Displays the specific accommodation request.  Rows recorded in
    accom_request_error are listed first, each next to the uploaded cells
    of the corresponding row of the raw request.  Then the upload and
    fulfilment timestamps and the number of valid parsed rows are shown.
    The table of parsed rows is included only once the request has been
    fulfilled, and a link to the raw contents page is added when raw
    contents were recorded.

    :param cursor: DB connection cursor.
    :param term: the relevant term.
    :param admin: the relevant admin unit.
    :param roles: not used by this handler.
    :param request: the request; request_id and request_raw_contents
        are read.
    :return: (page title, page content).
    """
    result = [format_return ('Main Menu', None, None, None, 'Accommodation')]

    # Rows of the raw upload.  This stays empty when no raw contents were
    # recorded, so that the error table below can still be rendered.
    if request.request_raw_contents is not None:
        request_raw_contents = list (split_raw_csv (request.request_raw_contents))
    else:
        request_raw_contents = []
    raw_header = request_raw_contents[0] if request_raw_contents else []

    def raw_cells (row_number):
        # Uploaded cells of the given 1-based row; none if there is no
        # such row in the raw contents.
        index = row_number - 1
        if 0 <= index < len (request_raw_contents):
            return request_raw_contents[index]
        return []

    request_key = {
        'term_id': term.code (),
        'accom_admin_id': admin.admin_id,
        'request_id': request.request_id,
    }

    request_errors = cursor.execute_tuples ("select row_number, error_message from accom_request_error where (term_id, accom_admin_id, request_id) = (%(term_id)s, %(accom_admin_id)s, %(request_id)s)", **request_key)

    if request_errors:
        result.append (html.h2 ('Error Rows'))
        result.append (html.p ('Error Requests: %d' % len (request_errors)))
        result.append (html.table (
            html.tr (
                html.th ('Row', rowspan=2),
                html.th ('Error', rowspan=2),
                # A column span must be at least 1, even without headings.
                html.th ('Uploaded Data', colspan=max (len (raw_header), 1)),
            ),
            html.tr (
                (html.th (column_name) for column_name in raw_header)),
            (html.tr (
                html.td (request_error.row_number, style="text-align: right;"),
                html.td (request_error.error_message),
                (html.td (column_value) for column_value in raw_cells (request_error.row_number)))
                for request_error in request_errors)
        ))

    result.append (html.h2 ('Parsed Contents'))

    request_submission = cursor.execute_required_tuple ("select * from accom_request_submission where (term_id, accom_admin_id, request_id) = (%(term_id)s, %(accom_admin_id)s, %(request_id)s)", **request_key)

    result.append (html.p ('Uploaded: ', format_datetime (request_submission.request_created)))

    fulfilled = bool (request_submission.request_submitted)
    if fulfilled:
        result.append (html.p ('Fulfilled: ', format_datetime (request_submission.request_submitted)))
    else:
        result.append (html.p ('This accommodation request has not been fulfilled.'))

    requests = cursor.execute_tuples ("select *, room_seat_number (seat_col, seat_row) as seat from accom_request_parsed_plus where (term_id, accom_admin_id, request_id) = (%(term_id)s, %(accom_admin_id)s, %(request_id)s) order by row_number", **request_key)
    result.append (html.p ('Valid Requests: %d' % len (requests)))

    provided_columns = [
        'Row', 'UW ID', 'Class ID', 'Admin Code', 'Start', 'Duration',
        'Room', 'Seat', 'Barcode',
    ]
    derived_columns = [
        'Remove', 'Extra', 'Student', 'Section', 'Course', 'Assessment',
        'Sitting Admin', 'Start Time', 'Room', 'Seat'
    ]

    request_table = html.table (
        html.tr (
            html.th ('Provided', colspan=len (provided_columns)),
            html.th ('Derived', colspan=len (derived_columns)),
        ),
        html.tr (
            [html.th (title) for title in provided_columns + derived_columns],
        ),
    )

    for r in requests:
        request_table.append (html.tr (
            # Provided values
            html.td (r.row_number, style="text-align: right;"),
            html.td (r.uw_id),
            html.td (r.class_id),
            html.td (None), # r.request_admin_code
            html.td (format_datetime (r.start_time)),
            html.td (format_duration (r.override_duration)),
            html.td (format_room (r.building_code, r.room_code)),
            html.td (r.seat_col),
            html.td (r.instance_code),
            # Derived values
            html.td (None), # r.remove
            html.td (r.extra_candidate),
            html.td (format_person (cursor, r.person_id)),
            html.td (r.section_description),
            html.td (r.admin_description),
            html.td (r.exam_title),
            html.td (r.sitting_admin_description),
            html.td (format_datetime (r.start_time)),
            html.td (format_room (r.building_code, r.room_code)),
            html.td (r.seat),
        ))

    # NOTE(review): the parsed rows are shown for fulfilled requests
    # only -- confirm that this is intended.
    if fulfilled:
        result.append (request_table)

    if request.request_raw_contents is not None:
        result.append (html.a ('Raw Table Contents', href="raw"))

    return 'Accommodation Request %d' % request.request_id, result

@return_html
def request_raw_handler (cursor, term, admin, roles, request):
    """Accommodation request raw contents GET URL handler.

    Displays the raw contents of the request as uploaded: its length in
    characters, followed by a table of its rows and columns.  The first
    row provides the column headings; the remaining rows are numbered
    starting with 2.

    :param cursor: not used by this handler.
    :param term: not used by this handler.
    :param admin: not used by this handler.
    :param roles: not used by this handler.
    :param request: the request; request_id and request_raw_contents
        are read.
    :return: (page title, page content).
    """
    result = [format_return ('Main Menu', None, None, None, 'Accommodation', dot='Request')]

    if request.request_raw_contents is None:
        result.append (html.p ('No raw contents'))
    else:
        result.append (html.p ('Length %d characters.' % len (request.request_raw_contents)))

        request_raw_contents = list (split_raw_csv (request.request_raw_contents))
        result.append (html.table (
            html.tr (
                html.th ('Row', rowspan=2),
                html.th ('Uploaded Data', colspan=len (request_raw_contents[0])),
            ),
            html.tr (html.th (col) for col in request_raw_contents[0]),
            [html.tr (html.td (i, style="text-align: right;"), (html.td (col) for col in row))
                for i, row in enumerate (request_raw_contents[1:], start=2)]
        ))

    return 'Accommodation Request %d Raw Contents' % request.request_id, result
def request_from_arc (arc, cursor, term, admin, **params):
    """Interpret a URL path arc as an accommodation request_id.

    arc -- the URL path arc;
    cursor -- DB connection cursor;
    term -- the relevant term;
    admin -- the relevant admin unit;
    params -- any additional context not needed for this arc parser.

    First converts the arc to an integer.  Returns None if that yields
    None; otherwise returns the accom_request_upload row of this term and
    admin unit with that request_id, or None if there is no such row.
    """
    arc = int_from_arc (arc)
    if arc is None:
        return None

    return cursor.execute_optional_tuple ("select * from accom_request_upload where (term_id, accom_admin_id, request_id) = (%(term_id)s, %(accom_admin_id)s, %(request_id)s)", term_id=term.code (), accom_admin_id=admin.admin_id, request_id=arc)
def request_delegate (dir_handler, arc_handler):
    """Delegate to URL handler based on accommodation request_id in URL.

    dir_handler -- the handler passed to delegate_value as the directory
        handler;
    arc_handler -- the handler used for every arc, which request_from_arc
        converts into the 'request' value.
    """
    return delegate_value ('request', dir_handler, always (arc_handler), convert_arc=request_from_arc)
# URL handler for the accommodation request pages: the request list (GET
# and POST) at the directory level and, below each request_id, the request
# page itself with its "raw" action.
accommodation_handler = request_delegate (
    delegate_get_post (request_index_get_handler, request_index_post_handler),
    delegate_action (delegate_get_post (request_get_handler), {
        "raw": request_raw_handler
    })
)