Friday, May 14, 2010

并行计算框架 ParallelPython

如何在有限的硬件条件下,使程序耗时更少,或者处理数据量更大?

这个问题在进入多核时代以来,似乎多了一种解决途径,那就是充分利用多处理器(核)来并行的完成计算任务.

这里的多处理环境可以是指一台拥有多核处理器甚至多颗处理器的服务器,也可以是指一个拥有多节点的计算机群。


针对开篇提出的那个问题,解决办法其实都是"分而治之"。从数据分割入手,我们可以得到类似MapReduce的解决方案,类似的Python实现也有不少;从程序角度去看,目前主要是以下几种途径:
  • 大规模并行处理系统(MPP,Figure 1)
  • 对称多处理(SMP,Figure 2)
  • 分布式计算(集群/网格计算,Figure 3)



 Figure 1 MPP
Figure 2 SMP
 
 Figure 3 Cluster

本文主要介绍的ParallelPython(简称pp)框架可以有效支持SMP和集群方式进行并行计算。

根据官方介绍,pp提供了在SMP(多CPU或多核)和集群(通过网络连接的多台计算机)上并行执行Python代码的机制,具有以下特性:
  • 在SMP和集群上并行执行Python代码
  • 易于理解和实现的基于工作的并行机制,便于把穿行应用转换成并行的
  • 自动构造最佳配置(默认时工作进程数量等同于系统处理器数量)
  • 动态处理器分配(允许运行时改变工作处理器数量)
  • 函数的工作缓存(透明的缓存机制确保后续调用降低负载)
  • 动态负载均衡(任务被动态的分配到各个处理器上)
  • 基于SHA的连接加密认证
  • 跨平台移植(Windows/Linux/Unix)
和传统的线程模型不同的是,thread和threading模块无法在字节码一级实现并行。因为Python解释器使用GIL(全局解释器锁)来在内部禁止并行执行。这个GIL限制你在SMP机器上同一时间也 只能执行一条字节码指令。而pp在内部使用进程和进程间通信来组织并行计算。并隐藏了所有内部的细节和复杂性,应用程序只需要提交工作任务并取回结果就可以了。

代码说话:
import pp
nodes = ('10.0.0.1',)
jober = pp.Server(ppservers=nodes)
f = jober.submit(func,args,depfunc,module)

submit函数接受worker具体执行的函数func,参数args以及func内部调用的函数depfunc和涉及到的模块module

相应的节点机器上执行:
./ppservers.py

不过节点机器上可能会报错"Socket connection is broken".解决办法如下:

class Close(Exception):
    pass

def send(self, data):
        bufsz = self.bufsz
        t_size = len(data)
        size = struct.pack('!Q', t_size)
        p_size = self.socket.send(size)
        if p_size == 0:
            raise Close('end connection')

        s_size = 0L
        while s_size < t_size:
            nd_sz = min(bufsz, t_size - s_size)
            p_size = self.socket.send(data[s_size:s_size+nd_sz])
            if p_size == 0:
                raise Close('end connection')
            s_size += p_size

Thursday, May 13, 2010

并发编程利器Eventlet

Eventlet是由第二人生(Secondlife)开源的高度伸缩性的Python网络编程库.


根据官方介绍大致特性如下:
  • 非阻塞I/O模型
  • 协程(Coroutines)使得开发者可以采用阻塞式的开发风格,却能够实现非阻塞I/O的效果
  • 隐式事件调度,使得可以在Python解释器或者应用程序的某一部分去使用Eventlet


关于协程,大致可以理解成允许子程序可以多次暂停和恢复执行,是实现多任务的一种有效手段,具体见这里


在Python的世界里,实现了nonblocking I/O的产品并不算少.比如内置的Asyncore和著名的Twisted.相比之下,Eventlet是更容易上手和使用的。


举个例子

import eventlet
pool = eventlet.GreenPool()
while True:    pool.spawn(func,args)

上面这段代码,几乎就是使用eventlet的范式:
  • GreenPool 用来实现协程,保证并行;
  • Spawn     用来调用相应的函数,完成具体业务.
每个func之间切换,实施“你运行一会、我运行一会”,并且在进行切换时必须指定何时切换以及切换到哪,当出现阻塞时,就显式切换到另一 段没有被阻塞的代码段执行,直到原先的阻塞状况消失以后,再人工切换回原来的代码段继续处理.


Eventlet内置提供了一个基于上述原理实现的数据库连接池,目前仅支持MySQL和PostgreSQL.为了测试其性能如何,我参考了gashero的这篇文章,并简化了测试方案.


测试对象分别是MySQLdb(MySQL驱动的Python封装),Eventlet.db_pool,DBUtils


测试代码如下:
import time
import random
import MySQLdb
import eventlet.db_pool as db_pool
from DBUtils.PooledDB import PooledDB

conn_kwargs={'host':'192.168.8.84','user':'root','passwd':'','db':'logs'}
sql="""SELECT * FROM test WHERE id=%d"""
pooled=db_pool.ConnectionPool(MySQLdb,**conn_kwargs)
pooldb=PooledDB(MySQLdb,**conn_kwargs)

def query(conn):
    cur=conn.cursor()
    cur.execute(sql%(random.randint(1,1000)))
    data=cur.fetchall()
    return cur

def print_now():
    print time.strftime("%H:%M:%S")
    return

def test1(times):
    print_now()
    for i in range(0,times):
        conn=MySQLdb.connect(**conn_kwargs)
        r = query(conn)
        r.close()
        conn.close()
    print_now()
    return

def test2(times):
    print_now()
    for i in range(0,times):
        conn=pooled.get()
        try:
            query(conn)
        finally:
            pooled.put(conn)
    print_now()
    return

def test3(times):
    print_now()
    for i in range(0,times):
        conn=pooldb.connection()
        r=query(conn)
        r.close()
        conn.close()
    print_now()
    return


然后进入Python解释器交互环境
Python -i db-pool-test.py
>>> test1(10000) //MySQLdb
16:04:34
16:11:25
>>> test2(10000) //Event
16:12:35
16:15:22
>>> test3(10000) //DBUtils
16:15:28
16:18:09

总体来看,和传统的MySQLdb相比,性能有了很大的提升,和DBUtils差别并不是很明显.


协程凶猛啊!