feat(peerDB): store on peers feature

This commit is contained in:
Alexey Kasyanchuk
2018-04-13 00:33:11 +03:00
parent 410c827905
commit 5407ef8c59
7 changed files with 174 additions and 56 deletions

5
package-lock.json generated
View File

@ -11897,6 +11897,11 @@
} }
} }
}, },
"object-hash": {
"version": "1.3.0",
"resolved": "https://registry.npmjs.org/object-hash/-/object-hash-1.3.0.tgz",
"integrity": "sha512-05KzQ70lSeGSrZJQXE5wNDiTkBJDlUT/myi6RX9dVIvz7a7Qh4oH93BQdiPMn27nldYvVQCKMUaM83AfizZlsQ=="
},
"object-keys": { "object-keys": {
"version": "0.4.0", "version": "0.4.0",
"resolved": "https://registry.npmjs.org/object-keys/-/object-keys-0.4.0.tgz", "resolved": "https://registry.npmjs.org/object-keys/-/object-keys-0.4.0.tgz",

View File

@ -126,6 +126,7 @@
"moment": "^2.20.1", "moment": "^2.20.1",
"mysql": "^2.15.0", "mysql": "^2.15.0",
"nat-upnp": "^1.1.1", "nat-upnp": "^1.1.1",
"object-hash": "^1.3.0",
"react": "^16.3.1", "react": "^16.3.1",
"react-dom": "^16.3.1", "react-dom": "^16.3.1",
"react-input-range": "^1.3.0", "react-input-range": "^1.3.0",

View File

@ -289,12 +289,6 @@ export default class TorrentPage extends Page {
if(!this.torrent) if(!this.torrent)
return; return;
if(true)
{
alert('Nice try :)! But this feature will be restored in later. Just keep go in other direction :D.')
return
}
this.setState({ this.setState({
voting: true voting: true
}); });

View File

@ -14,7 +14,8 @@ module.exports = ({
crypto, crypto,
insertTorrentToDB, insertTorrentToDB,
removeTorrentFromDB, removeTorrentFromDB,
checkTorrent checkTorrent,
p2pStore
}) => { }) => {
let torrentClientHashMap = {} let torrentClientHashMap = {}
@ -629,20 +630,7 @@ module.exports = ({
}, done) }, done)
}) })
let socketIPV4 = () => { recive('vote', async (hash, isGood, callback) =>
let ip = socket.request.connection.remoteAddress;
if (ipaddr.IPv4.isValid(ip)) {
// all ok
} else if (ipaddr.IPv6.isValid(ip)) {
let ipv6 = ipaddr.IPv6.parse(ip);
if (ipv6.isIPv4MappedAddress()) {
ip = ipv6.toIPv4Address().toString();
}
}
return ip
};
recive('vote', function(hash, isGood, callback)
{ {
if(hash.length != 40) if(hash.length != 40)
return; return;
@ -650,44 +638,35 @@ module.exports = ({
if(typeof callback != 'function') if(typeof callback != 'function')
return; return;
const ip = socketIPV4();
isGood = !!isGood; isGood = !!isGood;
sphinx.query('SELECT * FROM `torrents_actions` WHERE `hash` = ? AND (`action` = \'good\' OR `action` = \'bad\') AND ipv4 = ?', [hash, ip], function (error, rows, fields) {
if(!rows) {
console.error(error);
}
if(rows.length > 0) {
callback(false)
return
}
sphinx.query('SELECT good, bad FROM `torrents` WHERE `hash` = ?', hash, function (error, rows, fields) {
if(!rows || rows.length == 0)
return;
let {good, bad} = rows[0];
const action = isGood ? 'good' : 'bad'; const action = isGood ? 'good' : 'bad';
sphinx.query('INSERT INTO `torrents_actions` SET ?', {hash, action, ipv4: ip}, function(err, result) {
if(!result) { const votes = await p2pStore.find(`vote:${hash}`)
console.error(err); let good = isGood ? 1 : 0
} let bad = !isGood ? 1 : 0
sphinx.query('UPDATE torrents SET ' + action + ' = ' + action + ' + 1 WHERE hash = ?', hash, function(err, result) { if(votes)
if(!result) { {
console.error(err); console.log(votes)
} votes.forEach(({vote}) => {
if(isGood) { if(vote == 'bad')
good++; bad++
} else { else
bad++; good++
})
} }
console.log(bad, good)
p2pStore.store({
type: 'vote',
torrentHash: hash,
vote: action,
_index: `vote:${hash}`
})
send('vote', { send('vote', {
hash, good, bad hash, good, bad
}); });
callback(true) callback(true)
});
});
});
});
}); });
} }

