Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions infra/conf/transport_internet.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ type SplitHTTPConfig struct {
XPaddingBytes *Int32Range `json:"xPaddingBytes"`
Xmux Xmux `json:"xmux"`
DownloadSettings *StreamConfig `json:"downloadSettings"`
Mode string `json:"mode"`
}

type Xmux struct {
Expand Down Expand Up @@ -289,6 +290,14 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
muxProtobuf.CMaxReuseTimes.To = 128
}

switch c.Mode {
case "":
c.Mode = "auto"
case "auto", "packet-up", "stream-up":
default:
return nil, errors.New("unsupported mode: " + c.Mode)
}

config := &splithttp.Config{
Path: c.Path,
Host: c.Host,
Expand All @@ -299,6 +308,7 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
NoSSEHeader: c.NoSSEHeader,
XPaddingBytes: splithttpNewRandRangeConfig(c.XPaddingBytes),
Xmux: &muxProtobuf,
Mode: c.Mode,
}
var err error
if c.DownloadSettings != nil {
Expand Down
4 changes: 4 additions & 0 deletions transport/internet/splithttp/browser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
// has no fields because everything is global state :O)
type BrowserDialerClient struct{}

func (c *BrowserDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
panic("not implemented yet")
}

func (c *BrowserDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
conn, err := browser_dialer.DialGet(baseURL)
dummyAddr := &gonet.IPAddr{}
Expand Down
12 changes: 12 additions & 0 deletions transport/internet/splithttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type DialerClient interface {
// (ctx, baseURL) -> (downloadReader, remoteAddr, localAddr)
// baseURL already contains sessionId
OpenDownload(context.Context, string) (io.ReadCloser, net.Addr, net.Addr, error)

// (ctx, baseURL) -> uploadWriter
// baseURL already contains sessionId
OpenUpload(context.Context, string) io.WriteCloser
}

// implements splithttp.DialerClient in terms of direct network connections
Expand All @@ -38,6 +42,14 @@ type DefaultDialerClient struct {
dialUploadConn func(ctxInner context.Context) (net.Conn, error)
}

func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io.WriteCloser {
reader, writer := io.Pipe()
req, _ := http.NewRequestWithContext(ctx, "POST", baseURL, reader)
req.Header = c.transportConfig.GetRequestHeader()
go c.client.Do(req)
return writer
}

func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string) (io.ReadCloser, gonet.Addr, gonet.Addr, error) {
var remoteAddr gonet.Addr
var localAddr gonet.Addr
Expand Down
73 changes: 41 additions & 32 deletions transport/internet/splithttp/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions transport/internet/splithttp/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message Config {
RandRangeConfig xPaddingBytes = 8;
Multiplexing xmux = 9;
xray.transport.internet.StreamConfig downloadSettings = 10;
string mode = 11;
}

message RandRangeConfig {
Expand Down
84 changes: 46 additions & 38 deletions transport/internet/splithttp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

httpClient, muxRes := getHTTPClient(ctx, dest, streamSettings)

var httpClient2 DialerClient
httpClient2 := httpClient
requestURL2 := requestURL
var muxRes2 *muxResource
var requestURL2 url.URL
if transportConfiguration.DownloadSettings != nil {
globalDialerAccess.Lock()
if streamSettings.DownloadSettings == nil {
Expand All @@ -279,27 +279,59 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
requestURL2.RawQuery = config2.GetNormalizedQuery()
}

maxUploadSize := scMaxEachPostBytes.roll()
// WithSizeLimit(0) will still allow single bytes to pass, and a lot of
// code relies on this behavior. Subtract 1 so that together with
// uploadWriter wrapper, exact size limits can be enforced
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))
reader, remoteAddr, localAddr, err := httpClient2.OpenDownload(context.WithoutCancel(ctx), requestURL2.String())
if err != nil {
return nil, err
}

if muxRes != nil {
muxRes.OpenRequests.Add(1)
}
if muxRes2 != nil {
muxRes2.OpenRequests.Add(1)
}
closed := false

go func() {
if muxRes != nil {
defer muxRes.OpenRequests.Add(-1)
}
if muxRes2 != nil {
defer muxRes2.OpenRequests.Add(-1)
}
conn := splitConn{
writer: nil,
reader: reader,
remoteAddr: remoteAddr,
localAddr: localAddr,
onClose: func() {
if closed {
return
}
closed = true
if muxRes != nil {
muxRes.OpenRequests.Add(-1)
}
if muxRes2 != nil {
muxRes2.OpenRequests.Add(-1)
}
},
}

mode := transportConfiguration.Mode
if mode == "auto" && realityConfig != nil {
mode = "stream-up"
}
if mode == "stream-up" {
conn.writer = httpClient.OpenUpload(ctx, requestURL.String())
return stat.Connection(&conn), nil
}

maxUploadSize := scMaxEachPostBytes.roll()
// WithSizeLimit(0) will still allow single bytes to pass, and a lot of
// code relies on this behavior. Subtract 1 so that together with
// uploadWriter wrapper, exact size limits can be enforced
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1))

conn.writer = uploadWriter{
uploadPipeWriter,
maxUploadSize,
}

go func() {
requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll()))
var requestCounter int64

