Synching data with remote data sources

[ Previous: Repository queries ] [ Top: Repository manager ] [ Next: Working with attachments ]

Of course, eventually we'll be creating an adaptor class for remote data sources, but for now I don't have time. So for the time being, all remote data sources will be remote instances of the command-line interface. I have that running rather nicely as an inetd-invoked program. That has got to be the easiest way to set up a server! And the repository manager is sufficiently low-usage that it doesn't really need to scale well, so that's cool.

Synching data with remote repositories is a fairly non-trivial thing. It's taken me several weeks just to get all the infrastructure working: remote connections, logging of changes, and so forth. Essentially, to pull modified data from a remote source, we do this:
  1. Determine the remote repository (by looking at list decoration or just being given the remote list.)
  2. Build a remote repository description.
  3. Open a conversation with it.
  4. Ask for changes to our target list since the last time we pulled.
  5. Iterate through that list, retrieving data objects and writing them to the local list. This process also invokes the appropriate publishing processes.
For this to work, we have to know when the last pull took place. Using the convention that only logging repositories can mirror (which makes sense) we store this information in a file called [list].info in the logging directory. If we don't have a logging directory, then we have to use push_all and pull_all in lieu of delta-based mirroring, but that's a fair tradeoff.

Let's look at pulling first, then we'll move on to pushing. The function is pretty straightforward. Note, however, that for logic's sake, we set the push date to "now" after pulling all data -- otherwise we would have the sad effect that we would always try to upload everything we just downloaded, if we were to mix push and pull_all. We'll do the converse with push_all, for the same reason.
 
/* Shared worker for repos_pull and repos_pull_all; its definition follows below. */
int _repos_pull_data (XML * repository, const char * list_id, const char * remote_id, int mode, XML * changelist);

/* repos_pull: delta pull of a list from its remote.
 *
 * Hands all arguments to _repos_pull_data with mode 0, which makes the worker ask the
 * remote for changes since the recorded pull date instead of listing everything.
 *
 * Returns whatever _repos_pull_data returns.
 */
WFTK_EXPORT int   repos_pull     (XML * repository, const char *list_id, const char *remote_id, XML * changelist)
{
   int pulled;

   pulled = _repos_pull_data (repository, list_id, remote_id, 0, changelist);
   return pulled;
}
/* repos_pull_all: pull every object of a list from its remote, then stamp the push date.
 *
 * Runs _repos_pull_data with mode 1 (full listing rather than delta).  When that reports
 * at least one entry, the "push" date stored for this remote in _log/[list].info is set
 * to the local repository's current time, so that a later delta push does not try to
 * upload everything that was just downloaded.
 *
 * repository  - the local repository
 * list_id     - id of the list to pull
 * remote_id   - remote host; when NULL the list's "pull" attribute, then its "mirror"
 *               attribute, is used (the same rule _repos_pull_data applies)
 * changelist  - passed through to _repos_pull_data; may be NULL
 *
 * Returns the value returned by _repos_pull_data.
 */
WFTK_EXPORT int   repos_pull_all     (XML * repository, const char *list_id, const char *remote_id, XML * changelist)
{
   XML * list;
   XML * info;
   XML * last;
   int ret = _repos_pull_data (repository, list_id, remote_id, 1, changelist);

   if (ret <= 0) {
      return ret;
   }

   /* _repos_pull_data resolves a NULL remote_id only for its own use, so repeat the
      lookup here; otherwise the info record would be searched and keyed with NULL. */
   if (!remote_id) {
      list = xml_search (repository, "list", "id", list_id);
      if (!list) {
         return ret;
      }
      remote_id = xml_attrval (list, "pull");
      if (!*remote_id) {
         remote_id = xml_attrval (list, "mirror");
      }
      if (!*remote_id) {
         return ret;
      }
   }

   /* Load the synch info for this list, starting a fresh one if it can't be read. */
   info = xml_load ("_log/%s.info", list_id);
   if (xml_is (info, "xml-error")) {
      xml_free (info);
      info = NULL;
   }
   if (!info) {
      info = xml_create ("synchinfo");
   }

   /* Find (or create) the record for this remote. */
   last = xml_search (info, "field", "id", remote_id);
   if (!last) {
      last = xml_create ("field");
      xml_set (last, "id", remote_id);
      xml_append (info, last);
   }

   /* Push date := local "now", taken after the pull has finished. */
   repos_mark_time (repository, "now");
   xml_set (last, "push", xml_attrval (repository, "now"));
   xml_save (info, "_log/%s.info", list_id);
   xml_free (info);

   return ret;
}
/* _repos_pull_data: worker behind repos_pull (mode 0) and repos_pull_all (mode non-zero).
 *
 * Opens the remote, obtains a list of entries to bring down, records the new pull/push
 * dates in _log/[list].info, and then applies each entry to the local list.
 *
 * repository  - the local repository
 * list_id     - id of the list to pull
 * remote_id   - remote host; when NULL the list's "pull" attribute, then its "mirror"
 *               attribute, is used
 * mode        - 0: ask the remote for changes since the recorded pull date (repos_changes);
 *               non-zero: list the whole remote list (repos_list)
 * changelist  - when non-NULL, receives the change entries and is left for the caller to
 *               free; when NULL a temporary list is created and freed here
 *
 * Returns -1 if the list is unknown, no remote can be determined, or the remote reports
 * an error-state after opening; 0 if there is nothing to pull; otherwise the number of
 * change entries walked (entries with an unrecognized action are counted too).
 */
