Source code for soma.qt_gui.io

# -*- coding: utf-8 -*-
#  This software and supporting documentation are distributed by
#      Institut Federatif de Recherche 49
#      CEA/NeuroSpin, Batiment 145,
#      91191 Gif-sur-Yvette cedex
#      France
#
# This software is governed by the CeCILL license version 2 under
# French law and abiding by the rules of distribution of free software.
# You can  use, modify and/or redistribute the software under the
# terms of the CeCILL license version 2 as circulated by CEA, CNRS
# and INRIA at the following URL "http://www.cecill.info".
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability.
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or
# data to be ensured and,  more generally, to use and operate it in the
# same conditions as regards security.
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license version 2 and that you accept its terms.

"""
Utils for socket communication
"""
from __future__ import print_function

from __future__ import absolute_import
import threading
import socket
import errno
import time
import sys

import six
import six.moves.queue as queue

from soma.qt_gui.qt_backend.QtCore import QObject, QSocketNotifier


[docs]class Socket(QObject): """ Opens a connection to a socket server and provides methods to read from and write to socket streams. To handle specific message format, redefine readMessage method. By default it reads a line on the socket. To process messages, redefine processMessage method. By default it prints the message to standard output. Attributes ---------- dest: string socket server machine port: int port that the socket server listens socket: socket.socket python socket object socketnotifier: qt.QSocketNotifier the notifier sends a signal when there's something to read on the socket. readLock: threading.Lock lock to prevent threads from reading at the same time on the socket writeLock: threading.Lock lock to prevent threads from writing at the same time on the socket lock: threading.RLock lock to prevent concurrent access on object data because it can be used by multiple threads. At least principal thread and reading messages thread when it not possible to use a QSocketNotifier. initialized: bool indicates if the connection is correctly opened notifyenabled: bool indicates if message received on the socket must be processed. loopRetry: int class attribute: max number of connection tries defaultPort: int class attribute: default port for socket server """ loopRetry = 60 # Retry to connect 60 times (1 minute) defaultPort = 50007 def __init__(self, host, port=None): """ Parameters ---------- host: string socket server machine (localhost if it is current machine) port: int port that the socket server listens """ super(Socket, self).__init__() self.dest = host self.port = port self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socketnotifier = None # We are in a multi threaded environnement, so only one thread can # use the socket at the same time self.readLock = threading.Lock() self.writeLock = threading.Lock() self.lock = threading.RLock() self.initialized = 0 self.notifyenabled = 1
[docs] def initialize(self, port=None): """ Connects the socket to the server and sets the frame to read data from the socket. Two methods for reading: - reading thread - QSocketNotifier (not possible on windows platform) Returns ------- port: int port that the socket server listens """ if self.initialized: return if port is not None: self.port = port # We have to retry connecting because it can take time for the socket # server to start i = 0 while i < Socket.loopRetry: # using socket.htons to convert port number is useless # because Python take care of this res = self.socket.connect_ex((self.dest, self.port)) if res != 0: # retry to connect time.sleep(1) self.socket.close() self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) i = i + 1 else: # Connection successful, exit from the while loop break if findPlatform() == 'windows': # on windows, QSocketNotifier seems to make the socket fail self.usethread = 1 else: self.usethread = 1 # 0 # QSocketNotifier seems to create problems when it is not used in qt main thread (application freeze) # sets the frame to read from the socket if self.usethread: # reading thread self.readthread = threading.Thread(target=self.readForever) self.readthread.setDaemon(True) # don't block when exiting program self._messages = queue.Queue(500) self.socket.setblocking(0) self.initialized = 1 self.readthread.start() else: # QSocketNotifier that signals that data is enable for reading self.socketnotifier = QSocketNotifier( self.socket.fileno(), QSocketNotifier.Read) # , self ) self.socketnotifier.activated.connect(self.messageHandler) self.socketnotifier.setEnabled(True) self.initialized = 1
[docs] def messageHandler(self): """ This method is called when a message is received on the socket. Gets and processes the message. """ excep = None msg = None try: msg = self.getMessage() except Exception as e: excep = e self.processMessage(msg, excep)
[docs] def getMessage(self, timeout=30): ''' Called by message handler to gets the last received message. If socket communication is managed by a QSocketNotifier, the message is not yet read, so this method read it. If reading is in another process, the message is already read and it is in messages queue. ''' if not self.initialized: return '', '' if self.usethread: # block until a message arrives in queue self.lock.acquire() try: try: msg = self._messages.get(True, timeout) except queue.Empty as e: raise IOError( errno.ETIMEDOUT, 'socket communication timed out') finally: self.lock.release() else: try: msg = self.readMessage(timeout) except IOError as e: if e.errno == errno.EPIPE: # The socket has been closed # Return to avoid infinite loop self.close() raise return msg
[docs] def processMessage(self, msg, excep): """ Processes a message received on the socket. This method only print the message. To do some specific treatment, subclass Socket and redefine this method. """ print("message received :", msg, excep)
[docs] def send(self, msg): """ Sends data to the connected socket. @type msg: string @param msg: the message to send. """ if not self.initialized: return totalsent = 0 msglen = len(msg) # Send the message atomically (send is not thread-safe) close = False self.writeLock.acquire() # sendall() only appeared in python 2.1.12 # self.socket.sendall( msg ) if hasattr(msg, 'encode'): # encode to bytes (python3) msg = msg.encode() n = 0 try: while n < msglen: try: n += self.socket.send(msg[n:]) except socket.error as e: if e.errno == errno.EWOULDBLOCK: time.sleep(0.02) else: close = True break finally: self.writeLock.release() if close: self.close()
[docs] def readForever(self): """ Reading loop to handle reading messages from the socket. Used only if QSocketNotifier cannot be used. """ while self.initialized: try: msg = self.readMessage() self.lock.acquire() try: self._messages.put(msg) finally: self.lock.release() if self.notifyenabled: self.messageHandler() except IOError as e: if e.errno != errno.ETIMEDOUT: self.close() # raise if not self.initialized: break
[docs] def readLine(self, timeout): """ Reads a line of data from the socket (a string followed by ``'\\n'``). self.readLock must be acquired before calling this method. If data cannot be read before timeout (in seconds), an IOError exception is raised. Returns ------- timeout: int max time to wait before reading the message. """ msg = b'' char = b'' waitedTime = 0 # Receive the message atomically while char != b'\n': if char != b'': msg += char try: char = self.socket.recv(1) waitedTime = 0 if char == b'\0' or char == b'': e = IOError( errno.EPIPE, 'socket communication interrupted') raise e except socket.error as e: if e.errno == errno.EWOULDBLOCK: char = b'' time.sleep(0.02) waitedTime += 0.02 if waitedTime >= timeout: raise IOError( errno.ETIMEDOUT, 'socket communication timed out') else: raise IOError( errno.EPIPE, 'socket communication interrupted') return six.ensure_str(msg)
[docs] def readMessage(self, timeout=30): """ Reads a message from the socket. This method only gets the readlock and reads a line. To read specific message formats, subclass Socket and redefine this method. Parameters ---------- timeout: int max time to wait before reading the message. Returns ------- message: string the message received from the socket """ self.readLock.acquire() try: msg = self.readLine(timeout) finally: self.readLock.release() return msg
[docs] def findFreePort(self): """ Try to find the first unused port. @rtype: int @return: numero of a free port """ if self.port is not None: startport = self.port else: startport = Socket.defaultPort res = 0 tmpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) while res == 0: res = tmpsocket.connect_ex((self.dest, startport)) tmpsocket.close() if res != 0: return startport tmpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) startport += 1
[docs] def disableHandler(self): """Disables reading from the socket.""" self.notifyenabled = 0 if self.socketnotifier is not None: self.socketnotifier.setEnabled(0)
[docs] def enableHandler(self): """Enables reading from the socket.""" self.notifyenabled = 1 if self.socketnotifier is not None: self.socketnotifier.setEnabled(1)
def __del__(self): """ Closes the connection when the object is deleted """ self.close()
[docs] def close(self): """ Closes the connection. """ if not self.initialized: return self.initialized = 0 self.disableHandler() self.socketnotifier = None self.socket.close() if self.usethread and self.readthread != threading.currentThread(): self.readthread.join()
[docs]def findPlatform(): """ Identify platform, possible return values are : - 'windows': Windows - 'linux': Linux - 'sunos': SunOS (Solaris) - 'darwin': Darwin (MacOS X) - None: unknown """ if sys.platform[:3] == "win": return 'windows' else: if sys.platform.find('linux') != -1: return 'linux' if sys.platform.find('sunos') != -1: return 'sunos' if sys.platform.find('darwin') != -1: return 'darwin' if sys.platform.find('irix') != -1: return 'irix' return sys.platform