Chow's Notes

web.py源码(持续更新)

文件结构:

utils.py中定义的是些 General Utilities,通用工具集和数据结构

1. 对序列进行分组的函数 group(对生成器的巧妙运用,值得借鉴)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def group(seq, size):
    """Split `seq` into consecutive lists of at most `size` items.

    Groups are produced lazily and in order.  The last group is shorter when
    the input does not divide evenly; nothing is yielded for an empty input
    or when `size` is 0.

    :param seq: any iterable or iterator; it is consumed as groups are read.
    :param size: maximum number of items per group.
    :returns: a generator of lists.
    """
    def take(source, n):
        # Pull up to `n` items from the shared iterator.  Because `source` is
        # an iterator, each call resumes where the previous one stopped.
        for _ in range(n):
            try:
                yield next(source)
            except StopIteration:
                # Finish explicitly: a StopIteration escaping a generator
                # body is turned into RuntimeError by PEP 479.
                return

    # Accept both Python 2 (`next`) and Python 3 (`__next__`) iterators as-is;
    # anything else is turned into an iterator first.
    if not hasattr(seq, 'next') and not hasattr(seq, '__next__'):
        seq = iter(seq)
    while True:
        chunk = list(take(seq, size))
        if not chunk:
            break
        yield chunk
/* 重点在 take 生成器:由于 seq 已被转换成迭代器,每次调用 take 之后,seq 都会停在上一次取完 n 个元素后的位置,next 会一直有效,直到所有元素迭代完为止。
把生成器转换成列表,实际上也是不断调用 next,直到抛出 StopIteration 异常,该异常被 list 捕获。*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def take(seq, n):
    """Yield up to `n` items from the iterator `seq`, advancing it in place.

    Fewer than `n` items are produced when `seq` runs out first.
    """
    for _ in range(n):
        try:
            yield next(seq)
        except StopIteration:
            # End the generator cleanly when the source is exhausted.
            return


# Sample input (the snippet as published used `seq` without defining it).
seq = iter([1, 2, 3, 4, 5])

# while True:
#     inger = take(seq, 2)
#     import time
#     time.sleep(1)
#     print(list(inger))  # the exception raised at the very end is absorbed
res = take(seq, 2)
# print(list(res))
print(next(res))  # 1
print(next(res))  # 2
res = take(seq, 2)
# print(list(res))
print(next(res))  # 3
print(next(res))  # 4
res = take(seq, 2)
print(list(res))  # [5] -- list() stops quietly when the generator ends
try:
    # `res` is already finished, so asking for more raises StopIteration.
    print(next(res))
except StopIteration:
    print("StopIteration")

web.py-0.111 中的实现无法分出最后一组(不足 size 个的)元素:

1
2
3
4
5
6
7
8
9
10
11
12
def group(seq, size):
    """Breaks 'seq' into a generator of lists with length 'size'.

    Historical web.py-0.111 behaviour, kept on purpose: only complete groups
    are produced.  If the input runs out in the middle of a group, the items
    already collected for that group are discarded.

    :param seq: any iterable or iterator.
    :param size: exact number of items in every yielded list.
    """
    if not hasattr(seq, 'next') and not hasattr(seq, '__next__'):
        seq = iter(seq)
    while True:
        chunk = []
        for _ in range(size):
            try:
                chunk.append(next(seq))
            except StopIteration:
                # The original relied on StopIteration escaping the generator
                # to end it; doing so explicitly keeps the same result
                # (partial group dropped) and is valid under PEP 479.
                return
        yield chunk


seq = iter([1, 2, 3, 4, 5])
# r = group(seq, 2)
# print(next(r))  # [1, 2]
# print(next(r))  # [3, 4]
# print(next(r))  # raises StopIteration, so [5] is never produced

2.缓存函数每个线程调用的返回结果。memoize

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading


class memoize:  # version defined in web.py-0.111
    """Cache the return value of `func` per distinct call arguments.

    The cache key is built from the positional arguments and the keyword
    items, so every argument must be hashable.  The two print calls are
    tracing added for this demonstration.
    """

    def __init__(self, func):
        self.func = func
        self.cache = {}

    def __call__(self, *a, **k):
        key = (a, tuple(k.items()))
        if key not in self.cache:
            print("no cache")
            self.cache[key] = self.func(*a, **k)
        # NOTE(review): the published snippet lost its indentation; this trace
        # is assumed to run on every call, not only on a cache miss.
        print("cache %s" % (self.cache[key],))
        return self.cache[key]


def echo(a):
    """Trivial function used to exercise the cache."""
    return a


me_test = memoize(echo)
me_test("aa")
# A second call from another thread with the same argument hits the same cache.
t = threading.Thread(target=me_test, args=('aa',))
t.start()
t.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import re
import threading
import time


class Memoize:  # web.py 0.37
    """
    'Memoizes' a function, caching its return values for each input.
    If `expires` is specified, values are recalculated after `expires` seconds.
    If `background` is specified, values are recalculated in a separate thread.

        >>> calls = 0
        >>> def howmanytimeshaveibeencalled():
        ...     global calls
        ...     calls += 1
        ...     return calls
        >>> fastcalls = memoize(howmanytimeshaveibeencalled)
        >>> howmanytimeshaveibeencalled()
        1
        >>> howmanytimeshaveibeencalled()
        2
        >>> fastcalls()
        3
        >>> fastcalls()
        3
        >>> import time
        >>> fastcalls = memoize(howmanytimeshaveibeencalled, .1, background=False)
        >>> fastcalls()
        4
        >>> fastcalls()
        4
        >>> time.sleep(.2)
        >>> fastcalls()
        5
        >>> def slowfunc():
        ...     time.sleep(.1)
        ...     return howmanytimeshaveibeencalled()
        >>> fastcalls = memoize(slowfunc, .2, background=True)
        >>> fastcalls()
        6
        >>> timelimit(.05)(fastcalls)()
        6
        >>> time.sleep(.2)
        >>> timelimit(.05)(fastcalls)()
        6
        >>> timelimit(.05)(fastcalls)()
        6
        >>> time.sleep(.2)
        >>> timelimit(.05)(fastcalls)()
        7
        >>> fastcalls = memoize(slowfunc, None, background=True)
        >>> threading.Thread(target=fastcalls).start()
        >>> time.sleep(.01)
        >>> fastcalls()
        9
    """

    def __init__(self, func, expires=None, background=True):
        self.func = func
        # key -> (value, time the value was computed)
        self.cache = {}
        self.expires = expires
        self.background = background
        # key -> lock held while that key's value is being recomputed
        self.running = {}

    def __call__(self, *args, **keywords):
        key = (args, tuple(keywords.items()))
        if not self.running.get(key):
            self.running[key] = threading.Lock()

        def update(block=False):
            # With block=False the refresh is skipped when another caller is
            # already recomputing this key.
            if self.running[key].acquire(block):
                try:
                    self.cache[key] = (self.func(*args, **keywords), time.time())
                finally:
                    self.running[key].release()

        if key not in self.cache:
            # First call for this key: wait until a value exists.
            update(block=True)
        elif self.expires and (time.time() - self.cache[key][1]) > self.expires:
            # Stale entry: refresh it, and return whatever is cached once the
            # refresh has been started (background) or attempted (inline).
            if self.background:
                threading.Thread(target=update).start()
            else:
                update()
        return self.cache[key][0]


memoize = Memoize

re_compile = memoize(re.compile)  #@@ threadsafe?
re_compile.__doc__ = """
A memoized version of re.compile.
"""

3.重定向输出到一个对象中(在类中定义一个write方法,用于重定向)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import sys
class A(object):
    """Holder whose class attribute `text` accumulates the captured output."""
    text = ''


class _outputter(object):
    """Stand-in for sys.stdout: it only needs a write() method."""

    def write(self, x):
        A.text += x


_oldstdout = sys.stdout
sys.stdout = _outputter()
try:
    print("xxxxxxxx")
finally:
    # Always put the real stdout back, even if the redirected code fails.
    sys.stdout = _oldstdout
print("ctx: %s" % A.text)

4.捕获异常,格式化信息输出的函数(可以借用)(web.py-0.111)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def djangoerror():
    """Collect per-frame details of the exception currently being handled.

    Walks the traceback from sys.exc_info() and builds `frames`, one dict per
    traceback level, holding the file name, function name, line number, local
    variables and the surrounding source lines.

    Note: this snippet stops after the traceback walk; the notes continue the
    function body in the snippet that follows it.
    """
    def _get_lines_from_file(filename, lineno, context_lines):
        """
        Returns context_lines before and after lineno from file.
        Returns (pre_context_lineno, pre_context, context_line, post_context).

        `lineno` is 1-based (as reported by tb_lineno); `pre_context_lineno`
        is the 1-based number of the first line in `pre_context`.  If the
        file cannot be read the result is (None, [], None, []).
        """
        try:
            with open(filename) as source_file:
                source = source_file.readlines()
        except (OSError, IOError):
            return None, [], None, []
        # readlines() is indexed from 0 while lineno counts from 1.
        index = lineno - 1
        # Clamp at the start of the file so an error in the first few lines
        # never produces a negative slice start.
        lower_bound = max(0, index - context_lines)
        upper_bound = index + 1 + context_lines
        pre_context = [line.strip('\n') for line in source[lower_bound:index]]
        context_line = source[index].strip('\n')
        post_context = [line.strip('\n') for line in source[index + 1:upper_bound]]
        return lower_bound + 1, pre_context, context_line, post_context

    exception_type, exception_value, tb = sys.exc_info()
    frames = []
    while tb is not None:
        filename = tb.tb_frame.f_code.co_filename
        function = tb.tb_frame.f_code.co_name
        # The code these notes started from used `tb.tb_lineno - 1`, which
        # made the reported line number of the exception wrong; the 1-based
        # value is kept and converted inside _get_lines_from_file instead.
        lineno = tb.tb_lineno
        pre_context_lineno, pre_context, context_line, post_context = \
            _get_lines_from_file(filename, lineno, 7)
        frames.append({
            'tb': tb,
            'filename': filename,
            'function': function,
            'lineno': lineno,
            'vars': tb.tb_frame.f_locals.items(),
            'id': id(tb),
            'pre_context': pre_context,
            'context_line': context_line,
            'post_context': post_context,
            'pre_context_lineno': pre_context_lineno,
        })
        tb = tb.tb_next

frames 保存跟踪异常回溯的每层 信息

1
2
3
4
5
6
7
8
9
lastframe = frames[-1]
urljoin = urlparse.urljoin
input_ = input()
cookies_ = cookies()
context_ = context
prettify = pprint.pformat
# Formatted exception information, convenient to display in a template.
# `frames` and these local variables can be used for display by the HTML template system.

用于异常展示的HTML模板(django框架里面的):基于Cheetah模板系统(Cheetah是一个用Python写的模板系统与代码生成工具)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def djangoerror():
    """Render the Django-style error page for the exception being handled.

    Walks the traceback from sys.exc_info(), records one dict per traceback
    level in `frames` (file, function, line number, locals and surrounding
    source lines) and returns the result of rendering DJANGO_500_PAGE.
    """
    def _get_lines_from_file(filename, lineno, context_lines):
        """
        Returns context_lines before and after lineno from file.
        Returns (pre_context_lineno, pre_context, context_line, post_context).

        `lineno` is 1-based (as reported by tb_lineno); `pre_context_lineno`
        is the 1-based number of the first line in `pre_context`.  If the
        file cannot be read the result is (None, [], None, []).
        """
        try:
            with open(filename) as source_file:
                source = source_file.readlines()
        except (OSError, IOError):
            return None, [], None, []
        # readlines() is indexed from 0 while lineno counts from 1.
        index = lineno - 1
        lower_bound = max(0, index - context_lines)
        upper_bound = index + 1 + context_lines
        # Stop before `index` so the failing line is not repeated at the end
        # of pre_context.
        pre_context = [line.strip('\n') for line in source[lower_bound:index]]
        context_line = source[index].strip('\n')
        post_context = [line.strip('\n') for line in source[index + 1:upper_bound]]
        return lower_bound + 1, pre_context, context_line, post_context

    exception_type, exception_value, tb = sys.exc_info()
    frames = []
    while tb is not None:
        filename = tb.tb_frame.f_code.co_filename
        function = tb.tb_frame.f_code.co_name
        # Keep the 1-based tb_lineno for display; the conversion to a 0-based
        # index happens inside _get_lines_from_file.
        lineno = tb.tb_lineno
        pre_context_lineno, pre_context, context_line, post_context = \
            _get_lines_from_file(filename, lineno, 7)
        frames.append({
            'tb': tb,
            'filename': filename,
            'function': function,
            'lineno': lineno,
            'vars': tb.tb_frame.f_locals.items(),
            'id': id(tb),
            'pre_context': pre_context,
            'context_line': context_line,
            'post_context': post_context,
            'pre_context_lineno': pre_context_lineno,
        })
        tb = tb.tb_next

    # Formatted exception information, convenient to display in a template.
    # NOTE(review): these names look unused, but they are presumably read by
    # the template through this function's local namespace -- confirm before
    # removing any of them.
    lastframe = frames[-1]
    urljoin = urlparse.urljoin
    input_ = input()
    cookies_ = cookies()
    context_ = context
    prettify = pprint.pformat
    return render(DJANGO_500_PAGE, asTemplate=True, isString=True)

5.application类

钩子函数:
loadhook(func) 在处理请求之前 func()
unloadhook(func) 在处理请求之后 func()

app.add_processor(processor) processor的参数为handler:请求处理函数

web.config.debug=False 时,请求处理不会自动重新加载 app 所在模块的对象;使用 web.py 自带的服务器时默认值为 True。
在 wsgi.py 中通过 web.config.setdefault('debug', _is_dev_mode()) 设置 debug 的默认值,_is_dev_mode 用于判断是否处于开发环境。

wsgifunc 方法,返回一个兼容WSGI的application函数 ,接受一系列中间件middleware参数

该方法中定义的函数

peep(iterator) '''Peeps into an iterator by doing an iteration and returns an equivalent iterator.'''

is_generator(x) 判断对象是否是一个生成器

wsgi

_cleanup 方法 清除所有 ThreadedDict 的实例对象(每实例化一个线程字典对象都会被保存)

load 方法,用 env 字典变量初始化 web.ctx 各项属性值。在 wsgifunc 方法返回的 wsgi 函数中调用

(未完待续…)

web.py中定义几种数据结构:

1. Storage类

在 utils 文件中定义的类字典对象,从内建对象 dict 派生,通过实现特殊方法 __getattr__、__setattr__、__delattr__,
使得可以像设置、获取、删除对象的属性一样设置、获取、删除字典的键和值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Storage(dict):
    """A dict whose keys can also be read, set and deleted as attributes.

        >>> o = Storage(a=1)
        >>> o.a
        1
        >>> o['a']
        1
        >>> o.a = 2
        >>> o['a']
        2
        >>> del o.a
        >>> o.a
        Traceback (most recent call last):
            ...
        AttributeError: 'a'

    A missing key raises AttributeError (not KeyError) on attribute access,
    so getattr() with a default and hasattr() behave as expected.
    """

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError as k:
            raise AttributeError(k)

    def __setattr__(self, key, value):
        self[key] = value

    def __delattr__(self, key):
        try:
            del self[key]
        except KeyError as k:
            raise AttributeError(k)

    def __repr__(self):
        return '<Storage ' + dict.__repr__(self) + '>'

web.config 初始化的是一个 Storage 对象

2.ThreadedDict类

在utils文件中定义,线程字典,线程安全的,该实例化对象存在于主线程中,但是其属性值 在每一个线程中是独立的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
"""
Thread local storage.
>>> d = ThreadedDict()
>>> d.x = 1
>>> d.x
1
>>> import threading
>>> def f(): d.x = 2
...
>>> t = threading.Thread(target=f)
>>> t.start()
>>> t.join()
>>> d.x
1
"""
当前版本的实现:(依赖于threading模块的local (在python23.py 有个专门对python2.3实现的threadlocal))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import threading
try:
    from threading import local as threadlocal
except ImportError:
    # Fallback used only where threading.local is unavailable.  Defining the
    # class unconditionally would shadow the import above.
    class threadlocal(object):
        """Implementation of threading.local for python2.3.
        """
        def __getattribute__(self, name):
            if name == "__dict__":
                # Hand out the calling thread's private dict for this object.
                return threadlocal._getd(self)
            try:
                return object.__getattribute__(self, name)
            except AttributeError:
                try:
                    return self.__dict__[name]
                except KeyError:
                    raise AttributeError(name)

        def __setattr__(self, name, value):
            self.__dict__[name] = value

        def __delattr__(self, name):
            try:
                del self.__dict__[name]
            except KeyError:
                raise AttributeError(name)

        def _getd(self):
            t = threading.currentThread()
            if not hasattr(t, '_d'):
                # using __dict__ of thread as thread local storage
                t._d = {}
            _id = id(self)
            # there could be multiple instances of threadlocal.
            # use id(self) as key
            if _id not in t._d:
                t._d[_id] = {}
            return t._d[_id]
class ThreadedDict(threadlocal):
    """Thread-local object that can also be used like a dict.

    Derives from the thread-local base class and adds the usual dict methods
    on top of the per-thread `__dict__`, so each thread sees only the keys it
    stored itself.
    """
    # Every instance ever created, so clear_all() can reach them.
    _instances = set()

    def __init__(self):
        ThreadedDict._instances.add(self)

    def __del__(self):
        ThreadedDict._instances.remove(self)

    def __hash__(self):
        return id(self)

    @staticmethod
    def clear_all():
        """Clears all ThreadedDict instances.
        """
        # Iterate over a copy so the set may change while clearing.
        for t in list(ThreadedDict._instances):
            t.clear()

    # Define all these methods to more or less fully emulate dict -- attribute access
    # is built into threading.local.

    def __getitem__(self, key):
        return self.__dict__[key]

    def __setitem__(self, key, value):
        self.__dict__[key] = value

    def __delitem__(self, key):
        del self.__dict__[key]

    def __contains__(self, key):
        return key in self.__dict__

    has_key = __contains__

    def clear(self):
        self.__dict__.clear()

    def copy(self):
        return self.__dict__.copy()

    def get(self, key, default=None):
        return self.__dict__.get(key, default)

    def items(self):
        return self.__dict__.items()

    def iteritems(self):
        return iter(self.__dict__.items())

    def keys(self):
        return self.__dict__.keys()

    def iterkeys(self):
        return iter(self.__dict__)

    # Iterating the object itself walks its keys, like a dict.
    __iter__ = iterkeys

    def values(self):
        return self.__dict__.values()

    def itervalues(self):
        return iter(self.__dict__.values())

    def pop(self, key, *args):
        return self.__dict__.pop(key, *args)

    def popitem(self):
        return self.__dict__.popitem()

    def setdefault(self, key, default=None):
        return self.__dict__.setdefault(key, default)

    def update(self, *args, **kwargs):
        self.__dict__.update(*args, **kwargs)

    def __repr__(self):
        return '<ThreadedDict %r>' % self.__dict__

    __str__ = __repr__

web.py 0.111中的实现。依赖于threading中的currentThread:(每个线程都必须用线程id作为键更新全局_context字典对象,以便线程根据自己的线程id访问/设置自己的属性)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# `currentThread` is the historical spelling; bind it to current_thread so
# newer Pythons do not emit a deprecation warning.
from threading import current_thread as currentThread


class threadeddict:
    """Proxy that forwards attribute and item access to a per-thread object.

    `d` maps thread objects to the object holding that thread's data.  Every
    access looks up the entry for the calling thread, so an entry for that
    thread must already exist in `d`.
    """

    def __init__(self, d):
        # Stored through __dict__ to bypass __setattr__ below; the mangled
        # name is what `self.__d` resolves to inside this class, which makes
        # it behave like a private `__d` attribute.
        self.__dict__['_threadeddict__d'] = d

    def __getattr__(self, a):
        return getattr(self.__d[currentThread()], a)

    def __getitem__(self, i):
        return self.__d[currentThread()][i]

    def __setattr__(self, a, v):
        return setattr(self.__d[currentThread()], a, v)

    def __setitem__(self, i, v):
        self.__d[currentThread()][i] = v
### usage example ###
# Assumes `threading`, `currentThread`, `Storage` and `threadeddict` are
# already in scope.
_context = {currentThread(): Storage()}
mydata = threadeddict(_context)
mydata.number = 345
print(mydata.number)


def f():
    # Add an entry keyed by the current (worker) thread before using mydata.
    _context[currentThread()] = Storage()
    mydata.number = 22
    print(mydata.number)


t = threading.Thread(target=f)
t.start()
t.join()
# The main thread still sees its own value.
print(mydata.number)
-----------
>345
>22
>345

附————

os.stat(mod.__file__).st_mtime:os.stat() 获取文件的相关属性,st_mtime 为文件的最后修改时间

itertools.chain 分别迭代多个序列或迭代器对象,然后连接在一起转换成一个迭代器

sys._getframe(1).f_locals

Return a frame object from the call stack. If optional integer depth is
given, return the frame object that many calls below the top of the stack.
If that is deeper than the call stack, ValueError is raised. The default
for depth is zero, returning the frame at the top of the call stack.