1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-11-10 06:46:15 +01:00

Add output.pipe_to_pager() that pipes lines to a pager without creating an intermediate temporary file

This commit is contained in:
Daniel Mach 2024-04-16 16:56:00 +02:00
parent d1111e23a1
commit dc7efaa6de
2 changed files with 47 additions and 12 deletions

View File

@ -1,6 +1,7 @@
from .key_value_table import KeyValueTable from .key_value_table import KeyValueTable
from .input import get_user_input from .input import get_user_input
from .output import get_default_pager from .output import get_default_pager
from .output import pipe_to_pager
from .output import print_msg from .output import print_msg
from .output import run_pager from .output import run_pager
from .output import sanitize_text from .output import sanitize_text

View File

@ -2,9 +2,11 @@ import os
import platform import platform
import re import re
import shlex import shlex
import subprocess
import sys import sys
import tempfile import tempfile
from typing import Dict from typing import Dict
from typing import List
from typing import Optional from typing import Optional
from typing import TextIO from typing import TextIO
from typing import Union from typing import Union
@ -170,6 +172,25 @@ def get_default_pager():
return 'more' return 'more'
def get_pager():
"""
Return (pager, env) where
``pager`` is a list with parsed pager command
``env`` is copy of os.environ() with added variables specific to the pager
"""
env = os.environ.copy()
pager = os.getenv("PAGER", default="").strip()
pager = pager or get_default_pager()
# LESS env is not always set and we need -R to display escape sequences properly
less_opts = os.getenv("LESS", default="")
if "-R" not in less_opts:
less_opts += " -R"
env["LESS"] = less_opts
return shlex.split(pager), env
def run_pager(message: Union[bytes, str], tmp_suffix: str = ""): def run_pager(message: Union[bytes, str], tmp_suffix: str = ""):
from ..core import run_external from ..core import run_external
@ -185,16 +206,29 @@ def run_pager(message: Union[bytes, str], tmp_suffix: str = ""):
safe_write(tmpfile, message) safe_write(tmpfile, message)
tmpfile.flush() tmpfile.flush()
env = os.environ.copy() pager, env = get_pager()
cmd = pager + [tmpfile.name]
pager = os.getenv("PAGER", default="").strip()
pager = pager or get_default_pager()
# LESS env is not always set and we need -R to display escape sequences properly
less_opts = os.getenv("LESS", default="")
if "-R" not in less_opts:
less_opts += " -R"
env["LESS"] = less_opts
cmd = shlex.split(pager) + [tmpfile.name]
run_external(*cmd, env=env) run_external(*cmd, env=env)
def pipe_to_pager(lines: Union[List[bytes], List[str]], *, add_newlines=False):
"""
Pipe ``lines`` to the pager.
If running in a non-interactive terminal, print the data instead.
Add a newline after each line if ``add_newlines`` is ``True``.
"""
if not tty.IS_INTERACTIVE:
for line in lines:
safe_write(sys.stdout, line, add_newline=add_newlines)
return
pager, env = get_pager()
with subprocess.Popen(pager, stdin=subprocess.PIPE, encoding="utf-8", env=env) as proc:
try:
for line in lines:
safe_write(proc.stdin, line, add_newline=add_newlines)
proc.stdin.flush()
proc.stdin.close()
except BrokenPipeError:
pass
proc.wait()