use strict;
use warnings;
use File::Basename;
use Cwd;
use Getopt::Long;
# ---------------------------------------------------------------------------
# Script-wide state.  These are filled in or overridden while parsing the
# command line and the queue configuration file.
# ---------------------------------------------------------------------------

# Options forwarded verbatim to qsub (e.g. "-V ", "-q all.q ").
my $qsub_opts = "";
# Becomes 1 when "-sync y" was passed through to qsub.
my $sync = 0;
# Thread count written to the job's accounting line; overridden by "-pe".
my $num_threads = 1;
# Value shown as the --gpu default in the usage text.
my $gpu = 0;
# Queue configuration file; replaced by --config when given.
my $config = "conf/queue.conf";
# "--foo-bar value" options, keyed as "foo_bar".
my %cli_options = ();
# Array-job placeholder name (e.g. "JOB") and its inclusive task range.
my ($jobname, $jobstart, $jobend);
my $array_job = 0;
# SGE job id; only parsed from qsub's output in the non-sync code path.
my $sge_job_id;
# Print the command-line usage summary to STDERR and exit with status 1.
# The defaults shown are the current values of $config, $num_threads and $gpu.
# Declared without a prototype: an empty "()" prototype only changes how calls
# are parsed, and every caller already invokes print_usage() with no arguments.
sub print_usage {
    print STDERR
        "Usage: queue.pl [options] [JOB=1:n] log-file command-line arguments...\n" .
        "e.g.: queue.pl foo.log echo baz\n" .
        " (which will echo \"baz\", with stdout and stderr directed to foo.log)\n" .
        "or: queue.pl -q all.q\@xyz foo.log echo bar \| sed s/bar/baz/ \n" .
        " (which is an example of using a pipe; you can provide other escaped bash constructs)\n" .
        "or: queue.pl -q all.q\@qyz JOB=1:10 foo.JOB.log echo JOB \n" .
        " (which illustrates the mechanism to submit parallel jobs; note, you can use \n" .
        " another string other than JOB)\n" .
        "Note: if you pass the \"-sync y\" option to qsub, this script will take note\n" .
        "and change its behavior. Otherwise it uses qstat to work out when the job finished\n" .
        "Options:\n" .
        " --config <config-file> (default: $config)\n" .
        " --mem <mem-requirement> (e.g. --mem 2G, --mem 500M, \n" .
        " also support K and numbers mean bytes)\n" .
        " --num-threads <num-threads> (default: $num_threads)\n" .
        " --max-jobs-run <num-jobs>\n" .
        " --gpu <0|1> (default: $gpu)\n";
    exit 1;
}
# Handler for SIGINT and SIGTERM.
#
# Perl passes the signal name (e.g. "INT") as the handler's first argument.
# If a job has already been submitted and its SGE id parsed, it is removed
# with qdel so it does not keep running unattended.  The script then exits
# with status 2 in every case.  Returning without exiting would silently
# swallow the signal and leave the script uninterruptible before submission.
sub caught_signal {
    my ($signal) = @_;
    $signal = "unknown" unless defined $signal;
    if (defined $sge_job_id) {
        system("qdel $sge_job_id");
        print STDERR "Caught a signal: $signal , deleting SGE task: $sge_job_id and exiting\n";
    } else {
        print STDERR "Caught a signal: $signal , exiting\n";
    }
    exit(2);
}
# ---------------------------------------------------------------------------
# Command-line parsing.
#
# Leading switches are consumed in two passes, before and after an optional
# array-job specifier (NAME=start:end or NAME=n), so options may appear on
# either side of it:
#   -V                 forwarded to qsub as-is
#   -sync y            forwarded to qsub and sets $sync
#   -pe <env> <n>      forwarded to qsub; <n> becomes $num_threads
#   --name <value>     stored as $cli_options{name} ('-' -> '_'); mapped to
#                      qsub arguments later via the config file
#   -other <value>     forwarded to qsub as-is
# ---------------------------------------------------------------------------
if (@ARGV < 2) {
    print_usage();
}
for my $pass (1 .. 2) {
    while (@ARGV >= 2 && $ARGV[0] =~ m:^-:) {
        my $switch = shift @ARGV;
        if ($switch eq "-V") {
            $qsub_opts .= "-V ";
        } else {
            my $argument = shift @ARGV;
            if ($argument =~ m/^--/) {
                print STDERR "WARNING: suspicious argument '$argument' to $switch; starts with '-'\n";
            }
            if ($switch eq "-sync" && $argument =~ m/^[yY]/) {
                $sync = 1;
                $qsub_opts .= "$switch $argument ";
            } elsif ($switch eq "-pe") {
                my $argument2 = shift @ARGV;
                $qsub_opts .= "$switch $argument $argument2 ";
                $num_threads = $argument2;
            } elsif ($switch =~ m/^--/) {
                $switch =~ s/^--//;
                $switch =~ s/-/_/g;
                $cli_options{$switch} = $argument;
            } else {
                $qsub_opts .= "$switch $argument ";
            }
        }
    }
    # Nothing left to inspect; the usage check below reports the problem.
    last unless @ARGV;
    # Optional array-job specifier.  GridEngine task ids start at 1.
    if ($ARGV[0] =~ m/^(\w+)=(\d+):(\d+)$/) {
        ($jobname, $jobstart, $jobend) = ($1, $2, $3);
        $array_job = 1;
        my $range_spec = shift @ARGV;
        if ($jobstart > $jobend) {
            die "queue.pl: invalid job range $range_spec";
        }
        if ($jobstart <= 0) {
            die "queue.pl: invalid job range $range_spec, start must be strictly positive (this is a GridEngine limitation).";
        }
    } elsif ($ARGV[0] =~ m/^(\w+)=(\d+)$/) {
        ($jobname, $jobstart, $jobend) = ($1, $2, $2);
        $array_job = 1;
        my $range_spec = shift @ARGV;
        if ($jobstart <= 0) {
            die "queue.pl: invalid job index $range_spec, it must be strictly positive (this is a GridEngine limitation).";
        }
    } elsif ($ARGV[0] =~ m/.+\=.*\:.*$/) {
        print STDERR "queue.pl: Warning: suspicious first argument to queue.pl: $ARGV[0]\n";
    }
}
if (@ARGV < 2) {
    print_usage();
}
# ---------------------------------------------------------------------------
# Queue configuration.
#
# The config file maps the "--name value" options in %cli_options onto qsub
# arguments.  Recognised line forms:
#   command <qsub command line>     base qsub invocation (required)
#   option NAME=* <args with $0>    used for any value; $0 -> the value
#   option NAME=VALUE [<args>]      used only for that exact value
#   default NAME=VALUE              value assumed when NAME was not given
# "#" starts a comment.  If $config cannot be opened and --config was not
# given, the built-in $default_config_file below is used instead.
# ---------------------------------------------------------------------------
if (exists $cli_options{"config"}) {
    $config = $cli_options{"config"};
}
my $default_config_file = <<'EOF';
command qsub -v PATH -cwd -S /bin/bash -j y -l arch=*64*
option mem=* -l mem_free=$0,ram_free=$0
option mem=0
option num_threads=* -pe smp $0
option num_threads=1
option max_jobs_run=* -tc $0
default gpu=0
option gpu=0
option gpu=* -l gpu=$0 -q '*.q'
EOF
# Route SIGINT/SIGTERM to caught_signal.
$SIG{INT} = \&caught_signal;
$SIG{TERM} = \&caught_signal;
my $opened_config_file = 1;
my $config_fh;
open($config_fh, '<', $config) or $opened_config_file = 0;
my %cli_config_options = ();
my %cli_default_options = ();
if ($opened_config_file == 0 && exists($cli_options{"config"})) {
    print STDERR "Could not open config file $config\n";
    exit(1);
} elsif ($opened_config_file == 0) {
    # Read the built-in config straight from memory.  Passing it through a
    # shell "echo '...'" breaks on the single quotes in "-q '*.q'", leaving
    # *.q unquoted and subject to glob expansion.
    open($config_fh, '<', \$default_config_file)
        or die "queue.pl: unable to read the built-in default config\n";
    $config = "Default config";
}
my $qsub_cmd = "";
my $read_command = 0;
while (my $line = <$config_fh>) {
    chomp $line;
    # $line keeps the original text for error messages; $spec is comment-free.
    (my $spec = $line) =~ s/\s*#.*//g;
    next if $spec eq "";
    if ($spec =~ /^command (.+)/) {
        $read_command = 1;
        $qsub_cmd = $1 . " ";
    } elsif ($spec =~ m/^option ([^=]+)=\* (.+)$/) {
        my $option = $1;
        my $arg = $2;
        if ($arg !~ m:\$0:) {
            die "Unable to parse line '$line' in config file ($config)\n";
        }
        if (exists $cli_options{$option}) {
            $arg =~ s/\$0/$cli_options{$option}/g;
            $cli_config_options{$option} = $arg;
        }
    } elsif ($spec =~ m/^option ([^=]+)=(\S+)\s?(.*)$/) {
        my $option = $1;
        my $value = $2;
        my $arg = $3;
        if (exists $cli_options{$option}) {
            # Key is "$option$;$value" (multi-dimensional hash emulation).
            $cli_default_options{($option,$value)} = $arg;
        }
    } elsif ($spec =~ m/^default (\S+)=(\S+)/) {
        my $option = $1;
        my $value = $2;
        if (!exists $cli_options{$option}) {
            $cli_options{$option} = $value;
        }
    } else {
        print STDERR "queue.pl: unable to parse line '$line' in config file ($config)\n";
        exit(1);
    }
}
close($config_fh);
if ($read_command != 1) {
    print STDERR "queue.pl: config file ($config) does not contain the line \"command .*\"\n";
    exit(1);
}
# Translate every --option into qsub arguments.  An exact-value entry wins
# over the "=*" template; unknown options or values are fatal.
for my $option (keys %cli_options) {
    if ($option eq "config") { next; }
    if ($option eq "max_jobs_run" && $array_job != 1) { next; }
    my $value = $cli_options{$option};
    if (exists $cli_default_options{($option,$value)}) {
        $qsub_opts .= "$cli_default_options{($option,$value)} ";
    } elsif (exists $cli_config_options{$option}) {
        $qsub_opts .= "$cli_config_options{$option} ";
    } else {
        if ($opened_config_file == 0) { $config = "default config file"; }
        die "queue.pl: Command line option $option not described in $config (or value '$value' not allowed)\n";
    }
}
# ---------------------------------------------------------------------------
# Decide where the job log, the qsub log and the generated job script go, and
# flatten the remaining arguments into one bash command line.
# ---------------------------------------------------------------------------
my $cwd = getcwd();
my $logfile = shift @ARGV;

