monflo/monflo.go
2020-09-11 09:57:37 +08:00

584 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
uuid "github.com/satori/go.uuid"
"io/ioutil"
"math/rand"
"net"
"net/http"
"strings"
"sync"
"time"
)
var (
upgrader = websocket.Upgrader{
//允许跨域访问
CheckOrigin: func(r *http.Request) bool {
return true
},
EnableCompression: false,
}
managementService = MonfloManagement{}
)
func main() {
rand.Seed(time.Now().UnixNano())
go stun()
http.HandleFunc("/2015-10-26/verification", verification)
http.HandleFunc("/2015-10-26/devices", device)
http.HandleFunc("/2015-10-26", wsHandler)
http.HandleFunc("/config", configHandler)
http.HandleFunc("/set", configSetHandler)
http.HandleFunc("/peer", peerHandler)
err := http.ListenAndServeTLS("", "cert.crt", "private.key", nil)
if err != nil {
fmt.Print(err)
}
}
func configHandler(w http.ResponseWriter, r *http.Request) {
marshal, _ := json.Marshal(managementService.clients)
w.Write(marshal)
}
func configSetHandler(w http.ResponseWriter, r *http.Request) {
var clients []*ClientInfo
data, _ := ioutil.ReadAll(r.Body)
json.Unmarshal(data, &clients)
managementService.clients = clients
}
func peerHandler(w http.ResponseWriter, r *http.Request) {
peers := managementService.getPeers(r.FormValue("username"))
marshal, _ := json.Marshal(peers)
w.Write(marshal)
}
func stun() {
address := "0.0.0.0:3478"
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
fmt.Println(err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
fmt.Println(err)
}
defer conn.Close()
for {
// Here must use make and give the lenth of buffer
data := make([]byte, 4096)
_, rAddr, err := conn.ReadFromUDP(data)
if err != nil {
fmt.Println(err)
continue
}
//file, _ := ioutil.ReadFile("data.bin")
//_, err = conn.WriteToUDP(file, rAddr)
_, err = conn.WriteToUDP([]byte{254, 239, 1, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 0, 6, 58, 210, 98, 46, 174, 198, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, rAddr)
if err != nil {
fmt.Println(err)
continue
}
}
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
var (
wsConn *websocket.Conn
err error
mutex sync.Mutex
client *ClientInfo
)
//Upgrade websocket(返回给客户端的消息)
if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil {
//报错了直接返回底层的websocket链接就会终断掉
return
}
go func() {
for {
mutex.Lock()
err2 := wsConn.WriteMessage(websocket.PingMessage, []byte{})
mutex.Unlock()
if err2 != nil {
wsConn.Close()
return
}
time.Sleep(1 * time.Second)
}
defer func() {
if r := recover(); r != nil {
fmt.Printf("Runtime error caught: %v", r)
}
}()
}()
defer func() {
if r := recover(); r != nil {
fmt.Printf("Runtime error caught: %v", r)
}
if nil != client {
managementService.setOffline(client.Client)
}
}()
mutex.Lock()
wsConn.WriteMessage(websocket.TextMessage, []byte("{\n \"event\": \"cookie_created\",\n \"data\": \"5f0389ae63c5c5962de0bf1fa7edbb9e7605da388a8ffb2f69af4c47b1fde020\"\n}"))
mutex.Unlock()
client = new(ClientInfo)
for {
messageType, p, err := wsConn.ReadMessage()
if err != nil {
fmt.Printf("WebSocket异常断开," + wsConn.RemoteAddr().String() + ",异常信息:" + err.Error() + "\n")
wsConn.Close()
return
}
if messageType == websocket.CloseMessage {
fmt.Printf("WebSocket断开," + wsConn.RemoteAddr().String() + "\n")
wsConn.Close()
return
}
fmt.Printf(string(p) + "\n")
mutex.Lock()
tempClient := wsProcess(wsConn, p, client)
mutex.Unlock()
if tempClient != nil {
client = tempClient
}
}
}
func wsProcess(wsConn *websocket.Conn, data []byte, client *ClientInfo) *ClientInfo {
str := string(data)
if len(str) == 0 {
return nil
}
if str[0:1] == "{" {
var request MonfloRequest
json.Unmarshal(data, &request)
if request.Uri == "/" {
if request.Headers.ApiKey != "" {
client = managementService.getClientByApiKey(request.Headers.ApiKey)
client.wsConn = wsConn
client.ClientStatus = "online"
response := WelcomeInfo{
MonfloResponse: MonfloResponse{
Event: "welcome",
},
Data: WelcomeData{
Client: client.Client,
Features: Features{
MaxUserProfiles: 1,
MaxStreams: 128,
MaxDevices: 128,
MaxP2pResolution: 2160,
MaxP2pFrames: 60,
MaxRelayResolution: 2160,
MaxRelayFrames: 60,
Hevc: true,
SessionRecording: true,
ManagementConsole: true,
ConnectAnywhere: false,
Support: false,
CommercialUse: false,
DeviceThumbnails: false,
FileTransfer: true,
},
Peers: managementService.getPeers(client.UserName),
Subscription: 3,
Timestamp: time.Now().Unix(),
},
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
wsConn.WriteMessage(websocket.TextMessage, []byte("{\"status\":200}"))
//如果是客户端登陆,需要通知该用户下所有的服务端
//if client.ClientType == "client" {
// streamClients := managementService.getUserAllClient(client.UserName)
// if streamClients != nil {
// for _, streamClient := range streamClients {
// if streamClient.ClientStatus == "online" && streamClient.ClientType == "server" {
// eventResponse := &ClientConnectedEvent{
// MonfloResponse: MonfloResponse{
// Event: "client_connected",
// },
// Data: client.Client,
// }
// streamClient.wsConn.WriteMessage(websocket.TextMessage, formatJson(eventResponse))
// }
// }
// }
//}
return client
} else {
//服务端匿名登录
uid, _ := uuid.NewV4()
response := WelcomeInfo{
MonfloResponse: MonfloResponse{
Event: "welcome",
},
Data: WelcomeData{
Client: "client-" + uid.String(),
Features: Features{
MaxUserProfiles: 1,
MaxStreams: 128,
MaxDevices: 128,
MaxP2pResolution: 2160,
MaxP2pFrames: 60,
MaxRelayResolution: 2160,
MaxRelayFrames: 60,
Hevc: true,
SessionRecording: true,
ManagementConsole: true,
ConnectAnywhere: false,
Support: false,
CommercialUse: false,
DeviceThumbnails: false,
FileTransfer: true,
},
Subscription: 3,
Peers: make(map[string]PeerInfo),
Timestamp: time.Now().Unix(),
},
}
client = managementService.addServer(response.Data.Client)
client.wsConn = wsConn
client.ClientType = "server"
client.ClientStatus = "online"
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
wsConn.WriteMessage(websocket.TextMessage, []byte("{\"status\":200}"))
return client
}
}
if request.Uri == "/devices/current" && request.Method == "PATCH" {
client.Metadata = request.Data.Metadata
response := PatchDeviceResponse{
MonfloResponse: MonfloResponse{
Status: 200,
},
Data: LoginResponse{
Metadata: client.Metadata,
Id: client.Id,
},
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
}
if request.Method == "PATCH" && strings.Contains(request.Uri, "/streams/stream-") {
streamId := request.Uri[9:]
client.Key = request.Data.Key
client.PrivateAddr = request.Data.PrivateAddr
client.PublicAddr = request.Data.PublicAddr
client.StreamStatus = request.Data.Status
response := MonfloResponse{
Status: 200,
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
//通知所有该账号的客户端更新服务端信息
streamClients := managementService.getUserAllClient(client.UserName)
if streamClients != nil {
for _, streamClient := range streamClients {
if streamClient.ClientStatus == "online" && streamClient.ClientType == "client" {
response2 := &StreamResponse{
MonfloResponse: MonfloResponse{
Event: "stream_updated",
},
Data: StreamInfo{
Id: streamId,
Key: request.Data.Key,
Status: request.Data.Status,
OwnerClient: client.Client,
PrivateAddr: request.Data.PrivateAddr,
PublicAddr: request.Data.PublicAddr,
},
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response2))
}
}
}
}
if request.Uri == "/devices" && request.Method == "GET" {
response := DeviceResponse{
MonfloResponse: MonfloResponse{
Status: 200,
},
Data: LoginResponse{
Metadata: client.Metadata,
Id: client.Id,
},
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
}
if request.Uri == "/peers" && request.Method == "GET" {
peers := managementService.getPeers(client.UserName)
response := PeersResponse{
MonfloResponse: MonfloResponse{
Status: 200,
},
Data: peers,
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
}
if request.Method == "POST" && request.Uri == "/incognitos" {
if request.Data.Stream != "" {
//登录账户模式
managementService.genId(client)
} else {
//服务端匿名模式
client.PrivateAddr = request.Data.PrivateAddr
client.PublicAddr = request.Data.PublicAddr
client.Bitrate = request.Data.Bitrate
client.Key = request.Data.Key
client.Formats = request.Data.Formats
//分配Id
managementService.genId(client)
client.Id = client.StreamId
//分配StreamId
managementService.genStream(client)
}
response := &IncognitosResponse{
MonfloResponse: MonfloResponse{
Status: 200,
},
Data: IncognitosData{
Id: client.StreamId,
Stream: client.Stream,
Client: client.Client,
},
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
}
if request.Method == "POST" && request.Uri == "/streams" {
client.PrivateAddr = request.Data.PrivateAddr
client.PublicAddr = request.Data.PublicAddr
client.Bitrate = request.Data.Bitrate
client.Key = request.Data.Key
client.Formats = request.Data.Formats
managementService.genStream(client)
response := &StreamResponse{
MonfloResponse: MonfloResponse{
Status: 200,
},
Data: StreamInfo{
Id: client.Stream,
Status: "ready",
OwnerClient: client.Client,
Name: "0",
PrivateAddr: client.PrivateAddr,
PublicAddr: client.PublicAddr,
Key: client.Key,
Bitrate: client.Bitrate,
Formats: client.Formats,
},
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
}
if request.Method == "GET" && (request.Uri == "/invitations/logins" || request.Uri == "/invitations" || request.Uri == "/shortcuts") {
response := &MonfloResponse{
Status: 200,
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
}
if request.Method == "GET" && strings.Contains(request.Uri, "/streams/stream-") {
streamCode := request.Uri[9:]
stream := managementService.getClientStream(streamCode)
if stream != nil {
response := &StreamResponse{
MonfloResponse: MonfloResponse{
Status: 200,
},
Data: StreamInfo{
Id: stream.Stream,
Status: "ready",
OwnerClient: stream.Client,
Name: "0",
PrivateAddr: stream.PrivateAddr,
PublicAddr: stream.PublicAddr,
Key: stream.Key,
Bitrate: stream.Bitrate,
Formats: stream.Formats,
},
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
response2 := &StreamReadResponse{
MonfloResponse: MonfloResponse{
Event: "_stream_read",
},
Data: StreamReadData{
Client: client.Client,
Format: request.Headers.Format,
Id: stream.Stream,
PrivateAddr: request.Headers.PrivateAddr,
PublicAddr: request.Headers.PublicAddr,
Type: request.Headers.Type,
},
}
stream.wsConn.WriteMessage(websocket.TextMessage, formatJson(response2))
}
}
if request.Method == "GET" && strings.Contains(request.Uri, "/incognitos/") {
stream := managementService.getClientStreamId(request.Uri[12:])
if stream != nil {
response := &StreamResponse{
MonfloResponse: MonfloResponse{
Status: 200,
},
Data: StreamInfo{
Id: stream.Stream,
Status: "ready",
OwnerClient: stream.Client,
Name: stream.StreamId,
PrivateAddr: stream.PrivateAddr,
PublicAddr: stream.PublicAddr,
Key: stream.Key,
Bitrate: stream.Bitrate,
Formats: stream.Formats,
},
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(response))
response2 := &StreamReadResponse{
MonfloResponse: MonfloResponse{
Event: "_stream_read",
},
Data: StreamReadData{
Client: client.Client,
Format: request.Headers.Format,
Id: stream.Stream,
PrivateAddr: request.Headers.PrivateAddr,
PublicAddr: request.Headers.PublicAddr,
Type: request.Headers.Type,
},
}
stream.wsConn.WriteMessage(websocket.TextMessage, formatJson(response2))
}
}
} else if str[0:1] == "[" {
var requests []MonfloRequest
json.Unmarshal(data, &requests)
var datas []interface{}
for _, request := range requests {
if request.Uri == "/devices" && request.Method == "GET" {
device := managementService.getDevices(client.UserName)
response := DevicesResponse{
MonfloResponse: MonfloResponse{
Status: 200,
},
Data: device,
}
datas = append(datas, response)
}
if request.Uri == "/info" && request.Method == "GET" {
response := MonfloResponse{
Status: 200,
}
datas = append(datas, response)
}
}
wsConn.WriteMessage(websocket.TextMessage, formatJson(datas))
}
return nil
}
func verification(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Server", "MonfloHTTPServer")
w.Header().Set("Content-Type", "application/json")
data, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Print(err.Error())
return
}
fmt.Print("verification:" + string(data) + "\n")
var loginRequest LoginRequest
json.Unmarshal(data, &loginRequest)
if !managementService.checkExist(loginRequest.Login, loginRequest.Fingerprint) {
w.WriteHeader(404)
return
}
client := managementService.getClient(loginRequest.Login, loginRequest.Fingerprint)
client.UserName = loginRequest.Login
response := LoginResponse{
CreatedAt: client.CreatedAt,
ApiKey: client.ApiKey,
MacAddress: client.MacAddress,
Metadata: client.Metadata,
Fingerprint: client.Fingerprint,
Id: client.Id,
Client: client.Client,
Name: client.Name,
}
marshal, _ := json.Marshal(response)
w.Write(marshal)
}
func device(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Server", "MonfloHTTPServer")
w.Header().Set("Content-Type", "application/json")
data, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Print(err.Error())
return
}
fmt.Print("Add Device:" + string(data) + "\n")
var addDeviceRequest AddDeviceRequest
json.Unmarshal(data, &addDeviceRequest)
userName := r.Header.Get("Monflo-login")
managementService.addClient(userName, addDeviceRequest.Fingerprint)
managementService.addDevice(addDeviceRequest)
client := managementService.getClientByFingerprint(addDeviceRequest.Fingerprint)
response := LoginResponse{
CreatedAt: client.CreatedAt,
ApiKey: client.ApiKey,
MacAddress: client.MacAddress,
Metadata: client.Metadata,
Fingerprint: client.Fingerprint,
Id: client.Id,
Client: client.Client,
Name: client.Name,
}
marshal, _ := json.Marshal(response)
w.Write(marshal)
}