#! /usr/bin/python
# -*- coding: utf-8 -*-

"""
Word embedding vectors

Author: Rasoul Kaljahi

See LICENSE file.
"""

import sys
import zipfile, collections

import numpy as np


class WordEmbedding:
    '''
    Class for managing word embeddings
    '''

    def __init__(self, pVerbosity = 1):
        '''
        Constructor
        '''

        # verbosity of processes
        self.verbosity = pVerbosity

        self.embeddings = None    # numpy array containing final word embeddings
        self.normalized = None    # whether the embeddings are normalized
        self.wordIDs = None       # vocab words and their IDs (indexes)
        self.IDWords = None       # vocab IDs and their words

        # training data attributes
        self.corpus = None        # original corpus as list of sentences
        self.wordCounts = None    # vocab words and their counts
        self.data = None          # the data: list of word IDs (not word forms) in their
                                  # original order in the corpus

        self._lowercase = False

        # unknown token embedding vector
        self._unknown = None

    @property
    def dimension(self):
        '''
        Returns the dimension of the loaded vectors
        '''

        if self.embeddings is None:
            return 0
        else:
            return len(self.embeddings[0])

    @property
    def vocabSize(self):
        '''
        Returns the vocabulary size of the embedding vectors
        '''

        if self.wordIDs is None:
            return 0
        else:
            return len(self.wordIDs)

    def loadData(self, pCorpusFilename, pVocabSize, pflgKeepOrgCorpus=False, pflgKeepVocabCounts=False):
        '''
        Loads data from an input corpus

        The original corpus can optionally be stored for later use. The default is to delete it after the
        required data has been loaded.
        '''

        # reading data
        if zipfile.is_zipfile(pCorpusFilename):
            with zipfile.ZipFile(pCorpusFilename) as f:
                vlCorpusLines = f.read(f.namelist()[0]).strip().split('\n')
        else:
            vlCorpusLines = open(pCorpusFilename).read().strip().split('\n')

        vlWords = [w for l in vlCorpusLines for w in l.split()]

        # loading data
        if pflgKeepVocabCounts:
            self.wordCounts = {"UNK": 0}    # initializing vocab words/counts with UNK; to be counted later
        self.wordIDs = {"UNK": 0}           # initializing vocab words/IDs with UNK
        self.IDWords = {0: "UNK"}           # initializing vocab IDs/words with UNK
        self.data = []

        # assigning word IDs in word frequency order (only the most common words, as specified by the vocab size)
        for i, (word, count) in enumerate(collections.Counter(vlWords).most_common(pVocabSize - 1), start=1):
            if pflgKeepVocabCounts:
                self.wordCounts[word] = count
            self.wordIDs[word] = i

        for word in vlWords:
            if word in self.wordIDs:
                id = self.wordIDs[word]
                self.IDWords[id] = word
            else:
                id = 0
                if pflgKeepVocabCounts:
                    self.wordCounts["UNK"] += 1

            self.data.append(id)

        if self.verbosity > 0:
            print("Data size:")
            print("\t%d (%dM) lines" % (len(vlCorpusLines), len(vlCorpusLines) / 1000000))
            print("\t%d (%dM) words\n" % (len(vlWords), len(vlWords) / 1000000))

        if self.verbosity > 1:
            if pflgKeepVocabCounts:
                print('Most common words:\n\t%s\n' % '\n\t'.join(["%s: %d" % (w, c) for w, c in sorted(self.wordCounts.iteritems(), key=lambda x: x[1], reverse=True)[:10]]))
            print('Sample word indexes:\n\t%s\n' % '\n\t'.join(["%s: %s" % (w, self.wordIDs[w]) for w in self.wordIDs][:10]))
            print('Sample index words:\n\t%s\n' % '\n\t'.join(["%s: %s" % (id, self.IDWords[id]) for id in self.IDWords][:10]))
            print('Sample data:\n\t%s\n\n\t%s\n' % (self.data[:53], ' '.join([self.IDWords[id] for id in self.data[:53]])))

        # keeping the original corpus only if asked, and deleting the word list
        if pflgKeepOrgCorpus:
            self.corpus = vlCorpusLines
        del vlCorpusLines
        del vlWords

    def getWordFreq(self, pWord):
        '''
        Returns the frequency of a given word in the vocabulary
        '''

        if self.wordCounts is None:
            print("Word counts are not stored or no data is loaded. Data should be loaded with the pflgKeepVocabCounts parameter set to True.")
        elif pWord in self.wordCounts:
            return self.wordCounts[pWord]
        else:
            print('Word "%s" not found' % pWord)

    def loadEmbeddings(self, pWVFilename, pIsLowerCase=False, plFilterVocab=None, pflgCaseSensitiveFilter=True, pflgNormalized=False, pUnknownToken=None):
        '''
        Loads pre-trained word embeddings from a file in the general format

        In the general format, each word is represented by a line which contains the word and the vector,
        all separated by space/tab.

        pIsLowerCase determines the case of the embeddings' vocabulary.

        Optionally, a vocabulary list can be provided to filter the vectors being loaded into memory. This can
        reduce the memory usage in scenarios where the list of words in use is known in advance. If the word
        vectors are lowercased, the filter word lookup will be case-insensitive. Otherwise, another argument
        determines the case-sensitivity of the lookup, the default value of which is True, i.e. case-sensitive.

        The loaded vectors may or may not already be normalized. For example, the Google News pre-trained
        vectors are not normalized. When the vectors are normalized, cosine similarity can be computed simply
        using the dot product, without the need for normalization by the product of the vector norms; for
        vectors which are already normalized, that normalization has no effect. This information is passed
        through pflgNormalized and is needed for computing similarities.
        '''

        self._lowercase = pIsLowerCase

        # filter vocabulary case
        if plFilterVocab is not None and (self._lowercase or not pflgCaseSensitiveFilter):
            vlFilterVocab = [w.lower() for w in plFilterVocab]
        else:
            vlFilterVocab = plFilterVocab

        self.embeddings = []
        self.wordIDs = {}
        self.IDWords = {}
        vWordID = 0

        for i, vLine in enumerate(open(pWVFilename), start=1):
            vlLSplit = vLine.split()

            # vector word case
            if not pflgCaseSensitiveFilter and not self._lowercase:
                vWord = vlLSplit[0].lower()
            else:
                vWord = vlLSplit[0]

            if vlFilterVocab is None or vWord in vlFilterVocab:
                self.embeddings.append(np.array([float(n) for n in vlLSplit[1:]]))
                self.wordIDs[vlLSplit[0]] = vWordID
                self.IDWords[vWordID] = vlLSplit[0]
                vWordID += 1

            if pUnknownToken is not None and vWord == pUnknownToken:
                self._unknown = np.array([float(n) for n in vlLSplit[1:]])

            if i % 1000 == 0:
                sys.stdout.write('.')
                sys.stdout.flush()

        self.embeddings = np.array(self.embeddings)

        self.normalized = pflgNormalized

        sys.stdout.write('\n')

    def loadGloVeVectors(self, pWVFilename, pIsLowerCase=False, plFilterVocab=None, pflgCaseSensitiveFilter=True, pflgNormalized=False, pUnknownToken=None):
        '''
        Loads word vectors from a GloVe word embedding file

        Both .zip and text files are accepted.

        See loadEmbeddings() for parameters.
        '''

        if pWVFilename[-3:].lower() == "zip":
            self._loadGloVeVectorsZip(pWVFilename, pIsLowerCase, plFilterVocab, pflgCaseSensitiveFilter, pflgNormalized, pUnknownToken)
        else:
            self.loadEmbeddings(pWVFilename, pIsLowerCase, plFilterVocab, pflgCaseSensitiveFilter, pflgNormalized, pUnknownToken)

    def _loadGloVeVectorsZip(self, pWVZipFilename, pIsLowerCase=False, plFilterVocab=None, pflgCaseSensitiveFilter=True, pflgNormalized=False, pUnknownToken=None):
        '''
        Loads word vectors from a GloVe zip file
        '''

        self._lowercase = pIsLowerCase

        vZipFile = zipfile.ZipFile(pWVZipFilename)

        # filter vocabulary case
        if plFilterVocab is not None and (self._lowercase or not pflgCaseSensitiveFilter):
            vlFilterVocab = [w.lower() for w in plFilterVocab]
        else:
            vlFilterVocab = plFilterVocab

        self.embeddings = []
        self.wordIDs = {}
        self.IDWords = {}
        vWordID = 0

        for i, vLine in enumerate(vZipFile.open(vZipFile.namelist()[0]), start=1):
            vlLSplit = vLine.split()

            # vector word case
            if not pflgCaseSensitiveFilter and not self._lowercase:
                vWord = vlLSplit[0].lower()
            else:
                vWord = vlLSplit[0]

            if vlFilterVocab is None or vWord in vlFilterVocab:
                self.embeddings.append(np.array([float(n) for n in vlLSplit[1:]]))
                self.wordIDs[vlLSplit[0]] = vWordID
                self.IDWords[vWordID] = vlLSplit[0]
                vWordID += 1

            if pUnknownToken is not None and vWord == pUnknownToken:
                self._unknown = np.array([float(n) for n in vlLSplit[1:]])

            if i % 1000 == 0:
                sys.stdout.write('.')
                sys.stdout.flush()

        self.embeddings = np.array(self.embeddings)

        self.normalized = pflgNormalized

        sys.stdout.write('\n')

    def loadW2VBinVectors(self, pWVBinFilename, pIsLowerCase=False, plFilterVocab=None, pflgCaseSensitiveFilter=True, pflgNormalized=False, pUnknownToken=None):
        '''
        Loads word vectors from a word2vec file in binary format

        The method uses gensim to load the vectors. Parameters are the same as in loadEmbeddings().

        pUnknownToken specifies the token in the input embedding vectors which represents unknown tokens
        (e.g. UNK), if there is one.
        '''

        import gensim

        self._lowercase = pIsLowerCase

        # filter vocabulary case
        if plFilterVocab is not None and (self._lowercase or not pflgCaseSensitiveFilter):
            vlFilterVocab = [w.lower() for w in plFilterVocab]
        else:
            vlFilterVocab = plFilterVocab

        # loading word vectors using gensim
        vGensimModel = gensim.models.Word2Vec.load_word2vec_format(pWVBinFilename, binary=True)

        self.embeddings = []
        self.wordIDs = {}
        self.IDWords = {}
        vWordID = 0

        if vlFilterVocab is not None:
            for vWord in vlFilterVocab:
                try:
                    self.embeddings.append(vGensimModel[vWord])
                    self.wordIDs[vWord] = vWordID
                    self.IDWords[vWordID] = vWord
                    vWordID += 1
                except KeyError:
                    continue

                sys.stdout.write('.')
                sys.stdout.flush()
        else:
            sys.stdout.write('Reading the entire vocabulary...')
            sys.stdout.flush()

            for vWord in vGensimModel.vocab:
                self.embeddings.append(vGensimModel[vWord])
                self.wordIDs[vWord] = vWordID
                self.IDWords[vWordID] = vWord
                vWordID += 1

            sys.stdout.write(' done.')
            sys.stdout.flush()

        self.embeddings = np.array(self.embeddings)

        if pUnknownToken is not None:
            self._unknown = vGensimModel[pUnknownToken]

        self.normalized = pflgNormalized

        sys.stdout.write('\n')

    def loadW2VTxtVectors(self, pWVTxtFilename, pIsLowerCase=False, plFilterVocab=None, pflgCaseSensitiveFilter=True, pflgNormalized=False, pUnknownToken=None):
        '''
        Loads word vectors from a word2vec file in text format
        '''

        self._lowercase = pIsLowerCase

        # filter vocabulary case
        if plFilterVocab is not None and (self._lowercase or not pflgCaseSensitiveFilter):
            vlFilterVocab = [w.lower() for w in plFilterVocab]
        else:
            vlFilterVocab = plFilterVocab

        vfWV = open(pWVTxtFilename)

        # skipping the header
        vfWV.readline()

        # loading word vectors
        self.embeddings = []
        self.wordIDs = {}
        self.IDWords = {}
        vWordID = 0

        for i, vLine in enumerate(vfWV, start=1):
            vlLSplit = vLine.split()

            # vector word case
            if not pflgCaseSensitiveFilter and not self._lowercase:
                vWord = vlLSplit[0].lower()
            else:
                vWord = vlLSplit[0]

            if vlFilterVocab is None or vWord in vlFilterVocab:
                self.embeddings.append(np.array([float(n) for n in vlLSplit[1:]]))
                self.wordIDs[vlLSplit[0]] = vWordID
                self.IDWords[vWordID] = vlLSplit[0]
                vWordID += 1

            if pUnknownToken is not None and vWord == pUnknownToken:
                self._unknown = np.array([float(n) for n in vlLSplit[1:]])

            if i % 1000 == 0:
                sys.stdout.write('.')
                sys.stdout.flush()

        self.embeddings = np.array(self.embeddings)

        self.normalized = pflgNormalized

        sys.stdout.write('\n')

    def normalizeEmbeddings(self):
        '''
        Normalizes embeddings
        '''

        self.embeddings = self.embeddings / np.sqrt(np.sum(np.square(self.embeddings), axis=1, keepdims=True))

        self.normalized = True

    def getVector(self, pWord, pUnknown="empty"):
        '''
        Returns the embedding vector of the given word if it exists, and None otherwise

        pUnknown specifies what should be returned in case the given word is not found.
        It can be one of the following:
            - empty: an empty list is returned
            - zero: an array of zeros is returned
            - unknown: the vector for the unknown word is returned (see self._unknown)
        '''

        try:
            if self._lowercase:
                return self.embeddings[self.wordIDs[pWord.lower()]]
            else:
                return self.embeddings[self.wordIDs[pWord]]
        except KeyError:
            if pUnknown.lower() == "empty":
                return []
            elif pUnknown.lower() == "zero":
                return np.zeros(self.dimension)
            elif pUnknown.lower() == "unknown":
                return self.unknown

    def getWordEmbedding(self, pWord):
        '''
        Returns the embedding vector of the given word if it exists, and None otherwise
        '''

        return self.getVector(pWord)

    @property
    def unknown(self):
        '''
        Returns the embedding vector of unknown words
        '''

        return self._unknown

    def getAvgVector(self, plWords):
        '''
        Returns the average of the vectors of the given words

        Unknown words are ignored and not included in the averaging. If all words are unknown, a vector of
        zeros is returned.
        '''

        vlSum = np.zeros(self.dimension)
        vKnownWordCount = 0

        for vWord in plWords:
            try:
                if self._lowercase:
                    vlSum = np.add(vlSum, self.embeddings[self.wordIDs[vWord.lower()]])
                else:
                    vlSum = np.add(vlSum, self.embeddings[self.wordIDs[vWord]])
                vKnownWordCount += 1
            except KeyError:
                continue

        if vKnownWordCount > 0:
            return vlSum / vKnownWordCount
        else:
            return vlSum

    def calcSimMatrix(self, plWords=None, pSimMeasure="cosine", pflgReturn=False):
        '''
        Calculates the similarities between every pair of words from the given list of words or the entire
        vocabulary, using the specified similarity measure

        It prints the results but can also optionally return them in a matrix, which is implemented using a
        dictionary:

            {"word1": {"word2": 0.02, "word3": 0.91},
             "word2": {"word1": 0.02, "word3": 0.59},
             "word3": {"word1": 0.91, "word2": 0.59}}

        For access efficiency, duplicates are allowed, but there are no entries for the similarity of a word
        with itself.

        If no word list is given, the entire vocabulary of the loaded vectors is used.
        '''

        ########## change to matrix operations for efficiency

        if plWords is None:
            vlWords = self.wordIDs
        else:
            vlWords = list(set(plWords))

        vdSimMatrix = {}

        for vWord1 in vlWords:
            vdSimMatrix[vWord1] = {}
            for vWord2 in vlWords:
                if vWord1 != vWord2:
                    if pSimMeasure.lower().startswith("cos"):
                        vSim = self.calcCosSim(vWord1, vWord2)
                    elif pSimMeasure.lower().startswith("euc"):
                        vSim = self.calcEuclideanSim(vWord1, vWord2)
                    else:
                        raise Exception("Unknown similarity measure: %s" % pSimMeasure)

                    print("%s\t%s\t%s" % (vWord1, vWord2, vSim))

                    if pflgReturn:
                        vdSimMatrix[vWord1][vWord2] = vSim

        if pflgReturn:
            return vdSimMatrix

    def extractSimilarWords(self, pWord, pSimWordNum):
        '''
        Extracts and returns words similar to a given word

        It returns pSimWordNum similar words, and only if the given word exists in the vocabulary.
        '''

        vaWV = self.getVector(pWord)

        if len(vaWV) == 0:
            print("Word %s not found" % pWord)
        else:
            if not self.normalized:
                print("Embedding vectors are not normalized. Use normalizeEmbeddings() to normalize them first.")
                return

            vaSimilarities = np.dot(vaWV, np.transpose(self.embeddings))

            vlNearestIDs = (-vaSimilarities[:]).argsort()[1: pSimWordNum + 1]

            return [(self.IDWords[id], vaSimilarities[id]) for id in vlNearestIDs]

    def calcCosSim(self, pWord1, pWord2):
        '''
        Calculates and returns the cosine similarity of the given words
        '''

        vlWV1 = self.getVector(pWord1)
        vlWV2 = self.getVector(pWord2)

        if len(vlWV1) == 0:
            raise Exception("Word %s not found" % pWord1)
        if len(vlWV2) == 0:
            raise Exception("Word %s not found" % pWord2)

        if self.normalized:
            return np.dot(vlWV1, vlWV2)
        else:
            return np.dot(vlWV1, vlWV2) / (np.linalg.norm(vlWV1) * np.linalg.norm(vlWV2))

    def calcEuclideanSim(self, pWord1, pWord2):
        '''
        Calculates and returns the Euclidean similarity of the given words

        Euclidean similarity is a function of Euclidean distance, calculated as:

            ES = 1 / (1 + ED)
        '''

        return 1.0 / (1 + self.calcEuclideanDist(pWord1, pWord2))

    def calcEuclideanDist(self, pWord1, pWord2):
        '''
        Calculates and returns the Euclidean distance of the given words
        '''

        vlWV1 = self.getVector(pWord1)
        vlWV2 = self.getVector(pWord2)

        if len(vlWV1) == 0:
            raise Exception("Word %s not found" % pWord1)
        if len(vlWV2) == 0:
            raise Exception("Word %s not found" % pWord2)

        return np.linalg.norm(vlWV1 - vlWV2)

    def scaleEmbeddings(self, pRange):
        '''
        Scales embedding values (vector elements) into the given range
        '''

        vMin = np.min(self.embeddings)
        vMax = np.max(self.embeddings)

        self.embeddings = pRange[0] + (self.embeddings - vMin) * (pRange[1] - pRange[0]) * 1.0 / (vMax - vMin)
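

# The block below is a minimal usage sketch, not part of the original module. The file name
# "glove.6B.100d.txt" and the filter vocabulary are hypothetical placeholders; substitute the
# path to your own pre-trained GloVe (or general-format) vectors.
if __name__ == '__main__':
    we = WordEmbedding(pVerbosity=1)

    # load pre-trained vectors (hypothetical path), keeping only a small filter vocabulary in memory
    we.loadGloVeVectors("glove.6B.100d.txt",
                        pIsLowerCase=True,
                        plFilterVocab=["king", "queen", "man", "woman"],
                        pflgNormalized=False)

    # nearest-neighbour extraction requires normalized vectors
    we.normalizeEmbeddings()

    print("Dimension: %d, vocabulary size: %d" % (we.dimension, we.vocabSize))
    print("cosine(king, queen) = %s" % we.calcCosSim("king", "queen"))
    print("5 words most similar to 'king': %s" % we.extractSimilarWords("king", 5))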