修复监控失败的问题

This commit is contained in:
dushixiang
2022-02-13 14:42:22 +08:00
parent 77f253c748
commit 6165265417
8 changed files with 152 additions and 123 deletions

View File

@ -2,7 +2,6 @@ package api
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"path" "path"
@ -54,7 +53,6 @@ func (api GuacamoleApi) Guacamole(c echo.Context) error {
height := c.QueryParam("height") height := c.QueryParam("height")
dpi := c.QueryParam("dpi") dpi := c.QueryParam("dpi")
sessionId := c.Param("id") sessionId := c.Param("id")
connectionId := c.QueryParam("connectionId")
intWidth, _ := strconv.Atoi(width) intWidth, _ := strconv.Atoi(width)
intHeight, _ := strconv.Atoi(height) intHeight, _ := strconv.Atoi(height)
@ -63,65 +61,48 @@ func (api GuacamoleApi) Guacamole(c echo.Context) error {
propertyMap := repository.PropertyRepository.FindAllMap(ctx) propertyMap := repository.PropertyRepository.FindAllMap(ctx)
var s model.Session configuration.SetParameter("width", width)
configuration.SetParameter("height", height)
if len(connectionId) > 0 { configuration.SetParameter("dpi", dpi)
s, err = repository.SessionRepository.FindByConnectionId(ctx, connectionId) s, err := service.SessionService.FindByIdAndDecrypt(ctx, sessionId)
if err != nil {
return err
}
api.setConfig(propertyMap, s, configuration)
var (
ip = s.IP
port = s.Port
)
if s.AccessGatewayId != "" && s.AccessGatewayId != "-" {
g, err := service.GatewayService.GetGatewayAndReconnectById(s.AccessGatewayId)
if err != nil { if err != nil {
return err utils.Disconnect(ws, AccessGatewayUnAvailable, "获取接入网关失败:"+err.Error())
return nil
} }
if s.Status != constant.Connected { if !g.Connected {
return errors.New("会话未在线") utils.Disconnect(ws, AccessGatewayUnAvailable, "接入网关不可用:"+g.Message)
return nil
} }
configuration.ConnectionID = connectionId exposedIP, exposedPort, err := g.OpenSshTunnel(s.ID, ip, port)
sessionId = s.ID
configuration.SetParameter("width", strconv.Itoa(s.Width))
configuration.SetParameter("height", strconv.Itoa(s.Height))
configuration.SetParameter("dpi", "96")
} else {
configuration.SetParameter("width", width)
configuration.SetParameter("height", height)
configuration.SetParameter("dpi", dpi)
s, err = service.SessionService.FindByIdAndDecrypt(ctx, sessionId)
if err != nil { if err != nil {
return err utils.Disconnect(ws, AccessGatewayCreateError, "创建SSH隧道失败"+err.Error())
} return nil
api.setConfig(propertyMap, s, configuration)
var (
ip = s.IP
port = s.Port
)
if s.AccessGatewayId != "" && s.AccessGatewayId != "-" {
g, err := service.GatewayService.GetGatewayAndReconnectById(s.AccessGatewayId)
if err != nil {
utils.Disconnect(ws, AccessGatewayUnAvailable, "获取接入网关失败:"+err.Error())
return nil
}
if !g.Connected {
utils.Disconnect(ws, AccessGatewayUnAvailable, "接入网关不可用:"+g.Message)
return nil
}
exposedIP, exposedPort, err := g.OpenSshTunnel(s.ID, ip, port)
if err != nil {
utils.Disconnect(ws, AccessGatewayCreateError, "创建SSH隧道失败"+err.Error())
return nil
}
ip = exposedIP
port = exposedPort
defer g.CloseSshTunnel(s.ID)
} }
ip = exposedIP
port = exposedPort
defer g.CloseSshTunnel(s.ID)
}
configuration.SetParameter("hostname", ip) configuration.SetParameter("hostname", ip)
configuration.SetParameter("port", strconv.Itoa(port)) configuration.SetParameter("port", strconv.Itoa(port))
// 加载资产配置的属性,优先级比全局配置的高,因此最后加载,覆盖掉全局配置 // 加载资产配置的属性,优先级比全局配置的高,因此最后加载,覆盖掉全局配置
attributes, err := repository.AssetRepository.FindAssetAttrMapByAssetId(ctx, s.AssetId) attributes, err := repository.AssetRepository.FindAssetAttrMapByAssetId(ctx, s.AssetId)
if err != nil { if err != nil {
return err return err
} }
if len(attributes) > 0 { if len(attributes) > 0 {
api.setAssetConfig(attributes, s, configuration) api.setAssetConfig(attributes, s, configuration)
}
} }
for name := range configuration.Parameters { for name := range configuration.Parameters {
// 替换数据库空格字符串占位符为真正的空格 // 替换数据库空格字符串占位符为真正的空格
@ -136,9 +117,7 @@ func (api GuacamoleApi) Guacamole(c echo.Context) error {
guacdTunnel, err := guacd.NewTunnel(addr, configuration) guacdTunnel, err := guacd.NewTunnel(addr, configuration)
if err != nil { if err != nil {
if connectionId == "" { utils.Disconnect(ws, NewTunnelError, err.Error())
utils.Disconnect(ws, NewTunnelError, err.Error())
}
log.Printf("[%v] 建立连接失败: %v", sessionId, err.Error()) log.Printf("[%v] 建立连接失败: %v", sessionId, err.Error())
return err return err
} }
@ -151,43 +130,31 @@ func (api GuacamoleApi) Guacamole(c echo.Context) error {
GuacdTunnel: guacdTunnel, GuacdTunnel: guacdTunnel,
} }
if connectionId == "" { if configuration.Protocol == constant.SSH {
if configuration.Protocol == constant.SSH { nextTerminal, err := CreateNextTerminalBySession(s)
nextTerminal, err := CreateNextTerminalBySession(s) if err == nil {
if err == nil { nextSession.NextTerminal = nextTerminal
nextSession.NextTerminal = nextTerminal
}
} }
}
nextSession.Observer = session.NewObserver(sessionId) nextSession.Observer = session.NewObserver(sessionId)
session.GlobalSessionManager.Add <- nextSession session.GlobalSessionManager.Add <- nextSession
go nextSession.Observer.Start() go nextSession.Observer.Start()
sess := model.Session{ sess := model.Session{
ConnectionId: guacdTunnel.UUID, ConnectionId: guacdTunnel.UUID,
Width: intWidth, Width: intWidth,
Height: intHeight, Height: intHeight,
Status: constant.Connecting, Status: constant.Connecting,
Recording: configuration.GetParameter(guacd.RecordingPath), Recording: configuration.GetParameter(guacd.RecordingPath),
} }
if sess.Recording == "" { if sess.Recording == "" {
// 未录屏时无需审计 // 未录屏时无需审计
sess.Reviewed = true sess.Reviewed = true
} }
// 创建新会话 // 创建新会话
log.Debugf("[%v] 新建会话成功: %v", sessionId, sess.ConnectionId) log.Debugf("[%v] 新建会话成功: %v", sessionId, sess.ConnectionId)
if err := repository.SessionRepository.UpdateById(ctx, &sess, sessionId); err != nil { if err := repository.SessionRepository.UpdateById(ctx, &sess, sessionId); err != nil {
return err return err
}
} else {
// 要监控会话
forObsSession := session.GlobalSessionManager.GetById(sessionId)
if forObsSession == nil {
utils.Disconnect(ws, NotFoundSession, "获取会话失败")
return nil
}
nextSession.ID = utils.UUID()
forObsSession.Observer.Add <- nextSession
log.Debugf("[%v:%v] 观察者[%v]加入会话[%v]", sessionId, connectionId, nextSession.ID, s.ConnectionId)
} }
guacamoleHandler := NewGuacamoleHandler(ws, guacdTunnel) guacamoleHandler := NewGuacamoleHandler(ws, guacdTunnel)
@ -196,21 +163,11 @@ func (api GuacamoleApi) Guacamole(c echo.Context) error {
for { for {
_, message, err := ws.ReadMessage() _, message, err := ws.ReadMessage()
if err != nil { if err != nil {
log.Debugf("[%v:%v] WebSocket已关闭, %v", sessionId, connectionId, err.Error()) log.Debugf("[%v] WebSocket已关闭, %v", sessionId, err.Error())
// guacdTunnel.Read() 会阻塞所以要先把guacdTunnel客户端关闭才能退出Guacd循环 // guacdTunnel.Read() 会阻塞所以要先把guacdTunnel客户端关闭才能退出Guacd循环
_ = guacdTunnel.Close() _ = guacdTunnel.Close()
if connectionId != "" { service.SessionService.CloseSessionById(sessionId, Normal, "用户正常退出")
observerId := nextSession.ID
forObsSession := session.GlobalSessionManager.GetById(sessionId)
if forObsSession != nil {
// 移除会话中保存的观察者信息
forObsSession.Observer.Del <- observerId
log.Debugf("[%v:%v] 观察者[%v]退出会话", sessionId, connectionId, observerId)
}
} else {
service.SessionService.CloseSessionById(sessionId, Normal, "用户正常退出")
}
guacamoleHandler.Stop() guacamoleHandler.Stop()
return nil return nil
} }
@ -245,6 +202,84 @@ func (api GuacamoleApi) setAssetConfig(attributes map[string]string, s model.Ses
} }
} }
func (api GuacamoleApi) GuacamoleMonitor(c echo.Context) error {
ws, err := UpGrader.Upgrade(c.Response().Writer, c.Request(), nil)
if err != nil {
log.Errorf("升级为WebSocket协议失败%v", err.Error())
return err
}
ctx := context.TODO()
sessionId := c.Param("id")
s, err := repository.SessionRepository.FindById(ctx, sessionId)
if err != nil {
return err
}
if s.Status != constant.Connected {
utils.Disconnect(ws, AssetNotActive, "会话离线")
return nil
}
connectionId := s.ConnectionId
configuration := guacd.NewConfiguration()
configuration.ConnectionID = connectionId
sessionId = s.ID
configuration.SetParameter("width", strconv.Itoa(s.Width))
configuration.SetParameter("height", strconv.Itoa(s.Height))
configuration.SetParameter("dpi", "96")
addr := config.GlobalCfg.Guacd.Hostname + ":" + strconv.Itoa(config.GlobalCfg.Guacd.Port)
asset := fmt.Sprintf("%s:%s", configuration.GetParameter("hostname"), configuration.GetParameter("port"))
log.Debugf("[%v] 新建 guacd 会话, guacd=%v, asset=%v", sessionId, addr, asset)
guacdTunnel, err := guacd.NewTunnel(addr, configuration)
if err != nil {
utils.Disconnect(ws, NewTunnelError, err.Error())
log.Printf("[%v] 建立连接失败: %v", sessionId, err.Error())
return err
}
nextSession := &session.Session{
ID: sessionId,
Protocol: s.Protocol,
Mode: s.Mode,
WebSocket: ws,
GuacdTunnel: guacdTunnel,
}
// 要监控会话
forObsSession := session.GlobalSessionManager.GetById(sessionId)
if forObsSession == nil {
utils.Disconnect(ws, NotFoundSession, "获取会话失败")
return nil
}
nextSession.ID = utils.UUID()
forObsSession.Observer.Add <- nextSession
log.Debugf("[%v:%v] 观察者[%v]加入会话[%v]", sessionId, connectionId, nextSession.ID, s.ConnectionId)
guacamoleHandler := NewGuacamoleHandler(ws, guacdTunnel)
guacamoleHandler.Start()
for {
_, message, err := ws.ReadMessage()
if err != nil {
log.Debugf("[%v:%v] WebSocket已关闭, %v", sessionId, connectionId, err.Error())
// guacdTunnel.Read() 会阻塞所以要先把guacdTunnel客户端关闭才能退出Guacd循环
_ = guacdTunnel.Close()
observerId := nextSession.ID
forObsSession.Observer.Del <- observerId
log.Debugf("[%v:%v] 观察者[%v]退出会话", sessionId, connectionId, observerId)
guacamoleHandler.Stop()
return nil
}
_, err = guacdTunnel.WriteAndFlush(message)
if err != nil {
service.SessionService.CloseSessionById(sessionId, TunnelClosed, "远程连接已关闭")
return nil
}
}
}
func (api GuacamoleApi) setConfig(propertyMap map[string]string, s model.Session, configuration *guacd.Configuration) { func (api GuacamoleApi) setConfig(propertyMap map[string]string, s model.Session, configuration *guacd.Configuration) {
if propertyMap[guacd.EnableRecording] == "true" { if propertyMap[guacd.EnableRecording] == "true" {
configuration.SetParameter(guacd.RecordingPath, path.Join(config.GlobalCfg.Guacd.Recording, s.ID)) configuration.SetParameter(guacd.RecordingPath, path.Join(config.GlobalCfg.Guacd.Recording, s.ID))

View File

@ -47,7 +47,7 @@ func (api WebTerminalApi) SshEndpoint(c echo.Context) error {
}() }()
ctx := context.TODO() ctx := context.TODO()
sessionId := c.QueryParam("sessionId") sessionId := c.Param("id")
cols, _ := strconv.Atoi(c.QueryParam("cols")) cols, _ := strconv.Atoi(c.QueryParam("cols"))
rows, _ := strconv.Atoi(c.QueryParam("rows")) rows, _ := strconv.Atoi(c.QueryParam("rows"))
@ -222,7 +222,7 @@ func (api WebTerminalApi) SshMonitorEndpoint(c echo.Context) error {
}() }()
ctx := context.TODO() ctx := context.TODO()
sessionId := c.QueryParam("sessionId") sessionId := c.Param("id")
s, err := repository.SessionRepository.FindById(ctx, sessionId) s, err := repository.SessionRepository.FindById(ctx, sessionId)
if err != nil { if err != nil {
return WriteMessage(ws, dto.NewMessage(Closed, "获取会话失败")) return WriteMessage(ws, dto.NewMessage(Closed, "获取会话失败"))

View File

@ -85,9 +85,6 @@ func setupRoutes() *echo.Echo {
e.POST("/login", accountApi.LoginEndpoint) e.POST("/login", accountApi.LoginEndpoint)
e.POST("/loginWithTotp", accountApi.LoginWithTotpEndpoint) e.POST("/loginWithTotp", accountApi.LoginWithTotpEndpoint)
e.GET("/ssh", webTerminalApi.SshEndpoint)
e.GET("/ssh-monitor", webTerminalApi.SshMonitorEndpoint)
account := e.Group("/account") account := e.Group("/account")
{ {
account.GET("/info", accountApi.InfoEndpoint) account.GET("/info", accountApi.InfoEndpoint)
@ -175,6 +172,9 @@ func setupRoutes() *echo.Echo {
sessions.POST("", SessionApi.SessionCreateEndpoint) sessions.POST("", SessionApi.SessionCreateEndpoint)
sessions.POST("/:id/connect", SessionApi.SessionConnectEndpoint) sessions.POST("/:id/connect", SessionApi.SessionConnectEndpoint)
sessions.GET("/:id/tunnel", guacamoleApi.Guacamole) sessions.GET("/:id/tunnel", guacamoleApi.Guacamole)
sessions.GET("/:id/tunnel-monitor", guacamoleApi.GuacamoleMonitor)
sessions.GET("/:id/ssh", webTerminalApi.SshEndpoint)
sessions.GET("/:id/ssh-monitor", webTerminalApi.SshMonitorEndpoint)
sessions.POST("/:id/resize", SessionApi.SessionResizeEndpoint) sessions.POST("/:id/resize", SessionApi.SessionResizeEndpoint)
sessions.GET("/:id/stats", SessionApi.SessionStatsEndpoint) sessions.GET("/:id/stats", SessionApi.SessionStatsEndpoint)

View File

@ -15,8 +15,6 @@ const STATE_DISCONNECTED = 5;
class AccessMonitor extends Component { class AccessMonitor extends Component {
formRef = React.createRef()
state = { state = {
client: {}, client: {},
containerOverflow: 'hidden', containerOverflow: 'hidden',
@ -29,7 +27,7 @@ class AccessMonitor extends Component {
}; };
async componentDidMount() { async componentDidMount() {
const connectionId = this.props.connectionId; const sessionId = this.props.sessionId;
let rate = this.props.rate; let rate = this.props.rate;
let protocol = this.props.protocol; let protocol = this.props.protocol;
let width = this.props.width; let width = this.props.width;
@ -45,7 +43,7 @@ class AccessMonitor extends Component {
height: height * rate, height: height * rate,
rate: rate, rate: rate,
}) })
this.renderDisplay(connectionId); this.renderDisplay(sessionId);
} }
componentWillUnmount() { componentWillUnmount() {
@ -110,9 +108,9 @@ class AccessMonitor extends Component {
}); });
} }
async renderDisplay(connectionId, protocol) { async renderDisplay(sessionId, protocol) {
let tunnel = new Guacamole.WebSocketTunnel(wsServer + '/tunnel'); let tunnel = new Guacamole.WebSocketTunnel(`${wsServer}/sessions/${sessionId}/tunnel-monitor`);
tunnel.onstatechange = this.onTunnelStateChange; tunnel.onstatechange = this.onTunnelStateChange;
let client = new Guacamole.Client(tunnel); let client = new Guacamole.Client(tunnel);
@ -128,7 +126,6 @@ class AccessMonitor extends Component {
let token = getToken(); let token = getToken();
let params = { let params = {
'connectionId': connectionId,
'X-Auth-Token': token 'X-Auth-Token': token
}; };

View File

@ -62,7 +62,7 @@ class BatchCommandTerm extends Component {
let paramStr = qs.stringify(params); let paramStr = qs.stringify(params);
let webSocket = new WebSocket(wsServer + '/ssh?' + paramStr); let webSocket = new WebSocket(`${wsServer}/sessions/${sessionId}/ssh?${paramStr}`);
this.props.appendWebsocket({'id': assetId, 'ws': webSocket}); this.props.appendWebsocket({'id': assetId, 'ws': webSocket});

View File

@ -113,13 +113,12 @@ class Term extends Component {
let params = { let params = {
'cols': term.cols, 'cols': term.cols,
'rows': term.rows, 'rows': term.rows,
'sessionId': sessionId,
'X-Auth-Token': token 'X-Auth-Token': token
}; };
let paramStr = qs.stringify(params); let paramStr = qs.stringify(params);
let webSocket = new WebSocket(wsServer + '/ssh?' + paramStr); let webSocket = new WebSocket(`${wsServer}/sessions/${sessionId}/ssh?${paramStr}`);
let pingInterval; let pingInterval;
webSocket.onopen = (e => { webSocket.onopen = (e => {

View File

@ -33,13 +33,12 @@ class TermMonitor extends Component {
let token = getToken(); let token = getToken();
let params = { let params = {
'sessionId': sessionId,
'X-Auth-Token': token 'X-Auth-Token': token
}; };
let paramStr = qs.stringify(params); let paramStr = qs.stringify(params);
let webSocket = new WebSocket(wsServer + '/ssh-monitor?' + paramStr); let webSocket = new WebSocket(`${wsServer}/sessions/${sessionId}/ssh-monitor?${paramStr}`);
webSocket.onmessage = (e) => { webSocket.onmessage = (e) => {
let msg = Message.parse(e.data); let msg = Message.parse(e.data);
switch (msg['type']) { switch (msg['type']) {

View File

@ -202,7 +202,6 @@ class OnlineSession extends Component {
showMonitor = (record) => { showMonitor = (record) => {
this.setState({ this.setState({
connectionId: record.connectionId,
sessionId: record.id, sessionId: record.id,
sessionProtocol: record.protocol, sessionProtocol: record.protocol,
sessionMode: record.mode, sessionMode: record.mode,
@ -477,7 +476,7 @@ class OnlineSession extends Component {
> >
{ {
this.state.sessionMode === 'guacd' ? this.state.sessionMode === 'guacd' ?
<AccessMonitor connectionId={this.state.connectionId} <AccessMonitor sessionId={this.state.sessionId}
width={this.state.sessionWidth} width={this.state.sessionWidth}
height={this.state.sessionHeight} height={this.state.sessionHeight}
protocol={this.state.sessionProtocol} protocol={this.state.sessionProtocol}