|
|
|
@@ -0,0 +1,335 @@
|
|
|
|
|
"""Small standard-library web service for publicpaste."""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
from http import HTTPStatus
|
|
|
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
|
|
from .core import PasteError, paste_text
|
|
|
|
|
|
|
|
|
|
MAX_REQUEST_BYTES = 1024 * 1024
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
|
|
|
class Settings:
|
|
|
|
|
host: str
|
|
|
|
|
port: int
|
|
|
|
|
data_dir: Path
|
|
|
|
|
timeout: float
|
|
|
|
|
overall_timeout: float
|
|
|
|
|
verify: bool
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _env_float(name: str, default: float) -> float:
|
|
|
|
|
value = os.environ.get(name)
|
|
|
|
|
if value is None or value == "":
|
|
|
|
|
return default
|
|
|
|
|
try:
|
|
|
|
|
parsed = float(value)
|
|
|
|
|
except ValueError as exc:
|
|
|
|
|
raise ValueError(f"{name} must be a number") from exc
|
|
|
|
|
if parsed <= 0:
|
|
|
|
|
raise ValueError(f"{name} must be positive")
|
|
|
|
|
return parsed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _env_int(name: str, default: int) -> int:
|
|
|
|
|
value = os.environ.get(name)
|
|
|
|
|
if value is None or value == "":
|
|
|
|
|
return default
|
|
|
|
|
try:
|
|
|
|
|
parsed = int(value)
|
|
|
|
|
except ValueError as exc:
|
|
|
|
|
raise ValueError(f"{name} must be an integer") from exc
|
|
|
|
|
if parsed <= 0:
|
|
|
|
|
raise ValueError(f"{name} must be positive")
|
|
|
|
|
return parsed
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _env_bool(name: str, default: bool) -> bool:
|
|
|
|
|
value = os.environ.get(name)
|
|
|
|
|
if value is None or value == "":
|
|
|
|
|
return default
|
|
|
|
|
return value.strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_settings() -> Settings:
|
|
|
|
|
return Settings(
|
|
|
|
|
host=os.environ.get("PUBLICPASTE_HOST") or "0.0.0.0",
|
|
|
|
|
port=_env_int("PUBLICPASTE_PORT", 12072),
|
|
|
|
|
data_dir=Path(os.environ.get("PUBLICPASTE_DATA_DIR") or "/data"),
|
|
|
|
|
timeout=_env_float("PUBLICPASTE_TIMEOUT", 8.0),
|
|
|
|
|
overall_timeout=_env_float("PUBLICPASTE_OVERALL_TIMEOUT", 12.0),
|
|
|
|
|
verify=_env_bool("PUBLICPASTE_VERIFY", False),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PublicPasteServer(ThreadingHTTPServer):
|
|
|
|
|
settings: Settings
|
|
|
|
|
log_lock: threading.Lock
|
|
|
|
|
|
|
|
|
|
def __init__(self, server_address: tuple[str, int], settings: Settings) -> None:
|
|
|
|
|
super().__init__(server_address, PublicPasteHandler)
|
|
|
|
|
self.settings = settings
|
|
|
|
|
self.log_lock = threading.Lock()
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def log_path(self) -> Path:
|
|
|
|
|
return self.settings.data_dir / "publicpaste.log"
|
|
|
|
|
|
|
|
|
|
def append_api_log(self, entry: dict[str, Any]) -> None:
|
|
|
|
|
entry.setdefault("time", time.strftime("%Y-%m-%dT%H:%M:%S%z"))
|
|
|
|
|
line = json.dumps(entry, ensure_ascii=False, sort_keys=True)
|
|
|
|
|
with self.log_lock:
|
|
|
|
|
with self.log_path.open("a", encoding="utf-8") as log_file:
|
|
|
|
|
log_file.write(line + "\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PublicPasteHandler(BaseHTTPRequestHandler):
|
|
|
|
|
server: PublicPasteServer
|
|
|
|
|
|
|
|
|
|
def do_GET(self) -> None: # noqa: N802 - BaseHTTPRequestHandler API
|
|
|
|
|
if self.path == "/" or self.path.startswith("/?"):
|
|
|
|
|
self._send_html(INDEX_HTML)
|
|
|
|
|
return
|
|
|
|
|
if self.path == "/health" or self.path.startswith("/health?"):
|
|
|
|
|
self._send_json(HTTPStatus.OK, {"ok": True, "service": "publicpaste"})
|
|
|
|
|
return
|
|
|
|
|
self._send_json(HTTPStatus.NOT_FOUND, {"error": "not found"})
|
|
|
|
|
|
|
|
|
|
def do_POST(self) -> None: # noqa: N802 - BaseHTTPRequestHandler API
|
|
|
|
|
if self.path != "/api/paste":
|
|
|
|
|
self._send_json(HTTPStatus.NOT_FOUND, {"error": "not found"})
|
|
|
|
|
return
|
|
|
|
|
started = time.monotonic()
|
|
|
|
|
try:
|
|
|
|
|
payload = self._read_json_body()
|
|
|
|
|
text = payload.get("text") if isinstance(payload, dict) else None
|
|
|
|
|
if not isinstance(text, str) or text.strip() == "":
|
|
|
|
|
raise ClientError(HTTPStatus.BAD_REQUEST, "text is required")
|
|
|
|
|
result = paste_text(
|
|
|
|
|
text,
|
|
|
|
|
timeout=self.server.settings.timeout,
|
|
|
|
|
overall_timeout=self.server.settings.overall_timeout,
|
|
|
|
|
verify=self.server.settings.verify,
|
|
|
|
|
)
|
|
|
|
|
except ClientError as exc:
|
|
|
|
|
self._log_api(False, int(exc.status), error=exc.message, duration=time.monotonic() - started)
|
|
|
|
|
self._send_json(exc.status, {"error": exc.message})
|
|
|
|
|
return
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
self._log_api(False, 400, error="invalid json", duration=time.monotonic() - started)
|
|
|
|
|
self._send_json(HTTPStatus.BAD_REQUEST, {"error": "invalid json"})
|
|
|
|
|
return
|
|
|
|
|
except PasteError as exc:
|
|
|
|
|
self._log_api(False, 502, error=str(exc), duration=time.monotonic() - started)
|
|
|
|
|
self._send_json(HTTPStatus.BAD_GATEWAY, {"error": "all providers failed"})
|
|
|
|
|
return
|
|
|
|
|
except Exception as exc: # noqa: BLE001 - keep HTTP errors concise.
|
|
|
|
|
self._log_api(False, 500, error=str(exc), duration=time.monotonic() - started)
|
|
|
|
|
self._send_json(HTTPStatus.INTERNAL_SERVER_ERROR, {"error": "server error"})
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
body = {"url": result.url, "provider": result.provider}
|
|
|
|
|
self._log_api(True, 200, provider=result.provider, url=result.url, duration=time.monotonic() - started)
|
|
|
|
|
self._send_json(HTTPStatus.OK, body)
|
|
|
|
|
|
|
|
|
|
def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - inherited name.
|
|
|
|
|
print(f"{self.address_string()} - {format % args}", file=sys.stderr)
|
|
|
|
|
|
|
|
|
|
def _read_json_body(self) -> Any:
|
|
|
|
|
content_length = self.headers.get("Content-Length")
|
|
|
|
|
if content_length is None:
|
|
|
|
|
raise ClientError(HTTPStatus.BAD_REQUEST, "missing content length")
|
|
|
|
|
try:
|
|
|
|
|
length = int(content_length)
|
|
|
|
|
except ValueError as exc:
|
|
|
|
|
raise ClientError(HTTPStatus.BAD_REQUEST, "invalid content length") from exc
|
|
|
|
|
if length > MAX_REQUEST_BYTES:
|
|
|
|
|
raise ClientError(HTTPStatus.REQUEST_ENTITY_TOO_LARGE, "request too large")
|
|
|
|
|
body = self.rfile.read(length)
|
|
|
|
|
try:
|
|
|
|
|
return json.loads(body.decode("utf-8"))
|
|
|
|
|
except UnicodeDecodeError as exc:
|
|
|
|
|
raise json.JSONDecodeError("invalid utf-8", "", 0) from exc
|
|
|
|
|
|
|
|
|
|
def _send_html(self, content: str) -> None:
|
|
|
|
|
body = content.encode("utf-8")
|
|
|
|
|
self.send_response(HTTPStatus.OK)
|
|
|
|
|
self.send_header("Content-Type", "text/html; charset=utf-8")
|
|
|
|
|
self.send_header("Content-Length", str(len(body)))
|
|
|
|
|
self.end_headers()
|
|
|
|
|
self.wfile.write(body)
|
|
|
|
|
|
|
|
|
|
def _send_json(self, status: HTTPStatus, payload: dict[str, Any]) -> None:
|
|
|
|
|
body = json.dumps(payload, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
|
|
|
|
|
self.send_response(status)
|
|
|
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
|
|
|
self.send_header("Content-Length", str(len(body)))
|
|
|
|
|
self.end_headers()
|
|
|
|
|
self.wfile.write(body)
|
|
|
|
|
|
|
|
|
|
def _log_api(
|
|
|
|
|
self,
|
|
|
|
|
ok: bool,
|
|
|
|
|
status: int,
|
|
|
|
|
*,
|
|
|
|
|
duration: float,
|
|
|
|
|
error: str | None = None,
|
|
|
|
|
provider: str | None = None,
|
|
|
|
|
url: str | None = None,
|
|
|
|
|
) -> None:
|
|
|
|
|
entry: dict[str, Any] = {
|
|
|
|
|
"event": "api.paste",
|
|
|
|
|
"ok": ok,
|
|
|
|
|
"status": status,
|
|
|
|
|
"duration_ms": round(duration * 1000, 2),
|
|
|
|
|
"client": self.client_address[0],
|
|
|
|
|
}
|
|
|
|
|
if error:
|
|
|
|
|
entry["error"] = error
|
|
|
|
|
if provider:
|
|
|
|
|
entry["provider"] = provider
|
|
|
|
|
if url:
|
|
|
|
|
entry["url"] = url
|
|
|
|
|
self.server.append_api_log(entry)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ClientError(Exception):
|
|
|
|
|
def __init__(self, status: HTTPStatus, message: str) -> None:
|
|
|
|
|
super().__init__(message)
|
|
|
|
|
self.status = status
|
|
|
|
|
self.message = message
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
INDEX_HTML = f"""<!doctype html>
|
|
|
|
|
<html lang="zh-CN">
|
|
|
|
|
<head>
|
|
|
|
|
<meta charset="utf-8">
|
|
|
|
|
<meta name="viewport" content="width=device-width, initial-scale=1">
|
|
|
|
|
<title>publicpaste</title>
|
|
|
|
|
<style>
|
|
|
|
|
:root {{ color-scheme: light dark; font-family: system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif; }}
|
|
|
|
|
body {{ margin: 0; min-height: 100vh; display: grid; place-items: center; background: #f4f6f8; color: #17202a; }}
|
|
|
|
|
main {{ width: min(880px, calc(100vw - 32px)); background: white; border-radius: 18px; box-shadow: 0 18px 45px rgba(0,0,0,.10); padding: 28px; }}
|
|
|
|
|
h1 {{ margin: 0 0 8px; font-size: 28px; }}
|
|
|
|
|
p {{ color: #59636e; }}
|
|
|
|
|
textarea {{ box-sizing: border-box; width: 100%; min-height: 280px; resize: vertical; border: 1px solid #ccd3da; border-radius: 12px; padding: 14px; font: 15px/1.5 ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; }}
|
|
|
|
|
button {{ border: 0; border-radius: 999px; padding: 11px 18px; background: #1769e0; color: white; font-weight: 700; cursor: pointer; }}
|
|
|
|
|
button:disabled {{ opacity: .65; cursor: wait; }}
|
|
|
|
|
.row {{ display: flex; gap: 12px; align-items: center; flex-wrap: wrap; margin-top: 14px; }}
|
|
|
|
|
.status {{ min-height: 24px; color: #59636e; }}
|
|
|
|
|
.result {{ display: none; margin-top: 18px; padding: 14px; border: 1px solid #d8dee6; border-radius: 12px; background: #f9fafb; word-break: break-all; }}
|
|
|
|
|
.error {{ color: #b42318; }}
|
|
|
|
|
@media (prefers-color-scheme: dark) {{ body {{ background: #111827; color: #f9fafb; }} main {{ background: #1f2937; }} p,.status {{ color: #cbd5e1; }} textarea,.result {{ background: #111827; color: #f9fafb; border-color: #374151; }} }}
|
|
|
|
|
</style>
|
|
|
|
|
</head>
|
|
|
|
|
<body>
|
|
|
|
|
<main>
|
|
|
|
|
<h1>publicpaste</h1>
|
|
|
|
|
<p>输入要发布的文本,点击“完成”后会返回公开 paste 链接并自动复制到剪贴板。</p>
|
|
|
|
|
<textarea id="text" maxlength="{MAX_REQUEST_BYTES}" placeholder="在这里输入文本..."></textarea>
|
|
|
|
|
<div class="row">
|
|
|
|
|
<button id="submit" type="button">完成</button>
|
|
|
|
|
<button id="copy" type="button" style="display:none">再次复制链接</button>
|
|
|
|
|
<span id="status" class="status"></span>
|
|
|
|
|
</div>
|
|
|
|
|
<section id="result" class="result" aria-live="polite">
|
|
|
|
|
<div>Provider: <strong id="provider"></strong></div>
|
|
|
|
|
<div>URL: <a id="url" href="#" target="_blank" rel="noopener noreferrer"></a></div>
|
|
|
|
|
</section>
|
|
|
|
|
</main>
|
|
|
|
|
<script>
|
|
|
|
|
const text = document.getElementById('text');
|
|
|
|
|
const submit = document.getElementById('submit');
|
|
|
|
|
const copy = document.getElementById('copy');
|
|
|
|
|
const statusEl = document.getElementById('status');
|
|
|
|
|
const resultEl = document.getElementById('result');
|
|
|
|
|
const providerEl = document.getElementById('provider');
|
|
|
|
|
const urlEl = document.getElementById('url');
|
|
|
|
|
let currentUrl = '';
|
|
|
|
|
|
|
|
|
|
function setStatus(message, isError = false) {{
|
|
|
|
|
statusEl.textContent = message;
|
|
|
|
|
statusEl.className = isError ? 'status error' : 'status';
|
|
|
|
|
}}
|
|
|
|
|
|
|
|
|
|
async function copyUrl() {{
|
|
|
|
|
if (!currentUrl) return;
|
|
|
|
|
try {{
|
|
|
|
|
await navigator.clipboard.writeText(currentUrl);
|
|
|
|
|
setStatus('链接已复制到剪贴板');
|
|
|
|
|
}} catch (err) {{
|
|
|
|
|
setStatus('复制失败,请手动复制链接', true);
|
|
|
|
|
}}
|
|
|
|
|
}}
|
|
|
|
|
|
|
|
|
|
submit.addEventListener('click', async () => {{
|
|
|
|
|
const value = text.value;
|
|
|
|
|
if (!value.trim()) {{
|
|
|
|
|
setStatus('请输入文本', true);
|
|
|
|
|
return;
|
|
|
|
|
}}
|
|
|
|
|
submit.disabled = true;
|
|
|
|
|
copy.style.display = 'none';
|
|
|
|
|
resultEl.style.display = 'none';
|
|
|
|
|
setStatus('正在提交...');
|
|
|
|
|
try {{
|
|
|
|
|
const response = await fetch('/api/paste', {{
|
|
|
|
|
method: 'POST',
|
|
|
|
|
headers: {{ 'Content-Type': 'application/json' }},
|
|
|
|
|
body: JSON.stringify({{ text: value }}),
|
|
|
|
|
}});
|
|
|
|
|
const data = await response.json().catch(() => ({{ error: 'invalid response' }}));
|
|
|
|
|
if (!response.ok) throw new Error(data.error || 'request failed');
|
|
|
|
|
currentUrl = data.url;
|
|
|
|
|
providerEl.textContent = data.provider || '';
|
|
|
|
|
urlEl.textContent = currentUrl;
|
|
|
|
|
urlEl.href = currentUrl;
|
|
|
|
|
resultEl.style.display = 'block';
|
|
|
|
|
copy.style.display = 'inline-block';
|
|
|
|
|
setStatus('提交成功,正在复制链接...');
|
|
|
|
|
await copyUrl();
|
|
|
|
|
}} catch (err) {{
|
|
|
|
|
setStatus(err instanceof Error ? err.message : '提交失败', true);
|
|
|
|
|
}} finally {{
|
|
|
|
|
submit.disabled = false;
|
|
|
|
|
}}
|
|
|
|
|
}});
|
|
|
|
|
|
|
|
|
|
copy.addEventListener('click', copyUrl);
|
|
|
|
|
</script>
|
|
|
|
|
</body>
|
|
|
|
|
</html>
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main() -> int:
|
|
|
|
|
try:
|
|
|
|
|
settings = load_settings()
|
|
|
|
|
settings.data_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
server = PublicPasteServer((settings.host, settings.port), settings)
|
|
|
|
|
except Exception as exc: # noqa: BLE001
|
|
|
|
|
print(f"publicpaste-web: {exc}", file=sys.stderr)
|
|
|
|
|
return 1
|
|
|
|
|
|
|
|
|
|
print(f"publicpaste-web: listening on {settings.host}:{settings.port}", file=sys.stderr)
|
|
|
|
|
try:
|
|
|
|
|
server.serve_forever()
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
pass
|
|
|
|
|
finally:
|
|
|
|
|
server.server_close()
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": # pragma: no cover
|
|
|
|
|
raise SystemExit(main())
|