Commit b71a9ba8 authored by shashanktomar11's avatar shashanktomar11

Shashank RnD Code

parent bcceae60
#include <arpa/inet.h>
#include <inttypes.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <unistd.h>
#include <algorithm>
#include <cerrno>
#include <chrono>
#include <fstream>
#include <iostream>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
// TCP port used for the benchmark connection.
#define PORT 8085
// Unit helpers. Every derived macro is parenthesised so that it behaves as a
// single value inside larger expressions (e.g. `x / MICRO`, `G / M`).
#define MILLIS 1000
#define MICRO (MILLIS * 1000)
#define MAX_TARGET 4
// Binary size multipliers: K = 2^10, M = 2^20, G = 2^30.
#define K 1024
#define M (K * 1024)
#define G (M * 1024)
// in order to turn on debugging, compile with -DDEBUG flag
// D(x) evaluates x in debug builds and expands to an empty statement otherwise.
#ifdef DEBUG
#define D(x) (x)
#else
#define D(x) do{}while(0)
#endif
using namespace std;
// unordered_map<string, uint64_t> m;
// Per-packet latency samples: main() appends, for each accepted response, the
// difference between the receive time and the timestamp echoed back in that
// response. Both values are microseconds since the epoch, held as uint64_t.
vector<uint64_t> latv;
/**
 * Symbolic status codes that RAMCloud operations report back to
 * applications.
 *
 * STATUS_OK (0) signals success; every other value describes an error.
 * A given operation can return only a subset of these codes.
 */
enum Status {
    /// The operation succeeded (default return value).
    STATUS_OK                       = 0,
    /// The server neither knows about nor is responsible for the tablet,
    /// although it may exist elsewhere in the system. Preferred over the
    /// definitive TABLE_DOESNT_EXIST whenever another server might hold
    /// the tablet.
    STATUS_UNKNOWN_TABLET           = 1,
    /// The table exists nowhere in the system. At present only the
    /// coordinator can say this with certainty.
    STATUS_TABLE_DOESNT_EXIST       = 2,
    /// The object exists nowhere in the system. There is no UNKNOWN_OBJECT
    /// counterpart: operations on objects in unknown tables are rejected
    /// with a table-related status, and a server that owns a tablet can
    /// say with certainty whether an object exists in it.
    STATUS_OBJECT_DOESNT_EXIST      = 3,
    STATUS_OBJECT_EXISTS            = 4,
    STATUS_WRONG_VERSION            = 5,
    STATUS_NO_TABLE_SPACE           = 6,
    STATUS_MESSAGE_TOO_SHORT        = 7,
    STATUS_UNIMPLEMENTED_REQUEST    = 8,
    STATUS_REQUEST_FORMAT_ERROR     = 9,
    STATUS_RESPONSE_FORMAT_ERROR    = 10,
    STATUS_COULDNT_CONNECT          = 11,
    STATUS_BACKUP_BAD_SEGMENT_ID    = 12,
    /// A backup cannot (or does not wish to) allocate space for a segment
    /// replica.
    STATUS_BACKUP_OPEN_REJECTED     = 13,
    STATUS_BACKUP_SEGMENT_OVERFLOW  = 14,
    STATUS_BACKUP_MALFORMED_SEGMENT = 15,
    STATUS_SEGMENT_RECOVERY_FAILED  = 16,
    /// The server cannot handle the request right now and the caller
    /// should try again later. Possible causes include (a) the server is
    /// out of resources to execute the request, or (b) the server is not
    /// sure it has authority to execute it and is checking with the
    /// coordinator.
    STATUS_RETRY                    = 17,
    /// The RPC asked for an unknown service.
    STATUS_SERVICE_NOT_AVAILABLE    = 18,
    STATUS_TIMEOUT                  = 19,
    /// The target server never existed, has come and gone, or is currently
    /// crashed. It cannot respond to RPCs and probably never will (unless
    /// the id has not existed yet; a crashed server's id is never reused).
    STATUS_SERVER_NOT_UP            = 20,
    STATUS_INTERNAL_ERROR           = 21,
    /// The object chosen for an operation does not meet the associated
    /// requirements and is therefore invalid.
    STATUS_INVALID_OBJECT           = 22,
    /// A tablet does not exist; relevant when split or merge operations
    /// on tablets are executed.
    STATUS_TABLET_DOESNT_EXIST      = 23,
    /// Tablet partitioning was invoked without a preceding request to
    /// start reading replicas off of disk.
    STATUS_PARTITION_BEFORE_READ    = 24,
    /// The RPC was meant for one server id but reached a different one.
    STATUS_WRONG_SERVER             = 25,
    /// The sender of the RPC is missing from the recipient's server list.
    /// Helps servers discover that they are zombies (considered dead by
    /// the rest of the cluster while believing themselves alive) so they
    /// stop servicing requests once other servers have taken over their
    /// tablets. See "Zombies" in designNotes.
    STATUS_CALLER_NOT_IN_CLUSTER    = 26,
    /// A single request was too big to fit in an rpc and so could not be
    /// sent or carried out.
    STATUS_REQUEST_TOO_LARGE        = 27,
    /// The server neither knows about nor is responsible for the
    /// indexlet, although it may exist elsewhere in the system. Preferred
    /// over the definitive INDEX_DOESNT_EXIST whenever another server
    /// might hold the indexlet.
    STATUS_UNKNOWN_INDEXLET         = 28,
    /// The index exists nowhere in the system. At present only the
    /// coordinator can say this with certainty.
    STATUS_INDEX_DOESNT_EXIST       = 29,
    /// A client-supplied parameter is invalid (for example, outside the
    /// allowed bounds).
    STATUS_INVALID_PARAMETER        = 30,
    /// The client already received the result of this rpc, so executing
    /// it again makes no sense. Most likely cause: a delayed network
    /// packet.
    STATUS_STALE_RPC                = 31,
    /// The client's lease has expired on the coordinator; the master
    /// refused to execute the RPC.
    STATUS_EXPIRED_LEASE            = 32,
    /// The client attempted a transaction operation after the transaction
    /// commit had already started.
    STATUS_TX_OP_AFTER_COMMIT       = 33,
    STATUS_MAX_VALUE                = 33,
    // Checklist when adding a new status value:
    //  * Keep STATUS_MAX_VALUE equal to the largest defined status value
    //    and keep its definition last in the list (it is used primarily
    //    for testing).
    //  * Add entries to the "messages" and "symbols" tables in Status.cc.
    //  * Add a new exception class to ClientException.h.
    //  * Add a "case" to ClientException::throwException mapping the
    //    status value to its status-specific ClientException subclass.
    //  * In the Java bindings, add a static exception class and a throwing
    //    case for the status to ClientException.java.
    //  * Add the exception to the Status enum in Status.java at the
    //    position matching its status code.
};
/**
 * Conditions under which a conditional operation must be aborted with an
 * error.
 *
 * RejectRules are typically used to keep updates consistent, e.g. to update
 * a value only if it has not changed since it was last read. An operation
 * that is given a RejectRules object is aborted when any enabled rule
 * matches; each rule is enabled by a nonzero flag (see the fields below).
 *
 * The struct is packed: 8 bytes of version followed by four 1-byte flags.
 */
struct __attribute__((packed)) RejectRules {
    /// Version that versionLeGiven / versionNeGiven compare against.
    uint64_t givenVersion;
    /// Nonzero: abort if the object does not exist.
    uint8_t doesntExist;
    /// Nonzero: abort if the object does exist.
    uint8_t exists;
    /// Nonzero: abort if the object exists with a version less than or
    /// equal to givenVersion.
    uint8_t versionLeGiven;
    /// Nonzero: abort if the object exists with a version different from
    /// givenVersion.
    uint8_t versionNeGiven;
};
/**
 * Identifies the one service that handles a given rpc. An rpc can be sent
 * to a single service only; see ServiceMask for situations dealing with
 * sets of services on a particular Server.
 */
enum ServiceType {
    MASTER_SERVICE      = 0,
    BACKUP_SERVICE      = 1,
    COORDINATOR_SERVICE = 2,
    ADMIN_SERVICE       = 3,
    INVALID_SERVICE     = 4,  // One higher than the max.
};
/**
 * Lease information that linearizable RPCs carry so the receiver can decide
 * whether the RPC may be processed. Packed: three consecutive 64-bit fields.
 */
struct __attribute__((packed)) ClientLease {
    /// Cluster-unique id of a specific lease; 0 marks an invalid or
    /// expired id.
    uint64_t leaseId;
    /// Cluster time after which the lease may have become invalid.
    uint64_t leaseExpiration;
    /// Cluster time at which the coordinator provided this lease
    /// information.
    uint64_t timestamp;
};
/**
 * Values of the "opcode" field in RPC headers; each one selects a particular
 * operation, and every RAMCloud service implements a subset of them.
 *
 * The numbering is not contiguous (for example 27, 30 and 34 are unused).
 *
 * When this table changes, the following must be updated as well:
 *  - the method opcodeSymbol in WireFormat.cc;
 *  - WireFormatTest.cc's out-of-range test, if ILLEGAL_RPC_TYPE changed;
 *  - possibly the "callees" table in scripts/genLevels.py, which tracks
 *    which RPCs invoke which other RPCs.
 */
enum Opcode {
    PING                             = 7,
    PROXY_PING                       = 8,
    KILL                             = 9,
    CREATE_TABLE                     = 10,
    GET_TABLE_ID                     = 11,
    DROP_TABLE                       = 12,
    READ                             = 13,
    WRITE                            = 14,
    REMOVE                           = 15,
    ENLIST_SERVER                    = 16,
    GET_SERVER_LIST                  = 17,
    GET_TABLE_CONFIG                 = 18,
    RECOVER                          = 19,
    HINT_SERVER_CRASHED              = 20,
    RECOVERY_MASTER_FINISHED         = 21,
    ENUMERATE                        = 22,
    SET_MASTER_RECOVERY_INFO         = 23,
    FILL_WITH_TEST_DATA              = 24,
    MULTI_OP                         = 25,
    GET_METRICS                      = 26,
    BACKUP_FREE                      = 28,
    BACKUP_GETRECOVERYDATA           = 29,
    BACKUP_STARTREADINGDATA          = 31,
    BACKUP_WRITE                     = 32,
    BACKUP_RECOVERYCOMPLETE          = 33,
    UPDATE_SERVER_LIST               = 35,
    BACKUP_STARTPARTITION            = 36,
    DROP_TABLET_OWNERSHIP            = 39,
    TAKE_TABLET_OWNERSHIP            = 40,
    GET_HEAD_OF_LOG                  = 42,
    INCREMENT                        = 43,
    PREP_FOR_MIGRATION               = 44,
    RECEIVE_MIGRATION_DATA           = 45,
    REASSIGN_TABLET_OWNERSHIP        = 46,
    MIGRATE_TABLET                   = 47,
    IS_REPLICA_NEEDED                = 48,
    SPLIT_TABLET                     = 49,
    GET_SERVER_STATISTICS            = 50,
    SET_RUNTIME_OPTION               = 51,
    GET_SERVER_CONFIG                = 52,
    GET_BACKUP_CONFIG                = 53,
    GET_MASTER_CONFIG                = 55,
    GET_LOG_METRICS                  = 56,
    VERIFY_MEMBERSHIP                = 57,
    GET_RUNTIME_OPTION               = 58,
    GET_LEASE_INFO                   = 59,
    RENEW_LEASE                      = 60,
    SERVER_CONTROL                   = 61,
    SERVER_CONTROL_ALL               = 62,
    GET_SERVER_ID                    = 63,
    READ_KEYS_AND_VALUE              = 64,
    LOOKUP_INDEX_KEYS                = 65,
    READ_HASHES                      = 66,
    INSERT_INDEX_ENTRY               = 67,
    REMOVE_INDEX_ENTRY               = 68,
    CREATE_INDEX                     = 69,
    DROP_INDEX                       = 70,
    DROP_INDEXLET_OWNERSHIP          = 71,
    TAKE_INDEXLET_OWNERSHIP          = 72,
    PREP_FOR_INDEXLET_MIGRATION      = 73,
    SPLIT_AND_MIGRATE_INDEXLET       = 74,
    COORD_SPLIT_AND_MIGRATE_INDEXLET = 75,
    TX_DECISION                      = 76,
    TX_PREPARE                       = 77,
    TX_REQUEST_ABORT                 = 78,
    TX_HINT_FAILED                   = 79,
    ECHO                             = 80,
    ILLEGAL_RPC_TYPE                 = 81,  // 1 + the highest legitimate Opcode
};
/**
 * Header found at the very start of every RPC request (packed, 4 bytes).
 */
