1)自动任务

项目背景

  1. 每天定时执行SQL语句,更新基本数据表并生成当天日报数据。
  2. 根据日报数据每周生成周报。
  3. 基本数据表包括:
  1. 复购⽤户
  2. 专栏收⼊统计
  3. ⽤户拉新
  4. ⽤户活跃_⽤户活跃留存
  5. ⽤户活跃_⽤户在线时⻓
  6. 产品收⼊_会员收⼊统计
  7. 产品收⼊_总收⼊统计
  8. 产品收⼊_训练营收⼊
  9. 下载渠道付费转化率

celery实现

https://www.celerycn.io/
celery环境

Crontab时间任务配置

官方文档:https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html

需求:每天更新基础数据表;每天生成日报;每周生成周报。

class Config():
    broker_url = 'redis://127.0.0.1/7'
    result_backend = 'redis://127.0.0.1/8'
    timezone = 'Asia/Shanghai'

    beat_schedule = {
        '基础': {
            'task': 'server.base' ,
            'schedule': crontab(minute=0 , hour=0)
        } ,
        '日报': {
            'task': 'server.daily' ,
            'schedule': crontab(minute=15 , hour=0)
        } ,
        '周报': {
            'task': 'server.weekly' ,
            'schedule': crontab(minute=30 , hour=0 , day_of_week=1)
        } ,
    }

Py MySQL的增删改查

安装pymysql:pip install pymysql

import pymysql

conn = pymysql.Connection(
host = 'localhost',
    port = 3306,
    user = 'root',
    password='123456',
    database='mytest1',
    charset='utf8')
# 配置项

conn.begin()
#开启一次事务

cs = conn.cursor()
# 定义光标 :类似鼠标的作用,光标在哪个位置就在哪一行执行操作
cs.execute('''insert into test (username,password) value  ('name1','pass1');''')
conn.commit()
#提交事务,运行上面的插入操作,想要执行必须要有commit参数
cs.close()
conn.close()
# 查询操作

cs = conn.cursor()
cs.execute(''' select * from test;''')
ret = cs.fetchall()
print(ret)
ret = cs.fetchmany(2)
# 返回2条
ret = cs.fetchone()
#只取一条

整合SQL语句

将SQL语句代码整合到一个文件夹下并且分类为基础数据、日报、周报。

# 得到日报周报基础数据并做日期填充

import os
filedir =os.path.join('edu_online_project','项目','SQL','基础')
# print(filedir)

filenames = os.listdir(filedir)#获取文件夹下所有文件名
# print(filenames)
# 使用循环遍历获取到整个sql表,然后append添加到sqls列表中
sqls = []
for filename in filenames:
    filepath = os.path.join(filedir,filename)
    print(filepath)

    with open(filepath,'r',encoding = "utf-8") as f:
        sql = f.read()
        sqls.append(sql)
print(sqls)

如何填充日期,使用格式化方法,format。

sql = "select * from xxxx where dates ='{v}'"
ret = sql.format(v = '2021/11/19',v2='123' )
print(ret)

此时已经调试成为可以自动根据当天时间填充的SQL语句模式。

获取文件路径的那一行代码试了很多次,最后试出来需要把完整的路径都写出来才可以。

filedir =os.path.join(‘C:\‘,’Users’,’zhang’,’PycharmProjects’,’edu_online_project’,’项目’,’SQL’,’基础’)

# 得到日报周报基础数据并做日期填充
# C:\Users\zhang\PycharmProjects\edu_online_project\项目\SQL

import datetime
import os
# print(filedir)
last_date = datetime.datetime.now() - datetime.timedelta(days=1)
date = last_date.strftime('%Y-%m-%d')
date_14 = (last_date- datetime.timedelta(days=14)).strftime('%Y-%m-%d')
date_30 = (last_date- datetime.timedelta(days=30)).strftime('%Y-%m-%d')

filedir =os.path.join('C:\\','Users','zhang','PycharmProjects','edu_online_project','项目','SQL','基础')
filenames = os.listdir(filedir)#获取文件夹下所有文件名
# print(filenames)


