python中一切皆对象 函数和类也是对象,属于python的一等公民
type、object和class之间的关系 type可以生成一个类,可以返回对象的类型
# type builds classes and also reports the class of any object.
a = 1
b = "abc"
print(type(1))
print(type(int))
print(type(b))
print(type(str))


class Student:
    """Plain class used to inspect its type and bases."""
    pass


class MyStudent(Student):
    """Subclass whose only direct base is Student."""
    pass


stu = Student()
print(type(stu))        # the class of an instance is its class
print(type(Student))    # the class of a class is type

# __bases__ lists the direct base classes.
print(int.__bases__)
print(Student.__bases__)
print(MyStudent.__bases__)
print(type.__bases__)
print(object.__bases__)   # object has no base: empty tuple
print(type(object))
# Magic (dunder) methods
class MyVector(object):
    """Two-component vector supporting ``+`` and ``str()``."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __add__(self, other):
        """Return a new vector that is the component-wise sum."""
        re_vector = MyVector(self.x + other.x, self.y + other.y)
        return re_vector

    def __str__(self):
        return "x:{x}, y:{y}".format(x=self.x, y=self.y)


first_vec = MyVector(1, 2)
second_vec = MyVector(3, 4)
print(first_vec + second_vec)
# Abstract base classes (the abc module)
from collections.abc import Sized


class Company(object):
    """Holds a list of employees and reports its size through len()."""

    def __init__(self, employee_list):
        self.employee = employee_list

    def __len__(self):
        return len(self.employee)


com = Company(['reese', 'neo'])

# Company never inherits from Sized; defining __len__ is enough for the
# isinstance check to succeed.
print(isinstance(com, Sized))
import abc


class CacheBase(metaclass=abc.ABCMeta):
    """Interface every cache backend has to implement."""

    @abc.abstractmethod
    def get(self, key):
        pass

    @abc.abstractmethod
    def set(self, key, value):
        pass


class RedisCache(CacheBase):
    """Deliberately incomplete: overrides neither get nor set."""
    pass


# Instantiating a class with unimplemented abstract methods raises
# TypeError. Catch it so the demonstration shows the message instead of
# aborting the module.
try:
    redis_cache = RedisCache()
except TypeError as exc:
    print(exc)
python的自省机制 自省是通过一定的机制查询到对象的内部结构
class Person:
    """Base class carrying a class-level attribute."""
    name = "user"


class Student(Person):
    """Person with one instance attribute, used to inspect __dict__."""

    def __init__(self, school_name):
        self.school_name = school_name


if __name__ == "__main__":
    user = Student("cwz")
    # The instance __dict__ holds only instance attributes; `name` lives
    # on Person and is found through normal attribute lookup.
    print(user.__dict__)
# Does super() really call the parent class?
class A:
    def __init__(self):
        print('A')


class B(A):
    def __init__(self):
        print("B")
        super().__init__()


class C(A):
    def __init__(self):
        print("C")
        super(C, self).__init__()


class D(B, C):
    def __init__(self):
        print("D")
        super(D, self).__init__()


if __name__ == '__main__':
    # super() follows the MRO of the instance's class (D, B, C, A), so
    # B's super() call reaches C rather than B's own base A.
    # Printed output: D B C A
    D()
with语句 上下文管理协议
class Simple:
    """Minimal context manager that logs entry and exit."""

    def __enter__(self):
        print("enter")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returns None, so an exception raised in the with-body propagates.
        print("exit")

    def do_something(self):
        print("do something")


with Simple() as simple:
    simple.do_something()
import contextlib


@contextlib.contextmanager
def test(x):
    """Context manager that prints a line before and after the with-body.

    The try/finally ensures "end..." is printed even when the body
    raises. Nothing is yielded, so the ``as`` target is None.
    """
    print("start...")
    try:
        yield
    finally:
        print("end...")


with test(1) as f:
    print("进行中")

# Printed output:
#   start...
#   进行中
#   end...
1 from collections import abc
# Slicing
#
# In seq[start:end:step]:
#   * start is where the slice begins (default 0);
#   * end is where it stops, exclusive (default: the list length);
#   * step is the stride (default 1).
# start may be omitted when it is 0, end when it equals the list length,
# and step when it is 1 (the last colon can then be dropped too).
# A negative step slices backwards, so start should be greater than end.
aList = [3, 4, 5, 6, 7, 9, 11, 13, 15, 17]

print(aList[::])       # copy of the whole list
print(aList[::-1])     # reversed copy
print(aList[::2])      # every second item, from index 0
print(aList[1::2])     # every second item, from index 1
print(aList[3:6])      # items at indexes 3, 4, 5

# Out-of-range bounds are clipped instead of raising IndexError.
aList[0:100]
aList[100:]

aList[len(aList):] = [9]      # append at the tail
aList[:0] = [1, 2]            # insert at the head
aList[3:3] = [4]              # insert in the middle
aList[:3] = [1, 2]            # replace; lengths may differ
aList[3:] = [4, 5, 6]         # replace the tail; lengths may differ
aList[::2] = [0] * 3          # extended slice: lengths must match
print(aList)
aList[::2] = ['a', 'b', 'c']

# An extended slice (step != 1) only accepts a sequence of exactly the
# same length. Left unguarded, this ValueError ends the script here.
try:
    aList[::2] = [1, 2]
except ValueError as exc:
    print(exc)

aList[:3] = []         # delete the first three items
del aList[:3]          # same, using del
del aList[::2]         # delete every second item
# A user-defined sliceable object
import numbers


class Group:
    """Sequence-like group of staff members that supports slicing."""

    def __init__(self, group_name, company_name, staffs):
        self.group_name = group_name
        self.company_name = company_name
        self.staffs = staffs

    def __repr__(self):
        return "{}({!r}, {!r}, {!r})".format(
            type(self).__name__, self.group_name, self.company_name,
            self.staffs)

    def __reversed__(self):
        # reversed() hands back whatever this returns, so it must be an
        # iterator; the stored list is left untouched.
        return reversed(self.staffs)

    def __getitem__(self, item):
        """Return a new Group for a slice or for a single integer index.

        Raises:
            TypeError: if item is neither a slice nor an integer.
        """
        cls = type(self)
        if isinstance(item, slice):
            return cls(group_name=self.group_name,
                       company_name=self.company_name,
                       staffs=self.staffs[item])
        if isinstance(item, numbers.Integral):
            return cls(group_name=self.group_name,
                       company_name=self.company_name,
                       staffs=[self.staffs[item]])
        raise TypeError("Group indices must be integers or slices, not {}"
                        .format(type(item).__name__))

    def __len__(self):
        return len(self.staffs)

    def __iter__(self):
        return iter(self.staffs)

    def __contains__(self, item):
        return item in self.staffs


staffs = ["cwz", "reese", "neo", "setcreed"]
group = Group("user", "imooc", staffs)
print(group[:2])
print(len(group))
for user in group:
    print(user)
reversed(group)
if "cwz" in group:
    print("ok")
# Keeping a sequence sorted with bisect
import bisect

inter_list = []
# insort places each value at its sorted position.
bisect.insort(inter_list, 3)
bisect.insort(inter_list, 1)
bisect.insort(inter_list, 5)
bisect.insort(inter_list, 7)
bisect.insort(inter_list, 0)
print(inter_list)

# bisect (alias of bisect_right) returns the insertion index that keeps
# the list sorted, to the right of any equal items.
print(bisect.bisect(inter_list, 3))
# List comprehensions, generator expressions and dict comprehensions
int_list = [1, 2, 3, 4, 5]
qu_list = [item * item for item in int_list]
print(type(qu_list))

# A conditional expression may be used as the element expression.
int_list = [1, 2, -3, 4, 5]
qu_list = [item if item > 0 else abs(item) for item in int_list]

# Two for-clauses produce every pair.
int_list1 = [1, 2]
int_list2 = [3, 4]
qu_list = [(first, second) for first in int_list1 for second in int_list2]

my_dict = {
    "key1": "bobby1",
    "key2": "bobby2"
}

int_list = [1, 2, 3, 4, 5]


def process_item(item):
    """Return item converted to a string."""
    return str(item)


# Dict comprehension: keys go through a helper, values are the items.
int_dict = {process_item(item): item for item in int_list}
print(int_dict)
# Where dict sits in the abc hierarchy
from collections.abc import Mapping, MutableMapping

a = {}
# dict counts as a MutableMapping for isinstance purposes.
print(isinstance(a, MutableMapping))
# Subclassing dict
from collections import UserDict, defaultdict


class Mydict(UserDict):
    """Mapping that stores every value doubled."""

    def __setitem__(self, key, item):
        return super().__setitem__(key, item * 2)


# UserDict routes constructor arguments through __setitem__, so the
# override also applies to values given at construction time.
my_dict = Mydict(one=2)
print(my_dict)
# How dict and set are implemented (lookup benchmark: list versus dict)
from random import randint

# Default data file used by the notes; pass file_name to override it.
_DATA_FILE = "G:/慕课网课程/AdvancePython/fbobject_idnew.txt"


def _read_lines(file_name, total_nums):
    """Return at most total_nums lines from the start of file_name."""
    lines = []
    with open(file_name, encoding="utf8", mode="r") as f_open:
        for count, line in enumerate(f_open):
            if count >= total_nums:
                break
            lines.append(line)
    return lines


def _pick_targets(pool, target_nums):
    """Draw target_nums random items from pool, dropping repeats."""
    target_data = []
    if not pool:
        return target_data
    for _ in range(target_nums):
        # Index by the real pool size: randint is inclusive at both ends
        # and the file may hold fewer lines than were asked for.
        candidate = pool[randint(0, len(pool) - 1)]
        if candidate not in target_data:
            target_data.append(candidate)
    return target_data


def load_list_data(total_nums, target_nums, file_name=_DATA_FILE):
    """Read lines from a file and return them as a list.

    :param total_nums: maximum number of lines to read
    :param target_nums: number of random draws used to build the queries
    :param file_name: path of the data file
    :return: (all_data, target_data); target_data holds up to
        target_nums distinct items taken from all_data
    """
    all_data = _read_lines(file_name, total_nums)
    return all_data, _pick_targets(all_data, target_nums)


def load_dict_data(total_nums, target_nums, file_name=_DATA_FILE):
    """Read lines from a file and return them as dict keys (value 0).

    :param total_nums: maximum number of lines to read
    :param target_nums: number of random draws used to build the queries
    :param file_name: path of the data file
    :return: (all_data, target_data); target_data holds up to
        target_nums distinct keys of all_data
    """
    all_data = dict.fromkeys(_read_lines(file_name, total_nums), 0)
    return all_data, _pick_targets(list(all_data), target_nums)


def find_test(all_data, target_data):
    """Return the mean time in seconds to look up every target in all_data.

    The lookup pass is repeated 100 times and the durations averaged.
    """
    import time

    test_times = 100
    total_times = 0
    for _ in range(test_times):
        find = 0
        start_time = time.time()
        for data in target_data:
            if data in all_data:
                find += 1
        total_times += time.time() - start_time
    return total_times / test_times


if __name__ == "__main__":
    all_data, target_data = load_dict_data(2000000, 1000)
    last_time = find_test(all_data, target_data)
    print(last_time)
# Variables in Python
# A name is a label bound to an object, so it can be rebound to an
# object of any type.
a = 1
a = "abc"

a = [1, 2, 3]
b = a                     # b is a second name for the same list
print(id(a), id(b))
print(a is b)

# Two separately built lists are equal but are different objects.
a = [1, 2, 3, 4]
b = [1, 2, 3, 4]


class People:
    pass


person = People()
# A class is a single object, so an identity test against it works.
if type(person) is People:
    print("yes")
# A classic mistake: mutable objects as arguments and as default values
def add(a, b):
    """Return a + b computed with ``+=``.

    For a mutable ``a`` such as a list, ``+=`` changes the caller's
    object in place; for immutable values the caller is unaffected.
    """
    a += b
    return a


class Company:
    """Company with a staff list.

    NOTE: ``staffs=[]`` is left in on purpose, because this snippet
    demonstrates the pitfall. The default list is created once, at
    function definition time, and shared by every instance built without
    an explicit ``staffs``. Real code should use ``staffs=None`` and
    create a fresh list inside ``__init__``.
    """

    def __init__(self, name, staffs=[]):
        self.name = name
        self.staffs = staffs

    def add(self, staff_name):
        self.staffs.append(staff_name)

    def remove(self, staff_name):
        self.staffs.remove(staff_name)


if __name__ == "__main__":
    com1 = Company("com1", ["bobby1", "bobby2"])
    com1.add("bobby3")
    com1.remove("bobby1")
    print(com1.staffs)

    com2 = Company("com2")
    com2.add("bobby")
    print(com2.staffs)

    # The shared default list is visible on the function object.
    print(Company.__init__.__defaults__)

    com3 = Company("com3")
    com3.add("bobby5")
    print(com2.staffs)
    print(com3.staffs)
    print(com2.staffs is com3.staffs)
# Metaclass programming: dynamic attributes with property
from datetime import datetime, date


class User:
    """User whose ``age`` is computed from the birthday on every read."""

    def __init__(self, name, birthday):
        self.name = name
        self.birthday = birthday
        self._age = 0

    @property
    def age(self):
        # Difference in calendar years only; month and day are ignored.
        return datetime.now().year - self.birthday.year

    @age.setter
    def age(self, value):
        # Stored in _age; the getter does not read it back.
        self._age = value


if __name__ == '__main__':
    user = User("cwz", date(year=1994, month=5, day=10))
    user.age = 30
    print(user._age)
    print(user.age)
from datetime import date


class User:
    """User whose extra details in ``info`` can be read as attributes."""

    def __init__(self, name, birthday, info=None):
        self.name = name
        self.birthday = birthday
        # A fresh dict per instance; a ``{}`` default would be shared.
        self.info = {} if info is None else info

    def __getattr__(self, item):
        """Fall back to ``self.info`` when normal attribute lookup fails.

        Raises:
            AttributeError: if item is not a key of ``info``, so that
                ``hasattr`` and ``getattr`` with a default keep working.
        """
        # If ``info`` itself is not set yet, self.info would re-enter
        # this method without end.
        if item == "info":
            raise AttributeError(item)
        try:
            return self.info[item]
        except KeyError:
            raise AttributeError(item) from None


if __name__ == '__main__':
    user = User("cwz", date(year=1994, month=5, day=10),
                info={"height": 178})
    print(user.height)
属性描述符 举个例子:
from datetime import datetime


class User:
    """User whose ``age`` is computed from the birthday on every read."""

    def __init__(self, name, birthday):
        self.name = name
        self.birthday = birthday
        self._age = 0

    @property
    def age(self):
        # Difference in calendar years only; month and day are ignored.
        return datetime.now().year - self.birthday.year

    @age.setter
    def age(self, value):
        # Stored in _age; the getter does not read it back.
        self._age = value
class IntField:
    """Skeleton showing the three methods of the descriptor protocol."""

    def __get__(self, instance, owner):
        # Called when the attribute is read.
        pass

    def __set__(self, instance, value):
        # Called when the attribute is assigned.
        pass

    def __delete__(self, instance):
        # Called on ``del instance.attr``.
        pass
import numbers


class IntField:
    """Data descriptor that only accepts integer values.

    The value is kept in the owning instance's ``__dict__``. Keeping it
    on the descriptor would share one value between all instances,
    because a descriptor is a single class-level object.
    """

    def __set_name__(self, owner, name):
        # Remember the attribute name the descriptor is bound to.
        self._name = name

    def __get__(self, instance, owner):
        if instance is None:
            # Read through the class: return the descriptor itself.
            return self
        try:
            return instance.__dict__[self._name]
        except KeyError:
            raise AttributeError(self._name) from None

    def __set__(self, instance, value):
        if not isinstance(value, numbers.Integral):
            raise ValueError('int value need')
        instance.__dict__[self._name] = value

    def __delete__(self, instance):
        instance.__dict__.pop(self._name, None)


class NonDataField:
    """Non-data descriptor: it defines __get__ only."""

    def __get__(self, instance, owner):
        pass


class User:
    age = IntField()


if __name__ == '__main__':
    user = User()
    user.age = 30
    print(user.age)

# Attribute lookup order
#
# If user is an instance of some class, user.age (and the equivalent
# getattr(user, 'age')) first calls __getattribute__. If the class
# defines __getattr__, it is called when __getattribute__ raises
# AttributeError. Descriptor __get__ calls happen inside
# __getattribute__.
#
# With user = User(), user.age is resolved as follows:
# (1) if "age" is in the __dict__ of User or one of its base classes and
#     is a data descriptor, call its __get__; otherwise
# (2) if "age" is in user.__dict__, return user.__dict__['age']; otherwise
# (3) if "age" is in the __dict__ of User or one of its base classes:
#     (3.1) if it is a non-data descriptor, call its __get__; otherwise
#     (3.2) return __dict__['age'];
# (4) if User defines __getattr__, call it; otherwise
# (5) raise AttributeError.
class User:
    """Shows that __new__ creates the object before __init__ runs."""

    def __new__(cls, *args, **kwargs):
        print(" in new ")
        # __init__ is only invoked when __new__ returns an instance of cls.
        return super().__new__(cls)

    def __init__(self, name):
        print(" in init")
        pass


a = int()

if __name__ == "__main__":
    user = User(name="bobby")
def create_class(name):
    """Build and return a class chosen by name.

    Returns the ``User`` class for "user", the ``Company`` class for
    "company", and None for any other name.
    """
    if name == "user":
        class User:
            def __str__(self):
                return "user"
        return User
    elif name == "company":
        class Company:
            def __str__(self):
                return "company"
        return Company
    return None


if __name__ == '__main__':
    my_class = create_class("user")
    print(my_class())
def create_class(name):
    """Build and return a class chosen by name (None if unknown)."""
    if name == "user":
        class User:
            def __str__(self):
                return "user"
        return User
    elif name == "company":
        class Company:
            def __str__(self):
                return "company"
        return Company
    return None


def say(self):
    """Plain function shaped like a method (takes self)."""
    return "i am user"


class BaseClass():
    def answer(self):
        return "i am baseclass"


class MetaClass(type):
    """Metaclass that passes class creation straight to ``type``."""

    def __new__(cls, *args, **kwargs):
        return super().__new__(cls, *args, **kwargs)


# NOTE(review): the module name of this star import was missing in the
# source ("from import *"); collections.abc is an assumption -- confirm.
from collections.abc import *


class User(metaclass=MetaClass):
    """Class whose creation goes through MetaClass.__new__."""

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return "user"


if __name__ == "__main__":
    my_obj = User(name="bobby")
    print(my_obj)
# A simple ORM built with a metaclass
import numbers


class Field:
    """Base descriptor for model columns.

    Values live in the owning instance's ``__dict__``. Storing them on
    the field would share one value between all model instances.
    """

    db_column = None
    _attr_name = None

    def __set_name__(self, owner, name):
        # Remember the attribute name the field is bound to.
        self._attr_name = name

    def _storage_key(self):
        # Fall back to the column name for a field used without a class.
        if self._attr_name is not None:
            return self._attr_name
        return self.db_column

    def __get__(self, instance, owner):
        if instance is None:
            return self
        # Unset fields read as None.
        return instance.__dict__.get(self._storage_key())

    def _store(self, instance, value):
        instance.__dict__[self._storage_key()] = value


class IntField(Field):
    """Integer column with optional inclusive bounds."""

    def __init__(self, db_column, min_value=None, max_value=None):
        self.min_value = min_value
        self.max_value = max_value
        self.db_column = db_column
        if min_value is not None:
            if not isinstance(min_value, numbers.Integral):
                raise ValueError("min_value must be int")
            elif min_value < 0:
                raise ValueError("min_value must be positive int")
        if max_value is not None:
            if not isinstance(max_value, numbers.Integral):
                raise ValueError("max_value must be int")
            elif max_value < 0:
                raise ValueError("max_value must be positive int")
        if min_value is not None and max_value is not None:
            if min_value > max_value:
                raise ValueError("min_value must be smaller than max_value")

    def __set__(self, instance, value):
        if not isinstance(value, numbers.Integral):
            raise ValueError("int value need")
        # A bound left as None means "no limit on that side".
        too_small = self.min_value is not None and value < self.min_value
        too_large = self.max_value is not None and value > self.max_value
        if too_small or too_large:
            raise ValueError("value must between min_value and max_value")
        self._store(instance, value)


class CharField(Field):
    """String column with a mandatory maximum length."""

    def __init__(self, db_column, max_length=None):
        self.db_column = db_column
        if max_length is None:
            raise ValueError("you must spcify max_lenth for charfiled")
        self.max_length = max_length

    def __set__(self, instance, value):
        if not isinstance(value, str):
            raise ValueError("string value need")
        if len(value) > self.max_length:
            raise ValueError("value len excess len of max_length")
        self._store(instance, value)


class ModelMetaClass(type):
    """Collects Field attributes and Meta options when a model is created.

    Adds two class attributes: ``fields`` (attribute name -> Field) and
    ``_meta`` (currently only "db_table").
    """

    def __new__(cls, name, bases, attrs, **kwargs):
        if name == "BaseModel":
            return super().__new__(cls, name, bases, attrs, **kwargs)
        fields = {}
        for key, value in attrs.items():
            if isinstance(value, Field):
                fields[key] = value
        # Meta is optional; pop() avoids a KeyError when it is absent.
        attrs_meta = attrs.pop("Meta", None)
        db_table = name.lower()
        if attrs_meta is not None:
            table = getattr(attrs_meta, "db_table", None)
            if table is not None:
                db_table = table
        attrs["_meta"] = {"db_table": db_table}
        attrs["fields"] = fields
        return super().__new__(cls, name, bases, attrs, **kwargs)


class BaseModel(metaclass=ModelMetaClass):
    """Base class of all models; keyword arguments become field values."""

    def __init__(self, *args, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)
        super().__init__()

    def save(self):
        """Build the INSERT statement for this instance.

        Returns:
            (sql, params): the statement with one ``%s`` placeholder per
            column, and the matching list of values. Values are passed
            separately rather than interpolated into the SQL text.
        """
        columns = []
        params = []
        for key, field in self.fields.items():
            db_column = field.db_column
            if db_column is None:
                db_column = key.lower()
            columns.append(db_column)
            params.append(getattr(self, key))
        sql = "insert {db_table}({fields}) value({values})".format(
            db_table=self._meta["db_table"],
            fields=",".join(columns),
            values=",".join(["%s"] * len(params)))
        return sql, params


class User(BaseModel):
    name = CharField(db_column="name", max_length=10)
    age = IntField(db_column="age", min_value=1, max_value=100)

    class Meta:
        db_table = "user"


if __name__ == "__main__":
    user = User(name="bobby", age=28)
# The iteration protocol
from collections.abc import Iterable, Iterator

a = [1, 2]
iter_rator = iter(a)

# A list is iterable but is not itself an iterator; iter() returns one.
print(isinstance(a, Iterable))
print(isinstance(iter_rator, Iterator))
# Iterators and iterables
from collections.abc import Iterator


class Company(object):
    """Iterable: hands out a fresh iterator on every iter() call."""

    def __init__(self, employee_list):
        self.employee = employee_list

    def __iter__(self):
        return MyIterator(self.employee)


class MyIterator(Iterator):
    """Iterator that walks a list by index.

    ``__iter__`` comes from the Iterator base class; only ``__next__``
    has to be written here.
    """

    def __init__(self, employee_list):
        self.iter_list = employee_list
        self.index = 0

    def __next__(self):
        try:
            word = self.iter_list[self.index]
        except IndexError:
            # The end of the list ends the iteration; the IndexError is
            # an implementation detail, so drop it from the traceback.
            raise StopIteration from None
        self.index += 1
        return word


if __name__ == "__main__":
    company = Company(["tom", "bob", "jane"])
    my_itor = iter(company)
    for item in company:
        print(item)
# Using generators
def gen_func():
    """Generator function: calling it returns a generator object."""
    yield 1
    yield 2
    yield 3


def fib(index):
    """Return the index-th Fibonacci number, computed recursively."""
    if index <= 2:
        return 1
    else:
        return fib(index - 1) + fib(index - 2)


def fib2(index):
    """Return the first index Fibonacci numbers as a list."""
    re_list = []
    n, a, b = 0, 0, 1
    while n < index:
        re_list.append(b)
        a, b = b, a + b
        n += 1
    return re_list


def gen_fib(index):
    """Yield the first index Fibonacci numbers one at a time."""
    n, a, b = 0, 0, 1
    while n < index:
        yield b
        a, b = b, a + b
        n += 1


for data in gen_fib(10):
    print(data)


def func():
    """Ordinary function, for contrast with gen_func."""
    return 1


if __name__ == "__main__":
    gen = gen_func()
    for value in gen:
        print(value)
# How Python implements generators
import inspect

frame = None


def foo():
    bar()


def bar():
    global frame
    frame = inspect.currentframe()


# Everything in Python is an object, including stack frames and code
# objects. When foo calls bar, another stack frame is created. Frames
# are allocated on the heap, so a frame can outlive its caller.
foo()
print(frame.f_code.co_name)
caller_frame = frame.f_back
print(caller_frame.f_code.co_name)


def gen_func():
    yield 1
    name = "bobby"
    yield 2
    age = 30
    return "imooc"


import dis

gen = gen_func()
# dis.dis prints the disassembly itself and returns None, so it is
# called directly rather than wrapped in print().
dis.dis(gen)

# f_lasti is the index of the last executed bytecode instruction and
# f_locals the frame's local variables; both change as the generator
# advances.
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)
next(gen)
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)
next(gen)
print(gen.gi_frame.f_lasti)
print(gen.gi_frame.f_locals)


class company:
    def __getitem__(self, item):
        pass


from collections import UserList
# Generators in UserList; reading a large file with a generator
def myreadlines(f, newline, chunk_size=4096):
    """Yield the records of f, split on the separator ``newline``.

    The file is read chunk_size characters at a time, so a file made of
    one huge physical line never has to be loaded whole.

    :param f: open text file (anything with ``read(size)``)
    :param newline: record separator, for example "{|}"
    :param chunk_size: number of characters requested per read
    :return: generator of records without the separator; whatever is
        left after the last separator is always yielded, even if empty
    """
    buf = ""
    while True:
        # Emit every complete record currently in the buffer.
        while newline in buf:
            pos = buf.index(newline)
            yield buf[:pos]
            buf = buf[pos + len(newline):]
        chunk = f.read(chunk_size)
        if not chunk:
            # End of file: the rest of the buffer is the last record.
            yield buf
            break
        buf += chunk


if __name__ == "__main__":
    with open("input.txt") as f:
        for line in myreadlines(f, "{|}"):
            print(line)
import socket
import threading


def handle_sock(sock, addr):
    """Talk to one connected client until it disconnects.

    Each received message is printed and answered with a line typed on
    the server console.
    """
    with sock:
        while True:
            data = sock.recv(1024)
            if not data:
                # An empty read means the peer closed the connection.
                break
            print(data.decode("utf8"))
            re_data = input()
            sock.sendall(re_data.encode("utf8"))


def run_server(host="", port=8000):
    """Accept clients forever, serving each one on its own thread.

    The default empty host binds to all interfaces.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind((host, port))
        server.listen()
        while True:
            sock, addr = server.accept()
            client_thread = threading.Thread(target=handle_sock,
                                             args=(sock, addr))
            client_thread.start()


