Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion infra/conf/transport_internet.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
switch c.Mode {
case "":
c.Mode = "auto"
case "auto", "packet-up", "stream-up":
case "auto", "packet-up", "stream-up", "stream-one":
default:
return nil, errors.New("unsupported mode: " + c.Mode)
}
Expand All @@ -327,6 +327,9 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
}
var err error
if c.DownloadSettings != nil {
if c.Mode == "stream-one" {
return nil, errors.New(`Can not use "downloadSettings" in "stream-one" mode.`)
}
if c.Extra != nil {
c.DownloadSettings.SocketSettings = nil
}
Expand Down Expand Up @@ -707,8 +710,10 @@ func (p TransportProtocol) Build() (string, error) {
case "ws", "websocket":
return "websocket", nil
case "h2", "h3", "http":
errors.PrintDeprecatedFeatureWarning("HTTP transport", "XHTTP transport")
return "http", nil
case "grpc":
errors.PrintMigrateFeatureInfo("gRPC transport", "XHTTP transport")
return "grpc", nil
case "httpupgrade":
return "httpupgrade", nil
Expand Down
4 changes: 4 additions & 0 deletions transport/internet/splithttp/browser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
// has no fields because everything is global state :O)
type BrowserDialerClient struct{}

func (c *BrowserDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
panic("not implemented yet")
}

func (c *BrowserDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
panic("not implemented yet")
}
Expand Down
65 changes: 65 additions & 0 deletions transport/internet/splithttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type DialerClient interface {
// (ctx, baseURL) -> uploadWriter
// baseURL already contains sessionId
OpenUpload(context.Context, string) io.WriteCloser

// (ctx, pureURL) -> (uploadWriter, downloadReader)
// pureURL can not contain sessionId
Open(context.Context, string) (io.WriteCloser, io.ReadCloser)
}

// implements splithttp.DialerClient in terms of direct network connections
Expand All @@ -42,6 +46,30 @@ type DefaultDialerClient struct {
dialUploadConn func(ctxInner context.Context) (net.Conn, error)
}

func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
reader, writer := io.Pipe()
req, _ := http.NewRequestWithContext(ctx, "POST", pureURL, reader)
req.Header = c.transportConfig.GetRequestHeader()
if !c.transportConfig.NoGRPCHeader {
req.Header.Set("Content-Type", "application/grpc")
}
wrc := &WaitReadCloser{Wait: make(chan struct{})}
go func() {
response, err := c.client.Do(req)
if err != nil || response.StatusCode != 200 {
if err != nil {
errors.LogInfoInner(ctx, err, "failed to open ", pureURL)
} else {
errors.LogInfo(ctx, "unexpected status ", response.StatusCode)
}
wrc.Close()
return
}
wrc.Set(response.Body)
}()
return writer, wrc
}

func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
reader, writer := io.Pipe()
req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader)
Expand Down Expand Up @@ -226,3 +254,40 @@ func (c downloadBody) Close() error {
c.cancel()
return nil
}

type WaitReadCloser struct {
Wait chan struct{}
io.ReadCloser
}

func (w *WaitReadCloser) Set(rc io.ReadCloser) {
w.ReadCloser = rc
defer func() {
if recover() != nil {
rc.Close()
}
}()
close(w.Wait)
}

func (w *WaitReadCloser) Read(b []byte) (int, error) {
if w.ReadCloser == nil {
if <-w.Wait; w.ReadCloser == nil {
return 0, io.ErrClosedPipe
}
}
return w.ReadCloser.Read(b)
}

func (w *WaitReadCloser) Close() error {
if w.ReadCloser != nil {
return w.ReadCloser.Close()
}
defer func() {
if recover() != nil && w.ReadCloser != nil {
w.ReadCloser.Close()
}
}()
close(w.Wait)
return nil
}
42 changes: 31 additions & 11 deletions transport/internet/splithttp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package splithttp
import (
"context"
gotls "crypto/tls"
"io"
"net/http"
"net/url"
"strconv"
Expand Down Expand Up @@ -279,9 +280,33 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
requestURL2.RawQuery = config2.GetNormalizedQuery()
}

reader, remoteAddr, localAddr, err := httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String())
if err != nil {
return nil, err
mode := transportConfiguration.Mode
if mode == "" || mode == "auto" {
mode = "packet-up"
if (tlsConfig != nil && (len(tlsConfig.NextProtocol) != 1 || tlsConfig.NextProtocol[0] == "h2")) || realityConfig != nil {
mode = "stream-up"
}
if realityConfig != nil && transportConfiguration.DownloadSettings == nil {
mode = "stream-one"
}
}
errors.LogInfo(ctx, "XHTTP is using mode: "+mode)

var writer io.WriteCloser
var reader io.ReadCloser
var remoteAddr, localAddr net.Addr
var err error

if mode == "stream-one" {
requestURL.Path = transportConfiguration.GetNormalizedPath()
writer, reader = httpClient.Open(context.WithoutCancel(ctx), requestURL.String())
remoteAddr = &net.TCPAddr{}
localAddr = &net.TCPAddr{}
} else {
reader, remoteAddr, localAddr, err = httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String())
if err != nil {
return nil, err
}
}

