|
- # A custom autograder for this project
-
- ################################################################################
- # A mini-framework for autograding
- ################################################################################
-
- import optparse
- import pickle
- import random
- import sys
- import traceback
-
- class WritableNull:
- def write(self, string):
- pass
-
- def flush(self):
- pass
-
- class Tracker(object):
- def __init__(self, questions, maxes, prereqs, mute_output):
- self.questions = questions
- self.maxes = maxes
- self.prereqs = prereqs
-
- self.points = {q: 0 for q in self.questions}
-
- self.current_question = None
-
- self.current_test = None
- self.points_at_test_start = None
- self.possible_points_remaining = None
-
- self.mute_output = mute_output
- self.original_stdout = None
- self.muted = False
-
- def mute(self):
- if self.muted:
- return
-
- self.muted = True
- self.original_stdout = sys.stdout
- sys.stdout = WritableNull()
-
- def unmute(self):
- if not self.muted:
- return
-
- self.muted = False
- sys.stdout = self.original_stdout
-
- def begin_q(self, q):
- assert q in self.questions
- text = 'Question {}'.format(q)
- print('\n' + text)
- print('=' * len(text))
-
- for prereq in sorted(self.prereqs[q]):
- if self.points[prereq] < self.maxes[prereq]:
- print("""*** NOTE: Make sure to complete Question {} before working on Question {},
- *** because Question {} builds upon your answer for Question {}.
- """.format(prereq, q, q, prereq))
- return False
-
- self.current_question = q
- self.possible_points_remaining = self.maxes[q]
- return True
-
- def begin_test(self, test_name):
- self.current_test = test_name
- self.points_at_test_start = self.points[self.current_question]
- print("*** {}) {}".format(self.current_question, self.current_test))
- if self.mute_output:
- self.mute()
-
- def end_test(self, pts):
- if self.mute_output:
- self.unmute()
- self.possible_points_remaining -= pts
- if self.points[self.current_question] == self.points_at_test_start + pts:
- print("*** PASS: {}".format(self.current_test))
- elif self.points[self.current_question] == self.points_at_test_start:
- print("*** FAIL")
-
- self.current_test = None
- self.points_at_test_start = None
-
- def end_q(self):
- assert self.current_question is not None
- assert self.possible_points_remaining == 0
- print('\n### Question {}: {}/{} ###'.format(
- self.current_question,
- self.points[self.current_question],
- self.maxes[self.current_question]))
-
- self.current_question = None
- self.possible_points_remaining = None
-
- def finalize(self):
- import time
- print('\nFinished at %d:%02d:%02d' % time.localtime()[3:6])
- print("\nProvisional grades\n==================")
-
- for q in self.questions:
- print('Question %s: %d/%d' % (q, self.points[q], self.maxes[q]))
- print('------------------')
- print('Total: %d/%d' % (sum(self.points.values()),
- sum([self.maxes[q] for q in self.questions])))
-
- print("""
- Your grades are NOT yet registered. To register your grades, make sure
- to follow your instructor's guidelines to receive credit on your project.
- """)
-
- def add_points(self, pts):
- self.points[self.current_question] += pts
-
- TESTS = []
- PREREQS = {}
- def add_prereq(q, pre):
- if isinstance(pre, str):
- pre = [pre]
-
- if q not in PREREQS:
- PREREQS[q] = set()
- PREREQS[q] |= set(pre)
-
- def test(q, points):
- def deco(fn):
- TESTS.append((q, points, fn))
- return fn
- return deco
-
- def parse_options(argv):
- parser = optparse.OptionParser(description = 'Run public tests on student code')
- parser.set_defaults(
- edx_output=False,
- gs_output=False,
- no_graphics=False,
- mute_output=False,
- check_dependencies=False,
- )
- parser.add_option('--edx-output',
- dest = 'edx_output',
- action = 'store_true',
- help = 'Ignored, present for compatibility only')
- parser.add_option('--gradescope-output',
- dest = 'gs_output',
- action = 'store_true',
- help = 'Ignored, present for compatibility only')
- parser.add_option('--question', '-q',
- dest = 'grade_question',
- default = None,
- help = 'Grade only one question (e.g. `-q q1`)')
- parser.add_option('--no-graphics',
- dest = 'no_graphics',
- action = 'store_true',
- help = 'Do not display graphics (visualizing your implementation is highly recommended for debugging).')
- parser.add_option('--mute',
- dest = 'mute_output',
- action = 'store_true',
- help = 'Mute output from executing tests')
- parser.add_option('--check-dependencies',
- dest = 'check_dependencies',
- action = 'store_true',
- help = 'check that numpy and matplotlib are installed')
- (options, args) = parser.parse_args(argv)
- return options
-
- def main():
- options = parse_options(sys.argv)
- if options.check_dependencies:
- check_dependencies()
- return
-
- if options.no_graphics:
- disable_graphics()
-
- questions = set()
- maxes = {}
- for q, points, fn in TESTS:
- questions.add(q)
- maxes[q] = maxes.get(q, 0) + points
- if q not in PREREQS:
- PREREQS[q] = set()
-
- questions = list(sorted(questions))
- if options.grade_question:
- if options.grade_question not in questions:
- print("ERROR: question {} does not exist".format(options.grade_question))
- sys.exit(1)
- else:
- questions = [options.grade_question]
- PREREQS[options.grade_question] = set()
-
- tracker = Tracker(questions, maxes, PREREQS, options.mute_output)
- for q in questions:
- started = tracker.begin_q(q)
- if not started:
- continue
-
- for testq, points, fn in TESTS:
- if testq != q:
- continue
- tracker.begin_test(fn.__name__)
- try:
- fn(tracker)
- except KeyboardInterrupt:
- tracker.unmute()
- print("\n\nCaught KeyboardInterrupt: aborting autograder")
- tracker.finalize()
- print("\n[autograder was interrupted before finishing]")
- sys.exit(1)
- except:
- tracker.unmute()
- print(traceback.format_exc())
- tracker.end_test(points)
- tracker.end_q()
- tracker.finalize()
-
- ################################################################################
- # Tests begin here
- ################################################################################
-
- import numpy as np
- import matplotlib
- import contextlib
-
- import nn
- import backend
-
- def check_dependencies():
- import matplotlib.pyplot as plt
- import time
- fig, ax = plt.subplots(1, 1)
- ax.set_xlim([-1, 1])
- ax.set_ylim([-1, 1])
- line, = ax.plot([], [], color="black")
- plt.show(block=False)
-
- for t in range(400):
- angle = t * 0.05
- x = np.sin(angle)
- y = np.cos(angle)
- line.set_data([x,-x], [y,-y])
- fig.canvas.draw_idle()
- fig.canvas.start_event_loop(1e-3)
-
- def disable_graphics():
- backend.use_graphics = False
-
- @contextlib.contextmanager
- def no_graphics():
- old_use_graphics = backend.use_graphics
- backend.use_graphics = False
- yield
- backend.use_graphics = old_use_graphics
-
- def verify_node(node, expected_type, expected_shape, method_name):
- if expected_type == 'parameter':
- assert node is not None, (
- "{} should return an instance of nn.Parameter, not None".format(method_name))
- assert isinstance(node, nn.Parameter), (
- "{} should return an instance of nn.Parameter, instead got type {!r}".format(
- method_name, type(node).__name__))
- elif expected_type == 'loss':
- assert node is not None, (
- "{} should return an instance a loss node, not None".format(method_name))
- assert isinstance(node, (nn.SquareLoss, nn.SoftmaxLoss)), (
- "{} should return a loss node, instead got type {!r}".format(
- method_name, type(node).__name__))
- elif expected_type == 'node':
- assert node is not None, (
- "{} should return a node object, not None".format(method_name))
- assert isinstance(node, nn.Node), (
- "{} should return a node object, instead got type {!r}".format(
- method_name, type(node).__name__))
- else:
- assert False, "If you see this message, please report a bug in the autograder"
-
- if expected_type != 'loss':
- assert all([(expected is '?' or actual == expected) for (actual, expected) in zip(node.data.shape, expected_shape)]), (
- "{} should return an object with shape {}, got {}".format(
- method_name, nn.format_shape(expected_shape), nn.format_shape(node.data.shape)))
-
- def trace_node(node_to_trace):
- """
- Returns a set containing the node and all ancestors in the computation graph
- """
- nodes = set()
- tape = []
-
- def visit(node):
- if node not in nodes:
- for parent in node.parents:
- visit(parent)
- nodes.add(node)
- tape.append(node)
-
- visit(node_to_trace)
-
- return nodes
-
- @test('q1', points=6)
- def check_perceptron(tracker):
- import models
-
- print("Sanity checking perceptron...")
- np_random = np.random.RandomState(0)
- # Check that the perceptron weights are initialized to a vector with `dimensions` entries.
- for dimensions in range(1, 10):
- p = models.PerceptronModel(dimensions)
- p_weights = p.get_weights()
- verify_node(p_weights, 'parameter', (1, dimensions), "PerceptronModel.get_weights()")
-
- # Check that run returns a node, and that the score in the node is correct
- for dimensions in range(1, 10):
- p = models.PerceptronModel(dimensions)
- p_weights = p.get_weights()
- verify_node(p_weights, 'parameter', (1, dimensions), "PerceptronModel.get_weights()")
- point = np_random.uniform(-10, 10, (1, dimensions))
- score = p.run(nn.Constant(point))
- verify_node(score, 'node', (1, 1), "PerceptronModel.run()")
- calculated_score = nn.as_scalar(score)
- expected_score = float(np.dot(point.flatten(), p_weights.data.flatten()))
- assert np.isclose(calculated_score, expected_score), (
- "The score computed by PerceptronModel.run() ({:.4f}) does not match the expected score ({:.4f})".format(
- calculated_score, expected_score))
-
- # Check that get_prediction returns the correct values, including the
- # case when a point lies exactly on the decision boundary
- for dimensions in range(1, 10):
- p = models.PerceptronModel(dimensions)
- random_point = np_random.uniform(-10, 10, (1, dimensions))
- for point in (random_point, np.zeros_like(random_point)):
- prediction = p.get_prediction(nn.Constant(point))
- assert prediction == 1 or prediction == -1, (
- "PerceptronModel.get_prediction() should return 1 or -1, not {}".format(
- prediction))
-
- expected_prediction = np.asscalar(np.where(np.dot(point, p.get_weights().data.T) >= 0, 1, -1))
- assert prediction == expected_prediction, (
- "PerceptronModel.get_prediction() returned {}; expected {}".format(
- prediction, expected_prediction))
-
- tracker.add_points(2) # Partial credit for passing sanity checks
-
- print("Sanity checking perceptron weight updates...")
-
- # Test weight updates. This involves constructing a dataset that
- # requires 0 or 1 updates before convergence, and testing that weight
- # values change as expected. Note that (multiplier < -1 or multiplier > 1)
- # must be true for the testing code to be correct.
- dimensions = 2
- for multiplier in (-5, -2, 2, 5):
- p = models.PerceptronModel(dimensions)
- orig_weights = p.get_weights().data.reshape((1, dimensions)).copy()
- if np.abs(orig_weights).sum() == 0.0:
- # This autograder test doesn't work when weights are exactly zero
- continue
- point = multiplier * orig_weights
- sanity_dataset = backend.Dataset(
- x=np.tile(point, (500, 1)),
- y=np.ones((500, 1)) * -1.0
- )
- p.train(sanity_dataset)
- new_weights = p.get_weights().data.reshape((1, dimensions))
-
- if multiplier < 0:
- expected_weights = orig_weights
- else:
- expected_weights = orig_weights - point
-
- if not np.all(new_weights == expected_weights):
- print()
- print("Initial perceptron weights were: [{:.4f}, {:.4f}]".format(
- orig_weights[0,0], orig_weights[0,1]))
- print("All data points in the dataset were identical and had:")
- print(" x = [{:.4f}, {:.4f}]".format(
- point[0,0], point[0,1]))
- print(" y = -1")
- print("Your trained weights were: [{:.4f}, {:.4f}]".format(
- new_weights[0,0], new_weights[0,1]))
- print("Expected weights after training: [{:.4f}, {:.4f}]".format(
- expected_weights[0,0], expected_weights[0,1]))
- print()
- assert False, "Weight update sanity check failed"
-
- print("Sanity checking complete. Now training perceptron")
- model = models.PerceptronModel(3)
- dataset = backend.PerceptronDataset(model)
-
- model.train(dataset)
- backend.maybe_sleep_and_close(1)
-
- assert dataset.epoch != 0, "Perceptron code never iterated over the training data"
-
- accuracy = np.mean(np.where(np.dot(dataset.x, model.get_weights().data.T) >= 0.0, 1.0, -1.0) == dataset.y)
- if accuracy < 1.0:
- print("The weights learned by your perceptron correctly classified {:.2%} of training examples".format(accuracy))
- print("To receive full points for this question, your perceptron must converge to 100% accuracy")
- return
-
- tracker.add_points(4)
-
- @test('q2', points=6)
- def check_regression(tracker):
- import models
- model = models.RegressionModel()
- dataset = backend.RegressionDataset(model)
-
- detected_parameters = None
- for batch_size in (1, 2, 4):
- inp_x = nn.Constant(dataset.x[:batch_size])
- inp_y = nn.Constant(dataset.y[:batch_size])
- output_node = model.run(inp_x)
- verify_node(output_node, 'node', (batch_size, 1), "RegressionModel.run()")
- trace = trace_node(output_node)
- assert inp_x in trace, "Node returned from RegressionModel.run() does not depend on the provided input (x)"
-
- if detected_parameters is None:
- detected_parameters = [node for node in trace if isinstance(node, nn.Parameter)]
-
- for node in trace:
- assert not isinstance(node, nn.Parameter) or node in detected_parameters, (
- "Calling RegressionModel.run() multiple times should always re-use the same parameters, but a new nn.Parameter object was detected")
-
- for batch_size in (1, 2, 4):
- inp_x = nn.Constant(dataset.x[:batch_size])
- inp_y = nn.Constant(dataset.y[:batch_size])
- loss_node = model.get_loss(inp_x, inp_y)
- verify_node(loss_node, 'loss', None, "RegressionModel.get_loss()")
- trace = trace_node(loss_node)
- assert inp_x in trace, "Node returned from RegressionModel.get_loss() does not depend on the provided input (x)"
- assert inp_y in trace, "Node returned from RegressionModel.get_loss() does not depend on the provided labels (y)"
-
- for node in trace:
- assert not isinstance(node, nn.Parameter) or node in detected_parameters, (
- "RegressionModel.get_loss() should not use additional parameters not used by RegressionModel.run()")
-
- tracker.add_points(2) # Partial credit for passing sanity checks
-
- model.train(dataset)
- backend.maybe_sleep_and_close(1)
-
- train_loss = model.get_loss(nn.Constant(dataset.x), nn.Constant(dataset.y))
- verify_node(train_loss, 'loss', None, "RegressionModel.get_loss()")
- train_loss = nn.as_scalar(train_loss)
-
- # Re-compute the loss ourselves: otherwise get_loss() could be hard-coded
- # to always return zero
- train_predicted = model.run(nn.Constant(dataset.x))
- verify_node(train_predicted, 'node', (dataset.x.shape[0], 1), "RegressionModel.run()")
- sanity_loss = 0.5 * np.mean((train_predicted.data - dataset.y)**2)
-
- assert np.isclose(train_loss, sanity_loss), (
- "RegressionModel.get_loss() returned a loss of {:.4f}, "
- "but the autograder computed a loss of {:.4f} "
- "based on the output of RegressionModel.run()".format(
- train_loss, sanity_loss))
-
- loss_threshold = 0.02
- if train_loss <= loss_threshold:
- print("Your final loss is: {:f}".format(train_loss))
- tracker.add_points(4)
- else:
- print("Your final loss ({:f}) must be no more than {:.4f} to receive full points for this question".format(train_loss, loss_threshold))
-
- @test('q3', points=6)
- def check_digit_classification(tracker):
- import models
- model = models.DigitClassificationModel()
- dataset = backend.DigitClassificationDataset(model)
-
- detected_parameters = None
- for batch_size in (1, 2, 4):
- inp_x = nn.Constant(dataset.x[:batch_size])
- inp_y = nn.Constant(dataset.y[:batch_size])
- output_node = model.run(inp_x)
- verify_node(output_node, 'node', (batch_size, 10), "DigitClassificationModel.run()")
- trace = trace_node(output_node)
- assert inp_x in trace, "Node returned from DigitClassificationModel.run() does not depend on the provided input (x)"
-
- if detected_parameters is None:
- detected_parameters = [node for node in trace if isinstance(node, nn.Parameter)]
-
- for node in trace:
- assert not isinstance(node, nn.Parameter) or node in detected_parameters, (
- "Calling DigitClassificationModel.run() multiple times should always re-use the same parameters, but a new nn.Parameter object was detected")
-
- for batch_size in (1, 2, 4):
- inp_x = nn.Constant(dataset.x[:batch_size])
- inp_y = nn.Constant(dataset.y[:batch_size])
- loss_node = model.get_loss(inp_x, inp_y)
- verify_node(loss_node, 'loss', None, "DigitClassificationModel.get_loss()")
- trace = trace_node(loss_node)
- assert inp_x in trace, "Node returned from DigitClassificationModel.get_loss() does not depend on the provided input (x)"
- assert inp_y in trace, "Node returned from DigitClassificationModel.get_loss() does not depend on the provided labels (y)"
-
- for node in trace:
- assert not isinstance(node, nn.Parameter) or node in detected_parameters, (
- "DigitClassificationModel.get_loss() should not use additional parameters not used by DigitClassificationModel.run()")
-
- tracker.add_points(2) # Partial credit for passing sanity checks
-
- model.train(dataset)
-
- test_logits = model.run(nn.Constant(dataset.test_images)).data
- test_predicted = np.argmax(test_logits, axis=1)
- test_accuracy = np.mean(test_predicted == dataset.test_labels)
-
- accuracy_threshold = 0.97
- if test_accuracy >= accuracy_threshold:
- print("Your final test set accuracy is: {:%}".format(test_accuracy))
- tracker.add_points(4)
- else:
- print("Your final test set accuracy ({:%}) must be at least {:.0%} to receive full points for this question".format(test_accuracy, accuracy_threshold))
-
- @test('q4', points=7)
- def check_lang_id(tracker):
- import models
- model = models.LanguageIDModel()
- dataset = backend.LanguageIDDataset(model)
-
- detected_parameters = None
- for batch_size, word_length in ((1, 1), (2, 1), (2, 6), (4, 8)):
- start = dataset.dev_buckets[-1, 0]
- end = start + batch_size
- inp_xs, inp_y = dataset._encode(dataset.dev_x[start:end], dataset.dev_y[start:end])
- inp_xs = inp_xs[:word_length]
-
- output_node = model.run(inp_xs)
- verify_node(output_node, 'node', (batch_size, len(dataset.language_names)), "LanguageIDModel.run()")
- trace = trace_node(output_node)
- for inp_x in inp_xs:
- assert inp_x in trace, "Node returned from LanguageIDModel.run() does not depend on all of the provided inputs (xs)"
-
- # Word length 1 does not use parameters related to transferring the
- # hidden state across timesteps, so initial parameter detection is only
- # run for longer words
- if word_length > 1:
- if detected_parameters is None:
- detected_parameters = [node for node in trace if isinstance(node, nn.Parameter)]
-
- for node in trace:
- assert not isinstance(node, nn.Parameter) or node in detected_parameters, (
- "Calling LanguageIDModel.run() multiple times should always re-use the same parameters, but a new nn.Parameter object was detected")
-
- for batch_size, word_length in ((1, 1), (2, 1), (2, 6), (4, 8)):
- start = dataset.dev_buckets[-1, 0]
- end = start + batch_size
- inp_xs, inp_y = dataset._encode(dataset.dev_x[start:end], dataset.dev_y[start:end])
- inp_xs = inp_xs[:word_length]
- loss_node = model.get_loss(inp_xs, inp_y)
- trace = trace_node(loss_node)
- for inp_x in inp_xs:
- assert inp_x in trace, "Node returned from LanguageIDModel.run() does not depend on all of the provided inputs (xs)"
- assert inp_y in trace, "Node returned from LanguageIDModel.get_loss() does not depend on the provided labels (y)"
-
- for node in trace:
- assert not isinstance(node, nn.Parameter) or node in detected_parameters, (
- "LanguageIDModel.get_loss() should not use additional parameters not used by LanguageIDModel.run()")
-
- tracker.add_points(2) # Partial credit for passing sanity checks
-
- model.train(dataset)
-
- test_predicted_probs, test_predicted, test_correct = dataset._predict('test')
- test_accuracy = np.mean(test_predicted == test_correct)
- accuracy_threshold = 0.81
- if test_accuracy >= accuracy_threshold:
- print("Your final test set accuracy is: {:%}".format(test_accuracy))
- tracker.add_points(5)
- else:
- print("Your final test set accuracy ({:%}) must be at least {:.0%} to receive full points for this question".format(test_accuracy, accuracy_threshold))
-
- if __name__ == '__main__':
- main()
|