#
# Copyright 2015 Red Hat, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Refer to the README and COPYING files for full details of the license
#
"""
This module defines the special logging tools that lago uses
"""
import logging
import logging.config
import os
import re
import sys
import datetime
import threading
from collections import (OrderedDict, deque, )
from functools import wraps
#: Text shown in the header line when a task starts (currently empty)
START_TASK_MSG = ''
#: Template of the log message that signals the start of a task; the
#: placeholder is the task name
START_TASK_TRIGGER_MSG = 'start task %s'
#: Regexp matching messages built from ``START_TASK_TRIGGER_MSG``, capturing
#: the task name
START_TASK_REG = re.compile(r'start task (?P<task_name>.*)')
#: Text shown in the closing header line of a task that did not fail
END_TASK_MSG = 'Success'
#: Text shown in the closing header line of a task that failed
END_TASK_ON_ERROR_MSG = 'ERROR'
#: Template of the log message that signals the end of a task; the
#: placeholder is the task name
END_TASK_TRIGGER_MSG = 'end task %s'
#: Regexp matching messages built from ``END_TASK_TRIGGER_MSG``, capturing
#: the task name
END_TASK_REG = re.compile(r'end task (?P<task_name>.*)')
#: Template of a log message that must always be shown; the placeholder is
#: the actual message
ALWAYS_SHOW_TRIGGER_MSG = 'force-show:%s'
#: Regexp matching messages built from ``ALWAYS_SHOW_TRIGGER_MSG``,
#: capturing the actual message
ALWAYS_SHOW_REG = re.compile(r'force-show:(?P<message>.*)')
class Task(deque):
    """
    Small wrapper around deque to add the failed status and name to a task

    The deque itself holds the log records buffered for the task.

    Attributes:
        name (str): name for this task
        failed (bool): If this task has failed or not (if there was any error
            log shown during its execution)
        force_show (bool): If set, will show any log records generated inside
            this task even if it's out of nested depth limit
        start_time (datetime.datetime): moment at which the task was created
    """

    def __init__(self, name, *args, **kwargs):
        """
        Args:
            name (str): name for this task
            *args: any :class:`deque` args
            **kwargs: any :class:`deque` kwargs (for example ``maxlen``)
        """
        # Must resolve from Task (not deque) so that deque.__init__ actually
        # runs; otherwise ``maxlen`` and any initial iterable are ignored (or
        # rejected by object.__init__)
        super(Task, self).__init__(*args, **kwargs)
        self.failed = False
        self.force_show = False
        self.name = name
        self.start_time = datetime.datetime.now()

    def __str__(self):
        return (
            '%s(failed=%s, force_show=%s, len=%d)' %
            (self.name, self.failed, self.force_show, len(self))
        )

    def elapsed_time(self):
        """
        Returns:
            str: time passed since the task was created, formatted as the
                string form of a :class:`datetime.timedelta` with the
                fractional seconds removed
        """
        elapsed = datetime.datetime.now() - self.start_time
        # str(timedelta) looks like 'H:MM:SS.ffffff', drop the '.ffffff' part
        return str(elapsed).rsplit('.', 1)[0]
class ContextLock(object):
    """
    Context manager that guards a block of code with a thread lock

    Attributes:
        lock (threading.Lock): the underlying lock, held while inside the
            ``with`` block
    """

    def __init__(self):
        self.lock = threading.Lock()

    def __enter__(self):
        # Blocks until the lock is available; nothing is bound by ``as``
        self.lock.acquire()
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always release, exceptions raised in the block are not suppressed
        self.lock.release()
        return None
