ICE: type_op_normalize_ty: forcing query with already existing DepNode #112054

Open
@TimKotowski

Description

Code

use anyhow::Error;
use arrow::array::{Array, ArrayData, ArrayRef, StringArray};
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use csv::ReaderBuilder;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Debug;
use std::fs::File;
use std::io::{BufReader, Cursor, Read};
use std::ops::Deref;
use std::sync::Arc;
use arrow::ipc::{RecordBatchArgs, RecordBatchBuilder};
use datafusion::row::layout::RowLayout;
use thiserror::Error;
use serde_json::{json, Value};

#[derive(Serialize, Deserialize, Debug)]
struct SchemaTest {
    #[serde(rename(serialize = "firstname"))]
    #[serde(rename(deserialize = "firstname"))]
    first_name: String,

    #[serde(rename(serialize = "lastname"))]
    #[serde(rename(deserialize = "lastname"))]
    last_name: String,

    profession: String,

    id: isize
}

#[derive(Error, Debug)]
pub enum QueryEngineError {
    #[error("fields unable to serde")]
    SerdeIssue(String),
    #[error("unknown data store error")]
    Unknown,
}

#[tokio::main]
async fn main() -> Result<(), Error> {

    // This is not needed in the crate, but needed for testing.
    let file = File::open("./src/myFiles.csv").unwrap();
    let mut buf_reader = BufReader::new(file);
    let mut contents: Vec<u8> = Vec::new();
    buf_reader.read_to_end(&mut contents).unwrap();

    let config =
        SessionConfig::new().set("datafusion.execution.batch_size", ScalarValue::from(100000));

    let ctx = SessionContext::with_config(config);
    // Cursor wraps the in-memory buffer so it implements the Read trait and can be used as a reader.
    let mut cursor = Cursor::new(contents);
    let mut csv_reader = ReaderBuilder::new()
        .has_headers(true)
        .from_reader(&mut cursor);

    let mut header_tracker: HashMap<usize, Vec<ArrayRef>> = HashMap::new();
    let mut header_tracker_key = 0;
    let headers = csv_reader.headers().unwrap();
    headers.iter().for_each(|_| {
        header_tracker.insert(header_tracker_key, Vec::new());
        header_tracker_key += 1
    });

    let fields: Vec<Field> = headers
        .iter()
        .map(|header| Field::new(header, DataType::Utf8, false))
        .collect();
    let schema1 = Arc::new(Schema::new(fields));

    cursor.set_position(0);
    let mut reader = arrow::csv::ReaderBuilder::new(schema1.clone())
        .with_batch_size(4000)
        .has_header(true)
        .build_buffered(cursor)
        .unwrap();

    let mut batches = Vec::new();
    // Each RecordBatch contains one column per CSV column. Because of the batch size set on the
    // reader, we must collect the matching columns together and later combine them so that fast
    // index lookups are possible down the line.

    for batch in reader.into_iter() {
        batches.push(batch.unwrap());
    }
    // reader
    //     .into_iter()
    //     .flat_map(|batch| batch.into_iter())
    //     .for_each(|record_batch| {
    //         println!("{:?}", record_batch);
    //         record_batch.columns().iter().enumerate().for_each(|(index, c)| {
    //             println!("{:?}", c);
    //                 header_tracker.entry(index).or_insert(vec![]).push(c.clone());
    //         });
    //     });


    // Register a new MemTable with the context
    let table = MemTable::try_new(schema1.clone(), vec![batches]).unwrap();

    ctx.register_table("example", Arc::new(table)).unwrap();
    let df = ctx.sql("SELECT * FROM example").await.unwrap();

    let results: Vec<RecordBatch> = df.collect().await.unwrap();
    if results.is_empty() {
        return Ok(());
    }

    let all_data = read_and_process_columns::<SchemaTest>(results.first().unwrap()).await;

    return Ok(());
}

async fn read_and_process_columns<S: 'static>(
    record_batch: &RecordBatch,
) -> Result<Vec<S>, QueryEngineError>
where
    S: Serialize + DeserializeOwned + Debug
{
    let column_count = record_batch.columns().len();
    let column = record_batch.columns().first().unwrap();

    // TODO: handle large amounts of data; a column could hold 50k+ values.
    // Either spawn only 4 tokio tasks at a time, wait for them, then spawn more to keep
    // data parsing fast. Tokio tasks are lightweight, but users shouldn't feel that this
    // crate exhausts resources/memory.
    // Alternatively, process sequentially in batches of 10000.
    let result = process_batch::<S>(column, record_batch, column_count).await?;
    Ok(result)
}

#[inline]
async fn process_batch<S: 'static>(
    column: &ArrayRef,
    record_batch: &RecordBatch,
    column_count: usize,
) -> Result<Vec<S>, QueryEngineError>
where
    S: Serialize + DeserializeOwned + Debug
{
    let mut newish: Vec<S> = Vec::new();
    let column_data = column.as_any().downcast_ref::<StringArray>().unwrap();
    for (column_index, column_value) in column_data.iter().enumerate() {
        newish.push(
            read_and_process_column::<S>(
                column_index,
                detect_empty_column_value(column_value),
                column_count,
                record_batch,
            )
            .await?,
        )
    }
    Ok(newish)
}

fn detect_empty_column_value(column_value: Option<&str>) -> String {
    let column_val = match column_value {
        Some(value) => value.to_string(),
        None => "".to_string(),
    };

    column_val
}

#[inline]
async fn read_and_process_column<S: 'static>(
    column_index: usize,
    column_value: String,
    column_count: usize,
    record_batch: &RecordBatch,
) -> Result<S, QueryEngineError>
where
    S: Serialize + DeserializeOwned + Debug
{
    let headers = get_headers(record_batch.schema());
    let mut data: HashMap<usize, Vec<String>> = HashMap::new();
    data.entry(column_index)
        .or_insert(vec![])
        .push(column_value);

    // Start at the second column.
    // The RecordBatch contains one column holding the data of each column from the CSV.
    for i in 1..column_count {
        if let Some(column) = record_batch
            .column(i)
            .as_any()
            .downcast_ref::<StringArray>()
        {
            let val = column.value(column_index);
            data.entry(column_index)
                .or_insert(vec![])
                .push(val.to_string());
        }
    }

    let finish = data.get(&column_index).unwrap().to_owned();
    let final_data: S = finalize_data_transfer::<S>(headers, finish)?;

    Ok(final_data)
}

fn finalize_data_transfer<S: 'static>(
    headers: Vec<String>,
    column_data: Vec<String>,
) -> Result<S, QueryEngineError>
where
    S: Serialize + DeserializeOwned + Debug
{


    // let serialized_cmn  =
    //     serde_json::to_string(&column_data).map_err(|err| QueryEngineError::SerdeIssue(err.to_string())).unwrap();

    // let rows: S = serde_json::from_str(&*serialized_cmn)
    //     .map_err(|err| QueryEngineError::SerdeIssue(err.to_string()))?;

    // println!("LASJ:LDSJLD:S {:?}", rows);
    let daa = headers
        .iter()
        .zip(column_data.deref().iter())
        .map(|(key, value)| (key.to_owned(), json!(value)))
        .collect::<HashMap<String, serde_json::Value>>();

    let serialized_column: Value =
        serde_json::to_value(&daa).map_err(|err| QueryEngineError::SerdeIssue(err.to_string()))?;

    let row: S = serde_json::from_value(serialized_column)
        .map_err(|err| QueryEngineError::SerdeIssue(err.to_string()))?;

    Ok(row)
}

fn get_headers(schema: SchemaRef) -> Vec<String> {
    let mut headers = Vec::new();
    let fields = schema.fields.as_ref();
    for field in fields.iter() {
        headers.push(field.name().to_string())
    }

    headers
}

Adding the Debug bound down the chain of generic async functions is what triggers the issue.
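
Since the full program above pulls in arrow, datafusion, and serde, a reduced reproduction is still needed (see the E-needs-mcve label below). The sketch that follows is only a guess at the shape of the failing pattern: two generic async functions where the shared type parameter carries a Debug bound, mirroring process_batch and read_and_process_column. The function names are made up and the snippet has not been verified to reproduce the ICE.

use std::fmt::Debug;

// Hypothetical reduced form of the pattern above (untested; not confirmed
// to reproduce the ICE): a generic async fn awaiting another generic async
// fn, where the shared type parameter carries a Debug bound.
async fn process_all<S: Debug + 'static>(items: Vec<String>) -> Vec<S> {
    let mut out = Vec::new();
    for item in items {
        out.push(process_one::<S>(item).await);
    }
    out
}

async fn process_one<S: Debug + 'static>(_item: String) -> S {
    unimplemented!()
}

fn main() {}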

Meta

rustc --version --verbose:

1.69.0

Error output

thread 'rustc' panicked at 'forcing query with already existing `DepNode`
- query-key: Canonical { max_universe: U0, variables: [CanonicalVarInfo { kind: Region(U0) }, CanonicalVarInfo { kind: Region(U0) }, CanonicalVarInfo { kind: Region(U0) }], value: ParamEnvAnd { param_env: ParamEnv { caller_bounds: [Binder(TraitPredicate(<S as std::fmt::Debug>, polarity:Positive), []), Binder(TraitPredicate(<S as _::_serde::de::DeserializeOwned>, polarity:Positive), []), Binder(TraitPredicate(<S as _::_serde::Deserialize<'de>>, polarity:Positive), [Region(BrNamed(DefId(64:1330 ~ serde[ab07]::de::DeserializeOwned::'de), 'de))]), Binder(TraitPredicate(<S as _::_serde::Serialize>, polarity:Positive), []), Binder(TraitPredicate(<S as std::marker::Sized>, polarity:Positive), []), Binder(OutlivesPredicate(S, ReStatic), [])], reveal: UserFacing, constness: NotConst }, value: Normalize { value: [async fn body@src\main.rs:147:1: 162:2] } } }
- dep-node: type_op_normalize_ty(d01b879686a5fa54-891de594ba26c99e)', /rustc/84c898d65adf2f39a5a98507f1fe0ce10a2b8dbc\compiler\rustc_query_system\src\dep_graph\graph.rs:319:9
Backtrace

stack backtrace:
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
error: the compiler unexpectedly panicked. this is a bug.
note: we would appreciate a bug report: https://github.com/rust-lang/rust/issues/new?labels=C-bug%2C+I-ICE%2C+T-compiler&template=ice.md
note: rustc 1.69.0 (84c898d65 2023-04-16) running on x86_64-pc-windows-msvc
note: compiler flags: --crate-type bin -C embed-bitcode=no -C debuginfo=2 -C incremental=[REDACTED]
note: some of the compiler flags provided by cargo are hidden
query stack during panic:
#0 [type_op_normalize_ty] normalizing `[async fn body@src\main.rs:147:1: 162:2]`
#1 [mir_borrowck] borrow-checking `process_batch::{closure#0}`
#2 [mir_borrowck] borrow-checking `process_batch`
#3 [analysis] running analysis passes on this crate
end of query stack

    Labels

    A-async-await (Area: Async & Await)
    A-query-system (Area: The rustc query system, https://rustc-dev-guide.rust-lang.org/query.html)
    AsyncAwait-Triaged (Async-await issues that have been triaged during a working group meeting.)
    C-bug (Category: This is a bug.)
    E-needs-mcve (Call for participation: This issue has a repro, but needs a Minimal Complete and Verifiable Example)
    I-ICE (Issue: The compiler panicked, giving an Internal Compilation Error (ICE) ❄️)
    T-compiler (Relevant to the compiler team, which will review and decide on the PR/issue.)
