diff --git a/src/client.py b/src/client.py index 741d968..4f50a41 100644 --- a/src/client.py +++ b/src/client.py @@ -2,40 +2,28 @@ import asyncio import collections import logging import signal +import sys -class Server: - def __init__(self, host='localhost', input_port=3001, output_port=3002, +class Client: + def __init__(self, host='localhost', port=3001, buffer_chunk_size=10**4, buffer_length_limit=10**4): self._host = host - self._input_port = input_port - self._output_port = output_port - self._reader = None - self._writer = None + self._port = port self._stopping = False self.buffer = collections.deque() # Shared queue of bytes self._buffer_chunk_size = buffer_chunk_size # How many bytes per chunk self._buffer_length_limit = buffer_length_limit # How many chunks in buffer + self._file_path = None + self._working = False @property def host(self) -> str: return self._host @property - def input_port(self) -> int: - return self._input_port - - @property - def output_port(self) -> int: - return self._output_port - - @property - def reader(self) -> asyncio.StreamReader: - return self._reader - - @property - def writer(self) -> asyncio.StreamWriter: - return self._writer + def port(self) -> int: + return self._port @property def stopping(self) -> bool: @@ -49,63 +37,51 @@ class Server: def buffer_chunk_size(self) -> int: return self._buffer_chunk_size - def set_reader(self, reader=None, writer=None): - self._reader = reader - _ = writer + @property + def file_path(self) -> str: + return self._file_path - def set_writer(self, reader=None, writer=None): - _ = reader - self._writer = writer + @property + def working(self) -> bool: + return self._working - async def run_server(self): - logging.info("===== Started server...") - await asyncio.start_server(client_connected_cb=self.set_reader, host=self.host, port=self.input_port) - await asyncio.start_server(client_connected_cb=self.set_writer, host=self.host, port=self.output_port) - while (not self.stopping) and (self.reader is None or self.writer is None): - logging.info("==... waiting for connections...==") - await asyncio.sleep(.5) - await asyncio.gather( - self.run_reader(), - self.run_writer() - ) - logging.info("... stopped server. =====") - return + async def run_sending_client(self, file_path='~/output.txt'): + self._file_path = file_path + _, writer = await asyncio.open_connection(host=self.host, port=self.port) + await self.send(writer=writer) - async def run_reader(self): - while not self.stopping: - try: - # Stop if buffer is full - while len(self.buffer) >= self.buffer_length_limit: - await asyncio.sleep(1) - continue + async def send(self, writer: asyncio.StreamWriter): + self._working = True + with open(self.file_path, 'rb') as file_to_send: + while not self.stopping: + output_data = file_to_send.read(self.buffer_chunk_size) + writer.write(output_data) + await writer.drain() + + async def run_receiving_client(self, file_path='~/input.txt'): + self._file_path = file_path + reader, _ = await asyncio.open_connection(host=self.host, port=self.port) + await self.receive(reader=reader) + + async def receive(self, reader: asyncio.StreamReader): + self._working = True + with open(self.file_path, 'wb') as file_to_receive: + while not self.stopping: try: - input_data = await self.reader.readexactly(self.buffer_chunk_size) + input_data = await reader.readexactly(self.buffer_chunk_size) except asyncio.IncompleteReadError as e: input_data = e.partial - self.buffer.append(input_data) - except Exception as e: - logging.error(e) - - async def run_writer(self): - while not self.stopping: - try: - # Slow down if buffer is short - if len(self.buffer) < 3: - await asyncio.sleep(.1) - try: - input_data = self.buffer.popleft() - except IndexError: - continue - self.writer.write(input_data) - await self.writer.drain() - except Exception as e: - logging.error(e) + file_to_receive.write(input_data) def stop(self, *_): - self._stopping = True + if self.working: + logging.info("Received interruption signal, stopping...") + self._stopping = True + else: + raise KeyboardInterrupt("Not working yet...") -if 1 or __name__ == '__main__': +if __name__ == '__main__': log_formatter = logging.Formatter( "%(asctime)s [%(module)-15s %(levelname)-8s] %(message)s", style='%' @@ -118,13 +94,31 @@ if 1 or __name__ == '__main__': console_handler.setLevel(logging.DEBUG) root_logger.addHandler(console_handler) - loop = asyncio.get_event_loop() - server = Server( - input_port=0, - output_port=0 + if len(sys.argv) > 1: + action = sys.argv[1] + else: + action = input("Do you want to (R)eceive or (S)end a file?\t\t") + + action = ( + 'send' + if action.lower() == 's' + else 'receive' ) - loop.add_signal_handler(signal.SIGINT, server.stop, loop) - logging.info("Starting file bridging server...") - loop.run_until_complete(server.run_server()) - logging.info("Received KeyboardInterrupt, stopping...") + if len(sys.argv) > 2: + file_path = sys.argv[2] + else: + file_path = input(f"Enter file to {action}:\t\t\t\t\t\t") + + loop = asyncio.get_event_loop() + client = Client( + host='127.0.0.1', + port=5000, + ) + loop.add_signal_handler(signal.SIGINT, client.stop, loop) + logging.info("Starting client...") + if action.lower() == 'send': + loop.run_until_complete(client.run_sending_client(file_path=file_path)) + else: + loop.run_until_complete(client.run_receiving_client(file_path=file_path)) loop.close() + logging.info("Stopped server") diff --git a/src/server.py b/src/server.py index 516627a..bddcaa0 100644 --- a/src/server.py +++ b/src/server.py @@ -5,37 +5,23 @@ import signal class Server: - def __init__(self, host='localhost', input_port=3001, output_port=3002, + def __init__(self, host='localhost', port=3001, buffer_chunk_size=10**4, buffer_length_limit=10**4): self._host = host - self._input_port = input_port - self._output_port = output_port - self._reader = None - self._writer = None + self._port = port self._stopping = False self.buffer = collections.deque() # Shared queue of bytes self._buffer_chunk_size = buffer_chunk_size # How many bytes per chunk self._buffer_length_limit = buffer_length_limit # How many chunks in buffer + self._working = False @property def host(self) -> str: return self._host @property - def input_port(self) -> int: - return self._input_port - - @property - def output_port(self) -> int: - return self._output_port - - @property - def reader(self) -> asyncio.StreamReader: - return self._reader - - @property - def writer(self) -> asyncio.StreamWriter: - return self._writer + def port(self) -> int: + return self._port @property def stopping(self) -> bool: @@ -49,49 +35,11 @@ class Server: def buffer_chunk_size(self) -> int: return self._buffer_chunk_size - def set_reader(self, reader=None, writer=None): - self._reader = reader - _ = writer + @property + def working(self) -> bool: + return self._working - def set_writer(self, reader=None, writer=None): - _ = reader - self._writer = writer - - async def run_server(self): - logging.info("===== Started server...") - if 0: # loop.create_connection method - reader, _ = await loop.create_connection(asyncio.Transport, host=self.host, port=self.input_port) - self._reader = reader - _, writer = await loop.create_connection(asyncio.Transport, host=self.host, port=self.output_port) - self._writer = writer - elif 1: # asyncio.open_connection method - reader, _ = await asyncio.open_connection(self.host, self.input_port) - self._reader = reader - _, writer = await asyncio.open_connection(self.host, self.output_port) - self._writer = writer - else: # asyncio.start_server method - await asyncio.start_server(client_connected_cb=self.set_reader, host=self.host, port=self.input_port) - await asyncio.start_server(client_connected_cb=self.set_writer, host=self.host, port=self.output_port) - while (not self.stopping) and (self.reader is None or self.writer is None): - logging.info("==... waiting for connections...==") - await asyncio.sleep(.5) - await asyncio.gather( - self.run_writer(), - self.run_reader() - ) - logging.info("... stopped server. =====") - return - - async def test_client(self): - logging.info("===== Started client test...") - await asyncio.gather( - self.send('/home/davte/Provaaaaa.txt'), - self.receive('/home/davte/Ricevuto.log') - ) - logging.info("... stopped client test. =====") - return - - async def run_reader(self): + async def run_reader(self, reader): while not self.stopping: try: # Stop if buffer is full @@ -99,14 +47,14 @@ class Server: await asyncio.sleep(1) continue try: - input_data = await self.reader.readexactly(self.buffer_chunk_size) + input_data = await reader.readexactly(self.buffer_chunk_size) except asyncio.IncompleteReadError as e: input_data = e.partial self.buffer.append(input_data) except Exception as e: logging.error(e) - async def run_writer(self): + async def run_writer(self, writer): while not self.stopping: try: # Slow down if buffer is short @@ -116,41 +64,32 @@ class Server: input_data = self.buffer.popleft() except IndexError: continue - self.writer.write(input_data) - await self.writer.drain() + writer.write(input_data) + await writer.drain() except Exception as e: logging.error(e) + async def forward_bytes(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + self._working = True + asyncio.ensure_future(self.run_reader(reader=reader)) + asyncio.ensure_future(self.run_writer(writer=writer)) + + async def run_server(self): + reader_server = await asyncio.start_server(client_connected_cb=self.forward_bytes, + host=self.host, port=self.port) + async with reader_server: + await reader_server.serve_forever() + return + def stop(self, *_): - self._stopping = True - - async def send(self, file_path): - await asyncio.start_server(client_connected_cb=self.set_writer, host='5.249.159.33', port='5000') - while (not self.stopping) and self.writer is None: - logging.info("==... waiting for connections...==") - await asyncio.sleep(.5) - with open(file_path, 'rb') as file_to_send: - output_data = file_to_send.read(self.buffer_chunk_size) - self.writer.write(output_data) - await self.writer.drain() - - async def receive(self, file_path): - await asyncio.start_server(client_connected_cb=self.set_reader, host='5.249.159.33', port='5001') - while (not self.stopping) and self.reader is None: - logging.info("==... waiting for connections...==") - await asyncio.sleep(.5) - with open(file_path, 'wb') as file_to_receive: - try: - input_data = await self.reader.readexactly(self.buffer_chunk_size) - except asyncio.IncompleteReadError as e: - input_data = e.partial - file_to_receive.write(input_data) + if self.working: + logging.info("Received interruption signal, stopping...") + self._stopping = True + else: + raise KeyboardInterrupt("Not working yet...") if __name__ == '__main__': - print("https://stackoverflow.com/questions/48023911/aiohttp-error-connect-call-failed-for-multiple-async-requests" - "-to-localhost") - exit() log_formatter = logging.Formatter( "%(asctime)s [%(module)-15s %(levelname)-8s] %(message)s", style='%' @@ -165,11 +104,11 @@ if __name__ == '__main__': loop = asyncio.get_event_loop() server = Server( - input_port=5000, - output_port=5001 + host='127.0.0.1', + port=5000, ) loop.add_signal_handler(signal.SIGINT, server.stop, loop) logging.info("Starting file bridging server...") loop.run_until_complete(server.run_server()) - logging.info("Received KeyboardInterrupt, stopping...") loop.close() + logging.info("Stopped server")