# Copyright (C) 2007 daelstorm. All rights reserved.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
#
# Previous copyright below
# Copyright (c) 2003-2004 Hyriand. All rights reserved.
#
# Based on code from PySoulSeek, original copyright note:
# Copyright (c) 2001-2003 Alexander Kanavin. All rights reserved.
"""
This module implements SoulSeek networking protocol.
"""
from __future__ import division
from math import floor
from slskmessages import *
import SocketServer
import socket
import random, sys, time
if sys.platform == "win32":
from multiselect import multiselect
import select
import threading
import struct
from errno import EINTR
from utils import _, win32
from logfacility import log
MAXFILELIMIT = -1
if win32:
import ctypes
ctypes.cdll.msvcrt._setmaxstdio(2048)
MAXFILELIMIT = ctypes.cdll.msvcrt._getmaxstdio()
else:
try:
import resource
try:
(soft, MAXFILELIMIT) = resource.getrlimit(resource.RLIMIT_NOFILE)
except AttributeError:
pass
except ImportError:
pass
# OSX reports INFINITE as hard limit, but supports up to 10240
# Solaris supposedly reports 65535 and actually supports this
# Linux usually reports 1024 and supports this.
if MAXFILELIMIT > 65535:
MAXFILELIMIT = 2048
if MAXFILELIMIT > 0:
# Bumping soft limit
try:
resource.setrlimit(resource.RLIMIT_NOFILE, (MAXFILELIMIT, MAXFILELIMIT))
except:
pass
# Since most people have a limit of 1024 or higher we can
# set it to 90% of the max limit and still get a workable amount of
# connections. We cannot set it to 100% because our connection count
# doesn't agree with with the OS connection count (at least on Linux),
# maybe because closed connections aren't closed on the spot.
MAXFILELIMIT = max(int(floor(MAXFILELIMIT*0.9)), 100)
class Connection:
"""
Holds data about a connection. conn is a socket object,
addr is (ip, port) pair, ibuf and obuf are input and output msgBuffer,
init is a PeerInit object (see slskmessages docstrings).
"""
def __init__(self, conn = None, addr = None, ibuf = "", obuf = ""):
self.conn = conn
self.addr = addr
self.ibuf = ibuf
self.obuf = obuf
self.init = None
self.lastreadlength = 100*1024
class ServerConnection(Connection):
"""
Server socket
"""
def __init__(self, conn = None, addr = None, ibuf = "", obuf = ""):
Connection.__init__(self, conn, addr, ibuf, obuf)
self.lastping = time.time()
class PeerConnection(Connection):
def __init__(self, conn = None, addr = None, ibuf = "", obuf = "", init = None):
Connection.__init__(self, conn, addr, ibuf, obuf)
self.filereq = None
self.filedown = None
self.fileupl = None
self.filereadbytes = 0
self.bytestoread = 0
self.init = init
self.piercefw = None
self.lastactive = time.time()
self.starttime = None # Used for upload bandwidth management
self.sentbytes2 = 0
self.readbytes2 = 0
class PeerConnectionInProgress:
""" As all p2p connect()s are non-blocking, this class is used to
hold data about a connection that is not yet established. msgObj is
a message to be sent after the connection has been established.
"""
def __init__(self, conn = None, msgObj = None):
self.conn = conn
self.msgObj = msgObj
self.lastactive = time.time()
class SlskProtoThread(threading.Thread):
""" This is a netwroking thread that actually does all the communication.
It sends data to the UI thread via a callback function and receives data
via a Queue object.
"""
""" Server and peers send each other small binary messages, that start
with length and message code followed by the actual messsage data.
These are the codes."""
servercodes = {
Login:1,
SetWaitPort:2,
GetPeerAddress:3,
AddUser:5,
Unknown6:6,
GetUserStatus:7,
SayChatroom:13,
JoinRoom:14,
LeaveRoom:15,
UserJoinedRoom:16,
UserLeftRoom:17,
ConnectToPeer:18,
MessageUser:22,
MessageAcked:23,
FileSearch:26,
SetStatus:28,
ServerPing:32,
SendSpeed:34,
SharedFoldersFiles:35,
GetUserStats:36,
QueuedDownloads:40,
Relogged:41,
UserSearch:42,
AddThingILike:51,
RemoveThingILike:52,
Recommendations:54,
GlobalRecommendations:56,
UserInterests:57,
PlaceInLineResponse:60, # Depreciated?
RoomAdded:62,
RoomRemoved:63,
RoomList:64,
ExactFileSearch:65,
AdminMessage:66,
GlobalUserList:67, # Depreciated?
TunneledMessage:68, # Depreciated?
PrivilegedUsers:69,
HaveNoParent:71,
SearchParent:73,
ParentMinSpeed:83,
ParentSpeedRatio:84,
Msg85:85,
ParentInactivityTimeout:86,
SearchInactivityTimeout:87,
MinParentsInCache:88,
Msg89:89,
DistribAliveInterval:90,
AddToPrivileged:91,
CheckPrivileges:92,
SearchRequest:93,
AcceptChildren:100,
NetInfo:102,
WishlistSearch:103,
WishlistInterval:104,
SimilarUsers:110,
ItemRecommendations:111,
ItemSimilarUsers:112,
RoomTickerState:113,
RoomTickerAdd:114,
RoomTickerRemove:115,
RoomTickerSet:116,
AddThingIHate:117,
RemoveThingIHate:118,
RoomSearch:120,
SendUploadSpeed:121,
UserPrivileged:122,
GivePrivileges:123,
NotifyPrivileges:124,
AckNotifyPrivileges:125,
BranchLevel:126,
BranchRoot:127,
ChildDepth:129,
#AnotherStatus:10,
PrivateRoomUsers:133,
PrivateRoomAddUser:134,
PrivateRoomRemoveUser:135,
PrivateRoomDismember:136,
PrivateRoomDisown:137,
PrivateRoomSomething:138,
PrivateRoomAdded:139,
PrivateRoomRemoved:140,
PrivateRoomToggle:141,
ChangePassword:142,
PrivateRoomAddOperator:143,
PrivateRoomRemoveOperator:144,
PrivateRoomOperatorAdded:145,
PrivateRoomOperatorRemoved:146,
PrivateRoomOwned:148,
JoinPublicRoom:150,
LeavePublicRoom:151,
PublicRoomMessage:152,
CantConnectToPeer:1001,
}
peercodes = {
GetSharedFileList:4,
SharedFileList:5,
FileSearchRequest:8,
FileSearchResult:9,
UserInfoRequest:15,
UserInfoReply:16,
PMessageUser:22,
FolderContentsRequest:36,
FolderContentsResponse:37,
TransferRequest:40,
TransferResponse:41,
PlaceholdUpload:42,
QueueUpload:43,
PlaceInQueue:44,
UploadFailed:46,
QueueFailed:50,
PlaceInQueueRequest:51,
UploadQueueNotification:52,
Msg12547:12547
}
distribclasses = {0:DistribAlive, 3:DistribSearch, 4:DistribBranchLevel, 5: DistribBranchRoot, 7: DistribChildDepth, 9:DistribMessage9}
IN_PROGRESS_STALE_AFTER = 30
# The value of 30 was pulled out of thin air. When we let the OS handle this:
# - Linux seems okay, stale in progress conns get killed after a minute or two
# - With Windows, based on #473, it would seem these connections are never removed
CONNECTION_MAX_IDLE = 60
def __init__(self, ui_callback, queue, bindip, config, eventprocessor):
""" ui_callback is a UI callback function to be called with messages
list as a parameter. queue is Queue object that holds messages from UI
thread.
"""
threading.Thread.__init__(self)
self._ui_callback = ui_callback
self._queue = queue
self._want_abort = 0
self._stopped = 0
self._bindip = bindip
self._config = config
self._eventprocessor = eventprocessor
portrange = config.sections["server"]["portrange"]
self.serverclasses = {}
for i in self.servercodes.keys():
self.serverclasses[self.servercodes[i]] = i
self.peerclasses = {}
for i in self.peercodes.keys():
self.peerclasses[self.peercodes[i]] = i
self._p = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._p.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._conns = {}
self._connsinprogress = {}
self._uploadlimit = (self._calcLimitNone, 0)
self._downloadlimit = (self._calcDLimitByTotal, self._config.sections["transfers"]["downloadlimit"])
self._limits = {}
self._dlimits = {}
# GeoIP Config
self._geoip = None
# GeoIP Module
self.geoip = self._eventprocessor.geoip
listenport = None
self.lastsocketwarning = 0
for listenport in range(portrange[0], portrange[1]+1):
try:
self._p.bind(('', listenport))
except socket.error:
listenport = None
else:
self._p.listen(1)
self._ui_callback([IncPort(listenport)])
break
if listenport is not None:
self.start()
else:
short = _("Could not bind to a local port, aborting connection")
long = _("The range you specified for client connection ports was %(low)s-%(high)s, but none of these were usable. Increase and/or move the range and restart Nicotine+.") % {'low':portrange[0], 'high':portrange[1]}
if portrange[0] < 1024:
long += "\n\n" + _("Note that part of your range lies below 1024, this is usually not allowed on most operating systems with the exception of Windows.")
self._ui_callback([PopupMessage(short, long)])
def _isUpload(self, conn):
return conn.__class__ is PeerConnection and conn.fileupl is not None
def _isDownload(self, conn):
return conn.__class__ is PeerConnection and conn.filedown is not None
def _calcUploadSpeed(self, i):
curtime = time.time()
if i.starttime is None:
i.starttime = curtime
elapsed = curtime - i.starttime
if elapsed == 0:
return 0
else:
return i.sentbytes2 / elapsed
def _calcLimitByTransfer(self, conns, i):
max = self._uploadlimit[1] * 1024.0
limit = max - self._calcUploadSpeed(i) + 1024
if limit < 1024.0:
return long(0)
return long(limit)
def _calcLimitByTotal(self, conns, i):
max = self._uploadlimit[1] * 1024.0
bw = 0.0
for j in conns.values():
if self._isUpload(j):
bw += self._calcUploadSpeed(j)
limit = max - bw + 1024
if limit < 1024.0:
return long(0)
return long(limit)
def _calcDownloadSpeed(self, i):
curtime = time.time()
if i.starttime is None:
i.starttime = curtime
elapsed = curtime - i.starttime
if elapsed == 0:
return 0
else:
return i.readbytes2 / elapsed
def _calcDLimitByTotal(self, conns, i):
max = self._downloadlimit[1] * 1024.0
bw = 0.0
for j in conns:
if self._isDownload(j):
bw += self._calcDownloadSpeed(j)
limit = max - bw + 1023
if limit < 1024.0:
return long(0)
return long(limit)
def _calcLimitNone(self, conns, i):
return None
def run(self):
""" Actual networking loop is here."""
# @var p Peer / Listen Port
p = self._p
# @var s Server Port
self._server_socket = server_socket = None
conns = self._conns
connsinprogress = self._connsinprogress
queue = self._queue
while not self._want_abort:
if not queue.empty():
conns, connsinprogress, server_socket = self.process_queue(queue, conns, connsinprogress, server_socket)
self._server_socket = server_socket
for i in conns.keys()[:]:
if conns[i].__class__ is ServerConnection and i is not server_socket:
del conns[i]
outsocks = [i for i in conns.keys() if len(conns[i].obuf) > 0 or (i is not server_socket and conns[i].fileupl is not None and conns[i].fileupl.offset is not None)]
outsock = []
self._limits = {}
self._dlimits = {}
for i in outsocks:
if self._isUpload(conns[i]):
limit = self._uploadlimit[0](conns, conns[i])
if limit is None or limit > 0:
self._limits[i] = limit
outsock.append(i)
else:
outsock.append(i)
try:
# Select Networking Input and Output sockets
if sys.platform == "win32":
input, output, exc = multiselect(conns.keys() + connsinprogress.keys() + [p], connsinprogress.keys() + outsock, [], 0.5)
else:
input, output, exc = select.select(conns.keys() + connsinprogress.keys() + [p], connsinprogress.keys() + outsock, [], 0.5)
numsockets = 0
if p is not None:
numsockets += 1
numsockets += len(conns) + len(connsinprogress)
self._ui_callback([InternalData(numsockets)])
#print "Sockets open: %s = %s + %s + %s (+1)" % (len(conns.keys()+connsinprogress.keys()+[p]+outsock), len(conns.keys()), len(connsinprogress.keys()), len(outsock))
except select.error, error:
if len(error.args) == 2 and error.args[0] == EINTR:
# Error recieved; but we don't care :)
continue
# Error recieved; terminate networking loop
print time.strftime("%H:%M:%S"), "select.error", error
self._want_abort = 1
message = _("Major Socket Error: Networking terminated! %s" % str(error))
log.addwarning(message)
except ValueError, error:
# Possibly opened too many sockets
print time.strftime("%H:%M:%S"), "select ValueError:", error
continue
# Write Output
for (key, value) in conns.iteritems():
if key in output:
try:
self.writeData(server_socket, conns, key)
except socket.error, err:
self._ui_callback([ConnectError(value, err)])
# Listen / Peer Port
if p in input[:]:
try:
incconn, incaddr = p.accept()
except:
time.sleep(0.01)
else:
ip, port = self.getIpPort(incaddr)
if self.ipBlocked(ip):
message = _("Ignoring connection request from blocked IP Address %s:%s" %( ip, port))
log.add(message, 3)
else:
conns[incconn] = PeerConnection(incconn, incaddr, "", "")
self._ui_callback([IncConn(incconn, incaddr)])
# Manage Connections
curtime = time.time()
for connection_in_progress in connsinprogress.keys()[:]:
if (curtime - connsinprogress[connection_in_progress].lastactive) > self.IN_PROGRESS_STALE_AFTER:
connection_in_progress.close()
del connsinprogress[connection_in_progress]
continue
try:
msgObj = connsinprogress[connection_in_progress].msgObj
if connection_in_progress in input:
connection_in_progress.recv(0)
except socket.error, err:
self._ui_callback([ConnectError(msgObj, err)])
connection_in_progress.close()
del connsinprogress[connection_in_progress]
else:
if connection_in_progress in output:
if connection_in_progress is server_socket:
conns[server_socket] = ServerConnection(server_socket, msgObj.addr, "","")
self._ui_callback([ServerConn(server_socket, msgObj.addr)])
else:
ip, port = self.getIpPort(msgObj.addr)
if self.ipBlocked(ip):
message = "Blocking peer connection in progress to IP: %(ip)s Port: %(port)s" % { "ip":ip, "port":port}
log.add(message, 3)
connection_in_progress.close()
else:
conns[connection_in_progress] = PeerConnection(connection_in_progress, msgObj.addr, "", "", msgObj.init)
self._ui_callback([OutConn(connection_in_progress, msgObj.addr)])
del connsinprogress[connection_in_progress]
# Process Data
for connection in conns.keys()[:]:
ip, port = self.getIpPort(conns[connection].addr)
if self.ipBlocked(ip) and connection is not self._server_socket:
message = "Blocking peer connection to IP: %(ip)s Port: %(port)s" % { "ip":ip, "port":port}
log.add(message, 3)
connection.close()
del conns[connection]
continue
if connection in input:
if self._isDownload(conns[connection]):
limit = self._downloadlimit[0](conns, connection)
if limit is None or limit > 0:
self._dlimits[connection] = limit
#if connection in self._dlimits:
#Todo: fix this Ugly download limit hack (sleep)
#time.sleep(1.0)
try:
self.readData(conns, connection)
except socket.error, err:
self._ui_callback([ConnectError(conns[connection], err)])
else:
try:
self.readData(conns, connection)
except socket.error, err:
self._ui_callback([ConnectError(conns[connection], err)])
if connection in conns and len(conns[connection].ibuf) > 0:
if connection is server_socket:
msgs, conns[server_socket].ibuf = self.process_server_input(conns[server_socket].ibuf)
self._ui_callback(msgs)
else:
if conns[connection].init is None or conns[connection].init.type not in ['F', 'D']:
msgs, conns[connection] = self.process_peer_input(conns[connection], conns[connection].ibuf)
self._ui_callback(msgs)
if conns[connection].init is not None and conns[connection].init.type == 'F':
msgs, conns[connection] = self.process_file_input(conns[connection], conns[connection].ibuf)
self._ui_callback(msgs)
if conns[connection].init is not None and conns[connection].init.type == 'D':
msgs, conns[connection] = self.process_distrib_input(conns[connection], conns[connection].ibuf)
self._ui_callback(msgs)
if conns[connection].conn == None:
del conns[connection]
# ---------------------------
# Server Pings used to get us banned
# ---------------------------
# Timeout Connections
curtime = time.time()
connsockets = len(conns.keys())
for connection in conns.keys()[:]:
if connection is not server_socket and connection is not p:
if curtime - conns[connection].lastactive > self.CONNECTION_MAX_IDLE:
self._ui_callback([ConnClose(connection, conns[connection].addr)])
connection.close()
#print "Closed_run", conns[i].addr
del conns[connection]
# Was 30 seconds
if server_socket in conns:
if curtime - conns[server_socket].lastping > 120:
conns[server_socket].lastping = curtime
queue.put(ServerPing())
self._ui_callback([])
if self._downloadlimit[1] and self._downloadlimit[1] > 0:
time.sleep(0.1)
# Close Server Port
if server_socket is not None:
server_socket.close()
#print "Networking thread aborted"
self._stopped = 1
def socketStillActive(self, conn):
try:
connection = self._conns[conn]
except KeyError:
return False
return (len(connection.obuf) > 0 or len(connection.ibuf) > 0)
def ipBlocked(self, address):
if address is None:
return True
ips = self._config.sections["server"]["ipblocklist"]
s_address = address.split(".")
for ip in ips:
# No Wildcard in IP
if "*" not in ip:
if address == ip:
return True
continue
# Wildcard in IP
parts = ip.split(".")
seg = 0
for part in parts:
# Stop if there's no wildcard or matching string number
if part not in ( s_address[seg], "*"):
break
seg += 1
# Last time around
if seg == 4:
# Wildcard blocked
return True
# Not blocked
return False
def getIpPort(self, address):
ip = port = None
if type(address) is tuple:
ip, port = address
return ip, port
def writeData(self, server_socket, conns, i):
if i in self._limits:
limit = self._limits[i]
else:
limit = None
conns[i].lastactive = time.time()
i.setblocking(0)
if limit is None:
bytes_send = i.send(conns[i].obuf)
else:
bytes_send = i.send(conns[i].obuf[:limit])
i.setblocking(1)
conns[i].obuf = conns[i].obuf[bytes_send:]
if i is not server_socket:
if conns[i].fileupl is not None and conns[i].fileupl.offset is not None:
conns[i].fileupl.sentbytes += bytes_send
conns[i].sentbytes2 += bytes_send
try:
if conns[i].fileupl.offset + conns[i].fileupl.sentbytes + len(conns[i].obuf) < conns[i].fileupl.size:
bytestoread = bytes_send*2-len(conns[i].obuf)+10*1024
if bytestoread > 0:
read = conns[i].fileupl.file.read(bytestoread)
conns[i].obuf = conns[i].obuf + read
except IOError, strerror:
self._ui_callback([FileError(conns[i], conns[i].fileupl.file, strerror)])
except ValueError:
pass
if bytes_send > 0:
self._ui_callback([conns[i].fileupl])
def readData(self, conns, i):
# Check for a download limit
if i in self._dlimits:
limit = self._dlimits[i]
else:
limit = None
conns[i].lastactive = time.time()
if limit is None:
# Unlimited download data
data = i.recv(conns[i].lastreadlength)
conns[i].ibuf = conns[i].ibuf + data
if len(data) >= conns[i].lastreadlength//2:
conns[i].lastreadlength = conns[i].lastreadlength * 2
else:
# Speed Limited Download data (transfers)
data = i.recv(conns[i].lastreadlength )
conns[i].ibuf += data
conns[i].lastreadlength = limit
conns[i].readbytes2 += len(data)
if not data:
self._ui_callback([ConnClose(i, conns[i].addr)])
i.close()
#print "Closed", conns[i].addr
del conns[i]
def process_server_input(self, msgBuffer):
""" Server has sent us something, this function retrieves messages
from the msgBuffer, creates message objects and returns them and the rest
of the msgBuffer.
"""
msgs = []
# Server messages are 8 bytes or greater in length
while len(msgBuffer) >= 8:
msgsize, msgtype = struct.unpack(" len(msgBuffer):
break
elif msgtype in self.serverclasses:
msg = self.serverclasses[msgtype]()
msg.parseNetworkMessage(msgBuffer[8:msgsize+4])
msgs.append(msg)
else:
msgs.append(_("Server message type %(type)i size %(size)i contents %(msgBuffer)s unknown") %{'type':msgtype, 'size':msgsize-4, 'msgBuffer':msgBuffer[8:msgsize+4].__repr__()})
msgBuffer = msgBuffer[msgsize+4:]
return msgs, msgBuffer
def parseFileReq(self, conn, msgBuffer):
msg = None
# File Request messages are 4 bytes or greater in length
if len(msgBuffer) >= 4:
reqnum = struct.unpack("= 8:
offset = struct.unpack(" 0:
try:
conn.filedown.file.write(msgBuffer[:leftbytes])
except IOError, strerror:
self._ui_callback([FileError(conn, conn.filedown.file, strerror)])
except ValueError:
pass
self._ui_callback([DownloadFile(conn.conn, len(msgBuffer[:leftbytes]), conn.filedown.file)])
conn.filereadbytes = conn.filereadbytes + len(msgBuffer[:leftbytes])
msgBuffer = msgBuffer[leftbytes:]
elif conn.fileupl is not None:
if conn.fileupl.offset is None:
offset, msgBuffer = self.parseOffset(conn, msgBuffer)
if offset is not None:
try:
conn.fileupl.file.seek(offset)
except IOError, strerror:
self._ui_callback([FileError(conn, conn.fileupl.file, strerror)])
except ValueError:
pass
conn.fileupl.offset = offset
self._ui_callback([conn.fileupl])
conn.ibuf = msgBuffer
return msgs, conn
def process_peer_input(self, conn, msgBuffer):
""" We have a "P" connection (p2p exchange) , peer has sent us
something, this function retrieves messages
from the msgBuffer, creates message objects and returns them
and the rest of the msgBuffer.
"""
msgs = []
while (conn.init is None or conn.init.type not in ['F', 'D']) and len(msgBuffer) >= 8:
msgsize = struct.unpack("= 8:
msgtype = struct.unpack(" len(msgBuffer):
break
elif conn.init is None:
# Unpack Peer Connections
if msgBuffer[4] == chr(0):
msg = PierceFireWall(conn)
try:
msg.parseNetworkMessage(msgBuffer[5:msgsize+4])
except Exception, error:
print error
else:
conn.piercefw = msg
msgs.append(msg)
elif msgBuffer[4] == chr(1):
msg = PeerInit(conn)
try:
msg.parseNetworkMessage(msgBuffer[5:msgsize+4])
except Exception, error:
print error
else:
conn.init = msg
msgs.append(msg)
elif conn.piercefw is None:
msgs.append(_("Unknown peer init code: %(type)i, message contents %(msgBuffer)s") %{'type':ord(msgBuffer[4]), 'msgBuffer':msgBuffer[5:msgsize+4].__repr__()})
conn.conn.close()
self._ui_callback([ConnClose(conn.conn, conn.addr)])
conn.conn = None
break
else:
break
elif conn.init.type == 'P':
# Unpack Peer Messages
msgtype = struct.unpack("= 0:
msgBuffer = msgBuffer[msgsize+4:]
else:
msgBuffer = ""
conn.ibuf = msgBuffer
return msgs, conn
def process_distrib_input(self, conn, msgBuffer):
""" We have a distributed network connection, parent has sent us
something, this function retrieves messages
from the msgBuffer, creates message objects and returns them
and the rest of the msgBuffer.
"""
msgs = []
while len(msgBuffer) >= 5:
msgsize = struct.unpack(" len(msgBuffer):
break
msgtype = ord(msgBuffer[4])
if msgtype in self.distribclasses:
msg = self.distribclasses[msgtype](conn)
msg.parseNetworkMessage(msgBuffer[5:msgsize+4])
msgs.append(msg)
else:
msgs.append(_("Distrib message type %(type)i size %(size)i contents %(msgBuffer)s unknown") %{'type':msgtype, 'size':msgsize-1, 'msgBuffer':msgBuffer[5:msgsize+4].__repr__() } )
conn.conn.close()
self._ui_callback([ConnClose(conn.conn, conn.addr)])
conn.conn = None
break
if msgsize >= 0:
msgBuffer = msgBuffer[msgsize+4:]
else:
msgBuffer = ""
conn.ibuf = msgBuffer
return msgs, conn
def _resetCounters(self, conns):
curtime = time.time()
for i in conns.values():
if self._isUpload(i):
i.starttime = curtime
i.sentbytes2 = 0
if self._isDownload(i):
i.starttime = curtime
i.sentbytes2 = 0
def process_queue(self, queue, conns, connsinprogress, server_socket, maxsockets=MAXFILELIMIT):
""" Processes messages sent by UI thread. server_socket is a server connection
socket object, queue holds the messages, conns and connsinprogress
are dictionaries holding Connection and PeerConnectionInProgress
messages."""
msgList = []
needsleep = False
numsockets = 0
if server_socket is not None:
numsockets += 1
numsockets += len(conns) + len(connsinprogress)
while not queue.empty():
msgList.append(queue.get())
for msgObj in msgList:
if issubclass(msgObj.__class__, ServerMessage):
try:
msg = msgObj.makeNetworkMessage()
if server_socket in conns:
conns[server_socket].obuf = conns[server_socket].obuf + struct.pack("= 0:
checkuser = 0
if checkuser:
msg = msgObj.makeNetworkMessage()
conns[msgObj.conn].obuf += struct.pack(" 60:
self.lastsocketwarning = time.time()
log.addwarning(_("You have just hit your connection limit of %(limit)s. Nicotine+ will drop connections for your protection. If you get this message often you should search for less generic terms, or increase your per-process file descriptor limit.") % {'limit':maxsockets})
if needsleep:
time.sleep(1)
return conns, connsinprogress, server_socket
def abort(self):
""" Call this to abort the thread"""
self._want_abort = 1
def stopped(self):
""" returns true if thread has stopped """
return self._stopped