Skip to content

ES-Hadoop and PySpark SQL Failing with IN Statement #1307

Open
@Cyb3rWard0g

Description

@Cyb3rWard0g

What kind an issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.

Issue description

I am trying to use a basic SQL IN statement to match a field value module_loaded from security event logs with a list of values.

SECURITY EVENT SAMPLE (ONE)

When I run the following query in Kibana:
event_id:7 AND process_name:"powershell.exe" AND module_loaded:*samlib.dll

I get to the event I want.

{
  "_index": "logs-endpoint-winevent-sysmon-2019.05.18",
  "_type": "_doc",
  "_id": "7436fb5dc506bdd5fbc482a5c4a726402ab38e19",
  "_version": 1,
  "_score": null,
  "_source": {
    "module_loaded": "c:\\windows\\system32\\samlib.dll",
    "process_name": "powershell.exe",
    "process_guid": "03ba39f5-50b2-5ce0-0000-00109995c501",
    "process_path": "c:\\windows\\system32\\windowspowershell\\v1.0\\powershell.exe",
    "beat_hostname": "WECserver",
    "action": "moduleload",
    "event_id": 7,
    "beat_version": "6.7.0",
  }
}

DATA SAMPLE (TWO)
When I run the following query in Kibana:
event_id:7 AND process_name:"powershell.exe" AND module_loaded:*hid.dll

I get to the event I want:

{
  "_index": "logs-endpoint-winevent-sysmon-2019.05.18",
  "_type": "_doc",
  "_id": "25f618931e3c411d648e9db3e416db0c0cf0e1e8",
  "_version": 1,
  "_score": null,
  "_source": {
    "module_loaded": "c:\\windows\\system32\\hid.dll",
    "process_name": "powershell.exe",
    "process_guid": "03ba39f5-50b2-5ce0-0000-00109995c501",
    "process_path": "c:\\windows\\system32\\windowspowershell\\v1.0\\powershell.exe",
    "beat_hostname": "WECserver",
    "action": "moduleload",
    "event_id": 7,
    "beat_version": "6.7.0",
  }
}

Now I want to reproduce something similar with Apache SparkSQL via PySparK. I start by initializing the SprakSession and registering a SQL table mapped to the index I am using to find the two records above.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("HELK JOIN") \
    .master("spark://helk-spark-master:7077") \
    .enableHiveSupport() \
    .getOrCreate()

es_reader = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("inferSchema", "true")
    .option("es.read.field.as.array.include", "tags")
    .option("es.nodes","helk-elasticsearch:9200")
    .option("es.net.http.auth.user","elastic")
)

security_df = es_reader.load("logs-endpoint-winevent-sysmon-*/")
security_df.createOrReplaceTempView("sysmon_events")

Now I can run SQL queries on it. I want to use the SQL IN statement and replicate the first query I ran early.

module_loaded = spark.sql(
    '''
    SELECT event_id,
        host_name,
        process_name,
        module_loaded
    FROM sysmon_events
    WHERE event_id = 7
        AND module_loaded IN ("c:\\\windows\\\system32\\\samlib.dll")
    '''
)
module_loaded.show(5,False)
+--------+---------------+--------------+------------------------------+
|event_id|host_name      |process_name  |module_loaded                 |
+--------+---------------+--------------+------------------------------+
|7       |it001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7       |hr001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7       |hr001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7       |it001.shire.com|powershell.exe|c:\windows\system32\samlib.dll|
|7       |hr001.shire.com|svchost.exe   |c:\windows\system32\samlib.dll|
+--------+---------------+--------------+------------------------------+
only showing top 5 rows

That works. However, what if I want to have both values c:\windows\system32\samlib.dll and c:\windows\system32\hid.dll inside of the IN statement? it unfortunately fails:

module_loaded = spark.sql(
    '''
    SELECT event_id,
        host_name,
        process_name,
        module_loaded
    FROM sysmon_events
    WHERE event_id = 7
        AND module_loaded IN ("c:\\\windows\\\system32\\\samlib.dll", "c:\\\windows\\\system32\\\hid.dll")
    '''
)
module_loaded.show(5,False)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-120-45c477530d44> in <module>
----> 1 module_loaded.show(5,False)

/opt/helk/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    378             print(self._jdf.showString(n, 20, vertical))
    379         else:
--> 380             print(self._jdf.showString(n, int(truncate), vertical))
    381 
    382     def __repr__(self):

/opt/helk/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/helk/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/helk/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o1384.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 68.0 failed 4 times, most recent failure: Lost task 0.3 in stage 68.0 (TID 175, 172.18.0.4, executor 24): org.apache.spark.util.TaskCompletionListenerException: Failed to parse filter: {"bool":{"should":[{"match":{"module_loaded":"c:\windows\system32\samlib.dll c:\windows\system32\hid.lab"}}]}}

Previous exception in task: Failed to parse filter: {"bool":{"should":[{"match":{"module_loaded":"c:\windows\system32\samlib.dll c:\windows\system32\hid.lab"}}]}}
	org.elasticsearch.hadoop.rest.query.QueryUtils.parseFilters(QueryUtils.java:74)
	org.elasticsearch.hadoop.rest.RestService.createReader(RestService.java:453)
	org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader$lzycompute(AbstractEsRDDIterator.scala:49)
	org.elasticsearch.spark.rdd.AbstractEsRDDIterator.reader(AbstractEsRDDIterator.scala:42)
	org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:61)
	scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	org.apache.spark.scheduler.Task.run(Task.scala:121)
	org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	java.lang.Thread.run(Thread.java:748)
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.GeneratedMethodAccessor82.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Version Info

OS: : Official Elasticsearch Docker Image 7.1.0 (CentOS Linux 7 (Core))
JVM : OpenJDK 1.8.0_191 (SPARK)
Hadoop/Spark: 2.4.3
ES-Hadoop : 7.1.0
ES : 7.1.0

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions