#!/usr/bin/env python3
"""Mail filter (including anti-spam) and notifier for Notmuch.

Track messages classified as spam (or ham) by Bogofilter via '.bf_spam'
(resp. '.bf_ham') tags.  Since afew removes the `new' tag, when notifying
mail we track new messages with a temporary tag (option '--tmp' of `filter'
subcommand) which we assume not to preexist in the database.  These tags and
that added by the user to spam messages can be customized via command-line
options or, from Python, by modifying module-level constants or via
function arguments.

This script is potentially affected by environment variables, files and
directories that affect afew, Bogofilter, Notmuch or (obviously) Python3,
including:

1. `NOTMUCH_CONFIG' – location of Notmuch configuration file – and that file
   itself.
2. `BOGOFILTER_DIR' – location of Bogofilter's database directory – and that
   directory itself.
3. afew configuration.

WISH: Accept customizable "new" flags (currently we assume "new").
"""

# WISH: Finish documenting the exceptions possibly raised by each function

import logging
import sys
import time
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
from functools import partial
from logging import handlers
from subprocess import PIPE, STDOUT, Popen, run
from typing import (Any, Callable, Dict, Iterable, List, Optional, Sequence,
                    Tuple, Union)

from notmuch import Database, Message

# https://wiki.archlinux.org/index.php/Desktop_notifications#Python
import gi  # isort:skip
gi.require_version('Notify', '0.7')  # pylint: disable=wrong-import-position
from gi.repository import Notify  # noqa: E402 isort:skip

Tags = Union[str, Iterable[str]]

LOG = logging.getLogger(__name__)
FILTER_ACTIONS = {'spam', 'general', 'notify'}

# Defaults for command-line options
BF_HAM = '.bf_ham'
BF_SPAM = '.bf_spam'
USER_SPAM = 'spam'
TMP = '_simplemuch_tmp'

# Application name registered with the notification daemon.
# WISH Compute name from sys.argv[0], like argparse?
_NOTIFY_APP_NAME = 'Simplemuch'


class SimplemuchError(Exception):
    """Base class for simplemuch exception classes"""


class NotmuchDatabaseNeedsUpgradeError(SimplemuchError):
    """needs_upgrade() returned True."""


# WISH Capture more information, e.g. return code and command line
class BogofilterError(SimplemuchError):
    """Error from Bogofilter"""


# def teste_mypy(i: int) -> None:
#     return i + ''


def _as_tags(tags: Tags) -> Tuple[str, ...]:
    """Normalize a `Tags' value (one tag or an iterable of tags) to a tuple.

    Materializing once lets callers reuse the tags for several messages even
    if `tags' is a one-shot iterator.
    """
    return (tags,) if isinstance(tags, str) else tuple(tags)


def alert(summary: str, body: str, *args: Any,
          fun: Callable[..., None] = LOG.warning) -> None:
    """Log and show a desktop notification -- `summary', `body'.

    Logs with fun(body, *args); the notification body is `body % args'.  The
    notification icon is chosen according to `fun' (error, warning or none).

    The message is logged first, and the desktop notification is best
    effort: libnotify is initialized on demand (showing a notification
    without initialization is an error in libnotify) and a failure to notify
    is logged instead of propagated, so that it cannot hide the problem
    being reported.
    """
    kwargs: Dict[str, str]
    # `Logger.warn' is a deprecated alias of `Logger.warning'; look it up
    # defensively in case it is absent.
    if fun in (LOG.exception, LOG.error):
        kwargs = {'icon': 'dialog-error'}
    elif fun in (getattr(LOG, 'warn', LOG.warning), LOG.warning):
        kwargs = {'icon': 'dialog-warning'}
    else:
        kwargs = {}
    fun(body, *args)
    try:
        if not Notify.is_initted():
            Notify.init(_NOTIFY_APP_NAME)
        Notify.Notification.new(summary, body % args, **kwargs).show()
    except Exception:  # pylint: disable=broad-except
        # E.g. no notification daemon is reachable (cron job, ssh session).
        LOG.warning('Could not show desktop notification "%s"', summary,
                    exc_info=True)


def safe_open_db_rw() -> Database:
    """Open Notmuch database for reading and writing and return it.

    Before returning, check if the database needs upgrade; if so, raise
    NotmuchDatabaseNeedsUpgradeError.
    """
    nm_db = Database(mode=Database.MODE.READ_WRITE)
    if nm_db.needs_upgrade():
        raise NotmuchDatabaseNeedsUpgradeError(
            'Notmuch database needs upgrade.  Exiting without action.\n'
            'WISH Implement correct database upgrading')
    return nm_db