struct __attribute__((packed)) RequestCommon {
    /// Opcode of operation to be performed.
    uint16_t opcode;
    /// ServiceType to invoke for this rpc.
    uint16_t service;
};
/**
 * Header found at the very start of every RPC response (packed).
 */
struct __attribute__((packed)) ResponseCommon {
    /// Tells whether the operation succeeded and, if it did not, why.
    Status status;
};
/**
* This struct describes the packet structure of a write packet
* key differences include the addition of the field uint64_t timestamp;
* to help measure latencies and the ifdef elsedef structure to allow the user
* to set the packet size using commandline macros at compile time
* default size is 100 bytes
*/
struct Write {
static const Opcode opcode = WRITE;
static const ServiceType service = MASTER_SERVICE;
struct Request {
RequestCommon common;
uint64_t tableId;
uint64_t key;
uint64_t timestamp;
ClientLease lease;
uint64_t rpcId;
uint64_t ackId;
uint32_t length; // Includes the total size of the
// keysAndValue blob in bytes.These
// follow immediately after this header
RejectRules rejectRules;
uint8_t async;
uint8_t array[15];
#ifdef P1000
uint64_t array2[112];
uint8_t array3[4];
#endif
#ifdef P10000
uint64_t array2[112];
uint8_t array3[4];
uint64_t array4[1125];
#endif
} __attribute__((packed));
struct Response {
ResponseCommon common;
uint64_t tableId;
uint64_t key;
uint64_t version;
uint64_t timestamp;
uint64_t array[8];
#ifdef P1000
uint64_t array2[112];
uint8_t array3[4];
#endif
#ifdef P10000
uint64_t array2[112];
uint8_t array3[4];
uint64_t array4[1125];
#endif
} __attribute__((packed));
};
int main(int argc, char const *argv[])
{
int bypass_offload = atoi(argv[1]); // first commandline argument, if 0,
//then the code works with SmartNIC offload,
// else it bypasses the offload
int proportion = atoi(argv[2]); // second commandline argument, determines what
//proportion of writes is inconsistent
// (if proportion = x, then the
// inconsistent to consistent write ratio is 1:x-1)
int sock = 0, valread;
struct sockaddr_in serv_addr;
char *hello = "Client preprocessing done";
char buffer1[1024] = {0};
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
printf("\n Socket creation error \n");
return -1;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
// struct timeval tv; // used in case very large packets show abnormal behaviour
// tv.tv_sec = 0;
// tv.tv_usec = 3000;
// if(setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv)<0)
// {
// printf("\nsetsockopt failed \n");
// return -1;
// }
// Convert IPv4 and IPv6 addresses from text to binary form
if(bypass_offload)
{
if(inet_pton(AF_INET, "192.168.220.60", &serv_addr.sin_addr)<=0)
{
printf("\nInvalid address/ Address not supported \n");
return -1;
}
}
else
{
if(inet_pton(AF_INET, "192.168.220.35", &serv_addr.sin_addr)<=0)
{
printf("\nInvalid address/ Address not supported \n");
return -1;
}
}
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
{
printf("\nConnection Failed \n");
return -1;
}
// This part of the code synchronises the programs running on the master, client and the NIC
// it ensures that setup delays do not affect the experiments
valread = read( sock , buffer1, 1024);
if(valread>0)
{
printf("%s\n",buffer1 );
memset(&buffer1[0], 0, sizeof(buffer1));
}
if( send(sock, hello, strlen(hello), 0) != strlen(hello) )
{
perror("send");
}
valread = read( sock , buffer1, 1024);
if(valread>0)
{
printf("%s\n",buffer1 );
memset(&buffer1[0], 0, sizeof(buffer1));
}
int num_replies=0;
static char buffer[2 * M] = {0};
// the main body of the client begins here
struct Write::Request w; // The write request to be sent is generated in this struct
chrono::milliseconds msStart = chrono::duration_cast<chrono::milliseconds>(chrono::high_resolution_clock::now().time_since_epoch());
int i = 0;
int success = 0; // These parameters will measure the number of success
int failure = 0; // failure
int error = 0; // and error responses respectively
for(auto start = std::chrono::steady_clock::now(), now = start; now < start + std::chrono::seconds{60}; now = std::chrono::steady_clock::now())
{//the experiment runs for 60 seconds by default
i++;
w.tableId=1; // request workload is synchronised with master
if(proportion>0) //determines proportion of inconsistent writes
{
if(i%proportion==0)
{
w.key=0;
}
else
{
w.key=i;
}
}
else
{
w.key=i;
}
//setting the relevant parameters of Write::Request packet
w.common.opcode=WRITE;
w.rejectRules.givenVersion=1;
w.rejectRules.versionNeGiven=1;
//Timestamp debugging
chrono::microseconds ms = chrono::duration_cast< chrono::microseconds >(chrono::system_clock::now().time_since_epoch());
uint64_t ts1 = ms.count();
D(printf("Client sent Timestamp1:%" PRIu64 "\n",ts1 ));
//setting the timestamp of Write::Request packet
w.timestamp=ms.count();
//sending the packet
send(sock , &w , sizeof(struct Write::Request), 0 );
//Debugging
D(printf("Client sent request Timestamp:%" PRIu64 "\n",w.timestamp ));
num_replies++;
D(printf("Write::Request message sent\n"));
// since this client runs in interactive mode, it waits for each response before generating new requests
int n = 0;
n = read(sock, buffer, 2 * M);
if(n>0)
{
// The write response received is stored in this struct
struct Write::Response wr;
//copying the contents of the buffer into the struct
memcpy(&wr, buffer, sizeof(wr));
//storing paramters for debugging purposes
int stat = wr.common.status;
int table = wr.tableId;
int key = wr.key;
int vers = wr.version;
uint64_t ts = wr.timestamp;
chrono::microseconds ms = chrono::duration_cast< chrono::microseconds >(chrono::system_clock::now().time_since_epoch());
uint64_t ts2 = ms.count();
D(printf("Client received Timestamp1:%" PRIu64 "\n",ts ));
D(printf("Client current Timestamp:%" PRIu64 "\n",ts2 ));
// difference in timestamps == RTT for this packet
uint64_t tdelta = ts2 - ts;
D(printf("Client received response Table:%d Key:%d Version:%d Status:%d Timestamp:%" PRIu64 "Time Delta:%" PRIu64 "\n",table, key, vers, stat, ts, tdelta ));
//STATUS_WRONG_VERSION is used for termination packets while debugging
if(stat==STATUS_WRONG_VERSION)
{
D(printf("Failure received:%d\n",stat ));
failure++;
latv.push_back(tdelta);
}
//packet is all zeroes, hence error
else if(ts==0)
{
D(printf("Error received:%d\n",stat ));
error++;
}
else if(stat==STATUS_OK)
{
D(printf("Success received:%d\n",stat ));
success++;
latv.push_back(tdelta);
}
else
{
D(printf("Error received:%d\n",stat ));
error++;
}
//Used to generate termination packets while debugging
// if(tdelta>1000000)
// {
// w.common.opcode=ILLEGAL_RPC_TYPE;
// send(sock , &w , sizeof(struct Write::Request), 0 );
// break;
// }
}
memset(&buffer[0], 0, sizeof(buffer));
}
//Printing experiment results to console
//and storing the latency of each packet in latencies_unsorted.txt, latencies.txt
chrono::milliseconds msEnd = chrono::duration_cast<chrono::milliseconds>(chrono::high_resolution_clock::now().time_since_epoch());
cout<<"Time taken:"<<msEnd.count() - msStart.count()<<"\n";
cout<<"Packets Received: "<<i<<"\n"<<"Success Received: "<<success<<"\n"<<"Failure Received: "<<failure<<"\n"<<"Error Received: "<<error<<"\n";
std::ofstream outFileUnsorted("latencies_unsorted.txt");
for (const auto &e : latv) outFileUnsorted << e << "\n";
std::sort (latv.begin(), latv.end());
std::ofstream outFile("latencies.txt");
for (const auto &e : latv) outFile << e << "\n";
auto nth = latv.begin() + (99*latv.size())/100;
std::nth_element(latv.begin(), nth, latv.end());
cout<<"Average Latency: "<<1.0 * std::accumulate(latv.begin(), latv.end()-1, 0LL) / std::distance(latv.begin(), latv.end()-1)<<"\n";
cout<<"Tail Latency (99%): "<<1.0 * std::accumulate(nth, latv.end()-1, 0LL) / std::distance(nth, latv.end()-1)<<"\n";
return 0;
}
#include <unistd.h>
#include <stdio.h>
#include <iostream>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <thread>
#include <vector>
#include <mutex>
#include <chrono>
#include <random>
#include <unordered_map>
#include <utility>
#include <sys/time.h> //FD_SET, FD_ISSET, FD_ZERO macros
#include <inttypes.h>
// TCP port used for the benchmark connection.
#define PORT 8085
// Descriptor set handed to select() by main().
fd_set readfds;
// Unit helpers. Every derived macro is parenthesised so that it behaves as a
// single value inside larger expressions (e.g. `x / MICRO`, `G / M`).
#define MILLIS 1000
#define MICRO (MILLIS * 1000)
#define MAX_TARGET 4
// Binary size multipliers: K = 2^10, M = 2^20, G = 2^30.
#define K 1024
#define M (K * 1024)
#define G (M * 1024)
// in order to turn on debugging, compile with -DDEBUG flag
// D(x) evaluates x in debug builds and expands to an empty statement otherwise.
#ifdef DEBUG
#define D(x) (x)
#else
#define D(x) do{}while(0)
#endif
using namespace std;
// NOTE(review): no use of `m` is visible in the part of the file seen here —
// confirm it is still needed.
unordered_map<string, uint64_t> m;
// This hashmap emulates the master's DRAM. Keys are "<tableId>$<key>" strings;
// main() preloads "1$0" .. "1$1999999", each mapped to 1 (presumably the
// object's version — TODO confirm).
unordered_map<string, uint64_t> master_data;
/**
 * Symbolic status codes that RAMCloud operations report back to
 * applications.
 *
 * STATUS_OK (0) signals success; every other value describes an error.
 * A given operation can return only a subset of these codes.
 */