# 使用循环遍历获取到整个sql表,然后append添加到sql列表中
sqls = []
for filename in filenames:
    filepath = os.path.join(filedir,filename)
    print(filepath)

    with open(filepath,'r',encoding = "utf-8") as f:
        sql = f.read().format(  #使用format传递日期
            date=date,
            date_14 = date_14,
            date_30= date_30
        )
        print(sql)
        sqls.append(sql)

下一步进行代码的封装:
这一步实现的功能,通过构建函数能够获取到不同文件夹下的SQL语句,相应的函数为def get_sqls(filedir):和def get_base_sql():

import datetime
import os

def get_sqls(filedir):
    # 日期相关
    last_date = datetime.datetime.now() - datetime.timedelta(days=1)
    date = last_date.strftime('%Y-%m-%d')
    date_14 = (last_date - datetime.timedelta(days=14)).strftime('%Y-%m-%d')
    date_30 = (last_date - datetime.timedelta(days=30)).strftime('%Y-%m-%d')
    #文件相关
    filenames = os.listdir(filedir)
    # 获取文件夹下所有文件名

    # 使用循环遍历获取到整个sql表,然后append添加到sql列表中
    sqls = []
    for filename in filenames:
        filepath = os.path.join(filedir , filename)
        print(filepath)

        with open(filepath , 'r' , encoding="utf-8") as f:
            sql = f.read().format(  # 使用format传递日期
                date=date ,
                date_14=date_14 ,
                date_30=date_30
            )
            print(sql)
            sqls.append(sql)
    return sqls

def get_base_sql():
    file_dir = os.path.join('C:\\','Users','zhang','PycharmProjects','edu_online_project','项目','SQL','基础')
    return get_sqls(file_dir)

def get_daily_sql():

    pass

def get_weekly_sql():
    pass



if __name__ == '__main__':
    base_sqls = get_base_sql()
    for base_sql in base_sqls:
        print(base_sqls)

SQL语句编写

做到这里的时候发现Navicat里的表数据有点不对,在之前解读项目目标的时候没有执行完步骤, 于是按照《方式二》的步骤重新录入了一遍数据,然后连接pycharm。

sql语句编写有个问题,需要填充

       current.total_paying                as total_paying_num,
       if(
               last_2.total_paying = 0,
               last_1.total_paying,
               (last_1.total_paying - last_2.total_paying) / last_2.total_paying
           )                               as total_paying_rate_day,

       if(
               last_7.total_paying = 0,
               current.total_paying,
               (current.total_paying - last_7.total_paying) / last_7.total_paying
           )                               as total_paying_rate_week,

       current.training_camp               as training_camp_num,
       if(
               last_2.training_camp = 0,
               last_1.training_camp,
               (last_1.training_camp - last_2.training_camp) / last_2.training_camp
           )                               as training_camp_rate_day,

       if(
               last_7.training_camp = 0,
               current.training_camp,
               (current.training_camp - last_7.training_camp) / last_7.training_camp
           )                               as training_camp_rate_week,

       current.special_column              as special_column_num,
       if(
               last_2.special_column = 0,
               last_1.special_column,
               (last_1.special_column - last_2.special_column) / last_2.special_column
           )                               as special_column_rate_day,

       if(
               last_7.special_column = 0,
               current.special_column,
               (current.special_column - last_7.special_column) / last_7.special_column
           )                               as special_column_rate_week,

       current.vip_member                  as vip_member_num,
       if(
               last_2.vip_member = 0,
               last_1.vip_member,
               (last_1.vip_member - last_2.vip_member) / last_2.vip_member
           )                               as vip_member_rate_day,

       if(
               last_7.vip_member = 0,
               current.vip_member,
               (current.vip_member - last_7.vip_member) / last_7.vip_member
           )                               as vip_member_rate_week,

       current.activate_user               as activate_user_num,
       if(
               last_2.activate_user = 0,
               last_1.activate_user,
               (last_1.activate_user - last_2.activate_user) / last_2.activate_user
           )                               as activate_user_rate_day,

       if(
               last_7.activate_user = 0,
               current.activate_user,
               (current.activate_user - last_7.activate_user) / last_7.activate_user
           )                               as activate_user_rate_week,

       current.dau                         as dau_num,
       if(
               last_2.dau = 0,
               last_1.dau,
               (last_1.dau - last_2.dau) / last_2.dau
           )                               as dau_rate_day,

       if(
               last_7.dau = 0,
               current.dau,
               (current.dau - last_7.dau) / last_7.dau
           )                               as dau_rate_week,

       current.customer_price              as customer_price_num,
       if(
               last_2.customer_price = 0,
               last_1.customer_price,
               (last_1.customer_price - last_2.customer_price) / last_2.customer_price
           )                               as customer_price_rate_day,

       if(
               last_7.customer_price = 0,
               current.customer_price,
               (current.customer_price - last_7.customer_price) / last_7.customer_price
           )                               as customer_price_rate_week,

       current.amount                      as amount_num,
       if(
               last_2.amount = 0,
               last_1.amount,
               (last_1.amount - last_2.amount) / last_2.amount
           )                               as amount_rate_day,

       if(
               last_7.amount = 0,
               current.amount,
               (current.amount - last_7.amount) / last_7.amount
           )                               as amount_rate_week

