From 893cb4f4467eda98733b7ade4c779712893dfb68 Mon Sep 17 00:00:00 2001 From: Bart Van Der Meerssche Date: Sun, 30 Jan 2011 22:57:37 +0100 Subject: [PATCH] [fluksod] port remaining coroutines to v2 --- .../package/flukso/luasrc/flukso/data.lua | 122 ++++++++++++++++++ .../openwrt/package/flukso/luasrc/fluksod.lua | 99 +++++++++++++- 2 files changed, 219 insertions(+), 2 deletions(-) create mode 100644 mote/v2/openwrt/package/flukso/luasrc/flukso/data.lua diff --git a/mote/v2/openwrt/package/flukso/luasrc/flukso/data.lua b/mote/v2/openwrt/package/flukso/luasrc/flukso/data.lua new file mode 100644 index 0000000..f3fcce6 --- /dev/null +++ b/mote/v2/openwrt/package/flukso/luasrc/flukso/data.lua @@ -0,0 +1,122 @@ +--[[ + + data.lua: property and methods for manipulating incoming measurements + + Copyright (c) 2009 jokamajo.org + 2010 - 2011 Bart Van Der Meerssche + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + +]]-- + + +local os, math, table, string = + os, math, table, string + +local getfenv, setmetatable, pairs, ipairs = + getfenv, setmetatable, pairs, ipairs + +module (...) +local modenv = getfenv() -- module environment + +-- private +local function timestamps(T) + local H = {} -- helper table, an indexed array containing all the measurement's timestamps + for timestamp in pairs(T) do + H[#H+1] = timestamp + end + + table.sort(H) -- sort in ascending order, oldest timestamps will be treated first + return H +end + +function new() + return setmetatable({}, {__index = modenv}) +end + +function add(M, meter, timestamp, value) + if not M[meter] then + M[meter] = {} + end + + M[meter][timestamp] = value +end + +function clear(M) + for meter in pairs(M) do + M[meter] = nil + end +end + +function filter(M, span, offset) + for meter, T in pairs(M) do + local H = timestamps(T) + local i = 2 + while not (H[i+1] == nil or H[i] > os.time()-offset) do + if math.floor(H[i-1]/span) == math.floor(H[i]/span) and + math.floor(H[i]/span) == math.floor(H[i+1]/span) then + T[H[i]] = nil + table.remove(H, i) + + else + i = i+1 + end + end + end +end + +function truncate(M, cutoff) + for meter, T in pairs(M) do + local H = timestamps(T) + + for i = H[1], os.time() - cutoff do + T[i] = nil + end + end +end + +function fill(M) + for meter, T in pairs(M) do + local H = timestamps(T) + + for i = H[#H]-1, H[1]+1, -1 do + if T[i] == nil or T[i] == '"nan"' then + T[i] = T[i+1] + end + end + + for i = H[#H]+1, os.time() do + T[i] = '"nan"' + end + end +end + +function json_encode(M) + local J = {} + + for meter, T in pairs(M) do + local H = timestamps(T) + local SB = {'['} -- use a string buffer for building up the JSON string + + for k, timestamp in ipairs(H) do + SB[#SB+1] = '[' .. timestamp .. ',' .. T[timestamp] .. '],' + end + + SB[#SB] = SB[#SB]:sub(1, -2) -- remove the trialing comma from the last entry + SB[#SB+1] = ']' + J[meter] = table.concat(SB) + end + + return J +end diff --git a/mote/v2/openwrt/package/flukso/luasrc/fluksod.lua b/mote/v2/openwrt/package/flukso/luasrc/fluksod.lua index 2cf0b4f..e5e1583 100755 --- a/mote/v2/openwrt/package/flukso/luasrc/fluksod.lua +++ b/mote/v2/openwrt/package/flukso/luasrc/fluksod.lua @@ -41,6 +41,8 @@ local DELTA_PATH_OUT = DELTA_PATH .. '/out' local O_RDWR = nixio.open_flags('rdwr') local O_RDWR_NONBLOCK = nixio.open_flags('rdwr', 'nonblock') +local O_RDWR_CREAT = nixio.open_flags('rdwr', 'creat') + local POLLIN = nixio.poll_flags('in') -- parse and load /etc/config/flukso @@ -50,6 +52,17 @@ local WAN_INTERVAL = 300 local LAN_ENABLED = true local TIMESTAMP_MIN = 1234567890 +local WAN_FILTER = { [1] = {}, [2] = {}, [3] = {} } +WAN_FILTER[1].span = 60 +WAN_FILTER[1].offset = 0 +WAN_FILTER[2].span = 900 +WAN_FILTER[2].offset = 7200 +WAN_FILTER[3].span = 86400 +WAN_FILTER[3].offset = 172800 + +local LAN_POLISH_CUTOFF = 60 +local LAN_PUBLISH_PATH = DAEMON_PATH .. '/sensor' + function dispatch(wan_child, lan_child) return coroutine.create(function() local delta = { fdin = nixio.open(DELTA_PATH_IN, O_RDWR_NONBLOCK), @@ -135,6 +148,37 @@ function wan_buffer(child) end) end +function filter(child, span, offset) + return coroutine.create(function(measurements) + while true do + measurements:filter(span, offset) + coroutine.resume(child, measurements) + measurements = coroutine.yield() + end + end) +end + + +function send(child) -- TODO fill in dummy send + return coroutine.create(function(measurements) + while true do + -- measurements:clear() + coroutine.resume(child, measurements) + measurements = coroutine.yield() + end + end) +end + +function gc(child) + return coroutine.create(function(measurements) + while true do + collectgarbage() -- force a complete garbage collection cycle + coroutine.resume(child, measurements) + measurements = coroutine.yield() + end + end) +end + function lan_buffer(child) return coroutine.create(function(sensor_id, timestamp, power, counter, msec) local measurements = data.new() @@ -179,6 +223,43 @@ function lan_buffer(child) end) end +function polish(child, cutoff) + return coroutine.create(function(measurements) + while true do + measurements:fill() + measurements:truncate(cutoff) + coroutine.resume(child, measurements) + measurements = coroutine.yield() + end + end) +end + +function publish(child, dir) + return coroutine.create(function(measurements) + nixio.fs.mkdirr(dir) + + for file in nixio.fs.dir(dir) do + nixio.fs.unlink(file) + end + + while true do + local measurements_json = measurements:json_encode() + + for sensor_id, json in pairs(measurements_json) do + local file = dir .. '/' .. sensor_id + + nixio.fs.unlink(file) + fd = nixio.open(file, O_RDWR_CREAT) + fd:write(json) + fd:close() + end + + coroutine.resume(child, measurements) + measurements = coroutine.yield() + end + end) +end + function debug(child) return coroutine.create(function(measurements) while true do @@ -197,12 +278,26 @@ end local wan_chain = wan_buffer( - debug(nil) + filter( + filter( + filter( + send( + gc( + debug(nil) + ) + ) + , WAN_FILTER[3].span, WAN_FILTER[3].offset) + , WAN_FILTER[2].span, WAN_FILTER[2].offset) + , WAN_FILTER[1].span, WAN_FILTER[1].offset) ) local lan_chain = lan_buffer( - debug(nil) + polish( + publish( + debug(nil) + , LAN_PUBLISH_PATH) + , LAN_POLISH_CUTOFF) ) local chain = dispatch(wan_chain, lan_chain)