Skip to content

Commit 796faba

Browse files
committed
Add FilesystemStore
We upstream the `FilesystemStore` implementation, which is backwards compatible with `lightning-persister::FilesystemPersister`.
1 parent f0cc64f commit 796faba

File tree

2 files changed

+266
-2
lines changed

2 files changed

+266
-2
lines changed

lightning-storage/src/fs_store.rs

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
//! Objects related to [`FilesystemStore`] live here.
2+
#[cfg(target_os = "windows")]
3+
extern crate winapi;
4+
5+
use lightning::util::persist::KVStore;
6+
7+
use std::collections::HashMap;
8+
use std::fs;
9+
use std::io::{BufReader, Read, Write};
10+
use std::path::{Path, PathBuf};
11+
use std::sync::{Arc, Mutex, RwLock};
12+
13+
#[cfg(not(target_os = "windows"))]
14+
use std::os::unix::io::AsRawFd;
15+
16+
use rand::distributions::Alphanumeric;
17+
use rand::{thread_rng, Rng};
18+
19+
#[cfg(target_os = "windows")]
20+
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
21+
22+
#[cfg(target_os = "windows")]
23+
macro_rules! call {
24+
($e: expr) => {
25+
if $e != 0 {
26+
return Ok(());
27+
} else {
28+
return Err(std::io::Error::last_os_error());
29+
}
30+
};
31+
}
32+
33+
#[cfg(target_os = "windows")]
34+
fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<winapi::shared::ntdef::WCHAR> {
35+
path.as_ref().encode_wide().chain(Some(0)).collect()
36+
}
37+
38+
/// A [`KVStore`] implementation that writes to and reads from the file system.
39+
pub struct FilesystemStore {
40+
data_dir: PathBuf,
41+
locks: Mutex<HashMap<(String, String), Arc<RwLock<()>>>>,
42+
}
43+
44+
impl FilesystemStore {
45+
/// Constructs a new [`FilesystemStore`].
46+
pub fn new(data_dir: PathBuf) -> Self {
47+
let locks = Mutex::new(HashMap::new());
48+
Self { data_dir, locks }
49+
}
50+
51+
/// Returns the data directory.
52+
pub fn get_data_dir(&self) -> PathBuf {
53+
self.data_dir.clone()
54+
}
55+
}
56+
57+
impl KVStore for FilesystemStore {
58+
type Reader = FilesystemReader;
59+
60+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
61+
let mut outer_lock = self.locks.lock().unwrap();
62+
let lock_key = (namespace.to_string(), key.to_string());
63+
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
64+
65+
let mut dest_file_path = self.data_dir.clone();
66+
dest_file_path.push(namespace);
67+
dest_file_path.push(key);
68+
FilesystemReader::new(dest_file_path, inner_lock_ref)
69+
}
70+
71+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
72+
let mut outer_lock = self.locks.lock().unwrap();
73+
let lock_key = (namespace.to_string(), key.to_string());
74+
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
75+
let _guard = inner_lock_ref.write().unwrap();
76+
77+
let mut dest_file_path = self.data_dir.clone();
78+
dest_file_path.push(namespace);
79+
dest_file_path.push(key);
80+
81+
let parent_directory = dest_file_path
82+
.parent()
83+
.ok_or_else(|| {
84+
let msg =
85+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
86+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
87+
})?
88+
.to_path_buf();
89+
fs::create_dir_all(parent_directory.clone())?;
90+
91+
// Do a crazy dance with lots of fsync()s to be overly cautious here...
92+
// We never want to end up in a state where we've lost the old data, or end up using the
93+
// old data on power loss after we've returned.
94+
// The way to atomically write a file on Unix platforms is:
95+
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
96+
let mut tmp_file_path = dest_file_path.clone();
97+
let mut rng = thread_rng();
98+
let rand_str: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
99+
let ext = format!("{}.tmp", rand_str);
100+
tmp_file_path.set_extension(ext);
101+
102+
let mut tmp_file = fs::File::create(&tmp_file_path)?;
103+
tmp_file.write_all(&buf)?;
104+
tmp_file.sync_all()?;
105+
106+
#[cfg(not(target_os = "windows"))]
107+
{
108+
fs::rename(&tmp_file_path, &dest_file_path)?;
109+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory.clone())?;
110+
unsafe {
111+
libc::fsync(dir_file.as_raw_fd());
112+
}
113+
}
114+
115+
#[cfg(target_os = "windows")]
116+
{
117+
if dest_file_path.exists() {
118+
unsafe {
119+
winapi::um::winbase::ReplaceFileW(
120+
path_to_windows_str(dest_file_path).as_ptr(),
121+
path_to_windows_str(tmp_file_path).as_ptr(),
122+
std::ptr::null(),
123+
winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS,
124+
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
125+
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
126+
)
127+
};
128+
} else {
129+
call!(unsafe {
130+
winapi::um::winbase::MoveFileExW(
131+
path_to_windows_str(tmp_file_path).as_ptr(),
132+
path_to_windows_str(dest_file_path).as_ptr(),
133+
winapi::um::winbase::MOVEFILE_WRITE_THROUGH
134+
| winapi::um::winbase::MOVEFILE_REPLACE_EXISTING,
135+
)
136+
});
137+
}
138+
}
139+
Ok(())
140+
}
141+
142+
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<()> {
143+
let mut outer_lock = self.locks.lock().unwrap();
144+
let lock_key = (namespace.to_string(), key.to_string());
145+
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key.clone()).or_default());
146+
147+
let _guard = inner_lock_ref.write().unwrap();
148+
149+
let mut dest_file_path = self.data_dir.clone();
150+
dest_file_path.push(namespace);
151+
dest_file_path.push(key);
152+
153+
if !dest_file_path.is_file() {
154+
return Ok(());
155+
}
156+
157+
fs::remove_file(&dest_file_path)?;
158+
#[cfg(not(target_os = "windows"))]
159+
{
160+
let parent_directory = dest_file_path.parent().ok_or_else(|| {
161+
let msg =
162+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
163+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
164+
})?;
165+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
166+
unsafe {
167+
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
168+
// to the inode might get cached (and hence possibly lost on crash), depending on
169+
// the target platform and file system.
170+
//
171+
// In order to assert we permanently removed the file in question we therefore
172+
// call `fsync` on the parent directory on platforms that support it,
173+
libc::fsync(dir_file.as_raw_fd());
174+
}
175+
}
176+
177+
if dest_file_path.is_file() {
178+
return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
179+
}
180+
181+
if Arc::strong_count(&inner_lock_ref) == 2 {
182+
// It's safe to remove the lock entry if we're the only one left holding a strong
183+
// reference. Checking this is necessary to ensure we continue to distribute references to the
184+
// same lock as long as some Readers are around. However, we still want to
185+
// clean up the table when possible.
186+
//
187+
// Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
188+
// around, but is preferable to doing nothing *or* something overly complex such as
189+
// implementing yet another RAII structure just for this pupose.
190+
outer_lock.remove(&lock_key);
191+
}
192+
193+
// Garbage collect all lock entries that are not referenced anymore.
194+
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
195+
196+
Ok(())
197+
}
198+
199+
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
200+
let mut prefixed_dest = self.data_dir.clone();
201+
prefixed_dest.push(namespace);
202+
203+
let mut keys = Vec::new();
204+
205+
if !Path::new(&prefixed_dest).exists() {
206+
return Ok(Vec::new());
207+
}
208+
209+
for entry in fs::read_dir(prefixed_dest.clone())? {
210+
let entry = entry?;
211+
let p = entry.path();
212+
213+
if !p.is_file() {
214+
continue;
215+
}
216+
217+
if let Some(ext) = p.extension() {
218+
if ext == "tmp" {
219+
continue;
220+
}
221+
}
222+
223+
if let Ok(relative_path) = p.strip_prefix(prefixed_dest.clone()) {
224+
keys.push(relative_path.display().to_string())
225+
}
226+
}
227+
228+
Ok(keys)
229+
}
230+
}
231+
232+
/// A buffered [`Read`] implementation as returned from [`FilesystemStore::read`].
233+
pub struct FilesystemReader {
234+
inner: BufReader<fs::File>,
235+
lock_ref: Arc<RwLock<()>>,
236+
}
237+
238+
impl FilesystemReader {
239+
fn new(dest_file_path: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
240+
let f = fs::File::open(dest_file_path.clone())?;
241+
let inner = BufReader::new(f);
242+
Ok(Self { inner, lock_ref })
243+
}
244+
}
245+
246+
impl Read for FilesystemReader {
247+
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
248+
let _guard = self.lock_ref.read().unwrap();
249+
self.inner.read(buf)
250+
}
251+
}
252+
253+
#[cfg(test)]
254+
mod tests {
255+
use super::*;
256+
use crate::test_utils::do_read_write_remove_list_persist;
257+
258+
#[test]
259+
fn read_write_remove_list_persist() {
260+
let temp_path = std::env::temp_dir();
261+
let fs_store = FilesystemStore::new(temp_path);
262+
do_read_write_remove_list_persist(&fs_store);
263+
}
264+
}

lightning-storage/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
#![deny(private_intra_doc_links)]
66

77
#![deny(missing_docs)]
8-
#![deny(unsafe_code)]
9-
108
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
119

10+
pub mod fs_store;
11+
1212
#[cfg(test)]
1313
mod test_utils;

0 commit comments

Comments
 (0)