15 de mayo de 2013

Multicore con threads, threads::shared y Thread::Queue para controlar la RAM utilizada

Multicore con threads, threads::shared y Thread::Queue para controlar la RAM utilizada


Hace unos dias tanto Alvaro como Bruno publicaron unos blogs incluyendo scripts, tanto en Perl como en Python, en los cuales discutían sobre el uso y los problemas operativos al utilizar los modulos threads y threads::shared. Basicamente el problema principal es que, a veces es imposible utilizarlos ya que los problemas de duplicación de memoria hace que el programador no se pueda beneficiar de utilizar todos los procesadores de un ordenador, ya que se ve limitado por la cantidad de RAM que tiene disponible. Esa es una de las premisas que comentaba Bruno en su post. Yo me he encontrado multiples veces con este problema, por ejemplo procesando ficheros de genomas de Uniprot, que pueden tener ~ 30GB en sus casos mas extremos. En estos casos, al utilizar los modulos threads y threads::shared, normalmente lo que ocurre es que la memoria se va llenando mientras avanza el trabajo de las threads, y como resultado final el sistema operativo termina matando al proceso porque ocupa toda la RAM. He encontrado, mezclando código de varias personas, una forma de resolver este asunto utilizando estos modulos, y sin depender de la RAM disponible, combinando el uso de threads y threads::shared con Thread::Queue. De hecho, lo que se hace es generar una cola (en este caso una FIFO) en la cual un número predeterminado de 'worker threads' vayan sacando trabajo mientras vayan terminando. La idea es simultanear los procesos de llenado de la cola y trabajo, lo cual tambien permite controlar el tamaño de la cola, y por lo tanto la memoria utilizada. La idea sería, que esta aproximación podría combinarse con cualquier tarea que se necesite hacer, de hecho el código que compartiré a continuación, se podría adaptar fácilmente al compartido por Bruno para multi Blast.
Básicamente el programa a continuación hace lo que comentaba arriba, leer un fichero en formato Swissprot, y escanear las secuencias buscando un bias de glutamina y asparagina. El tamaño de la cola y por tanto la memoria utilizada se fija con la constante (Q_limit), en este caso definida para guardar 10000 lineas, aunque esto se puede modificar a conveniencia. Luego se generan 8 'worker threads', definidos por la constante (CORES), que van sacando trabajo mientras se llena la FIFO. Luego las 'worker threads' devuelven predicciones de secuencias con este tipo de bias y el programa principal las imprime en un fichero texto. Basado en este ejemplo, es muy facil adaptar esto a cualquier tarea que implique escanear secuencias en la búsqueda de cualquier propiedad, en ficheros enormes. Por ejemplo, un programa similar a este procesa en aproximadamente 2 horas un fichero de 35 GB con 64 CPUs y una utilización máxima de RAM de 5 GB.
Bueno aqui va el código, y espero sea útil la idea:
#!/usr/bin/perl -w

#Loading all the libraries to run in a multithreading environment
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my %aa_scores = (
   'A' => -0.149,
   'C' => -1.000,
   'D' => -0.395,
   'E' => -0.726,
   'F' => -0.125,
   'G' => 0.010,
   'H' => -0.034,
   'I' => -0.397,
   'K' => -0.496,
   'L' => -0.408,
   'M' => 0.044,
   'N' => 0.659,
   'P' => 0.059,
   'Q' => 0.536,
   'R' => -0.314,
   'S' => 0.192,
   'T' => -0.070,
   'V' => -0.450,
   'W' => -0.908,
   'Y' => 0.206,
   'O' => 0,
   'U' => 0,
   'B' => 0,
   'Z' => 0,
   'X' => 0,
);

use constant WINDOW => 60;
use constant CUTOFF => 12;
use constant CORES => 8; #Set the number of cores to use
use constant Q_limit => 10000; #Set the number of elements in the Queue

my $Q = new Thread::Queue (); #Initializing the Queue obect
my @threads;

#Loading the Swissprot file from command line argument
my $dat_file = shift;
$/ = "//\n"; #Reading the multiple lines of a Swissprot entry in a single line. The (//) line corresponds to the entry end makr 
#As this huge files are ussually distributed compressed here I pipe with zcat to avoid decompresing prior to processing
open (DAT_FILE, "zcat $dat_file |") or die "Sequence file couldn't be opened for reading\n";

#First generating the Queue
my $builder = threads->create (\&Queue_generator);

#And then setting up the worker threads
push @threads, threads->create (\&parse_swissprot) for (1 .. CORES);

#Wait until all the threads finish their work
$builder->join;

