Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Revert "gh-93357: Start porting asyncio server test cases to Isolated…
…AsyncioTestCase (#93369)"

This reverts commit ce8fc18.
  • Loading branch information
arhadthedev committed Oct 7, 2022
commit c17e93019d32d3b306a677a60f226e077d7a4a4c
292 changes: 173 additions & 119 deletions Lib/test/test_asyncio/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,46 @@ def test_exception_cancel(self):
test_utils.run_briefly(self.loop)
self.assertIs(stream._waiter, None)


class NewStreamTests(unittest.IsolatedAsyncioTestCase):

async def test_start_server(self):
def test_start_server(self):

class MyServer:

def __init__(self, loop):
self.server = None
self.loop = loop

async def handle_client(self, client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

def start(self):
sock = socket.create_server(('127.0.0.1', 0))
self.server = self.loop.run_until_complete(
asyncio.start_server(self.handle_client,
sock=sock))
return sock.getsockname()

def handle_client_callback(self, client_reader, client_writer):
self.loop.create_task(self.handle_client(client_reader,
client_writer))

def start_callback(self):
sock = socket.create_server(('127.0.0.1', 0))
addr = sock.getsockname()
sock.close()
self.server = self.loop.run_until_complete(
asyncio.start_server(self.handle_client_callback,
host=addr[0], port=addr[1]))
return addr

def stop(self):
if self.server is not None:
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.server = None

async def client(addr):
reader, writer = await asyncio.open_connection(*addr)
Expand All @@ -581,43 +617,61 @@ async def client(addr):
await writer.wait_closed()
return msgback

async def handle_client(client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

with self.subTest(msg="coroutine"):
server = await asyncio.start_server(
handle_client,
host=socket_helper.HOSTv4
)
addr = server.sockets[0].getsockname()
msg = await client(addr)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

with self.subTest(msg="callback"):
async def handle_client_callback(client_reader, client_writer):
asyncio.get_running_loop().create_task(
handle_client(client_reader, client_writer)
)
# test the server variant with a coroutine as client handler
server = MyServer(self.loop)
addr = server.start()
msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
server.stop()
self.assertEqual(msg, b"hello world!\n")

server = await asyncio.start_server(
handle_client_callback,
host=socket_helper.HOSTv4
)
addr = server.sockets[0].getsockname()
reader, writer = await asyncio.open_connection(*addr)
msg = await client(addr)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")
# test the server variant with a callback as client handler
server = MyServer(self.loop)
addr = server.start_callback()
msg = self.loop.run_until_complete(self.loop.create_task(client(addr)))
server.stop()
self.assertEqual(msg, b"hello world!\n")

self.assertEqual(messages, [])

@socket_helper.skip_unless_bind_unix_socket
async def test_start_unix_server(self):
def test_start_unix_server(self):

class MyServer:

def __init__(self, loop, path):
self.server = None
self.loop = loop
self.path = path

async def handle_client(self, client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

def start(self):
self.server = self.loop.run_until_complete(
asyncio.start_unix_server(self.handle_client,
path=self.path))

def handle_client_callback(self, client_reader, client_writer):
self.loop.create_task(self.handle_client(client_reader,
client_writer))

def start_callback(self):
start = asyncio.start_unix_server(self.handle_client_callback,
path=self.path)
self.server = self.loop.run_until_complete(start)

def stop(self):
if self.server is not None:
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.server = None

async def client(path):
reader, writer = await asyncio.open_unix_connection(path)
Expand All @@ -629,42 +683,64 @@ async def client(path):
await writer.wait_closed()
return msgback

async def handle_client(client_reader, client_writer):
data = await client_reader.readline()
client_writer.write(data)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

with self.subTest(msg="coroutine"):
with test_utils.unix_socket_path() as path:
server = await asyncio.start_unix_server(
handle_client,
path=path
)
msg = await client(path)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")

with self.subTest(msg="callback"):
async def handle_client_callback(client_reader, client_writer):
asyncio.get_running_loop().create_task(
handle_client(client_reader, client_writer)
)

with test_utils.unix_socket_path() as path:
server = await asyncio.start_unix_server(
handle_client_callback,
path=path
)
msg = await client(path)
server.close()
await server.wait_closed()
self.assertEqual(msg, b"hello world!\n")
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

# test the server variant with a coroutine as client handler
with test_utils.unix_socket_path() as path:
server = MyServer(self.loop, path)
server.start()
msg = self.loop.run_until_complete(
self.loop.create_task(client(path)))
server.stop()
self.assertEqual(msg, b"hello world!\n")

# test the server variant with a callback as client handler
with test_utils.unix_socket_path() as path:
server = MyServer(self.loop, path)
server.start_callback()
msg = self.loop.run_until_complete(
self.loop.create_task(client(path)))
server.stop()
self.assertEqual(msg, b"hello world!\n")

self.assertEqual(messages, [])

@unittest.skipIf(ssl is None, 'No ssl module')
async def test_start_tls(self):
def test_start_tls(self):

class MyServer:

def __init__(self, loop):
self.server = None
self.loop = loop

async def handle_client(self, client_reader, client_writer):
data1 = await client_reader.readline()
client_writer.write(data1)
await client_writer.drain()
assert client_writer.get_extra_info('sslcontext') is None
await client_writer.start_tls(
test_utils.simple_server_sslcontext())
assert client_writer.get_extra_info('sslcontext') is not None
data2 = await client_reader.readline()
client_writer.write(data2)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

def start(self):
sock = socket.create_server(('127.0.0.1', 0))
self.server = self.loop.run_until_complete(
asyncio.start_server(self.handle_client,
sock=sock))
return sock.getsockname()

def stop(self):
if self.server is not None:
self.server.close()
self.loop.run_until_complete(self.server.wait_closed())
self.server = None

async def client(addr):
reader, writer = await asyncio.open_connection(*addr)
Expand All @@ -681,48 +757,17 @@ async def client(addr):
await writer.wait_closed()
return msgback1, msgback2

async def handle_client(client_reader, client_writer):
data1 = await client_reader.readline()
client_writer.write(data1)
await client_writer.drain()
assert client_writer.get_extra_info('sslcontext') is None
await client_writer.start_tls(
test_utils.simple_server_sslcontext())
assert client_writer.get_extra_info('sslcontext') is not None

data2 = await client_reader.readline()
client_writer.write(data2)
await client_writer.drain()
client_writer.close()
await client_writer.wait_closed()

server = await asyncio.start_server(
handle_client,
host=socket_helper.HOSTv4
)
addr = server.sockets[0].getsockname()

msg1, msg2 = await client(addr)
server.close()
await server.wait_closed()
self.assertEqual(msg1, b"hello world 1!\n")
self.assertEqual(msg2, b"hello world 2!\n")


class StreamTests2(test_utils.TestCase):

def setUp(self):
super().setUp()
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

def tearDown(self):
# just in case if we have transport close callbacks
test_utils.run_briefly(self.loop)
server = MyServer(self.loop)
addr = server.start()
msg1, msg2 = self.loop.run_until_complete(client(addr))
server.stop()

self.loop.close()
gc.collect()
super().tearDown()
self.assertEqual(messages, [])
self.assertEqual(msg1, b"hello world 1!\n")
self.assertEqual(msg2, b"hello world 2!\n")

@unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
def test_read_all_from_pipe_reader(self):
Expand Down Expand Up @@ -941,20 +986,22 @@ def test_LimitOverrunError_pickleable(self):
self.assertEqual(str(e), str(e2))
self.assertEqual(e.consumed, e2.consumed)

async def test_wait_closed_on_close(self):
async with test_utils.run_test_server() as httpd:
def test_wait_closed_on_close(self):
with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address))

wr.write(b'GET / HTTP/1.0\r\n\r\n')
data = await rd.readline()
f = rd.readline()
data = self.loop.run_until_complete(f)
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
await rd.read()
f = rd.read()
data = self.loop.run_until_complete(f)
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
self.assertFalse(wr.is_closing())
wr.close()
self.assertTrue(wr.is_closing())
await wr.wait_closed()
self.loop.run_until_complete(wr.wait_closed())

def test_wait_closed_on_close_with_unread_data(self):
with test_utils.run_test_server() as httpd:
Expand Down Expand Up @@ -1010,10 +1057,15 @@ async def inner(httpd):

self.assertEqual(messages, [])

async def test_eof_feed_when_closing_writer(self):
def test_eof_feed_when_closing_writer(self):
# See http://bugs.python.org/issue35065
async with test_utils.run_test_server() as httpd:
rd, wr = await asyncio.open_connection(*httpd.address)
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address))

wr.close()
f = wr.wait_closed()
self.loop.run_until_complete(f)
Expand All @@ -1022,6 +1074,8 @@ async def test_eof_feed_when_closing_writer(self):
data = self.loop.run_until_complete(f)
self.assertEqual(data, b'')

self.assertEqual(messages, [])


if __name__ == '__main__':
unittest.main()