queue_procdef and process_procdef. Both work
from the queue which is part of the state of the process. The queue_procdef function takes a procdef
or piece of one and adds it to the queue. Simple enough. The process_procdef function, then, does
the bulk of the work. It runs through the queue, looking at each queued item in turn. If the "block" attribute
of an item is "yes", then it's skipped; otherwise it is consumed. Various actions, of course, can then queue
other items. The function returns only when the queue is empty (at which point the process is complete) or everything
in the queue is blocked, meaning that there are only tasks waiting on further input.
Let's look at queue_procdef first.
/*
 * queue_procdef: append a new queue item describing `action` to the
 * datasheet's ".state.queue" element.
 *
 *   session, datasheet - passed to wftk_value_counter to draw the next
 *                        "idcount" value, which becomes the item's "id".
 *   action             - procdef element to queue; NULL means nothing to do.
 *   where, extrabit    - handed to xml_setf to build the item's "where"
 *                        attribute (xml_setf appears to be printf-style, with
 *                        `where` as the format and `extrabit` as its argument
 *                        -- confirm against the XML library).
 *
 * Returns the newly created item, or NULL when action is NULL.
 *
 * The item's "loc" is the action's location string with everything up to and
 * including the first '.' that follows ".workflow" removed; when no such
 * prefix is present the full location string is stored as-is.
 */
XML * queue_procdef (XML * session, XML * datasheet, XML * action, const char * where, int extrabit)
{
    XML *queue = xml_loc (datasheet, ".state.queue");
    XML *entry;
    char *path;
    char *tail;
    int id;

    if (!action) {
        return NULL;
    }

    id = wftk_value_counter (session, datasheet, "idcount");

    entry = xml_create ("item");
    xml_setnum (entry, "id", id);
    xml_set (entry, "type", xml_name (action));

    /* Work out the location of the action relative to its workflow. */
    path = xml_getlocbuf (action);
    tail = strstr (path, ".workflow");
    if (tail != NULL) {
        /* Skip past ".workflow" itself and find the dot that ends that step. */
        tail = strchr (tail + 1, '.');
    }

    if (tail == NULL) {
        /* No workflow prefix to strip: hand the whole buffer to the item
           (xml_set_nodup presumably takes ownership, as it is not freed here). */
        xml_set_nodup (entry, "loc", path);
    } else {
        xml_set (entry, "loc", tail + 1);
        free (path);
    }

    xml_setf (entry, "where", where, extrabit);
    xml_set (entry, "oncomplete", xml_attrval (action, "oncomplete"));
    xml_set (entry, "fortask", xml_attrval (action, "fortask"));

    xml_append (queue, entry);
    return entry;
}
|
Next we define process_procdef, then talk about why it's the way it is, and then we'll fill in the various actions afterwards.
(May 27, 2002) Once a workflow finishes, we need to check whether it was a top-level workflow.
If so, then we delete the workflow. (We may log this event in the enactment at some point. TODO) If this was
the start task of a defined workflow, then we have to queue up the defined workflow as well.
/*
 * process_procdef: work through the process queue.
 *
 * Walks the item elements of `queue`.  Items whose "block" attribute is "yes"
 * are skipped.  For every other item the function locates
 *   - the workflow element (item "where", looked up in the datasheet),
 *   - the procdef for that workflow (_procdef_load), and
 *   - the definition element (item "loc", looked up in the procdef);
 * if any of these lookups fails the problem is logged and the item is passed
 * over.  The item is then dispatched on its "type" attribute.
 *
 * A handler sets `keep` to 1 to leave the item in the queue (it is then marked
 * block="yes").  Otherwise the item is finished: its "oncomplete" value, if
 * any, goes to _status_set, and then either its parent item is unblocked and
 * processed next, or -- when there is no parent -- the workflow-level
 * completion logic runs.
 *
 * The loop ends when `item` becomes NULL.
 *
 * Notes:
 *   - `state` is not referenced in the part of the function shown here.
 *   - The "See Handling ..." lines are literate-programming placeholders for
 *     code chunks defined separately; they are not C.  Several locals declared
 *     below (task, data, newfield, enactment, defn, value, count, adlist) are
 *     only used by those chunks.
 */
