import os

import numpy as np


def mnist(dataset='mnist.pkl.gz', onehot=True):
    """Load the pickled MNIST dataset, downloading it first if necessary.

    When ``dataset`` is a bare file name that does not exist in the current
    directory, the directory containing this module is tried instead. The
    default file ``mnist.pkl.gz`` is downloaded when it cannot be found.

    Args:
        dataset: Path to the gzipped pickle file.
        onehot: If true, labels are converted to one-hot rows with 10 columns.

    Returns:
        ``(train_set, valid_set, test_set)``; each is an ``(input, target)``
        tuple. Per the file format, ``input`` is a 2-D float32 matrix with
        one example per row and ``target`` is a 1-D int64 vector of the same
        length (or its one-hot encoding when ``onehot`` is true).
    """
    import six.moves.cPickle as pickle
    import gzip

    # Download the MNIST dataset if it is not present.
    data_dir, data_file = os.path.split(dataset)
    if data_dir == "" and not os.path.isfile(dataset):
        # Check if dataset is in the directory of this module.
        new_path = os.path.join(os.path.split(__file__)[0], dataset)
        if os.path.isfile(new_path) or data_file == 'mnist.pkl.gz':
            dataset = new_path

    if (not os.path.isfile(dataset)) and data_file == 'mnist.pkl.gz':
        from six.moves import urllib
        origin = (
            'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz'
        )
        print('Downloading data from %s' % origin)
        urllib.request.urlretrieve(origin, dataset)

    # Load the dataset.
    # NOTE(security): unpickling executes arbitrary code; only load archives
    # obtained from a trusted source.
    with gzip.open(dataset, 'rb') as f:
        try:
            train_set, valid_set, test_set = pickle.load(f, encoding='latin1')
        except TypeError:
            # Python 2's cPickle.load() does not accept ``encoding``. Only
            # that case is retried; real I/O or unpickling errors propagate.
            f.seek(0)
            train_set, valid_set, test_set = pickle.load(f)

    if onehot:
        train_set = (train_set[0], convert_to_one_hot(train_set[1], 10))
        valid_set = (valid_set[0], convert_to_one_hot(valid_set[1], 10))
        test_set = (test_set[0], convert_to_one_hot(test_set[1], 10))

    return train_set, valid_set, test_set


def _fetch_and_extract(url, archive, directory, description):
    """Download ``archive`` from ``url`` if absent, then unpack it."""
    import tarfile

    if not os.path.exists(archive):
        from tqdm import tqdm
        from six.moves import urllib

        print('Downloading %s dataset...' % description)
        # The context manager closes the progress bar even if the download
        # fails part-way.
        with tqdm(total=None) as pbar:
            def bar_update(count, block_size, total_size):
                # urlretrieve reports a non-positive size when it is unknown.
                if pbar.total is None and total_size > 0:
                    pbar.total = total_size
                pbar.update(count * block_size - pbar.n)

            urllib.request.urlretrieve(url, archive, reporthook=bar_update)

    # NOTE(security): extractall() trusts member paths inside the archive;
    # only use it on archives from a trusted source.
    with tarfile.open(archive, 'r:gz') as tar:
        tar.extractall(path=directory)


def _load_cifar10_batches(paths):
    """Read CIFAR-10 batch files and stack their images and labels."""
    import six.moves.cPickle as pickle

    data, labels = [], []
    for path in paths:
        # NOTE(security): unpickling executes arbitrary code; trusted
        # archives only.
        with open(path, 'rb') as fo:
            batch = pickle.load(fo, encoding='latin1')
        batch_labels = batch["labels"]
        # Keep exactly one image row per label.
        data.append(
            np.asarray(batch["data"][:len(batch_labels)], dtype='float'))
        labels.extend(batch_labels)
    return np.concatenate(data), np.array(labels, dtype='int')


def cifar10(directory='CIFAR_10', onehot=True):
    """Load CIFAR-10, downloading and extracting it if necessary.

    The archive ``cifar-10-python.tar.gz`` is looked up in (and downloaded
    to) the current working directory and extracted into ``directory``.

    Args:
        directory: Directory holding (or receiving) ``cifar-10-batches-py``.
        onehot: If true, labels are converted to one-hot rows with 10 columns.

    Returns:
        ``(train_images, train_labels, test_images, test_labels)``. Images
        are float arrays with one row per image exactly as stored in the
        batch files (not reshaped); labels are int arrays, or one-hot
        matrices when ``onehot`` is true.
    """
    batch_dir = os.path.join(directory, 'cifar-10-batches-py')
    train_files = [os.path.join(batch_dir, 'data_batch_%d' % i)
                   for i in range(1, 6)]
    test_files = [os.path.join(batch_dir, 'test_batch')]

    if not all(os.path.exists(fl) for fl in train_files + test_files):
        _fetch_and_extract(
            "https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz",
            "cifar-10-python.tar.gz", directory, 'CIFAR 10')

    train_images, train_labels = _load_cifar10_batches(train_files)
    test_images, test_labels = _load_cifar10_batches(test_files)

    if onehot:
        train_labels = convert_to_one_hot(train_labels, 10)
        test_labels = convert_to_one_hot(test_labels, 10)

    return train_images, train_labels, test_images, test_labels