#Now that the work is done, collect the returned info extracted by the worker threads
#It is neccesary to fill a hash with all the partial results to be able to print the
#results with the scanning of the complete file in an organized fashion
my %domains;
foreach my $thread (@threads)
{
   my %partial_results = %{$thread->join};
 
   while (my ($organism, $predictions_ref) = each (%partial_results))
   {
      my @predictions;
      while (my ($seq_id, $info_ref) = each (%{$predictions_ref}))
      {
         my $score = $info_ref->{'Score'};
         my $domain = $info_ref->{'Seq'};
         my $window_pos = $info_ref->{'Window'};
   
         my $protein_id = $1 if ($seq_id =~ m/^>(\w+);/);
         my $protein_ac = $1 if ($seq_id =~ m/^>$protein_id;\s(\w+);\s/);
   
         push @predictions, "\t$protein_id|$protein_ac\tWindow Position=$window_pos; Score=$score | Predicted Domain: $domain\n";
      }
      push @{$domains{$organism}}, @predictions;#Loading all the predictions returned by each thread for a given organism
   }
}

  
#And finally printing the predictions of domains with a specific amino acid bias in the complete file
#organized by organisms
open (OUTFILE, ">predictions.txt");
while (my ($organism, $predictions_ref) = each (%domains))
{
   my @predictions = @{$predictions_ref};
 
   my $total_predictions = scalar (@predictions);
   print OUTFILE ">$organism: Total=$total_predictions\n@predictions\n";
}
close (OUTFILE);
close (DAT_FILE);

#Here I construct the Queue. The idea is that while the generator is feeding the queue other working threads
#are retrieving (emptying) from the queue and the memory used can be controlled during execution
sub Queue_generator
{
   while (my $line = <DAT_FILE>)
   {
      chomp ($line);
      while (1)
      {
         #The line is only send to processing if the number of items in the queue is less than a fixed size defined by Q_limit
         #thus a line is trapped in this infinite loop until the queue has space to receive it
         if ($Q->pending () < Q_limit) 
         {
            $Q->enqueue ($line);
            last; #Once the line has been loaded in the queue we need to break out of the infinite loop to read the next line from the file
         }
      }
   }
 
   #Once the file is exhausted it is needed to alert the worker threads that the work is over
   #When a thread catch and UNDEF variable it stops working
   $Q->enqueue (undef) for (1 .. CORES);
}

#This is the code of the worker threads
sub parse_swissprot
{
   my %predictions; #The variable to return to the main program
 
   #Here the thread DEQUEUE one line from the queue stack
   while (my $line = $Q->dequeue)
   {
      my @patterns = ();
      #Getting the Entry Name
      my $seq_id = $1 if ($line =~ m/^ID\s+(\w+)\s+/);
      
      # And now getting the Accession Number lines
      @patterns = $line =~ m/^AC\s+(.*)/mg;
      # and finally just getting the primary AC -i.e. the first of all the possible multiple ACs-
      my $seq_ac = $1 if ($patterns[0] =~ m/^(\w+);/);
      
      #And now getting the rest of the Entry info
      @patterns = $line =~ m/^DE\s+(.*)/mg;
      my $description = join ' ', @patterns;
      @patterns = $line =~ m/^SQ\s+(.*)/mg;
      my $seq_info = join ' ', @patterns;
      @patterns = $line =~ m/^OS\s+(.*)/mg;
      my $organism = join ' ', @patterns;
      @patterns = $line =~ m/^\s+(.*)/mg;
      my $seq = join '', @patterns;
      $seq =~ s/\s+//g;
      
      my $id = '>' . $seq_id . '; ' . $seq_ac . '; ' . $description . ' ' . $seq_info . "  [$organism]";
      
      next unless (length ($seq) >= WINDOW);
      my %results;
      
      #Loop to scan the complete sequence of the corresponding entry
      #After scanning the sequence only the highest scoring stretch is returned
      for my $i (0 .. (length ($seq) - WINDOW))
      {
         my $domain = substr ($seq, $i, WINDOW);
         my @domain = split (//, $domain);
   
         my $domain_score = 0;
         foreach my $aa (@domain)
         {
            $domain_score += $aa_scores{$aa};
         }
         $domain_score = sprintf ("%.3f", $domain_score);
   
         #Updating the $results variable to store only the highest scoring stretch
         if (exists ($results{$id}))
         {
            $results{$id} = {
               'Score' => $domain_score,
               'Seq' => $domain,
               'Window' => $i,
               'CompleteSeq' => $seq,
            } if ($domain_score > $results{$id}{'Score'});
         }
         else
         {
            $results{$id} = {
               'Score' => $domain_score,
               'Seq' => $domain,
               'Window' => $i,
               'CompleteSeq' => $seq,
            };
         }
      }
  
      #Loading the entry in the variable returned to the main program only if the highest scoring stretch passes the cutoff
      if ($results{$id}{'Score'} >= CUTOFF)
      {
         $predictions{$organism}{$id} = $results{$id};
      }
      $seq_id = $seq_ac = $description = $seq_info = $organism = $seq = undef;
   }
   return (\%predictions);
}