Distributed jobs with Gearman

The first time I heard of Gearman was at Stack Overflow where a question was asked on how to stop workers nicely. On which an excellent joke was made by Cletus: “See now I was going to reply “Bitte halten Sie!” :-)”. Since then Gearman was stuck in my mind. So far I haven’t had the chance to use it for anything, but for my current project Maximus I need to be able to distribute jobs for several tasks such as uploading files, fetching from SCM repositories and so on. And Gearman is perfect for that kind of job.

The Gearman server was originally implemented in Perl but has now moved to C. I’m not sure if they’re still working on the Perl implementation, but the most recent release was in January this year.

In this post I’ll demonstrate how to setup a simple client and worker. The client sends tasks to the Gearman server and the worker registers itself with the server. When the server receives a task it checks if there’s a worker available and delegates the task to the worker. Once finished with the job the worker notifies the server and in turn the server notifies the client that the requested job is finished. It’s also possible not to wait for the task to be done. It depends on your specific situation and the job that needs to be executed if you want this or not.

Setup

First, install the required modules. Note that on Windows Danga::Socket isn’t stable so Gearman might occasionally fail a job. So far I haven’t had any issues with Linux. We’ll also be installing some additional modules that we’re going to use for the client and worker.

$ cpanm Gearman::Server
$ cpanm http://search.cpan.org/CPAN/authors/id/D/DO/DORMANDO/Gearman-1.11.tar.gz
$ cpanm WWW::Mechanize
$ cpanm Archive::Zip
$ cpanm JSON::Any

MyApp::Functions

Now lets make a module that contains some functionality that’ll be used by the worker. We’re making this modular so it’s easier to test these components. Both the client and the worker scripts should be just that, scripts. The functions in this module can fetch the thumbnail links from an Altavista image search page. The amount of pages to scan is limited to 20, but this can be adjusted. It also has a function to archive the downloads directory.

package MyApp::Functions;
use strict;
use warnings;
use Archive::Zip qw(:ERROR_CODES :CONSTANTS);
use Exporter 'import';
use File::Basename;
use File::Spec;
use LWP::Simple;
use WWW::Mechanize;

our @EXPORT_OK = qw(fetch_image_links download_images archive_downloads);

=head2 fetch_image_links

Fetch all image (thumbnail) links from the search results page
=cut
sub fetch_image_links {
	my($keyword, $limit) = @_;
	$keyword ||= 'perl';
	$limit ||= 20;

	my $mech = WWW::Mechanize->new;
	$mech->get('http://www.altavista.com/image/results?q=' . $keyword);

	my $count = 0;
	my @images;
	do {
		$count++;
		foreach my $image( $mech->images ) {
			next if $image->tag() ne 'img';
			next unless index($image->url(), 'nimage') > 0;
			push @images, $image->url();
		}
	}
	while( $count < $limit && $mech->follow_link( text_regex => qr/>>/ ) );

	return @images;
}

=head2 download_images

Download every supplied image from the list

The thumbnails from Altavista are in JPEG format. If you choose to use and or
modify this code for your own needs be sure to safe the file in the correct
format.
=cut
sub download_images {
	my @images = @_;

	mkdir('download') unless -d 'download';

	foreach(@images) {
		my $filepath = File::Spec->catfile('download', basename($_) . '.jpg');
		next if -f $filepath;

		my $content = get($_);
		open my $fh, '>', $filepath or die($!);
		binmode $fh;
		print $fh $content;
		close $fh;
	}
}

=head2 archive_downloads

Archive the download directory
=cut
sub archive_downloads {
	my $zip = Archive::Zip->new();
	my $name = 'backup-' . time();
	$zip->addTree('download' , $name);

	my $status = $zip->writeToFileNamed( $name . '.zip' );
	die "Archiving failed!" if $status != AZ_OK;
}

1;

worker.pl

Now that we’ve got our functionality in place it’s time to setup the worker. This worker provides 2 functions. fetch_thumbnails will do a search, collect the thumbnail links and will download them to the download directory. archive_downloads will create a Zip archive with the contents of the downloads directory.

#!/usr/bin/perl
use strict;
use warnings;
use lib './lib';
use Gearman::Worker;
use MyApp::Functions qw(fetch_image_links download_images archive_downloads);
use JSON::Any;

my $worker = Gearman::Worker->new;
$worker->job_servers('127.0.0.1');

# Using JSON to unserialize arguments
my $json = JSON::Any->new;

# fetch_thumbnails: Search and fetch thumbnails
$worker->register_function('fetch_thumbnails', sub {
	my @images = fetch_image_links( @{$json->decode($_[0]->arg)} );
	download_images( @images );
});

# archive_downloads: Archive download directory
$worker->register_function('archive_downloads', \&archive_downloads);

$worker->work while 1;

client.pl

A worker needs to have something to do, so lets create a client that can dispatch tasks to it. Our client will create a taskset, containing the fetch_thumbnails and archive_downloads tasks. This taskset is being sent to the server, which distributes it to the workers. When the workers are done the client gets a signal to continue. After this we’ll execute another task to create another archive. With do_task the client will wait for the task to finish. Finally, another archive_downloads task is being dispatched to the background. The client will instantly finish after this, even if the task isn’t finished yet. Still, the task is being executed by a worker when available.

#!/usr/bin/perl
use strict;
use warnings;
use lib './lib';
use Gearman::Client;
use JSON::Any;

my $client = Gearman::Client->new;
$client->job_servers('127.0.0.1');

# Using JSON to serialize arguments
my $json = JSON::Any->new;

# Create a taskset that will search and download thumbnails and finally archives
# it to a ZIP-file.
my $taskset = $client->new_task_set;

$taskset->add_task('fetch_thumbnails', $json->encode(['perl', 5]), {
	on_complete => sub {
		print "Downloaded all thumbnails\n";
	},
});

# Run the taskset and wait for it to complete
$taskset->wait;

# Create an archive and wait for it
$client->do_task('archive_downloads');

# And create a third archive, but don't for this one
$client->dispatch_background('archive_downloads');

Try it!

With all code in place open 3 terminals. 1 For gearmand, 1 for the worker and 1 for the script. Start them in the given order.

$ gearmand --debug=1
$ perl worker.pl
$ perl client.pl

The terminal running the client.pl should display the following:

Downloaded all thumbnails

Check the downloads directory to see all the thumbnails. The root directory of the scripts should contain 2 zip archives. If not then the jobs executed and finished the same second. Congratulations, you have a very fast machine! The name of the archive contains a timestamp. Change the code to use something more unique.

Please do note…

Do note that the example code won’t really work with multiple clients and workers. That’s because thumbnails are being stored inside the same directory and the archive_downloads task simply creates a snapshot of the download directory. I’ll leave it up to you as an exercise to figure this out. This post would simply become too long to cover this. This is merely an introduction to Gearman.

As a final note, I do realize we have Storable for serialization, but somehow it got borked on my Strawberry Perl installation.

Other

There’s also a Plack/PSGI script available to retrieve Gearman statistics. It’s available on GitHub. I haven’t tried it so I don’t know if it’s even compatible with the Perl server implementation. But I thought it was worth to mention it.

License

All code supplied here is licensed under the MIT license.

1 thought on “Distributed jobs with Gearman”

Comments are closed.

Scroll to Top