def run_client(host="127.0.0.1", port=8000):
    """Send console input to the server and print each reply.

    NOTE(review): the source connected to an empty host string; the
    loopback default here is an assumption -- confirm.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((host, port))
        while True:
            re_data = input()
            client.sendall(re_data.encode("utf8"))
            data = client.recv(1024)
            if not data:
                break
            print(data.decode("utf8"))


if __name__ == "__main__":
    import sys

    # Server and client were two separate scripts fused into one; pick
    # the role on the command line (default: server).
    if sys.argv[1:2] == ["client"]:
        run_client()
    else:
        run_server()
# Simulating an HTTP request with a raw socket
import socket
from urllib.parse import urlparse


def get_url(url):
    """Fetch url over plain HTTP on port 80; print and return the body.

    :param url: absolute URL such as "http://host/path"
    :return: everything after the first blank line of the response
    :raises ValueError: if url has no host part
    """
    parsed = urlparse(url)
    host = parsed.netloc
    if not host:
        raise ValueError("url has no host: {!r}".format(url))
    path = parsed.path
    if path == "":
        path = "/"

    request = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(
        path, host).encode("utf8")

    chunks = []
    # The with-block closes the socket even when connect/recv fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((host, 80))
        client.sendall(request)
        while True:
            d = client.recv(1024)
            if not d:
                break
            chunks.append(d)

    data = b"".join(chunks).decode("utf8")
    # partition keeps the whole body even if it contains a blank line,
    # and yields "" when no header separator is present.
    html_data = data.partition("\r\n\r\n")[2]
    print(html_data)
    return html_data


if __name__ == "__main__":
    import time

    start_time = time.time()
    for url in range(20):
        # NOTE(review): the scheme/host part of this URL template looks
        # to have been lost from the source; get_url raises ValueError
        # until a full URL is restored here.
        url = "{}/".format(url)
        get_url(url)
    print(time.time() - start_time)
# The GIL in Python
import threading

total = 0


def add(times=1000000):
    """Increment the global ``total`` ``times`` times, without a lock."""
    global total
    for i in range(times):
        total += 1


def desc(times=1000000):
    """Decrement the global ``total`` ``times`` times, without a lock."""
    global total
    for i in range(times):
        total -= 1


if __name__ == "__main__":
    thread1 = threading.Thread(target=add)
    thread2 = threading.Thread(target=desc)
    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()
    # ``total += 1`` is a read-modify-write sequence, not one atomic
    # step, so the two threads can interleave and the result need not
    # be 0.
    print(total)
# Multithreaded programming in Python
import time
import threading


def get_detail_html(url):
    """Function-style worker: pretends to download a detail page."""
    print("get detail html started")
    time.sleep(2)
    print("get detail html end")


def get_detail_url(url):
    """Function-style worker: pretends to collect detail URLs."""
    print("get detail url started")
    time.sleep(4)
    print("get detail url end")


class GetDetailHtml(threading.Thread):
    """Same work as get_detail_html, written as a Thread subclass."""

    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail html started")
        time.sleep(2)
        print("get detail html end")


class GetDetailUrl(threading.Thread):
    """Same work as get_detail_url, written as a Thread subclass."""

    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        print("get detail url started")
        time.sleep(4)
        print("get detail url end")


if __name__ == "__main__":
    thread1 = GetDetailHtml("get_detail_html")
    thread2 = GetDetailUrl("get_detail_url")
    start_time = time.time()
    thread1.start()
    thread2.start()

    # join() blocks until each thread has finished.
    thread1.join()
    thread2.join()

    print("last time: {}".format(time.time() - start_time))
# Thread communication: shared variables / Queue
from queue import Queue
import time
import threading


def get_detail_html(queue):
    """Consumer: take URLs from queue forever and 'download' each one."""
    while True:
        url = queue.get()
        try:
            print("get detail html started")
            time.sleep(2)
            print("get detail html end")
        finally:
            # Queue.join() only returns once every get() has been
            # matched by a task_done().
            queue.task_done()


def get_detail_url(queue, rounds=None):
    """Producer: put 20 URLs on queue per round.

    :param queue: queue shared with the consumers
    :param rounds: number of rounds to produce; None means run forever
    """
    done = 0
    while rounds is None or done < rounds:
        print("get detail url started")
        time.sleep(4)
        for i in range(20):
            queue.put("{id}".format(id=i))
        print("get detail url end")
        done += 1


if __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)

    # One round, so the program can end once the queue is drained.
    thread_detail_url = threading.Thread(target=get_detail_url,
                                         args=(detail_url_queue, 1))
    thread_detail_url.start()
    for i in range(10):
        # Daemon threads: the endless consumers must not keep the
        # process alive after the work is done.
        html_thread = threading.Thread(target=get_detail_html,
                                       args=(detail_url_queue,),
                                       daemon=True)
        html_thread.start()

    start_time = time.time()
    thread_detail_url.join()    # every URL has been queued
    detail_url_queue.join()     # every URL has been processed
    print("last time: {}".format(time.time() - start_time))
# Thread synchronisation with Lock and RLock (re-entrant lock)
from threading import Lock, RLock
import threading

total = 0
lock = RLock()


def add(times=1000000):
    """Increment the global ``total`` ``times`` times under the lock.

    The lock is taken twice on purpose. An RLock may be re-acquired by
    the thread that already holds it, provided every acquire is paired
    with a release; a plain Lock would block on the second acquire.
    """
    global lock
    global total
    for i in range(times):
        # ``with`` releases the lock even if the body raises.
        with lock:
            with lock:
                total += 1


def desc(times=1000000):
    """Decrement the global ``total`` ``times`` times under the lock."""
    global total
    global lock
    for i in range(times):
        with lock:
            total -= 1


if __name__ == "__main__":
    thread1 = threading.Thread(target=add)
    thread2 = threading.Thread(target=desc)
    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()
    print(total)

# Sketch from the original notes: two threads that each need locks a
# and b.
#   A(a, b): acquire(a), acquire(b)
#   B(a, b): acquire(a), acquire(b)
# Thread synchronisation with Condition (condition variable), for more
# complex coordination between threads
import threading


class XiaoAi(threading.Thread):
    """Answering side of the dialogue: waits for a line, then replies."""

    # Reply templates, in speaking order.
    LINES = (
        "{} : 在 ",
        "{} : 好啊 ",
        "{} : 君住长江尾 ",
        "{} : 共饮长江水 ",
        "{} : 此恨何时已 ",
        "{} : 定不负相思意 ",
    )

    def __init__(self, cond):
        super().__init__(name="小爱")
        self.cond = cond

    def run(self):
        # wait() and notify() may only be called while holding the
        # condition's lock, hence the with-block.
        with self.cond:
            for line in self.LINES:
                self.cond.wait()
                print(line.format(self.name))
                self.cond.notify()


class TianMao(threading.Thread):
    """Opening side of the dialogue: speaks first, then waits."""

    # Spoken lines, in speaking order.
    LINES = (
        "{} : 小爱同学 ",
        "{} : 我们来对古诗吧 ",
        "{} : 我住长江头 ",
        "{} : 日日思君不见君 ",
        "{} : 此水几时休 ",
        "{} : 只愿君心似我心 ",
    )

    def __init__(self, cond):
        super().__init__(name="天猫精灵")
        self.cond = cond

    def run(self):
        with self.cond:
            for line in self.LINES:
                print(line.format(self.name))
                self.cond.notify()
                self.cond.wait()


if __name__ == "__main__":
    from concurrent import futures

    cond = threading.Condition()
    xiaoai = XiaoAi(cond)
    tianmao = TianMao(cond)

    # Start order matters: XiaoAi must already be in wait() when TianMao
    # sends the first notify(), otherwise that notification is lost and
    # both threads block.
    xiaoai.start()
    tianmao.start()
# Thread synchronisation with Semaphore
import threading
import time


class HtmlSpider(threading.Thread):
    """Worker that 'downloads' one page and then frees its slot."""

    def __init__(self, url, sem):
        super().__init__()
        self.url = url
        self.sem = sem

    def run(self):
        try:
            time.sleep(2)
            print("got html text success")
        finally:
            # Always return the slot, or a failing worker would shrink
            # the number of slots for good.
            self.sem.release()


class UrlProducer(threading.Thread):
    """Starts 20 spiders, with the semaphore capping how many run."""

    def __init__(self, sem):
        super().__init__()
        self.sem = sem

    def run(self):
        for i in range(20):
            # Blocks while all slots are taken; the started spider
            # releases the slot when it finishes.
            self.sem.acquire()
            html_thread = HtmlSpider("{}".format(i), self.sem)
            html_thread.start()


if __name__ == "__main__":
    sem = threading.Semaphore(3)
    url_producer = UrlProducer(sem)
    url_producer.start()
# Thread pools
from concurrent.futures import ThreadPoolExecutor, as_completed, wait, FIRST_COMPLETED
from concurrent.futures import Future
from multiprocessing import Pool
import time


def get_html(times):
    """Pretend to download a page by sleeping ``times`` seconds."""
    time.sleep(times)
    print("get page {} success".format(times))
    return times


if __name__ == "__main__":
    # The with-block shuts the pool down once all tasks have finished.
    with ThreadPoolExecutor(max_workers=2) as executor:
        urls = [3, 2, 4]
        # submit() returns a Future at once without blocking.
        all_task = [executor.submit(get_html, url) for url in urls]
        # Return as soon as the first task completes.
        wait(all_task, return_when=FIRST_COMPLETED)
        print("main")
# Threads versus processes
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor


def random_sleep(n):
    """Simulate a blocking task: sleep n seconds and return n."""
    time.sleep(n)
    return n


if __name__ == "__main__":
    with ProcessPoolExecutor(3) as executor:
        all_task = [executor.submit(random_sleep, num) for num in [2] * 30]
        start_time = time.time()
        for future in as_completed(all_task):
            data = future.result()
            print("exe result: {}".format(data))

        print("last time is: {}".format(time.time() - start_time))
# Multiprocess programming with multiprocessing
import multiprocessing
import time


def get_html(n):
    """Work done in a child process: sleep n seconds and return n."""
    time.sleep(n)
    print("sub_progress success")
    return n


if __name__ == "__main__":
    # The with-block tears the pool down when the loop is finished.
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        # imap_unordered yields results in completion order rather than
        # submission order.
        for result in pool.imap_unordered(get_html, [1, 5, 3]):
            print("{} sleep success".format(result))
# Inter-process communication
import time
from multiprocessing import Process, Queue, Pool, Manager, Pipe


def add_data(p_dict, key, value):
    """Store value under key in the shared mapping p_dict."""
    p_dict[key] = value


if __name__ == "__main__":
    # A Manager dict is a proxy, so writes made in child processes are
    # visible in the parent.
    progress_dict = Manager().dict()
    from queue import PriorityQueue

    first_progress = Process(target=add_data,
                             args=(progress_dict, "bobby1", 22))
    second_progress = Process(target=add_data,
                              args=(progress_dict, "bobby2", 23))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)
协程 并发、并行、同步、异步、阻塞、非阻塞
IO 多路复用 (select、poll 和 epoll)
C10k问题:是一个在1999年被提出的技术挑战,如何在一颗1GHz CPU,2G内存,1gbps网络环境下,让单台服务器同时为1万个客户端提供FTP服务
epoll并不代表一定比select好:在并发高、连接活跃度不是很高的情况下,epoll比select好;在并发性不高、同时连接很活跃的情况下,select比epoll好。
# An HTTP request over non-blocking I/O
import socket
from urllib.parse import urlparse


def get_url(url):
    """Fetch url over HTTP using a non-blocking socket with busy polling.

    Connect, send and recv are simply retried until they succeed, so the
    loops below spin the CPU while waiting.

    :param url: absolute URL such as "http://host/path"
    :return: everything after the first blank line of the response
    :raises ValueError: if url has no host part
    """
    parsed = urlparse(url)
    host = parsed.netloc
    if not host:
        raise ValueError("url has no host: {!r}".format(url))
    path = parsed.path
    if path == "":
        path = "/"

    request = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(
        path, host).encode("utf8")

    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        client.setblocking(False)
        try:
            client.connect((host, 80))
        except BlockingIOError:
            # Expected for a non-blocking connect: it is still in
            # progress.
            pass

        # Retry the send until the connection is established.
        while True:
            try:
                client.send(request)
                break
            except OSError:
                pass

        data = b""
        while True:
            try:
                d = client.recv(1024)
            except BlockingIOError:
                # Nothing to read yet.
                continue
            if d:
                data += d
            else:
                break
    finally:
        # Close the socket on the error paths as well.
        client.close()

    data = data.decode("utf8")
    # partition keeps the whole body even if it contains a blank line.
    html_data = data.partition("\r\n\r\n")[2]
    print(html_data)
    return html_data


if __name__ == "__main__":
    # NOTE(review): the URL argument is empty in the source and looks to
    # have been lost; get_url raises ValueError until it is restored.
    get_url("")
# select + callbacks + an event loop
import socket
from urllib.parse import urlparse
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
urls = []
stop = False


class Fetcher:
    """Downloads one URL, driven by callbacks from the event loop."""

    def connected(self, key):
        """Callback for 'socket writable': send the request."""
        selector.unregister(key.fd)
        self.client.send(
            "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(
                self.path, self.host).encode("utf8"))
        # Next, wait for the response to become readable.
        selector.register(self.client.fileno(), EVENT_READ, self.readable)

    def readable(self, key):
        """Callback for 'socket readable': collect data until EOF."""
        global stop
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            # Empty read: the server closed the connection.
            selector.unregister(key.fd)
            data = self.data.decode("utf8")
            # partition keeps the whole body even if it has a blank line.
            html_data = data.partition("\r\n\r\n")[2]
            print(html_data)
            self.client.close()
            urls.remove(self.spider_url)
            if not urls:
                # Last pending URL finished: let loop() exit.
                stop = True

    def get_url(self, url):
        """Start a non-blocking download of url.

        :raises ValueError: if url has no host part
        """
        parsed = urlparse(url)
        if not parsed.netloc:
            raise ValueError("url has no host: {!r}".format(url))
        self.spider_url = url
        self.host = parsed.netloc
        self.path = parsed.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)
        try:
            self.client.connect((self.host, 80))
        except BlockingIOError:
            # Expected: a non-blocking connect is still in progress.
            pass

        # The socket turns writable once the connection is established.
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)


def loop():
    """Event loop: wait for ready sockets and run their callbacks."""
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            # The callback was stored as the key's data at registration.
            call_back = key.data
            call_back(key)


if __name__ == "__main__":
    import time

    start_time = time.time()
    for url in range(20):
        # NOTE(review): the scheme/host part of this URL template looks
        # to have been lost from the source; restore the full URL here.
        url = "{}/".format(url)
        urls.append(url)
        fetcher = Fetcher()
        fetcher.get_url(url)
    loop()
    print(time.time() - start_time)
def get_url(url):
    """Sketch of a sequential crawl step: download a page, then parse it.

    NOTE(review): get_html and parse_url are not defined in this
    snippet. The source repeated this definition twice with identical
    bodies; the second merely rebound the first, so one is kept.
    """
    html = get_html(url)
    urls = parse_url(html)
在调用send发送非None值之前,我们必须启动一次生成器,方式有两种,1、gen.send(None) 2、next(gen)
def gen_func():
    """Generator that both produces values and receives one via send()."""
    # The value passed to send() becomes the result of this yield.
    html = yield ""
    print(html)
    yield 2
    yield 3
    return "cwz"


if __name__ == '__main__':
    gen = gen_func()
    # Prime the generator: run it up to the first yield.
    gen.send(None)
    html_content = "网址内容"
    # send() resumes the generator and returns the next yielded value.
    print(gen.send(html_content))
    # Printed output:
    #   网址内容
    #   2
gen.close() 会关闭生成器;如果生成器捕获了 GeneratorExit 之后仍然继续 yield,则会抛出 RuntimeError: generator ignored GeneratorExit
不捕捉异常是不会抛错的,gen.close() 之后再next(gen),会抛出StopIteration
def gen_func():
    """Generator that swallows GeneratorExit and keeps yielding.

    This is intentional: it reproduces "RuntimeError: generator ignored
    GeneratorExit", which close() raises when the generator yields again
    after being asked to exit.
    """
    try:
        yield ""
    except GeneratorExit:
        pass
    yield 2
    yield 3
    return "cwz"


if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    # close() raises GeneratorExit at the paused yield; the generator
    # catches it and yields 2, so close() raises RuntimeError.
    gen.close()
def gen_func():
    """Generator that survives an exception thrown into it."""
    try:
        yield ""
    except Exception:
        # The thrown exception is handled here and execution carries on.
        pass
    yield 2
    yield 3
    return "cwz"


if __name__ == '__main__':
    gen = gen_func()
    print(next(gen))
    # throw() raises the exception at the paused yield and returns the
    # next yielded value (2). An exception instance is passed because
    # the (type, value) call form is deprecated since Python 3.12.
    gen.throw(Exception("download error"))
    print(next(gen))
yield_from python3.3新加了yield from语法
from itertools import chain

my_list = [1, 2, 3]
my_dict = {
    "name": "reese",
    "age": 18
}

# chain walks several iterables as if they were one; iterating a dict
# yields its keys.
for value in chain(my_list, my_dict, range(2, 5)):
    print(value)
my_list = [1, 2, 3]
my_dict = {
    "name": "reese",
    "age": 18
}


def my_chain(*args, **kwargs):
    """Yield every item of every iterable in args, in order.

    Keyword arguments are accepted but not used.
    """
    for iterable_value in args:
        # yield from replaces the inner "for value in ...: yield value".
        yield from iterable_value


for i in my_chain(my_list, my_dict):
    print(i)
yield from + 可迭代对象,举个例子说明yield from和yield的区别
def g1(iterable):
    """Yield the iterable itself, as one value."""
    yield iterable


def g2(iterable):
    """Yield each item of the iterable in turn."""
    yield from iterable


for i in g1(range(5)):
    print(i)

for i in g2(range(5)):
    print(i)

# Printed output:
#   range(0, 5)
#   0
#   1
#   2
#   3
#   4
yield from的一些概念:
def g1(gen):
    """Delegating generator: forwards everything to the sub-generator *gen*."""
    yield from gen


def main():
    """Caller: drives the delegating generator.

    ``yield from`` needs an iterable, so a small range is passed in as the
    sub-generator (``yield from 1`` would raise TypeError).
    """
    g = g1(range(1))
    g.send(None)
yield from的用法:
final_result = {}


def sales_sum(pro_name):
    """Sub-generator: add up the values sent in until a falsy value arrives.

    Returns a ``(total, nums)`` tuple, which becomes the value of the
    ``yield from`` expression in the delegating generator.
    """
    total = 0
    nums = []
    while True:
        x = yield
        print(pro_name + "销量: ", x)
        if not x:
            break
        total += x
        nums.append(x)
    return total, nums


def middle(key):
    """Delegating generator: store the sub-generator's result under *key*."""
    # The endless loop keeps this generator alive after the sub-generator
    # returns, so the caller's final send(None) does not raise StopIteration.
    while True:
        final_result[key] = yield from sales_sum(key)
        print(key + "销量统计完成!!")


