From 25c49be89284972a798a16dab0096240326f48fb Mon Sep 17 00:00:00 2001 From: Bart Van Der Meerssche Date: Mon, 19 Apr 2010 23:34:07 +0200 Subject: [PATCH] Reversed the chaining order in the flukso daemon --- openwrt/package/flukso/src/flukso.lua | 81 +++++++++++++++------------ 1 file changed, 46 insertions(+), 35 deletions(-) diff --git a/openwrt/package/flukso/src/flukso.lua b/openwrt/package/flukso/src/flukso.lua index b4df185..9bdea33 100755 --- a/openwrt/package/flukso/src/flukso.lua +++ b/openwrt/package/flukso/src/flukso.lua @@ -3,6 +3,7 @@ -- -- flukso.lua: flukso deamon running on openwrt -- Copyright (c) 2008-2009 jokamajo.org +-- 2010 flukso.net -- -- This program is free software; you can redistribute it and/or -- modify it under the terms of the GNU General Public License @@ -37,8 +38,8 @@ local param = {xmlrpcaddress = 'http://logger.flukso.net/xmlrpc', device = '/dev/ttyS0', interval = 300} -function receive(device, pwraddress, pwrport, pwrenable) - return coroutine.wrap(function() +function receive(child, device, pwraddress, pwrport, pwrenable) + return coroutine.create(function() -- open the connection to the syslog deamon, specifying our identity posix.openlog('flukso') posix.syslog(30, 'starting the flukso deamon') @@ -59,7 +60,7 @@ function receive(device, pwraddress, pwrport, pwrenable) os.execute('gpioctl set 4 > /dev/null') local meter, value = line:sub(5, 36), tonumber(line:sub(38)) - coroutine.yield(meter, os.time(), value) + coroutine.resume(child, meter, os.time(), value) elseif line:sub(1, 3) == 'pwr' and line:len() == 47 and line:find(':') == 37 then -- user data + additional data integrity checks if pwrenable then udp:send(line) end elseif line:sub(1, 3) == 'msg' then -- control data @@ -74,36 +75,35 @@ function receive(device, pwraddress, pwrport, pwrenable) end) end -function buffer(source, interval) - return coroutine.wrap(function() +function buffer(child, interval) + return coroutine.create(function(meter, timestamp, value) local measurements = data.new() local threshold = os.time() + interval while true do - local meter, timestamp, value = source() if meter ~= nil and timestamp > 1234567890 then measurements:add(meter, timestamp, value) end if timestamp > threshold and next(measurements) then --checking whether table is not empty - coroutine.yield(measurements) + coroutine.resume(child, measurements) threshold = timestamp + interval end + meter, timestamp, value = coroutine.yield() end end) end -function filter(source, span, offset) - return coroutine.wrap(function() +function filter(child, span, offset) + return coroutine.create(function(measurements) while true do - local measurements = source() measurements:filter(span, offset) - coroutine.yield(measurements) + coroutine.resume(child, measurements) + measurements = coroutine.yield() end end) end -function send(source, address, version, method) - return coroutine.wrap(function() +function send(child, address, version, method) + return coroutine.create(function(measurements) while true do - local measurements = source() local auth = auth.new() auth:load() auth:hmac(measurements) @@ -122,19 +122,29 @@ function send(source, address, version, method) else posix.syslog(27, tostring(ret_or_err)..' '..address..' '..tostring(res)) end - coroutine.yield(measurements) + coroutine.resume(child, measurements) + measurements = coroutine.yield() end end) end -function gc(source) - return coroutine.wrap(function() +function gc(child) + return coroutine.create(function(measurements) while true do - local measurements = source() posix.syslog(31, tostring(collectgarbage('count')*1024)..' bytes of memory used by Lua before garbage collection cycle') collectgarbage() -- force a complete garbage collection cycle posix.syslog(31, tostring(collectgarbage('count')*1024)..' bytes of memory used by Lua after garbage collection cycle') - coroutine.yield(measurements) + coroutine.resume(child, measurements) + measurements = coroutine.yield() + end + end) +end + +function debug() + return coroutine.create(function(measurements) + while true do + dbg.vardump(measurements) + measurements = coroutine.yield() end end) end @@ -144,21 +154,22 @@ end -- filter: sweep recursively to filter all redundant entries -- send: report the measurements to the server via xmlrpc -- gc: perform a full garbage collection cycle +-- debug: dump measurements table to stdout -local aggregator = gc( - send( - filter( - filter( - filter( - buffer( - receive(param.device, param.pwraddress, param.pwrport, param.pwrenable) - , param.interval) - , 60, 0) - , 900, 7200) - , 86400, 172800) - , param.xmlrpcaddress, param.xmlrpcversion, param.xmlrpcmethod) - ) +local chain = receive( + buffer( + filter( + filter( + filter( + send( + gc( + debug() + ) + , param.xmlrpcaddress, param.xmlrpcversion, param.xmlrpcmethod) + , 86400, 172800) + , 900, 7200) + , 60, 0) + , param.interval) + , param.device, param.pwraddress, param.pwrport, param.pwrenable) -while true do - dbg.vardump(aggregator()) -end +coroutine.resume(chain)