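My goal is to parse a large XML file, persist objects to the DB based on the XML data, and do it quickly. The operation needs to be transactional so that I can roll back if there is a problem parsing the XML or if a created object cannot be validated.
I am using the Grails Executor plugin to thread the operation. The problem is that each thread I create in the service has its own transaction and session. If I create 4 threads and 1 of them fails, the sessions of the 3 threads that did not fail may already have flushed, or they may flush in the future.
I was thinking that if I could tell each thread to use the "current" Hibernate session, that might fix my problem. Another idea was to prevent all of the sessions from flushing until it is known that all of them completed without errors. Unfortunately, I don't know how to do either of these things.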
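A variant of the second idea avoids holding back several sessions at all. The worker threads only parse and return plain data, and every write happens once, on the calling thread, after all workers have finished without errors. Below is a minimal plain-Groovy sketch using `java.util.concurrent`. `parseBars`, `parseBazs`, `importAllOrNothing` and the `persistAll` callback are hypothetical placeholders. In Grails, `persistAll` would be the single transactional step.

```groovy
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors

// Hypothetical parse steps: they only build in-memory data and never touch the database.
List<String> parseBars(List<String> raw) { raw.collect { it.trim() } }

List<String> parseBazs(List<String> raw) {
    raw.collect { String s ->
        if (!s?.trim()) throw new IllegalArgumentException('empty baz name')
        s.trim()
    }
}

// Runs both parse steps in parallel and calls persistAll only if every step succeeded.
// Returns true if data was handed to persistAll, false if any worker failed.
boolean importAllOrNothing(List<String> barRaw, List<String> bazRaw, Closure persistAll) {
    def pool = Executors.newFixedThreadPool(2)
    try {
        List<Callable> tasks = [{ parseBars(barRaw) } as Callable,
                                { parseBazs(bazRaw) } as Callable]
        // invokeAll blocks until every task has finished, successfully or not
        def futures = pool.invokeAll(tasks)
        // get() rethrows a worker's exception wrapped in an ExecutionException
        def results = futures.collect { it.get() }
        persistAll(results[0], results[1])   // the only write, done once on this thread
        return true
    } catch (ExecutionException ignored) {
        return false                         // nothing was written, so nothing needs rolling back
    } finally {
        pool.shutdown()
    }
}
```

Because the workers never open a session, a failure in any of them simply means `persistAll` is never called.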
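There is an additional catch. There are many of these XML files to parse, and many more will be created in the future. Many of them contain data that, when parsed, would create an object identical to one that was already created when a previous XML file was parsed. In that case, I need to reference the existing object instead. To handle this, I added a transient isUnique property to each class. Using the Grails unique constraint doesn't work, because it does not take hasMany relationships into account, as I outlined in my question here.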
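The `Util.getFirstDuplicate` helper in this post calls `getAll()` for every parsed object. A common alternative for the "reuse the existing object" requirement is to index the existing rows by their natural key once and then resolve each parsed object against that index. Below is a minimal plain-Groovy sketch. It assumes `name` alone identifies a `Bar`. `BarRecord`, `ExistingIndex` and `resolve` are hypothetical names, and the `existing` list stands in for what a single query would return.

```groovy
// Hypothetical stand-in for the Bar domain class; name is ASSUMED to be its natural key.
class BarRecord {
    Long id          // null until the record is saved
    String name
}

// Hypothetical helper: indexes already-persisted rows by natural key once,
// then returns the existing instance or registers a new one.
class ExistingIndex {
    private final Map<String, BarRecord> byName = [:]
    final List<BarRecord> created = []

    ExistingIndex(Collection<BarRecord> existing) {
        existing.each { byName[it.name] = it }
    }

    BarRecord resolve(String name) {
        BarRecord found = byName[name]
        if (found != null) {
            return found                   // reuse the row from an earlier import
        }
        BarRecord fresh = new BarRecord(name: name)
        byName[name] = fresh               // later duplicates in the same file reuse it too
        created << fresh
        return fresh
    }
}

// Rows left behind by importing file "A" (normally loaded with one query).
def existing = (1..3).collect { new BarRecord(id: it as Long, name: "bar_$it") }
def index = new ExistingIndex(existing)

// Names parsed from file "B": three already known, one new.
def parsed = ['bar_1', 'bar_2', 'bar_3', 'bar_4']
def resolved = parsed.collect { index.resolve(it) }
```

Only the objects in `index.created` would then need to be saved. Everything else is a reference to a row that already exists.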
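The example below is greatly simplified compared to the real thing. The XML files I'm parsing have deeply nested elements with many attributes.
Imagine the following domain classes: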
class Foo {
    String ver
    Set<Bar> bars
    Set<Baz> bazs
    static hasMany = [bars: Bar, bazs: Baz]

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]
    static constraints = {
        ver(nullable: false)
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}
class Bar {
    String name

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]
    static constraints = {
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}
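class Baz {
    String name

    boolean getIsUnique() {
        Util.isUnique(this)
    }
    static transients = [
        'isUnique'
    ]
    static constraints = {
        isUnique(
            validator: { val, obj ->
                obj.isUnique
            }
        )
    }
}
Here is my Util.groovy class, located in my src/groovy folder. It contains the methods I use to determine whether an instance of a domain class is unique and/or to retrieve an already existing equal instance: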
import org.hibernate.Hibernate

class Util {
    /**
     * Gets the first instance of the domain class of the object provided that
     * is equal to the object provided.
     *
     * @param obj
     * @return the first instance of obj's domain class that is equal to obj
     */
    static def getFirstDuplicate(def obj) {
        def objClass = Hibernate.getClass(obj)
        objClass.getAll().find{it == obj}
    }

    /**
     * Determines if an object is unique in its domain class
     *
     * @param obj
     * @return true if obj is unique, otherwise false
     */
    static def isUnique(def obj) {
        getFirstDuplicate(obj) == null
    }

    /**
     * Validates all of an object's constraints except those contained in the
     * provided blacklist, then saves the object if it is valid.
     *
     * @param obj
     * @return the validated object, saved if valid
     */
    static def validateWithBlacklistAndSave(def obj, def blacklist = null) {
        def propertiesToValidate = obj.domainClass.constraints.keySet().collectMany{!blacklist?.contains(it)? [it] : []}
        if(obj.validate(propertiesToValidate)) {
            obj.save(validate: false)
        }
        obj
    }
}
Assume XML file "A" looks something like this:
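<foo ver="1.0">
    <!-- Start bar section -->
    <bar name="bar_1"/>
    <bar name="bar_2"/>
    <bar name="bar_3"/>
    ...
    <bar name="bar_5000"/>
    <!-- Start baz section -->
    <baz name="baz_1"/>
    <baz name="baz_2"/>
    <baz name="baz_3"/>
    ...
    <baz name="baz_100000"/>
</foo>
Assume XML file "B" looks like this. It is identical to XML file "A" except that one new bar and one new baz have been added. When XML file "B" is parsed after XML file "A", three new objects should be created: 1.) a Bar with name = bar_5001, 2.) a Baz with name = baz_100001, and 3.) a Foo with ver = 2.0 and lists of bars and bazs equal to what is shown. The Foo should reuse the Bar and Baz instances that already exist from the import of XML file "A".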
<foo ver="2.0">
    <!-- Start bar section -->
    <bar name="bar_1"/>
    <bar name="bar_2"/>
    <bar name="bar_3"/>
    ...
    <bar name="bar_5000"/>
    <bar name="bar_5001"/>
    <!-- Start baz section -->
    <baz name="baz_1"/>
    <baz name="baz_2"/>
    <baz name="baz_3"/>
    ...
    <baz name="baz_100000"/>
    <baz name="baz_100001"/>
</foo>
And a service similar to this:
class BigXmlFileUploadService {

    // Pass in a 20MB XML file
    def upload(def xml) {
        String rslt = null
        def xsd = Util.getDefsXsd()
        if(Util.validateXmlWithXsd(xml, xsd)) { // Validate the structure of the XML file
            def fooXml = new XmlParser().parseText(xml.getText()) // Parse the XML

            def bars = callAsync { // Make a thread for creating the Bar objects
                def barList = [] // Must not reuse the name "bars": Groovy rejects shadowing an enclosing local
                for(barXml in fooXml.bar) { // Loop through each bar XML element inside the foo XML element
                    def bar = new Bar( // Create a new Bar object
                        name: barXml.attribute("name")
                    )
                    bar = retrieveExistingOrSave(bar) // If an instance of Bar that is equal to this one already exists then use it
                    barList.add(bar) // Add the new Bar object to the list of Bars
                }
                barList // Return the list of Bars
            }

            def bazs = callAsync { // Make a thread for creating the Baz objects
                def bazList = []
                for(bazXml in fooXml.baz) { // Loop through each baz XML element inside the foo XML element
                    def baz = new Baz( // Create a new Baz object
                        name: bazXml.attribute("name")
                    )
                    baz = retrieveExistingOrSave(baz) // If an instance of Baz that is equal to this one already exists then use it
                    bazList.add(baz) // Add the new Baz object to the list of Bazs
                }
                bazList // Return the list of Bazs
            }

            bars = bars.get() // Wait for thread then call Future.get() to get list of Bars
            bazs = bazs.get() // Wait for thread then call Future.get() to get list of Bazs

            def foo = new Foo( // Create a new Foo object with the list of Bars and Bazs
                ver: fooXml.attribute("ver"),
                bars: bars,
                bazs: bazs
            ).save()

            rslt = "Successfully uploaded ${xml.getName()}!"
        } else {
            rslt = "File failed XSD validation!"
        }
        rslt
    }

    private def retrieveExistingOrSave(def obj) {
        def dup = Util.getFirstDuplicate(obj)
        obj = dup ?: Util.validateWithBlacklistAndSave(obj, ["isUnique"])
        if(obj.errors.allErrors) {
            log.error "${obj} has errors ${obj.errors}"
            throw new RuntimeException() // Force transaction to rollback
        }
        obj
    }
}
So the question is: how can I make everything that happens in the service's upload method behave as if it happened in a single session, so that everything can be rolled back if any one part fails?
Posted on 2013-08-28 03:04:15
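The service can be optimized to address some of the pain points:
flush. See below for the optimization. The service class would look something like this: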
import org.springframework.transaction.interceptor.TransactionAspectSupport

// Pass in a 20MB XML file
def upload(def xml) {
    String rslt = null
    def xsd = Util.getDefsXsd()
    if (Util.validateXmlWithXsd(xml, xsd)) {
        def fooXml = new XmlParser().parseText(xml.getText())
        // ver is nullable: false, so it must be set before the first save
        def foo = new Foo(ver: fooXml.attribute("ver")).save(flush: true)
        def bars = callAsync {
            saveBars(foo, fooXml)
        }
        def bazs = callAsync {
            saveBazs(foo, fooXml)
        }
        // Each Future returns foo, detached from its worker's session;
        // merge it back into this thread's session. By default the
        // domain class is validated as well. A flush could also be
        // issued here, but we do not need it yet.
        foo = bars.get().merge()
        foo = bazs.get().merge()
        // Check whether the child objects are populated. If any
        // children are absent, roll back the whole transaction.
        handleTransaction {
            if (foo.bars && foo.bazs) {
                foo.save(flush: true)
            } else {
                // The else block is reached if any of the children is
                // not yet associated with the parent. This happens if
                // there was a problem in either thread: the corresponding
                // transaction would have rolled back in its own session,
                // leaving the association empty.
                // Mark the transaction rollback-only...
                TransactionAspectSupport
                    .currentTransactionStatus()
                    .setRollbackOnly()
                // ...or throw an Exception and
                // let handleTransaction handle the rollback
                throw new Exception("Rolling back transaction")
            }
        }
        rslt = "Successfully uploaded ${xml.getName()}!"
    } else {
        rslt = "File failed XSD validation!"
    }
    rslt
}

def saveBars(Foo foo, fooXml) {
    handleTransaction {
        for (barXml in fooXml.bar) {
            def bar = new Bar(name: barXml.attribute("name"))
            foo.addToBars(bar)
        }
        // Optional, I think, since the session is flushed
        // at the end of the method
        foo.save(flush: true)
    }
    foo
}

def saveBazs(Foo foo, fooXml) {
    handleTransaction {
        for (bazXml in fooXml.baz) {
            def baz = new Baz(name: bazXml.attribute("name"))
            foo.addToBazs(baz)
        }
        // Optional, I think, since the session is flushed
        // at the end of the method
        foo.save(flush: true)
    }
    foo
}

// Runs the closure and marks the current transaction
// rollback-only if the closure throws
def handleTransaction(Closure clos) {
    try {
        clos()
    } catch (e) {
        TransactionAspectSupport.currentTransactionStatus().setRollbackOnly()
    }
}
Posted on 2013-08-28 02:41:09
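You probably can't do what you're trying to do.
First, Hibernate sessions are not thread-safe:
"A Session is an inexpensive, non-threadsafe object that should be used once and then discarded for: a single request, a conversation or a single unit of work. ..."
Second, I don't think executing SQL queries in parallel will gain you much. I looked at how the PostgreSQL JDBC driver works, and all of the methods that actually run queries are synchronized.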
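The slowest part of what you're doing is probably the XML processing, so I'd suggest parallelizing that and doing the persistence on a single thread. You could create several workers that read the XML and add objects to some kind of queue. Then have another worker that owns the session and saves the objects as they are parsed.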
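A minimal plain-Groovy sketch of that layout uses a bounded `LinkedBlockingQueue`. Several parser threads put parsed items on the queue, and a single writer thread takes them off. Assumptions to note:

- `runPipeline` and the `save` callback are hypothetical. In a real application, `save` would use the one session/transaction owned by the writer.
- "Parsing" is reduced to `trim()`.
- The per-parser end marker is one common way to stop the writer, not something prescribed above.

```groovy
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// Parses chunks on several threads and hands every parsed item to ONE writer thread.
// 'chunks' stands in for slices of the XML; 'save' is a hypothetical persistence callback.
// Error handling in the writer is omitted for brevity.
void runPipeline(List<List<String>> chunks, Closure save) {
    BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(1000) // bounded: parsers wait if the writer lags
    Object done = new Object()                                       // end marker, one per parser

    List<Thread> parsers = chunks.collect { List<String> chunk ->
        Thread.start {
            try {
                chunk.each { raw -> queue.put(raw.trim()) }          // "parsing" is just trim() here
            } finally {
                queue.put(done)                                      // always signal, even on failure
            }
        }
    }

    Thread writer = Thread.start {
        int finished = 0
        while (finished < parsers.size()) {
            Object item = queue.take()
            if (item.is(done)) {
                finished++
            } else {
                save(item)                                           // only this thread ever persists
            }
        }
    }

    parsers*.join()
    writer.join()
}
```

Because only the writer touches `save`, the non-thread-safe session never crosses threads. The bounded queue keeps memory use limited when parsing outruns the database.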
You may also want to look at Hibernate's batch processing documentation page. Flushing after every insert is not the fastest approach.
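The usual batch-processing pattern is to save objects normally and flush and clear the session every N objects, with N ideally matching `hibernate.jdbc.batch_size`. This keeps the first-level cache from growing with the file. So that the sketch runs without Hibernate, it hides the session behind a hypothetical `BatchSession` interface. `saveInBatches` and `RecordingSession` are illustrative names, not Hibernate or GORM API.

```groovy
// Hypothetical minimal view of a Hibernate-like session, so the sketch runs standalone.
interface BatchSession {
    void save(Object entity)
    void flush()
    void clear()
}

// Saves every entity, flushing and clearing after every 'batchSize' saves so the
// session's first-level cache never holds more than one batch.
void saveInBatches(BatchSession session, List entities, int batchSize) {
    entities.eachWithIndex { entity, i ->
        session.save(entity)
        if ((i + 1) % batchSize == 0) {
            session.flush()   // send the pending inserts to the database
            session.clear()   // detach them so memory use stays flat
        }
    }
    session.flush()           // write the final, possibly partial, batch
    session.clear()
}

// Recording implementation used only to show the resulting call pattern.
class RecordingSession implements BatchSession {
    List<String> calls = []
    void save(Object entity) { calls << 'save' }
    void flush() { calls << 'flush' }
    void clear() { calls << 'clear' }
}
```

`flush()` only sends SQL and does not commit. An import written this way can therefore still be rolled back as a whole, as long as it runs inside one transaction on one thread.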
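Finally, I don't know how your objects are mapped, but you may run into problems saving Foo after all of the child objects have been saved. Adding objects to foo's collection causes Hibernate to set the foo_id reference on each of them, which results in an extra update query for every object you inserted. You probably want to create foo first and call baz.setFoo(foo) before each insert.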
https://stackoverflow.com/questions/18472013