def main():
    """Caller: feed each product's sales figures through the delegator."""
    data_sets = {
        "小明牌面膜": [1200, 1500, 3000],
        "小明牌手机": [28, 55, 98, 108],
        "小明牌大衣": [280, 560, 778, 70],
    }
    for key, data_set in data_sets.items():
        print("start key:", key)
        m = middle(key)
        m.send(None)  # prime the delegating generator
        for value in data_set:
            m.send(value)  # goes straight through to the sub-generator
        m.send(None)  # falsy value ends the sub-generator
    print("final_result:", final_result)


if __name__ == '__main__':
    main()
上面的代码中,如果不使用yield from,而是直接调用子生成器sales_sum方法,会怎么样:
def sales_sum(pro_name):
    """Add up the values sent in until a falsy value arrives.

    Returns a ``(total, nums)`` tuple; when the generator is driven by
    hand, that tuple is delivered as ``StopIteration.value``.
    """
    total = 0
    nums = []
    while True:
        x = yield
        print(pro_name + "销量: ", x)
        if not x:
            break
        total += x
        nums.append(x)
    return total, nums


if __name__ == "__main__":
    my_gen = sales_sum("小明牌手机")
    my_gen.send(None)  # prime the generator
    my_gen.send(1200)
    my_gen.send(1500)
    my_gen.send(3000)
    try:
        my_gen.send(None)
    except StopIteration as e:
        # Without ``yield from`` the return value must be taken from the
        # exception by hand.
        result = e.value
        print(result)
