Rust 定时器 | Rust 技术论坛-江南app体育官方入口
use std::{
    collections::{binary_heap::PeekMut, BinaryHeap},
    future::Future,
    sync::{Arc, Mutex},
    thread::{self, Thread},
    time::{Duration, Instant},
};

use futures::future::BoxFuture;
use thiserror::Error;
/// A one-shot job that the timer fires once its deadline has passed.
///
/// The `Send + 'static` supertraits let boxed jobs be stored in the shared
/// queue and fired from the timer's worker thread.
///
/// The implementations in this file do not run the work inline: they hand it
/// to a new OS thread or to a Tokio runtime.
// NOTE(review): the name is kept exactly as found so the other definitions in
// this file keep resolving; the conventional spelling would be `Swarp` —
// rename file-wide in one change.
#[allow(non_camel_case_types)]
pub trait swarp: Send + 'static {
    /// Consumes the boxed job and starts it.
    fn swarp(self: Box<Self>);
}
impl<f> swarp for f
where
f: fnonce() send 'static,
{
fn swarp(self: box<self>) {
std::thread::spawn(self);
}
}
pub struct tokioswarp {
handle: tokio::runtime::handle,
future: boxfuture<'static, ()>,
}
impl tokioswarp {
pub fn new<f>(handle: tokio::runtime::handle, future: f) -> self
where
f: future<output = ()> send 'static,
{
self {
handle,
future: box::pin(future),
}
}
}
/// Firing a `tokioswarp` spawns its future on the stored runtime handle.
impl swarp for tokioswarp {
    fn swarp(self: Box<Self>) {
        // Move both fields out of the box so the future can be passed by value.
        let Self { handle, future } = *self;
        // The task's join handle is dropped; the result is not awaited here.
        handle.spawn(future);
    }
}
#[derive(debug, error)]
pub enum error {
#[error("io error {}",.0)]
io(
#[source]
#[from]
std::io::error,
),
#[error("{}",.0)]
msg(string),
}
struct pair {
time: instant,
swarp: box<dyn swarp>,
}
impl eq for pair {}
impl ord for pair {
fn cmp(&self, other: &self) -> std::cmp::ordering {
self.time.cmp(&other.time).reverse()
}
}
impl partialeq for pair {
fn eq(&self, other: &self) -> bool {
self.time == other.time
}
}
impl partialord for pair {
fn partial_cmp(&self, other: &self) -> option<std::cmp::ordering> {
some(self.cmp(other))
}
}
struct inner {
heap: binaryheap<pair>,
thread: option<thread>,
}
#[derive(clone)]
pub struct clock {
inner: arc<mutex<inner>>,
}
impl clock {
pub fn new() -> result<self, error> {
let this = self {
inner: arc::new(mutex::new(inner {
heap: binaryheap::new(),
thread: none,
})),
};
let inner = this.inner.clone();
let join_handle = thread::builder::new()
.name("time clock thread".to_string())
.spawn(move || loop {
let mut guard = inner.lock().expect("获取锁异常");
let sleep = 'a: loop {
let now = instant::now();
if let some(data) = guard.heap.peek_mut() {
if data.time <= now {
let pair = peekmut::pop(data);
pair.swarp.swarp();
continue;
}
break 'a data.time.checked_duration_since(now).unwrap_or_default();
} else {
break 'a duration::from_secs(86400 * 365 * 30);
}
};
drop(guard);
thread::park_timeout(sleep);
})?;
let inner = this.inner.clone();
let mut guard = inner
.lock()
.map_err(|e| error::msg(format!("获取锁异常 {:?}", e)))?;
guard.thread = some(join_handle.thread().clone());
ok(this)
}
pub fn push<s>(&self, time: instant, swarp: s) -> result<(), error>
where
s: swarp,
{
let mut guard = self
.inner
.lock()
.map_err(|e| error::msg(format!("获取锁异常 {:?}", e)))?;
guard.heap.push(pair {
time,
swarp: box::new(swarp),
});
if let some(thread) = &guard.thread {
thread.unpark();
}
ok(())
}
}
本作品采用《CC 协议》，转载必须注明作者和本文链接
謎麟