added more test cases and implemented more commands, had to change some, since newer firmware versions lost some cmds

This commit is contained in:
Stefan Kögl 2014-03-08 19:26:17 +01:00
parent 1d5a54aca7
commit 1bda5388d6
2 changed files with 121 additions and 48 deletions

View file

@ -56,14 +56,16 @@ class OSC2CamServer(SimpleOSCServer):
return cams
def handleConnResult(self, connection, client_address=None, cmd=None):
def handleConnResult(self, connection, *args, **kwargs):
conn_result = connection.getresponse()
if client_address is not None:
if "client_address" in kwargs:
response = OSCMessage("/Response")
response.appendTypedArg(cmd, "s")
response.appendTypedArg(kwargs["cmd"], "s")
for arg in args:
response.append(arg)
response.appendTypedArg(conn_result.status, "i")
response.appendTypedArg(conn_result.reason, "s")
self.socket.sendto(response.encode_osc(), client_address)
self.socket.sendto(response.encode_osc(), kwargs["client_address"])
if conn_result.status != 200:
print "%s. Error: %d, %s" % (datetime.now().strftime("%x %X"), conn_result.status, conn_result.reason)
@ -89,15 +91,21 @@ class OSC2CamServer(SimpleOSCServer):
direction = args[0]
now = datetime.now().strftime("%x %X")
if direction in ("home", "up", "down", "left", "right", "upleft", "upright", "downleft", "downright", "repeat", "stop"):
if direction in ("home", "up", "down", "left", "right", "upleft", "upright", "downleft", "downright"):
print "%s: move camera %d to: dir %r" % (now, cam_id, direction)
connection = self.connections[cam_id]
connection.request("GET", "%s?move=%s" % (self.ptz_ctl_url, direction))
self.handleConnResult(connection, client_address, "moveCam")
url = "%s?move=%s" % (self.ptz_ctl_url, direction)
try:
repeat = args[1] == "repeat"
url += "?move=repeat"
except IndexError:
pass
connection.request("GET", url)
self.handleConnResult(connection, *args, client_address=client_address, cmd="moveCam")
def use_cam_preset(self, cam_id, args):
def use_cam_preset(self, cam_id, args, client_address):
""" says given ip cam to use a predefined position preset"""
presetno = args[0]
@ -106,33 +114,43 @@ class OSC2CamServer(SimpleOSCServer):
print "%s: use camera %d preset %d" % (now, cam_id, presetno)
connection.request("GET", "%s?gotoserverpresetno=%d" % (self.ptz_ctl_url, presetno))
self.handleConnResult(connection, client_address, "useCamPreset")
self.handleConnResult(connection, *args, client_address=client_address, cmd="useCamPreset")
def set_cam_preset(self, cam_id, args):
def set_cam_preset(self, cam_id, args, client_address):
""" saves the actual position of given ip cam to a preset"""
presetno = args[0]
connection = self.connections[cam_id]
connection.request("GET", "%s?setserverpresetno=%d&home=yes" % (self.ptz_config_url, presetno))
self.handleConnResult(connection, client_address, "setCamPreset")
connection.request("GET", "%s?setserverpresetno=%d" % (self.ptz_config_url, presetno))
self.handleConnResult(connection, *args, client_address=client_address, cmd="setCamPreset")
def zoom_cam(self, cam_id, args):
def zoom_cam(self, cam_id, args, client_address):
""" tells given ip cam to zoom in or out"""
direction = None
arg = args[0]
if arg == "out":
direction = 0
elif arg == "in":
direction = 1
step = args[0]
now = datetime.now().strftime("%x %X")
print "%s: zoom camera %d to position %d" % (now, cam_id, step)
connection = self.connections[cam_id]
connection.request("GET", "%s?zoom=%s" % (self.ptz_ctl_url, direction))
self.handleConnResult(connection, client_address, "zoomCam")
connection.request("GET", "%s?zoom=%d" % (self.ptz_ctl_url, step))
self.handleConnResult(connection, *args, client_address=client_address, cmd="zoomCam")
def toggle_night_view(self, cam_id, args):
def focus_cam(self, cam_id, args, client_address):
""" tells given ip cam to zoom in or out"""
step = args[0]
now = datetime.now().strftime("%x %X")
print "%s: focus camera %d to position %d" % (now, cam_id, step)
connection = self.connections[cam_id]
connection.request("GET", "%s?focus=%d" % (self.ptz_ctl_url, step))
self.handleConnResult(connection, *args, client_address=client_address, cmd="focusCam")
def toggle_night_view(self, cam_id, args, client_address):
""" toggles the night view function of given ip cam"""
arg = args[0]
@ -144,27 +162,40 @@ class OSC2CamServer(SimpleOSCServer):
connection = self.connections[cam_id]
connection.request("GET", "%sparam?action=update&Image.I0.Appearance.NightMode=%s" % (self.set_url, state))
self.handleConnResult(connection, client_address, "toggleNightView")
self.handleConnResult(connection, *args, client_address=client_address, cmd="toggleNightView")
def stop_cam(self, cam_id, args, client_address):
"""stops cam movement"""
now = datetime.now().strftime("%x %X")
print "%s: stop camera %d" % (now, cam_id)
connection = self.connections[cam_id]
connection.request("GET", "%s?move=stop" % (self.ptz_ctl_url))
self.handleConnResult(connection, *args, client_address=client_address, cmd="stopCam")
def dispatchMessage(self, osc_address, typetags, args, packet, client_address):
""" dispatches parsed osc messages to the ip cam command methods"""
print "client_address", client_address
cam_id = args.pop(0)
if osc_address == "/moveCam":
self.move_cam(cam_id, args, client_address)
elif osc_address == "/setCamPreset":
self.set_cam_preset(cam_id, args)
self.set_cam_preset(cam_id, args, client_address)
elif osc_address == "/useCamPreset":
self.use_cam_preset(cam_id, args)
self.use_cam_preset(cam_id, args, client_address)
elif osc_address == "/zoomCam":
self.zoom_cam(cam_id, args)
self.zoom_cam(cam_id, args, client_address)
elif osc_address == "/focusCam":
self.zoom_cam(cam_id, args, client_address)
elif osc_address == "/stopCam":
self.zoom_cam(cam_id, args, client_address)
elif osc_address == "/toggleNightView":
self.toggle_night_view(cam_id, args)
self.toggle_night_view(cam_id, args, client_address)
elif osc_address == "/subscribe":
self.__subscription_handler(osc_address, typetags, args, packet, client_address)
def __subscription_handler(self, addr, typetags, args, client_address):
"""handles a target subscription.

View file

@ -20,8 +20,6 @@
import socket, argparse, sys
from chaosc.simpleOSCServer import SimpleOSCServer
try:
from chaosc.c_osc_lib import OSCMessage, decode_osc
@ -30,27 +28,66 @@ except ImportError, e:
from chaosc.osc_lib import OSCMessage, decode_osc
def send(sock, binary, args):
sent = sock.sendto(binary, (args.osc2cam_host, args.osc2cam_port))
response = sock.recv(4096)
if response:
osc_address, typetags, arguments = decode_osc(response, 0, len(response))
print osc_address, arguments
def test_moveCam(args, cam_id, sock):
directions = ("home", "up", "down", "left", "right", "upleft", "upright", "downleft", "downright", "repeat", "stop")
def test_move_cam(args, cam_id, sock):
directions = ("home", "up", "down", "left", "right", "upleft", "upright", "downleft", "downright")
for direction in directions:
moveCam = OSCMessage("/moveCam")
moveCam.appendTypedArg(cam_id, "i")
moveCam.appendTypedArg(direction, "s")
binary = moveCam.encode_osc()
message = OSCMessage("/moveCam")
message.appendTypedArg(cam_id, "i")
message.appendTypedArg(direction, "s")
binary = message.encode_osc()
send(sock, binary, args)
try:
sent = sock.sendto(binary, (args.osc2cam_host, args.osc2cam_port))
except socket.error, e:
if e[0] in (7, 65): # 7 = 'no address associated with nodename', 65 = 'no route to host'
raise e
else:
raise Exception("while sending OSCMessage 'moveCam' for cam_id %r with direction %r to %r:%r: %s" % (cam_id, direction, args.osc2cam_host, args.osc2cam_port, str(e)))
response = sock.recv(4096)
if response:
osc_address, typetags, arguments = decode_osc(response, 0, len(response))
print osc_address, arguments
def test_use_cam_preset(args, cam_id, sock):
for i in range(10):
message = OSCMessage("/useCamPreset")
message.appendTypedArg(cam_id, "i")
message.appendTypedArg(i, "i")
binary = message.encode_osc()
send(sock, binary, args)
def test_set_cam_preset(args, cam_id, sock):
message = OSCMessage("/setCamPreset")
message.appendTypedArg(cam_id, "i")
message.appendTypedArg(0, "i")
binary = message.encode_osc()
send(sock, binary, args)
def test_zoom_cam(args, cam_id, sock):
for i in (1, 5000, 9999):
message = OSCMessage("/zoomCam")
message.appendTypedArg(cam_id, "i")
message.appendTypedArg(i, "i")
binary = message.encode_osc()
send(sock, binary, args)
def test_focus_cam(args, cam_id, sock):
for i in (1, 5000, 9999):
message = OSCMessage("/focusCam")
message.appendTypedArg(cam_id, "i")
message.appendTypedArg(i, "i")
binary = message.encode_osc()
send(sock, binary, args)
def test_toggle_night_view(args, cam_id, sock):
for i in ("on", "off"):
message = OSCMessage("/toggleNightView")
message.appendTypedArg(cam_id, "i")
message.appendTypedArg(i, "s")
binary = message.encode_osc()
send(sock, binary, args)
@ -87,6 +124,11 @@ def main():
#sock.connect((args.osc2cam_host, args.osc2cam_port))
for ix, (host, port) in enumerate(cams):
test_moveCam(args, ix, sock)
test_move_cam(args, ix, sock)
test_use_cam_preset(args, ix, sock)
test_set_cam_preset(args, ix, sock)
test_zoom_cam(args, ix, sock)
test_focus_cam(args, ix, sock)
test_toggle_night_view(args, ix, sock)
main()