From 40dba845fd91d5e477f8bb3177aa7e4c27bd6575 Mon Sep 17 00:00:00 2001 From: Davte Date: Thu, 26 Mar 2020 12:32:01 +0100 Subject: [PATCH] Working on server --- src/server.py | 82 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/src/server.py b/src/server.py index feb3810..1b12282 100644 --- a/src/server.py +++ b/src/server.py @@ -1,27 +1,32 @@ import asyncio +import collections import logging +import signal class Server: - def __init__(self, host='localhost', input_port=None, output_port=None): + def __init__(self, host='localhost', input_port=3001, output_port=3002, + buffer_chunk_size=10**4, buffer_length_limit=10**4): self._host = host self._input_port = input_port self._output_port = output_port self._reader = None self._writer = None self._stopping = False - self.buffer = [] # Shared list + self.buffer = collections.deque() # Shared queue of bytes + self._buffer_chunk_size = buffer_chunk_size # How many bytes per chunk + self._buffer_length_limit = buffer_length_limit # How many chunks in buffer @property - def host(self): + def host(self) -> str: return self._host @property - def input_port(self): + def input_port(self) -> int: return self._input_port @property - def output_port(self): + def output_port(self) -> int: return self._output_port @property @@ -29,45 +34,72 @@ class Server: return self._reader @property - def writer(self) -> asyncio.StreamReader: + def writer(self) -> asyncio.StreamWriter: return self._writer @property def stopping(self) -> bool: return self._stopping - async def setup_reader(self): - reader, _ = await asyncio.open_connection(self.host, self.input_port) - self._reader = reader + @property + def buffer_length_limit(self) -> int: + return self._buffer_length_limit - async def setup_writer(self): - _, writer = await asyncio.open_connection(self.host, self.output_port) - self._writer = writer + @property + def buffer_chunk_size(self) -> int: + return self._buffer_chunk_size async def run_server(self): - await self.setup_reader() + logging.info("===== Started server...") + await asyncio.gather( + self.run_reader(), + self.run_writer() + ) + logging.info("... stopped server. =====") + return + + async def run_reader(self): + reader, _ = await asyncio.open_connection(self.host, self.input_port) + self._reader = reader while not self.stopping: try: - while len(self.buffer) >= 10**4: + # Stop if buffer is full + while len(self.buffer) >= self.buffer_length_limit: await asyncio.sleep(1) + continue try: - input_data = await self.reader.readexactly(10**4) + input_data = await self.reader.readexactly(self.buffer_chunk_size) except asyncio.IncompleteReadError as e: input_data = e.partial self.buffer.append(input_data) except Exception as e: logging.error(e) + async def run_writer(self): + _, writer = await asyncio.open_connection(self.host, self.output_port) + self._writer = writer + while not self.stopping: + try: + # Slow down if buffer is short + if len(self.buffer) < 3: + await asyncio.sleep(.1) + try: + input_data = self.buffer.popleft() + except IndexError: + continue + self.writer.write(input_data) + except Exception as e: + logging.error(e) + + def stop(self): + self._stopping = True + if __name__ == '__main__': loop = asyncio.get_event_loop() - try: - logging.info("Starting file bridging server...") - # loop.run_until_complete( - # asyncio.gather( - # read_input_socket, - # write_on_output_socket - # ) - # ) - except KeyboardInterrupt(): - logging.info("Received KeyboardInterrupt, stopping...") + server = Server() + loop.add_signal_handler(signal.SIGINT, server.stop, loop) + logging.info("Starting file bridging server...") + loop.run_until_complete(server.run_server()) + logging.info("Received KeyboardInterrupt, stopping...") + loop.close()