操作系统发展史

穿孔卡片

一个计算机房,只能被一个穿孔卡片使用

缺点:

  • CPU利用率低

联机批处理

支持多用户去使用一个计算机房

脱机批处理系统

高速磁盘:

  • 提高文件的读取速度

优点:

  • 提高CPU的利用率

多道技术(基于单核情况下研究)

  • 单道:多个使用CPU是串行
  • 多道:
    • 空间上的复用:一个CPU可以提供给多个用户去使用
    • 时间上的复用:切换 + 保存状态

时间上复用说明:

(1)若CPU遇到IO操作,会立即将当前执行程序CPU使用权断开
优点:CPU的利用率高

(2)若一个程序使用CPU的时间过长,会立即将当前执行程序CPU使用权断开

缺点:程序的执行率降低

并发与并行

并发:指的是看起来像同时运行,多个程序不停地切换 + 保存状态

并行:真实意义上的同时运行

进程

程序与进程:

  • 程序:一堆代码
  • 进程:一堆代码运行的过程

进程调度:

当代操作系统使用的是:时间片轮转法 + 分级反馈队列

1、先来先服务调度:

a, b程序, 若a程序先来, 先占用CPU

缺点:程序a先使用,程序b必须等待程序a使用CPU结束后才能使用

2、短作业优先调度:

a, b程序,谁的用时短,先优先调度使用cpu

缺点:若程序a使用时间最长,有N个程序使用时间短,必须等待所有用时短的程序结束后才能使用

3、时间片轮转法

CPU执行的时间1秒中,加载N个程序。要将1秒钟等分N个时间片

4、分级反馈队列

将执行优先分为多层级别

同步异步阻塞非阻塞

进程的三个状态:

  • 就绪态:所有程序创建时都会进入就绪态,准备调度
  • 运行态:调度后的进程,进入运行态
  • 阻塞态:凡是遇到IO操作的进程,都会进入阻塞态;若IO结束必须重新进入就绪态

同步与异步

指的是提交任务的方式

  • 同步:若有两个任务需要提交,在提交第一个任务时,必须等待该任务执行结束后,才能继续提交并执行第二个任务。
  • 异步:若有两个任务需要提交,在提交第一个任务时,不需要原地等待,立即可以提交并执行第二个任务。

阻塞与非阻塞

  • 阻塞:遇到IO一定会阻塞
  • 非阻塞:指的是就绪态、运行态

最大化提高CPU的使用率:尽可能减少不必要的IO操作

创建进程

进程的创建

windows中创建子进程,会将当前父进程代码重新加载执行一次

在Linux/mac中,会将当前父进程重新拷贝一份,再去执行

创建子进程两种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from multiprocessing import Process
import time

# 定义一个任务
def task(name):
print(f'{name}的任务开始执行')
time.sleep(1)
print(f'{name}的任务已经结束')


if __name__ == '__main__':
p = Process(target=task, args=('cwz',))
p.start()
print('主进程')

'''
主进程
cwz的任务开始执行
cwz的任务已经结束
'''

# 自定义一个类,并继承Process
class MyProcess(Process):

# 父类方法
def run(self):
print('开始执行任务')
time.sleep(1)
print('结束执行任务')

if __name__ == '__main__':
p = MyProcess()
p.start()
print('主进程')

'''
主进程
开始执行任务
结束执行任务
'''

join方法的使用

用来告诉操作系统,让子进程结束后再结束父进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Process
import time

def task(name):
print(f'{name} start...')
time.sleep(2)
print(f'{name} over...')


if __name__ == '__main__':
p = Process(target=task, args=('neo', ))
p.start() # 告诉操作系统,开启子进程
p.join() # 告诉操作系统,结束子进程后,父进程再结束
print('主进程')


'''
neo start...
neo over...
主进程
'''

进程间数据是相互隔离的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import Process
import time

x = 100
def func():
print('执行func函数')
global x
x = 200


if __name__ == '__main__':
p = Process(target=func)
p.start()
print(x) # 不能修改x的值
print('主进程')


'''
100
主进程
执行func函数
'''

进程间数据相互隔离:
主进程与子进程间会有各自的名称空间

进程对象的属性

  • p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
  • p.name:进程的名称
  • p.pid:进程的pid
  • p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
  • p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from multiprocessing import Process
from multiprocessing import current_process
import time
import os

