# -*- coding: utf-8 -*-
import logging
import sys
import inspect
from functools import wraps
from datetime import datetime
# Python 2 compatibility: fall back to the `ordereddict` backport when collections has no OrderedDict
try :
from collections import OrderedDict
except ImportError :
from ordereddict import OrderedDict
class OptimizedFormatter(logging.Formatter):
    """Formatter that appends structured call arguments to the log message.

    On top of the standard ``logging.Formatter`` behaviour it:

    * appends ``record.positional_args`` and ``record.extra_kwargs`` (when the
      record carries them) to the message, separated by ``" ⎸ "``;
    * renders ``%(asctime)s`` as ``HH:MM:SS`` when no ``datefmt`` was given;
    * returns a ``FORMAT_ERROR`` line from :meth:`format` instead of raising.

    The implementation avoids Python 2-only names (``unicode``,
    ``dict.iteritems``, ``str.decode``) so it behaves the same on Python 2
    and Python 3.
    """

    # Text and byte-string types of the running interpreter
    # (unicode/str on Python 2, str/bytes on Python 3).
    _TEXT_TYPE = type(u"")
    _BYTES_TYPE = type(b"")

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``logging.Formatter``."""
        logging.Formatter.__init__(self, *args, **kwargs)
        # ids of frames that _find_valid_frame must skip.  It starts empty:
        # the id of __init__'s own frame is meaningless once __init__ has
        # returned, because the interpreter may hand it to a later frame.
        self._frame_blacklist = set()

    def format(self, record):
        """Return the formatted text for *record*.

        The record's ``msg`` and ``args`` are restored before returning, so
        formatting the same record again (e.g. from a second handler) does
        not append the extra arguments twice.
        """
        original_msg = record.msg
        original_args = record.args
        try:
            # Fill in caller information if the record lacks it.
            self._inject_frame_info(record)
            args_str = self._format_args(record)
            if args_str:
                # Resolve %-style args first so a literal '%' inside the
                # appended extras is never treated as a format directive.
                base = record.getMessage() if original_args else original_msg
                record.msg = u"%s ⎸ %s" % (self._safe_str(base), args_str)
                record.args = None
            return logging.Formatter.format(self, record)
        except Exception as e:
            return u"⚠️ FORMAT_ERROR | %s | Raw: %s" % (
                self._safe_str(e), self._safe_str(original_msg))
        finally:
            record.msg = original_msg
            record.args = original_args

    def formatTime(self, record, datefmt=None):
        """Render the record time; ``HH:MM:SS`` unless *datefmt* is given.

        ``logging.Formatter.format`` assigns ``record.asctime`` from this
        method, so the custom time format has to be applied here.
        """
        if datefmt:
            return logging.Formatter.formatTime(self, record, datefmt)
        return self._format_time(record)

    def _inject_frame_info(self, record):
        """Fill ``module``/``lineno``/``funcName`` when the record lacks any."""
        if all(hasattr(record, a) for a in ('module', 'lineno', 'funcName')):
            return
        frame = self._find_valid_frame()
        if frame is not None:
            record.module = self._get_module_name(frame)
            record.lineno = frame.f_lineno
            record.funcName = frame.f_code.co_name

    def _find_valid_frame(self):
        """Return the nearest calling frame outside this module and logging.

        Walks outwards from the current frame.  Falls back to the current
        frame when every frame on the stack is skipped.
        """
        frame = inspect.currentframe()
        while frame is not None:
            if (id(frame) not in self._frame_blacklist
                    and not self._is_logging_frame(frame)):
                return frame
            frame = frame.f_back
        return inspect.currentframe()

    def _is_logging_frame(self, frame):
        """Return True if *frame* runs code of this module or of ``logging``."""
        module = inspect.getmodule(frame)
        if module is None:
            return False
        name = module.__name__
        return (name == __name__
                or name == 'logging'
                or name.startswith('logging.'))

    def _get_module_name(self, frame):
        """Return the module name of *frame*, or ``'unknown'``."""
        try:
            module = inspect.getmodule(frame)
            return module.__name__ if module else 'unknown'
        except Exception:
            return 'unknown'

    def _format_args(self, record):
        """Render the record's extra positional/keyword arguments as text.

        Returns an empty string when the record carries neither.
        """
        parts = []
        positional = getattr(record, 'positional_args', None)
        if positional:
            parts.extend(self._safe_repr(a) for a in positional)
        keyword = getattr(record, 'extra_kwargs', None)
        if keyword:
            parts.extend(u"%s=%s" % (k, self._safe_repr(v))
                         for k, v in keyword.items())
        return u" ⎸ ".join(parts) if parts else u""

    def _safe_repr(self, value):
        """Render *value* compactly, recursing into dicts, lists and tuples.

        Returns ``<UNREPRESENTABLE>`` if rendering fails.
        """
        try:
            if isinstance(value, dict):
                # .items() exists on both Python 2 and 3 (iteritems does not).
                return u"{%s}" % u",".join(
                    u"%s:%s" % (self._safe_repr(k), self._safe_repr(v))
                    for k, v in value.items())
            if isinstance(value, (list, tuple)):
                brackets = u"[]" if isinstance(value, list) else u"()"
                return u"%s%s%s" % (
                    brackets[0],
                    u",".join(self._safe_repr(x) for x in value),
                    brackets[1])
            return self._safe_str(value)
        except Exception:
            return u"<UNREPRESENTABLE>"

    def _safe_str(self, obj):
        """Convert *obj* to text without raising.

        Byte strings are decoded as UTF-8, falling back to Latin-1; any other
        object goes through ``str()``.  Returns ``<UNCONVERTIBLE>`` if the
        conversion fails.
        """
        try:
            if isinstance(obj, self._TEXT_TYPE):
                return obj
            if isinstance(obj, self._BYTES_TYPE):
                try:
                    return obj.decode('utf-8')
                except UnicodeDecodeError:
                    return obj.decode('latin-1', 'replace')
            text = str(obj)
            if isinstance(text, self._BYTES_TYPE):
                # Python 2: str() yields bytes, which still need decoding.
                return text.decode('utf-8', 'replace')
            return text
        except Exception:
            return u"<UNCONVERTIBLE>"

    def _format_time(self, record):
        """Return the record's creation time as local ``HH:MM:SS``."""
        return datetime.fromtimestamp(record.created).strftime('%H:%M:%S')