def update(nm_db: Database, args: Namespace, query: str,
           opr: str) -> Tuple[int, float]:
    """Call bogofilter on messages matching `query', change their tags.

    Call `bogofilter' with command-line option `opr' (plus -bl) and feed it
    (via stdin) the filenames of messages matching Notmuch query `query'.
    For each such message, apply the corresponding tag change (according to
    `args.bf_spam' and `args.bf_ham'): lowercase `opr' adds the tag,
    uppercase `opr' removes it.

    `opr' must be in set('SsNn') (see bogofilter(1) for the meaning).

    Return the number of messages operated on and the elapsed time.

    Raise BogofilterError if bogofilter exits with a nonzero status.

    This function is potentially affected by environment variables, files and
    directories that affect Bogofilter or Notmuch.

    TODO Handle bogofilter errors
    """
    start = time.time()
    assert opr in set('SsNn')
    tag_ = args.bf_spam if opr in 'sS' else args.bf_ham
    registering = opr in 'sn'
    num = 0
    with Popen(('bogofilter', '-bl' + opr), stdin=PIPE, text=True,
               bufsize=1) as bogo:
        assert bogo.stdin       # Placate mypy
        for msg in nm_db.create_query(query).search_messages():
            bogo.stdin.write(msg.get_filename() + '\n')
            if registering:
                msg.add_tag(tag_)
            else:
                msg.remove_tag(tag_)
            num += 1
    # The return code is only available once the `with' block has closed
    # stdin and waited for the process.
    if bogo.returncode:
        raise BogofilterError(f'Bogofilter returned {bogo.returncode}')
    return num, time.time() - start


def train(args: Namespace) -> None:
    """Train Bogofilter on the Notmuch database.

    According to how the user classified the given message (spam or ham),
    update Simplemuch tags (`args.bf_spam' and `args.bf_ham') and Bogofilter's
    database.

    We assume the user classified a message as spam if it is tagged
    `args.user_spam'; and he classified it as ham if it has been read but not
    tagged `args.user_spam'.  Therefore we assume that:

    1. Messages tagged `args.user_spam' are in fact spam.
    2. Spammy read messages are tagged `args.user_spam'.
    3. Messages tagged `args.bf_spam' are also tagged `args.user_spam', unless
       they are false positives.

    A problematic scenario is when the user reads spam in webmail but forgets
    to tag it as spam in Notmuch.

    This function is potentially affected by environment variables, files and
    directories that affect Bogofilter or Notmuch.
    """
    with safe_open_db_rw() as nm_db:
        def train_(query: str, opr: str, obj: str) -> None:
            """Run one (un)registration pass and log its outcome."""
            assert opr in set('SsNn')
            opr_ = 'Register' if opr in 'sn' else 'Unregister'
            end = f'{opr_}ed %d {obj} in %.3gs'
            LOG.info('%s %s', opr_, obj)
            num, dur = update(nm_db, args, query, opr)
            LOG.info(end, num, dur)

        bf_spam, bf_ham, user_spam = args.bf_spam, args.bf_ham, args.user_spam
        train_(f'is:{user_spam} NOT is:{bf_spam}', 's', 'spam')
        train_(f'is:{bf_spam} NOT is:{user_spam}', 'S', '(false) spam')
        # Ham: read, not user-flagged spam and not yet registered as ham.
        # The OR is explicit so that the query does not depend on how Notmuch
        # implicitly combines terms.
        train_(f'NOT (is:{user_spam} OR is:unread OR is:{bf_ham})', 'n', 'ham')
        train_(f'is:{user_spam} AND is:{bf_ham}', 'N', '(false) ham')


def count(nm_db: Database, query: str, exclude: Tags = ()) -> int:
    """Return Xapian’s best guess as to how many messages match `query'.

    `exclude', if provided, must contain tags to exclude from the count by
    default.  A given tag will not be excluded if it appears explicitly in
    `query'.

    May raise:
    - `NullPointerError' if the query creation failed (e.g. too little
      memory).
    - `NotInitializedError' if the underlying db was not initialized.

    This function is potentially affected by environment variables, files and
    directories that affect Notmuch.

    WISH Find out and document what "best guess" means; this wording is from
    the documentation of notmuch Python bindings.
    """
    query_ = nm_db.create_query(query)
    for tag in _as_tags(exclude):
        query_.exclude_tag(tag)
    return query_.count_messages()


