Commit 97ecdd95 authored by Bhavesh Yadav's avatar Bhavesh Yadav

Fixed keylookup and request forwarding

parent b47089aa
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/in.h> #include <netinet/in.h>
#define PORT 8080 #define PORT 6089
void sendUDP(char * msg, char ip[], int port, bool recvResponse, char * buffer, int src_port) void sendUDP(char * msg, char ip[], int port, bool recvResponse, char * buffer, int src_port)
{ {
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "chord.h" #include "chord.h"
#include <pthread.h> #include <pthread.h>
#define MAXLOCATIONS 32 #define MAXLOCATIONS 16
#define MAXIDVALUE 65536 #define MAXIDVALUE 65536
int keyToId(char * key){ int keyToId(char * key){
...@@ -158,10 +158,10 @@ void stabilize() { ...@@ -158,10 +158,10 @@ void stabilize() {
fclose(configFile); fclose(configFile);
sortNodes(currNodes,i); sortNodes(currNodes,i);
setSuccessorAndPredecessor(currNodes,i); setSuccessorAndPredecessor(currNodes,i);
system("clear"); // system("clear");
printf("node-id: %d,successor: %d, pred: %d\n",chord.node.nodeId,chord.successor.nodeId,chord.predecessor.nodeId); // printf("node-id: %d,successor: %d, pred: %d\n",chord.node.nodeId,chord.successor.nodeId,chord.predecessor.nodeId);
fixFingers(currNodes,i); fixFingers(currNodes,i);
printFingerTable(); // printFingerTable();
} }
void initChordServer(sockaddr_t serverAddr) { void initChordServer(sockaddr_t serverAddr) {
...@@ -223,27 +223,31 @@ void initChordStructure(struct sockaddr_in *joinNode,sockaddr_t socketAddrServer ...@@ -223,27 +223,31 @@ void initChordStructure(struct sockaddr_in *joinNode,sockaddr_t socketAddrServer
Node closestPreceedingNode(int id){ Node closestPreceedingNode(int id){
int m= log(MAXLOCATIONS); int m= MAXLOCATIONS;
Node* f=chord.fingerTable[0]; Node* f=chord.fingerTable[0];
for (int i = m-1; i >=0; i--) for (int i = m-1; i >=0; i--)
{ {
int nextId = chord.fingerTable[i]->nodeId; int nextId = chord.fingerTable[i]->nodeId;
printf("nextid: %d",nextId); printf("nextid: %d\n",nextId);
if(nextId == 0)
continue;
if (nextId>chord.node.nodeId && nextId<=id) if (nextId>chord.node.nodeId && nextId<=id)
{ {
return *chord.fingerTable[i]; return *chord.fingerTable[i];
} }
/// To handle edge case
if(chord.node.nodeId<chord.predecessor.nodeId)
return chord.node;
else if(chord.node.nodeId>chord.successor.nodeId)
return chord.successor;
} }
// strcpy(ip,f[0].ip); //???? // strcpy(ip,f[0].ip); //????
} }
Node findSuccessor(int id, bool fixFinger){ Node findSuccessor(int id, bool fixFinger){
if(chord.predecessor.nodeId == -1) { printf("successor: %d, pred: %d",chord.successor.nodeId,chord.predecessor.nodeId);
printFingerTable();
if(chord.predecessor.nodeId == chord.node.nodeId) {
return chord.node; return chord.node;
}else if(id>chord.predecessor.nodeId && id<=chord.node.nodeId) }else if(id<=chord.node.nodeId && id > chord.predecessor.nodeId)
return chord.node; return chord.node;
if (id>chord.node.nodeId && id<=chord.successor.nodeId) if (id>chord.node.nodeId && id<=chord.successor.nodeId)
{ {
......
...@@ -33,3 +33,5 @@ void sendUDPToNode(char* msg,Node node, bool recvResponse, char* buffer); ...@@ -33,3 +33,5 @@ void sendUDPToNode(char* msg,Node node, bool recvResponse, char* buffer);
void initChordServer(); void initChordServer();
void createNodeStruct(Node *givenNode,char ip[16],int port,int id); void createNodeStruct(Node *givenNode,char ip[16],int port,int id);
void stabilize(); void stabilize();
Node getSuccessor();
void sendUDP(char * msg, char ip[], int port, bool recvResponse, char * buffer);
\ No newline at end of file
...@@ -67,7 +67,7 @@ int main(int argc, char* argv[]) ...@@ -67,7 +67,7 @@ int main(int argc, char* argv[])
char* cacheptr = buildCache(numSets, setSize); char* cacheptr = buildCache(numSets, setSize);
initStore(); initStore();
int sock_fd, addrlen, msglen, newConnection, client_socket[MAX_CLIENTS], max_sd; int sock_fd, addrlen, msglen, newConnection, client_socket[MAX_CLIENTS], max_sd;
struct sockaddr_in socketAddr,joinServer; struct sockaddr_in socketAddr,joinServer,clientAddr;
puts("getting socket"); puts("getting socket");
if(joinPort!=-1) if(joinPort!=-1)
getSocketFromIpAndPort(joinIp,joinPort,&joinServer); getSocketFromIpAndPort(joinIp,joinPort,&joinServer);
...@@ -76,7 +76,7 @@ int main(int argc, char* argv[]) ...@@ -76,7 +76,7 @@ int main(int argc, char* argv[])
fd_set rset; fd_set rset;
memset(client_socket,0,sizeof(int)*MAX_CLIENTS); memset(client_socket,0,sizeof(int)*MAX_CLIENTS);
if( (sock_fd = socket(AF_INET , SOCK_STREAM , 0)) == 0) { if( (sock_fd = socket(AF_INET , SOCK_DGRAM , 0)) == 0) {
printf("%s",toRespXML("Network Error: Could not create socket")); printf("%s",toRespXML("Network Error: Could not create socket"));
exit(1); exit(1);
} }
...@@ -95,78 +95,88 @@ int main(int argc, char* argv[]) ...@@ -95,78 +95,88 @@ int main(int argc, char* argv[])
// initChordStructure(&joinServer,socketAddr); // initChordStructure(&joinServer,socketAddr);
printf("Listening on port %d \n", PORT); printf("Listening on port %d \n", PORT);
if (listen(sock_fd, BACKLOG) < 0) { // if (listen(sock_fd, BACKLOG) < 0) {
perror("listen"); // perror("listen");
exit(1); // exit(1);
} // }
puts("Waiting for connections ..."); // puts("Waiting for connections ...");
int len, n;
while(1) while(1) {
{ puts("in udp listen");
FD_ZERO(&rset); n = recvfrom(sock_fd, (char *)buffer, 257*1024,
FD_SET(sock_fd, &rset); MSG_WAITALL, ( struct sockaddr *) &clientAddr,
max_sd = sock_fd; &len);
buffer[n] = '\0';
for (int i = 0 ; i < MAX_CLIENTS ; i++) { printf("Client : %s\n", buffer);
int sd = client_socket[i];
if(sd > 0)
FD_SET( sd , &rset);
if(sd > max_sd)
max_sd = sd;
}
if ( (select(max_sd+1,&rset,NULL,NULL,NULL) < 0) && (errno!=EINTR))
printf("select error");
if (FD_ISSET(sock_fd, &rset)){
newConnection = accept(sock_fd, (struct sockaddr *)&socketAddr, (socklen_t*)&addrlen);
if (newConnection < 0) {
perror("Error during Accept");
exit(1);
}
printf("New connection,socket_fd is %d,ip is:%s,port:%d\n", newConnection,inet_ntoa(socketAddr.sin_addr),ntohs(socketAddr.sin_port));
for (int i=0;i<MAX_CLIENTS;i++) {
if( client_socket[i] == 0 ) {
client_socket[i] = newConnection;
printf("Adding to list of sockets as %d\n" , i);
break;
}
}
}
for (int i = 0; i < MAX_CLIENTS; i++) {
int sd = client_socket[i];
if (FD_ISSET(sd,&rset)) {
if ((msglen = read(sd,buffer,1024*257)) == 0) {
getpeername(sd , (struct sockaddr*)&socketAddr,(socklen_t*)&addrlen);
printf("Host disconnected,ip %s,port %d\n",inet_ntoa(socketAddr.sin_addr),ntohs(socketAddr.sin_port));
close(sd);
client_socket[i] = 0;
}
else {
buffer[msglen] = '\0';
// printf("message recieved\n");
// printf("%s",buffer);
puts(buffer); puts(buffer);
addRequestToQueue(tp,decodeRequestAndProcess,buffer,sd); addRequestToQueue(tp,decodeRequestAndProcess,buffer);
/* }
char operation[7],key[257],value[256*1024+1]; // while(1)
extractXML(buffer,key,value,operation); // {
printf("%s\n%s\n%s\n",key,value,operation); // FD_ZERO(&rset);
// FD_SET(sock_fd, &rset);
// max_sd = sock_fd;
ThreadPool Here !!
Call extract from XML procedure to get Operation Type, Key, Value // for (int i = 0 ; i < MAX_CLIENTS ; i++) {
// int sd = client_socket[i];
*/
// write(sd , buffer , strlen(buffer)); // if(sd > 0)
} // FD_SET( sd , &rset);
}
} // if(sd > max_sd)
} // max_sd = sd;
// }
// if ( (select(max_sd+1,&rset,NULL,NULL,NULL) < 0) && (errno!=EINTR))
// printf("select error");
// if (FD_ISSET(sock_fd, &rset)){
// newConnection = accept(sock_fd, (struct sockaddr *)&socketAddr, (socklen_t*)&addrlen);
// if (newConnection < 0) {
// perror("Error during Accept");
// exit(1);
// }
// printf("New connection,socket_fd is %d,ip is:%s,port:%d\n", newConnection,inet_ntoa(socketAddr.sin_addr),ntohs(socketAddr.sin_port));
// for (int i=0;i<MAX_CLIENTS;i++) {
// if( client_socket[i] == 0 ) {
// client_socket[i] = newConnection;
// printf("Adding to list of sockets as %d\n" , i);
// break;
// }
// }
// }
// for (int i = 0; i < MAX_CLIENTS; i++) {
// int sd = client_socket[i];
// if (FD_ISSET(sd,&rset)) {
// if ((msglen = read(sd,buffer,1024*257)) == 0) {
// getpeername(sd , (struct sockaddr*)&socketAddr,(socklen_t*)&addrlen);
// printf("Host disconnected,ip %s,port %d\n",inet_ntoa(socketAddr.sin_addr),ntohs(socketAddr.sin_port));
// close(sd);
// client_socket[i] = 0;
// }
// else {
// buffer[msglen] = '\0';
// // printf("message recieved\n");
// // printf("%s",buffer);
// puts(buffer);
// addRequestToQueue(tp,decodeRequestAndProcess,buffer,sd);
// /*
// char operation[7],key[257],value[256*1024+1];
// extractXML(buffer,key,value,operation);
// printf("%s\n%s\n%s\n",key,value,operation);
// ThreadPool Here !!
// Call extract from XML procedure to get Operation Type, Key, Value
// */
// // write(sd , buffer , strlen(buffer));
// }
// }
// }
// }
return 0; return 0;
} }
\ No newline at end of file
...@@ -133,7 +133,7 @@ char *extractRespXml(FILE *fp,char *buffer){ ...@@ -133,7 +133,7 @@ char *extractRespXml(FILE *fp,char *buffer){
extReq_t *extractXML(char *buffer){ extReq_t *extractXML(char *buffer){
extReq_t *req; extReq_t *req;
req = malloc(sizeof(extReq_t)); req = malloc(sizeof(extReq_t));
char *p,*q; char *p,*q,*r;
req->operation = malloc(sizeof(char)*12); req->operation = malloc(sizeof(char)*12);
req->err = malloc(sizeof(char)*170); req->err = malloc(sizeof(char)*170);
req->val = malloc(sizeof(char)*256*1024+1); req->val = malloc(sizeof(char)*256*1024+1);
...@@ -183,11 +183,14 @@ extReq_t *extractXML(char *buffer){ ...@@ -183,11 +183,14 @@ extReq_t *extractXML(char *buffer){
p = strstr(buffer, "<Address>")+9; p = strstr(buffer, "<Address>")+9;
q = strstr(buffer, "</Address>"); q = strstr(buffer, "</Address>");
if(p!=NULL && q!=NULL){ if(p!=NULL && q!=NULL){
strcpy(req->ipAddr,strtok(p, " ")); r = strstr(p," ");
int l = strlen(req->ipAddr); memcpy(req->ipAddr,p,r-p);
// strcpy(req->ipAddr,strtok(p, " "));
// int l = strlen(req->ipAddr);
req->ipAddr[r-p] = '\0';
char port_s[6]; char port_s[6];
memcpy(port_s,p+l+1,q-p-l-1); memcpy(port_s,r+1,q-r-1);
port_s[q-p-l-1] = '\0'; port_s[q-r-1] = '\0';
req->port = atoi(port_s); req->port = atoi(port_s);
} }
......
...@@ -29,7 +29,7 @@ struct tpool { ...@@ -29,7 +29,7 @@ struct tpool {
bool stop; bool stop;
}; };
static request_t *createJob(tFunction func, void *arg,int clientFD) static request_t *createJob(tFunction func, void *arg)
{ {
request_t *work; request_t *work;
...@@ -39,7 +39,7 @@ static request_t *createJob(tFunction func, void *arg,int clientFD) ...@@ -39,7 +39,7 @@ static request_t *createJob(tFunction func, void *arg,int clientFD)
work = malloc(sizeof(*work)); work = malloc(sizeof(*work));
work->func = func; work->func = func;
work->arg = arg; work->arg = arg;
work->clientFD = clientFD; // work->clientFD = clientFD;
work->next = NULL; work->next = NULL;
return work; return work;
} }
...@@ -90,8 +90,8 @@ static void *threadMainFunction(void *arg) ...@@ -90,8 +90,8 @@ static void *threadMainFunction(void *arg)
if (work != NULL) { if (work != NULL) {
work->func(work->arg); work->func(work->arg);
if(work->arg !=NULL); // if(work->arg !=NULL);
write(work->clientFD,work->arg,sizeof(char)*strlen(work->arg)); // write(work->clientFD,work->arg,sizeof(char)*strlen(work->arg));
deleteJob(work); deleteJob(work);
} }
...@@ -162,7 +162,7 @@ void destroyThreadPool(tpool_t *tm) ...@@ -162,7 +162,7 @@ void destroyThreadPool(tpool_t *tm)
free(tm); free(tm);
} }
bool addRequestToQueue(tpool_t *tm, tFunction func, void *arg,int clientFD) bool addRequestToQueue(tpool_t *tm, tFunction func, void *arg)
{ {
request_t *work; request_t *work;
int msglen = 1024*257+170; int msglen = 1024*257+170;
...@@ -170,7 +170,7 @@ bool addRequestToQueue(tpool_t *tm, tFunction func, void *arg,int clientFD) ...@@ -170,7 +170,7 @@ bool addRequestToQueue(tpool_t *tm, tFunction func, void *arg,int clientFD)
strcpy(copyBuffer,arg); strcpy(copyBuffer,arg);
if (tm == NULL) if (tm == NULL)
return false; return false;
work = createJob(func, copyBuffer,clientFD); work = createJob(func, copyBuffer);
if (work == NULL) if (work == NULL)
return false; return false;
pthread_mutex_lock(&(tm->queueLock)); pthread_mutex_lock(&(tm->queueLock));
...@@ -239,7 +239,7 @@ int init(int argc, char **argv) ...@@ -239,7 +239,7 @@ int init(int argc, char **argv)
strcpy(buffer[3],"<?xml version=\"1.0\" encoding=\"UTF-8\"?><KVMessage type=\"getreq\"><Key>5</Key><Value></Value></KVMessage>"); strcpy(buffer[3],"<?xml version=\"1.0\" encoding=\"UTF-8\"?><KVMessage type=\"getreq\"><Key>5</Key><Value></Value></KVMessage>");
for (i=0; i<4; i++) { for (i=0; i<4; i++) {
vals[i] = i; vals[i] = i;
addRequestToQueue(tm, decodeRequestAndProcess, buffer[i],1); addRequestToQueue(tm, decodeRequestAndProcess, buffer[i]);
} }
tpoolWait(tm); tpoolWait(tm);
free(vals); free(vals);
...@@ -261,8 +261,8 @@ void decodeRequestAndProcess(char* buffer) { ...@@ -261,8 +261,8 @@ void decodeRequestAndProcess(char* buffer) {
puts("key"); puts("key");
puts(request->key); puts(request->key);
int id = keyToId(request->key); int id = keyToId(request->key);
printf("key id : %d",id); printf("key id : %d\n",id);
Node successorNode = findSuccessor(id, false); //find server where the key belongs Node successorNode = findSuccessor(id,false); //find server where the key belongs
if(successorNode.nodeId == chord.node.nodeId) { //key belongs to the server if(successorNode.nodeId == chord.node.nodeId) { //key belongs to the server
puts("Node Id self"); puts("Node Id self");
printf("%d\n Key Id: %d,\n",chord.node.nodeId,id); printf("%d\n Key Id: %d,\n",chord.node.nodeId,id);
...@@ -291,10 +291,16 @@ void decodeRequestAndProcess(char* buffer) { ...@@ -291,10 +291,16 @@ void decodeRequestAndProcess(char* buffer) {
// puts("buffer"); // puts("buffer");
// puts(buffer); // puts(buffer);
} }
puts("buffer");
puts(buffer);
puts(request->ipAddr);
sendUDP(buffer,request->ipAddr,request->port,false,NULL);
}else }else
{ {
puts("Forward request"); puts("Forward request");
printf("key id: %d, successor node id: %d",id,successorNode.nodeId); printf("key id: %d, successor node id: %d\n",id,successorNode.nodeId);
puts("buffer");
puts(buffer);
sendUDPToNode(buffer,successorNode,false,NULL); sendUDPToNode(buffer,successorNode,false,NULL);
buffer = NULL; buffer = NULL;
} }
......
...@@ -9,5 +9,5 @@ tpool_t *createThreadPool(size_t num); ...@@ -9,5 +9,5 @@ tpool_t *createThreadPool(size_t num);
void destroyThreadPool(tpool_t *tm); void destroyThreadPool(tpool_t *tm);
void decodeRequestAndProcess(char* buffer); void decodeRequestAndProcess(char* buffer);
bool addRequestToQueue(tpool_t *tm, tFunction func, void *arg,int clientFD); bool addRequestToQueue(tpool_t *tm, tFunction func, void *arg);
void tpoolWait(tpool_t *tm); void tpoolWait(tpool_t *tm);
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment