use mio::{Ready, Poll, PollOpt, Token}; use mio::event::Evented; use mio::unix::EventedFd; use std::io; use std::marker::Unpin; #[cfg(not(windows))] use std::os::unix::io::RawFd; use std::pin::Pin; use super::Activated; use super::Packet; use super::Error; use super::State; use super::Capture; pub struct SelectableFd { fd: RawFd } impl Evented for SelectableFd { fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { EventedFd(&self.fd).register(poll, token, interest, opts) } fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> { EventedFd(&self.fd).reregister(poll, token, interest, opts) } fn deregister(&self, poll: &Poll) -> io::Result<()> { EventedFd(&self.fd).deregister(poll) } } pub trait PacketCodec { type Type; fn decode<'a>(&mut self, packet: Packet<'a>) -> Result; } pub struct PacketStream { cap: Capture, fd: tokio::io::PollEvented, codec: C, } impl PacketStream { pub fn new(cap: Capture, fd: RawFd, codec: C) -> Result, Error> { Ok(PacketStream { cap, fd: tokio::io::PollEvented::new(SelectableFd { fd })?, codec }) } } impl<'a, T: Activated + ? Sized + Unpin, C: PacketCodec + Unpin> futures::Stream for PacketStream { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut core::task::Context) -> futures::task::Poll> { let stream = Pin::into_inner(self); let p = match stream.cap.next_noblock(cx, &mut stream.fd) { Ok(t) => t, Err(Error::IoError(ref e)) if *e == ::std::io::ErrorKind::WouldBlock => { return futures::task::Poll::Pending; } Err(e) => return futures::task::Poll::Ready(Some(Err(e))), }; let frame_result = stream.codec.decode(p); futures::task::Poll::Ready(Some(frame_result)) } }