int _repos_pull_data (XML * repository, const char *list_id, const char *remote_id, int mode, XML * changelist)
{
   XML * list;
   XML * remote;
   XML * info;
   XML * last;
   XML * changes;
   XML * cur;
   XML * obj;
   const char * action;
   int count = 0;

   list = xml_search (repository, "list", "id", list_id);
   if (!list) {
      return -1;
   }

   /* No explicit remote: fall back on the list's "pull" attribute, then "mirror". */
   if (!remote_id) {
      remote_id = xml_attrval (list, "pull");
      if (!*remote_id) {
         remote_id = xml_attrval (list, "mirror");
      }
      if (!*remote_id) {
         return -1;
      }
   }

   /* Only remote lists are supported right now. */
   remote = xml_search (repository, "remote", "host", remote_id);
   if (!remote) {
      remote = xml_create ("remote");
      xml_set (remote, "host", remote_id);
      xml_append (repository, remote);
   }
   repos_open (remote, NULL, "--remote--");
   if (*xml_attrval (remote, "error-state")) {
      xml_delete (remote);
      return -1;
   }

   /* OK.  Let's get a list of changes since our last pull. */
   info = xml_load ("_log/%s.info", list_id);
   if (xml_is (info, "xml-error")) {
      xml_free (info);
      info = NULL;
   }
   if (!info) {
      info = xml_create ("synchinfo");
   }
   last = xml_search (info, "field", "id", remote_id);
   if (!last) {
      last = xml_create ("field");
      xml_set (last, "id", remote_id);
      xml_append (info, last);
   }
   /* The record may already exist without a pull date (e.g. it was created by a push),
      so default the date whenever it is empty, not only for a new record.  This is the
      same rule repos_synch uses. */
   if (!*xml_attrval (last, "pull")) xml_set (last, "pull", "0000-00-00");

   if (changelist) changes = changelist;
   else            changes = xml_create ("list");
   if (!mode) {
      repos_changes (remote, changes, xml_attrval (last, "pull"), list_id);
   } else {
      xml_set (changes, "id", list_id);
      repos_list (remote, changes);
   }
   cur = xml_firstelem (changes);

   if (!cur) {
      /* Nothing to do; the info file is deliberately left untouched. */
      if (!changelist) xml_free (changes);
      xml_free (info);
      return 0;
   }

   /* Now we write our current data into the info file. */
   repos_mark_time (remote, "now");
   repos_mark_time (repository, "now");
   xml_set (last, "pull", xml_attrval (remote, "now"));
   xml_set (last, "push", xml_attrval (repository, "now")); /* This is necessary in the case of a pull-fixup-synch process */
   xml_save (info, "_log/%s.info", list_id);

   /* Now we iterate along our change list and do the Right Thing for each change. */
   while (cur) {
      count++;
      action = xml_attrval (cur, "action");
      if (!*action || !strcmp ("add", action)) {
         /* An entry without an action is treated as an add. */
         obj = repos_get (remote, list_id, xml_attrval (cur, "id"));
         repos_add (repository, list_id, obj);
         xml_free (obj);
      } else if (!strcmp ("del", action)) {
         repos_del (repository, list_id, xml_attrval (cur, "id"));
      } else if (!strcmp ("mod", action)) {
         obj = repos_get (remote, list_id, xml_attrval (cur, "id"));
         repos_merge (repository, list_id, obj, NULL); /* TODO: consider a list mode "synch-raw" which can force "mod" */
         xml_free (obj);
      }

      /* Note we can log other things without confusing our mirrors.  Might come in handy. */

      cur = xml_nextelem (cur);
   }

   if (!changelist) xml_free (changes);
   xml_free (info);

   return count;
}
Pushing is almost the same, except that we look at our own changelog instead of the remote one. I'm not at all sure that it's a good idea to set the pull date to the same as the recorded push date for push_all -- but frankly, if you're pushing and pulling both, it's probably a better idea to use synch anyway; at least this little date dance will prevent us from downloading everything we just uploaded if we're so foolhardy as to avoid synch.
 
