"""
Configure logging.
A call to the function ``config_logger`` in a launching script
is all that is needed to set up the logging format.
Usually the 'level' argument is the only argument one needs to customize::
config_logger(level='info')
Do not call this in library modules. Library modules should have::
logger = logging.getLogger(__name__)
and then just use ``logger`` to write logs without concern about formatting,
destination of the log message, etc.
"""
__all__ = ['config_logger', 'set_level', 'add_console_handler']
import inspect
import logging
import logging.handlers
import os
import sys
import time
import warnings
from datetime import datetime
from logging import Formatter
from typing import Union
# When exceptions are raised during logging, then,
# the default implementation of handleError() in Handler
# checks to see if a module-level variable,
# raiseExceptions,
# is set.
# If set, a traceback is printed to sys.stderr.
# If not set, the exception is swallowed.
#
# If no logging configuration is provided, then for Python 2.x,
# If logging.raiseExceptions is False (production mode),
# the event is silently dropped.
# If logging.raiseExceptions is True (development mode),
# a message 'No handlers could be found for logger X.Y.Z'
# is printed once.
# Turn off annoyance in ptpython when setting DEBUG logging
logging.getLogger('parso').setLevel(logging.ERROR)
logging.captureWarnings(True)
warnings.filterwarnings('default', category=ResourceWarning)
warnings.filterwarnings('default', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=DeprecationWarning, module='ptpython')
warnings.filterwarnings('ignore', category=DeprecationWarning, module='jedi')
rootlogger = logging.getLogger()
logger = logging.getLogger(__name__)
class DynamicFormatter(Formatter):
def __init__(
self,
*args,
with_datetime: bool = True,
with_timezone: bool = True,
with_level: bool = True,
**kwargs,
):
# TODO: could add parameters to customize formatting, as new ideas emerges.
super().__init__(*args, **kwargs)
self._with_datetime = with_datetime
self._with_timezone = with_timezone
self._with_level = with_level
self._tz = datetime.now().astimezone().tzname()
def format(self, record):
r = record
if self._with_datetime:
asctime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(r.created))
msecs = int((r.created % 1) * 10000)
fmt = f'{asctime}.{msecs:0>4} '
if self._with_timezone:
fmt += f'{self._tz} '
else:
fmt = ''
if self._with_level:
fmt += f'{r.levelname: <12} '
fmt += f'%(message)s [( {r.name}, {r.lineno}, {r.funcName}'
p = r.processName
t = r.threadName
if t.endswith(f' ({r.funcName})'):
t = t[: -(len(r.funcName) + 3)]
tk = getattr(r, 'taskName', None)
if p == 'MainProcess':
if t == 'MainThread':
if tk is None:
fmt += ' )]'
else:
fmt += f' | {tk} )]'
else:
if tk is None:
fmt += f' | {t} )]'
else:
fmt += f' | {t}, {tk} )]'
else:
if t == 'MainThread':
if tk is None:
fmt += f' | {p} <{r.process}> )]'
else:
fmt += f' | {p} <{r.process}>, {tk} )]'
else:
if tk is None:
fmt += f' | {p} <{r.process}>, {t} )]'
else:
fmt += f' | {p} <{r.process}>, {t}, {tk} )]'
formatter = Formatter(fmt)
return formatter.format(record)
# We cannot completely construct the message and return it w/o making use
# of a Formatter's `format` method. Somehow that wouldn't have the correct behavior
# for `logger.exception`---it would not print the traceback.
[docs]
def set_level(level: Union[str, int] = logging.INFO) -> int:
"""
In one application, call `set_level` on the top level.
Usually you should do this only once, and do not set level on any handler.
Return the original level that was in effect before this function was called.
By default, the original level is `logging.INFO` or `logging.WARNING`.
Parameters
----------
level
Either the integers `logging.DEBUG`, `logging.INFO`, etc., or strings
"debug", "info", etc.
"""
if isinstance(level, str):
level = getattr(logging, level.upper())
level0 = rootlogger.level
rootlogger.setLevel(level)
return level0
[docs]
def add_console_handler(**kwargs) -> logging.Handler:
"""
Log to ``sys.stderr``.
"""
h = logging.StreamHandler()
h.setFormatter(DynamicFormatter(**kwargs))
rootlogger.addHandler(h)
return h
[docs]
def config_logger(level=logging.INFO, **kwargs):
# For use in one-off scripts.
set_level(level)
add_console_handler(**kwargs)
def add_disk_handler(
*,
foldername: str = None,
maxBytes=1_000_000,
backupCount=20,
delay=True,
**kwargs,
):
if foldername:
foldername = foldername.rstrip('/')
else:
launcher = get_calling_file()
foldername == f'{os.environ.get("LOGDIR", "/tmp/log")}/{launcher.lstrip("/").replace("/", "-")}'
print(f"Log files are located in '{foldername}'")
os.makedirs(foldername, exist_ok=True)
h = logging.handlers.RotatingFileHandler(
filename=foldername + '/current',
maxBytes=maxBytes,
backupCount=backupCount,
delay=delay,
)
h.setFormatter(DynamicFormatter(**kwargs))
rootlogger.addHandler(h)
return foldername
def log_uncaught_exception(handlers=None, logger=logger):
# locally bind `handlers` and `logger` in case the global references are gone
# when the exception handler is invoked.
if handlers is None:
handlers = rootlogger.handlers
def handle_exception(exc_type, exc_val, exc_tb):
if not issubclass(exc_tb, KeyboardInterrupt):
if sys.version_info.minor < 11:
logging.currentframe = lambda: sys._getframe(1)
fn, lno, func, sinfo = logger.findCaller(stack_info=False, stacklevel=1)
record = logger.makeRecord(
logger.name,
logging.CRITICAL,
fn,
lno,
msg=exc_val,
args=(),
exc_info=(exc_type, exc_val, exc_tb),
func=func,
sinfo=sinfo,
)
for h in handlers:
h.handle(record)
sys.__excepthook__(exc_type, exc_val, exc_tb)
sys.excepthook = handle_exception
def get_calling_file() -> inspect.FrameInfo:
"""
This function finds the "launch script" of the currently running program.
The "launch script" is either a `.py` file or the Python interpreter.
The returned object has attribute `filename`, which is the full path of the launch script.
The value `'<stdin>'` suggests the Python interpreter, i.e. it's in an interactive Python session.
$ ptpython
>>> from cloudly.util.logging import get_calling_file
>>> def func_a():
... return get_calling_file()
>>> def func_b():
... z = func_a()
... print(z)
... print()
... print(z.filename)
>>> func_b()
FrameInfo(frame=<frame at 0x7f54d8009e30, file '<stdin>', line 2, code func_a>, filename='<stdin>', lineno=2, function='func_a', code_context=None, index=None)
<stdin>
>>>
"""
st = inspect.stack()
caller = None
for s in st:
if os.path.basename(s.filename) == 'runpy.py':
# `runpy.py` is usually what finds and launches the module specified by the
# `-m` command line switch. So, the last step was the user script. Stop here.
break
if '_pytest/python.py' in s.filename:
# Don't follow into py.test; stop at the test file
break
if s.filename == '<stdin>':
# This is the Python interpreter.
caller = s
break
if s.function == '<module>':
# This is the case if the code has traced to the `if __name__ == '__main__'` block.
# I don't know if there are other possibilities.
break
caller = s
return caller