January 2010 Archives

Gearman::Driver

Gearman was originally developed by Danga. It is a very nice framework for distributing (sometimes long-running) tasks across multiple servers. For a more detailed description see gearman.org, where you will also find a rewrite of it in C. First, let's look at a worker without Gearman::Driver, using a plain Gearman::XS setup:
#!/usr/bin/env perl
use strict;
use warnings;
use Gearman::XS::Worker;
use Gearman::XS qw(:constants);
use Imager;

my $worker = Gearman::XS::Worker->new;
$worker->add_server( 'localhost', 4730 );

$worker->add_function( "convert_to_jpeg", 0, \&convert_to_jpeg, {} );
$worker->add_function( "convert_to_gif",  0, \&convert_to_gif,  {} );

while (1) {
    my $ret = $worker->work();
    if ( $ret != GEARMAN_SUCCESS ) {
        printf( STDERR "%s\n", $worker->error() );
    }
}

sub convert_to_jpeg {
    my ($job) = @_;
    return _convert( $job->workload, 'jpeg' );
}

sub convert_to_gif {
    my ($job) = @_;
    return _convert( $job->workload, 'gif' );
}

sub _convert {
    my ( $in_data, $format ) = @_;
    my $img = Imager->new();
    my $out_data;
    $img->read( data => $in_data ) or die $img->errstr;
    $img->write( data => \$out_data, type => $format ) or die $img->errstr;
    return $out_data;
}
The problem with this design is that both functions live in the same script: a worker process handles one job at a time, so convert_to_jpeg and convert_to_gif jobs cannot run in parallel. If you depend on running both at the same time, you could split the script in two:
#!/usr/bin/env perl
use strict;
use warnings;
use Gearman::XS::Worker;
use Gearman::XS qw(:constants);
use Imager;

my $worker = Gearman::XS::Worker->new;
$worker->add_server( 'localhost', 4730 );

$worker->add_function( "convert_to_jpeg", 0, \&convert_to_jpeg, {} );

while (1) {
    my $ret = $worker->work();
    if ( $ret != GEARMAN_SUCCESS ) {
        printf( STDERR "%s\n", $worker->error() );
    }
}

sub convert_to_jpeg {
    my ($job) = @_;
    return _convert( $job->workload, 'jpeg' );
}

sub _convert {
    my ( $in_data, $format ) = @_;
    my $img = Imager->new();
    my $out_data;
    $img->read( data => $in_data ) or die $img->errstr;
    $img->write( data => \$out_data, type => $format ) or die $img->errstr;
    return $out_data;
}

#!/usr/bin/env perl
use strict;
use warnings;
use Gearman::XS::Worker;
use Gearman::XS qw(:constants);
use Imager;

my $worker = Gearman::XS::Worker->new;
$worker->add_server( 'localhost', 4730 );

$worker->add_function( "convert_to_gif",  0, \&convert_to_gif,  {} );

while (1) {
    my $ret = $worker->work();
    if ( $ret != GEARMAN_SUCCESS ) {
        printf( STDERR "%s\n", $worker->error() );
    }
}

sub convert_to_gif {
    my ($job) = @_;
    return _convert( $job->workload, 'gif' );
}

sub _convert {
    my ( $in_data, $format ) = @_;
    my $img = Imager->new();
    my $out_data;
    $img->read( data => $in_data ) or die $img->errstr;
    $img->write( data => \$out_data, type => $format ) or die $img->errstr;
    return $out_data;
}
The next problem is memory: running many scripts in separate processes that all load the same (sometimes heavyweight) library is wasteful. Especially in this example, where images are processed, those separate processes may consume a lot of RAM after running for a long time. So it would be nice to share the memory consumed by libraries and to free memory from time to time. This is where Gearman::Driver comes in handy. It not only shares memory by taking advantage of copy-on-write, it also has other useful features, such as restarting worker processes after some idle time to free up memory. It also has a telnet interface for changing the number of processes at runtime and for getting statistics. So let's re-implement the example above:
package GDExamples::Convert;

use base qw(Gearman::Driver::Worker);
use Moose;
use Imager;

sub process_name {
    my ( $self, $orig, $job_name ) = @_;
    return "$orig ($job_name)";
}

sub convert_to_jpeg : Job : MinProcesses(0) : MaxProcesses(5) {
    my ( $self, $job, $workload ) = @_;
    return _convert( $workload, 'jpeg' );
}

sub convert_to_gif : Job : MinProcesses(0) : MaxProcesses(5) {
    my ( $self, $job, $workload ) = @_;
    return _convert( $workload, 'gif' );
}

sub _convert {
    my ( $in_data, $format ) = @_;
    my $img = Imager->new();
    my $out_data;
    $img->read( data => $in_data ) or die $img->errstr;
    $img->write( data => \$out_data, type => $format ) or die $img->errstr;
    return $out_data;
}
To run that worker you can use the script called gearman_driver.pl which is part of the Gearman::Driver distribution.
usage: gearman_driver.pl [long options...]
        --loglevel           Log level (default: INFO)
        --lib                Example: --lib ./lib --lib /custom/lib
        --max_idle_time      How many seconds a worker may be idle before its killed
        --server             Gearman host[:port][,host[:port]]
        --logfile            Path to logfile (default: gearman_driver.log)
        --console_port       Port of management console (default: 47300)
        --interval           Interval in seconds (see Gearman::Driver::Observer)
        --loglayout          Log message layout (default: [%d] %p %m%n)
        --namespaces         Example: --namespaces My::Workers --namespaces My::OtherWorkers

Run the example: gearman_driver.pl --namespaces GDExamples &

You can easily test the workers using the gearman client:

  • gearman -f GDExamples::Convert::convert_to_jpeg < cpan.png > cpan.jpg
  • gearman -f GDExamples::Convert::convert_to_gif < cpan.png > cpan.gif

Now let's see what we did. First of all you may have noticed the attributes on both methods:

  • sub convert_to_jpeg : Job : MinProcesses(0) : MaxProcesses(5) {}
  • sub convert_to_gif : Job : MinProcesses(0) : MaxProcesses(5) {}

Basically you only need to add the attribute Job to the methods which should be registered with gearmand. All other attributes are optional. Your method will be registered with the full package name:

  • GDExamples::Convert::convert_to_jpeg
  • GDExamples::Convert::convert_to_gif

If you don't like that behaviour, you can override the prefix method. By default it is implemented this way:
sub prefix {
    return ref(shift) . '::';
}
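
As a rough illustration of how overriding prefix changes the registered name, here is a minimal sketch that does not need Gearman::Driver installed. Sketch::BaseWorker is a stand-in written for this example (it only copies the default prefix shown above), and job_name, GDExamples::ShortNames and the 'img_' prefix are hypothetical names, not part of the distribution:

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Stand-in for Gearman::Driver::Worker (assumption): only the default
# prefix logic quoted in the post is reproduced here.
package Sketch::BaseWorker;

sub new {
    my ($class) = @_;
    return bless {}, $class;
}

sub prefix { return ref(shift) . '::' }

# Hypothetical helper: builds a job name from the prefix and method name.
sub job_name {
    my ( $self, $method ) = @_;
    return $self->prefix . $method;
}

# Keeps the default prefix, i.e. the full package name.
package GDExamples::Convert;
use parent -norequire, 'Sketch::BaseWorker';

# Overrides prefix to get shorter job names.
package GDExamples::ShortNames;
use parent -norequire, 'Sketch::BaseWorker';

sub prefix { return 'img_' }

package main;

for my $class (qw(GDExamples::Convert GDExamples::ShortNames)) {
    my $worker = $class->new;
    my $name   = $worker->job_name('convert_to_jpeg');
    print "$name\n";
}
```

The first class ends up with GDExamples::Convert::convert_to_jpeg, the second with img_convert_to_jpeg. In a real worker you would only add the prefix method to your Gearman::Driver::Worker subclass.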

There are other predefined methods that can be overridden, such as begin and end. Both run for every job: begin is called before the job method and end afterwards, even if the job method died.
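
To make the calling order concrete, here is a small stand-alone sketch of that behaviour. It is not Gearman::Driver's actual dispatch code: Sketch::Worker, run_job, ok_job and bad_job are hypothetical, and for brevity the hooks receive only the job name rather than the arguments a real worker method would get:

```perl
#!/usr/bin/env perl
use strict;
use warnings;

package Sketch::Worker;

sub new {
    my ($class) = @_;
    return bless { log => [] }, $class;
}

# Hooks playing the role of begin/end; here they just record calls.
sub begin {
    my ( $self, $job_name ) = @_;
    push @{ $self->{log} }, "begin $job_name";
    return;
}

sub end {
    my ( $self, $job_name ) = @_;
    push @{ $self->{log} }, "end $job_name";
    return;
}

# Two example jobs: one succeeds, one dies.
sub ok_job  { return 'done' }
sub bad_job { die "conversion failed\n" }

# Hypothetical dispatcher: begin, then the job inside eval, then end,
# regardless of whether the job died.
sub run_job {
    my ( $self, $job_name ) = @_;
    $self->begin($job_name);
    my $result = eval { $self->$job_name() };
    my $error  = $@;
    $self->end($job_name);
    return ( $result, $error );
}

package main;

my $worker = Sketch::Worker->new;
$worker->run_job('ok_job');
$worker->run_job('bad_job');
print "$_\n" for @{ $worker->{log} };
```

The log contains a begin and an end entry for both jobs, including the one that died, which is what makes end a reasonable place for cleanup.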

The other two attributes tell Gearman::Driver how many processes should be forked to work on that function/job. If you set MinProcesses to 0, no process is preforked at all; processes are forked on demand. Gearman::Driver monitors the telnet interface of gearmand for queued jobs to see whether it needs to fork a new process. If there are many jobs in the queue, it forks more processes until MaxProcesses is reached. After all jobs are done it kills those processes again. By default it does so immediately, but this can be changed, for example by setting max_idle_time to 300 (seconds).
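
The idea of watching the queue and scaling between MinProcesses and MaxProcesses can be sketched roughly as follows. This is an illustration only, not the logic Gearman::Driver actually uses: parse_status and wanted_processes are hypothetical helpers, the "one process per queued job" policy is an assumption, and the sample text is made up in the tab-separated layout gearmand uses for its status command (function, queued, running, available workers, terminated by a single dot):

```perl
#!/usr/bin/env perl
use strict;
use warnings;

# Parse gearmand "status" output: one
# "function<TAB>queued<TAB>running<TAB>workers" line per function,
# terminated by a line containing only a dot.
sub parse_status {
    my ($text) = @_;
    my %status;
    for my $line ( split /\n/, $text ) {
        last if $line eq '.';
        my ( $name, $queued, $running, $workers ) = split /\t/, $line;
        $status{$name} = {
            queued  => $queued,
            running => $running,
            workers => $workers,
        };
    }
    return \%status;
}

# Assumed policy: one process per queued job, clamped to [min, max].
sub wanted_processes {
    my ( $queued, $min, $max ) = @_;
    my $wanted = $queued;
    $wanted = $min if $wanted < $min;
    $wanted = $max if $wanted > $max;
    return $wanted;
}

# Made-up sample input, not captured from a real server.
my $sample = join '',
    "GDExamples::Convert::convert_to_gif\t0\t0\t0\n",
    "GDExamples::Convert::convert_to_jpeg\t12\t0\t0\n",
    ".\n";

my $status = parse_status($sample);
for my $name ( sort keys %{$status} ) {
    my $count = wanted_processes( $status->{$name}{queued}, 0, 5 );
    print "$name: $count\n";
}
```

With MinProcesses(0) and MaxProcesses(5), an empty queue asks for no processes at all, while twelve queued jobs are capped at five.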

Gearman::Driver itself has a telnet interface as well. The distribution also ships a client with readline support. The client can connect to multiple servers at once and send the same commands to all of them, which is really handy in a big environment.

gearman_driver_console.pl --server localhost:47300
console> status
localhost:47300> GDExamples::Convert::convert_to_gif   0  5  0  2010-01-30T19:45:43  1970-01-01T00:00:00   
localhost:47300> GDExamples::Convert::convert_to_jpeg  0  5  0  2010-01-30T19:45:53  1970-01-01T00:00:00   
localhost:47300> .
console> set_processes GDExamples::Convert::convert_to_gif 2 10
localhost:47300> OK
localhost:47300> .
console> status
localhost:47300> GDExamples::Convert::convert_to_gif   2  10  2  2010-01-30T19:45:43  1970-01-01T00:00:00   
localhost:47300> GDExamples::Convert::convert_to_jpeg  0   5  0  2010-01-30T19:45:53  1970-01-01T00:00:00   
localhost:47300> .

I've just shipped v0.01017 to CPAN, containing the examples in this blog post. If you can't wait for it, you can fetch it from GitHub. Any comments, suggestions, or rants are highly appreciated. Thanks.
