Everything in Python is an object

Functions and classes are also objects; they are first-class citizens in Python (see the sketch after this list):

  • they can be assigned to a variable
  • they can be added to a collection
  • they can be passed as arguments to a function
  • they can be used as a function's return value
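
A minimal sketch illustrating all four points (the function names are made up for illustration):

def double(x):
    return x * 2

times_two = double                # 1. assigned to a variable
funcs = [double, abs]             # 2. added to a collection

def apply(f, value):              # 3. passed as an argument
    return f(value)

def make_adder(n):                # 4. returned from a function
    def adder(x):
        return x + n
    return adder

print(apply(times_two, 10))  # 20
print(make_adder(3)(4))      # 7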

The relationship between type, object and class

type can create a class, and it can also return the type of an object

a = 1
b = "abc"
print(type(1))    # <class 'int'>
print(type(int))  # <class 'type'>
print(type(b))    # <class 'str'>
print(type(str))  # <class 'type'>

# type -> int -> 1
# type -> class -> obj

# object is the topmost base class
# type is a class, and type is also an object
class Student:
    pass

class MyStudent(Student):
    pass

stu = Student()
print(type(stu))      # <class '__main__.Student'>
print(type(Student))  # <class 'type'>

print(int.__bases__)        # (<class 'object'>,)
print(Student.__bases__)    # (<class 'object'>,)
print(MyStudent.__bases__)  # (<class '__main__.Student'>,)

print(type.__bases__)    # (<class 'object'>,)
print(object.__bases__)  # ()
print(type(object))      # <class 'type'>

Magic methods

class MyVector(object):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __add__(self, other):
        re_vector = MyVector(self.x + other.x, self.y + other.y)
        return re_vector

    def __str__(self):
        return "x:{x}, y:{y}".format(x=self.x, y=self.y)

first_vec = MyVector(1, 2)
second_vec = MyVector(3, 4)
print(first_vec + second_vec)  # x:4, y:6

Abstract base classes (the abc module)

class Company(object):
    def __init__(self, employee_list):
        self.employee = employee_list

    def __len__(self):
        return len(self.employee)

com = Company(['reese', 'neo'])

# sometimes we want to check an object's type
from collections.abc import Sized
print(isinstance(com, Sized))  # True
import abc

class CacheBase(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def get(self, key):
        pass

    @abc.abstractmethod
    def set(self, key, value):
        pass


class RedisCache(CacheBase):
    pass

redis_cache = RedisCache()

# raises TypeError here, forcing you to implement the base class's get and set methods

Python's introspection mechanism

Introspection is a mechanism for examining the internal structure of an object at runtime

class Person:
    name = "user"


class Student(Person):
    def __init__(self, school_name):
        self.school_name = school_name


if __name__ == "__main__":
    user = Student("cwz")

    print(user.__dict__)


# Output: {'school_name': 'cwz'}
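
Besides __dict__, a few other introspection tools are worth knowing; a quick sketch reusing the Person and Student classes above:

print(Person.__dict__)  # class attributes live in the class's own __dict__
print(dir(user))        # dir() lists every reachable name, including inherited ones
print(type(user), isinstance(user, Person))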

Does super() really call the parent class?

class A:
    def __init__(self):
        print('A')


class B(A):
    def __init__(self):
        print("B")
        super().__init__()


class C(A):
    def __init__(self):
        print("C")
        super(C, self).__init__()


class D(B, C):
    def __init__(self):
        print("D")
        super(D, self).__init__()


if __name__ == '__main__':
    D()


# Output:
# D
# B
# C
# A

# super() follows the MRO (method resolution order), not simply the parent class

Characteristics of the mixin pattern (see the sketch below):

  • a mixin class does one single job
  • it is not tied to any base class and can be combined with any of them
  • do not use super() inside a mixin
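
A minimal sketch of the pattern under these rules (the class names are made up):

import json

class JsonMixin:
    # single job: add JSON serialization; holds no state of its own and
    # assumes only that the host class stores its data in __dict__
    def to_json(self):
        return json.dumps(self.__dict__)

class Person:
    def __init__(self, name):
        self.name = name

class JsonPerson(JsonMixin, Person):
    pass

print(JsonPerson("cwz").to_json())  # {"name": "cwz"}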

The with statement

The context manager protocol

class Simple:
    def __enter__(self):
        print("enter")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("exit")

    def do_something(self):
        print("do something")


with Simple() as simple:
    simple.do_something()

contextlib

import contextlib

@contextlib.contextmanager
def test(x):
    print("start...")
    yield
    print("end...")

with test(1) as f:
    print("in progress")

# Output:
# start...
# in progress
# end...

Custom sequence classes

The ABC inheritance hierarchy of sequence types in Python

from collections import abc
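
For example, the built-in containers register against these ABCs, which is what isinstance checks rely on (a quick sketch):

from collections.abc import Sequence, MutableSequence

print(isinstance([], MutableSequence))  # True
print(isinstance((), MutableSequence))  # False, a tuple is immutable
print(isinstance((), Sequence))         # True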

Slicing

# pattern: [start:end:step]
"""
start is where the slice begins (default 0);
end is where the slice stops, exclusive (default: the length of the list);
step is the stride (default 1).
start can be omitted when it is 0, end when it equals the list length,
and step when it is 1 (in which case the trailing colon can be dropped too).
A negative step slices in reverse; in that case start should be greater
than end.
"""
aList = [3, 4, 5, 6, 7, 9, 11, 13, 15, 17]
print(aList[::])    # a new list containing every element of the original
print(aList[::-1])  # a new list with the elements reversed
print(aList[::2])   # every other element: the even positions
print(aList[1::2])  # every other element: the odd positions
print(aList[3:6])   # explicit start and end positions
aList[0:100]        # an end beyond the list length is truncated at the tail
aList[100:]         # a start beyond the list length yields an empty list

aList[len(aList):] = [9]  # append at the tail
aList[:0] = [1, 2]        # insert at the head
aList[3:3] = [4]          # insert in the middle
aList[:3] = [1, 2]        # replace elements; a plain slice allows unequal lengths
aList[3:] = [4, 5, 6]     # the two sides may indeed differ in length
aList[::2] = [0] * 3      # replace every other element
print(aList)
aList[::2] = ['a', 'b', 'c']  # replace every other element
aList[::2] = [1, 2]  # a non-contiguous (extended) slice requires equal lengths, so this raises ValueError
aList[:3] = []       # delete the first 3 elements

del aList[:3]   # delete a contiguous slice
del aList[::2]  # delete a non-contiguous slice: every other element

A custom sliceable object

import numbers


class Group:
    # supports slicing
    def __init__(self, group_name, company_name, staffs):
        self.group_name = group_name
        self.company_name = company_name
        self.staffs = staffs

    def __reversed__(self):
        self.staffs.reverse()

    def __getitem__(self, item):
        cls = type(self)
        if isinstance(item, slice):
            return cls(group_name=self.group_name, company_name=self.company_name,
                       staffs=self.staffs[item])
        elif isinstance(item, numbers.Integral):
            return cls(group_name=self.group_name, company_name=self.company_name,
                       staffs=[self.staffs[item]])

    def __len__(self):
        return len(self.staffs)

    def __iter__(self):
        return iter(self.staffs)

    def __contains__(self, item):
        return item in self.staffs


staffs = ["cwz", "reese", "neo", "setcreed"]
group = Group("user", "imooc", staffs)
print(group[:2])   # a new Group holding the first two staffs
print(len(group))
for user in group:
    print(user)
reversed(group)
if "cwz" in group:
    print("ok")

bisect: maintaining a sorted sequence

import bisect

# bisect works on already-sorted sequences and keeps them sorted (ascending)

inter_list = []
bisect.insort(inter_list, 3)
bisect.insort(inter_list, 1)
bisect.insort(inter_list, 5)
bisect.insort(inter_list, 7)
bisect.insort(inter_list, 0)
print(inter_list)  # [0, 1, 3, 5, 7]

print(bisect.bisect(inter_list, 3))  # 3: the insertion point to the right of the existing 3

When not to use a list

import array

# a key difference between array and list: an array can only hold one fixed data type
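
A short sketch of that difference — array enforces a single C type where list accepts anything:

import array

int_arr = array.array("i", [1, 2, 3])  # "i" is the C signed int typecode
int_arr.append(4)                      # fine
try:
    int_arr.append("abc")              # rejected: array items must be integers
except TypeError as e:
    print(e)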

List comprehensions, generator expressions, dict comprehensions

# a concise way to iterate over an iterable and build a list in the desired shape
int_list = [1, 2, 3, 4, 5]

qu_list = [item * item for item in int_list]
print(type(qu_list))
int_list = [1, 2, -3, 4, 5]

qu_list = [item if item > 0 else abs(item) for item in int_list]

# Cartesian product
int_list1 = [1, 2]
int_list2 = [3, 4]

qu_list = [(first, second) for first in int_list1 for second in int_list2]

my_dict = {
    "key1": "bobby1",
    "key2": "bobby2"
}

# qu_list = [(key, value) for key, value in my_dict.items()]
#
# qu_list2 = list(((key, value) for key, value in my_dict.items()))
#
# for item in qu_list2:
#     print(item)

int_list = [1, 2, 3, 4, 5]

def process_item(item):
    return str(item)

int_dict = {process_item(item): item for item in int_list}
# prefer comprehensions where you can: they are more efficient
print(int_dict)


The ABC inheritance hierarchy of dict

from collections.abc import Mapping, MutableMapping
# dict is a mapping type

a = {}
print(isinstance(a, MutableMapping))  # True

Subclassing dict

from collections import UserDict, defaultdict

class Mydict(UserDict):
    def __setitem__(self, key, item):
        return super().__setitem__(key, item * 2)

my_dict = Mydict(one=2)
print(my_dict)  # {'one': 4}
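
The reason to prefer UserDict here: the C implementation of dict does not route its own constructor through an overridden __setitem__, so the same override on a plain dict subclass is silently skipped (a small sketch):

class BadDict(dict):
    def __setitem__(self, key, item):
        super().__setitem__(key, item * 2)

bad = BadDict(one=2)
print(bad)      # {'one': 2} -- the constructor bypassed our __setitem__
bad["two"] = 2
print(bad)      # {'one': 2, 'two': 4} -- only direct assignment is doubled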

set and frozenset

s = frozenset("abcd")  # a frozenset is immutable and hashable, so it can serve as a dict key
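
Because it is hashable, a frozenset works wherever a dict key or set element is required, while a plain set does not (quick sketch):

permissions = {
    frozenset(["read"]): "viewer",
    frozenset(["read", "write"]): "editor",
}
print(permissions[frozenset(["read", "write"])])  # editor
# {set(): "x"} would raise TypeError: unhashable type: 'set'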

How dict and set are implemented


from random import randint


def load_list_data(total_nums, target_nums):
    """
    Read data from a file and return it as a list.
    :param total_nums: how many records to read
    :param target_nums: how many records to look up
    """
    all_data = []
    target_data = []
    file_name = "G:/慕课网课程/AdvancePython/fbobject_idnew.txt"
    with open(file_name, encoding="utf8", mode="r") as f_open:
        for count, line in enumerate(f_open):
            if count < total_nums:
                all_data.append(line)
            else:
                break

    for x in range(target_nums):
        random_index = randint(0, total_nums - 1)
        if all_data[random_index] not in target_data:
            target_data.append(all_data[random_index])
            if len(target_data) == target_nums:
                break

    return all_data, target_data

def load_dict_data(total_nums, target_nums):
    """
    Read data from a file and return it as a dict.
    :param total_nums: how many records to read
    :param target_nums: how many records to look up
    """
    all_data = {}
    target_data = []
    file_name = "G:/慕课网课程/AdvancePython/fbobject_idnew.txt"
    with open(file_name, encoding="utf8", mode="r") as f_open:
        for count, line in enumerate(f_open):
            if count < total_nums:
                all_data[line] = 0
            else:
                break
    all_data_list = list(all_data)
    for x in range(target_nums):
        random_index = randint(0, total_nums - 1)
        if all_data_list[random_index] not in target_data:
            target_data.append(all_data_list[random_index])
            if len(target_data) == target_nums:
                break

    return all_data, target_data


def find_test(all_data, target_data):
    # measure the lookup time
    test_times = 100
    total_times = 0
    import time
    for i in range(test_times):
        find = 0
        start_time = time.time()
        for data in target_data:
            if data in all_data:
                find += 1
        last_time = time.time() - start_time
        total_times += last_time
    return total_times / test_times


if __name__ == "__main__":
    # all_data, target_data = load_list_data(10000, 1000)
    # all_data, target_data = load_list_data(100000, 1000)
    # all_data, target_data = load_list_data(1000000, 1000)

    # all_data, target_data = load_dict_data(10000, 1000)
    # all_data, target_data = load_dict_data(100000, 1000)
    # all_data, target_data = load_dict_data(1000000, 1000)
    all_data, target_data = load_dict_data(2000000, 1000)
    last_time = find_test(all_data, target_data)

    # dict lookup vastly outperforms list lookup
    # list lookup time grows as the list grows
    # dict lookup time does not grow with the size of the dict
    print(last_time)

# 1. dict keys and set members must be hashable
#    immutable objects are hashable: str, frozenset, tuple; your own classes via __hash__
# 2. dict trades memory for lookup speed; user-defined objects and Python's
#    internal objects are themselves backed by dicts
# 3. the storage order of a dict is related to insertion order
# 4. adding entries may change the order of existing entries (on resize)

An array occupies contiguous memory, so a lookup does not require traversing it from start to end

Python variables

# variables in Python and Java are fundamentally different: a Python variable
# is essentially a pointer, a sticky note attached to an object (int, str, ...)

a = 1
a = "abc"
# 1. the note "a" is stuck onto 1
# 2. the object is created first, then the note is attached

a = [1, 2, 3]
b = a
print(id(a), id(b))
print(a is b)
# b.append(4)
# print(a)

a = [1, 2, 3, 4]
b = [1, 2, 3, 4]

class People:
    pass

person = People()
if type(person) is People:
    print("yes")
# print(a == b)
# print(id(a), id(b))
# print(a is b)

A classic mistake:

def add(a, b):
    a += b
    return a

class Company:
    def __init__(self, name, staffs=[]):  # the mutable default argument is the trap
        self.name = name
        self.staffs = staffs

    def add(self, staff_name):
        self.staffs.append(staff_name)

    def remove(self, staff_name):
        self.staffs.remove(staff_name)

if __name__ == "__main__":
    com1 = Company("com1", ["bobby1", "bobby2"])
    com1.add("bobby3")
    com1.remove("bobby1")
    print(com1.staffs)

    com2 = Company("com2")
    com2.add("bobby")
    print(com2.staffs)

    print(Company.__init__.__defaults__)

    com3 = Company("com3")
    com3.add("bobby5")
    print(com2.staffs)
    print(com3.staffs)
    print(com2.staffs is com3.staffs)  # True: both share the single default list

    # a = 1
    # b = 2
    #
    # a = [1, 2]
    # b = [3, 4]
    #
    # a = (1, 2)
    # b = (3, 4)
    #
    # c = add(a, b)
    #
    # print(c)
    # print(a, b)

Metaclass programming

property: dynamic attributes

from datetime import datetime, date

class User:
    def __init__(self, name, birthday):
        self.name = name
        self.birthday = birthday
        self._age = 0

    # def get_age(self):
    #     return datetime.now().year - self.birthday.year

    @property
    def age(self):
        return datetime.now().year - self.birthday.year

    @age.setter
    def age(self, value):
        self._age = value

if __name__ == '__main__':
    user = User("cwz", date(year=1994, month=5, day=10))
    user.age = 30
    print(user._age)
    print(user.age)

__getattr__ is triggered only when normal attribute lookup fails.

__getattribute__ is triggered on every attribute access, whether the attribute is found or not.

from datetime import date

class User:
    def __init__(self, name, birthday, info={}):  # mutable default: see "A classic mistake" above
        self.name = name
        self.birthday = birthday
        self.info = info

    def __getattr__(self, item):
        return self.info[item]


if __name__ == '__main__':
    user = User("cwz", date(year=1994, month=5, day=10), info={"height": 178})
    print(user.height)

Attribute descriptors

An example:

from datetime import datetime

class User:
    def __init__(self, name, birthday):
        self.name = name
        self.birthday = birthday
        self._age = 0

    @property
    def age(self):
        return datetime.now().year - self.birthday.year

    @age.setter
    def age(self, value):
        self._age = value

If we also wanted to validate that name is a string, we would have to repeat the same @age.setter-style boilerplate for every attribute.

Implementing any one of the following three magic methods makes an object an attribute descriptor:

class IntField:
    def __get__(self, instance, owner):
        pass

    def __set__(self, instance, value):
        pass

    def __delete__(self, instance):
        pass
import numbers

class IntField:
    # a data descriptor: implements both __get__ and __set__
    def __get__(self, instance, owner):
        return self.value

    def __set__(self, instance, value):
        if not isinstance(value, numbers.Integral):
            raise ValueError('int value need')
        # note: storing on self means this one value is shared by every User instance
        self.value = value

    def __delete__(self, instance):
        pass


class NonDataField:
    # a non-data descriptor: implements only __get__
    def __get__(self, instance, owner):
        pass

class User:
    age = IntField()

if __name__ == '__main__':
    user = User()
    user.age = 30
    print(user.age)


'''
If user is an instance of some class, user.age (and the equivalent
getattr(user, 'age')) first calls __getattribute__. If the class defines
__getattr__, it is called only when __getattribute__ raises AttributeError;
descriptor (__get__) invocation happens inside __getattribute__.
Given user = User(), the lookup order for user.age is:

(1) if "age" appears in the __dict__ of User or one of its bases and is a
    data descriptor, call its __get__ method; otherwise
(2) if "age" appears in user's own __dict__, return user.__dict__['age']; otherwise
(3) if "age" appears in the __dict__ of User or one of its bases:
    (3.1) if age is a non-data descriptor, call its __get__ method; otherwise
    (3.2) return __dict__['age']
(4) if User defines __getattr__, call __getattr__; otherwise
(5) raise AttributeError
'''

This lets you control how values are assigned to an object's attributes.
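
The IntField above has a flaw worth noting: self.value lives on the descriptor, and the descriptor lives on the class, so every User instance shares one value. A common fix is to key per-instance data on the instance itself; a sketch using a WeakKeyDictionary (so instances can still be garbage-collected):

import numbers
from weakref import WeakKeyDictionary

class IntField:
    def __init__(self):
        self._values = WeakKeyDictionary()  # one slot per instance

    def __get__(self, instance, owner):
        if instance is None:
            return self                     # accessed on the class itself
        return self._values.get(instance)

    def __set__(self, instance, value):
        if not isinstance(value, numbers.Integral):
            raise ValueError('int value need')
        self._values[instance] = value

class User:
    age = IntField()

u1, u2 = User(), User()
u1.age, u2.age = 30, 40
print(u1.age, u2.age)  # 30 40 -- no longer shared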

__new__ and __init__

class User:
    def __new__(cls, *args, **kwargs):
        print(" in new ")
        return super().__new__(cls)

    def __init__(self, name):
        print(" in init")

a = int()
# __new__ controls the creation of the object, before the object exists
# __init__ fleshes out the already-created object
# if __new__ does not return an instance of the class, __init__ is never called
if __name__ == "__main__":
    user = User(name="bobby")
# creating classes dynamically from a function
def create_class(name):
    if name == "user":
        class User:
            def __str__(self):
                return "user"
        return User
    elif name == "company":
        class Company:
            def __str__(self):
                return "company"
        return Company

if __name__ == '__main__':
    my_class = create_class("user")
    print(my_class())
# classes are objects too; type is the class that creates classes
def create_class(name):
    if name == "user":
        class User:
            def __str__(self):
                return "user"
        return User
    elif name == "company":
        class Company:
            def __str__(self):
                return "company"
        return Company

# creating a class dynamically with type
# User = type("User", (), {})

def say(self):
    return "i am user"
    # return self.name


class BaseClass():
    def answer(self):
        return "i am baseclass"


class MetaClass(type):
    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, *args, **kwargs)

from collections.abc import *

# what is a metaclass? a class that creates classes: object <- class (an object) <- type
class User(metaclass=MetaClass):
    def __init__(self, name):
        self.name = name

    def __str__(self):
        return "user"
# when Python builds the class it first looks for a metaclass and uses it to
# create the User class; the class object then creates the instances

if __name__ == "__main__":
    # MyClass = create_class("user")
    # my_obj = MyClass()
    # print(type(my_obj))

    # User = type("User", (BaseClass, ), {"name": "user", "say": say})
    my_obj = User(name="bobby")
    print(my_obj)

A simple ORM implemented with a metaclass

# requirement: declare models and persist them, Django-ORM style
import numbers


class Field:
    pass

class IntField(Field):
    # a data descriptor
    def __init__(self, db_column, min_value=None, max_value=None):
        self._value = None
        self.min_value = min_value
        self.max_value = max_value
        self.db_column = db_column
        if min_value is not None:
            if not isinstance(min_value, numbers.Integral):
                raise ValueError("min_value must be int")
            elif min_value < 0:
                raise ValueError("min_value must be positive int")
        if max_value is not None:
            if not isinstance(max_value, numbers.Integral):
                raise ValueError("max_value must be int")
            elif max_value < 0:
                raise ValueError("max_value must be positive int")
        if min_value is not None and max_value is not None:
            if min_value > max_value:
                raise ValueError("min_value must be smaller than max_value")

    def __get__(self, instance, owner):
        return self._value

    def __set__(self, instance, value):
        if not isinstance(value, numbers.Integral):
            raise ValueError("int value need")
        if self.min_value is not None and value < self.min_value:
            raise ValueError("value must be between min_value and max_value")
        if self.max_value is not None and value > self.max_value:
            raise ValueError("value must be between min_value and max_value")
        self._value = value


class CharField(Field):
    def __init__(self, db_column, max_length=None):
        self._value = None
        self.db_column = db_column
        if max_length is None:
            raise ValueError("you must specify max_length for CharField")
        self.max_length = max_length

    def __get__(self, instance, owner):
        return self._value

    def __set__(self, instance, value):
        if not isinstance(value, str):
            raise ValueError("string value need")
        if len(value) > self.max_length:
            raise ValueError("value length exceeds max_length")
        self._value = value


class ModelMetaClass(type):
    def __new__(cls, name, bases, attrs, **kwargs):
        if name == "BaseModel":
            return super().__new__(cls, name, bases, attrs, **kwargs)
        fields = {}
        for key, value in attrs.items():
            if isinstance(value, Field):
                fields[key] = value
        attrs_meta = attrs.get("Meta", None)
        _meta = {}
        db_table = name.lower()
        if attrs_meta is not None:
            table = getattr(attrs_meta, "db_table", None)
            if table is not None:
                db_table = table
        _meta["db_table"] = db_table
        attrs["_meta"] = _meta
        attrs["fields"] = fields
        attrs.pop("Meta", None)  # pop rather than del, so a model without Meta still works
        return super().__new__(cls, name, bases, attrs, **kwargs)


class BaseModel(metaclass=ModelMetaClass):
    def __init__(self, *args, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)
        return super().__init__()

    def save(self):
        fields = []
        values = []
        for key, value in self.fields.items():
            db_column = value.db_column
            if db_column is None:
                db_column = key.lower()
            fields.append(db_column)
            value = getattr(self, key)
            values.append(str(value))

        sql = "insert {db_table}({fields}) value({values})".format(
            db_table=self._meta["db_table"],
            fields=",".join(fields), values=",".join(values))
        pass  # executing the sql against a real database is omitted

class User(BaseModel):
    name = CharField(db_column="name", max_length=10)
    age = IntField(db_column="age", min_value=1, max_value=100)

    class Meta:
        db_table = "user"


if __name__ == "__main__":
    user = User(name="bobby", age=28)
    # user.name = "bobby"
    # user.age = 28
    user.save()

Python's iteration protocol

# what is the iteration protocol?
# what is an iterator? an iterator is one way of accessing the elements of a
# collection, generally used to traverse data
# unlike index-based access, an iterator cannot go backwards; it provides a
# lazy way of producing data
# [] list, __iter__

from collections.abc import Iterable, Iterator
a = [1, 2]
iter_rator = iter(a)
print(isinstance(a, Iterable))           # True
print(isinstance(iter_rator, Iterator))  # True

Iterators and iterables

from collections.abc import Iterator

class Company(object):
    def __init__(self, employee_list):
        self.employee = employee_list

    def __iter__(self):
        return MyIterator(self.employee)

    # def __getitem__(self, item):
    #     return self.employee[item]


class MyIterator(Iterator):
    def __init__(self, employee_list):
        self.iter_list = employee_list
        self.index = 0

    def __next__(self):
        # the logic that actually produces each value
        try:
            word = self.iter_list[self.index]
        except IndexError:
            raise StopIteration
        self.index += 1
        return word

if __name__ == "__main__":
    company = Company(["tom", "bob", "jane"])
    my_itor = iter(company)
    # while True:
    #     try:
    #         print(next(my_itor))
    #     except StopIteration:
    #         break

    # next(my_itor)
    for item in company:
        print(item)

Using generators

# a generator function: any function containing the yield keyword
def gen_func():
    yield 1
    yield 2
    yield 3

def fib(index):
    if index <= 2:
        return 1
    else:
        return fib(index - 1) + fib(index - 2)

def fib2(index):
    re_list = []
    n, a, b = 0, 0, 1
    while n < index:
        re_list.append(b)
        a, b = b, a + b
        n += 1
    return re_list

def gen_fib(index):
    n, a, b = 0, 0, 1
    while n < index:
        yield b
        a, b = b, a + b
        n += 1

for data in gen_fib(10):
    print(data)
# print(gen_fib(10))
# Fibonacci: 0 1 1 2 3 5 8
# lazy evaluation, which makes deferred computation possible

def func():
    return 1

if __name__ == "__main__":
    # the generator object is produced when Python compiles the bytecode
    gen = gen_func()
    for value in gen:
        print(value)
    # re = func()
    # pass

How Python implements generators

# 1. how Python functions work
import inspect
frame = None

def foo():
    bar()

def bar():
    global frame
    frame = inspect.currentframe()

# python.exe uses a C function, PyEval_EvalFrameEx, to execute foo;
# it first creates a stack frame
"""
everything in Python is an object: stack frame objects, bytecode objects.
When foo calls the subfunction bar, another stack frame is created.
All stack frames are allocated on heap memory, which means a frame can
outlive its caller.
"""
# import dis
# print(dis.dis(foo))

foo()
print(frame.f_code.co_name)
caller_frame = frame.f_back
print(caller_frame.f_code.co_name)


def gen_func():
    yield 1
    name = "bobby"
    yield 2
    age = 30
    return "imooc"

import dis
gen = gen_func()
print(dis.dis(gen))

# f_lasti: the last bytecode instruction executed; f_locals: the frame's locals
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)
next(gen)
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)
next(gen)
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)

class company:
    def __getitem__(self, item):
        pass

from collections import UserList

How generators are applied in UserList

Reading large files with a generator

# a 500 GB file whose content is one single line with a special separator
def myreadlines(f, newline):
    buf = ""
    while True:
        while newline in buf:
            pos = buf.index(newline)
            yield buf[:pos]
            buf = buf[pos + len(newline):]
        chunk = f.read(4096)

        if not chunk:
            # the end of the file has been reached
            yield buf
            break
        buf += chunk

with open("input.txt") as f:
    for line in myreadlines(f, "{|}"):
        print(line)

Socket programming

# server
import socket
import threading

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('0.0.0.0', 8000))
server.listen()


def handle_sock(sock, addr):
    while True:
        data = sock.recv(1024)
        print(data.decode("utf8"))
        re_data = input()
        sock.send(re_data.encode("utf8"))

# receive the data sent by the client, 1 KB at a time
while True:
    sock, addr = server.accept()

    # handle each newly accepted connection (user) in its own thread
    client_thread = threading.Thread(target=handle_sock, args=(sock, addr))
    client_thread.start()

    # data = sock.recv(1024)
    # print(data.decode("utf8"))
    # re_data = input()
    # sock.send(re_data.encode("utf8"))
    # server.close()
    # sock.close()



# client
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 8000))
while True:
    re_data = input()
    client.send(re_data.encode("utf8"))
    data = client.recv(1024)
    print(data.decode("utf8"))
# client.send("bobby".encode("utf8"))
# data = client.recv(1024)
# print(data.decode("utf8"))
# client.close()

Simulating an HTTP request with a socket

# requests -> urllib -> socket
import socket
from urllib.parse import urlparse


def get_url(url):
    # request the html through a raw socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # establish the socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    client.connect((host, 80))  # blocking, but blocking does not consume cpu

    # with a non-blocking socket you would instead have to poll in a while loop
    # whether the connection is ready, doing computation or issuing further
    # connection requests in the meantime

    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))

    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break

    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()

if __name__ == "__main__":
    import time
    start_time = time.time()
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        get_url(url)
    print(time.time() - start_time)

The GIL in Python

# gil: global interpreter lock (CPython)
# one Python thread corresponds to one C-level thread
# the gil ensures that at any moment only one thread executes bytecode on one
# cpu; threads cannot be mapped onto multiple cpus simultaneously

# the gil is released based on executed bytecode count and time slices,
# and it is released proactively on io operations
# import dis
# def add(a):
#     a = a + 1
#     return a
#
# print(dis.dis(add))

total = 0

def add():
    # 1. do something 1
    # 2. io operation
    # 3. do something 3
    global total
    for i in range(1000000):
        total += 1

def desc():
    global total
    for i in range(1000000):
        total -= 1

import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print(total)  # nondeterministic: += and -= are not atomic, even under the gil

Python multithreading

# for io-bound work, multithreading and multiprocessing perform similarly
# 1. instantiate the Thread class directly

import time
import threading

def get_detail_html(url):
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")


# 2. subclass Thread
class GetDetailHtml(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


class GetDetailUrl(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")

if __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    # daemon threads would be killed when the main thread exits
    print("last time: {}".format(time.time() - start_time))

Thread communication: shared variables and queues

# inter-thread synchronization through a Queue
from queue import Queue


import time
import threading


def get_detail_html(queue):
    # crawl the article detail pages
    while True:
        url = queue.get()
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")
        queue.task_done()  # mark this queue item as processed


def get_detail_url(queue):
    # crawl the article list page
    while True:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            queue.put("http://projectsedu.com/{id}".format(id=i))
        print("get detail url end")


# 1. thread communication: shared variables

if __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)

    thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    thread_detail_url.start()
    for i in range(10):
        html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,))
        html_thread.start()
    start_time = time.time()

    # join() blocks until every item that was put() has been marked with task_done()
    detail_url_queue.join()

Thread synchronization: Lock and RLock (reentrant lock)

from threading import Lock, RLock  # RLock: a reentrant lock

# within one thread, acquire may be called several times in a row,
# but the number of acquire calls must equal the number of release calls
total = 0
lock = RLock()

def add():
    global lock
    global total
    for i in range(1000000):
        lock.acquire()
        lock.acquire()
        total += 1
        lock.release()
        lock.release()


def desc():
    global total
    global lock
    for i in range(1000000):
        lock.acquire()
        total -= 1
        lock.release()

import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()

thread1.join()
thread2.join()
print(total)

# 1. locks cost performance
# 2. locks can cause deadlock
# the classic deadlock: two threads take the same two locks in opposite
# order, each holding one and waiting for the other (see the sketch below)
"""
A(a, b)
acquire(a)
acquire(b)

B(a, b)
acquire(b)
acquire(a)
"""

Thread synchronization: Condition (condition variable), for complex inter-thread coordination

import threading

# condition variable: for complex inter-thread synchronization
# (a first attempt with a plain Lock cannot guarantee strict alternation)
# class XiaoAi(threading.Thread):
#     def __init__(self, lock):
#         super().__init__(name="小爱")
#         self.lock = lock
#
#     def run(self):
#         self.lock.acquire()
#         print("{} : 在 ".format(self.name))
#         self.lock.release()
#
#         self.lock.acquire()
#         print("{} : 好啊 ".format(self.name))
#         self.lock.release()
#
# class TianMao(threading.Thread):
#     def __init__(self, lock):
#         super().__init__(name="天猫精灵")
#         self.lock = lock
#
#     def run(self):
#         self.lock.acquire()
#         print("{} : 小爱同学 ".format(self.name))
#         self.lock.release()
#
#         self.lock.acquire()
#         print("{} : 我们来对古诗吧 ".format(self.name))
#         self.lock.release()

# reciting a poem cooperatively via a Condition

class XiaoAi(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="小爱")
        self.cond = cond

    def run(self):
        with self.cond:
            self.cond.wait()
            print("{} : 在 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 好啊 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 君住长江尾 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 共饮长江水 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 此恨何时已 ".format(self.name))
            self.cond.notify()

            self.cond.wait()
            print("{} : 定不负相思意 ".format(self.name))
            self.cond.notify()

class TianMao(threading.Thread):
    def __init__(self, cond):
        super().__init__(name="天猫精灵")
        self.cond = cond

    def run(self):
        with self.cond:
            print("{} : 小爱同学 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我们来对古诗吧 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 我住长江头 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 日日思君不见君 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 此水几时休 ".format(self.name))
            self.cond.notify()
            self.cond.wait()

            print("{} : 只愿君心似我心 ".format(self.name))
            self.cond.notify()
            self.cond.wait()


if __name__ == "__main__":
    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)

    # the start order matters: XiaoAi must already be waiting before TianMao notifies
    # wait and notify may only be called after entering "with cond"
    # a Condition has two layers of locks: the underlying lock is released when a
    # thread calls wait; each wait also allocates a lock that is placed into the
    # condition's waiting queue, to be woken by notify
    xiaoai.start()
    tianmao.start()

Thread synchronization: using Semaphore

# Semaphore: a lock that controls how many holders may enter at once
# e.g. file read/write: usually one writer, but multiple readers allowed

# a crawler example
import threading
import time

class HtmlSpider(threading.Thread):
    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        time.sleep(2)
        print("got html text success")
        self.sem.release()  # give the slot back

class UrlProducer(threading.Thread):
    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            self.sem.acquire()  # decrement the semaphore count by 1; blocks at 0
            html_thread = HtmlSpider("https://baidu.com/{}".format(i), self.sem)
            html_thread.start()

if __name__ == "__main__":
    sem = threading.Semaphore(3)  # at most 3 spiders at a time
    url_producer = UrlProducer(sem)
    url_producer.start()

Thread pools

from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
from concurrent.futures import Future
from multiprocessing import Pool

# Future: the container a task's result is returned in


# why a thread pool?
# the main thread can query the state of a thread or task, and its return value
# the main thread learns immediately when a task finishes
# futures give multithreading and multiprocessing a uniform coding interface
import time

def get_html(times):
    time.sleep(times)
    print("get page {} success".format(times))
    return times


executor = ThreadPoolExecutor(max_workers=2)
# submit sends a function into the pool; it returns immediately
# task1 = executor.submit(get_html, (3))
# task2 = executor.submit(get_html, (2))


# collect the results of the tasks that have already succeeded
urls = [3, 2, 4]
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=FIRST_COMPLETED)
print("main")
# for future in as_completed(all_task):
#     data = future.result()
#     print("get {} page".format(data))
# executor.map yields the values of the completed tasks
# for data in executor.map(get_html, urls):
#     print("get {} page".format(data))


# # done() reports whether a task has finished
# print(task1.done())
# print(task2.cancel())
# time.sleep(3)
# print(task1.done())
#
# # result() retrieves the task's return value
# print(task1.result())

Multithreading vs. multiprocessing

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor
# multiprocessing
# use multiprocessing for cpu-bound work and multithreading for io-bound work;
# switching processes costs more than switching threads

# 1. for cpu-bound work, multiprocessing beats multithreading
# def fib(n):
#     if n <= 2:
#         return 1
#     return fib(n - 1) + fib(n - 2)
#
# if __name__ == "__main__":
#     with ThreadPoolExecutor(3) as executor:
#         all_task = [executor.submit(fib, (num)) for num in range(25, 40)]
#         start_time = time.time()
#         for future in as_completed(all_task):
#             data = future.result()
#             print("exe result: {}".format(data))
#
#         print("last time is: {}".format(time.time() - start_time))

# 2. for io-bound work, multithreading beats multiprocessing
def random_sleep(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, (num)) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time() - start_time))

Multiprocessing with the multiprocessing module

# import os
# # fork is only available on linux/unix
# pid = os.fork()
# print("bobby")
# if pid == 0:
#     print('child process {}, parent is {}.'.format(os.getpid(), os.getppid()))
# else:
#     print('I am the parent process: {}.'.format(pid))


import multiprocessing

# multiprocessing
import time
def get_html(n):
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    # progress = multiprocessing.Process(target=get_html, args=(2,))
    # print(progress.pid)
    # progress.start()
    # print(progress.pid)
    # progress.join()
    # print("main progress end")

    # use a process pool
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # result = pool.apply_async(get_html, args=(3,))
    #
    # # wait for all tasks to finish
    # pool.close()
    # pool.join()
    #
    # print(result.get())

    # imap yields results in submission order
    # for result in pool.imap(get_html, [1, 5, 3]):
    #     print("{} sleep success".format(result))

    # imap_unordered yields results as they complete
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print("{} sleep success".format(result))

Inter-process communication

import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe


# def producer(queue):
#     queue.put("a")
#     time.sleep(2)
#
# def consumer(queue):
#     time.sleep(2)
#     data = queue.get()
#     print(data)
#
# if __name__ == "__main__":
#     queue = Queue(10)
#     my_producer = Process(target=producer, args=(queue,))
#     my_consumer = Process(target=consumer, args=(queue,))
#     my_producer.start()
#     my_consumer.start()
#     my_producer.join()
#     my_consumer.join()

# communicating through a shared global variable
# shared globals do NOT work across processes; they only work across threads


# def producer(a):
#     a += 100
#     time.sleep(2)
#
# def consumer(a):
#     time.sleep(2)
#     print(a)
#
# if __name__ == "__main__":
#     a = 1
#     my_producer = Process(target=producer, args=(a,))
#     my_consumer = Process(target=consumer, args=(a,))
#     my_producer.start()
#     my_consumer.start()
#     my_producer.join()
#     my_consumer.join()

# multiprocessing.Queue cannot be used with a Pool;
# inside a pool, use the queue from Manager() instead

# def producer(queue):
#     queue.put("a")
#     time.sleep(2)
#
# def consumer(queue):
#     time.sleep(2)
#     data = queue.get()
#     print(data)
#
# if __name__ == "__main__":
#     queue = Manager().Queue(10)
#     pool = Pool(2)
#
#     pool.apply_async(producer, args=(queue,))
#     pool.apply_async(consumer, args=(queue,))
#
#     pool.close()
#     pool.join()

# communicating through a pipe
# a pipe outperforms a queue, but only connects two processes

# def producer(pipe):
#     pipe.send("bobby")
#
# def consumer(pipe):
#     print(pipe.recv())
#
# if __name__ == "__main__":
#     recevie_pipe, send_pipe = Pipe()
#     my_producer = Process(target=producer, args=(send_pipe,))
#     my_consumer = Process(target=consumer, args=(recevie_pipe,))
#
#     my_producer.start()
#     my_consumer.start()
#     my_producer.join()
#     my_consumer.join()

def add_data(p_dict, key, value):
    p_dict[key] = value

if __name__ == "__main__":
    # Manager also provides shared data structures such as a dict
    progress_dict = Manager().dict()

    first_progress = Process(target=add_data, args=(progress_dict, "bobby1", 22))
    second_progress = Process(target=add_data, args=(progress_dict, "bobby2", 23))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)

Coroutines

Concurrency, parallelism, synchronous, asynchronous, blocking, non-blocking

  • Concurrency: within a time span, several programs run on the same cpu, but at any single instant only one of them is running
  • Parallelism: at any instant, multiple programs are running simultaneously on multiple cpus
  • Synchronous: a call into an IO operation returns only after the IO has completed
  • Asynchronous: a call into an IO operation returns without waiting for the IO to complete

I/O multiplexing (select, poll and epoll)

The C10K problem: a technical challenge posed in 1999 — how to let a single server provide FTP service to 10,000 clients at once on a 1 GHz CPU with 2 GB of memory and a 1 Gbps network link

The five I/O models on Unix

  • blocking I/O
  • non-blocking I/O
  • I/O multiplexing
  • signal-driven I/O
  • asynchronous I/O (the POSIX aio_* family of functions)

After a call to select, the operating system reports which file handles are ready. select itself is a blocking call: if none of the handles are ready, it blocks. It can monitor many sockets at once and returns as soon as any one of them is ready.

select, poll and epoll are all I/O-multiplexing mechanisms: one process monitors many descriptors, and as soon as a descriptor becomes ready (typically readable or writable) the program is notified to perform the corresponding read or write. All three are essentially synchronous I/O, because once the readiness event fires the program must do the reading and writing itself, and that read or write is blocking. With asynchronous I/O the program does not do the read or write itself: the async I/O implementation copies the data from kernel space to user space on its behalf.

epoll is not always better than select:
with high concurrency but mostly idle connections, epoll beats select;
with low concurrency and very active connections, select beats epoll.

An HTTP request via non-blocking I/O:

# 1. epoll is not always better than select:
#    high concurrency, mostly idle connections: epoll beats select
#    low concurrency, very active connections: select beats epoll

# an http request via non-blocking io

import socket
from urllib.parse import urlparse


def get_url(url):
    # request the html through a raw socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # establish the socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)
    try:
        client.connect((host, 80))
    except BlockingIOError as e:
        # a non-blocking connect raises immediately; the connection
        # continues to be established in the background
        pass

    # keep polling whether the connection is ready; meanwhile the cpu is
    # free to do computation or issue further connection requests

    while True:
        try:
            client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
            break
        except OSError as e:
            pass

    data = b""
    while True:
        try:
            d = client.recv(1024)
        except BlockingIOError as e:
            continue
        if d:
            data += d
        else:
            break

    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()

if __name__ == "__main__":
    get_url("http://www.baidu.com")

select + callbacks + event loop

# select + callbacks + event loop
# high concurrency on a single thread

import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


selector = DefaultSelector()
# completing http requests with select
urls = []
stop = False


class Fetcher:
    def connected(self, key):
        selector.unregister(key.fd)
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # establish the socket connection
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)

        try:
            self.client.connect((self.host, 80))  # non-blocking connect
        except BlockingIOError as e:
            pass

        # register interest in writability, i.e. connection established
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)


def loop():
    # the event loop: keep polling socket state and invoke the matching callbacks
    # 1. select itself does not support a register-callback model
    # 2. the callback run on a socket state change is supplied by the programmer
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data
            call_back(key)
# callbacks + event loop + select (poll/epoll)

if __name__ == "__main__":
    import time
    start_time = time.time()
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        urls.append(url)
        fetcher = Fetcher()
        fetcher.get_url(url)
    loop()
    print(time.time() - start_time)



The pain of callbacks

  • What if a callback fails?
  • What if a callback must nest another callback? What about many levels of nesting?
  • If something goes wrong several levels deep, what are the consequences?
  • What if a piece of data must be handled by every callback?
  • How do you use the local variables of the current function?
  • ……

Poor readability, hard-to-manage shared state, hard-to-handle exceptions.

Coroutines

The C10M problem: how to sustain 10 million concurrent connections on an 8-core CPU with 64 GB of memory and a 10 Gbps network

The problems to solve:

  • the callback model is complex to code against
  • synchronous code has poor concurrency
  • multithreaded code needs inter-thread synchronization
  • we want to write asynchronous code in a synchronous style
  • we want to switch tasks on a single thread
def get_url(url):
    # do something 1
    html = get_html(url)  # pause here and switch to another function
    # parse html
    urls = parse_url(html)

# the traditional call chain is A -> B -> C
# what we need is a function that can be paused, and resumed at the right moment
# hence coroutines: functions with multiple entry points, which can be paused,
# and which can receive values at the point where they were paused

Generators, advanced: the send, close and throw methods

A generator can not only produce values, it can also receive them.

The send method:

  • a generator can be started in two ways: next() or send()
  • send passes a value into the generator and also resumes it up to the next yield
  • before sending a non-None value you must start the generator first, either with gen.send(None) or with next(gen)
def gen_func():
    # can produce values, and can receive values (sent in by the caller)
    html = yield "https://www.jd.com"
    print(html)
    yield 2
    yield 3
    return "cwz"


if __name__ == '__main__':
    gen = gen_func()
    # url = next(gen)
    # before sending a non-None value, start the generator: gen.send(None) or next(gen)
    gen.send(None)
    # simulate the page content
    html_content = "网址内容"
    print(gen.send(html_content))  # send passes a value in and resumes to the next yield


# Output:
# 网址内容
# 2

The close method:

  • closes the generator; if the generator swallows GeneratorExit and yields again, you get RuntimeError: generator ignored GeneratorExit
  • the exception is raised at the yield where the generator is currently suspended
  • if the generator does not catch it, nothing propagates; after gen.close(), a further next(gen) raises StopIteration
def gen_func():
    try:
        yield "https://www.baidu.com"
    except GeneratorExit:
        pass
    yield 2
    yield 3
    return "cwz"


if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    gen.close()  # RuntimeError: generator ignored GeneratorExit (it yielded again)
    # print(next(gen))

The throw method:

  • raises an exception inside the generator at the current yield; unlike close, you must catch it yourself
def gen_func():
    try:
        yield "https://www.baidu.com"
    except Exception:
        pass
    yield 2
    yield 3
    return "cwz"


if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    gen.throw(Exception, "download error")
    print(next(gen))

yield from

Python 3.3 added the yield from syntax

from itertools import chain

my_list = [1, 2, 3]
my_dict = {
    "name": "reese",
    "age": 18
}

for value in chain(my_list, my_dict, range(2, 5)):
    print(value)

chain accepts any number of iterables.

Implementing chain ourselves:

my_list = [1, 2, 3]
my_dict = {
    "name": "reese",
    "age": 18
}


def my_chain(*args, **kwargs):
    for iterable_value in args:
        for value in iterable_value:
            yield value


for i in my_chain(my_list, my_dict):
    print(i)

yield from + an iterable — an example showing the difference between yield from and yield:

def g1(iterable):
    yield iterable

def g2(iterable):
    yield from iterable

for i in g1(range(5)):
    print(i)

for i in g2(range(5)):
    print(i)


# Output:
# range(0, 5)
# 0
# 1
# 2
# 3
# 4

Some yield from concepts:

def g1(gen):
    yield from gen

def main():
    g = g1(gen)  # gen stands for some sub-generator
    g.send(None)


# main is the caller, g1 is the delegating generator, gen is the sub-generator
# yield from establishes a two-way channel between the caller and the sub-generator
# in a traditional call, g1 would call a subroutine whose return value goes back to
# g1 and then on to main; with yield from, main talks to gen directly:
# main's send goes straight through to the sub-generator

Using yield from:

final_result = {}


# the sub-generator: accumulates the values sent in from outside
def sales_sum(pro_name):
    total = 0
    nums = []
    while True:
        # keep receiving values from outside, accumulating into nums and total
        x = yield
        print(pro_name + "销量: ", x)
        # a value of None from outside ends the loop
        if not x:
            break
        total += x
        nums.append(x)
    return total, nums


# the delegating generator
def middle(key):
    while True:
        final_result[key] = yield from sales_sum(key)
        print(key + "销量统计完成!!")


def main():
    data_sets = {
        "小明牌面膜": [1200, 1500, 3000],
        "小明牌手机": [28, 55, 98, 108],
        "小明牌大衣": [280, 560, 778, 70],
    }
    for key, data_set in data_sets.items():
        print("start key:", key)
        m = middle(key)
        m.send(None)  # prime the middle coroutine; sends pass through to sales_sum
        for value in data_set:
            m.send(value)  # feed each value in the group to the coroutine
        m.send(None)
    print("final_result:", final_result)


if __name__ == '__main__':
    main()

What happens if, in the code above, we drive the sub-generator sales_sum directly instead of going through yield from:

def sales_sum(pro_name):
    total = 0
    nums = []
    while True:
        # keep receiving values from outside, accumulating into nums and total
        x = yield
        print(pro_name + "销量: ", x)
        # a value of None from outside ends the loop
        if not x:
            break
        total += x
        nums.append(x)
    return total, nums

if __name__ == "__main__":
    my_gen = sales_sum("小明牌手机")
    my_gen.send(None)
    my_gen.send(1200)
    my_gen.send(1500)
    my_gen.send(3000)
    try:
        my_gen.send(None)
    except StopIteration as e:
        result = e.value
        print(result)

We have to handle StopIteration by hand to get the computed value out — exactly the work yield from does for us when it assigns final_result[key].

How yield from works:

The basic version:

# PEP 380 specifies yield from

# 1. RESULT = yield from EXPR can be simplified to the following
# notation:
"""
_i: the sub-generator, which is also an iterator
_y: a value produced by the sub-generator
_r: the final value of the yield from expression
_s: a value the caller sends in via send()
_e: an exception object
"""

_i = iter(EXPR)  # EXPR is an iterable; _i is in effect the sub-generator
try:
    _y = next(_i)  # prime the sub-generator; store its first value in _y
except StopIteration as _e:
    _r = _e.value  # on StopIteration, its value attribute is the result (the simplest case)
else:
    while 1:  # the delegating generator blocks in this loop
        _s = yield _y  # re-yield the sub-generator's value; wait for the caller's send(), stored in _s
        try:
            _y = _i.send(_s)  # forward _s and try to advance
        except StopIteration as _e:
            _r = _e.value  # the sub-generator finished: grab the value, leave the loop, resume the delegator
            break
RESULT = _r  # _r is the value of the whole yield from expression

The above handles only the simple case; these cases must also be covered:

  • the sub-generator may be a plain iterator rather than a generator acting as a coroutine, so it has no .throw() or .close()
  • even if it has .throw() and .close(), those calls may themselves raise inside the sub-generator
  • the caller may ask the sub-generator itself to raise an exception
  • when the caller uses next() or .send(None), next() must be called on the sub-generator; only a non-None .send() is forwarded to the sub-generator's .send()

The full logic:

import sys

_i = iter(EXPR)
try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value
else:
    while 1:
        try:
            _s = yield _y
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                if _s is None:
                    _y = next(_i)
                else:
                    _y = _i.send(_s)
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r
  • Everything above the while 1 matches the basic version: get an iterator, call next(), handle StopIteration. Inside the loop, a GeneratorExit may arrive from outside (e.g. Ctrl+C): the code looks up _i.close, but _i may be a plain iterator rather than a generator, in which case the attribute lookup raises AttributeError, which is ignored; otherwise close() is called, and the exception is then re-raised.
  • BaseException is the parent of all exceptions: the code tries to fetch the sub-generator's throw() method; if present, it is invoked and any resulting StopIteration is handled.
  • If no exception occurred, next(_i) is called when _s is None, _i.send(_s) otherwise, again handling StopIteration at the end.

The key points (see the small sketch below for the second one):

  • values the sub-generator produces are passed straight through to the caller; values the caller passes via .send() go straight to the sub-generator: None triggers the sub-generator's __next__(), anything else its .send()
  • when the sub-generator exits, its final return EXPR raises StopIteration(EXPR)
  • the value of the yield from expression is the first argument of the StopIteration raised when the sub-generator terminates
  • if StopIteration is raised during the call, the delegating generator resumes; any other exception bubbles up
  • any exception thrown into the delegating generator, except GeneratorExit, is forwarded to the sub-generator's .throw(); if that call raises StopIteration, the delegating generator resumes; other exceptions bubble up
  • if .close() is called on the delegating generator or GeneratorExit is thrown into it, the sub-generator's .close() is called if it has one; if that call raises, the exception bubbles up, otherwise the delegating generator raises GeneratorExit
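
The second point is easy to verify directly: return EXPR inside a generator surfaces as StopIteration(EXPR) (tiny sketch):

def sub_gen():
    yield 1
    return "done"

g = sub_gen()
next(g)
try:
    next(g)
except StopIteration as e:
    print(e.value)  # done -- exactly what a yield from expression would evaluate to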

async and await: defining native coroutines

  • to make the semantics explicit, Python introduced the async and await keywords for defining native coroutines
from collections.abc import Awaitable

# an Awaitable object is essentially one that implements __await__
# a function defined with async implements __await__

async def downloader(url):
    return "url"


async def download_url(url):
    html = await downloader(url)  # await takes an Awaitable; think of it roughly as yield from
    return html


if __name__ == '__main__':
    coro = download_url("https://www.baidu.com")
    # next(coro)  # a native coroutine cannot be driven with next()
    coro.send(None)

Coroutines via decorator + generator (not recommended)

How generators turn into coroutines:

We want coroutines schedulable on a single thread, so the operating system does not have to schedule them the way it schedules threads. Coroutines are scheduled by the programmer: processes and threads are scheduled at kernel level, while coroutines are scheduled at function level. We want to decide ourselves when a coroutine runs, and to write asynchronous code the way we write synchronous code. Generators give us exactly these capabilities.

Generators have state:

# a generator is a pausable function
import inspect
def gen_func():
    yield 1
    return "cwz"

if __name__ == '__main__':
    gen = gen_func()
    print(inspect.getgeneratorstate(gen))  # GEN_CREATED
    next(gen)
    print(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED
    try:
        next(gen)
    except StopIteration:
        pass
    print(inspect.getgeneratorstate(gen))  # GEN_CLOSED

As the code above shows, generators already have much of the machinery needed to implement coroutines on top of them.

def gen_func():
    value = yield 1
    return "cwz"

In the code above, value = yield 1 carries two meanings (see the sketch after this paragraph):

  • first, it yields a value out to the caller
  • second, the caller can pass a value back into the generator via send

With these two capabilities the generator can be viewed as a coroutine: it can now consume data passed in from outside. A plain generator was only a producer that yields values out; now it is also a consumer that receives values and processes them. This pattern already qualifies as a coroutine, and with yield from even more so.
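
A minimal consumer coroutine in that spirit (a sketch; the running average is just an example):

def averager():
    total, count = 0, 0
    while True:
        value = yield (total / count if count else None)  # produce, then wait for send()
        total += value
        count += 1

avg = averager()
avg.send(None)       # prime the coroutine first
print(avg.send(10))  # 10.0
print(avg.send(20))  # 15.0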

# 1. write asynchronous code in a synchronous style: pause the function at
#    the right moment and resume it at the right moment
# (a sketch, not runnable as-is: host, selector, EVENT_WRITE and connected
#  are assumed from the earlier select example)
import socket
def get_socket_data():
    yield "bobby"

def downloader(url):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)

    try:
        client.connect((host, 80))
    except BlockingIOError as e:
        pass

    selector.register(client.fileno(), EVENT_WRITE, connected)
    source = yield from get_socket_data()
    data = source.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)

def download_html(html):
    html = yield from downloader(html)

if __name__ == "__main__":
    # coroutine scheduling is still event loop + coroutines; coroutines are single-threaded
    pass

Concurrent programming with asyncio

Introduction to the asyncio module

  • a pluggable event loop with various system-specific implementations

  • transport and protocol abstractions

  • concrete support for TCP, UDP, SSL, subprocesses, delayed calls and more
  • a Future class that mimics the futures module but is adapted to the event loop
  • protocols and tasks based on yield from, letting you write concurrent code in a sequential style
  • an interface for handing calls that must perform blocking IO off to a thread pool
  • synchronization primitives mimicking the threading module, usable between coroutines on a single thread

Using asyncio

import asyncio
import time


async def get_html(url):
    print("start get html")
    await asyncio.sleep(3)
    print("end get html")


if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_html("https://www.baidu.com"))
    print("use time:", time.time() - start)

Simulating requests to 10 pages

import asyncio
import time


async def get_html(url):
    print("start get html")
    await asyncio.sleep(3)
    print("end get html")


if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html("https://www.baidu.com") for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    print("use time:", time.time() - start)

However many tasks there are, the total time stays around 3 seconds.

Never call blocking synchronous code inside async code: if await asyncio.sleep(3) above were replaced with time.sleep(3), the tasks would run one after another and the concurrency would be lost.

Getting the return value

import asyncio
import time

# getting a coroutine's return value
async def get_html(url):
    print("start get html")
    await asyncio.sleep(3)
    print("end get html")
    return "cwz"


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    get_feature = asyncio.ensure_future(get_html("https://www.baidu.com"))
    loop.run_until_complete(get_feature)
    print(get_feature.result())

Or like this:

import asyncio
import time

# getting a coroutine's return value
async def get_html(url):
    print("start get html")
    await asyncio.sleep(3)
    print("end get html")
    return "cwz"


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # get_feature = asyncio.ensure_future(get_html("https://www.baidu.com"))
    task = loop.create_task(get_html("https://www.baidu.com"))
    loop.run_until_complete(task)
    print(task.result())

ensure_future ultimately calls create_task: it wraps the coroutine into a task and registers the coroutine with the loop's queue. The source of ensure_future:

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')

Adding your own callback when a coroutine finishes

import asyncio
from functools import partial


# getting a coroutine's return value
async def get_html(url):
    print("start get html")
    await asyncio.sleep(3)
    print("end get html")
    return "cwz"


# a callback to run after the coroutine finishes
def callback(url, future):
    print(future)  # this future is the task itself
    print("send email to cwz...")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("https://www.baidu.com"))
    task.add_done_callback(partial(callback, "https://www.baidu.com"))
    loop.run_until_complete(task)
    print(task.result())

wait and gather:

gather is more convenient and higher-level: it supports grouping tasks and can cancel a whole group.

import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html("http://www.baidu.com") for i in range(10)]

    # the difference between gather and wait:
    # gather is more high-level and can group tasks
    group1 = [get_html("http://projectsedu.com") for i in range(2)]
    group2 = [get_html("http://www.baidu.com") for i in range(2)]
    group1 = asyncio.gather(*group1)
    group2 = asyncio.gather(*group2)
    group2.cancel()  # the cancellation propagates as CancelledError unless gather(..., return_exceptions=True)
    loop.run_until_complete(asyncio.gather(group1, group2))
    print(time.time() - start_time)

Cancelling tasks:

# 1. run_until_complete vs run_forever
# import asyncio
# loop = asyncio.get_event_loop()
# loop.run_forever()
# loop.run_until_complete()

# 1. the loop wraps coroutines in futures
# 2. futures (tasks) can be cancelled

import asyncio
import time

async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))


if __name__ == "__main__":
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(3)

    tasks = [task1, task2, task3]

    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        all_tasks = asyncio.Task.all_tasks()
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        loop.stop()
        loop.run_forever()  # the loop must run again for the cancellations to take effect
    finally:
        loop.close()

call_soon, call_at, call_later, call_soon_threadsafe

import asyncio


def callback(sleep_times, loop):
    print("success time {}".format(loop.time()))


def stoploop(loop):
    loop.stop()


# call_later, call_at
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    now = loop.time()
    loop.call_at(now + 2, callback, 2, loop)
    loop.call_at(now + 1, callback, 1, loop)
    loop.call_at(now + 3, callback, 3, loop)
    # loop.call_soon(stoploop, loop)
    loop.call_soon(callback, 4, loop)
    loop.run_forever()

ThreadPoolExecutor + asyncio for blocking I/O requests

# using threads: integrating blocking io into coroutines
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse


def get_url(url):
    # request the html through a raw socket
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # establish the socket connection
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    client.connect((host, 80))  # blocking, but blocking does not consume cpu

    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))

    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break

    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()


if __name__ == "__main__":
    import time

    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time:{}".format(time.time() - start_time))

Simulating an HTTP request with asyncio

# asyncio provides no http-level interface; use aiohttp for that
import asyncio
import socket
from urllib.parse import urlparse


async def get_url(url):
    # request the html through asyncio streams
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # establish the connection
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    all_lines = []
    async for raw_line in reader:
        data = raw_line.decode("utf8")
        all_lines.append(data)
    html = "\n".join(all_lines)
    return html


async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)


if __name__ == "__main__":
    import time

    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('last time:{}'.format(time.time() - start_time))

async with invokes __aenter__ on entry (and __aexit__ on exit); await invokes __await__

A high-concurrency crawler with aiohttp

# an asyncio crawler: fetch, deduplicate, store
import asyncio
import re
import aiohttp
import aiomysql
from pyquery import PyQuery

stopping = False
start_url = "http://www.jobbole.com/xinwen/xwyd/"
waiting_urls = []  # urls waiting to be crawled
seen_urls = set()  # urls already crawled

sem = asyncio.Semaphore(3)


async def fetch(url, session):
    async with sem:
        try:
            async with session.get(url) as resp:
                print("url status: {}".format(resp.status))
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            print(e)


# parsing is cpu work with no io involved, so no coroutine is needed
def extract_urls(html):
    urls = []
    pq = PyQuery(html)
    for link in pq.items("a"):
        url = link.attr("href")
        if url and url.startswith("http") and url not in seen_urls:
            urls.append(url)
            waiting_urls.append(url)
    return urls


# collect the urls to crawl from the start page
async def init_urls(url, session):
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)  # parse every url out of the html


async def article_handler(url, session, pool):
    # fetch an article's detail page, parse it and store it
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)
    pq = PyQuery(html)
    title = pq("title").text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # (string formatting into SQL is unsafe; parameterized queries are preferable)
            insert_sql = "insert into article(title) values('{}')".format(title)
            await cur.execute(insert_sql)


# pull urls from waiting_urls and launch coroutines to crawl them;
# the consumer keeps pulling data, creates a coroutine per url and throws it
# into the asyncio event loop, treating asyncio as a coroutine pool
async def consumer(pool):
    async with aiohttp.ClientSession() as session:
        while not stopping:
            # if the consumer drains faster than the producer fills, back off briefly
            if len(waiting_urls) == 0:
                await asyncio.sleep(0.5)
                continue

            url = waiting_urls.pop()  # take a url
            print("start get url: {}".format(url))

            if re.match(r"http://.*?jobbole.com/xinwen/xwyd/\d+/", url):
                # not crawled yet: submit a coroutine that fetches,
                # parses and stores this url
                if url not in seen_urls:
                    asyncio.ensure_future(article_handler(url, session, pool))
                    await asyncio.sleep(30)
            # else:
            #     if url not in seen_urls:
            #         asyncio.ensure_future(init_urls(url, session))


async def main(loop):
    # wait for the mysql connection pool to be established
    pool = await aiomysql.create_pool(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        db='test1',
        loop=loop,
        charset='utf8',
        autocommit=True
    )
    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)
    asyncio.ensure_future(consumer(pool))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()

Python's concurrency options:

  • tornado
  • twisted
  • gevent
  • asyncio

tornado and twisted are built on Python generators.

gevent implements the underlying coroutines in C and relies on monkey patching: it patches many of Python's built-in libraries. That makes it easy to use, but many internal exceptions become hard to catch.

Why is asyncio so involved, and why is coroutine programming so much harder than multiprocess or multithreaded programming? Because coroutines are scheduled by the programmer. Nobody wants everyday application code to do that scheduling by hand, so frameworks like asyncio appeared: they do the coroutine scheduling while letting us write asynchronous code in a synchronous style.

When we write multiprocess or multithreaded code, thread and process scheduling is done by the operating system, which for most programmers is a black box: the OS contains a great deal of scheduling logic that we never have to see, so we simply write the threads. Coroutines are different: the programming model shifts from blocking-IO code straight to asynchronous non-blocking code, which is a big transition, and in making it we are forced to understand how coroutines are scheduled.

我们平常在编写多进程多线程的代码时,线程调度和进程调度是由操作系统来完成的,操作系统对于我们很多程序员来说是一个黑盒,实际上操作系统在调度的过程中有非常多的逻辑,操作系统完成了这些逻辑,很多人又不会去接触操作系统的原理,所以操作系统本身的调度过程我们是不清楚的。而我们只要写出多线程就行了,但是协程不一样,协程编程模式由阻塞IO的编码方式直接转换成异步非阻塞的编码方式,这是一个很大的转变。在这个转变过程中,我们不得不里了解其中协程的调度过程。