@@ -31,9 +31,11 @@ use datafusion::common::UnnestOptions;
 use datafusion::config::{CsvOptions, TableParquetOptions};
 use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
 use datafusion::datasource::TableProvider;
+use datafusion::error::DataFusionError;
 use datafusion::execution::SendableRecordBatchStream;
 use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
 use datafusion::prelude::*;
+use futures::{StreamExt, TryStreamExt};
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
 use pyo3::pybacked::PyBackedStr;
@@ -70,6 +72,9 @@ impl PyTableProvider {
         PyTable::new(table_provider)
     }
 }
+const MAX_TABLE_BYTES_TO_DISPLAY: usize = 2 * 1024 * 1024; // 2 MB
+const MIN_TABLE_ROWS_TO_DISPLAY: usize = 20;
+const MAX_LENGTH_CELL_WITHOUT_MINIMIZE: usize = 25;
 
 /// A PyDataFrame is a representation of a logical plan and an API to compose statements.
 /// Use it to build a plan and `.collect()` to execute the plan and collect the result.
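
The three new limits above are consumed by the `collect_record_batches_to_display` helper added at the bottom of this diff. A sketch of the two call patterns (simplified from the method bodies below; async context and error handling elided, so this is illustrative rather than runnable on its own):

    // __repr__ pins the row count by setting min_rows == max_rows.
    let (batches, has_more) =
        collect_record_batches_to_display(df.clone(), 10, 10).await?;

    // _repr_html_ sets no row cap; the 2 MB byte budget
    // (MAX_TABLE_BYTES_TO_DISPLAY) is what triggers truncation instead.
    let (batches, has_more) =
        collect_record_batches_to_display(df, MIN_TABLE_ROWS_TO_DISPLAY, usize::MAX).await?;
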
@@ -111,56 +116,151 @@ impl PyDataFrame {
     }
 
     fn __repr__(&self, py: Python) -> PyDataFusionResult<String> {
-        let df = self.df.as_ref().clone().limit(0, Some(10))?;
-        let batches = wait_for_future(py, df.collect())?;
-        let batches_as_string = pretty::pretty_format_batches(&batches);
-        match batches_as_string {
-            Ok(batch) => Ok(format!("DataFrame()\n{batch}")),
-            Err(err) => Ok(format!("Error: {:?}", err.to_string())),
+        let (batches, has_more) = wait_for_future(
+            py,
+            collect_record_batches_to_display(self.df.as_ref().clone(), 10, 10),
+        )?;
+        if batches.is_empty() {
+            // This should not be reached, but do it for safety since we index into the vector below
+            return Ok("No data to display".to_string());
         }
-    }
 
-    fn _repr_html_(&self, py: Python) -> PyDataFusionResult<String> {
-        let mut html_str = "<table border='1'>\n".to_string();
+        let batches_as_displ =
+            pretty::pretty_format_batches(&batches).map_err(py_datafusion_err)?;
+
+        let additional_str = match has_more {
+            true => "\nData truncated.",
+            false => "",
+        };
 
-        let df = self.df.as_ref().clone().limit(0, Some(10))?;
-        let batches = wait_for_future(py, df.collect())?;
+        Ok(format!("DataFrame()\n{batches_as_displ}{additional_str}"))
+    }
 
+    fn _repr_html_(&self, py: Python) -> PyDataFusionResult<String> {
+        let (batches, has_more) = wait_for_future(
+            py,
+            collect_record_batches_to_display(
+                self.df.as_ref().clone(),
+                MIN_TABLE_ROWS_TO_DISPLAY,
+                usize::MAX,
+            ),
+        )?;
         if batches.is_empty() {
-            html_str.push_str("</table>\n");
-            return Ok(html_str);
+            // This should not be reached, but do it for safety since we index into the vector below
+            return Ok("No data to display".to_string());
         }
 
+        let table_uuid = uuid::Uuid::new_v4().to_string();
+
+        let mut html_str = "
+            <style>
+            .expandable-container {
+                display: inline-block;
+                max-width: 200px;
+            }
+            .expandable {
+                white-space: nowrap;
+                overflow: hidden;
+                text-overflow: ellipsis;
+                display: block;
+            }
+            .full-text {
+                display: none;
+                white-space: normal;
+            }
+            .expand-btn {
+                cursor: pointer;
+                color: blue;
+                text-decoration: underline;
+                border: none;
+                background: none;
+                font-size: inherit;
+                display: block;
+                margin-top: 5px;
+            }
+            </style>
+
+            <div style=\"width: 100%; max-width: 1000px; max-height: 300px; overflow: auto; border: 1px solid #ccc;\">
+            <table style=\"border-collapse: collapse; min-width: 100%\">
+            <thead>\n".to_string();
+
         let schema = batches[0].schema();
 
         let mut header = Vec::new();
         for field in schema.fields() {
-            header.push(format!("<th>{}</td>", field.name()));
+            header.push(format!("<th style='border: 1px solid black; padding: 8px; text-align: left; background-color: #f2f2f2; white-space: nowrap; min-width: fit-content; max-width: fit-content;'>{}</th>", field.name()));
         }
         let header_str = header.join("");
-        html_str.push_str(&format!("<tr>{}</tr>\n", header_str));
-
-        for batch in batches {
-            let formatters = batch
-                .columns()
-                .iter()
-                .map(|c| ArrayFormatter::try_new(c.as_ref(), &FormatOptions::default()))
-                .map(|c| {
-                    c.map_err(|e| PyValueError::new_err(format!("Error: {:?}", e.to_string())))
-                })
-                .collect::<Result<Vec<_>, _>>()?;
-
-            for row in 0..batch.num_rows() {
+        html_str.push_str(&format!("<tr>{}</tr></thead><tbody>\n", header_str));
+
+        let batch_formatters = batches
+            .iter()
+            .map(|batch| {
+                batch
+                    .columns()
+                    .iter()
+                    .map(|c| ArrayFormatter::try_new(c.as_ref(), &FormatOptions::default()))
+                    .map(|c| {
+                        c.map_err(|e| PyValueError::new_err(format!("Error: {:?}", e.to_string())))
+                    })
+                    .collect::<Result<Vec<_>, _>>()
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        let rows_per_batch = batches.iter().map(|batch| batch.num_rows());
+
+        // We need to build up row by row for html
+        let mut table_row = 0;
+        for (batch_formatter, num_rows_in_batch) in batch_formatters.iter().zip(rows_per_batch) {
+            for batch_row in 0..num_rows_in_batch {
+                table_row += 1;
                 let mut cells = Vec::new();
-                for formatter in &formatters {
-                    cells.push(format!("<td>{}</td>", formatter.value(row)));
+                for (col, formatter) in batch_formatter.iter().enumerate() {
+                    let cell_data = formatter.value(batch_row).to_string();
+                    // From testing, primitive data types do not typically get larger than 21 characters
+                    if cell_data.len() > MAX_LENGTH_CELL_WITHOUT_MINIMIZE {
+                        let short_cell_data = &cell_data[0..MAX_LENGTH_CELL_WITHOUT_MINIMIZE];
+                        cells.push(format!("
+                            <td style='border: 1px solid black; padding: 8px; text-align: left; white-space: nowrap;'>
+                                <div class=\"expandable-container\">
+                                    <span class=\"expandable\" id=\"{table_uuid}-min-text-{table_row}-{col}\">{short_cell_data}</span>
+                                    <span class=\"full-text\" id=\"{table_uuid}-full-text-{table_row}-{col}\">{cell_data}</span>
+                                    <button class=\"expand-btn\" onclick=\"toggleDataFrameCellText('{table_uuid}',{table_row},{col})\">...</button>
+                                </div>
+                            </td>"));
+                    } else {
+                        cells.push(format!("<td style='border: 1px solid black; padding: 8px; text-align: left; white-space: nowrap;'>{}</td>", formatter.value(batch_row)));
+                    }
                 }
                 let row_str = cells.join("");
                 html_str.push_str(&format!("<tr>{}</tr>\n", row_str));
             }
         }
+        html_str.push_str("</tbody></table></div>\n");
+
+        html_str.push_str("
+            <script>
+            function toggleDataFrameCellText(table_uuid, row, col) {
+                var shortText = document.getElementById(table_uuid + \"-min-text-\" + row + \"-\" + col);
+                var fullText = document.getElementById(table_uuid + \"-full-text-\" + row + \"-\" + col);
+                var button = event.target;
+
+                if (fullText.style.display === \"none\") {
+                    shortText.style.display = \"none\";
+                    fullText.style.display = \"inline\";
+                    button.textContent = \"(less)\";
+                } else {
+                    shortText.style.display = \"inline\";
+                    fullText.style.display = \"none\";
+                    button.textContent = \"...\";
+                }
+            }
+            </script>
+        ");
 
-        html_str.push_str("</table>\n");
+        if has_more {
+            html_str.push_str("Data truncated due to size.");
+        }
 
         Ok(html_str)
     }
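
One caveat in the cell-shortening branch above: `&cell_data[0..MAX_LENGTH_CELL_WITHOUT_MINIMIZE]` slices by bytes, and Rust panics when a byte-index slice falls inside a multi-byte UTF-8 character, so a cell whose 25th byte lands mid-character (possible with non-ASCII data) would abort the repr. A boundary-safe sketch, not part of this patch (`str::floor_char_boundary` would be the direct fix but is nightly-only):

    /// Truncate `s` to at most `max_bytes`, backing up to a valid char boundary.
    fn truncate_on_char_boundary(s: &str, max_bytes: usize) -> &str {
        let mut end = max_bytes.min(s.len());
        while !s.is_char_boundary(end) {
            end -= 1;
        }
        &s[..end]
    }
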
@@ -771,3 +871,83 @@ fn record_batch_into_schema(
 
     RecordBatch::try_new(schema, data_arrays)
 }
+
+/// This is a helper function to return the record batches to display from executing a DataFrame.
+/// It additionally returns a bool, which indicates whether more record batches are available
+/// beyond those collected. We do this so we can determine if we should indicate to the user
+/// that the data has been truncated. This collects until we have achieved both of these two
+/// conditions:
+///
+/// - We have collected our minimum number of rows
+/// - We have reached our limit, either data size or maximum number of rows
+///
+/// Otherwise it will return when the stream is exhausted. If you want a specific number of
+/// rows, set min_rows == max_rows.
+async fn collect_record_batches_to_display(
+    df: DataFrame,
+    min_rows: usize,
+    max_rows: usize,
+) -> Result<(Vec<RecordBatch>, bool), DataFusionError> {
+    let partitioned_stream = df.execute_stream_partitioned().await?;
+    let mut stream = futures::stream::iter(partitioned_stream).flatten();
+    let mut size_estimate_so_far = 0;
+    let mut rows_so_far = 0;
+    let mut record_batches = Vec::default();
+    let mut has_more = false;
+
+    while (size_estimate_so_far < MAX_TABLE_BYTES_TO_DISPLAY && rows_so_far < max_rows)
+        || rows_so_far < min_rows
+    {
+        let mut rb = match stream.next().await {
+            None => {
+                break;
+            }
+            Some(Ok(r)) => r,
+            Some(Err(e)) => return Err(e),
+        };
+
+        let mut rows_in_rb = rb.num_rows();
+        if rows_in_rb > 0 {
+            size_estimate_so_far += rb.get_array_memory_size();
+
+            if size_estimate_so_far > MAX_TABLE_BYTES_TO_DISPLAY {
+                let ratio = MAX_TABLE_BYTES_TO_DISPLAY as f32 / size_estimate_so_far as f32;
+                let total_rows = rows_in_rb + rows_so_far;
+
+                let mut reduced_row_num = (total_rows as f32 * ratio).round() as usize;
+                if reduced_row_num < min_rows {
+                    reduced_row_num = min_rows.min(total_rows);
+                }
+
+                let limited_rows_this_rb = reduced_row_num - rows_so_far;
+                if limited_rows_this_rb < rows_in_rb {
+                    rows_in_rb = limited_rows_this_rb;
+                    rb = rb.slice(0, limited_rows_this_rb);
+                    has_more = true;
+                }
+            }
+
+            if rows_in_rb + rows_so_far > max_rows {
+                rb = rb.slice(0, max_rows - rows_so_far);
+                has_more = true;
+            }
+
+            rows_so_far += rb.num_rows();
+            record_batches.push(rb);
+        }
+    }
+
+    if record_batches.is_empty() {
+        return Ok((Vec::default(), false));
+    }
+
+    if !has_more {
+        // Data was not already truncated, so check to see if more record batches remain
+        has_more = match stream.try_next().await {
+            Ok(None) => false, // reached end
+            Ok(Some(_)) => true,
+            Err(_) => false, // Stream disconnected
+        };
+    }
+
+    Ok((record_batches, has_more))
+}
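
To make the ratio-based reduction in the loop above concrete, here is a worked example with illustrative numbers (standalone and runnable):

    const MAX_TABLE_BYTES_TO_DISPLAY: usize = 2 * 1024 * 1024; // 2 MB, as above

    fn main() {
        // Suppose the first batch arrives with 1_000 rows and an estimated
        // 8 MiB of Arrow buffers (what get_array_memory_size reports).
        let size_estimate_so_far: usize = 8 * 1024 * 1024;
        let rows_so_far: usize = 0;
        let rows_in_rb: usize = 1_000;

        let ratio = MAX_TABLE_BYTES_TO_DISPLAY as f32 / size_estimate_so_far as f32; // 0.25
        let total_rows = rows_in_rb + rows_so_far;
        let reduced_row_num = (total_rows as f32 * ratio).round() as usize;

        // 250 >= min_rows (20 for the HTML repr), so the batch is sliced to its
        // first 250 rows and has_more is set, keeping output near the byte budget.
        assert_eq!(reduced_row_num, 250);
    }
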