#! /usr/bin/python
# -*- coding: utf-8 -*-

"""
Any-gram kernels

Author: Rasoul Kaljahi

See LICENSE file.
"""

from collections import Counter
import numpy as np
import ctypes, os

import we


class AnyGram:
    '''
    Class to prepare and compute the any-gram kernel

    ToDo: extend to multiple sentences per instance
    '''

    def __init__(self, pMethod, pMaxTxtLen, pWESTThreshold = 0, pflgNormalizeKernel = True):
        '''
        Constructor

        pMaxTxtLen is the maximum text length of the instances in terms of tokens. Longer instances will be cut
        off and shorter ones will be padded. The same length should also be used at prediction time if a new
        AnyGram object is created. Otherwise scikit will complain.
        '''
        self.method = pMethod                        # method to be used in comparing tokens:
                                                     # - sm: string match
                                                     # - wess: word embedding similarity score
                                                     # - west: word embedding similarity threshold

        self._westThreshold = pWESTThreshold         # similarity threshold for the WEST method

        self.maxTxtLen = pMaxTxtLen                  # maximum text length of the instances in terms of tokens:
                                                     # longer instances will be cut off and shorter ones padded

        self.embeddings = None
        self.vocabulary = None                       # dictionary of tokens and their indices
        self._iVocabulary = None                     # dictionary of token indices and their forms (inverse vocabulary)

        self.l = 0.4                                 # lambda

        self._normalizeKernel = pflgNormalizeKernel  # whether or not to normalize the kernel values

    def setWESTThreshold(self, pThreshold):
        '''
        Sets the word embedding similarity threshold for the WEST method

        The value must be between 0 and 1.
        '''
        self._westThreshold = pThreshold

    @property
    def vocabSize(self):
        '''
        Returns the size of the vocabulary
        '''
        return len(self.vocabulary)

    @property
    def iVocabulary(self):
        '''
        Returns the inverse vocabulary (dictionary of token indices and their forms)

        The inverse vocabulary is created on demand.
        '''
        if self._iVocabulary is None:
            self._iVocabulary = {v: k for k, v in self.vocabulary.items()}

            # adding an entry for the padding index
            self._iVocabulary[0] = '_PAD_'

        return self._iVocabulary

    def __call__(self, pX, pTrainX):
        '''
        Call method
        '''
        return self.computeKernel(pX, pTrainX)

    def computeKernel(self, pX, pTrainX, pXAux = None, pTrainXAux = None):
        '''
        Computes and returns the kernel values

        pXAux and pTrainXAux are the auxiliary matrix inputs. They should have the same number of rows as pX and
        pTrainX respectively, but they can have a custom number of columns (equal for both though). At the moment,
        these can only be used with a precomputed kernel.
        '''
        if self.method.lower() == "sm":
            return self._computeKernelSM(pX, pTrainX, pXAux, pTrainXAux)
        elif self.method.lower() == "west":
            return self._computeKernelWEST(pX, pTrainX, pXAux, pTrainXAux)
        elif self.method.lower() == "wess":
            return self._computeKernelWESS(pX, pTrainX, pXAux, pTrainXAux)
        else:
            raise ValueError("Unknown method: %s" % self.method)
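    # A hedged sketch of the precomputed-kernel path, which is the only way to use the
    # auxiliary inputs (all names below are made up for illustration):
    #
    #   vAnyGram = AnyGram("sm", pMaxTxtLen = 100)
    #   vTrainK = vAnyGram.computeKernel(vTrainX, vTrainX, vTrainXAux, vTrainXAux)
    #   vClf = svm.SVC(kernel = "precomputed").fit(vTrainK, vTrainY)
    #   vTestK = vAnyGram.computeKernel(vTestX, vTrainX, vTestXAux, vTrainXAux)
    #   vPreds = vClf.predict(vTestK)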
    def _computeKernelSM(self, pX, pTrainX, pXAux = None, pTrainXAux = None):
        '''
        Computes and returns the kernel using the string match method

        pXAux and pTrainXAux are the auxiliary matrix inputs. They should have the same number of rows as pX and
        pTrainX respectively, but they can have a custom number of columns (equal for both though). At the moment,
        these can only be used with a precomputed kernel.

        For every pair of tokens, every pair of peer elements in the auxiliary input will be compared against
        each other.
        '''
        if pXAux is None:
            pXAux = np.empty(shape = (0, 0, 0))
            pTrainXAux = np.empty(shape = (0, 0, 0))

        vLib = ctypes.cdll.LoadLibrary(os.path.join(os.path.dirname(os.path.realpath(__file__)), "agk.so"))

        vLib.compKernelMatrixSM.argtypes = [np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                            ctypes.c_int,
                                            ctypes.c_int,
                                            np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                            ctypes.c_int,
                                            ctypes.c_int,
                                            np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                            np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                            ctypes.c_int,
                                            ctypes.c_int,
                                            ctypes.c_int]
        vLib.compKernelMatrixSM.restype = np.ctypeslib.ndpointer(dtype=np.float64,
                                                                 shape=(pX.shape[0], pTrainX.shape[0]),
                                                                 flags="C_CONTIGUOUS")

        # phase 0: training (the kernel is computed between the training set and itself)
        if id(pX) == id(pTrainX):
            vPhase = 0
        else:
            vPhase = 1

        vK = vLib.compKernelMatrixSM(pX.flatten(), pX.shape[0], pX.shape[1],
                                     pTrainX.flatten(), pTrainX.shape[0], pTrainX.shape[1],
                                     pXAux.flatten(), pTrainXAux.flatten(), pXAux.shape[2],
                                     vPhase,
                                     self._normalizeKernel)

        return vK

    def _computeKernelWEST(self, pX, pTrainX, pXAux = None, pTrainXAux = None):
        '''
        Computes and returns the kernel using the word embedding similarity threshold (WEST) method

        pXAux and pTrainXAux are the auxiliary matrix inputs. They should have the same number of rows as pX and
        pTrainX respectively, but they can have a custom number of columns (equal for both though). At the moment,
        these can only be used with a precomputed kernel.

        For every token, the auxiliary input will be concatenated to the end of its embedding vector.
        '''
        if pXAux is None:
            pXAux = np.empty(shape = (0, 0, 0))
            pTrainXAux = np.empty(shape = (0, 0, 0))

        vLib = ctypes.cdll.LoadLibrary(os.path.join(os.path.dirname(os.path.realpath(__file__)), "agk.so"))

        vLib.compKernelMatrixWEST.argtypes = [np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              ctypes.c_int,
                                              ctypes.c_int,
                                              np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              ctypes.c_int,
                                              ctypes.c_int,
                                              np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              ctypes.c_int,
                                              np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              ctypes.c_int,
                                              ctypes.c_int,
                                              ctypes.c_double,
                                              ctypes.c_int,
                                              ctypes.c_int]
        vLib.compKernelMatrixWEST.restype = np.ctypeslib.ndpointer(dtype=np.float64,
                                                                   shape=(pX.shape[0], pTrainX.shape[0]),
                                                                   flags="C_CONTIGUOUS")

        # phase 0: training (the kernel is computed between the training set and itself)
        if id(pX) == id(pTrainX):
            vPhase = 0
        else:
            vPhase = 1

        vK = vLib.compKernelMatrixWEST(pX.flatten(), pX.shape[0], pX.shape[1],
                                       pTrainX.flatten(), pTrainX.shape[0], pTrainX.shape[1],
                                       pXAux.flatten(), pTrainXAux.flatten(), pXAux.shape[2],
                                       self.embeddings.flatten(), self.embeddings.shape[0], self.embeddings.shape[1],
                                       ctypes.c_double(self._westThreshold),
                                       vPhase,
                                       self._normalizeKernel)

        return vK
    def _computeKernelWESS(self, pX, pTrainX, pXAux = None, pTrainXAux = None):
        '''
        Computes and returns the kernel using the word embedding similarity score (WESS) method

        pXAux and pTrainXAux are the auxiliary matrix inputs. They should have the same number of rows as pX and
        pTrainX respectively, but they can have a custom number of columns (equal for both though). At the moment,
        these can only be used with a precomputed kernel.

        For every token, the auxiliary input will be concatenated to the end of its embedding vector.
        '''
        if pXAux is None:
            pXAux = np.empty(shape = (0, 0, 0))
            pTrainXAux = np.empty(shape = (0, 0, 0))

        vLib = ctypes.cdll.LoadLibrary(os.path.join(os.path.dirname(os.path.realpath(__file__)), "agk.so"))

        vLib.compKernelMatrixWESS.argtypes = [np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              ctypes.c_int,
                                              ctypes.c_int,
                                              np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              ctypes.c_int,
                                              ctypes.c_int,
                                              np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              ctypes.c_int,
                                              np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C_CONTIGUOUS"),
                                              ctypes.c_int,
                                              ctypes.c_int,
                                              ctypes.c_int,
                                              ctypes.c_int]
        vLib.compKernelMatrixWESS.restype = np.ctypeslib.ndpointer(dtype=np.float64,
                                                                   shape=(pX.shape[0], pTrainX.shape[0]),
                                                                   flags="C_CONTIGUOUS")

        # phase 0: training (the kernel is computed between the training set and itself)
        if id(pX) == id(pTrainX):
            vPhase = 0
        else:
            vPhase = 1

        vK = vLib.compKernelMatrixWESS(pX.flatten(), pX.shape[0], pX.shape[1],
                                       pTrainX.flatten(), pTrainX.shape[0], pTrainX.shape[1],
                                       pXAux.flatten(), pTrainXAux.flatten(), pXAux.shape[2],
                                       self.embeddings.flatten(), self.embeddings.shape[0], self.embeddings.shape[1],
                                       vPhase,
                                       self._normalizeKernel)

        return vK

    def loadVocabulary(self, pdVocabulary):
        '''
        Loads a vocabulary to be used for formatting the input data to the learning algorithm

        The vocabulary should be a dictionary of tokens and their indices.
        '''
        self.vocabulary = dict(pdVocabulary)

    def loadEmbeddings(self, pWEFilename, pIsLowerCase):
        '''
        Loads word embeddings from the given file

        Word embeddings should be loaded before formatting the data, because the vocabulary used for formatting
        should be extracted from the word embeddings (when the word embeddings are used).

        pIsLowerCase specifies whether the vocabulary of the word embeddings is lowercased.
        '''
        # loading embeddings
        vWE = we.WordEmbedding()
        vWE.loadEmbeddings(pWVFilename = pWEFilename, pIsLowerCase = pIsLowerCase)

        # generating the vocabulary from the embeddings' vocabulary: 0 needs to be reserved for padding
        self.vocabulary = {k: v + 1 for k, v in vWE.wordIDs.items()}

        # adding the zero vector (for the padding index) at the beginning
        self.embeddings = np.concatenate((np.zeros((1, vWE.dimension)), vWE.embeddings))

    def _generateVocabulary(self, pllCorpus):
        '''
        Extracts the vocabulary from the input corpus and stores it in a dictionary of tokens and their indices

        Token frequencies are used as the order in generating the indices. 0 is reserved for padding the input
        when formatting it for the learning algorithm, so the starting index is 1.
        '''
        # 0 is reserved for padding
        vStartingIdx = 1

        # flattening the corpus
        vlTokens = [t for s in pllCorpus for t in s]

        # computing token frequencies
        vdTokenFreqs = Counter(vlTokens)

        # sorting tokens by frequency
        vlSortedTokens = [t for t, f in sorted(vdTokenFreqs.items(), key = lambda x: x[1], reverse = True)]

        # creating the token-index map
        self.vocabulary = {}

        # filling the vocabulary
        for idx, vToken in enumerate(vlSortedTokens, start = vStartingIdx):
            self.vocabulary[vToken] = idx

    def formatData(self, pCorpus, pflgLowercase = False):
        '''
        Converts the format of the input corpus from a list of texts into an array of token indices to be used
        as input to the learning algorithm

        It needs a vocabulary mapping tokens to indices for the conversion. If a vocabulary is already loaded,
        it will be used. Otherwise, a vocabulary will be generated from the corpus.

        Use this function before feeding the input to the learning algorithm.
        For example, with scikit SVC, given X as a list of tokenized sentences (e.g. ["This is sentence 1 .",
        "This is sentence 2 ."]), and Y the labels:

            any_gram = AnyGram("sm", 100)
            X = any_gram.formatData(X)

            clf = svm.SVC(kernel = any_gram)
            clf.fit(X, Y)

        If word embedding vectors are used, they should be loaded before formatting the data, so that the
        vocabulary is created based on the word indices of the word embeddings. Otherwise, a vocabulary will be
        created by this method, and when loading the embeddings afterwards, the indices will not match those of
        the embeddings.

        If a token is not in the vocabulary, it will be appended to the end of it. This can only happen when
        word embeddings are used, as otherwise the vocabulary is generated from the corpus itself.

        Since the output is a 2D array, the sentences are cut off or padded to self.maxTxtLen tokens. 0 is used
        as the padding index.
        '''
        # splitting the input sentences into tokens (NOTE: the input is supposed to be already tokenized, so
        # splitting on whitespace is enough)
        if pflgLowercase:
            vllCorpus = [[t.lower() for t in s.split()] for s in pCorpus]
        else:
            vllCorpus = [s.split() for s in pCorpus]

        if self.vocabulary is None:
            self._generateVocabulary(vllCorpus)

        vaOutput = np.zeros((len(pCorpus), self.maxTxtLen), dtype=np.int32)

        for i in range(vaOutput.shape[0]):
            for j in range(min(len(vllCorpus[i]), self.maxTxtLen)):
                if vllCorpus[i][j] not in self.vocabulary:
                    # appending the unseen token to the end of the vocabulary
                    self.vocabulary[vllCorpus[i][j]] = self.vocabSize + 1
                vaOutput[i][j] = self.vocabulary[vllCorpus[i][j]]

        return vaOutput
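
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): trains a scikit-learn SVC with the
# any-gram kernel passed as a callable, using the string match (sm) method.
# The corpus and labels below are made up, and running this requires both
# scikit-learn and the compiled agk.so shipped with this module.
# ---------------------------------------------------------------------------

def _demoStringMatch():
    '''
    A minimal sketch of the plain workflow, where the vocabulary is generated from the corpus itself
    '''
    from sklearn import svm

    vlCorpus = ["this is a sample sentence .",
                "this is another sample sentence .",
                "a completely different example !",
                "yet another completely different example !"]
    vlLabels = [0, 0, 1, 1]

    vAnyGram = AnyGram(pMethod = "sm", pMaxTxtLen = 10)

    # scikit-learn casts the input to float64 before calling the kernel, which is what the C
    # library expects; the explicit cast keeps direct calls to computeKernel consistent with that
    vX = vAnyGram.formatData(vlCorpus).astype(np.float64)

    vClf = svm.SVC(kernel = vAnyGram)
    vClf.fit(vX, vlLabels)
    print(vClf.predict(vX))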
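
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only) for the embedding-based methods: the
# embeddings must be loaded BEFORE formatData() so that the token indices
# match the rows of the embedding matrix. The embeddings file name below is
# a placeholder, not a file shipped with this module.
# ---------------------------------------------------------------------------

def _demoWEST(pWEFilename):
    '''
    A minimal sketch of the embedding workflow with the WEST method
    '''
    from sklearn import svm

    vlCorpus = ["this is a sample sentence .",
                "a completely different example !"]
    vlLabels = [0, 1]

    vAnyGram = AnyGram(pMethod = "west", pMaxTxtLen = 10)
    vAnyGram.setWESTThreshold(0.8)

    # loading the embeddings first sets the vocabulary from the embedding words
    vAnyGram.loadEmbeddings(pWEFilename, pIsLowerCase = True)
    vX = vAnyGram.formatData(vlCorpus, pflgLowercase = True).astype(np.float64)

    vClf = svm.SVC(kernel = vAnyGram)
    vClf.fit(vX, vlLabels)


if __name__ == "__main__":
    _demoStringMatch()
    # _demoWEST("embeddings.txt")   # placeholder: point this at a real word embedding file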