python3数据处理脚本模板
# python3数据处理脚本模板
本文给出一些数据处理脚本的模板。平时在我们上线新的功能时,可能会涉及到处理历史数据,那么一般情况下,建议使用python脚本来处理,原因是linux服务器默认内置python环境,不需要为处理脚本额外搭建环境,另外运维人员也比较熟悉python脚本,所以运维人员也可以把控到风险。向运维提交的数据处理脚本,优先考虑sql语句,其次考虑python脚本。
# 1. 示例1: 生成sql插入语句
本示例实现的数据处理需求:将已关注公众号、且没有入库的用户,保存到数据库中(实际上是生成insert语句)。
本示例用到的知识点:
- 数据库读取
借助pymysql。 - 请求接口
借助requests,可以自动将接口返回的数据转换为json。 - snowflake
生成snowflake ID。 - 日志打印
封装了一个通用的日志记录器。 - 读取命令行参数
- 变量值注入到字符串
借助python3的字符串格式化语法:f''
。
该示例之所以生成了sql语句,而没有直接插入到数据库中。原因有3点:
- 给sql的方式沟通更简单高效,也利于一旦出错时的sql回滚
- 运行该脚本,还需要启动一个snowflake服务,运维还得多一步操作
# This is a sample Python script.
# Press ⌃R to execute it or replace it with your code.
# Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
import datetime
import logging
import sys
import requests
import pymysql
import snowflake.client
# 全局变量
logger: logging.Logger = None
mysql_connect: pymysql.connections.Connection = None
mysql_host = None
mysql_user = None
mysql_pwd = None
mysql_port = None
mysql_db = None
# 初始化日志记录器
def init_logger():
global logger
logger = logging.getLogger("data_process")
logger.setLevel(logging.DEBUG)
# 建立一个filehandler来把日志记录在文件里,级别为debug以上
fh = logging.FileHandler("data_process.log")
fh.setLevel(logging.DEBUG)
# 建立一个streamhandler来把日志打在CMD窗口上,级别为debug以上
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# 设置日志格式
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# 将相应的handler添加在logger对象中
logger.addHandler(ch)
logger.addHandler(fh)
# 初始化mysql连接
def init_mysql_connect():
global mysql_host, mysql_user, mysql_pwd, mysql_port, mysql_db, mysql_connect
mysql_connect = pymysql.connect(host=mysql_host, user=mysql_user, password=mysql_pwd, port=mysql_port,
database=mysql_db)
# 请求微信服务号的关注用户的openid列表
def request_fuwuhao_openids(access_token):
api = f'https://api.weixin.qq.com/cgi-bin/user/get?access_token={access_token}'
wx_openids = requests.get(api).json()
logger.info("请求微信服务号的关注用户的openid列表, 总数:%s", wx_openids['total'])
return wx_openids["data"]["openid"]
# 请求服务号的用户列表
def request_new_fuwuhao_users():
global mysql_connect, access_token
weixin_users = []
openids = request_fuwuhao_openids(access_token)
index = 0
for openid in openids:
logger.info("请求服务号的用户列表, 当前进度:%s, 总数: %s", index, len(openids))
index = index + 1
# 若当前openid在数据库中不存在,则补充到weixin_users数组
if get_fuwuhao_user_from_db(openid) is None:
logger.info("请求服务号的用户列表, 当前openid在数据库中不存在: %s", openid)
weixin_user = request_weixin_user(openid)
weixin_users.append(weixin_user)
else:
logging.info("请求服务号的用户列表, 当前openid在数据库中已存在: %s", openid)
logger.info("请求服务号的用户列表, 完成, total: %s, count_will_process:%s", len(openids), len(weixin_users))
return weixin_users
# 请求某微信用户的信息
def request_weixin_user(openid):
global access_token
api = f'https://api.weixin.qq.com/cgi-bin/user/info?lang=zh_CN&access_token={access_token}&openid={openid}'
wx_user = requests.get(api).json()
logger.info("请求某微信用户的信息: %s", wx_user)
return wx_user
# 从数据库中查询某服务号用户
def get_fuwuhao_user_from_db(openid):
global mysql_connect
cursor = mysql_connect.cursor()
sql = f"select 1 from fuwuhao_users where openid = '{openid}'"
cursor.execute(sql)
return cursor.fetchone()
# 生成插入sql
def generate_insert_sql():
global mysql_connect, access_token
# 请求服务号关注用户列表
wx_users = request_new_fuwuhao_users()
# 生成sql
sql = "INSERT INTO fuwuhao_users (id, is_subscribe, openid, lang, subscribe_time, unionid, remark, groupid, tagids, subscribe_scene, qr_scene, qr_scene_str, created_at, updated_at, deleted_at) VALUES "
cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
index = 0
for elem in wx_users:
sql = sql + f"({generate_snowflake_id()}, '{elem['subscribe']}', '{elem['openid']}', '{elem['language']}', {elem['subscribe_time']}, '{elem['unionid']}', '{elem['remark']}', {elem['groupid']}, '{elem['tagid_list']}', '{elem['subscribe_scene']}', {elem['qr_scene']}, '{elem['qr_scene_str']}', '{cur_time}', '{cur_time}', null)"
if index != len(wx_users) - 1:
sql = sql + ","
index = index + 1
return sql
# 生成一个snowflake id
# python3生成snowflake的步骤:
# 1. pip3 install pysnowflake
# 2. 运行snowflake_start_server
# 3. 代码中调用snowflake.client.get_guid()获取一个snowflake ID
def generate_snowflake_id():
# 配置snowflake服务的主机配置,若不配置, 则默认使用localhost:8910
snowflake_server_host = 'localhost'
snowflake_server_port = 8910
snowflake.client.setup(snowflake_server_host, snowflake_server_port)
return snowflake.client.get_guid()
# Press the green button in the gutter to run the script.
if __name__ == '__main__':
# 从命令行读取参数
# 程序运行示例:python3 ./main.py weixin_api_access_token database_1 127.0.0.1 root 123456 3306
cmd_args = sys.argv
access_token = cmd_args[1]
mysql_db = cmd_args[2]
mysql_host = cmd_args[3]
mysql_user = cmd_args[4]
mysql_pwd = cmd_args[5]
mysql_port = int(cmd_args[6])
# 初始化
init_logger()
init_mysql_connect()
# 生成数据库表插入语句
sql = generate_insert_sql()
logger.info("main, 生成sql语句完成: %s", sql)
上次更新: 2022-01-06 19:43:12