class SimpleLogger(object):
    """Class-level logging facade that records the caller's location.

    Each call is routed to the ``logging`` logger named after the calling
    module.  Extra positional and keyword arguments are attached to the
    record as ``positional_args`` and ``extra_kwargs``.
    """

    # Set once the root logger has been configured by _ensure_init.
    _initialized = False

    @classmethod
    def _ensure_init(cls):
        """Configure the root logger on first use.

        The root level is set to DEBUG; a stream handler using
        ``OptimizedFormatter`` is added only when the root logger has no
        handlers yet.
        """
        if cls._initialized:
            return
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        if not root.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(OptimizedFormatter(
                fmt=u'[%(asctime)s] [%(levelname)s] %(module)s:%(lineno)d ➤ %(funcName)s | %(message)s',
                datefmt=None  # None selects the formatter's own time format
            ))
            root.addHandler(handler)
        cls._initialized = True

    @classmethod
    def _log(cls, level, msg, *args, **kwargs):
        """Build and dispatch a record on behalf of the public helpers.

        Must be called through one of ``debug``/``info``/``warning``/``error``:
        the caller is taken to be two frames up the stack.  Failures while
        logging are reported on stderr rather than raised.
        """
        cls._ensure_init()
        frame = None
        try:
            # _log <- debug/info/warning/error <- actual caller
            frame = inspect.currentframe().f_back.f_back
            caller_module = inspect.getmodule(frame)
            module_name = caller_module.__name__ if caller_module else 'unknown'
            logger = logging.getLogger(module_name)
            # Logger.handle() does not check levels, so honour them here.
            if not logger.isEnabledFor(level):
                return
            if not logger.handlers:
                logger.propagate = True
            code = frame.f_code
            # Passing the real file name and line number lets LogRecord
            # derive pathname/filename/module instead of "Unknown module".
            record = logger.makeRecord(
                name=logger.name,
                level=level,
                fn=code.co_filename,
                lno=frame.f_lineno,
                msg=msg,
                args=(),
                exc_info=None,
                func=code.co_name,
                extra={
                    'positional_args': args,
                    'extra_kwargs': kwargs
                }
            )
            logger.handle(record)
        except Exception as e:
            sys.stderr.write(u"LOG_ERROR: %s\n" % e)
        finally:
            # Drop the frame reference promptly to avoid reference cycles.
            del frame

    @classmethod
    def debug(cls, msg, *args, **kwargs):
        """Log *msg* at DEBUG level with optional extra arguments."""
        cls._log(logging.DEBUG, msg, *args, **kwargs)

    @classmethod
    def info(cls, msg, *args, **kwargs):
        """Log *msg* at INFO level with optional extra arguments."""
        cls._log(logging.INFO, msg, *args, **kwargs)

    @classmethod
    def warning(cls, msg, *args, **kwargs):
        """Log *msg* at WARNING level with optional extra arguments."""
        cls._log(logging.WARNING, msg, *args, **kwargs)

    @classmethod
    def error(cls, msg, *args, **kwargs):
        """Log *msg* at ERROR level with optional extra arguments."""
        cls._log(logging.ERROR, msg, *args, **kwargs)
if __name__ == "__main__":
    # Demo: Chinese-language message with a positional and a keyword argument.
    SimpleLogger.info(u"用户登录", u"张三", ip="192.168.1.100")
    # Demo: mixed arguments, including an ordered mapping.
    SimpleLogger.error("配置错误", "database", code=500, detail=OrderedDict([("line", 42), ("file", "app.conf")]))
    # Demo: raw byte input.  These three bytes are the UTF-8 encoding of the
    # character "无"; the b prefix keeps them a byte string on Python 2 and 3.
    bad_data = b'\xe6\x97\xa0'
    SimpleLogger.warning(b"invalid data", bad_data, sector=[0x12, 0xff, 0x7f])
    # Demo: nested container arguments.
    SimpleLogger.debug("调试信息", {"key": [1, 2, 3]}, timeout=30.5)
