diff options
Diffstat (limited to 'tltxch.py')
-rwxr-xr-x | tltxch.py | 291 |
1 files changed, 291 insertions, 0 deletions
diff --git a/tltxch.py b/tltxch.py new file mode 100755 index 0000000..702543c --- /dev/null +++ b/tltxch.py @@ -0,0 +1,291 @@ +#!/usr/bin/env python3 + +import json, base64, sys, argparse, curses +from dataclasses import dataclass +from urllib.request import urlopen, Request +from urllib.error import URLError, HTTPError + +USE_SEXTANTS = False +ROWS, COLS = 25, 40 + +COLOR_MAP = { + "BLACK": curses.COLOR_BLACK, + "RED": curses.COLOR_RED, + "GREEN": curses.COLOR_GREEN, + "YELLOW": curses.COLOR_YELLOW, + "BLUE": curses.COLOR_BLUE, + "MAGENTA": curses.COLOR_MAGENTA, + "CYAN": curses.COLOR_CYAN, + "WHITE": curses.COLOR_WHITE, +} + +ANSI_FG = { + "BLACK": "\033[30m", + "RED": "\033[31m", + "GREEN": "\033[32m", + "YELLOW": "\033[33m", + "BLUE": "\033[34m", + "MAGENTA": "\033[35m", + "CYAN": "\033[36m", + "WHITE": "\033[37m", +} +ANSI_BG = { + "BLACK": "\033[40m", + "RED": "\033[41m", + "GREEN": "\033[42m", + "YELLOW": "\033[43m", + "BLUE": "\033[44m", + "MAGENTA": "\033[45m", + "CYAN": "\033[46m", + "WHITE": "\033[47m", +} +ANSI_RESET = "\033[0m" + +G0_DE = {0x5B: "Ä", 0x5C: "Ö", 0x5D: "Ü", 0x7B: "ä", 0x7C: "ö", 0x7D: "ü", 0x7E: "ß"} + + +@dataclass +class RowState: + alpha: str = "WHITE" + mosaic: bool = False + flash: bool = False + box: bool = False + size: str = "NORMAL" + contiguous: bool = True + hold_mosaic: bool = False + conceal: bool = False + bg: str = "BLACK" + held_code: int = 0x20 + held_contiguous: bool = True + + +COLOUR_INDEX = ["BLACK", "RED", "GREEN", "YELLOW", "BLUE", "MAGENTA", "CYAN", "WHITE"] + + +def is_set_after(d: int) -> bool: + return d in { + 0x00, + 0x01, + 0x02, + 0x03, + 0x04, + 0x05, + 0x06, + 0x07, + 0x08, + 0x0A, + 0x0B, + 0x0D, + 0x0E, + 0x0F, + 0x10, + 0x11, + 0x12, + 0x13, + 0x14, + 0x15, + 0x16, + 0x17, + 0x1B, + 0x1F, + } + + +def apply_control(d: int, st: RowState): + if 0x00 <= d <= 0x07: + st.alpha = COLOUR_INDEX[d] + st.mosaic = False + st.held_code = 0x20 + elif d == 0x08: + st.flash = True + elif d == 0x09: + st.flash = False + elif d == 0x0A: + st.box = False + elif d == 0x0B: + st.box = True + elif d == 0x0C: + if st.size != "NORMAL": + st.size = "NORMAL" + st.held_code = 0x20 + elif d == 0x0D: + st.size = "DH" + st.held_code = 0x20 + elif d == 0x0E: + st.size = "DW" + st.held_code = 0x20 + elif d == 0x0F: + st.size = "DS" + st.held_code = 0x20 + elif 0x10 <= d <= 0x17: + st.alpha = COLOUR_INDEX[d - 0x10] + st.mosaic = True + elif d == 0x18: + st.conceal = True + elif d == 0x19: + st.contiguous = True + elif d == 0x1A: + st.contiguous = False + elif d == 0x1B: + pass + elif d == 0x1C: + st.bg = "BLACK" + elif d == 0x1D: + st.bg = st.alpha + elif d == 0x1E: + st.hold_mosaic = True + elif d == 0x1F: + st.hold_mosaic = False + st.held_code = 0x20 + + +def g0_char(d: int, national="DE") -> str: + if national == "DE" and d in G0_DE: + return G0_DE[d] + if d == 0x7F: + return " " + return chr(d) if 0x20 <= d <= 0x7E else " " + + +def mosaic_glyph(d: int, contiguous: bool) -> str: + if not USE_SEXTANTS: + return "#" if d != 0x20 else " " + return " " if d != 0x20 else " " + + +def fetch_stream(channel: str, page: int): + url = f"https://api.teletext.ch/channels/{channel}/pages/{page}" + req = Request(url) + try: + with urlopen(req, timeout=10) as resp: + payload = json.loads( + resp.read().decode(resp.headers.get_content_charset() or "utf-8") + ) + ep1 = payload["subpages"][0]["ep1Info"]["data"]["ep1Format"] + stream = ( + base64.b64decode(ep1["header"]) + + base64.b64decode(ep1["content"]) + + base64.b64decode(ep1["commandRow"]) + ) + return stream, None + except (HTTPError, URLError) as e: + return None, f"Network error: {e}" + except Exception as e: + return None, f"Error: {e}" + + +def decode_stream(bytestream: bytes): + """Yield (row, col, char, fg_color, bg_color) tuples.""" + n = ROWS * COLS + if bytestream is None: + bytestream = bytes([0x20]) * n + elif len(bytestream) < n: + bytestream += bytes([0x20]) * (n - len(bytestream)) + elif len(bytestream) > n: + bytestream = bytestream[:n] + + idx = 0 + for row in range(ROWS): + if idx == 0 or idx == (ROWS - 1) * COLS: + idx += COLS + continue + st = RowState() + for col in range(COLS): + d = bytestream[idx] & 0x7F + idx += 1 + if d < 0x20: + if is_set_after(d): + if st.mosaic and st.hold_mosaic and (st.held_code & 0x40): + ch = mosaic_glyph(st.held_code, st.held_contiguous) + else: + ch = " " + yield row, col, ch, st.alpha, st.bg + apply_control(d, st) + if not is_set_after(d): + yield row, col, " ", st.alpha, st.bg + continue + if st.mosaic: + if d != 0x20 and (d & 0x40): + st.held_code = d + st.held_contiguous = st.contiguous + ch = mosaic_glyph(d, st.contiguous) + else: + ch = " " if st.conceal else g0_char(d, national="DE") + yield row, col, ch, st.alpha, st.bg + + +def render_ansi(bytestream, error=None): + for row, col, ch, fg, bg in decode_stream(bytestream): + if col == 0: + sys.stdout.write(ANSI_FG[fg] + ANSI_BG[bg]) + sys.stdout.write(ANSI_FG[fg] + ANSI_BG[bg] + ch) + if col == COLS - 1: + sys.stdout.write(ANSI_RESET + "\n") + if error: + sys.stderr.write(f"[ERROR] {error}\n") + + +def tui_mode(channel, page): + def _main(stdscr): + curses.curs_set(0) + curses.start_color() + color_pairs = {} + pid = 1 + for fg_name, fg_val in COLOR_MAP.items(): + for bg_name, bg_val in COLOR_MAP.items(): + curses.init_pair(pid, fg_val, bg_val) + color_pairs[(fg_name, bg_name)] = pid + pid += 1 + + current_page = page + status_msg = "" + while True: + stream, err = fetch_stream(channel, current_page) + status_msg = err or f"Loaded page {current_page}" + stdscr.clear() + stdscr.addstr( + 0, + 0, + f"{channel.upper()} Page {current_page} [n]ext [p]rev [g]oto [q]uit", + curses.A_REVERSE, + ) + for row, col, ch, fg, bg in decode_stream(stream): + stdscr.addstr( + row + 1, col, ch, curses.color_pair(color_pairs[(fg, bg)]) + ) + stdscr.addstr(ROWS + 1, 0, status_msg[: COLS - 1], curses.A_REVERSE) + stdscr.refresh() + + key = stdscr.getch() + if key in (ord("q"), 27): + break + elif key == ord("n"): + current_page += 1 + elif key == ord("p"): + current_page -= 1 + elif key == ord("g"): + curses.echo() + stdscr.addstr(ROWS + 2, 0, "Goto page: ") + p_str = stdscr.getstr(ROWS + 2, 11, 4).decode().strip() + curses.noecho() + try: + new_page = int(p_str) + current_page = new_page + except ValueError: + pass + + curses.wrapper(_main) + + +if __name__ == "__main__": + ap = argparse.ArgumentParser(description="Swiss Teletext Client") + ap.add_argument("--channel", default="srf1", choices=["srf1", "srfzwei", "srfinfo"]) + ap.add_argument("--page", default=100, type=int) + ap.add_argument("--tui", action="store_true", help="TUI") + args = ap.parse_args() + + if args.tui: + tui_mode(args.channel, args.page) + else: + stream, err = fetch_stream(args.channel, args.page) + render_ansi(stream, err) |