####################################################################################################### # # The Python SOAP server is a really easy way to export workflow functionality via SOAP. # This is just the beginning; this is where we'll also be defining some WfMC interface stuff. # More information at http://www.vivtek.com/wftk/doc/code/python # # Copyright (c) 2003, Vivtek, and released under the GPL, as follows: # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # ####################################################################################################### |
import sys import SOAPpy from xmlapi import * import repmgr import re |
# Choose the system definition file: the first command-line argument when one
# is supplied, otherwise the default name 'system.defn'.
repos = 'system.defn'
if len(sys.argv) > 1:
    repos = sys.argv[1]

# Read the definition.  Any failure at all is reported on stdout and ends the
# process with exit status 1.
# NOTE(review): the bare `except` also traps KeyboardInterrupt/SystemExit;
# it is kept as-is to preserve existing behaviour.
try:
    repository = xml_read(repos)
except:
    print ("Unable to read system definition file '%s'." % repos)
    sys.exit(1)

# Open the repository, passing "SOAP server" as the second argument to
# repmgr.open.
repmgr.open(repository, "SOAP server")
def get(list_id, key, mode='map', auth=None, task_key=None):
    """Fetch a record (or one of its tasks) in the representation asked for.

    Parameters:
        list_id  -- id of the repository list to read from.
        key      -- key of the record; reset to None when mode is 'new'.
        mode     -- selects the return value:
                    'edit', 'new', 'display': string content of
                        repmgr.form(repository, list_id, key, mode);
                    'full':  xml_string() of the fetched record;
                    'tasks': a list of dicts with the keys 'key', 'id',
                        'label', 'user' and 'role', one per element that
                        repmgr.tasks_direct() puts into a "_tasks" list;
                    anything else (default 'map'): a dict mapping each
                        <field> id of the record to its xmlobj_get() value.
        auth     -- accepted for interface compatibility; not used here.
        task_key -- when not None, the record is fetched with
                    repmgr.task_get() and interpreted with the "_tasks"
                    definition instead of the definition of list_id.

    Returns:
        A string, a list of dicts or a dict, depending on mode (see above).
    """
    if mode == 'new':
        key = None
    if mode in ('edit', 'new', 'display'):
        return xml_stringcontent(repmgr.form(repository, list_id, key, mode))

    # Identity test: None is a singleton, so `is` is the correct comparison.
    if task_key is None:
        defn = repmgr.defn(repository, list_id)
        rec = repmgr.get(repository, list_id, key)
    else:
        defn = repmgr.defn(repository, "_tasks")
        rec = repmgr.task_get(repository, list_id, key, task_key)

    if mode == 'full':
        return xml_string(rec)

    if mode == 'tasks':
        tasklist = xml_create("list")
        xml_set(tasklist, "id", "_tasks")
        repmgr.tasks_direct(repository, rec, tasklist)
        ret = []
        for t in xml_elements(tasklist):
            # 'key' and 'id' both carry the element's 'id' attribute.
            task_id = xml_attrval(t, 'id')
            ret.append({
                'key': task_id,
                'id': task_id,
                'label': xml_attrval(t, 'label'),
                'user': xml_attrval(t, 'user'),
                'role': xml_attrval(t, 'role'),
            })
        return ret

    # Default = 'map'
    ret = {}
    for f in xml_elements(rec):
        if xml_is(f, 'field'):
            field_id = xml_attrval(f, 'id')
            ret[field_id] = xmlobj_get(rec, defn, field_id)
    if task_key is not None:
        ret['key'] = xml_attrval(rec, 'key')
    return ret


def list(list_id, mode='keys', auth=None):
    """Return the result of repmgr.list(repository, list_id).

    Parameters:
        list_id -- id of the repository list to enumerate.
        mode    -- accepted but not yet honoured; only the default behaviour
                   is implemented.
        auth    -- accepted for interface compatibility; not used here.

    NOTE: this name shadows the builtin `list` for the rest of the module.
    It is kept because the function is exported under this name.
    """
    return repmgr.list(repository, list_id)
