职位管理系统 创建一个可以管理职位的后台 安装
创建项目
1 django-admin startproject startproject
初始化:
1 2 3 python manage.py makemigrations python manage.py migrate
启动项目:
1 python manage.py runserver 0.0 .0 .0 :8000
创建管理员账号:
1 python manage.py createsuperuser
打开django的管理后台:http://127.0.0.1:8000/admin
输入用户名和密码:
招聘系统里面的职位管理
管理员能够发布职位
匿名用户(候选人)能够浏览职位
匿名用户能够投递职位
职位管理系统-建模
职位名称、类别、工作地点、职位职责、职位要求、发布人、发布日期、修改日期
创建应用
1 2 python manage.py startapp jobs
jobs应用的model:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from django.db import modelsfrom django.contrib.auth.models import UserJobTypes = [ (0 , "技术类" ), (1 , "产品类" ), (2 , "运营类" ), (3 , "设计类" ) ] Cities = [ (0 , "北京" ), (1 , "上海" ), (2 , "深圳" ) ] class Job (models.Model): job_type = models.SmallIntegerField(blank=False , choices=JobTypes, verbose_name="职位列表" ) job_name = models.CharField(max_length=250 , blank=False , verbose_name="职位名称" ) job_city = models.SmallIntegerField(blank=False , choices=Cities, verbose_name="工作地点" ) job_responsibility = models.TextField(max_length=1024 , verbose_name="职位职责" ) job_requirement = models.TextField(max_length=1024 , blank=False , verbose_name="职位要求" ) creator = models.ForeignKey(User, verbose_name="创建人" , null=True , on_delete=models.SET_NULL) create_date = models.DateTimeField(verbose_name="创建时间" ) modify_date = models.DateTimeField(verbose_name="修改时间" )
在admin中注册:
1 2 3 4 5 6 7 from django.contrib import adminfrom jobs.models import Jobadmin.site.register(Job)
同步数据库
1 2 python manage.py makemigrations python manage.py migrate
快速迭代完善应用 我们希望创建时间、创建人有一个默认值
在job应用的Model类中添加默认值,用default
来指代
1 2 create_date = models.DateTimeField(verbose_name="创建时间" , default=datetime.now) modify_date = models.DateTimeField(verbose_name="修改时间" , default=datetime.now)
在admin.py中更改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from django.contrib import adminfrom jobs.models import Jobclass JobAdmin (admin.ModelAdmin): exclude = ('creator' , 'create_date' , 'modify_date' ) list_display = ('job_name' , 'job_type' , 'job_city' , 'creator' , 'create_date' , 'modify_date' ) def save_model (self, request, obj, form, change ): obj.creator = request.user super ().save_model(request, obj, form, change) admin.site.register(Job, JobAdmin)
效果如下:
让匿名用户可以浏览职位列表页 职位列表展示
列表页是独立页面,使用自定义的页面
添加如下页面
匿名用户可以访问
Django的自定义模板
Django 模板包含了输出的 HTML 页面的静态部分的内容
模板里面的动态内容在运行时被替换
在 views 里面指定每个 URL 使用哪个模板来渲染页面
模版继承与块(Template Inheritance & Block)
模板继承允许定义一个骨架模板,骨架包含站点上的公共元素(如头部导航,尾部链接)
骨架模板里面可以定义 Block 块,每一个 Block 块都可以在继承的页面上重新定义/覆盖
一个页面可以继承自另一个页面
定义一个匿名访问页面的基础页面,基础页面中定义页头
添加页面 job/templates/base.html
1 2 3 4 5 6 7 8 <h1 style ="margin: auto;width: 50%;" > 匠国科技开放职位</h1 > <p > </p > {% block content %} {% endblock %}
添加职位列表页模板-继承自base.html
joblist.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 {% extends 'base.html' %} {% block content %} 终于等到你,期待加入我们,用技术探索一个新世界 {% if job_list %} <ul > {% for job in job_list %} <li > {{job.type_name}} <a href ="/job/{{ job.id }}/" style ="color:blue" > {{ job.job_name }}</a > {{job.city_name}} </li > {% endfor %} </ul > {% else %} <p > No jobs are available.</p > {% endif %} {% endblock %}
使用 extends 指令来表示,这个模板继承自 base.html 模板
Block content 里面重新定义了 content 这个块
变量:运行时会被替换, 变量用 表示,变量是 views 层取到内容后 填充到模板中的参数
Tag:控制模板的逻辑,包括 if, for, block 都是 tab
职位列表的视图
jobs/views.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from django.shortcuts import render, HttpResponsefrom django.template import loaderfrom jobs.models import Job, Cities, JobTypesdef job_list (request ): job_list = Job.objects.order_by('job_type' ) template = loader.get_template('joblist.html' ) context = {"job_list" : job_list} for job in job_list: job.city_name = Cities[job.job_city][1 ] job.job_type = JobTypes[job.job_type][1 ] return HttpResponse(template.render(context))
视图里面获取数据,把数据传入到模板中
使用 Django 的 model 来获取数据,数据按照职位类型排序
模板渲染指定了使用前面定义的 joblist.html,把 一个含有 job_list 这个 key 的 map 传入到模板
添加 URL 路径映射
让添加的页面,能够通过 URL 访问到
/joblist/ 的路径访问到 views 里面定义的 joblist 视图
这个视图是一个 Method View,方法表示一个视图
jobs/urls.py
1 2 3 4 5 6 7 from django.conf.urls import urlfrom jobs import viewsurlpatterns = [ url(r"^joblist/" , views.job_list, name="joblist" ), ]
应用(app)的所有 URL 定义加入到项目(recruitment)中
收到请求时,先走 jobs 应用下面的 URL 路由找页面,然后再按照 admin/ 路径匹配请求 URL
recruitment/urls.py
1 2 3 4 5 6 7 8 from django.contrib import adminfrom django.urls import pathfrom django.conf.urls import url, includeurlpatterns = [ url(r"^" , include("jobs.urls" )), path('admin/' , admin.site.urls), ]
模板添加定义,View 页面添加完,URL 中也定义路由之后,再访问页面:http://127.0.0.1:8000/joblist/
职位详情页面
前面列表页,每个职位上有一个链接,指向职位详情页
同样添加如下 3 块内容:
详情页模板 – 定义内容呈现(Template)
详情页视图 – 获取数据逻辑 (View)
定义 URL 路由
jobs/templates/job.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 {% extends 'base.html' %} {% block content %} <div style ="margin:auto; width:50%;" > {% if job %} <div class ="position_name" z > <h2 > 岗位名称:{{job.job_name}} </h2 > 城市: {{job.city_name}} <p > </p > </div > <hr > <div class ="position_responsibility" style ="width:600px;" > <h3 > 岗位职责:</h3 > <pre style ="font-size:16px" > {{job.job_responsibility}} </pre > </div > <hr > <div class ="position_requirement" style ="width:600px; " > <h3 > 任职要求:</h3 > <pre style ="font-size:16px" > {{job.job_requirement}} </pre > </div > <div class ="apply_position" > <input type ="button" class ="btn btn-primary" style ="width:120px;" value ="申请" onclick ="location.href='/resume/add/?apply_position={{job.job_name}}&city={{job.city_name}}'" /> </div > {% else %} <p > 职位不存在</p > {% endif %} {% endblock %} </div >
在 views.py, urls.py 中分别定义了 View 视图,以及 URL 的路由规则 /job/job_id 来访问详情
jobs/views.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from django.shortcuts import render, HttpResponsefrom django.template import loaderfrom django.http import Http404from jobs.models import Job, Cities, JobTypesdef job_list (request ): job_list = Job.objects.order_by('job_type' ) template = loader.get_template('joblist.html' ) context = {"job_list" : job_list} for job in job_list: job.city_name = Cities[job.job_city][1 ] job.job_type = JobTypes[job.job_type][1 ] return HttpResponse(template.render(context)) def detail (request, job_id ): try : job = Job.objects.get(pk=job_id) job.city_name = Cities[job.job_city][1 ] except Job.DoesNotExist: raise Http404("Job does not exist" ) return render(request, 'job.html' , {'job' : job})
jobs/urls.py
1 2 3 4 5 6 7 8 9 from django.conf.urls import urlfrom jobs import viewsurlpatterns = [ url(r"^joblist/" , views.job_list, name="joblist" ), url(r"^job/(?P<job_id>\d+)/$" , views.detail, name="detail" ), ]
招聘评估系统 产品迭代思想 产品的需求背景:
为了解决招聘面试的过程中线下面试管理效率低,面试结果不方便跟踪的痛点
以校园招聘的面试为例,做MVP产品迭代
线下面试流程:
准备简历 & 面试评估表
HR:发出面试评估表模板(Word)到一面面试官 (邮箱发出来)
一面面试官:登录邮箱下载 Word 模板,每个学生拷贝一份
按学生名字命名文件, 录入学生名字,学校,电话,学历等
第一轮面试
一面官:每面完一个学生,填写 Word 格式的评估表中
一面官:面完一天的学生后,批量把 Word 文档 Email 到 HR
HR:晚上查收下载评估表,汇总结果到 Excel,通知学生复试
HR:同时把已经通知复试的学生信息,发送到技术二面复试官
第二轮面试和 HR 面试
二面官:查收 Email,下载 Word 格式的一面评估记录
二面官:复试后追加复试的评估到 Word 记录中,邮件到 HR
类似如上步骤的 HR 复试
迭代思维与 MVP 产品规划方法(OOPD)
MVP:minimum viable product, 最小可用产品
OOPD:Online & Offline Product Development, 线上线下相结合的产品开发方法
内裤原则:MVP 包含了产品的轮廓,核心的功能,让业务可以运转
优先线下:能够走线下的,优先走线下流程,让核心的功能先跑起来,快速做用户验证和方案验证
MVP 的核心:忽略掉一切的细枝末节,做合适的假设和简化,使用最短的时间开发出来
迭代思维是最强大的产品思维逻辑,互联网上唯快不破的秘诀
优秀的工程师和优秀的产品经理,善于找出产品 MVP 的功能范围
如何找出产品的 MVP 功能范围?
使用这些问题来帮助确定范围
产品的核心目标是什么? 核心用户是谁?核心的场景是什么?
产品目标都需要在线上完成或者呈现吗?
最小 MVP 产品要做哪些事情,能够达到业务目标?
哪些功能不是在用户流程的核心路径上的?
做哪些简化,和假设,能够在最短的时间交付产品,并且可以让业务流程跑起来?
在产品中使用产品迭代思想 招聘面试系统核心的目标:这个产品是为了提高面试过程的效率,让面试过程和结果可以跟踪。围绕着核心目标,我们只需要两个功能:
能够维护候选人的信息,让候选人的信息进到系统里
能够填写面试反馈
第一个功能维护候选人的信息有两种方式实现:
HR通过已有的excel表导入信息
HR手工输入信息
根据MVP产品迭代思想,可以快速开发核心功能。
数据建模和企业级数据库设计原则 在招聘面试的系统中,有两个主要的模型,一个是用户的信息,包括面试官,另一个是候选人信息和面试评估反馈。候选人信息和面试评估反馈通常会放在两张表中,对于一个MVP版本来说,为了能够快速开发,我们把候选人信息和面试评估反馈放在一张表中,等到后续产品有权限控制时,我们在分成两张表。
对候选人信息做一个建模:
企业级数据库设计原则
包括3 个基础原则,4 个扩展性原则,3 个完备性原则
3个基础原则:
结构清晰:表名、字段命名没有歧义,能一眼看懂
唯一职责:一表一用,领域定义清晰,不存储无关信息,相关数据在一张表中
主键原则:设计不带物理意义的主键;有唯一约束,确保幂等
4 个扩展性原则(影响系统的性能和容量):
长短分离:可以扩展,长文本独立存储;有合适的容量设计
冷热分离:当前数据与历史数据分离
索引完备:有合适索引方便查询
不使用关联查询:不使用一切的 SQL Join 操作,不做 2 个表或者更多表的关联查询
示例:查询商家每一个订单的金额
select s.shop_name, o.id as order_id, o.total_amount from shop s, order o where s.id = o.shop_id
3 个完备性原则:
完整性:保证数据的准确性和完整性,重要的内容都有记录
可追溯:可追溯创建时间,修改时间,可以逻辑删除
一致性原则:数据之间保持一致,尽可能避免同样的数据存储在不同表中
创建应用和模型,分组展示页面内容 创建应用
1 python manage.py startapp interview
注冊应用
在 settings.py 中添加 interview 应用
添加模型
在 interview/models.py 里面定义 Candidate 类
从Excel文件批量导入候选人数据 实现候选人数据导入
怎么样实现一个数据导入的功能最简洁
开发一个自定义的 Web 页面,让用户能够上传 excel/csv 文件
开发一个命令行工具,读取 excel/csv,再访问数据库写入 DB
从数据库的客户端,比如 MySQL 的客户端里面导入数据
Django 框架已经考虑到(需要使用到命令行的场景)
使用自定义的 django management 命令来导入数据
应用下面创建 management/commands 目录,
commands 目录下添加脚本,创建类,继承自 BaseCommand,实现命令行逻辑
命令行导入:python manage.py import_candidates —path /path/to/your/file.csv
候选人列表筛选和查询
能够按照名字、手机号码、学校来查询候选人信息
能够按照初试结果,复试结果,HR复试结果,面试官来筛选;能按照复试结果来排序
使用内置的search_field
属性来设置哪些可以搜索的字段,用list_filter
属性来设置做筛选、过滤的字段,用ordering
字段来设置字段的排序。
企业域账号集成
什么是目录服务,英文名是Directory Service,目录服务是一个提供资源服务的定位查找功能的存储系统。在软件工程里面一个目录是指一组名字和值的映射,它允许根据一个给定的名字来查找对应的值,以词典类似。目录可以有树状结构,典型的目录有域名、企业的组织架构,这些都可以使用目录服务来存储其中的信息。
OpenLDAP是开发的LDAP服务,Lightweight Directory Access Protocol,轻量级的目录访问协议
可以直接使用域账号登陆
不用手工添加账号、维护独立密码
可以集成 OpenLDAP/ActiveDirecotry
DN: 目录服务中的一个唯一的对象
CN=David,OU=Shanghai,DC=ihopeit,DC=com
Open LDAP服务搭建
1 2 docker run -d -p 389:389 -p 636:636 --name my_openldap --env LDAP_ORGANISATION="myhome" --env LDAP_DOMAIN="myhome.com" --env LDAP_ADMIN_PASSWORD="cwz123456" --detach osixia/openldap
参数解释:
配置LDAP组织者:–-env LDAP_ORGANISATION=”myhome”
配置LDAP域:–-env LDAP_DOMAIN=”myhome.com”
配置LDAP密码:–-env LDAP_ADMIN_PASSWORD=”cwz123456”
默认登录用户名:admin
1 2 docker run -d -p 81:80 -p 443:443 --name phpldapadmin_service --env PHPLDAPADMIN_LDAP_HOSTS=49.235.76.103 --link my_openldap:ldap-host --env PHPLDAPADMIN_LDAP_HOSTS=ldap-host --detach osixia/phpldapadmin
参数解释:
配置的Ldap地址:–-env PHPLDAPADMIN_LDAP_HOSTS=49.235.76.103
访问:https://49.235.76.103,
Login DN:cn=admin,dc=myhome,dc=com
password:cwz123456
django配置ldap
1 2 pip install django-python3-ldap
在settings文件中配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 LDAP_AUTH_URL = "ldap://49.235.76.103:389" LDAP_AUTH_USE_TLS = False LDAP_AUTH_SEARCH_BASE = "dc=myhome,dc=com" LDAP_AUTH_OBJECT_CLASS = "inetOrgPerson" LDAP_AUTH_USER_FIELDS = { "username" : "cn" , "first_name" : "givenName" , "last_name" : "sn" , "email" : "mail" , } LDAP_AUTH_USER_LOOKUP_FIELDS = ("username" ,) LDAP_AUTH_CLEAN_USER_DATA = "django_python3_ldap.utils.clean_user_data" LDAP_AUTH_CONNECTION_USERNAME = "admin" LDAP_AUTH_CONNECTION_PASSWORD = "cwz123456" AUTHENTICATION_BACKENDS = {"django_python3_ldap.auth.LDAPBackend" , 'django.contrib.auth.backends.ModelBackend' , }
在django后台用ldap账号登录,会自动同步到django
批量导入面试官信息、权限 1 2 python manage.py ldap_sync_users
这时查看django 后台管理就会有用户加进来
增加组 interviewer组,赋予查看和修改应聘者的权限。
增加组 hr组,赋予对应聘者操作和对工作操作的权限。
到处候选人的数据到CSV 增加自定义的数据操作菜单 (数据导出为 CSV)
需要对数据进行操作,比如导出,状态变更
定义按钮的实现逻辑(处理函数), 在 ModelAdmin 中注册函数到 actions
interview/admin.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 def export_model_as_csv (model_admin, request, queryset ): response = HttpResponse(content_type='text/csv' ) field_list = exportable_fields response['Content-Disposition' ] = 'attachment; filename=%s-list-%s.csv' % ( 'recruitment-candidates' , datetime.now().strftime('%Y-%m-%d-%H-%M-%S' ) ) writer = csv.writer(response) writer.writerow( [queryset.model._meta.get_field(f).verbose_name.title() for f in field_list], ) for obj in queryset: csv_line_values = [] for field in field_list: field_obj = queryset.model._meta.get_field(field) field_value = field_obj.value_from_object(obj) csv_line_values.append(field_value) writer.writerow(csv_line_values) return response export_model_as_csv.short_description = '导出为CSV文件' class CandidateAdmin (admin.ModelAdmin): exclude = ('creator' , 'created_date' , 'modified_date' ) actions = [export_model_as_csv] …………
增加日志记录 日志级别:
DEBUG: 调试
INFO: 常用的系统信息
WARNING: 小的告警,不影响主要功能
ERROR: 系统出现不可忽视的错误
CRITICAL: 非常严重的错误
在settings.py配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 LOGGING = { "version" : 1 , "disable_existing_logger" : False , "handlers" : { "console" : { "class" : "logging.StreamHandler" , }, }, "loggers" : { "django_python3_ldap" : { "handlers" : ["console" ], "level" : "DEBUG" , }, }, }
dictconfig是用一个字典形式的格式来定义日志记录的内容。
version 定义了日志记录的版本号,到目前为止,日志记录只有一个版本1
disable_existing_loggers 是否要禁用现在已有的其他logger,一般为False
有四个组件:Handlers、Loggers、Filters、Formmaters
Filters 是过滤器,可以定义一些列的处理链,可以把handlers/loggers放到Filters里
Handlers是日志处理器 对于每一条日志消息如何处理,记录到 文件,控制台,还是网络
Loggers 定义了日志的记录器,它里面定义了一个个的键值对。比如 上面定义了一个django_python3_ldap
的日志记录器,使用了这个名称作为记录的类,会往控制台输出。
Formmaters 定义日志文本记录的格式
完善后的日志配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 LOGGING = { "version" : 1 , "disable_existing_logger" : False , "formatters" : { "simple" : { 'format' : '%(asctime)s %(name)-12s %(lineno)d %(levelname)-8s %(message)s' } }, "handlers" : { "console" : { "class" : "logging.StreamHandler" , "formatter" : "simple" }, "mail_admins" : { "level" : "ERROR" , "class" : "django.utils.log.AdminEmailHandler" }, "file" : { "class" : "logging.FileHandler" , "formatter" : "simple" , "filename" : os.path.join(os.path.dirname(BASE_DIR), 'recruitment.admin.log' ) } }, "root" : { "handles" : ["console" , "file" ], "level" : "INFO" }, "loggers" : { "django_python3_ldap" : { "handlers" : ["console" ], "level" : "DEBUG" , }, }, }
给导出csv文件增加日志:
生产环境与开发环境配置分离 配置文件的问题:
生产环境的配置与开发环境配置隔离开, 开发环境允许 Debugging
敏感信息不提交到代码库中,比如数据库连接,secret key, LDAP连接信息等
生产、开发环境使用的配置可能不一样,比如 分别使用 MySQL/Sqlite 数据库
把 settings.py 抽出来,创建3个配置文件:
base.py 基础配置
local.py 本地开发环境配置,允许 Debug
production.py 生产环境配置, 不进到 代码库版本控制
命令行启动时指定环境配置:
1 python manage.py runserver 0.0 .0 .0 :8000 --settings=settings.local
产品细节完善
1 2 3 4 from django.utils.translation import gettext as _admin.site.site_header = _("招聘管理系统" )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def get_group_names (self, user ): group_names = [] for i in user.groups.all (): group_names.append(i.name) return group_names def get_readonly_fields (self, request, obj=None ): group_names = self.get_group_names(request.user) if "interviewer" in group_names: logger.info("interviewer is in user's group for %s" % request.user.username) return ('first_interviewer' , 'second_interviewer' ,) return ()
让hr直接在列表修改候选人的面试官,而面试官自己不能修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 default_list_editable = ('first_interviewer' , 'second_interviewer' ,) def get_list_editable (self, request ): group_names = self.get_group_names(request.user) if request.user.is_superuser or 'hr' in group_names: return self.default_list_editable return () def get_changelist_instance (self, request ): self.list_editable = self.get_list_editable(request) return super ().get_changelist_instance(request)
简历投递和面试流程闭环 定制更美观的主题
1 pip install django-grappelli
1 2 3 4 5 6 7 8 9 10 11 12 13 INSTALLED_APPS = [ 'grappelli' , 'django.contrib.admin' , 'django.contrib.auth' , 'django.contrib.contenttypes' , 'django.contrib.sessions' , 'django.contrib.messages' , 'django.contrib.staticfiles' , 'django_python3_ldap' , 'jobs' , 'interview' , ]
1 2 3 4 5 urlpatterns = [ url(r"^" , include("jobs.urls" )), url('grappelli' , include('grappelli.urls' )), path('admin/' , admin.site.urls), ]
定制面试官权限
一面面试官仅填写一面反馈, 二面面试官可以填写二面反馈 def get_fieldsets(self, request, obj=None):
1 2 3 4 5 6 7 8 9 def get_fieldsets (self, request, obj=None ): group_names = self.get_group_names(request.user) if 'interviewer' in group_names and obj.first_interviewer == request.user: return self.default_fieldsets_first if 'interviewer' in group_names and obj.second_interviewer == request.user: return self.default_fieldsets_second return self.default_fieldsets
数据集权限(querySet),专业面试官只能看到分到自己的候选人
对于面试官,获取自己是一面面试官或者二面面试官的候选人集合 def get_queryset(self, request):
1 2 3 4 5 6 7 8 9 10 def get_queryset (self, request ): qs = super ().get_queryset(request) group_names = self.get_group_names(request.user) if request.user.is_superuser or 'hr' in group_names: return qs return Candidate.objects.filter ( Q(first_interviewer=request.user) | Q(second_interviewer=request.user) )
功能权限(菜单/按钮),数据导出权限仅 HR 和超级管理员可用
自定义权限: 在 Model 类的 Meta 中定义自定义的 permissions
在 action 上限制权限: export_model_as_csv.allowed_permissions = (‘export’,)
在 Admin 上检查权限: def has_export_permission(self, request)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 permissions = [ ("export" , "Can export candidate list" ), ("notify" , "notify interviewer for candidate review" ), ] export_model_as_csv.allowed_permissions = ('export' ,) def has_export_permission (self, request ): opts = self.opts return request.user.has_perm('%s.%s' % (opts.app_label, "export" ))
系统报错功能:钉钉群集成 发送通知:钉钉群消息集成
为什么不使用 Email/SMS 通知:
由于邮件、短信没有限制,可以给任何人发;网络上对于 API 调用有了各种限制
阿里云封禁 25 端口
为什么使用钉钉群消息
其他推荐消息方式
测试钉钉群消息
安装钉钉聊天机器人:pip install DingtalkChatbot
测试群消息
1 2 3 4 python manage.py shell --settings=settings.local from interview import dingtalkdingtalk.send("天维招聘面试启动通知, 开始招聘。。。" )
定制管理后台的操作按钮:通知面试官准备面试
1 2 3 4 5 6 7 def notify_interviewer (model_admin, request, queryset ): candidates = "" interviewers = "" for obj in queryset: candidates = obj.username + ";" + candidates interviewers = obj.first_interviewer.username + ";" + interviewers dingtalk.send("候选人 %s 进入面试环节,亲爱的面试官,请准备好面试: %s" % (candidates, interviewers))
1 actions = (export_model_as_csv, notify_interviewer, )
允许候选人注册登录:集成Registration
允许注册:安装 registration pip install django-registration-redux
添加到 apps 中
注册到urls中
1 2 3 4 5 6 urlpatterns = [ url(r"^" , include("jobs.urls" )), url('grappelli' , include('grappelli.urls' )), url(r'^accounts/' , include('registration.backends.simple.urls' )), path('admin/' , admin.site.urls), ]
1 2 3 LOGIN_REDIRECT_URL = '/' SIMPLE_BACKEND_REDIRECT_URL = '/accounts/login/'
候选人简历存储 创建简历Model
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class Resume (models.Model): username = models.CharField(max_length=135 , verbose_name='姓名' ) applicant = models.ForeignKey(User, verbose_name="申请人" , null=True , on_delete=models.SET_NULL) city = models.CharField(max_length=135 , verbose_name='城市' ) phone = models.CharField(max_length=135 , verbose_name='手机号码' ) email = models.EmailField(max_length=135 , blank=True , verbose_name='邮箱' ) apply_position = models.CharField(max_length=135 , blank=True , verbose_name='应聘职位' ) born_address = models.CharField(max_length=135 , blank=True , verbose_name='生源地' ) gender = models.CharField(max_length=135 , blank=True , verbose_name='性别' ) picture = models.ImageField(upload_to='images/' , blank=True , verbose_name='个人照片' ) attachment = models.FileField(upload_to='file/' , blank=True , verbose_name='简历附件' ) bachelor_school = models.CharField(max_length=135 , blank=True , verbose_name='本科学校' ) master_school = models.CharField(max_length=135 , blank=True , verbose_name='研究生学校' ) doctor_school = models.CharField(max_length=135 , blank=True , verbose_name='博士生学校' ) major = models.CharField(max_length=135 , blank=True , verbose_name='专业' ) degree = models.CharField(max_length=135 , choices=DEGREE_TYPE, blank=True , verbose_name='学历' ) created_date = models.DateTimeField(verbose_name="创建日期" , default=datetime.now) modified_date = models.DateTimeField(verbose_name="修改日期" , auto_now=True ) candidate_introduction = models.TextField(max_length=1024 , blank=True , verbose_name='自我介绍' ) work_experience = models.TextField(max_length=1024 , blank=True , verbose_name='工作经历' ) project_experience = models.TextField(max_length=1024 , blank=True , verbose_name='项目经历' ) class Meta : verbose_name = '简历' verbose_name_plural = '简历列表' def __str__ (self ): return self.username
注册 Model 到 Admin 中,设置展示字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class ResumeAdmin (admin.ModelAdmin): list_display = ( 'username' , 'applicant' , 'city' , 'apply_position' , 'bachelor_school' , 'master_school' , 'major' , 'created_date' ) readonly_fields = ('applicant' , 'created_date' , 'modified_date' ,) fieldsets = ( (None , {'fields' : ( "applicant" , ("username" , "city" , "phone" ), ("email" , "apply_position" , "born_address" , "gender" ,), ("picture" , "attachment" ,), ("bachelor_school" , "master_school" ), ("major" , "degree" ), ('created_date' , 'modified_date' ), "candidate_introduction" , "work_experience" , "project_experience" ,)}), ) def save_model (self, request, obj, form, change ): obj.applicant = request.user super ().save_model(request, obj, form, change) admin.site.register(Job, JobAdmin) admin.site.register(Resume, ResumeAdmin)
候选人在线投递简历 职位详情页:候选人简历投递
目标:
注册的用户可以提交简历
简历跟当前用户关联
能够追溯到谁投递的简历
步骤:
定义简历创建 View (继承自通用的CreateView)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from django.contrib.auth.mixins import LoginRequiredMixinfrom django.views.generic.edit import CreateViewclass ResumeCreateView (LoginRequfrom django.conf.urls import urlfrom django.urls import pathfrom jobs import viewsurlpatterns = [ url(r"^joblist/" , views.job_list, name="joblist" ), url(r"^job/(?P<job_id>\d+)/$" , views.detail, name="detail" ), path('resume/add/' , views.ResumeCreateView.as_view(), name='resume-add' ), url(r"^$" , views.job_list, name="name" ), ]iredMixin, CreateView): """ 简历职位页面 """ template_name = 'resume_form.html' success_url = '/joblist/' model = Resume fields = ["username" , "city" , "phone" , "email" , "apply_position" , "gender" , "bachelor_school" , "master_school" , "major" , "degree" , "candidate_introduction" , "work_experience" , "project_experience" ] from django.conf.urls import urlfrom django.urls import pathfrom jobs import viewsurlpatterns = [ url(r"^joblist/" , views.job_list, name="joblist" ), url(r"^job/(?P<job_id>\d+)/$" , views.detail, name="detail" ), path('resume/add/' , views.ResumeCreateView.as_view(), name='resume-add' ), url(r"^$" , views.job_list, name="name" ), ]
1 2 3 4 5 6 <h2 > 提交简历</h2 > <form method ="post" method ="post" > {% csrf_token %} {{ form.as_p }} <input type ="submit" value ="提交" > </form >
jobs/templates/job.html,申请按钮绑定事件
1 <input type ="button" class ="btn btn-primary" style ="width:120px;" value ="申请" onclick ="location.href='/resume/add/?apply_position={{job.job_name}}&city={{job.city_name}}'" />
进一步完善, 可以带参数跳转 && 关联登陆用户到简历
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from django.http import Http404, HttpResponseRedirectdef get_initial (self ): initial = {} for x in self.request.GET: initial[x] = self.request.GET[x] return initial def form_valid (self, form ): self.object = form.save(commit=False ) self.object .applicant = self.request.user self.object .save() return HttpResponseRedirect(self.get_success_url())
使用 Bootstrap 来定制页面样式
安装依赖包: pip install django-bootstrap4
添加到 apps 中: bootstrap4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 INSTALLED_APPS = [ 'grappelli' , 'registration' , 'bootstrap4' , 'django.contrib.admin' , 'django.contrib.auth' , 'django.contrib.contenttypes' , 'django.contrib.sessions' , 'django.contrib.messages' , 'django.contrib.staticfiles' , 'django_python3_ldap' , 'jobs' , 'interview' , ]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 {# Load the tag library #} {% load bootstrap4 %} {# Load CSS and JavaScript #} {% bootstrap_css %} {% bootstrap_javascript jquery='full' %} {# Display django.contrib.messages as Bootstrap alerts #} {% bootstrap_messages %} <h2 > 提交简历</h2 > <form method ="post" method ="post" class ="form" style ="width: 600px;margin-left: 5px" > {% csrf_token %} {% bootstrap_form form %} {% buttons %} <button type ="submit" class ="btn btn-primary" > 提交 </button > {% endbuttons %} </form >
简历评估和安排一面面试官
目标:打通简历投递与面试流程,让简历实体 (Resume) 流转到候选人实体 (Candidate)
添加一个数据操作菜单“进入面试流程”
定义 enter_interview_process方法
def enter_interview_process(modeladmin, request, queryset)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from django.contrib import adminfrom django.contrib import messagesfrom datetime import datetimefrom jobs.models import Job, Resumefrom interview.models import Candidatedef enter_interview_process (modeladmin, request, queryset ): candidate_names = "" for resume in queryset: candidate = Candidate() candidate.__dict__.update(resume.__dict__) candidate.created_date = datetime.now() candidate.modified_date = datetime.now() candidate_names = candidate.username + "," + candidate_names candidate.creator = request.user.username candidate.save() messages.add_message(request, messages.INFO, '候选人: %s 已成功进入面试流程' % (candidate_names)) enter_interview_process.short_description = "进入面试流程"
1 2 3 class ResumeAdmin (admin.ModelAdmin): actions = (enter_interview_process,)
定制列表字段,查看简历详情
添加 ResumeDetailView 的详情页视图,使用 Django的通用视图,继承自 DetailView
1 2 3 4 5 from django.views.generic import DetailViewclass ResumeDetailView (DetailView ): """ 简历详情页 """ model = Resume template_name = 'resume_detail.html'
添加 Detail 页模板: resume_detail.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 {# Load the tag library #} {% load bootstrap4 %} {# Load CSS and JavaScript #} {% bootstrap_css %} {% bootstrap_javascript jquery='full' %} {# Display django.contrib.messages as Bootstrap alerts #} {% bootstrap_messages %} <h1 > 简历详细信息 </h1 > <div > 姓名: {{ object.username }} </div > <div > 城市: {{ object.city }}</div > <div > 手机号码: {{ object.phone }}</div > <p > </p > <div > 邮件地址: {{ object.email}}</div > <div > 申请职位: {{ object.apply_position}}</div > <div > 出生地: {{ object.born_address}}</div > <div > 性别: {{ object.gender}}</div > <hr > <div > 本科学校: {{ object.bachelor_school}}</div > <div > 研究所学校: {{ object.master_school}}</div > <div > 专业: {{ object.major}}</div > <div > 学历: {{ object.degree}}</div > <hr > <p > 候选人介绍: {{ object.candidate_introduction}}</p > <p > 工作经历: {{ object.work_experience}}</p > <p > 项目经历: {{ object.project_experience}}</p >
添加路由跳转:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from django.conf.urls import urlfrom django.urls import pathfrom jobs import viewsurlpatterns = [ url(r"^joblist/" , views.job_list, name="joblist" ), url(r"^job/(?P<job_id>\d+)/$" , views.detail, name="detail" ), path('resume/add/' , views.ResumeCreateView.as_view(), name='resume-add' ), path('resume/<int:pk>/' , views.ResumeDetailView.as_view(), name='resume-detail' ), url(r"^$" , views.job_list, name="name" ), ]
候选人列表页, 对于每一行来自简历投递的数据,添加一个“查看简历”的链接:
列表页,使用 函数名称 作为 list_display 中的字段
定义一个函数, 获取 简历详情页链接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def get_resume (self, obj ): if not obj.phone: return "" resumes = Resume.objects.filter (phone=obj.phone) if resumes: return mark_safe('<a href="/resume/%s" target="_blank">%s</a>' % (resumes[0 ].id , "查看简历" )) return "" get_resume.short_description = "查看简历" get_resume.allow_tags = True
Django进阶开发复杂场景 为已有系统数据库生成管理功能 问题:
已经有内部系统在运行了,缺少管理功能,希望能有一个权利后台
比如 人事系统,CRM,ERP 的产品,缺少部分数据的维护功能
为已有数据库生成管理后台
创建项目: django-admin startproject empmanager
编辑settings.py中的数据库配置
1 2 3 4 5 6 7 8 9 10 DATABASES = { 'default' : { 'ENGINE' : 'django.db.backends.mysql' , 'NAME' : 'mydatabase' , 'USER' : 'mydatabaseuser' , 'PASSWORD' : 'mypassword' , 'HOST' : '127.0.0.1' , 'PORT' : '5432' } }
生成 model 类: python manage.py inspectdb > models.py
Django中间件 Django中间件Middleware:
注入在 Django 请求/响应 处理流程中的钩子框架,能对 request/response 作处理
使用场景:
登录认证,安全拦截
日志记录,性能上报
缓存处理,监控告警
自定义中间件的2种方法:使用函数或者类来实现
使用函数:
1 2 3 4 5 6 7 def simple_middleware (get_response ): def middleware (request ): response = get_response(request) return response return middleware
类实现,Django提供的get_response方法,可能是一个真实的视图,也可能是请求处理链中的下一个中间件
1 2 3 4 5 6 7 8 9 class SimpleMiddleware : def __init__ (self, get_response ): self.get_response = get_response def __call__ (self, request ): response = self.get_response(resquest) return response
创建请求日志、性能日志记录中间件
定义实现中间件: def performance_logger_middleware(get_response)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import timeimport logginglogger = logging.getLogger(__name__) def performance_logger_middleware (get_response ): def middleware (request ): start_time = time.time() response = get_response(request) duration = time.time() - start_time response["X-Page-Duration-ms" ] = int (duration * 1000 ) logger.info("%s %s %s" , duration, request.path, request.GET.dict ()) return response return middleware
记录请求 URL, 参数, 响应时间
注册 middleware 到 settings 中
1 2 3 MIDDLEWARE = [ 'interview.performance.performance_logger_middleware' , ]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 LOGGING = { ...... "handlers" : { "console" : { "class" : "logging.StreamHandler" , "formatter" : "simple" }, "file" : { "class" : "logging.FileHandler" , "formatter" : "simple" , "filename" : os.path.join(os.path.dirname(BASE_DIR), 'recruitment.admin.log' ) }, 'performance' : { 'class' : 'logging.FileHandler' , 'formatter' : 'simple' , 'filename' : os.path.join(os.path.dirname(BASE_DIR), 'recruitment.performance.log' ), }, }, "loggers" : { ....... "interview.performance" : { "handlers" : ["console" , "performance" ], "level" : "INFO" , "propagate" : False , }, }, }
Django中使用多语言 使用多语言:
代码中使用 gettext, gettext_lazy 获取多语言资源对应的文本内容
1 2 3 4 from django.utils.translation import gettext_lazy as _class Resume (models.Model): username = models.CharField(max_length=135 , verbose_name=_('姓名' ))
职位列表页使用多语言:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 {# Load the tag library #} {% load bootstrap4 %} {% load i18n %} {# Load CSS and JavaScript #} {% bootstrap_css %} {% bootstrap_javascript jquery='full' %} {# Display django.contrib.messages as Bootstrap alerts #} {% bootstrap_messages %} <h1 style ="margin: auto;width: 50%;" > {% translate "天维科技开放职位" %}</h1 > <p > </p > {% block header %} <a href ="/" style ="text-decoration: none; color:#007bff" > {% translate "Homepage" %}</a > <a href ="/joblist" style ="text-decoration: none; color:#007bff" > {% translate "job list" %}</a > {% if user.is_authenticated %} <a href ="/accounts/logout" style ="text-decoration: none; color:#007bff" > "{% translate "Logout" %}</a > {% else %} <a href ="/accounts/login" style ="text-decoration: none; color:#007bff" > {% translate "Login" %}</a > {% endif %} {% if user.is_authenticated %} <p > {% blocktranslate with user_name=user.username %} 终于等到你 {{ user_name }},期待加入我们,用技术去探索一个新世界 {% endblocktranslate %}</p > {% else %} <p > {% translate "欢迎你,期待加入我们,登陆后可以提交简历." %}</p > {% endif %} {% endblock %} <hr > {% block content %} {% endblock %}
1 2 3 4 5 6 mkdir locale django-admin makemessages -l zh_HANS -l en
1 django-admin compilemessages
在项目urls.py文件配置
1 path('i18n/' , include('django.conf.urls.i18n' )),
在settings.py配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from django.utils.translation import gettext_lazy as _...... LANGUAGES = [ ('zh-hans' , _('Chinese' )), ('en' , _('English' )), ] LANGUAGE_CODE = 'zh-hans' TIME_ZONE = 'Asia/Shanghai' USE_I18N = True USE_L10N = True USE_TZ = True LOCALE_PATHS = ( os.path.join(BASE_DIR, 'locale' ), )
在页面上添加可以选择语言的按钮
在职位列表首页加上一个表单 base.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <div style ="flex: 1; align-content:right;" > <form action ="{% url 'set_language' %}" method ="post" style ="margin-block-end: 0em;" > {% csrf_token %} <input name ="next" type ="hidden" value ="{{ redirect_to }}" > <select name ="language" > {% get_current_language as LANGUAGE_CODE %} {% get_available_languages as LANGUAGES %} {% get_language_info_list for LANGUAGES as languages %} {% for language in languages %} <option value ="{{ language.code }}" {% if language.code == LANGUAGE_CODE %} selected {% endif %}> {{ language.name_local }} ({{ language.code }}) </option > {% endfor %} </select > <input type ="submit" value ={% translate "Switch " %} style ="font-size:12;height:20px" > </form > </div >
在settings.py文件配置
1 2 3 4 5 6 7 8 9 10 MIDDLEWARE = [ ...... 'django.contrib.sessions.middleware.SessionMiddleware' , 'django.middleware.locale.LocaleMiddleware' , 'django.middleware.common.CommonMiddleware' , ...... ]
错误和异常日志上报 Sentry集成 使用 Docker 来安装 sentry, 使用 release 版本
Django 配置集成 sentry , 自动上报未捕获异常, 错误日志
异常发送钉钉群 自定义中间件来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import timeimport loggingfrom django.http import HttpResponseimport tracebackfrom sentry_sdk import capture_exceptionfrom . import dingtalklogger = logging.getLogger(__name__) class PerformanceAndExceptionLoggerMiddleware : def __init__ (self, get_response ): self.get_response = get_response def __call__ (self, request ): start_time = time.time() response = self.get_response(request) duration = time.time() - start_time response["X-Page-Duration-ms" ] = int (duration * 1000 ) logger.info("duration:%s url:%s parameters:%s" , duration, request.path, request.GET.dict () ) if duration > 500 : capture_message("slow request for url: %s with duration: %s" % (request.build_absolute_uri(), duration)) return response def process_exception (self, request, exception ): if exception: message = "url:{url} ** msg:{error} ````{tb}````" .format ( url = request.build_absolute_uri(), error = repr (exception), tb = traceback.format_exc() ) logger.warning(message) dingtalk.send(message) capture_exception(exception) return HttpResponse("Error processing the request, please contact the system administrator." , status=500 )
在settings.py配置中间件
Django安全防护 防止XSS跨站脚本攻击
恶意攻击者将代码通过网站注入到其他用户浏览器中的 攻击方式
攻击者会把恶意 JavaScript 代码作为普通数据放入 到网站数据库中;
其他用户在获取和展示数据的过程中,运行 JavaScript 代码;
JavaScript 代码执行恶意代码(调用恶意请求,发送 数据到攻击者等等)
举例说明:
1 2 3 4 5 6 7 8 9 10 11 def detail_resume (request, resume_id ): try : resume = Resume.objects.get(pk=resume_id) content = "name: %s <br> introduction: %s <br>" % (resume.username, resume.candidate_introduction) return HttpResponse(content) except Resume.DoesNotExist: raise Http404("resume does not exist" )
在jobs/url.py
1 2 3 if settings.DEBUG : urlpatterns += [url(r'^detail_resume/(?P<resume_id>\d+)/$' , views.detail_resume, name='detail_resume' ),]
在简历上加上这么一段js脚本:
<script>alert('page cookies:\n' + document.cookie;)</script>
可以利用django自带的模板渲染机制,来渲染页面。
CSRF 跨站请求伪造
CSRF(Cross-site request forgery,简称:CSRF 或 XSRF)
恶意攻击者在用户不知情的情况下,使用用户的身份来操作
黑客创建一个 请求网站 A 类的 URL 的 Web 页面,放在恶意网站 B 中 ,这个文件包含了一个创建 用户的表单。这个表单加载完毕就会立即进行提交
黑客把这个恶意 Web 页面的 URL 发送至超级管理员,诱导超级管理员打开这个 Web 页面
django在中间件 CsrfViewMiddleware
SQL 注入攻击
SQL 注入漏洞: 攻击者直接对网站数据库执行任意 SQL语句,在无需 用户权限的情况下即可实现对数据的访问、修改甚至是删除
Django 的 ORM 系统自动规避了 SQL 注入攻击
原始 SQL 语句,切记避免拼接字符串,这是错误的调用方式:
1 2 query = 'select * from employee where last_name=%s' % name Person.objects.raw(query)
1 2 name_map = {'first' : 'first_name' , 'last' : 'last_name' , 'pk' : 'id' } Person.objects.raw('select * from employee' , translations=name_map)
Django Rest Framework 开放API 按Django Rest Framework文档上的说明:https://www.django-rest-framework.org/
1 2 3 pip install djangorestframework pip install markdown pip install django-filter
1 2 3 4 INSTALLED_APPS = [ ... 'rest_framework' , ]
1 2 3 4 urlpatterns = [ ... path('api-auth/' , include('rest_framework.urls' )) ]
1 2 3 4 5 6 7 REST_FRAMEWORK = { 'DEFAULT_PERMISSION_CLASSES' : [ 'rest_framework.permissions.DjangoModelPermissionsOrAnonReadOnly' ] }
将用户信息和职位信息通过API暴露出去,需要对用户的Model和职位的Model提供相应的序列化方式。
在项目的urls.py中定义序列化方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 from django.contrib.auth.models import Userfrom jobs.models import Jobclass UserSerializer (serializers.HyperlinkedModelSerializer): class Meta : model = User fields = ['url' , 'username' , 'email' , 'is_staff' ] class UserViewSet (viewsets.ModelViewSet): queryset = User.objects.all () serializer_class = UserSerializer class JobSerializer (serializers.HyperlinkedModelSerializer): class Meta : model = Job fields = '__all__' class JobViewSet (viewsets.ModelViewSet): """ API endpoint that allows groups to be viewed or edited. """ queryset = Job.objects.all () serializer_class = JobSerializer router = routers.DefaultRouter() router.register(r'users' , UserViewSet) router.register(r'jobs' , JobViewSet) urlpatterns = [ ...... path('api/' , include(router.urls)), path('api-auth/' , include('rest_framework.urls' )), ]
在Django中使用缓存 Django 缓存的存储方式:
Memcached 缓存
Redis 缓存 (需要安装 django-redis 包)
数据库缓存
文件系统缓存
本地内存缓存
伪缓存( Dummy Cache, 用于开发、测试)
自定义缓存
缓存的策略:
整站缓存
视图缓存(使用CachePage来标记)
模板片段缓存
django-redis 中文文档:https://django-redis-chs.readthedocs.io/zh_CN/latest/
安装:pip install django-redis
配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 CACHES = { "default" : { "BACKEND" : "django_redis.cache.RedisCache" , "LOCATION" : "redis://127.0.0.1:6379/1" , "OPTIONS" : { "CLIENT_CLASS" : "django_redis.client.DefaultClient" , } } } MIDDLEWARE = [ 'django.middleware.cache.UpdateCacheMiddleware' 'django.middleware.common.CommonMiddleware' , 'django.middleware.cache.FetchFromCacheMiddleware' , ]
Django与Celery集成 Celery简单介绍:
一个分布式的任务队列
简单: 几行代码可以创建一个简单的 Celery 任务
高可用:工作机会自动重试
快速:可以执行一分钟上百万的任务
灵活:每一块都可以扩展
Celery使用场景,使用异步任务的场景:
发送电子邮件,发送 IM 消息通知
爬取网页, 数据分析
图像、视频处理
生成报告,深度学习
官方文档:https://docs.celeryproject.org/en/stable/getting-started/introduction.html#what-s-a-task-queue
安装:
1 2 pip install celery pip install "celery[librabbitmq,redis,auth,msgpack]"
1 2 3 4 5 6 7 8 9 10 from celery import Celeryapp = Celery('tasks' , backend='redis://127.0.0.1' , broker='redis://127.0.0.1' ) @app.task def add (x, y ): return x + y
运行celery
1 2 3 4 celery -A tasks worker --loglevel=INFO
添加运行任务
1 2 3 4 5 6 7 8 9 from tasks import addresult = add.delay(4 , 4 ) print ('Is task ready: %s' % result.ready())run_result = result.get(timeout=1 ) print ('task result: %s' % run_result)
Flower: 一个实时的 Celery 任务监控系统
安装:pip install flower
官方文档:https://docs.celeryproject.org/en/stable/userguide/monitoring.html
Django与Celery集成:异步任务 文档:https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html
Celery 4.0 的版本支持 Django 集成
不需要安装额外的库
使用 Celery 的自动发现机制: 自动发现 tasks.py
在项目主应用下新建celery.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from __future__ import absolute_import, unicode_literalsimport osfrom celery.schedules import crontabfrom celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE' , 'settings.base' ) app = Celery('recruitment' ) app.config_from_object('django.conf:settings' , namespace='CELERY' ) app.autodiscover_tasks() @app.task(bind=True ) def debug_task (self ): print ('Request: {0!r}' .format (self.request))
在项目主应用__init__.py
1 2 3 4 5 from .celery import app as celery_app__all__ = ('celery_app' ,)
在配置文件上配置:
1 2 3 4 5 6 7 8 9 CELERY_BROKER_URL = 'redis://redis:6379/0' CELERY_RESULT_BACKEND = 'redis://redis:6379/1' CELERY_ACCEPT_CONTENT = ['application/json' ] CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' CELERYD_MAX_TASKS_PER_CHILD = 10 CELERYD_LOG_FILE = os.path.join(BASE_DIR, "logs" , "celery_work.log" ) CELERYBEAT_LOG_FILE = os.path.join(BASE_DIR, "logs" , "celery_beat.log" )
通知面试官面试,发送钉钉群消息的地方加上celery
在interview应用下新建tasks.py
1 2 3 4 5 6 7 8 from __future__ import absolute_importfrom celery import shared_taskfrom .dingtalk import send@shared_task def send_dingtalk_message (message ): send(message)
在admin.py下
1 2 send_dingtalk_message.delay("......" )
在项目根目录下启动:
1 2 DJANGO_SETTINGS_MODULE=settings.local celery --app recruitment worker -l info
Django 与 Celery 集成:定时任务
任务心跳管理进程 Beat
任务调度器
PersistentScheduler (默认)
DatabaseScheduler
任务存储
File Configuration
Database
安装 beat: pip install django-celery-beat
将django-celery-beat注册到app中
数据库迁移
使用 DatabaseScheduler 启动 beat 或者在 配置中设置 beat_scheduler
1 DJANGO_SETTINGS_MODULE=settings.local celery -A recruitment beat --scheduler django_celery_beat.scheduler:DatabaseScheduler
管理定时任务的方法
在 Admin 后台添加管理定时任务
系统启动时自动注册定时任务
直接设置应用的 beat_schedule
运行时添加定时任务
系统启动时自动注册定时任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from celery import Celery, shared_taskfrom celery.schedules import crontabos.environ.setdefault('DJANGO_SETTINGS_MODULE' , 'settings.base' ) app = Celery('recruitment' ) @app.on_after_configure.connect def setup_periodic_tasks (sender, **kwargs ): sender.add_periodic_task(10.0 , test.s('hello' ), name='hello every 10' ) sender.add_periodic_task(30.0 , test.s('world' ), expires=10 ) sender.add_periodic_task( crontab(hour=7 , minute=30 , day_of_week=1 ), test.s('Happy Mondays!' ), ) @app.task def test (arg ): print (arg)
直接配置:
1 2 3 4 5 6 7 8 9 from recruitment.tasks import addapp.conf.beat_schedule = { 'add-every-10-seconds' : { 'task' : 'recruitment.tasks.add' , 'schedule' : 10.0 , 'args' : (16 , 4 , ) }, }
系统运行时动态添加定时任务:
1 2 3 4 5 6 7 8 9 10 11 12 import jsonfrom django_celery_beat.models import PeriodicTask, IntervalScheduleschedule, created = IntervalSchedule.objects.get_or_create(every=10 , period=IntervalSchedule.SECONDS,) task = PeriodicTask.objects.create(interval=schedule, name='say welcome 2021' , task='recruitment.celery.test' , args=json.dumps(['welcome' ])) @app.task def test (arg ): print (arg)
文件和图片上传功能 场景/目标:
投递简历的页面, 可以上传个人的照片, 以及附件简历
上传的文件存储在服务器上,文件服务可以扩展
存储方案选型:
使用服务器本地磁盘
自建分布式文件服务器
阿里云 OSS
使用本地磁盘存储
设置图片、文件存储路径 & URL 映射,settings 里面添加 /media 路径, urls.py 中添加图片路径映射
准备 model, form, view 和 HTML 表单模板
model 里面添加图片/文件字段(如 个人照片, 个人简历字段到 Resume)
form.py 中增加图片,附件字段
创建简历的视图中展示 picture, attachment 字段
HTML 表单模板中增加 enctype 属性 (resume_form.html )
变更数据库
Admin里面 添加展示字段, 简历列表中加上照片展示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 MEDIA_ROOT = os.path.join(BASE_DIR, 'media' ) MEDIA_URL = '/media/' urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT) from django.forms import ModelFormfrom .models import Resumeclass ResumeForm (ModelForm ): class Meta : model = Resume fields = ["username" , "city" , "phone" , "email" , "apply_position" , "born_address" , "gender" , "picture" , "attachment" , "bachelor_school" , "master_school" , "major" , "degree" , "candidate_introduction" , "work_experience" , "project_experience" ] from django.utils.html import format_htmlclass ResumeAdmin (admin.ModelAdmin): actions = (enter_interview_process,) def image_tag (self, obj ): if obj.picture: return format_html('<img src="{}" style="width:100px;height:80px;"/>' .format (obj.picture.url)) return "" image_tag.allow_tags = True image_tag.short_description = 'Image
使用阿里云 OSS 存储 复用前面创建好的类, 把存储替换为 OSS 存储,提升系统扩展性、可靠性
安装 OSS 库,pip install django-oss-storage
OSS 的依赖添加 django_oss_storage 到 APPS
settings 里面添加 OSS 设置
1 2 3 4 5 6 7 8 9 10 11 DEFAULT_FILE_STORAGE = 'django_oss_storage.backends.OssMediaStorage' OSS_ACCESS_KEY_ID = '' OSS_ACCESS_KEY_SECRET = '' OSS_BUCKET_NAME = '' OSS_ENDPOINT = ''
多数据路由 对已有系统数据进行管理
应用场景:
现有的一个业务应用,使用的 MySQL 数据库, 数据库名为 running
需要对数据库中的部分 model 进行管理。 包括 area, city, country, province 进行管理
使用 Django 主应用的数据库 (SQLite)管理 Django 基础账号权限数据
两个数据库,需要对 model 的操作做路由
操作过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 DATABASES = { 'default' : { 'ENGINE' : 'django.db.backends.sqlite3' , 'NAME' : BASE_DIR / 'db.sqlite3' , }, 'running' : { 'ENGINE' : 'django.db.backends.mysql' , 'NAME' : 'running' , 'USER' : 'root' , 'PASSWORD' : 'root' , 'HOST' : '127.0.0.1' , 'PORT' : '3306' } }
指定数据库表生成 model (inspectdb)
1 python manage.py inspectdb --database=running --settings=settings.local area city country province >> running/models.py
注册到 Admin 中 (running/admin.py)
1 2 3 4 5 django-admin startapp running
添加 Router 类 & settings 中配置 Router
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class DatabaseRouter : route_app_labels = {'running' } def db_for_read (self, model, **hints ): if model._meta.app_label in self.route_app_labels: return 'running' return 'default' def db_for_write (self, model, **hints ): if model._meta.app_label in self.route_app_labels: return 'running' return 'default' def allow_relation (self, obj1, obj2, **hints ): return None def allow_migrate (self, db, app_label, model_name=None , **hints ): """ 遗留数据库中的表不允许迁移 """ if app_label in self.route_app_labels: return False return True DATABASE_ROUTERS = ['settings.router.DatabaseRouter' ]
支持大数据量的关联外键 场景/解决问题
依赖外键数据量过大导致管理后台卡顿甚至死机
比如添加一个员工,员工的出生地依赖于城市表,全球的城市数据有1w多
添加员工的时候,要从上万的城市中选择出生地
Django 默认会把依赖外键中的所有数据全部加载, 浏览器会停顿乃至没有反应,导致死机
关系:Province 依赖于 Country, City 从属于 Province
期望:选择依赖的数据时(比如员工所属城市),可输入字符查找
准备:设置 Province, City 的外键依赖
countryid = models.ForeignKey(Country, db_column=’countryid’, null=True, on_delete=models.SET_NULL)
设置自动完成的关联外键 (admin.py)
autocomplete_fields = [‘provinceid’]
依赖的 model Admin 类中设置可以搜索的字段
1 2 class CountryAdmin (admin.ModelAdmin): search_fields = ('chn_name' , 'eng_name' ,)
running/model.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 from django.db import modelsclass Area (models.Model): areaid = models.AutoField(primary_key=True ) countryid = models.PositiveIntegerField() chn_name = models.CharField(max_length=64 ) eng_name = models.CharField(max_length=64 , blank=True , null=True ) sort = models.PositiveIntegerField() class Meta : managed = False db_table = 'area' class Country (models.Model): countryid = models.AutoField(primary_key=True ) chn_name = models.CharField(max_length=64 ) eng_name = models.CharField(max_length=64 , blank=True , null=True ) country_logo = models.CharField(max_length=120 , blank=True , null=True ) sort = models.PositiveIntegerField() class Meta : managed = False db_table = 'country' def __str__ (self ): return self.chn_name if self.chn_name else "" class Province (models.Model): provinceid = models.AutoField(primary_key=True ) countryid = models.ForeignKey(Country, db_column='countryid' , null=True , on_delete=models.SET_NULL) areaid = models.PositiveIntegerField(blank=True , null=True ) chn_name = models.CharField(max_length=64 ) eng_name = models.CharField(max_length=64 , blank=True , null=True ) sort = models.PositiveIntegerField() class Meta : managed = False db_table = 'province' def __str__ (self ): return self.chn_name if self.chn_name else "" from smart_selects.db_fields import ChainedForeignKeyclass City (models.Model): cityid = models.AutoField(primary_key=True ) countryid = models.ForeignKey(Country, db_column='countryid' , null=True , on_delete=models.SET_NULL) areaid = models.PositiveIntegerField(blank=True , null=True ) provinceid = models.ForeignKey(Province, db_column='provinceid' , null=True , on_delete=models.SET_NULL) chn_name = models.CharField(max_length=64 ) eng_name = models.CharField(max_length=64 , blank=True , null=True ) sort = models.PositiveIntegerField() class Meta : managed = False db_table = 'city' def __str__ (self ): return self.chn_name if self.chn_name else self.eng_name if self.eng_name else ""
国家-城市 多级关联的场景,django中有一个 smart-selects的插件
https://github.com/jazzband/django-smart-selects
安装 django-smart-selects 插件,pip install django-smart-selects
添加 smart_selects 到安装的 APPS 中
在项目的 urls.py 中添加 chaining/ 的URL路径
1 2 3 4 urlpatterns = patterns('' , url(r'^admin/' , include(admin.site.urls)), url(r'^chaining/' , include('smart_selects.urls' )), )
Model 中定义 ChainedForeignKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from smart_selects.db_fields import ChainedForeignKeyclass City (models.Model): cityid = models.AutoField(primary_key=True ) countryid = models.ForeignKey(Country, db_column='countryid' , null=True , on_delete=model.SET_NULL) provinceid = ChainedForeignKey( Province, chained_field='countryid' , chained_model_field='countryid' , show_all=False , auto_choose=True , sort=True , db_column='provinceid' )
实现只读站点 ReadOnlyAdmin 场景
集成遗留的已有系统
已有系统的数据涉及到核心数据
为了确保数据安全,管理后台只提供数据的浏览功能
解决问题:
设置列表页 list_display 展示所有字段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class ReadOnlyAdmin (admin.ModelAdmin): readonly_fields = [] def get_list_display (self, request ): return [field.name for field in self.model._meta.concrete_fields] def get_readonly_fields (self, request, obj=None ): return list (self.readonly_fields) + \ [field.name for field in obj._meta.fields] + \ [field.name for field in obj._meta.many_to_many] def has_add_permission (self, request ): return False def has_delete_permission (self, request, obj=None ): return False def has_change_permission (self, request, obj=None ): return False class CountryAdmin (ReadOnlyAdmin ): search_fields = ('chn_name' , 'eng_name' ,)
自动注册所有Model到管理后台 场景:
实际的业务场景中, 往往Model 多大几十个
一个个写Admin, 再Register, 效率低
期望:能够自动注册 Model 到管理后台
不好的解决方案:
1 2 3 4 5 6 7 from django.apps import appsmodels = apps.get_models() for model in models: admin.site.register(model)
这样直接注册产生的问题:
settings 里面可能已经注册了 App,它是按照顺序加载的
重复注册时会出现异常,需要处理重复注册的逻辑
解决方案1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from django.contrib import adminfrom django.apps import apps, AppConfigclass AdminClass (admin.ModelAdmin): def __init__ (self, model, admin_site ): self.list_display = [field.name for field in model._meta.fields] super (AdminClass, self).__init__(model, admin_site) class UniversalManagerApp (AppConfig ): """ 应用配置在 所有应用的 Admin 都加载完之后执行 """ name = 'recruitment' def ready (self ): models = apps.get_app_config('running' ).get_models() for model in models: try : admin.site.register(model, AdminClass) except admin.sites.AlreadyRegistered: pass
解决方案2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 from django.contrib import adminfrom django.apps import apps, AppConfigclass ListAdminMixin (object ): def __init__ (self, model, admin_site ): self.list_display = [field.name for field in model._meta.fields] super ().__init__(model, admin_site) class UniversalManagerApp (AppConfig ): """ 应用配置在 所有应用的 Admin 都加载完之后执行 """ name = 'recruitment' def ready (self ): models = apps.get_app_config('running' ).get_models() for model in models: admin_class = type ('AdminClass' , (ListAdminMixin, admin.ModelAdmin,), {}) try : admin.site.register(model, admin_class) except admin.sites.AlreadyRegistered: pass
既然我们把所有的Model都注册进来,我们不用去创建Django的应用,不用写代码就能去管理一个数据库里面指定的表,可以使用python动态类的方法,使用动态的model把数据库中所有的model找到,在把这个model通过动态定义的方法把它定义注册进来,这样可以实现一个通用的应用,在这个应用可以定义一些设置,去设置我是对所有的表进行维护还是只要维护部份表。这样就可以做到不用写代码,直接加配置,对已有的系统进行维护
使用python中的动态特性
使用type()函数去定义一个类:
1 model = type (name, (models.Model,), attrs)
举例说明:
1 2 3 4 5 6 7 8 class Person (models.Model): name = models.CharField(max_length=255 ) Person = type ('Person' , (models.Model, ), { name = models.CharField(max_length=255 ), })
开源应用 sandman2,可以对已有数据库提供增、删、改、查功能 和 Rest API
安装:pip install sandman2
以 SQLite 数据库为例子, 启动 sandman2:
1 sandman2ctl sqlite+pysqlite:///db.sqlite3
访问 restapi 和 管理后台:
Django的 signals信号 什么是Signals
Django的信号
Django 框架内置的信号发送器,这个信号发送器在框架里面
有动作发生的时候,帮助解耦的应用接收到消息通知
当动作发生时,允许特定的信号发送者发送消息到一系列的消息接收者
Signals 是同步调用
信号的应用场景:
系统解耦;代码复用:实现统一处理逻辑的框架中间件; -> 可维护性提升
记录操作日志,增加/清除缓存,数据变化接入审批流程;评论通知;
关联业务变化通知
例:通讯录变化的异步事件处理,比如员工入职时发送消息通知团队新人入职,员工离 职时异步清理员工的权限等等;
Signals 类的子类 (Django内置的常用信号):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 django.db.models.signals.pre_init django.db.models.signals.post_init django.db.models.signals.pre_save django.db.models.signals.post_save django.db.models.signals.pre_delete django.db.models.signals.post_delete django.db.models.signals.m2m_changed django.core.signals.request_started django.core.signals.request_finished HTTP
如何注册信号处理器/接收器:
调用 Signals 任意一个子类的 connect方法
1 2 3 4 5 6 Signal.connect(receiver, sender=None , weak=True , dispatch_uid=None )
除了使用 Signal.connect() 方法注册处理器外,也可以使用 @receiver 的装饰器来注册
示例:使用装饰器来注册,修改数据时,发送消息通知到钉钉
在apps 的 ready() 函数中加载信号处理器
settings 中使用完整的名称注册 AppConfig,去掉原先注册的 jobs 应用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 from django.db.models.signals import post_save, post_deletefrom django.dispatch import receiverfrom .models import Job, Resumefrom interview.dingtalk import sendimport json, logginglogger = logging.getLogger(__name__) @receiver(signal=post_save, sender=Resume, dispatch_uid="resume_post_save_dispatcher" ) @receiver(signal=post_save, sender=Job, dispatch_uid="job_post_save_dispatcher" ) def post_save_callback (sender, instance=None , created=False , **kwarg ): message = "" if isinstance (instance, Job): message = "Job for %s has been saved" % instance.job_name else : message = "Resume for %s %s has been saved " % (instance.username , instance.apply_position) logger.info(message) send(message) from django.forms.models import model_to_dictdef post_delete_callback (sender, instance=None , using=None , **kwarg ): dict_obj = model_to_dict( instance, exclude=("picture" ,"attachment" , "created_date" , "modified_date" ) ) message = "Instance of %s has been deleted: %s" % (type (instance), json.dumps(dict_obj, ensure_ascii=False )) logger.info(message) send(message) post_delete.connect(post_delete_callback, sender=Resume, dispatch_uid="resume_post_delete_dispatcher" )
在jobs/apps.py
1 2 3 4 5 6 7 8 9 10 11 12 13 from django.apps import AppConfigimport logginglogger = logging.getLogger(__name__) class JobConfig (AppConfig ): name = 'jobs' def ready (self ): logger.info("JobConfig ready" ) from jobs.signal_processor import post_save_callback
在settings.py中配置
1 2 3 4 5 6 INSTALLED_APPS = [ ...... 'jobs.apps.JobConfig' , ]
自定义信号:
定义信号: 在项目根目录新建文件self_signal.py
1 2 import django.dispatchmy_signal = django.dispatch.Signals(providing_args=["argument1" ,"argument2" ])
1 2 from self_signal import my_signalmy_signal.send(sender="Recruitment" , argument1=111 , argument2=2 )
1 2 from self_signal import my_signalmy_signal.connect(callback_of_my_signal)
优雅的架构设计 CSR 架构总结 Celery Celery架构:
Celery架构之美:
清晰的定义来几个基础概念
API使用起来清晰、简洁
关键设计可以扩展,具备高可用性
定义了一套协议/API,跨平台(Python/Node/PHP 客户端,Python/Go/Rust 服务端)
Celery中核心的概念:
Task: 一个需要执行的任务,任务通常异步执行
Period Task: 需要定时执行的任务,定时一定间隔执行,也可以使用 crontab 表达式设定执 行周期和时间点
Message Broker: 消息代理,临时存储,传输任务到工作节点的消息队列。可以用 Redis, RabbitMQ, Amazon SQS 作为消息代理。消息代理可以有多个,以保障系统的高可用
Worker:工作节点,执行任务的进程,worker可以有多个,保障系统的高可用和扩展性
Result Store: 结果存储
Scheduler/Beat: 调度器进程, Beat是定时任务调度器
Celery 的跨平台 - 不同语言的客户端/服务器端实现:
Celery 的高可用架构:
CSR 架构总结 Sentry 解决的问题: 应用的错误,异常监控统计,报警通知;性能监控统计,对问题进行跟踪
Sentry架构之美:
API 简单、易用,自动集成;安装简单:架构依赖多,但使用 Docker 可以一个命令安装
自动对错误,异常进行统计聚合,按照上下文的Tag进行聚合
可以对性能进行统计分析,可抽样;可视化的趋势分析
多租户,支持双因素认证,敏感内容自动脱敏
开放的架构:可与 AD 域账号集成,与 Google/Stackoverflow 等账号集成
开放的架构:有完善的插件支持:Webhook/Gitlab/Jira/Slack/PushOver/….
支持不同环境(开发、测试、预发、线上);可以配置灵活的告警
跨平台,跨端的支持
Sentry中的概念
Symbolicator: 用来解析函数名,堆栈中的文件位置,代码上下文
Relay: 用来处理收到的请求,会立刻返回200或者429, 然后把事件放在内存中排队, 然后发到 Kafka ingest-events
ingest-consumer: 消费处理 Relay 发出的消息
process_event: 堆栈处理,插件预处理
postgresql: 用来保存完整的事件数据
Snuba: 事件数据的存储和查询服务
clickhouse: 用作数据仓库,用于 OLAP,搜索,聚合,标签统计
应用这边发送Crash的日志或者异常日志,发送到服务器端的web容器,发送到Nginx的网关,Nginx收到请求之后,发送到Sentry中的relay。relay是一个中继,会处理收到的请求,收到之后会立刻返回200或者429。relay会把事件放在内存队列中,发送到Kafka的ingest-events里面去。然后在Sentry的ingest-consumer那边会去消费中继发送过来的消息,进一步到后面去处理。后面的处理是由Celery的Task去处理的。Celery Task去取到消息之后做预处理,之后会去调用符号服务,符号服务是一个symbolicate,它可以解析我们代码里面的函数名、类名、包括堆栈里面的文件信息、每一行代码跟文件之间的关系和代码的上下文。解析完成之后再去处理这个事件,处理事件的时候,插件的预处理都会在这里去处理掉,process_event环节都会在这里运行,处理完了会保存事件。然后接下来的话,Celery的task任务会把事件发送到Snuba的Kafka的消息流里面去。snuba有一个消费端,会读取Kafka的消息,然后把这个数据存储到clickhouse,clickhouse用作数据仓库,用于做OLAP的分析,做数据的搜索聚合标签的统计。,包括我们按照不同的状态、Tag去查询数据都是走clickhouse。像这种错误信息收集,然后Crash信息收集的数据量很大的情况,我们用PostgreSQL不是太适合,特别是像跟着日志一起的相关的上下文的信息,这种数据在关系型数据库里面查询起来会相当慢。
Snuba是事件数据的一个查询跟存储服务,它的作用是为了隐藏Clickhouse对于上面应用层的一个复杂度,对clickhouse做了一个封装。
Sentry整体的架构:
在Sentry的核心架构里面,有几个重要的概念,包括Relay消息处理的中继。Ingest Kafka就是最前端接受消息的Kafka队列,Ingest consumer前端消息的消费者,然后Celery的这些task会去处理每一个消息,做这个符号解析,最后把数据保存到PostgreSQL之后,再把数据发一份到Snuba那边,产生Snuba Kafka消息,然后Snuba的消费者去一条一条消息消费,完了之后把完整的数据丢到clickhouse,同时也会去做一个事件的post_process,它里面回去遍历所有的插件,使用每一个插件对数据进行一个后处理。
Snuba的作用:
用作搜索、图计算、规则处理查询的一个服务
这个服务实际上对clickhouse做了一个抽象跟隔离
Snuba有两个部分组成的服务
一个是Reading的服务,通过clickhouse的查询语法去clickhouse查询,做分析,做聚合统计
一个是Writing的服务,首先是接收到Kafka的消息,然后把数据写到clickhouse里面去
Rest framework总结 解决的问题: 为应用提供Restful API
DRF 架构之美:
简单易用,既可以使用自动的 CRUD API,也可以自定义实现API
提供可浏览的 HTML API; 一套实现同时提供 HTML/JSON/XML 展现
灵活的用户认证,支持 Token/OAuth/OAuth2/JWT 等认证方式
提供流量控制,结果过滤筛选,分页,API 版本控制能力
灵活的权限控制:登陆用户,管理员,Django内置权限,只读权限,匿名用户
简单定义一个model的API
1 2 3 4 5 6 7 8 class UserSerializer (serializers.HyperlinkedModelSerializer): class Meta : model = User fields = ['url' , 'username' , 'email' , 'is_staff' ] class UserViewSet (viewsets.ModelViewSet): queryset = User.objects.all () serializer_class = UserSerializer
整体架构:
Django常用的插件
Django debug toolbar : 提供一个可以查看debug 信息的面板(包括SQL执行时间,页面耗时),开发环境可以使用,但线上环境不能使用
django-silk :性能瓶颈分析,可以查看SQL执行的时间
Simple UI:基于Element UI 和 VUE 的 Django Admin 主题
Haystack Django :模块化搜索方案
Django notifications: 发送消息通知,你有 xx 条未处理简历
Django markdown editor :Markdown 编辑器
django-crispy-forms : Crispy 表单,以一种非常优雅、干净的方式来创建美观的表单
django-simple-captcha:Django表单验证码
文档:https://django-debug-toolbar.readthedocs.io/en/latest/
安装:
1 python -m pip install django-debug-toolbar
注册到app:
1 2 3 4 5 6 7 8 INSTALLED_APPS = [ 'django.contrib.staticfiles' , 'debug_toolbar' , ] STATIC_URL = '/static/'
url:
1 2 3 4 5 6 7 8 import debug_toolbarfrom django.conf import settingsfrom django.urls import include, pathurlpatterns = [ ... path('__debug__/' , include(debug_toolbar.urls)), ]
启用middleware
1 2 3 4 5 MIDDLEWARE = [ 'debug_toolbar.middleware.DebugToolbarMiddleware' , ]
页面右侧就会有相关信息
Simple UI 文档:https://simpleui.72wo.com/docs/simpleui/quick.html
安装:pip install django-simpleui
注册app:
1 2 3 4 INSTALLED_APPS = [ 'simpleui' , ...... ]
Django模块化搜索Haystack
安装Package: pip install django-haystack
把 Haystack 添加到 settings 中
配置 HAYSTACK_CONNECTIONS, 指定使用哪种搜索引擎 (Solr, ES, Whoosh, Xapian)
1 2 3 4 5 6 7 import osHAYSTACK_CONNECTIONS = { 'default' : { 'ENGINE' : 'haystack.backends.whoosh_backend.WhooshEngine' , 'PATH' : os.path.join(os.path.dirname(__file__), 'whoosh_index' ), }, }
创建 SearchIndex 来指定 model 的索引策略
每一个 model 创建一个 SearchIndex : indexes.SearchIndex, indexes.Indexable
设置搜索的 页面 View 和 URL
创建索引,通常设置定时任务来创建全量索引,动态索引
1 2 python manage.py rebuild_index python manage.py update_index
生产环境部署和应用监控告警 生产环境部署之前:
单元测试:版本质量评估
生产环境 Django 配置
单元测试 测试用例基类层次:
SimplTestCase: 继承自python的 TestCase基类,可以发起 HTTP 请求,跟页面, 模板,URL 交互,禁止了数据库的访问
TransactionTestCase:在用例运行之后,清理 所有表来重置数据库; 可以运行提交、回滚 来观 察中间状态(需要测试事务时使用)
TestCase: 测试用例执行完后不清理表数据; 在 一个事务中执行用例,最后自动回滚事务
LiveServerTestCase: 在后台自动启动一个 Server,以便使用外部工具如 Selenium 做测试
一个简单的测试用例: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from django.test import TestCaseclass MyTestCase (TestCase ): @classmethod def setUpTestData (cls ): super ().setUpTestData() .... @classmethod def tearDownClass (cls ): ... super ().setUpTestData() def setUp (self ) -> None : pass def tearDown (self ) -> None : pass def test_something_that_will_pass (self ): self.assertFalse(False )
目录结构组织 哪些逻辑需要测试
Django 自带的代码(框架中实现的)逻辑不需要测试
自己写的代码需要测试,比如自定义的页面的访问,自定义的功能菜单
测试用例目录组织:
Django 使用 unittest 模块的内置测试查找机制
它将在当前工作目录下, 查找任何匹配模式test*.py 命名的文件作为 Test Case
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from django.test import TestCasefrom django.test import Clientfrom jobs.models import Job, JobTypes, Citiesclass JobTests (TestCase ): @classmethod def setUpTestData (cls ): cls.job = Job.objects.create(job_name="Java开发工程师" , job_type=JobTypes[0 ][0 ], job_city=Cities[1 ][0 ], job_requirement="精通Java开发" ) def test1 (self ): pass def test_index (self ): client = Client() response = client.get('/joblist/' ) self.assertEqual(response.status_code, 200 ) def test_detail (self ): response = self.client.get('/job/1/' ) self.assertEqual(response.status_code, 200 ) job = response.context['job' ] self.assertEqual(job.job_name, JobTests.job.job_name)
执行测试用例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 python manage.py test python manage.py test jobs.testcase python manage.py test jobs.testcase.test_views python manage.py test jobs.testcase.test_views.JobTests python manage.py test jobs.testcase.test_views.JobTests.test_detail
生产环境的应用部署 发布到生产环境的步骤
配置生产环境配置 (settings):DEBUG & Secret 相关信息
选择Django App的托管环境 (IaaS/PaaS,比如 阿里云/AWS/Azure/GAE/Heroku 等等)
部署前的安全检查
选择静态资源文件的托管环境(包括JS/CSS/图片/文件等) & 部署静态资源
部署 Django 应用容器 & Web服务器
配置生产环境配置:让网站准备好发布 必须调整的关键配置是:
DEBUG. 在生产环境中设置为 False(DEBUG = False)。避免在 web 页面上显示敏感的调试 跟踪和变量信息
SECRET_KEY. 这是用于CSRF保护的随机值
ALLOWED_HOSTS, 生产环境必须设置 允许访问的域名
生成 SECRET KEY:
1 python -c 'from django.core.management.utils import get_random_secret_key; print(get_random_secret_key())'
配置生产环境配置:密钥的存储和管理
1 2 3 4 5 import osDEBUG = FALSE SECRET_KEY = os.environ.get('DJANGO_SECRET_KEY' , 'xxxxx' ) ALLOWED_HOSTS = ["127.0.0.1" , "setcreed.com" ]
从 KMS 系统中读取配置的密钥
自己部署的 KMS 系统
云服务的 KMS 服务: 阿里云/AWS 的 KMS 服务
部署前的安全检查 1 python manage.py check --deploy
部署到生产环境 静态资源文件的托管环境
静态内容 Web 服务器: Apache/Nginx
CDN 服务器
collectstatic 工具:用来收集静态资源文件, settings 中的相关设置:
STATIC_URL: 能够访问到静态文件的 URL 路径。
STATIC_ROOT: collectstatic 工具用来保存收集到的项目引用到的任何静态文件的路径
STATICFILES_DIRS: 这列出了 Django 的 collectstatic 工具应该搜索静态文件的其他目
1 python manage.py collectstatic --settings=settings.local
收集完成后,可以将这些静态文件,上传到托管文件的服务器/CDN
Django 应用容器 同步应用
uWSGI: C 实现的 Python Web 容器;Web 服务器 Apache/Nginx 与 django-uwsgi 进程通信 来提供动态的内容
gunicorn:纯 Python 实现的高性能 Python 应用容器,无外部依赖,简单容易配置; 还没有遇到性能问题的时候,推荐使用 gunicorn.
异步应用,django3.0之后才有
Daphne: twisted 实现
Hypercorn: 基于 sans-io hyper, h11, h2, wsproto实现
Uvicorn: 基于 uvloop and httptools 实现
异步支持 Roadmap:
Django 的异步支持 Roadmap
Django 3.0 - ASGI Server
Django 3.1 - Async Views
Django 3.2/4.0 - Async ORM
异步视图
1 2 3 async def view (request ): await asyncio.sleep(0.5 ) return HttpResponse("hello, async" )
启动服务器:
同步应用服务器,以 gunicorn 为例
1 2 3 4 5 python -m pip install gunicorn export DJANGO_SETTINGS_MODULE=settings.localgunicorn -w 3 -b 127.0.0.1:8000 recruitment.wsgi:application
异步应用服务器,以uvcorn 为例:
1 2 3 python -m pip install uvicorn export DJANGO_SETTINGS_MODULE=settings.localuvicorn recruitment.asgi:application --workers 3
应用水平扩展:使用负载均衡 安装配置Tenine 使用Tengine,https://tengine.taobao.org/
Tengine完全兼容Nginx, 同时提供了额外的强大功能
增强相关运维、监控能力,比如异步打印日志及回滚,本地DNS缓存,内存监控等
动态脚本语言Lua支持。扩展功能简单高效
更加强大的负载均衡能力,包括一致性hash模块、会话保持模块
主动健康检查,根据服务器状态自动上线下线,以及动态解析upstream中出现的域名
输入过滤器机制支持,更强大的防攻击(访问速度限制)模块;方便实现应用防火墙
安装 Tengine:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 wget https://ftp.pcre.org/pub/pcre/pcre-8.44.tar.gz ./configure --prefix=/usr/local/pcre && make && make install wget https://tengine.taobao.org/download/tengine-2.3.2.tar.gz tar zxvf tengine-2.3.2.tar.gz && cd /data/tengine-2.3.0 ./configure --prefix=/data/tengine/ --with-http_realip_module -- with-http_gzip_static_module --with-pcre=/data/pcre-8.44 make && make install
简单配置:路由转发请求到 Gunicorn/uWSGI 服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 server { listen 80 ; server_name recruit.setcreed.com; localtion / { proxy_pass http://127.0.0.1:8000; proxy_set_header Host proxy_set_header X-Real-IP proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for ; } }
使用Tengine/Nginx 负载均衡
相关的配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 upstream django-server { server 192.168.1.100:8001 max_fails=3 fail_timeout=20s ; server 192.168.1.100:8002 max_fails=3 fail_timeout=20s ; server 192.168.1.101:8001 max_fails=3 fail_timeout=20s ; server 192.168.1.101:8002 max_fails=3 fail_timeout=20s ; } server { listen 80 ; server_name www.mysite.com; access_log /data/tengine/logs/recruitment-access.log main; error_log /data/tengine/logs/recruitment-error .log; location / { proxy_pass http://django-server; proxy_redirect off ; proxy_set_header Host $host ; proxy_set_header X-Real-IP $remote_addr ; proxy_set_header REMOTE-HOST $remote_addr ; proxy_set_header X-Forwarder-For $proxy_add_x_forwarder_for ; } }
Tengine 的分流策略 负载分流策略:
round-robin — 平均分配流量:轮询模式
least-connected — 最少连接优先,下一个请求分到活跃连接最少的服务器
ip-hash — 按照客户端 IP 哈希来分配服务器 IP
带权重流量分配
一致性哈希 (Tengine)
会话保持 (Tengine 特性)
最少连接优先:
前面配置的为平均分配流量
按照最少连接优先/ip hash 的配置:
1 2 3 4 5 6 7 upstream django-upstream { least_conn; server 192.168.1.100:8001 ; server 192.168.1.100:8002 ; server 192.168.1.101:8001 ; server 192.168.1.101:8002 }
按权重分配:
按照权重分配(适合机器配置不一样时)
6个请求里面,3个走到第一台,其它3台没台1个请求
1 2 3 4 5 6 upstream django-upstream { server 192.168.1.100:8001 weight=3 ; server 192.168.1.100:8002 ; server 192.168.1.101:8001 ; server 192.168.1.101:8002 }
会话保持:
1 2 3 4 5 6 7 8 9 10 11 12 13 upstream django-upstream { server 192.168.1.100 ; server 192.168.1.101 ; session_sticky; } server { location / { proxy_pass http://django-upstream; } }
健康检查自动容错 被动健康检查: ngx_http_upstream_module 实现了被动的健康检查功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 upstream backend { server 127.0.0.1:8080 max_fails=2 fail_timeout=20s ; server 127.0.0.1:8081 max_fails=2 fail_timeout=20s ; } server { listen 80 ; server_name www.mysite.com; location / { proxy_pass http://backend; } }
主动健康检查: http_upstream_check_module,主动定时检查:
1 2 3 4 5 6 7 8 9 upstream django-upstream { server 192.168.1.100 ; server 192.168.1.101 ; check interval=3000 rise=2 fall=3 timeout=1000 type=http; check_http_send "HEAD /HTTP/1.0\r\n\r\n" ; check_http_expect_alive http_2xx http_3xx; }
上面的参数含义
interval: 向后端发送的健康检查包的间隔
fall(fall_count):如果连续失败次数达到fall_count,服务就被认为是down
rise(rise_count):如果连续成功次数达到rise_count,服务就被认为是up
timeout:后端健康请求的超时时间
上面配置的意思:对django-upstream的所有节点,每隔3秒检测一次,请求2次正常则标记 realserver状态为up,如果检测3次都失败,则标记realserver的状态为down,超时时间为1秒。
使用CDN加速 为什么要使用 CDN:
CDN 访问的两阶段:
当用户发起URL请求时,第一阶段会做域名解析,通常会在服务器上配置一个域名,静态资源对应的域名的一个cname,cname指向另外一个不同的域名,这个域名是由CDN提供的。当CDN服务器解析收到域名请求的时候,它会根据用户的IP地址去判断用户的IP在哪个区域,然后去找到跟用户所在IP最近的一台CDN节点的IP地址,作为解析得到的域名的地址,然后把域名对应的IP地址返回给用户;然后第二阶段再去发起HTTP请求,这个时候就近的CDN节点会收到它的请求,会去取内容,要么从它的缓存里面取内容返回,要么去从原始的内容原站取到内容最后返回给用户。取到内容之后,CDN会做缓存,而且缓存时间比较长。手工清除缓存可能会导致不同节点不一致的问题,一般使用版本控制的策略,每一次资源有更新的时候,去使用不同的版本。
加速静态资源访问的两种方法:
使用云端的静态资源 (能够解决国外网站访问慢的问题)
使用 CDN 加速
例:使用阿里云的 OSS 存储静态资源文件, Django 会自动替换所有静态资源文件的路径为 OSS 文件的路径,并且对 URL 添加鉴权参数
1 2 3 4 STATIC_ROOT = ‘static' STATICFILES_STORAGE = ' django_oss_storage.backends.OssStaticStorage'
使用 CDN 的两种方式:
手工上传静态资源文件到 CDN
通过 Tengine 把本机的静态资源开放到 Web上, CDN自动回流到 Tengine
以手工上传静态资源文件为例,Django 启用 CDN 静态资源加速的步骤
生成静态资源文件, 上传静态资源到 OSS
配置 CDN 域名,回源地址指向 OSS Bucket, 配置 Referer 防盗链的白名单
配置 OSS Bucket 匿名可以读
设置 STATIC_URL,直接指向 CDN 地址,同时注释掉 OssStaticStorage 避免冲突
启用 CDN 加速静态资源文件:
1 2 STATIC_URL = 'http://icdn.setcreed.com/static/'
接入监控告警
Sentry 错误监控 与告警
告警趋势可视化: Prometheus & Grafana 概念介绍
告警趋势可视化: Prometheus & Grafana 架构
Prometheus & Grafana 接入
配置 Grafana 大盘
Prometheus & Grafana 概念介绍:
Prometheus 数据类型:
Counter:计数器,总是增长的整数值;请求数,订单量,错误数等
Gauge:可以上下波动的计量值,比如温度,内存使用量,处理中的请求;
Summary:提供观测样本的摘要,包含样本数量,样本值的和; 滑动窗口计算:请求耗时,响应数据大小
Histogram:把观测值放到配置好的桶中做统计,请求耗时,响应数据大小等
Prometheus & Grafana 架构:
Prometheus服务: 采集和存储时序数据
客户端类库: 用作注入应用端代码
Push gateway: 用于采集朝生暮死的作业数据
特殊用途的Exporter Service: 如Nginx/HAProxy/StatsD等
Alertmanager:处理告警
Prometheus 与 Grafana 的调用关系:
Prometheus & Grafana 接入:
配置 Grafana 大盘:
下载 Dashbaord 的Json ,导入到 Grafana 中:https://grafana.com/grafana/dashboards/9528
生产环境中的安全 生产环境的安全设计 生产环境安全要考虑的因素:
防火墙:把攻击挡在外面,建立安全区
应用安全:密码攻击 & 访问限流 – 防恶意攻击
架构安全:部署架构的安全性,应用架构安全设计
数据安全:SSL,敏感数据加密 与 日志脱敏
密码安全与业务安全:权限控制 & 密码安全策略
防火墙的作用:建立安全区,把攻击挡在外面
防火墙的类别:
WAF 防火墙
WAF: Web application firewall,基于预先定义的规则, 如预先定义的正则表 达式的黑名单,不安全URL 请求等
防止 SQL 注入,XSS, SSRF等web攻击
防止CC攻击 屏蔽常见的扫描黑客工具,扫描器
屏蔽异常的网络请求 屏蔽图片附件类目录php执行权限
防止 web shell 上传
系统防火墙 常用的 Linux 系统防火墙
iptables: Linux原始自带的防火墙工具iptables
ufw: Ubuntu的防火墙工具ufw. uncomplicated firewall 的简称,简单防火墙
firewalld: CentOS的防火墙工具firewalld
Ubuntu 的 ufw:
Linux原始的防火墙工具 iptables 过于繁琐
Ubuntu 提供了基于iptables 之上的防火墙 ufw
ufw 支持图形界面操作
规则:
开启 ufw 后,默认是允许所有连接通讯
且配置的策略也有先后顺序,每一条策略都有序号
服务器上配置,建议先 deny from any,再放开需要开放的访问
应用安全
防恶意密码攻击策略:
在用户连续登陆 n 次失败后, 要求输入验证码登陆
可选方案:使用 simple captcha 插件:https://django-simple-captcha.readthedocs.io/en/latest/usage.html
防恶意密码攻击
1 2 3 4 5 6 7 8 9 10 11 pip install django-simple-captcha path('captcha/' , include('captcha.urls' )), path("clogin/" , views.login_with_captcha, name="clogin" ),
添加 登陆验证 Form 和 Views 视图
添加登陆 模板页
添加登陆失败的频次控制
设置管理员的登陆页, 默认使用带连续失败需要验证码的页面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 from django import formsfrom django.contrib.auth import authenticate, loginfrom django.contrib.auth.forms import AuthenticationFormfrom django.contrib import messagesfrom captcha.fields import CaptchaField from django.shortcuts import renderfrom django.http.response import HttpResponseRedirectfrom django.urls import reverse_lazyimport logginglogger = logging.getLogger(__name__) class CaptchaLoginForm (AuthenticationForm ): captcha = CaptchaField(label='验证码' ) max_failed_login_count = 3 def login_with_captcha (request ): if request.POST: failed_login_count = request.session.get('failed_login_count' , 0 ) if failed_login_count >= max_failed_login_count : form = CaptchaLoginForm(data=request.POST) else : form = AuthenticationForm(data=request.POST) if form.is_valid(): request.session['failed_login_count' ] = 0 user = authenticate(username=form.cleaned_data["username" ], password=form.cleaned_data["password" ]) if user is not None : login(request,user) return HttpResponseRedirect(reverse_lazy('admin:index' )) else : failed_login_count += 1 request.session['failed_login_count' ] = failed_login_count logger.warning(" ----- failed login for user: %s, failed times:%s" % (form.data["username" ], failed_login_count) ) if failed_login_count >= max_failed_login_count : form = CaptchaLoginForm(request.POST) messages.add_message(request, messages.INFO, 'Not a valid request' ) else : failed_login_count = request.session.get('failed_login_count' , 0 ) if failed_login_count >= max_failed_login_count : form = CaptchaLoginForm(request.POST) else : form = AuthenticationForm() return render(request, 'login.html' , {'form' : form})
项目主应用下的templates/login.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 {% extends 'base.html' %} {% block content %} <form method ="post" style ="margin-left: 10px;" > {% csrf_token %} {{ form.as_p }} <div class ="form-actions" > <input type ="submit" class ="btn btn-primary" style ="width:120px;" value ="Login" /> </div > </form > {% endblock %}
访问限流 – 防恶意攻击:
Rest Framework API 限流
应用限流: 对页面的访问频次进行限流
Rest API 限流
可以对匿名用户,具名用户进行限流
可以设置峰值流量(如每分钟60次请求)
也可以设置连续一段时间的流量限制(比如每天3000次)
1 2 3 4 5 6 7 8 9 10 REST_FRAMEWORK = { 'DEFAULT_THROTTLE_CLASSES' : [ 'example.throttles.BurstRateThrottle' , 'example.throttles.SustainedRateThrottle' ], 'DEFAULT_THROTTLE_RATES' : { 'burst' : '60/min' , 'sustained' : '3000/day' } }
对页面的访问频次进行限流:
示例策略: 一分钟最多请求5次登陆页,防止暴力攻击登陆页
可以选方案: 使用 django-ratelimit 插件,https://django-ratelimit.readthedocs.io/en/stable/index.html
例如在对验证码登录的方法进行限流,每分钟最多可以访问5次:
1 2 3 4 5 from ratelimit.decorators import ratelimit@ratelimit(key='ip' , rate='5/m' ) def login_with_captcha (request ): pass
架构安全
防火墙
XSS攻击的中间件
CSRF攻击的中间件
SQL注入的中间件
应用的部署架构
密钥存储原则
应用的部署架构
这是典型中小型互联网应用部署架构
服务器内部组成私有网络
密钥存储 密钥信息的存储原则:
基础的用法: 使用环境变量/独立的配置文件, 不放在代码库中
使用 Key Server:使用开源的 Key Server, 或 阿里云/AWS 的 KMS 服务
从独立的配置文件中读取配置密钥
从独立的配置文件中读取配置密钥
容器环境,启动容器时作为环境变量传入 – 密钥不落地到容器存储中
1 2 3 4 with open ('/etc/secret_key.txt' ) as f: SECRET_KEY = f.read().strip()
使用 Key Server:
数据安全
Let’s Encrypt SSL 证书的使用
Let’s Encrypt是一家非盈利机构, 免费提供 SSL 证书。
Let’s encrypt的目标是为了构建一个安全的互联网
Let’s Encrypt的证书被各大主流的浏览器和网络服务商支持
提供的证书90天过期,需要自动重新申请。 有相应的工具可以使用
Let’s Encrypt 的两种使用方式:
Webroot 方式: certbot 会利用既有的 web server,在其 web root 目录下创 建隐藏文件,Lets Encrypt 服务端会通过域名来访问这些隐藏文件,以确认你的 确拥有对应域名的控制权
Standalone 方式: Certbot 会自己运行一个 web server 来进行验证。如果我 们自己的服务器上已经有 web server 正在运行 (比如 Nginx 或 Apache ), 用 standalone 方式的话需要先关掉它,以免冲突
安装使用:
1 2 3 4 5 6 sudo apt install snapd sudo snap install core sudo snap install --classic cerbot sudo ln -s /snap/bin/cerbot /usr/bin/cerbot
Web root 方式:
编辑 nginx.conf 配置文件, 确保可以访问 /.well-known/ 路径及里边存放的验证文件
1 2 3 4 location /.well-known/acme-challenge/ { default_type "text/plain" ; root /data/www/example; }
重新加载 nginx nginx -s reload
调用命令生成证书
1 2 /usr/bin/certbot certonly --email admin@example.com --webroot -w /data/www/example -d example.com -d www.example.com
命令会在 web root 目录中创建 .well-known 文件夹,其中包含了域名所有权的 验证文件
Certbot 会访问域名下面 /.well-known/acme-challenge/ 来验证域名是否绑 定,并生成证
—email 为申请者邮箱
—webroot 为 webroot 方式,-w 为站点目录,-d 为要加 https 证书的域名
在nginx中开启https
证书生成完成后可以到 /etc/letsencrypt/live/ 目录下查看对应域名的证书文件。 编辑 nginx 配置文件监听 443 端口,启用 SSL,并配置 SSL 的公钥、私钥证书 路径。
Nginx 配置文件中配置 SSL 证书, 以及监听端口, 重新加载 nginx
1 2 3 4 5 6 7 8 9 10 11 server { listen 443 ssl; listen [::]:443 ssl; server_name example.com www.example.com; index index.html index.htm index.php; root /home/wwwroot/example.com; ssl_certificate /etc/letsencrypt/live/example.com/fullchain.pem; ssl_certificate_key /etc/letsencrypt/live/example.com/privkey.pem; }
Let’s Encrypt SSL 证书续期
证书 3个月过期一次
使用 snapd 安装的 certbot, 会启动一个 cron job 或者 systemd 的定时任务,在证 书过期前自动续期
运行这个命令测试自动续期
1 2 3 4 5 cerbot renew --dry-run systemctl list-timers
敏感数据加密
对敏感数据,比如用户提交的内容,财务报告,第三方合同等数据进行加密
使用 Python 的 cryptography 库 pip install cryptography
1 2 3 4 5 6 7 8 9 10 11 12 13 from cryptography.fernet import Fernetkey = Fernet.generate_key() f = Fernet(key) token = f.encrypt(b"welcome to django" ) print (token)d = f.decrypt(token) print (d)
日志脱敏 在日志记录中,过滤掉敏感信息存储,避免敏感信息泄漏
1 2 3 4 5 6 7 8 from django.views.decorators.debug import sensitive_variables@sensitive_variables('user' , 'pw' , 'cc' ) def process_info (user ): pw = user.pass_word cc = user.credit_card_number name = user.name ...
可以用 sensitive_variables 装饰器阻止错误日志内容包含这些变量的值
具体参考 sensitive_post_parameters, sensitive_post_parameters https://docs.djangoproject.com/zh-hans/3.1/howto/error-reporting/#filteringsensitive-information
密码安全与业务安全
权限控制
遵循最小原则, 长时间没用自动回收
思路: 定时任务检查所有用户,找到长时间没有登陆的用户,回收相应的权限, 或删除账号
密码策略
密码策略 密码验证策略 AUTH_PASSWORD_VALIDATORS
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 AUTH_PASSWORD_VALIDATORS = [ { 'NAME' : 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator' , }, { 'NAME' : 'django.contrib.auth.password_validation.MinimumLengthValidator' , 'OPTIONS' : { 'min_length' : 9 , } }, { 'NAME' : 'django.contrib.auth.password_validation.CommonPasswordValidator' , }, { 'NAME' : 'django.contrib.auth.password_validation.NumericPasswordValidator' , }, ]
密码过期策略 可以使用 django-user-accounts 插件 https://github.com/pinax/django-user-accounts
1 2 3 4 5 6 7 8 9 10 11 MIDDLEWARE_CLASSES = [ ...... "account.middleware.ExpiredPasswordMiddleware" , ...... ] ACCOUNT_PASSWORD_USE_HISTORY = True ACCOUNT_PASSWORD_EXPIRY = 60 *60 *25 *5
云环境中的部署 docker容器
docker:
码头工人,轻量级的,可移植,自包含的容器,来自动化、版本化应用的发布
Docker上跑的容器是一个个的集装箱
Docker的基础是LXC
LXC用于应用程序的隔离,每个应用程序分配独立的命名空间,隔离的CPU, 内存,磁盘,网络资源
每个应用内部可以单跑一套容器系统,功能上相当于传统的虚拟机,但本质上是内核层面对资源的隔离
Docker 容器的分层和版本管理
Docker把应用和系统打包到一起(image镜像),进行版本化管理
应用之于Docker,如同代码之于Git/SVN,一个命令可以把应用部署到docker上
Docker 容器的几个重要概念:
用容器部署应用 Dockerfile 举例说明:https://docs.docker.com/compose/gettingstarted/
1 2 3 4 5 6 7 8 9 10 FROM python:3.7 -alpineWORKDIR /code ENV FLASK_APP=app.pyENV FLASK_RUN_HOST=0.0 .0.0 RUN apk add --no-cache gcc musl-dev linux-headers COPY requirements.txt requirements.txt RUN pip install -r requirements.txt EXPOSE 5000 COPY . . CMD ["flask" , "run" ]
使用 Python 3.7 作为基础镜像
把 /code 设置为工作目录
设置 flask 命令运行的环境变量
安装 gcc 和其它依赖
拷贝 requirements.txt 并安装其它依赖包
添加元数据来到镜像,声明监听 5000 端口
拷贝当前目录下所有文件,到镜像的工作目录
设置容器的默认运行命令:flask run
1 2 3 4 5 6 7 8 docker build . docker build -f /path/to/a/Dockerfile . docker build -t shykes/myapp .
用容器部署应用 Docker compose 1 2 3 4 5 6 7 8 9 version: "3.8" services: web: build: . ports: - "5000:5000" redis: image: "redis:alpine"
容器化 Django 应用 Django 应用容器化优势:
效率提升:开发环境可以复用
简单:一个命令搭建可以运行的开发环境
每个应用隔离的容器环境,无 Python/Pip包 版本冲突
代码调整:
settings文件配置:ALLOWED_HOSTS = [‘*’]
配置项放到环境变量中,通过读取配置启动
1 python manage.py runserver 0.0.0.0:8000 $server_params
构建小镜像Dockerfile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 FROM python:3.9 -alpineWORKDIR /data/recruitment ENV server_params=COPY requirements.txt ./ RUN apk add --update --no-cache curl jq py3-configobj py3-pip py3-setuptools python3 python3-dev \ && apk add --no-cache gcc g++ jpeg-dev zlib-dev libc-dev libressl-dev musl-dev libffi-dev \ && python -m pip install --upgrade pip \ && pip install -r requirements.txt \ && apk del gcc g++ libressl-dev musl-dev libffi-dev python3-dev \ && apk del curl jq py3-configobj py3-pip py3-setuptools \ && rm -rf /var/cache/apk/* COPY . . EXPOSE 8000 CMD ["/bin/sh" , "/data/recruitment/start.local.bat" ]
构建小镜像要点:
使用 alpine 的基础镜像
多个 命令使用一个 RUN 命令,这样只会产生一个 layer
在 RUN 命令的最后面删除不用的包,以及cache的包,减小镜像大小
使用 .dockerigonre 文件,把不需要打包到镜像的文件剔除,镜像会小很多
运行容器:
1 2 3 4 5 6 7 8 9 10 11 12 13 docker build -t setcreed/recruitment-base:v1.0. docker run -it --rm -p 8000:8000 --entrypoint /bin/sh setcreed/recruitment-base:v1.0 docker run -it --rm -p 8000:8000 -v "$(pwd) " :/data/recruitment --entrypoint /bin/sh setcreed/recruitment-base:v1.0 docker run --rm -p 8000:8000 -v "$(pwd) " :/data/recruitment --env server_params="-- settings=settings.local" setcreed/recruitment-base:v1.0
容器编排 – docker compose Docker compose 单机编排:
想要在一台主机上, 一下子跑起来多个容器, 且容器之间有调用关系
在一个 docker-compose.yml 文件中配置4个容器:
web: django 的应用, 使用 start.local.bat 来启动
redis: 缓存,以及 Celery 的broker 要用到的数据存储
celery: 异步任务 worker (用于异步发送钉钉消息通知)
flower:异步任务的监控应用 (监控异步任务的执行情况)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 version: "3.2" services: web: build: context: . dockerfile: Dockerfile command: /bin/sh /data/recruitment/start.local.bat environment: - server_params=--settings=settings.local volumes: - .:/data/recruitment - /data/logs/recruitment/ ports: - "8000:8000" depends_on: - redis - celery - flower redis: image: "redis:alpine" container_name: recruit-redis ports: - "6379:6379" celery: image: "setcreed/recruitment-base:v1.0" container_name: recruit-celery volumes: - .:/data/recruitment - /data/logs/recruitment/ entrypoint: ["/bin/sh" , "/data/recruitment/worker.start.sh" ] depends_on: - redis flower: image: "setcreed/recruitment-base:v1.0" container_name: recruit-flower ports: - "5555:5555" volumes: - .:/data/recruitment - /data/logs/recruitment/ entrypoint: ["/bin/sh" , "/data/recruitment/flower.start.sh" ] depends_on: - redis
启动:docker-compse up –d
停止:docker-compose stop
删除:docker-compose rm
docker docker-compose kubernates的关系
kubernates的介绍 k8s 的架构
k8s的分层架构
核心层:Kubernetes 最核心的功能,对外提 供 API 构建高层的应用,对内提供插件式应用 执行环境
应用层: 部署(无状态、有状态应用、job 等) 路由
管理层: 系统度量(如基础设施、容器和网络的度量) 自动化(如自动扩展、动态 Provision 等) 策略管理(RBAC、Quota、PSP、 NetworkPolicy 等)
核心组件:
k8s 的核心概念:
声明式管理: 通过 yml 声明期望的状态,k8s 自动根据定义的状态进行调度 (相对命令式管理而言, k8s 也提供命令式管理的方式)
声明式: kubectl apply -f
命令式: kubectl run xxx ,kubectl expose ..
Master node:集群主控节点,上面运行调度容器,管理容器
Worker node:工作节点,上面运行应用的容器
Pods:容器,k8s 对容器进行管理,自动创建、销毁容器
Service:使用 标签 选择算符(selectors)标识的一组 Pod,在集群内有固定 IP;可以用于为集群 内部容器/应用提供 稳定的访问入口,可以通过 service 名称访问到服务
Deployment:一套部署的容器集合
ReplicationController:可以复制的容器,功能集比 ReplicaSet 少,已不推荐使用
ReplicaSet:可以扩容、缩容的容器副本集,在容器集不需要状态,也不需要被其它容器访问的时候, 可以使用 ReplicaSet。其它容器无法直接访问 RS,可以通过 Service 来访问
StatefullSet:有状态的服务,比如 zookeeper,集群被外部使用的时候, 需要指定多个 zookeeper 服务的 ip 或者 hostname。由于是有状态的,比如 3 个节点的 zookeeper,不论如何 扩缩容(包括宕机恢复),3个节点容器名称/主机名总是 zk-0, zk-1, zk-2
Ingress:对外暴露可以访问的入口,为服务提供外网(从集群外)的访问入口
Namespace:资源可以放在 namespace 下面, 不同 namespace 之间相互隔离
Controller: 包括有节点控制器 , 路由控制器 , 服务控制器
创建 k8s 声明式配置
把 compose 文件转换为 k8s 声明式配置文件。 on mac :
1 2 3 4 5 6 7 8 9 curl -L https://github.com/kubernetes/kompose/releases/download/v1.16.0/k ompose-darwin-amd64 -o kompose chmod +x compose && sudo mv kompose /usr/local/bin/kompose convert mkidr k8s mv *.yaml k8skubectl apply -f k8s
在阿里云上搭建kubernates集群 K8s 环境创建 & 部署流程:
创建 Kubernetes 集群, 创建镜像仓库
配置本地 kubeconfig
Docker login 到 镜像仓库
Docker build & docker push 推送镜像到镜像仓库
K8s 部署到集群: kubectl apply -f k8s
在阿里云 k8s 控制台, 创建路由(ingress 路由),指向 Django 应用,即可访问
管理监控容器中的Django应用 云环境的复杂性:
应用被分布在了容器上运行,大量容器不断得创建销毁,升级
应用的可观测性,可见性变得更加重要
监控方案:
kubectl 命令行
可视化监控方案
GUI 的 kubernetes dashboard
云厂商的控制台
Sentry
ELK
Prometheus
https://help.aliyun.com/document_detail/94622.html
阿里云环境:手工安装 ack-prometheus-operator:https://help.aliyun.com/document_detail/94622.html
1 2 3 4 5 kubectl -n monitoring port-forward svc/ack-prometheus-operator-prometheus 9090:9090 kubectl -n monitoring port-forward svc/ack-prometheus-operator-grafana 3000:80
应用日志收集与查询 云环境的复杂性:
应用被分布在了容器上运行,大量容器不断得创建销毁,升级
应用的可观测性,可见性变得更加重要
日志收集 & 查询的不同方案:
使用Kubelet收集容器化应用输出到标准输出的日志
使用 sidecar 收集输出到文件中的日志,输出到标准输出 && tail –f
ELK/EFK 采集日志
阿里云Logtail 日志采集
k8s 下面的各种日志:
Pod logs
Node logs -> 宿主机的 /var/log/containers目录
K8s components logging (api server ,scheduler …)
K8s events
Audit logs
k8s 默认会将容器的stdout和stderr录入node的/var/log/containers目录下
而 k8s 组件的日志默认放置在/var/log目录下
阿里云 logtail 日志采集:
代码中的调整:
日志输出到独立的目录中
Settings 文件中更改日志路径
通过环境变量来创建您的采集配置和自定义Tag,所有与配置相关的环境变量都 采用aliyunlogs 作为前缀
创建采集配置的规则如下:- name: aliyunlogs {Logstore名称} value: {日志采集路径}
签名创建了两个采集配置
其中 aliyun_logs_recruitment-web 这个env表示创建一个Logstore名字为 recruitment-web,日志采集路径为stdout的配置,从而将容器的标准输出采集 到 recruitment-web 这个Logstore中
云环境下的持续集成 CI/CD的工作流程 CICD 包含如下流程:
Build & Package
Test
Deployment
需要的服务:
Git仓库:使用 Github
Docker 镜像仓库:使用阿里云镜像仓库
Jenkins:使用阿里云 k8s 上搭建的 jenkins
K8s: 使用阿里云 k8s
CICD 工具:Jenkins、Spinnaker、Harness
Jenkins pipeline 流水线:
CICD Pipeline: 包含一系列按照指定顺序执行的脚本
包含有用来完成任务的多个阶段(stages)
CD 阶段不同的部署策略:
Rolling Upgrade: 滚动更新,多个实例,下线一台升级一台,直至升级完
Blue/Green Deployment: 蓝绿部署,部署到新集群,部署完切流量到新集群
Canary Deployment:金丝雀部署,过程中新老版本共存,持续做灰度验证
CI/CD的使用 如何对 Django Web 应用进行 CICD, 自动化镜像打包,测试,发布,部署
前提:镜像的构建不依赖于本地的文件
安全策略:账号密码不保存在代码库中
账号密码保存到哪里? 密码如何使用
Kubernetes Secrets
KMS 系统: 如 Vault, 阿里云 KMS 等
使用 Kubernetes Secrets 管理账号密码:
使用账号密码使用的流程:
K8s secrets 中管理密码,k8s 配置文件中引用密码,代码中通过环境变量来引用
什么是 Kubernetes Secrets? https://kubernetes.io/zh/docs/concepts/configuration/secret/
如何使用:
基于文件创建 Secret : kubectl apply -f mysecret.yaml
基于命令创建:kubectl create secret generic
在云厂商 k8s 的管理控制台创建 secrets
示例:
1 2 3 4 5 6 7 8 9 10 kubectl create secret generic prod-db-secret --fromliteral=username=produser --from-literal=password=Y4nys7f11 kubectl get secret prod-db-secret kubectl describe secret prod-db-secret kubectl delete secret prod-db-secret
阿里云创建密钥:
创建密钥: 配置管理 – 保密字典
使用密钥: K8s 文件中 引用 secrets
Python 代码从环境变量读取配置
1 2 3 4 5 6 7 8 9 10 11 LDAP_AUTH_URL = os.environ.get('LDAPA_AUTH_URL' , 'ldap://localhost:389' ) LDAP_AUTH_CONNECTION_USERNAME = os.environ.get('LDAP_AUTH_CONNECTION_USERNAME' ) LDAP_AUTH_CONNECTION_PASSWORD = os.environ.get('LDAP_AUTH_CONNECTION_PASSWORD' ) OSS_ACCESS_KEY_ID = os.environ.get('OSS_ACCESS_KEY_ID' ) OSS_ACCESS_KEY_SECRET = os.environ.get('OSS_ACCESS_KEY_SECRET' )
搭建 k8s 中的 Jenkins 环境
1 2 3 4 5 6 7 8 9 10 11 12 cd Jenkins-doddocker pull ihopeit/jenkins-dod:1.1 docker tag ihopeit/jenkins-dod:1.1 registry.cnbeijing.aliyuncs.com/ihopeit/jenkins-dod:1.1 docker push registry.cn-beijing.aliyuncs.com/ihopeit/jenkins-dod:1.1 kubectl apply -f jenkins-service.yaml kubectl apply -f jenkins-deployment.yaml
阿里云 K8s 创建 Jenkins 路由(ingress路由)
访问 Jenkins, 配置 kubeconfig, 配置 阿里云 镜像仓库账号密码
创建 Jenkins 项目, Pipeline 项目,设置 Pipeline
Build 项目
快速迭代开发过程 快速迭代的价值与挑战 快速迭代:以天,甚至小时为单位,持续完善产品,交付到用户的循环过程
快速迭代的价值:
时间是最大的成本:机会瞬息即逝,赢得市场先机
快速验证需求,减少不对用户产生价值的投入(Fail fast, fail better)
快速验证方案,提高研发效率
加速反馈回路,给到团队和自己即时的激励
快速迭代的挑战:
产品设计者:能梳理清楚业务流程,抓住客户的重点需求,能把客户需求转化为系统需求
开发者:充分理解用户需求;有足够的能力,能用简洁的方案来设计出易维护的系统
根本挑战:
市场、用户、技术、环境变化太快,产品开发赶不上节奏
几乎不能从一开始就设计出一个完美的,能够适应未来长时间变化的方案
几乎没有人愿意承认,自己没有足够的能力(或条件)设计出一个完美的产品(系统)
使用 OOPD 方法识别产品核心功能 OOPD (Online and Offline integrated Product development)产品开发流程
OOPD 快速迭代的原则:
自助原则:做自己用的产品,自己用自己的产品,吃自己的狗食
0day 原则:找到明确的核心问题 拆解目标,抓住核心的问题,忽略掉一切细节,0day 发布
时限原则:设定时限,挑战自我 不给自己写 Bug 的时间
不完美原则:不做完美的产品(没有完美的产品,不去为了完美而浪费宝贵的资源)
谦卑原则:能够看到自己的局限性,获取用户反馈,持续迭代,听取用户声音
如何做好技术方案设计与工作拆解 做技术方案设计的前提条件
有明确的用户场景,用户如何跟产品交互,期望拿到什么样的预期结果
有清晰定义的业务流程
技术方案设计流程
产出的技术方案设计文档要素:
产品背景(用户场景,产品目标,引用到的业务流程,产品需求文档)
要解决的问题列表,系统不解决的问题列表,系统的限制
对于问题的不同解决方案的对比,阐述各个主要的问题如何被解决
所选整体的流程图(序列图),模块关系图,重要的接口、实体的概念定义
除了功能之外的其它方面的设计,包括安全,性能,可维护性,稳定性,监控,扩展性,易用性等
工作拆解的原则:
优先级:主流程上,不确定的工作优先完成(建议提前一个迭代做调研)
核心流程优先:核心工作优先,先把主流程跑通
依赖:减少不同人之间的工作依赖;并且保持团队工作拆解的透明,预留20% Buffer
拆解粒度:拆解到每项子任务 0.5-1天的粒度,最长不超过2天
如何保证交付质量和持续迭代
定义好清晰产品需求,产品需求从根本上决定了软件的质量
系统有整体上的架构方案的设计,评估,评审;系统设计决定了软件实现的质量
工程的角度持续交付的最佳实践推荐
Code Review: 每一次提交都有 CR,每次 commit 代码量 < 200 行,频繁commit
单元测试:项目开始建立好单元测试的机制,在持续集成中自动运行
自动化回归:对预发/线上系统做 API/页面自动化测试 (Postman/Robot Framework)
使用 CICD 机制对系统进行自动化的打包、测试、部署、线上验证
发布过程做到可监控,可回滚
对于大量用户使用的产品,使用灰度机制
架构上对于意外的并发访问进行限流,降级
架构上使用配置开关,对系统功能提供实时的开启/关闭的服务
对产品建立 A/B Test机制,通过数据来快速对比不同版本,不同方案的效果
自动化所有事情,代码化所有过程:代码化配置,代码化部署流程,代码化基础设施
声明式 API,CICD Pipeline, K8S, Helm, Terraform
Hacker
https://translations.readthedocs.io/en/latest/hacker_howto.html