fix(vote): proper vote data sequencing

This commit is contained in:
Alexey Kasyanchuk
2018-04-18 18:54:56 +03:00
parent 5e02b0df73
commit f23310065d
6 changed files with 175 additions and 205 deletions

View File

@ -20,7 +20,6 @@ import LinearProgress from 'material-ui/LinearProgress';
import FlatButton from 'material-ui/FlatButton';
import {fileTypeDetect} from './content'
import {contentIcon} from './torrent'
import waitObject from './waitObject'
let buildFilesTree = (filesList) => {
let rootTree = {
@ -231,8 +230,8 @@ export default class TorrentPage extends Page {
if(this.props.hash != hash)
return;
// in some cases torrent object can be resolve late
await waitObject(this, 'torrent')
if(!this.torrent)
return
this.torrent.good = good;
this.torrent.bad = bad;

View File

@ -1,15 +0,0 @@
export default (obj, val) => new Promise((resolve) => {
if(typeof obj[val] === 'object')
{
resolve()
return
}
const w = setInterval(() => {
if(typeof obj[val] === 'object')
{
clearInterval(w)
resolve()
}
}, 3)
})

View File

@ -106,14 +106,12 @@ module.exports = ({
if(options.files)
{
sphinx.query('SELECT * FROM `files` WHERE `hash` = ? LIMIT 50000', hash, function (error, rows, fields) {
torrent.filesList = rows;
callback(baseRowData(torrent))
});
torrent.filesList = await sphinx.query('SELECT * FROM `files` WHERE `hash` = ? LIMIT 50000', hash);
callback(baseRowData(torrent))
}
else
{
callback(baseRowData(torrent))
callback(baseRowData(torrent))
}
if(torrentClientHashMap[hash])

View File

@ -1,5 +1,4 @@
const config = require('./config')
const mysql = require( 'mysql')
const {single} = require('./mysql')
const forBigTable = require('./forBigTable')
const { BrowserWindow } = require("electron");
const url = require('url')
@ -8,45 +7,12 @@ const fs = require('fs')
const currentVersion = 3
module.exports = (callback, mainWindow, sphinxApp) => {
const sphinx = mysql.createConnection({
host : config.sphinx.host,
port : config.sphinx.port
});
const query = (sql) => new Promise((resolve, reject) => {
sphinx.query(sql, (err, res) => {
if(err)
reject(err)
else
resolve(res)
})
})
const insertValues = (table, values, callback) => new Promise((resolve) => {
let names = '';
let data = '';
for(const val in values)
{
if(values[val] === null)
continue;
names += '`' + val + '`,';
data += sphinx.escape(values[val]) + ',';
}
names = names.slice(0, -1)
data = data.slice(0, -1)
let query = `INSERT INTO ${table}(${names}) VALUES(${data})`;
sphinx.query(query, (...responce) => {
if(callback)
callback(...responce)
resolve(...responce)
})
})
module.exports = async (callback, mainWindow, sphinxApp) => {
const sphinx = await single().waitConnection()
const setVersion = async (version) => {
await query(`delete from version where id = 1`)
await query(`insert into version(id, version) values(1, ${version})`)
await sphinx.query(`delete from version where id = 1`)
await sphinx.query(`insert into version(id, version) values(1, ${version})`)
if(sphinxApp)
fs.writeFileSync(`${sphinxApp.directoryPath}/version.vrs`, version)
}
@ -135,8 +101,8 @@ module.exports = (callback, mainWindow, sphinxApp) => {
openPatchWindow()
let i = 1
const torrents = (await query("SELECT COUNT(*) AS c FROM torrents"))[0].c
const files = (await query("SELECT COUNT(*) AS c FROM files"))[0].c
const torrents = (await sphinx.query("SELECT COUNT(*) AS c FROM torrents"))[0].c
const files = (await sphinx.query("SELECT COUNT(*) AS c FROM files"))[0].c
await forBigTable(sphinx, 'torrents', async (torrent) => {
console.log('update index', torrent.id, torrent.name, '[', i, 'of', torrents, ']')
@ -144,8 +110,8 @@ module.exports = (callback, mainWindow, sphinxApp) => {
patchWindow.webContents.send('reindex', {field: torrent.name, index: i++, all: torrents, torrent: true})
torrent.nameIndex = torrent.name
await query(`DELETE FROM torrents WHERE id = ${torrent.id}`)
await insertValues('torrents', torrent)
await sphinx.query(`DELETE FROM torrents WHERE id = ${torrent.id}`)
await sphinx.insertValues('torrents', torrent)
})
i = 1
await forBigTable(sphinx, 'files', async (file) => {
@ -154,8 +120,8 @@ module.exports = (callback, mainWindow, sphinxApp) => {
patchWindow.webContents.send('reindex', {field: file.path, index: i++, all: files})
file.pathIndex = file.path
await query(`DELETE FROM files WHERE id = ${file.id}`)
await insertValues('files', file)
await sphinx.query(`DELETE FROM files WHERE id = ${file.id}`)
await sphinx.insertValues('files', file)
})
await setVersion(2)
@ -167,13 +133,13 @@ module.exports = (callback, mainWindow, sphinxApp) => {
console.log('optimizing torrents')
if(patchWindow)
patchWindow.webContents.send('optimize', {field: 'torrents'})
query(`OPTIMIZE INDEX torrents`)
sphinx.query(`OPTIMIZE INDEX torrents`)
await sphinxApp.waitOptimized('torrents')
console.log('optimizing files')
if(patchWindow)
patchWindow.webContents.send('optimize', {field: 'files'})
query(`OPTIMIZE INDEX files`)
sphinx.query(`OPTIMIZE INDEX files`)
await sphinxApp.waitOptimized('files')
await setVersion(3)
@ -190,53 +156,45 @@ module.exports = (callback, mainWindow, sphinxApp) => {
callback()
}
sphinx.connect(async (mysqlError) => {
if(mysqlError)
// init of db, we can set version to last
if(sphinxApp && sphinxApp.isInitDb)
{
console.log('new db, set version to last version', currentVersion)
await setVersion(currentVersion)
}
sphinx.query('select * from version', async (err, version) => {
if(err)
{
console.log('error on sphinx connecting on db patching', mysqlError)
console.log('error on version get on db patch')
return
}
// init of db, we can set version to last
if(sphinxApp && sphinxApp.isInitDb)
if(!version || !version[0] || !version[0].version)
{
console.log('new db, set version to last version', currentVersion)
await setVersion(currentVersion)
}
sphinx.query('select * from version', async (err, version) => {
if(err)
if(sphinxApp && fs.existsSync(`${sphinxApp.directoryPath}/version.vrs`))
{
console.log('error on version get on db patch')
return
}
if(!version || !version[0] || !version[0].version)
{
if(sphinxApp && fs.existsSync(`${sphinxApp.directoryPath}/version.vrs`))
const ver = parseInt(fs.readFileSync(`${sphinxApp.directoryPath}/version.vrs`))
if(ver > 0)
{
const ver = parseInt(fs.readFileSync(`${sphinxApp.directoryPath}/version.vrs`))
if(ver > 0)
{
console.log('readed version from version.vrs', ver)
patch(ver)
}
else
{
console.log('error: bad version in version.vrs')
}
console.log('readed version from version.vrs', ver)
patch(ver)
}
else
{
console.log('version not founded, set db version to 1')
await setVersion(1)
patch(1)
console.log('error: bad version in version.vrs')
}
}
else
{
patch(version[0].version)
console.log('version not founded, set db version to 1')
await setVersion(1)
patch(1)
}
})
}
else
{
patch(version[0].version)
}
})
}

106
src/background/mysql.js Normal file
View File

@ -0,0 +1,106 @@
const mysql = require('mysql');
const config = require('./config');
const expand = (sphinx) => {
const queryCall = sphinx.query.bind(sphinx)
sphinx.query = (sql, args, callback) => new Promise((resolve, reject) => {
if(typeof args === 'function' || typeof args === 'undefined')
{
queryCall(sql, (err, res) => {
if(err)
reject(err)
else
resolve(res)
if(args)
args(err, res)
})
}
else
{
queryCall(sql, args, (err, res) => {
if(err)
reject(err)
else
resolve(res)
if(callback)
callback(err, res)
})
}
})
sphinx.insertValues = (table, values, callback) => new Promise((resolve) => {
let names = '';
let data = '';
for(const val in values)
{
if(values[val] === null)
continue;
names += '`' + val + '`,';
data += sphinx.escape(values[val]) + ',';
}
names = names.slice(0, -1)
data = data.slice(0, -1)
let query = `INSERT INTO ${table}(${names}) VALUES(${data})`;
queryCall(query, (...responce) => {
if(callback)
callback(...responce)
resolve(...responce)
})
})
return sphinx
}
const pool = () => {
let sphinx = mysql.createPool({
connectionLimit: config.sphinx.connectionLimit,
host : config.sphinx.host,
port : config.sphinx.port
});
return expand(sphinx)
}
let mysqlSingle;
const single = (callback) => {
mysqlSingle = mysql.createConnection({
host : config.sphinx.host,
port : config.sphinx.port
});
let promiseResolve;
const connectionPromise = new Promise((resolve) => {
promiseResolve = resolve
})
mysqlSingle.waitConnection = () => connectionPromise;
mysqlSingle.connect((mysqlError) => {
if (mysqlError) {
console.error('error connecting: ' + mysqlError.stack);
return;
}
if(callback)
callback(mysqlSingle)
promiseResolve(mysqlSingle)
});
mysqlSingle.on('error', (err) => {
console.log('db error', err);
if(err.code === 'PROTOCOL_CONNECTION_LOST') { // Connection to the MySQL server is usually
mysqlSingle = undefined
single(); // lost due to either server restart, or a
} else { // connnection idle timeout (the wait_timeout
throw err; // server variable configures this)
}
});
mysqlSingle = expand(mysqlSingle)
return mysqlSingle
}
module.exports = {pool, single}

View File

@ -2,7 +2,7 @@ const config = require('./config');
const client = new (require('./bt/client'))
const spider = new (require('./bt/spider'))(client)
const fs = require('fs');
const mysql = require('mysql');
const {single, pool} = require('./mysql')
const getPeersStatisticUDP = require('./bt/udp-tracker-request')
const crypto = require('crypto')
const P2PServer = require('./p2p')
@ -39,11 +39,7 @@ module.exports = function (send, recive, dataDirectory, version, env)
let torrentsId = 1;
let filesId = 1;
let sphinx = mysql.createPool({
connectionLimit: config.sphinx.connectionLimit,
host : config.sphinx.host,
port : config.sphinx.port
});
let sphinx = pool();
// initialize p2p
const p2p = new P2PServer(send)
@ -71,109 +67,37 @@ const udpTrackers = [
}
]
let mysqlSingle;
function handleListenerDisconnect() {
mysqlSingle = mysql.createConnection({
host : config.sphinx.host,
port : config.sphinx.port
});
let mysqlSingle = single((mysqlSingle) => {
mysqlSingle.query("SELECT MAX(`id`) as mx from torrents", (err, rows) => {
if(err)
return
mysqlSingle.connect(function(mysqlError) {
if (mysqlError) {
console.error('error connecting: ' + mysqlError.stack);
return;
}
if(rows[0] && rows[0].mx >= 1)
torrentsId = rows[0].mx + 1;
})
mysqlSingle.query("SELECT MAX(`id`) as mx from torrents", (err, rows) => {
if(err)
return
mysqlSingle.query("SELECT COUNT(*) as cnt from torrents", (err, rows) => {
if(err)
return
if(rows[0] && rows[0].mx >= 1)
torrentsId = rows[0].mx + 1;
})
p2p.info.torrents = rows[0].cnt
})
mysqlSingle.query("SELECT COUNT(*) as cnt from torrents", (err, rows) => {
if(err)
return
mysqlSingle.query("SELECT MAX(`id`) as mx from files", (err, rows) => {
if(err)
return
p2p.info.torrents = rows[0].cnt
})
if(rows[0] &&rows[0].mx >= 1)
filesId = rows[0].mx + 1;
})
mysqlSingle.query("SELECT MAX(`id`) as mx from files", (err, rows) => {
if(err)
return
mysqlSingle.query("SELECT COUNT(*) as cnt from files", (err, rows) => {
if(err)
return
if(rows[0] &&rows[0].mx >= 1)
filesId = rows[0].mx + 1;
})
mysqlSingle.query("SELECT COUNT(*) as cnt from files", (err, rows) => {
if(err)
return
p2p.info.files = rows[0].cnt
})
});
mysqlSingle.on('error', function(err) {
console.log('db error', err);
if(err.code === 'PROTOCOL_CONNECTION_LOST') { // Connection to the MySQL server is usually
handleListenerDisconnect(); // lost due to either server restart, or a
} else { // connnection idle timeout (the wait_timeout
throw err; // server variable configures this)
}
});
const query = mysqlSingle.query;
mysqlSingle.query = (...args) => {
let callback, i;
for(i = 1; i < args.length; i++)
{
if(typeof args[i] == 'function')
{
callback = args[i];
break;
}
}
if(callback)
{
pushDatabaseBalance();
args[i] = (...a) => {
popDatabaseBalance();
callback(...a)
}
}
else if(args.length <= 2)
{
pushDatabaseBalance();
args.push(() => {
popDatabaseBalance();
});
}
query.apply(mysqlSingle, args)
}
mysqlSingle.insertValues = (table, values, callback) => {
let names = '';
let data = '';
for(const val in values)
{
if(values[val] === null)
continue;
names += '`' + val + '`,';
data += mysqlSingle.escape(values[val]) + ',';
}
names = names.slice(0, -1)
data = data.slice(0, -1)
let query = `INSERT INTO ${table}(${names}) VALUES(${data})`;
if(callback)
return mysqlSingle.query(query, (...responce) => callback(...responce))
else
return mysqlSingle.query(query)
}
}
handleListenerDisconnect();
p2p.info.files = rows[0].cnt
})
});
/*
app.use(express.static('build', {index: false}));