def build_obj(fields, fields_xml, loose_fields):
    """Build the XML record object used by add/mod/merge.

    Parameters:
        fields       -- currently ignored (see TODO below).
        fields_xml   -- None to start from an empty <record> element, or a
                        UTF-8 encoded byte string that is decoded and handed
                        to xml_parse().
        loose_fields -- dict of extra keyword arguments; every entry whose
                        name starts with "fld_" is written into the object
                        under the name with that prefix stripped, with its
                        value encoded as Latin-1.

    Returns:
        The XML object produced by xml_create()/xml_parse() with the loose
        fields applied.
    """
    if fields_xml is None:
        obj = xml_create("record")
    else:
        obj = xml_parse(fields_xml.decode("utf8"))

    # TODO: Handle "fields"

    prefix = "fld_"
    for k, value in loose_fields.items():
        # A plain prefix test; no regular expression is needed for this.
        if k.startswith(prefix):
            xmlobj_set(obj, None, k[len(prefix):], value.encode("latin"))
    return obj
def add(list_id, fields=None, fields_xml=None, auth=None, **loose_fields):
    """Build a record from the arguments, add it to list_id, return its key.

    The record is assembled by build_obj(); the key is whatever
    repmgr.getkey() reports for it after repmgr.add().  `auth` is accepted
    but not used.
    """
    record = build_obj(fields, fields_xml, loose_fields)
    repmgr.add(repository, list_id, record)
    return repmgr.getkey(repository, list_id, record)


def mod(list_id, key, fields=None, fields_xml=None, auth=None, **loose_fields):
    """Build a record from the arguments and merge it into the entry `key`.

    Returns repmgr.getkey() for the built record.  `auth` is accepted but
    not used.

    NOTE(review): this delegates to repmgr.merge exactly as merge() does;
    confirm whether a distinct "modify" call was intended here.
    """
    record = build_obj(fields, fields_xml, loose_fields)
    repmgr.merge(repository, list_id, record, key)
    return repmgr.getkey(repository, list_id, record)


def merge(list_id, key, fields=None, fields_xml=None, auth=None, **loose_fields):
    """Build a record from the arguments and merge it into the entry `key`.

    Returns repmgr.getkey() for the built record.  `auth` is accepted but
    not used.
    """
    record = build_obj(fields, fields_xml, loose_fields)
    repmgr.merge(repository, list_id, record, key)
    return repmgr.getkey(repository, list_id, record)
def delete(list_id, key, auth=None):
    """Delete the entry `key` from list_id.

    Returns whatever repmgr.delete() returns.  `auth` is accepted but not
    used.
    """
    return repmgr.delete(repository, list_id, key)
def _list_as_maps(list_id):
    """Fill a <list id=list_id> element via repmgr.list() and return its
    entries as a list of dicts mapping <field> id to xmlobj_get() value."""
    listing = xml_create("list")
    xml_set(listing, "id", list_id)
    repmgr.list(repository, listing)

    ret = []
    for t in xml_elements(listing):
        rec = {}
        for f in xml_elements(t):
            if xml_is(f, 'field'):
                field_id = xml_attrval(f, 'id')
                rec[field_id] = xmlobj_get(t, None, field_id)
        ret.append(rec)
    return ret


def tasks(auth=None):
    """Return the entries of the "_tasks" list, one dict per entry.

    Each dict maps the id of every <field> child of the entry to its value.
    `auth` is accepted but not used.
    """
    return _list_as_maps("_tasks")


def todo(auth=None):
    """Return the entries of the "_todo" list, one dict per entry.

    Each dict maps the id of every <field> child of the entry to its value.
    `auth` is accepted but not used.
    """
    return _list_as_maps("_todo")
def auth(userid, password, mode='clear', auth=None):
    """Placeholder authentication: hand back `userid` unchanged.

    `password`, `mode` and `auth` are accepted but not inspected; no
    credential check is performed.
    """
    return userid
#TODO: override server definitions on command line

# Host, port and debug level are read from the repository configuration,
# falling back to localhost:8000 with debugging off.
server_url = xmlobj_getconf(repository, "config.soap.server", "localhost")
server_port = int(xmlobj_getconf(repository, "config.soap.port", "8000"))
# Set to '1' to see everything that happens, and I mean everything.
SOAPpy.Config.debug = int(xmlobj_getconf(repository, "config.soap.debug", "0"))

server = SOAPpy.SOAPServer((server_url, server_port))

# Export every public entry point, in the same order as before.
for exported in (get, list, add, mod, merge, delete, tasks, todo, auth):
    server.registerFunction(exported)

print ("Listening on %s:%d for requests against %s" % (server_url, server_port, repos))
if SOAPpy.Config.debug == 1:
    print ("Debug mode ON")

server.serve_forever()  # TODO: maybe some error handling here.
# This code and documentation are released under the terms of the GNU General
# Public License. They are copyright (c) 2003, Vivtek. All rights reserved
# except those explicitly granted under the terms of that license.
# This presentation was prepared with LPML. Try literate programming. You'll like it.