ProActive Reference Card
[ Download PDF ]
ProActive is a Java library for parallel, distributed, and concurrent computing, also featuring mobility and security in a uniform framework. ProActive provides a comprehensive API and a graphical interface. The library is based on an Active Object pattern that is a uniform way to encapsulate:
ProActive is only made of standard Java classes, and requires no changes to the Java Virtual Machine. Overall, it simplifies the programming of applications distributed over Local Area Network (LAN), Clusters, Intranet or Internet GRIDs.
B.1. Main concepts and definitions
-
Active Objects (AO): a remote object, with its own thread, receiving calls on its public methods
-
FIFO activity: an AO, by default, executes the request it receives one after the other, in the order they were received
-
No-sharing: standard Java objects cannot be referenced from 2 AOs, ensured by deep-copy of constructor params, method params, and results
-
Asynchronous Communications: method calls towards AOs are asynchronous
-
Future: the result of a non-void asynchronous method call
-
Request: the occurrence of a method call towards an AO
-
Service: the execution by an AO of a request
-
Reply: after a service, the method result is sent back to the caller
-
Wait-by-necessity: automatic wait upon the use of a still awaited future
-
Automatic Continuation: transmission of futures and replies between AO and JVMs
-
Migration: an AO moving from one JVM to another, computational weak mobility: the AO decides to migrate and stack is lost
-
Group: a typed group of objects or AOs. Methods are called in parallel on all group members.
-
Component: made of AOs, a component defines server and client interfaces
-
Primitive Component: directly made of Java code and AOs
-
Composite Component: contains other components (primitives or composites)
-
Parallel Component: a composite that is using groups to multicast calls to inner components
-
Security: X.509 Authentication, Integrity, and Confidentiality defined at deployment in an XML file on entities such as communications, migration, dynamic code loading.
-
Virtual Node (VN): an abstraction (a string) representing where to locate AOs at creation
-
Deployment descriptor: an XML file where a mapping VN --> JVMs --> Machine is specified.
-
Node: the result of mapping a VN to a set of JVMs. After activation, a VN contains a set of nodes, living in a set of JVMs.
-
IC2D: Interactive Control and Debugging of Distribution: a Graphical environment for monitoring and steering Grid applications
B.2. Main principles: asynchronous method calls and implicit futures
A a = (A) ProActive.newActive('A', params, node);
a.foo (param); v = a.bar (param); ... v.gee (param);
B.3. Explicit Synchronization
boolean isAwaited(Object);
void waitFor(Object);
void waitForAll(Vector);
int waitForAny(Vector);
B.4. Programming AO Activity and services
When an AO must implement an activity that is not FIFO, the RunActive interface has to be implemented: it specifies the AO behavior in the method named runActivity():
Interface RunActive void runActivity(Body body)
Example:
public class A implements RunActive {
runActivity(Body body) { Service service = new Service(body); while ( terminate ) { ...
... ... service.serveOldest('foo'); ... } } }
Two other interfaces can also be specified:
Interface InitActive void initActivity(Body body)
EndActive void endActivity(Body body)
B.5. Reactive Active Object
Even when an AO is busy doing its own work, it can remain reactive to external events (method calls). One just has to program non-blocking services to take into account external inputs.
public class BusyButReactive implements RunActive {
public void runActivity(Body body) { Service service = new Service(body); while ( ! hasToTerminate ) { ...
... service.serveOldest('changeParameters', 'terminate'); ... } } public void changeParameters () { ...... }
public void terminate (){ hasToTerminate=true; } }
It also allows one to specify explicit termination of AOs (there is currently no Distributed Garbage Collector). Of course, the reactivity is up to the length of going around the loop. Similar techniques can be used to start, suspend, restart, and stop AOs.
Non-blocking services: returns immediately if no matching request is pending
void serveOldest(); serveOldest(String methodName)
serveOldest(RequestFilter requestFilter)
Blocking services: waits until a matching request can be served
void blockingServeOldest(); blockingServeOldest(String methodName)
blockingServeOldest(RequestFilter requestFilter)
Blocking timed services: wait a matching request at most a time given in ms
void blockingServeOldest (long timeout)
void blockingServeOldest(String methodName, long timeout)
blockingServeOldest(RequestFilter requestFilter)
Waiting primitives:
void waitForRequest();
void waitForRequest(String methodName);
Others:
void fifoServing();
lifoServing()
serveYoungest() flushAll()
B.7. Active Object Creation:
Object newActive(String classname, Object[] constructorParameters,Node node);
Object newActive(String classname,Object[] constructorParameters,VirtualNode virtualnode);
Object turnActive(Object, Node node);
A ga = (A) ProActiveGroup.newGroup( 'A', params, nodes);
ga.foo(...); V gv = ga.bar(...);
gv.gee (...); in group. Group ag = ProActiveGroup.getGroup(ga);
ag.add(o); ag.remove(index) A ga2 = (A) ag.getGroupByType(); void setScatterGroup(g);
void unsetScatterGroup(g);
B.9. Explicit Group Synchronizations
Methods both in Interface Group, and static in class ProActiveGroup
boolean ProActiveGroup.allAwaited (Object); ProActiveGroup.allArrived (Object);
ProActiveGroup.waitAll (Object); ProActiveGroup.waitN (Object, int nb);
int ProActiveGroup.waitOneAndGetIndex (Object);
A spmdGroup = (A) ProSPMD.newSPMDGroup('A', params, nodes);
A mySpmdGroup = (A) ProSPMD.getSPMDGroup();
int rank = ProSPMD.getMyRank();
ProSPMD.barrier('barrierID');
Methods both in Interface Group, and static in class ProActiveGroup
void migrateTo(Object o);
void migrateTo(String nodeURL);
int void migrateTo(Node node);
To initiate the migration of an object from outside, define a public method, that upon service will call the static migrateTo primitive:
public void moveTo(Object) { try{ ProActive.migrateTo(t); } catch (Exception e) { e.printStackTrace(); logger.info('Cannot migrate.'); } }
void onDeparture(String MethodName); onArrival(String MethodName);
setMigrationStrategy(MigrationStrategy); migrationStrategy.add(Destination); migrationStrategy.remove(Destination d) ;
Components are formed from AOs, a component is linked and communicates with other remote components. A component can be composite, made of other components, and as such itself distributed over several machines. Component systems are defined in XML files (ADL: Architecture Description Language); these files describe the definition, the assembly, and the bindings of components.
Components follow the Fractal hierarchical component model specification and API, see http://fractal.objectweb.org
The following methods are specific to ProActive.
In the class org.objectweb.proactive.ProActive:
Component newActiveComponent('A', params, VirtualNode, ComponentParameters);
In the class org.objectweb.proactive.core.component.Fractive:
ProActiveInterface createCollectiveClientInterface(String itfName, String itfSignature);
An X.509 Public Key Infrastructure (PKI) allowing communication Authentication, Integrity, and Confidentiality (AIC) to be configured in an XML security file, at deployment, outside any source code. Security is compatible with mobility, allows for hierarchical domain specificationand dynamically negotiated policies.
Example of specification:
<Rule> <From> <Entity type='VN' name='VN1'/> </From> <To> <Entity type='VN' name='VN2'/> </To> <Communication> <Request value='authorized'> <Attributes authentication='required' integrity='required' confidentiality='optional'/> </Request> </Communication> <Migration>denied</Migration> <AOCreation>denied</AOCreation>
</Rule>
This rule specifies that: from Virual Node 'VN1' to the VN 'VN2', the communications (requests) are authorized, provided authentication and integrity are being used, while confidentiality is optional. Migration and AO creation are not authorized.
Virtual Nodes (VN) allow one to specify the location where to create AOs. A VN is uniquely identified as a String, is defined in an XML Deployment Descriptor where it is mapped onto JVMs. JVMs are themselves mapped onto physical machines: VN --> JVMs --> Machine. Various protocols can be specified to create JVMs onto machines (ssh, Globus, LSF, PBS, rsh, rlogin, Web Services, etc.). After activation, a VN contains a set of nodes, living in a set of JVMs. Overall, VNs and deployment descriptors allow to abstract away from source code: machines, creation, lookup and registry protocols.
Descriptor example: creates one jvm on the local machine
<ProActiveDescriptor xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xsi:noNamespaceSchemaLocation='DescriptorSchema.xsd'>
<virtualNodesDefinition> <virtualNode name='Dispatcher'/> </virtualNodesDefinition> <componentDefinition/>
<deployment> <mapping> <map virtualNode='Dispatcher'> <jvmSet>
<vmName value='Jvm1'/> </jvmSet> </map>
</mapping> <jvms> <jvm name='Jvm1'>
<creation> <processReference refid='creationProcess'/> </creation>
</jvm> </jvms> </deployment> <infrastructure> <processes>
<processDefinition id='creationProcess'> <jvmProcess class='org.objectweb.proactive.core.process.JVMNodeProcess'/>
</processDefinition> </processes> </infrastructure> <componentDefinition> </ProActiveDescriptor>
Deployment API
ProActiveDescriptor pad = ProActive.getProActiveDescriptor(String File);
pad.activateMapping(String VN);
pad.activateMappings();
VirtualNode vn = pad.getVirtualNode(String)
Node[] n = vn.getNodes();
Object[] n[0].getActiveObjects();
ProActiveRuntime part = n[0].getProActiveRuntime();
pad.killall(boolean softly);
Functional exceptions with asynchrony
ProActive.tryWithCatch(MyException.class);
try {
ProActive.endTryWithCatch(); } catch (MyException e) {
} finally { ProActive.removeTryWithCatch();
Non-Functional Exceptions
Adding a handler to an active object on its side:
ProActive.addNFEListenerOnAO(myAO, new NFEListener() {
public boolean handleNFE(NonFunctionalException nfe) { return true; } });
Handlers can also be added to the client side of an active object with
ProActive.addNFEListenerOnProxy(ao, handler)
or to a JVM with
ProActive.addNFEListenerOnJVM(handler)
These handlers can also be removed with
ProActive.removeNFEListenerOnAO(ao, handler), ProActive.removeNFEListenerOnProxy(ao, handler), ProActive.removeNFEListenerOnJVM(handler)
It's possible to define an handler only for some exception types, for example:
ProActive.addNFEListenerOnJVM(new TypedNFEListener( SendRequestCommunicationException.class, new NFEListener() { public boolean handleNFE(NonFunctionalException e) {
; } } ));
The behaviour of the default handler (if none could handle the exception) is to throw the exception if it's on the proxy side, or log it if it's on the body side.
B.16. Export Active Objects as Web services
ProActive allows active objects exportation as web services. The service is deployed onto a Jakarta Tomcat web server with a given url. It is identified by its urn, an unique id of the service. It is also possible to choose the exported methods of the object.
The WSDL file matching the service will be accesible at http://localhost:8080/servlet/wsdl?id=a for a service which name is 'a' and which id deployed on a web server which location is http://localhost:8080.
A a = (A) ProActive.newActive('A', new Object []{});
String [] methods = new String [] {'foo', 'bar'};
ProActive.exposeAsWebService(a,'http:
ProActive.unExposeAsWebService('a', 'http:
B.17. Deploying a fault-tolerant application
ProActive can provide fault-tolerance capabilities through two differents protocols: a Communication-Induced Checkpointing protocol (CIC) or a pessimistic message logging protocol (PML). Making a ProActive application fault-tolerant is fully transparent; active objects are turned fault-tolerant using Java properties that can be set in the deployment descriptor. The programmer can select at deployment time the most adapted protocol regarding the application and the execution environment.
A Fault-tolerant deployment descriptor
<ProActiveDescriptor> ... <virtualNodesDefinition> <virtualNode name='NonFT-Workers' property='multiple'/> <virtualNode name='FT-Workers' property='multiple' ftServiceId='appli'/>
</virtualNodesDefinition> ... <serviceDefinition id='appli'> <faultTolerance> <protocol type='cic' />
<globalServer url='rmi://localhost:1100/FTServer'/>
<resourceServer url='rmi://localhost:1100/FTServer'/>
<ttc value='5'/> </faultTolerance> </serviceDefinition> </services> ...
</ProActiveDescriptor>
Starting the fault-tolerance server
The global fault-tolerance server can be launched using the ProActive/scripts/[unix|windows]/FT/startGlobalFTServer.[sh|bat] script, with 5 optional parameters:
-
the protocol: -proto [cic|pml]. Default value is cic.
-
the server name: -name [serverName]. Default name is FTServer.
-
the port number: -port [portNumber]. Default port number is 1100.
-
the fault detection period: -fdperiod [periodInSec], the time between two consecutive fault detection scanning. Default value is 10 sec.
-
the URL of a p2p service that can be used by the resource server: -p2p [serviceURL]. No default value.
B.18. Peer-to-Peer Infrastructure
This aims to help you to create a P2P infrastructure over your desktop workstations network. It is self-organized and configurable. The infrastructure maintains a dynamic JVMs network for deploying computational applications.
Deploying the Infrastructure:
Firstly, you have to start P2P Services on each shared machine:
$ cd ProActive/scripts/unix/p2p
$ ./startP2PService [-acq acquisitionMethod] [-port portNumber] [-s Peer ...]
With that parameters (all are optionals):
-
-acq is the ProActive Runtime communication protocol used by the peer. Examples: rmi, http, ibis,... By default it is rmi.
-
-port is the port number where the P2P Service listens. By default it is 2410.
-
-s specify addresses of peers which are used to join the P2P infrastructure. Example: rmi://applepie.proactive.org:8080
A simple example:
first.peer.host$ ./startP2PService.sh
second.peer.host$ ./startP2PService.sh -s //first.peer.host
third.peer.host$ ./startP2PService.sh -s //second.peer.host
Acquiring Nodes:
Now you have a P2P Infrastructure running, you might want to deploy your ProActive application on it. That is simple, just modify the XML deployment descriptor:
... <jvms> <jvm name='Jvm1'> <acquisition> <serviceReference refid='p2plookup'/> </acquisition>
</jvm> ... </jvms> ... <infrastructure> ... <services> <serviceDefinition id='p2plookup'> <P2PService nodesAsked='2' acq='rmi' port='6666'> <peerSet>
<peer>//second.peer.host</peer> </peerSet> </P2PService> </serviceDefinition> ... </services> ...
</infrastructure> ...
In the nodesAsked argument, a special value MAX is allowed. When it is used, the P2P infrastructure returns the maximun number of nodes avilable, and continue while the application running to return new nodes to the application. To use all the benefit of that feature, you might add a nodes creation event listener to your application.
Usage Example:
VirtualNode vn = pad.getVirtualNode('p2pvn');
((VirtualNodeImpl) vn).addNodeCreationEventListener(this);
vn.activate();
'this' has to implement the NodeCreationEventListener interface:
public void nodeCreated(NodeCreationEvent event) { newNode = event.getNode(); }
B.19. Branch and Bound API
Firstly, create your own task:
import org.objectweb.proactive.branchnbound.core.Task;
public class YourTask extends Task {
public Result execute() { }
public Vector split() { } public Result gather(Result[] results) {
}
public void initLowerBound() { }
public void initUpperBound() { }
public int compareTo(Object arg) {
}
}
How to interact with the framework from inside a task:
-
Some class variables:
protected Result initLowerBound;
protected Result initUpperBound;
protected Object bestKnownSolution;
protected Worker worker;
-
Interact with the framework (inside a Task):
this.worker.setBestCurrentResult(newBestSolution);
this.worker.sendSubTasksToTheManager(subTaskList);
BooleanWrapper workersAvailable = this.worker.isHungry();
Secondly, choose your task queue:
-
BasicQueueImpl: execute task in FIFO order.
-
LargerQueueIml: execute task in larger order.
-
Extend TaskQueue: your own one.
Finally, start the compution:
Task task = new YourTask(someArguments); Manager manager = ProActiveBranchNBound.newBnB(task, nodes, LargerQueueImpl.class.getName());
Result futureResult = manager.start();
Keep in mind that is only 'initLower/UpperBound' and 'split' methods are called on the root task. The 'execute' method is called on the root task's splitted task. Here the methods order execution:
-
rootTask.initLowerBound(); // compute a first lower bound
-
rootTask.initUpperBound(); // compute a first upper bound
-
Task splitted = rootTask.split(); // generate a set of tasks
-
for i in splitted do in parallel
splitted[i].initLowerBound(); splitted[i].initUpperBound(); Result ri = splitted.execute();
-
Result final = rootTask.gather(Result[] ri); // gathering all result
B.20. File Transfer Deployment
File Transfer Deployment is a tool for transfering files at deployment time. This files are specified using the ProActive XML Deployment Descriptor in the following way:
<VirtualNode name='exampleVNode' FileTransferDeploy='example'/>
.... </deployment> <FileTransferDefinitions> <FileTransfer id='example'> <file src='hello.dat' dest='world.dat'/>
<dir src='exampledir' dest='exampledir'/> </FileTransfer> ... </FileTransferDefinitions> <infrastructure> ....
<processDefinition id='xyz'> <sshProcess>... <FileTransferDeploy='implicit'>
<copyProtocol>processDefault, scp, rcp</copyProtocol>
<sourceInfo prefix='/home/user'/> <destinationInfo prefix='/tmp' hostname='foo.org' username='smith' /> </FileTransferDeploy> </sshProcess>
</processDefinition> ...
|