enum Status {
    /// The operation succeeded (default return value).
    STATUS_OK                       = 0,
    /// The server neither knows about nor is responsible for the tablet,
    /// although it may exist elsewhere in the system. Preferred over the
    /// definitive TABLE_DOESNT_EXIST whenever another server might hold
    /// the tablet.
    STATUS_UNKNOWN_TABLET           = 1,
    /// The table exists nowhere in the system. At present only the
    /// coordinator can say this with certainty.
    STATUS_TABLE_DOESNT_EXIST       = 2,
    /// The object exists nowhere in the system. There is no UNKNOWN_OBJECT
    /// counterpart: operations on objects in unknown tables are rejected
    /// with a table-related status, and a server that owns a tablet can
    /// say with certainty whether an object exists in it.
    STATUS_OBJECT_DOESNT_EXIST      = 3,
    STATUS_OBJECT_EXISTS            = 4,
    STATUS_WRONG_VERSION            = 5,
    STATUS_NO_TABLE_SPACE           = 6,
    STATUS_MESSAGE_TOO_SHORT        = 7,
    STATUS_UNIMPLEMENTED_REQUEST    = 8,
    STATUS_REQUEST_FORMAT_ERROR     = 9,
    STATUS_RESPONSE_FORMAT_ERROR    = 10,
    STATUS_COULDNT_CONNECT          = 11,
    STATUS_BACKUP_BAD_SEGMENT_ID    = 12,
    /// A backup cannot (or does not wish to) allocate space for a segment
    /// replica.
    STATUS_BACKUP_OPEN_REJECTED     = 13,
    STATUS_BACKUP_SEGMENT_OVERFLOW  = 14,
    STATUS_BACKUP_MALFORMED_SEGMENT = 15,
    STATUS_SEGMENT_RECOVERY_FAILED  = 16,
    /// The server cannot handle the request right now and the caller
    /// should try again later. Possible causes include (a) the server is
    /// out of resources to execute the request, or (b) the server is not
    /// sure it has authority to execute it and is checking with the
    /// coordinator.
    STATUS_RETRY                    = 17,
    /// The RPC asked for an unknown service.
    STATUS_SERVICE_NOT_AVAILABLE    = 18,
    STATUS_TIMEOUT                  = 19,
    /// The target server never existed, has come and gone, or is currently
    /// crashed. It cannot respond to RPCs and probably never will (unless
    /// the id has not existed yet; a crashed server's id is never reused).
    STATUS_SERVER_NOT_UP            = 20,
    STATUS_INTERNAL_ERROR           = 21,
    /// The object chosen for an operation does not meet the associated
    /// requirements and is therefore invalid.
    STATUS_INVALID_OBJECT           = 22,
    /// A tablet does not exist; relevant when split or merge operations
    /// on tablets are executed.
    STATUS_TABLET_DOESNT_EXIST      = 23,
    /// Tablet partitioning was invoked without a preceding request to
    /// start reading replicas off of disk.
    STATUS_PARTITION_BEFORE_READ    = 24,
    /// The RPC was meant for one server id but reached a different one.
    STATUS_WRONG_SERVER             = 25,
    /// The sender of the RPC is missing from the recipient's server list.
    /// Helps servers discover that they are zombies (considered dead by
    /// the rest of the cluster while believing themselves alive) so they
    /// stop servicing requests once other servers have taken over their
    /// tablets. See "Zombies" in designNotes.
    STATUS_CALLER_NOT_IN_CLUSTER    = 26,
    /// A single request was too big to fit in an rpc and so could not be
    /// sent or carried out.
    STATUS_REQUEST_TOO_LARGE        = 27,
    /// The server neither knows about nor is responsible for the
    /// indexlet, although it may exist elsewhere in the system. Preferred
    /// over the definitive INDEX_DOESNT_EXIST whenever another server
    /// might hold the indexlet.
    STATUS_UNKNOWN_INDEXLET         = 28,
    /// The index exists nowhere in the system. At present only the
    /// coordinator can say this with certainty.
    STATUS_INDEX_DOESNT_EXIST       = 29,
    /// A client-supplied parameter is invalid (for example, outside the
    /// allowed bounds).
    STATUS_INVALID_PARAMETER        = 30,
    /// The client already received the result of this rpc, so executing
    /// it again makes no sense. Most likely cause: a delayed network
    /// packet.
    STATUS_STALE_RPC                = 31,
    /// The client's lease has expired on the coordinator; the master
    /// refused to execute the RPC.
    STATUS_EXPIRED_LEASE            = 32,
    /// The client attempted a transaction operation after the transaction
    /// commit had already started.
    STATUS_TX_OP_AFTER_COMMIT       = 33,
    STATUS_MAX_VALUE                = 33,
    // Checklist when adding a new status value:
    //  * Keep STATUS_MAX_VALUE equal to the largest defined status value
    //    and keep its definition last in the list (it is used primarily
    //    for testing).
    //  * Add entries to the "messages" and "symbols" tables in Status.cc.
    //  * Add a new exception class to ClientException.h.
    //  * Add a "case" to ClientException::throwException mapping the
    //    status value to its status-specific ClientException subclass.
    //  * In the Java bindings, add a static exception class and a throwing
    //    case for the status to ClientException.java.
    //  * Add the exception to the Status enum in Status.java at the
    //    position matching its status code.
};
/**
 * Conditions under which a conditional operation must be aborted with an
 * error.
 *
 * RejectRules are typically used to keep updates consistent, e.g. to update
 * a value only if it has not changed since it was last read. An operation
 * that is given a RejectRules object is aborted when any enabled rule
 * matches; each rule is enabled by a nonzero flag (see the fields below).
 *
 * The struct is packed: 8 bytes of version followed by four 1-byte flags.
 */
struct __attribute__((packed)) RejectRules {
    /// Version that versionLeGiven / versionNeGiven compare against.
    uint64_t givenVersion;
    /// Nonzero: abort if the object does not exist.
    uint8_t doesntExist;
    /// Nonzero: abort if the object does exist.
    uint8_t exists;
    /// Nonzero: abort if the object exists with a version less than or
    /// equal to givenVersion.
    uint8_t versionLeGiven;
    /// Nonzero: abort if the object exists with a version different from
    /// givenVersion.
    uint8_t versionNeGiven;
};
/**
 * Identifies the one service that handles a given rpc. An rpc can be sent
 * to a single service only; see ServiceMask for situations dealing with
 * sets of services on a particular Server.
 */
enum ServiceType {
    MASTER_SERVICE      = 0,
    BACKUP_SERVICE      = 1,
    COORDINATOR_SERVICE = 2,
    ADMIN_SERVICE       = 3,
    INVALID_SERVICE     = 4,  // One higher than the max.
};
/**
 * Lease information that linearizable RPCs carry so the receiver can decide
 * whether the RPC may be processed. Packed: three consecutive 64-bit fields.
 */
struct __attribute__((packed)) ClientLease {
    /// Cluster-unique id of a specific lease; 0 marks an invalid or
    /// expired id.
    uint64_t leaseId;
    /// Cluster time after which the lease may have become invalid.
    uint64_t leaseExpiration;
    /// Cluster time at which the coordinator provided this lease
    /// information.
    uint64_t timestamp;
};
/**
 * Values of the "opcode" field in RPC headers; each one selects a particular
 * operation, and every RAMCloud service implements a subset of them.
 *
 * The numbering is not contiguous (for example 27, 30 and 34 are unused).
 *
 * When this table changes, the following must be updated as well:
 *  - the method opcodeSymbol in WireFormat.cc;
 *  - WireFormatTest.cc's out-of-range test, if ILLEGAL_RPC_TYPE changed;
 *  - possibly the "callees" table in scripts/genLevels.py, which tracks
 *    which RPCs invoke which other RPCs.
 */
enum Opcode {
    PING                             = 7,
    PROXY_PING                       = 8,
    KILL                             = 9,
    CREATE_TABLE                     = 10,
    GET_TABLE_ID                     = 11,
    DROP_TABLE                       = 12,
    READ                             = 13,
    WRITE                            = 14,
    REMOVE                           = 15,
    ENLIST_SERVER                    = 16,
    GET_SERVER_LIST                  = 17,
    GET_TABLE_CONFIG                 = 18,
    RECOVER                          = 19,
    HINT_SERVER_CRASHED              = 20,
    RECOVERY_MASTER_FINISHED         = 21,
    ENUMERATE                        = 22,
    SET_MASTER_RECOVERY_INFO         = 23,
    FILL_WITH_TEST_DATA              = 24,
    MULTI_OP                         = 25,
    GET_METRICS                      = 26,
    BACKUP_FREE                      = 28,
    BACKUP_GETRECOVERYDATA           = 29,
    BACKUP_STARTREADINGDATA          = 31,
    BACKUP_WRITE                     = 32,
    BACKUP_RECOVERYCOMPLETE          = 33,
    UPDATE_SERVER_LIST               = 35,
    BACKUP_STARTPARTITION            = 36,
    DROP_TABLET_OWNERSHIP            = 39,
    TAKE_TABLET_OWNERSHIP            = 40,
    GET_HEAD_OF_LOG                  = 42,
    INCREMENT                        = 43,
    PREP_FOR_MIGRATION               = 44,
    RECEIVE_MIGRATION_DATA           = 45,
    REASSIGN_TABLET_OWNERSHIP        = 46,
    MIGRATE_TABLET                   = 47,
    IS_REPLICA_NEEDED                = 48,
    SPLIT_TABLET                     = 49,
    GET_SERVER_STATISTICS            = 50,
    SET_RUNTIME_OPTION               = 51,
    GET_SERVER_CONFIG                = 52,
    GET_BACKUP_CONFIG                = 53,
    GET_MASTER_CONFIG                = 55,
    GET_LOG_METRICS                  = 56,
    VERIFY_MEMBERSHIP                = 57,
    GET_RUNTIME_OPTION               = 58,
    GET_LEASE_INFO                   = 59,
    RENEW_LEASE                      = 60,
    SERVER_CONTROL                   = 61,
    SERVER_CONTROL_ALL               = 62,
    GET_SERVER_ID                    = 63,
    READ_KEYS_AND_VALUE              = 64,
    LOOKUP_INDEX_KEYS                = 65,
    READ_HASHES                      = 66,
    INSERT_INDEX_ENTRY               = 67,
    REMOVE_INDEX_ENTRY               = 68,
    CREATE_INDEX                     = 69,
    DROP_INDEX                       = 70,
    DROP_INDEXLET_OWNERSHIP          = 71,
    TAKE_INDEXLET_OWNERSHIP          = 72,
    PREP_FOR_INDEXLET_MIGRATION      = 73,
    SPLIT_AND_MIGRATE_INDEXLET       = 74,
    COORD_SPLIT_AND_MIGRATE_INDEXLET = 75,
    TX_DECISION                      = 76,
    TX_PREPARE                       = 77,
    TX_REQUEST_ABORT                 = 78,
    TX_HINT_FAILED                   = 79,
    ECHO                             = 80,
    ILLEGAL_RPC_TYPE                 = 81,  // 1 + the highest legitimate Opcode
};
/**
 * Header found at the very start of every RPC request (packed, 4 bytes).
 */