需要手动处理StopIteration异常,才能将处理的值返回到 final_result[key]这里。
yield from原理说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 """ _i:子生成器,同时也是一个迭代器 _y:子生成器生产的值 _r:yield from 表达式最终的值 _s:调用方通过send()发送的值 _e:异常对象 """ _i = iter (EXPR) try : _y = next (_i) except StopIteration as _e: _r = _e.value else : while 1 : _s = yield _y try : _y = _i.send(_s) except StopIteration as _e: _r = _e.value break RESULT = _r
当调用方使用next()或者.send(None)时,都要在子生成器上调用next()函数,当调用方使用.send()发送非 None 值时,才调用子生成器的.send()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 _i = iter (EXPR) try : _y = next (_i) except StopIteration as _e: _r = _e.value else : while 1 : try : _s = yield _y except GeneratorExit as _e: try : _m = _i.close except AttributeError: pass else : _m() raise _e except BaseException as _e: _x = sys.exc_info() try : _m = _i.throw except AttributeError: raise _e else : try : _y = _m(*_x) except StopIteration as _e: _r = _e.value break else : try : if _s is None : _y = next (_i) else : _y = _i.send(_s) except StopIteration as _e: _r = _e.value break RESULT = _r
while 1上面的逻辑和前面的一致,都是先获取一个迭代器然后next(),处理StopIteration。但是在while 1循环里的yield处,可能从外部接收到GeneratorExit(例如调用方对委托生成器调用了.close()),这里首先获取_i的close方法,如果存在就调用它来关闭子生成器,然后重新抛出GeneratorExit
子生成器生产的值,都是直接传给调用方的;调用方通过.send()发送的值都是直接传递给子生成器的;如果发送的是 None,会调用子生成器的__next__()
方法,如果不是 None,会调用子生成器的.send()方法;
子生成器退出的时候,最后的return EXPR,会触发一个StopIteration(EXPR)异常;
yield from表达式的值,是子生成器终止时,传递给StopIteration异常的第一个参数;
如果调用的时候出现StopIteration异常,委托生成器会恢复运行,同时其他的异常会向上 “冒泡”;
传入委托生成器的异常里,除了GeneratorExit之外,其他的所有异常全部传递给子生成器的.throw()方法;如果调用.throw()的时候出现了StopIteration异常,那么就恢复委托生成器的运行,其他的异常全部向上 “冒泡”;
如果在委托生成器上调用.close()或传入GeneratorExit异常,会调用子生成器的.close()方法,没有的话就不调用。如果在调用.close()的时候抛出了异常,那么就向上 “冒泡”,否则的话委托生成器会抛出GeneratorExit异常。
from collections.abc import Awaitable


