1、导入包和配置

import datetime
import pymysql
import xlwt

# Directory that is scanned for log files.
LOG_DIR = './'
# Log file name to look for. Matching is fuzzy: configuring log.log also
# picks up files such as log.log20220304.
LOG_NAME = 'mesg.log'
# Only log files whose last-modification time is at most this many days old
# are processed.
INTERVAL = 30
# Name of the MySQL table the error records are inserted into.
MYSQL_TABLE = 'error_log_info_test'
# Output path of the Excel sheet; an empty string means "do not export".
EXPORT_PATH = './test.xls'
# Value written to the system-name field of every record.
SYSTEM_NAME = '租用环境'
# Keyword that marks a log line as an error.
KEYWORD = "ERROR"
# Additional keyword a line must also contain to be collected.
KEYWORD_FILTER = "mms"

2、SQL 脚本

-- Table that receives the collected error-log records. The script inserts
-- line_num, api_name, error_info, error_dir, system_name and date_time;
-- id is generated by AUTO_INCREMENT.
CREATE TABLE `error_log_info_test`  (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `line_num` int(11) NULL DEFAULT NULL,
  `api_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `error_info` varchar(15000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `error_dir` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `system_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  `date_time` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 9617 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

3、完整代码

import datetime
import pymysql
import xlwt

# Directory that is scanned for log files.
LOG_DIR = './'
# Log file name to look for. Matching is fuzzy: configuring log.log also
# picks up files such as log.log20220304.
LOG_NAME = 'mesg.log'
# Only log files whose last-modification time is at most this many days old
# are processed.
INTERVAL = 30

# LOG_LIST = ['./mesg.log']
# Name of the MySQL table the error records are inserted into.
MYSQL_TABLE = 'error_log_info_test'
# Output path of the Excel sheet; an empty string means "do not export".
EXPORT_PATH = './test.xls'
# Value written to the system-name field of every record.
SYSTEM_NAME = '租用环境'
# Keyword that marks a log line as an error.
KEYWORD = "ERROR"
# Additional keyword a line must also contain to be collected.
KEYWORD_FILTER = "mms"


class LogExport(object):
    """Collect error entries from one log file and export them.

    The entries can be written to an Excel sheet (``export_excel``) and/or
    inserted into MySQL (``insert_mysql``).
    """

    # Number of records sent to MySQL per batch.
    _BATCH_SIZE = 100
    # Number of characters following a matching line that are kept as context.
    _CONTEXT_CHARS = 2000

    def __init__(self, log_dir, export_path, log_name):
        """Store the settings for one log file.

        Args:
            log_dir: Path of the log file to read.
            export_path: Path of the Excel file to write; empty/blank/None
                disables the export.
            log_name: Stored on the instance; not used by this class.
        """
        self.log_dir = log_dir
        self.export_path = export_path
        self.log_name = log_name
        self.keyword = KEYWORD

    @staticmethod
    def _flush_to_mysql(rows):
        """Insert ``rows`` through a fresh MsqlLink and always close it."""
        db = MsqlLink(rows)
        try:
            db.handle_sql()
        finally:
            # Every batch opens its own connection, so release it here to
            # avoid leaking one connection per batch.
            db.close_sql()

    @classmethod
    def read_log(cls, dirs, keyword, type=None):
        """Scan a log file and collect every line containing the keywords.

        A line is collected when it contains both ``keyword`` and
        ``KEYWORD_FILTER``. Each record is the list
        ``[line_num, api_name, error_info, dirs, SYSTEM_NAME, date_time]``
        where ``error_info`` is the line from ``keyword`` onwards followed by
        up to 2000 characters of the text after the line.

        Args:
            dirs: Path of the log file.
            keyword: Keyword that marks an error line.
            type: When ``1``, records are inserted into MySQL in batches of
                100 while reading; otherwise nothing is inserted.

        Returns:
            The collected records. With ``type == 1`` only the records of the
            last (possibly partial) batch are returned, because the list is
            reset after every full batch.
        """
        result_list = []
        pending = 0
        with open(dirs, mode="r", encoding="utf-8", errors="ignore") as f:
            # Use readline() instead of iterating the file object: iteration
            # disables tell(), and tell()/seek() are needed to peek at the
            # following text without consuming it.
            for line_num, line in enumerate(iter(f.readline, ""), start=1):
                if keyword not in line or KEYWORD_FILTER not in line:
                    continue
                api_name = line[40:60]
                p = line.find(keyword)
                # Drop the trailing newline only when there is one, so the
                # last line of a file without a final newline stays intact.
                head = line[p:-1] if line.endswith("\n") else line[p:]
                # Peek at the following text, then rewind so that those lines
                # are still scanned and the line counter stays correct.
                resume_at = f.tell()
                context = f.read(cls._CONTEXT_CHARS)
                f.seek(resume_at)
                # The 23 characters before the keyword (skipping the one
                # separator); clamp so a keyword near the line start cannot
                # wrap around to the end of the line.
                data_time = line[max(p - 24, 0):max(p - 1, 0)]
                result_list.append(
                    [line_num, api_name, head + context, dirs, SYSTEM_NAME, data_time])
                pending += 1
                if pending == cls._BATCH_SIZE and type == 1:
                    cls._flush_to_mysql(result_list)
                    pending = 0
                    result_list = []
            if pending != 0 and type == 1:
                cls._flush_to_mysql(result_list)
                print("全部提交成功!")

        return result_list

    def export_excel(self):
        """Write all collected records to ``self.export_path`` as an .xls sheet.

        Does nothing when the export path is empty, blank or None. One record
        is written per row, one field per column.
        """
        if not self.export_path or not str(self.export_path).strip():
            return
        new_book = xlwt.Workbook(encoding='utf-8', style_compression=0)
        sheet1 = new_book.add_sheet("sheet1", cell_overwrite_ok=True)
        records = self.read_log(self.log_dir, self.keyword)
        for row_idx, record in enumerate(records):
            for col_idx, cell in enumerate(record):
                sheet1.write(row_idx, col_idx, cell)
        new_book.save(self.export_path)

    def insert_mysql(self):
        """Read the log file and insert the collected records into MySQL."""
        self.read_log(self.log_dir, self.keyword, type=1)


class MsqlLink(object):
    """Wrapper around one pymysql connection used to store log records."""

    def __init__(self, sql_list: list):
        """Open the database connection.

        Args:
            sql_list: Records to insert; each record is indexed 0..5 in the
                column order used by ``handle_sql``.
        """
        self.sql_list = sql_list
        # NOTE(review): host and credentials are hard-coded in the source;
        # consider loading them from configuration or the environment.
        self.db = pymysql.connect(host='47.99.163.204', user='abc', port=8112,
                                  password='abc123.', database="test_01",
                                  charset='utf8mb4')

    def insert_data(self, sql):
        """Execute a single SQL statement and commit it (best effort).

        Database errors are printed, not raised. Only ``Exception`` is
        caught so that KeyboardInterrupt/SystemExit still stop the program.
        """
        try:
            with self.db.cursor() as cursor:
                msg = cursor.execute(sql)
            print("执行结果:", msg, sql)
            self.db.commit()
        except Exception as er:
            print("执行错误 0:", sql, er)

    def close_sql(self):
        """Close the database connection."""
        self.db.close()
        print("连接已经关闭")

    def handle_sql(self):
        """Insert all records of ``self.sql_list`` in one batch and commit.

        Returns:
            True when the batch was committed.

        Raises:
            Exception: Any error raised while inserting or committing; the
                transaction is rolled back before the error propagates.
        """
        sql = f"INSERT INTO {MYSQL_TABLE}(line_num,api_name,error_info,error_dir,system_name,date_time)   VALUES (%s, %s,%s, %s,%s, %s)"
        rows = [
            (value[0], value[1], value[2],
             str(value[3]).encode('utf-8', errors='ignore'), value[4], value[5])
            for value in self.sql_list
        ]
        try:
            with self.db.cursor() as cursor:
                cursor.executemany(sql, rows)
            self.db.commit()
        except Exception:
            # Do not leave a half-applied batch in the open transaction.
            self.db.rollback()
            raise
        return True


if __name__ == '__main__':
    # Entry point: collect errors from every recent log file in LOG_DIR whose
    # name matches LOG_NAME, export them to Excel and insert them into MySQL.
    import os
    import re

    print("正在启动采集错误日志程序......")
    for file_name in os.listdir(LOG_DIR):
        file_path = os.path.join(LOG_DIR, file_name)
        if not (os.path.isfile(file_path) and re.search(LOG_NAME, str(file_name))):
            continue
        # Age of the file in whole days, based on its last-modification time.
        times = os.path.getmtime(file_path)
        update_time = datetime.datetime.fromtimestamp(times)
        print(update_time)
        system_time = datetime.datetime.now()
        interval = (system_time - update_time).days
        if interval > INTERVAL:
            continue
        print(f"正在对{file_path}日志进行采集......")
        # Export to the configured EXPORT_PATH. Passing the log's own file
        # name here would make the workbook overwrite the log (when LOG_DIR
        # is the working directory) before insert_mysql() reads it.
        # NOTE: every processed log rewrites the same workbook, so with
        # several matching logs the sheet holds the last one's records.
        log = LogExport(log_dir=file_path, log_name=None, export_path=EXPORT_PATH)
        log.export_excel()
        log.insert_mysql()
        print(f"{file_path}日志进行采集提交成功!")

4、实现效果

在这里插入图片描述