Commit fdb5107f authored by Saswat

Initial commit

import numpy as np
import nltk
# nltk.download("universal_tagset")
# nltk.download('brown')
from nltk.corpus import brown
import warnings
warnings.filterwarnings("ignore")
def getData():
    """Return the Brown corpus as parallel lists of word sequences and universal POS tag sequences."""
    X, Y = [], []
    brown_corpus = brown.tagged_sents(tagset='universal')
    for sentence in brown_corpus:
        X_sen = []
        Y_sen = []
        for entity in sentence:
            X_sen.append(entity[0])  # word
            Y_sen.append(entity[1])  # universal POS tag
        X.append(X_sen)
        Y.append(Y_sen)
    return X, Y
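A quick sanity-check sketch for getData (it assumes the Brown corpus and universal tagset were fetched once via the nltk.download calls commented out above; the variable names are illustrative):

# Illustrative usage only: run nltk.download('brown') and
# nltk.download('universal_tagset') once before calling this.
from data_process import getData

words, tags = getData()
print(len(words), "tagged sentences")    # one entry per Brown sentence
print(words[0][:5], tags[0][:5])         # parallel word / tag sequences
assert len(words[0]) == len(tags[0])     # words and tags stay aligned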
File added
{
"cells": [
{
"cell_type": "code",
"execution_count": 6,
"id": "d8fe16be",
"metadata": {},
"outputs": [],
"source": [
"from data_process import getData\n",
"import nltk \n",
"import numpy as np\n",
"import pandas as pd\n",
"import random\n",
"import os\n",
"from utils import seed_all\n",
"import torch\n",
"import torch.nn as nn\n",
"from torchtext.data.utils import get_tokenizer\n",
"import gensim.downloader as api\n",
"from gensim.models import KeyedVectors\n",
"from keras.preprocessing.text import Tokenizer\n",
"from tensorflow.keras.preprocessing.sequence import pad_sequences\n",
"from keras.utils.np_utils import to_categorical\n",
"from sklearn.model_selection import train_test_split\n",
"from model import *\n",
"from tqdm import tqdm\n",
"import torch.optim as optim \n",
"import sys\n",
"from data_process import *\n",
"import pickle\n",
"from nltk.tokenize import word_tokenize\n",
"import torch.nn.functional as F"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "8e37383f",
"metadata": {},
"outputs": [],
"source": [
"\n",
"MAX_SEQ_LENGTH = 200 # greater than 100 in length will be truncated\n",
"loss_function = nn.CrossEntropyLoss()\n",
"HIDDEN_DIM = 64\n",
"\n",
"MODEL_PATH = \"model1.bin\"\n",
"EMBEDDING_SIZE = 100\n",
"device = torch.device(\"cpu\")\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "2ff0da4a",
"metadata": {},
"outputs": [],
"source": [
"class Inference():\n",
" def __init__(self, model, word2vec, word_tokenizer, pos_tokenizer):\n",
" self.model = model\n",
" self.word2vec = word2vec\n",
" self.word2vec_dim = len(self.word2vec[\"man\"])\n",
" self.word_tokenizer = word_tokenizer\n",
" self.pos_tokenizer = pos_tokenizer\n",
" pass\n",
"\n",
" def get_word_embeddings(self, word):\n",
" if word in self.word2vec:\n",
" return torch.tensor(self.word2vec[word])\n",
" else:\n",
" print(\"Coudn't find embeddings for: \", word)\n",
" return torch.empty(self.word2vec_dim).normal_(mean=0, std=1)\n",
"\n",
" def rulebased_unkword_tag(self, word):\n",
" \"\"\"\n",
" Tag a word using its various lexical characteristic.\n",
" This is used a last resort when word embedding for a word cannot be found.\n",
" \"\"\"\n",
" exp_tag = \"NOUN\"\n",
" if (word.endswith(\"able\")):\n",
" exp_tag = \"ADJ\"\n",
" elif (word.endswith(\"ly\")):\n",
" exp_tag = \"ADV\"\n",
" if (word.endswith(\"ing\") or word.endswith(\"ed\")):\n",
" exp_tag = \"VERB\"\n",
" elif all(i.isdigit() for i in word):\n",
" exp_tag = \"NUM\"\n",
" return exp_tag\n",
"\n",
" def preprocess_sentence(self, sentence):\n",
" sentence = sentence.lower()\n",
" sentence = word_tokenize(sentence)\n",
" return sentence\n",
"\n",
" def get_tags(self, sentence):\n",
" sentence = self.preprocess_sentence(sentence)\n",
" tag_output = []\n",
" #print(sentence)\n",
" encoder_input = torch.zeros(1, len(sentence), EMBEDDING_SIZE)\n",
" for i in range(len(sentence)):\n",
" emb = self.get_word_embeddings(sentence[i])\n",
" encoder_input[0, i, :] = torch.tensor(emb)\n",
" \n",
" lstm_out, _ = self.model.lstm(encoder_input.view(len(sentence), 1, -1))\n",
" tag_space = self.model.hidden2tag(lstm_out.view(len(sentence), -1))\n",
" tag_scores = F.log_softmax(tag_space, dim=1)\n",
" pos_tags = torch.argmax(tag_scores, dim = -1)\n",
" #print(pos_tags)\n",
" output = [self.pos_tokenizer.index_word[x.item()] for x in pos_tags]\n",
" return output\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "04ac9d76",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Reading embedding matrix\n",
"Embeddings shape: (49816, 100)\n",
"Loading dicts....\n",
"Loading word2vec...\n",
"Word2vec Loaded.\n"
]
}
],
"source": [
"print(\"Reading embedding matrix\")\n",
"f = open(\"embddings.npy\", \"rb\")\n",
"embedding_weights = np.load(f)\n",
"\n",
"print(\"Embeddings shape: {}\".format(embedding_weights.shape))\n",
"\n",
"print(f\"Loading dicts....\")\n",
"word_tokenizer = None\n",
"tag_tokenizer = None\n",
"with open('word_tokenizer.pickle', 'rb') as handle:\n",
" word_tokenizer = pickle.load(handle)\n",
"\n",
"with open('pos_tokenizer.pickle', 'rb') as handle:\n",
" tag_tokenizer = pickle.load(handle)\n",
"\n",
"VOCABULARY_SIZE = len(word_tokenizer.word_index)\n",
"\n",
"print(f\"Loading word2vec...\")\n",
"word2vec = api.load(\"glove-wiki-gigaword-100\")\n",
"print(f\"Word2vec Loaded.\")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "caa6cc2f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Number of classes: 13\n",
"Loading Models....\n"
]
},
{
"data": {
"text/plain": [
"<All keys matched successfully>"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"NUM_CLASSES = 13\n",
"print(f\"Number of classes: {NUM_CLASSES}\")\n",
"embedding_weights = torch.tensor(embedding_weights, dtype=torch.float32)\n",
"model = PosTaggerBi(EMBEDDING_SIZE, HIDDEN_DIM, VOCABULARY_SIZE, NUM_CLASSES, embedding_weights)\n",
"print(f\"Loading Models....\")\n",
"model.load_state_dict(torch.load(MODEL_PATH,map_location=torch.device('cpu')))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "392ef454",
"metadata": {},
"outputs": [],
"source": [
"inf = Inference(model, word2vec, word_tokenizer, tag_tokenizer)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1f1ecc4c",
"metadata": {},
"outputs": [],
"source": [
"\n",
"INPUT_SENTENCE = \"No more skipping classes.\"\n",
"\n",
"\n",
"\n",
"output = inf.get_tags(INPUT_SENTENCE)\n",
"token_inp = inf.preprocess_sentence(INPUT_SENTENCE)\n",
"#print(tag_tokenizer.word_index)\n",
"for x,y in zip(token_inp, output):\n",
" print(x,\"\\t: \",y)\n",
"#print(token_inp)\n",
"#print(output)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "967e4f34",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
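Note that rulebased_unkword_tag is defined in the notebook but never called from get_tags, so out-of-vocabulary words are tagged from a random embedding. Below is a minimal, hypothetical sketch of how the fallback could be wired in; the method name get_tags_with_fallback and the overrides dict are illustrative and not part of the committed code:

# Hypothetical variant of Inference.get_tags: remember which positions were
# out-of-vocabulary and let the suffix heuristic override the LSTM prediction there.
def get_tags_with_fallback(self, sentence):
    tokens = self.preprocess_sentence(sentence)
    encoder_input = torch.zeros(1, len(tokens), EMBEDDING_SIZE)
    overrides = {}  # position -> rule-based tag for OOV tokens
    for i, tok in enumerate(tokens):
        if tok in self.word2vec:
            encoder_input[0, i, :] = torch.tensor(self.word2vec[tok])
        else:
            encoder_input[0, i, :] = torch.empty(EMBEDDING_SIZE).normal_(mean=0, std=1)
            overrides[i] = self.rulebased_unkword_tag(tok)

    lstm_out, _ = self.model.lstm(encoder_input.view(len(tokens), 1, -1))
    tag_space = self.model.hidden2tag(lstm_out.view(len(tokens), -1))
    preds = torch.argmax(F.log_softmax(tag_space, dim=1), dim=-1)
    tags = [self.pos_tokenizer.index_word[p.item()] for p in preds]
    for i, tag in overrides.items():
        tags[i] = tag.lower()  # keep casing consistent with the tokenizer's tags
    return tags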
from data_process import getData
import nltk
import numpy as np
import pandas as pd
import random
import os
from utils import seed_all
import pickle
import torch
import torch.nn as nn
from torchtext.data.utils import get_tokenizer
import gensim.downloader as api
from gensim.models import KeyedVectors
from keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from keras.utils.np_utils import to_categorical
from sklearn.model_selection import train_test_split
from model import *
from tqdm import tqdm
import torch.optim as optim
import sys
from sklearn import metrics
from sklearn.metrics import fbeta_score
MAX_SEQ_LENGTH = 100  # sentences longer than 100 tokens are truncated
loss_function = nn.CrossEntropyLoss()
HIDDEN_DIM = 64
#seed_all()
def evalFunction(model, X, Y, device):
    """Evaluate the tagger on (X, Y); padded positions (token id 0) are excluded from accuracy."""
    model.to(device)
    model.eval()
    cpu_target = None
    cpu_output = None
    eval_loss = 0
    for sentence, tags in tqdm(zip(X, Y), total=len(X)):
        with torch.no_grad():
            # Everything from the first padding token onwards is masked out
            # when accumulating predictions and targets for the accuracy count.
            mask_index = np.where(sentence == 0)[0]
            mask_start_index = mask_index[0] if len(mask_index) > 0 else len(sentence)
            if cpu_target is None:
                cpu_target = np.array(tags[:mask_start_index])
            else:
                cpu_target = np.append(cpu_target, np.array(tags[:mask_start_index]), axis=0)
            sentence = sentence.to(device, dtype=torch.long)
            tags = tags.to(device, dtype=torch.float32)
            tag_scores = model(sentence)
            eval_loss += loss_function(tag_scores, tags)
            if cpu_output is None:
                cpu_output = tag_scores[:mask_start_index].cpu().numpy()
            else:
                cpu_output = np.append(cpu_output, tag_scores[:mask_start_index].cpu().numpy(), axis=0)
    cor = sum(np.argmax(cpu_output, axis=1) == np.argmax(cpu_target, axis=1))
    tot = len(cpu_output)
    acc = cor / tot
    print(f"Evaluation -> loss:{eval_loss}, accuracy:{acc}")
    return eval_loss, acc
def trainer(model, X, Y, X_val, Y_val, epochs, device, optimizer):
    """Train sentence by sentence, validating and checkpointing after every epoch."""
    model.to(device)
    val_loss = []
    best_accuracy = 0
    for epoch in tqdm(range(epochs)):
        model.train()
        epoch_loss = 0
        for sentence, tags in tqdm(zip(X, Y), total=len(X)):
            sentence = sentence.to(device, dtype=torch.long)
            tags = tags.to(device, dtype=torch.float32)
            model.zero_grad()
            tag_scores = model(sentence)
            loss = loss_function(tag_scores, tags)
            epoch_loss += loss.item()
            loss.backward()
            optimizer.step()
        # Validate once per epoch and keep the best checkpoint by accuracy.
        eval_loss, accuracy = evalFunction(model, X_val, Y_val, device)
        val_loss.append(eval_loss)
        if accuracy > best_accuracy:
            print("Saving model")
            torch.save(model.state_dict(), "model1.bin")
            best_accuracy = accuracy
        print(f"Epoch {epoch+1} Loss-> ", epoch_loss)
if __name__ == "__main__":
words, pos_tags = getData()
num_vocab_words = len(set(word.lower() for sentence in words for word in sentence))
num_vocab_tags = len(set(tag.lower() for sentence in pos_tags for tag in sentence))
print("Total number of tagged sentences: {}".format(len(words)))
print("Vocabulary size: {}".format(num_vocab_words))
print("Total number of tags: {}".format(num_vocab_tags))
print('sample X: ', words[0], '\n')
print('sample Y: ', pos_tags[0], '\n')
# Tokenize input sentence
word_tokenizer = Tokenizer()
word_tokenizer.fit_on_texts(words)
words_encoded = word_tokenizer.texts_to_sequences(words)
# Tokenize pos tags
tag_tokenizer = Tokenizer()
tag_tokenizer.fit_on_texts(pos_tags)
pos_tags_encoded = tag_tokenizer.texts_to_sequences(pos_tags)
with open('word_tokenizer.pickle', 'wb') as handle:
pickle.dump(word_tokenizer, handle)
with open('pos_tokenizer.pickle', 'wb') as handle:
pickle.dump(tag_tokenizer, handle)
print("** Raw data point **", "\n", "-"*100, "\n")
print('X: ', words[0], '\n')
print('Y: ', pos_tags[0], '\n')
print()
print("** Encoded data point **", "\n", "-"*100, "\n")
print('X: ', words_encoded[0], '\n')
print('Y: ', pos_tags_encoded[0], '\n')
lens = [len(seq) for seq in words_encoded]
print("Length of longest sentence: {}".format(max(lens)))
# truncate amd pad sentence to equal length
words_padded = pad_sequences(words_encoded, maxlen=MAX_SEQ_LENGTH, padding="post", truncating="post")
pos_tags_padded = pad_sequences(pos_tags_encoded, maxlen=MAX_SEQ_LENGTH, padding="post", truncating="post")
print(words_padded[0], "\n"*3)
print(pos_tags_padded[0])
X, Y = words_padded, pos_tags_padded
# load word2vec from the gensim library
word2vec = api.load("glove-wiki-gigaword-100")
# result = word2vec.most_similar(positive = ["king", "woman"], negative = ["man"])
# print(result)
# print()
EMBEDDING_SIZE = len(word2vec['man']) # here 100
VOCABULARY_SIZE = len(word_tokenizer.word_index) + 1
embedding_weights = np.zeros((VOCABULARY_SIZE, EMBEDDING_SIZE))
word2id = word_tokenizer.word_index
# copy vectors from word2vec model to the words present in corpus
for word, index in word2id.items():
try:
embedding_weights[index, :] = word2vec[word]
except Exception as e:
pass
print("Embeddings shape: {}".format(embedding_weights.shape))
#open("embeddings.npy")
np.save("embddings.npy", embedding_weights)
#embedding_weights = np.load(f)
Y = to_categorical(Y)
print(Y.shape)
NUM_CLASSES = Y.shape[2]
# Data conversion for torch
embedding_weights = torch.tensor(embedding_weights, dtype=torch.float32)
X = torch.tensor(X, dtype=torch.long)
Y = torch.tensor(Y, dtype=torch.float32)
# Split data into train, test, val
TEST_SIZE = 0.15
VALID_SIZE = 0.15
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=TEST_SIZE, random_state=4)
X_train, X_validation, Y_train, Y_validation = train_test_split(X_train, Y_train, test_size=VALID_SIZE, random_state=4)
print("TRAINING DATA")
print('Shape of input sequences: {}'.format(X_train.shape))
print('Shape of output sequences: {}'.format(Y_train.shape))
print("-"*50)
print("VALIDATION DATA")
print('Shape of input sequences: {}'.format(X_validation.shape))
print('Shape of output sequences: {}'.format(Y_validation.shape))
print("-"*50)
print("TESTING DATA")
print('Shape of input sequences: {}'.format(X_test.shape))
print('Shape of output sequences: {}'.format(Y_test.shape))
device = torch.device('cpu')
embedding_weights = embedding_weights.to(device)
model = PosTaggerBi(EMBEDDING_SIZE, HIDDEN_DIM, VOCABULARY_SIZE, NUM_CLASSES, embedding_weights)
optimizer = optim.Adam(model.parameters(),lr=5e-5)
trainer(model, X_train, Y_train, X_validation, Y_validation, 20, device, optimizer=optimizer)
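The split above also produces X_test / Y_test, but the script never scores them. A possible follow-up, appended at the end of the __main__ block (it only reuses evalFunction and the checkpoint path already defined here):

    # Optional follow-up (not in the original script): reload the best
    # checkpoint and report held-out test loss/accuracy.
    model.load_state_dict(torch.load("model1.bin", map_location=device))
    test_loss, test_acc = evalFunction(model, X_test, Y_test, device)
    print("Test accuracy: {:.4f}".format(test_acc))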
import torch
import torch.nn as nn
import torch.nn.functional as F
class PosTagger(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, tag_classes, embeddings):
        super(PosTagger, self).__init__()
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding.from_pretrained(embeddings, freeze=True)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim)
        self.hidden2tag = nn.Linear(hidden_dim, tag_classes)

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores


class PosTaggerBi(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, tag_classes, embeddings):
        super(PosTaggerBi, self).__init__()
        self.hidden_dim = hidden_dim
        self.word_embeddings = nn.Embedding.from_pretrained(embeddings, freeze=True)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True)
        self.hidden2tag = nn.Linear(hidden_dim * 2, tag_classes)

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        lstm_out, _ = self.lstm(embeds.view(len(sentence), 1, -1))
        tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores
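A minimal shape check for PosTaggerBi; the vocabulary size and the random embedding matrix below are placeholder values for illustration, while 13 matches the tag-class count used elsewhere in this commit:

# Illustrative shape check only; sizes are made up except the 13 tag classes.
import torch
from model import PosTaggerBi

vocab_size, emb_dim, hidden_dim, num_classes = 1000, 100, 64, 13
dummy_embeddings = torch.randn(vocab_size, emb_dim)   # stands in for the GloVe matrix
tagger = PosTaggerBi(emb_dim, hidden_dim, vocab_size, num_classes, dummy_embeddings)

sentence_ids = torch.randint(1, vocab_size, (20,))    # one encoded sentence of 20 tokens
scores = tagger(sentence_ids)                         # per-token log-probabilities
print(scores.shape)                                   # torch.Size([20, 13])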
File added
import os
import random
import numpy as np
import torch

def seed_all(seed: int = 1004):
    """Seed Python, NumPy and torch RNGs for reproducible runs."""
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False  # benchmark mode is non-deterministic