some transfer fixes

This commit is contained in:
Alexey Kasyanchuk 2018-09-01 05:08:05 +03:00
parent cdfed5ae38
commit 3699fbb549
2 changed files with 50 additions and 8 deletions

View File

@ -8,6 +8,7 @@ const isPortReachable = require('./isPortReachable')
const EventEmitter = require('events'); const EventEmitter = require('events');
const _ = require('lodash') const _ = require('lodash')
const fs = require('fs') const fs = require('fs')
const path = require('path')
class p2p { class p2p {
constructor(send = () => {}) constructor(send = () => {})
@ -133,14 +134,35 @@ class p2p {
}); });
this.on('file', ({path}, callback) => { this.on('file', ({path}, callback) => {
const readable = new fs.ReadStream(path) if(!this.dataDirectory)
{
logTE('transfer', 'no data directory')
return
}
const filePath = path.resolve(this.dataDirectory + '/' + path)
if(!filePath.includes(this.dataDirectory))
{
logTE('transfer', 'file get must be from data dir')
return
}
if(!fs.existsSync(filePath))
{
logTE('transfer', 'no such file or directory', filePath)
return
}
let readable = new fs.ReadStream(filePath)
logT('transfer', 'server transfer file', path) logT('transfer', 'server transfer file', path)
readable.on('data', (chunk) => { readable.on('data', (chunk) => {
console.log('chunk', chunk.length)
callback({data: chunk}) callback({data: chunk})
}); });
readable.on('end', () => { readable.on('end', () => {
logT('transfer', 'server finish transfer file', path) logT('transfer', 'server finish transfer file', path)
callback(undefined) callback(undefined)
readable = null
}); });
}) })
} }
@ -370,19 +392,34 @@ class p2p {
socket.connect(address.port, address.address); socket.connect(address.port, address.address);
} }
emit(type, data, callback) emit(type, data, callback, callbackPermanent)
{ {
const callbacks = []
for(const peer of this.peers) for(const peer of this.peers)
{ {
if(peer.emit) if(peer.emit)
peer.emit(type, data, callback) callbacks.push(peer.emit(type, data, callback, callbackPermanent))
} }
return () => callbacks.forEach(callback => callback())
} }
file(peer, path) file(path, targetPath, remotePeer)
{ {
const fileStream = fs.createWriteStream(path) if(!this.dataDirectory)
let deleteCallback = peer.emit('file', {path}, (chunk) => { {
logTE('transfer', 'no data directory')
return
}
const fileStream = fs.createWriteStream(this.dataDirectory + '/' + (targetPath || path.basename(path)))
let peer = null
let deleteCallback = (remotePeer || this).emit('file', {path}, (chunk, nil, addr) => {
if(peer && addr !== peer)
{
logT('transfer', 'ignore other peers responce', addr.peerId)
return
}
if(!chunk) if(!chunk)
{ {
logT('transfer', 'closing transfering file stream', path) logT('transfer', 'closing transfering file stream', path)
@ -392,7 +429,7 @@ class p2p {
} }
const {data} = chunk const {data} = chunk
if(!data) if(!data || data.type !== 'Buffer')
{ {
logTE('transfer', 'error on file transfer', path) logTE('transfer', 'error on file transfer', path)
deleteCallback() deleteCallback()
@ -400,7 +437,11 @@ class p2p {
return return
} }
fileStream.write(data) // make sure no othe peer will recive data
peer = addr
const buffer = Buffer.from(data.data)
fileStream.write(buffer)
}, true) // dont clear callback }, true) // dont clear callback
} }

View File

@ -66,6 +66,7 @@ module.exports = function (send, recive, dataDirectory, version, env)
const p2p = new P2PServer(send) const p2p = new P2PServer(send)
p2p.version = version p2p.version = version
p2p.encryptor = encryptor p2p.encryptor = encryptor
p2p.dataDirectory = dataDirectory // make file transfer work
p2p.listen() p2p.listen()
const p2pStore = new P2PStore(p2p, sphinx) const p2pStore = new P2PStore(p2p, sphinx)