package main import ( "encoding/json" "fmt" "github.com/gorilla/websocket" uuid "github.com/satori/go.uuid" "io/ioutil" "math/rand" "net" "net/http" "strings" "sync" "time" ) var ( upgrader = websocket.Upgrader{ //允许跨域访问 CheckOrigin: func(r *http.Request) bool { return true }, EnableCompression: false, } managementService = MonfloManagement{} ) func main() { rand.Seed(time.Now().UnixNano()) go stun() http.HandleFunc("/2015-10-26/verification", verification) http.HandleFunc("/2015-10-26/devices", device) http.HandleFunc("/2015-10-26", wsHandler) http.HandleFunc("/config", configHandler) http.HandleFunc("/set", configSetHandler) http.HandleFunc("/peer", peerHandler) err := http.ListenAndServeTLS("", "cert.crt", "private.key", nil) if err != nil { fmt.Print(err) } } func configHandler(w http.ResponseWriter, r *http.Request) { marshal, _ := json.Marshal(managementService.clients) w.Write(marshal) } func configSetHandler(w http.ResponseWriter, r *http.Request) { var clients []*ClientInfo data, _ := ioutil.ReadAll(r.Body) json.Unmarshal(data, &clients) managementService.clients = clients } func peerHandler(w http.ResponseWriter, r *http.Request) { peers := managementService.getPeers(r.FormValue("username")) marshal, _ := json.Marshal(peers) w.Write(marshal) } func stun() { address := "0.0.0.0:3478" addr, err := net.ResolveUDPAddr("udp", address) if err != nil { fmt.Println(err) } conn, err := net.ListenUDP("udp", addr) if err != nil { fmt.Println(err) } defer conn.Close() for { // Here must use make and give the lenth of buffer data := make([]byte, 4096) _, rAddr, err := conn.ReadFromUDP(data) if err != nil { fmt.Println(err) continue } //file, _ := ioutil.ReadFile("data.bin") //_, err = conn.WriteToUDP(file, rAddr) _, err = conn.WriteToUDP([]byte{254, 239, 1, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 0, 6, 58, 210, 98, 46, 174, 198, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, rAddr) if err != nil { fmt.Println(err) continue } } } func wsHandler(w http.ResponseWriter, r *http.Request) { var ( wsConn *websocket.Conn err error mutex sync.Mutex client *ClientInfo ) //Upgrade websocket(返回给客户端的消息) if wsConn, err = upgrader.Upgrade(w, r, nil); err != nil { //报错了,直接返回底层的websocket链接就会终断掉 return } go func() { for { mutex.Lock() err2 := wsConn.WriteMessage(websocket.PingMessage, []byte{}) mutex.Unlock() if err2 != nil { wsConn.Close() return } time.Sleep(1 * time.Second) } defer func() { if r := recover(); r != nil { fmt.Printf("Runtime error caught: %v", r) } }() }() defer func() { if r := recover(); r != nil { fmt.Printf("Runtime error caught: %v", r) } if nil != client { managementService.setOffline(client.Client) } }() mutex.Lock() wsConn.WriteMessage(websocket.TextMessage, []byte("{\n \"event\": \"cookie_created\",\n \"data\": \"5f0389ae63c5c5962de0bf1fa7edbb9e7605da388a8ffb2f69af4c47b1fde020\"\n}")) mutex.Unlock() client = new(ClientInfo) for { messageType, p, err := wsConn.ReadMessage() if err != nil { fmt.Printf("WebSocket异常断开," + wsConn.RemoteAddr().String() + ",异常信息:" + err.Error() + "\n") wsConn.Close() return } if messageType == websocket.CloseMessage { fmt.Printf("WebSocket断开," + wsConn.RemoteAddr().String() + "\n") wsConn.Close() return } fmt.Printf(string(p) + "\n") mutex.Lock() tempClient := wsProcess(wsConn, p, client) mutex.Unlock() if tempClient != nil { client = tempClient } } } func wsProcess(wsConn *websocket.Conn, data []byte, client *ClientInfo) *ClientInfo { str := string(data) if len(str) == 0 { return nil } if str[0:1] == "{" { var request MonfloRequest json.Unmarshal(data, &request) if request.Uri == "/" { if request.Headers.ApiKey != "" { client = managementService.getClientByApiKey(request.Headers.ApiKey) client.wsConn = wsConn client.ClientStatus = "online" response := WelcomeInfo{ MonfloResponse: MonfloResponse{ Event: "welcome", }, Data: WelcomeData{ Client: client.Client, Features: Features{ MaxUserProfiles: 1, MaxStreams: 128, MaxDevices: 128, MaxP2pResolution: 2160, MaxP2pFrames: 60, MaxRelayResolution: 2160, MaxRelayFrames: 60, Hevc: true, SessionRecording: true, ManagementConsole: true, ConnectAnywhere: false, Support: false, CommercialUse: false, DeviceThumbnails: false, FileTransfer: true, }, Peers: managementService.getPeers(client.UserName), Subscription: 3, Timestamp: time.Now().Unix(), }, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) wsConn.WriteMessage(websocket.TextMessage, []byte("{\"status\":200}")) //如果是客户端登陆,需要通知该用户下所有的服务端 //if client.ClientType == "client" { // streamClients := managementService.getUserAllClient(client.UserName) // if streamClients != nil { // for _, streamClient := range streamClients { // if streamClient.ClientStatus == "online" && streamClient.ClientType == "server" { // eventResponse := &ClientConnectedEvent{ // MonfloResponse: MonfloResponse{ // Event: "client_connected", // }, // Data: client.Client, // } // streamClient.wsConn.WriteMessage(websocket.TextMessage, formatJson(eventResponse)) // } // } // } //} return client } else { //服务端匿名登录 uid, _ := uuid.NewV4() response := WelcomeInfo{ MonfloResponse: MonfloResponse{ Event: "welcome", }, Data: WelcomeData{ Client: "client-" + uid.String(), Features: Features{ MaxUserProfiles: 1, MaxStreams: 128, MaxDevices: 128, MaxP2pResolution: 2160, MaxP2pFrames: 60, MaxRelayResolution: 2160, MaxRelayFrames: 60, Hevc: true, SessionRecording: true, ManagementConsole: true, ConnectAnywhere: false, Support: false, CommercialUse: false, DeviceThumbnails: false, FileTransfer: true, }, Subscription: 3, Peers: make(map[string]PeerInfo), Timestamp: time.Now().Unix(), }, } client = managementService.addServer(response.Data.Client) client.wsConn = wsConn client.ClientType = "server" client.ClientStatus = "online" wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) wsConn.WriteMessage(websocket.TextMessage, []byte("{\"status\":200}")) return client } } if request.Uri == "/devices/current" && request.Method == "PATCH" { client.Metadata = request.Data.Metadata response := PatchDeviceResponse{ MonfloResponse: MonfloResponse{ Status: 200, }, Data: LoginResponse{ Metadata: client.Metadata, Id: client.Id, }, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) } if request.Method == "PATCH" && strings.Contains(request.Uri, "/streams/stream-") { streamId := request.Uri[9:] client.Key = request.Data.Key client.PrivateAddr = request.Data.PrivateAddr client.PublicAddr = request.Data.PublicAddr client.StreamStatus = request.Data.Status response := MonfloResponse{ Status: 200, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) //通知所有该账号的客户端更新服务端信息 streamClients := managementService.getUserAllClient(client.UserName) if streamClients != nil { for _, streamClient := range streamClients { if streamClient.ClientStatus == "online" && streamClient.ClientType == "client" { response2 := &StreamResponse{ MonfloResponse: MonfloResponse{ Event: "stream_updated", }, Data: StreamInfo{ Id: streamId, Key: request.Data.Key, Status: request.Data.Status, OwnerClient: client.Client, PrivateAddr: request.Data.PrivateAddr, PublicAddr: request.Data.PublicAddr, }, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response2)) } } } } if request.Uri == "/devices" && request.Method == "GET" { response := DeviceResponse{ MonfloResponse: MonfloResponse{ Status: 200, }, Data: LoginResponse{ Metadata: client.Metadata, Id: client.Id, }, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) } if request.Uri == "/peers" && request.Method == "GET" { peers := managementService.getPeers(client.UserName) response := PeersResponse{ MonfloResponse: MonfloResponse{ Status: 200, }, Data: peers, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) } if request.Method == "POST" && request.Uri == "/incognitos" { if request.Data.Stream != "" { //登录账户模式 managementService.genId(client) } else { //服务端匿名模式 client.PrivateAddr = request.Data.PrivateAddr client.PublicAddr = request.Data.PublicAddr client.Bitrate = request.Data.Bitrate client.Key = request.Data.Key client.Formats = request.Data.Formats //分配Id managementService.genId(client) client.Id = client.StreamId //分配StreamId managementService.genStream(client) } response := &IncognitosResponse{ MonfloResponse: MonfloResponse{ Status: 200, }, Data: IncognitosData{ Id: client.StreamId, Stream: client.Stream, Client: client.Client, }, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) } if request.Method == "POST" && request.Uri == "/streams" { client.PrivateAddr = request.Data.PrivateAddr client.PublicAddr = request.Data.PublicAddr client.Bitrate = request.Data.Bitrate client.Key = request.Data.Key client.Formats = request.Data.Formats managementService.genStream(client) response := &StreamResponse{ MonfloResponse: MonfloResponse{ Status: 200, }, Data: StreamInfo{ Id: client.Stream, Status: "ready", OwnerClient: client.Client, Name: "0", PrivateAddr: client.PrivateAddr, PublicAddr: client.PublicAddr, Key: client.Key, Bitrate: client.Bitrate, Formats: client.Formats, }, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) } if request.Method == "GET" && (request.Uri == "/invitations/logins" || request.Uri == "/invitations" || request.Uri == "/shortcuts") { response := &MonfloResponse{ Status: 200, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) } if request.Method == "GET" && strings.Contains(request.Uri, "/streams/stream-") { streamCode := request.Uri[9:] stream := managementService.getClientStream(streamCode) if stream != nil { response := &StreamResponse{ MonfloResponse: MonfloResponse{ Status: 200, }, Data: StreamInfo{ Id: stream.Stream, Status: "ready", OwnerClient: stream.Client, Name: "0", PrivateAddr: stream.PrivateAddr, PublicAddr: stream.PublicAddr, Key: stream.Key, Bitrate: stream.Bitrate, Formats: stream.Formats, }, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) response2 := &StreamReadResponse{ MonfloResponse: MonfloResponse{ Event: "_stream_read", }, Data: StreamReadData{ Client: client.Client, Format: request.Headers.Format, Id: stream.Stream, PrivateAddr: request.Headers.PrivateAddr, PublicAddr: request.Headers.PublicAddr, Type: request.Headers.Type, }, } stream.wsConn.WriteMessage(websocket.TextMessage, formatJson(response2)) } } if request.Method == "GET" && strings.Contains(request.Uri, "/incognitos/") { stream := managementService.getClientStreamId(request.Uri[12:]) if stream != nil { response := &StreamResponse{ MonfloResponse: MonfloResponse{ Status: 200, }, Data: StreamInfo{ Id: stream.Stream, Status: "ready", OwnerClient: stream.Client, Name: stream.StreamId, PrivateAddr: stream.PrivateAddr, PublicAddr: stream.PublicAddr, Key: stream.Key, Bitrate: stream.Bitrate, Formats: stream.Formats, }, } wsConn.WriteMessage(websocket.TextMessage, formatJson(response)) response2 := &StreamReadResponse{ MonfloResponse: MonfloResponse{ Event: "_stream_read", }, Data: StreamReadData{ Client: client.Client, Format: request.Headers.Format, Id: stream.Stream, PrivateAddr: request.Headers.PrivateAddr, PublicAddr: request.Headers.PublicAddr, Type: request.Headers.Type, }, } stream.wsConn.WriteMessage(websocket.TextMessage, formatJson(response2)) } } } else if str[0:1] == "[" { var requests []MonfloRequest json.Unmarshal(data, &requests) var datas []interface{} for _, request := range requests { if request.Uri == "/devices" && request.Method == "GET" { device := managementService.getDevices(client.UserName) response := DevicesResponse{ MonfloResponse: MonfloResponse{ Status: 200, }, Data: device, } datas = append(datas, response) } if request.Uri == "/info" && request.Method == "GET" { response := MonfloResponse{ Status: 200, } datas = append(datas, response) } } wsConn.WriteMessage(websocket.TextMessage, formatJson(datas)) } return nil } func verification(w http.ResponseWriter, r *http.Request) { w.Header().Add("Server", "MonfloHTTPServer") w.Header().Set("Content-Type", "application/json") data, err := ioutil.ReadAll(r.Body) if err != nil { fmt.Print(err.Error()) return } fmt.Print("verification:" + string(data) + "\n") var loginRequest LoginRequest json.Unmarshal(data, &loginRequest) if !managementService.checkExist(loginRequest.Login, loginRequest.Fingerprint) { w.WriteHeader(404) return } client := managementService.getClient(loginRequest.Login, loginRequest.Fingerprint) client.UserName = loginRequest.Login response := LoginResponse{ CreatedAt: client.CreatedAt, ApiKey: client.ApiKey, MacAddress: client.MacAddress, Metadata: client.Metadata, Fingerprint: client.Fingerprint, Id: client.Id, Client: client.Client, Name: client.Name, } marshal, _ := json.Marshal(response) w.Write(marshal) } func device(w http.ResponseWriter, r *http.Request) { w.Header().Add("Server", "MonfloHTTPServer") w.Header().Set("Content-Type", "application/json") data, err := ioutil.ReadAll(r.Body) if err != nil { fmt.Print(err.Error()) return } fmt.Print("Add Device:" + string(data) + "\n") var addDeviceRequest AddDeviceRequest json.Unmarshal(data, &addDeviceRequest) userName := r.Header.Get("Monflo-login") managementService.addClient(userName, addDeviceRequest.Fingerprint) managementService.addDevice(addDeviceRequest) client := managementService.getClientByFingerprint(addDeviceRequest.Fingerprint) response := LoginResponse{ CreatedAt: client.CreatedAt, ApiKey: client.ApiKey, MacAddress: client.MacAddress, Metadata: client.Metadata, Fingerprint: client.Fingerprint, Id: client.Id, Client: client.Client, Name: client.Name, } marshal, _ := json.Marshal(response) w.Write(marshal) }