Mostrando entradas con la etiqueta procesos. Mostrar todas las entradas
Mostrando entradas con la etiqueta procesos. Mostrar todas las entradas

23 de abril de 2013

Multicore Blast con Python

El otro día Bruno publicó en una entrada del blog un script en Perl para optimizar el tiempo de cálculo de Blast en ordenadores con procesadores multicore. Un gran script, pero creo que se puede hacer más sencillo usando el módulo subprocess de Python.

Bruno usó el módulo Parallel::ForkManager de Perl. en el pasado ya habíamos mostrado en otra entrada las ventajas y peligros de paralelizar procesos con Perl con el módulo threads.

La solución con Python creo que es la más sencilla, consiste en usar el módulo subprocess. Otro módulo de Python que podríamos usar sería multiprocessing, pero no es muy sencillo retornar el standard output de programas externos como Blast para un procesamiento posterior de los resultados.

En el siguiente ejemplo se paraleliza la ejecución del comando 'blastp -version' que retorna un texto con la versión instalada de blastp. Dicho comando se puede cambiar por cualquier otro, así como añadir código al script para procesar los resultados. En el ejemplo se ejecuta el comando 10 veces con 4 procesos simultáneos, chequeando si han terminado cada 2 segundos.

 #!/usr/bin/python  
 # -*- coding: utf-8 -*-   
 #  
   
 # Importar módulos  
 import time  
 import os, sys  
 import subprocess  
 from pprint import pprint  
   
 maximum_process_number = 4 # Número máximo de procesos simultáneos  
 checking_processes_interval = 2 # (segundos) Tiempo de espera para checkear si han terminado los procesos  
 data_to_process = range(10) # Datos a procesar, como ejemplo una lista de números del 0 al 9  
   
 # El proceso a ejecutar (como ejemplo se muestran los datos de la version de Blastp)  
 # Se puede cambiar por cualquier otro programa y comando que no sea Blast  
 def run_process(processes) :  
      print "Running process"  
      processes.append(subprocess.Popen("blastp -version", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE))  
   
 # Programa principal  
 if __name__ == '__main__':  

      # Definir variables  
      count_processes = 0  
      finished_processes = []  
      processes = []  
      results = {}

      # Procesar múltiples datos  
      for i in data_to_process:

           # Chequear si los procesos han terminado (cuando hay tantos procesos como el máximo permitido)  
           while count_processes == maximum_process_number:  
                for p in processes:  
                     p.wait()  
                     if p.returncode == 0:  
                          results[p.pid] = "\n".join(p.stdout)  
                          processes.remove(p)  
                          count_processes -= 1  
                          print "Process finished"  
                     else:   
                          results[p.pid] = "\n".join(p.stdout)  
                          processes.remove(p)  
                          count_processes -= 1  
                          print "Process failed"  
                # Si no han terminado, esperar el tiempo establecido  
                if count_processes == maximum_process_number:  
                     print "Waiting for finished processes"  
                     time.sleep(checking_processes_interval)  

           # Añadir procesos a la cola de ejecución  
           run_process(processes)  
           count_processes += 1  
           print "Process started", i, count_processes  

      # Chequear si los procesos han terminado los últimos procesos  
      while count_processes != 0:  
           for p in processes:  
                p.wait()  
                if p.returncode == 0:  
                     results[p.pid] = "\n".join(p.stdout)  
                     processes.remove(p)  
                     count_processes -= 1  
                     print "Last processes finishing"  
                else:   
                     results[p.pid] = "\n".join(p.stdout)  
                     processes.remove(p)  
                     count_processes -= 1  
                     print "Process failed"  
           if count_processes == maximum_process_number:  
                print count_processes,maximum_process_number  
                print "Waiting for last finished processes"  
                time.sleep(checking_processes_interval) 
 
      # Imprimir resultados  
      print "Output from the processes:"  
      pprint(results)  

4 de mayo de 2011

Paralelizar procesos en perl con threads

Con los modernos ordenadores que tenemos con procesadores de 2, 4, 8 o más núcleos podemos pensar en paralelizar procesos fácilmente en Perl. ¿Qué significa esto? Básicamente que el tiempo que tardará en correr nuestro programa se reducirá proporcionalmente al número de núcleos, si a cada núcleo le mandamos tareas diferentes a realizar (threads)

