Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "parity-db"
version = "0.4.2"
version = "0.4.3"
authors = ["Parity Technologies <[email protected]>"]
edition = "2021"
license = "MIT OR Apache-2.0"
Expand Down
82 changes: 59 additions & 23 deletions src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,10 @@ impl HashColumn {
if let Some(table) = IndexTable::open_existing(path, id)? {
if top.is_none() {
stats = table.load_stats()?;
log::trace!(target: "parity-db", "Opened main index {}", table.id);
top = Some(table);
} else {
log::trace!(target: "parity-db", "Opened stale index {}", table.id);
reindexing.push_front(table);
}
}
Expand Down Expand Up @@ -409,24 +411,25 @@ impl HashColumn {

fn write_reindex_plan_locked<'a, 'b>(
&self,
tables: RwLockUpgradableReadGuard<'a, Tables>,
reindex: RwLockUpgradableReadGuard<'b, Reindex>,
mut tables: RwLockUpgradableReadGuard<'a, Tables>,
mut reindex: RwLockUpgradableReadGuard<'b, Reindex>,
key: &Key,
address: Address,
log: &mut LogWriter,
) -> Result<PlanOutcome> {
if Self::search_index(key, &tables.index, &tables, log)?.is_some() {
if Self::contains_partial_key_with_address(key, address, &tables.index, log)? {
log::trace!(target: "parity-db", "{}: Skipped reindex entry {} when reindexing", tables.index.id, hex(key));
return Ok(PlanOutcome::Skipped)
}
match tables.index.write_insert_plan(key, address, None, log)? {
PlanOutcome::NeedReindex => {
log::debug!(target: "parity-db", "{}: Index chunk full {} when reindexing", tables.index.id, hex(key));
let (tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
self.write_reindex_plan_locked(tables, reindex, key, address, log)?;
Ok(PlanOutcome::NeedReindex)
},
_ => Ok(PlanOutcome::Written),
let mut outcome = PlanOutcome::Written;
while let PlanOutcome::NeedReindex =
tables.index.write_insert_plan(key, address, None, log)?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be sure.
Looping is just in case there is some concurrent reindexing leading to multiple reindex triggers (could happen, but probably rare in practice).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it should never happen realistically. Insertion into an empty index should always be successful. This code had a small bug. An entry in the value table was created twice if reindex was triggered.

{
log::info!(target: "parity-db", "{}: Index chunk full {} when reindexing", tables.index.id, hex(key));
(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
outcome = PlanOutcome::NeedReindex;
}
Ok(outcome)
}

fn search_index<'a>(
Expand Down Expand Up @@ -455,6 +458,25 @@ impl HashColumn {
Ok(None)
}

fn contains_partial_key_with_address<'a>(
	key: &Key,
	address: Address,
	index: &'a IndexTable,
	log: &LogWriter,
) -> Result<bool> {
	// Walk the chain of index entries matching this partial key and report
	// whether any of them points at exactly `address`.
	let (mut entry, mut sub_index) = index.get(key, 0, log)?;
	loop {
		if entry.is_empty() {
			// End of the chain reached without finding the address.
			return Ok(false)
		}
		if entry.address(index.id.index_bits()) == address {
			return Ok(true)
		}
		// Advance to the next candidate entry for the same key.
		let (next_entry, next_sub_index) = index.get(key, sub_index + 1, log)?;
		entry = next_entry;
		sub_index = next_sub_index;
	}
}

fn search_all_indexes<'a>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes me think search_all_indexes could be called search_index, and the current search_index renamed to something a bit more technical, since the first idea one has is to search all indexes.
But really not needed.

key: &Key,
tables: &'a Tables,
Expand Down Expand Up @@ -548,8 +570,8 @@ impl HashColumn {

fn write_plan_new<'a, 'b>(
&self,
tables: RwLockUpgradableReadGuard<'a, Tables>,
reindex: RwLockUpgradableReadGuard<'b, Reindex>,
mut tables: RwLockUpgradableReadGuard<'a, Tables>,
mut reindex: RwLockUpgradableReadGuard<'b, Reindex>,
key: &Key,
value: &[u8],
log: &mut LogWriter,
Expand All @@ -567,15 +589,15 @@ impl HashColumn {
log,
stats,
)?;
match tables.index.write_insert_plan(key, address, None, log)? {
PlanOutcome::NeedReindex => {
log::debug!(target: "parity-db", "{}: Index chunk full {}", tables.index.id, hex(key));
let (tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
let (_, t, r) = self.write_plan_new(tables, reindex, key, value, log)?;
Ok((PlanOutcome::NeedReindex, t, r))
},
_ => Ok((PlanOutcome::Written, tables, reindex)),
let mut outcome = PlanOutcome::Written;
while let PlanOutcome::NeedReindex =
tables.index.write_insert_plan(key, address, None, log)?
{
log::debug!(target: "parity-db", "{}: Index chunk full {}", tables.index.id, hex(key));
(tables, reindex) = Self::trigger_reindex(tables, reindex, self.path.as_path());
outcome = PlanOutcome::NeedReindex;
}
Ok((outcome, tables, reindex))
}

pub fn enact_plan(&self, action: LogAction, log: &mut LogReader) -> Result<()> {
Expand Down Expand Up @@ -620,14 +642,20 @@ impl HashColumn {
} else if let Some(table) = reindex.queue.iter().find(|r| r.id == record.table) {
table.validate_plan(record.index, log)?;
} else {
if record.table.index_bits() < tables.index.id.index_bits() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember deriving Ord the other day when looking at this; just realized it may have been plain wrong (the ordering logic between different indexes may just be bad).

// Insertion into a previously dropped index.
log::warn!( target: "parity-db", "Index {} is too old. Current is {}", record.table, tables.index.id);
return Err(Error::Corruption("Unexpected log index id".to_string()))
}
// Re-launch previously started reindex
// TODO: add explicit log records for reindexing events.
log::warn!(
target: "parity-db",
"Missing table {}, starting reindex",
record.table,
);
let _lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
let lock = Self::trigger_reindex(tables, reindex, self.path.as_path());
std::mem::drop(lock);
return self.validate_plan(LogAction::InsertIndex(record), log)
}
},
Expand Down Expand Up @@ -723,7 +751,7 @@ impl HashColumn {
});
f(state).unwrap_or(false)
})?;
log::debug!( target: "parity-db", "{}: Done Iterating table {}", source.id, table.id);
log::debug!( target: "parity-db", "{}: Done iterating table {}", source.id, table.id);
}
}

Expand Down Expand Up @@ -1086,4 +1114,12 @@ impl Column {
Column::Tree(_column) => Ok(()),
}
}

#[cfg(test)]
pub fn index_bits(&self) -> Option<u8> {
	// Test-only helper: current number of index bits of a hash column's
	// main index table. Btree columns have no hash index, so `None`.
	match self {
		Column::Hash(column) => {
			let tables = column.tables.read();
			Some(tables.index.id.index_bits())
		},
		Column::Tree(_) => None,
	}
}
}
77 changes: 71 additions & 6 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ impl DbInner {
}

