.. _UC09:

Use Case 09 - Replicate MN to MN
--------------------------------

.. index:: Use Case 09, UC09, Replicate MN, replicate

Revisions
  View document revision history_.

Goal
  Replicate data from Member Node to Member Node.

Summary 
  Replication of content between Member Nodes (MNs) is done to improve
  persistence of information (data are not lost if a single MN is lost) and to
  improve accessibility (more choices for content retrieval lower the bandwidth
  demand on any particular MN). The replication process is controlled by a
  Coordinating Node (CN).

  A full copy of the science data and science metadata is made during the
  replication process, so the recipient MN receives both the original data
  and metadata.

  Data is copied as an exact, byte-for-byte copy. Science metadata may be
  transformed into another format if the recipient MN cannot support the
  original format.

  It is important that the original metadata be preserved on the CNs, since
  the MN where the content was originally published may go offline or be
  removed from the DataONE system.


Actors
  Two Member Nodes, one or more Coordinating Nodes

Preconditions 
  - Content is present on a Member Node

  - The content has been registered with the DataONE system (i.e. Member Node
    Synchronization has occurred for the data and metadata)

Triggers
  - A Coordinating Node detects that there are insufficient copies of the
    object(s) in question

  - Information on a Member Node is altered

  - The capabilities of a Member Node change (it accepts more or less content)

  - The replication policy of DataONE or of a Member Node changes

  - A Member Node goes offline


Post Conditions
  - Content is present on the recipient Member Node

  - System metadata is updated to reflect the change

  - Watchers are notified of the change

  - Member Node and Coordinating Node logs are updated


.. 
   @startuml images/09_uc.png
   usecase "12. Authentication" as authen
   package "DataONE"
     actor "Coordinating Node" as CN
     actor "Member Node 1" as MN1
     actor "Member Node 2" as MN2
     usecase "13. Authorization" as author
     usecase "01. Get Object" as GET
     usecase "04. Create object" as CREATE
     usecase "06. Synchronize content" as SYNC
     usecase "16. Log event" as log
     usecase "21. Notify subscribers" as subscribe
     CN -- CREATE
     CN -- SYNC
     MN1 -- CREATE
     MN2 -- GET
     MN1 -- GET
     GET ..> author: <<includes>>
     GET ..> authen: <<includes>>
     GET ..> log: <<includes>>
     GET ..> subscribe: <<includes>>
     CREATE ..> author: <<includes>>
     CREATE ..> log: <<includes>>
     CREATE ..> subscribe: <<includes>>
   @enduml

.. image:: images/09_uc.png

*Figure 1.* Use case diagram indicating the actors involved in the process of
Member Node replication.

..
  @startuml images/09_seq.png
  skinparam notebordercolor #AAAAAA
  skinparam notefontcolor #222222
  title Replicate an object between two Member Nodes\n\n
  participant "mnA : MNode" as mnA <<MNode>>
  participant "mnB : MNode" as mnB <<MNode>>
  participant "cnXReplService : ReplicationService" as cnXrepl <<CNode>>
  participant "cnZReplService : ReplicationService" as cnZ <<CNode>>
  
  == Replication Event  ==
  [-> cnXrepl : hzSystemMetadata.put(pid, sysmeta)
  activate cnXrepl #D74F57
  
  note right
    The Synchronization service adds an entry to the
    SystemMetadata map managed by Hazelcast;
    an EntryEvent is fired
  end note
  
  cnXrepl -> cnXrepl : entryAdded(EntryEvent<pid, sysmeta>)
  cnXrepl -> cnXrepl : queueEvent(pid)
  cnXrepl -> cnXrepl : itemAdded(pid)
  note right
    hzReplicationEvents.offer(pid) is called to keep track of
    frequent hzSystemMetadata change events. When popped
    off the queue, identifiers are placed into the
    hzHandledReplicationEvents set until the evaluation of
    whether to create a task is complete. This prevents
    multiple task creation across CNs for the same event
  end note
  
  cnXrepl -> cnXrepl : createAndQueueTasks(pid)
  
  loop for each ReplicationTask
  cnXrepl -> cnXrepl : taskid = idGenerator.newId()
    note right
    Hazelcast.getIdGenerator("task-ids") has been
    called in ReplicationService constructor 
  end note
  cnXrepl -> cnXrepl : hzReplicationTaskQueue.put(taskid, task)
  note right
    Hazelcast distributes Replication
    Tasks to all CNs
  end note
  cnXrepl -> cnXrepl: setReplicationStatus(session, pid,\n    nodeRef, ReplicationStatus.QUEUED)
  deactivate cnXrepl
  end loop

  == Regular Replication Audit ==

   [-> cnXrepl : auditReplicas()
   activate cnXrepl #D74F57

   note right
     Query the Metacat database to retrieve a short list of
     objects whose checksums have not been verified in
     more than 2 months
   end note
   cnXrepl ->] : getAuditShortList()
   cnXrepl <--] : shortList

   note over cnXrepl
   Bin the tasks by NodeReference for 
   bulk processing by MNAuditTask
   end note
   loop for each Identifier in shortList
   cnXrepl -> cnXrepl : hzSystemMetadata.get(Identifier).getReplicaList()
   loop for each Replica in List<Replica>
   alt if Replica.replicaVerified is older than 2 months
   alt if auditTaskMap.containsKey(Replica.replicaMemberNode)
   cnXrepl -> cnXrepl : auditTaskMap.get(Replica.replicaMemberNode).add(Identifier)
   else
   cnXrepl -> cnXrepl : auditTaskMap.put(Replica.replicaMemberNode, new List<Identifier>())\nauditTaskMap.get(Replica.replicaMemberNode).add(Identifier)
   end
   end
   end loop
   end loop
   loop for each NodeReference in auditTaskMap.keySet()
   cnXrepl -> cnXrepl : taskid = idGenerator.newId()
   cnXrepl -> cnXrepl : auditTask = MNAuditTask(NodeReference, List<Identifier>)
   cnXrepl -> cnXrepl : hzAuditTaskQueue.put(taskid, auditTask)
   end loop
   note right
     Hazelcast distributes Audit
     Tasks to all CNs
   end note
   deactivate cnXrepl

  == Process Replication Tasks ==  
          
  cnZ -> cnZ: itemAdded(task)
  activate cnZ #D74F57
  cnZ -> cnZ: hzReplicationTaskQueue.poll()
  note left
    Each ReplicationService polls the replication task
    queue when events are fired. The first to get the lock
    handles the task.  The others will also get the lock,
    but during evaluation, will not create a task because
    of the new state of the replica in the system metadata 
  end note

  cnZ -> cnZ: ExecutorService.submit(task)
  activate cnZ #DarkSalmon
  cnZ -> cnZ: replicationTask.call(pid)
  cnZ -> mnB: replicate(cnZSession, mnASession, pid)
  activate mnB #D74F57
  mnB --> cnZ: replicateResponse
  deactivate cnZ 
  cnZ -> cnZ: setReplicationStatus(session, pid,\n    nodeRef, ReplicationStatus.REQUESTED)
  
  note left
   The object's system metadata gets updated
  end note
  
  cnZ -> cnZ: updateSystemMetadata(pid)
  cnZ --> cnZ: statusResponse
  
  mnB -> mnA: getReplica(mnBSession, pid)
  deactivate mnB
  activate mnA #D74F57
  mnA -> cnZ: isNodeAuthorized(mnASession, mnBSubject, pid)
  cnZ --> mnA: authorizationResponse
  mnA --> mnB: replicaBytes
  deactivate mnA
  activate mnB #D74F57
  
  mnB -> cnZ: setReplicationStatus(session, pid,\n    nodeRef, ReplicationStatus.COMPLETE)
  deactivate mnB
  cnZ -> mnA: mnA.getChecksum(pid)
  activate mnA #D74F57
  mnA --> cnZ : checksum
  
  note right
  The object's system metadata gets updated
  end note
  
  deactivate mnA
  cnZ -> cnZ: updateSystemMetadata(pid)
  cnZ --> cnZ: statusResponse
  deactivate cnZ

 

  @enduml


.. image:: images/09_seq.png


..
  @startuml images/09_seq_audit_1.png
  skinparam notebordercolor #AAAAAA
  skinparam notefontcolor #222222
  title Audit replicas on Member Nodes.\n\n
  participant "ReplicaAuditQuartzJob" as cnQuartz <<CN-1>> 
  participant "ReplicaAuditService" as cnAudit <<CN-1>>
  participant "ReplicaAuditDao" as auditDao <<CN-1>>
  participant "DistributedExecutorService" as cnZ <<CN-Z>>

 == Replication Audit ==
   
   note left of cnQuartz
    Quartz scheduled execution 
    of replica audit processing.
   end note
   
     cnQuartz -> cnAudit : auditReplicas()
   activate cnAudit #D74F57

     cnAudit -> cnAudit : acquireReplicaAuditingLock();

   note over auditDao
     Query the Metacat system metadata replica status table
     for pids of replica records that have not been audited
     within the audit period. May be a paged result.
   end note
   cnAudit -> auditDao : getReplicasByDate()
   cnAudit <-- auditDao : List<Identifier> pidsToAudit

   loop for each Identifier pid in pidsToAudit
     note over cnAudit
       Create ReplicaAuditTasks if not already being handled.  
       Tasks can be configured to batch several Identifiers
       into tasks or to create a task for each Identifier.
     end note
       alt if pid NOT in hzProcessingAuditIdentifiers
         cnAudit -> cnAudit : replicaAuditTasks.add(new ReplicaAuditTask(pid));
       end
        alt if replicaAuditTasks.size >= replicaBatchSize
         note over cnAudit
              The accumulatedTaskFutures list holds futures generated in this iteration.
              futuresToHandle holds futures generated in the previous iteration.
              The assumption is that futures submitted to the distributed executor
              service in the previous iteration have already executed asynchronously.
         end note
         cnAudit -> cnAudit : futuresToHandle = accumulatedTaskFutures;  // futures from last batch
         cnAudit -> cnAudit : accumulatedTaskFutures.clear();
         loop for each ReplicaAuditTask auditTask in replicaAuditTasks
           note over cnZ
             Hazelcast distributed executor service used to 
             distribute audit tasks among CN nodes.
           end note
           cnAudit -> cnZ : submit(auditTask);
            cnAudit -> cnAudit : hzProcessingAuditIdentifiers.removeAll(auditTask.pid);
           cnZ --> cnAudit : Future replicaTaskFuture;
           cnAudit -> cnAudit : accumulatedTaskFutures.add(replicaTaskFuture);
         end loop
         cnAudit -> cnAudit : replicaAuditTasks.clear(); // prepare for next batch tasks.
         loop for each Future replicaTaskFuture in futuresToHandle
           cnAudit -> cnAudit : handleResult(replicaTaskFuture.get());
         end loop
       end
   end loop
   note over cnAudit
       Handle futures from last batch of audit tasks
   end note
   loop for each Future replicaTaskFuture in accumulatedTaskFutures
       cnAudit->cnAudit: handleResult(replicaTaskFuture);
   end loop
   deactivate cnAudit

  @enduml


*Figure 2.* Interactions for use case 09. The diagram describes the transfer of
a single object from MN_A to MN_B as directed by a CN. It is assumed that the
object does not already exist on MN_B and that the object has been identified
as requiring replication by the CN checking its status in the system metadata.
The end state of a replicate operation is that the content is available on
MN_B, MN_B has notified the CN of this, and the CN will schedule a synchronize
operation that will verify the copy as legitimate.

Implementation Details
~~~~~~~~~~~~~~~~~~~~~~

Replication of objects between Member Nodes (MN) within the DataONE system is
managed by the Coordinating Nodes (CN). CNs are aware of the replication
policies of each object (through system metadata) and the capabilities of each
MN (through node capabilities), and populate a distributed queue of replication
tasks to be processed by all of the CNs.
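
The shared structures named here (the system metadata map and the replication
task queue) are ordinary Hazelcast distributed objects. The following is a
minimal sketch, assuming the Hazelcast Java API and placeholder value types
(the real service uses the DataONE SystemMetadata and task classes), of how a
CN process might obtain them:

.. code-block:: java

    import com.hazelcast.config.Config;
    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.core.IMap;
    import com.hazelcast.core.IQueue;

    // Sketch only: structure names follow this use case; value types are
    // placeholders for the DataONE SystemMetadata and replication task classes.
    public class ReplicationStructures {

        public static void main(String[] args) {
            HazelcastInstance hz = Hazelcast.newHazelcastInstance(new Config());

            // Cluster-wide view of object system metadata, backed by Metacat.
            IMap<String, Object> hzSystemMetadata = hz.getMap("hzSystemMetadata");

            // Cluster-wide queue of replication tasks processed by all CNs.
            IQueue<Object> hzReplicationTasks = hz.getQueue("hzReplicationTasks");

            System.out.println("system metadata entries: " + hzSystemMetadata.size());
            System.out.println("queued replication tasks: " + hzReplicationTasks.size());
        }
    }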

Replication can be initiated in three ways:

 1) CN synchronization: harvesting of system and science metadata 
 2) CN timed replication: periodic sweep of all system objects 
 3) MN event-based replication: MN sends replication request to a 
    CN (not implemented)

Replication Events
~~~~~~~~~~~~~~~~~~

The CNs maintain a synchronized, distributed Hazelcast map of system
metadata (hzSystemMetadata). This map reflects the current state of the
DataONE system's object store. This in-memory map is also bound to the backing
Metacat object store via the Hazelcast MapStore and MapLoader interfaces. The
hzSystemMetadata map serves as an object-level locking mechanism across CNs, and
any service that will make changes to an object's system metadata will need to
gain a lock on the given object identifier in the map. The hzSystemMetadata map
is configured to be persisted (backed up) on 3 CNs.
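
The lock-then-update pattern described above can be sketched as follows,
assuming the Hazelcast IMap locking API; the identifier and system metadata
types are simplified to placeholders:

.. code-block:: java

    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.core.IMap;

    // Sketch of the per-identifier locking pattern: lock the entry in
    // hzSystemMetadata before changing an object's system metadata,
    // and always release the lock afterwards.
    public class SystemMetadataLocking {

        static void updateSystemMetadata(HazelcastInstance hz, String pid) {
            IMap<String, Object> hzSystemMetadata = hz.getMap("hzSystemMetadata");
            hzSystemMetadata.lock(pid);
            try {
                Object sysmeta = hzSystemMetadata.get(pid);
                // ... modify the system metadata here (e.g. replica status) ...
                hzSystemMetadata.put(pid, sysmeta);
            } finally {
                // release immediately, as described in the text
                hzSystemMetadata.unlock(pid);
            }
        }
    }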

As the CN Synchronization Service becomes aware of create, update, and delete
events for MN objects through harvesting, it updates the hzSystemMetadata map. 
The Replication service monitors this map for entry changes, and builds a list
of ReplicationTask objects for each changed identifier in the map. This is done by
calling ReplicationService.createReplicationTaskList(pid). The Replication
Service evaluates the ReplicationPolicy of the given object's system metadata,
evaluates the capabilities and availability of the potential target MNs, and
creates a ReplicationTask for each MN replication target up to the
numberOfReplicas in the object's ReplicationPolicy. The resulting tasks are
ordered by priority. The Replication Service then iterates through the
returned task list and populates the hzReplicationTasks queue with the ordered
tasks. Each item offered to the queue consists of a task identifier and a
ReplicationTask.
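
A sketch of this listener-driven task creation is shown below, assuming a
Hazelcast 3.x-style API. The createTasksForPid() method is an illustrative
stand-in for the policy evaluation done by
ReplicationService.createReplicationTaskList(pid), and the task type is
elided:

.. code-block:: java

    import java.util.Collections;
    import java.util.List;

    import com.hazelcast.core.EntryAdapter;
    import com.hazelcast.core.EntryEvent;
    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.core.IMap;
    import com.hazelcast.core.IQueue;
    import com.hazelcast.core.IdGenerator;

    // Sketch of the entry-listener pattern described above. Only the Hazelcast
    // calls are real API; createTasksForPid() stands in for the Replication
    // Service's policy and capability evaluation.
    public class ReplicationEventListener extends EntryAdapter<String, Object> {

        private final IQueue<Object> hzReplicationTasks;
        private final IdGenerator idGenerator;

        public ReplicationEventListener(HazelcastInstance hz) {
            this.hzReplicationTasks = hz.getQueue("hzReplicationTasks");
            this.idGenerator = hz.getIdGenerator("task-ids");
            IMap<String, Object> hzSystemMetadata = hz.getMap("hzSystemMetadata");
            hzSystemMetadata.addEntryListener(this, false);
        }

        @Override
        public void entryAdded(EntryEvent<String, Object> event) {
            // Evaluate the object's ReplicationPolicy and candidate target MNs,
            // then queue one task per replica still needed.
            for (Object task : createTasksForPid(event.getKey())) {
                long taskId = idGenerator.newId(); // cluster-unique task identifier
                System.out.println("queueing task " + taskId + " for pid " + event.getKey());
                hzReplicationTasks.offer(task);    // the id travels with the task in production
            }
        }

        private List<Object> createTasksForPid(String pid) {
            return Collections.emptyList(); // placeholder for policy evaluation
        }
    }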

.. Note::
    TODO: Describe the CN time-based population of the replication task queue
    that periodically does a full sweep of the object store.
    
.. Note::
    TODO: Describe the MN-based replication via a CNReplication API request
    (not implemented)


Processing Replication Tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

As the hzReplicationTasks queue is populated, each CN's Replication Service
receives item added events and polls the queue with a short timeout to pick up
new tasks to process. A CN Replication Service instance's itemAdded() method is
fired, and it in turn polls the task queue and submits the ReplicationTask
to the cluster-wide Executor Service. One of the CNs will execute the task by
calling the ReplicationTask.call() method. This call initiates MN replication.
The CN calls replicate() on the target MN (mnB), passing in the cnZ token
(cnZToken), the originating node reference (mnA), and the identifier of the
object to be replicated (pid). This call triggers the MN (mnB) to call
getReplica() on the originating MN (mnA), passing in the mnB token
(mnBToken) and the identifier of the object to be replicated (pid). In turn,
the CN updates the system metadata for the object, setting the ReplicationStatus
to REQUESTED after gaining the lock on the object. The lock is immediately
released.
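
A sketch of this flow follows. The queue polling uses the Hazelcast and
java.util.concurrent APIs; the ReplicationTask shown here is an illustrative
Callable rather than the actual DataONE class, and the executor passed in
stands in for the cluster-wide Hazelcast executor service:

.. code-block:: java

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    import com.hazelcast.core.HazelcastInstance;
    import com.hazelcast.core.IQueue;

    // Sketch of the task-processing flow described above. In production,
    // call() invokes replicate() on the target MN through the DataONE client
    // library; here it is represented by a log line only.
    public class ReplicationTaskProcessor {

        static class ReplicationTask implements Callable<String> {
            private final String pid;
            private final String sourceNode; // mnA, holds the original object
            private final String targetNode; // mnB, will receive the replica

            ReplicationTask(String pid, String sourceNode, String targetNode) {
                this.pid = pid;
                this.sourceNode = sourceNode;
                this.targetNode = targetNode;
            }

            @Override
            public String call() {
                // Stand-in for calling replicate() on the target MN, passing the
                // CN credentials, the source node reference, and the pid; the
                // target MN then fetches the bytes via getReplica().
                System.out.println("asking " + targetNode + " to replicate "
                        + pid + " from " + sourceNode);
                return "REQUESTED"; // ReplicationStatus recorded in system metadata
            }
        }

        // Poll the shared queue with a short timeout; only the CN whose poll
        // returns the task submits it for execution.
        static void processNextTask(HazelcastInstance hz, ExecutorService executor)
                throws InterruptedException {
            IQueue<ReplicationTask> queue = hz.getQueue("hzReplicationTasks");
            ReplicationTask task = queue.poll(3L, TimeUnit.SECONDS);
            if (task != null) {
                executor.submit(task);
            }
        }
    }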

Before responding to getReplica(), mnA checks for replication authorization by
calling isNodeAuthorized() on the CN, passing in the mnA token
(mnAToken), the Subject listed in the mnBToken (mnBSubject), the object
identifier (pid), and the desired replication permission
(replicationPermission). The Replication Service looks up the Subject in the
LDAP replication group and returns the authorization response.
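
The authorization gate on the source MN can be sketched as below; the
interfaces and method signatures follow the sequence diagram and are
illustrative, not the exact DataONE client API:

.. code-block:: java

    // Sketch of the source-MN authorization check described above.
    public class GetReplicaAuthorization {

        interface CoordinatingNode {
            boolean isNodeAuthorized(String mnAToken, String requestingSubject,
                    String pid, String permission);
        }

        interface ObjectStore {
            byte[] readObject(String pid);
        }

        // Called by mnA when mnB requests a replica of pid.
        static byte[] getReplica(CoordinatingNode cn, ObjectStore store,
                String mnAToken, String mnBSubject, String pid) {
            // Ask the CN whether the requesting node's Subject may replicate
            // this object before returning any bytes.
            if (!cn.isNodeAuthorized(mnAToken, mnBSubject, pid, "replicate")) {
                throw new SecurityException(
                        mnBSubject + " is not authorized to replicate " + pid);
            }
            return store.readObject(pid); // replicaBytes returned to mnB
        }
    }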

Upon successful authorization, mnA replicates the object (replicaBytes) to the
target MN (mnB). mnB in turn sends a successful replication response to the CN
(replicateResponse). The CN Replication Service once again updates the system
metadata for the object after gaining a lock on the entry in the hzSystemMetadata
map. The lock is immediately released, and the statusResponse is returned.

Note (2011.01.07 CWB): This simple authentication scheme will not work on
member nodes that have their own access control rules. In this scheme, each
member node will need to have knowledge of the administrative (or replication)
credentials for each of the other member nodes. The CN needs to handle the
login actions for both of the MNs involved and send an authenticated token
from MN_A to MN_B so that it can use that credential to successfully get the
document. This is only the case if the document on MN_A is read protected. If
it is public, no token is needed.

Note that the call to setReplicationStatus() with a value of *COMPLETE* is 
functionally equivalent to the *notify(objectCreated, identifier)* call 
indicated in use case 06.


Replication Auditing
~~~~~~~~~~~~~~~~~~~~
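
Replicas are audited periodically: a Quartz-scheduled job on a CN selects
replica records whose checksums have not been verified within the audit
period, batches them into audit tasks, and distributes those tasks across the
CNs through the Hazelcast distributed executor service. Each audit task
retrieves the checksum of a replica from the Member Node that holds it and
compares it with the checksum recorded in the object's system metadata.
Matching replicas have their verified date updated; mismatches are marked
invalid and the object is queued for re-replication. The following diagrams
illustrate the audit scheduling, task processing, and supporting components.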



.. image:: images/09_seq_audit_1.png

..
  @startuml images/09_seq_audit_2.png
    skinparam notebordercolor #AAAAAA
    skinparam notefontcolor #222222
    title Replica Audit Task Processing.\n\n
  participant "ReplicaAuditService" as cnAudit <<CN-1>>
    participant "DistributedExecutorService" as cnZ <<CN-Z>>
    participant "ReplicaAuditTask" as auditTask <<CN-Z>>
    participant "d1Client.CNode : CNodeClient" as cNodeClient <<CN-Z>>
  participant "d1Client.MNode : MNodeClient" as mnA <<MN-A>>
  participant "ReplicationManager" as repManager <<CN-Z>>

 == Process replication audit. ==
  cnAudit -> cnZ : submit(replicaAuditTask);
    cnZ -> cnZ: execute(replicaAuditTask);
    cnZ -> auditTask : call()
    activate auditTask #D74F57
  
  loop for each Identifier pid in auditTask.pids
      auditTask -> cNodeClient : getSystemMetadata(pid);
      cNodeClient --> auditTask : sysMetadata;
      auditTask -> auditTask : verifiedReplicaCount = 0;
      loop for each Replica replica in sysMetadata.getReplicaList()
        auditTask -> auditTask : mnA = getMNodeClient(replica.getReplicaMemberNode());
        auditTask -> mnA : replicaChecksum = getChecksum(pid, sysMetadata.getChecksum());
        alt if replicaChecksum == sysMetadata.getChecksum()
          auditTask -> auditTask : verifiedReplicaCount++;
          auditTask -> auditTask : replica.updateVerifiedDate(today);
          auditTask -> cNodeClient : updateReplicationMetadata(pid, replica);
        else else
          auditTask -> auditTask : replica.setReplicationStatus(INVALID);
          auditTask -> cNodeClient : updateReplicationMetadata(pid, replica);
          auditTask -> repManager : createAndQueueTasks(pid);
        end
      end loop
      alt if sysMetadata.getReplicationPolicy().getNumberReplicas() != verifiedReplicaCount
        auditTask -> repManager : createAndQueueTasks(pid);
      end
    end loop
  
  auditTask --> cnZ : String audit result
  deactivate auditTask
  
  cnZ --> cnAudit : Future - audit results including exceptions

  @enduml
.. image:: images/09_seq_audit_2.png
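
The per-replica checksum comparison performed by a ReplicaAuditTask, as shown
in the diagram above, can be sketched as follows; the interfaces below are
illustrative stand-ins, not the actual DataONE client API:

.. code-block:: java

    import java.util.Map;

    // Sketch of the per-replica checksum audit shown in the diagram above.
    public class ReplicaAuditSketch {

        interface CoordinatingNode {
            String getRecordedChecksum(String pid);
            void updateReplicaVerifiedDate(String pid, String nodeId);
            void markReplicaInvalid(String pid, String nodeId);
            void queueReplicationTasks(String pid);
        }

        interface MemberNode {
            String getChecksum(String pid, String algorithm);
        }

        /** Returns the number of replicas whose checksum matched the recorded value. */
        static int auditReplicas(CoordinatingNode cn, Map<String, MemberNode> replicaHolders,
                String pid, String checksumAlgorithm) {
            String expected = cn.getRecordedChecksum(pid);
            int verified = 0;
            for (Map.Entry<String, MemberNode> entry : replicaHolders.entrySet()) {
                String reported = entry.getValue().getChecksum(pid, checksumAlgorithm);
                if (expected.equals(reported)) {
                    verified++;
                    cn.updateReplicaVerifiedDate(pid, entry.getKey()); // refresh replicaVerified
                } else {
                    cn.markReplicaInvalid(pid, entry.getKey());        // status -> INVALID
                    cn.queueReplicationTasks(pid);                     // replace the bad copy
                }
            }
            return verified;
        }
    }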



.. 
   @startuml images/09_uc_audit_components.png

  title CN Replication Auditing Components
    
  node "CN Audit Process(D1 Processing)" {
    frame "CN Replication Auditor" {
      frame "CN Common" {
        [ReplicationDao]
        }
        frame "CN Replication" {
          [Replication Service]
      }      
        [CNAuditLogClient] --> [cn-audit-index] : add/remove log entries
      }
  }
  
  database "psql" {
    frame "metacat" {
      [smreplicationstatus] --> [ReplicationDao] : select audit candidates
    }
  }
  
  node "CN Service" {
    [REST Service] <-- [Replication Service] : update replica status/verified date
  }
  
  cloud "hazelcast" {
    frame "d1-processing" {
      [Replication Event Queue] <-- [Replication Service] : submit pids to replication
    }
  }
  
  cloud "CN Audit Log Cloud" as auditCloud {
  
    node "Zookeeper" {
      [index config] --> [cn-audit-index] : acquire cloud config
      [leader election] --> [cn-audit-index] : coordinate leader
      [replication] --> [cn-audit-index] : coordinate cloud
    }
    node "jetty" {
      node "Solr 4" {
        frame "cn-audit-index"{
        }
      }
    }
  }
  
  
  
  note "Each CN will be running its own instance of the zookeeper and jetty/solr.\n This forms the cloud cluster." as N1
  
  @enduml
   
   
   
.. image:: images/09_uc_audit_components.png
   

.. _history: https://redmine.dataone.org/projects/d1/repository/changes/documents/Projects/cicore/architecture/api-documentation/source/design/UseCases/09_uc.txt