5 de junio de 2013

FASTQ sort + parallel

Buenas,
recientemente en el laboratorio hemos estado manipulando archivos de pares de secuencias (pair-end reads) en formato FASTQ. Una de las tareas habituales ha sido limpiar los archivos de secuencias de baja calidad, ya sea recortando o eliminando directamente, para luego volver a definir parejas entre las secuencias que superaron el corte. Una estrategia posible para esta tarea es simplemente linearizar las secuencias, de manera que cada una ocupe ahora una sola línea, separando con tabuladores la cabecera, la secuencia, el separador y las calidades. Por ejemplo, la siguiente secuencia:

@SEQ_ID
GATTTGGGGTTCAAAGCAGTA...
+
!''*((((***+))%%%++)(...
 
quedaría así:

@SEQ_ID  GATTTGGGGTTCAAAGCAGTA...   +   !''*((((***+))%%%++)(...

Tras esta transformación ya sí es posible ordenar un archivo FASTQ con GNU sort, que viene instalado en cualquier sistema linux (y se puede instalar en Windows). GNU sort es ideal para ordenar conjuntos de datos que no caben en memoria (M), como a menudo ocurre con los archivos FASTQ, porque de manera implícita divide el problema inicial, de tamaño N, en N/M trozos que luego mezcla (merge) de manera externa.

En nuestra experiencia GNU sort es significativemente más eficiente que nuestros scripts para este tipo de problemas, puesto que ya trae de fábrica toda la lógica para partir el problema en trozos y luego mezclar las soluciones parciales. Sólo hay que tener cuidado de asignar la variable de ambiente LC_ALL, por ejemplo con:

$ export LC_ALL=POSIX

y echar a andar. Muy bien. Pero te quedas con la duda de si estás sacando el máximo partido a tu CPU multicore, podremos optimizar sort en paralelo? Y si invocamos a GNU parallel (mira el vídeo)?

Nos ponemos manos a la obra y hacemos pruebas con un archivo FASTQ linearizado real, de 576Mb:
 
$ ls -lh /tmp/unsortedXXpJ6CaB
-rw-------. 1 576M Jun  5 10:15 /tmp/unsortedXXpJ6CaB
 
Ahora lo ordenamos con GNU sort, dándole un máximo de 500Mb de área en RAM para trabajar (la M de antes):
 
$ time sort -k 1,1 -u -S 500M /tmp/unsortedXXpJ6CaB > /tmp/unsortedXXpJ6CaB.S
real 0m7.628s
user 0m5.143s
sys 0m2.373s

Finalmente probamos ahora con parallel, agrupando las secuencias en grupos de 100.000 elementos (ojo con esto, puedes llegar a obtener resultados parcialmente desordenados porque la segunda llamada a parallel puede recibir más argumentos de los que el shell soporta). Cambiando este valor a 10000 o a 10E6 los resultados son similares:

$ time cat /tmp/unsortedXXpJ6CaB | parallel -N 100000 --pipe --files sort -k 1,1 | 
   parallel -Xj1 sort -k 1,1 -u -m {} ';' rm {} > /tmp/unsortedXXpJ6CaB.P
real 0m15.451s
user 0m9.919s
sys 0m8.371s

Comprobamos que los resultados son idénticos:
 
$ diff /tmp/unsortedXXpJ6CaB.S /tmp/unsortedXXpJ6CaB.P

Conclusión de estas pruebas: no vale la pena complicarse con parallel para ordenar grandes archivos FASTQ, ya que probablemente el cuello de botella sea el merge final, y eso parece resolverlo mejor directamente GNU sort. De todos modos es posible que haya otras maneras de invocar a parallel más ventajosas
Si en vuestras pruebas obtenéis resultados distintos por favor escribid,
un saludo,
Bruno



15 de mayo de 2013

Multicore con threads, threads::shared y Thread::Queue para controlar la RAM utilizada

Multicore con threads, threads::shared y Thread::Queue para controlar la RAM utilizada


Hace unos dias tanto Alvaro como Bruno publicaron unos blogs incluyendo scripts, tanto en Perl como en Python, en los cuales discutían sobre el uso y los problemas operativos al utilizar los modulos threads y threads::shared. Basicamente el problema principal es que, a veces es imposible utilizarlos ya que los problemas de duplicación de memoria hace que el programador no se pueda beneficiar de utilizar todos los procesadores de un ordenador, ya que se ve limitado por la cantidad de RAM que tiene disponible. Esa es una de las premisas que comentaba Bruno en su post. Yo me he encontrado multiples veces con este problema, por ejemplo procesando ficheros de genomas de Uniprot, que pueden tener ~ 30GB en sus casos mas extremos. En estos casos, al utilizar los modulos threads y threads::shared, normalmente lo que ocurre es que la memoria se va llenando mientras avanza el trabajo de las threads, y como resultado final el sistema operativo termina matando al proceso porque ocupa toda la RAM. He encontrado, mezclando código de varias personas, una forma de resolver este asunto utilizando estos modulos, y sin depender de la RAM disponible, combinando el uso de threads y threads::shared con Thread::Queue. De hecho, lo que se hace es generar una cola (en este caso una FIFO) en la cual un número predeterminado de 'worker threads' vayan sacando trabajo mientras vayan terminando. La idea es simultanear los procesos de llenado de la cola y trabajo, lo cual tambien permite controlar el tamaño de la cola, y por lo tanto la memoria utilizada. La idea sería, que esta aproximación podría combinarse con cualquier tarea que se necesite hacer, de hecho el código que compartiré a continuación, se podría adaptar fácilmente al compartido por Bruno para multi Blast.
Básicamente el programa a continuación hace lo que comentaba arriba, leer un fichero en formato Swissprot, y escanear las secuencias buscando un bias de glutamina y asparagina. El tamaño de la cola y por tanto la memoria utilizada se fija con la constante (Q_limit), en este caso definida para guardar 10000 lineas, aunque esto se puede modificar a conveniencia. Luego se generan 8 'worker threads', definidos por la constante (CORES), que van sacando trabajo mientras se llena la FIFO. Luego las 'worker threads' devuelven predicciones de secuencias con este tipo de bias y el programa principal las imprime en un fichero texto. Basado en este ejemplo, es muy facil adaptar esto a cualquier tarea que implique escanear secuencias en la búsqueda de cualquier propiedad, en ficheros enormes. Por ejemplo, un programa similar a este procesa en aproximadamente 2 horas un fichero de 35 GB con 64 CPUs y una utilización máxima de RAM de 5 GB.
Bueno aqui va el código, y espero sea útil la idea:
#!/usr/bin/perl -w

#Loading all the libraries to run in a multithreading environment
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my %aa_scores = (
   'A' => -0.149,
   'C' => -1.000,
   'D' => -0.395,
   'E' => -0.726,
   'F' => -0.125,
   'G' => 0.010,
   'H' => -0.034,
   'I' => -0.397,
   'K' => -0.496,
   'L' => -0.408,
   'M' => 0.044,
   'N' => 0.659,
   'P' => 0.059,
   'Q' => 0.536,
   'R' => -0.314,
   'S' => 0.192,
   'T' => -0.070,
   'V' => -0.450,
   'W' => -0.908,
   'Y' => 0.206,
   'O' => 0,
   'U' => 0,
   'B' => 0,
   'Z' => 0,
   'X' => 0,
);

use constant WINDOW => 60;
use constant CUTOFF => 12;
use constant CORES => 8; #Set the number of cores to use
use constant Q_limit => 10000; #Set the number of elements in the Queue

my $Q = new Thread::Queue (); #Initializing the Queue obect
my @threads;

#Loading the Swissprot file from command line argument
my $dat_file = shift;
$/ = "//\n"; #Reading the multiple lines of a Swissprot entry in a single line. The (//) line corresponds to the entry end makr 
#As this huge files are ussually distributed compressed here I pipe with zcat to avoid decompresing prior to processing
open (DAT_FILE, "zcat $dat_file |") or die "Sequence file couldn't be opened for reading\n";

#First generating the Queue
my $builder = threads->create (\&Queue_generator);

#And then setting up the worker threads
push @threads, threads->create (\&parse_swissprot) for (1 .. CORES);

#Wait until all the threads finish their work
$builder->join;

#Now that the work is done, collect the returned info extracted by the worker threads
#It is neccesary to fill a hash with all the partial results to be able to print the
#results with the scanning of the complete file in an organized fashion
my %domains;
foreach my $thread (@threads)
{
   my %partial_results = %{$thread->join};
 
   while (my ($organism, $predictions_ref) = each (%partial_results))
   {
      my @predictions;
      while (my ($seq_id, $info_ref) = each (%{$predictions_ref}))
      {
         my $score = $info_ref->{'Score'};
         my $domain = $info_ref->{'Seq'};
         my $window_pos = $info_ref->{'Window'};
   
         my $protein_id = $1 if ($seq_id =~ m/^>(\w+);/);
         my $protein_ac = $1 if ($seq_id =~ m/^>$protein_id;\s(\w+);\s/);
   
         push @predictions, "\t$protein_id|$protein_ac\tWindow Position=$window_pos; Score=$score | Predicted Domain: $domain\n";
      }
      push @{$domains{$organism}}, @predictions;#Loading all the predictions returned by each thread for a given organism
   }
}

  
#And finally printing the predictions of domains with a specific amino acid bias in the complete file
#organized by organisms
open (OUTFILE, ">predictions.txt");
while (my ($organism, $predictions_ref) = each (%domains))
{
   my @predictions = @{$predictions_ref};
 
   my $total_predictions = scalar (@predictions);
   print OUTFILE ">$organism: Total=$total_predictions\n@predictions\n";
}
close (OUTFILE);
close (DAT_FILE);

#Here I construct the Queue. The idea is that while the generator is feeding the queue other working threads
#are retrieving (emptying) from the queue and the memory used can be controlled during execution
sub Queue_generator
{
   while (my $line = <DAT_FILE>)
   {
      chomp ($line);
      while (1)
      {
         #The line is only send to processing if the number of items in the queue is less than a fixed size defined by Q_limit
         #thus a line is trapped in this infinite loop until the queue has space to receive it
         if ($Q->pending () < Q_limit) 
         {
            $Q->enqueue ($line);
            last; #Once the line has been loaded in the queue we need to break out of the infinite loop to read the next line from the file
         }
      }
   }
 
   #Once the file is exhausted it is needed to alert the worker threads that the work is over
   #When a thread catch and UNDEF variable it stops working
   $Q->enqueue (undef) for (1 .. CORES);
}

#This is the code of the worker threads
sub parse_swissprot
{
   my %predictions; #The variable to return to the main program
 
   #Here the thread DEQUEUE one line from the queue stack
   while (my $line = $Q->dequeue)
   {
      my @patterns = ();
      #Getting the Entry Name
      my $seq_id = $1 if ($line =~ m/^ID\s+(\w+)\s+/);
      
      # And now getting the Accession Number lines
      @patterns = $line =~ m/^AC\s+(.*)/mg;
      # and finally just getting the primary AC -i.e. the first of all the possible multiple ACs-
      my $seq_ac = $1 if ($patterns[0] =~ m/^(\w+);/);
      
      #And now getting the rest of the Entry info
      @patterns = $line =~ m/^DE\s+(.*)/mg;
      my $description = join ' ', @patterns;
      @patterns = $line =~ m/^SQ\s+(.*)/mg;
      my $seq_info = join ' ', @patterns;
      @patterns = $line =~ m/^OS\s+(.*)/mg;
      my $organism = join ' ', @patterns;
      @patterns = $line =~ m/^\s+(.*)/mg;
      my $seq = join '', @patterns;
      $seq =~ s/\s+//g;
      
      my $id = '>' . $seq_id . '; ' . $seq_ac . '; ' . $description . ' ' . $seq_info . "  [$organism]";
      
      next unless (length ($seq) >= WINDOW);
      my %results;
      
      #Loop to scan the complete sequence of the corresponding entry
      #After scanning the sequence only the highest scoring stretch is returned
      for my $i (0 .. (length ($seq) - WINDOW))
      {
         my $domain = substr ($seq, $i, WINDOW);
         my @domain = split (//, $domain);
   
         my $domain_score = 0;
         foreach my $aa (@domain)
         {
            $domain_score += $aa_scores{$aa};
         }
         $domain_score = sprintf ("%.3f", $domain_score);
   
         #Updating the $results variable to store only the highest scoring stretch
         if (exists ($results{$id}))
         {
            $results{$id} = {
               'Score' => $domain_score,
               'Seq' => $domain,
               'Window' => $i,
               'CompleteSeq' => $seq,
            } if ($domain_score > $results{$id}{'Score'});
         }
         else
         {
            $results{$id} = {
               'Score' => $domain_score,
               'Seq' => $domain,
               'Window' => $i,
               'CompleteSeq' => $seq,
            };
         }
      }
  
      #Loading the entry in the variable returned to the main program only if the highest scoring stretch passes the cutoff
      if ($results{$id}{'Score'} >= CUTOFF)
      {
         $predictions{$organism}{$id} = $results{$id};
      }
      $seq_id = $seq_ac = $description = $seq_info = $organism = $seq = undef;
   }
   return (\%predictions);
}



23 de abril de 2013

Multicore Blast con Python

El otro día Bruno publicó en una entrada del blog un script en Perl para optimizar el tiempo de cálculo de Blast en ordenadores con procesadores multicore. Un gran script, pero creo que se puede hacer más sencillo usando el módulo subprocess de Python.

Bruno usó el módulo Parallel::ForkManager de Perl. en el pasado ya habíamos mostrado en otra entrada las ventajas y peligros de paralelizar procesos con Perl con el módulo threads.

La solución con Python creo que es la más sencilla, consiste en usar el módulo subprocess. Otro módulo de Python que podríamos usar sería multiprocessing, pero no es muy sencillo retornar el standard output de programas externos como Blast para un procesamiento posterior de los resultados.

En el siguiente ejemplo se paraleliza la ejecución del comando 'blastp -version' que retorna un texto con la versión instalada de blastp. Dicho comando se puede cambiar por cualquier otro, así como añadir código al script para procesar los resultados. En el ejemplo se ejecuta el comando 10 veces con 4 procesos simultáneos, chequeando si han terminado cada 2 segundos.

 #!/usr/bin/python  
 # -*- coding: utf-8 -*-   
 #  
   
 # Importar módulos  
 import time  
 import os, sys  
 import subprocess  
 from pprint import pprint  
   
 maximum_process_number = 4 # Número máximo de procesos simultáneos  
 checking_processes_interval = 2 # (segundos) Tiempo de espera para checkear si han terminado los procesos  
 data_to_process = range(10) # Datos a procesar, como ejemplo una lista de números del 0 al 9  
   
 # El proceso a ejecutar (como ejemplo se muestran los datos de la version de Blastp)  
 # Se puede cambiar por cualquier otro programa y comando que no sea Blast  
 def run_process(processes) :  
      print "Running process"  
      processes.append(subprocess.Popen("blastp -version", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE))  
   
 # Programa principal  
 if __name__ == '__main__':  

      # Definir variables  
      count_processes = 0  
      finished_processes = []  
      processes = []  
      results = {}

      # Procesar múltiples datos  
      for i in data_to_process:

           # Chequear si los procesos han terminado (cuando hay tantos procesos como el máximo permitido)  
           while count_processes == maximum_process_number:  
                for p in processes:  
                     p.wait()  
                     if p.returncode == 0:  
                          results[p.pid] = "\n".join(p.stdout)  
                          processes.remove(p)  
                          count_processes -= 1  
                          print "Process finished"  
                     else:   
                          results[p.pid] = "\n".join(p.stdout)  
                          processes.remove(p)  
                          count_processes -= 1  
                          print "Process failed"  
                # Si no han terminado, esperar el tiempo establecido  
                if count_processes == maximum_process_number:  
                     print "Waiting for finished processes"  
                     time.sleep(checking_processes_interval)  

           # Añadir procesos a la cola de ejecución  
           run_process(processes)  
           count_processes += 1  
           print "Process started", i, count_processes  

      # Chequear si los procesos han terminado los últimos procesos  
      while count_processes != 0:  
           for p in processes:  
                p.wait()  
                if p.returncode == 0:  
                     results[p.pid] = "\n".join(p.stdout)  
                     processes.remove(p)  
                     count_processes -= 1  
                     print "Last processes finishing"  
                else:   
                     results[p.pid] = "\n".join(p.stdout)  
                     processes.remove(p)  
                     count_processes -= 1  
                     print "Process failed"  
           if count_processes == maximum_process_number:  
                print count_processes,maximum_process_number  
                print "Waiting for last finished processes"  
                time.sleep(checking_processes_interval) 
 
      # Imprimir resultados  
      print "Output from the processes:"  
      pprint(results)  

9 de abril de 2013

split_blast.pl : real multicore BLAST

Hola,
sin duda BLAST es una de las herramientas esenciales para cualquiera que se dedique a la bioinformática. En una entrada anterior ya comentábamos que actualmente está vigente la rama BLAST+ del software, que ahora mismo va por la versión 2.2.28, con nuevas funcionalidades y una interfaz más fácil de usar. Sin embargo, como comentan algunos usuarios de SEQanswers o Biostar,  las versiones actuales de BLAST no permiten aprovechar al máximo la capacidad de los actuales procesadores multicore, ya que sólo parte de la búsqueda de secuencias similares está paralelizada en el código. Esta es posiblemente una de las razones por las que han sido tan habituales los clusters de cálculo en los laboratorios de bioinformática, que sí permiten hacer en paralelo este tipo de cálculos. Estos son algunos ejemplos de artículos que discuten este tema: http://bioinformatics.oxfordjournals.org/content/19/14/1865 ,
http://bioinformatics.oxfordjournals.org/content/27/2/182 ,
http://www.korilog.com/index.php/KLAST-high-performance-sequence-similarity-search-tool.html .

A lo que iba, ahora mismo cualquiera tiene delante una máquina multicore (en Linux comprueba /proc/cpuinfo) y con ella podremos acelerar significativamente nuestras búsquedas con BLAST si se cumple una condición:

1) la memoria RAM de tu hardware debe superar con creces el tamaño de la base de secuencias que queremos rastrear

Si esta condición se cumple en tu caso, sigue leyendo. El siguiente código Perl, que gestiona procesos con fork por medio del módulo Parallel-ForkManager, te permitirá exprimir tú máquina, partiendo el problema inicial en pedazos de tamaño igual que serán enviados de manera eficiente a procesar a los cores disponibles de tu máquina. En mis pruebas, el tamaño óptimo de pedazo es de 100 secuencias, y el número de cores óptimo es el físico de la máquina en cuestión.


El script se ejecuta modificando el comando nativo de BLAST, por ejemplo con 8 cores y trabajos de 250 secuencias:

$ split_blast.pl 8 250 blastp -query input.faa -db uniprot.fasta -outfmt 6 

Para más detalles guarda el código en un archivo y ejecútalo en el terminal:

 #!/usr/bin/perl -w  
   
 # Mar2013 Bruno Contreras http://www.eead.csic.es/compbio/  
 # Script to take advantage of multicore machines when running BLAST,  
 # which seems to scale up to the number of physical cores in our tests.  
 # Supports old BLAST (blastall) and new BLAST+ binaries.  
 #   
 # The gain in speed is proportional to the number of physical cores allocated,   
 # at the cost of proportionaly increasing the allocated memory. For this reason probably is not  
 # a good idea to use it in searches against huge databases such as nr or nt, unless your RAM allows it.  
 #  
 # Requires module Parallel::ForkManager (http://search.cpan.org/perldoc?Parallel%3A%3AForkManager)  
   
 use strict;  
   
 if(eval{ require Parallel::ForkManager }){ import Parallel::ForkManager; }  
 else  
 {  
    die "\n# Failed to locale required module Parallel::ForkManager\n".  
       "# Please install it by running in your terminal (as a root, preferably with sudo):\n\$ cpan -i Parallel::ForkManager\n\n";  
 }  
   
 my $VERBOSE = 1;  
 my $DEFAULTBATCHSIZE = 100;  
 my $BLASTDBVARNAME = 'BLASTDB';  
   
 # constant as indices to access read_FASTA_file_arrays  
 use constant NAME => 0;  
 use constant SEQ => 1;  
   
 my ($n_of_cores,$batchsize,$raw_command,$command) = (1,$DEFAULTBATCHSIZE,'','');  
 my ($outfileOK,$blastplusOK,$input_seqfile,$output_file,$db_file) = (0,0,'','','');  
 my $start_time;  
   
 if(!$ARGV[2])  
 {  
   print "\nScript to take advantage of multicore machines when running BLAST.\n\n";  
   print "Usage: perl split_blast.pl <number of processors/cores> <batch size> <blast command>\n\n";  
   print "<number of processors/cores> : while 1 is accepted, at least 2 should be requested\n";  
   print "<batch size> : is the number of sequences to be blasted in each batch, $DEFAULTBATCHSIZE works well in our tests\n";  
   print "<blast command> : is the explicit blast command that you would run in your terminal, either BLAST+ or old BLAST\n\n";  
   print "Example: split_blast.pl 8 250 blastp -query input.faa -db uniprot.fasta -outfmt 6 -out out.blast\n\n";  
   print "Please escape any quotes in your BLAST command. For instance:\n\n";  
   print "blastall -p blastp -i input.faa -d nr.fasta -m 8 -F 'm S' -o out.blast\n\n";  
   print "should be escaped like this:\n\n";  
   die "blastall -p blastp -i input.faa -d nr.fasta -m 8 -F \\'m\\ S\\' -o out.blast\n";  
 }  
 else ## parse command-line arguments  
 {  
   $n_of_cores = shift(@ARGV);  
   if($n_of_cores < 0){ $n_of_cores = 1 }  
   
   $batchsize = shift(@ARGV);  
   if($batchsize < 0){ $batchsize = $DEFAULTBATCHSIZE } # optimal in our tests  
   
   $raw_command = join(' ',@ARGV);  
   
   if($raw_command =~ /\-i (\S+)/)  
   {  
     $input_seqfile = $1;  
     if($raw_command =~ /\-o (\S+)/){ $output_file = $1 }  
     if($raw_command =~ /\-d (\S+)/){ $db_file = $1 }  
   }  
   elsif($raw_command =~ /\-query (\S+)/)  
   {  
     $blastplusOK = 1;  
     $input_seqfile = $1;  
     if($raw_command =~ /\-out (\S+)/){ $output_file = $1 }  
     if($raw_command =~ /\-db (\S+)/){ $db_file = $1 }  
   }  
   else{ die "# ERROR: BLAST command must include an input file [-i,-query]\n" }  
   
   if(!-s $input_seqfile){ die "# ERROR: cannot find input file $input_seqfile\n" }  
   elsif(!-s $db_file &&  
     ($ENV{$BLASTDBVARNAME} && !-s $ENV{$BLASTDBVARNAME}.'/'.$db_file) )  
   {  
     die "# ERROR: cannot find BLAST database file $db_file\n"  
   }  
   
   # remove BLAST threads commands  
   $command = $raw_command;  
   $command =~ s/\-num_threads \d+//;  
   $command =~ s/\-a \d+//;  
   
   if(!$output_file)  
   {  
    import File::Temp qw/tempfile/;  
     my ($fh, $filename) = tempfile();  
     $output_file = $filename;  
     if($blastplusOK){ $command .= " -out $output_file " }  
     else{ $command .= " -o $output_file " } #print "$command\n";  
   }  
   else{ $outfileOK = 1 }  
   
   print "# parameters: max number of processors=$n_of_cores ".  
    "batchsize=$batchsize \$VERBOSE=$VERBOSE\n";  
   print "# raw BLAST command: $raw_command\n\n";  
 }  
   
 if($outfileOK)  
 {   
    use Benchmark;     
    $start_time = new Benchmark();   
 }  
   
   
 ## parse sequences  
 my $fasta_ref = read_FASTA_file_array($input_seqfile);  
 if(!@$fasta_ref)  
 {  
   die "# ERROR: could not extract sequences from file $input_seqfile\n";  
 }  
   
 ## split input in batches of max $BLASTBATCHSIZE sequences  
 my ($batch,$batch_command,$fasta_file,$blast_file,@batches,@tmpfiles);  
 my $lastseq = scalar(@$fasta_ref)-1;  
 my $total_seqs_batch = $batch = 0;  
   
 $fasta_file = $input_seqfile . $batch;  
 $blast_file = $input_seqfile . $batch . '.blast';  
 $batch_command = $command;  
 if($batch_command =~ /\-i (\S+)/){ $batch_command =~ s/-i \Q$1\E/-i $fasta_file/ }  
 if($batch_command =~ /\-query (\S+)/){ $batch_command =~ s/-query \Q$1\E/-query $fasta_file/ }  
 $batch_command =~ s/$output_file/$blast_file/;  
 push(@batches,$batch_command);  
 push(@tmpfiles,[$fasta_file,$blast_file,$batch_command]);  
   
 open(BATCH,">$fasta_file") || die "# EXIT : cannot create batch file $fasta_file : $!\n";  
   
 foreach my $seq (0 .. $#{$fasta_ref})  
 {  
   $total_seqs_batch++;  
   print BATCH ">$fasta_ref->[$seq][NAME]\n$fasta_ref->[$seq][SEQ]\n";  
   if($seq == $lastseq || ($batchsize && $total_seqs_batch == $batchsize))  
   {  
     close(BATCH);  
   
     if($seq < $lastseq) # take care of remaining sequences/batches  
     {  
       $total_seqs_batch = 0;  
       $batch++;  
       $fasta_file = $input_seqfile . $batch;  
       $blast_file = $input_seqfile . $batch . '.blast';  
       $batch_command = $command;  
       if($batch_command =~ /\-i (\S+)/){ $batch_command =~ s/-i \Q$1\E/-i $fasta_file/ }  
       if($batch_command =~ /\-query (\S+)/){ $batch_command =~ s/-query \Q$1\E/-query $fasta_file/ }  
       $batch_command =~ s/$output_file/$blast_file/;  
       push(@batches,$batch_command);  
       push(@tmpfiles,[$fasta_file,$blast_file,$batch_command]);  
   
       open(BATCH,">$fasta_file") ||  
        die "# ERROR : cannot create batch file $fasta_file : $!\n";  
     }  
   }  
 }  
   
 undef($fasta_ref);  
   
 ## create requested number of threads  
 if($batch < $n_of_cores)  
 {  
   $n_of_cores = $batch;  
   print "# WARNING: using only $n_of_cores cores\n";  
 }  
 my $pm = Parallel::ForkManager->new($n_of_cores);  
   
 ## submit batches to allocated threads  
 foreach $batch_command (@batches)  
 {  
   $pm->start($batch_command) and next; # fork  
   
   print "# running $batch_command in child process $$\n" if($VERBOSE);  
   open(BATCHJOB,"$batch_command 2>&1 |");  
   while(<BATCHJOB>)  
   {  
     if(/Error/){ print; last }  
     elsif($VERBOSE){ print }  
   }  
   close(BATCHJOB);  
   
   if($VERBOSE)  
   {  
     printf("# memory footprint of child process %d is %s\n",  
       $$,calc_memory_footprint($$));  
   }  
   
   $pm->finish(); # exit the child process  
 }  
   
 $pm->wait_all_children();  
   
 ## put together BLAST results, no sort needed  
 my $blastOK = 1;  
 unlink($output_file) if(-s $output_file && $outfileOK);  
   
 foreach my $file (@tmpfiles)  
 {  
   ($fasta_file,$blast_file,$batch_command) = @$file;  
   if(!-e $blast_file)  
   {  
     unlink($output_file) if(-e $output_file);  
   
    if($blastOK)  
    {  
        print "# ERROR : did not produced BLAST file $blast_file ,".  
            " probably job failed: $batch_command\n";  
    }  
    $blastOK = 0;  
   }  
   else  
   {  
     print "# adding $blast_file results to $output_file\n" if($VERBOSE);  
     system("cat $blast_file >> $output_file"); # efficient, less portable  
   }  
   unlink($fasta_file,$blast_file);  
 }  
   
 exit if(!$blastOK);  
   
 if(!$outfileOK)  
 {  
   open(OUTBLAST,$output_file) || die "# ERROR : cannot read temporary outfile $output_file : $!\n";  
   while(<OUTBLAST>)  
   {  
     print;  
   }  
   close(OUTBLAST);  
   
   unlink($output_file);  
 }  
 else  
 {  
    my $end_time = new Benchmark();  
    print "\n# runtime: ".timestr(timediff($end_time,$start_time),'all')."\n";  
 }  
   
 if($VERBOSE)  
 {  
   printf("# memory footprint of parent process %d is %s\n",  
     $$,calc_memory_footprint($$));  
 }  
   
 ####################################################  
   
 sub read_FASTA_file_array  
 {  
    # in FASTA format  
    # returns a reference to a 2D array for 4 secondary keys: NAME,SEQ  
    # first valid index (first sequence) is '0'  
   my ( $infile ) = @_;  
   my (@FASTA,$name,$seq,$n_of_sequences);  
   $n_of_sequences = -1;  
   open(FASTA,"<$infile") || die "# read_FASTA_sequence: cannot read $infile $!:\n";  
   while(<FASTA>)  
   {  
     next if(/^$/);  
     next if(/^#/);  
     if(/^\>(.*?)[\n\r]/)  
     {  
       $n_of_sequences++; # first sequence ID is 0  
       $name = $1; #print "# $n_of_sequences $name\n";  
       $FASTA[$n_of_sequences][NAME] = $name;  
     }  
     elsif($n_of_sequences>-1)  
     {  
       $_ =~ s/[\s|\n|\-|\.]//g;  
       $FASTA[$n_of_sequences][SEQ] .= $_; # conserve case  
     }  
   }  
   close(FASTA);  
     
   return \@FASTA;  
 }  
   
 sub calc_memory_footprint  
 {  
   my ($pid) = @_;  
   my $KB = 0;  
   my $ps = `ps -o rss $pid 2>&1`;  
   while($ps =~ /(\d+)/g){ $KB += $1 }  
   return sprintf("%3.1f MB",$KB/1024);  
 }  
   
 
Si prefieres hacerlo en Python prueba estas soluciones:
http://qiime.org/scripts/parallel_blast.html ,
http://www.ruffus.org.uk/examples/bioinformatics/

Hasta luego,
Bruno

Actualización 17/04/2013
Unos días después de compartir este código descubro GNU parallel (http://www.gnu.org/software/parallel), todavía no instalado por defecto en los Linux que tengo a mi alcance, que generaliza y simplifica hasta el extremo la tarea que hacíamos con Perl, con algo como:

$ cat input.faa | parallel --block 100k --recstart '>' --pipe blastp -outfmt 6 -db uniprot.fasta -query - 

Estos dos comandos concatenados por una tubería 1) parten el archivo de secuencias en bloques de 100Kbytes, separando las entradas por medio del carácter '>',  y 2) envían cada bloque a CPUs/cores disponibles en ese momento. Esencialmente es lo mismo que hace el código Perl. En mis pruebas tiene una ganancia en tiempo de cálculo muy similar, con un consumo de memoria también similar.


31 de marzo de 2013

Procesar argumentos con múltiples opciones en un script en Python

En los últimos tiempos estoy vendiendo mi alma al diablo y he empezado a usar Python. Personalmente prefiero Perl, Python me parece un lenguaje más anárquico y desorganizado. Sin embargo, Python está de moda en Bioinformática y muchos de los módulos más actualizados se están escribiendo en este lenguaje.

Hoy voy a hablar del problema que me he encontrado para encontrar un módulo que procese las opciones de línea de comandos que normalmente introducimos en los scripts, por ej.: ./script.py -i archivo1 -o archivo2 Estas opciones suelen ser archivos de entrada, salida, o simplemente parámetros que cambian la ejecución del programa. También sirve para pedir ayuda sobre como usar el script: ./script.py -h 

En Perl existe el módulo Getopt::Long para procesar dichos parámetros de línea de comando de una forma sencilla y robusta.

En Python el módulo equivalente es getopt. A continuación se muestra un ejemplo de uso. Sin embargo este módulo no contempla la posibilidad de procesar múltiples opciones para un mismo argumento, por ejemplo, varios archivos de entrada.

 #!/usr/bin/python  
 # -*- coding: utf-8 -*-   
 """  
      Script que usa el módulo 'getopt' para parsear los argumentos y opciones de línea de comandos  
 """  
 import sys, re, os, copy  
 # Importar el módulo 'getopt'  
 import getopt  
 # Imprimir en pantalla el script y sus argumentos y opciones  
 print 'ARGV:', " ".join(sys.argv[0:]),"\n"  
 # Información de ayuda de uso del script  
 def help() :  
      """Ayuda sobre opciones del script en línea de comandos"""  
      print "usage:",sys.argv[0], "[options]\n"  
      print " -h this message\n"  
      print " -i input file/s\n"  
      print " -o output file\n"  
 # Definir los argumentos obligatorios (:) y opcionales, en sus formas abreviada y completa  
 try:  
      options, args = getopt.getopt(sys.argv[1:], "hi:o:", ["help", "input", "output"])  
 except getopt.GetoptError as err:  
      # Imprimir ayuda y salir si hay algún error o argumento incorrecto  
      print str(err)  
      help()  
      sys.exit(2)  
 output = None  
 verbose = False  
 # Definir el tipo de las variables que guardarán las opciones  
 INP_input = ''  
 INP_output = ''  
 # Asignar las opciones a cada argumento  
 for _opt, _arg in options:  
      if _opt in ("-i", "--input"):  
           INP_input = _arg  
      elif _opt in ("-o", "--output"):  
           INP_output = _arg  
      elif _opt in ("-h", "--help"):  
           help()  
           sys.exit()  
      else:  
           assert False, "unhandled option"  
 # Imprimir en pantalla todo lo anotado  
 print "Script:",sys.argv[0],"\n"  
 print "Argumentos y opciones:\n",  
 for _opt,_arg in options :  
      print " -"+_opt+": "+_arg  

Para solucionar este problema de Python de procesar argumentos de línea de comandos con múltiples opciones he creado mi propio código, puesto que me parece fundamental para cualquier script bioinformático. En el siguiente ejemplo se pueden definir listas para guardar múltiples opciones para un único argumento (ej. varios archivos). El código es provisional y le faltarían muchas opciones que sí tiene el módulo getopt, sin embargo, ofrece una solución al problema y espero ir mejorándolo con el tiempo.

 #!/usr/bin/python  
 # -*- coding: utf-8 -*-   
 """  
      Script que parsea los argumentos y opciones de línea de comandos  
      permitiendo usar listas para guardar argumentos con múltiples opciones  
 """  
 import sys, re, os, copy  
 from types import *  
 # Imprimir en pantalla el script y sus argumentos y opciones  
 print 'ARGV:', " ".join(sys.argv[0:]),"\n"  
 # Información de ayuda de uso del script  
 def help() :  
      """Ayuda sobre opciones del script en línea de comandos"""  
      print "usage:",sys.argv[0], "[options]\n"  
      print " -h this message\n"  
      print " -i input file/s\n"  
      print " -o output file\n"  
      exit()  
 # Definir el tipo de las variables que guardarán las opciones  
 INP_input = []  
 INP_output = ''  
 # Asignar a cada argumento una de las anteriores variables  
 options = {'h': 'help', 'help': 'help', 'i': 'INP_input', 'input': 'INP_input', 'o': 'INP_output', 'output': 'INP_output'}  
 argv_var = str()  
 # Leer los argumentos y opciones  
 for _argv in sys.argv[1:] :  
      # Leer los argumentos que comienzan con guión  
      if re.match("^-", _argv) :  
           for _opt,_var in options.iteritems() :  
                if re.compile("^-%s$" % (_opt)).match(_argv) :  
                     argv_var = copy.deepcopy(_var)  
                     if type(vars()[argv_var]) is FunctionType :  
                          vars()[argv_var]()  
                     break  
      # Anotar las opciones de cada argumento  
      else :  
           if argv_var is not None :  
                if type(vars()[argv_var]) is ListType :  
                     vars()[argv_var].append(_argv)  
                else:  
                     vars()[argv_var] += _argv  
 # Imprimir en pantalla todo lo anotado  
 print "Script:",sys.argv[0],"\n"  
 print "Argumentos y opciones:\n",  
 for _opt,_var in options.iteritems() :  
      if vars()[_var] is not None and type(vars()[_var]) is not FunctionType:  
           #print _opt,pprint(type(vars()[_var]))  
           if type(vars()[_var]) is ListType:  
                print " -"+_opt+": "+str(", ".join(vars()[_var]))  
           else :  
                print " -"+_opt+": "+str(vars()[_var])