# -*- coding: utf-8 -*-
import logging
import sys
import inspect
from functools import wraps
from datetime import datetime
# Python 2 compatibility: prefer the stdlib OrderedDict and fall back to the
# standalone ``ordereddict`` package when ``collections`` does not provide it.
try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict
class OptimizedFormatter(logging.Formatter):
    """Formatter that appends a record's extra arguments to its message.

    A record may carry two optional attributes: ``positional_args`` (a
    sequence) and ``extra_kwargs`` (a mapping). Their rendered values are
    appended to the message, separated by `` ⎸ ``. When no ``datefmt`` is
    configured, ``%(asctime)s`` is rendered as ``HH:MM:SS``.

    The class works on both Python 2 and Python 3.
    """

    # Text type of the running interpreter: ``unicode`` on Python 2,
    # ``str`` on Python 3.
    _TEXT_TYPE = type(u"")

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``logging.Formatter.__init__``."""
        logging.Formatter.__init__(self, *args, **kwargs)
        # Ids of frames that _find_valid_frame() must skip. It starts empty:
        # the id() of a frame is only meaningful while that frame is alive,
        # so recording the id of this (soon dead) __init__ frame could
        # later match an unrelated frame that reuses the same address.
        self._frame_blacklist = set()

    def format(self, record):
        """Return the formatted text for *record*.

        The record's ``msg`` and ``args`` are replaced only for the duration
        of the call and restored afterwards, so formatting the same record
        again (for example from a second handler) gives the same result.
        Any exception raised while formatting is reported in the returned
        string instead of being propagated.
        """
        original_msg, original_args = record.msg, record.args
        try:
            # Fill in caller information when the record lacks it.
            self._inject_frame_info(record)

            # Interpolate %-style args first so that '%' characters inside
            # the appended argument text can never be treated as directives.
            message = self._safe_str(
                record.getMessage() if record.args else record.msg)
            args_str = self._format_args(record)
            if args_str:
                message = u"%s ⎸ %s" % (message, args_str)
            record.msg = message
            record.args = None

            # Always expose asctime, even when the format string omits it.
            record.asctime = self.formatTime(record, self.datefmt)

            return logging.Formatter.format(self, record)
        except Exception as e:
            return u"⚠️ FORMAT_ERROR | %s | Raw: %s" % (
                self._safe_str(e), self._safe_str(original_msg))
        finally:
            record.msg, record.args = original_msg, original_args

    def formatTime(self, record, datefmt=None):
        """Render the record time; ``HH:MM:SS`` when no *datefmt* is given.

        The base ``format`` recomputes ``asctime`` through this hook, so the
        custom time format has to live here to take effect.
        """
        if datefmt:
            return logging.Formatter.formatTime(self, record, datefmt)
        return self._format_time(record)

    def _inject_frame_info(self, record):
        """Set ``module``/``lineno``/``funcName`` if *record* lacks any."""
        required = ('module', 'lineno', 'funcName')
        if all(hasattr(record, name) for name in required):
            return
        frame = self._find_valid_frame()
        if frame:
            record.module = self._get_module_name(frame)
            record.lineno = frame.f_lineno
            record.funcName = frame.f_code.co_name

    def _find_valid_frame(self):
        """Return the outermost stack frame that is not excluded.

        Frames whose id is in ``_frame_blacklist`` and frames belonging to
        this module are skipped. Falls back to the current frame.
        """
        # Walk f_back links instead of inspect.stack(): same frames, but no
        # source-file lookups for every stack level.
        frames = []
        frame = inspect.currentframe()
        while frame is not None:
            frames.append(frame)
            frame = frame.f_back
        # NOTE(review): the search starts at the outermost frame, so it
        # yields the program entry point rather than the nearest caller;
        # confirm that this is the intended frame.
        for candidate in reversed(frames):
            if id(candidate) in self._frame_blacklist:
                continue
            if self._is_logging_frame(candidate):
                continue
            return candidate
        return inspect.currentframe()

    def _is_logging_frame(self, frame):
        """Return True when *frame* executes code from this module."""
        module = inspect.getmodule(frame)
        return module is not None and module.__name__ == __name__

    def _get_module_name(self, frame):
        """Return the module name of *frame*, or ``'unknown'``."""
        try:
            module = inspect.getmodule(frame)
            return module.__name__ if module else 'unknown'
        except Exception:
            return 'unknown'

    def _format_args(self, record):
        """Render the record's extra arguments as one text string.

        Returns an empty string when the record carries no arguments.
        """
        parts = []
        positional = getattr(record, 'positional_args', None)
        if positional:
            parts.extend(self._safe_repr(a) for a in positional)
        keywords = getattr(record, 'extra_kwargs', None)
        if keywords:
            parts.extend(
                u"%s=%s" % (self._safe_str(k), self._safe_repr(v))
                for k, v in keywords.items())
        return u" ⎸ ".join(parts) if parts else u""

    def _safe_repr(self, value):
        """Render *value* as text, recursing into dicts, lists and tuples.

        Never raises: returns ``<UNREPRESENTABLE>`` on failure.
        """
        try:
            if isinstance(value, dict):
                # items() exists on Python 2 and 3; iteritems() only on 2.
                body = u",".join(
                    u"%s:%s" % (self._safe_repr(k), self._safe_repr(v))
                    for k, v in value.items())
                return u"{%s}" % body
            if isinstance(value, (list, tuple)):
                if isinstance(value, list):
                    opening, closing = u"[", u"]"
                else:
                    opening, closing = u"(", u")"
                body = u",".join(self._safe_repr(x) for x in value)
                return u"%s%s%s" % (opening, body, closing)
            return self._safe_str(value)
        except Exception:
            return u"<UNREPRESENTABLE>"

    def _safe_str(self, obj):
        """Convert *obj* to text without raising.

        Byte strings are decoded as UTF-8, falling back to latin-1. Other
        objects go through ``str()``. Returns ``<UNCONVERTIBLE>`` on failure.
        """
        try:
            if isinstance(obj, self._TEXT_TYPE):
                return obj
            if isinstance(obj, bytes):
                try:
                    return obj.decode('utf-8')
                except UnicodeDecodeError:
                    return obj.decode('latin-1', 'replace')
            text = str(obj)
            if isinstance(text, bytes):
                # Python 2: str() produced a byte string.
                text = text.decode('utf-8', 'replace')
            return text
        except Exception:
            return u"<UNCONVERTIBLE>"

    def _format_time(self, record):
        """Return the record's creation time as local ``HH:MM:SS``."""
        return datetime.fromtimestamp(record.created).strftime('%H:%M:%S')
