Add remove_callbacks()
[irspy-moved-to-github.git] / lib / ZOOM / Pod.pm
index 122a108..3e2f9ce 100644 (file)
@@ -1,4 +1,4 @@
-# $Id: Pod.pm,v 1.9 2006-05-12 13:28:40 mike Exp $
+# $Id: Pod.pm,v 1.21 2006-09-27 12:48:20 mike Exp $
 
 package ZOOM::Pod;
 
 
 package ZOOM::Pod;
 
@@ -8,7 +8,7 @@ use warnings;
 use ZOOM;
 
 BEGIN {
 use ZOOM;
 
 BEGIN {
-    # Just register the name
+    # Just register the names: this doesn't turn the levels on
     ZOOM::Log::mask_str("pod");
     ZOOM::Log::mask_str("pod_unhandled");
 }
     ZOOM::Log::mask_str("pod");
     ZOOM::Log::mask_str("pod_unhandled");
 }
@@ -72,7 +72,6 @@ handle errors.
  $pod = new ZOOM::Pod("bagel.indexdata.com/gils",
                       "bagel.indexdata.com/marc");
 
  $pod = new ZOOM::Pod("bagel.indexdata.com/gils",
                       "bagel.indexdata.com/marc");
 
-
 Creates a new pod containing one or more connections.  Each connection
 may be specified either by an existing C<ZOOM::Connection> object,
 which I<must> be asynchronous; or by a ZOOM target string, in which
 Creates a new pod containing one or more connections.  Each connection
 may be specified either by an existing C<ZOOM::Connection> object,
 which I<must> be asynchronous; or by a ZOOM target string, in which
@@ -82,12 +81,20 @@ Returns the new pod.
 
 =cut
 
 
 =cut
 
