r67928 - in /branches/upstream/libnet-stomp-perl/current: CHANGES MANIFEST META.yml Makefile.PL lib/Net/Stomp.pm lib/Net/Stomp/Frame.pm xt/ xt/consume_buffering.t

ansgar at users.alioth.debian.org ansgar at users.alioth.debian.org
Thu Jan 27 13:35:36 UTC 2011


Author: ansgar
Date: Thu Jan 27 13:35:18 2011
New Revision: 67928

URL: http://svn.debian.org/wsvn/pkg-perl/?sc=1&rev=67928
Log:
[svn-upgrade] new version libnet-stomp-perl (0.39)

Added:
    branches/upstream/libnet-stomp-perl/current/xt/
    branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t
Modified:
    branches/upstream/libnet-stomp-perl/current/CHANGES
    branches/upstream/libnet-stomp-perl/current/MANIFEST
    branches/upstream/libnet-stomp-perl/current/META.yml
    branches/upstream/libnet-stomp-perl/current/Makefile.PL
    branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm
    branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm

Modified: branches/upstream/libnet-stomp-perl/current/CHANGES
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/CHANGES?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/CHANGES (original)
+++ branches/upstream/libnet-stomp-perl/current/CHANGES Thu Jan 27 13:35:18 2011
@@ -1,4 +1,18 @@
 Revision history for Perl module Net::Stomp:
+
+0.39 Fri Jan 14 15:59:27 GMT 2011
+  - Cope with EOF during receive_frame better 
+
+0.38_99 Fri Aug 27 00:11:08 BST 2010
+  - Reimplemented Net::Stomp::receive_frame() to properly handle STOMP frames
+    that have extraneous line feed characters after the NULL terminator, before
+    the next frame header (e.g., ActiveMQ).  (Implemented by Michael S. Fischer)
+  - Improve reading performace by reading large chunks and buffering
+  - Read buffer size can now be specified in Net::Stomp->new() with the
+    'bufsize' key.
+  - Possible incompatible change - Net::Stomp::Frame#parse method removed.
+    Hopefully no one was using this. If you were, RTFS and look at
+    Net::Stomp#receive_frame method.
 
 0.38 Tue Aug 03 13:58:10 BST 2010
   - Fix an issue with IO::Handle buffering interfering with IO::Select. This

Modified: branches/upstream/libnet-stomp-perl/current/MANIFEST
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/MANIFEST?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/MANIFEST (original)
+++ branches/upstream/libnet-stomp-perl/current/MANIFEST Thu Jan 27 13:35:18 2011
@@ -10,4 +10,5 @@
 README
 t/pod.t
 t/pod_coverage.t
+xt/consume_buffering.t
 Makefile.PL

Modified: branches/upstream/libnet-stomp-perl/current/META.yml
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/META.yml?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/META.yml (original)
+++ branches/upstream/libnet-stomp-perl/current/META.yml Thu Jan 27 13:35:18 2011
@@ -1,7 +1,7 @@
 ---
 abstract: 'A Streaming Text Orientated Messaging Protocol Client'
 author:
-  - "Leon Brocard <acme at astray.com>.\nThom May <thom.may at betfair.com>.\nAsh Berlin <ash_github at firemirror.com>."
+  - "Leon Brocard <acme at astray.com>,\nThom May <thom.may at betfair.com>,\nAsh Berlin <ash_github at firemirror.com>,\nMichael S. Fischer <michael at dynamine.net>"
 configure_requires:
   Module::Build: 0.36
 generated_by: 'Module::Build version 0.3603'
@@ -13,7 +13,7 @@
 provides:
   Net::Stomp:
     file: lib/Net/Stomp.pm
-    version: 0.38
+    version: 0.39
   Net::Stomp::Frame:
     file: lib/Net/Stomp/Frame.pm
 requires:
@@ -22,4 +22,4 @@
   IO::Socket::INET: 0
 resources:
   license: http://dev.perl.org/licenses/
-version: 0.38
+version: 0.39