def task(name):
# current_process().pid 获取子进程号
print(f'{name} start...', current_process().pid)
time.sleep(2)
print(f'{name} over...', current_process().pid)


if __name__ == '__main__':
p = Process(target=task, args=('neo', ))
p.start()
print('主进程', os.getpid())
print('主主进程', os.getppid())


'''
主进程 12576
主主进程 12476
neo start... 7772
neo over... 7772
'''

进程号回收的两种条件:

  • join, 可以回收子进程与主进程
  • 主进程正常结束,子进程与主进程也会回收

主进程这里指的是python解释器

主主进程指的是pycharm

p.is_alive 可以查看子进程存活状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from multiprocessing import Process
from multiprocessing import current_process
import time
import os

def task(name):
# current_process().pid 获取子进程号
print(f'{name} start...', current_process().pid)
time.sleep(2)
print(f'{name} over...', current_process().pid)


if __name__ == '__main__':
p = Process(target=task, args=('neo', ))
p.start()
# p.join()

print(p.is_alive())
p.terminate() # 直接告诉操作系统,终止子进程
time.sleep(1)
print(p.is_alive()) # 判断子进程是否存活
print('主进程', os.getpid())
print('主主进程', os.getppid())


'''
True
False
主进程 10708
主主进程 12476
'''

僵尸进程和孤儿进程

僵尸进程

指的是子进程已经结束,但PID号还存在,未销毁

僵尸进程会占用PID号,占用操作系统资源

孤儿进程

指的是子进程还在执行,但父进程意外结束。

操作系统内部优化机制:会自动回收没有父的子进程

守护进程

指的是主进程结束后,该主进程产生的所有子进程跟着结束,并回收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Process
from multiprocessing import current_process
import time

def task(name):
print(f'{name} start...', current_process().pid)
time.sleep(3)
print(f'{name} over...', current_process().pid)
print('子进程')

if __name__ == '__main__':
p = Process(target=task, args=('cwz', ))
p.daemon = True # True表示该进程是守护进程
p.start()
print('主进程')


'''
主进程
'''

TCP协议套接字,服务端实现接收客户端的连接并发

多进程实现

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process
import socket


def task(conn, addr):
while True:
try:
data = conn.recv(1024)
if not data:
break
print(addr)
print(data.decode('utf-8'))
conn.send(data.upper())

except Exception:
break


if __name__ == '__main__':
server = socket.socket()
server.bind(('127.0.0.1', 9999))
server.listen(5)
while True:

conn, addr = server.accept()
print(addr)
p = Process(target=task, args=(conn, addr))
p.start()

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import socket

client = socket.socket()
client.connect(('127.0.0.1', 9999))

while True:
send_msg = input('客户端:')
if send_msg == 'q':
break

client.send(send_msg.encode('utf-8'))

data = client.recv(1024)
print(data.decode('utf-8'))

多线程实现

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import socket
from threading import Thread

server = socket.socket()
server.bind(('127.0.0.1', 9999))
server.listen(5)


def task(conn):
while True:
data = conn.recv(1024)
if len(data) == 0:
continue

print(data.decode('utf-8'))

conn.send(data.upper())


if __name__ == '__main__':
while True:
conn, addr = server.accept()
print(addr)

t = Thread(target=task, args=(conn,))
t.start()

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import socket

client = socket.socket()
client.connect(('127.0.0.1', 9999))

while True:
send_msg = input('客户端:')
if send_msg == 'q':
break

client.send(send_msg.encode('utf-8'))

data = client.recv(1024)
print(data.decode('utf-8'))

进程互斥锁

多进程同时抢购余票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 并发运行,效率高,但竞争写同一文件,数据写入错乱
# data.json文件内容为 {"ticket_num": 1}
import json
import time
from multiprocessing import Process


