缺少scala akka actoractor2.112.3.3.jar怎么解决

At the beginning, SBT worked very well I believe.However, in my project, I need to create a JAVA process by processBuilder.Then, I have to use java -cp to launch akka program.The whole processBuilder command is:/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java
-cp&&userpath/job123/:./javaLib/typesafe-config.jar:./javaLib/akka-actors.jar:./javaLib/akka-remote_2.10-2.3.2.jar:./lib/scala-library-2.10.3.jar&&javaMapReduce.MapperWorkerActorSystem system = ActorSystem.create(&worker&+name,ConfigFactory.load(&taskupdate&)) gived me an exception:Exception in thread &main& java.lang.NoSuchMethodException:akka.remote.RemoteActorRefProvider.&init&(java.lang.String,akka.actor.ActorSystem$Settings, akka.event.EventStream,akka.actor.Scheduler, akka.actor.DynamicAccess)at java.lang.Class.getConstructor0(Class.java:2800)at java.lang.Class.getDeclaredConstructor(Class.java:2043)atakka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:77)at scala.util.Try$.apply(Try.scala:161)atakka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:74)atakka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:85)atakka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:85)at scala.util.Success.flatMap(Try.scala:200)atakka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:85)at akka.actor.ActorSystemImpl.(ActorSystem.scala:111)at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)at akka.actor.ActorSystem$.create(ActorSystem.scala:66)at akka.actor.ActorSystem.create(ActorSystem.scala)at javaMapReduce.TaskUpdater.(MapperWorker.java:114)at javaMapReduce.MapperWorker.main(MapperWorker.java:226)I searched online, and someone said it is because that scala librarycontains old version of akka. Does it make this happen?--Read the docs: Check the FAQ: Search the archives: ---You received this message because you are subscribed to the Google Groups &Akka User List& group.To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+.To post to this group, send email to akka-.Visit this group at For more options, visit
Search Discussions
12 maj 2014 kl. 22:55 skrev Liang Tang &liang.&:At the beginning, SBT worked very well I believe.However, in my project, I need to create a JAVA process by processBuilder. Then, I have to use java -cp to launch akka program.The whole processBuilder command is:/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java
userpath/job123/:./javaLib/typesafe-config.jar:./javaLib/akka-actors.jar:./javaLib/akka-remote_2.10-2.3.2.jar:./lib/scala-library-2.10.3.jar
javaMapReduce.MapperWorkerActorSystem system = ActorSystem.create(&worker&+name, ConfigFactory.load(&taskupdate&)) gived me an exception:Exception in thread &main& java.lang.NoSuchMethodException: akka.remote.RemoteActorRefProvider.&init&(java.lang.String, akka.actor.ActorSystem$Settings, akka.event.EventStream, akka.actor.Scheduler, akka.actor.DynamicAccess)at java.lang.Class.getConstructor0(Class.java:2800)at java.lang.Class.getDeclaredConstructor(Class.java:2043)at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:77)at scala.util.Try$.apply(Try.scala:161)at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:74)at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:85)at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:85)at scala.util.Success.flatMap(Try.scala:200)at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:85)at akka.actor.ActorSystemImpl.&init&(ActorSystem.scala:546)at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)at akka.actor.ActorSystem$.create(ActorSystem.scala:66)at akka.actor.ActorSystem.create(ActorSystem.scala)at javaMapReduce.TaskUpdater.&init&(TaskUpdater.java:36)at javaMapReduce.MapperWorker.run(MapperWorker.java:114)at javaMapReduce.MapperWorker.main(MapperWorker.java:226)I searched online, and someone said it is because that scala library contains old version of akka. Does it make this happen?Yes, Scala 2.10 comes with Akka 2.1.0, which is not binary compatible with Akka 2.3.2. I recommend using sbt to resolve the dependencies instead of manually constructing a command line, i.e. using the runMain task.Regards,Roland--Read the docs: Check the FAQ: Search the archives: ---You received this message because you are subscribed to the Google Groups &Akka User List& group.To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+.To post to this group, send email to akka-.Visit this group at For more options, visit Dr.&Roland KuhnAkka Tech LeadTypesafe & Reactive apps on the JVM.twitter: @rolandkuhn--Read the docs: Check the FAQ: Search the archives: ---You received this message because you are subscribed to the Google Groups &Akka User List& group.To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+.To post to this group, send email to akka-.Visit this group at For more options, visit
Related Discussions
viewthread |
categories
user style
2 users in discussion
site design / logo & 2016 Grokbase查看: 4317|回复: 1
akka actor的运行原理
主题帖子积分
中级会员, 积分 253, 距离下一级还需 747 积分
中级会员, 积分 253, 距离下一级还需 747 积分
1.如何配置Dispatcher?
2.Dispatcher的工作原理是什么?&&& && && && && && && && && && &
最近在调研Scala web框架的性能时遇到一些问题, 比如生成巨多的Actor,GC时间过长,CPU使用率太高, 执行Actor的Receive是遇到耗时操作的问题等。怀疑Akka的调度器有些问题,特意整理了一些Akka调度器的背景知识,以及从源代码分析一下Actor是怎么执行地。
Dispatcher
Akka MessageDispatcher驱动Akka actor运行(tick),也可以说是这个机器的引擎。所有的MessageDispatcher都实现了ExecutionContext trait, 这意味着它们可以用来执行任何代码, 例如 .
如果对Actor不做额外配置的话,ActorSystem会使用一个缺省的Dispatcher。缺省的Dispatcher也可以进行参数调整,缺省它使用一个特定的default-executor。如果ActorSystem在创建时传入一个ExecutionContext,则此ExecutionContext 将作为此ActorSystem的所有Dispatcher的缺省executor。缺省的default-executor是fork-join-executor,在大部分情况下它的性能还是不错的。
可以通过下面的代码得到一个配置的Dispatcher:
[Shell] 纯文本查看 复制代码
// for use with Futures, Scheduler, etc.
implicit val executionContext = system.dispatchers.lookup(&my-dispatcher&)
为Actor设置Dispatcher
如果你希望为你的 Actor 设置非缺省的派发器,你需要做两件事:
首先要配置dispatcher:
[Shell] 纯文本查看 复制代码
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = &fork-join-executor&
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
或者配置使用thread-pool-executor
[Shell] 纯文本查看 复制代码my-thread-pool-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = &thread-pool-executor&
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 2
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 2.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 10
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
这样,你就可以配置Actor使用图个特定的disptacher:
[Scala] 纯文本查看 复制代码
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], &myactor&)
[Scala] 纯文本查看 复制代码
akka.actor.deployment {
/myactor {
dispatcher = my-dispatcher
或者用另外一种方式,可以在代码中指定dispatcher:
[Shell] 纯文本查看 复制代码
import akka.actor.Props
val myActor =
context.actorOf(Props[MyActor].withDispatcher(&my-dispatcher&), &myactor1&)
dispatcher的类型
Dispatcher
可共享性: 无限制邮箱: 任何一种类型,为每一个Actor创建一个使用场景: 缺省派发器,Bulkheading底层使用: java.util.concurrent.ExecutorService
& &可以指定“executor”使用“fork-join-executor”, “thread-pool-executor” 或者 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
PinnedDispatcher
可共享性: 无邮箱: 任何一种类型,为每个Actor创建一个使用场景: Bulkheading底层使用: 任何 akka.dispatch.ThreadPoolExecutorConfigurator
& &缺省为一个 “thread-pool-executor”
BalancingDispatcher
可共享性: 仅对同一类型的Actor共享邮箱: 任何,为所有的Actor创建一个使用场景: Work-sharing底层使用: java.util.concurrent.ExecutorService
& &指定使用 “executor” 使用 “fork-join-executor”, “thread-pool-executor” 或 the FQCN(类名的全称) of an akka.dispatcher.ExecutorServiceConfigurator
CallingThreadDispatcher
可共享性: 无限制邮箱: 任何,每Actor每线程创建一个(需要时)使用场景: 仅为测试使用底层使用: 调用的线程 (duh)
kka Mailbox 保存发往某 Actor的消息. 通常每个 Actor 拥有自己的邮箱, 但是如果是使用 BalancingDispatcher 使用同一个 BalancingDispatcher 的所有Actor共享同一个邮箱实例.
内置的邮箱的类型:
UnboundedMailbox - 缺省邮箱SingleConsumerOnlyUnboundedMailboxBoundedMailboxNonBlockingBoundedMailboxUnboundedPriorityMailboxBoundedPriorityMailboxUnboundedStablePriorityMailboxBoundedStablePriorityMailboxUnboundedControlAwareMailboxBoundedControlAwareMailbox
我们只看本地(同一个JVM进程)的ActRef: LocalActorRef,它定义了send (!)方法:
[Shell] 纯文本查看 复制代码
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)
actorCell实现了akka.actor.dungeon.Dispatch trait。它实现了具体的message的发送:
[Scala] 纯文本查看 复制代码def sendMessage(msg: Envelope): Unit =
if (system.settings.SerializeAllMessages) {
val unwrapped = (msg.message match {
case DeadLetter(wrapped, _, _) ⇒ wrapped
case other
⇒ other
}).asInstanceOf[AnyRef]
if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
val s = SerializationExtension(system)
s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
dispatcher.dispatch(this, msg)
} catch handleException
可以看到,还是交给dispatcher.dispatch进行消息的分发。
看具体的实现类Dispatcher.dispatch:
[Scala] 纯文本查看 复制代码
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
mbox.enqueue(receiver.self, invocation)
registerForExecution(mbox, true, false)
将消息放在对应的actor的邮箱中后,就会调用registerForExecution方法。
这个方法最重要的一行就是执行mbox,因为mbox实现了ForkJoinTask和Runnable接口。 (如果执行失败,还可能执行一次)
[Shell] 纯文本查看 复制代码
executorService execute mbox
其实是将mbox放到线程池中执行。
mbox并不是一次全部执行完的,而是有throughput参数确定。每次只执行throughput个消息,执行完会加入到线程池等待队列中,除非全部执行完毕。
因此当throughput=1的时候对actor来说比较“公平”,这样actor能平均的执行。
由此得出几个结论:
可以调解线程池的大小进行调优具体的dispatcher实现该如何执行actor。 比如我们可以实现一个优先级队列来执行优先级比较高的Actor。如果一个Actor执行比较耗时的操作,比如IO操作,就会影响线程池的执行,造成整体吞吐率下降。所以为这些耗时的Actor配置专门的线程池Akka会中断一个Actor而去执行别的actor吗,然后回来继续执行先前的Actor? 答案是不会。 因为一旦Actor交给线程池,线程就会去执行它。 如果你在Actor中sleep线程,会导致线程池中的此线程sleep。 所以你必须想一些办法,比如一个长的业务逻辑分成几个业务逻辑,每次只执行一个业务逻辑,通过状态变换分成多次执行。如果没有其它情况,线程会执行完actor 邮箱中的一部分消息,如果还有消息,会将此邮箱再放入线程池等待执行,直到没有待处理的消息为止。
文章来源:鸟窝
主题帖子积分
高级会员, 积分 1595, 距离下一级还需 3405 积分
高级会员, 积分 1595, 距离下一级还需 3405 积分
站长推荐 /6
about云|新出视频,openstack零基础入门,解决你ping不通外网难题
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
视频资料大优惠
大数据零基础由入门到实战
阶段1:hadoop零基础入门基础篇
阶段2:hadoop2入门
阶段3:大数据非hadoop系列课程
阶段4:项目实战篇
阶段5:大数据高级系列应用课程
阶段6:工作实用系列教程
等待验证会员请验证邮箱
新手获取积分方法
Powered by}

我要回帖

更多关于 akka actor 多线程 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信