Modified: branches/upstream/libnet-stomp-perl/current/Makefile.PL
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/Makefile.PL?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/Makefile.PL (original)
+++ branches/upstream/libnet-stomp-perl/current/Makefile.PL Thu Jan 27 13:35:18 2011
@@ -2,15 +2,15 @@
 use ExtUtils::MakeMaker;
 WriteMakefile
 (
+          'PL_FILES' => {},
+          'INSTALLDIRS' => 'site',
           'NAME' => 'Net::Stomp',
+          'EXE_FILES' => [],
           'VERSION_FROM' => 'lib/Net/Stomp.pm',
           'PREREQ_PM' => {
-                           'Class::Accessor::Fast' => '0',
                            'IO::Select' => '0',
-                           'IO::Socket::INET' => '0'
-                         },
-          'INSTALLDIRS' => 'site',
-          'EXE_FILES' => [],
-          'PL_FILES' => {}
+                           'IO::Socket::INET' => '0',
+                           'Class::Accessor::Fast' => '0'
+                         }
         )
 ;

Modified: branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm (original)
+++ branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm Thu Jan 27 13:35:18 2011
@@ -6,15 +6,20 @@
 use Net::Stomp::Frame;
 use Carp;
 use base 'Class::Accessor::Fast';
-our $VERSION = '0.38';
+our $VERSION = '0.39';
+
 __PACKAGE__->mk_accessors( qw(
     _cur_host failover hostname hosts port select serial session_id socket ssl
-    ssl_options subscriptions _connect_headers
+    ssl_options subscriptions _connect_headers bufsize
 ) );
 
 sub new {
     my $class = shift;
     my $self  = $class->SUPER::new(@_);
+
+    $self->bufsize(8192) unless $self->bufsize;
+
+    $self->{_framebuf} = "";
 
     # We are not subscribed to anything at the start
     $self->subscriptions( {} );
@@ -127,7 +132,7 @@
         $self->socket->close;
     }
     eval { $self->_get_connection };