async def downloader(url):
    """Native coroutine that completes at once and returns "url"."""
    return "url"


async def download_url(url):
    """Await downloader() and return its result."""
    # ``await`` plays the role that ``yield from`` plays for
    # generator-based coroutines.
    html = await downloader(url)
    return html


if __name__ == '__main__':
    coro = download_url("")
    # A native coroutine is driven with send(None), not next().
    try:
        coro.send(None)
    except StopIteration as e:
        # Nothing here ever suspends, so the first send() already finishes
        # the coroutine; its return value arrives on StopIteration.
        print(e.value)
生成器是如何变成协程的: 我们希望协程是能够用单线程来调度的,这样我们就不需要像线程一样让操作系统去调度。协程是由程序员自己去调度的,它没有深入到内核级别去进行调度,进程和线程都是内核级别的调度,而协程是函数级别的调度。我们希望协程是由自己来决定什么时候来调用,并且能够像写同步代码一样来编写异步代码。 而生成器就可以完成协程这样的功能。
import inspect


def gen_func():
    """Yield 1, then finish with the return value "cwz"."""
    yield 1
    return "cwz"


if __name__ == '__main__':
    gen = gen_func()
    print(inspect.getgeneratorstate(gen))  # GEN_CREATED: not started yet
    next(gen)
    print(inspect.getgeneratorstate(gen))  # GEN_SUSPENDED: paused at a yield
    try:
        next(gen)
    except StopIteration:
        pass
    print(inspect.getgeneratorstate(gen))  # GEN_CLOSED: ran to completion
