Reversed the chaining order in the flukso daemon
This commit is contained in:
parent
466db9793c
commit
25c49be892
|
@ -3,6 +3,7 @@
|
|||
--
|
||||
-- flukso.lua: flukso deamon running on openwrt
|
||||
-- Copyright (c) 2008-2009 jokamajo.org
|
||||
-- 2010 flukso.net
|
||||
--
|
||||
-- This program is free software; you can redistribute it and/or
|
||||
-- modify it under the terms of the GNU General Public License
|
||||
|
@ -37,8 +38,8 @@ local param = {xmlrpcaddress = 'http://logger.flukso.net/xmlrpc',
|
|||
device = '/dev/ttyS0',
|
||||
interval = 300}
|
||||
|
||||
function receive(device, pwraddress, pwrport, pwrenable)
|
||||
return coroutine.wrap(function()
|
||||
function receive(child, device, pwraddress, pwrport, pwrenable)
|
||||
return coroutine.create(function()
|
||||
-- open the connection to the syslog deamon, specifying our identity
|
||||
posix.openlog('flukso')
|
||||
posix.syslog(30, 'starting the flukso deamon')
|
||||
|
@ -59,7 +60,7 @@ function receive(device, pwraddress, pwrport, pwrenable)
|
|||
os.execute('gpioctl set 4 > /dev/null')
|
||||
|
||||
local meter, value = line:sub(5, 36), tonumber(line:sub(38))
|
||||
coroutine.yield(meter, os.time(), value)
|
||||
coroutine.resume(child, meter, os.time(), value)
|
||||
elseif line:sub(1, 3) == 'pwr' and line:len() == 47 and line:find(':') == 37 then -- user data + additional data integrity checks
|
||||
if pwrenable then udp:send(line) end
|
||||
elseif line:sub(1, 3) == 'msg' then -- control data
|
||||
|
@ -74,36 +75,35 @@ function receive(device, pwraddress, pwrport, pwrenable)
|
|||
end)
|
||||
end
|
||||
|
||||
function buffer(source, interval)
|
||||
return coroutine.wrap(function()
|
||||
function buffer(child, interval)
|
||||
return coroutine.create(function(meter, timestamp, value)
|
||||
local measurements = data.new()
|
||||
local threshold = os.time() + interval
|
||||
|
||||
while true do
|
||||
local meter, timestamp, value = source()
|
||||
if meter ~= nil and timestamp > 1234567890 then measurements:add(meter, timestamp, value) end
|
||||
if timestamp > threshold and next(measurements) then --checking whether table is not empty
|
||||
coroutine.yield(measurements)
|
||||
coroutine.resume(child, measurements)
|
||||
threshold = timestamp + interval
|
||||
end
|
||||
meter, timestamp, value = coroutine.yield()
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
function filter(source, span, offset)
|
||||
return coroutine.wrap(function()
|
||||
function filter(child, span, offset)
|
||||
return coroutine.create(function(measurements)
|
||||
while true do
|
||||
local measurements = source()
|
||||
measurements:filter(span, offset)
|
||||
coroutine.yield(measurements)
|
||||
coroutine.resume(child, measurements)
|
||||
measurements = coroutine.yield()
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
function send(source, address, version, method)
|
||||
return coroutine.wrap(function()
|
||||
function send(child, address, version, method)
|
||||
return coroutine.create(function(measurements)
|
||||
while true do
|
||||
local measurements = source()
|
||||
local auth = auth.new()
|
||||
auth:load()
|
||||
auth:hmac(measurements)
|
||||
|
@ -122,19 +122,29 @@ function send(source, address, version, method)
|
|||
else
|
||||
posix.syslog(27, tostring(ret_or_err)..' '..address..' '..tostring(res))
|
||||
end
|
||||
coroutine.yield(measurements)
|
||||
coroutine.resume(child, measurements)
|
||||
measurements = coroutine.yield()
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
function gc(source)
|
||||
return coroutine.wrap(function()
|
||||
function gc(child)
|
||||
return coroutine.create(function(measurements)
|
||||
while true do
|
||||
local measurements = source()
|
||||
posix.syslog(31, tostring(collectgarbage('count')*1024)..' bytes of memory used by Lua before garbage collection cycle')
|
||||
collectgarbage() -- force a complete garbage collection cycle
|
||||
posix.syslog(31, tostring(collectgarbage('count')*1024)..' bytes of memory used by Lua after garbage collection cycle')
|
||||
coroutine.yield(measurements)
|
||||
coroutine.resume(child, measurements)
|
||||
measurements = coroutine.yield()
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
function debug()
|
||||
return coroutine.create(function(measurements)
|
||||
while true do
|
||||
dbg.vardump(measurements)
|
||||
measurements = coroutine.yield()
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
@ -144,21 +154,22 @@ end
|
|||
-- filter: sweep recursively to filter all redundant entries
|
||||
-- send: report the measurements to the server via xmlrpc
|
||||
-- gc: perform a full garbage collection cycle
|
||||
-- debug: dump measurements table to stdout
|
||||
|
||||
local aggregator = gc(
|
||||
send(
|
||||
filter(
|
||||
filter(
|
||||
filter(
|
||||
buffer(
|
||||
receive(param.device, param.pwraddress, param.pwrport, param.pwrenable)
|
||||
, param.interval)
|
||||
, 60, 0)
|
||||
, 900, 7200)
|
||||
, 86400, 172800)
|
||||
, param.xmlrpcaddress, param.xmlrpcversion, param.xmlrpcmethod)
|
||||
)
|
||||
local chain = receive(
|
||||
buffer(
|
||||
filter(
|
||||
filter(
|
||||
filter(
|
||||
send(
|
||||
gc(
|
||||
debug()
|
||||
)
|
||||
, param.xmlrpcaddress, param.xmlrpcversion, param.xmlrpcmethod)
|
||||
, 86400, 172800)
|
||||
, 900, 7200)
|
||||
, 60, 0)
|
||||
, param.interval)
|
||||
, param.device, param.pwraddress, param.pwrport, param.pwrenable)
|
||||
|
||||
while true do
|
||||
dbg.vardump(aggregator())
|
||||
end
|
||||
coroutine.resume(chain)
|
||||
|
|
Loading…
Reference in New Issue