class TaskHandler(logging.StreamHandler):
    """
    This log handler will use the concept of tasks, to hide logs, and will show
    all the logs for the current task if there's a logged error while running
    that task.

    It will hide any logs that belong to nested tasks that have more than
    ``task_tree_depth`` parent levels, and for the ones that are above that
    level, it will show only the logs that have a loglevel above
    ``level``.

    You can force showing a log record immediately if you use the
    :func:`log_always` function bypassing all the filters.

    If there's a log record with log level higher than ``dump_level`` it will
    be considered a failure, and all the logs for the current task that have a
    log level above ``level`` will be shown no matter at which depth the task
    belongs to. Also, all the parent tasks will be tagged as error.

    Attributes:
        formatter (logging.Formatter): formatter to use
        initial_depth (int): Initial depth to account for, in case this handler
            was created in a subtask
        tasks_by_thread (dict of str: OrderedDict of str: Task): List of
            thread names, and their currently open tasks with their latest
            log records
        dump_level (int): log level from which to consider a log record as
            error
        buffer_size (int): Size of the log record deque for each task, the
            bigger, the more records it can show in case of error but the more
            memory it will use
        task_tree_depth (int): number of the nested level to show start/end
            task logs for, if -1 will show always
        level (int): Log level to show logs from if the depth limit is not
            reached
        main_failed (bool): used to flag from a child thread that the main
            should fail any current task
        _tasks_lock (ContextLock): Lock for the tasks_by_thread dict
        _main_thread_lock (ContextLock): Lock for the main_failed bool
    """
    #: List of chars to show as task prefix, to ease distinguishing them
    TASK_INDICATORS = ['@', '#', '*', '-', '~']

    def __init__(
        self,
        initial_depth=0,
        task_tree_depth=-1,
        buffer_size=2000,
        dump_level=logging.ERROR,
        level=logging.NOTSET,
        formatter=ColorFormatter,
    ):
        """
        Args:
            initial_depth (int): depth to start counting the nesting from
            task_tree_depth (int): deepest nesting level to show, negative to
                show all of them
            buffer_size (int): max number of records buffered per task
            dump_level (int): log level from which a record is an error
            level (int): minimum log level to show
            formatter: formatter to assign to the handler
        """
        # NOTE(review): the default is the ColorFormatter name as-is (it is
        # defined outside this block); confirm whether callers are expected
        # to pass an already built formatter instance
        super(TaskHandler, self).__init__()
        self.formatter = formatter
        self.initial_depth = initial_depth
        self.tasks_by_thread = {}
        self.dump_level = dump_level
        self.buffer_size = buffer_size
        self.task_tree_depth = task_tree_depth
        self.level = level
        self.main_failed = False
        self._tasks_lock = ContextLock()
        self._main_thread_lock = ContextLock()

    @property
    def cur_task(self):
        """
        Returns:
            str: the current active task (the most recently opened one for
                the current thread), or None if there is none
        """
        tasks = self.tasks
        # OrderedDict keeps insertion order, so the last key is the innermost
        # task. ``next(reversed(...))`` works on both python 2 and 3, unlike
        # indexing the result of ``keys()``
        return next(reversed(tasks)) if tasks else None

    @property
    def cur_thread(self):
        """
        Returns:
            str: Name of the current thread
        """
        return threading.current_thread().name

    @property
    def tasks(self):
        """
        Returns:
            OrderedDict of str, Task: list of task names and log records for
                each for the current thread
        """
        return self.get_tasks(thread_name=self.cur_thread)

    def get_tasks(self, thread_name):
        """
        Args:
            thread_name (str): name of the thread to get the tasks for

        Returns:
            OrderedDict of str, Task: list of task names and log records for
                each for the given thread
        """
        # The per-thread dict is created lazily the first time it's requested
        if thread_name not in self.tasks_by_thread:
            with self._tasks_lock:
                self.tasks_by_thread[thread_name] = OrderedDict()

        return self.tasks_by_thread[thread_name]

    @property
    def cur_depth_level(self):
        """
        Returns:
            int: depth level for the current task
        """
        cur_level = self.initial_depth
        # Tasks opened in a child thread are counted as nested inside
        # whatever tasks the main thread has open
        if not self.am_i_main_thread:
            cur_level += len(self.get_tasks(thread_name='MainThread'))

        return cur_level + len(self.tasks)

    @property
    def am_i_main_thread(self):
        """
        Returns:
            bool: if the current thread is the main thread
        """
        return threading.current_thread().name == 'MainThread'

    def mark_main_tasks_as_failed(self):
        """
        Flags to the main thread that all its tasks should fail

        Does nothing when called from the main thread itself.

        Returns:
            None
        """
        if self.am_i_main_thread:
            return

        with self._main_thread_lock:
            self.main_failed = True

    def should_show_by_depth(self, cur_level=None):
        """
        Args:
            cur_level (int): depth level to take into account, if None the
                current depth level is used

        Returns:
            bool: True if the given depth level should show messages (not
                taking into account the log level)
        """
        if cur_level is None:
            cur_level = self.cur_depth_level

        return (self.task_tree_depth < 0 or self.task_tree_depth >= cur_level)

    def should_show_by_level(self, record_level, base_level=None):
        """
        Args:
            record_level (int): log level of the record to check
            base_level (int or None): log level to check against, will use the
                object's ``level`` if None is passed

        Returns:
            bool: True if the given log record should be shown according to the
                log level
        """
        if base_level is None:
            # ``level`` is the display threshold; ``dump_level`` is only the
            # threshold from which a record counts as a failure
            base_level = self.level

        return record_level >= base_level

    def handle_new_task(self, task_name, record):
        """
        Do everything needed when a task is starting

        Params:
            task_name (str): name of the task that is starting
            record (logging.LogRecord): log record with all the info

        Returns:
            None
        """
        record.msg = ColorFormatter.colored('default', START_TASK_MSG)
        record.task = task_name
        self.tasks[task_name] = Task(name=task_name, maxlen=self.buffer_size)
        if self.should_show_by_depth():
            self.pretty_emit(record, is_header=True)

    def mark_parent_tasks_as_failed(self, task_name, flush_logs=False):
        """
        Marks all the parent tasks as failed

        Args:
            task_name (str): Name of the child task
            flush_logs (bool): If ``True`` will discard all the logs from
                parent tasks

        Returns:
            None
        """
        # Tasks are stored in opening order, so everything before
        # ``task_name`` is one of its parents
        for existing_task_name in self.tasks:
            if existing_task_name == task_name:
                break

            if flush_logs:
                self.tasks[existing_task_name].clear()

            self.tasks[existing_task_name].failed = True

        self.mark_main_tasks_as_failed()

    def close_children_tasks(self, parent_task_name):
        """
        Closes all the children tasks that were open

        Args:
            parent_task_name (str): Name of the parent task

        Returns:
            None
        """
        if parent_task_name not in self.tasks:
            return

        # Drop tasks from the innermost one up to (not including) the parent
        while self.tasks:
            next_task = next(reversed(self.tasks))
            if next_task == parent_task_name:
                break

            del self.tasks[next_task]

    def handle_closed_task(self, task_name, record):
        """
        Do everything needed when a task is closed

        Params:
            task_name (str): name of the task that is finishing
            record (logging.LogRecord): log record with all the info

        Returns:
            None
        """
        if task_name not in self.tasks:
            return

        if self.main_failed:
            self.mark_parent_tasks_as_failed(self.cur_task)

        if self.tasks[task_name].failed:
            record.msg = ColorFormatter.colored('red', END_TASK_ON_ERROR_MSG)
        else:
            record.msg = ColorFormatter.colored('green', END_TASK_MSG)

        record.msg += ' (in %s)' % self.tasks[task_name].elapsed_time()

        if self.should_show_by_depth() or self.tasks[task_name].force_show:
            if self.tasks[task_name].force_show:
                # Dump the buffered records before the closing header
                self.handle_error()

            self.pretty_emit(record, is_header=True)

        self.close_children_tasks(task_name)
        self.tasks.pop(task_name)

    def handle_error(self):
        """
        Handles an error log record that should be shown

        Returns:
            None
        """
        if not self.tasks:
            return

        # All the parents inherit the failure
        self.mark_parent_tasks_as_failed(self.cur_task, flush_logs=True)

        # Show the start headers for all the parent tasks if they were not
        # shown by the depth level limit
        for index, task in enumerate(self.tasks.values()):
            if self.should_show_by_depth(index + 1):
                continue

            start_task_header = logging.LogRecord(
                '', logging.INFO, '', 0, '', [], None
            )
            start_task_header.msg = ColorFormatter.colored(
                'default',
                START_TASK_MSG,
            )
            start_task_header.task = task.name
            self.pretty_emit(
                start_task_header,
                is_header=True,
                task_level=index + 1,
            )

        # Show now all the cached logs for the current task
        for old_record in self.tasks[self.cur_task]:
            self.pretty_emit(old_record)

        self.tasks[self.cur_task].clear()

    def get_task_indicator(self, task_level=None):
        """
        Args:
            task_level (int or None): task depth level to get the indicator
                for, if None, will use the current tasks depth

        Returns:
            str: char to prepend to the task logs to indicate its level
        """
        if task_level is None:
            task_level = len(self.tasks)

        # Indicators are reused cyclically for deeper levels
        return self.TASK_INDICATORS[task_level % len(self.TASK_INDICATORS)]

    def pretty_emit(self, record, is_header=False, task_level=None):
        """
        Wrapper around the :class:`logging.StreamHandler` emit method to add
        some decoration stuff to the message

        Args:
            record (logging.LogRecord): log record to emit, its ``msg`` (and
                ``levelno`` for headers) is modified in place
            is_header (bool): if this record is a header, usually, a start or
                end task message
            task_level (int): If passed, will take that as the current nested
                task level instead of calculating it from the current tasks

        Returns:
            None
        """
        task = record.task or self.cur_task
        if task_level is None:
            task_level = self.cur_depth_level

        if is_header:
            thread_tag = (
                '' if self.am_i_main_thread else '[%s] ' % self.cur_thread
            )
            extra_prefix = (
                self.get_task_indicator(task_level - 1) + ' ' + thread_tag +
                task + ': '
            )
            record.levelno = logging.INFO
        else:
            extra_prefix = ' ' + self.get_task_indicator(task_level) + ' '

        if task:
            record.msg = (
                ' ' * (task_level - 1) + extra_prefix + str(record.msg)
            )

        super(TaskHandler, self).emit(record)
        super(TaskHandler, self).flush()

    def emit(self, record):
        """
        Handle the given record, this is the entry point from the python
        logging facility

        Params:
            record (logging.LogRecord): log record to handle

        Returns:
            None
        """
        record.task = self.cur_task

        if record.levelno >= self.dump_level and self.cur_task:
            self.tasks[self.cur_task].failed = True
            self.tasks[self.cur_task].force_show = True

        # Makes no sense to start a task with an error log
        is_start = START_TASK_REG.match(str(record.msg))
        if is_start:
            self.handle_new_task(is_start.groupdict()['task_name'], record)
            return

        is_end = END_TASK_REG.match(str(record.msg))
        if is_end:
            self.handle_closed_task(is_end.groupdict()['task_name'], record)
            return

        force_show_record = ALWAYS_SHOW_REG.match(str(record.msg))
        if force_show_record:
            # Shown right away, but still buffered below like any other record
            record.msg = force_show_record.groupdict()['message']
            self.pretty_emit(record)

        if (
            not force_show_record and
            # Compare the numeric level, not the record object itself
            self.should_show_by_level(record.levelno) and
            self.should_show_by_depth()
        ):
            self.pretty_emit(record)
            return

        if self.cur_task:
            self.tasks[self.cur_task].append(record)