def search(user):
with open('data.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
print(f'用户{user}查看余票,还剩{dic.get("ticket_num")}...')


def buy(user):
with open('data.json', 'r', encoding='utf-8') as f:
dic = json.load(f)

time.sleep(0.1)
if dic['ticket_num'] > 0:
dic['ticket_num'] -= 1
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print(f'用户{user}抢票成功!')

else:
print(f'用户{user}抢票失败')


def run(user):
search(user)
buy(user)


if __name__ == '__main__':
for i in range(10): # 模拟10个用户抢票
p = Process(target=run, args=(f'用户{i}', ))
p.start()

使用锁来保证数据安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# data.json文件内容为 {"ticket_num": 1}
import json
import time
from multiprocessing import Process, Lock


def search(user):
with open('data.json', 'r', encoding='utf-8') as f:
dic = json.load(f)
print(f'用户{user}查看余票,还剩{dic.get("ticket_num")}...')


def buy(user):
with open('data.json', 'r', encoding='utf-8') as f:
dic = json.load(f)

time.sleep(0.2)
if dic['ticket_num'] > 0:
dic['ticket_num'] -= 1
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(dic, f)
print(f'用户{user}抢票成功!')

else:
print(f'用户{user}抢票失败')


def run(user, mutex):
search(user)
mutex.acquire() # 加锁
buy(user)
mutex.release() # 释放锁


if __name__ == '__main__':
# 调用Lock()类得到一个锁对象
mutex = Lock()

for i in range(10): # 模拟10个用户抢票
p = Process(target=run, args=(f'用户{i}', mutex))
p.start()

进程互斥锁:

  • 让并发变成串行,牺牲了执行效率,保证了数据安全
  • 在程序并发时,需要修改数据使用

进程队列

队列

队列遵循的是先进先出

队列:相当于内存中一个队列空间,可以存放多个数据,但数据的顺序是由先进去的排在前面。

q.put() 添加数据

q.get() 取数据,遵循队列先进先出

q.get_nowait() 获取队列数据, 队列中没有就会报错

q.put_nowait 添加数据,若队列满了也会报错

q.full() 查看队列是否满了

q.empty() 查看队列是否为空

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from multiprocessing import Queue

# 调用队列类,实例化队列对象
q = Queue(5) # 队列中存放5个数据

# put添加数据,若队列里的数据满了就会卡住
q.put(1)
print('进入数据1')
q.put(2)
print('进入数据2')
q.put(3)
print('进入数据3')
q.put(4)
print('进入数据4')
q.put(5)
print('进入数据5')

# 查看队列是否满了
print(q.full())

# 添加数据, 若队列满了也会报错
q.put_nowait(6)

# q.get() 获取的数据遵循先进先出
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# print(q.get())
print(q.get_nowait()) # 获取队列数据, 队列中没有就会报错

# 判断队列是否为空
print(q.empty())
q.put(6)
print('进入数据6')

进程间通信

IPC(Inter-Process Communication)

进程间数据是相互隔离的,若想实现进程间通信,可以利用队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import Process, Queue

def task1(q):
data = 'hello 你好'
q.put(data)
print('进程1添加数据到队列')


def task2(q):
print(q.get())
print('进程2从队列中获取数据')



if __name__ == '__main__':
q = Queue()

p1 = Process(target=task1, args=(q, ))
p2 = Process(target=task2, args=(q, ))
p1.start()
p2.start()
print('主进程')

生产者与消费者

在程序中,通过队列生产者把数据添加到队列中,消费者从队列中获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from multiprocessing import Process, Queue
import time


# 生产者
def producer(name, food, q):
for i in range(10):
data = food, i
msg = f'用户{name}开始制作{data}'
print(msg)
q.put(data)
time.sleep(0.1)

# 消费者
def consumer(name, q):
while True:
data = q.get()
if not data:
break

print(f'用户{name}开始吃{data}')


if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=('neo', '煎饼', q))
p2 = Process(target=producer, args=('wick', '肉包', q))

c1 = Process(target=consumer, args=('cwz', q))
c2 = Process(target=consumer, args=('woods', q))

p1.start()
p2.start()

c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
print('主')

线程

线程的概念

进程与线程都是虚拟单位

进程:资源单位

线程:执行单位

开启一个进程,一定会有一个线程,线程才是真正执行者

开启进程:

  • 开辟一个名称空间,每开启一个进程都会占用一份内存资源
  • 会自带一个线程

开启线程:

  • 一个进程可以开启多个线程
  • 线程的开销远小于进程

注意:线程不能实现并行,线程只能实现并发,进程可以实现并行

线程的两种创建方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from threading import Thread
import time

# 创建线程方式1
def task():
print('线程开启')
time.sleep(1)
print('线程结束')

if __name__ == '__main__':
t = Thread(target=task)
t.start()


# 创建线程方式2
class MyThread(Thread):
def run(self):
print('线程开启...')
time.sleep(1)
print('线程结束...')


if __name__ == '__main__':
t = MyThread()
t.start()

