From a1330db184ad84e070b4d1be893afa511fe5fa35 Mon Sep 17 00:00:00 2001 From: Alexey Kasyanchuk Date: Sat, 31 Dec 2016 06:30:07 +0300 Subject: [PATCH] init --- .gitignore | 1 + package.json | 15 +++ src/client.js | 94 ++++++++++++++++++ src/peer-queue.js | 55 +++++++++++ src/spider.js | 193 ++++++++++++++++++++++++++++++++++++ src/table.js | 72 ++++++++++++++ src/token.js | 16 +++ src/wire.js | 247 ++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 693 insertions(+) create mode 100644 .gitignore create mode 100644 package.json create mode 100644 src/client.js create mode 100644 src/peer-queue.js create mode 100644 src/spider.js create mode 100644 src/table.js create mode 100644 src/token.js create mode 100644 src/wire.js diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8d87b1d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +node_modules/* diff --git a/package.json b/package.json new file mode 100644 index 0000000..aacc85c --- /dev/null +++ b/package.json @@ -0,0 +1,15 @@ +{ + "name": "btsearch", + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "", + "license": "ISC", + "dependencies": { + "bencode": "^0.11.0", + "bitfield": "^1.1.2" + } +} diff --git a/src/client.js b/src/client.js new file mode 100644 index 0000000..d766da8 --- /dev/null +++ b/src/client.js @@ -0,0 +1,94 @@ +'use strict' + +const Emiter = require('events') +var util = require('util'); +var net = require('net'); + +var PeerQueue = require('./peer-queue'); +var Wire = require('./wire'); + + +class Client extends Emiter +{ + constructor(options) { + super(); + this.timeout = 5000; + this.maxConnections = 200; + this.activeConnections = 0; + this.peers = new PeerQueue(this.maxConnections); + this.on('download', this._download); + + // if (typeof options.ignore === 'function') { + // this.ignore = options.ignore; + //} + //else { + this.ignore = function (infohash, rinfo, ignore) { + ignore(false); + }; + // } + } + + _next(infohash, successful) { + var req = this.peers.shift(infohash, successful); + if (req) { + this.ignore(req.infohash.toString('hex'), req.rinfo, (drop) => { + if (!drop) { + this.emit('download', req.rinfo, req.infohash); + } + }); + } + } + + _download(rinfo, infohash) + { + console.log('start download ' + infohash.toString('hex')); + this.activeConnections++; + + var successful = false; + var socket = new net.Socket(); + + socket.setTimeout(this.timeout || 5000); + socket.connect(rinfo.port, rinfo.address, () => { + var wire = new Wire(infohash); + socket.pipe(wire).pipe(socket); + + wire.on('metadata', (metadata, infoHash) => { + successful = true; + this.emit('complete', metadata, infoHash, rinfo); + socket.destroy(); + }); + + wire.on('fail', () => { + socket.destroy(); + }); + + wire.sendHandshake(); + }); + + socket.on('error', (err) => { + socket.destroy(); + }); + + socket.on('timeout', (err) => { + socket.destroy(); + }); + + socket.once('close', () => { + this.activeConnections--; + this._next(infohash, successful); + }); + } + + add(rinfo, infohash) { + this.peers.push({infohash: infohash, rinfo: rinfo}); + if (this.activeConnections < this.maxConnections && this.peers.length() > 0) { + this._next(); + } + } + + isIdle() { + return this.peers.length() === 0; + } +} + +module.exports = Client; \ No newline at end of file diff --git a/src/peer-queue.js b/src/peer-queue.js new file mode 100644 index 0000000..6d289c1 --- /dev/null +++ b/src/peer-queue.js @@ -0,0 +1,55 @@ +'use strict'; + +var PeerQueue = function (maxSize, perLimit) { + this.maxSize = maxSize || 200; + this.perLimit = perLimit || 10; + this.peers = {}; + this.reqs = []; +}; + +PeerQueue.prototype._shift = function () { + if (this.length() > 0) { + var req = this.reqs.shift(); + this.peers[req.infohash.toString('hex')] = []; + return req; + } +}; + +PeerQueue.prototype.push = function (peer) { + var infohashHex = peer.infohash.toString('hex'); + var peers = this.peers[infohashHex]; + + if (peers && peers.length < this.perLimit) { + peers.push(peer); + } + else if (this.length() < this.maxSize) { + this.reqs.push(peer); + } +}; + +PeerQueue.prototype.shift = function (infohash, successful) { + if (infohash) { + var infohashHex = infohash.toString('hex'); + if (successful === true) { + delete this.peers[infohashHex]; + } + else { + var peers = this.peers[infohashHex]; + if (peers) { + if (peers.length > 0) { + return peers.shift(); + } + else { + delete this.peers[infohashHex]; + } + } + } + } + return this._shift(); +}; + +PeerQueue.prototype.length = function () { + return this.reqs.length; +}; + +module.exports = PeerQueue; \ No newline at end of file diff --git a/src/spider.js b/src/spider.js new file mode 100644 index 0000000..04a6fcb --- /dev/null +++ b/src/spider.js @@ -0,0 +1,193 @@ +'use strict' + +const dgram = require('dgram') +const Emiter = require('events') +const bencode = require('bencode') +const {Table, Node} = require('./table') +const Token = require('./token') + +const bootstraps = [{ + address: 'router.bittorrent.com', + port: 6881 +}, { + address: 'router.utorrent.com', + port: 6881 +}, { + address: 'dht.transmissionbt.com', + port: 6881 +}, { + address: 'dht.aelitis.com', + port: 6881 +}] + +function isValidPort(port) { + return port > 0 && port < (1 << 16) +} + +function generateTid() { + return parseInt(Math.random() * 99).toString() +} + +class Spider extends Emiter { + constructor(client) { + super() + const options = arguments.length? arguments[0]: {} + this.udp = dgram.createSocket('udp4') + this.table = new Table(options.tableCaption || 1000) + this.bootstraps = options.bootstraps || bootstraps + this.token = new Token() + this.client = client + + this.walkInterval = 5; + } + + send(message, address) { + const data = bencode.encode(message) + this.udp.send(data, 0, data.length, address.port, address.address) + } + + findNode(id, address) { + const message = { + t: generateTid(), + y: 'q', + q: 'find_node', + a: { + id: id, + target: Node.generateID() + } + } + this.send(message, address) + } + + join() { + this.bootstraps.forEach((bootstrap) => { + this.findNode(this.table.id, bootstrap) + }) + } + + walk() { + if(!this.client || this.client.isIdle()) { + const node = this.table.shift() + if (node) { + this.findNode(Node.neighbor(node.id, this.table.id), {address: node.address, port: node.port}) + } + } + setTimeout(()=>this.walk(), this.walkInterval) + } + + onFoundNodes(data) { + const nodes = Node.decodeNodes(data) + nodes.forEach((node) => { + if (node.id != this.table.id && isValidPort(node.port)) { + this.table.add(node) + } + }) + this.emit('nodes', nodes) + } + + onFindNodeRequest(message, address) { + const {t: tid, a: {id: nid, target: infohash}} = message + + if (tid === undefined || target.length != 20 || nid.length != 20) { + return + } + + this.send({ + t: tid, + y: 'r', + r: { + id: Node.neighbor(nid, this.table.id), + nodes: Node.encodeNodes(this.table.first()) + } + }, address) + } + + onGetPeersRequest(message, address) { + const {t: tid, a: {id: nid, info_hash: infohash}} = message + + if (tid === undefined || infohash.length != 20 || nid.length != 20) { + return + } + + this.send({ + t: tid, + y: 'r', + r: { + id: Node.neighbor(nid, this.table.id), + nodes: Node.encodeNodes(this.table.first()), + token: this.token.token + } + }, address) + + this.emit('unensureHash', infohash.toString('hex').toUpperCase()) + } + + onAnnouncePeerRequest(message, address) { + let {t: tid, a: {info_hash: infohash, token: token, id: id, implied_port: implied, port: port}} = message + if (!tid) return + + if (!this.token.isValid(token)) return + + port = (implied != undefined && implied != 0) ? address.port : (port || 0) + if (!isValidPort(port)) return + + this.send({ t: tid, y: 'r', r: { id: Node.neighbor(id, this.table.id) } }, address) + + let addressPair = { + address: address.address, + port: port + }; + this.emit('ensureHash', infohash.toString('hex').toUpperCase(), addressPair) + if(this.client) { + this.client.add(addressPair, infohash); + } + } + + onPingRequest(message, address) { + this.send({ t: message.t, y: 'r', r: { id: Node.neighbor(message.a.id, this.table.id) } }, address) + } + + parse(data, address) { + try { + const message = bencode.decode(data) + if (message.y.toString() == 'r' && message.r.nodes) { + this.onFoundNodes(message.r.nodes) + } else if (message.y.toString() == 'q') { + switch(message.q.toString()) { + case 'get_peers': + this.onGetPeersRequest(message, address) + break + case 'announce_peer': + this.onAnnouncePeerRequest(message, address) + break + case 'find_node': + this.onFindNodeRequest(message, address) + break + case 'ping': + this.onPingRequest(message, address) + break + } + } + } catch (err) {} + } + + listen(port) { + this.udp.bind(port) + this.udp.on('listening', () => { + console.log(`Listen on ${this.udp.address().address}:${this.udp.address().port}`) + }) + this.udp.on('message', (data, addr) => { + this.parse(data, addr) + }) + this.udp.on('error', (err) => {}) + setInterval(() => { + if(!this.client || this.client.isIdle()) { + this.join() + } + }, 3000) + this.join() + this.walk() + } +} + +module.exports = Spider \ No newline at end of file diff --git a/src/table.js b/src/table.js new file mode 100644 index 0000000..c3ac200 --- /dev/null +++ b/src/table.js @@ -0,0 +1,72 @@ +'use strict' + +const crypto = require('crypto') + +class Node { + static generateID() { + return crypto.createHash('sha1').update(crypto.randomBytes(20)).digest() + } + + constructor(id) { + this.id = id || Node.generateNodeID() + } + + static neighbor(target, id) { + return Buffer.concat([target.slice(0, 10), id.slice(10)]) + } + + static encodeNodes(nodes) { + return Buffer.concat(nodes.map((node)=> Buffer.concat([node.id, Node.encodeIP(node.address), Node.encodePort(node.port)]))) + } + + static decodeNodes(data) { + const nodes = [] + for (let i = 0; i + 26 <= data.length; i += 26) { + nodes.push({ + id: data.slice(i, i + 20), + address: `${data[i + 20]}.${data[i + 21]}.${data[i + 22]}.${data[i + 23]}`, + port: data.readUInt16BE(i + 24) + }) + } + return nodes + } + + static encodeIP(ip) { + return Buffer.from(ip.split('.').map((i)=>parseInt(i))) + } + + static encodePort(port) { + const data = Buffer.alloc(2) + data.writeUInt16BE(port, 0) + return data + } +} + +class Table{ + constructor(cap) { + this.id = Node.generateID() + this.nodes = [] + this.caption = cap + } + add(node) { + if (this.nodes.length < this.caption) { + this.nodes.push(node) + } + } + shift() { + return this.nodes.shift() + } + size() { + return this.nodes.length; + } + first() { + if(this.nodes.length >= 8) { + return this.nodes.slice(0, 8) + }else if(this.nodes.length > 0) { + return new Array(8).join().split(',').map(()=> this.nodes[0]) + } + return [] + } +} + +module.exports = {Table, Node} \ No newline at end of file diff --git a/src/token.js b/src/token.js new file mode 100644 index 0000000..827584a --- /dev/null +++ b/src/token.js @@ -0,0 +1,16 @@ +'use strict' + +module.exports = class { + constructor() { + this.generate() + setInterval(()=> this.generate(), 60000*15) + } + + isValid(t) { + return t.toString() === this.token.toString() + } + + generate() { + this.token = new Buffer([parseInt(Math.random()*200), parseInt(Math.random()*200)]) + } +} \ No newline at end of file diff --git a/src/wire.js b/src/wire.js new file mode 100644 index 0000000..d2232ce --- /dev/null +++ b/src/wire.js @@ -0,0 +1,247 @@ +'use strict'; + +var stream = require('stream'); +var crypto = require('crypto'); +var util = require('util'); + +var BitField = require('bitfield'); +var bencode = require('bencode'); + +var {Node} = require('./table'); + +var BT_RESERVED = new Buffer([0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x01]); +var BT_PROTOCOL = new Buffer('BitTorrent protocol'); +var PIECE_LENGTH = Math.pow(2, 14); +var MAX_METADATA_SIZE = 10000000; +var BITFIELD_GROW = 1000; +var EXT_HANDSHAKE_ID = 0; +var BT_MSG_ID = 20; + +var Wire = function(infohash) { + stream.Duplex.call(this); + + this._bitfield = new BitField(0, { grow: BITFIELD_GROW }); + this._infohash = infohash; + + this._buffer = []; + this._bufferSize = 0; + + this._next = null; + this._nextSize = 0; + + this._metadata = null; + this._metadataSize = null; + this._numPieces = 0; + this._ut_metadata = null; + + this._onHandshake(); +} + +util.inherits(Wire, stream.Duplex); + +Wire.prototype._onMessageLength = function (buffer) { + if (buffer.length >= 4) { + var length = buffer.readUInt32BE(0); + if (length > 0) { + this._register(length, this._onMessage) + } + } +}; + +Wire.prototype._onMessage = function (buffer) { + this._register(4, this._onMessageLength) + if (buffer[0] == BT_MSG_ID) { + this._onExtended(buffer.readUInt8(1), buffer.slice(2)); + } +}; + +Wire.prototype._onExtended = function(ext, buf) { + if (ext === 0) { + try { + this._onExtHandshake(bencode.decode(buf)); + } + catch (err) { + this._fail(); + } + } + else { + this._onPiece(buf); + } +}; + +Wire.prototype._register = function (size, next) { + this._nextSize = size; + this._next = next; +}; + +Wire.prototype.end = function() { + stream.Duplex.prototype.end.apply(this, arguments); +}; + +Wire.prototype._onHandshake = function() { + this._register(1, function(buffer) { + if (buffer.length == 0) { + this.end(); + return this._fail(); + } + var pstrlen = buffer.readUInt8(0); + this._register(pstrlen + 48, function(handshake) { + var protocol = handshake.slice(0, pstrlen); + if (protocol.toString() !== BT_PROTOCOL.toString()) { + this.end(); + this._fail(); + return; + } + handshake = handshake.slice(pstrlen); + if ( !!(handshake[5] & 0x10) ) { + this._register(4, this._onMessageLength); + this._sendExtHandshake(); + } + else { + this._fail(); + } + }.bind(this)); + }.bind(this)); +}; + +Wire.prototype._onExtHandshake = function(extHandshake) { + if (!extHandshake.metadata_size || !extHandshake.m.ut_metadata + || extHandshake.metadata_size > MAX_METADATA_SIZE) { + this._fail(); + return; + } + + this._metadataSize = extHandshake.metadata_size; + this._numPieces = Math.ceil(this._metadataSize / PIECE_LENGTH); + this._ut_metadata = extHandshake.m.ut_metadata; + + this._requestPieces(); +} + +Wire.prototype._requestPieces = function() { + this._metadata = new Buffer(this._metadataSize); + for (var piece = 0; piece < this._numPieces; piece++) { + this._requestPiece(piece); + } +}; + +Wire.prototype._requestPiece = function(piece) { + var msg = Buffer.concat([ + new Buffer([BT_MSG_ID]), + new Buffer([this._ut_metadata]), + bencode.encode({msg_type: 0, piece: piece}) + ]); + this._sendMessage(msg); +}; + +Wire.prototype._sendPacket = function(packet) { + this.push(packet); +}; + +Wire.prototype._sendMessage = function(msg) { + var buf = new Buffer(4); + buf.writeUInt32BE(msg.length, 0); + this._sendPacket(Buffer.concat([buf, msg])); +}; + +Wire.prototype.sendHandshake = function() { + var peerID = Node.generateID(); + var packet = Buffer.concat([ + new Buffer([BT_PROTOCOL.length]), + BT_PROTOCOL, BT_RESERVED, this._infohash, peerID + ]); + this._sendPacket(packet); +}; + +Wire.prototype._sendExtHandshake = function() { + var msg = Buffer.concat([ + new Buffer([BT_MSG_ID]), + new Buffer([EXT_HANDSHAKE_ID]), + bencode.encode({m: {ut_metadata: 1}}) + ]); + this._sendMessage(msg); +}; + +Wire.prototype._onPiece = function(piece) { + var dict, trailer; + try { + var str = piece.toString(); + var trailerIndex = str.indexOf('ee') + 2; + dict = bencode.decode(str.substring(0, trailerIndex)); + trailer = piece.slice(trailerIndex); + } + catch (err) { + this._fail(); + return; + } + if (dict.msg_type != 1) { + this._fail(); + return; + } + if (trailer.length > PIECE_LENGTH) { + this._fail(); + return; + } + trailer.copy(this._metadata, dict.piece * PIECE_LENGTH); + this._bitfield.set(dict.piece); + this._checkDone(); +}; + +Wire.prototype._checkDone = function () { + var done = true; + for (var piece = 0; piece < this._numPieces; piece++) { + if (!this._bitfield.get(piece)) { + done = false; + break; + } + } + if (!done) { + return + } + this._onDone(this._metadata); +}; + +Wire.prototype._onDone = function(metadata) { + try { + var info = bencode.decode(metadata).info; + if (info) { + metadata = bencode.encode(info); + } + } + catch (err) { + this._fail(); + return; + } + var infohash = crypto.createHash('sha1').update(metadata).digest('hex'); + if (this._infohash.toString('hex') != infohash ) { + this._fail(); + return false; + } + this.emit('metadata', {info: bencode.decode(metadata, 'utf8')}, this._infohash); +}; + +Wire.prototype._fail = function() { + this.emit('fail'); +}; + +Wire.prototype._write = function (buf, encoding, next) { + this._bufferSize += buf.length; + this._buffer.push(buf); + + while (this._bufferSize >= this._nextSize) { + var buffer = Buffer.concat(this._buffer); + this._bufferSize -= this._nextSize; + this._buffer = this._bufferSize + ? [buffer.slice(this._nextSize)] + : []; + this._next(buffer.slice(0, this._nextSize)); + } + + next(null); +} + +Wire.prototype._read = function() { + // do nothing +}; + +module.exports = Wire;