diff --git a/modules/contrib/rss.py b/modules/contrib/rss.py new file mode 100644 index 0000000..2a27d27 --- /dev/null +++ b/modules/contrib/rss.py @@ -0,0 +1,312 @@ +# pylint: disable=C0111,R0903 + +"""RSS news ticker + +Fetches rss news items and shows these as a news ticker. +Left-clicking will open the full story in a browser. +New stories are highlighted. + +Parameters: + * rss.feeds : Space-separated list of RSS URLs + * rss.length : Maximum length of the module, default is 60 +""" + +try: + import feedparser + DEPENDENCIES_OK = True +except ImportError: + DEPENDENCIES_OK = False + +import webbrowser +import time +import os +import tempfile +import logging +import random +import re +import json + +import bumblebee.input +import bumblebee.output +import bumblebee.engine + + +# pylint: disable=too-many-instance-attributes +class Module(bumblebee.engine.Module): + REFRESH_DELAY = 600 + SCROLL_SPEED = 3 + LAYOUT_STYLES_ITEMS = [[1,1,1],[3,3,2],[2,3,3],[3,2,3]] + HISTORY_FILENAME = ".config/i3/rss.hist" + + def __init__(self, engine, config): + super(Module, self).__init__(engine, config, + bumblebee.output.Widget(full_text=self.ticker_update if DEPENDENCIES_OK else self._show_error) + ) + # Use BBC newsfeed as demo: + self._feeds = self.parameter('feeds', 'https://www.espn.com/espn/rss/news').split(" ") + self._feeds_to_update = [] + self._response = "" + + self._max_title_length = int(self.parameter("length", 60)) + + self._items = [] + self._current_item = None + + self._ticker_offset = 0 + self._pre_delay = 0 + self._post_delay = 0 + + self._state = [] + + self._newspaper_filename = tempfile.mktemp('.html') + + self._last_refresh = 0 + self._last_update = 0 + + engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, cmd=self._open) + engine.input.register_callback(self, button=bumblebee.input.RIGHT_MOUSE, cmd=self._create_newspaper) + + self._history = {'ticker': {}, 'newspaper': {}} + self._load_history() + + def _load_history(self): + if os.path.isfile(self.HISTORY_FILENAME): + self._history = json.loads(open(self.HISTORY_FILENAME, "r").read()) + + def _update_history(self, group): + sources = set([i['source'] for i in self._items]) + self._history[group] = dict([[s, [i['title'] for i in self._items if i['source'] == s]] for s in sources]) + + def _save_history(self): + if not os.path.exists(os.path.dirname(self.HISTORY_FILENAME)): + os.makedirs(os.path.dirname(self.HISTORY_FILENAME)) + open(self.HISTORY_FILENAME, "w").write(json.dumps(self._history)) + + def _check_history(self, items, group): + for i in items: + i['new'] = not (i['source'] in self._history[group] and i['title'] in self._history[group][i['source']]) + + def _open(self, _): + if self._current_item: + webbrowser.open(self._current_item['link']) + + def _check_for_image(self, entry): + image = next(iter([l['href'] for l in entry['links'] if l['rel'] == 'enclosure']), None) + if not image and 'media_content' in entry: + try: + media = sorted(entry['media_content'], key=lambda i: i['height'] if 'height' in i else 0, reverse=True) + image = next(iter([i['url'] for i in media if i['medium'] == 'image']), None) + except Exception: + pass + if not image: + match = re.search(r']*src\s*=["\']*([^\s^>^"^\']*)["\']*', entry['summary']) + if match: + image = match.group(1) + return image if image else '' + + def _remove_tags(self, txt): + return re.sub('<[^>]*>', '', txt) + + def _create_item(self, entry, url, feed): + return {'title': self._remove_tags(entry['title'].replace('\n', ' ')), + 'link': entry['link'], + 'new': True, + 'source': url, + 'summary': self._remove_tags(entry['summary']), + 'feed': feed, + 'image': self._check_for_image(entry), + 'published': time.mktime(entry.published_parsed) if hasattr(entry, 'published_parsed') else 0} + + def _update_items_from_feed(self, url): + parser = feedparser.parse(url) + new_items = [self._create_item(entry, url, parser['feed']['title']) for entry in parser['entries']] + # Check history + self._check_history(new_items, 'ticker') + # Remove the previous items + self._items = [i for i in self._items if i['source'] != url] + # Add the new items + self._items.extend(new_items) + # Sort the items on publish date + self._items.sort(key=lambda i: i['published'], reverse=True) + + def _check_for_refresh(self): + if self._feeds_to_update: + # Update one feed at a time to not overload this update cycle + url = self._feeds_to_update.pop() + self._update_items_from_feed(url) + + if not self._feeds_to_update: + self._update_history('ticker') + self._save_history() + + if not self._current_item: + self._next_item() + elif time.time()-self._last_refresh >= self.REFRESH_DELAY: + # Populate the list with feeds to update + self._feeds_to_update = self._feeds[:] + # Update the refresh time + self._last_refresh = time.time() + + def _next_item(self): + self._ticker_offset = 0 + self._pre_delay = 2 + self._post_delay = 4 + + if not self._items: + return + + # Index of the current element + idx = self._items.index(self._current_item) if self._current_item in self._items else - 1 + + # First show new items, else show next + new_items = [i for i in self._items if i['new']] + self._current_item = next(iter(new_items), self._items[(idx+1) % len(self._items)]) + + def _check_scroll_done(self): + # Check if the complete title has been shown + if self._ticker_offset + self._max_title_length > len(self._current_item['title']): + # Do not immediately show next item after scroll + self._post_delay -= 1 + if self._post_delay == 0: + self._current_item['new'] = False + # Mark the previous item as 'old' + self._next_item() + else: + # Increase scroll position + self._ticker_offset += self.SCROLL_SPEED + + def _show_error(self, _): + return "Please install feedparser first" + + def ticker_update(self, _): + # Only update the ticker once a second + now = time.time() + if now-self._last_update < 1: + return self._response + + self._last_update = now + + self._check_for_refresh() + + # If no items were retrieved, return an empty string + if not self._current_item: + return " "*self._max_title_length + + # Prepare a substring of the item title + self._response = self._current_item['title'][self._ticker_offset:self._ticker_offset+self._max_title_length] + # Add spaces if too short + self._response = self._response.ljust(self._max_title_length) + + # Do not immediately scroll + if self._pre_delay > 0: + # Change state during pre_delay for new items + if self._current_item['new']: + self._state = ['warning'] + self._pre_delay -= 1 + return self._response + + self._state = [] + self._check_scroll_done() + + return self._response + + def update(self, widgets): + pass + + def state(self, _): + return self._state + + def _create_news_element(self, item, overlay_title): + try: + timestr = "" if item['published'] == 0 else str(time.ctime(item['published'])) + except Exception as exc: + logging.error(str(exc)) + raise e + element = "
"
+ for _ in range(0, self.LAYOUT_STYLES_ITEMS[style][i]):
+ if newspaper_items:
+ section += self._create_news_element(newspaper_items[0], self.LAYOUT_STYLES_ITEMS[style][i] != 3)
+ del newspaper_items[0]
+ section += " | "
+ section += "