r41143 - in /branches/upstream/libthread-pool-simple-perl/current: Changes MANIFEST META.yml Makefile.PL README Simple.pm t/ t/1.t t/2.t t/3.t t/4.t
ryan52-guest at users.alioth.debian.org
ryan52-guest at users.alioth.debian.org
Mon Aug 3 07:27:25 UTC 2009
Author: ryan52-guest
Date: Mon Aug 3 07:27:18 2009
New Revision: 41143
URL: http://svn.debian.org/wsvn/pkg-perl/?sc=1&rev=41143
Log:
Load ./to_upload/Thread-Pool-Simple-0.23/ into
branches/upstream/libthread-pool-simple-perl/current.
Added:
branches/upstream/libthread-pool-simple-perl/current/Changes
branches/upstream/libthread-pool-simple-perl/current/MANIFEST
branches/upstream/libthread-pool-simple-perl/current/META.yml
branches/upstream/libthread-pool-simple-perl/current/Makefile.PL
branches/upstream/libthread-pool-simple-perl/current/README
branches/upstream/libthread-pool-simple-perl/current/Simple.pm
branches/upstream/libthread-pool-simple-perl/current/t/
branches/upstream/libthread-pool-simple-perl/current/t/1.t
branches/upstream/libthread-pool-simple-perl/current/t/2.t
branches/upstream/libthread-pool-simple-perl/current/t/3.t
branches/upstream/libthread-pool-simple-perl/current/t/4.t
Added: branches/upstream/libthread-pool-simple-perl/current/Changes
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/Changes?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/Changes (added)
+++ branches/upstream/libthread-pool-simple-perl/current/Changes Mon Aug 3 07:27:18 2009
@@ -1,0 +1,43 @@
+Revision history for Perl extension Thread::Pool::Simple.
+
+0.23 24 MAY 2007
+ - only calling ``sleep'' when not busy to avoid 1 sec delay
+ during thread creation
+
+0.22 10 MAY 2007
+ - corrected a race condition where cancelled jobs may slip
+ into the done queue
+ - added ``init'' handler
+ - calling ``sleep'' instead of ``yield'' in main thread to
+ reduce the number of busy() tests
+
+0.21 9 MAY 2007
+ - fixed two defects found by Dominik Gehl
+ - 1. always call pre_handler() before creating the thread
+ - 2. do not count ``fake'' job in busy()
+
+0.20 29 MAR 2007
+ - reworked some code
+
+0.10 14 MAR 2007
+ - added job cancelation support
+
+0.05 8 MAR 2007
+ - re-added lifespan
+ - fixed a `more than max worker' bug
+
+0.04 7 MAR 2007
+ - remove lifespan, pause, and resume
+ - add `passid' config option
+
+0.03 6 MAR 2007
+ - fixed a bug that passes wrong arguments to the handler
+
+0.02 6 MAR 2007
+ - pass default parameters to the handler
+
+0.01 27 FEB 2007
+ - initial version
+ - original version; created by h2xs 1.22 with options
+ -XA Thread::Pool::Simple
+
Added: branches/upstream/libthread-pool-simple-perl/current/MANIFEST
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/MANIFEST?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/MANIFEST (added)
+++ branches/upstream/libthread-pool-simple-perl/current/MANIFEST Mon Aug 3 07:27:18 2009
@@ -1,0 +1,10 @@
+Changes
+Makefile.PL
+MANIFEST
+README
+Simple.pm
+t/1.t
+t/2.t
+t/3.t
+t/4.t
+META.yml Module meta-data (added by MakeMaker)
Added: branches/upstream/libthread-pool-simple-perl/current/META.yml
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/META.yml?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/META.yml (added)
+++ branches/upstream/libthread-pool-simple-perl/current/META.yml Mon Aug 3 07:27:18 2009
@@ -1,0 +1,10 @@
+# http://module-build.sourceforge.net/META-spec.html
+#XXXXXXX This is a prototype!!! It will change in the future!!! XXXXX#
+name: Thread-Pool-Simple
+version: 0.23
+version_from: Simple.pm
+installdirs: site
+requires:
+
+distribution_type: module
+generated_by: ExtUtils::MakeMaker version 6.30
Added: branches/upstream/libthread-pool-simple-perl/current/Makefile.PL
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/Makefile.PL?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/Makefile.PL (added)
+++ branches/upstream/libthread-pool-simple-perl/current/Makefile.PL Mon Aug 3 07:27:18 2009
@@ -1,0 +1,10 @@
+use 5.008;
+use ExtUtils::MakeMaker;
+
+WriteMakefile(
+ 'NAME' => 'Thread::Pool::Simple',
+ 'VERSION_FROM' => 'Simple.pm',
+ 'PREREQ_PM' => {},
+ ($] >= 5.005 ?
+ (AUTHOR => 'Jianyuan Wu <jwu at cpan.org>') : ()),
+);
Added: branches/upstream/libthread-pool-simple-perl/current/README
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/README?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/README (added)
+++ branches/upstream/libthread-pool-simple-perl/current/README Mon Aug 3 07:27:18 2009
@@ -1,0 +1,26 @@
+Thread/Pool/Simple
+==================
+
+Thread::Pool::Simple provides a simple thread-pool implementation
+without external dependencies outside core modules.
+
+INSTALLATION
+
+To install this module type the following:
+
+ perl Makefile.PL
+ make
+ make test
+ make install
+
+DEPENDENCIES
+
+This module requires Perl 5.8 (or above) with multi-thread support.
+
+COPYRIGHT AND LICENCE
+
+Copyright (C) 2007 Jianyuan Wu
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself.
+
Added: branches/upstream/libthread-pool-simple-perl/current/Simple.pm
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/Simple.pm?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/Simple.pm (added)
+++ branches/upstream/libthread-pool-simple-perl/current/Simple.pm Mon Aug 3 07:27:18 2009
@@ -1,0 +1,381 @@
+package Thread::Pool::Simple;
+use 5.008;
+use strict;
+use threads;
+use threads::shared;
+use warnings;
+use Carp;
+use Storable qw(nfreeze thaw);
+use Thread::Queue;
+use Thread::Semaphore;
+
+our $VERSION = '0.23';
+
+sub new {
+ my ($class, %arg) = @_;
+ my %config : shared
+ = (min => ($arg{min} || 1),
+ max => ($arg{max} || 10),
+ load => ($arg{load} || 20),
+ lifespan => ($arg{lifespan} || 50000),
+ passid => ($arg{passid} || 0),
+ );
+ my %handler;
+ for (qw(init pre do post)) {
+ next unless exists $arg{$_} && ref $arg{$_} eq 'ARRAY';
+ $handler{$_} = $arg{$_}
+ }
+ my $self = &share({});
+ $self->{config} = \%config;
+ $self->{pending} = Thread::Queue->new();
+ $self->{submitted} = &share({});
+ $self->{done} = &share({});
+ my $state : shared = 0;
+ $self->{state} = \$state;
+ my $worker : shared = 0;
+ $self->{worker} = \$worker;
+ $self->{shutdown_lock} = Thread::Semaphore->new();
+ bless $self, $class;
+ $self->{shutdown_lock}->down();
+ async {
+ $self->_run(\%handler);
+ $self->{shutdown_lock}->up();
+ }->detach();
+ return $self;
+}
+
+sub _run : locked method {
+ my ($self, $handler) = @_;
+ while (1) {
+ last if $self->terminating();
+ if ($self->busy()) {
+ $self->_increase($handler);
+ }
+ else {
+ sleep 1;
+ }
+ threads->yield();
+ }
+ my $worker = $self->{worker};
+ {
+ lock $$worker;
+ cond_wait $$worker while $$worker;
+ }
+}
+
+sub _increase : locked method {
+ my ($self, $handler) = @_;
+ my $max = do { lock %{$self->{config}}; $self->{config}{max} };
+ my $worker = do { lock ${$self->{worker}}; ${$self->{worker}} };
+ return unless $worker < $max;
+
+ $self->_handle_func($handler->{init});
+
+ eval {
+ threads->create(\&_handle, $self, $handler)->detach();
+ lock ${$self->{worker}};
+ ++${$self->{worker}};
+ };
+ carp "fail to add new thread: $@" if $@;
+}
+
+sub _handle {
+ my ($self, $handler) = @_;
+
+ $self->_handle_func($handler->{pre});
+
+ my $do = $handler->{do};
+ my $func = defined $do ? shift @$do : undef;
+ my ($lifespan, $passid)
+ = do {
+ lock %{$self->{config}};
+ @{$self->{config}}{qw(lifespan passid)}
+ };
+ eval {
+ while (!$self->terminating()
+ && $lifespan--
+ ) {
+ my ($id, $job) = unpack 'Na*', $self->{pending}->dequeue();
+ $self->_state(-2) && last unless $id;
+ $self->_drop($id) && next unless $self->job_exists($id);
+
+ my $arg = thaw($job);
+ my @ret;
+ if ($id % 3 == 2) { # void context
+ if (defined $func) {
+ eval {
+ no strict 'refs';
+ scalar $func->($passid ? ($id, @$do, @$arg) : (@$do, @$arg));
+ };
+ $self->_drop($id);
+ next;
+ }
+ }
+ elsif ($id % 3 == 1) { # list context
+ if (defined $func) {
+ @ret = eval {
+ no strict 'refs';
+ $func->($passid ? ($id, @$do, @$arg) : (@$do, @$arg));
+ };
+ }
+ }
+ else { # scalar context
+ if (defined $func) {
+ $ret[0] = eval {
+ no strict 'refs';
+ $func->($passid ? ($id, @$do, @$arg) : (@$do, @$arg));
+ };
+ }
+ }
+
+ $self->_drop($id) && next unless $self->job_exists($id);
+
+ if ($@) {
+ @ret = ('e', $@);
+ }
+ else {
+ unshift @ret, 'n';
+ }
+ my $ret = nfreeze(\@ret);
+ {
+ lock %{$self->{done}};
+ $self->{done}{$id} = $ret;
+ cond_signal %{$self->{done}};
+ }
+ }
+ continue {
+ threads->yield();
+ }
+ };
+ carp "job handling error: $@" if $@;
+
+ $self->_handle_func($handler->{post});
+
+ my $worker = $self->{worker};
+ lock $$worker;
+ --$$worker;
+ cond_signal $$worker;
+}
+
+sub _handle_func {
+ my ($self, $handler) = @_;
+ return unless defined $handler;
+ my @arg = @$handler;
+ my $func = shift @arg;
+ if (defined $func) {
+ eval {
+ no strict 'refs';
+ $func->(@arg);
+ };
+ carp $@ if $@;
+ }
+}
+
+sub _state : locked method {
+ my $self = shift;
+ my $state = $self->{state};
+ lock $$state;
+ return $$state unless @_;
+ my $s = shift;
+ $$state = $s;
+ return $s;
+}
+
+sub join : locked method {
+ my ($self, $nb) = @_;
+ $self->_state(-1);
+ my $max = do { lock %{$self->{config}}; $self->{config}{max} };
+ $self->{pending}->enqueue((pack('Na*', 0, '')) x $max);
+ return if $nb;
+ $self->{shutdown_lock}->down();
+ sleep 1; # cool down, otherwise may coredump while run tests
+}
+
+sub detach : locked method {
+ my ($self) = @_;
+ $self->join(1);
+}
+
+sub busy : locked method {
+ my ($self) = @_;
+ my $worker = do { lock ${$self->{worker}}; ${$self->{worker}} };
+ my ($min, $max, $load) = do { lock %{$self->{config}}; @{$self->{config}}{'min', 'max', 'load'} };
+ my $pending = $self->{pending}->pending();
+
+ # do not count the fake job added after join()
+ $pending -= $max if $self->_state() == -1;
+ return $worker < $min || $pending > $worker * $load;
+}
+
+sub terminating : locked method {
+ my ($self) = @_;
+ my $state = $self->_state();
+ my $job = do { lock %{$self->{submitted}}; keys %{$self->{submitted}} };
+ return 1 if $state == -1 && !$job;
+ return 1 if $state == -2;
+ return;
+}
+
+sub config : locked method {
+ my $self = shift;
+ my $config = $self->{config};
+ lock %$config;
+ return %$config unless @_;
+ %$config = (%$config, @_);
+ return %$config;
+}
+
+sub add : locked method {
+ my $self = shift;
+ my $context = wantarray;
+ $context = 2 unless defined $context; # void context = 2
+ my $arg = nfreeze(\@_);
+ my $id;
+ while (1) {
+ $id = int(rand(time()));
+ next unless $id;
+ ++$id unless $context == $id % 3;
+ ++$id unless $context == $id % 3;
+ {
+ lock %{$self->{submitted}};
+ next if $self->job_exists($id);
+ {
+ # this is necessary as some cancelled jobs may slip in
+ lock %{$self->{done}};
+ delete $self->{done}{$id};
+ }
+ $self->{pending}->enqueue(pack('Na*', $id, $arg));
+ $self->{submitted}{$id} = 1;
+ }
+ last;
+ }
+ return $id;
+}
+
+sub job_exists : locked method {
+ my ($self, $id) = @_;
+ lock %{$self->{submitted}};
+ return $self->{submitted}{$id};
+}
+
+sub job_done : locked method {
+ my ($self, $id) = @_;
+ lock %{$self->{done}};
+ return $self->{done}{$id};
+}
+
+sub _drop : locked method {
+ my ($self, $id) = @_;
+ lock %{$self->{submitted}};
+ delete $self->{submitted}{$id};
+}
+
+sub _remove : locked method {
+ my ($self, $id, $nb) = @_;
+ return if $id % 3 == 2;
+ return unless $self->job_exists($id);
+ my ($exist, $ret);
+ {
+ lock %{$self->{done}};
+ if (!$nb) {
+ cond_wait %{$self->{done}} until exists $self->{done}{$id};
+ cond_signal %{$self->{done}} if 1 < keys %{$self->{done}};
+ }
+ $exist = ($ret) = delete $self->{done}{$id};
+ }
+ $self->_drop($id) if $exist;
+ return $exist unless defined $ret;
+ $ret = thaw($ret);
+ my $err = shift @$ret;
+ croak $ret->[0] if $err eq 'e';
+ return ($exist, @$ret) if $id % 3 == 1;
+ return ($exist, $ret->[0]);
+}
+
+sub remove : locked method {
+ my ($self, $id) = @_;
+ my ($exist, @ret) = $self->_remove($id);
+ return @ret;
+}
+
+
+sub remove_nb : locked method {
+ my ($self, $id) = @_;
+ return $self->_remove($id, 1);
+}
+
+sub cancel : locked method {
+ my ($self, $id) = @_;
+ my ($exist) = eval { $self->remove_nb($id) };
+ if (!$exist) {
+ lock %{$self->{submitted}};
+ $self->{submitted}{$id} = 0;
+ }
+}
+
+sub cancel_all : locked method {
+ my ($self) = @_;
+ my @id = do { lock %{$self->{submitted}}; keys %{$self->{submitted}} };
+ for (@id) {
+ $self->cancel($_);
+ }
+}
+
+1;
+__END__
+
+=head1 NAME
+
+Thread::Pool::Simple - A simple thread-pool implementation
+
+=head1 SYNOPSIS
+
+ use Thread::Pool::Simple;
+
+ my $pool = Thread::Pool::Simple->new(
+ min => 3, # at least 3 workers
+ max => 5, # at most 5 workers
+ load => 10, # increase worker if on average every worker has 10 jobs waiting
+ init => [\&init_handle, $arg1, $arg2, ...], # run before creating worker thread
+ pre => [\&pre_handle, $arg1, $arg2, ...], # run after creating worker thread
+ do => [\&do_handle, $arg1, $arg2, ...], # job handler for each worker
+ post => [\&post_handle, $arg1, $arg2, ...], # run before worker threads end
+ passid => 1, # whether to pass the job id as the first argument to the &do_handle
+ lifespan => 10000, # total jobs handled by each worker
+ );
+
+ my ($id1) = $pool->add(@arg1); # call in list context
+ my $id2 = $pool->add(@arg2); # call in scalar context
+ $pool->add(@arg3); # call in void context
+
+ my @ret = $pool->remove($id1); # get result (block)
+ my $ret = $pool->remove_nb($id2); # get result (no block)
+
+ $pool->cancel($id1); # cancel the job
+ $pool->cancel_all(); # cancel all jobs
+
+ $pool->join(); # wait till all jobs are done
+ $pool->detach(); # don't wait.
+
+=head1 DESCRIPTION
+
+C<Thread::Pool::Simple> provides a simple thread-pool implementation
+without external dependencies outside core modules.
+
+Jobs can be submitted to and handled by multi-threaded `workers'
+managed by the pool.
+
+=head1 AUTHOR
+
+Jianyuan Wu, E<lt>jwu at cpan.orgE<gt>
+
+=head1 COPYRIGHT AND LICENSE
+
+Copyright 2007 by Jianyuan Wu
+
+This library is free software; you can redistribute it and/or modify
+it under the same terms as Perl itself.
+
+=cut
+
+
Added: branches/upstream/libthread-pool-simple-perl/current/t/1.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/t/1.t?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/t/1.t (added)
+++ branches/upstream/libthread-pool-simple-perl/current/t/1.t Mon Aug 3 07:27:18 2009
@@ -1,0 +1,27 @@
+BEGIN {
+ use Config;
+ if (!$Config{useithreads}) {
+ print ("1..0 # Skip: Perl not compiled with 'useithreads'\n");
+ exit 0;
+ }
+}
+use strict;
+use threads;
+use warnings;
+use Test::More qw(no_plan);
+BEGIN { use_ok('Thread::Pool::Simple') };
+
+my $pool = Thread::Pool::Simple->new(min => 5,
+ load => 5,
+ max => 10,
+ do => [sub { return @_; }],
+ );
+
+
+for (1..1000) {
+ my @arg = (1, 2, 3);
+ my ($id, @ret);
+ $pool->add(@arg);
+}
+
+$pool->join();
Added: branches/upstream/libthread-pool-simple-perl/current/t/2.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/t/2.t?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/t/2.t (added)
+++ branches/upstream/libthread-pool-simple-perl/current/t/2.t Mon Aug 3 07:27:18 2009
@@ -1,0 +1,40 @@
+BEGIN {
+ use Config;
+ if (!$Config{useithreads}) {
+ print ("1..0 # Skip: Perl not compiled with 'useithreads'\n");
+ exit 0;
+ }
+}
+use strict;
+use threads;
+use warnings;
+use Test::More qw(no_plan);
+BEGIN { use_ok('Thread::Pool::Simple') };
+
+my $pool = Thread::Pool::Simple->new(min => 5,
+ load => 5,
+ max => 10,
+ do => [sub { return @_; }],
+ );
+
+
+for (1..300) {
+ my @arg = (1, 2, 3);
+ my ($id, @ret);
+ $pool->add(@arg);
+
+ ($id) = $pool->add(@arg);
+ @ret = $pool->remove($id);
+ ok("@ret" eq "@arg");
+
+ ($id) = $pool->add(@arg);
+ $pool->cancel($id);
+ @ret = $pool->remove($id);
+ ok(!@ret);
+
+ $id = $pool->add(@arg);
+ @ret = $pool->remove($id);
+ ok($ret[0] == 3);
+}
+
+$pool->join();
Added: branches/upstream/libthread-pool-simple-perl/current/t/3.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/t/3.t?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/t/3.t (added)
+++ branches/upstream/libthread-pool-simple-perl/current/t/3.t Mon Aug 3 07:27:18 2009
@@ -1,0 +1,55 @@
+BEGIN {
+ use Config;
+ if (!$Config{useithreads}) {
+ print ("1..0 # Skip: Perl not compiled with 'useithreads'\n");
+ exit 0;
+ }
+}
+use strict;
+use threads;
+use warnings;
+use Test::More qw(no_plan);
+BEGIN { use_ok('Thread::Pool::Simple') };
+
+my $pool = Thread::Pool::Simple->new(min => 5,
+ load => 5,
+ max => 20,
+ do => [sub { return @_; }],
+ );
+
+
+for (1..300) {
+ my @arg = (1, 2, 3);
+ my ($id, @ret);
+ $pool->add(@arg);
+
+ ($id) = $pool->add(@arg);
+ ($id) = $pool->add(@arg);
+ $pool->cancel($id);
+
+ $id = $pool->add(@arg);
+}
+
+$pool->cancel_all();
+
+for (1..300) {
+ my @arg = (5, 6, 7);
+ my ($id, @ret);
+ $pool->add(@arg);
+
+ ($id) = $pool->add(@arg);
+ @ret = $pool->remove($id);
+
+ ok("@ret" eq "@arg");
+
+ ($id) = $pool->add(@arg);
+ $pool->cancel($id);
+ @ret = $pool->remove($id);
+ ok(!@ret);
+
+ $id = $pool->add(@arg);
+ @ret = $pool->remove($id);
+ ok($ret[0] == 3);
+}
+
+$pool->join();
Added: branches/upstream/libthread-pool-simple-perl/current/t/4.t
URL: http://svn.debian.org/wsvn/pkg-perl/branches/upstream/libthread-pool-simple-perl/current/t/4.t?rev=41143&op=file
==============================================================================
--- branches/upstream/libthread-pool-simple-perl/current/t/4.t (added)
+++ branches/upstream/libthread-pool-simple-perl/current/t/4.t Mon Aug 3 07:27:18 2009
@@ -1,0 +1,27 @@
+BEGIN {
+ use Config;
+ if (!$Config{useithreads}) {
+ print ("1..0 # Skip: Perl not compiled with 'useithreads'\n");
+ exit 0;
+ }
+}
+use strict;
+use threads;
+use warnings;
+use Test::More qw(no_plan);
+BEGIN { use_ok('Thread::Pool::Simple') };
+
+my $pool = Thread::Pool::Simple->new(min => 5,
+ load => 5,
+ max => 10,
+ do => [sub { return 444/($_[0] - 1); }],
+ );
+
+
+my @arg = (1, 2, 3);
+my ($id, @ret);
+$id = $pool->add(@arg);
+eval { @ret = $pool->remove($id) };
+ok ($@ =~ /Illegal division by zero/);
+
+$pool->join();
More information about the Pkg-perl-cvs-commits
mailing list