class SimpleLogger(object):
    """Class-level logging facade: ``SimpleLogger.info(msg, *args, **kwargs)``.

    Extra positional and keyword arguments are attached to the log record as
    ``positional_args`` and ``extra_kwargs``. The record is emitted through
    the logger named after the calling module.
    """

    # Set once the root logger has been configured by _ensure_init().
    _initialized = False

    @classmethod
    def _ensure_init(cls):
        """Configure the root logger on first use.

        Sets the root level to DEBUG and, only if the root logger has no
        handlers yet, installs a stream handler using OptimizedFormatter.
        """
        if cls._initialized:
            return
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        if not root.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(OptimizedFormatter(
                fmt=u'[%(asctime)s] [%(levelname)s] %(module)s:%(lineno)d ➤ %(funcName)s | %(message)s',
                datefmt=None  # fall back to the formatter's own time format
            ))
            root.addHandler(handler)
        cls._initialized = True

    @classmethod
    def _log(cls, level, msg, *args, **kwargs):
        """Build and dispatch a record on behalf of the public helpers.

        Must be called through exactly one wrapper (``debug``, ``info``, ...)
        because the caller is located two frames up the stack. Failures are
        written to stderr and never raised to the caller.
        """
        cls._ensure_init()
        frame = None
        try:
            # Two frames up: _log -> public wrapper -> user code.
            frame = inspect.currentframe().f_back.f_back
            caller_module = inspect.getmodule(frame)
            logger = logging.getLogger(
                caller_module.__name__ if caller_module else 'unknown')

            # Honour the configured level, as the stdlib logging calls do.
            if not logger.isEnabledFor(level):
                return
            if not logger.handlers:
                logger.propagate = True

            # Pass the caller's file and line so that pathname, filename,
            # module and lineno on the record describe the real call site.
            record = logger.makeRecord(
                name=logger.name,
                level=level,
                fn=frame.f_code.co_filename,
                lno=frame.f_lineno,
                msg=msg,
                args=(),
                exc_info=None,
                extra={
                    'positional_args': args,
                    'extra_kwargs': kwargs
                },
                func=frame.f_code.co_name
            )

            logger.handle(record)
        except Exception as e:
            sys.stderr.write(u"LOG_ERROR: %s\n" % e)
        finally:
            # Drop the frame reference so the caller's frame is not kept
            # alive through a reference cycle.
            del frame

    @classmethod
    def debug(cls, msg, *args, **kwargs):
        """Log *msg* at DEBUG level."""
        cls._log(logging.DEBUG, msg, *args, **kwargs)

    @classmethod
    def info(cls, msg, *args, **kwargs):
        """Log *msg* at INFO level."""
        cls._log(logging.INFO, msg, *args, **kwargs)

    @classmethod
    def warning(cls, msg, *args, **kwargs):
        """Log *msg* at WARNING level."""
        cls._log(logging.WARNING, msg, *args, **kwargs)

    @classmethod
    def error(cls, msg, *args, **kwargs):
        """Log *msg* at ERROR level."""
        cls._log(logging.ERROR, msg, *args, **kwargs)
