[SPARK-6781] [SQL] use sqlContext in python shell
Use `sqlContext` in the PySpark shell to make it consistent with the SQL programming guide. `sqlCtx` is kept as well for backward compatibility.

Author: Davies Liu <davies@databricks.com>

Closes #5425 from davies/sqlCtx and squashes the following commits:

af67340 [Davies Liu] sqlCtx -> sqlContext
15a278f [Davies Liu] use sqlContext in python shell
Davies Liu authored and rxin committed Apr 8, 2015
1 parent 66159c3 commit 6ada4f6
Showing 12 changed files with 69 additions and 70 deletions.
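As a quick illustration of the user-facing effect of this commit (a sketch, not part of the diff): the shell binds both names to the same context object, so existing sessions that use `sqlCtx` keep working while new code can follow the programming guide and use `sqlContext`.

    # Illustrative PySpark shell session (assumed 1.3/1.4-era API); `sc`, `sqlContext`,
    # and `sqlCtx` are created by the shell startup code shown further down.
    >>> sqlCtx is sqlContext          # the old name is an alias for the same object
    True
    >>> people = sqlContext.jsonRDD(sc.parallelize(['{"name": "Yin", "age": 31}']))
    >>> people.registerTempTable("people")
    >>> sqlContext.sql("SELECT name FROM people WHERE age >= 21").count()
    1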
2 changes: 1 addition & 1 deletion docs/ml-guide.md
@@ -493,7 +493,7 @@ from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext

sc = SparkContext(appName="SimpleTextClassificationPipeline")
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)

# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
4 changes: 2 additions & 2 deletions docs/sql-programming-guide.md
@@ -1642,15 +1642,15 @@ moved into the udf object in `SQLContext`.
<div data-lang="scala" markdown="1">
{% highlight scala %}

-sqlCtx.udf.register("strLen", (s: String) => s.length())
+sqlContext.udf.register("strLen", (s: String) => s.length())

{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}

-sqlCtx.udf().register("strLen", (String s) -> { s.length(); });
+sqlContext.udf().register("strLen", (String s) -> { s.length(); });

{% endhighlight %}
</div>
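For completeness, a hedged sketch of the Python counterpart to the `strLen` examples above, assuming the 1.3-era PySpark API where Python UDFs are registered through `SQLContext.registerFunction` (illustrative only, not part of this commit; `sqlContext` is assumed to come from the shell):

    from pyspark.sql.types import IntegerType

    # Register a Python UDF under the same name used in the Scala/Java snippets above.
    sqlContext.registerFunction("strLen", lambda s: len(s), IntegerType())
    sqlContext.sql("SELECT strLen('test')").collect()   # a single row containing 4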
@@ -55,7 +55,7 @@ public void setAge(int age) {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
-SQLContext sqlCtx = new SQLContext(ctx);
+SQLContext sqlContext = new SQLContext(ctx);

System.out.println("=== Data source: RDD ===");
// Load a text file and convert each line to a Java Bean.
@@ -74,11 +74,11 @@ public Person call(String line) {
});

// Apply a schema to an RDD of Java Beans and register it as a table.
-DataFrame schemaPeople = sqlCtx.createDataFrame(people, Person.class);
+DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
-DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
@@ -99,12 +99,12 @@ public String call(Row row) {
// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a DataFrame.
-DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");
+DataFrame parquetFile = sqlContext.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
DataFrame teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
@@ -120,7 +120,7 @@ public String call(Row row) {
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
// Create a DataFrame from the file(s) pointed by path
-DataFrame peopleFromJsonFile = sqlCtx.jsonFile(path);
+DataFrame peopleFromJsonFile = sqlContext.jsonFile(path);

// Because the schema of a JSON dataset is automatically inferred, to write queries,
// it is better to take a look at what is the schema.
@@ -133,8 +133,8 @@ public String call(Row row) {
// Register this DataFrame as a table.
peopleFromJsonFile.registerTempTable("people");

-// SQL statements can be run by using the sql methods provided by sqlCtx.
-DataFrame teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+// SQL statements can be run by using the sql methods provided by sqlContext.
+DataFrame teenagers3 = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are DataFrame and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
@@ -151,7 +151,7 @@ public String call(Row row) {
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
-DataFrame peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
+DataFrame peopleFromJsonRDD = sqlContext.jsonRDD(anotherPeopleRDD.rdd());

// Take a look at the schema of this new DataFrame.
peopleFromJsonRDD.printSchema();
@@ -164,7 +164,7 @@ public String call(Row row) {

peopleFromJsonRDD.registerTempTable("people2");

-DataFrame peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+DataFrame peopleWithCity = sqlContext.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
@@ -33,7 +33,7 @@

if __name__ == "__main__":
sc = SparkContext(appName="SimpleTextClassificationPipeline")
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)

# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
6 changes: 3 additions & 3 deletions examples/src/main/python/mllib/dataset_example.py
@@ -44,19 +44,19 @@ def summarize(dataset):
print >> sys.stderr, "Usage: dataset_example.py <libsvm file>"
exit(-1)
sc = SparkContext(appName="DatasetExample")
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)
if len(sys.argv) == 2:
input = sys.argv[1]
else:
input = "data/mllib/sample_libsvm_data.txt"
points = MLUtils.loadLibSVMFile(sc, input)
-dataset0 = sqlCtx.inferSchema(points).setName("dataset0").cache()
+dataset0 = sqlContext.inferSchema(points).setName("dataset0").cache()
summarize(dataset0)
tempdir = tempfile.NamedTemporaryFile(delete=False).name
os.unlink(tempdir)
print "Save dataset as a Parquet file to %s." % tempdir
dataset0.saveAsParquetFile(tempdir)
print "Load it back and summarize it again."
-dataset1 = sqlCtx.parquetFile(tempdir).setName("dataset1").cache()
+dataset1 = sqlContext.parquetFile(tempdir).setName("dataset1").cache()
summarize(dataset1)
shutil.rmtree(tempdir)
4 changes: 2 additions & 2 deletions python/pyspark/ml/classification.py
@@ -91,9 +91,9 @@ class LogisticRegressionModel(JavaModel):
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
sc = SparkContext("local[2]", "ml.feature tests")
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)
globs['sc'] = sc
-globs['sqlCtx'] = sqlCtx
+globs['sqlContext'] = sqlContext
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS)
sc.stop()
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/ml/feature.py
@@ -117,9 +117,9 @@ def setParams(self, numFeatures=1 << 18, inputCol="input", outputCol="output"):
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
sc = SparkContext("local[2]", "ml.feature tests")
-sqlCtx = SQLContext(sc)
+sqlContext = SQLContext(sc)
globs['sc'] = sc
-globs['sqlCtx'] = sqlCtx
+globs['sqlContext'] = sqlContext
(failure_count, test_count) = doctest.testmod(
globs=globs, optionflags=doctest.ELLIPSIS)
sc.stop()
Expand Down
6 changes: 3 additions & 3 deletions python/pyspark/shell.py
@@ -53,9 +53,9 @@
try:
# Try to access HiveConf, it will raise exception if Hive is not added
sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
-    sqlCtx = HiveContext(sc)
+    sqlCtx = sqlContext = HiveContext(sc)
except py4j.protocol.Py4JError:
-    sqlCtx = SQLContext(sc)
+    sqlCtx = sqlContext = SQLContext(sc)

print("""Welcome to
____ __
@@ -68,7 +68,7 @@
platform.python_version(),
platform.python_build()[0],
platform.python_build()[1]))
print("SparkContext available as sc, %s available as sqlCtx." % sqlCtx.__class__.__name__)
print("SparkContext available as sc, %s available as sqlContext." % sqlContext.__class__.__name__)

if add_files is not None:
print("Warning: ADD_FILES environment variable is deprecated, use --py-files argument instead")