if muxRes != nil {
Expand All @@ -293,7 +318,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
closed := false

conn := splitConn{
writer: nil,
writer: writer,
reader: reader,
remoteAddr: remoteAddr,
localAddr: localAddr,
Expand All @@ -311,14 +336,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
},
}

mode := transportConfiguration.Mode
if mode == "auto" {
mode = "packet-up"
if (tlsConfig != nil && len(tlsConfig.NextProtocol) != 1) || realityConfig != nil {
mode = "stream-up"
}
if mode == "stream-one" {
return stat.Connection(&conn), nil
}
errors.LogInfo(ctx, "XHTTP is using mode: "+mode)
if mode == "stream-up" {
conn.writer = httpClient.OpenUpload(ctx, requestURL.String())
return stat.Connection(&conn), nil
Expand Down
48 changes: 35 additions & 13 deletions transport/internet/splithttp/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,22 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req

h.config.WriteResponseHeader(writer)

validRange := h.config.GetNormalizedXPaddingBytes()
x_padding := int32(len(request.URL.Query().Get("x_padding")))
if validRange.To > 0 && (x_padding < validRange.From || x_padding > validRange.To) {
errors.LogInfo(context.Background(), "invalid x_padding length:", x_padding)
writer.WriteHeader(http.StatusBadRequest)
return
}

sessionId := ""
subpath := strings.Split(request.URL.Path[len(h.path):], "/")
if len(subpath) > 0 {
sessionId = subpath[0]
}

if sessionId == "" {
errors.LogInfo(context.Background(), "no sessionid on request:", request.URL.Path)
if sessionId == "" && h.config.Mode != "" && h.config.Mode != "auto" && h.config.Mode != "stream-one" && h.config.Mode != "stream-up" {
errors.LogInfo(context.Background(), "stream-one mode is not allowed")
writer.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -126,17 +134,20 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}
}

currentSession := h.upsertSession(sessionId)
var currentSession *httpSession
if sessionId != "" {
currentSession = h.upsertSession(sessionId)
}
scMaxEachPostBytes := int(h.ln.config.GetNormalizedScMaxEachPostBytes().To)

if request.Method == "POST" {
if request.Method == "POST" && sessionId != "" {
seq := ""
if len(subpath) > 1 {
seq = subpath[1]
}

if seq == "" {
if h.config.Mode == "packet-up" {
if h.config.Mode != "" && h.config.Mode != "auto" && h.config.Mode != "stream-up" {
errors.LogInfo(context.Background(), "stream-up mode is not allowed")
writer.WriteHeader(http.StatusBadRequest)
return
Expand All @@ -148,13 +159,16 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
errors.LogInfoInner(context.Background(), err, "failed to upload (PushReader)")
writer.WriteHeader(http.StatusConflict)
} else {
if request.Header.Get("Content-Type") == "application/grpc" {
writer.Header().Set("Content-Type", "application/grpc")
}
writer.WriteHeader(http.StatusOK)
<-request.Context().Done()
}
return
}

if h.config.Mode == "stream-up" {
if h.config.Mode != "" && h.config.Mode != "auto" && h.config.Mode != "packet-up" {
errors.LogInfo(context.Background(), "packet-up mode is not allowed")
writer.WriteHeader(http.StatusBadRequest)
return
Expand Down Expand Up @@ -193,24 +207,29 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}

writer.WriteHeader(http.StatusOK)
} else if request.Method == "GET" {
} else if request.Method == "GET" || sessionId == "" {
responseFlusher, ok := writer.(http.Flusher)
if !ok {
panic("expected http.ResponseWriter to be an http.Flusher")
}

// after GET is done, the connection is finished. disable automatic
// session reaping, and handle it in defer
currentSession.isFullyConnected.Close()
defer h.sessions.Delete(sessionId)
if sessionId != "" {
// after GET is done, the connection is finished. disable automatic
// session reaping, and handle it in defer
currentSession.isFullyConnected.Close()
defer h.sessions.Delete(sessionId)
}

// magic header instructs nginx + apache to not buffer response body
writer.Header().Set("X-Accel-Buffering", "no")
// A web-compliant header telling all middleboxes to disable caching.
// Should be able to prevent overloading the cache, or stop CDNs from
// teeing the response stream into their cache, causing slowdowns.
writer.Header().Set("Cache-Control", "no-store")
if !h.config.NoSSEHeader {

if request.Header.Get("Content-Type") == "application/grpc" {
writer.Header().Set("Content-Type", "application/grpc")
} else if !h.config.NoSSEHeader {
// magic header to make the HTTP middle box consider this as SSE to disable buffer
writer.Header().Set("Content-Type", "text/event-stream")
}
Expand All @@ -227,9 +246,12 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
downloadDone: downloadDone,
responseFlusher: responseFlusher,
},
reader: currentSession.uploadQueue,
reader: request.Body,
remoteAddr: remoteAddr,
}
if sessionId != "" {
conn.reader = currentSession.uploadQueue
}

h.ln.addConn(stat.Connection(&conn))

Expand Down