线程对象的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from threading import Thread
from threading import current_thread
import time

def task():
print(f'线程开启{current_thread().name}')
time.sleep(1)
print(f'线程结束{current_thread().name}')


if __name__ == '__main__':
t = Thread(target=task)
print(t.isAlive())
# t.daemon = True
t.start()
print(t.isAlive())

线程互斥锁

线程之间数据是共享的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from threading import Thread
from threading import Lock
import time

mutex = Lock()
n = 100

def task(i):
print(f'线程{i}启动')
global n
mutex.acquire()
temp = n
time.sleep(0.1)
n = temp - 1
print(n)
mutex.release()

if __name__ == '__main__':
t_l = []
for i in range(100):
t = Thread(target=task, args=(i, ))
t_l.append(t)
t.start()

for t in t_l:
t.join()

print(n)

GIL全局解释锁

Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。

对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

全局解释锁特点:

  1. GIL本质上是一个互斥锁。
  2. GIL是为了阻止同一个进程内多个进程同时执行(并行)
    • 单个进程下的多个线程无法实现并行,但能实现并发
  3. 这把锁主要是因为Cpython的内存管理不是线程安全的
    • 保证线程在执行任务时不会被垃圾回收机制回收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from threading import Thread
import time

num = 100


def task():
global num
num2 = num
time.sleep(1)
num = num2 - 1
print(num)


for line in range(100):
t = Thread(target=task)
t.start()

# 这里的运行结果都是99, 加了IO操作,所有线程都对num进行了减值操作,由于GIL锁的存在,没有修改成功,都是99

在多线程环境中,Python 虚拟机按以下方式执行:

  1. 设置 GIL;
  2. 切换到一个线程去运行;
  3. 运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));
  4. 把线程设置为睡眠状态;
  5. 解锁 GIL;
  6. 再次重复以上所有步骤。

多线程的作用

1、计算密集型, 有四个任务,每个任务需要10s

单核:

  • 开启进程
    • 消耗资源过大
    • 4个进程: 40s
  • 开启线程
    • 消耗资源远小于进程
    • 4个线程: 40s

多核:

  • 开启进程
    • 并行执行, 效率比较高
    • 4个进程: 10s
  • 开启线程
    • 并发执行,执行效率低
    • 4个线程: 40s

2、IO密集型, 四个任务, 每个任务需要10s

单核:

  • 开启进程
    • 消耗资源过大
    • 4个进程: 40s
  • 开启线程
    • 消耗资源远小于进程
    • 4个线程: 40s

多核:

  • 开启进程
    • 并行执行, 效率小于多线程, 但是遇到IO会立马切换CPU的执行权限
    • 4个进程: 40s + 开启进程消耗的额外时间
  • 开启线程
    • 并发执行,执行效率高于多进程
    • 4个线程: 40s

测试计算密集型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from threading import Thread
from multiprocessing import Process
import time
import os


# 计算密集型
def work1():
number = 0
for line in range(100000000):
number += 1

# IO密集型
def work2():
time.sleep(2)


if __name__ == '__main__':
# 测试计算密集型
print(os.cpu_count()) # 4核cpu

start = time.time()
list1 = []
for line in range(6):

p = Process(target=work1) # 程序执行时间8.756593704223633
# p = Thread(target=work1) # 程序执行时间31.78555393218994
list1.append(p)
p.start()
for p in list1:
p.join()
end = time.time()
print(f'程序执行时间{end - start}')

IO密集型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from threading import Thread
from multiprocessing import Process
import time
import os

# 计算密集型
def work1():
number = 0
for line in range(100000000):
number += 1

# IO密集型
def work2():
time.sleep(1)


if __name__ == '__main__':
# 测试计算密集型
print(os.cpu_count()) # 4核cpu

start = time.time()
list1 = []
for line in range(100):

# p = Process(target=work2) # 程序执行时间15.354223251342773
p = Thread(target=work2) # 程序执行时间1.0206732749938965
list1.append(p)
p.start()
for p in list1:
p.join()
end = time.time()
print(f'程序执行时间{end - start}')

结论:

  • 在计算密集型的情况下, 使用多进程
  • 在IO密集型的情况下, 使用多线程
  • 高效执行多个进程, 内有多个IO密集型程序,使用多进程 + 多线程

死锁现象

指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,如无外力作用,它们都无法推进下去.此时称系统处于死锁状态