fn start_reindex(&self, record_id: u64) {
log::trace!(target: "parity-db", "Scheduled reindex at record {}", record_id);
self.next_reindex.store(record_id, Ordering::SeqCst);
}

Expand Down Expand Up @@ -528,7 +529,7 @@ impl DbInner {
if let Some(mut reader) = reader {
log::debug!(
target: "parity-db",
"Enacting log {}",
"Enacting log record {}",
reader.record_id(),
);
if validation_mode {
Expand Down Expand Up @@ -907,18 +908,23 @@ impl Db {
})
}

/// Get a value in a specified column by key. Returns `None` if the key does not exist.
pub fn get(&self, col: ColId, key: &[u8]) -> Result<Option<Value>> {
self.inner.get(col, key)
}

/// Get value size by key. Returns `None` if the key does not exist.
pub fn get_size(&self, col: ColId, key: &[u8]) -> Result<Option<u32>> {
self.inner.get_size(col, key)
}

/// Iterate over all ordered key-value pairs. Only supported for columns configured with
/// `btree_indexed`.
pub fn iter(&self, col: ColId) -> Result<BTreeIterator> {
self.inner.btree_iter(col)
}

/// Commit a set of changes to the database.
pub fn commit<I, K>(&self, tx: I) -> Result<()>
where
I: IntoIterator<Item = (ColId, K, Option<Value>)>,
Expand All @@ -927,6 +933,7 @@ impl Db {
self.inner.commit(tx)
}

/// Commit a set of changes to the database.
pub fn commit_changes<I>(&self, tx: I) -> Result<()>
where
I: IntoIterator<Item = (ColId, Operation<Vec<u8>, Vec<u8>>)>,
Expand All @@ -938,15 +945,14 @@ impl Db {
self.inner.commit_raw(commit)
}

/// Returns the number of columns in the database.
pub fn num_columns(&self) -> u8 {
self.inner.columns.len() as u8
}

pub(crate) fn iter_column_while(
&self,
c: ColId,
f: impl FnMut(IterState) -> bool,
) -> Result<()> {
/// Iterate a column and call a function for each value. Iteration order is unspecified. Note
/// that for hash columns the key is the hash of the original key.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add that this iteration skips the commit overlay (iirc this is on DbInner so users should not be calling it, but it would still be good to note).
Could also say that this is only for hash indexed columns (btree is unimplemented since we have an iterator, but it could actually probably be supported).

pub fn iter_column_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
self.inner.iter_column_while(c, f)
}

Expand Down Expand Up @@ -1018,6 +1024,7 @@ impl Db {
self.inner.clear_stats(column)
}

/// Print database contents in text form to stderr.
pub fn dump(&self, check_param: check::CheckOptions) -> Result<()> {
if let Some(col) = check_param.column {
self.inner.columns[col as usize].dump(&self.inner.log, &check_param, col)?;
Expand All @@ -1029,6 +1036,7 @@ impl Db {
Ok(())
}

/// Get database statistics.
pub fn stats(&self) -> StatSummary {
self.inner.stats()
}
Expand Down Expand Up @@ -2424,4 +2432,61 @@ mod tests {
assert!(db.get(0, &[0]).unwrap().is_some());
}
}

#[cfg(feature = "instrumentation")]
#[test]
fn test_continue_reindex() {
let _ = env_logger::try_init();
let tmp = tempdir().unwrap();
let mut options = Options::with_columns(tmp.path(), 1);
options.columns[0].preimage = true;
options.columns[0].uniform = true;
options.always_flush = true;
options.with_background_thread = false;
options.salt = Some(Default::default());

{
// Force a reindex by committing more than 64 values with the same 16 bit prefix
let db = Db::open_or_create(&options).unwrap();
let commit: Vec<_> = (0..65u32)
.map(|index| {
let mut key = [0u8; 32];
key[2] = (index as u8) << 1;
(0, key.to_vec(), Some(vec![index as u8]))
})
.collect();
db.commit(commit).unwrap();

db.process_commits().unwrap();
db.flush_logs().unwrap();
db.enact_logs().unwrap();
// i16 now contains 64 values and i17 contains a single value that did not fit

// Simulate interrupted reindex by processing it first and then restoring the old index
// file. Make a copy of the index file first.
std::fs::copy(tmp.path().join("index_00_16"), tmp.path().join("index_00_16.bak"))
.unwrap();
db.process_reindex().unwrap();
db.flush_logs().unwrap();
db.enact_logs().unwrap();
db.clean_logs().unwrap();
std::fs::rename(tmp.path().join("index_00_16.bak"), tmp.path().join("index_00_16"))
.unwrap();
}

// Reopen the database which should load the reindex.
{
let db = Db::open(&options).unwrap();
db.process_reindex().unwrap();
let mut entries = 0;
db.iter_column_while(0, |_| {
entries += 1;
true
})
.unwrap();

assert_eq!(entries, 65);
assert_eq!(db.inner.columns[0].index_bits(), Some(17));
}
}
}
22 changes: 13 additions & 9 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,27 @@ impl std::error::Error for Error {
}

#[cfg(feature = "instrumentation")]
pub static IO_COUNTER_BEFORE_ERROR: AtomicUsize = AtomicUsize::new(usize::MAX);
thread_local! {
pub static IO_COUNTER_BEFORE_ERROR: AtomicUsize = AtomicUsize::new(usize::MAX);
}

#[cfg(feature = "instrumentation")]
pub fn set_number_of_allowed_io_operations(val: usize) {
IO_COUNTER_BEFORE_ERROR.store(val, Ordering::Relaxed);
IO_COUNTER_BEFORE_ERROR.with(|v| v.store(val, Ordering::Relaxed));
}

#[cfg(feature = "instrumentation")]
macro_rules! try_io {
($e:expr) => {{
if crate::error::IO_COUNTER_BEFORE_ERROR
.fetch_update(
::std::sync::atomic::Ordering::SeqCst,
::std::sync::atomic::Ordering::SeqCst,
|v| Some(v.saturating_sub(1)),
)
.unwrap() == 0
if crate::error::IO_COUNTER_BEFORE_ERROR.with(|value| {
value
.fetch_update(
::std::sync::atomic::Ordering::SeqCst,
::std::sync::atomic::Ordering::SeqCst,
|v| Some(v.saturating_sub(1)),
)
.unwrap()
}) == 0
{
Err(crate::error::Error::Io(::std::io::Error::new(
::std::io::ErrorKind::Other,
Expand Down
4 changes: 2 additions & 2 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ impl TableId {

impl std::fmt::Display for TableId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:02}-{:02}", self.col(), self.index_bits())
write!(f, "i{:02}-{:02}", self.col(), self.index_bits())
}
}

Expand Down Expand Up @@ -353,7 +353,7 @@ impl IndexTable {
return Ok(PlanOutcome::Written)
}
}
log::trace!(target: "parity-db", "{}: Full at {}", self.id, chunk_index);
log::trace!(target: "parity-db", "{}: Index chunk full at {}", self.id, chunk_index);
Ok(PlanOutcome::NeedReindex)
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod stats;
mod table;

pub use btree::BTreeIterator;
pub use column::ColId;
pub use compress::CompressionType;
pub use db::{check::CheckOptions, Db, Operation, Value};
#[cfg(feature = "instrumentation")]
Expand Down
5 changes: 3 additions & 2 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl TableId {

impl std::fmt::Display for TableId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:02}-{:02}", self.col(), hex(&[self.size_tier()]))
write!(f, "t{:02}-{:02}", self.col(), hex(&[self.size_tier()]))
}
}

Expand Down Expand Up @@ -475,7 +475,7 @@ impl ValueTable {
self.id,
index,
k,
to_fetch,
to_fetch.as_ref().map(hex),
self.entry_size,
);
return Ok((0, false))
Expand Down Expand Up @@ -869,6 +869,7 @@ impl ValueTable {
let mut header = Header::default();
log.read(&mut header.0)?;
self.file.write_at(&header.0, 0)?;
log::trace!(target: "parity-db", "{}: Enacted header, {} filled", self.id, header.filled());
return Ok(())
}

Expand Down