struct __attribute__((packed)) RequestCommon {
    /// Opcode of operation to be performed.
    uint16_t opcode;
    /// ServiceType to invoke for this rpc.
    uint16_t service;
};
/**
 * Header found at the very start of every RPC response (packed).
 */
struct __attribute__((packed)) ResponseCommon {
    /// Tells whether the operation succeeded and, if it did not, why.
    Status status;
};
/**
* This struct describes the packet structure of a write packet
* key differences include the addition of the field uint64_t timestamp;
* to help measure latencies and the ifdef elsedef structure to allow the user
* to set the packet size using commandline macros at compile time
* default size is 100 bytes
*/
struct Write {
static const Opcode opcode = WRITE;
static const ServiceType service = MASTER_SERVICE;
struct Request {
RequestCommon common;
uint64_t tableId;
uint64_t key;
uint64_t timestamp;
ClientLease lease;
uint64_t rpcId;
uint64_t ackId;
uint32_t length; // Includes the total size of the
// keysAndValue blob in bytes.These
// follow immediately after this header
RejectRules rejectRules;
uint8_t async;
uint8_t array[15];
#ifdef P1000
uint64_t array2[112];
uint8_t array3[4];
#endif
#ifdef P10000
uint64_t array2[112];
uint8_t array3[4];
uint64_t array4[1125];
#endif
} __attribute__((packed));
struct Response {
ResponseCommon common;
uint64_t tableId;
uint64_t key;
uint64_t version;
uint64_t timestamp;
uint64_t array[8];
#ifdef P1000
uint64_t array2[112];
uint8_t array3[4];
#endif
#ifdef P10000
uint64_t array2[112];
uint8_t array3[4];
uint64_t array4[1125];
#endif
} __attribute__((packed));
};
int main(int argc, char const *argv[])
{
int sock = 0, valread;
struct sockaddr_in serv_addr;
char *hello = "Master preprocessing done";
char buffer1[1024] = {0};
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
printf("\n Socket creation error \n");
return -1;
}
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
// Convert IPv4 and IPv6 addresses from text to binary form
if(inet_pton(AF_INET, "192.168.220.35", &serv_addr.sin_addr)<=0)
{
printf("\nInvalid address/ Address not supported \n");
return -1;
}
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
{
printf("\nConnection Failed \n");
return -1;
}
int sd, max_sd;
int csd, msd;
int max_clients = 1;
int activity;
int client_socket=0;
valread = read( sock , buffer1, 1024);
if(valread>0)
{
printf("%s\n",buffer1 );
memset(&buffer1[0], 0, sizeof(buffer1));
}
//setting up master's DRAM before responding to the synchronisation packet
for(int i = 0; i<2000000;i++)
{
master_data.insert({"1$"+to_string(i),1});
}
if( send(sock, hello, strlen(hello), 0) != strlen(hello) )
{
perror("send");
}
valread = read( sock , buffer1, 1024);
if(valread>0)
{
printf("%s\n",buffer1 );
memset(&buffer1[0], 0, sizeof(buffer1));
}
while(1)
{
//clear the socket set
FD_ZERO(&readfds);
//add master socket to set
FD_SET(sock, &readfds);
max_sd = sock;
//socket descriptor
sd = client_socket;
//if valid socket descriptor then add to read list
if(sd > 0)
FD_SET( sd , &readfds);
//highest file descriptor number, need it for the select function
if(sd > max_sd)
max_sd = sd;
//wait for an activity on one of the sockets , timeout is NULL ,
//so wait indefinitely
activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);
if ((activity < 0) && (errno!=EINTR))
{
printf("select error");
}
//If something happened on the master socket ,
if (FD_ISSET(sock, &readfds))
{
int n = 0;
static char buffer[2 * M] = {0};
n = read(sock, buffer, 2 * M);
if(n>0)
{
// The write request received is stored in this struct
struct Write::Request w1;
memcpy(&w1, buffer, sizeof(w1));
int a = w1.common.opcode;
D(printf("Opcode:%d\n",a ));
//comparing the opcode to ensure that only write packets are processed by the offload
//and all other packets pass through
if(a==WRITE)
{
struct Write::Request w;
memcpy(&w, buffer, sizeof(w));
//debugging
D(printf("tableId:%lu\n",w.tableId ));
D(printf("key:%lu\n",w.key));
string s ="";
//Create the key for the hashmap by concatenating
//the tableId and the key in the write packet
s=s+to_string(w.tableId)+"$"+to_string(w.key);
D(printf("HashKey:%s\n",s.c_str()));
//Master checks the reject rules to respond with failure
//if operation is atomic and there is a version number mismatch
if(w.rejectRules.versionNeGiven)
{
string s ="";
//Create the key for the hashmap by concatenating
//the tableId and the key in the write packet
s=s+to_string(w.tableId)+"$"+to_string(w.key);
D(cout<<s<<"\n");
if (master_data.find(s) != master_data.end())
{
D(std::cout << "Key found\n");
uint64_t curr_version_number = master_data[s];
//compare curr_version_number with version number in w
if(w.rejectRules.givenVersion!=curr_version_number)
{
D(std::cout << "version number doesn't match\n");
//raise failure response
struct Write::Response wr;
wr.common.status=STATUS_WRONG_VERSION;
wr.tableId=w.tableId;
wr.key=w.key;
wr.version=curr_version_number;
wr.timestamp=w.timestamp;
send(sock , &wr , sizeof(struct Write::Response), 0 );
D(printf("%s\n"," master raised failure response" ));
}
else
{
D(std::cout << "version number matches\n");
//update version number in master
master_data[s]=master_data[s]+(uint64_t)1;
D(printf("Updated master_data Key:%s Version:%lu\n",s.c_str(), master_data[s]));
//raise success response
struct Write::Response wr;
wr.common.status=STATUS_OK;
wr.tableId=w.tableId;
wr.key=w.key;
wr.version=master_data[s];
wr.timestamp=w.timestamp;
//storing paramters for debugging purposes
int stat = wr.common.status;
int table = wr.tableId;
int key = wr.key;
int vers = wr.version;
uint64_t ts = wr.timestamp;
D(printf("Master sent response Table:%d Key:%d Version:%d Status:%d Timestamp:%" PRIu64 "\n",table, key, vers, stat, ts ));
chrono::microseconds ms = chrono::duration_cast< chrono::microseconds >(chrono::system_clock::now().time_since_epoch());
uint64_t ts2 = ms.count();
D(printf("Master sent response at Timestamp:%" PRIu64 "\n", ts2 ));
// printf("Master sent hashmap Key:%s Version:%lu\n",s.c_str(), vers);
send(sock , &wr , sizeof(struct Write::Response), 0 );
D(printf("%s\n","master raised success response" ));
}
}
else
{
D(std::cout << "Key not found in master data\n");
struct Write::Response wr;
//raise failure response since object does not exist in master's DRAM
wr.common.status=STATUS_OBJECT_DOESNT_EXIST;
wr.version=1;
wr.tableId=w.tableId;
wr.key=w.key;
wr.timestamp=w.timestamp;
send(sock , &wr , sizeof(struct Write::Response), 0 );
D(printf("%s\n","object doesn't exist in master, sent auto failure" ));
}
}
else
{
//raise success response
struct Write::Response wr;
wr.common.status=STATUS_OK;
wr.version=1;
wr.tableId=w.tableId;
wr.key=w.key;
wr.timestamp=w.timestamp;
send(sock , &wr , sizeof(struct Write::Response), 0 );
D(printf("%s\n","non transaction packet, sent auto success\n" ));
}
}
//termination packet for debugging
else if(a==ILLEGAL_RPC_TYPE)
{
struct Write::Response wr;
wr.common.status=STATUS_MAX_VALUE;
wr.version=1;
wr.tableId=w1.tableId;
wr.key=w1.key;
wr.timestamp=w1.timestamp;
send(sock , &wr , sizeof(struct Write::Response), 0 );
chrono::microseconds ms = chrono::duration_cast< chrono::microseconds >(chrono::system_clock::now().time_since_epoch());
uint64_t ts2 = ms.count();
D(printf("Master sent response at Timestamp:%" PRIu64 "\n", ts2 ));
D(printf("%s\n","testing\n" ));
break;
}
}
}
}
return 0;
}
#include <unistd.h>
#include <stdio.h>
#include <iostream>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <thread>
#include <vector>
#include <mutex>
#include <chrono>
#include <random>
#include <unordered_map>
#include <inttypes.h>
#include <sys/time.h> //FD_SET, FD_ISSET, FD_ZERO macros
#define TRUE 1
#define FALSE 0
#define PORT 8085
#define MILLIS 1000
// Derived constants are parenthesised so each expands to a single operand;
// without the parentheses `x / M` would expand to `x / K * 1024`.
#define MICRO (MILLIS * 1000)
#define MAX_TARGET 4
#define K 1024
#define M (K * 1024)
#define G (M * 1024)
// in order to turn on debugging, compile with -DDEBUG flag
// D(x) evaluates x in debug builds and expands to an empty statement otherwise.
#ifdef DEBUG
#define D(x) (x)
#else
#define D(x) do{}while(0)
#endif
using namespace std;
fd_set readfds; // Descriptor set rebuilt before every select() call in main
unordered_map<string, uint64_t> m;
unordered_map<string, uint64_t> master_data; // This hashmap emulates the master's DRAM
/**
* This enum provides symbolic names for the status values returned
* to applications by RAMCloud operations.
*
* 0 means success; anything else means that an error occurred.
* Not all status values can be returned by all operations.
*/
enum Status {
    /// The operation succeeded; this is the default return value.
    STATUS_OK                       = 0,
    /// The server neither knows about nor is responsible for the tablet,
    /// although it may exist elsewhere in the system. Preferred over the
    /// definitive TABLE_DOESNT_EXIST whenever the tablet could live on
    /// another server.
    STATUS_UNKNOWN_TABLET           = 1,
    /// The table exists nowhere in the system. At present only the
    /// coordinator can say this with certainty.
    STATUS_TABLE_DOESNT_EXIST       = 2,
    /// The object exists nowhere in the system. There is no UNKNOWN_OBJECT
    /// counterpart: operations on objects in unknown tables are rejected
    /// with a table-related status, and a server that owns a tablet can say
    /// with certainty whether an object exists in it.
    STATUS_OBJECT_DOESNT_EXIST      = 3,
    STATUS_OBJECT_EXISTS            = 4,
    STATUS_WRONG_VERSION            = 5,
    STATUS_NO_TABLE_SPACE           = 6,
    STATUS_MESSAGE_TOO_SHORT        = 7,
    STATUS_UNIMPLEMENTED_REQUEST    = 8,
    STATUS_REQUEST_FORMAT_ERROR     = 9,
    STATUS_RESPONSE_FORMAT_ERROR    = 10,
    STATUS_COULDNT_CONNECT          = 11,
    STATUS_BACKUP_BAD_SEGMENT_ID    = 12,
    /// A backup cannot (or does not wish to) allocate space for a segment
    /// replica.
    STATUS_BACKUP_OPEN_REJECTED     = 13,
    STATUS_BACKUP_SEGMENT_OVERFLOW  = 14,
    STATUS_BACKUP_MALFORMED_SEGMENT = 15,
    STATUS_SEGMENT_RECOVERY_FAILED  = 16,
    /// The server cannot handle the request right now and the caller should
    /// retry later. Possible causes include (a) the server being out of
    /// resources to execute the request, or (b) the server being unsure of
    /// its authority to execute it and checking with the coordinator.
    STATUS_RETRY                    = 17,
    /// The RPC asked for an unknown service.
    STATUS_SERVICE_NOT_AVAILABLE    = 18,
    STATUS_TIMEOUT                  = 19,
    /// The target server never existed, has come and gone, or is currently
    /// crashed. It cannot answer RPCs and probably never will again (unless
    /// the id has not existed yet; a crashed server's id is never reused).
    STATUS_SERVER_NOT_UP            = 20,
    STATUS_INTERNAL_ERROR           = 21,
    /// The object chosen for an operation does not meet the associated
    /// requirements and is therefore invalid.
    STATUS_INVALID_OBJECT           = 22,
    /// The tablet does not exist. Relevant to split and merge operations
    /// on tablets.
    STATUS_TABLET_DOESNT_EXIST      = 23,
    /// Tablet partitioning was invoked without a preceding call to start
    /// reading replicas off of disk.
    STATUS_PARTITION_BEFORE_READ    = 24,
    /// The RPC was meant for one server id but was delivered to another.
    STATUS_WRONG_SERVER             = 25,
    /// The sender of the RPC is missing from the recipient's server list.
    /// Helps servers discover that they are zombies (considered dead by the
    /// rest of the cluster while believing themselves alive) so they stop
    /// servicing requests once other servers have taken over their tablets.
    /// See "Zombies" in designNotes.
    STATUS_CALLER_NOT_IN_CLUSTER    = 26,
    /// A single request was too big to fit in an rpc and so could not be
    /// sent or carried out.
    STATUS_REQUEST_TOO_LARGE        = 27,
    /// The server neither knows about nor is responsible for the indexlet,
    /// although it may exist elsewhere in the system. Preferred over the
    /// definitive INDEX_DOESNT_EXIST whenever the indexlet could live on
    /// another server.
    STATUS_UNKNOWN_INDEXLET         = 28,
    /// The index exists nowhere in the system. At present only the
    /// coordinator can say this with certainty.
    STATUS_INDEX_DOESNT_EXIST       = 29,
    /// A client-supplied parameter is invalid (for example, outside the
    /// allowed bounds).
    STATUS_INVALID_PARAMETER        = 30,
    /// The client already received this rpc's result, so running it again
    /// makes no sense. The most likely cause is a delayed network packet.
    STATUS_STALE_RPC                = 31,
    /// The client's lease has expired on the coordinator, so the master
    /// refused to execute the RPC.
    STATUS_EXPIRED_LEASE            = 32,
    /// The client attempted transaction operations after the transaction
    /// commit had already started.
    STATUS_TX_OP_AFTER_COMMIT       = 33,
    STATUS_MAX_VALUE                = 33,
    // Checklist when adding a new status value:
    //  * Set STATUS_MAX_VALUE to the largest defined status value and keep
    //    its definition last in the list. STATUS_MAX_VALUE is used
    //    primarily for testing.
    //  * Add entries to the "messages" and "symbols" tables in Status.cc.
    //  * Add a new exception class to ClientException.h.
    //  * Add a "case" to ClientException::throwException that maps the
    //    status value to its status-specific ClientException subclass.
    //  * In the Java bindings, add a static class for the exception to
    //    ClientException.java.
    //  * Add a case in ClientException.java that throws the exception for
    //    the new status.
    //  * Add the exception to the Status enum in Status.java, at the
    //    position that corresponds to its status code.
};
/**
* Used in conditional operations to specify conditions under
* which an operation should be aborted with an error.
*
* RejectRules are typically used to ensure consistency of updates;
* for example, we might want to update a value but only if it hasn't
* changed since the last time we read it. If a RejectRules object
* is passed to an operation, the operation will be aborted if any
* of the following conditions are satisfied:
* - doesntExist is nonzero and the object does not exist
* - exists is nonzero and the object does exist
* - versionLeGiven is nonzero and the object exists with a version
* less than or equal to givenVersion.
* - versionNeGiven is nonzero and the object exists with a version
* different from givenVersion.
*/
struct RejectRules {
    uint64_t givenVersion;   // Version that the two version rules compare against.
    uint8_t doesntExist;     // Nonzero: abort if the object does not exist.
    uint8_t exists;          // Nonzero: abort if the object does exist.
    uint8_t versionLeGiven;  // Nonzero: abort if the object's version <= givenVersion.
    uint8_t versionNeGiven;  // Nonzero: abort if the object's version != givenVersion.
} __attribute__((packed));
// This struct is embedded in packed wire headers, so an accidental layout
// change must fail the build instead of silently shifting later fields.
static_assert(sizeof(RejectRules) == 12, "RejectRules must occupy exactly 12 bytes on the wire");
/**
* Selects the particular service that will handle a given rpc.
* A rpc may only be sent to one particular service; see ServiceMask for
* situations dealing with sets of services on a particular Server.
*/
enum ServiceType {
    MASTER_SERVICE      = 0,
    BACKUP_SERVICE      = 1,
    COORDINATOR_SERVICE = 2,
    ADMIN_SERVICE       = 3,
    INVALID_SERVICE     = 4,  // Sentinel: one higher than the max.
};
/**
* Used in linearizable RPCs to check whether or not the RPC can be processed.
*/
struct ClientLease {
    /// A cluster unique id for a specific lease.
    /// 0 is used to indicate invalid or expired id.
    uint64_t leaseId;
    /// Cluster time after which the lease may have become invalid.
    uint64_t leaseExpiration;
    /// Cluster time when this lease information was provided by the
    /// coordinator.
    uint64_t timestamp;
} __attribute__((packed));
// This struct is embedded in packed wire headers, so an accidental layout
// change must fail the build instead of silently shifting later fields.
static_assert(sizeof(ClientLease) == 24, "ClientLease must occupy exactly 24 bytes on the wire");
/**
* This enum defines the choices for the "opcode" field in RPC
* headers, which selects a particular operation to perform. Each
* RAMCloud service implements a subset of these operations. If you
* change this table you must also reflect the changes in the following
* locations:
* - The method opcodeSymbol in WireFormat.cc.
* - WireFormatTest.cc's out-of-range test, if ILLEGAL_RPC_TYPE was changed.
* - You may need to modify the "callees" table in scripts/genLevels.py,
* which keeps track of which RPCs invoke which other RPCs.
*/
enum Opcode {
    PING                             = 7,
    PROXY_PING                       = 8,
    KILL                             = 9,
    CREATE_TABLE                     = 10,
    GET_TABLE_ID                     = 11,
    DROP_TABLE                       = 12,
    READ                             = 13,
    WRITE                            = 14,
    REMOVE                           = 15,
    ENLIST_SERVER                    = 16,
    GET_SERVER_LIST                  = 17,
    GET_TABLE_CONFIG                 = 18,
    RECOVER                          = 19,
    HINT_SERVER_CRASHED              = 20,
    RECOVERY_MASTER_FINISHED         = 21,
    ENUMERATE                        = 22,
    SET_MASTER_RECOVERY_INFO         = 23,
    FILL_WITH_TEST_DATA              = 24,
    MULTI_OP                         = 25,
    GET_METRICS                      = 26,
    // 27 is not assigned.
    BACKUP_FREE                      = 28,
    BACKUP_GETRECOVERYDATA           = 29,
    // 30 is not assigned.
    BACKUP_STARTREADINGDATA          = 31,
    BACKUP_WRITE                     = 32,
    BACKUP_RECOVERYCOMPLETE          = 33,
    // 34 is not assigned.
    UPDATE_SERVER_LIST               = 35,
    BACKUP_STARTPARTITION            = 36,
    // 37 and 38 are not assigned.
    DROP_TABLET_OWNERSHIP            = 39,
    TAKE_TABLET_OWNERSHIP            = 40,
    // 41 is not assigned.
    GET_HEAD_OF_LOG                  = 42,
    INCREMENT                        = 43,
    PREP_FOR_MIGRATION               = 44,
    RECEIVE_MIGRATION_DATA           = 45,
    REASSIGN_TABLET_OWNERSHIP        = 46,
    MIGRATE_TABLET                   = 47,
    IS_REPLICA_NEEDED                = 48,
    SPLIT_TABLET                     = 49,
    GET_SERVER_STATISTICS            = 50,
    SET_RUNTIME_OPTION               = 51,
    GET_SERVER_CONFIG                = 52,
    GET_BACKUP_CONFIG                = 53,
    // 54 is not assigned.
    GET_MASTER_CONFIG                = 55,
    GET_LOG_METRICS                  = 56,
    VERIFY_MEMBERSHIP                = 57,
    GET_RUNTIME_OPTION               = 58,
    GET_LEASE_INFO                   = 59,
    RENEW_LEASE                      = 60,
    SERVER_CONTROL                   = 61,
    SERVER_CONTROL_ALL               = 62,
    GET_SERVER_ID                    = 63,
    READ_KEYS_AND_VALUE              = 64,
    LOOKUP_INDEX_KEYS                = 65,
    READ_HASHES                      = 66,
    INSERT_INDEX_ENTRY               = 67,
    REMOVE_INDEX_ENTRY               = 68,
    CREATE_INDEX                     = 69,
    DROP_INDEX                       = 70,
    DROP_INDEXLET_OWNERSHIP          = 71,
    TAKE_INDEXLET_OWNERSHIP          = 72,
    PREP_FOR_INDEXLET_MIGRATION      = 73,
    SPLIT_AND_MIGRATE_INDEXLET       = 74,
    COORD_SPLIT_AND_MIGRATE_INDEXLET = 75,
    TX_DECISION                      = 76,
    TX_PREPARE                       = 77,
    TX_REQUEST_ABORT                 = 78,
    TX_HINT_FAILED                   = 79,
    ECHO                             = 80,
    // Sentinel: 1 + the highest legitimate Opcode.
    ILLEGAL_RPC_TYPE                 = 81,
};
/**
* Each RPC request starts with this structure.
*/
struct RequestCommon {
    uint16_t opcode;   /// Opcode of operation to be performed.
    uint16_t service;  /// ServiceType to invoke for this rpc.
} __attribute__((packed));
// Every request header starts with this struct, so its size is part of the
// wire format; fail the build if it ever changes.
static_assert(sizeof(RequestCommon) == 4, "RequestCommon must occupy exactly 4 bytes on the wire");
/**
* Each RPC response starts with this structure.
*/
struct ResponseCommon {
Status status; // Indicates whether the operation
// succeeded; if not, it explains why.
} __attribute__((packed));
/**
* This struct describes the layout of a write packet.
* Key differences include the added uint64_t timestamp field, which helps
* measure latencies, and the #ifdef blocks, which let the user choose the
* packet size at compile time with command-line macros (-DP1000 or -DP10000).
* The default size is 100 bytes.
*/
struct Write {
static const Opcode opcode = WRITE;
static const ServiceType service = MASTER_SERVICE;
struct Request {
RequestCommon common;
uint64_t tableId;
uint64_t key;
uint64_t timestamp;
ClientLease lease;
uint64_t rpcId;
uint64_t ackId;
uint32_t length; // Includes the total size of the
// keysAndValue blob in bytes.These
// follow immediately after this header
RejectRules rejectRules;
uint8_t async;
uint8_t array[15];
#ifdef P1000
uint64_t array2[112];
uint8_t array3[4];
#endif
#ifdef P10000
uint64_t array2[112];
uint8_t array3[4];
uint64_t array4[1125];
#endif
} __attribute__((packed));
struct Response {
ResponseCommon common;
uint64_t tableId;
uint64_t key;
uint64_t version;
uint64_t timestamp;
uint64_t array[8];
#ifdef P1000
uint64_t array2[112];
uint8_t array3[4];
#endif
#ifdef P10000
uint64_t array2[112];
uint8_t array3[4];
uint64_t array4[1125];
#endif
} __attribute__((packed));
};
int main(int argc, char const *argv[])
{
int server_fd, new_socket, valread;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
// char buffer[1024] = {0};
char *hello = "1st message from server";
char *both = "Both master and client are connected";
char *mprep = "Master preprocessing done";
// Creating socket file descriptor
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// Forcefully attaching socket to the port 8080
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT,
&opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons( PORT );
// Forcefully attaching socket to the port 8080
if (bind(server_fd, (struct sockaddr *)&address,
sizeof(address))<0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
if (listen(server_fd, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
char buffer1[1024] = {0};
int sd, max_sd;
int csd, msd;
int max_clients = 2;
int activity;
int client_socket[max_clients-1];
int both_connected = 0,first_time=0;
static char buffer[2 * M] = {0};
//initialise all client_socket[] to 0 so not checked
for (int i = 0; i < max_clients; i++)
{
client_socket[i] = 0;
}
for(int i = 0; i<2000000;i++)
{
master_data.insert({"1$"+to_string(i),1});
}
printf("%s\n",mprep );
while(TRUE)
{
//clear the socket set
FD_ZERO(&readfds);
//add master socket to set
FD_SET(server_fd, &readfds);
max_sd = server_fd;
//add child sockets to set
for ( int i = 0 ; i < max_clients ; i++)
{
//socket descriptor
sd = client_socket[i];
//if valid socket descriptor then add to read list
if(sd > 0)
FD_SET( sd , &readfds);
//highest file descriptor number, need it for the select function
if(sd > max_sd)
max_sd = sd;
}
//wait for an activity on one of the sockets , timeout is NULL ,
//so wait indefinitely
activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);
if ((activity < 0) && (errno!=EINTR))
{
printf("select error");
}
//If something happened on the master socket ,
//then its an incoming connection
if (FD_ISSET(server_fd, &readfds))
{
if ((new_socket = accept(server_fd,
(struct sockaddr *)&address, (socklen_t*)&addrlen))<0)
{
perror("accept");
exit(EXIT_FAILURE);
}
both_connected++;
//inform user of socket number - used in send and receive commands
printf("New connection , socket fd is %d , ip is : %s , port : %d \n" , new_socket , inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
//send new connection greeting message
if( send(new_socket, hello, strlen(hello), 0) != strlen(hello) )
{
perror("send");
}
puts("Welcome message sent successfully");
//add new socket to array of sockets
for (int i = 0; i < max_clients; i++)
{
//if position is empty
if( client_socket[i] == 0 )
{
client_socket[i] = new_socket;
printf("Adding to list of sockets as %d\n" , i);
break;
}
}
}
// This part of the code synchronises the programs running on the master and client
// it ensures that setup delays do not affect the experiments
if(both_connected==1 && first_time==0)
{
valread = read( client_socket[0] , buffer1, 1024);
if(valread>0)
{
printf("%s\n",buffer1 );
memset(&buffer1[0], 0, sizeof(buffer1));
}
if( send(client_socket[0], both, strlen(both), 0) != strlen(both) )
{
perror("send");
}
first_time++;
}
if(both_connected==1)
{
getpeername(client_socket[0] , (struct sockaddr*)&address , (socklen_t*)&addrlen);
csd = client_socket[0];
if (FD_ISSET( csd , &readfds))
{
//Check if it was for closing , and also read the
//incoming message
if ((valread = read( csd , buffer, 2 * M)) == 0)
{
//CLient disconnected, print details
getpeername(csd , (struct sockaddr*)&address , (socklen_t*)&addrlen);
printf("Host disconnected , ip %s , port %d \n" , inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
//Close the socket and mark as 0 in list for reuse
close( csd );
client_socket[0] = 0;
}
else
{
// The write request received is stored in this struct
struct Write::Request w1;
memcpy(&w1, buffer, sizeof(w1));
int a = w1.common.opcode;
D(printf("Opcode:%d\n",a ));
//comparing the opcode to ensure that only write packets are processed by the offload
//and all other packets pass through
if(a==WRITE)
{
struct Write::Request w;
memcpy(&w, buffer, sizeof(w));
//debugging
D(printf("tableId:%lu\n",w.tableId ));
D(printf("key:%lu\n",w.key));
string s ="";
//Create the key for the hashmap by concatenating
//the tableId and the key in the write packet
s=s+to_string(w.tableId)+"$"+to_string(w.key);
D(printf("HashKey:%s\n",s.c_str()));
//Master checks the reject rules to respond with failure
//if operation is atomic and there is a version number mismatch
if(w.rejectRules.versionNeGiven)
{
string s ="";
//Create the key for the hashmap by concatenating
//the tableId and the key in the write packet
s=s+to_string(w.tableId)+"$"+to_string(w.key);
D(cout<<s<<"\n");
if (master_data.find(s) != master_data.end())
{
D(std::cout << "Key found\n");
uint64_t curr_version_number = master_data[s];
//compare curr_version_number with version number in w
if(w.rejectRules.givenVersion!=curr_version_number)
{
D(std::cout << "version number doesn't match\n");
//raise failure response
struct Write::Response wr;
wr.common.status=STATUS_WRONG_VERSION;
wr.tableId=w.tableId;
wr.key=w.key;
wr.version=curr_version_number;
wr.timestamp=w.timestamp;
send(csd , &wr , sizeof(struct Write::Response), 0 );
D(printf("%s\n"," master raised failure response" ));
}
else
{
D(std::cout << "version number matches\n");
//update version number in master
master_data[s]=master_data[s]+(uint64_t)1;
D(printf("Updated master_data Key:%s Version:%lu\n",s.c_str(), master_data[s]));
//raise success response
struct Write::Response wr;
wr.common.status=STATUS_OK;
wr.tableId=w.tableId;
wr.key=w.key;
wr.version=master_data[s];
wr.timestamp=w.timestamp;
//storing paramters for debugging purposes
int stat = wr.common.status;
int table = wr.tableId;
int key = wr.key;
int vers = wr.version;
uint64_t ts = wr.timestamp;
D(printf("Master sent response Table:%d Key:%d Version:%d Status:%d Timestamp:%" PRIu64 "\n",table, key, vers, stat, ts ));
chrono::microseconds ms = chrono::duration_cast< chrono::microseconds >(chrono::system_clock::now().time_since_epoch());
uint64_t ts2 = ms.count();
D(printf("Master sent response at Timestamp:%" PRIu64 "\n", ts2 ));
// printf("Master sent hashmap Key:%s Version:%lu\n",s.c_str(), vers);
send(csd , &wr , sizeof(struct Write::Response), 0 );
D(printf("%s\n","master raised success response" ));
}
}
else
{
D(std::cout << "Key not found in master data\n");
struct Write::Response wr;
//raise failure response since object does not exist in master's DRAM
wr.common.status=STATUS_OBJECT_DOESNT_EXIST;
wr.version=1;
wr.tableId=w.tableId;
wr.key=w.key;
wr.timestamp=w.timestamp;
send(csd , &wr , sizeof(struct Write::Response), 0 );
D(printf("%s\n","object doesn't exist in master, sent auto failure" ));
}
}
else
{
//raise success response
struct Write::Response wr;
wr.common.status=STATUS_OK;
wr.version=1;
wr.tableId=w.tableId;
wr.key=w.key;
wr.timestamp=w.timestamp;
send(csd , &wr , sizeof(struct Write::Response), 0 );
D(printf("%s\n","non transaction packet, sent auto success\n" ));
}
}
}
}
}
}
return 0;
}
\ No newline at end of file
#include <unistd.h>
#include <stdio.h>
#include <iostream>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <thread>
#include <vector>
#include <mutex>
#include <chrono>
#include <random>
#include <unordered_map>
#include <inttypes.h>
#include <sys/time.h> //FD_SET, FD_ISSET, FD_ZERO macros
#define TRUE 1
#define FALSE 0
#define PORT 8085
#define MILLIS 1000
// Derived constants are parenthesised so each expands to a single operand;
// without the parentheses `x / M` would expand to `x / K * 1024`.
#define MICRO (MILLIS * 1000)
#define MAX_TARGET 4
#define K 1024
#define M (K * 1024)
#define G (M * 1024)
// in order to turn on debugging, compile with -DDEBUG flag
// D(x) evaluates x in debug builds and expands to an empty statement otherwise.
#ifdef DEBUG
#define D(x) (x)
#else
#define D(x) do{}while(0)
#endif
using namespace std;
fd_set readfds;
unordered_map<string, uint64_t> m; //The hashmap on NIC that stores the most recent version number of each object that is created/updated
/**
* This enum provides symbolic names for the status values returned
* to applications by RAMCloud operations.
*
* 0 means success; anything else means that an error occurred.
* Not all status values can be returned by all operations.
*/
enum Status {
    /// The operation succeeded; this is the default return value.
    STATUS_OK                       = 0,
    /// The server neither knows about nor is responsible for the tablet,
    /// although it may exist elsewhere in the system. Preferred over the
    /// definitive TABLE_DOESNT_EXIST whenever the tablet could live on
    /// another server.
    STATUS_UNKNOWN_TABLET           = 1,
    /// The table exists nowhere in the system. At present only the
    /// coordinator can say this with certainty.
    STATUS_TABLE_DOESNT_EXIST       = 2,
    /// The object exists nowhere in the system. There is no UNKNOWN_OBJECT
    /// counterpart: operations on objects in unknown tables are rejected
    /// with a table-related status, and a server that owns a tablet can say
    /// with certainty whether an object exists in it.
    STATUS_OBJECT_DOESNT_EXIST      = 3,
    STATUS_OBJECT_EXISTS            = 4,
    STATUS_WRONG_VERSION            = 5,
    STATUS_NO_TABLE_SPACE           = 6,
    STATUS_MESSAGE_TOO_SHORT        = 7,
    STATUS_UNIMPLEMENTED_REQUEST    = 8,
    STATUS_REQUEST_FORMAT_ERROR     = 9,
    STATUS_RESPONSE_FORMAT_ERROR    = 10,
    STATUS_COULDNT_CONNECT          = 11,
    STATUS_BACKUP_BAD_SEGMENT_ID    = 12,
    /// A backup cannot (or does not wish to) allocate space for a segment
    /// replica.
    STATUS_BACKUP_OPEN_REJECTED     = 13,
    STATUS_BACKUP_SEGMENT_OVERFLOW  = 14,
    STATUS_BACKUP_MALFORMED_SEGMENT = 15,
    STATUS_SEGMENT_RECOVERY_FAILED  = 16,
    /// The server cannot handle the request right now and the caller should
    /// retry later. Possible causes include (a) the server being out of
    /// resources to execute the request, or (b) the server being unsure of
    /// its authority to execute it and checking with the coordinator.
    STATUS_RETRY                    = 17,
    /// The RPC asked for an unknown service.
    STATUS_SERVICE_NOT_AVAILABLE    = 18,
    STATUS_TIMEOUT                  = 19,
    /// The target server never existed, has come and gone, or is currently
    /// crashed. It cannot answer RPCs and probably never will again (unless
    /// the id has not existed yet; a crashed server's id is never reused).
    STATUS_SERVER_NOT_UP            = 20,
    STATUS_INTERNAL_ERROR           = 21,
    /// The object chosen for an operation does not meet the associated
    /// requirements and is therefore invalid.
    STATUS_INVALID_OBJECT           = 22,
    /// The tablet does not exist. Relevant to split and merge operations
    /// on tablets.
    STATUS_TABLET_DOESNT_EXIST      = 23,
    /// Tablet partitioning was invoked without a preceding call to start
    /// reading replicas off of disk.
    STATUS_PARTITION_BEFORE_READ    = 24,
    /// The RPC was meant for one server id but was delivered to another.
    STATUS_WRONG_SERVER             = 25,
    /// The sender of the RPC is missing from the recipient's server list.
    /// Helps servers discover that they are zombies (considered dead by the
    /// rest of the cluster while believing themselves alive) so they stop
    /// servicing requests once other servers have taken over their tablets.
    /// See "Zombies" in designNotes.
    STATUS_CALLER_NOT_IN_CLUSTER    = 26,
    /// A single request was too big to fit in an rpc and so could not be
    /// sent or carried out.
    STATUS_REQUEST_TOO_LARGE        = 27,
    /// The server neither knows about nor is responsible for the indexlet,
    /// although it may exist elsewhere in the system. Preferred over the
    /// definitive INDEX_DOESNT_EXIST whenever the indexlet could live on
    /// another server.
    STATUS_UNKNOWN_INDEXLET         = 28,
    /// The index exists nowhere in the system. At present only the
    /// coordinator can say this with certainty.
    STATUS_INDEX_DOESNT_EXIST       = 29,
    /// A client-supplied parameter is invalid (for example, outside the
    /// allowed bounds).
    STATUS_INVALID_PARAMETER        = 30,
    /// The client already received this rpc's result, so running it again
    /// makes no sense. The most likely cause is a delayed network packet.
    STATUS_STALE_RPC                = 31,
    /// The client's lease has expired on the coordinator, so the master
    /// refused to execute the RPC.
    STATUS_EXPIRED_LEASE            = 32,
    /// The client attempted transaction operations after the transaction
    /// commit had already started.
    STATUS_TX_OP_AFTER_COMMIT       = 33,
    STATUS_MAX_VALUE                = 33,
    // Checklist when adding a new status value:
    //  * Set STATUS_MAX_VALUE to the largest defined status value and keep
    //    its definition last in the list. STATUS_MAX_VALUE is used
    //    primarily for testing.
    //  * Add entries to the "messages" and "symbols" tables in Status.cc.
    //  * Add a new exception class to ClientException.h.
    //  * Add a "case" to ClientException::throwException that maps the
    //    status value to its status-specific ClientException subclass.
    //  * In the Java bindings, add a static class for the exception to
    //    ClientException.java.
    //  * Add a case in ClientException.java that throws the exception for
    //    the new status.
    //  * Add the exception to the Status enum in Status.java, at the
    //    position that corresponds to its status code.
};
/**
 * Conditions under which a conditional operation is aborted with an error.
 *
 * RejectRules are typically used to keep updates consistent, e.g. "update
 * this value only if it has not changed since it was last read". When a
 * RejectRules object accompanies an operation, the operation is aborted as
 * soon as any one of the enabled (nonzero) rules below matches.
 *
 * The struct is packed so that it can be embedded in wire-format headers.
 */
struct __attribute__((packed)) RejectRules {
    /// Version that versionLeGiven / versionNeGiven compare against.
    uint64_t givenVersion;
    /// Nonzero: abort if the object does not exist.
    uint8_t doesntExist;
    /// Nonzero: abort if the object does exist.
    uint8_t exists;
    /// Nonzero: abort if the object exists with a version less than or
    /// equal to givenVersion.
    uint8_t versionLeGiven;
    /// Nonzero: abort if the object exists with a version different from
    /// givenVersion.
    uint8_t versionNeGiven;
};
/**
 * Identifies the single service that handles a given rpc.
 * An rpc may only be sent to one particular service; see ServiceMask for
 * situations dealing with sets of services on a particular Server.
 */
enum ServiceType {
    MASTER_SERVICE      = 0,
    BACKUP_SERVICE      = 1,
    COORDINATOR_SERVICE = 2,
    ADMIN_SERVICE       = 3,
    INVALID_SERVICE     = 4,  // One higher than the max.
};
/**
 * Lease information carried by linearizable RPCs; used to check whether or
 * not the RPC can be processed.
 */
struct __attribute__((packed)) ClientLease {
    /// A cluster unique id for a specific lease.
    /// 0 is used to indicate invalid or expired id.
    uint64_t leaseId;
    /// Cluster time after which the lease may have become invalid.
    uint64_t leaseExpiration;
    /// Cluster time when this lease information was provided by the
    /// coordinator.
    uint64_t timestamp;
};
/**
 * Values for the "opcode" field of an RPC header; each value selects one
 * operation. Each RAMCloud service implements a subset of these operations.
 *
 * The numbering is not contiguous: 27, 30, 34, 37, 38, 41 and 54 are not
 * assigned in this table.
 *
 * If you change this table you must also reflect the changes in the
 * following locations:
 *  - The method opcodeSymbol in WireFormat.cc.
 *  - WireFormatTest.cc's out-of-range test, if ILLEGAL_RPC_TYPE was changed.
 *  - You may need to modify the "callees" table in scripts/genLevels.py,
 *    which keeps track of which RPCs invoke which other RPCs.
 */
enum Opcode {
    PING                             = 7,
    PROXY_PING                       = 8,
    KILL                             = 9,
    CREATE_TABLE                     = 10,
    GET_TABLE_ID                     = 11,
    DROP_TABLE                       = 12,
    READ                             = 13,
    WRITE                            = 14,
    REMOVE                           = 15,
    ENLIST_SERVER                    = 16,
    GET_SERVER_LIST                  = 17,
    GET_TABLE_CONFIG                 = 18,
    RECOVER                          = 19,
    HINT_SERVER_CRASHED              = 20,
    RECOVERY_MASTER_FINISHED         = 21,
    ENUMERATE                        = 22,
    SET_MASTER_RECOVERY_INFO         = 23,
    FILL_WITH_TEST_DATA              = 24,
    MULTI_OP                         = 25,
    GET_METRICS                      = 26,
    BACKUP_FREE                      = 28,
    BACKUP_GETRECOVERYDATA           = 29,
    BACKUP_STARTREADINGDATA          = 31,
    BACKUP_WRITE                     = 32,
    BACKUP_RECOVERYCOMPLETE          = 33,
    UPDATE_SERVER_LIST               = 35,
    BACKUP_STARTPARTITION            = 36,
    DROP_TABLET_OWNERSHIP            = 39,
    TAKE_TABLET_OWNERSHIP            = 40,
    GET_HEAD_OF_LOG                  = 42,
    INCREMENT                        = 43,
    PREP_FOR_MIGRATION               = 44,
    RECEIVE_MIGRATION_DATA           = 45,
    REASSIGN_TABLET_OWNERSHIP        = 46,
    MIGRATE_TABLET                   = 47,
    IS_REPLICA_NEEDED                = 48,
    SPLIT_TABLET                     = 49,
    GET_SERVER_STATISTICS            = 50,
    SET_RUNTIME_OPTION               = 51,
    GET_SERVER_CONFIG                = 52,
    GET_BACKUP_CONFIG                = 53,
    GET_MASTER_CONFIG                = 55,
    GET_LOG_METRICS                  = 56,
    VERIFY_MEMBERSHIP                = 57,
    GET_RUNTIME_OPTION               = 58,
    GET_LEASE_INFO                   = 59,
    RENEW_LEASE                      = 60,
    SERVER_CONTROL                   = 61,
    SERVER_CONTROL_ALL               = 62,
    GET_SERVER_ID                    = 63,
    READ_KEYS_AND_VALUE              = 64,
    LOOKUP_INDEX_KEYS                = 65,
    READ_HASHES                      = 66,
    INSERT_INDEX_ENTRY               = 67,
    REMOVE_INDEX_ENTRY               = 68,
    CREATE_INDEX                     = 69,
    DROP_INDEX                       = 70,
    DROP_INDEXLET_OWNERSHIP          = 71,
    TAKE_INDEXLET_OWNERSHIP          = 72,
    PREP_FOR_INDEXLET_MIGRATION      = 73,
    SPLIT_AND_MIGRATE_INDEXLET       = 74,
    COORD_SPLIT_AND_MIGRATE_INDEXLET = 75,
    TX_DECISION                      = 76,
    TX_PREPARE                       = 77,
    TX_REQUEST_ABORT                 = 78,
    TX_HINT_FAILED                   = 79,
    ECHO                             = 80,
    ILLEGAL_RPC_TYPE                 = 81,  // 1 + the highest legitimate Opcode
};
/**
 * Header found at the very start of every RPC request.
 */
struct __attribute__((packed)) RequestCommon {
    /// Opcode of operation to be performed.
    uint16_t opcode;
    /// ServiceType to invoke for this rpc.
    uint16_t service;
};
/**
 * Header found at the very start of every RPC response.
 */
struct __attribute__((packed)) ResponseCommon {
    /// Indicates whether the operation succeeded; if not, it explains why.
    Status status;
};
/**
 * Wire format of a write RPC as used by this experiment.
 *
 * Compared with the stock layout, both the request and the response carry an
 * extra `uint64_t timestamp` field that is used to measure latencies, and
 * both are padded with filler arrays so that the packet size can be chosen
 * at compile time:
 *   - no macro : 100-byte packets (default)
 *   - -DP1000  : 1000-byte packets
 *   - -DP10000 : 10000-byte packets
 * (Response sizes assume the Status enum occupies 4 bytes.)
 *
 * P1000 and P10000 are mutually exclusive: each adds members with the same
 * names, so defining both is rejected up front with a clear diagnostic
 * instead of a cascade of duplicate-member errors.
 */
#if defined(P1000) && defined(P10000)
#error "Define at most one of P1000 and P10000"
#endif
struct Write {
    static const Opcode opcode = WRITE;
    static const ServiceType service = MASTER_SERVICE;

    struct Request {
        RequestCommon common;
        uint64_t tableId;
        uint64_t key;
        uint64_t timestamp;         // Sender timestamp, echoed back in the
                                    // response for latency measurement.
        ClientLease lease;
        uint64_t rpcId;
        uint64_t ackId;
        uint32_t length;            // Includes the total size of the
                                    // keysAndValue blob in bytes. These
                                    // follow immediately after this header.
        RejectRules rejectRules;
        uint8_t async;
        uint8_t array[15];          // Filler: brings the request to 100 bytes.
#ifdef P1000
        uint64_t array2[112];       // Filler: +900 bytes -> 1000-byte request.
        uint8_t array3[4];
#endif
#ifdef P10000
        uint64_t array2[112];       // Filler: +9900 bytes -> 10000-byte request.
        uint8_t array3[4];
        uint64_t array4[1125];
#endif
    } __attribute__((packed));

    struct Response {
        ResponseCommon common;
        uint64_t tableId;
        uint64_t key;
        uint64_t version;
        uint64_t timestamp;         // Copied from the matching request.
        uint64_t array[8];          // Filler: brings the response to 100 bytes.
#ifdef P1000
        uint64_t array2[112];       // Filler: +900 bytes -> 1000-byte response.
        uint8_t array3[4];
#endif
#ifdef P10000
        uint64_t array2[112];       // Filler: +9900 bytes -> 10000-byte response.
        uint8_t array3[4];
        uint64_t array4[1125];
#endif
    } __attribute__((packed));
};
int main(int argc, char const *argv[])
{
int server_fd, new_socket, valread;
struct sockaddr_in address;
int opt = 1;
int addrlen = sizeof(address);
char *hello = "1st message from server";
char *both = "Both master and client are connected";
// Creating socket file descriptor
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0)
{
perror("socket failed");
exit(EXIT_FAILURE);
}
// Forcefully attaching socket to the port 8080
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT,
&opt, sizeof(opt)))
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons( PORT );
// Forcefully attaching socket to the port 8080
if (bind(server_fd, (struct sockaddr *)&address,
sizeof(address))<0)
{
perror("bind failed");
exit(EXIT_FAILURE);
}
if (listen(server_fd, 3) < 0)
{
perror("listen");
exit(EXIT_FAILURE);
}
// we use select instead of creating multiple threads since continuous polling reduces latency
// as specified in the report
//The best way to optimize latency is to use a single thread for handling all requests. This approach eliminates synchronization between threads, and it also eliminates cache misses required to move data between cores in a multithreaded environment.
char buffer1[1024] = {0};
int sd, max_sd;
int csd, msd;// csd is the client socket descriptor, while msd is the master socket descriptor
int max_clients = 2;
int activity;
int client_socket[max_clients-1];
int both_connected = 0,first_time=0;
static char buffer[2 * M] = {0};
//initialise all client_socket[] to 0 so not checked
for (int i = 0; i < max_clients; i++)
{
client_socket[i] = 0;
}
//event loop
while(TRUE)
{
//clear the socket set
FD_ZERO(&readfds);
//add master socket to set
FD_SET(server_fd, &readfds);
max_sd = server_fd;
//add child sockets to set
for ( int i = 0 ; i < max_clients ; i++)
{
//socket descriptor
sd = client_socket[i];
//if valid socket descriptor then add to read list
if(sd > 0)
FD_SET( sd , &readfds);
//highest file descriptor number, need it for the select function
if(sd > max_sd)
max_sd = sd;
}
//wait for an activity on one of the sockets , timeout is NULL ,
//so wait indefinitely
activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);
if ((activity < 0) && (errno!=EINTR))
{
printf("select error");
}
//If something happened on the master socket ,
//then its an incoming connection
if (FD_ISSET(server_fd, &readfds))
{
if ((new_socket = accept(server_fd,
(struct sockaddr *)&address, (socklen_t*)&addrlen))<0)
{
perror("accept");
exit(EXIT_FAILURE);
}
both_connected++;
//inform user of socket number - used in send and receive commands
printf("New connection , socket fd is %d , ip is : %s , port : %d \n" , new_socket , inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
//send new connection greeting message
if( send(new_socket, hello, strlen(hello), 0) != strlen(hello) )
{
perror("send");
}
puts("Welcome message sent successfully");
//add new socket to array of sockets
for (int i = 0; i < max_clients; i++)
{
//if position is empty
if( client_socket[i] == 0 )
{
client_socket[i] = new_socket;
printf("Adding to list of sockets as %d\n" , i);
break;
}
}
}
// This part of the code synchronises the programs running on the master, client and the NIC
// it ensures that setup delays do not affect the experiments
if(both_connected==2 && first_time==0)
{
valread = read( client_socket[0] , buffer1, 1024);
if(valread>0)
{
printf("%s\n",buffer1 );
memset(&buffer1[0], 0, sizeof(buffer1));
}
valread = read( client_socket[1] , buffer1, 1024);
if(valread>0)
{
printf("%s\n",buffer1 );
memset(&buffer1[0], 0, sizeof(buffer1));
}
if( send(client_socket[0], both, strlen(both), 0) != strlen(both) )
{
perror("send");
}
if( send(client_socket[1], both, strlen(both), 0) != strlen(both) )
{
perror("send");
}
first_time++;
}
if(both_connected==2)
{
getpeername(client_socket[0] , (struct sockaddr*)&address , (socklen_t*)&addrlen);
// printf("Host disconnected , ip %s , port %d \n" , inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
// Optional code to fix csd and msd irrespective of the order of connection
// if(inet_ntoa(address.sin_addr)=="192.168.220.60")
// {
// csd = client_socket[1];
// msd = client_socket[0];
// }
// else
// {
// csd = client_socket[0];
// msd = client_socket[1];
// }
D(cout<<inet_ntoa(address.sin_addr));
csd = client_socket[0];
msd = client_socket[1];
if (FD_ISSET( csd , &readfds))
{
//Check if it was for closing , and also read the
//incoming message
if ((valread = read( csd , buffer, 2 * M)) == 0)
{
//Client disconnected, print details
getpeername(csd , (struct sockaddr*)&address , (socklen_t*)&addrlen);
printf("Host disconnected , ip %s , port %d \n" , inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
//Close the socket and mark as 0 in list for reuse
close( csd );
client_socket[0] = 0;
}
else
{ // The write request received is stored in this struct
struct Write::Request w1;
//copying the contents of the buffer into the struct
memcpy(&w1, buffer, sizeof(w1));
int a = w1.common.opcode;
D(printf("Opcode:%d\n",a ));
//comparing the opcode to ensure that only write packets are processed by the offload
//and all other packets pass through
if(a==WRITE)
{
struct Write::Request w;
memcpy(&w, buffer, sizeof(w));
//debugging
D(printf("tableId:%lu\n",w.tableId ));
D(printf("key:%lu\n",w.key));
//Check the reject rules to confirm
// whether the write packet was for an atomic operation
if(w.rejectRules.versionNeGiven)
{
string s ="";
//Create the key for the hashmap by concatenating
//the tableId and the key in the write packet
s=s+to_string(w.tableId)+"$"+to_string(w.key);
D(cout<<s<<"\n");
//Case 1: The object's version already exists in the hashamp
if (m.find(s) != m.end())
{
uint64_t curr_version_number = m[s];
//compare curr_version_number with version number in the write request
if(w.rejectRules.givenVersion!=curr_version_number)
{
//raise failure response
struct Write::Response wr;
wr.common.status=STATUS_WRONG_VERSION;
wr.version=curr_version_number;
wr.tableId=w.tableId;
wr.key=w.key;
wr.timestamp=w.timestamp;
//storing paramters for debugging purposes
int stat = wr.common.status;
uint64_t vers = wr.version;
int table = wr.tableId;
int key = wr.key;
uint64_t ts = wr.timestamp;
send(csd , &wr , sizeof(struct Write::Response), 0 );
D(printf("%s\n","raised failure response" ));
D(printf("Server sent response Table:%d Key:%d Version:%lu Status:%d Timestamp:%" PRIu64 "\n",table, key, vers, stat, ts ));
chrono::microseconds ms = chrono::duration_cast< chrono::microseconds >(chrono::system_clock::now().time_since_epoch());
uint64_t ts2 = ms.count();
D(printf("Server sent response at Timestamp:%" PRIu64 "\n", ts2 ));
}
else
{
//version number matches
//send the packet to master's ip
send(msd , &w , sizeof(struct Write::Request), 0 );
D(printf("%s\n","sent the packet to master's ip" ));
}
}
else
{
//Key not found in SmartNIC hashmap
//send the packet to master's ip
send(msd , &w , sizeof(struct Write::Request), 0 );
D(printf("%s\n","sent the packet to master's ip 2" ));
}
}
else
{
//The write does not belong to an atomic operation
//and hence bypasses the offload
send(msd , &w , sizeof(struct Write::Request), 0 );
D(printf("%s\n","sent the packet to master's ip 3" ));
}
}
else if(a==ILLEGAL_RPC_TYPE)
{
//ILLEGAL_RPC_TYPE is used for termination packets while debugging
send(msd , &w1 , sizeof(struct Write::Request), 0 );
D(printf("%s\n\n\n\n","this is the one" ));
break;
}
}
}
if (FD_ISSET( msd , &readfds))
{
//Check if it was for closing , and also read the
//incoming message
if ((valread = read( msd , buffer, 2 * M)) == 0)
{
//Master disconnected, print details
getpeername(msd , (struct sockaddr*)&address , (socklen_t*)&addrlen);
printf("Host disconnected , ip %s , port %d \n" , inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
//Close the socket and mark as 0 in list for reuse
close( msd );
client_socket[1] = 0;
}
else
{
// The write response received is stored in this struct
struct Write::Response wr;
memcpy(&wr, buffer, sizeof(wr));
//debugging
int stat = wr.common.status;
uint64_t vers = wr.version;
string s ="";
//Create the key for the hashmap by concatenating
//the tableId and the key in the write packet
s=s+to_string(wr.tableId)+"$"+to_string(wr.key);
//Insert or the assign the new version number in the hashmap
if (m.find(s) != m.end())
{
m.insert({s,vers});
}
else
{
m[s]=vers;
}
//debugging
int table = wr.tableId;
int key = wr.key;
int ts = wr.timestamp;
D(printf("Server received response Table:%d Key:%d Version:%lu Status:%d Timestamp:%d\n",table, key, vers, stat, ts ));
chrono::microseconds ms = chrono::duration_cast< chrono::microseconds >(chrono::system_clock::now().time_since_epoch());
uint64_t ts2 = ms.count();
D(printf("Server received response at Timestamp:%" PRIu64 "\n", ts2 ));
D(printf("Inserted into hashmap Key:%s Version:%lu\n",s.c_str(), vers));
D(printf("Status:%d\n",stat ));
if(stat==STATUS_WRONG_VERSION)
{
D(printf("Failure received:%d\n",stat ));
}
else if(stat==STATUS_OK)
{
D(printf("Success received:%d\n",stat ));
}
else
{
D(printf("Error received:%d\n",stat ));
}
//send response to client
send(csd , &wr , sizeof(struct Write::Response), 0 );
//debugging - termination case
if(stat==STATUS_MAX_VALUE)
{
// struct Write::Response wr;
// // wr.common.status=STATUS_WRONG_VERSION;
// wr.common.status=STATUS_MAX_VALUE+1;
// wr.version=1;
// wr.tableId=w.tableId;
// wr.key=w.key;
// wr.timestamp=w.timestamp;
// send(sock , &wr , sizeof(struct Write::Response), 0 );
chrono::microseconds ms = chrono::duration_cast< chrono::microseconds >(chrono::system_clock::now().time_since_epoch());
uint64_t ts2 = ms.count();
D(printf("Server sent response at Timestamp:%" PRIu64 "\n", ts2 ));
D(printf("%s\n\n\n\n","testing\n" ));
break;
}
}
}
}
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment