|
8 | 8 |
|
9 | 9 | import socket |
10 | 10 | import warnings |
| 11 | +import collections |
11 | 12 |
|
12 | 13 | from . import base_events |
13 | 14 | from . import constants |
@@ -327,6 +328,102 @@ def _pipe_closed(self, fut): |
327 | 328 | self.close() |
328 | 329 |
|
329 | 330 |
|
| 331 | +class _ProactorDatagramTransport(_ProactorBasePipeTransport): |
| 332 | + |
| 333 | + def __init__(self, loop, sock, protocol, address=None, |
| 334 | + waiter=None, extra=None): |
| 335 | + super(_ProactorDatagramTransport, self).__init__(loop, sock, protocol, |
| 336 | + waiter=waiter, |
| 337 | + extra=extra) |
| 338 | + self._address = address |
| 339 | + # We don't need to call _protocol.connection_made() since our base |
| 340 | + # constructor does it for us. |
| 341 | + self._buffer = collections.deque() |
| 342 | + self._loop.call_soon(self._loop_reading) |
| 343 | + |
| 344 | + def abort(self): |
| 345 | + self._force_close(None) |
| 346 | + |
| 347 | + def sendto(self, data, addr=None): |
| 348 | + if not isinstance(data, (bytes, bytearray, memoryview)): |
| 349 | + raise TypeError('data argument must be byte-ish (%r)', |
| 350 | + type(data)) |
| 351 | + |
| 352 | + if not data: |
| 353 | + return |
| 354 | + |
| 355 | + if self._conn_lost and self._address: |
| 356 | + # close() or force_close() has been called on the bound endpoint |
| 357 | + return |
| 358 | + |
| 359 | + self._buffer.appendleft((data, addr)) |
| 360 | + |
| 361 | + if self._write_fut is None: |
| 362 | + # No current write operations are active, kick one off |
| 363 | + self._loop_writing() |
| 364 | + else: |
| 365 | + # A write operation is already kicked off |
| 366 | + pass |
| 367 | + |
| 368 | + def _loop_writing(self, fut=None): |
| 369 | + if self._conn_lost: |
| 370 | + return |
| 371 | + |
| 372 | + assert fut is self._write_fut |
| 373 | + if fut: |
| 374 | + # We are in a _loop_writing() done callback, get the result |
| 375 | + fut.result() |
| 376 | + |
| 377 | + if not self._buffer or (self._conn_lost and self._address): |
| 378 | + # The connection has been closed |
| 379 | + self._write_fut = None |
| 380 | + return |
| 381 | + |
| 382 | + data, addr = self._buffer.pop() |
| 383 | + |
| 384 | + self._write_fut = None |
| 385 | + try: |
| 386 | + if self._address: |
| 387 | + self._write_fut = self._loop._proactor.send(self._sock, data) |
| 388 | + else: |
| 389 | + self._write_fut = self._loop._proactor.sendto(self._sock, data, addr=addr) |
| 390 | + except OSError as exc: |
| 391 | + self._protocol.error_received(exc) |
| 392 | + self._fatal_error(exc, 'Fatal error sending UDP datagram') |
| 393 | + else: |
| 394 | + self._write_fut.add_done_callback(self._loop_writing) |
| 395 | + |
| 396 | + def _loop_reading(self, fut=None): |
| 397 | + if self._conn_lost: |
| 398 | + return |
| 399 | + |
| 400 | + assert self._read_fut is fut |
| 401 | + |
| 402 | + if fut: |
| 403 | + res = fut.result() |
| 404 | + |
| 405 | + if self._address: |
| 406 | + data, addr = res, self._address |
| 407 | + else: |
| 408 | + data, addr = res |
| 409 | + |
| 410 | + self._protocol.datagram_received(data, addr) |
| 411 | + |
| 412 | + if self._conn_lost: |
| 413 | + return |
| 414 | + |
| 415 | + try: |
| 416 | + if self._address: |
| 417 | + self._read_fut = self._loop._proactor.recv(self._sock, 4096) |
| 418 | + else: |
| 419 | + self._read_fut = self._loop._proactor.recvfrom(self._sock, 4096) |
| 420 | + except OSError as exc: |
| 421 | + self._protocol.error_received(exc) |
| 422 | + self._fatal_error(exc, "Fatal error reading from UDP endpoint") |
| 423 | + else: |
| 424 | + self._read_fut.add_done_callback(self._loop_reading) |
| 425 | + |
| 426 | + |
330 | 427 | class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport, |
331 | 428 | _ProactorBaseWritePipeTransport, |
332 | 429 | transports.Transport): |
@@ -390,6 +487,11 @@ def _make_socket_transport(self, sock, protocol, waiter=None, |
390 | 487 | return _ProactorSocketTransport(self, sock, protocol, waiter, |
391 | 488 | extra, server) |
392 | 489 |
|
| 490 | + def _make_datagram_transport(self, sock, protocol, |
| 491 | + address=None, waiter=None, extra=None): |
| 492 | + return _ProactorDatagramTransport(self, sock, protocol, address, |
| 493 | + waiter, extra) |
| 494 | + |
393 | 495 | def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None, |
394 | 496 | *, server_side=False, server_hostname=None, |
395 | 497 | extra=None, server=None): |
|
0 commit comments