class LogTask(object):
    """
    Context manager for a log task

    Logs the start-task trigger message when entering the block and the
    end-task trigger message when leaving it.

    Example:
        >>> with LogTask('mytask'):
        ...     pass

    Attributes:
        task (str): name of the task
        logger: object exposing the logging methods (``info``, ``error``...),
            the :mod:`logging` module by default
        level (str): name of the logging method used for the trigger messages
        propagate (bool): if True, an exception leaving the block makes the
            end-task message be logged as an error
    """

    def __init__(
        self,
        task,
        logger=logging,
        level='info',
        propagate_fail=True,
    ):
        self.task = task
        self.logger = logger
        self.level = level
        self.propagate = propagate_fail

    def __enter__(self):
        getattr(self.logger, self.level)(START_TASK_TRIGGER_MSG % self.task)

    def __exit__(self, *args, **kwargs):
        # Trust the exception info handed in by the ``with`` statement:
        # ``sys.exc_info()`` would also report an unrelated exception that is
        # being handled by an enclosing ``except`` block, wrongly flagging a
        # successful task as failed. Only fall back to it when called
        # manually without arguments.
        exc_type = args[0] if args else sys.exc_info()[0]
        if exc_type and self.propagate:
            # Close the task on the same logger that opened it
            end_log_task(self.task, logger=self.logger, level='error')
        else:
            getattr(self.logger, self.level)(END_TASK_TRIGGER_MSG % self.task)
