feat(p2p): replaced old ssh relay to new relay mechanism (improved p2p network stablity over NAT).

This commit is contained in:
Alexey Kasyanchuk 2019-03-29 00:50:37 +01:00
parent 639aa839e3
commit df1ebd1e03
3 changed files with 195 additions and 130 deletions

View File

@ -1,4 +1,3 @@
const ssh = require('./ssh')
const shuffle = require('./shuffle') const shuffle = require('./shuffle')
const config = require('./config'); const config = require('./config');
const net = require('net') const net = require('net')
@ -14,6 +13,14 @@ const {promisify} = require('util');
const mkdirp = promisify(require('mkdirp')) const mkdirp = promisify(require('mkdirp'))
const deleteFolderRecursive = require('./deleteFolderRecursive') const deleteFolderRecursive = require('./deleteFolderRecursive')
const compareVersions = require('compare-versions'); const compareVersions = require('compare-versions');
const portCheck = require('./portCheck')
const findGoodPort = async (port, host) => {
while (!(await portCheck(port, host))) {
port++
}
return port
}
class p2p { class p2p {
constructor(send = () => {}) constructor(send = () => {})
@ -44,6 +51,13 @@ class p2p {
this.tcpServer = net.createServer(); this.tcpServer = net.createServer();
this.tcpServer.maxConnections = config.p2pConnections * 2; this.tcpServer.maxConnections = config.p2pConnections * 2;
this.relay = {server: false, client: false}
this.selfAddress = null;
this.relayServers = {};
this.relayServersLimit = 8;
// <-> server commination for relays
this.relaySocket = null;
// define some help info // define some help info
Object.defineProperty(this.info, 'maxPeersConnections', { Object.defineProperty(this.info, 'maxPeersConnections', {
enumerable: true, enumerable: true,
@ -101,15 +115,26 @@ class p2p {
return; return;
} }
for(const peer of this.clients) {
if(peer.peerId === data.peerId) {
// already connected from different interface
logT('p2p', 'server peer', data.peerId, 'already connected from different address', '( check:', peer.files === data.files && peer.torrents === data.torrents, ')');
return;
}
}
// protocol ok // protocol ok
clearTimeout(socketObject.protocolTimeout) clearTimeout(socketObject.protocolTimeout)
const { _socket: socket } = socketObject const { _socket: socket } = socketObject
socketObject.rats = true socketObject.rats = true
socketObject.peerId = data.peerId
socketObject.relay = data.relay
callback({ callback({
protocol: 'rats', protocol: 'rats',
version: this.version, version: this.version,
peerId: this.peerId, peerId: this.peerId,
relay: this.relay,
info: this.info, info: this.info,
peers: this.addresses(this.recommendedPeersList()) peers: this.addresses(this.recommendedPeersList())
}) })
@ -202,6 +227,72 @@ class p2p {
readable = null readable = null
}); });
}) })
this.on('relay', async (nil, callback, remote) => {
if(this.relay.server && remote.relay) {
// update status, because client ask for relay
remote.relay.client = true
if(!this.relayServers[remote.peerId] && Object.keys(this.relayServers).length < this.relayServersLimit) {
let relayPort;
const server = net.createServer();
this.relayServers[remote.peerId] = server;
let relay;
const peers = {}
const establishConnectionTimeout = setTimeout(() => {
logTE('relay', `not recived relay income connection, timeout`);
server.close();
delete this.relayServers[remote.peerId]
}, 8000)
server.on('connection', (peer) => {
logT('relay', `new relay connection`);
peer = new JsonSocket(peer);
peer._id = Math.random().toString(36).substring(2, 15)
peers[peer._id] = peer
peer.on('message', (data) => {
if (!relay && data && remote.peerId == data.peerId) {
relay = peer
logT('relay', `reply root pear fouded, current openned relays ${Object.keys(this.relayServers).length}`);
if (this.selfAddress) {
logT('relay', `exchange ${remote.peerId} relay to other peers`);
this.emit('peer', {port: relayPort, address: this.selfAddress})
}
clearTimeout(establishConnectionTimeout);
return;
}
if (relay) {
if(peer === relay && data.id && peers[data.id]) {
//logT('relay', `server message to pear ${data.id}`);
peers[data.id].sendMessage(data.data)
} else {
//logT('relay', `server message to relay ${peer._id}`);
relay.sendMessage({id: peer._id, data});
}
}
});
peer.on('close', () => {
if(peer == relay) {
logT('relay', `relay client disconnected`);
relay = null
server.close();
delete this.relayServers[remote.peerId]
} else if(relay) {
logT('relay', `relay peer disconnected`);
relay.sendMessage({id: peer._id, close: true});
}
if(peer._id && peers[peer._id])
delete peers[peer._id]
});
peer.on('error', (err) => {
logTE('relay', `relay server peer error ${err}`);
})
});
relayPort = await findGoodPort(Math.floor(Math.random() * 50000) + 10000, '0.0.0.0')
server.listen(relayPort, '0.0.0.0');
logT('relay', `establish new relay server on port`, relayPort);
callback({port: relayPort})
}
}
})
} }
listen() { listen() {
@ -210,60 +301,48 @@ class p2p {
} }
checkPortAndRedirect(address, port) { checkPortAndRedirect(address, port) {
isPortReachable(port, {host: address}).then((isAvailable) => { this.selfAddress = address;
isPortReachable(port, {host: address}).then(async (isAvailable) => {
if(this.closing) if(this.closing)
return // responce can be very late, and ssh can start after closing of program, this will break on linux return // responce can be very late, and can start after closing of program, this will break on linux
this.p2pStatus = isAvailable ? 2 : 0
this.send('p2pStatus', this.p2pStatus)
// all ok don't need to start any ssh tunnels
if(isAvailable) if(isAvailable)
{ {
logT('ssh', 'tcp p2p port is reachable - all ok') logT('relay', 'tcp p2p port is reachable - all ok')
return; const server = net.createServer();
const randomPort = await findGoodPort(Math.floor(Math.random() * 50000) + 10000, '0.0.0.0')
server.listen(randomPort, '0.0.0.0');
isPortReachable(randomPort, {host: address}).then(async (isAvailable) => {
if(isAvailable) {
logT('relay', 'relay server port check success - can be using as relay')
this.relay.server = true;
} else {
logT('relay', 'relay server port check failes - not using as relay')
}
server.close();
})
this.p2pStatus = 2
this.send('p2pStatus', this.p2pStatus)
} }
else else
{ {
logT('ssh', 'tcp p2p port is unreachable - try ssh tunnel') logT('relay', 'tcp p2p port is unreachable, using relay client')
} this.relay.client = true;
// try reconnect to new relay server
if(!this.encryptor) let candidatePeer = this.peersList().filter(peer => peer.relay && peer.relay.server)
{ if(candidatePeer && candidatePeer.length > 0) {
logT('ssh', 'something wrong with encryptor') logT('relay', 'reconnect to new relay, because no relays connection before check');
return this.connectToRelay(candidatePeer[0])
}
let remoteHost = '03de848286b8fbe6e775e6601c3bcfb9b71dfddcacb861b061458ce5e4020a15a649aabef88234d2af01ead4276a6de1YlqiJBlXCmoA7TpnbRuSRHNDsIBLlZ9McbovKJXHtAA='
this.ssh = ssh(config.spiderPort, this.encryptor.decrypt(remoteHost), 'relay', 'relaymytrf', (selfPeer) => {
if(!selfPeer)
{
this.p2pStatus = 0
this.send('p2pStatus', this.p2pStatus)
this.externalPeers = []
return
} }
this.p2pStatus = 0
logT('ssh', 'ssh tunnel success, redirect peers to ssh') this.send('p2pStatus', this.p2pStatus)
}
this.p2pStatus = 1
this.send('p2pStatus', this.p2pStatus)
this.ignore(selfPeer)
this.emit('peer', selfPeer)
this.externalPeers = [selfPeer] // add external peers and tell this on every connection
})
}) })
} }
close() close()
{ {
this.closing = true this.closing = true
if(this.ssh)
{
logT('ssh', 'closing ssh...')
this.ssh.kill()
}
// close server // close server
const promise = new Promise(resolve => this.tcpServer.close(resolve)) const promise = new Promise(resolve => this.tcpServer.close(resolve))
for (const client in this.clients) { for (const client in this.clients) {
@ -329,6 +408,76 @@ class p2p {
return _.uniq(peers) return _.uniq(peers)
} }
connectToRelay(relayPeer, tryes = 3)
{
if(this.relay.client && relayPeer.relay.server && !this.relaySocket) {
relayPeer.emit('relay', {}, ({port} = {}) => {
if(!port) {
logTE('relay', 'no port in relay request responce');
return;
}
logT('relay', 'try connecting to new relay', relayPeer.peerId)
let peers = {}
this.relaySocket = new JsonSocket(new net.Socket());
this.relaySocket.connect(port, relayPeer.address, () => {
logT('relay', 'connected to relay', relayPeer.peerId);
this.relaySocket.sendMessage({peerId: this.peerId})
this.p2pStatus = 1
this.send('p2pStatus', this.p2pStatus)
tryes = 3; // restore tryies bebause we connected
});
this.relaySocket.on('message', (data) => {
if(!data.id)
return
if(!peers[data.id]) {
if(data.close)
return
peers[data.id] = new JsonSocket(new net.Socket());
peers[data.id].on('message', (toPeer) => {
//logT('relay', 'client message to relay', data.id);
this.relaySocket.sendMessage({id: data.id, data: toPeer})
})
peers[data.id].connect(config.spiderPort, '0.0.0.0', () => {
//logT('relay', 'client message to my server', data.id);
peers[data.id].sendMessage(data.data)
});
} else {
if(data.close) {
peers[data.id].destroy();
delete peers[data.id];
logT('relay', 'peer disconnected');
return
}
//logT('relay', 'client message to my server', data.id);
peers[data.id].sendMessage(data.data)
}
});
this.relaySocket.on('close', () => {
logT('relay', 'relay client closed because server exit');
for(const id in peers) {
peers[id].destroy();
}
peers = null
this.relaySocket = null
this.p2pStatus = 0
this.send('p2pStatus', this.p2pStatus)
// try reconnect to new relay server
let candidatePeer = this.peersList().filter(peer => peer.relay && peer.relay.server && peer != relayPeer)
if(candidatePeer && candidatePeer.length > 0 && tryes > 0) {
logT('relay', 'reconnect to new relay, because old closed');
this.connectToRelay(candidatePeer[0], --tryes)
}
});
})
}
}
connect(address) connect(address)
{ {
this.peers.push(address) this.peers.push(address)
@ -368,6 +517,7 @@ class p2p {
port: config.spiderPort, port: config.spiderPort,
version: this.version, version: this.version,
peerId: this.peerId, peerId: this.peerId,
relay: this.relay,
info: this.info, info: this.info,
peers: this.addresses(this.recommendedPeersList()).concat(this.externalPeers) // also add external peers peers: this.addresses(this.recommendedPeersList()).concat(this.externalPeers) // also add external peers
}, (data) => { }, (data) => {
@ -406,6 +556,7 @@ class p2p {
//extra info //extra info
address.version = data.version address.version = data.version
address.peerId = data.peerId address.peerId = data.peerId
address.relay = data.relay
address.info = data.info address.info = data.info
this.send('peer', { this.send('peer', {
size: this.size, size: this.size,
@ -419,6 +570,9 @@ class p2p {
{ {
data.peers.forEach(peer => this.add(peer)) data.peers.forEach(peer => this.add(peer))
} }
// try connect to relay if needed
this.connectToRelay(address)
}) })
}); });

View File

@ -39,7 +39,6 @@ const mime = require('mime');
//server.listen(config.httpPort); //server.listen(config.httpPort);
//console.log('Listening web server on', config.httpPort, 'port') //console.log('Listening web server on', config.httpPort, 'port')
module.exports = function (send, recive, dataDirectory, version, env) module.exports = function (send, recive, dataDirectory, version, env)
{ {
this.initialized = (async () => this.initialized = (async () =>

View File

@ -1,88 +0,0 @@
const appPath = require('./electronAppPath')
const { spawn } = require('child_process')
const fs = require('fs')
function getRandomInt(min, max) {
return Math.floor(Math.random() * (max - min)) + min;
}
let tryies = 5;
const startSSH = (port, host, user, password, callback) => {
let remotePort = getRandomInt(10000, 65000)
if(tryies-- <= 0)
{
if(callback)
callback(false)
return
}
const options = [
'-N',
'-T',
'-R', `0.0.0.0:${remotePort}:127.0.0.1:${port}`,
`${user}@${host}`,
'-pw', password,
'-v'
]
const sshClientPath = appPath('plink');
if(!fs.existsSync(sshClientPath)) {
logTE('ssh', 'plink not founded for system in path', sshClientPath)
return
}
const ssh = spawn(sshClientPath, options)
const checkMessage = (data) => {
if(data.includes(`Remote port forwarding from 0.0.0.0:${remotePort}`))
{
if(data.includes('refused'))
{
ssh.kill()
startSSH(port, host, user, password, callback)
}
else if(data.includes('enabled'))
{
if(callback)
callback({address: host, port: remotePort})
}
}
}
ssh.stdout.on('data', (data) => {
logT('ssh', `ssh: ${data}`)
checkMessage(data)
if(data.includes('Store key in cache?'))
{
ssh.stdin.write("y")
ssh.stdin.end()
}
})
ssh.stderr.on('data', (data) => {
logT('ssh', `ssh error: ${data}`);
checkMessage(data)
if(data.includes('Password authentication failed'))
{
ssh.kill()
}
if(data.includes('Store key in cache?'))
{
ssh.stdin.write("y")
ssh.stdin.end()
}
});
ssh.on('close', (code, signal) => {
logT('ssh', `ssh closed with code ${code} and signal ${signal}`)
if(callback)
callback(false)
})
return ssh
}
module.exports = startSSH