index.js 5.56 KB
Newer Older
1
'use strict';
Mahendra Patel's avatar
Mahendra Patel committed
2
const constants = require(".././constants_local.json")
3
const secrets = require('./secrets.json')
4
const config = require('./config.json')
5
const libSupport = require('./lib')
6 7
libSupport.updateConfig()
const node_id = config.id
8
const {spawn } = require('child_process')
9
const execute = require('./execute')
10
const fs = require('fs')
11
const fetch = require('node-fetch');
12
const os = require('os');
13 14

// Base URL of the function-metadata database; a function hash is appended
// to it to address a single metadata document.
let metadataDB = `http://${secrets.couchdb_username}:${secrets.couchdb_password}@${constants.couchdb_host}/${constants.db.function_meta}/`

const kafka = require('kafka-node')
const logger = libSupport.logger

// Directory (next to this file) that receives downloaded function files.
const local_repository = `${__dirname}/local_repository/`
// Root URL of the master node.
const host_url = `http://${constants.master_address}:${constants.master_port}`

// One Kafka client shared by the producer and by the consumers created later.
const { Producer, Consumer, KafkaClient } = kafka
const client = new KafkaClient({
    kafkaHost: constants.network.external.kafka_host,
    autoConnect: true
})
const producer = new Producer(client)

/**
 * Create this node's Kafka topic, then listen on it for deployment requests.
 *
 * Every message value is parsed as JSON. A message of type "execute" that
 * arrived on this node's topic triggers, in order:
 *   1. a fetch of the function's metadata document from metadataDB,
 *   2. a download of `<functionHash>.js` from the master's repository into
 *      local_repository,
 *   3. a call to startWorker with the collected metadata.
 * Messages of any other type are ignored.
 */
libSupport.makeTopic(node_id).then(() => {
    logger.info("node topic created")
    let consumer = new Consumer(client,
        [
            { topic: node_id, partition: 0, offset: 0 }
        ],
        // kafka-node expects the consumer options as a plain object.
        { autoCommit: true })

    // An 'error' event without a listener is thrown by the emitter.
    consumer.on('error', err => {
        logger.error("consumer error: " + String(err))
    })

    consumer.on('message', function (message) {
        let topic = message.topic
        let request
        try {
            request = JSON.parse(message.value)
        } catch (err) {
            // A malformed payload must not take the whole daemon down.
            logger.error("ignoring malformed message: " + String(err))
            return
        }
        // JSON such as `null` or a bare number carries no request fields.
        if (request === null || typeof request !== "object")
            return

        let { runtime, functionHash, resource_id, port } = request
        /**
         * Download necessary files (function file) and Start resource deployment
         */
        if (request.type === "execute" && topic === node_id) {
            logger.info("Received Deployment request for resource_id: " + resource_id);
            fetch(metadataDB + functionHash).then(res => res.json())
            .then(json => {
                console.log("metadata", json);

                // Returned so that a failed download reaches the .catch below
                // instead of becoming an unhandled rejection.
                return libSupport.download(host_url + "/repository/" + functionHash + ".js", local_repository + functionHash + ".js").then(() => {
                    let metadata = {
                        resource_id, functionHash,
                        runtime, port,
                        resources: {
                            memory: json.memory
                        }
                    }
                    startWorker(local_repository, producer, metadata)
                })
            }).catch(err => {
                logger.error("something went wrong" + err.toString())
            });
        }
    })
}).catch(err => {
    // Covers both a failed topic creation and a failure while wiring the consumer.
    logger.error("node consumer setup failed: " + String(err))
})
77 78 79 80

/**
 * Download the grunt binary, make it executable and run it as a child
 * process with this node's id as its only argument.
 *
 * A failure at any step (download, chmod, spawn) is logged; grunt is only
 * started when the preceding steps succeeded.
 */
