// llm-proxy-go/main.go — single-file LLM HTTP proxy (264 lines, 5.9 KiB, Go)
package main
import (
	"bufio"
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/google/uuid"
	"github.com/joho/godotenv"
	"github.com/pelletier/go-toml/v2"
)
// Config holds the proxy's runtime settings, assembled by loadConfig from
// defaults, an optional llm-proxy.toml, and environment variables.
type Config struct {
UpstreamURL string // upstream LLM API endpoint requests are forwarded to
ListenAddr string // address the local HTTP server listens on
APIKey string // bearer token injected into upstream requests; required (checked in main)
Insecure bool // when true, upstream TLS certificate verification is skipped
}
// main wires up configuration, starts the proxy server, and shuts it down
// gracefully on SIGINT/SIGTERM with a 5-second drain window.
func main() {
	if len(os.Args) > 1 && (os.Args[1] == "-h" || os.Args[1] == "--help") {
		printHelp()
		os.Exit(0)
	}
	// Best effort: a missing .env file is fine; variables may come from the shell.
	godotenv.Load()
	cfg := loadConfig()
	printConfig(cfg)
	if cfg.APIKey == "" {
		fmt.Fprintln(os.Stderr, "error: API_KEY is required")
		os.Exit(1)
	}
	if cfg.Insecure {
		fmt.Fprintln(os.Stderr, "WARNING: TLS verification disabled")
	}
	mux := http.NewServeMux()
	mux.HandleFunc("/", handleProxy(cfg))
	srv := &http.Server{
		Addr:    cfg.ListenAddr,
		Handler: mux,
	}
	go func() {
		fmt.Printf("LLM Proxy listening on %s proxy to upstream: %s\n", cfg.ListenAddr, cfg.UpstreamURL)
		// ListenAndServe always returns a non-nil error. ErrServerClosed is the
		// expected result of Shutdown; anything else (e.g. the port is already
		// in use) was previously swallowed, leaving a silently dead server.
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			log.Fatalf("server error: %v", err)
		}
	}()
	// Block until SIGINT/SIGTERM, then drain in-flight requests.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Println("shutdown error:", err)
	}
}
// handleProxy returns the HTTP handler that forwards every request (except
// /health) to cfg.UpstreamURL, injecting the configured API key.
//
// The transport and client are created ONCE here, not per request as before:
// building a fresh http.Transport for every request defeated connection
// pooling, forcing a new TCP+TLS handshake with the upstream each time.
func handleProxy(cfg Config) http.HandlerFunc {
	tr := &http.Transport{
		// Insecure mode is an explicit operator opt-in (warned about in main).
		TLSClientConfig: &tls.Config{InsecureSkipVerify: cfg.Insecure},
	}
	// No overall client timeout: streaming responses may legitimately run long.
	client := &http.Client{Transport: tr}
	return func(w http.ResponseWriter, r *http.Request) {
		requestId := uuid.New().String()
		log.Println("Handle proxy: ", requestId, r.URL)
		// Local health endpoint: answered directly, never proxied upstream.
		if r.URL.Path == "/health" {
			w.WriteHeader(http.StatusOK)
			return
		}
		proxyReq := cloneRequest(requestId, r, cfg.UpstreamURL)
		if cfg.APIKey != "" {
			proxyReq.Header.Set("Authorization", "Bearer "+cfg.APIKey)
		}
		resp, err := client.Do(proxyReq)
		if err != nil {
			log.Println("Upstream error: ", requestId, err)
			http.Error(w, fmt.Sprintf("upstream error: %v", err), http.StatusBadGateway)
			return
		}
		defer resp.Body.Close()
		// Mirror upstream headers and status before any body bytes.
		for k, v := range resp.Header {
			w.Header()[k] = v
		}
		w.WriteHeader(resp.StatusCode)
		isStreamingRequest := isStreaming(resp)
		log.Println("Request streaming: ", requestId, isStreamingRequest)
		if !isStreamingRequest {
			// Copy error was previously discarded; log it (headers are already
			// sent, so there is nothing else we can tell the client).
			if _, err := io.Copy(w, resp.Body); err != nil {
				log.Println("Copy error: ", requestId, err)
			}
			return
		}
		handleStream(requestId, w, resp.Body, cfg)
	}
}
func cloneRequest(requestId string, r *http.Request, upstreamURL string) *http.Request {
upstream, _ := url.Parse(upstreamURL)
proxyReq := r.Clone(context.Background())
proxyReq.URL.Scheme = upstream.Scheme
proxyReq.URL.Host = upstream.Host
proxyReq.URL.Path = strings.ReplaceAll(r.URL.Path, upstream.Path, "")
if upstream.Path != "" && !strings.HasSuffix(proxyReq.URL.Path, "/") {
proxyReq.URL.Path = upstream.Path + proxyReq.URL.Path
}
proxyReq.Host = upstream.Host
log.Println("Upstream proxy: ", requestId, proxyReq.URL)
if val := r.Header.Get("Content-Type"); val != "" {
proxyReq.Header.Set("Content-Type", val)
}
proxyReq.Header.Del("Host")
proxyReq.Header.Del("Authorization")
proxyReq.RequestURI = ""
return proxyReq
}
func isStreaming(resp *http.Response) bool {
ct := strings.ToLower(resp.Header.Get("Content-Type"))
return strings.Contains(ct, "text/event-stream") ||
strings.Contains(ct, "application/x-ndjson") ||
strings.Contains(ct, "stream")
}
// handleStream relays a line-oriented (SSE/NDJSON) upstream body to w,
// forwarding each line as it arrives.
//
// Fixes over the previous loop:
//   - a final chunk not terminated by '\n' is forwarded at EOF instead of
//     being silently dropped (ReadString returns data alongside io.EOF);
//   - blank lines are forwarded — SSE uses them to delimit events, so
//     swallowing them broke client-side event framing;
//   - when w supports http.Flusher, every line is flushed immediately so
//     clients see tokens in real time rather than after buffering.
func handleStream(requestId string, w io.Writer, body io.Reader, cfg Config) {
	flusher, canFlush := w.(http.Flusher)
	reader := bufio.NewReader(body)
	for {
		chunk, err := reader.ReadString('\n')
		line := strings.TrimRight(chunk, "\r\n")
		if err == nil {
			if line == "" {
				fmt.Fprintln(w) // SSE event delimiter
			} else {
				processChunk(requestId, w, line, cfg)
			}
			if canFlush {
				flusher.Flush()
			}
			continue
		}
		// Stream ended (or failed): forward any partial final line first.
		if line != "" {
			processChunk(requestId, w, line, cfg)
			if canFlush {
				flusher.Flush()
			}
		}
		if err != io.EOF {
			log.Println("Stream error: ", requestId, err)
			fmt.Fprintf(os.Stderr, "stream error: %v\n", err)
		} else {
			log.Println("Stream end: ", requestId)
		}
		return
	}
}
// processChunk forwards one line of a streaming response to the client with
// a trailing newline. Every line — SSE "data:" payloads and the terminal
// "data: [DONE]" sentinel alike — passes through unmodified. (The previous
// special case for "data: [DONE]" emitted exactly the same output as the
// fall-through path, so the branch was dead code and has been removed.)
func processChunk(requestId string, w io.Writer, line string, cfg Config) {
	// NOTE(review): this logs full payload contents on every chunk, which may
	// expose sensitive prompt/completion data in logs — confirm intended.
	log.Println("Process chunk: ", requestId, line)
	fmt.Fprintln(w, line)
}
// printHelp writes CLI usage to stdout. The defaults documented here must
// match the ones set in loadConfig.
func printHelp() {
	// Fixed: LISTEN_ADDR default previously read ":8080", but loadConfig
	// actually defaults to "127.0.0.1:8080".
	fmt.Println(`LLM Proxy - HTTP proxy for LLM APIs
Usage:
llm-proxy Start the proxy
llm-proxy -h Show this help
Config:
Config file (optional): llm-proxy.toml
Environment variables take priority over config file.
Environment Variables:
UPSTREAM_URL Upstream LLM API URL (default: https://api.openai.com/v1/chat/completions)
LISTEN_ADDR Listen address (default: 127.0.0.1:8080)
API_KEY Upstream API key (required)
INSECURE Skip TLS verification (default: false)`)
}
// printConfig echoes the effective configuration to stdout, masking the API
// key so that at most its last four characters are visible.
func printConfig(cfg Config) {
	const mask = "****"
	shown := mask
	if n := len(cfg.APIKey); n > 4 {
		shown = mask + cfg.APIKey[n-4:]
	}
	fmt.Printf("Upstream URL: %s\n", cfg.UpstreamURL)
	fmt.Printf("Listen Addr: %s\n", cfg.ListenAddr)
	fmt.Printf("API Key: %s\n", shown)
	fmt.Printf("Insecure: %v\n", cfg.Insecure)
}
// loadConfig assembles the runtime configuration in three layers: built-in
// defaults, then an optional llm-proxy.toml, then environment variables
// (highest priority; see getEnv for the alias fallback order).
func loadConfig() Config {
	cfg := Config{
		UpstreamURL: "https://api.openai.com/v1/chat/completions",
		ListenAddr:  "127.0.0.1:8080",
	}
	if data, err := os.ReadFile("llm-proxy.toml"); err == nil {
		var tomlCfg struct {
			UpstreamURL string `toml:"upstream_url"`
			ListenAddr  string `toml:"listen_addr"`
			APIKey      string `toml:"api_key"`
			Insecure    bool   `toml:"insecure"`
		}
		if err := toml.Unmarshal(data, &tomlCfg); err == nil {
			// Apply only fields the file actually sets: the old code copied
			// every field unconditionally, so a partial TOML file (e.g. one
			// containing only api_key) blanked out the defaults above.
			if tomlCfg.UpstreamURL != "" {
				cfg.UpstreamURL = tomlCfg.UpstreamURL
			}
			if tomlCfg.ListenAddr != "" {
				cfg.ListenAddr = tomlCfg.ListenAddr
			}
			if tomlCfg.APIKey != "" {
				cfg.APIKey = tomlCfg.APIKey
			}
			if tomlCfg.Insecure {
				cfg.Insecure = true
			}
			fmt.Println("Loaded config from llm-proxy.toml")
		} else {
			// A malformed file was previously ignored without a trace; warn.
			fmt.Fprintf(os.Stderr, "warning: ignoring invalid llm-proxy.toml: %v\n", err)
		}
	}
	// Environment overrides; the second name of each pair is a compatibility alias.
	if val := getEnv("UPSTREAM_URL", "OPENAI_API_BASE"); val != "" {
		cfg.UpstreamURL = val
	}
	if val := getEnv("API_KEY", "OPENAI_API_KEY"); val != "" {
		cfg.APIKey = val
	}
	if val := getEnv("LISTEN_ADDR"); val != "" {
		cfg.ListenAddr = val
	}
	if val := getEnv("INSECURE"); val != "" {
		// Accept the full strconv.ParseBool vocabulary ("1", "t", "TRUE", ...).
		// Unparsable values count as false, matching the old `val == "true"`.
		b, err := strconv.ParseBool(val)
		cfg.Insecure = err == nil && b
	}
	return cfg
}
func getEnv(keys ...string) string {
for _, key := range keys {
if val := os.Getenv(key); val != "" {
return val
}
}
return ""
}