Project Xanadu

A hybrid execution environment for serverless workloads

Execution environments for FaaS platforms are typically container based, but this creates problems: containers are, on the one hand, not fully secure and, on the other, not capable of high performance. This project explores a hybrid execution environment that caters to different workload needs.

Buy one for yourself!

Clone using "git clone --recursive https://git.cse.iitb.ac.in/synerg/xanadu"

Architecture

Xanadu is divided into two extremely loosely coupled modules, the Dispatch System (DS) and the Resource System (RS) module. The RS looks after resource provisioning and consolidation at the host level while the DS looks after handling user requests and executing those requests at the requisite isolation level using resources provided by the RS. A loose architecture diagram of Xanadu is given below.

Xanadu Architecture

Inter-component Communication Interface

The Dispatch Manager (DM) sends a request to the Resource Manager (RM), detailing what resources it needs, on the Kafka topic REQUEST_DM_2_RM.

{
    "resource_id": "unique-transaction-id",
    "timestamp" : "time(2) compatible timestamp",
    "memory": 1024, // in MiB
    ... // Any other resources
}

The RM finds a list of nodes that satisfy those resource demands and returns it to the DM on the Kafka topic RESPONSE_RM_2_DM. Format:

{
    "resource_id": "unique-transaction-id",
    "timestamp" : "time(2) compatible timestamp",
    "grunts": [
           { "node_id": "unique-machine-id", "port": "port address" }, ...
     ] // List of machine IDs
}

Once the runtime entity has been launched (or the launch has failed), the Executor sends back a status message on the LOG_COMMON topic.

{
    "message_type" : "deployment",
    "reason": "launch"/"terminate",
    "node_id" : "unique-machine-id",
    "entity_id" : "handle for the actual container/VM/etc.",
    "entity_type" : "docker/libvirt/etc.",
    "resource_id": "logical-entity-id",
    "function_id": "unique-function-id",
    "timestamp" : "time(2) compatible timestamp"
}

Instrumentation data is also sent on the LOG_COMMON topic. This data is sent from whichever part of the pipeline has access to the relevant information, and whoever needs the data is allowed to read it. Each message is required to have at least three fields: node_id, resource_id and function_id.

{ // From Docker
    "message_type" : "instrumentation",
    "node_id" : "unique-machine-id",
    "resource_id": "logical-entity-id",
    "function_id": "unique-function-id",
    "entity_id" : "handle for the actual container/VM/etc.",
    "entity_type" : "docker/libvirt/etc.",
    "timestamp" : "time(2) compatible timestamp",
    "data" : {
           "cpu_percentage" : 0.43,
           "memory_percentage" : 4.32,
           "memory_used" : "42MB",
           "memory_usable" : "543MB",
           "disk_read" : "4GB",
           "disk_written" : "43GB",
           "net_upload" : "32MB",
           "net_download" : "54MB"
    }
}

{ // Example message from Executor
    "message_type" : "instrumentation",
    "node_id" : "unique-machine-id",
    "resource_id": "logical-entity-id",
    "function_id": "unique-function-id",
    "timestamp" : "time(2) compatible timestamp",
    "cpu" : 343, // in MHz
    "memory": 534, // in MiB
    "network": 234 // in KBps
}

{ // Example message from reverse proxy
    "message_type" : "instrumentation",
    "node_id" : "unique-machine-id",
    "resource_id": "logical-entity-id",
    "function_id": "unique-function-id",
    "timestamp" : "time(2) compatible timestamp",
    "average_fn_time" : 23 // in ms
}

{ // Example message from dispatch manager
    "message_type" : "instrumentation",
    "node_id" : "unique-machine-id",
    "resource_id": "logical-entity-id",
    "function_id": "unique-function-id",
    "timestamp" : "time(2) compatible timestamp",
    "coldstart_time" : 23 // in ms
}
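Since every message on LOG_COMMON must carry the three mandatory fields, a consumer could guard against malformed messages with a check like the following (a sketch; the function name is an assumption):

```javascript
// Every instrumentation message on LOG_COMMON must carry at least
// node_id, resource_id and function_id (see above). This sketch
// rejects messages missing any of them.
const REQUIRED_FIELDS = ["node_id", "resource_id", "function_id"];

function isValidInstrumentation(msg) {
  return msg !== null &&
         typeof msg === "object" &&
         REQUIRED_FIELDS.every(f => typeof msg[f] === "string" && msg[f].length > 0);
}

console.log(isValidInstrumentation({
  message_type: "instrumentation",
  node_id: "node-1",
  resource_id: "res-1",
  function_id: "fn-1"
})); // true
console.log(isValidInstrumentation({ node_id: "node-1" })); // false
```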

Dispatch System (DS)

The DS is divided into two submodules: the Dispatch Manager and the Dispatch Daemon. The Dispatch Manager runs on the Master node, while the Dispatch Daemon runs on each Worker node. When a request arrives at the Dispatch Manager, it queries the RM for resources and, on receiving them, requests the Dispatch Daemon to execute the function on the specified worker node.

Directory Structure

.
├── constants.json
├── dispatch_daemon
│   ├── config.json
│   ├── execute.js
│   ├── index.js
│   ├── isolate.js
│   ├── lib.js
│   ├── local_repository
│   ├── package.json
│   └── package-lock.json
├── dispatch_manager
│   ├── index.js
│   ├── isolate.js
│   ├── lib.js
│   ├── package.json
│   ├── package-lock.json
│   └── repository
└── package-lock.json

System Requirements

  • Node.js (10.x and above)
  • g++
  • build-essential
  • Docker
  • Java
  • Apache Kafka (configured to allow auto-deletion and auto-creation of topics)
  • CouchDB (needs a database named serverless)

Starting the server

After Node.js has been installed:

  • Install the dependencies: execute npm install from within the project folder, the dispatch_manager folder, and the dispatch_daemon folder on each node the daemon is going to run on.
  • Modify the constants.json file as required to reflect the IP addresses of the dispatch manager and other services.
  • Update the speculation-related parameters as required in the constants.json file.
  • Create a folder named local_repository in the dispatch_daemon folder on all the nodes it runs on.
  • Start the Dispatch Manager and Dispatch Daemon servers with node index.js, and the Resource Manager with node rm.js, in separate terminals.

Internal Communication Interfaces

Dispatch Manager (DM)

Internally, the DS uses Apache Kafka for interaction between the Dispatch Manager (DM) and the Dispatch Agents; the messages are in JSON format.

Every Dispatch Agent listens on a topic which is its own UID (currently its primary IP address), while the Dispatch Manager listens on the topics "response" and "heartbeat".

  • Request Message: When a request is received at the Dispatch Manager, it directs a Dispatch Agent to start a worker environment by sending a message on the chosen Worker's ID topic. Format:
{ type: "execute",
  function_id: "onetime unique ID",
  runtime: "isolation runtime",
  functionHash: "hash of the function to be run" }
  • Response Message: In response, the worker executes the function, pulling resources from the central repository as required, and sends back a response. Format:
{ status: 'success',
  result: 'result of the execution',
  function_id: 'onetime unique ID' }
  • Heartbeat Message: The Dispatch Daemons also publish a periodic heartbeat message back to the Dispatch Manager as a liveness test. Format:
{ address: 'UID of the worker' }
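The Dispatch Manager's consumer side can be sketched as routing on the topic name; the data structures and handler wiring below are illustrative assumptions, not the actual implementation:

```javascript
// Sketch of the Dispatch Manager's consumer-side routing. The DM listens
// on "response" and "heartbeat"; worker liveness is tracked per UID.
const lastSeen = new Map(); // worker UID -> last heartbeat time (ms)
const results = [];         // completed executions (stand-in for real handling)

function onMessage(topic, message) {
  const msg = JSON.parse(message);
  if (topic === "heartbeat") {
    lastSeen.set(msg.address, Date.now()); // liveness bookkeeping
  } else if (topic === "response") {
    results.push(msg);                     // { status, result, function_id }
  }
}

onMessage("heartbeat", JSON.stringify({ address: "10.0.0.5" }));
onMessage("response", JSON.stringify({
  status: "success", result: "42", function_id: "abc123"
}));
```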

Interaction API

The platform works via an HTTP API, divided into two parts:

  • Deploy: The deploy interface is used to upload the function file and store it on the server, and also to set up containers and VM images. An example curl command:
curl -X POST \
  http://localhost:8080/serverless/deploy \
  -H 'cache-control: no-cache' \
  -H 'content-type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW' \
  -F runtime=container \
  -F serverless=@/home/nilanjan/Desktop/serverless/hybrid/test/script.js

The POST request contains two parameters: 1. runtime, which specifies the runtime to use, viz. isolate, process, container or virtual machine, and 2. serverless, which sends the serverless function as a file via multipart/form-data. On successful deployment the API returns a function key which is to be used for function execution.

  • Execute: To execute the submitted function, we use the execute API. An example curl command:
curl -X POST \
  http://localhost:8080/serverless/execute/761eec785d64451203293427bea5c7ad \
  -H 'cache-control: no-cache' \
  -H 'content-type: multipart/form-data; boundary=----WebKitFormBoundary7MA4YWxkTrZu0gW' \
  -F runtime=process

The API takes as a route parameter the key returned by the deploy API, and a runtime parameter specifying the runtime to be used.

Resource System

Dependencies

  1. clang: version 9.0
  2. librdkafka: version 0.11.6

Internal Messages

Upon being launched, each Resource Daemon (RD) sends a JOIN message to the RM on the Kafka topic JOIN_RD_2_RM.

{
    "node_id": "unique-machine-id",
    "timestamp" : "iso-8601-timestamp"
}

After this, RDs send a heartbeat message to the RM periodically on topic HEARTBEAT_RD_2_RM. These messages contain the current state of all the resources being tracked by RDs on each machine. This data is cached by the RM.

{
    "node_id": "unique-machine-id",
    "timestamp" : "time(2) compatible timestamp",
    "memory": 1024, // in MiB
    ... // Any other resources
}

If the RM receives a heartbeat from an RD which has not joined before (e.g., due to the RM having died or some older messages being stuck in Kafka), it sends a rejoin command to the RD on the topic REJOIN_RM_2_RD.

{
    "node_id": "unique-machine-id",
    "timestamp" : "iso-8601-timestamp"
}

Also, if the RM doesn't receive heartbeats from some RD for a certain amount of time, it assumes that the RD is dead and frees its resources. If the RD sends a heartbeat after this, the rejoin message is sent.
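The dead-RD detection described above can be sketched as a periodic sweep over cached heartbeat timestamps (the 10-second timeout and the function name are assumptions, not values from the implementation):

```javascript
// Sketch of the RM's liveness sweep: any RD whose last heartbeat is older
// than HEARTBEAT_TIMEOUT_MS is presumed dead and its resources are freed.
const HEARTBEAT_TIMEOUT_MS = 10_000; // illustrative value

function findDeadNodes(lastHeartbeat, now = Date.now()) {
  const dead = [];
  for (const [nodeId, ts] of lastHeartbeat) {
    if (now - ts > HEARTBEAT_TIMEOUT_MS) dead.push(nodeId);
  }
  return dead;
}

const beats = new Map([["node-a", Date.now()], ["node-b", Date.now() - 60_000]]);
console.log(findDeadNodes(beats)); // [ 'node-b' ]
```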

The RM, upon receiving the request from the DM, checks its local cache to find suitable machines. If it finds any, it sends a message back to the DM on the topic RESPONSE_RM_2_DM.

{
    "resource_id": "unique-transaction-id",
    "timestamp" : "time(2) compatible timestamp",
//    "port": 2343 --- NOT IMPLEMENTED YET
    "nodes": ["a", "b", ...] // List of unique machine IDs
}
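The cache lookup can be sketched as keeping the latest heartbeat per node and filtering on the requested resources (the field-by-field >= matching rule and all names here are assumptions for illustration):

```javascript
// Sketch of the RM's local-cache lookup: keep the latest heartbeat per
// node and select nodes whose reported resources cover the request.
const cache = new Map(); // node_id -> latest heartbeat message

function onHeartbeat(hb) {
  cache.set(hb.node_id, hb); // newer heartbeat replaces the cached state
}

function findSuitableNodes(request) {
  // Compare every field of the request except the bookkeeping ones.
  const wanted = Object.entries(request).filter(([k]) =>
    k !== "resource_id" && k !== "timestamp");
  return [...cache.values()]
    .filter(hb => wanted.every(([k, v]) => hb[k] >= v))
    .map(hb => hb.node_id);
}

onHeartbeat({ node_id: "a", timestamp: 1, memory: 2048 });
onHeartbeat({ node_id: "b", timestamp: 1, memory: 512 });
console.log(findSuitableNodes({ resource_id: "t1", timestamp: 2, memory: 1024 })); // [ 'a' ]
```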

If, on the other hand, the RM can't find any such machine in its cache, it sends a message to all the RDs requesting their current status. This message is posted on the topic REQUEST_RM_2_RD. Format:

{
    "resource_id": "unique-transaction-id",
    "timestamp" : "time(2) compatible timestamp",
    "memory": 1024, // in MiB
    ... // Any other resources
}

The RDs receive this message and reply with whether or not they satisfy the constraints on the topic RESPONSE_RD_2_RM.

{
    "node_id": "unique-machine-id",
    "resource_id": "unique-transaction-id",
    "timestamp" : "time(2) compatible timestamp",
    "success" : 0/1 // 0 = fail, 1 = success
}

The RM waits a certain amount of time for the RDs to reply; it then sends the list of RDs that replied affirmatively to the DM on the topic RESPONSE_RM_2_DM, as described above.
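The timed wait can be sketched as a promise that collects affirmative RD replies until a deadline (the default window, the subscribe-callback shape, and all names are assumptions; a real implementation would subscribe to the Kafka topic instead of the in-memory stand-in):

```javascript
// Sketch of the RM's timed collection of RD replies: gather node_ids of
// affirmative responses (success === 1) until the deadline, then resolve.
function collectAffirmative(subscribe, windowMs = 2000) {
  return new Promise(resolve => {
    const nodes = [];
    const stop = subscribe(msg => {
      if (msg.success === 1) nodes.push(msg.node_id);
    });
    setTimeout(() => { stop(); resolve(nodes); }, windowMs);
  });
}

// Tiny in-memory stand-in for the Kafka consumer, for demonstration only:
function fakeSubscribe(onMsg) {
  setTimeout(() => onMsg({ node_id: "a", success: 1 }), 10);
  setTimeout(() => onMsg({ node_id: "b", success: 0 }), 20);
  return () => {}; // unsubscribe no-op
}

collectAffirmative(fakeSubscribe, 100).then(nodes => console.log(nodes)); // [ 'a' ]
```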