Skip to content

Commit 0cd175b

Browse files
committed
Add FilesystemStore
We upstream the `FilesystemStore` implementation, which is backwards compatible with `lightning-persister::FilesystemPersister`.
1 parent 05e0a3c commit 0cd175b

File tree

4 files changed

+438
-0
lines changed

4 files changed

+438
-0
lines changed

lightning-persister/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ libc = "0.2"
2020

2121
[target.'cfg(windows)'.dependencies]
2222
winapi = { version = "0.3", features = ["winbase"] }
23+
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
2324

2425
[target.'cfg(ldk_bench)'.dependencies]
2526
criterion = { version = "0.4", optional = true, default-features = false }

lightning-persister/src/fs_store.rs

Lines changed: 374 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,374 @@
1+
//! Objects related to [`FilesystemStore`] live here.
2+
use crate::utils::{check_namespace_key_validity, is_valid_kvstore_str};
3+
4+
use lightning::util::persist::KVStore;
5+
use lightning::util::string::PrintableString;
6+
7+
use std::collections::HashMap;
8+
use std::fs;
9+
use std::io::{Read, Write};
10+
use std::path::{Path, PathBuf};
11+
use std::sync::atomic::{AtomicUsize, Ordering};
12+
use std::sync::{Arc, Mutex, RwLock};
13+
14+
#[cfg(target_os = "windows")]
15+
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
16+
17+
#[cfg(target_os = "windows")]
18+
macro_rules! call {
19+
($e: expr) => {
20+
if $e != 0 {
21+
Ok(())
22+
} else {
23+
Err(std::io::Error::last_os_error())
24+
}
25+
};
26+
}
27+
28+
#[cfg(target_os = "windows")]
29+
fn path_to_windows_str<T: AsRef<OsStr>>(path: T) -> Vec<u16> {
30+
path.as_ref().encode_wide().chain(Some(0)).collect()
31+
}
32+
33+
// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
34+
const GC_LOCK_INTERVAL: usize = 25;
35+
36+
/// A [`KVStore`] implementation that writes to and reads from the file system.
37+
pub struct FilesystemStore {
38+
data_dir: PathBuf,
39+
tmp_file_counter: AtomicUsize,
40+
gc_counter: AtomicUsize,
41+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
42+
}
43+
44+
impl FilesystemStore {
45+
/// Constructs a new [`FilesystemStore`].
46+
pub fn new(data_dir: PathBuf) -> Self {
47+
let locks = Mutex::new(HashMap::new());
48+
let tmp_file_counter = AtomicUsize::new(0);
49+
let gc_counter = AtomicUsize::new(1);
50+
Self { data_dir, tmp_file_counter, gc_counter, locks }
51+
}
52+
53+
/// Returns the data directory.
54+
pub fn get_data_dir(&self) -> PathBuf {
55+
self.data_dir.clone()
56+
}
57+
58+
fn garbage_collect_locks(&self) {
59+
let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);
60+
61+
if gc_counter % GC_LOCK_INTERVAL == 0 {
62+
// Take outer lock for the cleanup.
63+
let mut outer_lock = self.locks.lock().unwrap();
64+
65+
// Garbage collect all lock entries that are not referenced anymore.
66+
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
67+
}
68+
}
69+
}
70+
71+
impl KVStore for FilesystemStore {
72+
fn read(&self, namespace: &str, sub_namespace: &str, key: &str) -> std::io::Result<Vec<u8>> {
73+
check_namespace_key_validity(namespace, sub_namespace, Some(key), "read")?;
74+
75+
let mut dest_file_path = self.data_dir.clone();
76+
dest_file_path.push(namespace);
77+
if !sub_namespace.is_empty() {
78+
dest_file_path.push(sub_namespace);
79+
}
80+
dest_file_path.push(key);
81+
82+
let mut buf = Vec::new();
83+
{
84+
let inner_lock_ref = {
85+
let mut outer_lock = self.locks.lock().unwrap();
86+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
87+
};
88+
let _guard = inner_lock_ref.read().unwrap();
89+
90+
let mut f = fs::File::open(dest_file_path)?;
91+
f.read_to_end(&mut buf)?;
92+
}
93+
94+
self.garbage_collect_locks();
95+
96+
Ok(buf)
97+
}
98+
99+
fn write(&self, namespace: &str, sub_namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
100+
check_namespace_key_validity(namespace, sub_namespace, Some(key), "write")?;
101+
102+
let mut dest_file_path = self.data_dir.clone();
103+
dest_file_path.push(namespace);
104+
if !sub_namespace.is_empty() {
105+
dest_file_path.push(sub_namespace);
106+
}
107+
dest_file_path.push(key);
108+
109+
let parent_directory = dest_file_path
110+
.parent()
111+
.ok_or_else(|| {
112+
let msg =
113+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
114+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
115+
})?;
116+
fs::create_dir_all(&parent_directory)?;
117+
118+
// Do a crazy dance with lots of fsync()s to be overly cautious here...
119+
// We never want to end up in a state where we've lost the old data, or end up using the
120+
// old data on power loss after we've returned.
121+
// The way to atomically write a file on Unix platforms is:
122+
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
123+
let mut tmp_file_path = dest_file_path.clone();
124+
let tmp_file_ext = format!("{}.tmp", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
125+
tmp_file_path.set_extension(tmp_file_ext);
126+
127+
{
128+
let mut tmp_file = fs::File::create(&tmp_file_path)?;
129+
tmp_file.write_all(&buf)?;
130+
tmp_file.sync_all()?;
131+
}
132+
133+
let res = {
134+
let inner_lock_ref = {
135+
let mut outer_lock = self.locks.lock().unwrap();
136+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
137+
};
138+
let _guard = inner_lock_ref.write().unwrap();
139+
140+
#[cfg(not(target_os = "windows"))]
141+
{
142+
fs::rename(&tmp_file_path, &dest_file_path)?;
143+
let dir_file = fs::OpenOptions::new().read(true).open(&parent_directory)?;
144+
dir_file.sync_all()?;
145+
Ok(())
146+
}
147+
148+
#[cfg(target_os = "windows")]
149+
{
150+
let res = if dest_file_path.exists() {
151+
call!(unsafe {
152+
windows_sys::Win32::Storage::FileSystem::ReplaceFileW(
153+
path_to_windows_str(dest_file_path.clone()).as_ptr(),
154+
path_to_windows_str(tmp_file_path).as_ptr(),
155+
std::ptr::null(),
156+
windows_sys::Win32::Storage::FileSystem::REPLACEFILE_IGNORE_MERGE_ERRORS,
157+
std::ptr::null_mut() as *const core::ffi::c_void,
158+
std::ptr::null_mut() as *const core::ffi::c_void,
159+
)
160+
})
161+
} else {
162+
call!(unsafe {
163+
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
164+
path_to_windows_str(tmp_file_path).as_ptr(),
165+
path_to_windows_str(dest_file_path.clone()).as_ptr(),
166+
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
167+
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
168+
)
169+
})
170+
};
171+
172+
match res {
173+
Ok(()) => {
174+
// We fsync the dest file in hopes this will also flush the metadata to disk.
175+
let dest_file = fs::OpenOptions::new().read(true).write(true)
176+
.open(&dest_file_path)?;
177+
dest_file.sync_all()?;
178+
Ok(())
179+
}
180+
Err(e) => Err(e),
181+
}
182+
}
183+
};
184+
185+
self.garbage_collect_locks();
186+
187+
res
188+
}
189+
190+
fn remove(&self, namespace: &str, sub_namespace: &str, key: &str, lazy: bool) -> std::io::Result<()> {
191+
check_namespace_key_validity(namespace, sub_namespace, Some(key), "remove")?;
192+
193+
let mut dest_file_path = self.data_dir.clone();
194+
dest_file_path.push(namespace);
195+
if !sub_namespace.is_empty() {
196+
dest_file_path.push(sub_namespace);
197+
}
198+
dest_file_path.push(key);
199+
200+
if !dest_file_path.is_file() {
201+
return Ok(());
202+
}
203+
204+
{
205+
let inner_lock_ref = {
206+
let mut outer_lock = self.locks.lock().unwrap();
207+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
208+
};
209+
let _guard = inner_lock_ref.write().unwrap();
210+
211+
if lazy {
212+
// If we're lazy we just call remove and be done with it.
213+
fs::remove_file(&dest_file_path)?;
214+
} else {
215+
// If we're not lazy we try our best to persist the updated metadata to ensure
216+
// atomicity of this call.
217+
#[cfg(not(target_os = "windows"))]
218+
{
219+
fs::remove_file(&dest_file_path)?;
220+
221+
let parent_directory = dest_file_path.parent().ok_or_else(|| {
222+
let msg =
223+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
224+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
225+
})?;
226+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
227+
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
228+
// to the inode might get cached (and hence possibly lost on crash), depending on
229+
// the target platform and file system.
230+
//
231+
// In order to assert we permanently removed the file in question we therefore
232+
// call `fsync` on the parent directory on platforms that support it,
233+
dir_file.sync_all()?;
234+
}
235+
236+
#[cfg(target_os = "windows")]
237+
{
238+
// Since Windows `DeleteFile` API is not persisted until the last open file handle
239+
// is dropped, and there seemingly is no reliable way to flush the directory
240+
// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
241+
// file to be deleted to a temporary trash file and remove the latter file
242+
// afterwards.
243+
//
244+
// This should be marginally better, as, according to the documentation,
245+
// `MoveFileExW` APIs should offer stronger persistence guarantees,
246+
// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
247+
// However, all this is partially based on assumptions and local experiments, as
248+
// Windows API is horribly underdocumented.
249+
let mut trash_file_path = dest_file_path.clone();
250+
let trash_file_ext = format!("{}.trash",
251+
self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
252+
trash_file_path.set_extension(trash_file_ext);
253+
254+
call!(unsafe {
255+
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
256+
path_to_windows_str(dest_file_path).as_ptr(),
257+
path_to_windows_str(trash_file_path.clone()).as_ptr(),
258+
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
259+
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
260+
)
261+
})?;
262+
263+
{
264+
// We fsync the trash file in hopes this will also flush the original's file
265+
// metadata to disk.
266+
let trash_file = fs::OpenOptions::new().read(true).write(true)
267+
.open(&trash_file_path.clone())?;
268+
trash_file.sync_all()?;
269+
}
270+
271+
// We're fine if this remove would fail as the trash file will be cleaned up in
272+
// list eventually.
273+
fs::remove_file(trash_file_path).ok();
274+
}
275+
}
276+
}
277+
278+
self.garbage_collect_locks();
279+
280+
Ok(())
281+
}
282+
283+
fn list(&self, namespace: &str, sub_namespace: &str) -> std::io::Result<Vec<String>> {
284+
check_namespace_key_validity(namespace, sub_namespace, None, "list")?;
285+
286+
let mut prefixed_dest = self.data_dir.clone();
287+
prefixed_dest.push(namespace);
288+
if !sub_namespace.is_empty() {
289+
prefixed_dest.push(sub_namespace);
290+
}
291+
292+
let mut keys = Vec::new();
293+
294+
if !Path::new(&prefixed_dest).exists() {
295+
return Ok(Vec::new());
296+
}
297+
298+
for entry in fs::read_dir(&prefixed_dest)? {
299+
let entry = entry?;
300+
let p = entry.path();
301+
302+
if let Some(ext) = p.extension() {
303+
#[cfg(target_os = "windows")]
304+
{
305+
// Clean up any trash files lying around.
306+
if ext == "trash" {
307+
fs::remove_file(p).ok();
308+
continue;
309+
}
310+
}
311+
if ext == "tmp" {
312+
continue;
313+
}
314+
}
315+
316+
let metadata = p.metadata()?;
317+
318+
// We allow the presence of directories in the empty namespace and just skip them.
319+
if metadata.is_dir() {
320+
continue;
321+
}
322+
323+
// If we otherwise don't find a file at the given path something went wrong.
324+
if !metadata.is_file() {
325+
debug_assert!(false, "Failed to list keys of {}/{}: file coulnd't be accessed.",
326+
PrintableString(namespace), PrintableString(sub_namespace));
327+
let msg = format!("Failed to list keys of {}/{}: file couldn't be accessed.",
328+
PrintableString(namespace), PrintableString(sub_namespace));
329+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
330+
}
331+
332+
match p.strip_prefix(&prefixed_dest) {
333+
Ok(stripped_path) => {
334+
if let Some(relative_path) = stripped_path.to_str() {
335+
if is_valid_kvstore_str(relative_path) {
336+
keys.push(relative_path.to_string())
337+
}
338+
} else {
339+
debug_assert!(false, "Failed to list keys of {}/{}: file path is not valid UTF-8",
340+
PrintableString(namespace), PrintableString(sub_namespace));
341+
let msg = format!("Failed to list keys of {}/{}: file path is not valid UTF-8",
342+
PrintableString(namespace), PrintableString(sub_namespace));
343+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
344+
}
345+
}
346+
Err(e) => {
347+
debug_assert!(false, "Failed to list keys of {}/{}: {}",
348+
PrintableString(namespace), PrintableString(sub_namespace), e);
349+
let msg = format!("Failed to list keys of {}/{}: {}",
350+
PrintableString(namespace), PrintableString(sub_namespace), e);
351+
return Err(std::io::Error::new(std::io::ErrorKind::Other, msg));
352+
}
353+
}
354+
}
355+
356+
self.garbage_collect_locks();
357+
358+
Ok(keys)
359+
}
360+
}
361+
362+
#[cfg(test)]
363+
mod tests {
364+
use super::*;
365+
use crate::test_utils::do_read_write_remove_list_persist;
366+
367+
#[test]
368+
fn read_write_remove_list_persist() {
369+
let mut temp_path = std::env::temp_dir();
370+
temp_path.push("test_read_write_remove_list_persist");
371+
let fs_store = FilesystemStore::new(temp_path);
372+
do_read_write_remove_list_persist(&fs_store);
373+
}
374+
}

lightning-persister/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010

1111
#[cfg(ldk_bench)] extern crate criterion;
1212

13+
pub mod fs_store;
14+
15+
mod utils;
16+
1317
#[cfg(test)]
1418
mod test_utils;
1519

0 commit comments

Comments
 (0)