IT WORKS!

This commit is contained in:
Davte 2020-03-28 20:54:41 +01:00
parent 69c06dc4dc
commit 248d4ccb88
2 changed files with 44 additions and 20 deletions

View File

@ -55,8 +55,12 @@ class Client:
with open(self.file_path, 'rb') as file_to_send: with open(self.file_path, 'rb') as file_to_send:
while not self.stopping: while not self.stopping:
output_data = file_to_send.read(self.buffer_chunk_size) output_data = file_to_send.read(self.buffer_chunk_size)
if not output_data:
break
writer.write(output_data) writer.write(output_data)
await writer.drain() await writer.drain()
writer.write_eof()
await writer.drain()
async def run_receiving_client(self, file_path='~/input.txt'): async def run_receiving_client(self, file_path='~/input.txt'):
self._file_path = file_path self._file_path = file_path
@ -68,9 +72,10 @@ class Client:
with open(self.file_path, 'wb') as file_to_receive: with open(self.file_path, 'wb') as file_to_receive:
while not self.stopping: while not self.stopping:
input_data = await reader.read(self.buffer_chunk_size) input_data = await reader.read(self.buffer_chunk_size)
if reader.at_eof():
break
if not input_data: if not input_data:
continue continue
print(input_data)
file_to_receive.write(input_data) file_to_receive.write(input_data)
def stop(self, *_): def stop(self, *_):
@ -112,7 +117,7 @@ if __name__ == '__main__':
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
client = Client( client = Client(
host='127.0.0.1', host='127.0.0.1',
port=5000, port=(5000 if action == 'send' else 5001),
) )
# loop.add_signal_handler(signal.SIGINT, client.stop, loop) # loop.add_signal_handler(signal.SIGINT, client.stop, loop)
logging.info("Starting client...") logging.info("Starting client...")

View File

@ -1,27 +1,32 @@
import asyncio import asyncio
import collections import collections
import logging import logging
import signal
class Server: class Server:
def __init__(self, host='localhost', port=3001, def __init__(self, host='localhost', input_port=5000, output_port=5001,
buffer_chunk_size=10**4, buffer_length_limit=10**4): buffer_chunk_size=10**4, buffer_length_limit=10**4):
self._host = host self._host = host
self._port = port self._input_port = input_port
self._output_port = output_port
self._stopping = False self._stopping = False
self.buffer = collections.deque() # Shared queue of bytes self.buffer = collections.deque() # Shared queue of bytes
self._buffer_chunk_size = buffer_chunk_size # How many bytes per chunk self._buffer_chunk_size = buffer_chunk_size # How many bytes per chunk
self._buffer_length_limit = buffer_length_limit # How many chunks in buffer self._buffer_length_limit = buffer_length_limit # How many chunks in buffer
self._working = False self._working = False
self.at_eof = False
@property @property
def host(self) -> str: def host(self) -> str:
return self._host return self._host
@property @property
def port(self) -> int: def input_port(self) -> int:
return self._port return self._input_port
@property
def output_port(self) -> int:
return self._output_port
@property @property
def stopping(self) -> bool: def stopping(self) -> bool:
@ -46,10 +51,9 @@ class Server:
while len(self.buffer) >= self.buffer_length_limit: while len(self.buffer) >= self.buffer_length_limit:
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue
try: input_data = await reader.read(self.buffer_chunk_size)
input_data = await reader.readexactly(self.buffer_chunk_size) if reader.at_eof():
except asyncio.IncompleteReadError as e: self.at_eof = True
input_data = e.partial
self.buffer.append(input_data) self.buffer.append(input_data)
except Exception as e: except Exception as e:
logging.error(e) logging.error(e)
@ -63,26 +67,39 @@ class Server:
try: try:
input_data = self.buffer.popleft() input_data = self.buffer.popleft()
except IndexError: except IndexError:
continue if not self.at_eof:
continue
else:
writer.write_eof()
await writer.drain()
self.at_eof = False
break
writer.write(input_data) writer.write(input_data)
await writer.drain() await writer.drain()
except Exception as e: except Exception as e:
logging.error(e) logging.error(e)
async def forward_bytes(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): # noinspection PyUnusedLocal
async def handle_incoming_data(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
self._working = True self._working = True
asyncio.ensure_future(self.run_reader(reader=reader)) asyncio.ensure_future(self.run_reader(reader=reader))
# noinspection PyUnusedLocal
async def handle_outgoing_data(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
self._working = True
asyncio.ensure_future(self.run_writer(writer=writer)) asyncio.ensure_future(self.run_writer(writer=writer))
async def run_server(self): async def run_server(self):
reader_server = await asyncio.start_server(client_connected_cb=self.forward_bytes, reader_server = await asyncio.start_server(client_connected_cb=self.handle_incoming_data,
host=self.host, port=self.port) host=self.host, port=self.input_port)
await asyncio.start_server(client_connected_cb=self.handle_outgoing_data,
host=self.host, port=self.output_port)
async with reader_server: async with reader_server:
await reader_server.serve_forever() await reader_server.serve_forever()
return return
def stop(self, *_): def stop(self, *_):
if self.working: if self.working and not self.stopping:
logging.info("Received interruption signal, stopping...") logging.info("Received interruption signal, stopping...")
self._stopping = True self._stopping = True
else: else:
@ -105,10 +122,12 @@ if __name__ == '__main__':
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
server = Server( server = Server(
host='127.0.0.1', host='127.0.0.1',
port=5000, input_port=5000,
output_port=5001
) )
# loop.add_signal_handler(signal.SIGINT, server.stop, loop)
logging.info("Starting file bridging server...") logging.info("Starting file bridging server...")
loop.run_until_complete(server.run_server()) try:
loop.run_until_complete(server.run_server())
except KeyboardInterrupt:
logging.info("Stopping...")
loop.close() loop.close()
logging.info("Stopped server")