void process_procdef(XML * session, XML * datasheet, XML * state, XML * queue)
{
XML * item;
XML * workflow;
XML * procdef;
XML * def;
XML * holder;
XML * mark;
XML * task;
XML * data;
XML * next;
XML * newfield;
XML * enactment;
char * defn;
char * value;
const char * type;
int count;
int keep;
WFTK_ADAPTORLIST * adlist;
item = xml_firstelem (queue);
repos_log (session, 5, 2, NULL, "wfcore", "process_procdef running");
/*log_xml_object (session, 6, 2, "wfcore", "QUEUE", queue);*/
while (item != NULL) {
/* Blocked items stay where they are; move on to the next queue entry. */
if (!strcmp("yes", xml_attrval(item, "block"))) {
item = xml_nextelem(item);
continue;
}
/*log_xml_object (session, 6, 2, "wfcore", "ITEM", item);*/
/* Resolve the workflow element this item belongs to. */
workflow = xml_loc (datasheet, xml_attrval (item, "where"));
if (!workflow) {
repos_log (session, 1, 2, NULL, "wfcore", "Unable to find workflow %s in object %s:%s",
xml_attrval (item, "where"),
xml_attrval (datasheet, "dsrep"),
xml_attrval (datasheet, "key"));
item = xml_nextelem (item);
continue;
} /* TODO: saner error behavior. */
/*log_xml_object (session, 7, 2, "wfcore", "WORKFLOW", workflow);*/
procdef = _procdef_load (session, workflow);
if (!procdef) {
repos_log (session, 1, 2, NULL, "wfcore", "Unable to find procdef for workflow %s in object %s:%s",
xml_attrval (item, "where"),
xml_attrval (datasheet, "dsrep"),
xml_attrval (datasheet, "key"));
item = xml_nextelem (item);
continue;
} /* TODO: saner error behavior. */
/* def is the procdef element this queue item stands for. */
def = xml_loc (procdef, xml_attrval (item, "loc"));
if (!def) {
repos_log (session, 1, 2, NULL, "wfcore", "Unable to locate item %s in procdef for workflow %s object %s:%s",
xml_attrval (item, "loc"),
xml_attrval (item, "where"),
xml_attrval (datasheet, "dsrep"),
xml_attrval (datasheet, "key"));
item = xml_nextelem (item);
continue;
} /* TODO: saner error behavior. */
type = xml_attrval (item, "type");
/* Handlers below set keep to 1 when the item must remain queued, and may use next as scratch. */
keep = 0;
next = NULL;
/* Dispatch on item type; each branch body is a separately documented chunk. */
if (!strcmp (type, "parallel")) {
See Handling parallel
} else if (!strcmp (type, "task")) {
See Handling task
} else if (!strcmp (type, "action")) {
See Handling action
} else if (!strcmp (type, "data")) {
See Handling data
} else if (!strcmp (type, "situation")) {
See Handling situation
} else if (!strcmp (type, "decide")) {
See Handling decide
} else if (!strcmp (type, "alert")) {
See Handling alerts
} else if (!strcmp (type, "start")) {
See start: Starting subprocesses
} else { /* Treat it as a sequence. */
See Handling sequence
}
if (keep) {
/* The handler wants this item kept: mark it blocked and continue with the next one. */
xml_set (item, "block", "yes");
item = xml_nextelem(item);
} else {
/* The item is finished. */
if (*xml_attrval (item, "oncomplete")) {
_status_set (session, datasheet, xml_attrval (item, "oncomplete"), 0);
}
if (*xml_attrval (item, "parent")) {
/* Finished child: find the parent item by id, drop this item, and unblock the parent so it runs next. */
/* NOTE(review): if the parent lookup returns NULL, xml_set below receives a NULL item -- confirm xml_set tolerates that. */
next = xml_locf (queue, ".item[%s]", xml_attrval (item, "parent"));
xml_delete (item);
item = next;
xml_set (item, "block", "no");
} else { /* The workflow piece has finished (no parent item). */
holder = NULL;
if (*xml_attrval (item, "fortask")) {
holder = xml_locf (datasheet, ".state.queue.item[%s]", xml_attrval (item, "fortask"));
if (holder) { /* A workflow task; we'll just handle that case right in the interpreter. */
} else { /* Maybe it's an ad-hoc task or something; wftk_task_complete can handle it. */
holder = xml_create ("task");
xml_set (holder, "id", xml_attrval (item, "fortask"));
xml_set (holder, "dsrep", xml_attrval (datasheet, "repository"));
xml_set (holder, "process", xml_attrval (datasheet, "id"));
wftk_task_complete (session, holder);
xml_delete (holder);
holder = NULL;
}
}
/* Decide what kind of workflow just ended by looking at where def sits: */
/* mark is def's parent; if that parent has no parent of its own, def is a direct child of a root element. */
mark = xml_parent (def);
if (!xml_parent (mark)) {
/* Defined workflow has completed. We delete it unless it contains state transitions. */
if (strcmp (xml_attrval (workflow, "state"), "yes")) xml_delete_pretty (workflow);
} else if (mark == workflow) {
/* Ad-hoc or startup workflow has completed. */
if (*xml_attrval (workflow, "procdef")) {
/* Startup workflow for a defined procdef. */
/* Remove every child of the workflow element, then load the procdef again and queue it. */
/* NOTE(review): presumably _procdef_load now resolves the procdef named by the "procdef" attribute -- confirm. */
mark = xml_first (workflow);
while (mark) {
xml_delete (mark);
mark = xml_first (workflow);
}
procdef = _procdef_load (session, workflow);
holder = queue_procdef (session, datasheet, procdef, xml_attrval (item, "where"), 0);
} else {
/* Ad-hoc workflow has completed. */
if (strcmp (xml_attrval (workflow, "state"), "yes")) xml_delete_pretty (workflow);
}
}
/* Drop the finished item and continue with holder: the "fortask" queue item, the newly queued procdef item, or NULL (which ends the loop). */
xml_delete (item);
item = holder;
}
}
}
}
|
queue_procdef.
/*
 * Bundle of the four XML handles named after the pieces the interpreter
 * works with.
 * NOTE(review): nothing in the visible code uses this struct; presumably the
 * members correspond to process_procdef's datasheet/state/queue arguments and
 * the loaded procdef -- confirm against the callers.
 */
