Commit 05ebb3df authored by AXEL JAMES's avatar AXEL JAMES

adds key copying

parent c7f5444d
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include <math.h> #include <math.h>
#include "chord.h" #include "chord.h"
#include <pthread.h> #include <pthread.h>
#include "kvstore.h"
#define MAXLOCATIONS 16 #define MAXLOCATIONS 16
#define MAXIDVALUE 65536 #define MAXIDVALUE 65536
...@@ -126,16 +127,24 @@ void setSuccessorAndPredecessor(Node allNodes[],int size) { ...@@ -126,16 +127,24 @@ void setSuccessorAndPredecessor(Node allNodes[],int size) {
if(chord.node.nodeId==allNodes[i].nodeId) if(chord.node.nodeId==allNodes[i].nodeId)
break; break;
} }
Node oldPred = chord.predecessor;
if(i==0){ if(i==0){
chord.predecessor = allNodes[size-1]; chord.predecessor = allNodes[size-1];
}else{ }else{
chord.predecessor = allNodes[i-1]; chord.predecessor = allNodes[i-1];
} }
if(i == size -1) { if(i == size -1) {
chord.successor = allNodes[0]; chord.successor = allNodes[0];
}else { }else {
chord.successor = allNodes[i+1]; chord.successor = allNodes[i+1];
} }
if (chord.predecessor.nodeId!=oldPred.nodeId)
{
TransferKeys(chord.predecessor.nodeId, chord.predecessor.ip, chord.predecessor.port);
}
} }
void *periodicStabilize(void *arg) { void *periodicStabilize(void *arg) {
......
...@@ -35,3 +35,4 @@ void createNodeStruct(Node *givenNode,char ip[16],int port,int id); ...@@ -35,3 +35,4 @@ void createNodeStruct(Node *givenNode,char ip[16],int port,int id);
void stabilize(); void stabilize();
Node getSuccessor(); Node getSuccessor();
void sendUDP(char * msg, char ip[], int port, bool recvResponse, char * buffer); void sendUDP(char * msg, char ip[], int port, bool recvResponse, char * buffer);
int nodeToId(struct sockaddr_in *socketAddr)
\ No newline at end of file
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <sys/time.h> #include <sys/time.h>
#include <stdbool.h> #include <stdbool.h>
...@@ -66,6 +67,7 @@ int main(int argc, char* argv[]) ...@@ -66,6 +67,7 @@ int main(int argc, char* argv[])
printf("NumSets = %d",numSets); printf("NumSets = %d",numSets);
char* cacheptr = buildCache(numSets, setSize); char* cacheptr = buildCache(numSets, setSize);
initStore(); initStore();
int sock_fd, addrlen, msglen, newConnection, client_socket[MAX_CLIENTS], max_sd; int sock_fd, addrlen, msglen, newConnection, client_socket[MAX_CLIENTS], max_sd;
struct sockaddr_in socketAddr,joinServer,clientAddr; struct sockaddr_in socketAddr,joinServer,clientAddr;
puts("getting socket"); puts("getting socket");
...@@ -91,6 +93,17 @@ int main(int argc, char* argv[]) ...@@ -91,6 +93,17 @@ int main(int argc, char* argv[])
perror("bind failed"); perror("bind failed");
exit(1); exit(1);
} }
int id = nodeToId(&socketAddr);
sprintf(dir,"/%d/",id);
struct stat st = {0};
if (stat(dir, &st) == -1) {
mkdir(dir, 0700);
}
initChordServer(socketAddr); initChordServer(socketAddr);
// initChordStructure(&joinServer,socketAddr); // initChordStructure(&joinServer,socketAddr);
printf("Listening on port %d \n", PORT); printf("Listening on port %d \n", PORT);
......
...@@ -4,6 +4,9 @@ ...@@ -4,6 +4,9 @@
#include<pthread.h> #include<pthread.h>
#include<unistd.h> #include<unistd.h>
#include<stdbool.h> #include<stdbool.h>
#include "chord.h"
#include "parsexml.h"
#include "kvstore.h"
#define NUMFILES 10 #define NUMFILES 10
typedef struct Store typedef struct Store
...@@ -47,11 +50,13 @@ bool dumpToFile(char* key, char* val){ ...@@ -47,11 +50,13 @@ bool dumpToFile(char* key, char* val){
int setNo = getFileId(key); int setNo = getFileId(key);
char fileName[10]; char fileName[20];
char tempfileName[10]; char tempfileName[20];
bool keyExists = false; bool keyExists = false;
strcpy(tempfileName,"temp"); strcpy(tempfileName,dir);
sprintf(fileName,"%d",setNo); strcat(tempfileName,"temp");
// strcpy(fileName,dir);
sprintf(fileName,"%s%d",dir,setNo);
strcat(fileName,".csv"); strcat(fileName,".csv");
strcat(tempfileName,fileName); strcat(tempfileName,fileName);
...@@ -68,7 +73,6 @@ bool dumpToFile(char* key, char* val){ ...@@ -68,7 +73,6 @@ bool dumpToFile(char* key, char* val){
{ {
/* code */ /* code */
while(!feof(fp)){ while(!feof(fp)){
printf("ch2");
if(fscanf(fp,"%[^,],%[^\n]\n",storedKey,storedVal) == EOF) if(fscanf(fp,"%[^,],%[^\n]\n",storedKey,storedVal) == EOF)
break; break;
...@@ -84,7 +88,6 @@ bool dumpToFile(char* key, char* val){ ...@@ -84,7 +88,6 @@ bool dumpToFile(char* key, char* val){
if(!keyExists) if(!keyExists)
fprintf(newFp,"%s,%s\n",key,val); fprintf(newFp,"%s,%s\n",key,val);
printf("ch3");
remove(fileName); remove(fileName);
// free(fp); // free(fp);
...@@ -104,11 +107,14 @@ bool deleteStore(char* key) { ...@@ -104,11 +107,14 @@ bool deleteStore(char* key) {
int setNo = getFileId(key); int setNo = getFileId(key);
char fileName[10]; char fileName[20];
char tempfileName[10]; char tempfileName[20];
bool keyExists = false; bool keyExists = false;
strcpy(tempfileName,"temp"); // strcpy(tempfileName,"temp");
sprintf(fileName,"%d",setNo); // sprintf(fileName,"%d",setNo);
strcpy(tempfileName,dir);
strcat(tempfileName,"temp");
sprintf(fileName,"%s%d",dir,setNo);
strcat(fileName,".csv"); strcat(fileName,".csv");
strcat(tempfileName,fileName); strcat(tempfileName,fileName);
...@@ -143,40 +149,49 @@ bool deleteStore(char* key) { ...@@ -143,40 +149,49 @@ bool deleteStore(char* key) {
return keyExists; return keyExists;
} }
char* restoreFromFile(char* key) { void TransferKeys(int id, char ip[], int port) {
/////////////////add///////////////// char * storedKey, *storedVal;
int setNo = getFileId(key); char *operation = (char *)malloc(sizeof(char)*7);
/////////////////add end///////////////// strcpy(operation,"putreq");
for (int i = 0; i < NUMFILES; i++)
{
/* code */
// int setNo = getFileId(key);
char fileName[10]; char fileName[10];
sprintf(fileName,"%d",setNo); // sprintf(fileName,"%d",i);
sprintf(fileName,"%s%d",dir,i);
strcat(fileName,".csv"); strcat(fileName,".csv");
/////////////////add///////////////// /////////////////add/////////////////
pthread_mutex_lock(&(store.locks[setNo])); pthread_mutex_lock(&(store.locks[i]));
/////////////////add end///////////////// /////////////////add end/////////////////
FILE *fp = fopen(fileName,"r+"); FILE *fp = fopen(fileName,"r+");
if(fp==NULL) if(fp==NULL)
return NULL; continue;
char* storedKey = malloc(sizeof(char)*256); storedKey = malloc(sizeof(char)*256);
char* storedVal = malloc(sizeof(char)*256*1024+1); storedVal = malloc(sizeof(char)*256*1024+1);
while(1){ while(1){
if(fscanf(fp,"%[^,],%[^\n]\n",storedKey,storedVal) == EOF) if(fscanf(fp,"%[^,],%[^\n]\n",storedKey,storedVal) == EOF)
break; break;
if(!strcmp(key,storedKey)) { int keyid = keyToId(storedKey);
free(storedKey); if(keyid<=id) {
fclose(fp); // free(storedKey);
pthread_mutex_unlock(&(store.locks[setNo])); // pthread_mutex_unlock(&(store.locks[i]));
return storedVal; char *msg = toXML(operation,storedKey,storedVal,NULL,0);
sendUDP(msg,ip,port,false,NULL);
} }
} }
fclose(fp); fclose(fp);
/////////////////add///////////////// /////////////////add/////////////////
pthread_mutex_unlock(&(store.locks[setNo])); pthread_mutex_unlock(&(store.locks[i]));
/////////////////add end///////////////// }
///////////////add end/////////////////
free(storedKey); free(storedKey);
return NULL; return NULL;
...@@ -196,8 +211,9 @@ void dumpToFileXML(char* filename) { ...@@ -196,8 +211,9 @@ void dumpToFileXML(char* filename) {
fprintf(dumpPtr,"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<KVStore>\n"); fprintf(dumpPtr,"<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<KVStore>\n");
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
char fileSet[10]; char fileSet[20];
sprintf(fileSet,"%d",i); // sprintf(fileSet,"%d",i);
sprintf(fileSet,"%s%d",dir,i);
strcat(fileSet,".csv"); strcat(fileSet,".csv");
FILE *fp = fopen(fileSet,"r+"); FILE *fp = fopen(fileSet,"r+");
while(1){ while(1){
...@@ -228,3 +244,42 @@ int iniit() { ...@@ -228,3 +244,42 @@ int iniit() {
return 0; return 0;
} }
char * restoreFromFile(char* key) {
/////////////////add/////////////////
int setNo = getFileId(key);
/////////////////add end/////////////////
char fileName[20];
sprintf(fileName,"%s%d",dir,setNo);
// sprintf(fileName,"%d",setNo);
strcat(fileName,".csv");
/////////////////add/////////////////
pthread_mutex_lock(&(store.locks[setNo]));
/////////////////add end/////////////////
FILE *fp = fopen(fileName,"r+");
if(fp==NULL)
return NULL;
char* storedKey = malloc(sizeof(char)*256);
char* storedVal = malloc(sizeof(char)*256*1024+1);
while(1){
if(fscanf(fp,"%[^,],%[^\n]\n",storedKey,storedVal) == EOF)
break;
if(!strcmp(key,storedKey)) {
free(storedKey);
fclose(fp);
pthread_mutex_unlock(&(store.locks[setNo]));
return storedVal;
}
}
fclose(fp);
/////////////////add/////////////////
pthread_mutex_unlock(&(store.locks[setNo]));
/////////////////add end/////////////////
free(storedKey);
return NULL;
}
...@@ -2,4 +2,6 @@ ...@@ -2,4 +2,6 @@
bool dumpToFile(char* key,char* val); bool dumpToFile(char* key,char* val);
bool deleteStore(char* key); bool deleteStore(char* key);
char* restoreFromFile(char* key); char* restoreFromFile(char* key);
char* TransferKeys(int id, char ip[], int port);
void initStore(); void initStore();
char * dir[15];
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment