Merge branch 'pr_182' into dev

This commit is contained in:
fatedier
2016-12-24 08:51:26 -06:00
committed by GitHub
Unverified
43 changed files with 5540 additions and 163 deletions
+33 -14
View File
@@ -45,8 +45,8 @@ var (
LogMaxDays int64 = 3
PrivilegeMode bool = false
PrivilegeToken string = ""
AuthTimeout int64 = 15
Domain string = ""
AuthTimeout int64 = 900
SubDomainHost string = ""
// if PrivilegeAllowPorts is not nil, tcp proxies which remote port exist in this map can be connected
PrivilegeAllowPorts map[int64]struct{}
@@ -123,12 +123,12 @@ func loadCommonConf(confFile string) error {
DashboardPort = 0
}
tmpStr, ok = conf.Get("common", "dashboard_username")
tmpStr, ok = conf.Get("common", "dashboard_user")
if ok {
DashboardUsername = tmpStr
}
tmpStr, ok = conf.Get("common", "dashboard_password")
tmpStr, ok = conf.Get("common", "dashboard_pwd")
if ok {
DashboardPassword = tmpStr
}
@@ -233,7 +233,10 @@ func loadCommonConf(confFile string) error {
AuthTimeout = v
}
}
Domain, ok = conf.Get("common", "domain")
SubDomainHost, ok = conf.Get("common", "subdomain_host")
if ok {
SubDomainHost = strings.ToLower(strings.TrimSpace(SubDomainHost))
}
return nil
}
@@ -252,7 +255,7 @@ func loadProxyConf(confFile string) (proxyServers map[string]*ProxyServer, err e
proxyServer.Type, ok = section["type"]
if ok {
if proxyServer.Type != "tcp" && proxyServer.Type != "http" && proxyServer.Type != "https" {
if proxyServer.Type != "tcp" && proxyServer.Type != "http" && proxyServer.Type != "https" && proxyServer.Type != "udp" {
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] type error", proxyServer.Name)
}
} else {
@@ -264,8 +267,8 @@ func loadProxyConf(confFile string) (proxyServers map[string]*ProxyServer, err e
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] no auth_token found", proxyServer.Name)
}
// for tcp
if proxyServer.Type == "tcp" {
// for tcp and udp
if proxyServer.Type == "tcp" || proxyServer.Type == "udp" {
proxyServer.BindAddr, ok = section["bind_addr"]
if !ok {
proxyServer.BindAddr = "0.0.0.0"
@@ -288,13 +291,18 @@ func loadProxyConf(confFile string) (proxyServers map[string]*ProxyServer, err e
if ok {
proxyServer.CustomDomains = strings.Split(domainStr, ",")
if len(proxyServer.CustomDomains) == 0 {
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom_domains must be set when type equals http", proxyServer.Name)
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom_domains must be set when type is http", proxyServer.Name)
}
for i, domain := range proxyServer.CustomDomains {
proxyServer.CustomDomains[i] = strings.ToLower(strings.TrimSpace(domain))
domain = strings.ToLower(strings.TrimSpace(domain))
// custom domain should not belong to subdomain_host
if SubDomainHost != "" && strings.Contains(domain, SubDomainHost) {
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom domain should not belong to subdomain_host", proxyServer.Name)
}
proxyServer.CustomDomains[i] = domain
}
} else {
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom_domains must be set when type equals http", proxyServer.Name)
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom_domains must be set when type is http", proxyServer.Name)
}
//location
@@ -312,13 +320,17 @@ func loadProxyConf(confFile string) (proxyServers map[string]*ProxyServer, err e
if ok {
proxyServer.CustomDomains = strings.Split(domainStr, ",")
if len(proxyServer.CustomDomains) == 0 {
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom_domains must be set when type equals https", proxyServer.Name)
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom_domains must be set when type is https", proxyServer.Name)
}
for i, domain := range proxyServer.CustomDomains {
proxyServer.CustomDomains[i] = strings.ToLower(strings.TrimSpace(domain))
domain = strings.ToLower(strings.TrimSpace(domain))
if SubDomainHost != "" && strings.Contains(domain, SubDomainHost) {
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom domain should not belong to subdomain_host", proxyServer.Name)
}
proxyServer.CustomDomains[i] = domain
}
} else {
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom_domains must be set when type equals https", proxyServer.Name)
return proxyServers, fmt.Errorf("Parse conf error: proxy [%s] custom_domains must be set when type is https", proxyServer.Name)
}
}
proxyServers[proxyServer.Name] = proxyServer
@@ -398,3 +410,10 @@ func DeleteProxy(proxyName string) {
defer ProxyServersMutex.Unlock()
delete(ProxyServers, proxyName)
}
func GetProxyServer(proxyName string) (p *ProxyServer, ok bool) {
ProxyServersMutex.RLock()
defer ProxyServersMutex.RUnlock()
p, ok = ProxyServers[proxyName]
return
}
+1 -1
View File
@@ -34,7 +34,7 @@ func RunDashboardServer(addr string, port int64) (err error) {
// url router
mux := http.NewServeMux()
// api, see dashboard_api.go
mux.HandleFunc("/api/reload", apiReload)
mux.HandleFunc("/api/reload", use(apiReload, basicAuth))
mux.HandleFunc("/api/proxies", apiProxies)
// view, see dashboard_view.go
+130 -36
View File
@@ -16,6 +16,7 @@ package server
import (
"fmt"
"net"
"sync"
"time"
@@ -25,6 +26,7 @@ import (
"github.com/fatedier/frp/src/models/msg"
"github.com/fatedier/frp/src/utils/conn"
"github.com/fatedier/frp/src/utils/log"
"github.com/fatedier/frp/src/utils/pool"
)
type Listener interface {
@@ -35,12 +37,17 @@ type Listener interface {
type ProxyServer struct {
*config.ProxyServerConf
CtlConn *conn.Conn `json:"-"` // control connection with frpc
listeners []Listener // accept new connection from remote users
ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
workConnChan chan *conn.Conn // get new work conns from control goroutine
mutex sync.RWMutex
closeChan chan struct{} // for notify other goroutines that the proxy is closed by close this channel
Status int64
CtlConn *conn.Conn // control connection with frpc
WorkConnUdp *conn.Conn // work connection for udp
udpConn *net.UDPConn
listeners []Listener // accept new connection from remote users
ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
workConnChan chan *conn.Conn // get new work conns from control goroutine
udpSenderChan chan *msg.UdpPacket
mutex sync.RWMutex
closeChan chan struct{} // close this channel for notifying other goroutines that the proxy is closed
}
func NewProxyServer() (p *ProxyServer) {
@@ -60,7 +67,7 @@ func NewProxyServerFromCtlMsg(req *msg.ControlReq) (p *ProxyServer) {
p.PrivilegeMode = req.PrivilegeMode
p.PrivilegeToken = PrivilegeToken
p.BindAddr = BindAddr
if p.Type == "tcp" {
if p.Type == "tcp" || p.Type == "udp" {
p.ListenPort = req.RemotePort
} else if p.Type == "http" {
p.ListenPort = VhostHttpPort
@@ -80,6 +87,7 @@ func (p *ProxyServer) Init() {
metric.SetStatus(p.Name, p.Status)
p.workConnChan = make(chan *conn.Conn, p.PoolCount+10)
p.ctlMsgChan = make(chan int64, p.PoolCount+10)
p.udpSenderChan = make(chan *msg.UdpPacket, 1024)
p.listeners = make([]Listener, 0)
p.closeChan = make(chan struct{})
p.Unlock()
@@ -145,46 +153,73 @@ func (p *ProxyServer) Start(c *conn.Conn) (err error) {
p.Unlock()
metric.SetStatus(p.Name, p.Status)
// create connection pool if needed
if p.PoolCount > 0 {
go p.connectionPoolManager(p.closeChan)
}
// start a goroutine for every listener to accept user connection
for _, listener := range p.listeners {
go func(l Listener) {
if p.Type == "udp" {
// udp is special
p.udpConn, err = conn.ListenUDP(p.BindAddr, p.ListenPort)
if err != nil {
log.Warn("ProxyName [%s], listen udp port error: %v", p.Name, err)
return err
}
go func() {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
buf := pool.GetBuf(2048)
n, remoteAddr, err := p.udpConn.ReadFromUDP(buf)
if err != nil {
log.Info("ProxyName [%s], listener is closed", p.Name)
log.Info("ProxyName [%s], udp listener is closed", p.Name)
return
}
log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())
if p.Status != consts.Working {
log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
c.Close()
return
localAddr, _ := net.ResolveUDPAddr("udp", p.udpConn.LocalAddr().String())
udpPacket := msg.NewUdpPacket(buf[0:n], remoteAddr, localAddr)
select {
case p.udpSenderChan <- udpPacket:
default:
log.Warn("ProxyName [%s], udp sender channel is full", p.Name)
}
pool.PutBuf(buf)
}
}()
} else {
// create connection pool if needed
if p.PoolCount > 0 {
go p.connectionPoolManager(p.closeChan)
}
go func(userConn *conn.Conn) {
workConn, err := p.getWorkConn()
// start a goroutine for every listener to accept user connection
for _, listener := range p.listeners {
go func(l Listener) {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
if err != nil {
log.Info("ProxyName [%s], listener is closed", p.Name)
return
}
log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())
if p.Status != consts.Working {
log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
c.Close()
return
}
// message will be transferred to another without modifying
// l means local, r means remote
log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(),
userConn.GetLocalAddr(), userConn.GetRemoteAddr())
go func(userConn *conn.Conn) {
workConn, err := p.getWorkConn()
if err != nil {
return
}
needRecord := true
go msg.JoinMore(userConn, workConn, p.BaseConf, needRecord)
}(c)
}
}(listener)
// message will be transferred to another without modifying
// l means local, r means remote
log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(),
userConn.GetLocalAddr(), userConn.GetRemoteAddr())
needRecord := true
go msg.JoinMore(userConn, workConn, p.BaseConf, needRecord)
}(c)
}
}(listener)
}
}
return nil
}
@@ -200,10 +235,18 @@ func (p *ProxyServer) Close() {
}
close(p.ctlMsgChan)
close(p.workConnChan)
close(p.udpSenderChan)
close(p.closeChan)
if p.CtlConn != nil {
p.CtlConn.Close()
}
if p.WorkConnUdp != nil {
p.WorkConnUdp.Close()
}
if p.udpConn != nil {
p.udpConn.Close()
p.udpConn = nil
}
}
metric.SetStatus(p.Name, p.Status)
// if the proxy created by PrivilegeMode, delete it when closed
@@ -228,9 +271,60 @@ func (p *ProxyServer) RegisterNewWorkConn(c *conn.Conn) {
case p.workConnChan <- c:
default:
log.Debug("ProxyName [%s], workConnChan is full, so close this work connection", p.Name)
c.Close()
}
}
// create a tcp connection for forwarding udp packages
func (p *ProxyServer) RegisterNewWorkConnUdp(c *conn.Conn) {
if p.WorkConnUdp != nil && !p.WorkConnUdp.IsClosed() {
p.WorkConnUdp.Close()
}
p.WorkConnUdp = c
// read
go func() {
var (
buf string
err error
)
for {
buf, err = c.ReadLine()
if err != nil {
log.Warn("ProxyName [%s], work connection for udp closed", p.Name)
return
}
udpPacket := &msg.UdpPacket{}
err = udpPacket.UnPack([]byte(buf))
if err != nil {
log.Warn("ProxyName [%s], unpack udp packet error: %v", p.Name, err)
continue
}
// send to user
_, err = p.udpConn.WriteToUDP(udpPacket.Content, udpPacket.Dst)
if err != nil {
continue
}
}
}()
// write
go func() {
for {
udpPacket, ok := <-p.udpSenderChan
if !ok {
return
}
err := c.WriteString(string(udpPacket.Pack()) + "\n")
if err != nil {
log.Debug("ProxyName [%s], write to work connection for udp error: %v", p.Name, err)
return
}
}
}()
}
// When frps get one user connection, we get one work connection from the pool and return it.
// If no workConn available in the pool, send message to frpc to get one or more
// and wait until it is available.