def gen_func():
    """Yield 1, receive a value from the caller, then return "cwz"."""
    # The yield hands 1 to the caller; whatever the caller passes to
    # send() is bound to ``value``.
    value = yield 1
    return "cwz"
上面的代码中,value = yield 1有两层含义:第一,把值1返回(yield)给调用方;第二,调用方通过send()方法传递进来的值会赋给value。
其中有了这两层含义就可以看作是协程了,现在可以消费从外面传递进来的数据。之前的生成器只是一个生产者,只是yield值出去,现在变成了一个消费者,可以接收值进来做一个处理。这种模式就可以称作是协程了,再加上yield from,就更加是了。
import socket

# Sketch of a generator-based downloader. It only illustrates the shape of
# the code and is not runnable as written.
# NOTE(review): host, selector, EVENT_WRITE and self are not defined in
# this snippet -- they look carried over from the select-based Fetcher
# class; confirm with the author before turning this into real code.


def get_socket_data():
    """Sub-generator standing in for "wait until the socket has data"."""
    yield "bobby"


def downloader(url):
    """Open a non-blocking connection, then delegate the read to a sub-generator."""
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.setblocking(False)

    try:
        client.connect((host, 80))
    except BlockingIOError as e:
        # Expected: a non-blocking connect returns before it completes.
        pass

    selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
    # Suspend here until the sub-generator returns.
    source = yield from get_socket_data()
    data = source.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)


