上下文管理器

上下文管理器

让对象支持上下文管理器

对象需要定义 __enter__() 和 __exit__() 两个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from socket import socket, AF_INET, SOCK_STREAM


class LazyConnection:
    """Context manager that opens a socket on entry and closes it on exit.

    No connection exists outside a ``with`` block. One instance cannot be
    entered again while its socket is still open.
    """

    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        """Store the connection parameters without opening a socket.

        Args:
            address: Address handed to ``socket.connect()``.
            family: Socket address family (defaults to ``AF_INET``).
            type: Socket type (defaults to ``SOCK_STREAM``).
        """
        self.address = address
        self.family = family
        self.type = type
        self.sock = None  # None means "not connected"

    def __enter__(self):
        """Open the connection and return the connected socket.

        Raises:
            RuntimeError: If this instance is already connected.
        """
        if self.sock is not None:
            raise RuntimeError('Already connected')
        sock = socket(self.family, self.type)
        try:
            sock.connect(self.address)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so the socket
            # must be released here; self.sock stays None so the instance
            # can be entered again later.
            sock.close()
            raise
        self.sock = sock
        return sock

    def __exit__(self, exc_ty, exc_val, tb):
        """Close the socket and mark the instance as disconnected.

        Args:
            exc_ty: Exception type raised in the ``with`` body, or None.
            exc_val: Exception instance, or None.
            tb: Traceback of the exception, or None.
        """
        try:
            self.sock.close()
        finally:
            # Reset even if close() fails so the instance stays reusable.
            self.sock = None
        # Returning True here would suppress an exception raised inside the
        # with block; falling through returns None, so it propagates.

if __name__ == '__main__':
    from functools import partial

    # Constructing the object does not open a connection yet.
    conn = LazyConnection(('www.python.org', 80))
    with conn as s:
        # conn.__enter__() has run: the socket is connected.
        request_lines = (
            b'GET /index.html HTTP/1.0\r\n',
            b'Host: www.python.org\r\n',
            b'\r\n',
        )
        for line in request_lines:
            s.send(line)
        # Keep calling recv(8192) until it returns b''.
        read_chunk = partial(s.recv, 8192)
        resp = b''.join(iter(read_chunk, b''))
        print(resp)
    # conn.__exit__() has run: the socket is closed again.

线程安全修改版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from socket import socket, AF_INET, SOCK_STREAM
import threading

class LazyConnection:
    """Socket context manager that keeps one connection per thread.

    The socket is stored on a ``threading.local`` object, so each thread
    entering the same instance sees only its own ``sock`` attribute.
    """

    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        """Store the connection parameters without opening a socket.

        Args:
            address: Address handed to ``socket.connect()``.
            family: Socket address family (defaults to ``AF_INET``).
            type: Socket type (defaults to ``SOCK_STREAM``).
        """
        self.address = address
        # Honour the caller's arguments instead of hard-coding the defaults.
        self.family = family
        self.type = type
        self.local = threading.local()

    def __enter__(self):
        """Open a connection for the current thread and return its socket.

        Raises:
            RuntimeError: If the current thread already has an open socket
                on this instance.
        """
        if hasattr(self.local, 'sock'):
            raise RuntimeError('Already connected')
        sock = socket(self.family, self.type)
        try:
            sock.connect(self.address)
        except BaseException:
            # __exit__ is not called when __enter__ raises: close the socket
            # here and leave local.sock unset so this thread can retry.
            sock.close()
            raise
        self.local.sock = sock
        return sock

    def __exit__(self, exc_ty, exc_val, tb):
        """Close the current thread's socket and forget it.

        Args:
            exc_ty: Exception type raised in the ``with`` body, or None.
            exc_val: Exception instance, or None.
            tb: Traceback of the exception, or None.
        """
        try:
            self.local.sock.close()
        finally:
            # Remove the attribute even if close() fails, so hasattr() in
            # __enter__ reports "not connected" again.
            del self.local.sock

from functools import partial
def test(conn):
    """Request /index.html through *conn* and print the response size.

    Args:
        conn: Context manager whose ``with`` target provides ``send()``
            and ``recv()`` (a LazyConnection in this file).
    """
    with conn as s:
        for line in (
            b'GET /index.html HTTP/1.0\r\n',
            b'Host: www.huawei.com\r\n',
            b'\r\n',
        ):
            s.send(line)
        # Keep calling recv(8192) until it returns b''.
        chunks = iter(partial(s.recv, 8192), b'')
        resp = b''.join(chunks)

    print('Got {} bytes'.format(len(resp)))