在写这一块的代码的时候跟着老师的敲是没问题的,可是在最后执行的时候频繁报红色波浪线的错误,training_camp这个字段的拼写有点问题,通过删除py代码生成的字段,再用代码补全的方式查看到了正确的代码是什么,“traininug_camp”,ing多了个inug。

日报:

use edu_company;

with p as
         (select 日期                          as date, -- ifnull语法:如果没有这个指标的挂不允许为nan,让其显示为0方便计算。
                 ifnull(总付费人数, 0)            as 'total_paying',
                 ifnull(付费人数_训练营, 0)         as 'traininug_camp',
                 ifnull(付费人数_专栏, 0)          as 'special_column',
                 ifnull(付费人数_会员, 0)          as 'vip_member',
                 ifnull(激活人数, 0)             as 'activate_user',
                 ifnull(DAU, 0)              as 'dau',
                 ifnull(客单价, 0)              as 'customer_price',
                 ifnull(总付费金额, 0)            as 'amount',
                 ifnull(新用户次日留存率, 0)         as 'new_customer_remain_day',
                 ifnull(新用户7日留存率, 0)         as 'new_customer_remain_week',
                 ifnull(活跃用户次日留存率, 0)        as 'active_customer_remain_day',
                 ifnull(活跃用户7日留存率, 0)        as 'active_customer_remain_week',
                 ifnull(浏览商详人数_训练营, 0)       as 'product_details',
                 ifnull(`预约-付费转化率_训练营`, 0)   as 'reservation',
                 ifnull(`浏览商详-预约转化率_训练营`, 0) as 'product_details_rate',
                 ifnull(`预约-付费转化率_训练营`, 0)   as 'reservation_rate'
          from 日报
          order by 日期 desc -- 降序
          limit 30)