以下就是死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from threading import Thread, Lock
from threading import current_thread
import time

mutex_a = Lock()
mutex_b = Lock()


class MyThread(Thread):

def run(self):
self.func1()
self.func2()

def func1(self):
mutex_a.acquire()
print(f'用户{self.name}抢到锁a')
mutex_b.acquire()
print(f'用户{self.name}抢到锁b')
mutex_b.release()
print(f'用户{self.name}释放锁b')
mutex_a.release()
print(f'用户{self.name}释放锁a')

def func2(self):
mutex_b.acquire()
print(f'用户{self.name}抢到锁b')
time.sleep(1)
mutex_a.acquire()
print(f'用户{self.name}抢到锁a')
mutex_a.release()
print(f'用户{self.name}释放锁a')
mutex_b.release()
print(f'用户{self.name}释放锁b')


for line in range(10):
t = MyThread()
t.start()


'''
用户Thread-1抢到锁a
用户Thread-1抢到锁b
用户Thread-1释放锁b
用户Thread-1释放锁a
用户Thread-1抢到锁b
用户Thread-2抢到锁a
'''
# 一直等待

递归锁

用于解决死锁问题

RLock: 比喻成万能钥匙,可以提供给多个人使用

但是第一个使用的时候,会对该锁做一个引用计数

只有引用计数为0, 才能真正释放让一个人使用

上面的例子中用RLock代替Lock, 就不会发生死锁现象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from threading import Thread, Lock, RLock
from threading import current_thread
import time

# mutex_a = Lock()
# mutex_b = Lock()

mutex_a = mutex_b = RLock()

class MyThread(Thread):

def run(self):
self.func1()
self.func2()

def func1(self):
mutex_a.acquire()
print(f'用户{self.name}抢到锁a')
mutex_b.acquire()
print(f'用户{self.name}抢到锁b')
mutex_b.release()
print(f'用户{self.name}释放锁b')
mutex_a.release()
print(f'用户{self.name}释放锁a')

def func2(self):
mutex_b.acquire()
print(f'用户{self.name}抢到锁b')
time.sleep(1)
mutex_a.acquire()
print(f'用户{self.name}抢到锁a')
mutex_a.release()
print(f'用户{self.name}释放锁a')
mutex_b.release()
print(f'用户{self.name}释放锁b')


for line in range(10):
t = MyThread()
t.start()

信号量

互斥锁: 比喻成一个家用马桶, 同一时间只能让一个人去使用

信号比喻成公测多个马桶: 同一时间可以让多个人去使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from threading import Semaphore
from threading import Thread
from threading import current_thread
import time

sm = Semaphore(5)

def task():
sm.acquire()
print(f'{current_thread().name}执行任务')
time.sleep(1)
sm.release()


for i in range(20):
t = Thread(target=task)
t.start()

线程队列

线程Q: 就是线程队列 FIFO

  • 普通队列: 先进先出 FIFO
  • 特殊队列: 后进先出 LIFO
  • 优先级队列: 若传入一个元组,会依次判断参数的ASCII的数值大小
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import queue

# 普通的线程队列: 遵循先进先出
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)

print(q.get()) # 1
print(q.get()) # 2


# LIFO队列 后进先出
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 3


# 优先级队列:根据参数内
q = queue.PriorityQueue()
q.put((4, '我'))
q.put((2, '你'))
q.put((3, 'ta'))
print(q.get()) # (2, '你')

Event事件

用来控制线程的执行

出现e.wait(),就会把这个线程设置为False,就不能执行这个任务;

只要有一个线程出现e.set(),就会告诉Event对象,把有e.wait的用户全部改为True,剩余的任务就会立马去执行。由一些线程去控制另一些线程,中间通过Event。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from threading import Event
from threading import Thread
import time
# 调用Event实例化出对象
e = Event()
#
# # 若该方法出现在任务中,则为False,阻塞
# e.wait() # False
# # 若该方法出现在任务中,则将其他线程的False改为True,进入就绪态和运行态
# e.set() # True


def light():
print('红灯亮...')
time.sleep(5)
# 应该发出信号,告诉其他线程准备执行
e.set() # 将car中的False变为True
print('绿灯亮...')


def car(name):
print('正在等红灯...')
# 让所有汽车任务进入阻塞态
e.wait() # False
print(f'{name}正在加速飘逸...')


# 让一个light线程控制多个car线程
t = Thread(target=light)
t.start()