-    while ($@) { 
+    while ($@) {
         sleep(5);
         eval { $self->_get_connection };
     }
@@ -139,6 +144,16 @@
 
 sub can_read {
     my ( $self, $conf ) = @_;
+
+    # If there is any data left in the framebuffer that we haven't read, return
+    # 'true'. But we don't want to spin endlessly, so only return true the
+    # first time. (Anything touching the _framebuf should update this flag when
+    # it does something.
+    if ( $self->{_framebuf_changed} && length $self->{_framebuf} ) {
+        $self->{_framebuf_changed} = 0;
+        return 1;
+    }
+
     $conf ||= {};
     my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef;
     return $self->select->can_read($timeout) || 0;
@@ -229,7 +244,7 @@
     my ( $self, $frame ) = @_;
 
     #     warn "send [" . $frame->as_string . "]\n";
-    $self->socket->print( $frame->as_string );
+    $self->socket->syswrite( $frame->as_string );
     my $connected = $self->socket->connected;
     unless (defined $connected) {
         $self->_reconnect;
@@ -237,30 +252,98 @@
     }
 }
 
+sub _read_data {
+    my ($self, $timeout) = @_;
+
+    return unless $self->select->can_read($timeout);
+    my $len = $self->socket->sysread($self->{_framebuf},
+                                     $self->bufsize,
+                                     length($self->{_framebuf} || ''));
+
+    if ($len > 0) {
+        $self->{_framebuf_changed} = 1;
+    }
+    else {
+        # EOF detected - connection is gone. We have to reset the framebuf in
+        # case we had a partial frame in there that will never arrive.
+        $self->{_framebuf} = "";
+        delete $self->{_command};
+        delete $self->{_headers};
+    }
+    return $len;
+}
+
+sub _read_headers {
+    my ($self) = @_;
+
+    if ($self->{_framebuf} =~ s/^\n*([^\n].*?)\n\n//s) {
+        $self->{_framebuf_changed} = 1;
+        my $raw_headers = $1;
+        if ($raw_headers =~ s/^(.+)\n//) {
+            $self->{_command} = $1;
+        }
+        foreach my $line (split(/\n/, $raw_headers)) {
+            my ($key, $value) = split(/\s*:\s*/, $line, 2);
+            $self->{_headers}->{$key} = $value;
+        }
+        return 1;
+    }
+    return 0;
+}
+
+sub _read_body {
+    my ($self) = @_;
+
+    my $h = $self->{_headers};
+    if ($h->{'content-length'}) {
+        if (length($self->{_framebuf}) >= $h->{'content-length'}) {
+            $self->{_framebuf_changed} = 1;
+            my $body = substr($self->{framebuf},
+                              0,
+                              $h->{'content-length'},
+                              undef );
+
+            # Trim the trailer off the frame.
+            $self->{_framebuf} =~ s/^.*?\000\n*//s;
+            return Net::Stomp::Frame->new({
+                command => delete $self->{_command},
+                headers => delete $self->{_headers},
+                body => $body
+            });
+        }
+    } elsif ($self->{_framebuf} =~ s/^(.*?)\000\n*//s) {
+        # No content-length header.
+
+        my $body = $1;
+        $self->{_framebuf_changed} = 1;
+        return Net::Stomp::Frame->new({
+              command => delete $self->{_command},
+              headers => delete $self->{_headers},
+              body => $body });
+    }
+
+    return 0;
+}
+
 sub receive_frame {
     my ($self, $conf) = @_;
 
-    my $frame;
-    while (!$frame) {
-
-        # If the user passed in { timeout => 1 } then we wait for up to a
-        # second to read something. If we get no data in that time, then return
-        # undef.
-
-        # But if we get an error (cos we aren't connected) then we should
-        # reconnect and try again.
-        if ( $self->can_read($conf) ) {
-            eval {
-                $frame = Net::Stomp::Frame->parse( $self->socket );
-                1;
-            } or $self->_reconnect;
-        }
-        else {
-            return;
-        }
-    }
-    #     warn "receive [" . $frame->as_string . "]\n";
-    return $frame;
+    my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef;
+
+    my $connected = $self->socket->connected;
+    unless (defined $connected) {
+        $self->_reconnect;
+    }
+
+    my $done = 0;
+    while ( not $done = $self->_read_headers ) {
+        return undef unless $self->_read_data($timeout);
+    }
+    while ( not $done = $self->_read_body ) {
+        return undef unless $self->_read_data($timeout);
+    }
+
+    return $done;
 }
 
 sub _get_next_transaction {
@@ -396,7 +479,7 @@
 You may also pass in 'client-id', which specifies the JMS Client ID which is
 used in combination to the activemqq.subscriptionName to denote a durable
 subscriber.
-  
+
   $stomp->connect( { login => 'hello', passcode => 'there' } );
 
 =head2 send
@@ -457,17 +540,17 @@
 non-durable topics by dropping old messages - we can set a maximum
 pending limit which once a slow consumer backs up to this high water
 mark we begin to discard old messages.
- 
+
 'activemq.noLocal': Specifies whether or not locally sent messages
 should be ignored for subscriptions. Set to true to filter out locally
 sent messages.
- 
+
 'activemq.prefetchSize': Specifies the maximum number of pending
 messages that will be dispatched to the client. Once this maximum is
 reached no more messages are dispatched until the client acknowledges
 a message. Set to 1 for very fair distribution of messages across
 consumers where processing messages can be slow.
- 
+
 'activemq.priority': Sets the priority of the consumer so that
 dispatching can be weighted in priority order.
 
@@ -493,7 +576,7 @@
 
 =head2 receive_frame
 
-This blocks and returns you the next Stomp frame. 
+This blocks and returns you the next Stomp frame.
 
   my $frame = $stomp->receive_frame;
   warn $frame->body; # do something here
@@ -508,8 +591,8 @@
 
 =head2 can_read
 
-This returns whether a frame is waiting to be read. Optionally takes a
-timeout in seconds:
+This returns whether there is new data is waiting to be read from the STOMP
+server. Optionally takes a timeout in seconds:
 
   my $can_read = $stomp->can_read;
   my $can_read = $stomp->can_read({ timeout => '0.1' });
@@ -538,17 +621,19 @@
 
 L<Net::Stomp::Frame>.
 
-=head1 AUTHOR
-
-Leon Brocard <acme at astray.com>.
-Thom May <thom.may at betfair.com>.
-Ash Berlin <ash_github at firemirror.com>.
+=head1 AUTHORS
+
+Leon Brocard <acme at astray.com>,
+Thom May <thom.may at betfair.com>,
+Ash Berlin <ash_github at firemirror.com>,
+Michael S. Fischer <michael at dynamine.net>
 
 =head1 COPYRIGHT
 
 Copyright (C) 2006-9, Leon Brocard
 Copyright (C) 2009, Thom May, Betfair.com
 Copyright (C) 2010, Ash Berlin, Net-a-Porter.com
+Copyright (C) 2010, Michael S. Fischer
 
 This module is free software; you can redistribute it or modify it
 under the same terms as Perl itself.

Modified: branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm (original)
+++ branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm Thu Jan 27 13:35:18 2011
@@ -15,7 +15,7 @@
             my $self = shift;
             $self->headers->{$header} = shift if @_;
             $self->headers->{$header};
-            }
+        };
     }
 }
 
