#!/usr/bin/env perl
use strict;
use warnings;
use Gearman::XS::Worker;
use Gearman::XS qw(:constants);
use Imager;
my $worker = Gearman::XS::Worker->new;
$worker->add_server( 'localhost', 4730 );
$worker->add_function( "convert_to_jpeg", 0, \&convert_to_jpeg, {} );
$worker->add_function( "convert_to_gif", 0, \&convert_to_gif, {} );
while (1) {
my $ret = $worker->work();
if ( $ret != GEARMAN_SUCCESS ) {
printf( STDERR "%s\n", $worker->error() );
}
}
sub convert_to_jpeg {
my ($job) = @_;
return _convert( $job->workload, 'jpeg' );
}
sub convert_to_gif {
my ($job) = @_;
return _convert( $job->workload, 'gif' );
}
sub _convert {
my ( $in_data, $format ) = @_;
my $img = Imager->new();
my $out_data;
$img->read( data => $in_data ) or die;
$img->write( data => \$out_data, type => $format ) or die;
return $out_data;
}
The problem in this design is to have two functions in the same script. If you depend on running convert_to_jpeg and convert_to_gif at the same time you could split up the script:
#!/usr/bin/env perl
use strict;
use warnings;
use Gearman::XS::Worker;
use Gearman::XS qw(:constants);
use Imager;
my $worker = Gearman::XS::Worker->new;
$worker->add_server( 'localhost', 4730 );
$worker->add_function( "convert_to_jpeg", 0, \&convert_to_jpeg, {} );
while (1) {
my $ret = $worker->work();
if ( $ret != GEARMAN_SUCCESS ) {
printf( STDERR "%s\n", $worker->error() );
}
}
sub convert_to_jpeg {
my ($job) = @_;
return _convert( $job->workload, 'jpeg' );
}
sub _convert {
my ( $in_data, $format ) = @_;
my $img = Imager->new();
my $out_data;
$img->read( data => $in_data ) or die;
$img->write( data => \$out_data, type => $format ) or die;
return $out_data;
}
#!/usr/bin/env perl
use strict;
use warnings;
use Gearman::XS::Worker;
use Gearman::XS qw(:constants);
use Imager;
my $worker = Gearman::XS::Worker->new;
$worker->add_server( 'localhost', 4730 );
$worker->add_function( "convert_to_gif", 0, \&convert_to_gif, {} );
while (1) {
my $ret = $worker->work();
if ( $ret != GEARMAN_SUCCESS ) {
printf( STDERR "%s\n", $worker->error() );
}
}
sub convert_to_gif {
my ($job) = @_;
return _convert( $job->workload, 'gif' );
}
sub _convert {
my ( $in_data, $format ) = @_;
my $img = Imager->new();
my $out_data;
$img->read( data => $in_data ) or die;
$img->write( data => \$out_data, type => $format ) or die;
return $out_data;
}
The next problem is about memory: Having many scripts running in separate processes using the same (sometimes heavyweight) library is not very smart. And especially in this example where images are processed those separate processes may consume a lot of ram after a long runtime. So it would be cool to share the memory consumed by libraries as well as freeing up memory from time to time. This is where Gearman::Driver comes in handy. It's not only sharing memory by using the advantages of copy-on-write but it's also having other cool features like restarting worker processes after some idle time to free up memory. Besides that it got some telnet interface to change amount of processes on runtime as well as getting some statistics et cetera.
So let's re-implement the example above:
package GDExamples::Convert;
use base qw(Gearman::Driver::Worker);
use Moose;
use Imager;
sub process_name {
my ( $self, $orig, $job_name ) = @_;
return "$orig ($job_name)";
}
sub convert_to_jpeg : Job : MinProcesses(0) : MaxProcesses(5) {
my ( $self, $job, $workload ) = @_;
return _convert( $workload, 'jpeg' );
}
sub convert_to_gif : Job : MinProcesses(0) : MaxProcesses(5) {
my ( $self, $job, $workload ) = @_;
return _convert( $workload, 'gif' );
}
sub _convert {
my ( $in_data, $format ) = @_;
my $img = Imager->new();
my $out_data;
$img->read( data => $in_data ) or die;
$img->write( data => \$out_data, type => $format ) or die;
return $out_data;
}
To run that worker you can use the script called gearman_driver.pl which is part of the Gearman::Driver distribution.
usage: gearman_driver.pl [long options...]
--loglevel Log level (default: INFO)
--lib Example: --lib ./lib --lib /custom/lib
--max_idle_time How many seconds a worker may be idle before its killed
--server Gearman host[:port][,host[:port]]
--logfile Path to logfile (default: gearman_driver.log)
--console_port Port of management console (default: 47300)
--interval Interval in seconds (see Gearman::Driver::Observer)
--loglayout Log message layout (default: [%d] %p %m%n)
--namespaces Example: --namespaces My::Workers --namespaces My::OtherWorkers
Run the example: gearman_driver.pl --namespaces GDExamples &
You can easily test the workers using the gearman client:
- gearman -f GDExamples::Convert::convert_to_jpeg < cpan.png > cpan.jpg
- gearman -f GDExamples::Convert::convert_to_gif < cpan.png > cpan.gif
Now let's see what we did. First of all you may have noticed the attributes on both methods:
- sub convert_to_jpeg : Job : MinProcesses(0) : MaxProcesses(5) {}
- sub convert_to_jpeg : Job : MinProcesses(0) : MaxProcesses(5) {}
- sub convert_to_jpeg : Job : MinProcesses(0) : MaxProcesses(5) {}
Basically you only need to add the attribute Job to the methods which should be registered with gearmand. All other attributes are optional. Your method will be registered with the full package name:
- GDExamples::Convert::convert_to_jpeg
- GDExamples::Convert::convert_to_gif
sub prefix {
return ref(shift) . '::';
}
There are other predefined methods which can be overridden, like begin or end. Both are called whenever a job function is called, begin before the job method is called and end afterwards (even if the job method died).
The other two attributes tell Gearman::Driver how many processes should be forked and work on that function/job. If you set MinProcesses to 0 no process is preforked at all. It's being forked on demand: Gearman::Driver monitors the telnet interface of gearmand for queued jobs to see if it's necessary to fork a new process. If there are many jobs in the queue it will fork even more processes until MaxProcesses is reached. After all jobs are done it will kill all processes again. The default behaviour is to do that immediately, but can be changed by setting max_idle_time to 300 (seconds) for example.
Gearman::Driver itself got a telnet interface as well. There's also a client coming with the distribution having readline support. Another feature is to connect to multiple servers at once to send the same commands to all servers. Really handy in a big environment.
gearman_driver_console.pl --server localhost:47300 console> status localhost:47300> GDExamples::Convert::convert_to_gif 0 5 0 2010-01-30T19:45:43 1970-01-01T00:00:00 localhost:47300> GDExamples::Convert::convert_to_jpeg 0 5 0 2010-01-30T19:45:53 1970-01-01T00:00:00 localhost:47300> . console> set_processes GDExamples::Convert::convert_to_gif 2 10 localhost:47300> OK localhost:47300> . console> status localhost:47300> GDExamples::Convert::convert_to_gif 2 10 2 2010-01-30T19:45:43 1970-01-01T00:00:00 localhost:47300> GDExamples::Convert::convert_to_jpeg 0 5 0 2010-01-30T19:45:53 1970-01-01T00:00:00 localhost:47300> .
I've just shipped v0.01017 to the CPAN containing the examples in this blog post. If you can't wait for it, you can fetch it from GitHub. Any comments/suggestions/rants will be highly appreciated. Thanks.