for i in range(10):
t = Thread(target=car, args=(f'汽车{i}号', ))
t.start()

进程池与线程池

  1. 进程池与线程池是用来控制当前程序允许创建(进程/线程)的数量
  2. 作用:保证在硬件允许的范围内创建(进程/线程)的数量

线程池使用一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(5) # 5代表只能开启5个进程, 不加默认使用cpu的进程数

# ThreadPoolExecutor(5) # 5代表只能开启5个线程

# pool.submit() #异步提交任务, 括号里传函数地址


def task():
print('线程任务开始了...')
time.sleep(1)
print('线程任务结束了...')


for line in range(5):
pool.submit(task)

使用二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(5) # 5代表只能开启5个进程, 不加默认使用cpu的进程数

# ThreadPoolExecutor(5) # 5代表只能开启5个线程

# pool.submit() #异步提交任务, 括号里传函数地址


def task():
print('线程任务开始了...')
time.sleep(1)
print('线程任务结束了...')
return 123


# 回调函数
def call_back(res):
print(type(res))
res2 = res.result() # 注意:赋值操作不要与接收的res同名
print(res2)


for line in range(5):
pool.submit(task).add_done_callback(call_back)

pool.shutdown() 会让所有线程池的任务结束后,才往下执行代码

协程

  • 进程: 资源单位
  • 线程: 执行单位
  • 协程: 在单线程下实现并发

注意: 协程不是操作系统资源,目的是让单线程实现并发

协程目的

  • 操作系统:使用多道技术,切换 + 保存状态,一个是遇到IO, 另一个是CPU执行时间过长
  • 协程:通过手动模拟操作系统 “多道计数”, 实现 切换 + 保存状态
    • 手动实现,遇到IO切换,欺骗操作系统误以为没有IO操作
    • 单线程时,遇到IO,就切换 + 保存状态
    • 单线程时,对于计算密集型,来回切换 + 保存状态反而效率更低

优点:在IO密集型的情况下,会提高效率

缺点:若在计算密集型的情况下,来回切换,反而效率更低

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import time

def func1():
for i in range(10000000):
i+1

def func2():
for i in range(10000000):
i+1


start = time.time()
func1()
func2()
stop = time.time()
print(stop - start) # 1.0312113761901855


# 基于yield实现并发 在计算密集型的情况下效率更低

def func1():
while True:
10000000+1
yield

def func2():
g = func1()
for i in range(10000000):
i+1
next(g) # 每次执行next相当于切换到func1下面


start = time.time()
func2()
stop = time.time()
print(stop - start) # 1.3294126987457275

gevent

gevent模块介绍

gevent是一个第三方模块,可以帮你监听IO操作,并切换

使用gevent的目的:在单线程下实现,遇到IO就会 保存状态 + 切换

gevent使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import time
from gevent import monkey

monkey.patch_all() # 可以监听该程序下所有的IO操作
from gevent import spawn, joinall # 用于做切换 + 保存状态


def func1():
print('1')
time.sleep(1) # IO操作


def func2():
print('2')
time.sleep(3)


def func3():
print('3')
time.sleep(5)

start = time.time()

s1 = spawn(func1)
s2 = spawn(func2)
s3 = spawn(func3)

s1.join() # 发送信号,相当于等待自己(在单线程的情况下)
s2.join()
s3.join()

# joinall((s1, s2, s3)) # 一个个执行很麻烦,可以用joinall把这些全部装进去

end = time.time()
print(end - start) # 5.006161451339722

TCP服务端socket套接字实现协程

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from gevent import monkey
from gevent import spawn
import socket

monkey.patch_all()
server = socket.socket()
server.bind(('127.0.0.1', 9999))
server.listen(5)

def task(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0:
break
print(data.decode('utf-8'))
send_data = data.upper()
conn.send(send_data)

except Exception:
break

conn.close()


def server2():
while True:
conn, addr = server.accept()
print(addr)
spawn(task, conn)


if __name__ == '__main__':
s = spawn(server2)
s.join()

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import socket
from threading import Thread, current_thread

def client():
client = socket.socket()
client.connect(('127.0.0.1', 9999))

number = 0
while True:
send_data = f'{current_thread().name} {number}'
client.send(send_data.encode('utf-8'))

data = client.recv(1024)
print(data.decode('utf-8'))
number += 1


for i in range(400):
t = Thread(target=client)
t.start()