def log_task(task, logger=logging, level='info', propagate_fail=True):
    """
    Build a decorator that runs the decorated function inside a
    :class:`LogTask`

    Args:
        task (str): name of the log task
        logger: logger passed through to :class:`LogTask`
        level (str): log level name passed through to :class:`LogTask`
        propagate_fail (bool): passed through to :class:`LogTask`

    Returns:
        function: decorator to apply to the function to wrap

    Example:
        >>> @log_task('mytask')
        ... def do_something():
        ...     pass
    """

    def wrap_in_task(func):
        @wraps(func)
        def run_inside_task(*args, **kwargs):
            # A new LogTask is created on each call of the wrapped function
            task_context = LogTask(
                task,
                logger=logger,
                level=level,
                propagate_fail=propagate_fail,
            )
            with task_context:
                return func(*args, **kwargs)

        return run_inside_task

    return wrap_in_task
def start_log_task(task, logger=logging, level='info'):
    """
    Logs the message that triggers the start of a log task

    Args:
        task (str): name of the log task to start
        logger (logging.Logger): logger to use
        level (str): log level to use, it's the name of the logger method
            that will be called

    Returns:
        None
    """
    log_method = getattr(logger, level)
    start_message = START_TASK_TRIGGER_MSG % task
    log_method(start_message)