def _load_cifar100_split(path):
    """Read one CIFAR-100 split as (N, 3, 32, 32) images and fine labels."""
    import six.moves.cPickle as pickle

    # NOTE(security): unpickling executes arbitrary code; trusted archives
    # only.
    with open(path, 'rb') as input_file:
        split = pickle.load(input_file, encoding='latin1')
    raw_images = split['data']
    images = np.array(raw_images, dtype='float').reshape(
        raw_images.shape[0], 3, 32, 32)
    labels = np.array(split['fine_labels'], dtype='int')
    return images, labels


def cifar100(directory='CIFAR_100', onehot=True):
    """Load CIFAR-100 (fine labels), downloading it if necessary.

    The archive ``cifar-100-python.tar.gz`` is looked up in (and downloaded
    to) the current working directory and extracted into ``directory``.

    Args:
        directory: Directory holding (or receiving) ``cifar-100-python``.
        onehot: If true, labels are converted to one-hot rows with 100
            columns.

    Returns:
        ``(train_images, train_labels, test_images, test_labels)``. Images
        are float arrays of shape (N, 3, 32, 32); labels are int arrays, or
        one-hot matrices when ``onehot`` is true.
    """
    train_path = os.path.join(directory, 'cifar-100-python', 'train')
    test_path = os.path.join(directory, 'cifar-100-python', 'test')

    if not (os.path.exists(train_path) and os.path.exists(test_path)):
        _fetch_and_extract(
            "https://www.cs.toronto.edu/~kriz/cifar-100-python.tar.gz",
            "cifar-100-python.tar.gz", directory, 'CIFAR 100')

    train_images, train_labels = _load_cifar100_split(train_path)
    test_images, test_labels = _load_cifar100_split(test_path)

    if onehot:
        train_labels = convert_to_one_hot(train_labels, 100)
        test_labels = convert_to_one_hot(test_labels, 100)

    return train_images, train_labels, test_images, test_labels


def _load_cifar(num_class, onehot):
    """Dispatch to the CIFAR loader matching ``num_class`` (10 or 100)."""
    if num_class == 10:
        return cifar10(onehot=onehot)
    if num_class == 100:
        return cifar100(onehot=onehot)
    raise NotImplementedError(
        'num_class must be 10 or 100, got %r' % (num_class,))


def _standardize_channels(x, channel_axis):
    """Standardize each channel of ``x`` in place to zero mean, unit std.

    The statistics of a channel are computed over every other axis of ``x``
    itself.
    """
    for channel in range(x.shape[channel_axis]):
        index = [slice(None)] * x.ndim
        index[channel_axis] = channel
        index = tuple(index)
        values = x[index]
        x[index] = (values - np.mean(values)) / np.std(values)
    return x


def normalize_cifar(num_class=10, onehot=True):
    """Load CIFAR and standardize each colour channel (channels-first).

    Args:
        num_class: 10 for CIFAR-10 or 100 for CIFAR-100.
        onehot: Forwarded to the dataset loader.

    Returns:
        ``(x_train, y_train, x_test, y_test)`` where the images are float32
        arrays of shape (N, 3, 32, 32). The train and test sets are each
        standardized with their own per-channel mean and std.

    Raises:
        NotImplementedError: If ``num_class`` is neither 10 nor 100.
    """
    x_train, y_train, x_test, y_test = _load_cifar(num_class, onehot)

    x_train = x_train.reshape((-1, 3, 32, 32))
    x_test = x_test.reshape((-1, 3, 32, 32))
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')

    _standardize_channels(x_train, channel_axis=1)
    _standardize_channels(x_test, channel_axis=1)

    return x_train, y_train, x_test, y_test


def tf_normalize_cifar(num_class=10, onehot=True):
    """Load CIFAR and standardize each colour channel (channels-last).

    Args:
        num_class: 10 for CIFAR-10 or 100 for CIFAR-100.
        onehot: Forwarded to the dataset loader.

    Returns:
        ``(x_train, y_train, x_test, y_test)`` where the images are float32
        arrays of shape (N, 32, 32, 3). The train and test sets are each
        standardized with their own per-channel mean and std.

    Raises:
        NotImplementedError: If ``num_class`` is neither 10 nor 100.
    """
    x_train, y_train, x_test, y_test = _load_cifar(num_class, onehot)

    x_train = x_train.reshape((-1, 3, 32, 32))
    x_test = x_test.reshape((-1, 3, 32, 32))
    # Move the channel axis last: (N, 3, 32, 32) -> (N, 32, 32, 3).
    x_train = x_train.transpose([0, 2, 3, 1]).astype('float32')
    x_test = x_test.transpose([0, 2, 3, 1]).astype('float32')

    _standardize_channels(x_train, channel_axis=3)
    _standardize_channels(x_test, channel_axis=3)

    return x_train, y_train, x_test, y_test