select current.date           as date,
       current.total_paying                as total_paying_num,
       if(
               last_2.total_paying = 0,
               last_1.total_paying,
               (last_1.total_paying - last_2.total_paying) / last_2.total_paying
           )                               as total_paying_rate_day,

       if(
               last_7.total_paying = 0,
               current.total_paying,
               (current.total_paying - last_7.total_paying) / last_7.total_paying
           )                               as total_paying_rate_week,

       current.traininug_camp               as training_camp_num,
       if(
               last_2.traininug_camp = 0,
               last_1.traininug_camp,
               (last_1.traininug_camp - last_2.traininug_camp) / last_2.traininug_camp
           )                               as training_camp_rate_day,

       if(
               last_7.traininug_camp = 0,
               current.traininug_camp,
               (current.traininug_camp - last_7.traininug_camp) / last_7.traininug_camp
           )                               as training_camp_rate_week,

       current.special_column              as special_column_num,
       if(
               last_2.special_column = 0,
               last_1.special_column,
               (last_1.special_column - last_2.special_column) / last_2.special_column
           )                               as special_column_rate_day,

       if(
               last_7.special_column = 0,
               current.special_column,
               (current.special_column - last_7.special_column) / last_7.special_column
           )                               as special_column_rate_week,

       current.vip_member                  as vip_member_num,
       if(
               last_2.vip_member = 0,
               last_1.vip_member,
               (last_1.vip_member - last_2.vip_member) / last_2.vip_member
           )                               as vip_member_rate_day,

       if(
               last_7.vip_member = 0,
               current.vip_member,
               (current.vip_member - last_7.vip_member) / last_7.vip_member
           )                               as vip_member_rate_week,

       current.activate_user               as activate_user_num,
       if(
               last_2.activate_user = 0,
               last_1.activate_user,
               (last_1.activate_user - last_2.activate_user) / last_2.activate_user
           )                               as activate_user_rate_day,

       if(
               last_7.activate_user = 0,
               current.activate_user,
               (current.activate_user - last_7.activate_user) / last_7.activate_user
           )                               as activate_user_rate_week,

       current.dau                         as dau_num,
       if(
               last_2.dau = 0,
               last_1.dau,
               (last_1.dau - last_2.dau) / last_2.dau
           )                               as dau_rate_day,

       if(
               last_7.dau = 0,
               current.dau,
               (current.dau - last_7.dau) / last_7.dau
           )                               as dau_rate_week,

       current.customer_price              as customer_price_num,
       if(
               last_2.customer_price = 0,
               last_1.customer_price,
               (last_1.customer_price - last_2.customer_price) / last_2.customer_price
           )                               as customer_price_rate_day,

       if(
               last_7.customer_price = 0,
               current.customer_price,
               (current.customer_price - last_7.customer_price) / last_7.customer_price
           )                               as customer_price_rate_week,

       current.amount                      as amount_num,
       if(
               last_2.amount = 0,
               last_1.amount,
               (last_1.amount - last_2.amount) / last_2.amount
           )                               as amount_rate_day,

       if(
               last_7.amount = 0,
               current.amount,
               (current.amount - last_7.amount) / last_7.amount
           )                               as amount_rate_week,

       current.new_customer_remain_day     as new_customer_remain_day_data,
       f1.new_customer_remain_day_avg      as new_customer_remain_day_avg,

       current.new_customer_remain_week    as new_customer_remain_week_data,
       f1.new_customer_remain_week_avg     as new_customer_remain_week_avg,

       current.active_customer_remain_day  as active_customer_remain_day_data,
       f1.active_customer_remain_day_avg   as active_customer_remain_day_avg,

       current.active_customer_remain_week as active_customer_remain_week_data,
       f1.active_customer_remain_week_avg  as active_customer_remain_week_avg,

       current.product_details             as product_details_data,
       f1.product_details_avg              as product_details_avg,

       current.reservation                 as reservation_data,
       f1.reservation_avg                  as reservation_avg,

       current.product_details_rate        as product_details_rate_data,
       f1.product_details_rate_avg         as product_details_rate_avg,

       current.reservation_rate            as reservation_rate_data,
       f1.reservation_rate_avg             as reservation_rate_avg

from (select * from p limit 1) current
         left join p last_1 on datediff(current.date, last_1.date) = 1
         left join p last_2 on datediff(current.date, last_2.date) = 2
         left join p last_7 on datediff(current.date, last_7.date) = 7

         left join (select avg(new_customer_remain_day)     as new_customer_remain_day_avg,
                           avg(new_customer_remain_week)    as new_customer_remain_week_avg,
                           avg(active_customer_remain_day)  as active_customer_remain_day_avg,
                           avg(active_customer_remain_week) as active_customer_remain_week_avg,
                           avg(product_details)             as product_details_avg,
                           avg(reservation)                 as reservation_avg,
                           avg(product_details_rate)        as product_details_rate_avg,
                           avg(reservation_rate)            as reservation_rate_avg
                    from p) f1 on 1 = 1;

周报

use edu_company;