def download_html(html):
    """Delegate to downloader()."""
    # NOTE(review): downloader() is called without its url argument.
    html = yield from downloader()


if __name__ == "__main__":
    pass
asyncio并发编程 asyncio模块介绍
基于yield from的协议和任务,可以让你用顺序的方式编写并发代码
import asyncio
import time


async def get_html(url):
    """Simulate a 3-second download; *url* is accepted but not used."""
    print("start get html")
    # asyncio.sleep suspends only this coroutine and gives control back to
    # the event loop.
    await asyncio.sleep(3)
    print("end get html")


if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(get_html(""))
    print("use time:", time.time() - start)
import asyncio
import time


async def get_html(url):
    """Simulate a 3-second download; *url* is accepted but not used."""
    print("start get html")
    await asyncio.sleep(3)
    print("end get html")


if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    # Wrap the coroutines in Tasks first: asyncio.wait() no longer accepts
    # bare coroutine objects (TypeError since Python 3.11).
    tasks = [loop.create_task(get_html("")) for i in range(10)]
    loop.run_until_complete(asyncio.wait(tasks))
    # The ten tasks run concurrently, so this takes about 3s, not 30s.
    print("use time:", time.time() - start)
异步编程中不能使用同步阻塞的代码,上述代码中如果把await asyncio.sleep(3)换成time.sleep(3),整个事件循环都会被阻塞,10个任务只能一个接一个地执行,总耗时会变成约30秒
import asyncio
import time


async def get_html(url):
    """Simulate a 3-second download and return "cwz"; *url* is not used."""
    print("start get html")
    await asyncio.sleep(3)
    print("end get html")
    return "cwz"


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # ensure_future wraps the coroutine in a future whose result can be
    # read once the loop has run it to completion.
    get_future = asyncio.ensure_future(get_html(""))
    loop.run_until_complete(get_future)
    print(get_future.result())
import asyncio
import time


async def get_html(url):
    """Simulate a 3-second download and return "cwz"; *url* is not used."""
    print("start get html")
    await asyncio.sleep(3)
    print("end get html")
    return "cwz"


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # loop.create_task schedules the coroutine on this loop as a Task.
    task = loop.create_task(get_html(""))
    loop.run_until_complete(task)
    print(task.result())