@@ -40,64 +40,6 @@
     $frame .= "\n";
     $frame .= $body || '';
     $frame .= "\000";
-}
-
-# NBK - $sock->getline does buffered IO which screws up select.  Use
-# sysread one char at a time to avoid reading part of the next line.
-sub _readline {
-    my($self, $socket, $terminator, $msg) = @_;
-
-    $terminator = "\n" unless defined($terminator);
-    $msg ||= "";
-
-    my $s = "";
-    while( 1 ) {
-        $socket->sysread($s, 1, length($s)) or die("Error reading $msg: $!");
-        last if substr($s, -1) eq $terminator;
-    }
-
-    return $s;
-}
-
-sub parse {
-    my ( $package, $socket ) = @_;
-    local $/ = "\n";
-
-    # read the command
-    my $command;
-    while (1) {
-        $command = $package->_readline($socket, "\n", "command");
-        chop $command;
-        last if $command;
-    }
-
-    # read headers
-    my $headers;
-    while (1) {
-        my $line = $package->_readline($socket, "\n", "header");
-        chop $line;
-        last if $line eq "";
-        my ( $key, $value ) = split(/: ?/, $line, 2);
-        $headers->{$key} = $value;
-    }
-
-    # read the body
-    my $body;
-    my $c;
-    if ( $headers->{"content-length"} ) {
-        $socket->sysread( $body, $headers->{"content-length"} + 1 )
-            || die "Error reading body: $!";
-        $headers->{bytes_message} = 1;
-    } else {
-        $body = $package->_readline($socket, "\000", "body");
-    }
-    # strip trailing null
-    $body =~ s/\000$//;
-    
-    my $frame = Net::Stomp::Frame->new(
-        { command => $command, headers => $headers, body => $body } );
-
-    return $frame;
 }
 
 1;

Added: branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t?rev=67928&op=file
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t (added)
+++ branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t Thu Jan 27 13:35:18 2011
@@ -1,0 +1,61 @@
+# -*- Mode: Perl; tab-width: 2; indent-tabs-mode: nil; -*-
+use strict;
+use warnings;
+
+use Test::More;
+use Test::Exception;
+use Net::Stomp;
+
+my $amount_to_send = 32;
+my $broker_host = '0.0.0.0';
+
+note "About to insert $amount_to_send messages - be patient";
+lives_ok {
+    local $SIG{ALRM} = sub { die "failed to send $amount_to_send messages\n" }; # NB: \n required
+    alarm(10);
+
+    my $connection = Net::Stomp->new({
+        hostname => $broker_host,
+        port     => 61613,
+    });
+    $connection->connect;
+
+    for my $n (1 .. $amount_to_send) {
+        $connection->send({
+            destination => '/queue/consume.many',
+            body => "Message $n - " . scalar localtime
+        })
+    }
+} q{inserted multiple jobs without dying};
+note "Inserted $amount_to_send messages - cheers";
+
+# now try to peel off messages
+use Net::Stomp;
+my $connection;
+lives_ok {
+    $connection = Net::Stomp->new({
+        hostname => $broker_host,
+        debug => 1,
+        port     => 61613,
+    });
+    $connection->connect;
+    $connection->subscribe({
+        destination => '/queue/consume.many',
+        ack         => 'client',
+    });
+} q{connected using Net::Stomp};
+
+note "About to consume $amount_to_send frames - be patient";
+lives_ok {
+    local $SIG{ALRM} = sub { die "failed to consume $amount_to_send messages\n" }; # NB: \n required
+    alarm(10);
+
+    for (1 .. $amount_to_send) {
+        my $frame = $connection->receive_frame;
+        note "read frame $_: " . $frame->body;
+        $connection->ack({frame => $frame});
+    }
+} qq{read $amount_to_send frames};
+note "Consumed $amount_to_send frames - cheers";
+
+done_testing;




More information about the Pkg-perl-cvs-commits mailing list