aboutsummaryrefslogtreecommitdiff
path: root/tltxch.py
diff options
context:
space:
mode:
Diffstat (limited to 'tltxch.py')
-rwxr-xr-xtltxch.py291
1 files changed, 291 insertions, 0 deletions
diff --git a/tltxch.py b/tltxch.py
new file mode 100755
index 0000000..702543c
--- /dev/null
+++ b/tltxch.py
@@ -0,0 +1,291 @@
+#!/usr/bin/env python3
+
+import json, base64, sys, argparse, curses
+from dataclasses import dataclass
+from urllib.request import urlopen, Request
+from urllib.error import URLError, HTTPError
+
+USE_SEXTANTS = False
+ROWS, COLS = 25, 40
+
+COLOR_MAP = {
+ "BLACK": curses.COLOR_BLACK,
+ "RED": curses.COLOR_RED,
+ "GREEN": curses.COLOR_GREEN,
+ "YELLOW": curses.COLOR_YELLOW,
+ "BLUE": curses.COLOR_BLUE,
+ "MAGENTA": curses.COLOR_MAGENTA,
+ "CYAN": curses.COLOR_CYAN,
+ "WHITE": curses.COLOR_WHITE,
+}
+
+ANSI_FG = {
+ "BLACK": "\033[30m",
+ "RED": "\033[31m",
+ "GREEN": "\033[32m",
+ "YELLOW": "\033[33m",
+ "BLUE": "\033[34m",
+ "MAGENTA": "\033[35m",
+ "CYAN": "\033[36m",
+ "WHITE": "\033[37m",
+}
+ANSI_BG = {
+ "BLACK": "\033[40m",
+ "RED": "\033[41m",
+ "GREEN": "\033[42m",
+ "YELLOW": "\033[43m",
+ "BLUE": "\033[44m",
+ "MAGENTA": "\033[45m",
+ "CYAN": "\033[46m",
+ "WHITE": "\033[47m",
+}
+ANSI_RESET = "\033[0m"
+
+G0_DE = {0x5B: "Ä", 0x5C: "Ö", 0x5D: "Ü", 0x7B: "ä", 0x7C: "ö", 0x7D: "ü", 0x7E: "ß"}
+
+
+@dataclass
+class RowState:
+ alpha: str = "WHITE"
+ mosaic: bool = False
+ flash: bool = False
+ box: bool = False
+ size: str = "NORMAL"
+ contiguous: bool = True
+ hold_mosaic: bool = False
+ conceal: bool = False
+ bg: str = "BLACK"
+ held_code: int = 0x20
+ held_contiguous: bool = True
+
+
+COLOUR_INDEX = ["BLACK", "RED", "GREEN", "YELLOW", "BLUE", "MAGENTA", "CYAN", "WHITE"]
+
+
+def is_set_after(d: int) -> bool:
+ return d in {
+ 0x00,
+ 0x01,
+ 0x02,
+ 0x03,
+ 0x04,
+ 0x05,
+ 0x06,
+ 0x07,
+ 0x08,
+ 0x0A,
+ 0x0B,
+ 0x0D,
+ 0x0E,
+ 0x0F,
+ 0x10,
+ 0x11,
+ 0x12,
+ 0x13,
+ 0x14,
+ 0x15,
+ 0x16,
+ 0x17,
+ 0x1B,
+ 0x1F,
+ }
+
+
+def apply_control(d: int, st: RowState):
+ if 0x00 <= d <= 0x07:
+ st.alpha = COLOUR_INDEX[d]
+ st.mosaic = False
+ st.held_code = 0x20
+ elif d == 0x08:
+ st.flash = True
+ elif d == 0x09:
+ st.flash = False
+ elif d == 0x0A:
+ st.box = False
+ elif d == 0x0B:
+ st.box = True
+ elif d == 0x0C:
+ if st.size != "NORMAL":
+ st.size = "NORMAL"
+ st.held_code = 0x20
+ elif d == 0x0D:
+ st.size = "DH"
+ st.held_code = 0x20
+ elif d == 0x0E:
+ st.size = "DW"
+ st.held_code = 0x20
+ elif d == 0x0F:
+ st.size = "DS"
+ st.held_code = 0x20
+ elif 0x10 <= d <= 0x17:
+ st.alpha = COLOUR_INDEX[d - 0x10]
+ st.mosaic = True
+ elif d == 0x18:
+ st.conceal = True
+ elif d == 0x19:
+ st.contiguous = True
+ elif d == 0x1A:
+ st.contiguous = False
+ elif d == 0x1B:
+ pass
+ elif d == 0x1C:
+ st.bg = "BLACK"
+ elif d == 0x1D:
+ st.bg = st.alpha
+ elif d == 0x1E:
+ st.hold_mosaic = True
+ elif d == 0x1F:
+ st.hold_mosaic = False
+ st.held_code = 0x20
+
+
+def g0_char(d: int, national="DE") -> str:
+ if national == "DE" and d in G0_DE:
+ return G0_DE[d]
+ if d == 0x7F:
+ return " "
+ return chr(d) if 0x20 <= d <= 0x7E else " "
+
+
+def mosaic_glyph(d: int, contiguous: bool) -> str:
+ if not USE_SEXTANTS:
+ return "#" if d != 0x20 else " "
+ return " " if d != 0x20 else " "
+
+
+def fetch_stream(channel: str, page: int):
+ url = f"https://api.teletext.ch/channels/{channel}/pages/{page}"
+ req = Request(url)
+ try:
+ with urlopen(req, timeout=10) as resp:
+ payload = json.loads(
+ resp.read().decode(resp.headers.get_content_charset() or "utf-8")
+ )
+ ep1 = payload["subpages"][0]["ep1Info"]["data"]["ep1Format"]
+ stream = (
+ base64.b64decode(ep1["header"])
+ + base64.b64decode(ep1["content"])
+ + base64.b64decode(ep1["commandRow"])
+ )
+ return stream, None
+ except (HTTPError, URLError) as e:
+ return None, f"Network error: {e}"
+ except Exception as e:
+ return None, f"Error: {e}"
+
+
+def decode_stream(bytestream: bytes):
+ """Yield (row, col, char, fg_color, bg_color) tuples."""
+ n = ROWS * COLS
+ if bytestream is None:
+ bytestream = bytes([0x20]) * n
+ elif len(bytestream) < n:
+ bytestream += bytes([0x20]) * (n - len(bytestream))
+ elif len(bytestream) > n:
+ bytestream = bytestream[:n]
+
+ idx = 0
+ for row in range(ROWS):
+ if idx == 0 or idx == (ROWS - 1) * COLS:
+ idx += COLS
+ continue
+ st = RowState()
+ for col in range(COLS):
+ d = bytestream[idx] & 0x7F
+ idx += 1
+ if d < 0x20:
+ if is_set_after(d):
+ if st.mosaic and st.hold_mosaic and (st.held_code & 0x40):
+ ch = mosaic_glyph(st.held_code, st.held_contiguous)
+ else:
+ ch = " "
+ yield row, col, ch, st.alpha, st.bg
+ apply_control(d, st)
+ if not is_set_after(d):
+ yield row, col, " ", st.alpha, st.bg
+ continue
+ if st.mosaic:
+ if d != 0x20 and (d & 0x40):
+ st.held_code = d
+ st.held_contiguous = st.contiguous
+ ch = mosaic_glyph(d, st.contiguous)
+ else:
+ ch = " " if st.conceal else g0_char(d, national="DE")
+ yield row, col, ch, st.alpha, st.bg
+
+
+def render_ansi(bytestream, error=None):
+ for row, col, ch, fg, bg in decode_stream(bytestream):
+ if col == 0:
+ sys.stdout.write(ANSI_FG[fg] + ANSI_BG[bg])
+ sys.stdout.write(ANSI_FG[fg] + ANSI_BG[bg] + ch)
+ if col == COLS - 1:
+ sys.stdout.write(ANSI_RESET + "\n")
+ if error:
+ sys.stderr.write(f"[ERROR] {error}\n")
+
+
+def tui_mode(channel, page):
+ def _main(stdscr):
+ curses.curs_set(0)
+ curses.start_color()
+ color_pairs = {}
+ pid = 1
+ for fg_name, fg_val in COLOR_MAP.items():
+ for bg_name, bg_val in COLOR_MAP.items():
+ curses.init_pair(pid, fg_val, bg_val)
+ color_pairs[(fg_name, bg_name)] = pid
+ pid += 1
+
+ current_page = page
+ status_msg = ""
+ while True:
+ stream, err = fetch_stream(channel, current_page)
+ status_msg = err or f"Loaded page {current_page}"
+ stdscr.clear()
+ stdscr.addstr(
+ 0,
+ 0,
+ f"{channel.upper()} Page {current_page} [n]ext [p]rev [g]oto [q]uit",
+ curses.A_REVERSE,
+ )
+ for row, col, ch, fg, bg in decode_stream(stream):
+ stdscr.addstr(
+ row + 1, col, ch, curses.color_pair(color_pairs[(fg, bg)])
+ )
+ stdscr.addstr(ROWS + 1, 0, status_msg[: COLS - 1], curses.A_REVERSE)
+ stdscr.refresh()
+
+ key = stdscr.getch()
+ if key in (ord("q"), 27):
+ break
+ elif key == ord("n"):
+ current_page += 1
+ elif key == ord("p"):
+ current_page -= 1
+ elif key == ord("g"):
+ curses.echo()
+ stdscr.addstr(ROWS + 2, 0, "Goto page: ")
+ p_str = stdscr.getstr(ROWS + 2, 11, 4).decode().strip()
+ curses.noecho()
+ try:
+ new_page = int(p_str)
+ current_page = new_page
+ except ValueError:
+ pass
+
+ curses.wrapper(_main)
+
+
+if __name__ == "__main__":
+ ap = argparse.ArgumentParser(description="Swiss Teletext Client")
+ ap.add_argument("--channel", default="srf1", choices=["srf1", "srfzwei", "srfinfo"])
+ ap.add_argument("--page", default=100, type=int)
+ ap.add_argument("--tui", action="store_true", help="TUI")
+ args = ap.parse_args()
+
+ if args.tui:
+ tui_mode(args.channel, args.page)
+ else:
+ stream, err = fetch_stream(args.channel, args.page)
+ render_ansi(stream, err)