libSupport.download(constants.grunt_host, "grunt", false).then(() => {
    logger.info("Downloaded grunt binary from repository")
    fs.chmod('grunt', 0o755, (chmodErr) => {
        if (chmodErr) {
            logger.error("could not make grunt executable: " + String(chmodErr))
            return
        }
        logger.info("grunt made executable. Starting grunt")
        let grunt = spawn('./grunt', [node_id])

        // The output is discarded, but the listeners keep both pipes flowing
        // so the child is not blocked once a pipe buffer fills up.
        grunt.stdout.on('data', () => { })
        grunt.stderr.on('data', () => { })

        // Spawn failures are delivered as an 'error' event; without a
        // listener the event would be thrown.
        grunt.on('error', (spawnErr) => {
            logger.error("could not start grunt: " + String(spawnErr))
        })
        grunt.on('close', (code) => {
            logger.info("Grunt exited with exit code", code);
        })
    })
}).catch(err => {
    logger.error("grunt download failed: " + String(err))
})

103
    
Nilanjan Daw's avatar
Nilanjan Daw committed
104 105 106 107
/**
 * Start a worker executor of the runtime type named in `metadata.runtime`.
 *
 * "isolate" and "process" workers are started from `local_repository`,
 * "container" workers from the metadata alone. When starting a worker
 * fails, the failure is logged and published on the "deployed" topic.
 * Any other runtime value is answered on the "response" topic with
 * status "unknown runtime".
 *
 * @param {String} local_repository directory holding the downloaded function files
 * @param {Object} producer Kafka producer used to publish status messages
 * @param {Object} metadata description of the deployment
 * @param {String} metadata.runtime "isolate", "process" or "container"
 * @param {String} metadata.functionHash
 * @param {String} metadata.resource_id
 * @param {Number} metadata.port
 */
function startWorker(local_repository, producer, metadata) {
    let runtime = metadata.runtime
    console.log(metadata);

    logger.info(`Using port ${metadata.port} for functionHash ${metadata.functionHash}`)

    if (runtime === "isolate") {
        execute.runIsolate(local_repository, metadata)
        .catch(err => reportDeploymentFailure(producer, metadata.resource_id, "isolate exit", err))
    } else if (runtime === "process") {
        execute.runProcess(local_repository, metadata)
        .catch(err => reportDeploymentFailure(producer, metadata.resource_id, "process exit", err))
    } else if (runtime === "container") {
        console.log("rutime is container : ",metadata)
        // Promise.resolve makes the failure path work whether or not
        // runContainer returns a promise.
        Promise.resolve(execute.runContainer(metadata))
        .catch(err => reportDeploymentFailure(producer, metadata.resource_id, "container exit", err))
    } else {
        producer.send(
            [{
                topic: "response",
                messages: JSON.stringify({ status: "unknown runtime" })
            }], sendErr => {
                if (sendErr)
                    logger.error("could not report unknown runtime: " + String(sendErr))
            })
    }
}

/**
 * Log a failed deployment and publish it on the "deployed" topic.
 *
 * @param {Object} producer Kafka producer used to publish the failure
 * @param {String} resource_id resource whose deployment failed
 * @param {String} reason short reason placed in the published message
 * @param {*} err the error that caused the failure; only logged
 */
function reportDeploymentFailure(producer, resource_id, reason, err) {
    logger.error("=====================deployment failed=========================");
    logger.error(err)
    producer.send([{
        topic: "deployed",
        messages: JSON.stringify({
            "status": false,
            resource_id: resource_id,
            "reason": reason
        })
    }], sendErr => {
        if (sendErr)
            logger.error("could not report deployment failure: " + String(sendErr))
    })
}
161 162

/**
 * Publish this node's current resource snapshot on the "heartbeat" topic.
 *
 * The message carries the node id as `address`, a `system_info` object
 * (free and total memory, CPU count, load averages as reported by `os`)
 * and a millisecond `timestamp`. A failed send is logged, not thrown.
 */
function heartbeat() {
    let info = {
        free_mem: os.freemem(),
        cpu_count: os.cpus().length,
        total_mem: os.totalmem(),
        avg_load: os.loadavg()
    }
    let payload = [{
        topic: "heartbeat",
        messages: JSON.stringify({
            "address": node_id,
            "system_info": info,
            "timestamp": Date.now()
        })
    }]
    console.log("daemon system info : ", info)
    // The first callback argument is the send error, if any.
    producer.send(payload, err => {
        if (err)
            logger.error("heartbeat could not be sent: " + String(err))
    })
}

181

182
// Publish a heartbeat every HEARTBEAT_INTERVAL_MS milliseconds.
const HEARTBEAT_INTERVAL_MS = 1000
setInterval(heartbeat, HEARTBEAT_INTERVAL_MS);