with p as
         (select 当周周一                          as date, -- ifnull语法:如果没有这个指标的挂不允许为nan,让其显示为0方便计算。
                 ifnull(总付费人数, 0)            as 'total_paying',
                 ifnull(付费人数_训练营, 0)         as 'traininug_camp',
                 ifnull(付费人数_专栏, 0)          as 'special_column',
                 ifnull(付费人数_会员, 0)          as 'vip_member',
                 ifnull(激活人数, 0)             as 'activate_user',
                 ifnull(DAU, 0)              as 'dau',
                 ifnull(客单价, 0)              as 'customer_price',
                 ifnull(总付费金额, 0)            as 'amount',
                 ifnull(新用户次日留存率, 0)         as 'new_customer_remain_day',
                 ifnull(新用户7日留存率, 0)         as 'new_customer_remain_week',
                 ifnull(活跃用户次日留存率, 0)        as 'active_customer_remain_day',
                 ifnull(活跃用户7日留存率, 0)        as 'active_customer_remain_week',
                 ifnull(浏览商详人数_训练营, 0)       as 'product_details',
                 ifnull(`预约-付费转化率_训练营`, 0)   as 'reservation',
                 ifnull(`浏览商详-预约转化率_训练营`, 0) as 'product_details_rate',
                 ifnull(`预约-付费转化率_训练营`, 0)   as 'reservation_rate'
          from 周报
          order by 当周周一 desc -- 降序
          limit 30)
select current.date           as date,
       current.total_paying                as total_paying_num,
       if(
               last_2.total_paying = 0,
               last_1.total_paying,
               (last_1.total_paying - last_2.total_paying) / last_2.total_paying
           )                               as total_paying_rate_day,

       if(
               last_7.total_paying = 0,
               current.total_paying,
               (current.total_paying - last_7.total_paying) / last_7.total_paying
           )                               as total_paying_rate_week,

       current.traininug_camp               as training_camp_num,
       if(
               last_2.traininug_camp = 0,
               last_1.traininug_camp,
               (last_1.traininug_camp - last_2.traininug_camp) / last_2.traininug_camp
           )                               as training_camp_rate_day,

       if(
               last_7.traininug_camp = 0,
               current.traininug_camp,
               (current.traininug_camp - last_7.traininug_camp) / last_7.traininug_camp
           )                               as training_camp_rate_week,

       current.special_column              as special_column_num,
       if(
               last_2.special_column = 0,
               last_1.special_column,
               (last_1.special_column - last_2.special_column) / last_2.special_column
           )                               as special_column_rate_day,

       if(
               last_7.special_column = 0,
               current.special_column,
               (current.special_column - last_7.special_column) / last_7.special_column
           )                               as special_column_rate_week,

       current.vip_member                  as vip_member_num,
       if(
               last_2.vip_member = 0,
               last_1.vip_member,
               (last_1.vip_member - last_2.vip_member) / last_2.vip_member
           )                               as vip_member_rate_day,

       if(
               last_7.vip_member = 0,
               current.vip_member,
               (current.vip_member - last_7.vip_member) / last_7.vip_member
           )                               as vip_member_rate_week,

       current.activate_user               as activate_user_num,
       if(
               last_2.activate_user = 0,
               last_1.activate_user,
               (last_1.activate_user - last_2.activate_user) / last_2.activate_user
           )                               as activate_user_rate_day,

       if(
               last_7.activate_user = 0,
               current.activate_user,
               (current.activate_user - last_7.activate_user) / last_7.activate_user
           )                               as activate_user_rate_week,

       current.dau                         as dau_num,
       if(
               last_2.dau = 0,
               last_1.dau,
               (last_1.dau - last_2.dau) / last_2.dau
           )                               as dau_rate_day,

       if(
               last_7.dau = 0,
               current.dau,
               (current.dau - last_7.dau) / last_7.dau
           )                               as dau_rate_week,

       current.customer_price              as customer_price_num,
       if(
               last_2.customer_price = 0,
               last_1.customer_price,
               (last_1.customer_price - last_2.customer_price) / last_2.customer_price
           )                               as customer_price_rate_day,

       if(
               last_7.customer_price = 0,
               current.customer_price,
               (current.customer_price - last_7.customer_price) / last_7.customer_price
           )                               as customer_price_rate_week,

       current.amount                      as amount_num,
       if(
               last_2.amount = 0,
               last_1.amount,
               (last_1.amount - last_2.amount) / last_2.amount
           )                               as amount_rate_day,

       if(
               last_7.amount = 0,
               current.amount,
               (current.amount - last_7.amount) / last_7.amount
           )                               as amount_rate_week,

       current.new_customer_remain_day     as new_customer_remain_day_data,
       f1.new_customer_remain_day_avg      as new_customer_remain_day_avg,

       current.new_customer_remain_week    as new_customer_remain_week_data,
       f1.new_customer_remain_week_avg     as new_customer_remain_week_avg,

       current.active_customer_remain_day  as active_customer_remain_day_data,
       f1.active_customer_remain_day_avg   as active_customer_remain_day_avg,

       current.active_customer_remain_week as active_customer_remain_week_data,
       f1.active_customer_remain_week_avg  as active_customer_remain_week_avg,

       current.product_details             as product_details_data,
       f1.product_details_avg              as product_details_avg,

       current.reservation                 as reservation_data,
       f1.reservation_avg                  as reservation_avg,

       current.product_details_rate        as product_details_rate_data,
       f1.product_details_rate_avg         as product_details_rate_avg,

       current.reservation_rate            as reservation_rate_data,
       f1.reservation_rate_avg             as reservation_rate_avg