def end_log_task(task, logger=logging, level='info'):
    """
    Logs the message that triggers the end of a log task

    Args:
        task (str): name of the log task to end
        logger (logging.Logger): logger to use
        level (str): log level to use, it's the name of the logger method
            that will be called

    Returns:
        None
    """
    log_method = getattr(logger, level)
    end_message = END_TASK_TRIGGER_MSG % task
    log_method(end_message)
def log_always(message):
    """
    Tags the given message so the task logger shows it no matter what

    Args:
        message (str): message to wrap with the tag

    Returns:
        str: tagged message that will get it shown immediately by the task
            logger
    """
    tagged_message = ALWAYS_SHOW_TRIGGER_MSG % message
    return tagged_message
def hide_paramiko_logs():
    """
    Stops the ``paramiko.transport`` logger from propagating its records to
    the parent loggers and raises its level to ERROR

    Returns:
        None
    """
    transport_logger = logging.getLogger('paramiko.transport')
    transport_logger.setLevel(logging.ERROR)
    transport_logger.propagate = False
def setup_prefix_logging(logdir):
    """
    Sets up a file logger that will create a log in the given logdir (usually a
    lago prefix)

    The log file is named ``lago.log``, the handler is attached to the root
    logger with DEBUG level, and the paramiko transport logs are hidden.

    Args:
        logdir (str): path to create the log into, will be created (including
            any missing parent directories) if it does not exist

    Returns:
        None

    Raises:
        OSError: if the directory could not be created
    """
    # Try to create it and tolerate it already being there, instead of
    # checking first: avoids the race between the check and the creation,
    # and makedirs also handles missing parent directories
    try:
        os.makedirs(logdir)
    except OSError:
        if not os.path.isdir(logdir):
            raise

    file_handler = logging.FileHandler(
        filename=os.path.join(logdir, 'lago.log'),
    )
    file_formatter = logging.Formatter(
        fmt=(
            '%(asctime)s::%(filename)s::%(funcName)s::%(lineno)s::'
            '%(name)s::%(levelname)s::%(message)s'
        ),
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(file_formatter)
    logging.root.addHandler(file_handler)
    hide_paramiko_logs()