diff --git a/src/background/bt/spider.js b/src/background/bt/spider.js index aa4a436..3ee96d5 100644 --- a/src/background/bt/spider.js +++ b/src/background/bt/spider.js @@ -52,6 +52,7 @@ class Spider extends Emiter { this.cpuInterval = config.spider.cpuInterval; this.announceHashes = [] + this.searchHashes = [] } send(message, address) { @@ -72,7 +73,7 @@ class Spider extends Emiter { this.send(message, address) } - getPeersRequest(infoHash) { + getPeersRequest(infoHash, address) { const message = { t: generateTid(), y: 'q', @@ -82,13 +83,7 @@ class Spider extends Emiter { info_hash: infoHash } } - for(const address of this.table.nodes) - { - if(parseInt(Math.random() * 5) !== 1) - continue; - - this.send(message, address) - } + this.send(message, address) } announcePeer(infoHash, token, address, port) @@ -126,7 +121,10 @@ class Spider extends Emiter { ) { const node = this.table.shift() - if (node) { + //console.log(parseInt(Math.random() * this.table.caption / (this.table.nodes.length || 0))) + //if (node) { + if (node && parseInt(Math.random() * this.table.nodes.length / 8) === 0) { + //console.log('walk', this.table.nodes.length) this.findNode(Node.neighbor(node.id, this.table.id), {address: node.address, port: node.port}) } } @@ -141,6 +139,7 @@ class Spider extends Emiter { this.table.add(node) } }) + //console.log('nodes', this.table.nodes.length) this.emit('nodes', nodes) // announce torrents @@ -154,6 +153,7 @@ class Spider extends Emiter { } onFoundPeers(peers, token, address) { + console.log('responce') if(token) { for(const hash of this.announceHashes) @@ -167,6 +167,7 @@ class Spider extends Emiter { const ips = Node.decodeCompactIP(peers) this.emit('peer', ips) + console.log('p', ips) } onFindNodeRequest(message, address) { @@ -192,6 +193,12 @@ class Spider extends Emiter { nodes: Node.encodeNodes(this.table.first()) } }, address) + + // also check hashes of alive ones + for(const hash of this.announceHashes) + { + this.getPeersRequest(hash, address) + } } onGetPeersRequest(message, address) { @@ -220,6 +227,12 @@ class Spider extends Emiter { }, address) this.emit('unensureHash', infohash.toString('hex').toUpperCase()) + + // also check hashes of alive ones + for(const hash of this.announceHashes) + { + this.getPeersRequest(hash, address) + } } onAnnouncePeerRequest(message, address) { @@ -241,7 +254,7 @@ class Spider extends Emiter { if(this.client && !this.ignore) { cpuDebug('cpu usage:' + cpuUsage()) if(this.cpuLimit <= 0 || cpuUsage() <= this.cpuLimit + this.cpuInterval) { - this.client.add(addressPair, infohash); + // this.client.add(addressPair, infohash); } } } @@ -335,12 +348,14 @@ class Spider extends Emiter { } } + /* this.announceSearchInterval = setInterval(() => { for(const hash of this.announceHashes) { this.getPeersRequest(hash) } }, 3000) + */ } close(callback) @@ -353,8 +368,8 @@ class Spider extends Emiter { clearInterval(this.joinInterval) if(this.trafficInterval) clearInterval(this.trafficInterval) - if(this.announceSearchInterval) - clearInterval(this.announceSearchInterval) + //if(this.announceSearchInterval) + // clearInterval(this.announceSearchInterval) this.closing = true this.udp.close(() => { this.initialized = false diff --git a/src/background/bt/table.js b/src/background/bt/table.js index 4cdebaa..5b0140b 100644 --- a/src/background/bt/table.js +++ b/src/background/bt/table.js @@ -57,9 +57,11 @@ class Table{ this.nodes = [] this.caption = cap } - add(node) { + add(node, onAdd) { if (this.nodes.length < this.caption) { this.nodes.push(node) + if(onAdd) + onAdd(node) } } shift() { diff --git a/src/background/p2p.js b/src/background/p2p.js new file mode 100644 index 0000000..ec22883 --- /dev/null +++ b/src/background/p2p.js @@ -0,0 +1,126 @@ +const config = require('./config'); +const net = require('net') +const JsonSocket = require('json-socket') + +class p2p { + peers = [] + ignoreAddresses = [] + messageHandlers = {} + size = 0 + + constructor(send = () => {}) + { + this.send = send + this.tcpServer = net.createServer(); + this.tcpServer.on('connection', (socket) => { + //console.log('p2p server connection', socket.remoteAddress) + socket = new JsonSocket(socket); + socket.on('error', (err) => {}) + socket.on('message', (message) => { + if(message.type && this.messageHandlers[message.type]) + { + this.messageHandlers[message.type](message.data, (data) => { + socket.sendMessage({ + id: message.id, + data + }); + }) + } + }); + }) + } + + listen() { + console.log('listen p2p on', config.spiderPort, 'port') + this.tcpServer.listen(config.spiderPort); + } + + on(type, callback) { + this.messageHandlers[type] = callback + } + + add(address) { + const { peers } = this + + if(this.size > 10) + return; + + if(address.port <= 1 || address.port > 65535) + return; + + if(this.ignoreAddresses.includes(address.address)) + return; + + for(let peer of peers) + { + if(peer.address === address.address) { + peer.port = address.port; + return; + } + } + this.connect(address) + } + + connect(address) + { + this.peers.push(address) + const socket = new JsonSocket(new net.Socket()); //Decorate a standard net.Socket with JsonSocket + socket.on('connect', () => { //Don't send until we're connected + // add to peers + this.size++; + this.send('peer', this.size) + console.log('new peer', address) + + const callbacks = {} + socket.on('message', (message) => { + if(message.id && callbacks[message.id]) + { + callbacks[message.id](message.data); + delete callbacks[message.id]; + } + }); + + const emit = (type, data, callback) => { + const id = Math.random().toString(36).substring(5) + if(callback) + callbacks[id] = callback; + socket.sendMessage({ + id, + type, + data + }); + } + address.emit = emit + }); + + socket.on('close', () => { + const index = this.peers.indexOf(address); + if(index >= 0) + { + if(this.peers[index].emit) // only autorized peers + { + this.size--; + this.send('peer', this.size) + } + this.peers.splice(index, 1); + + console.log('close peer connection', address) + } + }) + + socket.on('error', (err) => {}) + + socket.connect(address.port, address.address); + } + + emit(type, data, callback) + { + for(const peer of this.peers) + { + if(peer.emit) + peer.emit(type, data, callback) + } + } +} + +module.exports = p2p \ No newline at end of file diff --git a/src/background/spider.js b/src/background/spider.js index a85ba41..2e03840 100644 --- a/src/background/spider.js +++ b/src/background/spider.js @@ -3,9 +3,8 @@ const client = new (require('./bt/client')) const spider = new (require('./bt/spider'))(client) const mysql = require('mysql'); const getPeersStatisticUDP = require('./bt/udp-tracker-request') -const net = require('net') -const JsonSocket = require('json-socket') const crypto = require('crypto') +const P2PServer = require('./p2p') const stun = require('stun') //var express = require('express'); //var app = express(); @@ -253,30 +252,8 @@ setInterval(() => { topCache = {}; }, 24 * 60 * 60 * 1000); - - -// socket -const messageHandlers = {} -const onSocketMessage = (type, callback) => { - messageHandlers[type] = callback -} -const tcpServer = net.createServer(); -tcpServer.listen(config.spiderPort); -tcpServer.on('connection', (socket) => { - socket = new JsonSocket(socket); - socket.on('error', (err) => {}) - socket.on('message', (message) => { - if(message.type && messageHandlers[message.type]) - { - messageHandlers[message.type](message.data, (data) => { - socket.sendMessage({ - id: message.id, - data - }); - }) - } - }); -}) +const p2p = new P2PServer(send) +p2p.listen() //io.on('connection', function(socket) //{ @@ -437,7 +414,7 @@ tcpServer.on('connection', (socket) => { }) }); - onSocketMessage('searchTorrent', ({text, navigation} = {}, callback) => { + p2p.on('searchTorrent', ({text, navigation} = {}, callback) => { if(!text) return; @@ -541,7 +518,7 @@ tcpServer.on('connection', (socket) => { }) }); - onSocketMessage('searchFiles', ({text, navigation} = {}, callback) => { + p2p.on('searchFiles', ({text, navigation} = {}, callback) => { if(!text) return; @@ -1046,86 +1023,6 @@ client.on('complete', function (metadata, infohash, rinfo) { }); -const p2p = { - peers: [], - ignoreAddresses: [], - add(address) { - const { peers } = this - - if(peers.length > 10) - return; - - if(address.port <= 1 || address.port > 65535) - return; - - if(this.ignoreAddresses.includes(address.address)) - return; - - for(let peer of peers) - { - if(peer.address === address.address) { - peer.port = address.port; - return; - } - } - this.connect(address) - }, - connect(address) - { - this.peers.push(address) - const socket = new JsonSocket(new net.Socket()); //Decorate a standard net.Socket with JsonSocket - socket.on('connect', () => { //Don't send until we're connected - // add to peers - send('peer', this.peers.length) - console.log('new peer', address) - - const callbacks = {} - socket.on('message', (message) => { - if(message.id && callbacks[message.id]) - { - callbacks[message.id](message.data); - delete callbacks[message.id]; - } - }); - - const emit = (type, data, callback) => { - const id = Math.random().toString(36).substring(5) - if(callback) - callbacks[id] = callback; - socket.sendMessage({ - id, - type, - data - }); - } - address.emit = emit - }); - - socket.on('close', () => { - const index = this.peers.indexOf(address); - if(index >= 0) - { - this.peers.splice(index, 1); - - console.log('close peer connection', address) - send('peer', this.peers.length) - } - }) - - socket.on('error', (err) => {}) - - socket.connect(address.port, address.address); - }, - emit(type, data, callback) - { - for(const peer of this.peers) - { - if(peer.emit) - peer.emit(type, data, callback) - } - } -} - const { STUN_BINDING_REQUEST, STUN_ATTR_XOR_MAPPED_ADDRESS } = stun.constants const stunServer = stun.createServer() const stunRequest = stun.createMessage(STUN_BINDING_REQUEST) @@ -1139,11 +1036,6 @@ stunServer.once('bindingResponse', stunMsg => { stunServer.send(stunRequest, 19302, 'stun.l.google.com') spider.on('peer', (IPs) => { - const { peers } = p2p; - - if(peers.length > 10) - return - IPs.forEach(ip => p2p.add(ip)) })