if __name__ == '__main__':
    conn = LazyConnection(('www.huawei.com', 80))

    # Two threads share the same LazyConnection object.
    workers = [
        threading.Thread(target=test, args=(conn,))
        for _ in range(2)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()

使用 contextlib 模块中的 @contextmanager 装饰器

实现一个新的上下文管理器的最简单的方法

yield 之前的代码会在上下文管理器中作为 __enter__() 方法执行,
所有在 yield 之后的代码会作为 __exit__() 方法执行。
注意:如果 with 代码块中抛出异常,异常会在 yield 处重新抛出,yield 之后的代码只有放在 try/finally 中才一定会执行。

先执行 print('《', end=''),遇到 yield 后执行 with 代码块中的 print('挪威的森林', end=''),最后执行 print('》', end='')

1
2
3
4
5
6
7
8
9
10
11
12
from contextlib import contextmanager
@contextmanager
def book_mark():
    """Wrap whatever the ``with`` body prints in Chinese title marks.

    Prints the opening mark before the body runs and the closing mark after
    it, both without a trailing newline. The closing mark is printed even
    when the body raises, so the pair is always balanced; the exception
    still propagates to the caller.
    """
    print('《', end='')
    try:
        yield
    finally:
        # Without try/finally an exception in the body would skip this line.
        print('》', end='')


# Expected output: 《挪威的森林》
title = '挪威的森林'
with book_mark():
    print(title, end='')

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
class timethis:
    """Context manager that prints the start, end and elapsed time of a block.

    Timestamps come from ``time.time()``. ``__enter__`` returns nothing, so
    ``with timethis(...) as x`` binds ``x`` to None.
    """

    def __init__(self, label):
        """Remember *label*, which prefixes the elapsed-time line."""
        self.label = label

    def __enter__(self):
        """Record and print the start timestamp."""
        now = time.time()
        self.start = now
        print(f'start = {self.start}')

    def __exit__(self, exc_ty, exc_val, exc_tb):
        """Print the end timestamp and the elapsed seconds."""
        end = time.time()
        print(f'{end = }')
        elapsed = end - self.start
        print('{}: {}'.format(self.label, elapsed))

# Time a simple countdown loop.
with timethis('counting'):
    n = 10000000
    while n:
        n -= 1

使用 contextlib 的 @contextmanager 装饰器改写上面的 timethis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time
from contextlib import contextmanager

@contextmanager
def timethis(label):
    """Print the start, end and elapsed time of the ``with`` body.

    Timestamps come from ``time.time()``. The end and elapsed lines are
    printed from a ``finally`` clause, so they appear even when the body
    raises.

    Args:
        label: Text that prefixes the elapsed-time line.
    """
    start = time.time()
    print(f'{start = }')
    try:
        yield
    finally:
        end = time.time()
        # One print call, two lines: the end timestamp, then the elapsed time.
        print(f'{end = }', f'{label} : {end - start}', sep='\n')

# Example use: time a simple countdown loop.
with timethis('counting'):
    n = 10000000
    while n:
        n -= 1

任何对列表的修改只有当所有代码运行完成并且不出现异常的情况下才会生效

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    

from contextlib import contextmanager

@contextmanager
def list_transaction(orig_list):
    """Yield a scratch copy of *orig_list* and commit it only on success.

    Changes made to the yielded copy are written back into *orig_list* in
    place once the ``with`` body finishes without raising. If the body
    raises, *orig_list* is left untouched.

    Args:
        orig_list: The list to update transactionally.
    """
    scratch = [*orig_list]
    yield scratch
    # Reached only when the body did not raise: there is deliberately no
    # try/finally around the yield, so a failure skips this write-back.
    orig_list[:] = scratch

# Successful transaction: both appends are committed when the block ends.
items = [1, 2, 3]
with list_transaction(items) as working:
    working.append(4)
    working.append(5)

print(items)  # [1, 2, 3, 4, 5]

# Failed transaction: the exception is caught here so the script can go on
# to show that items was not modified.
try:
    with list_transaction(items) as working:
        working.append(6)
        working.append(7)  # working is now [1, 2, 3, 4, 5, 6, 7]
        # Raising here means the changes are never written back to items.
        raise RuntimeError('oops')
except RuntimeError as exc:
    print(f'transaction aborted: {exc}')

print(items)  # still [1, 2, 3, 4, 5]