feat(p2p): file transfer p2p feature

This commit is contained in:
Alexey Kasyanchuk 2018-08-31 06:14:25 +03:00
parent 2773db3949
commit 3e6ac4c00d
3 changed files with 54 additions and 11 deletions

View File

@ -5,6 +5,7 @@ const getTorrent = require('./gettorrent')
const _ = require('lodash') const _ = require('lodash')
const asyncForEach = require('./asyncForEach') const asyncForEach = require('./asyncForEach')
const cpuUsage = require('./bt/cpu-usage-global') const cpuUsage = require('./bt/cpu-usage-global')
const magnetParse = require('./magnetParse')
module.exports = async ({ module.exports = async ({
sphinx, sphinx,
@ -291,15 +292,6 @@ module.exports = async ({
return /[0-9a-f]+/i.test(hash) return /[0-9a-f]+/i.test(hash)
} }
const magnetParse = (magnet) => {
const match = /magnet:\?xt=urn:btih:([0-9a-f]+)/i.exec(magnet)
if(!match)
return
if(match[1].length === 40)
return match[1].toLowerCase()
return
}
const searchTorrentCall = function(text, navigation, callback, isP2P) const searchTorrentCall = function(text, navigation, callback, isP2P)
{ {
if(typeof callback != 'function') if(typeof callback != 'function')

View File

@ -0,0 +1,8 @@
module.exports = (magnet) => {
const match = /magnet:\?xt=urn:btih:([0-9a-f]+)/i.exec(magnet)
if(!match)
return
if(match[1].length === 40)
return match[1].toLowerCase()
return
}

View File

@ -130,6 +130,18 @@ class p2p {
++alias; ++alias;
}); });
}); });
this.on('file', (path, callback) => {
const readable = new fs.ReadStream(path)
logT('transfer', 'server transfer file', path)
readable.on('data', (chunk) => {
callback({data: chunk})
});
readable.on('end', () => {
logT('transfer', 'server finish transfer file', path)
callback(undefined)
});
})
} }
listen() { listen() {
@ -261,23 +273,29 @@ class p2p {
const socket = new JsonSocket(rawSocket); //Decorate a standard net.Socket with JsonSocket const socket = new JsonSocket(rawSocket); //Decorate a standard net.Socket with JsonSocket
socket.on('connect', () => { //Don't send until we're connected socket.on('connect', () => { //Don't send until we're connected
const callbacks = {} const callbacks = {}
const callbacksPermanent = {}
socket.on('message', (message) => { socket.on('message', (message) => {
if(message.id && callbacks[message.id]) if(message.id && callbacks[message.id])
{ {
callbacks[message.id](message.data, socket, address); callbacks[message.id](message.data, socket, address);
if(!callbacksPermanent[message.id])
delete callbacks[message.id]; delete callbacks[message.id];
} }
}); });
const emit = (type, data, callback) => { const emit = (type, data, callback, callbackPermanent) => {
const id = Math.random().toString(36).substring(5) const id = Math.random().toString(36).substring(5)
if(callback) if(callback)
callbacks[id] = callback; callbacks[id] = callback;
if(callback && callbackPermanent)
callbacksPermanent[id] = true // dont delete callback on message
socket.sendMessage({ socket.sendMessage({
id, id,
type, type,
data data
}); });
return () => delete callbacks[id];
} }
// check protocol // check protocol
@ -360,6 +378,31 @@ class p2p {
} }
} }
file(peer, path)
{
const fileStream = fs.createWriteStream(path)
let deleteCallback = peer.emit('file', path, (chunk) => {
if(!chunk)
{
logT('transfer', 'closing transfering file stream', path)
deleteCallback()
fileStream.end()
return
}
const {data} = chunk
if(!data)
{
logTE('transfer', 'error on file transfer', path)
deleteCallback()
fileStream.end()
return
}
fileStream.write(data)
}, true) // dont clear callback
}
peersList() peersList()
{ {
return this.peers.filter(peer => !!peer.emit) return this.peers.filter(peer => !!peer.emit)