All about Kryo serialiser in Spark
Q. Why is Kryo faster?
Ans: Kryo is significantly faster and more compact than Java serialization. It uses a compact binary format and, for registered classes, writes small numeric class identifiers instead of full class names and metadata.
Q. If the Kryo serializer is faster, why is it not the default serializer?
Ans: Kryo does not support all Serializable types, and it requires you to register the classes you'll use in the program in advance for best performance.
So it is not used by default because:
- Not every java.io.Serializable type is supported out of the box: if you have a custom class that extends Serializable, it still cannot be serialized with Kryo unless it is registered.
- One needs to register custom classes.
The only reason Kryo is not the default is that it requires custom registration. Also, although Kryo is supported for RDD caching and shuffling, it is not natively supported for serializing to disk: both the saveAsObjectFile method on RDD and the objectFile method on SparkContext support only Java serialization.
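For example, here is a minimal sketch of that limitation (the output path and app name are just placeholders, and the SparkContext setup mirrors the one used later in this post):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("objectFileExample")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// Even though Kryo is configured above, both calls below use
// plain Java serialization for the records they write and read.
val numbers = sc.parallelize(1 to 100)
numbers.saveAsObjectFile("/tmp/numbers-objfile")

val restored = sc.objectFile[Int]("/tmp/numbers-objfile")
restored.take(5).foreach(println)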
Q. When should you still use Kryo?
Ans: If you need a performance boost and also need to reduce memory usage, Kryo is definitely for you. Serialization has the biggest impact on join and grouping operations, because they usually involve shuffling data; the less data there is to shuffle, the faster the operation.
Caching is also affected when data is cached to disk or spills over from memory to disk.
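As a rough sketch of where the serializer shows up (assuming the SparkContext sc from the snippet above; the data is made up purely for illustration):

import org.apache.spark.storage.StorageLevel

// join/groupBy shuffle records across the network, so every record is
// serialized on the way out and deserialized on the way in
val left  = sc.parallelize(1 to 100000).map(i => (i, i * 2))
val right = sc.parallelize(1 to 100000).map(i => (i, "name" + i))
val joined = left.join(right)

// serialized storage levels keep cached records in serialized form, so a
// more compact serializer means less memory used and less spill to disk
joined.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(joined.count())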
Q. How do we register a class with Kryo?
Ans: To register a class with Kryo, we simply pass the class to the registerKryoClasses method on SparkConf, i.e.:
.registerKryoClasses( Array(classOf[Person], classOf[Furniture]) )
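Alternatively, the same registration can be done purely through configuration with the spark.kryo.classesToRegister property. A minimal sketch, where the fully qualified class names are placeholders for your own classes:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // comma-separated list of fully qualified class names to register with Kryo
  .set("spark.kryo.classesToRegister", "com.example.Person,com.example.Furniture")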
Kryo Example
Let's look at a simple example to see the difference from the default Java serialization in practice. We start off by registering the required classes.
import org.apache.spark.{SparkConf, SparkContext}

// class which needs to be registered
case class Person(name: String, age: Int)

val conf = new SparkConf()
  .setAppName("kyroExample")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // fail fast if anything gets serialized without having been registered
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(
    Array(
      classOf[Person],
      classOf[Array[Person]],
      // Spark-internal class that also needs registering once registrationRequired is true
      Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")
    )
  )

val sparkContext = new SparkContext(conf)
Now, let's create an array of Person, parallelize it into an RDD, and persist it in memory.
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

val personList: Array[Person] = (1 to 9999999)
  .map(value => Person("p" + value, value)).toArray

// creating RDD of Person
val rddPerson: RDD[Person] = sparkContext.parallelize(personList, 5)
val evenAgePerson: RDD[Person] = rddPerson.filter(_.age % 2 == 0)

// persisting evenAgePerson RDD into memory
evenAgePerson.persist(StorageLevel.MEMORY_ONLY_SER)
evenAgePerson.take(50).foreach(x => println(x.name, x.age))
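To actually see the difference, run the same code once with and once without the Kryo settings above and compare the cached size in the Storage tab of the Spark UI. A small sketch of the same check done programmatically (getRDDStorageInfo is a developer API, so treat it as approximate):

// print the in-memory and on-disk size of every cached RDD, so the Kryo run
// can be compared against a run that uses the default Java serialization
sparkContext.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id}: memSize=${info.memSize} bytes, diskSize=${info.diskSize} bytes")
}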