балансировка нагрузки на базу

This commit is contained in:
Alexey Kasyanchuk 2017-01-02 12:54:50 +03:00
parent 85836c5ff6
commit c766b2e5e8
2 changed files with 37 additions and 12 deletions

View File

@ -95,7 +95,7 @@ socketMysql.connect(function(mysqlError) {
console.log(text); console.log(text);
let q = 2; let q = 2;
socketMysql.query('SELECT * FROM `torrents` WHERE MATCH(`name`) AGAINST(?)', text, function (error, rows, fields) { socketMysql.query('SELECT * FROM `torrents` WHERE MATCH(`name`) AGAINST(?) LIMIT 10', text, function (error, rows, fields) {
rows.forEach((row) => { rows.forEach((row) => {
search[row.hash] = baseRowData(row); search[row.hash] = baseRowData(row);
}); });
@ -104,7 +104,7 @@ socketMysql.connect(function(mysqlError) {
return search[key]; return search[key];
})); }));
}); });
socketMysql.query('SELECT * FROM `files` INNER JOIN torrents ON(torrents.hash = files.hash) WHERE MATCH(`path`) AGAINST(?)', text, function (error, rows, fields) { socketMysql.query('SELECT * FROM `files` INNER JOIN torrents ON(torrents.hash = files.hash) WHERE MATCH(`path`) AGAINST(?) LIMIT 10', text, function (error, rows, fields) {
rows.forEach((row) => { rows.forEach((row) => {
search[row.hash] = baseRowData(row); search[row.hash] = baseRowData(row);
search[row.hash].path = row.path; search[row.hash].path = row.path;
@ -125,9 +125,19 @@ listenerMysql.connect(function(err) {
return; return;
} }
//spider.on('ensureHash', (hash, addr)=> { let undoneQueries = 0;
// console.log('new hash'); let checkDatabaseBalance = () => {
//}) if(undoneQueries >= 5000)
{
console.log('too much freeze mysql connection. doing balance');
spider.ignore = true;
}
else if(undoneQueries == 0)
{
console.log('all connections done, continue');
spider.ignore = false;
}
};
client.on('complete', function (metadata, infohash, rinfo) { client.on('complete', function (metadata, infohash, rinfo) {
console.log('writing torrent to db'); console.log('writing torrent to db');
@ -139,8 +149,10 @@ listenerMysql.connect(function(err) {
filesCount = metadata.info.files.length; filesCount = metadata.info.files.length;
size = 0; size = 0;
undoneQueries++;
listenerMysql.query('DELETE FROM files WHERE hash = ?', hash, function (err, result) { listenerMysql.query('DELETE FROM files WHERE hash = ?', hash, function (err, result) {
undoneQueries--;
checkDatabaseBalance();
}) })
for(let i = 0; i < metadata.info.files.length; i++) for(let i = 0; i < metadata.info.files.length; i++)
{ {
@ -151,7 +163,10 @@ listenerMysql.connect(function(err) {
path: filePath, path: filePath,
size: file.length, size: file.length,
}; };
undoneQueries++;
let query = listenerMysql.query('INSERT INTO files SET ?', fileQ, function(err, result) { let query = listenerMysql.query('INSERT INTO files SET ?', fileQ, function(err, result) {
undoneQueries--;
checkDatabaseBalance();
if(!result) { if(!result) {
console.log(fileQ); console.log(fileQ);
console.error(err); console.error(err);
@ -168,7 +183,10 @@ listenerMysql.connect(function(err) {
path: metadata.info.name, path: metadata.info.name,
size: size, size: size,
}; };
undoneQueries++;
let query = listenerMysql.query('INSERT INTO files SET ?', fileQ, function(err, result) { let query = listenerMysql.query('INSERT INTO files SET ?', fileQ, function(err, result) {
undoneQueries--;
checkDatabaseBalance();
if(!result) { if(!result) {
console.log(fileQ); console.log(fileQ);
console.error(err); console.error(err);
@ -185,7 +203,10 @@ listenerMysql.connect(function(err) {
ipv4: rinfo.address, ipv4: rinfo.address,
port: rinfo.port port: rinfo.port
}; };
undoneQueries++;
var query = listenerMysql.query('INSERT INTO torrents SET ? ON DUPLICATE KEY UPDATE hash=hash', torrentQ, function(err, result) { var query = listenerMysql.query('INSERT INTO torrents SET ? ON DUPLICATE KEY UPDATE hash=hash', torrentQ, function(err, result) {
undoneQueries--;
checkDatabaseBalance();
if(result) { if(result) {
io.sockets.emit('newTorrent', { io.sockets.emit('newTorrent', {
hash: hash, hash: hash,
@ -205,5 +226,5 @@ listenerMysql.connect(function(err) {
// spider.on('nodes', (nodes)=>console.log('foundNodes')) // spider.on('nodes', (nodes)=>console.log('foundNodes'))
//spider.listen(4445) spider.listen(4445)
}); });

View File

@ -37,6 +37,7 @@ class Spider extends Emiter {
this.bootstraps = options.bootstraps || bootstraps this.bootstraps = options.bootstraps || bootstraps
this.token = new Token() this.token = new Token()
this.client = client this.client = client
this.ignore = false; // ignore all requests
this.walkInterval = 5; this.walkInterval = 5;
} }
@ -67,11 +68,14 @@ class Spider extends Emiter {
walk() { walk() {
if(!this.client || this.client.isIdle()) { if(!this.client || this.client.isIdle()) {
if(!this.ignore)
{
const node = this.table.shift() const node = this.table.shift()
if (node) { if (node) {
this.findNode(Node.neighbor(node.id, this.table.id), {address: node.address, port: node.port}) this.findNode(Node.neighbor(node.id, this.table.id), {address: node.address, port: node.port})
} }
} }
}
setTimeout(()=>this.walk(), this.walkInterval) setTimeout(()=>this.walk(), this.walkInterval)
} }
@ -138,7 +142,7 @@ class Spider extends Emiter {
port: port port: port
}; };
this.emit('ensureHash', infohash.toString('hex').toUpperCase(), addressPair) this.emit('ensureHash', infohash.toString('hex').toUpperCase(), addressPair)
if(this.client) { if(this.client && !this.ignore) {
this.client.add(addressPair, infohash); this.client.add(addressPair, infohash);
} }
} }