Description
What kind of issue is this?
- Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
Issue description
I am trying to use a basic SQL IN statement to match the value of the module_loaded field from security event logs against a list of values.
SECURITY EVENT SAMPLE (ONE)
When I run the following query in Kibana:
event_id:7 AND process_name:"powershell.exe" AND module_loaded:*samlib.dll
I get the event I want:
{
"_index": "logs-endpoint-winevent-sysmon-2019.05.18",
"_type": "_doc",
"_id": "7436fb5dc506bdd5fbc482a5c4a726402ab38e19",
"_version": 1,
"_score": null,
"_source": {
"module_loaded": "c:\\windows\\system32\\samlib.dll",
"process_name": "powershell.exe",
"process_guid": "03ba39f5-50b2-5ce0-0000-00109995c501",
"process_path": "c:\\windows\\system32\\windowspowershell\\v1.0\\powershell.exe",
"beat_hostname": "WECserver",
"action": "moduleload",
"event_id": 7,
"beat_version": "6.7.0",
}
}
SECURITY EVENT SAMPLE (TWO)
When I run the following query in Kibana:
event_id:7 AND process_name:"powershell.exe" AND module_loaded:*hid.dll
I get the event I want:
{
"_index": "logs-endpoint-winevent-sysmon-2019.05.18",
"_type": "_doc",
"_id": "25f618931e3c411d648e9db3e416db0c0cf0e1e8",
"_version": 1,
"_score": null,
"_source": {
"module_loaded": "c:\\windows\\system32\\hid.dll",
"process_name": "powershell.exe",
"process_guid": "03ba39f5-50b2-5ce0-0000-00109995c501",
"process_path": "c:\\windows\\system32\\windowspowershell\\v1.0\\powershell.exe",
"beat_hostname": "WECserver",
"action": "moduleload",
"event_id": 7,
"beat_version": "6.7.0",
}
}
Now I want to reproduce something similar with Apache Spark SQL via PySpark. I start by initializing the SparkSession and registering a SQL temp view mapped to the indices I used to find the two records above.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HELK JOIN") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

es_reader = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes", "helk-elasticsearch:9200")
    .option("es.net.http.auth.user", "elastic")
)

security_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")
security_df.createOrReplaceTempView("sysmon_events")
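As a sanity check, the inferred schema can be inspected to confirm that the fields used below exist as expected (a minimal check against the temp view just registered):

# Inspect the schema Spark inferred from the Elasticsearch mapping;
# module_loaded, process_name, event_id, etc. should appear as columns.
security_df.printSchema()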
Now I can run SQL queries on it. I want to use the SQL IN statement to replicate the first query I ran earlier.
module_loaded = spark.sql(
    '''
    SELECT event_id,
           host_name,
           process_name,
           module_loaded
    FROM sysmon_events
    WHERE event_id = 7
        AND module_loaded IN ("c:\\\windows\\\system32\\\samlib.dll")
    '''
)
module_loaded.show(5, False)
+--------+---------------+--------------+------------------------------+
|event_id|host_name |process_name |module_loaded |
+--------+---------------+--------------+------------------------------+
|7 |it001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7 |hr001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7 |hr001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7 |it001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7 |hr001.shire.com|svchost.exe |c:\windows\system32\samlib.dll|
+--------+---------------+--------------+------------------------------+
only showing top 5 rows
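For completeness, the same single-value filter can also be expressed through the DataFrame API; a sketch, assuming the same security_df as above (isin with one element should be pushed down the same way as a one-element IN):

from pyspark.sql.functions import col

# DataFrame API form of the single-value IN query; the doubled backslashes
# are Python string escapes, so each "\\" is one backslash in the value.
(security_df
    .filter((col("event_id") == 7) &
            col("module_loaded").isin("c:\\windows\\system32\\samlib.dll"))
    .select("event_id", "host_name", "process_name", "module_loaded")
    .show(5, False))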
The single-value query works as expected. However, what if I want to have both values c:\windows\system32\samlib.dll and c:\windows\system32\hid.dll inside of the IN statement? It unfortunately fails:
module_loaded = spark.sql(
    '''
    SELECT event_id,
           host_name,
           process_name,
           module_loaded
    FROM sysmon_events
    WHERE event_id = 7
        AND module_loaded IN ("c:\\\windows\\\system32\\\samlib.dll", "c:\\\windows\\\system32\\\hid.dll")
    '''
)
module_loaded.show(5, False)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-120-45c477530d44> in <module>
----> 1 module_loaded.show(5,False)
/opt/helk/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
378 print(self._jdf.showString(n, 20, vertical))
379 else:
--> 380 print(self._jdf.showString(n, int(truncate), vertical))
381
382 def __repr__(self):
/opt/helk/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/helk/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/helk/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o1384.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 68.0 failed 4 times, most recent failure: Lost task 0.3 in stage 68.0 (TID 175, 172.18.0.4, executor 24): org.apache.spark.util.TaskCompletionListenerException: Failed to parse filter: {"bool":{"should":[{"match":{"module_loaded":"c:\windows\system32\samlib.dll c:\windows\system32\hid.lab"}}]}}
Previous exception in task: Failed to parse filter: {"bool":{"should":[{"match":{"module_loaded":"c:\windows\system32\samlib.dll c:\windows\system32\hid.lab"}}]}}
org.elasticsearch.hadoop.rest.query.QueryUtils.parseFilters(QueryUtils.java:74)
org.elasticsearch.hadoop.rest.RestService.createReader(RestService.java:453)
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader$lzycompute(AbstractEsRDDIterator.scala:49)
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader(AbstractEsRDDIterator.scala:42)
org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:61)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
org.apache.spark.scheduler.Task.run(Task.scala:121)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at sun.reflect.GeneratedMethodAccessor82.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
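A possible workaround while the filter translation is investigated: disabling es-hadoop's filter pushdown should force Spark to evaluate the IN predicate itself instead of translating it into the malformed match query above. An untested sketch, using the documented pushdown option (true by default):

# Same reader settings as above, but with filter pushdown disabled so the
# IN predicate is applied by Spark rather than sent to Elasticsearch.
es_reader_no_pushdown = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes", "helk-elasticsearch:9200")
    .option("es.net.http.auth.user", "elastic")
    .option("pushdown", "false")
)
es_reader_no_pushdown.load("logs-endpoint-winevent-sysmon-*/") \
    .createOrReplaceTempView("sysmon_events")

The trade-off is that all documents in the index pattern are shipped to Spark and filtered there, so this only masks the translation problem rather than fixing it.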
Version Info
OS: Official Elasticsearch Docker Image 7.1.0 (CentOS Linux 7 (Core))
JVM: OpenJDK 1.8.0_191 (Spark)
Hadoop/Spark: 2.4.3
ES-Hadoop: 7.1.0
ES: 7.1.0