#! /usr/bin/env python
# -*- coding: utf-8 -*-

"""
Wrapper around any-gram kernel

Author: Rasoul Kaljahi

See LICENSE file.
"""

from sklearn import svm
from sklearn.multiclass import OneVsRestClassifier
import pickle

import numpy as np


class AnyGramWrapper:
    '''
    Wrapper class around any-gram kernel to be used by scikit learn SVC

    The wrapper keeps the formatted training/test data, optionally caches
    the precomputed kernel matrices, and trains/applies an SVC model.
    '''

    def __init__(self, pAnyGram):
        '''
        Constructor

        pAnyGram is the any-gram kernel object. This class calls its
        formatData(), computeKernel() and loadEmbeddings() methods, reads its
        maxTxtLen attribute, and passes the object itself to SVC as a callable
        kernel when a precomputed kernel is not used.
        '''

        self.trainXs = None
        self.trainYs = None
        self.trainAuxs = None       # auxiliary training input
        self.testXs = None
        self.testYs = None
        self.testAuxs = None        # auxiliary test input

        self.ag = pAnyGram

        self._pcTrainKernel = None  # precomputed kernel from the preloaded training data
        self._pcTestKernel = None   # precomputed kernel from the preloaded test data

        self._model = None

    def loadTrainSet(self, pXs, pYs):
        '''
        Loads the training data

        The data set is provided via two lists, one containing a list of text
        instances (preferably tokenized) and one containing the labels
        matching the text instances. Labels are converted to int.
        '''

        self.trainXs = self.ag.formatData(pXs)
        self.trainYs = [int(y) for y in pYs]

        # Both cached kernels are computed against the training instances
        # (the test kernel is test x train), so both become stale here.
        self._pcTrainKernel = None
        self._pcTestKernel = None

    def loadTrainAux(self, pAux):
        '''
        Loads auxiliary training input

        The input must be a 3D array/list of shape
        (#train_instances, #max_text_length, #auxiliary_dimension).

        Raises an Exception if the training set is not loaded yet.
        '''

        if self.trainXs is None or len(self.trainXs) == 0:
            raise Exception("Training dataset should be loaded first!")

        self.trainAuxs = self.formatAux(pAux)

    def loadTestSet(self, pXs, pYs):
        '''
        Loads the test data

        The data set is provided via two lists, one containing a list of text
        instances (preferably tokenized) and one containing the labels
        matching the text instances. Labels are converted to int.
        '''

        self.testXs = self.ag.formatData(pXs)
        self.testYs = [int(y) for y in pYs]

        self._pcTestKernel = None

    def loadTestAux(self, pAux):
        '''
        Loads auxiliary test input

        The input must be a 3D array/list of shape
        (#test_instances, #max_text_length, #auxiliary_dimension).

        Raises an Exception if the test set is not loaded yet.
        '''

        if self.testXs is None or len(self.testXs) == 0:
            raise Exception("Test dataset should be loaded first!")

        self.testAuxs = self.formatAux(pAux)

    def formatAux(self, pAux):
        '''
        Formats the auxiliary data

        If the input is a list, it is treated as a 3D list of shape
        (#instances, #text_length, #auxiliary_dimension): the second
        dimension of every instance is truncated or zero-padded to
        self.ag.maxTxtLen and the result is returned as a float64 numpy array.

        Any other input (e.g. an already formatted array or None) is returned
        unchanged.
        '''

        if not isinstance(pAux, list):
            return pAux

        vMaxLen = self.ag.maxTxtLen

        # Fallback padding width for empty instances, which have no first
        # row to measure: taken from the first non-empty instance.
        vDefaultDim = 0
        for s in pAux:
            if len(s) > 0:
                vDefaultDim = len(s[0])
                break

        vFixedDimList = []
        for s in pAux:
            vAuxDim = len(s[0]) if len(s) > 0 else vDefaultDim
            vPadLen = max(vMaxLen - len(s), 0)
            vFixedDimList.append(list(s[:vMaxLen]) + [[0] * vAuxDim for _ in range(vPadLen)])

        return np.array(vFixedDimList, dtype=np.float64)

    def loadEmbeddings(self, pWEFilename, pIsLowerCase):
        '''
        Loads word embeddings from the given file

        Embeddings should be loaded before the data, because the vocabulary
        used for formatting data should be extracted from the word embeddings
        (when the word embeddings are used).

        pIsLowerCase specifies whether the vocabulary of the word embedding is
        lowercased.
        '''

        self.ag.loadEmbeddings(pWEFilename, pIsLowerCase)

    def precomputeTrainKernel(self):
        '''
        Precomputes the kernel from the training data (train x train)
        '''

        self._pcTrainKernel = self._preComputeKernel(self.trainXs, self.trainXs,
                                                     self.trainAuxs, self.trainAuxs)

    def _getPrecomputedTrainKernel(self):
        '''
        Returns the precomputed kernel from the training data

        If the kernel is not precomputed yet, it will do so first.
        '''

        if self._pcTrainKernel is None:
            self.precomputeTrainKernel()

        return self._pcTrainKernel

    def precomputeTestKernel(self):
        '''
        Precomputes the kernel from the test data (test x train)
        '''

        self._pcTestKernel = self._preComputeKernel(self.testXs, self.trainXs,
                                                    self.testAuxs, self.trainAuxs)

    def _getPrecomputedTestKernel(self):
        '''
        Returns the precomputed kernel from the test data

        If the kernel is not precomputed yet, it will do so first.
        '''

        if self._pcTestKernel is None:
            self.precomputeTestKernel()

        return self._pcTestKernel

    def _preComputeKernel(self, pX1, pX2, pAux1=None, pAux2=None):
        '''
        Computes and returns kernel with the given data

        The data should be formatted by AnyGram.formatData(). The auxiliary
        inputs are passed to the kernel only when both of them are provided.

        Raises an Exception if the training set is not loaded yet.
        '''

        if self.trainXs is None or len(self.trainXs) == 0:
            raise Exception("Training dataset is empty!")

        vX1 = pX1.astype(np.float64)
        vX2 = pX2.astype(np.float64)

        if pAux1 is not None and pAux2 is not None:
            return self.ag.computeKernel(vX1, vX2,
                                         pAux1.astype(np.float64), pAux2.astype(np.float64))

        return self.ag.computeKernel(vX1, vX2)

    def combinePrecomputedTrainKernel(self, paKernelMatrix, pCombMethod):
        '''
        Combines the anygram kernel computed on the training set here with any
        given kernel matrix using the specified method

        The given kernel matrix should match the computed any-gram kernel
        matrix in shape.

        The combination methods supported here are:
            + or add: add corresponding elements in the two kernel matrices
            * or multiply: multiply corresponding elements in the two kernel matrices
            arith: arithmetic mean of the corresponding elements in the two kernel matrices
            geo: geometric mean

        Raises an Exception if the training kernel is not precomputed yet.
        '''

        if self._pcTrainKernel is None:
            raise Exception("Kernel is not precomputed yet. Run precomputeTrainKernel() first.")

        self._pcTrainKernel = self._combineKernels(self._pcTrainKernel, paKernelMatrix, pCombMethod)

    def combinePrecomputedTestKernel(self, paKernelMatrix, pCombMethod):
        '''
        Combines the anygram kernel computed on the test set here with any
        given kernel matrix using the specified method

        The given kernel matrix should match the computed any-gram kernel
        matrix in shape.

        The combination methods supported here are:
            + or add: add corresponding elements in the two kernel matrices
            * or multiply: multiply corresponding elements in the two kernel matrices
            arith: arithmetic mean of the corresponding elements in the two kernel matrices
            geo: geometric mean

        Raises an Exception if the test kernel is not precomputed yet.
        '''

        if self._pcTestKernel is None:
            raise Exception("Kernel is not precomputed yet. Run precomputeTestKernel() first.")

        self._pcTestKernel = self._combineKernels(self._pcTestKernel, paKernelMatrix, pCombMethod)

    def _combineKernels(self, paKernelMatrix1, paKernelMatrix2, pCombMethod):
        '''
        Combines and returns two given kernel matrices with the given method

        The given kernel matrices should have the same shapes.

        The combination methods supported here are:
            + or add: add corresponding elements in the two kernel matrices
            * or multiply: multiply corresponding elements in the two kernel matrices
            arith: arithmetic mean of the corresponding elements in the two kernel matrices
            geo: geometric mean

        Raises an Exception if the shapes differ and a ValueError if the
        combination method is not one of the above.
        '''

        if paKernelMatrix1.shape != paKernelMatrix2.shape:
            # shapes are tuples, hence the explicit str() for formatting
            raise Exception("The shape of the given kernel matrix is not valid: %s (expected %s)"
                            % (str(paKernelMatrix2.shape), str(paKernelMatrix1.shape)))

        vMethod = pCombMethod.lower()

        if vMethod in ('+', "add"):
            return np.add(paKernelMatrix1, paKernelMatrix2)
        if vMethod in ('*', "multiply"):
            return np.multiply(paKernelMatrix1, paKernelMatrix2)
        if vMethod.startswith("arith"):
            return np.add(paKernelMatrix1, paKernelMatrix2) / 2
        if vMethod.startswith("geo"):
            return np.sqrt(np.multiply(paKernelMatrix1, paKernelMatrix2))

        # Returning None here would silently wipe the cached kernel of the caller.
        raise ValueError("Unknown kernel combination method: %s" % pCombMethod)

    def train(self, pflgUsePrecompKernel=False, pMCMethod=None, C=1, class_weight=None):
        '''
        Trains and returns anygram model

        If pflgUsePrecompKernel is set to true, SVC will use precomputed
        kernel. This can save time when the data or kernel computation
        parameters remain the same in repeated trainings (e.g. in tuning).

        pMCMethod specifies the multiclass classification method:
            None: SVC with its default settings
            ovo: SVC with decision_function_shape set to "ovo"
            ova or ovr: SVC wrapped in OneVsRestClassifier

        The other parameters are those of scikit.svm.SVC.

        The fitted model is also stored and accessible via the model property.

        Raises an Exception if the training set is not loaded or the
        multiclass method is unknown.
        '''

        if self.trainXs is None or len(self.trainXs) == 0:
            raise Exception("Training dataset is empty!")

        if pflgUsePrecompKernel:
            vKernel = "precomputed"
            X = self._getPrecomputedTrainKernel()
        else:
            vKernel = self.ag
            X = self.trainXs

        vMCMethod = None if pMCMethod is None else pMCMethod.lower()

        if vMCMethod is None:
            vSVC = svm.SVC(kernel=vKernel, C=C, class_weight=class_weight)
        elif vMCMethod == "ovo":
            vSVC = svm.SVC(kernel=vKernel, decision_function_shape="ovo",
                           C=C, class_weight=class_weight)
        elif vMCMethod in ("ova", "ovr"):
            # "ova" is only an alias accepted by this wrapper; SVC itself
            # knows "ovr", so the raw argument must not be forwarded.
            vSVC = OneVsRestClassifier(svm.SVC(kernel=vKernel, decision_function_shape="ovr",
                                               C=C, class_weight=class_weight))
        else:
            raise Exception("Unknown multiclass classification method: %s" % pMCMethod)

        # training
        self._model = vSVC.fit(X, self.trainYs)

        return self._model

    @property
    def model(self):
        '''
        Trained scikit SVC model
        '''

        return self._model

    def saveModel(self, pFilename):
        '''
        Saves the SVC model by pickling it to a given file
        '''

        ## ToDo: when saving the model, the vocabulary and the embeddings (for WESS) must also be saved

        # pickle data is binary; the context manager guarantees the file is closed
        with open(pFilename, 'wb') as vModelFile:
            pickle.dump(self._model, vModelFile)

    def loadModel(self, pModelPickle):
        '''
        Loads the pickled SVC model

        NOTE: unpickling can execute arbitrary code; only load model files
        from trusted sources.
        '''

        ## ToDo: model should have been saved with a vocabulary and embedding which must also be loaded here

        with open(pModelPickle, 'rb') as vModelFile:
            self._model = pickle.load(vModelFile)

    def _getModelKernel(self):
        '''
        Returns the kernel setting of the current model

        For a OneVsRestClassifier the kernel of its first estimator is used.

        Raises an Exception if there is no trained/loaded model of a known type.
        '''

        if isinstance(self._model, OneVsRestClassifier):
            return self._model.estimators_[0].kernel
        if isinstance(self._model, svm.SVC):
            return self._model.kernel

        raise Exception("No trained model is available. Run train() or loadModel() first.")

    def test(self, pTestXs=None, pTestYs=None):
        '''
        Tests the current model on the loaded test set

        If both pTestXs and pTestYs are given, they are loaded as the test
        set first. Returns the predictions and the accuracy score.

        Raises an Exception if no model or no test set is available.
        '''

        if pTestXs is not None and pTestYs is not None:
            self.loadTestSet(pXs=pTestXs, pYs=pTestYs)

        if self.testXs is None or len(self.testXs) == 0:
            raise Exception("Test dataset is empty!")

        vKernel = self._getModelKernel()

        # prediction
        if vKernel == "precomputed":
            vaPreds = self._model.predict(self._getPrecomputedTestKernel())
        else:
            vaPreds = self._model.predict(self.testXs)

        # scoring
        vScore = self._score(vaPreds, self.testYs)

        return vaPreds, vScore

    def predict(self, pXs, pAux=None):
        '''
        Predicts the labels of the given data

        pXs is formatted with the kernel's formatData() and pAux (if any)
        with formatAux(). When the model uses a precomputed kernel, the
        kernel between the given data and the loaded training data is
        computed first.

        Raises an Exception if no model is available.
        '''

        vaXs = self.ag.formatData(pXs)
        vaAuxs = self.formatAux(pAux)

        vKernel = self._getModelKernel()

        if vKernel == "precomputed":
            return self._model.predict(self._preComputeKernel(vaXs, self.trainXs,
                                                              vaAuxs, self.trainAuxs))

        return self._model.predict(vaXs)

    def _score(self, plPreds, plGolds):
        '''
        Scores the given predictions

        Returns the fraction of predictions that are equal to the gold label
        at the same position, or 0.0 when there are no predictions.
        '''

        # len() rather than truthiness: predictions may be a numpy array
        if len(plPreds) == 0:
            return 0.0

        vCorrect = sum(1 for p, g in zip(plPreds, plGolds) if p == g)

        return vCorrect * 1.0 / len(plPreds)