struct _state {
    XML *datasheet;   /* the process datasheet */
    XML *state;       /* presumably the datasheet's state element */
    XML *queue;       /* presumably the queue element held in the state */
    XML *procdef;     /* the procedure definition */
};
|
keep to 1.
/*
 * Handling sequence: run the children of def one after another.
 *
 * The item's "cur" attribute records the location of the child currently
 * queued.  An empty "cur" means the sequence has not started, so the first
 * child element is taken; otherwise the element following the recorded one is
 * taken.  If there is such a child it is queued with this item as its parent,
 * "cur" is advanced to it, and keep is set so this item stays in the queue.
 * When no child remains, keep is left untouched and the sequence completes.
 */
if (*xml_attrval (item, "cur")) {
    mark = xml_loc (procdef, xml_attrval (item, "cur"));
    next = xml_nextelem (mark);
} else {
    next = xml_firstelem (def);
}

if (next != NULL) {
    holder = queue_procdef (session, datasheet, next, xml_attrval (item, "where"), 0);
    xml_set (holder, "parent", xml_attrval (item, "id"));
    xml_set (item, "cur", xml_attrval (holder, "loc"));
    keep = 1;
}
|
/*
 * Handling parallel: queue every child of def at once and count them down.
 *
 * First visit ("remaining" is empty): each child element of def is queued
 * with this item as its parent, and the number of children becomes the count.
 * Later visits: the stored "remaining" value is decremented by one.
 * The count is written back to "remaining"; while it is still positive the
 * item is kept in the queue.
 */
if (*xml_attrval (item, "remaining")) {
    count = xml_attrvalnum (item, "remaining") - 1;
} else {
    count = 0;
    for (next = xml_firstelem (def); next != NULL; next = xml_nextelem (next)) {
        count++;
        xml_set (queue_procdef (session, datasheet, next, xml_attrval (item, "where"), 0),
                 "parent",
                 xml_attrval (item, "id"));
    }
}