+# Functionality to be added:
+#
+#      If the constructor's first argument is a number, then it is
+#      taken as a limit on the number of connections to handle at any
+#      one time.  In this case, the pod initially multiplexes between
+#      the first I<n> connections, and brings further connections
+#      into the active subset whenever already-active connections are
+#      closed.
+
 sub new {
     my $class = shift();
     my(@conn) = @_;
 
     die "$class with no connections" if @conn == 0;
 sub new {
     my $class = shift();
     my(@conn) = @_;
 
     die "$class with no connections" if @conn == 0;
-    my @state; # Hashrefs with application state associated with connections
     foreach my $conn (@conn) {
        if (!ref $conn) {
            $conn = new ZOOM::Connection($conn, 0, async => 1);
     foreach my $conn (@conn) {
        if (!ref $conn) {
            $conn = new ZOOM::Connection($conn, 0, async => 1);
@@ -95,12 +102,10 @@ sub new {
            # server.  Such errors are caught later, by the _check()
            # call in wait(). 
        }
            # server.  Such errors are caught later, by the _check()
            # call in wait(). 
        }
-       push @state, {};
     }
 
     return bless {
        conn => \@conn,
     }
 
     return bless {
        conn => \@conn,
-       state => \@state,
        rs => [],
        callback => {},
     }, $class;
        rs => [],
        callback => {},
     }, $class;
@@ -146,8 +151,8 @@ events, by multiple invocations of C<callback()>.
 
 When an event occurs during the execution of C<wait()>, the relevant
 callback function is called with four arguments: the connection that the
 
 When an event occurs during the execution of C<wait()>, the relevant
 callback function is called with four arguments: the connection that the
-event happened on; a state hash-reference associated with the
-connection; the result-set associated with the connection; and the
+event happened on; the argument that was passed into C<wait()>;
+the result-set associated with the connection (if there is one); and the
 event-type (so that a single function that handles events of multiple
 types can switch on the code where necessary).  The callback function
 can handle the event as it wishes, finishing up by returning an
 event-type (so that a single function that handles events of multiple
 types can switch on the code where necessary).  The callback function
 can handle the event as it wishes, finishing up by returning an
@@ -158,7 +163,7 @@ C<wait()>.
 So a simple event-handler might look like this:
 
  sub got_event {
 So a simple event-handler might look like this:
 
  sub got_event {
-      ($conn, $state, $rs, $event) = @_;
+      ($conn, $arg, $rs, $event) = @_;
       print "event $event on connection ", $conn->option("host"), "\n";
       print "Found ", $rs->size(), " records\n"
          if $event == ZOOM::Event::RECV_SEARCH;
       print "event $event on connection ", $conn->option("host"), "\n";
       print "Found ", $rs->size(), " records\n"
          if $event == ZOOM::Event::RECV_SEARCH;
@@ -177,7 +182,7 @@ the exception using C<die $exception>.
 So a simple error-handler might look like this:
 
  sub got_error {
 So a simple error-handler might look like this:
 
  sub got_error {
-      ($conn, $state, $rs, $exception) = @_;
+      ($conn, $arg, $rs, $exception) = @_;
       if ($exception->isa("ZOOM::Exception")) {
           print "Caught error $exception - continuing";
           return 0;
       if ($exception->isa("ZOOM::Exception")) {
           print "Caught error $exception - continuing";
           return 0;
@@ -185,13 +190,13 @@ So a simple error-handler might look like this:
       die $exception;
  }
 
       die $exception;
  }
 
-The C<$state> argument is a reference to an initially empty hash,
-which the application can use as it sees fit, to store its own
-connection-relation information.  For example, an application might
-use C<$state-E<gt>{last}> to keep a record of which was the last record
-retrieved from the associated connection.  The pod module itself does
-not use the state hash at all, and applications are also welcome to
-ignore it if they do not need it.
+The C<$arg> argument could be anything at all - it is whatever the
+application code passed into C<wait()>.  For example, it could be
+a reference to a hash indexed by the host string of the connections to
+yield some per-connection state information.
+An application might use such information
+to keep a record of which was the last record
+retrieved from the associated connection.
 
 =cut
 
 
 =cut
 
@@ -200,12 +205,25 @@ sub callback {
     my($event, $sub) = @_;
 
     my $old = $this->{callback}->{$event};
     my($event, $sub) = @_;
 
     my $old = $this->{callback}->{$event};
-    $this->{callback}->{$event} = $sub
-       if defined $sub;
+    $this->{callback}->{$event} = $sub;
 
     return $old;
 }
 
 
     return $old;
 }
 
+=head2 remove_callbacks()
+
+ $pod->remove_callbacks();
+
+Removes all registed callbacks from the pod.  This is useful when the
+pod has completed one operation and is about to start the next.
+
+=cut
+
+sub remove_callbacks {
+    my $this = shift();
+    $this->{callback} = {};
+}
+
 =head2 search_pqf()
 
  $pod->search_pqf("@attr 1=1003 wedel");
 =head2 search_pqf()
 
  $pod->search_pqf("@attr 1=1003 wedel");
@@ -223,8 +241,8 @@ have one search active on it at a time: this allows the pod to
 maintain the one-to-one mapping between connections and result-sets.
 Submitting a new search on a connection before the old one has
 completed will result in a total failure in the nature of causality,
 maintain the one-to-one mapping between connections and result-sets.
 Submitting a new search on a connection before the old one has
 completed will result in a total failure in the nature of causality,
-and the spontaneous existence-failure of the universe.  Do not do
-this.
+and the spontaneous existence-failure of the universe.  Try to avoid
+doing this too often.
 
 =cut
 
 
 =cut
 
@@ -233,19 +251,24 @@ sub search_pqf {
     my($pqf) = @_;
 
     foreach my $i (0..@{ $this->{conn} }-1) {
     my($pqf) = @_;
 
     foreach my $i (0..@{ $this->{conn} }-1) {
-       $this->{rs}->[$i] = $this->{conn}->[$i]->search_pqf($pqf);
+       my $conn = $this->{conn}->[$i];
+       $this->{rs}->[$i] = $conn->search_pqf($pqf)
+           if !$conn->option("pod_omit");
     }
 }
 
 =head2 wait()
 
  $err = $pod->wait();
     }
 }
 
 =head2 wait()
 
  $err = $pod->wait();
+ # or
+ $err = $pod->wait($arg);
  die "$pod->wait() failed with error $err" if $err;
 
 Waits for events on the connections that make up the pod, usually
 continuing until there are no more events left and then returning
 zero.  Whenever an event occurs, a callback function is dispatched as
  die "$pod->wait() failed with error $err" if $err;
 
 Waits for events on the connections that make up the pod, usually
 continuing until there are no more events left and then returning
 zero.  Whenever an event occurs, a callback function is dispatched as
-described above; if
+described above; if an argument was passed to C<wait()>, then that
+same argument is also passed to each callback invocation.  If
 that function returns a non-zero value, then C<wait()> terminates
 immediately, whether or not any events remain, and returns that value.
 
 that function returns a non-zero value, then C<wait()> terminates
 immediately, whether or not any events remain, and returns that value.
 
@@ -257,36 +280,61 @@ indicates whether C<wait()> should continue (return-value 0) or return
 immediately (any other value).  Exception-handling callbacks may of
 course re-throw the exception.
 
 immediately (any other value).  Exception-handling callbacks may of
 course re-throw the exception.
 
+Connections that have the C<pod_omit> option set are omitted from
+consideration.  This is useful if, for example, a connection that is
+part of a pod is known to have encountered an unrecoverable error.
+
 =cut
 
 sub wait {
     my $this = shift();
 =cut
 
 sub wait {
     my $this = shift();
+    my($arg) = @_;
+
     my $res = 0;
 
     my $res = 0;
 
-    while ((my $i = ZOOM::event($this->{conn})) != 0) {
+    while (1) {
+       my @conn;
+       my @idxmap; # maps indexes into conn to global indexes
+       foreach my $i (0 .. @{ $this->{conn} }-1) {
+           my $conn = $this->{conn}->[$i];
+           if ($conn->option("pod_omit")) {
+               #ZOOM::Log::log("pod", "connection $i omitted (",
+                              #$conn->option("host"), ")");
+             } else {
+                 push @conn, $conn;
+                 push @idxmap, $i;
+                 #ZOOM::Log::log("pod", "connection $i included (",
+                                #$conn->option("host"), ")");
+             }
+       }
+
+       last if @conn == 0;
+       my $i0 = ZOOM::event(\@conn);
+       last if $i0 == 0;
+       my $i = 1+$idxmap[$i0-1];
        my $conn = $this->{conn}->[$i-1];
        my $conn = $this->{conn}->[$i-1];
+       die "connection-mapping screwup" if $conn ne $conn[$i0-1];
+
        my $ev = $conn->last_event();
        my $evstr = ZOOM::event_str($ev);
        my $ev = $conn->last_event();
        my $evstr = ZOOM::event_str($ev);
-       ZOOM::Log::log("pod", "connection ", $i-1, ": $evstr");
+       ZOOM::Log::log("pod", "connection ", $i-1, ": event $ev ($evstr)");
 
        eval {
            $conn->_check();
        }; if ($@) {
            my $sub = $this->{callback}->{exception};
            die $@ if !defined $sub;
 
        eval {
            $conn->_check();
        }; if ($@) {
            my $sub = $this->{callback}->{exception};
            die $@ if !defined $sub;
-           $res = &$sub($conn, $this->{state}->[$i-1],
-                        $this->{rs}->[$i-1], $@);
+           $res = &$sub($conn, $arg, $this->{rs}->[$i-1], $@);
            last if $res != 0;
            next;
        }
 
        my $sub = $this->{callback}->{$ev};
        if (defined $sub) {
            last if $res != 0;
            next;
        }
 
        my $sub = $this->{callback}->{$ev};
        if (defined $sub) {
-           $res = &$sub($conn, $this->{state}->[$i-1],
-                        $this->{rs}->[$i-1], $ev);
+           $res = &$sub($conn, $arg, $this->{rs}->[$i-1], $ev);
            last if $res != 0;
        } else {
            last if $res != 0;
        } else {
-           ZOOM::Log::log("pod_unhandled", "unhandled event $ev ($evstr)");
+           ZOOM::Log::log("pod_unhandled", "connection ", $i-1, ": unhandled event $ev ($evstr)");
        }
     }
 
        }
     }