
.. _BGS_multiprocessing:

Advanced example: background selection (revisited)

This is the same background selection simulation as in the previous example, but with the following changes to the implementation details:

  1. We use Python's built-in multiprocessing module, rather than fwdpy's built-in parallelism, to run the replicates in parallel.
  2. Each replicate writes its summary statistics to an SQLite database, rather than returning them to the parent process.

The purpose of this example is to show that there is more than one way to use parallel processing to perform simulations. Further, writing results to an SQLite database is a powerful technique, because it allows many analyses (“aggregations”) to be done without loading all of your simulation results into RAM.

Rationale

We designed fwdpy to make running simulations in parallel easy. The design enables workflows where you run a “batch” of simulations and then process them as you need. However, this can create a bottleneck on CPU resources when the processing happens back on the Python side:

  1. Run a bunch of simulations in parallel
  2. Process each replicate serially
  3. Return to step 1 until you’ve obtained the desired number of replicates

An alternative is to use Python’s built-in multiprocessing module, which runs jobs in separate Python processes and thus bypasses the dreaded GIL. This design lets you set up the equivalent of a process pool for running your simulations. Each process runs a single simulation, processes it, and then exits.

The main challenge with this design is that it requires better organization on your end. You need to write the function that will be farmed out to other processes, and its arguments must consist of pure Python types, meaning that none of fwdpy’s Cython-based classes can be used.

The other challenge is synchronizing the output. Here, we use multiprocessing.Lock to do that for us.
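As an aside, multiprocessing.Lock also supports the context-manager protocol, which guarantees that the lock is released even if the guarded code raises an exception. A minimal sketch, where write_output is a hypothetical stand-in for the critical section:

import multiprocessing as mp

LOCK = mp.Lock()

def write_output():
    #Hypothetical critical section, e.g. a database write
    pass

with LOCK:
    #Only one process at a time may execute this block; the lock
    #is released automatically when the block exits.
    write_output()

The code below uses explicit acquire()/release() calls instead, which work just as well as long as nothing in between can raise.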

#Use Python 3's print as a function.
#This future-proofs the code in the notebook
from __future__ import print_function
#Import fwdpy.  Give it a shorter name
import fwdpy as fp
##Other libs we need
import numpy as np
import pandas as pd
import math
import os
import sqlite3
import multiprocessing as mp
import libsequence.polytable as polyt
import libsequence.summstats as sstats

Define the function that we will run in separate Python processes

The details of setting up the simulation are identical to the previous BGS example.

LOCK=mp.Lock()
def simulate_async(args):
    """
    This function will be run in a separate process
    using the multiprocessing module.  Its argument 
    list is a tuple.

    """
    #Assign names to the tuple values
    seed,dbname,tablename = args
    
    # Where neutral mutations occur:
    nregions = [fp.Region(beg=0,end=1,weight=1)]

    # Where selected mutations occur:
    sregions = [fp.ConstantS(beg=-1,end=0,weight=1,s=-0.05,h=1),
                fp.ConstantS(beg=1,end=2,weight=1,s=-0.05,h=1)]

    # Recombination:
    recregions = [fp.Region(beg=-1,end=2,weight=1)]

    #Population size
    N=1000
    #We'll evolve for 10N generations.
    #nlist is a list of population sizes over time.
    #len(nlist) is the length of the simulation
    #We use numpy arrays for speed and optimised RAM
    #use.  Note the dtype=np.uint32, which means 32-bit
    #unsigned integer. Failure to use this type will
    #cause a run-time error.
    nlist = np.array([N]*(10*N),dtype=np.uint32)

    #Initialize a random number generator using the seed passed in
    rng = fp.GSLrng(seed)

    pops = fp.evolve_regions(rng,
                         1,         #Simulate only 1 population at a time
                         N,         #Diploid population size
                         nlist[0:], #List of population sizes over time
                         0.005,     #Neutral mutation rate (per gamete, per generation)
                         0.01,      #Deleterious mutation rate (per gamete, per generation)
                         0.005,     #Recombination rate (per diploid, per generation)
                         nregions,
                         sregions,
                         recregions)
    #Take a sample of n = 20 chromosomes from the population
    sample = fp.get_samples(rng,pops[0],20)
    #Calculate summary statistics from the neutral variants
    simdatasNeut = polyt.SimData(sample[0])
    polySIMn = sstats.PolySIM(simdatasNeut)
    #Collect the stats into a one-row DataFrame:
    summstats=[{'thetapi':polySIMn.thetapi(),'npoly':polySIMn.numpoly(),'thetaw':polySIMn.thetaw()}]
    DF=pd.DataFrame(summstats)
    
    #We must prevent multiple processes from
    #writing to the database at once.
    #We use our global lock as a mutex 
    #to ensure that only 1 process is writing
    #at a time.
    LOCK.acquire()
    con = sqlite3.connect(dbname)
    DF.to_sql(tablename,con,if_exists='append',index=False)
    con.close()
    LOCK.release()
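
A portability note: the module-level LOCK above is shared with the workers only on systems where multiprocessing forks child processes (the default on Linux). On platforms that spawn fresh interpreters instead (e.g., Windows), each worker re-imports this module and gets its own, independent lock. A hedged sketch of a portable alternative, where init_worker is a hypothetical helper passed to the pool at creation time:

def init_worker(lock):
    #Runs once in each worker process and stores the shared
    #lock where simulate_async can find it.
    global LOCK
    LOCK = lock

#Then, when creating the pool in the next section:
#P = mp.Pool(initializer=init_worker,initargs=(mp.Lock(),))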

Run the simulations

The following block of code sets up a pool of worker processes, one per core on your machine, and runs the above function once per replicate in separate processes.

if os.path.isfile('BGSmp.db'):
    os.remove('BGSmp.db')
np.random.seed(101)
#Generate the arguments to pass to simulate_async.
#imap_unordered passes each element of this list as the single
#argument to simulate_async, so we bundle everything into a tuple:
#(random seed, database file name, table name).
#Our list of arguments will be 1,000 elements long.  If this were a
#study for publication, we would be more careful and guarantee that
#each seed is unique.
args=[(seed,'BGSmp.db','stats') for seed in np.random.randint(0,42000000,1000)]
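#(As an aside, one simple way to get unique seeds would be
#np.random.choice(42000000,size=1000,replace=False), which samples
#without replacement.)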
#Create a pool of worker processes, one per core on your machine.
#As each worker finishes a replicate, the pool hands it the next
#set of arguments until all 1,000 jobs are done.
P=mp.Pool() 
#Pass the arguments along to the process pool.
#This will run 1,000 replicate simulations
#and output data from each to our sqlite3 
#database.
P.imap_unordered(simulate_async,args)
P.close()
P.join()
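
As written, the iterator returned by imap_unordered is discarded. That is fine here, because simulate_async returns None and writes its output to the database, but it also means that any exception raised inside a worker process is silently lost. A minimal variant of the call above that surfaces worker exceptions by consuming the iterator:

#Consuming the iterator re-raises any exception from a worker:
for _ in P.imap_unordered(simulate_async,args):
    pass
P.close()
P.join()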

Getting the mean diversity

#open database connection:
c=sqlite3.connect('BGSmp.db')
#Get means for each column:
x=pd.read_sql_query('select avg(npoly),avg(thetapi),avg(thetaw) from stats',c)
c.close()
os.remove('BGSmp.db')
print(x)
   avg(npoly)  avg(thetapi)  avg(thetaw)
0      57.635     16.033353    16.245555

The ‘thetapi’ record is our mean $\pi$ across all of the simulations, and it is quite close to the value predicted by background selection theory for these parameters (see the previous example).
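
Because the summary statistics live in a database, other aggregations can likewise be computed in SQL without reading the whole table into RAM. As a sketch, the variance of $\pi$ could be obtained like this (it would have to run before the os.remove call above deletes the database):

c=sqlite3.connect('BGSmp.db')
#Variance computed in SQL as E[x^2] - E[x]^2:
x=pd.read_sql_query('select avg(thetapi*thetapi)-avg(thetapi)*avg(thetapi) as var_pi from stats',c)
c.close()
print(x)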