xml_setnum (item, "remaining", count);
if (count > 0) {
    keep = 1;
}
|
/*
 * Handling task: announce a newly active task and block the queue item.
 *
 * Runs only when the item's "block" attribute is not "resume".  In that case:
 *   1. A temporary task element is built from the queue item id and the
 *      task definition (label, role, user, tag, priority).  When no user is
 *      given but a role is, the user is filled in from wftk_role_user.
 *   2. The task is tagged with the owning process; if the datasheet has a
 *      "list" attribute, "list" is copied and "process" becomes the
 *      datasheet "key" instead of its "id".
 *   3. Each data child of the definition that has no "storage" attribute,
 *      no ':' in its id, and no existing value in the datasheet gets a new
 *      value created, with the definition copied into it.
 *   4. The TASKINDEX adaptors are called with "tasknew", the temporary task
 *      is freed, and keep is set so the item remains queued.
 *
 * When "block" is "resume" nothing happens here and keep stays as it was.
 */
if (strcmp (xml_attrval (item, "block"), "resume")) {
    /* Notify task indices of the new active task. */
    task = xml_create ("task");
    xml_set (task, "id", xml_attrval (item, "id"));
    xml_set_nodup (task, "label", wftk_value_interpreta (session, datasheet, xml_attrval (def, "label")));
    if (*xml_attrval (def, "role")) xml_set (task, "role", xml_attrval (def, "role"));
    if (*xml_attrval (def, "user")) xml_set (task, "user", xml_attrval (def, "user"));
    if (*xml_attrval (def, "tag")) xml_set (task, "tag", xml_attrval (def, "tag"));
    xml_set_nodup (task, "priority", wftk_value_interpreta (session, datasheet, xml_attrval (def, "priority")));
    if (!*xml_attrval (task, "user") && *xml_attrval (task, "role")) {
        xml_set (task, "user", wftk_role_user (session, datasheet, xml_attrval (task, "role")));
    }

    xml_set (task, "process", xml_attrval (datasheet, "id"));
    if (*xml_attrval (datasheet, "list")) {
        /* These values are data, not format strings, so copy them with
           xml_set: passing them as a format would misbehave on any '%'. */
        xml_set (task, "list", xml_attrval (datasheet, "list"));
        xml_set (task, "process", xml_attrval (datasheet, "key"));
    }

    /* Create any locally stored data fields the task declares that do not exist yet. */
    mark = xml_firstelem (def);
    while (mark) {
        if (xml_is (mark, "data")) {
            if (!*xml_attrval (mark, "storage") &&
                !strchr (xml_attrval (mark, "id"), ':') &&
                !wftk_value_find (session, datasheet, xml_attrval (mark, "id"))) { /* i.e. locally stored field which isn't there, */
                newfield = wftk_value_make (session, datasheet, xml_attrval (mark, "id"));
                xml_copyinto (newfield, mark);
            }
        }
        mark = xml_nextelem (mark);
    }

    /*log_xml_object (session, 6, 2, "wfcore", "NEW TASK", task);*/
    adlist = wftk_get_adaptorlist (session, TASKINDEX);
    wftk_call_adaptorlist (adlist, "tasknew", task);
    wftk_free_adaptorlist (session, adlist);
    xml_free (task);

    keep = 1; /* This blocks the current item, so that the active task will be available for retrieval. */
}
|
/*
 * Handling action: run the action defined at the item's location.
 *
 * A private copy of the action definition is made and stamped with the
 * current user (when the session has one), the datasheet's "dsrep" and "id",
 * and the procdef's "pdrep" and "id"; the copy is then passed to wftk_action.
 *
 * If the copy carries keep="yes" afterwards, it is stamped with the user and
 * the current time and appended to the enactment, which then holds it.
 * Otherwise the copy is no longer needed and is freed here, so it does not
 * leak once the action has run.
 *
 * keep (the queue flag) is not set, so the queue item completes immediately.
 */