def filter_spam(nm_db: Database, query: str, ham: Optional[str] = None,
                spam: Optional[str] = None) -> None:
    """Filter (Bogofilter) the messages matching Notmuch query `query'.

    If Bogofilter classifies a given message as Spam/Ham then tag it
    `spam'/`ham' (defaulting to `BF_SPAM'/`BF_HAM').  Messages classified as
    unsure ('U') are left untouched.

    The classification letter is read as the second-to-last
    whitespace-separated field of the line bogofilter prints for each
    message.  Raise BogofilterError if that line is missing (e.g. bogofilter
    terminated) or carries an unknown classification.

    This function is potentially affected by environment variables, files and
    directories that affect Bogofilter or Notmuch.
    """
    tag = dict(H=ham or BF_HAM, S=spam or BF_SPAM)
    with Popen(('bogofilter', '-blT'), stdin=PIPE, stdout=PIPE, text=True,
               bufsize=1) as bogo:
        assert bogo.stdin and bogo.stdout       # Placate mypy
        for msg in nm_db.create_query(query).search_messages():
            filename = msg.get_filename()
            bogo.stdin.write(filename + '\n')
            line = bogo.stdout.readline()
            fields = line.split()
            if len(fields) < 2:
                raise BogofilterError(
                    f'Unexpected bogofilter output for {filename}: {line!r}')
            code = fields[-2]
            if code == 'U':
                continue
            if code not in tag:
                raise BogofilterError(
                    f'Unknown bogofilter classification {code!r} '
                    f'for {filename}: {line!r}')
            msg.add_tag(tag[code])
            msg_id = msg.get_message_id()
            LOG.debug('Message %s marked %s', msg_id, tag[code])


def tag_search(nm_db: Database, query: str, add: Tags = (),
               remove: Tags = ()) -> None:
    """Add/remove tags from messages matching Notmuch `query'.

    `nm_db' must be open for reading and writing.  `query' should be a
    Notmuch query on whose results we should act.  `add' and `remove' are
    each either one tag or an iterable of tags.

    Operate atomically on the set of messages matching `query'.

    May raise:
    - `XapianError' – see documentation of `begin_atomic()' and
      `end_atomic()' methods of `Database'
    - `NullPointerError' if notmuch query creation failed (e.g. too little
      memory) or `search_messages()' failed
    - `NotInitializedError' if the underlying db was not initialized
    - `NullPointerError' if a given tag is NULL
    - `TagTooLongError' if the length of a given tag exceeds
      notmuch.Message.NOTMUCH_TAG_MAX)
    - `ReadOnlyDatabaseError' if the database was opened in read-only mode
    - `NotInitializedError' if message has not been initialized

    This function is potentially affected by environment variables, files and
    directories that affect Notmuch.
    """
    to_add = _as_tags(add)
    to_remove = _as_tags(remove)
    nm_db.begin_atomic()
    for msg in nm_db.create_query(query).search_messages():
        for tag in to_add:
            msg.add_tag(tag)
        for tag in to_remove:
            msg.remove_tag(tag)
    nm_db.end_atomic()