# A multi-task array job needs one log per task, so the log-file name must
# contain the placeholder (e.g. "JOB").
if ($array_job == 1 && $jobend > $jobstart && $logfile !~ m/$jobname/) {
    print STDERR "queue.pl: you are trying to run a parallel job but "
        . "you are putting the output into just one log file ($logfile)\n";
    exit(1);
}

# Re-quote each argument for bash.  Arguments without whitespace pass through
# unchanged.  Arguments containing a double quote are wrapped in single
# quotes.  Anything else is wrapped in double quotes, so the job shell still
# expands variables inside it.
my @quoted_args;
for my $arg (@ARGV) {
    my $quoted = $arg =~ m/^\S+$/ ? $arg
               : $arg =~ m/"/     ? "'$arg'"
               :                    "\"$arg\"";
    push @quoted_args, "$quoted ";
}
my $cmd = join "", @quoted_args;

my $dir  = dirname($logfile);
my $base = basename($logfile);
my $qdir = "$dir/q";
# Turn ".../log/q" into ".../q".  The pattern is unanchored, so it rewrites
# the first "/log" (or "/LOG") followed by "/q" wherever it occurs.
$qdir =~ s:/(log|LOG)/*q:/q:;
my $queue_logfile = "$qdir/$base";

system("mkdir -p $dir 2>/dev/null") unless -d $dir;
die "Cannot make the directory $dir\n" unless -d $dir;
unless (-d "$qdir/sync") {
    system("mkdir -p $qdir/sync 2>/dev/null");
    sleep(5);
}

# For array jobs, swap the placeholder for the SGE task id everywhere it is
# used at run time; the qsub log itself is shared by all tasks.
my $queue_array_opt = $array_job == 1 ? "-t $jobstart:$jobend" : "";
if ($array_job == 1) {
    $logfile       =~ s/$jobname/\$SGE_TASK_ID/g;
    $cmd           =~ s/$jobname/\$\{SGE_TASK_ID\}/g;
    $queue_logfile =~ s/\.?$jobname//;
}

# The job script sits next to the qsub log, with a 1-5 letter extension
# replaced by ".sh" (or ".sh" appended), and is made absolute.
my $queue_scriptfile = $queue_logfile;
$queue_scriptfile .= ".sh" unless $queue_scriptfile =~ s/\.[a-zA-Z]{1,5}$/.sh/;
$queue_scriptfile = "$cwd/$queue_scriptfile" unless $queue_scriptfile =~ m:^/:;
# ---------------------------------------------------------------------------
# Generate the bash script that qsub will run ($queue_scriptfile).  The script:
#   - cds to the directory queue.pl was run from and sources ./path.sh;
#   - logs the host, start time and command to $logfile;
#   - runs the command with stdout and stderr appended to $logfile;
#   - records run time and exit status;
#   - touches the sync file polled by the non-"-sync y" wait loop.
# The full qsub command line is appended to the script as a trailing comment.
# ---------------------------------------------------------------------------
my $syncfile = "$qdir/sync/done.$$";
unlink($queue_logfile, $syncfile);

open(my $script_fh, '>', $queue_scriptfile)
    or die "Failed to write to $queue_scriptfile";
print {$script_fh} "#!/bin/bash\n";
print {$script_fh} "cd $cwd\n";
print {$script_fh} ". ./path.sh\n";
print {$script_fh} "( echo '#' Running on \`hostname\`\n";
print {$script_fh} " echo '#' Started at \`date\`\n";
print {$script_fh} " echo -n '# '; cat <<EOF\n";
print {$script_fh} "$cmd\n";
print {$script_fh} "EOF\n";
print {$script_fh} ") >$logfile\n";
print {$script_fh} "time1=\`date +\"%s\"\`\n";
print {$script_fh} " ( $cmd ) 2>>$logfile >>$logfile\n";
print {$script_fh} "ret=\$?\n";
print {$script_fh} "time2=\`date +\"%s\"\`\n";
print {$script_fh} "echo '#' Accounting: time=\$((\$time2-\$time1)) threads=$num_threads >>$logfile\n";
print {$script_fh} "echo '#' Finished at \`date\` with status \$ret >>$logfile\n";
# 137 = 128 + 9, i.e. the command was killed by SIGKILL.  The script then
# exits 100 without touching the sync file.  NOTE(review): exit status 100
# presumably makes GridEngine put the job in an error state — confirm.
print {$script_fh} "[ \$ret -eq 137 ] && exit 100;\n";
if ($array_job == 0) {
    print {$script_fh} "touch $syncfile\n";
} else {
    print {$script_fh} "touch $syncfile.\$SGE_TASK_ID\n";
}
print {$script_fh} "exit \$[\$ret ? 1 : 0]\n";
print {$script_fh} "## submitted with:\n";
# qsub's own stdout/stderr go to $queue_logfile; the non-sync path later
# parses the SGE job id from it.
$qsub_cmd .= "-o $queue_logfile $qsub_opts $queue_array_opt $queue_scriptfile >>$queue_logfile 2>&1";
print {$script_fh} "# $qsub_cmd\n";
# Buffered write errors surface at close.
if (!close($script_fh)) {
    die "Failed to close the script file (full disk?)";
}
chmod 0755, $queue_scriptfile;
# ---------------------------------------------------------------------------
# Submit the job script with qsub; qsub's output goes to $queue_logfile.
# A failed submission is retried after a 20 second pause only when that
# output mentions "gdi request" or "qmaster", meaning the queue master looks
# temporarily unreachable.  After $max_submit_tries failed attempts we give up.
# With "-sync y", qsub waits for the job.  A system() status of 256 (exit
# status 1, which the job script returns on failure) is then reported as a
# job failure.
# ---------------------------------------------------------------------------
my $max_submit_tries = 4;
my $submitted = 0;
SUBMIT: for my $try (1 .. $max_submit_tries) {
    my $ret = system($qsub_cmd);
    if ($ret == 0) {
        $submitted = 1;
        last SUBMIT;
    }
    if ($sync && $ret == 256) {
        if (defined $jobname) {
            $logfile =~ s/\$SGE_TASK_ID/*/g;
        }
        print STDERR "queue.pl: job writing to $logfile failed\n";
        exit(1);
    }
    print STDERR "queue.pl: Error submitting jobs to queue (return status was $ret)\n";
    print STDERR "queue log file is $queue_logfile, command was $qsub_cmd\n";
    my $err = `tail $queue_logfile`;
    print STDERR "Output of qsub was: $err\n";
    if ($err !~ m/gdi request/ && $err !~ m/qmaster/) {
        exit(1);
    }
    # Do not pause after the final attempt; we are about to give up.
    if ($try < $max_submit_tries) {
        my $waitfor = 20;
        print STDERR "queue.pl: It looks like the queue master may be inaccessible. "
            . "Trying again after $waitfor seconds\n";
        sleep($waitfor);
    }
}
if (!$submitted) {
    # Without this, exhausting the retries fell through as if submission
    # had succeeded.
    print STDERR "queue.pl: giving up after $max_submit_tries attempts to submit the job "
        . "(queue log file is $queue_logfile)\n";
    exit(1);
}
# ---------------------------------------------------------------------------
# Without "-sync y", qsub returns once the job is queued.  Here we:
#   1. parse the SGE job id from qsub's output in $queue_logfile;
#   2. wait for the sync file(s) the job script touches on completion
#      (one per task for array jobs), backing off up to 3 seconds per poll;
#   3. periodically ask qstat whether the job still exists.
# If qstat no longer knows the job and the sync file still has not appeared
# after extra grace periods, report the failure and exit 1.
# ---------------------------------------------------------------------------
if (! $sync) {
    # The back-off uses fractional waits.  The built-in sleep() truncates
    # them to whole seconds (0.1 -> 0, a busy spin), so use the core
    # Time::HiRes module for those.
    require Time::HiRes;

    my @syncfiles = ();
    if (!defined $jobname) {
        push @syncfiles, $syncfile;
    } else {
        for (my $jobid = $jobstart; $jobid <= $jobend; $jobid++) {
            push @syncfiles, "$syncfile.$jobid";
        }
    }
    {
        open(my $qlog_fh, '<', $queue_logfile)
            or die "Error opening log file $queue_logfile";
        undef $sge_job_id;
        while (my $qlog_line = <$qlog_fh>) {
            next unless $qlog_line =~ m/Your job\S* (\d+)[. ].+ has been submitted/;
            if (defined $sge_job_id) {
                die "Error: your job was submitted more than once (see $queue_logfile)";
            }
            $sge_job_id = $1;
        }
        close($qlog_fh);
        if (!defined $sge_job_id) {
            die "Error: log file $queue_logfile does not specify the SGE job-id.";
        }
    }
    my $check_sge_job_ctr = 1;
    my $wait = 0.1;
    my $counter = 0;
    foreach my $f (@syncfiles) {
        SYNC_WAIT: while (! -f $f) {
            Time::HiRes::sleep($wait);
            $wait *= 1.2;
            if ($wait > 3.0) {
                $wait = 3.0;
                # NOTE(review): touching/removing a file in the sync dir and
                # listing it is presumably meant to refresh NFS attribute
                # caches so the sync file becomes visible — confirm.
                if (rand() < 0.25) {
                    if (rand() > 0.5) {
                        system("touch $qdir/sync/.kick");
                    } else {
                        unlink("$qdir/sync/.kick");
                    }
                }
                if ($counter++ % 10 == 0) {
                    system("ls $qdir/sync >/dev/null");
                }
            }
            # Query qstat on every 10th poll for roughly the first 100 polls,
            # then on every 50th.  The post-increments are order-sensitive.
            if (($check_sge_job_ctr < 100 && ($check_sge_job_ctr++ % 10) == 0) ||
                ($check_sge_job_ctr >= 100 && ($check_sge_job_ctr++ % 50) == 0)) {
                next SYNC_WAIT if -f $f;
                my $output = `qstat -j $sge_job_id 2>&1`;
                my $ret = $?;
                # Exit status 1 without a qmaster/gdi communication error is
                # taken to mean the job no longer exists.  Give the file system
                # three more chances to show the sync file before giving up.
                if ($ret >> 8 == 1 && $output !~ m/qmaster/ &&
                    $output !~ m/gdi request/) {
                    sleep(3);
                    system("touch $qdir/sync/.kick");
                    unlink("$qdir/sync/.kick");
                    next SYNC_WAIT if -f $f;
                    sleep(7);
                    system("touch $qdir/sync/.kick");
                    sleep(1);
                    unlink("$qdir/sync/.kick");
                    next SYNC_WAIT if -f $f;
                    sleep(60);
                    system("touch $qdir/sync/.kick");
                    sleep(1);
                    unlink("$qdir/sync/.kick");
                    next SYNC_WAIT if -f $f;
                    $f =~ m/\.(\d+)$/ || die "Bad sync-file name $f";
                    my $job_id = $1;
                    # Work on a copy: $logfile must keep its $SGE_TASK_ID
                    # placeholder for the remaining tasks and the final
                    # status check.
                    my $task_logfile = $logfile;
                    if (defined $jobname) {
                        $task_logfile =~ s/\$SGE_TASK_ID/$job_id/g;
                    }
                    my $last_line = `tail -n 1 $task_logfile`;
                    # -M < 0: log modified after this script started.
                    my $log_age = -M $task_logfile;
                    if ($last_line =~ m/status 0$/ && defined $log_age && $log_age < 0) {
                        print STDERR "**queue.pl: syncfile $f was not created but job seems\n" .
                            "**to have finished OK. Probably your file-system has problems.\n" .
                            "**This is just a warning.\n";
                        last SYNC_WAIT;
                    }
                    chomp $last_line;
                    print STDERR "queue.pl: Error, unfinished job no " .
                        "longer exists, log is in $task_logfile, last line is '$last_line', " .
                        "syncfile is $f, return status of qstat was $ret\n" .
                        "Possible reasons: a) Exceeded time limit? -> Use more jobs!" .
                        " b) Shutdown/Frozen machine? -> Run again! Qmaster output " .
                        "was: $output\n";
                    exit(1);
                } elsif ($ret != 0) {
                    print STDERR "queue.pl: Warning: qstat command returned status $ret (qstat -j $sge_job_id)\n";
                    print STDERR "queue.pl: output was: $output";
                }
            }
        }
    }
    unlink(@syncfiles);
}
# ---------------------------------------------------------------------------
# Determine the job's exit status from the "with status N" line the job
# script appends to each log file.  Each log is retried on a back-off
# schedule in case it is not yet visible (e.g. a slow shared file system).
# Exits 0 if every task succeeded, otherwise reports the failure and exits 1.
# ---------------------------------------------------------------------------
# The back-off waits are fractional; the built-in sleep() would truncate
# them to whole seconds (mostly 0), so use the core Time::HiRes module.
require Time::HiRes;
my @logfiles = ();
if (!defined $jobname) {
    push @logfiles, $logfile;
} else {
    for (my $jobid = $jobstart; $jobid <= $jobend; $jobid++) {
        (my $task_log = $logfile) =~ s/\$SGE_TASK_ID/$jobid/g;
        push @logfiles, $task_log;
    }
}
my $num_failed = 0;
my $status = 1;
my @wait_times = (0.1, 0.2, 0.2, 0.3, 0.5, 0.5, 1.0, 2.0, 5.0, 5.0, 5.0, 10.0, 25.0);
foreach my $task_log (@logfiles) {
    # One more attempt than there are waits: the last attempt is final.
    for my $iter (0 .. scalar(@wait_times)) {
        my $tail = `tail -10 $task_log 2>/dev/null`;
        # Use the LAST "with status N": the echoed command or the job's own
        # output may contain the phrase, but the script's "Finished at ..."
        # line is written last.
        if ($tail =~ m/.*with status (\d+)/s) {
            $status = $1;
            last;
        }
        if ($iter < @wait_times) {
            Time::HiRes::sleep($wait_times[$iter]);
            next;
        }
        if (! -f $task_log) {
            print STDERR "Log-file $task_log does not exist.\n";
        } else {
            print STDERR "The last line of log-file $task_log does not seem to indicate the "
                . "return status as expected\n";
        }
        exit(1);
    }
    if ($status != 0) { $num_failed++; }
}
if ($num_failed == 0) { exit(0); }
if (@logfiles == 1) {
    if (defined $jobname) { $logfile =~ s/\$SGE_TASK_ID/$jobstart/g; }
    print STDERR "queue.pl: job failed with status $status, log is in $logfile\n";
    if ($logfile =~ m/JOB/) {
        print STDERR "queue.pl: probably you forgot to put JOB=1:\$nj in your script.\n";
    }
} else {
    if (defined $jobname) { $logfile =~ s/\$SGE_TASK_ID/*/g; }
    my $numjobs = 1 + $jobend - $jobstart;
    print STDERR "queue.pl: $num_failed / $numjobs failed, log is in $logfile\n";
}
exit(1);