/* Shared worker for repos_push and repos_push_all; its definition follows below. */
int _repos_push_data (XML * repository, const char * list_id, const char * remote_id, int mode);

/* repos_push: delta push of a list to its remote.
 *
 * Hands all arguments to _repos_push_data with mode 0, which makes the worker send only
 * the local changes logged since the recorded push date.
 *
 * Returns whatever _repos_push_data returns.
 */
WFTK_EXPORT int   repos_push     (XML * repository, const char *list_id, const char *remote_id)
{
   int pushed;

   pushed = _repos_push_data (repository, list_id, remote_id, 0);
   return pushed;
}
/* repos_push_all: push every object of a list to its remote, then stamp the pull date.
 *
 * Runs _repos_push_data with mode 1 (full listing rather than delta).  When that reports
 * at least one entry, the "pull" date stored for this remote in _log/[list].info is set
 * to the repository's "remote-time" attribute (which _repos_push_data fills from the
 * remote's clock), so that a later delta pull does not download what was just uploaded.
 *
 * repository  - the local repository
 * list_id     - id of the list to push
 * remote_id   - remote host; when NULL the list's "push" attribute, then its "mirror"
 *               attribute, is used (the same rule _repos_push_data applies)
 *
 * Returns the value returned by _repos_push_data.
 */
WFTK_EXPORT int   repos_push_all     (XML * repository, const char *list_id, const char *remote_id)
{
   XML * list;
   XML * info;
   XML * last;
   int ret = _repos_push_data (repository, list_id, remote_id, 1);

   if (ret <= 0) {
      return ret;
   }

   /* _repos_push_data resolves a NULL remote_id only for its own use, so repeat the
      lookup here; otherwise the info record would be searched and keyed with NULL. */
   if (!remote_id) {
      list = xml_search (repository, "list", "id", list_id);
      if (!list) {
         return ret;
      }
      remote_id = xml_attrval (list, "push");
      if (!*remote_id) {
         remote_id = xml_attrval (list, "mirror");
      }
      if (!*remote_id) {
         return ret;
      }
   }

   /* Load the synch info for this list, starting a fresh one if it can't be read. */
   info = xml_load ("_log/%s.info", list_id);
   if (xml_is (info, "xml-error")) {
      xml_free (info);
      info = NULL;
   }
   if (!info) {
      info = xml_create ("synchinfo");
   }

   /* Find (or create) the record for this remote. */
   last = xml_search (info, "field", "id", remote_id);
   if (!last) {
      last = xml_create ("field");
      xml_set (last, "id", remote_id);
      xml_append (info, last);
   }

   xml_set (last, "pull", xml_attrval (repository, "remote-time"));
   xml_save (info, "_log/%s.info", list_id);
   xml_free (info);

   return ret;
}
/* _repos_push_data: worker behind repos_push (mode 0) and repos_push_all (mode non-zero).
 *
 * Opens the remote, obtains a list of local entries to send, records the new push date
 * in _log/[list].info, and then applies each entry to the remote list.
 *
 * repository  - the local repository
 * list_id     - id of the list to push
 * remote_id   - remote host; when NULL the list's "push" attribute, then its "mirror"
 *               attribute, is used
 * mode        - 0: use local changes since the recorded push date (repos_changes);
 *               non-zero: list the whole local list (repos_list)
 *
 * Side effect: the remote's current time is stored on the repository as "remote-time"
 * (repos_push_all reads it afterwards).
 *
 * Returns -1 if the list is unknown, no remote can be determined, or the remote reports
 * an error-state after opening; 0 if there is nothing to push; otherwise the number of
 * change entries walked (entries with an unrecognized action are counted too).
 */
