concurrent reading

This commit is contained in:
Jana Dönszelmann 2026-02-25 15:32:13 +01:00
parent 3d9114dea9
commit 1f6679f57f
No known key found for this signature in database
8 changed files with 104 additions and 84 deletions

View file

@ -4,8 +4,12 @@ use std::{
io::{self, BufRead, BufReader},
mem,
path::{Path, PathBuf},
rc::Rc,
sync::OnceLock,
sync::Arc,
sync::{
OnceLock,
mpsc::{Receiver, sync_channel},
},
thread,
};
use crate::tui::{
@ -13,32 +17,14 @@ use crate::tui::{
processing::LogStream,
};
struct Inner {
pub path: PathBuf,
struct LogFileEntryGenerator {
file: BufReader<File>,
cached_entries: Vec<Rc<LogEntry>>,
}
#[derive(Clone)]
pub struct LogfileReader(Rc<RefCell<Inner>>);
impl LogfileReader {
pub fn new(p: &Path) -> io::Result<Self> {
Ok(Self(Rc::new(RefCell::new(Inner {
file: BufReader::new(File::open(p)?),
path: p.to_path_buf(),
cached_entries: Vec::new(),
}))))
}
pub fn path(&self) -> PathBuf {
self.0.borrow().path.clone()
}
impl LogFileEntryGenerator {
fn next_line(&mut self) -> Option<String> {
let mut res = String::new();
match self.0.borrow_mut().file.read_line(&mut res) {
match self.file.read_line(&mut res) {
Err(e) => {
eprintln!("error: {e:?}");
None
@ -59,14 +45,14 @@ impl LogfileReader {
}
}
fn next_entry(&mut self) -> Option<Rc<LogEntry>> {
let mut stack = Vec::<(RawLogEntry, Vec<Rc<LogEntry>>)>::new();
let mut curr = Vec::<Rc<LogEntry>>::new();
fn next_entry(&mut self) -> Option<Arc<LogEntry>> {
let mut stack = Vec::<(RawLogEntry, Vec<Arc<LogEntry>>)>::new();
let mut curr = Vec::<Arc<LogEntry>>::new();
loop {
let entry = self.next_raw_entry()?;
let new_entry = Rc::new(match entry.fields.message() {
let new_entry = Arc::new(match entry.fields.message() {
Some("enter") => {
stack.push((entry, mem::take(&mut curr)));
continue;
@ -94,14 +80,49 @@ impl LogfileReader {
}
}
}
}
fn fill_buf_to_access_index(&mut self, n: usize) -> Option<Rc<LogEntry>> {
struct Inner {
pub path: PathBuf,
entry_generator: Receiver<Arc<LogEntry>>,
cached_entries: Vec<Arc<LogEntry>>,
}
#[derive(Clone)]
pub struct LogfileReader(Arc<RefCell<Inner>>);
impl LogfileReader {
pub fn new(p: &Path) -> io::Result<Self> {
let (tx, rx) = sync_channel(1000);
let mut generator = LogFileEntryGenerator {
file: BufReader::new(File::open(p)?),
};
thread::spawn(move || {
while let Some(i) = generator.next_entry() {
tx.send(i).unwrap();
}
});
Ok(Self(Arc::new(RefCell::new(Inner {
path: p.to_path_buf(),
cached_entries: Vec::new(),
entry_generator: rx,
}))))
}
pub fn path(&self) -> PathBuf {
self.0.borrow().path.clone()
}
fn fill_buf_to_access_index(&mut self, n: usize) -> Option<Arc<LogEntry>> {
while self.0.borrow().cached_entries.len() <= n {
let entry = self.next_entry()?;
let entry = self.0.borrow().entry_generator.recv().ok()?;
self.0.borrow_mut().cached_entries.push(entry);
}
Some(Rc::clone(&self.0.borrow().cached_entries[n]))
Some(Arc::clone(&self.0.borrow().cached_entries[n]))
}
pub fn iter(&self) -> LogFileReaderStream {
@ -118,14 +139,14 @@ pub struct LogFileReaderStream {
}
impl LogStream for LogFileReaderStream {
fn next(&mut self) -> Option<(Rc<LogEntry>, usize)> {
fn next(&mut self) -> Option<(Arc<LogEntry>, usize)> {
let entry = self.reader.fill_buf_to_access_index(self.curr)?;
self.curr += 1;
Some((entry, 0))
}
fn prev(&mut self) -> Option<(Rc<LogEntry>, usize)> {
fn prev(&mut self) -> Option<(Arc<LogEntry>, usize)> {
if self.curr == 0 {
return None;
}
@ -143,7 +164,7 @@ impl LogStream for LogFileReaderStream {
})
}
fn enclosing_log_entry(&self) -> Option<(Rc<LogEntry>, usize)> {
fn enclosing_log_entry(&self) -> Option<(Arc<LogEntry>, usize)> {
None
}
}