# Copyright (C) 2007 daelstorm. All rights reserved. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # Previous copyright below # Copyright (c) 2003-2004 Hyriand. All rights reserved. # # Based on code from PySoulSeek, original copyright note: # Copyright (c) 2001-2003 Alexander Kanavin. All rights reserved. """ This module implements SoulSeek networking protocol. """ from __future__ import division from math import floor from slskmessages import * import SocketServer import socket import random, sys, time if sys.platform == "win32": from multiselect import multiselect import select import threading import struct from errno import EINTR from utils import _, win32 from logfacility import log MAXFILELIMIT = -1 if win32: import ctypes ctypes.cdll.msvcrt._setmaxstdio(2048) MAXFILELIMIT = ctypes.cdll.msvcrt._getmaxstdio() else: try: import resource try: (soft, MAXFILELIMIT) = resource.getrlimit(resource.RLIMIT_NOFILE) except AttributeError: pass except ImportError: pass # OSX reports INFINITE as hard limit, but supports up to 10240 # Solaris supposedly reports 65535 and actually supports this # Linux usually reports 1024 and supports this. if MAXFILELIMIT > 65535: MAXFILELIMIT = 2048 if MAXFILELIMIT > 0: # Bumping soft limit try: resource.setrlimit(resource.RLIMIT_NOFILE, (MAXFILELIMIT, MAXFILELIMIT)) except: pass # Since most people have a limit of 1024 or higher we can # set it to 90% of the max limit and still get a workable amount of # connections. We cannot set it to 100% because our connection count # doesn't agree with with the OS connection count (at least on Linux), # maybe because closed connections aren't closed on the spot. MAXFILELIMIT = max(int(floor(MAXFILELIMIT*0.9)), 100) class Connection: """ Holds data about a connection. conn is a socket object, addr is (ip, port) pair, ibuf and obuf are input and output msgBuffer, init is a PeerInit object (see slskmessages docstrings). """ def __init__(self, conn = None, addr = None, ibuf = "", obuf = ""): self.conn = conn self.addr = addr self.ibuf = ibuf self.obuf = obuf self.init = None self.lastreadlength = 100*1024 class ServerConnection(Connection): """ Server socket """ def __init__(self, conn = None, addr = None, ibuf = "", obuf = ""): Connection.__init__(self, conn, addr, ibuf, obuf) self.lastping = time.time() class PeerConnection(Connection): def __init__(self, conn = None, addr = None, ibuf = "", obuf = "", init = None): Connection.__init__(self, conn, addr, ibuf, obuf) self.filereq = None self.filedown = None self.fileupl = None self.filereadbytes = 0 self.bytestoread = 0 self.init = init self.piercefw = None self.lastactive = time.time() self.starttime = None # Used for upload bandwidth management self.sentbytes2 = 0 self.readbytes2 = 0 class PeerConnectionInProgress: """ As all p2p connect()s are non-blocking, this class is used to hold data about a connection that is not yet established. msgObj is a message to be sent after the connection has been established. """ def __init__(self, conn = None, msgObj = None): self.conn = conn self.msgObj = msgObj self.lastactive = time.time() class SlskProtoThread(threading.Thread): """ This is a netwroking thread that actually does all the communication. It sends data to the UI thread via a callback function and receives data via a Queue object. """ """ Server and peers send each other small binary messages, that start with length and message code followed by the actual messsage data. These are the codes.""" servercodes = { Login:1, SetWaitPort:2, GetPeerAddress:3, AddUser:5, Unknown6:6, GetUserStatus:7, SayChatroom:13, JoinRoom:14, LeaveRoom:15, UserJoinedRoom:16, UserLeftRoom:17, ConnectToPeer:18, MessageUser:22, MessageAcked:23, FileSearch:26, SetStatus:28, ServerPing:32, SendSpeed:34, SharedFoldersFiles:35, GetUserStats:36, QueuedDownloads:40, Relogged:41, UserSearch:42, AddThingILike:51, RemoveThingILike:52, Recommendations:54, GlobalRecommendations:56, UserInterests:57, PlaceInLineResponse:60, # Depreciated? RoomAdded:62, RoomRemoved:63, RoomList:64, ExactFileSearch:65, AdminMessage:66, GlobalUserList:67, # Depreciated? TunneledMessage:68, # Depreciated? PrivilegedUsers:69, HaveNoParent:71, SearchParent:73, ParentMinSpeed:83, ParentSpeedRatio:84, Msg85:85, ParentInactivityTimeout:86, SearchInactivityTimeout:87, MinParentsInCache:88, Msg89:89, DistribAliveInterval:90, AddToPrivileged:91, CheckPrivileges:92, SearchRequest:93, AcceptChildren:100, NetInfo:102, WishlistSearch:103, WishlistInterval:104, SimilarUsers:110, ItemRecommendations:111, ItemSimilarUsers:112, RoomTickerState:113, RoomTickerAdd:114, RoomTickerRemove:115, RoomTickerSet:116, AddThingIHate:117, RemoveThingIHate:118, RoomSearch:120, SendUploadSpeed:121, UserPrivileged:122, GivePrivileges:123, NotifyPrivileges:124, AckNotifyPrivileges:125, BranchLevel:126, BranchRoot:127, ChildDepth:129, #AnotherStatus:10, PrivateRoomUsers:133, PrivateRoomAddUser:134, PrivateRoomRemoveUser:135, PrivateRoomDismember:136, PrivateRoomDisown:137, PrivateRoomSomething:138, PrivateRoomAdded:139, PrivateRoomRemoved:140, PrivateRoomToggle:141, ChangePassword:142, PrivateRoomAddOperator:143, PrivateRoomRemoveOperator:144, PrivateRoomOperatorAdded:145, PrivateRoomOperatorRemoved:146, PrivateRoomOwned:148, JoinPublicRoom:150, LeavePublicRoom:151, PublicRoomMessage:152, CantConnectToPeer:1001, } peercodes = { GetSharedFileList:4, SharedFileList:5, FileSearchRequest:8, FileSearchResult:9, UserInfoRequest:15, UserInfoReply:16, PMessageUser:22, FolderContentsRequest:36, FolderContentsResponse:37, TransferRequest:40, TransferResponse:41, PlaceholdUpload:42, QueueUpload:43, PlaceInQueue:44, UploadFailed:46, QueueFailed:50, PlaceInQueueRequest:51, UploadQueueNotification:52, Msg12547:12547 } distribclasses = {0:DistribAlive, 3:DistribSearch, 4:DistribBranchLevel, 5: DistribBranchRoot, 7: DistribChildDepth, 9:DistribMessage9} IN_PROGRESS_STALE_AFTER = 30 # The value of 30 was pulled out of thin air. When we let the OS handle this: # - Linux seems okay, stale in progress conns get killed after a minute or two # - With Windows, based on #473, it would seem these connections are never removed CONNECTION_MAX_IDLE = 60 def __init__(self, ui_callback, queue, bindip, config, eventprocessor): """ ui_callback is a UI callback function to be called with messages list as a parameter. queue is Queue object that holds messages from UI thread. """ threading.Thread.__init__(self) self._ui_callback = ui_callback self._queue = queue self._want_abort = 0 self._stopped = 0 self._bindip = bindip self._config = config self._eventprocessor = eventprocessor portrange = config.sections["server"]["portrange"] self.serverclasses = {} for i in self.servercodes.keys(): self.serverclasses[self.servercodes[i]] = i self.peerclasses = {} for i in self.peercodes.keys(): self.peerclasses[self.peercodes[i]] = i self._p = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._p.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._conns = {} self._connsinprogress = {} self._uploadlimit = (self._calcLimitNone, 0) self._downloadlimit = (self._calcDLimitByTotal, self._config.sections["transfers"]["downloadlimit"]) self._limits = {} self._dlimits = {} # GeoIP Config self._geoip = None # GeoIP Module self.geoip = self._eventprocessor.geoip listenport = None self.lastsocketwarning = 0 for listenport in range(portrange[0], portrange[1]+1): try: self._p.bind(('', listenport)) except socket.error: listenport = None else: self._p.listen(1) self._ui_callback([IncPort(listenport)]) break if listenport is not None: self.start() else: short = _("Could not bind to a local port, aborting connection") long = _("The range you specified for client connection ports was %(low)s-%(high)s, but none of these were usable. Increase and/or move the range and restart Nicotine+.") % {'low':portrange[0], 'high':portrange[1]} if portrange[0] < 1024: long += "\n\n" + _("Note that part of your range lies below 1024, this is usually not allowed on most operating systems with the exception of Windows.") self._ui_callback([PopupMessage(short, long)]) def _isUpload(self, conn): return conn.__class__ is PeerConnection and conn.fileupl is not None def _isDownload(self, conn): return conn.__class__ is PeerConnection and conn.filedown is not None def _calcUploadSpeed(self, i): curtime = time.time() if i.starttime is None: i.starttime = curtime elapsed = curtime - i.starttime if elapsed == 0: return 0 else: return i.sentbytes2 / elapsed def _calcLimitByTransfer(self, conns, i): max = self._uploadlimit[1] * 1024.0 limit = max - self._calcUploadSpeed(i) + 1024 if limit < 1024.0: return long(0) return long(limit) def _calcLimitByTotal(self, conns, i): max = self._uploadlimit[1] * 1024.0 bw = 0.0 for j in conns.values(): if self._isUpload(j): bw += self._calcUploadSpeed(j) limit = max - bw + 1024 if limit < 1024.0: return long(0) return long(limit) def _calcDownloadSpeed(self, i): curtime = time.time() if i.starttime is None: i.starttime = curtime elapsed = curtime - i.starttime if elapsed == 0: return 0 else: return i.readbytes2 / elapsed def _calcDLimitByTotal(self, conns, i): max = self._downloadlimit[1] * 1024.0 bw = 0.0 for j in conns: if self._isDownload(j): bw += self._calcDownloadSpeed(j) limit = max - bw + 1023 if limit < 1024.0: return long(0) return long(limit) def _calcLimitNone(self, conns, i): return None def run(self): """ Actual networking loop is here.""" # @var p Peer / Listen Port p = self._p # @var s Server Port self._server_socket = server_socket = None conns = self._conns connsinprogress = self._connsinprogress queue = self._queue while not self._want_abort: if not queue.empty(): conns, connsinprogress, server_socket = self.process_queue(queue, conns, connsinprogress, server_socket) self._server_socket = server_socket for i in conns.keys()[:]: if conns[i].__class__ is ServerConnection and i is not server_socket: del conns[i] outsocks = [i for i in conns.keys() if len(conns[i].obuf) > 0 or (i is not server_socket and conns[i].fileupl is not None and conns[i].fileupl.offset is not None)] outsock = [] self._limits = {} self._dlimits = {} for i in outsocks: if self._isUpload(conns[i]): limit = self._uploadlimit[0](conns, conns[i]) if limit is None or limit > 0: self._limits[i] = limit outsock.append(i) else: outsock.append(i) try: # Select Networking Input and Output sockets if sys.platform == "win32": input, output, exc = multiselect(conns.keys() + connsinprogress.keys() + [p], connsinprogress.keys() + outsock, [], 0.5) else: input, output, exc = select.select(conns.keys() + connsinprogress.keys() + [p], connsinprogress.keys() + outsock, [], 0.5) numsockets = 0 if p is not None: numsockets += 1 numsockets += len(conns) + len(connsinprogress) self._ui_callback([InternalData(numsockets)]) #print "Sockets open: %s = %s + %s + %s (+1)" % (len(conns.keys()+connsinprogress.keys()+[p]+outsock), len(conns.keys()), len(connsinprogress.keys()), len(outsock)) except select.error, error: if len(error.args) == 2 and error.args[0] == EINTR: # Error recieved; but we don't care :) continue # Error recieved; terminate networking loop print time.strftime("%H:%M:%S"), "select.error", error self._want_abort = 1 message = _("Major Socket Error: Networking terminated! %s" % str(error)) log.addwarning(message) except ValueError, error: # Possibly opened too many sockets print time.strftime("%H:%M:%S"), "select ValueError:", error continue # Write Output for (key, value) in conns.iteritems(): if key in output: try: self.writeData(server_socket, conns, key) except socket.error, err: self._ui_callback([ConnectError(value, err)]) # Listen / Peer Port if p in input[:]: try: incconn, incaddr = p.accept() except: time.sleep(0.01) else: ip, port = self.getIpPort(incaddr) if self.ipBlocked(ip): message = _("Ignoring connection request from blocked IP Address %s:%s" %( ip, port)) log.add(message, 3) else: conns[incconn] = PeerConnection(incconn, incaddr, "", "") self._ui_callback([IncConn(incconn, incaddr)]) # Manage Connections curtime = time.time() for connection_in_progress in connsinprogress.keys()[:]: if (curtime - connsinprogress[connection_in_progress].lastactive) > self.IN_PROGRESS_STALE_AFTER: connection_in_progress.close() del connsinprogress[connection_in_progress] continue try: msgObj = connsinprogress[connection_in_progress].msgObj if connection_in_progress in input: connection_in_progress.recv(0) except socket.error, err: self._ui_callback([ConnectError(msgObj, err)]) connection_in_progress.close() del connsinprogress[connection_in_progress] else: if connection_in_progress in output: if connection_in_progress is server_socket: conns[server_socket] = ServerConnection(server_socket, msgObj.addr, "","") self._ui_callback([ServerConn(server_socket, msgObj.addr)]) else: ip, port = self.getIpPort(msgObj.addr) if self.ipBlocked(ip): message = "Blocking peer connection in progress to IP: %(ip)s Port: %(port)s" % { "ip":ip, "port":port} log.add(message, 3) connection_in_progress.close() else: conns[connection_in_progress] = PeerConnection(connection_in_progress, msgObj.addr, "", "", msgObj.init) self._ui_callback([OutConn(connection_in_progress, msgObj.addr)]) del connsinprogress[connection_in_progress] # Process Data for connection in conns.keys()[:]: ip, port = self.getIpPort(conns[connection].addr) if self.ipBlocked(ip) and connection is not self._server_socket: message = "Blocking peer connection to IP: %(ip)s Port: %(port)s" % { "ip":ip, "port":port} log.add(message, 3) connection.close() del conns[connection] continue if connection in input: if self._isDownload(conns[connection]): limit = self._downloadlimit[0](conns, connection) if limit is None or limit > 0: self._dlimits[connection] = limit #if connection in self._dlimits: #Todo: fix this Ugly download limit hack (sleep) #time.sleep(1.0) try: self.readData(conns, connection) except socket.error, err: self._ui_callback([ConnectError(conns[connection], err)]) else: try: self.readData(conns, connection) except socket.error, err: self._ui_callback([ConnectError(conns[connection], err)]) if connection in conns and len(conns[connection].ibuf) > 0: if connection is server_socket: msgs, conns[server_socket].ibuf = self.process_server_input(conns[server_socket].ibuf) self._ui_callback(msgs) else: if conns[connection].init is None or conns[connection].init.type not in ['F', 'D']: msgs, conns[connection] = self.process_peer_input(conns[connection], conns[connection].ibuf) self._ui_callback(msgs) if conns[connection].init is not None and conns[connection].init.type == 'F': msgs, conns[connection] = self.process_file_input(conns[connection], conns[connection].ibuf) self._ui_callback(msgs) if conns[connection].init is not None and conns[connection].init.type == 'D': msgs, conns[connection] = self.process_distrib_input(conns[connection], conns[connection].ibuf) self._ui_callback(msgs) if conns[connection].conn == None: del conns[connection] # --------------------------- # Server Pings used to get us banned # --------------------------- # Timeout Connections curtime = time.time() connsockets = len(conns.keys()) for connection in conns.keys()[:]: if connection is not server_socket and connection is not p: if curtime - conns[connection].lastactive > self.CONNECTION_MAX_IDLE: self._ui_callback([ConnClose(connection, conns[connection].addr)]) connection.close() #print "Closed_run", conns[i].addr del conns[connection] # Was 30 seconds if server_socket in conns: if curtime - conns[server_socket].lastping > 120: conns[server_socket].lastping = curtime queue.put(ServerPing()) self._ui_callback([]) if self._downloadlimit[1] and self._downloadlimit[1] > 0: time.sleep(0.1) # Close Server Port if server_socket is not None: server_socket.close() #print "Networking thread aborted" self._stopped = 1 def socketStillActive(self, conn): try: connection = self._conns[conn] except KeyError: return False return (len(connection.obuf) > 0 or len(connection.ibuf) > 0) def ipBlocked(self, address): if address is None: return True ips = self._config.sections["server"]["ipblocklist"] s_address = address.split(".") for ip in ips: # No Wildcard in IP if "*" not in ip: if address == ip: return True continue # Wildcard in IP parts = ip.split(".") seg = 0 for part in parts: # Stop if there's no wildcard or matching string number if part not in ( s_address[seg], "*"): break seg += 1 # Last time around if seg == 4: # Wildcard blocked return True # Not blocked return False def getIpPort(self, address): ip = port = None if type(address) is tuple: ip, port = address return ip, port def writeData(self, server_socket, conns, i): if i in self._limits: limit = self._limits[i] else: limit = None conns[i].lastactive = time.time() i.setblocking(0) if limit is None: bytes_send = i.send(conns[i].obuf) else: bytes_send = i.send(conns[i].obuf[:limit]) i.setblocking(1) conns[i].obuf = conns[i].obuf[bytes_send:] if i is not server_socket: if conns[i].fileupl is not None and conns[i].fileupl.offset is not None: conns[i].fileupl.sentbytes += bytes_send conns[i].sentbytes2 += bytes_send try: if conns[i].fileupl.offset + conns[i].fileupl.sentbytes + len(conns[i].obuf) < conns[i].fileupl.size: bytestoread = bytes_send*2-len(conns[i].obuf)+10*1024 if bytestoread > 0: read = conns[i].fileupl.file.read(bytestoread) conns[i].obuf = conns[i].obuf + read except IOError, strerror: self._ui_callback([FileError(conns[i], conns[i].fileupl.file, strerror)]) except ValueError: pass if bytes_send > 0: self._ui_callback([conns[i].fileupl]) def readData(self, conns, i): # Check for a download limit if i in self._dlimits: limit = self._dlimits[i] else: limit = None conns[i].lastactive = time.time() if limit is None: # Unlimited download data data = i.recv(conns[i].lastreadlength) conns[i].ibuf = conns[i].ibuf + data if len(data) >= conns[i].lastreadlength//2: conns[i].lastreadlength = conns[i].lastreadlength * 2 else: # Speed Limited Download data (transfers) data = i.recv(conns[i].lastreadlength ) conns[i].ibuf += data conns[i].lastreadlength = limit conns[i].readbytes2 += len(data) if not data: self._ui_callback([ConnClose(i, conns[i].addr)]) i.close() #print "Closed", conns[i].addr del conns[i] def process_server_input(self, msgBuffer): """ Server has sent us something, this function retrieves messages from the msgBuffer, creates message objects and returns them and the rest of the msgBuffer. """ msgs = [] # Server messages are 8 bytes or greater in length while len(msgBuffer) >= 8: msgsize, msgtype = struct.unpack(" len(msgBuffer): break elif msgtype in self.serverclasses: msg = self.serverclasses[msgtype]() msg.parseNetworkMessage(msgBuffer[8:msgsize+4]) msgs.append(msg) else: msgs.append(_("Server message type %(type)i size %(size)i contents %(msgBuffer)s unknown") %{'type':msgtype, 'size':msgsize-4, 'msgBuffer':msgBuffer[8:msgsize+4].__repr__()}) msgBuffer = msgBuffer[msgsize+4:] return msgs, msgBuffer def parseFileReq(self, conn, msgBuffer): msg = None # File Request messages are 4 bytes or greater in length if len(msgBuffer) >= 4: reqnum = struct.unpack("= 8: offset = struct.unpack(" 0: try: conn.filedown.file.write(msgBuffer[:leftbytes]) except IOError, strerror: self._ui_callback([FileError(conn, conn.filedown.file, strerror)]) except ValueError: pass self._ui_callback([DownloadFile(conn.conn, len(msgBuffer[:leftbytes]), conn.filedown.file)]) conn.filereadbytes = conn.filereadbytes + len(msgBuffer[:leftbytes]) msgBuffer = msgBuffer[leftbytes:] elif conn.fileupl is not None: if conn.fileupl.offset is None: offset, msgBuffer = self.parseOffset(conn, msgBuffer) if offset is not None: try: conn.fileupl.file.seek(offset) except IOError, strerror: self._ui_callback([FileError(conn, conn.fileupl.file, strerror)]) except ValueError: pass conn.fileupl.offset = offset self._ui_callback([conn.fileupl]) conn.ibuf = msgBuffer return msgs, conn def process_peer_input(self, conn, msgBuffer): """ We have a "P" connection (p2p exchange) , peer has sent us something, this function retrieves messages from the msgBuffer, creates message objects and returns them and the rest of the msgBuffer. """ msgs = [] while (conn.init is None or conn.init.type not in ['F', 'D']) and len(msgBuffer) >= 8: msgsize = struct.unpack("= 8: msgtype = struct.unpack(" len(msgBuffer): break elif conn.init is None: # Unpack Peer Connections if msgBuffer[4] == chr(0): msg = PierceFireWall(conn) try: msg.parseNetworkMessage(msgBuffer[5:msgsize+4]) except Exception, error: print error else: conn.piercefw = msg msgs.append(msg) elif msgBuffer[4] == chr(1): msg = PeerInit(conn) try: msg.parseNetworkMessage(msgBuffer[5:msgsize+4]) except Exception, error: print error else: conn.init = msg msgs.append(msg) elif conn.piercefw is None: msgs.append(_("Unknown peer init code: %(type)i, message contents %(msgBuffer)s") %{'type':ord(msgBuffer[4]), 'msgBuffer':msgBuffer[5:msgsize+4].__repr__()}) conn.conn.close() self._ui_callback([ConnClose(conn.conn, conn.addr)]) conn.conn = None break else: break elif conn.init.type == 'P': # Unpack Peer Messages msgtype = struct.unpack("= 0: msgBuffer = msgBuffer[msgsize+4:] else: msgBuffer = "" conn.ibuf = msgBuffer return msgs, conn def process_distrib_input(self, conn, msgBuffer): """ We have a distributed network connection, parent has sent us something, this function retrieves messages from the msgBuffer, creates message objects and returns them and the rest of the msgBuffer. """ msgs = [] while len(msgBuffer) >= 5: msgsize = struct.unpack(" len(msgBuffer): break msgtype = ord(msgBuffer[4]) if msgtype in self.distribclasses: msg = self.distribclasses[msgtype](conn) msg.parseNetworkMessage(msgBuffer[5:msgsize+4]) msgs.append(msg) else: msgs.append(_("Distrib message type %(type)i size %(size)i contents %(msgBuffer)s unknown") %{'type':msgtype, 'size':msgsize-1, 'msgBuffer':msgBuffer[5:msgsize+4].__repr__() } ) conn.conn.close() self._ui_callback([ConnClose(conn.conn, conn.addr)]) conn.conn = None break if msgsize >= 0: msgBuffer = msgBuffer[msgsize+4:] else: msgBuffer = "" conn.ibuf = msgBuffer return msgs, conn def _resetCounters(self, conns): curtime = time.time() for i in conns.values(): if self._isUpload(i): i.starttime = curtime i.sentbytes2 = 0 if self._isDownload(i): i.starttime = curtime i.sentbytes2 = 0 def process_queue(self, queue, conns, connsinprogress, server_socket, maxsockets=MAXFILELIMIT): """ Processes messages sent by UI thread. server_socket is a server connection socket object, queue holds the messages, conns and connsinprogress are dictionaries holding Connection and PeerConnectionInProgress messages.""" msgList = [] needsleep = False numsockets = 0 if server_socket is not None: numsockets += 1 numsockets += len(conns) + len(connsinprogress) while not queue.empty(): msgList.append(queue.get()) for msgObj in msgList: if issubclass(msgObj.__class__, ServerMessage): try: msg = msgObj.makeNetworkMessage() if server_socket in conns: conns[server_socket].obuf = conns[server_socket].obuf + struct.pack("= 0: checkuser = 0 if checkuser: msg = msgObj.makeNetworkMessage() conns[msgObj.conn].obuf += struct.pack(" 60: self.lastsocketwarning = time.time() log.addwarning(_("You have just hit your connection limit of %(limit)s. Nicotine+ will drop connections for your protection. If you get this message often you should search for less generic terms, or increase your per-process file descriptor limit.") % {'limit':maxsockets}) if needsleep: time.sleep(1) return conns, connsinprogress, server_socket def abort(self): """ Call this to abort the thread""" self._want_abort = 1 def stopped(self): """ returns true if thread has stopped """ return self._stopped