From 012d9fb0c5230d5df8c844c87d2169a8fa32dc58 Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 28 May 2026 18:06:56 +0800 Subject: [PATCH] Merge pull request #5340 from fatedier/design/wire-v2-workconn-message-framing feat: use wire v2 framing for UDP workConn payload --- Release.md | 27 ++++++------------ client/proxy/udp.go | 14 ++++++---- pkg/msg/wire_v2_test.go | 19 +++++++++++++ server/control.go | 3 ++ server/proxy/proxy.go | 3 ++ server/proxy/proxy_test.go | 56 ++++++++++++++++++++++++++++++++++++++ server/proxy/udp.go | 24 ++++++++-------- server/service.go | 1 + 8 files changed, 111 insertions(+), 36 deletions(-) diff --git a/Release.md b/Release.md index e724fd34..72efbbc7 100644 --- a/Release.md +++ b/Release.md @@ -1,21 +1,10 @@ -## Compatibility Policy - -Starting with v0.69.0, each minor release is supported until there are nine newer minor releases. For example, v0.69.0 will be supported until v0.78.0 is released. Within this window, frpc v0.69.0 is guaranteed to work with any frps from v0.61.0 to v0.77.0, and vice versa. Patch releases within the same minor are always compatible. Versions outside the support window may continue to work on a best-effort basis, but compatibility is no longer guaranteed. - -For mixed-version deployments, upgrade frps first, then upgrade frpc. This keeps the server side ready for newer client-side protocol behavior before clients start using it. - -## Notes - -This release introduces wire protocol v2 as a transition path for future frpc/frps protocol changes. The existing wire protocol is difficult to extend without compatibility risk, and upcoming changes, including replacing deprecated stream encryption methods, require a versioned protocol. - -**The default value of `transport.wireProtocol` remains `v1` in this release.** Users can keep the default for now. To test v2 early, upgrade both frpc and frps to versions that support it, then set `transport.wireProtocol = "v2"` in frpc. A v2-enabled frpc cannot connect to an older frps. - -When `transport.wireProtocol = "v2"` is enabled, the control channel uses negotiated AEAD encryption after the login handshake. Both frpc and frps must be upgraded to this release to use v2. - -v1 will be deprecated when v2 becomes the default in a future release. It will continue to be supported until v0.78.0 is released, and may be removed in v0.78.0 or later. - ## Features -* Added `transport.wireProtocol` for frpc to select the internal message protocol used between frpc and frps. Supported values are `v1` and `v2`. -* Added client protocol visibility in the frps dashboard and `/api/clients` API. Online clients now report their negotiated protocol as `v1` or `v2`. -* Wire protocol v2 now negotiates AEAD control-channel encryption. Supported algorithms are `xchacha20-poly1305` and `aes-256-gcm`; frpc advertises its preferred order based on local AES-GCM hardware support, and frps selects the first supported algorithm from that list. +* When `transport.wireProtocol = "v2"` is enabled, ordinary UDP proxy work connection payloads now use wire protocol v2 message framing. This keeps UDP message payloads aligned with the negotiated frpc/frps wire protocol. + +## Compatibility Notes + +* The default/empty `transport.wireProtocol` and `transport.wireProtocol = "v1"` continue to use the legacy message codec for ordinary UDP proxy payloads. +* Raw stream proxy paths such as TCP, HTTP, and STCP remain unframed and are not affected by the UDP payload framing change. +* SUDP and XTCP keep their existing legacy behavior in this release and will be considered separately in a future phase. +* `transport.wireProtocol = "v2"` requires both frpc and frps to use versions that support the same wire v2 semantics. Mixing a newer peer that sends v2-framed ordinary UDP payloads with an older v2-capable peer that still expects the legacy UDP payload codec can break ordinary UDP proxy traffic. diff --git a/client/proxy/udp.go b/client/proxy/udp.go index 8547ef72..110c01ac 100644 --- a/client/proxy/udp.go +++ b/client/proxy/udp.go @@ -99,15 +99,17 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { pxy.mu.Lock() pxy.workConn = netpkg.WrapReadWriteCloserToConn(remote, conn) + // Plain UDP payload follows the configured wire protocol for message framing. + payloadRW := msg.NewReadWriter(pxy.workConn, pxy.clientCfg.Transport.WireProtocol) pxy.readCh = make(chan *msg.UDPPacket, 1024) pxy.sendCh = make(chan msg.Message, 1024) pxy.closed = false pxy.mu.Unlock() - workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) { + workConnReaderFn := func(rw msg.ReadWriter, readCh chan *msg.UDPPacket) { for { var udpMsg msg.UDPPacket - if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { + if errRet := rw.ReadMsgInto(&udpMsg); errRet != nil { xl.Warnf("read from workConn for udp error: %v", errRet) return } @@ -120,7 +122,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { } } } - workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) { + workConnSenderFn := func(rw msg.ReadWriter, sendCh chan msg.Message) { defer func() { xl.Infof("writer goroutine for udp work connection closed") }() @@ -132,7 +134,7 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { case *msg.Ping: xl.Tracef("send ping message to udp workConn") } - if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil { + if errRet = rw.WriteMsg(rawMsg); errRet != nil { xl.Errorf("udp work write error: %v", errRet) return } @@ -151,8 +153,8 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, _ *msg.StartWorkConn) { } } - go workConnSenderFn(pxy.workConn, pxy.sendCh) - go workConnReaderFn(pxy.workConn, pxy.readCh) + go workConnSenderFn(payloadRW, pxy.sendCh) + go workConnReaderFn(payloadRW, pxy.readCh) go heartbeatFn(pxy.sendCh) // Call Forwarder with proxy protocol version (empty string means no proxy protocol) diff --git a/pkg/msg/wire_v2_test.go b/pkg/msg/wire_v2_test.go index 5879fc37..f6f25e55 100644 --- a/pkg/msg/wire_v2_test.go +++ b/pkg/msg/wire_v2_test.go @@ -43,9 +43,28 @@ func TestV2ReadWriterRoundTrip(t *testing.T) { func TestNewReadWriter(t *testing.T) { require.IsType(t, &V1ReadWriter{}, NewReadWriter(&bytes.Buffer{}, "")) require.IsType(t, &V1ReadWriter{}, NewReadWriter(&bytes.Buffer{}, wire.ProtocolV1)) + require.IsType(t, &V1ReadWriter{}, NewReadWriter(&bytes.Buffer{}, "unknown")) require.IsType(t, &V2ReadWriter{}, NewReadWriter(&bytes.Buffer{}, wire.ProtocolV2)) } +func TestNewReadWriterEncoding(t *testing.T) { + for _, wireProtocol := range []string{"", wire.ProtocolV1} { + var legacy bytes.Buffer + legacyRW := NewReadWriter(&legacy, wireProtocol) + require.NoError(t, legacyRW.WriteMsg(&UDPPacket{Content: []byte("legacy")})) + require.NotEmpty(t, legacy.Bytes()) + require.Equal(t, TypeUDPPacket, legacy.Bytes()[0]) + } + + var v2 bytes.Buffer + v2RW := NewReadWriter(&v2, wire.ProtocolV2) + require.NoError(t, v2RW.WriteMsg(&UDPPacket{Content: []byte("v2")})) + frame, err := wire.NewConn(&v2).ReadFrame() + require.NoError(t, err) + require.Equal(t, wire.FrameTypeMessage, frame.Type) + require.Equal(t, V2TypeUDPPacket, binary.BigEndian.Uint16(frame.Payload[:2])) +} + func TestV2MessageTypeIDsAreStable(t *testing.T) { require.Equal(t, uint16(1), V2TypeLogin) require.Equal(t, uint16(2), V2TypeLoginResp) diff --git a/server/control.go b/server/control.go index d7b41c43..084b4eea 100644 --- a/server/control.go +++ b/server/control.go @@ -112,6 +112,8 @@ type SessionContext struct { ServerCfg *v1.ServerConfig // client registry ClientRegistry *registry.ClientRegistry + // negotiated wire protocol for this client session + WireProtocol string } type Control struct { @@ -452,6 +454,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err Configurer: pxyConf, ServerCfg: ctl.sessionCtx.ServerCfg, EncryptionKey: ctl.sessionCtx.EncryptionKey, + WireProtocol: ctl.sessionCtx.WireProtocol, }) if err != nil { return remoteAddr, err diff --git a/server/proxy/proxy.go b/server/proxy/proxy.go index 7175a606..7945b91b 100644 --- a/server/proxy/proxy.go +++ b/server/proxy/proxy.go @@ -92,6 +92,7 @@ type BaseProxy struct { userInfo plugin.UserInfo loginMsg *msg.Login configurer v1.ProxyConfigurer + wireProtocol string mu sync.RWMutex xl *xlog.Logger @@ -331,6 +332,7 @@ type Options struct { Configurer v1.ProxyConfigurer ServerCfg *v1.ServerConfig EncryptionKey []byte + WireProtocol string } func NewProxy(ctx context.Context, options *Options) (pxy Proxy, err error) { @@ -357,6 +359,7 @@ func NewProxy(ctx context.Context, options *Options) (pxy Proxy, err error) { userInfo: options.UserInfo, loginMsg: options.LoginMsg, configurer: configurer, + wireProtocol: options.WireProtocol, } factory := proxyFactoryRegistry[reflect.TypeOf(configurer)] diff --git a/server/proxy/proxy_test.go b/server/proxy/proxy_test.go index 38f9dd42..381c743e 100644 --- a/server/proxy/proxy_test.go +++ b/server/proxy/proxy_test.go @@ -15,12 +15,15 @@ package proxy import ( + "context" "net" "testing" "github.com/stretchr/testify/require" + v1 "github.com/fatedier/frp/pkg/config/v1" "github.com/fatedier/frp/pkg/msg" + "github.com/fatedier/frp/pkg/proto/wire" ) func TestWorkConnStartWritesStartWorkConn(t *testing.T) { @@ -51,3 +54,56 @@ func TestWorkConnStartWritesStartWorkConn(t *testing.T) { require.NoError(t, result.err) require.Same(t, serverMsgConn, result.conn) } + +func TestGetWorkConnFromPoolStartWorkConnUnchangedForUDPWireV2(t *testing.T) { + startMsg := getStartWorkConnFromPool(t, &v1.UDPProxyConfig{ + ProxyBaseConfig: v1.ProxyBaseConfig{Name: "udp", Type: string(v1.ProxyTypeUDP)}, + }, wire.ProtocolV2) + + require.Equal(t, msg.StartWorkConn{ProxyName: "udp"}, startMsg) +} + +func TestGetWorkConnFromPoolLeavesRawTCPPayloadUnframed(t *testing.T) { + startMsg := getStartWorkConnFromPool(t, &v1.TCPProxyConfig{ + ProxyBaseConfig: v1.ProxyBaseConfig{Name: "tcp", Type: string(v1.ProxyTypeTCP)}, + }, wire.ProtocolV2) + + require.Equal(t, msg.StartWorkConn{ProxyName: "tcp"}, startMsg) +} + +func getStartWorkConnFromPool(t *testing.T, cfg v1.ProxyConfigurer, wireProtocol string) msg.StartWorkConn { + t.Helper() + + client, server := net.Pipe() + t.Cleanup(func() { + client.Close() + server.Close() + }) + + serverMsgConn := msg.NewConn(server, msg.NewV2ReadWriter(server)) + clientMsgConn := msg.NewConn(client, msg.NewV2ReadWriter(client)) + pxy := &BaseProxy{ + name: cfg.GetBaseConfig().Name, + configurer: cfg, + poolCount: 0, + ctx: context.Background(), + wireProtocol: wireProtocol, + getWorkConnFn: func() (*WorkConn, error) { + return NewWorkConn(serverMsgConn), nil + }, + } + + errCh := make(chan error, 1) + go func() { + conn, err := pxy.GetWorkConnFromPool(nil, nil) + if conn != nil { + conn.Close() + } + errCh <- err + }() + + var startMsg msg.StartWorkConn + require.NoError(t, clientMsgConn.ReadMsgInto(&startMsg)) + require.NoError(t, <-errCh) + return startMsg +} diff --git a/server/proxy/udp.go b/server/proxy/udp.go index 6c5b854a..609d9c42 100644 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -108,7 +108,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { pxy.checkCloseCh = make(chan int) // read message from workConn, if it returns any error, notify proxy to start a new workConn - workConnReaderFn := func(conn net.Conn) { + workConnReaderFn := func(payloadConn *msg.Conn) { for { var ( rawMsg msg.Message @@ -116,10 +116,10 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { ) xl.Tracef("loop waiting message from udp workConn") // client will send heartbeat in workConn for keeping alive - _ = conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second)) - if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil { + _ = payloadConn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second)) + if rawMsg, errRet = payloadConn.ReadMsg(); errRet != nil { xl.Warnf("read from workConn for udp error: %v", errRet) - _ = conn.Close() + _ = payloadConn.Close() // notify proxy to start a new work connection // ignore error here, it means the proxy is closed _ = errors.PanicToError(func() { @@ -127,7 +127,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { }) return } - if err := conn.SetReadDeadline(time.Time{}); err != nil { + if err := payloadConn.SetReadDeadline(time.Time{}); err != nil { xl.Warnf("set read deadline error: %v", err) } switch m := rawMsg.(type) { @@ -144,7 +144,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { int64(len(m.Content)), ) }); errRet != nil { - conn.Close() + _ = payloadConn.Close() xl.Infof("reader goroutine for udp work connection closed") return } @@ -153,7 +153,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { } // send message to workConn - workConnSenderFn := func(conn net.Conn, ctx context.Context) { + workConnSenderFn := func(payloadConn *msg.Conn, ctx context.Context) { var errRet error for { select { @@ -162,9 +162,9 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { xl.Infof("sender goroutine for udp work connection closed") return } - if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { + if errRet = payloadConn.WriteMsg(udpMsg); errRet != nil { xl.Infof("sender goroutine for udp work connection closed: %v", errRet) - conn.Close() + _ = payloadConn.Close() return } xl.Tracef("send message to udp workConn, len: %d", len(udpMsg.Content)) @@ -223,9 +223,11 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) { } pxy.workConn = netpkg.WrapReadWriteCloserToConn(rwc, workConn) + // Plain UDP payload follows the negotiated wire protocol for message framing. + payloadConn := msg.NewConn(pxy.workConn, msg.NewReadWriter(pxy.workConn, pxy.wireProtocol)) ctx, cancel := context.WithCancel(context.Background()) - go workConnReaderFn(pxy.workConn) - go workConnSenderFn(pxy.workConn, ctx) + go workConnReaderFn(payloadConn) + go workConnSenderFn(payloadConn, ctx) _, ok := <-pxy.checkCloseCh cancel() if !ok { diff --git a/server/service.go b/server/service.go index 7ec303e4..bc70e803 100644 --- a/server/service.go +++ b/server/service.go @@ -777,6 +777,7 @@ func (svr *Service) RegisterControl( LoginMsg: loginMsg, ServerCfg: svr.cfg, ClientRegistry: svr.clientRegistry, + WireProtocol: wireProtocol, }) if err != nil { xl.Warnf("create new controller error: %v", err)