Commit b4722ef3 authored by Shah Rinku's avatar Shah Rinku

restructured git

parent 8f618288
File added
File added
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="GradleMigrationSettings" migrationVersion="1" />
<component name="GradleSettings">
<option name="linkedExternalProjectsSettings">
<GradleProjectSettings>
<option name="distributionType" value="DEFAULT_WRAPPED" />
<option name="externalProjectPath" value="$PROJECT_DIR$" />
<option name="modules">
<set>
<option value="$PROJECT_DIR$" />
<option value="$PROJECT_DIR$/app" />
</set>
</option>
</GradleProjectSettings>
</option>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="libraries-with-intellij-classes">
<option name="intellijApiContainingLibraries">
<list>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIU" />
<option name="groupId" value="com.jetbrains.intellij.idea" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIU" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIC" />
<option name="groupId" value="com.jetbrains.intellij.idea" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIC" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPY" />
<option name="groupId" value="com.jetbrains.intellij.pycharm" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPY" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPC" />
<option name="groupId" value="com.jetbrains.intellij.pycharm" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPC" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="clion" />
<option name="groupId" value="com.jetbrains.intellij.clion" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="clion" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="riderRD" />
<option name="groupId" value="com.jetbrains.intellij.rider" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="riderRD" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="goland" />
<option name="groupId" value="com.jetbrains.intellij.goland" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="goland" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
</list>
</option>
</component>
</project>
\ No newline at end of file
app.name="HPDOS-Client"
app.version="0.1.4"
app.concurrency=5
app.runtime=10
app.data_size=10
app.data_conversion_factor=B
app.private_ratio=0.8
app.cycle_create=1
app.cycle_read=4
app.cycle_update=3
app.cycle_delete=1
package HpdosClient.MessageFormat;
public class MessageConstants {
public static final int INIT_VERSION = 0;
public static final int INVALID_VERSION = -1;
public static final int METADATA_ACCESS_PRIVATE = 700;
public static final int METADATA_ACCESS_SHARED = 777;
// 00 to 99 - Client Server Interaction operations
public static final int PACKET_METADATA_REQUEST = 0;
public static final int PACKET_METADATA_RESPONSE = 1;
// Distinguishing ACK and NACK packets
public static final int STATUS_OK = 200;
public static final int STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS = 401;
public static final int STATUS_REPLICATION_FAILED = 402;
public static final int STATUS_SERVER_NOT_MASTER = 403;
public static final int STATUS_KEY_NOT_FOUND = 404;
public static final int STATUS_REPLICATION_TIMEOUT = 405;
public static final int STATUS_IO_WRITE_FAILED = 406;
public static final int STATUS_KEY_EXISTS = 407;
public static final int STATUS_UPDATE_VERSION_MISMATCH = 408;
// 100 to 199 - HPDOS System internal operations
public static final int MASTER_HEARTBEAT = 100;
public static final int METADATA_CREATE = 101;
public static final int METADATA_READ = 102;
public static final int METADATA_UPDATE = 103;
public static final int METADATA_DELETE = 104;
}
package HpdosClient.lib;
import hpdos.grpc.Ack;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
public class StorageModel {
private int version;
private int dataSize;
private final String key; // key is immutable
private long crc;
private int accessType;
private String value;
private final String owner; // ownership is immutable
public StorageModel(int version, int dataSize, String key, int accessType, String owner, long crc, String value) {
this.version = version;
this.dataSize = dataSize;
this.key = key;
this.accessType = accessType;
this.value = value;
this.owner = owner;
// calculate CRC32 based on the value field
this.crc = crc;
}
public void updateData(Ack ack) {
this.version = ack.getVersion();
this.dataSize = ack.getDataSize();
this.value = ack.getValue();
this.crc = ack.getCrc();
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getDataSize() {
return dataSize;
}
public void setDataSize(int dataSize) {
this.dataSize = dataSize;
}
public String getKey() {
return key;
}
public long getCrc() {
return crc;
}
public void setCrc(long crc) {
this.crc = crc;
}
public int getAccessType() {
return accessType;
}
public void setAccessType(int accessType) {
this.accessType = accessType;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getOwner() {
return owner;
}
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (obj.getClass() != this.getClass())
return false;
StorageModel model = (StorageModel) obj;
return this.getKey().equals(model.getKey());
}
@Override
public String toString() {
return "key: " + this.key + "\n" +
"dataSize: " + this.dataSize + "\n" +
"version: " + this.version + "\n" +
"owner: " + this.owner + "\n" +
"accessType: " + this.accessType + "\n" +
"crc: " + this.crc + "\n" +
"value: " + this.value;
}
}
package HpdosClient.lib;
import HpdosClient.ConfigConstants;
import HpdosClient.MessageFormat.MessageConstants;
import HpdosClient.MessageFormat.RequestBuilder;
import com.google.common.util.concurrent.ListenableFuture;
import hpdos.grpc.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.*;
public class StorageService {
private final String clientID;
private ManagedChannel masterChannel;
private ArrayList<ManagedChannel> channels;
private NetworkServiceGrpc.NetworkServiceFutureStub masterStub;
private final ArrayList<NetworkServiceGrpc.NetworkServiceFutureStub> stubs;
private List<Follower> replicaSet;
public StorageService(String clientID) {
this.clientID = clientID;
this.stubs = new ArrayList<>();
}
public void retrieveFollowerList() {
NetworkServiceGrpc.NetworkServiceBlockingStub stub = NetworkServiceGrpc.newBlockingStub(this.masterChannel);
ResponseList responseList = stub.getReadReplicaList(null);
this.replicaSet = responseList.getFollowerList();
// for (Follower follower: this.replicaSet) {
// System.out.println(follower);
// }
}
public void cleanup() {
for (ManagedChannel channel: this.channels)
channel.shutdown();
}
public void initStorage() {
masterChannel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext()
.build();
channels = new ArrayList<>();
channels.add(masterChannel);
masterStub = NetworkServiceGrpc.newFutureStub(this.masterChannel);
retrieveFollowerList();
for (Follower follower: replicaSet) {
ManagedChannel channel = ManagedChannelBuilder.
forAddress(follower.getIp(), follower.getPort())
.usePlaintext()
.build();
channels.add(channel);
NetworkServiceGrpc.NetworkServiceFutureStub stub = NetworkServiceGrpc.newFutureStub(channel);
stubs.add(stub);
}
}
public ListenableFuture<Packet> create(String key, String value, int accessType) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_CREATE,
0, value.length(), key,
0, accessType, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
return this.masterStub.createMetadata(packet);
}
public ListenableFuture<Packet> read(String key) {
int rnd = new Random().nextInt(this.stubs.size());
NetworkServiceGrpc.NetworkServiceFutureStub stub = this.stubs.get(rnd);
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_READ,
0, 0, key, 0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
return stub.readMetadata(packet);
}
public ListenableFuture<Packet> update(String key, String value, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_UPDATE,
version, value.length(), key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, value));
Packet packet = RequestBuilder.buildPacket(request);
return this.masterStub.updateMetadata(packet);
}
public ListenableFuture<Packet> delete(String key, int version) {
ArrayList<Request> request = new ArrayList<>();
request.add(RequestBuilder.buildRequest(MessageConstants.METADATA_DELETE,
version, 0, key,
0, MessageConstants.METADATA_ACCESS_PRIVATE, this.clientID, ""));
Packet packet = RequestBuilder.buildPacket(request);
return this.masterStub.deleteMetadata(packet);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="GradleMigrationSettings" migrationVersion="1" />
<component name="GradleSettings">
<option name="linkedExternalProjectsSettings">
<GradleProjectSettings>
<option name="distributionType" value="DEFAULT_WRAPPED" />
<option name="externalProjectPath" value="$PROJECT_DIR$" />
<option name="gradleJvm" value="#JAVA_HOME" />
<option name="modules">
<set>
<option value="$PROJECT_DIR$" />
<option value="$PROJECT_DIR$/app" />
</set>
</option>
</GradleProjectSettings>
</option>
</component>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="libraries-with-intellij-classes">
<option name="intellijApiContainingLibraries">
<list>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIU" />
<option name="groupId" value="com.jetbrains.intellij.idea" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIU" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIC" />
<option name="groupId" value="com.jetbrains.intellij.idea" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="ideaIC" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPY" />
<option name="groupId" value="com.jetbrains.intellij.pycharm" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPY" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPC" />
<option name="groupId" value="com.jetbrains.intellij.pycharm" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="pycharmPC" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="clion" />
<option name="groupId" value="com.jetbrains.intellij.clion" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="clion" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="riderRD" />
<option name="groupId" value="com.jetbrains.intellij.rider" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="riderRD" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="goland" />
<option name="groupId" value="com.jetbrains.intellij.goland" />
</LibraryCoordinatesState>
<LibraryCoordinatesState>
<option name="artifactId" value="goland" />
<option name="groupId" value="com.jetbrains" />
</LibraryCoordinatesState>
</list>
</option>
</component>
</project>
\ No newline at end of file
app.name=HPDOS-Server
app.version=0.1.6
app.REPLICATION_TYPE=async
app.REPLICATOR_THREAD_POOL_SIZE=140
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Java application project to get you started.
* For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle
* User Manual available at https://docs.gradle.org/6.8.3/userguide/building_java_projects.html
*/
plugins {
// Apply the application plugin to add support for building a CLI application in Java.
id 'application'
id "com.google.protobuf" version "0.8.15"
id "java"
}
repositories {
// Use JCenter for resolving dependencies.
jcenter()
mavenCentral()
}
dependencies {
// Use JUnit test framework.
testImplementation 'junit:junit:4.13'
// This dependency is used by the application.
implementation 'com.google.guava:guava:29.0-jre'
implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.15.6'
implementation 'io.grpc:grpc-netty-shaded:1.36.0'
implementation 'io.grpc:grpc-protobuf:1.36.0'
implementation 'io.grpc:grpc-stub:1.36.0'
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+
}
application {
// Define the main class for the application.
mainClass = 'hpdos.MetadataServer'
}
sourceSets {
src {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
}
protobuf {
protoc {
artifact = 'com.google.protobuf:protoc:3.15.6'
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:1.36.0"
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}
\ No newline at end of file
package hpdos;
public class ConfigConstants {
public static final String HOST = "localhost";
public static final int PORT = 8080;
public static final int HEARTBEAT_INTERVAL = 500;
public static final int REPLICATION_TIMEOUT = 5000;
public static final String replicationAsync = "async";
// Backend types 300-399
public static final int BACKEND_IN_MEMORY = 300;
public static final int LSM_BACKEND = 301;
public static final int BINARY_TREE_BACKEND = 302;
public static final int REPLICATOR_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
}
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package hpdos;
import hpdos.grpc.HeartbeatRequest;
import hpdos.grpc.HeartbeatResponse;
import hpdos.grpc.HeartbeatServiceGrpc;
import hpdos.handler.HeartbeatHandler;
import hpdos.handler.IOHandler;
import hpdos.handler.NetworkHandler;
import hpdos.handler.ReplicateHandler;
import hpdos.lib.*;
import hpdos.message.MessageConstants;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
public class MetadataServer {
private Server server;
private final HashMap<String, MasterFollower> followers;
private final String serverID;
private boolean isMaster = false;
private int port;
private final String host;
private IOHandler ioHandler;
private ReplicationService replicationService;
private final Properties properties;
public MetadataServer(String propertiesFile) throws IOException {
this.followers = new HashMap<>();
this.serverID = UUID.randomUUID().toString();
this.port = 10000 + (int)(Math.random() * 40000);
this.host = "localhost";
this.replicationService = null;
this.properties = new Properties();
InputStream inputStream = new FileInputStream(propertiesFile);
properties.load(inputStream);
}
public String getGreeting() {
return "Hello World!";
}
public boolean startMasterServices() {
this.server = ServerBuilder.forPort(ConfigConstants.PORT)
.addService(new NetworkHandler(this.ioHandler, this.replicationService))
.addService(new HeartbeatHandler(followers, serverID))
.build();
try {
this.server.start();
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public boolean startFollowerServices() {
// In case of followers NetworkHandler will only serve read requests for private metadata
// Other network handler services will fail
this.server = ServerBuilder.forPort(port)
.addService(new NetworkHandler(this.ioHandler, this.replicationService))
.addService(new ReplicateHandler(this.ioHandler))
.build();
try {
this.server.start();
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public void blockForIO(Server server) {
if (server == null)
return;
try {
server.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void announceToMaster() {
final ManagedChannel channel = ManagedChannelBuilder.
forAddress(ConfigConstants.HOST, ConfigConstants.PORT)
.usePlaintext()
.build();
HeartbeatServiceGrpc.HeartbeatServiceBlockingStub stub = HeartbeatServiceGrpc.newBlockingStub(channel);
HeartbeatRequest.Builder heartbeat = HeartbeatRequest.newBuilder();
heartbeat.setPacketType(MessageConstants.PACKET_METADATA_REQUEST);
heartbeat.setOperationType(MessageConstants.MASTER_HEARTBEAT);
heartbeat.setFollowerID(this.serverID);
heartbeat.setIp(this.host);
heartbeat.setPort(this.port);
try {
HeartbeatResponse response = stub.heartbeat(heartbeat.build());
} catch (Exception e) {
System.out.println("Metadata Master not found, electing self as master");
this.isMaster = true;
this.port = ConfigConstants.PORT;
}
channel.shutdown();
}
private void startHeartbeatService() {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
announceToMaster();
}
}, ConfigConstants.HEARTBEAT_INTERVAL, ConfigConstants.HEARTBEAT_INTERVAL);
}
private IOHandler initStorage(int backend) {
StorageService storageService;
switch (backend) {
case ConfigConstants.BACKEND_IN_MEMORY:
storageService = new MemoryStorageService();
break;
case ConfigConstants.BINARY_TREE_BACKEND:
case ConfigConstants.LSM_BACKEND:
default: return null;
}
return new IOHandler(storageService, this.isMaster);
}
private void cleanup () {
if (this.replicationService != null) {
try {
this.replicationService.cleanup();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MetadataServer metaDataServer = null;
try {
metaDataServer = new MetadataServer(args[0]);
} catch (IOException e) {
System.out.println("Config file missing or unreadable");
System.exit(-1);
}
System.out.println(metaDataServer.getGreeting());
System.out.println("Starting Metadata service");
System.out.println("Initialising storage service");
// Check ConfigConstants for available storage options
metaDataServer.ioHandler = metaDataServer.initStorage(ConfigConstants.BACKEND_IN_MEMORY);
if (metaDataServer.ioHandler == null) {
System.out.println("Storage server initialisation error");
return;
}
System.out.println("Searching for MetadataMaster");
metaDataServer.announceToMaster();
if (metaDataServer.isMaster) {
metaDataServer.replicationService = new InlineReplicationService(
metaDataServer.followers, metaDataServer.properties);
System.out.println("Started master replication module");
boolean status = metaDataServer.startMasterServices();
System.out.println("Master ID: " + metaDataServer.serverID);
if (status) {
System.out.println("Starting Master MetadataServer at: " + ConfigConstants.PORT);
metaDataServer.blockForIO(metaDataServer.server);
}
else
System.out.println("Failed to create server");
} else {
System.out.println("Master Node detected.\nStarting heartbeat service");
metaDataServer.startHeartbeatService();
System.out.println("Starting replication service");
boolean status = metaDataServer.startFollowerServices();
if (status) {
System.out.println("Starting Follower MetadataServer at: " + metaDataServer.port);
metaDataServer.blockForIO(metaDataServer.server);
}
else
System.out.println("Failed to create server");
}
// Adding a shutdown hook
Runtime current = Runtime.getRuntime();
current.addShutdownHook(new Thread(metaDataServer::cleanup));
}
}
package hpdos.handler;
import hpdos.grpc.Ack;
import hpdos.grpc.Nack;
import hpdos.grpc.Request;
import hpdos.grpc.Response;
import hpdos.lib.StorageModel;
import hpdos.lib.StorageService;
import hpdos.lib.StoredModel;
import hpdos.message.MessageConstants;
import hpdos.message.ResponseBuilder;
public class IOHandler {
StorageService storageService;
private final boolean isMaster;
public IOHandler(StorageService storageService, boolean isMaster) {
this.storageService = storageService;
this.isMaster = isMaster;
}
public Response create(Request request) {
Ack ack = null; Nack nack = null;
StorageModel blob = new StorageModel(
MessageConstants.INIT_VERSION,
request.getDataSize(),
request.getKey(),
request.getAccessType(),
request.getClientID(),
request.getValue());
StoredModel storedData = storageService.create(request.getKey(), blob);
if (storedData.getStatus() == MessageConstants.STATUS_OK) {
ack = ResponseBuilder.buildAck(storedData.getData().getVersion(),
storedData.getData().getDataSize(),
storedData.getData().getKey(),
storedData.getData().getCrc(),
storedData.getData().getValue());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, MessageConstants.STATUS_OK,
ack, null);
}
else {
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_CREATE, storedData.getStatus(),
null, nack);
}
}
public Response update(Request request) {
Ack ack = null; Nack nack = null;
StorageModel blob = new StorageModel(
request.getVersion(),
request.getDataSize(),
request.getKey(),
request.getAccessType(),
request.getClientID(),
request.getValue());
StoredModel storedData = storageService.update(request.getKey(), blob);
if (storedData.getStatus() == MessageConstants.STATUS_OK) {
ack = ResponseBuilder.buildAck(storedData.getData().getVersion(),
storedData.getData().getDataSize(),
storedData.getData().getKey(),
storedData.getData().getCrc(),
storedData.getData().getValue());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_UPDATE, MessageConstants.STATUS_OK,
ack, null);
}
else {
int version = (storedData.getStatus() == MessageConstants.STATUS_UPDATE_VERSION_MISMATCH)?
storedData.getData().getVersion(): MessageConstants.INVALID_VERSION;
nack = ResponseBuilder.buildNack(version, request.getKey());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_UPDATE, storedData.getStatus(),
null, nack);
}
}
/**
* Removes the object from the KV store if the request version matches with the current version of data
* @param request Models to be deleted. Only the key, clientID and version needs to be valid
* @return previous value if successful else null with status
*/
public Response delete(Request request) {
Ack ack = null; Nack nack = null;
StorageModel blob = new StorageModel(
request.getVersion(),
request.getDataSize(),
request.getKey(),
request.getAccessType(),
request.getClientID(),
request.getValue());
StoredModel storedData = storageService.delete(request.getKey(), blob);
if (storedData.getStatus() == MessageConstants.STATUS_OK) {
ack = ResponseBuilder.buildAck(storedData.getData().getVersion(),
storedData.getData().getDataSize(),
storedData.getData().getKey(),
storedData.getData().getCrc(),
storedData.getData().getValue());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_DELETE, MessageConstants.STATUS_OK,
ack, null);
}
else {
int version = (storedData.getStatus() == MessageConstants.STATUS_UPDATE_VERSION_MISMATCH)?
storedData.getData().getVersion(): MessageConstants.INVALID_VERSION;
nack = ResponseBuilder.buildNack(version, request.getKey());
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_DELETE, storedData.getStatus(),
null, nack);
}
}
public Response read(Request request) {
Ack ack = null; Nack nack = null;
int status;
// Only Metadata Master can serve shared object lookup requests
if (!isMaster && request.getAccessType() == MessageConstants.METADATA_ACCESS_SHARED) {
status = MessageConstants.STATUS_SERVER_NOT_MASTER;
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey());
} else {
StoredModel readData = storageService.readByKey(request.getKey());
if (readData.getStatus() == MessageConstants.STATUS_KEY_NOT_FOUND) {
status = MessageConstants.STATUS_KEY_NOT_FOUND;
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey());
} else if (readData.getData() != null &&
readData.getData().getAccessType() == MessageConstants.METADATA_ACCESS_PRIVATE &&
!request.getClientID().equals(readData.getData().getOwner())) {
status = MessageConstants.STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS;
nack = ResponseBuilder.buildNack(MessageConstants.INVALID_VERSION,
request.getKey());
} else {
status = MessageConstants.STATUS_OK;
ack = ResponseBuilder.buildAck(readData.getData().getVersion(),
readData.getData().getDataSize(),
readData.getData().getKey(),
readData.getData().getCrc(),
readData.getData().getValue());
}
}
return ResponseBuilder.buildResponsePacket(
MessageConstants.METADATA_READ, status,
ack, nack);
}
}
package hpdos.handler;
import com.google.common.base.Stopwatch;
import hpdos.grpc.*;
import hpdos.message.MessageConstants;
import hpdos.message.ResponseBuilder;
import io.grpc.stub.StreamObserver;
public class ReplicateHandler extends ReplicationServiceGrpc.ReplicationServiceImplBase {
private final IOHandler ioHandler;
public ReplicateHandler(IOHandler ioHandler) {
this.ioHandler = ioHandler;
}
@Override
public void replicateMetadata(ReplicationRequest replicationRequest, StreamObserver<ReplicationResponse> responseObserver) {
ReplicationResponse responsePacket = null;
// System.out.println("Replication request " + replicationRequest);
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
if (replicationRequest.getPacketType() == MessageConstants.PACKET_METADATA_REQUEST) {
if (replicationRequest.getRequestList().size() > 1)
System.out.println("REQUEST BATCHING");
for (Request request: replicationRequest.getRequestList()) {
if (request.getOperationType() == MessageConstants.METADATA_CREATE) {
Response response = ioHandler.create(request);
responsePacket = ResponseBuilder.buildReplicationResponse(response);
} else if (request.getOperationType() == MessageConstants.METADATA_UPDATE) {
Response response = ioHandler.update(request);
responsePacket = ResponseBuilder.buildReplicationResponse(response);
} else if (request.getOperationType() == MessageConstants.METADATA_DELETE) {
Response response = ioHandler.delete(request);
responsePacket = ResponseBuilder.buildReplicationResponse(response);
}
// System.out.println(responsePacket);
responseObserver.onNext(responsePacket);
}
responseObserver.onCompleted();
}
stopwatch.stop();
// System.out.println("Replicate Handler " + stopwatch);
}
}
package hpdos.lib;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import hpdos.ConfigConstants;
import hpdos.grpc.ReplicationRequest;
import hpdos.grpc.ReplicationResponse;
import hpdos.grpc.ReplicationServiceGrpc;
import hpdos.grpc.Response;
import hpdos.message.MessageConstants;
import hpdos.message.ResponseBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.*;
public class InlineReplicationService implements ReplicationService {
private final HashMap<String, MasterFollower> followers;
private final HashMap<String, ManagedChannel> channels;
private ExecutorService executorService;
private final boolean isReplicationAsync;
public InlineReplicationService(HashMap<String, MasterFollower> followers, Properties properties) {
this.followers = followers;
this.channels = new HashMap<>();
String replicationType = (String) properties.get("app.REPLICATION_TYPE");
this.isReplicationAsync = replicationType.equals(ConfigConstants.replicationAsync);
if (!this.isReplicationAsync) {
int replicationThreadPoolSize;
if (properties.containsKey("app.REPLICATOR_THREAD_POOL_SIZE")) {
replicationThreadPoolSize = Integer.parseInt((String)
properties.get("app.REPLICATOR_THREAD_POOL_SIZE"));
} else {
replicationThreadPoolSize = ConfigConstants.REPLICATOR_THREAD_POOL_SIZE;
}
this.executorService = Executors.newFixedThreadPool(replicationThreadPoolSize);
System.out.println("Creating synchronous replication pool. Pool size: " + replicationThreadPoolSize);
} else {
System.out.println("Replication to be handled using asynchronous handlers");
}
}
@Override
public void cleanup() throws InterruptedException {
for (ManagedChannel channel: channels.values())
channel.shutdown();
if (this.executorService != null) {
executorService.shutdown();
boolean status = executorService.
awaitTermination(MessageConstants.STATUS_REPLICATION_TIMEOUT, TimeUnit.MILLISECONDS);
if (!status)
executorService.shutdownNow();
}
}
private void establishChannels() {
for (String followerID: followers.keySet()) {
if (!channels.containsKey(followerID)) {
MasterFollower follower = followers.get(followerID);
ManagedChannel channel = ManagedChannelBuilder
.forAddress(follower.getIp(), follower.getPort())
.usePlaintext()
.build();
channels.put(follower.getFollowerID(), channel);
}
}
}
@Override
public ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws ExecutionException, InterruptedException {
if (this.isReplicationAsync) {
return replicateMetadataAsync(replicationRequest);
} else {
return replicateMetadataSync(replicationRequest);
}
}
public ReplicationResponse replicateMetadataSync(ReplicationRequest replicationRequest)
throws InterruptedException, ExecutionException {
Set<Callable<ReplicationResponse>> callables = new HashSet<>();
// new followers have joined or left.
// TODO: Handle follower leaving scenario
// FIXME: fix edge case where equal number of followers leaving and joining won't trigger connection reestablishment
if (channels.size() != followers.size()) {
establishChannels();
}
for (ManagedChannel channel: channels.values()) {
callables.add(() -> {
ReplicationServiceGrpc.ReplicationServiceBlockingStub stub =
ReplicationServiceGrpc.newBlockingStub(channel);
return stub.replicateMetadata(replicationRequest);
});
}
List<Future<ReplicationResponse>> futures = executorService.invokeAll(callables);
HashMap<String, Response> responseHashMap = new HashMap<>();
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
for (Future<ReplicationResponse> future: futures) {
ReplicationResponse replicationResponse;
replicationResponse = future.get(); //TODO: Add and handle get timeout. Timeout related constants already added
for (Response receivedResponse: replicationResponse.getResponseList()) {
int status = receivedResponse.getStatus();
if (status == MessageConstants.STATUS_OK) {
if (!responseHashMap.containsKey(receivedResponse.getAck().getKey()))
responseHashMap.put(receivedResponse.getAck().getKey(), receivedResponse);
} else {
responseHashMap.put(receivedResponse.getNack().getKey(), receivedResponse);
}
}
}
stopwatch.stop();
// System.out.println("replicateMetadata ReplicationService " + stopwatch);
return ResponseBuilder.
buildReplicationResponse(new ArrayList<>(responseHashMap.values()));
}
/**
* Incomplete implementation do not use
* @param replicationRequest replication request sent
* @return replication response
*/
public ReplicationResponse replicateMetadataAsync(ReplicationRequest replicationRequest) throws InterruptedException {
CountDownLatch replicationWaiter = new CountDownLatch(this.followers.size());
HashMap<String, Response> responseHashMap = new HashMap<>();
if (channels.size() != followers.size()) {
establishChannels();
}
for (ManagedChannel channel: channels.values()) {
ReplicationServiceGrpc.ReplicationServiceFutureStub stub =
ReplicationServiceGrpc.newFutureStub(channel);
ListenableFuture<ReplicationResponse> res = stub.replicateMetadata(replicationRequest);
Futures.addCallback(res, new FutureCallback<>() {
@Override
public void onSuccess(ReplicationResponse result) {
for (Response receivedResponse: result.getResponseList()) {
int status = receivedResponse.getStatus();
if (status == MessageConstants.STATUS_OK) {
if (!responseHashMap.containsKey(receivedResponse.getAck().getKey()))
responseHashMap.put(receivedResponse.getAck().getKey(), receivedResponse);
} else {
responseHashMap.put(receivedResponse.getNack().getKey(), receivedResponse);
}
}
replicationWaiter.countDown();
}
@Override
public void onFailure(@Nonnull Throwable t) {
replicationWaiter.countDown();
}
}, MoreExecutors.directExecutor());
}
Stopwatch stopwatch = Stopwatch.createUnstarted();
stopwatch.start();
replicationWaiter.await();
stopwatch.stop();
// System.out.println("replication latency" + stopwatch);
return ResponseBuilder.
buildReplicationResponse(new ArrayList<>(responseHashMap.values()));
}
@Override
public HashMap<String, MasterFollower> getFollowers() {
return followers;
}
}
package hpdos.lib;
import hpdos.message.MessageConstants;
import java.util.concurrent.ConcurrentHashMap;
public class MemoryStorageService implements StorageService {
private final ConcurrentHashMap<String, StorageModel> memoryKVStore;
public MemoryStorageService() {
this.memoryKVStore = new ConcurrentHashMap<>();
}
@Override
public StoredModel create(String key, StorageModel value) {
try {
if (memoryKVStore.putIfAbsent(key, value) != null)
return new StoredModel(null, MessageConstants.STATUS_KEY_EXISTS);
return new StoredModel(value, MessageConstants.STATUS_OK);
} catch (Exception e) {
return null;
}
}
@Override
public StoredModel readByKey(String key) {
if (!memoryKVStore.containsKey(key))
return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
return new StoredModel(memoryKVStore.get(key), MessageConstants.STATUS_OK);
}
@Override
public StoredModel update(String key, StorageModel value) {
StorageModel previousValue = memoryKVStore.get(key);
if (previousValue == null)
return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
else if (previousValue.getAccessType() == MessageConstants.METADATA_ACCESS_PRIVATE
&& !previousValue.getOwner().equals(value.getOwner()))
return new StoredModel(null, MessageConstants.STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS);
// the request will have the old version number of the data to be inserted, we only update the data
// with a new version number if at the time of update the two versions match
// else we reject the update
StorageModel newValue = value.createVersionUpdatedModel();
boolean status = memoryKVStore.replace(key, value, newValue); // the equals method is overridden in Storage model
// to equate two objects based on their version numbers
if (status)
return new StoredModel(newValue, MessageConstants.STATUS_OK);
else
return new StoredModel(previousValue, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
}
@Override
public StoredModel delete(String key, StorageModel value) {
StorageModel previousValue = memoryKVStore.get(key);
if (previousValue == null)
return new StoredModel(null, MessageConstants.STATUS_KEY_NOT_FOUND);
else if (previousValue.getAccessType() == MessageConstants.METADATA_ACCESS_PRIVATE
&& !previousValue.getOwner().equals(value.getOwner()))
return new StoredModel(null, MessageConstants.STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS);
boolean status = memoryKVStore.remove(key, value);
if (status)
return new StoredModel(previousValue, MessageConstants.STATUS_OK);
else
return new StoredModel(previousValue, MessageConstants.STATUS_UPDATE_VERSION_MISMATCH);
}
}
package hpdos.lib;
import hpdos.grpc.ReplicationRequest;
import hpdos.grpc.ReplicationResponse;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
public interface ReplicationService {
abstract ReplicationResponse replicateMetadata(ReplicationRequest replicationRequest) throws InterruptedException, ExecutionException;
abstract void cleanup() throws InterruptedException;
abstract HashMap<String, MasterFollower> getFollowers();
}
package hpdos.lib;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
public class StorageModel {
private int version;
private int dataSize;
private final String key; // key is immutable
private long crc;
private int accessType;
private String value;
private final String owner; // ownership is immutable
public StorageModel(int version, int dataSize, String key, int accessType, String owner, String value) {
this.version = version;
this.dataSize = dataSize;
this.key = key;
this.accessType = accessType;
this.value = value;
this.owner = owner;
// calculate CRC32 based on the value field
byte[] bytes = value.getBytes();
Checksum checksum = new CRC32();
checksum.update(bytes, 0, bytes.length);
this.crc = checksum.getValue();
}
public StorageModel createVersionUpdatedModel() {
int updatedVersion = (this.getVersion() + 1) % Integer.MAX_VALUE; // version wraps around
return new StorageModel(updatedVersion, this.getDataSize(),
this.getKey(), this.getAccessType(), this.getOwner(), this.getValue());
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getDataSize() {
return dataSize;
}
public void setDataSize(int dataSize) {
this.dataSize = dataSize;
}
public String getKey() {
return key;
}
public long getCrc() {
return crc;
}
public void setCrc(long crc) {
this.crc = crc;
}
public int getAccessType() {
return accessType;
}
public void setAccessType(int accessType) {
this.accessType = accessType;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public String getOwner() {
return owner;
}
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (obj.getClass() != this.getClass())
return false;
StorageModel model = (StorageModel) obj;
return this.getVersion() == model.getVersion();
}
}
package hpdos.message;
public class MessageConstants {
public static final int INIT_VERSION = 1;
public static final int INVALID_VERSION = -1;
public static final int METADATA_ACCESS_PRIVATE = 700;
public static final int METADATA_ACCESS_SHARED = 777;
// 00 to 99 - Client Server Interaction operations
public static final int PACKET_METADATA_REQUEST = 0;
public static final int PACKET_METADATA_RESPONSE = 1;
// Distinguishing ACK and NACK packets
public static final int STATUS_OK = 200;
public static final int STATUS_UNAUTHORIZED_PRIVATE_KEY_ACCESS = 401;
public static final int STATUS_REPLICATION_FAILED = 402;
public static final int STATUS_SERVER_NOT_MASTER = 403;
public static final int STATUS_KEY_NOT_FOUND = 404;
public static final int STATUS_REPLICATION_TIMEOUT = 405;
public static final int STATUS_IO_WRITE_FAILED = 406;
public static final int STATUS_KEY_EXISTS = 407;
public static final int STATUS_UPDATE_VERSION_MISMATCH = 408;
// 100 to 199 - HPDOS System internal operations
public static final int MASTER_HEARTBEAT = 100;
public static final int METADATA_CREATE = 101;
public static final int METADATA_READ = 102;
public static final int METADATA_UPDATE = 103;
public static final int METADATA_DELETE = 104;
}
package hpdos.message;
import hpdos.grpc.*;
import java.util.ArrayList;
public class ResponseBuilder {
public static Ack buildAck(int version, int dataSize, String key, long crc, String value) {
Ack.Builder ack = Ack.newBuilder();
ack.setKey(key);
ack.setVersion(version);
ack.setDataSize(dataSize);
ack.setCrc(crc);
ack.setValue(value);
return ack.build();
}
public static Nack buildNack(int version, String key) {
Nack.Builder nack = Nack.newBuilder();
nack.setKey(key);
nack.setVersion(version);
return nack.build();
}
public static Response buildResponsePacket(int operationType, int status,
Ack ack,
Nack nack) {
Response.Builder response = Response.newBuilder();
response.setOperationType(operationType);
response.setStatus(status);
if (ack != null)
response.setAck(ack);
else
response.clearAck();
if (nack != null)
response.setNack(nack);
else
response.clearNack();
return response.build();
}
public static Packet buildPacket(ArrayList<Response> responses) {
Packet.Builder packet = Packet.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
packet.addAllResponse(responses);
return packet.build();
}
public static Packet buildPacket(Response response) {
Packet.Builder packet = Packet.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
packet.addResponse(response);
return packet.build();
}
public static ReplicationResponse buildReplicationResponse(Response response) {
ReplicationResponse.Builder packet = ReplicationResponse.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
packet.addResponse(response);
return packet.build();
}
public static ReplicationResponse buildReplicationResponse(ArrayList<Response> response) {
ReplicationResponse.Builder packet = ReplicationResponse.newBuilder();
packet.setPacketType(MessageConstants.PACKET_METADATA_RESPONSE);
if (response.size() > 1)
System.out.println("RESPONSE BATCHING" + response.size());
try {
for (Response subResponse: response)
if (subResponse != null)
packet.addResponse(subResponse);
} catch (Exception e) {
System.out.println(response);
System.exit(0);
}
return packet.build();
}
}
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package hpdos;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.*;
public class AppTest {
@Test public void testAppHasAGreeting() throws IOException {
MetadataServer classUnderTest = new MetadataServer("dummypath");
assertNotNull("app should have a greeting", classUnderTest.getGreeting());
}
}
#### Metadata server with kernel stack and LSM-tree backend
- Install gradle version
https://linuxize.com/post/how-to-install-gradle-on-ubuntu-20-04/
- Steps to compile/run metadata application
-- Server
--- ./gradlew build
--- ./gradlew installDist
--- ./app/build/install/app/bin/app
-- Client
--- ./gradlew build
--- ./gradlew installDist
--- ./app/build/install/app/bin/app <absolute-path>/app.config
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment