index.js 5.1 KB
Newer Older
1
'use strict';
nilanjandaw's avatar
nilanjandaw committed
2
const constants = require(".././constants.json")
3
const secrets = require('./secrets.json')
4
const config = require('./config.json')
5
const libSupport = require('./lib')
6 7
libSupport.updateConfig()
const node_id = config.id
8
const {spawn } = require('child_process')
9
const execute = require('./execute')
10
const fs = require('fs')
11 12 13
const fetch = require('node-fetch');

// Base URL of the CouchDB database holding function metadata; a function hash
// is appended to it to address a single document.
// The credentials are percent-encoded because they sit in the userinfo part of
// the URL: a raw '@', ':', '/' or '#' in the username or password would
// otherwise change how the URL is parsed.
const couchdbCredentials = `${encodeURIComponent(secrets.couchdb_username)}:${encodeURIComponent(secrets.couchdb_password)}`;
const metadataDB = `http://${couchdbCredentials}@${constants.couchdb_host}/${constants.function_db_name}/`;
15

16
const kafka = require('kafka-node')
17
const logger = libSupport.logger
Nilanjan Daw's avatar
Nilanjan Daw committed
18

19
// Directory (next to this script) where downloaded function files are stored.
const local_repository = `${__dirname}/local_repository/`;
// Base URL of the master node, built from the shared constants file.
const host_url = `http://${constants.master_address}:${constants.master_port}`;
21

22
// Kafka plumbing shared by the rest of this module: one client connection,
// one producer bound to it, and the Consumer constructor for later use.
const { Producer, Consumer } = kafka;
const client = new kafka.KafkaClient({
    kafkaHost: constants.network.external.kafka_host,
    autoConnect: true
});
const producer = new Producer(client);

// Create this node's own topic, then listen on it for deployment requests.
libSupport.makeTopic(node_id).then(() => {
    logger.info("node topic created");
    // The third argument is an options object (not an array of objects).
    const consumer = new Consumer(client,
        [
            { topic: node_id, partition: 0, offset: 0 }
        ],
        { autoCommit: true });

    // An 'error' event without a listener is thrown by the emitter, which
    // would take the whole daemon down on any consumer failure.
    consumer.on('error', function (err) {
        logger.error("kafka consumer error: " + err);
    });

    consumer.on('message', function (message) {
        logger.info(message);

        let request;
        try {
            request = JSON.parse(message.value);
        } catch (err) {
            // A malformed payload must not crash the message handler.
            logger.error("ignoring malformed message: " + err);
            return;
        }
        if (!request || request.type !== "execute") {
            return;
        }

        const { runtime, functionHash, resource_id, port } = request;

        /**
         * Download necessary files (function file) and Start resource deployment
         */
        logger.info("Received Deployment request for resource_id: " + resource_id);
        // NOTE(review): the HTTP status of the metadata lookup is not checked,
        // so an error document is treated like metadata - confirm intended.
        fetch(metadataDB + functionHash)
            .then(res => res.json())
            .then(json => {
                console.log("metadata", json);

                const source = host_url + "/repository/" + functionHash + ".js";
                const target = local_repository + functionHash + ".js";
                // Returned so that a failed download (or a throw while
                // starting the worker) reaches the catch below instead of
                // becoming an unhandled rejection.
                return libSupport.download(source, target).then(() => {
                    const metadata = {
                        resource_id, functionHash,
                        runtime, port,
                        resources: {
                            memory: json.memory
                        }
                    };
                    startWorker(local_repository, producer, metadata);
                });
            })
            .catch(err => {
                logger.error("something went wrong" + err);
            });
    });
}).catch(err => {
    logger.error("failed to set up node topic consumer: " + err);
});
76 77 78 79

/**
 * Download the grunt binary into the working directory, make it executable
 * and launch it with this node's id as its only argument. Everything the
 * child writes to stdout/stderr is forwarded to the logger.
 */
libSupport.download(constants.grunt_host, "grunt", false).then(() => {
    logger.info("Downloaded grunt binary from repository");
    fs.chmod('grunt', 0o755, (err) => {
        if (err) {
            // Spawning a file that could not be made executable would only
            // fail later with a less helpful error.
            logger.error("could not make grunt executable: " + err);
            return;
        }
        logger.info("grunt made executable. Starting grunt");
        const grunt = spawn('./grunt', [node_id]);

        // A failed spawn is reported through the 'error' event; without a
        // listener it would be thrown and crash the process.
        grunt.on('error', (spawnErr) => {
            logger.error("could not start grunt: " + spawnErr);
        });
        grunt.stdout.on('data', data => {
            logger.info(data.toString());
        });
        grunt.stderr.on('data', data => {
            logger.info(data.toString());
        });
        grunt.on('close', (code) => {
            logger.info("Grunt exited with exit code", code);
        });
    });
}).catch(err => {
    logger.error("failed to download grunt binary: " + err);
});

102
    
Nilanjan Daw's avatar
Nilanjan Daw committed
103 104 105 106
/**
 * Start a worker executor of the runtime type named in the metadata.
 *
 * "isolate" and "process" runtimes are started from the local repository;
 * when their promise rejects, a failure message carrying the resource id is
 * published on the "deployed" topic. "container" runtimes are started from
 * the metadata alone. Any other runtime publishes an "unknown runtime"
 * status on the "response" topic.
 *
 * @param {String} local_repository directory holding the downloaded function files
 * @param {Object} producer Kafka producer; its send(payloads, callback) is used for status messages
 * @param {Object} metadata deployment description
 * @param {String} metadata.runtime one of "isolate", "process" or "container"
 * @param {String} metadata.functionHash hash identifying the function (used for logging here)
 * @param {String} metadata.resource_id id echoed back in failure reports
 * @param {Number} metadata.port port announced in the log line
 */
function startWorker(local_repository, producer, metadata) {
    const runtime = metadata.runtime;
    console.log(metadata);

    logger.info(`Using port ${metadata.port} for functionHash ${metadata.functionHash}`);

    // Surface Kafka delivery failures instead of silently dropping them.
    const onSent = (err) => {
        if (err) {
            logger.error("failed to publish to kafka: " + err);
        }
    };

    // Publish a failed deployment on the "deployed" topic and log its cause.
    const reportFailure = (reason, err) => {
        logger.error("=====================deployment failed=========================");
        if (err !== undefined) {
            logger.error("deployment error: " + err);
        }
        producer.send([{
            topic: "deployed",
            messages: JSON.stringify({
                "status": false,
                resource_id: metadata.resource_id,
                "reason": reason
            })
        }], onSent);
    };

    if (runtime === "isolate") {
        execute.runIsolate(local_repository, metadata)
            .catch(err => reportFailure("isolate exit", err));
    } else if (runtime === "process") {
        execute.runProcess(local_repository, metadata)
            .catch(err => reportFailure("process exit", err));
    } else if (runtime === "container") {
        // Promise.resolve accepts both a promise and a plain return value, so
        // a rejection (if runContainer is asynchronous) is logged rather than
        // left unhandled. No "deployed" message is sent, as before.
        Promise.resolve(execute.runContainer(metadata))
            .catch(err => logger.error("container deployment error: " + err));
    } else {
        producer.send(
            [{
                topic: "response",
                messages: JSON.stringify({ status: "unknown runtime" })
            }], onSent);
    }
}
155 156

/**
 * Publish a liveness message for this node on the "heartbeat" topic.
 * The message carries the node id as "address" and the current time in
 * milliseconds as "timestamp". A failed send is logged, not thrown.
 */
function heartbeat() {
    const payload = [{
        topic: "heartbeat",
        messages: JSON.stringify({ "address": node_id, "timestamp": Date.now() })
    }];
    // The first callback argument is the send error (null on success).
    producer.send(payload, (err) => {
        if (err) {
            logger.error("failed to send heartbeat: " + err);
        }
    });
}

164

165
// Call heartbeat repeatedly, once every 1000 ms, for as long as the process runs.
const heartbeatIntervalMs = 1000;
setInterval(heartbeat, heartbeatIntervalMs);