View File

@ -55,6 +55,16 @@ const writeSphinxConfig = (path, dbPath) => {
rt_field = versionIndex rt_field = versionIndex
} }
index store
{
type = rt
path = ${dbPath}/database/store
rt_field = storeIndex
rt_attr_json = data
rt_attr_string = hash
}
searchd searchd
{ {
listen = 9312 listen = 9312

View File

@ -6,6 +6,7 @@ const mysql = require('mysql');
const getPeersStatisticUDP = require('./bt/udp-tracker-request') const getPeersStatisticUDP = require('./bt/udp-tracker-request')
const crypto = require('crypto') const crypto = require('crypto')
const P2PServer = require('./p2p') const P2PServer = require('./p2p')
const P2PStore = require('./store')
const stun = require('stun') const stun = require('stun')
const natUpnp = require('nat-upnp'); const natUpnp = require('nat-upnp');
const http = require('https') const http = require('https')
@ -49,6 +50,7 @@ const p2p = new P2PServer(send)
p2p.version = version p2p.version = version
p2p.encryptor = encryptor p2p.encryptor = encryptor
p2p.listen() p2p.listen()
const p2pStore = new P2PStore(p2p, sphinx)
const udpTrackers = [ const udpTrackers = [
{ {
@ -744,7 +746,8 @@ API({
crypto, crypto,
insertTorrentToDB, insertTorrentToDB,
removeTorrentFromDB, removeTorrentFromDB,
checkTorrent checkTorrent,
p2pStore
}) })
if(config.indexer) { if(config.indexer) {

126
src/background/store.js Normal file
View File

@ -0,0 +1,126 @@
const objectHash = require('object-hash');
module.exports = class P2PStore {
constructor(p2p, sphinx)
{
this.id = 1
console.log('connect p2p store...')
this.p2p = p2p
this.sphinx = sphinx
this.sphinx.query("SELECT MAX(`id`) as mx from store", (err, rows) => {
if(err)
return
if(rows[0] && rows[0].mx >= 1)
this.id = rows[0].mx + 1;
})
this.p2p.on('dbStore', (record, callback) => {
if(!record)
return
if(typeof record !== 'object')
return
if(!record.id)
return
if(record.id <= this.id)
return
// store
this._pushToDb(record)
})
this.p2p.on('dbSync', ({id} = {}, callback) => {
if(!id || this.id <= id)
{
callback(false)
return
}
// back
this.sphinx.query(`select * from store where id >= ${id}`, (err, records) => {
if(err)
{
console.log(err)
return
}
if(records.length > 0)
callback({records})
})
})
}
sync()
{
this.p2p.emit('dbSync', {id: this.id}, (data) => {
if(!data || !data.records)
return
for(const record of data.records)
{
if(!record.id)
return
if(record.id <= this.id)
return
// push to db
this._pushToDb(record)
}
})
}
_pushToDb(value, callback)
{
const data = this.sphinx.escape(JSON.stringify(value.data))
this.sphinx.query(
`insert into store(id, hash, data` + (value.index ? ', storeIndex' : '') + `)
values('${value.id}', '${value.hash}', ${data}` + (value.index ? ',' + this.sphinx.escape(value.index) : '') + ')',
(err) => {
if(err)
{
console.log(err)
return
}
if(callback)
callback()
})
}
store(obj)
{
const value = {
id: this.id++,
hash: objectHash(obj),
data: obj,
index: obj._index
}
this._pushToDb(value, () => {
// store record
this.p2p.emit('dbStore', value)
})
}
find(index)
{
return new Promise((resolve) => {
this.sphinx.query(`select * from store where match(${this.sphinx.escape(index)}) LIMIT 50000`, (err, records) => {
if(err)
{
console.log(err)
resolve(false)
return
}
resolve(records.map(({data}) => JSON.parse(data)))
})
})
}
}