cdocutils.nodes document q)q}q(U nametypesq}q(XgoalqNXtriggersqNXimplementationqNXuse case 06 - mn synchronizeq NXin more detailq NXsummaryq NXimplementation overviewq NXimplementation detailsq NX preconditionsqNXpost conditionsqNXactorsqNXuc06qXhistoryquUsubstitution_defsq}qUparse_messagesq]qUcurrent_sourceqNU decorationqNUautofootnote_startqKUnameidsq}q(hUgoalqhUtriggersqhUimplementationqh Uuse-case-06-mn-synchronizeqh Uin-more-detailq h Usummaryq!h Uimplementation-overviewq"h Uimplementation-detailsq#hU preconditionsq$hUpost-conditionsq%hUactorsq&hUuc06q'hUhistoryq(uUchildrenq)]q*(cdocutils.nodes target q+)q,}q-(U rawsourceq.X .. _UC06:Uparentq/hUsourceq0Xj/var/lib/jenkins/jobs/API_Documentation_trunk/workspace/api-documentation/source/design/UseCases/06_uc.txtq1Utagnameq2Utargetq3U attributesq4}q5(Uidsq6]Ubackrefsq7]Udupnamesq8]Uclassesq9]Unamesq:]Urefidq;h'uUlineq)q?}q@(h.Uh/hh0h1Uexpect_referenced_by_nameqA}qBhh,sh2UsectionqCh4}qD(h8]h9]h7]h6]qE(hh'eh:]qF(h heuh)qf}qg(h.Uh/h?h0h1hA}h2hCh4}qh(h8]h9]h7]h6]qi(hh]eh:]qjhauh)q}q(h.Uh/h?h0h1h2hCh4}q(h8]h9]h7]h6]qh!ah:]qh auh)q}q(h.Uh/h?h0h1h2hCh4}q(h8]h9]h7]h6]qh&ah:]qhauh author: <> SYNC ..> NOTIFY: <> @enduml h/hh0h1h2Uplantumlqh4}q(h6]h7]h8]h9]h:]UumlqX'@startuml images/06_uc.png actor "Coordinating Node" as CN actor "Member Node" as MN usecase "13. Authorization" as author usecase "06. Synchronize Metadata" as SYNC usecase "43. Notify Indexer" as NOTIFY CN -- SYNC MN -- SYNC SYNC ..> author: <> SYNC ..> NOTIFY: <> @endumluh)q}q(h.Uh/h?h0h1h2hCh4}q(h8]h9]h7]h6]qh$ah:]qhauh)r}r(h.Uh/h?h0h1h2hCh4}r(h8]h9]h7]h6]rhah:]r hauh}r?(h.X>Signal to CN issued by a MN that is requesting synchronizationr@h/j:h0h1h2hyh4}rA(h8]h9]h7]h6]h:]uhSignal to CN issued by a MN that is requesting synchronizationrCrD}rE(h.j@h/j>ubaubaubeubeubh>)rF}rG(h.Uh/h?h0h1h2hCh4}rH(h8]h9]h7]h6]rIh%ah:]rJhauh)rp}rq(h.Uh/h?h0h1h2hCh4}rr(h8]h9]h7]h6]rsh"ah:]rth auh> participant CN <> CN -> MN: listObjects( timePeriod ) activate CN MN -> CN: objectList loop "for each PID" CN -> CN: queue PID for synchronization end deactivate CN ... **Possibly Lengthy Delay** ... CN -> MN: getSystemMetadata(PID) activate CN activate MN MN -> CN: SystemMetadata deactivate MN alt New Objec activate CN CN -> CN: store System Metadata CN ->o]: Notify index of new System Metadata deactivate CN alt "Is Science Metadata or Resource Map" CN -> MN: get(PID) activate CN activate MN MN -> CN: object deactivate MN CN -> CN: store object CN ->o]: Notify index of new content deactivate CN end else "Existing Object" activate CN CN -> CN: update system metadata properties CN ->o]: Notify index of modified content deactivate CN end deactivate CN @enduml h/jph0h1h2hh4}r(h6]h7]h8]h9]h:]hX@startuml images/06_uc_a.png autonumber "[0] " participant MN <> participant CN <> CN -> MN: listObjects( timePeriod ) activate CN MN -> CN: objectList loop "for each PID" CN -> CN: queue PID for synchronization end deactivate CN ... **Possibly Lengthy Delay** ... CN -> MN: getSystemMetadata(PID) activate CN activate MN MN -> CN: SystemMetadata deactivate MN alt New Objec activate CN CN -> CN: store System Metadata CN ->o]: Notify index of new System Metadata deactivate CN alt "Is Science Metadata or Resource Map" CN -> MN: get(PID) activate CN activate MN MN -> CN: object deactivate MN CN -> CN: store object CN ->o]: Notify index of new content deactivate CN end else "Existing Object" activate CN CN -> CN: update system metadata properties CN ->o]: Notify index of modified content deactivate CN end deactivate CN @endumluh> participant CN <> MN -> CN: synchronize(PID) activate CN CN -> CN: queue PID for synchronization CN -> MN: ack deactivate CN ... **Possibly Lengthy Delay** ... CN -> MN: getSystemMetadata(PID) activate CN activate MN MN -> CN: SystemMetadata deactivate MN alt New Objec activate CN CN -> CN: store System Metadata CN ->o]: Notify index of new System Metadata deactivate CN alt "Is Science Metadata or Resource Map" CN -> MN: get(PID) activate CN activate MN MN -> CN: object deactivate MN CN -> CN: store object CN ->o]: Notify index of new content deactivate CN end else "Existing Object" activate CN CN -> CN: update system metadata properties CN ->o]: Notify index of modified content deactivate CN end deactivate CN @enduml h/jph0h1h2hh4}r(h6]h7]h8]h9]h:]hX}@startuml images/06_seq_a autonumber "[0] " participant MN <> participant CN <> MN -> CN: synchronize(PID) activate CN CN -> CN: queue PID for synchronization CN -> MN: ack deactivate CN ... **Possibly Lengthy Delay** ... CN -> MN: getSystemMetadata(PID) activate CN activate MN MN -> CN: SystemMetadata deactivate MN alt New Objec activate CN CN -> CN: store System Metadata CN ->o]: Notify index of new System Metadata deactivate CN alt "Is Science Metadata or Resource Map" CN -> MN: get(PID) activate CN activate MN MN -> CN: object deactivate MN CN -> CN: store object CN ->o]: Notify index of new content deactivate CN end else "Existing Object" activate CN CN -> CN: update system metadata properties CN ->o]: Notify index of modified content deactivate CN end deactivate CN @endumluh)r}r(h.Uh/h?h0h1h2hCh4}r(h8]h9]h7]h6]rh#ah:]rh auh> participant "Replication" as cn_replication << CN >> participant "Index Task Queue" as cn_index << Cluster >> participant "Indexer" as cn_indexer << CN >> participant "Object Store" as cn_objs << CN >> participant "System Metadata Map" as cn_sysmeta << Cluster >> participant "Sync Task Queue" as cn_queue << CN >> participant "Node Map" as cn_nodes << Cluster >> participant "Synchronization" as cn_sync << CN >> participant "Read API" as mn_read << MN >> 'm_crud -> c_notify: notify(session, PID, OBJECT_CREATED) 'c_notify -> cn_queue: addTask(SyncTask, node, PID) 'note right ' notification triggered by successful ' create operation on MN. 'end note 'm_rep -> c_notify: setReplicationStatus(token, PID, COMPLETE) 'c_notify -> cn_queue: addTask(SyncTask, node, PID) 'note right ' notification triggered by completed ' replication operation on MN. 'end note group populateSynchronizationQueue cn_sync -> cn_nodes: lock(node_id) activate cn_sync #D74F57 note right Start of synchronization process triggered by quartz end note activate cn_nodes #D74F57 cn_nodes --> cn_sync: OK cn_sync -> cn_nodes: getLastUpdateTime(node_id) activate cn_nodes #D74F57 note right The Node Map is a hash of (node_id, Node), use Hazelcast query Map.values() passing in SqlPredicate end note cn_nodes --> cn_sync: startTime deactivate cn_nodes cn_sync -> mn_read: listObjects(session, startTime, ...) activate mn_read #D74F57 cn_sync <-- mn_read: ObjectList deactivate mn_read loop for each PID cn_sync -> cn_sync: createTask(PID) note right Each SyncTask implements Callable and will be submitted to the ExecutorService to be executed on a CN end note cn_sync -> cn_queue: offer(taskid, SyncTask) activate cn_queue #D74F57 cn_queue --> cn_sync: OK deactivate cn_queue end note right adding SyncTasks should fail if PID is already in the list and the PID is NOT locked. This enables very recent updates to a PID to occur and be correctly managed by the overall synchronization process. end note cn_sync -> cn_nodes: setLastUpdateTime(node_id, startTime) activate cn_nodes #D74F57 cn_nodes --> cn_sync: OK deactivate cn_nodes cn_sync -> cn_nodes: unlock(node_id) deactivate cn_sync deactivate cn_nodes end group processSynchronizationQueue cn_sync -> cn_sync: entryAdded(EntryEvent) note left Synchronization implements EntryListener, monitors the Sync Queue for changes. end note activate cn_sync #D74F57 cn_sync -> cn_queue: poll(timeout) activate cn_queue #D74F57 note right Only one CN will win the poll and process the SyncTask end note cn_queue --> cn_sync: SyncTask deactivate cn_queue cn_sync -> cn_sync: ExecutorService.submit(SyncTask) cn_sync -> cn_sysmeta: lock(PID) activate cn_sysmeta #D74F57 group ProcessPID( PID ) cn_sync -> mn_read: getSystemMetadata(PID) mn_read -> cn_sync: SystemMetadata cn_sync -> mn_read: get( PID ) mn_read -> cn_sync: object cn_sync -> cn_sync: work note right Check for new object, updates to properties end note end cn_sync -> cn_objs: createOrUpdate(session, PID, object, SystemMetadata) activate cn_objs #D74F57 cn_indexer -> cn_indexer: entryAdded() activate cn_indexer #D74F57 note left Indexer implements EntryListener, monitors the System Metadata Map for inserts, updates, deletes. end note cn_indexer -> cn_indexer: createTask(PID) note left Each IndexTask implements Callable and will be submitted to the ExecutorService to be executed on the local CN end note cn_indexer -> cn_index: offer(taskid, IndexTask) activate cn_index #D74F57 cn_objs --> cn_sync: OK deactivate cn_objs cn_sync -> cn_sysmeta: unlock(PID) deactivate cn_queue deactivate cn_sysmeta cn_replication -> cn_replication: entryAdded() activate cn_replication #D74F57 note left Indexer implements EntryListener, monitors the System Metadata Map for inserts, updates, deletes. end note cn_replication -> cn_replication: createTask(PID) note left Each ReplTask implements Callable and will be submitted to the ExecutorService to be executed on the a CN end note cn_replication -> cn_repl: offer(taskid, ReplTask) activate cn_repl #D74F57 deactivate cn_queue end deactivate cn_sync @endumlh/jh0h1h2Ucommentrh4}r(U xml:spacerUpreserverh6]h7]h8]h9]h:]uh> participant "Replication" as cn_replication << CN >> participant "Index Task Queue" as cn_index << Cluster >> participant "Indexer" as cn_indexer << CN >> participant "Object Store" as cn_objs << CN >> participant "System Metadata Map" as cn_sysmeta << Cluster >> participant "Sync Task Queue" as cn_queue << CN >> participant "Node Map" as cn_nodes << Cluster >> participant "Synchronization" as cn_sync << CN >> participant "Read API" as mn_read << MN >> 'm_crud -> c_notify: notify(session, PID, OBJECT_CREATED) 'c_notify -> cn_queue: addTask(SyncTask, node, PID) 'note right ' notification triggered by successful ' create operation on MN. 'end note 'm_rep -> c_notify: setReplicationStatus(token, PID, COMPLETE) 'c_notify -> cn_queue: addTask(SyncTask, node, PID) 'note right ' notification triggered by completed ' replication operation on MN. 'end note group populateSynchronizationQueue cn_sync -> cn_nodes: lock(node_id) activate cn_sync #D74F57 note right Start of synchronization process triggered by quartz end note activate cn_nodes #D74F57 cn_nodes --> cn_sync: OK cn_sync -> cn_nodes: getLastUpdateTime(node_id) activate cn_nodes #D74F57 note right The Node Map is a hash of (node_id, Node), use Hazelcast query Map.values() passing in SqlPredicate end note cn_nodes --> cn_sync: startTime deactivate cn_nodes cn_sync -> mn_read: listObjects(session, startTime, ...) activate mn_read #D74F57 cn_sync <-- mn_read: ObjectList deactivate mn_read loop for each PID cn_sync -> cn_sync: createTask(PID) note right Each SyncTask implements Callable and will be submitted to the ExecutorService to be executed on a CN end note cn_sync -> cn_queue: offer(taskid, SyncTask) activate cn_queue #D74F57 cn_queue --> cn_sync: OK deactivate cn_queue end note right adding SyncTasks should fail if PID is already in the list and the PID is NOT locked. This enables very recent updates to a PID to occur and be correctly managed by the overall synchronization process. end note cn_sync -> cn_nodes: setLastUpdateTime(node_id, startTime) activate cn_nodes #D74F57 cn_nodes --> cn_sync: OK deactivate cn_nodes cn_sync -> cn_nodes: unlock(node_id) deactivate cn_sync deactivate cn_nodes end group processSynchronizationQueue cn_sync -> cn_sync: entryAdded(EntryEvent) note left Synchronization implements EntryListener, monitors the Sync Queue for changes. end note activate cn_sync #D74F57 cn_sync -> cn_queue: poll(timeout) activate cn_queue #D74F57 note right Only one CN will win the poll and process the SyncTask end note cn_queue --> cn_sync: SyncTask deactivate cn_queue cn_sync -> cn_sync: ExecutorService.submit(SyncTask) cn_sync -> cn_sysmeta: lock(PID) activate cn_sysmeta #D74F57 group ProcessPID( PID ) cn_sync -> mn_read: getSystemMetadata(PID) mn_read -> cn_sync: SystemMetadata cn_sync -> mn_read: get( PID ) mn_read -> cn_sync: object cn_sync -> cn_sync: work note right Check for new object, updates to properties end note end cn_sync -> cn_objs: createOrUpdate(session, PID, object, SystemMetadata) activate cn_objs #D74F57 cn_indexer -> cn_indexer: entryAdded() activate cn_indexer #D74F57 note left Indexer implements EntryListener, monitors the System Metadata Map for inserts, updates, deletes. end note cn_indexer -> cn_indexer: createTask(PID) note left Each IndexTask implements Callable and will be submitted to the ExecutorService to be executed on the local CN end note cn_indexer -> cn_index: offer(taskid, IndexTask) activate cn_index #D74F57 cn_objs --> cn_sync: OK deactivate cn_objs cn_sync -> cn_sysmeta: unlock(PID) deactivate cn_queue deactivate cn_sysmeta cn_replication -> cn_replication: entryAdded() activate cn_replication #D74F57 note left Indexer implements EntryListener, monitors the System Metadata Map for inserts, updates, deletes. end note cn_replication -> cn_replication: createTask(PID) note left Each ReplTask implements Callable and will be submitted to the ExecutorService to be executed on the a CN end note cn_replication -> cn_repl: offer(taskid, ReplTask) activate cn_repl #D74F57 deactivate cn_queue end deactivate cn_sync @endumlr r }r (h.Uh/jubaubj)r }r (h.X@startuml images/06_act.png (*) --> [processObject()] if "CNRead.getSystemMetadata(PID)" then if "Duplicate Object?" then -->[Yes] "Duplicate Content" --> "Update Replica Info" --> "Store System Metadata" --> "Notify watchers\nIndex, Replication" --> (*) else -->[No] "Error: Duplicate Identifier" --> "Notify MN" --> (*) endif else ->[FAIL] "New content" note right The content is unknown to DataONE so needs to be examined, its location recorded, and optionally retrieved and stored on the CN. end note if "Science Metadata \n or Resource Map?" then ->[yes] "Get object from MN" --> "Store object on CN" --> "Update Replica Info" else -->[no] "Update Replica Info" endif endif @endumlh/jh0h1h2jh4}r(jjh6]h7]h8]h9]h:]uh [processObject()] if "CNRead.getSystemMetadata(PID)" then if "Duplicate Object?" then -->[Yes] "Duplicate Content" --> "Update Replica Info" --> "Store System Metadata" --> "Notify watchers\nIndex, Replication" --> (*) else -->[No] "Error: Duplicate Identifier" --> "Notify MN" --> (*) endif else ->[FAIL] "New content" note right The content is unknown to DataONE so needs to be examined, its location recorded, and optionally retrieved and stored on the CN. end note if "Science Metadata \n or Resource Map?" then ->[yes] "Get object from MN" --> "Store object on CN" --> "Update Replica Info" else -->[no] "Update Replica Info" endif endif @endumlrr}r(h.Uh/j ubaubeubh>)r}r(h.Uh/h?h0h1h2hCh4}r(h8]h9]h7]h6]rhah:]rhauh)r?}r@(h.X:func:`MNRead.listObjects`rAh/j7h0h1h2U pending_xrefrBh4}rC(UreftypeXfuncUrefwarnrDU reftargetrEXMNRead.listObjectsU refdomainXpyrFh6]h7]U refexplicith8]h9]h:]UrefdocrGXdesign/UseCases/06_ucrHUpy:classrINU py:modulerJNuh)rk}rl(h.X:func:`MNRead.get`rmh/jdh0h1h2jBh4}rn(UreftypeXfuncjDjEX MNRead.getU refdomainXpyroh6]h7]U refexplicith8]h9]h:]jGjHjINjJNuh)r}}r~(h.X :func:`MNRead.getSystemMetadata`rh/jdh0h1h2jBh4}r(UreftypeXfuncjDjEXMNRead.getSystemMetadataU refdomainXpyrh6]h7]U refexplicith8]h9]h:]jGjHjINjJNuh)r}r(h.X:func:`CNStorage.create`rh/jdh0h1h2jBh4}r(UreftypeXfuncjDjEXCNStorage.createU refdomainXpyrh6]h7]U refexplicith8]h9]h:]jGjHjINjJNuh)r}r(h.X:func:`MNRead.listObjects`rh/jh0h1h2jBh4}r(UreftypeXfuncjDjEXMNRead.listObjectsU refdomainXpyrh6]h7]U refexplicith8]h9]h:]jGjHjINjJNuh)r}r(h.X:func:`MN_get`rh/jh0h1h2jBh4}r(UreftypeXfuncjDjEXMN_getU refdomainXpyrh6]h7]U refexplicith8]h9]h:]jGjHjINjJNuh)r}r(h.X:func:`MN_getSystemMetadata`rh/jh0h1h2jBh4}r(UreftypeXfuncjDjEXMN_getSystemMetadataU refdomainXpyrh6]h7]U refexplicith8]h9]h:]jGjHjINjJNuh(h.XmThis fairly simplistic approach should be enough to get things started. Not ideal, but should suffice to get some data moving around. To implement, there is need for a few new components - a queue, a place to store state information, the code that does the polling, the code that does the object retrieval, the worker thread code, and an overall controller service.r?h/jh0h1h2hyh4}r@(h8]h9]h7]h6]h:]uh)rE}rF(h.Uh/jh0h1h2hCh4}rG(h8]h9]h7]h6]rHh ah:]rIh auh)rZ}r[(h.X:func:`MN_health.ping`r\h/jSh0h1h2jBh4}r](UreftypeXfuncjDjEXMN_health.pingU refdomainXpyr^h6]h7]U refexplicith8]h9]h:]jGjHjINjJNuh(h%jFhhfhjhjh#jh jEh!hhh?h"jph&hjjh]hfh$hh'h?h(juUsubstitution_namesr?}r@h2h=h4}rA(h8]h6]h7]Usourceh1h9]h:]uU footnotesrB]rCUrefidsrD}rE(h]]rFhcaj]rGjah']rHh,auub.