/* * This file is part of the KFTPGrabber project * * Copyright (C) 2003-2006 by the KFTPGrabber developers * Copyright (C) 2003-2006 Jernej Kos * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License as * published by the Free Software Foundation; either version 2 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * is provided AS IS, WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, and * NON-INFRINGEMENT. See the GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Steet, Fifth Floor, Boston, * MA 02110-1301, USA. * * In addition, as a special exception, the copyright holders give * permission to link the code of portions of this program with the * OpenSSL library under certain conditions as described in each * individual source file, and distribute linked combinations * including the two. * * You must obey the GNU General Public License in all respects * for all of the code used other than OpenSSL. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you * do not wish to do so, delete this exception statement from your * version. If you delete this exception statement from all source * files in the program, then also delete it here. */ #include "thread.h" #include "ftpsocket.h" #include "sftpsocket.h" #include namespace KFTPEngine { Thread::Thread() : TQThread(), m_eventHandler(new EventHandler(this)), m_socket(0), m_wakeupEvent(0), m_abortLoop(false), m_wakeUpPending(false) { m_protocolMap.insert("ftp", new FtpSocket(this)); m_protocolMap.insert("sftp", new SftpSocket(this)); // FTP is the default protocol m_socket = m_protocolMap["ftp"]; // Auto start the thread start(); } Thread::~Thread() { m_abortLoop = true; if (!wait(1000)) terminate(); // Destroy all protocol sockets delete static_cast(m_protocolMap["ftp"]); delete static_cast(m_protocolMap["sftp"]); m_protocolMap.clear(); } void Thread::run() { while (!m_abortLoop) { TQThread::usleep(100); // "Poll" the socket m_socket->poll(); // Transmit wakeup events if any if (m_wakeUpPending && m_socket->isBusy()) { m_wakeupMutex.lock(); m_socket->wakeup(m_wakeupEvent); delete m_wakeupEvent; m_wakeupEvent = 0; m_wakeUpPending = false; m_wakeupMutex.unlock(); } // Execute any pending commands if the socket isn't busy if (!m_commandQueue.empty()) { m_queueMutex.lock(); TQValueList::iterator queueEnd = m_commandQueue.end(); for (TQValueList::iterator i = m_commandQueue.begin(); i != queueEnd; ++i) { Commands::Type cmdType = *i; // Execute the command if (cmdType == Commands::CmdNext) { m_commandQueue.remove(i--); m_socket->nextCommand(); } else if (!m_socket->isBusy()) { m_commandQueue.remove(i--); m_socket->setCurrentCommand(cmdType); switch (cmdType) { case Commands::CmdConnect: { m_socket->protoConnect(nextCommandParameter().asUrl()); break; } case Commands::CmdDisconnect: { m_socket->protoDisconnect(); break; } case Commands::CmdList: { m_socket->protoList(nextCommandParameter().asUrl()); break; } case Commands::CmdScan: { m_socket->protoScan(nextCommandParameter().asUrl()); break; } case Commands::CmdGet: { m_socket->protoGet(nextCommandParameter().asUrl(), nextCommandParameter().asUrl()); break; } case Commands::CmdPut: { m_socket->protoPut(nextCommandParameter().asUrl(), nextCommandParameter().asUrl()); break; } case Commands::CmdDelete: { m_socket->protoDelete(nextCommandParameter().asUrl()); break; } case Commands::CmdRename: { m_socket->protoRename(nextCommandParameter().asUrl(), nextCommandParameter().asUrl()); break; } case Commands::CmdChmod: { m_socket->protoChmod(nextCommandParameter().asUrl(), nextCommandParameter().asFileSize(), nextCommandParameter().asBoolean()); break; } case Commands::CmdMkdir: { m_socket->protoMkdir(nextCommandParameter().asUrl()); break; } case Commands::CmdRaw: { m_socket->protoRaw(nextCommandParameter().asString()); break; } case Commands::CmdFxp: { m_socket->protoSiteToSite(static_cast(nextCommandParameter().asData()), nextCommandParameter().asUrl(), nextCommandParameter().asUrl()); break; } default: { // Just ignore unknown commands for now break; } } } } m_queueMutex.unlock(); } } } void Thread::wakeup(WakeupEvent *event) { TQMutexLocker locker(&m_wakeupMutex); m_wakeupEvent = event; m_wakeUpPending = true; } void Thread::abort() { // Clear any pending wakeup events if (m_wakeUpPending) { TQMutexLocker locker(&m_wakeupMutex); m_wakeupEvent = 0; m_wakeUpPending = false; } m_socket->protoAbort(); } void Thread::event(Event::Type type, TQValueList params) { if (m_eventHandler) { Event *e = new Event(type, params); tqApp->postEvent(m_eventHandler, e); } } void Thread::selectSocketForProtocol(const KURL &url) { if (url.protocol() == m_socket->protocolName()) return; // Change the socket if one exists Socket *socket = m_protocolMap.find(url.protocol()); if (socket) m_socket = socket; } EventParameter Thread::nextCommandParameter() { TQMutexLocker locker(&m_paramsMutex); EventParameter param = m_commandParams.front(); m_commandParams.pop_front(); return param; } void Thread::connect(const KURL &url) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); // Setup the correct socket to use for connection selectSocketForProtocol(url); m_commandQueue.append(Commands::CmdConnect); m_commandParams.append(EventParameter(url)); } void Thread::disconnect() { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdDisconnect); } void Thread::list(const KURL &url) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdList); m_commandParams.append(EventParameter(url)); } void Thread::scan(const KURL &url) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdScan); m_commandParams.append(EventParameter(url)); } void Thread::get(const KURL &source, const KURL &destination) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdGet); m_commandParams.append(EventParameter(destination)); m_commandParams.append(EventParameter(source)); } void Thread::put(const KURL &source, const KURL &destination) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdPut); m_commandParams.append(EventParameter(destination)); m_commandParams.append(EventParameter(source)); } void Thread::remove(const KURL &url) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdDelete); m_commandParams.append(EventParameter(url)); } void Thread::rename(const KURL &source, const KURL &destination) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdRename); m_commandParams.append(EventParameter(destination)); m_commandParams.append(EventParameter(source)); } void Thread::chmod(const KURL &url, int mode, bool recursive) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdChmod); m_commandParams.append(EventParameter(recursive)); m_commandParams.append(EventParameter(mode)); m_commandParams.append(EventParameter(url)); } void Thread::mkdir(const KURL &url) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdMkdir); m_commandParams.append(EventParameter(url)); } void Thread::raw(const TQString &raw) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdRaw); m_commandParams.append(EventParameter(raw)); } void Thread::siteToSite(Thread *thread, const KURL &source, const KURL &destination) { TQMutexLocker locker(&m_paramsMutex); TQMutexLocker lockerq(&m_queueMutex); m_commandQueue.append(Commands::CmdFxp); m_commandParams.append(EventParameter(destination)); m_commandParams.append(EventParameter(source)); m_commandParams.append(EventParameter(thread->socket())); } }