def convert_to_one_hot(vals, max_val=0):
    """Helper method to convert label array to one-hot array.

    Args:
        vals: 1-D array-like of non-negative integer labels.
        max_val: Number of columns of the result. When 0, it is inferred as
            ``vals.max() + 1``.

    Returns:
        Float array of shape ``(vals.size, max_val)`` with a single 1 per row.
    """
    vals = np.asarray(vals)
    if max_val == 0:
        max_val = vals.max() + 1
    one_hot_vals = np.zeros((vals.size, max_val))
    one_hot_vals[np.arange(vals.size), vals] = 1
    return one_hot_vals


########################
# Not in use currently #
########################
def data_augmentation(images, mode='train', flip=False, crop=False,
                      crop_shape=(24, 24, 3), whiten=False, noise=False,
                      noise_mean=0, noise_std=0.01):
    """Apply the selected augmentations to a batch of images.

    Steps run in the order crop, flip, whiten, noise. Flip, whiten and noise
    write into ``images`` in place; cropping returns a new array.

    Args:
        images: Array indexed as (batch, height, width, channels); a float
            dtype is expected for whiten and noise.
        mode: 'train' selects a random crop, 'test' a centre crop. Any other
            value leaves the images uncropped.
        flip: Randomly mirror each image left-right.
        crop: Zero-pad and crop to ``crop_shape``.
        crop_shape: Target shape; only the first two entries are used.
        whiten: Standardize each image to zero mean and unit std.
        noise: Add Gaussian noise.
        noise_mean: Mean of the added noise.
        noise_std: Standard deviation of the added noise.

    Returns:
        The augmented image batch.
    """
    if crop:
        if mode == 'train':
            images = _image_crop(images, shape=crop_shape)
        elif mode == 'test':
            images = _image_crop_test(images, shape=crop_shape)
    if flip:
        images = _image_flip(images)
    if whiten:
        images = _image_whitening(images)
    if noise:
        images = _image_noise(images, mean=noise_mean, std=noise_std)

    return images


def _pad_spatial(images, padding):
    """Zero-pad the two spatial axes of a (batch, H, W, C) array."""
    return np.pad(
        images,
        [[0, 0], [padding, padding], [padding, padding], [0, 0]],
        'constant')


def _image_crop(images, shape, padding=4):
    """Zero-pad each image, then cut a randomly positioned window.

    Args:
        images: Array indexed as (batch, height, width, channels).
        shape: Window size; ``shape[0]`` rows by ``shape[1]`` columns.
        padding: Number of zero pixels added on every spatial border.

    Returns:
        New array holding one cropped window per input image.
    """
    padded = _pad_spatial(images, padding)
    row_choices = padded.shape[1] - shape[0] + 1
    col_choices = padded.shape[2] - shape[1] + 1

    crops = []
    for image in padded:
        row = np.random.randint(row_choices)
        col = np.random.randint(col_choices)
        crops.append(image[row:row + shape[0], col:col + shape[1], :])

    return np.array(crops)


def _image_crop_test(images, shape, padding=4):
    """Zero-pad each image, then cut the centred window.

    Args:
        images: Array indexed as (batch, height, width, channels).
        shape: Window size; ``shape[0]`` rows by ``shape[1]`` columns.
        padding: Number of zero pixels added on every spatial border.

    Returns:
        New array holding the centre crop of every input image.
    """
    padded = _pad_spatial(images, padding)
    row = int((padded.shape[1] - shape[0]) / 2)
    col = int((padded.shape[2] - shape[1]) / 2)
    return padded[:, row:row + shape[0], col:col + shape[1], :]


def _image_flip(images):
    """Mirror each image left-right with probability 0.5, in place."""
    flip_mask = np.random.random(images.shape[0]) < 0.5
    # Boolean indexing copies the selection, so reversing the width axis
    # cannot alias the destination.
    images[flip_mask] = images[flip_mask][:, :, ::-1, :]
    return images


def _image_whitening(images):
    """Standardize every image to zero mean and unit std, in place.

    A constant image (std of 0) is only mean-centred so that no division by
    zero occurs.
    """
    per_image = tuple(range(1, images.ndim))
    mean = np.mean(images, axis=per_image, keepdims=True)
    std = np.std(images, axis=per_image, keepdims=True)
    std = np.where(std == 0, 1.0, std)
    images[...] = (images - mean) / std
    return images


def _image_noise(images, mean=0, std=0.01):
    """Add independent Gaussian noise to every element, in place."""
    images[...] = images + np.random.normal(mean, std, size=images.shape)
    return images