""" Manage twts. """ import collections import configparser import os import os.path import shelve import twtxt.models import twtxt.parser import twtxthash import twtxtparser import xdg class TwtxtManager: def __init__(self, config): self.config = config self._cache = None self._data = None self.conversation_tree = [] self.missing_conversation_root_count = 0 self.twts_count = 0 self.unread_twts_count = 0 self.read_twts_count = 0 self.show_read_conversations = False self.show_unread_missing_conversation_roots = False def load_twts(self): all_tweets = [] # map a subject string to a list of tweets which replied to a hash by subject replies_by_subject = collections.defaultdict(list) # map a hash to a tweet known_tweet_hashes = set() self.max_author_nick_width = 0 self.read_twts_count = 0 self.unread_twts_count = 0 MAX_TWEETS_LIMIT = 2000 self._cache = shelve.open(self.config.cachefile) self.own_source = twtxt.models.Source(nick=self.config.nick, url=self.config.twturl) with open(self.config.twtfile, 'r', encoding='utf-8') as fd: own_twts = twtxt.parser.parse_tweets(fd.readlines(), self.own_source) self._cache[self.own_source.url] = {'tweets': own_twts} os.makedirs(os.path.dirname(self.config.datafile), exist_ok=True) self._data = shelve.open(self.config.datafile) for url, feed in self._cache.items(): if url == "last_update": continue tweets = feed['tweets'][-MAX_TWEETS_LIMIT:] if not tweets: continue self.max_author_nick_width = max(self.max_author_nick_width, len(tweets[0].source.nick)) all_tweets.extend(tweets) for twt in tweets: self.enhance_twt(twt) replies_by_subject[twt.subject].append(twt) known_tweet_hashes.add(twt.hash) known_tweet_hashes.add(twt.old_hash) entry = self._data.get(twt.hash) twt.read = bool(entry and entry.get("read", False)) if not twt.read: entry = self._data.get(twt.old_hash) twt.read = bool(entry and entry.get("read", False)) if twt.read: self.read_twts_count += 1 else: self.unread_twts_count += 1 for tweets in replies_by_subject.values(): tweets.sort() all_tweets.sort() #all_tweets = all_tweets[-MAX_TWEETS_LIMIT:] # TODO Add cyclic dependency check!! # list of all top level tweets conversation_tree = replies_by_subject.pop(None, []) # build conversation trees for twt in all_tweets: twt.replies.extend(replies_by_subject.get(twt.hash, [])) twt.replies.extend(replies_by_subject.get(twt.old_hash, [])) self.missing_conversation_root_count = 0 # ensure that every conversation has at least a fake root tweet and thus is reachable for hash, replies in replies_by_subject.items(): if hash not in known_tweet_hashes: if not replies: print("WARNING: referenced tweet '%s' has no replies, WTF?!" % hash) created_at = datetime.datetime.now() else: created_at = replies[0].created_at twt = twtxt.models.Tweet(created_at=created_at, text=" ", # constructor raises on empty string, will be reset down below source=twtxt.models.Source(nick="UNKNOWN", url=None)) twt.text = "" twt.hash = hash twt.old_hash = "?" 
                twt.subject = None
                twt.tokens = []
                twt.replies = replies
                entry = self._data.get(hash)
                twt.read = bool(entry and entry.get("read", False))
                twt.missing = True
                conversation_tree.append(twt)
                self.missing_conversation_root_count += 1

        conversation_tree.sort()
        self.twts_count = len(all_tweets)

        if self.show_read_conversations:
            self.conversation_tree = conversation_tree
        else:
            self.conversation_tree = []
            for twt in conversation_tree:
                if self.show_unread_missing_conversation_roots:
                    consider_twt_read = twt.read
                else:
                    consider_twt_read = (hasattr(twt, "missing") and twt.missing) or twt.read
                if not consider_twt_read or self._has_unread_replies(twt):
                    self.conversation_tree.append(twt)

    def enhance_twt(self, twt):
        """ Parse the text of the given twt, store all its enhanced information
        (such as hashes, tokens, subject, replies) in the twt and return it for
        further processing. """
        twt.hash = twtxthash.create_hash(twt)
        twt.old_hash = twtxthash.create_old_hash(twt)
        if "\t" in twt.text:
            twt.text = twt.text.replace("\t", " ")
        twt.tokens = list(twtxtparser.parse_twt_text(twt.text))
        twt.subject = next((token.hash for token in twt.tokens
                            if isinstance(token, twtxtparser.SubjectHash)), None)
        twt.replies = []
        twt.read = False
        return twt

    def toggle_read(self, twt):
        """ Toggle the read status of the given twt and sync it to disk. """
        twt.read = not twt.read
        entry = self._data.get(twt.hash, {})
        entry["read"] = twt.read
        self._data[twt.hash] = entry
        self._data.sync()
        self.read_twts_count += (1 if twt.read else -1)
        self.unread_twts_count -= (1 if twt.read else -1)

    def resolve_nick_by_url_from_cache(self, url):
        """ Resolve the given source URL to a nick using the cache.
        TODO: We could also use our config instead! This would even allow us to
        resolve our own nick which might be different from the twt's one. """
        feed = self._cache.get(url)
        if feed:
            # every tweet of a feed shares the same source, so the first one is enough
            for twt in feed['tweets']:
                return twt.source.nick
        return None

    def is_following(self, nick, url):
        if url:
            return url in self._cache
        if nick:
            for feed in self._cache.values():
                if not isinstance(feed, dict):
                    continue
                tweets = feed['tweets']
                if tweets and tweets[0].source.nick == nick:
                    return True
        return False

    def _has_unread_replies(self, twt):
        """ Determine whether there are any unread replies in the given
        conversation without taking the read status of the given twt itself
        into account. """
        for reply in twt.replies:
            if not reply.read or self._has_unread_replies(reply):
                return True
        return False

    def publish_twt(self, created_at, text):
        """ Publish the given text at the specified timestamp by writing it to
        the local twtxt.txt file. """
        with open(self.config.twtfile, 'a', encoding='utf-8') as fd:
            fd.write("%s\t%s\n" % (created_at.isoformat(), text))


class Config:
    _CONFIG_DIR = os.path.join(xdg.xdg_config_home(), 'twtxt')
    _CONFIG_FILE = os.path.join(_CONFIG_DIR, 'config')
    _CACHE_DIR = os.path.join(xdg.xdg_cache_home(), 'twtxt')
    # The original twtxt implementation actually places its 'cache' file in the
    # config directory, which is wrong. The XDG Base Directory Specification
    # defines a dedicated cache directory. To be backwards-compatible, we cannot
    # fix this, though. :-(
    _CACHE_FILE = os.path.join(_CONFIG_DIR, 'cache')
    _DATA_DIR = os.path.join(xdg.xdg_data_home(), 'twtxt')
    _DATA_FILE = os.path.join(_DATA_DIR, 'data')

    def __init__(self, cfg):
        self._cfg = cfg

    @classmethod
    def load_file(cls):
        if not os.path.exists(Config._CONFIG_FILE):
            raise ValueError("Config file '%s' does not exist."
                             % Config._CONFIG_FILE)
        cfg = configparser.ConfigParser()
        cfg.read(Config._CONFIG_FILE)
        return cls(cfg)

    @property
    def nick(self):
        return self._cfg.get("twtxt", "nick",
                             fallback=os.environ.get("USER", "").lower())

    @property
    def twtfile(self):
        return self._make_abs(Config._CONFIG_DIR,
                              self._cfg.get("twtxt", "twtfile", fallback="twtxt.txt"))

    @property
    def twturl(self):
        return self._cfg.get("twtxt", "twturl", fallback=None)

    @property
    def cachefile(self):
        return self._make_abs(Config._CONFIG_DIR,
                              self._cfg.get("twtxt", "cachefile", fallback="cache"))

    @property
    def datafile(self):
        return self._make_abs(Config._CONFIG_DIR,
                              self._cfg.get("twtxt", "datafile", fallback=Config._DATA_FILE))

    def _make_abs(self, base_path, path):
        """ Expand user homes and environment variables in the given `path` and
        make it absolute with the given `base_path` in case it is relative. """
        path = os.path.expanduser(os.path.expandvars(path))
        if os.path.isabs(path):
            return path
        return os.path.join(base_path, path)

    def colors(self, name):
        return self._cfg.get("colors", name, fallback=None)

    @property
    def nick_colors(self):
        if self._cfg.has_section("nick-colors"):
            return self._cfg.items("nick-colors")
        return []
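

# Minimal usage sketch (illustrative only, not part of the module's public API):
# it shows how a client might wire Config and TwtxtManager together. It assumes
# a config file at $XDG_CONFIG_HOME/twtxt/config roughly of the form
#
#   [twtxt]
#   nick = alice                             ; example value, not a real default
#   twtfile = twtxt.txt
#   twturl = https://example.org/twtxt.txt   ; placeholder URL
#
# and that the cache already contains fetched feeds.
if __name__ == "__main__":
    config = Config.load_file()
    manager = TwtxtManager(config)
    manager.load_twts()

    print("%d twts, %d unread" % (manager.twts_count, manager.unread_twts_count))

    # walk the conversation tree one level deep
    for twt in manager.conversation_tree:
        print("%s: %s" % (twt.source.nick, twt.text))
        for reply in twt.replies:
            print("    %s: %s" % (reply.source.nick, reply.text))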