# pylint: disable=C0111,R0903 """RSS news ticker Fetches rss news items and shows these as a news ticker. Left-clicking will open the full story in a browser. New stories are highlighted. Parameters: * rss.feeds : Space-separated list of RSS URLs * rss.length : Maximum length of the module, default is 60 """ try: import feedparser DEPENDENCIES_OK = True except ImportError: DEPENDENCIES_OK = False import webbrowser import time import os import tempfile import logging import random import re import bumblebee.input import bumblebee.output import bumblebee.engine # pylint: disable=too-many-instance-attributes class Module(bumblebee.engine.Module): REFRESH_DELAY = 600 SCROLL_SPEED = 3 LAYOUT_STYLES_ITEMS = [[1,1,1],[3,3,2],[2,3,3],[3,2,3]] def __init__(self, engine, config): super(Module, self).__init__(engine, config, bumblebee.output.Widget(full_text=self.ticker_update if DEPENDENCIES_OK else self._show_error) ) # Use BBC newsfeed as demo: self._feeds = self.parameter('feeds', 'https://www.espn.com/espn/rss/news').split(" ") self._feeds_to_update = [] self._max_title_length = int(self.parameter("length", 60)) self._items = [] self._current_item = None self._ticker_offset = 0 self._pre_delay = 0 self._post_delay = 0 self._state = [] self._newspaper_filename = tempfile.mktemp('.html') self._last_refresh = 0 self._last_update = 0 engine.input.register_callback(self, button=bumblebee.input.LEFT_MOUSE, cmd=self._open) engine.input.register_callback(self, button=bumblebee.input.RIGHT_MOUSE, cmd=self._create_newspaper) def _open(self, _): if self._current_item: webbrowser.open(self._current_item['link']) def _check_for_image(self, entry): image = next(iter([l['href'] for l in entry['links'] if l['rel']=='enclosure']), None) if not image and 'media_content' in entry: try: media = sorted(entry['media_content'], key=lambda i: i['height'] if 'height' in i else 0, reverse=True) image = next(iter([i['url'] for i in media if i['medium']=='image']), None) except Exception: pass if not image: match = re.search(']*src\s*=["\']*([^\s^>^"^\']*)["\']*', entry['summary']) if match: image=match.group(1) return image if image else '' def _remove_tags(self, txt): return re.sub('<[^>]*>', '', txt) def _create_item(self, entry, url, feed): return {'title': self._remove_tags(entry['title'].replace('\n', ' ')), 'link': entry['link'], 'new': all([i['title'] != entry['title'] for i in self._items]), 'source': url, 'summary': self._remove_tags(entry['summary']), 'feed': feed, 'image': self._check_for_image(entry), 'published': time.mktime(entry.published_parsed) if hasattr(entry, 'published_parsed') else 0} def _update_items_from_feed(self, url): parser = feedparser.parse(url) new_items = [self._create_item(entry, url, parser['feed']['title']) for entry in parser['entries']] # Remove the previous items self._items = [i for i in self._items if i['source'] != url] # Add the new items self._items.extend(new_items) # Sort the items on publish date self._items.sort(key=lambda i: i['published'], reverse=True) def _check_for_refresh(self): if self._feeds_to_update: # Update one feed at a time to not overload this update cycle url = self._feeds_to_update.pop() self._update_items_from_feed(url) if not self._current_item: self._next_item() elif time.time()-self._last_refresh >= self.REFRESH_DELAY: # Populate the list with feeds to update self._feeds_to_update = self._feeds[:] # Update the refresh time self._last_refresh = time.time() def _next_item(self): self._ticker_offset = 0 self._pre_delay = 2 self._post_delay = 4 if not self._items: return # Index of the current element idx = self._items.index(self._current_item) if self._current_item in self._items else - 1 # First show new items, else show next new_items = [i for i in self._items if i['new']] self._current_item = next(iter(new_items), self._items[(idx+1) % len(self._items)]) def _check_scroll_done(self): # Check if the complete title has been shown if self._ticker_offset + self._max_title_length > len(self._current_item['title']): # Do not immediately show next item after scroll self._post_delay -= 1 if self._post_delay == 0: self._current_item['new'] = False # Mark the previous item as 'old' self._next_item() else: # Increase scroll position self._ticker_offset += self.SCROLL_SPEED def _show_error(self, _): return "Please install feedparser first" def ticker_update(self, _): # Only update the ticker once a second now = time.time() if now-self._last_update < 1: return self._response self._last_update = now self._check_for_refresh() # If no items were retrieved, return an empty string if not self._current_item: return " "*self._max_title_length # Prepare a substring of the item title self._response = self._current_item['title'][self._ticker_offset:self._ticker_offset+self._max_title_length] # Add spaces if too short self._response = self._response.ljust(self._max_title_length) # Do not immediately scroll if self._pre_delay > 0: # Change state during pre_delay for new items if self._current_item['new']: self._state = ['warning'] self._pre_delay -= 1 return self._response self._state = [] self._check_scroll_done() return self._response def update(self, widgets): pass def state(self, _): return self._state def _create_news_element(self, item, overlay_title): try: timestr = "" if item['published'] == 0 else str(time.ctime(item['published'])) except Exception as e: logging.error(str(e)) raise e element = "
"
for j in range(0, self.LAYOUT_STYLES_ITEMS[style][i]):
if newspaper_items:
section += self._create_news_element(newspaper_items[0], self.LAYOUT_STYLES_ITEMS[style][i]!=3)
del newspaper_items[0]
section += " | "
section += "