I am using HiveContext to read a JSON file with the following code:
df = hive_context.read.json("/users/duttaam/downloads/test.json")
df.registerTempTable("df")
By default, Spark determined the following schema:
root
 |-- id: string (nullable = true)
 |-- profiles: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- app_id: string (nullable = true)
 |    |    |-- localytics: struct (nullable = true)
 |    |    |    |-- attributes: struct (nullable = true)
 |    |    |    |    |-- ap: long (nullable = true)
 |    |    |    |    |-- app_version: string (nullable = true)
 |    |    |    |    |-- birthdate: string (nullable = true)
 |    |    |    |    |-- country: string (nullable = true)
 |    |    |    |    |-- device_timezone: string (nullable = true)
 |    |    |    |    |-- language: string (nullable = true)
 |    |    |    |    |-- last_session_date: string (nullable = true)
 |    |    |    |    |-- library_version: string (nullable = true)
 |    |    |    |    |-- os_version: string (nullable = true)
 |    |    |    |    |-- push_enabled: long (nullable = true)
 |    |    |    |    |-- total_sessions: long (nullable = true)
 |    |    |    |    |-- user_type: string (nullable = true)
My JSON looks as follows:
{ "id": "dsdasdasdsd", "profiles": [ { "attributes": { "mdn": "eoe/w/5ru1kapdmqq/wq\n/pu/tgrwpa==" }, "localytics": { "attributes": { "last_session_date": "2016-07-17", "device_timezone": "-04:00", "country": "us", "language": "en", "user_type": "known", "city_name": "indianapolis" } } }, { "app_id": "sdas-c824fcf6-bbae-11e5-adasda-asasqwvz", "attributes": { "automatic backup user": "no" }, "localytics": { "attributes": { "last_session_date": "2016-07-17", "os_version": "6.2.1", "app_version": "16.2.19.1", "library_version": "androida_3.7.0", "ap": 1, "custom_1": "unknown (not logged in)", "total_sessions": 4, "birthdate": "2016-07-09", "push_enabled": 1, "user_type": "known", "custom_0": "unknown (not logged in)", "seconds_since_last_session": 1457 } } } ] }
So by default, Spark is not capturing the "attributes" field that sits directly under each profile. Is there a way I can add custom code to change the schema structure?
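Ideally I imagine defining the schema explicitly and passing it to read.json, along these lines (a rough sketch only; I am not sure this is the right approach, and using MapType for the free-form "attributes" objects is just a guess):

from pyspark.sql.types import (ArrayType, MapType, StringType,
                               StructField, StructType)

# Rough sketch of an explicit schema; only a few fields are shown, and
# MapType for the free-form "attributes" objects is an assumption.
custom_schema = StructType([
    StructField("id", StringType(), True),
    StructField("profiles", ArrayType(StructType([
        StructField("app_id", StringType(), True),
        StructField("attributes", MapType(StringType(), StringType()), True),
        StructField("localytics", StructType([
            StructField("attributes", MapType(StringType(), StringType()), True)
        ]), True)
    ])), True)
])

df = hive_context.read.json("/users/duttaam/downloads/test.json", schema=custom_schema)
df.registerTempTable("df")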
Thanks in advance.

Regards,
Amit
You can try using hive_contxt.jsonFile(infile):
from pyspark import SparkContext
from pyspark.sql import HiveContext
import json

sc = SparkContext()
hive_contxt = HiveContext(sc)
your_schema = hive_contxt.jsonFile(infile)
your_schema.registerTempTable('your_table')
You can then query it using hive_contxt.sql(your_query).collect().
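For example, a query against the temp table registered above might look like this (hypothetical; it assumes the loaded JSON has an "id" field, as in the question's data):

# Hypothetical query against the temp table registered above
rows = hive_contxt.sql("SELECT id FROM your_table").collect()
for row in rows:
    print(row.id)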
Otherwise, you can try dumping your JSON into memory and using hive_contxt.jsonRDD(json_dumped_object):
def make_json_single_row(row, field_names):
    # Turn one delimited text row into a JSON string
    row_lst = row.split(';')
    return json.dumps(dict(zip(field_names, row_lst)))

def make_json(rdd, field_names):
    return rdd.map(lambda row: make_json_single_row(row, field_names))

field_names = ['column1', 'column2', 'column3']
rdd = sc.textFile(infile)
split_rdd = make_json(rdd, field_names)
your_new_schema = hive_contxt.jsonRDD(split_rdd)
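For the purely in-memory case mentioned above, json_dumped_object is just an RDD of JSON strings; for example (a minimal sketch with made-up records):

# Minimal sketch: build an RDD of JSON strings from in-memory Python dicts
# and let jsonRDD infer the schema from them (records are made up).
records = [{"id": "abc", "total_sessions": 4}, {"id": "def", "total_sessions": 2}]
json_dumped_object = sc.parallelize([json.dumps(r) for r in records])
in_memory_df = hive_contxt.jsonRDD(json_dumped_object)
in_memory_df.printSchema()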