Pero hay que ser cuidadoso, no todos los programas son adecuados para ser paralelizados. Un programa paralelizable en threads requiere que sus procesos puedan ser independientes y una vez finalizados podamos recoger y unificar sus resultados sin problemas. Además paralelizar significa multiplicar las copias en memoria de las variables, por lo cual sólo podremos paralelizar procesos que no consuman grandes cantidades de memoria o el proceso de copia ralentizará los cálculos.

El código mostrado a continuación realiza una suma de números de tres formas diferentes:
  1. De forma tradicional, con 3 procesos de suma en serie.
  2. Usando 3 threads que calculan simultáneamente.
  3. Usando un bucle con 3 threads que calculan simultáneamente.
La conclusión que podemos obtener de los resultados es que el proceso se realiza casi 3 veces más rápido que de la forma tradicional mediante el método de 3 threads. Pero que cuando se realiza la paralelización para cálculos más sencillos (bucle de threads), no sólo no se mejora el tiempo de cálculo, sino que se ralentiza enormemente el cálculo (5 veces más que el tradicional), esto se debe a que el coste de lanzar los threads no se compensa con el tiempo de cálculo de cada uno.

Resultado:

 Total = 300000000  
 Time taken by the traditional way (3 serial processes) was 26 wallclock secs (26.48 usr 0.00 sys + 0.00 cusr 0.00 csys = 26.48 CPU) seconds  
   
 Total = 300000000  
 Time taken by 3 parallel threads was 9 wallclock secs (25.94 usr 0.00 sys + 0.00 cusr 0.00 csys = 25.94 CPU) seconds  
   
 Total = 300000000  
 Time taken by a loop of 10000 times 3 threads was 104 wallclock secs (103.41 usr 1.01 sys + 0.00 cusr 0.00 csys = 104.42 CPU) seconds  
   

Código:

 use threads;  
 use Benchmark;  
 use Config;  
 $Config{useithreads} or die('Recompile Perl with threads to run this program.');  
   
 # Subroutine to test multithreading  
 sub calc {  
      my ($number) = @_;  
      my $i=0;  
      while ($i<$number) {  
           $i++;  
      }  
      return $i;  
 }  
   
 # Define a number to calculate  
 my $number = 100000000;  
   
 # Start timer  
 my $start = new Benchmark;  
   
 # Run subroutines in the traditional way  
 my $res1 = calc($number);  
 my $res2 = calc($number);  
 my $res3 = calc($number);  
 my $total = $res1 + $res2 + $res3;  
   
 # End timer  
 my $end = new Benchmark;  
   
 # Calculate difference of times  
 my $diff = timediff($end, $start);  
   
 # Print final result  
 print "\nTotal = $total\n";  
   
 # Report benchmark  
 print "Time taken by the traditional way (3 serial processes) was ", timestr($diff, 'all'), " seconds\n\n";  
   
 # Start timer  
 $start = new Benchmark;  
   
 # Create multiple threads running each one the subroutine  
 my $thr1 = threads->create(\&calc, $number);  
 my $thr2 = threads->create(\&calc, $number);  
 my $thr3 = threads->create(\&calc, $number);  
   
 # Check subroutines ending and retrieve results   
 $res1 = $thr1->join();  
 $res2 = $thr2->join();  
 $res3 = $thr3->join();  
 $total = $res1 + $res2 + $res3;  
   
 # End timer  
 $end = new Benchmark;  
   
 # Calculate difference of times  
 $diff = timediff($end, $start);  
   
 # Print final result  
 print "Total = $total\n";  
   
 # Report benchmark  
 print "Time taken by 3 parallel threads was ", timestr($diff, 'all'), " seconds\n\n";  
   
 # Start timer  
 $start = new Benchmark;  
   
 # Divide the process  
 $total = 0;  
 my $divide = 10000;  
 $number = $number / $divide;  
   
 # Create multiple threads running each one the subroutine  
 for (my $i=0; $i<$divide; $i++){  
      $thr1 = threads->create(\&calc, $number);  
      $thr2 = threads->create(\&calc, $number);  
      $thr3 = threads->create(\&calc, $number);  
      # Check subroutines ending and retrieve results   
      $res1 = $thr1->join();  
      $res2 = $thr2->join();  
      $res3 = $thr3->join();  
      $total += $res1 + $res2 + $res3;  
 }  
   
 # End timer  
 $end = new Benchmark;  
   
 # Calculate difference of times  
 $diff = timediff($end, $start);  
   
 # Print final result  
 print "Total = $total\n";  
   
 # Report benchmark  
 print "Time taken by a loop of 10000 times 3 threads was ", timestr($diff, 'all'), " seconds\n\n";