python中一切皆对象 函数和类也是对象,属于python的一等公民
赋值给一个变量
可以添加到集合对象中
可以作为参数传递给函数
可以当做函数的返回值
type、object和class之间的关系 type可以生成一个类,可以返回对象的类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 a = 1 b = "abc" print (type (1 )) print (type (int )) print (type (b))print (type (str ))class Student : pass class MyStudent (Student ): pass stu = Student() print (type (stu)) print (type (Student)) print (int .__bases__) print (Student.__bases__) print (MyStudent.__bases__) print (type .__bases__) print (object .__bases__) print (type (object ))
# Magic methods demo: __add__ and __str__ plug a class into + and str().
class MyVector(object):
    """A 2D vector supporting + (component-wise) and readable printing."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __add__(self, other):
        # Return a new vector; neither operand is mutated.
        return MyVector(self.x + other.x, self.y + other.y)

    def __str__(self):
        return "x:{x}, y:{y}".format(x=self.x, y=self.y)


first_vec = MyVector(1, 2)
second_vec = MyVector(3, 4)
print(first_vec + second_vec)
# Abstract base classes (collections.abc): implementing __len__ is enough
# for isinstance(obj, Sized) to be True, via __subclasshook__ duck typing.
class Company(object):
    """A container-like class; len(company) counts its employees."""

    def __init__(self, employee_list):
        self.employee = employee_list

    def __len__(self):
        return len(self.employee)


com = Company(['reese', 'neo'])

from collections.abc import Sized
print(isinstance(com, Sized))
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import abcclass CacheBase (metaclass=abc.ABCMeta): @abc.abstractmethod def get (self, key ): pass @abc.abstractmethod def set (self, key, value ): pass class RedisCache (CacheBase ): pass redis_cache = RedisCache()
python的自省机制 自省是通过一定的机制查询到对象的内部结构
# Introspection demo: __dict__ holds only instance attributes, not the
# class attribute `name` inherited from Person.
class Person:
    name = "user"


class Student(Person):
    def __init__(self, school_name):
        self.school_name = school_name


if __name__ == "__main__":
    user = Student("cwz")
    print(user.__dict__)
# Does super() really call "the parent"?  No — it follows the MRO
# (C3 linearization), so D() prints D, B, C, A.
class A:
    def __init__(self):
        print('A')


class B(A):
    def __init__(self):
        print("B")
        super().__init__()


class C(A):
    def __init__(self):
        print("C")
        # Explicit two-argument form, equivalent to super() here.
        super(C, self).__init__()


class D(B, C):
    def __init__(self):
        print("D")
        super(D, self).__init__()


if __name__ == '__main__':
    D()
mixin模式特点:
Mixin类功能单一
不和基类关联,可以和任意基类组合
在mixin中不要使用super这种用法
with语句 上下文管理协议
# Context-manager protocol: __enter__ runs on entering `with`,
# __exit__ runs on leaving it (even on exceptions).
class Simple:
    def __enter__(self):
        print("enter")
        # The value bound by `as` is whatever __enter__ returns.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning None (falsy) lets any exception propagate.
        print("exit")

    def do_something(self):
        print("do something")


with Simple() as simple:
    simple.do_something()
contextlib
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import contextlib@contextlib.contextmanager def test (x ): print ("start..." ) yield print ("end..." ) with test(1 ) as f: print ("进行中" ) start... 进行中 end...
自定义序列类
python中序列类型的abc继承关系
1 from collections import abc
切片 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 """ 其中,第一个数字start表示切片开始位置,默认为0; 第二个数字end表示切片截止(但不包含)位置(默认为列表长度); 第三个数字step表示切片的步长(默认为1)。 当start为0时可以省略,当end为列表长度时可以省略, 当step为1时可以省略,并且省略步长时可以同时省略最后一个冒号。 另外,当step为负整数时,表示反向切片,这时start应该比end的值要大才行。 """ aList = [3 , 4 , 5 , 6 , 7 , 9 , 11 , 13 , 15 , 17 ] print (aList[::]) print (aList[::-1 ]) print (aList[::2 ]) print (aList[1 ::2 ]) print (aList[3 :6 ]) aList[0 :100 ] aList[100 :] aList[len (aList):] = [9 ] aList[:0 ] = [1 , 2 ] aList[3 :3 ] = [4 ] aList[:3 ] = [1 , 2 ] aList[3 :] = [4 , 5 , 6 ] aList[::2 ] = [0 ] * 3 print (aList)aList[::2 ] = ['a' , 'b' , 'c' ] aList[::2 ] = [1 ,2 ] aList[:3 ] = [] del aList[:3 ] del aList[::2 ]
# Custom sliceable object: implementing __getitem__ with slice support,
# plus __len__ / __iter__ / __contains__ / __reversed__.
import numbers


class Group:
    """A named group of staff members supporting the sequence protocol.

    Indexing with a slice returns a new Group over the sliced staff list;
    indexing with an integer returns a single-member Group.
    """

    def __init__(self, group_name, company_name, staffs):
        self.group_name = group_name
        self.company_name = company_name
        self.staffs = staffs

    def __reversed__(self):
        # Bug fix: __reversed__ must return an iterator; the original
        # reversed self.staffs in place and returned None, so
        # reversed(group) yielded None and mutated the group.
        return reversed(self.staffs)

    def __getitem__(self, item):
        cls = type(self)
        if isinstance(item, slice):
            return cls(group_name=self.group_name,
                       company_name=self.company_name,
                       staffs=self.staffs[item])
        if isinstance(item, numbers.Integral):
            return cls(group_name=self.group_name,
                       company_name=self.company_name,
                       staffs=[self.staffs[item]])
        # Bug fix: unsupported key types used to fall through and
        # silently return None; raise like builtin sequences do.
        raise TypeError("Group indices must be integers or slices")

    def __len__(self):
        return len(self.staffs)

    def __iter__(self):
        return iter(self.staffs)

    def __contains__(self, item):
        # Idiom fix: direct boolean return instead of if/else True/False.
        return item in self.staffs


staffs = ["cwz", "reese", "neo", "setcreed"]
group = Group("user", "imooc", staffs)
print(group[:2])
print(len(group))
for user in group:
    print(user)
reversed(group)
if "cwz" in group:
    print("ok")
# bisect keeps a list sorted: insort does a sorted insert,
# bisect returns the insertion point to the right of equal values.
import bisect

inter_list = []
for number in (3, 1, 5, 7, 0):
    bisect.insort(inter_list, number)

print(inter_list)
print(bisect.bisect(inter_list, 3))
什么时候不使用列表
列表推导式、生成器表达式、字典推导式 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 int_list = [1 ,2 ,3 ,4 ,5 ] qu_list = [item * item for item in int_list] print (type (qu_list))int_list = [1 ,2 ,-3 ,4 ,5 ] qu_list = [item if item > 0 else abs (item) for item in int_list] int_list1 = [1 ,2 ] int_list2 = [3 ,4 ] qu_list = [(first, second) for first in int_list1 for second in int_list2] my_dict = { "key1" :"bobby1" , "key2" :"bobby2" } int_list = [1 ,2 ,3 ,4 ,5 ] def process_item (item ): return str (item) int_dict = {process_item(item):item for item in int_list} print (int_dict)
dict的abc继承关系 1 2 3 4 5 from collections.abc import Mapping, MutableMappinga = {} print (isinstance (a, MutableMapping))
# Subclass UserDict (not dict): UserDict routes every write — including
# the constructor's — through __setitem__, so the override always applies.
from collections import UserDict, defaultdict


class Mydict(UserDict):
    """A dict that stores the double of every assigned value."""

    def __setitem__(self, key, item):
        return super().__setitem__(key, item * 2)


my_dict = Mydict(one=2)
print(my_dict)
set和frozenset
dict和set实现原理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 from random import randintdef load_list_data (total_nums, target_nums ): """ 从文件中读取数据,以list的方式返回 :param total_nums: 读取的数量 :param target_nums: 需要查询的数据的数量 """ all_data = [] target_data = [] file_name = "G:/慕课网课程/AdvancePython/fbobject_idnew.txt" with open (file_name, encoding="utf8" , mode="r" ) as f_open: for count, line in enumerate (f_open): if count < total_nums: all_data.append(line) else : break for x in range (target_nums): random_index = randint(0 , total_nums) if all_data[random_index] not in target_data: target_data.append(all_data[random_index]) if len (target_data) == target_nums: break return all_data, target_data def load_dict_data (total_nums, target_nums ): """ 从文件中读取数据,以dict的方式返回 :param total_nums: 读取的数量 :param target_nums: 需要查询的数据的数量 """ all_data = {} target_data = [] file_name = "G:/慕课网课程/AdvancePython/fbobject_idnew.txt" with open (file_name, encoding="utf8" , mode="r" ) as f_open: for count, line in enumerate (f_open): if count < total_nums: all_data[line] = 0 else : break all_data_list = list (all_data) for x in range (target_nums): random_index = randint(0 , total_nums-1 ) if all_data_list[random_index] not in target_data: target_data.append(all_data_list[random_index]) if len (target_data) == target_nums: break return all_data, target_data def find_test (all_data, target_data ): test_times = 100 total_times = 0 import time for i in range (test_times): find = 0 start_time = time.time() for data in target_data: if data in all_data: find += 1 last_time = time.time() - start_time total_times += last_time return total_times/test_times if __name__ == "__main__" : all_data, target_data = load_dict_data(2000000 , 1000 ) last_time = find_test(all_data, target_data) print 
(last_time)
数组是一个连续的空间,不需要从头到尾遍历
python的变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 a = 1 a = "abc" a = [1 ,2 ,3 ] b = a print (id (a), id (b))print (a is b)a = [1 ,2 ,3 ,4 ] b = [1 ,2 ,3 ,4 ] class People : pass person = People() if type (person) is People: print ("yes" )
# A classic pitfall, shown on purpose: a mutable default argument is
# created ONCE at def-time and shared by every call that omits it.
def add(a, b):
    # For mutable a (e.g. a list) += mutates the caller's object in place.
    a += b
    return a


class Company:
    """Demo class: `staffs=[]` is one shared list for all defaulted instances."""

    def __init__(self, name, staffs=[]):
        self.name = name
        self.staffs = staffs

    def add(self, staff_name):
        self.staffs.append(staff_name)

    def remove(self, staff_name):
        self.staffs.remove(staff_name)


if __name__ == "__main__":
    com1 = Company("com1", ["bobby1", "bobby2"])
    com1.add("bobby3")
    com1.remove("bobby1")
    print(com1.staffs)

    com2 = Company("com2")
    com2.add("bobby")
    print(com2.staffs)
    # The shared default is visible on the function object itself:
    print(Company.__init__.__defaults__)

    com3 = Company("com3")
    com3.add("bobby5")
    print(com2.staffs)
    print(com3.staffs)
    print(com2.staffs is com3.staffs)
# property demo: `age` is computed on every access from the birth year;
# the setter only records the assigned value in _age (the getter ignores it).
from datetime import datetime, date


class User:
    def __init__(self, name, birthday):
        self.name = name
        self.birthday = birthday
        self._age = 0

    @property
    def age(self):
        return datetime.now().year - self.birthday.year

    @age.setter
    def age(self, value):
        self._age = value


if __name__ == '__main__':
    user = User("cwz", date(year=1994, month=5, day=10))
    user.age = 30
    print(user._age)
    print(user.age)
__getattr__
找不到属性触发,
__getattribute__
不管有没有找到属性都会触发
# __getattr__ demo: called only when normal attribute lookup fails,
# so user.height falls through to the info dict.
from datetime import date


class User:
    """User whose missing attributes are resolved from an `info` dict."""

    def __init__(self, name, birthday, info=None):
        self.name = name
        self.birthday = birthday
        # Bug fix: the original default `info={}` is created once and
        # shared by every instance constructed without an explicit info.
        self.info = {} if info is None else info

    def __getattr__(self, item):
        # Raises KeyError (not AttributeError) for unknown keys,
        # matching the original behavior.
        return self.info[item]


if __name__ == '__main__':
    user = User("cwz", date(year=1994, month=5, day=10), info={"height": 178})
    print(user.height)
属性描述符 举个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 class User : def __init__ (self, name, birthday ): self.name = name self.birthday = birthday self._age = 0 @property def age (self ): return datetime.now().year - self.birthday.year @age.setter def age (self, value ): self._age = value
如果还要校验name是否是字符串类型,就需要再为name写一组类似@property/@age.setter的方法,
这样就会写出大量重复的校验代码。
只要实现下面三个魔法方法中任意一个方法,就是属性描述符对象
1 2 3 4 5 6 7 8 9 class IntField : def __get__ (self, instance, owner ): pass def __set__ (self, instance, value ): pass def __delete__ (self, instance ): pass
# Property descriptors: IntField defines __get__ AND __set__, making it a
# data descriptor, so it takes precedence over the instance __dict__.
import numbers


class IntField:
    """Data descriptor validating that assigned values are integers.

    Bug fix: the original stored the value on the descriptor itself
    (self.value), so ALL instances of the owner class shared one value.
    The value is now stored per-instance, keyed by the attribute name
    captured via __set_name__ (Python 3.6+).
    """

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, instance, owner):
        if instance is None:
            # Class-level access returns the descriptor itself.
            return self
        return instance.__dict__[self.name]

    def __set__(self, instance, value):
        if not isinstance(value, numbers.Integral):
            raise ValueError('int value need')
        # Writing directly to __dict__ avoids re-triggering __set__.
        instance.__dict__[self.name] = value

    def __delete__(self, instance):
        del instance.__dict__[self.name]


class NonDataField:
    """Non-data descriptor (only __get__): instance __dict__ wins over it."""

    def __get__(self, instance, owner):
        pass


class User:
    age = IntField()


if __name__ == '__main__':
    user = User()
    user.age = 30
    print(user.age)

# Attribute lookup order for `user.age` (getattr(user, 'age')):
# __getattribute__ runs first; __getattr__ only fires if it raises
# AttributeError; descriptor __get__ calls happen INSIDE __getattribute__:
# (1) if "age" is in User (or a base) __dict__ and is a DATA descriptor,
#     its __get__ is called; otherwise
# (2) if "age" is in user.__dict__, return user.__dict__['age']; otherwise
# (3) if "age" is in User (or a base) __dict__:
#     (3.1) if it is a non-data descriptor, call its __get__,
#     (3.2) else return the class __dict__ value
# (4) if User defines __getattr__, call it; otherwise
# (5) raise AttributeError.
这样可以控制给对象赋值的行为
__new__
和__init__
# __new__ vs __init__: __new__ receives the CLASS and allocates the
# instance; __init__ receives that instance and initializes it.
class User:
    def __new__(cls, *args, **kwargs):
        print(" in new ")
        # Must return an instance of cls, or __init__ will not run.
        return super().__new__(cls)

    def __init__(self, name):
        print(" in init")
        pass


a = int()

if __name__ == "__main__":
    user = User(name="bobby")
def create_class(name):
    """Classes are objects: build and return a class chosen by name.

    Returns None for unrecognized names (original behavior).
    """
    if name == "user":
        class User:
            def __str__(self):
                return "user"

        return User
    if name == "company":
        class Company:
            def __str__(self):
                return "company"

        return Company


if __name__ == '__main__':
    my_class = create_class("user")
    print(my_class())
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 def create_class (name ): if name == "user" : class User : def __str__ (self ): return "user" return User elif name == "company" : class Company : def __str__ (self ): return "company" return Company def say (self ): return "i am user" class BaseClass (): def answer (self ): return "i am baseclass" class MetaClass (type ): def __new__ (cls, *args, **kwargs ): return super ().__new__(cls, *args, **kwargs) from collections.abc import *class User (metaclass=MetaClass): def __init__ (self, name ): self.name = name def __str__ (self ): return "user" if __name__ == "__main__" : my_obj = User(name="bobby" ) print (my_obj)
# A minimal ORM built with a metaclass: Field descriptors validate column
# values; ModelMetaClass collects them and the Meta options at class
# creation time.
import numbers


class Field:
    """Marker base class — anything Field-typed in a model is a column."""
    pass


class IntField(Field):
    """Integer column descriptor with optional min/max validation.

    NOTE(review): the value is stored on the descriptor (self._value),
    so it is shared by all instances of the owning model — kept as in
    the original; a per-instance store would need __set_name__.
    """

    def __init__(self, db_column, min_value=None, max_value=None):
        self._value = None
        self.min_value = min_value
        self.max_value = max_value
        self.db_column = db_column
        if min_value is not None:
            if not isinstance(min_value, numbers.Integral):
                raise ValueError("min_value must be int")
            elif min_value < 0:
                raise ValueError("min_value must be positive int")
        if max_value is not None:
            if not isinstance(max_value, numbers.Integral):
                raise ValueError("max_value must be int")
            elif max_value < 0:
                raise ValueError("max_value must be positive int")
        if min_value is not None and max_value is not None:
            if min_value > max_value:
                raise ValueError("min_value must be smaller than max_value")

    def __get__(self, instance, owner):
        return self._value

    def __set__(self, instance, value):
        if not isinstance(value, numbers.Integral):
            raise ValueError("int value need")
        # Bug fix: the original compared against min/max unconditionally,
        # raising TypeError when either bound was left as None.
        if self.min_value is not None and value < self.min_value:
            raise ValueError("value must between min_value and max_value")
        if self.max_value is not None and value > self.max_value:
            raise ValueError("value must between min_value and max_value")
        self._value = value


class CharField(Field):
    """String column descriptor with a mandatory max_length."""

    def __init__(self, db_column, max_length=None):
        self._value = None
        self.db_column = db_column
        if max_length is None:
            raise ValueError("you must spcify max_lenth for charfiled")
        self.max_length = max_length

    def __get__(self, instance, owner):
        return self._value

    def __set__(self, instance, value):
        if not isinstance(value, str):
            raise ValueError("string value need")
        if len(value) > self.max_length:
            raise ValueError("value len excess len of max_length")
        self._value = value


class ModelMetaClass(type):
    """Collects Field descriptors into `fields` and Meta options into `_meta`."""

    def __new__(cls, name, bases, attrs, **kwargs):
        # The abstract base itself gets no processing.
        if name == "BaseModel":
            return super().__new__(cls, name, bases, attrs, **kwargs)
        fields = {}
        for key, value in attrs.items():
            if isinstance(value, Field):
                fields[key] = value
        attrs_meta = attrs.get("Meta", None)
        _meta = {}
        db_table = name.lower()  # default table name: lowercased class name
        if attrs_meta is not None:
            table = getattr(attrs_meta, "db_table", None)
            if table is not None:
                db_table = table
        _meta["db_table"] = db_table
        attrs["_meta"] = _meta
        attrs["fields"] = fields
        # Bug fix: `del attrs["Meta"]` raised KeyError for models that
        # declare no inner Meta class.
        attrs.pop("Meta", None)
        return super().__new__(cls, name, bases, attrs, **kwargs)


class BaseModel(metaclass=ModelMetaClass):
    def __init__(self, *args, **kwargs):
        # Assign through setattr so the Field descriptors validate.
        for key, value in kwargs.items():
            setattr(self, key, value)
        super().__init__()

    def save(self):
        """Build the INSERT statement for this instance and return it.

        Improvement: the original built the SQL and discarded it.
        """
        fields = []
        values = []
        for key, value in self.fields.items():
            db_column = value.db_column
            if db_column is None:
                db_column = key.lower()
            fields.append(db_column)
            value = getattr(self, key)
            values.append(str(value))
        sql = "insert {db_table}({fields}) value({values})".format(
            db_table=self._meta["db_table"],
            fields=",".join(fields),
            values=",".join(values))
        return sql


class User(BaseModel):
    name = CharField(db_column="name", max_length=10)
    age = IntField(db_column="age", min_value=1, max_value=100)

    class Meta:
        db_table = "user"


if __name__ == "__main__":
    user = User(name="bobby", age=28)
    user.save()
# Iteration protocol: a list is Iterable but NOT an Iterator;
# iter(list) returns the Iterator.
from collections.abc import Iterable, Iterator

a = [1, 2]
iter_rator = iter(a)
print(isinstance(a, Iterable))
print(isinstance(iter_rator, Iterator))
# Iterable vs iterator: Company is iterable (__iter__ returns a fresh
# iterator); MyIterator implements the iterator protocol (__next__).
from collections.abc import Iterator


class Company(object):
    def __init__(self, employee_list):
        self.employee = employee_list

    def __iter__(self):
        return MyIterator(self.employee)


class MyIterator(Iterator):
    """Index-based iterator over a sequence; Iterator supplies __iter__."""

    def __init__(self, employee_list):
        self.iter_list = employee_list
        self.index = 0

    def __next__(self):
        if self.index >= len(self.iter_list):
            raise StopIteration
        word = self.iter_list[self.index]
        self.index += 1
        return word


if __name__ == "__main__":
    company = Company(["tom", "bob", "jane"])
    my_itor = iter(company)
    for item in company:
        print(item)
# Generators: any function containing `yield` returns a generator object.
def gen_func():
    yield 1
    yield 2
    yield 3


def fib(index):
    # Naive recursive Fibonacci (exponential, for illustration only).
    return 1 if index <= 2 else fib(index - 1) + fib(index - 2)


def fib2(index):
    # Iterative version returning the whole sequence at once.
    re_list = []
    n, a, b = 0, 0, 1
    while n < index:
        re_list.append(b)
        a, b = b, a + b
        n += 1
    return re_list


def gen_fib(index):
    # Lazy version: produces one Fibonacci number per iteration.
    n, a, b = 0, 0, 1
    while n < index:
        yield b
        a, b = b, a + b
        n += 1


for data in gen_fib(10):
    print(data)


def func():
    return 1


if __name__ == "__main__":
    gen = gen_func()
    for value in gen:
        print(value)
python是如何实现生成器的 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 """ """ import inspectframe = None def foo (): bar() def bar (): global frame frame = inspect.currentframe() """ python一切皆对象,栈帧对象, 字节码对象 当foo调用子函数 bar, 又会创建一个栈帧 所有的栈帧都是分配在堆内存上,这就决定了栈帧可以独立于调用者存在 """ foo() print (frame.f_code.co_name)caller_frame = frame.f_back print (caller_frame.f_code.co_name)def gen_func (): yield 1 name = "bobby" yield 2 age = 30 return "imooc" import disgen = gen_func() print (dis.dis(gen))print (gen.gi_frame.f_lasti)print (gen.gi_frame.f_locals)next (gen)print (gen.gi_frame.f_lasti)print (gen.gi_frame.f_locals)next (gen)print (gen.gi_frame.f_lasti)print (gen.gi_frame.f_locals)class company : def __getitem__ (self, item ): pass from collections import UserList
# Generator application: read a large file lazily, splitting on an
# arbitrary (possibly multi-character) record separator.
def myreadlines(f, newline):
    """Yield records from file object `f` delimited by `newline`."""
    buf = ""
    while True:
        # Flush every complete record currently buffered.
        while newline in buf:
            pos = buf.index(newline)
            yield buf[:pos]
            buf = buf[pos + len(newline):]
        chunk = f.read(4096)
        if not chunk:
            # EOF: whatever remains is the final record.
            yield buf
            break
        buf += chunk


with open("input.txt") as f:
    for line in myreadlines(f, "{|}"):
        print(line)
socket编程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import socketimport threadingserver = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind(('0.0.0.0' , 8000 )) server.listen() def handle_sock (sock, addr ): while True : data = sock.recv(1024 ) print (data.decode("utf8" )) re_data = input () sock.send(re_data.encode("utf8" )) while True : sock, addr = server.accept() client_thread = threading.Thread(target=handle_sock, args=(sock, addr)) client_thread.start() import socketclient = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(('127.0.0.1' , 8000 )) while True : re_data = input () client.send(re_data.encode("utf8" )) data = client.recv(1024 ) print (data.decode("utf8" ))
socket模拟http请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import socketfrom urllib.parse import urlparsedef get_url (url ): url = urlparse(url) host = url.netloc path = url.path if path == "" : path = "/" client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, 80 )) client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n" .format (path, host).encode("utf8" )) data = b"" while True : d = client.recv(1024 ) if d: data += d else : break data = data.decode("utf8" ) html_data = data.split("\r\n\r\n" )[1 ] print (html_data) client.close() if __name__ == "__main__" : import time start_time = time.time() for url in range (20 ): url = "http://shop.projectsedu.com/goods/{}/" .format (url) get_url(url) print (time.time()-start_time)
python中的GIL 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 total = 0 def add (): global total for i in range (1000000 ): total += 1 def desc (): global total for i in range (1000000 ): total -= 1 import threadingthread1 = threading.Thread(target=add) thread2 = threading.Thread(target=desc) thread1.start() thread2.start() thread1.join() thread2.join() print (total)
python多线程编程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import timeimport threadingdef get_detail_html (url ): print ("get detail html started" ) time.sleep(2 ) print ("get detail html end" ) def get_detail_url (url ): print ("get detail url started" ) time.sleep(4 ) print ("get detail url end" ) class GetDetailHtml (threading.Thread): def __init__ (self, name ): super ().__init__(name=name) def run (self ): print ("get detail html started" ) time.sleep(2 ) print ("get detail html end" ) class GetDetailUrl (threading.Thread): def __init__ (self, name ): super ().__init__(name=name) def run (self ): print ("get detail url started" ) time.sleep(4 ) print ("get detail url end" ) if __name__ == "__main__" : thread1 = GetDetailHtml("get_detail_html" ) thread2 = GetDetailUrl("get_detail_url" ) start_time = time.time() thread1.start() thread2.start() thread1.join() thread2.join() print ("last time: {}" .format (time.time()-start_time))
线程通信方式- 共享变量 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 from queue import Queueimport timeimport threadingdef get_detail_html (queue ): while True : url = queue.get() print ("get detail html started" ) time.sleep(2 ) print ("get detail html end" ) def get_detail_url (queue ): while True : print ("get detail url started" ) time.sleep(4 ) for i in range (20 ): queue.put("http://projectsedu.com/{id}" .format (id =i)) print ("get detail url end" ) if __name__ == "__main__" : detail_url_queue = Queue(maxsize=1000 ) thread_detail_url = threading.Thread(target=get_detail_url, args=(detail_url_queue,)) for i in range (10 ): html_thread = threading.Thread(target=get_detail_html, args=(detail_url_queue,)) html_thread.start() start_time = time.time() detail_url_queue.task_done() detail_url_queue.join()
线程同步Lock,Rlock(可重入锁) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 from threading import Lock, RLock total = 0 lock = RLock() def add (): global lock global total for i in range (1000000 ): lock.acquire() lock.acquire() total += 1 lock.release() lock.release() def desc (): global total global lock for i in range (1000000 ): lock.acquire() total -= 1 lock.release() import threadingthread1 = threading.Thread(target=add) thread2 = threading.Thread(target=desc) thread1.start() thread2.start() thread1.join() thread2.join() print (total)""" A(a、b) acquire (a) acquire (b) B(a、b) acquire (a) acquire (b) """
线程同步 Condition(条件变量) 用于线程间复杂的同步 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 import threadingclass XiaoAi (threading.Thread): def __init__ (self, cond ): super ().__init__(name="小爱" ) self.cond = cond def run (self ): with self.cond: self.cond.wait() print ("{} : 在 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 好啊 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 君住长江尾 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 共饮长江水 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 此恨何时已 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 定不负相思意 " .format (self.name)) self.cond.notify() class TianMao (threading.Thread): def __init__ (self, cond ): super ().__init__(name="天猫精灵" ) self.cond = cond def run (self ): with self.cond: print ("{} : 小爱同学 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 我们来对古诗吧 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 我住长江头 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 日日思君不见君 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 此水几时休 " .format (self.name)) self.cond.notify() self.cond.wait() print ("{} : 只愿君心似我心 " .format (self.name)) self.cond.notify() self.cond.wait() if __name__ == "__main__" : from concurrent import futures cond = threading.Condition() xiaoai = XiaoAi(cond) tianmao = TianMao(cond) xiaoai.start() tianmao.start()
线程同步 - Semaphore 使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import threadingimport timeclass HtmlSpider (threading.Thread): def __init__ (self, url, sem ): super ().__init__() self.url = url self.sem = sem def run (self ): time.sleep(2 ) print ("got html text success" ) self.sem.release() class UrlProducer (threading.Thread): def __init__ (self, sem ): super ().__init__() self.sem = sem def run (self ): for i in range (20 ): self.sem.acquire() html_thread = HtmlSpider("https://baidu.com/{}" .format (i), self.sem) html_thread.start() if __name__ == "__main__" : sem = threading.Semaphore(3 ) url_producer = UrlProducer(sem) url_producer.start()
线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETEDfrom concurrent.futures import Futurefrom multiprocessing import Poolimport timedef get_html (times ): time.sleep(times) print ("get page {} success" .format (times)) return times executor = ThreadPoolExecutor(max_workers=2 ) urls = [3 ,2 ,4 ] all_task = [executor.submit(get_html, (url)) for url in urls] wait(all_task, return_when=FIRST_COMPLETED) print ("main" )
多线程和多进程对比 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import timefrom concurrent.futures import ThreadPoolExecutor, as_completedfrom concurrent.futures import ProcessPoolExecutordef random_sleep (n ): time.sleep(n) return n if __name__ == "__main__" : with ProcessPoolExecutor(3 ) as executor: all_task = [executor.submit(random_sleep, (num)) for num in [2 ]*30 ] start_time = time.time() for future in as_completed(all_task): data = future.result() print ("exe result: {}" .format (data)) print ("last time is: {}" .format (time.time()-start_time))
multiprocessing 多进程编程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 import multiprocessingimport timedef get_html (n ): time.sleep(n) print ("sub_progress success" ) return n if __name__ == "__main__" : pool = multiprocessing.Pool(multiprocessing.cpu_count()) for result in pool.imap_unordered(get_html, [1 ,5 ,3 ]): print ("{} sleep success" .format (result))
进程间通信 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 import timefrom multiprocessing import Process, Queue, Pool, Manager, Pipedef add_data (p_dict, key, value ): p_dict[key] = value if __name__ == "__main__" : progress_dict = Manager().dict () from queue import PriorityQueue first_progress = Process(target=add_data, args=(progress_dict, "bobby1" , 22 )) second_progress = Process(target=add_data, args=(progress_dict, "bobby2" , 23 )) first_progress.start() second_progress.start() first_progress.join() second_progress.join() print (progress_dict)
协程 并发、并行、同步、异步、阻塞、非阻塞
并发是指一个时间段内,有几个程序在同一个cpu上运行,但是任意时刻只有一个程序在cpu上运行
并行是指任意时刻点上,有多个程序同时运行在多个cpu上
同步是指代码调用IO操作时,必须等待IO操作完成才返回的调用方式
异步是指代码调用IO操作时,不必等IO操作完成就返回的调用方式
IO 多路复用 (select、poll 和 epoll)
C10k问题:是一个在1999年被提出的技术挑战,如何在一颗1GHz CPU,2G内存,1gbps网络环境下,让单台服务器同时为1万个客户端提供FTP服务
Unix下五种I/O模型
阻塞式I/O
非阻塞式I/O
I/O复用
信号驱动式I/O
异步I/O(POSIX的aio_系列函数)
调用select之后,操作系统会返回哪些文件句柄准备好了。select方法也是一个阻塞的方法,如果操作系统的文件句柄都没有准备好,就会阻塞住。select能够监听多个socket状态,只要有一个socket准备好就返回。
select、poll、epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select、poll、epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
epoll并不代表一定比select好 在并发高的情况下,连接活跃度不是很高, epoll比select 并发性不高,同时连接很活跃, select比epoll好
通过非阻塞io实现http请求: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 import socketfrom urllib.parse import urlparsedef get_url (url ): url = urlparse(url) host = url.netloc path = url.path if path == "" : path = "/" client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.setblocking(False ) try : client.connect((host, 80 )) except BlockingIOError as e: pass while True : try : client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n" .format (path, host).encode("utf8" )) break except OSError as e: pass data = b"" while True : try : d = client.recv(1024 ) except BlockingIOError as e: continue if d: data += d else : break data = data.decode("utf8" ) html_data = data.split("\r\n\r\n" )[1 ] print (html_data) client.close() if __name__ == "__main__" : get_url("http://www.baidu.com" )
select+回调+事件循环 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 import socketfrom urllib.parse import urlparsefrom selectors import DefaultSelector, EVENT_READ, EVENT_WRITEselector = DefaultSelector() urls = [] stop = False class Fetcher : def connected (self, key ): selector.unregister(key.fd) self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n" .format (self.path, self.host).encode("utf8" )) selector.register(self.client.fileno(), EVENT_READ, self.readable) def readable (self, key ): d = self.client.recv(1024 ) if d: self.data += d else : selector.unregister(key.fd) data = self.data.decode("utf8" ) html_data = data.split("\r\n\r\n" )[1 ] print (html_data) self.client.close() urls.remove(self.spider_url) if not urls: global stop stop = True def get_url (self, url ): self.spider_url = url url = urlparse(url) self.host = url.netloc self.path = url.path self.data = b"" if self.path == "" : self.path = "/" self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.setblocking(False ) try : self.client.connect((self.host, 80 )) except BlockingIOError as e: pass selector.register(self.client.fileno(), EVENT_WRITE, self.connected) def loop (): while not stop: ready = selector.select() for key, mask in ready: call_back = key.data call_back(key) if __name__ == "__main__" : fetcher = Fetcher() import time start_time = time.time() for url in range (20 ): url = "http://shop.projectsedu.com/goods/{}/" .format (url) urls.append(url) fetcher = Fetcher() fetcher.get_url(url) loop() print (time.time()-start_time)
回调之痛
如果回调函数执行不正常该怎么办?
如果回调里面还要嵌套回调怎么办?要嵌套很多层怎么办?
如果嵌套了很多层,其中某个环节出错了会造成什么后果?
如果有个数据需要被每个回调都处理怎么办?
怎么使用当前函数中的局部变量?
……
可读性差、共享状态管理困难、异常处理困难
协程
C10M问题:如何利用8核心CPU,64G内存,在10gbps的网络上保持1000万并发连接
问题:
回调模式编码复杂度高
同步编程的并发性不高
多线程编程需要线程间同步
采用同步的方式去编写异步的代码
使用单线程去切换任务
```python
# Pseudo-code: the goal of coroutines -- write asynchronous code in a
# synchronous style.  get_html / parse_url are placeholders, not defined here;
# the same function is shown twice in the article to contrast versions.
def get_url(url):
    html = get_html(url)      # would block in sync code; suspends in async code
    urls = parse_url(html)


def get_url(url):
    html = get_html(url)
    urls = parse_url(html)
```
生成器进阶-send、close和throw方法
生成器不只可以产出值,还可以传出值。
send方法:
启动生成器的方式有两种,next()和send()
send方法可以传递值进入生成器内部,同时还可以重启生成器执行到下一个yield位置
在调用send发送非None值之前,我们必须启动一次生成器,方式有两种,1、gen.send(None) 2、next(gen)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 def gen_func (): html = yield "https://www.jd.com" print (html) yield 2 yield 3 return "cwz" if __name__ == '__main__' : gen = gen_func() gen.send(None ) html_content = "网址内容" print (gen.send(html_content)) 网址内容 2
close方法:
会关闭生成器,RuntimeError: generator ignored GeneratorExit
这里抛出异常是向下执行yield抛出的。
不捕捉异常是不会抛错的,gen.close() 之后再next(gen),会抛出StopIteration
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def gen_func (): try : yield "https://www.baidu.com" except GeneratorExit: pass yield 2 yield 3 return "cwz" if __name__ == '__main__' : gen = gen_func() print (next (gen)) gen.close()
throw方法:
抛出异常,在当前yield抛出,必须要手动捕获异常,不像close方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def gen_func (): try : yield "https://www.baidu.com" except Exception: pass yield 2 yield 3 return "cwz" if __name__ == '__main__' : gen = gen_func() print (next (gen)) gen.throw(Exception, "download error" ) print (next (gen))
yield_from python3.3新加了yield from语法
1 2 3 4 5 6 7 8 9 10 from itertools import chainmy_list = [1 , 2 , 3 ] my_dict = { "name" : "reese" , "age" : 18 } for value in chain(my_list, my_dict, range (2 , 5 )): print (value)
chain可以传入任意可迭代对象。
自己实现chain:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 my_list = [1 , 2 , 3 ] my_dict = { "name" : "reese" , "age" : 18 } def my_chain (*args, **kwargs ): for iterable_value in args: for value in iterable_value: yield value for i in my_chain(my_list, my_dict): print (i)
yield from + 可迭代对象,举个例子说明yield from和yield的区别
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def g1 (iterable ): yield iterable def g2 (iterable ): yield from iterable for i in g1(range (5 )): print (i) for i in g2(range (5 )): print (i) range (0 , 5 )0 1 2 3 4
yield from的一些概念:
1 2 3 4 5 6 7 8 9 10 11 def g1 (gen ): yield from gen def main (): g = g1(1 ) g.send(None )
yield from的用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 final_result = {} def sales_sum (pro_name ): total = 0 nums = [] while True : x = yield print (pro_name + "销量: " , x) if not x: break total += x nums.append(x) return total, nums def middle (key ): while True : final_result[key] = yield from sales_sum(key) print (key + "销量统计完成!!" ) def main (): data_sets = { "小明牌面膜" : [1200 , 1500 , 3000 ], "小明牌手机" : [28 , 55 , 98 , 108 ], "小明牌大衣" : [280 , 560 , 778 , 70 ], } for key, data_set in data_sets.items(): print ("start key:" , key) m = middle(key) m.send(None ) for value in data_set: m.send(value) m.send(None ) print ("final_result:" , final_result) if __name__ == '__main__' : main()
上面的代码中,如果将yield from直接替换成子生成器sales_num方法,会怎么样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def sales_sum (pro_name ): total = 0 nums = [] while True : x = yield print (pro_name + "销量: " , x) if not x: break total += x nums.append(x) return total, nums if __name__ == "__main__" : my_gen = sales_sum("小明牌手机" ) my_gen.send(None ) my_gen.send(1200 ) my_gen.send(1500 ) my_gen.send(3000 ) try : my_gen.send(None ) except StopIteration as e: result = e.value print (result)
需要手动处理StopIteration异常,才能将处理的值返回到 final_result[key]这里。
yield from原理说明:
基础版:
```python
# Simplified expansion of `RESULT = yield from EXPR` (PEP 380) -- pseudo-code,
# not runnable as-is (EXPR/RESULT are placeholders, yield is at top level).
#   _i: sub-generator (also an iterator)
#   _y: value produced by the sub-generator
#   _r: final value of the whole yield from expression
#   _s: value the caller passed in via send()
#   _e: exception object
_i = iter(EXPR)
try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value
else:
    while 1:
        _s = yield _y               # pass the sub-generator's value outward
        try:
            _y = _i.send(_s)        # forward the caller's value inward
        except StopIteration as _e:
            _r = _e.value           # sub-generator returned: capture its value
            break
RESULT = _r
```
上面的过程只是处理一般的情况,可能还需要处理以下情况:
子生成器可能只是一个迭代器,并不是一个作为协程的生成器,所以它不支持.throw()和.close()方法
如果子生成器支持.throw()和.close()方法,但是在子生成器内部,这两个方法都会抛出异常
调用方让子生成器自己抛出异常
当调用方使用next()或者.send(None)时,都要在子生成器上调用next()函数,当调用方使用.send()发送非 None 值时,才调用子生成器的.send()方法
完整版逻辑:
```python
# Full expansion of `RESULT = yield from EXPR` (PEP 380) -- pseudo-code.
_i = iter(EXPR)
try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value
else:
    while 1:
        try:
            _s = yield _y
        except GeneratorExit as _e:
            # Caller closed us (e.g. Ctrl+C): close the sub-generator if it
            # has close() (a plain iterator may not), then re-raise.
            try:
                _m = _i.close
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:
            # Any other exception thrown in: forward via throw() if available.
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                if _s is None:
                    # next() and send(None) both map to next() on the sub-gen.
                    _y = next(_i)
                else:
                    _y = _i.send(_s)
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r
```
while 1上面的逻辑和前面的一致,都是先获取一个迭代器然后next(),处理StopIteration。但是在调用while 1的时候,可能从外部接收到GeneratorExit(可能直接按了ctrl+c),这里首先获取_i
的close,但是_i
可能只是一个迭代器不是生成器,这时候调用close属性会抛出AttributeError,这种情况会将它忽略掉。然后调用close()方法,raise掉异常。
还有BaseException是所有异常的父类,首先做了try,获取throw()方法。如果有throw(),调用之后会处理StopIteration
如果没有异常,在_s
为None的时候就要调用next(),不为None就调用send方法,最后处理一下StopIteration
总结一下关键点:
子生成器生产的值,都是直接传给调用方的;调用方通过.send()发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的__next__()
方法,如果不是 None,会调用子生成器的.send()方法;
子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上 “冒泡”;
传入委托生成器的异常里,除了GeneratorExit之外,其他的所有异常全部传递给子生成器的.throw()方法;如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上 “冒泡”;
如果在委托生成器上调用.close()或传入GeneratorExit异常,会调用子生成器的.close()方法,没有的话就不调用。如果在调用.close()的时候抛出了异常,那么就向上 “冒泡”,否则的话委托生成器会抛出GeneratorExit异常。
async和await定义原生的协程
python为了将语义变得更加明确,就引入了async和await关键词用于定义原生的协程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from collections.abc import Awaitableasync def downloader (url ): return "url" async def download_url (url ): html = await downloader(url) return html if __name__ == '__main__' : coro = download_url("https://www.baidu.com" ) coro.send(None )
装饰器+生成器来实现协程,不建议
生成器是如何变成协程的: 我们希望协程是能够用单线程来调度的,这样我们就不需要像线程一样让操作系统去调度。协程是由程序员自己去调度的,它没有深入到内核级别去进行调度,进程和线程都是内核级别的调度,而协程是函数级别的调度。我们希望协程是由自己来决定什么时候来调用,并且能够像写同步代码一样来编写异步代码。 而生成器就可以完成协程这样的功能。
生成器是有状态的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import inspectdef gen_func (): yield 1 return "cwz" if __name__ == '__main__' : gen = gen_func() print (inspect.getgeneratorstate(gen)) next (gen) print (inspect.getgeneratorstate(gen)) try : next (gen) except StopIteration: pass print (inspect.getgeneratorstate(gen))
从上面的代码也可以看出来,生成器的很多功能就奠定了我们可以用生成器来实现协程。
1 2 3 def gen_func (): value = yield 1 return "cwz"
上面的代码中,value = yield 1有两层含义:
第一,返回值给调用方
第二:调用方通过send方式返回值给gen
其中有了这两层含义就可以看作是协程了,现在可以消费从外面传递进来的数据。之前的生成器只是一个生产者,只是yield值出去,现在变成了一个消费者,可以接收值进来做一个处理。这种模式就可以称作是协程了,再加上yield from,就更加是了。
```python
# Sketch only: shows where yield from would slot into the callback fetcher.
# host / selector / EVENT_WRITE / self are undefined here -- not runnable.
import socket


def get_socket_data():
    yield "bobby"


def downloader(url):
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)
    try:
        client.connect((host, 80))
    except BlockingIOError as e:
        pass
    selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    # The socket data would flow back through the yield from channel.
    source = yield from get_socket_data()
    data = source.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)


def download_html(html):
    html = yield from downloader()


if __name__ == "__main__":
    pass
```
asyncio并发编程 asyncio模块介绍
包含各种特定系统实现的模块化事件循环
传输和协议抽象
对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持
模仿futures模块但适用于事件循环使用的Future类
基于yield from的协议和任务,可以让你用顺序的方式编写并发代码
必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池
模仿threading模块中的同步原语,可以用在单线程内的协程之间
使用asyncio
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import asyncioimport timeasync def get_html (url ): print ("start get html" ) await asyncio.sleep(3 ) print ("end get html" ) if __name__ == '__main__' : start = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(get_html("https://www.baidu.com" )) print ("use time:" , time.time() - start)
模拟访问10个网页
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import asyncioimport timeasync def get_html (url ): print ("start get html" ) await asyncio.sleep(3 ) print ("end get html" ) if __name__ == '__main__' : start = time.time() loop = asyncio.get_event_loop() tasks = [get_html("https://www.baidu.com" ) for i in range (10 )] loop.run_until_complete(asyncio.wait(tasks)) print ("use time:" , time.time() - start)
无论tasks是多少,需要的时间大概在3秒
异步编程中不能使用同步阻塞的代码,上述代码中如果await asyncio.sleep(3)
换成了time.sleep(3)
就不行了
获取返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import asyncioimport timeasync def get_html (url ): print ("start get html" ) await asyncio.sleep(3 ) print ("end get html" ) return "cwz" if __name__ == '__main__' : loop = asyncio.get_event_loop() get_feature = asyncio.ensure_future(get_html("https://www.baidu.com" )) loop.run_until_complete(get_feature) print (get_feature.result())
或者可以这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import asyncioimport timeasync def get_html (url ): print ("start get html" ) await asyncio.sleep(3 ) print ("end get html" ) return "cwz" if __name__ == '__main__' : loop = asyncio.get_event_loop() task = loop.create_task(get_html("https://www.baidu.com" )) loop.run_until_complete(task) print (task.result())
ensure_future最终还是调用的是create_task,将传递进来的协程包装成一个task,并且将协程注册到loop队列里面去。 ensure_future源码如下:
```python
# Quoted asyncio source: coroutines, events, futures, _wrap_awaitable are
# asyncio internals, not defined in this article.
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    # A bare coroutine gets wrapped by loop.create_task(); a Future passes
    # through; any other awaitable is wrapped into a coroutine and retried.
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')
```
协程结束之后加上自己的callback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asynciofrom functools import partialasync def get_html (url ): print ("start get html" ) await asyncio.sleep(3 ) print ("end get html" ) return "cwz" def callback (url, future ): print (future) print ("send email to cwz..." ) if __name__ == '__main__' : loop = asyncio.get_event_loop() task = loop.create_task(get_html("https://www.baidu.com" )) task.add_done_callback(partial(callback, "https://www.baidu.com" )) loop.run_until_complete(task) print (task.result())
wait和gather:
gather更加高效,可以分组,可以将所有任务取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asyncioimport timeasync def get_html (url ): print ("start get url" ) await asyncio.sleep(2 ) print ("end get url" ) if __name__ == "__main__" : start_time = time.time() loop = asyncio.get_event_loop() tasks = [get_html("http://www.baidu.com" ) for i in range (10 )] group1 = [get_html("http://projectsedu.com" ) for i in range (2 )] group2 = [get_html("http://www.baidu.com" ) for i in range (2 )] group1 = asyncio.gather(*group1) group2 = asyncio.gather(*group2) group2.cancel() loop.run_until_complete(asyncio.gather(group1, group2)) print (time.time() - start_time)
task取消:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import asyncioimport timeasync def get_html (sleep_times ): print ("waiting" ) await asyncio.sleep(sleep_times) print ("done after {}s" .format (sleep_times)) if __name__ == "__main__" : task1 = get_html(2 ) task2 = get_html(3 ) task3 = get_html(3 ) tasks = [task1, task2, task3] loop = asyncio.get_event_loop() try : loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: all_tasks = asyncio.Task.all_tasks() for task in all_tasks: print ("cancel task" ) print (task.cancel()) loop.stop() loop.run_forever() finally : loop.close()
call_soon、call_at、call_later、call_soon_threadsafe
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import asynciodef callback (sleep_times, loop ): print ("success time {}" .format (loop.time())) def stoploop (loop ): loop.stop() if __name__ == "__main__" : loop = asyncio.get_event_loop() now = loop.time() loop.call_at(now + 2 , callback, 2 , loop) loop.call_at(now + 1 , callback, 1 , loop) loop.call_at(now + 3 , callback, 3 , loop) loop.call_soon(callback, 4 , loop) loop.run_forever()
ThreadPoolExecutor 和 asyncio 完成阻塞 IO 请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import asynciofrom concurrent.futures import ThreadPoolExecutorimport socketfrom urllib.parse import urlparsedef get_url (url ): url = urlparse(url) host = url.netloc path = url.path if path == "" : path = "/" client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, 80 )) client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n" .format (path, host).encode("utf8" )) data = b"" while True : d = client.recv(1024 ) if d: data += d else : break data = data.decode("utf8" ) html_data = data.split("\r\n\r\n" )[1 ] print (html_data) client.close() if __name__ == "__main__" : import time start_time = time.time() loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(3 ) tasks = [] for url in range (20 ): url = "http://shop.projectsedu.com/goods/{}/" .format (url) task = loop.run_in_executor(executor, get_url, url) tasks.append(task) loop.run_until_complete(asyncio.wait(tasks)) print ("last time:{}" .format (time.time() - start_time))
asyncio模拟http请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import asyncioimport socketfrom urllib.parse import urlparseasync def get_url (url ): url = urlparse(url) host = url.netloc path = url.path if path == "" : path = "/" reader, writer = await asyncio.open_connection(host, 80 ) writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n" .format (path, host).encode("utf8" )) all_lines = [] async for raw_line in reader: data = raw_line.decode("utf8" ) all_lines.append(data) html = "\n" .join(all_lines) return html async def main (): tasks = [] for url in range (20 ): url = "http://shop.projectsedu.com/goods/{}/" .format (url) tasks.append(asyncio.ensure_future(get_url(url))) for task in asyncio.as_completed(tasks): result = await task print (result) if __name__ == "__main__" : import time start_time = time.time() loop = asyncio.get_event_loop() loop.run_until_complete(main()) print ('last time:{}' .format (time.time() - start_time))
await 调用的是__await__,async with 调用的是__aenter__和__aexit__
aiohttp实现高并发爬虫
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 import asyncioimport reimport aiohttpimport aiomysqlfrom pyquery import PyQuerystopping = False start_url = "http://www.jobbole.com/xinwen/xwyd/" waiting_urls = [] seen_urls = set () sem = asyncio.Semaphore(3 ) async def fetch (url, session ): async with sem: try : async with session.get(url) as resp: print ("url status: {}" .format (resp.status)) if resp.status in [200 , 201 ]: data = await resp.text() return data except Exception as e: print (e) def extract_urls (html ): urls = [] pq = PyQuery(html) for link in pq.items("a" ): url = link.attr("href" ) if url and url.startswith("http" ) and url not in seen_urls: urls.append(url) waiting_urls.append(url) return urls async def init_urls (url, session ): html = await fetch(url, session) seen_urls.add(url) extract_urls(html) async def article_handler (url, session, pool ): html = await fetch(url, session) seen_urls.add(url) extract_urls(html) pq = PyQuery(html) title = pq("title" ).text() async with pool.acquire() as conn: async with conn.cursor() as cur: insert_sql = "insert into article(title) values('{}') " .format (title) await cur.execute(insert_sql) async def consumer (pool ): async with aiohttp.ClientSession() as session: while not stopping: if len (waiting_urls) == 0 : await asyncio.sleep(0.5 ) continue url = waiting_urls.pop() print ("start get url: {}" .format (url)) if re.match ("http://.*?jobbole.com/xinwen/xwyd/\d+/" , url): if url not in seen_urls: asyncio.ensure_future(article_handler(url, session, pool)) await asyncio.sleep(30 ) async def main (loop ): pool = await aiomysql.create_pool( host='127.0.0.1' , port=3306 , user='root' , password='123456' , db='test1' , loop=loop, 
charset='utf8' , autocommit=True ) async with aiohttp.ClientSession() as session: html = await fetch(start_url, session) seen_urls.add(start_url) extract_urls(html) asyncio.ensure_future(consumer(pool)) if __name__ == '__main__' : loop = asyncio.get_event_loop() asyncio.ensure_future(main(loop)) loop.run_forever()
python并发的解决方案:
tornado
twisted
gevent
asyncio
tornado、twisted采用了python的生成器
gevent是通过C语言实现了底层的协程完成的,gevent最大的问题是采用了猴子补丁,将很多python内置库进行了补丁,用起来会很简单,但是很多内部的异常很难捕捉到。
并发编程中,asyncio为什么会这么复杂?协程编程为什么比我们多进程多线程编程复杂很多?原因就是协程是由程序员自己来调度的,程序员自己来调度肯定不会让平常编码的人来调度,所以就出现了一些框架,像asyncio,主要就是来完成协程调度的同时还能让我们像编写同步代码的方式来编写异步代码。
我们平常在编写多进程多线程的代码时,线程调度和进程调度是由操作系统来完成的,操作系统对于我们很多程序员来说是一个黑盒,实际上操作系统在调度的过程中有非常多的逻辑,操作系统完成了这些逻辑,很多人又不会去接触操作系统的原理,所以操作系统本身的调度过程我们是不清楚的。而我们只要写出多线程就行了,但是协程不一样,协程编程模式由阻塞IO的编码方式直接转换成异步非阻塞的编码方式,这是一个很大的转变。在这个转变过程中,我们不得不去了解其中协程的调度过程。