diff --git a/mote/v2/openwrt/package/flukso/luasrc/fluksod.lua b/mote/v2/openwrt/package/flukso/luasrc/fluksod.lua index 82b1ab8..2cf0b4f 100755 --- a/mote/v2/openwrt/package/flukso/luasrc/fluksod.lua +++ b/mote/v2/openwrt/package/flukso/luasrc/fluksod.lua @@ -26,6 +26,7 @@ local dbg = require 'dbg' local nixio = require 'nixio' nixio.fs = require 'nixio.fs' local uci = require 'luci.model.uci'.cursor() +local data = require 'flukso.data' local arg = arg or {} -- needed when this code is not loaded via the interpreter @@ -44,43 +45,166 @@ local POLLIN = nixio.poll_flags('in') -- parse and load /etc/config/flukso local FLUKSO = uci:get_all('flukso') +local WAN_ENABLED = true +local WAN_INTERVAL = 300 +local LAN_ENABLED = true +local TIMESTAMP_MIN = 1234567890 +function dispatch(wan_child, lan_child) + return coroutine.create(function() + local delta = { fdin = nixio.open(DELTA_PATH_IN, O_RDWR_NONBLOCK), + fdout = nixio.open(DELTA_PATH_OUT, O_RDWR) } -local delta = { fdin = nixio.open(DELTA_PATH_IN, O_RDWR_NONBLOCK), - fdout = nixio.open(DELTA_PATH_OUT, O_RDWR) } + if delta.fdin == nil or delta.fdout == nil then + -- TODO output to syslog + print('Error. Unable to open the delta fifos.') + print('Exiting...') + os.exit(1) + end -if delta.fdin == nil or delta.fdout == nil then - -- TODO output to syslog - print('Error. Unable to open the delta fifos.') - print('Exiting...') - os.exit(1) + -- TODO acquire an exclusive lock on the delta fifos or exit + + local function tolua(num) + return num + 1 + end + + for line in delta.fdout:linesource() do + print(line) + + local timestamp, data = line:match('^(%d+)%s+([%d%s]+)$') + timestamp = tonumber(timestamp) + + for i, counter, extra in data:gmatch('(%d+)%s+(%d+)%s+(%d+)') do + i = tonumber(i) + counter = tonumber(counter) + extra = tonumber(extra) + + -- map index(+1!) to sensor id and sensor type + local sensor_id = FLUKSO[tostring(tolua(i))]['id'] + local sensor_type = FLUKSO[tostring(tolua(i))]['type'] + + -- resume both branches + if WAN_ENABLED then + coroutine.resume(wan_child, sensor_id, timestamp, counter) + end + + if LAN_ENABLED then + if sensor_type == 'analog' then + coroutine.resume(lan_child, sensor_id, timestamp, extra) + + elseif sensor_type == 'pulse' then + coroutine.resume(lan_child, sensor_id, timestamp, nil, counter, extra) + end + end + -- check in the e branch whether the counter has increased, if not then discard + -- chech in both branches whether timestamp has increased + -- or do we override?? + end + end + end) end --- TODO acquire an exclusive lock on the delta fifos or exit +function wan_buffer(child) + return coroutine.create(function(sensor_id, timestamp, counter) + local measurements = data.new() + local threshold = timestamp + WAN_INTERVAL + local previous = {} -function tolua(num) - return num + 1 + while true do + if not previous[sensor_id] then + previous[sensor_id] = {} + end + + if timestamp > TIMESTAMP_MIN + and timestamp > (previous[sensor_id].timestamp or 0) + and counter > (previous[sensor_id].counter or 0) + then + + measurements:add(sensor_id, timestamp, counter) + previous[sensor_id].timestamp = timestamp + previous[sensor_id].counter = counter + end + + if timestamp > threshold and next(measurements) then --checking whether table is not empty + coroutine.resume(child, measurements) + threshold = timestamp + WAN_INTERVAL + end + + sensor_id, timestamp, counter = coroutine.yield() + end + end) end -for line in delta.fdout:linesource() do - print(line) +function lan_buffer(child) + return coroutine.create(function(sensor_id, timestamp, power, counter, msec) + local measurements = data.new() + local previous = {} - timestamp, data = line:match('^(%d+)%s+([%d%s]+)$') + local function diff(x, y) -- calculates y - x + if y >= x then + return y - x + else -- y wrapped around 32-bit boundary + return 4294967296 - x + y + end + end - for i, counter, extra in data:gmatch('(%d+)%s+(%d+)%s+(%d+)') do + while true do + if not previous[sensor_id] then + previous[sensor_id] = {} + end - -- map index(+1!) to sensor id and sensor type - local sensor_id = FLUKSO[tostring(tolua(i))]['id'] - local sensor_type = FLUKSO[tostring(tolua(i))]['type'] + if timestamp > TIMESTAMP_MIN and timestamp > (previous[sensor_id].timestamp or 0) then + if not power then -- we're dealing pulse message so first calculate power + if previous[sensor_id].msec and msec > prev[sensor_id].msec then + power = math.floor(diff(previous[sensor_id].counter, counter) / + diff(previous[sensor_id].msec, msec) * 3.6 * 10^6 + 0.5) + end - print(sensor_id, sensor_type, counter, extra) + -- if msec decreased, just update the value in the table + -- but don't make any power calculations since the AVR might have gone through a reset + previous[sensor_id].msec = msec + previous[sensor_id].counter = counter + end - -- resume both branches - -- check in the e branch whether the counter has increased, if not then discard - -- chech in both branches whether timestamp has increased - -- or do we override?? - end + measurements:add(sensor_id, timestamp, power) + previous[sensor_id].timestamp = timestamp + end + + if next(measurements) then --checking whether table is not empty + coroutine.resume(child, measurements) + end + + sensor_id, timestamp, power, counter, msec = coroutine.yield() + end + end) end +function debug(child) + return coroutine.create(function(measurements) + while true do + if DEBUG then + dbg.vardump(measurements) + end + if child then + coroutine.resume(child, measurements) + end + measurements = coroutine.yield() + end + end) +end + +local wan_chain = + wan_buffer( + debug(nil) + ) + +local lan_chain = + lan_buffer( + debug(nil) + ) + +local chain = dispatch(wan_chain, lan_chain) + +coroutine.resume(chain)