Merge pull request #5340 from fatedier/design/wire-v2-workconn-message-framing
feat: use wire v2 framing for UDP workConn payload
This commit is contained in:
committed by
GitHub
Unverified
parent
3e19ef9bfd
commit
012d9fb0c5
@@ -112,6 +112,8 @@ type SessionContext struct {
|
||||
ServerCfg *v1.ServerConfig
|
||||
// client registry
|
||||
ClientRegistry *registry.ClientRegistry
|
||||
// negotiated wire protocol for this client session
|
||||
WireProtocol string
|
||||
}
|
||||
|
||||
type Control struct {
|
||||
@@ -452,6 +454,7 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
|
||||
Configurer: pxyConf,
|
||||
ServerCfg: ctl.sessionCtx.ServerCfg,
|
||||
EncryptionKey: ctl.sessionCtx.EncryptionKey,
|
||||
WireProtocol: ctl.sessionCtx.WireProtocol,
|
||||
})
|
||||
if err != nil {
|
||||
return remoteAddr, err
|
||||
|
||||
@@ -92,6 +92,7 @@ type BaseProxy struct {
|
||||
userInfo plugin.UserInfo
|
||||
loginMsg *msg.Login
|
||||
configurer v1.ProxyConfigurer
|
||||
wireProtocol string
|
||||
|
||||
mu sync.RWMutex
|
||||
xl *xlog.Logger
|
||||
@@ -331,6 +332,7 @@ type Options struct {
|
||||
Configurer v1.ProxyConfigurer
|
||||
ServerCfg *v1.ServerConfig
|
||||
EncryptionKey []byte
|
||||
WireProtocol string
|
||||
}
|
||||
|
||||
func NewProxy(ctx context.Context, options *Options) (pxy Proxy, err error) {
|
||||
@@ -357,6 +359,7 @@ func NewProxy(ctx context.Context, options *Options) (pxy Proxy, err error) {
|
||||
userInfo: options.UserInfo,
|
||||
loginMsg: options.LoginMsg,
|
||||
configurer: configurer,
|
||||
wireProtocol: options.WireProtocol,
|
||||
}
|
||||
|
||||
factory := proxyFactoryRegistry[reflect.TypeOf(configurer)]
|
||||
|
||||
@@ -15,12 +15,15 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
v1 "github.com/fatedier/frp/pkg/config/v1"
|
||||
"github.com/fatedier/frp/pkg/msg"
|
||||
"github.com/fatedier/frp/pkg/proto/wire"
|
||||
)
|
||||
|
||||
func TestWorkConnStartWritesStartWorkConn(t *testing.T) {
|
||||
@@ -51,3 +54,56 @@ func TestWorkConnStartWritesStartWorkConn(t *testing.T) {
|
||||
require.NoError(t, result.err)
|
||||
require.Same(t, serverMsgConn, result.conn)
|
||||
}
|
||||
|
||||
func TestGetWorkConnFromPoolStartWorkConnUnchangedForUDPWireV2(t *testing.T) {
|
||||
startMsg := getStartWorkConnFromPool(t, &v1.UDPProxyConfig{
|
||||
ProxyBaseConfig: v1.ProxyBaseConfig{Name: "udp", Type: string(v1.ProxyTypeUDP)},
|
||||
}, wire.ProtocolV2)
|
||||
|
||||
require.Equal(t, msg.StartWorkConn{ProxyName: "udp"}, startMsg)
|
||||
}
|
||||
|
||||
func TestGetWorkConnFromPoolLeavesRawTCPPayloadUnframed(t *testing.T) {
|
||||
startMsg := getStartWorkConnFromPool(t, &v1.TCPProxyConfig{
|
||||
ProxyBaseConfig: v1.ProxyBaseConfig{Name: "tcp", Type: string(v1.ProxyTypeTCP)},
|
||||
}, wire.ProtocolV2)
|
||||
|
||||
require.Equal(t, msg.StartWorkConn{ProxyName: "tcp"}, startMsg)
|
||||
}
|
||||
|
||||
func getStartWorkConnFromPool(t *testing.T, cfg v1.ProxyConfigurer, wireProtocol string) msg.StartWorkConn {
|
||||
t.Helper()
|
||||
|
||||
client, server := net.Pipe()
|
||||
t.Cleanup(func() {
|
||||
client.Close()
|
||||
server.Close()
|
||||
})
|
||||
|
||||
serverMsgConn := msg.NewConn(server, msg.NewV2ReadWriter(server))
|
||||
clientMsgConn := msg.NewConn(client, msg.NewV2ReadWriter(client))
|
||||
pxy := &BaseProxy{
|
||||
name: cfg.GetBaseConfig().Name,
|
||||
configurer: cfg,
|
||||
poolCount: 0,
|
||||
ctx: context.Background(),
|
||||
wireProtocol: wireProtocol,
|
||||
getWorkConnFn: func() (*WorkConn, error) {
|
||||
return NewWorkConn(serverMsgConn), nil
|
||||
},
|
||||
}
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
conn, err := pxy.GetWorkConnFromPool(nil, nil)
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
errCh <- err
|
||||
}()
|
||||
|
||||
var startMsg msg.StartWorkConn
|
||||
require.NoError(t, clientMsgConn.ReadMsgInto(&startMsg))
|
||||
require.NoError(t, <-errCh)
|
||||
return startMsg
|
||||
}
|
||||
|
||||
+13
-11
@@ -108,7 +108,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
pxy.checkCloseCh = make(chan int)
|
||||
|
||||
// read message from workConn, if it returns any error, notify proxy to start a new workConn
|
||||
workConnReaderFn := func(conn net.Conn) {
|
||||
workConnReaderFn := func(payloadConn *msg.Conn) {
|
||||
for {
|
||||
var (
|
||||
rawMsg msg.Message
|
||||
@@ -116,10 +116,10 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
)
|
||||
xl.Tracef("loop waiting message from udp workConn")
|
||||
// client will send heartbeat in workConn for keeping alive
|
||||
_ = conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
|
||||
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
|
||||
_ = payloadConn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
|
||||
if rawMsg, errRet = payloadConn.ReadMsg(); errRet != nil {
|
||||
xl.Warnf("read from workConn for udp error: %v", errRet)
|
||||
_ = conn.Close()
|
||||
_ = payloadConn.Close()
|
||||
// notify proxy to start a new work connection
|
||||
// ignore error here, it means the proxy is closed
|
||||
_ = errors.PanicToError(func() {
|
||||
@@ -127,7 +127,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
})
|
||||
return
|
||||
}
|
||||
if err := conn.SetReadDeadline(time.Time{}); err != nil {
|
||||
if err := payloadConn.SetReadDeadline(time.Time{}); err != nil {
|
||||
xl.Warnf("set read deadline error: %v", err)
|
||||
}
|
||||
switch m := rawMsg.(type) {
|
||||
@@ -144,7 +144,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
int64(len(m.Content)),
|
||||
)
|
||||
}); errRet != nil {
|
||||
conn.Close()
|
||||
_ = payloadConn.Close()
|
||||
xl.Infof("reader goroutine for udp work connection closed")
|
||||
return
|
||||
}
|
||||
@@ -153,7 +153,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
}
|
||||
|
||||
// send message to workConn
|
||||
workConnSenderFn := func(conn net.Conn, ctx context.Context) {
|
||||
workConnSenderFn := func(payloadConn *msg.Conn, ctx context.Context) {
|
||||
var errRet error
|
||||
for {
|
||||
select {
|
||||
@@ -162,9 +162,9 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
xl.Infof("sender goroutine for udp work connection closed")
|
||||
return
|
||||
}
|
||||
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
|
||||
if errRet = payloadConn.WriteMsg(udpMsg); errRet != nil {
|
||||
xl.Infof("sender goroutine for udp work connection closed: %v", errRet)
|
||||
conn.Close()
|
||||
_ = payloadConn.Close()
|
||||
return
|
||||
}
|
||||
xl.Tracef("send message to udp workConn, len: %d", len(udpMsg.Content))
|
||||
@@ -223,9 +223,11 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
}
|
||||
|
||||
pxy.workConn = netpkg.WrapReadWriteCloserToConn(rwc, workConn)
|
||||
// Plain UDP payload follows the negotiated wire protocol for message framing.
|
||||
payloadConn := msg.NewConn(pxy.workConn, msg.NewReadWriter(pxy.workConn, pxy.wireProtocol))
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go workConnReaderFn(pxy.workConn)
|
||||
go workConnSenderFn(pxy.workConn, ctx)
|
||||
go workConnReaderFn(payloadConn)
|
||||
go workConnSenderFn(payloadConn, ctx)
|
||||
_, ok := <-pxy.checkCloseCh
|
||||
cancel()
|
||||
if !ok {
|
||||
|
||||
@@ -777,6 +777,7 @@ func (svr *Service) RegisterControl(
|
||||
LoginMsg: loginMsg,
|
||||
ServerCfg: svr.cfg,
|
||||
ClientRegistry: svr.clientRegistry,
|
||||
WireProtocol: wireProtocol,
|
||||
})
|
||||
if err != nil {
|
||||
xl.Warnf("create new controller error: %v", err)
|
||||
|
||||
Reference in New Issue
Block a user