ensure_future最终还是调用的是create_task,将传递进来的协程包装成一个task,并且将协程注册到loop队列里面去。 ensure_future源码如下:
# Quoted from the asyncio source: coroutines, events, futures, inspect and
# _wrap_awaitable are names from the module this function is quoted from.
def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):
        if loop is None:
            loop = events.get_event_loop()
        # A coroutine ends up in loop.create_task().
        task = loop.create_task(coro_or_future)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task
    elif futures.isfuture(coro_or_future):
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('The future belongs to a different loop than '
                             'the one specified as the loop argument')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):
        # Any other awaitable is wrapped in a coroutine and passed back in.
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')
import asyncio
from functools import partial


async def get_html(url):
    """Simulate a 3-second download and return "cwz"; *url* is not used."""
    print("start get html")
    await asyncio.sleep(3)
    print("end get html")
    return "cwz"


def callback(url, future):
    """Done-callback: print the finished future and a notification line.

    add_done_callback passes only the future; *url* is bound beforehand
    with functools.partial.
    """
    print(future)
    print("send email to cwz...")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html(""))
    # partial fills in the extra leading argument; the future is appended
    # by the event loop when the task completes.
    task.add_done_callback(partial(callback, ""))
    loop.run_until_complete(task)
    print(task.result())
import asyncio
import time


async def get_html(url):
    """Simulate a 2-second download; *url* is accepted but not used."""
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")


async def main():
    """Run two gather() groups together, cancelling the second one."""
    # gather() is created inside a running loop so the group futures are
    # bound to it.
    group1 = asyncio.gather(*[get_html("") for i in range(2)])
    group2 = asyncio.gather(*[get_html("") for i in range(2)])
    # A whole group can be cancelled as one unit.
    group2.cancel()
    # return_exceptions=True reports the cancelled group as a
    # CancelledError in the result list rather than raising it here.
    return await asyncio.gather(group1, group2, return_exceptions=True)


if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print(time.time() - start_time)
import asyncio
import time


async def get_html(sleep_times):
    """Sleep for *sleep_times* seconds, printing before and after."""
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after {}s".format(sleep_times))


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # asyncio.wait() needs Tasks/Futures: bare coroutines are rejected
    # since Python 3.11.
    task1 = loop.create_task(get_html(2))
    task2 = loop.create_task(get_html(3))
    task3 = loop.create_task(get_html(3))
    tasks = [task1, task2, task3]

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:
        # Ctrl+C: cancel every unfinished task on this loop.
        # asyncio.all_tasks() replaces asyncio.Task.all_tasks(), which was
        # removed in Python 3.9.
        all_tasks = asyncio.all_tasks(loop)
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())
        # stop() followed by run_forever() gives the loop one more pass so
        # the cancellations are actually delivered.
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()
import asyncio


def callback(sleep_times, loop):
    """Print the loop's clock at the moment the callback runs."""
    print("success time {}".format(loop.time()))


def stoploop(loop):
    """Stop *loop* so that run_forever() returns."""
    loop.stop()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    # call_at takes an absolute time on the loop's own clock (loop.time()),
    # not wall-clock time.
    now = loop.time()
    loop.call_at(now + 2, callback, 2, loop)
    loop.call_at(now + 1, callback, 1, loop)
    loop.call_at(now + 3, callback, 3, loop)
    # call_soon runs on the next loop iteration, before the timed ones.
    loop.call_soon(callback, 4, loop)
    # Stop after the last timed callback; otherwise run_forever() never ends.
    loop.call_at(now + 4, stoploop, loop)
    loop.run_forever()
ThreadPoolExecutor 和 asyncio 完成阻塞 IO 请求
import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse


def get_url(url):
    """Fetch *url* with a blocking socket and print the response body.

    The function blocks, so it is meant to run in a worker thread via
    ``loop.run_in_executor``.
    """
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # The ``with`` block closes the socket even if connect/send/recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.connect((host, 80))
        # sendall keeps sending until the whole request is written.
        client.sendall(
            "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(
                path, host
            ).encode("utf8")
        )
        chunks = []
        while True:
            d = client.recv(1024)
            if not d:
                break  # empty read: the server closed the connection
            chunks.append(d)

    data = b"".join(chunks).decode("utf8")
    # Split on the first blank line only, so blank lines inside the body
    # do not truncate it.
    html_data = data.partition("\r\n\r\n")[2]
    print(html_data)


if __name__ == "__main__":
    import time

    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)
    tasks = []
    for url in range(20):
        # NOTE(review): the URL template seems to have lost its scheme/host
        # prefix during extraction -- restore it before running.
        url = "{}/".format(url)
        # run_in_executor runs the blocking function in the thread pool and
        # returns a future the loop can wait on.
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time:{}".format(time.time() - start_time))
import asyncio
import socket
from urllib.parse import urlparse


async def get_url(url):
    """Fetch *url* with asyncio streams and return the raw response text."""
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    reader, writer = await asyncio.open_connection(host, 80)
    try:
        writer.write(
            "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(
                path, host
            ).encode("utf8")
        )
        # Wait until the request has been flushed to the socket.
        await writer.drain()
        all_lines = []
        # The StreamReader is an async iterator over the response lines.
        async for raw_line in reader:
            data = raw_line.decode("utf8")
            all_lines.append(data)
    finally:
        # Release the connection even if reading fails.
        writer.close()
        await writer.wait_closed()
    html = "\n".join(all_lines)
    return html


async def main():
    """Fetch 20 URLs concurrently and print each result as it completes."""
    tasks = []
    for url in range(20):
        # NOTE(review): the URL template seems to have lost its scheme/host
        # prefix during extraction -- restore it before running.
        url = "{}/".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    # as_completed yields awaitables in the order the tasks finish.
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)


if __name__ == "__main__":
    import time

    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('last time:{}'.format(time.time() - start_time))
async with 调用的是对象的 __aenter__ 和 __aexit__ 方法(异步上下文管理器协议),而 await 调用的才是 __await__
import asyncio
import re

import aiohttp
import aiomysql
from pyquery import PyQuery

stopping = False
start_url = ""
waiting_urls = []   # URLs discovered but not yet fetched
seen_urls = set()   # URLs already fetched
sem = asyncio.Semaphore(3)  # at most 3 requests in flight at once


async def fetch(url, session):
    """GET *url*; return the body text, or None on failure or non-2xx status."""
    async with sem:
        try:
            async with session.get(url) as resp:
                print("url status: {}".format(resp.status))
                if resp.status in [200, 201]:
                    data = await resp.text()
                    return data
        except Exception as e:
            # Best-effort crawl: report the error and carry on with None.
            print(e)


def extract_urls(html):
    """Queue every unseen absolute link found in *html* and return them."""
    urls = []
    if not html:
        # fetch() returns None on failure; there is nothing to parse.
        return urls
    pq = PyQuery(html)
    for link in pq.items("a"):
        url = link.attr("href")
        if url and url.startswith("http") and url not in seen_urls:
            urls.append(url)
            waiting_urls.append(url)
    return urls


async def init_urls(url, session):
    """Fetch a page purely to harvest the links it contains."""
    html = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(html)


async def article_handler(url, session, pool):
    """Fetch an article page, queue its links and store its title in MySQL."""
    html = await fetch(url, session)
    seen_urls.add(url)
    if not html:
        return  # the download failed; nothing to parse or store
    extract_urls(html)
    pq = PyQuery(html)
    title = pq("title").text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Parameterized query: the title comes from an untrusted page
            # and must not be formatted into the SQL text.
            insert_sql = "insert into article(title) values(%s)"
            await cur.execute(insert_sql, (title,))


async def consumer(pool):
    """Take URLs from the waiting list and schedule article handlers."""
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if not waiting_urls:
                await asyncio.sleep(0.5)
                continue

            url = waiting_urls.pop()
            print("start get url: {}".format(url))
            if re.match(r"http://.*?\d+/", url):
                if url not in seen_urls:
                    # Fire and forget: the handler runs concurrently.
                    asyncio.ensure_future(article_handler(url, session, pool))
                    await asyncio.sleep(30)


async def main(loop):
    """Create the MySQL pool, seed the queue from start_url, start the consumer."""
    pool = await aiomysql.create_pool(
        host='', port=3306,
        user='root', password='123456',
        db='test1', loop=loop,
        charset='utf8', autocommit=True
    )

    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)

    asyncio.ensure_future(consumer(pool))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()