from (select * from p limit 1) current
         left join p last_1 on datediff(current.date, last_1.date) = 1
         left join p last_2 on datediff(current.date, last_2.date) = 2
         left join p last_7 on datediff(current.date, last_7.date) = 7

         left join (select avg(new_customer_remain_day)     as new_customer_remain_day_avg,
                           avg(new_customer_remain_week)    as new_customer_remain_week_avg,
                           avg(active_customer_remain_day)  as active_customer_remain_day_avg,
                           avg(active_customer_remain_week) as active_customer_remain_week_avg,
                           avg(product_details)             as product_details_avg,
                           avg(reservation)                 as reservation_avg,
                           avg(product_details_rate)        as product_details_rate_avg,
                           avg(reservation_rate)            as reservation_rate_avg
                    from p) f1 on 1 = 1;

pandas运行SQL

# 读取SQL
sql = {}
with open(os.path.join('sql','daily_board.sql'),'r',encoding = "utf-8")as f:
    sql['daily_board'] = f.read()

if __name__ == '__main__':
    print(sql)

interface.py文件

这个文件起到读取四个SQL文件并输出数据到数据规范表中所示的对应变量中去。

import os
import pandas as pd
from sqlalchemy import create_engine
import json
import datetime
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/edu_company?charset=utf8')

常规的导入包,链接MySQL数据库。

# 读取SQL
sql = {}
with open(
        os.path.join('C:\\' , 'Users' , 'zhang' , 'PycharmProjects' , 'edu_online_project' , '2.数据接口' , '课上代码' , 'sql' ,
                     'daily_board.sql') , 'r' , encoding="utf-8")as f:
    sql['daily_board'] = f.read()
with open(
        os.path.join('C:\\' , 'Users' , 'zhang' , 'PycharmProjects' , 'edu_online_project' , '2.数据接口' , '课上代码' , 'sql' ,
                     'daily_chart.sql') , 'r' , encoding="utf-8")as f:
    sql['daily_chart'] = f.read()
with open(
        os.path.join('C:\\' , 'Users' , 'zhang' , 'PycharmProjects' , 'edu_online_project' , '2.数据接口' , '课上代码' , 'sql' ,
                     'weekly_board.sql') , 'r' , encoding="utf-8")as f:
    sql['weekly_board'] = f.read()
with open(
        os.path.join('C:\\' , 'Users' , 'zhang' , 'PycharmProjects' , 'edu_online_project' , '2.数据接口' , '课上代码' , 'sql' ,
                     'weekly_chart.sql') , 'r' , encoding="utf-8")as f:
    sql['weekly_chart'] = f.read()

读取四个SQL文件,os.path路径的这个问题是经过多次调试后得到的结果,要写出完整的绝对路径才能够不报错’,pycharm里设置当前文件夹为源根也不能解决;并且要加上encoding=’utf-8’。

weekday_transfer = {
    0: '星期一',
    1: '星期二',
    2: '星期三',
    3: '星期四',
    4: '星期五',
    5: '星期六',
    6: '星期日',
}

构造出daily_interface函数中日期的显示方式。