Expand Down Expand Up @@ -352,30 +384,6 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
}
}()

httpClient3 := httpClient
requestURL3 := requestURL
if httpClient2 != nil {
httpClient3 = httpClient2
requestURL3 = requestURL2
}

reader, remoteAddr, localAddr, err := httpClient3.OpenDownload(context.WithoutCancel(ctx), requestURL3.String())
if err != nil {
return nil, err
}

writer := uploadWriter{
uploadPipeWriter,
maxUploadSize,
}

conn := splitConn{
writer: writer,
reader: reader,
remoteAddr: remoteAddr,
localAddr: localAddr,
}

return stat.Connection(&conn), nil
}

Expand Down
33 changes: 26 additions & 7 deletions transport/internet/splithttp/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
return
}

h.config.WriteResponseHeader(writer)

sessionId := ""
subpath := strings.Split(request.URL.Path[len(h.path):], "/")
if len(subpath) > 0 {
Expand Down Expand Up @@ -134,7 +136,26 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}

if seq == "" {
errors.LogInfo(context.Background(), "no seq on request:", request.URL.Path)
if h.config.Mode == "packet-up" {
errors.LogInfo(context.Background(), "stream-up mode is not allowed")
writer.WriteHeader(http.StatusBadRequest)
return
}
err = currentSession.uploadQueue.Push(Packet{
Reader: request.Body,
})
if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload (PushReader)")
writer.WriteHeader(http.StatusConflict)
} else {
writer.WriteHeader(http.StatusOK)
<-request.Context().Done()
}
return
}

if h.config.Mode == "stream-up" {
errors.LogInfo(context.Background(), "packet-up mode is not allowed")
writer.WriteHeader(http.StatusBadRequest)
return
}
Expand All @@ -148,14 +169,14 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
}

if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload")
errors.LogInfoInner(context.Background(), err, "failed to upload (ReadAll)")
writer.WriteHeader(http.StatusInternalServerError)
return
}

seqInt, err := strconv.ParseUint(seq, 10, 64)
if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload")
errors.LogInfoInner(context.Background(), err, "failed to upload (ParseUint)")
writer.WriteHeader(http.StatusInternalServerError)
return
}
Expand All @@ -166,12 +187,11 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
})

if err != nil {
errors.LogInfoInner(context.Background(), err, "failed to upload")
errors.LogInfoInner(context.Background(), err, "failed to upload (PushPayload)")
writer.WriteHeader(http.StatusInternalServerError)
return
}

h.config.WriteResponseHeader(writer)
writer.WriteHeader(http.StatusOK)
} else if request.Method == "GET" {
responseFlusher, ok := writer.(http.Flusher)
Expand All @@ -195,8 +215,6 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
writer.Header().Set("Content-Type", "text/event-stream")
}

h.config.WriteResponseHeader(writer)

writer.WriteHeader(http.StatusOK)

responseFlusher.Flush()
Expand All @@ -223,6 +241,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req

conn.Close()
} else {
errors.LogInfo(context.Background(), "unsupported method: ", request.Method)
writer.WriteHeader(http.StatusMethodNotAllowed)
}
}
Expand Down
Loading