r67928 - in /branches/upstream/libnet-stomp-perl/current: CHANGES MANIFEST META.yml Makefile.PL lib/Net/Stomp.pm lib/Net/Stomp/Frame.pm xt/ xt/consume_buffering.t
ansgar at users.alioth.debian.org
ansgar at users.alioth.debian.org
Thu Jan 27 13:35:36 UTC 2011
Author: ansgar
Date: Thu Jan 27 13:35:18 2011
New Revision: 67928
URL: http://svn.debian.org/wsvn/pkg-perl/?sc=1&rev=67928
Log:
[svn-upgrade] new version libnet-stomp-perl (0.39)
Added:
branches/upstream/libnet-stomp-perl/current/xt/
branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t
Modified:
branches/upstream/libnet-stomp-perl/current/CHANGES
branches/upstream/libnet-stomp-perl/current/MANIFEST
branches/upstream/libnet-stomp-perl/current/META.yml
branches/upstream/libnet-stomp-perl/current/Makefile.PL
branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm
branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm
Modified: branches/upstream/libnet-stomp-perl/current/CHANGES
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/CHANGES?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/CHANGES (original)
+++ branches/upstream/libnet-stomp-perl/current/CHANGES Thu Jan 27 13:35:18 2011
@@ -1,4 +1,18 @@
Revision history for Perl module Net::Stomp:
+
+0.39 Fri Jan 14 15:59:27 GMT 2011
+ - Cope with EOF during receive_frame better
+
+0.38_99 Fri Aug 27 00:11:08 BST 2010
+ - Reimplemented Net::Stomp::receive_frame() to properly handle STOMP frames
+ that have extraneous line feed characters after the NULL terminator, before
+ the next frame header (e.g., ActiveMQ). (Implemented by Michael S. Fischer)
+ - Improve reading performance by reading large chunks and buffering
+ - Read buffer size can now be specified in Net::Stomp->new() with the
+ 'bufsize' key.
+ - Possible incompatible change - Net::Stomp::Frame#parse method removed.
+ Hopefully no one was using this. If you were, RTFS and look at
+ Net::Stomp#receive_frame method.
0.38 Tue Aug 03 13:58:10 BST 2010
- Fix an issue with IO::Handle buffering interfering with IO::Select. This
Modified: branches/upstream/libnet-stomp-perl/current/MANIFEST
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/MANIFEST?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/MANIFEST (original)
+++ branches/upstream/libnet-stomp-perl/current/MANIFEST Thu Jan 27 13:35:18 2011
@@ -10,4 +10,5 @@
README
t/pod.t
t/pod_coverage.t
+xt/consume_buffering.t
Makefile.PL
Modified: branches/upstream/libnet-stomp-perl/current/META.yml
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/META.yml?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/META.yml (original)
+++ branches/upstream/libnet-stomp-perl/current/META.yml Thu Jan 27 13:35:18 2011
@@ -1,7 +1,7 @@
---
abstract: 'A Streaming Text Orientated Messaging Protocol Client'
author:
- - "Leon Brocard <acme at astray.com>.\nThom May <thom.may at betfair.com>.\nAsh Berlin <ash_github at firemirror.com>."
+ - "Leon Brocard <acme at astray.com>,\nThom May <thom.may at betfair.com>,\nAsh Berlin <ash_github at firemirror.com>,\nMichael S. Fischer <michael at dynamine.net>"
configure_requires:
Module::Build: 0.36
generated_by: 'Module::Build version 0.3603'
@@ -13,7 +13,7 @@
provides:
Net::Stomp:
file: lib/Net/Stomp.pm
- version: 0.38
+ version: 0.39
Net::Stomp::Frame:
file: lib/Net/Stomp/Frame.pm
requires:
@@ -22,4 +22,4 @@
IO::Socket::INET: 0
resources:
license: http://dev.perl.org/licenses/
-version: 0.38
+version: 0.39
Modified: branches/upstream/libnet-stomp-perl/current/Makefile.PL
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/Makefile.PL?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/Makefile.PL (original)
+++ branches/upstream/libnet-stomp-perl/current/Makefile.PL Thu Jan 27 13:35:18 2011
@@ -2,15 +2,15 @@
use ExtUtils::MakeMaker;
WriteMakefile
(
+ 'PL_FILES' => {},
+ 'INSTALLDIRS' => 'site',
'NAME' => 'Net::Stomp',
+ 'EXE_FILES' => [],
'VERSION_FROM' => 'lib/Net/Stomp.pm',
'PREREQ_PM' => {
- 'Class::Accessor::Fast' => '0',
'IO::Select' => '0',
- 'IO::Socket::INET' => '0'
- },
- 'INSTALLDIRS' => 'site',
- 'EXE_FILES' => [],
- 'PL_FILES' => {}
+ 'IO::Socket::INET' => '0',
+ 'Class::Accessor::Fast' => '0'
+ }
)
;
Modified: branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm (original)
+++ branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp.pm Thu Jan 27 13:35:18 2011
@@ -6,15 +6,20 @@
use Net::Stomp::Frame;
use Carp;
use base 'Class::Accessor::Fast';
-our $VERSION = '0.38';
+our $VERSION = '0.39';
+
__PACKAGE__->mk_accessors( qw(
_cur_host failover hostname hosts port select serial session_id socket ssl
- ssl_options subscriptions _connect_headers
+ ssl_options subscriptions _connect_headers bufsize
) );
sub new {
my $class = shift;
my $self = $class->SUPER::new(@_);
+
+ $self->bufsize(8192) unless $self->bufsize;
+
+ $self->{_framebuf} = "";
# We are not subscribed to anything at the start
$self->subscriptions( {} );
@@ -127,7 +132,7 @@
$self->socket->close;
}
eval { $self->_get_connection };
- while ($@) {
+ while ($@) {
sleep(5);
eval { $self->_get_connection };
}
@@ -139,6 +144,16 @@
sub can_read {
my ( $self, $conf ) = @_;
+
+ # If there is any data left in the framebuffer that we haven't read, return
+ # 'true'. But we don't want to spin endlessly, so only return true the
+ # first time. (Anything touching the _framebuf should update this flag when
+ # it does something.)
+ if ( $self->{_framebuf_changed} && length $self->{_framebuf} ) {
+ $self->{_framebuf_changed} = 0;
+ return 1;
+ }
+
$conf ||= {};
my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef;
return $self->select->can_read($timeout) || 0;
@@ -229,7 +244,7 @@
my ( $self, $frame ) = @_;
# warn "send [" . $frame->as_string . "]\n";
- $self->socket->print( $frame->as_string );
+ $self->socket->syswrite( $frame->as_string );
my $connected = $self->socket->connected;
unless (defined $connected) {
$self->_reconnect;
@@ -237,30 +252,98 @@
}
}
+sub _read_data {
+ my ($self, $timeout) = @_;
+
+ return unless $self->select->can_read($timeout);
+ my $len = $self->socket->sysread($self->{_framebuf},
+ $self->bufsize,
+ length($self->{_framebuf} || ''));
+
+ if ($len > 0) {
+ $self->{_framebuf_changed} = 1;
+ }
+ else {
+ # EOF detected - connection is gone. We have to reset the framebuf in
+ # case we had a partial frame in there that will never arrive.
+ $self->{_framebuf} = "";
+ delete $self->{_command};
+ delete $self->{_headers};
+ }
+ return $len;
+}
+
+sub _read_headers {
+ my ($self) = @_;
+
+ if ($self->{_framebuf} =~ s/^\n*([^\n].*?)\n\n//s) {
+ $self->{_framebuf_changed} = 1;
+ my $raw_headers = $1;
+ if ($raw_headers =~ s/^(.+)\n//) {
+ $self->{_command} = $1;
+ }
+ foreach my $line (split(/\n/, $raw_headers)) {
+ my ($key, $value) = split(/\s*:\s*/, $line, 2);
+ $self->{_headers}->{$key} = $value;
+ }
+ return 1;
+ }
+ return 0;
+}
+
+sub _read_body {
+ my ($self) = @_;
+
+ my $h = $self->{_headers};
+ if ($h->{'content-length'}) {
+ if (length($self->{_framebuf}) >= $h->{'content-length'}) {
+ $self->{_framebuf_changed} = 1;
+ my $body = substr($self->{framebuf},
+ 0,
+ $h->{'content-length'},
+ undef );
+
+ # Trim the trailer off the frame.
+ $self->{_framebuf} =~ s/^.*?\000\n*//s;
+ return Net::Stomp::Frame->new({
+ command => delete $self->{_command},
+ headers => delete $self->{_headers},
+ body => $body
+ });
+ }
+ } elsif ($self->{_framebuf} =~ s/^(.*?)\000\n*//s) {
+ # No content-length header.
+
+ my $body = $1;
+ $self->{_framebuf_changed} = 1;
+ return Net::Stomp::Frame->new({
+ command => delete $self->{_command},
+ headers => delete $self->{_headers},
+ body => $body });
+ }
+
+ return 0;
+}
+
sub receive_frame {
my ($self, $conf) = @_;
- my $frame;
- while (!$frame) {
-
- # If the user passed in { timeout => 1 } then we wait for up to a
- # second to read something. If we get no data in that time, then return
- # undef.
-
- # But if we get an error (cos we aren't connected) then we should
- # reconnect and try again.
- if ( $self->can_read($conf) ) {
- eval {
- $frame = Net::Stomp::Frame->parse( $self->socket );
- 1;
- } or $self->_reconnect;
- }
- else {
- return;
- }
- }
- # warn "receive [" . $frame->as_string . "]\n";
- return $frame;
+ my $timeout = exists $conf->{timeout} ? $conf->{timeout} : undef;
+
+ my $connected = $self->socket->connected;
+ unless (defined $connected) {
+ $self->_reconnect;
+ }
+
+ my $done = 0;
+ while ( not $done = $self->_read_headers ) {
+ return undef unless $self->_read_data($timeout);
+ }
+ while ( not $done = $self->_read_body ) {
+ return undef unless $self->_read_data($timeout);
+ }
+
+ return $done;
}
sub _get_next_transaction {
@@ -396,7 +479,7 @@
You may also pass in 'client-id', which specifies the JMS Client ID which is
used in combination with the activemq.subscriptionName to denote a durable
subscriber.
-
+
$stomp->connect( { login => 'hello', passcode => 'there' } );
=head2 send
@@ -457,17 +540,17 @@
non-durable topics by dropping old messages - we can set a maximum
pending limit which once a slow consumer backs up to this high water
mark we begin to discard old messages.
-
+
'activemq.noLocal': Specifies whether or not locally sent messages
should be ignored for subscriptions. Set to true to filter out locally
sent messages.
-
+
'activemq.prefetchSize': Specifies the maximum number of pending
messages that will be dispatched to the client. Once this maximum is
reached no more messages are dispatched until the client acknowledges
a message. Set to 1 for very fair distribution of messages across
consumers where processing messages can be slow.
-
+
'activemq.priority': Sets the priority of the consumer so that
dispatching can be weighted in priority order.
@@ -493,7 +576,7 @@
=head2 receive_frame
-This blocks and returns you the next Stomp frame.
+This blocks and returns you the next Stomp frame.
my $frame = $stomp->receive_frame;
warn $frame->body; # do something here
@@ -508,8 +591,8 @@
=head2 can_read
-This returns whether a frame is waiting to be read. Optionally takes a
-timeout in seconds:
+This returns whether there is new data waiting to be read from the STOMP
+server. Optionally takes a timeout in seconds:
my $can_read = $stomp->can_read;
my $can_read = $stomp->can_read({ timeout => '0.1' });
@@ -538,17 +621,19 @@
L<Net::Stomp::Frame>.
-=head1 AUTHOR
-
-Leon Brocard <acme at astray.com>.
-Thom May <thom.may at betfair.com>.
-Ash Berlin <ash_github at firemirror.com>.
+=head1 AUTHORS
+
+Leon Brocard <acme at astray.com>,
+Thom May <thom.may at betfair.com>,
+Ash Berlin <ash_github at firemirror.com>,
+Michael S. Fischer <michael at dynamine.net>
=head1 COPYRIGHT
Copyright (C) 2006-9, Leon Brocard
Copyright (C) 2009, Thom May, Betfair.com
Copyright (C) 2010, Ash Berlin, Net-a-Porter.com
+Copyright (C) 2010, Michael S. Fischer
This module is free software; you can redistribute it or modify it
under the same terms as Perl itself.
Modified: branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm?rev=67928&op=diff
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm (original)
+++ branches/upstream/libnet-stomp-perl/current/lib/Net/Stomp/Frame.pm Thu Jan 27 13:35:18 2011
@@ -15,7 +15,7 @@
my $self = shift;
$self->headers->{$header} = shift if @_;
$self->headers->{$header};
- }
+ };
}
}
@@ -40,64 +40,6 @@
$frame .= "\n";
$frame .= $body || '';
$frame .= "\000";
-}
-
-# NBK - $sock->getline does buffered IO which screws up select. Use
-# sysread one char at a time to avoid reading part of the next line.
-sub _readline {
- my($self, $socket, $terminator, $msg) = @_;
-
- $terminator = "\n" unless defined($terminator);
- $msg ||= "";
-
- my $s = "";
- while( 1 ) {
- $socket->sysread($s, 1, length($s)) or die("Error reading $msg: $!");
- last if substr($s, -1) eq $terminator;
- }
-
- return $s;
-}
-
-sub parse {
- my ( $package, $socket ) = @_;
- local $/ = "\n";
-
- # read the command
- my $command;
- while (1) {
- $command = $package->_readline($socket, "\n", "command");
- chop $command;
- last if $command;
- }
-
- # read headers
- my $headers;
- while (1) {
- my $line = $package->_readline($socket, "\n", "header");
- chop $line;
- last if $line eq "";
- my ( $key, $value ) = split(/: ?/, $line, 2);
- $headers->{$key} = $value;
- }
-
- # read the body
- my $body;
- my $c;
- if ( $headers->{"content-length"} ) {
- $socket->sysread( $body, $headers->{"content-length"} + 1 )
- || die "Error reading body: $!";
- $headers->{bytes_message} = 1;
- } else {
- $body = $package->_readline($socket, "\000", "body");
- }
- # strip trailing null
- $body =~ s/\000$//;
-
- my $frame = Net::Stomp::Frame->new(
- { command => $command, headers => $headers, body => $body } );
-
- return $frame;
}
1;
Added: branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t?rev=67928&op=file
==============================================================================
--- branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t (added)
+++ branches/upstream/libnet-stomp-perl/current/xt/consume_buffering.t Thu Jan 27 13:35:18 2011
@@ -1,0 +1,61 @@
+# -*- Mode: Perl; tab-width: 2; indent-tabs-mode: nil; -*-
+use strict;
+use warnings;
+
+use Test::More;
+use Test::Exception;
+use Net::Stomp;
+
+my $amount_to_send = 32;
+my $broker_host = '0.0.0.0';
+
+note "About to insert $amount_to_send messages - be patient";
+lives_ok {
+ local $SIG{ALRM} = sub { die "failed to send $amount_to_send messages\n" }; # NB: \n required
+ alarm(10);
+
+ my $connection = Net::Stomp->new({
+ hostname => $broker_host,
+ port => 61613,
+ });
+ $connection->connect;
+
+ for my $n (1 .. $amount_to_send) {
+ $connection->send({
+ destination => '/queue/consume.many',
+ body => "Message $n - " . scalar localtime
+ })
+ }
+} q{inserted multiple jobs without dying};
+note "Inserted $amount_to_send messages - cheers";
+
+# now try to peel off messages
+use Net::Stomp;
+my $connection;
+lives_ok {
+ $connection = Net::Stomp->new({
+ hostname => $broker_host,
+ debug => 1,
+ port => 61613,
+ });
+ $connection->connect;
+ $connection->subscribe({
+ destination => '/queue/consume.many',
+ ack => 'client',
+ });
+} q{connected using Net::Stomp};
+
+note "About to consume $amount_to_send frames - be patient";
+lives_ok {
+ local $SIG{ALRM} = sub { die "failed to consume $amount_to_send messages\n" }; # NB: \n required
+ alarm(10);
+
+ for (1 .. $amount_to_send) {
+ my $frame = $connection->receive_frame;
+ note "read frame $_: " . $frame->body;
+ $connection->ack({frame => $frame});
+ }
+} qq{read $amount_to_send frames};
+note "Consumed $amount_to_send frames - cheers";
+
+done_testing;
More information about the Pkg-perl-cvs-commits
mailing list