if __name__ == "__main__":
    # Smoke test: non-ASCII (Chinese) message and positional argument.
    SimpleLogger.info(u"用户登录", u"张三", ip="192.168.1.100")

    # Mixed positional and keyword arguments, including an ordered mapping.
    SimpleLogger.error("配置错误", "database", code=500, detail=OrderedDict([("line", 42), ("file", "app.conf")]))

    # Raw byte strings. These three bytes are the UTF-8 encoding of U+65E0;
    # the b prefix keeps them bytes on Python 3 and is a no-op on Python 2.
    bad_data = b'\xe6\x97\xa0'
    SimpleLogger.warning(b"invalid data", bad_data, sector=[0x12, 0xff, 0x7f])

    # Nested containers as arguments.
    SimpleLogger.debug("调试信息", {"key": [1, 2, 3]}, timeout=30.5)
# -*- coding: utf-8 -*-
import logging
import sys
import inspect
from functools import wraps
from datetime import datetime

# Python 2兼容性处理
try:
    from collections import OrderedDict
except ImportError:
    from ordereddict import OrderedDict


class OptimizedFormatter(logging.Formatter):
    """Formatter that appends a record's extra arguments to its message.

    A record may carry two optional attributes: ``positional_args`` (a
    sequence) and ``extra_kwargs`` (a mapping). Their rendered values are
    appended to the message, separated by `` ⎸ ``. When no ``datefmt`` is
    configured, ``%(asctime)s`` is rendered as ``HH:MM:SS``.

    The class works on both Python 2 and Python 3.
    """

    # Text type of the running interpreter: ``unicode`` on Python 2,
    # ``str`` on Python 3.
    _TEXT_TYPE = type(u"")

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``logging.Formatter.__init__``."""
        logging.Formatter.__init__(self, *args, **kwargs)
        # Ids of frames that _find_valid_frame() must skip. It starts empty:
        # the id() of a frame is only meaningful while that frame is alive,
        # so recording the id of this (soon dead) __init__ frame could
        # later match an unrelated frame that reuses the same address.
        self._frame_blacklist = set()

    def format(self, record):
        """Return the formatted text for *record*.

        The record's ``msg`` and ``args`` are replaced only for the duration
        of the call and restored afterwards, so formatting the same record
        again (for example from a second handler) gives the same result.
        Any exception raised while formatting is reported in the returned
        string instead of being propagated.
        """
        original_msg, original_args = record.msg, record.args
        try:
            # Fill in caller information when the record lacks it.
            self._inject_frame_info(record)

            # Interpolate %-style args first so that '%' characters inside
            # the appended argument text can never be treated as directives.
            message = self._safe_str(
                record.getMessage() if record.args else record.msg)
            args_str = self._format_args(record)
            if args_str:
                message = u"%s ⎸ %s" % (message, args_str)
            record.msg = message
            record.args = None

            # Always expose asctime, even when the format string omits it.
            record.asctime = self.formatTime(record, self.datefmt)

            return logging.Formatter.format(self, record)
        except Exception as e:
            return u"⚠️ FORMAT_ERROR | %s | Raw: %s" % (
                self._safe_str(e), self._safe_str(original_msg))
        finally:
            record.msg, record.args = original_msg, original_args

    def formatTime(self, record, datefmt=None):
        """Render the record time; ``HH:MM:SS`` when no *datefmt* is given.

        The base ``format`` recomputes ``asctime`` through this hook, so the
        custom time format has to live here to take effect.
        """
        if datefmt:
            return logging.Formatter.formatTime(self, record, datefmt)
        return self._format_time(record)

    def _inject_frame_info(self, record):
        """Set ``module``/``lineno``/``funcName`` if *record* lacks any."""
        required = ('module', 'lineno', 'funcName')
        if all(hasattr(record, name) for name in required):
            return
        frame = self._find_valid_frame()
        if frame:
            record.module = self._get_module_name(frame)
            record.lineno = frame.f_lineno
            record.funcName = frame.f_code.co_name

    def _find_valid_frame(self):
        """Return the outermost stack frame that is not excluded.

        Frames whose id is in ``_frame_blacklist`` and frames belonging to
        this module are skipped. Falls back to the current frame.
        """
        # Walk f_back links instead of inspect.stack(): same frames, but no
        # source-file lookups for every stack level.
        frames = []
        frame = inspect.currentframe()
        while frame is not None:
            frames.append(frame)
            frame = frame.f_back
        # NOTE(review): the search starts at the outermost frame, so it
        # yields the program entry point rather than the nearest caller;
        # confirm that this is the intended frame.
        for candidate in reversed(frames):
            if id(candidate) in self._frame_blacklist:
                continue
            if self._is_logging_frame(candidate):
                continue
            return candidate
        return inspect.currentframe()

    def _is_logging_frame(self, frame):
        """Return True when *frame* executes code from this module."""
        module = inspect.getmodule(frame)
        return module is not None and module.__name__ == __name__

    def _get_module_name(self, frame):
        """Return the module name of *frame*, or ``'unknown'``."""
        try:
            module = inspect.getmodule(frame)
            return module.__name__ if module else 'unknown'
        except Exception:
            return 'unknown'

    def _format_args(self, record):
        """Render the record's extra arguments as one text string.

        Returns an empty string when the record carries no arguments.
        """
        parts = []
        positional = getattr(record, 'positional_args', None)
        if positional:
            parts.extend(self._safe_repr(a) for a in positional)
        keywords = getattr(record, 'extra_kwargs', None)
        if keywords:
            parts.extend(
                u"%s=%s" % (self._safe_str(k), self._safe_repr(v))
                for k, v in keywords.items())
        return u" ⎸ ".join(parts) if parts else u""

    def _safe_repr(self, value):
        """Render *value* as text, recursing into dicts, lists and tuples.

        Never raises: returns ``<UNREPRESENTABLE>`` on failure.
        """
        try:
            if isinstance(value, dict):
                # items() exists on Python 2 and 3; iteritems() only on 2.
                body = u",".join(
                    u"%s:%s" % (self._safe_repr(k), self._safe_repr(v))
                    for k, v in value.items())
                return u"{%s}" % body
            if isinstance(value, (list, tuple)):
                if isinstance(value, list):
                    opening, closing = u"[", u"]"
                else:
                    opening, closing = u"(", u")"
                body = u",".join(self._safe_repr(x) for x in value)
                return u"%s%s%s" % (opening, body, closing)
            return self._safe_str(value)
        except Exception:
            return u"<UNREPRESENTABLE>"

    def _safe_str(self, obj):
        """Convert *obj* to text without raising.

        Byte strings are decoded as UTF-8, falling back to latin-1. Other
        objects go through ``str()``. Returns ``<UNCONVERTIBLE>`` on failure.
        """
        try:
            if isinstance(obj, self._TEXT_TYPE):
                return obj
            if isinstance(obj, bytes):
                try:
                    return obj.decode('utf-8')
                except UnicodeDecodeError:
                    return obj.decode('latin-1', 'replace')
            text = str(obj)
            if isinstance(text, bytes):
                # Python 2: str() produced a byte string.
                text = text.decode('utf-8', 'replace')
            return text
        except Exception:
            return u"<UNCONVERTIBLE>"

    def _format_time(self, record):
        """Return the record's creation time as local ``HH:MM:SS``."""
        return datetime.fromtimestamp(record.created).strftime('%H:%M:%S')