def filter_notify(args: Namespace) -> None:
    """Filter mail (afew, Bogofilter and Notmuch) and notify.

    - `args.req' must be a mapping from each element of FILTER_ACTIONS to
      whether we should act on it (requested actions).

    - If "args.req['spam']" is True then `args.query' must be a string
      representing a Notmuch query (on whose results the spam filter will
      work) and `args.bf_ham', `args.bf_spam' must be the tags to add to
      messages that Bogofilter classifies as ham (resp. spam).

    - If `args.req' includes 'notify' or 'general', we internally use a
      temporary tag – args.tmp – that we assume not to preexist in the
      Notmuch database.  If the function fails before removing that tag, an
      alert reports how many messages were left tagged.

    This function is potentially affected by environment variables, files and
    directories that affect afew, Bogofilter or Notmuch.

    TODO Document the required Notmuch saved queries.
    TODO Document the DKIM filtering.
    """
    want_spam = args.req['spam']
    want_general = args.req['general']
    want_notify = args.req['notify']
    track_new = want_general or want_notify

    tmp_count = 0               # Messages currently carrying args.tmp
    exclude: List[str] = []     # Tags Notmuch excludes from searches

    if track_new or want_spam:
        with safe_open_db_rw() as nm_db:
            if want_spam:
                filter_spam(nm_db, args.query, args.bf_ham, args.bf_spam)
            if track_new:
                # Afew will remove 'new'
                tag_search(nm_db, 'is:new', args.tmp)
                tmp_count = count(nm_db, f'is:{args.tmp}')

    pipe = partial(run, stdout=PIPE, text=True)
    try:
        if track_new:
            config = pipe(('notmuch', 'config', 'get', 'search.exclude_tags'),
                          check=True)
            # Drop empty entries: with no excluded tags configured the output
            # is empty, and an empty tag would yield a malformed query.
            exclude = [tag for tag in config.stdout.split('\n') if tag]

        if want_general:
            afew = pipe(('afew', '-tnv'), check=True, stderr=STDOUT)
            LOG.info('\n%s', afew.stdout)
            exclude_dkim = '(%s)' % ' OR '.join(
                f'is:{tag}' for tag in exclude + ['/dkim-.*/'])
            # problem = ('1584638185559.1b10c882-e1e1-4993-8f01-bdbcb3b4afe2@'
            #            '302036m.grancursosonline.com.br')
            dkim_query = f'(is:{args.tmp} -{exclude_dkim})'
            afew = pipe(('afew', '-tv', '-eDKIMValidityFilter', dkim_query),
                        stderr=STDOUT)
            if afew.returncode:
                alert('DKIM filter error',
                      'afew DKIMValidityFilter returned %d:\n%s',
                      afew.returncode, afew.stdout)
            else:
                LOG.info('\n%s', afew.stdout)

        if track_new:
            with safe_open_db_rw() as nm_db:
                if want_notify:
                    p_count = partial(count, nm_db, exclude=exclude)
                    tmp_notify = f'is:{args.tmp} query:simplemuch_notify'
                    notify = p_count(tmp_notify)
                    if notify:
                        unread = p_count("query:simplemuch_unread")
                        inbox_unread = p_count(
                            "query:simplemuch_INBOX_unread")
                        flagged = p_count("query:simplemuch_flagged")
                        subjects = '\n'.join(
                            msg.get_header('Subject')
                            for msg in nm_db.create_query(
                                tmp_notify).search_messages())
                        body = (
                            f'Inbox: {unread} unread ({inbox_unread} INBOX), '
                            f'{flagged} flagged\n' + subjects)
                        summary = f'{notify} new messages.'
                        Notify.Notification.new(
                            summary, body, 'mail-message-new').show()
                tag_search(nm_db, 'is:' + args.tmp, remove=args.tmp)
                tmp_count = 0
    finally:
        # Nonzero only if the temporary tag was added and not removed above.
        if tmp_count:
            body_fmt = '%d messages left tagged %s'
            alert('Dirty messages', body_fmt, tmp_count, args.tmp)


# Commented out since I don't know a simple way to obtain the location of the
# Bogofilter directory.  It may not be `(~/.bogofilter)': see Bogofilter
# man page section `ENVIRONMENT'.  Maybe the `-x' flag can help.
# def clean(db, args):
#     """Remove Bogofilter tags from all messages and remove `(~/.bogofilter)'"""
#     if not shutil.rmtree.avoids_symlink_attacks:
#         print("Warning: this `shutil.rmtree' is susceptible to symlink attacks.")
#     while True:
#         reply = input(prompt=
# f"""Remove Bogofilter database directory and, from all Notmuch email messages,
# {args.bf_spam} and {args.bf_ham} tags?
# [y/N] """).lower()
#         if 'no'.startswith(reply):
#             return False
#         if 'yes'.startswith(reply):
#             shutil.rmtree(os.path.expanduser('~/.bogofilter'))
#             tag_search(db, f'is:{args.bf_spam}', remove=f'{args.bf_spam}')
#             tag_search(db, f'is:{args.bf_ham}', remove=f'{args.bf_ham}')
#             return True
#         print(
#             'Please provide a valid answer: "yes", "no" or a prefix, '
#             'case-insensitive', file=sys.stderr)