mark = xml_loc (procdef, xml_attrval (item, "loc"));
if (mark) {
    holder = xml_copy (mark);
    if (wftk_session_getuser (session)) {
        xml_set (holder, "by", xml_attrval (wftk_session_getuser(session), "id"));
    }
    xml_set (holder, "dsrep", xml_attrval (datasheet, "dsrep"));
    xml_set (holder, "process", xml_attrval (datasheet, "id"));
    xml_set (holder, "pdrep", xml_attrval (procdef, "pdrep"));
    xml_set (holder, "procdef", xml_attrval (procdef, "id"));
    wftk_action (session, holder, datasheet);
    if (!strcmp (xml_attrval (holder, "keep"), "yes")) { /* TODO: should not keeping be the default, or keeping? */
        /* TODO: find enactment, stash copy. */
        enactment = wftk_enactment (session, datasheet);
        if (wftk_session_getuser(session)) {
            xml_set (holder, "by", xml_attrval (wftk_session_getuser(session), "id"));
        }
        xml_set_nodup (holder, "at", _wftk_value_special (session, datasheet, "!now"));
        xml_append_pretty (enactment, holder);
        holder = NULL; /* Don't free holder now. */
    }
    if (holder) {
        /* Not stashed in the enactment: release the working copy. */
        xml_free (holder);
        holder = NULL;
    }
}
|
data element is really just an assignment, nothing more. Along with decisions, I think that simple
assignment will be enough. I'll also write an eval data "source" which evaluates expressions, and that will give us
plenty of power. Scripts, of course, are actions.
(May 13, 2002): a long standing bug, found at last -- if the data item here processed is a top-level data item,
then it's already been handled during the wftk_process_define call. Setting already-set values back to blanks or defaults
is a bad thing.
(June 29, 2003): almost correct. If the data item is top-level but is after some other workflow items then
it's not a startup value, and should be handled anyway. Sooner or later I'm going to have a whole damned lot of details
worked out about workflow.
/*
 * Handling data: assign a value, unless this is a startup value.
 *
 * A data element at the top level of the procdef is skipped when no workflow
 * element (task, sequence, parallel, action, decide, situation, alert)
 * precedes it among its siblings.  In every other case the element's content
 * is interpreted against the datasheet and stored under the element's "id".
 */
mark = xml_loc (procdef, xml_attrval (item, "loc"));

if (xml_parent (xml_parent (mark)) == NULL) {
    /* Top-level data: walk the siblings from the start until we reach either
       this element or some workflow element, whichever comes first. */
    for (holder = xml_firstelem (xml_parent (mark)); holder != NULL; holder = xml_nextelem (holder)) {
        if (holder == mark) {
            break;
        }
        if (xml_is (holder, "task")      ||
            xml_is (holder, "sequence")  ||
            xml_is (holder, "parallel")  ||
            xml_is (holder, "action")    ||
            xml_is (holder, "decide")    ||
            xml_is (holder, "situation") ||
            xml_is (holder, "alert")) {
            break; /* Workflow comes first, so this data must be handled. */
        }
    }
    if (holder == mark) {
        mark = NULL; /* Reached the data element first: ignore it. */
    }
}

if (mark != NULL) {
    defn = xmlobj_get_direct (mark);
    value = wftk_value_interpreta (session, datasheet, defn);
    free (defn);

    wftk_value_set (session, datasheet, xml_attrval (mark, "id"), value);
    free (value);
}
|
wftk_decide function, documented rather completely here.
/*
 * Handling decide: let wftk_decide choose a branch and queue it.
 *
 * When the item's "cur" attribute is empty, the decide element is looked up
 * and passed to wftk_decide.  If the result carries a non-empty "loc", that
 * location is resolved in the procdef and becomes `next`.  The result is
 * freed either way.
 *
 * If a branch was chosen it is queued, and keep is set so this item stays in
 * the queue.  The new item's "parent" is copied from this item's own
 * "parent" attribute, not from this item's id.
 * NOTE(review): because the branch reports to this item's parent, nothing
 * visible here unblocks the decide item afterwards -- confirm this is intended.
 */
if (*xml_attrval (item, "cur") == '\0') {
    mark = xml_loc (procdef, xml_attrval (item, "loc"));
    if (mark != NULL) {
        holder = wftk_decide (session, datasheet, mark);
        if (*xml_attrval (holder, "loc") != '\0') {
            next = xml_loc (procdef, xml_attrval (holder, "loc"));
        }
        xml_free (holder);
    }
}

if (next != NULL) {
    xml_set (queue_procdef (session, datasheet, next, xml_attrval (item, "where"), 0),
             "parent",
             xml_attrval (item, "parent"));
    keep = 1;
}
|
wftk_notify, which is basically a wrapper for notification adaptors.
The nice thing about this is that it allows us to expose the function itself, which makes it a convenient little
addition to the library.
/* Handling alerts: pass the alert's definition element to wftk_notify together with the session and datasheet. keep is not set, so the queue item completes immediately. */
wftk_notify (session, datasheet, def); |
| This code and documentation are released under the terms of the GNU license. They are copyright (c) 2000-2004, Vivtek. All rights reserved except those explicitly granted under the terms of the GNU license. |