class SimpleLogger(object):
    """Class-level logging facade: ``SimpleLogger.info(msg, *args, **kwargs)``.

    Extra positional and keyword arguments are attached to the log record as
    ``positional_args`` and ``extra_kwargs``. The record is emitted through
    the logger named after the calling module.
    """

    # Set once the root logger has been configured by _ensure_init().
    _initialized = False

    @classmethod
    def _ensure_init(cls):
        """Configure the root logger on first use.

        Sets the root level to DEBUG and, only if the root logger has no
        handlers yet, installs a stream handler using OptimizedFormatter.
        """
        if cls._initialized:
            return
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        if not root.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(OptimizedFormatter(
                fmt=u'[%(asctime)s] [%(levelname)s] %(module)s:%(lineno)d ➤ %(funcName)s | %(message)s',
                datefmt=None  # fall back to the formatter's own time format
            ))
            root.addHandler(handler)
        cls._initialized = True

    @classmethod
    def _log(cls, level, msg, *args, **kwargs):
        """Build and dispatch a record on behalf of the public helpers.

        Must be called through exactly one wrapper (``debug``, ``info``, ...)
        because the caller is located two frames up the stack. Failures are
        written to stderr and never raised to the caller.
        """
        cls._ensure_init()
        frame = None
        try:
            # Two frames up: _log -> public wrapper -> user code.
            frame = inspect.currentframe().f_back.f_back
            caller_module = inspect.getmodule(frame)
            logger = logging.getLogger(
                caller_module.__name__ if caller_module else 'unknown')

            # Honour the configured level, as the stdlib logging calls do.
            if not logger.isEnabledFor(level):
                return
            if not logger.handlers:
                logger.propagate = True

            # Pass the caller's file and line so that pathname, filename,
            # module and lineno on the record describe the real call site.
            record = logger.makeRecord(
                name=logger.name,
                level=level,
                fn=frame.f_code.co_filename,
                lno=frame.f_lineno,
                msg=msg,
                args=(),
                exc_info=None,
                extra={
                    'positional_args': args,
                    'extra_kwargs': kwargs
                },
                func=frame.f_code.co_name
            )

            logger.handle(record)
        except Exception as e:
            sys.stderr.write(u"LOG_ERROR: %s\n" % e)
        finally:
            # Drop the frame reference so the caller's frame is not kept
            # alive through a reference cycle.
            del frame

    @classmethod
    def debug(cls, msg, *args, **kwargs):
        """Log *msg* at DEBUG level."""
        cls._log(logging.DEBUG, msg, *args, **kwargs)

    @classmethod
    def info(cls, msg, *args, **kwargs):
        """Log *msg* at INFO level."""
        cls._log(logging.INFO, msg, *args, **kwargs)

    @classmethod
    def warning(cls, msg, *args, **kwargs):
        """Log *msg* at WARNING level."""
        cls._log(logging.WARNING, msg, *args, **kwargs)

    @classmethod
    def error(cls, msg, *args, **kwargs):
        """Log *msg* at ERROR level."""
        cls._log(logging.ERROR, msg, *args, **kwargs)


if __name__ == "__main__":
    # Smoke test: non-ASCII (Chinese) message and positional argument.
    SimpleLogger.info(u"用户登录", u"张三", ip="192.168.1.100")

    # Mixed positional and keyword arguments, including an ordered mapping.
    SimpleLogger.error("配置错误", "database", code=500, detail=OrderedDict([("line", 42), ("file", "app.conf")]))

    # Raw byte strings. These three bytes are the UTF-8 encoding of U+65E0;
    # the b prefix keeps them bytes on Python 3 and is a no-op on Python 2.
    bad_data = b'\xe6\x97\xa0'
    SimpleLogger.warning(b"invalid data", bad_data, sector=[0x12, 0xff, 0x7f])

    # Nested containers as arguments.
    SimpleLogger.debug("调试信息", {"key": [1, 2, 3]}, timeout=30.5)