perf(replication): replication thread optimization

This commit is contained in:
Alexey Kasyanchuk 2018-08-05 23:48:31 +03:00
parent 45af9bddea
commit e77775794f
3 changed files with 90 additions and 169 deletions

View File

@ -7,6 +7,7 @@ const asyncForEach = require('./asyncForEach')
module.exports = async ({
sphinx,
sphinxSingle,
send,
recive,
p2p,
@ -208,7 +209,11 @@ module.exports = async ({
if(typeof callback != 'function')
return;
sphinx.query('SELECT * FROM `torrents` ORDER BY rand() limit 5', (error, torrents) => {
// ignore sql requests on closing
if(sphinxSingle.state === 'disconnected')
return
sphinxSingle.query('SELECT * FROM `torrents` ORDER BY rand() limit 5', (error, torrents) => {
if(!torrents || torrents.length == 0) {
callback(undefined)
return;
@ -222,7 +227,7 @@ module.exports = async ({
}
const inSql = Object.keys(hashes).map(hash => sphinx.escape(hash)).join(',');
sphinx.query(`SELECT * FROM files WHERE hash IN(${inSql}) limit 50000`, (error, files) => {
sphinxSingle.query(`SELECT * FROM files WHERE hash IN(${inSql}) limit 50000`, (error, files) => {
if(!files)
{
files = []

View File

@ -122,10 +122,12 @@ const pool = () => {
return expand(sphinx)
}
let mysqlSingle = {
const single = (callback) => {
let mysqlSingle = {
_mysql: null
};
const proxySingle = new Proxy(mysqlSingle, {
};
const proxySingle = new Proxy(mysqlSingle, {
get(target, prop) {
if(!target[prop])
{
@ -136,8 +138,10 @@ const proxySingle = new Proxy(mysqlSingle, {
}
return target[prop]
}
})
const single = (callback) => {
})
const start = () =>
{
mysqlSingle._mysql = mysql.createConnection({
host : config.sphinx.host,
port : config.sphinx.port
@ -164,8 +168,9 @@ const single = (callback) => {
mysqlSingle._mysql.on('error', (err) => {
console.log('db error', err);
if(err.code === 'PROTOCOL_CONNECTION_LOST') { // Connection to the MySQL server is usually
console.log('restart single sql connection')
mysqlSingle._mysql = undefined
single(); // lost due to either server restart, or a
start(); // lost due to either server restart, or a
} else { // connnection idle timeout (the wait_timeout
throw err; // server variable configures this)
}
@ -173,6 +178,9 @@ const single = (callback) => {
mysqlSingle._mysql = expand(mysqlSingle._mysql)
return proxySingle
}
return start()
}
module.exports = {pool, single}

View File

@ -73,114 +73,17 @@ module.exports = function (send, recive, dataDirectory, version, env)
}
]
let mysqlSingle = single((mysqlSingle) => {
mysqlSingle.query("SELECT MAX(`id`) as mx from torrents", (err, rows) => {
if(err)
return
const sphinxSingle = await single().waitConnection()
torrentsId = (await sphinxSingle.query("SELECT MAX(`id`) as mx from torrents"))[0]
torrentsId = ((torrentsId && torrentsId.mx) || 0) + 1
filesId = (await sphinxSingle.query("SELECT MAX(`id`) as mx from files"))[0]
filesId = ((filesId && filesId.mx) || 0) + 1
p2p.info.torrents = (await sphinxSingle.query("SELECT COUNT(*) as cnt from torrents"))[0].cnt
p2p.info.files = (await sphinxSingle.query("SELECT COUNT(*) as cnt from files"))[0].cnt
const sphinxSingleAlternative = await single().waitConnection()
if(rows[0] && rows[0].mx >= 1)
torrentsId = rows[0].mx + 1;
})
mysqlSingle.query("SELECT COUNT(*) as cnt from torrents", (err, rows) => {
if(err)
return
p2p.info.torrents = rows[0].cnt
})
mysqlSingle.query("SELECT MAX(`id`) as mx from files", (err, rows) => {
if(err)
return
if(rows[0] &&rows[0].mx >= 1)
filesId = rows[0].mx + 1;
})
mysqlSingle.query("SELECT COUNT(*) as cnt from files", (err, rows) => {
if(err)
return
p2p.info.files = rows[0].cnt
})
});
/*
app.use(express.static('build', {index: false}));
app.get('/sitemap.xml', function(req, res) {
sphinx.query('SELECT count(*) as cnt FROM `torrents` WHERE contentCategory != \'xxx\' OR contentCategory IS NULL', function (error, rows, fields) {
if(!rows) {
return;
}
let urls = []
for(let i = 0; i < Math.ceil(rows[0].cnt / config.sitemapMaxSize); i++)
urls.push(`http://${config.domain}/sitemap${i+1}.xml`);
res.header('Content-Type', 'application/xml');
res.send( sm.buildSitemapIndex({
urls
}));
});
});
app.get('/sitemap:id.xml', function(req, res) {
if(req.params.id < 1)
return;
let page = (req.params.id - 1) * config.sitemapMaxSize
sphinx.query('SELECT hash FROM `torrents` WHERE contentCategory != \'xxx\' OR contentCategory IS NULL LIMIT ?, ?', [page, config.sitemapMaxSize], function (error, rows, fields) {
if(!rows) {
return;
}
let sitemap = sm.createSitemap ({
hostname: 'http://' + config.domain,
cacheTime: 600000
});
sitemap.add({url: '/'});
for(let i = 0; i < rows.length; i++)
sitemap.add({url: '/torrent/' + rows[i].hash});
sitemap.toXML( function (err, xml) {
if (err) {
return res.status(500).end();
}
res.header('Content-Type', 'application/xml');
res.send( xml );
});
});
});
app.get('*', function(req, res)
{
if(typeof req.query['_escaped_fragment_'] != 'undefined')
{
let program = phantomjs.exec('phantom.js', 'http://' + config.domain + req.path)
let body = '';
let timeout = setTimeout(() => {
program.kill();
}, 45000)
program.stderr.pipe(process.stderr)
program.stdout.on('data', (chunk) => {
body += chunk;
});
program.on('exit', code => {
clearTimeout(timeout);
res.header('Content-Type', 'text/html');
res.send( body );
})
return;
}
res.sendfile(__dirname + '/build/index.html');
});
*/
// start
function baseRowData(row)
{
return {
@ -289,7 +192,7 @@ app.get('*', function(req, res)
const updateTorrentTrackers = (hash) => {
let maxSeeders = 0, maxLeechers = 0, maxCompleted = 0;
mysqlSingle.query('UPDATE torrents SET trackersChecked = ? WHERE hash = ?', [Math.floor(Date.now() / 1000), hash], (err, result) => {
sphinxSingle.query('UPDATE torrents SET trackersChecked = ? WHERE hash = ?', [Math.floor(Date.now() / 1000), hash], (err, result) => {
if(!result) {
console.error(err);
return
@ -320,7 +223,7 @@ app.get('*', function(req, res)
maxCompleted = completed;
let checkTime = new Date();
mysqlSingle.query('UPDATE torrents SET seeders = ?, completed = ?, leechers = ?, trackersChecked = ? WHERE hash = ?', [seeders, completed, leechers, Math.floor(checkTime.getTime() / 1000), hash], function(err, result) {
sphinxSingle.query('UPDATE torrents SET seeders = ?, completed = ?, leechers = ?, trackersChecked = ? WHERE hash = ?', [seeders, completed, leechers, Math.floor(checkTime.getTime() / 1000), hash], function(err, result) {
if(!result) {
console.error(err);
return
@ -352,7 +255,7 @@ app.get('*', function(req, res)
if(free < config.cleanupDiscLimit)
{
mysqlSingle.query(`SELECT * FROM torrents WHERE added < DATE_SUB(NOW(), INTERVAL 6 hour) ORDER BY seeders ASC, files DESC, leechers ASC, completed ASC LIMIT ${cleanTorrents}`, function(err, torrents) {
sphinxSingle.query(`SELECT * FROM torrents WHERE added < DATE_SUB(NOW(), INTERVAL 6 hour) ORDER BY seeders ASC, files DESC, leechers ASC, completed ASC LIMIT ${cleanTorrents}`, function(err, torrents) {
if(!torrents)
return;
@ -364,8 +267,8 @@ app.get('*', function(req, res)
cleanupDebug('cleanup torrent', torrent.name, '[seeders', torrent.seeders, ', files', torrent.files, ']', 'free', (free / (1024 * 1024)) + "mb");
mysqlSingle.query('DELETE FROM files WHERE hash = ?', torrent.hash);
mysqlSingle.query('DELETE FROM torrents WHERE hash = ?', torrent.hash);
sphinxSingle.query('DELETE FROM files WHERE hash = ?', torrent.hash);
sphinxSingle.query('DELETE FROM torrents WHERE hash = ?', torrent.hash);
})
});
}
@ -481,7 +384,7 @@ app.get('*', function(req, res)
torrent.id = torrentsId++;
mysqlSingle.query("SELECT id FROM torrents WHERE hash = ?", torrent.hash, (err, single) => {
sphinxSingle.query("SELECT id FROM torrents WHERE hash = ?", torrent.hash, (err, single) => {
if(!single)
{
console.log(err)
@ -497,7 +400,7 @@ app.get('*', function(req, res)
torrent.nameIndex = torrent.name
mysqlSingle.insertValues('torrents', torrent, function(err, result) {
sphinxSingle.insertValues('torrents', torrent, function(err, result) {
if(result) {
if(!silent)
send('newTorrent', {
@ -521,14 +424,14 @@ app.get('*', function(req, res)
});
})
mysqlSingle.query('SELECT count(*) as files_count FROM files WHERE hash = ?', [torrent.hash], function(err, rows) {
sphinxSingle.query('SELECT count(*) as files_count FROM files WHERE hash = ?', [torrent.hash], function(err, rows) {
if(!rows)
return
const db_files = rows[0]['files_count'];
if(db_files !== torrent.files)
{
mysqlSingle.query('DELETE FROM files WHERE hash = ?', torrent.hash, function (err, result) {
sphinxSingle.query('DELETE FROM files WHERE hash = ?', torrent.hash, function (err, result) {
if(err)
{
return;
@ -539,7 +442,7 @@ app.get('*', function(req, res)
file.pathIndex = file.path;
});
mysqlSingle.insertValues('files', filesList, function(err, result) {
sphinxSingle.insertValues('files', filesList, function(err, result) {
if(!result) {
console.error(err);
return
@ -554,8 +457,8 @@ app.get('*', function(req, res)
const removeTorrentFromDB = async (torrent) => {
const {hash} = torrent
await mysqlSingle.query('DELETE FROM torrents WHERE hash = ?', hash)
await mysqlSingle.query('DELETE FROM files WHERE hash = ?', hash)
await sphinxSingle.query('DELETE FROM torrents WHERE hash = ?', hash)
await sphinxSingle.query('DELETE FROM files WHERE hash = ?', hash)
}
const updateTorrentToDB = async (torrent) => {
@ -571,7 +474,7 @@ app.get('*', function(req, res)
delete torrent.id
delete torrent.filesList
await mysqlSingle.updateValues('torrents', torrent, {hash: torrent.hash})
await sphinxSingle.updateValues('torrents', torrent, {hash: torrent.hash})
}
const insertMetadata = (metadata, infohash, rinfo) => {
@ -774,6 +677,7 @@ app.get('*', function(req, res)
// setup api
await API({
sphinx,
sphinxSingle: sphinxSingleAlternative,
recive,
send,
p2p,
@ -821,6 +725,9 @@ app.get('*', function(req, res)
if(upnp)
upnp.ratsUnmap()
console.log('closing alternative db interface')
await new Promise(resolve => sphinxSingleAlternative.end(resolve))
// save torrents sessions
console.log('save torrents downloads sessions')
torrentClient.saveSession(dataDirectory + '/downloads.json')
@ -905,9 +812,10 @@ app.get('*', function(req, res)
// don't listen complete torrent responses
client.removeAllListeners('complete')
console.log('closing torrent client')
torrentClient.destroy(() => {
sphinx.end(() => spider.close(() => {
mysqlSingle.destroy()
sphinxSingle.destroy()
console.log('spider closed')
callback()
}))