Open
Description
Code
use anyhow::Error;
use arrow::array::{Array, ArrayData, ArrayRef, StringArray};
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use csv::ReaderBuilder;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;
use std::fs::File;
use std::io::{BufReader, Cursor, Read};
use std::ops::Deref;
use std::sync::Arc;
use arrow::ipc::{RecordBatchArgs, RecordBatchBuilder};
use datafusion::row::layout::RowLayout;
use thiserror::Error;
use serde_json::{json, Value};
#[derive(Serialize, Deserialize, Debug)]
struct SchemaTest {
    #[serde(rename(serialize = "firstname"))]
    #[serde(rename(deserialize = "firstname"))]
    first_name: String,
    #[serde(rename(serialize = "lastname"))]
    #[serde(rename(deserialize = "lastname"))]
    last_name: String,
    profession: String,
    id: isize,
}
#[derive(Error, Debug)]
pub enum QueryEngineError {
    #[error("fields unable to serde")]
    SerdeIssue(String),
    #[error("unknown data store error")]
    Unknown,
}
#[tokio::main]
async fn main() -> Result<(), Error> {
    // This is not needed in the crate, but it is needed for testing.
    let file = File::open("./src/myFiles.csv").unwrap();
    let mut buf_reader = BufReader::new(file);
    let mut contents: Vec<u8> = Vec::new();
    buf_reader.read_to_end(&mut contents).unwrap();
    let config =
        SessionConfig::new().set("datafusion.execution.batch_size", ScalarValue::from(100000));
    let ctx = SessionContext::with_config(config);
    // Cursor wraps the in-memory buffer so it implements the Read trait and can be used as a reader.
    let mut cursor = Cursor::new(contents);
    let mut csv_reader = ReaderBuilder::new()
        .has_headers(true)
        .from_reader(&mut cursor);
    let mut header_tracker: HashMap<usize, Vec<ArrayRef>> = HashMap::new();
    let mut header_tracker_key = 0;
    let headers = csv_reader.headers().unwrap();
    headers.iter().for_each(|_| {
        header_tracker.insert(header_tracker_key, Vec::new());
        header_tracker_key += 1
    });
    let fields: Vec<Field> = headers
        .iter()
        .map(|header| Field::new(header, DataType::Utf8, false))
        .collect();
    let schema1 = Arc::new(Schema::new(fields));
    cursor.set_position(0);
    let mut reader = arrow::csv::ReaderBuilder::new(schema1.clone())
        .with_batch_size(4000)
        .has_header(true)
        .build_buffered(cursor)
        .unwrap();
    let mut batches = Vec::new();
    // Each RecordBatch contains one column per CSV column. Because of the reader's batch size,
    // the matching columns from each batch must be propagated together and later combined,
    // so values can be fetched quickly by index later on (see the sketch after this function).
    for batch in reader.into_iter() {
        batches.push(batch.unwrap());
    }
    // reader
    //     .into_iter()
    //     .flat_map(|batch| batch.into_iter())
    //     .for_each(|record_batch| {
    //         println!("{:?}", record_batch);
    //         record_batch.columns().iter().enumerate().for_each(|(index, c)| {
    //             println!("{:?}", c);
    //             header_tracker.entry(index).or_insert(vec![]).push(c.clone());
    //         });
    //     });
    // Register a new MemTable with the context.
    let table = MemTable::try_new(schema1.clone(), vec![batches]).unwrap();
    ctx.register_table("example", Arc::new(table)).unwrap();
    let df = ctx.sql("SELECT * FROM example").await.unwrap();
    let results: Vec<RecordBatch> = df.collect().await.unwrap();
    if results.is_empty() {
        return Ok(());
    }
    let all_data = read_and_process_columns::<SchemaTest>(results.first().unwrap()).await;
    return Ok(());
}
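// Hedged sketch, not part of the repro: arrow ships a `concat_batches` helper that merges
// all RecordBatches sharing a schema into one, which is the "combine the matching columns"
// step described in the comment above the batch loop in main. The helper function name here
// is made up for illustration, and the exact `concat_batches` signature may differ between
// arrow versions.
fn combine_batches(
    schema: &SchemaRef,
    batches: &[RecordBatch],
) -> Result<RecordBatch, arrow::error::ArrowError> {
    // Produces a single RecordBatch with one column per CSV column, so rows can be
    // read by plain index without tracking batch boundaries.
    arrow::compute::concat_batches(schema, batches)
}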
async fn read_and_process_columns<S: 'static>(
    record_batch: &RecordBatch,
) -> Result<Vec<S>, QueryEngineError>
where
    S: Serialize + DeserializeOwned + Debug,
{
    let column_count = record_batch.columns().len();
    let column = record_batch.columns().first().unwrap();
    // TODO: handle large amounts of data; a column could hold 50k+ values.
    // Either spawn only a handful of tokio tasks at a time, wait for them, then spawn more,
    // so parsing stays fast without the crate exhausting resources/memory (tokio tasks are
    // lightweight, but the user should not pay for unbounded spawning), or process
    // sequentially in chunks of 10000 (see the sketch after this function).
    let result = process_batch::<S>(column, record_batch, column_count).await?;
    Ok(result)
}
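// Hedged sketch of the second option from the TODO above (sequential processing in
// fixed-size chunks). The helper name and the 10_000 chunk size are illustrative only
// and not part of the repro.
async fn process_in_chunks<S: 'static>(
    column: &ArrayRef,
    record_batch: &RecordBatch,
    column_count: usize,
) -> Result<Vec<S>, QueryEngineError>
where
    S: Serialize + DeserializeOwned + Debug,
{
    let column_data = column.as_any().downcast_ref::<StringArray>().unwrap();
    let mut rows: Vec<S> = Vec::with_capacity(column_data.len());
    // Walk the rows of the first column in bounded chunks instead of one long pass.
    for chunk_start in (0..column_data.len()).step_by(10_000) {
        let chunk_end = usize::min(chunk_start + 10_000, column_data.len());
        for row_index in chunk_start..chunk_end {
            let value = if column_data.is_null(row_index) {
                String::new()
            } else {
                column_data.value(row_index).to_string()
            };
            rows.push(
                read_and_process_column::<S>(row_index, value, column_count, record_batch)
                    .await?,
            );
        }
    }
    Ok(rows)
}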
#[inline]
async fn process_batch<S: 'static>(
    column: &ArrayRef,
    record_batch: &RecordBatch,
    column_count: usize,
) -> Result<Vec<S>, QueryEngineError>
where
    S: Serialize + DeserializeOwned + Debug,
{
    let mut newish: Vec<S> = Vec::new();
    let column_data = column.as_any().downcast_ref::<StringArray>().unwrap();
    for (column_index, column_value) in column_data.iter().enumerate() {
        newish.push(
            read_and_process_column::<S>(
                column_index,
                detect_empty_column_value(column_value),
                column_count,
                record_batch,
            )
            .await?,
        )
    }
    Ok(newish)
}
fn detect_empty_column_value(column_value: Option<&str>) -> String {
    match column_value {
        Some(value) => value.to_string(),
        None => "".to_string(),
    }
}
#[inline]
async fn read_and_process_column<S: 'static>(
    column_index: usize,
    column_value: String,
    column_count: usize,
    record_batch: &RecordBatch,
) -> Result<S, QueryEngineError>
where
    S: Serialize + DeserializeOwned + Debug,
{
    let headers = get_headers(record_batch.schema());
    let mut data: HashMap<usize, Vec<String>> = HashMap::new();
    data.entry(column_index)
        .or_insert(vec![])
        .push(column_value);
    // Start at the second column; the first column's value was already pushed above.
    // The RecordBatch holds one column with the data of each column from the CSV.
    for i in 1..column_count {
        if let Some(column) = record_batch
            .column(i)
            .as_any()
            .downcast_ref::<StringArray>()
        {
            let val = column.value(column_index);
            data.entry(column_index)
                .or_insert(vec![])
                .push(val.to_string());
        }
    }
    let finish = data.get(&column_index).unwrap().to_owned();
    let final_data: S = finalize_data_transfer::<S>(headers, finish)?;
    Ok(final_data)
}
fn finalize_data_transfer<S: 'static>(
    headers: Vec<String>,
    column_data: Vec<String>,
) -> Result<S, QueryEngineError>
where
    S: Serialize + DeserializeOwned + Debug,
{
    // let serialized_cmn = serde_json::to_string(&column_data)
    //     .map_err(|err| QueryEngineError::SerdeIssue(err.to_string()))
    //     .unwrap();
    // let rows: S = serde_json::from_str(&*serialized_cmn)
    //     .map_err(|err| QueryEngineError::SerdeIssue(err.to_string()))?;
    // println!("{:?}", rows);
    // Zip the header names with this row's values into a JSON object, then deserialize into S.
    let daa = headers
        .iter()
        .zip(column_data.deref().iter())
        .map(|(key, value)| (key.to_owned(), json!(value)))
        .collect::<HashMap<String, serde_json::Value>>();
    let serialized_column: Value =
        serde_json::to_value(&daa).map_err(|err| QueryEngineError::SerdeIssue(err.to_string()))?;
    let row: S = serde_json::from_value(serialized_column)
        .map_err(|err| QueryEngineError::SerdeIssue(err.to_string()))?;
    Ok(row)
}
fn get_headers(schema: SchemaRef) -> Vec<String> {
    let mut headers = Vec::new();
    let fields = schema.fields.as_ref();
    for field in fields.iter() {
        headers.push(field.name().to_string())
    }
    headers
}
Adding the `Debug` bound down the chain of generic async functions is what triggers the ICE.
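For anyone trying to minimize this, here is a sketch that keeps only that shape: a generic async fn awaited in a loop from another generic async fn, with the same `Serialize + DeserializeOwned + Debug + 'static` bounds. The `outer`/`inner` names are made up for illustration, and it has not been confirmed that this smaller version still ICEs.
use serde::{de::DeserializeOwned, Serialize};
use serde_json::json;
use std::fmt::Debug;
// Same bound chain as process_batch -> read_and_process_column, with everything else stripped.
async fn outer<S: 'static>(count: usize) -> Vec<S>
where
    S: Serialize + DeserializeOwned + Debug,
{
    let mut out = Vec::new();
    for i in 0..count {
        out.push(inner::<S>(i).await);
    }
    out
}
async fn inner<S: 'static>(_row: usize) -> S
where
    S: Serialize + DeserializeOwned + Debug,
{
    // Deserialize a dummy value so the bounds are actually exercised.
    serde_json::from_value(json!({})).unwrap()
}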
Meta
rustc --version --verbose:
rustc 1.69.0 (84c898d65 2023-04-16) running on x86_64-pc-windows-msvc
Error output
thread 'rustc' panicked at 'forcing query with already existing `DepNode`
- query-key: Canonical { max_universe: U0, variables: [CanonicalVarInfo { kind: Region(U0) }, CanonicalVarInfo { kind: Region(U0) }, CanonicalVarInfo { kind: Region(U0) }], value: ParamEnvAnd { param_env: ParamEnv { caller_bounds: [Binder(TraitPredicate(<S as std::fmt::Debug>, polarity:Positive), []), Binder(TraitPredicate(<S as _::_serde::de::DeserializeOwned>, polarity:Positive), []), Binder(TraitPredicate(<S as _::_serde::Deserialize<'de>>, polarity:Positive), [Region(BrNamed(DefId(64:1330 ~ serde[ab07]::de::DeserializeOwned::'de), 'de))]), Binder(TraitPredicate(<S as _::_serde::Serialize>, polarity:Positive), []), Binder(TraitPredicate(<S as std::marker::Sized>, polarity:Positive), []), Binder(OutlivesPredicate(S, ReStatic), [])], reveal: UserFacing, constness: NotConst }, value: Normalize { value: [async fn body@src\main.rs:147:1: 162:2] } } }
- dep-node: type_op_normalize_ty(d01b879686a5fa54-891de594ba26c99e)', /rustc/84c898d65adf2f39a5a98507f1fe0ce10a2b8dbc\compiler\rustc_query_system\src\dep_graph\graph.rs:319:9
Backtrace
stack backtrace:
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
error: the compiler unexpectedly panicked. this is a bug.
note: we would appreciate a bug report: https://github.com/rust-lang/rust/issues/new?labels=C-bug%2C+I-ICE%2C+T-compiler&template=ice.md
note: rustc 1.69.0 (84c898d65 2023-04-16) running on x86_64-pc-windows-msvc
note: compiler flags: --crate-type bin -C embed-bitcode=no -C debuginfo=2 -C incremental=[REDACTED]
note: some of the compiler flags provided by cargo are hidden
query stack during panic:
#0 [type_op_normalize_ty] normalizing `[async fn body@src\main.rs:147:1: 162:2]`
#1 [mir_borrowck] borrow-checking `process_batch::{closure#0}`
#2 [mir_borrowck] borrow-checking `process_batch`
#3 [analysis] running analysis passes on this crate
end of query stack
Labels
Area: Async & Await
Area: The rustc query system (https://rustc-dev-guide.rust-lang.org/query.html)
Async-await issues that have been triaged during a working group meeting.
Category: This is a bug.
Call for participation: This issue has a repro, but needs a Minimal Complete and Verifiable Example
Issue: The compiler panicked, giving an Internal Compilation Error (ICE) ❄️
Relevant to the compiler team, which will review and decide on the PR/issue.