[libmessage-passing-zeromq-perl] 50/78: More updates - stress test and HWM

Jonas Smedegaard js at alioth.debian.org
Mon Sep 30 09:28:26 UTC 2013


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository libmessage-passing-zeromq-perl.

commit 3c06284e76ab14e0116b971730a4ae4d761be644
Author: Tomas Doran <bobtfish at bobtfish.net>
Date:   Sun Jun 10 13:09:10 2012 +0100

    More updates - stress test and HWM
---
 Changes                                       |    6 ++
 lib/Message/Passing/Input/ZeroMQ.pm           |    3 +-
 lib/Message/Passing/Output/ZeroMQ.pm          |    5 +-
 lib/Message/Passing/ZeroMQ/Role/HasASocket.pm |   34 ++++++------
 t/stress.t                                    |   73 +++++++++++++++++++++++++
 5 files changed, 100 insertions(+), 21 deletions(-)

diff --git a/Changes b/Changes
index 10cd476..4f2b799 100644
--- a/Changes
+++ b/Changes
@@ -1,5 +1,11 @@
     - Improve documentation.
 
+    - Fix incorrect docs to do with linger option
+
+    - Add ability to override default high water marks on sockets.
+
+    - Increase default high water mark for the output
+
 0.004
     - Changes to match up with Message::Passing 0.006
 
diff --git a/lib/Message/Passing/Input/ZeroMQ.pm b/lib/Message/Passing/Input/ZeroMQ.pm
index bc96287..df01350 100644
--- a/lib/Message/Passing/Input/ZeroMQ.pm
+++ b/lib/Message/Passing/Input/ZeroMQ.pm
@@ -19,10 +19,11 @@ has '+_socket' => (
 
 sub _socket_type { 'SUB' }
 
+sub _build_socket_hwm { 100000 }
+
 after setsockopt => sub {
     my ($self, $socket) = @_;
     $socket->setsockopt(ZMQ_SUBSCRIBE, '');
-    $socket->setsockopt(ZMQ_HWM, 100000); # Buffer up to 100k messages.
 };
 
 sub _try_rx {
diff --git a/lib/Message/Passing/Output/ZeroMQ.pm b/lib/Message/Passing/Output/ZeroMQ.pm
index 5e22eb7..d5d1a1b 100644
--- a/lib/Message/Passing/Output/ZeroMQ.pm
+++ b/lib/Message/Passing/Output/ZeroMQ.pm
@@ -13,10 +13,7 @@ has '+_socket' => (
 
 sub _socket_type { 'PUB' }
 
-after setsockopt => sub {
-    my ($self, $socket) = @_;
-    $socket->setsockopt(ZMQ_HWM, 1000); # Buffer up to 100k messages.
-};
+sub _build_socket_hwm { 10000 }
 
 sub consume {
     my $self = shift;
diff --git a/lib/Message/Passing/ZeroMQ/Role/HasASocket.pm b/lib/Message/Passing/ZeroMQ/Role/HasASocket.pm
index 222d07f..63cd1d4 100644
--- a/lib/Message/Passing/ZeroMQ/Role/HasASocket.pm
+++ b/lib/Message/Passing/ZeroMQ/Role/HasASocket.pm
@@ -49,7 +49,16 @@ sub _build_socket {
     $socket;
 }
 
-sub setsockopt {}
+has socket_hwm => (
+    is => 'ro',
+    isa => 'Int',
+    builder => '_build_socket_hwm',
+);
+
+sub setsockopt {
+    my ($self, $socket) = @_;
+    $socket->setsockopt(ZMQ_HWM, $self->socket_hwm);
+};
 
 has socket_bind => (
     is => 'ro',
@@ -122,24 +131,17 @@ The pair of PUSH, receives a proportion of messages distributed.
 
 Bool indicating the value of the ZMQ_LINGER options.
 
-Defaults to 0 meaning sockets are lossy, but will not block.
-
-=head3 linger off (default)
-
-Sending messages will be buffered on the client side up to the set
-buffer for this connection. Further messages will be dropped until
-the buffer starts to empty.
+Defaults to 0 meaning sockets will not block on shutdown if a server
+is unavailable (i.e. queued messages will be discarded).
 
-Receiving messages will be buffered by ZeroMQ for you until you're
-ready to receive them, after which they will be discarded.
+=head3 socket_hwm
 
-=head3 linger off
+Set the High Water Mark for the socket. Depending on the socket type,
+messages are likely to be discarded once this high water mark is exceeded
+(i.e. there are more than this many messages buffered).
 
-Sending messages will be be buffered on the client side up to the set
-buffer for this connection. If this buffer fills, then ZeroMQ will block
-the program which was trying to send the message. If the client quits
-before all messages were sent, ZeroMQ will block exit until they have been
-sent.
+A value of 0 disables the high water mark, meaning that messages will be
+buffered until RAM runs out.
 
 =head1 METHODS
 
diff --git a/t/stress.t b/t/stress.t
new file mode 100644
index 0000000..12a8da5
--- /dev/null
+++ b/t/stress.t
@@ -0,0 +1,73 @@
+use strict;
+use warnings;
+use Test::More;
+use AnyEvent;
+use JSON qw/ encode_json /;
+use Message::Passing::Input::ZeroMQ;
+use Message::Passing::Output::Test;
+use Message::Passing::Output::ZeroMQ;
+use Message::Passing::Filter::Decoder::JSON;
+use Time::HiRes qw( gettimeofday tv_interval );
+
+my $parent = $$;
+
+our $ITRS = 100000;
+
+sub _receiver_child {
+    my $i = 0;
+    my $cv = AnyEvent->condvar;
+    my $input = Message::Passing::Input::ZeroMQ->new(
+        socket_bind => 'tcp://*:5558',
+        socket_hwm => 0,
+        output_to => Message::Passing::Filter::Decoder::JSON->new(output_to => Message::Passing::Output::Test->new(
+            cb => sub {
+                $i++;
+                $cv->send if $i > $::ITRS;
+            },
+        )),
+    );
+    $cv->recv;
+    exit 0;
+}
+
+sub _sender_child {
+    my $output = Message::Passing::Output::ZeroMQ->new(
+        connect => 'tcp://127.0.0.1:5558',
+        linger => 1,
+        socket_hwm => 0,
+    );
+
+    my $run = sub {
+        $output->consume(encode_json {foo => 'bar'}) for 0..$::ITRS;
+        exit 0;
+    };
+    local $SIG{USR1} = sub { goto $run };
+    while (1) { sleep 1 }
+};
+
+my $receiver_pid = fork;
+if ($receiver_pid) { # Parent
+}
+else { # Child
+    _receiver_child();
+}
+
+my $sender_pid = fork;
+if ($sender_pid) { #Parent
+}
+else { # Child
+    _sender_child();
+}
+sleep 2; # Wait for children to spin up.
+my $t0 = [gettimeofday];
+#print "the code took:",timestr($td),"\n";
+kill('USR1', $sender_pid);
+
+is waitpid($sender_pid, 0), $sender_pid;
+is waitpid($receiver_pid, 0), $receiver_pid;
+my $elapsed = tv_interval ( $t0, [gettimeofday]);
+diag "Took " . $elapsed . "s for " . $ITRS . " iterations";
+diag $ITRS/$elapsed . " messages per second";
+
+done_testing;
+

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-perl/packages/libmessage-passing-zeromq-perl.git



More information about the Pkg-perl-cvs-commits mailing list