"""
Logger
=======
Coaster can help your application log errors at run-time. Initialize with
:func:`coaster.logger.init_app`. If you use :func:`coaster.app.init_app`,
this is done automatically for you.
"""
from datetime import datetime, timedelta
from html import escape
from io import StringIO
from pprint import pprint
from threading import Lock
from typing import Any, Dict
import logging
import logging.handlers
import re
import textwrap
import traceback
from flask import g, request, session
from flask.config import Config
import requests
from .auth import current_auth
# Global variable to ensure log handlers are registered only once per process
logging_configured = False
# Regex for credit card numbers
_card_re = re.compile(r'\b(?:\d[ -]*?){13,16}\b')
# These keywords are borrowed from Sentry's documentation and expanded for PII
_filter_re = re.compile(
'''
(password
|secret
|passwd
|api_key
|apikey
|access_token
|auth_token
|_token
|token_
|credentials
|mysql_pwd
|stripetoken
|cardnumber
|email
|phone)
''',
re.IGNORECASE | re.VERBOSE,
)
# global var as lazy in-memory cache
error_throttle_timestamp_slack: Dict[str, datetime] = {}
error_throttle_timestamp_telegram: Dict[str, datetime] = {}
[docs]class FilteredValueIndicator:
"""Represent a filtered value."""
def __str__(self):
"""Filter str."""
return '[Filtered]'
def __repr__(self):
"""Filter repr."""
return '[Filtered]'
# Construct a singleton
filtered_value_indicator = FilteredValueIndicator()
[docs]class RepeatValueIndicator:
"""Represent a repeating value."""
def __init__(self, key):
"""Init with key."""
self.key = key
def __repr__(self):
"""Return representation."""
return '<same as prior "%s">' % self.key
__str__ = __repr__
[docs]def filtered_value(key, value):
"""Find and mask sensitive values based on key names."""
if isinstance(key, str) and _filter_re.search(key):
return filtered_value_indicator
if isinstance(value, str):
return _card_re.sub('[Filtered]', value)
return value
[docs]def pprint_with_indent(dictlike, outfile, indent=4):
"""Filter values and pprint with indent to create a Markdown code block."""
out = StringIO()
pprint( # NOQA: T003
{key: filtered_value(key, value) for key, value in dictlike.items()}, out
)
outfile.write(textwrap.indent(out.getvalue(), ' ' * indent))
out.close()
[docs]class SlackHandler(logging.Handler):
"""Custom logging handler to post error reports to Slack."""
def __init__(self, app_name, webhooks):
"""Init handler."""
super().__init__()
self.app_name = app_name
self.webhooks = webhooks
[docs] def emit(self, record):
"""Emit an event."""
throttle_key = (record.module, record.lineno)
if throttle_key not in error_throttle_timestamp_slack or (
(datetime.utcnow() - error_throttle_timestamp_slack[throttle_key])
> timedelta(minutes=5)
):
# Sanity check:
# If we're not going to be reporting this, don't bother to format payload
if record.levelname not in [
lname
for webhook in self.webhooks
for lname in webhook.get('levelnames', [])
]:
return
if record.exc_text:
double_split = [
s.split('----') for s in record.exc_text.split('----------')
]
flat_list = [item for sublist in double_split for item in sublist]
# Separate out the first line of each section. It'll be used as the
# "pretext" while the rest will be used as a "text" attachment.
sections = [s.strip().split('\n', 1) for s in flat_list]
else:
sections = []
data = {
'text': "*{levelname}* in {name}: {message}: `{info}`".format(
levelname=record.levelname,
name=self.app_name,
message=record.message,
info=repr(record.exc_info[1]) if record.exc_info else '',
),
'attachments': [
{
'mrkdwn_in': ['text'],
'fallback': section[0],
'pretext': section[0],
'text': ('```\n' + section[1] + '\n```')
if len(section) > 1
else '',
}
for section in sections
],
}
for webhook in self.webhooks:
if record.levelname not in webhook.get('levelnames', []):
continue
payload = dict(data)
for attr in ('channel', 'username', 'icon_emoji'):
if attr in webhook:
payload[attr] = webhook[attr]
try:
requests.post(
webhook['url'],
json=payload,
headers={'Content-Type': 'application/json'},
)
except: # NOQA # nosec
# We need a bare except clause because this is the exception
# handler. It can't have exceptions of its own.
pass
error_throttle_timestamp_slack[throttle_key] = datetime.utcnow()
[docs]class TelegramHandler(logging.Handler):
"""Custom logging handler to report errors to a Telegram chat."""
def __init__(self, app_name, chatid, apikey):
"""Init handler."""
super().__init__()
self.app_name = app_name
self.chatid = chatid
self.apikey = apikey
[docs] def emit(self, record):
"""Emit an event."""
throttle_key = (record.module, record.lineno)
if throttle_key not in error_throttle_timestamp_telegram or (
(datetime.utcnow() - error_throttle_timestamp_telegram[throttle_key])
> timedelta(minutes=5)
):
text = '<b>{levelname}</b> in <b>{name}</b>: {message}'.format(
levelname=escape(record.levelname, False),
name=escape(self.app_name, False),
message=escape(record.message, False),
)
if record.exc_info:
# Reverse the traceback, after dropping the first line with
# "Traceback (most recent call first)".
traceback_lines = traceback.format_exception(*record.exc_info)[1:][::-1]
for index, stack_frame in enumerate(traceback_lines):
stack_frame_lines = stack_frame.split('\n', 1)
traceback_lines[index] = (
'\n'.join(
[escape(stack_frame_lines[0].strip(), False)]
+ [
'<pre>' + escape(_l.strip(), False) + '</pre>'
for _l in stack_frame_lines[1:]
if _l
]
)
+ '\n'
)
text += '\n\n' + '\n'.join(traceback_lines)
if len(text) > 4096:
text = text[: 4096 - 7] # 7 = len('</pre>…')
if text.count('<pre>') > text.count('</pre>'):
text += '</pre>'
text += '…'
requests.post(
'https://api.telegram.org/bot{apikey}/sendMessage'.format(
apikey=self.apikey
),
data={
'chat_id': self.chatid,
'parse_mode': 'html',
'text': text,
'disable_preview': True,
},
)
error_throttle_timestamp_telegram[throttle_key] = datetime.utcnow()
[docs]def init_app(app):
"""
Enable logging for an app using :class:`LocalVarFormatter`.
Requires the app to be configured and checks for the following configuration
parameters. All are optional:
* ``LOGFILE``: Name of the file to log to (default ``error.log``)
* ``LOGFILE_LEVEL``: Logging level to use for file logger (default `WARNING`)
* ``ADMINS``: List of email addresses of admins who will be mailed error reports
* ``MAIL_DEFAULT_SENDER``: From address of email. Can be an address or a tuple with
name and address
* ``MAIL_SERVER``: SMTP server to send with (default ``localhost``)
* ``MAIL_USERNAME`` and ``MAIL_PASSWORD``: SMTP credentials, if required
* ``SLACK_LOGGING_WEBHOOKS``: If present, will send error logs to all specified
Slack webhooks
* ``TELEGRAM_ERROR_CHATID`` and ``TELEGRAM_ERROR_APIKEY``: If present, will use the
specified API key to post a message to the specified chat
Format for ``SLACK_LOGGING_WEBHOOKS``::
SLACK_LOGGING_WEBHOOKS = [{
'levelnames': ['WARNING', 'ERROR', 'CRITICAL'],
'url': 'https://hooks.slack.com/...'
}]
"""
global logging_configured
if logging_configured:
app.logger.warning("Logging already configured, ignoring repeat call")
return
logging_configured = True
root = logging.getLogger()
formatter = LocalVarFormatter(
'%(asctime)s - %(module)s.%(funcName)s:%(lineno)s - %(levelname)s - %(message)s'
)
error_log_file = app.config.get('LOGFILE', 'error.log')
if error_log_file: # Specify a falsy value in config to disable the log file
file_handler = logging.handlers.TimedRotatingFileHandler(
error_log_file,
when=app.config.get('LOGFILE_WHEN', 'midnight'),
backupCount=app.config.get('LOGFILE_BACKUPCOUNT', 0),
)
file_handler.setFormatter(formatter)
file_handler.setLevel(app.config.get('LOGFILE_LEVEL', logging.WARNING))
root.addHandler(file_handler)
if app.config.get('SLACK_LOGGING_WEBHOOKS'):
logging.handlers.SlackHandler = SlackHandler
slack_handler = logging.handlers.SlackHandler(
app_name=app.config.get('SITE_ID') or app.name,
webhooks=app.config['SLACK_LOGGING_WEBHOOKS'],
)
slack_handler.setFormatter(formatter)
slack_handler.setLevel(logging.NOTSET)
root.addHandler(slack_handler)
if app.config.get('TELEGRAM_ERROR_CHATID') and app.config.get(
'TELEGRAM_ERROR_APIKEY'
):
logging.handlers.TelegramHandler = TelegramHandler
telegram_handler = logging.handlers.TelegramHandler(
app_name=app.config.get('SITE_ID') or app.name,
chatid=app.config['TELEGRAM_ERROR_CHATID'],
apikey=app.config['TELEGRAM_ERROR_APIKEY'],
)
telegram_handler.setLevel(logging.ERROR)
root.addHandler(telegram_handler)
if app.config.get('ADMINS'):
mail_sender = app.config.get('MAIL_DEFAULT_SENDER', 'logs@example.com')
if isinstance(mail_sender, (list, tuple)):
mail_sender = mail_sender[1] # Get email from (name, email)
if app.config.get('MAIL_USERNAME') and app.config.get('MAIL_PASSWORD'):
credentials = (app.config['MAIL_USERNAME'], app.config['MAIL_PASSWORD'])
else:
credentials = None
mail_handler = logging.handlers.SMTPHandler(
app.config.get('MAIL_SERVER', 'localhost'),
mail_sender,
app.config['ADMINS'],
'%s failure' % (app.config.get('SITE_ID') or app.name),
credentials=credentials,
)
mail_handler.setFormatter(formatter)
mail_handler.setLevel(logging.ERROR)
root.addHandler(mail_handler)
# Legacy name
configure = init_app