|
- #!/usr/bin/env python3
- # -*- coding: utf-8; mode: python; tab-width: 4; indent-tabs-mode: nil -*-
- # vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 fileencoding=utf-8
-
- import sys
- import asyncio
- import time
- import threading
-
class AsyncSocketService:
    """TCP echo service driven by an asyncio event loop on a worker thread.

    Each accepted client is served by :meth:`handle_accept`, which echoes
    back every package it receives. A package is complete once at least
    ``required_pkg_size`` bytes have been buffered, the client sends exactly
    ``b'exit'``, or the client closes its side of the connection.

    Args:
        address: Interface the server binds to.
        port: TCP port the server listens on.
        bufsize: Maximum number of bytes requested per ``reader.read`` call.
        required_pkg_size: Minimum number of buffered bytes that makes up
            one package; ``0`` means every non-empty read is a package.
        debug: When true, print every received package with the peer name.
    """

    def __init__(self, address='127.0.0.1', port=5000,
                 bufsize: int = 10240,
                 required_pkg_size: int = 0, debug: bool = False):
        self.__addr = address
        self.__port = port
        self.__bufsize = bufsize
        self.__required_pkg_size = required_pkg_size
        self.__debug = debug
        # Assigned by run_service(); handle_accept() uses it to stop the loop.
        self.__loop = None

    async def _read_package(self, reader) -> bytes:
        """Read from *reader* until one package has been buffered.

        Returns:
            The buffered bytes. The result is empty when the peer closed the
            connection before sending anything, and may be shorter than
            ``required_pkg_size`` when the peer closed mid-package or sent
            exactly ``b'exit'``.
        """
        buffered = bytearray()
        while True:
            chunk = await reader.read(self.__bufsize)
            if not chunk:
                # EOF: the peer closed its side of the connection.
                break
            buffered += chunk
            if buffered == b'exit':
                break
            if len(buffered) >= self.__required_pkg_size:
                break
        return bytes(buffered)

    async def handle_accept(self, reader, writer):
        """Serve one client: echo packages until it disconnects or sends ``b'exit'``.

        The client socket is closed when the session ends, including when
        reading or writing raises. After a session that ended normally, the
        service's event loop is asked to stop, which makes
        :meth:`run_service` shut the whole server down.

        Args:
            reader: Stream reader for the accepted connection.
            writer: Stream writer for the accepted connection.
        """
        try:
            while True:
                data = await self._read_package(reader)
                if not data or data == b'exit':
                    break

                if self.__debug:
                    client = writer.get_extra_info('peername')
                    print('Received from {}: {!r}'.format(client, data))
                # Do process
                writer.write(data)
                await writer.drain()
        finally:
            print('Close the client socket')
            writer.close()

        # No loop is registered when the handler is used outside run_service().
        if self.__loop is not None:
            self.__loop.call_soon_threadsafe(self.__loop.stop)

    def run_service(self, loop: asyncio.AbstractEventLoop):
        """Run the server on *loop* until the loop is stopped, then clean up.

        Blocks the calling thread. The loop is installed as the calling
        thread's current event loop and is closed before this method returns,
        even when binding the listening socket fails.

        Args:
            loop: Event loop that will own the server; it must not be running.
        """
        self.__loop = loop
        asyncio.set_event_loop(loop)

        try:
            # The ``loop=`` keyword of asyncio.start_server was removed in
            # Python 3.10; the coroutine picks up the loop that executes it.
            server = loop.run_until_complete(
                asyncio.start_server(self.handle_accept, self.__addr, self.__port))
            try:
                host = server.sockets[0].getsockname()
                print('Serving on {}. Hit CTRL-C to stop.'.format(host))
                try:
                    loop.run_forever()
                except KeyboardInterrupt:
                    print("CTRL+C")
            finally:
                print('Server shutting down.')
                server.close()
                loop.run_until_complete(server.wait_closed())
        finally:
            loop.close()

    def start_service(self) -> threading.Thread:
        """Start :meth:`run_service` on a new thread with its own event loop.

        Returns:
            The started (non-daemon) thread, so callers can ``join`` it.
        """
        loop = asyncio.new_event_loop()
        worker = threading.Thread(target=self.run_service, args=(loop,))
        worker.start()
        return worker
-
if __name__ == '__main__':
    # Start two independent services; each start_service() call spawns its
    # own thread with its own event loop.
    s1 = AsyncSocketService(address='127.0.0.1', port=5000, required_pkg_size=100)
    s2 = AsyncSocketService(address='127.0.0.1', port=6000, required_pkg_size=100)
    s1.start_service()
    s2.start_service()

    # Keep the main thread idle until the user interrupts it. Only
    # KeyboardInterrupt ends the wait; any other exception is a real error
    # and is allowed to propagate instead of being silently swallowed.
    while True:
        try:
            time.sleep(1.0)
        except KeyboardInterrupt:
            break
|