def parse_command_line(argv: Optional[Sequence[str]] = None) -> Namespace:
    """Parse the command line into a Namespace object.

    `argv' is the list of arguments to parse; if None (the default),
    sys.argv[1:] is used.

    Besides the options, the returned namespace has:
    - `func': the function implementing the chosen subcommand;
    - `subcommand': the name of the chosen subcommand;
    - `req': a dict mapping each element of FILTER_ACTIONS to whether it was
      requested (always False for subcommands other than `filter').
    """
    parser = ArgumentParser(
        description=__doc__, formatter_class=ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '--version', action='version', version='Simplemuch alpha')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Output log messages also to stderr')
    parser.add_argument(
        '--bf_spam', default=BF_SPAM, metavar='TAG',
        help='Tag for bogofilter-flagged spam')
    parser.add_argument(
        '--bf_ham', default=BF_HAM, metavar='TAG',
        help='Tag for bogofilter-flagged ham')
    parser.add_argument(
        '--user_spam', default=USER_SPAM, metavar='TAG',
        help='Tag for user-flagged spam')
    parser.add_argument(
        '--loglevel', default='INFO',
        help="""\
Severity threshold for logging; logging messages less severe are discarded.
For the allowed values see https://docs.python.org/3/howto/logging.html""")

    # `dest' is given so that argparse can name the missing argument when no
    # subcommand is supplied (required subparsers without `dest' fail with an
    # unhelpful TypeError on some Python versions).
    subparsers = parser.add_subparsers(
        title='Subcommands', required=True, dest='subcommand',
        description='Specify exactly one')

    parser_filter = subparsers.add_parser(
        'filter',
        help="""Filter mail.  By default (see `--skip'), filter out spam,
        then do general mail filtering (with afew) and then, depending on the
        new messages, notify.""")
    parser_filter.add_argument(
        '--skip', choices=FILTER_ACTIONS, nargs='+',
        help='Actions to skip', default=())
    # WISH: append a random suffix
    parser_filter.add_argument(
        '--tmp', metavar='TEMPORARY_TAG', default=TMP,
        help='Temporary tag for internal use; assumed by this script'
        ' not to preexist in the database')
    parser_filter.add_argument(
        'query', nargs='?', default='is:new',
        help='The Notmuch query whose result will be spam-filtered')
    parser_filter.set_defaults(func=filter_notify)

    parser_train = subparsers.add_parser(
        'train',
        help="""Train bogofilter.  We assume the user classified a message as
        spam if it is tagged `args.user_spam'; and he classified a message as
        ham if it has been read but not tagged `args.user_spam'.  Therefore
        we assume that: 1. Messages tagged `args.user_spam' are in fact
        spam.  2. Spammy read messages are tagged `args.user_spam'.  3.
        Messages tagged `args.bf_spam' are also tagged `args.user_spam',
        unless they are false positives.  A problematic scenario is when the
        user reads spam in webmail but forgets to tag it spam in Notmuch.""")
    parser_train.set_defaults(func=train)

    # parser_clean = subparsers.add_parser(
    #     'clean',
    #     help="Remove Bogofilter tags from all messages and remove "
    #     "`(~/.bogofilter)'")
    # parser_clean.set_defaults(func=clean)

    args = parser.parse_args(argv)
    # Requested actions.  Only the `filter' subcommand defines `--skip'; the
    # `and' short-circuits before `args.skip' is read for other subcommands.
    args.req = {a: args.func is filter_notify and a not in args.skip
                for a in FILTER_ACTIONS}
    return args


def main() -> None:
    """Run as script: set up logging, parse sys.argv, execute.

    Log to syslog (mail facility) and, with `--verbose', also to stderr.
    Any exception (including the SystemExit raised by argparse) is logged
    and re-raised.
    """
    # WISH Maybe change the type of socket.  See SysLogHandler documentation
    handler1 = handlers.SysLogHandler(
        address='/dev/log', facility=handlers.SysLogHandler.LOG_MAIL)
    formatter = logging.Formatter(
        '%(module)s[%(process)d].%(funcName)s: %(levelname)s: %(message)s')
    handler1.setFormatter(formatter)
    LOG.addHandler(handler1)

    try:
        args = parse_command_line()
    # https://www.python.org/dev/peps/pep-0008/#programming-recommendations
    except:  # noqa: E722
        LOG.exception(
            'Exception occurred while parsing command line ("%s")', sys.argv)
        raise

    try:
        if args.verbose:
            handler2 = logging.StreamHandler()
            handler2.setFormatter(formatter)
            LOG.addHandler(handler2)

        level_num = getattr(logging, args.loglevel.upper(), None)
        if not isinstance(level_num, int):
            raise ValueError('Invalid log level: %s' % args.loglevel)
        LOG.setLevel(level_num)

        if args.req['notify']:
            Notify.init(_NOTIFY_APP_NAME)

        args.func(args)
    except:  # noqa: E722
        alert('Exception occurred', 'Command line: "%s"; parsed: %s',
              sys.argv, args, fun=LOG.exception)
        raise


if __name__ == '__main__':
    main()

# Local Variables:
# ispell-local-dictionary: "en_US"
# End: