140 lines
5.5 KiB
Python
140 lines
5.5 KiB
Python
import http.server
|
|
import socketserver
|
|
import os
|
|
import logging
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import fnmatch
|
|
|
|
PORT = 8080
|
|
RESPONSE_DIR = "/app/responses"
|
|
STATE_DIR = "/tmp/mock_obs_state"
|
|
|
|
class MockOBSHandler(http.server.SimpleHTTPRequestHandler):
|
|
def do_GET(self):
|
|
logging.info(f"GET request for: {self.path}")
|
|
path_without_query = self.path.split('?')[0]
|
|
|
|
# Check for state stored by a PUT request first
|
|
sanitized_put_path = 'PUT' + path_without_query.replace('/', '_')
|
|
state_file_path = os.path.join(STATE_DIR, sanitized_put_path)
|
|
if os.path.exists(state_file_path):
|
|
logging.info(f"Found stored PUT state for {self.path} at {state_file_path}")
|
|
self.send_response(200)
|
|
self.send_header("Content-type", "application/xml")
|
|
file_size = os.path.getsize(state_file_path)
|
|
self.send_header("Content-Length", str(file_size))
|
|
self.end_headers()
|
|
with open(state_file_path, 'rb') as f:
|
|
self.wfile.write(f.read())
|
|
return
|
|
|
|
# If no PUT state file, fall back to the glob/exact match logic
|
|
self.handle_request('GET')
|
|
|
|
def do_PUT(self):
|
|
logging.info(f"PUT request for: {self.path}")
|
|
logging.info(f"Headers: {self.headers}")
|
|
path_without_query = self.path.split('?')[0]
|
|
|
|
body = b''
|
|
if self.headers.get('Transfer-Encoding', '').lower() == 'chunked':
|
|
logging.info("Chunked transfer encoding detected")
|
|
while True:
|
|
line = self.rfile.readline().strip()
|
|
if not line:
|
|
break
|
|
chunk_length = int(line, 16)
|
|
if chunk_length == 0:
|
|
self.rfile.readline()
|
|
break
|
|
body += self.rfile.read(chunk_length)
|
|
self.rfile.read(2) # Read the trailing CRLF
|
|
else:
|
|
content_length = int(self.headers.get('Content-Length', 0))
|
|
body = self.rfile.read(content_length)
|
|
|
|
logging.info(f"Body: {body.decode('utf-8')}")
|
|
sanitized_path = 'PUT' + path_without_query.replace('/', '_')
|
|
state_file_path = os.path.join(STATE_DIR, sanitized_path)
|
|
|
|
logging.info(f"Saving state for {self.path} to {state_file_path}")
|
|
os.makedirs(os.path.dirname(state_file_path), exist_ok=True)
|
|
with open(state_file_path, 'wb') as f:
|
|
f.write(body)
|
|
|
|
self.send_response(200)
|
|
self.send_header("Content-type", "text/plain")
|
|
response_body = b"OK"
|
|
self.send_header("Content-Length", str(len(response_body)))
|
|
self.end_headers()
|
|
self.wfile.write(response_body)
|
|
|
|
def do_POST(self):
|
|
logging.info(f"POST request for: {self.path}")
|
|
self.handle_request('POST')
|
|
|
|
def do_DELETE(self):
|
|
logging.info(f"DELETE request for: {self.path}")
|
|
self.handle_request('DELETE')
|
|
|
|
def handle_request(self, method):
|
|
path_without_query = self.path.split('?')[0]
|
|
sanitized_request_path = method + path_without_query.replace('/', '_')
|
|
logging.info(f"Handling request, looking for match for: {sanitized_request_path}")
|
|
|
|
response_file = None
|
|
# Check for glob match first
|
|
if os.path.exists(RESPONSE_DIR):
|
|
for filename in os.listdir(RESPONSE_DIR):
|
|
if fnmatch.fnmatch(sanitized_request_path, filename):
|
|
response_file = os.path.join(RESPONSE_DIR, filename)
|
|
logging.info(f"Found matching response file (glob): {response_file}")
|
|
break
|
|
|
|
# Fallback to exact match if no glob match
|
|
if response_file is None:
|
|
exact_file = os.path.join(RESPONSE_DIR, sanitized_request_path)
|
|
if os.path.exists(exact_file):
|
|
response_file = exact_file
|
|
logging.info(f"Found matching response file (exact): {response_file}")
|
|
|
|
if response_file:
|
|
logging.info(f"Serving content from {response_file}")
|
|
self.send_response(200)
|
|
self.send_header("Content-type", "application/xml")
|
|
file_size = os.path.getsize(response_file)
|
|
self.send_header("Content-Length", str(file_size))
|
|
self.end_headers()
|
|
with open(response_file, 'rb') as f:
|
|
self.wfile.write(f.read())
|
|
else:
|
|
logging.info(f"Response file not found for {sanitized_request_path}. Sending 404.")
|
|
self.send_response(404)
|
|
self.send_header("Content-type", "text/plain")
|
|
body = f"Mock response not found for {sanitized_request_path}".encode('utf-8')
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
if __name__ == "__main__":
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
|
|
|
|
if not os.path.exists(STATE_DIR):
|
|
logging.info(f"Creating state directory: {STATE_DIR}")
|
|
os.makedirs(STATE_DIR)
|
|
if not os.path.exists(RESPONSE_DIR):
|
|
os.makedirs(RESPONSE_DIR)
|
|
|
|
with socketserver.TCPServer(("", PORT), MockOBSHandler) as httpd:
|
|
logging.info(f"Serving mock OBS API on port {PORT}")
|
|
|
|
def graceful_shutdown(sig, frame):
|
|
logging.info("Received SIGTERM, shutting down gracefully...")
|
|
threading.Thread(target=httpd.shutdown).start()
|
|
|
|
signal.signal(signal.SIGTERM, graceful_shutdown)
|
|
|
|
httpd.serve_forever()
|
|
logging.info("Server has shut down.") |