int _repos_push_data (XML * repository, const char *list_id, const char *remote_id, int mode)
{
   XML * list;
   XML * remote;
   XML * info;
   XML * last;
   XML * changes;
   XML * cur;
   XML * obj;
   const char * action;
   int count = 0;

   list = xml_search (repository, "list", "id", list_id);
   if (!list) {
      return -1;
   }

   /* No explicit remote: fall back on the list's "push" attribute, then "mirror". */
   if (!remote_id) {
      remote_id = xml_attrval (list, "push");
      if (!*remote_id) {
         remote_id = xml_attrval (list, "mirror");
      }
      if (!*remote_id) {
         return -1;
      }
   }

   /* Only remote lists are supported right now. */
   remote = xml_search (repository, "remote", "host", remote_id);
   if (!remote) {
      remote = xml_create ("remote");
      xml_set (remote, "host", remote_id);
      xml_append (repository, remote);
   }
   repos_open (remote, NULL, "--remote--");
   if (*xml_attrval (remote, "error-state")) {
      xml_delete (remote);
      return -1;
   }

   /* OK.  Let's get a list of local changes since our last push. */
   info = xml_load ("_log/%s.info", list_id);
   if (xml_is (info, "xml-error")) {
      xml_free (info);
      info = NULL;
   }
   if (!info) {
      info = xml_create ("synchinfo");
   }
   last = xml_search (info, "field", "id", remote_id);
   if (!last) {
      last = xml_create ("field");
      xml_set (last, "id", remote_id);
      xml_append (info, last);
   }
   /* The record may already exist without a push date, so default the date whenever it
      is empty, not only for a new record.  This is the same rule repos_synch uses. */
   if (!*xml_attrval (last, "push")) xml_set (last, "push", "0000-00-00");

   changes = xml_create ("list");
   if (!mode) {
      repos_changes (repository, changes, xml_attrval (last, "push"), list_id);
   } else {
      xml_set (changes, "id", list_id);
      repos_list (repository, changes);
   }
   cur = xml_firstelem (changes);

   if (!cur) {
      /* Nothing to do; the info file is deliberately left untouched. */
      xml_free (changes);
      xml_free (info);
      return 0;
   }

   /* Now we write our current data into the info file. */
   repos_mark_time (repository, "now");
   repos_mark_time (remote, "now");
   xml_set (repository, "remote-time", xml_attrval (remote, "now"));
   xml_set (last, "push", xml_attrval (repository, "now"));
   xml_save (info, "_log/%s.info", list_id);

   /* Now we iterate along our change list and do the Right Thing for each change. */
   while (cur) {
      count++;
      action = xml_attrval (cur, "action");
      if (!*action || !strcmp ("add", action)) {
         /* An entry without an action is treated as an add. */
         obj = repos_get (repository, list_id, xml_attrval (cur, "id"));
         repos_add (remote, list_id, obj);
         xml_free (obj);
      } else if (!strcmp ("del", action)) {
         repos_del (remote, list_id, xml_attrval (cur, "id"));
      } else if (!strcmp ("mod", action)) {
         obj = repos_get (repository, list_id, xml_attrval (cur, "id"));
         repos_merge (remote, list_id, obj, NULL); /* TODO: consider a list mode "synch-raw" which can force "mod" */
         xml_free (obj);
      }

      /* Note we can log other things without confusing our mirrors.  Might come in handy. */

      cur = xml_nextelem (cur);
   }

   xml_free (changes);
   xml_free (info);

   return count;
}
And synching -- now here we run into something more interesting. To synch effectively, we first pull remote changes, then we push our local ones. Conflict resolution is something I'm not even going to get into yet. But to be safe, we really have to make sure that nobody makes changes to the remote repository while we're doing our synch, otherwise we could wind up missing some changes (or we'll always see and download our own changes). So we really need a lock mechanism. That'll come later.

Basically, synching is this:
  1. Get list of local changes
  2. Get list of remote changes
  3. Pull remote changes
  4. Push local changes (thus remote changes take precedence; at some point we will look at field-by-field merging)
  5. Set push and pull dates for list
March 12, 2002: Added the ability to retrieve a list of changes pulled (like the pull function above) so that local information can be updated outside the repository after new data comes down the wire.
 
/* repos_synch: two-way synchronization of a list with its mirror.
 *
 * Collects the remote changes since the recorded pull date and the local changes since
 * the recorded push date, applies the remote changes locally, then applies the local
 * changes remotely, and finally stamps both dates in _log/[list].info.
 *
 * repository  - the local repository
 * list_id     - id of the list to synchronize
 * remote_id   - remote host; when NULL the list's "mirror" attribute is used.  If the
 *               list has no mirror, this degrades to repos_push (list has a "push"
 *               attribute) or repos_pull (list has a "pull" attribute).
 * changelist  - when non-NULL, receives the remote change entries and is left for the
 *               caller to free; when NULL a temporary list is used
 *
 * Returns -1 if the list is unknown, no remote can be determined, or the remote reports
 * an error-state after opening; 0 if neither side has changes; otherwise the total
 * number of remote plus local change entries walked.
 *
 * NOTE: there is no locking; changes made on the remote while this runs can be missed.
 */
WFTK_EXPORT int   repos_synch    (XML * repository, const char *list_id, const char *remote_id, XML * changelist)
{
   XML * list;
   XML * remote;
   XML * info;
   XML * last;
   XML * localchanges;
   XML * remotechanges;
   XML * cur;
   int count = 0;
   XML * obj;

   list = xml_search (repository, "list", "id", list_id);
   if (!list) {
      return -1;
   }

   if (!remote_id) {
      remote_id = xml_attrval (list, "mirror");
      if (!*remote_id) {
         /* No mirror configured.  remote_id is now an empty string, not NULL, so hand
            NULL to push/pull: only then do they look up the list's own "push"/"pull"
            attribute instead of trying to open a remote whose host is "". */
         if (*xml_attrval (list, "push")) return repos_push (repository, list_id, NULL);
         if (*xml_attrval (list, "pull")) return repos_pull (repository, list_id, NULL, NULL);
         return -1;
      }
   }

   /* Only remote lists are supported right now. */
   remote = xml_search (repository, "remote", "host", remote_id);
   if (!remote) {
      remote = xml_create ("remote");
      xml_set (remote, "host", remote_id);
      xml_append (repository, remote);
   }
   repos_open (remote, NULL, "--remote--");
   if (*xml_attrval (remote, "error-state")) {
      xml_delete (remote);
      return -1;
   }

   /* OK.  Let's get the lists of changes since our last pull and our last push. */
   info = xml_load ("_log/%s.info", list_id);
   if (xml_is (info, "xml-error")) {
      xml_free (info);
      info = NULL;
   }
   if (!info) {
      info = xml_create ("synchinfo");
   }
   last = xml_search (info, "field", "id", remote_id);
   if (!last) {
      last = xml_create ("field");
      xml_set (last, "id", remote_id);
      xml_append (info, last);
   }
   /* Either date may be missing from an existing record; default each separately. */
   if (!*xml_attrval (last, "push")) xml_set (last, "push", "0000-00-00");
   if (!*xml_attrval (last, "pull")) xml_set (last, "pull", "0000-00-00");

   if (changelist) remotechanges = changelist;
   else            remotechanges = xml_create ("list");
   repos_changes (remote, remotechanges, xml_attrval (last, "pull"), list_id);
   localchanges = xml_create ("list");
   repos_changes (repository, localchanges, xml_attrval (last, "push"), list_id);

   if (!xml_firstelem (remotechanges) && !xml_firstelem (localchanges)) {
      if (!changelist) xml_free (remotechanges);
      xml_free (localchanges);
      xml_free (info);
      return 0;
   }

   /* Now we iterate along our change lists and do the Right Thing for each change. */
   /* We first pull all changes from the remote list, then we push all changes from the local list. */
   cur = xml_firstelem (remotechanges);
   while (cur) {
      count++;
      if (!strcmp ("add", xml_attrval (cur, "action"))) {
         obj = repos_get (remote, list_id, xml_attrval (cur, "id"));
         repos_add (repository, list_id, obj);
         xml_free (obj);
      } else if (!strcmp ("del", xml_attrval (cur, "action"))) {
         repos_del (repository, list_id, xml_attrval (cur, "id"));
      } else if (!strcmp ("mod", xml_attrval (cur, "action"))) {
         obj = repos_get (remote, list_id, xml_attrval (cur, "id"));
         repos_merge (repository, list_id, obj, NULL); /* TODO: consider a list mode "synch-raw" which can force "mod" */
         xml_free (obj);
      }

      cur = xml_nextelem (cur);
   }
   cur = xml_firstelem (localchanges);
   while (cur) {
      count++;
      if (!strcmp ("add", xml_attrval (cur, "action"))) {
         obj = repos_get (repository, list_id, xml_attrval (cur, "id"));
         repos_add (remote, list_id, obj);
         xml_free (obj);
      } else if (!strcmp ("del", xml_attrval (cur, "action"))) {
         repos_del (remote, list_id, xml_attrval (cur, "id"));
      } else if (!strcmp ("mod", xml_attrval (cur, "action"))) {
         obj = repos_get (repository, list_id, xml_attrval (cur, "id"));
         repos_merge (remote, list_id, obj, NULL); /* TODO: consider a list mode "synch-raw" which can force "mod" */
         xml_free (obj);
      }

      cur = xml_nextelem (cur);
   }

   /* Now we write our current data into the info file.  Note that this allows us to avoid noticing
      our own changes next time we synch -- but without locking, it exposes us to missing some changes
      if somebody else synchs while we're busy.  That's why we need locking! */
   repos_mark_time (remote, "now");
   repos_mark_time (repository, "now");

   xml_set (last, "push", xml_attrval (repository, "now"));
   xml_set (last, "pull", xml_attrval (remote, "now"));
   xml_save (info, "_log/%s.info", list_id);

   xml_free (localchanges);
   if (!changelist) xml_free (remotechanges);
   xml_free (info);

   return count;
}
Finally, the repos_mark_time function is used to mark the local time of a repository. It acts upon the repository XML structure itself, for convenience. Note that it's clever enough to deal with remote repositories.
 
/* repos_mark_time: store a repository's current time in one of its attributes.
 *
 * repository  - local repository, or an opened remote (one that has a connection
 *               structure attached, as returned by xml_getbin)
 * attr        - name of the attribute which receives the timestamp
 *
 * For a remote, the "time" command is sent and the timestamp is cut out of the reply;
 * the attribute is cleared first, so it is left empty on failure.  For a local
 * repository the attribute is set to the local time as "YYYY-MM-DD HH:MM:SS".
 *
 * Returns 0 on success, 1 on failure.
 */
WFTK_EXPORT int   repos_mark_time (XML * repository, const char *attr)
{
   struct tm * timeptr;
   time_t julian;
   char now[32];
   const char * recv;
   const char * mark;
   struct _repos_remote * sock = (struct _repos_remote *) xml_getbin (repository);

   if (sock) { /* Remote. */
      xml_set (sock->parms, "outgoing", "time\n");
      _repos_send (sock);
      xml_set (repository, attr, "");
      recv = _repos_receive (sock);
      if (!recv || *recv == '-') return 1; /* A reply starting with '-' is an error reply. */

      /* The timestamp is taken from offset 6 of the reply, so make sure the reply is
         at least that long before looking there. */
      if (strlen (recv) < 6) return 1;

      mark = strchr (recv + 6, '+');
      if (!mark) return 1;

      /* Copy from offset 6 up to one character before the '+' (presumably dropping a
         separator in front of it -- TODO confirm against the server's reply format).
         Refuse a '+' sitting right at offset 6, which would yield a negative length. */
      if (mark - recv < 7) return 1;

      xml_attrncat (repository, attr, recv + 6, mark - recv - 7);
      return 0;
   }

   /* Local. */
   time (&julian);
   timeptr = localtime (&julian);
   if (!timeptr) return 1;

   /* strftime is bounded by the buffer size, unlike sprintf. */
   if (!strftime (now, sizeof now, "%Y-%m-%d %H:%M:%S", timeptr)) return 1;

   xml_set (repository, attr, now);
   return 0; /* The original fell off the end here without returning a value. */
}
Even though it's not technically a data synch function, it's certainly going to be used most often in the context of data synchronization, so it might as well be defined here.
[ Previous: Repository queries ] [ Top: Repository manager ] [ Next: Working with attachments ]


This code and documentation are released under the terms of the GNU license. They are copyright (c) 2001-2005, Vivtek. All rights reserved except those explicitly granted under the terms of the GNU license.