-# $Id: Pod.pm,v 1.11 2006-06-06 16:27:56 mike Exp $
+# $Id: Pod.pm,v 1.22 2006-10-06 11:33:07 mike Exp $
package ZOOM::Pod;
use ZOOM;
BEGIN {
- # Just register the name
+ # Just register the names: this doesn't turn the levels on
ZOOM::Log::mask_str("pod");
ZOOM::Log::mask_str("pod_unhandled");
}
which I<must> be asynchronous; or by a ZOOM target string, in which
case the pod module will make the connection object itself.
-If the constructor's first argument is a number, then it is taken as a
-limit on the number of connections to handle at any one time. In this
-case, the pod initially multiplexes between the first I<n>
-connections, and brings further connections into the active subset
-whenever already-active connections are closed.
-
Returns the new pod.
=cut
+# Functionality to be added:
+#
+# If the constructor's first argument is a number, then it is
+# taken as a limit on the number of connections to handle at any
+# one time. In this case, the pod initially multiplexes between
+# the first I<n> connections, and brings further connections
+# into the active subset whenever already-active connections are
+# closed.
+
sub new {
my $class = shift();
my(@conn) = @_;
die "$class with no connections" if @conn == 0;
- my @state; # Hashrefs with application state associated with connections
foreach my $conn (@conn) {
if (!ref $conn) {
$conn = new ZOOM::Connection($conn, 0, async => 1);
# server. Such errors are caught later, by the _check()
# call in wait().
}
- push @state, {};
}
return bless {
conn => \@conn,
- state => \@state,
rs => [],
callback => {},
}, $class;
}
+
+=head2 connections()
+
+ @c = $pod->connections();
+
+Returns a list of the connection objects in the pod.
+
+=cut
+
+sub connections {
+ my $this = shift();
+ return @{ $this->{conn} }
+}
+
+
=head2 option()
$oldElemSet = $pod->option("elementSetName");
When an event occurs during the execution of C<wait()>, the relevant
callback function is called with four arguments: the connection that the
-event happened on; a state hash-reference associated with the
-connection; the result-set associated with the connection; and the
+event happened on; the argument that was passed into C<wait()>;
+the result-set associated with the connection (if there is one); and the
event-type (so that a single function that handles events of multiple
types can switch on the code where necessary). The callback function
can handle the event as it wishes, finishing up by returning an
So a simple event-handler might look like this:
sub got_event {
- ($conn, $state, $rs, $event) = @_;
+ ($conn, $arg, $rs, $event) = @_;
print "event $event on connection ", $conn->option("host"), "\n";
print "Found ", $rs->size(), " records\n"
if $event == ZOOM::Event::RECV_SEARCH;
So a simple error-handler might look like this:
sub got_error {
- ($conn, $state, $rs, $exception) = @_;
+ ($conn, $arg, $rs, $exception) = @_;
if ($exception->isa("ZOOM::Exception")) {
print "Caught error $exception - continuing";
return 0;
die $exception;
}
-The C<$state> argument is a reference to an initially empty hash,
-which the application can use as it sees fit, to store its own
-connection-relation information. For example, an application might
-use C<$state-E<gt>{last}> to keep a record of which was the last record
-retrieved from the associated connection. The pod module itself does
-not use the state hash at all, and applications are also welcome to
-ignore it if they do not need it.
+The C<$arg> argument could be anything at all - it is whatever the
+application code passed into C<wait()>. For example, it could be
+a reference to a hash indexed by the host string of the connections to
+yield some per-connection state information.
+An application might use such information
+to keep a record of which was the last record
+retrieved from the associated connection.
=cut
my($event, $sub) = @_;
my $old = $this->{callback}->{$event};
- $this->{callback}->{$event} = $sub
- if defined $sub;
+ $this->{callback}->{$event} = $sub;
return $old;
}
+=head2 remove_callbacks()
+
+ $pod->remove_callbacks();
+
+Removes all registed callbacks from the pod. This is useful when the
+pod has completed one operation and is about to start the next.
+
+=cut
+
+sub remove_callbacks {
+ my $this = shift();
+ $this->{callback} = {};
+}
+
=head2 search_pqf()
$pod->search_pqf("@attr 1=1003 wedel");
maintain the one-to-one mapping between connections and result-sets.
Submitting a new search on a connection before the old one has
completed will result in a total failure in the nature of causality,
-and the spontaneous existence-failure of the universe. Do not do
-this.
+and the spontaneous existence-failure of the universe. Try to avoid
+doing this too often.
=cut
my($pqf) = @_;
foreach my $i (0..@{ $this->{conn} }-1) {
- $this->{rs}->[$i] = $this->{conn}->[$i]->search_pqf($pqf);
+ my $conn = $this->{conn}->[$i];
+ $this->{rs}->[$i] = $conn->search_pqf($pqf)
+ if !$conn->option("pod_omit");
}
}
=head2 wait()
$err = $pod->wait();
+ # or
+ $err = $pod->wait($arg);
die "$pod->wait() failed with error $err" if $err;
Waits for events on the connections that make up the pod, usually
continuing until there are no more events left and then returning
zero. Whenever an event occurs, a callback function is dispatched as
-described above; if
+described above; if an argument was passed to C<wait()>, then that
+same argument is also passed to each callback invocation. If
that function returns a non-zero value, then C<wait()> terminates
immediately, whether or not any events remain, and returns that value.
immediately (any other value). Exception-handling callbacks may of
course re-throw the exception.
+Connections that have the C<pod_omit> option set are omitted from
+consideration. This is useful if, for example, a connection that is
+part of a pod is known to have encountered an unrecoverable error.
+
=cut
sub wait {
my $this = shift();
+ my($arg) = @_;
+
my $res = 0;
- while ((my $i = ZOOM::event($this->{conn})) != 0) {
+ while (1) {
+ my @conn;
+ my @idxmap; # maps indexes into conn to global indexes
+ foreach my $i (0 .. @{ $this->{conn} }-1) {
+ my $conn = $this->{conn}->[$i];
+ if ($conn->option("pod_omit")) {
+ #ZOOM::Log::log("pod", "connection $i omitted (",
+ #$conn->option("host"), ")");
+ } else {
+ push @conn, $conn;
+ push @idxmap, $i;
+ #ZOOM::Log::log("pod", "connection $i included (",
+ #$conn->option("host"), ")");
+ }
+ }
+
+ last if @conn == 0;
+ my $i0 = ZOOM::event(\@conn);
+ last if $i0 == 0;
+ my $i = 1+$idxmap[$i0-1];
my $conn = $this->{conn}->[$i-1];
+ die "connection-mapping screwup" if $conn ne $conn[$i0-1];
+
my $ev = $conn->last_event();
my $evstr = ZOOM::event_str($ev);
ZOOM::Log::log("pod", "connection ", $i-1, ": event $ev ($evstr)");
}; if ($@) {
my $sub = $this->{callback}->{exception};
die $@ if !defined $sub;
- $res = &$sub($conn, $this->{state}->[$i-1],
- $this->{rs}->[$i-1], $@);
+ $res = &$sub($conn, $arg, $this->{rs}->[$i-1], $@);
last if $res != 0;
next;
}
my $sub = $this->{callback}->{$ev};
if (defined $sub) {
- $res = &$sub($conn, $this->{state}->[$i-1],
- $this->{rs}->[$i-1], $ev);
+ $res = &$sub($conn, $arg, $this->{rs}->[$i-1], $ev);
last if $res != 0;
} else {
ZOOM::Log::log("pod_unhandled", "connection ", $i-1, ": unhandled event $ev ($evstr)");