diff options
Diffstat (limited to 'src/libktorrent/kademlia/dht.cpp')
| -rw-r--r-- | src/libktorrent/kademlia/dht.cpp | 378 | 
1 files changed, 378 insertions, 0 deletions
diff --git a/src/libktorrent/kademlia/dht.cpp b/src/libktorrent/kademlia/dht.cpp new file mode 100644 index 0000000..ff14340 --- /dev/null +++ b/src/libktorrent/kademlia/dht.cpp @@ -0,0 +1,378 @@ +/*************************************************************************** + *   Copyright (C) 2005 by Joris Guisson                                   * + *   joris.guisson@gmail.com                                               * + *                                                                         * + *   This program is free software; you can redistribute it and/or modify  * + *   it under the terms of the GNU General Public License as published by  * + *   the Free Software Foundation; either version 2 of the License, or     * + *   (at your option) any later version.                                   * + *                                                                         * + *   This program is distributed in the hope that it will be useful,       * + *   but WITHOUT ANY WARRANTY; without even the implied warranty of        * + *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         * + *   GNU General Public License for more details.                          * + *                                                                         * + *   You should have received a copy of the GNU General Public License     * + *   along with this program; if not, write to the                         * + *   Free Software Foundation, Inc.,                                       * + *   51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.             * + ***************************************************************************/ +#include <tqmap.h> +#include <kresolver.h> +#include <util/log.h> +#include <util/array.h> +#include <util/functions.h> +#include <torrent/bnode.h> +#include <torrent/globals.h> +#include <tdesocketaddress.h> +#include "announcetask.h" +#include "dht.h" +#include "node.h" +#include "rpcserver.h" +#include "rpcmsg.h" +#include "kclosestnodessearch.h" +#include "database.h" +#include "taskmanager.h" +#include "nodelookup.h" + + +using namespace bt; +using namespace KNetwork; + +namespace dht +{ +	 + + +	DHT::DHT() : node(0),srv(0),db(0),tman(0) +	{ +		connect(&update_timer,TQ_SIGNAL(timeout()),this,TQ_SLOT(update())); +	} + + +	DHT::~DHT() +	{ +		if (running) +			stop(); +	} +	 +	void DHT::start(const TQString & table,const TQString & key_file,bt::Uint16 port) +	{ +		if (running) +			return; +		 +		if (port == 0) +			port = 6881;  +		 +		table_file = table; +		this->port = port; +		Out(SYS_DHT|LOG_NOTICE) << "DHT: Starting on port " << port << endl; +		srv = new RPCServer(this,port); +		node = new Node(srv,key_file); +		db = new Database(); +		tman = new TaskManager(); +		expire_timer.update(); +		running = true; +		srv->start(); +		node->loadTable(table); +		update_timer.start(1000); +		started(); +	} +		 +		 +	void DHT::stop() +	{ +		if (!running) +			return; +		 +		update_timer.stop(); +		Out(SYS_DHT|LOG_NOTICE) << "DHT: Stopping " << endl; +		srv->stop(); +		node->saveTable(table_file); +		running = false; +		stopped(); +		delete tman; tman = 0; +		delete db; db = 0; +		delete node; node = 0; +		delete srv; srv = 0; +	} + +	void DHT::ping(PingReq* r) +	{ +		if (!running) +			return; +		 +		// ignore requests we get from ourself +		if (r->getID() == node->getOurID()) +			return; +		 +		Out(SYS_DHT|LOG_NOTICE) << "DHT: Sending ping response" << endl; +		PingRsp rsp(r->getMTID(),node->getOurID()); +		rsp.setOrigin(r->getOrigin()); +		srv->sendMsg(&rsp); +		node->recieved(this,r); +	} +	 +	 +	 +	void DHT::findNode(FindNodeReq* r) +	{ +		if (!running) +			return; +		 +		// ignore requests we get from ourself +		if (r->getID() == node->getOurID()) +			return; +		 +		Out(SYS_DHT|LOG_DEBUG) << "DHT: got findNode request" << endl; +		node->recieved(this,r); +		// find the K closest nodes and pack them +		KClosestNodesSearch kns(r->getTarget(),K); +		 +		node->findKClosestNodes(kns); +		 +		Uint32 rs = kns.requiredSpace(); +		// create the data +		TQByteArray nodes(rs); +		// pack the found nodes in a byte array +		if (rs > 0) +			kns.pack(nodes); +		 +		FindNodeRsp fnr(r->getMTID(),node->getOurID(),nodes); +		fnr.setOrigin(r->getOrigin()); +		srv->sendMsg(&fnr); +	} + +	 +	void DHT::announce(AnnounceReq* r) +	{ +		if (!running) +			return; +		 +		// ignore requests we get from ourself +		if (r->getID() == node->getOurID()) +			return; +		 +		Out(SYS_DHT|LOG_DEBUG) << "DHT: got announce request" << endl; +		node->recieved(this,r); +		// first check if the token is OK +		dht::Key token = r->getToken(); +		if (!db->checkToken(token,r->getOrigin().ipAddress().IPv4Addr(),r->getOrigin().port())) +			return; +		 +		// everything OK, so store the value +		Uint8 tdata[6]; +		bt::WriteUint32(tdata,0,r->getOrigin().ipAddress().IPv4Addr()); +		bt::WriteUint16(tdata,4,r->getPort()); +		db->store(r->getInfoHash(),DBItem(tdata)); +		// send a proper response to indicate everything is OK +		AnnounceRsp rsp(r->getMTID(),node->getOurID()); +		rsp.setOrigin(r->getOrigin()); +		srv->sendMsg(&rsp); +	} +	 +	 +	 +	void DHT::getPeers(GetPeersReq* r) +	{ +		if (!running) +			return; +		 +		// ignore requests we get from ourself +		if (r->getID() == node->getOurID()) +			return; +		 +		Out(SYS_DHT|LOG_DEBUG) << "DHT: got getPeers request" << endl; +		node->recieved(this,r); +		DBItemList dbl; +		db->sample(r->getInfoHash(),dbl,50); +		 +		// generate a token +		dht::Key token = db->genToken(r->getOrigin().ipAddress().IPv4Addr(),r->getOrigin().port()); +		 +		if (dbl.count() == 0) +		{ +			// if data is null do the same as when we have a findNode request +			// find the K closest nodes and pack them +			KClosestNodesSearch kns(r->getInfoHash(),K); +			node->findKClosestNodes(kns); +			Uint32 rs = kns.requiredSpace(); +			// create the data +			TQByteArray nodes(rs); +			// pack the found nodes in a byte array +			if (rs > 0) +				kns.pack(nodes); +		 +			GetPeersRsp fnr(r->getMTID(),node->getOurID(),nodes,token); +			fnr.setOrigin(r->getOrigin()); +			srv->sendMsg(&fnr); +		} +		else +		{			 +			// send a get peers response +			GetPeersRsp fvr(r->getMTID(),node->getOurID(),dbl,token); +			fvr.setOrigin(r->getOrigin()); +			srv->sendMsg(&fvr); +		} +	} +	 +	void DHT::response(MsgBase* r) +	{ +		if (!running) +			return; +		 +		node->recieved(this,r); +	} +	 +	void DHT::error(ErrMsg* ) +	{} +	 + +	void DHT::portRecieved(const TQString & ip,bt::Uint16 port) +	{ +		if (!running) +			return; +		 +		Out(SYS_DHT|LOG_DEBUG) << "Sending ping request to " << ip << ":" << port << endl; +		PingReq* r = new PingReq(node->getOurID()); +		r->setOrigin(KInetSocketAddress(ip,port)); +		srv->doCall(r); +	} +	 +	bool DHT::canStartTask() const +	{ +		// we can start a task if we have less then  7 runnning and +		// there are at least 16 RPC slots available +		if (tman->getNumTasks() >= 7) +			return false; +		else if (256 - srv->getNumActiveRPCCalls() <= 16) +			return false; +		 +		return true;	 +	} +	 +	AnnounceTask* DHT::announce(const bt::SHA1Hash & info_hash,bt::Uint16 port) +	{ +		if (!running) +			return 0; +		 +		KClosestNodesSearch kns(info_hash,K); +		node->findKClosestNodes(kns); +		if (kns.getNumEntries() > 0) +		{ +			Out(SYS_DHT|LOG_NOTICE) << "DHT: Doing announce " << endl; +			AnnounceTask* at = new AnnounceTask(db,srv,node,info_hash,port); +			at->start(kns,!canStartTask()); +			tman->addTask(at); +			if (!db->contains(info_hash)) +				db->insert(info_hash); +			return at; +		} +		 +		return 0; +	} +	 +	NodeLookup* DHT::refreshBucket(const dht::Key & id,KBucket & bucket) +	{ +		if (!running) +			return 0; +		 +		KClosestNodesSearch kns(id,K); +		bucket.findKClosestNodes(kns); +		bucket.updateRefreshTimer(); +		if (kns.getNumEntries() > 0) +		{ +			Out(SYS_DHT|LOG_DEBUG) << "DHT: refreshing bucket " << endl; +			NodeLookup* nl = new NodeLookup(id,srv,node); +			nl->start(kns,!canStartTask()); +			tman->addTask(nl); +			return nl; +		} +		 +		return 0; +	} +	 +	NodeLookup* DHT::findNode(const dht::Key & id) +	{ +		if (!running) +			return 0; +		 +		KClosestNodesSearch kns(id,K); +		node->findKClosestNodes(kns); +		if (kns.getNumEntries() > 0) +		{ +			Out(SYS_DHT|LOG_DEBUG) << "DHT: finding node " << endl; +			NodeLookup* at = new NodeLookup(id,srv,node); +			at->start(kns,!canStartTask()); +			tman->addTask(at); +			return at; +		} +		 +		return 0; +	} +	 +	void DHT::update() +	{ +		if (!running) +			return; +		 +		if (expire_timer.getElapsedSinceUpdate() > 5*60*1000) +		{ +			db->expire(bt::GetCurrentTime()); +			expire_timer.update(); +		} +		 +		node->refreshBuckets(this); +		tman->removeFinishedTasks(this); +		stats.num_tasks = tman->getNumTasks() + tman->getNumQueuedTasks(); +		stats.num_peers = node->getNumEntriesInRoutingTable(); +	} +	 +	void DHT::timeout(const MsgBase* r) +	{ +		node->onTimeout(r); +	} +	 +	void DHT::addDHTNode(const TQString & host,Uint16 hport) +	{ +		if (!running) +			return; +		 +		KResolverResults res = KResolver::resolve(host,TQString::number(hport)); +		if (res.count() > 0) +		{ +			srv->ping(node->getOurID(),res.front().address()); +		} +	} +	 +	TQMap<TQString, int> DHT::getClosestGoodNodes(int maxNodes) +	{ +		TQMap<TQString, int> map; +		 +		if(!node) +			return map; +		 +		int max = 0; +		KClosestNodesSearch kns(node->getOurID(), maxNodes*2); +		node->findKClosestNodes(kns); +		 +		KClosestNodesSearch::Itr it; +		for(it = kns.begin(); it != kns.end(); ++it) +		{ +			KBucketEntry e = it->second; +			 +			if(!e.isGood()) +				continue; +			 +			KInetSocketAddress a = e.getAddress(); +			 +			map.insert(a.ipAddress().toString(), a.port()); +			if(++max >= maxNodes) +				break; +		} +		 +		return map; +	} +} + +#include "dht.moc"  | 
