From cab7527c634835171c87f82a8e7b6794c8f4ae1c Mon Sep 17 00:00:00 2001 From: Davte Date: Thu, 26 Mar 2020 14:49:17 +0100 Subject: [PATCH] Working on server but getting errors ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 5000) --- src/client.py | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/server.py | 83 ++++++++++++++++++++++++++++---- 2 files changed, 205 insertions(+), 8 deletions(-) diff --git a/src/client.py b/src/client.py index e69de29..741d968 100644 --- a/src/client.py +++ b/src/client.py @@ -0,0 +1,130 @@ +import asyncio +import collections +import logging +import signal + + +class Server: + def __init__(self, host='localhost', input_port=3001, output_port=3002, + buffer_chunk_size=10**4, buffer_length_limit=10**4): + self._host = host + self._input_port = input_port + self._output_port = output_port + self._reader = None + self._writer = None + self._stopping = False + self.buffer = collections.deque() # Shared queue of bytes + self._buffer_chunk_size = buffer_chunk_size # How many bytes per chunk + self._buffer_length_limit = buffer_length_limit # How many chunks in buffer + + @property + def host(self) -> str: + return self._host + + @property + def input_port(self) -> int: + return self._input_port + + @property + def output_port(self) -> int: + return self._output_port + + @property + def reader(self) -> asyncio.StreamReader: + return self._reader + + @property + def writer(self) -> asyncio.StreamWriter: + return self._writer + + @property + def stopping(self) -> bool: + return self._stopping + + @property + def buffer_length_limit(self) -> int: + return self._buffer_length_limit + + @property + def buffer_chunk_size(self) -> int: + return self._buffer_chunk_size + + def set_reader(self, reader=None, writer=None): + self._reader = reader + _ = writer + + def set_writer(self, reader=None, writer=None): + _ = reader + self._writer = writer + + async def run_server(self): + logging.info("===== Started server...") + await asyncio.start_server(client_connected_cb=self.set_reader, host=self.host, port=self.input_port) + await asyncio.start_server(client_connected_cb=self.set_writer, host=self.host, port=self.output_port) + while (not self.stopping) and (self.reader is None or self.writer is None): + logging.info("==... waiting for connections...==") + await asyncio.sleep(.5) + await asyncio.gather( + self.run_reader(), + self.run_writer() + ) + logging.info("... stopped server. =====") + return + + async def run_reader(self): + while not self.stopping: + try: + # Stop if buffer is full + while len(self.buffer) >= self.buffer_length_limit: + await asyncio.sleep(1) + continue + try: + input_data = await self.reader.readexactly(self.buffer_chunk_size) + except asyncio.IncompleteReadError as e: + input_data = e.partial + self.buffer.append(input_data) + except Exception as e: + logging.error(e) + + async def run_writer(self): + while not self.stopping: + try: + # Slow down if buffer is short + if len(self.buffer) < 3: + await asyncio.sleep(.1) + try: + input_data = self.buffer.popleft() + except IndexError: + continue + self.writer.write(input_data) + await self.writer.drain() + except Exception as e: + logging.error(e) + + def stop(self, *_): + self._stopping = True + + +if 1 or __name__ == '__main__': + log_formatter = logging.Formatter( + "%(asctime)s [%(module)-15s %(levelname)-8s] %(message)s", + style='%' + ) + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + + console_handler = logging.StreamHandler() + console_handler.setFormatter(log_formatter) + console_handler.setLevel(logging.DEBUG) + root_logger.addHandler(console_handler) + + loop = asyncio.get_event_loop() + server = Server( + input_port=0, + output_port=0 + ) + loop.add_signal_handler(signal.SIGINT, server.stop, loop) + logging.info("Starting file bridging server...") + loop.run_until_complete(server.run_server()) + logging.info("Received KeyboardInterrupt, stopping...") + loop.close() diff --git a/src/server.py b/src/server.py index 1b12282..3dfa6b9 100644 --- a/src/server.py +++ b/src/server.py @@ -49,18 +49,49 @@ class Server: def buffer_chunk_size(self) -> int: return self._buffer_chunk_size + def set_reader(self, reader=None, writer=None): + self._reader = reader + _ = writer + + def set_writer(self, reader=None, writer=None): + _ = reader + self._writer = writer + async def run_server(self): logging.info("===== Started server...") + if 0: # loop.create_connection method (search about Protocol?) + reader, _ = await loop.create_connection(host=self.host, port=self.input_port) + self._reader = reader + _, writer = await loop.create_connection(host=self.host, port=self.output_port) + self._writer = writer + elif 1: # asyncio.open_connection method + reader, _ = await asyncio.open_connection(self.host, self.input_port) + self._reader = reader + _, writer = await asyncio.open_connection(self.host, self.output_port) + self._writer = writer + else: # asyncio.start_server method + await asyncio.start_server(client_connected_cb=self.set_reader, host=self.host, port=self.input_port) + await asyncio.start_server(client_connected_cb=self.set_writer, host=self.host, port=self.output_port) + while (not self.stopping) and (self.reader is None or self.writer is None): + logging.info("==... waiting for connections...==") + await asyncio.sleep(.5) await asyncio.gather( - self.run_reader(), - self.run_writer() + self.run_writer(), + self.run_reader() ) logging.info("... stopped server. =====") return + async def test_client(self): + logging.info("===== Started client test...") + await asyncio.gather( + self.send('/home/davte/Provaaaaa.txt'), + self.receive('/home/davte/Ricevuto.log') + ) + logging.info("... stopped client test. =====") + return + async def run_reader(self): - reader, _ = await asyncio.open_connection(self.host, self.input_port) - self._reader = reader while not self.stopping: try: # Stop if buffer is full @@ -76,8 +107,6 @@ class Server: logging.error(e) async def run_writer(self): - _, writer = await asyncio.open_connection(self.host, self.output_port) - self._writer = writer while not self.stopping: try: # Slow down if buffer is short @@ -88,16 +117,54 @@ class Server: except IndexError: continue self.writer.write(input_data) + await self.writer.drain() except Exception as e: logging.error(e) - def stop(self): + def stop(self, *_): self._stopping = True + async def send(self, file_path): + await asyncio.start_server(client_connected_cb=self.set_writer, host='5.249.159.33', port='5000') + while (not self.stopping) and self.writer is None: + logging.info("==... waiting for connections...==") + await asyncio.sleep(.5) + with open(file_path, 'rb') as file_to_send: + output_data = file_to_send.read(self.buffer_chunk_size) + self.writer.write(output_data) + await self.writer.drain() + + async def receive(self, file_path): + await asyncio.start_server(client_connected_cb=self.set_reader, host='5.249.159.33', port='5001') + while (not self.stopping) and self.reader is None: + logging.info("==... waiting for connections...==") + await asyncio.sleep(.5) + with open(file_path, 'wb') as file_to_receive: + try: + input_data = await self.reader.readexactly(self.buffer_chunk_size) + except asyncio.IncompleteReadError as e: + input_data = e.partial + file_to_receive.write(input_data) + if __name__ == '__main__': + log_formatter = logging.Formatter( + "%(asctime)s [%(module)-15s %(levelname)-8s] %(message)s", + style='%' + ) + root_logger = logging.getLogger() + root_logger.setLevel(logging.DEBUG) + + console_handler = logging.StreamHandler() + console_handler.setFormatter(log_formatter) + console_handler.setLevel(logging.DEBUG) + root_logger.addHandler(console_handler) + loop = asyncio.get_event_loop() - server = Server() + server = Server( + input_port=5000, + output_port=5001 + ) loop.add_signal_handler(signal.SIGINT, server.stop, loop) logging.info("Starting file bridging server...") loop.run_until_complete(server.run_server())