1)自动任务
项目背景
- 每天定时执行SQL语句,更新基本数据表并生成当天日报数据。
- 根据日报数据每周生成周报。
- 基本数据表包括:
复购⽤户
专栏收⼊统计
⽤户拉新
⽤户活跃_⽤户活跃留存
⽤户活跃_⽤户在线时⻓
产品收⼊_会员收⼊统计
产品收⼊_总收⼊统计
产品收⼊_训练营收⼊
下载渠道付费转化率
celery实现
https://www.celerycn.io/
celery环境
Crontab时间任务配置
官方文档:https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html
需求:每天更新基础数据表;每天生成日报;每周生成周报。
class Config():
broker_url = 'redis://127.0.0.1/7'
result_backend = 'redis://127.0.0.1/8'
timezone = 'Asia/Shanghai'
beat_schedule = {
'基础': {
'task': 'server.base' ,
'schedule': crontab(minute=0 , hour=0)
} ,
'日报': {
'task': 'server.daily' ,
'schedule': crontab(minute=15 , hour=0)
} ,
'周报': {
'task': 'server.weekly' ,
'schedule': crontab(minute=30 , hour=0 , day_of_week=1)
} ,
}
Py MySQL的增删改查
安装pymysql:pip install pymysql
import pymysql
conn = pymysql.Connection(
host = 'localhost',
port = 3306,
user = 'root',
password='123456',
database='mytest1',
charset='utf8')
# 配置项
conn.begin()
#开启一次事务
cs = conn.cursor()
# 定义光标 :类似鼠标的作用,光标在哪个位置就在哪一行执行操作
cs.execute('''insert into test (username,password) value ('name1','pass1');''')
conn.commit()
#提交事务,运行上面的插入操作,想要执行必须要有commit参数
cs.close()
conn.close()
# 查询操作
cs = conn.cursor()
cs.execute(''' select * from test;''')
ret = cs.fetchall()
print(ret)
ret = cs.fetchmany(2)
# 返回2条
ret = cs.fetchone()
#只取一条
整合SQL语句
将SQL语句代码整合到一个文件夹下并且分类为基础数据、日报、周报。
# 得到日报周报基础数据并做日期填充
import os
filedir =os.path.join('edu_online_project','项目','SQL','基础')
# print(filedir)
filenames = os.listdir(filedir)#获取文件夹下所有文件名
# print(filenames)
# 使用循环遍历获取到整个sql表,然后append添加到sqls列表中
sqls = []
for filename in filenames:
filepath = os.path.join(filedir,filename)
print(filepath)
with open(filepath,'r',encoding = "utf-8") as f:
sql = f.read()
sqls.append(sql)
print(sqls)
如何填充日期,使用格式化方法,format。
sql = "select * from xxxx where dates ='{v}'"
ret = sql.format(v = '2021/11/19',v2='123' )
print(ret)
此时已经调试成为可以自动根据当天时间填充的SQL语句模式。
获取文件路径的那一行代码试了很多次,最后试出来需要把完整的路径都写出来才可以。
filedir =os.path.join(‘C:\‘,’Users’,’zhang’,’PycharmProjects’,’edu_online_project’,’项目’,’SQL’,’基础’)
# 得到日报周报基础数据并做日期填充
# C:\Users\zhang\PycharmProjects\edu_online_project\项目\SQL
import datetime
import os
# print(filedir)
last_date = datetime.datetime.now() - datetime.timedelta(days=1)
date = last_date.strftime('%Y-%m-%d')
date_14 = (last_date- datetime.timedelta(days=14)).strftime('%Y-%m-%d')
date_30 = (last_date- datetime.timedelta(days=30)).strftime('%Y-%m-%d')
filedir =os.path.join('C:\\','Users','zhang','PycharmProjects','edu_online_project','项目','SQL','基础')
filenames = os.listdir(filedir)#获取文件夹下所有文件名
# print(filenames)
# 使用循环遍历获取到整个sql表,然后append添加到sql列表中
sqls = []
for filename in filenames:
filepath = os.path.join(filedir,filename)
print(filepath)
with open(filepath,'r',encoding = "utf-8") as f:
sql = f.read().format( #使用format传递日期
date=date,
date_14 = date_14,
date_30= date_30
)
print(sql)
sqls.append(sql)
下一步进行代码的封装:
这一步实现的功能,通过构建函数能够获取到不同文件夹下的SQL语句,相应的函数为def get_sqls(filedir):和def get_base_sql():
import datetime
import os
def get_sqls(filedir):
# 日期相关
last_date = datetime.datetime.now() - datetime.timedelta(days=1)
date = last_date.strftime('%Y-%m-%d')
date_14 = (last_date - datetime.timedelta(days=14)).strftime('%Y-%m-%d')
date_30 = (last_date - datetime.timedelta(days=30)).strftime('%Y-%m-%d')
#文件相关
filenames = os.listdir(filedir)
# 获取文件夹下所有文件名
# 使用循环遍历获取到整个sql表,然后append添加到sql列表中
sqls = []
for filename in filenames:
filepath = os.path.join(filedir , filename)
print(filepath)
with open(filepath , 'r' , encoding="utf-8") as f:
sql = f.read().format( # 使用format传递日期
date=date ,
date_14=date_14 ,
date_30=date_30
)
print(sql)
sqls.append(sql)
return sqls
def get_base_sql():
file_dir = os.path.join('C:\\','Users','zhang','PycharmProjects','edu_online_project','项目','SQL','基础')
return get_sqls(file_dir)
def get_daily_sql():
pass
def get_weekly_sql():
pass
if __name__ == '__main__':
base_sqls = get_base_sql()
for base_sql in base_sqls:
print(base_sqls)
SQL语句编写
做到这里的时候发现Navicat里的表数据有点不对,在之前解读项目目标的时候没有执行完步骤, 于是按照《方式二》的步骤重新录入了一遍数据,然后连接pycharm。
sql语句编写有个问题,需要填充
current.total_paying as total_paying_num,
if(
last_2.total_paying = 0,
last_1.total_paying,
(last_1.total_paying - last_2.total_paying) / last_2.total_paying
) as total_paying_rate_day,
if(
last_7.total_paying = 0,
current.total_paying,
(current.total_paying - last_7.total_paying) / last_7.total_paying
) as total_paying_rate_week,
current.training_camp as training_camp_num,
if(
last_2.training_camp = 0,
last_1.training_camp,
(last_1.training_camp - last_2.training_camp) / last_2.training_camp
) as training_camp_rate_day,
if(
last_7.training_camp = 0,
current.training_camp,
(current.training_camp - last_7.training_camp) / last_7.training_camp
) as training_camp_rate_week,
current.special_column as special_column_num,
if(
last_2.special_column = 0,
last_1.special_column,
(last_1.special_column - last_2.special_column) / last_2.special_column
) as special_column_rate_day,
if(
last_7.special_column = 0,
current.special_column,
(current.special_column - last_7.special_column) / last_7.special_column
) as special_column_rate_week,
current.vip_member as vip_member_num,
if(
last_2.vip_member = 0,
last_1.vip_member,
(last_1.vip_member - last_2.vip_member) / last_2.vip_member
) as vip_member_rate_day,
if(
last_7.vip_member = 0,
current.vip_member,
(current.vip_member - last_7.vip_member) / last_7.vip_member
) as vip_member_rate_week,
current.activate_user as activate_user_num,
if(
last_2.activate_user = 0,
last_1.activate_user,
(last_1.activate_user - last_2.activate_user) / last_2.activate_user
) as activate_user_rate_day,
if(
last_7.activate_user = 0,
current.activate_user,
(current.activate_user - last_7.activate_user) / last_7.activate_user
) as activate_user_rate_week,
current.dau as dau_num,
if(
last_2.dau = 0,
last_1.dau,
(last_1.dau - last_2.dau) / last_2.dau
) as dau_rate_day,
if(
last_7.dau = 0,
current.dau,
(current.dau - last_7.dau) / last_7.dau
) as dau_rate_week,
current.customer_price as customer_price_num,
if(
last_2.customer_price = 0,
last_1.customer_price,
(last_1.customer_price - last_2.customer_price) / last_2.customer_price
) as customer_price_rate_day,
if(
last_7.customer_price = 0,
current.customer_price,
(current.customer_price - last_7.customer_price) / last_7.customer_price
) as customer_price_rate_week,
current.amount as amount_num,
if(
last_2.amount = 0,
last_1.amount,
(last_1.amount - last_2.amount) / last_2.amount
) as amount_rate_day,
if(
last_7.amount = 0,
current.amount,
(current.amount - last_7.amount) / last_7.amount
) as amount_rate_week
在写这一块的代码的时候跟着老师的敲是没问题的,可是在最后执行的时候频繁报红色波浪线的错误,training_camp这个字段的拼写有点问题,通过删除py代码生成的字段,再用代码补全的方式查看到了正确的代码是什么,“traininug_camp”,ing多了个inug。
日报:
use edu_company;
with p as
(select 日期 as date, -- ifnull语法:如果没有这个指标的挂不允许为nan,让其显示为0方便计算。
ifnull(总付费人数, 0) as 'total_paying',
ifnull(付费人数_训练营, 0) as 'traininug_camp',
ifnull(付费人数_专栏, 0) as 'special_column',
ifnull(付费人数_会员, 0) as 'vip_member',
ifnull(激活人数, 0) as 'activate_user',
ifnull(DAU, 0) as 'dau',
ifnull(客单价, 0) as 'customer_price',
ifnull(总付费金额, 0) as 'amount',
ifnull(新用户次日留存率, 0) as 'new_customer_remain_day',
ifnull(新用户7日留存率, 0) as 'new_customer_remain_week',
ifnull(活跃用户次日留存率, 0) as 'active_customer_remain_day',
ifnull(活跃用户7日留存率, 0) as 'active_customer_remain_week',
ifnull(浏览商详人数_训练营, 0) as 'product_details',
ifnull(`预约-付费转化率_训练营`, 0) as 'reservation',
ifnull(`浏览商详-预约转化率_训练营`, 0) as 'product_details_rate',
ifnull(`预约-付费转化率_训练营`, 0) as 'reservation_rate'
from 日报
order by 日期 desc -- 降序
limit 30)
select current.date as date,
current.total_paying as total_paying_num,
if(
last_2.total_paying = 0,
last_1.total_paying,
(last_1.total_paying - last_2.total_paying) / last_2.total_paying
) as total_paying_rate_day,
if(
last_7.total_paying = 0,
current.total_paying,
(current.total_paying - last_7.total_paying) / last_7.total_paying
) as total_paying_rate_week,
current.traininug_camp as training_camp_num,
if(
last_2.traininug_camp = 0,
last_1.traininug_camp,
(last_1.traininug_camp - last_2.traininug_camp) / last_2.traininug_camp
) as training_camp_rate_day,
if(
last_7.traininug_camp = 0,
current.traininug_camp,
(current.traininug_camp - last_7.traininug_camp) / last_7.traininug_camp
) as training_camp_rate_week,
current.special_column as special_column_num,
if(
last_2.special_column = 0,
last_1.special_column,
(last_1.special_column - last_2.special_column) / last_2.special_column
) as special_column_rate_day,
if(
last_7.special_column = 0,
current.special_column,
(current.special_column - last_7.special_column) / last_7.special_column
) as special_column_rate_week,
current.vip_member as vip_member_num,
if(
last_2.vip_member = 0,
last_1.vip_member,
(last_1.vip_member - last_2.vip_member) / last_2.vip_member
) as vip_member_rate_day,
if(
last_7.vip_member = 0,
current.vip_member,
(current.vip_member - last_7.vip_member) / last_7.vip_member
) as vip_member_rate_week,
current.activate_user as activate_user_num,
if(
last_2.activate_user = 0,
last_1.activate_user,
(last_1.activate_user - last_2.activate_user) / last_2.activate_user
) as activate_user_rate_day,
if(
last_7.activate_user = 0,
current.activate_user,
(current.activate_user - last_7.activate_user) / last_7.activate_user
) as activate_user_rate_week,
current.dau as dau_num,
if(
last_2.dau = 0,
last_1.dau,
(last_1.dau - last_2.dau) / last_2.dau
) as dau_rate_day,
if(
last_7.dau = 0,
current.dau,
(current.dau - last_7.dau) / last_7.dau
) as dau_rate_week,
current.customer_price as customer_price_num,
if(
last_2.customer_price = 0,
last_1.customer_price,
(last_1.customer_price - last_2.customer_price) / last_2.customer_price
) as customer_price_rate_day,
if(
last_7.customer_price = 0,
current.customer_price,
(current.customer_price - last_7.customer_price) / last_7.customer_price
) as customer_price_rate_week,
current.amount as amount_num,
if(
last_2.amount = 0,
last_1.amount,
(last_1.amount - last_2.amount) / last_2.amount
) as amount_rate_day,
if(
last_7.amount = 0,
current.amount,
(current.amount - last_7.amount) / last_7.amount
) as amount_rate_week,
current.new_customer_remain_day as new_customer_remain_day_data,
f1.new_customer_remain_day_avg as new_customer_remain_day_avg,
current.new_customer_remain_week as new_customer_remain_week_data,
f1.new_customer_remain_week_avg as new_customer_remain_week_avg,
current.active_customer_remain_day as active_customer_remain_day_data,
f1.active_customer_remain_day_avg as active_customer_remain_day_avg,
current.active_customer_remain_week as active_customer_remain_week_data,
f1.active_customer_remain_week_avg as active_customer_remain_week_avg,
current.product_details as product_details_data,
f1.product_details_avg as product_details_avg,
current.reservation as reservation_data,
f1.reservation_avg as reservation_avg,
current.product_details_rate as product_details_rate_data,
f1.product_details_rate_avg as product_details_rate_avg,
current.reservation_rate as reservation_rate_data,
f1.reservation_rate_avg as reservation_rate_avg
from (select * from p limit 1) current
left join p last_1 on datediff(current.date, last_1.date) = 1
left join p last_2 on datediff(current.date, last_2.date) = 2
left join p last_7 on datediff(current.date, last_7.date) = 7
left join (select avg(new_customer_remain_day) as new_customer_remain_day_avg,
avg(new_customer_remain_week) as new_customer_remain_week_avg,
avg(active_customer_remain_day) as active_customer_remain_day_avg,
avg(active_customer_remain_week) as active_customer_remain_week_avg,
avg(product_details) as product_details_avg,
avg(reservation) as reservation_avg,
avg(product_details_rate) as product_details_rate_avg,
avg(reservation_rate) as reservation_rate_avg
from p) f1 on 1 = 1;
周报
use edu_company;
with p as
(select 当周周一 as date, -- ifnull语法:如果没有这个指标的挂不允许为nan,让其显示为0方便计算。
ifnull(总付费人数, 0) as 'total_paying',
ifnull(付费人数_训练营, 0) as 'traininug_camp',
ifnull(付费人数_专栏, 0) as 'special_column',
ifnull(付费人数_会员, 0) as 'vip_member',
ifnull(激活人数, 0) as 'activate_user',
ifnull(DAU, 0) as 'dau',
ifnull(客单价, 0) as 'customer_price',
ifnull(总付费金额, 0) as 'amount',
ifnull(新用户次日留存率, 0) as 'new_customer_remain_day',
ifnull(新用户7日留存率, 0) as 'new_customer_remain_week',
ifnull(活跃用户次日留存率, 0) as 'active_customer_remain_day',
ifnull(活跃用户7日留存率, 0) as 'active_customer_remain_week',
ifnull(浏览商详人数_训练营, 0) as 'product_details',
ifnull(`预约-付费转化率_训练营`, 0) as 'reservation',
ifnull(`浏览商详-预约转化率_训练营`, 0) as 'product_details_rate',
ifnull(`预约-付费转化率_训练营`, 0) as 'reservation_rate'
from 周报
order by 当周周一 desc -- 降序
limit 30)
select current.date as date,
current.total_paying as total_paying_num,
if(
last_2.total_paying = 0,
last_1.total_paying,
(last_1.total_paying - last_2.total_paying) / last_2.total_paying
) as total_paying_rate_day,
if(
last_7.total_paying = 0,
current.total_paying,
(current.total_paying - last_7.total_paying) / last_7.total_paying
) as total_paying_rate_week,
current.traininug_camp as training_camp_num,
if(
last_2.traininug_camp = 0,
last_1.traininug_camp,
(last_1.traininug_camp - last_2.traininug_camp) / last_2.traininug_camp
) as training_camp_rate_day,
if(
last_7.traininug_camp = 0,
current.traininug_camp,
(current.traininug_camp - last_7.traininug_camp) / last_7.traininug_camp
) as training_camp_rate_week,
current.special_column as special_column_num,
if(
last_2.special_column = 0,
last_1.special_column,
(last_1.special_column - last_2.special_column) / last_2.special_column
) as special_column_rate_day,
if(
last_7.special_column = 0,
current.special_column,
(current.special_column - last_7.special_column) / last_7.special_column
) as special_column_rate_week,
current.vip_member as vip_member_num,
if(
last_2.vip_member = 0,
last_1.vip_member,
(last_1.vip_member - last_2.vip_member) / last_2.vip_member
) as vip_member_rate_day,
if(
last_7.vip_member = 0,
current.vip_member,
(current.vip_member - last_7.vip_member) / last_7.vip_member
) as vip_member_rate_week,
current.activate_user as activate_user_num,
if(
last_2.activate_user = 0,
last_1.activate_user,
(last_1.activate_user - last_2.activate_user) / last_2.activate_user
) as activate_user_rate_day,
if(
last_7.activate_user = 0,
current.activate_user,
(current.activate_user - last_7.activate_user) / last_7.activate_user
) as activate_user_rate_week,
current.dau as dau_num,
if(
last_2.dau = 0,
last_1.dau,
(last_1.dau - last_2.dau) / last_2.dau
) as dau_rate_day,
if(
last_7.dau = 0,
current.dau,
(current.dau - last_7.dau) / last_7.dau
) as dau_rate_week,
current.customer_price as customer_price_num,
if(
last_2.customer_price = 0,
last_1.customer_price,
(last_1.customer_price - last_2.customer_price) / last_2.customer_price
) as customer_price_rate_day,
if(
last_7.customer_price = 0,
current.customer_price,
(current.customer_price - last_7.customer_price) / last_7.customer_price
) as customer_price_rate_week,
current.amount as amount_num,
if(
last_2.amount = 0,
last_1.amount,
(last_1.amount - last_2.amount) / last_2.amount
) as amount_rate_day,
if(
last_7.amount = 0,
current.amount,
(current.amount - last_7.amount) / last_7.amount
) as amount_rate_week,
current.new_customer_remain_day as new_customer_remain_day_data,
f1.new_customer_remain_day_avg as new_customer_remain_day_avg,
current.new_customer_remain_week as new_customer_remain_week_data,
f1.new_customer_remain_week_avg as new_customer_remain_week_avg,
current.active_customer_remain_day as active_customer_remain_day_data,
f1.active_customer_remain_day_avg as active_customer_remain_day_avg,
current.active_customer_remain_week as active_customer_remain_week_data,
f1.active_customer_remain_week_avg as active_customer_remain_week_avg,
current.product_details as product_details_data,
f1.product_details_avg as product_details_avg,
current.reservation as reservation_data,
f1.reservation_avg as reservation_avg,
current.product_details_rate as product_details_rate_data,
f1.product_details_rate_avg as product_details_rate_avg,
current.reservation_rate as reservation_rate_data,
f1.reservation_rate_avg as reservation_rate_avg
from (select * from p limit 1) current
left join p last_1 on datediff(current.date, last_1.date) = 1
left join p last_2 on datediff(current.date, last_2.date) = 2
left join p last_7 on datediff(current.date, last_7.date) = 7
left join (select avg(new_customer_remain_day) as new_customer_remain_day_avg,
avg(new_customer_remain_week) as new_customer_remain_week_avg,
avg(active_customer_remain_day) as active_customer_remain_day_avg,
avg(active_customer_remain_week) as active_customer_remain_week_avg,
avg(product_details) as product_details_avg,
avg(reservation) as reservation_avg,
avg(product_details_rate) as product_details_rate_avg,
avg(reservation_rate) as reservation_rate_avg
from p) f1 on 1 = 1;
pandas运行SQL
# 读取SQL
sql = {}
with open(os.path.join('sql','daily_board.sql'),'r',encoding = "utf-8")as f:
sql['daily_board'] = f.read()
if __name__ == '__main__':
print(sql)
interface.py文件
这个文件起到读取四个SQL文件并输出数据到数据规范表中所示的对应变量中去。
import os
import pandas as pd
from sqlalchemy import create_engine
import json
import datetime
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/edu_company?charset=utf8')
常规的导入包,链接MySQL数据库。
# 读取SQL
sql = {}
with open(
os.path.join('C:\\' , 'Users' , 'zhang' , 'PycharmProjects' , 'edu_online_project' , '2.数据接口' , '课上代码' , 'sql' ,
'daily_board.sql') , 'r' , encoding="utf-8")as f:
sql['daily_board'] = f.read()
with open(
os.path.join('C:\\' , 'Users' , 'zhang' , 'PycharmProjects' , 'edu_online_project' , '2.数据接口' , '课上代码' , 'sql' ,
'daily_chart.sql') , 'r' , encoding="utf-8")as f:
sql['daily_chart'] = f.read()
with open(
os.path.join('C:\\' , 'Users' , 'zhang' , 'PycharmProjects' , 'edu_online_project' , '2.数据接口' , '课上代码' , 'sql' ,
'weekly_board.sql') , 'r' , encoding="utf-8")as f:
sql['weekly_board'] = f.read()
with open(
os.path.join('C:\\' , 'Users' , 'zhang' , 'PycharmProjects' , 'edu_online_project' , '2.数据接口' , '课上代码' , 'sql' ,
'weekly_chart.sql') , 'r' , encoding="utf-8")as f:
sql['weekly_chart'] = f.read()
读取四个SQL文件,os.path路径的这个问题是经过多次调试后得到的结果,要写出完整的绝对路径才能够不报错’,pycharm里设置当前文件夹为源根也不能解决;并且要加上encoding=’utf-8’。
weekday_transfer = {
0: '星期一',
1: '星期二',
2: '星期三',
3: '星期四',
4: '星期五',
5: '星期六',
6: '星期日',
}
构造出daily_interface函数中日期的显示方式。